diff options
| author | dcorbacho <dparracorbacho@piotal.io> | 2019-10-30 12:26:59 +0000 |
|---|---|---|
| committer | dcorbacho <dparracorbacho@piotal.io> | 2019-10-30 12:26:59 +0000 |
| commit | 0c780e35a94814ac89e439eb9b842179a81101d0 (patch) | |
| tree | 611587729648c9a4cca08f8e0c0ae6d2e9ed67e7 /src | |
| parent | 7d455d2784413020d32bfa2ae40e596de2236f99 (diff) | |
| download | rabbitmq-server-git-0c780e35a94814ac89e439eb9b842179a81101d0.tar.gz | |
Handle errors and timeouts in local queries
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_amqqueue.erl | 20 | ||||
| -rw-r--r-- | src/rabbit_fifo_client.erl | 14 | ||||
| -rw-r--r-- | src/rabbit_quorum_queue.erl | 68 |
3 files changed, 68 insertions, 34 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 5654986ce5..994203c492 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -1582,13 +1582,19 @@ basic_consume(Q, QName = amqqueue:get_name(Q), ok = check_consume_arguments(QName, Args), QState0 = get_quorum_state(Id, QName, QStates), - {ok, QState} = rabbit_quorum_queue:basic_consume(Q, NoAck, ChPid, - ConsumerPrefetchCount, - ConsumerTag, - ExclusiveConsume, Args, - ActingUser, - OkMsg, QState0), - {ok, maps:put(Name, QState, QStates)}. + case rabbit_quorum_queue:basic_consume(Q, NoAck, ChPid, + ConsumerPrefetchCount, + ConsumerTag, + ExclusiveConsume, Args, + ActingUser, + OkMsg, QState0) of + {ok, QState} -> + {ok, maps:put(Name, QState, QStates)}; + {error, Reason} -> + rabbit_misc:protocol_error(internal_error, + "Cannot consume a message from quorum queue '~s': ~p", + [rabbit_misc:rs(QName), Reason]) + end. -spec basic_cancel (amqqueue:amqqueue(), pid(), rabbit_types:ctag(), any(), diff --git a/src/rabbit_fifo_client.erl b/src/rabbit_fifo_client.erl index 46064a52dc..c85dd7266a 100644 --- a/src/rabbit_fifo_client.erl +++ b/src/rabbit_fifo_client.erl @@ -713,8 +713,18 @@ get_missing_deliveries(Leader, From, To, ConsumerTag) -> Query = fun (State) -> rabbit_fifo:get_checked_out(ConsumerId, From, To, State) end, - {ok, {_, Missing}, _} = ra:local_query(Leader, Query), - Missing. + case ra:local_query(Leader, Query) of + {ok, {_, Missing}, _} -> + Missing; + {error, Error} -> + rabbit_misc:protocol_error(internal_error, + "Cannot query missing deliveries from ~p: ~p", + [Leader, Error]); + {timeout, _} -> + rabbit_misc:protocol_error(internal_error, + "Cannot query missing deliveries from ~p: timeout", + [Leader]) + end. pick_node(#state{leader = undefined, servers = [N | _]}) -> %% TODO: pick random rather that first? diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl index d46983b4fc..75857f81a8 100644 --- a/src/rabbit_quorum_queue.erl +++ b/src/rabbit_quorum_queue.erl @@ -519,25 +519,31 @@ basic_consume(Q, NoAck, ChPid, Prefetch, ConsumerMeta, QState0), - {ok, {_, SacResult}, _} = ra:local_query(QPid, - fun rabbit_fifo:query_single_active_consumer/1), - - SingleActiveConsumerOn = single_active_consumer_on(Q), - {IsSingleActiveConsumer, ActivityStatus} = case {SingleActiveConsumerOn, SacResult} of - {false, _} -> - {true, up}; - {true, {value, {ConsumerTag, ChPid}}} -> - {true, single_active}; - _ -> - {false, waiting} - end, - - %% TODO: emit as rabbit_fifo effect - rabbit_core_metrics:consumer_created(ChPid, ConsumerTag, ExclusiveConsume, - not NoAck, QName, - ConsumerPrefetchCount, IsSingleActiveConsumer, - ActivityStatus, Args), - {ok, QState}. + case ra:local_query(QPid, + fun rabbit_fifo:query_single_active_consumer/1) of + {ok, {_, SacResult}, _} -> + + SingleActiveConsumerOn = single_active_consumer_on(Q), + {IsSingleActiveConsumer, ActivityStatus} = case {SingleActiveConsumerOn, SacResult} of + {false, _} -> + {true, up}; + {true, {value, {ConsumerTag, ChPid}}} -> + {true, single_active}; + _ -> + {false, waiting} + end, + + %% TODO: emit as rabbit_fifo effect + rabbit_core_metrics:consumer_created(ChPid, ConsumerTag, ExclusiveConsume, + not NoAck, QName, + ConsumerPrefetchCount, IsSingleActiveConsumer, + ActivityStatus, Args), + {ok, QState}; + {error, Error} -> + Error; + {timeout, _} -> + {error, timeout} + end. -spec basic_cancel(rabbit_types:ctag(), ChPid :: pid(), any(), rabbit_fifo_client:state()) -> {'ok', rabbit_fifo_client:state()}. @@ -1078,14 +1084,26 @@ i(single_active_consumer_ctag, Q) when ?is_amqqueue(Q) -> i(type, _) -> quorum; i(messages_ram, Q) when ?is_amqqueue(Q) -> QPid = amqqueue:get_pid(Q), - {ok, {_, {Length, _}}, _} = ra:local_query(QPid, - fun rabbit_fifo:query_in_memory_usage/1), - Length; + case ra:local_query(QPid, + fun rabbit_fifo:query_in_memory_usage/1) of + {ok, {_, {Length, _}}, _} -> + Length; + {error, _} -> + 0; + {timeout, _} -> + 0 + end; i(message_bytes_ram, Q) when ?is_amqqueue(Q) -> QPid = amqqueue:get_pid(Q), - {ok, {_, {_, Bytes}}, _} = ra:local_query(QPid, - fun rabbit_fifo:query_in_memory_usage/1), - Bytes; + case ra:local_query(QPid, + fun rabbit_fifo:query_in_memory_usage/1) of + {ok, {_, {_, Bytes}}, _} -> + Bytes; + {error, _} -> + 0; + {timeout, _} -> + 0 + end; i(_K, _Q) -> ''. open_files(Name) -> |
