summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2012-09-26 16:33:42 +0100
committerSimon MacMullen <simon@rabbitmq.com>2012-09-26 16:33:42 +0100
commit54fd0701e1e2fcba4c2aa7def75622f641f2e23c (patch)
tree7646262122cc0ade44ebef8d6b727f45134cabc2
parentb8bd0e50ccd97d1e115ada92f2ca50a8e4432c9f (diff)
downloadrabbitmq-server-git-54fd0701e1e2fcba4c2aa7def75622f641f2e23c.tar.gz
Mark enqueued-on-promotion messages as redelivered
-rw-r--r--src/rabbit_amqqueue_process.erl20
-rw-r--r--src/rabbit_backing_queue.erl5
-rw-r--r--src/rabbit_mirror_queue_master.erl10
-rw-r--r--src/rabbit_mirror_queue_slave.erl10
-rw-r--r--src/rabbit_variable_queue.erl26
5 files changed, 40 insertions, 31 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 0e3f0bac1b..d081056406 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -78,7 +78,7 @@
-spec(info_keys/0 :: () -> rabbit_types:info_keys()).
-spec(init_with_backing_queue_state/8 ::
(rabbit_types:amqqueue(), atom(), tuple(), any(), [any()],
- [rabbit_types:delivery()], pmon:pmon(), dict()) -> #q{}).
+ [{rabbit_types:delivery(), boolean()}], pmon:pmon(), dict()) -> #q{}).
-endif.
@@ -170,7 +170,9 @@ init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS,
rabbit_event:init_stats_timer(
State, #q.stats_timer))),
lists:foldl(
- fun (Delivery, StateN) -> deliver_or_enqueue(Delivery, StateN) end,
+ fun ({Delivery, Redelivered}, StateN) ->
+ deliver_or_enqueue(Delivery, Redelivered, StateN)
+ end,
State1, Deliveries).
terminate(shutdown = R, State = #q{backing_queue = BQ}) ->
@@ -535,6 +537,7 @@ run_message_queue(State) ->
State2.
attempt_delivery(#delivery{sender = SenderPid, message = Message}, Confirm,
+ Redelivered,
State = #q{backing_queue = BQ, backing_queue_state = BQS}) ->
case BQ:is_duplicate(Message, BQS) of
{false, BQS1} ->
@@ -544,7 +547,7 @@ attempt_delivery(#delivery{sender = SenderPid, message = Message}, Confirm,
{AckTag, BQS3} = BQ:publish_delivered(
AckRequired, Message, Props,
SenderPid, BQS2),
- {{Message, false, AckTag}, true,
+ {{Message, Redelivered, AckTag}, true,
State1#q{backing_queue_state = BQS3}}
end, false, State#q{backing_queue_state = BQS1});
{Duplicate, BQS1} ->
@@ -560,9 +563,10 @@ attempt_delivery(#delivery{sender = SenderPid, message = Message}, Confirm,
end.
deliver_or_enqueue(Delivery = #delivery{message = Message,
- sender = SenderPid}, State) ->
+ sender = SenderPid}, Redelivered,
+ State) ->
Confirm = should_confirm_message(Delivery, State),
- case attempt_delivery(Delivery, Confirm, State) of
+ case attempt_delivery(Delivery, Confirm, Redelivered, State) of
{true, State1} ->
maybe_record_confirm_message(Confirm, State1);
%% the next one is an optimisations
@@ -573,7 +577,7 @@ deliver_or_enqueue(Delivery = #delivery{message = Message,
State2 = #q{backing_queue = BQ, backing_queue_state = BQS} =
maybe_record_confirm_message(Confirm, State1),
Props = message_properties(Confirm, State2),
- BQS1 = BQ:publish(Message, Props, SenderPid, BQS),
+ BQS1 = BQ:publish(Message, Props, SenderPid, Redelivered, BQS),
ensure_ttl_timer(Props#message_properties.expiry,
State2#q{backing_queue_state = BQS1})
end.
@@ -1032,7 +1036,7 @@ handle_call(consumers, _From, State) ->
handle_call({deliver, Delivery}, From, State) ->
%% Synchronous, "mandatory" deliver mode.
gen_server2:reply(From, ok),
- noreply(deliver_or_enqueue(Delivery, State));
+ noreply(deliver_or_enqueue(Delivery, false, State));
handle_call({notify_down, ChPid}, From, State) ->
%% we want to do this synchronously, so that auto_deleted queues
@@ -1184,7 +1188,7 @@ handle_cast({deliver, Delivery = #delivery{sender = Sender}, Flow},
noflow -> Senders
end,
State1 = State#q{senders = Senders1},
- noreply(deliver_or_enqueue(Delivery, State1));
+ noreply(deliver_or_enqueue(Delivery, false, State1));
handle_cast({ack, AckTags, ChPid}, State) ->
noreply(subtract_acks(
diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl
index d69a6c3b98..9510ae2382 100644
--- a/src/rabbit_backing_queue.erl
+++ b/src/rabbit_backing_queue.erl
@@ -77,7 +77,8 @@
%% Publish a message.
-callback publish(rabbit_types:basic_message(),
- rabbit_types:message_properties(), pid(), state()) ->
+ rabbit_types:message_properties(), pid(), boolean(),
+ state()) ->
state().
%% Called for messages which have already been passed straight
@@ -212,7 +213,7 @@
behaviour_info(callbacks) ->
[{start, 1}, {stop, 0}, {init, 3}, {terminate, 2},
- {delete_and_terminate, 2}, {purge, 1}, {publish, 4},
+ {delete_and_terminate, 2}, {purge, 1}, {publish, 5},
{publish_delivered, 5}, {drain_confirmed, 1}, {dropwhile, 3},
{fetch, 2}, {ack, 2}, {fold, 3}, {requeue, 2}, {len, 1},
{is_empty, 1}, {depth, 1}, {set_ram_duration_target, 2},
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index c11a8ff7c0..9e98c72666 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -17,7 +17,7 @@
-module(rabbit_mirror_queue_master).
-export([init/3, terminate/2, delete_and_terminate/2,
- purge/1, publish/4, publish_delivered/5, fetch/2, ack/2,
+ purge/1, publish/5, publish_delivered/5, fetch/2, ack/2,
requeue/2, len/1, is_empty/1, depth/1, drain_confirmed/1,
dropwhile/3, set_ram_duration_target/2, ram_duration/1,
needs_timeout/1, timeout/1, handle_pre_hibernate/1,
@@ -153,14 +153,14 @@ purge(State = #state { gm = GM,
{Count, State #state { backing_queue_state = BQS1,
set_delivered = 0 }}.
-publish(Msg = #basic_message { id = MsgId }, MsgProps, ChPid,
+publish(Msg = #basic_message { id = MsgId }, MsgProps, ChPid, Redelivered,
State = #state { gm = GM,
seen_status = SS,
backing_queue = BQ,
backing_queue_state = BQS }) ->
false = dict:is_key(MsgId, SS), %% ASSERTION
- ok = gm:broadcast(GM, {publish, false, ChPid, MsgProps, Msg}),
- BQS1 = BQ:publish(Msg, MsgProps, ChPid, BQS),
+ ok = gm:broadcast(GM, {publish, false, ChPid, MsgProps, Msg, Redelivered}),
+ BQS1 = BQ:publish(Msg, MsgProps, ChPid, Redelivered, BQS),
ensure_monitoring(ChPid, State #state { backing_queue_state = BQS1 }).
publish_delivered(AckRequired, Msg = #basic_message { id = MsgId }, MsgProps,
@@ -174,7 +174,7 @@ publish_delivered(AckRequired, Msg = #basic_message { id = MsgId }, MsgProps,
%% all slaves are forced to interpret this publish_delivered at
%% the same point, especially if we die and a slave is promoted.
ok = gm:confirmed_broadcast(
- GM, {publish, {true, AckRequired}, ChPid, MsgProps, Msg}),
+ GM, {publish, {true, AckRequired}, ChPid, MsgProps, Msg, false}),
{AckTag, BQS1} =
BQ:publish_delivered(AckRequired, Msg, MsgProps, ChPid, BQS),
AM1 = maybe_store_acktag(AckTag, MsgId, AM),
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index 039b274908..b11cd1997c 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -536,8 +536,10 @@ promote_me(From, #state { q = Q = #amqqueue { name = QName },
end, gb_trees:empty(), MSList),
NumAckTags = [NumAckTag || {_MsgId, NumAckTag} <- dict:to_list(MA)],
AckTags = [AckTag || {_Num, AckTag} <- lists:sort(NumAckTags)],
- Deliveries = [Delivery || {_ChPid, {PubQ, _PendCh}} <- dict:to_list(SQ),
+ Deliveries = [{Delivery, true} ||
+ {_ChPid, {PubQ, _PendCh}} <- dict:to_list(SQ),
Delivery <- queue:to_list(PubQ)],
+ rabbit_log:warning("Promotion deliveries: ~p~n", [Deliveries]),
QueueState = rabbit_amqqueue_process:init_with_backing_queue_state(
Q1, rabbit_mirror_queue_master, MasterState, RateTRef,
AckTags, Deliveries, KS, MTC),
@@ -693,7 +695,8 @@ remove_from_pending_ch(MsgId, ChPid, SQ) ->
end.
process_instruction(
- {publish, Deliver, ChPid, MsgProps, Msg = #basic_message { id = MsgId }},
+ {publish, Deliver, ChPid, MsgProps, Msg = #basic_message { id = MsgId },
+ Redelivered},
State = #state { sender_queues = SQ,
backing_queue = BQ,
backing_queue_state = BQS,
@@ -743,9 +746,10 @@ process_instruction(
{ok,
case Deliver of
false ->
- BQS1 = BQ:publish(Msg, MsgProps, ChPid, BQS),
+ BQS1 = BQ:publish(Msg, MsgProps, ChPid, Redelivered, BQS),
State2 #state { backing_queue_state = BQS1 };
{true, AckRequired} ->
+ false = Redelivered, %% master:publish_delivered/5 only sends this
{AckTag, BQS1} = BQ:publish_delivered(AckRequired, Msg, MsgProps,
ChPid, BQS),
maybe_store_ack(AckRequired, MsgId, AckTag,
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 98c4571736..59c0bbebf5 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -17,7 +17,7 @@
-module(rabbit_variable_queue).
-export([init/3, terminate/2, delete_and_terminate/2, purge/1,
- publish/4, publish_delivered/5, drain_confirmed/1,
+ publish/5, publish_delivered/5, drain_confirmed/1,
dropwhile/3, fetch/2, ack/2, requeue/2, len/1, is_empty/1,
depth/1, set_ram_duration_target/2, ram_duration/1,
needs_timeout/1, timeout/1, handle_pre_hibernate/1, status/1, invoke/3,
@@ -521,16 +521,16 @@ purge(State = #vqstate { q4 = Q4,
publish(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId },
MsgProps = #message_properties { needs_confirming = NeedsConfirming },
- _ChPid, State = #vqstate { q1 = Q1, q3 = Q3, q4 = Q4,
- next_seq_id = SeqId,
- len = Len,
- in_counter = InCount,
- persistent_count = PCount,
- durable = IsDurable,
- ram_msg_count = RamMsgCount,
- unconfirmed = UC }) ->
+ _ChPid, Redelivered, State = #vqstate { q1 = Q1, q3 = Q3, q4 = Q4,
+ next_seq_id = SeqId,
+ len = Len,
+ in_counter = InCount,
+ persistent_count = PCount,
+ durable = IsDurable,
+ ram_msg_count = RamMsgCount,
+ unconfirmed = UC }) ->
IsPersistent1 = IsDurable andalso IsPersistent,
- MsgStatus = msg_status(IsPersistent1, SeqId, Msg, MsgProps),
+ MsgStatus = msg_status(IsPersistent1, SeqId, Msg, MsgProps, Redelivered),
{MsgStatus1, State1} = maybe_write_to_disk(false, false, MsgStatus, State),
State2 = case ?QUEUE:is_empty(Q3) of
false -> State1 #vqstate { q1 = ?QUEUE:in(m(MsgStatus1), Q1) };
@@ -566,7 +566,7 @@ publish_delivered(true, Msg = #basic_message { is_persistent = IsPersistent,
durable = IsDurable,
unconfirmed = UC }) ->
IsPersistent1 = IsDurable andalso IsPersistent,
- MsgStatus = (msg_status(IsPersistent1, SeqId, Msg, MsgProps))
+ MsgStatus = (msg_status(IsPersistent1, SeqId, Msg, MsgProps, false))
#msg_status { is_delivered = true },
{MsgStatus1, State1} = maybe_write_to_disk(false, false, MsgStatus, State),
State2 = record_pending_ack(m(MsgStatus1), State1),
@@ -874,9 +874,9 @@ gb_sets_maybe_insert(false, _Val, Set) -> Set;
gb_sets_maybe_insert(true, Val, Set) -> gb_sets:add(Val, Set).
msg_status(IsPersistent, SeqId, Msg = #basic_message { id = MsgId },
- MsgProps) ->
+ MsgProps, Redelivered) ->
#msg_status { seq_id = SeqId, msg_id = MsgId, msg = Msg,
- is_persistent = IsPersistent, is_delivered = false,
+ is_persistent = IsPersistent, is_delivered = Redelivered,
msg_on_disk = false, index_on_disk = false,
msg_props = MsgProps }.