summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_invariable_queue.erl9
-rw-r--r--src/rabbit_persister.erl111
2 files changed, 39 insertions, 81 deletions
diff --git a/src/rabbit_invariable_queue.erl b/src/rabbit_invariable_queue.erl
index 5620fab3d7..7765069fb9 100644
--- a/src/rabbit_invariable_queue.erl
+++ b/src/rabbit_invariable_queue.erl
@@ -62,11 +62,10 @@ start(DurableQueues) ->
ok = rabbit_sup:start_child(rabbit_persister, [DurableQueues]).
init(QName, IsDurable) ->
- List = case IsDurable of
- true -> rabbit_persister:fetch_content(QName);
- false -> []
- end,
- Q = queue:from_list(List),
+ Q = queue:from_list(case IsDurable of
+ true -> rabbit_persister:queue_content(QName);
+ false -> []
+ end),
#iv_state { queue = Q, qname = QName, len = queue:len(Q),
pending_ack = dict:new() }.
diff --git a/src/rabbit_persister.erl b/src/rabbit_persister.erl
index f16b7a33cd..ba9430fe2e 100644
--- a/src/rabbit_persister.erl
+++ b/src/rabbit_persister.erl
@@ -40,7 +40,7 @@
-export([transaction/1, extend_transaction/2, dirty_work/1,
commit_transaction/1, rollback_transaction/1,
- force_snapshot/0, fetch_content/1]).
+ force_snapshot/0, queue_content/1]).
-include("rabbit.hrl").
@@ -56,8 +56,7 @@
-define(PERSISTER_LOG_FORMAT_VERSION, {2, 6}).
-record(pstate, {log_handle, entry_count, deadline,
- pending_logs, pending_replies,
- snapshot, recovered_content}).
+ pending_logs, pending_replies, snapshot}).
%% two tables for efficient persistency
%% one maps a key to a message
@@ -84,7 +83,7 @@
-spec(commit_transaction/1 :: ({txn(), queue_name()}) -> 'ok').
-spec(rollback_transaction/1 :: ({txn(), queue_name()}) -> 'ok').
-spec(force_snapshot/0 :: () -> 'ok').
--spec(fetch_content/1 :: (queue_name()) -> [{message(), boolean()}]).
+-spec(queue_content/1 :: (queue_name()) -> [{message(), boolean()}]).
-endif.
@@ -117,8 +116,8 @@ rollback_transaction(TxnKey) ->
force_snapshot() ->
gen_server:call(?SERVER, force_snapshot, infinity).
-fetch_content(QName) ->
- gen_server:call(?SERVER, {fetch_content, QName}, infinity).
+queue_content(QName) ->
+ gen_server:call(?SERVER, {queue_content, QName}, infinity).
%%--------------------------------------------------------------------
@@ -128,7 +127,7 @@ init([DurableQueues]) ->
ok = filelib:ensure_dir(FileName),
Snapshot = #psnapshot{transactions = dict:new(),
messages = ets:new(messages, []),
- queues = ets:new(queues, []),
+ queues = ets:new(queues, [ordered_set]),
next_seq_id = 0},
LogHandle =
case disk_log:open([{name, rabbit_persister},
@@ -144,7 +143,7 @@ init([DurableQueues]) ->
[Recovered, Bad]),
LH
end,
- {Res, RecoveredContent, NewSnapshot} =
+ {Res, NewSnapshot} =
internal_load_snapshot(LogHandle, DurableQueues, Snapshot),
case Res of
ok ->
@@ -158,8 +157,7 @@ init([DurableQueues]) ->
deadline = infinity,
pending_logs = [],
pending_replies = [],
- snapshot = NewSnapshot,
- recovered_content = RecoveredContent},
+ snapshot = NewSnapshot},
{ok, State}.
handle_call({transaction, Key, MessageList}, From, State) ->
@@ -169,13 +167,13 @@ handle_call({commit_transaction, TxnKey}, From, State) ->
do_noreply(internal_commit(From, TxnKey, State));
handle_call(force_snapshot, _From, State) ->
do_reply(ok, flush(true, State));
-handle_call({fetch_content, QName}, _From, State =
- #pstate{recovered_content = RC}) ->
- List = case dict:find(QName, RC) of
- {ok, Content} -> Content;
- error -> []
- end,
- do_reply(List, State#pstate{recovered_content = dict:erase(QName, RC)});
+handle_call({queue_content, QName}, _From,
+ State = #pstate{snapshot = #psnapshot{messages = Messages,
+ queues = Queues}}) ->
+ MatchSpec= [{{{QName,'$1'}, '$2', '$3'}, [], [{{'$3', '$1', '$2'}}]}],
+ do_reply([{ets:lookup_element(Messages, K, 2), D} ||
+ {_, K, D} <- lists:sort(ets:select(Queues, MatchSpec))],
+ State);
handle_call(_Request, _From, State) ->
{noreply, State}.
@@ -353,10 +351,10 @@ current_snapshot(_Snapshot = #psnapshot{transactions = Ts,
next_seq_id = NextSeqId}) ->
%% Avoid infinite growth of the table by removing messages not
%% bound to a queue anymore
- prune_table(Messages, ets:foldl(
- fun ({{_QName, PKey}, _Delivered, _SeqId}, S) ->
- sets:add_element(PKey, S)
- end, sets:new(), Queues)),
+ PKeys = ets:foldl(fun ({{_QName, PKey}, _Delivered, _SeqId}, S) ->
+ sets:add_element(PKey, S)
+ end, sets:new(), Queues),
+ prune_table(Messages, fun (Key) -> sets:is_element(Key, PKeys) end),
InnerSnapshot = {{txns, Ts},
{messages, ets:tab2list(Messages)},
{queues, ets:tab2list(Queues)},
@@ -365,18 +363,18 @@ current_snapshot(_Snapshot = #psnapshot{transactions = Ts,
{persist_snapshot, {vsn, ?PERSISTER_LOG_FORMAT_VERSION},
term_to_binary(InnerSnapshot)}.
-prune_table(Tab, Keys) ->
+prune_table(Tab, Pred) ->
true = ets:safe_fixtable(Tab, true),
- ok = prune_table(Tab, Keys, ets:first(Tab)),
+ ok = prune_table(Tab, Pred, ets:first(Tab)),
true = ets:safe_fixtable(Tab, false).
-prune_table(_Tab, _Keys, '$end_of_table') -> ok;
-prune_table(Tab, Keys, Key) ->
- case sets:is_element(Key, Keys) of
+prune_table(_Tab, _Pred, '$end_of_table') -> ok;
+prune_table(Tab, Pred, Key) ->
+ case Pred(Key) of
true -> ok;
false -> ets:delete(Tab, Key)
end,
- prune_table(Tab, Keys, ets:next(Tab, Key)).
+ prune_table(Tab, Pred, ets:next(Tab, Key)).
internal_load_snapshot(LogHandle,
DurableQueues,
@@ -393,14 +391,19 @@ internal_load_snapshot(LogHandle,
Snapshot#psnapshot{
transactions = Ts,
next_seq_id = NextSeqId}),
- {RecoveredContent, Snapshot2} =
- recover_messages(DurableQueues, Snapshot1),
+ %% Remove all entries for queues that no longer exist.
+ %% Note that the 'messages' table is pruned when the next
+ %% snapshot is taken.
+ DurableQueuesSet = sets:from_list(DurableQueues),
+ prune_table(Snapshot1#psnapshot.queues,
+ fun ({QName, _PKey}) ->
+ sets:is_element(QName, DurableQueuesSet)
+ end),
%% uncompleted transactions are discarded - this is TRTTD
%% since we only get into this code on node restart, so
%% any uncompleted transactions will have been aborted.
- {ok, RecoveredContent,
- Snapshot2#psnapshot{transactions = dict:new()}};
- {error, Reason} -> {{error, Reason}, dict:new(), Snapshot}
+ {ok, Snapshot1#psnapshot{transactions = dict:new()}};
+ {error, Reason} -> {{error, Reason}, Snapshot}
end.
check_version({persist_snapshot, {vsn, ?PERSISTER_LOG_FORMAT_VERSION},
@@ -411,50 +414,6 @@ check_version({persist_snapshot, {vsn, Vsn}, _StateBin}) ->
check_version(_Other) ->
{error, unrecognised_persister_log_format}.
-recover_messages(DurableQueues, Snapshot = #psnapshot{messages = Messages,
- queues = Queues}) ->
- DurableQueuesSet = sets:from_list(DurableQueues),
- Work = ets:foldl(
- fun ({{QName, PKey}, Delivered, SeqId}, Acc) ->
- case sets:is_element(QName, DurableQueuesSet) of
- true ->
- rabbit_misc:dict_cons(
- QName, {SeqId, PKey, Delivered}, Acc);
- false ->
- Acc
- end
- end, dict:new(), Queues),
- {L, RecoveredContent} =
- lists:foldl(
- fun ({Recovered, {QName, Msgs}}, {L, Dict}) ->
- {Recovered ++ L, dict:store(QName, Msgs, Dict)}
- end, {[], dict:new()},
- %% unstable parallel map, because order doesn't matter
- rabbit_misc:upmap(
- %% we do as much work as possible in spawned worker
- %% processes, but we need to make sure the ets:inserts are
- %% performed in self()
- fun ({QName, Requeues}) ->
- recover(QName, Requeues, Messages)
- end, dict:to_list(Work))),
- NewMessages = [{K, M} || {_S, _Q, K, M, _D} <- L],
- NewQueues = [{{Q, K}, D, S} || {S, Q, K, _M, D} <- L],
- ets:delete_all_objects(Messages),
- ets:delete_all_objects(Queues),
- true = ets:insert(Messages, NewMessages),
- true = ets:insert(Queues, NewQueues),
- %% contains the mutated messages and queues tables
- {RecoveredContent, Snapshot}.
-
-recover(QName, Requeues, Messages) ->
- RecoveredMessages =
- lists:sort([{SeqId, QName, PKey, Message, Delivered} ||
- {SeqId, PKey, Delivered} <- Requeues,
- {_, Message} <- ets:lookup(Messages, PKey)]),
- {RecoveredMessages, {QName, [{Message, Delivered} ||
- {_, _, _, Message, Delivered}
- <- RecoveredMessages]}}.
-
replay([], LogHandle, K, Snapshot) ->
case disk_log:chunk(LogHandle, K) of
{K1, Items} ->