diff options
| author | Alvaro Videla <videlalvaro@gmail.com> | 2015-10-02 23:24:47 +0200 |
|---|---|---|
| committer | Alvaro Videla <videlalvaro@gmail.com> | 2015-10-02 23:24:47 +0200 |
| commit | 5c2d50deed094e2e6e07990793bc8b09739a34c9 (patch) | |
| tree | 5f3f0f097cd8e7a8cd1fad4eb5eac772eb903db4 | |
| parent | ccae00a253236f817587e9145e8fc1e85cbf9fd1 (diff) | |
| download | rabbitmq-server-git-5c2d50deed094e2e6e07990793bc8b09739a34c9.tar.gz | |
implements batch publishing for mirrored queues
| -rw-r--r-- | src/rabbit_backing_queue.erl | 13 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_master.erl | 44 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 27 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 26 |
4 files changed, 83 insertions, 27 deletions
diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl index ae8ce73283..2b808e206c 100644 --- a/src/rabbit_backing_queue.erl +++ b/src/rabbit_backing_queue.erl @@ -34,9 +34,9 @@ -type(flow() :: 'flow' | 'noflow'). -type(msg_ids() :: [rabbit_types:msg_id()]). -type(publish() :: {rabbit_types:basic_message(), - rabbit_types:message_properties(), boolean(), pid(), flow()}). + rabbit_types:message_properties(), boolean()}). -type(delivered_publish() :: {rabbit_types:basic_message(), - rabbit_types:message_properties(), pid(), flow}). + rabbit_types:message_properties()}). -type(fetch_result(Ack) :: ('empty' | {rabbit_types:basic_message(), boolean(), Ack})). -type(drop_result(Ack) :: @@ -109,7 +109,7 @@ state()) -> state(). %% Like publish/6 but for batches of publishes. --callback batch_publish([publish()], state()) -> state(). +-callback batch_publish([publish()], pid(), flow(), state()) -> state(). %% Called for messages which have already been passed straight %% out to a client. The queue will be empty for these calls @@ -120,7 +120,7 @@ -> {ack(), state()}. %% Like publish_delivered/5 but for batches of publishes. --callback batch_publish_delivered([delivered_publish()], +-callback batch_publish_delivered([delivered_publish()], pid(), flow(), state()) -> {[ack()], state()}. @@ -265,8 +265,9 @@ behaviour_info(callbacks) -> [{start, 1}, {stop, 0}, {init, 3}, {terminate, 2}, {delete_and_terminate, 2}, {delete_crashed, 1}, {purge, 1}, - {purge_acks, 1}, {publish, 6}, - {publish_delivered, 5}, {discard, 4}, {drain_confirmed, 1}, + {purge_acks, 1}, {publish, 6}, {publish_delivered, 5}, + {batch_publish, 4}, {batch_publish_delivered, 4}, + {discard, 4}, {drain_confirmed, 1}, {dropwhile, 2}, {fetchwhile, 4}, {fetch, 2}, {drop, 2}, {ack, 2}, {requeue, 2}, {ackfold, 4}, {fold, 3}, {len, 1}, {is_empty, 1}, {depth, 1}, {set_ram_duration_target, 2}, diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index 67c8f34ac3..3b7d077b21 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -18,7 +18,7 @@ -export([init/3, terminate/2, delete_and_terminate/2, purge/1, purge_acks/1, publish/6, publish_delivered/5, - batch_publish/2, batch_publish_delivered/2, + batch_publish/4, batch_publish_delivered/4, discard/4, fetch/2, drop/2, ack/2, requeue/2, ackfold/4, fold/3, len/1, is_empty/1, depth/1, drain_confirmed/1, dropwhile/2, fetchwhile/4, set_ram_duration_target/2, ram_duration/1, @@ -244,9 +244,25 @@ publish(Msg = #basic_message { id = MsgId }, MsgProps, IsDelivered, ChPid, Flow, BQS1 = BQ:publish(Msg, MsgProps, IsDelivered, ChPid, Flow, BQS), ensure_monitoring(ChPid, State #state { backing_queue_state = BQS1 }). -%% TODO fix -batch_publish(_Publishes, _State) -> - exit({not_implemented, {?MODULE, batch_publish}}). +batch_publish(Publishes, ChPid, Flow, + State = #state { gm = GM, + seen_status = SS, + backing_queue = BQ, + backing_queue_state = BQS }) -> + {Publishes1, false, MsgSizes} = + lists:foldl(fun ({Msg = #basic_message { id = MsgId }, + MsgProps, _IsDelivered}, {Pubs, false, Sizes}) -> + {[{Msg, MsgProps, true} | Pubs], %% [0] + false = dict:is_key(MsgId, SS), %% ASSERTION + Sizes + rabbit_basic:msg_size(Msg)} + end, {[], false, 0}, Publishes), + Publishes2 = lists:reverse(Publishes1), + ok = gm:broadcast(GM, {batch_publish, ChPid, Flow, Publishes2}, + MsgSizes), + BQS1 = BQ:batch_publish(Publishes2, ChPid, Flow, BQS), + ensure_monitoring(ChPid, State #state { backing_queue_state = BQS1 }). +%% [0] when the slave process the publish instruction, it sets the +%% IsDelivered flag to true. publish_delivered(Msg = #basic_message { id = MsgId }, MsgProps, ChPid, Flow, State = #state { gm = GM, @@ -260,9 +276,22 @@ publish_delivered(Msg = #basic_message { id = MsgId }, MsgProps, State1 = State #state { backing_queue_state = BQS1 }, {AckTag, ensure_monitoring(ChPid, State1)}. -%% TODO fix -batch_publish_delivered(_Publishes, _State) -> - exit({not_implemented, {?MODULE, batch_publish_delivered}}). +batch_publish_delivered(Publishes, ChPid, Flow, + State = #state { gm = GM, + seen_status = SS, + backing_queue = BQ, + backing_queue_state = BQS }) -> + {false, MsgSizes} = + lists:foldl(fun ({Msg = #basic_message { id = MsgId }, _MsgProps}, + {false, Sizes}) -> + {false = dict:is_key(MsgId, SS), %% ASSERTION + Sizes + rabbit_basic:msg_size(Msg)} + end, {false, 0}, Publishes), + ok = gm:broadcast(GM, {batch_publish_delivered, ChPid, Flow, Publishes}, + MsgSizes), + {AckTags, BQS1} = BQ:batch_publish_delivered(Publishes, ChPid, Flow, BQS), + State1 = State #state { backing_queue_state = BQS1 }, + {AckTags, ensure_monitoring(ChPid, State1)}. discard(MsgId, ChPid, Flow, State = #state { gm = GM, backing_queue = BQ, @@ -502,7 +531,6 @@ depth_fun() -> %% --------------------------------------------------------------------------- %% Helpers %% --------------------------------------------------------------------------- - drop_one(AckTag, State = #state { gm = GM, backing_queue = BQ, backing_queue_state = BQS }) -> diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index 7f309ab0b7..5da91c70c5 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -851,6 +851,15 @@ process_instruction({publish, ChPid, Flow, MsgProps, publish_or_discard(published, ChPid, MsgId, State), BQS1 = BQ:publish(Msg, MsgProps, true, ChPid, Flow, BQS), {ok, State1 #state { backing_queue_state = BQS1 }}; +process_instruction({batch_publish, ChPid, Flow, Publishes}, State) -> + maybe_flow_ack(ChPid, Flow), + State1 = #state { backing_queue = BQ, backing_queue_state = BQS } = + lists:foldl(fun ({#basic_message { id = MsgId }, + _MsgProps, _IsDelivered}, St) -> + publish_or_discard(published, ChPid, MsgId, St) + end, State, Publishes), + BQS1 = BQ:batch_publish(Publishes, ChPid, Flow, BQS), + {ok, State1 #state { backing_queue_state = BQS1 }}; process_instruction({publish_delivered, ChPid, Flow, MsgProps, Msg = #basic_message { id = MsgId }}, State) -> maybe_flow_ack(ChPid, Flow), @@ -860,6 +869,24 @@ process_instruction({publish_delivered, ChPid, Flow, MsgProps, {AckTag, BQS1} = BQ:publish_delivered(Msg, MsgProps, ChPid, Flow, BQS), {ok, maybe_store_ack(true, MsgId, AckTag, State1 #state { backing_queue_state = BQS1 })}; +process_instruction({batch_publish_delivered, ChPid, Flow, Publishes}, State) -> + maybe_flow_ack(ChPid, Flow), + {MsgIds, + State1 = #state { backing_queue = BQ, backing_queue_state = BQS }} = + lists:foldl(fun ({#basic_message { id = MsgId }, _MsgProps}, + {MsgIds, St}) -> + {[MsgId | MsgIds], + publish_or_discard(published, ChPid, MsgId, St)} + end, {[], State}, Publishes), + true = BQ:is_empty(BQS), + {AckTags, BQS1} = BQ:batch_publish_delivered(Publishes, ChPid, Flow, BQS), + MsgIdsAndAcks = lists:zip(lists:reverse(MsgIds), AckTags), + State2 = lists:foldl( + fun ({MsgId, AckTag}, St) -> + maybe_store_ack(true, MsgId, AckTag, St) + end, State1 #state { backing_queue_state = BQS1 }, + MsgIdsAndAcks), + {ok, State2}; process_instruction({discard, ChPid, Flow, MsgId}, State) -> maybe_flow_ack(ChPid, Flow), State1 = #state { backing_queue = BQ, backing_queue_state = BQS } = diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 6407228547..ee45a8d671 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -19,7 +19,7 @@ -export([init/3, terminate/2, delete_and_terminate/2, delete_crashed/1, purge/1, purge_acks/1, publish/6, publish_delivered/5, - batch_publish/2, batch_publish_delivered/2, + batch_publish/4, batch_publish_delivered/4, discard/4, drain_confirmed/1, dropwhile/2, fetchwhile/4, fetch/2, drop/2, ack/2, requeue/2, ackfold/4, fold/3, len/1, is_empty/1, depth/1, @@ -567,9 +567,9 @@ publish(Msg, MsgProps, IsDelivered, ChPid, Flow, State) -> State), a(reduce_memory_use(maybe_update_rates(State1))). -batch_publish(Publishes, State) -> - State1 = - lists:foldl(fun batch_publish1/2, State, Publishes), +batch_publish(Publishes, ChPid, Flow, State) -> + {ChPid, Flow, State1} = + lists:foldl(fun batch_publish1/2, {ChPid, Flow, State}, Publishes), State2 = ui(State1), a(reduce_memory_use(maybe_update_rates(State2))). @@ -580,9 +580,10 @@ publish_delivered(Msg, MsgProps, ChPid, Flow, State) -> State), {SeqId, a(reduce_memory_use(maybe_update_rates(State1)))}. -batch_publish_delivered(Publishes, State) -> - {SeqIds, State1} = - lists:foldl(fun batch_publish_delivered1/2, {[], State}, Publishes), +batch_publish_delivered(Publishes, ChPid, Flow, State) -> + {ChPid, Flow, SeqIds, State1} = + lists:foldl(fun batch_publish_delivered1/2, + {ChPid, Flow, [], State}, Publishes), State2 = ui(State1), {lists:reverse(SeqIds), a(reduce_memory_use(maybe_update_rates(State2)))}. @@ -1522,10 +1523,9 @@ publish1(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId }, in_counter = InCount1, unconfirmed = UC1 }). -batch_publish1({Msg, MsgProps, IsDelivered, ChPid, Flow}, State) -> - publish1(Msg, MsgProps, IsDelivered, ChPid, Flow, - fun maybe_prepare_write_to_disk/4, - State). +batch_publish1({Msg, MsgProps, IsDelivered}, {ChPid, Flow, State}) -> + {ChPid, Flow, publish1(Msg, MsgProps, IsDelivered, ChPid, Flow, + fun maybe_prepare_write_to_disk/4, State)}. publish_delivered1(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId }, @@ -1550,12 +1550,12 @@ publish_delivered1(Msg = #basic_message { is_persistent = IsPersistent, unconfirmed = UC1 }), {SeqId, State3}. -batch_publish_delivered1({Msg, MsgProps, ChPid, Flow}, {SeqIds, State}) -> +batch_publish_delivered1({Msg, MsgProps}, {ChPid, Flow, SeqIds, State}) -> {SeqId, State1} = publish_delivered1(Msg, MsgProps, ChPid, Flow, fun maybe_prepare_write_to_disk/4, State), - {[SeqId | SeqIds], State1}. + {ChPid, Flow, [SeqId | SeqIds], State1}. maybe_write_msg_to_disk(_Force, MsgStatus = #msg_status { msg_in_store = true }, State) -> |
