diff options
| author | Alexandru Scvortov <alexandru@rabbitmq.com> | 2010-09-29 13:52:25 +0100 |
|---|---|---|
| committer | Alexandru Scvortov <alexandru@rabbitmq.com> | 2010-09-29 13:52:25 +0100 |
| commit | d3eef85fe6a0f7adbaabf73d397275c712025e8a (patch) | |
| tree | c9613228a826a7e8b782cfc9bbcfc0de1214cbe7 | |
| parent | aabf6f4b440504532a1e68b3a727bafe2550722d (diff) | |
| download | rabbitmq-server-git-d3eef85fe6a0f7adbaabf73d397275c712025e8a.tar.gz | |
refactor
| -rw-r--r-- | include/rabbit_backing_queue_spec.hrl | 6 | ||||
| -rw-r--r-- | src/rabbit_amqqueue.erl | 18 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 48 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 3 |
4 files changed, 41 insertions, 34 deletions
diff --git a/include/rabbit_backing_queue_spec.hrl b/include/rabbit_backing_queue_spec.hrl index c01e924688..8addfb9b76 100644 --- a/include/rabbit_backing_queue_spec.hrl +++ b/include/rabbit_backing_queue_spec.hrl @@ -36,6 +36,7 @@ -type(attempt_recovery() :: boolean()). -type(purged_msg_count() :: non_neg_integer()). -type(ack_required() :: boolean()). +-type(confirm_required() :: boolean()). -spec(start/1 :: ([rabbit_amqqueue:name()]) -> 'ok'). -spec(stop/0 :: () -> 'ok'). @@ -43,9 +44,10 @@ -spec(terminate/1 :: (state()) -> state()). -spec(delete_and_terminate/1 :: (state()) -> state()). -spec(purge/1 :: (state()) -> {purged_msg_count(), state()}). --spec(publish/3 :: (rabbit_types:basic_message(), boolean(), state()) -> state()). +-spec(publish/3 :: + (rabbit_types:basic_message(), confirm_required(), state()) -> state()). -spec(publish_delivered/4 :: - (ack_required(), rabbit_types:basic_message(), boolean(), state()) + (ack_required(), rabbit_types:basic_message(), confirm_required(), state()) -> {ack(), state()}). -spec(fetch/2 :: (ack_required(), state()) -> {fetch_result(), state()}). -spec(ack/2 :: ([ack()], state()) -> state()). diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index d756fcb9f0..c7d63d0834 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -370,19 +370,13 @@ delete(#amqqueue{ pid = QPid }, IfUnused, IfEmpty) -> purge(#amqqueue{ pid = QPid }) -> delegate_call(QPid, purge, infinity). -deliver(QPid, #delivery{immediate = true, - txn = Txn, sender = ChPid, message = Message, - msg_seq_no = MsgSeqNo}) -> - gen_server2:call(QPid, {deliver_immediately, Txn, Message, MsgSeqNo, ChPid}, - infinity); -deliver(QPid, #delivery{mandatory = true, - txn = Txn, sender = ChPid, message = Message, - msg_seq_no = MsgSeqNo}) -> - gen_server2:call(QPid, {deliver, Txn, Message, MsgSeqNo, ChPid}, infinity), +deliver(QPid, Delivery = #delivery{immediate = true}) -> + gen_server2:call(QPid, {deliver_immediately, Delivery}, infinity); +deliver(QPid, Delivery = #delivery{mandatory = true}) -> + gen_server2:call(QPid, {deliver, Delivery}, infinity), true; -deliver(QPid, #delivery{txn = Txn, sender = ChPid, message = Message, - msg_seq_no = MsgSeqNo}) -> - gen_server2:cast(QPid, {deliver, Txn, Message, MsgSeqNo, ChPid}), +deliver(QPid, Delivery) -> + gen_server2:cast(QPid, {deliver, Delivery}), true. requeue(QPid, MsgIds, ChPid) -> diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 824c4401f0..0fc7ee35e4 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -404,7 +404,7 @@ confirm_messages_internal(Guids, State) when is_list(Guids) -> confirm_message_internal(Guid, State0) end, State, Guids). -confirm_message_internal(Guid, State = #q { guid_to_channel = GTC }) -> +confirm_message_internal(Guid, State = #q{guid_to_channel = GTC}) -> case dict:find(Guid, GTC) of {ok, {_ , undefined}} -> ok; {ok, {ChPid, MsgSeqNo}} -> rabbit_channel:confirm(ChPid, MsgSeqNo); @@ -412,11 +412,11 @@ confirm_message_internal(Guid, State = #q { guid_to_channel = GTC }) -> end, State #q { guid_to_channel = dict:erase(Guid, GTC) }. -maybe_record_confirm_message(undefined, _, _, State) -> +maybe_record_confirm_message(#delivery{msg_seq_no = undefined }, State) -> State; -maybe_record_confirm_message(MsgSeqNo, - #basic_message { guid = Guid }, - ChPid, State) -> +maybe_record_confirm_message(#delivery{sender = ChPid, + message = #basic_message{guid = Guid}, + msg_seq_no = MsgSeqNo}, State) -> State #q { guid_to_channel = dict:store(Guid, {ChPid, MsgSeqNo}, State#q.guid_to_channel) }. @@ -427,7 +427,10 @@ run_message_queue(State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> {_IsEmpty1, State1} = deliver_msgs_to_consumers(Funs, IsEmpty, State), State1. -attempt_delivery(none, _ChPid, Message, MsgSeqNo, State = #q{backing_queue = BQ}) -> +attempt_delivery(#delivery{txn = none, + message = Message, + msg_seq_no = MsgSeqNo}, + State = #q{backing_queue = BQ}) -> PredFun = fun (IsEmpty, _State) -> not IsEmpty end, DeliverFun = fun (AckRequired, false, State1 = #q{backing_queue_state = BQS}) -> @@ -438,13 +441,20 @@ attempt_delivery(none, _ChPid, Message, MsgSeqNo, State = #q{backing_queue = BQ} State1#q{backing_queue_state = BQS1}} end, deliver_msgs_to_consumers({ PredFun, DeliverFun }, false, State); -attempt_delivery(Txn, ChPid, Message, _MSN, State = #q{backing_queue = BQ, - backing_queue_state = BQS}) -> +attempt_delivery(#delivery{txn = Txn, + sender = ChPid, + message = Message}, + State = #q{backing_queue = BQ, + backing_queue_state = BQS}) -> record_current_channel_tx(ChPid, Txn), {true, State#q{backing_queue_state = BQ:tx_publish(Txn, Message, BQS)}}. -deliver_or_enqueue(Txn, ChPid, Message, MsgSeqNo, State = #q{backing_queue = BQ}) -> - case attempt_delivery(Txn, ChPid, Message, MsgSeqNo, State) of +deliver_or_enqueue(Delivery = #delivery{txn = Txn, + sender = ChPid, + message = Message, + msg_seq_no = MsgSeqNo}, + State = #q{backing_queue = BQ}) -> + case attempt_delivery(Delivery, State) of {true, NewState} -> {true, NewState}; {false, NewState} -> @@ -690,7 +700,7 @@ handle_call(consumers, _From, [{ChPid, ConsumerTag, AckRequired} | Acc] end, [], queue:join(ActiveConsumers, BlockedConsumers)), State); -handle_call({deliver_immediately, Txn, Message, MsgSeqNo, ChPid}, _From, State) -> +handle_call({deliver_immediately, Delivery}, _From, State) -> %% Synchronous, "immediate" delivery mode %% %% FIXME: Is this correct semantics? @@ -704,14 +714,14 @@ handle_call({deliver_immediately, Txn, Message, MsgSeqNo, ChPid}, _From, State) %% just all ready-to-consume queues get the message, with unready %% queues discarding the message? %% - State1 = maybe_record_confirm_message(MsgSeqNo, Message, ChPid, State), - {Delivered, State2} = attempt_delivery(Txn, ChPid, Message, MsgSeqNo, State1), + State1 = maybe_record_confirm_message(Delivery, State), + {Delivered, State2} = attempt_delivery(Delivery, State1), reply(Delivered, State2); -handle_call({deliver, Txn, Message, MsgSeqNo, ChPid}, _From, State) -> +handle_call({deliver, Delivery}, _From, State) -> %% Synchronous, "mandatory" delivery mode - State1 = maybe_record_confirm_message(MsgSeqNo, Message, ChPid, State), - {Delivered, State2} = deliver_or_enqueue(Txn, ChPid, Message, MsgSeqNo, State1), + State1 = maybe_record_confirm_message(Delivery, State), + {Delivered, State2} = deliver_or_enqueue(Delivery, State1), reply(Delivered, State2); handle_call({commit, Txn, ChPid}, From, State) -> @@ -868,10 +878,10 @@ handle_call({maybe_run_queue_via_backing_queue, Fun}, _From, State) -> reply(ok, maybe_run_queue_via_backing_queue(Fun, State)). -handle_cast({deliver, Txn, Message, MsgSeqNo, ChPid}, State) -> +handle_cast({deliver, Delivery}, State) -> %% Asynchronous, non-"mandatory", non-"immediate" deliver mode. - State1 = maybe_record_confirm_message(MsgSeqNo, Message, ChPid, State), - {_Delivered, State2} = deliver_or_enqueue(Txn, ChPid, Message, MsgSeqNo, State1), + State1 = maybe_record_confirm_message(Delivery, State), + {_Delivered, State2} = deliver_or_enqueue(Delivery, State1), noreply(State2); handle_cast({ack, Txn, AckTags, ChPid}, diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index c763fe4d84..e6a9387106 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -1164,7 +1164,8 @@ ack(MsgStoreFun, Fun, AckTags, State) -> ok = orddict:fold(fun (MsgStore, Guids, ok) -> MsgStoreFun(MsgStore, Guids) end, ok, GuidsByStore), - State2 = msgs_confirmed(gb_sets:from_list(seqids_to_guids(AckTags, State1)), + %% the AckTags were removed from State1, so use State in seqids_to_guids + State2 = msgs_confirmed(gb_sets:from_list(seqids_to_guids(AckTags, State)), State1), PCount1 = PCount - case orddict:find(?PERSISTENT_MSG_STORE, GuidsByStore) of error -> 0; |
