summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_fifo_client.erl52
-rw-r--r--src/rabbit_quorum_queue.erl12
2 files changed, 52 insertions, 12 deletions
diff --git a/src/rabbit_fifo_client.erl b/src/rabbit_fifo_client.erl
index 04918c3eb9..a1bf82367b 100644
--- a/src/rabbit_fifo_client.erl
+++ b/src/rabbit_fifo_client.erl
@@ -46,6 +46,7 @@
-include_lib("rabbit_common/include/rabbit.hrl").
-define(SOFT_LIMIT, 256).
+-define(TIMER_TIME, 10000).
-type seq() :: non_neg_integer().
-type action() :: {send_credit_reply, Available :: non_neg_integer()} |
@@ -73,15 +74,17 @@
{maybe(term()), rabbit_fifo:command()}},
consumer_deliveries = #{} :: #{rabbit_fifo:consumer_tag() =>
#consumer{}},
- block_handler = fun() -> ok end :: fun(() -> ok),
+ block_handler = fun() -> ok end :: fun(() -> term()),
unblock_handler = fun() -> ok end :: fun(() -> ok),
+ timer_state :: term(),
timeout :: non_neg_integer()
}).
-opaque state() :: #state{}.
-export_type([
- state/0
+ state/0,
+ actions/0
]).
@@ -140,9 +143,9 @@ enqueue(Correlation, Msg, State0 = #state{slow = Slow,
% by default there is no correlation id
Cmd = rabbit_fifo:make_enqueue(self(), Next, Msg),
case send_command(Node, Correlation, Cmd, low, State1) of
- {slow, _} = Ret when not Slow ->
+ {slow, State} when not Slow ->
BlockFun(),
- Ret;
+ {slow, set_timer(State)};
Any ->
Any
end.
@@ -478,9 +481,14 @@ handle_ra_event(From, {applied, Seqs},
case maps:size(State1#state.pending) < SftLmt of
true when State1#state.slow == true ->
% we have exited soft limit state
- % send any unsent commands
- State2 = State1#state{slow = false,
- unsent_commands = #{}},
+ % send any unsent commands and cancel the time as
+ % TODO: really the timer should only be cancelled when the channel
+ % exits flow state (which depends on the state of all queues the
+ % channel is interacting with)
+ % but the fact the queue has just applied suggests
+ % it's ok to cancel here anyway
+ State2 = cancel_timer(State1#state{slow = false,
+ unsent_commands = #{}}),
% build up a list of commands to issue
Commands = maps:fold(
fun (Cid, {Settled, Returns, Discards}, Acc) ->
@@ -523,6 +531,15 @@ handle_ra_event(_From, {rejected, {not_leader, Leader, Seq}}, State0) ->
State1 = State0#state{leader = Leader},
State = resend(Seq, State1),
{internal, [], [], State};
+handle_ra_event(_, timeout, #state{servers = Servers} = State0) ->
+ case find_leader(Servers) of
+ undefined ->
+ %% still no leader, set the timer again
+ {internal, [], [], set_timer(State0)};
+ Leader ->
+ State = resend_all_pending(State0#state{leader = Leader}),
+ {internal, [], [], State}
+ end;
handle_ra_event(_Leader, {machine, eol}, _State0) ->
eol.
@@ -729,3 +746,24 @@ add_command(Cid, return, MsgIds, Acc) ->
[rabbit_fifo:make_settle(Cid, MsgIds) | Acc];
add_command(Cid, discard, MsgIds, Acc) ->
[rabbit_fifo:make_settle(Cid, MsgIds) | Acc].
+
+set_timer(#state{servers = [Server | _]} = State) ->
+ Ref = erlang:send_after(?TIMER_TIME, self(),
+ {ra_event, Server, timeout}),
+ State#state{timer_state = Ref}.
+
+cancel_timer(#state{timer_state = undefined} = State) ->
+ State;
+cancel_timer(#state{timer_state = Ref} = State) ->
+ erlang:cancel_timer(Ref, [{async, true}, {info, false}]),
+ State#state{timer_state = undefined}.
+
+find_leader([]) ->
+ undefined;
+find_leader([Server | Servers]) ->
+ case ra:members(Server, 500) of
+ {ok, _, Leader} -> Leader;
+ _ ->
+ find_leader(Servers)
+ end.
+
diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl
index 79c07535a4..e1c1c320f4 100644
--- a/src/rabbit_quorum_queue.erl
+++ b/src/rabbit_quorum_queue.erl
@@ -81,13 +81,15 @@ init_state({Name, _}, QName = #resource{}) ->
Servers0 = [{Name, N} || N <- Nodes],
Servers = [Leader | lists:delete(Leader, Servers0)],
rabbit_fifo_client:init(QName, Servers, SoftLimit,
- fun() -> credit_flow:block(Name), ok end,
+ fun() -> credit_flow:block(Name) end,
fun() -> credit_flow:unblock(Name), ok end).
--spec handle_event({'ra_event', amqqueue:ra_server_id(), any()}, rabbit_fifo_client:state()) ->
- {internal, Correlators :: [term()], rabbit_fifo_client:actions(), rabbit_fifo_client:state()} |
- {rabbit_fifo:client_msg(), rabbit_fifo_client:state()} | eol.
-
+-spec handle_event({'ra_event', amqqueue:ra_server_id(), any()},
+ rabbit_fifo_client:state()) ->
+ {internal, Correlators :: [term()], rabbit_fifo_client:actions(),
+ rabbit_fifo_client:state()} |
+ {rabbit_fifo:client_msg(), rabbit_fifo_client:state()} |
+ eol.
handle_event({ra_event, From, Evt}, QState) ->
rabbit_fifo_client:handle_ra_event(From, Evt, QState).