summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2015-02-02 16:35:05 +0000
committerSimon MacMullen <simon@rabbitmq.com>2015-02-02 16:35:05 +0000
commit08fb4929f5c21432e0c7452e9cd9350480d4c692 (patch)
treea924591308a9b0f47f1dae02dce46a189b957f35
parent5d07cfc33963e718f7c75cfda27a1ffd63f2016e (diff)
downloadrabbitmq-server-git-08fb4929f5c21432e0c7452e9cd9350480d4c692.tar.gz
disk_read_count / disk_write_count info items for queues.
-rw-r--r--src/rabbit_backing_queue.erl3
-rw-r--r--src/rabbit_variable_queue.erl70
2 files changed, 48 insertions, 25 deletions
diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl
index 0d00ced756..9bfba6b28f 100644
--- a/src/rabbit_backing_queue.erl
+++ b/src/rabbit_backing_queue.erl
@@ -22,7 +22,8 @@
messages_unacknowledged_ram, messages_persistent,
message_bytes, message_bytes_ready,
message_bytes_unacknowledged, message_bytes_ram,
- message_bytes_persistent, backing_queue_status]).
+ message_bytes_persistent,
+ disk_read_count, disk_write_count, backing_queue_status]).
-ifdef(use_specs).
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index cf18e8ea6a..8f73b5bb36 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -293,7 +293,11 @@
unconfirmed,
confirmed,
ack_out_counter,
- ack_in_counter
+ ack_in_counter,
+ %% Unlike the other counters these two do not feed into
+ %% #rates{} and get reset
+ disk_read_count,
+ disk_write_count
}).
-record(rates, { in, out, ack_in, ack_out, timestamp }).
@@ -390,7 +394,9 @@
unconfirmed :: gb_sets:set(),
confirmed :: gb_sets:set(),
ack_out_counter :: non_neg_integer(),
- ack_in_counter :: non_neg_integer() }).
+ ack_in_counter :: non_neg_integer(),
+ disk_read_count :: non_neg_integer(),
+ disk_write_count :: non_neg_integer() }).
%% Duplicated from rabbit_backing_queue
-spec(ack/2 :: ([ack()], state()) -> {[rabbit_guid:guid()], state()}).
@@ -871,6 +877,10 @@ info(message_bytes_ram, #vqstate{ram_bytes = RamBytes}) ->
RamBytes;
info(message_bytes_persistent, #vqstate{persistent_bytes = PersistentBytes}) ->
PersistentBytes;
+info(disk_read_count, #vqstate{disk_read_count = Count}) ->
+ Count;
+info(disk_write_count, #vqstate{disk_write_count = Count}) ->
+ Count;
info(backing_queue_status, #vqstate {
q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4,
len = Len,
@@ -1159,7 +1169,9 @@ init(IsDurable, IndexState, DeltaCount, DeltaBytes, Terms,
unconfirmed = gb_sets:new(),
confirmed = gb_sets:new(),
ack_out_counter = 0,
- ack_in_counter = 0 },
+ ack_in_counter = 0,
+ disk_read_count = 0,
+ disk_write_count = 0 },
a(maybe_deltas_to_betas(State)).
blank_rates(Now) ->
@@ -1200,10 +1212,12 @@ read_msg(#msg_status{msg = undefined,
read_msg(#msg_status{msg = Msg}, State) ->
{Msg, State}.
-read_msg(MsgId, IsPersistent, State = #vqstate{msg_store_clients = MSCState}) ->
+read_msg(MsgId, IsPersistent, State = #vqstate{msg_store_clients = MSCState,
+ disk_read_count = Count}) ->
{{ok, Msg = #basic_message {}}, MSCState1} =
msg_store_read(MSCState, IsPersistent, MsgId),
- {Msg, State #vqstate {msg_store_clients = MSCState1}}.
+ {Msg, State #vqstate {msg_store_clients = MSCState1,
+ disk_read_count = Count + 1}}.
stats(Signs, Statuses, State) ->
stats0(expand_signs(Signs), expand_statuses(Statuses), State).
@@ -1335,20 +1349,23 @@ remove_queue_entries1(
%%----------------------------------------------------------------------------
maybe_write_msg_to_disk(_Force, MsgStatus = #msg_status {
- msg_in_store = true }, _MSCState) ->
- MsgStatus;
+ msg_in_store = true }, State) ->
+ {MsgStatus, State};
maybe_write_msg_to_disk(Force, MsgStatus = #msg_status {
msg = Msg, msg_id = MsgId,
- is_persistent = IsPersistent }, MSCState)
+ is_persistent = IsPersistent },
+ State = #vqstate{ msg_store_clients = MSCState,
+ disk_write_count = Count})
when Force orelse IsPersistent ->
case persist_to(MsgStatus) of
msg_store -> ok = msg_store_write(MSCState, IsPersistent, MsgId,
prepare_to_store(Msg)),
- MsgStatus#msg_status{msg_in_store = true};
- queue_index -> MsgStatus
+ {MsgStatus#msg_status{msg_in_store = true},
+ State#vqstate{disk_write_count = Count + 1}};
+ queue_index -> {MsgStatus, State}
end;
-maybe_write_msg_to_disk(_Force, MsgStatus, _MSCState) ->
- MsgStatus.
+maybe_write_msg_to_disk(_Force, MsgStatus, State) ->
+ {MsgStatus, State}.
maybe_write_index_to_disk(_Force, MsgStatus = #msg_status {
index_on_disk = true }, State) ->
@@ -1361,25 +1378,28 @@ maybe_write_index_to_disk(Force, MsgStatus = #msg_status {
is_delivered = IsDelivered,
msg_props = MsgProps},
State = #vqstate{target_ram_count = TargetRamCount,
+ disk_write_count = DiskWriteCount,
index_state = IndexState})
when Force orelse IsPersistent ->
+ {MsgOrId, DiskWriteCount1} =
+ case persist_to(MsgStatus) of
+ msg_store -> {MsgId, DiskWriteCount};
+ queue_index -> {prepare_to_store(Msg), DiskWriteCount + 1}
+ end,
IndexState1 = rabbit_queue_index:publish(
- case persist_to(MsgStatus) of
- msg_store -> MsgId;
- queue_index -> prepare_to_store(Msg)
- end, SeqId, MsgProps, IsPersistent, TargetRamCount,
+ MsgOrId, SeqId, MsgProps, IsPersistent, TargetRamCount,
IndexState),
IndexState2 = maybe_write_delivered(IsDelivered, SeqId, IndexState1),
{MsgStatus#msg_status{index_on_disk = true},
- State#vqstate{index_state = IndexState2}};
+ State#vqstate{index_state = IndexState2,
+ disk_write_count = DiskWriteCount1}};
maybe_write_index_to_disk(_Force, MsgStatus, State) ->
{MsgStatus, State}.
-maybe_write_to_disk(ForceMsg, ForceIndex, MsgStatus,
- State = #vqstate { msg_store_clients = MSCState }) ->
- MsgStatus1 = maybe_write_msg_to_disk(ForceMsg, MsgStatus, MSCState),
- maybe_write_index_to_disk(ForceIndex, MsgStatus1, State).
+maybe_write_to_disk(ForceMsg, ForceIndex, MsgStatus, State) ->
+ {MsgStatus1, State1} = maybe_write_msg_to_disk(ForceMsg, MsgStatus, State),
+ maybe_write_index_to_disk(ForceIndex, MsgStatus1, State1).
determine_persist_to(#basic_message{
content = #content{properties = Props,
@@ -1849,6 +1869,7 @@ maybe_deltas_to_betas(State = #vqstate {
ram_pending_ack = RPA,
disk_pending_ack = DPA,
qi_pending_ack = QPA,
+ disk_read_count = DiskReadCount,
transient_threshold = TransientThreshold }) ->
#delta { start_seq_id = DeltaSeqId,
count = DeltaCount,
@@ -1861,9 +1882,10 @@ maybe_deltas_to_betas(State = #vqstate {
{Q3a, RamCountsInc, RamBytesInc, IndexState2} =
betas_from_index_entries(List, TransientThreshold,
RPA, DPA, QPA, IndexState1),
- State1 = State #vqstate { index_state = IndexState2,
- ram_msg_count = RamMsgCount + RamCountsInc,
- ram_bytes = RamBytes + RamBytesInc },
+ State1 = State #vqstate { index_state = IndexState2,
+ ram_msg_count = RamMsgCount + RamCountsInc,
+ ram_bytes = RamBytes + RamBytesInc,
+ disk_read_count = DiskReadCount + RamCountsInc},
case ?QUEUE:len(Q3a) of
0 ->
%% we ignored every message in the segment due to it being