summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlexandru Scvortov <alexandru@rabbitmq.com>2010-12-29 08:34:48 +0000
committerAlexandru Scvortov <alexandru@rabbitmq.com>2010-12-29 08:34:48 +0000
commitc42ba3a379c099410ad0f6079457b7520d38dc3b (patch)
treecae0262421630df9be266b5b0d0371e9ce89a201
parent7511ecd0b5dd9ecbc350daed964c19f7b7663744 (diff)
parent6e869f0f778997a5be571423d02150e4fcdb94dd (diff)
downloadrabbitmq-server-git-c42ba3a379c099410ad0f6079457b7520d38dc3b.tar.gz
merge from bug23612
bug23612 removes most of the confirms from amqqueue_process.
-rw-r--r--include/rabbit_backing_queue_spec.hrl2
-rw-r--r--src/rabbit_amqqueue_process.erl41
-rw-r--r--src/rabbit_msg_store.erl10
-rw-r--r--src/rabbit_tests.erl15
-rw-r--r--src/rabbit_variable_queue.erl93
5 files changed, 77 insertions, 84 deletions
diff --git a/include/rabbit_backing_queue_spec.hrl b/include/rabbit_backing_queue_spec.hrl
index f67c6f46d1..6fa34ccc89 100644
--- a/include/rabbit_backing_queue_spec.hrl
+++ b/include/rabbit_backing_queue_spec.hrl
@@ -58,7 +58,7 @@
(fun ((rabbit_types:message_properties()) -> boolean()), state())
-> state()).
-spec(fetch/2 :: (ack_required(), state()) -> {fetch_result(), state()}).
--spec(ack/2 :: ([ack()], state()) -> {[rabbit_guid:guid()], state()}).
+-spec(ack/2 :: ([ack()], state()) -> state()).
-spec(tx_publish/4 :: (rabbit_types:txn(), rabbit_types:basic_message(),
rabbit_types:message_properties(), state()) -> state()).
-spec(tx_ack/3 :: (rabbit_types:txn(), [ack()], state()) -> state()).
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 981dd31daa..e6e3989fc4 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -374,12 +374,10 @@ deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun}, FunAcc,
rabbit_channel:deliver(
ChPid, ConsumerTag, AckRequired,
{QName, self(), AckTag, IsDelivered, Message}),
- {State2, ChAckTags1} =
+ ChAckTags1 =
case AckRequired of
- true -> {State1,
- sets:add_element(AckTag, ChAckTags)};
- false -> {confirm_message(Message, State1),
- ChAckTags}
+ true -> sets:add_element(AckTag, ChAckTags);
+ false -> ChAckTags
end,
NewC = C#cr{unsent_message_count = Count + 1,
acktags = ChAckTags1},
@@ -396,10 +394,10 @@ deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun}, FunAcc,
{ActiveConsumers1,
queue:in(QEntry, BlockedConsumers1)}
end,
- State3 = State2#q{
+ State2 = State1#q{
active_consumers = NewActiveConsumers,
blocked_consumers = NewBlockedConsumers},
- deliver_msgs_to_consumers(Funs, FunAcc1, State3);
+ deliver_msgs_to_consumers(Funs, FunAcc1, State2);
%% if IsMsgReady then we've hit the limiter
false when IsMsgReady ->
true = maybe_store_ch_record(C#cr{is_limit_active = true}),
@@ -432,15 +430,11 @@ confirm_messages(Guids, State) ->
confirm_message_by_guid(Guid, State = #q{guid_to_channel = GTC}) ->
case dict:find(Guid, GTC) of
- {ok, {_ , undefined}} -> ok;
{ok, {ChPid, MsgSeqNo}} -> rabbit_channel:confirm(ChPid, MsgSeqNo);
_ -> ok
end,
State#q{guid_to_channel = dict:erase(Guid, GTC)}.
-confirm_message(#basic_message{guid = Guid}, State) ->
- confirm_message_by_guid(Guid, State).
-
record_confirm_message(#delivery{msg_seq_no = undefined}, State) ->
State;
record_confirm_message(#delivery{sender = ChPid,
@@ -455,11 +449,6 @@ record_confirm_message(#delivery{sender = ChPid,
record_confirm_message(_Delivery, State) ->
State.
-ack_by_acktags(AckTags, State = #q{backing_queue = BQ,
- backing_queue_state = BQS}) ->
- {AckdGuids, BQS1} = BQ:ack(AckTags, BQS),
- confirm_messages(AckdGuids, State#q{backing_queue_state = BQS1}).
-
run_message_queue(State) ->
Funs = {fun deliver_from_queue_pred/2,
fun deliver_from_queue_deliver/3},
@@ -823,7 +812,7 @@ handle_call({info, Items}, _From, State) ->
handle_call(consumers, _From, State) ->
reply(consumers(State), State);
-handle_call({deliver_immediately, Delivery = #delivery{message = Message}},
+handle_call({deliver_immediately, Delivery},
_From, State) ->
%% Synchronous, "immediate" delivery mode
%%
@@ -840,10 +829,7 @@ handle_call({deliver_immediately, Delivery = #delivery{message = Message}},
%%
{Delivered, State1} =
attempt_delivery(Delivery, record_confirm_message(Delivery, State)),
- reply(Delivered, case Delivered of
- true -> State1;
- false -> confirm_message(Message, State1)
- end);
+ reply(Delivered, State1);
handle_call({deliver, Delivery}, _From, State) ->
%% Synchronous, "mandatory" delivery mode
@@ -881,7 +867,7 @@ handle_call({basic_get, ChPid, NoAck}, _From,
sets:add_element(AckTag,
ChAckTags)}),
State2;
- false -> confirm_message(Message, State2)
+ false -> State2
end,
Msg = {QName, self(), AckTag, IsDelivered, Message},
reply({ok, Remaining, Msg}, State3)
@@ -1019,8 +1005,8 @@ handle_cast({ack, Txn, AckTags, ChPid},
case Txn of
none -> ChAckTags1 = subtract_acks(ChAckTags, AckTags),
NewC = C#cr{acktags = ChAckTags1},
- NewState = ack_by_acktags(AckTags, State),
- {NewC, NewState};
+ BQS1 = BQ:ack(AckTags, BQS),
+ {NewC, State#q{backing_queue_state = BQS1}};
_ -> BQS1 = BQ:tx_ack(Txn, AckTags, BQS),
{C#cr{txn = Txn},
State#q{backing_queue_state = BQS1}}
@@ -1029,7 +1015,9 @@ handle_cast({ack, Txn, AckTags, ChPid},
noreply(State1)
end;
-handle_cast({reject, AckTags, Requeue, ChPid}, State) ->
+handle_cast({reject, AckTags, Requeue, ChPid},
+ State = #q{backing_queue = BQ,
+ backing_queue_state = BQS}) ->
case lookup_ch(ChPid) of
not_found ->
noreply(State);
@@ -1038,7 +1026,8 @@ handle_cast({reject, AckTags, Requeue, ChPid}, State) ->
maybe_store_ch_record(C#cr{acktags = ChAckTags1}),
noreply(case Requeue of
true -> requeue_and_run(AckTags, State);
- false -> ack_by_acktags(AckTags, State)
+ false -> BQS1 = BQ:ack(AckTags, BQS),
+ State#q{backing_queue_state = BQS1}
end)
end;
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index 2e1834c796..c50be9c938 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -738,7 +738,8 @@ handle_cast({remove, CRef, Guids}, State) ->
State1 = lists:foldl(
fun (Guid, State2) -> remove_message(Guid, State2) end,
State, Guids),
- State2 = client_confirm(CRef, gb_sets:from_list(Guids), State1),
+ State2 = client_confirm(CRef, gb_sets:from_list(Guids),
+ false, State1),
noreply(maybe_compact(State2));
handle_cast({release, Guids}, State =
@@ -875,7 +876,8 @@ internal_sync(State = #msstate { current_file_handle = CurHdl,
true -> file_handle_cache:sync(CurHdl)
end,
lists:foreach(fun (K) -> K() end, lists:reverse(Syncs)),
- [client_confirm(CRef, Guids, State1) || {CRef, Guids} <- CGs],
+ [client_confirm(CRef, Guids, true, State1)
+ || {CRef, Guids} <- CGs],
State1 #msstate { cref_to_guids = dict:new(), on_sync = [] }.
@@ -1055,11 +1057,11 @@ orddict_store(Key, Val, Dict) ->
false = orddict:is_key(Key, Dict),
orddict:store(Key, Val, Dict).
-client_confirm(CRef, Guids,
+client_confirm(CRef, Guids, WaitForIndex,
State = #msstate { client_ondisk_callback = CODC,
cref_to_guids = CTG }) ->
case dict:find(CRef, CODC) of
- {ok, Fun} -> Fun(Guids),
+ {ok, Fun} -> Fun(Guids, WaitForIndex),
CTG1 = case dict:find(CRef, CTG) of
{ok, Gs} ->
Guids1 = gb_sets:difference(Gs, Guids),
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index eca748a951..1f320a1071 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -1881,7 +1881,7 @@ assert_props(List, PropVals) ->
with_fresh_variable_queue(Fun) ->
ok = empty_test_queue(),
VQ = rabbit_variable_queue:init(test_queue(), true, false,
- fun nop/1, fun nop/1),
+ fun nop/2, fun nop/1),
S0 = rabbit_variable_queue:status(VQ),
assert_props(S0, [{q1, 0}, {q2, 0},
{delta, {delta, undefined, 0, undefined}},
@@ -1983,7 +1983,7 @@ test_variable_queue_dynamic_duration_change(VQ0) ->
%% drain
{VQ8, AckTags} = variable_queue_fetch(Len, false, false, Len, VQ7),
- {_, VQ9} = rabbit_variable_queue:ack(AckTags, VQ8),
+ VQ9 = rabbit_variable_queue:ack(AckTags, VQ8),
{empty, VQ10} = rabbit_variable_queue:fetch(true, VQ9),
VQ10.
@@ -1993,7 +1993,7 @@ publish_fetch_and_ack(0, _Len, VQ0) ->
publish_fetch_and_ack(N, Len, VQ0) ->
VQ1 = variable_queue_publish(false, 1, VQ0),
{{_Msg, false, AckTag, Len}, VQ2} = rabbit_variable_queue:fetch(true, VQ1),
- {_, VQ3} = rabbit_variable_queue:ack([AckTag], VQ2),
+ VQ3 = rabbit_variable_queue:ack([AckTag], VQ2),
publish_fetch_and_ack(N-1, Len, VQ3).
test_variable_queue_partial_segments_delta_thing(VQ0) ->
@@ -2027,7 +2027,7 @@ test_variable_queue_partial_segments_delta_thing(VQ0) ->
{len, HalfSegment + 1}]),
{VQ8, AckTags1} = variable_queue_fetch(HalfSegment + 1, true, false,
HalfSegment + 1, VQ7),
- {_, VQ9} = rabbit_variable_queue:ack(AckTags ++ AckTags1, VQ8),
+ VQ9 = rabbit_variable_queue:ack(AckTags ++ AckTags1, VQ8),
%% should be empty now
{empty, VQ10} = rabbit_variable_queue:fetch(true, VQ9),
VQ10.
@@ -2057,7 +2057,7 @@ test_variable_queue_all_the_bits_not_covered_elsewhere1(VQ0) ->
Count, VQ4),
_VQ6 = rabbit_variable_queue:terminate(VQ5),
VQ7 = rabbit_variable_queue:init(test_queue(), true, true,
- fun nop/1, fun nop/1),
+ fun nop/2, fun nop/1),
{{_Msg1, true, _AckTag1, Count1}, VQ8} =
rabbit_variable_queue:fetch(true, VQ7),
VQ9 = variable_queue_publish(false, 1, VQ8),
@@ -2074,7 +2074,7 @@ test_variable_queue_all_the_bits_not_covered_elsewhere2(VQ0) ->
VQ5 = rabbit_variable_queue:idle_timeout(VQ4),
_VQ6 = rabbit_variable_queue:terminate(VQ5),
VQ7 = rabbit_variable_queue:init(test_queue(), true, true,
- fun nop/1, fun nop/1),
+ fun nop/2, fun nop/1),
{empty, VQ8} = rabbit_variable_queue:fetch(false, VQ7),
VQ8.
@@ -2105,7 +2105,7 @@ test_queue_recover() ->
rabbit_amqqueue:basic_get(Q1, self(), false),
exit(QPid1, shutdown),
VQ1 = rabbit_variable_queue:init(QName, true, true,
- fun nop/1, fun nop/1),
+ fun nop/2, fun nop/1),
{{_Msg1, true, _AckTag1, CountMinusOne}, VQ2} =
rabbit_variable_queue:fetch(true, VQ1),
_VQ3 = rabbit_variable_queue:delete_and_terminate(VQ2),
@@ -2167,3 +2167,4 @@ test_configurable_server_properties() ->
passed.
nop(_) -> ok.
+nop(_, _) -> ok.
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 565c61e7d0..c30be37cfd 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -412,7 +412,9 @@ stop_msg_store() ->
init(QueueName, IsDurable, Recover) ->
Self = self(),
init(QueueName, IsDurable, Recover,
- fun (Guids) -> msgs_written_to_disk(Self, Guids) end,
+ fun (Guids, WaitForIndex) ->
+ msgs_written_to_disk(Self, Guids, WaitForIndex)
+ end,
fun (Guids) -> msg_indices_written_to_disk(Self, Guids) end).
init(QueueName, IsDurable, false, MsgOnDiskFun, MsgIdxOnDiskFun) ->
@@ -519,7 +521,9 @@ publish(Msg, MsgProps, State) ->
{_SeqId, State1} = publish(Msg, MsgProps, false, false, State),
a(reduce_memory_use(State1)).
-publish_delivered(false, _Msg, _MsgProps, State = #vqstate { len = 0 }) ->
+publish_delivered(false, #basic_message { guid = Guid },
+ _MsgProps, State = #vqstate { len = 0 }) ->
+ blind_confirm(self(), gb_sets:singleton(Guid)),
{blank_ack, a(State)};
publish_delivered(true, Msg = #basic_message { is_persistent = IsPersistent,
guid = Guid },
@@ -654,15 +658,9 @@ internal_fetch(AckRequired, MsgStatus = #msg_status {
persistent_count = PCount1 })}.
ack(AckTags, State) ->
- {Guids, State1} =
- ack(fun msg_store_remove/3,
- fun ({_IsPersistent, Guid, _MsgProps}, State1) ->
- remove_confirms(gb_sets:singleton(Guid), State1);
- (#msg_status{msg = #basic_message { guid = Guid }}, State1) ->
- remove_confirms(gb_sets:singleton(Guid), State1)
- end,
- AckTags, State),
- {Guids, a(State1)}.
+ a(ack(fun msg_store_remove/3,
+ fun (_, State0) -> State0 end,
+ AckTags, State)).
tx_publish(Txn, Msg = #basic_message { is_persistent = IsPersistent }, MsgProps,
State = #vqstate { durable = IsDurable,
@@ -712,23 +710,22 @@ tx_commit(Txn, Fun, MsgPropsFun,
end)}.
requeue(AckTags, MsgPropsFun, State) ->
- {_Guids, State1} =
- ack(fun msg_store_release/3,
- fun (#msg_status { msg = Msg, msg_props = MsgProps }, State1) ->
- {_SeqId, State2} = publish(Msg, MsgPropsFun(MsgProps),
- true, false, State1),
- State2;
- ({IsPersistent, Guid, MsgProps}, State1) ->
- #vqstate { msg_store_clients = MSCState } = State1,
- {{ok, Msg = #basic_message{}}, MSCState1} =
- msg_store_read(MSCState, IsPersistent, Guid),
- State2 = State1 #vqstate { msg_store_clients = MSCState1 },
- {_SeqId, State3} = publish(Msg, MsgPropsFun(MsgProps),
- true, true, State2),
- State3
- end,
- AckTags, State),
- a(reduce_memory_use(State1)).
+ a(reduce_memory_use(
+ ack(fun msg_store_release/3,
+ fun (#msg_status { msg = Msg, msg_props = MsgProps }, State1) ->
+ {_SeqId, State2} = publish(Msg, MsgPropsFun(MsgProps),
+ true, false, State1),
+ State2;
+ ({IsPersistent, Guid, MsgProps}, State1) ->
+ #vqstate { msg_store_clients = MSCState } = State1,
+ {{ok, Msg = #basic_message{}}, MSCState1} =
+ msg_store_read(MSCState, IsPersistent, Guid),
+ State2 = State1 #vqstate { msg_store_clients = MSCState1 },
+ {_SeqId, State3} = publish(Msg, MsgPropsFun(MsgProps),
+ true, true, State2),
+ State3
+ end,
+ AckTags, State))).
len(#vqstate { len = Len }) -> Len.
@@ -1160,7 +1157,6 @@ tx_commit_index(State = #vqstate { on_sync = #sync {
durable = IsDurable }) ->
PAcks = lists:append(SPAcks),
Acks = lists:append(SAcks),
- {_Guids, NewState} = ack(Acks, State),
Pubs = [{Msg, Fun(MsgProps)} || {Fun, PubsN} <- lists:reverse(SPubs),
{Msg, MsgProps} <- lists:reverse(PubsN)],
{SeqIds, State1 = #vqstate { index_state = IndexState }} =
@@ -1172,7 +1168,7 @@ tx_commit_index(State = #vqstate { on_sync = #sync {
{SeqId, State3} =
publish(Msg, MsgProps, false, IsPersistent1, State2),
{cons_if(IsPersistent1, SeqId, SeqIdsAcc), State3}
- end, {PAcks, NewState}, Pubs),
+ end, {PAcks, ack(Acks, State)}, Pubs),
IndexState1 = rabbit_queue_index:sync(SeqIds, IndexState),
[ Fun() || Fun <- lists:reverse(SFuns) ],
reduce_memory_use(
@@ -1323,7 +1319,7 @@ remove_pending_ack(KeepPersistent,
State = #vqstate { pending_ack = PA,
index_state = IndexState,
msg_store_clients = MSCState }) ->
- {PersistentSeqIds, GuidsByStore, _AllGuids} =
+ {PersistentSeqIds, GuidsByStore} =
dict:fold(fun accumulate_ack/3, accumulate_ack_init(), PA),
State1 = State #vqstate { pending_ack = dict:new(),
ram_ack_index = gb_trees:empty() },
@@ -1342,9 +1338,9 @@ remove_pending_ack(KeepPersistent,
end.
ack(_MsgStoreFun, _Fun, [], State) ->
- {[], State};
+ State;
ack(MsgStoreFun, Fun, AckTags, State) ->
- {{PersistentSeqIds, GuidsByStore, AllGuids},
+ {{PersistentSeqIds, GuidsByStore},
State1 = #vqstate { index_state = IndexState,
msg_store_clients = MSCState,
persistent_count = PCount,
@@ -1364,24 +1360,21 @@ ack(MsgStoreFun, Fun, AckTags, State) ->
|| {IsPersistent, Guids} <- orddict:to_list(GuidsByStore)],
PCount1 = PCount - find_persistent_count(sum_guids_by_store_to_len(
orddict:new(), GuidsByStore)),
- {lists:reverse(AllGuids),
- State1 #vqstate { index_state = IndexState1,
- persistent_count = PCount1,
- ack_out_counter = AckOutCount + length(AckTags) }}.
+ State1 #vqstate { index_state = IndexState1,
+ persistent_count = PCount1,
+ ack_out_counter = AckOutCount + length(AckTags) }.
-accumulate_ack_init() -> {[], orddict:new(), []}.
+accumulate_ack_init() -> {[], orddict:new()}.
accumulate_ack(_SeqId, #msg_status { is_persistent = false, %% ASSERTIONS
msg_on_disk = false,
- index_on_disk = false,
- guid = Guid },
- {PersistentSeqIdsAcc, GuidsByStore, AllGuids}) ->
- {PersistentSeqIdsAcc, GuidsByStore, [Guid | AllGuids]};
+ index_on_disk = false },
+ {PersistentSeqIdsAcc, GuidsByStore}) ->
+ {PersistentSeqIdsAcc, GuidsByStore};
accumulate_ack(SeqId, {IsPersistent, Guid, _MsgProps},
- {PersistentSeqIdsAcc, GuidsByStore, AllGuids}) ->
+ {PersistentSeqIdsAcc, GuidsByStore}) ->
{cons_if(IsPersistent, SeqId, PersistentSeqIdsAcc),
- rabbit_misc:orddict_cons(IsPersistent, Guid, GuidsByStore),
- [Guid | AllGuids]}.
+ rabbit_misc:orddict_cons(IsPersistent, Guid, GuidsByStore)}.
find_persistent_count(LensByStore) ->
case orddict:find(true, LensByStore) of
@@ -1403,7 +1396,15 @@ remove_confirms(GuidSet, State = #vqstate { msgs_on_disk = MOD,
msgs_confirmed(GuidSet, State) ->
{gb_sets:to_list(GuidSet), remove_confirms(GuidSet, State)}.
-msgs_written_to_disk(QPid, GuidSet) ->
+blind_confirm(QPid, GuidSet) ->
+ rabbit_amqqueue:maybe_run_queue_via_backing_queue_async(
+ QPid, fun (State) ->
+ msgs_confirmed(GuidSet, State)
+ end).
+
+msgs_written_to_disk(QPid, GuidSet, false) ->
+ blind_confirm(QPid, GuidSet);
+msgs_written_to_disk(QPid, GuidSet, true) ->
rabbit_amqqueue:maybe_run_queue_via_backing_queue_async(
QPid, fun (State = #vqstate { msgs_on_disk = MOD,
msg_indices_on_disk = MIOD,