diff options
| author | Matthew Sackman <matthew@lshift.net> | 2009-08-19 18:29:39 +0100 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2009-08-19 18:29:39 +0100 |
| commit | bc61f0b7fbea972eb305c7d16bf7570eaa6cc6cc (patch) | |
| tree | e2d3f74b704d8c7bf1bb8191152cca2b1c6d213e | |
| parent | f422662d84296f46c71195770887c355e18a6d9d (diff) | |
| download | rabbitmq-server-git-bc61f0b7fbea972eb305c7d16bf7570eaa6cc6cc.tar.gz | |
Pretty much all the low hanging fruit. Need to check the ets access in txns in disk_queue and also deal with the clean shutdown and delivery bits.
> ** queue_prefetcher
> - s/publish/deliver ?
No, I really don't like that. Publish is about pushing messages to the receiver. Thus it's named correctly.
| -rw-r--r-- | src/rabbit_amqqueue.erl | 6 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 60 | ||||
| -rw-r--r-- | src/rabbit_disk_queue.erl | 111 | ||||
| -rw-r--r-- | src/rabbit_mixed_queue.erl | 162 | ||||
| -rw-r--r-- | src/rabbit_queue_prefetcher.erl | 4 | ||||
| -rw-r--r-- | src/rabbit_tests.erl | 64 |
6 files changed, 189 insertions, 218 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 1d9f8c53cd..6c4c0ebb69 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -42,7 +42,7 @@ -export([notify_sent/2, unblock/2]). -export([commit_all/2, rollback_all/2, notify_down_all/2, limit_all/3]). -export([on_node_down/1]). --export([set_mode/2, report_memory/1]). +-export([set_mode/2]). -import(mnesia). -import(gen_server2). @@ -107,7 +107,6 @@ -spec(internal_delete/1 :: (queue_name()) -> 'ok' | not_found()). -spec(on_node_down/1 :: (erlang_node()) -> 'ok'). -spec(pseudo_queue/2 :: (binary(), pid()) -> amqqueue()). --spec(report_memory/1 :: (pid()) -> 'ok'). -endif. @@ -227,9 +226,6 @@ map(VHostPath, F) -> rabbit_misc:filter_exit_map(F, list(VHostPath)). set_mode(QPid, Mode) -> gen_server2:pcast(QPid, 10, {set_mode, Mode}). -report_memory(QPid) -> - gen_server2:cast(QPid, report_memory). - info(#amqqueue{ pid = QPid }) -> gen_server2:pcall(QPid, 9, info, infinity). diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 14a0370d83..b1c409b1f2 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -38,7 +38,7 @@ -define(UNSENT_MESSAGE_LIMIT, 100). -define(HIBERNATE_AFTER_MIN, 1000). -define(DESIRED_HIBERNATE, 10000). --define(MEMORY_REPORT_TIME_INTERVAL, 10000). %% 10 seconds in milliseconds +-define(MINIMUM_MEMORY_REPORT_TIME_INTERVAL, 10000). %% 10 seconds in milliseconds -export([start_link/1]). @@ -142,8 +142,8 @@ noreply(NewState) -> {noreply, start_memory_timer(NewState), hibernate}. start_memory_timer(State = #q { memory_report_timer = undefined }) -> - {ok, TRef} = timer:apply_after(?MEMORY_REPORT_TIME_INTERVAL, - rabbit_amqqueue, report_memory, [self()]), + {ok, TRef} = timer:send_after(?MINIMUM_MEMORY_REPORT_TIME_INTERVAL, + report_memory), report_memory(false, State #q { memory_report_timer = TRef }); start_memory_timer(State) -> State. @@ -199,11 +199,12 @@ record_current_channel_tx(ChPid, Txn) -> %% that wasn't happening already) store_ch_record((ch_record(ChPid))#cr{txn = Txn}). -deliver_queue(Funs = {PredFun, DeliverFun}, FunAcc, - State = #q{q = #amqqueue{name = QName}, - active_consumers = ActiveConsumers, - blocked_consumers = BlockedConsumers, - next_msg_id = NextId}) -> +deliver_msgs_to_consumers( + Funs = {PredFun, DeliverFun}, FunAcc, + State = #q{q = #amqqueue{name = QName}, + active_consumers = ActiveConsumers, + blocked_consumers = BlockedConsumers, + next_msg_id = NextId}) -> case queue:out(ActiveConsumers) of {{value, QEntry = {ChPid, #consumer{tag = ConsumerTag, ack_required = AckRequired}}}, @@ -246,7 +247,7 @@ deliver_queue(Funs = {PredFun, DeliverFun}, FunAcc, blocked_consumers = NewBlockedConsumers, next_msg_id = NextId + 1 }, - deliver_queue(Funs, FunAcc1, State2); + deliver_msgs_to_consumers(Funs, FunAcc1, State2); %% if IsMsgReady then we've hit the limiter false when IsMsgReady -> store_ch_record(C#cr{is_limit_active = true}), @@ -254,7 +255,7 @@ deliver_queue(Funs = {PredFun, DeliverFun}, FunAcc, move_consumers(ChPid, ActiveConsumers, BlockedConsumers), - deliver_queue( + deliver_msgs_to_consumers( Funs, FunAcc, State#q{active_consumers = NewActiveConsumers, blocked_consumers = NewBlockedConsumers}); @@ -271,7 +272,7 @@ deliver_from_queue_pred({IsEmpty, _AutoAcks}, _State) -> deliver_from_queue_deliver(AckRequired, {false, AutoAcks}, State = #q { mixed_state = MS }) -> {{Msg, IsDelivered, AckTag, Remaining}, MS1} = - rabbit_mixed_queue:deliver(MS), + rabbit_mixed_queue:fetch(MS), AutoAcks1 = case AckRequired of true -> AutoAcks; @@ -285,7 +286,7 @@ run_message_queue(State = #q { mixed_state = MS }) -> fun deliver_from_queue_deliver/3 }, IsEmpty = rabbit_mixed_queue:is_empty(MS), {{_IsEmpty1, AutoAcks}, State1} = - deliver_queue(Funs, {IsEmpty, []}, State), + deliver_msgs_to_consumers(Funs, {IsEmpty, []}, State), {ok, MS1} = rabbit_mixed_queue:ack(AutoAcks, State1 #q.mixed_state), State1 #q { mixed_state = MS1 }. @@ -306,7 +307,7 @@ attempt_immediate_delivery(none, _ChPid, Msg, State) -> end, {{Msg, false, AckTag}, true, State2} end, - deliver_queue({ PredFun, DeliverFun }, false, State); + deliver_msgs_to_consumers({ PredFun, DeliverFun }, false, State); attempt_immediate_delivery(Txn, ChPid, Msg, State) -> {ok, MS} = rabbit_mixed_queue:tx_publish(Msg, State #q.mixed_state), record_pending_message(Txn, ChPid, Msg), @@ -330,8 +331,8 @@ deliver_or_requeue_n(MsgsWithAcks, State) -> Funs = { fun deliver_or_requeue_msgs_pred/2, fun deliver_or_requeue_msgs_deliver/3 }, {{_RemainingLengthMinusOne, AutoAcks, OutstandingMsgs}, NewState} = - deliver_queue(Funs, {length(MsgsWithAcks) - 1, [], MsgsWithAcks}, - State), + deliver_msgs_to_consumers( + Funs, {length(MsgsWithAcks), [], MsgsWithAcks}, State), {ok, MS} = rabbit_mixed_queue:ack(AutoAcks, NewState #q.mixed_state), case OutstandingMsgs of @@ -341,7 +342,7 @@ deliver_or_requeue_n(MsgsWithAcks, State) -> end. deliver_or_requeue_msgs_pred({Len, _AcksAcc, _MsgsWithAcks}, _State) -> - -1 < Len. + 0 < Len. deliver_or_requeue_msgs_deliver( false, {Len, AcksAcc, [(MsgAckTag = {Msg, _}) | MsgsWithAcks]}, State) -> {{Msg, true, noack}, {Len - 1, [MsgAckTag | AcksAcc], MsgsWithAcks}, State}; @@ -612,11 +613,11 @@ handle_call({basic_get, ChPid, NoAck}, _From, next_msg_id = NextId, mixed_state = MS }) -> - case rabbit_mixed_queue:deliver(MS) of + case rabbit_mixed_queue:fetch(MS) of {empty, MS1} -> reply(empty, State #q { mixed_state = MS1 }); {{Msg, IsDelivered, AckTag, Remaining}, MS1} -> AckRequired = not(NoAck), - {ok, MS3} = + {ok, MS2} = case AckRequired of true -> C = #cr{unacked_messages = UAM} = ch_record(ChPid), @@ -628,9 +629,7 @@ handle_call({basic_get, ChPid, NoAck}, _From, end, Message = {QName, self(), NextId, IsDelivered, Msg}, reply({ok, Remaining, Message}, - State #q { next_msg_id = NextId + 1, - mixed_state = MS3 - }) + State #q { next_msg_id = NextId + 1, mixed_state = MS2 }) end; handle_call({basic_consume, NoAck, ReaderPid, ChPid, LimiterPid, @@ -769,9 +768,9 @@ handle_cast({ack, Txn, MsgIds, ChPid}, State) -> not_found -> noreply(State); C = #cr{unacked_messages = UAM} -> - {MsgWithAcks, Remaining} = collect_messages(MsgIds, UAM), case Txn of none -> + {MsgWithAcks, Remaining} = collect_messages(MsgIds, UAM), {ok, MS} = rabbit_mixed_queue:ack(MsgWithAcks, State #q.mixed_state), store_ch_record(C#cr{unacked_messages = Remaining}), @@ -829,16 +828,13 @@ handle_cast({set_mode, Mode}, State = #q { mixed_state = MS }) -> PendingMessages = lists:flatten([Pending || #tx { pending_messages = Pending} <- all_tx_record()]), - {ok, MS1} = (case Mode of - disk -> fun rabbit_mixed_queue:to_disk_only_mode/2; - mixed -> fun rabbit_mixed_queue:to_mixed_mode/2 - end)(PendingMessages, MS), - noreply(State #q { mixed_state = MS1 }); - -handle_cast(report_memory, State) -> - %% deliberately don't call noreply/2 as we don't want to restart the timer - %% by unsetting the timer, we force a report on the next normal message - {noreply, State #q { memory_report_timer = undefined }, hibernate}. + {ok, MS1} = rabbit_mixed_queue:set_mode(Mode, PendingMessages, MS), + noreply(State #q { mixed_state = MS1 }). + +handle_info(report_memory, State) -> + %% deliberately don't call noreply/2 as we don't want to restart the timer. + %% By unsetting the timer, we force a report on the next normal message + {noreply, State #q { memory_report_timer = undefined }, hibernate}; handle_info({'DOWN', MonitorRef, process, DownPid, _Reason}, State = #q{owner = {DownPid, MonitorRef}}) -> diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl index 5940f5ad04..e2f341ffd4 100644 --- a/src/rabbit_disk_queue.erl +++ b/src/rabbit_disk_queue.erl @@ -39,7 +39,7 @@ terminate/2, code_change/3]). -export([handle_pre_hibernate/1]). --export([publish/3, deliver/1, phantom_deliver/1, ack/2, +-export([publish/3, fetch/1, phantom_fetch/1, ack/2, tx_publish/1, tx_commit/3, tx_cancel/1, requeue/2, purge/1, delete_queue/1, delete_non_durable_queues/1, auto_ack_next_message/1, @@ -48,27 +48,27 @@ -export([filesync/0, cache_info/0]). --export([stop/0, stop_and_obliterate/0, report_memory/0, - set_mode/1, to_disk_only_mode/0, to_ram_disk_mode/0]). +-export([stop/0, stop_and_obliterate/0, set_mode/1, to_disk_only_mode/0, + to_ram_disk_mode/0]). -include("rabbit.hrl"). --define(WRITE_OK_SIZE_BITS, 8). --define(WRITE_OK_TRANSIENT, 255). --define(WRITE_OK_PERSISTENT, 254). --define(INTEGER_SIZE_BYTES, 8). --define(INTEGER_SIZE_BITS, (8 * ?INTEGER_SIZE_BYTES)). --define(MSG_LOC_NAME, rabbit_disk_queue_msg_location). --define(FILE_SUMMARY_ETS_NAME, rabbit_disk_queue_file_summary). --define(SEQUENCE_ETS_NAME, rabbit_disk_queue_sequences). --define(CACHE_ETS_NAME, rabbit_disk_queue_cache). --define(FILE_EXTENSION, ".rdq"). --define(FILE_EXTENSION_TMP, ".rdt"). --define(FILE_EXTENSION_DETS, ".dets"). --define(FILE_PACKING_ADJUSTMENT, (1 + (2* (?INTEGER_SIZE_BYTES)))). --define(MEMORY_REPORT_TIME_INTERVAL, 10000). %% 10 seconds in milliseconds --define(BATCH_SIZE, 10000). --define(CACHE_MAX_SIZE, 10485760). +-define(WRITE_OK_SIZE_BITS, 8). +-define(WRITE_OK_TRANSIENT, 255). +-define(WRITE_OK_PERSISTENT, 254). +-define(INTEGER_SIZE_BYTES, 8). +-define(INTEGER_SIZE_BITS, (8 * ?INTEGER_SIZE_BYTES)). +-define(MSG_LOC_NAME, rabbit_disk_queue_msg_location). +-define(FILE_SUMMARY_ETS_NAME, rabbit_disk_queue_file_summary). +-define(SEQUENCE_ETS_NAME, rabbit_disk_queue_sequences). +-define(CACHE_ETS_NAME, rabbit_disk_queue_cache). +-define(FILE_EXTENSION, ".rdq"). +-define(FILE_EXTENSION_TMP, ".rdt"). +-define(FILE_EXTENSION_DETS, ".dets"). +-define(FILE_PACKING_ADJUSTMENT, (1 + (2* (?INTEGER_SIZE_BYTES)))). +-define(MINIMUM_MEMORY_REPORT_TIME_INTERVAL, 10000). %% 10 seconds in millisecs +-define(BATCH_SIZE, 10000). +-define(CACHE_MAX_SIZE, 10485760). -define(SERVER, ?MODULE). @@ -94,11 +94,11 @@ file_size_limit, %% how big can our files get? read_file_handles, %% file handles for reading (LRU) read_file_handles_limit, %% how many file handles can we open? - on_sync_txns, %% list of commiters to run on sync (reversed) + on_sync_txns, %% list of commiters to run on sync (reversed) commit_timer_ref, %% TRef for our interval timer last_sync_offset, %% current_offset at the last time we sync'd message_cache, %% ets message cache - memory_report_timer, %% TRef for the memory report timer + memory_report_timer_ref, %% TRef for the memory report timer wordsize, %% bytes in a word on this platform mnesia_bytes_per_record, %% bytes per record in mnesia in ram_disk mode ets_bytes_per_record %% bytes per record in msg_location_ets @@ -253,10 +253,10 @@ -spec(start_link/0 :: () -> ({'ok', pid()} | 'ignore' | {'error', any()})). -spec(publish/3 :: (queue_name(), message(), bool()) -> 'ok'). --spec(deliver/1 :: (queue_name()) -> +-spec(fetch/1 :: (queue_name()) -> ('empty' | {message(), non_neg_integer(), bool(), {msg_id(), seq_id()}, non_neg_integer()})). --spec(phantom_deliver/1 :: (queue_name()) -> +-spec(phantom_fetch/1 :: (queue_name()) -> ( 'empty' | {msg_id(), bool(), bool(), {msg_id(), seq_id()}, non_neg_integer()})). -spec(prefetch/1 :: (queue_name()) -> 'ok'). @@ -281,7 +281,6 @@ -spec(to_ram_disk_mode/0 :: () -> 'ok'). -spec(filesync/0 :: () -> 'ok'). -spec(cache_info/0 :: () -> [{atom(), term()}]). --spec(report_memory/0 :: () -> 'ok'). -spec(set_mode/1 :: ('disk' | 'mixed') -> 'ok'). -endif. @@ -295,11 +294,11 @@ start_link() -> publish(Q, Message = #basic_message {}, IsDelivered) -> gen_server2:cast(?SERVER, {publish, Q, Message, IsDelivered}). -deliver(Q) -> - gen_server2:call(?SERVER, {deliver, Q}, infinity). +fetch(Q) -> + gen_server2:call(?SERVER, {fetch, Q}, infinity). -phantom_deliver(Q) -> - gen_server2:call(?SERVER, {phantom_deliver, Q}, infinity). +phantom_fetch(Q) -> + gen_server2:call(?SERVER, {phantom_fetch, Q}, infinity). prefetch(Q) -> gen_server2:pcast(?SERVER, -1, {prefetch, Q, self()}). @@ -360,9 +359,6 @@ filesync() -> cache_info() -> gen_server2:call(?SERVER, cache_info, infinity). -report_memory() -> - gen_server2:cast(?SERVER, report_memory). - set_mode(Mode) -> gen_server2:pcast(?SERVER, 10, {set_mode, Mode}). @@ -406,8 +402,6 @@ init([FileSizeLimit, ReadFileHandlesLimit]) -> %% seems to blow up if it is set private MsgLocationEts = ets:new(?MSG_LOC_NAME, [set, protected]), - TRef = start_memory_timer(), - InitName = "0" ++ ?FILE_EXTENSION, State = #dqstate { msg_location_dets = MsgLocationDets, @@ -430,7 +424,7 @@ init([FileSizeLimit, ReadFileHandlesLimit]) -> last_sync_offset = 0, message_cache = ets:new(?CACHE_ETS_NAME, [set, private]), - memory_report_timer = TRef, + memory_report_timer_ref = undefined, wordsize = erlang:system_info(wordsize), mnesia_bytes_per_record = undefined, ets_bytes_per_record = undefined @@ -457,14 +451,14 @@ init([FileSizeLimit, ReadFileHandlesLimit]) -> %% ets_bytes_per_record otherwise. ok = rabbit_queue_mode_manager:report_memory(self(), 0, false), ok = report_memory(false, State2), - {ok, State2, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, - ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. + {ok, start_memory_timer(State2), hibernate, + {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. -handle_call({deliver, Q}, _From, State) -> - {ok, Result, State1} = internal_deliver(Q, true, false, true, State), +handle_call({fetch, Q}, _From, State) -> + {ok, Result, State1} = internal_fetch(Q, true, false, true, State), reply(Result, State1); -handle_call({phantom_deliver, Q}, _From, State) -> - {ok, Result, State1} = internal_deliver(Q, false, false, true, State), +handle_call({phantom_fetch, Q}, _From, State) -> + {ok, Result, State1} = internal_fetch(Q, false, false, true, State), reply(Result, State1); handle_call({tx_commit, Q, PubMsgIds, AckSeqIds}, From, State) -> State1 = @@ -534,13 +528,8 @@ handle_cast({set_mode, Mode}, State) -> disk -> fun to_disk_only_mode/1; mixed -> fun to_ram_disk_mode/1 end)(State)); -handle_cast(report_memory, State) -> - %% call noreply1/2, not noreply/1/2, as we don't want to restart the - %% memory_report_timer - %% by unsetting the timer, we force a report on the next normal message - noreply1(State #dqstate { memory_report_timer = undefined }); handle_cast({prefetch, Q, From}, State) -> - {ok, Result, State1} = internal_deliver(Q, true, true, false, State), + {ok, Result, State1} = internal_fetch(Q, true, true, false, State), Cont = rabbit_misc:with_exit_handler( fun () -> false end, fun () -> @@ -550,7 +539,7 @@ handle_cast({prefetch, Q, From}, State) -> State3 = case Cont of true -> - case internal_deliver(Q, false, false, true, State1) of + case internal_fetch(Q, false, false, true, State1) of {ok, empty, State2} -> State2; {ok, {_MsgId, _IsPersistent, _Delivered, _MsgSeqId, _Rem}, State2} -> State2 @@ -559,6 +548,11 @@ handle_cast({prefetch, Q, From}, State) -> end, noreply(State3). +handle_info(report_memory, State) -> + %% call noreply1/2, not noreply/1/2, as we don't want to restart the + %% memory_report_timer_ref. + %% By unsetting the timer, we force a report on the next normal message + noreply1(State #dqstate { memory_report_timer_ref = undefined }); handle_info({'EXIT', _Pid, Reason}, State) -> {stop, Reason, State}; handle_info(timeout, State) -> @@ -595,7 +589,7 @@ shutdown(State = #dqstate { msg_location_dets = MsgLocationDets, State1 #dqstate { current_file_handle = undefined, current_dirty = false, read_file_handles = {dict:new(), gb_trees:empty()}, - memory_report_timer = undefined + memory_report_timer_ref = undefined }. code_change(_OldVsn, State, _Extra) -> @@ -603,20 +597,17 @@ code_change(_OldVsn, State, _Extra) -> %% ---- UTILITY FUNCTIONS ---- -stop_memory_timer(State = #dqstate { memory_report_timer = undefined }) -> +stop_memory_timer(State = #dqstate { memory_report_timer_ref = undefined }) -> State; -stop_memory_timer(State = #dqstate { memory_report_timer = TRef }) -> +stop_memory_timer(State = #dqstate { memory_report_timer_ref = TRef }) -> {ok, cancel} = timer:cancel(TRef), - State #dqstate { memory_report_timer = undefined }. - -start_memory_timer() -> - {ok, TRef} = timer:apply_after(?MEMORY_REPORT_TIME_INTERVAL, - rabbit_disk_queue, report_memory, []), - TRef. + State #dqstate { memory_report_timer_ref = undefined }. -start_memory_timer(State = #dqstate { memory_report_timer = undefined }) -> +start_memory_timer(State = #dqstate { memory_report_timer_ref = undefined }) -> ok = report_memory(false, State), - State #dqstate { memory_report_timer = start_memory_timer() }; + {ok, TRef} = timer:send_after(?MINIMUM_MEMORY_REPORT_TIME_INTERVAL, + report_memory), + State #dqstate { memory_report_timer_ref = TRef }; start_memory_timer(State) -> State. @@ -893,7 +884,7 @@ cache_is_full(#dqstate { message_cache = Cache }) -> %% ---- INTERNAL RAW FUNCTIONS ---- -internal_deliver(Q, ReadMsg, FakeDeliver, Advance, +internal_fetch(Q, ReadMsg, FakeDeliver, Advance, State = #dqstate { sequences = Sequences }) -> case sequence_lookup(Sequences, Q) of {SeqId, SeqId} -> {ok, empty, State}; @@ -971,7 +962,7 @@ internal_read_message(Q, ReadSeqId, ReadMsg, FakeDeliver, ForceInCache, State) - end. internal_auto_ack(Q, State) -> - case internal_deliver(Q, false, false, true, State) of + case internal_fetch(Q, false, false, true, State) of {ok, empty, State1} -> {ok, State1}; {ok, {_MsgId, _IsPersistent, _Delivered, MsgSeqId, _Remaining}, State1} -> diff --git a/src/rabbit_mixed_queue.erl b/src/rabbit_mixed_queue.erl index 3d989662db..2b25ab0fac 100644 --- a/src/rabbit_mixed_queue.erl +++ b/src/rabbit_mixed_queue.erl @@ -35,11 +35,11 @@ -export([init/2]). --export([publish/2, publish_delivered/2, deliver/1, ack/2, +-export([publish/2, publish_delivered/2, fetch/1, ack/2, tx_publish/2, tx_commit/3, tx_cancel/2, requeue/2, purge/1, length/1, is_empty/1, delete_queue/1, maybe_prefetch/1]). --export([to_disk_only_mode/2, to_mixed_mode/2, info/1, +-export([set_mode/3, info/1, estimate_queue_memory_and_reset_counters/1]). -record(mqstate, { mode, @@ -76,7 +76,7 @@ -spec(publish/2 :: (message(), mqstate()) -> okmqs()). -spec(publish_delivered/2 :: (message(), mqstate()) -> {'ok', acktag(), mqstate()}). --spec(deliver/1 :: (mqstate()) -> +-spec(fetch/1 :: (mqstate()) -> {('empty' | {message(), boolean(), acktag(), non_neg_integer()}), mqstate()}). -spec(ack/2 :: ([{message(), acktag()}], mqstate()) -> okmqs()). @@ -91,8 +91,7 @@ -spec(length/1 :: (mqstate()) -> non_neg_integer()). -spec(is_empty/1 :: (mqstate()) -> boolean()). --spec(to_disk_only_mode/2 :: ([message()], mqstate()) -> okmqs()). --spec(to_mixed_mode/2 :: ([message()], mqstate()) -> okmqs()). +-spec(set_mode/3 :: (mode(), [message()], mqstate()) -> okmqs()). -spec(estimate_queue_memory_and_reset_counters/1 :: (mqstate()) -> {mqstate(), non_neg_integer(), non_neg_integer(), @@ -120,8 +119,13 @@ size_of_message( SumAcc + size(Frag) end, 0, Payload). -to_disk_only_mode(_TxnMessages, State = #mqstate { mode = disk }) -> +set_mode(Mode, _TxnMessages, State = #mqstate { mode = Mode }) -> {ok, State}; +set_mode(disk, TxnMessages, State) -> + to_disk_only_mode(TxnMessages, State); +set_mode(mixed, TxnMessages, State) -> + to_mixed_mode(TxnMessages, State). + to_disk_only_mode(TxnMessages, State = #mqstate { mode = mixed, queue = Q, msg_buf = MsgBuf, is_durable = IsDurable, prefetcher = Prefetcher @@ -219,8 +223,6 @@ flush_requeue_to_disk_queue(Q, RequeueCount, Commit) -> ok = rabbit_disk_queue:requeue_next_n(Q, RequeueCount), []. -to_mixed_mode(_TxnMessages, State = #mqstate { mode = mixed }) -> - {ok, State}; to_mixed_mode(TxnMessages, State = #mqstate { mode = disk, queue = Q, is_durable = IsDurable }) -> rabbit_log:info("Converting queue to mixed mode: ~p~n", [Q]), @@ -248,6 +250,16 @@ to_mixed_mode(TxnMessages, State = #mqstate { mode = disk, queue = Q, garbage_collect(), {ok, State #mqstate { mode = mixed }}. +gain_memory(Inc, State = #mqstate { memory_size = QSize, + memory_gain = Gain }) -> + State #mqstate { memory_size = QSize + Inc, + memory_gain = Gain + Inc }. + +lose_memory(Dec, State = #mqstate { memory_size = QSize, + memory_loss = Loss }) -> + State #mqstate { memory_size = QSize - Dec, + memory_loss = Loss + Dec }. + inc_queue_length(_Q, MsgBuf, 0) -> MsgBuf; inc_queue_length(Q, MsgBuf, Count) -> @@ -264,7 +276,7 @@ dec_queue_length(Count, State = #mqstate { queue = Q, msg_buf = MsgBuf }) -> {{value, {Q, Len}}, MsgBuf1} -> case Len of Count -> - maybe_prefetch(State #mqstate { msg_buf = MsgBuf1 }); + State #mqstate { msg_buf = MsgBuf1 }; _ when Len > Count -> State #mqstate { msg_buf = queue:in_r({Q, Len-Count}, MsgBuf1)} @@ -286,26 +298,23 @@ maybe_prefetch(State) -> State. publish(Msg, State = #mqstate { mode = disk, queue = Q, length = Length, - msg_buf = MsgBuf, memory_size = QSize, - memory_gain = Gain }) -> + msg_buf = MsgBuf }) -> MsgBuf1 = inc_queue_length(Q, MsgBuf, 1), ok = rabbit_disk_queue:publish(Q, Msg, false), MsgSize = size_of_message(Msg), - {ok, State #mqstate { memory_gain = Gain + MsgSize, - memory_size = QSize + MsgSize, - msg_buf = MsgBuf1, length = Length + 1 }}; + {ok, gain_memory(MsgSize, State #mqstate { msg_buf = MsgBuf1, + length = Length + 1 })}; publish(Msg = #basic_message { is_persistent = IsPersistent }, State = #mqstate { queue = Q, mode = mixed, is_durable = IsDurable, - msg_buf = MsgBuf, length = Length, memory_size = QSize, - memory_gain = Gain }) -> + msg_buf = MsgBuf, length = Length }) -> ok = case IsDurable andalso IsPersistent of true -> rabbit_disk_queue:publish(Q, Msg, false); false -> ok end, MsgSize = size_of_message(Msg), - {ok, State #mqstate { msg_buf = queue:in({Msg, false}, MsgBuf), - length = Length + 1, memory_size = QSize + MsgSize, - memory_gain = Gain + MsgSize }}. + {ok, gain_memory(MsgSize, + State #mqstate { msg_buf = queue:in({Msg, false}, MsgBuf), + length = Length + 1 })}. %% Assumption here is that the queue is empty already (only called via %% attempt_immediate_delivery). @@ -313,20 +322,18 @@ publish_delivered(Msg = #basic_message { guid = MsgId, is_persistent = IsPersistent}, State = #mqstate { mode = Mode, is_durable = IsDurable, - queue = Q, length = 0, - memory_size = QSize, memory_gain = Gain }) + queue = Q, length = 0 }) when Mode =:= disk orelse (IsDurable andalso IsPersistent) -> ok = rabbit_disk_queue:publish(Q, Msg, true), MsgSize = size_of_message(Msg), - State1 = State #mqstate { memory_size = QSize + MsgSize, - memory_gain = Gain + MsgSize }, + State1 = gain_memory(MsgSize, State), case IsDurable andalso IsPersistent of true -> %% must call phantom_deliver otherwise the msg remains at %% the head of the queue. This is synchronous, but %% unavoidable as we need the AckTag {MsgId, IsPersistent, true, AckTag, 0} = - rabbit_disk_queue:phantom_deliver(Q), + rabbit_disk_queue:phantom_fetch(Q), {ok, AckTag, State1}; false -> %% in this case, we don't actually care about the ack, so @@ -334,18 +341,15 @@ publish_delivered(Msg = ok = rabbit_disk_queue:auto_ack_next_message(Q), {ok, noack, State1} end; -publish_delivered(Msg, State = - #mqstate { mode = mixed, length = 0, memory_size = QSize, - memory_gain = Gain }) -> +publish_delivered(Msg, State = #mqstate { mode = mixed, length = 0 }) -> MsgSize = size_of_message(Msg), - {ok, noack, State #mqstate { memory_size = QSize + MsgSize, - memory_gain = Gain + MsgSize }}. + {ok, noack, gain_memory(MsgSize, State)}. -deliver(State = #mqstate { length = 0 }) -> +fetch(State = #mqstate { length = 0 }) -> {empty, State}; -deliver(State = #mqstate { msg_buf = MsgBuf, queue = Q, - is_durable = IsDurable, length = Length, - prefetcher = Prefetcher }) -> +fetch(State = #mqstate { msg_buf = MsgBuf, queue = Q, + is_durable = IsDurable, length = Length, + prefetcher = Prefetcher }) -> {{value, Value}, MsgBuf1} = queue:out(MsgBuf), Rem = Length - 1, State1 = State #mqstate { length = Rem }, @@ -356,13 +360,13 @@ deliver(State = #mqstate { msg_buf = MsgBuf, queue = Q, case IsDurable andalso IsPersistent of true -> {MsgId, IsPersistent, IsDelivered, AckTag1, _PRem} - = rabbit_disk_queue:phantom_deliver(Q), + = rabbit_disk_queue:phantom_fetch(Q), AckTag1; false -> noack end, - State2 = maybe_prefetch(State1 #mqstate { msg_buf = MsgBuf1 }), - {{Msg, IsDelivered, AckTag, Rem}, State2}; + {{Msg, IsDelivered, AckTag, Rem}, + State1 #mqstate { msg_buf = MsgBuf1 }}; {Msg = #basic_message { is_persistent = IsPersistent }, IsDelivered, AckTag} -> %% message has come via the prefetcher, thus it's been @@ -375,21 +379,21 @@ deliver(State = #mqstate { msg_buf = MsgBuf, queue = Q, State2 = dec_queue_length(1, State1), {Msg = #basic_message { is_persistent = IsPersistent }, _Size, IsDelivered, AckTag, _PersistRem} - = rabbit_disk_queue:deliver(Q), + = rabbit_disk_queue:fetch(Q), AckTag1 = maybe_ack(Q, IsDurable, IsPersistent, AckTag), {{Msg, IsDelivered, AckTag1, Rem}, State2}; _ -> case rabbit_queue_prefetcher:drain(Prefetcher) of - empty -> deliver(State #mqstate { prefetcher = undefined }); + empty -> fetch(State #mqstate { prefetcher = undefined }); {Fetched, Len, Status} -> State2 = #mqstate { msg_buf = MsgBuf2 } = dec_queue_length(Len, State), - deliver(State2 #mqstate - { msg_buf = queue:join(Fetched, MsgBuf2), - prefetcher = case Status of - finished -> undefined; - continuing -> Prefetcher - end }) + fetch(State2 #mqstate + { msg_buf = queue:join(Fetched, MsgBuf2), + prefetcher = case Status of + finished -> undefined; + continuing -> Prefetcher + end }) end end. @@ -407,38 +411,30 @@ remove_noacks(MsgsWithAcks) -> {[AckTag | AccAckTags], size_of_message(Msg) + AccSize} end, {[], 0}, MsgsWithAcks). -ack(MsgsWithAcks, State = #mqstate { queue = Q, memory_size = QSize, - memory_loss = Loss }) -> +ack(MsgsWithAcks, State = #mqstate { queue = Q }) -> {AckTags, ASize} = remove_noacks(MsgsWithAcks), ok = case AckTags of [] -> ok; _ -> rabbit_disk_queue:ack(Q, AckTags) end, - State1 = State #mqstate { memory_size = QSize - ASize, - memory_loss = Loss + ASize }, - {ok, State1}. + {ok, lose_memory(ASize, State)}. tx_publish(Msg = #basic_message { is_persistent = IsPersistent }, - State = #mqstate { mode = Mode, memory_size = QSize, - is_durable = IsDurable, memory_gain = Gain }) + State = #mqstate { mode = Mode, is_durable = IsDurable }) when Mode =:= disk orelse (IsDurable andalso IsPersistent) -> ok = rabbit_disk_queue:tx_publish(Msg), MsgSize = size_of_message(Msg), - {ok, State #mqstate { memory_size = QSize + MsgSize, - memory_gain = Gain + MsgSize }}; -tx_publish(Msg, State = #mqstate { mode = mixed, memory_size = QSize, - memory_gain = Gain }) -> + {ok, gain_memory(MsgSize, State)}; +tx_publish(Msg, State = #mqstate { mode = mixed }) -> %% this message will reappear in the tx_commit, so ignore for now MsgSize = size_of_message(Msg), - {ok, State #mqstate { memory_size = QSize + MsgSize, - memory_gain = Gain + MsgSize }}. + {ok, gain_memory(MsgSize, State)}. only_msg_ids(Pubs) -> lists:map(fun (Msg) -> {Msg #basic_message.guid, false} end, Pubs). tx_commit(Publishes, MsgsWithAcks, State = #mqstate { mode = disk, queue = Q, length = Length, - memory_size = QSize, memory_loss = Loss, msg_buf = MsgBuf }) -> {RealAcks, ASize} = remove_noacks(MsgsWithAcks), ok = if ([] == Publishes) andalso ([] == RealAcks) -> ok; @@ -446,14 +442,12 @@ tx_commit(Publishes, MsgsWithAcks, RealAcks) end, Len = erlang:length(Publishes), - {ok, State #mqstate { length = Length + Len, - msg_buf = inc_queue_length(Q, MsgBuf, Len), - memory_size = QSize - ASize, - memory_loss = Loss + ASize }}; + {ok, lose_memory(ASize, State #mqstate + { length = Length + Len, + msg_buf = inc_queue_length(Q, MsgBuf, Len) })}; tx_commit(Publishes, MsgsWithAcks, State = #mqstate { mode = mixed, queue = Q, msg_buf = MsgBuf, - is_durable = IsDurable, length = Length, - memory_size = QSize, memory_loss = Loss }) -> + is_durable = IsDurable, length = Length }) -> {PersistentPubs, MsgBuf1} = lists:foldl(fun (Msg = #basic_message { is_persistent = IsPersistent }, {Acc, MsgBuf2}) -> @@ -471,23 +465,20 @@ tx_commit(Publishes, MsgsWithAcks, false -> rabbit_disk_queue:tx_commit( Q, lists:reverse(PersistentPubs), RealAcks) end, - {ok, State #mqstate { msg_buf = MsgBuf1, memory_size = QSize - ASize, - length = Length + erlang:length(Publishes), - memory_loss = Loss + ASize }}. + {ok, lose_memory(ASize, State #mqstate + { msg_buf = MsgBuf1, + length = Length + erlang:length(Publishes) })}. -tx_cancel(Publishes, State = #mqstate { mode = disk, memory_size = QSize, - memory_loss = Loss }) -> +tx_cancel(Publishes, State = #mqstate { mode = disk }) -> {MsgIds, CSize} = lists:foldl( fun (Msg = #basic_message { guid = MsgId }, {MsgIdsAcc, CSizeAcc}) -> {[MsgId | MsgIdsAcc], CSizeAcc + size_of_message(Msg)} end, {[], 0}, Publishes), ok = rabbit_disk_queue:tx_cancel(MsgIds), - {ok, State #mqstate { memory_size = QSize - CSize, - memory_loss = Loss + CSize }}; -tx_cancel(Publishes, State = #mqstate { mode = mixed, is_durable = IsDurable, - memory_size = QSize, - memory_loss = Loss }) -> + {ok, lose_memory(CSize, State)}; +tx_cancel(Publishes, + State = #mqstate { mode = mixed, is_durable = IsDurable }) -> {PersistentPubs, CSize} = lists:foldl( fun (Msg = #basic_message { is_persistent = IsPersistent, @@ -503,8 +494,7 @@ tx_cancel(Publishes, State = #mqstate { mode = mixed, is_durable = IsDurable, rabbit_disk_queue:tx_cancel(PersistentPubs); true -> ok end, - {ok, State #mqstate { memory_size = QSize - CSize, - memory_loss = Loss + CSize }}. + {ok, lose_memory(CSize, State)}. %% [{Msg, AckTag}] requeue(MessagesWithAckTags, State = #mqstate { mode = disk, queue = Q, @@ -555,32 +545,30 @@ requeue(MessagesWithAckTags, State = #mqstate { mode = mixed, queue = Q, length = Length + erlang:length(MessagesWithAckTags)}}. purge(State = #mqstate { queue = Q, mode = disk, length = Count, - memory_loss = Loss, memory_size = QSize }) -> + memory_size = QSize }) -> Count = rabbit_disk_queue:purge(Q), - {Count, State #mqstate { length = 0, memory_size = 0, - memory_loss = Loss + QSize }}; + {Count, lose_memory(QSize, State)}; purge(State = #mqstate { queue = Q, mode = mixed, length = Length, - memory_loss = Loss, memory_size = QSize, - prefetcher = Prefetcher }) -> + memory_size = QSize, prefetcher = Prefetcher }) -> case Prefetcher of undefined -> ok; _ -> rabbit_queue_prefetcher:drain_and_stop(Prefetcher) end, rabbit_disk_queue:purge(Q), - {Length, - State #mqstate { msg_buf = queue:new(), length = 0, memory_size = 0, - memory_loss = Loss + QSize, prefetcher = undefined }}. + {Length, lose_memory(QSize, State #mqstate { msg_buf = queue:new(), + length = 0, + prefetcher = undefined })}. delete_queue(State = #mqstate { queue = Q, memory_size = QSize, - memory_loss = Loss, prefetcher = Prefetcher + prefetcher = Prefetcher }) -> case Prefetcher of undefined -> ok; _ -> rabbit_queue_prefetcher:drain_and_stop(Prefetcher) end, ok = rabbit_disk_queue:delete_queue(Q), - {ok, State #mqstate { length = 0, memory_size = 0, msg_buf = queue:new(), - memory_loss = Loss + QSize, prefetcher = undefined }}. + {ok, lose_memory(QSize, State #mqstate { length = 0, msg_buf = queue:new(), + prefetcher = undefined })}. length(#mqstate { length = Length }) -> Length. diff --git a/src/rabbit_queue_prefetcher.erl b/src/rabbit_queue_prefetcher.erl index c847848de8..ad6b1ce2d4 100644 --- a/src/rabbit_queue_prefetcher.erl +++ b/src/rabbit_queue_prefetcher.erl @@ -94,7 +94,7 @@ %% to its internal queue. A cast is not sufficient here because the %% mixed_queue could come along, drain the prefetcher, thus %% catching the msg just sent by the disk_queue and then call -%% disk_queue:deliver(Q) which is normal priority call, which could +%% disk_queue:fetch(Q) which is normal priority call, which could %% overtake a reply cast from the prefetcher to the disk queue, %% which would result in the same message being delivered %% twice. Thus when the disk_queue calls prefetcher:publish(Msg), @@ -146,7 +146,7 @@ %% mixed_queue tries to drain the prefetcher. We must therefore ensure %% that this msg can't also be delivered to the mixed_queue directly %% by the disk_queue through the mixed_queue calling -%% disk_queue:deliver(Q) which is why the prefetcher:publish function +%% disk_queue:fetch(Q) which is why the prefetcher:publish function %% is a call and not a cast, thus blocking the disk_queue. %% %% Finally, the prefetcher is only created when the mixed_queue is diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index ffd675a0b7..ad5a248314 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -157,7 +157,7 @@ test_simple_n_element_queue(N) -> passed. test_unfold() -> - {[], test} = rabbit_misc:unfold(fun (V) -> false end, test), + {[], test} = rabbit_misc:unfold(fun (_V) -> false end, test), List = lists:seq(2,20,2), {List, 0} = rabbit_misc:unfold(fun (0) -> false; (N) -> {true, N*2, N-1} @@ -848,7 +848,7 @@ rdq_time_tx_publish_commit_deliver_ack(Qs, MsgCount, MsgSizeBytes) -> [begin Remaining = MsgCount - N, {Message, _TSize, false, SeqId, - Remaining} = rabbit_disk_queue:deliver(Q), + Remaining} = rabbit_disk_queue:fetch(Q), ok = rdq_match_message(Message, N, Msg, MsgSizeBytes), SeqId end || N <- List], @@ -895,7 +895,7 @@ rdq_stress_gc(MsgCount) -> fun (MsgId, Acc) -> Remaining = MsgCount - MsgId, {Message, _TSize, false, SeqId, Remaining} = - rabbit_disk_queue:deliver(q), + rabbit_disk_queue:fetch(q), ok = rdq_match_message(Message, MsgId, Msg, MsgSizeBytes), dict:store(MsgId, SeqId, Acc) end, dict:new(), List), @@ -904,7 +904,7 @@ rdq_stress_gc(MsgCount) -> rabbit_disk_queue:ack(q, [SeqId]) end || MsgId <- AckList2], rabbit_disk_queue:tx_commit(q, [], []), - empty = rabbit_disk_queue:deliver(q), + empty = rabbit_disk_queue:fetch(q), rdq_stop(), passed. @@ -923,7 +923,7 @@ rdq_test_startup_with_queue_gaps() -> Seqs = [begin Remaining = Total - N, {Message, _TSize, false, SeqId, Remaining} = - rabbit_disk_queue:deliver(q), + rabbit_disk_queue:fetch(q), ok = rdq_match_message(Message, N, Msg, 256), SeqId end || N <- lists:seq(1,Half)], @@ -945,7 +945,7 @@ rdq_test_startup_with_queue_gaps() -> Seqs2 = [begin Remaining = round(Total - ((Half + N)/2)), {Message, _TSize, true, SeqId, Remaining} = - rabbit_disk_queue:deliver(q), + rabbit_disk_queue:fetch(q), ok = rdq_match_message(Message, N, Msg, 256), SeqId end || N <- lists:seq(2,Half,2)], @@ -955,13 +955,13 @@ rdq_test_startup_with_queue_gaps() -> Seqs3 = [begin Remaining = Total - N, {Message, _TSize, false, SeqId, Remaining} = - rabbit_disk_queue:deliver(q), + rabbit_disk_queue:fetch(q), ok = rdq_match_message(Message, N, Msg, 256), SeqId end || N <- lists:seq(1 + Half,Total)], rabbit_disk_queue:tx_commit(q, [], Seqs3), io:format("Read second half done~n", []), - empty = rabbit_disk_queue:deliver(q), + empty = rabbit_disk_queue:fetch(q), rdq_stop(), passed. @@ -980,7 +980,7 @@ rdq_test_redeliver() -> Seqs = [begin Remaining = Total - N, {Message, _TSize, false, SeqId, Remaining} = - rabbit_disk_queue:deliver(q), + rabbit_disk_queue:fetch(q), ok = rdq_match_message(Message, N, Msg, 256), SeqId end || N <- lists:seq(1,Half)], @@ -1001,7 +1001,7 @@ rdq_test_redeliver() -> Seqs2 = [begin Remaining = round(Total - N + (Half/2)), {Message, _TSize, false, SeqId, Remaining} = - rabbit_disk_queue:deliver(q), + rabbit_disk_queue:fetch(q), ok = rdq_match_message(Message, N, Msg, 256), SeqId end || N <- lists:seq(1+Half, Total)], @@ -1009,12 +1009,12 @@ rdq_test_redeliver() -> Seqs3 = [begin Remaining = round((Half - N) / 2) - 1, {Message, _TSize, true, SeqId, Remaining} = - rabbit_disk_queue:deliver(q), + rabbit_disk_queue:fetch(q), ok = rdq_match_message(Message, N, Msg, 256), SeqId end || N <- lists:seq(1, Half, 2)], rabbit_disk_queue:tx_commit(q, [], Seqs3), - empty = rabbit_disk_queue:deliver(q), + empty = rabbit_disk_queue:fetch(q), rdq_stop(), passed. @@ -1033,7 +1033,7 @@ rdq_test_purge() -> Seqs = [begin Remaining = Total - N, {Message, _TSize, false, SeqId, Remaining} = - rabbit_disk_queue:deliver(q), + rabbit_disk_queue:fetch(q), ok = rdq_match_message(Message, N, Msg, 256), SeqId end || N <- lists:seq(1,Half)], @@ -1042,7 +1042,7 @@ rdq_test_purge() -> io:format("Purge done~n", []), rabbit_disk_queue:tx_commit(q, [], Seqs), io:format("Ack first half done~n", []), - empty = rabbit_disk_queue:deliver(q), + empty = rabbit_disk_queue:fetch(q), rdq_stop(), passed. @@ -1051,7 +1051,7 @@ rdq_new_mixed_queue(Q, Durable, Disk) -> {MS1, _, _, _} = rabbit_mixed_queue:estimate_queue_memory_and_reset_counters(MS), case Disk of - true -> {ok, MS2} = rabbit_mixed_queue:to_disk_only_mode([], MS1), + true -> {ok, MS2} = rabbit_mixed_queue:set_mode(disk, [], MS1), MS2; false -> MS1 end. @@ -1083,11 +1083,11 @@ rdq_test_mixed_queue_modes() -> 30 = rabbit_mixed_queue:length(MS6), io:format("Published a mixture of messages; ~w~n", [rabbit_mixed_queue:estimate_queue_memory_and_reset_counters(MS6)]), - {ok, MS7} = rabbit_mixed_queue:to_disk_only_mode([], MS6), + {ok, MS7} = rabbit_mixed_queue:set_mode(disk, [], MS6), 30 = rabbit_mixed_queue:length(MS7), io:format("Converted to disk only mode; ~w~n", [rabbit_mixed_queue:estimate_queue_memory_and_reset_counters(MS7)]), - {ok, MS8} = rabbit_mixed_queue:to_mixed_mode([], MS7), + {ok, MS8} = rabbit_mixed_queue:set_mode(mixed, [], MS7), 30 = rabbit_mixed_queue:length(MS8), io:format("Converted to mixed mode; ~w~n", [rabbit_mixed_queue:estimate_queue_memory_and_reset_counters(MS8)]), @@ -1097,12 +1097,12 @@ rdq_test_mixed_queue_modes() -> Rem = 30 - N, {{#basic_message { is_persistent = false }, false, _AckTag, Rem}, - MS9a} = rabbit_mixed_queue:deliver(MS9), + MS9a} = rabbit_mixed_queue:fetch(MS9), MS9a end, MS8, lists:seq(1,10)), 20 = rabbit_mixed_queue:length(MS10), io:format("Delivered initial non persistent messages~n"), - {ok, MS11} = rabbit_mixed_queue:to_disk_only_mode([], MS10), + {ok, MS11} = rabbit_mixed_queue:set_mode(disk, [], MS10), 20 = rabbit_mixed_queue:length(MS11), io:format("Converted to disk only mode~n"), rdq_stop(), @@ -1116,13 +1116,13 @@ rdq_test_mixed_queue_modes() -> Rem = 10 - N, {{Msg = #basic_message { is_persistent = true }, false, AckTag, Rem}, - MS13a} = rabbit_mixed_queue:deliver(MS13), + MS13a} = rabbit_mixed_queue:fetch(MS13), {MS13a, [{Msg, AckTag} | AcksAcc]} end, {MS12, []}, lists:seq(1,10)), 0 = rabbit_mixed_queue:length(MS14), {ok, MS15} = rabbit_mixed_queue:ack(AckTags, MS14), io:format("Delivered and acked all messages~n"), - {ok, MS16} = rabbit_mixed_queue:to_disk_only_mode([], MS15), + {ok, MS16} = rabbit_mixed_queue:set_mode(disk, [], MS15), 0 = rabbit_mixed_queue:length(MS16), io:format("Converted to disk only mode~n"), rdq_stop(), @@ -1149,28 +1149,28 @@ rdq_test_mode_conversion_mid_txn() -> rdq_start(), MS0 = rdq_new_mixed_queue(q, true, false), passed = rdq_tx_publish_mixed_alter_commit_get( - MS0, MsgsA, MsgsB, fun rabbit_mixed_queue:to_disk_only_mode/2, commit), + MS0, MsgsA, MsgsB, disk, commit), rdq_stop_virgin_start(), MS1 = rdq_new_mixed_queue(q, true, false), passed = rdq_tx_publish_mixed_alter_commit_get( - MS1, MsgsA, MsgsB, fun rabbit_mixed_queue:to_disk_only_mode/2, cancel), + MS1, MsgsA, MsgsB, disk, cancel), rdq_stop_virgin_start(), MS2 = rdq_new_mixed_queue(q, true, true), passed = rdq_tx_publish_mixed_alter_commit_get( - MS2, MsgsA, MsgsB, fun rabbit_mixed_queue:to_mixed_mode/2, commit), + MS2, MsgsA, MsgsB, mixed, commit), rdq_stop_virgin_start(), MS3 = rdq_new_mixed_queue(q, true, true), passed = rdq_tx_publish_mixed_alter_commit_get( - MS3, MsgsA, MsgsB, fun rabbit_mixed_queue:to_mixed_mode/2, cancel), + MS3, MsgsA, MsgsB, mixed, cancel), rdq_stop(), passed. -rdq_tx_publish_mixed_alter_commit_get(MS0, MsgsA, MsgsB, ChangeFun, CommitOrCancel) -> +rdq_tx_publish_mixed_alter_commit_get(MS0, MsgsA, MsgsB, Mode, CommitOrCancel) -> 0 = rabbit_mixed_queue:length(MS0), MS2 = lists:foldl( fun (Msg, MS1) -> @@ -1185,7 +1185,7 @@ rdq_tx_publish_mixed_alter_commit_get(MS0, MsgsA, MsgsB, ChangeFun, CommitOrCanc MS3a end, MS2, MsgsB), Len0 = rabbit_mixed_queue:length(MS4), - {ok, MS5} = ChangeFun(MsgsB, MS4), + {ok, MS5} = rabbit_mixed_queue:set_mode(Mode, MsgsB, MS4), Len0 = rabbit_mixed_queue:length(MS5), {ok, MS9} = case CommitOrCancel of @@ -1198,7 +1198,7 @@ rdq_tx_publish_mixed_alter_commit_get(MS0, MsgsA, MsgsB, ChangeFun, CommitOrCanc fun (Msg, {Acc, MS7}) -> Rem = Len1 - (Msg #basic_message.guid) - 1, {{Msg, false, AckTag, Rem}, MS7a} = - rabbit_mixed_queue:deliver(MS7), + rabbit_mixed_queue:fetch(MS7), {[{Msg, AckTag} | Acc], MS7a} end, {[], MS6}, MsgsA ++ MsgsB), 0 = rabbit_mixed_queue:length(MS8), @@ -1211,7 +1211,7 @@ rdq_tx_publish_mixed_alter_commit_get(MS0, MsgsA, MsgsB, ChangeFun, CommitOrCanc fun (Msg, {Acc, MS7}) -> Rem = Len0 - (Msg #basic_message.guid) - 1, {{Msg, false, AckTag, Rem}, MS7a} = - rabbit_mixed_queue:deliver(MS7), + rabbit_mixed_queue:fetch(MS7), {[{Msg, AckTag} | Acc], MS7a} end, {[], MS6}, MsgsA), 0 = rabbit_mixed_queue:length(MS8), @@ -1244,7 +1244,7 @@ rdq_test_disk_queue_modes() -> Seqs = [begin Remaining = Total - N, {Message, _TSize, false, SeqId, Remaining} = - rabbit_disk_queue:deliver(q), + rabbit_disk_queue:fetch(q), ok = rdq_match_message(Message, N, Msg, 256), SeqId end || N <- Half1], @@ -1254,7 +1254,7 @@ rdq_test_disk_queue_modes() -> Seqs2 = [begin Remaining = Total - N, {Message, _TSize, false, SeqId, Remaining} = - rabbit_disk_queue:deliver(q), + rabbit_disk_queue:fetch(q), ok = rdq_match_message(Message, N, Msg, 256), SeqId end || N <- Half2], @@ -1262,7 +1262,7 @@ rdq_test_disk_queue_modes() -> ok = rabbit_disk_queue:tx_commit(q, [], Seqs), ok = rabbit_disk_queue:to_disk_only_mode(), ok = rabbit_disk_queue:tx_commit(q, [], Seqs2), - empty = rabbit_disk_queue:deliver(q), + empty = rabbit_disk_queue:fetch(q), rdq_stop(), passed. |
