summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2010-04-16 14:02:22 +0100
committerMatthew Sackman <matthew@lshift.net>2010-04-16 14:02:22 +0100
commitc0ba7b2d281834605eb60c67b5cfa97d6c24b759 (patch)
treef4d4ec6223e83a8c8440099568f6ab90a4ed0881
parent67b986d695826fe3d906f95e0b886a26790c75c2 (diff)
downloadrabbitmq-server-git-c0ba7b2d281834605eb60c67b5cfa97d6c24b759.tar.gz
Changes to the way we track messages that are pending acks to ensure that on queue deletion we remove everything from the message stores that shouldn't be there
-rw-r--r--src/rabbit_variable_queue.erl166
1 files changed, 107 insertions, 59 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index aa1589a6d0..7cebf2b1bf 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -201,7 +201,7 @@
-type(bpqueue() :: any()).
-type(seq_id() :: non_neg_integer()).
--type(ack() :: {'ack', seq_id(), guid(), boolean()} | 'blank_ack').
+-type(ack() :: {'ack', seq_id()} | 'blank_ack').
-type(delta() :: #delta { start_seq_id :: non_neg_integer(),
count :: non_neg_integer (),
@@ -229,7 +229,7 @@
rate_timestamp :: {integer(), integer(), integer()},
len :: non_neg_integer(),
on_sync :: {[[ack()]], [[guid()]], [{pid(), any()}]},
- msg_store_clients :: {{any(), binary()}, {any(), binary()}},
+ msg_store_clients :: 'undefined' | {{any(), binary()}, {any(), binary()}},
persistent_store :: pid() | atom(),
persistent_count :: non_neg_integer(),
transient_threshold :: non_neg_integer()
@@ -323,23 +323,25 @@ terminate(State) ->
State1 = #vqstate {
persistent_count = PCount, index_state = IndexState,
msg_store_clients = {{MSCStateP, PRef}, {MSCStateT, TRef}} } =
- tx_commit_index(State),
+ remove_pending_ack(true, tx_commit_index(State)),
rabbit_msg_store:client_terminate(MSCStateP),
rabbit_msg_store:client_terminate(MSCStateT),
Terms = [{persistent_ref, PRef}, {transient_ref, TRef},
{persistent_count, PCount}],
State1 #vqstate { index_state =
- rabbit_queue_index:terminate(Terms, IndexState) }.
+ rabbit_queue_index:terminate(Terms, IndexState),
+ msg_store_clients = undefined }.
%% the only difference between purge and delete is that delete also
%% needs to delete everything that's been delivered and not ack'd.
delete_and_terminate(State) ->
- {_PurgeCount, State1 = #vqstate {
- index_state = IndexState,
- msg_store_clients = {{MSCStateP, PRef}, {MSCStateT, TRef}},
- persistent_store = PersistentStore,
- transient_threshold = TransientThreshold }} =
- purge(State),
+ {_PurgeCount, State1} = purge(State),
+ State2 = #vqstate {
+ index_state = IndexState,
+ msg_store_clients = {{MSCStateP, PRef}, {MSCStateT, TRef}},
+ persistent_store = PersistentStore,
+ transient_threshold = TransientThreshold } =
+ remove_pending_ack(false, State1),
%% flushing here is good because it deletes all full segments,
%% leaving only partial segments around.
IndexState1 = rabbit_queue_index:flush_journal(IndexState),
@@ -359,7 +361,8 @@ delete_and_terminate(State) ->
rabbit_msg_store:delete_client(?TRANSIENT_MSG_STORE, TRef),
rabbit_msg_store:client_terminate(MSCStateP),
rabbit_msg_store:client_terminate(MSCStateT),
- State1 #vqstate { index_state = IndexState5 }.
+ State2 #vqstate { index_state = IndexState5,
+ msg_store_clients = undefined }.
purge(State = #vqstate { q4 = Q4, index_state = IndexState, len = Len,
persistent_store = PersistentStore }) ->
@@ -402,16 +405,16 @@ publish_delivered(true, Msg = #basic_message { guid = Guid,
next_seq_id = SeqId + 1,
out_counter = OutCount + 1,
in_counter = InCount + 1 },
- AckTag = {ack, SeqId, Guid, IsPersistent},
- {AckTag,
+ {{ack, SeqId},
case MsgStatus1 #msg_status.msg_on_disk of
true ->
{#msg_status { index_on_disk = true }, IndexState1} =
maybe_write_index_to_disk(false, MsgStatus1, IndexState),
- State1 #vqstate { index_state = IndexState1 };
+ State1 #vqstate { index_state = IndexState1,
+ pending_ack = dict:store(SeqId, {true, Guid},
+ PA) };
false ->
- State1 #vqstate { pending_ack =
- dict:store(AckTag, MsgStatus1, PA) }
+ State1 #vqstate { pending_ack = dict:store(SeqId, MsgStatus1, PA) }
end}.
fetch(AckRequired, State =
@@ -428,7 +431,7 @@ fetch(AckRequired, State =
Q4a} ->
AckTag = case AckRequired of
- true -> {ack, SeqId, Guid, IsPersistent};
+ true -> {ack, SeqId};
false -> blank_ack
end,
@@ -444,17 +447,17 @@ fetch(AckRequired, State =
IndexState2 =
case MsgOnDisk andalso not AckRequired of
true -> %% Remove from disk now
+ ok = case MsgOnDisk of
+ true ->
+ rabbit_msg_store:remove(MsgStore, [Guid]);
+ false ->
+ ok
+ end,
case IndexOnDisk of
true ->
- ok = rabbit_msg_store:remove(MsgStore, [Guid]),
rabbit_queue_index:write_acks([SeqId],
IndexState1);
false ->
- ok = case MsgOnDisk of
- true -> rabbit_msg_store:remove(
- MsgStore, [Guid]);
- false -> ok
- end,
IndexState1
end;
false ->
@@ -469,10 +472,16 @@ fetch(AckRequired, State =
false -> IndexState2
end,
- %% 4. If it's not on disk and we need an Ack, add it to PA
- PA1 = case AckRequired andalso not MsgOnDisk of
- true -> dict:store(AckTag, MsgStatus #msg_status {
- is_delivered = true }, PA);
+ %% 4. If an ack is required, add something sensible to PA
+ PA1 = case AckRequired of
+ true ->
+ Entry =
+ case MsgOnDisk of
+ true -> {IsPersistent, Guid};
+ false -> MsgStatus #msg_status {
+ is_delivered = true }
+ end,
+ dict:store(SeqId, Entry, PA);
false -> PA
end,
@@ -492,21 +501,19 @@ ack(AckTags, State = #vqstate { index_state = IndexState,
pending_ack = PA }) ->
{GuidsByStore, SeqIds, PA1} =
lists:foldl(
- fun (blank_ack, Acc) -> Acc;
- ({ack, SeqId, Guid, true}, {Dict, SeqIds, PAN}) ->
- {rabbit_misc:dict_cons(PersistentStore, Guid, Dict),
- [SeqId | SeqIds], PAN};
- ({ack, _SeqId, Guid, false} = AckTag, {Dict, SeqIds, PAN}) ->
- case dict:find(AckTag, PAN) of
- error ->
- %% must be in the transient store and won't
- %% be in the queue index.
- {rabbit_misc:dict_cons(
- ?TRANSIENT_MSG_STORE, Guid, Dict), SeqIds, PAN};
+ fun ({ack, SeqId}, {Dict, SeqIds, PAN}) ->
+ PAN1 = dict:erase(SeqId, PAN),
+ case dict:find(SeqId, PAN) of
{ok, #msg_status { index_on_disk = false, %% ASSERTIONS
msg_on_disk = false,
is_persistent = false }} ->
- {Dict, SeqIds, dict:erase(AckTag, PAN)}
+ {Dict, SeqIds, PAN1};
+ {ok, {false, Guid}} ->
+ {rabbit_misc:dict_cons(?TRANSIENT_MSG_STORE, Guid,
+ Dict), SeqIds, PAN1};
+ {ok, {true, Guid}} ->
+ {rabbit_misc:dict_cons(PersistentStore, Guid, Dict),
+ [SeqId | SeqIds], PAN1}
end
end, {dict:new(), [], PA}, AckTags),
IndexState1 = rabbit_queue_index:write_acks(SeqIds, IndexState),
@@ -568,23 +575,31 @@ tx_commit(Txn, From, State = #vqstate { persistent_store = PersistentStore }) ->
State
end}.
-requeue(AckTags, State = #vqstate { persistent_store = PersistentStore,
- pending_ack = PA }) ->
+requeue(AckTags, State = #vqstate { persistent_store = PersistentStore }) ->
{SeqIds, GuidsByStore,
State1 = #vqstate { index_state = IndexState,
persistent_count = PCount }} =
lists:foldl(
- fun ({ack, SeqId, Guid, IsPersistent} = AckTag,
- {SeqIdsAcc, Dict, StateN = #vqstate {
- msg_store_clients = MSCStateN }}) ->
- case dict:find(AckTag, PA) of
- error ->
+ fun ({ack, SeqId},
+ {SeqIdsAcc, Dict, StateN =
+ #vqstate { msg_store_clients = MSCStateN,
+ pending_ack = PAN}}) ->
+ PAN1 = dict:erase(SeqId, PAN),
+ StateN1 = StateN #vqstate { pending_ack = PAN1 },
+ case dict:find(SeqId, PAN) of
+ {ok, #msg_status { index_on_disk = false,
+ msg_on_disk = false,
+ is_persistent = false,
+ msg = Msg }} ->
+ {_SeqId, StateN2} = publish(Msg, true, false, StateN1),
+ {SeqIdsAcc, Dict, StateN2};
+ {ok, {IsPersistent, Guid}} ->
{{ok, Msg = #basic_message{}}, MSCStateN1} =
- read_from_msg_store(PersistentStore, MSCStateN,
- IsPersistent, Guid),
- StateN1 = StateN #vqstate {
+ read_from_msg_store(
+ PersistentStore, MSCStateN, IsPersistent, Guid),
+ StateN2 = StateN1 #vqstate {
msg_store_clients = MSCStateN1 },
- {_SeqId, StateN2} = publish(Msg, true, true, StateN1),
+ {_SeqId, StateN3} = publish(Msg, true, true, StateN2),
{SeqIdsAcc1, MsgStore} =
case IsPersistent of
true ->
@@ -594,15 +609,7 @@ requeue(AckTags, State = #vqstate { persistent_store = PersistentStore,
end,
{SeqIdsAcc1,
rabbit_misc:dict_cons(MsgStore, Guid, Dict),
- StateN2};
- {ok, #msg_status { index_on_disk = false,
- msg_on_disk = false,
- is_persistent = false,
- msg = Msg }} ->
- {_SeqId, StateN1} = publish(Msg, true, false, StateN),
- {SeqIdsAcc, Dict,
- StateN1 #vqstate {
- pending_ack = dict:erase(AckTag, PA) }}
+ StateN3}
end
end, {[], dict:new(), State}, AckTags),
IndexState1 = rabbit_queue_index:write_acks(SeqIds, IndexState),
@@ -704,6 +711,47 @@ status(#vqstate { q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4,
%% Minor helpers
%%----------------------------------------------------------------------------
+remove_pending_ack(KeepPersistent,
+ State = #vqstate { pending_ack = PA,
+ persistent_store = PersistentStore,
+ index_state = IndexState }) ->
+ {SeqIds, GuidsByStore} =
+ dict:fold(fun (SeqId, {IsPersistent, Guid}, {SeqIdsAcc, Dict}) ->
+ case IsPersistent of
+ true -> {[SeqId | SeqIdsAcc],
+ rabbit_misc:dict_cons(
+ PersistentStore, Guid, Dict)};
+ false -> {SeqIdsAcc,
+ rabbit_misc:dict_cons(
+ ?TRANSIENT_MSG_STORE, Guid, Dict)}
+ end;
+ (_SeqId, #basic_message {}, Acc) ->
+ Acc
+ end, {[], dict:new()}, PA),
+ case KeepPersistent of
+ true ->
+ State1 =
+ State #vqstate {
+ pending_ack =
+ dict:filter(
+ fun (_SeqId, {IsPersistent, _Guid}) -> IsPersistent;
+ (_SeqId, #basic_message {}) -> false
+ end, PA) },
+ case dict:find(?TRANSIENT_MSG_STORE, GuidsByStore) of
+ error -> State1;
+ {ok, Guids} -> ok = rabbit_msg_store:remove(
+ ?TRANSIENT_MSG_STORE, Guids),
+ State1
+ end;
+ false ->
+ IndexState1 = rabbit_queue_index:write_acks(SeqIds, IndexState),
+ ok = dict:fold(fun (MsgStore, Guids, ok) ->
+ rabbit_msg_store:remove(MsgStore, Guids)
+ end, ok, GuidsByStore),
+ State #vqstate { pending_ack = dict:new(),
+ index_state = IndexState1 }
+ end.
+
lookup_tx(Txn) ->
case get({txn, Txn}) of
undefined -> #tx { pending_messages = [],