summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/gen_server2.erl16
-rw-r--r--src/rabbit_amqqueue.erl26
-rw-r--r--src/rabbit_amqqueue_process.erl44
-rw-r--r--src/rabbit_backing_queue.erl8
-rw-r--r--src/rabbit_mnesia.erl18
-rw-r--r--src/rabbit_router.erl6
-rw-r--r--src/rabbit_tests.erl18
-rw-r--r--src/rabbit_types.erl3
-rw-r--r--src/rabbit_variable_queue.erl28
9 files changed, 118 insertions, 49 deletions
diff --git a/src/gen_server2.erl b/src/gen_server2.erl
index 6e02b23ecb..d6b09babc2 100644
--- a/src/gen_server2.erl
+++ b/src/gen_server2.erl
@@ -880,6 +880,22 @@ handle_common_reply(Reply, Msg, GS2State = #gs2_state { name = Name,
loop(GS2State #gs2_state { state = NState,
time = Time1,
debug = Debug1 });
+ {become, Mod, NState} ->
+ Debug1 = common_debug(Debug, fun print_event/3, Name,
+ {become, Mod, NState}),
+ loop(find_prioritisers(
+ GS2State #gs2_state { mod = Mod,
+ state = NState,
+ time = infinity,
+ debug = Debug1 }));
+ {become, Mod, NState, Time1} ->
+ Debug1 = common_debug(Debug, fun print_event/3, Name,
+ {become, Mod, NState}),
+ loop(find_prioritisers(
+ GS2State #gs2_state { mod = Mod,
+ state = NState,
+ time = Time1,
+ debug = Debug1 }));
_ ->
handle_common_termination(Reply, Msg, GS2State)
end.
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 775c631d51..94c05f8b88 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -201,12 +201,13 @@ recover_durable_queues(DurableQueues) ->
declare(QueueName, Durable, AutoDelete, Args, Owner) ->
ok = check_declare_arguments(QueueName, Args),
- Q = start_queue_process(#amqqueue{name = QueueName,
- durable = Durable,
- auto_delete = AutoDelete,
- arguments = Args,
+ Q = start_queue_process(#amqqueue{name = QueueName,
+ durable = Durable,
+ auto_delete = AutoDelete,
+ arguments = Args,
exclusive_owner = Owner,
- pid = none}),
+ pid = none,
+ extra_pids = []}),
case gen_server2:call(Q#amqqueue.pid, {init, false}) of
not_found -> rabbit_misc:not_found(QueueName);
Q1 -> Q1
@@ -486,9 +487,11 @@ on_node_down(Node) ->
rabbit_binding:new_deletions(),
rabbit_misc:execute_mnesia_transaction(
fun () -> qlc:e(qlc:q([delete_queue(QueueName) ||
- #amqqueue{name = QueueName, pid = Pid}
+ #amqqueue{name = QueueName, pid = Pid,
+ extra_pids = EPids}
<- mnesia:table(rabbit_queue),
- node(Pid) == Node]))
+ node(Pid) == Node,
+ [] =:= EPids]))
end))).
delete_queue(QueueName) ->
@@ -496,11 +499,12 @@ delete_queue(QueueName) ->
rabbit_binding:remove_transient_for_destination(QueueName).
pseudo_queue(QueueName, Pid) ->
- #amqqueue{name = QueueName,
- durable = false,
+ #amqqueue{name = QueueName,
+ durable = false,
auto_delete = false,
- arguments = [],
- pid = Pid}.
+ arguments = [],
+ pid = Pid,
+ extra_pids = []}.
safe_delegate_call_ok(F, Pids) ->
{_, Bad} = delegate:invoke(Pids,
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 78bb683578..c1972c261b 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -48,6 +48,8 @@
handle_info/2, handle_pre_hibernate/1, prioritise_call/3,
prioritise_cast/2, prioritise_info/2]).
+-export([init_with_backing_queue_state/4]).
+
% Queue's state
-record(q, {q,
exclusive_consumer,
@@ -112,12 +114,11 @@ info_keys() -> ?INFO_KEYS.
init(Q) ->
?LOGDEBUG("Queue starting - ~p~n", [Q]),
process_flag(trap_exit, true),
- {ok, BQ} = application:get_env(backing_queue_module),
{ok, #q{q = Q#amqqueue{pid = self()},
exclusive_consumer = none,
has_had_consumers = false,
- backing_queue = BQ,
+ backing_queue = backing_queue_module(Q),
backing_queue_state = undefined,
active_consumers = queue:new(),
blocked_consumers = queue:new(),
@@ -130,6 +131,23 @@ init(Q) ->
guid_to_channel = dict:new()}, hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
+init_with_backing_queue_state(Q, BQ, BQS, RateTRef) ->
+ ?LOGDEBUG("Queue starting - ~p~n", [Q]),
+ process_args(#q{q = Q#amqqueue{pid = self()},
+ exclusive_consumer = none,
+ has_had_consumers = false,
+ backing_queue = BQ,
+ backing_queue_state = BQS,
+ active_consumers = queue:new(),
+ blocked_consumers = queue:new(),
+ expires = undefined,
+ sync_timer_ref = undefined,
+ rate_timer_ref = RateTRef,
+ expiry_timer_ref = undefined,
+ ttl = undefined,
+ stats_timer = rabbit_event:init_stats_timer(),
+ guid_to_channel = dict:new()}).
+
terminate(shutdown, State = #q{backing_queue = BQ}) ->
terminate_shutdown(fun (BQS) -> BQ:terminate(BQS) end, State);
terminate({shutdown, _}, State = #q{backing_queue = BQ}) ->
@@ -150,8 +168,7 @@ code_change(_OldVsn, State, _Extra) ->
%%----------------------------------------------------------------------------
declare(Recover, From,
- State = #q{q = Q = #amqqueue{name = QName, durable = IsDurable},
- backing_queue = BQ, backing_queue_state = undefined,
+ State = #q{q = Q, backing_queue = BQ, backing_queue_state = undefined,
stats_timer = StatsTimer}) ->
case rabbit_amqqueue:internal_declare(Q, Recover) of
not_found -> {stop, normal, not_found, State};
@@ -162,7 +179,7 @@ declare(Recover, From,
ok = rabbit_memory_monitor:register(
self(), {rabbit_amqqueue,
set_ram_duration_target, [self()]}),
- BQS = BQ:init(QName, IsDurable, Recover),
+ BQS = BQ:init(Q, Recover),
State1 = process_args(State#q{backing_queue_state = BQS}),
rabbit_event:notify(queue_created,
infos(?CREATION_EVENT_KEYS, State1)),
@@ -222,6 +239,13 @@ next_state(State) ->
false -> {stop_sync_timer(State2), hibernate}
end.
+backing_queue_module(#amqqueue{arguments = Args}) ->
+ case rabbit_misc:table_lookup(Args, <<"x-mirror">>) of
+ undefined -> {ok, BQM} = application:get_env(backing_queue_module),
+ BQM;
+ _Nodes -> rabbit_mirror_queue_master
+ end.
+
ensure_sync_timer(State = #q{sync_timer_ref = undefined, backing_queue = BQ}) ->
{ok, TRef} = timer:apply_after(
?SYNC_INTERVAL,
@@ -489,7 +513,7 @@ attempt_delivery(#delivery{txn = none,
AckRequired, Message,
(?BASE_MESSAGE_PROPERTIES)#message_properties{
needs_confirming = NeedsConfirming},
- BQS),
+ ChPid, BQS),
{{Message, false, AckTag}, true,
State1#q{backing_queue_state = BQS1}}
end,
@@ -500,9 +524,9 @@ attempt_delivery(#delivery{txn = Txn,
State = #q{backing_queue = BQ,
backing_queue_state = BQS}) ->
record_current_channel_tx(ChPid, Txn),
- {true,
- State#q{backing_queue_state =
- BQ:tx_publish(Txn, Message, ?BASE_MESSAGE_PROPERTIES, BQS)}}.
+ {true, State#q{backing_queue_state =
+ BQ:tx_publish(Txn, Message, ?BASE_MESSAGE_PROPERTIES,
+ ChPid, BQS)}}.
deliver_or_enqueue(Delivery, State) ->
case attempt_delivery(Delivery, record_confirm_message(Delivery, State)) of
@@ -513,7 +537,7 @@ deliver_or_enqueue(Delivery, State) ->
BQS1 = BQ:publish(Message,
(message_properties(State)) #message_properties{
needs_confirming = (MsgSeqNo =/= undefined)},
- BQS),
+ Delivery #delivery.sender, BQS),
{false, ensure_ttl_timer(State1#q{backing_queue_state = BQS1})}
end.
diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl
index 352e76fd0c..d04944f946 100644
--- a/src/rabbit_backing_queue.erl
+++ b/src/rabbit_backing_queue.erl
@@ -48,7 +48,7 @@ behaviour_info(callbacks) ->
{stop, 0},
%% Initialise the backing queue and its state.
- {init, 3},
+ {init, 2},
%% Called on queue shutdown when queue isn't being deleted.
{terminate, 1},
@@ -62,12 +62,12 @@ behaviour_info(callbacks) ->
{purge, 1},
%% Publish a message.
- {publish, 3},
+ {publish, 4},
%% Called for messages which have already been passed straight
%% out to a client. The queue will be empty for these calls
%% (i.e. saves the round trip through the backing queue).
- {publish_delivered, 4},
+ {publish_delivered, 5},
%% Drop messages from the head of the queue while the supplied
%% predicate returns true.
@@ -81,7 +81,7 @@ behaviour_info(callbacks) ->
{ack, 2},
%% A publish, but in the context of a transaction.
- {tx_publish, 4},
+ {tx_publish, 5},
%% Acks, but in the context of a transaction.
{tx_ack, 3},
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index dadfc16e66..c97988d0f7 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -34,7 +34,8 @@
-export([ensure_mnesia_dir/0, dir/0, status/0, init/0, is_db_empty/0,
cluster/1, force_cluster/1, reset/0, force_reset/0,
- is_clustered/0, empty_ram_only_tables/0, copy_db/1]).
+ is_clustered/0, empty_ram_only_tables/0, copy_db/1,
+ add_table_definition/1]).
-export([table_names/0]).
@@ -210,7 +211,20 @@ table_definitions() ->
{rabbit_queue,
[{record_name, amqqueue},
{attributes, record_info(fields, amqqueue)},
- {match, #amqqueue{name = queue_name_match(), _='_'}}]}].
+ {match, #amqqueue{name = queue_name_match(), _='_'}}]}]
+ ++ plugin_table_definitions().
+
+%% TODO: re-work this abuse of the application env as a register with
+%% the generic registry that should be landing at some point.
+add_table_definition(Def) ->
+ ok = application:set_env(rabbit, plugin_mnesia_tables,
+ [Def | plugin_table_definitions()], infinity).
+
+plugin_table_definitions() ->
+ case application:get_env(rabbit, plugin_mnesia_tables) of
+ {ok, Defs} -> Defs;
+ undefined -> []
+ end.
binding_match() ->
#binding{source = exchange_name_match(),
diff --git a/src/rabbit_router.erl b/src/rabbit_router.erl
index d49c072ca2..a4ad7fbce8 100644
--- a/src/rabbit_router.erl
+++ b/src/rabbit_router.erl
@@ -117,7 +117,9 @@ check_delivery(_ , _ , {_ , Qs}) -> {routed, Qs}.
lookup_qpids(QNames) ->
lists:foldl(fun (QName, QPids) ->
case mnesia:dirty_read({rabbit_queue, QName}) of
- [#amqqueue{pid = QPid}] -> [QPid | QPids];
- [] -> QPids
+ [#amqqueue{pid = QPid, extra_pids = EPids}] ->
+ EPids ++ [QPid | QPids];
+ [] ->
+ QPids
end
end, [], QNames).
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index adf968cbe9..3343bb99cd 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -1833,7 +1833,7 @@ variable_queue_publish(IsPersistent, Count, VQ) ->
true -> 2;
false -> 1
end}, <<>>),
- #message_properties{}, VQN)
+ #message_properties{}, self(), VQN)
end, VQ, lists:seq(1, Count)).
variable_queue_fetch(Count, IsPersistent, IsDelivered, Len, VQ) ->
@@ -1851,9 +1851,13 @@ assert_prop(List, Prop, Value) ->
assert_props(List, PropVals) ->
[assert_prop(List, Prop, Value) || {Prop, Value} <- PropVals].
+test_amqqueue(Durable) ->
+ (rabbit_amqqueue:pseudo_queue(test_queue(), self()))
+ #amqqueue { durable = Durable }.
+
with_fresh_variable_queue(Fun) ->
ok = empty_test_queue(),
- VQ = rabbit_variable_queue:init(test_queue(), true, false,
+ VQ = rabbit_variable_queue:init(test_amqqueue(true), false,
fun nop/1, fun nop/1),
S0 = rabbit_variable_queue:status(VQ),
assert_props(S0, [{q1, 0}, {q2, 0},
@@ -1912,7 +1916,7 @@ test_dropwhile(VQ0) ->
rabbit_basic:message(
rabbit_misc:r(<<>>, exchange, <<>>),
<<>>, #'P_basic'{}, <<>>),
- #message_properties{expiry = N}, VQN)
+ #message_properties{expiry = N}, self(), VQN)
end, VQ0, lists:seq(1, Count)),
%% drop the first 5 messages
@@ -2029,7 +2033,7 @@ test_variable_queue_all_the_bits_not_covered_elsewhere1(VQ0) ->
{VQ5, _AckTags1} = variable_queue_fetch(Count, false, false,
Count, VQ4),
_VQ6 = rabbit_variable_queue:terminate(VQ5),
- VQ7 = rabbit_variable_queue:init(test_queue(), true, true,
+ VQ7 = rabbit_variable_queue:init(test_amqqueue(true), true,
fun nop/1, fun nop/1),
{{_Msg1, true, _AckTag1, Count1}, VQ8} =
rabbit_variable_queue:fetch(true, VQ7),
@@ -2046,7 +2050,7 @@ test_variable_queue_all_the_bits_not_covered_elsewhere2(VQ0) ->
VQ4 = rabbit_variable_queue:requeue(AckTags, fun(X) -> X end, VQ3),
VQ5 = rabbit_variable_queue:idle_timeout(VQ4),
_VQ6 = rabbit_variable_queue:terminate(VQ5),
- VQ7 = rabbit_variable_queue:init(test_queue(), true, true,
+ VQ7 = rabbit_variable_queue:init(test_amqqueue(true), true,
fun nop/1, fun nop/1),
{empty, VQ8} = rabbit_variable_queue:fetch(false, VQ7),
VQ8.
@@ -2054,7 +2058,7 @@ test_variable_queue_all_the_bits_not_covered_elsewhere2(VQ0) ->
test_queue_recover() ->
Count = 2 * rabbit_queue_index:next_segment_boundary(0),
TxID = rabbit_guid:guid(),
- {new, #amqqueue { pid = QPid, name = QName }} =
+ {new, #amqqueue { pid = QPid, name = QName } = Q} =
rabbit_amqqueue:declare(test_queue(), true, false, [], none),
Msg = rabbit_basic:message(rabbit_misc:r(<<>>, exchange, <<>>),
<<>>, #'P_basic'{delivery_mode = 2}, <<>>),
@@ -2077,7 +2081,7 @@ test_queue_recover() ->
{ok, CountMinusOne, {QName, QPid1, _AckTag, true, _Msg}} =
rabbit_amqqueue:basic_get(Q1, self(), false),
exit(QPid1, shutdown),
- VQ1 = rabbit_variable_queue:init(QName, true, true,
+ VQ1 = rabbit_variable_queue:init(Q, true,
fun nop/1, fun nop/1),
{{_Msg1, true, _AckTag1, CountMinusOne}, VQ2} =
rabbit_variable_queue:fetch(true, VQ1),
diff --git a/src/rabbit_types.erl b/src/rabbit_types.erl
index 548014be3e..bc1f9d7e4f 100644
--- a/src/rabbit_types.erl
+++ b/src/rabbit_types.erl
@@ -137,7 +137,8 @@
auto_delete :: boolean(),
exclusive_owner :: rabbit_types:maybe(pid()),
arguments :: rabbit_framing:amqp_table(),
- pid :: rabbit_types:maybe(pid())}).
+ pid :: rabbit_types:maybe(pid()),
+ extra_pids :: [pid()]}).
-type(exchange() ::
#exchange{name :: rabbit_exchange:name(),
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 0db5116559..d1da2c8917 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -31,9 +31,9 @@
-module(rabbit_variable_queue).
--export([init/3, terminate/1, delete_and_terminate/1,
- purge/1, publish/3, publish_delivered/4, fetch/2, ack/2,
- tx_publish/4, tx_ack/3, tx_rollback/2, tx_commit/4,
+-export([init/2, terminate/1, delete_and_terminate/1,
+ purge/1, publish/4, publish_delivered/5, fetch/2, ack/2,
+ tx_publish/5, tx_ack/3, tx_rollback/2, tx_commit/4,
requeue/3, len/1, is_empty/1, dropwhile/2,
set_ram_duration_target/2, ram_duration/1,
needs_idle_timeout/1, idle_timeout/1, handle_pre_hibernate/1,
@@ -42,7 +42,7 @@
-export([start/1, stop/0]).
%% exported for testing only
--export([start_msg_store/2, stop_msg_store/0, init/5]).
+-export([start_msg_store/2, stop_msg_store/0, init/4]).
%%----------------------------------------------------------------------------
%% Definitions:
@@ -409,13 +409,14 @@ stop_msg_store() ->
ok = rabbit_sup:stop_child(?PERSISTENT_MSG_STORE),
ok = rabbit_sup:stop_child(?TRANSIENT_MSG_STORE).
-init(QueueName, IsDurable, Recover) ->
+init(Queue, Recover) ->
Self = self(),
- init(QueueName, IsDurable, Recover,
+ init(Queue, Recover,
fun (Guids) -> msgs_written_to_disk(Self, Guids) end,
fun (Guids) -> msg_indices_written_to_disk(Self, Guids) end).
-init(QueueName, IsDurable, false, MsgOnDiskFun, MsgIdxOnDiskFun) ->
+init(#amqqueue { name = QueueName, durable = IsDurable }, false,
+ MsgOnDiskFun, MsgIdxOnDiskFun) ->
IndexState = rabbit_queue_index:init(QueueName, MsgIdxOnDiskFun),
init(IsDurable, IndexState, 0, [],
case IsDurable of
@@ -425,7 +426,8 @@ init(QueueName, IsDurable, false, MsgOnDiskFun, MsgIdxOnDiskFun) ->
end,
msg_store_client_init(?TRANSIENT_MSG_STORE, undefined));
-init(QueueName, true, true, MsgOnDiskFun, MsgIdxOnDiskFun) ->
+init(#amqqueue { name = QueueName }, true,
+ MsgOnDiskFun, MsgIdxOnDiskFun) ->
Terms = rabbit_queue_index:shutdown_terms(QueueName),
{PRef, TRef, Terms1} =
case [persistent_ref, transient_ref] -- proplists:get_keys(Terms) of
@@ -515,16 +517,18 @@ purge(State = #vqstate { q4 = Q4,
ram_index_count = 0,
persistent_count = PCount1 })}.
-publish(Msg, MsgProps, State) ->
+publish(Msg, MsgProps, _ChPid, State) ->
{_SeqId, State1} = publish(Msg, MsgProps, false, false, State),
a(reduce_memory_use(State1)).
-publish_delivered(false, _Msg, _MsgProps, State = #vqstate { len = 0 }) ->
+publish_delivered(false, _Msg, _MsgProps, _ChPid,
+ State = #vqstate { len = 0 }) ->
{blank_ack, a(State)};
publish_delivered(true, Msg = #basic_message { is_persistent = IsPersistent,
guid = Guid },
MsgProps = #message_properties {
needs_confirming = NeedsConfirming },
+ _ChPid,
State = #vqstate { len = 0,
next_seq_id = SeqId,
out_counter = OutCount,
@@ -665,8 +669,8 @@ ack(AckTags, State) ->
{Guids, a(State1)}.
tx_publish(Txn, Msg = #basic_message { is_persistent = IsPersistent }, MsgProps,
- State = #vqstate { durable = IsDurable,
- msg_store_clients = MSCState }) ->
+ _ChPid, State = #vqstate { durable = IsDurable,
+ msg_store_clients = MSCState }) ->
Tx = #tx { pending_messages = Pubs } = lookup_tx(Txn),
store_tx(Txn, Tx #tx { pending_messages = [{Msg, MsgProps} | Pubs] }),
case IsPersistent andalso IsDurable of