summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_channel.erl116
1 files changed, 76 insertions, 40 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 81a0ee803c..0c211b46ab 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -30,7 +30,7 @@
prioritise_cast/2]).
-record(ch, {state, protocol, channel, reader_pid, writer_pid, conn_pid,
- limiter_pid, start_limiter_fun, tx_enabled, next_tag,
+ limiter_pid, start_limiter_fun, tx_status, next_tag,
unacked_message_q, uncommitted_message_q, uncommitted_ack_q,
user, virtual_host, most_recently_declared_queue,
consumer_mapping, blocking, consumer_monitors, queue_collector_pid,
@@ -174,7 +174,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, Protocol, User, VHost,
conn_pid = ConnPid,
limiter_pid = undefined,
start_limiter_fun = StartLimiterFun,
- tx_enabled = false,
+ tx_status = none,
next_tag = 1,
unacked_message_q = queue:new(),
uncommitted_message_q = queue:new(),
@@ -516,10 +516,6 @@ queue_blocked(QPid, State = #ch{blocking = Blocking}) ->
State#ch{blocking = Blocking1}
end.
-blind_confirm({#delivery{message = #basic_message{exchange_name = XName},
- msg_seq_no = MsgSeqNo}, _QNames}, State) ->
- record_confirm(MsgSeqNo, XName, State).
-
record_confirm(undefined, _, State) ->
State;
record_confirm(MsgSeqNo, XName, State) ->
@@ -598,6 +594,15 @@ handle_method(#'channel.close'{}, _, State = #ch{reader_pid = ReaderPid}) ->
ReaderPid ! {channel_closing, self()},
{noreply, State1};
+%% Even though the spec prohibits the client from sending commands
+%% while waiting for the reply to a synchronous command, we generally
+%% do allow this...except in the case of a pending tx.commit, where
+%% it could wreak havoc.
+handle_method(_Method, _, #ch{tx_status = TxStatus})
+ when TxStatus =/= none andalso TxStatus =/= in_progress ->
+ rabbit_misc:protocol_error(
+ channel_error, "unexpected command while processing 'tx.commit'", []);
+
handle_method(#'access.request'{},_, State) ->
{reply, #'access.request_ok'{ticket = 1}, State};
@@ -606,7 +611,7 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
mandatory = Mandatory,
immediate = Immediate},
Content, State = #ch{virtual_host = VHostPath,
- tx_enabled = TxEnabled,
+ tx_status = TxStatus,
confirm_enabled = ConfirmEnabled,
trace_state = TraceState}) ->
ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
@@ -618,10 +623,10 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
DecodedContent = rabbit_binary_parser:ensure_content_decoded(Content),
check_user_id_header(DecodedContent#content.properties, State),
{MsgSeqNo, State1} =
- case ConfirmEnabled of
- false -> {undefined, State};
- true -> SeqNo = State#ch.publish_seqno,
- {SeqNo, State#ch{publish_seqno = SeqNo + 1}}
+ case {TxStatus, ConfirmEnabled} of
+ {none, false} -> {undefined, State};
+ {_, _} -> SeqNo = State#ch.publish_seqno,
+ {SeqNo, State#ch{publish_seqno = SeqNo + 1}}
end,
case rabbit_basic:message(ExchangeName, RoutingKey, DecodedContent) of
{ok, Message} ->
@@ -629,13 +634,13 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
Delivery = rabbit_basic:delivery(Mandatory, Immediate, Message,
MsgSeqNo),
QNames = rabbit_exchange:route(Exchange, Delivery),
- State2 = case TxEnabled of
- true -> TMQ = State1#ch.uncommitted_message_q,
- NewTMQ = queue:in({Delivery, QNames}, TMQ),
- State1#ch{uncommitted_message_q = NewTMQ};
- false -> deliver_to_queues({Delivery, QNames}, State1)
- end,
- {noreply, State2};
+ {noreply,
+ case TxStatus of
+ none -> deliver_to_queues({Delivery, QNames}, State1);
+ in_progress -> TMQ = State1#ch.uncommitted_message_q,
+ NewTMQ = queue:in({Delivery, QNames}, TMQ),
+ State1#ch{uncommitted_message_q = NewTMQ}
+ end};
{error, Reason} ->
rabbit_misc:protocol_error(precondition_failed,
"invalid message: ~p", [Reason])
@@ -650,15 +655,15 @@ handle_method(#'basic.nack'{delivery_tag = DeliveryTag,
handle_method(#'basic.ack'{delivery_tag = DeliveryTag,
multiple = Multiple},
_, State = #ch{unacked_message_q = UAMQ,
- tx_enabled = TxEnabled}) ->
+ tx_status = TxStatus}) ->
{Acked, Remaining} = collect_acks(UAMQ, DeliveryTag, Multiple),
State1 = State#ch{unacked_message_q = Remaining},
- {noreply, case TxEnabled of
- true -> NewTAQ = queue:join(State1#ch.uncommitted_ack_q,
- Acked),
- State1#ch{uncommitted_ack_q = NewTAQ};
- false -> ack(Acked, State1)
- end};
+ {noreply,
+ case TxStatus of
+ none -> ack(Acked, State1);
+ in_progress -> NewTAQ = queue:join(State1#ch.uncommitted_ack_q, Acked),
+ State1#ch{uncommitted_ack_q = NewTAQ}
+ end};
handle_method(#'basic.get'{queue = QueueNameBin,
no_ack = NoAck},
@@ -1039,28 +1044,35 @@ handle_method(#'queue.purge'{queue = QueueNameBin,
return_ok(State, NoWait,
#'queue.purge_ok'{message_count = PurgedMessageCount});
+handle_method(#'tx.select'{}, _, #ch{confirm_enabled = true}) ->
+ rabbit_misc:protocol_error(
+ precondition_failed, "cannot switch from confirm to tx mode", []);
+
handle_method(#'tx.select'{}, _, State) ->
- {reply, #'tx.select_ok'{}, State#ch{tx_enabled = true}};
+ {reply, #'tx.select_ok'{}, State#ch{tx_status = in_progress}};
-handle_method(#'tx.commit'{}, _, #ch{tx_enabled = false}) ->
+handle_method(#'tx.commit'{}, _, #ch{tx_status = none}) ->
rabbit_misc:protocol_error(
precondition_failed, "channel is not transactional", []);
handle_method(#'tx.commit'{}, _, State = #ch{uncommitted_message_q = TMQ,
uncommitted_ack_q = TAQ}) ->
- State1 = rabbit_misc:queue_fold(fun deliver_to_queues/2, State, TMQ),
- {reply, #'tx.commit_ok'{}, new_tx(ack(TAQ, State1))};
+ State1 = new_tx(ack(TAQ, rabbit_misc:queue_fold(fun deliver_to_queues/2,
+ State, TMQ))),
+ {noreply, maybe_complete_tx(State1#ch{tx_status = committing})};
-handle_method(#'tx.rollback'{}, _, #ch{tx_enabled = false}) ->
+handle_method(#'tx.rollback'{}, _, #ch{tx_status = none}) ->
rabbit_misc:protocol_error(
precondition_failed, "channel is not transactional", []);
handle_method(#'tx.rollback'{}, _, State = #ch{unacked_message_q = UAMQ,
- uncommitted_message_q = TMQ,
uncommitted_ack_q = TAQ}) ->
- State1 = rabbit_misc:queue_fold(fun blind_confirm/2, State, TMQ),
- {reply, #'tx.rollback_ok'{}, new_tx(State1#ch{unacked_message_q =
- queue:join(TAQ, UAMQ)})};
+ {reply, #'tx.rollback_ok'{}, new_tx(State#ch{unacked_message_q =
+ queue:join(TAQ, UAMQ)})};
+
+handle_method(#'confirm.select'{}, _, #ch{tx_status = in_progress}) ->
+ rabbit_misc:protocol_error(
+ precondition_failed, "cannot switch from tx to confirm mode", []);
handle_method(#'confirm.select'{nowait = NoWait}, _, State) ->
return_ok(State#ch{confirm_enabled = true},
@@ -1126,7 +1138,7 @@ handle_publishing_queue_down(QPid, Reason, State = #ch{unconfirmed_qm = UQM}) ->
State1 = State#ch{unconfirmed_qm = gb_trees:delete_any(QPid, UQM)},
{Nack, SendFun} = case Reason of
normal -> {false, fun record_confirms/2};
- _ -> {true, fun send_nacks/2}
+ _ -> {true, fun send_nacks/2}
end,
{MXs, State2} = process_confirms(MsgSeqNos, QPid, Nack, State1),
erase_queue_stats(QPid),
@@ -1348,20 +1360,25 @@ lock_message(false, _MsgStruct, State) ->
send_nacks([], State) ->
State;
-send_nacks(MXs, State) ->
+send_nacks(MXs, State = #ch{tx_status = none}) ->
MsgSeqNos = [ MsgSeqNo || {MsgSeqNo, _} <- MXs ],
coalesce_and_send(MsgSeqNos,
fun(MsgSeqNo, Multiple) ->
#'basic.nack'{delivery_tag = MsgSeqNo,
multiple = Multiple}
- end, State).
+ end, State);
+send_nacks(_, State) ->
+ maybe_complete_tx(State#ch{tx_status = failed}).
-send_confirms(State = #ch{confirmed = C}) ->
+send_confirms(State = #ch{tx_status = none, confirmed = C}) ->
C1 = lists:append(C),
MsgSeqNos = [ begin maybe_incr_stats([{ExchangeName, 1}], confirm, State),
MsgSeqNo
end || {MsgSeqNo, ExchangeName} <- C1 ],
- send_confirms(MsgSeqNos, State #ch{confirmed = []}).
+ send_confirms(MsgSeqNos, State #ch{confirmed = []});
+send_confirms(State) ->
+ maybe_complete_tx(State).
+
send_confirms([], State) ->
State;
send_confirms([MsgSeqNo], State = #ch{writer_pid = WriterPid}) ->
@@ -1391,6 +1408,25 @@ coalesce_and_send(MsgSeqNos, MkMsgFun,
WriterPid, MkMsgFun(SeqNo, false)) || SeqNo <- Ss],
State.
+maybe_complete_tx(State = #ch{tx_status = in_progress}) ->
+ State;
+maybe_complete_tx(State = #ch{unconfirmed_mq = UMQ}) ->
+ case gb_trees:is_empty(UMQ) of
+ false -> State;
+ true -> complete_tx(State#ch{confirmed = []})
+ end.
+
+complete_tx(State = #ch{tx_status = committing}) ->
+ ok = rabbit_writer:send_command(State#ch.writer_pid, #'tx.commit_ok'{}),
+ State#ch{tx_status = in_progress};
+complete_tx(State = #ch{tx_status = failed}) ->
+ {noreply, State1} = send_exception(
+ rabbit_misc:amqp_error(
+ precondition_failed, "partial tx completion", [],
+ 'tx.commit'),
+ State),
+ State1#ch{tx_status = in_progress}.
+
infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items].
i(pid, _) -> self();
@@ -1398,7 +1434,7 @@ i(connection, #ch{conn_pid = ConnPid}) -> ConnPid;
i(number, #ch{channel = Channel}) -> Channel;
i(user, #ch{user = User}) -> User#user.username;
i(vhost, #ch{virtual_host = VHost}) -> VHost;
-i(transactional, #ch{tx_enabled = TE}) -> TE;
+i(transactional, #ch{tx_status = TE}) -> TE =/= none;
i(confirm, #ch{confirm_enabled = CE}) -> CE;
i(consumer_count, #ch{consumer_mapping = ConsumerMapping}) ->
dict:size(ConsumerMapping);