diff options
| author | Alexandru Scvortov <alexandru@rabbitmq.com> | 2010-09-03 10:36:22 +0100 |
|---|---|---|
| committer | Alexandru Scvortov <alexandru@rabbitmq.com> | 2010-09-03 10:36:22 +0100 |
| commit | f8316aaa103723def72be79b8042b2196cf769e5 (patch) | |
| tree | 60f4b41e236e12557bf19480de0555574d7760fe | |
| parent | 6a258c35498d38b3fcf434129fe6d75adb41cdb1 (diff) | |
| download | rabbitmq-server-git-f8316aaa103723def72be79b8042b2196cf769e5.tar.gz | |
when a consumer ack is received, the publisher ack is sent by the amqqueue_process, rather than the backing_queue
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 34 | ||||
| -rw-r--r-- | src/rabbit_tests.erl | 12 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 42 |
3 files changed, 50 insertions, 38 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 3df59de6b1..6eeb397c68 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -402,6 +402,11 @@ deliver_from_queue_deliver(AckRequired, false, {{Message, IsDelivered, AckTag}, 0 == Remaining, State #q { backing_queue_state = BQS1 }}. +confirm_messages_internal(Guids, State) -> + lists:foldl(fun(Guid, State0) -> + confirm_message_internal(Guid, State0) + end, State, Guids). + confirm_message_internal(Guid, State = #q { guid_to_channel = GTC }) -> case dict:find(Guid, GTC) of {ok, {_ , undefined}} -> ok; @@ -547,10 +552,8 @@ qname(#q{q = #amqqueue{name = QName}}) -> QName. maybe_run_queue_via_backing_queue(Fun, State = #q{backing_queue_state = BQS}) -> case Fun(BQS) of {BQS1, {confirm, Guids}} -> - State1 = lists:foldl(fun (Guid, State0) -> - confirm_message_internal(Guid, State0) end, - State, Guids), - State1 #q { backing_queue_state = BQS1}; + confirm_messages_internal(Guids, + State #q { backing_queue_state = BQS1 }); BQS1 -> run_message_queue(State#q{backing_queue_state = BQS1}) end. @@ -841,14 +844,22 @@ handle_cast({ack, Txn, AckTags, ChPid}, not_found -> noreply(State); C = #cr{acktags = ChAckTags} -> - {C1, BQS1} = + {C1, State1} = case Txn of - none -> ChAckTags1 = subtract_acks(ChAckTags, AckTags), - {C#cr{acktags = ChAckTags1}, BQ:ack(AckTags, BQS)}; - _ -> {C#cr{txn = Txn}, BQ:tx_ack(Txn, AckTags, BQS)} + none -> + ChAckTags1 = subtract_acks(ChAckTags, AckTags), + NewC = C#cr{acktags = ChAckTags1}, + {NewBQS, AckdGuids} = BQ:ack(AckTags, BQS), + NewState = + confirm_messages_internal(AckdGuids, + State #q { backing_queue_state = NewBQS }), + {NewC, NewState}; + _ -> + {C#cr{txn = Txn}, + State #q { backing_queue_state = BQ:tx_ack(Txn, AckTags, BQS) }} end, store_ch_record(C1), - noreply(State#q{backing_queue_state = BQS1}) + noreply(State1) end; handle_cast({reject, AckTags, Requeue, ChPid}, @@ -861,8 +872,9 @@ handle_cast({reject, AckTags, Requeue, ChPid}, store_ch_record(C#cr{acktags = ChAckTags1}), noreply(case Requeue of true -> requeue_and_run(AckTags, State); - false -> BQS1 = BQ:ack(AckTags, BQS), - State #q { backing_queue_state = BQS1 } + false -> {BQS1, AckdGuids} = BQ:ack(AckTags, BQS), + confirm_messages_internal(AckdGuids, + State #q { backing_queue_state = BQS1 }) end) end; diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index fcc3d92c7b..c8af7c1000 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -1801,8 +1801,7 @@ assert_props(List, PropVals) -> with_fresh_variable_queue(Fun) -> ok = empty_test_queue(), - VQ = rabbit_variable_queue:init(test_queue(), true, false, - {fun nop/1, fun nop/1, fun nop/1}), + VQ = rabbit_variable_queue:init(test_queue(), true, false), S0 = rabbit_variable_queue:status(VQ), assert_props(S0, [{q1, 0}, {q2, 0}, {delta, {delta, undefined, 0, undefined}}, @@ -1914,8 +1913,7 @@ test_variable_queue_all_the_bits_not_covered_elsewhere1(VQ0) -> {VQ5, _AckTags1} = variable_queue_fetch(Count, false, false, Count, VQ4), _VQ6 = rabbit_variable_queue:terminate(VQ5), - VQ7 = rabbit_variable_queue:init(test_queue(), true, true, - {fun nop/1, fun nop/1, fun nop/1}), + VQ7 = rabbit_variable_queue:init(test_queue(), true, true), {{_Msg1, true, _AckTag1, Count1}, VQ8} = rabbit_variable_queue:fetch(true, VQ7), VQ9 = variable_queue_publish(false, 1, VQ8), @@ -1931,8 +1929,7 @@ test_variable_queue_all_the_bits_not_covered_elsewhere2(VQ0) -> VQ4 = rabbit_variable_queue:requeue(AckTags, VQ3), VQ5 = rabbit_variable_queue:idle_timeout(VQ4), _VQ6 = rabbit_variable_queue:terminate(VQ5), - VQ7 = rabbit_variable_queue:init(test_queue(), true, true, - {fun nop/1, fun nop/1, fun nop/1}), + VQ7 = rabbit_variable_queue:init(test_queue(), true, true), {empty, VQ8} = rabbit_variable_queue:fetch(false, VQ7), VQ8. @@ -1962,8 +1959,7 @@ test_queue_recover() -> {ok, CountMinusOne, {QName, QPid1, _AckTag, true, _Msg}} = rabbit_amqqueue:basic_get(Q1, self(), false), exit(QPid1, shutdown), - VQ1 = rabbit_variable_queue:init(QName, true, true, - {fun nop/1, fun nop/1, fun nop/1}), + VQ1 = rabbit_variable_queue:init(QName, true, true), {{_Msg1, true, _AckTag1, CountMinusOne}, VQ2} = rabbit_variable_queue:fetch(true, VQ1), _VQ3 = rabbit_variable_queue:delete_and_terminate(VQ2), diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 448e8c93af..c33859ee59 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -758,6 +758,10 @@ status(#vqstate { q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4, %% Minor helpers %%---------------------------------------------------------------------------- +a({State, _} = I) -> + a(State), + I; + a(State = #vqstate { q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4, len = Len, persistent_count = PersistentCount, @@ -966,7 +970,7 @@ tx_commit_index(State = #vqstate { on_sync = #sync { IsPersistent1 = IsDurable andalso IsPersistent, {SeqId, State3} = publish(Msg, false, IsPersistent1, State2), {cons_if(IsPersistent1, SeqId, SeqIdsAcc), State3} - end, {PAcks, ack(Acks, State)}, Pubs), + end, {PAcks, element(1, ack(Acks, State))}, Pubs), IndexState1 = rabbit_queue_index:sync(SeqIds, IndexState), [ Fun() || Fun <- lists:reverse(SFuns) ], reduce_memory_use( @@ -1117,7 +1121,7 @@ remove_pending_ack(KeepPersistent, end. ack(_MsgStoreFun, _Fun, [], State) -> - State; + {State, []}; ack(MsgStoreFun, Fun, AckTags, State) -> {{SeqIds, GuidsByStore}, State1 = #vqstate { index_state = IndexState, persistent_count = PCount }} = @@ -1130,15 +1134,19 @@ ack(MsgStoreFun, Fun, AckTags, State) -> end, {{[], orddict:new()}, State}, AckTags), IndexState1 = rabbit_queue_index:ack(SeqIds, IndexState), ok = orddict:fold(fun (MsgStore, Guids, ok) -> - confirm_messages(Guids), MsgStoreFun(MsgStore, Guids) end, ok, GuidsByStore), + AckdGuids = lists:append(orddict:fold(fun (_, Guids, Acc) -> + [Guids || Acc] + end, [], GuidsByStore)), + State2 = msgs_confirmed(AckdGuids, State1), PCount1 = PCount - case orddict:find(?PERSISTENT_MSG_STORE, GuidsByStore) of error -> 0; {ok, Guids} -> length(Guids) end, - State1 #vqstate { index_state = IndexState1, - persistent_count = PCount1 }. + {State2 #vqstate { index_state = IndexState1, + persistent_count = PCount1 }, + AckdGuids}. accumulate_ack(_SeqId, #msg_status { is_persistent = false, %% ASSERTIONS msg_on_disk = false, @@ -1153,20 +1161,13 @@ accumulate_ack(SeqId, {IsPersistent, Guid}, {SeqIdsAcc, Dict}) -> %% Internal plumbing for confirms (aka publisher acks) %%---------------------------------------------------------------------------- -confirm_messages(Guids) -> - Self = self(), - spawn(fun() -> rabbit_amqqueue:maybe_run_queue_via_backing_queue( - Self, - fun (State = #vqstate { msgs_on_disk = MOD, - msg_indices_on_disk = MIOD }) -> - { State #vqstate { - msgs_on_disk = - gb_sets:difference(MOD, gb_sets:from_list(Guids)), - msg_indices_on_disk = - gb_sets:delete_any(MIOD, gb_sets:from_list(Guids)) }, - {confirm, Guids} } - end) - end). +msgs_confirmed(Guids, State = #vqstate { msgs_on_disk = MOD, + msg_indices_on_disk = MIOD }) -> + State #vqstate { + msgs_on_disk = + gb_sets:difference(MOD, gb_sets:from_list(Guids)), + msg_indices_on_disk = + gb_sets:delete_any(MIOD, gb_sets:from_list(Guids)) }. msgs_written_to_disk(QPid, Guids) -> spawn(fun() -> rabbit_amqqueue:maybe_run_queue_via_backing_queue( @@ -1242,6 +1243,9 @@ reduce_memory_use(AlphaBetaFun, BetaGammaFun, BetaDeltaFun, State) -> end end. +reduce_memory_use({State, Other}) -> + {reduce_memory_use(State), Other}; + reduce_memory_use(State) -> {_, State1} = reduce_memory_use(fun push_alphas_to_betas/2, fun limit_ram_index/2, |
