summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2014-01-13 17:01:02 +0000
committerSimon MacMullen <simon@rabbitmq.com>2014-01-13 17:01:02 +0000
commit253e8e3aab9f4b134b7f2d8930ad80a9e06b2a59 (patch)
tree9d348178975d3f80d5aeb650f62361107f2ef220 /src
parent092c4ce6f11b99ad12044a52ab8ed46589277405 (diff)
downloadrabbitmq-server-git-253e8e3aab9f4b134b7f2d8930ad80a9e06b2a59.tar.gz
Unbreak basic.reject, basic.nack and basic.recover. Fix up specs.
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_amqqueue.erl19
-rw-r--r--src/rabbit_amqqueue_process.erl18
-rw-r--r--src/rabbit_channel.erl22
-rw-r--r--src/rabbit_limiter.erl6
-rw-r--r--src/rabbit_queue_consumers.erl5
5 files changed, 39 insertions, 31 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index cb2eb635b8..f811bce2d6 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -22,7 +22,7 @@
-export([lookup/1, not_found_or_absent/1, with/2, with/3, with_or_die/2,
assert_equivalence/5,
check_exclusive_access/2, with_exclusive_access_or_die/3,
- stat/1, deliver/2, deliver_flow/2, requeue/3, ack/4, reject/4]).
+ stat/1, deliver/2, deliver_flow/2, requeue/4, ack/4, reject/5]).
-export([list/0, list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2]).
-export([force_event_refresh/0, notify_policy_changed/1]).
-export([consumers/1, consumers_all/1, consumer_info_keys/0]).
@@ -142,9 +142,10 @@
{routing_result(), qpids()}).
-spec(deliver_flow/2 :: ([rabbit_types:amqqueue()], rabbit_types:delivery()) ->
{routing_result(), qpids()}).
--spec(requeue/3 :: (pid(), [msg_id()], pid()) -> 'ok').
-%%-spec(ack/3 :: (pid(), [msg_id()], pid()) -> 'ok').
--spec(reject/4 :: (pid(), [msg_id()], boolean(), pid()) -> 'ok').
+-spec(requeue/4 :: (pid(), rabbit_types:ctag(), [msg_id()], pid()) -> 'ok').
+-spec(ack/4 :: (pid(), rabbit_types:ctag(), [msg_id()], pid()) -> 'ok').
+-spec(reject/5 :: (pid(), rabbit_types:ctag(), [msg_id()], boolean(), pid()) ->
+ 'ok').
-spec(notify_down_all/2 :: (qpids(), pid()) -> ok_or_errors()).
-spec(activate_limit_all/2 :: (qpids(), pid()) -> ok_or_errors()).
-spec(basic_get/4 :: (rabbit_types:amqqueue(), pid(), boolean(), pid()) ->
@@ -546,12 +547,14 @@ deliver(Qs, Delivery) -> deliver(Qs, Delivery, noflow).
deliver_flow(Qs, Delivery) -> deliver(Qs, Delivery, flow).
-requeue(QPid, MsgIds, ChPid) -> delegate:call(QPid, {requeue, MsgIds, ChPid}).
+requeue(QPid, CTag, MsgIds, ChPid) ->
+ delegate:call(QPid, {requeue, CTag, MsgIds, ChPid}).
-ack(QPid, CTag, MsgIds, ChPid) -> delegate:cast(QPid, {ack, CTag, MsgIds, ChPid}).
+ack(QPid, CTag, MsgIds, ChPid) ->
+ delegate:cast(QPid, {ack, CTag, MsgIds, ChPid}).
-reject(QPid, MsgIds, Requeue, ChPid) ->
- delegate:cast(QPid, {reject, MsgIds, Requeue, ChPid}).
+reject(QPid, CTag, MsgIds, Requeue, ChPid) ->
+ delegate:cast(QPid, {reject, CTag, MsgIds, Requeue, ChPid}).
notify_down_all(QPids, ChPid) ->
{_, Bads} = delegate:call(QPids, {notify_down, ChPid}),
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 0066a07be2..d2ec49412e 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -570,8 +570,8 @@ ack(AckTags, CTag, ChPid, State) ->
State1#q{backing_queue_state = BQS1}
end).
-requeue(AckTags, ChPid, State) ->
- subtract_acks(ChPid, fixme, AckTags, State,
+requeue(AckTags, CTag, ChPid, State) ->
+ subtract_acks(ChPid, CTag, AckTags, State,
fun (State1) -> requeue_and_run(AckTags, State1) end).
possibly_unblock(Update, ChPid, State = #q{consumers = Consumers}) ->
@@ -989,9 +989,9 @@ handle_call(purge, _From, State = #q{backing_queue = BQ,
State1 = State#q{backing_queue_state = BQS1},
reply({ok, Count}, maybe_send_drained(Count =:= 0, State1));
-handle_call({requeue, AckTags, ChPid}, From, State) ->
+handle_call({requeue, CTag, AckTags, ChPid}, From, State) ->
gen_server2:reply(From, ok),
- noreply(requeue(AckTags, ChPid, State));
+ noreply(requeue(AckTags, CTag, ChPid, State));
handle_call(sync_mirrors, _From,
State = #q{backing_queue = rabbit_mirror_queue_master,
@@ -1056,18 +1056,18 @@ handle_cast({deliver, Delivery = #delivery{sender = Sender}, Delivered, Flow},
handle_cast({ack, CTag, AckTags, ChPid}, State) ->
noreply(ack(AckTags, CTag, ChPid, State));
-handle_cast({reject, AckTags, true, ChPid}, State) ->
- noreply(requeue(AckTags, ChPid, State));
+handle_cast({reject, CTag, AckTags, true, ChPid}, State) ->
+ noreply(requeue(AckTags, CTag, ChPid, State));
-handle_cast({reject, AckTags, false, ChPid}, State) ->
+handle_cast({reject, CTag, AckTags, false, ChPid}, State) ->
noreply(with_dlx(
State#q.dlx,
- fun (X) -> subtract_acks(ChPid, fixme, AckTags, State,
+ fun (X) -> subtract_acks(ChPid, CTag, AckTags, State,
fun (State1) ->
dead_letter_rejected_msgs(
AckTags, X, State1)
end) end,
- fun () -> ack(AckTags, fixme, ChPid, State) end));
+ fun () -> ack(AckTags, CTag, ChPid, State) end));
handle_cast(delete_immediately, State) ->
stop(State);
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 0e2d16dc11..f496a8a55c 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -881,11 +881,13 @@ handle_method(#'basic.recover_async'{requeue = true},
_, State = #ch{unacked_message_q = UAMQ, limiter = Limiter}) ->
OkFun = fun () -> ok end,
UAMQL = queue:to_list(UAMQ),
- foreach_per_queue(
- fun (QPid, MsgIds) ->
+ foreach_per_consumer(
+ fun ({QPid, CTag}, MsgIds) ->
rabbit_misc:with_exit_handler(
OkFun,
- fun () -> rabbit_amqqueue:requeue(QPid, MsgIds, self()) end)
+ fun () ->
+ rabbit_amqqueue:requeue(QPid, CTag, MsgIds, self())
+ end)
end, lists:reverse(UAMQL)),
ok = notify_limiter(Limiter, UAMQL),
%% No answer required - basic.recover is the newer, synchronous
@@ -1353,9 +1355,9 @@ reject(DeliveryTag, Requeue, Multiple,
%% NB: Acked is in youngest-first order
reject(Requeue, Acked, Limiter) ->
- foreach_per_queue(
- fun (QPid, MsgIds) ->
- rabbit_amqqueue:reject(QPid, MsgIds, Requeue, self())
+ foreach_per_consumer(
+ fun ({QPid, CTag}, MsgIds) ->
+ rabbit_amqqueue:reject(QPid, CTag, MsgIds, Requeue, self())
end, Acked),
ok = notify_limiter(Limiter, Acked).
@@ -1413,7 +1415,7 @@ collect_acks(ToAcc, PrefixAcc, Q, DeliveryTag, Multiple) ->
%% NB: Acked is in youngest-first order
ack(Acked, State = #ch{queue_names = QNames}) ->
- foreach_per_queue(
+ foreach_per_consumer(
fun ({QPid, CTag}, MsgIds) ->
ok = rabbit_amqqueue:ack(QPid, CTag, MsgIds, self()),
?INCR_STATS(case dict:find(QPid, QNames) of
@@ -1446,13 +1448,13 @@ notify_queues(State = #ch{consumer_mapping = Consumers,
sets:union(sets:from_list(consumer_queues(Consumers)), DQ)),
{rabbit_amqqueue:notify_down_all(QPids, self()), State#ch{state = closing}}.
-foreach_per_queue(_F, []) ->
+foreach_per_consumer(_F, []) ->
ok;
-foreach_per_queue(F, [{_DTag, CTag, {QPid, MsgId}}]) -> %% common case
+foreach_per_consumer(F, [{_DTag, CTag, {QPid, MsgId}}]) -> %% common case
F({QPid, CTag}, [MsgId]);
%% NB: UAL should be in youngest-first order; the tree values will
%% then be in oldest-first order
-foreach_per_queue(F, UAL) ->
+foreach_per_consumer(F, UAL) ->
T = lists:foldl(fun ({_DTag, CTag, {QPid, MsgId}}, T) ->
rabbit_misc:gb_trees_cons({QPid, CTag}, MsgId, T)
end, gb_trees:empty(), UAL),
diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl
index 09bf03e6f3..210b6b7c4e 100644
--- a/src/rabbit_limiter.erl
+++ b/src/rabbit_limiter.erl
@@ -174,8 +174,10 @@
-spec(is_consumer_blocked/2 :: (qstate(), rabbit_types:ctag()) -> boolean()).
-spec(credit/5 :: (qstate(), rabbit_types:ctag(), non_neg_integer(), boolean(),
boolean()) -> {boolean(), qstate()}).
+-spec(set_consumer_prefetch/4 :: (qstate(), rabbit_types:ctag(), boolean(),
+ non_neg_integer()) -> qstate()).
-spec(ack_from_queue/3 :: (qstate(), rabbit_types:ctag(), non_neg_integer())
- -> qstate()).
+ -> {boolean(), qstate()}).
-spec(drained/1 :: (qstate())
-> {[{rabbit_types:ctag(), non_neg_integer()}], qstate()}).
-spec(forget_consumer/2 :: (qstate(), rabbit_types:ctag()) -> qstate()).
@@ -309,7 +311,7 @@ ack_from_queue(Limiter = #qstate{credits = Credits}, CTag, Credit) ->
_ ->
{Credits, false}
end,
- {Limiter#qstate{credits = Credits1}, Unblocked}.
+ {Unblocked, Limiter#qstate{credits = Credits1}}.
drained(Limiter = #qstate{credits = Credits}) ->
{CTagCredits, Credits2} =
diff --git a/src/rabbit_queue_consumers.erl b/src/rabbit_queue_consumers.erl
index 8ca6855708..3ba337ae13 100644
--- a/src/rabbit_queue_consumers.erl
+++ b/src/rabbit_queue_consumers.erl
@@ -81,7 +81,8 @@
{'delivered', boolean(), T, state()} |
{'undelivered', boolean(), state()}.
-spec record_ack(ch(), pid(), ack()) -> 'ok'.
-%% -spec subtract_acks(ch(), [ack()]) -> 'not_found' | 'ok'.
+-spec subtract_acks(ch(), rabbit_types:ctag(), [ack()], state()) ->
+ 'not_found' | 'unchanged' | {'unblocked', state()}.
-spec possibly_unblock(cr_fun(), ch(), state()) ->
'unchanged' | {'unblocked', state()}.
-spec resume_fun() -> cr_fun().
@@ -245,7 +246,7 @@ subtract_acks(ChPid, CTag, AckTags, State) ->
not_found ->
not_found;
C = #cr{acktags = ChAckTags, limiter = Lim} ->
- {Lim2, Unblocked} =
+ {Unblocked, Lim2} =
rabbit_limiter:ack_from_queue(Lim, CTag, length(AckTags)),
AckTags2 = subtract_acks0(AckTags, [], ChAckTags),
C2 = C#cr{acktags = AckTags2, limiter = Lim2},