summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--Makefile1
-rw-r--r--README.md10
-rw-r--r--docs/README.md17
-rw-r--r--docs/rabbitmq-server.82
-rw-r--r--docs/rabbitmq.conf.example9
-rw-r--r--docs/rabbitmqctl.85
-rw-r--r--priv/schema/rabbit.schema10
-rw-r--r--rabbitmq-components.mk13
-rwxr-xr-xscripts/rabbitmq-server10
-rw-r--r--src/lqueue.erl8
-rw-r--r--src/rabbit_amqqueue.erl29
-rw-r--r--src/rabbit_amqqueue_process.erl63
-rw-r--r--src/rabbit_basic.erl10
-rw-r--r--src/rabbit_channel.erl222
-rw-r--r--src/rabbit_connection_sup.erl4
-rw-r--r--src/rabbit_fifo.erl365
-rw-r--r--src/rabbit_fifo_client.erl52
-rw-r--r--src/rabbit_fifo_index.erl52
-rw-r--r--src/rabbit_lager.erl30
-rw-r--r--src/rabbit_mirror_queue_master.erl6
-rw-r--r--src/rabbit_networking.erl41
-rw-r--r--src/rabbit_queue_consumers.erl18
-rw-r--r--src/rabbit_quorum_memory_manager.erl76
-rw-r--r--src/rabbit_quorum_queue.erl62
-rw-r--r--src/rabbit_reader.erl18
-rw-r--r--src/rabbit_vhost.erl11
-rw-r--r--src/rabbit_vm.erl16
-rw-r--r--test/config_schema_SUITE_data/rabbit.snippets15
-rw-r--r--test/quorum_queue_SUITE.erl470
-rw-r--r--test/rabbit_fifo_SUITE.erl14
-rw-r--r--test/rabbit_fifo_prop_SUITE.erl348
31 files changed, 1668 insertions, 339 deletions
diff --git a/Makefile b/Makefile
index 68ab3c820f..e26f32d89f 100644
--- a/Makefile
+++ b/Makefile
@@ -268,6 +268,7 @@ web-manpages: $(WEB_MANPAGES)
gsub(/<h1/, "<h2", line); \
gsub(/<\/h1>/, "</h2>", line); \
gsub(/class="D1"/, "class=\"D1 sourcecode bash hljs\"", line); \
+ gsub(/class="Bd Bd-indent"/, "class=\"Bd Bd-indent sourcecode bash hljs\"", line); \
print line; \
} } \
' > "$@"
diff --git a/README.md b/README.md
index 3f07284d60..2267a853c9 100644
--- a/README.md
+++ b/README.md
@@ -16,10 +16,12 @@
## Tutorials & Documentation
* [RabbitMQ tutorials](https://rabbitmq.com/getstarted.html)
- * [Documentation guides](https://rabbitmq.com/documentation.html)
- * [Documentation Source Code](https://github.com/rabbitmq/rabbitmq-website/)
+ * [All documentation guides](https://rabbitmq.com/documentation.html)
+ * [Documentation source](https://github.com/rabbitmq/rabbitmq-website/)
* [Client libraries and tools](https://rabbitmq.com/devtools.html)
- * [Tutorials Source Code](https://github.com/rabbitmq/rabbitmq-tutorials/)
+ * [Monitoring guide](https://rabbitmq.com/monitoring.html)
+ * [Production checklist](https://rabbitmq.com/production-checklist.html)
+ * [Runnable tutorials](https://github.com/rabbitmq/rabbitmq-tutorials/)
## Getting Help
@@ -33,6 +35,8 @@
See [CONTRIBUTING.md](./CONTRIBUTING.md) and our [development process overview](https://rabbitmq.com/github.html).
+Questions about contributing, internals and so on are very welcome on the [mailing list](https://groups.google.com/forum/#!forum/rabbitmq-users).
+
## Licensing
diff --git a/docs/README.md b/docs/README.md
index 74bce06870..7efad23cf1 100644
--- a/docs/README.md
+++ b/docs/README.md
@@ -8,3 +8,20 @@ This directory contains [CLI tool](https://rabbitmq.com/cli.html) man page sourc
* A [systemd unit file example](./rabbitmq-server.service.example)
Please [see rabbitmq.com](https://rabbitmq.com/documentation.html) for documentation guides.
+
+## man Pages
+
+### Source Files
+
+This directory contains man pages that are are converted to HTML using `mandoc`:
+
+ gmake web-manpages
+
+The result is then copied to the [website repository](https://github.com/rabbitmq/rabbitmq-website/tree/live/site/man)
+
+### Contributions
+
+Since deployed man pages are generated, it is important to keep them in sync with the source.
+Accepting community contributions — which will always come as website pull requests —
+is fine but the person who merges them is responsible for backporting all changes
+to the source pages in this repo.
diff --git a/docs/rabbitmq-server.8 b/docs/rabbitmq-server.8
index 84772aa042..4c469d25d0 100644
--- a/docs/rabbitmq-server.8
+++ b/docs/rabbitmq-server.8
@@ -74,7 +74,7 @@ Defaults to 5672.
.Sh OPTIONS
.\" ------------------------------------------------------------------
.Bl -tag -width Ds
-.It Fl -detached
+.It Fl detached
Start the server process in the background.
Note that this will cause the pid not to be written to the pid file.
.Pp
diff --git a/docs/rabbitmq.conf.example b/docs/rabbitmq.conf.example
index 2a373b2eb3..a62ed38291 100644
--- a/docs/rabbitmq.conf.example
+++ b/docs/rabbitmq.conf.example
@@ -503,8 +503,17 @@
# Kernel section
# ======================================
+## Timeout used to detect peer unavailability, including CLI tools.
+## Related doc guide: https://www.rabbitmq.com/nettick.html.
+##
# net_ticktime = 60
+## Inter-node communication port range.
+## Related doc guide: https://www.rabbitmq.com/networking.html#epmd-inet-dist-port-range.
+##
+# inet_dist_listen_min = 25672
+# inet_dist_listen_max = 25692
+
## ----------------------------------------------------------------------------
## RabbitMQ Management Plugin
##
diff --git a/docs/rabbitmqctl.8 b/docs/rabbitmqctl.8
index c99170d175..524da897d1 100644
--- a/docs/rabbitmqctl.8
+++ b/docs/rabbitmqctl.8
@@ -73,6 +73,11 @@ for details of configuring the RabbitMQ broker.
.It Fl q , -quiet
Quiet output mode is selected.
Informational messages are suppressed when quiet mode is in effect.
+.It Fl s , -silent
+Silent output mode is selected.
+Informational messages and table headers are suppressed when silent mode is in effect.
+.It Fl -no-table-headers
+Do not output headers for tabular data.
.It Fl -dry-run
Do not run the command.
Only print information message.
diff --git a/priv/schema/rabbit.schema b/priv/schema/rabbit.schema
index 9cee795296..5c6078a413 100644
--- a/priv/schema/rabbit.schema
+++ b/priv/schema/rabbit.schema
@@ -1342,6 +1342,16 @@ end}.
{validators, ["non_zero_positive_integer"]}
]}.
+{mapping, "inet_dist_listen_min", "kernel.inet_dist_listen_min",[
+ {datatype, [integer]},
+ {validators, ["non_zero_positive_integer"]}
+]}.
+
+{mapping, "inet_dist_listen_max", "kernel.inet_dist_listen_max",[
+ {datatype, [integer]},
+ {validators, ["non_zero_positive_integer"]}
+]}.
+
% ===============================
% Validators
% ===============================
diff --git a/rabbitmq-components.mk b/rabbitmq-components.mk
index b440169d0d..1bec3c0942 100644
--- a/rabbitmq-components.mk
+++ b/rabbitmq-components.mk
@@ -50,7 +50,6 @@ dep_rabbitmq_auth_backend_ldap = git_rmq rabbitmq-auth-backend-ldap $(cur
dep_rabbitmq_auth_mechanism_ssl = git_rmq rabbitmq-auth-mechanism-ssl $(current_rmq_ref) $(base_rmq_ref) master
dep_rabbitmq_aws = git_rmq rabbitmq-aws $(current_rmq_ref) $(base_rmq_ref) master
dep_rabbitmq_boot_steps_visualiser = git_rmq rabbitmq-boot-steps-visualiser $(current_rmq_ref) $(base_rmq_ref) master
-dep_rabbitmq_clusterer = git_rmq rabbitmq-clusterer $(current_rmq_ref) $(base_rmq_ref) master
dep_rabbitmq_cli = git_rmq rabbitmq-cli $(current_rmq_ref) $(base_rmq_ref) master
dep_rabbitmq_codegen = git_rmq rabbitmq-codegen $(current_rmq_ref) $(base_rmq_ref) master
dep_rabbitmq_consistent_hash_exchange = git_rmq rabbitmq-consistent-hash-exchange $(current_rmq_ref) $(base_rmq_ref) master
@@ -70,7 +69,6 @@ dep_rabbitmq_management = git_rmq rabbitmq-management $(current_rm
dep_rabbitmq_management_agent = git_rmq rabbitmq-management-agent $(current_rmq_ref) $(base_rmq_ref) master
dep_rabbitmq_management_exchange = git_rmq rabbitmq-management-exchange $(current_rmq_ref) $(base_rmq_ref) master
dep_rabbitmq_management_themes = git_rmq rabbitmq-management-themes $(current_rmq_ref) $(base_rmq_ref) master
-dep_rabbitmq_management_visualiser = git_rmq rabbitmq-management-visualiser $(current_rmq_ref) $(base_rmq_ref) master
dep_rabbitmq_message_timestamp = git_rmq rabbitmq-message-timestamp $(current_rmq_ref) $(base_rmq_ref) master
dep_rabbitmq_metronome = git_rmq rabbitmq-metronome $(current_rmq_ref) $(base_rmq_ref) master
dep_rabbitmq_mqtt = git_rmq rabbitmq-mqtt $(current_rmq_ref) $(base_rmq_ref) master
@@ -110,17 +108,14 @@ dep_rabbitmq_public_umbrella = git_rmq rabbitmq-public-umbrella $(curre
# all projects use the same versions. It avoids conflicts and makes it
# possible to work with rabbitmq-public-umbrella.
-dep_cowboy = hex 2.4.0
-dep_cowlib = hex 2.3.0
+dep_cowboy = hex 2.6.1
+dep_cowlib = hex 2.7.0
dep_jsx = hex 2.9.0
dep_lager = hex 3.6.5
dep_ra = git https://github.com/rabbitmq/ra.git master
-dep_ranch = hex 1.6.2
-dep_ranch_proxy_protocol = hex 2.1.1
+dep_ranch = hex 1.7.1
dep_recon = hex 2.3.6
-dep_sockjs = git https://github.com/rabbitmq/sockjs-erlang.git 405990ea62353d98d36dbf5e1e64942d9b0a1daf
-
RABBITMQ_COMPONENTS = amqp_client \
amqp10_common \
amqp10_client \
@@ -134,7 +129,6 @@ RABBITMQ_COMPONENTS = amqp_client \
rabbitmq_auth_mechanism_ssl \
rabbitmq_aws \
rabbitmq_boot_steps_visualiser \
- rabbitmq_clusterer \
rabbitmq_cli \
rabbitmq_codegen \
rabbitmq_consistent_hash_exchange \
@@ -154,7 +148,6 @@ RABBITMQ_COMPONENTS = amqp_client \
rabbitmq_management_agent \
rabbitmq_management_exchange \
rabbitmq_management_themes \
- rabbitmq_management_visualiser \
rabbitmq_message_timestamp \
rabbitmq_metronome \
rabbitmq_mqtt \
diff --git a/scripts/rabbitmq-server b/scripts/rabbitmq-server
index d66e2517c6..f0c4030529 100755
--- a/scripts/rabbitmq-server
+++ b/scripts/rabbitmq-server
@@ -275,17 +275,17 @@ ensure_thread_pool_size() {
}
start_rabbitmq_server() {
- # "-pa ${RABBITMQ_SERVER_CODE_PATH}" should be the very first
- # command-line argument. In case of using cached HiPE-compilation,
- # this will allow for compiled versions of erlang built-in modules
- # (e.g. lists) to be loaded.
+ # The arguments to -pa are in this order because they are *pre*-pended
+ # to the code path. Since we want RABBITMQ_SERVER_CODE_PATH to precede
+ # RABBITMQ_EBIN_ROOT, it must come as the second argument here.
+ # https://github.com/rabbitmq/rabbitmq-server/issues/1777
ensure_thread_pool_size
check_start_params &&
RABBITMQ_CONFIG_FILE=$RABBITMQ_CONFIG_FILE \
ERL_MAX_ETS_TABLES=$ERL_MAX_ETS_TABLES \
ERL_CRASH_DUMP=$ERL_CRASH_DUMP \
exec ${ERL_DIR}erl \
- -pa ${RABBITMQ_SERVER_CODE_PATH} ${RABBITMQ_EBIN_ROOT} \
+ -pa "$RABBITMQ_EBIN_ROOT" "$RABBITMQ_SERVER_CODE_PATH" \
${RABBITMQ_START_RABBIT} \
${RABBITMQ_NAME_TYPE} ${RABBITMQ_NODENAME} \
-boot "${SASL_BOOT_FILE}" \
diff --git a/src/lqueue.erl b/src/lqueue.erl
index 0652061075..1abe4e0b82 100644
--- a/src/lqueue.erl
+++ b/src/lqueue.erl
@@ -21,7 +21,7 @@
%% is an O(1) operation, in contrast with queue:len/1 which is O(n).
-export([new/0, is_empty/1, len/1, in/2, in_r/2, out/1, out_r/1, join/2,
- foldl/3, foldr/3, from_list/1, to_list/1, peek/1, peek_r/1]).
+ foldl/3, foldr/3, from_list/1, drop/1, to_list/1, peek/1, peek_r/1]).
-define(QUEUE, queue).
@@ -32,6 +32,7 @@
-type result() :: 'empty' | {'value', value()}.
-spec new() -> ?MODULE().
+-spec drop(?MODULE()) -> ?MODULE().
-spec is_empty(?MODULE()) -> boolean().
-spec len(?MODULE()) -> non_neg_integer().
-spec in(value(), ?MODULE()) -> ?MODULE().
@@ -48,6 +49,8 @@
new() -> {0, ?QUEUE:new()}.
+drop({L, Q}) -> {L - 1, ?QUEUE:drop(Q)}.
+
is_empty({0, _Q}) -> true;
is_empty(_) -> false.
@@ -81,7 +84,8 @@ foldr(Fun, Init, Q) ->
{{value, V}, Q1} -> foldr(Fun, Fun(V, Init), Q1)
end.
-len({L, _Q}) -> L.
+len({L, _}) -> L.
+
peek({ 0, _Q}) -> empty;
peek({_L, Q}) -> ?QUEUE:peek(Q).
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 64cb3cf790..0f7d569a00 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -44,6 +44,7 @@
-export([list_local_followers/0]).
-export([ensure_rabbit_queue_record_is_initialized/1]).
-export([format/1]).
+-export([delete_immediately_by_resource/1]).
-export([pid_of/1, pid_of/2]).
-export([mark_local_durable_queues_stopped/1]).
@@ -266,6 +267,13 @@ filter_per_type(Queues) ->
filter_pid_per_type(QPids) ->
lists:partition(fun(QPid) -> ?IS_CLASSIC(QPid) end, QPids).
+filter_resource_per_type(Resources) ->
+ Queues = [begin
+ {ok, #amqqueue{pid = QPid}} = lookup(Resource),
+ {Resource, QPid}
+ end || Resource <- Resources],
+ lists:partition(fun({_Resource, QPid}) -> ?IS_CLASSIC(QPid) end, Queues).
+
stop(VHost) ->
%% Classic queues
ok = rabbit_amqqueue_sup_sup:stop_for_vhost(VHost),
@@ -912,8 +920,11 @@ list_local(VHostPath) ->
[ Q || #amqqueue{state = State, pid = QPid} = Q <- list(VHostPath),
State =/= crashed, is_local_to_node(QPid, node()) ].
-notify_policy_changed(#amqqueue{pid = QPid}) ->
- gen_server2:cast(QPid, policy_changed).
+notify_policy_changed(#amqqueue{pid = QPid}) when ?IS_CLASSIC(QPid) ->
+ gen_server2:cast(QPid, policy_changed);
+notify_policy_changed(#amqqueue{pid = QPid,
+ name = QName}) when ?IS_QUORUM(QPid) ->
+ rabbit_quorum_queue:policy_changed(QName, QPid).
consumers(#amqqueue{ pid = QPid }) ->
delegate:invoke(QPid, {gen_server2, call, [consumers, infinity]}).
@@ -961,7 +972,16 @@ delete_exclusive(QPids, ConnId) ->
delete_immediately(QPids) ->
{Classic, Quorum} = filter_pid_per_type(QPids),
[gen_server2:cast(QPid, delete_immediately) || QPid <- Classic],
- [rabbit_quorum_queue:delete_immediately(QPid) || QPid <- Quorum],
+ case Quorum of
+ [] -> ok;
+ _ -> {error, cannot_delete_quorum_queues, Quorum}
+ end.
+
+delete_immediately_by_resource(Resources) ->
+ {Classic, Quorum} = filter_resource_per_type(Resources),
+ [gen_server2:cast(QPid, delete_immediately) || {_, QPid} <- Classic],
+ [rabbit_quorum_queue:delete_immediately(Resource, QPid)
+ || {Resource, QPid} <- Quorum],
ok.
delete(#amqqueue{ type = quorum} = Q,
@@ -1098,7 +1118,8 @@ notify_down_all(QPids, ChPid, Timeout) ->
Error -> {error, Error}
end.
-activate_limit_all(QPids, ChPid) ->
+activate_limit_all(QRefs, ChPid) ->
+ QPids = [P || P <- QRefs, ?IS_CLASSIC(P)],
delegate:invoke_no_result(QPids, {gen_server2, cast,
[{activate_limit, ChPid}]}).
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 606e62af11..5f56955ccc 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -669,44 +669,53 @@ attempt_delivery(Delivery = #delivery{sender = SenderPid,
State#q{consumers = Consumers})}
end.
-maybe_deliver_or_enqueue(Delivery, Delivered, State = #q{overflow = Overflow}) ->
+maybe_deliver_or_enqueue(Delivery = #delivery{message = Message},
+ Delivered,
+ State = #q{overflow = Overflow,
+ backing_queue = BQ,
+ backing_queue_state = BQS}) ->
send_mandatory(Delivery), %% must do this before confirms
case {will_overflow(Delivery, State), Overflow} of
{true, 'reject-publish'} ->
%% Drop publish and nack to publisher
send_reject_publish(Delivery, Delivered, State);
_ ->
- %% Enqueue and maybe drop head later
- deliver_or_enqueue(Delivery, Delivered, State)
+ {IsDuplicate, BQS1} = BQ:is_duplicate(Message, BQS),
+ State1 = State#q{backing_queue_state = BQS1},
+ case IsDuplicate of
+ true -> State1;
+ {true, drop} -> State1;
+ %% Drop publish and nack to publisher
+ {true, reject} ->
+ send_reject_publish(Delivery, Delivered, State1);
+ %% Enqueue and maybe drop head later
+ false ->
+ deliver_or_enqueue(Delivery, Delivered, State1)
+ end
end.
deliver_or_enqueue(Delivery = #delivery{message = Message,
sender = SenderPid,
flow = Flow},
- Delivered, State = #q{backing_queue = BQ,
- backing_queue_state = BQS}) ->
+ Delivered,
+ State = #q{backing_queue = BQ}) ->
{Confirm, State1} = send_or_record_confirm(Delivery, State),
Props = message_properties(Message, Confirm, State1),
- {IsDuplicate, BQS1} = BQ:is_duplicate(Message, BQS),
- State2 = State1#q{backing_queue_state = BQS1},
- case IsDuplicate orelse attempt_delivery(Delivery, Props, Delivered,
- State2) of
- true ->
+ case attempt_delivery(Delivery, Props, Delivered, State1) of
+ {delivered, State2} ->
State2;
- {delivered, State3} ->
- State3;
%% The next one is an optimisation
- {undelivered, State3 = #q{ttl = 0, dlx = undefined,
- backing_queue_state = BQS2,
+ {undelivered, State2 = #q{ttl = 0, dlx = undefined,
+ backing_queue_state = BQS,
msg_id_to_channel = MTC}} ->
- {BQS3, MTC1} = discard(Delivery, BQ, BQS2, MTC),
- State3#q{backing_queue_state = BQS3, msg_id_to_channel = MTC1};
- {undelivered, State3 = #q{backing_queue_state = BQS2}} ->
-
- BQS3 = BQ:publish(Message, Props, Delivered, SenderPid, Flow, BQS2),
- {Dropped, State4 = #q{backing_queue_state = BQS4}} =
- maybe_drop_head(State3#q{backing_queue_state = BQS3}),
- QLen = BQ:len(BQS4),
+ {BQS1, MTC1} = discard(Delivery, BQ, BQS, MTC),
+ State2#q{backing_queue_state = BQS1, msg_id_to_channel = MTC1};
+ {undelivered, State2 = #q{backing_queue_state = BQS}} ->
+
+ BQS1 = BQ:publish(Message, Props, Delivered, SenderPid, Flow, BQS),
+ {Dropped, State3 = #q{backing_queue_state = BQS2}} =
+ maybe_drop_head(State2#q{backing_queue_state = BQS1}),
+ QLen = BQ:len(BQS2),
%% optimisation: it would be perfectly safe to always
%% invoke drop_expired_msgs here, but that is expensive so
%% we only do that if a new message that might have an
@@ -715,9 +724,9 @@ deliver_or_enqueue(Delivery = #delivery{message = Message,
%% has no expiry and becomes the head of the queue then
%% the call is unnecessary.
case {Dropped, QLen =:= 1, Props#message_properties.expiry} of
- {false, false, _} -> State4;
- {true, true, undefined} -> State4;
- {_, _, _} -> drop_expired_msgs(State4)
+ {false, false, _} -> State3;
+ {true, true, undefined} -> State3;
+ {_, _, _} -> drop_expired_msgs(State3)
end
end.
@@ -1683,7 +1692,3 @@ update_ha_mode(State) ->
{ok, Q} = rabbit_amqqueue:lookup(qname(State)),
ok = rabbit_mirror_queue_misc:update_mirrors(Q),
State.
-
-
-
-
diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl
index d474e9cad3..22ceefb85f 100644
--- a/src/rabbit_basic.erl
+++ b/src/rabbit_basic.erl
@@ -23,6 +23,7 @@
extract_headers/1, extract_timestamp/1, map_headers/2, delivery/4,
header_routes/1, parse_expiration/1, header/2, header/3]).
-export([build_content/2, from_content/1, msg_size/1, maybe_gc_large_msg/1]).
+-export([add_header/4]).
%%----------------------------------------------------------------------------
@@ -300,3 +301,12 @@ maybe_gc_large_msg(Content) ->
msg_size(Content) ->
rabbit_writer:msg_size(Content).
+
+add_header(Name, Type, Value, #basic_message{content = Content0} = Msg) ->
+ Content = rabbit_basic:map_headers(
+ fun(undefined) ->
+ rabbit_misc:set_table_value([], Name, Type, Value);
+ (Headers) ->
+ rabbit_misc:set_table_value(Headers, Name, Type, Value)
+ end, Content0),
+ Msg#basic_message{content = Content}.
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 1b74b655f5..f749d9f30e 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -110,7 +110,7 @@
%% when queue.bind's queue field is empty,
%% this name will be used instead
most_recently_declared_queue,
- %% a map of queue pid to queue name
+ %% a map of queue ref to queue name
queue_names,
%% queue processes are monitored to update
%% queue names
@@ -161,6 +161,7 @@
queue_cleanup_timer
}).
+-define(QUEUE, lqueue).
-define(MAX_PERMISSION_CACHE_SIZE, 12).
@@ -339,12 +340,29 @@ list_local() ->
info_keys() -> ?INFO_KEYS.
info(Pid) ->
- gen_server2:call(Pid, info, infinity).
+ {Timeout, Deadline} = get_operation_timeout_and_deadline(),
+ try
+ case gen_server2:call(Pid, {info, Deadline}, Timeout) of
+ {ok, Res} -> Res;
+ {error, Error} -> throw(Error)
+ end
+ catch
+ exit:{timeout, _} ->
+ rabbit_log:error("Timed out getting channel ~p info", [Pid]),
+ throw(timeout)
+ end.
info(Pid, Items) ->
- case gen_server2:call(Pid, {info, Items}, infinity) of
- {ok, Res} -> Res;
- {error, Error} -> throw(Error)
+ {Timeout, Deadline} = get_operation_timeout_and_deadline(),
+ try
+ case gen_server2:call(Pid, {{info, Items}, Deadline}, Timeout) of
+ {ok, Res} -> Res;
+ {error, Error} -> throw(Error)
+ end
+ catch
+ exit:{timeout, _} ->
+ rabbit_log:error("Timed out getting channel ~p info", [Pid]),
+ throw(timeout)
end.
info_all() ->
@@ -433,7 +451,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
limiter = Limiter,
tx = none,
next_tag = 1,
- unacked_message_q = queue:new(),
+ unacked_message_q = ?QUEUE:new(),
user = User,
virtual_host = VHost,
most_recently_declared_queue = <<>>,
@@ -493,13 +511,20 @@ prioritise_info(Msg, _Len, _State) ->
handle_call(flush, _From, State) ->
reply(ok, State);
-handle_call(info, _From, State) ->
- reply(infos(?INFO_KEYS, State), State);
+handle_call({info, Deadline}, _From, State) ->
+ try
+ reply({ok, infos(?INFO_KEYS, Deadline, State)}, State)
+ catch
+ Error ->
+ reply({error, Error}, State)
+ end;
-handle_call({info, Items}, _From, State) ->
+handle_call({{info, Items}, Deadline}, _From, State) ->
try
- reply({ok, infos(Items, State)}, State)
- catch Error -> reply({error, Error}, State)
+ reply({ok, infos(Items, Deadline, State)}, State)
+ catch
+ Error ->
+ reply({error, Error}, State)
end;
handle_call(refresh_config, _From, State = #ch{virtual_host = VHost}) ->
@@ -649,8 +674,9 @@ handle_info({ra_event, {Name, _} = From, _} = Evt,
State = lists:foldl(
fun({MsgId, {MsgHeader, Msg}}, Acc) ->
IsDelivered = maps:is_key(delivery_count, MsgHeader),
+ Msg1 = add_delivery_count_header(MsgHeader, Msg),
handle_deliver(CTag, AckRequired,
- {QName, From, MsgId, IsDelivered, Msg},
+ {QName, From, MsgId, IsDelivered, Msg1},
Acc)
end, State0#ch{queue_states = maps:put(Name, QState2, QueueStates)}, Msgs),
noreply(State);
@@ -669,19 +695,20 @@ handle_info({ra_event, {Name, _} = From, _} = Evt,
#'basic.credit_drained'{consumer_tag = CTag,
credit_drained = Credit})
end, Actions),
- noreply_coalesce(confirm(MsgSeqNos, From, State));
+ noreply_coalesce(confirm(MsgSeqNos, Name, State));
eol ->
- State1 = handle_consuming_queue_down_or_eol(From, State0),
- State2 = handle_delivering_queue_down(From, State1),
- {MXs, UC1} = dtree:take(From, State2#ch.unconfirmed),
+ State1 = handle_consuming_queue_down_or_eol(Name, State0),
+ State2 = handle_delivering_queue_down(Name, State1),
+ %% TODO: this should use dtree:take/3
+ {MXs, UC1} = dtree:take(Name, State2#ch.unconfirmed),
State3 = record_confirms(MXs, State1#ch{unconfirmed = UC1}),
- case maps:find(From, QNames) of
+ case maps:find(Name, QNames) of
{ok, QName} -> erase_queue_stats(QName);
error -> ok
end,
noreply_coalesce(
State3#ch{queue_states = maps:remove(Name, QueueStates),
- queue_names = maps:remove(From, QNames)})
+ queue_names = maps:remove(Name, QNames)})
end;
_ ->
%% the assumption here is that the queue state has been cleaned up and
@@ -1179,7 +1206,7 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
DQ = {Delivery#delivery{flow = Flow}, QNames},
{noreply, case Tx of
none -> deliver_to_queues(DQ, State1);
- {Msgs, Acks} -> Msgs1 = queue:in(DQ, Msgs),
+ {Msgs, Acks} -> Msgs1 = ?QUEUE:in(DQ, Msgs),
State1#ch{tx = {Msgs1, Acks}}
end};
{error, Reason} ->
@@ -1334,13 +1361,14 @@ handle_method(#'basic.cancel'{consumer_tag = ConsumerTag, nowait = NoWait},
return_ok(State, NoWait, OkMsg);
{ok, {Q = #amqqueue{pid = QPid}, _CParams}} ->
ConsumerMapping1 = maps:remove(ConsumerTag, ConsumerMapping),
+ QRef = qpid_to_ref(QPid),
QCons1 =
- case maps:find(QPid, QCons) of
+ case maps:find(QRef, QCons) of
error -> QCons;
{ok, CTags} -> CTags1 = gb_sets:delete(ConsumerTag, CTags),
case gb_sets:is_empty(CTags1) of
- true -> maps:remove(QPid, QCons);
- false -> maps:put(QPid, CTags1, QCons)
+ true -> maps:remove(QRef, QCons);
+ false -> maps:put(QRef, CTags1, QCons)
end
end,
NewState = State#ch{consumer_mapping = ConsumerMapping1,
@@ -1386,14 +1414,14 @@ handle_method(#'basic.qos'{global = true,
handle_method(#'basic.qos'{global = true,
prefetch_count = PrefetchCount},
_, State = #ch{limiter = Limiter, unacked_message_q = UAMQ}) ->
- %% TODO queue:len(UAMQ) is not strictly right since that counts
+ %% TODO ?QUEUE:len(UAMQ) is not strictly right since that counts
%% unacked messages from basic.get too. Pretty obscure though.
Limiter1 = rabbit_limiter:limit_prefetch(Limiter,
- PrefetchCount, queue:len(UAMQ)),
+ PrefetchCount, ?QUEUE:len(UAMQ)),
case ((not rabbit_limiter:is_active(Limiter)) andalso
rabbit_limiter:is_active(Limiter1)) of
true -> rabbit_amqqueue:activate_limit_all(
- consumer_queues(State#ch.consumer_mapping), self());
+ consumer_queue_refs(State#ch.consumer_mapping), self());
false -> ok
end,
{reply, #'basic.qos_ok'{}, State#ch{limiter = Limiter1}};
@@ -1402,7 +1430,7 @@ handle_method(#'basic.recover_async'{requeue = true},
_, State = #ch{unacked_message_q = UAMQ, limiter = Limiter,
queue_states = QueueStates0}) ->
OkFun = fun () -> ok end,
- UAMQL = queue:to_list(UAMQ),
+ UAMQL = ?QUEUE:to_list(UAMQ),
QueueStates =
foreach_per_queue(
fun ({QPid, CTag}, MsgIds, Acc0) ->
@@ -1416,7 +1444,7 @@ handle_method(#'basic.recover_async'{requeue = true},
ok = notify_limiter(Limiter, UAMQL),
%% No answer required - basic.recover is the newer, synchronous
%% variant of this method
- {noreply, State#ch{unacked_message_q = queue:new(),
+ {noreply, State#ch{unacked_message_q = ?QUEUE:new(),
queue_states = QueueStates}};
handle_method(#'basic.recover_async'{requeue = false}, _, _State) ->
@@ -1525,7 +1553,7 @@ handle_method(#'tx.commit'{}, _, #ch{tx = none}) ->
handle_method(#'tx.commit'{}, _, State = #ch{tx = {Msgs, Acks},
limiter = Limiter}) ->
- State1 = rabbit_misc:queue_fold(fun deliver_to_queues/2, State, Msgs),
+ State1 = queue_fold(fun deliver_to_queues/2, State, Msgs),
Rev = fun (X) -> lists:reverse(lists:sort(X)) end,
State2 = lists:foldl(fun ({ack, A}, Acc) ->
ack(Rev(A), Acc);
@@ -1540,7 +1568,7 @@ handle_method(#'tx.rollback'{}, _, #ch{tx = none}) ->
handle_method(#'tx.rollback'{}, _, State = #ch{unacked_message_q = UAMQ,
tx = {_Msgs, Acks}}) ->
AcksL = lists:append(lists:reverse([lists:reverse(L) || {_, L} <- Acks])),
- UAMQ1 = queue:from_list(lists:usort(AcksL ++ queue:to_list(UAMQ))),
+ UAMQ1 = ?QUEUE:from_list(lists:usort(AcksL ++ ?QUEUE:to_list(UAMQ))),
{reply, #'tx.rollback_ok'{}, State#ch{unacked_message_q = UAMQ1,
tx = new_tx()}};
@@ -1639,25 +1667,26 @@ consumer_monitor(ConsumerTag,
State = #ch{consumer_mapping = ConsumerMapping,
queue_monitors = QMons,
queue_consumers = QCons}) ->
- {#amqqueue{pid = QPid}, _} =
- maps:get(ConsumerTag, ConsumerMapping),
- CTags1 = case maps:find(QPid, QCons) of
+ {#amqqueue{pid = QPid}, _} = maps:get(ConsumerTag, ConsumerMapping),
+ QRef = qpid_to_ref(QPid),
+ CTags1 = case maps:find(QRef, QCons) of
{ok, CTags} -> gb_sets:insert(ConsumerTag, CTags);
error -> gb_sets:singleton(ConsumerTag)
end,
- QCons1 = maps:put(QPid, CTags1, QCons),
- State#ch{queue_monitors = maybe_monitor(QPid, QMons),
+ QCons1 = maps:put(QRef, CTags1, QCons),
+ State#ch{queue_monitors = maybe_monitor(QRef, QMons),
queue_consumers = QCons1}.
track_delivering_queue(NoAck, QPid, QName,
State = #ch{queue_names = QNames,
queue_monitors = QMons,
delivering_queues = DQ}) ->
- State#ch{queue_names = maps:put(QPid, QName, QNames),
- queue_monitors = maybe_monitor(QPid, QMons),
+ QRef = qpid_to_ref(QPid),
+ State#ch{queue_names = maps:put(QRef, QName, QNames),
+ queue_monitors = maybe_monitor(QRef, QMons),
delivering_queues = case NoAck of
true -> DQ;
- false -> sets:add_element(QPid, DQ)
+ false -> sets:add_element(QRef, DQ)
end}.
handle_publishing_queue_down(QPid, Reason, State = #ch{unconfirmed = UC,
@@ -1676,16 +1705,16 @@ handle_publishing_queue_down(QPid, Reason, State = #ch{unconfirmed = UC,
handle_publishing_queue_down(QPid, _Reason, _State) when ?IS_QUORUM(QPid) ->
error(quorum_queues_should_never_be_monitored).
-handle_consuming_queue_down_or_eol(QPid,
- State = #ch{queue_consumers = QCons,
- queue_names = QNames}) ->
- ConsumerTags = case maps:find(QPid, QCons) of
+handle_consuming_queue_down_or_eol(QRef,
+ State = #ch{queue_consumers = QCons,
+ queue_names = QNames}) ->
+ ConsumerTags = case maps:find(QRef, QCons) of
error -> gb_sets:new();
{ok, CTags} -> CTags
end,
gb_sets:fold(
fun (CTag, StateN = #ch{consumer_mapping = CMap}) ->
- QName = maps:get(QPid, QNames),
+ QName = maps:get(QRef, QNames),
case queue_down_consumer_action(CTag, CMap) of
remove ->
cancel_consumer(CTag, QName, StateN);
@@ -1697,7 +1726,7 @@ handle_consuming_queue_down_or_eol(QPid,
_ -> cancel_consumer(CTag, QName, StateN)
end
end
- end, State#ch{queue_consumers = maps:remove(QPid, QCons)}, ConsumerTags).
+ end, State#ch{queue_consumers = maps:remove(QRef, QCons)}, ConsumerTags).
%% [0] There is a slight danger here that if a queue is deleted and
%% then recreated again the reconsume will succeed even though it was
@@ -1724,8 +1753,8 @@ queue_down_consumer_action(CTag, CMap) ->
_ -> {recover, ConsumeSpec}
end.
-handle_delivering_queue_down(QPid, State = #ch{delivering_queues = DQ}) ->
- State#ch{delivering_queues = sets:del_element(QPid, DQ)}.
+handle_delivering_queue_down(QRef, State = #ch{delivering_queues = DQ}) ->
+ State#ch{delivering_queues = sets:del_element(QRef, DQ)}.
binding_action(Fun, SourceNameBin0, DestinationType, DestinationNameBin0,
RoutingKey, Arguments, VHostPath, ConnPid,
@@ -1831,28 +1860,28 @@ record_sent(Type, Tag, AckRequired,
end,
rabbit_trace:tap_out(Msg, ConnName, ChannelNum, Username, TraceState),
UAMQ1 = case AckRequired of
- true -> queue:in({DeliveryTag, Tag, {QPid, MsgId}},
- UAMQ);
+ true -> ?QUEUE:in({DeliveryTag, Tag, {QPid, MsgId}},
+ UAMQ);
false -> UAMQ
end,
State#ch{unacked_message_q = UAMQ1, next_tag = DeliveryTag + 1}.
%% NB: returns acks in youngest-first order
collect_acks(Q, 0, true) ->
- {lists:reverse(queue:to_list(Q)), queue:new()};
+ {lists:reverse(?QUEUE:to_list(Q)), ?QUEUE:new()};
collect_acks(Q, DeliveryTag, Multiple) ->
collect_acks([], [], Q, DeliveryTag, Multiple).
collect_acks(ToAcc, PrefixAcc, Q, DeliveryTag, Multiple) ->
- case queue:out(Q) of
+ case ?QUEUE:out(Q) of
{{value, UnackedMsg = {CurrentDeliveryTag, _ConsumerTag, _Msg}},
QTail} ->
if CurrentDeliveryTag == DeliveryTag ->
{[UnackedMsg | ToAcc],
case PrefixAcc of
[] -> QTail;
- _ -> queue:join(
- queue:from_list(lists:reverse(PrefixAcc)),
+ _ -> ?QUEUE:join(
+ ?QUEUE:from_list(lists:reverse(PrefixAcc)),
QTail)
end};
Multiple ->
@@ -1880,7 +1909,7 @@ ack(Acked, State = #ch{queue_names = QNames,
State#ch{queue_states = QueueStates}.
incr_queue_stats(QPid, QNames, MsgIds, State) ->
- case maps:find(QPid, QNames) of
+ case maps:find(qpid_to_ref(QPid), QNames) of
{ok, QName} -> Count = length(MsgIds),
?INCR_STATS(queue_stats, QName, Count, ack, State);
error -> ok
@@ -1898,16 +1927,16 @@ incr_queue_stats(QPid, QNames, MsgIds, State) ->
%% (reject w requeue), 'false' (reject w/o requeue). The msg ids, as
%% well as the list overall, are in "most-recent (generally youngest)
%% ack first" order.
-new_tx() -> {queue:new(), []}.
+new_tx() -> {?QUEUE:new(), []}.
notify_queues(State = #ch{state = closing}) ->
{ok, State};
notify_queues(State = #ch{consumer_mapping = Consumers,
delivering_queues = DQ }) ->
- QPids0 = sets:to_list(
- sets:union(sets:from_list(consumer_queues(Consumers)), DQ)),
+ QRefs0 = sets:to_list(
+ sets:union(sets:from_list(consumer_queue_refs(Consumers)), DQ)),
%% filter to only include pids to avoid trying to notify quorum queues
- QPids = [P || P <- QPids0, ?IS_CLASSIC(P)],
+ QPids = [P || P <- QRefs0, ?IS_CLASSIC(P)],
Timeout = get_operation_timeout(),
{rabbit_amqqueue:notify_down_all(QPids, self(), Timeout),
State#ch{state = closing}}.
@@ -1923,8 +1952,8 @@ foreach_per_queue(F, UAL, Acc) ->
end, gb_trees:empty(), UAL),
rabbit_misc:gb_trees_fold(fun (Key, Val, Acc0) -> F(Key, Val, Acc0) end, Acc, T).
-consumer_queues(Consumers) ->
- lists:usort([QPid || {_Key, {#amqqueue{pid = QPid}, _CParams}}
+consumer_queue_refs(Consumers) ->
+ lists:usort([qpid_to_ref(QPid) || {_Key, {#amqqueue{pid = QPid}, _CParams}}
<- maps:to_list(Consumers)]).
%% tell the limiter about the number of acks that have been received
@@ -1967,7 +1996,7 @@ deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{
Qs = rabbit_amqqueue:lookup(DelQNames),
{DeliveredQPids, DeliveredQQPids, QueueStates} =
rabbit_amqqueue:deliver(Qs, Delivery, QueueStates0),
- AllDeliveredQPids = DeliveredQPids ++ DeliveredQQPids,
+ AllDeliveredQRefs = DeliveredQPids ++ [N || {N, _} <- DeliveredQQPids],
%% The maybe_monitor_all/2 monitors all queues to which we
%% delivered. But we want to monitor even queues we didn't deliver
%% to, since we need their 'DOWN' messages to clean
@@ -1981,49 +2010,50 @@ deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{
{QNames1, QMons1} =
lists:foldl(fun (#amqqueue{pid = QPid, name = QName},
{QNames0, QMons0}) ->
- {case maps:is_key(QPid, QNames0) of
+ QRef = qpid_to_ref(QPid),
+ {case maps:is_key(QRef, QNames0) of
true -> QNames0;
- false -> maps:put(QPid, QName, QNames0)
+ false -> maps:put(QRef, QName, QNames0)
end, maybe_monitor(QPid, QMons0)}
end, {QNames, maybe_monitor_all(DeliveredQPids, QMons)}, Qs),
State1 = State#ch{queue_names = QNames1,
queue_monitors = QMons1},
%% NB: the order here is important since basic.returns must be
%% sent before confirms.
- State2 = process_routing_mandatory(Mandatory, AllDeliveredQPids, MsgSeqNo,
+ State2 = process_routing_mandatory(Mandatory, AllDeliveredQRefs , MsgSeqNo,
Message, State1),
- State3 = process_routing_confirm( Confirm, AllDeliveredQPids, MsgSeqNo,
- XName, State2),
+ State3 = process_routing_confirm(Confirm, AllDeliveredQRefs , MsgSeqNo,
+ XName, State2),
case rabbit_event:stats_level(State3, #ch.stats_timer) of
fine ->
?INCR_STATS(exchange_stats, XName, 1, publish),
[?INCR_STATS(queue_exchange_stats, {QName, XName}, 1, publish) ||
- QPid <- AllDeliveredQPids,
- {ok, QName} <- [maps:find(QPid, QNames1)]];
+ QRef <- AllDeliveredQRefs,
+ {ok, QName} <- [maps:find(QRef, QNames1)]];
_ ->
ok
end,
State3#ch{queue_states = QueueStates}.
-process_routing_mandatory(false, _, _MsgSeqNo, _Msg, State) ->
+process_routing_mandatory(false, _, _, _, State) ->
State;
-process_routing_mandatory(true, [], _MsgSeqNo, Msg, State) ->
+process_routing_mandatory(true, [], _, Msg, State) ->
ok = basic_return(Msg, State, no_route),
State;
-process_routing_mandatory(true, QPids, MsgSeqNo, Msg, State) ->
- State#ch{mandatory = dtree:insert(MsgSeqNo, QPids, Msg,
+process_routing_mandatory(true, QRefs, MsgSeqNo, Msg, State) ->
+ State#ch{mandatory = dtree:insert(MsgSeqNo, QRefs, Msg,
State#ch.mandatory)}.
-process_routing_confirm(false, _, _MsgSeqNo, _XName, State) ->
+process_routing_confirm(false, _, _, _, State) ->
State;
-process_routing_confirm(true, [], MsgSeqNo, XName, State) ->
+process_routing_confirm(true, [], MsgSeqNo, XName, State) ->
record_confirms([{MsgSeqNo, XName}], State);
-process_routing_confirm(true, QPids, MsgSeqNo, XName, State) ->
- State#ch{unconfirmed = dtree:insert(MsgSeqNo, QPids, XName,
+process_routing_confirm(true, QRefs, MsgSeqNo, XName, State) ->
+ State#ch{unconfirmed = dtree:insert(MsgSeqNo, QRefs, XName,
State#ch.unconfirmed)}.
-confirm(MsgSeqNos, QPid, State = #ch{unconfirmed = UC}) ->
- {MXs, UC1} = dtree:take(MsgSeqNos, QPid, UC),
+confirm(MsgSeqNos, QRef, State = #ch{unconfirmed = UC}) ->
+ {MXs, UC1} = dtree:take(MsgSeqNos, QRef, UC),
%% NB: don't call noreply/1 since we don't want to send confirms.
record_confirms(MXs, State#ch{unconfirmed = UC1}).
@@ -2129,6 +2159,17 @@ complete_tx(State = #ch{tx = failed}) ->
infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items].
+infos(Items, Deadline, State) ->
+ [begin
+ Now = now_millis(),
+ if
+ Now > Deadline ->
+ throw(timeout);
+ true ->
+ {Item, i(Item, State)}
+ end
+ end || Item <- Items].
+
i(pid, _) -> self();
i(connection, #ch{conn_pid = ConnPid}) -> ConnPid;
i(number, #ch{channel = Channel}) -> Channel;
@@ -2140,8 +2181,8 @@ i(confirm, #ch{confirm_enabled = CE}) -> CE;
i(name, State) -> name(State);
i(consumer_count, #ch{consumer_mapping = CM}) -> maps:size(CM);
i(messages_unconfirmed, #ch{unconfirmed = UC}) -> dtree:size(UC);
-i(messages_unacknowledged, #ch{unacked_message_q = UAMQ}) -> queue:len(UAMQ);
-i(messages_uncommitted, #ch{tx = {Msgs, _Acks}}) -> queue:len(Msgs);
+i(messages_unacknowledged, #ch{unacked_message_q = UAMQ}) -> ?QUEUE:len(UAMQ);
+i(messages_uncommitted, #ch{tx = {Msgs, _Acks}}) -> ?QUEUE:len(Msgs);
i(messages_uncommitted, #ch{}) -> 0;
i(acks_uncommitted, #ch{tx = {_Msgs, Acks}}) -> ack_len(Acks);
i(acks_uncommitted, #ch{}) -> 0;
@@ -2488,3 +2529,28 @@ maybe_monitor(_, QMons) ->
maybe_monitor_all([], S) -> S; %% optimisation
maybe_monitor_all([Item], S) -> maybe_monitor(Item, S); %% optimisation
maybe_monitor_all(Items, S) -> lists:foldl(fun maybe_monitor/2, S, Items).
+
+add_delivery_count_header(MsgHeader, Msg) ->
+ Count = maps:get(delivery_count, MsgHeader, 0),
+ rabbit_basic:add_header(<<"x-delivery-count">>, long, Count, Msg).
+
+qpid_to_ref(Pid) when is_pid(Pid) -> Pid;
+qpid_to_ref({Name, _}) -> Name;
+%% assume it already is a ref
+qpid_to_ref(Ref) -> Ref.
+
+now_millis() ->
+ erlang:monotonic_time(millisecond).
+
+get_operation_timeout_and_deadline() ->
+ % NB: can't use get_operation_timeout because
+ % this code may not be running via the channel Pid
+ Timeout = ?CHANNEL_OPERATION_TIMEOUT,
+ Deadline = now_millis() + Timeout,
+ {Timeout, Deadline}.
+
+queue_fold(Fun, Init, Q) ->
+ case ?QUEUE:out(Q) of
+ {empty, _Q} -> Init;
+ {{value, V}, Q1} -> queue_fold(Fun, Fun(V, Init), Q1)
+ end.
diff --git a/src/rabbit_connection_sup.erl b/src/rabbit_connection_sup.erl
index 4dcfa8dc8a..7ec5262448 100644
--- a/src/rabbit_connection_sup.erl
+++ b/src/rabbit_connection_sup.erl
@@ -42,7 +42,7 @@
%%--------------------------------------------------------------------------
-start_link(Ref, Sock, _Transport, _Opts) ->
+start_link(Ref, _Sock, _Transport, _Opts) ->
{ok, SupPid} = supervisor2:start_link(?MODULE, []),
%% We need to get channels in the hierarchy here so they get shut
%% down after the reader, so the reader gets a chance to terminate
@@ -62,7 +62,7 @@ start_link(Ref, Sock, _Transport, _Opts) ->
{ok, ReaderPid} =
supervisor2:start_child(
SupPid,
- {reader, {rabbit_reader, start_link, [HelperSup, Ref, Sock]},
+ {reader, {rabbit_reader, start_link, [HelperSup, Ref]},
intrinsic, ?WORKER_WAIT, worker, [rabbit_reader]}),
{ok, SupPid, ReaderPid}.
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl
index 79d4a3effc..00d0db0b8a 100644
--- a/src/rabbit_fifo.erl
+++ b/src/rabbit_fifo.erl
@@ -1,3 +1,19 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2007-2017 Pivotal Software, Inc. All rights reserved.
+%%
+
-module(rabbit_fifo).
-behaviour(ra_machine).
@@ -106,7 +122,7 @@
-type applied_mfa() :: {module(), atom(), list()}.
% represents a partially applied module call
--define(SHADOW_COPY_INTERVAL, 4096).
+-define(SHADOW_COPY_INTERVAL, 4096 * 4).
-define(USE_AVG_HALF_LIFE, 10000.0).
-record(consumer,
@@ -147,7 +163,7 @@
next_msg_num = 1 :: msg_in_id(),
% list of returned msg_in_ids - when checking out it picks from
% this list first before taking low_msg_num
- returns = queue:new() :: queue:queue(msg_in_id()),
+ returns = lqueue:new() :: lqueue:queue(msg_in_id() | '$prefix_msg'),
% a counter of enqueues - used to trigger shadow copy points
enqueue_count = 0 :: non_neg_integer(),
% a map containing all the live processes that have ever enqueued
@@ -180,7 +196,8 @@
%% it instead takes messages from the `messages' map.
%% This is done so that consumers are still served in a deterministic
%% order on recovery.
- prefix_msg_count = 0 :: non_neg_integer()
+ prefix_msg_counts = {0, 0} :: {Return :: non_neg_integer(),
+ PrefixMsgs :: non_neg_integer()}
}).
-opaque state() :: #state{}.
@@ -207,19 +224,19 @@
-spec init(config()) -> {state(), ra_machine:effects()}.
init(#{name := Name} = Conf) ->
+ update_state(Conf, #state{name = Name}).
+
+update_state(Conf, State) ->
DLH = maps:get(dead_letter_handler, Conf, undefined),
CCH = maps:get(cancel_consumer_handler, Conf, undefined),
BLH = maps:get(become_leader_handler, Conf, undefined),
MH = maps:get(metrics_handler, Conf, undefined),
SHI = maps:get(shadow_copy_interval, Conf, ?SHADOW_COPY_INTERVAL),
- #state{name = Name,
- dead_letter_handler = DLH,
- cancel_consumer_handler = CCH,
- become_leader_handler = BLH,
- metrics_handler = MH,
- shadow_copy_interval = SHI}.
-
-
+ State#state{dead_letter_handler = DLH,
+ cancel_consumer_handler = CCH,
+ become_leader_handler = BLH,
+ metrics_handler = MH,
+ shadow_copy_interval = SHI}.
% msg_ids are scoped per consumer
% ra_indexes holds all raft indexes for enqueues currently on queue
@@ -228,9 +245,9 @@ init(#{name := Name} = Conf) ->
{state(), ra_machine:effects(), Reply :: term()}.
apply(#{index := RaftIdx}, {enqueue, From, Seq, RawMsg}, Effects0, State00) ->
case maybe_enqueue(RaftIdx, From, Seq, RawMsg, Effects0, State00) of
- {ok, State0, Effects} ->
- State = append_to_master_index(RaftIdx, State0),
- checkout(State, Effects);
+ {ok, State0, Effects1} ->
+ {State, Effects, ok} = checkout(State0, Effects1),
+ {append_to_master_index(RaftIdx, State), Effects, ok};
{duplicate, State, Effects} ->
{State, Effects, ok}
end;
@@ -263,7 +280,7 @@ apply(_, {return, MsgIds, ConsumerId}, Effects0,
#{ConsumerId := Con0 = #consumer{checked_out = Checked0}} ->
Checked = maps:without(MsgIds, Checked0),
Returned = maps:with(MsgIds, Checked0),
- MsgNumMsgs = [M || M <- maps:values(Returned)],
+ MsgNumMsgs = maps:values(Returned),
return(ConsumerId, MsgNumMsgs, Con0, Checked, Effects0, State);
_ ->
{State, Effects0, ok}
@@ -313,7 +330,8 @@ apply(_, {credit, NewCredit, RemoteDelCnt, Drain, ConsumerId}, Effects0,
end;
apply(_, {checkout, {dequeue, _}, {_Tag, _Pid}}, Effects0,
#state{messages = M,
- prefix_msg_count = 0} = State0) when map_size(M) == 0 ->
+ prefix_msg_counts = {0, 0}} = State0) when map_size(M) == 0 ->
+ %% FIXME: also check if there are returned messages
%% TODO do we need metric visibility of empty get requests?
{State0, Effects0, {dequeue, empty}};
apply(Meta, {checkout, {dequeue, settled}, ConsumerId},
@@ -357,7 +375,7 @@ apply(#{index := RaftIdx}, purge, Effects0,
{StateAcc0, EffectsAcc0, ok}) ->
MsgRaftIdxs = [RIdx || {_MsgInId, {RIdx, _}}
<- maps:values(Checked0)],
- complete(ConsumerId, MsgRaftIdxs, C,
+ complete(ConsumerId, MsgRaftIdxs, maps:size(Checked0), C,
#{}, EffectsAcc0, StateAcc0)
end, {State0, Effects0, ok}, Cons0),
{State, Effects, _} =
@@ -365,7 +383,7 @@ apply(#{index := RaftIdx}, purge, Effects0,
RaftIdx, Indexes,
State1#state{ra_indexes = rabbit_fifo_index:empty(),
messages = #{},
- returns = queue:new(),
+ returns = lqueue:new(),
low_msg_num = undefined}, Effects1),
{State, [garbage_collection | Effects], {purge, Total}};
apply(_, {down, ConsumerPid, noconnection},
@@ -374,10 +392,16 @@ apply(_, {down, ConsumerPid, noconnection},
Node = node(ConsumerPid),
% mark all consumers and enqueuers as suspect
% and monitor the node
- Cons = maps:map(fun({_, P}, C) when node(P) =:= Node ->
- C#consumer{suspected_down = true};
- (_, C) -> C
- end, Cons0),
+ {Cons, State} = maps:fold(fun({_, P} = K, #consumer{checked_out = Checked0} = C,
+ {Co, St0}) when node(P) =:= Node ->
+ St = return_all(St0, Checked0),
+ {maps:put(K, C#consumer{suspected_down = true,
+ checked_out = #{}},
+ Co),
+ St};
+ (K, C, {Co, St}) ->
+ {maps:put(K, C, Co), St}
+ end, {#{}, State0}, Cons0),
Enqs = maps:map(fun(P, E) when node(P) =:= Node ->
E#enqueuer{suspected_down = true};
(_, E) -> E
@@ -388,7 +412,7 @@ apply(_, {down, ConsumerPid, noconnection},
_ ->
[{monitor, node, Node} | Effects0]
end,
- {State0#state{consumers = Cons, enqueuers = Enqs}, Effects, ok};
+ {State#state{consumers = Cons, enqueuers = Enqs}, Effects, ok};
apply(_, {down, Pid, _Info}, Effects0,
#state{consumers = Cons0,
enqueuers = Enqs0} = State0) ->
@@ -411,7 +435,8 @@ apply(_, {down, Pid, _Info}, Effects0,
checkout(State2, Effects1);
apply(_, {nodeup, Node}, Effects0,
#state{consumers = Cons0,
- enqueuers = Enqs0} = State0) ->
+ enqueuers = Enqs0,
+ service_queue = SQ0} = State0) ->
%% A node we are monitoring has come back.
%% If we have suspected any processes of being
%% down we should now re-issue the monitors for them to detect if they're
@@ -427,26 +452,48 @@ apply(_, {nodeup, Node}, Effects0,
(_, _, Acc) -> Acc
end, [], Enqs0),
Monitors = [{monitor, process, P} || P <- Cons ++ Enqs],
+ Enqs1 = maps:map(fun(P, E) when node(P) =:= Node ->
+ E#enqueuer{suspected_down = false};
+ (_, E) -> E
+ end, Enqs0),
+ {Cons1, SQ, Effects} =
+ maps:fold(fun({_, P} = ConsumerId, C, {CAcc, SQAcc, EAcc})
+ when node(P) =:= Node ->
+ update_or_remove_sub(
+ ConsumerId, C#consumer{suspected_down = false},
+ CAcc, SQAcc, EAcc);
+ (_, _, Acc) ->
+ Acc
+ end, {Cons0, SQ0, Effects0}, Cons0),
% TODO: avoid list concat
- {State0, Monitors ++ Effects0, ok};
+ checkout(State0#state{consumers = Cons1, enqueuers = Enqs1,
+ service_queue = SQ}, Monitors ++ Effects);
apply(_, {nodedown, _Node}, Effects, State) ->
- {State, Effects, ok}.
+ {State, Effects, ok};
+apply(_, {update_state, Conf}, Effects, State) ->
+ {update_state(Conf, State), Effects, ok}.
-spec state_enter(ra_server:ra_state(), state()) -> ra_machine:effects().
-state_enter(leader, #state{consumers = Custs,
+state_enter(leader, #state{consumers = Cons,
enqueuers = Enqs,
name = Name,
+ prefix_msg_counts = {0, 0},
become_leader_handler = BLH}) ->
% return effects to monitor all current consumers and enqueuers
- ConMons = [{monitor, process, P} || {_, P} <- maps:keys(Custs)],
- EnqMons = [{monitor, process, P} || P <- maps:keys(Enqs)],
- Effects = ConMons ++ EnqMons,
+ Pids = lists:usort(maps:keys(Enqs) ++ [P || {_, P} <- maps:keys(Cons)]),
+ Mons = [{monitor, process, P} || P <- Pids],
+ Nots = [{send_msg, P, leader_change, ra_event} || P <- Pids],
+ Effects = Mons ++ Nots,
case BLH of
undefined ->
Effects;
{Mod, Fun, Args} ->
[{mod_call, Mod, Fun, Args ++ [Name]} | Effects]
end;
+state_enter(recovered, #state{prefix_msg_counts = PrefixMsgCounts})
+ when PrefixMsgCounts =/= {0, 0} ->
+ %% TODO: remove assertion?
+ exit({rabbit_fifo, unexpected_prefix_msg_counts, PrefixMsgCounts});
state_enter(eol, #state{enqueuers = Enqs, consumers = Custs0}) ->
Custs = maps:fold(fun({_, P}, V, S) -> S#{P => V} end, #{}, Custs0),
[{send_msg, P, eol, ra_event} || P <- maps:keys(maps:merge(Enqs, Custs))];
@@ -583,9 +630,7 @@ cancel_consumer(ConsumerId,
{Effects0, #state{consumers = C0, name = Name} = S0}) ->
case maps:take(ConsumerId, C0) of
{#consumer{checked_out = Checked0}, Cons} ->
- S = maps:fold(fun (_, {MsgNum, Msg}, S) ->
- return_one(MsgNum, Msg, S)
- end, S0, Checked0),
+ S = return_all(S0, Checked0),
Effects = cancel_consumer_effects(ConsumerId, Name, S, Effects0),
case maps:size(Cons) of
0 ->
@@ -678,8 +723,8 @@ return(ConsumerId, MsgNumMsgs, #consumer{lifetime = Life} = Con0, Checked,
end,
{Cons, SQ, Effects} = update_or_remove_sub(ConsumerId, Con, Cons0,
SQ0, Effects0),
- State1 = lists:foldl(fun('$prefix_msg', #state{prefix_msg_count = MsgCount} = S0) ->
- S0#state{prefix_msg_count = MsgCount + 1};
+ State1 = lists:foldl(fun('$prefix_msg' = Msg, S0) ->
+ return_one(0, Msg, S0);
({MsgNum, Msg}, S0) ->
return_one(MsgNum, Msg, S0)
end, State0, MsgNumMsgs),
@@ -688,14 +733,14 @@ return(ConsumerId, MsgNumMsgs, #consumer{lifetime = Life} = Con0, Checked,
Effects).
% used to processes messages that are finished
-complete(ConsumerId, MsgRaftIdxs,
+complete(ConsumerId, MsgRaftIdxs, NumDiscarded,
Con0, Checked, Effects0,
#state{consumers = Cons0, service_queue = SQ0,
ra_indexes = Indexes0} = State0) ->
- %% credit_mode = simple_prefetch should automatically top-up credit as messages
- %% are simple_prefetch or otherwise returned
+ %% credit_mode = simple_prefetch should automatically top-up credit
+ %% as messages are simple_prefetch or otherwise returned
Con = Con0#consumer{checked_out = Checked,
- credit = increase_credit(Con0, length(MsgRaftIdxs))},
+ credit = increase_credit(Con0, NumDiscarded)},
{Cons, SQ, Effects} = update_or_remove_sub(ConsumerId, Con, Cons0,
SQ0, Effects0),
Indexes = lists:foldl(fun rabbit_fifo_index:delete/2, Indexes0, MsgRaftIdxs),
@@ -721,7 +766,10 @@ complete_and_checkout(IncomingRaftIdx, MsgIds, ConsumerId,
Checked = maps:without(MsgIds, Checked0),
Discarded = maps:with(MsgIds, Checked0),
MsgRaftIdxs = [RIdx || {_, {RIdx, _}} <- maps:values(Discarded)],
+ %% need to pass the length of discarded as $prefix_msgs would be filtered
+ %% by the above list comprehension
{State1, Effects1, _} = complete(ConsumerId, MsgRaftIdxs,
+ maps:size(Discarded),
Con0, Checked, Effects0, State0),
{State, Effects, _} = checkout(State1, Effects1),
% settle metrics are incremented separately
@@ -749,6 +797,7 @@ cancel_consumer_effects(Pid, Name,
update_smallest_raft_index(IncomingRaftIdx, OldIndexes,
#state{ra_indexes = Indexes,
+ % prefix_msg_count = 0,
messages = Messages} = State, Effects) ->
case rabbit_fifo_index:size(Indexes) of
0 when map_size(Messages) =:= 0 ->
@@ -777,6 +826,9 @@ update_smallest_raft_index(IncomingRaftIdx, OldIndexes,
end.
% TODO update message then update messages and returns in single operations
+return_one(0, '$prefix_msg',
+ #state{returns = Returns} = State0) ->
+ State0#state{returns = lqueue:in('$prefix_msg', Returns)};
return_one(MsgNum, {RaftId, {Header0, RawMsg}},
#state{messages = Messages,
returns = Returns} = State0) ->
@@ -786,8 +838,14 @@ return_one(MsgNum, {RaftId, {Header0, RawMsg}},
Msg = {RaftId, {Header, RawMsg}},
% this should not affect the release cursor in any way
State0#state{messages = maps:put(MsgNum, Msg, Messages),
- returns = queue:in(MsgNum, Returns)}.
+ returns = lqueue:in(MsgNum, Returns)}.
+return_all(State, Checked) ->
+ maps:fold(fun (_, '$prefix_msg', S) ->
+ return_one(0, '$prefix_msg', S);
+ (_, {MsgNum, Msg}, S) ->
+ return_one(MsgNum, Msg, S)
+ end, State, Checked).
checkout(State, Effects) ->
checkout0(checkout_one(State), Effects, #{}).
@@ -813,13 +871,20 @@ append_send_msg_effects(Effects0, AccMap) ->
end, Effects0, AccMap),
[{aux, active} | Effects].
+next_checkout_message(#state{prefix_msg_counts = {PReturns, P}} = State)
+ when PReturns > 0 ->
+ %% there are prefix returns, these should be served first
+ {'$prefix_msg', State#state{prefix_msg_counts = {PReturns - 1, P}}};
next_checkout_message(#state{returns = Returns,
low_msg_num = Low0,
+ prefix_msg_counts = {R, P},
next_msg_num = NextMsgNum} = State) ->
%% use peek rather than out there as the most likely case is an empty
%% queue
- case queue:peek(Returns) of
- empty ->
+ case lqueue:peek(Returns) of
+ {value, Next} ->
+ {Next, State#state{returns = lqueue:drop(Returns)}};
+ empty when P == 0 ->
case Low0 of
undefined ->
{undefined, State};
@@ -832,25 +897,32 @@ next_checkout_message(#state{returns = Returns,
{Low0, State#state{low_msg_num = Low}}
end
end;
- {value, Next} ->
- {Next, State#state{returns = queue:drop(Returns)}}
+ empty ->
+ %% There are prefix msgs
+ {'$prefix_msg', State#state{prefix_msg_counts = {R, P - 1}}}
end.
-take_next_msg(#state{prefix_msg_count = 0,
- messages = Messages0} = State0) ->
- {NextMsgInId, State} = next_checkout_message(State0),
- %% messages are available
- case maps:take(NextMsgInId, Messages0) of
- {IdxMsg, Messages} ->
- {{NextMsgInId, IdxMsg}, State, Messages, 0};
- error ->
- error
- end;
-take_next_msg(#state{prefix_msg_count = MsgCount,
- messages = Messages} = State) ->
- %% there is still a prefix message count for the consumer
- %% "fake" a '$prefix_msg' message
- {'$prefix_msg', State, Messages, MsgCount - 1}.
+%% next message is determined as follows:
+%% First we check if there are are prefex returns
+%% Then we check if there are current returns
+%% then we check prefix msgs
+%% then we check current messages
+%%
+%% When we return it is always done to the current return queue
+%% for both prefix messages and current messages
+take_next_msg(#state{messages = Messages0} = State0) ->
+ case next_checkout_message(State0) of
+ {'$prefix_msg', State} ->
+ {'$prefix_msg', State, Messages0};
+ {NextMsgInId, State} ->
+ %% messages are available
+ case maps:take(NextMsgInId, Messages0) of
+ {IdxMsg, Messages} ->
+ {{NextMsgInId, IdxMsg}, State, Messages};
+ error ->
+ error
+ end
+ end.
send_msg_effect({CTag, CPid}, Msgs) ->
{send_msg, CPid, {delivery, CTag, Msgs}, ra_event}.
@@ -861,7 +933,7 @@ checkout_one(#state{service_queue = SQ0,
case queue:peek(SQ0) of
{value, ConsumerId} ->
case take_next_msg(InitState) of
- {ConsumerMsg, State0, Messages, PrefMsgC} ->
+ {ConsumerMsg, State0, Messages} ->
SQ1 = queue:drop(SQ0),
%% there are consumers waiting to be serviced
%% process consumer checkout
@@ -871,6 +943,8 @@ checkout_one(#state{service_queue = SQ0,
%% can happen when draining
%% recurse without consumer on queue
checkout_one(InitState#state{service_queue = SQ1});
+ {ok, #consumer{suspected_down = true}} ->
+ checkout_one(InitState#state{service_queue = SQ1});
{ok, #consumer{checked_out = Checked0,
next_msg_id = Next,
credit = Credit,
@@ -885,7 +959,6 @@ checkout_one(#state{service_queue = SQ0,
Cons0, SQ1, []),
State = State0#state{service_queue = SQ,
messages = Messages,
- prefix_msg_count = PrefMsgC,
consumers = Cons},
Msg = case ConsumerMsg of
'$prefix_msg' -> '$prefix_msg';
@@ -976,17 +1049,30 @@ maybe_queue_consumer(ConsumerId, #consumer{credit = Credit},
%% creates a dehydrated version of the current state to be cached and
%% potentially used to for a snaphot at a later point
-dehydrate_state(#state{messages = Messages0,
+dehydrate_state(#state{messages = Messages,
consumers = Consumers,
- prefix_msg_count = MsgCount} = State) ->
+ returns = Returns,
+ prefix_msg_counts = {PrefRetCnt, MsgCount}} = State) ->
+ %% TODO: optimise to avoid having to iterate the queue to get the number
+ %% of current returned messages
+ RetLen = lqueue:len(Returns), % O(1)
+ CurReturns = length([R || R <- lqueue:to_list(Returns),
+ R =/= '$prefix_msg']),
+ PrefixMsgCnt = MsgCount + maps:size(Messages) - CurReturns,
State#state{messages = #{},
ra_indexes = rabbit_fifo_index:empty(),
low_msg_num = undefined,
consumers = maps:map(fun (_, C) ->
- C#consumer{checked_out = #{}}
+ dehydrate_consumer(C)
end, Consumers),
- returns = queue:new(),
- prefix_msg_count = maps:size(Messages0) + MsgCount}.
+ returns = lqueue:new(),
+ %% messages include returns
+ prefix_msg_counts = {RetLen + PrefRetCnt,
+ PrefixMsgCnt}}.
+
+dehydrate_consumer(#consumer{checked_out = Checked0} = Con) ->
+ Checked = maps:map(fun (_, _) -> '$prefix_msg' end, Checked0),
+ Con#consumer{checked_out = Checked}.
-ifdef(TEST).
@@ -1289,6 +1375,20 @@ down_with_noconnection_marks_suspect_and_node_is_monitored_test() ->
?ASSERT_EFF({monitor, process, P}, P =:= Self, Effects3),
ok.
+down_with_noconnection_returns_unack_test() ->
+ Pid = spawn(fun() -> ok end),
+ Cid = {<<"down_with_noconnect">>, Pid},
+ {State0, _} = enq(1, 1, second, test_init(test)),
+ ?assertEqual(1, maps:size(State0#state.messages)),
+ ?assertEqual(0, lqueue:len(State0#state.returns)),
+ {State1, {_, _}} = deq(2, Cid, unsettled, State0),
+ ?assertEqual(0, maps:size(State1#state.messages)),
+ ?assertEqual(0, lqueue:len(State1#state.returns)),
+ {State2a, _, _} = apply(meta(3), {down, Pid, noconnection}, [], State1),
+ ?assertEqual(1, maps:size(State2a#state.messages)),
+ ?assertEqual(1, lqueue:len(State2a#state.returns)),
+ ok.
+
down_with_noproc_enqueuer_is_cleaned_up_test() ->
State00 = test_init(test),
Pid = spawn(fun() -> ok end),
@@ -1351,7 +1451,7 @@ tick_test() ->
ok.
enq_deq_snapshot_recover_test() ->
- Tag = <<"release_cursor_snapshot_state_test">>,
+ Tag = atom_to_binary(?FUNCTION_NAME, utf8),
Cid = {Tag, self()},
% OthPid = spawn(fun () -> ok end),
% Oth = {<<"oth">>, OthPid},
@@ -1408,20 +1508,49 @@ snapshot_recover_test() ->
],
run_snapshot_test(?FUNCTION_NAME, Commands).
-enq_deq_return_snapshot_recover_test() ->
+enq_deq_return_settle_snapshot_test() ->
+ Tag = atom_to_binary(?FUNCTION_NAME, utf8),
+ Cid = {Tag, self()},
+ Commands = [
+ {enqueue, self(), 1, one}, %% to Cid
+ {checkout, {auto, 1, simple_prefetch}, Cid},
+ {return, [0], Cid}, %% should be re-delivered to Cid
+ {enqueue, self(), 2, two}, %% Cid prefix_msg_count: 2
+ {settle, [1], Cid},
+ {settle, [2], Cid}
+ ],
+ run_snapshot_test(?FUNCTION_NAME, Commands).
+
+return_prefix_msg_count_test() ->
Tag = atom_to_binary(?FUNCTION_NAME, utf8),
Cid = {Tag, self()},
- OthPid = spawn(fun () -> ok end),
- Oth = {<<"oth">>, OthPid},
Commands = [
{enqueue, self(), 1, one},
- {enqueue, self(), 2, two},
- {checkout, {dequeue, unsettled}, Oth},
- {checkout, {dequeue, unsettled}, Cid},
- {settle, [0], Oth},
- {return, [0], Cid},
+ {checkout, {auto, 1, simple_prefetch}, Cid},
+ {checkout, cancel, Cid},
+ {enqueue, self(), 2, two} %% Cid prefix_msg_count: 2
+ ],
+ Indexes = lists:seq(1, length(Commands)),
+ Entries = lists:zip(Indexes, Commands),
+ {State, _Effects} = run_log(test_init(?FUNCTION_NAME), Entries),
+ ?debugFmt("return_prefix_msg_count_test state ~n~p~n", [State]),
+ ok.
+
+
+return_settle_snapshot_test() ->
+ Tag = atom_to_binary(?FUNCTION_NAME, utf8),
+ Cid = {Tag, self()},
+ Commands = [
+ {enqueue, self(), 1, one}, %% to Cid
+ {checkout, {auto, 1, simple_prefetch}, Cid},
+ {return, [0], Cid}, %% should be re-delivered to Oth
+ {enqueue, self(), 2, two}, %% Cid prefix_msg_count: 2
+ {settle, [1], Cid},
+ {return, [2], Cid},
+ {settle, [3], Cid},
{enqueue, self(), 3, three},
- purge
+ purge,
+ {enqueue, self(), 4, four}
],
run_snapshot_test(?FUNCTION_NAME, Commands).
@@ -1436,17 +1565,67 @@ enq_check_settle_snapshot_recover_test() ->
{settle, [0], Cid},
{enqueue, self(), 3, three},
{settle, [2], Cid}
+ ],
+ % ?debugFmt("~w running commands ~w~n", [?FUNCTION_NAME, C]),
+ run_snapshot_test(?FUNCTION_NAME, Commands).
+enq_check_settle_snapshot_purge_test() ->
+ Tag = atom_to_binary(?FUNCTION_NAME, utf8),
+ Cid = {Tag, self()},
+ Commands = [
+ {checkout, {auto, 2, simple_prefetch}, Cid},
+ {enqueue, self(), 1, one},
+ {enqueue, self(), 2, two},
+ {settle, [1], Cid},
+ {settle, [0], Cid},
+ {enqueue, self(), 3, three},
+ purge
+ ],
+ % ?debugFmt("~w running commands ~w~n", [?FUNCTION_NAME, C]),
+ run_snapshot_test(?FUNCTION_NAME, Commands).
+
+enq_check_settle_duplicate_test() ->
+ %% duplicate settle commands are likely
+ Tag = atom_to_binary(?FUNCTION_NAME, utf8),
+ Cid = {Tag, self()},
+ Commands = [
+ {checkout, {auto, 2, simple_prefetch}, Cid},
+ {enqueue, self(), 1, one}, %% 0
+ {enqueue, self(), 2, two}, %% 0
+ {settle, [0], Cid},
+ {settle, [1], Cid},
+ {settle, [1], Cid},
+ {enqueue, self(), 3, three},
+ {settle, [2], Cid}
],
% ?debugFmt("~w running commands ~w~n", [?FUNCTION_NAME, C]),
run_snapshot_test(?FUNCTION_NAME, Commands).
+multi_return_snapshot_test() ->
+ %% this was discovered using property testing
+ C1 = {<<>>, c:pid(0,6723,1)},
+ C2 = {<<0>>,c:pid(0,6723,1)},
+ E = c:pid(0,6720,1),
+ Commands = [
+ {checkout,{auto,2,simple_prefetch},C1},
+ {enqueue,E,1,msg},
+ {enqueue,E,2,msg},
+ {checkout,cancel,C1}, %% both on returns queue
+ {checkout,{auto,1,simple_prefetch},C2}, % on on return one on C2
+ {return,[0],C2}, %% E1 in returns, E2 with C2
+ {return,[1],C2}, %% E2 in returns E1 with C2
+ {settle,[2],C2} %% E2 with C2
+ ],
+ run_snapshot_test(?FUNCTION_NAME, Commands),
+ ok.
+
+
run_snapshot_test(Name, Commands) ->
%% create every incremental permuation of the commands lists
%% and run the snapshot tests against that
[begin
- % ?debugFmt("~w running commands ~w~n", [?FUNCTION_NAME, C]),
+ ?debugFmt("~w running command to ~w~n", [?FUNCTION_NAME, lists:last(C)]),
run_snapshot_test0(Name, C)
end || C <- prefixes(Commands, 1, [])].
@@ -1459,6 +1638,7 @@ run_snapshot_test0(Name, Commands) ->
Filtered = lists:dropwhile(fun({X, _}) when X =< SnapIdx -> true;
(_) -> false
end, Entries),
+ ?debugFmt("running from snapshot: ~b", [SnapIdx]),
{S, _} = run_log(SnapState, Filtered),
% assert log can be restored from any release cursor index
% ?debugFmt("Name ~p Idx ~p S~p~nState~p~nSnapState ~p~nFiltered ~p~n",
@@ -1474,7 +1654,7 @@ prefixes(Source, N, Acc) ->
prefixes(Source, N+1, [X | Acc]).
delivery_query_returns_deliveries_test() ->
- Tag = <<"release_cursor_snapshot_state_test">>,
+ Tag = atom_to_binary(?FUNCTION_NAME, utf8),
Cid = {Tag, self()},
Commands = [
{checkout, {auto, 5, simple_prefetch}, Cid},
@@ -1514,19 +1694,30 @@ state_enter_test() ->
[{mod_call, m, f, [a, the_name]}] = state_enter(leader, S0),
ok.
-leader_monitors_on_state_enter_test() ->
- Cid = {<<"cid">>, self()},
- {State0, [_, _]} = enq(1, 1, first, test_init(test)),
- {State1, _} = check_auto(Cid, 2, State0),
+state_enter_montors_and_notifications_test() ->
+ Oth = spawn(fun () -> ok end),
+ {State0, _} = enq(1, 1, first, test_init(test)),
+ Cid = {<<"adf">>, self()},
+ OthCid = {<<"oth">>, Oth},
+ {State1, _} = check(Cid, 2, State0),
+ {State, _} = check(OthCid, 3, State1),
Self = self(),
- %% as we have an enqueuer _and_ a consumer we chould
- %% get two monitor effects in total, even if they are for the same
- %% processs
+ Effects = state_enter(leader, State),
+
+ %% monitor all enqueuers and consumers
[{monitor, process, Self},
- {monitor, process, Self}] = state_enter(leader, State1),
+ {monitor, process, Oth}] =
+ lists:filter(fun ({monitor, process, _}) -> true;
+ (_) -> false
+ end, Effects),
+ [{send_msg, Self, leader_change, ra_event},
+ {send_msg, Oth, leader_change, ra_event}] =
+ lists:filter(fun ({send_msg, _, leader_change, ra_event}) -> true;
+ (_) -> false
+ end, Effects),
+ ?ASSERT_EFF({monitor, process, _}, Effects),
ok.
-
purge_test() ->
Cid = {<<"purge_test">>, self()},
{State1, _} = enq(1, 1, first, test_init(test)),
diff --git a/src/rabbit_fifo_client.erl b/src/rabbit_fifo_client.erl
index c087e35fb2..635d85be4a 100644
--- a/src/rabbit_fifo_client.erl
+++ b/src/rabbit_fifo_client.erl
@@ -1,3 +1,19 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2007-2017 Pivotal Software, Inc. All rights reserved.
+%%
+
%% @doc Provides an easy to consume API for interacting with the {@link rabbit_fifo.}
%% state machine implementation running inside a `ra' raft system.
%%
@@ -21,7 +37,8 @@
handle_ra_event/3,
untracked_enqueue/2,
purge/1,
- cluster_name/1
+ cluster_name/1,
+ update_machine_state/2
]).
-include_lib("ra/include/ra.hrl").
@@ -375,6 +392,14 @@ purge(Node) ->
cluster_name(#state{cluster_name = ClusterName}) ->
ClusterName.
+update_machine_state(Node, Conf) ->
+ case ra:process_command(Node, {update_state, Conf}) of
+ {ok, ok, _} ->
+ ok;
+ Err ->
+ Err
+ end.
+
%% @doc Handles incoming `ra_events'. Events carry both internal "bookeeping"
%% events emitted by the `ra' leader as well as `rabbit_fifo' emitted events such
%% as message deliveries. All ra events need to be handled by {@module}
@@ -438,7 +463,8 @@ handle_ra_event(From, {applied, Seqs},
fun (Cid, {Settled, Returns, Discards}, Acc) ->
add_command(Cid, settle, Settled,
add_command(Cid, return, Returns,
- add_command(Cid, discard, Discards, Acc)))
+ add_command(Cid, discard,
+ Discards, Acc)))
end, [], State1#state.unsent_commands),
Node = pick_node(State2),
%% send all the settlements and returns
@@ -456,10 +482,21 @@ handle_ra_event(From, {applied, Seqs},
end;
handle_ra_event(Leader, {machine, {delivery, _ConsumerTag, _} = Del}, State0) ->
handle_delivery(Leader, Del, State0);
+handle_ra_event(Leader, {machine, leader_change},
+ #state{leader = Leader} = State) ->
+ %% leader already known
+ {internal, [], [], State};
+handle_ra_event(Leader, {machine, leader_change}, State0) ->
+ %% we need to update leader
+ %% and resend any pending commands
+ State = resend_all_pending(State0#state{leader = Leader}),
+ {internal, [], [], State};
handle_ra_event(_From, {rejected, {not_leader, undefined, _Seq}}, State0) ->
% TODO: how should these be handled? re-sent on timer or try random
{internal, [], [], State0};
handle_ra_event(_From, {rejected, {not_leader, Leader, Seq}}, State0) ->
+ % ?INFO("rabbit_fifo_client: rejected ~b not leader ~w leader: ~w~n",
+ % [Seq, From, Leader]),
State1 = State0#state{leader = Leader},
State = resend(Seq, State1),
{internal, [], [], State};
@@ -517,7 +554,9 @@ seq_applied({Seq, MaybeAction},
last_applied = Seq}};
error ->
% must have already been resent or removed for some other reason
- {Corrs, Actions, State}
+ % still need to update last_applied or we may inadvertently resend
+ % stuff later
+ {Corrs, Actions, State#state{last_applied = Seq}}
end;
seq_applied(_Seq, Acc) ->
Acc.
@@ -541,7 +580,7 @@ maybe_add_action(Action, Acc, State) ->
{[Action | Acc], State}.
do_resends(From, To, State) when From =< To ->
- ?INFO("doing resends From ~w To ~w~n", [From, To]),
+ % ?INFO("rabbit_fifo_client: doing resends From ~w To ~w~n", [From, To]),
lists:foldl(fun resend/2, State, lists:seq(From, To));
do_resends(_, _, State) ->
State.
@@ -556,6 +595,10 @@ resend(OldSeq, #state{pending = Pending0, leader = Leader} = State) ->
State
end.
+resend_all_pending(#state{pending = Pend} = State) ->
+ Seqs = lists:sort(maps:keys(Pend)),
+ lists:foldl(fun resend/2, State, Seqs).
+
handle_delivery(Leader, {delivery, Tag, [{FstId, _} | _] = IdMsgs} = Del0,
#state{consumer_deliveries = CDels0} = State0) ->
{LastId, _} = lists:last(IdMsgs),
@@ -610,6 +653,7 @@ get_missing_deliveries(Leader, From, To, ConsumerTag) ->
Missing.
pick_node(#state{leader = undefined, servers = [N | _]}) ->
+ %% TODO: pick random rather that first?
N;
pick_node(#state{leader = Leader}) ->
Leader.
diff --git a/src/rabbit_fifo_index.erl b/src/rabbit_fifo_index.erl
index e1848862fe..345a99a03c 100644
--- a/src/rabbit_fifo_index.erl
+++ b/src/rabbit_fifo_index.erl
@@ -15,83 +15,83 @@
-include_lib("ra/include/ra.hrl").
-compile({no_auto_import, [size/1]}).
--record(state, {data = #{} :: #{integer() => term()},
- smallest :: undefined | non_neg_integer(),
- largest :: undefined | non_neg_integer()
- }).
+-record(?MODULE, {data = #{} :: #{integer() => term()},
+ smallest :: undefined | non_neg_integer(),
+ largest :: undefined | non_neg_integer()
+ }).
--opaque state() :: #state{}.
+-opaque state() :: #?MODULE{}.
-export_type([state/0]).
-spec empty() -> state().
empty() ->
- #state{}.
+ #?MODULE{}.
-spec fetch(integer(), state()) -> undefined | term().
-fetch(Key, #state{data = Data}) ->
+fetch(Key, #?MODULE{data = Data}) ->
maps:get(Key, Data, undefined).
% only integer keys are supported
-spec append(integer(), term(), state()) -> state().
append(Key, Value,
- #state{data = Data,
+ #?MODULE{data = Data,
smallest = Smallest,
largest = Largest} = State)
when Key > Largest orelse Largest =:= undefined ->
- State#state{data = maps:put(Key, Value, Data),
+ State#?MODULE{data = maps:put(Key, Value, Data),
smallest = ra_lib:default(Smallest, Key),
largest = Key}.
-spec return(integer(), term(), state()) -> state().
-return(Key, Value, #state{data = Data, smallest = Smallest} = State)
+return(Key, Value, #?MODULE{data = Data, smallest = Smallest} = State)
when is_integer(Key) andalso Key < Smallest ->
% TODO: this could potentially result in very large gaps which would
% result in poor performance of smallest/1
% We could try to persist a linked list of "smallests" to make it quicker
% to skip from one to the other - needs measurement
- State#state{data = maps:put(Key, Value, Data),
+ State#?MODULE{data = maps:put(Key, Value, Data),
smallest = Key};
-return(Key, Value, #state{data = Data} = State)
+return(Key, Value, #?MODULE{data = Data} = State)
when is_integer(Key) ->
- State#state{data = maps:put(Key, Value, Data)}.
+ State#?MODULE{data = maps:put(Key, Value, Data)}.
-spec delete(integer(), state()) -> state().
-delete(Smallest, #state{data = Data0,
+delete(Smallest, #?MODULE{data = Data0,
largest = Largest,
smallest = Smallest} = State) ->
Data = maps:remove(Smallest, Data0),
case find_next(Smallest + 1, Largest, Data) of
undefined ->
- State#state{data = Data,
+ State#?MODULE{data = Data,
smallest = undefined,
largest = undefined};
Next ->
- State#state{data = Data, smallest = Next}
+ State#?MODULE{data = Data, smallest = Next}
end;
-delete(Key, #state{data = Data} = State) ->
- State#state{data = maps:remove(Key, Data)}.
+delete(Key, #?MODULE{data = Data} = State) ->
+ State#?MODULE{data = maps:remove(Key, Data)}.
-spec size(state()) -> non_neg_integer().
-size(#state{data = Data}) ->
+size(#?MODULE{data = Data}) ->
maps:size(Data).
-spec smallest(state()) -> undefined | {integer(), term()}.
-smallest(#state{smallest = undefined}) ->
+smallest(#?MODULE{smallest = undefined}) ->
undefined;
-smallest(#state{smallest = Smallest, data = Data}) ->
+smallest(#?MODULE{smallest = Smallest, data = Data}) ->
{Smallest, maps:get(Smallest, Data)}.
-spec next_key_after(non_neg_integer(), state()) -> undefined | integer().
-next_key_after(_Idx, #state{smallest = undefined}) ->
+next_key_after(_Idx, #?MODULE{smallest = undefined}) ->
% map must be empty
undefined;
-next_key_after(Idx, #state{smallest = Smallest,
+next_key_after(Idx, #?MODULE{smallest = Smallest,
largest = Largest})
when Idx+1 < Smallest orelse Idx+1 > Largest ->
undefined;
-next_key_after(Idx, #state{data = Data} = State) ->
+next_key_after(Idx, #?MODULE{data = Data} = State) ->
Next = Idx+1,
case maps:is_key(Next, Data) of
true ->
@@ -101,8 +101,8 @@ next_key_after(Idx, #state{data = Data} = State) ->
end.
-spec map(fun(), state()) -> state().
-map(F, #state{data = Data} = State) ->
- State#state{data = maps:map(F, Data)}.
+map(F, #?MODULE{data = Data} = State) ->
+ State#?MODULE{data = maps:map(F, Data)}.
%% internal
diff --git a/src/rabbit_lager.erl b/src/rabbit_lager.erl
index 21934b5941..da40f207de 100644
--- a/src/rabbit_lager.erl
+++ b/src/rabbit_lager.erl
@@ -20,7 +20,7 @@
%% API
-export([start_logger/0, log_locations/0, fold_sinks/2,
- broker_is_started/0]).
+ broker_is_started/0, set_log_level/1]).
%% For test purposes
-export([configure_lager/0]).
@@ -56,6 +56,34 @@ broker_is_started() ->
ok
end.
+set_log_level(Level) ->
+ IsValidLevel = lists:member(Level, lager_util:levels()),
+ set_log_level(IsValidLevel, Level).
+
+set_log_level(true, Level) ->
+ SinksAndHandlers = [{Sink, gen_event:which_handlers(Sink)} ||
+ Sink <- lager:list_all_sinks()],
+ set_sink_log_level(SinksAndHandlers, Level);
+set_log_level(_, Level) ->
+ {error, {invalid_log_level, Level}}.
+
+set_sink_log_level([], _Level) ->
+ ok;
+set_sink_log_level([{Sink, Handlers}|Rest], Level) ->
+ set_sink_handler_log_level(Sink, Handlers, Level),
+ set_sink_log_level(Rest, Level).
+
+set_sink_handler_log_level(_Sink, [], _Level) ->
+ ok;
+set_sink_handler_log_level(Sink, [Handler|Rest], Level) when is_atom(Handler) ->
+ ok = lager:set_loglevel(Sink, Handler, undefined, Level),
+ set_sink_handler_log_level(Sink, Rest, Level);
+set_sink_handler_log_level(Sink, [{Handler, Id}|Rest], Level) ->
+ ok = lager:set_loglevel(Sink, Handler, Id, Level),
+ set_sink_handler_log_level(Sink, Rest, Level);
+set_sink_handler_log_level(Sink, [_|Rest], Level) ->
+ set_sink_handler_log_level(Sink, Rest, Level).
+
log_locations() ->
ensure_lager_configured(),
DefaultHandlers = application:get_env(lager, handlers, []),
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index a3050c570f..04353423cc 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -462,7 +462,7 @@ is_duplicate(Message = #basic_message { id = MsgId },
%% immediately after calling is_duplicate). The msg is
%% invalid. We will not see this again, nor will we be
%% further involved in confirming this message, so erase.
- {true, State #state { seen_status = maps:remove(MsgId, SS) }};
+ {{true, drop}, State #state { seen_status = maps:remove(MsgId, SS) }};
{ok, Disposition}
when Disposition =:= confirmed
%% It got published when we were a slave via gm, and
@@ -477,8 +477,8 @@ is_duplicate(Message = #basic_message { id = MsgId },
%% Message was discarded while we were a slave. Confirm now.
%% As above, amqqueue_process will have the entry for the
%% msg_id_to_channel mapping.
- {true, State #state { seen_status = maps:remove(MsgId, SS),
- confirmed = [MsgId | Confirmed] }}
+ {{true, drop}, State #state { seen_status = maps:remove(MsgId, SS),
+ confirmed = [MsgId | Confirmed] }}
end.
set_queue_mode(Mode, State = #state { gm = GM,
diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl
index 256d424740..cf431ee04f 100644
--- a/src/rabbit_networking.erl
+++ b/src/rabbit_networking.erl
@@ -36,7 +36,7 @@
connection_info_all/0, connection_info_all/1,
emit_connection_info_all/4, emit_connection_info_local/3,
close_connection/2, accept_ack/2,
- tcp_host/1]).
+ handshake/2, tcp_host/1]).
%% Used by TCP-based transports, e.g. STOMP adapter
-export([tcp_listener_addresses/1, tcp_listener_spec/9,
@@ -121,7 +121,6 @@ boot() ->
%% Failures will throw exceptions
_ = boot_listeners(fun boot_tcp/1, application:get_env(rabbit, num_tcp_acceptors, 10), "TCP"),
_ = boot_listeners(fun boot_tls/1, application:get_env(rabbit, num_ssl_acceptors, 10), "TLS"),
- _ = maybe_start_proxy_protocol(),
ok.
boot_listeners(Fun, NumAcceptors, Type) ->
@@ -190,12 +189,6 @@ log_poodle_fail(Context) ->
"'rabbit' section of your configuration file.~n",
[rabbit_misc:otp_release(), Context]).
-maybe_start_proxy_protocol() ->
- case application:get_env(rabbit, proxy_protocol, false) of
- false -> ok;
- true -> application:start(ranch_proxy_protocol)
- end.
-
fix_ssl_options(Config) ->
rabbit_ssl_options:fix(Config).
@@ -263,12 +256,9 @@ start_listener0(Address, NumAcceptors, Protocol, Label, Opts) ->
end.
transport(Protocol) ->
- ProxyProtocol = application:get_env(rabbit, proxy_protocol, false),
- case {Protocol, ProxyProtocol} of
- {amqp, false} -> ranch_tcp;
- {amqp, true} -> ranch_proxy;
- {'amqp/ssl', false} -> ranch_ssl;
- {'amqp/ssl', true} -> ranch_proxy_ssl
+ case Protocol of
+ amqp -> ranch_tcp;
+ 'amqp/ssl' -> ranch_ssl
end.
@@ -368,16 +358,31 @@ close_connection(Pid, Explanation) ->
ok
end.
+handshake(Ref, ProxyProtocol) ->
+ case ProxyProtocol of
+ true ->
+ {ok, ProxyInfo} = ranch:recv_proxy_header(Ref, 1000),
+ {ok, Sock} = ranch:handshake(Ref),
+ tune_buffer_size(Sock),
+ ok = file_handle_cache:obtain(),
+ {ok, {rabbit_proxy_socket, Sock, ProxyInfo}};
+ false ->
+ ranch:handshake(Ref)
+ end.
+
accept_ack(Ref, Sock) ->
ok = ranch:accept_ack(Ref),
- case tune_buffer_size(Sock) of
+ tune_buffer_size(Sock),
+ ok = file_handle_cache:obtain().
+
+tune_buffer_size(Sock) ->
+ case tune_buffer_size1(Sock) of
ok -> ok;
{error, _} -> rabbit_net:fast_close(Sock),
exit(normal)
- end,
- ok = file_handle_cache:obtain().
+ end.
-tune_buffer_size(Sock) ->
+tune_buffer_size1(Sock) ->
case rabbit_net:getopts(Sock, [sndbuf, recbuf, buffer]) of
{ok, BufSizes} -> BufSz = lists:max([Sz || {_Opt, Sz} <- BufSizes]),
rabbit_net:setopts(Sock, [{buffer, BufSz}]);
diff --git a/src/rabbit_queue_consumers.erl b/src/rabbit_queue_consumers.erl
index f32d261d20..bbbeb3ce44 100644
--- a/src/rabbit_queue_consumers.erl
+++ b/src/rabbit_queue_consumers.erl
@@ -26,6 +26,8 @@
%%----------------------------------------------------------------------------
+-define(QUEUE, lqueue).
+
-define(UNSENT_MESSAGE_LIMIT, 200).
%% Utilisation average calculations are all in μs.
@@ -128,7 +130,7 @@ consumers(Consumers, Acc) ->
count() -> lists:sum([Count || #cr{consumer_count = Count} <- all_ch_record()]).
unacknowledged_message_count() ->
- lists:sum([queue:len(C#cr.acktags) || C <- all_ch_record()]).
+ lists:sum([?QUEUE:len(C#cr.acktags) || C <- all_ch_record()]).
add(ChPid, CTag, NoAck, LimiterPid, LimiterActive, Prefetch, Args, IsEmpty,
Username, State = #state{consumers = Consumers,
@@ -185,7 +187,7 @@ erase_ch(ChPid, State = #state{consumers = Consumers}) ->
All = priority_queue:join(Consumers, BlockedQ),
ok = erase_ch_record(C),
Filtered = priority_queue:filter(chan_pred(ChPid, true), All),
- {[AckTag || {AckTag, _CTag} <- queue:to_list(ChAckTags)],
+ {[AckTag || {AckTag, _CTag} <- ?QUEUE:to_list(ChAckTags)],
tags(priority_queue:to_list(Filtered)),
State#state{consumers = remove_consumers(ChPid, Consumers)}}
end.
@@ -267,7 +269,7 @@ deliver_to_consumer(FetchFun,
rabbit_channel:deliver(ChPid, CTag, AckRequired,
{QName, self(), AckTag, IsDelivered, Message}),
ChAckTags1 = case AckRequired of
- true -> queue:in({AckTag, CTag}, ChAckTags);
+ true -> ?QUEUE:in({AckTag, CTag}, ChAckTags);
false -> ChAckTags
end,
update_ch_record(C#cr{acktags = ChAckTags1,
@@ -280,7 +282,7 @@ is_blocked(Consumer = {ChPid, _C}) ->
record_ack(ChPid, LimiterPid, AckTag) ->
C = #cr{acktags = ChAckTags} = ch_record(ChPid, LimiterPid),
- update_ch_record(C#cr{acktags = queue:in({AckTag, none}, ChAckTags)}),
+ update_ch_record(C#cr{acktags = ?QUEUE:in({AckTag, none}, ChAckTags)}),
ok.
subtract_acks(ChPid, AckTags, State) ->
@@ -308,9 +310,9 @@ subtract_acks(ChPid, AckTags, State) ->
subtract_acks([], [], CTagCounts, AckQ) ->
{CTagCounts, AckQ};
subtract_acks([], Prefix, CTagCounts, AckQ) ->
- {CTagCounts, queue:join(queue:from_list(lists:reverse(Prefix)), AckQ)};
+ {CTagCounts, ?QUEUE:join(?QUEUE:from_list(lists:reverse(Prefix)), AckQ)};
subtract_acks([T | TL] = AckTags, Prefix, CTagCounts, AckQ) ->
- case queue:out(AckQ) of
+ case ?QUEUE:out(AckQ) of
{{value, {T, CTag}}, QTail} ->
subtract_acks(TL, Prefix,
maps:update_with(CTag, fun (Old) -> Old + 1 end, 1, CTagCounts), QTail);
@@ -437,7 +439,7 @@ ch_record(ChPid, LimiterPid) ->
Limiter = rabbit_limiter:client(LimiterPid),
C = #cr{ch_pid = ChPid,
monitor_ref = MonitorRef,
- acktags = queue:new(),
+ acktags = ?QUEUE:new(),
consumer_count = 0,
blocked_consumers = priority_queue:new(),
limiter = Limiter,
@@ -450,7 +452,7 @@ ch_record(ChPid, LimiterPid) ->
update_ch_record(C = #cr{consumer_count = ConsumerCount,
acktags = ChAckTags,
unsent_message_count = UnsentMessageCount}) ->
- case {queue:is_empty(ChAckTags), ConsumerCount, UnsentMessageCount} of
+ case {?QUEUE:is_empty(ChAckTags), ConsumerCount, UnsentMessageCount} of
{true, 0, 0} -> ok = erase_ch_record(C);
_ -> ok = store_ch_record(C)
end,
diff --git a/src/rabbit_quorum_memory_manager.erl b/src/rabbit_quorum_memory_manager.erl
new file mode 100644
index 0000000000..347f7f205e
--- /dev/null
+++ b/src/rabbit_quorum_memory_manager.erl
@@ -0,0 +1,76 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2018 Pivotal Software, Inc. All rights reserved.
+%%
+-module(rabbit_quorum_memory_manager).
+
+-include("rabbit.hrl").
+
+-export([init/1, handle_call/2, handle_event/2, handle_info/2,
+ terminate/2, code_change/3]).
+-export([register/0, unregister/0]).
+
+-record(state, {last_roll_over,
+ interval}).
+
+-rabbit_boot_step({rabbit_quorum_memory_manager,
+ [{description, "quorum memory manager"},
+ {mfa, {?MODULE, register, []}},
+ {cleanup, {?MODULE, unregister, []}},
+ {requires, rabbit_event},
+ {enables, recovery}]}).
+
+register() ->
+ gen_event:add_handler(rabbit_alarm, ?MODULE, []).
+
+unregister() ->
+ gen_event:delete_handler(rabbit_alarm, ?MODULE, []).
+
+init([]) ->
+ {ok, #state{interval = interval()}}.
+
+handle_call( _, State) ->
+ {ok, ok, State}.
+
+handle_event({set_alarm, {{resource_limit, memory, Node}, []}},
+ #state{last_roll_over = undefined} = State) when Node == node() ->
+ {ok, force_roll_over(State)};
+handle_event({set_alarm, {{resource_limit, memory, Node}, []}},
+ #state{last_roll_over = Last, interval = Interval } = State)
+ when Node == node() ->
+ Now = erlang:system_time(millisecond),
+ case Now > (Last + Interval) of
+ true ->
+ {ok, force_roll_over(State)};
+ false ->
+ {ok, State}
+ end;
+handle_event(_, State) ->
+ {ok, State}.
+
+handle_info(_, State) ->
+ {ok, State}.
+
+terminate(_, _State) ->
+ ok.
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+force_roll_over(State) ->
+ ra_log_wal:force_roll_over(ra_log_wal),
+ State#state{last_roll_over = erlang:system_time(millisecond)}.
+
+interval() ->
+ application:get_env(rabbit, min_wal_roll_over_interval, 20000).
diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl
index e24b907600..5e854a4657 100644
--- a/src/rabbit_quorum_queue.erl
+++ b/src/rabbit_quorum_queue.erl
@@ -17,7 +17,7 @@
-module(rabbit_quorum_queue).
-export([init_state/2, handle_event/2]).
--export([declare/1, recover/1, stop/1, delete/4, delete_immediately/1]).
+-export([declare/1, recover/1, stop/1, delete/4, delete_immediately/2]).
-export([info/1, info/2, stat/1, infos/1]).
-export([ack/3, reject/4, basic_get/4, basic_consume/9, basic_cancel/4]).
-export([credit/4]).
@@ -34,6 +34,7 @@
-export([add_member/3]).
-export([delete_member/3]).
-export([requeue/3]).
+-export([policy_changed/2]).
-export([cleanup_data_dir/0]).
-include_lib("rabbit_common/include/rabbit.hrl").
@@ -74,7 +75,7 @@
-spec infos(rabbit_types:r('queue')) -> rabbit_types:infos().
-spec stat(rabbit_types:amqqueue()) -> {'ok', non_neg_integer(), non_neg_integer()}.
-spec cluster_state(Name :: atom()) -> 'down' | 'recovering' | 'running'.
--spec status(rabbit_types:vhost(), Name :: atom()) -> rabbit_types:infos() | {error, term()}.
+-spec status(rabbit_types:vhost(), Name :: rabbit_misc:resource_name()) -> rabbit_types:infos() | {error, term()}.
-define(STATISTICS_KEYS,
[policy,
@@ -93,14 +94,17 @@
%%----------------------------------------------------------------------------
-spec init_state(ra_server_id(), rabbit_types:r('queue')) ->
- rabbit_fifo_client:state().
+ rabbit_fifo_client:state().
init_state({Name, _}, QName) ->
{ok, SoftLimit} = application:get_env(rabbit, quorum_commands_soft_limit),
- {ok, #amqqueue{pid = Leader, quorum_nodes = Nodes0}} =
+ %% This lookup could potentially return an {error, not_found}, but we do not
+ %% know what to do if the queue has `disappeared`. Let it crash.
+ {ok, #amqqueue{pid = Leader, quorum_nodes = Nodes}} =
rabbit_amqqueue:lookup(QName),
%% Ensure the leader is listed first
- Nodes = [Leader | lists:delete(Leader, Nodes0)],
- rabbit_fifo_client:init(QName, Nodes, SoftLimit,
+ Servers0 = [{Name, N} || N <- Nodes],
+ Servers = [Leader | lists:delete(Leader, Servers0)],
+ rabbit_fifo_client:init(qname_to_rname(QName), Servers, SoftLimit,
fun() -> credit_flow:block(Name), ok end,
fun() -> credit_flow:unblock(Name), ok end).
@@ -148,12 +152,14 @@ declare(#amqqueue{name = QName,
-ra_machine(Q = #amqqueue{name = QName}) ->
- {module, rabbit_fifo,
- #{dead_letter_handler => dlx_mfa(Q),
- cancel_consumer_handler => {?MODULE, cancel_consumer, [QName]},
- become_leader_handler => {?MODULE, become_leader, [QName]},
- metrics_handler => {?MODULE, update_metrics, [QName]}}}.
+ra_machine(Q) ->
+ {module, rabbit_fifo, ra_machine_config(Q)}.
+
+ra_machine_config(Q = #amqqueue{name = QName}) ->
+ #{dead_letter_handler => dlx_mfa(Q),
+ cancel_consumer_handler => {?MODULE, cancel_consumer, [QName]},
+ become_leader_handler => {?MODULE, become_leader, [QName]},
+ metrics_handler => {?MODULE, update_metrics, [QName]}}.
cancel_consumer_handler(QName, {ConsumerTag, ChPid}, _Name) ->
Node = node(ChPid),
@@ -272,9 +278,10 @@ stop(VHost) ->
delete(#amqqueue{ type = quorum, pid = {Name, _}, name = QName, quorum_nodes = QNodes},
_IfUnused, _IfEmpty, ActingUser) ->
%% TODO Quorum queue needs to support consumer tracking for IfUnused
+ Timeout = application:get_env(kernel, net_ticktime, 60000) + 5000,
Msgs = quorum_messages(Name),
_ = rabbit_amqqueue:internal_delete(QName, ActingUser),
- case ra:delete_cluster([{Name, Node} || Node <- QNodes], 120000) of
+ case ra:delete_cluster([{Name, Node} || Node <- QNodes], Timeout) of
{ok, {_, LeaderNode} = Leader} ->
MRef = erlang:monitor(process, Leader),
receive
@@ -300,11 +307,10 @@ delete(#amqqueue{ type = quorum, pid = {Name, _}, name = QName, quorum_nodes = Q
end
end.
-delete_immediately({Name, _} = QPid) ->
- QName = queue_name(Name),
- _ = rabbit_amqqueue:internal_delete(QName, ?INTERNAL_USER),
- ok = ra:delete_cluster([QPid]),
- rabbit_core_metrics:queue_deleted(QName),
+delete_immediately(Resource, {_Name, _} = QPid) ->
+ _ = rabbit_amqqueue:internal_delete(Resource, ?INTERNAL_USER),
+ {ok, _} = ra:delete_cluster([QPid]),
+ rabbit_core_metrics:queue_deleted(Resource),
ok.
ack(CTag, MsgIds, QState) ->
@@ -330,8 +336,10 @@ basic_get(#amqqueue{name = QName, pid = {Name, _} = Id, type = quorum}, NoAck,
case rabbit_fifo_client:dequeue(CTag, Settlement, QState0) of
{ok, empty, QState} ->
{ok, empty, QState};
- {ok, {MsgId, {MsgHeader, Msg}}, QState} ->
- IsDelivered = maps:is_key(delivery_count, MsgHeader),
+ {ok, {MsgId, {MsgHeader, Msg0}}, QState} ->
+ Count = maps:get(delivery_count, MsgHeader, 0),
+ IsDelivered = Count > 0,
+ Msg = rabbit_basic:add_header(<<"x-delivery-count">>, long, Count, Msg0),
{ok, quorum_messages(Name), {QName, Id, MsgId, IsDelivered, Msg}, QState};
{timeout, _} ->
{error, timeout}
@@ -411,6 +419,10 @@ maybe_delete_data_dir(UId) ->
ok
end.
+policy_changed(QName, Node) ->
+ {ok, Q} = rabbit_amqqueue:lookup(QName),
+ rabbit_fifo_client:update_machine_state(Node, ra_machine_config(Q)).
+
cluster_state(Name) ->
case whereis(Name) of
undefined -> down;
@@ -547,9 +559,13 @@ args_policy_lookup(Name, Resolve, Q = #amqqueue{arguments = Args}) ->
dead_letter_publish(undefined, _, _, _) ->
ok;
dead_letter_publish(X, RK, QName, ReasonMsgs) ->
- {ok, Exchange} = rabbit_exchange:lookup(X),
- [rabbit_dead_letter:publish(Msg, Reason, Exchange, RK, QName)
- || {Reason, Msg} <- ReasonMsgs].
+ case rabbit_exchange:lookup(X) of
+ {ok, Exchange} ->
+ [rabbit_dead_letter:publish(Msg, Reason, Exchange, RK, QName)
+ || {Reason, Msg} <- ReasonMsgs];
+ {error, not_found} ->
+ ok
+ end.
%% TODO escape hack
qname_to_rname(#resource{virtual_host = <<"/">>, name = Name}) ->
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 91002d0b94..8370d08ed9 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -57,12 +57,12 @@
-include("rabbit_framing.hrl").
-include("rabbit.hrl").
--export([start_link/3, info_keys/0, info/1, info/2,
+-export([start_link/2, info_keys/0, info/1, info/2,
shutdown/2]).
-export([system_continue/3, system_terminate/4, system_code_change/4]).
--export([init/4, mainloop/4, recvloop/4]).
+-export([init/3, mainloop/4, recvloop/4]).
-export([conserve_resources/3, server_properties/1]).
@@ -157,7 +157,7 @@
%%--------------------------------------------------------------------------
--spec start_link(pid(), any(), rabbit_net:socket()) -> rabbit_types:ok(pid()).
+-spec start_link(pid(), any()) -> rabbit_types:ok(pid()).
-spec info_keys() -> rabbit_types:info_keys().
-spec info(pid()) -> rabbit_types:infos().
-spec info(pid(), rabbit_types:info_keys()) -> rabbit_types:infos().
@@ -170,7 +170,7 @@
rabbit_framing:amqp_table().
%% These specs only exists to add no_return() to keep dialyzer happy
--spec init(pid(), pid(), any(), rabbit_net:socket()) -> no_return().
+-spec init(pid(), pid(), any()) -> no_return().
-spec start_connection(pid(), pid(), any(), rabbit_net:socket()) ->
no_return().
@@ -181,18 +181,18 @@
%%--------------------------------------------------------------------------
-start_link(HelperSup, Ref, Sock) ->
- Pid = proc_lib:spawn_link(?MODULE, init, [self(), HelperSup, Ref, Sock]),
+start_link(HelperSup, Ref) ->
+ Pid = proc_lib:spawn_link(?MODULE, init, [self(), HelperSup, Ref]),
{ok, Pid}.
shutdown(Pid, Explanation) ->
gen_server:call(Pid, {shutdown, Explanation}, infinity).
-init(Parent, HelperSup, Ref, Sock) ->
+init(Parent, HelperSup, Ref) ->
?LG_PROCESS_TYPE(reader),
- RealSocket = rabbit_net:unwrap_socket(Sock),
- rabbit_networking:accept_ack(Ref, RealSocket),
+ {ok, Sock} = rabbit_networking:handshake(Ref,
+ application:get_env(rabbit, proxy_protocol, false)),
Deb = sys:debug_options([]),
start_connection(Parent, HelperSup, Deb, Sock).
diff --git a/src/rabbit_vhost.erl b/src/rabbit_vhost.erl
index c460b02e5b..e462fc6bc0 100644
--- a/src/rabbit_vhost.erl
+++ b/src/rabbit_vhost.erl
@@ -208,7 +208,16 @@ delete_storage(VHost) ->
VhostDir = msg_store_dir_path(VHost),
rabbit_log:info("Deleting message store directory for vhost '~s' at '~s'~n", [VHost, VhostDir]),
%% Message store should be closed when vhost supervisor is closed.
- ok = rabbit_file:recursive_delete([VhostDir]).
+ case rabbit_file:recursive_delete([VhostDir]) of
+ ok -> ok;
+ {error, {_, enoent}} ->
+ %% a concurrent delete did the job for us
+ rabbit_log:warning("Tried to delete storage directories for vhost '~s', it failed with an ENOENT", [VHost]),
+ ok;
+ Other ->
+ rabbit_log:warning("Tried to delete storage directories for vhost '~s': ~p", [VHost, Other]),
+ Other
+ end.
assert_benign(ok, _) -> ok;
assert_benign({ok, _}, _) -> ok;
diff --git a/src/rabbit_vm.erl b/src/rabbit_vm.erl
index 265bcb45e3..e495ab8677 100644
--- a/src/rabbit_vm.erl
+++ b/src/rabbit_vm.erl
@@ -35,7 +35,7 @@ memory() ->
{Sums, _Other} = sum_processes(
lists:append(All), distinguishers(), [memory]),
- [Qs, QsSlave, ConnsReader, ConnsWriter, ConnsChannel, ConnsOther,
+ [Qs, QsSlave, Qqs, ConnsReader, ConnsWriter, ConnsChannel, ConnsOther,
MsgIndexProc, MgmtDbProc, Plugins] =
[aggregate(Names, Sums, memory, fun (X) -> X end)
|| Names <- distinguished_interesting_sups()],
@@ -69,7 +69,7 @@ memory() ->
OtherProc = Processes
- ConnsReader - ConnsWriter - ConnsChannel - ConnsOther
- - Qs - QsSlave - MsgIndexProc - Plugins - MgmtDbProc - MetricsProc,
+ - Qs - QsSlave - Qqs - MsgIndexProc - Plugins - MgmtDbProc - MetricsProc,
[
%% Connections
@@ -81,6 +81,7 @@ memory() ->
%% Queues
{queue_procs, Qs},
{queue_slave_procs, QsSlave},
+ {quorum_queue_procs, Qqs},
%% Processes
{plugins, Plugins},
@@ -124,7 +125,7 @@ binary() ->
sets:add_element({Ptr, Sz}, Acc0)
end, Acc, Info)
end, distinguishers(), [{binary, sets:new()}]),
- [Other, Qs, QsSlave, ConnsReader, ConnsWriter, ConnsChannel, ConnsOther,
+ [Other, Qs, QsSlave, Qqs, ConnsReader, ConnsWriter, ConnsChannel, ConnsOther,
MsgIndexProc, MgmtDbProc, Plugins] =
[aggregate(Names, [{other, Rest} | Sums], binary, fun sum_binary/1)
|| Names <- [[other] | distinguished_interesting_sups()]],
@@ -134,6 +135,7 @@ binary() ->
{connection_other, ConnsOther},
{queue_procs, Qs},
{queue_slave_procs, QsSlave},
+ {quorum_queue_procs, Qqs},
{plugins, Plugins},
{mgmt_db, MgmtDbProc},
{msg_index, MsgIndexProc},
@@ -173,11 +175,16 @@ bytes(Words) -> try
end.
interesting_sups() ->
- [queue_sups(), conn_sups() | interesting_sups0()].
+ [queue_sups(), quorum_sups(), conn_sups() | interesting_sups0()].
queue_sups() ->
all_vhosts_children(rabbit_amqqueue_sup_sup).
+quorum_sups() ->
+ %% TODO: in the future not all ra servers may be queues and we needs
+ %% some way to filter this
+ [ra_server_sup].
+
msg_stores() ->
all_vhosts_children(msg_store_transient)
++
@@ -229,6 +236,7 @@ distinguished_interesting_sups() ->
[
with(queue_sups(), master),
with(queue_sups(), slave),
+ quorum_sups(),
with(conn_sups(), reader),
with(conn_sups(), writer),
with(conn_sups(), channel),
diff --git a/test/config_schema_SUITE_data/rabbit.snippets b/test/config_schema_SUITE_data/rabbit.snippets
index 625fcd93a9..b318adaa12 100644
--- a/test/config_schema_SUITE_data/rabbit.snippets
+++ b/test/config_schema_SUITE_data/rabbit.snippets
@@ -568,12 +568,27 @@ credential_validator.regexp = ^abc\\d+",
{delegate_count, 64}
]}],
[]},
+
{kernel_net_ticktime,
"net_ticktime = 20",
[{kernel, [
{net_ticktime, 20}
]}],
[]},
+
+ {kernel_inet_dist_listen_min,
+ "inet_dist_listen_min = 16000",
+ [{kernel, [
+ {inet_dist_listen_min, 16000}
+ ]}],
+ []},
+ {kernel_inet_dist_listen_max,
+ "inet_dist_listen_max = 16100",
+ [{kernel, [
+ {inet_dist_listen_max, 16100}
+ ]}],
+ []},
+
{log_syslog_settings,
"log.syslog = true
log.syslog.identity = rabbitmq
diff --git a/test/quorum_queue_SUITE.erl b/test/quorum_queue_SUITE.erl
index 19f3a66a1d..5b87c5be20 100644
--- a/test/quorum_queue_SUITE.erl
+++ b/test/quorum_queue_SUITE.erl
@@ -48,18 +48,24 @@ groups() ->
++ all_tests()},
{cluster_size_3, [], [
declare_during_node_down,
+ simple_confirm_availability_on_leader_change,
+ confirm_availability_on_leader_change,
recover_from_single_failure,
recover_from_multiple_failures,
leadership_takeover,
delete_declare,
metrics_cleanup_on_leadership_takeover,
metrics_cleanup_on_leader_crash,
- consume_in_minority
- ]},
+ consume_in_minority]},
{cluster_size_5, [], [start_queue,
start_queue_concurrent,
quorum_cluster_size_3,
quorum_cluster_size_7
+ ]},
+ {clustered_with_partitions, [], [
+ reconnect_consumer_and_publish,
+ reconnect_consumer_and_wait,
+ reconnect_consumer_and_wait_channel_down
]}
]}
].
@@ -100,6 +106,7 @@ all_tests() ->
dead_letter_to_classic_queue,
dead_letter_to_quorum_queue,
dead_letter_from_classic_to_quorum_queue,
+ dead_letter_policy,
cleanup_queue_state_on_channel_after_publish,
cleanup_queue_state_on_channel_after_subscribe,
basic_cancel,
@@ -108,7 +115,12 @@ all_tests() ->
cancel_sync_queue,
basic_recover,
idempotent_recover,
- vhost_with_quorum_queue_is_deleted
+ vhost_with_quorum_queue_is_deleted,
+ delete_immediately,
+ delete_immediately_by_resource,
+ consume_redelivery_count,
+ subscribe_redelivery_count,
+ memory_alarm_rolls_wal
].
%% -------------------------------------------------------------------
@@ -117,7 +129,9 @@ all_tests() ->
init_per_suite(Config) ->
rabbit_ct_helpers:log_environment(),
- rabbit_ct_helpers:run_setup_steps(Config).
+ rabbit_ct_helpers:run_setup_steps(
+ Config,
+ [fun rabbit_ct_broker_helpers:enable_dist_proxy_manager/1]).
end_per_suite(Config) ->
rabbit_ct_helpers:run_teardown_steps(Config).
@@ -126,6 +140,8 @@ init_per_group(clustered, Config) ->
rabbit_ct_helpers:set_config(Config, [{rmq_nodes_clustered, true}]);
init_per_group(unclustered, Config) ->
rabbit_ct_helpers:set_config(Config, [{rmq_nodes_clustered, false}]);
+init_per_group(clustered_with_partitions, Config) ->
+ rabbit_ct_helpers:set_config(Config, [{net_ticktime, 10}]);
init_per_group(Group, Config) ->
ClusterSize = case Group of
single_node -> 1;
@@ -137,7 +153,8 @@ init_per_group(Group, Config) ->
[{rmq_nodes_count, ClusterSize},
{rmq_nodename_suffix, Group},
{tcp_ports_base}]),
- Config2 = rabbit_ct_helpers:run_steps(Config1,
+ Config1b = rabbit_ct_helpers:set_config(Config1, [{net_ticktime, 10}]),
+ Config2 = rabbit_ct_helpers:run_steps(Config1b,
[fun merge_app_env/1 ] ++
rabbit_ct_broker_helpers:setup_steps()),
ok = rabbit_ct_broker_helpers:rpc(
@@ -157,10 +174,28 @@ end_per_group(clustered, Config) ->
Config;
end_per_group(unclustered, Config) ->
Config;
+end_per_group(clustered_with_partitions, Config) ->
+ Config;
end_per_group(_, Config) ->
rabbit_ct_helpers:run_steps(Config,
rabbit_ct_broker_helpers:teardown_steps()).
+init_per_testcase(Testcase, Config) when Testcase == reconnect_consumer_and_publish;
+ Testcase == reconnect_consumer_and_wait;
+ Testcase == reconnect_consumer_and_wait_channel_down ->
+ Config1 = rabbit_ct_helpers:testcase_started(Config, Testcase),
+ Q = rabbit_data_coercion:to_binary(Testcase),
+ Config2 = rabbit_ct_helpers:set_config(Config1,
+ [{rmq_nodes_count, 3},
+ {rmq_nodename_suffix, Testcase},
+ {tcp_ports_base},
+ {queue_name, Q}
+ ]),
+ rabbit_ct_helpers:run_steps(Config2,
+ rabbit_ct_broker_helpers:setup_steps() ++
+ rabbit_ct_client_helpers:setup_steps() ++
+ [fun rabbit_ct_broker_helpers:enable_dist_proxy/1,
+ fun rabbit_ct_broker_helpers:cluster_nodes/1]);
init_per_testcase(Testcase, Config) ->
Config1 = rabbit_ct_helpers:testcase_started(Config, Testcase),
rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_queues, []),
@@ -168,12 +203,21 @@ init_per_testcase(Testcase, Config) ->
Config2 = rabbit_ct_helpers:set_config(Config1,
[{queue_name, Q}
]),
- rabbit_ct_helpers:run_steps(Config2,
- rabbit_ct_client_helpers:setup_steps()).
+ rabbit_ct_helpers:run_steps(Config2, rabbit_ct_client_helpers:setup_steps()).
merge_app_env(Config) ->
- rabbit_ct_helpers:merge_app_env(Config, {rabbit, [{core_metrics_gc_interval, 100}]}).
-
+ rabbit_ct_helpers:merge_app_env(
+ rabbit_ct_helpers:merge_app_env(Config,
+ {rabbit, [{core_metrics_gc_interval, 100}]}),
+ {ra, [{min_wal_roll_over_interval, 30000}]}).
+
+end_per_testcase(Testcase, Config) when Testcase == reconnect_consumer_and_publish;
+ Testcase == reconnect_consumer_and_wait;
+ Testcase == reconnect_consumer_and_wait_channel_down ->
+ Config1 = rabbit_ct_helpers:run_steps(Config,
+ rabbit_ct_client_helpers:teardown_steps() ++
+ rabbit_ct_broker_helpers:teardown_steps()),
+ rabbit_ct_helpers:testcase_finished(Config1, Testcase);
end_per_testcase(Testcase, Config) ->
catch delete_queues(),
Config1 = rabbit_ct_helpers:run_steps(
@@ -539,6 +583,19 @@ publish(Config) ->
wait_for_messages_ready(Servers, Name, 1),
wait_for_messages_pending_ack(Servers, Name, 0).
+publish_confirm(Ch, QName) ->
+ publish(Ch, QName),
+ amqp_channel:register_confirm_handler(Ch, self()),
+ ct:pal("waiting for confirms from ~s", [QName]),
+ ok = receive
+ #'basic.ack'{} -> ok;
+ #'basic.nack'{} -> fail
+ after 2500 ->
+ exit(confirm_timeout)
+ end,
+ ct:pal("CONFIRMED! ~s", [QName]),
+ ok.
+
ra_name(Q) ->
binary_to_atom(<<"%2F_", Q/binary>>, utf8).
@@ -1064,22 +1121,47 @@ dead_letter_to_classic_queue(Config) ->
{<<"x-dead-letter-routing-key">>, longstr, CQ}
])),
?assertEqual({'queue.declare_ok', CQ, 0, 0}, declare(Ch, CQ, [])),
- RaName = ra_name(QQ),
- publish(Ch, QQ),
+ test_dead_lettering(true, Config, Ch, Servers, ra_name(QQ), QQ, CQ).
+
+test_dead_lettering(PolicySet, Config, Ch, Servers, RaName, Source, Destination) ->
+ publish(Ch, Source),
wait_for_messages_ready(Servers, RaName, 1),
wait_for_messages_pending_ack(Servers, RaName, 0),
- wait_for_messages(Config, [[CQ, <<"0">>, <<"0">>, <<"0">>]]),
- DeliveryTag = consume(Ch, QQ, false),
+ wait_for_messages(Config, [[Destination, <<"0">>, <<"0">>, <<"0">>]]),
+ DeliveryTag = consume(Ch, Source, false),
wait_for_messages_ready(Servers, RaName, 0),
wait_for_messages_pending_ack(Servers, RaName, 1),
- wait_for_messages(Config, [[CQ, <<"0">>, <<"0">>, <<"0">>]]),
+ wait_for_messages(Config, [[Destination, <<"0">>, <<"0">>, <<"0">>]]),
amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag,
multiple = false,
requeue = false}),
wait_for_messages_ready(Servers, RaName, 0),
wait_for_messages_pending_ack(Servers, RaName, 0),
- wait_for_messages(Config, [[CQ, <<"1">>, <<"1">>, <<"0">>]]),
- _ = consume(Ch, CQ, false).
+ case PolicySet of
+ true ->
+ wait_for_messages(Config, [[Destination, <<"1">>, <<"1">>, <<"0">>]]),
+ _ = consume(Ch, Destination, true);
+ false ->
+ wait_for_messages(Config, [[Destination, <<"0">>, <<"0">>, <<"0">>]])
+ end.
+
+dead_letter_policy(Config) ->
+ [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ QQ = ?config(queue_name, Config),
+ CQ = <<"classic-dead_letter_policy">>,
+ ?assertEqual({'queue.declare_ok', QQ, 0, 0},
+ declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
+ ?assertEqual({'queue.declare_ok', CQ, 0, 0}, declare(Ch, CQ, [])),
+ ok = rabbit_ct_broker_helpers:set_policy(
+ Config, 0, <<"dlx">>, <<"dead_letter.*">>, <<"queues">>,
+ [{<<"dead-letter-exchange">>, <<"">>},
+ {<<"dead-letter-routing-key">>, CQ}]),
+ RaName = ra_name(QQ),
+ test_dead_lettering(true, Config, Ch, Servers, RaName, QQ, CQ),
+ ok = rabbit_ct_broker_helpers:clear_policy(Config, 0, <<"dlx">>),
+ test_dead_lettering(false, Config, Ch, Servers, RaName, QQ, CQ).
dead_letter_to_quorum_queue(Config) ->
[Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
@@ -1488,6 +1570,82 @@ declare_during_node_down(Config) ->
wait_for_messages_ready(Servers, RaName, 1),
ok.
+simple_confirm_availability_on_leader_change(Config) ->
+ [Node1, Node2, _Node3] =
+ rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ %% declare a queue on node2 - this _should_ host the leader on node 2
+ DCh = rabbit_ct_client_helpers:open_channel(Config, Node2),
+ QQ = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', QQ, 0, 0},
+ declare(DCh, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
+
+ erlang:process_flag(trap_exit, true),
+ %% open a channel to another node
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Node1),
+ #'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}),
+ publish_confirm(Ch, QQ),
+
+ %% stop the node hosting the leader
+ stop_node(Config, Node2),
+ %% this should not fail as the channel should detect the new leader and
+ %% resend to that
+ publish_confirm(Ch, QQ),
+ ok = rabbit_ct_broker_helpers:start_node(Config, Node2),
+ ok.
+
+confirm_availability_on_leader_change(Config) ->
+ [Node1, Node2, _Node3] =
+ rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ %% declare a queue on node2 - this _should_ host the leader on node 2
+ DCh = rabbit_ct_client_helpers:open_channel(Config, Node2),
+ QQ = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', QQ, 0, 0},
+ declare(DCh, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
+
+ erlang:process_flag(trap_exit, true),
+ Pid = spawn_link(fun () ->
+ %% open a channel to another node
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Node1),
+ #'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}),
+ ConfirmLoop = fun Loop() ->
+ publish_confirm(Ch, QQ),
+ receive {done, P} ->
+ P ! done,
+ ok
+ after 0 -> Loop() end
+ end,
+ ConfirmLoop()
+ end),
+
+ timer:sleep(500),
+ %% stop the node hosting the leader
+ stop_node(Config, Node2),
+ %% this should not fail as the channel should detect the new leader and
+ %% resend to that
+ timer:sleep(500),
+ Pid ! {done, self()},
+ receive
+ done -> ok;
+ {'EXIT', Pid, Err} ->
+ exit(Err)
+ after 5500 ->
+ flush(100),
+ exit(bah)
+ end,
+ ok = rabbit_ct_broker_helpers:start_node(Config, Node2),
+ ok.
+
+flush(T) ->
+ receive X ->
+ ct:pal("flushed ~w", [X]),
+ flush(T)
+ after T ->
+ ok
+ end.
+
+
add_member_not_running(Config) ->
[Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
@@ -1587,6 +1745,7 @@ delete_member(Config) ->
rpc:call(Server, rabbit_quorum_queue, delete_member,
[<<"/">>, QQ, Server])).
+
cleanup_data_dir(Config) ->
%% This test is slow, but also checks that we handle properly errors when
%% trying to delete a queue in minority. A case clause there had gone
@@ -1608,6 +1767,7 @@ cleanup_data_dir(Config) ->
?assertExit({{shutdown,
{connection_closing, {server_initiated_close, 541, _}}}, _},
amqp_channel:call(Ch, #'queue.delete'{queue = QQ})),
+ catch amqp_channel:call(Ch, #'queue.delete'{queue = QQ}),
?assert(filelib:is_dir(DataDir)),
?assertEqual(ok,
@@ -1615,6 +1775,132 @@ cleanup_data_dir(Config) ->
[])),
?assert(not filelib:is_dir(DataDir)).
+reconnect_consumer_and_publish(Config) ->
+ [Server | _] = Servers =
+ rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ QQ = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', QQ, 0, 0},
+ declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
+ RaName = ra_name(QQ),
+ {ok, _, {_, Leader}} = ra:members({RaName, Server}),
+ [F1, F2] = lists:delete(Leader, Servers),
+ ChF = rabbit_ct_client_helpers:open_channel(Config, F1),
+ publish(Ch, QQ),
+ wait_for_messages_ready(Servers, RaName, 1),
+ wait_for_messages_pending_ack(Servers, RaName, 0),
+ subscribe(ChF, QQ, false),
+ receive
+ {#'basic.deliver'{redelivered = false}, _} ->
+ wait_for_messages_ready(Servers, RaName, 0),
+ wait_for_messages_pending_ack(Servers, RaName, 1)
+ end,
+ Up = [Leader, F2],
+ rabbit_ct_broker_helpers:block_traffic_between(F1, Leader),
+ rabbit_ct_broker_helpers:block_traffic_between(F1, F2),
+ wait_for_messages_ready(Up, RaName, 1),
+ wait_for_messages_pending_ack(Up, RaName, 0),
+ wait_for_messages_ready([F1], RaName, 0),
+ wait_for_messages_pending_ack([F1], RaName, 1),
+ rabbit_ct_broker_helpers:allow_traffic_between(F1, Leader),
+ rabbit_ct_broker_helpers:allow_traffic_between(F1, F2),
+ publish(Ch, QQ),
+ wait_for_messages_ready(Servers, RaName, 0),
+ wait_for_messages_pending_ack(Servers, RaName, 2),
+ receive
+ {#'basic.deliver'{delivery_tag = DeliveryTag,
+ redelivered = false}, _} ->
+ amqp_channel:cast(ChF, #'basic.ack'{delivery_tag = DeliveryTag,
+ multiple = false}),
+ wait_for_messages_ready(Servers, RaName, 0),
+ wait_for_messages_pending_ack(Servers, RaName, 1)
+ end,
+ receive
+ {#'basic.deliver'{delivery_tag = DeliveryTag2,
+ redelivered = true}, _} ->
+ amqp_channel:cast(ChF, #'basic.ack'{delivery_tag = DeliveryTag2,
+ multiple = false}),
+ wait_for_messages_ready(Servers, RaName, 0),
+ wait_for_messages_pending_ack(Servers, RaName, 0)
+ end.
+
+reconnect_consumer_and_wait(Config) ->
+ [Server | _] = Servers =
+ rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ QQ = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', QQ, 0, 0},
+ declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
+ RaName = ra_name(QQ),
+ {ok, _, {_, Leader}} = ra:members({RaName, Server}),
+ [F1, F2] = lists:delete(Leader, Servers),
+ ChF = rabbit_ct_client_helpers:open_channel(Config, F1),
+ publish(Ch, QQ),
+ wait_for_messages_ready(Servers, RaName, 1),
+ wait_for_messages_pending_ack(Servers, RaName, 0),
+ subscribe(ChF, QQ, false),
+ receive
+ {#'basic.deliver'{redelivered = false}, _} ->
+ wait_for_messages_ready(Servers, RaName, 0),
+ wait_for_messages_pending_ack(Servers, RaName, 1)
+ end,
+ Up = [Leader, F2],
+ rabbit_ct_broker_helpers:block_traffic_between(F1, Leader),
+ rabbit_ct_broker_helpers:block_traffic_between(F1, F2),
+ wait_for_messages_ready(Up, RaName, 1),
+ wait_for_messages_pending_ack(Up, RaName, 0),
+ wait_for_messages_ready([F1], RaName, 0),
+ wait_for_messages_pending_ack([F1], RaName, 1),
+ rabbit_ct_broker_helpers:allow_traffic_between(F1, Leader),
+ rabbit_ct_broker_helpers:allow_traffic_between(F1, F2),
+ wait_for_messages_ready(Servers, RaName, 0),
+ wait_for_messages_pending_ack(Servers, RaName, 1),
+ receive
+ {#'basic.deliver'{delivery_tag = DeliveryTag,
+ redelivered = true}, _} ->
+ amqp_channel:cast(ChF, #'basic.ack'{delivery_tag = DeliveryTag,
+ multiple = false}),
+ wait_for_messages_ready(Servers, RaName, 0),
+ wait_for_messages_pending_ack(Servers, RaName, 0)
+ end.
+
+reconnect_consumer_and_wait_channel_down(Config) ->
+ [Server | _] = Servers =
+ rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ QQ = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', QQ, 0, 0},
+ declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
+ RaName = ra_name(QQ),
+ {ok, _, {_, Leader}} = ra:members({RaName, Server}),
+ [F1, F2] = lists:delete(Leader, Servers),
+ ChF = rabbit_ct_client_helpers:open_channel(Config, F1),
+ publish(Ch, QQ),
+ wait_for_messages_ready(Servers, RaName, 1),
+ wait_for_messages_pending_ack(Servers, RaName, 0),
+ subscribe(ChF, QQ, false),
+ receive
+ {#'basic.deliver'{redelivered = false}, _} ->
+ wait_for_messages_ready(Servers, RaName, 0),
+ wait_for_messages_pending_ack(Servers, RaName, 1)
+ end,
+ Up = [Leader, F2],
+ rabbit_ct_broker_helpers:block_traffic_between(F1, Leader),
+ rabbit_ct_broker_helpers:block_traffic_between(F1, F2),
+ wait_for_messages_ready(Up, RaName, 1),
+ wait_for_messages_pending_ack(Up, RaName, 0),
+ wait_for_messages_ready([F1], RaName, 0),
+ wait_for_messages_pending_ack([F1], RaName, 1),
+ rabbit_ct_client_helpers:close_channel(ChF),
+ rabbit_ct_broker_helpers:allow_traffic_between(F1, Leader),
+ rabbit_ct_broker_helpers:allow_traffic_between(F1, F2),
+ %% Let's give it a few seconds to ensure it doesn't attempt to
+ %% deliver to the down channel - it shouldn't be monitored
+ %% at this time!
+ timer:sleep(5000),
+ wait_for_messages_ready(Servers, RaName, 1),
+ wait_for_messages_pending_ack(Servers, RaName, 0).
+
basic_recover(Config) ->
[Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
@@ -1633,6 +1919,156 @@ basic_recover(Config) ->
amqp_channel:cast(Ch, #'basic.recover'{requeue = true}),
wait_for_messages_ready(Servers, RaName, 1),
wait_for_messages_pending_ack(Servers, RaName, 0).
+
+delete_immediately(Config) ->
+ Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ QQ = ?config(queue_name, Config),
+ Args = [{<<"x-queue-type">>, longstr, <<"quorum">>}],
+ ?assertEqual({'queue.declare_ok', QQ, 0, 0},
+ declare(Ch, QQ, Args)),
+
+ Cmd = ["eval", "{ok, Q} = rabbit_amqqueue:lookup(rabbit_misc:r(<<\"/\">>, queue, <<\"" ++ binary_to_list(QQ) ++ "\">>)), Pid = rabbit_amqqueue:pid_of(Q), rabbit_amqqueue:delete_immediately([Pid])."],
+ {ok, Msg} = rabbit_ct_broker_helpers:rabbitmqctl(Config, 0, Cmd),
+ ?assertEqual(match, re:run(Msg, ".*error.*", [{capture, none}])),
+
+ ?assertEqual({'queue.declare_ok', QQ, 0, 0},
+ amqp_channel:call(Ch, #'queue.declare'{queue = QQ,
+ durable = true,
+ passive = true,
+ auto_delete = false,
+ arguments = Args})).
+
+delete_immediately_by_resource(Config) ->
+ Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ QQ = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', QQ, 0, 0},
+ declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
+ Cmd2 = ["eval", "rabbit_amqqueue:delete_immediately_by_resource([rabbit_misc:r(<<\"/\">>, queue, <<\"" ++ binary_to_list(QQ) ++ "\">>)])."],
+ ?assertEqual({ok, "ok\n"}, rabbit_ct_broker_helpers:rabbitmqctl(Config, 0, Cmd2)),
+
+ %% Check that the application and process are down
+ wait_until(fun() ->
+ [] == rpc:call(Server, supervisor, which_children, [ra_server_sup])
+ end),
+ ?assertMatch({ra, _, _}, lists:keyfind(ra, 1,
+ rpc:call(Server, application, which_applications, []))).
+
+subscribe_redelivery_count(Config) ->
+ [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ QQ = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', QQ, 0, 0},
+ declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
+
+ RaName = ra_name(QQ),
+ publish(Ch, QQ),
+ wait_for_messages_ready(Servers, RaName, 1),
+ wait_for_messages_pending_ack(Servers, RaName, 0),
+ subscribe(Ch, QQ, false),
+
+ DTag = <<"x-delivery-count">>,
+ receive
+ {#'basic.deliver'{delivery_tag = DeliveryTag,
+ redelivered = false},
+ #amqp_msg{props = #'P_basic'{headers = H0}}} ->
+ ?assertMatch({DTag, _, 0}, rabbit_basic:header(DTag, H0)),
+ amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag,
+ multiple = false,
+ requeue = true})
+ end,
+
+ receive
+ {#'basic.deliver'{delivery_tag = DeliveryTag1,
+ redelivered = true},
+ #amqp_msg{props = #'P_basic'{headers = H1}}} ->
+ ?assertMatch({DTag, _, 1}, rabbit_basic:header(DTag, H1)),
+ amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag1,
+ multiple = false,
+ requeue = true})
+ end,
+
+ receive
+ {#'basic.deliver'{delivery_tag = DeliveryTag2,
+ redelivered = true},
+ #amqp_msg{props = #'P_basic'{headers = H2}}} ->
+ ?assertMatch({DTag, _, 2}, rabbit_basic:header(DTag, H2)),
+ amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag2,
+ multiple = false}),
+ wait_for_messages_ready(Servers, RaName, 0),
+ wait_for_messages_pending_ack(Servers, RaName, 0)
+ end.
+
+consume_redelivery_count(Config) ->
+ [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ QQ = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', QQ, 0, 0},
+ declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
+ RaName = ra_name(QQ),
+ publish(Ch, QQ),
+ wait_for_messages_ready(Servers, RaName, 1),
+ wait_for_messages_pending_ack(Servers, RaName, 0),
+
+ DTag = <<"x-delivery-count">>,
+
+ {#'basic.get_ok'{delivery_tag = DeliveryTag,
+ redelivered = false},
+ #amqp_msg{props = #'P_basic'{headers = H0}}} =
+ amqp_channel:call(Ch, #'basic.get'{queue = QQ,
+ no_ack = false}),
+ ?assertMatch({DTag, _, 0}, rabbit_basic:header(DTag, H0)),
+ amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag,
+ multiple = false,
+ requeue = true}),
+ %% wait for requeueing
+ timer:sleep(500),
+
+ {#'basic.get_ok'{delivery_tag = DeliveryTag1,
+ redelivered = true},
+ #amqp_msg{props = #'P_basic'{headers = H1}}} =
+ amqp_channel:call(Ch, #'basic.get'{queue = QQ,
+ no_ack = false}),
+ ?assertMatch({DTag, _, 1}, rabbit_basic:header(DTag, H1)),
+ amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag1,
+ multiple = false,
+ requeue = true}),
+
+ {#'basic.get_ok'{delivery_tag = DeliveryTag2,
+ redelivered = true},
+ #amqp_msg{props = #'P_basic'{headers = H2}}} =
+ amqp_channel:call(Ch, #'basic.get'{queue = QQ,
+ no_ack = false}),
+ ?assertMatch({DTag, _, 2}, rabbit_basic:header(DTag, H2)),
+ amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag2,
+ multiple = false,
+ requeue = true}).
+
+memory_alarm_rolls_wal(Config) ->
+ [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ QQ = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', QQ, 0, 0},
+ declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
+ WalDataDir = rpc:call(Server, ra_env, wal_data_dir, []),
+ [Wal0] = filelib:wildcard(WalDataDir ++ "/*.wal"),
+ ok = rpc:call(Server, rabbit_alarm, set_alarm,
+ [{{resource_limit, memory, Server}, []}]),
+ timer:sleep(1000),
+ [Wal1] = filelib:wildcard(WalDataDir ++ "/*.wal"),
+ ?assert(Wal0 =/= Wal1),
+ %% roll over shouldn't happen if we trigger a new alarm in less than
+ %% min_wal_roll_over_interval
+ ok = rpc:call(Server, rabbit_alarm, set_alarm,
+ [{{resource_limit, memory, Server}, []}]),
+ timer:sleep(1000),
+ [Wal2] = filelib:wildcard(WalDataDir ++ "/*.wal"),
+ ?assert(Wal1 == Wal2).
+
%%----------------------------------------------------------------------------
declare(Ch, Q) ->
@@ -1684,7 +2120,7 @@ filter_queues(Expected, Got) ->
end, Got).
publish(Ch, Queue) ->
- ok = amqp_channel:call(Ch,
+ ok = amqp_channel:cast(Ch,
#'basic.publish'{routing_key = Queue},
#amqp_msg{props = #'P_basic'{delivery_mode = 2},
payload = <<"msg">>}).
diff --git a/test/rabbit_fifo_SUITE.erl b/test/rabbit_fifo_SUITE.erl
index a2e22afc2e..56608e9af3 100644
--- a/test/rabbit_fifo_SUITE.erl
+++ b/test/rabbit_fifo_SUITE.erl
@@ -99,8 +99,13 @@ basics(Config) ->
_ = ra:stop_server(ServerId),
_ = ra:restart_server(ServerId),
- % give time to become leader
- timer:sleep(500),
+ %% wait for leader change to notice server is up again
+ receive
+ {ra_event, _, {machine, leader_change}} -> ok
+ after 5000 ->
+ exit(leader_change_timeout)
+ end,
+
{ok, FState6} = rabbit_fifo_client:enqueue(two, FState5b),
% process applied event
FState6b = process_ra_event(FState6, 250),
@@ -523,8 +528,9 @@ conf(ClusterName, UId, ServerId, _, Peers) ->
process_ra_event(State, Wait) ->
receive
{ra_event, From, Evt} ->
- % ct:pal("processed ra event ~p~n", [Evt]),
- {internal, _, _, S} = rabbit_fifo_client:handle_ra_event(From, Evt, State),
+ ct:pal("processed ra event ~p~n", [Evt]),
+ {internal, _, _, S} =
+ rabbit_fifo_client:handle_ra_event(From, Evt, State),
S
after Wait ->
exit(ra_event_timeout)
diff --git a/test/rabbit_fifo_prop_SUITE.erl b/test/rabbit_fifo_prop_SUITE.erl
new file mode 100644
index 0000000000..48e7b9aa7f
--- /dev/null
+++ b/test/rabbit_fifo_prop_SUITE.erl
@@ -0,0 +1,348 @@
+-module(rabbit_fifo_prop_SUITE).
+
+-compile(export_all).
+
+-export([
+ ]).
+
+-include_lib("common_test/include/ct.hrl").
+-include_lib("proper/include/proper.hrl").
+-include_lib("eunit/include/eunit.hrl").
+
+%%%===================================================================
+%%% Common Test callbacks
+%%%===================================================================
+
+all() ->
+ [
+ {group, tests}
+ ].
+
+
+all_tests() ->
+ [
+ snapshots,
+ scenario1,
+ scenario2,
+ scenario3,
+ scenario4
+ ].
+
+groups() ->
+ [
+ {tests, [], all_tests()}
+ ].
+
+init_per_suite(Config) ->
+ Config.
+
+end_per_suite(_Config) ->
+ ok.
+
+init_per_group(_Group, Config) ->
+ Config.
+
+end_per_group(_Group, _Config) ->
+ ok.
+
+init_per_testcase(_TestCase, Config) ->
+ Config.
+
+end_per_testcase(_TestCase, _Config) ->
+ ok.
+
+%%%===================================================================
+%%% Test cases
+%%%===================================================================
+
+% -type log_op() ::
+% {enqueue, pid(), maybe(msg_seqno()), Msg :: raw_msg()}.
+
+scenario1(_Config) ->
+ C1 = {<<>>, c:pid(0,6723,1)},
+ C2 = {<<0>>,c:pid(0,6723,1)},
+ E = c:pid(0,6720,1),
+
+ Commands = [
+ {checkout,{auto,2,simple_prefetch},C1},
+ {enqueue,E,1,msg1},
+ {enqueue,E,2,msg2},
+ {checkout,cancel,C1}, %% both on returns queue
+ {checkout,{auto,1,simple_prefetch},C2}, % on on return one on C2
+ {return,[0],C2}, %% E1 in returns, E2 with C2
+ {return,[1],C2}, %% E2 in returns E1 with C2
+ {settle,[2],C2} %% E2 with C2
+ ],
+ run_snapshot_test(?FUNCTION_NAME, Commands),
+ ok.
+
+scenario2(_Config) ->
+ C1 = {<<>>, c:pid(0,346,1)},
+ C2 = {<<>>,c:pid(0,379,1)},
+ E = c:pid(0,327,1),
+ Commands = [{checkout,{auto,1,simple_prefetch},C1},
+ {enqueue,E,1,msg1},
+ {checkout,cancel,C1},
+ {enqueue,E,2,msg2},
+ {checkout,{auto,1,simple_prefetch},C2},
+ {settle,[0],C1},
+ {settle,[0],C2}
+ ],
+ run_snapshot_test(?FUNCTION_NAME, Commands),
+ ok.
+
+scenario3(_Config) ->
+ C1 = {<<>>, c:pid(0,179,1)},
+ E = c:pid(0,176,1),
+ Commands = [{checkout,{auto,2,simple_prefetch},C1},
+ {enqueue,E,1,msg1},
+ {return,[0],C1},
+ {enqueue,E,2,msg2},
+ {enqueue,E,3,msg3},
+ {settle,[1],C1},
+ {settle,[2],C1}],
+ run_snapshot_test(?FUNCTION_NAME, Commands),
+ ok.
+
+scenario4(_Config) ->
+ C1 = {<<>>, c:pid(0,179,1)},
+ E = c:pid(0,176,1),
+Commands = [{checkout,{auto,1,simple_prefetch},C1},
+ {enqueue,E,1,msg},
+ {settle,[0],C1}],
+ run_snapshot_test(?FUNCTION_NAME, Commands),
+ ok.
+
+snapshots(_Config) ->
+ run_proper(
+ fun () ->
+ ?FORALL(O, ?LET(Ops, log_gen(), expand(Ops)),
+ test1_prop(O))
+ end, [], 1000).
+
+test1_prop(Commands) ->
+ ct:pal("Commands: ~p~n", [Commands]),
+ try run_snapshot_test(?FUNCTION_NAME, Commands) of
+ _ -> true
+ catch
+ Err ->
+ ct:pal("Err: ~p~n", [Err]),
+ false
+ end.
+
+log_gen() ->
+ ?LET(EPids, vector(2, pid_gen()),
+ ?LET(CPids, vector(2, pid_gen()),
+ resize(200,
+ list(
+ frequency(
+ [{20, enqueue_gen(oneof(EPids))},
+ {40, {input_event,
+ frequency([{10, settle},
+ {2, return},
+ {1, discard},
+ {1, requeue}])}},
+ {2, checkout_gen(oneof(CPids))},
+ {1, checkout_cancel_gen(oneof(CPids))},
+ {1, down_gen(oneof(EPids ++ CPids))},
+ {1, purge}
+ ]))))).
+
+pid_gen() ->
+ ?LET(_, integer(), spawn(fun () -> ok end)).
+
+down_gen(Pid) ->
+ ?LET(E, {down, Pid, oneof([noconnection, noproc])}, E).
+
+enqueue_gen(Pid) ->
+ ?LET(E, {enqueue, Pid, frequency([{10, enqueue},
+ {1, delay}])}, E).
+
+checkout_cancel_gen(Pid) ->
+ {checkout, Pid, cancel}.
+
+checkout_gen(Pid) ->
+ %% pid, tag, prefetch
+ ?LET(C, {checkout, {binary(), Pid}, choose(1, 10)}, C).
+
+
+-record(t, {state = rabbit_fifo:init(#{name => proper,
+ shadow_copy_interval => 1})
+ :: rabbit_fifo:state(),
+ index = 1 :: non_neg_integer(), %% raft index
+ enqueuers = #{} :: #{pid() => term()},
+ consumers = #{} :: #{{binary(), pid()} => term()},
+ effects = queue:new() :: queue:queue(),
+ log = [] :: list(),
+ down = #{} :: #{pid() => noproc | noconnection}
+ }).
+
+expand(Ops) ->
+ %% execute each command against a rabbit_fifo state and capture all releavant
+ %% effects
+ T = #t{},
+ #t{effects = Effs} = T1 = lists:foldl(fun handle_op/2, T, Ops),
+ %% process the remaining effects
+ #t{log = Log} = lists:foldl(fun do_apply/2,
+ T1#t{effects = queue:new()},
+ queue:to_list(Effs)),
+
+ lists:reverse(Log).
+
+
+handle_op({enqueue, Pid, When}, #t{enqueuers = Enqs0,
+ down = Down,
+ effects = Effs} = T) ->
+ case Down of
+ #{Pid := noproc} ->
+ %% if it's a noproc then it cannot exist - can it?
+ %% drop operation
+ T;
+ _ ->
+ Enqs = maps:update_with(Pid, fun (Seq) -> Seq + 1 end, 1, Enqs0),
+ MsgSeq = maps:get(Pid, Enqs),
+ Cmd = {enqueue, Pid, MsgSeq, msg},
+ case When of
+ enqueue ->
+ do_apply(Cmd, T#t{enqueuers = Enqs});
+ delay ->
+ %% just put the command on the effects queue
+ ct:pal("delaying ~w", [Cmd]),
+ T#t{effects = queue:in(Cmd, Effs)}
+ end
+ end;
+handle_op({checkout, Pid, cancel}, #t{consumers = Cons0} = T) ->
+ case maps:keys(
+ maps:filter(fun ({_, P}, _) when P == Pid -> true;
+ (_, _) -> false
+ end, Cons0)) of
+ [CId | _] ->
+ Cons = maps:remove(CId, Cons0),
+ Cmd = {checkout, cancel, CId},
+ do_apply(Cmd, T#t{consumers = Cons});
+ _ ->
+ T
+ end;
+handle_op({checkout, CId, Prefetch}, #t{consumers = Cons0} = T) ->
+ case Cons0 of
+ #{CId := _} ->
+ %% ignore if it already exists
+ T;
+ _ ->
+ Cons = maps:put(CId, ok, Cons0),
+ Cmd = {checkout, {auto, Prefetch, simple_prefetch}, CId},
+ do_apply(Cmd, T#t{consumers = Cons})
+ end;
+handle_op({down, Pid, Reason} = Cmd, #t{down = Down} = T) ->
+ case Down of
+ #{Pid := noproc} ->
+ %% it it permanently down, cannot upgrade
+ T;
+ _ ->
+ %% it is either not down or down with noconnection
+ do_apply(Cmd, T#t{down = maps:put(Pid, Reason, Down)})
+ end;
+handle_op({input_event, requeue}, #t{effects = Effs} = T) ->
+ %% this simulates certain settlements arriving out of order
+ case queue:out(Effs) of
+ {{value, Cmd}, Q} ->
+ T#t{effects = queue:in(Cmd, Q)};
+ _ ->
+ T
+ end;
+handle_op({input_event, Settlement}, #t{effects = Effs} = T) ->
+ case queue:out(Effs) of
+ {{value, {settle, MsgIds, CId}}, Q} ->
+ do_apply({Settlement, MsgIds, CId}, T#t{effects = Q});
+ {{value, {enqueue, _, _, _} = Cmd}, Q} ->
+ do_apply(Cmd, T#t{effects = Q});
+ _ ->
+ T
+ end;
+handle_op(purge, T) ->
+ do_apply(purge, T).
+
+do_apply(Cmd, #t{effects = Effs, index = Index, state = S0,
+ log = Log} = T) ->
+ {S, Effects, _} = rabbit_fifo:apply(#{index => Index}, Cmd, [], S0),
+ T#t{state = S,
+ index = Index + 1,
+ effects = enq_effs(Effects, Effs),
+ log = [Cmd | Log]}.
+
+enq_effs([], Q) -> Q;
+enq_effs([{send_msg, P, {delivery, CTag, Msgs}, ra_event} | Rem], Q) ->
+ MsgIds = [I || {I, _} <- Msgs],
+ %% always make settle commands by default
+ %% they can be changed depending on the input event later
+ Cmd = {settle, MsgIds, {CTag, P}},
+ enq_effs(Rem, queue:in(Cmd, Q));
+enq_effs([_ | Rem], Q) ->
+ % ct:pal("enq_effs dropping ~w~n", [E]),
+ enq_effs(Rem, Q).
+
+
+%% Utility
+run_proper(Fun, Args, NumTests) ->
+ ?assertEqual(
+ true,
+ proper:counterexample(
+ erlang:apply(Fun, Args),
+ [{numtests, NumTests},
+ {on_output, fun(".", _) -> ok; % don't print the '.'s on new lines
+ (F, A) -> ct:pal(?LOW_IMPORTANCE, F, A)
+ end}])).
+
+run_snapshot_test(Name, Commands) ->
+ %% create every incremental permuation of the commands lists
+ %% and run the snapshot tests against that
+ [begin
+ % ?debugFmt("~w running command to ~w~n", [?FUNCTION_NAME, lists:last(C)]),
+ run_snapshot_test0(Name, C)
+ end || C <- prefixes(Commands, 1, [])].
+
+run_snapshot_test0(Name, Commands) ->
+ Indexes = lists:seq(1, length(Commands)),
+ Entries = lists:zip(Indexes, Commands),
+ {State, Effects} = run_log(test_init(Name), Entries),
+
+ [begin
+ Filtered = lists:dropwhile(fun({X, _}) when X =< SnapIdx -> true;
+ (_) -> false
+ end, Entries),
+ % L = case Filtered of
+ % [] -> undefined;
+ % _ ->lists:last(Filtered)
+ % end,
+
+ % ct:pal("running from snapshot: ~b to ~w"
+ % "~n~p~n",
+ % [SnapIdx, L, SnapState]),
+ {S, _} = run_log(SnapState, Filtered),
+ % assert log can be restored from any release cursor index
+ % ?debugFmt("Name ~p Idx ~p S~p~nState~p~nSnapState ~p~nFiltered ~p~n",
+ % [Name, SnapIdx, S, State, SnapState, Filtered]),
+ ?assertEqual(State, S)
+ end || {release_cursor, SnapIdx, SnapState} <- Effects],
+ ok.
+
+prefixes(Source, N, Acc) when N > length(Source) ->
+ lists:reverse(Acc);
+prefixes(Source, N, Acc) ->
+ {X, _} = lists:split(N, Source),
+ prefixes(Source, N+1, [X | Acc]).
+
+run_log(InitState, Entries) ->
+ lists:foldl(fun ({Idx, E}, {Acc0, Efx0}) ->
+ case rabbit_fifo:apply(meta(Idx), E, Efx0, Acc0) of
+ {Acc, Efx, _} ->
+ {Acc, Efx}
+ end
+ end, {InitState, []}, Entries).
+
+test_init(Name) ->
+ rabbit_fifo:init(#{name => Name,
+ shadow_copy_interval => 0,
+ metrics_handler => {?MODULE, metrics_handler, []}}).
+meta(Idx) ->
+ #{index => Idx, term => 1}.