diff options
| -rw-r--r-- | src/rabbit_queue_consumers.erl | 52 |
1 files changed, 48 insertions, 4 deletions
diff --git a/src/rabbit_queue_consumers.erl b/src/rabbit_queue_consumers.erl index 75093ceee4..a2a699aae5 100644 --- a/src/rabbit_queue_consumers.erl +++ b/src/rabbit_queue_consumers.erl @@ -16,7 +16,7 @@ -module(rabbit_queue_consumers). --export([new/0, max_active_priority/1, inactive/1, count/0, all/1, +-export([new/0, max_active_priority/1, inactive/1, all/1, count/0, unacknowledged_message_count/0, add/9, remove/3, erase_ch/2, send_drained/0, deliver/5, record_ack/3, subtract_acks/2, possibly_unblock/3, @@ -43,14 +43,55 @@ %%---------------------------------------------------------------------------- +-ifdef(use_specs). + +-type consumers() :: priority_queue:q(). +-type ch() :: pid(). +-type ack() :: non_neg_integer(). +-type cr_fun() :: fun ((#cr{}) -> #cr{}). +-type credit_args() :: {non_neg_integer(), boolean()} | 'none'. +-type fetch_result() :: {rabbit_types:basic_message(), boolean(), ack()}. + +-spec new() -> consumers(). +-spec max_active_priority(consumers()) -> integer() | 'infinity' | 'empty'. +-spec inactive(consumers()) -> boolean(). +-spec all(consumers()) -> [{ch(), rabbit_types:ctag(), boolean(), + rabbit_framing:amqp_table()}]. +-spec count() -> non_neg_integer(). +-spec unacknowledged_message_count() -> non_neg_integer(). +-spec add(ch(), rabbit_types:ctag(), boolean(), pid(), boolean(), + credit_args(), rabbit_framing:amqp_table(), boolean(), + consumers()) -> consumers(). +-spec remove(ch(), rabbit_types:ctag(), consumers()) -> + 'not_found' | consumers(). +-spec erase_ch(ch(), consumers()) -> + 'not_found' | {[ack()], [rabbit_types:ctag()], + consumers()}. +-spec send_drained() -> 'ok'. +-spec deliver(fun ((boolean(), T) -> {fetch_result(), boolean(), T}), + boolean(), rabbit_amqqueue:name(), T, consumers()) -> + {boolean(), [{ch(), rabbit_types:ctag()}], T, consumers()}. +-spec record_ack(ch(), pid(), ack()) -> 'ok'. +-spec subtract_acks(ch(), [ack()]) -> 'not_found' | 'ok'. +-spec possibly_unblock(cr_fun(), ch(), consumers()) -> + 'unchanged' | + {'unblocked', [rabbit_types:ctag()], consumers()}. +-spec resume_fun() -> cr_fun(). +-spec notify_sent_fun(non_neg_integer()) -> cr_fun(). +-spec activate_limit_fun() -> cr_fun(). +-spec credit_fun(boolean(), non_neg_integer(), boolean(), + rabbit_types:ctag()) -> cr_fun(). + +-endif. + +%%---------------------------------------------------------------------------- + new() -> priority_queue:new(). max_active_priority(Consumers) -> priority_queue:highest(Consumers). inactive(Consumers) -> priority_queue:is_empty(Consumers). -count() -> lists:sum([Count || #cr{consumer_count = Count} <- all_ch_record()]). - all(Consumers) -> lists:foldl(fun (C, Acc) -> consumers(C#cr.blocked_consumers, Acc) end, consumers(Consumers, []), all_ch_record()). @@ -62,6 +103,8 @@ consumers(Consumers, Acc) -> [{ChPid, CTag, Ack, Args} | Acc1] end, Acc, Consumers). +count() -> lists:sum([Count || #cr{consumer_count = Count} <- all_ch_record()]). + unacknowledged_message_count() -> lists:sum([queue:len(C#cr.acktags) || C <- all_ch_record()]). @@ -122,7 +165,8 @@ erase_ch(ChPid, Consumers) -> remove_consumers(ChPid, Consumers)} end. -send_drained() -> [update_ch_record(send_drained(C)) || C <- all_ch_record()]. +send_drained() -> [update_ch_record(send_drained(C)) || C <- all_ch_record()], + ok. deliver(DeliverFun, Stop, QName, S, Consumers) -> deliver(DeliverFun, Stop, QName, [], S, Consumers). |
