diff options
| author | Simon MacMullen <simon@rabbitmq.com> | 2015-02-13 12:26:36 +0000 |
|---|---|---|
| committer | Simon MacMullen <simon@rabbitmq.com> | 2015-02-13 12:26:36 +0000 |
| commit | bab7bae982cd48b887a9aae30f2c2a4874a1b82e (patch) | |
| tree | ddc698e3c9be01daa8c882011587a9dd94d2b09f | |
| parent | 547b69d14bcdfdd2060ba1797ef626dfa60d0261 (diff) | |
| download | rabbitmq-server-git-bab7bae982cd48b887a9aae30f2c2a4874a1b82e.tar.gz | |
API updates for default.
| -rw-r--r-- | src/rabbit_priority_queue.erl | 32 |
1 files changed, 16 insertions, 16 deletions
diff --git a/src/rabbit_priority_queue.erl b/src/rabbit_priority_queue.erl index 635db5cc34..eabf289d65 100644 --- a/src/rabbit_priority_queue.erl +++ b/src/rabbit_priority_queue.erl @@ -32,7 +32,7 @@ -export([init/3, terminate/2, delete_and_terminate/2, delete_crashed/1, purge/1, purge_acks/1, - publish/5, publish_delivered/4, discard/3, drain_confirmed/1, + publish/6, publish_delivered/5, discard/4, drain_confirmed/1, dropwhile/2, fetchwhile/4, fetch/2, drop/2, ack/2, requeue/2, ackfold/4, fold/3, len/1, is_empty/1, depth/1, set_ram_duration_target/2, ram_duration/1, needs_timeout/1, timeout/1, @@ -159,7 +159,7 @@ delete_and_terminate(Reason, State = #state{bq = BQ}) -> delete_and_terminate(Reason, State = #passthrough{bq = BQ, bqs = BQS}) -> ?passthrough1(delete_and_terminate(Reason, BQS)). -delete_crashed(Q = #amqqueue{name = QName}) -> +delete_crashed(Q) -> BQ = bq(), case priorities(Q) of none -> BQ:delete_crashed(Q); @@ -176,40 +176,40 @@ purge_acks(State = #state{bq = BQ}) -> purge_acks(State = #passthrough{bq = BQ, bqs = BQS}) -> ?passthrough1(purge_acks(BQS)). -publish(Msg, MsgProps, IsDelivered, ChPid, State = #state{bq = BQ}) -> +publish(Msg, MsgProps, IsDelivered, ChPid, Flow, State = #state{bq = BQ}) -> pick1(fun (_P, BQSN) -> - BQ:publish(Msg, MsgProps, IsDelivered, ChPid, BQSN) + BQ:publish(Msg, MsgProps, IsDelivered, ChPid, Flow, BQSN) end, Msg, State); -publish(Msg, MsgProps, IsDelivered, ChPid, +publish(Msg, MsgProps, IsDelivered, ChPid, Flow, State = #passthrough{bq = BQ, bqs = BQS}) -> - ?passthrough1(publish(Msg, MsgProps, IsDelivered, ChPid, BQS)). + ?passthrough1(publish(Msg, MsgProps, IsDelivered, ChPid, Flow, BQS)). -publish_delivered(Msg, MsgProps, ChPid, State = #state{bq = BQ}) -> +publish_delivered(Msg, MsgProps, ChPid, Flow, State = #state{bq = BQ}) -> pick2(fun (P, BQSN) -> {AckTag, BQSN1} = BQ:publish_delivered( - Msg, MsgProps, ChPid, BQSN), + Msg, MsgProps, ChPid, Flow, BQSN), {{P, AckTag}, BQSN1} end, Msg, State); -publish_delivered(Msg, MsgProps, ChPid, +publish_delivered(Msg, MsgProps, ChPid, Flow, State = #passthrough{bq = BQ, bqs = BQS}) -> - ?passthrough2(publish_delivered(Msg, MsgProps, ChPid, BQS)). + ?passthrough2(publish_delivered(Msg, MsgProps, ChPid, Flow, BQS)). %% TODO this is a hack. The BQ api does not give us enough information %% here - if we had the Msg we could look at its priority and forward %% to the appropriate sub-BQ. But we don't so we are stuck. %% -%% But fortunately VQ ignores discard/3, so we can too, *assuming we -%% are talking to VQ*. discard/3 is used by HA, but that's "above" us +%% But fortunately VQ ignores discard/4, so we can too, *assuming we +%% are talking to VQ*. discard/4 is used by HA, but that's "above" us %% (if in use) so we don't break that either, just some hypothetical %% alternate BQ implementation. -discard(_MsgId, _ChPid, State = #state{}) -> +discard(_MsgId, _ChPid, _Flow, State = #state{}) -> State; %% We should have something a bit like this here: %% pick1(fun (_P, BQSN) -> - %% BQ:discard(MsgId, ChPid, BQSN) + %% BQ:discard(MsgId, ChPid, Flow, BQSN) %% end, Msg, State); -discard(MsgId, ChPid, State = #passthrough{bq = BQ, bqs = BQS}) -> - ?passthrough1(discard(MsgId, ChPid, BQS)). +discard(MsgId, ChPid, Flow, State = #passthrough{bq = BQ, bqs = BQS}) -> + ?passthrough1(discard(MsgId, ChPid, Flow, BQS)). drain_confirmed(State = #state{bq = BQ}) -> fold_append2(fun (_P, BQSN) -> BQ:drain_confirmed(BQSN) end, State); |
