summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/gen_server2.erl5
-rw-r--r--src/rabbit_disk_queue.erl105
-rw-r--r--src/rabbit_mixed_queue.erl15
3 files changed, 85 insertions, 40 deletions
diff --git a/src/gen_server2.erl b/src/gen_server2.erl
index 87b56ba38e..cf54811fe7 100644
--- a/src/gen_server2.erl
+++ b/src/gen_server2.erl
@@ -508,8 +508,9 @@ process_next_msg(Parent, Name, State, Mod, Time, TimeoutState, MinPri, Queue,
process_msg(Parent, Name, State, Mod,
Time, TimeoutState, Queue1, Debug,
case Time == hibernate of
- true -> roused_and_disinterested;
- false -> timeout
+ true -> {roused_and_disinterested, MinPri};
+ false when MinPri =:= any -> timeout;
+ false -> {timeout, MinPri}
end)
end
end.
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl
index 813ab7c433..3a520ecdca 100644
--- a/src/rabbit_disk_queue.erl
+++ b/src/rabbit_disk_queue.erl
@@ -42,7 +42,7 @@
tx_publish/1, tx_commit/3, tx_cancel/1,
requeue/2, purge/1, delete_queue/1,
delete_non_durable_queues/1, auto_ack_next_message/1,
- requeue_next_n/2
+ requeue_next_n/2, prefetch/2
]).
-export([filesync/0, cache_info/0]).
@@ -345,6 +345,9 @@ report_memory() ->
set_mode(Mode) ->
gen_server2:cast(?SERVER, {set_mode, Mode}).
+prefetch(Q, Count) ->
+ gen_server2:pcast(?SERVER, -1, {prefetch, Q, Count}).
+
%% ---- GEN-SERVER INTERNAL API ----
init([FileSizeLimit, ReadFileHandlesLimit]) ->
@@ -507,21 +510,28 @@ handle_cast({set_mode, Mode}, State) ->
mixed -> fun to_ram_disk_mode/1
end)(State));
handle_cast(report_memory, State) ->
- %% call noreply1/1, not noreply/1, as we don't want to restart the
+ %% call noreply1/2, not noreply/1/2, as we don't want to restart the
%% memory_report_timer
%% by unsetting the timer, we force a report on the next normal message
- noreply1(State #dqstate { memory_report_timer = undefined }).
+ noreply1(State #dqstate { memory_report_timer = undefined }, 0);
+handle_cast({prefetch, Q, Count}, State) ->
+ {ok, State1} = internal_prefetch(Q, Count, State),
+ noreply(State1, any). %% set minpri to any
handle_info({'EXIT', _Pid, Reason}, State) ->
{stop, Reason, State};
-handle_info(timeout, State = #dqstate { commit_timer_ref = undefined }) ->
- ok = report_memory(true, State),
- %% don't use noreply/1 or noreply1/1 as they'll restart the memory timer
- {noreply, stop_memory_timer(State), hibernate, 0};
-handle_info(timeout, State) ->
+handle_info({timeout, 0}, State = #dqstate { commit_timer_ref = undefined }) ->
+ %% this is the binary timeout coming back, with minpri = 0
+ %% don't use noreply/1/2 or noreply1/2 as they'll restart the memory timer
+ %% set timeout to 0, and go pick up any low priority messages
+ {noreply, stop_memory_timer(State), 0, any};
+handle_info({timeout, 0}, State) ->
+ %% must have commit_timer set, so timeout was 0, and we're not hibernating
noreply(sync_current_file_handle(State));
-handle_info(_Info, State) ->
- noreply(State).
+handle_info(timeout, State) ->
+ %% no minpri supplied, so it must have been 'any', so go hibernate
+ ok = report_memory(true, State),
+ {noreply, State, hibernate, any}.
terminate(_Reason, State) ->
shutdown(State).
@@ -643,30 +653,36 @@ to_ram_disk_mode(State = #dqstate { operation_mode = disk_only,
ets_bytes_per_record = undefined }.
noreply(NewState) ->
- noreply1(start_memory_timer(NewState)).
+ noreply(NewState, 0).
+
+noreply(NewState, MinPri) ->
+ noreply1(start_memory_timer(NewState), MinPri).
noreply1(NewState = #dqstate { on_sync_froms = [],
- commit_timer_ref = undefined }) ->
- {noreply, NewState, binary, 0};
-noreply1(NewState = #dqstate { commit_timer_ref = undefined }) ->
- {noreply, start_commit_timer(NewState), 0, 0};
-noreply1(NewState = #dqstate { on_sync_froms = [] }) ->
- {noreply, stop_commit_timer(NewState), binary, 0};
-noreply1(NewState) ->
- {noreply, NewState, 0, 0}.
+ commit_timer_ref = undefined }, MinPri) ->
+ {noreply, NewState, binary, MinPri};
+noreply1(NewState = #dqstate { commit_timer_ref = undefined }, MinPri) ->
+ {noreply, start_commit_timer(NewState), 0, MinPri};
+noreply1(NewState = #dqstate { on_sync_froms = [] }, MinPri) ->
+ {noreply, stop_commit_timer(NewState), binary, MinPri};
+noreply1(NewState, MinPri) ->
+ {noreply, NewState, 0, MinPri}.
reply(Reply, NewState) ->
- reply1(Reply, start_memory_timer(NewState)).
+ reply(Reply, NewState, 0).
+
+reply(Reply, NewState, MinPri) ->
+ reply1(Reply, start_memory_timer(NewState), MinPri).
reply1(Reply, NewState = #dqstate { on_sync_froms = [],
- commit_timer_ref = undefined }) ->
- {reply, Reply, NewState, binary, 0};
-reply1(Reply, NewState = #dqstate { commit_timer_ref = undefined }) ->
- {reply, Reply, start_commit_timer(NewState), 0, 0};
-reply1(Reply, NewState = #dqstate { on_sync_froms = [] }) ->
- {reply, Reply, stop_commit_timer(NewState), binary, 0};
-reply1(Reply, NewState) ->
- {reply, Reply, NewState, 0, 0}.
+ commit_timer_ref = undefined }, MinPri) ->
+ {reply, Reply, NewState, binary, MinPri};
+reply1(Reply, NewState = #dqstate { commit_timer_ref = undefined }, MinPri) ->
+ {reply, Reply, start_commit_timer(NewState), 0, MinPri};
+reply1(Reply, NewState = #dqstate { on_sync_froms = [] }, MinPri) ->
+ {reply, Reply, stop_commit_timer(NewState), binary, MinPri};
+reply1(Reply, NewState, MinPri) ->
+ {reply, Reply, NewState, 0, MinPri}.
form_filename(Name) ->
filename:join(base_directory(), Name).
@@ -829,8 +845,12 @@ decrement_cache(MsgId, #dqstate { message_cache = Cache }) ->
ok.
insert_into_cache(Message = #basic_message { guid = MsgId },
- MsgSize, #dqstate { message_cache = Cache }) ->
- true = ets:insert_new(Cache, {MsgId, Message, MsgSize, 1}),
+ MsgSize, Forced, #dqstate { message_cache = Cache }) ->
+ Count = case Forced of
+ true -> 0;
+ false -> 1
+ end,
+ true = ets:insert_new(Cache, {MsgId, Message, MsgSize, Count}),
ok.
%% ---- INTERNAL RAW FUNCTIONS ----
@@ -843,7 +863,7 @@ internal_deliver(Q, ReadMsg, FakeDeliver,
Remaining = WriteSeqId - ReadSeqId - 1,
{ok, Result, State1} =
internal_read_message(
- Q, ReadSeqId, FakeDeliver, ReadMsg, State),
+ Q, ReadSeqId, ReadMsg, FakeDeliver, false, State),
true = ets:insert(Sequences,
{Q, ReadSeqId+1, WriteSeqId}),
{ok,
@@ -856,7 +876,20 @@ internal_deliver(Q, ReadMsg, FakeDeliver,
end, State1}
end.
-internal_read_message(Q, ReadSeqId, FakeDeliver, ReadMsg, State) ->
+internal_prefetch(Q, Count, State = #dqstate { sequences = Sequences }) ->
+ {ReadSeqId, WriteSeqId} = sequence_lookup(Sequences, Q),
+ Length = WriteSeqId - ReadSeqId,
+ Count1 = lists:min([Length, Count]),
+ StateN =
+ lists:foldl(
+ fun(N, State1) ->
+ {ok, _MsgStuff, State2} =
+ internal_read_message(Q, N, true, true, true, State1),
+ State2
+ end, State, lists:seq(ReadSeqId, ReadSeqId + Count1 - 1)),
+ {ok, StateN}.
+
+internal_read_message(Q, ReadSeqId, ReadMsg, FakeDeliver, ForceInCache, State) ->
[Obj =
#dq_msg_loc {is_delivered = Delivered, msg_id = MsgId}] =
mnesia:dirty_read(rabbit_disk_queue, {Q, ReadSeqId}),
@@ -876,14 +909,14 @@ internal_read_message(Q, ReadSeqId, FakeDeliver, ReadMsg, State) ->
{ok, {MsgBody, BodySize}} =
read_message_at_offset(FileHdl, Offset, TotalSize),
Message = bin_to_msg(MsgBody),
- ok = case RefCount of
- 1 ->
+ ok = if RefCount > 1 orelse ForceInCache ->
+ insert_into_cache(Message, BodySize,
+ ForceInCache, State1);
+ true -> ok
%% it's not in the cache and we only
%% have 1 queue with the message. So
%% don't bother putting it in the
%% cache.
- ok;
- _ -> insert_into_cache(Message, BodySize, State1)
end,
{ok, {Message, BodySize, Delivered, {MsgId, ReadSeqId}},
State1};
diff --git a/src/rabbit_mixed_queue.erl b/src/rabbit_mixed_queue.erl
index 61487c9d56..2ef534ff3d 100644
--- a/src/rabbit_mixed_queue.erl
+++ b/src/rabbit_mixed_queue.erl
@@ -200,7 +200,8 @@ to_mixed_mode(TxnMessages, State =
%% don't actually do anything to the disk
MsgBuf = case Length of
0 -> queue:new();
- _ -> queue:from_list([{disk, Length}])
+ _ -> ok = rabbit_disk_queue:prefetch(Q, Length),
+ queue:from_list([{disk, Length}])
end,
%% remove txn messages from disk which are neither persistent and
%% durable. This is necessary to avoid leaks. This is also pretty
@@ -341,6 +342,7 @@ deliver(State = #mqstate { mode = mixed, msg_buf = MsgBuf, queue = Q,
end;
false -> noack
end,
+ ok = maybe_prefetch(Q, MsgBuf1),
{Msg1, IsDelivered1, AckTag1, MsgBuf1};
{disk, Rem1} ->
{Msg1 = #basic_message { is_persistent = IsPersistent },
@@ -353,7 +355,8 @@ deliver(State = #mqstate { mode = mixed, msg_buf = MsgBuf, queue = Q,
noack
end,
MsgBuf3 = case Rem1 of
- 1 -> MsgBuf1;
+ 1 -> ok = maybe_prefetch(Q, MsgBuf1),
+ MsgBuf1;
_ -> queue:in_r({disk, Rem1 - 1}, MsgBuf1)
end,
{Msg1, IsDelivered1, AckTag2, MsgBuf3}
@@ -362,6 +365,14 @@ deliver(State = #mqstate { mode = mixed, msg_buf = MsgBuf, queue = Q,
{{Msg, IsDelivered, AckTag, Rem},
State #mqstate { msg_buf = MsgBuf2, length = Rem }}.
+maybe_prefetch(Q, MsgBuf) ->
+ case queue:peek(MsgBuf) of
+ empty -> ok;
+ {value, {disk, Count}} -> rabbit_disk_queue:prefetch(Q, Count);
+ {value, _} -> ok
+ end.
+
+
remove_noacks(MsgsWithAcks) ->
{AckTags, ASize} =
lists:foldl(