summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlexandru Scvortov <alexandru@rabbitmq.com>2010-08-13 11:55:46 +0100
committerAlexandru Scvortov <alexandru@rabbitmq.com>2010-08-13 11:55:46 +0100
commit0b7133e9ec8a891c7dc7779ba635bcb7213914d5 (patch)
tree70365d7f830c87a0079f35c917365bdcc225dfb8
parent6ff8e253b1b0d71d96958d38561ddc86bb42e6e8 (diff)
downloadrabbitmq-server-git-0b7133e9ec8a891c7dc7779ba635bcb7213914d5.tar.gz
new logic
Message sequence numbers are assigned uniquely when the basic.publish is received. If confirm is not enabled, this is undefined. Rabbit_channel gains a new API method: confirm. This schedules an ack for the given sequence number. The idea is that repeated acks are allowed at this point and de-duplication is done by the channel. This way, in order to confirm a message, another module just needs to know 1) the channel and 2) the message sequence number. No de-duplication is done at this point.
-rw-r--r--src/rabbit_channel.erl47
1 files changed, 26 insertions, 21 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 07d35ce0f3..04875b5e6c 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -266,10 +266,10 @@ handle_cast(multiple_ack_flush,
false -> flush_multiple(As, WriterPid)
end,
{noreply, State#ch{confirm = C#confirm{held_acks = gb_sets:new(),
- tref = undefined}}}.
-%handle_cast({confirm, MsgSeqNo}, State) ->
-% rabbit_log:info("got confirm for #~p~n", [MsgSeqNo]),
-% {noreply, SOMETHING MAGIC
+ tref = undefined}}};
+handle_cast({confirm, MsgSeqNo}, State) ->
+ rabbit_log:info("got confirm for #~p~n", [MsgSeqNo]),
+ {noreply, send_or_enqueue_ack(MsgSeqNo, State)}.
handle_info({'EXIT', WriterPid, Reason = {writer, send_failed, _Error}},
@@ -424,23 +424,18 @@ queue_blocked(QPid, State = #ch{blocking = Blocking}) ->
State#ch{blocking = Blocking1}
end.
-handle_confirm(State = #ch{confirm = #confirm{enabled = false}}, _) ->
+send_or_enqueue_ack(_, State = #ch{confirm = #confirm{enabled = false}}) ->
State;
-handle_confirm(State = #ch{writer_pid = WriterPid,
- confirm = C = #confirm{ count = Count, multiple = false}},
- false) ->
- rabbit_log:info("handling confirm in single transient mode (#~p)~n", [Count]),
- ok = rabbit_writer:send_command(WriterPid, #'basic.ack'{ delivery_tag = Count }),
- State#ch{confirm = C#confirm{ count = Count+1 }};
-handle_confirm(State = #ch{confirm = #confirm{multiple = true}}, false) ->
- State1 = #ch{confirm = C = #confirm{count = Count,
- held_acks = As}} = start_ack_timer(State),
- rabbit_log:info("handling confirm in multiple transient mode (#~p)~n", [Count]),
- State1#ch{confirm = C#confirm{count = Count+1,
- held_acks = gb_sets:add(Count, As)}};
-handle_confirm(State = #ch{confirm = C = #confirm{count = Count}}, IsPersistent) ->
- rabbit_log:info("handling confirm (#~p, persistent = ~p)~n", [Count, IsPersistent]),
- State#ch{confirm = C#confirm{count = Count+1}}.
+send_or_enqueue_ack(MsgSeqNo,
+ State = #ch{writer_pid = WriterPid,
+ confirm = #confirm{multiple = false}}) ->
+ rabbit_log:info("handling confirm in single mode (#~p)~n", [MsgSeqNo]),
+ ok = rabbit_writer:send_command(WriterPid, #'basic.ack'{delivery_tag = MsgSeqNo}),
+ State;
+send_or_enqueue_ack(MsgSeqNo, State = #ch{confirm = #confirm{multiple = true}}) ->
+ rabbit_log:info("handling confirm in multiple mode (#~p)~n", [MsgSeqNo]),
+ State1 = #ch{confirm = C = #confirm{held_acks = As}} = start_ack_timer(State),
+ State1#ch{confirm = C#confirm{held_acks = gb_sets:add(MsgSeqNo, As)}}.
handle_method(#'channel.open'{}, _, State = #ch{state = starting}) ->
{reply, #'channel.open_ok'{}, State#ch{state = running}};
@@ -474,7 +469,17 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
%% certain to want to look at delivery-mode and priority.
DecodedContent = rabbit_binary_parser:ensure_content_decoded(Content),
IsPersistent = is_message_persistent(DecodedContent),
- State1 = handle_confirm(State, IsPersistent),
+ {_MsgSeqNo, State1}
+ = case State#ch.confirm#confirm.enabled of
+ false ->
+ {undefined, State};
+ true ->
+ Count = State#ch.confirm#confirm.count,
+ NewState = send_or_enqueue_ack(Count, State),
+ Confirm = NewState#ch.confirm,
+ {Count,
+ NewState#ch{confirm = Confirm#confirm{count = Count+1}}}
+ end,
Message = #basic_message{exchange_name = ExchangeName,
routing_key = RoutingKey,
content = DecodedContent,