summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_amqqueue.erl18
-rw-r--r--src/rabbit_amqqueue_process.erl18
-rw-r--r--src/rabbit_channel.erl6
-rw-r--r--src/rabbit_queue_consumers.erl4
4 files changed, 23 insertions, 23 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index f811bce2d6..ddeeef98a1 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -142,9 +142,9 @@
{routing_result(), qpids()}).
-spec(deliver_flow/2 :: ([rabbit_types:amqqueue()], rabbit_types:delivery()) ->
{routing_result(), qpids()}).
--spec(requeue/4 :: (pid(), rabbit_types:ctag(), [msg_id()], pid()) -> 'ok').
--spec(ack/4 :: (pid(), rabbit_types:ctag(), [msg_id()], pid()) -> 'ok').
--spec(reject/5 :: (pid(), rabbit_types:ctag(), [msg_id()], boolean(), pid()) ->
+-spec(requeue/4 :: (pid(), [msg_id()], rabbit_types:ctag(), pid()) -> 'ok').
+-spec(ack/4 :: (pid(), [msg_id()], rabbit_types:ctag(), pid()) -> 'ok').
+-spec(reject/5 :: (pid(), boolean(), [msg_id()], rabbit_types:ctag(), pid()) ->
'ok').
-spec(notify_down_all/2 :: (qpids(), pid()) -> ok_or_errors()).
-spec(activate_limit_all/2 :: (qpids(), pid()) -> ok_or_errors()).
@@ -547,14 +547,14 @@ deliver(Qs, Delivery) -> deliver(Qs, Delivery, noflow).
deliver_flow(Qs, Delivery) -> deliver(Qs, Delivery, flow).
-requeue(QPid, CTag, MsgIds, ChPid) ->
- delegate:call(QPid, {requeue, CTag, MsgIds, ChPid}).
+requeue(QPid, MsgIds, CTag, ChPid) ->
+ delegate:call(QPid, {requeue, MsgIds, CTag, ChPid}).
-ack(QPid, CTag, MsgIds, ChPid) ->
- delegate:cast(QPid, {ack, CTag, MsgIds, ChPid}).
+ack(QPid, MsgIds, CTag, ChPid) ->
+ delegate:cast(QPid, {ack, MsgIds, CTag, ChPid}).
-reject(QPid, CTag, MsgIds, Requeue, ChPid) ->
- delegate:cast(QPid, {reject, CTag, MsgIds, Requeue, ChPid}).
+reject(QPid, Requeue, MsgIds, CTag, ChPid) ->
+ delegate:cast(QPid, {reject, Requeue, MsgIds, CTag, ChPid}).
notify_down_all(QPids, ChPid) ->
{_, Bads} = delegate:call(QPids, {notify_down, ChPid}),
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index d2ec49412e..57c6fb238b 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -563,7 +563,7 @@ fetch(AckRequired, State = #q{backing_queue = BQ,
{Result, maybe_send_drained(Result =:= empty, State1)}.
ack(AckTags, CTag, ChPid, State) ->
- subtract_acks(ChPid, CTag, AckTags, State,
+ subtract_acks(AckTags, CTag, ChPid, State,
fun (State1 = #q{backing_queue = BQ,
backing_queue_state = BQS}) ->
{_Guids, BQS1} = BQ:ack(AckTags, BQS),
@@ -571,7 +571,7 @@ ack(AckTags, CTag, ChPid, State) ->
end).
requeue(AckTags, CTag, ChPid, State) ->
- subtract_acks(ChPid, CTag, AckTags, State,
+ subtract_acks(AckTags, CTag, ChPid, State,
fun (State1) -> requeue_and_run(AckTags, State1) end).
possibly_unblock(Update, ChPid, State = #q{consumers = Consumers}) ->
@@ -634,9 +634,9 @@ backing_queue_timeout(State = #q{backing_queue = BQ,
backing_queue_state = BQS}) ->
State#q{backing_queue_state = BQ:timeout(BQS)}.
-subtract_acks(ChPid, CTag, AckTags, State = #q{consumers = Consumers}, Fun) ->
+subtract_acks(AckTags, CTag, ChPid, State = #q{consumers = Consumers}, Fun) ->
case rabbit_queue_consumers:subtract_acks(
- ChPid, CTag, AckTags, Consumers) of
+ AckTags, CTag, ChPid, Consumers) of
not_found -> State;
unchanged -> Fun(State);
{unblocked, Consumers1} -> State1 = State#q{consumers = Consumers1},
@@ -989,7 +989,7 @@ handle_call(purge, _From, State = #q{backing_queue = BQ,
State1 = State#q{backing_queue_state = BQS1},
reply({ok, Count}, maybe_send_drained(Count =:= 0, State1));
-handle_call({requeue, CTag, AckTags, ChPid}, From, State) ->
+handle_call({requeue, AckTags, CTag, ChPid}, From, State) ->
gen_server2:reply(From, ok),
noreply(requeue(AckTags, CTag, ChPid, State));
@@ -1053,16 +1053,16 @@ handle_cast({deliver, Delivery = #delivery{sender = Sender}, Delivered, Flow},
State1 = State#q{senders = Senders1},
noreply(deliver_or_enqueue(Delivery, Delivered, State1));
-handle_cast({ack, CTag, AckTags, ChPid}, State) ->
+handle_cast({ack, AckTags, CTag, ChPid}, State) ->
noreply(ack(AckTags, CTag, ChPid, State));
-handle_cast({reject, CTag, AckTags, true, ChPid}, State) ->
+handle_cast({reject, true, AckTags, CTag, ChPid}, State) ->
noreply(requeue(AckTags, CTag, ChPid, State));
-handle_cast({reject, CTag, AckTags, false, ChPid}, State) ->
+handle_cast({reject, false, AckTags, CTag, ChPid}, State) ->
noreply(with_dlx(
State#q.dlx,
- fun (X) -> subtract_acks(ChPid, CTag, AckTags, State,
+ fun (X) -> subtract_acks(AckTags, CTag, ChPid, State,
fun (State1) ->
dead_letter_rejected_msgs(
AckTags, X, State1)
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index f496a8a55c..da62fd21c3 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -886,7 +886,7 @@ handle_method(#'basic.recover_async'{requeue = true},
rabbit_misc:with_exit_handler(
OkFun,
fun () ->
- rabbit_amqqueue:requeue(QPid, CTag, MsgIds, self())
+ rabbit_amqqueue:requeue(QPid, MsgIds, CTag, self())
end)
end, lists:reverse(UAMQL)),
ok = notify_limiter(Limiter, UAMQL),
@@ -1357,7 +1357,7 @@ reject(DeliveryTag, Requeue, Multiple,
reject(Requeue, Acked, Limiter) ->
foreach_per_consumer(
fun ({QPid, CTag}, MsgIds) ->
- rabbit_amqqueue:reject(QPid, CTag, MsgIds, Requeue, self())
+ rabbit_amqqueue:reject(QPid, Requeue, MsgIds, CTag, self())
end, Acked),
ok = notify_limiter(Limiter, Acked).
@@ -1417,7 +1417,7 @@ collect_acks(ToAcc, PrefixAcc, Q, DeliveryTag, Multiple) ->
ack(Acked, State = #ch{queue_names = QNames}) ->
foreach_per_consumer(
fun ({QPid, CTag}, MsgIds) ->
- ok = rabbit_amqqueue:ack(QPid, CTag, MsgIds, self()),
+ ok = rabbit_amqqueue:ack(QPid, MsgIds, CTag, self()),
?INCR_STATS(case dict:find(QPid, QNames) of
{ok, QName} -> Count = length(MsgIds),
[{queue_stats, QName, Count}];
diff --git a/src/rabbit_queue_consumers.erl b/src/rabbit_queue_consumers.erl
index 9106ef02bd..5087b93403 100644
--- a/src/rabbit_queue_consumers.erl
+++ b/src/rabbit_queue_consumers.erl
@@ -81,7 +81,7 @@
{'delivered', boolean(), T, state()} |
{'undelivered', boolean(), state()}.
-spec record_ack(ch(), pid(), ack()) -> 'ok'.
--spec subtract_acks(ch(), rabbit_types:ctag(), [ack()], state()) ->
+-spec subtract_acks([ack()], rabbit_types:ctag(), ch(), state()) ->
'not_found' | 'unchanged' | {'unblocked', state()}.
-spec possibly_unblock(cr_fun(), ch(), state()) ->
'unchanged' | {'unblocked', state()}.
@@ -244,7 +244,7 @@ record_ack(ChPid, LimiterPid, AckTag) ->
update_ch_record(C#cr{acktags = queue:in(AckTag, ChAckTags)}),
ok.
-subtract_acks(ChPid, CTag, AckTags, State) ->
+subtract_acks(AckTags, CTag, ChPid, State) ->
case lookup_ch(ChPid) of
not_found ->
not_found;