summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlexandru Scvortov <alexandru@rabbitmq.com>2010-08-17 10:45:33 +0100
committerAlexandru Scvortov <alexandru@rabbitmq.com>2010-08-17 10:45:33 +0100
commitcaefab720fc4bffae44f057438ea91debda63393 (patch)
tree69bd4f343784c3324b2ee24ea9ff233d1b0cf995
parentad27f6e8847fb7c01a9c68dd7d6faf220e378c3d (diff)
downloadrabbitmq-server-git-caefab720fc4bffae44f057438ea91debda63393.tar.gz
move msg_seq_no from #basic_message{} to #delivery{}
-rw-r--r--include/rabbit.hrl4
-rw-r--r--src/rabbit_amqqueue.erl15
-rw-r--r--src/rabbit_amqqueue_process.erl55
-rw-r--r--src/rabbit_basic.erl11
-rw-r--r--src/rabbit_channel.erl6
-rw-r--r--src/rabbit_router.erl9
6 files changed, 64 insertions, 36 deletions
diff --git a/include/rabbit.hrl b/include/rabbit.hrl
index e8255346c2..161b9b0b2a 100644
--- a/include/rabbit.hrl
+++ b/include/rabbit.hrl
@@ -66,10 +66,10 @@
-record(listener, {node, protocol, host, port}).
-record(basic_message, {exchange_name, routing_key, content, guid,
- is_persistent, msg_seq_no, origin}).
+ is_persistent}).
-record(ssl_socket, {tcp, ssl}).
--record(delivery, {mandatory, immediate, txn, sender, message}).
+-record(delivery, {mandatory, immediate, txn, sender, message, origin, msg_seq_no}).
-record(amqp_error, {name, explanation, method = none}).
-record(event, {type, props, timestamp}).
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 2453280e34..c25fd9b4cb 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -363,15 +363,18 @@ delete(#amqqueue{ pid = QPid }, IfUnused, IfEmpty) ->
purge(#amqqueue{ pid = QPid }) -> delegate_call(QPid, purge, infinity).
deliver(QPid, #delivery{immediate = true,
- txn = Txn, sender = ChPid, message = Message}) ->
- gen_server2:call(QPid, {deliver_immediately, Txn, Message, ChPid},
+ txn = Txn, sender = ChPid, message = Message,
+ msg_seq_no = MsgSeqNo}) ->
+ gen_server2:call(QPid, {deliver_immediately, Txn, Message, MsgSeqNo, ChPid},
infinity);
deliver(QPid, #delivery{mandatory = true,
- txn = Txn, sender = ChPid, message = Message}) ->
- gen_server2:call(QPid, {deliver, Txn, Message, ChPid}, infinity),
+ txn = Txn, sender = ChPid, message = Message,
+ msg_seq_no = MsgSeqNo}) ->
+ gen_server2:call(QPid, {deliver, Txn, Message, MsgSeqNo, ChPid}, infinity),
true;
-deliver(QPid, #delivery{txn = Txn, sender = ChPid, message = Message}) ->
- gen_server2:cast(QPid, {deliver, Txn, Message, ChPid}),
+deliver(QPid, #delivery{txn = Txn, sender = ChPid, message = Message,
+ msg_seq_no = MsgSeqNo}) ->
+ gen_server2:cast(QPid, {deliver, Txn, Message, MsgSeqNo, ChPid}),
true.
requeue(QPid, MsgIds, ChPid) ->
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index a25ad48b3b..0dbd7f1778 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -60,7 +60,8 @@
sync_timer_ref,
rate_timer_ref,
expiry_timer_ref,
- stats_timer
+ stats_timer,
+ guid_to_channel
}).
-record(consumer, {tag, ack_required}).
@@ -122,7 +123,8 @@ init(Q) ->
sync_timer_ref = undefined,
rate_timer_ref = undefined,
expiry_timer_ref = undefined,
- stats_timer = rabbit_event:init_stats_timer()}, hibernate,
+ stats_timer = rabbit_event:init_stats_timer(),
+ guid_to_channel = dict:new()}, hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
terminate(shutdown, State = #q{backing_queue = BQ}) ->
@@ -345,7 +347,7 @@ deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun }, FunAcc,
ChPid, ConsumerTag, AckRequired,
{QName, self(), AckTag, IsDelivered, Message}),
% PubAck after message delivered to consumer (disregard consumer acks)
- confirm_message(Message),
+ State2 = confirm_message(Message#basic_message.guid, State1),
ChAckTags1 = case AckRequired of
true -> sets:add_element(AckTag, ChAckTags);
false -> ChAckTags
@@ -365,10 +367,10 @@ deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun }, FunAcc,
{ActiveConsumers1,
queue:in(QEntry, BlockedConsumers1)}
end,
- State2 = State1#q{
+ State3 = State2#q{
active_consumers = NewActiveConsumers,
blocked_consumers = NewBlockedConsumers},
- deliver_msgs_to_consumers(Funs, FunAcc1, State2);
+ deliver_msgs_to_consumers(Funs, FunAcc1, State3);
%% if IsMsgReady then we've hit the limiter
false when IsMsgReady ->
store_ch_record(C#cr{is_limit_active = true}),
@@ -399,8 +401,21 @@ deliver_from_queue_deliver(AckRequired, false,
{{Message, IsDelivered, AckTag}, 0 == Remaining,
State #q { backing_queue_state = BQS1 }}.
-confirm_message(#basic_message{msg_seq_no = MsgSeqNo, origin = ChPid}) ->
- rabbit_channel:confirm(ChPid, MsgSeqNo).
+confirm_message(Guid, State = #q{guid_to_channel = GTC}) ->
+ case dict:find(Guid, GTC) of
+ {ok, {ChPid, MsgSeqNo}} ->
+ rabbit_channel:confirm(ChPid, MsgSeqNo),
+ State#q{guid_to_channel = dict:erase(Guid, GTC)};
+ _ ->
+ State
+ end.
+
+maybe_record_confirm_message(undefined, _, _, State) ->
+ State;
+maybe_record_confirm_message(MsgSeqNo, Message, ChPid, State) ->
+ State#q{guid_to_channel =
+ dict:store(Message#basic_message.guid, {ChPid, MsgSeqNo},
+ State#q.guid_to_channel)}.
run_message_queue(State = #q{backing_queue = BQ, backing_queue_state = BQS}) ->
Funs = {fun deliver_from_queue_pred/2,
@@ -635,7 +650,7 @@ handle_call(consumers, _From,
[{ChPid, ConsumerTag, AckRequired} | Acc]
end, [], queue:join(ActiveConsumers, BlockedConsumers)), State);
-handle_call({deliver_immediately, Txn, Message, ChPid}, _From, State) ->
+handle_call({deliver_immediately, Txn, Message, MsgSeqNo, ChPid}, _From, State) ->
%% Synchronous, "immediate" delivery mode
%%
%% FIXME: Is this correct semantics?
@@ -649,13 +664,16 @@ handle_call({deliver_immediately, Txn, Message, ChPid}, _From, State) ->
%% just all ready-to-consume queues get the message, with unready
%% queues discarding the message?
%%
- {Delivered, NewState} = attempt_delivery(Txn, ChPid, Message, State),
- reply(Delivered, NewState);
+ State1 = maybe_record_confirm_message(MsgSeqNo, Message, ChPid, State),
+ {Delivered, State2} = attempt_delivery(Txn, ChPid, Message, State1
+),
+ reply(Delivered, State2);
-handle_call({deliver, Txn, Message, ChPid}, _From, State) ->
+handle_call({deliver, Txn, Message, MsgSeqNo, ChPid}, _From, State) ->
%% Synchronous, "mandatory" delivery mode
- {Delivered, NewState} = deliver_or_enqueue(Txn, ChPid, Message, State),
- reply(Delivered, NewState);
+ State1 = maybe_record_confirm_message(MsgSeqNo, Message, ChPid, State),
+ {Delivered, State2} = deliver_or_enqueue(Txn, ChPid, Message, State1),
+ reply(Delivered, State2);
handle_call({commit, Txn, ChPid}, From, State) ->
NewState = commit_transaction(Txn, From, ChPid, State),
@@ -681,7 +699,7 @@ handle_call({basic_get, ChPid, NoAck}, _From,
{empty, BQS1} -> reply(empty, State1#q{backing_queue_state = BQS1});
{{Message, IsDelivered, AckTag, Remaining}, BQS1} ->
% PubAck after message got
- confirm_message(Message),
+ State2 = confirm_message(Message#basic_message.guid, State1),
case AckRequired of
true -> C = #cr{acktags = ChAckTags} = ch_record(ChPid),
store_ch_record(
@@ -689,7 +707,7 @@ handle_call({basic_get, ChPid, NoAck}, _From,
false -> ok
end,
Msg = {QName, self(), AckTag, IsDelivered, Message},
- reply({ok, Remaining, Msg}, State1#q{backing_queue_state = BQS1})
+ reply({ok, Remaining, Msg}, State2#q{backing_queue_state = BQS1})
end;
handle_call({basic_consume, NoAck, ChPid, LimiterPid,
@@ -798,10 +816,11 @@ handle_call({requeue, AckTags, ChPid}, From, State) ->
handle_call({maybe_run_queue_via_backing_queue, Fun}, _From, State) ->
reply(ok, maybe_run_queue_via_backing_queue(Fun, State)).
-handle_cast({deliver, Txn, Message, ChPid}, State) ->
+handle_cast({deliver, Txn, Message, MsgSeqNo, ChPid}, State) ->
%% Asynchronous, non-"mandatory", non-"immediate" deliver mode.
- {_Delivered, NewState} = deliver_or_enqueue(Txn, ChPid, Message, State),
- noreply(NewState);
+ State1 = maybe_record_confirm_message(MsgSeqNo, Message, ChPid, State),
+ {_Delivered, State2} = deliver_or_enqueue(Txn, ChPid, Message, State1),
+ noreply(State2);
handle_cast({ack, Txn, AckTags, ChPid},
State = #q{backing_queue = BQ, backing_queue_state = BQS}) ->
diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl
index 62ef33179c..b7cfa09a8a 100644
--- a/src/rabbit_basic.erl
+++ b/src/rabbit_basic.erl
@@ -33,7 +33,7 @@
-include("rabbit.hrl").
-include("rabbit_framing.hrl").
--export([publish/1, message/4, properties/1, delivery/4]).
+-export([publish/1, message/4, properties/1, delivery/4, delivery/5]).
-export([publish/4, publish/7]).
-export([build_content/2, from_content/1]).
-export([is_message_persistent/1]).
@@ -53,6 +53,10 @@
-spec(delivery/4 ::
(boolean(), boolean(), rabbit_types:maybe(rabbit_types:txn()),
rabbit_types:message()) -> rabiit_types:delivery()).
+-spec(delivery/5 ::
+ (boolean(), boolean(), rabbit_types:maybe(rabbit_types:txn()),
+ rabbit_types:message(), undefined | integer())
+ -> rabiit_types:delivery()).
-spec(message/4 ::
(rabbit_exchange:name(), rabbit_router:routing_key(),
properties_input(), binary())
@@ -93,8 +97,11 @@ publish(Delivery = #delivery{
end.
delivery(Mandatory, Immediate, Txn, Message) ->
+ delivery(Mandatory, Immediate, Txn, Message, undefined).
+
+delivery(Mandatory, Immediate, Txn, Message, MsgSeqNo) ->
#delivery{mandatory = Mandatory, immediate = Immediate, txn = Txn,
- sender = self(), message = Message}.
+ sender = self(), message = Message, msg_seq_no = MsgSeqNo}.
build_content(Properties, BodyBin) ->
%% basic.publish hasn't changed so we can just hard-code amqp_0_9_1
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index f6e72814c9..954fb88891 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -521,13 +521,11 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
routing_key = RoutingKey,
content = DecodedContent,
guid = rabbit_guid:guid(),
- is_persistent = IsPersistent,
- msg_seq_no = MsgSeqNo,
- origin = self()},
+ is_persistent = IsPersistent},
{RoutingRes, DeliveredQPids} =
rabbit_exchange:publish(
Exchange,
- rabbit_basic:delivery(Mandatory, Immediate, TxnKey, Message)),
+ rabbit_basic:delivery(Mandatory, Immediate, TxnKey, Message, MsgSeqNo)),
% PubAck after basic.returns
State2 = case RoutingRes of
routed -> State1;
diff --git a/src/rabbit_router.erl b/src/rabbit_router.erl
index 0f8611d00f..a927ec647e 100644
--- a/src/rabbit_router.erl
+++ b/src/rabbit_router.erl
@@ -55,7 +55,8 @@
deliver(QPids, Delivery = #delivery{mandatory = false,
immediate = false,
- message = Msg}) ->
+ message = Msg,
+ msg_seq_no = MsgSeqNo}) ->
%% optimisation: when Mandatory = false and Immediate = false,
%% rabbit_amqqueue:deliver will deliver the message to the queue
%% process asynchronously, and return true, which means all the
@@ -65,9 +66,9 @@ deliver(QPids, Delivery = #delivery{mandatory = false,
%% case below.
delegate:invoke_no_result(
QPids, fun (Pid) -> rabbit_amqqueue:deliver(Pid, Delivery) end),
- case {QPids, Msg#basic_message.msg_seq_no} of
- {[], MsgSeqNo} -> rabbit_channel:confirm(Msg#basic_message.origin, MsgSeqNo);
- _ -> ok
+ case {QPids, MsgSeqNo} of
+ {[], _} -> rabbit_channel:confirm(self(), MsgSeqNo);
+ _ -> ok
end,
{routed, QPids};