diff options
| author | Michael Klishin <michael@clojurewerkz.org> | 2018-12-05 00:26:34 +0300 |
|---|---|---|
| committer | Michael Klishin <michael@clojurewerkz.org> | 2018-12-05 00:26:34 +0300 |
| commit | c6a46e36c2634fedb8eed07ed69e942907ac0f28 (patch) | |
| tree | 0630237b7a951b11758a6c38101d5e8a3e884353 /src | |
| parent | 36e5a0fa2164b810ee11edd5a2d078083c4d0575 (diff) | |
| parent | db888df2a9156fefda626f57cbb9d9591e27d41f (diff) | |
| download | rabbitmq-server-git-c6a46e36c2634fedb8eed07ed69e942907ac0f28.tar.gz | |
Merge branch 'master' into dialyze-qq
Conflicts:
test/quorum_queue_SUITE.erl
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_amqqueue.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 63 | ||||
| -rw-r--r-- | src/rabbit_basic.erl | 10 | ||||
| -rw-r--r-- | src/rabbit_channel.erl | 7 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_master.erl | 6 | ||||
| -rw-r--r-- | src/rabbit_quorum_queue.erl | 8 |
6 files changed, 59 insertions, 37 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index f4876356ee..ee61a50a63 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -272,7 +272,7 @@ filter_resource_per_type(Resources) -> {ok, #amqqueue{pid = QPid}} = lookup(Resource), {Resource, QPid} end || Resource <- Resources], - lists:partition(fun({Resource, QPid}) -> ?IS_CLASSIC(QPid) end, Queues). + lists:partition(fun({_Resource, QPid}) -> ?IS_CLASSIC(QPid) end, Queues). stop(VHost) -> %% Classic queues diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index c2dab3da6f..52925ce165 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -658,44 +658,53 @@ attempt_delivery(Delivery = #delivery{sender = SenderPid, State#q{consumers = Consumers})} end. -maybe_deliver_or_enqueue(Delivery, Delivered, State = #q{overflow = Overflow}) -> +maybe_deliver_or_enqueue(Delivery = #delivery{message = Message}, + Delivered, + State = #q{overflow = Overflow, + backing_queue = BQ, + backing_queue_state = BQS}) -> send_mandatory(Delivery), %% must do this before confirms case {will_overflow(Delivery, State), Overflow} of {true, 'reject-publish'} -> %% Drop publish and nack to publisher send_reject_publish(Delivery, Delivered, State); _ -> - %% Enqueue and maybe drop head later - deliver_or_enqueue(Delivery, Delivered, State) + {IsDuplicate, BQS1} = BQ:is_duplicate(Message, BQS), + State1 = State#q{backing_queue_state = BQS1}, + case IsDuplicate of + true -> State1; + {true, drop} -> State1; + %% Drop publish and nack to publisher + {true, reject} -> + send_reject_publish(Delivery, Delivered, State1); + %% Enqueue and maybe drop head later + false -> + deliver_or_enqueue(Delivery, Delivered, State1) + end end. deliver_or_enqueue(Delivery = #delivery{message = Message, sender = SenderPid, flow = Flow}, - Delivered, State = #q{backing_queue = BQ, - backing_queue_state = BQS}) -> + Delivered, + State = #q{backing_queue = BQ}) -> {Confirm, State1} = send_or_record_confirm(Delivery, State), Props = message_properties(Message, Confirm, State1), - {IsDuplicate, BQS1} = BQ:is_duplicate(Message, BQS), - State2 = State1#q{backing_queue_state = BQS1}, - case IsDuplicate orelse attempt_delivery(Delivery, Props, Delivered, - State2) of - true -> + case attempt_delivery(Delivery, Props, Delivered, State1) of + {delivered, State2} -> State2; - {delivered, State3} -> - State3; %% The next one is an optimisation - {undelivered, State3 = #q{ttl = 0, dlx = undefined, - backing_queue_state = BQS2, + {undelivered, State2 = #q{ttl = 0, dlx = undefined, + backing_queue_state = BQS, msg_id_to_channel = MTC}} -> - {BQS3, MTC1} = discard(Delivery, BQ, BQS2, MTC), - State3#q{backing_queue_state = BQS3, msg_id_to_channel = MTC1}; - {undelivered, State3 = #q{backing_queue_state = BQS2}} -> - - BQS3 = BQ:publish(Message, Props, Delivered, SenderPid, Flow, BQS2), - {Dropped, State4 = #q{backing_queue_state = BQS4}} = - maybe_drop_head(State3#q{backing_queue_state = BQS3}), - QLen = BQ:len(BQS4), + {BQS1, MTC1} = discard(Delivery, BQ, BQS, MTC), + State2#q{backing_queue_state = BQS1, msg_id_to_channel = MTC1}; + {undelivered, State2 = #q{backing_queue_state = BQS}} -> + + BQS1 = BQ:publish(Message, Props, Delivered, SenderPid, Flow, BQS), + {Dropped, State3 = #q{backing_queue_state = BQS2}} = + maybe_drop_head(State2#q{backing_queue_state = BQS1}), + QLen = BQ:len(BQS2), %% optimisation: it would be perfectly safe to always %% invoke drop_expired_msgs here, but that is expensive so %% we only do that if a new message that might have an @@ -704,9 +713,9 @@ deliver_or_enqueue(Delivery = #delivery{message = Message, %% has no expiry and becomes the head of the queue then %% the call is unnecessary. case {Dropped, QLen =:= 1, Props#message_properties.expiry} of - {false, false, _} -> State4; - {true, true, undefined} -> State4; - {_, _, _} -> drop_expired_msgs(State4) + {false, false, _} -> State3; + {true, true, undefined} -> State3; + {_, _, _} -> drop_expired_msgs(State3) end end. @@ -1610,7 +1619,3 @@ update_ha_mode(State) -> {ok, Q} = rabbit_amqqueue:lookup(qname(State)), ok = rabbit_mirror_queue_misc:update_mirrors(Q), State. - - - - diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl index d474e9cad3..22ceefb85f 100644 --- a/src/rabbit_basic.erl +++ b/src/rabbit_basic.erl @@ -23,6 +23,7 @@ extract_headers/1, extract_timestamp/1, map_headers/2, delivery/4, header_routes/1, parse_expiration/1, header/2, header/3]). -export([build_content/2, from_content/1, msg_size/1, maybe_gc_large_msg/1]). +-export([add_header/4]). %%---------------------------------------------------------------------------- @@ -300,3 +301,12 @@ maybe_gc_large_msg(Content) -> msg_size(Content) -> rabbit_writer:msg_size(Content). + +add_header(Name, Type, Value, #basic_message{content = Content0} = Msg) -> + Content = rabbit_basic:map_headers( + fun(undefined) -> + rabbit_misc:set_table_value([], Name, Type, Value); + (Headers) -> + rabbit_misc:set_table_value(Headers, Name, Type, Value) + end, Content0), + Msg#basic_message{content = Content}. diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 1b74b655f5..bc273bf100 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -649,8 +649,9 @@ handle_info({ra_event, {Name, _} = From, _} = Evt, State = lists:foldl( fun({MsgId, {MsgHeader, Msg}}, Acc) -> IsDelivered = maps:is_key(delivery_count, MsgHeader), + Msg1 = add_delivery_count_header(MsgHeader, Msg), handle_deliver(CTag, AckRequired, - {QName, From, MsgId, IsDelivered, Msg}, + {QName, From, MsgId, IsDelivered, Msg1}, Acc) end, State0#ch{queue_states = maps:put(Name, QState2, QueueStates)}, Msgs), noreply(State); @@ -2488,3 +2489,7 @@ maybe_monitor(_, QMons) -> maybe_monitor_all([], S) -> S; %% optimisation maybe_monitor_all([Item], S) -> maybe_monitor(Item, S); %% optimisation maybe_monitor_all(Items, S) -> lists:foldl(fun maybe_monitor/2, S, Items). + +add_delivery_count_header(MsgHeader, Msg) -> + Count = maps:get(delivery_count, MsgHeader, 0), + rabbit_basic:add_header(<<"x-delivery-count">>, long, Count, Msg). diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index a3050c570f..04353423cc 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -462,7 +462,7 @@ is_duplicate(Message = #basic_message { id = MsgId }, %% immediately after calling is_duplicate). The msg is %% invalid. We will not see this again, nor will we be %% further involved in confirming this message, so erase. - {true, State #state { seen_status = maps:remove(MsgId, SS) }}; + {{true, drop}, State #state { seen_status = maps:remove(MsgId, SS) }}; {ok, Disposition} when Disposition =:= confirmed %% It got published when we were a slave via gm, and @@ -477,8 +477,8 @@ is_duplicate(Message = #basic_message { id = MsgId }, %% Message was discarded while we were a slave. Confirm now. %% As above, amqqueue_process will have the entry for the %% msg_id_to_channel mapping. - {true, State #state { seen_status = maps:remove(MsgId, SS), - confirmed = [MsgId | Confirmed] }} + {{true, drop}, State #state { seen_status = maps:remove(MsgId, SS), + confirmed = [MsgId | Confirmed] }} end. set_queue_mode(Mode, State = #state { gm = GM, diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl index 61c4858f40..1d82ab6a9b 100644 --- a/src/rabbit_quorum_queue.erl +++ b/src/rabbit_quorum_queue.erl @@ -99,7 +99,7 @@ init_state({Name, _}, QName) -> {ok, SoftLimit} = application:get_env(rabbit, quorum_commands_soft_limit), %% This lookup could potentially return an {error, not_found}, but we do not %% know what to do if the queue has `disappeared`. Let it crash. - {ok, #amqqueue{pid = Leader, quorum_nodes = Nodes0}} = + {ok, #amqqueue{pid = Leader, quorum_nodes = Nodes}} = rabbit_amqqueue:lookup(QName), %% Ensure the leader is listed first Servers0 = [{Name, N} || N <- Nodes], @@ -336,8 +336,10 @@ basic_get(#amqqueue{name = QName, pid = {Name, _} = Id, type = quorum}, NoAck, case rabbit_fifo_client:dequeue(CTag, Settlement, QState0) of {ok, empty, QState} -> {ok, empty, QState}; - {ok, {MsgId, {MsgHeader, Msg}}, QState} -> - IsDelivered = maps:is_key(delivery_count, MsgHeader), + {ok, {MsgId, {MsgHeader, Msg0}}, QState} -> + Count = maps:get(delivery_count, MsgHeader, 0), + IsDelivered = Count > 0, + Msg = rabbit_basic:add_header(<<"x-delivery-count">>, long, Count, Msg0), {ok, quorum_messages(Name), {QName, Id, MsgId, IsDelivered, Msg}, QState}; {timeout, _} -> {error, timeout} |
