diff options
| author | Simon MacMullen <simon@rabbitmq.com> | 2012-09-26 18:09:06 +0100 |
|---|---|---|
| committer | Simon MacMullen <simon@rabbitmq.com> | 2012-09-26 18:09:06 +0100 |
| commit | 1fc1f219f7b966f486c580fbcd2fd0da396fc3d7 (patch) | |
| tree | 68f803ce4c96fa53fb540b1e736883a6f0939376 /src | |
| parent | 54fd0701e1e2fcba4c2aa7def75622f641f2e23c (diff) | |
| download | rabbitmq-server-git-1fc1f219f7b966f486c580fbcd2fd0da396fc3d7.tar.gz | |
Stick redelivered in #message_properties, which makes the diff rather smaller.
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 19 | ||||
| -rw-r--r-- | src/rabbit_backing_queue.erl | 5 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_master.erl | 10 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 7 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 25 |
5 files changed, 31 insertions, 35 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index d081056406..a6f494739a 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -536,19 +536,17 @@ run_message_queue(State) -> BQ:is_empty(BQS), State1), State2. -attempt_delivery(#delivery{sender = SenderPid, message = Message}, Confirm, - Redelivered, +attempt_delivery(#delivery{sender = SenderPid, message = Message}, Props, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> case BQ:is_duplicate(Message, BQS) of {false, BQS1} -> deliver_msgs_to_consumers( fun (AckRequired, State1 = #q{backing_queue_state = BQS2}) -> - Props = message_properties(Confirm, State1), {AckTag, BQS3} = BQ:publish_delivered( AckRequired, Message, Props, SenderPid, BQS2), - {{Message, Redelivered, AckTag}, true, - State1#q{backing_queue_state = BQS3}} + {{Message, Props#message_properties.redelivered, AckTag}, + true, State1#q{backing_queue_state = BQS3}} end, false, State#q{backing_queue_state = BQS1}); {Duplicate, BQS1} -> %% if the message has previously been seen by the BQ then @@ -566,7 +564,8 @@ deliver_or_enqueue(Delivery = #delivery{message = Message, sender = SenderPid}, Redelivered, State) -> Confirm = should_confirm_message(Delivery, State), - case attempt_delivery(Delivery, Confirm, Redelivered, State) of + Props = message_properties(Confirm, Redelivered, State), + case attempt_delivery(Delivery, Props, State) of {true, State1} -> maybe_record_confirm_message(Confirm, State1); %% the next one is an optimisations @@ -576,8 +575,7 @@ deliver_or_enqueue(Delivery = #delivery{message = Message, {false, State1} -> State2 = #q{backing_queue = BQ, backing_queue_state = BQS} = maybe_record_confirm_message(Confirm, State1), - Props = message_properties(Confirm, State2), - BQS1 = BQ:publish(Message, Props, SenderPid, Redelivered, BQS), + BQS1 = BQ:publish(Message, Props, SenderPid, BQS), ensure_ttl_timer(Props#message_properties.expiry, State2#q{backing_queue_state = BQS1}) end. @@ -706,9 +704,10 @@ discard_delivery(#delivery{sender = SenderPid, backing_queue_state = BQS}) -> State#q{backing_queue_state = BQ:discard(Message, SenderPid, BQS)}. -message_properties(Confirm, #q{ttl = TTL}) -> +message_properties(Confirm, Redelivered, #q{ttl = TTL}) -> #message_properties{expiry = calculate_msg_expiry(TTL), - needs_confirming = needs_confirming(Confirm)}. + needs_confirming = needs_confirming(Confirm), + redelivered = Redelivered}. calculate_msg_expiry(undefined) -> undefined; calculate_msg_expiry(TTL) -> now_micros() + (TTL * 1000). diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl index 9510ae2382..d69a6c3b98 100644 --- a/src/rabbit_backing_queue.erl +++ b/src/rabbit_backing_queue.erl @@ -77,8 +77,7 @@ %% Publish a message. -callback publish(rabbit_types:basic_message(), - rabbit_types:message_properties(), pid(), boolean(), - state()) -> + rabbit_types:message_properties(), pid(), state()) -> state(). %% Called for messages which have already been passed straight @@ -213,7 +212,7 @@ behaviour_info(callbacks) -> [{start, 1}, {stop, 0}, {init, 3}, {terminate, 2}, - {delete_and_terminate, 2}, {purge, 1}, {publish, 5}, + {delete_and_terminate, 2}, {purge, 1}, {publish, 4}, {publish_delivered, 5}, {drain_confirmed, 1}, {dropwhile, 3}, {fetch, 2}, {ack, 2}, {fold, 3}, {requeue, 2}, {len, 1}, {is_empty, 1}, {depth, 1}, {set_ram_duration_target, 2}, diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index 9e98c72666..c11a8ff7c0 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -17,7 +17,7 @@ -module(rabbit_mirror_queue_master). -export([init/3, terminate/2, delete_and_terminate/2, - purge/1, publish/5, publish_delivered/5, fetch/2, ack/2, + purge/1, publish/4, publish_delivered/5, fetch/2, ack/2, requeue/2, len/1, is_empty/1, depth/1, drain_confirmed/1, dropwhile/3, set_ram_duration_target/2, ram_duration/1, needs_timeout/1, timeout/1, handle_pre_hibernate/1, @@ -153,14 +153,14 @@ purge(State = #state { gm = GM, {Count, State #state { backing_queue_state = BQS1, set_delivered = 0 }}. -publish(Msg = #basic_message { id = MsgId }, MsgProps, ChPid, Redelivered, +publish(Msg = #basic_message { id = MsgId }, MsgProps, ChPid, State = #state { gm = GM, seen_status = SS, backing_queue = BQ, backing_queue_state = BQS }) -> false = dict:is_key(MsgId, SS), %% ASSERTION - ok = gm:broadcast(GM, {publish, false, ChPid, MsgProps, Msg, Redelivered}), - BQS1 = BQ:publish(Msg, MsgProps, ChPid, Redelivered, BQS), + ok = gm:broadcast(GM, {publish, false, ChPid, MsgProps, Msg}), + BQS1 = BQ:publish(Msg, MsgProps, ChPid, BQS), ensure_monitoring(ChPid, State #state { backing_queue_state = BQS1 }). publish_delivered(AckRequired, Msg = #basic_message { id = MsgId }, MsgProps, @@ -174,7 +174,7 @@ publish_delivered(AckRequired, Msg = #basic_message { id = MsgId }, MsgProps, %% all slaves are forced to interpret this publish_delivered at %% the same point, especially if we die and a slave is promoted. ok = gm:confirmed_broadcast( - GM, {publish, {true, AckRequired}, ChPid, MsgProps, Msg, false}), + GM, {publish, {true, AckRequired}, ChPid, MsgProps, Msg}), {AckTag, BQS1} = BQ:publish_delivered(AckRequired, Msg, MsgProps, ChPid, BQS), AM1 = maybe_store_acktag(AckTag, MsgId, AM), diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index b11cd1997c..1e10e8a93e 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -539,7 +539,6 @@ promote_me(From, #state { q = Q = #amqqueue { name = QName }, Deliveries = [{Delivery, true} || {_ChPid, {PubQ, _PendCh}} <- dict:to_list(SQ), Delivery <- queue:to_list(PubQ)], - rabbit_log:warning("Promotion deliveries: ~p~n", [Deliveries]), QueueState = rabbit_amqqueue_process:init_with_backing_queue_state( Q1, rabbit_mirror_queue_master, MasterState, RateTRef, AckTags, Deliveries, KS, MTC), @@ -695,8 +694,7 @@ remove_from_pending_ch(MsgId, ChPid, SQ) -> end. process_instruction( - {publish, Deliver, ChPid, MsgProps, Msg = #basic_message { id = MsgId }, - Redelivered}, + {publish, Deliver, ChPid, MsgProps, Msg = #basic_message { id = MsgId }}, State = #state { sender_queues = SQ, backing_queue = BQ, backing_queue_state = BQS, @@ -746,10 +744,9 @@ process_instruction( {ok, case Deliver of false -> - BQS1 = BQ:publish(Msg, MsgProps, ChPid, Redelivered, BQS), + BQS1 = BQ:publish(Msg, MsgProps, ChPid, BQS), State2 #state { backing_queue_state = BQS1 }; {true, AckRequired} -> - false = Redelivered, %% master:publish_delivered/5 only sends this {AckTag, BQS1} = BQ:publish_delivered(AckRequired, Msg, MsgProps, ChPid, BQS), maybe_store_ack(AckRequired, MsgId, AckTag, diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 59c0bbebf5..7af2927941 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -17,7 +17,7 @@ -module(rabbit_variable_queue). -export([init/3, terminate/2, delete_and_terminate/2, purge/1, - publish/5, publish_delivered/5, drain_confirmed/1, + publish/4, publish_delivered/5, drain_confirmed/1, dropwhile/3, fetch/2, ack/2, requeue/2, len/1, is_empty/1, depth/1, set_ram_duration_target/2, ram_duration/1, needs_timeout/1, timeout/1, handle_pre_hibernate/1, status/1, invoke/3, @@ -521,16 +521,16 @@ purge(State = #vqstate { q4 = Q4, publish(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId }, MsgProps = #message_properties { needs_confirming = NeedsConfirming }, - _ChPid, Redelivered, State = #vqstate { q1 = Q1, q3 = Q3, q4 = Q4, - next_seq_id = SeqId, - len = Len, - in_counter = InCount, - persistent_count = PCount, - durable = IsDurable, - ram_msg_count = RamMsgCount, - unconfirmed = UC }) -> + _ChPid, State = #vqstate { q1 = Q1, q3 = Q3, q4 = Q4, + next_seq_id = SeqId, + len = Len, + in_counter = InCount, + persistent_count = PCount, + durable = IsDurable, + ram_msg_count = RamMsgCount, + unconfirmed = UC }) -> IsPersistent1 = IsDurable andalso IsPersistent, - MsgStatus = msg_status(IsPersistent1, SeqId, Msg, MsgProps, Redelivered), + MsgStatus = msg_status(IsPersistent1, SeqId, Msg, MsgProps), {MsgStatus1, State1} = maybe_write_to_disk(false, false, MsgStatus, State), State2 = case ?QUEUE:is_empty(Q3) of false -> State1 #vqstate { q1 = ?QUEUE:in(m(MsgStatus1), Q1) }; @@ -566,7 +566,7 @@ publish_delivered(true, Msg = #basic_message { is_persistent = IsPersistent, durable = IsDurable, unconfirmed = UC }) -> IsPersistent1 = IsDurable andalso IsPersistent, - MsgStatus = (msg_status(IsPersistent1, SeqId, Msg, MsgProps, false)) + MsgStatus = (msg_status(IsPersistent1, SeqId, Msg, MsgProps)) #msg_status { is_delivered = true }, {MsgStatus1, State1} = maybe_write_to_disk(false, false, MsgStatus, State), State2 = record_pending_ack(m(MsgStatus1), State1), @@ -874,7 +874,8 @@ gb_sets_maybe_insert(false, _Val, Set) -> Set; gb_sets_maybe_insert(true, Val, Set) -> gb_sets:add(Val, Set). msg_status(IsPersistent, SeqId, Msg = #basic_message { id = MsgId }, - MsgProps, Redelivered) -> + MsgProps = #message_properties{redelivered = Redelivered}) -> + %% TODO would it make sense to remove #msg_status.is_delivered? #msg_status { seq_id = SeqId, msg_id = MsgId, msg = Msg, is_persistent = IsPersistent, is_delivered = Redelivered, msg_on_disk = false, index_on_disk = false, |
