diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_amqqueue.erl | 18 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 18 | ||||
| -rw-r--r-- | src/rabbit_channel.erl | 6 | ||||
| -rw-r--r-- | src/rabbit_queue_consumers.erl | 4 |
4 files changed, 23 insertions, 23 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index f811bce2d6..ddeeef98a1 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -142,9 +142,9 @@ {routing_result(), qpids()}). -spec(deliver_flow/2 :: ([rabbit_types:amqqueue()], rabbit_types:delivery()) -> {routing_result(), qpids()}). --spec(requeue/4 :: (pid(), rabbit_types:ctag(), [msg_id()], pid()) -> 'ok'). --spec(ack/4 :: (pid(), rabbit_types:ctag(), [msg_id()], pid()) -> 'ok'). --spec(reject/5 :: (pid(), rabbit_types:ctag(), [msg_id()], boolean(), pid()) -> +-spec(requeue/4 :: (pid(), [msg_id()], rabbit_types:ctag(), pid()) -> 'ok'). +-spec(ack/4 :: (pid(), [msg_id()], rabbit_types:ctag(), pid()) -> 'ok'). +-spec(reject/5 :: (pid(), boolean(), [msg_id()], rabbit_types:ctag(), pid()) -> 'ok'). -spec(notify_down_all/2 :: (qpids(), pid()) -> ok_or_errors()). -spec(activate_limit_all/2 :: (qpids(), pid()) -> ok_or_errors()). @@ -547,14 +547,14 @@ deliver(Qs, Delivery) -> deliver(Qs, Delivery, noflow). deliver_flow(Qs, Delivery) -> deliver(Qs, Delivery, flow). -requeue(QPid, CTag, MsgIds, ChPid) -> - delegate:call(QPid, {requeue, CTag, MsgIds, ChPid}). +requeue(QPid, MsgIds, CTag, ChPid) -> + delegate:call(QPid, {requeue, MsgIds, CTag, ChPid}). -ack(QPid, CTag, MsgIds, ChPid) -> - delegate:cast(QPid, {ack, CTag, MsgIds, ChPid}). +ack(QPid, MsgIds, CTag, ChPid) -> + delegate:cast(QPid, {ack, MsgIds, CTag, ChPid}). -reject(QPid, CTag, MsgIds, Requeue, ChPid) -> - delegate:cast(QPid, {reject, CTag, MsgIds, Requeue, ChPid}). +reject(QPid, Requeue, MsgIds, CTag, ChPid) -> + delegate:cast(QPid, {reject, Requeue, MsgIds, CTag, ChPid}). notify_down_all(QPids, ChPid) -> {_, Bads} = delegate:call(QPids, {notify_down, ChPid}), diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index d2ec49412e..57c6fb238b 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -563,7 +563,7 @@ fetch(AckRequired, State = #q{backing_queue = BQ, {Result, maybe_send_drained(Result =:= empty, State1)}. ack(AckTags, CTag, ChPid, State) -> - subtract_acks(ChPid, CTag, AckTags, State, + subtract_acks(AckTags, CTag, ChPid, State, fun (State1 = #q{backing_queue = BQ, backing_queue_state = BQS}) -> {_Guids, BQS1} = BQ:ack(AckTags, BQS), @@ -571,7 +571,7 @@ ack(AckTags, CTag, ChPid, State) -> end). requeue(AckTags, CTag, ChPid, State) -> - subtract_acks(ChPid, CTag, AckTags, State, + subtract_acks(AckTags, CTag, ChPid, State, fun (State1) -> requeue_and_run(AckTags, State1) end). possibly_unblock(Update, ChPid, State = #q{consumers = Consumers}) -> @@ -634,9 +634,9 @@ backing_queue_timeout(State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> State#q{backing_queue_state = BQ:timeout(BQS)}. -subtract_acks(ChPid, CTag, AckTags, State = #q{consumers = Consumers}, Fun) -> +subtract_acks(AckTags, CTag, ChPid, State = #q{consumers = Consumers}, Fun) -> case rabbit_queue_consumers:subtract_acks( - ChPid, CTag, AckTags, Consumers) of + AckTags, CTag, ChPid, Consumers) of not_found -> State; unchanged -> Fun(State); {unblocked, Consumers1} -> State1 = State#q{consumers = Consumers1}, @@ -989,7 +989,7 @@ handle_call(purge, _From, State = #q{backing_queue = BQ, State1 = State#q{backing_queue_state = BQS1}, reply({ok, Count}, maybe_send_drained(Count =:= 0, State1)); -handle_call({requeue, CTag, AckTags, ChPid}, From, State) -> +handle_call({requeue, AckTags, CTag, ChPid}, From, State) -> gen_server2:reply(From, ok), noreply(requeue(AckTags, CTag, ChPid, State)); @@ -1053,16 +1053,16 @@ handle_cast({deliver, Delivery = #delivery{sender = Sender}, Delivered, Flow}, State1 = State#q{senders = Senders1}, noreply(deliver_or_enqueue(Delivery, Delivered, State1)); -handle_cast({ack, CTag, AckTags, ChPid}, State) -> +handle_cast({ack, AckTags, CTag, ChPid}, State) -> noreply(ack(AckTags, CTag, ChPid, State)); -handle_cast({reject, CTag, AckTags, true, ChPid}, State) -> +handle_cast({reject, true, AckTags, CTag, ChPid}, State) -> noreply(requeue(AckTags, CTag, ChPid, State)); -handle_cast({reject, CTag, AckTags, false, ChPid}, State) -> +handle_cast({reject, false, AckTags, CTag, ChPid}, State) -> noreply(with_dlx( State#q.dlx, - fun (X) -> subtract_acks(ChPid, CTag, AckTags, State, + fun (X) -> subtract_acks(AckTags, CTag, ChPid, State, fun (State1) -> dead_letter_rejected_msgs( AckTags, X, State1) diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index f496a8a55c..da62fd21c3 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -886,7 +886,7 @@ handle_method(#'basic.recover_async'{requeue = true}, rabbit_misc:with_exit_handler( OkFun, fun () -> - rabbit_amqqueue:requeue(QPid, CTag, MsgIds, self()) + rabbit_amqqueue:requeue(QPid, MsgIds, CTag, self()) end) end, lists:reverse(UAMQL)), ok = notify_limiter(Limiter, UAMQL), @@ -1357,7 +1357,7 @@ reject(DeliveryTag, Requeue, Multiple, reject(Requeue, Acked, Limiter) -> foreach_per_consumer( fun ({QPid, CTag}, MsgIds) -> - rabbit_amqqueue:reject(QPid, CTag, MsgIds, Requeue, self()) + rabbit_amqqueue:reject(QPid, Requeue, MsgIds, CTag, self()) end, Acked), ok = notify_limiter(Limiter, Acked). @@ -1417,7 +1417,7 @@ collect_acks(ToAcc, PrefixAcc, Q, DeliveryTag, Multiple) -> ack(Acked, State = #ch{queue_names = QNames}) -> foreach_per_consumer( fun ({QPid, CTag}, MsgIds) -> - ok = rabbit_amqqueue:ack(QPid, CTag, MsgIds, self()), + ok = rabbit_amqqueue:ack(QPid, MsgIds, CTag, self()), ?INCR_STATS(case dict:find(QPid, QNames) of {ok, QName} -> Count = length(MsgIds), [{queue_stats, QName, Count}]; diff --git a/src/rabbit_queue_consumers.erl b/src/rabbit_queue_consumers.erl index 9106ef02bd..5087b93403 100644 --- a/src/rabbit_queue_consumers.erl +++ b/src/rabbit_queue_consumers.erl @@ -81,7 +81,7 @@ {'delivered', boolean(), T, state()} | {'undelivered', boolean(), state()}. -spec record_ack(ch(), pid(), ack()) -> 'ok'. --spec subtract_acks(ch(), rabbit_types:ctag(), [ack()], state()) -> +-spec subtract_acks([ack()], rabbit_types:ctag(), ch(), state()) -> 'not_found' | 'unchanged' | {'unblocked', state()}. -spec possibly_unblock(cr_fun(), ch(), state()) -> 'unchanged' | {'unblocked', state()}. @@ -244,7 +244,7 @@ record_ack(ChPid, LimiterPid, AckTag) -> update_ch_record(C#cr{acktags = queue:in(AckTag, ChAckTags)}), ok. -subtract_acks(ChPid, CTag, AckTags, State) -> +subtract_acks(AckTags, CTag, ChPid, State) -> case lookup_ch(ChPid) of not_found -> not_found; |
