summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2011-09-29 22:42:36 +0100
committerMatthias Radestock <matthias@rabbitmq.com>2011-09-29 22:42:36 +0100
commit21ec534a390cb54e7486cf97228e116fa75499bf (patch)
tree9c72393292d46dca8d4763ab99b5f1f9d2fe81c4 /src
parent40172b86a6fd5b7ae5e3d25ef6a012ae69c59ffd (diff)
downloadrabbitmq-server-git-21ec534a390cb54e7486cf97228e116fa75499bf.tar.gz
refactor: extract removal from pending_ack
and in the process renamed remove_pending_ack to purge_pending_ack.
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_variable_queue.erl58
1 files changed, 29 insertions, 29 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index e06f32bf42..1aba634367 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -438,7 +438,7 @@ terminate(_Reason, State) ->
State1 = #vqstate { persistent_count = PCount,
index_state = IndexState,
msg_store_clients = {MSCStateP, MSCStateT} } =
- remove_pending_ack(true, State),
+ purge_pending_ack(true, State),
PRef = case MSCStateP of
undefined -> undefined;
_ -> ok = rabbit_msg_store:client_terminate(MSCStateP),
@@ -457,12 +457,12 @@ terminate(_Reason, State) ->
%% needs to delete everything that's been delivered and not ack'd.
delete_and_terminate(_Reason, State) ->
%% TODO: there is no need to interact with qi at all - which we do
- %% as part of 'purge' and 'remove_pending_ack', other than
+ %% as part of 'purge' and 'purge_pending_ack', other than
%% deleting it.
{_PurgeCount, State1} = purge(State),
State2 = #vqstate { index_state = IndexState,
msg_store_clients = {MSCStateP, MSCStateT} } =
- remove_pending_ack(false, State1),
+ purge_pending_ack(false, State1),
IndexState1 = rabbit_queue_index:delete_and_terminate(IndexState),
case MSCStateP of
undefined -> ok;
@@ -574,13 +574,9 @@ ack(AckTags, State) ->
persistent_count = PCount,
ack_out_counter = AckOutCount }} =
lists:foldl(
- fun (SeqId, {Acc, State2 = #vqstate { pending_ack = PA,
- ram_ack_index = RAI }}) ->
- AckEntry = dict:fetch(SeqId, PA),
- {accumulate_ack(SeqId, AckEntry, Acc),
- State2 #vqstate {
- pending_ack = dict:erase(SeqId, PA),
- ram_ack_index = gb_trees:delete_any(SeqId, RAI)}}
+ fun (SeqId, {Acc, State2}) ->
+ {AckEntry, State3} = remove_pending_ack(SeqId, State2),
+ {accumulate_ack(SeqId, AckEntry, Acc), State3}
end, {accumulate_ack_init(), State}, AckTags),
IndexState1 = rabbit_queue_index:ack(IndexOnDiskSeqIds, IndexState),
[ok = msg_store_remove(MSCState, IsPersistent, MsgIds)
@@ -1215,10 +1211,16 @@ record_pending_ack(#msg_status { seq_id = SeqId,
ram_ack_index = RAI1,
ack_in_counter = AckInCount + 1}.
-remove_pending_ack(KeepPersistent,
- State = #vqstate { pending_ack = PA,
- index_state = IndexState,
- msg_store_clients = MSCState }) ->
+remove_pending_ack(SeqId, State = #vqstate { pending_ack = PA,
+ ram_ack_index = RAI }) ->
+ {dict:fetch(SeqId, PA),
+ State #vqstate { pending_ack = dict:erase(SeqId, PA),
+ ram_ack_index = gb_trees:delete_any(SeqId, RAI) }}.
+
+purge_pending_ack(KeepPersistent,
+ State = #vqstate { pending_ack = PA,
+ index_state = IndexState,
+ msg_store_clients = MSCState }) ->
{IndexOnDiskSeqIds, MsgIdsByStore, _AllMsgIds} =
dict:fold(fun accumulate_ack/3, accumulate_ack_init(), PA),
State1 = State #vqstate { pending_ack = dict:new(),
@@ -1422,23 +1424,21 @@ delta_merge(SeqIds, #delta { start_seq_id = StartSeqId,
end, {Delta, MsgIds, State}, SeqIds).
%% Mostly opposite of record_pending_ack/2
-msg_from_pending_ack(SeqId, MsgPropsFun,
- #vqstate { pending_ack = PA,
- ram_ack_index = RAI } = State) ->
- State1 = State #vqstate { pending_ack = dict:erase(SeqId, PA),
- ram_ack_index = gb_trees:delete_any(SeqId, RAI)},
+msg_from_pending_ack(SeqId, MsgPropsFun, State) ->
+ {AckEntry, State1} = remove_pending_ack(SeqId, State),
#msg_status { msg_props = MsgProps1 } = MsgStatus1 =
- case dict:fetch(SeqId, PA) of
+ case AckEntry of
{IsPersistent, MsgId, MsgProps, IndexOnDisk} ->
- #msg_status { seq_id = SeqId,
- msg_id = MsgId,
- msg = undefined,
- is_persistent = IsPersistent,
- is_delivered = true,
- msg_on_disk = true,
- index_on_disk = IndexOnDisk,
- msg_props = MsgProps };
- #msg_status{} = MsgStatus0 -> MsgStatus0
+ m(#msg_status { seq_id = SeqId,
+ msg_id = MsgId,
+ msg = undefined,
+ is_persistent = IsPersistent,
+ is_delivered = true,
+ msg_on_disk = true,
+ index_on_disk = IndexOnDisk,
+ msg_props = MsgProps });
+ #msg_status{} = MsgStatus ->
+ MsgStatus
end,
{MsgStatus1 #msg_status {
msg_props = (MsgPropsFun(MsgProps1)) #message_properties {