summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_variable_queue.erl75
1 files changed, 43 insertions, 32 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index d399e82010..00ffef2044 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -1014,10 +1014,11 @@ internal_fetch(AckRequired, MsgStatus = #msg_status {
end,
Ack = fun () -> rabbit_queue_index:ack([SeqId], IndexState1) end,
IndexState2 =
- case {AckRequired, MsgOnDisk, IndexOnDisk} of
- {false, true, false} -> Rem(), IndexState1;
- {false, true, true} -> Rem(), Ack();
- _ -> IndexState1
+ case {AckRequired, MsgOnDisk, IndexOnDisk, IsPersistent, IsDelivered} of
+ {false, true, false, _, _} -> Rem(), IndexState1;
+ {false, true, true, _, _} -> Rem(), Ack();
+ { true, true, true, false, false} -> Ack();
+ _ -> IndexState1
end,
%% 3. If an ack is required, add something sensible to PA
@@ -1187,7 +1188,7 @@ remove_pending_ack(KeepPersistent,
State = #vqstate { pending_ack = PA,
index_state = IndexState,
msg_store_clients = MSCState }) ->
- {MsgIdsByStore, _AllMsgIds} =
+ {PersistentSeqIds, MsgIdsByStore, _AllMsgIds} =
dict:fold(fun accumulate_ack/3, accumulate_ack_init(), PA),
State1 = State #vqstate { pending_ack = dict:new(),
ram_ack_index = gb_trees:empty() },
@@ -1199,7 +1200,7 @@ remove_pending_ack(KeepPersistent,
State1
end;
false -> IndexState1 =
- rabbit_queue_index:ack(dict:fetch_keys(PA), IndexState),
+ rabbit_queue_index:ack(PersistentSeqIds, IndexState),
[ok = msg_store_remove(MSCState, IsPersistent, MsgIds)
|| {IsPersistent, MsgIds} <- orddict:to_list(MsgIdsByStore)],
State1 #vqstate { index_state = IndexState1 }
@@ -1208,7 +1209,7 @@ remove_pending_ack(KeepPersistent,
ack(_MsgStoreFun, _Fun, [], State) ->
{[], State};
ack(MsgStoreFun, Fun, AckTags, State) ->
- {{MsgIdsByStore, AllMsgIds},
+ {{PersistentSeqIds, MsgIdsByStore, AllMsgIds},
State1 = #vqstate { index_state = IndexState,
msg_store_clients = MSCState,
persistent_count = PCount,
@@ -1223,7 +1224,7 @@ ack(MsgStoreFun, Fun, AckTags, State) ->
ram_ack_index =
gb_trees:delete_any(SeqId, RAI)})}
end, {accumulate_ack_init(), State}, AckTags),
- IndexState1 = rabbit_queue_index:ack(AckTags, IndexState),
+ IndexState1 = rabbit_queue_index:ack(PersistentSeqIds, IndexState),
[ok = MsgStoreFun(MSCState, IsPersistent, MsgIds)
|| {IsPersistent, MsgIds} <- orddict:to_list(MsgIdsByStore)],
PCount1 = PCount - find_persistent_count(sum_msg_ids_by_store_to_len(
@@ -1233,17 +1234,18 @@ ack(MsgStoreFun, Fun, AckTags, State) ->
persistent_count = PCount1,
ack_out_counter = AckOutCount + length(AckTags) }}.
-accumulate_ack_init() -> {orddict:new(), []}.
+accumulate_ack_init() -> {[], orddict:new(), []}.
accumulate_ack(_SeqId, #msg_status { is_persistent = false, %% ASSERTIONS
msg_on_disk = false,
index_on_disk = false,
msg_id = MsgId },
- {MsgIdsByStore, AllMsgIds}) ->
- {MsgIdsByStore, [MsgId | AllMsgIds]};
-accumulate_ack(_SeqId, {IsPersistent, MsgId, _MsgProps, _IndexOnDisk},
- {MsgIdsByStore, AllMsgIds}) ->
- {rabbit_misc:orddict_cons(IsPersistent, MsgId, MsgIdsByStore),
+ {PersistentSeqIdsAcc, MsgIdsByStore, AllMsgIds}) ->
+ {PersistentSeqIdsAcc, MsgIdsByStore, [MsgId | AllMsgIds]};
+accumulate_ack(SeqId, {IsPersistent, MsgId, _MsgProps, _IndexOnDisk},
+ {PersistentSeqIdsAcc, MsgIdsByStore, AllMsgIds}) ->
+ {cons_if(IsPersistent, SeqId, PersistentSeqIdsAcc),
+ rabbit_misc:orddict_cons(IsPersistent, MsgId, MsgIdsByStore),
[MsgId | AllMsgIds]}.
find_persistent_count(LensByStore) ->
@@ -1348,29 +1350,34 @@ requeue_single(AckTag, MsgPropsFun, #vqstate { pending_ack = PA,
publish_r(MsgStatus = #msg_status { seq_id = SeqId,
msg = Msg,
- index_on_disk = IndexOnDisk },
- State = #vqstate { q3 = Q3,
- q4 = Q4,
+ index_on_disk = IndexOnDisk,
+ msg_on_disk = MsgOnDisk },
+ State = #vqstate { q4 = Q4,
delta = Delta,
len = Len,
in_counter = InCounter,
ram_msg_count = RamMsgCount,
ram_index_count = RamIndexCount }) ->
(case pick_store(SeqId, State) of
- q4 -> case Msg of
- undefined ->
- {MsgStatus1, State1 = #vqstate { q4 = Q4a }} =
- read_msg(MsgStatus, State),
- State1 #vqstate { q4 = gb_trees:insert(
- SeqId, MsgStatus1, Q4a)};
- #basic_message{} ->
- State #vqstate { q4 = gb_trees:insert(SeqId, MsgStatus,
- Q4),
- ram_msg_count = RamMsgCount + 1 }
- end;
- q3 -> State #vqstate {
- q3 = q3tree:in_r(IndexOnDisk, MsgStatus, Q3),
- ram_index_count = RamIndexCount + one_if(not IndexOnDisk),
+ q4 -> case Msg of
+ undefined ->
+ {MsgStatus1, State1 = #vqstate { q4 = Q4a }} =
+ read_msg(MsgStatus, State),
+ State1 #vqstate { q4 = gb_trees:insert(
+ SeqId, MsgStatus1, Q4a)};
+ #basic_message{} ->
+ State #vqstate { q4 = gb_trees:insert(SeqId, MsgStatus, Q4),
+ ram_msg_count = RamMsgCount + 1 }
+ end;
+ q3 -> %% make sure index is on disk
+ {#msg_status { index_on_disk = IndexOnDisk1 } = MsgStatus1,
+ #vqstate { q3 = Q3 } = State1} =
+ maybe_write_to_disk(not MsgOnDisk, false, MsgStatus, State),
+ State1 #vqstate {
+ q3 = q3tree:in_r(IndexOnDisk1,
+ MsgStatus1,
+ Q3),
+ ram_index_count = RamIndexCount + one_if(not IndexOnDisk1),
ram_msg_count = RamMsgCount + one_if(Msg =/= undefined) };
delta -> #delta { start_seq_id = StartSeqId,
@@ -1379,7 +1386,11 @@ publish_r(MsgStatus = #msg_status { seq_id = SeqId,
Delta1 = Delta #delta { start_seq_id = min(SeqId, StartSeqId),
count = Count + 1,
end_seq_id = max(SeqId + 1, EndSeqId)},
- State #vqstate { delta = Delta1 }
+ %% make sure the index and msg are on disk
+ {_MsgStatus, State1} = maybe_write_to_disk(
+ not MsgOnDisk, not IndexOnDisk,
+ MsgStatus, State),
+ State1 #vqstate { delta = Delta1 }
end) #vqstate { len = Len + 1,
in_counter = InCounter + 1 }.