diff options
| author | Simon MacMullen <simon@rabbitmq.com> | 2012-09-28 15:10:20 +0100 |
|---|---|---|
| committer | Simon MacMullen <simon@rabbitmq.com> | 2012-09-28 15:10:20 +0100 |
| commit | a8883a13999beeab4cc415d920a7e788cbbad295 (patch) | |
| tree | 5b3fb6dc0f49d6bc081f36ddcd957b5a9675e465 | |
| parent | e15ac86a20f4daa235587b4fd8ca2fa663ac8804 (diff) | |
| parent | a4dbeee912a963f291c75e1b6d42523aa299734f (diff) | |
| download | rabbitmq-server-git-a8883a13999beeab4cc415d920a7e788cbbad295.tar.gz | |
Merge bug25179
| -rw-r--r-- | include/rabbit.hrl | 3 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 29 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_master.erl | 5 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 3 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 5 |
5 files changed, 22 insertions, 23 deletions
diff --git a/include/rabbit.hrl b/include/rabbit.hrl index fff922057d..f238958751 100644 --- a/include/rabbit.hrl +++ b/include/rabbit.hrl @@ -78,7 +78,8 @@ -record(event, {type, props, timestamp}). --record(message_properties, {expiry, needs_confirming = false}). +-record(message_properties, {expiry, needs_confirming = false, + delivered = false}). -record(plugin, {name, %% atom() version, %% string() diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 0e3f0bac1b..10ac5bea60 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -169,9 +169,9 @@ init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS, State1 = requeue_and_run(AckTags, process_args( rabbit_event:init_stats_timer( State, #q.stats_timer))), - lists:foldl( - fun (Delivery, StateN) -> deliver_or_enqueue(Delivery, StateN) end, - State1, Deliveries). + lists:foldl(fun (Delivery, StateN) -> + deliver_or_enqueue(Delivery, true, StateN) + end, State1, Deliveries). terminate(shutdown = R, State = #q{backing_queue = BQ}) -> terminate_shutdown(fun (BQS) -> BQ:terminate(R, BQS) end, State); @@ -534,18 +534,17 @@ run_message_queue(State) -> BQ:is_empty(BQS), State1), State2. -attempt_delivery(#delivery{sender = SenderPid, message = Message}, Confirm, +attempt_delivery(#delivery{sender = SenderPid, message = Message}, Props, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> case BQ:is_duplicate(Message, BQS) of {false, BQS1} -> deliver_msgs_to_consumers( fun (AckRequired, State1 = #q{backing_queue_state = BQS2}) -> - Props = message_properties(Confirm, State1), {AckTag, BQS3} = BQ:publish_delivered( AckRequired, Message, Props, SenderPid, BQS2), - {{Message, false, AckTag}, true, - State1#q{backing_queue_state = BQS3}} + {{Message, Props#message_properties.delivered, AckTag}, + true, State1#q{backing_queue_state = BQS3}} end, false, State#q{backing_queue_state = BQS1}); {Duplicate, BQS1} -> %% if the message has previously been seen by the BQ then @@ -560,9 +559,11 @@ attempt_delivery(#delivery{sender = SenderPid, message = Message}, Confirm, end. deliver_or_enqueue(Delivery = #delivery{message = Message, - sender = SenderPid}, State) -> + sender = SenderPid}, Delivered, + State) -> Confirm = should_confirm_message(Delivery, State), - case attempt_delivery(Delivery, Confirm, State) of + Props = message_properties(Confirm, Delivered, State), + case attempt_delivery(Delivery, Props, State) of {true, State1} -> maybe_record_confirm_message(Confirm, State1); %% the next one is an optimisations @@ -572,7 +573,6 @@ deliver_or_enqueue(Delivery = #delivery{message = Message, {false, State1} -> State2 = #q{backing_queue = BQ, backing_queue_state = BQS} = maybe_record_confirm_message(Confirm, State1), - Props = message_properties(Confirm, State2), BQS1 = BQ:publish(Message, Props, SenderPid, BQS), ensure_ttl_timer(Props#message_properties.expiry, State2#q{backing_queue_state = BQS1}) @@ -702,9 +702,10 @@ discard_delivery(#delivery{sender = SenderPid, backing_queue_state = BQS}) -> State#q{backing_queue_state = BQ:discard(Message, SenderPid, BQS)}. -message_properties(Confirm, #q{ttl = TTL}) -> +message_properties(Confirm, Delivered, #q{ttl = TTL}) -> #message_properties{expiry = calculate_msg_expiry(TTL), - needs_confirming = needs_confirming(Confirm)}. + needs_confirming = needs_confirming(Confirm), + delivered = Delivered}. calculate_msg_expiry(undefined) -> undefined; calculate_msg_expiry(TTL) -> now_micros() + (TTL * 1000). @@ -1032,7 +1033,7 @@ handle_call(consumers, _From, State) -> handle_call({deliver, Delivery}, From, State) -> %% Synchronous, "mandatory" deliver mode. gen_server2:reply(From, ok), - noreply(deliver_or_enqueue(Delivery, State)); + noreply(deliver_or_enqueue(Delivery, false, State)); handle_call({notify_down, ChPid}, From, State) -> %% we want to do this synchronously, so that auto_deleted queues @@ -1184,7 +1185,7 @@ handle_cast({deliver, Delivery = #delivery{sender = Sender}, Flow}, noflow -> Senders end, State1 = State#q{senders = Senders1}, - noreply(deliver_or_enqueue(Delivery, State1)); + noreply(deliver_or_enqueue(Delivery, false, State1)); handle_cast({ack, AckTags, ChPid}, State) -> noreply(subtract_acks( diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index 413898150c..4cfb3dcbfa 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -169,10 +169,7 @@ publish_delivered(AckRequired, Msg = #basic_message { id = MsgId }, MsgProps, backing_queue_state = BQS, ack_msg_id = AM }) -> false = dict:is_key(MsgId, SS), %% ASSERTION - %% Must use confirmed_broadcast here in order to guarantee that - %% all slaves are forced to interpret this publish_delivered at - %% the same point, especially if we die and a slave is promoted. - ok = gm:confirmed_broadcast( + ok = gm:broadcast( GM, {publish, {true, AckRequired}, ChPid, MsgProps, Msg}), {AckTag, BQS1} = BQ:publish_delivered(AckRequired, Msg, MsgProps, ChPid, BQS), diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index 039b274908..625bcdffba 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -457,8 +457,7 @@ promote_me(From, #state { q = Q = #amqqueue { name = QName }, ok = rabbit_mirror_queue_coordinator:ensure_monitoring(CPid, MPids), %% We find all the messages that we've received from channels but - %% not from gm, and if they're due to be enqueued on promotion - %% then we pass them to the + %% not from gm, and pass them to the %% queue_process:init_with_backing_queue_state to be enqueued. %% %% We also have to requeue messages which are pending acks: the diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 98c4571736..68c659dfbe 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -874,9 +874,10 @@ gb_sets_maybe_insert(false, _Val, Set) -> Set; gb_sets_maybe_insert(true, Val, Set) -> gb_sets:add(Val, Set). msg_status(IsPersistent, SeqId, Msg = #basic_message { id = MsgId }, - MsgProps) -> + MsgProps = #message_properties{delivered = Delivered}) -> + %% TODO would it make sense to remove #msg_status.is_delivered? #msg_status { seq_id = SeqId, msg_id = MsgId, msg = Msg, - is_persistent = IsPersistent, is_delivered = false, + is_persistent = IsPersistent, is_delivered = Delivered, msg_on_disk = false, index_on_disk = false, msg_props = MsgProps }. |
