diff options
| author | Alexandru Scvortov <alexandru@rabbitmq.com> | 2010-10-04 15:33:26 +0100 |
|---|---|---|
| committer | Alexandru Scvortov <alexandru@rabbitmq.com> | 2010-10-04 15:33:26 +0100 |
| commit | 4f7984cc8c35fbe512883ff2bc1784690607d679 (patch) | |
| tree | 4ec1e510a37a27b5059eb257735de597cc406e4d /src | |
| parent | 433ce28978c03c72895ee092fee949c24b04d0af (diff) | |
| download | rabbitmq-server-git-4f7984cc8c35fbe512883ff2bc1784690607d679.tar.gz | |
refactor
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 5 | ||||
| -rw-r--r-- | src/rabbit_channel.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_tests.erl | 7 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 69 |
4 files changed, 40 insertions, 43 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index f1085a0c74..72b9b49dac 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -422,9 +422,8 @@ record_confirm_message(#delivery{msg_seq_no = MsgSeqNo, ack_by_acktags(AckTags, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> - AckdGuids = BQ:seqids_to_guids(AckTags, BQS), - confirm_messages(AckdGuids, - State#q{backing_queue_state = BQ:ack(AckTags, BQS)}). + {BQS1, AckdGuids} = BQ:ack(AckTags, BQS), + confirm_messages(AckdGuids, State#q{backing_queue_state = BQS1}). run_message_queue(State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> Funs = {fun deliver_from_queue_pred/2, diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 787fc82ccb..d641e824c7 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -91,7 +91,7 @@ -spec(do/3 :: (pid(), rabbit_framing:amqp_method_record(), rabbit_types:maybe(rabbit_types:content())) -> 'ok'). -spec(shutdown/1 :: (pid()) -> 'ok'). --spec(send_command/2 :: (pid(), rabbit_framing:amqp_method_record()) -> 'ok'). +-spec(send_command/2 :: (pid(), rabbit_framing:amqp_method()) -> 'ok'). -spec(deliver/4 :: (pid(), rabbit_types:ctag(), boolean(), rabbit_amqqueue:qmsg()) -> 'ok'). diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index ddb78aff12..e03d1e94bb 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -1854,7 +1854,7 @@ test_variable_queue_dynamic_duration_change(VQ0) -> %% drain {VQ8, AckTags} = variable_queue_fetch(Len, false, false, Len, VQ7), - VQ9 = rabbit_variable_queue:ack(AckTags, VQ8), + {VQ9, _} = rabbit_variable_queue:ack(AckTags, VQ8), {empty, VQ10} = rabbit_variable_queue:fetch(true, VQ9), VQ10. @@ -1864,7 +1864,8 @@ publish_fetch_and_ack(0, _Len, VQ0) -> publish_fetch_and_ack(N, Len, VQ0) -> VQ1 = variable_queue_publish(false, 1, VQ0), {{_Msg, false, AckTag, Len}, VQ2} = rabbit_variable_queue:fetch(true, VQ1), - publish_fetch_and_ack(N-1, Len, rabbit_variable_queue:ack([AckTag], VQ2)). + {VQ3, _} = rabbit_variable_queue:ack([AckTag], VQ2), + publish_fetch_and_ack(N-1, Len, VQ3). test_variable_queue_partial_segments_delta_thing(VQ0) -> SegmentSize = rabbit_queue_index:next_segment_boundary(0), @@ -1897,7 +1898,7 @@ test_variable_queue_partial_segments_delta_thing(VQ0) -> {len, HalfSegment + 1}]), {VQ8, AckTags1} = variable_queue_fetch(HalfSegment + 1, true, false, HalfSegment + 1, VQ7), - VQ9 = rabbit_variable_queue:ack(AckTags ++ AckTags1, VQ8), + {VQ9, _} = rabbit_variable_queue:ack(AckTags ++ AckTags1, VQ8), %% should be empty now {empty, VQ10} = rabbit_variable_queue:fetch(true, VQ9), VQ10. diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index a72ec2f7b6..c687ae02bf 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -34,7 +34,7 @@ -export([init/5, init/3, terminate/1, delete_and_terminate/1, purge/1, publish/3, publish_delivered/4, fetch/2, ack/2, tx_publish/3, tx_ack/3, tx_rollback/2, tx_commit/3, - requeue/2, len/1, is_empty/1, seqids_to_guids/2, + requeue/2, len/1, is_empty/1, set_ram_duration_target/2, ram_duration/1, needs_idle_timeout/1, idle_timeout/1, handle_pre_hibernate/1, status/1]). @@ -662,34 +662,27 @@ tx_commit(Txn, Fun, State = #vqstate { durable = IsDurable }) -> end)}. requeue(AckTags, State) -> - a(reduce_memory_use( - ack(fun rabbit_msg_store:release/2, - fun (#msg_status { msg = Msg }, State1) -> - {_SeqId, State2} = publish(Msg, true, false, false, State1), - State2; - ({IsPersistent, Guid}, State1) -> - #vqstate { msg_store_clients = MSCState } = State1, - {{ok, Msg = #basic_message{}}, MSCState1} = - read_from_msg_store(MSCState, IsPersistent, Guid), - State2 = State1 #vqstate { msg_store_clients = MSCState1 }, - {_SeqId, State3} = publish(Msg, true, true, false, State2), - State3 - end, - AckTags, State))). + {State1, _Guids} = + a(reduce_memory_use( + ack(fun rabbit_msg_store:release/2, + fun (#msg_status { msg = Msg }, State1) -> + {_SeqId, State2} = publish(Msg, true, false, false, State1), + State2; + ({IsPersistent, Guid}, State1) -> + #vqstate { msg_store_clients = MSCState } = State1, + {{ok, Msg = #basic_message{}}, MSCState1} = + read_from_msg_store(MSCState, IsPersistent, Guid), + State2 = State1 #vqstate { msg_store_clients = MSCState1 }, + {_SeqId, State3} = publish(Msg, true, true, false, State2), + State3 + end, + AckTags, State))), + State1. len(#vqstate { len = Len }) -> Len. is_empty(State) -> 0 == len(State). -seqids_to_guids(SeqIds, #vqstate{ pending_ack = PA }) -> - lists:foldl( - fun(SeqId, Guids) -> - [case dict:fetch(SeqId, PA) of - #msg_status { msg = Msg } -> Msg#basic_message.guid; - {_, Guid} -> Guid - end | Guids] - end, [], SeqIds). - set_ram_duration_target(DurationTarget, State = #vqstate { rates = #rates { avg_egress = AvgEgressRate, @@ -789,6 +782,8 @@ status(#vqstate { q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4, %% Minor helpers %%---------------------------------------------------------------------------- +a({State, Other}) -> + {a(State), Other}; a(State = #vqstate { q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4, len = Len, persistent_count = PersistentCount, @@ -990,6 +985,7 @@ tx_commit_index(State = #vqstate { on_sync = #sync { PAcks = lists:append(SPAcks), Acks = lists:append(SAcks), Pubs = lists:append(lists:reverse(SPubs)), + {NewState, _Guids} = ack(Acks, State), {SeqIds, State1 = #vqstate { index_state = IndexState }} = lists:foldl( fun (Msg = #basic_message { is_persistent = IsPersistent }, @@ -997,7 +993,7 @@ tx_commit_index(State = #vqstate { on_sync = #sync { IsPersistent1 = IsDurable andalso IsPersistent, {SeqId, State3} = publish(Msg, false, IsPersistent1, false, State2), {cons_if(IsPersistent1, SeqId, SeqIdsAcc), State3} - end, {PAcks, ack(Acks, State)}, Pubs), + end, {PAcks, NewState}, Pubs), IndexState1 = rabbit_queue_index:sync(SeqIds, IndexState), [ Fun() || Fun <- lists:reverse(SFuns) ], reduce_memory_use( @@ -1166,28 +1162,27 @@ remove_pending_ack(KeepPersistent, end. ack(_MsgStoreFun, _Fun, [], State) -> - State; + {State, []}; ack(MsgStoreFun, Fun, AckTags, State) -> - {{SeqIds, GuidsByStore}, State1 = #vqstate { index_state = IndexState, - persistent_count = PCount }} = + {Guids, {SeqIds, GuidsByStore}, State1 = #vqstate { index_state = IndexState, + persistent_count = PCount }} = lists:foldl( - fun (SeqId, {Acc, State2 = #vqstate { pending_ack = PA }}) -> + fun (SeqId, {Gs, Acc, State2 = #vqstate { pending_ack = PA }}) -> {ok, AckEntry} = dict:find(SeqId, PA), - {accumulate_ack(SeqId, AckEntry, Acc), + {[AckEntry | Gs], + accumulate_ack(SeqId, AckEntry, Acc), Fun(AckEntry, State2 #vqstate { pending_ack = dict:erase(SeqId, PA) })} - end, {{[], orddict:new()}, State}, AckTags), + end, {[], {[], orddict:new()}, State}, AckTags), IndexState1 = rabbit_queue_index:ack(SeqIds, IndexState), ok = orddict:fold(fun (MsgStore, Guids, ok) -> MsgStoreFun(MsgStore, Guids) end, ok, GuidsByStore), - %% the AckTags were removed from State1, so use State in seqids_to_guids - State2 = remove_confirms( - gb_sets:from_list(seqids_to_guids(AckTags, State)), State1), + State2 = remove_confirms(gb_sets:from_list(Guids), State1), PCount1 = PCount - find_persistent_count(sum_guids_by_store_to_len( orddict:new(), GuidsByStore)), - State2 #vqstate { index_state = IndexState1, - persistent_count = PCount1 }. + {State2 #vqstate { index_state = IndexState1, + persistent_count = PCount1 }, Guids}. accumulate_ack(_SeqId, #msg_status { is_persistent = false, %% ASSERTIONS msg_on_disk = false, @@ -1289,6 +1284,8 @@ reduce_memory_use(AlphaBetaFun, BetaGammaFun, BetaDeltaFun, State) -> end end. +reduce_memory_use({State, Other}) -> + {reduce_memory_use(State), Other}; reduce_memory_use(State) -> {_, State1} = reduce_memory_use(fun push_alphas_to_betas/2, fun limit_ram_index/2, |
