diff options
| author | Alexandru Scvortov <alexandru@rabbitmq.com> | 2010-08-17 10:45:33 +0100 |
|---|---|---|
| committer | Alexandru Scvortov <alexandru@rabbitmq.com> | 2010-08-17 10:45:33 +0100 |
| commit | caefab720fc4bffae44f057438ea91debda63393 (patch) | |
| tree | 69bd4f343784c3324b2ee24ea9ff233d1b0cf995 | |
| parent | ad27f6e8847fb7c01a9c68dd7d6faf220e378c3d (diff) | |
| download | rabbitmq-server-git-caefab720fc4bffae44f057438ea91debda63393.tar.gz | |
move msg_seq_no from #basic_message{} to #delivery{}
| -rw-r--r-- | include/rabbit.hrl | 4 | ||||
| -rw-r--r-- | src/rabbit_amqqueue.erl | 15 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 55 | ||||
| -rw-r--r-- | src/rabbit_basic.erl | 11 | ||||
| -rw-r--r-- | src/rabbit_channel.erl | 6 | ||||
| -rw-r--r-- | src/rabbit_router.erl | 9 |
6 files changed, 64 insertions, 36 deletions
diff --git a/include/rabbit.hrl b/include/rabbit.hrl index e8255346c2..161b9b0b2a 100644 --- a/include/rabbit.hrl +++ b/include/rabbit.hrl @@ -66,10 +66,10 @@ -record(listener, {node, protocol, host, port}). -record(basic_message, {exchange_name, routing_key, content, guid, - is_persistent, msg_seq_no, origin}). + is_persistent}). -record(ssl_socket, {tcp, ssl}). --record(delivery, {mandatory, immediate, txn, sender, message}). +-record(delivery, {mandatory, immediate, txn, sender, message, origin, msg_seq_no}). -record(amqp_error, {name, explanation, method = none}). -record(event, {type, props, timestamp}). diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 2453280e34..c25fd9b4cb 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -363,15 +363,18 @@ delete(#amqqueue{ pid = QPid }, IfUnused, IfEmpty) -> purge(#amqqueue{ pid = QPid }) -> delegate_call(QPid, purge, infinity). deliver(QPid, #delivery{immediate = true, - txn = Txn, sender = ChPid, message = Message}) -> - gen_server2:call(QPid, {deliver_immediately, Txn, Message, ChPid}, + txn = Txn, sender = ChPid, message = Message, + msg_seq_no = MsgSeqNo}) -> + gen_server2:call(QPid, {deliver_immediately, Txn, Message, MsgSeqNo, ChPid}, infinity); deliver(QPid, #delivery{mandatory = true, - txn = Txn, sender = ChPid, message = Message}) -> - gen_server2:call(QPid, {deliver, Txn, Message, ChPid}, infinity), + txn = Txn, sender = ChPid, message = Message, + msg_seq_no = MsgSeqNo}) -> + gen_server2:call(QPid, {deliver, Txn, Message, MsgSeqNo, ChPid}, infinity), true; -deliver(QPid, #delivery{txn = Txn, sender = ChPid, message = Message}) -> - gen_server2:cast(QPid, {deliver, Txn, Message, ChPid}), +deliver(QPid, #delivery{txn = Txn, sender = ChPid, message = Message, + msg_seq_no = MsgSeqNo}) -> + gen_server2:cast(QPid, {deliver, Txn, Message, MsgSeqNo, ChPid}), true. requeue(QPid, MsgIds, ChPid) -> diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index a25ad48b3b..0dbd7f1778 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -60,7 +60,8 @@ sync_timer_ref, rate_timer_ref, expiry_timer_ref, - stats_timer + stats_timer, + guid_to_channel }). -record(consumer, {tag, ack_required}). @@ -122,7 +123,8 @@ init(Q) -> sync_timer_ref = undefined, rate_timer_ref = undefined, expiry_timer_ref = undefined, - stats_timer = rabbit_event:init_stats_timer()}, hibernate, + stats_timer = rabbit_event:init_stats_timer(), + guid_to_channel = dict:new()}, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. terminate(shutdown, State = #q{backing_queue = BQ}) -> @@ -345,7 +347,7 @@ deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun }, FunAcc, ChPid, ConsumerTag, AckRequired, {QName, self(), AckTag, IsDelivered, Message}), % PubAck after message delivered to consumer (disregard consumer acks) - confirm_message(Message), + State2 = confirm_message(Message#basic_message.guid, State1), ChAckTags1 = case AckRequired of true -> sets:add_element(AckTag, ChAckTags); false -> ChAckTags @@ -365,10 +367,10 @@ deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun }, FunAcc, {ActiveConsumers1, queue:in(QEntry, BlockedConsumers1)} end, - State2 = State1#q{ + State3 = State2#q{ active_consumers = NewActiveConsumers, blocked_consumers = NewBlockedConsumers}, - deliver_msgs_to_consumers(Funs, FunAcc1, State2); + deliver_msgs_to_consumers(Funs, FunAcc1, State3); %% if IsMsgReady then we've hit the limiter false when IsMsgReady -> store_ch_record(C#cr{is_limit_active = true}), @@ -399,8 +401,21 @@ deliver_from_queue_deliver(AckRequired, false, {{Message, IsDelivered, AckTag}, 0 == Remaining, State #q { backing_queue_state = BQS1 }}. -confirm_message(#basic_message{msg_seq_no = MsgSeqNo, origin = ChPid}) -> - rabbit_channel:confirm(ChPid, MsgSeqNo). +confirm_message(Guid, State = #q{guid_to_channel = GTC}) -> + case dict:find(Guid, GTC) of + {ok, {ChPid, MsgSeqNo}} -> + rabbit_channel:confirm(ChPid, MsgSeqNo), + State#q{guid_to_channel = dict:erase(Guid, GTC)}; + _ -> + State + end. + +maybe_record_confirm_message(undefined, _, _, State) -> + State; +maybe_record_confirm_message(MsgSeqNo, Message, ChPid, State) -> + State#q{guid_to_channel = + dict:store(Message#basic_message.guid, {ChPid, MsgSeqNo}, + State#q.guid_to_channel)}. run_message_queue(State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> Funs = {fun deliver_from_queue_pred/2, @@ -635,7 +650,7 @@ handle_call(consumers, _From, [{ChPid, ConsumerTag, AckRequired} | Acc] end, [], queue:join(ActiveConsumers, BlockedConsumers)), State); -handle_call({deliver_immediately, Txn, Message, ChPid}, _From, State) -> +handle_call({deliver_immediately, Txn, Message, MsgSeqNo, ChPid}, _From, State) -> %% Synchronous, "immediate" delivery mode %% %% FIXME: Is this correct semantics? @@ -649,13 +664,16 @@ handle_call({deliver_immediately, Txn, Message, ChPid}, _From, State) -> %% just all ready-to-consume queues get the message, with unready %% queues discarding the message? %% - {Delivered, NewState} = attempt_delivery(Txn, ChPid, Message, State), - reply(Delivered, NewState); + State1 = maybe_record_confirm_message(MsgSeqNo, Message, ChPid, State), + {Delivered, State2} = attempt_delivery(Txn, ChPid, Message, State1 +), + reply(Delivered, State2); -handle_call({deliver, Txn, Message, ChPid}, _From, State) -> +handle_call({deliver, Txn, Message, MsgSeqNo, ChPid}, _From, State) -> %% Synchronous, "mandatory" delivery mode - {Delivered, NewState} = deliver_or_enqueue(Txn, ChPid, Message, State), - reply(Delivered, NewState); + State1 = maybe_record_confirm_message(MsgSeqNo, Message, ChPid, State), + {Delivered, State2} = deliver_or_enqueue(Txn, ChPid, Message, State1), + reply(Delivered, State2); handle_call({commit, Txn, ChPid}, From, State) -> NewState = commit_transaction(Txn, From, ChPid, State), @@ -681,7 +699,7 @@ handle_call({basic_get, ChPid, NoAck}, _From, {empty, BQS1} -> reply(empty, State1#q{backing_queue_state = BQS1}); {{Message, IsDelivered, AckTag, Remaining}, BQS1} -> % PubAck after message got - confirm_message(Message), + State2 = confirm_message(Message#basic_message.guid, State1), case AckRequired of true -> C = #cr{acktags = ChAckTags} = ch_record(ChPid), store_ch_record( @@ -689,7 +707,7 @@ handle_call({basic_get, ChPid, NoAck}, _From, false -> ok end, Msg = {QName, self(), AckTag, IsDelivered, Message}, - reply({ok, Remaining, Msg}, State1#q{backing_queue_state = BQS1}) + reply({ok, Remaining, Msg}, State2#q{backing_queue_state = BQS1}) end; handle_call({basic_consume, NoAck, ChPid, LimiterPid, @@ -798,10 +816,11 @@ handle_call({requeue, AckTags, ChPid}, From, State) -> handle_call({maybe_run_queue_via_backing_queue, Fun}, _From, State) -> reply(ok, maybe_run_queue_via_backing_queue(Fun, State)). -handle_cast({deliver, Txn, Message, ChPid}, State) -> +handle_cast({deliver, Txn, Message, MsgSeqNo, ChPid}, State) -> %% Asynchronous, non-"mandatory", non-"immediate" deliver mode. - {_Delivered, NewState} = deliver_or_enqueue(Txn, ChPid, Message, State), - noreply(NewState); + State1 = maybe_record_confirm_message(MsgSeqNo, Message, ChPid, State), + {_Delivered, State2} = deliver_or_enqueue(Txn, ChPid, Message, State1), + noreply(State2); handle_cast({ack, Txn, AckTags, ChPid}, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl index 62ef33179c..b7cfa09a8a 100644 --- a/src/rabbit_basic.erl +++ b/src/rabbit_basic.erl @@ -33,7 +33,7 @@ -include("rabbit.hrl"). -include("rabbit_framing.hrl"). --export([publish/1, message/4, properties/1, delivery/4]). +-export([publish/1, message/4, properties/1, delivery/4, delivery/5]). -export([publish/4, publish/7]). -export([build_content/2, from_content/1]). -export([is_message_persistent/1]). @@ -53,6 +53,10 @@ -spec(delivery/4 :: (boolean(), boolean(), rabbit_types:maybe(rabbit_types:txn()), rabbit_types:message()) -> rabiit_types:delivery()). +-spec(delivery/5 :: + (boolean(), boolean(), rabbit_types:maybe(rabbit_types:txn()), + rabbit_types:message(), undefined | integer()) + -> rabiit_types:delivery()). -spec(message/4 :: (rabbit_exchange:name(), rabbit_router:routing_key(), properties_input(), binary()) @@ -93,8 +97,11 @@ publish(Delivery = #delivery{ end. delivery(Mandatory, Immediate, Txn, Message) -> + delivery(Mandatory, Immediate, Txn, Message, undefined). + +delivery(Mandatory, Immediate, Txn, Message, MsgSeqNo) -> #delivery{mandatory = Mandatory, immediate = Immediate, txn = Txn, - sender = self(), message = Message}. + sender = self(), message = Message, msg_seq_no = MsgSeqNo}. build_content(Properties, BodyBin) -> %% basic.publish hasn't changed so we can just hard-code amqp_0_9_1 diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index f6e72814c9..954fb88891 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -521,13 +521,11 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, routing_key = RoutingKey, content = DecodedContent, guid = rabbit_guid:guid(), - is_persistent = IsPersistent, - msg_seq_no = MsgSeqNo, - origin = self()}, + is_persistent = IsPersistent}, {RoutingRes, DeliveredQPids} = rabbit_exchange:publish( Exchange, - rabbit_basic:delivery(Mandatory, Immediate, TxnKey, Message)), + rabbit_basic:delivery(Mandatory, Immediate, TxnKey, Message, MsgSeqNo)), % PubAck after basic.returns State2 = case RoutingRes of routed -> State1; diff --git a/src/rabbit_router.erl b/src/rabbit_router.erl index 0f8611d00f..a927ec647e 100644 --- a/src/rabbit_router.erl +++ b/src/rabbit_router.erl @@ -55,7 +55,8 @@ deliver(QPids, Delivery = #delivery{mandatory = false, immediate = false, - message = Msg}) -> + message = Msg, + msg_seq_no = MsgSeqNo}) -> %% optimisation: when Mandatory = false and Immediate = false, %% rabbit_amqqueue:deliver will deliver the message to the queue %% process asynchronously, and return true, which means all the @@ -65,9 +66,9 @@ deliver(QPids, Delivery = #delivery{mandatory = false, %% case below. delegate:invoke_no_result( QPids, fun (Pid) -> rabbit_amqqueue:deliver(Pid, Delivery) end), - case {QPids, Msg#basic_message.msg_seq_no} of - {[], MsgSeqNo} -> rabbit_channel:confirm(Msg#basic_message.origin, MsgSeqNo); - _ -> ok + case {QPids, MsgSeqNo} of + {[], _} -> rabbit_channel:confirm(self(), MsgSeqNo); + _ -> ok end, {routed, QPids}; |
