diff options
| author | Matthias Radestock <matthias@rabbitmq.com> | 2010-10-05 09:36:25 +0100 |
|---|---|---|
| committer | Matthias Radestock <matthias@rabbitmq.com> | 2010-10-05 09:36:25 +0100 |
| commit | 6c39deac72d788f09f27628b840169e1e106675d (patch) | |
| tree | dbd721029767d9cf3487796f634f9632052c4c89 /src | |
| parent | 7f60fde50a2ad16a7c512063c18baba2af2b9ad5 (diff) | |
| download | rabbitmq-server-git-6c39deac72d788f09f27628b840169e1e106675d.tar.gz | |
cosmetics and minor refactoring
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 58 | ||||
| -rw-r--r-- | src/rabbit_basic.erl | 4 | ||||
| -rw-r--r-- | src/rabbit_channel.erl | 42 | ||||
| -rw-r--r-- | src/rabbit_msg_store.erl | 26 | ||||
| -rw-r--r-- | src/rabbit_queue_index.erl | 6 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 68 |
6 files changed, 101 insertions, 103 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index d0d971ac83..9abe9069d9 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -345,10 +345,10 @@ deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun}, FunAcc, {QName, self(), AckTag, IsDelivered, Message}), {State2, ChAckTags1} = case AckRequired of - true -> {State1, sets:add_element(AckTag, ChAckTags)}; - false -> {confirm_message( - Message#basic_message.guid, - State1), ChAckTags} + true -> {State1, + sets:add_element(AckTag, ChAckTags)}; + false -> {confirm_message(Message, State1), + ChAckTags} end, NewC = C#cr{unsent_message_count = Count + 1, acktags = ChAckTags1}, @@ -399,12 +399,10 @@ deliver_from_queue_deliver(AckRequired, false, {{Message, IsDelivered, AckTag}, 0 == Remaining, State#q{backing_queue_state = BQS1}}. -confirm_messages(Guids, State) when is_list(Guids) -> - lists:foldl(fun(Guid, State0) -> - confirm_message(Guid, State0) - end, State, Guids). +confirm_messages(Guids, State) -> + lists:foldl(fun confirm_message_by_guid/2, State, Guids). -confirm_message(Guid, State = #q{guid_to_channel = GTC}) -> +confirm_message_by_guid(Guid, State = #q{guid_to_channel = GTC}) -> case dict:find(Guid, GTC) of {ok, {_ , undefined}} -> ok; {ok, {ChPid, MsgSeqNo}} -> rabbit_channel:confirm(ChPid, MsgSeqNo); @@ -412,6 +410,9 @@ confirm_message(Guid, State = #q{guid_to_channel = GTC}) -> end, State#q{guid_to_channel = dict:erase(Guid, GTC)}. +confirm_message(#basic_message{guid = Guid}, State) -> + confirm_message_by_guid(Guid, State). + record_confirm_message(#delivery{msg_seq_no = undefined}, State) -> State; record_confirm_message(#delivery{msg_seq_no = MsgSeqNo, @@ -454,14 +455,14 @@ attempt_delivery(#delivery{txn = Txn, record_current_channel_tx(ChPid, Txn), {true, State#q{backing_queue_state = BQ:tx_publish(Txn, Message, BQS)}}. -deliver_or_enqueue(Delivery = #delivery{message = Message, - msg_seq_no = MsgSeqNo}, - State = #q{backing_queue = BQ}) -> +deliver_or_enqueue(Delivery, State) -> case attempt_delivery(Delivery, record_confirm_message(Delivery, State)) of - {true, NewState} -> {true, NewState}; - {false, NewState} -> BQS = BQ:publish(Message, MsgSeqNo =/= undefined, - NewState#q.backing_queue_state), - {false, NewState#q{backing_queue_state = BQS}} + {true, State1} -> + {true, State1}; + {false, State1 = #q{backing_queue = BQ, backing_queue_state = BQS}} -> + #delivery{message = Message, msg_seq_no = MsgSeqNo} = Delivery, + BQS1 = BQ:publish(Message, MsgSeqNo =/= undefined, BQS), + {false, State1#q{backing_queue_state = BQS1}} end. requeue_and_run(AckTags, State = #q{backing_queue = BQ}) -> @@ -698,7 +699,7 @@ handle_call(consumers, _From, [{ChPid, ConsumerTag, AckRequired} | Acc] end, [], queue:join(ActiveConsumers, BlockedConsumers)), State); -handle_call({deliver_immediately, Delivery = #delivery{message = Msg}}, +handle_call({deliver_immediately, Delivery = #delivery{message = Message}}, _From, State) -> %% Synchronous, "immediate" delivery mode %% @@ -713,11 +714,11 @@ handle_call({deliver_immediately, Delivery = #delivery{message = Msg}}, %% just all ready-to-consume queues get the message, with unready %% queues discarding the message? %% - {Delivered, State1} = attempt_delivery(Delivery, - record_confirm_message(Delivery, State)), + {Delivered, State1} = + attempt_delivery(Delivery, record_confirm_message(Delivery, State)), reply(Delivered, case Delivered of true -> State1; - false -> confirm_message(Msg#basic_message.guid, State1) + false -> confirm_message(Message, State1) end); handle_call({deliver, Delivery}, _From, State) -> @@ -748,15 +749,14 @@ handle_call({basic_get, ChPid, NoAck}, _From, case BQ:fetch(AckRequired, BQS) of {empty, BQS1} -> reply(empty, State1#q{backing_queue_state = BQS1}); {{Message, IsDelivered, AckTag, Remaining}, BQS1} -> - State2 = case AckRequired of - true -> - C = #cr{acktags = ChAckTags} = ch_record(ChPid), - store_ch_record( - C#cr{acktags = sets:add_element(AckTag, ChAckTags)}), - State1; - false -> - confirm_message(Message#basic_message.guid, State1) - end, + State2 = + case AckRequired of + true -> C = #cr{acktags = ChAckTags} = ch_record(ChPid), + ChAckTags1 = sets:add_element(AckTag, ChAckTags), + store_ch_record(C#cr{acktags = ChAckTags1}), + State1; + false -> confirm_message(Message, State1) + end, Msg = {QName, self(), AckTag, IsDelivered, Message}, reply({ok, Remaining, Msg}, State2#q{backing_queue_state = BQS1}) end; diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl index 348310d9b9..8ca98f6e9f 100644 --- a/src/rabbit_basic.erl +++ b/src/rabbit_basic.erl @@ -52,8 +52,8 @@ (rabbit_types:delivery()) -> publish_result()). -spec(delivery/5 :: (boolean(), boolean(), rabbit_types:maybe(rabbit_types:txn()), - rabbit_types:message(), undefined | integer()) - -> rabiit_types:delivery()). + rabbit_types:message(), undefined | integer()) -> + rabbit_types:delivery()). -spec(message/4 :: (rabbit_exchange:name(), rabbit_router:routing_key(), properties_input(), binary()) diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index d641e824c7..7d9f80644b 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -291,17 +291,14 @@ handle_cast({confirm, MsgSeqNo}, State) -> handle_info({'DOWN', _MRef, process, QPid, _Reason}, State = #ch{qpid_to_msgs = QTM}) -> - State1 = case dict:find(QPid, QTM) of - {ok, Msgs} -> - S = gb_sets:fold(fun (MsgSeqNo, State0) -> - send_or_enqueue_ack(MsgSeqNo, State0) - end, State, Msgs), - S #ch{qpid_to_msgs = dict:erase(QPid, QTM)}; - error -> - State + State2 = case dict:find(QPid, QTM) of + {ok, Msgs} -> State1 = gb_sets:fold(fun send_or_enqueue_ack/2, + State, Msgs), + State1 #ch{qpid_to_msgs = dict:erase(QPid, QTM)}; + error -> State end, erase_queue_stats(QPid), - {noreply, queue_blocked(QPid, State1)}. + {noreply, queue_blocked(QPid, State2)}. handle_pre_hibernate(State = #ch{writer_pid = WriterPid, held_confirms = As, @@ -452,18 +449,20 @@ send_or_enqueue_ack(undefined, State) -> send_or_enqueue_ack(_, State = #ch{confirm_enabled = false}) -> State; send_or_enqueue_ack(MsgSeqNo, State = #ch{confirm_multiple = false}) -> - do_if_unconfirmed(MsgSeqNo, State, - fun(MSN, State1 = #ch{writer_pid = WriterPid}) -> - ok = rabbit_writer:send_command( - WriterPid, #'basic.ack'{delivery_tag = MSN}), - State1 - end); + do_if_unconfirmed( + MsgSeqNo, State, + fun(MSN, State1 = #ch{writer_pid = WriterPid}) -> + ok = rabbit_writer:send_command( + WriterPid, #'basic.ack'{delivery_tag = MSN}), + State1 + end); send_or_enqueue_ack(MsgSeqNo, State = #ch{confirm_multiple = true}) -> - do_if_unconfirmed(MsgSeqNo, State, - fun(MSN, State1 = #ch{held_confirms = As}) -> - start_ack_timer(State1#ch{held_confirms = - gb_sets:add(MSN, As)}) - end). + do_if_unconfirmed( + MsgSeqNo, State, + fun(MSN, State1 = #ch{held_confirms = As}) -> + start_ack_timer(State1#ch{held_confirms = + gb_sets:add(MSN, As)}) + end). msg_sent_to_queue(undefined, _QPid, State) -> State; @@ -473,7 +472,8 @@ msg_sent_to_queue(MsgSeqNo, QPid, State = #ch{qpid_to_msgs = QTM}) -> error -> erlang:monitor(process, QPid), gb_sets:new() end, - State#ch{qpid_to_msgs = dict:store(QPid, gb_sets:add(MsgSeqNo, Msgs1), QTM)}. + QTM1 = dict:store(QPid, gb_sets:add(MsgSeqNo, Msgs1), QTM), + State#ch{qpid_to_msgs = QTM1}. do_if_unconfirmed(MsgSeqNo, State = #ch{unconfirmed = UC}, Fun) -> case gb_sets:is_element(MsgSeqNo, UC) of diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index 17f8fb1d7e..c2e74a2312 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -128,9 +128,8 @@ -spec(start_link/4 :: (atom(), file:filename(), [binary()] | 'undefined', startup_fun_state()) -> rabbit_types:ok_pid_or_error()). --spec(write/4 :: (server(), rabbit_guid:guid(), - msg(), client_msstate()) - -> rabbit_types:ok(client_msstate())). +-spec(write/4 :: (server(), rabbit_guid:guid(), msg(), client_msstate()) -> + rabbit_types:ok(client_msstate())). -spec(read/3 :: (server(), rabbit_guid:guid(), client_msstate()) -> {rabbit_types:ok(msg()) | 'not_found', client_msstate()}). -spec(contains/2 :: (server(), rabbit_guid:guid()) -> boolean()). @@ -140,7 +139,8 @@ -spec(gc_done/4 :: (server(), non_neg_integer(), file_num(), file_num()) -> 'ok'). -spec(set_maximum_since_use/2 :: (server(), non_neg_integer()) -> 'ok'). --spec(client_init/3 :: (server(), rabbit_guid:guid(), guid_fun()) -> client_msstate()). +-spec(client_init/3 :: (server(), rabbit_guid:guid(), guid_fun()) -> + client_msstate()). -spec(client_terminate/2 :: (client_msstate(), server()) -> 'ok'). -spec(client_delete_and_terminate/3 :: (client_msstate(), server(), rabbit_guid:guid()) -> 'ok'). @@ -363,7 +363,8 @@ set_maximum_since_use(Server, Age) -> client_init(Server, Ref, MsgOnDiskFun) -> {IState, IModule, Dir, GCPid, FileHandlesEts, FileSummaryEts, DedupCacheEts, CurFileCacheEts} = - gen_server2:call(Server, {new_client_state, Ref, MsgOnDiskFun}, infinity), + gen_server2:call(Server, {new_client_state, Ref, MsgOnDiskFun}, + infinity), #client_msstate { file_handle_cache = dict:new(), index_state = IState, index_module = IModule, @@ -639,7 +640,8 @@ handle_call({new_client_state, CRef, Callback}, _From, handle_call(successfully_recovered_state, _From, State) -> reply(State #msstate.successfully_recovered, State); -handle_call({client_terminate, #client_msstate { client_ref = CRef }}, _From, State) -> +handle_call({client_terminate, #client_msstate { client_ref = CRef }}, _From, + State) -> reply(ok, clear_client_callback(CRef, State)). handle_cast({write, CRef, Guid}, @@ -690,7 +692,8 @@ handle_cast({write, CRef, Guid}, {#msg_location.ref_count, RefCount + 1}, State), CTG1 = case {dict:find(CRef, CODC), File =:= CurFile} of - {{ok, _} , true} -> rabbit_misc:dict_cons(CRef, Guid, CTG); + {{ok, _} , true} -> rabbit_misc:dict_cons(CRef, Guid, + CTG); {{ok, Fun}, false} -> Fun([Guid]), CTG; _ -> CTG end, @@ -763,9 +766,11 @@ handle_cast({set_maximum_since_use, Age}, State) -> ok = file_handle_cache:set_maximum_since_use(Age), noreply(State); -handle_cast({client_delete, CRef}, State = #msstate { client_refs = ClientRefs }) -> +handle_cast({client_delete, CRef}, + State = #msstate { client_refs = ClientRefs }) -> State1 = clear_client_callback(CRef, State), - noreply(State1 #msstate { client_refs = sets:del_element(CRef, ClientRefs) }). + noreply(State1 #msstate { + client_refs = sets:del_element(CRef, ClientRefs) }). handle_info(timeout, State) -> noreply(internal_sync(State)); @@ -854,8 +859,7 @@ internal_sync(State = #msstate { current_file_handle = CurHdl, end, lists:foreach(fun (K) -> K() end, lists:reverse(Syncs)), [(dict:fetch(CRef, CODC))(Guids) || {CRef, Guids} <- CGs], - State1 #msstate { cref_to_guids = dict:new(), - on_sync = [] }. + State1 #msstate { cref_to_guids = dict:new(), on_sync = [] }. read_message(Guid, From, diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index 81b6daf9c3..6f19633664 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -196,11 +196,11 @@ }). -type(startup_fun_state() :: {fun ((A) -> 'finished' | {rabbit_guid:guid(), non_neg_integer(), A}), - A}). + A}). -spec(init/5 :: (rabbit_amqqueue:name(), boolean(), boolean(), - fun ((rabbit_guid:guid()) -> boolean()), on_sync_fun()) - -> {'undefined' | non_neg_integer(), [any()], qistate()}). + fun ((rabbit_guid:guid()) -> boolean()), on_sync_fun()) -> + {'undefined' | non_neg_integer(), [any()], qistate()}). -spec(terminate/2 :: ([any()], qistate()) -> qistate()). -spec(delete_and_terminate/1 :: (qistate()) -> qistate()). -spec(publish/4 :: (rabbit_guid:guid(), seq_id(), boolean(), qistate()) -> diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 1d6b324d3a..b07ee279df 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -990,7 +990,8 @@ tx_commit_index(State = #vqstate { on_sync = #sync { fun (Msg = #basic_message { is_persistent = IsPersistent }, {SeqIdsAcc, State2}) -> IsPersistent1 = IsDurable andalso IsPersistent, - {SeqId, State3} = publish(Msg, false, IsPersistent1, false, State2), + {SeqId, State3} = publish(Msg, false, IsPersistent1, false, + State2), {cons_if(IsPersistent1, SeqId, SeqIdsAcc), State3} end, {PAcks, NewState}, Pubs), IndexState1 = rabbit_queue_index:sync(SeqIds, IndexState), @@ -1046,8 +1047,7 @@ sum_guids_by_store_to_len(LensByStore, GuidsByStore) -> %% Internal gubbins for publishing %%---------------------------------------------------------------------------- -publish(Msg = #basic_message { is_persistent = IsPersistent, - guid = Guid }, +publish(Msg = #basic_message { is_persistent = IsPersistent, guid = Guid }, IsDelivered, MsgOnDisk, NeedsConfirming, State = #vqstate { q1 = Q1, q3 = Q3, q4 = Q4, next_seq_id = SeqId, @@ -1067,13 +1067,12 @@ publish(Msg = #basic_message { is_persistent = IsPersistent, end, PCount1 = PCount + one_if(IsPersistent1), Unconfirmed1 = gb_sets_maybe_insert(NeedsConfirming, Guid, Unconfirmed), - {SeqId, State2 #vqstate { - next_seq_id = SeqId + 1, - len = Len + 1, - in_counter = InCount + 1, - persistent_count = PCount1, - ram_msg_count = RamMsgCount + 1, - unconfirmed = Unconfirmed1 }}. + {SeqId, State2 #vqstate { next_seq_id = SeqId + 1, + len = Len + 1, + in_counter = InCount + 1, + persistent_count = PCount1, + ram_msg_count = RamMsgCount + 1, + unconfirmed = Unconfirmed1 }}. maybe_write_msg_to_disk(_Force, MsgStatus = #msg_status { msg_on_disk = true }, MSCState) -> @@ -1159,7 +1158,7 @@ remove_pending_ack(KeepPersistent, ack(_MsgStoreFun, _Fun, [], State) -> {State, []}; -ack(MsgStoreFun, Fun, AckTags, State = #vqstate { pending_ack = PendAck }) -> +ack(MsgStoreFun, Fun, AckTags, State) -> {{SeqIds, GuidsByStore}, State1 = #vqstate { index_state = IndexState, persistent_count = PCount }} = lists:foldl( @@ -1179,7 +1178,7 @@ ack(MsgStoreFun, Fun, AckTags, State = #vqstate { pending_ack = PendAck }) -> PCount1 = PCount - find_persistent_count(sum_guids_by_store_to_len( orddict:new(), GuidsByStore)), {State2 #vqstate { index_state = IndexState1, - persistent_count = PCount1 }, AckdGuids}. + persistent_count = PCount1 }, AckdGuids}. accumulate_ack(_SeqId, #msg_status { is_persistent = false, %% ASSERTIONS msg_on_disk = false, @@ -1211,34 +1210,29 @@ msgs_confirmed(GuidSet, State) -> msgs_written_to_disk(QPid, Guids) -> rabbit_amqqueue:maybe_run_queue_via_backing_queue_async( - QPid, - fun(State = #vqstate { msgs_on_disk = MOD, - msg_indices_on_disk = MIOD, - unconfirmed = UC }) -> - GuidSet = gb_sets:from_list(Guids), - msgs_confirmed( - gb_sets:intersection(GuidSet, MIOD), - State #vqstate { - msgs_on_disk = - gb_sets:intersection( - gb_sets:union(MOD, GuidSet), UC) }) - end). + QPid, fun(State = #vqstate { msgs_on_disk = MOD, + msg_indices_on_disk = MIOD, + unconfirmed = UC }) -> + GuidSet = gb_sets:from_list(Guids), + msgs_confirmed(gb_sets:intersection(GuidSet, MIOD), + State #vqstate { + msgs_on_disk = + gb_sets:intersection( + gb_sets:union(MOD, GuidSet), UC) }) + end). msg_indices_written_to_disk(QPid, Guids) -> rabbit_amqqueue:maybe_run_queue_via_backing_queue_async( - QPid, - fun(State = #vqstate { msgs_on_disk = MOD, - msg_indices_on_disk = MIOD, - unconfirmed = UC }) -> - GuidSet = gb_sets:from_list(Guids), - msgs_confirmed( - gb_sets:intersection(GuidSet, MOD), - State #vqstate { - msg_indices_on_disk = - gb_sets:intersection( - gb_sets:union(MIOD, GuidSet), UC) }) - end). - + QPid, fun(State = #vqstate { msgs_on_disk = MOD, + msg_indices_on_disk = MIOD, + unconfirmed = UC }) -> + GuidSet = gb_sets:from_list(Guids), + msgs_confirmed(gb_sets:intersection(GuidSet, MOD), + State #vqstate { + msg_indices_on_disk = + gb_sets:intersection( + gb_sets:union(MIOD, GuidSet), UC) }) + end). %%---------------------------------------------------------------------------- %% Phase changes |
