summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2010-04-19 16:19:56 +0100
committerMatthew Sackman <matthew@lshift.net>2010-04-19 16:19:56 +0100
commit7531d0f281c1a7a652336e31dbcaeb502b6715ab (patch)
tree89d5b2cab75f600c56233eae919306530090e395 /src
parent5ea831a8b641f8282f375972a65038d85440760b (diff)
downloadrabbitmq-server-git-7531d0f281c1a7a652336e31dbcaeb502b6715ab.tar.gz
Lots of fixes for handling of acktags, which was badly broken in some places
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_variable_queue.erl72
1 files changed, 35 insertions, 37 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 1a609dcc99..9328164b64 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -201,7 +201,7 @@
-type(bpqueue() :: any()).
-type(seq_id() :: non_neg_integer()).
--type(ack() :: {'ack', seq_id()} | 'blank_ack').
+-type(ack() :: seq_id() | 'blank_ack').
-type(delta() :: #delta { start_seq_id :: non_neg_integer(),
count :: non_neg_integer (),
@@ -405,7 +405,7 @@ publish_delivered(true, Msg = #basic_message { guid = Guid,
next_seq_id = SeqId + 1,
out_counter = OutCount + 1,
in_counter = InCount + 1 },
- {{ack, SeqId},
+ {SeqId,
case MsgStatus1 #msg_status.msg_on_disk of
true ->
{#msg_status { index_on_disk = true }, IndexState1} =
@@ -431,7 +431,7 @@ fetch(AckRequired, State =
Q4a} ->
AckTag = case AckRequired of
- true -> {ack, SeqId};
+ true -> SeqId;
false -> blank_ack
end,
@@ -501,7 +501,7 @@ ack(AckTags, State = #vqstate { index_state = IndexState,
pending_ack = PA }) ->
{GuidsByStore, SeqIds, PA1} =
lists:foldl(
- fun ({ack, SeqId}, {Dict, SeqIds, PAN}) ->
+ fun (SeqId, {Dict, SeqIds, PAN}) ->
PAN1 = dict:erase(SeqId, PAN),
case dict:find(SeqId, PAN) of
{ok, #msg_status { index_on_disk = false, %% ASSERTIONS
@@ -580,10 +580,9 @@ requeue(AckTags, State = #vqstate { persistent_store = PersistentStore }) ->
State1 = #vqstate { index_state = IndexState,
persistent_count = PCount }} =
lists:foldl(
- fun ({ack, SeqId},
- {SeqIdsAcc, Dict, StateN =
- #vqstate { msg_store_clients = MSCStateN,
- pending_ack = PAN}}) ->
+ fun (SeqId, {SeqIdsAcc, Dict, StateN =
+ #vqstate { msg_store_clients = MSCStateN,
+ pending_ack = PAN}}) ->
PAN1 = dict:erase(SeqId, PAN),
StateN1 = StateN #vqstate { pending_ack = PAN1 },
case dict:find(SeqId, PAN) of
@@ -715,28 +714,27 @@ remove_pending_ack(KeepPersistent,
State = #vqstate { pending_ack = PA,
persistent_store = PersistentStore,
index_state = IndexState }) ->
- {SeqIds, GuidsByStore} =
- dict:fold(fun (SeqId, {IsPersistent, Guid}, {SeqIdsAcc, Dict}) ->
- case IsPersistent of
- true -> {[SeqId | SeqIdsAcc],
- rabbit_misc:dict_cons(
- PersistentStore, Guid, Dict)};
- false -> {SeqIdsAcc,
- rabbit_misc:dict_cons(
- ?TRANSIENT_MSG_STORE, Guid, Dict)}
- end;
- (_SeqId, #msg_status {}, Acc) ->
- Acc
- end, {[], dict:new()}, PA),
+ {SeqIds, GuidsByStore, PA1} =
+ dict:fold(
+ fun (SeqId, {IsPersistent, Guid}, {SeqIdsAcc, Dict, PAN}) ->
+ PAN1 = case KeepPersistent andalso IsPersistent of
+ true -> PAN;
+ false -> dict:erase(SeqId, PAN)
+ end,
+ case IsPersistent of
+ true -> {[SeqId | SeqIdsAcc],
+ rabbit_misc:dict_cons(
+ PersistentStore, Guid, Dict), PAN1};
+ false -> {SeqIdsAcc,
+ rabbit_misc:dict_cons(
+ ?TRANSIENT_MSG_STORE, Guid, Dict), PAN1}
+ end;
+ (SeqId, #msg_status {}, {SeqIdsAcc, Dict, PAN}) ->
+ {SeqIdsAcc, Dict, dict:erase(SeqId, PAN)}
+ end, {[], dict:new(), PA}, PA),
case KeepPersistent of
true ->
- State1 =
- State #vqstate {
- pending_ack =
- dict:filter(
- fun (_SeqId, {IsPersistent, _Guid}) -> IsPersistent;
- (_SeqId, #msg_status {}) -> false
- end, PA) },
+ State1 = State #vqstate { pending_ack = PA1 },
case dict:find(?TRANSIENT_MSG_STORE, GuidsByStore) of
error -> State1;
{ok, Guids} -> ok = rabbit_msg_store:remove(
@@ -911,9 +909,14 @@ tx_commit_post_msg_store(IsTransientPubs, Pubs, AckTags, From, State =
%% persistent acks) then we can skip the queue_index loop.
case PersistentStore == ?TRANSIENT_MSG_STORE orelse
(IsTransientPubs andalso
- lists:foldl(fun (AckTag, true ) -> dict:is_key(AckTag, PA);
- (_AckTag, false) -> false
- end, true, AckTags)) of
+ lists:foldl(
+ fun (AckTag, true ) ->
+ case dict:find(AckTag, PA) of
+ {ok, #msg_status{}} -> true;
+ {ok, {IsPersistent, _Guid}} -> not IsPersistent
+ end;
+ (_AckTag, false) -> false
+ end, true, AckTags)) of
true -> State1 = tx_commit_index(State #vqstate {
on_sync = {[], [Pubs], [From]} }),
State1 #vqstate { on_sync = OnSync };
@@ -928,11 +931,6 @@ tx_commit_index(State = #vqstate { on_sync = {SAcks, SPubs, SFroms},
persistent_store = PersistentStore }) ->
Acks = lists:flatten(SAcks),
State1 = ack(Acks, State),
- AckSeqIds = lists:foldl(fun ({ack, SeqId, _Guid, true}, SeqIdsAcc) ->
- [SeqId | SeqIdsAcc];
- (_, SeqIdsAcc) ->
- SeqIdsAcc
- end, [], Acks),
IsPersistentStore = ?PERSISTENT_MSG_STORE == PersistentStore,
Pubs = lists:flatten(lists:reverse(SPubs)),
{SeqIds, State2 = #vqstate { index_state = IndexState }} =
@@ -945,7 +943,7 @@ tx_commit_index(State = #vqstate { on_sync = {SAcks, SPubs, SFroms},
true -> [SeqId | SeqIdsAcc];
false -> SeqIdsAcc
end, StateN1}
- end, {AckSeqIds, State1}, Pubs),
+ end, {Acks, State1}, Pubs),
IndexState1 =
rabbit_queue_index:sync_seq_ids(SeqIds, IndexState),
[ gen_server2:reply(From, ok) || From <- lists:reverse(SFroms) ],