diff options
| author | Matthias Radestock <matthias@rabbitmq.com> | 2010-07-27 11:22:06 +0100 |
|---|---|---|
| committer | Matthias Radestock <matthias@rabbitmq.com> | 2010-07-27 11:22:06 +0100 |
| commit | a621678fbedecae97bd8226a2db197453ee4c018 (patch) | |
| tree | 4bf5e1b4c54019be7cf5ba13c178c153981835c5 /src | |
| parent | c1a9bcccece68e8db7b4ac58c1006cc9829eb40b (diff) | |
| download | rabbitmq-server-git-a621678fbedecae97bd8226a2db197453ee4c018.tar.gz | |
implement basic.reject
the simplest flavour that seems useful
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_amqqueue.erl | 6 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 15 | ||||
| -rw-r--r-- | src/rabbit_channel.erl | 11 |
3 files changed, 31 insertions, 1 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index df9474439f..e744e6695e 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -39,7 +39,7 @@ -export([pseudo_queue/2]). -export([lookup/1, with/2, with_or_die/2, assert_equivalence/5, check_exclusive_access/2, with_exclusive_access_or_die/3, - stat/1, deliver/2, requeue/3, ack/4]). + stat/1, deliver/2, requeue/3, ack/4, reject/4]). -export([list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2]). -export([consumers/1, consumers_all/1]). -export([basic_get/3, basic_consume/7, basic_cancel/4]). @@ -122,6 +122,7 @@ -spec(ack/4 :: (pid(), rabbit_types:maybe(rabbit_types:txn()), [msg_id()], pid()) -> 'ok'). +-spec(reject/4 :: (pid(), [msg_id()], boolean(), pid()) -> 'ok'). -spec(commit_all/3 :: ([pid()], rabbit_types:txn(), pid()) -> ok_or_errors()). -spec(rollback_all/3 :: ([pid()], rabbit_types:txn(), pid()) -> 'ok'). -spec(notify_down_all/2 :: ([pid()], pid()) -> ok_or_errors()). @@ -335,6 +336,9 @@ requeue(QPid, MsgIds, ChPid) -> ack(QPid, Txn, MsgIds, ChPid) -> delegate_pcast(QPid, 7, {ack, Txn, MsgIds, ChPid}). +reject(QPid, MsgIds, Requeue, ChPid) -> + delegate_pcast(QPid, 7, {reject, MsgIds, Requeue, ChPid}). + commit_all(QPids, Txn, ChPid) -> safe_delegate_call_ok( fun (QPid) -> exit({queue_disappeared, QPid}) end, diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 468a41b206..700e8b0b67 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -750,6 +750,21 @@ handle_cast({ack, Txn, AckTags, ChPid}, noreply(State #q { backing_queue_state = BQS1 }) end; +handle_cast({reject, AckTags, Requeue, ChPid}, + State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> + case lookup_ch(ChPid) of + not_found -> + noreply(State); + C = #cr{acktags = ChAckTags} -> + ChAckTags1 = subtract_acks(ChAckTags, AckTags), + store_ch_record(C#cr{acktags = ChAckTags1}), + noreply(case Requeue of + true -> requeue_and_run(AckTags, State); + false -> BQS1 = BQ:ack(AckTags, BQS), + State #q { backing_queue_state = BQS1 } + end) + end; + handle_cast({rollback, Txn, ChPid}, State) -> noreply(rollback_transaction(Txn, ChPid, State)); diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index c4db3ace73..c94b80aab4 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -638,6 +638,17 @@ handle_method(#'basic.recover'{requeue = Requeue}, Content, State) -> ok = rabbit_writer:send_command(WriterPid, #'basic.recover_ok'{}), {noreply, State2}; +handle_method(#'basic.reject'{delivery_tag = DeliveryTag, + requeue = Requeue}, + _, State = #ch{ unacked_message_q = UAMQ}) -> + {Acked, Remaining} = collect_acks(UAMQ, DeliveryTag, false), + ok = fold_per_queue( + fun (QPid, MsgIds, ok) -> + rabbit_amqqueue:reject(QPid, MsgIds, Requeue, self()) + end, ok, Acked), + ok = notify_limiter(State#ch.limiter_pid, Acked), + {noreply, State#ch{unacked_message_q = Remaining}}; + handle_method(#'exchange.declare'{exchange = ExchangeNameBin, type = TypeNameBin, passive = false, |
