diff options
| author | Simon MacMullen <simon@rabbitmq.com> | 2014-01-13 17:01:02 +0000 |
|---|---|---|
| committer | Simon MacMullen <simon@rabbitmq.com> | 2014-01-13 17:01:02 +0000 |
| commit | 253e8e3aab9f4b134b7f2d8930ad80a9e06b2a59 (patch) | |
| tree | 9d348178975d3f80d5aeb650f62361107f2ef220 /src | |
| parent | 092c4ce6f11b99ad12044a52ab8ed46589277405 (diff) | |
| download | rabbitmq-server-git-253e8e3aab9f4b134b7f2d8930ad80a9e06b2a59.tar.gz | |
Unbreak basic.reject, basic.nack and basic.recover. Fix up specs.
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_amqqueue.erl | 19 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 18 | ||||
| -rw-r--r-- | src/rabbit_channel.erl | 22 | ||||
| -rw-r--r-- | src/rabbit_limiter.erl | 6 | ||||
| -rw-r--r-- | src/rabbit_queue_consumers.erl | 5 |
5 files changed, 39 insertions, 31 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index cb2eb635b8..f811bce2d6 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -22,7 +22,7 @@ -export([lookup/1, not_found_or_absent/1, with/2, with/3, with_or_die/2, assert_equivalence/5, check_exclusive_access/2, with_exclusive_access_or_die/3, - stat/1, deliver/2, deliver_flow/2, requeue/3, ack/4, reject/4]). + stat/1, deliver/2, deliver_flow/2, requeue/4, ack/4, reject/5]). -export([list/0, list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2]). -export([force_event_refresh/0, notify_policy_changed/1]). -export([consumers/1, consumers_all/1, consumer_info_keys/0]). @@ -142,9 +142,10 @@ {routing_result(), qpids()}). -spec(deliver_flow/2 :: ([rabbit_types:amqqueue()], rabbit_types:delivery()) -> {routing_result(), qpids()}). --spec(requeue/3 :: (pid(), [msg_id()], pid()) -> 'ok'). -%%-spec(ack/3 :: (pid(), [msg_id()], pid()) -> 'ok'). --spec(reject/4 :: (pid(), [msg_id()], boolean(), pid()) -> 'ok'). +-spec(requeue/4 :: (pid(), rabbit_types:ctag(), [msg_id()], pid()) -> 'ok'). +-spec(ack/4 :: (pid(), rabbit_types:ctag(), [msg_id()], pid()) -> 'ok'). +-spec(reject/5 :: (pid(), rabbit_types:ctag(), [msg_id()], boolean(), pid()) -> + 'ok'). -spec(notify_down_all/2 :: (qpids(), pid()) -> ok_or_errors()). -spec(activate_limit_all/2 :: (qpids(), pid()) -> ok_or_errors()). -spec(basic_get/4 :: (rabbit_types:amqqueue(), pid(), boolean(), pid()) -> @@ -546,12 +547,14 @@ deliver(Qs, Delivery) -> deliver(Qs, Delivery, noflow). deliver_flow(Qs, Delivery) -> deliver(Qs, Delivery, flow). -requeue(QPid, MsgIds, ChPid) -> delegate:call(QPid, {requeue, MsgIds, ChPid}). +requeue(QPid, CTag, MsgIds, ChPid) -> + delegate:call(QPid, {requeue, CTag, MsgIds, ChPid}). -ack(QPid, CTag, MsgIds, ChPid) -> delegate:cast(QPid, {ack, CTag, MsgIds, ChPid}). +ack(QPid, CTag, MsgIds, ChPid) -> + delegate:cast(QPid, {ack, CTag, MsgIds, ChPid}). -reject(QPid, MsgIds, Requeue, ChPid) -> - delegate:cast(QPid, {reject, MsgIds, Requeue, ChPid}). +reject(QPid, CTag, MsgIds, Requeue, ChPid) -> + delegate:cast(QPid, {reject, CTag, MsgIds, Requeue, ChPid}). notify_down_all(QPids, ChPid) -> {_, Bads} = delegate:call(QPids, {notify_down, ChPid}), diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 0066a07be2..d2ec49412e 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -570,8 +570,8 @@ ack(AckTags, CTag, ChPid, State) -> State1#q{backing_queue_state = BQS1} end). -requeue(AckTags, ChPid, State) -> - subtract_acks(ChPid, fixme, AckTags, State, +requeue(AckTags, CTag, ChPid, State) -> + subtract_acks(ChPid, CTag, AckTags, State, fun (State1) -> requeue_and_run(AckTags, State1) end). possibly_unblock(Update, ChPid, State = #q{consumers = Consumers}) -> @@ -989,9 +989,9 @@ handle_call(purge, _From, State = #q{backing_queue = BQ, State1 = State#q{backing_queue_state = BQS1}, reply({ok, Count}, maybe_send_drained(Count =:= 0, State1)); -handle_call({requeue, AckTags, ChPid}, From, State) -> +handle_call({requeue, CTag, AckTags, ChPid}, From, State) -> gen_server2:reply(From, ok), - noreply(requeue(AckTags, ChPid, State)); + noreply(requeue(AckTags, CTag, ChPid, State)); handle_call(sync_mirrors, _From, State = #q{backing_queue = rabbit_mirror_queue_master, @@ -1056,18 +1056,18 @@ handle_cast({deliver, Delivery = #delivery{sender = Sender}, Delivered, Flow}, handle_cast({ack, CTag, AckTags, ChPid}, State) -> noreply(ack(AckTags, CTag, ChPid, State)); -handle_cast({reject, AckTags, true, ChPid}, State) -> - noreply(requeue(AckTags, ChPid, State)); +handle_cast({reject, CTag, AckTags, true, ChPid}, State) -> + noreply(requeue(AckTags, CTag, ChPid, State)); -handle_cast({reject, AckTags, false, ChPid}, State) -> +handle_cast({reject, CTag, AckTags, false, ChPid}, State) -> noreply(with_dlx( State#q.dlx, - fun (X) -> subtract_acks(ChPid, fixme, AckTags, State, + fun (X) -> subtract_acks(ChPid, CTag, AckTags, State, fun (State1) -> dead_letter_rejected_msgs( AckTags, X, State1) end) end, - fun () -> ack(AckTags, fixme, ChPid, State) end)); + fun () -> ack(AckTags, CTag, ChPid, State) end)); handle_cast(delete_immediately, State) -> stop(State); diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 0e2d16dc11..f496a8a55c 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -881,11 +881,13 @@ handle_method(#'basic.recover_async'{requeue = true}, _, State = #ch{unacked_message_q = UAMQ, limiter = Limiter}) -> OkFun = fun () -> ok end, UAMQL = queue:to_list(UAMQ), - foreach_per_queue( - fun (QPid, MsgIds) -> + foreach_per_consumer( + fun ({QPid, CTag}, MsgIds) -> rabbit_misc:with_exit_handler( OkFun, - fun () -> rabbit_amqqueue:requeue(QPid, MsgIds, self()) end) + fun () -> + rabbit_amqqueue:requeue(QPid, CTag, MsgIds, self()) + end) end, lists:reverse(UAMQL)), ok = notify_limiter(Limiter, UAMQL), %% No answer required - basic.recover is the newer, synchronous @@ -1353,9 +1355,9 @@ reject(DeliveryTag, Requeue, Multiple, %% NB: Acked is in youngest-first order reject(Requeue, Acked, Limiter) -> - foreach_per_queue( - fun (QPid, MsgIds) -> - rabbit_amqqueue:reject(QPid, MsgIds, Requeue, self()) + foreach_per_consumer( + fun ({QPid, CTag}, MsgIds) -> + rabbit_amqqueue:reject(QPid, CTag, MsgIds, Requeue, self()) end, Acked), ok = notify_limiter(Limiter, Acked). @@ -1413,7 +1415,7 @@ collect_acks(ToAcc, PrefixAcc, Q, DeliveryTag, Multiple) -> %% NB: Acked is in youngest-first order ack(Acked, State = #ch{queue_names = QNames}) -> - foreach_per_queue( + foreach_per_consumer( fun ({QPid, CTag}, MsgIds) -> ok = rabbit_amqqueue:ack(QPid, CTag, MsgIds, self()), ?INCR_STATS(case dict:find(QPid, QNames) of @@ -1446,13 +1448,13 @@ notify_queues(State = #ch{consumer_mapping = Consumers, sets:union(sets:from_list(consumer_queues(Consumers)), DQ)), {rabbit_amqqueue:notify_down_all(QPids, self()), State#ch{state = closing}}. -foreach_per_queue(_F, []) -> +foreach_per_consumer(_F, []) -> ok; -foreach_per_queue(F, [{_DTag, CTag, {QPid, MsgId}}]) -> %% common case +foreach_per_consumer(F, [{_DTag, CTag, {QPid, MsgId}}]) -> %% common case F({QPid, CTag}, [MsgId]); %% NB: UAL should be in youngest-first order; the tree values will %% then be in oldest-first order -foreach_per_queue(F, UAL) -> +foreach_per_consumer(F, UAL) -> T = lists:foldl(fun ({_DTag, CTag, {QPid, MsgId}}, T) -> rabbit_misc:gb_trees_cons({QPid, CTag}, MsgId, T) end, gb_trees:empty(), UAL), diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index 09bf03e6f3..210b6b7c4e 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -174,8 +174,10 @@ -spec(is_consumer_blocked/2 :: (qstate(), rabbit_types:ctag()) -> boolean()). -spec(credit/5 :: (qstate(), rabbit_types:ctag(), non_neg_integer(), boolean(), boolean()) -> {boolean(), qstate()}). +-spec(set_consumer_prefetch/4 :: (qstate(), rabbit_types:ctag(), boolean(), + non_neg_integer()) -> qstate()). -spec(ack_from_queue/3 :: (qstate(), rabbit_types:ctag(), non_neg_integer()) - -> qstate()). + -> {boolean(), qstate()}). -spec(drained/1 :: (qstate()) -> {[{rabbit_types:ctag(), non_neg_integer()}], qstate()}). -spec(forget_consumer/2 :: (qstate(), rabbit_types:ctag()) -> qstate()). @@ -309,7 +311,7 @@ ack_from_queue(Limiter = #qstate{credits = Credits}, CTag, Credit) -> _ -> {Credits, false} end, - {Limiter#qstate{credits = Credits1}, Unblocked}. + {Unblocked, Limiter#qstate{credits = Credits1}}. drained(Limiter = #qstate{credits = Credits}) -> {CTagCredits, Credits2} = diff --git a/src/rabbit_queue_consumers.erl b/src/rabbit_queue_consumers.erl index 8ca6855708..3ba337ae13 100644 --- a/src/rabbit_queue_consumers.erl +++ b/src/rabbit_queue_consumers.erl @@ -81,7 +81,8 @@ {'delivered', boolean(), T, state()} | {'undelivered', boolean(), state()}. -spec record_ack(ch(), pid(), ack()) -> 'ok'. -%% -spec subtract_acks(ch(), [ack()]) -> 'not_found' | 'ok'. +-spec subtract_acks(ch(), rabbit_types:ctag(), [ack()], state()) -> + 'not_found' | 'unchanged' | {'unblocked', state()}. -spec possibly_unblock(cr_fun(), ch(), state()) -> 'unchanged' | {'unblocked', state()}. -spec resume_fun() -> cr_fun(). @@ -245,7 +246,7 @@ subtract_acks(ChPid, CTag, AckTags, State) -> not_found -> not_found; C = #cr{acktags = ChAckTags, limiter = Lim} -> - {Lim2, Unblocked} = + {Unblocked, Lim2} = rabbit_limiter:ack_from_queue(Lim, CTag, length(AckTags)), AckTags2 = subtract_acks0(AckTags, [], ChAckTags), C2 = C#cr{acktags = AckTags2, limiter = Lim2}, |
