summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorAlvaro Videla <videlalvaro@gmail.com>2015-10-10 17:40:23 +0200
committerAlvaro Videla <videlalvaro@gmail.com>2015-10-10 17:40:23 +0200
commita9462aa8144cb3afe68054f01495b2296f40136e (patch)
tree7776e0168d30994344f9e8a0766a25518dac385b /src
parent06cde9069282cbb6169da10352ddff8f82825feb (diff)
downloadrabbitmq-server-git-a9462aa8144cb3afe68054f01495b2296f40136e.tar.gz
handles queue-mode policy/argument
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_amqqueue_process.erl10
-rw-r--r--src/rabbit_backing_queue.erl6
-rw-r--r--src/rabbit_mirror_queue_master.erl9
-rw-r--r--src/rabbit_mirror_queue_slave.erl7
-rw-r--r--src/rabbit_priority_queue.erl7
-rw-r--r--src/rabbit_variable_queue.erl55
6 files changed, 85 insertions, 9 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 15fde37c9c..250e6fa7eb 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -317,7 +317,8 @@ process_args_policy(State = #q{q = Q,
{<<"dead-letter-routing-key">>, fun res_arg/2, fun init_dlx_rkey/2},
{<<"message-ttl">>, fun res_min/2, fun init_ttl/2},
{<<"max-length">>, fun res_min/2, fun init_max_length/2},
- {<<"max-length-bytes">>, fun res_min/2, fun init_max_bytes/2}],
+ {<<"max-length-bytes">>, fun res_min/2, fun init_max_bytes/2},
+ {<<"queue-mode">>, fun res_arg/2, fun init_queue_mode/2}],
drop_expired_msgs(
lists:foldl(fun({Name, Resolve, Fun}, StateN) ->
Fun(args_policy_lookup(Name, Resolve, Q), StateN)
@@ -360,6 +361,13 @@ init_max_bytes(MaxBytes, State) ->
{_Dropped, State1} = maybe_drop_head(State#q{max_bytes = MaxBytes}),
State1.
+init_queue_mode(undefined, State) ->
+ State;
+init_queue_mode(Mode, State = #q {backing_queue = BQ,
+ backing_queue_state = BQS}) ->
+ BQS1 = BQ:set_queue_mode(binary_to_atom(Mode, utf8), BQS),
+ State#q{backing_queue_state = BQS1}.
+
reply(Reply, NewState) ->
{NewState1, Timeout} = next_state(NewState),
{reply, Reply, ensure_stats_timer(ensure_rate_timer(NewState1)), Timeout}.
diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl
index a03bda13c9..bb91f927b2 100644
--- a/src/rabbit_backing_queue.erl
+++ b/src/rabbit_backing_queue.erl
@@ -47,6 +47,8 @@
-type(msg_fun(A) :: fun ((rabbit_types:basic_message(), ack(), A) -> A)).
-type(msg_pred() :: fun ((rabbit_types:message_properties()) -> boolean())).
+-type(queue_mode() :: atom()).
+
-spec(info_keys/0 :: () -> rabbit_types:info_keys()).
%% Called on startup with a list of durable queue names. The queues
@@ -246,6 +248,8 @@
-callback is_duplicate(rabbit_types:basic_message(), state())
-> {boolean(), state()}.
+-callback set_queue_mode(queue_mode(), state()) -> state().
+
-else.
-export([behaviour_info/1]).
@@ -260,7 +264,7 @@ behaviour_info(callbacks) ->
{is_empty, 1}, {depth, 1}, {set_ram_duration_target, 2},
{ram_duration, 1}, {needs_timeout, 1}, {timeout, 1},
{handle_pre_hibernate, 1}, {resume, 1}, {msg_rates, 1},
- {info, 2}, {invoke, 3}, {is_duplicate, 2}] ;
+ {info, 2}, {invoke, 3}, {is_duplicate, 2}, {set_queue_mode, 2}] ;
behaviour_info(_Other) ->
undefined.
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index 7890128872..128d52851c 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -22,7 +22,7 @@
len/1, is_empty/1, depth/1, drain_confirmed/1,
dropwhile/2, fetchwhile/4, set_ram_duration_target/2, ram_duration/1,
needs_timeout/1, timeout/1, handle_pre_hibernate/1, resume/1,
- msg_rates/1, info/2, invoke/3, is_duplicate/2]).
+ msg_rates/1, info/2, invoke/3, is_duplicate/2, set_queue_mode/2]).
-export([start/1, stop/0, delete_crashed/1]).
@@ -444,6 +444,13 @@ is_duplicate(Message = #basic_message { id = MsgId },
confirmed = [MsgId | Confirmed] }}
end.
+set_queue_mode(Mode, State = #state { gm = GM,
+ backing_queue = BQ,
+ backing_queue_state = BQS }) ->
+ ok = gm:broadcast(GM, {set_queue_mode, Mode}),
+ BQS1 = BQ:set_queue_mode(Mode, BQS),
+ State #state { backing_queue_state = BQS1 }.
+
%% ---------------------------------------------------------------------------
%% Other exported functions
%% ---------------------------------------------------------------------------
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index 7f309ab0b7..1ab4d0f16b 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -921,7 +921,12 @@ process_instruction({delete_and_terminate, Reason},
State = #state { backing_queue = BQ,
backing_queue_state = BQS }) ->
BQ:delete_and_terminate(Reason, BQS),
- {stop, State #state { backing_queue_state = undefined }}.
+ {stop, State #state { backing_queue_state = undefined }};
+process_instruction({set_queue_mode, Mode},
+ State = #state { backing_queue = BQ,
+ backing_queue_state = BQS }) ->
+ BQS1 = BQ:set_queue_mode(Mode, BQS),
+ {ok, State #state { backing_queue_state = BQS1 }}.
maybe_flow_ack(ChPid, flow) -> credit_flow:ack(ChPid);
maybe_flow_ack(_ChPid, noflow) -> ok.
diff --git a/src/rabbit_priority_queue.erl b/src/rabbit_priority_queue.erl
index a839badfc4..9fc8321073 100644
--- a/src/rabbit_priority_queue.erl
+++ b/src/rabbit_priority_queue.erl
@@ -39,7 +39,7 @@
ackfold/4, fold/3, len/1, is_empty/1, depth/1,
set_ram_duration_target/2, ram_duration/1, needs_timeout/1, timeout/1,
handle_pre_hibernate/1, resume/1, msg_rates/1,
- info/2, invoke/3, is_duplicate/2]).
+ info/2, invoke/3, is_duplicate/2, set_queue_mode/2]).
-record(state, {bq, bqss}).
-record(passthrough, {bq, bqs}).
@@ -395,6 +395,11 @@ is_duplicate(Msg, State = #state{bq = BQ}) ->
is_duplicate(Msg, State = #passthrough{bq = BQ, bqs = BQS}) ->
?passthrough2(is_duplicate(Msg, BQS)).
+set_queue_mode(Mode, State = #state{bq = BQ}) ->
+ foreach1(fun (_P, BQSN) -> BQ:set_queue_mode(Mode, BQSN) end, State);
+set_queue_mode(Mode, State = #passthrough{bq = BQ, bqs = BQS}) ->
+ ?passthrough1(set_queue_mode(Mode, BQS)).
+
%%----------------------------------------------------------------------------
bq() ->
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index a0e28e88f6..2fb5020c1b 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -23,7 +23,8 @@
ackfold/4, fold/3, len/1, is_empty/1, depth/1,
set_ram_duration_target/2, ram_duration/1, needs_timeout/1, timeout/1,
handle_pre_hibernate/1, resume/1, msg_rates/1,
- info/2, invoke/3, is_duplicate/2, multiple_routing_keys/0]).
+ info/2, invoke/3, is_duplicate/2, set_queue_mode/2,
+ multiple_routing_keys/0]).
-export([start/1, stop/0]).
@@ -300,7 +301,10 @@
disk_read_count,
disk_write_count,
- io_batch_size
+ io_batch_size,
+
+ %% default queue or lazy queue
+ mode
}).
-record(rates, { in, out, ack_in, ack_out, timestamp }).
@@ -397,7 +401,8 @@
disk_read_count :: non_neg_integer(),
disk_write_count :: non_neg_integer(),
- io_batch_size :: pos_integer()}).
+ io_batch_size :: pos_integer(),
+ mode :: 'default' | 'lazy' }).
%% Duplicated from rabbit_backing_queue
-spec(ack/2 :: ([ack()], state()) -> {[rabbit_guid:guid()], state()}).
@@ -898,6 +903,46 @@ invoke( _, _, State) -> State.
is_duplicate(_Msg, State) -> {false, State}.
+set_queue_mode(Mode, State = #vqstate { mode = Mode }) ->
+ State;
+set_queue_mode(lazy, State = #vqstate {
+ target_ram_count = TargetRamCount }) ->
+ %% To become a lazy queue we need to page everything to disk first.
+ State1 = convert_to_lazy(State),
+ %% restore the original target_ram_count
+ a(State1 #vqstate { mode = lazy, target_ram_count = TargetRamCount });
+set_queue_mode(default, State) ->
+ %% becoming a default queue means loading messages from disk like
+ %% whene a queue is recovered.
+ a(maybe_deltas_to_betas(State #vqstate { mode = default }));
+set_queue_mode(_, State) ->
+ State.
+
+convert_to_lazy(State = #vqstate { ram_msg_count = 0}) ->
+ State;
+convert_to_lazy(State) ->
+ State1 = set_ram_duration_target(0, State),
+ %% When pushing messages to disk, we might have been blocked by
+ %% the msg_store, so we need to see if we have to wait for more
+ %% credit, and the keep paging messages.
+ %%
+ %% The amqqueue_process could have taken care of this, but between
+ %% the time it receives the bump_credit msg and calls BQ:resume to
+ %% keep paging messages to disk, some other request may arrive to
+ %% the BQ which at this moment is not in a proper state for a lazy
+ %% BQ (unless all messages have been paged to disk already).
+ wait_for_msg_store_credit(),
+ convert_to_lazy(State1).
+
+wait_for_msg_store_credit() ->
+ case credit_flow:blocked() of
+ true -> receive
+ {bump_credit, Msg} ->
+ credit_flow:handle_bump_msg(Msg)
+ end;
+ false -> ok
+ end.
+
%% Get the Timestamp property of the first msg, if present. This is
%% the one with the oldest timestamp among the heads of the pending
%% acks and unread queues. We can't check disk_pending_acks as these
@@ -1220,7 +1265,9 @@ init(IsDurable, IndexState, DeltaCount, DeltaBytes, Terms,
disk_read_count = 0,
disk_write_count = 0,
- io_batch_size = IoBatchSize },
+ io_batch_size = IoBatchSize,
+
+ mode = default },
a(maybe_deltas_to_betas(State)).
blank_rates(Now) ->