diff options
| author | Matthew Sackman <matthew@lshift.net> | 2009-05-21 13:38:20 +0100 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2009-05-21 13:38:20 +0100 |
| commit | 3756f096565791dfb1968164c67c047c322fa390 (patch) | |
| tree | aa3f96866115555aebc4ab5fff668d17f84df5cc | |
| parent | 9c18c3d9d2809bc419b50d6eb0f3a64aee310a40 (diff) | |
| download | rabbitmq-server-git-3756f096565791dfb1968164c67c047c322fa390.tar.gz | |
Added is_empty and length functions.
| -rw-r--r-- | src/rabbit_disk_queue.erl | 127 | ||||
| -rw-r--r-- | src/rabbit_mixed_queue.erl | 13 |
2 files changed, 87 insertions, 53 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl index 8c602b5310..2bc40123df 100644 --- a/src/rabbit_disk_queue.erl +++ b/src/rabbit_disk_queue.erl @@ -42,6 +42,8 @@ tx_publish/2, tx_commit/3, tx_commit_with_seqs/3, tx_cancel/1, requeue/2, requeue_with_seqs/2, purge/1]). +-export([length/1, is_empty/1]). + -export([stop/0, stop_and_obliterate/0, to_disk_only_mode/0, to_ram_disk_mode/0]). -include("rabbit.hrl"). @@ -83,7 +85,7 @@ %% FileSummary: this is an ets table which contains: %% {File, ValidTotalSize, ContiguousTop, Left, Right} %% Sequences: this is an ets table which contains: -%% {Q, ReadSeqId, WriteSeqId} +%% {Q, ReadSeqId, WriteSeqId, QueueLength} %% rabbit_disk_queue: this is an mnesia table which contains: %% #dq_msg_loc { queue_and_seq_id = {Q, SeqId}, %% is_delivered = IsDelivered, @@ -245,6 +247,8 @@ -spec(stop_and_obliterate/0 :: () -> 'ok'). -spec(to_ram_disk_mode/0 :: () -> 'ok'). -spec(to_disk_only_mode/0 :: () -> 'ok'). +-spec(length/1 :: (queue_name()) -> non_neg_integer()). +-spec(is_empty/1 :: (queue_name()) -> bool()). -endif. @@ -303,6 +307,13 @@ to_disk_only_mode() -> to_ram_disk_mode() -> gen_server:call(?SERVER, to_ram_disk_mode, infinity). +length(Q) -> + gen_server:call(?SERVER, {length, Q}, infinity). + +is_empty(Q) -> + Length = rabbit_disk_queue:length(Q), + Length == 0. + %% ---- GEN-SERVER INTERNAL API ---- init([FileSizeLimit, ReadFileHandlesLimit]) -> @@ -378,7 +389,7 @@ handle_call({phantom_deliver, Q}, _From, State) -> {ok, Result, State1} = internal_deliver(Q, false, State), {reply, Result, State1}; handle_call({tx_commit, Q, PubMsgIds, AckSeqIds}, _From, State) -> - PubMsgSeqIds = lists:zip(PubMsgIds, lists:duplicate(length(PubMsgIds), next)), + PubMsgSeqIds = lists:zip(PubMsgIds, lists:duplicate(erlang:length(PubMsgIds), next)), {ok, State1} = internal_tx_commit(Q, PubMsgSeqIds, AckSeqIds, State), {reply, ok, State1}; handle_call({tx_commit_with_seqs, Q, PubSeqMsgIds, AckSeqIds}, _From, State) -> @@ -418,7 +429,12 @@ handle_call(to_ram_disk_mode, _From, State = #dqstate { operation_mode = disk_on {atomic, ok} = mnesia:change_table_copy_type(rabbit_disk_queue, node(), disc_copies), true = ets:from_dets(MsgLocationEts, MsgLocationDets), ok = dets:delete_all_objects(MsgLocationDets), - {reply, ok, State #dqstate { operation_mode = ram_disk }}. + {reply, ok, State #dqstate { operation_mode = ram_disk }}; +handle_call({length, Q}, _From, State = #dqstate { sequences = Sequences }) -> + case ets:lookup(Sequences, Q) of + [] -> {reply, 0, State}; + [{Q, _ReadSeqId, _WriteSeqId, Length}] -> {reply, Length, State} + end. handle_cast({publish, Q, MsgId, MsgBody}, State) -> {ok, State1} = internal_publish(Q, MsgId, next, MsgBody, State), @@ -436,7 +452,7 @@ handle_cast({tx_cancel, MsgIds}, State) -> {ok, State1} = internal_tx_cancel(MsgIds, State), {noreply, State1}; handle_cast({requeue, Q, MsgSeqIds}, State) -> - MsgSeqSeqIds = lists:zip(MsgSeqIds, lists:duplicate(length(MsgSeqIds), next)), + MsgSeqSeqIds = lists:zip(MsgSeqIds, lists:duplicate(erlang:length(MsgSeqIds), next)), {ok, State1} = internal_requeue(Q, MsgSeqSeqIds, State), {noreply, State1}; handle_cast({requeue_with_seqs, Q, MsgSeqSeqIds}, State) -> @@ -522,29 +538,28 @@ dets_ets_match_object(#dqstate { msg_location_ets = MsgLocationEts, operation_mo internal_deliver(Q, ReadMsg, State = #dqstate { sequences = Sequences }) -> case ets:lookup(Sequences, Q) of [] -> {ok, empty, State}; - [{Q, ReadSeqId, WriteSeqId}] -> - case mnesia:dirty_read(rabbit_disk_queue, {Q, ReadSeqId}) of - [] -> {ok, empty, State}; - [Obj = - #dq_msg_loc {is_delivered = Delivered, msg_id = MsgId, - next_seq_id = ReadSeqId2}] -> - [{MsgId, _RefCount, File, Offset, TotalSize}] = - dets_ets_lookup(State, MsgId), - true = ets:insert(Sequences, {Q, ReadSeqId2, WriteSeqId}), - ok = - if Delivered -> ok; - true -> - mnesia:dirty_write(rabbit_disk_queue, - Obj #dq_msg_loc {is_delivered = true}) - end, - if ReadMsg -> - {FileHdl, State1} = get_read_handle(File, State), - {ok, {MsgBody, BodySize}} = - read_message_at_offset(FileHdl, Offset, TotalSize), - {ok, {MsgId, MsgBody, BodySize, Delivered, {MsgId, ReadSeqId}}, - State1}; - true -> {ok, {MsgId, Delivered, {MsgId, ReadSeqId}}, State} - end + [{Q, _ReadSeqId, _WriteSeqId, 0}] -> {ok, empty, State}; + [{Q, ReadSeqId, WriteSeqId, Length}] -> + [Obj = + #dq_msg_loc {is_delivered = Delivered, msg_id = MsgId, + next_seq_id = ReadSeqId2}] = + mnesia:dirty_read(rabbit_disk_queue, {Q, ReadSeqId}), + [{MsgId, _RefCount, File, Offset, TotalSize}] = + dets_ets_lookup(State, MsgId), + true = ets:insert(Sequences, {Q, ReadSeqId2, WriteSeqId, Length - 1}), + ok = + if Delivered -> ok; + true -> + mnesia:dirty_write(rabbit_disk_queue, + Obj #dq_msg_loc {is_delivered = true}) + end, + if ReadMsg -> + {FileHdl, State1} = get_read_handle(File, State), + {ok, {MsgBody, BodySize}} = + read_message_at_offset(FileHdl, Offset, TotalSize), + {ok, {MsgId, MsgBody, BodySize, Delivered, {MsgId, ReadSeqId}}, + State1}; + true -> {ok, {MsgId, Delivered, {MsgId, ReadSeqId}}, State} end end. @@ -673,17 +688,18 @@ internal_tx_commit(Q, PubMsgSeqIds, AckSeqIds, current_file_name = CurName, sequences = Sequences }) -> - {PubList, PubAcc, ReadSeqId} = + {PubList, PubAcc, ReadSeqId, Length} = case PubMsgSeqIds of [] -> {[], undefined, undefined}; [_|PubMsgSeqIdsTail] -> - {InitReadSeqId, InitWriteSeqId} = + {InitReadSeqId, InitWriteSeqId, InitLength} = case ets:lookup(Sequences, Q) of - [] -> {0,0}; - [{Q, ReadSeqId2, WriteSeqId2}] -> {ReadSeqId2, WriteSeqId2} + [] -> {0,0,0}; + [{Q, ReadSeqId2, WriteSeqId2, Length2}] -> + {ReadSeqId2, WriteSeqId2, Length2} end, { lists:zip(PubMsgSeqIds, (PubMsgSeqIdsTail ++ [{next, next}])), - InitWriteSeqId, InitReadSeqId} + InitWriteSeqId, InitReadSeqId, InitLength} end, {atomic, {Sync, WriteSeqId, State2}} = mnesia:transaction( @@ -719,7 +735,7 @@ internal_tx_commit(Q, PubMsgSeqIds, AckSeqIds, {Sync2, WriteSeqId3, State3} end), true = if PubList =:= [] -> true; - true -> ets:insert(Sequences, {Q, ReadSeqId, WriteSeqId}) + true -> ets:insert(Sequences, {Q, ReadSeqId, WriteSeqId, Length + erlang:length(PubList)}) end, ok = if Sync -> file:sync(CurHdl); true -> ok @@ -730,16 +746,16 @@ internal_tx_commit(Q, PubMsgSeqIds, AckSeqIds, internal_publish(Q, MsgId, SeqId, MsgBody, State) -> {ok, State1 = #dqstate { sequences = Sequences }} = internal_tx_publish(MsgId, MsgBody, State), - {ReadSeqId, WriteSeqId} = + {ReadSeqId, WriteSeqId, Length} = case ets:lookup(Sequences, Q) of [] -> %% previously unseen queue - {0, 0}; - [{Q, ReadSeqId2, WriteSeqId2}] -> - {ReadSeqId2, WriteSeqId2} + {0, 0, 0}; + [{Q, ReadSeqId2, WriteSeqId2, Length2}] -> + {ReadSeqId2, WriteSeqId2, Length2} end, WriteSeqId3 = adjust_last_msg_seq_id(Q, WriteSeqId, SeqId), WriteSeqId3Next = WriteSeqId3 + 1, - true = ets:insert(Sequences, {Q, ReadSeqId, WriteSeqId3Next}), + true = ets:insert(Sequences, {Q, ReadSeqId, WriteSeqId3Next, Length + 1}), ok = mnesia:dirty_write(rabbit_disk_queue, #dq_msg_loc { queue_and_seq_id = {Q, WriteSeqId3}, msg_id = MsgId, @@ -750,7 +766,7 @@ internal_publish(Q, MsgId, SeqId, MsgBody, State) -> internal_tx_cancel(MsgIds, State) -> %% we don't need seq ids because we're not touching mnesia, %% because seqids were never assigned - MsgSeqIds = lists:zip(MsgIds, lists:duplicate(length(MsgIds), undefined)), + MsgSeqIds = lists:zip(MsgIds, lists:duplicate(erlang:length(MsgIds), undefined)), remove_messages(undefined, MsgSeqIds, false, State). internal_requeue(_Q, [], State) -> @@ -780,7 +796,7 @@ internal_requeue(Q, MsgSeqIds = [_|MsgSeqIdsTail], %% as they have no concept of sequence id anyway). %% the Q _must_ already exist - [{Q, ReadSeqId, WriteSeqId}] = ets:lookup(Sequences, Q), + [{Q, ReadSeqId, WriteSeqId, Length}] = ets:lookup(Sequences, Q), MsgSeqIdsZipped = lists:zip(MsgSeqIds, MsgSeqIdsTail ++ [{next, next}]), {atomic, WriteSeqId2} = mnesia:transaction( @@ -806,13 +822,13 @@ internal_requeue(Q, MsgSeqIds = [_|MsgSeqIdsTail], NextSeqIdTo2 end, WriteSeqId, MsgSeqIdsZipped) end), - true = ets:insert(Sequences, {Q, ReadSeqId, WriteSeqId2}), + true = ets:insert(Sequences, {Q, ReadSeqId, WriteSeqId2, Length + erlang:length(MsgSeqIds)}), {ok, State}. internal_purge(Q, State = #dqstate { sequences = Sequences }) -> case ets:lookup(Sequences, Q) of [] -> {ok, 0, State}; - [{Q, ReadSeqId, WriteSeqId}] -> + [{Q, ReadSeqId, WriteSeqId, _Length}] -> {atomic, {ok, State2}} = mnesia:transaction( fun() -> @@ -825,7 +841,7 @@ internal_purge(Q, State = #dqstate { sequences = Sequences }) -> end, [], lists:seq(ReadSeqId, WriteSeqId - 1)), remove_messages(Q, MsgSeqIds, txn, State) end), - true = ets:insert(Sequences, {Q, WriteSeqId, WriteSeqId}), + true = ets:insert(Sequences, {Q, WriteSeqId, WriteSeqId, 0}), {ok, WriteSeqId - ReadSeqId, State2} end. @@ -1146,7 +1162,7 @@ load_from_disk(State) -> ok = mnesia:read_lock_table(rabbit_disk_queue), mnesia:foldl(fun (#dq_msg_loc { msg_id = MsgId }, true) -> true = 1 =:= - length(dets_ets_lookup(State1, MsgId)) + erlang:length(dets_ets_lookup(State1, MsgId)) end, true, rabbit_disk_queue) end), @@ -1171,9 +1187,16 @@ extract_sequence_numbers(State = #dqstate { sequences = Sequences }) -> [] -> true = ets:insert_new(Sequences, {Q, SeqId, NextWrite}); - [Orig = {Q, Read, Write}] -> + [Orig = {Q, Read, Write, Length}] -> Repl = {Q, lists:min([Read, SeqId]), - lists:max([Write, NextWrite])}, + %% Length is wrong here, but + %% it doesn't matter because + %% we'll pull out the gaps in + %% remove_gaps_in_sequences in + %% then do a straight + %% subtraction to get the + %% right length + lists:max([Write, NextWrite]), Length}, if Orig /= Repl -> true = ets:insert(Sequences, Repl); true -> true @@ -1199,9 +1222,11 @@ remove_gaps_in_sequences(#dqstate { sequences = Sequences }) -> fun() -> ok = mnesia:write_lock_table(rabbit_disk_queue), lists:foreach( - fun ({Q, ReadSeqId, WriteSeqId}) -> + fun ({Q, ReadSeqId, WriteSeqId, _Length}) -> Gap = shuffle_up(Q, ReadSeqId - 1, WriteSeqId - 1, 0), - true = ets:insert(Sequences, {Q, ReadSeqId + Gap, WriteSeqId}) + ReadSeqId2 = ReadSeqId + Gap, + Length = WriteSeqId - ReadSeqId2, + true = ets:insert(Sequences, {Q, ReadSeqId2, WriteSeqId, Length}) end, ets:match_object(Sequences, '_')) end). @@ -1244,7 +1269,7 @@ load_messages(Left, [File|Files], {ok, Messages} = scan_file_for_valid_messages(form_filename(File)), {ValidMessagesRev, ValidTotalSize} = lists:foldl( fun ({MsgId, TotalSize, Offset}, {VMAcc, VTSAcc}) -> - case length(mnesia:dirty_index_match_object + case erlang:length(mnesia:dirty_index_match_object (rabbit_disk_queue, #dq_msg_loc { msg_id = MsgId, queue_and_seq_id = '_', @@ -1289,7 +1314,7 @@ recover_crashed_compactions1(Files, TmpFile) -> %% all of these messages should appear in the mnesia table, %% otherwise they wouldn't have been copied out lists:foreach(fun (MsgId) -> - true = 0 < length(mnesia:dirty_index_match_object + true = 0 < erlang:length(mnesia:dirty_index_match_object (rabbit_disk_queue, #dq_msg_loc { msg_id = MsgId, queue_and_seq_id = '_', @@ -1329,7 +1354,7 @@ recover_crashed_compactions1(Files, TmpFile) -> %% we're in case 4 above. %% check that everything in the main file is a valid message in mnesia lists:foreach(fun (MsgId) -> - true = 0 < length(mnesia:dirty_index_match_object + true = 0 < erlang:length(mnesia:dirty_index_match_object (rabbit_disk_queue, #dq_msg_loc { msg_id = MsgId, queue_and_seq_id = '_', diff --git a/src/rabbit_mixed_queue.erl b/src/rabbit_mixed_queue.erl index 4749e1dac4..c909e2a5a9 100644 --- a/src/rabbit_mixed_queue.erl +++ b/src/rabbit_mixed_queue.erl @@ -35,7 +35,7 @@ -export([publish/4, deliver/1, ack/2, tx_publish/4, tx_commit/3, tx_cancel/2, - requeue/2, purge/1]). + requeue/2, purge/1, length/1, is_empty/1]). -record(mqstate, { mode, msg_buf, @@ -49,7 +49,7 @@ start_link(Queue, Mode) when Mode =:= disk orelse Mode =:= mixed -> rabbit_disk_queue:start_link(?FILE_SIZE_LIMIT), rabbit_disk_queue:to_ram_disk_mode(), %% TODO, CHANGE ME - {ok, #mqstate { mode = Mode, msg_buf = queue:new(), next_write_seq = 0, queue = Queue }}. + {ok, #mqstate { mode = Mode, msg_buf = queue:new(), next_write_seq = 1, queue = Queue }}. publish(MsgId, Msg, _IsPersistent, State = #mqstate { mode = disk, queue = Q }) -> ok = rabbit_disk_queue:publish(Q, MsgId, Msg), @@ -173,3 +173,12 @@ purge(State = #mqstate { queue = Q, msg_buf = MsgBuf, mode = mixed }) -> rabbit_disk_queue:purge(Q), Count = queue:len(MsgBuf), {Count, State #mqstate { msg_buf = queue:new() }}. + +length(State = #mqstate { queue = Q, mode = disk }) -> + Length = rabbit_disk_queue:length(Q), + {Length, State}; +length(State = #mqstate { mode = mixed, msg_buf = MsgBuf }) -> + {queue:length(MsgBuf), State}. + +is_empty(State) -> + 0 == rabbit_mixed_queue:length(State). |
