summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2015-01-23 13:26:07 +0000
committerSimon MacMullen <simon@rabbitmq.com>2015-01-23 13:26:07 +0000
commit2828a3b4925884f985fe30d6edfea2cc709cf444 (patch)
tree0c136a0a3c8d3d1f7959df672a592a95fe5ddb37 /src
parent3e064be3eaeb72fa02fcc565f615261bc1c52942 (diff)
downloadrabbitmq-server-git-2828a3b4925884f985fe30d6edfea2cc709cf444.tar.gz
Simplify purge/1 and friends by getting stats/3 to do the work of updating counters, rather than having separate code to do that. This is conceivably slightly uglier, since the #vqstate{} record is not internally consistent at some points while doing this... but it's so much simpler!
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_variable_queue.erl85
1 files changed, 28 insertions, 57 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 56a3295d1c..acdd4c7521 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -544,38 +544,19 @@ delete_and_terminate(_Reason, State) ->
delete_crashed(#amqqueue{name = QName}) ->
ok = rabbit_queue_index:erase(QName).
-purge(State = #vqstate { q4 = Q4,
- index_state = IndexState,
- msg_store_clients = MSCState,
- len = Len }) ->
+purge(State = #vqstate { q4 = Q4,
+ len = Len }) ->
%% TODO: when there are no pending acks, which is a common case,
%% we could simply wipe the qi instead of issuing delivers and
%% acks for all the messages.
- Stats = {0, 0, 0},
- {Stats1, IndexState1} =
- remove_queue_entries(Q4, Stats, IndexState, MSCState),
-
- {Stats2, State1 = #vqstate { q1 = Q1,
- index_state = IndexState2,
- msg_store_clients = MSCState1,
- ram_bytes = RamBytes,
- persistent_count = PCount,
- persistent_bytes = PBytes }} =
- purge_betas_and_deltas(
- Stats1, State #vqstate { q4 = ?QUEUE:new(),
- index_state = IndexState1 }),
-
- {{RamBytesDec, PCountDec, PBytesDec}, IndexState3} =
- remove_queue_entries(Q1, Stats2, IndexState2, MSCState1),
-
- {Len, a(State1 #vqstate { q1 = ?QUEUE:new(),
- index_state = IndexState3,
- len = 0,
- bytes = 0,
- ram_msg_count = 0,
- ram_bytes = RamBytes - RamBytesDec,
- persistent_count = PCount - PCountDec,
- persistent_bytes = PBytes - PBytesDec })}.
+ State1 = remove_queue_entries(Q4, State),
+
+ State2 = #vqstate { q1 = Q1 } =
+ purge_betas_and_deltas(State1 #vqstate { q4 = ?QUEUE:new() }),
+
+ State3 = remove_queue_entries(Q1, State2),
+
+ {Len, a(State3 #vqstate { q1 = ?QUEUE:new() })}.
purge_acks(State) -> a(purge_pending_ack(false, State)).
@@ -1316,48 +1297,38 @@ remove(AckRequired, MsgStatus = #msg_status {
State2 #vqstate {out_counter = OutCount + 1,
index_state = IndexState2})}.
-purge_betas_and_deltas(Stats,
- State = #vqstate { q3 = Q3,
- index_state = IndexState,
- msg_store_clients = MSCState }) ->
+purge_betas_and_deltas(State = #vqstate { q3 = Q3 }) ->
case ?QUEUE:is_empty(Q3) of
- true -> {Stats, State};
- false -> {Stats1, IndexState1} = remove_queue_entries(
- Q3, Stats, IndexState, MSCState),
- purge_betas_and_deltas(Stats1,
- maybe_deltas_to_betas(
- State #vqstate {
- q3 = ?QUEUE:new(),
- index_state = IndexState1 }))
+ true -> State;
+ false -> State1 = remove_queue_entries(Q3, State),
+ purge_betas_and_deltas(maybe_deltas_to_betas(
+ State1#vqstate{q3 = ?QUEUE:new()}))
end.
-remove_queue_entries(Q, {RamBytesDec, PCountDec, PBytesDec},
- IndexState, MSCState) ->
- {MsgIdsByStore, RamBytesDec1, PCountDec1, PBytesDec1, Delivers, Acks} =
+remove_queue_entries(Q, State = #vqstate{index_state = IndexState,
+ msg_store_clients = MSCState}) ->
+ {MsgIdsByStore, Delivers, Acks, State1} =
?QUEUE:foldl(fun remove_queue_entries1/2,
- {orddict:new(), RamBytesDec, PCountDec, PBytesDec, [], []}, Q),
+ {orddict:new(), [], [], State}, Q),
ok = orddict:fold(fun (IsPersistent, MsgIds, ok) ->
msg_store_remove(MSCState, IsPersistent, MsgIds)
end, ok, MsgIdsByStore),
- {{RamBytesDec1, PCountDec1, PBytesDec1},
- rabbit_queue_index:ack(Acks,
- rabbit_queue_index:deliver(Delivers, IndexState))}.
+ IndexState1 = rabbit_queue_index:ack(
+ Acks, rabbit_queue_index:deliver(Delivers, IndexState)),
+ State1#vqstate{index_state = IndexState1}.
remove_queue_entries1(
- #msg_status { msg_id = MsgId, seq_id = SeqId,
- is_delivered = IsDelivered, msg_in_store = MsgInStore,
- index_on_disk = IndexOnDisk, is_persistent = IsPersistent,
- msg_props = #message_properties { size = Size } } = MsgStatus,
- {MsgIdsByStore, RamBytesDec, PCountDec, PBytesDec, Delivers, Acks}) ->
+ #msg_status { msg_id = MsgId, seq_id = SeqId, is_delivered = IsDelivered,
+ msg_in_store = MsgInStore, index_on_disk = IndexOnDisk,
+ is_persistent = IsPersistent} = MsgStatus,
+ {MsgIdsByStore, Delivers, Acks, State}) ->
{case MsgInStore of
true -> rabbit_misc:orddict_cons(IsPersistent, MsgId, MsgIdsByStore);
false -> MsgIdsByStore
end,
- RamBytesDec + Size * one_if(msg_in_ram(MsgStatus)),
- PCountDec + one_if(IsPersistent),
- PBytesDec + Size * one_if(IsPersistent),
cons_if(IndexOnDisk andalso not IsDelivered, SeqId, Delivers),
- cons_if(IndexOnDisk, SeqId, Acks)}.
+ cons_if(IndexOnDisk, SeqId, Acks),
+ stats({-1, 0}, {MsgStatus, none}, State)}.
%%----------------------------------------------------------------------------
%% Internal gubbins for publishing