diff options
| author | Alexandru Scvortov <alexandru@rabbitmq.com> | 2010-08-13 12:21:01 +0100 |
|---|---|---|
| committer | Alexandru Scvortov <alexandru@rabbitmq.com> | 2010-08-13 12:21:01 +0100 |
| commit | 3a4cf175f8da0de9eca9619b218a18530c0ac55c (patch) | |
| tree | f1b9ed93dbb08cd58fbcf74be53a630c33b81c45 | |
| parent | 0b7133e9ec8a891c7dc7779ba635bcb7213914d5 (diff) | |
| download | rabbitmq-server-git-3a4cf175f8da0de9eca9619b218a18530c0ac55c.tar.gz | |
only transient messages are immediately ack'd (so far)
In addition, a message sequence number is recorded in the #delivery{}
sent to the exchange. If the message doesn't need to be ack'd, this
sequence number is undefined.
| -rw-r--r-- | include/rabbit.hrl | 2 | ||||
| -rw-r--r-- | src/rabbit_basic.erl | 12 | ||||
| -rw-r--r-- | src/rabbit_channel.erl | 18 |
3 files changed, 22 insertions, 10 deletions
diff --git a/include/rabbit.hrl b/include/rabbit.hrl index b9abd78857..bf68021702 100644 --- a/include/rabbit.hrl +++ b/include/rabbit.hrl @@ -69,7 +69,7 @@ is_persistent}). -record(ssl_socket, {tcp, ssl}). --record(delivery, {mandatory, immediate, txn, sender, message}). +-record(delivery, {mandatory, immediate, txn, sender, message, msg_seq_no}). -record(amqp_error, {name, explanation, method = none}). -record(event, {type, props, timestamp}). diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl index d62fc07cb0..3aa25d6327 100644 --- a/src/rabbit_basic.erl +++ b/src/rabbit_basic.erl @@ -33,7 +33,7 @@ -include("rabbit.hrl"). -include("rabbit_framing.hrl"). --export([publish/1, message/4, properties/1, delivery/4]). +-export([publish/1, message/4, properties/1, delivery/4, delivery/5]). -export([publish/4, publish/7]). -export([build_content/2, from_content/1]). -export([is_message_persistent/1]). @@ -51,8 +51,12 @@ -spec(publish/1 :: (rabbit_types:delivery()) -> publish_result()). -spec(delivery/4 :: + (boolean(), boolean(), rabbit_types:maybe(rabbit_type:txn()), + rabbit_types:message()) -> rabiit_types:delivery()). +-spec(delivery/5 :: (boolean(), boolean(), rabbit_types:maybe(rabbit_types:txn()), - rabbit_types:message()) -> rabbit_types:delivery()). + rabbit_types:message(), integer() | undefined) + -> rabbit_types:delivery()). -spec(message/4 :: (rabbit_exchange:name(), rabbit_router:routing_key(), properties_input(), binary()) @@ -93,8 +97,10 @@ publish(Delivery = #delivery{ end. delivery(Mandatory, Immediate, Txn, Message) -> + delivery(Mandatory, Immediate, Txn, Message, undefined). +delivery(Mandatory, Immediate, Txn, Message, MsgSeqNo) -> #delivery{mandatory = Mandatory, immediate = Immediate, txn = Txn, - sender = self(), message = Message}. + sender = self(), message = Message, msg_seq_no = MsgSeqNo}. build_content(Properties, BodyBin) -> %% basic.publish hasn't changed so we can just hard-code amqp_0_9_1 diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 04875b5e6c..58f03e7a1d 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -157,8 +157,8 @@ flush(Pid) -> flush_multiple_acks(Pid) -> gen_server2:cast(Pid, multiple_ack_flush). -confirm(Pid, MessageSequenceNumber) -> - gen_server2:cast(Pid, {confirm, MessageSequenceNumber}). +confirm(Pid, MsgSeqNo) -> + gen_server2:cast(Pid, {confirm, MsgSeqNo}). %%--------------------------------------------------------------------------- @@ -469,15 +469,21 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, %% certain to want to look at delivery-mode and priority. DecodedContent = rabbit_binary_parser:ensure_content_decoded(Content), IsPersistent = is_message_persistent(DecodedContent), - {_MsgSeqNo, State1} + {MsgSeqNo, State1} = case State#ch.confirm#confirm.enabled of false -> {undefined, State}; true -> Count = State#ch.confirm#confirm.count, - NewState = send_or_enqueue_ack(Count, State), + {CountOrUndefined, NewState} = + case IsPersistent of + true -> {Count, State}; + false -> {undefined, + send_or_enqueue_ack( + Count, State)} + end, Confirm = NewState#ch.confirm, - {Count, + {CountOrUndefined, NewState#ch{confirm = Confirm#confirm{count = Count+1}}} end, Message = #basic_message{exchange_name = ExchangeName, @@ -488,7 +494,7 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, {RoutingRes, DeliveredQPids} = rabbit_exchange:publish( Exchange, - rabbit_basic:delivery(Mandatory, Immediate, TxnKey, Message)), + rabbit_basic:delivery(Mandatory, Immediate, TxnKey, Message, MsgSeqNo)), case RoutingRes of routed -> ok; unroutable -> ok = basic_return(Message, WriterPid, no_route); |
