diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_channel.erl | 116 |
1 files changed, 76 insertions, 40 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 81a0ee803c..0c211b46ab 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -30,7 +30,7 @@ prioritise_cast/2]). -record(ch, {state, protocol, channel, reader_pid, writer_pid, conn_pid, - limiter_pid, start_limiter_fun, tx_enabled, next_tag, + limiter_pid, start_limiter_fun, tx_status, next_tag, unacked_message_q, uncommitted_message_q, uncommitted_ack_q, user, virtual_host, most_recently_declared_queue, consumer_mapping, blocking, consumer_monitors, queue_collector_pid, @@ -174,7 +174,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, Protocol, User, VHost, conn_pid = ConnPid, limiter_pid = undefined, start_limiter_fun = StartLimiterFun, - tx_enabled = false, + tx_status = none, next_tag = 1, unacked_message_q = queue:new(), uncommitted_message_q = queue:new(), @@ -516,10 +516,6 @@ queue_blocked(QPid, State = #ch{blocking = Blocking}) -> State#ch{blocking = Blocking1} end. -blind_confirm({#delivery{message = #basic_message{exchange_name = XName}, - msg_seq_no = MsgSeqNo}, _QNames}, State) -> - record_confirm(MsgSeqNo, XName, State). - record_confirm(undefined, _, State) -> State; record_confirm(MsgSeqNo, XName, State) -> @@ -598,6 +594,15 @@ handle_method(#'channel.close'{}, _, State = #ch{reader_pid = ReaderPid}) -> ReaderPid ! {channel_closing, self()}, {noreply, State1}; +%% Even though the spec prohibits the client from sending commands +%% while waiting for the reply to a synchronous command, we generally +%% do allow this...except in the case of a pending tx.commit, where +%% it could wreak havoc. +handle_method(_Method, _, #ch{tx_status = TxStatus}) + when TxStatus =/= none andalso TxStatus =/= in_progress -> + rabbit_misc:protocol_error( + channel_error, "unexpected command while processing 'tx.commit'", []); + handle_method(#'access.request'{},_, State) -> {reply, #'access.request_ok'{ticket = 1}, State}; @@ -606,7 +611,7 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, mandatory = Mandatory, immediate = Immediate}, Content, State = #ch{virtual_host = VHostPath, - tx_enabled = TxEnabled, + tx_status = TxStatus, confirm_enabled = ConfirmEnabled, trace_state = TraceState}) -> ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin), @@ -618,10 +623,10 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, DecodedContent = rabbit_binary_parser:ensure_content_decoded(Content), check_user_id_header(DecodedContent#content.properties, State), {MsgSeqNo, State1} = - case ConfirmEnabled of - false -> {undefined, State}; - true -> SeqNo = State#ch.publish_seqno, - {SeqNo, State#ch{publish_seqno = SeqNo + 1}} + case {TxStatus, ConfirmEnabled} of + {none, false} -> {undefined, State}; + {_, _} -> SeqNo = State#ch.publish_seqno, + {SeqNo, State#ch{publish_seqno = SeqNo + 1}} end, case rabbit_basic:message(ExchangeName, RoutingKey, DecodedContent) of {ok, Message} -> @@ -629,13 +634,13 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, Delivery = rabbit_basic:delivery(Mandatory, Immediate, Message, MsgSeqNo), QNames = rabbit_exchange:route(Exchange, Delivery), - State2 = case TxEnabled of - true -> TMQ = State1#ch.uncommitted_message_q, - NewTMQ = queue:in({Delivery, QNames}, TMQ), - State1#ch{uncommitted_message_q = NewTMQ}; - false -> deliver_to_queues({Delivery, QNames}, State1) - end, - {noreply, State2}; + {noreply, + case TxStatus of + none -> deliver_to_queues({Delivery, QNames}, State1); + in_progress -> TMQ = State1#ch.uncommitted_message_q, + NewTMQ = queue:in({Delivery, QNames}, TMQ), + State1#ch{uncommitted_message_q = NewTMQ} + end}; {error, Reason} -> rabbit_misc:protocol_error(precondition_failed, "invalid message: ~p", [Reason]) @@ -650,15 +655,15 @@ handle_method(#'basic.nack'{delivery_tag = DeliveryTag, handle_method(#'basic.ack'{delivery_tag = DeliveryTag, multiple = Multiple}, _, State = #ch{unacked_message_q = UAMQ, - tx_enabled = TxEnabled}) -> + tx_status = TxStatus}) -> {Acked, Remaining} = collect_acks(UAMQ, DeliveryTag, Multiple), State1 = State#ch{unacked_message_q = Remaining}, - {noreply, case TxEnabled of - true -> NewTAQ = queue:join(State1#ch.uncommitted_ack_q, - Acked), - State1#ch{uncommitted_ack_q = NewTAQ}; - false -> ack(Acked, State1) - end}; + {noreply, + case TxStatus of + none -> ack(Acked, State1); + in_progress -> NewTAQ = queue:join(State1#ch.uncommitted_ack_q, Acked), + State1#ch{uncommitted_ack_q = NewTAQ} + end}; handle_method(#'basic.get'{queue = QueueNameBin, no_ack = NoAck}, @@ -1039,28 +1044,35 @@ handle_method(#'queue.purge'{queue = QueueNameBin, return_ok(State, NoWait, #'queue.purge_ok'{message_count = PurgedMessageCount}); +handle_method(#'tx.select'{}, _, #ch{confirm_enabled = true}) -> + rabbit_misc:protocol_error( + precondition_failed, "cannot switch from confirm to tx mode", []); + handle_method(#'tx.select'{}, _, State) -> - {reply, #'tx.select_ok'{}, State#ch{tx_enabled = true}}; + {reply, #'tx.select_ok'{}, State#ch{tx_status = in_progress}}; -handle_method(#'tx.commit'{}, _, #ch{tx_enabled = false}) -> +handle_method(#'tx.commit'{}, _, #ch{tx_status = none}) -> rabbit_misc:protocol_error( precondition_failed, "channel is not transactional", []); handle_method(#'tx.commit'{}, _, State = #ch{uncommitted_message_q = TMQ, uncommitted_ack_q = TAQ}) -> - State1 = rabbit_misc:queue_fold(fun deliver_to_queues/2, State, TMQ), - {reply, #'tx.commit_ok'{}, new_tx(ack(TAQ, State1))}; + State1 = new_tx(ack(TAQ, rabbit_misc:queue_fold(fun deliver_to_queues/2, + State, TMQ))), + {noreply, maybe_complete_tx(State1#ch{tx_status = committing})}; -handle_method(#'tx.rollback'{}, _, #ch{tx_enabled = false}) -> +handle_method(#'tx.rollback'{}, _, #ch{tx_status = none}) -> rabbit_misc:protocol_error( precondition_failed, "channel is not transactional", []); handle_method(#'tx.rollback'{}, _, State = #ch{unacked_message_q = UAMQ, - uncommitted_message_q = TMQ, uncommitted_ack_q = TAQ}) -> - State1 = rabbit_misc:queue_fold(fun blind_confirm/2, State, TMQ), - {reply, #'tx.rollback_ok'{}, new_tx(State1#ch{unacked_message_q = - queue:join(TAQ, UAMQ)})}; + {reply, #'tx.rollback_ok'{}, new_tx(State#ch{unacked_message_q = + queue:join(TAQ, UAMQ)})}; + +handle_method(#'confirm.select'{}, _, #ch{tx_status = in_progress}) -> + rabbit_misc:protocol_error( + precondition_failed, "cannot switch from tx to confirm mode", []); handle_method(#'confirm.select'{nowait = NoWait}, _, State) -> return_ok(State#ch{confirm_enabled = true}, @@ -1126,7 +1138,7 @@ handle_publishing_queue_down(QPid, Reason, State = #ch{unconfirmed_qm = UQM}) -> State1 = State#ch{unconfirmed_qm = gb_trees:delete_any(QPid, UQM)}, {Nack, SendFun} = case Reason of normal -> {false, fun record_confirms/2}; - _ -> {true, fun send_nacks/2} + _ -> {true, fun send_nacks/2} end, {MXs, State2} = process_confirms(MsgSeqNos, QPid, Nack, State1), erase_queue_stats(QPid), @@ -1348,20 +1360,25 @@ lock_message(false, _MsgStruct, State) -> send_nacks([], State) -> State; -send_nacks(MXs, State) -> +send_nacks(MXs, State = #ch{tx_status = none}) -> MsgSeqNos = [ MsgSeqNo || {MsgSeqNo, _} <- MXs ], coalesce_and_send(MsgSeqNos, fun(MsgSeqNo, Multiple) -> #'basic.nack'{delivery_tag = MsgSeqNo, multiple = Multiple} - end, State). + end, State); +send_nacks(_, State) -> + maybe_complete_tx(State#ch{tx_status = failed}). -send_confirms(State = #ch{confirmed = C}) -> +send_confirms(State = #ch{tx_status = none, confirmed = C}) -> C1 = lists:append(C), MsgSeqNos = [ begin maybe_incr_stats([{ExchangeName, 1}], confirm, State), MsgSeqNo end || {MsgSeqNo, ExchangeName} <- C1 ], - send_confirms(MsgSeqNos, State #ch{confirmed = []}). + send_confirms(MsgSeqNos, State #ch{confirmed = []}); +send_confirms(State) -> + maybe_complete_tx(State). + send_confirms([], State) -> State; send_confirms([MsgSeqNo], State = #ch{writer_pid = WriterPid}) -> @@ -1391,6 +1408,25 @@ coalesce_and_send(MsgSeqNos, MkMsgFun, WriterPid, MkMsgFun(SeqNo, false)) || SeqNo <- Ss], State. +maybe_complete_tx(State = #ch{tx_status = in_progress}) -> + State; +maybe_complete_tx(State = #ch{unconfirmed_mq = UMQ}) -> + case gb_trees:is_empty(UMQ) of + false -> State; + true -> complete_tx(State#ch{confirmed = []}) + end. + +complete_tx(State = #ch{tx_status = committing}) -> + ok = rabbit_writer:send_command(State#ch.writer_pid, #'tx.commit_ok'{}), + State#ch{tx_status = in_progress}; +complete_tx(State = #ch{tx_status = failed}) -> + {noreply, State1} = send_exception( + rabbit_misc:amqp_error( + precondition_failed, "partial tx completion", [], + 'tx.commit'), + State), + State1#ch{tx_status = in_progress}. + infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items]. i(pid, _) -> self(); @@ -1398,7 +1434,7 @@ i(connection, #ch{conn_pid = ConnPid}) -> ConnPid; i(number, #ch{channel = Channel}) -> Channel; i(user, #ch{user = User}) -> User#user.username; i(vhost, #ch{virtual_host = VHost}) -> VHost; -i(transactional, #ch{tx_enabled = TE}) -> TE; +i(transactional, #ch{tx_status = TE}) -> TE =/= none; i(confirm, #ch{confirm_enabled = CE}) -> CE; i(consumer_count, #ch{consumer_mapping = ConsumerMapping}) -> dict:size(ConsumerMapping); |
