diff options
| author | Matthew Sackman <matthew@lshift.net> | 2010-04-21 15:53:33 +0100 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2010-04-21 15:53:33 +0100 |
| commit | 92e5da9c6f30dafdf3ece603f0563a2c4866f66f (patch) | |
| tree | a570f7447417639dd5547cec623ed87616f3c1b4 | |
| parent | 6044c25d7d3aa6aabcd269ebed7f402eab29735d (diff) | |
| download | rabbitmq-server-git-92e5da9c6f30dafdf3ece603f0563a2c4866f66f.tar.gz | |
Added the old persister (slightly mutilated) and an invariable queue as a backing queue which uses it. Passes all the tests.
| -rw-r--r-- | src/rabbit_invariable_queue.erl | 284 | ||||
| -rw-r--r-- | src/rabbit_persister.erl | 542 | ||||
| -rw-r--r-- | src/rabbit_tests.erl | 14 |
3 files changed, 837 insertions, 3 deletions
diff --git a/src/rabbit_invariable_queue.erl b/src/rabbit_invariable_queue.erl new file mode 100644 index 0000000000..5620fab3d7 --- /dev/null +++ b/src/rabbit_invariable_queue.erl @@ -0,0 +1,284 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2010 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2010 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(rabbit_invariable_queue). + +-export([init/2, terminate/1, delete_and_terminate/1, purge/1, publish/2, + publish_delivered/3, fetch/2, ack/2, tx_publish/3, tx_ack/3, + tx_rollback/2, tx_commit/3, requeue/2, len/1, is_empty/1, + set_ram_duration_target/2, ram_duration/1, sync_callback/1, + handle_pre_hibernate/1, status/1]). + +-export([start/1]). + +-behaviour(rabbit_backing_queue). + +-include("rabbit.hrl"). 
%% State of one invariable queue: the in-memory queue of
%% {Message, IsDelivered} pairs, the queue's name, a cached length and
%% a dict (keyed by guid) of messages handed out but not yet acked.
-record(iv_state, { queue, qname, len, pending_ack }).
%% Per-transaction bookkeeping: published messages and ack tags that
%% are pending, plus whether anything was sent to the persister.
-record(tx, { pending_messages, pending_acks, is_persistent }).

-ifdef(use_specs).

-type(ack() :: guid() | 'blank_ack').
-type(state() :: #iv_state { queue       :: queue(),
                             qname       :: queue_name(),
                             len         :: non_neg_integer(),
                             pending_ack :: dict()
                           }).
-include("rabbit_backing_queue_spec.hrl").

-endif.

%% Starts rabbit_persister as a child of rabbit_sup, passing it the
%% durable queue names. Crashes unless the child start returns ok.
start(DurableQueues) ->
    ChildArgs = [DurableQueues],
    ok = rabbit_sup:start_child(rabbit_persister, ChildArgs).

%% Builds the initial state. Durable queues are seeded with whatever
%% content the persister hands back for QName; others start empty.
init(QName, IsDurable) ->
    Recovered = case IsDurable of
                    false -> [];
                    true  -> rabbit_persister:fetch_content(QName)
                end,
    Queue = queue:from_list(Recovered),
    #iv_state { qname       = QName,
                queue       = Queue,
                len         = queue:len(Queue),
                pending_ack = dict:new() }.

%% Drops all in-memory content; nothing is sent to the persister.
terminate(State) ->
    State #iv_state { pending_ack = dict:new(),
                      len         = 0,
                      queue       = queue:new() }.

%% Acks every pending message in the persister, purges what is still
%% queued, then clears the in-memory state.
delete_and_terminate(State = #iv_state { pending_ack = PA, qname = QName }) ->
    PendingGuids = dict:fetch_keys(PA),
    ok = persist_acks(none, QName, PendingGuids, PA),
    {_PurgedLen, Purged} = purge(State),
    terminate(Purged).

%% Empties the queue and returns {PreviousLength, NewState}. Persistent
%% messages are marked delivered and then acked in the persister.
purge(State = #iv_state { queue = Q, qname = QName, len = Len }) ->
    %% We do not purge messages pending acks.
    Collect =
        fun ({#basic_message { is_persistent = false }, _IsDelivered}, Acc) ->
                Acc;
            ({Msg = #basic_message { guid = Guid }, IsDelivered},
             {Guids, Msgs}) ->
                ok = persist_delivery(QName, Msg, IsDelivered),
                {[Guid | Guids], dict:store(Guid, Msg, Msgs)}
        end,
    {AckTags, PA} = rabbit_misc:queue_fold(Collect, {[], dict:new()}, Q),
    ok = persist_acks(none, QName, AckTags, PA),
    {Len, State #iv_state { queue = queue:new(), len = 0 }}.

%% Persists Msg (a no-op for non-persistent messages) and appends it,
%% flagged as not yet delivered, to the in-memory queue.
publish(Msg, State = #iv_state { queue = Q, qname = QName, len = Len }) ->
    ok = persist_message(none, QName, Msg),
    Q1 = queue:in({Msg, false}, Q),
    State #iv_state { queue = Q1, len = Len + 1 }.
%% Publishes a message that has already been handed to a consumer.
%% Without ack tracking nothing is recorded; with it, the message is
%% persisted, marked delivered and parked in pending_ack. The second
%% clause only matches an empty queue (len = 0).
publish_delivered(false, _Msg, State) ->
    {blank_ack, State};
publish_delivered(true, Msg = #basic_message { guid = Guid },
                  State = #iv_state { qname       = QName,
                                      len         = 0,
                                      pending_ack = PA }) ->
    ok = persist_message(none, QName, Msg),
    ok = persist_delivery(QName, Msg, false),
    PA1 = dict:store(Guid, Msg, PA),
    {Guid, State #iv_state { pending_ack = PA1 }}.

%% Takes the head of the queue. Returns {empty, State} when there is
%% nothing queued, otherwise {{Msg, IsDelivered, AckTag, Remaining},
%% NewState}.
fetch(_AckRequired, State = #iv_state { len = 0 }) ->
    {empty, State};
fetch(AckRequired, State = #iv_state { queue = Q, qname = QName, len = Len,
                                       pending_ack = PA }) ->
    {{value, {Msg = #basic_message { guid = Guid }, IsDelivered}}, Rest} =
        queue:out(Q),
    Remaining = Len - 1,
    ok = persist_delivery(QName, Msg, IsDelivered),
    WithMsg = dict:store(Guid, Msg, PA),
    {AckTag, NewPA} =
        case AckRequired of
            false ->
                %% No ack will follow: ack in the persister right away
                %% and keep the pending set as it was.
                ok = persist_acks(none, QName, [Guid], WithMsg),
                {blank_ack, PA};
            true ->
                {Guid, WithMsg}
        end,
    NewState = State #iv_state { queue       = Rest,
                                 len         = Remaining,
                                 pending_ack = NewPA },
    {{Msg, IsDelivered, AckTag, Remaining}, NewState}.

%% Acks the given tags in the persister, then forgets them locally.
ack(AckTags, State = #iv_state { qname = QName, pending_ack = PA }) ->
    ok = persist_acks(none, QName, AckTags, PA),
    State #iv_state { pending_ack = remove_acks(AckTags, PA) }.

%% Records a publish inside transaction Txn; the queue is untouched
%% until the transaction commits.
tx_publish(Txn, Msg, State = #iv_state { qname = QName }) ->
    publish_in_tx(Txn, Msg),
    ok = persist_message(Txn, QName, Msg),
    State.

%% Records acks inside transaction Txn; pending_ack is untouched until
%% the transaction commits.
tx_ack(Txn, AckTags, State = #iv_state { qname = QName, pending_ack = PA }) ->
    ack_in_tx(Txn, AckTags),
    ok = persist_acks(Txn, QName, AckTags, PA),
    State.

%% Abandons transaction Txn and returns the flattened list of ack tags
%% it had accumulated, together with the unchanged state.
tx_rollback(Txn, State = #iv_state { qname = QName }) ->
    #tx { pending_acks = AckTagLists } = lookup_tx(Txn),
    ok = rollback_work(Txn, QName),
    erase_tx(Txn),
    {lists:flatten(AckTagLists), State}.
%% Commits transaction Txn: commits the persister work, runs Fun, drops
%% the transaction's acks from pending_ack and appends its publishes to
%% the queue in the order they were made. Returns {AckTags, NewState}.
tx_commit(Txn, Fun, State = #iv_state { qname = QName, pending_ack = PA,
                                        queue = Q, len = Len }) ->
    #tx { pending_acks = AckTagLists, pending_messages = PubsRev } =
        lookup_tx(Txn),
    ok = commit_work(Txn, QName),
    erase_tx(Txn),
    Fun(),
    AckTags = lists:flatten(AckTagLists),
    PA1 = remove_acks(AckTags, PA),
    %% Publishes were accumulated newest-first; restore publish order.
    Pubs = lists:reverse(PubsRev),
    Q1 = lists:foldl(fun (Msg, QN) -> queue:in({Msg, false}, QN) end, Q, Pubs),
    Len1 = Len + length(Pubs),
    {AckTags, State #iv_state { pending_ack = PA1, queue = Q1, len = Len1 }}.

%% Moves the messages named by AckTags from pending_ack back onto the
%% tail of the queue, flagged as delivered. Crashes if a tag is unknown.
requeue(AckTags, State = #iv_state { pending_ack = PA, queue = Q,
                                     len = Len }) ->
    %% We don't need to touch the persister here - the persister will
    %% already have these messages published and delivered as
    %% necessary. The complication is that the persister's seq_id will
    %% now be wrong, given the position of these messages in our queue
    %% here. However, the persister's seq_id is only used for sorting
    %% on startup, and requeue is silent as to where the requeued
    %% messages should appear, thus the persister is permitted to sort
    %% based on seq_id, even though it'll likely give a different
    %% order to the last known state of our queue, prior to shutdown.
    Requeue =
        fun (Guid, {QN, PAN, LenN}) ->
                {ok, Msg = #basic_message {}} = dict:find(Guid, PAN),
                {queue:in({Msg, true}, QN), dict:erase(Guid, PAN), LenN + 1}
        end,
    {Q1, PA1, Len1} = lists:foldl(Requeue, {Q, PA, Len}, AckTags),
    State #iv_state { pending_ack = PA1, queue = Q1, len = Len1 }.

%% Number of messages currently queued (pending acks are not counted).
len(#iv_state { len = Len }) ->
    Len.

is_empty(State) ->
    len(State) == 0.

%% The remaining callbacks do nothing for this backing queue: state is
%% returned untouched and fixed values are reported.
set_ram_duration_target(_DurationTarget, State) ->
    %% HA!
    State.

ram_duration(State) ->
    {0, State}.

sync_callback(_State) ->
    undefined.

handle_pre_hibernate(State) ->
    State.

status(_State) ->
    [].

%%----------------------------------------------------------------------------

%% Erases every tag in AckTags from the pending-ack dict.
remove_acks(AckTags, PA) ->
    lists:foldl(fun (AckTag, Acc) -> dict:erase(AckTag, Acc) end, PA, AckTags).
%%----------------------------------------------------------------------------

%% Transaction records live in the process dictionary under
%% {txn, Txn}; an unknown transaction reads as an empty one.
lookup_tx(Txn) ->
    case get({txn, Txn}) of
        undefined ->
            #tx { pending_messages = [],
                  pending_acks     = [],
                  is_persistent    = false };
        Tx ->
            Tx
    end.

store_tx(Txn, Tx) ->
    put({txn, Txn}, Tx).

erase_tx(Txn) ->
    erase({txn, Txn}).

%% Flags Txn as having work outstanding in the persister.
mark_tx_persistent(Txn) ->
    Tx = lookup_tx(Txn),
    store_tx(Txn, Tx #tx { is_persistent = true }).

is_tx_persistent(Txn) ->
    #tx { is_persistent = IsPersistent } = lookup_tx(Txn),
    IsPersistent.

%% Applies F to {Txn, QName} only if the transaction has persister
%% work; F must return ok.
do_if_persistent(F, Txn, QName) ->
    case is_tx_persistent(Txn) of
        true  -> ok = F({Txn, QName});
        false -> ok
    end.

%% Prepends Msg to the transaction's pending publishes (newest first).
publish_in_tx(Txn, Msg) ->
    Tx = #tx { pending_messages = Pubs } = lookup_tx(Txn),
    store_tx(Txn, Tx #tx { pending_messages = [Msg | Pubs] }).

%% Prepends the list AckTags to the transaction's pending acks, giving
%% a list of lists that callers flatten.
ack_in_tx(Txn, AckTags) ->
    Tx = #tx { pending_acks = Acks } = lookup_tx(Txn),
    store_tx(Txn, Tx #tx { pending_acks = [AckTags | Acks] }).

%%----------------------------------------------------------------------------

%% Sends a publish work item to the persister; non-persistent messages
%% are skipped.
persist_message(_Txn, _QName, #basic_message { is_persistent = false }) ->
    ok;
persist_message(Txn, QName, Msg = #basic_message { content = Content }) ->
    %% don't persist any recoverable decoded properties,
    %% rebuild from properties_bin on restore
    Stripped = rabbit_binary_parser:clear_decoded_content(Content),
    Msg1 = Msg #basic_message { content = Stripped },
    Work = [{publish, Msg1, {QName, Msg1 #basic_message.guid}}],
    persist_work(Txn, QName, Work).

%% Sends a deliver work item for a persistent message that has not
%% been flagged as delivered before; otherwise does nothing.
persist_delivery(_QName, #basic_message { is_persistent = false },
                 _IsDelivered) ->
    ok;
persist_delivery(_QName, _Message, true) ->
    ok;
persist_delivery(QName, #basic_message { guid = Guid }, _IsDelivered) ->
    persist_work(none, QName, [{deliver, {QName, Guid}}]).

%% Sends ack work items for those AckTags whose message, as found in
%% PA, is persistent; all other tags are ignored.
persist_acks(Txn, QName, AckTags, PA) ->
    IsPersistentPending =
        fun (Guid) ->
                case dict:find(Guid, PA) of
                    {ok, #basic_message { is_persistent = true }} -> true;
                    _                                             -> false
                end
        end,
    Work = [{ack, {QName, Guid}} || Guid <- AckTags,
                                    IsPersistentPending(Guid)],
    persist_work(Txn, QName, Work).
+ +persist_work(_Txn,_QName, []) -> + ok; +persist_work(none, _QName, WorkList) -> + rabbit_persister:dirty_work(WorkList); +persist_work(Txn, QName, WorkList) -> + mark_tx_persistent(Txn), + rabbit_persister:extend_transaction({Txn, QName}, WorkList). + +commit_work(Txn, QName) -> + do_if_persistent(fun rabbit_persister:commit_transaction/1, + Txn, QName). + +rollback_work(Txn, QName) -> + do_if_persistent(fun rabbit_persister:rollback_transaction/1, + Txn, QName). diff --git a/src/rabbit_persister.erl b/src/rabbit_persister.erl new file mode 100644 index 0000000000..3235a837e8 --- /dev/null +++ b/src/rabbit_persister.erl @@ -0,0 +1,542 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2010 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2010 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. 
+%% + +-module(rabbit_persister). + +-behaviour(gen_server). + +-export([start_link/1]). + +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). + +-export([transaction/1, extend_transaction/2, dirty_work/1, + commit_transaction/1, rollback_transaction/1, + force_snapshot/0, serial/0, fetch_content/1]). + +-include("rabbit.hrl"). + +-define(SERVER, ?MODULE). + +-define(LOG_BUNDLE_DELAY, 5). +-define(COMPLETE_BUNDLE_DELAY, 2). + +-define(HIBERNATE_AFTER, 10000). + +-define(MAX_WRAP_ENTRIES, 500). + +-define(PERSISTER_LOG_FORMAT_VERSION, {2, 5}). + +-record(pstate, {log_handle, entry_count, deadline, + pending_logs, pending_replies, + snapshot, recovered_content}). + +%% two tables for efficient persistency +%% one maps a key to a message +%% the other maps a key to one or more queues. +%% The aim is to reduce the overload of storing a message multiple times +%% when it appears in several queues. +-record(psnapshot, {serial, transactions, messages, queues, next_seq_id}). + +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-type(pmsg() :: {queue_name(), pkey()}). +-type(work_item() :: + {publish, message(), pmsg()} | + {deliver, pmsg()} | + {ack, pmsg()}). + +-spec(start_link/1 :: ([queue_name()]) -> + {'ok', pid()} | 'ignore' | {'error', any()}). +-spec(transaction/1 :: ([work_item()]) -> 'ok'). +-spec(extend_transaction/2 :: ({txn(), queue_name()}, [work_item()]) -> 'ok'). +-spec(dirty_work/1 :: ([work_item()]) -> 'ok'). +-spec(commit_transaction/1 :: ({txn(), queue_name()}) -> 'ok'). +-spec(rollback_transaction/1 :: ({txn(), queue_name()}) -> 'ok'). +-spec(force_snapshot/0 :: () -> 'ok'). +-spec(serial/0 :: () -> non_neg_integer()). +-spec(fetch_content/1 :: (queue_name()) -> [{message(), boolean()}]). + +-endif. 
%%----------------------------------------------------------------------------

%% Starts the persister as a locally registered gen_server.
start_link(DurableQueues) ->
    gen_server:start_link({local, ?SERVER}, ?MODULE, [DurableQueues], []).

%% Runs MessageList as a complete transaction under a fresh key and
%% waits (without timeout) for the server's reply.
transaction(MessageList) ->
    ?LOGDEBUG("transaction ~p~n", [MessageList]),
    TxnKey = rabbit_guid:guid(),
    persister_call({transaction, TxnKey, MessageList}).

%% Asynchronously adds work to the transaction identified by TxnKey.
extend_transaction(TxnKey, MessageList) ->
    ?LOGDEBUG("extend_transaction ~p ~p~n", [TxnKey, MessageList]),
    persister_cast({extend_transaction, TxnKey, MessageList}).

%% Asynchronously submits work outside of any transaction.
dirty_work(MessageList) ->
    ?LOGDEBUG("dirty_work ~p~n", [MessageList]),
    persister_cast({dirty_work, MessageList}).

%% Commits TxnKey and waits (without timeout) for the server's reply.
commit_transaction(TxnKey) ->
    ?LOGDEBUG("commit_transaction ~p~n", [TxnKey]),
    persister_call({commit_transaction, TxnKey}).

%% Asynchronously abandons the transaction identified by TxnKey.
rollback_transaction(TxnKey) ->
    ?LOGDEBUG("rollback_transaction ~p~n", [TxnKey]),
    persister_cast({rollback_transaction, TxnKey}).

force_snapshot() ->
    persister_call(force_snapshot).

serial() ->
    persister_call(serial).

%% Asks the server for the content it holds for QName.
fetch_content(QName) ->
    persister_call({fetch_content, QName}).

%% Synchronous request to the registered persister, no client timeout.
persister_call(Request) ->
    gen_server:call(?SERVER, Request, infinity).

%% Fire-and-forget request to the registered persister.
persister_cast(Request) ->
    gen_server:cast(?SERVER, Request).
%%--------------------------------------------------------------------

%% gen_server init: opens (or repairs) the persister log, replays it
%% into a fresh snapshot for the given durable queues, bumps the
%% serial and immediately rolls the log. If the log cannot be loaded
%% the old file is saved under a timestamped name instead.
init([DurableQueues]) ->
    process_flag(trap_exit, true),
    FileName = base_filename(),
    ok = filelib:ensure_dir(FileName),
    EmptySnapshot = #psnapshot{serial       = 0,
                               transactions = dict:new(),
                               messages     = ets:new(messages, []),
                               queues       = ets:new(queues, []),
                               next_seq_id  = 0},
    LogHandle = open_persister_log(FileName, EmptySnapshot),
    {Res, RecoveredContent, LoadedSnapshot} =
        internal_load_snapshot(LogHandle, DurableQueues, EmptySnapshot),
    NextSerial = LoadedSnapshot#psnapshot.serial + 1,
    NewSnapshot = LoadedSnapshot#psnapshot{serial = NextSerial},
    case Res of
        {error, Reason} ->
            rabbit_log:error("Failed to load persister log: ~p~n", [Reason]),
            ok = take_snapshot_and_save_old(LogHandle, NewSnapshot);
        ok ->
            ok = take_snapshot(LogHandle, NewSnapshot)
    end,
    {ok, #pstate{log_handle        = LogHandle,
                 entry_count       = 0,
                 deadline          = infinity,
                 pending_logs      = [],
                 pending_replies   = [],
                 snapshot          = NewSnapshot,
                 recovered_content = RecoveredContent}}.

%% Opens the disk_log named rabbit_persister on FileName, using the
%% serialised Snapshot as the log head. A repaired log is reported as
%% a warning when bad bytes were found, as info otherwise.
open_persister_log(FileName, Snapshot) ->
    OpenArgs = [{name, rabbit_persister},
                {head, current_snapshot(Snapshot)},
                {file, FileName}],
    case disk_log:open(OpenArgs) of
        {ok, LH} ->
            LH;
        {repaired, LH, {recovered, Recovered}, {badbytes, Bad}} ->
            Report = case Bad > 0 of
                         true  -> fun rabbit_log:warning/2;
                         false -> fun rabbit_log:info/2
                     end,
            Report("Repaired persister log - ~p recovered, ~p bad~n",
                   [Recovered, Bad]),
            LH
    end.
%% Synchronous requests.
%%  - {transaction, Key, Msgs}: extend then commit; the reply is
%%    deferred until the next flush (see internal_commit/complete).
%%  - {commit_transaction, Key}: commit; reply deferred likewise.
%%  - force_snapshot: flush with a forced snapshot, reply ok.
%%  - serial: reply with the snapshot's serial.
%%  - {fetch_content, QName}: reply with the content recovered for
%%    QName (or []) and forget it, so a second request yields [].
%% Unknown requests are not answered, but the flush deadline is still
%% honoured via do_noreply/1.
handle_call({transaction, Key, MessageList}, From, State) ->
    NewState = internal_extend(Key, MessageList, State),
    do_noreply(internal_commit(From, Key, NewState));
handle_call({commit_transaction, TxnKey}, From, State) ->
    do_noreply(internal_commit(From, TxnKey, State));
handle_call(force_snapshot, _From, State) ->
    do_reply(ok, flush(true, State));
handle_call(serial, _From,
            State = #pstate{snapshot = #psnapshot{serial = Serial}}) ->
    do_reply(Serial, State);
handle_call({fetch_content, QName}, _From, State =
                #pstate{recovered_content = RC}) ->
    List = case dict:find(QName, RC) of
               {ok, Content} -> Content;
               error         -> []
           end,
    do_reply(List, State#pstate{recovered_content = dict:erase(QName, RC)});
handle_call(_Request, _From, State) ->
    %% Returning a bare {noreply, State} here would drop the gen_server
    %% timeout that drives flush/1, stalling any pending log entries and
    %% commit replies until some other message arrived.
    do_noreply(State).

%% Asynchronous requests: rollback, non-transactional work and
%% transaction extension. Anything else is ignored, keeping the flush
%% deadline armed.
handle_cast({rollback_transaction, TxnKey}, State) ->
    do_noreply(internal_rollback(TxnKey, State));
handle_cast({dirty_work, MessageList}, State) ->
    do_noreply(internal_dirty_work(MessageList, State));
handle_cast({extend_transaction, TxnKey, MessageList}, State) ->
    do_noreply(internal_extend(TxnKey, MessageList, State));
handle_cast(_Msg, State) ->
    do_noreply(State).

%% Timeouts drive flushing. With no deadline outstanding the timeout
%% means the server has been idle: force a snapshot and hibernate
%% through the standard gen_server return value, so the process stays
%% in the gen_server loop it was started with. With a deadline set,
%% flush what is pending. Other messages are ignored without
%% disturbing the deadline.
handle_info(timeout, State = #pstate{deadline = infinity}) ->
    State1 = flush(true, State),
    {noreply, State1, hibernate};
handle_info(timeout, State) ->
    do_noreply(flush(State));
handle_info(_Info, State) ->
    do_noreply(State).

%% Flushes whatever is pending and closes the log.
terminate(_Reason, State = #pstate{log_handle = LogHandle}) ->
    flush(State),
    disk_log:close(LogHandle),
    ok.

code_change(_OldVsn, State, _Extra) ->
    {ok, flush(State)}.

%%--------------------------------------------------------------------

%% Logs MessageList as an extension of transaction Key.
internal_extend(Key, MessageList, State) ->
    log_work(fun (ML) -> {extend_transaction, Key, ML} end,
             MessageList, State).
%% Logs MessageList as work outside of any transaction.
internal_dirty_work(MessageList, State) ->
    Wrap = fun (ML) -> {dirty_work, ML} end,
    log_work(Wrap, MessageList, State).

%% Applies a commit to the snapshot and queues both the log entry and
%% the caller (From) for the next flush.
internal_commit(From, Key, State = #pstate{snapshot = Snapshot}) ->
    Unit = {commit_transaction, Key},
    Committed = internal_integrate1(Unit, Snapshot),
    complete(From, Unit, State#pstate{snapshot = Committed}).

%% Applies a rollback to the snapshot and queues the log entry.
internal_rollback(Key, State = #pstate{snapshot = Snapshot}) ->
    Unit = {rollback_transaction, Key},
    RolledBack = internal_integrate1(Unit, Snapshot),
    log(State#pstate{snapshot = RolledBack}, Unit).

%% Queues Item for logging and From for a reply, arming the (shorter)
%% completion deadline if none is set yet.
complete(From, Item, State = #pstate{deadline        = ExistingDeadline,
                                     pending_logs    = Logs,
                                     pending_replies = Waiting}) ->
    Deadline = compute_deadline(?COMPLETE_BUNDLE_DELAY, ExistingDeadline),
    State#pstate{deadline        = Deadline,
                 pending_logs    = [Item | Logs],
                 pending_replies = [From | Waiting]}.

%% This is made to limit disk usage by writing messages only once onto
%% disk. We keep a table associating pkeys to messages, and provided
%% the list of messages to output is left to right, we can guarantee
%% that pkeys will be a backreference to a message in memory when a
%% "tied" is met.
log_work(CreateWorkUnit, MessageList,
         State = #pstate{
           snapshot = Snapshot = #psnapshot{messages = Messages}}) ->
    Dedup =
        fun (Item = {publish, Message, QK = {_QName, PKey}}) ->
                case ets:lookup(Messages, PKey) of
                    []  -> ets:insert(Messages, {PKey, Message}),
                           Item;
                    [_] -> {tied, QK}
                end;
            (Item) ->
                Item
        end,
    WorkItems = rabbit_misc:map_in_order(Dedup, MessageList),
    Unit = CreateWorkUnit(WorkItems),
    Integrated = internal_integrate1(Unit, Snapshot),
    log(State#pstate{snapshot = Integrated}, Unit).

%% Queues Message for the next flush, arming the log deadline if none
%% is set yet.
log(State = #pstate{deadline = ExistingDeadline, pending_logs = Logs},
    Message) ->
    Deadline = compute_deadline(?LOG_BUNDLE_DELAY, ExistingDeadline),
    State#pstate{deadline     = Deadline,
                 pending_logs = [Message | Logs]}.

%% Path of the persister log inside the mnesia directory.
base_filename() ->
    Dir = rabbit_mnesia:dir(),
    Dir ++ "/rabbit_persister.LOG".
%% Syncs the log and reopens it, passing OldFileName as the name for
%% the old file and the serialised Snapshot as the new head.
take_snapshot(LogHandle, OldFileName, Snapshot) ->
    ok = disk_log:sync(LogHandle),
    %% current_snapshot is the Head (ie. first thing logged)
    ok = disk_log:reopen(LogHandle, OldFileName, current_snapshot(Snapshot)).

%% Rolls the log to "<base>.previous", removing any earlier
%% ".previous" file first (the result of the delete is deliberately
%% ignored: the file may simply not exist).
take_snapshot(LogHandle, Snapshot) ->
    OldFileName = lists:flatten(base_filename() ++ ".previous"),
    file:delete(OldFileName),
    rabbit_log:info("Rolling persister log to ~p~n", [OldFileName]),
    ok = take_snapshot(LogHandle, OldFileName, Snapshot).

%% Rolls the log to "<base>.saved.<Timestamp>" so that a log which
%% could not be loaded is kept for inspection.
take_snapshot_and_save_old(LogHandle, Snapshot) ->
    {MegaSecs, Secs, MicroSecs} = erlang:now(),
    %% Microseconds since the epoch. Every component is scaled to the
    %% same unit so that distinct instants give distinct suffixes; the
    %% previous formula mixed units and could yield the same number for
    %% different times, letting one saved log replace another.
    Timestamp = (MegaSecs * 1000000 + Secs) * 1000000 + MicroSecs,
    OldFileName = lists:flatten(io_lib:format("~s.saved.~p",
                                              [base_filename(), Timestamp])),
    rabbit_log:info("Saving persister log in ~p~n", [OldFileName]),
    ok = take_snapshot(LogHandle, OldFileName, Snapshot).

%% Rolls the log when forced or once ?MAX_WRAP_ENTRIES flushes have
%% been written since the last roll, resetting the counter.
maybe_take_snapshot(Force, State = #pstate{entry_count = EntryCount,
                                           log_handle = LH,
                                           snapshot = Snapshot})
  when Force orelse EntryCount >= ?MAX_WRAP_ENTRIES ->
    ok = take_snapshot(LH, Snapshot),
    State#pstate{entry_count = 0};
maybe_take_snapshot(_Force, State) ->
    State.

%% A now()-style triple DeltaMilliSec milliseconds in the future.
later_ms(DeltaMilliSec) ->
    {MegaSec, Sec, MicroSec} = now(),
    %% Note: not normalised. Unimportant for this application.
    {MegaSec, Sec, MicroSec + (DeltaMilliSec * 1000)}.

%% Result = B - A, more or less (in seconds, as a float)
time_diff({B1, B2, B3}, {A1, A2, A3}) ->
    (B1 - A1) * 1000000 + (B2 - A2) + (B3 - A3) / 1000000.0 .

%% Keeps an existing deadline; otherwise sets one TimerDelay ms ahead.
compute_deadline(TimerDelay, infinity) ->
    later_ms(TimerDelay);
compute_deadline(_TimerDelay, ExistingDeadline) ->
    ExistingDeadline.

%% gen_server timeout in milliseconds: the idle period when there is
%% no deadline, otherwise the time left until the deadline (0 once it
%% is at most a millisecond away).
compute_timeout(infinity) ->
    ?HIBERNATE_AFTER;
compute_timeout(Deadline) ->
    DeltaMilliSec = time_diff(Deadline, now()) * 1000.0,
    if
        DeltaMilliSec =< 1 ->
            0;
        true ->
            round(DeltaMilliSec)
    end.

%% {noreply, ...} result carrying the timeout implied by the deadline.
do_noreply(State = #pstate{deadline = Deadline}) ->
    {noreply, State, compute_timeout(Deadline)}.
%% {reply, ...} result carrying the timeout implied by the deadline.
do_reply(Reply, State = #pstate{deadline = Deadline}) ->
    Timeout = compute_timeout(Deadline),
    {reply, Reply, State, Timeout}.

flush(State) -> flush(false, State).

%% Writes the pending log entries (oldest first) as a single log term,
%% rolls the log if required, then syncs and answers every waiting
%% caller with ok. Clears the deadline and both pending lists.
flush(ForceSnapshot, State = #pstate{pending_logs    = PendingLogs,
                                     pending_replies = Waiting,
                                     log_handle      = LogHandle}) ->
    Logged =
        case PendingLogs of
            [] ->
                State;
            _ ->
                disk_log:alog(LogHandle, lists:reverse(PendingLogs)),
                Count = State#pstate.entry_count,
                State#pstate{entry_count = Count + 1}
        end,
    Rolled = maybe_take_snapshot(ForceSnapshot, Logged),
    case Waiting of
        [] ->
            ok;
        _ ->
            ok = disk_log:sync(LogHandle),
            lists:foreach(fun (From) -> gen_server:reply(From, ok) end,
                          Waiting)
    end,
    Rolled#pstate{deadline        = infinity,
                  pending_logs    = [],
                  pending_replies = []}.

%% Serialises the snapshot into the term used as the log head. As a
%% side effect the messages table is pruned (see below).
current_snapshot(_Snapshot = #psnapshot{serial       = Serial,
                                        transactions = Ts,
                                        messages     = Messages,
                                        queues       = Queues,
                                        next_seq_id  = NextSeqId}) ->
    %% Avoid infinite growth of the table by removing messages not
    %% bound to a queue anymore
    CollectKey = fun ({{_QName, PKey}, _Delivered, _SeqId}, Acc) ->
                         sets:add_element(PKey, Acc)
                 end,
    LiveKeys = ets:foldl(CollectKey, sets:new(), Queues),
    prune_table(Messages, LiveKeys),
    InnerSnapshot = {{serial, Serial},
                     {txns, Ts},
                     {messages, ets:tab2list(Messages)},
                     {queues, ets:tab2list(Queues)},
                     {next_seq_id, NextSeqId}},
    ?LOGDEBUG("Inner snapshot: ~p~n", [InnerSnapshot]),
    {persist_snapshot, {vsn, ?PERSISTER_LOG_FORMAT_VERSION},
     term_to_binary(InnerSnapshot)}.

%% Deletes from Tab every key that is not a member of the set Keys.
%% The table is fixed for the duration of the traversal.
prune_table(Tab, Keys) ->
    true = ets:safe_fixtable(Tab, true),
    ok = prune_table(Tab, Keys, ets:first(Tab)),
    true = ets:safe_fixtable(Tab, false).

prune_table(_Tab, _Keys, '$end_of_table') ->
    ok;
prune_table(Tab, Keys, Key) ->
    sets:is_element(Key, Keys) orelse ets:delete(Tab, Key),
    prune_table(Tab, Keys, ets:next(Tab, Key)).
%% Reads the log from the start and rebuilds the snapshot from it.
%% Returns {Res, RecoveredContent, Snapshot} where Res is ok or
%% {error, Reason}; on error RecoveredContent is empty and the
%% snapshot passed in is returned unchanged.
%%
%% An empty or unreadable log is reported as {error, Reason} rather
%% than crashing on a failed match, so that the caller's error branch
%% gets a chance to run.
internal_load_snapshot(LogHandle, DurableQueues, Snapshot) ->
    case disk_log:chunk(LogHandle, start) of
        {K, [LoadedSnapshot | Items]} ->
            restore_snapshot(LoadedSnapshot, Items, K, LogHandle,
                             DurableQueues, Snapshot);
        {K, [LoadedSnapshot | Items], Badbytes} ->
            rabbit_log:warning("~p bad bytes recovering persister log~n",
                               [Badbytes]),
            restore_snapshot(LoadedSnapshot, Items, K, LogHandle,
                             DurableQueues, Snapshot);
        eof ->
            {{error, empty_persister_log}, dict:new(), Snapshot};
        {error, Reason} ->
            {{error, Reason}, dict:new(), Snapshot}
    end.

%% Checks the version of the log head, loads its tables, replays the
%% remaining log items and extracts the content of the durable queues.
restore_snapshot(LoadedSnapshot, Items, K, LogHandle, DurableQueues,
                 Snapshot = #psnapshot{messages = Messages,
                                       queues = Queues}) ->
    case check_version(LoadedSnapshot) of
        {ok, StateBin} ->
            {{serial, Serial}, {txns, Ts}, {messages, Ms}, {queues, Qs},
             {next_seq_id, NextSeqId}} = binary_to_term(StateBin),
            true = ets:insert(Messages, Ms),
            true = ets:insert(Queues, Qs),
            Snapshot1 = replay(Items, LogHandle, K,
                               Snapshot#psnapshot{
                                 serial = Serial,
                                 transactions = Ts,
                                 next_seq_id = NextSeqId}),
            {RecoveredContent, Snapshot2} =
                recover_messages(DurableQueues, Snapshot1),
            %% uncompleted transactions are discarded - this is TRTTD
            %% since we only get into this code on node restart, so
            %% any uncompleted transactions will have been aborted.
            {ok, RecoveredContent,
             Snapshot2#psnapshot{transactions = dict:new()}};
        {error, Reason} ->
            {{error, Reason}, dict:new(), Snapshot}
    end.

%% Accepts only a log head written in the current format version and
%% returns its serialised state.
check_version({persist_snapshot, {vsn, ?PERSISTER_LOG_FORMAT_VERSION},
               StateBin}) ->
    {ok, StateBin};
check_version({persist_snapshot, {vsn, Vsn}, _StateBin}) ->
    {error, {unsupported_persister_log_format, Vsn}};
check_version(_Other) ->
    {error, unrecognised_persister_log_format}.
%% Rebuilds both tables so that they only hold entries belonging to
%% the given durable queues, and returns {RecoveredContent, Snapshot}
%% where RecoveredContent maps each queue name to its ordered list of
%% {Message, Delivered} pairs.
recover_messages(DurableQueues, Snapshot = #psnapshot{messages = Messages,
                                                      queues = Queues}) ->
    DurableQueuesSet = sets:from_list(DurableQueues),
    %% Group the queue entries of durable queues by queue name.
    GroupEntry =
        fun ({{QName, PKey}, Delivered, SeqId}, Acc) ->
                case sets:is_element(QName, DurableQueuesSet) of
                    false -> Acc;
                    true  -> rabbit_misc:dict_cons(
                               QName, {SeqId, PKey, Delivered}, Acc)
                end
        end,
    Work = ets:foldl(GroupEntry, dict:new(), Queues),
    %% unstable parallel map, because order doesn't matter
    %%
    %% we do as much work as possible in spawned worker
    %% processes, but we need to make sure the ets:inserts are
    %% performed in self()
    PerQueue = rabbit_misc:upmap(
                 fun ({QName, Requeues}) ->
                         recover(QName, Requeues, Messages)
                 end, dict:to_list(Work)),
    Merge = fun ({Recovered, {QName, Msgs}}, {Entries, Content}) ->
                    {Recovered ++ Entries, dict:store(QName, Msgs, Content)}
            end,
    {AllEntries, RecoveredContent} =
        lists:foldl(Merge, {[], dict:new()}, PerQueue),
    NewMessages = [{K, M} || {_S, _Q, K, M, _D} <- AllEntries],
    NewQueues = [{{Q, K}, D, S} || {S, Q, K, _M, D} <- AllEntries],
    ets:delete_all_objects(Messages),
    ets:delete_all_objects(Queues),
    true = ets:insert(Messages, NewMessages),
    true = ets:insert(Queues, NewQueues),
    %% contains the mutated messages and queues tables
    {RecoveredContent, Snapshot}.

%% Looks up the message for each requeue entry of QName (entries whose
%% message is missing are dropped) and sorts the result, which orders
%% it by SeqId first. Returns {SortedEntries, {QName, Content}}.
recover(QName, Requeues, Messages) ->
    Unsorted = [{SeqId, QName, PKey, Message, Delivered} ||
                   {SeqId, PKey, Delivered} <- Requeues,
                   {_, Message} <- ets:lookup(Messages, PKey)],
    Sorted = lists:sort(Unsorted),
    Content = [{Message, Delivered} ||
                  {_, _, _, Message, Delivered} <- Sorted],
    {Sorted, {QName, Content}}.
%% Applies the remaining log terms to Snapshot, reading further chunks
%% from the log (continuation K) until eof. Each log term is itself a
%% list of work units.
replay([Item | Items], LogHandle, K, Snapshot) ->
    Integrated = internal_integrate_messages(Item, Snapshot),
    replay(Items, LogHandle, K, Integrated);
replay([], LogHandle, K, Snapshot) ->
    case disk_log:chunk(LogHandle, K) of
        eof ->
            Snapshot;
        {K1, Items} ->
            replay(Items, LogHandle, K1, Snapshot);
        {K1, Items, Badbytes} ->
            rabbit_log:warning("~p bad bytes recovering persister log~n",
                               [Badbytes]),
            replay(Items, LogHandle, K1, Snapshot)
    end.

%% Applies a list of work units to the snapshot, left to right.
internal_integrate_messages(Items, Snapshot) ->
    lists:foldl(fun internal_integrate1/2, Snapshot, Items).

%% Applies a single work unit to the snapshot:
%%  - extend_transaction: remember the work under the transaction key;
%%  - rollback_transaction: forget the transaction;
%%  - commit_transaction: perform the remembered work (oldest list
%%    first) and forget the transaction; unknown keys are a no-op;
%%  - dirty_work: perform the work immediately.
internal_integrate1({extend_transaction, Key, MessageList},
                    Snapshot = #psnapshot {transactions = Transactions}) ->
    Extended = rabbit_misc:dict_cons(Key, MessageList, Transactions),
    Snapshot#psnapshot{transactions = Extended};
internal_integrate1({rollback_transaction, Key},
                    Snapshot = #psnapshot{transactions = Transactions}) ->
    Snapshot#psnapshot{transactions = dict:erase(Key, Transactions)};
internal_integrate1({commit_transaction, Key},
                    Snapshot = #psnapshot{transactions = Transactions,
                                          messages = Messages,
                                          queues = Queues,
                                          next_seq_id = SeqId}) ->
    case dict:find(Key, Transactions) of
        error ->
            Snapshot;
        {ok, MessageLists} ->
            ?LOGDEBUG("persist committing txn ~p~n", [Key]),
            Perform = fun (ML, SeqIdN) ->
                              perform_work(ML, Messages, Queues, SeqIdN)
                      end,
            NextSeqId = lists:foldr(Perform, SeqId, MessageLists),
            Snapshot#psnapshot{transactions = dict:erase(Key, Transactions),
                               next_seq_id = NextSeqId}
    end;
internal_integrate1({dirty_work, MessageList},
                    Snapshot = #psnapshot{messages = Messages,
                                          queues = Queues,
                                          next_seq_id = SeqId}) ->
    NextSeqId = perform_work(MessageList, Messages, Queues, SeqId),
    Snapshot#psnapshot{next_seq_id = NextSeqId}.

%% Performs each work item in order, threading the next sequence id
%% through and returning its final value.
perform_work(MessageList, Messages, Queues, SeqId) ->
    Step = fun (Item, NextSeqId) ->
                   perform_work_item(Item, Messages, Queues, NextSeqId)
           end,
    lists:foldl(Step, SeqId, MessageList).
+ +perform_work_item({publish, Message, QK = {_QName, PKey}}, + Messages, Queues, NextSeqId) -> + true = ets:insert(Messages, {PKey, Message}), + true = ets:insert(Queues, {QK, false, NextSeqId}), + NextSeqId + 1; + +perform_work_item({tied, QK}, _Messages, Queues, NextSeqId) -> + true = ets:insert(Queues, {QK, false, NextSeqId}), + NextSeqId + 1; + +perform_work_item({deliver, QK}, _Messages, Queues, NextSeqId) -> + true = ets:update_element(Queues, QK, {2, true}), + NextSeqId; + +perform_work_item({ack, QK}, _Messages, Queues, NextSeqId) -> + true = ets:delete(Queues, QK), + NextSeqId. diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 4bef843596..6b5c73bb90 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -54,9 +54,7 @@ test_content_prop_roundtrip(Datum, Binary) -> Binary = rabbit_binary_generator:encode_properties(Types, Values). %% assertion all_tests() -> - passed = test_msg_store(), - passed = test_queue_index(), - passed = test_variable_queue(), + passed = test_backing_queue(), passed = test_priority_queue(), passed = test_bpqueue(), passed = test_pg_local(), @@ -994,6 +992,16 @@ bad_handle_hook(_, _, _) -> extra_arg_hook(Hookname, Handler, Args, Extra1, Extra2) -> handle_hook(Hookname, Handler, {Args, Extra1, Extra2}). +test_backing_queue() -> + case application:get_env(backing_queue_module) of + {ok, rabbit_variable_queue} -> + passed = test_msg_store(), + passed = test_queue_index(), + passed = test_variable_queue(); + _ -> + passed + end. + start_msg_store_empty() -> start_msg_store(fun (ok) -> finished end, ok). |
