diff options
Diffstat (limited to 'src/gm.erl')
| -rw-r--r-- | src/gm.erl | 106 |
1 files changed, 70 insertions, 36 deletions
diff --git a/src/gm.erl b/src/gm.erl index df1c258d70..5a82950a41 100644 --- a/src/gm.erl +++ b/src/gm.erl @@ -382,7 +382,7 @@ -behaviour(gen_server2). --export([create_tables/0, start_link/4, leave/1, broadcast/2, +-export([create_tables/0, start_link/4, leave/1, broadcast/2, broadcast/3, confirmed_broadcast/2, info/1, validate_members/2, forget_group/1]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, @@ -395,6 +395,7 @@ -export([table_definitions/0]). -define(GROUP_TABLE, gm_group). +-define(MAX_BUFFER_SIZE, 100000000). %% 100MB -define(HIBERNATE_AFTER_MIN, 1000). -define(DESIRED_HIBERNATE, 10000). -define(BROADCAST_TIMER, 25). @@ -414,6 +415,7 @@ callback_args, confirms, broadcast_buffer, + broadcast_buffer_sz, broadcast_timer, txn_executor }). @@ -522,8 +524,10 @@ start_link(GroupName, Module, Args, TxnFun) -> leave(Server) -> gen_server2:cast(Server, leave). -broadcast(Server, Msg) -> - gen_server2:cast(Server, {broadcast, Msg}). +broadcast(Server, Msg) -> broadcast(Server, Msg, 0). + +broadcast(Server, Msg, SizeHint) -> + gen_server2:cast(Server, {broadcast, Msg, SizeHint}). confirmed_broadcast(Server, Msg) -> gen_server2:call(Server, {confirmed_broadcast, Msg}, infinity). @@ -547,19 +551,20 @@ init([GroupName, Module, Args, TxnFun]) -> random:seed(MegaSecs, Secs, MicroSecs), Self = make_member(GroupName), gen_server2:cast(self(), join), - {ok, #state { self = Self, - left = {Self, undefined}, - right = {Self, undefined}, - group_name = GroupName, - module = Module, - view = undefined, - pub_count = -1, - members_state = undefined, - callback_args = Args, - confirms = queue:new(), - broadcast_buffer = [], - broadcast_timer = undefined, - txn_executor = TxnFun }, hibernate, + {ok, #state { self = Self, + left = {Self, undefined}, + right = {Self, undefined}, + group_name = GroupName, + module = Module, + view = undefined, + pub_count = -1, + members_state = undefined, + callback_args = Args, + confirms = queue:new(), + broadcast_buffer = [], + broadcast_buffer_sz = 0, + broadcast_timer = undefined, + txn_executor = TxnFun }, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. @@ -576,7 +581,7 @@ handle_call({confirmed_broadcast, Msg}, _From, ok, State}); handle_call({confirmed_broadcast, Msg}, From, State) -> - internal_broadcast(Msg, From, State); + internal_broadcast(Msg, From, 0, State); handle_call(info, _From, State = #state { members_state = undefined }) -> @@ -639,10 +644,11 @@ handle_cast({?TAG, ReqVer, Msg}, if_callback_success( Result, fun handle_msg_true/3, fun handle_msg_false/3, Msg, State1)); -handle_cast({broadcast, _Msg}, State = #state { members_state = undefined }) -> +handle_cast({broadcast, _Msg, _SizeHint}, + State = #state { members_state = undefined }) -> noreply(State); -handle_cast({broadcast, Msg}, +handle_cast({broadcast, Msg, _SizeHint}, State = #state { self = Self, right = {Self, undefined}, module = Module, @@ -650,8 +656,8 @@ handle_cast({broadcast, Msg}, handle_callback_result({Module:handle_msg(Args, get_pid(Self), Msg), State}); -handle_cast({broadcast, Msg}, State) -> - internal_broadcast(Msg, none, State); +handle_cast({broadcast, Msg, SizeHint}, State) -> + internal_broadcast(Msg, none, SizeHint, State); handle_cast(join, State = #state { self = Self, group_name = GroupName, @@ -883,12 +889,14 @@ ensure_broadcast_timer(State = #state { broadcast_timer = undefined }) -> ensure_broadcast_timer(State) -> State. -internal_broadcast(Msg, From, State = #state { self = Self, - pub_count = PubCount, - module = Module, - confirms = Confirms, - callback_args = Args, - broadcast_buffer = Buffer }) -> +internal_broadcast(Msg, From, SizeHint, + State = #state { self = Self, + pub_count = PubCount, + module = Module, + confirms = Confirms, + callback_args = Args, + broadcast_buffer = Buffer, + broadcast_buffer_sz = BufferSize }) -> PubCount1 = PubCount + 1, Result = Module:handle_msg(Args, get_pid(Self), Msg), Buffer1 = [{PubCount1, Msg} | Buffer], @@ -896,13 +904,38 @@ internal_broadcast(Msg, From, State = #state { self = Self, none -> Confirms; _ -> queue:in({PubCount1, From}, Confirms) end, - State1 = State #state { pub_count = PubCount1, - confirms = Confirms1, - broadcast_buffer = Buffer1 }, - handle_callback_result({Result, case From of - none -> State1; - _ -> flush_broadcast_buffer(State1) - end}). + State1 = State #state { pub_count = PubCount1, + confirms = Confirms1, + broadcast_buffer = Buffer1, + broadcast_buffer_sz = BufferSize + SizeHint}, + handle_callback_result( + {Result, case From of + none -> maybe_flush_broadcast_buffer(State1); + _ -> flush_broadcast_buffer(State1) + end}). + +%% The Erlang distribution mechanism has an interesting quirk - it +%% will kill the VM cold with "Absurdly large distribution output data +%% buffer" if you attempt to send a message which serialises out to +%% more than 2^31 bytes in size. It's therefore a very good idea to +%% make sure that we don't exceed that size! +%% +%% Now, we could figure out the size of messages as they come in using +%% size(term_to_binary(Msg)) or similar. The trouble is, that requires +%% us to serialise the message only to throw the serialised form +%% away. Hard to believe that's a sensible thing to do. So instead we +%% accept a size hint from the application, via broadcast/3. This size +%% hint can be the size of anything in the message which we expect +%% could be large, and we just ignore the size of any small bits of +%% the message term. Therefore MAX_BUFFER_SIZE is set somewhat +%% conservatively at 100MB - but the buffer is only to allow us to +%% buffer tiny messages anyway, so 100MB is plenty. + +maybe_flush_broadcast_buffer(State = #state{broadcast_buffer_sz = Size}) -> + case Size > ?MAX_BUFFER_SIZE of + true -> flush_broadcast_buffer(State); + false -> State + end. flush_broadcast_buffer(State = #state { broadcast_buffer = [] }) -> State; @@ -920,8 +953,9 @@ flush_broadcast_buffer(State = #state { self = Self, Member #member { pending_ack = PA1, last_pub = PubCount } end, Self, MembersState), - State #state { members_state = MembersState1, - broadcast_buffer = [] }. + State #state { members_state = MembersState1, + broadcast_buffer = [], + broadcast_buffer_sz = 0}. %% --------------------------------------------------------------------------- |
