diff options
| author | Rob Harrop <rob@rabbitmq.com> | 2010-11-10 11:47:51 +0000 |
|---|---|---|
| committer | Rob Harrop <rob@rabbitmq.com> | 2010-11-10 11:47:51 +0000 |
| commit | cb4825213644553644b61df28f8aa6e37496b053 (patch) | |
| tree | 5204b3232354c3683e0150a2662ad1e1771960ee | |
| parent | fb9bf1afff3699236b0af404c70359c0d42483e1 (diff) | |
| download | rabbitmq-server-git-cb4825213644553644b61df28f8aa6e37496b053.tar.gz | |
Refactored rabbit_heartbeat to provide start_heartbeat_fun
| -rw-r--r-- | src/rabbit_connection_sup.erl | 28 | ||||
| -rw-r--r-- | src/rabbit_heartbeat.erl | 35 |
2 files changed, 37 insertions, 26 deletions
diff --git a/src/rabbit_connection_sup.erl b/src/rabbit_connection_sup.erl index b3821d3b8b..bb5ed916da 100644 --- a/src/rabbit_connection_sup.erl +++ b/src/rabbit_connection_sup.erl @@ -79,21 +79,13 @@ init([]) -> {ok, {{one_for_all, 0, 1}, []}}. start_heartbeat_fun(SupPid) -> - fun (_Sock, 0) -> - none; - (Sock, TimeoutSec) -> - Parent = self(), - {ok, Sender} = - supervisor2:start_child( - SupPid, {heartbeat_sender, - {rabbit_heartbeat, start_heartbeat_sender, - [Parent, Sock, TimeoutSec]}, - transient, ?MAX_WAIT, worker, [rabbit_heartbeat]}), - {ok, Receiver} = - supervisor2:start_child( - SupPid, {heartbeat_receiver, - {rabbit_heartbeat, start_heartbeat_receiver, - [Parent, Sock, TimeoutSec]}, - transient, ?MAX_WAIT, worker, [rabbit_heartbeat]}), - {Sender, Receiver} - end. + SendFun = fun(Sock) -> + Frame = rabbit_binary_generator:build_heartbeat_frame(), + catch rabbit_net:send(Sock, Frame) + end, + + Parent = self(), + TimeoutFun = fun() -> + Parent ! timeout + end, + rabbit_heartbeat:start_heartbeat_fun(SupPid, SendFun, TimeoutFun). diff --git a/src/rabbit_heartbeat.erl b/src/rabbit_heartbeat.erl index a9945af1d4..08462d74ce 100644 --- a/src/rabbit_heartbeat.erl +++ b/src/rabbit_heartbeat.erl @@ -32,7 +32,7 @@ -module(rabbit_heartbeat). -export([start_heartbeat_sender/3, start_heartbeat_receiver/3, - pause_monitor/1, resume_monitor/1]). + start_heartbeat_fun/3, pause_monitor/1, resume_monitor/1]). -include("rabbit.hrl"). @@ -43,12 +43,13 @@ -export_type([heartbeaters/0]). -type(heartbeaters() :: rabbit_types:maybe({pid(), pid()})). +-type(callback_fun() :: fun (() -> any())). -spec(start_heartbeat_sender/3 :: - (pid(), rabbit_net:socket(), non_neg_integer()) -> + (rabbit_net:socket(), non_neg_integer(), callback_fun()) -> rabbit_types:ok(pid())). -spec(start_heartbeat_receiver/3 :: - (pid(), rabbit_net:socket(), non_neg_integer()) -> + (rabbit_net:socket(), non_neg_integer(), callback_fun()) -> rabbit_types:ok(pid())). -spec(pause_monitor/1 :: (heartbeaters()) -> 'ok'). @@ -58,27 +59,45 @@ %%---------------------------------------------------------------------------- -start_heartbeat_sender(_Parent, Sock, TimeoutSec) -> +start_heartbeat_sender(Sock, TimeoutSec, SendFun) -> %% the 'div 2' is there so that we don't end up waiting for nearly %% 2 * TimeoutSec before sending a heartbeat in the boundary case %% where the last message was sent just after a heartbeat. heartbeater( {Sock, TimeoutSec * 1000 div 2, send_oct, 0, fun () -> - catch rabbit_net:send( - Sock, rabbit_binary_generator:build_heartbeat_frame()), + SendFun(Sock), continue end}). -start_heartbeat_receiver(Parent, Sock, TimeoutSec) -> +start_heartbeat_receiver(Sock, TimeoutSec, TimeoutFun) -> %% we check for incoming data every interval, and time out after %% two checks with no change. As a result we will time out between %% 2 and 3 intervals after the last data has been received. heartbeater({Sock, TimeoutSec * 1000, recv_oct, 1, fun () -> - Parent ! timeout, + TimeoutFun(), stop end}). +start_heartbeat_fun(SupPid, SendFun, TimeoutFun) -> + fun (_Sock, 0) -> + none; + (Sock, TimeoutSec) -> + {ok, Sender} = + supervisor2:start_child( + SupPid, {heartbeat_sender, + {rabbit_heartbeat, start_heartbeat_sender, + [Sock, TimeoutSec, SendFun]}, + transient, ?MAX_WAIT, worker, [rabbit_heartbeat]}), + {ok, Receiver} = + supervisor2:start_child( + SupPid, {heartbeat_receiver, + {rabbit_heartbeat, start_heartbeat_receiver, + [Sock, TimeoutSec, TimeoutFun]}, + transient, ?MAX_WAIT, worker, [rabbit_heartbeat]}), + {Sender, Receiver} + end. + pause_monitor(none) -> ok; pause_monitor({_Sender, Receiver}) -> |
