diff options
| -rw-r--r-- | src/rabbit_invariable_queue.erl | 9 | ||||
| -rw-r--r-- | src/rabbit_persister.erl | 111 |
2 files changed, 39 insertions, 81 deletions
diff --git a/src/rabbit_invariable_queue.erl b/src/rabbit_invariable_queue.erl index 5620fab3d7..7765069fb9 100644 --- a/src/rabbit_invariable_queue.erl +++ b/src/rabbit_invariable_queue.erl @@ -62,11 +62,10 @@ start(DurableQueues) -> ok = rabbit_sup:start_child(rabbit_persister, [DurableQueues]). init(QName, IsDurable) -> - List = case IsDurable of - true -> rabbit_persister:fetch_content(QName); - false -> [] - end, - Q = queue:from_list(List), + Q = queue:from_list(case IsDurable of + true -> rabbit_persister:queue_content(QName); + false -> [] + end), #iv_state { queue = Q, qname = QName, len = queue:len(Q), pending_ack = dict:new() }. diff --git a/src/rabbit_persister.erl b/src/rabbit_persister.erl index f16b7a33cd..ba9430fe2e 100644 --- a/src/rabbit_persister.erl +++ b/src/rabbit_persister.erl @@ -40,7 +40,7 @@ -export([transaction/1, extend_transaction/2, dirty_work/1, commit_transaction/1, rollback_transaction/1, - force_snapshot/0, fetch_content/1]). + force_snapshot/0, queue_content/1]). -include("rabbit.hrl"). @@ -56,8 +56,7 @@ -define(PERSISTER_LOG_FORMAT_VERSION, {2, 6}). -record(pstate, {log_handle, entry_count, deadline, - pending_logs, pending_replies, - snapshot, recovered_content}). + pending_logs, pending_replies, snapshot}). %% two tables for efficient persistency %% one maps a key to a message @@ -84,7 +83,7 @@ -spec(commit_transaction/1 :: ({txn(), queue_name()}) -> 'ok'). -spec(rollback_transaction/1 :: ({txn(), queue_name()}) -> 'ok'). -spec(force_snapshot/0 :: () -> 'ok'). --spec(fetch_content/1 :: (queue_name()) -> [{message(), boolean()}]). +-spec(queue_content/1 :: (queue_name()) -> [{message(), boolean()}]). -endif. @@ -117,8 +116,8 @@ rollback_transaction(TxnKey) -> force_snapshot() -> gen_server:call(?SERVER, force_snapshot, infinity). -fetch_content(QName) -> - gen_server:call(?SERVER, {fetch_content, QName}, infinity). +queue_content(QName) -> + gen_server:call(?SERVER, {queue_content, QName}, infinity). %%-------------------------------------------------------------------- @@ -128,7 +127,7 @@ init([DurableQueues]) -> ok = filelib:ensure_dir(FileName), Snapshot = #psnapshot{transactions = dict:new(), messages = ets:new(messages, []), - queues = ets:new(queues, []), + queues = ets:new(queues, [ordered_set]), next_seq_id = 0}, LogHandle = case disk_log:open([{name, rabbit_persister}, @@ -144,7 +143,7 @@ init([DurableQueues]) -> [Recovered, Bad]), LH end, - {Res, RecoveredContent, NewSnapshot} = + {Res, NewSnapshot} = internal_load_snapshot(LogHandle, DurableQueues, Snapshot), case Res of ok -> @@ -158,8 +157,7 @@ init([DurableQueues]) -> deadline = infinity, pending_logs = [], pending_replies = [], - snapshot = NewSnapshot, - recovered_content = RecoveredContent}, + snapshot = NewSnapshot}, {ok, State}. handle_call({transaction, Key, MessageList}, From, State) -> @@ -169,13 +167,13 @@ handle_call({commit_transaction, TxnKey}, From, State) -> do_noreply(internal_commit(From, TxnKey, State)); handle_call(force_snapshot, _From, State) -> do_reply(ok, flush(true, State)); -handle_call({fetch_content, QName}, _From, State = - #pstate{recovered_content = RC}) -> - List = case dict:find(QName, RC) of - {ok, Content} -> Content; - error -> [] - end, - do_reply(List, State#pstate{recovered_content = dict:erase(QName, RC)}); +handle_call({queue_content, QName}, _From, + State = #pstate{snapshot = #psnapshot{messages = Messages, + queues = Queues}}) -> + MatchSpec= [{{{QName,'$1'}, '$2', '$3'}, [], [{{'$3', '$1', '$2'}}]}], + do_reply([{ets:lookup_element(Messages, K, 2), D} || + {_, K, D} <- lists:sort(ets:select(Queues, MatchSpec))], + State); handle_call(_Request, _From, State) -> {noreply, State}. @@ -353,10 +351,10 @@ current_snapshot(_Snapshot = #psnapshot{transactions = Ts, next_seq_id = NextSeqId}) -> %% Avoid infinite growth of the table by removing messages not %% bound to a queue anymore - prune_table(Messages, ets:foldl( - fun ({{_QName, PKey}, _Delivered, _SeqId}, S) -> - sets:add_element(PKey, S) - end, sets:new(), Queues)), + PKeys = ets:foldl(fun ({{_QName, PKey}, _Delivered, _SeqId}, S) -> + sets:add_element(PKey, S) + end, sets:new(), Queues), + prune_table(Messages, fun (Key) -> sets:is_element(Key, PKeys) end), InnerSnapshot = {{txns, Ts}, {messages, ets:tab2list(Messages)}, {queues, ets:tab2list(Queues)}, @@ -365,18 +363,18 @@ current_snapshot(_Snapshot = #psnapshot{transactions = Ts, {persist_snapshot, {vsn, ?PERSISTER_LOG_FORMAT_VERSION}, term_to_binary(InnerSnapshot)}. -prune_table(Tab, Keys) -> +prune_table(Tab, Pred) -> true = ets:safe_fixtable(Tab, true), - ok = prune_table(Tab, Keys, ets:first(Tab)), + ok = prune_table(Tab, Pred, ets:first(Tab)), true = ets:safe_fixtable(Tab, false). -prune_table(_Tab, _Keys, '$end_of_table') -> ok; -prune_table(Tab, Keys, Key) -> - case sets:is_element(Key, Keys) of +prune_table(_Tab, _Pred, '$end_of_table') -> ok; +prune_table(Tab, Pred, Key) -> + case Pred(Key) of true -> ok; false -> ets:delete(Tab, Key) end, - prune_table(Tab, Keys, ets:next(Tab, Key)). + prune_table(Tab, Pred, ets:next(Tab, Key)). internal_load_snapshot(LogHandle, DurableQueues, @@ -393,14 +391,19 @@ internal_load_snapshot(LogHandle, Snapshot#psnapshot{ transactions = Ts, next_seq_id = NextSeqId}), - {RecoveredContent, Snapshot2} = - recover_messages(DurableQueues, Snapshot1), + %% Remove all entries for queues that no longer exist. + %% Note that the 'messages' table is pruned when the next + %% snapshot is taken. + DurableQueuesSet = sets:from_list(DurableQueues), + prune_table(Snapshot1#psnapshot.queues, + fun ({QName, _PKey}) -> + sets:is_element(QName, DurableQueuesSet) + end), %% uncompleted transactions are discarded - this is TRTTD %% since we only get into this code on node restart, so %% any uncompleted transactions will have been aborted. - {ok, RecoveredContent, - Snapshot2#psnapshot{transactions = dict:new()}}; - {error, Reason} -> {{error, Reason}, dict:new(), Snapshot} + {ok, Snapshot1#psnapshot{transactions = dict:new()}}; + {error, Reason} -> {{error, Reason}, Snapshot} end. check_version({persist_snapshot, {vsn, ?PERSISTER_LOG_FORMAT_VERSION}, @@ -411,50 +414,6 @@ check_version({persist_snapshot, {vsn, Vsn}, _StateBin}) -> check_version(_Other) -> {error, unrecognised_persister_log_format}. -recover_messages(DurableQueues, Snapshot = #psnapshot{messages = Messages, - queues = Queues}) -> - DurableQueuesSet = sets:from_list(DurableQueues), - Work = ets:foldl( - fun ({{QName, PKey}, Delivered, SeqId}, Acc) -> - case sets:is_element(QName, DurableQueuesSet) of - true -> - rabbit_misc:dict_cons( - QName, {SeqId, PKey, Delivered}, Acc); - false -> - Acc - end - end, dict:new(), Queues), - {L, RecoveredContent} = - lists:foldl( - fun ({Recovered, {QName, Msgs}}, {L, Dict}) -> - {Recovered ++ L, dict:store(QName, Msgs, Dict)} - end, {[], dict:new()}, - %% unstable parallel map, because order doesn't matter - rabbit_misc:upmap( - %% we do as much work as possible in spawned worker - %% processes, but we need to make sure the ets:inserts are - %% performed in self() - fun ({QName, Requeues}) -> - recover(QName, Requeues, Messages) - end, dict:to_list(Work))), - NewMessages = [{K, M} || {_S, _Q, K, M, _D} <- L], - NewQueues = [{{Q, K}, D, S} || {S, Q, K, _M, D} <- L], - ets:delete_all_objects(Messages), - ets:delete_all_objects(Queues), - true = ets:insert(Messages, NewMessages), - true = ets:insert(Queues, NewQueues), - %% contains the mutated messages and queues tables - {RecoveredContent, Snapshot}. - -recover(QName, Requeues, Messages) -> - RecoveredMessages = - lists:sort([{SeqId, QName, PKey, Message, Delivered} || - {SeqId, PKey, Delivered} <- Requeues, - {_, Message} <- ets:lookup(Messages, PKey)]), - {RecoveredMessages, {QName, [{Message, Delivered} || - {_, _, _, Message, Delivered} - <- RecoveredMessages]}}. - replay([], LogHandle, K, Snapshot) -> case disk_log:chunk(LogHandle, K) of {K1, Items} -> |
