diff options
| author | Alexandru Scvortov <alexandru@rabbitmq.com> | 2010-08-11 18:00:22 +0100 |
|---|---|---|
| committer | Alexandru Scvortov <alexandru@rabbitmq.com> | 2010-08-11 18:00:22 +0100 |
| commit | 1cc1cf76c916a2f09a118912ecd13ceae8306547 (patch) | |
| tree | acfe20870f3ef50810071ca4a3519eac3252faed | |
| parent | a080c6c13de45247d511f8e897ffea574a800fc0 (diff) | |
| download | rabbitmq-server-git-1cc1cf76c916a2f09a118912ecd13ceae8306547.tar.gz | |
added support for multiple publisher acks for transient messages
When in confirm multiple mode, transient messages queue up in a
gb_set. Every 5s, these acks are sent out (individually, for now).
| -rw-r--r-- | src/rabbit_channel.erl | 38 |
1 files changed, 33 insertions, 5 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 09d1b385c7..dbfb11c0b8 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -49,7 +49,7 @@ username, virtual_host, most_recently_declared_queue, consumer_mapping, blocking, queue_collector_pid, stats_timer, confirm}). --record(confirm, {enabled, count, multiple, tref}). +-record(confirm, {enabled, count, multiple, tref, held_acks}). -define(MAX_PERMISSION_CACHE_SIZE, 12). @@ -70,6 +70,8 @@ -define(INFO_KEYS, ?CREATION_EVENT_KEYS ++ ?STATISTICS_KEYS -- [pid]). +-define(MULTIPLE_ACK_FLUSH_INTERVAL, 5000). + %%---------------------------------------------------------------------------- -ifdef(use_specs). @@ -176,7 +178,8 @@ init([Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid]) -> confirm = #confirm{ enabled = false, count = 0, multiple = false, - tref = not_started }}, + tref = not_started, + held_acks = gb_sets:new()}}, rabbit_event:notify( channel_created, [{Item, i(Item, State)} || Item <- ?CREATION_EVENT_KEYS]), @@ -253,7 +256,20 @@ handle_info({'EXIT', _Pid, Reason}, State) -> {stop, Reason, State}; handle_info({'DOWN', _MRef, process, QPid, _Reason}, State) -> erase_queue_stats(QPid), - {noreply, queue_blocked(QPid, State)}. + {noreply, queue_blocked(QPid, State)}; +handle_info(multiple_ack_flush, + State = #ch{ writer_pid = WriterPid, + confirm = #confirm{held_acks = As} } ) -> + rabbit_log:info("channel got a multiple_ack_flush message~n" + "held acks: ~p~n", [gb_sets:to_list(As)]), + gb_sets:fold(fun(A, L) -> + ok = rabbit_writer:send_command( + WriterPid, + #'basic.ack'{ delivery_tag = A }), + L + end, -1, As), + {noreply, State#ch{confirm = #confirm{ held_acks = gb_sets:new()}}}. + handle_pre_hibernate(State) -> ok = clear_permission_cache(), @@ -400,9 +416,15 @@ queue_blocked(QPid, State = #ch{blocking = Blocking}) -> handle_confirm(C = #confirm{ enabled = false }, _, _) -> C; handle_confirm(C = #confirm{ count = Count, multiple = false }, false, WriterPid) -> - rabbit_log:info("handling confirm in single mode (#~p)~n", [Count]), + rabbit_log:info("handling confirm in single transient mode (#~p)~n", [Count]), ok = rabbit_writer:send_command(WriterPid, #'basic.ack'{ delivery_tag = Count }), C#confirm{ count = Count+1 }; +handle_confirm(C = #confirm{ count = Count, + multiple = true, + held_acks = As }, false, _WriterPid) -> + rabbit_log:info("handling confirm in multiple transient mode (#~p)~n", [Count]), + C#confirm{ count = Count+1, + held_acks = gb_sets:add(Count, As) }; handle_confirm(C = #confirm{ count = Count }, IsPersistent, _WriterPid) -> rabbit_log:info("handling confirm (#~p, persistent = ~p)~n", [Count, IsPersistent]), C#confirm{ count = Count+1 }. @@ -883,8 +905,14 @@ handle_method(#'confirm.select'{multiple = Multiple, confirm = C = #confirm{enabled = false}}) -> rabbit_log:info("got confirm.select{multiple = ~p, nowait = ~p}~n", [Multiple, NoWait]), + TRef = case Multiple of + false -> not_started; + true -> timer:send_interval(?MULTIPLE_ACK_FLUSH_INTERVAL, + multiple_ack_flush) + end, State1 = State#ch{confirm = C#confirm{ enabled = true, - multiple = Multiple }}, + multiple = Multiple, + tref = TRef }}, case NoWait of true -> {noreply, State1}; false -> {reply, #'confirm.select_ok'{}, State1} |
