summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2010-04-16 11:14:08 +0100
committerMatthew Sackman <matthew@lshift.net>2010-04-16 11:14:08 +0100
commite6ab220d0f26546730ce34d126e010028c8d2320 (patch)
treeb947fc534a57ab75e037959c204eed5abd882adc
parent183085e1c386bac6342302a92ea2cf865ebdb8fc (diff)
downloadrabbitmq-server-git-e6ab220d0f26546730ce34d126e010028c8d2320.tar.gz
Thorough reworking of API - the BQ is now responsible for hanging onto unacked msgs and all details of transactions
-rw-r--r--include/rabbit_backing_queue_spec.hrl21
-rw-r--r--src/rabbit_amqqueue.erl17
-rw-r--r--src/rabbit_amqqueue_process.erl356
-rw-r--r--src/rabbit_backing_queue.erl11
-rw-r--r--src/rabbit_channel.erl4
-rw-r--r--src/rabbit_msg_store.erl4
-rw-r--r--src/rabbit_queue_index.erl2
-rw-r--r--src/rabbit_tests.erl10
-rw-r--r--src/rabbit_variable_queue.erl660
9 files changed, 526 insertions, 559 deletions
diff --git a/include/rabbit_backing_queue_spec.hrl b/include/rabbit_backing_queue_spec.hrl
index 3a0f701b85..d86a538288 100644
--- a/include/rabbit_backing_queue_spec.hrl
+++ b/include/rabbit_backing_queue_spec.hrl
@@ -29,22 +29,25 @@
%% Contributor(s): ______________________________________.
%%
+-type(fetch_result() ::
+ %% Message, IsDelivered, AckTag, Remaining_Len
+ ('empty'|{basic_message(), boolean(), ack(), non_neg_integer()})).
+
-spec(start/1 :: ([queue_name()]) -> 'ok').
-spec(init/2 :: (queue_name(), boolean()) -> state()).
-spec(terminate/1 :: (state()) -> state()).
-spec(delete_and_terminate/1 :: (state()) -> state()).
-spec(purge/1 :: (state()) -> {non_neg_integer(), state()}).
-spec(publish/2 :: (basic_message(), state()) -> state()).
--spec(publish_delivered/2 :: (basic_message(), state()) -> {ack(), state()}).
--spec(fetch/1 :: (state()) ->
- {('empty'|{basic_message(), boolean(), ack(), non_neg_integer()}),
- state()}).
+-spec(publish_delivered/3 ::
+ (boolean(), basic_message(), state()) -> {ack(), state()}).
+-spec(fetch/2 :: (boolean(), state()) -> {fetch_result(), state()}).
-spec(ack/2 :: ([ack()], state()) -> state()).
--spec(tx_publish/2 :: (basic_message(), state()) -> state()).
--spec(tx_rollback/2 :: ([guid()], state()) -> state()).
--spec(tx_commit/4 :: ([basic_message()], [ack()], {pid(), any()}, state()) ->
- {boolean(), state()}).
--spec(requeue/2 :: ([{basic_message(), ack()}], state()) -> state()).
+-spec(tx_publish/3 :: (txn(), basic_message(), state()) -> state()).
+-spec(tx_ack/3 :: (txn(), [ack()], state()) -> state()).
+-spec(tx_rollback/2 :: (txn(), state()) -> {[ack()], state()}).
+-spec(tx_commit/3 :: (txn(), {pid(), any()}, state()) -> {[ack()], state()}).
+-spec(requeue/2 :: ([ack()], state()) -> state()).
-spec(len/1 :: (state()) -> non_neg_integer()).
-spec(is_empty/1 :: (state()) -> boolean()).
-spec(set_ram_duration_target/2 ::
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 6125fddcc2..cc6f08b7d4 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -43,7 +43,7 @@
-export([basic_get/3, basic_consume/8, basic_cancel/4]).
-export([notify_sent/2, unblock/2, maybe_run_queue_via_backing_queue/2,
flush_all/2]).
--export([commit_all/2, rollback_all/2, notify_down_all/2, limit_all/3]).
+-export([commit_all/3, rollback_all/3, notify_down_all/2, limit_all/3]).
-export([on_node_down/1]).
-import(mnesia).
@@ -92,8 +92,8 @@
-spec(deliver/2 :: (pid(), delivery()) -> boolean()).
-spec(requeue/3 :: (pid(), [msg_id()], pid()) -> 'ok').
-spec(ack/4 :: (pid(), maybe(txn()), [msg_id()], pid()) -> 'ok').
--spec(commit_all/2 :: ([pid()], txn()) -> ok_or_errors()).
--spec(rollback_all/2 :: ([pid()], txn()) -> ok_or_errors()).
+-spec(commit_all/3 :: ([pid()], txn(), pid()) -> ok_or_errors()).
+-spec(rollback_all/3 :: ([pid()], txn(), pid()) -> ok_or_errors()).
-spec(notify_down_all/2 :: ([pid()], pid()) -> ok_or_errors()).
-spec(limit_all/3 :: ([pid()], pid(), pid() | 'undefined') -> ok_or_errors()).
-spec(claim_queue/2 :: (amqqueue(), pid()) -> 'ok' | 'locked').
@@ -107,8 +107,7 @@
-spec(basic_cancel/4 :: (amqqueue(), pid(), ctag(), any()) -> 'ok').
-spec(notify_sent/2 :: (pid(), pid()) -> 'ok').
-spec(unblock/2 :: (pid(), pid()) -> 'ok').
--spec(maybe_run_queue_via_backing_queue/2 ::
- (pid(), (fun ((A) -> {boolean(), A}))) -> 'ok').
+-spec(maybe_run_queue_via_backing_queue/2 :: (pid(), (fun ((A) -> A))) -> 'ok').
-spec(flush_all/2 :: ([pid()], pid()) -> 'ok').
-spec(internal_declare/2 :: (amqqueue(), boolean()) -> amqqueue()).
-spec(internal_delete/1 :: (queue_name()) -> 'ok' | not_found()).
@@ -298,16 +297,16 @@ requeue(QPid, MsgIds, ChPid) ->
ack(QPid, Txn, MsgIds, ChPid) ->
gen_server2:pcast(QPid, 7, {ack, Txn, MsgIds, ChPid}).
-commit_all(QPids, Txn) ->
+commit_all(QPids, Txn, ChPid) ->
safe_pmap_ok(
fun (QPid) -> exit({queue_disappeared, QPid}) end,
- fun (QPid) -> gen_server2:call(QPid, {commit, Txn}, infinity) end,
+ fun (QPid) -> gen_server2:call(QPid, {commit, Txn, ChPid}, infinity) end,
QPids).
-rollback_all(QPids, Txn) ->
+rollback_all(QPids, Txn, ChPid) ->
safe_pmap_ok(
fun (QPid) -> exit({queue_disappeared, QPid}) end,
- fun (QPid) -> gen_server2:cast(QPid, {rollback, Txn}) end,
+ fun (QPid) -> gen_server2:cast(QPid, {rollback, Txn, ChPid}) end,
QPids).
notify_down_all(QPids, ChPid) ->
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 9697cc1347..efbc276692 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -56,7 +56,6 @@
backing_queue,
backing_queue_state,
backing_queue_timeout_fun,
- next_msg_id,
active_consumers,
blocked_consumers,
sync_timer_ref,
@@ -65,8 +64,6 @@
-record(consumer, {tag, ack_required}).
--record(tx, {ch_pid, pending_messages, pending_acks}).
-
%% These are held in our process dictionary
-record(cr, {consumer_count,
ch_pid,
@@ -88,9 +85,7 @@
exclusive_consumer_tag,
messages_ready,
messages_unacknowledged,
- messages_uncommitted,
messages,
- acks_uncommitted,
consumers,
transactions,
memory,
@@ -122,7 +117,6 @@ init([Q, InitBQ]) ->
backing_queue = BQ,
backing_queue_state = maybe_init_backing_queue(InitBQ, BQ, Q),
backing_queue_timeout_fun = undefined,
- next_msg_id = 1,
active_consumers = queue:new(),
blocked_consumers = queue:new(),
sync_timer_ref = undefined,
@@ -135,49 +129,39 @@ maybe_init_backing_queue(
maybe_init_backing_queue(false, _BQ, _Q) ->
undefined.
-terminate(shutdown, #q{backing_queue_state = BQS,
- backing_queue = BQ}) ->
- ok = rabbit_memory_monitor:deregister(self()),
- case BQS of
- undefined -> ok;
- _ -> BQ:terminate(BQS)
- end;
-terminate({shutdown, _}, #q{backing_queue_state = BQS,
- backing_queue = BQ}) ->
- ok = rabbit_memory_monitor:deregister(self()),
- case BQS of
- undefined -> ok;
- _ -> BQ:terminate(BQS)
- end;
-terminate(_Reason, State = #q{backing_queue_state = BQS,
- backing_queue = BQ}) ->
+terminate(shutdown, State) ->
+ terminate_shutdown(terminate, State);
+terminate({shutdown, _}, State) ->
+ terminate_shutdown(terminate, State);
+terminate(_Reason, State) ->
ok = rabbit_memory_monitor:deregister(self()),
%% FIXME: How do we cancel active subscriptions?
%% Ensure that any persisted tx messages are removed.
%% TODO: wait for all in flight tx_commits to complete
- case BQS of
- undefined ->
- ok;
- _ ->
- BQS1 = BQ:tx_rollback(
- lists:concat([PM || #tx { pending_messages = PM } <-
- all_tx_record()]), BQS),
- %% Delete from disk first. If we crash at this point, when
- %% a durable queue, we will be recreated at startup,
- %% possibly with partial content. The alternative is much
- %% worse however - if we called internal_delete first, we
- %% would then have a race between the disk delete and a
- %% new queue with the same name being created and
- %% published to.
- BQ:delete_and_terminate(BQS1)
- end,
- ok = rabbit_amqqueue:internal_delete(qname(State)).
+ State1 = terminate_shutdown(delete_and_terminate, State),
+ ok = rabbit_amqqueue:internal_delete(qname(State1)).
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
%%----------------------------------------------------------------------------
+terminate_shutdown(Fun, State =
+ #q{backing_queue = BQ, backing_queue_state = BQS}) ->
+ ok = rabbit_memory_monitor:deregister(self()),
+ case BQS of
+ undefined -> State;
+ _ -> BQS1 = lists:foldl(
+ fun (#cr{txn = none}, BQSN) ->
+ BQSN;
+ (#cr{txn = Txn}, BQSN) ->
+ {_AckTags, BQSN1} =
+ BQ:tx_rollback(Txn, BQSN),
+ BQSN1
+ end, BQS, all_ch_record()),
+ State#q{backing_queue_state = BQ:Fun(BQS1)}
+ end.
+
reply(Reply, NewState) ->
assert_invariant(NewState),
{NewState1, Timeout} = next_state(NewState),
@@ -248,7 +232,7 @@ ch_record(ChPid) ->
C = #cr{consumer_count = 0,
ch_pid = ChPid,
monitor_ref = MonitorRef,
- unacked_messages = dict:new(),
+ unacked_messages = [],
is_limit_active = false,
txn = none,
unsent_message_count = 0},
@@ -282,8 +266,7 @@ record_current_channel_tx(ChPid, Txn) ->
deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun}, FunAcc,
State = #q{q = #amqqueue{name = QName},
active_consumers = ActiveConsumers,
- blocked_consumers = BlockedConsumers,
- next_msg_id = NextId}) ->
+ blocked_consumers = BlockedConsumers}) ->
case queue:out(ActiveConsumers) of
{{value, QEntry = {ChPid, #consumer{tag = ConsumerTag,
ack_required = AckRequired}}},
@@ -299,12 +282,11 @@ deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun}, FunAcc,
DeliverFun(AckRequired, FunAcc, State),
rabbit_channel:deliver(
ChPid, ConsumerTag, AckRequired,
- {QName, self(), NextId, IsDelivered, Message}),
- NewUAM =
- case AckRequired of
- true -> dict:store(NextId, {Message, AckTag}, UAM);
- false -> UAM
- end,
+ {QName, self(), AckTag, IsDelivered, Message}),
+ NewUAM = case AckRequired of
+ true -> [AckTag|UAM];
+ false -> UAM
+ end,
NewC = C#cr{unsent_message_count = Count + 1,
unacked_messages = NewUAM},
store_ch_record(NewC),
@@ -322,8 +304,7 @@ deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun}, FunAcc,
end,
State2 = State1#q{
active_consumers = NewActiveConsumers,
- blocked_consumers = NewBlockedConsumers,
- next_msg_id = NextId + 1},
+ blocked_consumers = NewBlockedConsumers},
deliver_msgs_to_consumers(Funs, FunAcc1, State2);
%% if IsMsgReady then we've hit the limiter
false when IsMsgReady ->
@@ -344,50 +325,39 @@ deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun}, FunAcc,
{FunAcc, State}
end.
-deliver_from_queue_pred({IsEmpty, _AutoAcks}, _State) ->
+deliver_from_queue_pred(IsEmpty, _State) ->
not IsEmpty.
-deliver_from_queue_deliver(AckRequired, {false, AutoAcks},
+
+deliver_from_queue_deliver(AckRequired, false,
State = #q{backing_queue_state = BQS,
backing_queue = BQ}) ->
- {{Message, IsDelivered, AckTag, Remaining}, BQS1} = BQ:fetch(BQS),
- AutoAcks1 = case AckRequired of
- true -> AutoAcks;
- false -> [AckTag | AutoAcks]
- end,
- {{Message, IsDelivered, AckTag}, {0 == Remaining, AutoAcks1},
+ {{Message, IsDelivered, AckTag, Remaining}, BQS1} =
+ BQ:fetch(AckRequired, BQS),
+ {{Message, IsDelivered, AckTag}, 0 == Remaining,
State #q { backing_queue_state = BQS1 }}.
run_message_queue(State = #q{backing_queue_state = BQS,
backing_queue = BQ}) ->
- Funs = { fun deliver_from_queue_pred/2,
- fun deliver_from_queue_deliver/3 },
+ Funs = {fun deliver_from_queue_pred/2,
+ fun deliver_from_queue_deliver/3},
IsEmpty = BQ:is_empty(BQS),
- {{_IsEmpty1, AutoAcks}, State1} =
- deliver_msgs_to_consumers(Funs, {IsEmpty, []}, State),
- BQS1 = BQ:ack(AutoAcks, State1 #q.backing_queue_state),
- State1 #q { backing_queue_state = BQS1 }.
+ {_IsEmpty1, State1} = deliver_msgs_to_consumers(Funs, IsEmpty, State),
+ State1.
attempt_delivery(none, _ChPid, Message, State = #q{backing_queue = BQ}) ->
PredFun = fun (IsEmpty, _State) -> not IsEmpty end,
DeliverFun =
- fun (AckRequired, false, State1) ->
- {AckTag, State2} =
- case AckRequired of
- true ->
- {AckTag1, BQS} =
- BQ:publish_delivered(
- Message, State1 #q.backing_queue_state),
- {AckTag1, State1 #q { backing_queue_state = BQS }};
- false ->
- {noack, State1}
- end,
- {{Message, false, AckTag}, true, State2}
+ fun (AckRequired, false, State1 = #q{backing_queue_state = BQS}) ->
+ {AckTag, BQS1} =
+ BQ:publish_delivered(AckRequired, Message, BQS),
+ {{Message, false, AckTag}, true,
+ State1#q{backing_queue_state = BQS1}}
end,
deliver_msgs_to_consumers({ PredFun, DeliverFun }, false, State);
-attempt_delivery(Txn, ChPid, Message, State = #q{backing_queue = BQ}) ->
- BQS = BQ:tx_publish(Message, State #q.backing_queue_state),
- record_pending_message(Txn, ChPid, Message),
- {true, State #q { backing_queue_state = BQS }}.
+attempt_delivery(Txn, ChPid, Message, State = #q{backing_queue = BQ,
+ backing_queue_state = BQS}) ->
+ record_current_channel_tx(ChPid, Txn),
+ {true, State#q{backing_queue_state = BQ:tx_publish(Txn, Message, BQS)}}.
deliver_or_enqueue(Txn, ChPid, Message, State = #q{backing_queue = BQ}) ->
case attempt_delivery(Txn, ChPid, Message, State) of
@@ -396,49 +366,22 @@ deliver_or_enqueue(Txn, ChPid, Message, State = #q{backing_queue = BQ}) ->
{false, NewState} ->
%% Txn is none and no unblocked channels with consumers
BQS = BQ:publish(Message, State #q.backing_queue_state),
- {false, NewState #q { backing_queue_state = BQS }}
+ {false, NewState#q{backing_queue_state = BQS}}
end.
-%% all these messages have already been delivered at least once and
-%% not ack'd, but need to be either redelivered or requeued
-deliver_or_requeue_n([], State) ->
- State;
-deliver_or_requeue_n(MsgsWithAcks, State = #q{backing_queue = BQ}) ->
- Funs = { fun deliver_or_requeue_msgs_pred/2,
- fun deliver_or_requeue_msgs_deliver/3 },
- {{_RemainingLengthMinusOne, AutoAcks, OutstandingMsgs}, NewState} =
- deliver_msgs_to_consumers(
- Funs, {length(MsgsWithAcks), [], MsgsWithAcks}, State),
- BQS = BQ:ack(AutoAcks, NewState #q.backing_queue_state),
- case OutstandingMsgs of
- [] -> NewState #q { backing_queue_state = BQS };
- _ -> BQS1 = BQ:requeue(OutstandingMsgs, BQS),
- NewState #q { backing_queue_state = BQS1 }
- end.
-
-deliver_or_requeue_msgs_pred({Len, _AcksAcc, _MsgsWithAcks}, _State) ->
- 0 < Len.
-deliver_or_requeue_msgs_deliver(
- false, {Len, AcksAcc, [{Message, AckTag} | MsgsWithAcks]}, State) ->
- {{Message, true, noack}, {Len - 1, [AckTag | AcksAcc], MsgsWithAcks},
- State};
-deliver_or_requeue_msgs_deliver(
- true, {Len, AcksAcc, [{Message, AckTag} | MsgsWithAcks]}, State) ->
- {{Message, true, AckTag}, {Len - 1, AcksAcc, MsgsWithAcks}, State}.
+requeue_and_run(AckTags, State = #q{backing_queue = BQ}) ->
+ maybe_run_queue_via_backing_queue(
+ fun (BQS) -> BQ:requeue(AckTags, BQS) end, State).
add_consumer(ChPid, Consumer, Queue) -> queue:in({ChPid, Consumer}, Queue).
remove_consumer(ChPid, ConsumerTag, Queue) ->
- %% TODO: replace this with queue:filter/2 once we move to R12
- queue:from_list(lists:filter(
- fun ({CP, #consumer{tag = CT}}) ->
- (CP /= ChPid) or (CT /= ConsumerTag)
- end, queue:to_list(Queue))).
+ queue:filter(fun ({CP, #consumer{tag = CT}}) ->
+ (CP /= ChPid) or (CT /= ConsumerTag)
+ end, Queue).
remove_consumers(ChPid, Queue) ->
- %% TODO: replace this with queue:filter/2 once we move to R12
- queue:from_list(lists:filter(fun ({CP, _}) -> CP /= ChPid end,
- queue:to_list(Queue))).
+ queue:filter(fun ({CP, _}) -> CP /= ChPid end, Queue).
move_consumers(ChPid, From, To) ->
{Kept, Removed} = lists:partition(fun ({CP, _}) -> CP /= ChPid end,
@@ -489,12 +432,10 @@ handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder}) ->
true -> {stop, State1};
false -> State2 = case Txn of
none -> State1;
- _ -> rollback_transaction(Txn, State1)
+ _ -> rollback_transaction(Txn, ChPid,
+ State1)
end,
- {ok, deliver_or_requeue_n(
- [MsgWithAck ||
- {_MsgId, MsgWithAck} <- dict:to_list(UAM)],
- State2)}
+ {ok, requeue_and_run(UAM, State2)}
end
end.
@@ -526,72 +467,34 @@ maybe_send_reply(ChPid, Msg) -> ok = rabbit_channel:send_command(ChPid, Msg).
qname(#q{q = #amqqueue{name = QName}}) -> QName.
maybe_run_queue_via_backing_queue(Fun, State = #q{backing_queue_state = BQS}) ->
- {RunQueue, BQS1} = Fun(BQS),
- State1 = State#q{backing_queue_state = BQS1},
- case RunQueue of
- true -> run_message_queue(State1);
- false -> State1
- end.
+ run_message_queue(State#q{backing_queue_state = Fun(BQS)}).
-lookup_tx(Txn) ->
- case get({txn, Txn}) of
- undefined -> #tx{ch_pid = none,
- pending_messages = [],
- pending_acks = []};
- V -> V
- end.
-
-store_tx(Txn, Tx) ->
- put({txn, Txn}, Tx).
-
-erase_tx(Txn) ->
- erase({txn, Txn}).
-
-all_tx_record() ->
- [T || {{txn, _}, T} <- get()].
+commit_transaction(Txn, From, ChPid, State = #q{backing_queue = BQ,
+ backing_queue_state = BQS}) ->
+ {AckTags, BQS1} = BQ:tx_commit(Txn, From, BQS),
+ case lookup_ch(ChPid) of
+ not_found ->
+ [];
+ C = #cr{unacked_messages = UAM} ->
+ Remaining = ordsets:to_list(ordsets:subtract(
+ ordsets:from_list(UAM),
+ ordsets:from_list(AckTags))),
+ store_ch_record(C#cr{unacked_messages = Remaining, txn = none})
+ end,
+ State#q{backing_queue_state = BQS1}.
-record_pending_message(Txn, ChPid, Message) ->
- Tx = #tx{pending_messages = Pending} = lookup_tx(Txn),
- record_current_channel_tx(ChPid, Txn),
- store_tx(Txn, Tx#tx{pending_messages = [Message | Pending],
- ch_pid = ChPid}).
+rollback_transaction(Txn, _ChPid, State = #q{backing_queue = BQ,
+ backing_queue_state = BQS}) ->
+ {_AckTags, BQS1} = BQ:tx_rollback(Txn, BQS),
+ %% Iff we removed acktags from the channel record on ack+txn then
+ %% we would add them back in here (would also require ChPid)
+ State#q{backing_queue_state = BQS1}.
-record_pending_acks(Txn, ChPid, MsgIds) ->
- Tx = #tx{pending_acks = Pending} = lookup_tx(Txn),
- record_current_channel_tx(ChPid, Txn),
- store_tx(Txn, Tx#tx{pending_acks = [MsgIds | Pending],
- ch_pid = ChPid}).
-
-commit_transaction(Txn, From, State = #q{backing_queue = BQ}) ->
- #tx{ch_pid = ChPid, pending_messages = PendingMessages,
- pending_acks = PendingAcks} = lookup_tx(Txn),
- PendingMessagesOrdered = lists:reverse(PendingMessages),
- PendingAcksOrdered = lists:append(PendingAcks),
- Acks =
- case lookup_ch(ChPid) of
- not_found ->
- [];
- C = #cr{unacked_messages = UAM} ->
- {MsgsWithAcks, Remaining} =
- collect_messages(PendingAcksOrdered, UAM),
- store_ch_record(C#cr{unacked_messages = Remaining}),
- [AckTag || {_Message, AckTag} <- MsgsWithAcks]
- end,
- {RunQueue, BQS} = BQ:tx_commit(PendingMessagesOrdered, Acks, From,
- State#q.backing_queue_state),
- erase_tx(Txn),
- {RunQueue, State#q{backing_queue_state = BQS}}.
-
-rollback_transaction(Txn, State = #q{backing_queue = BQ}) ->
- #tx{pending_messages = PendingMessages} = lookup_tx(Txn),
- BQS = BQ:tx_rollback(PendingMessages, State #q.backing_queue_state),
- erase_tx(Txn),
- State#q{backing_queue_state = BQS}.
-
-collect_messages(MsgIds, UAM) ->
- lists:mapfoldl(
- fun (MsgId, D) -> {dict:fetch(MsgId, D), dict:erase(MsgId, D)} end,
- UAM, MsgIds).
+collect_messages(AckTags, UAM) ->
+ AckTagsSet = ordsets:from_list(AckTags),
+ UAMSet = ordsets:from_list(UAM),
+ {ordsets:to_list(ordsets:intersection(AckTagsSet, UAMSet)),
+ ordsets:to_list(ordsets:subtract(UAMSet, AckTagsSet))}.
infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items].
@@ -616,22 +519,15 @@ i(exclusive_consumer_tag, #q{exclusive_consumer = {_ChPid, ConsumerTag}}) ->
i(messages_ready, #q{backing_queue_state = BQS, backing_queue = BQ}) ->
BQ:len(BQS);
i(messages_unacknowledged, _) ->
- lists:sum([dict:size(UAM) ||
+ lists:sum([ordsets:size(UAM) ||
#cr{unacked_messages = UAM} <- all_ch_record()]);
-i(messages_uncommitted, _) ->
- lists:sum([length(Pending) ||
- #tx{pending_messages = Pending} <- all_tx_record()]);
i(messages, State) ->
lists:sum([i(Item, State) || Item <- [messages_ready,
- messages_unacknowledged,
- messages_uncommitted]]);
-i(acks_uncommitted, _) ->
- lists:sum([length(Pending) ||
- #tx{pending_acks = Pending} <- all_tx_record()]);
+ messages_unacknowledged]]);
i(consumers, State) ->
queue:len(State#q.active_consumers) + queue:len(State#q.blocked_consumers);
i(transactions, _) ->
- length(all_tx_record());
+ length([ok || #cr{txn = Txn} <- all_ch_record(), Txn =/= none]);
i(memory, _) ->
{memory, M} = process_info(self(), memory),
M;
@@ -685,12 +581,9 @@ handle_call({deliver, Txn, Message, ChPid}, _From, State) ->
{Delivered, NewState} = deliver_or_enqueue(Txn, ChPid, Message, State),
reply(Delivered, NewState);
-handle_call({commit, Txn}, From, State) ->
- {RunQueue, NewState} = commit_transaction(Txn, From, State),
- noreply(case RunQueue of
- true -> run_message_queue(NewState);
- false -> NewState
- end);
+handle_call({commit, Txn, ChPid}, From, State) ->
+ NewState = commit_transaction(Txn, From, ChPid, State),
+ noreply(run_message_queue(NewState));
handle_call({notify_down, ChPid}, _From, State) ->
%% we want to do this synchronously, so that auto_deleted queues
@@ -704,25 +597,19 @@ handle_call({notify_down, ChPid}, _From, State) ->
end;
handle_call({basic_get, ChPid, NoAck}, _From,
- State = #q{q = #amqqueue{name = QName}, next_msg_id = NextId,
+ State = #q{q = #amqqueue{name = QName},
backing_queue_state = BQS, backing_queue = BQ}) ->
- case BQ:fetch(BQS) of
- {empty, BQS1} -> reply(empty, State #q { backing_queue_state = BQS1 });
+ AckRequired = not NoAck,
+ case BQ:fetch(AckRequired, BQS) of
+ {empty, BQS1} -> reply(empty, State#q{backing_queue_state = BQS1});
{{Message, IsDelivered, AckTag, Remaining}, BQS1} ->
- AckRequired = not(NoAck),
- BQS2 =
- case AckRequired of
- true ->
- C = #cr{unacked_messages = UAM} = ch_record(ChPid),
- NewUAM = dict:store(NextId, {Message, AckTag}, UAM),
- store_ch_record(C#cr{unacked_messages = NewUAM}),
- BQS1;
- false ->
- BQ:ack([AckTag], BQS1)
- end,
- Msg = {QName, self(), NextId, IsDelivered, Message},
- reply({ok, Remaining, Msg},
- State #q { next_msg_id = NextId + 1, backing_queue_state = BQS2 })
+ case AckRequired of
+ true -> C = #cr{unacked_messages = UAM} = ch_record(ChPid),
+ store_ch_record(C#cr{unacked_messages = [AckTag|UAM]});
+ false -> ok
+ end,
+ reply({ok, Remaining, {QName, self(), AckTag, IsDelivered, Message}},
+ State#q{backing_queue_state = BQS1})
end;
handle_call({basic_consume, NoAck, ReaderPid, ChPid, LimiterPid,
@@ -740,7 +627,7 @@ handle_call({basic_consume, NoAck, ReaderPid, ChPid, LimiterPid,
ok ->
C = #cr{consumer_count = ConsumerCount} = ch_record(ChPid),
Consumer = #consumer{tag = ConsumerTag,
- ack_required = not(NoAck)},
+ ack_required = not NoAck},
store_ch_record(C#cr{consumer_count = ConsumerCount +1,
limiter_pid = LimiterPid}),
case ConsumerCount of
@@ -862,37 +749,36 @@ handle_cast({deliver, Txn, Message, ChPid}, State) ->
{_Delivered, NewState} = deliver_or_enqueue(Txn, ChPid, Message, State),
noreply(NewState);
-handle_cast({ack, Txn, MsgIds, ChPid}, State = #q{backing_queue = BQ}) ->
+handle_cast({ack, Txn, AckTags, ChPid}, State = #q{backing_queue_state = BQS,
+ backing_queue = BQ}) ->
case lookup_ch(ChPid) of
not_found ->
noreply(State);
C = #cr{unacked_messages = UAM} ->
- case Txn of
- none ->
- {MsgWithAcks, Remaining} = collect_messages(MsgIds, UAM),
- BQS = BQ:ack([AckTag || {_Message, AckTag} <- MsgWithAcks],
- State #q.backing_queue_state),
- store_ch_record(C#cr{unacked_messages = Remaining}),
- noreply(State #q { backing_queue_state = BQS });
- _ ->
- record_pending_acks(Txn, ChPid, MsgIds),
- noreply(State)
- end
+ {AckTags1, Remaining} = collect_messages(AckTags, UAM),
+ {C1, BQS1} =
+ case Txn of
+ none -> {C#cr{unacked_messages = Remaining},
+ BQ:ack(AckTags1, BQS)};
+ _ -> {C#cr{txn = Txn}, BQ:tx_ack(Txn, AckTags1, BQS)}
+ end,
+ store_ch_record(C1),
+ noreply(State #q { backing_queue_state = BQS1 })
end;
-handle_cast({rollback, Txn}, State) ->
- noreply(rollback_transaction(Txn, State));
+handle_cast({rollback, Txn, ChPid}, State) ->
+ noreply(rollback_transaction(Txn, ChPid, State));
-handle_cast({requeue, MsgIds, ChPid}, State) ->
+handle_cast({requeue, AckTags, ChPid}, State) ->
case lookup_ch(ChPid) of
not_found ->
rabbit_log:warning("Ignoring requeue from unknown ch: ~p~n",
[ChPid]),
noreply(State);
C = #cr{unacked_messages = UAM} ->
- {MsgWithAcks, NewUAM} = collect_messages(MsgIds, UAM),
- store_ch_record(C#cr{unacked_messages = NewUAM}),
- noreply(deliver_or_requeue_n(MsgWithAcks, State))
+ {AckTags1, Remaining} = collect_messages(AckTags, UAM),
+ store_ch_record(C#cr{unacked_messages = Remaining}),
+ noreply(requeue_and_run(AckTags1, State))
end;
handle_cast({unblock, ChPid}, State) ->
diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl
index cc6fda55fb..8e7de95e1b 100644
--- a/src/rabbit_backing_queue.erl
+++ b/src/rabbit_backing_queue.erl
@@ -63,17 +63,20 @@ behaviour_info(callbacks) ->
%% Called for messages which have already been passed straight
%% out to a client. The queue will be empty for these calls
%% (i.e. saves the round trip through the backing queue).
- {publish_delivered, 2},
+ {publish_delivered, 3},
%% Produce the next message
- {fetch, 1},
+ {fetch, 2},
%% Acktags supplied are for messages which can now be forgotten
%% about
{ack, 2},
%% A publish, but in the context of a transaction.
- {tx_publish, 2},
+ {tx_publish, 3},
+
+ %% Acks, but in the context of a transaction.
+ {tx_ack, 3},
%% Undo anything which has been done by the tx_publish of the
%% indicated messages.
@@ -81,7 +84,7 @@ behaviour_info(callbacks) ->
%% Commit these publishes and acktags. The publishes you will
%% have previously seen in calls to tx_publish.
- {tx_commit, 4},
+ {tx_commit, 3},
%% Reinsert messages into the queue which have already been
%% delivered and were pending acknowledgement.
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 9aeb4623f1..7d3cd7225d 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -928,7 +928,7 @@ new_tx(State) ->
internal_commit(State = #ch{transaction_id = TxnKey,
tx_participants = Participants}) ->
case rabbit_amqqueue:commit_all(sets:to_list(Participants),
- TxnKey) of
+ TxnKey, self()) of
ok -> ok = notify_limiter(State#ch.limiter_pid,
State#ch.uncommitted_ack_q),
new_tx(State);
@@ -945,7 +945,7 @@ internal_rollback(State = #ch{transaction_id = TxnKey,
queue:len(UAQ),
queue:len(UAMQ)]),
case rabbit_amqqueue:rollback_all(sets:to_list(Participants),
- TxnKey) of
+ TxnKey, self()) of
ok -> NewUAMQ = queue:join(UAQ, UAMQ),
new_tx(State#ch{unacked_message_q = NewUAMQ});
{error, Errors} -> rabbit_misc:protocol_error(
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index 4ac4a16edc..74fa098039 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -338,10 +338,12 @@ read(Server, Guid, CState =
end.
contains(Server, Guid) -> gen_server2:call(Server, {contains, Guid}, infinity).
+remove(_Server, []) -> ok;
remove(Server, Guids) -> gen_server2:cast(Server, {remove, Guids}).
+release(_Server, []) -> ok;
release(Server, Guids) -> gen_server2:cast(Server, {release, Guids}).
sync(Server, Guids, K) -> gen_server2:cast(Server, {sync, Guids, K}).
-sync(Server) -> gen_server2:pcast(Server, 8, sync). %% internal
+sync(Server) -> gen_server2:pcast(Server, 8, sync). %% internal
gc_done(Server, Reclaimed, Source, Destination) ->
gen_server2:pcast(Server, 8, {gc_done, Reclaimed, Source, Destination}).
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index f678433913..d6ef0cb8b9 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -344,6 +344,8 @@ write_delivered(SeqId, State) ->
JournalHdl, <<?DEL_JPREFIX:?JPREFIX_BITS, SeqId:?SEQ_BITS>>),
maybe_flush_journal(add_to_journal(SeqId, del, State1)).
+write_acks([], State) ->
+ State;
write_acks(SeqIds, State) ->
{JournalHdl, State1} = get_journal_handle(State),
ok = file_handle_cache:append(
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index c772271f01..4bef843596 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -1362,7 +1362,7 @@ variable_queue_fetch(Count, IsPersistent, IsDelivered, Len, VQ) ->
Rem = Len - N,
{{#basic_message { is_persistent = IsPersistent },
IsDelivered, AckTagN, Rem}, VQM} =
- rabbit_variable_queue:fetch(VQN),
+ rabbit_variable_queue:fetch(true, VQN),
{VQM, [AckTagN | AckTagsAcc]}
end, {VQ, []}, lists:seq(1, Count)).
@@ -1399,7 +1399,7 @@ test_variable_queue_dynamic_duration_change() ->
VQ3 = test_variable_queue_dynamic_duration_change_f(Len1, VQ2),
{VQ4, AckTags} = variable_queue_fetch(Len1, false, false, Len1, VQ3),
VQ5 = rabbit_variable_queue:ack(AckTags, VQ4),
- {empty, VQ6} = rabbit_variable_queue:fetch(VQ5),
+ {empty, VQ6} = rabbit_variable_queue:fetch(true, VQ5),
%% just publish and fetch some persistent msgs, this hits the the
%% partial segment path in queue_index due to the period when
@@ -1408,7 +1408,7 @@ test_variable_queue_dynamic_duration_change() ->
{VQ8, AckTags1} = variable_queue_fetch(20, true, false, 20, VQ7),
VQ9 = rabbit_variable_queue:ack(AckTags1, VQ8),
VQ10 = rabbit_variable_queue:handle_pre_hibernate(VQ9),
- {empty, VQ11} = rabbit_variable_queue:fetch(VQ10),
+ {empty, VQ11} = rabbit_variable_queue:fetch(true, VQ10),
rabbit_variable_queue:terminate(VQ11),
@@ -1416,7 +1416,7 @@ test_variable_queue_dynamic_duration_change() ->
test_variable_queue_dynamic_duration_change_f(Len, VQ0) ->
VQ1 = variable_queue_publish(false, 1, VQ0),
- {{_Msg, false, AckTag, Len}, VQ2} = rabbit_variable_queue:fetch(VQ1),
+ {{_Msg, false, AckTag, Len}, VQ2} = rabbit_variable_queue:fetch(true, VQ1),
VQ3 = rabbit_variable_queue:ack([AckTag], VQ2),
receive
{duration, _, stop} ->
@@ -1475,7 +1475,7 @@ test_variable_queue_partial_segments_delta_thing() ->
HalfSegment + 1, VQ6),
VQ8 = rabbit_variable_queue:ack(AckTags ++ AckTags1, VQ7),
%% should be empty now
- {empty, VQ9} = rabbit_variable_queue:fetch(VQ8),
+ {empty, VQ9} = rabbit_variable_queue:fetch(true, VQ8),
rabbit_variable_queue:terminate(VQ9),
passed.
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 60e50800d4..90e1eb6c2a 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -31,11 +31,11 @@
-module(rabbit_variable_queue).
--export([init/2, terminate/1, publish/2, publish_delivered/2,
- set_ram_duration_target/2, ram_duration/1,
- fetch/1, ack/2, len/1, is_empty/1, purge/1,
- delete_and_terminate/1, requeue/2, tx_publish/2, tx_rollback/2,
- tx_commit/4, sync_callback/1, handle_pre_hibernate/1, status/1]).
+-export([init/2, terminate/1, publish/2, publish_delivered/3,
+ set_ram_duration_target/2, ram_duration/1, fetch/2, ack/2, len/1,
+ is_empty/1, purge/1, delete_and_terminate/1, requeue/2, tx_publish/3,
+ tx_ack/3, tx_rollback/2, tx_commit/3, sync_callback/1,
+ handle_pre_hibernate/1, status/1]).
-export([start/1]).
@@ -160,13 +160,14 @@
msg_store_clients,
persistent_store,
persistent_count,
- transient_threshold
+ transient_threshold,
+ pending_ack
}).
-record(msg_status,
- { msg,
+ { seq_id,
guid,
- seq_id,
+ msg,
is_persistent,
is_delivered,
msg_on_disk,
@@ -179,6 +180,8 @@
end_seq_id %% note the end_seq_id is always >, not >=
}).
+-record(tx, {pending_messages, pending_acks}).
+
%% When we discover, on publish, that we should write some indices to
%% disk for some betas, the RAM_INDEX_BATCH_SIZE sets the number of
%% betas that we must be due to write indices for before we do any
@@ -198,8 +201,7 @@
-type(bpqueue() :: any()).
-type(seq_id() :: non_neg_integer()).
--type(ack() :: {'ack_index_and_store', guid(), seq_id(), atom() | pid()}
- | 'ack_not_on_disk').
+-type(ack() :: {'ack', seq_id(), guid(), boolean()} | 'blank_ack').
-type(delta() :: #delta { start_seq_id :: non_neg_integer(),
count :: non_neg_integer (),
@@ -234,9 +236,8 @@
}).
-spec(tx_commit_post_msg_store/5 ::
- (boolean(), [guid()], [ack()], {pid(), any()}, state()) ->
- {boolean(), state()}).
--spec(tx_commit_index/1 :: (state()) -> {boolean(), state()}).
+ (boolean(), [guid()], [ack()], {pid(), any()}, state()) -> state()).
+-spec(tx_commit_index/1 :: (state()) -> state()).
-include("rabbit_backing_queue_spec.hrl").
@@ -313,7 +314,8 @@ init(QueueName, IsDurable) ->
{rabbit_msg_store:client_init(?TRANSIENT_MSG_STORE, TRef), TRef}},
persistent_store = PersistentStore,
persistent_count = DeltaCount1,
- transient_threshold = NextSeqId
+ transient_threshold = NextSeqId,
+ pending_ack = dict:new()
},
maybe_deltas_to_betas(State).
@@ -327,157 +329,185 @@ terminate(State = #vqstate {
{persistent_count, PCount}],
State #vqstate { index_state = rabbit_queue_index:terminate(Terms, IndexState) }.
+%% the only difference between purge and delete is that delete also
+%% needs to delete everything that's been delivered and not ack'd.
+delete_and_terminate(State) ->
+ {_PurgeCount, State1 = #vqstate {
+ index_state = IndexState,
+ msg_store_clients = {{MSCStateP, PRef}, {MSCStateT, TRef}},
+ persistent_store = PersistentStore,
+ transient_threshold = TransientThreshold }} =
+ purge(State),
+ %% flushing here is good because it deletes all full segments,
+ %% leaving only partial segments around.
+ IndexState1 = rabbit_queue_index:flush_journal(IndexState),
+ IndexState2 =
+ case rabbit_queue_index:find_lowest_seq_id_seg_and_next_seq_id(
+ IndexState1) of
+ {N, N, IndexState3} ->
+ IndexState3;
+ {DeltaSeqId, NextSeqId, IndexState3} ->
+ {_DeleteCount, IndexState4} =
+ delete1(PersistentStore, TransientThreshold, NextSeqId, 0,
+ DeltaSeqId, IndexState3),
+ IndexState4
+ end,
+ IndexState5 = rabbit_queue_index:terminate_and_erase(IndexState2),
+ rabbit_msg_store:delete_client(PersistentStore, PRef),
+ rabbit_msg_store:delete_client(?TRANSIENT_MSG_STORE, TRef),
+ rabbit_msg_store:client_terminate(MSCStateP),
+ rabbit_msg_store:client_terminate(MSCStateT),
+ State1 #vqstate { index_state = IndexState5 }.
+
+purge(State = #vqstate { q4 = Q4, index_state = IndexState, len = Len,
+ persistent_store = PersistentStore }) ->
+ {Q4Count, IndexState1} =
+ remove_queue_entries(PersistentStore, fun rabbit_misc:queue_fold/3,
+ Q4, IndexState),
+ {Len, State1} =
+ purge1(Q4Count, State #vqstate { index_state = IndexState1,
+ q4 = queue:new() }),
+ {Len, State1 #vqstate { len = 0, ram_msg_count = 0, ram_index_count = 0,
+ persistent_count = 0 }}.
+
publish(Msg, State) ->
State1 = limit_ram_index(State),
{_SeqId, State2} = publish(Msg, false, false, State1),
State2.
-publish_delivered(Msg = #basic_message { guid = Guid,
- is_persistent = IsPersistent },
+publish_delivered(false, _Msg, State = #vqstate { len = 0 }) ->
+ {blank_ack, State};
+publish_delivered(true, Msg = #basic_message { guid = Guid,
+ is_persistent = IsPersistent },
State = #vqstate { len = 0, index_state = IndexState,
next_seq_id = SeqId,
out_counter = OutCount,
in_counter = InCount,
msg_store_clients = MSCState,
persistent_store = PersistentStore,
- persistent_count = PCount }) ->
- State1 = State #vqstate { out_counter = OutCount + 1,
- in_counter = InCount + 1 },
+ persistent_count = PCount,
+ pending_ack = PA }) ->
MsgStatus = #msg_status {
msg = Msg, guid = Guid, seq_id = SeqId, is_persistent = IsPersistent,
is_delivered = true, msg_on_disk = false, index_on_disk = false },
{MsgStatus1, MSCState1} = maybe_write_msg_to_disk(PersistentStore, false,
MsgStatus, MSCState),
- State2 = State1 #vqstate { msg_store_clients = MSCState1,
- persistent_count = PCount + case IsPersistent of
- true -> 1;
- false -> 0
- end },
- case MsgStatus1 #msg_status.msg_on_disk of
- true ->
- {#msg_status { index_on_disk = true }, IndexState1} =
- maybe_write_index_to_disk(false, MsgStatus1, IndexState),
- {{ack_index_and_store, Guid, SeqId,
- find_msg_store(IsPersistent, PersistentStore)},
- State2 #vqstate { index_state = IndexState1,
- next_seq_id = SeqId + 1 }};
- false ->
- {ack_not_on_disk, State2}
- end.
-
-set_ram_duration_target(
- DurationTarget, State = #vqstate { avg_egress_rate = AvgEgressRate,
- avg_ingress_rate = AvgIngressRate,
- target_ram_msg_count = TargetRamMsgCount
- }) ->
- Rate = AvgEgressRate + AvgIngressRate,
- TargetRamMsgCount1 =
- case DurationTarget of
- infinity -> undefined;
- undefined -> undefined;
- _ -> trunc(DurationTarget * Rate) %% msgs = sec * msgs/sec
- end,
- State1 = State #vqstate { target_ram_msg_count = TargetRamMsgCount1,
- duration_target = DurationTarget },
- case TargetRamMsgCount1 == undefined orelse
- TargetRamMsgCount1 >= TargetRamMsgCount of
- true -> State1;
- false -> reduce_memory_use(State1)
- end.
-
-ram_duration(State = #vqstate { egress_rate = Egress,
- ingress_rate = Ingress,
- rate_timestamp = Timestamp,
- in_counter = InCount,
- out_counter = OutCount,
- ram_msg_count = RamMsgCount,
- duration_target = DurationTarget,
- ram_msg_count_prev = RamMsgCountPrev }) ->
- Now = now(),
- {AvgEgressRate, Egress1} = update_rate(Now, Timestamp, OutCount, Egress),
- {AvgIngressRate, Ingress1} = update_rate(Now, Timestamp, InCount, Ingress),
-
- Duration = %% msgs / (msgs/sec) == sec
- case AvgEgressRate == 0 andalso AvgIngressRate == 0 of
- true -> infinity;
- false -> (RamMsgCountPrev + RamMsgCount) /
- (2 * (AvgEgressRate + AvgIngressRate))
- end,
-
- {Duration, set_ram_duration_target(
- DurationTarget,
- State #vqstate { egress_rate = Egress1,
- avg_egress_rate = AvgEgressRate,
- ingress_rate = Ingress1,
- avg_ingress_rate = AvgIngressRate,
- rate_timestamp = Now,
- ram_msg_count_prev = RamMsgCount,
- out_counter = 0, in_counter = 0 })}.
-
-fetch(State =
+ State1 = State #vqstate { msg_store_clients = MSCState1,
+ persistent_count = PCount + case IsPersistent of
+ true -> 1;
+ false -> 0
+ end,
+ next_seq_id = SeqId + 1,
+ out_counter = OutCount + 1,
+ in_counter = InCount + 1 },
+ AckTag = {ack, SeqId, Guid, IsPersistent},
+ {AckTag,
+ case MsgStatus1 #msg_status.msg_on_disk of
+ true ->
+ {#msg_status { index_on_disk = true }, IndexState1} =
+ maybe_write_index_to_disk(false, MsgStatus1, IndexState),
+ State1 #vqstate { index_state = IndexState1 };
+ false ->
+ State1 #vqstate { pending_ack =
+ dict:store(AckTag, MsgStatus1, PA) }
+ end}.
+
+fetch(AckRequired, State =
#vqstate { q4 = Q4, ram_msg_count = RamMsgCount, out_counter = OutCount,
index_state = IndexState, len = Len,
- persistent_store = PersistentStore }) ->
+ persistent_store = PersistentStore, pending_ack = PA }) ->
case queue:out(Q4) of
{empty, _Q4} ->
- fetch_from_q3_or_delta(State);
- {{value, #msg_status {
+ fetch_from_q3_or_delta(AckRequired, State);
+ {{value, MsgStatus = #msg_status {
msg = Msg, guid = Guid, seq_id = SeqId,
is_persistent = IsPersistent, is_delivered = IsDelivered,
msg_on_disk = MsgOnDisk, index_on_disk = IndexOnDisk }},
Q4a} ->
- {IndexState1, IsPersistent} =
- case IndexOnDisk of
- true ->
- IndexState2 =
- case IsDelivered of
- false -> rabbit_queue_index:write_delivered(
- SeqId, IndexState);
- true -> IndexState
- end,
- {case IsPersistent of
- true -> IndexState2;
- false -> rabbit_queue_index:write_acks(
- [SeqId], IndexState2)
- end, IsPersistent};
- false -> %% If index isn't on disk, we can't be persistent
- {IndexState, false}
- end,
+
+ AckTag = case AckRequired of
+ true -> {ack, SeqId, Guid, IsPersistent};
+ false -> blank_ack
+ end,
+
+ %% 1. Mark it delivered if necessary
+ IndexState1 = case IndexOnDisk andalso not IsDelivered of
+ true -> rabbit_queue_index:write_delivered(
+ SeqId, IndexState);
+ false -> IndexState
+ end,
+
+ %% 2. If it's on disk and there's no Ack required, remove it
MsgStore = find_msg_store(IsPersistent, PersistentStore),
- AckTag =
- case IsPersistent of
- true -> true = MsgOnDisk, %% ASSERTION
- {ack_index_and_store, Guid, SeqId, MsgStore};
- false -> ok = case MsgOnDisk of
- true ->
- rabbit_msg_store:remove(
- MsgStore, [Guid]);
- false -> ok
- end,
- ack_not_on_disk
+ IndexState2 =
+ case MsgOnDisk andalso not AckRequired of
+ true -> %% Remove from disk now
+ case IndexOnDisk of
+ true ->
+ ok = rabbit_msg_store:remove(MsgStore, [Guid]),
+ rabbit_queue_index:write_acks([SeqId],
+ IndexState1);
+ false ->
+ ok = case MsgOnDisk of
+ true -> rabbit_msg_store:remove(
+ MsgStore, [Guid]);
+ false -> ok
+ end,
+ IndexState1
+ end;
+ false ->
+ IndexState1
+ end,
+
+ %% 3. If it's on disk, not persistent and an ack's
+ %% required then remove it from the queue index only.
+ IndexState3 =
+ case IndexOnDisk andalso AckRequired andalso not IsPersistent of
+ true -> rabbit_queue_index:write_acks([SeqId], IndexState2);
+ false -> IndexState2
end,
+
+ %% 4. If it's not on disk and we need an Ack, add it to PA
+ PA1 = case AckRequired andalso not MsgOnDisk of
+ true -> dict:store(AckTag, MsgStatus #msg_status {
+ is_delivered = true }, PA);
+ false -> PA
+ end,
+
Len1 = Len - 1,
{{Msg, IsDelivered, AckTag, Len1},
State #vqstate { q4 = Q4a, out_counter = OutCount + 1,
ram_msg_count = RamMsgCount - 1,
- index_state = IndexState1, len = Len1 }}
+ index_state = IndexState3, len = Len1,
+ pending_ack = PA1 }}
end.
ack([], State) ->
State;
ack(AckTags, State = #vqstate { index_state = IndexState,
persistent_count = PCount,
- persistent_store = PersistentStore }) ->
- {GuidsByStore, SeqIds} =
+ persistent_store = PersistentStore,
+ pending_ack = PA }) ->
+ {GuidsByStore, SeqIds, PA1} =
lists:foldl(
- fun (ack_not_on_disk, Acc) -> Acc;
- ({ack_index_and_store, Guid, SeqId, MsgStore}, {Dict, SeqIds}) ->
- {rabbit_misc:dict_cons(MsgStore, Guid, Dict), [SeqId | SeqIds]}
- end, {dict:new(), []}, AckTags),
- IndexState1 = case SeqIds of
- [] -> IndexState;
- _ -> rabbit_queue_index:write_acks(SeqIds, IndexState)
- end,
+ fun (blank_ack, Acc) -> Acc;
+ ({ack, SeqId, Guid, true}, {Dict, SeqIds, PAN}) ->
+ {rabbit_misc:dict_cons(PersistentStore, Guid, Dict),
+ [SeqId | SeqIds], PAN};
+ ({ack, _SeqId, Guid, false} = AckTag, {Dict, SeqIds, PAN}) ->
+ case dict:find(AckTag, PAN) of
+ error ->
+ %% must be in the transient store and won't
+ %% be in the queue index.
+ {rabbit_misc:dict_cons(
+ ?TRANSIENT_MSG_STORE, Guid, Dict), SeqIds, PAN};
+ {ok, #msg_status { index_on_disk = false, %% ASSERTIONS
+ msg_on_disk = false,
+ is_persistent = false }} ->
+ {Dict, SeqIds, dict:erase(AckTag, PAN)}
+ end
+ end, {dict:new(), [], PA}, AckTags),
+ IndexState1 = rabbit_queue_index:write_acks(SeqIds, IndexState),
ok = dict:fold(fun (MsgStore, Guids, ok) ->
rabbit_msg_store:remove(MsgStore, Guids)
end, ok, GuidsByStore),
@@ -485,90 +515,104 @@ ack(AckTags, State = #vqstate { index_state = IndexState,
error -> 0;
{ok, Guids} -> length(Guids)
end,
- State #vqstate { index_state = IndexState1, persistent_count = PCount1 }.
+ State #vqstate { index_state = IndexState1, persistent_count = PCount1,
+ pending_ack = PA1 }.
-len(#vqstate { len = Len }) ->
- Len.
+tx_publish(Txn,
+ Msg = #basic_message { is_persistent = true, guid = Guid },
+ State = #vqstate { msg_store_clients = MSCState,
+ persistent_store = PersistentStore }) ->
+ MsgStatus = #msg_status {
+ msg = Msg, guid = Guid, seq_id = undefined, is_persistent = true,
+ is_delivered = false, msg_on_disk = false, index_on_disk = false },
+ {#msg_status { msg_on_disk = true }, MSCState1} =
+ maybe_write_msg_to_disk(PersistentStore, false, MsgStatus, MSCState),
+ publish_in_tx(Txn, Msg),
+ State #vqstate { msg_store_clients = MSCState1 };
+tx_publish(Txn, Msg, State) ->
+ publish_in_tx(Txn, Msg),
+ State.
-is_empty(State) ->
- 0 == len(State).
+tx_ack(Txn, AckTags, State) ->
+ ack_in_tx(Txn, AckTags),
+ State.
-purge(State = #vqstate { q4 = Q4, index_state = IndexState, len = Len,
- persistent_store = PersistentStore }) ->
- {Q4Count, IndexState1} =
- remove_queue_entries(PersistentStore, fun rabbit_misc:queue_fold/3,
- Q4, IndexState),
- {Len, State1} =
- purge1(Q4Count, State #vqstate { index_state = IndexState1,
- q4 = queue:new() }),
- {Len, State1 #vqstate { len = 0, ram_msg_count = 0, ram_index_count = 0,
- persistent_count = 0 }}.
+tx_rollback(Txn, State = #vqstate { persistent_store = PersistentStore }) ->
+ #tx { pending_acks = AckTags, pending_messages = Pubs } = lookup_tx(Txn),
+ erase_tx(Txn),
+ ok = rabbit_msg_store:remove(PersistentStore, persistent_guids(Pubs)),
+ {lists:flatten(AckTags), State}.
-%% the only difference between purge and delete is that delete also
-%% needs to delete everything that's been delivered and not ack'd.
-delete_and_terminate(State) ->
- {_PurgeCount, State1 = #vqstate {
- index_state = IndexState,
- msg_store_clients = {{MSCStateP, PRef}, {MSCStateT, TRef}},
- persistent_store = PersistentStore,
- transient_threshold = TransientThreshold }} =
- purge(State),
- %% flushing here is good because it deletes all full segments,
- %% leaving only partial segments around.
- IndexState1 = rabbit_queue_index:flush_journal(IndexState),
- IndexState2 =
- case rabbit_queue_index:find_lowest_seq_id_seg_and_next_seq_id(
- IndexState1) of
- {N, N, IndexState3} ->
- IndexState3;
- {DeltaSeqId, NextSeqId, IndexState3} ->
- {_DeleteCount, IndexState4} =
- delete1(PersistentStore, TransientThreshold, NextSeqId, 0,
- DeltaSeqId, IndexState3),
- IndexState4
- end,
- IndexState5 = rabbit_queue_index:terminate_and_erase(IndexState2),
- rabbit_msg_store:delete_client(PersistentStore, PRef),
- rabbit_msg_store:delete_client(?TRANSIENT_MSG_STORE, TRef),
- rabbit_msg_store:client_terminate(MSCStateP),
- rabbit_msg_store:client_terminate(MSCStateT),
- State1 #vqstate { index_state = IndexState5 }.
+tx_commit(Txn, From, State = #vqstate { persistent_store = PersistentStore }) ->
+ %% If we are a non-durable queue, or we have no persistent pubs,
+ %% we can skip the msg_store loop.
+ #tx { pending_acks = AckTags, pending_messages = Pubs } = lookup_tx(Txn),
+ erase_tx(Txn),
+ PubsOrdered = lists:reverse(Pubs),
+ AckTags1 = lists:flatten(AckTags),
+ PersistentGuids = persistent_guids(PubsOrdered),
+ IsTransientPubs = [] == PersistentGuids,
+ {AckTags1,
+ case IsTransientPubs orelse
+ ?TRANSIENT_MSG_STORE == PersistentStore of
+ true ->
+ tx_commit_post_msg_store(
+ IsTransientPubs, PubsOrdered, AckTags1, From, State);
+ false ->
+ Self = self(),
+ ok =
+ rabbit_msg_store:sync(
+ ?PERSISTENT_MSG_STORE, PersistentGuids,
+ fun () ->
+ ok =
+ rabbit_amqqueue:maybe_run_queue_via_backing_queue(
+ Self,
+ fun (StateN) -> tx_commit_post_msg_store(
+ IsTransientPubs, PubsOrdered,
+ AckTags1, From, StateN)
+ end)
+ end),
+ State
+ end}.
-%% [{Msg, AckTag}]
-%% We guarantee that after fetch, only persistent msgs are left on
-%% disk. This means that in a requeue, we set MsgOnDisk to true, thus
-%% avoiding calls to msg_store:write for persistent msgs. It also
-%% means that we don't need to worry about calling msg_store:remove
-%% (as ack would do) because transient msgs won't be on disk anyway,
-%% thus they won't need to be removed. However, we do call
-%% msg_store:release so that the cache isn't held full of msgs which
-%% are now at the tail of the queue.
-requeue(MsgsWithAckTags, State) ->
+requeue(AckTags, State = #vqstate { persistent_store = PersistentStore,
+ pending_ack = PA }) ->
{SeqIds, GuidsByStore,
State1 = #vqstate { index_state = IndexState,
- persistent_count = PCount,
- persistent_store = PersistentStore }} =
+ persistent_count = PCount }} =
lists:foldl(
- fun ({Msg = #basic_message { guid = Guid }, AckTag},
- {SeqIdsAcc, Dict, StateN}) ->
- {SeqIdsAcc1, Dict1, MsgOnDisk} =
- case AckTag of
- ack_not_on_disk ->
- {SeqIdsAcc, Dict, false};
- {ack_index_and_store, Guid, SeqId, MsgStore} ->
- {[SeqId | SeqIdsAcc],
- rabbit_misc:dict_cons(MsgStore, Guid, Dict),
- true}
- end,
- {_SeqId, StateN1} =
- publish(Msg, true, MsgOnDisk, StateN),
- {SeqIdsAcc1, Dict1, StateN1}
- end, {[], dict:new(), State}, MsgsWithAckTags),
- IndexState1 =
- case SeqIds of
- [] -> IndexState;
- _ -> rabbit_queue_index:write_acks(SeqIds, IndexState)
- end,
+ fun ({ack, SeqId, Guid, IsPersistent} = AckTag,
+ {SeqIdsAcc, Dict, StateN = #vqstate {
+ msg_store_clients = MSCStateN }}) ->
+ case dict:find(AckTag, PA) of
+ error ->
+ {{ok, Msg = #basic_message{}}, MSCStateN1} =
+ read_from_msg_store(PersistentStore, MSCStateN,
+ IsPersistent, Guid),
+ StateN1 = StateN #vqstate {
+ msg_store_clients = MSCStateN1 },
+ {_SeqId, StateN2} = publish(Msg, true, true, StateN1),
+ {SeqIdsAcc1, MsgStore} =
+ case IsPersistent of
+ true ->
+ {[SeqId | SeqIdsAcc], PersistentStore};
+ false ->
+ {SeqIdsAcc, ?TRANSIENT_MSG_STORE}
+ end,
+ {SeqIdsAcc1,
+ rabbit_misc:dict_cons(MsgStore, Guid, Dict),
+ StateN2};
+ {ok, #msg_status { index_on_disk = false,
+ msg_on_disk = false,
+ is_persistent = false,
+ msg = Msg }} ->
+ {_SeqId, StateN1} = publish(Msg, true, false, StateN),
+ {SeqIdsAcc, Dict,
+ StateN1 #vqstate {
+ pending_ack = dict:erase(AckTag, PA) }}
+ end
+ end, {[], dict:new(), State}, AckTags),
+ IndexState1 = rabbit_queue_index:write_acks(SeqIds, IndexState),
ok = dict:fold(fun (MsgStore, Guids, ok) ->
rabbit_msg_store:release(MsgStore, Guids)
end, ok, GuidsByStore),
@@ -579,98 +623,60 @@ requeue(MsgsWithAckTags, State) ->
State1 #vqstate { index_state = IndexState1,
persistent_count = PCount1 }.
-tx_publish(Msg = #basic_message { is_persistent = true, guid = Guid },
- State = #vqstate { msg_store_clients = MSCState,
- persistent_store = PersistentStore }) ->
- MsgStatus = #msg_status {
- msg = Msg, guid = Guid, seq_id = undefined, is_persistent = true,
- is_delivered = false, msg_on_disk = false, index_on_disk = false },
- {#msg_status { msg_on_disk = true }, MSCState1} =
- maybe_write_msg_to_disk(PersistentStore, false, MsgStatus, MSCState),
- State #vqstate { msg_store_clients = MSCState1 };
-tx_publish(_Msg, State) ->
- State.
+len(#vqstate { len = Len }) ->
+ Len.
-tx_rollback(Pubs, State = #vqstate { persistent_store = PersistentStore }) ->
- ok = case persistent_guids(Pubs) of
- [] -> ok;
- PP -> rabbit_msg_store:remove(PersistentStore, PP)
- end,
- State.
+is_empty(State) ->
+ 0 == len(State).
-tx_commit(Pubs, AckTags, From, State =
- #vqstate { persistent_store = PersistentStore }) ->
- %% If we are a non-durable queue, or we have no persistent pubs,
- %% we can skip the msg_store loop.
- PersistentGuids = persistent_guids(Pubs),
- IsTransientPubs = [] == PersistentGuids,
- case IsTransientPubs orelse
- ?TRANSIENT_MSG_STORE == PersistentStore of
- true ->
- tx_commit_post_msg_store(
- IsTransientPubs, Pubs, AckTags, From, State);
- false ->
- Self = self(),
- ok = rabbit_msg_store:sync(
- ?PERSISTENT_MSG_STORE, PersistentGuids,
- fun () -> ok = rabbit_amqqueue:maybe_run_queue_via_backing_queue(
- Self,
- fun (StateN) -> tx_commit_post_msg_store(
- IsTransientPubs, Pubs,
- AckTags, From, StateN)
- end)
- end),
- {false, State}
+set_ram_duration_target(
+ DurationTarget, State = #vqstate { avg_egress_rate = AvgEgressRate,
+ avg_ingress_rate = AvgIngressRate,
+ target_ram_msg_count = TargetRamMsgCount
+ }) ->
+ Rate = AvgEgressRate + AvgIngressRate,
+ TargetRamMsgCount1 =
+ case DurationTarget of
+ infinity -> undefined;
+ undefined -> undefined;
+ _ -> trunc(DurationTarget * Rate) %% msgs = sec * msgs/sec
+ end,
+ State1 = State #vqstate { target_ram_msg_count = TargetRamMsgCount1,
+ duration_target = DurationTarget },
+ case TargetRamMsgCount1 == undefined orelse
+ TargetRamMsgCount1 >= TargetRamMsgCount of
+ true -> State1;
+ false -> reduce_memory_use(State1)
end.
-tx_commit_post_msg_store(IsTransientPubs, Pubs, AckTags, From, State =
- #vqstate { on_sync = OnSync = {SAcks, SPubs, SFroms},
- persistent_store = PersistentStore }) ->
- %% If we are a non-durable queue, or (no persisent pubs, and no
- %% persistent acks) then we can skip the queue_index loop.
- DiskAcks =
- lists:filter(fun (AckTag) -> AckTag /= ack_not_on_disk end, AckTags),
- case PersistentStore == ?TRANSIENT_MSG_STORE orelse
- (IsTransientPubs andalso [] == DiskAcks) of
- true -> {Res, State1} =
- tx_commit_index(State #vqstate {
- on_sync = {[], [Pubs], [From]} }),
- {Res, State1 #vqstate { on_sync = OnSync }};
- false -> {false, State #vqstate { on_sync = { [DiskAcks | SAcks],
- [Pubs | SPubs],
- [From | SFroms] }}}
- end.
+ram_duration(State = #vqstate { egress_rate = Egress,
+ ingress_rate = Ingress,
+ rate_timestamp = Timestamp,
+ in_counter = InCount,
+ out_counter = OutCount,
+ ram_msg_count = RamMsgCount,
+ duration_target = DurationTarget,
+ ram_msg_count_prev = RamMsgCountPrev }) ->
+ Now = now(),
+ {AvgEgressRate, Egress1} = update_rate(Now, Timestamp, OutCount, Egress),
+ {AvgIngressRate, Ingress1} = update_rate(Now, Timestamp, InCount, Ingress),
-tx_commit_index(State = #vqstate { on_sync = {_, _, []} }) ->
- {false, State};
-tx_commit_index(State = #vqstate { on_sync = {SAcks, SPubs, SFroms},
- persistent_store = PersistentStore }) ->
- Acks = lists:flatten(SAcks),
- State1 = ack(Acks, State),
- AckSeqIds = lists:foldl(fun ({ack_index_and_store, _Guid,
- SeqId, ?PERSISTENT_MSG_STORE}, SeqIdsAcc) ->
- [SeqId | SeqIdsAcc];
- (_, SeqIdsAcc) ->
- SeqIdsAcc
- end, [], Acks),
- IsPersistentStore = ?PERSISTENT_MSG_STORE == PersistentStore,
- Pubs = lists:flatten(lists:reverse(SPubs)),
- {SeqIds, State2 = #vqstate { index_state = IndexState }} =
- lists:foldl(
- fun (Msg = #basic_message { is_persistent = IsPersistent },
- {SeqIdsAcc, StateN}) ->
- {SeqId, StateN1} =
- publish(Msg, false, IsPersistent, StateN),
- {case IsPersistentStore andalso IsPersistent of
- true -> [SeqId | SeqIdsAcc];
- false -> SeqIdsAcc
- end, StateN1}
- end, {AckSeqIds, State1}, Pubs),
- IndexState1 =
- rabbit_queue_index:sync_seq_ids(SeqIds, IndexState),
- [ gen_server2:reply(From, ok) || From <- lists:reverse(SFroms) ],
- {Pubs /= [],
- State2 #vqstate { index_state = IndexState1, on_sync = {[], [], []} }}.
+ Duration = %% msgs / (msgs/sec) == sec
+ case AvgEgressRate == 0 andalso AvgIngressRate == 0 of
+ true -> infinity;
+ false -> (RamMsgCountPrev + RamMsgCount) /
+ (2 * (AvgEgressRate + AvgIngressRate))
+ end,
+
+ {Duration, set_ram_duration_target(
+ DurationTarget,
+ State #vqstate { egress_rate = Egress1,
+ avg_egress_rate = AvgEgressRate,
+ ingress_rate = Ingress1,
+ avg_ingress_rate = AvgIngressRate,
+ rate_timestamp = Now,
+ ram_msg_count_prev = RamMsgCount,
+ out_counter = 0, in_counter = 0 })}.
sync_callback(#vqstate { on_sync = {_, _, []} }) -> undefined;
sync_callback(_) -> fun tx_commit_index/1.
@@ -705,6 +711,27 @@ status(#vqstate { q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4,
%% Minor helpers
%%----------------------------------------------------------------------------
+lookup_tx(Txn) ->
+ case get({txn, Txn}) of
+ undefined -> #tx { pending_messages = [],
+ pending_acks = [] };
+ V -> V
+ end.
+
+store_tx(Txn, Tx) ->
+ put({txn, Txn}, Tx).
+
+erase_tx(Txn) ->
+ erase({txn, Txn}).
+
+publish_in_tx(Txn, Msg) ->
+ Tx = #tx { pending_messages = Pubs } = lookup_tx(Txn),
+ store_tx(Txn, Tx #tx { pending_messages = [Msg | Pubs] }).
+
+ack_in_tx(Txn, AckTags) ->
+ Tx = #tx { pending_acks = Acks } = lookup_tx(Txn),
+ store_tx(Txn, Tx #tx { pending_acks = [AckTags | Acks] }).
+
update_rate(Now, Then, Count, {OThen, OCount}) ->
%% form the avg over the current period and the previous
Avg = 1000000 * ((Count + OCount) / timer:now_diff(Now, OThen)),
@@ -712,7 +739,7 @@ update_rate(Now, Then, Count, {OThen, OCount}) ->
persistent_guids(Pubs) ->
[Guid || Obj = #basic_message { guid = Guid } <- Pubs,
- Obj #basic_message.is_persistent].
+ Obj #basic_message.is_persistent].
betas_from_segment_entries(List, SeqIdLimit, TransientThreshold, IndexState) ->
{Filtered, IndexState1} =
@@ -805,6 +832,50 @@ should_force_index_to_disk(State =
%% Internal major helpers for Public API
%%----------------------------------------------------------------------------
+tx_commit_post_msg_store(IsTransientPubs, Pubs, AckTags, From, State =
+ #vqstate { on_sync = OnSync = {SAcks, SPubs, SFroms},
+ persistent_store = PersistentStore }) ->
+ %% If we are a non-durable queue, or (no persisent pubs, and no
+ %% persistent acks) then we can skip the queue_index loop.
+ case PersistentStore == ?TRANSIENT_MSG_STORE orelse
+ (IsTransientPubs andalso [] == AckTags) of %%% AGH FIX ME
+ true -> State1 = tx_commit_index(State #vqstate {
+ on_sync = {[], [Pubs], [From]} }),
+ State1 #vqstate { on_sync = OnSync };
+ false -> State #vqstate { on_sync = { [AckTags | SAcks],
+ [Pubs | SPubs],
+ [From | SFroms] }}
+ end.
+
+tx_commit_index(State = #vqstate { on_sync = {_, _, []} }) ->
+ State;
+tx_commit_index(State = #vqstate { on_sync = {SAcks, SPubs, SFroms},
+ persistent_store = PersistentStore }) ->
+ Acks = lists:flatten(SAcks),
+ State1 = ack(Acks, State),
+ AckSeqIds = lists:foldl(fun ({ack, SeqId, _Guid, true}, SeqIdsAcc) ->
+ [SeqId | SeqIdsAcc];
+ (_, SeqIdsAcc) ->
+ SeqIdsAcc
+ end, [], Acks),
+ IsPersistentStore = ?PERSISTENT_MSG_STORE == PersistentStore,
+ Pubs = lists:flatten(lists:reverse(SPubs)),
+ {SeqIds, State2 = #vqstate { index_state = IndexState }} =
+ lists:foldl(
+ fun (Msg = #basic_message { is_persistent = IsPersistent },
+ {SeqIdsAcc, StateN}) ->
+ {SeqId, StateN1} =
+ publish(Msg, false, IsPersistent, StateN),
+ {case IsPersistentStore andalso IsPersistent of
+ true -> [SeqId | SeqIdsAcc];
+ false -> SeqIdsAcc
+ end, StateN1}
+ end, {AckSeqIds, State1}, Pubs),
+ IndexState1 =
+ rabbit_queue_index:sync_seq_ids(SeqIds, IndexState),
+ [ gen_server2:reply(From, ok) || From <- lists:reverse(SFroms) ],
+ State2 #vqstate { index_state = IndexState1, on_sync = {[], [], []} }.
+
delete1(_PersistentStore, _TransientThreshold, NextSeqId, Count, DeltaSeqId,
IndexState) when DeltaSeqId >= NextSeqId ->
{Count, IndexState};
@@ -887,7 +958,8 @@ remove_queue_entries1(
end,
{PersistentStore, CountN + 1, GuidsByStore1, SeqIdsAcc1, IndexStateN1}.
-fetch_from_q3_or_delta(State = #vqstate {
+fetch_from_q3_or_delta(AckRequired,
+ State = #vqstate {
q1 = Q1, q2 = Q2, delta = #delta { count = DeltaCount },
q3 = Q3, q4 = Q4, ram_msg_count = RamMsgCount,
ram_index_count = RamIndexCount,
@@ -933,7 +1005,7 @@ fetch_from_q3_or_delta(State = #vqstate {
%% delta and q3 are maintained
State1
end,
- fetch(State2)
+ fetch(AckRequired, State2)
end.
reduce_memory_use(State = #vqstate { ram_msg_count = RamMsgCount,