summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2010-09-29 12:40:48 +0100
committerMatthias Radestock <matthias@rabbitmq.com>2010-09-29 12:40:48 +0100
commitaabf6f4b440504532a1e68b3a727bafe2550722d (patch)
tree57b719993f011b6d03c0b8b5771f6eb0cb241547
parentee6d8422aed1f4f1571005de89d8a4a43fe9ce76 (diff)
downloadrabbitmq-server-git-aabf6f4b440504532a1e68b3a727bafe2550722d.tar.gz
cosmetics and minor refactors
-rw-r--r--src/rabbit_channel.erl45
-rw-r--r--src/rabbit_msg_store.erl16
-rw-r--r--src/rabbit_variable_queue.erl63
3 files changed, 53 insertions, 71 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 4bb1f13b02..85ace25b74 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -460,30 +460,29 @@ send_or_enqueue_ack(undefined, State) ->
State;
send_or_enqueue_ack(_, State = #ch{confirm_enabled = false}) ->
State;
-send_or_enqueue_ack(MsgSeqNo,
- State = #ch{confirm_multiple = false}) ->
- do_if_not_dup(MsgSeqNo, State,
- fun(MSN, S = #ch{writer_pid = WriterPid,
- qpid_to_msgs = QTM}) ->
- ok = rabbit_writer:send_command(
- WriterPid, #'basic.ack'{delivery_tag = MSN}),
- S #ch { qpid_to_msgs =
- dict:map(fun (_, Msgs) ->
- gb_sets:delete_any(MsgSeqNo, Msgs)
- end, QTM) }
- end);
+send_or_enqueue_ack(MsgSeqNo, State = #ch{confirm_multiple = false}) ->
+ do_if_not_dup(
+ MsgSeqNo, State,
+ fun(MSN, State1 = #ch{writer_pid = WriterPid, qpid_to_msgs = QTM}) ->
+ ok = rabbit_writer:send_command(
+ WriterPid, #'basic.ack'{delivery_tag = MSN}),
+ QTM1 = dict:map(fun (_, Msgs) ->
+ gb_sets:delete_any(MsgSeqNo, Msgs)
+ end, QTM),
+ State1#ch{qpid_to_msgs = QTM1}
+ end);
send_or_enqueue_ack(MsgSeqNo, State = #ch{confirm_multiple = true}) ->
- do_if_not_dup(MsgSeqNo, State,
- fun(MSN, S = #ch{qpid_to_msgs = QTM}) ->
- State1 = start_ack_timer(S),
- State1 #ch { held_confirms =
- gb_sets:add(MSN, State1#ch.held_confirms),
- qpid_to_msgs =
- dict:map(fun (_, Msgs) ->
- gb_sets:delete_any(MsgSeqNo,
- Msgs)
- end, QTM) }
- end).
+ do_if_not_dup(
+ MsgSeqNo, State,
+ fun(MSN, State1 = #ch{qpid_to_msgs = QTM}) ->
+ QTM1 = dict:map(fun (_, Msgs) ->
+ gb_sets:delete_any(MsgSeqNo, Msgs)
+ end, QTM),
+ start_ack_timer(
+ State1#ch{held_confirms =
+ gb_sets:add(MSN, State1#ch.held_confirms),
+ qpid_to_msgs = QTM1})
+ end).
msg_sent_to_queues(MsgSeqNo, QPid, State = #ch{qpid_to_msgs = QTM}) ->
case dict:find(QPid, QTM) of
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index 695b44250f..7d0af80a47 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -814,15 +814,15 @@ reply(Reply, State) ->
{reply, Reply, State1, Timeout}.
next_state(State = #msstate { sync_timer_ref = undefined,
- on_sync = OS,
- cref_to_guids = CTG }) ->
- case {OS, dict:size(CTG)} of
+ on_sync = Syncs,
+ cref_to_guids = CTG }) ->
+ case {Syncs, dict:size(CTG)} of
{[], 0} -> {State, hibernate};
_ -> {start_sync_timer(State), 0}
end;
-next_state(State = #msstate { on_sync = OS,
+next_state(State = #msstate { on_sync = Syncs,
cref_to_guids = CTG }) ->
- case {OS, dict:size(CTG)} of
+ case {Syncs, dict:size(CTG)} of
{[], 0} -> {stop_sync_timer(State), hibernate};
_ -> {State, 0}
end.
@@ -837,10 +837,10 @@ stop_sync_timer(State = #msstate { sync_timer_ref = TRef }) ->
{ok, cancel} = timer:cancel(TRef),
State #msstate { sync_timer_ref = undefined }.
-internal_sync(State = #msstate { current_file_handle = CurHdl,
- on_sync = Syncs,
+internal_sync(State = #msstate { current_file_handle = CurHdl,
+ on_sync = Syncs,
client_ondisk_callback = CODC,
- cref_to_guids = CTG }) ->
+ cref_to_guids = CTG }) ->
State1 = stop_sync_timer(State),
State2 = case Syncs of
[] -> State1;
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 9256b8ac89..c763fe4d84 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -415,10 +415,8 @@ init(QueueName, IsDurable, Recover,
false -> undefined
end,
- rabbit_msg_store:register_sync_callback(
- ?PERSISTENT_MSG_STORE,
- PRef,
- MsgOnDiskFun),
+ rabbit_msg_store:register_sync_callback(?PERSISTENT_MSG_STORE, PRef,
+ MsgOnDiskFun),
TransientClient = rabbit_msg_store:client_init(?TRANSIENT_MSG_STORE, TRef),
State = #vqstate {
@@ -773,13 +771,13 @@ status(#vqstate { q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4,
{avg_ingress_rate , AvgIngressRate} ].
seqids_to_guids(SeqIds, #vqstate{ pending_ack = PA }) ->
- lists:foldl(fun(SeqId, Guids) ->
- {ok, AckEntry} = dict:find(SeqId, PA),
- [case AckEntry of
- #msg_status { msg = Msg } -> Msg#basic_message.guid;
- {_, Guid} -> Guid
- end | Guids]
- end, [], SeqIds).
+ lists:foldl(
+ fun(SeqId, Guids) ->
+ [case dict:fetch(SeqId, PA) of
+ #msg_status { msg = Msg } -> Msg#basic_message.guid;
+ {_, Guid} -> Guid
+ end | Guids]
+ end, [], SeqIds).
%%----------------------------------------------------------------------------
%% Minor helpers
@@ -1166,7 +1164,8 @@ ack(MsgStoreFun, Fun, AckTags, State) ->
ok = orddict:fold(fun (MsgStore, Guids, ok) ->
MsgStoreFun(MsgStore, Guids)
end, ok, GuidsByStore),
- State2 = msgs_confirmed(seqids_to_guids(AckTags, State), State1),
+ State2 = msgs_confirmed(gb_sets:from_list(seqids_to_guids(AckTags, State1)),
+ State1),
PCount1 = PCount - case orddict:find(?PERSISTENT_MSG_STORE, GuidsByStore) of
error -> 0;
{ok, Guids} -> length(Guids)
@@ -1187,17 +1186,12 @@ accumulate_ack(SeqId, {IsPersistent, Guid}, {SeqIdsAcc, Dict}) ->
%% Internal plumbing for confirms (aka publisher acks)
%%----------------------------------------------------------------------------
-msgs_confirmed(Guids, State = #vqstate { msgs_on_disk = MOD,
- msg_indices_on_disk = MIOD,
- need_confirming = NC }) ->
- GuidSet = gb_sets:from_list(Guids),
- State #vqstate {
- msgs_on_disk =
- gb_sets:difference(MOD, GuidSet),
- msg_indices_on_disk =
- gb_sets:difference(MIOD, GuidSet),
- need_confirming =
- gb_sets:difference(NC, GuidSet) }.
+msgs_confirmed(GuidSet, State = #vqstate { msgs_on_disk = MOD,
+ msg_indices_on_disk = MIOD,
+ need_confirming = NC }) ->
+ State #vqstate { msgs_on_disk = gb_sets:difference(MOD, GuidSet),
+ msg_indices_on_disk = gb_sets:difference(MIOD, GuidSet),
+ need_confirming = gb_sets:difference(NC, GuidSet) }.
msgs_written_to_disk(QPid, Guids) ->
spawn(fun() -> rabbit_amqqueue:maybe_run_queue_via_backing_queue(
@@ -1207,14 +1201,9 @@ msgs_written_to_disk(QPid, Guids) ->
need_confirming = NC }) ->
GuidSet = gb_sets:from_list(Guids),
ToConfirmMsgs = gb_sets:intersection(GuidSet, MIOD),
- MOD1 = gb_sets:intersection(gb_sets:union(MOD, GuidSet), NC),
- { State #vqstate {
- msgs_on_disk =
- gb_sets:difference(MOD1, ToConfirmMsgs),
- msg_indices_on_disk =
- gb_sets:difference(MIOD, ToConfirmMsgs),
- need_confirming =
- gb_sets:difference(NC, ToConfirmMsgs) },
+ State1 = State #vqstate { msgs_on_disk =
+ gb_sets:intersection(gb_sets:union(MOD, GuidSet), NC) },
+ { msgs_confirmed(ToConfirmMsgs, State1),
{confirm, gb_sets:to_list(ToConfirmMsgs)} }
end)
end).
@@ -1228,15 +1217,9 @@ msg_indices_written_to_disk(Guids) ->
need_confirming = NC }) ->
GuidSet = gb_sets:from_list(Guids),
ToConfirmMsgs = gb_sets:intersection(GuidSet, MOD),
- MIOD1 =
- gb_sets:intersection(gb_sets:union(MIOD, GuidSet), NC),
- { State #vqstate {
- msgs_on_disk =
- gb_sets:difference(MOD, ToConfirmMsgs),
- msg_indices_on_disk =
- gb_sets:difference(MIOD1, ToConfirmMsgs),
- need_confirming =
- gb_sets:difference(NC, ToConfirmMsgs) },
+ State1 = State #vqstate { msg_indices_on_disk =
+ gb_sets:intersection(gb_sets:union(MIOD, GuidSet), NC) },
+ { msgs_confirmed(ToConfirmMsgs, State1),
{confirm, gb_sets:to_list(ToConfirmMsgs)} }
end)
end).