diff options
| author | Alvaro Videla <videlalvaro@gmail.com> | 2015-09-29 00:41:51 +0200 |
|---|---|---|
| committer | Alvaro Videla <videlalvaro@gmail.com> | 2015-09-29 00:41:51 +0200 |
| commit | f30aaa32cf9f3cd43e9531636f323a726812428a (patch) | |
| tree | d17e2261b8a694d7a112ea91252921f640e4a6c9 /src | |
| parent | c5dc37f9bb70a2daf95e79cc5248fdc9bbe229fb (diff) | |
| download | rabbitmq-server-git-f30aaa32cf9f3cd43e9531636f323a726812428a.tar.gz | |
implements BQ:batch_publish and BQ:batch_publish_delivered
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_backing_queue.erl | 12 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_master.erl | 9 | ||||
| -rw-r--r-- | src/rabbit_priority_queue.erl | 9 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 132 |
4 files changed, 115 insertions, 47 deletions
diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl index a03bda13c9..ae8ce73283 100644 --- a/src/rabbit_backing_queue.erl +++ b/src/rabbit_backing_queue.erl @@ -33,6 +33,10 @@ -type(flow() :: 'flow' | 'noflow'). -type(msg_ids() :: [rabbit_types:msg_id()]). +-type(publish() :: {rabbit_types:basic_message(), + rabbit_types:message_properties(), boolean(), pid(), flow()}). +-type(delivered_publish() :: {rabbit_types:basic_message(), + rabbit_types:message_properties(), pid(), flow}). -type(fetch_result(Ack) :: ('empty' | {rabbit_types:basic_message(), boolean(), Ack})). -type(drop_result(Ack) :: @@ -104,6 +108,9 @@ rabbit_types:message_properties(), boolean(), pid(), flow(), state()) -> state(). +%% Like publish/6 but for batches of publishes. +-callback batch_publish([publish()], state()) -> state(). + %% Called for messages which have already been passed straight %% out to a client. The queue will be empty for these calls %% (i.e. saves the round trip through the backing queue). @@ -112,6 +119,11 @@ state()) -> {ack(), state()}. +%% Like publish_delivered/5 but for batches of publishes. +-callback batch_publish_delivered([delivered_publish()], + state()) + -> {[ack()], state()}. + %% Called to inform the BQ about messages which have reached the %% queue, but are not going to be further passed to BQ. -callback discard(rabbit_types:msg_id(), pid(), flow(), state()) -> state(). diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index 7890128872..20a5285542 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -18,6 +18,7 @@ -export([init/3, terminate/2, delete_and_terminate/2, purge/1, purge_acks/1, publish/6, publish_delivered/5, + batch_publish/2, batch_publish_delivered/2, discard/4, fetch/2, drop/2, ack/2, requeue/2, ackfold/4, fold/3, len/1, is_empty/1, depth/1, drain_confirmed/1, dropwhile/2, fetchwhile/4, set_ram_duration_target/2, ram_duration/1, @@ -241,6 +242,10 @@ publish(Msg = #basic_message { id = MsgId }, MsgProps, IsDelivered, ChPid, Flow, BQS1 = BQ:publish(Msg, MsgProps, IsDelivered, ChPid, Flow, BQS), ensure_monitoring(ChPid, State #state { backing_queue_state = BQS1 }). +%% TODO fix +batch_publish(_Publishes, _State) -> + exit({not_implemented, {?MODULE, batch_publish}}). + publish_delivered(Msg = #basic_message { id = MsgId }, MsgProps, ChPid, Flow, State = #state { gm = GM, seen_status = SS, @@ -253,6 +258,10 @@ publish_delivered(Msg = #basic_message { id = MsgId }, MsgProps, State1 = State #state { backing_queue_state = BQS1 }, {AckTag, ensure_monitoring(ChPid, State1)}. +%% TODO fix +batch_publish_delivered(_Publishes, _State) -> + exit({not_implemented, {?MODULE, batch_publish_delivered}}). + discard(MsgId, ChPid, Flow, State = #state { gm = GM, backing_queue = BQ, backing_queue_state = BQS, diff --git a/src/rabbit_priority_queue.erl b/src/rabbit_priority_queue.erl index a839badfc4..f94e7274d5 100644 --- a/src/rabbit_priority_queue.erl +++ b/src/rabbit_priority_queue.erl @@ -35,6 +35,7 @@ -export([init/3, terminate/2, delete_and_terminate/2, delete_crashed/1, purge/1, purge_acks/1, publish/6, publish_delivered/5, discard/4, drain_confirmed/1, + batch_publish/2, batch_publish_delivered/2, dropwhile/2, fetchwhile/4, fetch/2, drop/2, ack/2, requeue/2, ackfold/4, fold/3, len/1, is_empty/1, depth/1, set_ram_duration_target/2, ram_duration/1, needs_timeout/1, timeout/1, @@ -203,6 +204,10 @@ publish(Msg, MsgProps, IsDelivered, ChPid, Flow, State = #passthrough{bq = BQ, bqs = BQS}) -> ?passthrough1(publish(Msg, MsgProps, IsDelivered, ChPid, Flow, BQS)). +%% TODO fix +batch_publish(_Publishes, _State) -> + exit({not_implemented, {?MODULE, batch_publish}}). + publish_delivered(Msg, MsgProps, ChPid, Flow, State = #state{bq = BQ}) -> pick2(fun (P, BQSN) -> {AckTag, BQSN1} = BQ:publish_delivered( @@ -213,6 +218,10 @@ publish_delivered(Msg, MsgProps, ChPid, Flow, State = #passthrough{bq = BQ, bqs = BQS}) -> ?passthrough2(publish_delivered(Msg, MsgProps, ChPid, Flow, BQS)). +%% TODO fix +batch_publish_delivered(_Publishes, _State) -> + exit({not_implemented, {?MODULE, batch_publish_delivered}}). + %% TODO this is a hack. The BQ api does not give us enough information %% here - if we had the Msg we could look at its priority and forward %% to the appropriate sub-BQ. But we don't so we are stuck. diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 8a8945114c..6407228547 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -18,7 +18,9 @@ -export([init/3, terminate/2, delete_and_terminate/2, delete_crashed/1, purge/1, purge_acks/1, - publish/6, publish_delivered/5, discard/4, drain_confirmed/1, + publish/6, publish_delivered/5, + batch_publish/2, batch_publish_delivered/2, + discard/4, drain_confirmed/1, dropwhile/2, fetchwhile/4, fetch/2, drop/2, ack/2, requeue/2, ackfold/4, fold/3, len/1, is_empty/1, depth/1, set_ram_duration_target/2, ram_duration/1, needs_timeout/1, timeout/1, @@ -558,52 +560,31 @@ purge(State = #vqstate { len = Len }) -> purge_acks(State) -> a(purge_pending_ack(false, State)). -publish(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId }, - MsgProps = #message_properties { needs_confirming = NeedsConfirming }, - IsDelivered, _ChPid, _Flow, - State = #vqstate { q1 = Q1, q3 = Q3, q4 = Q4, - qi_embed_msgs_below = IndexMaxSize, - next_seq_id = SeqId, - in_counter = InCount, - durable = IsDurable, - unconfirmed = UC }) -> - IsPersistent1 = IsDurable andalso IsPersistent, - MsgStatus = msg_status(IsPersistent1, IsDelivered, SeqId, Msg, MsgProps, IndexMaxSize), - {MsgStatus1, State1} = maybe_write_to_disk(false, false, MsgStatus, State), - State2 = case ?QUEUE:is_empty(Q3) of - false -> State1 #vqstate { q1 = ?QUEUE:in(m(MsgStatus1), Q1) }; - true -> State1 #vqstate { q4 = ?QUEUE:in(m(MsgStatus1), Q4) } - end, - InCount1 = InCount + 1, - UC1 = gb_sets_maybe_insert(NeedsConfirming, MsgId, UC), - State3 = stats({1, 0}, {none, MsgStatus1}, - State2#vqstate{ next_seq_id = SeqId + 1, - in_counter = InCount1, - unconfirmed = UC1 }), - a(reduce_memory_use(maybe_update_rates(State3))). - -publish_delivered(Msg = #basic_message { is_persistent = IsPersistent, - id = MsgId }, - MsgProps = #message_properties { - needs_confirming = NeedsConfirming }, - _ChPid, _Flow, - State = #vqstate { qi_embed_msgs_below = IndexMaxSize, - next_seq_id = SeqId, - out_counter = OutCount, - in_counter = InCount, - durable = IsDurable, - unconfirmed = UC }) -> - IsPersistent1 = IsDurable andalso IsPersistent, - MsgStatus = msg_status(IsPersistent1, true, SeqId, Msg, MsgProps, IndexMaxSize), - {MsgStatus1, State1} = maybe_write_to_disk(false, false, MsgStatus, State), - State2 = record_pending_ack(m(MsgStatus1), State1), - UC1 = gb_sets_maybe_insert(NeedsConfirming, MsgId, UC), - State3 = stats({0, 1}, {none, MsgStatus1}, - State2 #vqstate { next_seq_id = SeqId + 1, - out_counter = OutCount + 1, - in_counter = InCount + 1, - unconfirmed = UC1 }), - {SeqId, a(reduce_memory_use(maybe_update_rates(State3)))}. +publish(Msg, MsgProps, IsDelivered, ChPid, Flow, State) -> + State1 = + publish1(Msg, MsgProps, IsDelivered, ChPid, Flow, + fun maybe_write_to_disk/4, + State), + a(reduce_memory_use(maybe_update_rates(State1))). + +batch_publish(Publishes, State) -> + State1 = + lists:foldl(fun batch_publish1/2, State, Publishes), + State2 = ui(State1), + a(reduce_memory_use(maybe_update_rates(State2))). + +publish_delivered(Msg, MsgProps, ChPid, Flow, State) -> + {SeqId, State1} = + publish_delivered1(Msg, MsgProps, ChPid, Flow, + fun maybe_write_to_disk/4, + State), + {SeqId, a(reduce_memory_use(maybe_update_rates(State1)))}. + +batch_publish_delivered(Publishes, State) -> + {SeqIds, State1} = + lists:foldl(fun batch_publish_delivered1/2, {[], State}, Publishes), + State2 = ui(State1), + {lists:reverse(SeqIds), a(reduce_memory_use(maybe_update_rates(State2)))}. discard(_MsgId, _ChPid, _Flow, State) -> State. @@ -1518,6 +1499,63 @@ process_delivers_and_acks_fun(_) -> %%---------------------------------------------------------------------------- %% Internal gubbins for publishing %%---------------------------------------------------------------------------- +publish1(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId }, + MsgProps = #message_properties { needs_confirming = NeedsConfirming }, + IsDelivered, _ChPid, _Flow, PersistFun, + State = #vqstate { q1 = Q1, q3 = Q3, q4 = Q4, + qi_embed_msgs_below = IndexMaxSize, + next_seq_id = SeqId, + in_counter = InCount, + durable = IsDurable, + unconfirmed = UC }) -> + IsPersistent1 = IsDurable andalso IsPersistent, + MsgStatus = msg_status(IsPersistent1, IsDelivered, SeqId, Msg, MsgProps, IndexMaxSize), + {MsgStatus1, State1} = PersistFun(false, false, MsgStatus, State), + State2 = case ?QUEUE:is_empty(Q3) of + false -> State1 #vqstate { q1 = ?QUEUE:in(m(MsgStatus1), Q1) }; + true -> State1 #vqstate { q4 = ?QUEUE:in(m(MsgStatus1), Q4) } + end, + InCount1 = InCount + 1, + UC1 = gb_sets_maybe_insert(NeedsConfirming, MsgId, UC), + stats({1, 0}, {none, MsgStatus1}, + State2#vqstate{ next_seq_id = SeqId + 1, + in_counter = InCount1, + unconfirmed = UC1 }). + +batch_publish1({Msg, MsgProps, IsDelivered, ChPid, Flow}, State) -> + publish1(Msg, MsgProps, IsDelivered, ChPid, Flow, + fun maybe_prepare_write_to_disk/4, + State). + +publish_delivered1(Msg = #basic_message { is_persistent = IsPersistent, + id = MsgId }, + MsgProps = #message_properties { + needs_confirming = NeedsConfirming }, + _ChPid, _Flow, PersistFun, + State = #vqstate { qi_embed_msgs_below = IndexMaxSize, + next_seq_id = SeqId, + out_counter = OutCount, + in_counter = InCount, + durable = IsDurable, + unconfirmed = UC }) -> + IsPersistent1 = IsDurable andalso IsPersistent, + MsgStatus = msg_status(IsPersistent1, true, SeqId, Msg, MsgProps, IndexMaxSize), + {MsgStatus1, State1} = PersistFun(false, false, MsgStatus, State), + State2 = record_pending_ack(m(MsgStatus1), State1), + UC1 = gb_sets_maybe_insert(NeedsConfirming, MsgId, UC), + State3 = stats({0, 1}, {none, MsgStatus1}, + State2 #vqstate { next_seq_id = SeqId + 1, + out_counter = OutCount + 1, + in_counter = InCount + 1, + unconfirmed = UC1 }), + {SeqId, State3}. + +batch_publish_delivered1({Msg, MsgProps, ChPid, Flow}, {SeqIds, State}) -> + {SeqId, State1} = + publish_delivered1(Msg, MsgProps, ChPid, Flow, + fun maybe_prepare_write_to_disk/4, + State), + {[SeqId | SeqIds], State1}. maybe_write_msg_to_disk(_Force, MsgStatus = #msg_status { msg_in_store = true }, State) -> |
