summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authordcorbacho <dparracorbacho@piotal.io>2019-10-30 12:26:59 +0000
committerdcorbacho <dparracorbacho@piotal.io>2019-10-30 12:26:59 +0000
commit0c780e35a94814ac89e439eb9b842179a81101d0 (patch)
tree611587729648c9a4cca08f8e0c0ae6d2e9ed67e7 /src
parent7d455d2784413020d32bfa2ae40e596de2236f99 (diff)
downloadrabbitmq-server-git-0c780e35a94814ac89e439eb9b842179a81101d0.tar.gz
Handle errors and timeouts in local queries
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_amqqueue.erl20
-rw-r--r--src/rabbit_fifo_client.erl14
-rw-r--r--src/rabbit_quorum_queue.erl68
3 files changed, 68 insertions, 34 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 5654986ce5..994203c492 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -1582,13 +1582,19 @@ basic_consume(Q,
QName = amqqueue:get_name(Q),
ok = check_consume_arguments(QName, Args),
QState0 = get_quorum_state(Id, QName, QStates),
- {ok, QState} = rabbit_quorum_queue:basic_consume(Q, NoAck, ChPid,
- ConsumerPrefetchCount,
- ConsumerTag,
- ExclusiveConsume, Args,
- ActingUser,
- OkMsg, QState0),
- {ok, maps:put(Name, QState, QStates)}.
+ case rabbit_quorum_queue:basic_consume(Q, NoAck, ChPid,
+ ConsumerPrefetchCount,
+ ConsumerTag,
+ ExclusiveConsume, Args,
+ ActingUser,
+ OkMsg, QState0) of
+ {ok, QState} ->
+ {ok, maps:put(Name, QState, QStates)};
+ {error, Reason} ->
+ rabbit_misc:protocol_error(internal_error,
+ "Cannot consume a message from quorum queue '~s': ~p",
+ [rabbit_misc:rs(QName), Reason])
+ end.
-spec basic_cancel
(amqqueue:amqqueue(), pid(), rabbit_types:ctag(), any(),
diff --git a/src/rabbit_fifo_client.erl b/src/rabbit_fifo_client.erl
index 46064a52dc..c85dd7266a 100644
--- a/src/rabbit_fifo_client.erl
+++ b/src/rabbit_fifo_client.erl
@@ -713,8 +713,18 @@ get_missing_deliveries(Leader, From, To, ConsumerTag) ->
Query = fun (State) ->
rabbit_fifo:get_checked_out(ConsumerId, From, To, State)
end,
- {ok, {_, Missing}, _} = ra:local_query(Leader, Query),
- Missing.
+ case ra:local_query(Leader, Query) of
+ {ok, {_, Missing}, _} ->
+ Missing;
+ {error, Error} ->
+ rabbit_misc:protocol_error(internal_error,
+ "Cannot query missing deliveries from ~p: ~p",
+ [Leader, Error]);
+ {timeout, _} ->
+ rabbit_misc:protocol_error(internal_error,
+ "Cannot query missing deliveries from ~p: timeout",
+ [Leader])
+ end.
pick_node(#state{leader = undefined, servers = [N | _]}) ->
%% TODO: pick random rather that first?
diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl
index d46983b4fc..75857f81a8 100644
--- a/src/rabbit_quorum_queue.erl
+++ b/src/rabbit_quorum_queue.erl
@@ -519,25 +519,31 @@ basic_consume(Q, NoAck, ChPid,
Prefetch,
ConsumerMeta,
QState0),
- {ok, {_, SacResult}, _} = ra:local_query(QPid,
- fun rabbit_fifo:query_single_active_consumer/1),
-
- SingleActiveConsumerOn = single_active_consumer_on(Q),
- {IsSingleActiveConsumer, ActivityStatus} = case {SingleActiveConsumerOn, SacResult} of
- {false, _} ->
- {true, up};
- {true, {value, {ConsumerTag, ChPid}}} ->
- {true, single_active};
- _ ->
- {false, waiting}
- end,
-
- %% TODO: emit as rabbit_fifo effect
- rabbit_core_metrics:consumer_created(ChPid, ConsumerTag, ExclusiveConsume,
- not NoAck, QName,
- ConsumerPrefetchCount, IsSingleActiveConsumer,
- ActivityStatus, Args),
- {ok, QState}.
+ case ra:local_query(QPid,
+ fun rabbit_fifo:query_single_active_consumer/1) of
+ {ok, {_, SacResult}, _} ->
+
+ SingleActiveConsumerOn = single_active_consumer_on(Q),
+ {IsSingleActiveConsumer, ActivityStatus} = case {SingleActiveConsumerOn, SacResult} of
+ {false, _} ->
+ {true, up};
+ {true, {value, {ConsumerTag, ChPid}}} ->
+ {true, single_active};
+ _ ->
+ {false, waiting}
+ end,
+
+ %% TODO: emit as rabbit_fifo effect
+ rabbit_core_metrics:consumer_created(ChPid, ConsumerTag, ExclusiveConsume,
+ not NoAck, QName,
+ ConsumerPrefetchCount, IsSingleActiveConsumer,
+ ActivityStatus, Args),
+ {ok, QState};
+ {error, Error} ->
+ Error;
+ {timeout, _} ->
+ {error, timeout}
+ end.
-spec basic_cancel(rabbit_types:ctag(), ChPid :: pid(), any(), rabbit_fifo_client:state()) ->
{'ok', rabbit_fifo_client:state()}.
@@ -1078,14 +1084,26 @@ i(single_active_consumer_ctag, Q) when ?is_amqqueue(Q) ->
i(type, _) -> quorum;
i(messages_ram, Q) when ?is_amqqueue(Q) ->
QPid = amqqueue:get_pid(Q),
- {ok, {_, {Length, _}}, _} = ra:local_query(QPid,
- fun rabbit_fifo:query_in_memory_usage/1),
- Length;
+ case ra:local_query(QPid,
+ fun rabbit_fifo:query_in_memory_usage/1) of
+ {ok, {_, {Length, _}}, _} ->
+ Length;
+ {error, _} ->
+ 0;
+ {timeout, _} ->
+ 0
+ end;
i(message_bytes_ram, Q) when ?is_amqqueue(Q) ->
QPid = amqqueue:get_pid(Q),
- {ok, {_, {_, Bytes}}, _} = ra:local_query(QPid,
- fun rabbit_fifo:query_in_memory_usage/1),
- Bytes;
+ case ra:local_query(QPid,
+ fun rabbit_fifo:query_in_memory_usage/1) of
+ {ok, {_, {_, Bytes}}, _} ->
+ Bytes;
+ {error, _} ->
+ 0;
+ {timeout, _} ->
+ 0
+ end;
i(_K, _Q) -> ''.
open_files(Name) ->