summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlexandru Scvortov <alexandru@rabbitmq.com>2010-08-13 12:21:01 +0100
committerAlexandru Scvortov <alexandru@rabbitmq.com>2010-08-13 12:21:01 +0100
commit3a4cf175f8da0de9eca9619b218a18530c0ac55c (patch)
treef1b9ed93dbb08cd58fbcf74be53a630c33b81c45
parent0b7133e9ec8a891c7dc7779ba635bcb7213914d5 (diff)
downloadrabbitmq-server-git-3a4cf175f8da0de9eca9619b218a18530c0ac55c.tar.gz
only transient messages are immediately ack'd (so far)
In addition, a message sequence number is recorded in the #delivery{} sent to the exchange. If the message doesn't need to be ack'd, this sequence number is undefined.
-rw-r--r--include/rabbit.hrl2
-rw-r--r--src/rabbit_basic.erl12
-rw-r--r--src/rabbit_channel.erl18
3 files changed, 22 insertions, 10 deletions
diff --git a/include/rabbit.hrl b/include/rabbit.hrl
index b9abd78857..bf68021702 100644
--- a/include/rabbit.hrl
+++ b/include/rabbit.hrl
@@ -69,7 +69,7 @@
is_persistent}).
-record(ssl_socket, {tcp, ssl}).
--record(delivery, {mandatory, immediate, txn, sender, message}).
+-record(delivery, {mandatory, immediate, txn, sender, message, msg_seq_no}).
-record(amqp_error, {name, explanation, method = none}).
-record(event, {type, props, timestamp}).
diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl
index d62fc07cb0..3aa25d6327 100644
--- a/src/rabbit_basic.erl
+++ b/src/rabbit_basic.erl
@@ -33,7 +33,7 @@
-include("rabbit.hrl").
-include("rabbit_framing.hrl").
--export([publish/1, message/4, properties/1, delivery/4]).
+-export([publish/1, message/4, properties/1, delivery/4, delivery/5]).
-export([publish/4, publish/7]).
-export([build_content/2, from_content/1]).
-export([is_message_persistent/1]).
@@ -51,8 +51,12 @@
-spec(publish/1 ::
(rabbit_types:delivery()) -> publish_result()).
-spec(delivery/4 ::
+ (boolean(), boolean(), rabbit_types:maybe(rabbit_type:txn()),
+ rabbit_types:message()) -> rabiit_types:delivery()).
+-spec(delivery/5 ::
(boolean(), boolean(), rabbit_types:maybe(rabbit_types:txn()),
- rabbit_types:message()) -> rabbit_types:delivery()).
+ rabbit_types:message(), integer() | undefined)
+ -> rabbit_types:delivery()).
-spec(message/4 ::
(rabbit_exchange:name(), rabbit_router:routing_key(),
properties_input(), binary())
@@ -93,8 +97,10 @@ publish(Delivery = #delivery{
end.
delivery(Mandatory, Immediate, Txn, Message) ->
+ delivery(Mandatory, Immediate, Txn, Message, undefined).
+delivery(Mandatory, Immediate, Txn, Message, MsgSeqNo) ->
#delivery{mandatory = Mandatory, immediate = Immediate, txn = Txn,
- sender = self(), message = Message}.
+ sender = self(), message = Message, msg_seq_no = MsgSeqNo}.
build_content(Properties, BodyBin) ->
%% basic.publish hasn't changed so we can just hard-code amqp_0_9_1
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 04875b5e6c..58f03e7a1d 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -157,8 +157,8 @@ flush(Pid) ->
flush_multiple_acks(Pid) ->
gen_server2:cast(Pid, multiple_ack_flush).
-confirm(Pid, MessageSequenceNumber) ->
- gen_server2:cast(Pid, {confirm, MessageSequenceNumber}).
+confirm(Pid, MsgSeqNo) ->
+ gen_server2:cast(Pid, {confirm, MsgSeqNo}).
%%---------------------------------------------------------------------------
@@ -469,15 +469,21 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
%% certain to want to look at delivery-mode and priority.
DecodedContent = rabbit_binary_parser:ensure_content_decoded(Content),
IsPersistent = is_message_persistent(DecodedContent),
- {_MsgSeqNo, State1}
+ {MsgSeqNo, State1}
= case State#ch.confirm#confirm.enabled of
false ->
{undefined, State};
true ->
Count = State#ch.confirm#confirm.count,
- NewState = send_or_enqueue_ack(Count, State),
+ {CountOrUndefined, NewState} =
+ case IsPersistent of
+ true -> {Count, State};
+ false -> {undefined,
+ send_or_enqueue_ack(
+ Count, State)}
+ end,
Confirm = NewState#ch.confirm,
- {Count,
+ {CountOrUndefined,
NewState#ch{confirm = Confirm#confirm{count = Count+1}}}
end,
Message = #basic_message{exchange_name = ExchangeName,
@@ -488,7 +494,7 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
{RoutingRes, DeliveredQPids} =
rabbit_exchange:publish(
Exchange,
- rabbit_basic:delivery(Mandatory, Immediate, TxnKey, Message)),
+ rabbit_basic:delivery(Mandatory, Immediate, TxnKey, Message, MsgSeqNo)),
case RoutingRes of
routed -> ok;
unroutable -> ok = basic_return(Message, WriterPid, no_route);