summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlexandru Scvortov <alexandru@rabbitmq.com>2010-08-11 13:54:36 +0100
committerAlexandru Scvortov <alexandru@rabbitmq.com>2010-08-11 13:54:36 +0100
commit5cc92b64e688ba403233eff17fcfbdf3d1460431 (patch)
tree3a1b51949540d7a6004363ef6ac68d0e070a5ab9
parent912416cd714c6cbfabe95d0c6a9d473b34ecf9c1 (diff)
downloadrabbitmq-server-git-5cc92b64e688ba403233eff17fcfbdf3d1460431.tar.gz
added counter and single pubacks for transient messages
Publishes are counter starting with the first one after pubAck.select(). If many is set to false, transient messages are ack'd as soon as the are received, but after the exchange name is resolved.
-rw-r--r--src/rabbit_channel.erl51
1 files changed, 34 insertions, 17 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index e42f49c590..38c4855020 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -49,6 +49,7 @@
username, virtual_host, most_recently_declared_queue,
consumer_mapping, blocking, queue_collector_pid, stats_timer,
pubAck_mode}).
+-record(pubAck, {enabled, count, many}).
-define(MAX_PERMISSION_CACHE_SIZE, 12).
@@ -172,7 +173,8 @@ init([Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid]) ->
blocking = dict:new(),
queue_collector_pid = CollectorPid,
stats_timer = rabbit_event:init_stats_timer(),
- pubAck_mode = none},
+ pubAck_mode = #pubAck{ enabled = false,
+ count = 0 }},
rabbit_event:notify(
channel_created,
[{Item, i(Item, State)} || Item <- ?CREATION_EVENT_KEYS]),
@@ -393,6 +395,16 @@ queue_blocked(QPid, State = #ch{blocking = Blocking}) ->
State#ch{blocking = Blocking1}
end.
+handle_pubAck(PA = #pubAck{ enabled = false }, _) ->
+ PA;
+handle_pubAck(PA = #pubAck{ count = Count, many = false }, WriterPid) ->
+ rabbit_log:info("handling pubAck in single mode (#~p)~n", [Count]),
+ ok = rabbit_writer:send_command(WriterPid, #'basic.ack'{ delivery_tag = Count }),
+ PA#pubAck{ count = Count+1 };
+handle_pubAck(PA = #pubAck{ count = Count }, _WriterPid) ->
+ rabbit_log:info("handling pubAck in many mode (#~p)~n", [Count]),
+ PA#pubAck{ count = Count+1 }.
+
handle_method(#'channel.open'{}, _, State = #ch{state = starting}) ->
{reply, #'channel.open_ok'{}, State#ch{state = running}};
@@ -417,10 +429,12 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
immediate = Immediate},
Content, State = #ch{virtual_host = VHostPath,
transaction_id = TxnKey,
- writer_pid = WriterPid}) ->
+ writer_pid = WriterPid,
+ pubAck_mode = PubAck}) ->
ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
check_write_permitted(ExchangeName, State),
Exchange = rabbit_exchange:lookup_or_die(ExchangeName),
+ PubAck1 = handle_pubAck(PubAck, WriterPid),
%% We decode the content's properties here because we're almost
%% certain to want to look at delivery-mode and priority.
DecodedContent = rabbit_binary_parser:ensure_content_decoded(Content),
@@ -442,9 +456,10 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
maybe_incr_stats([{ExchangeName, 1} |
[{{QPid, ExchangeName}, 1} ||
QPid <- DeliveredQPids]], publish, State),
+ State1 = State#ch{ pubAck_mode = PubAck1 },
{noreply, case TxnKey of
- none -> State;
- _ -> add_tx_participants(DeliveredQPids, State)
+ none -> State1;
+ _ -> add_tx_participants(DeliveredQPids, State1)
end};
handle_method(#'basic.ack'{delivery_tag = DeliveryTag,
@@ -832,11 +847,13 @@ handle_method(#'queue.purge'{queue = QueueNameBin,
return_ok(State, NoWait,
#'queue.purge_ok'{message_count = PurgedMessageCount});
-handle_method(#'tx.select'{}, _, State = #ch{transaction_id = none,
- pubAck_mode = none}) ->
+handle_method(#'tx.select'{}, _,
+ State = #ch{transaction_id = none,
+ pubAck_mode = #pubAck{enabled = false}}) ->
{reply, #'tx.select_ok'{}, new_tx(State)};
-handle_method(#'tx.select'{}, _, State = #ch{pubAck_mode = none}) ->
+handle_method(#'tx.select'{}, _,
+ State = #ch{pubAck_mode = #pubAck{enabled = false}}) ->
{reply, #'tx.select_ok'{}, State};
handle_method(#'tx.select'{}, _, _State) ->
@@ -858,24 +875,24 @@ handle_method(#'tx.rollback'{}, _, State) ->
{reply, #'tx.rollback_ok'{}, internal_rollback(State)};
handle_method(#'pubAck.select'{many = Many}, _,
- State = #ch{transaction_id = none}) ->
+ State = #ch{ transaction_id = none,
+ pubAck_mode = PA}) ->
rabbit_log:info("got pubAck.select{many = ~p}~n", [Many]),
- {noreply, State#ch{ pubAck_mode = case Many of
- true -> 'pubAckMany';
- false -> 'pubAck'
- end
- }};
-handle_method(#'pubAck.select'{}, _, State) ->
+ {noreply, State#ch{ pubAck_mode = PA#'pubAck'{ enabled = true,
+ many = Many } }};
+handle_method(#'pubAck.select'{}, _, _State) ->
rabbit_misc:protocol_error(
precondition_failed, "transactional channel cannot be made pubAck", []);
-handle_method(#'pubAck.deselect'{}, _, State = #ch{ pubAck_mode = 'none' }) ->
+handle_method(#'pubAck.deselect'{}, _,
+ #ch{ pubAck_mode = #pubAck { enabled = false }}) ->
rabbit_misc:protocol_error(
precondition_failed, "channel must first be in pubAck, to deselect", []);
-handle_method(#'pubAck.deselect'{}, _, State) ->
+handle_method(#'pubAck.deselect'{}, _, State = #ch{ pubAck_mode = PA }) ->
rabbit_log:info("got pubAck.deselect{}~n"),
- {reply, #'pubAck.deselect_ok'{}, State#ch{ pubAck_mode = 'none'}};
+ {reply, #'pubAck.deselect_ok'{},
+ State#ch{ pubAck_mode = PA#pubAck{ enabled = false }}};
handle_method(#'channel.flow'{active = true}, _,
State = #ch{limiter_pid = LimiterPid}) ->