summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2011-07-04 10:38:30 +0100
committerMatthias Radestock <matthias@rabbitmq.com>2011-07-04 10:38:30 +0100
commit37f6371719da169ac1a53d582e87fbf911251c3a (patch)
tree2b96616c3b8a71a5c04d41e7856087de1eaa2b00
parent53c99e56c0136881d8f9ffd409376e1314e2676d (diff)
downloadrabbitmq-server-git-37f6371719da169ac1a53d582e87fbf911251c3a.tar.gz
change tx semantics to 'batching'
We keep track of uncommitted messages and acks in the channel. All routing decisions are made instantly, which means errors are detected straight away. We increment pub/ack stats on commit only.
-rw-r--r--docs/rabbitmqctl.1.xml16
-rw-r--r--src/rabbit_basic.erl3
-rw-r--r--src/rabbit_channel.erl100
-rw-r--r--src/rabbit_exchange.erl20
4 files changed, 103 insertions, 36 deletions
diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml
index fdb49912c7..93c3fcd877 100644
--- a/docs/rabbitmqctl.1.xml
+++ b/docs/rabbitmqctl.1.xml
@@ -1217,7 +1217,21 @@
messages to the channel's consumers.
</para></listitem>
</varlistentry>
- <varlistentry>
+ <varlistentry>
+ <term>transactional</term>
+ <listitem><para>True if the channel is in transactional mode, false otherwise.</para></listitem>
+ </varlistentry>
+ <varlistentry>
+ <term>messages_uncommitted</term>
+ <listitem><para>Number of messages received in an as yet
+ uncommitted transaction.</para></listitem>
+ </varlistentry>
+ <varlistentry>
+ <term>acks_uncommitted</term>
+ <listitem><para>Number of acknowledgements received in an as yet
+ uncommitted transaction.</para></listitem>
+ </varlistentry>
+ <varlistentry>
<term>confirm</term>
<listitem><para>True if the channel is in confirm mode, false otherwise.</para></listitem>
</varlistentry>
diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl
index ec8ed35144..9cc406e718 100644
--- a/src/rabbit_basic.erl
+++ b/src/rabbit_basic.erl
@@ -170,7 +170,8 @@ publish(XName, RKey, Mandatory, Immediate, Props, Body) ->
end.
publish(X, Delivery) ->
- {RoutingRes, DeliveredQPids} = rabbit_exchange:publish(X, Delivery),
+ {RoutingRes, DeliveredQPids} =
+ rabbit_router:deliver(rabbit_exchange:route(X, Delivery), Delivery),
{ok, RoutingRes, DeliveredQPids}.
is_message_persistent(#content{properties = #'P_basic'{
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index f0f8c4dda9..df337aef3a 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -30,7 +30,8 @@
prioritise_cast/2]).
-record(ch, {state, protocol, channel, reader_pid, writer_pid, conn_pid,
- limiter_pid, start_limiter_fun, next_tag, unacked_message_q,
+ limiter_pid, start_limiter_fun, tx_enabled, next_tag,
+ unacked_message_q, uncommitted_message_q, uncommitted_ack_q,
user, virtual_host, most_recently_declared_queue,
consumer_mapping, blocking, consumer_monitors, queue_collector_pid,
stats_timer, confirm_enabled, publish_seqno, unconfirmed_mq,
@@ -40,10 +41,13 @@
-define(STATISTICS_KEYS,
[pid,
+ transactional,
confirm,
consumer_count,
messages_unacknowledged,
messages_unconfirmed,
+ messages_uncommitted,
+ acks_uncommitted,
prefetch_count,
client_flow_blocked]).
@@ -170,8 +174,11 @@ init([Channel, ReaderPid, WriterPid, ConnPid, Protocol, User, VHost,
conn_pid = ConnPid,
limiter_pid = undefined,
start_limiter_fun = StartLimiterFun,
+ tx_enabled = false,
next_tag = 1,
unacked_message_q = queue:new(),
+ uncommitted_message_q = queue:new(),
+ uncommitted_ack_q = queue:new(),
user = User,
virtual_host = VHost,
most_recently_declared_queue = <<>>,
@@ -595,6 +602,7 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
mandatory = Mandatory,
immediate = Immediate},
Content, State = #ch{virtual_host = VHostPath,
+ tx_enabled = TxEnabled,
confirm_enabled = ConfirmEnabled,
trace_state = TraceState}) ->
ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
@@ -614,16 +622,15 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
case rabbit_basic:message(ExchangeName, RoutingKey, DecodedContent) of
{ok, Message} ->
rabbit_trace:tap_trace_in(Message, TraceState),
- {RoutingRes, DeliveredQPids} =
- rabbit_exchange:publish(
- Exchange, rabbit_basic:delivery(Mandatory, Immediate, Message,
- MsgSeqNo)),
- State2 = process_routing_result(RoutingRes, DeliveredQPids,
- ExchangeName, MsgSeqNo, Message,
- State1),
- maybe_incr_stats([{ExchangeName, 1} |
- [{{QPid, ExchangeName}, 1} ||
- QPid <- DeliveredQPids]], publish, State2),
+ Delivery = rabbit_basic:delivery(Mandatory, Immediate, Message,
+ MsgSeqNo),
+ QNames = rabbit_exchange:route(Exchange, Delivery),
+ State2 = case TxEnabled of
+ true -> TMQ = State1#ch.uncommitted_message_q,
+ NewTMQ = queue:in({Delivery, QNames}, TMQ),
+ State1#ch{uncommitted_message_q = NewTMQ};
+ false -> deliver_to_queues({Delivery, QNames}, State1)
+ end,
{noreply, State2};
{error, Reason} ->
rabbit_misc:protocol_error(precondition_failed,
@@ -638,12 +645,16 @@ handle_method(#'basic.nack'{delivery_tag = DeliveryTag,
handle_method(#'basic.ack'{delivery_tag = DeliveryTag,
multiple = Multiple},
- _, State = #ch{unacked_message_q = UAMQ}) ->
+ _, State = #ch{unacked_message_q = UAMQ,
+ tx_enabled = TxEnabled}) ->
{Acked, Remaining} = collect_acks(UAMQ, DeliveryTag, Multiple),
- QIncs = ack(Acked),
- maybe_incr_stats(QIncs, ack, State),
- ok = notify_limiter(State#ch.limiter_pid, Acked),
- {noreply, State#ch{unacked_message_q = Remaining}};
+ State1 = State#ch{unacked_message_q = Remaining},
+ {noreply, case TxEnabled of
+ true -> NewTAQ = queue:join(State1#ch.uncommitted_ack_q,
+ Acked),
+ State1#ch{uncommitted_ack_q = NewTAQ};
+ false -> ack(Acked, State1)
+ end};
handle_method(#'basic.get'{queue = QueueNameBin,
no_ack = NoAck},
@@ -1024,6 +1035,26 @@ handle_method(#'queue.purge'{queue = QueueNameBin,
return_ok(State, NoWait,
#'queue.purge_ok'{message_count = PurgedMessageCount});
+handle_method(#'tx.select'{}, _, State) ->
+ {reply, #'tx.select_ok'{}, State#ch{tx_enabled = true}};
+
+handle_method(#'tx.commit'{}, _, #ch{tx_enabled = false}) ->
+ rabbit_misc:protocol_error(
+ precondition_failed, "channel is not transactional", []);
+
+handle_method(#'tx.commit'{}, _, State = #ch{uncommitted_message_q = TMQ,
+ uncommitted_ack_q = TAQ}) ->
+ State1 = rabbit_misc:queue_fold(fun deliver_to_queues/2, State, TMQ),
+ {reply, #'tx.commit_ok'{}, new_tx(ack(TAQ, State1))};
+
+handle_method(#'tx.rollback'{}, _, #ch{tx_enabled = false}) ->
+ rabbit_misc:protocol_error(
+ precondition_failed, "channel is not transactional", []);
+
+handle_method(#'tx.rollback'{}, _, State = #ch{unacked_message_q = UAMQ,
+ uncommitted_ack_q = TAQ}) ->
+ {reply, #'tx.rollback_ok'{}, new_tx(State#ch{unacked_message_q =
+ queue:join(TAQ, UAMQ)})};
handle_method(#'confirm.select'{nowait = NoWait}, _, State) ->
return_ok(State#ch{confirm_enabled = true},
@@ -1200,11 +1231,18 @@ collect_acks(ToAcc, PrefixAcc, Q, DeliveryTag, Multiple) ->
precondition_failed, "unknown delivery tag ~w", [DeliveryTag])
end.
-ack(UAQ) ->
- fold_per_queue(fun (QPid, MsgIds, L) ->
- ok = rabbit_amqqueue:ack(QPid, MsgIds, self()),
- [{QPid, length(MsgIds)} | L]
- end, [], UAQ).
+ack(Acked, State) ->
+ QIncs = fold_per_queue(
+ fun (QPid, MsgIds, L) ->
+ ok = rabbit_amqqueue:ack(QPid, MsgIds, self()),
+ [{QPid, length(MsgIds)} | L]
+ end, [], Acked),
+ maybe_incr_stats(QIncs, ack, State),
+ ok = notify_limiter(State#ch.limiter_pid, Acked),
+ State.
+
+new_tx(State) -> State#ch{uncommitted_message_q = queue:new(),
+ uncommitted_ack_q = queue:new()}.
notify_queues(State = #ch{state = closing}) ->
{ok, State};
@@ -1255,6 +1293,18 @@ notify_limiter(LimiterPid, Acked) ->
Count -> rabbit_limiter:ack(LimiterPid, Count)
end.
+deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{
+ exchange_name = XName},
+ msg_seq_no = MsgSeqNo},
+ QNames}, State) ->
+ {RoutingRes, DeliveredQPids} = rabbit_router:deliver(QNames, Delivery),
+ State1 = process_routing_result(RoutingRes, DeliveredQPids,
+ XName, MsgSeqNo, Message, State),
+ maybe_incr_stats([{XName, 1} |
+ [{{QPid, XName}, 1} ||
+ QPid <- DeliveredQPids]], publish, State1),
+ State1.
+
process_routing_result(unroutable, _, XName, MsgSeqNo, Msg, State) ->
ok = basic_return(Msg, State, no_route),
maybe_incr_stats([{Msg#basic_message.exchange_name, 1}],
@@ -1262,8 +1312,7 @@ process_routing_result(unroutable, _, XName, MsgSeqNo, Msg, State) ->
record_confirm(MsgSeqNo, XName, State);
process_routing_result(not_delivered, _, XName, MsgSeqNo, Msg, State) ->
ok = basic_return(Msg, State, no_consumers),
- maybe_incr_stats([{Msg#basic_message.exchange_name, 1}],
- return_not_delivered, State),
+ maybe_incr_stats([{XName, 1}], return_not_delivered, State),
record_confirm(MsgSeqNo, XName, State);
process_routing_result(routed, [], XName, MsgSeqNo, _, State) ->
record_confirm(MsgSeqNo, XName, State);
@@ -1343,6 +1392,7 @@ i(connection, #ch{conn_pid = ConnPid}) -> ConnPid;
i(number, #ch{channel = Channel}) -> Channel;
i(user, #ch{user = User}) -> User#user.username;
i(vhost, #ch{virtual_host = VHost}) -> VHost;
+i(transactional, #ch{tx_enabled = TE}) -> TE;
i(confirm, #ch{confirm_enabled = CE}) -> CE;
i(consumer_count, #ch{consumer_mapping = ConsumerMapping}) ->
dict:size(ConsumerMapping);
@@ -1350,6 +1400,10 @@ i(messages_unconfirmed, #ch{unconfirmed_mq = UMQ}) ->
gb_trees:size(UMQ);
i(messages_unacknowledged, #ch{unacked_message_q = UAMQ}) ->
queue:len(UAMQ);
+i(messages_uncommitted, #ch{uncommitted_message_q = TMQ}) ->
+ queue:len(TMQ);
+i(acks_uncommitted, #ch{uncommitted_ack_q = TAQ}) ->
+ queue:len(TAQ);
i(prefetch_count, #ch{limiter_pid = LimiterPid}) ->
rabbit_limiter:get_limit(LimiterPid);
i(client_flow_blocked, #ch{limiter_pid = LimiterPid}) ->
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index cab1b99f6f..cecd879a9f 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -22,7 +22,7 @@
assert_equivalence/6, assert_args_equivalence/2, check_type/1,
lookup/1, lookup_or_die/1, list/1,
info_keys/0, info/1, info/2, info_all/1, info_all/2,
- publish/2, delete/2]).
+ route/2, delete/2]).
%% these must be run inside a mnesia tx
-export([maybe_auto_delete/1, serial/1, peek_serial/1]).
@@ -66,8 +66,8 @@
-spec(info_all/1 :: (rabbit_types:vhost()) -> [rabbit_types:infos()]).
-spec(info_all/2 ::(rabbit_types:vhost(), rabbit_types:info_keys())
-> [rabbit_types:infos()]).
--spec(publish/2 :: (rabbit_types:exchange(), rabbit_types:delivery())
- -> {rabbit_router:routing_result(), [pid()]}).
+-spec(route/2 :: (rabbit_types:exchange(), rabbit_types:delivery())
+ -> [rabbit_amqqueue:name()]).
-spec(delete/2 ::
(name(), boolean())-> 'ok' |
rabbit_types:error('not_found') |
@@ -224,21 +224,19 @@ info_all(VHostPath) -> map(VHostPath, fun (X) -> info(X) end).
info_all(VHostPath, Items) -> map(VHostPath, fun (X) -> info(X, Items) end).
-publish(X = #exchange{name = XName}, Delivery) ->
- rabbit_router:deliver(
- route(Delivery, {queue:from_list([X]), XName, []}),
- Delivery).
+route(X = #exchange{name = XName}, Delivery) ->
+ route1(Delivery, {queue:from_list([X]), XName, []}).
-route(Delivery, {WorkList, SeenXs, QNames}) ->
+route1(Delivery, {WorkList, SeenXs, QNames}) ->
case queue:out(WorkList) of
{empty, _WorkList} ->
lists:usort(QNames);
{{value, X = #exchange{type = Type}}, WorkList1} ->
DstNames = process_alternate(
X, ((type_to_module(Type)):route(X, Delivery))),
- route(Delivery,
- lists:foldl(fun process_route/2, {WorkList1, SeenXs, QNames},
- DstNames))
+ route1(Delivery,
+ lists:foldl(fun process_route/2, {WorkList1, SeenXs, QNames},
+ DstNames))
end.
process_alternate(#exchange{name = XName, arguments = Args}, []) ->