summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorAlvaro Videla <videlalvaro@gmail.com>2015-09-29 00:41:51 +0200
committerAlvaro Videla <videlalvaro@gmail.com>2015-09-29 00:41:51 +0200
commitf30aaa32cf9f3cd43e9531636f323a726812428a (patch)
treed17e2261b8a694d7a112ea91252921f640e4a6c9 /src
parentc5dc37f9bb70a2daf95e79cc5248fdc9bbe229fb (diff)
downloadrabbitmq-server-git-f30aaa32cf9f3cd43e9531636f323a726812428a.tar.gz
implements BQ:batch_publish and BQ:batch_publish_delivered
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_backing_queue.erl12
-rw-r--r--src/rabbit_mirror_queue_master.erl9
-rw-r--r--src/rabbit_priority_queue.erl9
-rw-r--r--src/rabbit_variable_queue.erl132
4 files changed, 115 insertions, 47 deletions
diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl
index a03bda13c9..ae8ce73283 100644
--- a/src/rabbit_backing_queue.erl
+++ b/src/rabbit_backing_queue.erl
@@ -33,6 +33,10 @@
-type(flow() :: 'flow' | 'noflow').
-type(msg_ids() :: [rabbit_types:msg_id()]).
+-type(publish() :: {rabbit_types:basic_message(),
+ rabbit_types:message_properties(), boolean(), pid(), flow()}).
+-type(delivered_publish() :: {rabbit_types:basic_message(),
+ rabbit_types:message_properties(), pid(), flow}).
-type(fetch_result(Ack) ::
('empty' | {rabbit_types:basic_message(), boolean(), Ack})).
-type(drop_result(Ack) ::
@@ -104,6 +108,9 @@
rabbit_types:message_properties(), boolean(), pid(), flow(),
state()) -> state().
+%% Like publish/6 but for batches of publishes.
+-callback batch_publish([publish()], state()) -> state().
+
%% Called for messages which have already been passed straight
%% out to a client. The queue will be empty for these calls
%% (i.e. saves the round trip through the backing queue).
@@ -112,6 +119,11 @@
state())
-> {ack(), state()}.
+%% Like publish_delivered/5 but for batches of publishes.
+-callback batch_publish_delivered([delivered_publish()],
+ state())
+ -> {[ack()], state()}.
+
%% Called to inform the BQ about messages which have reached the
%% queue, but are not going to be further passed to BQ.
-callback discard(rabbit_types:msg_id(), pid(), flow(), state()) -> state().
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index 7890128872..20a5285542 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -18,6 +18,7 @@
-export([init/3, terminate/2, delete_and_terminate/2,
purge/1, purge_acks/1, publish/6, publish_delivered/5,
+ batch_publish/2, batch_publish_delivered/2,
discard/4, fetch/2, drop/2, ack/2, requeue/2, ackfold/4, fold/3,
len/1, is_empty/1, depth/1, drain_confirmed/1,
dropwhile/2, fetchwhile/4, set_ram_duration_target/2, ram_duration/1,
@@ -241,6 +242,10 @@ publish(Msg = #basic_message { id = MsgId }, MsgProps, IsDelivered, ChPid, Flow,
BQS1 = BQ:publish(Msg, MsgProps, IsDelivered, ChPid, Flow, BQS),
ensure_monitoring(ChPid, State #state { backing_queue_state = BQS1 }).
+%% TODO fix
+batch_publish(_Publishes, _State) ->
+ exit({not_implemented, {?MODULE, batch_publish}}).
+
publish_delivered(Msg = #basic_message { id = MsgId }, MsgProps,
ChPid, Flow, State = #state { gm = GM,
seen_status = SS,
@@ -253,6 +258,10 @@ publish_delivered(Msg = #basic_message { id = MsgId }, MsgProps,
State1 = State #state { backing_queue_state = BQS1 },
{AckTag, ensure_monitoring(ChPid, State1)}.
+%% TODO fix
+batch_publish_delivered(_Publishes, _State) ->
+ exit({not_implemented, {?MODULE, batch_publish_delivered}}).
+
discard(MsgId, ChPid, Flow, State = #state { gm = GM,
backing_queue = BQ,
backing_queue_state = BQS,
diff --git a/src/rabbit_priority_queue.erl b/src/rabbit_priority_queue.erl
index a839badfc4..f94e7274d5 100644
--- a/src/rabbit_priority_queue.erl
+++ b/src/rabbit_priority_queue.erl
@@ -35,6 +35,7 @@
-export([init/3, terminate/2, delete_and_terminate/2, delete_crashed/1,
purge/1, purge_acks/1,
publish/6, publish_delivered/5, discard/4, drain_confirmed/1,
+ batch_publish/2, batch_publish_delivered/2,
dropwhile/2, fetchwhile/4, fetch/2, drop/2, ack/2, requeue/2,
ackfold/4, fold/3, len/1, is_empty/1, depth/1,
set_ram_duration_target/2, ram_duration/1, needs_timeout/1, timeout/1,
@@ -203,6 +204,10 @@ publish(Msg, MsgProps, IsDelivered, ChPid, Flow,
State = #passthrough{bq = BQ, bqs = BQS}) ->
?passthrough1(publish(Msg, MsgProps, IsDelivered, ChPid, Flow, BQS)).
+%% TODO fix
+batch_publish(_Publishes, _State) ->
+ exit({not_implemented, {?MODULE, batch_publish}}).
+
publish_delivered(Msg, MsgProps, ChPid, Flow, State = #state{bq = BQ}) ->
pick2(fun (P, BQSN) ->
{AckTag, BQSN1} = BQ:publish_delivered(
@@ -213,6 +218,10 @@ publish_delivered(Msg, MsgProps, ChPid, Flow,
State = #passthrough{bq = BQ, bqs = BQS}) ->
?passthrough2(publish_delivered(Msg, MsgProps, ChPid, Flow, BQS)).
+%% TODO fix
+batch_publish_delivered(_Publishes, _State) ->
+ exit({not_implemented, {?MODULE, batch_publish_delivered}}).
+
%% TODO this is a hack. The BQ api does not give us enough information
%% here - if we had the Msg we could look at its priority and forward
%% to the appropriate sub-BQ. But we don't so we are stuck.
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 8a8945114c..6407228547 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -18,7 +18,9 @@
-export([init/3, terminate/2, delete_and_terminate/2, delete_crashed/1,
purge/1, purge_acks/1,
- publish/6, publish_delivered/5, discard/4, drain_confirmed/1,
+ publish/6, publish_delivered/5,
+ batch_publish/2, batch_publish_delivered/2,
+ discard/4, drain_confirmed/1,
dropwhile/2, fetchwhile/4, fetch/2, drop/2, ack/2, requeue/2,
ackfold/4, fold/3, len/1, is_empty/1, depth/1,
set_ram_duration_target/2, ram_duration/1, needs_timeout/1, timeout/1,
@@ -558,52 +560,31 @@ purge(State = #vqstate { len = Len }) ->
purge_acks(State) -> a(purge_pending_ack(false, State)).
-publish(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId },
- MsgProps = #message_properties { needs_confirming = NeedsConfirming },
- IsDelivered, _ChPid, _Flow,
- State = #vqstate { q1 = Q1, q3 = Q3, q4 = Q4,
- qi_embed_msgs_below = IndexMaxSize,
- next_seq_id = SeqId,
- in_counter = InCount,
- durable = IsDurable,
- unconfirmed = UC }) ->
- IsPersistent1 = IsDurable andalso IsPersistent,
- MsgStatus = msg_status(IsPersistent1, IsDelivered, SeqId, Msg, MsgProps, IndexMaxSize),
- {MsgStatus1, State1} = maybe_write_to_disk(false, false, MsgStatus, State),
- State2 = case ?QUEUE:is_empty(Q3) of
- false -> State1 #vqstate { q1 = ?QUEUE:in(m(MsgStatus1), Q1) };
- true -> State1 #vqstate { q4 = ?QUEUE:in(m(MsgStatus1), Q4) }
- end,
- InCount1 = InCount + 1,
- UC1 = gb_sets_maybe_insert(NeedsConfirming, MsgId, UC),
- State3 = stats({1, 0}, {none, MsgStatus1},
- State2#vqstate{ next_seq_id = SeqId + 1,
- in_counter = InCount1,
- unconfirmed = UC1 }),
- a(reduce_memory_use(maybe_update_rates(State3))).
-
-publish_delivered(Msg = #basic_message { is_persistent = IsPersistent,
- id = MsgId },
- MsgProps = #message_properties {
- needs_confirming = NeedsConfirming },
- _ChPid, _Flow,
- State = #vqstate { qi_embed_msgs_below = IndexMaxSize,
- next_seq_id = SeqId,
- out_counter = OutCount,
- in_counter = InCount,
- durable = IsDurable,
- unconfirmed = UC }) ->
- IsPersistent1 = IsDurable andalso IsPersistent,
- MsgStatus = msg_status(IsPersistent1, true, SeqId, Msg, MsgProps, IndexMaxSize),
- {MsgStatus1, State1} = maybe_write_to_disk(false, false, MsgStatus, State),
- State2 = record_pending_ack(m(MsgStatus1), State1),
- UC1 = gb_sets_maybe_insert(NeedsConfirming, MsgId, UC),
- State3 = stats({0, 1}, {none, MsgStatus1},
- State2 #vqstate { next_seq_id = SeqId + 1,
- out_counter = OutCount + 1,
- in_counter = InCount + 1,
- unconfirmed = UC1 }),
- {SeqId, a(reduce_memory_use(maybe_update_rates(State3)))}.
+publish(Msg, MsgProps, IsDelivered, ChPid, Flow, State) ->
+ State1 =
+ publish1(Msg, MsgProps, IsDelivered, ChPid, Flow,
+ fun maybe_write_to_disk/4,
+ State),
+ a(reduce_memory_use(maybe_update_rates(State1))).
+
+batch_publish(Publishes, State) ->
+ State1 =
+ lists:foldl(fun batch_publish1/2, State, Publishes),
+ State2 = ui(State1),
+ a(reduce_memory_use(maybe_update_rates(State2))).
+
+publish_delivered(Msg, MsgProps, ChPid, Flow, State) ->
+ {SeqId, State1} =
+ publish_delivered1(Msg, MsgProps, ChPid, Flow,
+ fun maybe_write_to_disk/4,
+ State),
+ {SeqId, a(reduce_memory_use(maybe_update_rates(State1)))}.
+
+batch_publish_delivered(Publishes, State) ->
+ {SeqIds, State1} =
+ lists:foldl(fun batch_publish_delivered1/2, {[], State}, Publishes),
+ State2 = ui(State1),
+ {lists:reverse(SeqIds), a(reduce_memory_use(maybe_update_rates(State2)))}.
discard(_MsgId, _ChPid, _Flow, State) -> State.
@@ -1518,6 +1499,63 @@ process_delivers_and_acks_fun(_) ->
%%----------------------------------------------------------------------------
%% Internal gubbins for publishing
%%----------------------------------------------------------------------------
+publish1(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId },
+ MsgProps = #message_properties { needs_confirming = NeedsConfirming },
+ IsDelivered, _ChPid, _Flow, PersistFun,
+ State = #vqstate { q1 = Q1, q3 = Q3, q4 = Q4,
+ qi_embed_msgs_below = IndexMaxSize,
+ next_seq_id = SeqId,
+ in_counter = InCount,
+ durable = IsDurable,
+ unconfirmed = UC }) ->
+ IsPersistent1 = IsDurable andalso IsPersistent,
+ MsgStatus = msg_status(IsPersistent1, IsDelivered, SeqId, Msg, MsgProps, IndexMaxSize),
+ {MsgStatus1, State1} = PersistFun(false, false, MsgStatus, State),
+ State2 = case ?QUEUE:is_empty(Q3) of
+ false -> State1 #vqstate { q1 = ?QUEUE:in(m(MsgStatus1), Q1) };
+ true -> State1 #vqstate { q4 = ?QUEUE:in(m(MsgStatus1), Q4) }
+ end,
+ InCount1 = InCount + 1,
+ UC1 = gb_sets_maybe_insert(NeedsConfirming, MsgId, UC),
+ stats({1, 0}, {none, MsgStatus1},
+ State2#vqstate{ next_seq_id = SeqId + 1,
+ in_counter = InCount1,
+ unconfirmed = UC1 }).
+
+batch_publish1({Msg, MsgProps, IsDelivered, ChPid, Flow}, State) ->
+ publish1(Msg, MsgProps, IsDelivered, ChPid, Flow,
+ fun maybe_prepare_write_to_disk/4,
+ State).
+
+publish_delivered1(Msg = #basic_message { is_persistent = IsPersistent,
+ id = MsgId },
+ MsgProps = #message_properties {
+ needs_confirming = NeedsConfirming },
+ _ChPid, _Flow, PersistFun,
+ State = #vqstate { qi_embed_msgs_below = IndexMaxSize,
+ next_seq_id = SeqId,
+ out_counter = OutCount,
+ in_counter = InCount,
+ durable = IsDurable,
+ unconfirmed = UC }) ->
+ IsPersistent1 = IsDurable andalso IsPersistent,
+ MsgStatus = msg_status(IsPersistent1, true, SeqId, Msg, MsgProps, IndexMaxSize),
+ {MsgStatus1, State1} = PersistFun(false, false, MsgStatus, State),
+ State2 = record_pending_ack(m(MsgStatus1), State1),
+ UC1 = gb_sets_maybe_insert(NeedsConfirming, MsgId, UC),
+ State3 = stats({0, 1}, {none, MsgStatus1},
+ State2 #vqstate { next_seq_id = SeqId + 1,
+ out_counter = OutCount + 1,
+ in_counter = InCount + 1,
+ unconfirmed = UC1 }),
+ {SeqId, State3}.
+
+batch_publish_delivered1({Msg, MsgProps, ChPid, Flow}, {SeqIds, State}) ->
+ {SeqId, State1} =
+ publish_delivered1(Msg, MsgProps, ChPid, Flow,
+ fun maybe_prepare_write_to_disk/4,
+ State),
+ {[SeqId | SeqIds], State1}.
maybe_write_msg_to_disk(_Force, MsgStatus = #msg_status {
msg_in_store = true }, State) ->