summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2009-06-17 12:52:35 +0100
committerMatthew Sackman <matthew@lshift.net>2009-06-17 12:52:35 +0100
commit0f044b9ee0564b24668c1ef3995248bea6e19c51 (patch)
treec8db24bec5ead094d65a8fbb6390b83cb4be490e
parent1e1f45c489012726832f68faf2f0ea6a76623c6a (diff)
downloadrabbitmq-server-git-0f044b9ee0564b24668c1ef3995248bea6e19c51.tar.gz
fixed line lengths
-rw-r--r--src/rabbit.erl6
-rw-r--r--src/rabbit_amqqueue.erl15
-rw-r--r--src/rabbit_amqqueue_process.erl72
-rw-r--r--src/rabbit_disk_queue.erl71
-rw-r--r--src/rabbit_limiter.erl6
-rw-r--r--src/rabbit_misc.erl3
-rw-r--r--src/rabbit_mixed_queue.erl3
-rw-r--r--src/rabbit_mnesia.erl9
-rw-r--r--src/rabbit_queue_mode_manager.erl15
-rw-r--r--src/rabbit_tests.erl93
10 files changed, 177 insertions, 116 deletions
diff --git a/src/rabbit.erl b/src/rabbit.erl
index 2eecac5ed3..fbadc5f2c5 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -158,9 +158,9 @@ start(normal, []) ->
ok = rabbit_exchange:recover(),
{ok, DurableQueues} = rabbit_amqqueue:recover(),
DurableQueueNames =
- sets:from_list(lists:map(
- fun(Q) -> Q #amqqueue.name end, DurableQueues)),
- ok = rabbit_disk_queue:delete_non_durable_queues(DurableQueueNames)
+ sets:from_list([ Q #amqqueue.name || Q <- DurableQueues ]),
+ ok = rabbit_disk_queue:delete_non_durable_queues(
+ DurableQueueNames)
end},
{"guid generator",
fun () ->
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 01d40aa1a8..a1f36f31cb 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -136,12 +136,15 @@ recover_durable_queues() ->
%% another node has deleted the queue (and possibly
%% re-created it).
case rabbit_misc:execute_mnesia_transaction(
- fun () -> case mnesia:match_object(
- rabbit_durable_queue, RecoveredQ, read) of
- [_] -> ok = store_queue(Q),
- true;
- [] -> false
- end
+ fun () ->
+ Match =
+ mnesia:match_object(
+ rabbit_durable_queue, RecoveredQ, read),
+ case Match of
+ [_] -> ok = store_queue(Q),
+ true;
+ [] -> false
+ end
end) of
true -> [Q|Acc];
false -> exit(Q#amqqueue.pid, shutdown),
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index a701fa4d1d..6fc3166432 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -40,7 +40,8 @@
-export([start_link/1]).
--export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2]).
+-export([init/1, terminate/2, code_change/3,
+ handle_call/3, handle_cast/2, handle_info/2]).
-import(queue).
-import(erlang).
@@ -191,10 +192,11 @@ deliver_queue(Fun, FunAcc0,
rabbit_channel:deliver(
ChPid, ConsumerTag, AckRequired,
{QName, self(), NextId, IsDelivered, Msg}),
- NewUAM = case AckRequired of
- true -> dict:store(NextId, {Msg, AckTag}, UAM);
- false -> UAM
- end,
+ NewUAM =
+ case AckRequired of
+ true -> dict:store(NextId, {Msg, AckTag}, UAM);
+ false -> UAM
+ end,
NewC = C#cr{unsent_message_count = Count + 1,
unacked_messages = NewUAM},
store_ch_record(NewC),
@@ -210,9 +212,10 @@ deliver_queue(Fun, FunAcc0,
{ActiveConsumers1,
queue:in(QEntry, BlockedConsumers1)}
end,
- State3 = State2 #q { active_consumers = NewActiveConsumers,
- blocked_consumers = NewBlockedConsumers,
- next_msg_id = NextId + 1
+ State3 = State2 #q {
+ active_consumers = NewActiveConsumers,
+ blocked_consumers = NewBlockedConsumers,
+ next_msg_id = NextId + 1
},
if Remaining == 0 -> {FunAcc1, State3};
true -> deliver_queue(Fun, FunAcc1, State3)
@@ -238,7 +241,8 @@ deliver_queue(Fun, FunAcc0,
deliver_from_queue(is_message_ready, undefined, #q { mixed_state = MS }) ->
not rabbit_mixed_queue:is_empty(MS);
-deliver_from_queue(AckRequired, Acc = undefined, State = #q { mixed_state = MS }) ->
+deliver_from_queue(AckRequired, Acc = undefined,
+ State = #q { mixed_state = MS }) ->
{Res, MS2} = rabbit_mixed_queue:deliver(MS),
MS3 = case {Res, AckRequired} of
{_, true} -> MS2;
@@ -250,7 +254,8 @@ deliver_from_queue(AckRequired, Acc = undefined, State = #q { mixed_state = MS }
{Res, Acc, State #q { mixed_state = MS3 }}.
run_message_queue(State) ->
- {undefined, State2} = deliver_queue(fun deliver_from_queue/3, undefined, State),
+ {undefined, State2} =
+ deliver_queue(fun deliver_from_queue/3, undefined, State),
State2.
attempt_immediate_delivery(none, _ChPid, Msg, State) ->
@@ -260,8 +265,9 @@ attempt_immediate_delivery(none, _ChPid, Msg, State) ->
(AckRequired, false, State2) ->
{AckTag, State3} =
if AckRequired ->
- {ok, AckTag2, MS} = rabbit_mixed_queue:publish_delivered(
- Msg, State2 #q.mixed_state),
+ {ok, AckTag2, MS} =
+ rabbit_mixed_queue:publish_delivered(
+ Msg, State2 #q.mixed_state),
{AckTag2, State2 #q { mixed_state = MS }};
true ->
{noack, State2}
@@ -290,19 +296,24 @@ deliver_or_requeue_n([], State) ->
run_message_queue(State);
deliver_or_requeue_n(MsgsWithAcks, State) ->
{{_RemainingLengthMinusOne, AutoAcks, OutstandingMsgs}, NewState} =
- deliver_queue(fun deliver_or_requeue_msgs/3, {length(MsgsWithAcks) - 1, [], MsgsWithAcks}, State),
- {ok, MS} = rabbit_mixed_queue:ack(lists:reverse(AutoAcks), NewState #q.mixed_state),
+ deliver_queue(fun deliver_or_requeue_msgs/3,
+ {length(MsgsWithAcks) - 1, [], MsgsWithAcks}, State),
+ {ok, MS} = rabbit_mixed_queue:ack(lists:reverse(AutoAcks),
+ NewState #q.mixed_state),
case OutstandingMsgs of
[] -> run_message_queue(NewState #q { mixed_state = MS });
_ -> {ok, MS2} = rabbit_mixed_queue:requeue(OutstandingMsgs, MS),
NewState #q { mixed_state = MS2 }
end.
-deliver_or_requeue_msgs(is_message_ready, {Len, _AcksAcc, _MsgsWithAcks}, _State) ->
+deliver_or_requeue_msgs(is_message_ready, {Len, _AcksAcc, _MsgsWithAcks},
+ _State) ->
-1 < Len;
-deliver_or_requeue_msgs(false, {Len, AcksAcc, [{Msg, AckTag} | MsgsWithAcks]}, State) ->
+deliver_or_requeue_msgs(false, {Len, AcksAcc, [{Msg, AckTag} | MsgsWithAcks]},
+ State) ->
{{Msg, true, noack, Len}, {Len - 1, [AckTag|AcksAcc], MsgsWithAcks}, State};
-deliver_or_requeue_msgs(true, {Len, AcksAcc, [{Msg, AckTag} | MsgsWithAcks]}, State) ->
+deliver_or_requeue_msgs(true, {Len, AcksAcc, [{Msg, AckTag} | MsgsWithAcks]},
+ State) ->
{{Msg, true, AckTag, Len}, {Len - 1, AcksAcc, MsgsWithAcks}, State}.
add_consumer(ChPid, Consumer, Queue) -> queue:in({ChPid, Consumer}, Queue).
@@ -426,8 +437,10 @@ all_tx_record() ->
all_tx() ->
[Txn || {{txn, Txn}, _} <- get()].
-record_pending_message(Txn, ChPid, Message = #basic_message { is_persistent = IsPersistent }) ->
- Tx = #tx{pending_messages = Pending, is_persistent = IsPersistentTxn } = lookup_tx(Txn),
+record_pending_message(Txn, ChPid, Message =
+ #basic_message { is_persistent = IsPersistent }) ->
+ Tx = #tx{pending_messages = Pending, is_persistent = IsPersistentTxn } =
+ lookup_tx(Txn),
record_current_channel_tx(ChPid, Txn),
store_tx(Txn, Tx #tx { pending_messages = [Message | Pending],
is_persistent = IsPersistentTxn orelse IsPersistent
@@ -465,7 +478,8 @@ commit_transaction(Txn, State) ->
rollback_transaction(Txn, State) ->
#tx { pending_messages = PendingMessages
} = lookup_tx(Txn),
- {ok, MS} = rabbit_mixed_queue:tx_cancel(lists:reverse(PendingMessages), State #q.mixed_state),
+ {ok, MS} = rabbit_mixed_queue:tx_cancel(lists:reverse(PendingMessages),
+ State #q.mixed_state),
erase_tx(Txn),
State #q { mixed_state = MS }.
@@ -534,7 +548,8 @@ handle_call({deliver_immediately, Txn, Message, ChPid}, _From, State) ->
%% just all ready-to-consume queues get the message, with unready
%% queues discarding the message?
%%
- {Delivered, NewState} = attempt_immediate_delivery(Txn, ChPid, Message, State),
+ {Delivered, NewState} =
+ attempt_immediate_delivery(Txn, ChPid, Message, State),
reply(Delivered, NewState);
handle_call({deliver, Txn, Message, ChPid}, _From, State) ->
@@ -682,8 +697,8 @@ handle_call(purge, _From, State) ->
reply({ok, Count},
State #q { mixed_state = MS });
-handle_call({claim_queue, ReaderPid}, _From, State = #q{owner = Owner,
- exclusive_consumer = Holder}) ->
+handle_call({claim_queue, ReaderPid}, _From,
+ State = #q{owner = Owner, exclusive_consumer = Holder}) ->
case Owner of
none ->
case check_exclusive_access(Holder, true, State) of
@@ -696,7 +711,10 @@ handle_call({claim_queue, ReaderPid}, _From, State = #q{owner = Owner,
%% pid...
reply(locked, State);
ok ->
- reply(ok, State#q{owner = {ReaderPid, erlang:monitor(process, ReaderPid)}})
+ reply(ok, State #q { owner =
+ {ReaderPid,
+ erlang:monitor(process, ReaderPid)} })
+
end;
{ReaderPid, _MonitorRef} ->
reply(ok, State);
@@ -717,8 +735,10 @@ handle_cast({ack, Txn, MsgIds, ChPid}, State) ->
{MsgWithAcks, Remaining} = collect_messages(MsgIds, UAM),
case Txn of
none ->
- Acks = lists:map(fun ({_Msg, AckTag}) -> AckTag end, MsgWithAcks),
- {ok, MS} = rabbit_mixed_queue:ack(Acks, State #q.mixed_state),
+ Acks = lists:map(fun ({_Msg, AckTag}) -> AckTag end,
+ MsgWithAcks),
+ {ok, MS} =
+ rabbit_mixed_queue:ack(Acks, State #q.mixed_state),
store_ch_record(C#cr{unacked_messages = Remaining}),
noreply(State #q { mixed_state = MS });
_ ->
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl
index 3b30a0da82..f609063432 100644
--- a/src/rabbit_disk_queue.erl
+++ b/src/rabbit_disk_queue.erl
@@ -46,7 +46,8 @@
-export([length/1, is_empty/1, next_write_seq/1]).
--export([stop/0, stop_and_obliterate/0, to_disk_only_mode/0, to_ram_disk_mode/0]).
+-export([stop/0, stop_and_obliterate/0,
+ to_disk_only_mode/0, to_ram_disk_mode/0]).
-include("rabbit.hrl").
@@ -67,20 +68,22 @@
-define(MAX_READ_FILE_HANDLES, 256).
-define(FILE_SIZE_LIMIT, (256*1024*1024)).
--record(dqstate, {msg_location_dets, %% where are messages?
- msg_location_ets, %% as above, but for ets version
- operation_mode, %% ram_disk | disk_only
- file_summary, %% what's in the files?
- sequences, %% next read and write for each q
- current_file_num, %% current file name as number
- current_file_name, %% current file name
- current_file_handle, %% current file handle
- current_offset, %% current offset within current file
- current_dirty, %% has the current file been written to since the last fsync?
- file_size_limit, %% how big can our files get?
- read_file_handles, %% file handles for reading (LRU)
- read_file_handles_limit %% how many file handles can we open?
- }).
+-record(dqstate,
+ {msg_location_dets, %% where are messages?
+ msg_location_ets, %% as above, but for ets version
+ operation_mode, %% ram_disk | disk_only
+ file_summary, %% what's in the files?
+ sequences, %% next read and write for each q
+ current_file_num, %% current file name as number
+ current_file_name, %% current file name
+ current_file_handle, %% current file handle
+ current_offset, %% current offset within current file
+ current_dirty, %% has the current file been written to
+ %% since the last fsync?
+ file_size_limit, %% how big can our files get?
+ read_file_handles, %% file handles for reading (LRU)
+ read_file_handles_limit %% how many file handles can we open?
+ }).
%% The components:
%%
@@ -233,23 +236,28 @@
-spec(start_link/0 :: () ->
{'ok', pid()} | 'ignore' | {'error', any()}).
-spec(publish/4 :: (queue_name(), msg_id(), binary(), bool()) -> 'ok').
--spec(publish_with_seq/5 :: (queue_name(), msg_id(), seq_id_or_next(), binary(), bool()) -> 'ok').
+-spec(publish_with_seq/5 :: (queue_name(), msg_id(), seq_id_or_next(), binary(),
+ bool()) -> 'ok').
-spec(deliver/1 :: (queue_name()) ->
{'empty' | {msg_id(), binary(), non_neg_integer(),
bool(), {msg_id(), seq_id()}, non_neg_integer()}}).
-spec(phantom_deliver/1 :: (queue_name()) ->
- { 'empty' | {msg_id(), bool(), {msg_id(), seq_id()}, non_neg_integer()}}).
+ { 'empty' | {msg_id(), bool(), {msg_id(), seq_id()},
+ non_neg_integer()}}).
-spec(ack/2 :: (queue_name(), [{msg_id(), seq_id()}]) -> 'ok').
-spec(tx_publish/2 :: (msg_id(), binary()) -> 'ok').
--spec(tx_commit/3 :: (queue_name(), [msg_id()], [{msg_id(), seq_id()}]) -> 'ok').
+-spec(tx_commit/3 :: (queue_name(), [msg_id()], [{msg_id(), seq_id()}]) ->
+ 'ok').
-spec(tx_commit_with_seqs/3 :: (queue_name(), [{msg_id(), seq_id_or_next()}],
[{msg_id(), seq_id()}]) -> 'ok').
-spec(tx_cancel/1 :: ([msg_id()]) -> 'ok').
-spec(requeue/2 :: (queue_name(), [{msg_id(), seq_id()}]) -> 'ok').
--spec(requeue_with_seqs/2 :: (queue_name(), [{{msg_id(), seq_id()}, seq_id_or_next()}]) -> 'ok').
+-spec(requeue_with_seqs/2 :: (queue_name(), [{{msg_id(), seq_id()},
+ seq_id_or_next()}]) -> 'ok').
-spec(purge/1 :: (queue_name()) -> non_neg_integer()).
--spec(dump_queue/1 :: (queue_name()) -> [{msg_id(), binary(), non_neg_integer(),
- bool(), {msg_id(), seq_id()}, seq_id()}]).
+-spec(dump_queue/1 :: (queue_name()) ->
+ [{msg_id(), binary(), non_neg_integer(), bool(),
+ {msg_id(), seq_id()}, seq_id()}]).
-spec(delete_non_durable_queues/1 :: (set()) -> 'ok').
-spec(stop/0 :: () -> 'ok').
-spec(stop_and_obliterate/0 :: () -> 'ok').
@@ -321,7 +329,8 @@ dump_queue(Q) ->
gen_server2:call(?SERVER, {dump_queue, Q}, infinity).
delete_non_durable_queues(DurableQueues) ->
- gen_server2:call(?SERVER, {delete_non_durable_queues, DurableQueues}, infinity).
+ gen_server2:call(?SERVER, {delete_non_durable_queues, DurableQueues},
+ infinity).
stop() ->
gen_server2:call(?SERVER, stop, infinity).
@@ -483,7 +492,8 @@ handle_call(to_ram_disk_mode, _From,
handle_call({length, Q}, _From, State = #dqstate { sequences = Sequences }) ->
{_ReadSeqId, _WriteSeqId, Length} = sequence_lookup(Sequences, Q),
{reply, Length, State};
-handle_call({next_write_seq, Q}, _From, State = #dqstate { sequences = Sequences }) ->
+handle_call({next_write_seq, Q}, _From,
+ State = #dqstate { sequences = Sequences }) ->
{_ReadSeqId, WriteSeqId, _Length} = sequence_lookup(Sequences, Q),
{reply, WriteSeqId, State};
handle_call({dump_queue, Q}, _From, State) ->
@@ -494,10 +504,12 @@ handle_call({delete_non_durable_queues, DurableQueues}, _From, State) ->
{reply, ok, State1}.
handle_cast({publish, Q, MsgId, MsgBody}, State) ->
- {ok, _MsgSeqId, State1} = internal_publish(Q, MsgId, next, MsgBody, false, State),
+ {ok, _MsgSeqId, State1} =
+ internal_publish(Q, MsgId, next, MsgBody, false, State),
{noreply, State1};
handle_cast({publish_with_seq, Q, MsgId, SeqId, MsgBody}, State) ->
- {ok, _MsgSeqId, State1} = internal_publish(Q, MsgId, SeqId, MsgBody, false, State),
+ {ok, _MsgSeqId, State1} =
+ internal_publish(Q, MsgId, SeqId, MsgBody, false, State),
{noreply, State1};
handle_cast({ack, Q, MsgSeqIds}, State) ->
{ok, State1} = internal_ack(Q, MsgSeqIds, State),
@@ -702,14 +714,16 @@ sequence_lookup(Sequences, Q) ->
%% ---- INTERNAL RAW FUNCTIONS ----
-internal_deliver(Q, ReadMsg, FakeDeliver, State = #dqstate { sequences = Sequences }) ->
+internal_deliver(Q, ReadMsg, FakeDeliver,
+ State = #dqstate { sequences = Sequences }) ->
case ets:lookup(Sequences, Q) of
[] -> {ok, empty, State};
[{Q, SeqId, SeqId, 0}] -> {ok, empty, State};
[{Q, ReadSeqId, WriteSeqId, Length}] when Length > 0 ->
Remaining = Length - 1,
{ok, Result, NextReadSeqId, State1} =
- internal_read_message(Q, ReadSeqId, FakeDeliver, ReadMsg, State),
+ internal_read_message(
+ Q, ReadSeqId, FakeDeliver, ReadMsg, State),
true = ets:insert(Sequences,
{Q, NextReadSeqId, WriteSeqId, Remaining}),
{ok,
@@ -873,7 +887,8 @@ internal_tx_commit(Q, PubMsgSeqIds, AckSeqIds,
_TotalSize}] = dets_ets_lookup(State, MsgId),
SeqId2 = adjust_last_msg_seq_id(
Q, ExpectedSeqId, SeqId, write),
- NextSeqId2 = find_next_seq_id(SeqId2, NextSeqId),
+ NextSeqId2 =
+ find_next_seq_id(SeqId2, NextSeqId),
ok = mnesia:write(
rabbit_disk_queue,
#dq_msg_loc { queue_and_seq_id =
diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl
index 9f3dcbd071..92078acd40 100644
--- a/src/rabbit_limiter.erl
+++ b/src/rabbit_limiter.erl
@@ -101,8 +101,10 @@ ack(LimiterPid, Count) -> gen_server2:cast(LimiterPid, {ack, Count}).
register(undefined, _QPid) -> ok;
register(LimiterPid, QPid) -> gen_server2:cast(LimiterPid, {register, QPid}).
-unregister(undefined, _QPid) -> ok;
-unregister(LimiterPid, QPid) -> gen_server2:cast(LimiterPid, {unregister, QPid}).
+unregister(undefined, _QPid) ->
+ ok;
+unregister(LimiterPid, QPid) ->
+ gen_server2:cast(LimiterPid, {unregister, QPid}).
%%----------------------------------------------------------------------------
%% gen_server callbacks
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index c965c69314..bf4a69db2a 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -349,7 +349,8 @@ dirty_foreach_key1(F, TableName, K) ->
end.
dirty_dump_log(FileName) ->
- {ok, LH} = disk_log:open([{name, dirty_dump_log}, {mode, read_only}, {file, FileName}]),
+ {ok, LH} = disk_log:open([{name, dirty_dump_log}, {mode, read_only},
+ {file, FileName}]),
dirty_dump_log1(LH, disk_log:chunk(LH, start)),
disk_log:close(LH).
diff --git a/src/rabbit_mixed_queue.erl b/src/rabbit_mixed_queue.erl
index 5082fe55d3..5933357cdd 100644
--- a/src/rabbit_mixed_queue.erl
+++ b/src/rabbit_mixed_queue.erl
@@ -118,7 +118,8 @@ purge_non_persistent_messages(State = #mqstate { mode = disk, queue = Q,
{Acks, Requeue, Length} =
deliver_all_messages(Q, IsDurable, [], [], 0),
ok = if Requeue == [] -> ok;
- true -> rabbit_disk_queue:requeue_with_seqs(Q, lists:reverse(Requeue))
+ true ->
+ rabbit_disk_queue:requeue_with_seqs(Q, lists:reverse(Requeue))
end,
ok = if Acks == [] -> ok;
true -> rabbit_disk_queue:ack(Q, lists:reverse(Acks))
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index 77e309fe84..9e34158427 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -188,7 +188,8 @@ ensure_mnesia_not_running() ->
check_schema_integrity() ->
%%TODO: more thorough checks
- case catch [mnesia:table_info(Tab, version) || Tab <- replicated_table_names()] of
+ case catch [mnesia:table_info(Tab, version)
+ || Tab <- replicated_table_names()] of
{'EXIT', Reason} -> {error, Reason};
_ -> ok
end.
@@ -353,12 +354,14 @@ create_local_table_copies(Type) ->
HasDiscCopies =
case lists:keysearch(disc_copies, 1, TabDef) of
false -> false;
- {value, {disc_copies, List1}} -> lists:member(node(), List1)
+ {value, {disc_copies, List1}} ->
+ lists:member(node(), List1)
end,
HasDiscOnlyCopies =
case lists:keysearch(disc_only_copies, 1, TabDef) of
false -> false;
- {value, {disc_only_copies, List2}} -> lists:member(node(), List2)
+ {value, {disc_only_copies, List2}} ->
+ lists:member(node(), List2)
end,
StorageType =
case Type of
diff --git a/src/rabbit_queue_mode_manager.erl b/src/rabbit_queue_mode_manager.erl
index 32ad6b4cfe..c37cb842a2 100644
--- a/src/rabbit_queue_mode_manager.erl
+++ b/src/rabbit_queue_mode_manager.erl
@@ -69,28 +69,29 @@ init([]) ->
queues = []
}}.
-handle_call({register, Pid}, _From, State = #state { queues = Qs, mode = Mode }) ->
+handle_call({register, Pid}, _From,
+ State = #state { queues = Qs, mode = Mode }) ->
Result = case Mode of
unlimited -> mixed;
_ -> disk
end,
{reply, {ok, Result}, State #state { queues = [Pid | Qs] }}.
-handle_cast({change_memory_usage, true}, State = #state { mode = disk_only }) ->
+handle_cast({change_memory_usage, true}, State = #state { mode=disk_only }) ->
{noreply, State};
-handle_cast({change_memory_usage, true}, State = #state { mode = ram_disk }) ->
+handle_cast({change_memory_usage, true}, State = #state { mode=ram_disk }) ->
constrain_queues(true, State #state.queues),
{noreply, State #state { mode = disk_only }};
-handle_cast({change_memory_usage, true}, State = #state { mode = unlimited }) ->
+handle_cast({change_memory_usage, true}, State = #state { mode=unlimited }) ->
ok = rabbit_disk_queue:to_disk_only_mode(),
{noreply, State #state { mode = ram_disk }};
-handle_cast({change_memory_usage, false}, State = #state { mode = unlimited }) ->
+handle_cast({change_memory_usage, false}, State = #state { mode=unlimited }) ->
{noreply, State};
-handle_cast({change_memory_usage, false}, State = #state { mode = ram_disk }) ->
+handle_cast({change_memory_usage, false}, State = #state { mode=ram_disk }) ->
ok = rabbit_disk_queue:to_ram_disk_mode(),
{noreply, State #state { mode = unlimited }};
-handle_cast({change_memory_usage, false}, State = #state { mode = disk_only }) ->
+handle_cast({change_memory_usage, false}, State = #state { mode=disk_only }) ->
constrain_queues(false, State #state.queues),
{noreply, State #state { mode = ram_disk }}.
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index a2a31a181a..62d5c03ac1 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -730,17 +730,18 @@ rdq_time_tx_publish_commit_deliver_ack(Qs, MsgCount, MsgSizeBytes) ->
|| Q <- Qs] end
]]),
{Deliver, ok} =
- timer:tc(?MODULE, rdq_time_commands,
- [[fun() -> [begin SeqIds =
- [begin
- Remaining = MsgCount - N,
- {N, Msg, MsgSizeBytes, false, SeqId, Remaining} =
- rabbit_disk_queue:deliver(Q),
- SeqId
- end || N <- List],
- ok = rabbit_disk_queue:tx_commit(Q, [], SeqIds)
- end || Q <- Qs]
- end]]),
+ timer:tc(
+ ?MODULE, rdq_time_commands,
+ [[fun() -> [begin SeqIds =
+ [begin
+ Remaining = MsgCount - N,
+ {N, Msg, MsgSizeBytes, false, SeqId,
+ Remaining} = rabbit_disk_queue:deliver(Q),
+ SeqId
+ end || N <- List],
+ ok = rabbit_disk_queue:tx_commit(Q, [], SeqIds)
+ end || Q <- Qs]
+ end]]),
io:format(" ~15.10B| ~14.10B| ~14.10B| ~14.1f| ~14.1f| ~14.6f| ~14.10f| ~14.1f| ~14.6f| ~14.10f~n",
[MsgCount, MsgSizeBytes, QCount, float(Startup),
float(Publish), (Publish / (MsgCount * QCount)),
@@ -749,8 +750,9 @@ rdq_time_tx_publish_commit_deliver_ack(Qs, MsgCount, MsgSizeBytes) ->
(Deliver / (MsgCount * QCount * MsgSizeBytes))]),
rdq_stop().
-% we know each file is going to be 1024*1024*10 bytes in size (10MB), so make sure we have
-% several files, and then keep punching holes in a reasonably sensible way.
+% we know each file is going to be 1024*1024*10 bytes in size (10MB),
+% so make sure we have several files, and then keep punching holes in
+% a reasonably sensible way.
rdq_stress_gc(MsgCount) ->
rdq_virgin(),
rdq_start(),
@@ -804,7 +806,8 @@ rdq_test_startup_with_queue_gaps() ->
%% deliver first half
Seqs = [begin
Remaining = Total - N,
- {N, Msg, 256, false, SeqId, Remaining} = rabbit_disk_queue:deliver(q), SeqId
+ {N, Msg, 256, false, SeqId, Remaining} =
+ rabbit_disk_queue:deliver(q), SeqId
end || N <- lists:seq(1,Half)],
io:format("Deliver first half done~n", []),
%% ack every other message we have delivered (starting at the _first_)
@@ -819,10 +822,12 @@ rdq_test_startup_with_queue_gaps() ->
rdq_stop(),
rdq_start(),
io:format("Startup (with shuffle) done~n", []),
- %% should have shuffled up. So we should now get lists:seq(2,500,2) already delivered
+ %% should have shuffled up. So we should now get
+ %% lists:seq(2,500,2) already delivered
Seqs2 = [begin
Remaining = round(Total - ((Half + N)/2)),
- {N, Msg, 256, true, SeqId, Remaining} = rabbit_disk_queue:deliver(q),
+ {N, Msg, 256, true, SeqId, Remaining} =
+ rabbit_disk_queue:deliver(q),
SeqId
end || N <- lists:seq(2,Half,2)],
rabbit_disk_queue:tx_commit(q, [], Seqs2),
@@ -830,7 +835,8 @@ rdq_test_startup_with_queue_gaps() ->
%% and now fetch the rest
Seqs3 = [begin
Remaining = Total - N,
- {N, Msg, 256, false, SeqId, Remaining} = rabbit_disk_queue:deliver(q),
+ {N, Msg, 256, false, SeqId, Remaining} =
+ rabbit_disk_queue:deliver(q),
SeqId
end || N <- lists:seq(1 + Half,Total)],
rabbit_disk_queue:tx_commit(q, [], Seqs3),
@@ -852,7 +858,8 @@ rdq_test_redeliver() ->
%% deliver first half
Seqs = [begin
Remaining = Total - N,
- {N, Msg, 256, false, SeqId, Remaining} = rabbit_disk_queue:deliver(q),
+ {N, Msg, 256, false, SeqId, Remaining} =
+ rabbit_disk_queue:deliver(q),
SeqId
end || N <- lists:seq(1,Half)],
io:format("Deliver first half done~n", []),
@@ -867,16 +874,19 @@ rdq_test_redeliver() ->
end, true, Seqs),
rabbit_disk_queue:tx_commit(q, [], []),
io:format("Redeliver and acking done~n", []),
- %% we should now get the 2nd half in order, followed by every-other-from-the-first-half
+ %% we should now get the 2nd half in order, followed by
+ %% every-other-from-the-first-half
Seqs2 = [begin
Remaining = round(Total - N + (Half/2)),
- {N, Msg, 256, false, SeqId, Remaining} = rabbit_disk_queue:deliver(q),
+ {N, Msg, 256, false, SeqId, Remaining} =
+ rabbit_disk_queue:deliver(q),
SeqId
end || N <- lists:seq(1+Half, Total)],
rabbit_disk_queue:tx_commit(q, [], Seqs2),
Seqs3 = [begin
Remaining = round((Half - N) / 2) - 1,
- {N, Msg, 256, true, SeqId, Remaining} = rabbit_disk_queue:deliver(q),
+ {N, Msg, 256, true, SeqId, Remaining} =
+ rabbit_disk_queue:deliver(q),
SeqId
end || N <- lists:seq(1, Half, 2)],
rabbit_disk_queue:tx_commit(q, [], Seqs3),
@@ -897,7 +907,8 @@ rdq_test_purge() ->
%% deliver first half
Seqs = [begin
Remaining = Total - N,
- {N, Msg, 256, false, SeqId, Remaining} = rabbit_disk_queue:deliver(q),
+ {N, Msg, 256, false, SeqId, Remaining} =
+ rabbit_disk_queue:deliver(q),
SeqId
end || N <- lists:seq(1,Half)],
io:format("Deliver first half done~n", []),
@@ -926,7 +937,8 @@ rdq_test_dump_queue() ->
lists:foreach(
fun (N) ->
Remaining = Total - N,
- {N, Msg, 256, false, _SeqId, Remaining} = rabbit_disk_queue:deliver(q)
+ {N, Msg, 256, false, _SeqId, Remaining} =
+ rabbit_disk_queue:deliver(q)
end, All),
[] = rabbit_disk_queue:dump_queue(q),
rdq_stop(),
@@ -943,22 +955,25 @@ rdq_test_mixed_queue_modes() ->
rdq_start(),
Payload = <<0:(8*256)>>,
{ok, MS} = rabbit_mixed_queue:start_link(q, true, mixed),
- MS2 = lists:foldl(fun (_N, MS1) ->
- Msg = rabbit_basic:message(x, <<>>, <<>>, Payload),
- {ok, MS1a} = rabbit_mixed_queue:publish(Msg, MS1),
- MS1a
- end, MS, lists:seq(1,10)),
- MS4 = lists:foldl(fun (_N, MS3) ->
- Msg = (rabbit_basic:message(x, <<>>, <<>>, Payload))
- #basic_message { is_persistent = true },
- {ok, MS3a} = rabbit_mixed_queue:publish(Msg, MS3),
- MS3a
- end, MS2, lists:seq(1,10)),
- MS6 = lists:foldl(fun (_N, MS5) ->
- Msg = rabbit_basic:message(x, <<>>, <<>>, Payload),
- {ok, MS5a} = rabbit_mixed_queue:publish(Msg, MS5),
- MS5a
- end, MS4, lists:seq(1,10)),
+ MS2 = lists:foldl(
+ fun (_N, MS1) ->
+ Msg = rabbit_basic:message(x, <<>>, <<>>, Payload),
+ {ok, MS1a} = rabbit_mixed_queue:publish(Msg, MS1),
+ MS1a
+ end, MS, lists:seq(1,10)),
+ MS4 = lists:foldl(
+ fun (_N, MS3) ->
+ Msg = (rabbit_basic:message(x, <<>>, <<>>, Payload))
+ #basic_message { is_persistent = true },
+ {ok, MS3a} = rabbit_mixed_queue:publish(Msg, MS3),
+ MS3a
+ end, MS2, lists:seq(1,10)),
+ MS6 = lists:foldl(
+ fun (_N, MS5) ->
+ Msg = rabbit_basic:message(x, <<>>, <<>>, Payload),
+ {ok, MS5a} = rabbit_mixed_queue:publish(Msg, MS5),
+ MS5a
+ end, MS4, lists:seq(1,10)),
30 = rabbit_mixed_queue:length(MS6),
io:format("Published a mixture of messages~n"),
{ok, MS7} = rabbit_mixed_queue:to_disk_only_mode(MS6),