diff options
| author | kjnilsson <knilsson@pivotal.io> | 2020-07-14 12:13:55 +0100 |
|---|---|---|
| committer | kjnilsson <knilsson@pivotal.io> | 2020-09-07 09:42:10 +0100 |
| commit | 0b81094345ca926e56cd3207abfe1bf2e5ae2965 (patch) | |
| tree | 735c80ef09d071ccc86de3c476b02efb4348d193 /src | |
| parent | b5ec2249f653d6b3be2c8d0332666bf1cf3b020f (diff) | |
| download | rabbitmq-server-git-0b81094345ca926e56cd3207abfe1bf2e5ae2965.tar.gz | |
Test fixes
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_fifo_client.erl | 19 | ||||
| -rw-r--r-- | src/rabbit_fifo_v0.erl | 1 |
2 files changed, 10 insertions, 10 deletions
diff --git a/src/rabbit_fifo_client.erl b/src/rabbit_fifo_client.erl index 9a6cd32a7b..8700b1e6af 100644 --- a/src/rabbit_fifo_client.erl +++ b/src/rabbit_fifo_client.erl @@ -106,22 +106,23 @@ init(ClusterName, Servers) -> %% @param MaxPending size defining the max number of pending commands. -spec init(cluster_name(), [ra:server_id()], non_neg_integer()) -> state(). init(ClusterName = #resource{}, Servers, SoftLimit) -> - Timeout = application:get_env(kernel, net_ticktime, 60000) + 5000, + Timeout = application:get_env(kernel, net_ticktime, 60) + 5, #state{cfg = #cfg{cluster_name = ClusterName, servers = Servers, soft_limit = SoftLimit, - timeout = Timeout}}. + timeout = Timeout * 1000}}. -spec init(cluster_name(), [ra:server_id()], non_neg_integer(), fun(() -> ok), fun(() -> ok)) -> state(). init(ClusterName = #resource{}, Servers, SoftLimit, BlockFun, UnblockFun) -> - Timeout = application:get_env(kernel, net_ticktime, 60000) + 5000, + %% net ticktime is in seconds + Timeout = application:get_env(kernel, net_ticktime, 60) + 5, #state{cfg = #cfg{cluster_name = ClusterName, servers = Servers, block_handler = BlockFun, unblock_handler = UnblockFun, soft_limit = SoftLimit, - timeout = Timeout}}. + timeout = Timeout * 1000}}. %% @doc Enqueues a message. @@ -141,7 +142,7 @@ init(ClusterName = #resource{}, Servers, SoftLimit, BlockFun, UnblockFun) -> enqueue(Correlation, Msg, #state{queue_status = undefined, next_enqueue_seq = 1, - cfg = #cfg{}} = State0) -> + cfg = #cfg{timeout = Timeout}} = State0) -> %% it is the first enqueue, check the version {_, Node} = Server = pick_server(State0), case rpc:call(Node, rabbit_fifo, version, []) of @@ -154,7 +155,7 @@ enqueue(Correlation, Msg, %% were running the new version on the leader do sync initialisation %% of enqueuer session Reg = rabbit_fifo:make_register_enqueuer(self()), - case ra:process_command(Server, Reg) of + case ra:process_command(Server, Reg, Timeout) of {ok, reject_publish, _} -> {reject_publish, State0#state{queue_status = reject_publish}}; {ok, ok, _} -> @@ -167,8 +168,7 @@ enqueue(Correlation, Msg, exit(Err) end; {badrpc, nodedown} -> - rabbit_log:info("rabbit_fifo_client: badrpc for node ~w", [Node]), - State0#state{queue_status = go} + {reject_publish, State0} end; enqueue(_Correlation, _Msg, #state{queue_status = reject_publish, @@ -176,6 +176,7 @@ enqueue(_Correlation, _Msg, {reject_publish, State}; enqueue(Correlation, Msg, #state{slow = Slow, + queue_status = go, cfg = #cfg{block_handler = BlockFun}} = State0) -> Node = pick_server(State0), {Next, State1} = next_enqueue_seq(State0), @@ -694,7 +695,7 @@ resend(OldSeq, #state{pending = Pending0, leader = Leader} = State) -> resend_all_pending(#state{pending = Pend} = State) -> Seqs = lists:sort(maps:keys(Pend)), - rabbit_log:info("rabbit_fifo_client resend all pending ~w", [Seqs]), + rabbit_log:info("rabbit_fifo_client: resend all pending ~w", [Seqs]), lists:foldl(fun resend/2, State, Seqs). handle_delivery(From, {delivery, Tag, [{FstId, _} | _] = IdMsgs} = Del0, diff --git a/src/rabbit_fifo_v0.erl b/src/rabbit_fifo_v0.erl index 95f665d0a9..51f6bd133e 100644 --- a/src/rabbit_fifo_v0.erl +++ b/src/rabbit_fifo_v0.erl @@ -128,7 +128,6 @@ -spec init(config()) -> state(). init(#{name := Name, queue_resource := Resource} = Conf) -> - rabbit_log:info("rabbit_fifo: init v0 ~p", [Conf]), update_config(Conf, #?STATE{cfg = #cfg{name = Name, resource = Resource}}). |
