summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@rabbitmq.com>2011-03-16 12:44:13 +0000
committerMatthew Sackman <matthew@rabbitmq.com>2011-03-16 12:44:13 +0000
commite144a091d293a8b3f20720e777aac53406c5b088 (patch)
tree463fbc1534d24b34b5a6078f843dfeb2838a84f3
parentde473d29cb982244e466397bcbebb0fc9412d685 (diff)
downloadrabbitmq-server-git-e144a091d293a8b3f20720e777aac53406c5b088.tar.gz
Made gm do batching of messages. This has an astonishing performance impact: if every broadcast msg to the gm results in network activity then performance is low - presumably serialisation of, and network broadcast of small messages is very inefficient. By batching broadcasts and then sending many on a timer, performance is much much higher.
-rw-r--r--src/gm.erl134
-rw-r--r--src/gm_soak_test.erl8
2 files changed, 96 insertions, 46 deletions
diff --git a/src/gm.erl b/src/gm.erl
index 8cf225815e..5b3623cf81 100644
--- a/src/gm.erl
+++ b/src/gm.erl
@@ -376,15 +376,16 @@
confirmed_broadcast/2, group_members/1]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
- code_change/3, prioritise_info/2]).
+ code_change/3, prioritise_cast/2, prioritise_info/2]).
-export([behaviour_info/1]).
--export([table_definitions/0]).
+-export([table_definitions/0, flush/1]).
-define(GROUP_TABLE, gm_group).
-define(HIBERNATE_AFTER_MIN, 1000).
-define(DESIRED_HIBERNATE, 10000).
+-define(BROADCAST_TIMER, 25).
-define(SETS, ordsets).
-define(DICT, orddict).
@@ -398,7 +399,9 @@
pub_count,
members_state,
callback_args,
- confirms
+ confirms,
+ broadcast_buffer,
+ broadcast_timer
}).
-record(gm_group, { name, version, members }).
@@ -508,21 +511,26 @@ confirmed_broadcast(Server, Msg) ->
group_members(Server) ->
gen_server2:call(Server, group_members, infinity).
+flush(Server) ->
+ gen_server2:cast(Server, flush).
+
init([GroupName, Module, Args]) ->
random:seed(now()),
gen_server2:cast(self(), join),
Self = self(),
- {ok, #state { self = Self,
- left = {Self, undefined},
- right = {Self, undefined},
- group_name = GroupName,
- module = Module,
- view = undefined,
- pub_count = 0,
- members_state = undefined,
- callback_args = Args,
- confirms = queue:new() }, hibernate,
+ {ok, #state { self = Self,
+ left = {Self, undefined},
+ right = {Self, undefined},
+ group_name = GroupName,
+ module = Module,
+ view = undefined,
+ pub_count = 0,
+ members_state = undefined,
+ callback_args = Args,
+ confirms = queue:new(),
+ broadcast_buffer = [],
+ broadcast_timer = undefined }, hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
@@ -620,7 +628,11 @@ handle_cast(join, State = #state { self = Self,
{Module:joined(Args, all_known_members(View)), State1});
handle_cast(leave, State) ->
- {stop, normal, State}.
+ {stop, normal, State};
+
+handle_cast(flush, State) ->
+ noreply(
+ flush_broadcast_buffer(State #state { broadcast_timer = undefined })).
handle_info({'DOWN', MRef, process, _Pid, _Reason},
@@ -662,14 +674,17 @@ handle_info({'DOWN', MRef, process, _Pid, _Reason},
end.
-terminate(Reason, #state { module = Module,
- callback_args = Args }) ->
+terminate(Reason, State = #state { module = Module,
+ callback_args = Args }) ->
+ flush_broadcast_buffer(State),
Module:terminate(Args, Reason).
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
+prioritise_cast(flush, _State) -> 1;
+prioritise_cast(_ , _State) -> 0.
prioritise_info({'DOWN', _MRef, process, _Pid, _Reason}, _State) -> 1;
prioritise_info(_ , _State) -> 0.
@@ -782,33 +797,62 @@ handle_msg({activity, _NotLeft, _Activity}, State) ->
noreply(State) ->
- {noreply, State, hibernate}.
+ {noreply, ensure_broadcast_timer(State), hibernate}.
reply(Reply, State) ->
- {reply, Reply, State, hibernate}.
-
-internal_broadcast(Msg, From, State = #state { self = Self,
- pub_count = PubCount,
- members_state = MembersState,
- module = Module,
- confirms = Confirms,
- callback_args = Args }) ->
- PubMsg = {PubCount, Msg},
- Activity = activity_cons(Self, [PubMsg], [], activity_nil()),
- ok = maybe_send_activity(activity_finalise(Activity), State),
- MembersState1 =
- with_member(
- fun (Member = #member { pending_ack = PA }) ->
- Member #member { pending_ack = queue:in(PubMsg, PA) }
- end, Self, MembersState),
+ {reply, Reply, ensure_broadcast_timer(State), hibernate}.
+
+ensure_broadcast_timer(State = #state { broadcast_buffer = [],
+ broadcast_timer = undefined }) ->
+ State;
+ensure_broadcast_timer(State = #state { broadcast_buffer = [],
+ broadcast_timer = TRef }) ->
+ timer:cancel(TRef),
+ State #state { broadcast_timer = undefined };
+ensure_broadcast_timer(State = #state { broadcast_timer = undefined }) ->
+ {ok, TRef} = timer:apply_after(?BROADCAST_TIMER, ?MODULE, flush, [self()]),
+ State #state { broadcast_timer = TRef };
+ensure_broadcast_timer(State) ->
+ State.
+
+internal_broadcast(Msg, From, State = #state { self = Self,
+ pub_count = PubCount,
+ module = Module,
+ confirms = Confirms,
+ callback_args = Args,
+ broadcast_buffer = Buffer }) ->
+ Result = Module:handle_msg(Args, Self, Msg),
+ Buffer1 = [{PubCount, Msg} | Buffer],
Confirms1 = case From of
none -> Confirms;
_ -> queue:in({PubCount, From}, Confirms)
end,
- handle_callback_result({Module:handle_msg(Args, Self, Msg),
- State #state { pub_count = PubCount + 1,
- members_state = MembersState1,
- confirms = Confirms1 }}).
+ State1 = State #state { pub_count = PubCount + 1,
+ confirms = Confirms1,
+ broadcast_buffer = Buffer1 },
+ case From =/= none of
+ true ->
+ handle_callback_result({Result, flush_broadcast_buffer(State1)});
+ false ->
+ handle_callback_result(
+ {Result, State1 #state { broadcast_buffer = Buffer1 }})
+ end.
+
+flush_broadcast_buffer(State = #state { broadcast_buffer = [] }) ->
+ State;
+flush_broadcast_buffer(State = #state { self = Self,
+ members_state = MembersState,
+ broadcast_buffer = Buffer }) ->
+ Pubs = lists:reverse(Buffer),
+ Activity = activity_cons(Self, Pubs, [], activity_nil()),
+ ok = maybe_send_activity(activity_finalise(Activity), State),
+ MembersState1 = with_member(
+ fun (Member = #member { pending_ack = PA }) ->
+ PA1 = queue:join(PA, queue:from_list(Pubs)),
+ Member #member { pending_ack = PA1 }
+ end, Self, MembersState),
+ State #state { members_state = MembersState1,
+ broadcast_buffer = [] }.
%% ---------------------------------------------------------------------------
@@ -1093,16 +1137,22 @@ maybe_monitor(Self, Self) ->
maybe_monitor(Other, _Self) ->
erlang:monitor(process, Other).
-check_neighbours(State = #state { self = Self,
- left = Left,
- right = Right,
- view = View }) ->
+check_neighbours(State = #state { self = Self,
+ left = Left,
+ right = Right,
+ view = View,
+ broadcast_buffer = Buffer }) ->
#view_member { left = VLeft, right = VRight }
= fetch_view_member(Self, View),
Ver = view_version(View),
Left1 = ensure_neighbour(Ver, Self, Left, VLeft),
Right1 = ensure_neighbour(Ver, Self, Right, VRight),
- State1 = State #state { left = Left1, right = Right1 },
+ Buffer1 = case Right1 of
+ {Self, undefined} -> [];
+ _ -> Buffer
+ end,
+ State1 = State #state { left = Left1, right = Right1,
+ broadcast_buffer = Buffer1 },
ok = maybe_send_catchup(Right, State1),
State1.
diff --git a/src/gm_soak_test.erl b/src/gm_soak_test.erl
index 1f8832a6b2..4e30e1d57a 100644
--- a/src/gm_soak_test.erl
+++ b/src/gm_soak_test.erl
@@ -80,12 +80,12 @@ handle_msg([], From, {test_msg, Num}) ->
{ok, Num} -> ok;
{ok, Num1} when Num < Num1 ->
exit({{from, From},
- {duplicate_delivery_of, Num1},
- {expecting, Num}});
+ {duplicate_delivery_of, Num},
+ {expecting, Num1}});
{ok, Num1} ->
exit({{from, From},
- {missing_delivery_of, Num},
- {received_early, Num1}});
+ {received_early, Num},
+ {expecting, Num1}});
error ->
exit({{from, From},
{received_premature_delivery, Num}})