summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_amqqueue.erl6
-rw-r--r--src/rabbit_amqqueue_process.erl60
-rw-r--r--src/rabbit_disk_queue.erl111
-rw-r--r--src/rabbit_mixed_queue.erl162
-rw-r--r--src/rabbit_queue_prefetcher.erl4
-rw-r--r--src/rabbit_tests.erl64
6 files changed, 189 insertions, 218 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 1d9f8c53cd..6c4c0ebb69 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -42,7 +42,7 @@
-export([notify_sent/2, unblock/2]).
-export([commit_all/2, rollback_all/2, notify_down_all/2, limit_all/3]).
-export([on_node_down/1]).
--export([set_mode/2, report_memory/1]).
+-export([set_mode/2]).
-import(mnesia).
-import(gen_server2).
@@ -107,7 +107,6 @@
-spec(internal_delete/1 :: (queue_name()) -> 'ok' | not_found()).
-spec(on_node_down/1 :: (erlang_node()) -> 'ok').
-spec(pseudo_queue/2 :: (binary(), pid()) -> amqqueue()).
--spec(report_memory/1 :: (pid()) -> 'ok').
-endif.
@@ -227,9 +226,6 @@ map(VHostPath, F) -> rabbit_misc:filter_exit_map(F, list(VHostPath)).
set_mode(QPid, Mode) ->
gen_server2:pcast(QPid, 10, {set_mode, Mode}).
-report_memory(QPid) ->
- gen_server2:cast(QPid, report_memory).
-
info(#amqqueue{ pid = QPid }) ->
gen_server2:pcall(QPid, 9, info, infinity).
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 14a0370d83..b1c409b1f2 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -38,7 +38,7 @@
-define(UNSENT_MESSAGE_LIMIT, 100).
-define(HIBERNATE_AFTER_MIN, 1000).
-define(DESIRED_HIBERNATE, 10000).
--define(MEMORY_REPORT_TIME_INTERVAL, 10000). %% 10 seconds in milliseconds
+-define(MINIMUM_MEMORY_REPORT_TIME_INTERVAL, 10000). %% 10 seconds in milliseconds
-export([start_link/1]).
@@ -142,8 +142,8 @@ noreply(NewState) ->
{noreply, start_memory_timer(NewState), hibernate}.
start_memory_timer(State = #q { memory_report_timer = undefined }) ->
- {ok, TRef} = timer:apply_after(?MEMORY_REPORT_TIME_INTERVAL,
- rabbit_amqqueue, report_memory, [self()]),
+ {ok, TRef} = timer:send_after(?MINIMUM_MEMORY_REPORT_TIME_INTERVAL,
+ report_memory),
report_memory(false, State #q { memory_report_timer = TRef });
start_memory_timer(State) ->
State.
@@ -199,11 +199,12 @@ record_current_channel_tx(ChPid, Txn) ->
%% that wasn't happening already)
store_ch_record((ch_record(ChPid))#cr{txn = Txn}).
-deliver_queue(Funs = {PredFun, DeliverFun}, FunAcc,
- State = #q{q = #amqqueue{name = QName},
- active_consumers = ActiveConsumers,
- blocked_consumers = BlockedConsumers,
- next_msg_id = NextId}) ->
+deliver_msgs_to_consumers(
+ Funs = {PredFun, DeliverFun}, FunAcc,
+ State = #q{q = #amqqueue{name = QName},
+ active_consumers = ActiveConsumers,
+ blocked_consumers = BlockedConsumers,
+ next_msg_id = NextId}) ->
case queue:out(ActiveConsumers) of
{{value, QEntry = {ChPid, #consumer{tag = ConsumerTag,
ack_required = AckRequired}}},
@@ -246,7 +247,7 @@ deliver_queue(Funs = {PredFun, DeliverFun}, FunAcc,
blocked_consumers = NewBlockedConsumers,
next_msg_id = NextId + 1
},
- deliver_queue(Funs, FunAcc1, State2);
+ deliver_msgs_to_consumers(Funs, FunAcc1, State2);
%% if IsMsgReady then we've hit the limiter
false when IsMsgReady ->
store_ch_record(C#cr{is_limit_active = true}),
@@ -254,7 +255,7 @@ deliver_queue(Funs = {PredFun, DeliverFun}, FunAcc,
move_consumers(ChPid,
ActiveConsumers,
BlockedConsumers),
- deliver_queue(
+ deliver_msgs_to_consumers(
Funs, FunAcc,
State#q{active_consumers = NewActiveConsumers,
blocked_consumers = NewBlockedConsumers});
@@ -271,7 +272,7 @@ deliver_from_queue_pred({IsEmpty, _AutoAcks}, _State) ->
deliver_from_queue_deliver(AckRequired, {false, AutoAcks},
State = #q { mixed_state = MS }) ->
{{Msg, IsDelivered, AckTag, Remaining}, MS1} =
- rabbit_mixed_queue:deliver(MS),
+ rabbit_mixed_queue:fetch(MS),
AutoAcks1 =
case AckRequired of
true -> AutoAcks;
@@ -285,7 +286,7 @@ run_message_queue(State = #q { mixed_state = MS }) ->
fun deliver_from_queue_deliver/3 },
IsEmpty = rabbit_mixed_queue:is_empty(MS),
{{_IsEmpty1, AutoAcks}, State1} =
- deliver_queue(Funs, {IsEmpty, []}, State),
+ deliver_msgs_to_consumers(Funs, {IsEmpty, []}, State),
{ok, MS1} =
rabbit_mixed_queue:ack(AutoAcks, State1 #q.mixed_state),
State1 #q { mixed_state = MS1 }.
@@ -306,7 +307,7 @@ attempt_immediate_delivery(none, _ChPid, Msg, State) ->
end,
{{Msg, false, AckTag}, true, State2}
end,
- deliver_queue({ PredFun, DeliverFun }, false, State);
+ deliver_msgs_to_consumers({ PredFun, DeliverFun }, false, State);
attempt_immediate_delivery(Txn, ChPid, Msg, State) ->
{ok, MS} = rabbit_mixed_queue:tx_publish(Msg, State #q.mixed_state),
record_pending_message(Txn, ChPid, Msg),
@@ -330,8 +331,8 @@ deliver_or_requeue_n(MsgsWithAcks, State) ->
Funs = { fun deliver_or_requeue_msgs_pred/2,
fun deliver_or_requeue_msgs_deliver/3 },
{{_RemainingLengthMinusOne, AutoAcks, OutstandingMsgs}, NewState} =
- deliver_queue(Funs, {length(MsgsWithAcks) - 1, [], MsgsWithAcks},
- State),
+ deliver_msgs_to_consumers(
+ Funs, {length(MsgsWithAcks), [], MsgsWithAcks}, State),
{ok, MS} = rabbit_mixed_queue:ack(AutoAcks,
NewState #q.mixed_state),
case OutstandingMsgs of
@@ -341,7 +342,7 @@ deliver_or_requeue_n(MsgsWithAcks, State) ->
end.
deliver_or_requeue_msgs_pred({Len, _AcksAcc, _MsgsWithAcks}, _State) ->
- -1 < Len.
+ 0 < Len.
deliver_or_requeue_msgs_deliver(
false, {Len, AcksAcc, [(MsgAckTag = {Msg, _}) | MsgsWithAcks]}, State) ->
{{Msg, true, noack}, {Len - 1, [MsgAckTag | AcksAcc], MsgsWithAcks}, State};
@@ -612,11 +613,11 @@ handle_call({basic_get, ChPid, NoAck}, _From,
next_msg_id = NextId,
mixed_state = MS
}) ->
- case rabbit_mixed_queue:deliver(MS) of
+ case rabbit_mixed_queue:fetch(MS) of
{empty, MS1} -> reply(empty, State #q { mixed_state = MS1 });
{{Msg, IsDelivered, AckTag, Remaining}, MS1} ->
AckRequired = not(NoAck),
- {ok, MS3} =
+ {ok, MS2} =
case AckRequired of
true ->
C = #cr{unacked_messages = UAM} = ch_record(ChPid),
@@ -628,9 +629,7 @@ handle_call({basic_get, ChPid, NoAck}, _From,
end,
Message = {QName, self(), NextId, IsDelivered, Msg},
reply({ok, Remaining, Message},
- State #q { next_msg_id = NextId + 1,
- mixed_state = MS3
- })
+ State #q { next_msg_id = NextId + 1, mixed_state = MS2 })
end;
handle_call({basic_consume, NoAck, ReaderPid, ChPid, LimiterPid,
@@ -769,9 +768,9 @@ handle_cast({ack, Txn, MsgIds, ChPid}, State) ->
not_found ->
noreply(State);
C = #cr{unacked_messages = UAM} ->
- {MsgWithAcks, Remaining} = collect_messages(MsgIds, UAM),
case Txn of
none ->
+ {MsgWithAcks, Remaining} = collect_messages(MsgIds, UAM),
{ok, MS} =
rabbit_mixed_queue:ack(MsgWithAcks, State #q.mixed_state),
store_ch_record(C#cr{unacked_messages = Remaining}),
@@ -829,16 +828,13 @@ handle_cast({set_mode, Mode}, State = #q { mixed_state = MS }) ->
PendingMessages =
lists:flatten([Pending || #tx { pending_messages = Pending}
<- all_tx_record()]),
- {ok, MS1} = (case Mode of
- disk -> fun rabbit_mixed_queue:to_disk_only_mode/2;
- mixed -> fun rabbit_mixed_queue:to_mixed_mode/2
- end)(PendingMessages, MS),
- noreply(State #q { mixed_state = MS1 });
-
-handle_cast(report_memory, State) ->
- %% deliberately don't call noreply/2 as we don't want to restart the timer
- %% by unsetting the timer, we force a report on the next normal message
- {noreply, State #q { memory_report_timer = undefined }, hibernate}.
+ {ok, MS1} = rabbit_mixed_queue:set_mode(Mode, PendingMessages, MS),
+ noreply(State #q { mixed_state = MS1 }).
+
+handle_info(report_memory, State) ->
+ %% deliberately don't call noreply/2 as we don't want to restart the timer.
+ %% By unsetting the timer, we force a report on the next normal message
+ {noreply, State #q { memory_report_timer = undefined }, hibernate};
handle_info({'DOWN', MonitorRef, process, DownPid, _Reason},
State = #q{owner = {DownPid, MonitorRef}}) ->
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl
index 5940f5ad04..e2f341ffd4 100644
--- a/src/rabbit_disk_queue.erl
+++ b/src/rabbit_disk_queue.erl
@@ -39,7 +39,7 @@
terminate/2, code_change/3]).
-export([handle_pre_hibernate/1]).
--export([publish/3, deliver/1, phantom_deliver/1, ack/2,
+-export([publish/3, fetch/1, phantom_fetch/1, ack/2,
tx_publish/1, tx_commit/3, tx_cancel/1,
requeue/2, purge/1, delete_queue/1,
delete_non_durable_queues/1, auto_ack_next_message/1,
@@ -48,27 +48,27 @@
-export([filesync/0, cache_info/0]).
--export([stop/0, stop_and_obliterate/0, report_memory/0,
- set_mode/1, to_disk_only_mode/0, to_ram_disk_mode/0]).
+-export([stop/0, stop_and_obliterate/0, set_mode/1, to_disk_only_mode/0,
+ to_ram_disk_mode/0]).
-include("rabbit.hrl").
--define(WRITE_OK_SIZE_BITS, 8).
--define(WRITE_OK_TRANSIENT, 255).
--define(WRITE_OK_PERSISTENT, 254).
--define(INTEGER_SIZE_BYTES, 8).
--define(INTEGER_SIZE_BITS, (8 * ?INTEGER_SIZE_BYTES)).
--define(MSG_LOC_NAME, rabbit_disk_queue_msg_location).
--define(FILE_SUMMARY_ETS_NAME, rabbit_disk_queue_file_summary).
--define(SEQUENCE_ETS_NAME, rabbit_disk_queue_sequences).
--define(CACHE_ETS_NAME, rabbit_disk_queue_cache).
--define(FILE_EXTENSION, ".rdq").
--define(FILE_EXTENSION_TMP, ".rdt").
--define(FILE_EXTENSION_DETS, ".dets").
--define(FILE_PACKING_ADJUSTMENT, (1 + (2* (?INTEGER_SIZE_BYTES)))).
--define(MEMORY_REPORT_TIME_INTERVAL, 10000). %% 10 seconds in milliseconds
--define(BATCH_SIZE, 10000).
--define(CACHE_MAX_SIZE, 10485760).
+-define(WRITE_OK_SIZE_BITS, 8).
+-define(WRITE_OK_TRANSIENT, 255).
+-define(WRITE_OK_PERSISTENT, 254).
+-define(INTEGER_SIZE_BYTES, 8).
+-define(INTEGER_SIZE_BITS, (8 * ?INTEGER_SIZE_BYTES)).
+-define(MSG_LOC_NAME, rabbit_disk_queue_msg_location).
+-define(FILE_SUMMARY_ETS_NAME, rabbit_disk_queue_file_summary).
+-define(SEQUENCE_ETS_NAME, rabbit_disk_queue_sequences).
+-define(CACHE_ETS_NAME, rabbit_disk_queue_cache).
+-define(FILE_EXTENSION, ".rdq").
+-define(FILE_EXTENSION_TMP, ".rdt").
+-define(FILE_EXTENSION_DETS, ".dets").
+-define(FILE_PACKING_ADJUSTMENT, (1 + (2* (?INTEGER_SIZE_BYTES)))).
+-define(MINIMUM_MEMORY_REPORT_TIME_INTERVAL, 10000). %% 10 seconds in millisecs
+-define(BATCH_SIZE, 10000).
+-define(CACHE_MAX_SIZE, 10485760).
-define(SERVER, ?MODULE).
@@ -94,11 +94,11 @@
file_size_limit, %% how big can our files get?
read_file_handles, %% file handles for reading (LRU)
read_file_handles_limit, %% how many file handles can we open?
- on_sync_txns, %% list of commiters to run on sync (reversed)
+ on_sync_txns, %% list of commiters to run on sync (reversed)
commit_timer_ref, %% TRef for our interval timer
last_sync_offset, %% current_offset at the last time we sync'd
message_cache, %% ets message cache
- memory_report_timer, %% TRef for the memory report timer
+ memory_report_timer_ref, %% TRef for the memory report timer
wordsize, %% bytes in a word on this platform
mnesia_bytes_per_record, %% bytes per record in mnesia in ram_disk mode
ets_bytes_per_record %% bytes per record in msg_location_ets
@@ -253,10 +253,10 @@
-spec(start_link/0 :: () ->
({'ok', pid()} | 'ignore' | {'error', any()})).
-spec(publish/3 :: (queue_name(), message(), bool()) -> 'ok').
--spec(deliver/1 :: (queue_name()) ->
+-spec(fetch/1 :: (queue_name()) ->
('empty' | {message(), non_neg_integer(),
bool(), {msg_id(), seq_id()}, non_neg_integer()})).
--spec(phantom_deliver/1 :: (queue_name()) ->
+-spec(phantom_fetch/1 :: (queue_name()) ->
( 'empty' | {msg_id(), bool(), bool(), {msg_id(), seq_id()},
non_neg_integer()})).
-spec(prefetch/1 :: (queue_name()) -> 'ok').
@@ -281,7 +281,6 @@
-spec(to_ram_disk_mode/0 :: () -> 'ok').
-spec(filesync/0 :: () -> 'ok').
-spec(cache_info/0 :: () -> [{atom(), term()}]).
--spec(report_memory/0 :: () -> 'ok').
-spec(set_mode/1 :: ('disk' | 'mixed') -> 'ok').
-endif.
@@ -295,11 +294,11 @@ start_link() ->
publish(Q, Message = #basic_message {}, IsDelivered) ->
gen_server2:cast(?SERVER, {publish, Q, Message, IsDelivered}).
-deliver(Q) ->
- gen_server2:call(?SERVER, {deliver, Q}, infinity).
+fetch(Q) ->
+ gen_server2:call(?SERVER, {fetch, Q}, infinity).
-phantom_deliver(Q) ->
- gen_server2:call(?SERVER, {phantom_deliver, Q}, infinity).
+phantom_fetch(Q) ->
+ gen_server2:call(?SERVER, {phantom_fetch, Q}, infinity).
prefetch(Q) ->
gen_server2:pcast(?SERVER, -1, {prefetch, Q, self()}).
@@ -360,9 +359,6 @@ filesync() ->
cache_info() ->
gen_server2:call(?SERVER, cache_info, infinity).
-report_memory() ->
- gen_server2:cast(?SERVER, report_memory).
-
set_mode(Mode) ->
gen_server2:pcast(?SERVER, 10, {set_mode, Mode}).
@@ -406,8 +402,6 @@ init([FileSizeLimit, ReadFileHandlesLimit]) ->
%% seems to blow up if it is set private
MsgLocationEts = ets:new(?MSG_LOC_NAME, [set, protected]),
- TRef = start_memory_timer(),
-
InitName = "0" ++ ?FILE_EXTENSION,
State =
#dqstate { msg_location_dets = MsgLocationDets,
@@ -430,7 +424,7 @@ init([FileSizeLimit, ReadFileHandlesLimit]) ->
last_sync_offset = 0,
message_cache = ets:new(?CACHE_ETS_NAME,
[set, private]),
- memory_report_timer = TRef,
+ memory_report_timer_ref = undefined,
wordsize = erlang:system_info(wordsize),
mnesia_bytes_per_record = undefined,
ets_bytes_per_record = undefined
@@ -457,14 +451,14 @@ init([FileSizeLimit, ReadFileHandlesLimit]) ->
%% ets_bytes_per_record otherwise.
ok = rabbit_queue_mode_manager:report_memory(self(), 0, false),
ok = report_memory(false, State2),
- {ok, State2, hibernate, {backoff, ?HIBERNATE_AFTER_MIN,
- ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
+ {ok, start_memory_timer(State2), hibernate,
+ {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
-handle_call({deliver, Q}, _From, State) ->
- {ok, Result, State1} = internal_deliver(Q, true, false, true, State),
+handle_call({fetch, Q}, _From, State) ->
+ {ok, Result, State1} = internal_fetch(Q, true, false, true, State),
reply(Result, State1);
-handle_call({phantom_deliver, Q}, _From, State) ->
- {ok, Result, State1} = internal_deliver(Q, false, false, true, State),
+handle_call({phantom_fetch, Q}, _From, State) ->
+ {ok, Result, State1} = internal_fetch(Q, false, false, true, State),
reply(Result, State1);
handle_call({tx_commit, Q, PubMsgIds, AckSeqIds}, From, State) ->
State1 =
@@ -534,13 +528,8 @@ handle_cast({set_mode, Mode}, State) ->
disk -> fun to_disk_only_mode/1;
mixed -> fun to_ram_disk_mode/1
end)(State));
-handle_cast(report_memory, State) ->
- %% call noreply1/2, not noreply/1/2, as we don't want to restart the
- %% memory_report_timer
- %% by unsetting the timer, we force a report on the next normal message
- noreply1(State #dqstate { memory_report_timer = undefined });
handle_cast({prefetch, Q, From}, State) ->
- {ok, Result, State1} = internal_deliver(Q, true, true, false, State),
+ {ok, Result, State1} = internal_fetch(Q, true, true, false, State),
Cont = rabbit_misc:with_exit_handler(
fun () -> false end,
fun () ->
@@ -550,7 +539,7 @@ handle_cast({prefetch, Q, From}, State) ->
State3 =
case Cont of
true ->
- case internal_deliver(Q, false, false, true, State1) of
+ case internal_fetch(Q, false, false, true, State1) of
{ok, empty, State2} -> State2;
{ok, {_MsgId, _IsPersistent, _Delivered, _MsgSeqId, _Rem},
State2} -> State2
@@ -559,6 +548,11 @@ handle_cast({prefetch, Q, From}, State) ->
end,
noreply(State3).
+handle_info(report_memory, State) ->
+ %% call noreply1/2, not noreply/1/2, as we don't want to restart the
+ %% memory_report_timer_ref.
+ %% By unsetting the timer, we force a report on the next normal message
+ noreply1(State #dqstate { memory_report_timer_ref = undefined });
handle_info({'EXIT', _Pid, Reason}, State) ->
{stop, Reason, State};
handle_info(timeout, State) ->
@@ -595,7 +589,7 @@ shutdown(State = #dqstate { msg_location_dets = MsgLocationDets,
State1 #dqstate { current_file_handle = undefined,
current_dirty = false,
read_file_handles = {dict:new(), gb_trees:empty()},
- memory_report_timer = undefined
+ memory_report_timer_ref = undefined
}.
code_change(_OldVsn, State, _Extra) ->
@@ -603,20 +597,17 @@ code_change(_OldVsn, State, _Extra) ->
%% ---- UTILITY FUNCTIONS ----
-stop_memory_timer(State = #dqstate { memory_report_timer = undefined }) ->
+stop_memory_timer(State = #dqstate { memory_report_timer_ref = undefined }) ->
State;
-stop_memory_timer(State = #dqstate { memory_report_timer = TRef }) ->
+stop_memory_timer(State = #dqstate { memory_report_timer_ref = TRef }) ->
{ok, cancel} = timer:cancel(TRef),
- State #dqstate { memory_report_timer = undefined }.
-
-start_memory_timer() ->
- {ok, TRef} = timer:apply_after(?MEMORY_REPORT_TIME_INTERVAL,
- rabbit_disk_queue, report_memory, []),
- TRef.
+ State #dqstate { memory_report_timer_ref = undefined }.
-start_memory_timer(State = #dqstate { memory_report_timer = undefined }) ->
+start_memory_timer(State = #dqstate { memory_report_timer_ref = undefined }) ->
ok = report_memory(false, State),
- State #dqstate { memory_report_timer = start_memory_timer() };
+ {ok, TRef} = timer:send_after(?MINIMUM_MEMORY_REPORT_TIME_INTERVAL,
+ report_memory),
+ State #dqstate { memory_report_timer_ref = TRef };
start_memory_timer(State) ->
State.
@@ -893,7 +884,7 @@ cache_is_full(#dqstate { message_cache = Cache }) ->
%% ---- INTERNAL RAW FUNCTIONS ----
-internal_deliver(Q, ReadMsg, FakeDeliver, Advance,
+internal_fetch(Q, ReadMsg, FakeDeliver, Advance,
State = #dqstate { sequences = Sequences }) ->
case sequence_lookup(Sequences, Q) of
{SeqId, SeqId} -> {ok, empty, State};
@@ -971,7 +962,7 @@ internal_read_message(Q, ReadSeqId, ReadMsg, FakeDeliver, ForceInCache, State) -
end.
internal_auto_ack(Q, State) ->
- case internal_deliver(Q, false, false, true, State) of
+ case internal_fetch(Q, false, false, true, State) of
{ok, empty, State1} -> {ok, State1};
{ok, {_MsgId, _IsPersistent, _Delivered, MsgSeqId, _Remaining},
State1} ->
diff --git a/src/rabbit_mixed_queue.erl b/src/rabbit_mixed_queue.erl
index 3d989662db..2b25ab0fac 100644
--- a/src/rabbit_mixed_queue.erl
+++ b/src/rabbit_mixed_queue.erl
@@ -35,11 +35,11 @@
-export([init/2]).
--export([publish/2, publish_delivered/2, deliver/1, ack/2,
+-export([publish/2, publish_delivered/2, fetch/1, ack/2,
tx_publish/2, tx_commit/3, tx_cancel/2, requeue/2, purge/1,
length/1, is_empty/1, delete_queue/1, maybe_prefetch/1]).
--export([to_disk_only_mode/2, to_mixed_mode/2, info/1,
+-export([set_mode/3, info/1,
estimate_queue_memory_and_reset_counters/1]).
-record(mqstate, { mode,
@@ -76,7 +76,7 @@
-spec(publish/2 :: (message(), mqstate()) -> okmqs()).
-spec(publish_delivered/2 :: (message(), mqstate()) ->
{'ok', acktag(), mqstate()}).
--spec(deliver/1 :: (mqstate()) ->
+-spec(fetch/1 :: (mqstate()) ->
{('empty' | {message(), boolean(), acktag(), non_neg_integer()}),
mqstate()}).
-spec(ack/2 :: ([{message(), acktag()}], mqstate()) -> okmqs()).
@@ -91,8 +91,7 @@
-spec(length/1 :: (mqstate()) -> non_neg_integer()).
-spec(is_empty/1 :: (mqstate()) -> boolean()).
--spec(to_disk_only_mode/2 :: ([message()], mqstate()) -> okmqs()).
--spec(to_mixed_mode/2 :: ([message()], mqstate()) -> okmqs()).
+-spec(set_mode/3 :: (mode(), [message()], mqstate()) -> okmqs()).
-spec(estimate_queue_memory_and_reset_counters/1 :: (mqstate()) ->
{mqstate(), non_neg_integer(), non_neg_integer(),
@@ -120,8 +119,13 @@ size_of_message(
SumAcc + size(Frag)
end, 0, Payload).
-to_disk_only_mode(_TxnMessages, State = #mqstate { mode = disk }) ->
+set_mode(Mode, _TxnMessages, State = #mqstate { mode = Mode }) ->
{ok, State};
+set_mode(disk, TxnMessages, State) ->
+ to_disk_only_mode(TxnMessages, State);
+set_mode(mixed, TxnMessages, State) ->
+ to_mixed_mode(TxnMessages, State).
+
to_disk_only_mode(TxnMessages, State =
#mqstate { mode = mixed, queue = Q, msg_buf = MsgBuf,
is_durable = IsDurable, prefetcher = Prefetcher
@@ -219,8 +223,6 @@ flush_requeue_to_disk_queue(Q, RequeueCount, Commit) ->
ok = rabbit_disk_queue:requeue_next_n(Q, RequeueCount),
[].
-to_mixed_mode(_TxnMessages, State = #mqstate { mode = mixed }) ->
- {ok, State};
to_mixed_mode(TxnMessages, State = #mqstate { mode = disk, queue = Q,
is_durable = IsDurable }) ->
rabbit_log:info("Converting queue to mixed mode: ~p~n", [Q]),
@@ -248,6 +250,16 @@ to_mixed_mode(TxnMessages, State = #mqstate { mode = disk, queue = Q,
garbage_collect(),
{ok, State #mqstate { mode = mixed }}.
+gain_memory(Inc, State = #mqstate { memory_size = QSize,
+ memory_gain = Gain }) ->
+ State #mqstate { memory_size = QSize + Inc,
+ memory_gain = Gain + Inc }.
+
+lose_memory(Dec, State = #mqstate { memory_size = QSize,
+ memory_loss = Loss }) ->
+ State #mqstate { memory_size = QSize - Dec,
+ memory_loss = Loss + Dec }.
+
inc_queue_length(_Q, MsgBuf, 0) ->
MsgBuf;
inc_queue_length(Q, MsgBuf, Count) ->
@@ -264,7 +276,7 @@ dec_queue_length(Count, State = #mqstate { queue = Q, msg_buf = MsgBuf }) ->
{{value, {Q, Len}}, MsgBuf1} ->
case Len of
Count ->
- maybe_prefetch(State #mqstate { msg_buf = MsgBuf1 });
+ State #mqstate { msg_buf = MsgBuf1 };
_ when Len > Count ->
State #mqstate { msg_buf = queue:in_r({Q, Len-Count},
MsgBuf1)}
@@ -286,26 +298,23 @@ maybe_prefetch(State) ->
State.
publish(Msg, State = #mqstate { mode = disk, queue = Q, length = Length,
- msg_buf = MsgBuf, memory_size = QSize,
- memory_gain = Gain }) ->
+ msg_buf = MsgBuf }) ->
MsgBuf1 = inc_queue_length(Q, MsgBuf, 1),
ok = rabbit_disk_queue:publish(Q, Msg, false),
MsgSize = size_of_message(Msg),
- {ok, State #mqstate { memory_gain = Gain + MsgSize,
- memory_size = QSize + MsgSize,
- msg_buf = MsgBuf1, length = Length + 1 }};
+ {ok, gain_memory(MsgSize, State #mqstate { msg_buf = MsgBuf1,
+ length = Length + 1 })};
publish(Msg = #basic_message { is_persistent = IsPersistent }, State =
#mqstate { queue = Q, mode = mixed, is_durable = IsDurable,
- msg_buf = MsgBuf, length = Length, memory_size = QSize,
- memory_gain = Gain }) ->
+ msg_buf = MsgBuf, length = Length }) ->
ok = case IsDurable andalso IsPersistent of
true -> rabbit_disk_queue:publish(Q, Msg, false);
false -> ok
end,
MsgSize = size_of_message(Msg),
- {ok, State #mqstate { msg_buf = queue:in({Msg, false}, MsgBuf),
- length = Length + 1, memory_size = QSize + MsgSize,
- memory_gain = Gain + MsgSize }}.
+ {ok, gain_memory(MsgSize,
+ State #mqstate { msg_buf = queue:in({Msg, false}, MsgBuf),
+ length = Length + 1 })}.
%% Assumption here is that the queue is empty already (only called via
%% attempt_immediate_delivery).
@@ -313,20 +322,18 @@ publish_delivered(Msg =
#basic_message { guid = MsgId, is_persistent = IsPersistent},
State =
#mqstate { mode = Mode, is_durable = IsDurable,
- queue = Q, length = 0,
- memory_size = QSize, memory_gain = Gain })
+ queue = Q, length = 0 })
when Mode =:= disk orelse (IsDurable andalso IsPersistent) ->
ok = rabbit_disk_queue:publish(Q, Msg, true),
MsgSize = size_of_message(Msg),
- State1 = State #mqstate { memory_size = QSize + MsgSize,
- memory_gain = Gain + MsgSize },
+ State1 = gain_memory(MsgSize, State),
case IsDurable andalso IsPersistent of
true ->
%% must call phantom_deliver otherwise the msg remains at
%% the head of the queue. This is synchronous, but
%% unavoidable as we need the AckTag
{MsgId, IsPersistent, true, AckTag, 0} =
- rabbit_disk_queue:phantom_deliver(Q),
+ rabbit_disk_queue:phantom_fetch(Q),
{ok, AckTag, State1};
false ->
%% in this case, we don't actually care about the ack, so
@@ -334,18 +341,15 @@ publish_delivered(Msg =
ok = rabbit_disk_queue:auto_ack_next_message(Q),
{ok, noack, State1}
end;
-publish_delivered(Msg, State =
- #mqstate { mode = mixed, length = 0, memory_size = QSize,
- memory_gain = Gain }) ->
+publish_delivered(Msg, State = #mqstate { mode = mixed, length = 0 }) ->
MsgSize = size_of_message(Msg),
- {ok, noack, State #mqstate { memory_size = QSize + MsgSize,
- memory_gain = Gain + MsgSize }}.
+ {ok, noack, gain_memory(MsgSize, State)}.
-deliver(State = #mqstate { length = 0 }) ->
+fetch(State = #mqstate { length = 0 }) ->
{empty, State};
-deliver(State = #mqstate { msg_buf = MsgBuf, queue = Q,
- is_durable = IsDurable, length = Length,
- prefetcher = Prefetcher }) ->
+fetch(State = #mqstate { msg_buf = MsgBuf, queue = Q,
+ is_durable = IsDurable, length = Length,
+ prefetcher = Prefetcher }) ->
{{value, Value}, MsgBuf1} = queue:out(MsgBuf),
Rem = Length - 1,
State1 = State #mqstate { length = Rem },
@@ -356,13 +360,13 @@ deliver(State = #mqstate { msg_buf = MsgBuf, queue = Q,
case IsDurable andalso IsPersistent of
true ->
{MsgId, IsPersistent, IsDelivered, AckTag1, _PRem}
- = rabbit_disk_queue:phantom_deliver(Q),
+ = rabbit_disk_queue:phantom_fetch(Q),
AckTag1;
false ->
noack
end,
- State2 = maybe_prefetch(State1 #mqstate { msg_buf = MsgBuf1 }),
- {{Msg, IsDelivered, AckTag, Rem}, State2};
+ {{Msg, IsDelivered, AckTag, Rem},
+ State1 #mqstate { msg_buf = MsgBuf1 }};
{Msg = #basic_message { is_persistent = IsPersistent },
IsDelivered, AckTag} ->
%% message has come via the prefetcher, thus it's been
@@ -375,21 +379,21 @@ deliver(State = #mqstate { msg_buf = MsgBuf, queue = Q,
State2 = dec_queue_length(1, State1),
{Msg = #basic_message { is_persistent = IsPersistent },
_Size, IsDelivered, AckTag, _PersistRem}
- = rabbit_disk_queue:deliver(Q),
+ = rabbit_disk_queue:fetch(Q),
AckTag1 = maybe_ack(Q, IsDurable, IsPersistent, AckTag),
{{Msg, IsDelivered, AckTag1, Rem}, State2};
_ ->
case rabbit_queue_prefetcher:drain(Prefetcher) of
- empty -> deliver(State #mqstate { prefetcher = undefined });
+ empty -> fetch(State #mqstate { prefetcher = undefined });
{Fetched, Len, Status} ->
State2 = #mqstate { msg_buf = MsgBuf2 } =
dec_queue_length(Len, State),
- deliver(State2 #mqstate
- { msg_buf = queue:join(Fetched, MsgBuf2),
- prefetcher = case Status of
- finished -> undefined;
- continuing -> Prefetcher
- end })
+ fetch(State2 #mqstate
+ { msg_buf = queue:join(Fetched, MsgBuf2),
+ prefetcher = case Status of
+ finished -> undefined;
+ continuing -> Prefetcher
+ end })
end
end.
@@ -407,38 +411,30 @@ remove_noacks(MsgsWithAcks) ->
{[AckTag | AccAckTags], size_of_message(Msg) + AccSize}
end, {[], 0}, MsgsWithAcks).
-ack(MsgsWithAcks, State = #mqstate { queue = Q, memory_size = QSize,
- memory_loss = Loss }) ->
+ack(MsgsWithAcks, State = #mqstate { queue = Q }) ->
{AckTags, ASize} = remove_noacks(MsgsWithAcks),
ok = case AckTags of
[] -> ok;
_ -> rabbit_disk_queue:ack(Q, AckTags)
end,
- State1 = State #mqstate { memory_size = QSize - ASize,
- memory_loss = Loss + ASize },
- {ok, State1}.
+ {ok, lose_memory(ASize, State)}.
tx_publish(Msg = #basic_message { is_persistent = IsPersistent },
- State = #mqstate { mode = Mode, memory_size = QSize,
- is_durable = IsDurable, memory_gain = Gain })
+ State = #mqstate { mode = Mode, is_durable = IsDurable })
when Mode =:= disk orelse (IsDurable andalso IsPersistent) ->
ok = rabbit_disk_queue:tx_publish(Msg),
MsgSize = size_of_message(Msg),
- {ok, State #mqstate { memory_size = QSize + MsgSize,
- memory_gain = Gain + MsgSize }};
-tx_publish(Msg, State = #mqstate { mode = mixed, memory_size = QSize,
- memory_gain = Gain }) ->
+ {ok, gain_memory(MsgSize, State)};
+tx_publish(Msg, State = #mqstate { mode = mixed }) ->
%% this message will reappear in the tx_commit, so ignore for now
MsgSize = size_of_message(Msg),
- {ok, State #mqstate { memory_size = QSize + MsgSize,
- memory_gain = Gain + MsgSize }}.
+ {ok, gain_memory(MsgSize, State)}.
only_msg_ids(Pubs) ->
lists:map(fun (Msg) -> {Msg #basic_message.guid, false} end, Pubs).
tx_commit(Publishes, MsgsWithAcks,
State = #mqstate { mode = disk, queue = Q, length = Length,
- memory_size = QSize, memory_loss = Loss,
msg_buf = MsgBuf }) ->
{RealAcks, ASize} = remove_noacks(MsgsWithAcks),
ok = if ([] == Publishes) andalso ([] == RealAcks) -> ok;
@@ -446,14 +442,12 @@ tx_commit(Publishes, MsgsWithAcks,
RealAcks)
end,
Len = erlang:length(Publishes),
- {ok, State #mqstate { length = Length + Len,
- msg_buf = inc_queue_length(Q, MsgBuf, Len),
- memory_size = QSize - ASize,
- memory_loss = Loss + ASize }};
+ {ok, lose_memory(ASize, State #mqstate
+ { length = Length + Len,
+ msg_buf = inc_queue_length(Q, MsgBuf, Len) })};
tx_commit(Publishes, MsgsWithAcks,
State = #mqstate { mode = mixed, queue = Q, msg_buf = MsgBuf,
- is_durable = IsDurable, length = Length,
- memory_size = QSize, memory_loss = Loss }) ->
+ is_durable = IsDurable, length = Length }) ->
{PersistentPubs, MsgBuf1} =
lists:foldl(fun (Msg = #basic_message { is_persistent = IsPersistent },
{Acc, MsgBuf2}) ->
@@ -471,23 +465,20 @@ tx_commit(Publishes, MsgsWithAcks,
false -> rabbit_disk_queue:tx_commit(
Q, lists:reverse(PersistentPubs), RealAcks)
end,
- {ok, State #mqstate { msg_buf = MsgBuf1, memory_size = QSize - ASize,
- length = Length + erlang:length(Publishes),
- memory_loss = Loss + ASize }}.
+ {ok, lose_memory(ASize, State #mqstate
+ { msg_buf = MsgBuf1,
+ length = Length + erlang:length(Publishes) })}.
-tx_cancel(Publishes, State = #mqstate { mode = disk, memory_size = QSize,
- memory_loss = Loss }) ->
+tx_cancel(Publishes, State = #mqstate { mode = disk }) ->
{MsgIds, CSize} =
lists:foldl(
fun (Msg = #basic_message { guid = MsgId }, {MsgIdsAcc, CSizeAcc}) ->
{[MsgId | MsgIdsAcc], CSizeAcc + size_of_message(Msg)}
end, {[], 0}, Publishes),
ok = rabbit_disk_queue:tx_cancel(MsgIds),
- {ok, State #mqstate { memory_size = QSize - CSize,
- memory_loss = Loss + CSize }};
-tx_cancel(Publishes, State = #mqstate { mode = mixed, is_durable = IsDurable,
- memory_size = QSize,
- memory_loss = Loss }) ->
+ {ok, lose_memory(CSize, State)};
+tx_cancel(Publishes,
+ State = #mqstate { mode = mixed, is_durable = IsDurable }) ->
{PersistentPubs, CSize} =
lists:foldl(
fun (Msg = #basic_message { is_persistent = IsPersistent,
@@ -503,8 +494,7 @@ tx_cancel(Publishes, State = #mqstate { mode = mixed, is_durable = IsDurable,
rabbit_disk_queue:tx_cancel(PersistentPubs);
true -> ok
end,
- {ok, State #mqstate { memory_size = QSize - CSize,
- memory_loss = Loss + CSize }}.
+ {ok, lose_memory(CSize, State)}.
%% [{Msg, AckTag}]
requeue(MessagesWithAckTags, State = #mqstate { mode = disk, queue = Q,
@@ -555,32 +545,30 @@ requeue(MessagesWithAckTags, State = #mqstate { mode = mixed, queue = Q,
length = Length + erlang:length(MessagesWithAckTags)}}.
purge(State = #mqstate { queue = Q, mode = disk, length = Count,
- memory_loss = Loss, memory_size = QSize }) ->
+ memory_size = QSize }) ->
Count = rabbit_disk_queue:purge(Q),
- {Count, State #mqstate { length = 0, memory_size = 0,
- memory_loss = Loss + QSize }};
+ {Count, lose_memory(QSize, State)};
purge(State = #mqstate { queue = Q, mode = mixed, length = Length,
- memory_loss = Loss, memory_size = QSize,
- prefetcher = Prefetcher }) ->
+ memory_size = QSize, prefetcher = Prefetcher }) ->
case Prefetcher of
undefined -> ok;
_ -> rabbit_queue_prefetcher:drain_and_stop(Prefetcher)
end,
rabbit_disk_queue:purge(Q),
- {Length,
- State #mqstate { msg_buf = queue:new(), length = 0, memory_size = 0,
- memory_loss = Loss + QSize, prefetcher = undefined }}.
+ {Length, lose_memory(QSize, State #mqstate { msg_buf = queue:new(),
+ length = 0,
+ prefetcher = undefined })}.
delete_queue(State = #mqstate { queue = Q, memory_size = QSize,
- memory_loss = Loss, prefetcher = Prefetcher
+ prefetcher = Prefetcher
}) ->
case Prefetcher of
undefined -> ok;
_ -> rabbit_queue_prefetcher:drain_and_stop(Prefetcher)
end,
ok = rabbit_disk_queue:delete_queue(Q),
- {ok, State #mqstate { length = 0, memory_size = 0, msg_buf = queue:new(),
- memory_loss = Loss + QSize, prefetcher = undefined }}.
+ {ok, lose_memory(QSize, State #mqstate { length = 0, msg_buf = queue:new(),
+ prefetcher = undefined })}.
length(#mqstate { length = Length }) ->
Length.
diff --git a/src/rabbit_queue_prefetcher.erl b/src/rabbit_queue_prefetcher.erl
index c847848de8..ad6b1ce2d4 100644
--- a/src/rabbit_queue_prefetcher.erl
+++ b/src/rabbit_queue_prefetcher.erl
@@ -94,7 +94,7 @@
%% to its internal queue. A cast is not sufficient here because the
%% mixed_queue could come along, drain the prefetcher, thus
%% catching the msg just sent by the disk_queue and then call
-%% disk_queue:deliver(Q) which is normal priority call, which could
+%% disk_queue:fetch(Q) which is normal priority call, which could
%% overtake a reply cast from the prefetcher to the disk queue,
%% which would result in the same message being delivered
%% twice. Thus when the disk_queue calls prefetcher:publish(Msg),
@@ -146,7 +146,7 @@
%% mixed_queue tries to drain the prefetcher. We must therefore ensure
%% that this msg can't also be delivered to the mixed_queue directly
%% by the disk_queue through the mixed_queue calling
-%% disk_queue:deliver(Q) which is why the prefetcher:publish function
+%% disk_queue:fetch(Q) which is why the prefetcher:publish function
%% is a call and not a cast, thus blocking the disk_queue.
%%
%% Finally, the prefetcher is only created when the mixed_queue is
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index ffd675a0b7..ad5a248314 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -157,7 +157,7 @@ test_simple_n_element_queue(N) ->
passed.
test_unfold() ->
- {[], test} = rabbit_misc:unfold(fun (V) -> false end, test),
+ {[], test} = rabbit_misc:unfold(fun (_V) -> false end, test),
List = lists:seq(2,20,2),
{List, 0} = rabbit_misc:unfold(fun (0) -> false;
(N) -> {true, N*2, N-1}
@@ -848,7 +848,7 @@ rdq_time_tx_publish_commit_deliver_ack(Qs, MsgCount, MsgSizeBytes) ->
[begin
Remaining = MsgCount - N,
{Message, _TSize, false, SeqId,
- Remaining} = rabbit_disk_queue:deliver(Q),
+ Remaining} = rabbit_disk_queue:fetch(Q),
ok = rdq_match_message(Message, N, Msg, MsgSizeBytes),
SeqId
end || N <- List],
@@ -895,7 +895,7 @@ rdq_stress_gc(MsgCount) ->
fun (MsgId, Acc) ->
Remaining = MsgCount - MsgId,
{Message, _TSize, false, SeqId, Remaining} =
- rabbit_disk_queue:deliver(q),
+ rabbit_disk_queue:fetch(q),
ok = rdq_match_message(Message, MsgId, Msg, MsgSizeBytes),
dict:store(MsgId, SeqId, Acc)
end, dict:new(), List),
@@ -904,7 +904,7 @@ rdq_stress_gc(MsgCount) ->
rabbit_disk_queue:ack(q, [SeqId])
end || MsgId <- AckList2],
rabbit_disk_queue:tx_commit(q, [], []),
- empty = rabbit_disk_queue:deliver(q),
+ empty = rabbit_disk_queue:fetch(q),
rdq_stop(),
passed.
@@ -923,7 +923,7 @@ rdq_test_startup_with_queue_gaps() ->
Seqs = [begin
Remaining = Total - N,
{Message, _TSize, false, SeqId, Remaining} =
- rabbit_disk_queue:deliver(q),
+ rabbit_disk_queue:fetch(q),
ok = rdq_match_message(Message, N, Msg, 256),
SeqId
end || N <- lists:seq(1,Half)],
@@ -945,7 +945,7 @@ rdq_test_startup_with_queue_gaps() ->
Seqs2 = [begin
Remaining = round(Total - ((Half + N)/2)),
{Message, _TSize, true, SeqId, Remaining} =
- rabbit_disk_queue:deliver(q),
+ rabbit_disk_queue:fetch(q),
ok = rdq_match_message(Message, N, Msg, 256),
SeqId
end || N <- lists:seq(2,Half,2)],
@@ -955,13 +955,13 @@ rdq_test_startup_with_queue_gaps() ->
Seqs3 = [begin
Remaining = Total - N,
{Message, _TSize, false, SeqId, Remaining} =
- rabbit_disk_queue:deliver(q),
+ rabbit_disk_queue:fetch(q),
ok = rdq_match_message(Message, N, Msg, 256),
SeqId
end || N <- lists:seq(1 + Half,Total)],
rabbit_disk_queue:tx_commit(q, [], Seqs3),
io:format("Read second half done~n", []),
- empty = rabbit_disk_queue:deliver(q),
+ empty = rabbit_disk_queue:fetch(q),
rdq_stop(),
passed.
@@ -980,7 +980,7 @@ rdq_test_redeliver() ->
Seqs = [begin
Remaining = Total - N,
{Message, _TSize, false, SeqId, Remaining} =
- rabbit_disk_queue:deliver(q),
+ rabbit_disk_queue:fetch(q),
ok = rdq_match_message(Message, N, Msg, 256),
SeqId
end || N <- lists:seq(1,Half)],
@@ -1001,7 +1001,7 @@ rdq_test_redeliver() ->
Seqs2 = [begin
Remaining = round(Total - N + (Half/2)),
{Message, _TSize, false, SeqId, Remaining} =
- rabbit_disk_queue:deliver(q),
+ rabbit_disk_queue:fetch(q),
ok = rdq_match_message(Message, N, Msg, 256),
SeqId
end || N <- lists:seq(1+Half, Total)],
@@ -1009,12 +1009,12 @@ rdq_test_redeliver() ->
Seqs3 = [begin
Remaining = round((Half - N) / 2) - 1,
{Message, _TSize, true, SeqId, Remaining} =
- rabbit_disk_queue:deliver(q),
+ rabbit_disk_queue:fetch(q),
ok = rdq_match_message(Message, N, Msg, 256),
SeqId
end || N <- lists:seq(1, Half, 2)],
rabbit_disk_queue:tx_commit(q, [], Seqs3),
- empty = rabbit_disk_queue:deliver(q),
+ empty = rabbit_disk_queue:fetch(q),
rdq_stop(),
passed.
@@ -1033,7 +1033,7 @@ rdq_test_purge() ->
Seqs = [begin
Remaining = Total - N,
{Message, _TSize, false, SeqId, Remaining} =
- rabbit_disk_queue:deliver(q),
+ rabbit_disk_queue:fetch(q),
ok = rdq_match_message(Message, N, Msg, 256),
SeqId
end || N <- lists:seq(1,Half)],
@@ -1042,7 +1042,7 @@ rdq_test_purge() ->
io:format("Purge done~n", []),
rabbit_disk_queue:tx_commit(q, [], Seqs),
io:format("Ack first half done~n", []),
- empty = rabbit_disk_queue:deliver(q),
+ empty = rabbit_disk_queue:fetch(q),
rdq_stop(),
passed.
@@ -1051,7 +1051,7 @@ rdq_new_mixed_queue(Q, Durable, Disk) ->
{MS1, _, _, _} =
rabbit_mixed_queue:estimate_queue_memory_and_reset_counters(MS),
case Disk of
- true -> {ok, MS2} = rabbit_mixed_queue:to_disk_only_mode([], MS1),
+ true -> {ok, MS2} = rabbit_mixed_queue:set_mode(disk, [], MS1),
MS2;
false -> MS1
end.
@@ -1083,11 +1083,11 @@ rdq_test_mixed_queue_modes() ->
30 = rabbit_mixed_queue:length(MS6),
io:format("Published a mixture of messages; ~w~n",
[rabbit_mixed_queue:estimate_queue_memory_and_reset_counters(MS6)]),
- {ok, MS7} = rabbit_mixed_queue:to_disk_only_mode([], MS6),
+ {ok, MS7} = rabbit_mixed_queue:set_mode(disk, [], MS6),
30 = rabbit_mixed_queue:length(MS7),
io:format("Converted to disk only mode; ~w~n",
[rabbit_mixed_queue:estimate_queue_memory_and_reset_counters(MS7)]),
- {ok, MS8} = rabbit_mixed_queue:to_mixed_mode([], MS7),
+ {ok, MS8} = rabbit_mixed_queue:set_mode(mixed, [], MS7),
30 = rabbit_mixed_queue:length(MS8),
io:format("Converted to mixed mode; ~w~n",
[rabbit_mixed_queue:estimate_queue_memory_and_reset_counters(MS8)]),
@@ -1097,12 +1097,12 @@ rdq_test_mixed_queue_modes() ->
Rem = 30 - N,
{{#basic_message { is_persistent = false },
false, _AckTag, Rem},
- MS9a} = rabbit_mixed_queue:deliver(MS9),
+ MS9a} = rabbit_mixed_queue:fetch(MS9),
MS9a
end, MS8, lists:seq(1,10)),
20 = rabbit_mixed_queue:length(MS10),
io:format("Delivered initial non persistent messages~n"),
- {ok, MS11} = rabbit_mixed_queue:to_disk_only_mode([], MS10),
+ {ok, MS11} = rabbit_mixed_queue:set_mode(disk, [], MS10),
20 = rabbit_mixed_queue:length(MS11),
io:format("Converted to disk only mode~n"),
rdq_stop(),
@@ -1116,13 +1116,13 @@ rdq_test_mixed_queue_modes() ->
Rem = 10 - N,
{{Msg = #basic_message { is_persistent = true },
false, AckTag, Rem},
- MS13a} = rabbit_mixed_queue:deliver(MS13),
+ MS13a} = rabbit_mixed_queue:fetch(MS13),
{MS13a, [{Msg, AckTag} | AcksAcc]}
end, {MS12, []}, lists:seq(1,10)),
0 = rabbit_mixed_queue:length(MS14),
{ok, MS15} = rabbit_mixed_queue:ack(AckTags, MS14),
io:format("Delivered and acked all messages~n"),
- {ok, MS16} = rabbit_mixed_queue:to_disk_only_mode([], MS15),
+ {ok, MS16} = rabbit_mixed_queue:set_mode(disk, [], MS15),
0 = rabbit_mixed_queue:length(MS16),
io:format("Converted to disk only mode~n"),
rdq_stop(),
@@ -1149,28 +1149,28 @@ rdq_test_mode_conversion_mid_txn() ->
rdq_start(),
MS0 = rdq_new_mixed_queue(q, true, false),
passed = rdq_tx_publish_mixed_alter_commit_get(
- MS0, MsgsA, MsgsB, fun rabbit_mixed_queue:to_disk_only_mode/2, commit),
+ MS0, MsgsA, MsgsB, disk, commit),
rdq_stop_virgin_start(),
MS1 = rdq_new_mixed_queue(q, true, false),
passed = rdq_tx_publish_mixed_alter_commit_get(
- MS1, MsgsA, MsgsB, fun rabbit_mixed_queue:to_disk_only_mode/2, cancel),
+ MS1, MsgsA, MsgsB, disk, cancel),
rdq_stop_virgin_start(),
MS2 = rdq_new_mixed_queue(q, true, true),
passed = rdq_tx_publish_mixed_alter_commit_get(
- MS2, MsgsA, MsgsB, fun rabbit_mixed_queue:to_mixed_mode/2, commit),
+ MS2, MsgsA, MsgsB, mixed, commit),
rdq_stop_virgin_start(),
MS3 = rdq_new_mixed_queue(q, true, true),
passed = rdq_tx_publish_mixed_alter_commit_get(
- MS3, MsgsA, MsgsB, fun rabbit_mixed_queue:to_mixed_mode/2, cancel),
+ MS3, MsgsA, MsgsB, mixed, cancel),
rdq_stop(),
passed.
-rdq_tx_publish_mixed_alter_commit_get(MS0, MsgsA, MsgsB, ChangeFun, CommitOrCancel) ->
+rdq_tx_publish_mixed_alter_commit_get(MS0, MsgsA, MsgsB, Mode, CommitOrCancel) ->
0 = rabbit_mixed_queue:length(MS0),
MS2 = lists:foldl(
fun (Msg, MS1) ->
@@ -1185,7 +1185,7 @@ rdq_tx_publish_mixed_alter_commit_get(MS0, MsgsA, MsgsB, ChangeFun, CommitOrCanc
MS3a
end, MS2, MsgsB),
Len0 = rabbit_mixed_queue:length(MS4),
- {ok, MS5} = ChangeFun(MsgsB, MS4),
+ {ok, MS5} = rabbit_mixed_queue:set_mode(Mode, MsgsB, MS4),
Len0 = rabbit_mixed_queue:length(MS5),
{ok, MS9} =
case CommitOrCancel of
@@ -1198,7 +1198,7 @@ rdq_tx_publish_mixed_alter_commit_get(MS0, MsgsA, MsgsB, ChangeFun, CommitOrCanc
fun (Msg, {Acc, MS7}) ->
Rem = Len1 - (Msg #basic_message.guid) - 1,
{{Msg, false, AckTag, Rem}, MS7a} =
- rabbit_mixed_queue:deliver(MS7),
+ rabbit_mixed_queue:fetch(MS7),
{[{Msg, AckTag} | Acc], MS7a}
end, {[], MS6}, MsgsA ++ MsgsB),
0 = rabbit_mixed_queue:length(MS8),
@@ -1211,7 +1211,7 @@ rdq_tx_publish_mixed_alter_commit_get(MS0, MsgsA, MsgsB, ChangeFun, CommitOrCanc
fun (Msg, {Acc, MS7}) ->
Rem = Len0 - (Msg #basic_message.guid) - 1,
{{Msg, false, AckTag, Rem}, MS7a} =
- rabbit_mixed_queue:deliver(MS7),
+ rabbit_mixed_queue:fetch(MS7),
{[{Msg, AckTag} | Acc], MS7a}
end, {[], MS6}, MsgsA),
0 = rabbit_mixed_queue:length(MS8),
@@ -1244,7 +1244,7 @@ rdq_test_disk_queue_modes() ->
Seqs = [begin
Remaining = Total - N,
{Message, _TSize, false, SeqId, Remaining} =
- rabbit_disk_queue:deliver(q),
+ rabbit_disk_queue:fetch(q),
ok = rdq_match_message(Message, N, Msg, 256),
SeqId
end || N <- Half1],
@@ -1254,7 +1254,7 @@ rdq_test_disk_queue_modes() ->
Seqs2 = [begin
Remaining = Total - N,
{Message, _TSize, false, SeqId, Remaining} =
- rabbit_disk_queue:deliver(q),
+ rabbit_disk_queue:fetch(q),
ok = rdq_match_message(Message, N, Msg, 256),
SeqId
end || N <- Half2],
@@ -1262,7 +1262,7 @@ rdq_test_disk_queue_modes() ->
ok = rabbit_disk_queue:tx_commit(q, [], Seqs),
ok = rabbit_disk_queue:to_disk_only_mode(),
ok = rabbit_disk_queue:tx_commit(q, [], Seqs2),
- empty = rabbit_disk_queue:deliver(q),
+ empty = rabbit_disk_queue:fetch(q),
rdq_stop(),
passed.