summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlexandru Scvortov <alexandru@rabbitmq.com>2010-09-29 13:52:25 +0100
committerAlexandru Scvortov <alexandru@rabbitmq.com>2010-09-29 13:52:25 +0100
commitd3eef85fe6a0f7adbaabf73d397275c712025e8a (patch)
treec9613228a826a7e8b782cfc9bbcfc0de1214cbe7
parentaabf6f4b440504532a1e68b3a727bafe2550722d (diff)
downloadrabbitmq-server-git-d3eef85fe6a0f7adbaabf73d397275c712025e8a.tar.gz
refactor
-rw-r--r--include/rabbit_backing_queue_spec.hrl6
-rw-r--r--src/rabbit_amqqueue.erl18
-rw-r--r--src/rabbit_amqqueue_process.erl48
-rw-r--r--src/rabbit_variable_queue.erl3
4 files changed, 41 insertions, 34 deletions
diff --git a/include/rabbit_backing_queue_spec.hrl b/include/rabbit_backing_queue_spec.hrl
index c01e924688..8addfb9b76 100644
--- a/include/rabbit_backing_queue_spec.hrl
+++ b/include/rabbit_backing_queue_spec.hrl
@@ -36,6 +36,7 @@
-type(attempt_recovery() :: boolean()).
-type(purged_msg_count() :: non_neg_integer()).
-type(ack_required() :: boolean()).
+-type(confirm_required() :: boolean()).
-spec(start/1 :: ([rabbit_amqqueue:name()]) -> 'ok').
-spec(stop/0 :: () -> 'ok').
@@ -43,9 +44,10 @@
-spec(terminate/1 :: (state()) -> state()).
-spec(delete_and_terminate/1 :: (state()) -> state()).
-spec(purge/1 :: (state()) -> {purged_msg_count(), state()}).
--spec(publish/3 :: (rabbit_types:basic_message(), boolean(), state()) -> state()).
+-spec(publish/3 ::
+ (rabbit_types:basic_message(), confirm_required(), state()) -> state()).
-spec(publish_delivered/4 ::
- (ack_required(), rabbit_types:basic_message(), boolean(), state())
+ (ack_required(), rabbit_types:basic_message(), confirm_required(), state())
-> {ack(), state()}).
-spec(fetch/2 :: (ack_required(), state()) -> {fetch_result(), state()}).
-spec(ack/2 :: ([ack()], state()) -> state()).
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index d756fcb9f0..c7d63d0834 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -370,19 +370,13 @@ delete(#amqqueue{ pid = QPid }, IfUnused, IfEmpty) ->
purge(#amqqueue{ pid = QPid }) -> delegate_call(QPid, purge, infinity).
-deliver(QPid, #delivery{immediate = true,
- txn = Txn, sender = ChPid, message = Message,
- msg_seq_no = MsgSeqNo}) ->
- gen_server2:call(QPid, {deliver_immediately, Txn, Message, MsgSeqNo, ChPid},
- infinity);
-deliver(QPid, #delivery{mandatory = true,
- txn = Txn, sender = ChPid, message = Message,
- msg_seq_no = MsgSeqNo}) ->
- gen_server2:call(QPid, {deliver, Txn, Message, MsgSeqNo, ChPid}, infinity),
+deliver(QPid, Delivery = #delivery{immediate = true}) ->
+ gen_server2:call(QPid, {deliver_immediately, Delivery}, infinity);
+deliver(QPid, Delivery = #delivery{mandatory = true}) ->
+ gen_server2:call(QPid, {deliver, Delivery}, infinity),
true;
-deliver(QPid, #delivery{txn = Txn, sender = ChPid, message = Message,
- msg_seq_no = MsgSeqNo}) ->
- gen_server2:cast(QPid, {deliver, Txn, Message, MsgSeqNo, ChPid}),
+deliver(QPid, Delivery) ->
+ gen_server2:cast(QPid, {deliver, Delivery}),
true.
requeue(QPid, MsgIds, ChPid) ->
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 824c4401f0..0fc7ee35e4 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -404,7 +404,7 @@ confirm_messages_internal(Guids, State) when is_list(Guids) ->
confirm_message_internal(Guid, State0)
end, State, Guids).
-confirm_message_internal(Guid, State = #q { guid_to_channel = GTC }) ->
+confirm_message_internal(Guid, State = #q{guid_to_channel = GTC}) ->
case dict:find(Guid, GTC) of
{ok, {_ , undefined}} -> ok;
{ok, {ChPid, MsgSeqNo}} -> rabbit_channel:confirm(ChPid, MsgSeqNo);
@@ -412,11 +412,11 @@ confirm_message_internal(Guid, State = #q { guid_to_channel = GTC }) ->
end,
State #q { guid_to_channel = dict:erase(Guid, GTC) }.
-maybe_record_confirm_message(undefined, _, _, State) ->
+maybe_record_confirm_message(#delivery{msg_seq_no = undefined }, State) ->
State;
-maybe_record_confirm_message(MsgSeqNo,
- #basic_message { guid = Guid },
- ChPid, State) ->
+maybe_record_confirm_message(#delivery{sender = ChPid,
+ message = #basic_message{guid = Guid},
+ msg_seq_no = MsgSeqNo}, State) ->
State #q { guid_to_channel =
dict:store(Guid, {ChPid, MsgSeqNo}, State#q.guid_to_channel) }.
@@ -427,7 +427,10 @@ run_message_queue(State = #q{backing_queue = BQ, backing_queue_state = BQS}) ->
{_IsEmpty1, State1} = deliver_msgs_to_consumers(Funs, IsEmpty, State),
State1.
-attempt_delivery(none, _ChPid, Message, MsgSeqNo, State = #q{backing_queue = BQ}) ->
+attempt_delivery(#delivery{txn = none,
+ message = Message,
+ msg_seq_no = MsgSeqNo},
+ State = #q{backing_queue = BQ}) ->
PredFun = fun (IsEmpty, _State) -> not IsEmpty end,
DeliverFun =
fun (AckRequired, false, State1 = #q{backing_queue_state = BQS}) ->
@@ -438,13 +441,20 @@ attempt_delivery(none, _ChPid, Message, MsgSeqNo, State = #q{backing_queue = BQ}
State1#q{backing_queue_state = BQS1}}
end,
deliver_msgs_to_consumers({ PredFun, DeliverFun }, false, State);
-attempt_delivery(Txn, ChPid, Message, _MSN, State = #q{backing_queue = BQ,
- backing_queue_state = BQS}) ->
+attempt_delivery(#delivery{txn = Txn,
+ sender = ChPid,
+ message = Message},
+ State = #q{backing_queue = BQ,
+ backing_queue_state = BQS}) ->
record_current_channel_tx(ChPid, Txn),
{true, State#q{backing_queue_state = BQ:tx_publish(Txn, Message, BQS)}}.
-deliver_or_enqueue(Txn, ChPid, Message, MsgSeqNo, State = #q{backing_queue = BQ}) ->
- case attempt_delivery(Txn, ChPid, Message, MsgSeqNo, State) of
+deliver_or_enqueue(Delivery = #delivery{txn = Txn,
+ sender = ChPid,
+ message = Message,
+ msg_seq_no = MsgSeqNo},
+ State = #q{backing_queue = BQ}) ->
+ case attempt_delivery(Delivery, State) of
{true, NewState} ->
{true, NewState};
{false, NewState} ->
@@ -690,7 +700,7 @@ handle_call(consumers, _From,
[{ChPid, ConsumerTag, AckRequired} | Acc]
end, [], queue:join(ActiveConsumers, BlockedConsumers)), State);
-handle_call({deliver_immediately, Txn, Message, MsgSeqNo, ChPid}, _From, State) ->
+handle_call({deliver_immediately, Delivery}, _From, State) ->
%% Synchronous, "immediate" delivery mode
%%
%% FIXME: Is this correct semantics?
@@ -704,14 +714,14 @@ handle_call({deliver_immediately, Txn, Message, MsgSeqNo, ChPid}, _From, State)
%% just all ready-to-consume queues get the message, with unready
%% queues discarding the message?
%%
- State1 = maybe_record_confirm_message(MsgSeqNo, Message, ChPid, State),
- {Delivered, State2} = attempt_delivery(Txn, ChPid, Message, MsgSeqNo, State1),
+ State1 = maybe_record_confirm_message(Delivery, State),
+ {Delivered, State2} = attempt_delivery(Delivery, State1),
reply(Delivered, State2);
-handle_call({deliver, Txn, Message, MsgSeqNo, ChPid}, _From, State) ->
+handle_call({deliver, Delivery}, _From, State) ->
%% Synchronous, "mandatory" delivery mode
- State1 = maybe_record_confirm_message(MsgSeqNo, Message, ChPid, State),
- {Delivered, State2} = deliver_or_enqueue(Txn, ChPid, Message, MsgSeqNo, State1),
+ State1 = maybe_record_confirm_message(Delivery, State),
+ {Delivered, State2} = deliver_or_enqueue(Delivery, State1),
reply(Delivered, State2);
handle_call({commit, Txn, ChPid}, From, State) ->
@@ -868,10 +878,10 @@ handle_call({maybe_run_queue_via_backing_queue, Fun}, _From, State) ->
reply(ok, maybe_run_queue_via_backing_queue(Fun, State)).
-handle_cast({deliver, Txn, Message, MsgSeqNo, ChPid}, State) ->
+handle_cast({deliver, Delivery}, State) ->
%% Asynchronous, non-"mandatory", non-"immediate" deliver mode.
- State1 = maybe_record_confirm_message(MsgSeqNo, Message, ChPid, State),
- {_Delivered, State2} = deliver_or_enqueue(Txn, ChPid, Message, MsgSeqNo, State1),
+ State1 = maybe_record_confirm_message(Delivery, State),
+ {_Delivered, State2} = deliver_or_enqueue(Delivery, State1),
noreply(State2);
handle_cast({ack, Txn, AckTags, ChPid},
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index c763fe4d84..e6a9387106 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -1164,7 +1164,8 @@ ack(MsgStoreFun, Fun, AckTags, State) ->
ok = orddict:fold(fun (MsgStore, Guids, ok) ->
MsgStoreFun(MsgStore, Guids)
end, ok, GuidsByStore),
- State2 = msgs_confirmed(gb_sets:from_list(seqids_to_guids(AckTags, State1)),
+ %% the AckTags were removed from State1, so use State in seqids_to_guids
+ State2 = msgs_confirmed(gb_sets:from_list(seqids_to_guids(AckTags, State)),
State1),
PCount1 = PCount - case orddict:find(?PERSISTENT_MSG_STORE, GuidsByStore) of
error -> 0;