summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2015-02-13 12:26:36 +0000
committerSimon MacMullen <simon@rabbitmq.com>2015-02-13 12:26:36 +0000
commitbab7bae982cd48b887a9aae30f2c2a4874a1b82e (patch)
treeddc698e3c9be01daa8c882011587a9dd94d2b09f
parent547b69d14bcdfdd2060ba1797ef626dfa60d0261 (diff)
downloadrabbitmq-server-git-bab7bae982cd48b887a9aae30f2c2a4874a1b82e.tar.gz
API updates for default.
-rw-r--r--src/rabbit_priority_queue.erl32
1 files changed, 16 insertions, 16 deletions
diff --git a/src/rabbit_priority_queue.erl b/src/rabbit_priority_queue.erl
index 635db5cc34..eabf289d65 100644
--- a/src/rabbit_priority_queue.erl
+++ b/src/rabbit_priority_queue.erl
@@ -32,7 +32,7 @@
-export([init/3, terminate/2, delete_and_terminate/2, delete_crashed/1,
purge/1, purge_acks/1,
- publish/5, publish_delivered/4, discard/3, drain_confirmed/1,
+ publish/6, publish_delivered/5, discard/4, drain_confirmed/1,
dropwhile/2, fetchwhile/4, fetch/2, drop/2, ack/2, requeue/2,
ackfold/4, fold/3, len/1, is_empty/1, depth/1,
set_ram_duration_target/2, ram_duration/1, needs_timeout/1, timeout/1,
@@ -159,7 +159,7 @@ delete_and_terminate(Reason, State = #state{bq = BQ}) ->
delete_and_terminate(Reason, State = #passthrough{bq = BQ, bqs = BQS}) ->
?passthrough1(delete_and_terminate(Reason, BQS)).
-delete_crashed(Q = #amqqueue{name = QName}) ->
+delete_crashed(Q) ->
BQ = bq(),
case priorities(Q) of
none -> BQ:delete_crashed(Q);
@@ -176,40 +176,40 @@ purge_acks(State = #state{bq = BQ}) ->
purge_acks(State = #passthrough{bq = BQ, bqs = BQS}) ->
?passthrough1(purge_acks(BQS)).
-publish(Msg, MsgProps, IsDelivered, ChPid, State = #state{bq = BQ}) ->
+publish(Msg, MsgProps, IsDelivered, ChPid, Flow, State = #state{bq = BQ}) ->
pick1(fun (_P, BQSN) ->
- BQ:publish(Msg, MsgProps, IsDelivered, ChPid, BQSN)
+ BQ:publish(Msg, MsgProps, IsDelivered, ChPid, Flow, BQSN)
end, Msg, State);
-publish(Msg, MsgProps, IsDelivered, ChPid,
+publish(Msg, MsgProps, IsDelivered, ChPid, Flow,
State = #passthrough{bq = BQ, bqs = BQS}) ->
- ?passthrough1(publish(Msg, MsgProps, IsDelivered, ChPid, BQS)).
+ ?passthrough1(publish(Msg, MsgProps, IsDelivered, ChPid, Flow, BQS)).
-publish_delivered(Msg, MsgProps, ChPid, State = #state{bq = BQ}) ->
+publish_delivered(Msg, MsgProps, ChPid, Flow, State = #state{bq = BQ}) ->
pick2(fun (P, BQSN) ->
{AckTag, BQSN1} = BQ:publish_delivered(
- Msg, MsgProps, ChPid, BQSN),
+ Msg, MsgProps, ChPid, Flow, BQSN),
{{P, AckTag}, BQSN1}
end, Msg, State);
-publish_delivered(Msg, MsgProps, ChPid,
+publish_delivered(Msg, MsgProps, ChPid, Flow,
State = #passthrough{bq = BQ, bqs = BQS}) ->
- ?passthrough2(publish_delivered(Msg, MsgProps, ChPid, BQS)).
+ ?passthrough2(publish_delivered(Msg, MsgProps, ChPid, Flow, BQS)).
%% TODO this is a hack. The BQ api does not give us enough information
%% here - if we had the Msg we could look at its priority and forward
%% to the appropriate sub-BQ. But we don't so we are stuck.
%%
-%% But fortunately VQ ignores discard/3, so we can too, *assuming we
-%% are talking to VQ*. discard/3 is used by HA, but that's "above" us
+%% But fortunately VQ ignores discard/4, so we can too, *assuming we
+%% are talking to VQ*. discard/4 is used by HA, but that's "above" us
%% (if in use) so we don't break that either, just some hypothetical
%% alternate BQ implementation.
-discard(_MsgId, _ChPid, State = #state{}) ->
+discard(_MsgId, _ChPid, _Flow, State = #state{}) ->
State;
%% We should have something a bit like this here:
%% pick1(fun (_P, BQSN) ->
- %% BQ:discard(MsgId, ChPid, BQSN)
+ %% BQ:discard(MsgId, ChPid, Flow, BQSN)
%% end, Msg, State);
-discard(MsgId, ChPid, State = #passthrough{bq = BQ, bqs = BQS}) ->
- ?passthrough1(discard(MsgId, ChPid, BQS)).
+discard(MsgId, ChPid, Flow, State = #passthrough{bq = BQ, bqs = BQS}) ->
+ ?passthrough1(discard(MsgId, ChPid, Flow, BQS)).
drain_confirmed(State = #state{bq = BQ}) ->
fold_append2(fun (_P, BQSN) -> BQ:drain_confirmed(BQSN) end, State);