diff options
| author | Matthias Radestock <matthias@rabbitmq.com> | 2010-09-29 12:40:48 +0100 |
|---|---|---|
| committer | Matthias Radestock <matthias@rabbitmq.com> | 2010-09-29 12:40:48 +0100 |
| commit | aabf6f4b440504532a1e68b3a727bafe2550722d (patch) | |
| tree | 57b719993f011b6d03c0b8b5771f6eb0cb241547 | |
| parent | ee6d8422aed1f4f1571005de89d8a4a43fe9ce76 (diff) | |
| download | rabbitmq-server-git-aabf6f4b440504532a1e68b3a727bafe2550722d.tar.gz | |
cosmetics and minor refactors
| -rw-r--r-- | src/rabbit_channel.erl | 45 | ||||
| -rw-r--r-- | src/rabbit_msg_store.erl | 16 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 63 |
3 files changed, 53 insertions, 71 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 4bb1f13b02..85ace25b74 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -460,30 +460,29 @@ send_or_enqueue_ack(undefined, State) -> State; send_or_enqueue_ack(_, State = #ch{confirm_enabled = false}) -> State; -send_or_enqueue_ack(MsgSeqNo, - State = #ch{confirm_multiple = false}) -> - do_if_not_dup(MsgSeqNo, State, - fun(MSN, S = #ch{writer_pid = WriterPid, - qpid_to_msgs = QTM}) -> - ok = rabbit_writer:send_command( - WriterPid, #'basic.ack'{delivery_tag = MSN}), - S #ch { qpid_to_msgs = - dict:map(fun (_, Msgs) -> - gb_sets:delete_any(MsgSeqNo, Msgs) - end, QTM) } - end); +send_or_enqueue_ack(MsgSeqNo, State = #ch{confirm_multiple = false}) -> + do_if_not_dup( + MsgSeqNo, State, + fun(MSN, State1 = #ch{writer_pid = WriterPid, qpid_to_msgs = QTM}) -> + ok = rabbit_writer:send_command( + WriterPid, #'basic.ack'{delivery_tag = MSN}), + QTM1 = dict:map(fun (_, Msgs) -> + gb_sets:delete_any(MsgSeqNo, Msgs) + end, QTM), + State1#ch{qpid_to_msgs = QTM1} + end); send_or_enqueue_ack(MsgSeqNo, State = #ch{confirm_multiple = true}) -> - do_if_not_dup(MsgSeqNo, State, - fun(MSN, S = #ch{qpid_to_msgs = QTM}) -> - State1 = start_ack_timer(S), - State1 #ch { held_confirms = - gb_sets:add(MSN, State1#ch.held_confirms), - qpid_to_msgs = - dict:map(fun (_, Msgs) -> - gb_sets:delete_any(MsgSeqNo, - Msgs) - end, QTM) } - end). + do_if_not_dup( + MsgSeqNo, State, + fun(MSN, State1 = #ch{qpid_to_msgs = QTM}) -> + QTM1 = dict:map(fun (_, Msgs) -> + gb_sets:delete_any(MsgSeqNo, Msgs) + end, QTM), + start_ack_timer( + State1#ch{held_confirms = + gb_sets:add(MSN, State1#ch.held_confirms), + qpid_to_msgs = QTM1}) + end). msg_sent_to_queues(MsgSeqNo, QPid, State = #ch{qpid_to_msgs = QTM}) -> case dict:find(QPid, QTM) of diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index 695b44250f..7d0af80a47 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -814,15 +814,15 @@ reply(Reply, State) -> {reply, Reply, State1, Timeout}. next_state(State = #msstate { sync_timer_ref = undefined, - on_sync = OS, - cref_to_guids = CTG }) -> - case {OS, dict:size(CTG)} of + on_sync = Syncs, + cref_to_guids = CTG }) -> + case {Syncs, dict:size(CTG)} of {[], 0} -> {State, hibernate}; _ -> {start_sync_timer(State), 0} end; -next_state(State = #msstate { on_sync = OS, +next_state(State = #msstate { on_sync = Syncs, cref_to_guids = CTG }) -> - case {OS, dict:size(CTG)} of + case {Syncs, dict:size(CTG)} of {[], 0} -> {stop_sync_timer(State), hibernate}; _ -> {State, 0} end. @@ -837,10 +837,10 @@ stop_sync_timer(State = #msstate { sync_timer_ref = TRef }) -> {ok, cancel} = timer:cancel(TRef), State #msstate { sync_timer_ref = undefined }. -internal_sync(State = #msstate { current_file_handle = CurHdl, - on_sync = Syncs, +internal_sync(State = #msstate { current_file_handle = CurHdl, + on_sync = Syncs, client_ondisk_callback = CODC, - cref_to_guids = CTG }) -> + cref_to_guids = CTG }) -> State1 = stop_sync_timer(State), State2 = case Syncs of [] -> State1; diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 9256b8ac89..c763fe4d84 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -415,10 +415,8 @@ init(QueueName, IsDurable, Recover, false -> undefined end, - rabbit_msg_store:register_sync_callback( - ?PERSISTENT_MSG_STORE, - PRef, - MsgOnDiskFun), + rabbit_msg_store:register_sync_callback(?PERSISTENT_MSG_STORE, PRef, + MsgOnDiskFun), TransientClient = rabbit_msg_store:client_init(?TRANSIENT_MSG_STORE, TRef), State = #vqstate { @@ -773,13 +771,13 @@ status(#vqstate { q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4, {avg_ingress_rate , AvgIngressRate} ]. seqids_to_guids(SeqIds, #vqstate{ pending_ack = PA }) -> - lists:foldl(fun(SeqId, Guids) -> - {ok, AckEntry} = dict:find(SeqId, PA), - [case AckEntry of - #msg_status { msg = Msg } -> Msg#basic_message.guid; - {_, Guid} -> Guid - end | Guids] - end, [], SeqIds). + lists:foldl( + fun(SeqId, Guids) -> + [case dict:fetch(SeqId, PA) of + #msg_status { msg = Msg } -> Msg#basic_message.guid; + {_, Guid} -> Guid + end | Guids] + end, [], SeqIds). %%---------------------------------------------------------------------------- %% Minor helpers @@ -1166,7 +1164,8 @@ ack(MsgStoreFun, Fun, AckTags, State) -> ok = orddict:fold(fun (MsgStore, Guids, ok) -> MsgStoreFun(MsgStore, Guids) end, ok, GuidsByStore), - State2 = msgs_confirmed(seqids_to_guids(AckTags, State), State1), + State2 = msgs_confirmed(gb_sets:from_list(seqids_to_guids(AckTags, State1)), + State1), PCount1 = PCount - case orddict:find(?PERSISTENT_MSG_STORE, GuidsByStore) of error -> 0; {ok, Guids} -> length(Guids) @@ -1187,17 +1186,12 @@ accumulate_ack(SeqId, {IsPersistent, Guid}, {SeqIdsAcc, Dict}) -> %% Internal plumbing for confirms (aka publisher acks) %%---------------------------------------------------------------------------- -msgs_confirmed(Guids, State = #vqstate { msgs_on_disk = MOD, - msg_indices_on_disk = MIOD, - need_confirming = NC }) -> - GuidSet = gb_sets:from_list(Guids), - State #vqstate { - msgs_on_disk = - gb_sets:difference(MOD, GuidSet), - msg_indices_on_disk = - gb_sets:difference(MIOD, GuidSet), - need_confirming = - gb_sets:difference(NC, GuidSet) }. +msgs_confirmed(GuidSet, State = #vqstate { msgs_on_disk = MOD, + msg_indices_on_disk = MIOD, + need_confirming = NC }) -> + State #vqstate { msgs_on_disk = gb_sets:difference(MOD, GuidSet), + msg_indices_on_disk = gb_sets:difference(MIOD, GuidSet), + need_confirming = gb_sets:difference(NC, GuidSet) }. msgs_written_to_disk(QPid, Guids) -> spawn(fun() -> rabbit_amqqueue:maybe_run_queue_via_backing_queue( @@ -1207,14 +1201,9 @@ msgs_written_to_disk(QPid, Guids) -> need_confirming = NC }) -> GuidSet = gb_sets:from_list(Guids), ToConfirmMsgs = gb_sets:intersection(GuidSet, MIOD), - MOD1 = gb_sets:intersection(gb_sets:union(MOD, GuidSet), NC), - { State #vqstate { - msgs_on_disk = - gb_sets:difference(MOD1, ToConfirmMsgs), - msg_indices_on_disk = - gb_sets:difference(MIOD, ToConfirmMsgs), - need_confirming = - gb_sets:difference(NC, ToConfirmMsgs) }, + State1 = State #vqstate { msgs_on_disk = + gb_sets:intersection(gb_sets:union(MOD, GuidSet), NC) }, + { msgs_confirmed(ToConfirmMsgs, State1), {confirm, gb_sets:to_list(ToConfirmMsgs)} } end) end). @@ -1228,15 +1217,9 @@ msg_indices_written_to_disk(Guids) -> need_confirming = NC }) -> GuidSet = gb_sets:from_list(Guids), ToConfirmMsgs = gb_sets:intersection(GuidSet, MOD), - MIOD1 = - gb_sets:intersection(gb_sets:union(MIOD, GuidSet), NC), - { State #vqstate { - msgs_on_disk = - gb_sets:difference(MOD, ToConfirmMsgs), - msg_indices_on_disk = - gb_sets:difference(MIOD1, ToConfirmMsgs), - need_confirming = - gb_sets:difference(NC, ToConfirmMsgs) }, + State1 = State #vqstate { msg_indices_on_disk = + gb_sets:intersection(gb_sets:union(MIOD, GuidSet), NC) }, + { msgs_confirmed(ToConfirmMsgs, State1), {confirm, gb_sets:to_list(ToConfirmMsgs)} } end) end). |
