summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2009-07-21 11:29:18 +0100
committerMatthew Sackman <matthew@lshift.net>2009-07-21 11:29:18 +0100
commit83d676cb1221e86bcaec55e3291262b7fc88627f (patch)
treef5452e41afb49003065334ffa38a9e0f66e7531f
parente3503c1672da06391ff10a301ed751931fd89407 (diff)
downloadrabbitmq-server-git-83d676cb1221e86bcaec55e3291262b7fc88627f.tar.gz
Stripping out old broken prefetch. Also reverted gen_server2 back to the revision at the end of bug21087 on the grounds that the min_pri stuff wasn't enormously compelling and added a good chunk of complexity. Also, I don't believe it'll be needed for the new prefetcher. All tests pass.
-rw-r--r--src/gen_server2.erl233
-rw-r--r--src/rabbit_disk_queue.erl90
-rw-r--r--src/rabbit_mixed_queue.erl14
3 files changed, 83 insertions, 254 deletions
diff --git a/src/gen_server2.erl b/src/gen_server2.erl
index cf54811fe7..e46f2645bd 100644
--- a/src/gen_server2.erl
+++ b/src/gen_server2.erl
@@ -37,32 +37,6 @@
%% Explicit timeouts (i.e. not 'binary') from the handle_* functions
%% are still supported, and do not have any effect on the current
%% timeout value.
-%%
-%% 6) init/1 can also return (either a further arg in addition to
-%% timeout above, or as a key-value list with the timeout as {timeout,
-%% Timeout}) a minimum priority (key: min_priority). This can also be
-%% returned from handle_* functions (i.e. {noreply, NewState} or
-%% {noreply, NewState, Timeout} or {noreply, NewState, Timeout,
-%% MinPri} or {noreply, NewState, [{min_priority, MinPri}]} or
-%% {noreply, NewState, [{min_priority, MinPri}, {timeout,
-%% Timeout}]}). What this does is to only allow messages greater than
-%% the indicated priority through to the module. To allow any message
-%% through (as is the default), use 'any'. One effect of this is that
-%% when hibernating, the process can be woken up to receive a message
-%% which it then realises it is not interested in. When this happens,
-%% handle_info(roused_and_disinterested, State) will be called as soon
-%% as there are no further messages to process (i.e. upon waking, the
-%% message queue is drained, and a timeout of 0 is used).
-%%
-%% This feature means that you can delay processing lower priority
-%% messages. For example, when a min_priority of 0 is combined with
-%% the binary backoff timeout, you can delay processing any
-%% negative-priority messages until the first timeout fires which
-%% indicates that, given a steady state, the process has been idle for
-%% sufficiently long that it's reasonable to expect it to be
-%% uninterrupted by higher-priority messages for some little while;
-%% thus preventing low-priority, but lengthy jobs from getting in the
-%% way of higher priority jobs that need quick responses.
%% All modifications are (C) 2009 LShift Ltd.
@@ -159,8 +133,7 @@
cast/2, pcast/3, reply/2,
abcast/2, abcast/3,
multi_call/2, multi_call/3, multi_call/4,
- enter_loop/3, enter_loop/4, enter_loop/5, enter_loop/6,
- wake_hib/8]).
+ enter_loop/3, enter_loop/4, enter_loop/5, wake_hib/7]).
-export([behaviour_info/1]).
@@ -338,15 +311,7 @@ multi_call(Nodes, Name, Req, Timeout)
%%-----------------------------------------------------------------
-%% enter_loop(Mod, Options, State) -> _
-%% enter_loop(Mod, Options, State, ServerName) -> _
-%% enter_loop(Mod, Options, State, [{Key, Value}]) -> _
-%% enter_loop(Mod, Options, State, Timeout) -> _
-%% enter_loop(Mod, Options, State, ServerName, [{Key, Value}]) -> _
-%% enter_loop(Mod, Options, State, ServerName, Timeout) -> _
-%% enter_loop(Mod, Options, State, ServerName, Timeout, MinPri) -> _
-%%
-%% {Key, Value} = {min_priority, MinPri} | {timeout, Timeout}
+%% enter_loop(Mod, Options, State, <ServerName>, <TimeOut>) ->_
%%
%% Description: Makes an existing process into a gen_server.
%% The calling process will enter the gen_server receive
@@ -357,32 +322,22 @@ multi_call(Nodes, Name, Req, Timeout)
%% process, including registering a name for it.
%%-----------------------------------------------------------------
enter_loop(Mod, Options, State) ->
- enter_loop(Mod, Options, State, self(), []).
+ enter_loop(Mod, Options, State, self(), infinity).
enter_loop(Mod, Options, State, ServerName = {_, _}) ->
- enter_loop(Mod, Options, State, ServerName, []);
-
-enter_loop(Mod, Options, State, Opts) when is_list(Opts) ->
- enter_loop(Mod, Options, State, self(), Opts);
+ enter_loop(Mod, Options, State, ServerName, infinity);
enter_loop(Mod, Options, State, Timeout) ->
- enter_loop(Mod, Options, State, self(), [{timeout, Timeout}]).
+ enter_loop(Mod, Options, State, self(), Timeout).
-enter_loop(Mod, Options, State, ServerName, Opts) when is_list(Opts) ->
+enter_loop(Mod, Options, State, ServerName, Timeout) ->
Name = get_proc_name(ServerName),
Parent = get_parent(),
Debug = debug_options(Name, Options),
Queue = priority_queue:new(),
- [{timeout, Timeout}, {min_priority, MinPri}] = extract_timeout_minpri(Opts),
{Timeout1, TimeoutState} = build_timeout_state(Timeout),
- loop(Parent, Name, State, Mod, Timeout1, TimeoutState, MinPri, Queue, Debug);
-
-enter_loop(Mod, Options, State, ServerName, Timeout) ->
- enter_loop(Mod, Options, State, ServerName, [{timeout, Timeout}]).
+ loop(Parent, Name, State, Mod, Timeout1, TimeoutState, Queue, Debug).
-enter_loop(Mod, Options, State, ServerName, Timeout, MinPri) ->
- enter_loop(Mod, Options, State, ServerName,
- [{timeout, Timeout}, {min_priority, MinPri}]).
%%%========================================================================
%%% Gen-callback functions
%%%========================================================================
@@ -402,19 +357,13 @@ init_it(Starter, Parent, Name0, Mod, Args, Options) ->
Queue = priority_queue:new(),
case catch Mod:init(Args) of
{ok, State} ->
- proc_lib:init_ack(Starter, {ok, self()}),
- loop(Parent, Name, State, Mod, infinity, undefined,
- any, Queue, Debug);
+ proc_lib:init_ack(Starter, {ok, self()}),
+ loop(Parent, Name, State, Mod, infinity, undefined, Queue, Debug);
{ok, State, Timeout} ->
proc_lib:init_ack(Starter, {ok, self()}),
{Timeout1, TimeoutState} = build_timeout_state(Timeout),
- loop(Parent, Name, State, Mod, Timeout1, TimeoutState,
- any, Queue, Debug);
- {ok, State, Timeout, MinPri} ->
- proc_lib:init_ack(Starter, {ok, self()}),
- {Timeout1, TimeoutState} = build_timeout_state(Timeout),
- loop(Parent, Name, State, Mod, Timeout1, TimeoutState,
- MinPri, Queue, Debug);
+ loop(Parent, Name, State, Mod, Timeout1, TimeoutState, Queue,
+ Debug);
{stop, Reason} ->
%% For consistency, we must make sure that the
%% registered name (if any) is unregistered before
@@ -458,71 +407,57 @@ build_timeout_state(Timeout) ->
_ -> {Timeout, undefined}
end.
-extract_timeout_minpri(Opts) ->
- rabbit_misc:keygets([{timeout, infinity}, {min_priority, any}], Opts).
-
%%%========================================================================
%%% Internal functions
%%%========================================================================
%%% ---------------------------------------------------
%%% The MAIN loop.
%%% ---------------------------------------------------
-loop(Parent, Name, State, Mod, hibernate, undefined, MinPri, Queue, Debug) ->
- proc_lib:hibernate(?MODULE, wake_hib, [Parent, Name, State, Mod, undefined,
- MinPri, Queue, Debug]);
-loop(Parent, Name, State, Mod, hibernate, {Current, Min, undefined},
- MinPri, Queue, Debug) ->
+loop(Parent, Name, State, Mod, hibernate, undefined, Queue, Debug) ->
+ proc_lib:hibernate(?MODULE,wake_hib,
+ [Parent, Name, State, Mod, undefined, Queue, Debug]);
+loop(Parent, Name, State, Mod, hibernate, {Current, Min, undefined}, Queue,
+ Debug) ->
proc_lib:hibernate(?MODULE,wake_hib,[Parent, Name, State, Mod,
- {Current, Min, now()},
- MinPri, Queue, Debug]);
-loop(Parent, Name, State, Mod, Time, TimeoutState, MinPri, Queue, Debug) ->
+ {Current, Min, now()}, Queue, Debug]);
+loop(Parent, Name, State, Mod, Time, TimeoutState, Queue, Debug) ->
receive
Input -> loop(Parent, Name, State, Mod,
- Time, TimeoutState, MinPri, in(Input, Queue), Debug)
+ Time, TimeoutState, in(Input, Queue), Debug)
after 0 ->
process_next_msg(Parent, Name, State, Mod, Time, TimeoutState,
- MinPri, Queue, Debug)
+ Queue, Debug, false)
end.
-process_next_msg(Parent, Name, State, Mod, Time, TimeoutState, MinPri, Queue,
- Debug) ->
- Res = case MinPri of
- any -> priority_queue:out(Queue);
- _ -> priority_queue:out(MinPri, Queue)
- end,
- case Res of
+process_next_msg(Parent, Name, State, Mod, Time, TimeoutState, Queue,
+ Debug, Hib) ->
+ case priority_queue:out(Queue) of
{{value, Msg}, Queue1} ->
process_msg(Parent, Name, State, Mod,
- Time, TimeoutState, Queue1, Debug, Msg);
+ Time, TimeoutState, Queue1, Debug, Hib, Msg);
{empty, Queue1} ->
Time1 = case {Time, TimeoutState} of
- {hibernate, _} -> 0;
{binary, {Current, _Min, undefined}} -> Current;
_ -> Time
end,
receive
Input ->
loop(Parent, Name, State, Mod,
- Time, TimeoutState, MinPri, in(Input, Queue1), Debug)
+ Time, TimeoutState, in(Input, Queue1), Debug)
after Time1 ->
process_msg(Parent, Name, State, Mod,
- Time, TimeoutState, Queue1, Debug,
- case Time == hibernate of
- true -> {roused_and_disinterested, MinPri};
- false when MinPri =:= any -> timeout;
- false -> {timeout, MinPri}
- end)
+ Time, TimeoutState, Queue1, Debug, Hib, timeout)
end
end.
-wake_hib(Parent, Name, State, Mod, TimeoutState, MinPri, Queue, Debug) ->
+wake_hib(Parent, Name, State, Mod, TimeoutState, Queue, Debug) ->
Msg = receive
Input ->
Input
end,
TimeoutState1 = adjust_hibernate_after(TimeoutState),
process_next_msg(Parent, Name, State, Mod, hibernate, TimeoutState1,
- MinPri, in(Msg, Queue), Debug).
+ in(Msg, Queue), Debug, true).
adjust_hibernate_after(undefined) ->
undefined;
@@ -553,12 +488,15 @@ in(Input, Queue) ->
priority_queue:in(Input, Queue).
process_msg(Parent, Name, State, Mod, Time, TimeoutState, Queue,
- Debug, Msg) ->
+ Debug, _Hib, Msg) ->
case Msg of
{system, From, Req} ->
sys:handle_system_msg
(Req, From, Parent, ?MODULE, Debug,
[Name, State, Mod, Time, TimeoutState, Queue]);
+ %% gen_server puts Hib on the end as the 7th arg, but that
+ %% version of the function seems not to be documented so
+ %% leaving out for now.
{'EXIT', Parent, Reason} ->
terminate(Reason, Name, Msg, Mod, State, Debug);
_Msg when Debug =:= [] ->
@@ -769,34 +707,14 @@ handle_msg({'$gen_call', From, Msg},
case catch Mod:handle_call(Msg, From, State) of
{reply, Reply, NState} ->
reply(From, Reply),
- loop(Parent, Name, NState, Mod, infinity, TimeoutState, any, Queue,
- []);
- {reply, Reply, NState, Opts} when is_list(Opts) ->
- reply(From, Reply),
- [{timeout, Time}, {min_priority, MinPri}] =
- extract_timeout_minpri(Opts),
- loop(Parent, Name, NState, Mod, Time, TimeoutState, MinPri, Queue,
- []);
- {reply, Reply, NState, Time} ->
+ loop(Parent, Name, NState, Mod, infinity, TimeoutState, Queue, []);
+ {reply, Reply, NState, Time1} ->
reply(From, Reply),
- loop(Parent, Name, NState, Mod, Time, TimeoutState, any, Queue, []);
- {reply, Reply, NState, Time, MinPri} ->
- reply(From, Reply),
- loop(Parent, Name, NState, Mod, Time, TimeoutState, MinPri, Queue,
- []);
+ loop(Parent, Name, NState, Mod, Time1, TimeoutState, Queue, []);
{noreply, NState} ->
- loop(Parent, Name, NState, Mod, infinity, TimeoutState, any, Queue,
- []);
- {noreply, NState, Opts} when is_list(Opts) ->
- [{timeout, Time}, {min_priority, MinPri}] =
- extract_timeout_minpri(Opts),
- loop(Parent, Name, NState, Mod, Time, TimeoutState, MinPri, Queue,
- []);
- {noreply, NState, Time} ->
- loop(Parent, Name, NState, Mod, Time, TimeoutState, any, Queue, []);
- {noreply, NState, Time, MinPri} ->
- loop(Parent, Name, NState, Mod, Time, TimeoutState, MinPri, Queue,
- []);
+ loop(Parent, Name, NState, Mod, infinity, TimeoutState, Queue, []);
+ {noreply, NState, Time1} ->
+ loop(Parent, Name, NState, Mod, Time1, TimeoutState, Queue, []);
{stop, Reason, Reply, NState} ->
{'EXIT', R} =
(catch terminate(Reason, Name, Msg, Mod, NState, [])),
@@ -816,44 +734,20 @@ handle_msg({'$gen_call', From, Msg},
case catch Mod:handle_call(Msg, From, State) of
{reply, Reply, NState} ->
Debug1 = reply(Name, From, Reply, NState, Debug),
- loop(Parent, Name, NState, Mod, infinity, TimeoutState, any, Queue,
- Debug1);
- {reply, Reply, NState, Opts} when is_list(Opts) ->
- Debug1 = reply(Name, From, Reply, NState, Debug),
- [{timeout, Time}, {min_priority, MinPri}] =
- extract_timeout_minpri(Opts),
- loop(Parent, Name, NState, Mod, Time, TimeoutState, MinPri, Queue,
- Debug1);
- {reply, Reply, NState, Time} ->
- Debug1 = reply(Name, From, Reply, NState, Debug),
- loop(Parent, Name, NState, Mod, Time, TimeoutState, any, Queue,
+ loop(Parent, Name, NState, Mod, infinity, TimeoutState, Queue,
Debug1);
- {reply, Reply, NState, Time, MinPri} ->
+ {reply, Reply, NState, Time1} ->
Debug1 = reply(Name, From, Reply, NState, Debug),
- loop(Parent, Name, NState, Mod, Time, TimeoutState, MinPri, Queue,
- Debug1);
+ loop(Parent, Name, NState, Mod, Time1, TimeoutState, Queue, Debug1);
{noreply, NState} ->
Debug1 = sys:handle_debug(Debug, {?MODULE, print_event}, Name,
{noreply, NState}),
- loop(Parent, Name, NState, Mod, infinity, TimeoutState, any, Queue,
- Debug1);
- {noreply, NState, Opts} when is_list(Opts) ->
- Debug1 = sys:handle_debug(Debug, {?MODULE, print_event}, Name,
- {noreply, NState}),
- [{timeout, Time}, {min_priority, MinPri}] =
- extract_timeout_minpri(Opts),
- loop(Parent, Name, NState, Mod, Time, TimeoutState, MinPri, Queue,
- Debug1);
- {noreply, NState, Time} ->
- Debug1 = sys:handle_debug(Debug, {?MODULE, print_event}, Name,
- {noreply, NState}),
- loop(Parent, Name, NState, Mod, Time, TimeoutState, any, Queue,
+ loop(Parent, Name, NState, Mod, infinity, TimeoutState, Queue,
Debug1);
- {noreply, NState, Time, MinPri} ->
+ {noreply, NState, Time1} ->
Debug1 = sys:handle_debug(Debug, {?MODULE, print_event}, Name,
{noreply, NState}),
- loop(Parent, Name, NState, Mod, Time, TimeoutState, MinPri, Queue,
- Debug1);
+ loop(Parent, Name, NState, Mod, Time1, TimeoutState, Queue, Debug1);
{stop, Reason, Reply, NState} ->
{'EXIT', R} =
(catch terminate(Reason, Name, Msg, Mod, NState, Debug)),
@@ -873,18 +767,9 @@ handle_common_reply(Reply, Parent, Name, Msg, Mod, State,
TimeoutState, Queue) ->
case Reply of
{noreply, NState} ->
- loop(Parent, Name, NState, Mod, infinity, TimeoutState, any, Queue,
- []);
- {noreply, NState, Opts} when is_list(Opts) ->
- [{timeout, Time}, {min_priority, MinPri}] =
- extract_timeout_minpri(Opts),
- loop(Parent, Name, NState, Mod, Time, TimeoutState, MinPri, Queue,
- []);
- {noreply, NState, Time} ->
- loop(Parent, Name, NState, Mod, Time, TimeoutState, any, Queue, []);
- {noreply, NState, Time, MinPri} ->
- loop(Parent, Name, NState, Mod, Time, TimeoutState, MinPri, Queue,
- []);
+ loop(Parent, Name, NState, Mod, infinity, TimeoutState, Queue, []);
+ {noreply, NState, Time1} ->
+ loop(Parent, Name, NState, Mod, Time1, TimeoutState, Queue, []);
{stop, Reason, NState} ->
terminate(Reason, Name, Msg, Mod, NState, []);
{'EXIT', What} ->
@@ -899,25 +784,12 @@ handle_common_reply(Reply, Parent, Name, Msg, Mod, State, TimeoutState, Queue,
{noreply, NState} ->
Debug1 = sys:handle_debug(Debug, {?MODULE, print_event}, Name,
{noreply, NState}),
- loop(Parent, Name, NState, Mod, infinity, TimeoutState, any, Queue,
- Debug1);
- {noreply, NState, Opts} when is_list(Opts) ->
- Debug1 = sys:handle_debug(Debug, {?MODULE, print_event}, Name,
- {noreply, NState}),
- [{timeout, Time}, {min_priority, MinPri}] =
- extract_timeout_minpri(Opts),
- loop(Parent, Name, NState, Mod, Time, TimeoutState, MinPri, Queue,
+ loop(Parent, Name, NState, Mod, infinity, TimeoutState, Queue,
Debug1);
- {noreply, NState, Time} ->
+ {noreply, NState, Time1} ->
Debug1 = sys:handle_debug(Debug, {?MODULE, print_event}, Name,
{noreply, NState}),
- loop(Parent, Name, NState, Mod, Time, TimeoutState, any, Queue,
- Debug1);
- {noreply, NState, Time, MinPri} ->
- Debug1 = sys:handle_debug(Debug, {?MODULE, print_event}, Name,
- {noreply, NState}),
- loop(Parent, Name, NState, Mod, Time, TimeoutState, MinPri, Queue,
- Debug1);
+ loop(Parent, Name, NState, Mod, Time1, TimeoutState, Queue, Debug1);
{stop, Reason, NState} ->
terminate(Reason, Name, Msg, Mod, NState, Debug);
{'EXIT', What} ->
@@ -935,9 +807,8 @@ reply(Name, {To, Tag}, Reply, State, Debug) ->
%%-----------------------------------------------------------------
%% Callback functions for system messages handling.
%%-----------------------------------------------------------------
-system_continue(Parent, Debug, [Name, State, Mod, Time, TimeoutState, MinPri,
- Queue]) ->
- loop(Parent, Name, State, Mod, Time, TimeoutState, MinPri, Queue, Debug).
+system_continue(Parent, Debug, [Name, State, Mod, Time, TimeoutState, Queue]) ->
+ loop(Parent, Name, State, Mod, Time, TimeoutState, Queue, Debug).
-ifdef(use_specs).
-spec system_terminate(_, _, _, [_]) -> no_return().
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl
index 0e43c38779..e8a63bc364 100644
--- a/src/rabbit_disk_queue.erl
+++ b/src/rabbit_disk_queue.erl
@@ -42,7 +42,7 @@
tx_publish/1, tx_commit/3, tx_cancel/1,
requeue/2, purge/1, delete_queue/1,
delete_non_durable_queues/1, auto_ack_next_message/1,
- requeue_next_n/2, prefetch/2, length/1, foldl/3
+ requeue_next_n/2, length/1, foldl/3
]).
-export([filesync/0, cache_info/0]).
@@ -280,7 +280,6 @@
-spec(cache_info/0 :: () -> [{atom(), term()}]).
-spec(report_memory/0 :: () -> 'ok').
-spec(set_mode/1 :: ('disk' | 'mixed') -> 'ok').
--spec(prefetch/2 :: (queue_name(), non_neg_integer()) -> 'ok').
-endif.
@@ -361,9 +360,6 @@ report_memory() ->
set_mode(Mode) ->
gen_server2:cast(?SERVER, {set_mode, Mode}).
-prefetch(Q, Count) ->
- gen_server2:pcast(?SERVER, -1, {prefetch, Q, Count}).
-
%% ---- GEN-SERVER INTERNAL API ----
init([FileSizeLimit, ReadFileHandlesLimit]) ->
@@ -455,7 +451,7 @@ init([FileSizeLimit, ReadFileHandlesLimit]) ->
%% ets_bytes_per_record otherwise.
ok = rabbit_queue_mode_manager:report_memory(self(), 0, false),
ok = report_memory(false, State2),
- {ok, State2, {binary, ?HIBERNATE_AFTER_MIN}, 0}.
+ {ok, State2, {binary, ?HIBERNATE_AFTER_MIN}}.
handle_call({deliver, Q}, _From, State) ->
{ok, Result, State1} = internal_deliver(Q, true, State),
@@ -535,25 +531,18 @@ handle_cast(report_memory, State) ->
%% call noreply1/2, not noreply/1/2, as we don't want to restart the
%% memory_report_timer
%% by unsetting the timer, we force a report on the next normal message
- noreply1(State #dqstate { memory_report_timer = undefined }, 0);
-handle_cast({prefetch, Q, Count}, State) ->
- {ok, State1} = internal_prefetch(Q, Count, State),
- noreply(State1, any). %% set minpri to any
+ noreply1(State #dqstate { memory_report_timer = undefined }).
handle_info({'EXIT', _Pid, Reason}, State) ->
{stop, Reason, State};
-handle_info({timeout, 0}, State = #dqstate { commit_timer_ref = undefined }) ->
- %% this is the binary timeout coming back, with minpri = 0
- %% don't use noreply/1/2 or noreply1/2 as they'll restart the memory timer
- %% set timeout to 0, and go pick up any low priority messages
- {noreply, stop_memory_timer(State), 0, any};
-handle_info({timeout, 0}, State) ->
- %% must have commit_timer set, so timeout was 0, and we're not hibernating
- noreply(sync_current_file_handle(State));
-handle_info(timeout, State) ->
- %% no minpri supplied, so it must have been 'any', so go hibernate
+handle_info(timeout, State = #dqstate { commit_timer_ref = undefined }) ->
+ %% this is the binary timeout coming back
+ %% don't use noreply/1 or noreply1/1 as they'll restart the memory timer
ok = report_memory(true, State),
- {noreply, State, hibernate, any}.
+ {noreply, stop_memory_timer(State), hibernate};
+handle_info(timeout, State) ->
+ %% must have commit_timer set, so timeout was 0, and we're not hibernating
+ noreply(sync_current_file_handle(State)).
terminate(_Reason, State) ->
shutdown(State).
@@ -675,36 +664,30 @@ to_ram_disk_mode(State = #dqstate { operation_mode = disk_only,
ets_bytes_per_record = undefined }.
noreply(NewState) ->
- noreply(NewState, 0).
-
-noreply(NewState, MinPri) ->
- noreply1(start_memory_timer(NewState), MinPri).
+ noreply1(start_memory_timer(NewState)).
noreply1(NewState = #dqstate { on_sync_txns = [],
- commit_timer_ref = undefined }, MinPri) ->
- {noreply, NewState, binary, MinPri};
-noreply1(NewState = #dqstate { commit_timer_ref = undefined }, MinPri) ->
- {noreply, start_commit_timer(NewState), 0, MinPri};
-noreply1(NewState = #dqstate { on_sync_txns = [] }, MinPri) ->
- {noreply, stop_commit_timer(NewState), binary, MinPri};
-noreply1(NewState, MinPri) ->
- {noreply, NewState, 0, MinPri}.
+ commit_timer_ref = undefined }) ->
+ {noreply, NewState, binary};
+noreply1(NewState = #dqstate { commit_timer_ref = undefined }) ->
+ {noreply, start_commit_timer(NewState), 0};
+noreply1(NewState = #dqstate { on_sync_txns = [] }) ->
+ {noreply, stop_commit_timer(NewState), binary};
+noreply1(NewState) ->
+ {noreply, NewState, 0}.
reply(Reply, NewState) ->
- reply(Reply, NewState, 0).
-
-reply(Reply, NewState, MinPri) ->
- reply1(Reply, start_memory_timer(NewState), MinPri).
+ reply1(Reply, start_memory_timer(NewState)).
reply1(Reply, NewState = #dqstate { on_sync_txns = [],
- commit_timer_ref = undefined }, MinPri) ->
- {reply, Reply, NewState, binary, MinPri};
-reply1(Reply, NewState = #dqstate { commit_timer_ref = undefined }, MinPri) ->
- {reply, Reply, start_commit_timer(NewState), 0, MinPri};
-reply1(Reply, NewState = #dqstate { on_sync_txns = [] }, MinPri) ->
- {reply, Reply, stop_commit_timer(NewState), binary, MinPri};
-reply1(Reply, NewState, MinPri) ->
- {reply, Reply, NewState, 0, MinPri}.
+ commit_timer_ref = undefined }) ->
+ {reply, Reply, NewState, binary};
+reply1(Reply, NewState = #dqstate { commit_timer_ref = undefined }) ->
+ {reply, Reply, start_commit_timer(NewState), 0};
+reply1(Reply, NewState = #dqstate { on_sync_txns = [] }) ->
+ {reply, Reply, stop_commit_timer(NewState), binary};
+reply1(Reply, NewState) ->
+ {reply, Reply, NewState, 0}.
form_filename(Name) ->
filename:join(base_directory(), Name).
@@ -905,23 +888,6 @@ internal_deliver(Q, ReadMsg,
end, State1}
end.
-internal_prefetch(Q, Count, State = #dqstate { sequences = Sequences }) ->
- {ReadSeqId, WriteSeqId} = sequence_lookup(Sequences, Q),
- Length = WriteSeqId - ReadSeqId,
- Count1 = lists:min([Length, Count]),
- StateN = internal_prefetch(Q, ReadSeqId + Count1 - 1, ReadSeqId, State),
- {ok, StateN}.
-
-internal_prefetch(_Q, Target, Target, State) ->
- State;
-internal_prefetch(Q, Target, ReadSeqId, State) ->
- {ok, _MsgStuff, State1} =
- internal_read_message(Q, ReadSeqId, true, true, true, State),
- case cache_is_full(State1) of
- true -> State1;
- false -> internal_prefetch(Q, Target, ReadSeqId + 1, State1)
- end.
-
internal_foldl(Q, Fun, Init, State = #dqstate { sequences = Sequences }) ->
{ReadSeqId, WriteSeqId} = sequence_lookup(Sequences, Q),
internal_foldl(Q, WriteSeqId, Fun, State, Init, ReadSeqId).
diff --git a/src/rabbit_mixed_queue.erl b/src/rabbit_mixed_queue.erl
index 50b75789bd..3c2f99e670 100644
--- a/src/rabbit_mixed_queue.erl
+++ b/src/rabbit_mixed_queue.erl
@@ -348,17 +348,9 @@ deliver(State = #mqstate { msg_buf = MsgBuf, queue = Q,
{{Msg, IsDelivered, AckTag, Rem},
State #mqstate { msg_buf = MsgBuf2, length = Rem }}.
-maybe_prefetch(disk, _MsgBuf) ->
- ok;
-maybe_prefetch(mixed, MsgBuf) ->
- case queue:peek(MsgBuf) of
- empty ->
- ok;
- {value, {#basic_message {}, _IsDelivered}} ->
- ok;
- {value, {Q, Count}} ->
- rabbit_disk_queue:prefetch(Q, Count)
- end.
+maybe_prefetch(_, _) ->
+ %% disable just for the time being
+ ok.
remove_noacks(MsgsWithAcks) ->
lists:foldl(