summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2013-12-24 12:59:05 +0000
committerMatthias Radestock <matthias@rabbitmq.com>2013-12-24 12:59:05 +0000
commit1741bbd334a2fb826ed2c2e2c1c63b004a4523bf (patch)
tree31e198f7a7b273bf0fa832f6536f4abd83d19f7b
parent143554f993ba5b148aa732cd0fe4d53ab908c8cb (diff)
downloadrabbitmq-server-git-1741bbd334a2fb826ed2c2e2c1c63b004a4523bf.tar.gz
specs
-rw-r--r--src/rabbit_queue_consumers.erl52
1 files changed, 48 insertions, 4 deletions
diff --git a/src/rabbit_queue_consumers.erl b/src/rabbit_queue_consumers.erl
index 75093ceee4..a2a699aae5 100644
--- a/src/rabbit_queue_consumers.erl
+++ b/src/rabbit_queue_consumers.erl
@@ -16,7 +16,7 @@
-module(rabbit_queue_consumers).
--export([new/0, max_active_priority/1, inactive/1, count/0, all/1,
+-export([new/0, max_active_priority/1, inactive/1, all/1, count/0,
unacknowledged_message_count/0, add/9, remove/3, erase_ch/2,
send_drained/0, deliver/5, record_ack/3, subtract_acks/2,
possibly_unblock/3,
@@ -43,14 +43,55 @@
%%----------------------------------------------------------------------------
+-ifdef(use_specs).
+
+-type consumers() :: priority_queue:q().
+-type ch() :: pid().
+-type ack() :: non_neg_integer().
+-type cr_fun() :: fun ((#cr{}) -> #cr{}).
+-type credit_args() :: {non_neg_integer(), boolean()} | 'none'.
+-type fetch_result() :: {rabbit_types:basic_message(), boolean(), ack()}.
+
+-spec new() -> consumers().
+-spec max_active_priority(consumers()) -> integer() | 'infinity' | 'empty'.
+-spec inactive(consumers()) -> boolean().
+-spec all(consumers()) -> [{ch(), rabbit_types:ctag(), boolean(),
+ rabbit_framing:amqp_table()}].
+-spec count() -> non_neg_integer().
+-spec unacknowledged_message_count() -> non_neg_integer().
+-spec add(ch(), rabbit_types:ctag(), boolean(), pid(), boolean(),
+ credit_args(), rabbit_framing:amqp_table(), boolean(),
+ consumers()) -> consumers().
+-spec remove(ch(), rabbit_types:ctag(), consumers()) ->
+ 'not_found' | consumers().
+-spec erase_ch(ch(), consumers()) ->
+ 'not_found' | {[ack()], [rabbit_types:ctag()],
+ consumers()}.
+-spec send_drained() -> 'ok'.
+-spec deliver(fun ((boolean(), T) -> {fetch_result(), boolean(), T}),
+ boolean(), rabbit_amqqueue:name(), T, consumers()) ->
+ {boolean(), [{ch(), rabbit_types:ctag()}], T, consumers()}.
+-spec record_ack(ch(), pid(), ack()) -> 'ok'.
+-spec subtract_acks(ch(), [ack()]) -> 'not_found' | 'ok'.
+-spec possibly_unblock(cr_fun(), ch(), consumers()) ->
+ 'unchanged' |
+ {'unblocked', [rabbit_types:ctag()], consumers()}.
+-spec resume_fun() -> cr_fun().
+-spec notify_sent_fun(non_neg_integer()) -> cr_fun().
+-spec activate_limit_fun() -> cr_fun().
+-spec credit_fun(boolean(), non_neg_integer(), boolean(),
+ rabbit_types:ctag()) -> cr_fun().
+
+-endif.
+
+%%----------------------------------------------------------------------------
+
new() -> priority_queue:new().
max_active_priority(Consumers) -> priority_queue:highest(Consumers).
inactive(Consumers) -> priority_queue:is_empty(Consumers).
-count() -> lists:sum([Count || #cr{consumer_count = Count} <- all_ch_record()]).
-
all(Consumers) ->
lists:foldl(fun (C, Acc) -> consumers(C#cr.blocked_consumers, Acc) end,
consumers(Consumers, []), all_ch_record()).
@@ -62,6 +103,8 @@ consumers(Consumers, Acc) ->
[{ChPid, CTag, Ack, Args} | Acc1]
end, Acc, Consumers).
+count() -> lists:sum([Count || #cr{consumer_count = Count} <- all_ch_record()]).
+
unacknowledged_message_count() ->
lists:sum([queue:len(C#cr.acktags) || C <- all_ch_record()]).
@@ -122,7 +165,8 @@ erase_ch(ChPid, Consumers) ->
remove_consumers(ChPid, Consumers)}
end.
-send_drained() -> [update_ch_record(send_drained(C)) || C <- all_ch_record()].
+send_drained() -> [update_ch_record(send_drained(C)) || C <- all_ch_record()],
+ ok.
deliver(DeliverFun, Stop, QName, S, Consumers) ->
deliver(DeliverFun, Stop, QName, [], S, Consumers).