summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMichael Klishin <michael@clojurewerkz.org>2018-12-05 00:26:34 +0300
committerMichael Klishin <michael@clojurewerkz.org>2018-12-05 00:26:34 +0300
commitc6a46e36c2634fedb8eed07ed69e942907ac0f28 (patch)
tree0630237b7a951b11758a6c38101d5e8a3e884353
parent36e5a0fa2164b810ee11edd5a2d078083c4d0575 (diff)
parentdb888df2a9156fefda626f57cbb9d9591e27d41f (diff)
downloadrabbitmq-server-git-c6a46e36c2634fedb8eed07ed69e942907ac0f28.tar.gz
Merge branch 'master' into dialyze-qq
Conflicts: test/quorum_queue_SUITE.erl
-rw-r--r--README.md10
-rw-r--r--docs/rabbitmq.conf.example9
-rw-r--r--priv/schema/rabbit.schema10
-rw-r--r--src/rabbit_amqqueue.erl2
-rw-r--r--src/rabbit_amqqueue_process.erl63
-rw-r--r--src/rabbit_basic.erl10
-rw-r--r--src/rabbit_channel.erl7
-rw-r--r--src/rabbit_mirror_queue_master.erl6
-rw-r--r--src/rabbit_quorum_queue.erl8
-rw-r--r--test/config_schema_SUITE_data/rabbit.snippets15
-rw-r--r--test/quorum_queue_SUITE.erl96
11 files changed, 193 insertions, 43 deletions
diff --git a/README.md b/README.md
index 3f07284d60..2267a853c9 100644
--- a/README.md
+++ b/README.md
@@ -16,10 +16,12 @@
## Tutorials & Documentation
* [RabbitMQ tutorials](https://rabbitmq.com/getstarted.html)
- * [Documentation guides](https://rabbitmq.com/documentation.html)
- * [Documentation Source Code](https://github.com/rabbitmq/rabbitmq-website/)
+ * [All documentation guides](https://rabbitmq.com/documentation.html)
+ * [Documentation source](https://github.com/rabbitmq/rabbitmq-website/)
* [Client libraries and tools](https://rabbitmq.com/devtools.html)
- * [Tutorials Source Code](https://github.com/rabbitmq/rabbitmq-tutorials/)
+ * [Monitoring guide](https://rabbitmq.com/monitoring.html)
+ * [Production checklist](https://rabbitmq.com/production-checklist.html)
+ * [Runnable tutorials](https://github.com/rabbitmq/rabbitmq-tutorials/)
## Getting Help
@@ -33,6 +35,8 @@
See [CONTRIBUTING.md](./CONTRIBUTING.md) and our [development process overview](https://rabbitmq.com/github.html).
+Questions about contributing, internals and so on are very welcome on the [mailing list](https://groups.google.com/forum/#!forum/rabbitmq-users).
+
## Licensing
diff --git a/docs/rabbitmq.conf.example b/docs/rabbitmq.conf.example
index 2a373b2eb3..a62ed38291 100644
--- a/docs/rabbitmq.conf.example
+++ b/docs/rabbitmq.conf.example
@@ -503,8 +503,17 @@
# Kernel section
# ======================================
+## Timeout used to detect peer unavailability, including CLI tools.
+## Related doc guide: https://www.rabbitmq.com/nettick.html.
+##
# net_ticktime = 60
+## Inter-node communication port range.
+## Related doc guide: https://www.rabbitmq.com/networking.html#epmd-inet-dist-port-range.
+##
+# inet_dist_listen_min = 25672
+# inet_dist_listen_max = 25692
+
## ----------------------------------------------------------------------------
## RabbitMQ Management Plugin
##
diff --git a/priv/schema/rabbit.schema b/priv/schema/rabbit.schema
index 9cee795296..5c6078a413 100644
--- a/priv/schema/rabbit.schema
+++ b/priv/schema/rabbit.schema
@@ -1342,6 +1342,16 @@ end}.
{validators, ["non_zero_positive_integer"]}
]}.
+{mapping, "inet_dist_listen_min", "kernel.inet_dist_listen_min",[
+ {datatype, [integer]},
+ {validators, ["non_zero_positive_integer"]}
+]}.
+
+{mapping, "inet_dist_listen_max", "kernel.inet_dist_listen_max",[
+ {datatype, [integer]},
+ {validators, ["non_zero_positive_integer"]}
+]}.
+
% ===============================
% Validators
% ===============================
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index f4876356ee..ee61a50a63 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -272,7 +272,7 @@ filter_resource_per_type(Resources) ->
{ok, #amqqueue{pid = QPid}} = lookup(Resource),
{Resource, QPid}
end || Resource <- Resources],
- lists:partition(fun({Resource, QPid}) -> ?IS_CLASSIC(QPid) end, Queues).
+ lists:partition(fun({_Resource, QPid}) -> ?IS_CLASSIC(QPid) end, Queues).
stop(VHost) ->
%% Classic queues
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index c2dab3da6f..52925ce165 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -658,44 +658,53 @@ attempt_delivery(Delivery = #delivery{sender = SenderPid,
State#q{consumers = Consumers})}
end.
-maybe_deliver_or_enqueue(Delivery, Delivered, State = #q{overflow = Overflow}) ->
+maybe_deliver_or_enqueue(Delivery = #delivery{message = Message},
+ Delivered,
+ State = #q{overflow = Overflow,
+ backing_queue = BQ,
+ backing_queue_state = BQS}) ->
send_mandatory(Delivery), %% must do this before confirms
case {will_overflow(Delivery, State), Overflow} of
{true, 'reject-publish'} ->
%% Drop publish and nack to publisher
send_reject_publish(Delivery, Delivered, State);
_ ->
- %% Enqueue and maybe drop head later
- deliver_or_enqueue(Delivery, Delivered, State)
+ {IsDuplicate, BQS1} = BQ:is_duplicate(Message, BQS),
+ State1 = State#q{backing_queue_state = BQS1},
+ case IsDuplicate of
+ true -> State1;
+ {true, drop} -> State1;
+ %% Drop publish and nack to publisher
+ {true, reject} ->
+ send_reject_publish(Delivery, Delivered, State1);
+ %% Enqueue and maybe drop head later
+ false ->
+ deliver_or_enqueue(Delivery, Delivered, State1)
+ end
end.
deliver_or_enqueue(Delivery = #delivery{message = Message,
sender = SenderPid,
flow = Flow},
- Delivered, State = #q{backing_queue = BQ,
- backing_queue_state = BQS}) ->
+ Delivered,
+ State = #q{backing_queue = BQ}) ->
{Confirm, State1} = send_or_record_confirm(Delivery, State),
Props = message_properties(Message, Confirm, State1),
- {IsDuplicate, BQS1} = BQ:is_duplicate(Message, BQS),
- State2 = State1#q{backing_queue_state = BQS1},
- case IsDuplicate orelse attempt_delivery(Delivery, Props, Delivered,
- State2) of
- true ->
+ case attempt_delivery(Delivery, Props, Delivered, State1) of
+ {delivered, State2} ->
State2;
- {delivered, State3} ->
- State3;
%% The next one is an optimisation
- {undelivered, State3 = #q{ttl = 0, dlx = undefined,
- backing_queue_state = BQS2,
+ {undelivered, State2 = #q{ttl = 0, dlx = undefined,
+ backing_queue_state = BQS,
msg_id_to_channel = MTC}} ->
- {BQS3, MTC1} = discard(Delivery, BQ, BQS2, MTC),
- State3#q{backing_queue_state = BQS3, msg_id_to_channel = MTC1};
- {undelivered, State3 = #q{backing_queue_state = BQS2}} ->
-
- BQS3 = BQ:publish(Message, Props, Delivered, SenderPid, Flow, BQS2),
- {Dropped, State4 = #q{backing_queue_state = BQS4}} =
- maybe_drop_head(State3#q{backing_queue_state = BQS3}),
- QLen = BQ:len(BQS4),
+ {BQS1, MTC1} = discard(Delivery, BQ, BQS, MTC),
+ State2#q{backing_queue_state = BQS1, msg_id_to_channel = MTC1};
+ {undelivered, State2 = #q{backing_queue_state = BQS}} ->
+
+ BQS1 = BQ:publish(Message, Props, Delivered, SenderPid, Flow, BQS),
+ {Dropped, State3 = #q{backing_queue_state = BQS2}} =
+ maybe_drop_head(State2#q{backing_queue_state = BQS1}),
+ QLen = BQ:len(BQS2),
%% optimisation: it would be perfectly safe to always
%% invoke drop_expired_msgs here, but that is expensive so
%% we only do that if a new message that might have an
@@ -704,9 +713,9 @@ deliver_or_enqueue(Delivery = #delivery{message = Message,
%% has no expiry and becomes the head of the queue then
%% the call is unnecessary.
case {Dropped, QLen =:= 1, Props#message_properties.expiry} of
- {false, false, _} -> State4;
- {true, true, undefined} -> State4;
- {_, _, _} -> drop_expired_msgs(State4)
+ {false, false, _} -> State3;
+ {true, true, undefined} -> State3;
+ {_, _, _} -> drop_expired_msgs(State3)
end
end.
@@ -1610,7 +1619,3 @@ update_ha_mode(State) ->
{ok, Q} = rabbit_amqqueue:lookup(qname(State)),
ok = rabbit_mirror_queue_misc:update_mirrors(Q),
State.
-
-
-
-
diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl
index d474e9cad3..22ceefb85f 100644
--- a/src/rabbit_basic.erl
+++ b/src/rabbit_basic.erl
@@ -23,6 +23,7 @@
extract_headers/1, extract_timestamp/1, map_headers/2, delivery/4,
header_routes/1, parse_expiration/1, header/2, header/3]).
-export([build_content/2, from_content/1, msg_size/1, maybe_gc_large_msg/1]).
+-export([add_header/4]).
%%----------------------------------------------------------------------------
@@ -300,3 +301,12 @@ maybe_gc_large_msg(Content) ->
msg_size(Content) ->
rabbit_writer:msg_size(Content).
+
+add_header(Name, Type, Value, #basic_message{content = Content0} = Msg) ->
+ Content = rabbit_basic:map_headers(
+ fun(undefined) ->
+ rabbit_misc:set_table_value([], Name, Type, Value);
+ (Headers) ->
+ rabbit_misc:set_table_value(Headers, Name, Type, Value)
+ end, Content0),
+ Msg#basic_message{content = Content}.
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 1b74b655f5..bc273bf100 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -649,8 +649,9 @@ handle_info({ra_event, {Name, _} = From, _} = Evt,
State = lists:foldl(
fun({MsgId, {MsgHeader, Msg}}, Acc) ->
IsDelivered = maps:is_key(delivery_count, MsgHeader),
+ Msg1 = add_delivery_count_header(MsgHeader, Msg),
handle_deliver(CTag, AckRequired,
- {QName, From, MsgId, IsDelivered, Msg},
+ {QName, From, MsgId, IsDelivered, Msg1},
Acc)
end, State0#ch{queue_states = maps:put(Name, QState2, QueueStates)}, Msgs),
noreply(State);
@@ -2488,3 +2489,7 @@ maybe_monitor(_, QMons) ->
maybe_monitor_all([], S) -> S; %% optimisation
maybe_monitor_all([Item], S) -> maybe_monitor(Item, S); %% optimisation
maybe_monitor_all(Items, S) -> lists:foldl(fun maybe_monitor/2, S, Items).
+
+add_delivery_count_header(MsgHeader, Msg) ->
+ Count = maps:get(delivery_count, MsgHeader, 0),
+ rabbit_basic:add_header(<<"x-delivery-count">>, long, Count, Msg).
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index a3050c570f..04353423cc 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -462,7 +462,7 @@ is_duplicate(Message = #basic_message { id = MsgId },
%% immediately after calling is_duplicate). The msg is
%% invalid. We will not see this again, nor will we be
%% further involved in confirming this message, so erase.
- {true, State #state { seen_status = maps:remove(MsgId, SS) }};
+ {{true, drop}, State #state { seen_status = maps:remove(MsgId, SS) }};
{ok, Disposition}
when Disposition =:= confirmed
%% It got published when we were a slave via gm, and
@@ -477,8 +477,8 @@ is_duplicate(Message = #basic_message { id = MsgId },
%% Message was discarded while we were a slave. Confirm now.
%% As above, amqqueue_process will have the entry for the
%% msg_id_to_channel mapping.
- {true, State #state { seen_status = maps:remove(MsgId, SS),
- confirmed = [MsgId | Confirmed] }}
+ {{true, drop}, State #state { seen_status = maps:remove(MsgId, SS),
+ confirmed = [MsgId | Confirmed] }}
end.
set_queue_mode(Mode, State = #state { gm = GM,
diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl
index 61c4858f40..1d82ab6a9b 100644
--- a/src/rabbit_quorum_queue.erl
+++ b/src/rabbit_quorum_queue.erl
@@ -99,7 +99,7 @@ init_state({Name, _}, QName) ->
{ok, SoftLimit} = application:get_env(rabbit, quorum_commands_soft_limit),
%% This lookup could potentially return an {error, not_found}, but we do not
%% know what to do if the queue has `disappeared`. Let it crash.
- {ok, #amqqueue{pid = Leader, quorum_nodes = Nodes0}} =
+ {ok, #amqqueue{pid = Leader, quorum_nodes = Nodes}} =
rabbit_amqqueue:lookup(QName),
%% Ensure the leader is listed first
Servers0 = [{Name, N} || N <- Nodes],
@@ -336,8 +336,10 @@ basic_get(#amqqueue{name = QName, pid = {Name, _} = Id, type = quorum}, NoAck,
case rabbit_fifo_client:dequeue(CTag, Settlement, QState0) of
{ok, empty, QState} ->
{ok, empty, QState};
- {ok, {MsgId, {MsgHeader, Msg}}, QState} ->
- IsDelivered = maps:is_key(delivery_count, MsgHeader),
+ {ok, {MsgId, {MsgHeader, Msg0}}, QState} ->
+ Count = maps:get(delivery_count, MsgHeader, 0),
+ IsDelivered = Count > 0,
+ Msg = rabbit_basic:add_header(<<"x-delivery-count">>, long, Count, Msg0),
{ok, quorum_messages(Name), {QName, Id, MsgId, IsDelivered, Msg}, QState};
{timeout, _} ->
{error, timeout}
diff --git a/test/config_schema_SUITE_data/rabbit.snippets b/test/config_schema_SUITE_data/rabbit.snippets
index 625fcd93a9..b318adaa12 100644
--- a/test/config_schema_SUITE_data/rabbit.snippets
+++ b/test/config_schema_SUITE_data/rabbit.snippets
@@ -568,12 +568,27 @@ credential_validator.regexp = ^abc\\d+",
{delegate_count, 64}
]}],
[]},
+
{kernel_net_ticktime,
"net_ticktime = 20",
[{kernel, [
{net_ticktime, 20}
]}],
[]},
+
+ {kernel_inet_dist_listen_min,
+ "inet_dist_listen_min = 16000",
+ [{kernel, [
+ {inet_dist_listen_min, 16000}
+ ]}],
+ []},
+ {kernel_inet_dist_listen_max,
+ "inet_dist_listen_max = 16100",
+ [{kernel, [
+ {inet_dist_listen_max, 16100}
+ ]}],
+ []},
+
{log_syslog_settings,
"log.syslog = true
log.syslog.identity = rabbitmq
diff --git a/test/quorum_queue_SUITE.erl b/test/quorum_queue_SUITE.erl
index 14a3650a87..e949f7cc1a 100644
--- a/test/quorum_queue_SUITE.erl
+++ b/test/quorum_queue_SUITE.erl
@@ -115,7 +115,9 @@ all_tests() ->
idempotent_recover,
vhost_with_quorum_queue_is_deleted,
delete_immediately,
- delete_immediately_by_resource
+ delete_immediately_by_resource,
+ consume_redelivery_count,
+ subscribe_redelivery_count
].
%% -------------------------------------------------------------------
@@ -1845,12 +1847,10 @@ delete_immediately(Config) ->
delete_immediately_by_resource(Config) ->
Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
-
Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
QQ = ?config(queue_name, Config),
?assertEqual({'queue.declare_ok', QQ, 0, 0},
declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
-
Cmd2 = ["eval", "rabbit_amqqueue:delete_immediately_by_resource([rabbit_misc:r(<<\"/\">>, queue, <<\"" ++ binary_to_list(QQ) ++ "\">>)])."],
?assertEqual({ok, "ok\n"}, rabbit_ct_broker_helpers:rabbitmqctl(Config, 0, Cmd2)),
@@ -1861,6 +1861,96 @@ delete_immediately_by_resource(Config) ->
?assertMatch({ra, _, _}, lists:keyfind(ra, 1,
rpc:call(Server, application, which_applications, []))).
+subscribe_redelivery_count(Config) ->
+ [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ QQ = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', QQ, 0, 0},
+ declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
+
+ RaName = ra_name(QQ),
+ publish(Ch, QQ),
+ wait_for_messages_ready(Servers, RaName, 1),
+ wait_for_messages_pending_ack(Servers, RaName, 0),
+ subscribe(Ch, QQ, false),
+
+ DTag = <<"x-delivery-count">>,
+ receive
+ {#'basic.deliver'{delivery_tag = DeliveryTag,
+ redelivered = false},
+ #amqp_msg{props = #'P_basic'{headers = H0}}} ->
+ ?assertMatch({DTag, _, 0}, rabbit_basic:header(DTag, H0)),
+ amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag,
+ multiple = false,
+ requeue = true})
+ end,
+
+ receive
+ {#'basic.deliver'{delivery_tag = DeliveryTag1,
+ redelivered = true},
+ #amqp_msg{props = #'P_basic'{headers = H1}}} ->
+ ?assertMatch({DTag, _, 1}, rabbit_basic:header(DTag, H1)),
+ amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag1,
+ multiple = false,
+ requeue = true})
+ end,
+
+ receive
+ {#'basic.deliver'{delivery_tag = DeliveryTag2,
+ redelivered = true},
+ #amqp_msg{props = #'P_basic'{headers = H2}}} ->
+ ?assertMatch({DTag, _, 2}, rabbit_basic:header(DTag, H2)),
+ amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag2,
+ multiple = false}),
+ wait_for_messages_ready(Servers, RaName, 0),
+ wait_for_messages_pending_ack(Servers, RaName, 0)
+ end.
+
+consume_redelivery_count(Config) ->
+ [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ QQ = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', QQ, 0, 0},
+ declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
+ RaName = ra_name(QQ),
+ publish(Ch, QQ),
+ wait_for_messages_ready(Servers, RaName, 1),
+ wait_for_messages_pending_ack(Servers, RaName, 0),
+
+ DTag = <<"x-delivery-count">>,
+
+ {#'basic.get_ok'{delivery_tag = DeliveryTag,
+ redelivered = false},
+ #amqp_msg{props = #'P_basic'{headers = H0}}} =
+ amqp_channel:call(Ch, #'basic.get'{queue = QQ,
+ no_ack = false}),
+ ?assertMatch({DTag, _, 0}, rabbit_basic:header(DTag, H0)),
+ amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag,
+ multiple = false,
+ requeue = true}),
+
+ {#'basic.get_ok'{delivery_tag = DeliveryTag1,
+ redelivered = true},
+ #amqp_msg{props = #'P_basic'{headers = H1}}} =
+ amqp_channel:call(Ch, #'basic.get'{queue = QQ,
+ no_ack = false}),
+ ?assertMatch({DTag, _, 1}, rabbit_basic:header(DTag, H1)),
+ amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag1,
+ multiple = false,
+ requeue = true}),
+
+ {#'basic.get_ok'{delivery_tag = DeliveryTag2,
+ redelivered = true},
+ #amqp_msg{props = #'P_basic'{headers = H2}}} =
+ amqp_channel:call(Ch, #'basic.get'{queue = QQ,
+ no_ack = false}),
+ ?assertMatch({DTag, _, 2}, rabbit_basic:header(DTag, H2)),
+ amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag2,
+ multiple = false,
+ requeue = true}).
+
%%----------------------------------------------------------------------------
declare(Ch, Q) ->