summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/rabbit.erl14
-rw-r--r--src/rabbit_binary_generator.erl53
-rw-r--r--src/rabbit_guid.erl14
-rw-r--r--src/rabbit_misc.erl9
-rw-r--r--src/rabbit_persister.erl51
5 files changed, 65 insertions, 76 deletions
diff --git a/src/rabbit.erl b/src/rabbit.erl
index e7bff27082..26f244bc6d 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -91,6 +91,13 @@
{requires, kernel_ready},
{enables, core_initialized}]}).
+-rabbit_boot_step({guid_generator,
+ [{description, "guid generator"},
+ {mfa, {rabbit_sup, start_restartable_child,
+ [rabbit_guid]}},
+ {requires, kernel_ready},
+ {enables, core_initialized}]}).
+
-rabbit_boot_step({delegate_sup,
[{description, "cluster delegate"},
{mfa, {rabbit_sup, start_child,
@@ -128,13 +135,6 @@
[rabbit_persister]}},
{requires, queue_sup_queue_recovery}]}).
--rabbit_boot_step({guid_generator,
- [{description, "guid generator"},
- {mfa, {rabbit_sup, start_restartable_child,
- [rabbit_guid]}},
- {requires, persister},
- {enables, routing_ready}]}).
-
-rabbit_boot_step({routing_ready,
[{description, "message delivery logic ready"}]}).
diff --git a/src/rabbit_binary_generator.erl b/src/rabbit_binary_generator.erl
index 1d47d7640e..ed84373585 100644
--- a/src/rabbit_binary_generator.erl
+++ b/src/rabbit_binary_generator.erl
@@ -95,33 +95,36 @@ maybe_encode_properties(_ContentProperties, ContentPropertiesBin)
maybe_encode_properties(ContentProperties, none) ->
rabbit_framing:encode_properties(ContentProperties).
-build_content_frames(FragmentsRev, FrameMax, ChannelInt) ->
- BodyPayloadMax = if
- FrameMax == 0 ->
- none;
- true ->
+build_content_frames(FragsRev, FrameMax, ChannelInt) ->
+ BodyPayloadMax = if FrameMax == 0 ->
+ iolist_size(FragsRev);
+ true ->
FrameMax - ?EMPTY_CONTENT_BODY_FRAME_SIZE
end,
- build_content_frames(0, [], FragmentsRev, BodyPayloadMax, ChannelInt).
-
-build_content_frames(SizeAcc, FragmentAcc, [], _BodyPayloadMax, _ChannelInt) ->
- {SizeAcc, FragmentAcc};
-build_content_frames(SizeAcc, FragmentAcc, [Fragment | FragmentsRev],
- BodyPayloadMax, ChannelInt)
- when is_number(BodyPayloadMax) and (size(Fragment) > BodyPayloadMax) ->
- <<Head:BodyPayloadMax/binary, Tail/binary>> = Fragment,
- build_content_frames(SizeAcc, FragmentAcc, [Tail, Head | FragmentsRev],
- BodyPayloadMax, ChannelInt);
-build_content_frames(SizeAcc, FragmentAcc, [<<>> | FragmentsRev],
- BodyPayloadMax, ChannelInt) ->
- build_content_frames(SizeAcc, FragmentAcc, FragmentsRev, BodyPayloadMax, ChannelInt);
-build_content_frames(SizeAcc, FragmentAcc, [Fragment | FragmentsRev],
- BodyPayloadMax, ChannelInt) ->
- build_content_frames(SizeAcc + size(Fragment),
- [create_frame(3, ChannelInt, Fragment) | FragmentAcc],
- FragmentsRev,
- BodyPayloadMax,
- ChannelInt).
+ build_content_frames(0, [], BodyPayloadMax, [],
+ lists:reverse(FragsRev), BodyPayloadMax, ChannelInt).
+
+build_content_frames(SizeAcc, FramesAcc, _FragSizeRem, [],
+ [], _BodyPayloadMax, _ChannelInt) ->
+ {SizeAcc, lists:reverse(FramesAcc)};
+build_content_frames(SizeAcc, FramesAcc, FragSizeRem, FragAcc,
+ Frags, BodyPayloadMax, ChannelInt)
+ when FragSizeRem == 0 orelse Frags == [] ->
+ Frame = create_frame(3, ChannelInt, lists:reverse(FragAcc)),
+ FrameSize = BodyPayloadMax - FragSizeRem,
+ build_content_frames(SizeAcc + FrameSize, [Frame | FramesAcc],
+ BodyPayloadMax, [], Frags, BodyPayloadMax, ChannelInt);
+build_content_frames(SizeAcc, FramesAcc, FragSizeRem, FragAcc,
+ [Frag | Frags], BodyPayloadMax, ChannelInt) ->
+ Size = size(Frag),
+ {NewFragSizeRem, NewFragAcc, NewFrags} =
+ case Size =< FragSizeRem of
+ true -> {FragSizeRem - Size, [Frag | FragAcc], Frags};
+ false -> <<Head:FragSizeRem/binary, Tail/binary>> = Frag,
+ {0, [Head | FragAcc], [Tail | Frags]}
+ end,
+ build_content_frames(SizeAcc, FramesAcc, NewFragSizeRem, NewFragAcc,
+ NewFrags, BodyPayloadMax, ChannelInt).
build_heartbeat_frame() ->
create_frame(?FRAME_HEARTBEAT, 0, <<>>).
diff --git a/src/rabbit_guid.erl b/src/rabbit_guid.erl
index 2fa531a7ce..1ae8f7dac4 100644
--- a/src/rabbit_guid.erl
+++ b/src/rabbit_guid.erl
@@ -67,7 +67,7 @@ update_disk_serial() ->
Filename = filename:join(rabbit_mnesia:dir(), ?SERIAL_FILENAME),
Serial = case rabbit_misc:read_term_file(Filename) of
{ok, [Num]} -> Num;
- {error, enoent} -> rabbit_persister:serial();
+ {error, enoent} -> 0;
{error, Reason} ->
throw({error, {cannot_read_serial_file, Filename, Reason}})
end,
@@ -78,7 +78,7 @@ update_disk_serial() ->
end,
Serial.
-%% generate a guid that is monotonically increasing per process.
+%% generate a GUID.
%%
%% The id is only unique within a single cluster and as long as the
%% serial store hasn't been deleted.
@@ -92,20 +92,18 @@ guid() ->
%% A persisted serial number, in combination with self/0 (which
%% includes the node name) uniquely identifies a process in space
%% and time. We combine that with a process-local counter to give
- %% us a GUID that is monotonically increasing per process.
+ %% us a GUID.
G = case get(guid) of
undefined -> {{gen_server:call(?SERVER, serial, infinity), self()},
0};
{S, I} -> {S, I+1}
end,
put(guid, G),
- G.
+ erlang:md5(term_to_binary(G)).
-%% generate a readable string representation of a guid. Note that any
-%% monotonicity of the guid is not preserved in the encoding.
+%% generate a readable string representation of a GUID.
string_guid(Prefix) ->
- Prefix ++ "-" ++ base64:encode_to_string(
- erlang:md5(term_to_binary(guid()))).
+ Prefix ++ "-" ++ base64:encode_to_string(guid()).
binstring_guid(Prefix) ->
list_to_binary(string_guid(Prefix)).
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index d35c0a2512..119808af35 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -43,7 +43,7 @@
-export([r/3, r/2, r_arg/4, rs/1]).
-export([enable_cover/0, report_cover/0]).
-export([enable_cover/1, report_cover/1]).
--export([enable_cover_node/1]).
+-export([start_cover/1]).
-export([throw_on_error/2, with_exit_handler/2, filter_exit_map/2]).
-export([with_user/2, with_vhost/2, with_user_and_vhost/3]).
-export([execute_mnesia_transaction/1]).
@@ -98,6 +98,7 @@
undefined | r(K) when is_subtype(K, atom())).
-spec(rs/1 :: (r(atom())) -> string()).
-spec(enable_cover/0 :: () -> ok_or_error()).
+-spec(start_cover/1 :: ([{string(), string()} | string()]) -> 'ok').
-spec(report_cover/0 :: () -> 'ok').
-spec(enable_cover/1 :: (file_path()) -> ok_or_error()).
-spec(report_cover/1 :: (file_path()) -> 'ok').
@@ -230,9 +231,9 @@ enable_cover(Root) ->
_ -> ok
end.
-enable_cover_node(NodeS) ->
- Node = makenode(NodeS),
- {ok, _} = cover:start([Node]).
+start_cover(NodesS) ->
+ {ok, _} = cover:start([makenode(N) || N <- NodesS]),
+ ok.
report_cover() ->
report_cover(".").
diff --git a/src/rabbit_persister.erl b/src/rabbit_persister.erl
index 53335a6fab..a9e0cab928 100644
--- a/src/rabbit_persister.erl
+++ b/src/rabbit_persister.erl
@@ -40,7 +40,7 @@
-export([transaction/1, extend_transaction/2, dirty_work/1,
commit_transaction/1, rollback_transaction/1,
- force_snapshot/0, serial/0]).
+ force_snapshot/0]).
-include("rabbit.hrl").
@@ -49,11 +49,7 @@
-define(LOG_BUNDLE_DELAY, 5).
-define(COMPLETE_BUNDLE_DELAY, 2).
--define(HIBERNATE_AFTER, 10000).
-
--define(MAX_WRAP_ENTRIES, 500).
-
--define(PERSISTER_LOG_FORMAT_VERSION, {2, 5}).
+-define(PERSISTER_LOG_FORMAT_VERSION, {2, 6}).
-record(pstate, {log_handle, entry_count, deadline,
pending_logs, pending_replies,
@@ -64,7 +60,7 @@
%% the other maps a key to one or more queues.
%% The aim is to reduce the overload of storing a message multiple times
%% when it appears in several queues.
--record(psnapshot, {serial, transactions, messages, queues, next_seq_id}).
+-record(psnapshot, {transactions, messages, queues, next_seq_id}).
%%----------------------------------------------------------------------------
@@ -83,7 +79,6 @@
-spec(commit_transaction/1 :: ({txn(), queue_name()}) -> 'ok').
-spec(rollback_transaction/1 :: ({txn(), queue_name()}) -> 'ok').
-spec(force_snapshot/0 :: () -> 'ok').
--spec(serial/0 :: () -> non_neg_integer()).
-endif.
@@ -116,17 +111,13 @@ rollback_transaction(TxnKey) ->
force_snapshot() ->
gen_server:call(?SERVER, force_snapshot, infinity).
-serial() ->
- gen_server:call(?SERVER, serial, infinity).
-
%%--------------------------------------------------------------------
init(_Args) ->
process_flag(trap_exit, true),
FileName = base_filename(),
ok = filelib:ensure_dir(FileName),
- Snapshot = #psnapshot{serial = 0,
- transactions = dict:new(),
+ Snapshot = #psnapshot{transactions = dict:new(),
messages = ets:new(messages, []),
queues = ets:new(queues, []),
next_seq_id = 0},
@@ -144,9 +135,7 @@ init(_Args) ->
[Recovered, Bad]),
LH
end,
- {Res, LoadedSnapshot} = internal_load_snapshot(LogHandle, Snapshot),
- NewSnapshot = LoadedSnapshot#psnapshot{
- serial = LoadedSnapshot#psnapshot.serial + 1},
+ {Res, NewSnapshot} = internal_load_snapshot(LogHandle, Snapshot),
case Res of
ok ->
ok = take_snapshot(LogHandle, NewSnapshot);
@@ -169,9 +158,6 @@ handle_call({commit_transaction, TxnKey}, From, State) ->
do_noreply(internal_commit(From, TxnKey, State));
handle_call(force_snapshot, _From, State) ->
do_reply(ok, flush(true, State));
-handle_call(serial, _From,
- State = #pstate{snapshot = #psnapshot{serial = Serial}}) ->
- do_reply(Serial, State);
handle_call(_Request, _From, State) ->
{noreply, State}.
@@ -282,12 +268,15 @@ take_snapshot_and_save_old(LogHandle, Snapshot) ->
maybe_take_snapshot(Force, State = #pstate{entry_count = EntryCount,
log_handle = LH,
- snapshot = Snapshot})
- when Force orelse EntryCount >= ?MAX_WRAP_ENTRIES ->
- ok = take_snapshot(LH, Snapshot),
- State#pstate{entry_count = 0};
-maybe_take_snapshot(_Force, State) ->
- State.
+ snapshot = Snapshot}) ->
+ {ok, MaxWrapEntries} = application:get_env(persister_max_wrap_entries),
+ if
+ Force orelse EntryCount >= MaxWrapEntries ->
+ ok = take_snapshot(LH, Snapshot),
+ State#pstate{entry_count = 0};
+ true ->
+ State
+ end.
later_ms(DeltaMilliSec) ->
{MegaSec, Sec, MicroSec} = now(),
@@ -304,7 +293,8 @@ compute_deadline(_TimerDelay, ExistingDeadline) ->
ExistingDeadline.
compute_timeout(infinity) ->
- ?HIBERNATE_AFTER;
+ {ok, HibernateAfter} = application:get_env(persister_hibernate_after),
+ HibernateAfter;
compute_timeout(Deadline) ->
DeltaMilliSec = time_diff(Deadline, now()) * 1000.0,
if
@@ -343,8 +333,7 @@ flush(ForceSnapshot, State = #pstate{pending_logs = PendingLogs,
pending_logs = [],
pending_replies = []}.
-current_snapshot(_Snapshot = #psnapshot{serial = Serial,
- transactions = Ts,
+current_snapshot(_Snapshot = #psnapshot{transactions = Ts,
messages = Messages,
queues = Queues,
next_seq_id = NextSeqId}) ->
@@ -354,8 +343,7 @@ current_snapshot(_Snapshot = #psnapshot{serial = Serial,
fun ({{_QName, PKey}, _Delivered, _SeqId}, S) ->
sets:add_element(PKey, S)
end, sets:new(), Queues)),
- InnerSnapshot = {{serial, Serial},
- {txns, Ts},
+ InnerSnapshot = {{txns, Ts},
{messages, ets:tab2list(Messages)},
{queues, ets:tab2list(Queues)},
{next_seq_id, NextSeqId}},
@@ -382,13 +370,12 @@ internal_load_snapshot(LogHandle,
{K, [Loaded_Snapshot | Items]} = disk_log:chunk(LogHandle, start),
case check_version(Loaded_Snapshot) of
{ok, StateBin} ->
- {{serial, Serial}, {txns, Ts}, {messages, Ms}, {queues, Qs},
+ {{txns, Ts}, {messages, Ms}, {queues, Qs},
{next_seq_id, NextSeqId}} = binary_to_term(StateBin),
true = ets:insert(Messages, Ms),
true = ets:insert(Queues, Qs),
Snapshot1 = replay(Items, LogHandle, K,
Snapshot#psnapshot{
- serial = Serial,
transactions = Ts,
next_seq_id = NextSeqId}),
Snapshot2 = requeue_messages(Snapshot1),