diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit.erl | 14 | ||||
| -rw-r--r-- | src/rabbit_binary_generator.erl | 53 | ||||
| -rw-r--r-- | src/rabbit_guid.erl | 14 | ||||
| -rw-r--r-- | src/rabbit_misc.erl | 9 | ||||
| -rw-r--r-- | src/rabbit_persister.erl | 51 |
5 files changed, 65 insertions, 76 deletions
diff --git a/src/rabbit.erl b/src/rabbit.erl index e7bff27082..26f244bc6d 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -91,6 +91,13 @@ {requires, kernel_ready}, {enables, core_initialized}]}). +-rabbit_boot_step({guid_generator, + [{description, "guid generator"}, + {mfa, {rabbit_sup, start_restartable_child, + [rabbit_guid]}}, + {requires, kernel_ready}, + {enables, core_initialized}]}). + -rabbit_boot_step({delegate_sup, [{description, "cluster delegate"}, {mfa, {rabbit_sup, start_child, @@ -128,13 +135,6 @@ [rabbit_persister]}}, {requires, queue_sup_queue_recovery}]}). --rabbit_boot_step({guid_generator, - [{description, "guid generator"}, - {mfa, {rabbit_sup, start_restartable_child, - [rabbit_guid]}}, - {requires, persister}, - {enables, routing_ready}]}). - -rabbit_boot_step({routing_ready, [{description, "message delivery logic ready"}]}). diff --git a/src/rabbit_binary_generator.erl b/src/rabbit_binary_generator.erl index 1d47d7640e..ed84373585 100644 --- a/src/rabbit_binary_generator.erl +++ b/src/rabbit_binary_generator.erl @@ -95,33 +95,36 @@ maybe_encode_properties(_ContentProperties, ContentPropertiesBin) maybe_encode_properties(ContentProperties, none) -> rabbit_framing:encode_properties(ContentProperties). -build_content_frames(FragmentsRev, FrameMax, ChannelInt) -> - BodyPayloadMax = if - FrameMax == 0 -> - none; - true -> +build_content_frames(FragsRev, FrameMax, ChannelInt) -> + BodyPayloadMax = if FrameMax == 0 -> + iolist_size(FragsRev); + true -> FrameMax - ?EMPTY_CONTENT_BODY_FRAME_SIZE end, - build_content_frames(0, [], FragmentsRev, BodyPayloadMax, ChannelInt). - -build_content_frames(SizeAcc, FragmentAcc, [], _BodyPayloadMax, _ChannelInt) -> - {SizeAcc, FragmentAcc}; -build_content_frames(SizeAcc, FragmentAcc, [Fragment | FragmentsRev], - BodyPayloadMax, ChannelInt) - when is_number(BodyPayloadMax) and (size(Fragment) > BodyPayloadMax) -> - <<Head:BodyPayloadMax/binary, Tail/binary>> = Fragment, - build_content_frames(SizeAcc, FragmentAcc, [Tail, Head | FragmentsRev], - BodyPayloadMax, ChannelInt); -build_content_frames(SizeAcc, FragmentAcc, [<<>> | FragmentsRev], - BodyPayloadMax, ChannelInt) -> - build_content_frames(SizeAcc, FragmentAcc, FragmentsRev, BodyPayloadMax, ChannelInt); -build_content_frames(SizeAcc, FragmentAcc, [Fragment | FragmentsRev], - BodyPayloadMax, ChannelInt) -> - build_content_frames(SizeAcc + size(Fragment), - [create_frame(3, ChannelInt, Fragment) | FragmentAcc], - FragmentsRev, - BodyPayloadMax, - ChannelInt). + build_content_frames(0, [], BodyPayloadMax, [], + lists:reverse(FragsRev), BodyPayloadMax, ChannelInt). + +build_content_frames(SizeAcc, FramesAcc, _FragSizeRem, [], + [], _BodyPayloadMax, _ChannelInt) -> + {SizeAcc, lists:reverse(FramesAcc)}; +build_content_frames(SizeAcc, FramesAcc, FragSizeRem, FragAcc, + Frags, BodyPayloadMax, ChannelInt) + when FragSizeRem == 0 orelse Frags == [] -> + Frame = create_frame(3, ChannelInt, lists:reverse(FragAcc)), + FrameSize = BodyPayloadMax - FragSizeRem, + build_content_frames(SizeAcc + FrameSize, [Frame | FramesAcc], + BodyPayloadMax, [], Frags, BodyPayloadMax, ChannelInt); +build_content_frames(SizeAcc, FramesAcc, FragSizeRem, FragAcc, + [Frag | Frags], BodyPayloadMax, ChannelInt) -> + Size = size(Frag), + {NewFragSizeRem, NewFragAcc, NewFrags} = + case Size =< FragSizeRem of + true -> {FragSizeRem - Size, [Frag | FragAcc], Frags}; + false -> <<Head:FragSizeRem/binary, Tail/binary>> = Frag, + {0, [Head | FragAcc], [Tail | Frags]} + end, + build_content_frames(SizeAcc, FramesAcc, NewFragSizeRem, NewFragAcc, + NewFrags, BodyPayloadMax, ChannelInt). build_heartbeat_frame() -> create_frame(?FRAME_HEARTBEAT, 0, <<>>). diff --git a/src/rabbit_guid.erl b/src/rabbit_guid.erl index 2fa531a7ce..1ae8f7dac4 100644 --- a/src/rabbit_guid.erl +++ b/src/rabbit_guid.erl @@ -67,7 +67,7 @@ update_disk_serial() -> Filename = filename:join(rabbit_mnesia:dir(), ?SERIAL_FILENAME), Serial = case rabbit_misc:read_term_file(Filename) of {ok, [Num]} -> Num; - {error, enoent} -> rabbit_persister:serial(); + {error, enoent} -> 0; {error, Reason} -> throw({error, {cannot_read_serial_file, Filename, Reason}}) end, @@ -78,7 +78,7 @@ update_disk_serial() -> end, Serial. -%% generate a guid that is monotonically increasing per process. +%% generate a GUID. %% %% The id is only unique within a single cluster and as long as the %% serial store hasn't been deleted. @@ -92,20 +92,18 @@ guid() -> %% A persisted serial number, in combination with self/0 (which %% includes the node name) uniquely identifies a process in space %% and time. We combine that with a process-local counter to give - %% us a GUID that is monotonically increasing per process. + %% us a GUID. G = case get(guid) of undefined -> {{gen_server:call(?SERVER, serial, infinity), self()}, 0}; {S, I} -> {S, I+1} end, put(guid, G), - G. + erlang:md5(term_to_binary(G)). -%% generate a readable string representation of a guid. Note that any -%% monotonicity of the guid is not preserved in the encoding. +%% generate a readable string representation of a GUID. string_guid(Prefix) -> - Prefix ++ "-" ++ base64:encode_to_string( - erlang:md5(term_to_binary(guid()))). + Prefix ++ "-" ++ base64:encode_to_string(guid()). binstring_guid(Prefix) -> list_to_binary(string_guid(Prefix)). diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index d35c0a2512..119808af35 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -43,7 +43,7 @@ -export([r/3, r/2, r_arg/4, rs/1]). -export([enable_cover/0, report_cover/0]). -export([enable_cover/1, report_cover/1]). --export([enable_cover_node/1]). +-export([start_cover/1]). -export([throw_on_error/2, with_exit_handler/2, filter_exit_map/2]). -export([with_user/2, with_vhost/2, with_user_and_vhost/3]). -export([execute_mnesia_transaction/1]). @@ -98,6 +98,7 @@ undefined | r(K) when is_subtype(K, atom())). -spec(rs/1 :: (r(atom())) -> string()). -spec(enable_cover/0 :: () -> ok_or_error()). +-spec(start_cover/1 :: ([{string(), string()} | string()]) -> 'ok'). -spec(report_cover/0 :: () -> 'ok'). -spec(enable_cover/1 :: (file_path()) -> ok_or_error()). -spec(report_cover/1 :: (file_path()) -> 'ok'). @@ -230,9 +231,9 @@ enable_cover(Root) -> _ -> ok end. -enable_cover_node(NodeS) -> - Node = makenode(NodeS), - {ok, _} = cover:start([Node]). +start_cover(NodesS) -> + {ok, _} = cover:start([makenode(N) || N <- NodesS]), + ok. report_cover() -> report_cover("."). diff --git a/src/rabbit_persister.erl b/src/rabbit_persister.erl index 53335a6fab..a9e0cab928 100644 --- a/src/rabbit_persister.erl +++ b/src/rabbit_persister.erl @@ -40,7 +40,7 @@ -export([transaction/1, extend_transaction/2, dirty_work/1, commit_transaction/1, rollback_transaction/1, - force_snapshot/0, serial/0]). + force_snapshot/0]). -include("rabbit.hrl"). @@ -49,11 +49,7 @@ -define(LOG_BUNDLE_DELAY, 5). -define(COMPLETE_BUNDLE_DELAY, 2). --define(HIBERNATE_AFTER, 10000). - --define(MAX_WRAP_ENTRIES, 500). - --define(PERSISTER_LOG_FORMAT_VERSION, {2, 5}). +-define(PERSISTER_LOG_FORMAT_VERSION, {2, 6}). -record(pstate, {log_handle, entry_count, deadline, pending_logs, pending_replies, @@ -64,7 +60,7 @@ %% the other maps a key to one or more queues. %% The aim is to reduce the overload of storing a message multiple times %% when it appears in several queues. --record(psnapshot, {serial, transactions, messages, queues, next_seq_id}). +-record(psnapshot, {transactions, messages, queues, next_seq_id}). %%---------------------------------------------------------------------------- @@ -83,7 +79,6 @@ -spec(commit_transaction/1 :: ({txn(), queue_name()}) -> 'ok'). -spec(rollback_transaction/1 :: ({txn(), queue_name()}) -> 'ok'). -spec(force_snapshot/0 :: () -> 'ok'). --spec(serial/0 :: () -> non_neg_integer()). -endif. @@ -116,17 +111,13 @@ rollback_transaction(TxnKey) -> force_snapshot() -> gen_server:call(?SERVER, force_snapshot, infinity). -serial() -> - gen_server:call(?SERVER, serial, infinity). - %%-------------------------------------------------------------------- init(_Args) -> process_flag(trap_exit, true), FileName = base_filename(), ok = filelib:ensure_dir(FileName), - Snapshot = #psnapshot{serial = 0, - transactions = dict:new(), + Snapshot = #psnapshot{transactions = dict:new(), messages = ets:new(messages, []), queues = ets:new(queues, []), next_seq_id = 0}, @@ -144,9 +135,7 @@ init(_Args) -> [Recovered, Bad]), LH end, - {Res, LoadedSnapshot} = internal_load_snapshot(LogHandle, Snapshot), - NewSnapshot = LoadedSnapshot#psnapshot{ - serial = LoadedSnapshot#psnapshot.serial + 1}, + {Res, NewSnapshot} = internal_load_snapshot(LogHandle, Snapshot), case Res of ok -> ok = take_snapshot(LogHandle, NewSnapshot); @@ -169,9 +158,6 @@ handle_call({commit_transaction, TxnKey}, From, State) -> do_noreply(internal_commit(From, TxnKey, State)); handle_call(force_snapshot, _From, State) -> do_reply(ok, flush(true, State)); -handle_call(serial, _From, - State = #pstate{snapshot = #psnapshot{serial = Serial}}) -> - do_reply(Serial, State); handle_call(_Request, _From, State) -> {noreply, State}. @@ -282,12 +268,15 @@ take_snapshot_and_save_old(LogHandle, Snapshot) -> maybe_take_snapshot(Force, State = #pstate{entry_count = EntryCount, log_handle = LH, - snapshot = Snapshot}) - when Force orelse EntryCount >= ?MAX_WRAP_ENTRIES -> - ok = take_snapshot(LH, Snapshot), - State#pstate{entry_count = 0}; -maybe_take_snapshot(_Force, State) -> - State. + snapshot = Snapshot}) -> + {ok, MaxWrapEntries} = application:get_env(persister_max_wrap_entries), + if + Force orelse EntryCount >= MaxWrapEntries -> + ok = take_snapshot(LH, Snapshot), + State#pstate{entry_count = 0}; + true -> + State + end. later_ms(DeltaMilliSec) -> {MegaSec, Sec, MicroSec} = now(), @@ -304,7 +293,8 @@ compute_deadline(_TimerDelay, ExistingDeadline) -> ExistingDeadline. compute_timeout(infinity) -> - ?HIBERNATE_AFTER; + {ok, HibernateAfter} = application:get_env(persister_hibernate_after), + HibernateAfter; compute_timeout(Deadline) -> DeltaMilliSec = time_diff(Deadline, now()) * 1000.0, if @@ -343,8 +333,7 @@ flush(ForceSnapshot, State = #pstate{pending_logs = PendingLogs, pending_logs = [], pending_replies = []}. -current_snapshot(_Snapshot = #psnapshot{serial = Serial, - transactions = Ts, +current_snapshot(_Snapshot = #psnapshot{transactions = Ts, messages = Messages, queues = Queues, next_seq_id = NextSeqId}) -> @@ -354,8 +343,7 @@ current_snapshot(_Snapshot = #psnapshot{serial = Serial, fun ({{_QName, PKey}, _Delivered, _SeqId}, S) -> sets:add_element(PKey, S) end, sets:new(), Queues)), - InnerSnapshot = {{serial, Serial}, - {txns, Ts}, + InnerSnapshot = {{txns, Ts}, {messages, ets:tab2list(Messages)}, {queues, ets:tab2list(Queues)}, {next_seq_id, NextSeqId}}, @@ -382,13 +370,12 @@ internal_load_snapshot(LogHandle, {K, [Loaded_Snapshot | Items]} = disk_log:chunk(LogHandle, start), case check_version(Loaded_Snapshot) of {ok, StateBin} -> - {{serial, Serial}, {txns, Ts}, {messages, Ms}, {queues, Qs}, + {{txns, Ts}, {messages, Ms}, {queues, Qs}, {next_seq_id, NextSeqId}} = binary_to_term(StateBin), true = ets:insert(Messages, Ms), true = ets:insert(Queues, Qs), Snapshot1 = replay(Items, LogHandle, K, Snapshot#psnapshot{ - serial = Serial, transactions = Ts, next_seq_id = NextSeqId}), Snapshot2 = requeue_messages(Snapshot1), |
