diff options
| author | Matthew Sackman <matthew@lshift.net> | 2009-06-17 12:52:35 +0100 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2009-06-17 12:52:35 +0100 |
| commit | 0f044b9ee0564b24668c1ef3995248bea6e19c51 (patch) | |
| tree | c8db24bec5ead094d65a8fbb6390b83cb4be490e | |
| parent | 1e1f45c489012726832f68faf2f0ea6a76623c6a (diff) | |
| download | rabbitmq-server-git-0f044b9ee0564b24668c1ef3995248bea6e19c51.tar.gz | |
fixed line lengths
| -rw-r--r-- | src/rabbit.erl | 6 | ||||
| -rw-r--r-- | src/rabbit_amqqueue.erl | 15 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 72 | ||||
| -rw-r--r-- | src/rabbit_disk_queue.erl | 71 | ||||
| -rw-r--r-- | src/rabbit_limiter.erl | 6 | ||||
| -rw-r--r-- | src/rabbit_misc.erl | 3 | ||||
| -rw-r--r-- | src/rabbit_mixed_queue.erl | 3 | ||||
| -rw-r--r-- | src/rabbit_mnesia.erl | 9 | ||||
| -rw-r--r-- | src/rabbit_queue_mode_manager.erl | 15 | ||||
| -rw-r--r-- | src/rabbit_tests.erl | 93 |
10 files changed, 177 insertions, 116 deletions
diff --git a/src/rabbit.erl b/src/rabbit.erl index 2eecac5ed3..fbadc5f2c5 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -158,9 +158,9 @@ start(normal, []) -> ok = rabbit_exchange:recover(), {ok, DurableQueues} = rabbit_amqqueue:recover(), DurableQueueNames = - sets:from_list(lists:map( - fun(Q) -> Q #amqqueue.name end, DurableQueues)), - ok = rabbit_disk_queue:delete_non_durable_queues(DurableQueueNames) + sets:from_list([ Q #amqqueue.name || Q <- DurableQueues ]), + ok = rabbit_disk_queue:delete_non_durable_queues( + DurableQueueNames) end}, {"guid generator", fun () -> diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 01d40aa1a8..a1f36f31cb 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -136,12 +136,15 @@ recover_durable_queues() -> %% another node has deleted the queue (and possibly %% re-created it). case rabbit_misc:execute_mnesia_transaction( - fun () -> case mnesia:match_object( - rabbit_durable_queue, RecoveredQ, read) of - [_] -> ok = store_queue(Q), - true; - [] -> false - end + fun () -> + Match = + mnesia:match_object( + rabbit_durable_queue, RecoveredQ, read), + case Match of + [_] -> ok = store_queue(Q), + true; + [] -> false + end end) of true -> [Q|Acc]; false -> exit(Q#amqqueue.pid, shutdown), diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index a701fa4d1d..6fc3166432 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -40,7 +40,8 @@ -export([start_link/1]). --export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2]). +-export([init/1, terminate/2, code_change/3, + handle_call/3, handle_cast/2, handle_info/2]). -import(queue). -import(erlang). @@ -191,10 +192,11 @@ deliver_queue(Fun, FunAcc0, rabbit_channel:deliver( ChPid, ConsumerTag, AckRequired, {QName, self(), NextId, IsDelivered, Msg}), - NewUAM = case AckRequired of - true -> dict:store(NextId, {Msg, AckTag}, UAM); - false -> UAM - end, + NewUAM = + case AckRequired of + true -> dict:store(NextId, {Msg, AckTag}, UAM); + false -> UAM + end, NewC = C#cr{unsent_message_count = Count + 1, unacked_messages = NewUAM}, store_ch_record(NewC), @@ -210,9 +212,10 @@ deliver_queue(Fun, FunAcc0, {ActiveConsumers1, queue:in(QEntry, BlockedConsumers1)} end, - State3 = State2 #q { active_consumers = NewActiveConsumers, - blocked_consumers = NewBlockedConsumers, - next_msg_id = NextId + 1 + State3 = State2 #q { + active_consumers = NewActiveConsumers, + blocked_consumers = NewBlockedConsumers, + next_msg_id = NextId + 1 }, if Remaining == 0 -> {FunAcc1, State3}; true -> deliver_queue(Fun, FunAcc1, State3) @@ -238,7 +241,8 @@ deliver_queue(Fun, FunAcc0, deliver_from_queue(is_message_ready, undefined, #q { mixed_state = MS }) -> not rabbit_mixed_queue:is_empty(MS); -deliver_from_queue(AckRequired, Acc = undefined, State = #q { mixed_state = MS }) -> +deliver_from_queue(AckRequired, Acc = undefined, + State = #q { mixed_state = MS }) -> {Res, MS2} = rabbit_mixed_queue:deliver(MS), MS3 = case {Res, AckRequired} of {_, true} -> MS2; @@ -250,7 +254,8 @@ deliver_from_queue(AckRequired, Acc = undefined, State = #q { mixed_state = MS } {Res, Acc, State #q { mixed_state = MS3 }}. run_message_queue(State) -> - {undefined, State2} = deliver_queue(fun deliver_from_queue/3, undefined, State), + {undefined, State2} = + deliver_queue(fun deliver_from_queue/3, undefined, State), State2. attempt_immediate_delivery(none, _ChPid, Msg, State) -> @@ -260,8 +265,9 @@ attempt_immediate_delivery(none, _ChPid, Msg, State) -> (AckRequired, false, State2) -> {AckTag, State3} = if AckRequired -> - {ok, AckTag2, MS} = rabbit_mixed_queue:publish_delivered( - Msg, State2 #q.mixed_state), + {ok, AckTag2, MS} = + rabbit_mixed_queue:publish_delivered( + Msg, State2 #q.mixed_state), {AckTag2, State2 #q { mixed_state = MS }}; true -> {noack, State2} @@ -290,19 +296,24 @@ deliver_or_requeue_n([], State) -> run_message_queue(State); deliver_or_requeue_n(MsgsWithAcks, State) -> {{_RemainingLengthMinusOne, AutoAcks, OutstandingMsgs}, NewState} = - deliver_queue(fun deliver_or_requeue_msgs/3, {length(MsgsWithAcks) - 1, [], MsgsWithAcks}, State), - {ok, MS} = rabbit_mixed_queue:ack(lists:reverse(AutoAcks), NewState #q.mixed_state), + deliver_queue(fun deliver_or_requeue_msgs/3, + {length(MsgsWithAcks) - 1, [], MsgsWithAcks}, State), + {ok, MS} = rabbit_mixed_queue:ack(lists:reverse(AutoAcks), + NewState #q.mixed_state), case OutstandingMsgs of [] -> run_message_queue(NewState #q { mixed_state = MS }); _ -> {ok, MS2} = rabbit_mixed_queue:requeue(OutstandingMsgs, MS), NewState #q { mixed_state = MS2 } end. -deliver_or_requeue_msgs(is_message_ready, {Len, _AcksAcc, _MsgsWithAcks}, _State) -> +deliver_or_requeue_msgs(is_message_ready, {Len, _AcksAcc, _MsgsWithAcks}, + _State) -> -1 < Len; -deliver_or_requeue_msgs(false, {Len, AcksAcc, [{Msg, AckTag} | MsgsWithAcks]}, State) -> +deliver_or_requeue_msgs(false, {Len, AcksAcc, [{Msg, AckTag} | MsgsWithAcks]}, + State) -> {{Msg, true, noack, Len}, {Len - 1, [AckTag|AcksAcc], MsgsWithAcks}, State}; -deliver_or_requeue_msgs(true, {Len, AcksAcc, [{Msg, AckTag} | MsgsWithAcks]}, State) -> +deliver_or_requeue_msgs(true, {Len, AcksAcc, [{Msg, AckTag} | MsgsWithAcks]}, + State) -> {{Msg, true, AckTag, Len}, {Len - 1, AcksAcc, MsgsWithAcks}, State}. add_consumer(ChPid, Consumer, Queue) -> queue:in({ChPid, Consumer}, Queue). @@ -426,8 +437,10 @@ all_tx_record() -> all_tx() -> [Txn || {{txn, Txn}, _} <- get()]. -record_pending_message(Txn, ChPid, Message = #basic_message { is_persistent = IsPersistent }) -> - Tx = #tx{pending_messages = Pending, is_persistent = IsPersistentTxn } = lookup_tx(Txn), +record_pending_message(Txn, ChPid, Message = + #basic_message { is_persistent = IsPersistent }) -> + Tx = #tx{pending_messages = Pending, is_persistent = IsPersistentTxn } = + lookup_tx(Txn), record_current_channel_tx(ChPid, Txn), store_tx(Txn, Tx #tx { pending_messages = [Message | Pending], is_persistent = IsPersistentTxn orelse IsPersistent @@ -465,7 +478,8 @@ commit_transaction(Txn, State) -> rollback_transaction(Txn, State) -> #tx { pending_messages = PendingMessages } = lookup_tx(Txn), - {ok, MS} = rabbit_mixed_queue:tx_cancel(lists:reverse(PendingMessages), State #q.mixed_state), + {ok, MS} = rabbit_mixed_queue:tx_cancel(lists:reverse(PendingMessages), + State #q.mixed_state), erase_tx(Txn), State #q { mixed_state = MS }. @@ -534,7 +548,8 @@ handle_call({deliver_immediately, Txn, Message, ChPid}, _From, State) -> %% just all ready-to-consume queues get the message, with unready %% queues discarding the message? %% - {Delivered, NewState} = attempt_immediate_delivery(Txn, ChPid, Message, State), + {Delivered, NewState} = + attempt_immediate_delivery(Txn, ChPid, Message, State), reply(Delivered, NewState); handle_call({deliver, Txn, Message, ChPid}, _From, State) -> @@ -682,8 +697,8 @@ handle_call(purge, _From, State) -> reply({ok, Count}, State #q { mixed_state = MS }); -handle_call({claim_queue, ReaderPid}, _From, State = #q{owner = Owner, - exclusive_consumer = Holder}) -> +handle_call({claim_queue, ReaderPid}, _From, + State = #q{owner = Owner, exclusive_consumer = Holder}) -> case Owner of none -> case check_exclusive_access(Holder, true, State) of @@ -696,7 +711,10 @@ handle_call({claim_queue, ReaderPid}, _From, State = #q{owner = Owner, %% pid... reply(locked, State); ok -> - reply(ok, State#q{owner = {ReaderPid, erlang:monitor(process, ReaderPid)}}) + reply(ok, State #q { owner = + {ReaderPid, + erlang:monitor(process, ReaderPid)} }) + end; {ReaderPid, _MonitorRef} -> reply(ok, State); @@ -717,8 +735,10 @@ handle_cast({ack, Txn, MsgIds, ChPid}, State) -> {MsgWithAcks, Remaining} = collect_messages(MsgIds, UAM), case Txn of none -> - Acks = lists:map(fun ({_Msg, AckTag}) -> AckTag end, MsgWithAcks), - {ok, MS} = rabbit_mixed_queue:ack(Acks, State #q.mixed_state), + Acks = lists:map(fun ({_Msg, AckTag}) -> AckTag end, + MsgWithAcks), + {ok, MS} = + rabbit_mixed_queue:ack(Acks, State #q.mixed_state), store_ch_record(C#cr{unacked_messages = Remaining}), noreply(State #q { mixed_state = MS }); _ -> diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl index 3b30a0da82..f609063432 100644 --- a/src/rabbit_disk_queue.erl +++ b/src/rabbit_disk_queue.erl @@ -46,7 +46,8 @@ -export([length/1, is_empty/1, next_write_seq/1]). --export([stop/0, stop_and_obliterate/0, to_disk_only_mode/0, to_ram_disk_mode/0]). +-export([stop/0, stop_and_obliterate/0, + to_disk_only_mode/0, to_ram_disk_mode/0]). -include("rabbit.hrl"). @@ -67,20 +68,22 @@ -define(MAX_READ_FILE_HANDLES, 256). -define(FILE_SIZE_LIMIT, (256*1024*1024)). --record(dqstate, {msg_location_dets, %% where are messages? - msg_location_ets, %% as above, but for ets version - operation_mode, %% ram_disk | disk_only - file_summary, %% what's in the files? - sequences, %% next read and write for each q - current_file_num, %% current file name as number - current_file_name, %% current file name - current_file_handle, %% current file handle - current_offset, %% current offset within current file - current_dirty, %% has the current file been written to since the last fsync? - file_size_limit, %% how big can our files get? - read_file_handles, %% file handles for reading (LRU) - read_file_handles_limit %% how many file handles can we open? - }). +-record(dqstate, + {msg_location_dets, %% where are messages? + msg_location_ets, %% as above, but for ets version + operation_mode, %% ram_disk | disk_only + file_summary, %% what's in the files? + sequences, %% next read and write for each q + current_file_num, %% current file name as number + current_file_name, %% current file name + current_file_handle, %% current file handle + current_offset, %% current offset within current file + current_dirty, %% has the current file been written to + %% since the last fsync? + file_size_limit, %% how big can our files get? + read_file_handles, %% file handles for reading (LRU) + read_file_handles_limit %% how many file handles can we open? + }). %% The components: %% @@ -233,23 +236,28 @@ -spec(start_link/0 :: () -> {'ok', pid()} | 'ignore' | {'error', any()}). -spec(publish/4 :: (queue_name(), msg_id(), binary(), bool()) -> 'ok'). --spec(publish_with_seq/5 :: (queue_name(), msg_id(), seq_id_or_next(), binary(), bool()) -> 'ok'). +-spec(publish_with_seq/5 :: (queue_name(), msg_id(), seq_id_or_next(), binary(), + bool()) -> 'ok'). -spec(deliver/1 :: (queue_name()) -> {'empty' | {msg_id(), binary(), non_neg_integer(), bool(), {msg_id(), seq_id()}, non_neg_integer()}}). -spec(phantom_deliver/1 :: (queue_name()) -> - { 'empty' | {msg_id(), bool(), {msg_id(), seq_id()}, non_neg_integer()}}). + { 'empty' | {msg_id(), bool(), {msg_id(), seq_id()}, + non_neg_integer()}}). -spec(ack/2 :: (queue_name(), [{msg_id(), seq_id()}]) -> 'ok'). -spec(tx_publish/2 :: (msg_id(), binary()) -> 'ok'). --spec(tx_commit/3 :: (queue_name(), [msg_id()], [{msg_id(), seq_id()}]) -> 'ok'). +-spec(tx_commit/3 :: (queue_name(), [msg_id()], [{msg_id(), seq_id()}]) -> + 'ok'). -spec(tx_commit_with_seqs/3 :: (queue_name(), [{msg_id(), seq_id_or_next()}], [{msg_id(), seq_id()}]) -> 'ok'). -spec(tx_cancel/1 :: ([msg_id()]) -> 'ok'). -spec(requeue/2 :: (queue_name(), [{msg_id(), seq_id()}]) -> 'ok'). --spec(requeue_with_seqs/2 :: (queue_name(), [{{msg_id(), seq_id()}, seq_id_or_next()}]) -> 'ok'). +-spec(requeue_with_seqs/2 :: (queue_name(), [{{msg_id(), seq_id()}, + seq_id_or_next()}]) -> 'ok'). -spec(purge/1 :: (queue_name()) -> non_neg_integer()). --spec(dump_queue/1 :: (queue_name()) -> [{msg_id(), binary(), non_neg_integer(), - bool(), {msg_id(), seq_id()}, seq_id()}]). +-spec(dump_queue/1 :: (queue_name()) -> + [{msg_id(), binary(), non_neg_integer(), bool(), + {msg_id(), seq_id()}, seq_id()}]). -spec(delete_non_durable_queues/1 :: (set()) -> 'ok'). -spec(stop/0 :: () -> 'ok'). -spec(stop_and_obliterate/0 :: () -> 'ok'). @@ -321,7 +329,8 @@ dump_queue(Q) -> gen_server2:call(?SERVER, {dump_queue, Q}, infinity). delete_non_durable_queues(DurableQueues) -> - gen_server2:call(?SERVER, {delete_non_durable_queues, DurableQueues}, infinity). + gen_server2:call(?SERVER, {delete_non_durable_queues, DurableQueues}, + infinity). stop() -> gen_server2:call(?SERVER, stop, infinity). @@ -483,7 +492,8 @@ handle_call(to_ram_disk_mode, _From, handle_call({length, Q}, _From, State = #dqstate { sequences = Sequences }) -> {_ReadSeqId, _WriteSeqId, Length} = sequence_lookup(Sequences, Q), {reply, Length, State}; -handle_call({next_write_seq, Q}, _From, State = #dqstate { sequences = Sequences }) -> +handle_call({next_write_seq, Q}, _From, + State = #dqstate { sequences = Sequences }) -> {_ReadSeqId, WriteSeqId, _Length} = sequence_lookup(Sequences, Q), {reply, WriteSeqId, State}; handle_call({dump_queue, Q}, _From, State) -> @@ -494,10 +504,12 @@ handle_call({delete_non_durable_queues, DurableQueues}, _From, State) -> {reply, ok, State1}. handle_cast({publish, Q, MsgId, MsgBody}, State) -> - {ok, _MsgSeqId, State1} = internal_publish(Q, MsgId, next, MsgBody, false, State), + {ok, _MsgSeqId, State1} = + internal_publish(Q, MsgId, next, MsgBody, false, State), {noreply, State1}; handle_cast({publish_with_seq, Q, MsgId, SeqId, MsgBody}, State) -> - {ok, _MsgSeqId, State1} = internal_publish(Q, MsgId, SeqId, MsgBody, false, State), + {ok, _MsgSeqId, State1} = + internal_publish(Q, MsgId, SeqId, MsgBody, false, State), {noreply, State1}; handle_cast({ack, Q, MsgSeqIds}, State) -> {ok, State1} = internal_ack(Q, MsgSeqIds, State), @@ -702,14 +714,16 @@ sequence_lookup(Sequences, Q) -> %% ---- INTERNAL RAW FUNCTIONS ---- -internal_deliver(Q, ReadMsg, FakeDeliver, State = #dqstate { sequences = Sequences }) -> +internal_deliver(Q, ReadMsg, FakeDeliver, + State = #dqstate { sequences = Sequences }) -> case ets:lookup(Sequences, Q) of [] -> {ok, empty, State}; [{Q, SeqId, SeqId, 0}] -> {ok, empty, State}; [{Q, ReadSeqId, WriteSeqId, Length}] when Length > 0 -> Remaining = Length - 1, {ok, Result, NextReadSeqId, State1} = - internal_read_message(Q, ReadSeqId, FakeDeliver, ReadMsg, State), + internal_read_message( + Q, ReadSeqId, FakeDeliver, ReadMsg, State), true = ets:insert(Sequences, {Q, NextReadSeqId, WriteSeqId, Remaining}), {ok, @@ -873,7 +887,8 @@ internal_tx_commit(Q, PubMsgSeqIds, AckSeqIds, _TotalSize}] = dets_ets_lookup(State, MsgId), SeqId2 = adjust_last_msg_seq_id( Q, ExpectedSeqId, SeqId, write), - NextSeqId2 = find_next_seq_id(SeqId2, NextSeqId), + NextSeqId2 = + find_next_seq_id(SeqId2, NextSeqId), ok = mnesia:write( rabbit_disk_queue, #dq_msg_loc { queue_and_seq_id = diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index 9f3dcbd071..92078acd40 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -101,8 +101,10 @@ ack(LimiterPid, Count) -> gen_server2:cast(LimiterPid, {ack, Count}). register(undefined, _QPid) -> ok; register(LimiterPid, QPid) -> gen_server2:cast(LimiterPid, {register, QPid}). -unregister(undefined, _QPid) -> ok; -unregister(LimiterPid, QPid) -> gen_server2:cast(LimiterPid, {unregister, QPid}). +unregister(undefined, _QPid) -> + ok; +unregister(LimiterPid, QPid) -> + gen_server2:cast(LimiterPid, {unregister, QPid}). %%---------------------------------------------------------------------------- %% gen_server callbacks diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index c965c69314..bf4a69db2a 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -349,7 +349,8 @@ dirty_foreach_key1(F, TableName, K) -> end. dirty_dump_log(FileName) -> - {ok, LH} = disk_log:open([{name, dirty_dump_log}, {mode, read_only}, {file, FileName}]), + {ok, LH} = disk_log:open([{name, dirty_dump_log}, {mode, read_only}, + {file, FileName}]), dirty_dump_log1(LH, disk_log:chunk(LH, start)), disk_log:close(LH). diff --git a/src/rabbit_mixed_queue.erl b/src/rabbit_mixed_queue.erl index 5082fe55d3..5933357cdd 100644 --- a/src/rabbit_mixed_queue.erl +++ b/src/rabbit_mixed_queue.erl @@ -118,7 +118,8 @@ purge_non_persistent_messages(State = #mqstate { mode = disk, queue = Q, {Acks, Requeue, Length} = deliver_all_messages(Q, IsDurable, [], [], 0), ok = if Requeue == [] -> ok; - true -> rabbit_disk_queue:requeue_with_seqs(Q, lists:reverse(Requeue)) + true -> + rabbit_disk_queue:requeue_with_seqs(Q, lists:reverse(Requeue)) end, ok = if Acks == [] -> ok; true -> rabbit_disk_queue:ack(Q, lists:reverse(Acks)) diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index 77e309fe84..9e34158427 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -188,7 +188,8 @@ ensure_mnesia_not_running() -> check_schema_integrity() -> %%TODO: more thorough checks - case catch [mnesia:table_info(Tab, version) || Tab <- replicated_table_names()] of + case catch [mnesia:table_info(Tab, version) + || Tab <- replicated_table_names()] of {'EXIT', Reason} -> {error, Reason}; _ -> ok end. @@ -353,12 +354,14 @@ create_local_table_copies(Type) -> HasDiscCopies = case lists:keysearch(disc_copies, 1, TabDef) of false -> false; - {value, {disc_copies, List1}} -> lists:member(node(), List1) + {value, {disc_copies, List1}} -> + lists:member(node(), List1) end, HasDiscOnlyCopies = case lists:keysearch(disc_only_copies, 1, TabDef) of false -> false; - {value, {disc_only_copies, List2}} -> lists:member(node(), List2) + {value, {disc_only_copies, List2}} -> + lists:member(node(), List2) end, StorageType = case Type of diff --git a/src/rabbit_queue_mode_manager.erl b/src/rabbit_queue_mode_manager.erl index 32ad6b4cfe..c37cb842a2 100644 --- a/src/rabbit_queue_mode_manager.erl +++ b/src/rabbit_queue_mode_manager.erl @@ -69,28 +69,29 @@ init([]) -> queues = [] }}. -handle_call({register, Pid}, _From, State = #state { queues = Qs, mode = Mode }) -> +handle_call({register, Pid}, _From, + State = #state { queues = Qs, mode = Mode }) -> Result = case Mode of unlimited -> mixed; _ -> disk end, {reply, {ok, Result}, State #state { queues = [Pid | Qs] }}. -handle_cast({change_memory_usage, true}, State = #state { mode = disk_only }) -> +handle_cast({change_memory_usage, true}, State = #state { mode=disk_only }) -> {noreply, State}; -handle_cast({change_memory_usage, true}, State = #state { mode = ram_disk }) -> +handle_cast({change_memory_usage, true}, State = #state { mode=ram_disk }) -> constrain_queues(true, State #state.queues), {noreply, State #state { mode = disk_only }}; -handle_cast({change_memory_usage, true}, State = #state { mode = unlimited }) -> +handle_cast({change_memory_usage, true}, State = #state { mode=unlimited }) -> ok = rabbit_disk_queue:to_disk_only_mode(), {noreply, State #state { mode = ram_disk }}; -handle_cast({change_memory_usage, false}, State = #state { mode = unlimited }) -> +handle_cast({change_memory_usage, false}, State = #state { mode=unlimited }) -> {noreply, State}; -handle_cast({change_memory_usage, false}, State = #state { mode = ram_disk }) -> +handle_cast({change_memory_usage, false}, State = #state { mode=ram_disk }) -> ok = rabbit_disk_queue:to_ram_disk_mode(), {noreply, State #state { mode = unlimited }}; -handle_cast({change_memory_usage, false}, State = #state { mode = disk_only }) -> +handle_cast({change_memory_usage, false}, State = #state { mode=disk_only }) -> constrain_queues(false, State #state.queues), {noreply, State #state { mode = ram_disk }}. diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index a2a31a181a..62d5c03ac1 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -730,17 +730,18 @@ rdq_time_tx_publish_commit_deliver_ack(Qs, MsgCount, MsgSizeBytes) -> || Q <- Qs] end ]]), {Deliver, ok} = - timer:tc(?MODULE, rdq_time_commands, - [[fun() -> [begin SeqIds = - [begin - Remaining = MsgCount - N, - {N, Msg, MsgSizeBytes, false, SeqId, Remaining} = - rabbit_disk_queue:deliver(Q), - SeqId - end || N <- List], - ok = rabbit_disk_queue:tx_commit(Q, [], SeqIds) - end || Q <- Qs] - end]]), + timer:tc( + ?MODULE, rdq_time_commands, + [[fun() -> [begin SeqIds = + [begin + Remaining = MsgCount - N, + {N, Msg, MsgSizeBytes, false, SeqId, + Remaining} = rabbit_disk_queue:deliver(Q), + SeqId + end || N <- List], + ok = rabbit_disk_queue:tx_commit(Q, [], SeqIds) + end || Q <- Qs] + end]]), io:format(" ~15.10B| ~14.10B| ~14.10B| ~14.1f| ~14.1f| ~14.6f| ~14.10f| ~14.1f| ~14.6f| ~14.10f~n", [MsgCount, MsgSizeBytes, QCount, float(Startup), float(Publish), (Publish / (MsgCount * QCount)), @@ -749,8 +750,9 @@ rdq_time_tx_publish_commit_deliver_ack(Qs, MsgCount, MsgSizeBytes) -> (Deliver / (MsgCount * QCount * MsgSizeBytes))]), rdq_stop(). -% we know each file is going to be 1024*1024*10 bytes in size (10MB), so make sure we have -% several files, and then keep punching holes in a reasonably sensible way. +% we know each file is going to be 1024*1024*10 bytes in size (10MB), +% so make sure we have several files, and then keep punching holes in +% a reasonably sensible way. rdq_stress_gc(MsgCount) -> rdq_virgin(), rdq_start(), @@ -804,7 +806,8 @@ rdq_test_startup_with_queue_gaps() -> %% deliver first half Seqs = [begin Remaining = Total - N, - {N, Msg, 256, false, SeqId, Remaining} = rabbit_disk_queue:deliver(q), SeqId + {N, Msg, 256, false, SeqId, Remaining} = + rabbit_disk_queue:deliver(q), SeqId end || N <- lists:seq(1,Half)], io:format("Deliver first half done~n", []), %% ack every other message we have delivered (starting at the _first_) @@ -819,10 +822,12 @@ rdq_test_startup_with_queue_gaps() -> rdq_stop(), rdq_start(), io:format("Startup (with shuffle) done~n", []), - %% should have shuffled up. So we should now get lists:seq(2,500,2) already delivered + %% should have shuffled up. So we should now get + %% lists:seq(2,500,2) already delivered Seqs2 = [begin Remaining = round(Total - ((Half + N)/2)), - {N, Msg, 256, true, SeqId, Remaining} = rabbit_disk_queue:deliver(q), + {N, Msg, 256, true, SeqId, Remaining} = + rabbit_disk_queue:deliver(q), SeqId end || N <- lists:seq(2,Half,2)], rabbit_disk_queue:tx_commit(q, [], Seqs2), @@ -830,7 +835,8 @@ rdq_test_startup_with_queue_gaps() -> %% and now fetch the rest Seqs3 = [begin Remaining = Total - N, - {N, Msg, 256, false, SeqId, Remaining} = rabbit_disk_queue:deliver(q), + {N, Msg, 256, false, SeqId, Remaining} = + rabbit_disk_queue:deliver(q), SeqId end || N <- lists:seq(1 + Half,Total)], rabbit_disk_queue:tx_commit(q, [], Seqs3), @@ -852,7 +858,8 @@ rdq_test_redeliver() -> %% deliver first half Seqs = [begin Remaining = Total - N, - {N, Msg, 256, false, SeqId, Remaining} = rabbit_disk_queue:deliver(q), + {N, Msg, 256, false, SeqId, Remaining} = + rabbit_disk_queue:deliver(q), SeqId end || N <- lists:seq(1,Half)], io:format("Deliver first half done~n", []), @@ -867,16 +874,19 @@ rdq_test_redeliver() -> end, true, Seqs), rabbit_disk_queue:tx_commit(q, [], []), io:format("Redeliver and acking done~n", []), - %% we should now get the 2nd half in order, followed by every-other-from-the-first-half + %% we should now get the 2nd half in order, followed by + %% every-other-from-the-first-half Seqs2 = [begin Remaining = round(Total - N + (Half/2)), - {N, Msg, 256, false, SeqId, Remaining} = rabbit_disk_queue:deliver(q), + {N, Msg, 256, false, SeqId, Remaining} = + rabbit_disk_queue:deliver(q), SeqId end || N <- lists:seq(1+Half, Total)], rabbit_disk_queue:tx_commit(q, [], Seqs2), Seqs3 = [begin Remaining = round((Half - N) / 2) - 1, - {N, Msg, 256, true, SeqId, Remaining} = rabbit_disk_queue:deliver(q), + {N, Msg, 256, true, SeqId, Remaining} = + rabbit_disk_queue:deliver(q), SeqId end || N <- lists:seq(1, Half, 2)], rabbit_disk_queue:tx_commit(q, [], Seqs3), @@ -897,7 +907,8 @@ rdq_test_purge() -> %% deliver first half Seqs = [begin Remaining = Total - N, - {N, Msg, 256, false, SeqId, Remaining} = rabbit_disk_queue:deliver(q), + {N, Msg, 256, false, SeqId, Remaining} = + rabbit_disk_queue:deliver(q), SeqId end || N <- lists:seq(1,Half)], io:format("Deliver first half done~n", []), @@ -926,7 +937,8 @@ rdq_test_dump_queue() -> lists:foreach( fun (N) -> Remaining = Total - N, - {N, Msg, 256, false, _SeqId, Remaining} = rabbit_disk_queue:deliver(q) + {N, Msg, 256, false, _SeqId, Remaining} = + rabbit_disk_queue:deliver(q) end, All), [] = rabbit_disk_queue:dump_queue(q), rdq_stop(), @@ -943,22 +955,25 @@ rdq_test_mixed_queue_modes() -> rdq_start(), Payload = <<0:(8*256)>>, {ok, MS} = rabbit_mixed_queue:start_link(q, true, mixed), - MS2 = lists:foldl(fun (_N, MS1) -> - Msg = rabbit_basic:message(x, <<>>, <<>>, Payload), - {ok, MS1a} = rabbit_mixed_queue:publish(Msg, MS1), - MS1a - end, MS, lists:seq(1,10)), - MS4 = lists:foldl(fun (_N, MS3) -> - Msg = (rabbit_basic:message(x, <<>>, <<>>, Payload)) - #basic_message { is_persistent = true }, - {ok, MS3a} = rabbit_mixed_queue:publish(Msg, MS3), - MS3a - end, MS2, lists:seq(1,10)), - MS6 = lists:foldl(fun (_N, MS5) -> - Msg = rabbit_basic:message(x, <<>>, <<>>, Payload), - {ok, MS5a} = rabbit_mixed_queue:publish(Msg, MS5), - MS5a - end, MS4, lists:seq(1,10)), + MS2 = lists:foldl( + fun (_N, MS1) -> + Msg = rabbit_basic:message(x, <<>>, <<>>, Payload), + {ok, MS1a} = rabbit_mixed_queue:publish(Msg, MS1), + MS1a + end, MS, lists:seq(1,10)), + MS4 = lists:foldl( + fun (_N, MS3) -> + Msg = (rabbit_basic:message(x, <<>>, <<>>, Payload)) + #basic_message { is_persistent = true }, + {ok, MS3a} = rabbit_mixed_queue:publish(Msg, MS3), + MS3a + end, MS2, lists:seq(1,10)), + MS6 = lists:foldl( + fun (_N, MS5) -> + Msg = rabbit_basic:message(x, <<>>, <<>>, Payload), + {ok, MS5a} = rabbit_mixed_queue:publish(Msg, MS5), + MS5a + end, MS4, lists:seq(1,10)), 30 = rabbit_mixed_queue:length(MS6), io:format("Published a mixture of messages~n"), {ok, MS7} = rabbit_mixed_queue:to_disk_only_mode(MS6), |
