summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMichael Klishin <michael@clojurewerkz.org>2018-12-05 00:26:34 +0300
committerMichael Klishin <michael@clojurewerkz.org>2018-12-05 00:26:34 +0300
commitc6a46e36c2634fedb8eed07ed69e942907ac0f28 (patch)
tree0630237b7a951b11758a6c38101d5e8a3e884353 /src
parent36e5a0fa2164b810ee11edd5a2d078083c4d0575 (diff)
parentdb888df2a9156fefda626f57cbb9d9591e27d41f (diff)
downloadrabbitmq-server-git-c6a46e36c2634fedb8eed07ed69e942907ac0f28.tar.gz
Merge branch 'master' into dialyze-qq
Conflicts: test/quorum_queue_SUITE.erl
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_amqqueue.erl2
-rw-r--r--src/rabbit_amqqueue_process.erl63
-rw-r--r--src/rabbit_basic.erl10
-rw-r--r--src/rabbit_channel.erl7
-rw-r--r--src/rabbit_mirror_queue_master.erl6
-rw-r--r--src/rabbit_quorum_queue.erl8
6 files changed, 59 insertions, 37 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index f4876356ee..ee61a50a63 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -272,7 +272,7 @@ filter_resource_per_type(Resources) ->
{ok, #amqqueue{pid = QPid}} = lookup(Resource),
{Resource, QPid}
end || Resource <- Resources],
- lists:partition(fun({Resource, QPid}) -> ?IS_CLASSIC(QPid) end, Queues).
+ lists:partition(fun({_Resource, QPid}) -> ?IS_CLASSIC(QPid) end, Queues).
stop(VHost) ->
%% Classic queues
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index c2dab3da6f..52925ce165 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -658,44 +658,53 @@ attempt_delivery(Delivery = #delivery{sender = SenderPid,
State#q{consumers = Consumers})}
end.
-maybe_deliver_or_enqueue(Delivery, Delivered, State = #q{overflow = Overflow}) ->
+maybe_deliver_or_enqueue(Delivery = #delivery{message = Message},
+ Delivered,
+ State = #q{overflow = Overflow,
+ backing_queue = BQ,
+ backing_queue_state = BQS}) ->
send_mandatory(Delivery), %% must do this before confirms
case {will_overflow(Delivery, State), Overflow} of
{true, 'reject-publish'} ->
%% Drop publish and nack to publisher
send_reject_publish(Delivery, Delivered, State);
_ ->
- %% Enqueue and maybe drop head later
- deliver_or_enqueue(Delivery, Delivered, State)
+ {IsDuplicate, BQS1} = BQ:is_duplicate(Message, BQS),
+ State1 = State#q{backing_queue_state = BQS1},
+ case IsDuplicate of
+ true -> State1;
+ {true, drop} -> State1;
+ %% Drop publish and nack to publisher
+ {true, reject} ->
+ send_reject_publish(Delivery, Delivered, State1);
+ %% Enqueue and maybe drop head later
+ false ->
+ deliver_or_enqueue(Delivery, Delivered, State1)
+ end
end.
deliver_or_enqueue(Delivery = #delivery{message = Message,
sender = SenderPid,
flow = Flow},
- Delivered, State = #q{backing_queue = BQ,
- backing_queue_state = BQS}) ->
+ Delivered,
+ State = #q{backing_queue = BQ}) ->
{Confirm, State1} = send_or_record_confirm(Delivery, State),
Props = message_properties(Message, Confirm, State1),
- {IsDuplicate, BQS1} = BQ:is_duplicate(Message, BQS),
- State2 = State1#q{backing_queue_state = BQS1},
- case IsDuplicate orelse attempt_delivery(Delivery, Props, Delivered,
- State2) of
- true ->
+ case attempt_delivery(Delivery, Props, Delivered, State1) of
+ {delivered, State2} ->
State2;
- {delivered, State3} ->
- State3;
%% The next one is an optimisation
- {undelivered, State3 = #q{ttl = 0, dlx = undefined,
- backing_queue_state = BQS2,
+ {undelivered, State2 = #q{ttl = 0, dlx = undefined,
+ backing_queue_state = BQS,
msg_id_to_channel = MTC}} ->
- {BQS3, MTC1} = discard(Delivery, BQ, BQS2, MTC),
- State3#q{backing_queue_state = BQS3, msg_id_to_channel = MTC1};
- {undelivered, State3 = #q{backing_queue_state = BQS2}} ->
-
- BQS3 = BQ:publish(Message, Props, Delivered, SenderPid, Flow, BQS2),
- {Dropped, State4 = #q{backing_queue_state = BQS4}} =
- maybe_drop_head(State3#q{backing_queue_state = BQS3}),
- QLen = BQ:len(BQS4),
+ {BQS1, MTC1} = discard(Delivery, BQ, BQS, MTC),
+ State2#q{backing_queue_state = BQS1, msg_id_to_channel = MTC1};
+ {undelivered, State2 = #q{backing_queue_state = BQS}} ->
+
+ BQS1 = BQ:publish(Message, Props, Delivered, SenderPid, Flow, BQS),
+ {Dropped, State3 = #q{backing_queue_state = BQS2}} =
+ maybe_drop_head(State2#q{backing_queue_state = BQS1}),
+ QLen = BQ:len(BQS2),
%% optimisation: it would be perfectly safe to always
%% invoke drop_expired_msgs here, but that is expensive so
%% we only do that if a new message that might have an
@@ -704,9 +713,9 @@ deliver_or_enqueue(Delivery = #delivery{message = Message,
%% has no expiry and becomes the head of the queue then
%% the call is unnecessary.
case {Dropped, QLen =:= 1, Props#message_properties.expiry} of
- {false, false, _} -> State4;
- {true, true, undefined} -> State4;
- {_, _, _} -> drop_expired_msgs(State4)
+ {false, false, _} -> State3;
+ {true, true, undefined} -> State3;
+ {_, _, _} -> drop_expired_msgs(State3)
end
end.
@@ -1610,7 +1619,3 @@ update_ha_mode(State) ->
{ok, Q} = rabbit_amqqueue:lookup(qname(State)),
ok = rabbit_mirror_queue_misc:update_mirrors(Q),
State.
-
-
-
-
diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl
index d474e9cad3..22ceefb85f 100644
--- a/src/rabbit_basic.erl
+++ b/src/rabbit_basic.erl
@@ -23,6 +23,7 @@
extract_headers/1, extract_timestamp/1, map_headers/2, delivery/4,
header_routes/1, parse_expiration/1, header/2, header/3]).
-export([build_content/2, from_content/1, msg_size/1, maybe_gc_large_msg/1]).
+-export([add_header/4]).
%%----------------------------------------------------------------------------
@@ -300,3 +301,12 @@ maybe_gc_large_msg(Content) ->
msg_size(Content) ->
rabbit_writer:msg_size(Content).
+
+add_header(Name, Type, Value, #basic_message{content = Content0} = Msg) ->
+ Content = rabbit_basic:map_headers(
+ fun(undefined) ->
+ rabbit_misc:set_table_value([], Name, Type, Value);
+ (Headers) ->
+ rabbit_misc:set_table_value(Headers, Name, Type, Value)
+ end, Content0),
+ Msg#basic_message{content = Content}.
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 1b74b655f5..bc273bf100 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -649,8 +649,9 @@ handle_info({ra_event, {Name, _} = From, _} = Evt,
State = lists:foldl(
fun({MsgId, {MsgHeader, Msg}}, Acc) ->
IsDelivered = maps:is_key(delivery_count, MsgHeader),
+ Msg1 = add_delivery_count_header(MsgHeader, Msg),
handle_deliver(CTag, AckRequired,
- {QName, From, MsgId, IsDelivered, Msg},
+ {QName, From, MsgId, IsDelivered, Msg1},
Acc)
end, State0#ch{queue_states = maps:put(Name, QState2, QueueStates)}, Msgs),
noreply(State);
@@ -2488,3 +2489,7 @@ maybe_monitor(_, QMons) ->
maybe_monitor_all([], S) -> S; %% optimisation
maybe_monitor_all([Item], S) -> maybe_monitor(Item, S); %% optimisation
maybe_monitor_all(Items, S) -> lists:foldl(fun maybe_monitor/2, S, Items).
+
+add_delivery_count_header(MsgHeader, Msg) ->
+ Count = maps:get(delivery_count, MsgHeader, 0),
+ rabbit_basic:add_header(<<"x-delivery-count">>, long, Count, Msg).
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index a3050c570f..04353423cc 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -462,7 +462,7 @@ is_duplicate(Message = #basic_message { id = MsgId },
%% immediately after calling is_duplicate). The msg is
%% invalid. We will not see this again, nor will we be
%% further involved in confirming this message, so erase.
- {true, State #state { seen_status = maps:remove(MsgId, SS) }};
+ {{true, drop}, State #state { seen_status = maps:remove(MsgId, SS) }};
{ok, Disposition}
when Disposition =:= confirmed
%% It got published when we were a slave via gm, and
@@ -477,8 +477,8 @@ is_duplicate(Message = #basic_message { id = MsgId },
%% Message was discarded while we were a slave. Confirm now.
%% As above, amqqueue_process will have the entry for the
%% msg_id_to_channel mapping.
- {true, State #state { seen_status = maps:remove(MsgId, SS),
- confirmed = [MsgId | Confirmed] }}
+ {{true, drop}, State #state { seen_status = maps:remove(MsgId, SS),
+ confirmed = [MsgId | Confirmed] }}
end.
set_queue_mode(Mode, State = #state { gm = GM,
diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl
index 61c4858f40..1d82ab6a9b 100644
--- a/src/rabbit_quorum_queue.erl
+++ b/src/rabbit_quorum_queue.erl
@@ -99,7 +99,7 @@ init_state({Name, _}, QName) ->
{ok, SoftLimit} = application:get_env(rabbit, quorum_commands_soft_limit),
%% This lookup could potentially return an {error, not_found}, but we do not
%% know what to do if the queue has `disappeared`. Let it crash.
- {ok, #amqqueue{pid = Leader, quorum_nodes = Nodes0}} =
+ {ok, #amqqueue{pid = Leader, quorum_nodes = Nodes}} =
rabbit_amqqueue:lookup(QName),
%% Ensure the leader is listed first
Servers0 = [{Name, N} || N <- Nodes],
@@ -336,8 +336,10 @@ basic_get(#amqqueue{name = QName, pid = {Name, _} = Id, type = quorum}, NoAck,
case rabbit_fifo_client:dequeue(CTag, Settlement, QState0) of
{ok, empty, QState} ->
{ok, empty, QState};
- {ok, {MsgId, {MsgHeader, Msg}}, QState} ->
- IsDelivered = maps:is_key(delivery_count, MsgHeader),
+ {ok, {MsgId, {MsgHeader, Msg0}}, QState} ->
+ Count = maps:get(delivery_count, MsgHeader, 0),
+ IsDelivered = Count > 0,
+ Msg = rabbit_basic:add_header(<<"x-delivery-count">>, long, Count, Msg0),
{ok, quorum_messages(Name), {QName, Id, MsgId, IsDelivered, Msg}, QState};
{timeout, _} ->
{error, timeout}