summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_amqqueue.erl21
-rw-r--r--src/rabbit_amqqueue_process.erl285
-rw-r--r--src/rabbit_tests.erl2
-rw-r--r--src/rabbit_variable_queue.erl59
4 files changed, 180 insertions, 187 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index f0d12ae5de..1c8cf522b9 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -41,8 +41,8 @@
-export([consumers/1, consumers_all/1]).
-export([claim_queue/2]).
-export([basic_get/3, basic_consume/8, basic_cancel/4]).
--export([notify_sent/2, unblock/2, tx_commit_msg_store_callback/5,
- tx_commit_vq_callback/1, flush_all/2]).
+-export([notify_sent/2, unblock/2, maybe_run_queue_via_internal_queue/3,
+ flush_all/2]).
-export([commit_all/2, rollback_all/2, notify_down_all/2, limit_all/3]).
-export([on_node_down/1]).
@@ -65,7 +65,6 @@
-type(qfun(A) :: fun ((amqqueue()) -> A)).
-type(ok_or_errors() ::
'ok' | {'error', [{'error' | 'exit' | 'throw', any()}]}).
--type(acktag() :: any()).
-spec(start/0 :: () -> 'ok').
-spec(declare/4 :: (queue_name(), boolean(), boolean(), amqp_table()) ->
@@ -111,9 +110,7 @@
-spec(basic_cancel/4 :: (amqqueue(), pid(), ctag(), any()) -> 'ok').
-spec(notify_sent/2 :: (pid(), pid()) -> 'ok').
-spec(unblock/2 :: (pid(), pid()) -> 'ok').
--spec(tx_commit_msg_store_callback/5 ::
- (pid(), boolean(), [message()], [acktag()], {pid(), any()}) -> 'ok').
--spec(tx_commit_vq_callback/1 :: (pid()) -> 'ok').
+-spec(maybe_run_queue_via_internal_queue/3 :: (pid(), atom(), [any()]) -> 'ok').
-spec(flush_all/2 :: ([pid()], pid()) -> 'ok').
-spec(internal_declare/2 :: (amqqueue(), boolean()) -> amqqueue()).
-spec(internal_delete/1 :: (queue_name()) -> 'ok' | not_found()).
@@ -172,7 +169,7 @@ recover_durable_queues(DurableQueues) ->
end) of
true ->
ok = gen_server2:call(Q#amqqueue.pid,
- init_variable_queue,
+ init_internal_queue,
infinity),
[Q|Acc];
false -> exit(Q#amqqueue.pid, shutdown),
@@ -204,7 +201,7 @@ internal_declare(Q = #amqqueue{name = QueueName}, WantDefaultBinding) ->
end,
ok = gen_server2:call(
Q#amqqueue.pid,
- init_variable_queue, infinity),
+ init_internal_queue, infinity),
Q;
[_] -> not_found %% existing Q on stopped node
end;
@@ -362,12 +359,8 @@ notify_sent(QPid, ChPid) ->
unblock(QPid, ChPid) ->
gen_server2:pcast(QPid, 7, {unblock, ChPid}).
-tx_commit_msg_store_callback(QPid, IsTransientPubs, Pubs, AckTags, From) ->
- gen_server2:pcast(QPid, 7, {tx_commit_msg_store_callback,
- IsTransientPubs, Pubs, AckTags, From}).
-
-tx_commit_vq_callback(QPid) ->
- gen_server2:pcast(QPid, 7, tx_commit_vq_callback).
+maybe_run_queue_via_internal_queue(QPid, Fun, Args) ->
+ gen_server2:pcast(QPid, 7, {maybe_run_queue_via_internal_queue, Fun, Args}).
flush_all(QPids, ChPid) ->
safe_pmap_ok(
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index b92de66743..33ea625c55 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -55,7 +55,9 @@
owner,
exclusive_consumer,
has_had_consumers,
- variable_queue_state,
+ internal_queue,
+ internal_queue_state,
+ internal_queue_timeout_fun,
next_msg_id,
active_consumers,
blocked_consumers,
@@ -94,7 +96,7 @@
consumers,
transactions,
memory,
- raw_vq_status
+ internal_queue_status
]).
%%----------------------------------------------------------------------------
@@ -116,7 +118,9 @@ init(Q) ->
owner = none,
exclusive_consumer = none,
has_had_consumers = false,
- variable_queue_state = undefined,
+ internal_queue = rabbit_variable_queue,
+ internal_queue_state = undefined,
+ internal_queue_timeout_fun = undefined,
next_msg_id = 1,
active_consumers = queue:new(),
blocked_consumers = queue:new(),
@@ -124,30 +128,33 @@ init(Q) ->
rate_timer_ref = undefined}, hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
-terminate(shutdown, #q{variable_queue_state = VQS}) ->
+terminate(shutdown, #q{internal_queue_state = IQS,
+ internal_queue = IQ}) ->
ok = rabbit_memory_monitor:deregister(self()),
- case VQS of
+ case IQS of
undefined -> ok;
- _ -> rabbit_variable_queue:terminate(VQS)
+ _ -> IQ:terminate(IQS)
end;
-terminate({shutdown, _}, #q{variable_queue_state = VQS}) ->
+terminate({shutdown, _}, #q{internal_queue_state = IQS,
+ internal_queue = IQ}) ->
ok = rabbit_memory_monitor:deregister(self()),
- case VQS of
+ case IQS of
undefined -> ok;
- _ -> rabbit_variable_queue:terminate(VQS)
+ _ -> IQ:terminate(IQS)
end;
-terminate(_Reason, State = #q{variable_queue_state = VQS}) ->
+terminate(_Reason, State = #q{internal_queue_state = IQS,
+ internal_queue = IQ}) ->
ok = rabbit_memory_monitor:deregister(self()),
%% FIXME: How do we cancel active subscriptions?
%% Ensure that any persisted tx messages are removed.
%% TODO: wait for all in flight tx_commits to complete
- case VQS of
+ case IQS of
undefined ->
ok;
_ ->
- VQS1 = rabbit_variable_queue:tx_rollback(
+ IQS1 = IQ:tx_rollback(
lists:concat([PM || #tx { pending_messages = PM } <-
- all_tx_record()]), VQS),
+ all_tx_record()]), IQS),
%% Delete from disk first. If we crash at this point, when
%% a durable queue, we will be recreated at startup,
%% possibly with partial content. The alternative is much
@@ -155,7 +162,7 @@ terminate(_Reason, State = #q{variable_queue_state = VQS}) ->
%% would then have a race between the disk delete and a
%% new queue with the same name being created and
%% published to.
- rabbit_variable_queue:delete_and_terminate(VQS1)
+ IQ:delete_and_terminate(IQS1)
end,
ok = rabbit_amqqueue:internal_delete(qname(State)).
@@ -174,18 +181,18 @@ noreply(NewState) ->
{NewState1, Timeout} = next_state(NewState),
{noreply, NewState1, Timeout}.
-next_state(State = #q{variable_queue_state = VQS}) ->
- next_state1(ensure_rate_timer(State),
- rabbit_variable_queue:needs_sync(VQS)).
+next_state(State = #q{internal_queue_state = IQS,
+ internal_queue = IQ}) ->
+ next_state1(ensure_rate_timer(State), IQ:needs_sync(IQS)).
-next_state1(State = #q{sync_timer_ref = undefined}, true) ->
- {start_sync_timer(State), 0};
-next_state1(State, true) ->
+next_state1(State = #q{sync_timer_ref = undefined}, Callback = {_Fun, _Args}) ->
+ {start_sync_timer(State, Callback), 0};
+next_state1(State, {_Fun, _Args}) ->
{State, 0};
-next_state1(State = #q{sync_timer_ref = undefined}, false) ->
+next_state1(State = #q{sync_timer_ref = undefined}, undefined) ->
{State, hibernate};
-next_state1(State, false) ->
- {stop_sync_timer(State), hibernate}.
+next_state1(State, undefined) ->
+ {stop_sync_timer(State#q{internal_queue_timeout_fun = undefined}), hibernate}.
ensure_rate_timer(State = #q{rate_timer_ref = undefined}) ->
{ok, TRef} = timer:apply_after(?RATES_REMEASURE_INTERVAL, rabbit_amqqueue,
@@ -204,17 +211,20 @@ stop_rate_timer(State = #q{rate_timer_ref = TRef}) ->
{ok, cancel} = timer:cancel(TRef),
State#q{rate_timer_ref = undefined}.
-start_sync_timer(State = #q{sync_timer_ref = undefined}) ->
- {ok, TRef} = timer:apply_after(?SYNC_INTERVAL, rabbit_amqqueue,
- tx_commit_vq_callback, [self()]),
- State#q{sync_timer_ref = TRef}.
+start_sync_timer(State = #q{sync_timer_ref = undefined},
+ Callback = {Fun, Args}) ->
+ {ok, TRef} = timer:apply_after(
+ ?SYNC_INTERVAL, rabbit_amqqueue,
+ maybe_run_queue_via_internal_queue, [self(), Fun, Args]),
+ State#q{sync_timer_ref = TRef, internal_queue_timeout_fun = Callback}.
stop_sync_timer(State = #q{sync_timer_ref = TRef}) ->
{ok, cancel} = timer:cancel(TRef),
- State#q{sync_timer_ref = undefined}.
+ State#q{sync_timer_ref = undefined, internal_queue_timeout_fun = undefined}.
-assert_invariant(#q{active_consumers = AC, variable_queue_state = VQS}) ->
- true = (queue:is_empty(AC) orelse rabbit_variable_queue:is_empty(VQS)).
+assert_invariant(#q{active_consumers = AC, internal_queue_state = IQS,
+ internal_queue = IQ}) ->
+ true = (queue:is_empty(AC) orelse IQ:is_empty(IQS)).
lookup_ch(ChPid) ->
case get({ch, ChPid}) of
@@ -329,74 +339,73 @@ deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun}, FunAcc,
deliver_from_queue_pred({IsEmpty, _AutoAcks}, _State) ->
not IsEmpty.
deliver_from_queue_deliver(AckRequired, {false, AutoAcks},
- State = #q { variable_queue_state = VQS }) ->
- {{Message, IsDelivered, AckTag, Remaining}, VQS1} =
- rabbit_variable_queue:fetch(VQS),
+ State = #q{internal_queue_state = IQS,
+ internal_queue = IQ}) ->
+ {{Message, IsDelivered, AckTag, Remaining}, IQS1} = IQ:fetch(IQS),
AutoAcks1 = case AckRequired of
true -> AutoAcks;
false -> [AckTag | AutoAcks]
end,
{{Message, IsDelivered, AckTag}, {0 == Remaining, AutoAcks1},
- State #q { variable_queue_state = VQS1 }}.
+ State #q { internal_queue_state = IQS1 }}.
-run_message_queue(State = #q { variable_queue_state = VQS }) ->
+run_message_queue(State = #q{internal_queue_state = IQS,
+ internal_queue = IQ}) ->
Funs = { fun deliver_from_queue_pred/2,
fun deliver_from_queue_deliver/3 },
- IsEmpty = rabbit_variable_queue:is_empty(VQS),
+ IsEmpty = IQ:is_empty(IQS),
{{_IsEmpty1, AutoAcks}, State1} =
deliver_msgs_to_consumers(Funs, {IsEmpty, []}, State),
- VQS1 = rabbit_variable_queue:ack(AutoAcks, State1 #q.variable_queue_state),
- State1 #q { variable_queue_state = VQS1 }.
+ IQS1 = IQ:ack(AutoAcks, State1 #q.internal_queue_state),
+ State1 #q { internal_queue_state = IQS1 }.
-attempt_delivery(none, _ChPid, Message, State) ->
+attempt_delivery(none, _ChPid, Message, State = #q{internal_queue = IQ}) ->
PredFun = fun (IsEmpty, _State) -> not IsEmpty end,
DeliverFun =
fun (AckRequired, false, State1) ->
{AckTag, State2} =
case AckRequired of
true ->
- {AckTag1, VQS} =
- rabbit_variable_queue:publish_delivered(
- Message, State1 #q.variable_queue_state),
- {AckTag1, State1 #q { variable_queue_state = VQS }};
+ {AckTag1, IQS} =
+ IQ:publish_delivered(
+ Message, State1 #q.internal_queue_state),
+ {AckTag1, State1 #q { internal_queue_state = IQS }};
false ->
{noack, State1}
end,
{{Message, false, AckTag}, true, State2}
end,
deliver_msgs_to_consumers({ PredFun, DeliverFun }, false, State);
-attempt_delivery(Txn, ChPid, Message, State) ->
- VQS = rabbit_variable_queue:tx_publish(
- Message, State #q.variable_queue_state),
+attempt_delivery(Txn, ChPid, Message, State = #q{internal_queue = IQ}) ->
+ IQS = IQ:tx_publish(Message, State #q.internal_queue_state),
record_pending_message(Txn, ChPid, Message),
- {true, State #q { variable_queue_state = VQS }}.
+ {true, State #q { internal_queue_state = IQS }}.
-deliver_or_enqueue(Txn, ChPid, Message, State) ->
+deliver_or_enqueue(Txn, ChPid, Message, State = #q{internal_queue = IQ}) ->
case attempt_delivery(Txn, ChPid, Message, State) of
{true, NewState} ->
{true, NewState};
{false, NewState} ->
%% Txn is none and no unblocked channels with consumers
- {_SeqId, VQS} = rabbit_variable_queue:publish(
- Message, State #q.variable_queue_state),
- {false, NewState #q { variable_queue_state = VQS }}
+ {_SeqId, IQS} = IQ:publish(Message, State #q.internal_queue_state),
+ {false, NewState #q { internal_queue_state = IQS }}
end.
%% all these messages have already been delivered at least once and
%% not ack'd, but need to be either redelivered or requeued
deliver_or_requeue_n([], State) ->
State;
-deliver_or_requeue_n(MsgsWithAcks, State) ->
+deliver_or_requeue_n(MsgsWithAcks, State = #q{internal_queue = IQ}) ->
Funs = { fun deliver_or_requeue_msgs_pred/2,
fun deliver_or_requeue_msgs_deliver/3 },
{{_RemainingLengthMinusOne, AutoAcks, OutstandingMsgs}, NewState} =
deliver_msgs_to_consumers(
Funs, {length(MsgsWithAcks), [], MsgsWithAcks}, State),
- VQS = rabbit_variable_queue:ack(AutoAcks, NewState #q.variable_queue_state),
+ IQS = IQ:ack(AutoAcks, NewState #q.internal_queue_state),
case OutstandingMsgs of
- [] -> NewState #q { variable_queue_state = VQS };
- _ -> VQS1 = rabbit_variable_queue:requeue(OutstandingMsgs, VQS),
- NewState #q { variable_queue_state = VQS1 }
+ [] -> NewState #q { internal_queue_state = IQS };
+ _ -> IQS1 = IQ:requeue(OutstandingMsgs, IQS),
+ NewState #q { internal_queue_state = IQS1 }
end.
deliver_or_requeue_msgs_pred({Len, _AcksAcc, _MsgsWithAcks}, _State) ->
@@ -508,6 +517,16 @@ maybe_send_reply(ChPid, Msg) -> ok = rabbit_channel:send_command(ChPid, Msg).
qname(#q{q = #amqqueue{name = QName}}) -> QName.
+maybe_run_queue_via_internal_queue(Fun, Args,
+ State = #q{internal_queue_state = IQS,
+ internal_queue = IQ}) ->
+ {RunQueue, IQS1} = apply(IQ, Fun, Args ++ [IQS]),
+ State1 = State#q{internal_queue_state = IQS1},
+ case RunQueue of
+ true -> run_message_queue(State1);
+ false -> State1
+ end.
+
lookup_tx(Txn) ->
case get({txn, Txn}) of
undefined -> #tx{ch_pid = none,
@@ -537,35 +556,31 @@ record_pending_acks(Txn, ChPid, MsgIds) ->
store_tx(Txn, Tx#tx{pending_acks = [MsgIds | Pending],
ch_pid = ChPid}).
-commit_transaction(Txn, From, State) ->
- #tx { ch_pid = ChPid,
- pending_messages = PendingMessages,
- pending_acks = PendingAcks
- } = lookup_tx(Txn),
+commit_transaction(Txn, From, State = #q{internal_queue = IQ}) ->
+ #tx{ch_pid = ChPid, pending_messages = PendingMessages,
+ pending_acks = PendingAcks} = lookup_tx(Txn),
PendingMessagesOrdered = lists:reverse(PendingMessages),
PendingAcksOrdered = lists:append(PendingAcks),
Acks =
case lookup_ch(ChPid) of
- not_found -> [];
- C = #cr { unacked_messages = UAM } ->
+ not_found ->
+ [];
+ C = #cr{unacked_messages = UAM} ->
{MsgsWithAcks, Remaining} =
collect_messages(PendingAcksOrdered, UAM),
store_ch_record(C#cr{unacked_messages = Remaining}),
[AckTag || {_Message, AckTag} <- MsgsWithAcks]
end,
- {RunQueue, VQS} =
- rabbit_variable_queue:tx_commit(
- PendingMessagesOrdered, Acks, From, State #q.variable_queue_state),
+ {RunQueue, IQS} = IQ:tx_commit(PendingMessagesOrdered, Acks, From,
+ State#q.internal_queue_state),
erase_tx(Txn),
- {RunQueue, State #q { variable_queue_state = VQS }}.
+ {RunQueue, State#q{internal_queue_state = IQS}}.
-rollback_transaction(Txn, State) ->
- #tx { pending_messages = PendingMessages
- } = lookup_tx(Txn),
- VQS = rabbit_variable_queue:tx_rollback(PendingMessages,
- State #q.variable_queue_state),
+rollback_transaction(Txn, State = #q{internal_queue = IQ}) ->
+ #tx{pending_messages = PendingMessages} = lookup_tx(Txn),
+ IQS = IQ:tx_rollback(PendingMessages, State #q.internal_queue_state),
erase_tx(Txn),
- State #q { variable_queue_state = VQS }.
+ State#q{internal_queue_state = IQS}.
collect_messages(MsgIds, UAM) ->
lists:mapfoldl(
@@ -592,8 +607,8 @@ i(exclusive_consumer_tag, #q{exclusive_consumer = none}) ->
'';
i(exclusive_consumer_tag, #q{exclusive_consumer = {_ChPid, ConsumerTag}}) ->
ConsumerTag;
-i(messages_ready, #q{variable_queue_state = VQS}) ->
- rabbit_variable_queue:len(VQS);
+i(messages_ready, #q{internal_queue_state = IQS, internal_queue = IQ}) ->
+ IQ:len(IQS);
i(messages_unacknowledged, _) ->
lists:sum([dict:size(UAM) ||
#cr{unacked_messages = UAM} <- all_ch_record()]);
@@ -614,25 +629,24 @@ i(transactions, _) ->
i(memory, _) ->
{memory, M} = process_info(self(), memory),
M;
-i(raw_vq_status, State) ->
- rabbit_variable_queue:status(State#q.variable_queue_state);
+i(internal_queue_status, #q{internal_queue_state = IQS, internal_queue = IQ}) ->
+ IQ:status(IQS);
i(Item, _) ->
throw({bad_argument, Item}).
%---------------------------------------------------------------------------
-handle_call(init_variable_queue, From, State =
- #q{variable_queue_state = undefined,
+handle_call(init_internal_queue, From, State =
+ #q{internal_queue_state = undefined, internal_queue = IQ,
q = #amqqueue{name = QName, durable = IsDurable}}) ->
gen_server2:reply(From, ok),
PersistentStore = case IsDurable of
true -> ?PERSISTENT_MSG_STORE;
false -> ?TRANSIENT_MSG_STORE
end,
- noreply(State #q { variable_queue_state =
- rabbit_variable_queue:init(QName, PersistentStore) });
+ noreply(State#q{internal_queue_state = IQ:init(QName, PersistentStore)});
-handle_call(init_variable_queue, _From, State) ->
+handle_call(init_internal_queue, _From, State) ->
reply(ok, State);
handle_call(sync, _From, State) ->
@@ -697,27 +711,25 @@ handle_call({notify_down, ChPid}, _From, State) ->
end;
handle_call({basic_get, ChPid, NoAck}, _From,
- State = #q{q = #amqqueue{name = QName},
- next_msg_id = NextId,
- variable_queue_state = VQS
- }) ->
- case rabbit_variable_queue:fetch(VQS) of
- {empty, VQS1} -> reply(empty, State #q { variable_queue_state = VQS1 });
- {{Message, IsDelivered, AckTag, Remaining}, VQS1} ->
+ State = #q{q = #amqqueue{name = QName}, next_msg_id = NextId,
+ internal_queue_state = IQS, internal_queue = IQ}) ->
+ case IQ:fetch(IQS) of
+ {empty, IQS1} -> reply(empty, State #q { internal_queue_state = IQS1 });
+ {{Message, IsDelivered, AckTag, Remaining}, IQS1} ->
AckRequired = not(NoAck),
- VQS2 =
+ IQS2 =
case AckRequired of
true ->
C = #cr{unacked_messages = UAM} = ch_record(ChPid),
NewUAM = dict:store(NextId, {Message, AckTag}, UAM),
store_ch_record(C#cr{unacked_messages = NewUAM}),
- VQS1;
+ IQS1;
false ->
- rabbit_variable_queue:ack([AckTag], VQS1)
+ IQ:ack([AckTag], IQS1)
end,
Msg = {QName, self(), NextId, IsDelivered, Message},
reply({ok, Remaining, Msg},
- State #q { next_msg_id = NextId + 1, variable_queue_state = VQS2 })
+ State #q { next_msg_id = NextId + 1, internal_queue_state = IQS2 })
end;
handle_call({basic_consume, NoAck, ReaderPid, ChPid, LimiterPid,
@@ -797,14 +809,14 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From,
end;
handle_call(stat, _From, State = #q{q = #amqqueue{name = Name},
- variable_queue_state = VQS,
+ internal_queue_state = IQS,
+ internal_queue = IQ,
active_consumers = ActiveConsumers}) ->
- Length = rabbit_variable_queue:len(VQS),
- reply({ok, Name, Length, queue:len(ActiveConsumers)}, State);
+ reply({ok, Name, IQ:len(IQS), queue:len(ActiveConsumers)}, State);
handle_call({delete, IfUnused, IfEmpty}, _From,
- State = #q { variable_queue_state = VQS }) ->
- Length = rabbit_variable_queue:len(VQS),
+ State = #q{internal_queue_state = IQS, internal_queue = IQ}) ->
+ Length = IQ:len(IQS),
IsEmpty = Length == 0,
IsUnused = is_unused(State),
if
@@ -816,9 +828,9 @@ handle_call({delete, IfUnused, IfEmpty}, _From,
{stop, normal, {ok, Length}, State}
end;
-handle_call(purge, _From, State) ->
- {Count, VQS} = rabbit_variable_queue:purge(State #q.variable_queue_state),
- reply({ok, Count}, State #q { variable_queue_state = VQS });
+handle_call(purge, _From, State = #q{internal_queue = IQ}) ->
+ {Count, IQS} = IQ:purge(State#q.internal_queue_state),
+ reply({ok, Count}, State#q{internal_queue_state = IQS});
handle_call({claim_queue, ReaderPid}, _From,
State = #q{owner = Owner, exclusive_consumer = Holder}) ->
@@ -848,7 +860,7 @@ handle_cast({deliver, Txn, Message, ChPid}, State) ->
{_Delivered, NewState} = deliver_or_enqueue(Txn, ChPid, Message, State),
noreply(NewState);
-handle_cast({ack, Txn, MsgIds, ChPid}, State) ->
+handle_cast({ack, Txn, MsgIds, ChPid}, State = #q{internal_queue = IQ}) ->
case lookup_ch(ChPid) of
not_found ->
noreply(State);
@@ -856,11 +868,10 @@ handle_cast({ack, Txn, MsgIds, ChPid}, State) ->
case Txn of
none ->
{MsgWithAcks, Remaining} = collect_messages(MsgIds, UAM),
- VQS = rabbit_variable_queue:ack(
- [AckTag || {_Message, AckTag} <- MsgWithAcks],
- State #q.variable_queue_state),
+ IQS = IQ:ack([AckTag || {_Message, AckTag} <- MsgWithAcks],
+ State #q.internal_queue_state),
store_ch_record(C#cr{unacked_messages = Remaining}),
- noreply(State #q { variable_queue_state = VQS });
+ noreply(State #q { internal_queue_state = IQS });
_ ->
record_pending_acks(Txn, ChPid, MsgIds),
noreply(State)
@@ -894,23 +905,8 @@ handle_cast({notify_sent, ChPid}, State) ->
C#cr{unsent_message_count = Count - 1}
end));
-handle_cast({tx_commit_msg_store_callback, IsTransientPubs, Pubs, AckTags, From},
- State = #q{variable_queue_state = VQS}) ->
- {RunQueue, VQS1} = rabbit_variable_queue:tx_commit_from_msg_store(
- IsTransientPubs, Pubs, AckTags, From, VQS),
- State1 = State#q{variable_queue_state = VQS1},
- noreply(case RunQueue of
- true -> run_message_queue(State1);
- false -> State1
- end);
-
-handle_cast(tx_commit_vq_callback, State = #q{variable_queue_state = VQS}) ->
- {RunQueue, VQS1} = rabbit_variable_queue:tx_commit_from_vq(VQS),
- State1 = State#q{variable_queue_state = VQS1},
- noreply(case RunQueue of
- true -> run_message_queue(State1);
- false -> State1
- end);
+handle_cast({maybe_run_queue_via_internal_queue, Fun, Args}, State) ->
+ noreply(maybe_run_queue_via_internal_queue(Fun, Args, State));
handle_cast({limit, ChPid, LimiterPid}, State) ->
noreply(
@@ -932,21 +928,21 @@ handle_cast({flush, ChPid}, State) ->
ok = rabbit_channel:flushed(ChPid, self()),
noreply(State);
-handle_cast(remeasure_rates, State = #q{variable_queue_state = VQS}) ->
- VQS1 = rabbit_variable_queue:remeasure_rates(VQS),
- RamDuration = rabbit_variable_queue:ram_duration(VQS1),
+handle_cast(remeasure_rates, State = #q{internal_queue_state = IQS,
+ internal_queue = IQ}) ->
+ IQS1 = IQ:remeasure_rates(IQS),
+ RamDuration = IQ:ram_duration(IQS1),
DesiredDuration =
rabbit_memory_monitor:report_queue_duration(self(), RamDuration),
- VQS2 = rabbit_variable_queue:set_queue_ram_duration_target(
- DesiredDuration, VQS1),
+ IQS2 = IQ:set_queue_ram_duration_target(DesiredDuration, IQS1),
noreply(State#q{rate_timer_ref = just_measured,
- variable_queue_state = VQS2});
+ internal_queue_state = IQS2});
handle_cast({set_queue_duration, Duration},
- State = #q{variable_queue_state = VQS}) ->
- VQS1 = rabbit_variable_queue:set_queue_ram_duration_target(
- Duration, VQS),
- noreply(State#q{variable_queue_state = VQS1});
+ State = #q{internal_queue_state = IQS,
+ internal_queue = IQ}) ->
+ IQS1 = IQ:set_queue_ram_duration_target(Duration, IQS),
+ noreply(State#q{internal_queue_state = IQS1});
handle_cast({set_maximum_since_use, Age}, State) ->
ok = file_handle_cache:set_maximum_since_use(Age),
@@ -971,13 +967,12 @@ handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, State) ->
{stop, NewState} -> {stop, normal, NewState}
end;
-handle_info(timeout, State = #q{variable_queue_state = VQS}) ->
- {RunQueue, VQS1} = rabbit_variable_queue:tx_commit_from_vq(VQS),
- State1 = State#q{variable_queue_state = VQS1},
- noreply(case RunQueue of
- true -> run_message_queue(State1);
- false -> State1
- end);
+handle_info(timeout, State = #q{internal_queue_timeout_fun = undefined}) ->
+ noreply(State);
+
+handle_info(timeout, State = #q{internal_queue_timeout_fun = {Fun, Args}}) ->
+ noreply(maybe_run_queue_via_internal_queue(
+ Fun, Args, State#q{internal_queue_timeout_fun = undefined}));
handle_info({'EXIT', _Pid, Reason}, State) ->
{stop, Reason, State};
@@ -986,11 +981,11 @@ handle_info(Info, State) ->
?LOGDEBUG("Info in queue: ~p~n", [Info]),
{stop, {unhandled_info, Info}, State}.
-handle_pre_hibernate(State = #q{ variable_queue_state = VQS }) ->
- VQS1 = rabbit_variable_queue:flush_journal(VQS),
+handle_pre_hibernate(State = #q{internal_queue_state = IQS,
+ internal_queue = IQ}) ->
+ IQS1 = IQ:handle_pre_hibernate(IQS),
%% no activity for a while == 0 egress and ingress rates
DesiredDuration =
rabbit_memory_monitor:report_queue_duration(self(), infinity),
- VQS2 = rabbit_variable_queue:set_queue_ram_duration_target(
- DesiredDuration, VQS1),
- {hibernate, stop_rate_timer(State#q{variable_queue_state = VQS2})}.
+ IQS2 = IQ:set_queue_ram_duration_target(DesiredDuration, IQS1),
+ {hibernate, stop_rate_timer(State#q{internal_queue_state = IQS2})}.
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 6b8998c2e6..22138bf112 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -1409,7 +1409,7 @@ test_variable_queue_dynamic_duration_change() ->
{_SeqIds1, VQ7} = variable_queue_publish(true, 20, VQ6),
{VQ8, AckTags1} = variable_queue_fetch(20, true, false, 20, VQ7),
VQ9 = rabbit_variable_queue:ack(AckTags1, VQ8),
- VQ10 = rabbit_variable_queue:flush_journal(VQ9),
+ VQ10 = rabbit_variable_queue:handle_pre_hibernate(VQ9),
{empty, VQ11} = rabbit_variable_queue:fetch(VQ10),
rabbit_variable_queue:terminate(VQ11),
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 9d33cc7ce5..1934fafc3e 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -35,8 +35,9 @@
set_queue_ram_duration_target/2, remeasure_rates/1,
ram_duration/1, fetch/1, ack/2, len/1, is_empty/1, purge/1,
delete_and_terminate/1, requeue/2, tx_publish/2, tx_rollback/2,
- tx_commit/4, tx_commit_from_msg_store/5, tx_commit_from_vq/1,
- needs_sync/1, flush_journal/1, status/1]).
+ tx_commit/4, needs_sync/1, handle_pre_hibernate/1, status/1]).
+
+-export([tx_commit_post_msg_store/5, tx_commit_index/1]). %% internal
%%----------------------------------------------------------------------------
%% Definitions:
@@ -242,12 +243,12 @@
-spec(tx_rollback/2 :: ([msg_id()], vqstate()) -> vqstate()).
-spec(tx_commit/4 :: ([msg_id()], [ack()], {pid(), any()}, vqstate()) ->
{boolean(), vqstate()}).
--spec(tx_commit_from_msg_store/5 ::
+-spec(tx_commit_post_msg_store/5 ::
(boolean(), [msg_id()], [ack()], {pid(), any()}, vqstate()) ->
{boolean(), vqstate()}).
--spec(tx_commit_from_vq/1 :: (vqstate()) -> {boolean(), vqstate()}).
--spec(needs_sync/1 :: (vqstate()) -> boolean()).
--spec(flush_journal/1 :: (vqstate()) -> vqstate()).
+-spec(tx_commit_index/1 :: (vqstate()) -> {boolean(), vqstate()}).
+-spec(needs_sync/1 :: (vqstate()) -> ('undefined' | {atom(), [any()]})).
+-spec(handle_pre_hibernate/1 :: (vqstate()) -> vqstate()).
-spec(status/1 :: (vqstate()) -> [{atom(), any()}]).
-endif.
@@ -505,23 +506,26 @@ delete_and_terminate(State) ->
persistent_store = PersistentStore,
transient_threshold = TransientThreshold }} =
purge(State),
- IndexState1 =
+ %% flushing here is good because it deletes all full segments,
+ %% leaving only partial segments around.
+ IndexState1 = rabbit_queue_index:flush_journal(IndexState),
+ IndexState2 =
case rabbit_queue_index:find_lowest_seq_id_seg_and_next_seq_id(
- IndexState) of
- {N, N, IndexState2} ->
- IndexState2;
- {DeltaSeqId, NextSeqId, IndexState2} ->
- {_DeleteCount, IndexState3} =
+ IndexState1) of
+ {N, N, IndexState3} ->
+ IndexState3;
+ {DeltaSeqId, NextSeqId, IndexState3} ->
+ {_DeleteCount, IndexState4} =
delete1(PersistentStore, TransientThreshold, NextSeqId, 0,
- DeltaSeqId, IndexState2),
- IndexState3
+ DeltaSeqId, IndexState3),
+ IndexState4
end,
- IndexState4 = rabbit_queue_index:terminate_and_erase(IndexState1),
+ IndexState5 = rabbit_queue_index:terminate_and_erase(IndexState2),
rabbit_msg_store:delete_client(PersistentStore, PRef),
rabbit_msg_store:delete_client(?TRANSIENT_MSG_STORE, TRef),
rabbit_msg_store:client_terminate(MSCStateP),
rabbit_msg_store:client_terminate(MSCStateT),
- State1 #vqstate { index_state = IndexState4 }.
+ State1 #vqstate { index_state = IndexState5 }.
%% [{Msg, AckTag}]
%% We guarantee that after fetch, only persistent msgs are left on
@@ -595,19 +599,20 @@ tx_commit(Pubs, AckTags, From, State =
case IsTransientPubs orelse
?TRANSIENT_MSG_STORE == PersistentStore of
true ->
- tx_commit_from_msg_store(
+ tx_commit_post_msg_store(
IsTransientPubs, Pubs, AckTags, From, State);
false ->
Self = self(),
ok = rabbit_msg_store:sync(
?PERSISTENT_MSG_STORE, PersistentMsgIds,
- fun () -> ok = rabbit_amqqueue:tx_commit_msg_store_callback(
- Self, IsTransientPubs, Pubs, AckTags, From)
+ fun () -> ok = rabbit_amqqueue:maybe_run_queue_via_internal_queue(
+ Self, tx_commit_post_msg_store,
+ [IsTransientPubs, Pubs, AckTags, From])
end),
{false, State}
end.
-tx_commit_from_msg_store(IsTransientPubs, Pubs, AckTags, From, State =
+tx_commit_post_msg_store(IsTransientPubs, Pubs, AckTags, From, State =
#vqstate { on_sync = OnSync = {SAcks, SPubs, SFroms},
persistent_store = PersistentStore }) ->
%% If we are a non-durable queue, or (no persisent pubs, and no
@@ -617,17 +622,17 @@ tx_commit_from_msg_store(IsTransientPubs, Pubs, AckTags, From, State =
case PersistentStore == ?TRANSIENT_MSG_STORE orelse
(IsTransientPubs andalso [] == DiskAcks) of
true -> {Res, State1} =
- tx_commit_from_vq(State #vqstate {
- on_sync = {[], [Pubs], [From]} }),
+ tx_commit_index(State #vqstate {
+ on_sync = {[], [Pubs], [From]} }),
{Res, State1 #vqstate { on_sync = OnSync }};
false -> {false, State #vqstate { on_sync = { [DiskAcks | SAcks],
[Pubs | SPubs],
[From | SFroms] }}}
end.
-tx_commit_from_vq(State = #vqstate { on_sync = {_, _, []} }) ->
+tx_commit_index(State = #vqstate { on_sync = {_, _, []} }) ->
{false, State};
-tx_commit_from_vq(State = #vqstate { on_sync = {SAcks, SPubs, SFroms},
+tx_commit_index(State = #vqstate { on_sync = {SAcks, SPubs, SFroms},
persistent_store = PersistentStore }) ->
Acks = lists:flatten(SAcks),
State1 = ack(Acks, State),
@@ -656,11 +661,11 @@ tx_commit_from_vq(State = #vqstate { on_sync = {SAcks, SPubs, SFroms},
State2 #vqstate { index_state = IndexState1, on_sync = {[], [], []} }}.
needs_sync(#vqstate { on_sync = {_, _, []} }) ->
- false;
+ undefined;
needs_sync(_) ->
- true.
+ {tx_commit_index, []}.
-flush_journal(State = #vqstate { index_state = IndexState }) ->
+handle_pre_hibernate(State = #vqstate { index_state = IndexState }) ->
State #vqstate { index_state =
rabbit_queue_index:flush_journal(IndexState) }.