diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_access_control.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_alarm.erl | 3 | ||||
| -rw-r--r-- | src/rabbit_amqqueue.erl | 67 | ||||
| -rw-r--r-- | src/rabbit_exchange.erl | 29 | ||||
| -rw-r--r-- | src/rabbit_guid.erl | 3 | ||||
| -rw-r--r-- | src/rabbit_limiter.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_misc.erl | 17 | ||||
| -rw-r--r-- | src/rabbit_persister.erl | 54 | ||||
| -rw-r--r-- | src/rabbit_reader.erl | 4 | ||||
| -rw-r--r-- | src/rabbit_router.erl | 3 |
10 files changed, 108 insertions, 76 deletions
diff --git a/src/rabbit_access_control.erl b/src/rabbit_access_control.erl index da0ab9cf7a..54348d9a1c 100644 --- a/src/rabbit_access_control.erl +++ b/src/rabbit_access_control.erl @@ -192,7 +192,7 @@ delete_user(Username) -> fun () -> ok = mnesia:delete({rabbit_user, Username}), [ok = mnesia:delete_object( - rabbit_user_permissions, R, write) || + rabbit_user_permission, R, write) || R <- mnesia:match_object( rabbit_user_permission, #user_permission{user_vhost = #user_vhost{ diff --git a/src/rabbit_alarm.erl b/src/rabbit_alarm.erl index 875624ba55..21999f16c3 100644 --- a/src/rabbit_alarm.erl +++ b/src/rabbit_alarm.erl @@ -78,7 +78,8 @@ stop() -> register(Pid, HighMemMFA) -> ok = gen_event:call(alarm_handler, ?MODULE, - {register, Pid, HighMemMFA}). + {register, Pid, HighMemMFA}, + infinity). %%---------------------------------------------------------------------------- diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 59b4d039e7..06bb18f5d3 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -124,19 +124,32 @@ recover() -> recover_durable_queues() -> Node = node(), - %% TODO: use dirty ops instead - R = rabbit_misc:execute_mnesia_transaction( - fun () -> - qlc:e(qlc:q([Q || Q = #amqqueue{pid = Pid} + lists:foreach( + fun (RecoveredQ) -> + Q = start_queue_process(RecoveredQ), + %% We need to catch the case where a client connected to + %% another node has deleted the queue (and possibly + %% re-created it). + case rabbit_misc:execute_mnesia_transaction( + fun () -> case mnesia:match_object( + rabbit_durable_queue, RecoveredQ, read) of + [_] -> ok = store_queue(Q), + true; + [] -> false + end + end) of + true -> ok; + false -> exit(Q#amqqueue.pid, shutdown) + end + end, + %% TODO: use dirty ops instead + rabbit_misc:execute_mnesia_transaction( + fun () -> + qlc:e(qlc:q([Q || Q = #amqqueue{pid = Pid} <- mnesia:table(rabbit_durable_queue), - node(Pid) == Node])) - end), - Queues = lists:map(fun start_queue_process/1, R), - rabbit_misc:execute_mnesia_transaction( - fun () -> - lists:foreach(fun store_queue/1, Queues), - ok - end). + node(Pid) == Node])) + end)), + ok. declare(QueueName, Durable, AutoDelete, Args) -> Q = start_queue_process(#amqqueue{name = QueueName, @@ -200,10 +213,10 @@ list(VHostPath) -> map(VHostPath, F) -> rabbit_misc:filter_exit_map(F, list(VHostPath)). info(#amqqueue{ pid = QPid }) -> - gen_server2:pcall(QPid, 9, info). + gen_server2:pcall(QPid, 9, info, infinity). info(#amqqueue{ pid = QPid }, Items) -> - case gen_server2:pcall(QPid, 9, {info, Items}) of + case gen_server2:pcall(QPid, 9, {info, Items}, infinity) of {ok, Res} -> Res; {error, Error} -> throw(Error) end. @@ -212,20 +225,20 @@ info_all(VHostPath) -> map(VHostPath, fun (Q) -> info(Q) end). info_all(VHostPath, Items) -> map(VHostPath, fun (Q) -> info(Q, Items) end). -stat(#amqqueue{pid = QPid}) -> gen_server2:call(QPid, stat). +stat(#amqqueue{pid = QPid}) -> gen_server2:call(QPid, stat, infinity). stat_all() -> lists:map(fun stat/1, rabbit_misc:dirty_read_all(rabbit_queue)). delete(#amqqueue{ pid = QPid }, IfUnused, IfEmpty) -> - gen_server2:call(QPid, {delete, IfUnused, IfEmpty}). + gen_server2:call(QPid, {delete, IfUnused, IfEmpty}, infinity). -purge(#amqqueue{ pid = QPid }) -> gen_server2:call(QPid, purge). +purge(#amqqueue{ pid = QPid }) -> gen_server2:call(QPid, purge, infinity). deliver(_IsMandatory, true, Txn, Message, QPid) -> - gen_server2:call(QPid, {deliver_immediately, Txn, Message}); + gen_server2:call(QPid, {deliver_immediately, Txn, Message}, infinity); deliver(true, _IsImmediate, Txn, Message, QPid) -> - gen_server2:call(QPid, {deliver, Txn, Message}), + gen_server2:call(QPid, {deliver, Txn, Message}, infinity), true; deliver(false, _IsImmediate, Txn, Message, QPid) -> gen_server2:cast(QPid, {deliver, Txn, Message}), @@ -241,10 +254,9 @@ ack(QPid, Txn, MsgIds, ChPid) -> gen_server2:cast(QPid, {ack, Txn, MsgIds, ChPid}). commit_all(QPids, Txn) -> - Timeout = length(QPids) * ?CALL_TIMEOUT, safe_pmap_ok( fun (QPid) -> exit({queue_disappeared, QPid}) end, - fun (QPid) -> gen_server2:call(QPid, {commit, Txn}, Timeout) end, + fun (QPid) -> gen_server2:call(QPid, {commit, Txn}, infinity) end, QPids). rollback_all(QPids, Txn) -> @@ -254,12 +266,11 @@ rollback_all(QPids, Txn) -> QPids). notify_down_all(QPids, ChPid) -> - Timeout = length(QPids) * ?CALL_TIMEOUT, safe_pmap_ok( %% we don't care if the queue process has terminated in the %% meantime fun (_) -> ok end, - fun (QPid) -> gen_server2:call(QPid, {notify_down, ChPid}, Timeout) end, + fun (QPid) -> gen_server2:call(QPid, {notify_down, ChPid}, infinity) end, QPids). limit_all(QPids, ChPid, LimiterPid) -> @@ -269,18 +280,20 @@ limit_all(QPids, ChPid, LimiterPid) -> QPids). claim_queue(#amqqueue{pid = QPid}, ReaderPid) -> - gen_server2:call(QPid, {claim_queue, ReaderPid}). + gen_server2:call(QPid, {claim_queue, ReaderPid}, infinity). basic_get(#amqqueue{pid = QPid}, ChPid, NoAck) -> - gen_server2:call(QPid, {basic_get, ChPid, NoAck}). + gen_server2:call(QPid, {basic_get, ChPid, NoAck}, infinity). basic_consume(#amqqueue{pid = QPid}, NoAck, ReaderPid, ChPid, LimiterPid, ConsumerTag, ExclusiveConsume, OkMsg) -> gen_server2:call(QPid, {basic_consume, NoAck, ReaderPid, ChPid, - LimiterPid, ConsumerTag, ExclusiveConsume, OkMsg}). + LimiterPid, ConsumerTag, ExclusiveConsume, OkMsg}, + infinity). basic_cancel(#amqqueue{pid = QPid}, ChPid, ConsumerTag, OkMsg) -> - ok = gen_server2:call(QPid, {basic_cancel, ChPid, ConsumerTag, OkMsg}). + ok = gen_server2:call(QPid, {basic_cancel, ChPid, ConsumerTag, OkMsg}, + infinity). notify_sent(QPid, ChPid) -> gen_server2:cast(QPid, {notify_sent, ChPid}). diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index 19efd9fc22..a57e8076bf 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -103,24 +103,17 @@ -define(INFO_KEYS, [name, type, durable, auto_delete, arguments]. recover() -> - rabbit_misc:execute_mnesia_transaction( - fun () -> - mnesia:foldl( - fun (Exchange, Acc) -> - ok = mnesia:write(rabbit_exchange, Exchange, write), - Acc - end, ok, rabbit_durable_exchange), - mnesia:foldl( - fun (Route, Acc) -> - {_, ReverseRoute} = route_with_reverse(Route), - ok = mnesia:write(rabbit_route, - Route, write), - ok = mnesia:write(rabbit_reverse_route, - ReverseRoute, write), - Acc - end, ok, rabbit_durable_route), - ok - end). + ok = rabbit_misc:table_foreach( + fun(Exchange) -> ok = mnesia:write(rabbit_exchange, + Exchange, write) + end, rabbit_durable_exchange), + ok = rabbit_misc:table_foreach( + fun(Route) -> {_, ReverseRoute} = route_with_reverse(Route), + ok = mnesia:write(rabbit_route, + Route, write), + ok = mnesia:write(rabbit_reverse_route, + ReverseRoute, write) + end, rabbit_durable_route). declare(ExchangeName, Type, Durable, AutoDelete, Args) -> Exchange = #exchange{name = ExchangeName, diff --git a/src/rabbit_guid.erl b/src/rabbit_guid.erl index 51c1665bbb..2be005034e 100644 --- a/src/rabbit_guid.erl +++ b/src/rabbit_guid.erl @@ -82,7 +82,8 @@ guid() -> %% and time. We combine that with a process-local counter to give %% us a GUID that is monotonically increasing per process. G = case get(guid) of - undefined -> {{gen_server:call(?SERVER, serial), self()}, 0}; + undefined -> {{gen_server:call(?SERVER, serial, infinity), self()}, + 0}; {S, I} -> {S, I+1} end, put(guid, G), diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index 532be26d8e..3f9b6ebb9b 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -90,7 +90,7 @@ can_send(undefined, _QPid) -> can_send(LimiterPid, QPid) -> rabbit_misc:with_exit_handler( fun () -> true end, - fun () -> gen_server2:call(LimiterPid, {can_send, QPid}) end). + fun () -> gen_server2:call(LimiterPid, {can_send, QPid}, infinity) end). %% Let the limiter know that the channel has received some acks from a %% consumer diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index 5d176f8fac..de7bc010b2 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -46,6 +46,7 @@ -export([ensure_ok/2]). -export([localnode/1, tcp_name/3]). -export([intersperse/2, upmap/2, map_in_order/2]). +-export([table_foreach/2]). -export([dirty_read_all/1, dirty_foreach_key/2, dirty_dump_log/1]). -export([append_file/2, ensure_parent_dirs_exist/1]). -export([format_stderr/2]). @@ -98,6 +99,7 @@ -spec(intersperse/2 :: (A, [A]) -> [A]). -spec(upmap/2 :: (fun ((A) -> B), [A]) -> [B]). -spec(map_in_order/2 :: (fun ((A) -> B), [A]) -> [B]). +-spec(table_foreach/2 :: (fun ((any()) -> any()), atom()) -> 'ok'). -spec(dirty_read_all/1 :: (atom()) -> [any()]). -spec(dirty_foreach_key/2 :: (fun ((any()) -> any()), atom()) -> 'ok' | 'aborted'). @@ -298,6 +300,21 @@ map_in_order(F, L) -> lists:reverse( lists:foldl(fun (E, Acc) -> [F(E) | Acc] end, [], L)). +%% For each entry in a table, execute a function in a transaction. +%% This is often far more efficient than wrapping a tx around the lot. +%% +%% We ignore entries that have been modified or removed. +table_foreach(F, TableName) -> + lists:foreach( + fun (E) -> execute_mnesia_transaction( + fun () -> case mnesia:match_object(TableName, E, read) of + [] -> ok; + _ -> F(E) + end + end) + end, dirty_read_all(TableName)), + ok. + dirty_read_all(TableName) -> mnesia:dirty_select(TableName, [{'$1',[],['$1']}]). diff --git a/src/rabbit_persister.erl b/src/rabbit_persister.erl index 94033a4f3d..f4fa45993a 100644 --- a/src/rabbit_persister.erl +++ b/src/rabbit_persister.erl @@ -49,6 +49,8 @@ -define(LOG_BUNDLE_DELAY, 5). -define(COMPLETE_BUNDLE_DELAY, 2). +-define(HIBERNATE_AFTER, 10000). + -define(MAX_WRAP_ENTRIES, 500). -define(PERSISTER_LOG_FORMAT_VERSION, {2, 4}). @@ -93,7 +95,7 @@ start_link() -> transaction(MessageList) -> ?LOGDEBUG("transaction ~p~n", [MessageList]), TxnKey = rabbit_guid:guid(), - gen_server:call(?SERVER, {transaction, TxnKey, MessageList}). + gen_server:call(?SERVER, {transaction, TxnKey, MessageList}, infinity). extend_transaction(TxnKey, MessageList) -> ?LOGDEBUG("extend_transaction ~p ~p~n", [TxnKey, MessageList]), @@ -105,17 +107,17 @@ dirty_work(MessageList) -> commit_transaction(TxnKey) -> ?LOGDEBUG("commit_transaction ~p~n", [TxnKey]), - gen_server:call(?SERVER, {commit_transaction, TxnKey}). + gen_server:call(?SERVER, {commit_transaction, TxnKey}, infinity). rollback_transaction(TxnKey) -> ?LOGDEBUG("rollback_transaction ~p~n", [TxnKey]), gen_server:cast(?SERVER, {rollback_transaction, TxnKey}). force_snapshot() -> - gen_server:call(?SERVER, force_snapshot). + gen_server:call(?SERVER, force_snapshot, infinity). serial() -> - gen_server:call(?SERVER, serial). + gen_server:call(?SERVER, serial, infinity). %%-------------------------------------------------------------------- @@ -164,10 +166,8 @@ handle_call({transaction, Key, MessageList}, From, State) -> do_noreply(internal_commit(From, Key, NewState)); handle_call({commit_transaction, TxnKey}, From, State) -> do_noreply(internal_commit(From, TxnKey, State)); -handle_call(force_snapshot, _From, State = #pstate{log_handle = LH, - snapshot = Snapshot}) -> - ok = take_snapshot(LH, Snapshot), - do_reply(ok, State); +handle_call(force_snapshot, _From, State) -> + do_reply(ok, flush(true, State)); handle_call(serial, _From, State = #pstate{snapshot = #psnapshot{serial = Serial}}) -> do_reply(Serial, State); @@ -183,8 +183,13 @@ handle_cast({extend_transaction, TxnKey, MessageList}, State) -> handle_cast(_Msg, State) -> {noreply, State}. +handle_info(timeout, State = #pstate{deadline = infinity}) -> + State1 = flush(true, State), + %% TODO: Once we drop support for R11B-5, we can change this to + %% {noreply, State1, hibernate}; + proc_lib:hibernate(gen_server2, enter_loop, [?MODULE, [], State1]); handle_info(timeout, State) -> - {noreply, flush(State)}; + do_noreply(flush(State)); handle_info(_Info, State) -> {noreply, State}. @@ -275,12 +280,13 @@ take_snapshot_and_save_old(LogHandle, Snapshot) -> rabbit_log:info("Saving persister log in ~p~n", [OldFileName]), ok = take_snapshot(LogHandle, OldFileName, Snapshot). -maybe_take_snapshot(State = #pstate{entry_count = EntryCount, log_handle = LH, - snapshot = Snapshot}) - when EntryCount >= ?MAX_WRAP_ENTRIES -> +maybe_take_snapshot(Force, State = #pstate{entry_count = EntryCount, + log_handle = LH, + snapshot = Snapshot}) + when Force orelse EntryCount >= ?MAX_WRAP_ENTRIES -> ok = take_snapshot(LH, Snapshot), State#pstate{entry_count = 0}; -maybe_take_snapshot(State) -> +maybe_take_snapshot(_Force, State) -> State. later_ms(DeltaMilliSec) -> @@ -298,7 +304,7 @@ compute_deadline(_TimerDelay, ExistingDeadline) -> ExistingDeadline. compute_timeout(infinity) -> - infinity; + ?HIBERNATE_AFTER; compute_timeout(Deadline) -> DeltaMilliSec = time_diff(Deadline, now()) * 1000.0, if @@ -314,18 +320,18 @@ do_noreply(State = #pstate{deadline = Deadline}) -> do_reply(Reply, State = #pstate{deadline = Deadline}) -> {reply, Reply, State, compute_timeout(Deadline)}. -flush(State = #pstate{pending_logs = PendingLogs, - pending_replies = Waiting, - log_handle = LogHandle}) -> - State1 = if - PendingLogs /= [] -> +flush(State) -> flush(false, State). + +flush(ForceSnapshot, State = #pstate{pending_logs = PendingLogs, + pending_replies = Waiting, + log_handle = LogHandle}) -> + State1 = if PendingLogs /= [] -> disk_log:alog(LogHandle, lists:reverse(PendingLogs)), - maybe_take_snapshot( - State#pstate{ - entry_count = State#pstate.entry_count + 1}); - true -> + State#pstate{entry_count = State#pstate.entry_count + 1}; + true -> State end, + State2 = maybe_take_snapshot(ForceSnapshot, State1), if Waiting /= [] -> ok = disk_log:sync(LogHandle), lists:foreach(fun (From) -> gen_server:reply(From, ok) end, @@ -333,7 +339,7 @@ flush(State = #pstate{pending_logs = PendingLogs, true -> ok end, - State1#pstate{deadline = infinity, + State2#pstate{deadline = infinity, pending_logs = [], pending_replies = []}. diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 9f642e3540..ef8038e7aa 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -161,10 +161,10 @@ system_code_change(Misc, _Module, _OldVsn, _Extra) -> {ok, Misc}. info(Pid) -> - gen_server:call(Pid, info). + gen_server:call(Pid, info, infinity). info(Pid, Items) -> - case gen_server:call(Pid, {info, Items}) of + case gen_server:call(Pid, {info, Items}, infinity) of {ok, Res} -> Res; {error, Error} -> throw(Error) end. diff --git a/src/rabbit_router.erl b/src/rabbit_router.erl index ff42ea0460..0b06a063a7 100644 --- a/src/rabbit_router.erl +++ b/src/rabbit_router.erl @@ -112,7 +112,8 @@ deliver_per_node(NodeQPids, Mandatory, Immediate, fun ({Node, QPids}) -> try gen_server2:call( {?SERVER, Node}, - {deliver, QPids, Mandatory, Immediate, Txn, Message}) + {deliver, QPids, Mandatory, Immediate, Txn, Message}, + infinity) catch _Class:_Reason -> %% TODO: figure out what to log (and do!) here |
