diff options
| -rw-r--r-- | src/gen_server2.erl | 233 | ||||
| -rw-r--r-- | src/rabbit_disk_queue.erl | 90 | ||||
| -rw-r--r-- | src/rabbit_mixed_queue.erl | 14 |
3 files changed, 83 insertions, 254 deletions
diff --git a/src/gen_server2.erl b/src/gen_server2.erl index cf54811fe7..e46f2645bd 100644 --- a/src/gen_server2.erl +++ b/src/gen_server2.erl @@ -37,32 +37,6 @@ %% Explicit timeouts (i.e. not 'binary') from the handle_* functions %% are still supported, and do not have any effect on the current %% timeout value. -%% -%% 6) init/1 can also return (either a further arg in addition to -%% timeout above, or as a key-value list with the timeout as {timeout, -%% Timeout}) a minimum priority (key: min_priority). This can also be -%% returned from handle_* functions (i.e. {noreply, NewState} or -%% {noreply, NewState, Timeout} or {noreply, NewState, Timeout, -%% MinPri} or {noreply, NewState, [{min_priority, MinPri}]} or -%% {noreply, NewState, [{min_priority, MinPri}, {timeout, -%% Timeout}]}). What this does is to only allow messages greater than -%% the indicated priority through to the module. To allow any message -%% through (as is the default), use 'any'. One effect of this is that -%% when hibernating, the process can be woken up to receive a message -%% which it then realises it is not interested in. When this happens, -%% handle_info(roused_and_disinterested, State) will be called as soon -%% as there are no further messages to process (i.e. upon waking, the -%% message queue is drained, and a timeout of 0 is used). -%% -%% This feature means that you can delay processing lower priority -%% messages. For example, when a min_priority of 0 is combined with -%% the binary backoff timeout, you can delay processing any -%% negative-priority messages until the first timeout fires which -%% indicates that, given a steady state, the process has been idle for -%% sufficiently long that it's reasonable to expect it to be -%% uninterrupted by higher-priority messages for some little while; -%% thus preventing low-priority, but lengthy jobs from getting in the -%% way of higher priority jobs that need quick responses. %% All modifications are (C) 2009 LShift Ltd. @@ -159,8 +133,7 @@ cast/2, pcast/3, reply/2, abcast/2, abcast/3, multi_call/2, multi_call/3, multi_call/4, - enter_loop/3, enter_loop/4, enter_loop/5, enter_loop/6, - wake_hib/8]). + enter_loop/3, enter_loop/4, enter_loop/5, wake_hib/7]). -export([behaviour_info/1]). @@ -338,15 +311,7 @@ multi_call(Nodes, Name, Req, Timeout) %%----------------------------------------------------------------- -%% enter_loop(Mod, Options, State) -> _ -%% enter_loop(Mod, Options, State, ServerName) -> _ -%% enter_loop(Mod, Options, State, [{Key, Value}]) -> _ -%% enter_loop(Mod, Options, State, Timeout) -> _ -%% enter_loop(Mod, Options, State, ServerName, [{Key, Value}]) -> _ -%% enter_loop(Mod, Options, State, ServerName, Timeout) -> _ -%% enter_loop(Mod, Options, State, ServerName, Timeout, MinPri) -> _ -%% -%% {Key, Value} = {min_priority, MinPri} | {timeout, Timeout} +%% enter_loop(Mod, Options, State, <ServerName>, <TimeOut>) ->_ %% %% Description: Makes an existing process into a gen_server. %% The calling process will enter the gen_server receive @@ -357,32 +322,22 @@ multi_call(Nodes, Name, Req, Timeout) %% process, including registering a name for it. %%----------------------------------------------------------------- enter_loop(Mod, Options, State) -> - enter_loop(Mod, Options, State, self(), []). + enter_loop(Mod, Options, State, self(), infinity). enter_loop(Mod, Options, State, ServerName = {_, _}) -> - enter_loop(Mod, Options, State, ServerName, []); - -enter_loop(Mod, Options, State, Opts) when is_list(Opts) -> - enter_loop(Mod, Options, State, self(), Opts); + enter_loop(Mod, Options, State, ServerName, infinity); enter_loop(Mod, Options, State, Timeout) -> - enter_loop(Mod, Options, State, self(), [{timeout, Timeout}]). + enter_loop(Mod, Options, State, self(), Timeout). -enter_loop(Mod, Options, State, ServerName, Opts) when is_list(Opts) -> +enter_loop(Mod, Options, State, ServerName, Timeout) -> Name = get_proc_name(ServerName), Parent = get_parent(), Debug = debug_options(Name, Options), Queue = priority_queue:new(), - [{timeout, Timeout}, {min_priority, MinPri}] = extract_timeout_minpri(Opts), {Timeout1, TimeoutState} = build_timeout_state(Timeout), - loop(Parent, Name, State, Mod, Timeout1, TimeoutState, MinPri, Queue, Debug); - -enter_loop(Mod, Options, State, ServerName, Timeout) -> - enter_loop(Mod, Options, State, ServerName, [{timeout, Timeout}]). + loop(Parent, Name, State, Mod, Timeout1, TimeoutState, Queue, Debug). -enter_loop(Mod, Options, State, ServerName, Timeout, MinPri) -> - enter_loop(Mod, Options, State, ServerName, - [{timeout, Timeout}, {min_priority, MinPri}]). %%%======================================================================== %%% Gen-callback functions %%%======================================================================== @@ -402,19 +357,13 @@ init_it(Starter, Parent, Name0, Mod, Args, Options) -> Queue = priority_queue:new(), case catch Mod:init(Args) of {ok, State} -> - proc_lib:init_ack(Starter, {ok, self()}), - loop(Parent, Name, State, Mod, infinity, undefined, - any, Queue, Debug); + proc_lib:init_ack(Starter, {ok, self()}), + loop(Parent, Name, State, Mod, infinity, undefined, Queue, Debug); {ok, State, Timeout} -> proc_lib:init_ack(Starter, {ok, self()}), {Timeout1, TimeoutState} = build_timeout_state(Timeout), - loop(Parent, Name, State, Mod, Timeout1, TimeoutState, - any, Queue, Debug); - {ok, State, Timeout, MinPri} -> - proc_lib:init_ack(Starter, {ok, self()}), - {Timeout1, TimeoutState} = build_timeout_state(Timeout), - loop(Parent, Name, State, Mod, Timeout1, TimeoutState, - MinPri, Queue, Debug); + loop(Parent, Name, State, Mod, Timeout1, TimeoutState, Queue, + Debug); {stop, Reason} -> %% For consistency, we must make sure that the %% registered name (if any) is unregistered before @@ -458,71 +407,57 @@ build_timeout_state(Timeout) -> _ -> {Timeout, undefined} end. -extract_timeout_minpri(Opts) -> - rabbit_misc:keygets([{timeout, infinity}, {min_priority, any}], Opts). - %%%======================================================================== %%% Internal functions %%%======================================================================== %%% --------------------------------------------------- %%% The MAIN loop. %%% --------------------------------------------------- -loop(Parent, Name, State, Mod, hibernate, undefined, MinPri, Queue, Debug) -> - proc_lib:hibernate(?MODULE, wake_hib, [Parent, Name, State, Mod, undefined, - MinPri, Queue, Debug]); -loop(Parent, Name, State, Mod, hibernate, {Current, Min, undefined}, - MinPri, Queue, Debug) -> +loop(Parent, Name, State, Mod, hibernate, undefined, Queue, Debug) -> + proc_lib:hibernate(?MODULE,wake_hib, + [Parent, Name, State, Mod, undefined, Queue, Debug]); +loop(Parent, Name, State, Mod, hibernate, {Current, Min, undefined}, Queue, + Debug) -> proc_lib:hibernate(?MODULE,wake_hib,[Parent, Name, State, Mod, - {Current, Min, now()}, - MinPri, Queue, Debug]); -loop(Parent, Name, State, Mod, Time, TimeoutState, MinPri, Queue, Debug) -> + {Current, Min, now()}, Queue, Debug]); +loop(Parent, Name, State, Mod, Time, TimeoutState, Queue, Debug) -> receive Input -> loop(Parent, Name, State, Mod, - Time, TimeoutState, MinPri, in(Input, Queue), Debug) + Time, TimeoutState, in(Input, Queue), Debug) after 0 -> process_next_msg(Parent, Name, State, Mod, Time, TimeoutState, - MinPri, Queue, Debug) + Queue, Debug, false) end. -process_next_msg(Parent, Name, State, Mod, Time, TimeoutState, MinPri, Queue, - Debug) -> - Res = case MinPri of - any -> priority_queue:out(Queue); - _ -> priority_queue:out(MinPri, Queue) - end, - case Res of +process_next_msg(Parent, Name, State, Mod, Time, TimeoutState, Queue, + Debug, Hib) -> + case priority_queue:out(Queue) of {{value, Msg}, Queue1} -> process_msg(Parent, Name, State, Mod, - Time, TimeoutState, Queue1, Debug, Msg); + Time, TimeoutState, Queue1, Debug, Hib, Msg); {empty, Queue1} -> Time1 = case {Time, TimeoutState} of - {hibernate, _} -> 0; {binary, {Current, _Min, undefined}} -> Current; _ -> Time end, receive Input -> loop(Parent, Name, State, Mod, - Time, TimeoutState, MinPri, in(Input, Queue1), Debug) + Time, TimeoutState, in(Input, Queue1), Debug) after Time1 -> process_msg(Parent, Name, State, Mod, - Time, TimeoutState, Queue1, Debug, - case Time == hibernate of - true -> {roused_and_disinterested, MinPri}; - false when MinPri =:= any -> timeout; - false -> {timeout, MinPri} - end) + Time, TimeoutState, Queue1, Debug, Hib, timeout) end end. -wake_hib(Parent, Name, State, Mod, TimeoutState, MinPri, Queue, Debug) -> +wake_hib(Parent, Name, State, Mod, TimeoutState, Queue, Debug) -> Msg = receive Input -> Input end, TimeoutState1 = adjust_hibernate_after(TimeoutState), process_next_msg(Parent, Name, State, Mod, hibernate, TimeoutState1, - MinPri, in(Msg, Queue), Debug). + in(Msg, Queue), Debug, true). adjust_hibernate_after(undefined) -> undefined; @@ -553,12 +488,15 @@ in(Input, Queue) -> priority_queue:in(Input, Queue). process_msg(Parent, Name, State, Mod, Time, TimeoutState, Queue, - Debug, Msg) -> + Debug, _Hib, Msg) -> case Msg of {system, From, Req} -> sys:handle_system_msg (Req, From, Parent, ?MODULE, Debug, [Name, State, Mod, Time, TimeoutState, Queue]); + %% gen_server puts Hib on the end as the 7th arg, but that + %% version of the function seems not to be documented so + %% leaving out for now. {'EXIT', Parent, Reason} -> terminate(Reason, Name, Msg, Mod, State, Debug); _Msg when Debug =:= [] -> @@ -769,34 +707,14 @@ handle_msg({'$gen_call', From, Msg}, case catch Mod:handle_call(Msg, From, State) of {reply, Reply, NState} -> reply(From, Reply), - loop(Parent, Name, NState, Mod, infinity, TimeoutState, any, Queue, - []); - {reply, Reply, NState, Opts} when is_list(Opts) -> - reply(From, Reply), - [{timeout, Time}, {min_priority, MinPri}] = - extract_timeout_minpri(Opts), - loop(Parent, Name, NState, Mod, Time, TimeoutState, MinPri, Queue, - []); - {reply, Reply, NState, Time} -> + loop(Parent, Name, NState, Mod, infinity, TimeoutState, Queue, []); + {reply, Reply, NState, Time1} -> reply(From, Reply), - loop(Parent, Name, NState, Mod, Time, TimeoutState, any, Queue, []); - {reply, Reply, NState, Time, MinPri} -> - reply(From, Reply), - loop(Parent, Name, NState, Mod, Time, TimeoutState, MinPri, Queue, - []); + loop(Parent, Name, NState, Mod, Time1, TimeoutState, Queue, []); {noreply, NState} -> - loop(Parent, Name, NState, Mod, infinity, TimeoutState, any, Queue, - []); - {noreply, NState, Opts} when is_list(Opts) -> - [{timeout, Time}, {min_priority, MinPri}] = - extract_timeout_minpri(Opts), - loop(Parent, Name, NState, Mod, Time, TimeoutState, MinPri, Queue, - []); - {noreply, NState, Time} -> - loop(Parent, Name, NState, Mod, Time, TimeoutState, any, Queue, []); - {noreply, NState, Time, MinPri} -> - loop(Parent, Name, NState, Mod, Time, TimeoutState, MinPri, Queue, - []); + loop(Parent, Name, NState, Mod, infinity, TimeoutState, Queue, []); + {noreply, NState, Time1} -> + loop(Parent, Name, NState, Mod, Time1, TimeoutState, Queue, []); {stop, Reason, Reply, NState} -> {'EXIT', R} = (catch terminate(Reason, Name, Msg, Mod, NState, [])), @@ -816,44 +734,20 @@ handle_msg({'$gen_call', From, Msg}, case catch Mod:handle_call(Msg, From, State) of {reply, Reply, NState} -> Debug1 = reply(Name, From, Reply, NState, Debug), - loop(Parent, Name, NState, Mod, infinity, TimeoutState, any, Queue, - Debug1); - {reply, Reply, NState, Opts} when is_list(Opts) -> - Debug1 = reply(Name, From, Reply, NState, Debug), - [{timeout, Time}, {min_priority, MinPri}] = - extract_timeout_minpri(Opts), - loop(Parent, Name, NState, Mod, Time, TimeoutState, MinPri, Queue, - Debug1); - {reply, Reply, NState, Time} -> - Debug1 = reply(Name, From, Reply, NState, Debug), - loop(Parent, Name, NState, Mod, Time, TimeoutState, any, Queue, + loop(Parent, Name, NState, Mod, infinity, TimeoutState, Queue, Debug1); - {reply, Reply, NState, Time, MinPri} -> + {reply, Reply, NState, Time1} -> Debug1 = reply(Name, From, Reply, NState, Debug), - loop(Parent, Name, NState, Mod, Time, TimeoutState, MinPri, Queue, - Debug1); + loop(Parent, Name, NState, Mod, Time1, TimeoutState, Queue, Debug1); {noreply, NState} -> Debug1 = sys:handle_debug(Debug, {?MODULE, print_event}, Name, {noreply, NState}), - loop(Parent, Name, NState, Mod, infinity, TimeoutState, any, Queue, - Debug1); - {noreply, NState, Opts} when is_list(Opts) -> - Debug1 = sys:handle_debug(Debug, {?MODULE, print_event}, Name, - {noreply, NState}), - [{timeout, Time}, {min_priority, MinPri}] = - extract_timeout_minpri(Opts), - loop(Parent, Name, NState, Mod, Time, TimeoutState, MinPri, Queue, - Debug1); - {noreply, NState, Time} -> - Debug1 = sys:handle_debug(Debug, {?MODULE, print_event}, Name, - {noreply, NState}), - loop(Parent, Name, NState, Mod, Time, TimeoutState, any, Queue, + loop(Parent, Name, NState, Mod, infinity, TimeoutState, Queue, Debug1); - {noreply, NState, Time, MinPri} -> + {noreply, NState, Time1} -> Debug1 = sys:handle_debug(Debug, {?MODULE, print_event}, Name, {noreply, NState}), - loop(Parent, Name, NState, Mod, Time, TimeoutState, MinPri, Queue, - Debug1); + loop(Parent, Name, NState, Mod, Time1, TimeoutState, Queue, Debug1); {stop, Reason, Reply, NState} -> {'EXIT', R} = (catch terminate(Reason, Name, Msg, Mod, NState, Debug)), @@ -873,18 +767,9 @@ handle_common_reply(Reply, Parent, Name, Msg, Mod, State, TimeoutState, Queue) -> case Reply of {noreply, NState} -> - loop(Parent, Name, NState, Mod, infinity, TimeoutState, any, Queue, - []); - {noreply, NState, Opts} when is_list(Opts) -> - [{timeout, Time}, {min_priority, MinPri}] = - extract_timeout_minpri(Opts), - loop(Parent, Name, NState, Mod, Time, TimeoutState, MinPri, Queue, - []); - {noreply, NState, Time} -> - loop(Parent, Name, NState, Mod, Time, TimeoutState, any, Queue, []); - {noreply, NState, Time, MinPri} -> - loop(Parent, Name, NState, Mod, Time, TimeoutState, MinPri, Queue, - []); + loop(Parent, Name, NState, Mod, infinity, TimeoutState, Queue, []); + {noreply, NState, Time1} -> + loop(Parent, Name, NState, Mod, Time1, TimeoutState, Queue, []); {stop, Reason, NState} -> terminate(Reason, Name, Msg, Mod, NState, []); {'EXIT', What} -> @@ -899,25 +784,12 @@ handle_common_reply(Reply, Parent, Name, Msg, Mod, State, TimeoutState, Queue, {noreply, NState} -> Debug1 = sys:handle_debug(Debug, {?MODULE, print_event}, Name, {noreply, NState}), - loop(Parent, Name, NState, Mod, infinity, TimeoutState, any, Queue, - Debug1); - {noreply, NState, Opts} when is_list(Opts) -> - Debug1 = sys:handle_debug(Debug, {?MODULE, print_event}, Name, - {noreply, NState}), - [{timeout, Time}, {min_priority, MinPri}] = - extract_timeout_minpri(Opts), - loop(Parent, Name, NState, Mod, Time, TimeoutState, MinPri, Queue, + loop(Parent, Name, NState, Mod, infinity, TimeoutState, Queue, Debug1); - {noreply, NState, Time} -> + {noreply, NState, Time1} -> Debug1 = sys:handle_debug(Debug, {?MODULE, print_event}, Name, {noreply, NState}), - loop(Parent, Name, NState, Mod, Time, TimeoutState, any, Queue, - Debug1); - {noreply, NState, Time, MinPri} -> - Debug1 = sys:handle_debug(Debug, {?MODULE, print_event}, Name, - {noreply, NState}), - loop(Parent, Name, NState, Mod, Time, TimeoutState, MinPri, Queue, - Debug1); + loop(Parent, Name, NState, Mod, Time1, TimeoutState, Queue, Debug1); {stop, Reason, NState} -> terminate(Reason, Name, Msg, Mod, NState, Debug); {'EXIT', What} -> @@ -935,9 +807,8 @@ reply(Name, {To, Tag}, Reply, State, Debug) -> %%----------------------------------------------------------------- %% Callback functions for system messages handling. %%----------------------------------------------------------------- -system_continue(Parent, Debug, [Name, State, Mod, Time, TimeoutState, MinPri, - Queue]) -> - loop(Parent, Name, State, Mod, Time, TimeoutState, MinPri, Queue, Debug). +system_continue(Parent, Debug, [Name, State, Mod, Time, TimeoutState, Queue]) -> + loop(Parent, Name, State, Mod, Time, TimeoutState, Queue, Debug). -ifdef(use_specs). -spec system_terminate(_, _, _, [_]) -> no_return(). diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl index 0e43c38779..e8a63bc364 100644 --- a/src/rabbit_disk_queue.erl +++ b/src/rabbit_disk_queue.erl @@ -42,7 +42,7 @@ tx_publish/1, tx_commit/3, tx_cancel/1, requeue/2, purge/1, delete_queue/1, delete_non_durable_queues/1, auto_ack_next_message/1, - requeue_next_n/2, prefetch/2, length/1, foldl/3 + requeue_next_n/2, length/1, foldl/3 ]). -export([filesync/0, cache_info/0]). @@ -280,7 +280,6 @@ -spec(cache_info/0 :: () -> [{atom(), term()}]). -spec(report_memory/0 :: () -> 'ok'). -spec(set_mode/1 :: ('disk' | 'mixed') -> 'ok'). --spec(prefetch/2 :: (queue_name(), non_neg_integer()) -> 'ok'). -endif. @@ -361,9 +360,6 @@ report_memory() -> set_mode(Mode) -> gen_server2:cast(?SERVER, {set_mode, Mode}). -prefetch(Q, Count) -> - gen_server2:pcast(?SERVER, -1, {prefetch, Q, Count}). - %% ---- GEN-SERVER INTERNAL API ---- init([FileSizeLimit, ReadFileHandlesLimit]) -> @@ -455,7 +451,7 @@ init([FileSizeLimit, ReadFileHandlesLimit]) -> %% ets_bytes_per_record otherwise. ok = rabbit_queue_mode_manager:report_memory(self(), 0, false), ok = report_memory(false, State2), - {ok, State2, {binary, ?HIBERNATE_AFTER_MIN}, 0}. + {ok, State2, {binary, ?HIBERNATE_AFTER_MIN}}. handle_call({deliver, Q}, _From, State) -> {ok, Result, State1} = internal_deliver(Q, true, State), @@ -535,25 +531,18 @@ handle_cast(report_memory, State) -> %% call noreply1/2, not noreply/1/2, as we don't want to restart the %% memory_report_timer %% by unsetting the timer, we force a report on the next normal message - noreply1(State #dqstate { memory_report_timer = undefined }, 0); -handle_cast({prefetch, Q, Count}, State) -> - {ok, State1} = internal_prefetch(Q, Count, State), - noreply(State1, any). %% set minpri to any + noreply1(State #dqstate { memory_report_timer = undefined }). handle_info({'EXIT', _Pid, Reason}, State) -> {stop, Reason, State}; -handle_info({timeout, 0}, State = #dqstate { commit_timer_ref = undefined }) -> - %% this is the binary timeout coming back, with minpri = 0 - %% don't use noreply/1/2 or noreply1/2 as they'll restart the memory timer - %% set timeout to 0, and go pick up any low priority messages - {noreply, stop_memory_timer(State), 0, any}; -handle_info({timeout, 0}, State) -> - %% must have commit_timer set, so timeout was 0, and we're not hibernating - noreply(sync_current_file_handle(State)); -handle_info(timeout, State) -> - %% no minpri supplied, so it must have been 'any', so go hibernate +handle_info(timeout, State = #dqstate { commit_timer_ref = undefined }) -> + %% this is the binary timeout coming back + %% don't use noreply/1 or noreply1/1 as they'll restart the memory timer ok = report_memory(true, State), - {noreply, State, hibernate, any}. + {noreply, stop_memory_timer(State), hibernate}; +handle_info(timeout, State) -> + %% must have commit_timer set, so timeout was 0, and we're not hibernating + noreply(sync_current_file_handle(State)). terminate(_Reason, State) -> shutdown(State). @@ -675,36 +664,30 @@ to_ram_disk_mode(State = #dqstate { operation_mode = disk_only, ets_bytes_per_record = undefined }. noreply(NewState) -> - noreply(NewState, 0). - -noreply(NewState, MinPri) -> - noreply1(start_memory_timer(NewState), MinPri). + noreply1(start_memory_timer(NewState)). noreply1(NewState = #dqstate { on_sync_txns = [], - commit_timer_ref = undefined }, MinPri) -> - {noreply, NewState, binary, MinPri}; -noreply1(NewState = #dqstate { commit_timer_ref = undefined }, MinPri) -> - {noreply, start_commit_timer(NewState), 0, MinPri}; -noreply1(NewState = #dqstate { on_sync_txns = [] }, MinPri) -> - {noreply, stop_commit_timer(NewState), binary, MinPri}; -noreply1(NewState, MinPri) -> - {noreply, NewState, 0, MinPri}. + commit_timer_ref = undefined }) -> + {noreply, NewState, binary}; +noreply1(NewState = #dqstate { commit_timer_ref = undefined }) -> + {noreply, start_commit_timer(NewState), 0}; +noreply1(NewState = #dqstate { on_sync_txns = [] }) -> + {noreply, stop_commit_timer(NewState), binary}; +noreply1(NewState) -> + {noreply, NewState, 0}. reply(Reply, NewState) -> - reply(Reply, NewState, 0). - -reply(Reply, NewState, MinPri) -> - reply1(Reply, start_memory_timer(NewState), MinPri). + reply1(Reply, start_memory_timer(NewState)). reply1(Reply, NewState = #dqstate { on_sync_txns = [], - commit_timer_ref = undefined }, MinPri) -> - {reply, Reply, NewState, binary, MinPri}; -reply1(Reply, NewState = #dqstate { commit_timer_ref = undefined }, MinPri) -> - {reply, Reply, start_commit_timer(NewState), 0, MinPri}; -reply1(Reply, NewState = #dqstate { on_sync_txns = [] }, MinPri) -> - {reply, Reply, stop_commit_timer(NewState), binary, MinPri}; -reply1(Reply, NewState, MinPri) -> - {reply, Reply, NewState, 0, MinPri}. + commit_timer_ref = undefined }) -> + {reply, Reply, NewState, binary}; +reply1(Reply, NewState = #dqstate { commit_timer_ref = undefined }) -> + {reply, Reply, start_commit_timer(NewState), 0}; +reply1(Reply, NewState = #dqstate { on_sync_txns = [] }) -> + {reply, Reply, stop_commit_timer(NewState), binary}; +reply1(Reply, NewState) -> + {reply, Reply, NewState, 0}. form_filename(Name) -> filename:join(base_directory(), Name). @@ -905,23 +888,6 @@ internal_deliver(Q, ReadMsg, end, State1} end. -internal_prefetch(Q, Count, State = #dqstate { sequences = Sequences }) -> - {ReadSeqId, WriteSeqId} = sequence_lookup(Sequences, Q), - Length = WriteSeqId - ReadSeqId, - Count1 = lists:min([Length, Count]), - StateN = internal_prefetch(Q, ReadSeqId + Count1 - 1, ReadSeqId, State), - {ok, StateN}. - -internal_prefetch(_Q, Target, Target, State) -> - State; -internal_prefetch(Q, Target, ReadSeqId, State) -> - {ok, _MsgStuff, State1} = - internal_read_message(Q, ReadSeqId, true, true, true, State), - case cache_is_full(State1) of - true -> State1; - false -> internal_prefetch(Q, Target, ReadSeqId + 1, State1) - end. - internal_foldl(Q, Fun, Init, State = #dqstate { sequences = Sequences }) -> {ReadSeqId, WriteSeqId} = sequence_lookup(Sequences, Q), internal_foldl(Q, WriteSeqId, Fun, State, Init, ReadSeqId). diff --git a/src/rabbit_mixed_queue.erl b/src/rabbit_mixed_queue.erl index 50b75789bd..3c2f99e670 100644 --- a/src/rabbit_mixed_queue.erl +++ b/src/rabbit_mixed_queue.erl @@ -348,17 +348,9 @@ deliver(State = #mqstate { msg_buf = MsgBuf, queue = Q, {{Msg, IsDelivered, AckTag, Rem}, State #mqstate { msg_buf = MsgBuf2, length = Rem }}. -maybe_prefetch(disk, _MsgBuf) -> - ok; -maybe_prefetch(mixed, MsgBuf) -> - case queue:peek(MsgBuf) of - empty -> - ok; - {value, {#basic_message {}, _IsDelivered}} -> - ok; - {value, {Q, Count}} -> - rabbit_disk_queue:prefetch(Q, Count) - end. +maybe_prefetch(_, _) -> + %% disable just for the time being + ok. remove_noacks(MsgsWithAcks) -> lists:foldl( |
