diff options
| author | Alexandru Scvortov <alexandru@rabbitmq.com> | 2010-08-13 11:55:46 +0100 |
|---|---|---|
| committer | Alexandru Scvortov <alexandru@rabbitmq.com> | 2010-08-13 11:55:46 +0100 |
| commit | 0b7133e9ec8a891c7dc7779ba635bcb7213914d5 (patch) | |
| tree | 70365d7f830c87a0079f35c917365bdcc225dfb8 | |
| parent | 6ff8e253b1b0d71d96958d38561ddc86bb42e6e8 (diff) | |
| download | rabbitmq-server-git-0b7133e9ec8a891c7dc7779ba635bcb7213914d5.tar.gz | |
new logic
Message sequence numbers are assigned uniquely when the basic.publish
is received. If confirm is not enabled, this is undefined.
Rabbit_channel gains a new API method: confirm. This schedules an ack
for the given sequence number. The idea is that repeated acks are
allowed at this point and de-duplication is done by the channel. This
way, in order to confirm a message, another module just needs to
know 1) the channel and 2) the message sequence number.
No de-duplication is done at this point.
| -rw-r--r-- | src/rabbit_channel.erl | 47 |
1 files changed, 26 insertions, 21 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 07d35ce0f3..04875b5e6c 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -266,10 +266,10 @@ handle_cast(multiple_ack_flush, false -> flush_multiple(As, WriterPid) end, {noreply, State#ch{confirm = C#confirm{held_acks = gb_sets:new(), - tref = undefined}}}. -%handle_cast({confirm, MsgSeqNo}, State) -> -% rabbit_log:info("got confirm for #~p~n", [MsgSeqNo]), -% {noreply, SOMETHING MAGIC + tref = undefined}}}; +handle_cast({confirm, MsgSeqNo}, State) -> + rabbit_log:info("got confirm for #~p~n", [MsgSeqNo]), + {noreply, send_or_enqueue_ack(MsgSeqNo, State)}. handle_info({'EXIT', WriterPid, Reason = {writer, send_failed, _Error}}, @@ -424,23 +424,18 @@ queue_blocked(QPid, State = #ch{blocking = Blocking}) -> State#ch{blocking = Blocking1} end. -handle_confirm(State = #ch{confirm = #confirm{enabled = false}}, _) -> +send_or_enqueue_ack(_, State = #ch{confirm = #confirm{enabled = false}}) -> State; -handle_confirm(State = #ch{writer_pid = WriterPid, - confirm = C = #confirm{ count = Count, multiple = false}}, - false) -> - rabbit_log:info("handling confirm in single transient mode (#~p)~n", [Count]), - ok = rabbit_writer:send_command(WriterPid, #'basic.ack'{ delivery_tag = Count }), - State#ch{confirm = C#confirm{ count = Count+1 }}; -handle_confirm(State = #ch{confirm = #confirm{multiple = true}}, false) -> - State1 = #ch{confirm = C = #confirm{count = Count, - held_acks = As}} = start_ack_timer(State), - rabbit_log:info("handling confirm in multiple transient mode (#~p)~n", [Count]), - State1#ch{confirm = C#confirm{count = Count+1, - held_acks = gb_sets:add(Count, As)}}; -handle_confirm(State = #ch{confirm = C = #confirm{count = Count}}, IsPersistent) -> - rabbit_log:info("handling confirm (#~p, persistent = ~p)~n", [Count, IsPersistent]), - State#ch{confirm = C#confirm{count = Count+1}}. +send_or_enqueue_ack(MsgSeqNo, + State = #ch{writer_pid = WriterPid, + confirm = #confirm{multiple = false}}) -> + rabbit_log:info("handling confirm in single mode (#~p)~n", [MsgSeqNo]), + ok = rabbit_writer:send_command(WriterPid, #'basic.ack'{delivery_tag = MsgSeqNo}), + State; +send_or_enqueue_ack(MsgSeqNo, State = #ch{confirm = #confirm{multiple = true}}) -> + rabbit_log:info("handling confirm in multiple mode (#~p)~n", [MsgSeqNo]), + State1 = #ch{confirm = C = #confirm{held_acks = As}} = start_ack_timer(State), + State1#ch{confirm = C#confirm{held_acks = gb_sets:add(MsgSeqNo, As)}}. handle_method(#'channel.open'{}, _, State = #ch{state = starting}) -> {reply, #'channel.open_ok'{}, State#ch{state = running}}; @@ -474,7 +469,17 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, %% certain to want to look at delivery-mode and priority. DecodedContent = rabbit_binary_parser:ensure_content_decoded(Content), IsPersistent = is_message_persistent(DecodedContent), - State1 = handle_confirm(State, IsPersistent), + {_MsgSeqNo, State1} + = case State#ch.confirm#confirm.enabled of + false -> + {undefined, State}; + true -> + Count = State#ch.confirm#confirm.count, + NewState = send_or_enqueue_ack(Count, State), + Confirm = NewState#ch.confirm, + {Count, + NewState#ch{confirm = Confirm#confirm{count = Count+1}}} + end, Message = #basic_message{exchange_name = ExchangeName, routing_key = RoutingKey, content = DecodedContent, |
