diff options
| author | Simon MacMullen <simon@rabbitmq.com> | 2012-09-26 16:33:42 +0100 |
|---|---|---|
| committer | Simon MacMullen <simon@rabbitmq.com> | 2012-09-26 16:33:42 +0100 |
| commit | 54fd0701e1e2fcba4c2aa7def75622f641f2e23c (patch) | |
| tree | 7646262122cc0ade44ebef8d6b727f45134cabc2 | |
| parent | b8bd0e50ccd97d1e115ada92f2ca50a8e4432c9f (diff) | |
| download | rabbitmq-server-git-54fd0701e1e2fcba4c2aa7def75622f641f2e23c.tar.gz | |
Mark enqueued-on-promotion messages as redelivered
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 20 | ||||
| -rw-r--r-- | src/rabbit_backing_queue.erl | 5 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_master.erl | 10 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 10 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 26 |
5 files changed, 40 insertions, 31 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 0e3f0bac1b..d081056406 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -78,7 +78,7 @@ -spec(info_keys/0 :: () -> rabbit_types:info_keys()). -spec(init_with_backing_queue_state/8 :: (rabbit_types:amqqueue(), atom(), tuple(), any(), [any()], - [rabbit_types:delivery()], pmon:pmon(), dict()) -> #q{}). + [{rabbit_types:delivery(), boolean()}], pmon:pmon(), dict()) -> #q{}). -endif. @@ -170,7 +170,9 @@ init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS, rabbit_event:init_stats_timer( State, #q.stats_timer))), lists:foldl( - fun (Delivery, StateN) -> deliver_or_enqueue(Delivery, StateN) end, + fun ({Delivery, Redelivered}, StateN) -> + deliver_or_enqueue(Delivery, Redelivered, StateN) + end, State1, Deliveries). terminate(shutdown = R, State = #q{backing_queue = BQ}) -> @@ -535,6 +537,7 @@ run_message_queue(State) -> State2. attempt_delivery(#delivery{sender = SenderPid, message = Message}, Confirm, + Redelivered, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> case BQ:is_duplicate(Message, BQS) of {false, BQS1} -> @@ -544,7 +547,7 @@ attempt_delivery(#delivery{sender = SenderPid, message = Message}, Confirm, {AckTag, BQS3} = BQ:publish_delivered( AckRequired, Message, Props, SenderPid, BQS2), - {{Message, false, AckTag}, true, + {{Message, Redelivered, AckTag}, true, State1#q{backing_queue_state = BQS3}} end, false, State#q{backing_queue_state = BQS1}); {Duplicate, BQS1} -> @@ -560,9 +563,10 @@ attempt_delivery(#delivery{sender = SenderPid, message = Message}, Confirm, end. deliver_or_enqueue(Delivery = #delivery{message = Message, - sender = SenderPid}, State) -> + sender = SenderPid}, Redelivered, + State) -> Confirm = should_confirm_message(Delivery, State), - case attempt_delivery(Delivery, Confirm, State) of + case attempt_delivery(Delivery, Confirm, Redelivered, State) of {true, State1} -> maybe_record_confirm_message(Confirm, State1); %% the next one is an optimisations @@ -573,7 +577,7 @@ deliver_or_enqueue(Delivery = #delivery{message = Message, State2 = #q{backing_queue = BQ, backing_queue_state = BQS} = maybe_record_confirm_message(Confirm, State1), Props = message_properties(Confirm, State2), - BQS1 = BQ:publish(Message, Props, SenderPid, BQS), + BQS1 = BQ:publish(Message, Props, SenderPid, Redelivered, BQS), ensure_ttl_timer(Props#message_properties.expiry, State2#q{backing_queue_state = BQS1}) end. @@ -1032,7 +1036,7 @@ handle_call(consumers, _From, State) -> handle_call({deliver, Delivery}, From, State) -> %% Synchronous, "mandatory" deliver mode. gen_server2:reply(From, ok), - noreply(deliver_or_enqueue(Delivery, State)); + noreply(deliver_or_enqueue(Delivery, false, State)); handle_call({notify_down, ChPid}, From, State) -> %% we want to do this synchronously, so that auto_deleted queues @@ -1184,7 +1188,7 @@ handle_cast({deliver, Delivery = #delivery{sender = Sender}, Flow}, noflow -> Senders end, State1 = State#q{senders = Senders1}, - noreply(deliver_or_enqueue(Delivery, State1)); + noreply(deliver_or_enqueue(Delivery, false, State1)); handle_cast({ack, AckTags, ChPid}, State) -> noreply(subtract_acks( diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl index d69a6c3b98..9510ae2382 100644 --- a/src/rabbit_backing_queue.erl +++ b/src/rabbit_backing_queue.erl @@ -77,7 +77,8 @@ %% Publish a message. -callback publish(rabbit_types:basic_message(), - rabbit_types:message_properties(), pid(), state()) -> + rabbit_types:message_properties(), pid(), boolean(), + state()) -> state(). %% Called for messages which have already been passed straight @@ -212,7 +213,7 @@ behaviour_info(callbacks) -> [{start, 1}, {stop, 0}, {init, 3}, {terminate, 2}, - {delete_and_terminate, 2}, {purge, 1}, {publish, 4}, + {delete_and_terminate, 2}, {purge, 1}, {publish, 5}, {publish_delivered, 5}, {drain_confirmed, 1}, {dropwhile, 3}, {fetch, 2}, {ack, 2}, {fold, 3}, {requeue, 2}, {len, 1}, {is_empty, 1}, {depth, 1}, {set_ram_duration_target, 2}, diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index c11a8ff7c0..9e98c72666 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -17,7 +17,7 @@ -module(rabbit_mirror_queue_master). -export([init/3, terminate/2, delete_and_terminate/2, - purge/1, publish/4, publish_delivered/5, fetch/2, ack/2, + purge/1, publish/5, publish_delivered/5, fetch/2, ack/2, requeue/2, len/1, is_empty/1, depth/1, drain_confirmed/1, dropwhile/3, set_ram_duration_target/2, ram_duration/1, needs_timeout/1, timeout/1, handle_pre_hibernate/1, @@ -153,14 +153,14 @@ purge(State = #state { gm = GM, {Count, State #state { backing_queue_state = BQS1, set_delivered = 0 }}. -publish(Msg = #basic_message { id = MsgId }, MsgProps, ChPid, +publish(Msg = #basic_message { id = MsgId }, MsgProps, ChPid, Redelivered, State = #state { gm = GM, seen_status = SS, backing_queue = BQ, backing_queue_state = BQS }) -> false = dict:is_key(MsgId, SS), %% ASSERTION - ok = gm:broadcast(GM, {publish, false, ChPid, MsgProps, Msg}), - BQS1 = BQ:publish(Msg, MsgProps, ChPid, BQS), + ok = gm:broadcast(GM, {publish, false, ChPid, MsgProps, Msg, Redelivered}), + BQS1 = BQ:publish(Msg, MsgProps, ChPid, Redelivered, BQS), ensure_monitoring(ChPid, State #state { backing_queue_state = BQS1 }). publish_delivered(AckRequired, Msg = #basic_message { id = MsgId }, MsgProps, @@ -174,7 +174,7 @@ publish_delivered(AckRequired, Msg = #basic_message { id = MsgId }, MsgProps, %% all slaves are forced to interpret this publish_delivered at %% the same point, especially if we die and a slave is promoted. ok = gm:confirmed_broadcast( - GM, {publish, {true, AckRequired}, ChPid, MsgProps, Msg}), + GM, {publish, {true, AckRequired}, ChPid, MsgProps, Msg, false}), {AckTag, BQS1} = BQ:publish_delivered(AckRequired, Msg, MsgProps, ChPid, BQS), AM1 = maybe_store_acktag(AckTag, MsgId, AM), diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index 039b274908..b11cd1997c 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -536,8 +536,10 @@ promote_me(From, #state { q = Q = #amqqueue { name = QName }, end, gb_trees:empty(), MSList), NumAckTags = [NumAckTag || {_MsgId, NumAckTag} <- dict:to_list(MA)], AckTags = [AckTag || {_Num, AckTag} <- lists:sort(NumAckTags)], - Deliveries = [Delivery || {_ChPid, {PubQ, _PendCh}} <- dict:to_list(SQ), + Deliveries = [{Delivery, true} || + {_ChPid, {PubQ, _PendCh}} <- dict:to_list(SQ), Delivery <- queue:to_list(PubQ)], + rabbit_log:warning("Promotion deliveries: ~p~n", [Deliveries]), QueueState = rabbit_amqqueue_process:init_with_backing_queue_state( Q1, rabbit_mirror_queue_master, MasterState, RateTRef, AckTags, Deliveries, KS, MTC), @@ -693,7 +695,8 @@ remove_from_pending_ch(MsgId, ChPid, SQ) -> end. process_instruction( - {publish, Deliver, ChPid, MsgProps, Msg = #basic_message { id = MsgId }}, + {publish, Deliver, ChPid, MsgProps, Msg = #basic_message { id = MsgId }, + Redelivered}, State = #state { sender_queues = SQ, backing_queue = BQ, backing_queue_state = BQS, @@ -743,9 +746,10 @@ process_instruction( {ok, case Deliver of false -> - BQS1 = BQ:publish(Msg, MsgProps, ChPid, BQS), + BQS1 = BQ:publish(Msg, MsgProps, ChPid, Redelivered, BQS), State2 #state { backing_queue_state = BQS1 }; {true, AckRequired} -> + false = Redelivered, %% master:publish_delivered/5 only sends this {AckTag, BQS1} = BQ:publish_delivered(AckRequired, Msg, MsgProps, ChPid, BQS), maybe_store_ack(AckRequired, MsgId, AckTag, diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 98c4571736..59c0bbebf5 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -17,7 +17,7 @@ -module(rabbit_variable_queue). -export([init/3, terminate/2, delete_and_terminate/2, purge/1, - publish/4, publish_delivered/5, drain_confirmed/1, + publish/5, publish_delivered/5, drain_confirmed/1, dropwhile/3, fetch/2, ack/2, requeue/2, len/1, is_empty/1, depth/1, set_ram_duration_target/2, ram_duration/1, needs_timeout/1, timeout/1, handle_pre_hibernate/1, status/1, invoke/3, @@ -521,16 +521,16 @@ purge(State = #vqstate { q4 = Q4, publish(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId }, MsgProps = #message_properties { needs_confirming = NeedsConfirming }, - _ChPid, State = #vqstate { q1 = Q1, q3 = Q3, q4 = Q4, - next_seq_id = SeqId, - len = Len, - in_counter = InCount, - persistent_count = PCount, - durable = IsDurable, - ram_msg_count = RamMsgCount, - unconfirmed = UC }) -> + _ChPid, Redelivered, State = #vqstate { q1 = Q1, q3 = Q3, q4 = Q4, + next_seq_id = SeqId, + len = Len, + in_counter = InCount, + persistent_count = PCount, + durable = IsDurable, + ram_msg_count = RamMsgCount, + unconfirmed = UC }) -> IsPersistent1 = IsDurable andalso IsPersistent, - MsgStatus = msg_status(IsPersistent1, SeqId, Msg, MsgProps), + MsgStatus = msg_status(IsPersistent1, SeqId, Msg, MsgProps, Redelivered), {MsgStatus1, State1} = maybe_write_to_disk(false, false, MsgStatus, State), State2 = case ?QUEUE:is_empty(Q3) of false -> State1 #vqstate { q1 = ?QUEUE:in(m(MsgStatus1), Q1) }; @@ -566,7 +566,7 @@ publish_delivered(true, Msg = #basic_message { is_persistent = IsPersistent, durable = IsDurable, unconfirmed = UC }) -> IsPersistent1 = IsDurable andalso IsPersistent, - MsgStatus = (msg_status(IsPersistent1, SeqId, Msg, MsgProps)) + MsgStatus = (msg_status(IsPersistent1, SeqId, Msg, MsgProps, false)) #msg_status { is_delivered = true }, {MsgStatus1, State1} = maybe_write_to_disk(false, false, MsgStatus, State), State2 = record_pending_ack(m(MsgStatus1), State1), @@ -874,9 +874,9 @@ gb_sets_maybe_insert(false, _Val, Set) -> Set; gb_sets_maybe_insert(true, Val, Set) -> gb_sets:add(Val, Set). msg_status(IsPersistent, SeqId, Msg = #basic_message { id = MsgId }, - MsgProps) -> + MsgProps, Redelivered) -> #msg_status { seq_id = SeqId, msg_id = MsgId, msg = Msg, - is_persistent = IsPersistent, is_delivered = false, + is_persistent = IsPersistent, is_delivered = Redelivered, msg_on_disk = false, index_on_disk = false, msg_props = MsgProps }. |
