diff options
| -rw-r--r-- | src/gen_server2.erl | 5 | ||||
| -rw-r--r-- | src/rabbit_disk_queue.erl | 105 | ||||
| -rw-r--r-- | src/rabbit_mixed_queue.erl | 15 |
3 files changed, 85 insertions, 40 deletions
diff --git a/src/gen_server2.erl b/src/gen_server2.erl index 87b56ba38e..cf54811fe7 100644 --- a/src/gen_server2.erl +++ b/src/gen_server2.erl @@ -508,8 +508,9 @@ process_next_msg(Parent, Name, State, Mod, Time, TimeoutState, MinPri, Queue, process_msg(Parent, Name, State, Mod, Time, TimeoutState, Queue1, Debug, case Time == hibernate of - true -> roused_and_disinterested; - false -> timeout + true -> {roused_and_disinterested, MinPri}; + false when MinPri =:= any -> timeout; + false -> {timeout, MinPri} end) end end. diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl index 813ab7c433..3a520ecdca 100644 --- a/src/rabbit_disk_queue.erl +++ b/src/rabbit_disk_queue.erl @@ -42,7 +42,7 @@ tx_publish/1, tx_commit/3, tx_cancel/1, requeue/2, purge/1, delete_queue/1, delete_non_durable_queues/1, auto_ack_next_message/1, - requeue_next_n/2 + requeue_next_n/2, prefetch/2 ]). -export([filesync/0, cache_info/0]). @@ -345,6 +345,9 @@ report_memory() -> set_mode(Mode) -> gen_server2:cast(?SERVER, {set_mode, Mode}). +prefetch(Q, Count) -> + gen_server2:pcast(?SERVER, -1, {prefetch, Q, Count}). + %% ---- GEN-SERVER INTERNAL API ---- init([FileSizeLimit, ReadFileHandlesLimit]) -> @@ -507,21 +510,28 @@ handle_cast({set_mode, Mode}, State) -> mixed -> fun to_ram_disk_mode/1 end)(State)); handle_cast(report_memory, State) -> - %% call noreply1/1, not noreply/1, as we don't want to restart the + %% call noreply1/2, not noreply/1/2, as we don't want to restart the %% memory_report_timer %% by unsetting the timer, we force a report on the next normal message - noreply1(State #dqstate { memory_report_timer = undefined }). + noreply1(State #dqstate { memory_report_timer = undefined }, 0); +handle_cast({prefetch, Q, Count}, State) -> + {ok, State1} = internal_prefetch(Q, Count, State), + noreply(State1, any). %% set minpri to any handle_info({'EXIT', _Pid, Reason}, State) -> {stop, Reason, State}; -handle_info(timeout, State = #dqstate { commit_timer_ref = undefined }) -> - ok = report_memory(true, State), - %% don't use noreply/1 or noreply1/1 as they'll restart the memory timer - {noreply, stop_memory_timer(State), hibernate, 0}; -handle_info(timeout, State) -> +handle_info({timeout, 0}, State = #dqstate { commit_timer_ref = undefined }) -> + %% this is the binary timeout coming back, with minpri = 0 + %% don't use noreply/1/2 or noreply1/2 as they'll restart the memory timer + %% set timeout to 0, and go pick up any low priority messages + {noreply, stop_memory_timer(State), 0, any}; +handle_info({timeout, 0}, State) -> + %% must have commit_timer set, so timeout was 0, and we're not hibernating noreply(sync_current_file_handle(State)); -handle_info(_Info, State) -> - noreply(State). +handle_info(timeout, State) -> + %% no minpri supplied, so it must have been 'any', so go hibernate + ok = report_memory(true, State), + {noreply, State, hibernate, any}. terminate(_Reason, State) -> shutdown(State). @@ -643,30 +653,36 @@ to_ram_disk_mode(State = #dqstate { operation_mode = disk_only, ets_bytes_per_record = undefined }. noreply(NewState) -> - noreply1(start_memory_timer(NewState)). + noreply(NewState, 0). + +noreply(NewState, MinPri) -> + noreply1(start_memory_timer(NewState), MinPri). noreply1(NewState = #dqstate { on_sync_froms = [], - commit_timer_ref = undefined }) -> - {noreply, NewState, binary, 0}; -noreply1(NewState = #dqstate { commit_timer_ref = undefined }) -> - {noreply, start_commit_timer(NewState), 0, 0}; -noreply1(NewState = #dqstate { on_sync_froms = [] }) -> - {noreply, stop_commit_timer(NewState), binary, 0}; -noreply1(NewState) -> - {noreply, NewState, 0, 0}. + commit_timer_ref = undefined }, MinPri) -> + {noreply, NewState, binary, MinPri}; +noreply1(NewState = #dqstate { commit_timer_ref = undefined }, MinPri) -> + {noreply, start_commit_timer(NewState), 0, MinPri}; +noreply1(NewState = #dqstate { on_sync_froms = [] }, MinPri) -> + {noreply, stop_commit_timer(NewState), binary, MinPri}; +noreply1(NewState, MinPri) -> + {noreply, NewState, 0, MinPri}. reply(Reply, NewState) -> - reply1(Reply, start_memory_timer(NewState)). + reply(Reply, NewState, 0). + +reply(Reply, NewState, MinPri) -> + reply1(Reply, start_memory_timer(NewState), MinPri). reply1(Reply, NewState = #dqstate { on_sync_froms = [], - commit_timer_ref = undefined }) -> - {reply, Reply, NewState, binary, 0}; -reply1(Reply, NewState = #dqstate { commit_timer_ref = undefined }) -> - {reply, Reply, start_commit_timer(NewState), 0, 0}; -reply1(Reply, NewState = #dqstate { on_sync_froms = [] }) -> - {reply, Reply, stop_commit_timer(NewState), binary, 0}; -reply1(Reply, NewState) -> - {reply, Reply, NewState, 0, 0}. + commit_timer_ref = undefined }, MinPri) -> + {reply, Reply, NewState, binary, MinPri}; +reply1(Reply, NewState = #dqstate { commit_timer_ref = undefined }, MinPri) -> + {reply, Reply, start_commit_timer(NewState), 0, MinPri}; +reply1(Reply, NewState = #dqstate { on_sync_froms = [] }, MinPri) -> + {reply, Reply, stop_commit_timer(NewState), binary, MinPri}; +reply1(Reply, NewState, MinPri) -> + {reply, Reply, NewState, 0, MinPri}. form_filename(Name) -> filename:join(base_directory(), Name). @@ -829,8 +845,12 @@ decrement_cache(MsgId, #dqstate { message_cache = Cache }) -> ok. insert_into_cache(Message = #basic_message { guid = MsgId }, - MsgSize, #dqstate { message_cache = Cache }) -> - true = ets:insert_new(Cache, {MsgId, Message, MsgSize, 1}), + MsgSize, Forced, #dqstate { message_cache = Cache }) -> + Count = case Forced of + true -> 0; + false -> 1 + end, + true = ets:insert_new(Cache, {MsgId, Message, MsgSize, Count}), ok. %% ---- INTERNAL RAW FUNCTIONS ---- @@ -843,7 +863,7 @@ internal_deliver(Q, ReadMsg, FakeDeliver, Remaining = WriteSeqId - ReadSeqId - 1, {ok, Result, State1} = internal_read_message( - Q, ReadSeqId, FakeDeliver, ReadMsg, State), + Q, ReadSeqId, ReadMsg, FakeDeliver, false, State), true = ets:insert(Sequences, {Q, ReadSeqId+1, WriteSeqId}), {ok, @@ -856,7 +876,20 @@ internal_deliver(Q, ReadMsg, FakeDeliver, end, State1} end. -internal_read_message(Q, ReadSeqId, FakeDeliver, ReadMsg, State) -> +internal_prefetch(Q, Count, State = #dqstate { sequences = Sequences }) -> + {ReadSeqId, WriteSeqId} = sequence_lookup(Sequences, Q), + Length = WriteSeqId - ReadSeqId, + Count1 = lists:min([Length, Count]), + StateN = + lists:foldl( + fun(N, State1) -> + {ok, _MsgStuff, State2} = + internal_read_message(Q, N, true, true, true, State1), + State2 + end, State, lists:seq(ReadSeqId, ReadSeqId + Count1 - 1)), + {ok, StateN}. + +internal_read_message(Q, ReadSeqId, ReadMsg, FakeDeliver, ForceInCache, State) -> [Obj = #dq_msg_loc {is_delivered = Delivered, msg_id = MsgId}] = mnesia:dirty_read(rabbit_disk_queue, {Q, ReadSeqId}), @@ -876,14 +909,14 @@ internal_read_message(Q, ReadSeqId, FakeDeliver, ReadMsg, State) -> {ok, {MsgBody, BodySize}} = read_message_at_offset(FileHdl, Offset, TotalSize), Message = bin_to_msg(MsgBody), - ok = case RefCount of - 1 -> + ok = if RefCount > 1 orelse ForceInCache -> + insert_into_cache(Message, BodySize, + ForceInCache, State1); + true -> ok %% it's not in the cache and we only %% have 1 queue with the message. So %% don't bother putting it in the %% cache. - ok; - _ -> insert_into_cache(Message, BodySize, State1) end, {ok, {Message, BodySize, Delivered, {MsgId, ReadSeqId}}, State1}; diff --git a/src/rabbit_mixed_queue.erl b/src/rabbit_mixed_queue.erl index 61487c9d56..2ef534ff3d 100644 --- a/src/rabbit_mixed_queue.erl +++ b/src/rabbit_mixed_queue.erl @@ -200,7 +200,8 @@ to_mixed_mode(TxnMessages, State = %% don't actually do anything to the disk MsgBuf = case Length of 0 -> queue:new(); - _ -> queue:from_list([{disk, Length}]) + _ -> ok = rabbit_disk_queue:prefetch(Q, Length), + queue:from_list([{disk, Length}]) end, %% remove txn messages from disk which are neither persistent and %% durable. This is necessary to avoid leaks. This is also pretty @@ -341,6 +342,7 @@ deliver(State = #mqstate { mode = mixed, msg_buf = MsgBuf, queue = Q, end; false -> noack end, + ok = maybe_prefetch(Q, MsgBuf1), {Msg1, IsDelivered1, AckTag1, MsgBuf1}; {disk, Rem1} -> {Msg1 = #basic_message { is_persistent = IsPersistent }, @@ -353,7 +355,8 @@ deliver(State = #mqstate { mode = mixed, msg_buf = MsgBuf, queue = Q, noack end, MsgBuf3 = case Rem1 of - 1 -> MsgBuf1; + 1 -> ok = maybe_prefetch(Q, MsgBuf1), + MsgBuf1; _ -> queue:in_r({disk, Rem1 - 1}, MsgBuf1) end, {Msg1, IsDelivered1, AckTag2, MsgBuf3} @@ -362,6 +365,14 @@ deliver(State = #mqstate { mode = mixed, msg_buf = MsgBuf, queue = Q, {{Msg, IsDelivered, AckTag, Rem}, State #mqstate { msg_buf = MsgBuf2, length = Rem }}. +maybe_prefetch(Q, MsgBuf) -> + case queue:peek(MsgBuf) of + empty -> ok; + {value, {disk, Count}} -> rabbit_disk_queue:prefetch(Q, Count); + {value, _} -> ok + end. + + remove_noacks(MsgsWithAcks) -> {AckTags, ASize} = lists:foldl( |
