summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorkjnilsson <knilsson@pivotal.io>2020-07-14 12:13:55 +0100
committerkjnilsson <knilsson@pivotal.io>2020-09-07 09:42:10 +0100
commit0b81094345ca926e56cd3207abfe1bf2e5ae2965 (patch)
tree735c80ef09d071ccc86de3c476b02efb4348d193 /src
parentb5ec2249f653d6b3be2c8d0332666bf1cf3b020f (diff)
downloadrabbitmq-server-git-0b81094345ca926e56cd3207abfe1bf2e5ae2965.tar.gz
Test fixes
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_fifo_client.erl19
-rw-r--r--src/rabbit_fifo_v0.erl1
2 files changed, 10 insertions, 10 deletions
diff --git a/src/rabbit_fifo_client.erl b/src/rabbit_fifo_client.erl
index 9a6cd32a7b..8700b1e6af 100644
--- a/src/rabbit_fifo_client.erl
+++ b/src/rabbit_fifo_client.erl
@@ -106,22 +106,23 @@ init(ClusterName, Servers) ->
%% @param MaxPending size defining the max number of pending commands.
-spec init(cluster_name(), [ra:server_id()], non_neg_integer()) -> state().
init(ClusterName = #resource{}, Servers, SoftLimit) ->
- Timeout = application:get_env(kernel, net_ticktime, 60000) + 5000,
+ Timeout = application:get_env(kernel, net_ticktime, 60) + 5,
#state{cfg = #cfg{cluster_name = ClusterName,
servers = Servers,
soft_limit = SoftLimit,
- timeout = Timeout}}.
+ timeout = Timeout * 1000}}.
-spec init(cluster_name(), [ra:server_id()], non_neg_integer(), fun(() -> ok),
fun(() -> ok)) -> state().
init(ClusterName = #resource{}, Servers, SoftLimit, BlockFun, UnblockFun) ->
- Timeout = application:get_env(kernel, net_ticktime, 60000) + 5000,
+ %% net ticktime is in seconds
+ Timeout = application:get_env(kernel, net_ticktime, 60) + 5,
#state{cfg = #cfg{cluster_name = ClusterName,
servers = Servers,
block_handler = BlockFun,
unblock_handler = UnblockFun,
soft_limit = SoftLimit,
- timeout = Timeout}}.
+ timeout = Timeout * 1000}}.
%% @doc Enqueues a message.
@@ -141,7 +142,7 @@ init(ClusterName = #resource{}, Servers, SoftLimit, BlockFun, UnblockFun) ->
enqueue(Correlation, Msg,
#state{queue_status = undefined,
next_enqueue_seq = 1,
- cfg = #cfg{}} = State0) ->
+ cfg = #cfg{timeout = Timeout}} = State0) ->
%% it is the first enqueue, check the version
{_, Node} = Server = pick_server(State0),
case rpc:call(Node, rabbit_fifo, version, []) of
@@ -154,7 +155,7 @@ enqueue(Correlation, Msg,
%% were running the new version on the leader do sync initialisation
%% of enqueuer session
Reg = rabbit_fifo:make_register_enqueuer(self()),
- case ra:process_command(Server, Reg) of
+ case ra:process_command(Server, Reg, Timeout) of
{ok, reject_publish, _} ->
{reject_publish, State0#state{queue_status = reject_publish}};
{ok, ok, _} ->
@@ -167,8 +168,7 @@ enqueue(Correlation, Msg,
exit(Err)
end;
{badrpc, nodedown} ->
- rabbit_log:info("rabbit_fifo_client: badrpc for node ~w", [Node]),
- State0#state{queue_status = go}
+ {reject_publish, State0}
end;
enqueue(_Correlation, _Msg,
#state{queue_status = reject_publish,
@@ -176,6 +176,7 @@ enqueue(_Correlation, _Msg,
{reject_publish, State};
enqueue(Correlation, Msg,
#state{slow = Slow,
+ queue_status = go,
cfg = #cfg{block_handler = BlockFun}} = State0) ->
Node = pick_server(State0),
{Next, State1} = next_enqueue_seq(State0),
@@ -694,7 +695,7 @@ resend(OldSeq, #state{pending = Pending0, leader = Leader} = State) ->
resend_all_pending(#state{pending = Pend} = State) ->
Seqs = lists:sort(maps:keys(Pend)),
- rabbit_log:info("rabbit_fifo_client resend all pending ~w", [Seqs]),
+ rabbit_log:info("rabbit_fifo_client: resend all pending ~w", [Seqs]),
lists:foldl(fun resend/2, State, Seqs).
handle_delivery(From, {delivery, Tag, [{FstId, _} | _] = IdMsgs} = Del0,
diff --git a/src/rabbit_fifo_v0.erl b/src/rabbit_fifo_v0.erl
index 95f665d0a9..51f6bd133e 100644
--- a/src/rabbit_fifo_v0.erl
+++ b/src/rabbit_fifo_v0.erl
@@ -128,7 +128,6 @@
-spec init(config()) -> state().
init(#{name := Name,
queue_resource := Resource} = Conf) ->
- rabbit_log:info("rabbit_fifo: init v0 ~p", [Conf]),
update_config(Conf, #?STATE{cfg = #cfg{name = Name,
resource = Resource}}).