diff options
| -rw-r--r-- | src/rabbit_amqqueue.erl | 21 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 285 | ||||
| -rw-r--r-- | src/rabbit_tests.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 59 |
4 files changed, 180 insertions, 187 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index f0d12ae5de..1c8cf522b9 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -41,8 +41,8 @@ -export([consumers/1, consumers_all/1]). -export([claim_queue/2]). -export([basic_get/3, basic_consume/8, basic_cancel/4]). --export([notify_sent/2, unblock/2, tx_commit_msg_store_callback/5, - tx_commit_vq_callback/1, flush_all/2]). +-export([notify_sent/2, unblock/2, maybe_run_queue_via_internal_queue/3, + flush_all/2]). -export([commit_all/2, rollback_all/2, notify_down_all/2, limit_all/3]). -export([on_node_down/1]). @@ -65,7 +65,6 @@ -type(qfun(A) :: fun ((amqqueue()) -> A)). -type(ok_or_errors() :: 'ok' | {'error', [{'error' | 'exit' | 'throw', any()}]}). --type(acktag() :: any()). -spec(start/0 :: () -> 'ok'). -spec(declare/4 :: (queue_name(), boolean(), boolean(), amqp_table()) -> @@ -111,9 +110,7 @@ -spec(basic_cancel/4 :: (amqqueue(), pid(), ctag(), any()) -> 'ok'). -spec(notify_sent/2 :: (pid(), pid()) -> 'ok'). -spec(unblock/2 :: (pid(), pid()) -> 'ok'). --spec(tx_commit_msg_store_callback/5 :: - (pid(), boolean(), [message()], [acktag()], {pid(), any()}) -> 'ok'). --spec(tx_commit_vq_callback/1 :: (pid()) -> 'ok'). +-spec(maybe_run_queue_via_internal_queue/3 :: (pid(), atom(), [any()]) -> 'ok'). -spec(flush_all/2 :: ([pid()], pid()) -> 'ok'). -spec(internal_declare/2 :: (amqqueue(), boolean()) -> amqqueue()). -spec(internal_delete/1 :: (queue_name()) -> 'ok' | not_found()). @@ -172,7 +169,7 @@ recover_durable_queues(DurableQueues) -> end) of true -> ok = gen_server2:call(Q#amqqueue.pid, - init_variable_queue, + init_internal_queue, infinity), [Q|Acc]; false -> exit(Q#amqqueue.pid, shutdown), @@ -204,7 +201,7 @@ internal_declare(Q = #amqqueue{name = QueueName}, WantDefaultBinding) -> end, ok = gen_server2:call( Q#amqqueue.pid, - init_variable_queue, infinity), + init_internal_queue, infinity), Q; [_] -> not_found %% existing Q on stopped node end; @@ -362,12 +359,8 @@ notify_sent(QPid, ChPid) -> unblock(QPid, ChPid) -> gen_server2:pcast(QPid, 7, {unblock, ChPid}). -tx_commit_msg_store_callback(QPid, IsTransientPubs, Pubs, AckTags, From) -> - gen_server2:pcast(QPid, 7, {tx_commit_msg_store_callback, - IsTransientPubs, Pubs, AckTags, From}). - -tx_commit_vq_callback(QPid) -> - gen_server2:pcast(QPid, 7, tx_commit_vq_callback). +maybe_run_queue_via_internal_queue(QPid, Fun, Args) -> + gen_server2:pcast(QPid, 7, {maybe_run_queue_via_internal_queue, Fun, Args}). flush_all(QPids, ChPid) -> safe_pmap_ok( diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index b92de66743..33ea625c55 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -55,7 +55,9 @@ owner, exclusive_consumer, has_had_consumers, - variable_queue_state, + internal_queue, + internal_queue_state, + internal_queue_timeout_fun, next_msg_id, active_consumers, blocked_consumers, @@ -94,7 +96,7 @@ consumers, transactions, memory, - raw_vq_status + internal_queue_status ]). %%---------------------------------------------------------------------------- @@ -116,7 +118,9 @@ init(Q) -> owner = none, exclusive_consumer = none, has_had_consumers = false, - variable_queue_state = undefined, + internal_queue = rabbit_variable_queue, + internal_queue_state = undefined, + internal_queue_timeout_fun = undefined, next_msg_id = 1, active_consumers = queue:new(), blocked_consumers = queue:new(), @@ -124,30 +128,33 @@ init(Q) -> rate_timer_ref = undefined}, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. -terminate(shutdown, #q{variable_queue_state = VQS}) -> +terminate(shutdown, #q{internal_queue_state = IQS, + internal_queue = IQ}) -> ok = rabbit_memory_monitor:deregister(self()), - case VQS of + case IQS of undefined -> ok; - _ -> rabbit_variable_queue:terminate(VQS) + _ -> IQ:terminate(IQS) end; -terminate({shutdown, _}, #q{variable_queue_state = VQS}) -> +terminate({shutdown, _}, #q{internal_queue_state = IQS, + internal_queue = IQ}) -> ok = rabbit_memory_monitor:deregister(self()), - case VQS of + case IQS of undefined -> ok; - _ -> rabbit_variable_queue:terminate(VQS) + _ -> IQ:terminate(IQS) end; -terminate(_Reason, State = #q{variable_queue_state = VQS}) -> +terminate(_Reason, State = #q{internal_queue_state = IQS, + internal_queue = IQ}) -> ok = rabbit_memory_monitor:deregister(self()), %% FIXME: How do we cancel active subscriptions? %% Ensure that any persisted tx messages are removed. %% TODO: wait for all in flight tx_commits to complete - case VQS of + case IQS of undefined -> ok; _ -> - VQS1 = rabbit_variable_queue:tx_rollback( + IQS1 = IQ:tx_rollback( lists:concat([PM || #tx { pending_messages = PM } <- - all_tx_record()]), VQS), + all_tx_record()]), IQS), %% Delete from disk first. If we crash at this point, when %% a durable queue, we will be recreated at startup, %% possibly with partial content. The alternative is much @@ -155,7 +162,7 @@ terminate(_Reason, State = #q{variable_queue_state = VQS}) -> %% would then have a race between the disk delete and a %% new queue with the same name being created and %% published to. - rabbit_variable_queue:delete_and_terminate(VQS1) + IQ:delete_and_terminate(IQS1) end, ok = rabbit_amqqueue:internal_delete(qname(State)). @@ -174,18 +181,18 @@ noreply(NewState) -> {NewState1, Timeout} = next_state(NewState), {noreply, NewState1, Timeout}. -next_state(State = #q{variable_queue_state = VQS}) -> - next_state1(ensure_rate_timer(State), - rabbit_variable_queue:needs_sync(VQS)). +next_state(State = #q{internal_queue_state = IQS, + internal_queue = IQ}) -> + next_state1(ensure_rate_timer(State), IQ:needs_sync(IQS)). -next_state1(State = #q{sync_timer_ref = undefined}, true) -> - {start_sync_timer(State), 0}; -next_state1(State, true) -> +next_state1(State = #q{sync_timer_ref = undefined}, Callback = {_Fun, _Args}) -> + {start_sync_timer(State, Callback), 0}; +next_state1(State, {_Fun, _Args}) -> {State, 0}; -next_state1(State = #q{sync_timer_ref = undefined}, false) -> +next_state1(State = #q{sync_timer_ref = undefined}, undefined) -> {State, hibernate}; -next_state1(State, false) -> - {stop_sync_timer(State), hibernate}. +next_state1(State, undefined) -> + {stop_sync_timer(State#q{internal_queue_timeout_fun = undefined}), hibernate}. ensure_rate_timer(State = #q{rate_timer_ref = undefined}) -> {ok, TRef} = timer:apply_after(?RATES_REMEASURE_INTERVAL, rabbit_amqqueue, @@ -204,17 +211,20 @@ stop_rate_timer(State = #q{rate_timer_ref = TRef}) -> {ok, cancel} = timer:cancel(TRef), State#q{rate_timer_ref = undefined}. -start_sync_timer(State = #q{sync_timer_ref = undefined}) -> - {ok, TRef} = timer:apply_after(?SYNC_INTERVAL, rabbit_amqqueue, - tx_commit_vq_callback, [self()]), - State#q{sync_timer_ref = TRef}. +start_sync_timer(State = #q{sync_timer_ref = undefined}, + Callback = {Fun, Args}) -> + {ok, TRef} = timer:apply_after( + ?SYNC_INTERVAL, rabbit_amqqueue, + maybe_run_queue_via_internal_queue, [self(), Fun, Args]), + State#q{sync_timer_ref = TRef, internal_queue_timeout_fun = Callback}. stop_sync_timer(State = #q{sync_timer_ref = TRef}) -> {ok, cancel} = timer:cancel(TRef), - State#q{sync_timer_ref = undefined}. + State#q{sync_timer_ref = undefined, internal_queue_timeout_fun = undefined}. -assert_invariant(#q{active_consumers = AC, variable_queue_state = VQS}) -> - true = (queue:is_empty(AC) orelse rabbit_variable_queue:is_empty(VQS)). +assert_invariant(#q{active_consumers = AC, internal_queue_state = IQS, + internal_queue = IQ}) -> + true = (queue:is_empty(AC) orelse IQ:is_empty(IQS)). lookup_ch(ChPid) -> case get({ch, ChPid}) of @@ -329,74 +339,73 @@ deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun}, FunAcc, deliver_from_queue_pred({IsEmpty, _AutoAcks}, _State) -> not IsEmpty. deliver_from_queue_deliver(AckRequired, {false, AutoAcks}, - State = #q { variable_queue_state = VQS }) -> - {{Message, IsDelivered, AckTag, Remaining}, VQS1} = - rabbit_variable_queue:fetch(VQS), + State = #q{internal_queue_state = IQS, + internal_queue = IQ}) -> + {{Message, IsDelivered, AckTag, Remaining}, IQS1} = IQ:fetch(IQS), AutoAcks1 = case AckRequired of true -> AutoAcks; false -> [AckTag | AutoAcks] end, {{Message, IsDelivered, AckTag}, {0 == Remaining, AutoAcks1}, - State #q { variable_queue_state = VQS1 }}. + State #q { internal_queue_state = IQS1 }}. -run_message_queue(State = #q { variable_queue_state = VQS }) -> +run_message_queue(State = #q{internal_queue_state = IQS, + internal_queue = IQ}) -> Funs = { fun deliver_from_queue_pred/2, fun deliver_from_queue_deliver/3 }, - IsEmpty = rabbit_variable_queue:is_empty(VQS), + IsEmpty = IQ:is_empty(IQS), {{_IsEmpty1, AutoAcks}, State1} = deliver_msgs_to_consumers(Funs, {IsEmpty, []}, State), - VQS1 = rabbit_variable_queue:ack(AutoAcks, State1 #q.variable_queue_state), - State1 #q { variable_queue_state = VQS1 }. + IQS1 = IQ:ack(AutoAcks, State1 #q.internal_queue_state), + State1 #q { internal_queue_state = IQS1 }. -attempt_delivery(none, _ChPid, Message, State) -> +attempt_delivery(none, _ChPid, Message, State = #q{internal_queue = IQ}) -> PredFun = fun (IsEmpty, _State) -> not IsEmpty end, DeliverFun = fun (AckRequired, false, State1) -> {AckTag, State2} = case AckRequired of true -> - {AckTag1, VQS} = - rabbit_variable_queue:publish_delivered( - Message, State1 #q.variable_queue_state), - {AckTag1, State1 #q { variable_queue_state = VQS }}; + {AckTag1, IQS} = + IQ:publish_delivered( + Message, State1 #q.internal_queue_state), + {AckTag1, State1 #q { internal_queue_state = IQS }}; false -> {noack, State1} end, {{Message, false, AckTag}, true, State2} end, deliver_msgs_to_consumers({ PredFun, DeliverFun }, false, State); -attempt_delivery(Txn, ChPid, Message, State) -> - VQS = rabbit_variable_queue:tx_publish( - Message, State #q.variable_queue_state), +attempt_delivery(Txn, ChPid, Message, State = #q{internal_queue = IQ}) -> + IQS = IQ:tx_publish(Message, State #q.internal_queue_state), record_pending_message(Txn, ChPid, Message), - {true, State #q { variable_queue_state = VQS }}. + {true, State #q { internal_queue_state = IQS }}. -deliver_or_enqueue(Txn, ChPid, Message, State) -> +deliver_or_enqueue(Txn, ChPid, Message, State = #q{internal_queue = IQ}) -> case attempt_delivery(Txn, ChPid, Message, State) of {true, NewState} -> {true, NewState}; {false, NewState} -> %% Txn is none and no unblocked channels with consumers - {_SeqId, VQS} = rabbit_variable_queue:publish( - Message, State #q.variable_queue_state), - {false, NewState #q { variable_queue_state = VQS }} + {_SeqId, IQS} = IQ:publish(Message, State #q.internal_queue_state), + {false, NewState #q { internal_queue_state = IQS }} end. %% all these messages have already been delivered at least once and %% not ack'd, but need to be either redelivered or requeued deliver_or_requeue_n([], State) -> State; -deliver_or_requeue_n(MsgsWithAcks, State) -> +deliver_or_requeue_n(MsgsWithAcks, State = #q{internal_queue = IQ}) -> Funs = { fun deliver_or_requeue_msgs_pred/2, fun deliver_or_requeue_msgs_deliver/3 }, {{_RemainingLengthMinusOne, AutoAcks, OutstandingMsgs}, NewState} = deliver_msgs_to_consumers( Funs, {length(MsgsWithAcks), [], MsgsWithAcks}, State), - VQS = rabbit_variable_queue:ack(AutoAcks, NewState #q.variable_queue_state), + IQS = IQ:ack(AutoAcks, NewState #q.internal_queue_state), case OutstandingMsgs of - [] -> NewState #q { variable_queue_state = VQS }; - _ -> VQS1 = rabbit_variable_queue:requeue(OutstandingMsgs, VQS), - NewState #q { variable_queue_state = VQS1 } + [] -> NewState #q { internal_queue_state = IQS }; + _ -> IQS1 = IQ:requeue(OutstandingMsgs, IQS), + NewState #q { internal_queue_state = IQS1 } end. deliver_or_requeue_msgs_pred({Len, _AcksAcc, _MsgsWithAcks}, _State) -> @@ -508,6 +517,16 @@ maybe_send_reply(ChPid, Msg) -> ok = rabbit_channel:send_command(ChPid, Msg). qname(#q{q = #amqqueue{name = QName}}) -> QName. +maybe_run_queue_via_internal_queue(Fun, Args, + State = #q{internal_queue_state = IQS, + internal_queue = IQ}) -> + {RunQueue, IQS1} = apply(IQ, Fun, Args ++ [IQS]), + State1 = State#q{internal_queue_state = IQS1}, + case RunQueue of + true -> run_message_queue(State1); + false -> State1 + end. + lookup_tx(Txn) -> case get({txn, Txn}) of undefined -> #tx{ch_pid = none, @@ -537,35 +556,31 @@ record_pending_acks(Txn, ChPid, MsgIds) -> store_tx(Txn, Tx#tx{pending_acks = [MsgIds | Pending], ch_pid = ChPid}). -commit_transaction(Txn, From, State) -> - #tx { ch_pid = ChPid, - pending_messages = PendingMessages, - pending_acks = PendingAcks - } = lookup_tx(Txn), +commit_transaction(Txn, From, State = #q{internal_queue = IQ}) -> + #tx{ch_pid = ChPid, pending_messages = PendingMessages, + pending_acks = PendingAcks} = lookup_tx(Txn), PendingMessagesOrdered = lists:reverse(PendingMessages), PendingAcksOrdered = lists:append(PendingAcks), Acks = case lookup_ch(ChPid) of - not_found -> []; - C = #cr { unacked_messages = UAM } -> + not_found -> + []; + C = #cr{unacked_messages = UAM} -> {MsgsWithAcks, Remaining} = collect_messages(PendingAcksOrdered, UAM), store_ch_record(C#cr{unacked_messages = Remaining}), [AckTag || {_Message, AckTag} <- MsgsWithAcks] end, - {RunQueue, VQS} = - rabbit_variable_queue:tx_commit( - PendingMessagesOrdered, Acks, From, State #q.variable_queue_state), + {RunQueue, IQS} = IQ:tx_commit(PendingMessagesOrdered, Acks, From, + State#q.internal_queue_state), erase_tx(Txn), - {RunQueue, State #q { variable_queue_state = VQS }}. + {RunQueue, State#q{internal_queue_state = IQS}}. -rollback_transaction(Txn, State) -> - #tx { pending_messages = PendingMessages - } = lookup_tx(Txn), - VQS = rabbit_variable_queue:tx_rollback(PendingMessages, - State #q.variable_queue_state), +rollback_transaction(Txn, State = #q{internal_queue = IQ}) -> + #tx{pending_messages = PendingMessages} = lookup_tx(Txn), + IQS = IQ:tx_rollback(PendingMessages, State #q.internal_queue_state), erase_tx(Txn), - State #q { variable_queue_state = VQS }. + State#q{internal_queue_state = IQS}. collect_messages(MsgIds, UAM) -> lists:mapfoldl( @@ -592,8 +607,8 @@ i(exclusive_consumer_tag, #q{exclusive_consumer = none}) -> ''; i(exclusive_consumer_tag, #q{exclusive_consumer = {_ChPid, ConsumerTag}}) -> ConsumerTag; -i(messages_ready, #q{variable_queue_state = VQS}) -> - rabbit_variable_queue:len(VQS); +i(messages_ready, #q{internal_queue_state = IQS, internal_queue = IQ}) -> + IQ:len(IQS); i(messages_unacknowledged, _) -> lists:sum([dict:size(UAM) || #cr{unacked_messages = UAM} <- all_ch_record()]); @@ -614,25 +629,24 @@ i(transactions, _) -> i(memory, _) -> {memory, M} = process_info(self(), memory), M; -i(raw_vq_status, State) -> - rabbit_variable_queue:status(State#q.variable_queue_state); +i(internal_queue_status, #q{internal_queue_state = IQS, internal_queue = IQ}) -> + IQ:status(IQS); i(Item, _) -> throw({bad_argument, Item}). %--------------------------------------------------------------------------- -handle_call(init_variable_queue, From, State = - #q{variable_queue_state = undefined, +handle_call(init_internal_queue, From, State = + #q{internal_queue_state = undefined, internal_queue = IQ, q = #amqqueue{name = QName, durable = IsDurable}}) -> gen_server2:reply(From, ok), PersistentStore = case IsDurable of true -> ?PERSISTENT_MSG_STORE; false -> ?TRANSIENT_MSG_STORE end, - noreply(State #q { variable_queue_state = - rabbit_variable_queue:init(QName, PersistentStore) }); + noreply(State#q{internal_queue_state = IQ:init(QName, PersistentStore)}); -handle_call(init_variable_queue, _From, State) -> +handle_call(init_internal_queue, _From, State) -> reply(ok, State); handle_call(sync, _From, State) -> @@ -697,27 +711,25 @@ handle_call({notify_down, ChPid}, _From, State) -> end; handle_call({basic_get, ChPid, NoAck}, _From, - State = #q{q = #amqqueue{name = QName}, - next_msg_id = NextId, - variable_queue_state = VQS - }) -> - case rabbit_variable_queue:fetch(VQS) of - {empty, VQS1} -> reply(empty, State #q { variable_queue_state = VQS1 }); - {{Message, IsDelivered, AckTag, Remaining}, VQS1} -> + State = #q{q = #amqqueue{name = QName}, next_msg_id = NextId, + internal_queue_state = IQS, internal_queue = IQ}) -> + case IQ:fetch(IQS) of + {empty, IQS1} -> reply(empty, State #q { internal_queue_state = IQS1 }); + {{Message, IsDelivered, AckTag, Remaining}, IQS1} -> AckRequired = not(NoAck), - VQS2 = + IQS2 = case AckRequired of true -> C = #cr{unacked_messages = UAM} = ch_record(ChPid), NewUAM = dict:store(NextId, {Message, AckTag}, UAM), store_ch_record(C#cr{unacked_messages = NewUAM}), - VQS1; + IQS1; false -> - rabbit_variable_queue:ack([AckTag], VQS1) + IQ:ack([AckTag], IQS1) end, Msg = {QName, self(), NextId, IsDelivered, Message}, reply({ok, Remaining, Msg}, - State #q { next_msg_id = NextId + 1, variable_queue_state = VQS2 }) + State #q { next_msg_id = NextId + 1, internal_queue_state = IQS2 }) end; handle_call({basic_consume, NoAck, ReaderPid, ChPid, LimiterPid, @@ -797,14 +809,14 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From, end; handle_call(stat, _From, State = #q{q = #amqqueue{name = Name}, - variable_queue_state = VQS, + internal_queue_state = IQS, + internal_queue = IQ, active_consumers = ActiveConsumers}) -> - Length = rabbit_variable_queue:len(VQS), - reply({ok, Name, Length, queue:len(ActiveConsumers)}, State); + reply({ok, Name, IQ:len(IQS), queue:len(ActiveConsumers)}, State); handle_call({delete, IfUnused, IfEmpty}, _From, - State = #q { variable_queue_state = VQS }) -> - Length = rabbit_variable_queue:len(VQS), + State = #q{internal_queue_state = IQS, internal_queue = IQ}) -> + Length = IQ:len(IQS), IsEmpty = Length == 0, IsUnused = is_unused(State), if @@ -816,9 +828,9 @@ handle_call({delete, IfUnused, IfEmpty}, _From, {stop, normal, {ok, Length}, State} end; -handle_call(purge, _From, State) -> - {Count, VQS} = rabbit_variable_queue:purge(State #q.variable_queue_state), - reply({ok, Count}, State #q { variable_queue_state = VQS }); +handle_call(purge, _From, State = #q{internal_queue = IQ}) -> + {Count, IQS} = IQ:purge(State#q.internal_queue_state), + reply({ok, Count}, State#q{internal_queue_state = IQS}); handle_call({claim_queue, ReaderPid}, _From, State = #q{owner = Owner, exclusive_consumer = Holder}) -> @@ -848,7 +860,7 @@ handle_cast({deliver, Txn, Message, ChPid}, State) -> {_Delivered, NewState} = deliver_or_enqueue(Txn, ChPid, Message, State), noreply(NewState); -handle_cast({ack, Txn, MsgIds, ChPid}, State) -> +handle_cast({ack, Txn, MsgIds, ChPid}, State = #q{internal_queue = IQ}) -> case lookup_ch(ChPid) of not_found -> noreply(State); @@ -856,11 +868,10 @@ handle_cast({ack, Txn, MsgIds, ChPid}, State) -> case Txn of none -> {MsgWithAcks, Remaining} = collect_messages(MsgIds, UAM), - VQS = rabbit_variable_queue:ack( - [AckTag || {_Message, AckTag} <- MsgWithAcks], - State #q.variable_queue_state), + IQS = IQ:ack([AckTag || {_Message, AckTag} <- MsgWithAcks], + State #q.internal_queue_state), store_ch_record(C#cr{unacked_messages = Remaining}), - noreply(State #q { variable_queue_state = VQS }); + noreply(State #q { internal_queue_state = IQS }); _ -> record_pending_acks(Txn, ChPid, MsgIds), noreply(State) @@ -894,23 +905,8 @@ handle_cast({notify_sent, ChPid}, State) -> C#cr{unsent_message_count = Count - 1} end)); -handle_cast({tx_commit_msg_store_callback, IsTransientPubs, Pubs, AckTags, From}, - State = #q{variable_queue_state = VQS}) -> - {RunQueue, VQS1} = rabbit_variable_queue:tx_commit_from_msg_store( - IsTransientPubs, Pubs, AckTags, From, VQS), - State1 = State#q{variable_queue_state = VQS1}, - noreply(case RunQueue of - true -> run_message_queue(State1); - false -> State1 - end); - -handle_cast(tx_commit_vq_callback, State = #q{variable_queue_state = VQS}) -> - {RunQueue, VQS1} = rabbit_variable_queue:tx_commit_from_vq(VQS), - State1 = State#q{variable_queue_state = VQS1}, - noreply(case RunQueue of - true -> run_message_queue(State1); - false -> State1 - end); +handle_cast({maybe_run_queue_via_internal_queue, Fun, Args}, State) -> + noreply(maybe_run_queue_via_internal_queue(Fun, Args, State)); handle_cast({limit, ChPid, LimiterPid}, State) -> noreply( @@ -932,21 +928,21 @@ handle_cast({flush, ChPid}, State) -> ok = rabbit_channel:flushed(ChPid, self()), noreply(State); -handle_cast(remeasure_rates, State = #q{variable_queue_state = VQS}) -> - VQS1 = rabbit_variable_queue:remeasure_rates(VQS), - RamDuration = rabbit_variable_queue:ram_duration(VQS1), +handle_cast(remeasure_rates, State = #q{internal_queue_state = IQS, + internal_queue = IQ}) -> + IQS1 = IQ:remeasure_rates(IQS), + RamDuration = IQ:ram_duration(IQS1), DesiredDuration = rabbit_memory_monitor:report_queue_duration(self(), RamDuration), - VQS2 = rabbit_variable_queue:set_queue_ram_duration_target( - DesiredDuration, VQS1), + IQS2 = IQ:set_queue_ram_duration_target(DesiredDuration, IQS1), noreply(State#q{rate_timer_ref = just_measured, - variable_queue_state = VQS2}); + internal_queue_state = IQS2}); handle_cast({set_queue_duration, Duration}, - State = #q{variable_queue_state = VQS}) -> - VQS1 = rabbit_variable_queue:set_queue_ram_duration_target( - Duration, VQS), - noreply(State#q{variable_queue_state = VQS1}); + State = #q{internal_queue_state = IQS, + internal_queue = IQ}) -> + IQS1 = IQ:set_queue_ram_duration_target(Duration, IQS), + noreply(State#q{internal_queue_state = IQS1}); handle_cast({set_maximum_since_use, Age}, State) -> ok = file_handle_cache:set_maximum_since_use(Age), @@ -971,13 +967,12 @@ handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, State) -> {stop, NewState} -> {stop, normal, NewState} end; -handle_info(timeout, State = #q{variable_queue_state = VQS}) -> - {RunQueue, VQS1} = rabbit_variable_queue:tx_commit_from_vq(VQS), - State1 = State#q{variable_queue_state = VQS1}, - noreply(case RunQueue of - true -> run_message_queue(State1); - false -> State1 - end); +handle_info(timeout, State = #q{internal_queue_timeout_fun = undefined}) -> + noreply(State); + +handle_info(timeout, State = #q{internal_queue_timeout_fun = {Fun, Args}}) -> + noreply(maybe_run_queue_via_internal_queue( + Fun, Args, State#q{internal_queue_timeout_fun = undefined})); handle_info({'EXIT', _Pid, Reason}, State) -> {stop, Reason, State}; @@ -986,11 +981,11 @@ handle_info(Info, State) -> ?LOGDEBUG("Info in queue: ~p~n", [Info]), {stop, {unhandled_info, Info}, State}. -handle_pre_hibernate(State = #q{ variable_queue_state = VQS }) -> - VQS1 = rabbit_variable_queue:flush_journal(VQS), +handle_pre_hibernate(State = #q{internal_queue_state = IQS, + internal_queue = IQ}) -> + IQS1 = IQ:handle_pre_hibernate(IQS), %% no activity for a while == 0 egress and ingress rates DesiredDuration = rabbit_memory_monitor:report_queue_duration(self(), infinity), - VQS2 = rabbit_variable_queue:set_queue_ram_duration_target( - DesiredDuration, VQS1), - {hibernate, stop_rate_timer(State#q{variable_queue_state = VQS2})}. + IQS2 = IQ:set_queue_ram_duration_target(DesiredDuration, IQS1), + {hibernate, stop_rate_timer(State#q{internal_queue_state = IQS2})}. diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 6b8998c2e6..22138bf112 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -1409,7 +1409,7 @@ test_variable_queue_dynamic_duration_change() -> {_SeqIds1, VQ7} = variable_queue_publish(true, 20, VQ6), {VQ8, AckTags1} = variable_queue_fetch(20, true, false, 20, VQ7), VQ9 = rabbit_variable_queue:ack(AckTags1, VQ8), - VQ10 = rabbit_variable_queue:flush_journal(VQ9), + VQ10 = rabbit_variable_queue:handle_pre_hibernate(VQ9), {empty, VQ11} = rabbit_variable_queue:fetch(VQ10), rabbit_variable_queue:terminate(VQ11), diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 9d33cc7ce5..1934fafc3e 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -35,8 +35,9 @@ set_queue_ram_duration_target/2, remeasure_rates/1, ram_duration/1, fetch/1, ack/2, len/1, is_empty/1, purge/1, delete_and_terminate/1, requeue/2, tx_publish/2, tx_rollback/2, - tx_commit/4, tx_commit_from_msg_store/5, tx_commit_from_vq/1, - needs_sync/1, flush_journal/1, status/1]). + tx_commit/4, needs_sync/1, handle_pre_hibernate/1, status/1]). + +-export([tx_commit_post_msg_store/5, tx_commit_index/1]). %% internal %%---------------------------------------------------------------------------- %% Definitions: @@ -242,12 +243,12 @@ -spec(tx_rollback/2 :: ([msg_id()], vqstate()) -> vqstate()). -spec(tx_commit/4 :: ([msg_id()], [ack()], {pid(), any()}, vqstate()) -> {boolean(), vqstate()}). --spec(tx_commit_from_msg_store/5 :: +-spec(tx_commit_post_msg_store/5 :: (boolean(), [msg_id()], [ack()], {pid(), any()}, vqstate()) -> {boolean(), vqstate()}). --spec(tx_commit_from_vq/1 :: (vqstate()) -> {boolean(), vqstate()}). --spec(needs_sync/1 :: (vqstate()) -> boolean()). --spec(flush_journal/1 :: (vqstate()) -> vqstate()). +-spec(tx_commit_index/1 :: (vqstate()) -> {boolean(), vqstate()}). +-spec(needs_sync/1 :: (vqstate()) -> ('undefined' | {atom(), [any()]})). +-spec(handle_pre_hibernate/1 :: (vqstate()) -> vqstate()). -spec(status/1 :: (vqstate()) -> [{atom(), any()}]). -endif. @@ -505,23 +506,26 @@ delete_and_terminate(State) -> persistent_store = PersistentStore, transient_threshold = TransientThreshold }} = purge(State), - IndexState1 = + %% flushing here is good because it deletes all full segments, + %% leaving only partial segments around. + IndexState1 = rabbit_queue_index:flush_journal(IndexState), + IndexState2 = case rabbit_queue_index:find_lowest_seq_id_seg_and_next_seq_id( - IndexState) of - {N, N, IndexState2} -> - IndexState2; - {DeltaSeqId, NextSeqId, IndexState2} -> - {_DeleteCount, IndexState3} = + IndexState1) of + {N, N, IndexState3} -> + IndexState3; + {DeltaSeqId, NextSeqId, IndexState3} -> + {_DeleteCount, IndexState4} = delete1(PersistentStore, TransientThreshold, NextSeqId, 0, - DeltaSeqId, IndexState2), - IndexState3 + DeltaSeqId, IndexState3), + IndexState4 end, - IndexState4 = rabbit_queue_index:terminate_and_erase(IndexState1), + IndexState5 = rabbit_queue_index:terminate_and_erase(IndexState2), rabbit_msg_store:delete_client(PersistentStore, PRef), rabbit_msg_store:delete_client(?TRANSIENT_MSG_STORE, TRef), rabbit_msg_store:client_terminate(MSCStateP), rabbit_msg_store:client_terminate(MSCStateT), - State1 #vqstate { index_state = IndexState4 }. + State1 #vqstate { index_state = IndexState5 }. %% [{Msg, AckTag}] %% We guarantee that after fetch, only persistent msgs are left on @@ -595,19 +599,20 @@ tx_commit(Pubs, AckTags, From, State = case IsTransientPubs orelse ?TRANSIENT_MSG_STORE == PersistentStore of true -> - tx_commit_from_msg_store( + tx_commit_post_msg_store( IsTransientPubs, Pubs, AckTags, From, State); false -> Self = self(), ok = rabbit_msg_store:sync( ?PERSISTENT_MSG_STORE, PersistentMsgIds, - fun () -> ok = rabbit_amqqueue:tx_commit_msg_store_callback( - Self, IsTransientPubs, Pubs, AckTags, From) + fun () -> ok = rabbit_amqqueue:maybe_run_queue_via_internal_queue( + Self, tx_commit_post_msg_store, + [IsTransientPubs, Pubs, AckTags, From]) end), {false, State} end. -tx_commit_from_msg_store(IsTransientPubs, Pubs, AckTags, From, State = +tx_commit_post_msg_store(IsTransientPubs, Pubs, AckTags, From, State = #vqstate { on_sync = OnSync = {SAcks, SPubs, SFroms}, persistent_store = PersistentStore }) -> %% If we are a non-durable queue, or (no persisent pubs, and no @@ -617,17 +622,17 @@ tx_commit_from_msg_store(IsTransientPubs, Pubs, AckTags, From, State = case PersistentStore == ?TRANSIENT_MSG_STORE orelse (IsTransientPubs andalso [] == DiskAcks) of true -> {Res, State1} = - tx_commit_from_vq(State #vqstate { - on_sync = {[], [Pubs], [From]} }), + tx_commit_index(State #vqstate { + on_sync = {[], [Pubs], [From]} }), {Res, State1 #vqstate { on_sync = OnSync }}; false -> {false, State #vqstate { on_sync = { [DiskAcks | SAcks], [Pubs | SPubs], [From | SFroms] }}} end. -tx_commit_from_vq(State = #vqstate { on_sync = {_, _, []} }) -> +tx_commit_index(State = #vqstate { on_sync = {_, _, []} }) -> {false, State}; -tx_commit_from_vq(State = #vqstate { on_sync = {SAcks, SPubs, SFroms}, +tx_commit_index(State = #vqstate { on_sync = {SAcks, SPubs, SFroms}, persistent_store = PersistentStore }) -> Acks = lists:flatten(SAcks), State1 = ack(Acks, State), @@ -656,11 +661,11 @@ tx_commit_from_vq(State = #vqstate { on_sync = {SAcks, SPubs, SFroms}, State2 #vqstate { index_state = IndexState1, on_sync = {[], [], []} }}. needs_sync(#vqstate { on_sync = {_, _, []} }) -> - false; + undefined; needs_sync(_) -> - true. + {tx_commit_index, []}. -flush_journal(State = #vqstate { index_state = IndexState }) -> +handle_pre_hibernate(State = #vqstate { index_state = IndexState }) -> State #vqstate { index_state = rabbit_queue_index:flush_journal(IndexState) }. |
