summary refs log tree commit diff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2010-04-21 15:53:33 +0100
committerMatthew Sackman <matthew@lshift.net>2010-04-21 15:53:33 +0100
commit92e5da9c6f30dafdf3ece603f0563a2c4866f66f (patch)
treea570f7447417639dd5547cec623ed87616f3c1b4
parent6044c25d7d3aa6aabcd269ebed7f402eab29735d (diff)
downloadrabbitmq-server-git-92e5da9c6f30dafdf3ece603f0563a2c4866f66f.tar.gz
Added the old persister (slightly mutilated) and an invariable queue as a backing queue which uses it. Passes all the tests.
-rw-r--r--src/rabbit_invariable_queue.erl284
-rw-r--r--src/rabbit_persister.erl542
-rw-r--r--src/rabbit_tests.erl14
3 files changed, 837 insertions, 3 deletions
diff --git a/src/rabbit_invariable_queue.erl b/src/rabbit_invariable_queue.erl
new file mode 100644
index 0000000000..5620fab3d7
--- /dev/null
+++ b/src/rabbit_invariable_queue.erl
@@ -0,0 +1,284 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2010 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2010 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+%% A backing-queue implementation that keeps the entire queue in
+%% memory and delegates durability of persistent messages to
+%% rabbit_persister (see the persist_* helpers below). "Invariable"
+%% because it never pages content to disk itself.
+-module(rabbit_invariable_queue).
+
+-export([init/2, terminate/1, delete_and_terminate/1, purge/1, publish/2,
+ publish_delivered/3, fetch/2, ack/2, tx_publish/3, tx_ack/3,
+ tx_rollback/2, tx_commit/3, requeue/2, len/1, is_empty/1,
+ set_ram_duration_target/2, ram_duration/1, sync_callback/1,
+ handle_pre_hibernate/1, status/1]).
+
+-export([start/1]).
+
+-behaviour(rabbit_backing_queue).
+
+-include("rabbit.hrl").
+
+%% queue: queue of {#basic_message{}, IsDelivered} pairs;
+%% len: cached queue:len/1 of 'queue';
+%% pending_ack: dict of message guid -> #basic_message{} for messages
+%% delivered but not yet acknowledged.
+-record(iv_state, { queue, qname, len, pending_ack }).
+%% Per-transaction state, held in the process dictionary (lookup_tx/1).
+-record(tx, { pending_messages, pending_acks, is_persistent }).
+
+-ifdef(use_specs).
+
+%% The ack tag is the message guid, or 'blank_ack' when no ack is due.
+-type(ack() :: guid() | 'blank_ack').
+-type(state() :: #iv_state { queue :: queue(),
+ qname :: queue_name(),
+ len :: non_neg_integer(),
+ pending_ack :: dict()
+ }).
+-include("rabbit_backing_queue_spec.hrl").
+
+-endif.
+
+%% Starts the persister (supervised singleton) with the list of
+%% durable queue names whose contents it should recover.
+start(DurableQueues) ->
+ ok = rabbit_sup:start_child(rabbit_persister, [DurableQueues]).
+
+%% Builds the initial state: durable queues recover their contents
+%% from the persister, transient queues start empty.
+init(QName, IsDurable) ->
+ List = case IsDurable of
+ true -> rabbit_persister:fetch_content(QName);
+ false -> []
+ end,
+ Q = queue:from_list(List),
+ #iv_state { queue = Q, qname = QName, len = queue:len(Q),
+ pending_ack = dict:new() }.
+
+%% Drops all in-memory state but leaves persisted state untouched, so
+%% a durable queue can still recover after a restart.
+terminate(State) ->
+ State #iv_state { queue = queue:new(), len = 0, pending_ack = dict:new() }.
+
+%% Acks everything still pending (so the persister forgets it), then
+%% purges the queue and terminates.
+delete_and_terminate(State = #iv_state { qname = QName, pending_ack = PA }) ->
+ ok = persist_acks(none, QName, dict:fetch_keys(PA), PA),
+ {_PLen, State1} = purge(State),
+ terminate(State1).
+
+purge(State = #iv_state { len = Len, queue = Q, qname = QName }) ->
+ %% We do not purge messages pending acks.
+ %% Persistent messages still in the queue are marked delivered and
+ %% then acked at the persister so they will not be recovered on
+ %% restart; transient messages need no persister work at all.
+ {AckTags, PA} =
+ rabbit_misc:queue_fold(
+ fun ({#basic_message { is_persistent = false }, _IsDelivered}, Acc) ->
+ Acc;
+ ({Msg = #basic_message { guid = Guid }, IsDelivered},
+ {AckTagsN, PAN}) ->
+ ok = persist_delivery(QName, Msg, IsDelivered),
+ {[Guid | AckTagsN], dict:store(Guid, Msg, PAN)}
+ end, {[], dict:new()}, Q),
+ ok = persist_acks(none, QName, AckTags, PA),
+ {Len, State #iv_state { len = 0, queue = queue:new() }}.
+
+%% Enqueues a message (not yet delivered), persisting it first if it
+%% is a persistent message.
+publish(Msg, State = #iv_state { queue = Q, qname = QName, len = Len }) ->
+ ok = persist_message(none, QName, Msg),
+ State #iv_state { queue = queue:in({Msg, false}, Q), len = Len + 1 }.
+
+%% A message delivered straight to a consumer, bypassing the queue.
+%% The len = 0 match asserts this only happens on an empty queue.
+%% With no ack required there is nothing to track, hence blank_ack.
+publish_delivered(false, _Msg, State) ->
+ {blank_ack, State};
+publish_delivered(true, Msg = #basic_message { guid = Guid },
+ State = #iv_state { qname = QName, len = 0,
+ pending_ack = PA }) ->
+ ok = persist_message(none, QName, Msg),
+ ok = persist_delivery(QName, Msg, false),
+ {Guid, State #iv_state { pending_ack = dict:store(Guid, Msg, PA) }}.
+
+%% Removes the head of the queue. The guid doubles as the ack tag;
+%% when no ack is required the message is acked at the persister
+%% immediately (note the false branch keeps the original PA, since
+%% the message must not linger in pending_ack).
+fetch(_AckRequired, State = #iv_state { len = 0 }) ->
+ {empty, State};
+fetch(AckRequired, State = #iv_state { queue = Q, qname = QName, len = Len,
+ pending_ack = PA }) ->
+ {{value, {Msg = #basic_message { guid = Guid }, IsDelivered}}, Q1} =
+ queue:out(Q),
+ Len1 = Len - 1,
+ ok = persist_delivery(QName, Msg, IsDelivered),
+ PA1 = dict:store(Guid, Msg, PA),
+ {AckTag, PA2} = case AckRequired of
+ true -> {Guid, PA1};
+ false -> ok = persist_acks(none, QName, [Guid], PA1),
+ {blank_ack, PA}
+ end,
+ {{Msg, IsDelivered, AckTag, Len1},
+ State #iv_state { queue = Q1, len = Len1, pending_ack = PA2 }}.
+
+%% Non-transactional ack: tell the persister, then forget the
+%% messages locally.
+ack(AckTags, State = #iv_state { qname = QName, pending_ack = PA }) ->
+ ok = persist_acks(none, QName, AckTags, PA),
+ PA1 = remove_acks(AckTags, PA),
+ State #iv_state { pending_ack = PA1 }.
+
+%% Records the publish both in the process-dictionary tx record and
+%% (for persistent messages) in the persister's transaction.
+tx_publish(Txn, Msg, State = #iv_state { qname = QName }) ->
+ publish_in_tx(Txn, Msg),
+ ok = persist_message(Txn, QName, Msg),
+ State.
+
+tx_ack(Txn, AckTags, State = #iv_state { qname = QName, pending_ack = PA }) ->
+ ack_in_tx(Txn, AckTags),
+ ok = persist_acks(Txn, QName, AckTags, PA),
+ State.
+
+%% Aborts the transaction at the persister (if persistent) and hands
+%% the collected ack tags back to the caller; pending_acks is a list
+%% of lists, hence the flatten.
+tx_rollback(Txn, State = #iv_state { qname = QName }) ->
+ #tx { pending_acks = AckTags } = lookup_tx(Txn),
+ ok = rollback_work(Txn, QName),
+ erase_tx(Txn),
+ {lists:flatten(AckTags), State}.
+
+%% Commits: the acks take effect, and the publishes (accumulated in
+%% reverse by publish_in_tx/2) are appended to the queue in original
+%% order via foldr. Fun() is the caller-supplied commit callback.
+tx_commit(Txn, Fun, State = #iv_state { qname = QName, pending_ack = PA,
+ queue = Q, len = Len }) ->
+ #tx { pending_acks = AckTags, pending_messages = PubsRev } = lookup_tx(Txn),
+ ok = commit_work(Txn, QName),
+ erase_tx(Txn),
+ Fun(),
+ AckTags1 = lists:flatten(AckTags),
+ PA1 = remove_acks(AckTags1, PA),
+ {Q1, Len1} = lists:foldr(fun (Msg, {QN, LenN}) ->
+ {queue:in({Msg, false}, QN), LenN + 1}
+ end, {Q, Len}, PubsRev),
+ {AckTags1, State #iv_state { pending_ack = PA1, queue = Q1, len = Len1 }}.
+
+%% Puts previously-delivered, unacked messages back on the queue
+%% (marked delivered), removing them from pending_ack.
+requeue(AckTags, State = #iv_state { pending_ack = PA, queue = Q,
+ len = Len }) ->
+ %% We don't need to touch the persister here - the persister will
+ %% already have these messages published and delivered as
+ %% necessary. The complication is that the persister's seq_id will
+ %% now be wrong, given the position of these messages in our queue
+ %% here. However, the persister's seq_id is only used for sorting
+ %% on startup, and requeue is silent as to where the requeued
+ %% messages should appear, thus the persister is permitted to sort
+ %% based on seq_id, even though it'll likely give a different
+ %% order to the last known state of our queue, prior to shutdown.
+ {Q1, PA1, Len1} =
+ lists:foldl(
+ fun (Guid, {QN, PAN, LenN}) ->
+ {ok, Msg = #basic_message {}} = dict:find(Guid, PAN),
+ {queue:in({Msg, true}, QN), dict:erase(Guid, PAN), LenN + 1}
+ end, {Q, PA, Len}, AckTags),
+ State #iv_state { pending_ack = PA1, queue = Q1, len = Len1 }.
+
+len(#iv_state { len = Len }) ->
+ Len.
+
+is_empty(State) ->
+ 0 == len(State).
+
+%% Everything lives in RAM here regardless, so the target is moot.
+set_ram_duration_target(_DurationTarget, State) ->
+ %% HA!
+ State.
+
+ram_duration(State) ->
+ {0, State}.
+
+%% No asynchronous sync work is ever outstanding.
+sync_callback(_State) ->
+ undefined.
+
+handle_pre_hibernate(State) ->
+ State.
+
+status(_State) ->
+ [].
+
+%%----------------------------------------------------------------------------
+
+%% Deletes every acked guid from the pending-ack dict.
+remove_acks(AckTags, PA) -> lists:foldl(fun dict:erase/2, PA, AckTags).
+
+%%----------------------------------------------------------------------------
+
+%% Transaction state lives in the process dictionary under {txn, Txn};
+%% a missing entry reads as an empty, non-persistent transaction.
+lookup_tx(Txn) ->
+ case get({txn, Txn}) of
+ undefined -> #tx { pending_messages = [],
+ pending_acks = [],
+ is_persistent = false };
+ V -> V
+ end.
+
+store_tx(Txn, Tx) ->
+ put({txn, Txn}, Tx).
+
+erase_tx(Txn) ->
+ erase({txn, Txn}).
+
+%% A tx becomes persistent as soon as any persistent work is added to
+%% it (see persist_work/3).
+mark_tx_persistent(Txn) ->
+ store_tx(Txn, (lookup_tx(Txn)) #tx { is_persistent = true }).
+
+is_tx_persistent(Txn) ->
+ (lookup_tx(Txn)) #tx.is_persistent.
+
+%% Applies F({Txn, QName}) only when the tx holds persistent work.
+do_if_persistent(F, Txn, QName) ->
+ ok = case is_tx_persistent(Txn) of
+ false -> ok;
+ true -> F({Txn, QName})
+ end.
+
+%% Publishes accumulate in reverse order; tx_commit/3 folds them back.
+publish_in_tx(Txn, Msg) ->
+ Tx = #tx { pending_messages = Pubs } = lookup_tx(Txn),
+ store_tx(Txn, Tx #tx { pending_messages = [Msg | Pubs] }).
+
+ack_in_tx(Txn, AckTags) ->
+ Tx = #tx { pending_acks = Acks } = lookup_tx(Txn),
+ store_tx(Txn, Tx #tx { pending_acks = [AckTags | Acks] }).
+
+%%----------------------------------------------------------------------------
+
+%% Transient messages are never persisted.
+persist_message(_Txn, _QName, #basic_message { is_persistent = false }) ->
+ ok;
+persist_message(Txn, QName, Msg) ->
+ Msg1 = Msg #basic_message {
+ %% don't persist any recoverable decoded properties,
+ %% rebuild from properties_bin on restore
+ content = rabbit_binary_parser:clear_decoded_content(
+ Msg #basic_message.content)},
+ persist_work(Txn, QName,
+ [{publish, Msg1, {QName, Msg1 #basic_message.guid}}]).
+
+%% Deliveries are only recorded for persistent messages, and only on
+%% the first delivery (a redelivery, IsDelivered = true, is a no-op).
+persist_delivery(_QName, #basic_message { is_persistent = false },
+ _IsDelivered) ->
+ ok;
+persist_delivery(_QName, _Message, true) ->
+ ok;
+persist_delivery(QName, #basic_message { guid = Guid }, _IsDelivered) ->
+ persist_work(none, QName, [{deliver, {QName, Guid}}]).
+
+%% Only acks for guids found in PA with is_persistent = true reach the
+%% persister; anything else is filtered out by the comprehension.
+persist_acks(Txn, QName, AckTags, PA) ->
+ persist_work(Txn, QName,
+ [{ack, {QName, Guid}} ||
+ Guid <- AckTags,
+ case dict:find(Guid, PA) of
+ {ok, #basic_message { is_persistent = true }} -> true;
+ _ -> false
+ end]).
+
+%% Empty work lists are dropped. Outside a tx the work goes to the
+%% persister as dirty work; inside a tx the tx is marked persistent
+%% and the persister-side transaction is extended.
+persist_work(_Txn,_QName, []) ->
+ ok;
+persist_work(none, _QName, WorkList) ->
+ rabbit_persister:dirty_work(WorkList);
+persist_work(Txn, QName, WorkList) ->
+ mark_tx_persistent(Txn),
+ rabbit_persister:extend_transaction({Txn, QName}, WorkList).
+
+commit_work(Txn, QName) ->
+ do_if_persistent(fun rabbit_persister:commit_transaction/1,
+ Txn, QName).
+
+rollback_work(Txn, QName) ->
+ do_if_persistent(fun rabbit_persister:rollback_transaction/1,
+ Txn, QName).
diff --git a/src/rabbit_persister.erl b/src/rabbit_persister.erl
new file mode 100644
index 0000000000..3235a837e8
--- /dev/null
+++ b/src/rabbit_persister.erl
@@ -0,0 +1,542 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2010 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2010 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+%% Disk-log-backed persistence server: work items are buffered,
+%% appended to a disk_log, and periodically compacted into a snapshot
+%% written as the log's head term.
+-module(rabbit_persister).
+
+-behaviour(gen_server).
+
+-export([start_link/1]).
+
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+ terminate/2, code_change/3]).
+
+-export([transaction/1, extend_transaction/2, dirty_work/1,
+ commit_transaction/1, rollback_transaction/1,
+ force_snapshot/0, serial/0, fetch_content/1]).
+
+-include("rabbit.hrl").
+
+-define(SERVER, ?MODULE).
+
+%% Maximum delays (ms) before buffered log entries are flushed; see
+%% compute_deadline/2 and complete/3 / log/2.
+-define(LOG_BUNDLE_DELAY, 5).
+-define(COMPLETE_BUNDLE_DELAY, 2).
+
+%% Idle time (ms) before the server hibernates.
+-define(HIBERNATE_AFTER, 10000).
+
+%% Roll the log into a fresh snapshot after this many appends.
+-define(MAX_WRAP_ENTRIES, 500).
+
+-define(PERSISTER_LOG_FORMAT_VERSION, {2, 5}).
+
+%% deadline: 'infinity' or a now()-style triple by which pending_logs
+%% must be flushed; pending_replies: callers awaiting a disk sync.
+-record(pstate, {log_handle, entry_count, deadline,
+ pending_logs, pending_replies,
+ snapshot, recovered_content}).
+
+%% two tables for efficient persistency
+%% one maps a key to a message
+%% the other maps a key to one or more queues.
+%% The aim is to reduce the overload of storing a message multiple times
+%% when it appears in several queues.
+-record(psnapshot, {serial, transactions, messages, queues, next_seq_id}).
+
+%%----------------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+-type(pmsg() :: {queue_name(), pkey()}).
+-type(work_item() ::
+ {publish, message(), pmsg()} |
+ {deliver, pmsg()} |
+ {ack, pmsg()}).
+
+-spec(start_link/1 :: ([queue_name()]) ->
+ {'ok', pid()} | 'ignore' | {'error', any()}).
+-spec(transaction/1 :: ([work_item()]) -> 'ok').
+-spec(extend_transaction/2 :: ({txn(), queue_name()}, [work_item()]) -> 'ok').
+-spec(dirty_work/1 :: ([work_item()]) -> 'ok').
+-spec(commit_transaction/1 :: ({txn(), queue_name()}) -> 'ok').
+-spec(rollback_transaction/1 :: ({txn(), queue_name()}) -> 'ok').
+-spec(force_snapshot/0 :: () -> 'ok').
+-spec(serial/0 :: () -> non_neg_integer()).
+-spec(fetch_content/1 :: (queue_name()) -> [{message(), boolean()}]).
+
+-endif.
+
+%%----------------------------------------------------------------------------
+
+start_link(DurableQueues) ->
+ gen_server:start_link({local, ?SERVER}, ?MODULE, [DurableQueues], []).
+
+%% Logs and commits a list of work items as a fresh single-shot
+%% transaction; blocks until the commit record is synced to disk.
+transaction(MessageList) ->
+ ?LOGDEBUG("transaction ~p~n", [MessageList]),
+ TxnKey = rabbit_guid:guid(),
+ gen_server:call(?SERVER, {transaction, TxnKey, MessageList}, infinity).
+
+%% Asynchronously appends work items to an open transaction.
+extend_transaction(TxnKey, MessageList) ->
+ ?LOGDEBUG("extend_transaction ~p ~p~n", [TxnKey, MessageList]),
+ gen_server:cast(?SERVER, {extend_transaction, TxnKey, MessageList}).
+
+%% Asynchronously logs non-transactional work items.
+dirty_work(MessageList) ->
+ ?LOGDEBUG("dirty_work ~p~n", [MessageList]),
+ gen_server:cast(?SERVER, {dirty_work, MessageList}).
+
+%% Synchronous: returns only once the commit has hit the disk log.
+commit_transaction(TxnKey) ->
+ ?LOGDEBUG("commit_transaction ~p~n", [TxnKey]),
+ gen_server:call(?SERVER, {commit_transaction, TxnKey}, infinity).
+
+%% Rollback is fire-and-forget, unlike commit.
+rollback_transaction(TxnKey) ->
+ ?LOGDEBUG("rollback_transaction ~p~n", [TxnKey]),
+ gen_server:cast(?SERVER, {rollback_transaction, TxnKey}).
+
+force_snapshot() ->
+ gen_server:call(?SERVER, force_snapshot, infinity).
+
+%% The snapshot serial number (bumped on every server start).
+serial() ->
+ gen_server:call(?SERVER, serial, infinity).
+
+%% Returns the recovered contents of QName; the server erases them
+%% afterwards, so a second call for the same queue yields [].
+fetch_content(QName) ->
+ gen_server:call(?SERVER, {fetch_content, QName}, infinity).
+
+%%--------------------------------------------------------------------
+
+%% Opens (repairing if necessary) the persister disk_log, replays it
+%% into a fresh in-memory snapshot, bumps the snapshot serial and
+%% rolls the log. If the old log cannot be loaded it is preserved
+%% under a ".saved.<timestamp>" name instead of being discarded.
+init([DurableQueues]) ->
+ process_flag(trap_exit, true),
+ FileName = base_filename(),
+ ok = filelib:ensure_dir(FileName),
+ Snapshot = #psnapshot{serial = 0,
+ transactions = dict:new(),
+ messages = ets:new(messages, []),
+ queues = ets:new(queues, []),
+ next_seq_id = 0},
+ LogHandle =
+ case disk_log:open([{name, rabbit_persister},
+ {head, current_snapshot(Snapshot)},
+ {file, FileName}]) of
+ {ok, LH} -> LH;
+ {repaired, LH, {recovered, Recovered}, {badbytes, Bad}} ->
+ %% Only warn when bytes were actually lost in the repair.
+ WarningFun = if
+ Bad > 0 -> fun rabbit_log:warning/2;
+ true -> fun rabbit_log:info/2
+ end,
+ WarningFun("Repaired persister log - ~p recovered, ~p bad~n",
+ [Recovered, Bad]),
+ LH
+ end,
+ {Res, RecoveredContent, LoadedSnapshot} =
+ internal_load_snapshot(LogHandle, DurableQueues, Snapshot),
+ NewSnapshot = LoadedSnapshot#psnapshot{
+ serial = LoadedSnapshot#psnapshot.serial + 1},
+ case Res of
+ ok ->
+ ok = take_snapshot(LogHandle, NewSnapshot);
+ {error, Reason} ->
+ rabbit_log:error("Failed to load persister log: ~p~n", [Reason]),
+ ok = take_snapshot_and_save_old(LogHandle, NewSnapshot)
+ end,
+ State = #pstate{log_handle = LogHandle,
+ entry_count = 0,
+ deadline = infinity,
+ pending_logs = [],
+ pending_replies = [],
+ snapshot = NewSnapshot,
+ recovered_content = RecoveredContent},
+ {ok, State}.
+
+%% One-shot transaction: extend then commit in a single call; the
+%% reply is deferred until the log is synced (see complete/3, flush/2).
+handle_call({transaction, Key, MessageList}, From, State) ->
+ NewState = internal_extend(Key, MessageList, State),
+ do_noreply(internal_commit(From, Key, NewState));
+handle_call({commit_transaction, TxnKey}, From, State) ->
+ do_noreply(internal_commit(From, TxnKey, State));
+handle_call(force_snapshot, _From, State) ->
+ do_reply(ok, flush(true, State));
+handle_call(serial, _From,
+ State = #pstate{snapshot = #psnapshot{serial = Serial}}) ->
+ do_reply(Serial, State);
+%% Recovered contents are handed out once per queue, then erased.
+handle_call({fetch_content, QName}, _From, State =
+ #pstate{recovered_content = RC}) ->
+ List = case dict:find(QName, RC) of
+ {ok, Content} -> Content;
+ error -> []
+ end,
+ do_reply(List, State#pstate{recovered_content = dict:erase(QName, RC)});
+handle_call(_Request, _From, State) ->
+ {noreply, State}.
+
+handle_cast({rollback_transaction, TxnKey}, State) ->
+ do_noreply(internal_rollback(TxnKey, State));
+handle_cast({dirty_work, MessageList}, State) ->
+ do_noreply(internal_dirty_work(MessageList, State));
+handle_cast({extend_transaction, TxnKey, MessageList}, State) ->
+ do_noreply(internal_extend(TxnKey, MessageList, State));
+handle_cast(_Msg, State) ->
+ {noreply, State}.
+
+%% deadline = infinity means nothing is pending, so this timeout is
+%% the ?HIBERNATE_AFTER idle timer: flush, snapshot and hibernate.
+handle_info(timeout, State = #pstate{deadline = infinity}) ->
+ State1 = flush(true, State),
+ %% TODO: Once we drop support for R11B-5, we can change this to
+ %% {noreply, State1, hibernate};
+ %% Re-enter via gen_server, not gen_server2: this process is a
+ %% plain gen_server (see -behaviour and start_link/1), so waking
+ %% into gen_server2's loop would switch callback-loop semantics
+ %% after the first hibernation.
+ proc_lib:hibernate(gen_server, enter_loop, [?MODULE, [], State1]);
+%% A flush deadline expired: write out the buffered log entries.
+handle_info(timeout, State) ->
+ do_noreply(flush(State));
+handle_info(_Info, State) ->
+ {noreply, State}.
+
+%% Final flush so nothing buffered is lost on shutdown.
+terminate(_Reason, State = #pstate{log_handle = LogHandle}) ->
+ flush(State),
+ disk_log:close(LogHandle),
+ ok.
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, flush(State)}.
+
+%%--------------------------------------------------------------------
+
+%% Wraps MessageList in an extend_transaction work unit and logs it.
+internal_extend(Key, MessageList, State) ->
+ log_work(fun (ML) -> {extend_transaction, Key, ML} end,
+ MessageList, State).
+
+internal_dirty_work(MessageList, State) ->
+ log_work(fun (ML) -> {dirty_work, ML} end,
+ MessageList, State).
+
+%% Applies the commit to the in-memory snapshot immediately, and
+%% queues both the log record and the reply to From; the reply is
+%% sent by flush/2 once the log has been synced.
+internal_commit(From, Key, State = #pstate{snapshot = Snapshot}) ->
+ Unit = {commit_transaction, Key},
+ NewSnapshot = internal_integrate1(Unit, Snapshot),
+ complete(From, Unit, State#pstate{snapshot = NewSnapshot}).
+
+%% Rollback arrives via cast, so there is no reply to defer - just
+%% integrate and log.
+internal_rollback(Key, State = #pstate{snapshot = Snapshot}) ->
+ Unit = {rollback_transaction, Key},
+ NewSnapshot = internal_integrate1(Unit, Snapshot),
+ log(State#pstate{snapshot = NewSnapshot}, Unit).
+
+%% Buffers a log item plus a caller awaiting the sync, tightening the
+%% flush deadline to at most ?COMPLETE_BUNDLE_DELAY ms from now.
+complete(From, Item, State = #pstate{deadline = ExistingDeadline,
+ pending_logs = Logs,
+ pending_replies = Waiting}) ->
+ State#pstate{deadline = compute_deadline(
+ ?COMPLETE_BUNDLE_DELAY, ExistingDeadline),
+ pending_logs = [Item | Logs],
+ pending_replies = [From | Waiting]}.
+
+%% This is made to limit disk usage by writing messages only once onto
+%% disk. We keep a table associating pkeys to messages, and provided
+%% the list of messages to output is left to right, we can guarantee
+%% that pkeys will be a backreference to a message in memory when a
+%% "tied" is met.
+log_work(CreateWorkUnit, MessageList,
+ State = #pstate{
+ snapshot = Snapshot = #psnapshot{messages = Messages}}) ->
+ Unit = CreateWorkUnit(
+ rabbit_misc:map_in_order(
+ %% Replace a publish of an already-stored message body
+ %% with a lightweight {tied, QK} reference.
+ fun(M = {publish, Message, QK = {_QName, PKey}}) ->
+ case ets:lookup(Messages, PKey) of
+ [_] -> {tied, QK};
+ [] -> ets:insert(Messages, {PKey, Message}),
+ M
+ end;
+ (M) -> M
+ end,
+ MessageList)),
+ NewSnapshot = internal_integrate1(Unit, Snapshot),
+ log(State#pstate{snapshot = NewSnapshot}, Unit).
+
+%% Buffers one log entry, tightening the flush deadline to at most
+%% ?LOG_BUNDLE_DELAY ms from now.
+log(State = #pstate{deadline = ExistingDeadline, pending_logs = Logs},
+ Message) ->
+ State#pstate{deadline = compute_deadline(?LOG_BUNDLE_DELAY,
+ ExistingDeadline),
+ pending_logs = [Message | Logs]}.
+
+base_filename() ->
+ rabbit_mnesia:dir() ++ "/rabbit_persister.LOG".
+
+%% Syncs then reopens the log: the previous contents move to
+%% OldFileName and the fresh log begins with the current snapshot as
+%% its head term.
+take_snapshot(LogHandle, OldFileName, Snapshot) ->
+ ok = disk_log:sync(LogHandle),
+ %% current_snapshot is the Head (ie. first thing logged)
+ ok = disk_log:reopen(LogHandle, OldFileName, current_snapshot(Snapshot)).
+
+%% Normal roll: the replaced log is kept as ".previous" (any earlier
+%% ".previous" is deleted first).
+take_snapshot(LogHandle, Snapshot) ->
+ OldFileName = lists:flatten(base_filename() ++ ".previous"),
+ file:delete(OldFileName),
+ rabbit_log:info("Rolling persister log to ~p~n", [OldFileName]),
+ ok = take_snapshot(LogHandle, OldFileName, Snapshot).
+
+%% Used when the existing log could not be loaded: roll to a fresh
+%% log but keep the old one under a unique ".saved.<timestamp>" name
+%% for post-mortem inspection.
+take_snapshot_and_save_old(LogHandle, Snapshot) ->
+ {MegaSecs, Secs, MicroSecs} = erlang:now(),
+ %% Microseconds since the epoch. The previous expression
+ %% (MegaSecs * 1000000 + Secs * 1000 + MicroSecs) mixed units -
+ %% seconds scaled by only 1000, added to raw microseconds - which
+ %% could yield colliding, non-monotonic file names.
+ Timestamp = (MegaSecs * 1000000 + Secs) * 1000000 + MicroSecs,
+ OldFileName = lists:flatten(io_lib:format("~s.saved.~p",
+ [base_filename(), Timestamp])),
+ rabbit_log:info("Saving persister log in ~p~n", [OldFileName]),
+ ok = take_snapshot(LogHandle, OldFileName, Snapshot).
+
+%% Rolls the log when forced, or when enough entries have accumulated.
+maybe_take_snapshot(Force, State = #pstate{entry_count = EntryCount,
+ log_handle = LH,
+ snapshot = Snapshot})
+ when Force orelse EntryCount >= ?MAX_WRAP_ENTRIES ->
+ ok = take_snapshot(LH, Snapshot),
+ State#pstate{entry_count = 0};
+maybe_take_snapshot(_Force, State) ->
+ State.
+
+%% A now()-style triple DeltaMilliSec in the future.
+later_ms(DeltaMilliSec) ->
+ {MegaSec, Sec, MicroSec} = now(),
+ %% Note: not normalised. Unimportant for this application.
+ {MegaSec, Sec, MicroSec + (DeltaMilliSec * 1000)}.
+
+%% Result = B - A, more or less
+%% (difference between two now()-style triples, in seconds, as a float)
+time_diff({B1, B2, B3}, {A1, A2, A3}) ->
+ (B1 - A1) * 1000000 + (B2 - A2) + (B3 - A3) / 1000000.0 .
+
+%% An already-set deadline is never postponed.
+compute_deadline(TimerDelay, infinity) ->
+ later_ms(TimerDelay);
+compute_deadline(_TimerDelay, ExistingDeadline) ->
+ ExistingDeadline.
+
+%% Millisecond timeout for the gen_server return tuple: hibernation
+%% delay when idle, otherwise time remaining until the deadline.
+compute_timeout(infinity) ->
+ ?HIBERNATE_AFTER;
+compute_timeout(Deadline) ->
+ DeltaMilliSec = time_diff(Deadline, now()) * 1000.0,
+ if
+ DeltaMilliSec =< 1 ->
+ 0;
+ true ->
+ round(DeltaMilliSec)
+ end.
+
+do_noreply(State = #pstate{deadline = Deadline}) ->
+ {noreply, State, compute_timeout(Deadline)}.
+
+do_reply(Reply, State = #pstate{deadline = Deadline}) ->
+ {reply, Reply, State, compute_timeout(Deadline)}.
+
+flush(State) -> flush(false, State).
+
+%% Appends the pending entries (oldest first, hence the reverse),
+%% possibly rolls the log, then syncs and replies 'ok' to every
+%% caller waiting on durability, before clearing buffers and deadline.
+flush(ForceSnapshot, State = #pstate{pending_logs = PendingLogs,
+ pending_replies = Waiting,
+ log_handle = LogHandle}) ->
+ State1 = if PendingLogs /= [] ->
+ disk_log:alog(LogHandle, lists:reverse(PendingLogs)),
+ State#pstate{entry_count = State#pstate.entry_count + 1};
+ true ->
+ State
+ end,
+ State2 = maybe_take_snapshot(ForceSnapshot, State1),
+ if Waiting /= [] ->
+ ok = disk_log:sync(LogHandle),
+ lists:foreach(fun (From) -> gen_server:reply(From, ok) end,
+ Waiting);
+ true ->
+ ok
+ end,
+ State2#pstate{deadline = infinity,
+ pending_logs = [],
+ pending_replies = []}.
+
+%% Serialises the snapshot into the term used as the disk_log head;
+%% everything logged after it is a delta to be replayed.
+current_snapshot(_Snapshot = #psnapshot{serial = Serial,
+ transactions = Ts,
+ messages = Messages,
+ queues = Queues,
+ next_seq_id = NextSeqId}) ->
+ %% Avoid infinite growth of the table by removing messages not
+ %% bound to a queue anymore
+ prune_table(Messages, ets:foldl(
+ fun ({{_QName, PKey}, _Delivered, _SeqId}, S) ->
+ sets:add_element(PKey, S)
+ end, sets:new(), Queues)),
+ InnerSnapshot = {{serial, Serial},
+ {txns, Ts},
+ {messages, ets:tab2list(Messages)},
+ {queues, ets:tab2list(Queues)},
+ {next_seq_id, NextSeqId}},
+ ?LOGDEBUG("Inner snapshot: ~p~n", [InnerSnapshot]),
+ {persist_snapshot, {vsn, ?PERSISTER_LOG_FORMAT_VERSION},
+ term_to_binary(InnerSnapshot)}.
+
+%% Deletes every key of Tab not in the Keys set. The table is fixed
+%% during traversal so deleting while iterating is safe.
+prune_table(Tab, Keys) ->
+ true = ets:safe_fixtable(Tab, true),
+ ok = prune_table(Tab, Keys, ets:first(Tab)),
+ true = ets:safe_fixtable(Tab, false).
+
+prune_table(_Tab, _Keys, '$end_of_table') -> ok;
+prune_table(Tab, Keys, Key) ->
+ case sets:is_element(Key, Keys) of
+ true -> ok;
+ false -> ets:delete(Tab, Key)
+ end,
+ prune_table(Tab, Keys, ets:next(Tab, Key)).
+
+%% Reads the head snapshot from the log, replays the remaining log
+%% entries over it, then rebuilds per-queue contents for the durable
+%% queues. Returns {ok | {error, Reason}, RecoveredContent, Snapshot}.
+internal_load_snapshot(LogHandle,
+ DurableQueues,
+ Snapshot = #psnapshot{messages = Messages,
+ queues = Queues}) ->
+ {K, [Loaded_Snapshot | Items]} = disk_log:chunk(LogHandle, start),
+ case check_version(Loaded_Snapshot) of
+ {ok, StateBin} ->
+ {{serial, Serial}, {txns, Ts}, {messages, Ms}, {queues, Qs},
+ {next_seq_id, NextSeqId}} = binary_to_term(StateBin),
+ true = ets:insert(Messages, Ms),
+ true = ets:insert(Queues, Qs),
+ Snapshot1 = replay(Items, LogHandle, K,
+ Snapshot#psnapshot{
+ serial = Serial,
+ transactions = Ts,
+ next_seq_id = NextSeqId}),
+ {RecoveredContent, Snapshot2} =
+ recover_messages(DurableQueues, Snapshot1),
+ %% uncompleted transactions are discarded - this is TRTTD
+ %% since we only get into this code on node restart, so
+ %% any uncompleted transactions will have been aborted.
+ {ok, RecoveredContent,
+ Snapshot2#psnapshot{transactions = dict:new()}};
+ {error, Reason} -> {{error, Reason}, dict:new(), Snapshot}
+ end.
+
+%% The log head must carry exactly the current format version.
+check_version({persist_snapshot, {vsn, ?PERSISTER_LOG_FORMAT_VERSION},
+ StateBin}) ->
+ {ok, StateBin};
+check_version({persist_snapshot, {vsn, Vsn}, _StateBin}) ->
+ {error, {unsupported_persister_log_format, Vsn}};
+check_version(_Other) ->
+ {error, unrecognised_persister_log_format}.
+
+%% Rebuilds, per durable queue, the ordered list of recovered
+%% messages, and rewrites the messages/queues tables so they contain
+%% only entries belonging to the durable queues.
+recover_messages(DurableQueues, Snapshot = #psnapshot{messages = Messages,
+ queues = Queues}) ->
+ DurableQueuesSet = sets:from_list(DurableQueues),
+ Work = ets:foldl(
+ fun ({{QName, PKey}, Delivered, SeqId}, Acc) ->
+ case sets:is_element(QName, DurableQueuesSet) of
+ true ->
+ rabbit_misc:dict_cons(
+ QName, {SeqId, PKey, Delivered}, Acc);
+ false ->
+ Acc
+ end
+ end, dict:new(), Queues),
+ {L, RecoveredContent} =
+ lists:foldl(
+ fun ({Recovered, {QName, Msgs}}, {L, Dict}) ->
+ {Recovered ++ L, dict:store(QName, Msgs, Dict)}
+ end, {[], dict:new()},
+ %% unstable parallel map, because order doesn't matter
+ rabbit_misc:upmap(
+ %% we do as much work as possible in spawned worker
+ %% processes, but we need to make sure the ets:inserts are
+ %% performed in self()
+ fun ({QName, Requeues}) ->
+ recover(QName, Requeues, Messages)
+ end, dict:to_list(Work))),
+ NewMessages = [{K, M} || {_S, _Q, K, M, _D} <- L],
+ NewQueues = [{{Q, K}, D, S} || {S, Q, K, _M, D} <- L],
+ ets:delete_all_objects(Messages),
+ ets:delete_all_objects(Queues),
+ true = ets:insert(Messages, NewMessages),
+ true = ets:insert(Queues, NewQueues),
+ %% contains the mutated messages and queues tables
+ {RecoveredContent, Snapshot}.
+
+%% Sorting on SeqId (first tuple element) restores queue order.
+recover(QName, Requeues, Messages) ->
+ RecoveredMessages =
+ lists:sort([{SeqId, QName, PKey, Message, Delivered} ||
+ {SeqId, PKey, Delivered} <- Requeues,
+ {_, Message} <- ets:lookup(Messages, PKey)]),
+ {RecoveredMessages, {QName, [{Message, Delivered} ||
+ {_, _, _, Message, Delivered}
+ <- RecoveredMessages]}}.
+
+%% Replays log chunks into the snapshot until eof. Each logged term
+%% is itself a list of work units (flush/2 logs pending_logs as one
+%% term), hence internal_integrate_messages/2 folds over it.
+replay([], LogHandle, K, Snapshot) ->
+ case disk_log:chunk(LogHandle, K) of
+ {K1, Items} ->
+ replay(Items, LogHandle, K1, Snapshot);
+ {K1, Items, Badbytes} ->
+ rabbit_log:warning("~p bad bytes recovering persister log~n",
+ [Badbytes]),
+ replay(Items, LogHandle, K1, Snapshot);
+ eof -> Snapshot
+ end;
+replay([Item | Items], LogHandle, K, Snapshot) ->
+ NewSnapshot = internal_integrate_messages(Item, Snapshot),
+ replay(Items, LogHandle, K, NewSnapshot).
+
+internal_integrate_messages(Items, Snapshot) ->
+ lists:foldl(fun (Item, Snap) -> internal_integrate1(Item, Snap) end,
+ Snapshot, Items).
+
+%% Folds a single work unit into the in-memory snapshot.
+internal_integrate1({extend_transaction, Key, MessageList},
+ Snapshot = #psnapshot {transactions = Transactions}) ->
+ Snapshot#psnapshot{transactions = rabbit_misc:dict_cons(Key, MessageList,
+ Transactions)};
+internal_integrate1({rollback_transaction, Key},
+ Snapshot = #psnapshot{transactions = Transactions}) ->
+ Snapshot#psnapshot{transactions = dict:erase(Key, Transactions)};
+%% Commit applies the accumulated message lists (consed newest-first,
+%% hence foldr to restore order) to the messages/queues tables;
+%% committing an unknown key is a no-op.
+internal_integrate1({commit_transaction, Key},
+ Snapshot = #psnapshot{transactions = Transactions,
+ messages = Messages,
+ queues = Queues,
+ next_seq_id = SeqId}) ->
+ case dict:find(Key, Transactions) of
+ {ok, MessageLists} ->
+ ?LOGDEBUG("persist committing txn ~p~n", [Key]),
+ NextSeqId =
+ lists:foldr(
+ fun (ML, SeqIdN) ->
+ perform_work(ML, Messages, Queues, SeqIdN) end,
+ SeqId, MessageLists),
+ Snapshot#psnapshot{transactions = dict:erase(Key, Transactions),
+ next_seq_id = NextSeqId};
+ error ->
+ Snapshot
+ end;
+internal_integrate1({dirty_work, MessageList},
+ Snapshot = #psnapshot{messages = Messages,
+ queues = Queues,
+ next_seq_id = SeqId}) ->
+ Snapshot#psnapshot{next_seq_id = perform_work(MessageList, Messages,
+ Queues, SeqId)}.
+
+%% Applies each work item in order, threading the sequence id through.
+perform_work(MessageList, Messages, Queues, SeqId) ->
+ lists:foldl(fun (Item, NextSeqId) ->
+ perform_work_item(Item, Messages, Queues, NextSeqId)
+ end, SeqId, MessageList).
+
+%% publish: store the message body and add a queue entry for it.
+perform_work_item({publish, Message, QK = {_QName, PKey}},
+ Messages, Queues, NextSeqId) ->
+ true = ets:insert(Messages, {PKey, Message}),
+ true = ets:insert(Queues, {QK, false, NextSeqId}),
+ NextSeqId + 1;
+
+%% tied: the message body is already in Messages (see log_work/3).
+perform_work_item({tied, QK}, _Messages, Queues, NextSeqId) ->
+ true = ets:insert(Queues, {QK, false, NextSeqId}),
+ NextSeqId + 1;
+
+%% deliver: flag the queue entry as delivered (element 2).
+perform_work_item({deliver, QK}, _Messages, Queues, NextSeqId) ->
+ true = ets:update_element(Queues, QK, {2, true}),
+ NextSeqId;
+
+%% ack: remove the queue entry entirely.
+perform_work_item({ack, QK}, _Messages, Queues, NextSeqId) ->
+ true = ets:delete(Queues, QK),
+ NextSeqId.
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 4bef843596..6b5c73bb90 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -54,9 +54,7 @@ test_content_prop_roundtrip(Datum, Binary) ->
Binary = rabbit_binary_generator:encode_properties(Types, Values). %% assertion
all_tests() ->
- passed = test_msg_store(),
- passed = test_queue_index(),
- passed = test_variable_queue(),
+ passed = test_backing_queue(),
passed = test_priority_queue(),
passed = test_bpqueue(),
passed = test_pg_local(),
@@ -994,6 +992,16 @@ bad_handle_hook(_, _, _) ->
extra_arg_hook(Hookname, Handler, Args, Extra1, Extra2) ->
handle_hook(Hookname, Handler, {Args, Extra1, Extra2}).
+%% Runs the variable-queue test suite only when rabbit_variable_queue
+%% is the configured backing queue module; other backing queues (such
+%% as rabbit_invariable_queue) have no dedicated tests here, so we
+%% just pass.
+test_backing_queue() ->
+ case application:get_env(backing_queue_module) of
+ {ok, rabbit_variable_queue} ->
+ passed = test_msg_store(),
+ passed = test_queue_index(),
+ passed = test_variable_queue();
+ _ ->
+ passed
+ end.
+
start_msg_store_empty() ->
start_msg_store(fun (ok) -> finished end, ok).