diff options
| author | Simon MacMullen <simon@rabbitmq.com> | 2015-01-23 13:26:07 +0000 |
|---|---|---|
| committer | Simon MacMullen <simon@rabbitmq.com> | 2015-01-23 13:26:07 +0000 |
| commit | 2828a3b4925884f985fe30d6edfea2cc709cf444 (patch) | |
| tree | 0c136a0a3c8d3d1f7959df672a592a95fe5ddb37 /src | |
| parent | 3e064be3eaeb72fa02fcc565f615261bc1c52942 (diff) | |
| download | rabbitmq-server-git-2828a3b4925884f985fe30d6edfea2cc709cf444.tar.gz | |
Simplify purge/1 and friends by getting stats/3 to do the work of updating counters, rather than having separate code to do that. This is conceivably slightly uglier, since the #vqstate{} record is not internally consistent at some points while doing this... but it's so much simpler!
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_variable_queue.erl | 85 |
1 files changed, 28 insertions, 57 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 56a3295d1c..acdd4c7521 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -544,38 +544,19 @@ delete_and_terminate(_Reason, State) -> delete_crashed(#amqqueue{name = QName}) -> ok = rabbit_queue_index:erase(QName). -purge(State = #vqstate { q4 = Q4, - index_state = IndexState, - msg_store_clients = MSCState, - len = Len }) -> +purge(State = #vqstate { q4 = Q4, + len = Len }) -> %% TODO: when there are no pending acks, which is a common case, %% we could simply wipe the qi instead of issuing delivers and %% acks for all the messages. - Stats = {0, 0, 0}, - {Stats1, IndexState1} = - remove_queue_entries(Q4, Stats, IndexState, MSCState), - - {Stats2, State1 = #vqstate { q1 = Q1, - index_state = IndexState2, - msg_store_clients = MSCState1, - ram_bytes = RamBytes, - persistent_count = PCount, - persistent_bytes = PBytes }} = - purge_betas_and_deltas( - Stats1, State #vqstate { q4 = ?QUEUE:new(), - index_state = IndexState1 }), - - {{RamBytesDec, PCountDec, PBytesDec}, IndexState3} = - remove_queue_entries(Q1, Stats2, IndexState2, MSCState1), - - {Len, a(State1 #vqstate { q1 = ?QUEUE:new(), - index_state = IndexState3, - len = 0, - bytes = 0, - ram_msg_count = 0, - ram_bytes = RamBytes - RamBytesDec, - persistent_count = PCount - PCountDec, - persistent_bytes = PBytes - PBytesDec })}. + State1 = remove_queue_entries(Q4, State), + + State2 = #vqstate { q1 = Q1 } = + purge_betas_and_deltas(State1 #vqstate { q4 = ?QUEUE:new() }), + + State3 = remove_queue_entries(Q1, State2), + + {Len, a(State3 #vqstate { q1 = ?QUEUE:new() })}. purge_acks(State) -> a(purge_pending_ack(false, State)). @@ -1316,48 +1297,38 @@ remove(AckRequired, MsgStatus = #msg_status { State2 #vqstate {out_counter = OutCount + 1, index_state = IndexState2})}. -purge_betas_and_deltas(Stats, - State = #vqstate { q3 = Q3, - index_state = IndexState, - msg_store_clients = MSCState }) -> +purge_betas_and_deltas(State = #vqstate { q3 = Q3 }) -> case ?QUEUE:is_empty(Q3) of - true -> {Stats, State}; - false -> {Stats1, IndexState1} = remove_queue_entries( - Q3, Stats, IndexState, MSCState), - purge_betas_and_deltas(Stats1, - maybe_deltas_to_betas( - State #vqstate { - q3 = ?QUEUE:new(), - index_state = IndexState1 })) + true -> State; + false -> State1 = remove_queue_entries(Q3, State), + purge_betas_and_deltas(maybe_deltas_to_betas( + State1#vqstate{q3 = ?QUEUE:new()})) end. -remove_queue_entries(Q, {RamBytesDec, PCountDec, PBytesDec}, - IndexState, MSCState) -> - {MsgIdsByStore, RamBytesDec1, PCountDec1, PBytesDec1, Delivers, Acks} = +remove_queue_entries(Q, State = #vqstate{index_state = IndexState, + msg_store_clients = MSCState}) -> + {MsgIdsByStore, Delivers, Acks, State1} = ?QUEUE:foldl(fun remove_queue_entries1/2, - {orddict:new(), RamBytesDec, PCountDec, PBytesDec, [], []}, Q), + {orddict:new(), [], [], State}, Q), ok = orddict:fold(fun (IsPersistent, MsgIds, ok) -> msg_store_remove(MSCState, IsPersistent, MsgIds) end, ok, MsgIdsByStore), - {{RamBytesDec1, PCountDec1, PBytesDec1}, - rabbit_queue_index:ack(Acks, - rabbit_queue_index:deliver(Delivers, IndexState))}. + IndexState1 = rabbit_queue_index:ack( + Acks, rabbit_queue_index:deliver(Delivers, IndexState)), + State1#vqstate{index_state = IndexState1}. remove_queue_entries1( - #msg_status { msg_id = MsgId, seq_id = SeqId, - is_delivered = IsDelivered, msg_in_store = MsgInStore, - index_on_disk = IndexOnDisk, is_persistent = IsPersistent, - msg_props = #message_properties { size = Size } } = MsgStatus, - {MsgIdsByStore, RamBytesDec, PCountDec, PBytesDec, Delivers, Acks}) -> + #msg_status { msg_id = MsgId, seq_id = SeqId, is_delivered = IsDelivered, + msg_in_store = MsgInStore, index_on_disk = IndexOnDisk, + is_persistent = IsPersistent} = MsgStatus, + {MsgIdsByStore, Delivers, Acks, State}) -> {case MsgInStore of true -> rabbit_misc:orddict_cons(IsPersistent, MsgId, MsgIdsByStore); false -> MsgIdsByStore end, - RamBytesDec + Size * one_if(msg_in_ram(MsgStatus)), - PCountDec + one_if(IsPersistent), - PBytesDec + Size * one_if(IsPersistent), cons_if(IndexOnDisk andalso not IsDelivered, SeqId, Delivers), - cons_if(IndexOnDisk, SeqId, Acks)}. + cons_if(IndexOnDisk, SeqId, Acks), + stats({-1, 0}, {MsgStatus, none}, State)}. %%---------------------------------------------------------------------------- %% Internal gubbins for publishing |
