diff options
| author | Michael Klishin <mklishin@pivotal.io> | 2019-11-13 17:39:50 +0300 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2019-11-13 17:39:50 +0300 |
| commit | 6ce30e8c9b2cd3fe101228ffea230e2f28e8eb91 (patch) | |
| tree | d83c9bd5bdcb39ff8c05f2234084f81093c71b50 | |
| parent | 2b3c42ef2ab91af851935eb55baebcab973a5203 (diff) | |
| parent | 1b683c81a3bf26eeea238e38ca6b54ce4856858e (diff) | |
| download | rabbitmq-server-git-6ce30e8c9b2cd3fe101228ffea230e2f28e8eb91.tar.gz | |
Merge pull request #2154 from rabbitmq/improve-error-handling-qq
Improve error handling and logging around quorum queues
| -rw-r--r-- | src/rabbit_amqqueue.erl | 20 | ||||
| -rw-r--r-- | src/rabbit_fifo_client.erl | 14 | ||||
| -rw-r--r-- | src/rabbit_quorum_queue.erl | 93 |
3 files changed, 83 insertions, 44 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 5654986ce5..936b18c53a 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -1582,13 +1582,19 @@ basic_consume(Q, QName = amqqueue:get_name(Q), ok = check_consume_arguments(QName, Args), QState0 = get_quorum_state(Id, QName, QStates), - {ok, QState} = rabbit_quorum_queue:basic_consume(Q, NoAck, ChPid, - ConsumerPrefetchCount, - ConsumerTag, - ExclusiveConsume, Args, - ActingUser, - OkMsg, QState0), - {ok, maps:put(Name, QState, QStates)}. + case rabbit_quorum_queue:basic_consume(Q, NoAck, ChPid, + ConsumerPrefetchCount, + ConsumerTag, + ExclusiveConsume, Args, + ActingUser, + OkMsg, QState0) of + {ok, QState} -> + {ok, maps:put(Name, QState, QStates)}; + {error, Reason} -> + rabbit_misc:protocol_error(internal_error, + "Cannot consume a message from quorum queue '~s': ~w", + [rabbit_misc:rs(QName), Reason]) + end. -spec basic_cancel (amqqueue:amqqueue(), pid(), rabbit_types:ctag(), any(), diff --git a/src/rabbit_fifo_client.erl b/src/rabbit_fifo_client.erl index 46064a52dc..c85dd7266a 100644 --- a/src/rabbit_fifo_client.erl +++ b/src/rabbit_fifo_client.erl @@ -713,8 +713,18 @@ get_missing_deliveries(Leader, From, To, ConsumerTag) -> Query = fun (State) -> rabbit_fifo:get_checked_out(ConsumerId, From, To, State) end, - {ok, {_, Missing}, _} = ra:local_query(Leader, Query), - Missing. + case ra:local_query(Leader, Query) of + {ok, {_, Missing}, _} -> + Missing; + {error, Error} -> + rabbit_misc:protocol_error(internal_error, + "Cannot query missing deliveries from ~p: ~p", + [Leader, Error]); + {timeout, _} -> + rabbit_misc:protocol_error(internal_error, + "Cannot query missing deliveries from ~p: timeout", + [Leader]) + end. pick_node(#state{leader = undefined, servers = [N | _]}) -> %% TODO: pick random rather that first? diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl index 33bae0919f..75857f81a8 100644 --- a/src/rabbit_quorum_queue.erl +++ b/src/rabbit_quorum_queue.erl @@ -519,25 +519,31 @@ basic_consume(Q, NoAck, ChPid, Prefetch, ConsumerMeta, QState0), - {ok, {_, SacResult}, _} = ra:local_query(QPid, - fun rabbit_fifo:query_single_active_consumer/1), - - SingleActiveConsumerOn = single_active_consumer_on(Q), - {IsSingleActiveConsumer, ActivityStatus} = case {SingleActiveConsumerOn, SacResult} of - {false, _} -> - {true, up}; - {true, {value, {ConsumerTag, ChPid}}} -> - {true, single_active}; - _ -> - {false, waiting} - end, - - %% TODO: emit as rabbit_fifo effect - rabbit_core_metrics:consumer_created(ChPid, ConsumerTag, ExclusiveConsume, - not NoAck, QName, - ConsumerPrefetchCount, IsSingleActiveConsumer, - ActivityStatus, Args), - {ok, QState}. + case ra:local_query(QPid, + fun rabbit_fifo:query_single_active_consumer/1) of + {ok, {_, SacResult}, _} -> + + SingleActiveConsumerOn = single_active_consumer_on(Q), + {IsSingleActiveConsumer, ActivityStatus} = case {SingleActiveConsumerOn, SacResult} of + {false, _} -> + {true, up}; + {true, {value, {ConsumerTag, ChPid}}} -> + {true, single_active}; + _ -> + {false, waiting} + end, + + %% TODO: emit as rabbit_fifo effect + rabbit_core_metrics:consumer_created(ChPid, ConsumerTag, ExclusiveConsume, + not NoAck, QName, + ConsumerPrefetchCount, IsSingleActiveConsumer, + ActivityStatus, Args), + {ok, QState}; + {error, Error} -> + Error; + {timeout, _} -> + {error, timeout} + end. -spec basic_cancel(rabbit_types:ctag(), ChPid :: pid(), any(), rabbit_fifo_client:state()) -> {'ok', rabbit_fifo_client:state()}. @@ -1052,35 +1058,52 @@ i(open_files, Q) when ?is_amqqueue(Q) -> lists:flatten(Data); i(single_active_consumer_pid, Q) when ?is_amqqueue(Q) -> QPid = amqqueue:get_pid(Q), - {ok, {_, SacResult}, _} = ra:local_query(QPid, - fun rabbit_fifo:query_single_active_consumer/1), - case SacResult of - {value, {_ConsumerTag, ChPid}} -> + case ra:local_query(QPid, fun rabbit_fifo:query_single_active_consumer/1) of + {ok, {_, {value, {_ConsumerTag, ChPid}}}, _} -> ChPid; - _ -> + {ok, _, _} -> + ''; + {error, _} -> + ''; + {timeout, _} -> '' end; i(single_active_consumer_ctag, Q) when ?is_amqqueue(Q) -> QPid = amqqueue:get_pid(Q), - {ok, {_, SacResult}, _} = ra:local_query(QPid, - fun rabbit_fifo:query_single_active_consumer/1), - case SacResult of - {value, {ConsumerTag, _ChPid}} -> + case ra:local_query(QPid, + fun rabbit_fifo:query_single_active_consumer/1) of + {ok, {_, {value, {ConsumerTag, _ChPid}}}, _} -> ConsumerTag; - _ -> + {ok, _, _} -> + ''; + {error, _} -> + ''; + {timeout, _} -> '' end; i(type, _) -> quorum; i(messages_ram, Q) when ?is_amqqueue(Q) -> QPid = amqqueue:get_pid(Q), - {ok, {_, {Length, _}}, _} = ra:local_query(QPid, - fun rabbit_fifo:query_in_memory_usage/1), - Length; + case ra:local_query(QPid, + fun rabbit_fifo:query_in_memory_usage/1) of + {ok, {_, {Length, _}}, _} -> + Length; + {error, _} -> + 0; + {timeout, _} -> + 0 + end; i(message_bytes_ram, Q) when ?is_amqqueue(Q) -> QPid = amqqueue:get_pid(Q), - {ok, {_, {_, Bytes}}, _} = ra:local_query(QPid, - fun rabbit_fifo:query_in_memory_usage/1), - Bytes; + case ra:local_query(QPid, + fun rabbit_fifo:query_in_memory_usage/1) of + {ok, {_, {_, Bytes}}, _} -> + Bytes; + {error, _} -> + 0; + {timeout, _} -> + 0 + end; i(_K, _Q) -> ''. open_files(Name) -> |
