diff options
| author | Alvaro Videla <videlalvaro@gmail.com> | 2015-10-10 17:40:23 +0200 |
|---|---|---|
| committer | Alvaro Videla <videlalvaro@gmail.com> | 2015-10-10 17:40:23 +0200 |
| commit | a9462aa8144cb3afe68054f01495b2296f40136e (patch) | |
| tree | 7776e0168d30994344f9e8a0766a25518dac385b /src | |
| parent | 06cde9069282cbb6169da10352ddff8f82825feb (diff) | |
| download | rabbitmq-server-git-a9462aa8144cb3afe68054f01495b2296f40136e.tar.gz | |
handles queue-mode policy/argument
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 10 | ||||
| -rw-r--r-- | src/rabbit_backing_queue.erl | 6 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_master.erl | 9 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 7 | ||||
| -rw-r--r-- | src/rabbit_priority_queue.erl | 7 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 55 |
6 files changed, 85 insertions, 9 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 15fde37c9c..250e6fa7eb 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -317,7 +317,8 @@ process_args_policy(State = #q{q = Q, {<<"dead-letter-routing-key">>, fun res_arg/2, fun init_dlx_rkey/2}, {<<"message-ttl">>, fun res_min/2, fun init_ttl/2}, {<<"max-length">>, fun res_min/2, fun init_max_length/2}, - {<<"max-length-bytes">>, fun res_min/2, fun init_max_bytes/2}], + {<<"max-length-bytes">>, fun res_min/2, fun init_max_bytes/2}, + {<<"queue-mode">>, fun res_arg/2, fun init_queue_mode/2}], drop_expired_msgs( lists:foldl(fun({Name, Resolve, Fun}, StateN) -> Fun(args_policy_lookup(Name, Resolve, Q), StateN) @@ -360,6 +361,13 @@ init_max_bytes(MaxBytes, State) -> {_Dropped, State1} = maybe_drop_head(State#q{max_bytes = MaxBytes}), State1. +init_queue_mode(undefined, State) -> + State; +init_queue_mode(Mode, State = #q {backing_queue = BQ, + backing_queue_state = BQS}) -> + BQS1 = BQ:set_queue_mode(binary_to_atom(Mode, utf8), BQS), + State#q{backing_queue_state = BQS1}. + reply(Reply, NewState) -> {NewState1, Timeout} = next_state(NewState), {reply, Reply, ensure_stats_timer(ensure_rate_timer(NewState1)), Timeout}. diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl index a03bda13c9..bb91f927b2 100644 --- a/src/rabbit_backing_queue.erl +++ b/src/rabbit_backing_queue.erl @@ -47,6 +47,8 @@ -type(msg_fun(A) :: fun ((rabbit_types:basic_message(), ack(), A) -> A)). -type(msg_pred() :: fun ((rabbit_types:message_properties()) -> boolean())). +-type(queue_mode() :: atom()). + -spec(info_keys/0 :: () -> rabbit_types:info_keys()). %% Called on startup with a list of durable queue names. The queues @@ -246,6 +248,8 @@ -callback is_duplicate(rabbit_types:basic_message(), state()) -> {boolean(), state()}. +-callback set_queue_mode(queue_mode(), state()) -> state(). + -else. -export([behaviour_info/1]). @@ -260,7 +264,7 @@ behaviour_info(callbacks) -> {is_empty, 1}, {depth, 1}, {set_ram_duration_target, 2}, {ram_duration, 1}, {needs_timeout, 1}, {timeout, 1}, {handle_pre_hibernate, 1}, {resume, 1}, {msg_rates, 1}, - {info, 2}, {invoke, 3}, {is_duplicate, 2}] ; + {info, 2}, {invoke, 3}, {is_duplicate, 2}, {set_queue_mode, 2}] ; behaviour_info(_Other) -> undefined. diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index 7890128872..128d52851c 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -22,7 +22,7 @@ len/1, is_empty/1, depth/1, drain_confirmed/1, dropwhile/2, fetchwhile/4, set_ram_duration_target/2, ram_duration/1, needs_timeout/1, timeout/1, handle_pre_hibernate/1, resume/1, - msg_rates/1, info/2, invoke/3, is_duplicate/2]). + msg_rates/1, info/2, invoke/3, is_duplicate/2, set_queue_mode/2]). -export([start/1, stop/0, delete_crashed/1]). @@ -444,6 +444,13 @@ is_duplicate(Message = #basic_message { id = MsgId }, confirmed = [MsgId | Confirmed] }} end. +set_queue_mode(Mode, State = #state { gm = GM, + backing_queue = BQ, + backing_queue_state = BQS }) -> + ok = gm:broadcast(GM, {set_queue_mode, Mode}), + BQS1 = BQ:set_queue_mode(Mode, BQS), + State #state { backing_queue_state = BQS1 }. + %% --------------------------------------------------------------------------- %% Other exported functions %% --------------------------------------------------------------------------- diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index 7f309ab0b7..1ab4d0f16b 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -921,7 +921,12 @@ process_instruction({delete_and_terminate, Reason}, State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> BQ:delete_and_terminate(Reason, BQS), - {stop, State #state { backing_queue_state = undefined }}. + {stop, State #state { backing_queue_state = undefined }}; +process_instruction({set_queue_mode, Mode}, + State = #state { backing_queue = BQ, + backing_queue_state = BQS }) -> + BQS1 = BQ:set_queue_mode(Mode, BQS), + {ok, State #state { backing_queue_state = BQS1 }}. maybe_flow_ack(ChPid, flow) -> credit_flow:ack(ChPid); maybe_flow_ack(_ChPid, noflow) -> ok. diff --git a/src/rabbit_priority_queue.erl b/src/rabbit_priority_queue.erl index a839badfc4..9fc8321073 100644 --- a/src/rabbit_priority_queue.erl +++ b/src/rabbit_priority_queue.erl @@ -39,7 +39,7 @@ ackfold/4, fold/3, len/1, is_empty/1, depth/1, set_ram_duration_target/2, ram_duration/1, needs_timeout/1, timeout/1, handle_pre_hibernate/1, resume/1, msg_rates/1, - info/2, invoke/3, is_duplicate/2]). + info/2, invoke/3, is_duplicate/2, set_queue_mode/2]). -record(state, {bq, bqss}). -record(passthrough, {bq, bqs}). @@ -395,6 +395,11 @@ is_duplicate(Msg, State = #state{bq = BQ}) -> is_duplicate(Msg, State = #passthrough{bq = BQ, bqs = BQS}) -> ?passthrough2(is_duplicate(Msg, BQS)). +set_queue_mode(Mode, State = #state{bq = BQ}) -> + foreach1(fun (_P, BQSN) -> BQ:set_queue_mode(Mode, BQSN) end, State); +set_queue_mode(Mode, State = #passthrough{bq = BQ, bqs = BQS}) -> + ?passthrough1(set_queue_mode(Mode, BQS)). + %%---------------------------------------------------------------------------- bq() -> diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index a0e28e88f6..2fb5020c1b 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -23,7 +23,8 @@ ackfold/4, fold/3, len/1, is_empty/1, depth/1, set_ram_duration_target/2, ram_duration/1, needs_timeout/1, timeout/1, handle_pre_hibernate/1, resume/1, msg_rates/1, - info/2, invoke/3, is_duplicate/2, multiple_routing_keys/0]). + info/2, invoke/3, is_duplicate/2, set_queue_mode/2, + multiple_routing_keys/0]). -export([start/1, stop/0]). @@ -300,7 +301,10 @@ disk_read_count, disk_write_count, - io_batch_size + io_batch_size, + + %% default queue or lazy queue + mode }). -record(rates, { in, out, ack_in, ack_out, timestamp }). @@ -397,7 +401,8 @@ disk_read_count :: non_neg_integer(), disk_write_count :: non_neg_integer(), - io_batch_size :: pos_integer()}). + io_batch_size :: pos_integer(), + mode :: 'default' | 'lazy' }). %% Duplicated from rabbit_backing_queue -spec(ack/2 :: ([ack()], state()) -> {[rabbit_guid:guid()], state()}). @@ -898,6 +903,46 @@ invoke( _, _, State) -> State. is_duplicate(_Msg, State) -> {false, State}. +set_queue_mode(Mode, State = #vqstate { mode = Mode }) -> + State; +set_queue_mode(lazy, State = #vqstate { + target_ram_count = TargetRamCount }) -> + %% To become a lazy queue we need to page everything to disk first. + State1 = convert_to_lazy(State), + %% restore the original target_ram_count + a(State1 #vqstate { mode = lazy, target_ram_count = TargetRamCount }); +set_queue_mode(default, State) -> + %% becoming a default queue means loading messages from disk like + %% whene a queue is recovered. + a(maybe_deltas_to_betas(State #vqstate { mode = default })); +set_queue_mode(_, State) -> + State. + +convert_to_lazy(State = #vqstate { ram_msg_count = 0}) -> + State; +convert_to_lazy(State) -> + State1 = set_ram_duration_target(0, State), + %% When pushing messages to disk, we might have been blocked by + %% the msg_store, so we need to see if we have to wait for more + %% credit, and the keep paging messages. + %% + %% The amqqueue_process could have taken care of this, but between + %% the time it receives the bump_credit msg and calls BQ:resume to + %% keep paging messages to disk, some other request may arrive to + %% the BQ which at this moment is not in a proper state for a lazy + %% BQ (unless all messages have been paged to disk already). + wait_for_msg_store_credit(), + convert_to_lazy(State1). + +wait_for_msg_store_credit() -> + case credit_flow:blocked() of + true -> receive + {bump_credit, Msg} -> + credit_flow:handle_bump_msg(Msg) + end; + false -> ok + end. + %% Get the Timestamp property of the first msg, if present. This is %% the one with the oldest timestamp among the heads of the pending %% acks and unread queues. We can't check disk_pending_acks as these @@ -1220,7 +1265,9 @@ init(IsDurable, IndexState, DeltaCount, DeltaBytes, Terms, disk_read_count = 0, disk_write_count = 0, - io_batch_size = IoBatchSize }, + io_batch_size = IoBatchSize, + + mode = default }, a(maybe_deltas_to_betas(State)). blank_rates(Now) -> |
