summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2012-09-28 15:10:20 +0100
committerSimon MacMullen <simon@rabbitmq.com>2012-09-28 15:10:20 +0100
commita8883a13999beeab4cc415d920a7e788cbbad295 (patch)
tree5b3fb6dc0f49d6bc081f36ddcd957b5a9675e465 /src
parente15ac86a20f4daa235587b4fd8ca2fa663ac8804 (diff)
parenta4dbeee912a963f291c75e1b6d42523aa299734f (diff)
downloadrabbitmq-server-git-a8883a13999beeab4cc415d920a7e788cbbad295.tar.gz
Merge bug25179
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_amqqueue_process.erl29
-rw-r--r--src/rabbit_mirror_queue_master.erl5
-rw-r--r--src/rabbit_mirror_queue_slave.erl3
-rw-r--r--src/rabbit_variable_queue.erl5
4 files changed, 20 insertions, 22 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 0e3f0bac1b..10ac5bea60 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -169,9 +169,9 @@ init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS,
State1 = requeue_and_run(AckTags, process_args(
rabbit_event:init_stats_timer(
State, #q.stats_timer))),
- lists:foldl(
- fun (Delivery, StateN) -> deliver_or_enqueue(Delivery, StateN) end,
- State1, Deliveries).
+ lists:foldl(fun (Delivery, StateN) ->
+ deliver_or_enqueue(Delivery, true, StateN)
+ end, State1, Deliveries).
terminate(shutdown = R, State = #q{backing_queue = BQ}) ->
terminate_shutdown(fun (BQS) -> BQ:terminate(R, BQS) end, State);
@@ -534,18 +534,17 @@ run_message_queue(State) ->
BQ:is_empty(BQS), State1),
State2.
-attempt_delivery(#delivery{sender = SenderPid, message = Message}, Confirm,
+attempt_delivery(#delivery{sender = SenderPid, message = Message}, Props,
State = #q{backing_queue = BQ, backing_queue_state = BQS}) ->
case BQ:is_duplicate(Message, BQS) of
{false, BQS1} ->
deliver_msgs_to_consumers(
fun (AckRequired, State1 = #q{backing_queue_state = BQS2}) ->
- Props = message_properties(Confirm, State1),
{AckTag, BQS3} = BQ:publish_delivered(
AckRequired, Message, Props,
SenderPid, BQS2),
- {{Message, false, AckTag}, true,
- State1#q{backing_queue_state = BQS3}}
+ {{Message, Props#message_properties.delivered, AckTag},
+ true, State1#q{backing_queue_state = BQS3}}
end, false, State#q{backing_queue_state = BQS1});
{Duplicate, BQS1} ->
%% if the message has previously been seen by the BQ then
@@ -560,9 +559,11 @@ attempt_delivery(#delivery{sender = SenderPid, message = Message}, Confirm,
end.
deliver_or_enqueue(Delivery = #delivery{message = Message,
- sender = SenderPid}, State) ->
+ sender = SenderPid}, Delivered,
+ State) ->
Confirm = should_confirm_message(Delivery, State),
- case attempt_delivery(Delivery, Confirm, State) of
+ Props = message_properties(Confirm, Delivered, State),
+ case attempt_delivery(Delivery, Props, State) of
{true, State1} ->
maybe_record_confirm_message(Confirm, State1);
%% the next one is an optimisations
@@ -572,7 +573,6 @@ deliver_or_enqueue(Delivery = #delivery{message = Message,
{false, State1} ->
State2 = #q{backing_queue = BQ, backing_queue_state = BQS} =
maybe_record_confirm_message(Confirm, State1),
- Props = message_properties(Confirm, State2),
BQS1 = BQ:publish(Message, Props, SenderPid, BQS),
ensure_ttl_timer(Props#message_properties.expiry,
State2#q{backing_queue_state = BQS1})
@@ -702,9 +702,10 @@ discard_delivery(#delivery{sender = SenderPid,
backing_queue_state = BQS}) ->
State#q{backing_queue_state = BQ:discard(Message, SenderPid, BQS)}.
-message_properties(Confirm, #q{ttl = TTL}) ->
+message_properties(Confirm, Delivered, #q{ttl = TTL}) ->
#message_properties{expiry = calculate_msg_expiry(TTL),
- needs_confirming = needs_confirming(Confirm)}.
+ needs_confirming = needs_confirming(Confirm),
+ delivered = Delivered}.
calculate_msg_expiry(undefined) -> undefined;
calculate_msg_expiry(TTL) -> now_micros() + (TTL * 1000).
@@ -1032,7 +1033,7 @@ handle_call(consumers, _From, State) ->
handle_call({deliver, Delivery}, From, State) ->
%% Synchronous, "mandatory" deliver mode.
gen_server2:reply(From, ok),
- noreply(deliver_or_enqueue(Delivery, State));
+ noreply(deliver_or_enqueue(Delivery, false, State));
handle_call({notify_down, ChPid}, From, State) ->
%% we want to do this synchronously, so that auto_deleted queues
@@ -1184,7 +1185,7 @@ handle_cast({deliver, Delivery = #delivery{sender = Sender}, Flow},
noflow -> Senders
end,
State1 = State#q{senders = Senders1},
- noreply(deliver_or_enqueue(Delivery, State1));
+ noreply(deliver_or_enqueue(Delivery, false, State1));
handle_cast({ack, AckTags, ChPid}, State) ->
noreply(subtract_acks(
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index 413898150c..4cfb3dcbfa 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -169,10 +169,7 @@ publish_delivered(AckRequired, Msg = #basic_message { id = MsgId }, MsgProps,
backing_queue_state = BQS,
ack_msg_id = AM }) ->
false = dict:is_key(MsgId, SS), %% ASSERTION
- %% Must use confirmed_broadcast here in order to guarantee that
- %% all slaves are forced to interpret this publish_delivered at
- %% the same point, especially if we die and a slave is promoted.
- ok = gm:confirmed_broadcast(
+ ok = gm:broadcast(
GM, {publish, {true, AckRequired}, ChPid, MsgProps, Msg}),
{AckTag, BQS1} =
BQ:publish_delivered(AckRequired, Msg, MsgProps, ChPid, BQS),
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index 039b274908..625bcdffba 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -457,8 +457,7 @@ promote_me(From, #state { q = Q = #amqqueue { name = QName },
ok = rabbit_mirror_queue_coordinator:ensure_monitoring(CPid, MPids),
%% We find all the messages that we've received from channels but
- %% not from gm, and if they're due to be enqueued on promotion
- %% then we pass them to the
+ %% not from gm, and pass them to the
%% queue_process:init_with_backing_queue_state to be enqueued.
%%
%% We also have to requeue messages which are pending acks: the
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 98c4571736..68c659dfbe 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -874,9 +874,10 @@ gb_sets_maybe_insert(false, _Val, Set) -> Set;
gb_sets_maybe_insert(true, Val, Set) -> gb_sets:add(Val, Set).
msg_status(IsPersistent, SeqId, Msg = #basic_message { id = MsgId },
- MsgProps) ->
+ MsgProps = #message_properties{delivered = Delivered}) ->
+ %% TODO would it make sense to remove #msg_status.is_delivered?
#msg_status { seq_id = SeqId, msg_id = MsgId, msg = Msg,
- is_persistent = IsPersistent, is_delivered = false,
+ is_persistent = IsPersistent, is_delivered = Delivered,
msg_on_disk = false, index_on_disk = false,
msg_props = MsgProps }.