summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2009-05-21 13:38:20 +0100
committerMatthew Sackman <matthew@lshift.net>2009-05-21 13:38:20 +0100
commit3756f096565791dfb1968164c67c047c322fa390 (patch)
treeaa3f96866115555aebc4ab5fff668d17f84df5cc
parent9c18c3d9d2809bc419b50d6eb0f3a64aee310a40 (diff)
downloadrabbitmq-server-git-3756f096565791dfb1968164c67c047c322fa390.tar.gz
Added is_empty and length functions.
-rw-r--r--src/rabbit_disk_queue.erl127
-rw-r--r--src/rabbit_mixed_queue.erl13
2 files changed, 87 insertions, 53 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl
index 8c602b5310..2bc40123df 100644
--- a/src/rabbit_disk_queue.erl
+++ b/src/rabbit_disk_queue.erl
@@ -42,6 +42,8 @@
tx_publish/2, tx_commit/3, tx_commit_with_seqs/3, tx_cancel/1,
requeue/2, requeue_with_seqs/2, purge/1]).
+-export([length/1, is_empty/1]).
+
-export([stop/0, stop_and_obliterate/0, to_disk_only_mode/0, to_ram_disk_mode/0]).
-include("rabbit.hrl").
@@ -83,7 +85,7 @@
%% FileSummary: this is an ets table which contains:
%% {File, ValidTotalSize, ContiguousTop, Left, Right}
%% Sequences: this is an ets table which contains:
-%% {Q, ReadSeqId, WriteSeqId}
+%% {Q, ReadSeqId, WriteSeqId, QueueLength}
%% rabbit_disk_queue: this is an mnesia table which contains:
%% #dq_msg_loc { queue_and_seq_id = {Q, SeqId},
%% is_delivered = IsDelivered,
@@ -245,6 +247,8 @@
-spec(stop_and_obliterate/0 :: () -> 'ok').
-spec(to_ram_disk_mode/0 :: () -> 'ok').
-spec(to_disk_only_mode/0 :: () -> 'ok').
+-spec(length/1 :: (queue_name()) -> non_neg_integer()).
+-spec(is_empty/1 :: (queue_name()) -> bool()).
-endif.
@@ -303,6 +307,13 @@ to_disk_only_mode() ->
to_ram_disk_mode() ->
gen_server:call(?SERVER, to_ram_disk_mode, infinity).
+length(Q) ->
+ gen_server:call(?SERVER, {length, Q}, infinity).
+
+is_empty(Q) ->
+ Length = rabbit_disk_queue:length(Q),
+ Length == 0.
+
%% ---- GEN-SERVER INTERNAL API ----
init([FileSizeLimit, ReadFileHandlesLimit]) ->
@@ -378,7 +389,7 @@ handle_call({phantom_deliver, Q}, _From, State) ->
{ok, Result, State1} = internal_deliver(Q, false, State),
{reply, Result, State1};
handle_call({tx_commit, Q, PubMsgIds, AckSeqIds}, _From, State) ->
- PubMsgSeqIds = lists:zip(PubMsgIds, lists:duplicate(length(PubMsgIds), next)),
+ PubMsgSeqIds = lists:zip(PubMsgIds, lists:duplicate(erlang:length(PubMsgIds), next)),
{ok, State1} = internal_tx_commit(Q, PubMsgSeqIds, AckSeqIds, State),
{reply, ok, State1};
handle_call({tx_commit_with_seqs, Q, PubSeqMsgIds, AckSeqIds}, _From, State) ->
@@ -418,7 +429,12 @@ handle_call(to_ram_disk_mode, _From, State = #dqstate { operation_mode = disk_on
{atomic, ok} = mnesia:change_table_copy_type(rabbit_disk_queue, node(), disc_copies),
true = ets:from_dets(MsgLocationEts, MsgLocationDets),
ok = dets:delete_all_objects(MsgLocationDets),
- {reply, ok, State #dqstate { operation_mode = ram_disk }}.
+ {reply, ok, State #dqstate { operation_mode = ram_disk }};
+handle_call({length, Q}, _From, State = #dqstate { sequences = Sequences }) ->
+ case ets:lookup(Sequences, Q) of
+ [] -> {reply, 0, State};
+ [{Q, _ReadSeqId, _WriteSeqId, Length}] -> {reply, Length, State}
+ end.
handle_cast({publish, Q, MsgId, MsgBody}, State) ->
{ok, State1} = internal_publish(Q, MsgId, next, MsgBody, State),
@@ -436,7 +452,7 @@ handle_cast({tx_cancel, MsgIds}, State) ->
{ok, State1} = internal_tx_cancel(MsgIds, State),
{noreply, State1};
handle_cast({requeue, Q, MsgSeqIds}, State) ->
- MsgSeqSeqIds = lists:zip(MsgSeqIds, lists:duplicate(length(MsgSeqIds), next)),
+ MsgSeqSeqIds = lists:zip(MsgSeqIds, lists:duplicate(erlang:length(MsgSeqIds), next)),
{ok, State1} = internal_requeue(Q, MsgSeqSeqIds, State),
{noreply, State1};
handle_cast({requeue_with_seqs, Q, MsgSeqSeqIds}, State) ->
@@ -522,29 +538,28 @@ dets_ets_match_object(#dqstate { msg_location_ets = MsgLocationEts, operation_mo
internal_deliver(Q, ReadMsg, State = #dqstate { sequences = Sequences }) ->
case ets:lookup(Sequences, Q) of
[] -> {ok, empty, State};
- [{Q, ReadSeqId, WriteSeqId}] ->
- case mnesia:dirty_read(rabbit_disk_queue, {Q, ReadSeqId}) of
- [] -> {ok, empty, State};
- [Obj =
- #dq_msg_loc {is_delivered = Delivered, msg_id = MsgId,
- next_seq_id = ReadSeqId2}] ->
- [{MsgId, _RefCount, File, Offset, TotalSize}] =
- dets_ets_lookup(State, MsgId),
- true = ets:insert(Sequences, {Q, ReadSeqId2, WriteSeqId}),
- ok =
- if Delivered -> ok;
- true ->
- mnesia:dirty_write(rabbit_disk_queue,
- Obj #dq_msg_loc {is_delivered = true})
- end,
- if ReadMsg ->
- {FileHdl, State1} = get_read_handle(File, State),
- {ok, {MsgBody, BodySize}} =
- read_message_at_offset(FileHdl, Offset, TotalSize),
- {ok, {MsgId, MsgBody, BodySize, Delivered, {MsgId, ReadSeqId}},
- State1};
- true -> {ok, {MsgId, Delivered, {MsgId, ReadSeqId}}, State}
- end
+ [{Q, _ReadSeqId, _WriteSeqId, 0}] -> {ok, empty, State};
+ [{Q, ReadSeqId, WriteSeqId, Length}] ->
+ [Obj =
+ #dq_msg_loc {is_delivered = Delivered, msg_id = MsgId,
+ next_seq_id = ReadSeqId2}] =
+ mnesia:dirty_read(rabbit_disk_queue, {Q, ReadSeqId}),
+ [{MsgId, _RefCount, File, Offset, TotalSize}] =
+ dets_ets_lookup(State, MsgId),
+ true = ets:insert(Sequences, {Q, ReadSeqId2, WriteSeqId, Length - 1}),
+ ok =
+ if Delivered -> ok;
+ true ->
+ mnesia:dirty_write(rabbit_disk_queue,
+ Obj #dq_msg_loc {is_delivered = true})
+ end,
+ if ReadMsg ->
+ {FileHdl, State1} = get_read_handle(File, State),
+ {ok, {MsgBody, BodySize}} =
+ read_message_at_offset(FileHdl, Offset, TotalSize),
+ {ok, {MsgId, MsgBody, BodySize, Delivered, {MsgId, ReadSeqId}},
+ State1};
+ true -> {ok, {MsgId, Delivered, {MsgId, ReadSeqId}}, State}
end
end.
@@ -673,17 +688,18 @@ internal_tx_commit(Q, PubMsgSeqIds, AckSeqIds,
current_file_name = CurName,
sequences = Sequences
}) ->
- {PubList, PubAcc, ReadSeqId} =
+ {PubList, PubAcc, ReadSeqId, Length} =
case PubMsgSeqIds of
        [] -> {[], undefined, undefined, undefined};
[_|PubMsgSeqIdsTail] ->
- {InitReadSeqId, InitWriteSeqId} =
+ {InitReadSeqId, InitWriteSeqId, InitLength} =
case ets:lookup(Sequences, Q) of
- [] -> {0,0};
- [{Q, ReadSeqId2, WriteSeqId2}] -> {ReadSeqId2, WriteSeqId2}
+ [] -> {0,0,0};
+ [{Q, ReadSeqId2, WriteSeqId2, Length2}] ->
+ {ReadSeqId2, WriteSeqId2, Length2}
end,
{ lists:zip(PubMsgSeqIds, (PubMsgSeqIdsTail ++ [{next, next}])),
- InitWriteSeqId, InitReadSeqId}
+ InitWriteSeqId, InitReadSeqId, InitLength}
end,
{atomic, {Sync, WriteSeqId, State2}} =
mnesia:transaction(
@@ -719,7 +735,7 @@ internal_tx_commit(Q, PubMsgSeqIds, AckSeqIds,
{Sync2, WriteSeqId3, State3}
end),
true = if PubList =:= [] -> true;
- true -> ets:insert(Sequences, {Q, ReadSeqId, WriteSeqId})
+ true -> ets:insert(Sequences, {Q, ReadSeqId, WriteSeqId, Length + erlang:length(PubList)})
end,
ok = if Sync -> file:sync(CurHdl);
true -> ok
@@ -730,16 +746,16 @@ internal_tx_commit(Q, PubMsgSeqIds, AckSeqIds,
internal_publish(Q, MsgId, SeqId, MsgBody, State) ->
{ok, State1 = #dqstate { sequences = Sequences }} =
internal_tx_publish(MsgId, MsgBody, State),
- {ReadSeqId, WriteSeqId} =
+ {ReadSeqId, WriteSeqId, Length} =
case ets:lookup(Sequences, Q) of
[] -> %% previously unseen queue
- {0, 0};
- [{Q, ReadSeqId2, WriteSeqId2}] ->
- {ReadSeqId2, WriteSeqId2}
+ {0, 0, 0};
+ [{Q, ReadSeqId2, WriteSeqId2, Length2}] ->
+ {ReadSeqId2, WriteSeqId2, Length2}
end,
WriteSeqId3 = adjust_last_msg_seq_id(Q, WriteSeqId, SeqId),
WriteSeqId3Next = WriteSeqId3 + 1,
- true = ets:insert(Sequences, {Q, ReadSeqId, WriteSeqId3Next}),
+ true = ets:insert(Sequences, {Q, ReadSeqId, WriteSeqId3Next, Length + 1}),
ok = mnesia:dirty_write(rabbit_disk_queue,
#dq_msg_loc { queue_and_seq_id = {Q, WriteSeqId3},
msg_id = MsgId,
@@ -750,7 +766,7 @@ internal_publish(Q, MsgId, SeqId, MsgBody, State) ->
internal_tx_cancel(MsgIds, State) ->
%% we don't need seq ids because we're not touching mnesia,
%% because seqids were never assigned
- MsgSeqIds = lists:zip(MsgIds, lists:duplicate(length(MsgIds), undefined)),
+ MsgSeqIds = lists:zip(MsgIds, lists:duplicate(erlang:length(MsgIds), undefined)),
remove_messages(undefined, MsgSeqIds, false, State).
internal_requeue(_Q, [], State) ->
@@ -780,7 +796,7 @@ internal_requeue(Q, MsgSeqIds = [_|MsgSeqIdsTail],
%% as they have no concept of sequence id anyway).
%% the Q _must_ already exist
- [{Q, ReadSeqId, WriteSeqId}] = ets:lookup(Sequences, Q),
+ [{Q, ReadSeqId, WriteSeqId, Length}] = ets:lookup(Sequences, Q),
MsgSeqIdsZipped = lists:zip(MsgSeqIds, MsgSeqIdsTail ++ [{next, next}]),
{atomic, WriteSeqId2} =
mnesia:transaction(
@@ -806,13 +822,13 @@ internal_requeue(Q, MsgSeqIds = [_|MsgSeqIdsTail],
NextSeqIdTo2
end, WriteSeqId, MsgSeqIdsZipped)
end),
- true = ets:insert(Sequences, {Q, ReadSeqId, WriteSeqId2}),
+ true = ets:insert(Sequences, {Q, ReadSeqId, WriteSeqId2, Length + erlang:length(MsgSeqIds)}),
{ok, State}.
internal_purge(Q, State = #dqstate { sequences = Sequences }) ->
case ets:lookup(Sequences, Q) of
[] -> {ok, 0, State};
- [{Q, ReadSeqId, WriteSeqId}] ->
+ [{Q, ReadSeqId, WriteSeqId, _Length}] ->
{atomic, {ok, State2}} =
mnesia:transaction(
fun() ->
@@ -825,7 +841,7 @@ internal_purge(Q, State = #dqstate { sequences = Sequences }) ->
end, [], lists:seq(ReadSeqId, WriteSeqId - 1)),
remove_messages(Q, MsgSeqIds, txn, State)
end),
- true = ets:insert(Sequences, {Q, WriteSeqId, WriteSeqId}),
+ true = ets:insert(Sequences, {Q, WriteSeqId, WriteSeqId, 0}),
{ok, WriteSeqId - ReadSeqId, State2}
end.
@@ -1146,7 +1162,7 @@ load_from_disk(State) ->
ok = mnesia:read_lock_table(rabbit_disk_queue),
mnesia:foldl(fun (#dq_msg_loc { msg_id = MsgId }, true) ->
true = 1 =:=
- length(dets_ets_lookup(State1, MsgId))
+ erlang:length(dets_ets_lookup(State1, MsgId))
end,
true, rabbit_disk_queue)
end),
@@ -1171,9 +1187,16 @@ extract_sequence_numbers(State = #dqstate { sequences = Sequences }) ->
[] ->
                true = ets:insert_new(Sequences,
                                      {Q, SeqId, NextWrite, 0});
- [Orig = {Q, Read, Write}] ->
+ [Orig = {Q, Read, Write, Length}] ->
Repl = {Q, lists:min([Read, SeqId]),
- lists:max([Write, NextWrite])},
+ %% Length is wrong here, but
+ %% it doesn't matter because
+ %% we'll pull out the gaps in
+                             %% remove_gaps_in_sequences and
+ %% then do a straight
+ %% subtraction to get the
+ %% right length
+ lists:max([Write, NextWrite]), Length},
if Orig /= Repl ->
true = ets:insert(Sequences, Repl);
true -> true
@@ -1199,9 +1222,11 @@ remove_gaps_in_sequences(#dqstate { sequences = Sequences }) ->
fun() ->
ok = mnesia:write_lock_table(rabbit_disk_queue),
lists:foreach(
- fun ({Q, ReadSeqId, WriteSeqId}) ->
+ fun ({Q, ReadSeqId, WriteSeqId, _Length}) ->
Gap = shuffle_up(Q, ReadSeqId - 1, WriteSeqId - 1, 0),
- true = ets:insert(Sequences, {Q, ReadSeqId + Gap, WriteSeqId})
+ ReadSeqId2 = ReadSeqId + Gap,
+ Length = WriteSeqId - ReadSeqId2,
+ true = ets:insert(Sequences, {Q, ReadSeqId2, WriteSeqId, Length})
end, ets:match_object(Sequences, '_'))
end).
@@ -1244,7 +1269,7 @@ load_messages(Left, [File|Files],
{ok, Messages} = scan_file_for_valid_messages(form_filename(File)),
{ValidMessagesRev, ValidTotalSize} = lists:foldl(
fun ({MsgId, TotalSize, Offset}, {VMAcc, VTSAcc}) ->
- case length(mnesia:dirty_index_match_object
+ case erlang:length(mnesia:dirty_index_match_object
(rabbit_disk_queue,
#dq_msg_loc { msg_id = MsgId,
queue_and_seq_id = '_',
@@ -1289,7 +1314,7 @@ recover_crashed_compactions1(Files, TmpFile) ->
%% all of these messages should appear in the mnesia table,
%% otherwise they wouldn't have been copied out
lists:foreach(fun (MsgId) ->
- true = 0 < length(mnesia:dirty_index_match_object
+ true = 0 < erlang:length(mnesia:dirty_index_match_object
(rabbit_disk_queue,
#dq_msg_loc { msg_id = MsgId,
queue_and_seq_id = '_',
@@ -1329,7 +1354,7 @@ recover_crashed_compactions1(Files, TmpFile) ->
%% we're in case 4 above.
%% check that everything in the main file is a valid message in mnesia
lists:foreach(fun (MsgId) ->
- true = 0 < length(mnesia:dirty_index_match_object
+ true = 0 < erlang:length(mnesia:dirty_index_match_object
(rabbit_disk_queue,
#dq_msg_loc { msg_id = MsgId,
queue_and_seq_id = '_',
diff --git a/src/rabbit_mixed_queue.erl b/src/rabbit_mixed_queue.erl
index 4749e1dac4..c909e2a5a9 100644
--- a/src/rabbit_mixed_queue.erl
+++ b/src/rabbit_mixed_queue.erl
@@ -35,7 +35,7 @@
-export([publish/4, deliver/1, ack/2,
tx_publish/4, tx_commit/3, tx_cancel/2,
- requeue/2, purge/1]).
+ requeue/2, purge/1, length/1, is_empty/1]).
-record(mqstate, { mode,
msg_buf,
@@ -49,7 +49,7 @@
start_link(Queue, Mode) when Mode =:= disk orelse Mode =:= mixed ->
rabbit_disk_queue:start_link(?FILE_SIZE_LIMIT),
rabbit_disk_queue:to_ram_disk_mode(), %% TODO, CHANGE ME
- {ok, #mqstate { mode = Mode, msg_buf = queue:new(), next_write_seq = 0, queue = Queue }}.
+ {ok, #mqstate { mode = Mode, msg_buf = queue:new(), next_write_seq = 1, queue = Queue }}.
publish(MsgId, Msg, _IsPersistent, State = #mqstate { mode = disk, queue = Q }) ->
ok = rabbit_disk_queue:publish(Q, MsgId, Msg),
@@ -173,3 +173,12 @@ purge(State = #mqstate { queue = Q, msg_buf = MsgBuf, mode = mixed }) ->
rabbit_disk_queue:purge(Q),
Count = queue:len(MsgBuf),
{Count, State #mqstate { msg_buf = queue:new() }}.
+
+length(State = #mqstate { queue = Q, mode = disk }) ->
+ Length = rabbit_disk_queue:length(Q),
+ {Length, State};
+length(State = #mqstate { mode = mixed, msg_buf = MsgBuf }) ->
+    {queue:len(MsgBuf), State}.
+
+is_empty(State) ->
+    {Length, _State1} = rabbit_mixed_queue:length(State),
+    Length == 0.