summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlvaro Videla <videlalvaro@gmail.com>2015-10-02 23:24:47 +0200
committerAlvaro Videla <videlalvaro@gmail.com>2015-10-02 23:24:47 +0200
commit5c2d50deed094e2e6e07990793bc8b09739a34c9 (patch)
tree5f3f0f097cd8e7a8cd1fad4eb5eac772eb903db4
parentccae00a253236f817587e9145e8fc1e85cbf9fd1 (diff)
downloadrabbitmq-server-git-5c2d50deed094e2e6e07990793bc8b09739a34c9.tar.gz
implements batch publishing for mirrored queues
-rw-r--r--src/rabbit_backing_queue.erl13
-rw-r--r--src/rabbit_mirror_queue_master.erl44
-rw-r--r--src/rabbit_mirror_queue_slave.erl27
-rw-r--r--src/rabbit_variable_queue.erl26
4 files changed, 83 insertions, 27 deletions
diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl
index ae8ce73283..2b808e206c 100644
--- a/src/rabbit_backing_queue.erl
+++ b/src/rabbit_backing_queue.erl
@@ -34,9 +34,9 @@
-type(flow() :: 'flow' | 'noflow').
-type(msg_ids() :: [rabbit_types:msg_id()]).
-type(publish() :: {rabbit_types:basic_message(),
- rabbit_types:message_properties(), boolean(), pid(), flow()}).
+ rabbit_types:message_properties(), boolean()}).
-type(delivered_publish() :: {rabbit_types:basic_message(),
- rabbit_types:message_properties(), pid(), flow}).
+ rabbit_types:message_properties()}).
-type(fetch_result(Ack) ::
('empty' | {rabbit_types:basic_message(), boolean(), Ack})).
-type(drop_result(Ack) ::
@@ -109,7 +109,7 @@
state()) -> state().
%% Like publish/6 but for batches of publishes.
--callback batch_publish([publish()], state()) -> state().
+-callback batch_publish([publish()], pid(), flow(), state()) -> state().
%% Called for messages which have already been passed straight
%% out to a client. The queue will be empty for these calls
@@ -120,7 +120,7 @@
-> {ack(), state()}.
%% Like publish_delivered/5 but for batches of publishes.
--callback batch_publish_delivered([delivered_publish()],
+-callback batch_publish_delivered([delivered_publish()], pid(), flow(),
state())
-> {[ack()], state()}.
@@ -265,8 +265,9 @@
behaviour_info(callbacks) ->
[{start, 1}, {stop, 0}, {init, 3}, {terminate, 2},
{delete_and_terminate, 2}, {delete_crashed, 1}, {purge, 1},
- {purge_acks, 1}, {publish, 6},
- {publish_delivered, 5}, {discard, 4}, {drain_confirmed, 1},
+ {purge_acks, 1}, {publish, 6}, {publish_delivered, 5},
+ {batch_publish, 4}, {batch_publish_delivered, 4},
+ {discard, 4}, {drain_confirmed, 1},
{dropwhile, 2}, {fetchwhile, 4}, {fetch, 2},
{drop, 2}, {ack, 2}, {requeue, 2}, {ackfold, 4}, {fold, 3}, {len, 1},
{is_empty, 1}, {depth, 1}, {set_ram_duration_target, 2},
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index 67c8f34ac3..3b7d077b21 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -18,7 +18,7 @@
-export([init/3, terminate/2, delete_and_terminate/2,
purge/1, purge_acks/1, publish/6, publish_delivered/5,
- batch_publish/2, batch_publish_delivered/2,
+ batch_publish/4, batch_publish_delivered/4,
discard/4, fetch/2, drop/2, ack/2, requeue/2, ackfold/4, fold/3,
len/1, is_empty/1, depth/1, drain_confirmed/1,
dropwhile/2, fetchwhile/4, set_ram_duration_target/2, ram_duration/1,
@@ -244,9 +244,25 @@ publish(Msg = #basic_message { id = MsgId }, MsgProps, IsDelivered, ChPid, Flow,
BQS1 = BQ:publish(Msg, MsgProps, IsDelivered, ChPid, Flow, BQS),
ensure_monitoring(ChPid, State #state { backing_queue_state = BQS1 }).
-%% TODO fix
-batch_publish(_Publishes, _State) ->
- exit({not_implemented, {?MODULE, batch_publish}}).
+batch_publish(Publishes, ChPid, Flow,
+ State = #state { gm = GM,
+ seen_status = SS,
+ backing_queue = BQ,
+ backing_queue_state = BQS }) ->
+ {Publishes1, false, MsgSizes} =
+ lists:foldl(fun ({Msg = #basic_message { id = MsgId },
+ MsgProps, _IsDelivered}, {Pubs, false, Sizes}) ->
+ {[{Msg, MsgProps, true} | Pubs], %% [0]
+ false = dict:is_key(MsgId, SS), %% ASSERTION
+ Sizes + rabbit_basic:msg_size(Msg)}
+ end, {[], false, 0}, Publishes),
+ Publishes2 = lists:reverse(Publishes1),
+ ok = gm:broadcast(GM, {batch_publish, ChPid, Flow, Publishes2},
+ MsgSizes),
+ BQS1 = BQ:batch_publish(Publishes2, ChPid, Flow, BQS),
+ ensure_monitoring(ChPid, State #state { backing_queue_state = BQS1 }).
+%% [0] when the slave process the publish instruction, it sets the
+%% IsDelivered flag to true.
publish_delivered(Msg = #basic_message { id = MsgId }, MsgProps,
ChPid, Flow, State = #state { gm = GM,
@@ -260,9 +276,22 @@ publish_delivered(Msg = #basic_message { id = MsgId }, MsgProps,
State1 = State #state { backing_queue_state = BQS1 },
{AckTag, ensure_monitoring(ChPid, State1)}.
-%% TODO fix
-batch_publish_delivered(_Publishes, _State) ->
- exit({not_implemented, {?MODULE, batch_publish_delivered}}).
+batch_publish_delivered(Publishes, ChPid, Flow,
+ State = #state { gm = GM,
+ seen_status = SS,
+ backing_queue = BQ,
+ backing_queue_state = BQS }) ->
+ {false, MsgSizes} =
+ lists:foldl(fun ({Msg = #basic_message { id = MsgId }, _MsgProps},
+ {false, Sizes}) ->
+ {false = dict:is_key(MsgId, SS), %% ASSERTION
+ Sizes + rabbit_basic:msg_size(Msg)}
+ end, {false, 0}, Publishes),
+ ok = gm:broadcast(GM, {batch_publish_delivered, ChPid, Flow, Publishes},
+ MsgSizes),
+ {AckTags, BQS1} = BQ:batch_publish_delivered(Publishes, ChPid, Flow, BQS),
+ State1 = State #state { backing_queue_state = BQS1 },
+ {AckTags, ensure_monitoring(ChPid, State1)}.
discard(MsgId, ChPid, Flow, State = #state { gm = GM,
backing_queue = BQ,
@@ -502,7 +531,6 @@ depth_fun() ->
%% ---------------------------------------------------------------------------
%% Helpers
%% ---------------------------------------------------------------------------
-
drop_one(AckTag, State = #state { gm = GM,
backing_queue = BQ,
backing_queue_state = BQS }) ->
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index 7f309ab0b7..5da91c70c5 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -851,6 +851,15 @@ process_instruction({publish, ChPid, Flow, MsgProps,
publish_or_discard(published, ChPid, MsgId, State),
BQS1 = BQ:publish(Msg, MsgProps, true, ChPid, Flow, BQS),
{ok, State1 #state { backing_queue_state = BQS1 }};
+process_instruction({batch_publish, ChPid, Flow, Publishes}, State) ->
+ maybe_flow_ack(ChPid, Flow),
+ State1 = #state { backing_queue = BQ, backing_queue_state = BQS } =
+ lists:foldl(fun ({#basic_message { id = MsgId },
+ _MsgProps, _IsDelivered}, St) ->
+ publish_or_discard(published, ChPid, MsgId, St)
+ end, State, Publishes),
+ BQS1 = BQ:batch_publish(Publishes, ChPid, Flow, BQS),
+ {ok, State1 #state { backing_queue_state = BQS1 }};
process_instruction({publish_delivered, ChPid, Flow, MsgProps,
Msg = #basic_message { id = MsgId }}, State) ->
maybe_flow_ack(ChPid, Flow),
@@ -860,6 +869,24 @@ process_instruction({publish_delivered, ChPid, Flow, MsgProps,
{AckTag, BQS1} = BQ:publish_delivered(Msg, MsgProps, ChPid, Flow, BQS),
{ok, maybe_store_ack(true, MsgId, AckTag,
State1 #state { backing_queue_state = BQS1 })};
+process_instruction({batch_publish_delivered, ChPid, Flow, Publishes}, State) ->
+ maybe_flow_ack(ChPid, Flow),
+ {MsgIds,
+ State1 = #state { backing_queue = BQ, backing_queue_state = BQS }} =
+ lists:foldl(fun ({#basic_message { id = MsgId }, _MsgProps},
+ {MsgIds, St}) ->
+ {[MsgId | MsgIds],
+ publish_or_discard(published, ChPid, MsgId, St)}
+ end, {[], State}, Publishes),
+ true = BQ:is_empty(BQS),
+ {AckTags, BQS1} = BQ:batch_publish_delivered(Publishes, ChPid, Flow, BQS),
+ MsgIdsAndAcks = lists:zip(lists:reverse(MsgIds), AckTags),
+ State2 = lists:foldl(
+ fun ({MsgId, AckTag}, St) ->
+ maybe_store_ack(true, MsgId, AckTag, St)
+ end, State1 #state { backing_queue_state = BQS1 },
+ MsgIdsAndAcks),
+ {ok, State2};
process_instruction({discard, ChPid, Flow, MsgId}, State) ->
maybe_flow_ack(ChPid, Flow),
State1 = #state { backing_queue = BQ, backing_queue_state = BQS } =
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 6407228547..ee45a8d671 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -19,7 +19,7 @@
-export([init/3, terminate/2, delete_and_terminate/2, delete_crashed/1,
purge/1, purge_acks/1,
publish/6, publish_delivered/5,
- batch_publish/2, batch_publish_delivered/2,
+ batch_publish/4, batch_publish_delivered/4,
discard/4, drain_confirmed/1,
dropwhile/2, fetchwhile/4, fetch/2, drop/2, ack/2, requeue/2,
ackfold/4, fold/3, len/1, is_empty/1, depth/1,
@@ -567,9 +567,9 @@ publish(Msg, MsgProps, IsDelivered, ChPid, Flow, State) ->
State),
a(reduce_memory_use(maybe_update_rates(State1))).
-batch_publish(Publishes, State) ->
- State1 =
- lists:foldl(fun batch_publish1/2, State, Publishes),
+batch_publish(Publishes, ChPid, Flow, State) ->
+ {ChPid, Flow, State1} =
+ lists:foldl(fun batch_publish1/2, {ChPid, Flow, State}, Publishes),
State2 = ui(State1),
a(reduce_memory_use(maybe_update_rates(State2))).
@@ -580,9 +580,10 @@ publish_delivered(Msg, MsgProps, ChPid, Flow, State) ->
State),
{SeqId, a(reduce_memory_use(maybe_update_rates(State1)))}.
-batch_publish_delivered(Publishes, State) ->
- {SeqIds, State1} =
- lists:foldl(fun batch_publish_delivered1/2, {[], State}, Publishes),
+batch_publish_delivered(Publishes, ChPid, Flow, State) ->
+ {ChPid, Flow, SeqIds, State1} =
+ lists:foldl(fun batch_publish_delivered1/2,
+ {ChPid, Flow, [], State}, Publishes),
State2 = ui(State1),
{lists:reverse(SeqIds), a(reduce_memory_use(maybe_update_rates(State2)))}.
@@ -1522,10 +1523,9 @@ publish1(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId },
in_counter = InCount1,
unconfirmed = UC1 }).
-batch_publish1({Msg, MsgProps, IsDelivered, ChPid, Flow}, State) ->
- publish1(Msg, MsgProps, IsDelivered, ChPid, Flow,
- fun maybe_prepare_write_to_disk/4,
- State).
+batch_publish1({Msg, MsgProps, IsDelivered}, {ChPid, Flow, State}) ->
+ {ChPid, Flow, publish1(Msg, MsgProps, IsDelivered, ChPid, Flow,
+ fun maybe_prepare_write_to_disk/4, State)}.
publish_delivered1(Msg = #basic_message { is_persistent = IsPersistent,
id = MsgId },
@@ -1550,12 +1550,12 @@ publish_delivered1(Msg = #basic_message { is_persistent = IsPersistent,
unconfirmed = UC1 }),
{SeqId, State3}.
-batch_publish_delivered1({Msg, MsgProps, ChPid, Flow}, {SeqIds, State}) ->
+batch_publish_delivered1({Msg, MsgProps}, {ChPid, Flow, SeqIds, State}) ->
{SeqId, State1} =
publish_delivered1(Msg, MsgProps, ChPid, Flow,
fun maybe_prepare_write_to_disk/4,
State),
- {[SeqId | SeqIds], State1}.
+ {ChPid, Flow, [SeqId | SeqIds], State1}.
maybe_write_msg_to_disk(_Force, MsgStatus = #msg_status {
msg_in_store = true }, State) ->