diff options
| author | Alexandru Scvortov <alexandru@rabbitmq.com> | 2010-12-29 08:34:48 +0000 |
|---|---|---|
| committer | Alexandru Scvortov <alexandru@rabbitmq.com> | 2010-12-29 08:34:48 +0000 |
| commit | c42ba3a379c099410ad0f6079457b7520d38dc3b (patch) | |
| tree | cae0262421630df9be266b5b0d0371e9ce89a201 | |
| parent | 7511ecd0b5dd9ecbc350daed964c19f7b7663744 (diff) | |
| parent | 6e869f0f778997a5be571423d02150e4fcdb94dd (diff) | |
| download | rabbitmq-server-git-c42ba3a379c099410ad0f6079457b7520d38dc3b.tar.gz | |
merge from bug23612
bug23612 removes most of the confirms from amqqueue_process.
| -rw-r--r-- | include/rabbit_backing_queue_spec.hrl | 2 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 41 | ||||
| -rw-r--r-- | src/rabbit_msg_store.erl | 10 | ||||
| -rw-r--r-- | src/rabbit_tests.erl | 15 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 93 |
5 files changed, 77 insertions, 84 deletions
diff --git a/include/rabbit_backing_queue_spec.hrl b/include/rabbit_backing_queue_spec.hrl index f67c6f46d1..6fa34ccc89 100644 --- a/include/rabbit_backing_queue_spec.hrl +++ b/include/rabbit_backing_queue_spec.hrl @@ -58,7 +58,7 @@ (fun ((rabbit_types:message_properties()) -> boolean()), state()) -> state()). -spec(fetch/2 :: (ack_required(), state()) -> {fetch_result(), state()}). --spec(ack/2 :: ([ack()], state()) -> {[rabbit_guid:guid()], state()}). +-spec(ack/2 :: ([ack()], state()) -> state()). -spec(tx_publish/4 :: (rabbit_types:txn(), rabbit_types:basic_message(), rabbit_types:message_properties(), state()) -> state()). -spec(tx_ack/3 :: (rabbit_types:txn(), [ack()], state()) -> state()). diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 981dd31daa..e6e3989fc4 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -374,12 +374,10 @@ deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun}, FunAcc, rabbit_channel:deliver( ChPid, ConsumerTag, AckRequired, {QName, self(), AckTag, IsDelivered, Message}), - {State2, ChAckTags1} = + ChAckTags1 = case AckRequired of - true -> {State1, - sets:add_element(AckTag, ChAckTags)}; - false -> {confirm_message(Message, State1), - ChAckTags} + true -> sets:add_element(AckTag, ChAckTags); + false -> ChAckTags end, NewC = C#cr{unsent_message_count = Count + 1, acktags = ChAckTags1}, @@ -396,10 +394,10 @@ deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun}, FunAcc, {ActiveConsumers1, queue:in(QEntry, BlockedConsumers1)} end, - State3 = State2#q{ + State2 = State1#q{ active_consumers = NewActiveConsumers, blocked_consumers = NewBlockedConsumers}, - deliver_msgs_to_consumers(Funs, FunAcc1, State3); + deliver_msgs_to_consumers(Funs, FunAcc1, State2); %% if IsMsgReady then we've hit the limiter false when IsMsgReady -> true = maybe_store_ch_record(C#cr{is_limit_active = true}), @@ -432,15 +430,11 @@ confirm_messages(Guids, State) -> confirm_message_by_guid(Guid, State = #q{guid_to_channel = GTC}) -> case dict:find(Guid, GTC) of - {ok, {_ , undefined}} -> ok; {ok, {ChPid, MsgSeqNo}} -> rabbit_channel:confirm(ChPid, MsgSeqNo); _ -> ok end, State#q{guid_to_channel = dict:erase(Guid, GTC)}. -confirm_message(#basic_message{guid = Guid}, State) -> - confirm_message_by_guid(Guid, State). - record_confirm_message(#delivery{msg_seq_no = undefined}, State) -> State; record_confirm_message(#delivery{sender = ChPid, @@ -455,11 +449,6 @@ record_confirm_message(#delivery{sender = ChPid, record_confirm_message(_Delivery, State) -> State. -ack_by_acktags(AckTags, State = #q{backing_queue = BQ, - backing_queue_state = BQS}) -> - {AckdGuids, BQS1} = BQ:ack(AckTags, BQS), - confirm_messages(AckdGuids, State#q{backing_queue_state = BQS1}). - run_message_queue(State) -> Funs = {fun deliver_from_queue_pred/2, fun deliver_from_queue_deliver/3}, @@ -823,7 +812,7 @@ handle_call({info, Items}, _From, State) -> handle_call(consumers, _From, State) -> reply(consumers(State), State); -handle_call({deliver_immediately, Delivery = #delivery{message = Message}}, +handle_call({deliver_immediately, Delivery}, _From, State) -> %% Synchronous, "immediate" delivery mode %% @@ -840,10 +829,7 @@ handle_call({deliver_immediately, Delivery = #delivery{message = Message}}, %% {Delivered, State1} = attempt_delivery(Delivery, record_confirm_message(Delivery, State)), - reply(Delivered, case Delivered of - true -> State1; - false -> confirm_message(Message, State1) - end); + reply(Delivered, State1); handle_call({deliver, Delivery}, _From, State) -> %% Synchronous, "mandatory" delivery mode @@ -881,7 +867,7 @@ handle_call({basic_get, ChPid, NoAck}, _From, sets:add_element(AckTag, ChAckTags)}), State2; - false -> confirm_message(Message, State2) + false -> State2 end, Msg = {QName, self(), AckTag, IsDelivered, Message}, reply({ok, Remaining, Msg}, State3) @@ -1019,8 +1005,8 @@ handle_cast({ack, Txn, AckTags, ChPid}, case Txn of none -> ChAckTags1 = subtract_acks(ChAckTags, AckTags), NewC = C#cr{acktags = ChAckTags1}, - NewState = ack_by_acktags(AckTags, State), - {NewC, NewState}; + BQS1 = BQ:ack(AckTags, BQS), + {NewC, State#q{backing_queue_state = BQS1}}; _ -> BQS1 = BQ:tx_ack(Txn, AckTags, BQS), {C#cr{txn = Txn}, State#q{backing_queue_state = BQS1}} @@ -1029,7 +1015,9 @@ handle_cast({ack, Txn, AckTags, ChPid}, noreply(State1) end; -handle_cast({reject, AckTags, Requeue, ChPid}, State) -> +handle_cast({reject, AckTags, Requeue, ChPid}, + State = #q{backing_queue = BQ, + backing_queue_state = BQS}) -> case lookup_ch(ChPid) of not_found -> noreply(State); @@ -1038,7 +1026,8 @@ handle_cast({reject, AckTags, Requeue, ChPid}, State) -> maybe_store_ch_record(C#cr{acktags = ChAckTags1}), noreply(case Requeue of true -> requeue_and_run(AckTags, State); - false -> ack_by_acktags(AckTags, State) + false -> BQS1 = BQ:ack(AckTags, BQS), + State#q{backing_queue_state = BQS1} end) end; diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index 2e1834c796..c50be9c938 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -738,7 +738,8 @@ handle_cast({remove, CRef, Guids}, State) -> State1 = lists:foldl( fun (Guid, State2) -> remove_message(Guid, State2) end, State, Guids), - State2 = client_confirm(CRef, gb_sets:from_list(Guids), State1), + State2 = client_confirm(CRef, gb_sets:from_list(Guids), + false, State1), noreply(maybe_compact(State2)); handle_cast({release, Guids}, State = @@ -875,7 +876,8 @@ internal_sync(State = #msstate { current_file_handle = CurHdl, true -> file_handle_cache:sync(CurHdl) end, lists:foreach(fun (K) -> K() end, lists:reverse(Syncs)), - [client_confirm(CRef, Guids, State1) || {CRef, Guids} <- CGs], + [client_confirm(CRef, Guids, true, State1) + || {CRef, Guids} <- CGs], State1 #msstate { cref_to_guids = dict:new(), on_sync = [] }. @@ -1055,11 +1057,11 @@ orddict_store(Key, Val, Dict) -> false = orddict:is_key(Key, Dict), orddict:store(Key, Val, Dict). -client_confirm(CRef, Guids, +client_confirm(CRef, Guids, WaitForIndex, State = #msstate { client_ondisk_callback = CODC, cref_to_guids = CTG }) -> case dict:find(CRef, CODC) of - {ok, Fun} -> Fun(Guids), + {ok, Fun} -> Fun(Guids, WaitForIndex), CTG1 = case dict:find(CRef, CTG) of {ok, Gs} -> Guids1 = gb_sets:difference(Gs, Guids), diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index eca748a951..1f320a1071 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -1881,7 +1881,7 @@ assert_props(List, PropVals) -> with_fresh_variable_queue(Fun) -> ok = empty_test_queue(), VQ = rabbit_variable_queue:init(test_queue(), true, false, - fun nop/1, fun nop/1), + fun nop/2, fun nop/1), S0 = rabbit_variable_queue:status(VQ), assert_props(S0, [{q1, 0}, {q2, 0}, {delta, {delta, undefined, 0, undefined}}, @@ -1983,7 +1983,7 @@ test_variable_queue_dynamic_duration_change(VQ0) -> %% drain {VQ8, AckTags} = variable_queue_fetch(Len, false, false, Len, VQ7), - {_, VQ9} = rabbit_variable_queue:ack(AckTags, VQ8), + VQ9 = rabbit_variable_queue:ack(AckTags, VQ8), {empty, VQ10} = rabbit_variable_queue:fetch(true, VQ9), VQ10. @@ -1993,7 +1993,7 @@ publish_fetch_and_ack(0, _Len, VQ0) -> publish_fetch_and_ack(N, Len, VQ0) -> VQ1 = variable_queue_publish(false, 1, VQ0), {{_Msg, false, AckTag, Len}, VQ2} = rabbit_variable_queue:fetch(true, VQ1), - {_, VQ3} = rabbit_variable_queue:ack([AckTag], VQ2), + VQ3 = rabbit_variable_queue:ack([AckTag], VQ2), publish_fetch_and_ack(N-1, Len, VQ3). test_variable_queue_partial_segments_delta_thing(VQ0) -> @@ -2027,7 +2027,7 @@ test_variable_queue_partial_segments_delta_thing(VQ0) -> {len, HalfSegment + 1}]), {VQ8, AckTags1} = variable_queue_fetch(HalfSegment + 1, true, false, HalfSegment + 1, VQ7), - {_, VQ9} = rabbit_variable_queue:ack(AckTags ++ AckTags1, VQ8), + VQ9 = rabbit_variable_queue:ack(AckTags ++ AckTags1, VQ8), %% should be empty now {empty, VQ10} = rabbit_variable_queue:fetch(true, VQ9), VQ10. @@ -2057,7 +2057,7 @@ test_variable_queue_all_the_bits_not_covered_elsewhere1(VQ0) -> Count, VQ4), _VQ6 = rabbit_variable_queue:terminate(VQ5), VQ7 = rabbit_variable_queue:init(test_queue(), true, true, - fun nop/1, fun nop/1), + fun nop/2, fun nop/1), {{_Msg1, true, _AckTag1, Count1}, VQ8} = rabbit_variable_queue:fetch(true, VQ7), VQ9 = variable_queue_publish(false, 1, VQ8), @@ -2074,7 +2074,7 @@ test_variable_queue_all_the_bits_not_covered_elsewhere2(VQ0) -> VQ5 = rabbit_variable_queue:idle_timeout(VQ4), _VQ6 = rabbit_variable_queue:terminate(VQ5), VQ7 = rabbit_variable_queue:init(test_queue(), true, true, - fun nop/1, fun nop/1), + fun nop/2, fun nop/1), {empty, VQ8} = rabbit_variable_queue:fetch(false, VQ7), VQ8. @@ -2105,7 +2105,7 @@ test_queue_recover() -> rabbit_amqqueue:basic_get(Q1, self(), false), exit(QPid1, shutdown), VQ1 = rabbit_variable_queue:init(QName, true, true, - fun nop/1, fun nop/1), + fun nop/2, fun nop/1), {{_Msg1, true, _AckTag1, CountMinusOne}, VQ2} = rabbit_variable_queue:fetch(true, VQ1), _VQ3 = rabbit_variable_queue:delete_and_terminate(VQ2), @@ -2167,3 +2167,4 @@ test_configurable_server_properties() -> passed. nop(_) -> ok. +nop(_, _) -> ok. diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 565c61e7d0..c30be37cfd 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -412,7 +412,9 @@ stop_msg_store() -> init(QueueName, IsDurable, Recover) -> Self = self(), init(QueueName, IsDurable, Recover, - fun (Guids) -> msgs_written_to_disk(Self, Guids) end, + fun (Guids, WaitForIndex) -> + msgs_written_to_disk(Self, Guids, WaitForIndex) + end, fun (Guids) -> msg_indices_written_to_disk(Self, Guids) end). init(QueueName, IsDurable, false, MsgOnDiskFun, MsgIdxOnDiskFun) -> @@ -519,7 +521,9 @@ publish(Msg, MsgProps, State) -> {_SeqId, State1} = publish(Msg, MsgProps, false, false, State), a(reduce_memory_use(State1)). -publish_delivered(false, _Msg, _MsgProps, State = #vqstate { len = 0 }) -> +publish_delivered(false, #basic_message { guid = Guid }, + _MsgProps, State = #vqstate { len = 0 }) -> + blind_confirm(self(), gb_sets:singleton(Guid)), {blank_ack, a(State)}; publish_delivered(true, Msg = #basic_message { is_persistent = IsPersistent, guid = Guid }, @@ -654,15 +658,9 @@ internal_fetch(AckRequired, MsgStatus = #msg_status { persistent_count = PCount1 })}. ack(AckTags, State) -> - {Guids, State1} = - ack(fun msg_store_remove/3, - fun ({_IsPersistent, Guid, _MsgProps}, State1) -> - remove_confirms(gb_sets:singleton(Guid), State1); - (#msg_status{msg = #basic_message { guid = Guid }}, State1) -> - remove_confirms(gb_sets:singleton(Guid), State1) - end, - AckTags, State), - {Guids, a(State1)}. + a(ack(fun msg_store_remove/3, + fun (_, State0) -> State0 end, + AckTags, State)). tx_publish(Txn, Msg = #basic_message { is_persistent = IsPersistent }, MsgProps, State = #vqstate { durable = IsDurable, @@ -712,23 +710,22 @@ tx_commit(Txn, Fun, MsgPropsFun, end)}. requeue(AckTags, MsgPropsFun, State) -> - {_Guids, State1} = - ack(fun msg_store_release/3, - fun (#msg_status { msg = Msg, msg_props = MsgProps }, State1) -> - {_SeqId, State2} = publish(Msg, MsgPropsFun(MsgProps), - true, false, State1), - State2; - ({IsPersistent, Guid, MsgProps}, State1) -> - #vqstate { msg_store_clients = MSCState } = State1, - {{ok, Msg = #basic_message{}}, MSCState1} = - msg_store_read(MSCState, IsPersistent, Guid), - State2 = State1 #vqstate { msg_store_clients = MSCState1 }, - {_SeqId, State3} = publish(Msg, MsgPropsFun(MsgProps), - true, true, State2), - State3 - end, - AckTags, State), - a(reduce_memory_use(State1)). + a(reduce_memory_use( + ack(fun msg_store_release/3, + fun (#msg_status { msg = Msg, msg_props = MsgProps }, State1) -> + {_SeqId, State2} = publish(Msg, MsgPropsFun(MsgProps), + true, false, State1), + State2; + ({IsPersistent, Guid, MsgProps}, State1) -> + #vqstate { msg_store_clients = MSCState } = State1, + {{ok, Msg = #basic_message{}}, MSCState1} = + msg_store_read(MSCState, IsPersistent, Guid), + State2 = State1 #vqstate { msg_store_clients = MSCState1 }, + {_SeqId, State3} = publish(Msg, MsgPropsFun(MsgProps), + true, true, State2), + State3 + end, + AckTags, State))). len(#vqstate { len = Len }) -> Len. @@ -1160,7 +1157,6 @@ tx_commit_index(State = #vqstate { on_sync = #sync { durable = IsDurable }) -> PAcks = lists:append(SPAcks), Acks = lists:append(SAcks), - {_Guids, NewState} = ack(Acks, State), Pubs = [{Msg, Fun(MsgProps)} || {Fun, PubsN} <- lists:reverse(SPubs), {Msg, MsgProps} <- lists:reverse(PubsN)], {SeqIds, State1 = #vqstate { index_state = IndexState }} = @@ -1172,7 +1168,7 @@ tx_commit_index(State = #vqstate { on_sync = #sync { {SeqId, State3} = publish(Msg, MsgProps, false, IsPersistent1, State2), {cons_if(IsPersistent1, SeqId, SeqIdsAcc), State3} - end, {PAcks, NewState}, Pubs), + end, {PAcks, ack(Acks, State)}, Pubs), IndexState1 = rabbit_queue_index:sync(SeqIds, IndexState), [ Fun() || Fun <- lists:reverse(SFuns) ], reduce_memory_use( @@ -1323,7 +1319,7 @@ remove_pending_ack(KeepPersistent, State = #vqstate { pending_ack = PA, index_state = IndexState, msg_store_clients = MSCState }) -> - {PersistentSeqIds, GuidsByStore, _AllGuids} = + {PersistentSeqIds, GuidsByStore} = dict:fold(fun accumulate_ack/3, accumulate_ack_init(), PA), State1 = State #vqstate { pending_ack = dict:new(), ram_ack_index = gb_trees:empty() }, @@ -1342,9 +1338,9 @@ remove_pending_ack(KeepPersistent, end. ack(_MsgStoreFun, _Fun, [], State) -> - {[], State}; + State; ack(MsgStoreFun, Fun, AckTags, State) -> - {{PersistentSeqIds, GuidsByStore, AllGuids}, + {{PersistentSeqIds, GuidsByStore}, State1 = #vqstate { index_state = IndexState, msg_store_clients = MSCState, persistent_count = PCount, @@ -1364,24 +1360,21 @@ ack(MsgStoreFun, Fun, AckTags, State) -> || {IsPersistent, Guids} <- orddict:to_list(GuidsByStore)], PCount1 = PCount - find_persistent_count(sum_guids_by_store_to_len( orddict:new(), GuidsByStore)), - {lists:reverse(AllGuids), - State1 #vqstate { index_state = IndexState1, - persistent_count = PCount1, - ack_out_counter = AckOutCount + length(AckTags) }}. + State1 #vqstate { index_state = IndexState1, + persistent_count = PCount1, + ack_out_counter = AckOutCount + length(AckTags) }. -accumulate_ack_init() -> {[], orddict:new(), []}. +accumulate_ack_init() -> {[], orddict:new()}. accumulate_ack(_SeqId, #msg_status { is_persistent = false, %% ASSERTIONS msg_on_disk = false, - index_on_disk = false, - guid = Guid }, - {PersistentSeqIdsAcc, GuidsByStore, AllGuids}) -> - {PersistentSeqIdsAcc, GuidsByStore, [Guid | AllGuids]}; + index_on_disk = false }, + {PersistentSeqIdsAcc, GuidsByStore}) -> + {PersistentSeqIdsAcc, GuidsByStore}; accumulate_ack(SeqId, {IsPersistent, Guid, _MsgProps}, - {PersistentSeqIdsAcc, GuidsByStore, AllGuids}) -> + {PersistentSeqIdsAcc, GuidsByStore}) -> {cons_if(IsPersistent, SeqId, PersistentSeqIdsAcc), - rabbit_misc:orddict_cons(IsPersistent, Guid, GuidsByStore), - [Guid | AllGuids]}. + rabbit_misc:orddict_cons(IsPersistent, Guid, GuidsByStore)}. find_persistent_count(LensByStore) -> case orddict:find(true, LensByStore) of @@ -1403,7 +1396,15 @@ remove_confirms(GuidSet, State = #vqstate { msgs_on_disk = MOD, msgs_confirmed(GuidSet, State) -> {gb_sets:to_list(GuidSet), remove_confirms(GuidSet, State)}. -msgs_written_to_disk(QPid, GuidSet) -> +blind_confirm(QPid, GuidSet) -> + rabbit_amqqueue:maybe_run_queue_via_backing_queue_async( + QPid, fun (State) -> + msgs_confirmed(GuidSet, State) + end). + +msgs_written_to_disk(QPid, GuidSet, false) -> + blind_confirm(QPid, GuidSet); +msgs_written_to_disk(QPid, GuidSet, true) -> rabbit_amqqueue:maybe_run_queue_via_backing_queue_async( QPid, fun (State = #vqstate { msgs_on_disk = MOD, msg_indices_on_disk = MIOD, |
