diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/credit_flow.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_sync.erl | 21 |
2 files changed, 11 insertions, 12 deletions
diff --git a/src/credit_flow.erl b/src/credit_flow.erl index dff339fc80..102c353f9b 100644 --- a/src/credit_flow.erl +++ b/src/credit_flow.erl @@ -37,7 +37,7 @@ -ifdef(use_specs). --type(bump_msg() :: {pid(), non_neg_integer()}). +-opaque(bump_msg() :: {pid(), non_neg_integer()}). -type(credit_spec() :: {non_neg_integer(), non_neg_integer()}). -spec(send/1 :: (pid()) -> 'ok'). diff --git a/src/rabbit_mirror_queue_sync.erl b/src/rabbit_mirror_queue_sync.erl index ac03ca8d75..8c561d1cb3 100644 --- a/src/rabbit_mirror_queue_sync.erl +++ b/src/rabbit_mirror_queue_sync.erl @@ -148,7 +148,7 @@ syncer_loop({Ref, MPid} = Args, SPidsMRefs) -> MPid ! {next, Ref}, receive {msg, Ref, Msg, MsgProps} -> - SPidsMRefs1 = wait_for_credit(SPidsMRefs, Ref), + SPidsMRefs1 = wait_for_credit(SPidsMRefs), [begin credit_flow:send(SPid), SPid ! {sync_msg, Ref, Msg, MsgProps} @@ -158,10 +158,16 @@ syncer_loop({Ref, MPid} = Args, SPidsMRefs) -> SPidsMRefs end. -wait_for_credit(SPidsMRefs, Ref) -> +wait_for_credit(SPidsMRefs) -> case credit_flow:blocked() of - true -> wait_for_credit(foreach_slave(SPidsMRefs, Ref, - fun sync_receive_credit/3), Ref); + true -> receive + {bump_credit, Msg} -> + credit_flow:handle_bump_msg(Msg), + wait_for_credit(SPidsMRefs); + {'DOWN', MRef, _, SPid, _} -> + credit_flow:peer_down(SPid), + wait_for_credit(lists:delete({SPid, MRef}, SPidsMRefs)) + end; false -> SPidsMRefs end. @@ -176,13 +182,6 @@ sync_receive_ready(SPid, MRef, Ref) -> {'DOWN', MRef, _, SPid, _} -> ignore end. -sync_receive_credit(SPid, MRef, _Ref) -> - receive - {bump_credit, {SPid, _} = Msg} -> credit_flow:handle_bump_msg(Msg), - SPid; - {'DOWN', MRef, _, SPid, _} -> credit_flow:peer_down(SPid), - ignore - end. sync_send_complete(SPid, _MRef, Ref) -> SPid ! {sync_complete, Ref}. |
