summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2010-10-05 09:36:25 +0100
committerMatthias Radestock <matthias@rabbitmq.com>2010-10-05 09:36:25 +0100
commit6c39deac72d788f09f27628b840169e1e106675d (patch)
treedbd721029767d9cf3487796f634f9632052c4c89 /src
parent7f60fde50a2ad16a7c512063c18baba2af2b9ad5 (diff)
downloadrabbitmq-server-git-6c39deac72d788f09f27628b840169e1e106675d.tar.gz
cosmetics and minor refactoring
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_amqqueue_process.erl58
-rw-r--r--src/rabbit_basic.erl4
-rw-r--r--src/rabbit_channel.erl42
-rw-r--r--src/rabbit_msg_store.erl26
-rw-r--r--src/rabbit_queue_index.erl6
-rw-r--r--src/rabbit_variable_queue.erl68
6 files changed, 101 insertions, 103 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index d0d971ac83..9abe9069d9 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -345,10 +345,10 @@ deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun}, FunAcc,
{QName, self(), AckTag, IsDelivered, Message}),
{State2, ChAckTags1} =
case AckRequired of
- true -> {State1, sets:add_element(AckTag, ChAckTags)};
- false -> {confirm_message(
- Message#basic_message.guid,
- State1), ChAckTags}
+ true -> {State1,
+ sets:add_element(AckTag, ChAckTags)};
+ false -> {confirm_message(Message, State1),
+ ChAckTags}
end,
NewC = C#cr{unsent_message_count = Count + 1,
acktags = ChAckTags1},
@@ -399,12 +399,10 @@ deliver_from_queue_deliver(AckRequired, false,
{{Message, IsDelivered, AckTag}, 0 == Remaining,
State#q{backing_queue_state = BQS1}}.
-confirm_messages(Guids, State) when is_list(Guids) ->
- lists:foldl(fun(Guid, State0) ->
- confirm_message(Guid, State0)
- end, State, Guids).
+confirm_messages(Guids, State) ->
+ lists:foldl(fun confirm_message_by_guid/2, State, Guids).
-confirm_message(Guid, State = #q{guid_to_channel = GTC}) ->
+confirm_message_by_guid(Guid, State = #q{guid_to_channel = GTC}) ->
case dict:find(Guid, GTC) of
{ok, {_ , undefined}} -> ok;
{ok, {ChPid, MsgSeqNo}} -> rabbit_channel:confirm(ChPid, MsgSeqNo);
@@ -412,6 +410,9 @@ confirm_message(Guid, State = #q{guid_to_channel = GTC}) ->
end,
State#q{guid_to_channel = dict:erase(Guid, GTC)}.
+confirm_message(#basic_message{guid = Guid}, State) ->
+ confirm_message_by_guid(Guid, State).
+
record_confirm_message(#delivery{msg_seq_no = undefined}, State) ->
State;
record_confirm_message(#delivery{msg_seq_no = MsgSeqNo,
@@ -454,14 +455,14 @@ attempt_delivery(#delivery{txn = Txn,
record_current_channel_tx(ChPid, Txn),
{true, State#q{backing_queue_state = BQ:tx_publish(Txn, Message, BQS)}}.
-deliver_or_enqueue(Delivery = #delivery{message = Message,
- msg_seq_no = MsgSeqNo},
- State = #q{backing_queue = BQ}) ->
+deliver_or_enqueue(Delivery, State) ->
case attempt_delivery(Delivery, record_confirm_message(Delivery, State)) of
- {true, NewState} -> {true, NewState};
- {false, NewState} -> BQS = BQ:publish(Message, MsgSeqNo =/= undefined,
- NewState#q.backing_queue_state),
- {false, NewState#q{backing_queue_state = BQS}}
+ {true, State1} ->
+ {true, State1};
+ {false, State1 = #q{backing_queue = BQ, backing_queue_state = BQS}} ->
+ #delivery{message = Message, msg_seq_no = MsgSeqNo} = Delivery,
+ BQS1 = BQ:publish(Message, MsgSeqNo =/= undefined, BQS),
+ {false, State1#q{backing_queue_state = BQS1}}
end.
requeue_and_run(AckTags, State = #q{backing_queue = BQ}) ->
@@ -698,7 +699,7 @@ handle_call(consumers, _From,
[{ChPid, ConsumerTag, AckRequired} | Acc]
end, [], queue:join(ActiveConsumers, BlockedConsumers)), State);
-handle_call({deliver_immediately, Delivery = #delivery{message = Msg}},
+handle_call({deliver_immediately, Delivery = #delivery{message = Message}},
_From, State) ->
%% Synchronous, "immediate" delivery mode
%%
@@ -713,11 +714,11 @@ handle_call({deliver_immediately, Delivery = #delivery{message = Msg}},
%% just all ready-to-consume queues get the message, with unready
%% queues discarding the message?
%%
- {Delivered, State1} = attempt_delivery(Delivery,
- record_confirm_message(Delivery, State)),
+ {Delivered, State1} =
+ attempt_delivery(Delivery, record_confirm_message(Delivery, State)),
reply(Delivered, case Delivered of
true -> State1;
- false -> confirm_message(Msg#basic_message.guid, State1)
+ false -> confirm_message(Message, State1)
end);
handle_call({deliver, Delivery}, _From, State) ->
@@ -748,15 +749,14 @@ handle_call({basic_get, ChPid, NoAck}, _From,
case BQ:fetch(AckRequired, BQS) of
{empty, BQS1} -> reply(empty, State1#q{backing_queue_state = BQS1});
{{Message, IsDelivered, AckTag, Remaining}, BQS1} ->
- State2 = case AckRequired of
- true ->
- C = #cr{acktags = ChAckTags} = ch_record(ChPid),
- store_ch_record(
- C#cr{acktags = sets:add_element(AckTag, ChAckTags)}),
- State1;
- false ->
- confirm_message(Message#basic_message.guid, State1)
- end,
+ State2 =
+ case AckRequired of
+ true -> C = #cr{acktags = ChAckTags} = ch_record(ChPid),
+ ChAckTags1 = sets:add_element(AckTag, ChAckTags),
+ store_ch_record(C#cr{acktags = ChAckTags1}),
+ State1;
+ false -> confirm_message(Message, State1)
+ end,
Msg = {QName, self(), AckTag, IsDelivered, Message},
reply({ok, Remaining, Msg}, State2#q{backing_queue_state = BQS1})
end;
diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl
index 348310d9b9..8ca98f6e9f 100644
--- a/src/rabbit_basic.erl
+++ b/src/rabbit_basic.erl
@@ -52,8 +52,8 @@
(rabbit_types:delivery()) -> publish_result()).
-spec(delivery/5 ::
(boolean(), boolean(), rabbit_types:maybe(rabbit_types:txn()),
- rabbit_types:message(), undefined | integer())
- -> rabiit_types:delivery()).
+ rabbit_types:message(), undefined | integer()) ->
+ rabbit_types:delivery()).
-spec(message/4 ::
(rabbit_exchange:name(), rabbit_router:routing_key(),
properties_input(), binary())
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index d641e824c7..7d9f80644b 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -291,17 +291,14 @@ handle_cast({confirm, MsgSeqNo}, State) ->
handle_info({'DOWN', _MRef, process, QPid, _Reason},
State = #ch{qpid_to_msgs = QTM}) ->
- State1 = case dict:find(QPid, QTM) of
- {ok, Msgs} ->
- S = gb_sets:fold(fun (MsgSeqNo, State0) ->
- send_or_enqueue_ack(MsgSeqNo, State0)
- end, State, Msgs),
- S #ch{qpid_to_msgs = dict:erase(QPid, QTM)};
- error ->
- State
+ State2 = case dict:find(QPid, QTM) of
+ {ok, Msgs} -> State1 = gb_sets:fold(fun send_or_enqueue_ack/2,
+ State, Msgs),
+ State1 #ch{qpid_to_msgs = dict:erase(QPid, QTM)};
+ error -> State
end,
erase_queue_stats(QPid),
- {noreply, queue_blocked(QPid, State1)}.
+ {noreply, queue_blocked(QPid, State2)}.
handle_pre_hibernate(State = #ch{writer_pid = WriterPid,
held_confirms = As,
@@ -452,18 +449,20 @@ send_or_enqueue_ack(undefined, State) ->
send_or_enqueue_ack(_, State = #ch{confirm_enabled = false}) ->
State;
send_or_enqueue_ack(MsgSeqNo, State = #ch{confirm_multiple = false}) ->
- do_if_unconfirmed(MsgSeqNo, State,
- fun(MSN, State1 = #ch{writer_pid = WriterPid}) ->
- ok = rabbit_writer:send_command(
- WriterPid, #'basic.ack'{delivery_tag = MSN}),
- State1
- end);
+ do_if_unconfirmed(
+ MsgSeqNo, State,
+ fun(MSN, State1 = #ch{writer_pid = WriterPid}) ->
+ ok = rabbit_writer:send_command(
+ WriterPid, #'basic.ack'{delivery_tag = MSN}),
+ State1
+ end);
send_or_enqueue_ack(MsgSeqNo, State = #ch{confirm_multiple = true}) ->
- do_if_unconfirmed(MsgSeqNo, State,
- fun(MSN, State1 = #ch{held_confirms = As}) ->
- start_ack_timer(State1#ch{held_confirms =
- gb_sets:add(MSN, As)})
- end).
+ do_if_unconfirmed(
+ MsgSeqNo, State,
+ fun(MSN, State1 = #ch{held_confirms = As}) ->
+ start_ack_timer(State1#ch{held_confirms =
+ gb_sets:add(MSN, As)})
+ end).
msg_sent_to_queue(undefined, _QPid, State) ->
State;
@@ -473,7 +472,8 @@ msg_sent_to_queue(MsgSeqNo, QPid, State = #ch{qpid_to_msgs = QTM}) ->
error -> erlang:monitor(process, QPid),
gb_sets:new()
end,
- State#ch{qpid_to_msgs = dict:store(QPid, gb_sets:add(MsgSeqNo, Msgs1), QTM)}.
+ QTM1 = dict:store(QPid, gb_sets:add(MsgSeqNo, Msgs1), QTM),
+ State#ch{qpid_to_msgs = QTM1}.
do_if_unconfirmed(MsgSeqNo, State = #ch{unconfirmed = UC}, Fun) ->
case gb_sets:is_element(MsgSeqNo, UC) of
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index 17f8fb1d7e..c2e74a2312 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -128,9 +128,8 @@
-spec(start_link/4 ::
(atom(), file:filename(), [binary()] | 'undefined',
startup_fun_state()) -> rabbit_types:ok_pid_or_error()).
--spec(write/4 :: (server(), rabbit_guid:guid(),
- msg(), client_msstate())
- -> rabbit_types:ok(client_msstate())).
+-spec(write/4 :: (server(), rabbit_guid:guid(), msg(), client_msstate()) ->
+ rabbit_types:ok(client_msstate())).
-spec(read/3 :: (server(), rabbit_guid:guid(), client_msstate()) ->
{rabbit_types:ok(msg()) | 'not_found', client_msstate()}).
-spec(contains/2 :: (server(), rabbit_guid:guid()) -> boolean()).
@@ -140,7 +139,8 @@
-spec(gc_done/4 :: (server(), non_neg_integer(), file_num(), file_num()) ->
'ok').
-spec(set_maximum_since_use/2 :: (server(), non_neg_integer()) -> 'ok').
--spec(client_init/3 :: (server(), rabbit_guid:guid(), guid_fun()) -> client_msstate()).
+-spec(client_init/3 :: (server(), rabbit_guid:guid(), guid_fun()) ->
+ client_msstate()).
-spec(client_terminate/2 :: (client_msstate(), server()) -> 'ok').
-spec(client_delete_and_terminate/3 ::
(client_msstate(), server(), rabbit_guid:guid()) -> 'ok').
@@ -363,7 +363,8 @@ set_maximum_since_use(Server, Age) ->
client_init(Server, Ref, MsgOnDiskFun) ->
{IState, IModule, Dir, GCPid,
FileHandlesEts, FileSummaryEts, DedupCacheEts, CurFileCacheEts} =
- gen_server2:call(Server, {new_client_state, Ref, MsgOnDiskFun}, infinity),
+ gen_server2:call(Server, {new_client_state, Ref, MsgOnDiskFun},
+ infinity),
#client_msstate { file_handle_cache = dict:new(),
index_state = IState,
index_module = IModule,
@@ -639,7 +640,8 @@ handle_call({new_client_state, CRef, Callback}, _From,
handle_call(successfully_recovered_state, _From, State) ->
reply(State #msstate.successfully_recovered, State);
-handle_call({client_terminate, #client_msstate { client_ref = CRef }}, _From, State) ->
+handle_call({client_terminate, #client_msstate { client_ref = CRef }}, _From,
+ State) ->
reply(ok, clear_client_callback(CRef, State)).
handle_cast({write, CRef, Guid},
@@ -690,7 +692,8 @@ handle_cast({write, CRef, Guid},
{#msg_location.ref_count, RefCount + 1},
State),
CTG1 = case {dict:find(CRef, CODC), File =:= CurFile} of
- {{ok, _} , true} -> rabbit_misc:dict_cons(CRef, Guid, CTG);
+ {{ok, _} , true} -> rabbit_misc:dict_cons(CRef, Guid,
+ CTG);
{{ok, Fun}, false} -> Fun([Guid]), CTG;
_ -> CTG
end,
@@ -763,9 +766,11 @@ handle_cast({set_maximum_since_use, Age}, State) ->
ok = file_handle_cache:set_maximum_since_use(Age),
noreply(State);
-handle_cast({client_delete, CRef}, State = #msstate { client_refs = ClientRefs }) ->
+handle_cast({client_delete, CRef},
+ State = #msstate { client_refs = ClientRefs }) ->
State1 = clear_client_callback(CRef, State),
- noreply(State1 #msstate { client_refs = sets:del_element(CRef, ClientRefs) }).
+ noreply(State1 #msstate {
+ client_refs = sets:del_element(CRef, ClientRefs) }).
handle_info(timeout, State) ->
noreply(internal_sync(State));
@@ -854,8 +859,7 @@ internal_sync(State = #msstate { current_file_handle = CurHdl,
end,
lists:foreach(fun (K) -> K() end, lists:reverse(Syncs)),
[(dict:fetch(CRef, CODC))(Guids) || {CRef, Guids} <- CGs],
- State1 #msstate { cref_to_guids = dict:new(),
- on_sync = [] }.
+ State1 #msstate { cref_to_guids = dict:new(), on_sync = [] }.
read_message(Guid, From,
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index 81b6daf9c3..6f19633664 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -196,11 +196,11 @@
}).
-type(startup_fun_state() ::
{fun ((A) -> 'finished' | {rabbit_guid:guid(), non_neg_integer(), A}),
- A}).
+ A}).
-spec(init/5 :: (rabbit_amqqueue:name(), boolean(), boolean(),
- fun ((rabbit_guid:guid()) -> boolean()), on_sync_fun())
- -> {'undefined' | non_neg_integer(), [any()], qistate()}).
+ fun ((rabbit_guid:guid()) -> boolean()), on_sync_fun()) ->
+ {'undefined' | non_neg_integer(), [any()], qistate()}).
-spec(terminate/2 :: ([any()], qistate()) -> qistate()).
-spec(delete_and_terminate/1 :: (qistate()) -> qistate()).
-spec(publish/4 :: (rabbit_guid:guid(), seq_id(), boolean(), qistate()) ->
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 1d6b324d3a..b07ee279df 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -990,7 +990,8 @@ tx_commit_index(State = #vqstate { on_sync = #sync {
fun (Msg = #basic_message { is_persistent = IsPersistent },
{SeqIdsAcc, State2}) ->
IsPersistent1 = IsDurable andalso IsPersistent,
- {SeqId, State3} = publish(Msg, false, IsPersistent1, false, State2),
+ {SeqId, State3} = publish(Msg, false, IsPersistent1, false,
+ State2),
{cons_if(IsPersistent1, SeqId, SeqIdsAcc), State3}
end, {PAcks, NewState}, Pubs),
IndexState1 = rabbit_queue_index:sync(SeqIds, IndexState),
@@ -1046,8 +1047,7 @@ sum_guids_by_store_to_len(LensByStore, GuidsByStore) ->
%% Internal gubbins for publishing
%%----------------------------------------------------------------------------
-publish(Msg = #basic_message { is_persistent = IsPersistent,
- guid = Guid },
+publish(Msg = #basic_message { is_persistent = IsPersistent, guid = Guid },
IsDelivered, MsgOnDisk, NeedsConfirming,
State = #vqstate { q1 = Q1, q3 = Q3, q4 = Q4,
next_seq_id = SeqId,
@@ -1067,13 +1067,12 @@ publish(Msg = #basic_message { is_persistent = IsPersistent,
end,
PCount1 = PCount + one_if(IsPersistent1),
Unconfirmed1 = gb_sets_maybe_insert(NeedsConfirming, Guid, Unconfirmed),
- {SeqId, State2 #vqstate {
- next_seq_id = SeqId + 1,
- len = Len + 1,
- in_counter = InCount + 1,
- persistent_count = PCount1,
- ram_msg_count = RamMsgCount + 1,
- unconfirmed = Unconfirmed1 }}.
+ {SeqId, State2 #vqstate { next_seq_id = SeqId + 1,
+ len = Len + 1,
+ in_counter = InCount + 1,
+ persistent_count = PCount1,
+ ram_msg_count = RamMsgCount + 1,
+ unconfirmed = Unconfirmed1 }}.
maybe_write_msg_to_disk(_Force, MsgStatus = #msg_status {
msg_on_disk = true }, MSCState) ->
@@ -1159,7 +1158,7 @@ remove_pending_ack(KeepPersistent,
ack(_MsgStoreFun, _Fun, [], State) ->
{State, []};
-ack(MsgStoreFun, Fun, AckTags, State = #vqstate { pending_ack = PendAck }) ->
+ack(MsgStoreFun, Fun, AckTags, State) ->
{{SeqIds, GuidsByStore}, State1 = #vqstate { index_state = IndexState,
persistent_count = PCount }} =
lists:foldl(
@@ -1179,7 +1178,7 @@ ack(MsgStoreFun, Fun, AckTags, State = #vqstate { pending_ack = PendAck }) ->
PCount1 = PCount - find_persistent_count(sum_guids_by_store_to_len(
orddict:new(), GuidsByStore)),
{State2 #vqstate { index_state = IndexState1,
- persistent_count = PCount1 }, AckdGuids}.
+ persistent_count = PCount1 }, AckdGuids}.
accumulate_ack(_SeqId, #msg_status { is_persistent = false, %% ASSERTIONS
msg_on_disk = false,
@@ -1211,34 +1210,29 @@ msgs_confirmed(GuidSet, State) ->
msgs_written_to_disk(QPid, Guids) ->
rabbit_amqqueue:maybe_run_queue_via_backing_queue_async(
- QPid,
- fun(State = #vqstate { msgs_on_disk = MOD,
- msg_indices_on_disk = MIOD,
- unconfirmed = UC }) ->
- GuidSet = gb_sets:from_list(Guids),
- msgs_confirmed(
- gb_sets:intersection(GuidSet, MIOD),
- State #vqstate {
- msgs_on_disk =
- gb_sets:intersection(
- gb_sets:union(MOD, GuidSet), UC) })
- end).
+ QPid, fun(State = #vqstate { msgs_on_disk = MOD,
+ msg_indices_on_disk = MIOD,
+ unconfirmed = UC }) ->
+ GuidSet = gb_sets:from_list(Guids),
+ msgs_confirmed(gb_sets:intersection(GuidSet, MIOD),
+ State #vqstate {
+ msgs_on_disk =
+ gb_sets:intersection(
+ gb_sets:union(MOD, GuidSet), UC) })
+ end).
msg_indices_written_to_disk(QPid, Guids) ->
rabbit_amqqueue:maybe_run_queue_via_backing_queue_async(
- QPid,
- fun(State = #vqstate { msgs_on_disk = MOD,
- msg_indices_on_disk = MIOD,
- unconfirmed = UC }) ->
- GuidSet = gb_sets:from_list(Guids),
- msgs_confirmed(
- gb_sets:intersection(GuidSet, MOD),
- State #vqstate {
- msg_indices_on_disk =
- gb_sets:intersection(
- gb_sets:union(MIOD, GuidSet), UC) })
- end).
-
+ QPid, fun(State = #vqstate { msgs_on_disk = MOD,
+ msg_indices_on_disk = MIOD,
+ unconfirmed = UC }) ->
+ GuidSet = gb_sets:from_list(Guids),
+ msgs_confirmed(gb_sets:intersection(GuidSet, MOD),
+ State #vqstate {
+ msg_indices_on_disk =
+ gb_sets:intersection(
+ gb_sets:union(MIOD, GuidSet), UC) })
+ end).
%%----------------------------------------------------------------------------
%% Phase changes