summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRob Harrop <rob@rabbitmq.com>2010-11-10 11:47:51 +0000
committerRob Harrop <rob@rabbitmq.com>2010-11-10 11:47:51 +0000
commitcb4825213644553644b61df28f8aa6e37496b053 (patch)
tree5204b3232354c3683e0150a2662ad1e1771960ee
parentfb9bf1afff3699236b0af404c70359c0d42483e1 (diff)
downloadrabbitmq-server-git-cb4825213644553644b61df28f8aa6e37496b053.tar.gz
Refactored rabbit_heartbeat to provide start_heartbeat_fun
-rw-r--r--src/rabbit_connection_sup.erl28
-rw-r--r--src/rabbit_heartbeat.erl35
2 files changed, 37 insertions, 26 deletions
diff --git a/src/rabbit_connection_sup.erl b/src/rabbit_connection_sup.erl
index b3821d3b8b..bb5ed916da 100644
--- a/src/rabbit_connection_sup.erl
+++ b/src/rabbit_connection_sup.erl
@@ -79,21 +79,13 @@ init([]) ->
{ok, {{one_for_all, 0, 1}, []}}.
start_heartbeat_fun(SupPid) ->
- fun (_Sock, 0) ->
- none;
- (Sock, TimeoutSec) ->
- Parent = self(),
- {ok, Sender} =
- supervisor2:start_child(
- SupPid, {heartbeat_sender,
- {rabbit_heartbeat, start_heartbeat_sender,
- [Parent, Sock, TimeoutSec]},
- transient, ?MAX_WAIT, worker, [rabbit_heartbeat]}),
- {ok, Receiver} =
- supervisor2:start_child(
- SupPid, {heartbeat_receiver,
- {rabbit_heartbeat, start_heartbeat_receiver,
- [Parent, Sock, TimeoutSec]},
- transient, ?MAX_WAIT, worker, [rabbit_heartbeat]}),
- {Sender, Receiver}
- end.
+ SendFun = fun(Sock) ->
+ Frame = rabbit_binary_generator:build_heartbeat_frame(),
+ catch rabbit_net:send(Sock, Frame)
+ end,
+
+ Parent = self(),
+ TimeoutFun = fun() ->
+ Parent ! timeout
+ end,
+ rabbit_heartbeat:start_heartbeat_fun(SupPid, SendFun, TimeoutFun).
diff --git a/src/rabbit_heartbeat.erl b/src/rabbit_heartbeat.erl
index a9945af1d4..08462d74ce 100644
--- a/src/rabbit_heartbeat.erl
+++ b/src/rabbit_heartbeat.erl
@@ -32,7 +32,7 @@
-module(rabbit_heartbeat).
-export([start_heartbeat_sender/3, start_heartbeat_receiver/3,
- pause_monitor/1, resume_monitor/1]).
+ start_heartbeat_fun/3, pause_monitor/1, resume_monitor/1]).
-include("rabbit.hrl").
@@ -43,12 +43,13 @@
-export_type([heartbeaters/0]).
-type(heartbeaters() :: rabbit_types:maybe({pid(), pid()})).
+-type(callback_fun() :: fun (() -> any())).
-spec(start_heartbeat_sender/3 ::
- (pid(), rabbit_net:socket(), non_neg_integer()) ->
+ (rabbit_net:socket(), non_neg_integer(), callback_fun()) ->
rabbit_types:ok(pid())).
-spec(start_heartbeat_receiver/3 ::
- (pid(), rabbit_net:socket(), non_neg_integer()) ->
+ (rabbit_net:socket(), non_neg_integer(), callback_fun()) ->
rabbit_types:ok(pid())).
-spec(pause_monitor/1 :: (heartbeaters()) -> 'ok').
@@ -58,27 +59,45 @@
%%----------------------------------------------------------------------------
-start_heartbeat_sender(_Parent, Sock, TimeoutSec) ->
+start_heartbeat_sender(Sock, TimeoutSec, SendFun) ->
%% the 'div 2' is there so that we don't end up waiting for nearly
%% 2 * TimeoutSec before sending a heartbeat in the boundary case
%% where the last message was sent just after a heartbeat.
heartbeater(
{Sock, TimeoutSec * 1000 div 2, send_oct, 0,
fun () ->
- catch rabbit_net:send(
- Sock, rabbit_binary_generator:build_heartbeat_frame()),
+ SendFun(Sock),
continue
end}).
-start_heartbeat_receiver(Parent, Sock, TimeoutSec) ->
+start_heartbeat_receiver(Sock, TimeoutSec, TimeoutFun) ->
%% we check for incoming data every interval, and time out after
%% two checks with no change. As a result we will time out between
%% 2 and 3 intervals after the last data has been received.
heartbeater({Sock, TimeoutSec * 1000, recv_oct, 1, fun () ->
- Parent ! timeout,
+ TimeoutFun(),
stop
end}).
+start_heartbeat_fun(SupPid, SendFun, TimeoutFun) ->
+ fun (_Sock, 0) ->
+ none;
+ (Sock, TimeoutSec) ->
+ {ok, Sender} =
+ supervisor2:start_child(
+ SupPid, {heartbeat_sender,
+ {rabbit_heartbeat, start_heartbeat_sender,
+ [Sock, TimeoutSec, SendFun]},
+ transient, ?MAX_WAIT, worker, [rabbit_heartbeat]}),
+ {ok, Receiver} =
+ supervisor2:start_child(
+ SupPid, {heartbeat_receiver,
+ {rabbit_heartbeat, start_heartbeat_receiver,
+ [Sock, TimeoutSec, TimeoutFun]},
+ transient, ?MAX_WAIT, worker, [rabbit_heartbeat]}),
+ {Sender, Receiver}
+ end.
+
pause_monitor(none) ->
ok;
pause_monitor({_Sender, Receiver}) ->