diff options
| author | Simon MacMullen <simon@rabbitmq.com> | 2015-02-02 16:35:05 +0000 |
|---|---|---|
| committer | Simon MacMullen <simon@rabbitmq.com> | 2015-02-02 16:35:05 +0000 |
| commit | 08fb4929f5c21432e0c7452e9cd9350480d4c692 (patch) | |
| tree | a924591308a9b0f47f1dae02dce46a189b957f35 | |
| parent | 5d07cfc33963e718f7c75cfda27a1ffd63f2016e (diff) | |
| download | rabbitmq-server-git-08fb4929f5c21432e0c7452e9cd9350480d4c692.tar.gz | |
disk_read_count / disk_write_count info items for queues.
| -rw-r--r-- | src/rabbit_backing_queue.erl | 3 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 70 |
2 files changed, 48 insertions, 25 deletions
diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl index 0d00ced756..9bfba6b28f 100644 --- a/src/rabbit_backing_queue.erl +++ b/src/rabbit_backing_queue.erl @@ -22,7 +22,8 @@ messages_unacknowledged_ram, messages_persistent, message_bytes, message_bytes_ready, message_bytes_unacknowledged, message_bytes_ram, - message_bytes_persistent, backing_queue_status]). + message_bytes_persistent, + disk_read_count, disk_write_count, backing_queue_status]). -ifdef(use_specs). diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index cf18e8ea6a..8f73b5bb36 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -293,7 +293,11 @@ unconfirmed, confirmed, ack_out_counter, - ack_in_counter + ack_in_counter, + %% Unlike the other counters these two do not feed into + %% #rates{} and get reset + disk_read_count, + disk_write_count }). -record(rates, { in, out, ack_in, ack_out, timestamp }). @@ -390,7 +394,9 @@ unconfirmed :: gb_sets:set(), confirmed :: gb_sets:set(), ack_out_counter :: non_neg_integer(), - ack_in_counter :: non_neg_integer() }). + ack_in_counter :: non_neg_integer(), + disk_read_count :: non_neg_integer(), + disk_write_count :: non_neg_integer() }). %% Duplicated from rabbit_backing_queue -spec(ack/2 :: ([ack()], state()) -> {[rabbit_guid:guid()], state()}). @@ -871,6 +877,10 @@ info(message_bytes_ram, #vqstate{ram_bytes = RamBytes}) -> RamBytes; info(message_bytes_persistent, #vqstate{persistent_bytes = PersistentBytes}) -> PersistentBytes; +info(disk_read_count, #vqstate{disk_read_count = Count}) -> + Count; +info(disk_write_count, #vqstate{disk_write_count = Count}) -> + Count; info(backing_queue_status, #vqstate { q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4, len = Len, @@ -1159,7 +1169,9 @@ init(IsDurable, IndexState, DeltaCount, DeltaBytes, Terms, unconfirmed = gb_sets:new(), confirmed = gb_sets:new(), ack_out_counter = 0, - ack_in_counter = 0 }, + ack_in_counter = 0, + disk_read_count = 0, + disk_write_count = 0 }, a(maybe_deltas_to_betas(State)). blank_rates(Now) -> @@ -1200,10 +1212,12 @@ read_msg(#msg_status{msg = undefined, read_msg(#msg_status{msg = Msg}, State) -> {Msg, State}. -read_msg(MsgId, IsPersistent, State = #vqstate{msg_store_clients = MSCState}) -> +read_msg(MsgId, IsPersistent, State = #vqstate{msg_store_clients = MSCState, + disk_read_count = Count}) -> {{ok, Msg = #basic_message {}}, MSCState1} = msg_store_read(MSCState, IsPersistent, MsgId), - {Msg, State #vqstate {msg_store_clients = MSCState1}}. + {Msg, State #vqstate {msg_store_clients = MSCState1, + disk_read_count = Count + 1}}. stats(Signs, Statuses, State) -> stats0(expand_signs(Signs), expand_statuses(Statuses), State). @@ -1335,20 +1349,23 @@ remove_queue_entries1( %%---------------------------------------------------------------------------- maybe_write_msg_to_disk(_Force, MsgStatus = #msg_status { - msg_in_store = true }, _MSCState) -> - MsgStatus; + msg_in_store = true }, State) -> + {MsgStatus, State}; maybe_write_msg_to_disk(Force, MsgStatus = #msg_status { msg = Msg, msg_id = MsgId, - is_persistent = IsPersistent }, MSCState) + is_persistent = IsPersistent }, + State = #vqstate{ msg_store_clients = MSCState, + disk_write_count = Count}) when Force orelse IsPersistent -> case persist_to(MsgStatus) of msg_store -> ok = msg_store_write(MSCState, IsPersistent, MsgId, prepare_to_store(Msg)), - MsgStatus#msg_status{msg_in_store = true}; - queue_index -> MsgStatus + {MsgStatus#msg_status{msg_in_store = true}, + State#vqstate{disk_write_count = Count + 1}}; + queue_index -> {MsgStatus, State} end; -maybe_write_msg_to_disk(_Force, MsgStatus, _MSCState) -> - MsgStatus. +maybe_write_msg_to_disk(_Force, MsgStatus, State) -> + {MsgStatus, State}. maybe_write_index_to_disk(_Force, MsgStatus = #msg_status { index_on_disk = true }, State) -> @@ -1361,25 +1378,28 @@ maybe_write_index_to_disk(Force, MsgStatus = #msg_status { is_delivered = IsDelivered, msg_props = MsgProps}, State = #vqstate{target_ram_count = TargetRamCount, + disk_write_count = DiskWriteCount, index_state = IndexState}) when Force orelse IsPersistent -> + {MsgOrId, DiskWriteCount1} = + case persist_to(MsgStatus) of + msg_store -> {MsgId, DiskWriteCount}; + queue_index -> {prepare_to_store(Msg), DiskWriteCount + 1} + end, IndexState1 = rabbit_queue_index:publish( - case persist_to(MsgStatus) of - msg_store -> MsgId; - queue_index -> prepare_to_store(Msg) - end, SeqId, MsgProps, IsPersistent, TargetRamCount, + MsgOrId, SeqId, MsgProps, IsPersistent, TargetRamCount, IndexState), IndexState2 = maybe_write_delivered(IsDelivered, SeqId, IndexState1), {MsgStatus#msg_status{index_on_disk = true}, - State#vqstate{index_state = IndexState2}}; + State#vqstate{index_state = IndexState2, + disk_write_count = DiskWriteCount1}}; maybe_write_index_to_disk(_Force, MsgStatus, State) -> {MsgStatus, State}. -maybe_write_to_disk(ForceMsg, ForceIndex, MsgStatus, - State = #vqstate { msg_store_clients = MSCState }) -> - MsgStatus1 = maybe_write_msg_to_disk(ForceMsg, MsgStatus, MSCState), - maybe_write_index_to_disk(ForceIndex, MsgStatus1, State). +maybe_write_to_disk(ForceMsg, ForceIndex, MsgStatus, State) -> + {MsgStatus1, State1} = maybe_write_msg_to_disk(ForceMsg, MsgStatus, State), + maybe_write_index_to_disk(ForceIndex, MsgStatus1, State1). determine_persist_to(#basic_message{ content = #content{properties = Props, @@ -1849,6 +1869,7 @@ maybe_deltas_to_betas(State = #vqstate { ram_pending_ack = RPA, disk_pending_ack = DPA, qi_pending_ack = QPA, + disk_read_count = DiskReadCount, transient_threshold = TransientThreshold }) -> #delta { start_seq_id = DeltaSeqId, count = DeltaCount, @@ -1861,9 +1882,10 @@ maybe_deltas_to_betas(State = #vqstate { {Q3a, RamCountsInc, RamBytesInc, IndexState2} = betas_from_index_entries(List, TransientThreshold, RPA, DPA, QPA, IndexState1), - State1 = State #vqstate { index_state = IndexState2, - ram_msg_count = RamMsgCount + RamCountsInc, - ram_bytes = RamBytes + RamBytesInc }, + State1 = State #vqstate { index_state = IndexState2, + ram_msg_count = RamMsgCount + RamCountsInc, + ram_bytes = RamBytes + RamBytesInc, + disk_read_count = DiskReadCount + RamCountsInc}, case ?QUEUE:len(Q3a) of 0 -> %% we ignored every message in the segment due to it being |
