summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2012-09-26 18:09:06 +0100
committerSimon MacMullen <simon@rabbitmq.com>2012-09-26 18:09:06 +0100
commit1fc1f219f7b966f486c580fbcd2fd0da396fc3d7 (patch)
tree68f803ce4c96fa53fb540b1e736883a6f0939376
parent54fd0701e1e2fcba4c2aa7def75622f641f2e23c (diff)
downloadrabbitmq-server-git-1fc1f219f7b966f486c580fbcd2fd0da396fc3d7.tar.gz
Stick redelivered in #message_properties, which makes the diff rather smaller.
-rw-r--r--include/rabbit.hrl3
-rw-r--r--src/rabbit_amqqueue_process.erl19
-rw-r--r--src/rabbit_backing_queue.erl5
-rw-r--r--src/rabbit_mirror_queue_master.erl10
-rw-r--r--src/rabbit_mirror_queue_slave.erl7
-rw-r--r--src/rabbit_variable_queue.erl25
6 files changed, 33 insertions, 36 deletions
diff --git a/include/rabbit.hrl b/include/rabbit.hrl
index fff922057d..19fc8ff75b 100644
--- a/include/rabbit.hrl
+++ b/include/rabbit.hrl
@@ -78,7 +78,8 @@
-record(event, {type, props, timestamp}).
--record(message_properties, {expiry, needs_confirming = false}).
+-record(message_properties, {expiry, needs_confirming = false,
+ redelivered = false}).
-record(plugin, {name, %% atom()
version, %% string()
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index d081056406..a6f494739a 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -536,19 +536,17 @@ run_message_queue(State) ->
BQ:is_empty(BQS), State1),
State2.
-attempt_delivery(#delivery{sender = SenderPid, message = Message}, Confirm,
- Redelivered,
+attempt_delivery(#delivery{sender = SenderPid, message = Message}, Props,
State = #q{backing_queue = BQ, backing_queue_state = BQS}) ->
case BQ:is_duplicate(Message, BQS) of
{false, BQS1} ->
deliver_msgs_to_consumers(
fun (AckRequired, State1 = #q{backing_queue_state = BQS2}) ->
- Props = message_properties(Confirm, State1),
{AckTag, BQS3} = BQ:publish_delivered(
AckRequired, Message, Props,
SenderPid, BQS2),
- {{Message, Redelivered, AckTag}, true,
- State1#q{backing_queue_state = BQS3}}
+ {{Message, Props#message_properties.redelivered, AckTag},
+ true, State1#q{backing_queue_state = BQS3}}
end, false, State#q{backing_queue_state = BQS1});
{Duplicate, BQS1} ->
%% if the message has previously been seen by the BQ then
@@ -566,7 +564,8 @@ deliver_or_enqueue(Delivery = #delivery{message = Message,
sender = SenderPid}, Redelivered,
State) ->
Confirm = should_confirm_message(Delivery, State),
- case attempt_delivery(Delivery, Confirm, Redelivered, State) of
+ Props = message_properties(Confirm, Redelivered, State),
+ case attempt_delivery(Delivery, Props, State) of
{true, State1} ->
maybe_record_confirm_message(Confirm, State1);
%% the next one is an optimisations
@@ -576,8 +575,7 @@ deliver_or_enqueue(Delivery = #delivery{message = Message,
{false, State1} ->
State2 = #q{backing_queue = BQ, backing_queue_state = BQS} =
maybe_record_confirm_message(Confirm, State1),
- Props = message_properties(Confirm, State2),
- BQS1 = BQ:publish(Message, Props, SenderPid, Redelivered, BQS),
+ BQS1 = BQ:publish(Message, Props, SenderPid, BQS),
ensure_ttl_timer(Props#message_properties.expiry,
State2#q{backing_queue_state = BQS1})
end.
@@ -706,9 +704,10 @@ discard_delivery(#delivery{sender = SenderPid,
backing_queue_state = BQS}) ->
State#q{backing_queue_state = BQ:discard(Message, SenderPid, BQS)}.
-message_properties(Confirm, #q{ttl = TTL}) ->
+message_properties(Confirm, Redelivered, #q{ttl = TTL}) ->
#message_properties{expiry = calculate_msg_expiry(TTL),
- needs_confirming = needs_confirming(Confirm)}.
+ needs_confirming = needs_confirming(Confirm),
+ redelivered = Redelivered}.
calculate_msg_expiry(undefined) -> undefined;
calculate_msg_expiry(TTL) -> now_micros() + (TTL * 1000).
diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl
index 9510ae2382..d69a6c3b98 100644
--- a/src/rabbit_backing_queue.erl
+++ b/src/rabbit_backing_queue.erl
@@ -77,8 +77,7 @@
%% Publish a message.
-callback publish(rabbit_types:basic_message(),
- rabbit_types:message_properties(), pid(), boolean(),
- state()) ->
+ rabbit_types:message_properties(), pid(), state()) ->
state().
%% Called for messages which have already been passed straight
@@ -213,7 +212,7 @@
behaviour_info(callbacks) ->
[{start, 1}, {stop, 0}, {init, 3}, {terminate, 2},
- {delete_and_terminate, 2}, {purge, 1}, {publish, 5},
+ {delete_and_terminate, 2}, {purge, 1}, {publish, 4},
{publish_delivered, 5}, {drain_confirmed, 1}, {dropwhile, 3},
{fetch, 2}, {ack, 2}, {fold, 3}, {requeue, 2}, {len, 1},
{is_empty, 1}, {depth, 1}, {set_ram_duration_target, 2},
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index 9e98c72666..c11a8ff7c0 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -17,7 +17,7 @@
-module(rabbit_mirror_queue_master).
-export([init/3, terminate/2, delete_and_terminate/2,
- purge/1, publish/5, publish_delivered/5, fetch/2, ack/2,
+ purge/1, publish/4, publish_delivered/5, fetch/2, ack/2,
requeue/2, len/1, is_empty/1, depth/1, drain_confirmed/1,
dropwhile/3, set_ram_duration_target/2, ram_duration/1,
needs_timeout/1, timeout/1, handle_pre_hibernate/1,
@@ -153,14 +153,14 @@ purge(State = #state { gm = GM,
{Count, State #state { backing_queue_state = BQS1,
set_delivered = 0 }}.
-publish(Msg = #basic_message { id = MsgId }, MsgProps, ChPid, Redelivered,
+publish(Msg = #basic_message { id = MsgId }, MsgProps, ChPid,
State = #state { gm = GM,
seen_status = SS,
backing_queue = BQ,
backing_queue_state = BQS }) ->
false = dict:is_key(MsgId, SS), %% ASSERTION
- ok = gm:broadcast(GM, {publish, false, ChPid, MsgProps, Msg, Redelivered}),
- BQS1 = BQ:publish(Msg, MsgProps, ChPid, Redelivered, BQS),
+ ok = gm:broadcast(GM, {publish, false, ChPid, MsgProps, Msg}),
+ BQS1 = BQ:publish(Msg, MsgProps, ChPid, BQS),
ensure_monitoring(ChPid, State #state { backing_queue_state = BQS1 }).
publish_delivered(AckRequired, Msg = #basic_message { id = MsgId }, MsgProps,
@@ -174,7 +174,7 @@ publish_delivered(AckRequired, Msg = #basic_message { id = MsgId }, MsgProps,
%% all slaves are forced to interpret this publish_delivered at
%% the same point, especially if we die and a slave is promoted.
ok = gm:confirmed_broadcast(
- GM, {publish, {true, AckRequired}, ChPid, MsgProps, Msg, false}),
+ GM, {publish, {true, AckRequired}, ChPid, MsgProps, Msg}),
{AckTag, BQS1} =
BQ:publish_delivered(AckRequired, Msg, MsgProps, ChPid, BQS),
AM1 = maybe_store_acktag(AckTag, MsgId, AM),
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index b11cd1997c..1e10e8a93e 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -539,7 +539,6 @@ promote_me(From, #state { q = Q = #amqqueue { name = QName },
Deliveries = [{Delivery, true} ||
{_ChPid, {PubQ, _PendCh}} <- dict:to_list(SQ),
Delivery <- queue:to_list(PubQ)],
- rabbit_log:warning("Promotion deliveries: ~p~n", [Deliveries]),
QueueState = rabbit_amqqueue_process:init_with_backing_queue_state(
Q1, rabbit_mirror_queue_master, MasterState, RateTRef,
AckTags, Deliveries, KS, MTC),
@@ -695,8 +694,7 @@ remove_from_pending_ch(MsgId, ChPid, SQ) ->
end.
process_instruction(
- {publish, Deliver, ChPid, MsgProps, Msg = #basic_message { id = MsgId },
- Redelivered},
+ {publish, Deliver, ChPid, MsgProps, Msg = #basic_message { id = MsgId }},
State = #state { sender_queues = SQ,
backing_queue = BQ,
backing_queue_state = BQS,
@@ -746,10 +744,9 @@ process_instruction(
{ok,
case Deliver of
false ->
- BQS1 = BQ:publish(Msg, MsgProps, ChPid, Redelivered, BQS),
+ BQS1 = BQ:publish(Msg, MsgProps, ChPid, BQS),
State2 #state { backing_queue_state = BQS1 };
{true, AckRequired} ->
- false = Redelivered, %% master:publish_delivered/5 only sends this
{AckTag, BQS1} = BQ:publish_delivered(AckRequired, Msg, MsgProps,
ChPid, BQS),
maybe_store_ack(AckRequired, MsgId, AckTag,
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 59c0bbebf5..7af2927941 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -17,7 +17,7 @@
-module(rabbit_variable_queue).
-export([init/3, terminate/2, delete_and_terminate/2, purge/1,
- publish/5, publish_delivered/5, drain_confirmed/1,
+ publish/4, publish_delivered/5, drain_confirmed/1,
dropwhile/3, fetch/2, ack/2, requeue/2, len/1, is_empty/1,
depth/1, set_ram_duration_target/2, ram_duration/1,
needs_timeout/1, timeout/1, handle_pre_hibernate/1, status/1, invoke/3,
@@ -521,16 +521,16 @@ purge(State = #vqstate { q4 = Q4,
publish(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId },
MsgProps = #message_properties { needs_confirming = NeedsConfirming },
- _ChPid, Redelivered, State = #vqstate { q1 = Q1, q3 = Q3, q4 = Q4,
- next_seq_id = SeqId,
- len = Len,
- in_counter = InCount,
- persistent_count = PCount,
- durable = IsDurable,
- ram_msg_count = RamMsgCount,
- unconfirmed = UC }) ->
+ _ChPid, State = #vqstate { q1 = Q1, q3 = Q3, q4 = Q4,
+ next_seq_id = SeqId,
+ len = Len,
+ in_counter = InCount,
+ persistent_count = PCount,
+ durable = IsDurable,
+ ram_msg_count = RamMsgCount,
+ unconfirmed = UC }) ->
IsPersistent1 = IsDurable andalso IsPersistent,
- MsgStatus = msg_status(IsPersistent1, SeqId, Msg, MsgProps, Redelivered),
+ MsgStatus = msg_status(IsPersistent1, SeqId, Msg, MsgProps),
{MsgStatus1, State1} = maybe_write_to_disk(false, false, MsgStatus, State),
State2 = case ?QUEUE:is_empty(Q3) of
false -> State1 #vqstate { q1 = ?QUEUE:in(m(MsgStatus1), Q1) };
@@ -566,7 +566,7 @@ publish_delivered(true, Msg = #basic_message { is_persistent = IsPersistent,
durable = IsDurable,
unconfirmed = UC }) ->
IsPersistent1 = IsDurable andalso IsPersistent,
- MsgStatus = (msg_status(IsPersistent1, SeqId, Msg, MsgProps, false))
+ MsgStatus = (msg_status(IsPersistent1, SeqId, Msg, MsgProps))
#msg_status { is_delivered = true },
{MsgStatus1, State1} = maybe_write_to_disk(false, false, MsgStatus, State),
State2 = record_pending_ack(m(MsgStatus1), State1),
@@ -874,7 +874,8 @@ gb_sets_maybe_insert(false, _Val, Set) -> Set;
gb_sets_maybe_insert(true, Val, Set) -> gb_sets:add(Val, Set).
msg_status(IsPersistent, SeqId, Msg = #basic_message { id = MsgId },
- MsgProps, Redelivered) ->
+ MsgProps = #message_properties{redelivered = Redelivered}) ->
+ %% TODO would it make sense to remove #msg_status.is_delivered?
#msg_status { seq_id = SeqId, msg_id = MsgId, msg = Msg,
is_persistent = IsPersistent, is_delivered = Redelivered,
msg_on_disk = false, index_on_disk = false,