summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2013-02-14 11:45:42 +0000
committerMatthias Radestock <matthias@rabbitmq.com>2013-02-14 11:45:42 +0000
commit4790be2e427a1e5d0b429edb4166d4a71797d11c (patch)
tree74dfebc2ec7efaea7ef4a73a851a0ffac45de599
parent94bc4059a2a34dc1b9136dece10ba4db9bf53b4b (diff)
parent5209bbcfd8757073220ee75cf5e1c676f8aba163 (diff)
downloadrabbitmq-server-git-4790be2e427a1e5d0b429edb4166d4a71797d11c.tar.gz
merge heads
-rw-r--r--src/rabbit_amqqueue_process.erl35
1 files changed, 22 insertions, 13 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 00a3a85acf..4a0ccf814a 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -519,13 +519,11 @@ discard(#delivery{sender = SenderPid,
BQS1 = BQ:discard(MsgId, SenderPid, BQS),
State1#q{backing_queue_state = BQS1}.
-run_message_queue(State) ->
- State1 = #q{backing_queue = BQ, backing_queue_state = BQS} =
- drop_expired_msgs(State),
- {_IsEmpty1, State2} = deliver_msgs_to_consumers(
+run_message_queue(State = #q{backing_queue = BQ, backing_queue_state = BQS}) ->
+ {_IsEmpty1, State1} = deliver_msgs_to_consumers(
fun deliver_from_queue_deliver/2,
- BQ:is_empty(BQS), State1),
- State2.
+ BQ:is_empty(BQS), State),
+ State1.
attempt_delivery(Delivery = #delivery{sender = SenderPid, message = Message},
Props, Delivered, State = #q{backing_queue = BQ,
@@ -559,15 +557,27 @@ deliver_or_enqueue(Delivery = #delivery{message = Message, sender = SenderPid},
{false, State2 = #q{ttl = 0, dlx = undefined}} ->
discard(Delivery, State2);
{false, State2 = #q{backing_queue = BQ, backing_queue_state = BQS}} ->
+ IsEmpty = BQ:is_empty(BQS),
BQS1 = BQ:publish(Message, Props, Delivered, SenderPid, BQS),
- ensure_ttl_timer(Props#message_properties.expiry,
- State2#q{backing_queue_state = BQS1})
+ State3 = State2#q{backing_queue_state = BQS1},
+ %% optimisation: it would be perfectly safe to always
+ %% invoke drop_expired_msgs here, but that is expensive so
+ %% we only do that IFF the new message ends up at the head
+ %% of the queue (because the queue was empty) and has an
+ %% expiry. Only then may it need expiring straight away,
+ %% or, if expiry is not due yet, the expiry timer may need
+ %% (re)scheduling.
+ case {IsEmpty, Props#message_properties.expiry} of
+ {false, _} -> State3;
+ {true, undefined} -> State3;
+ {true, _} -> drop_expired_msgs(State3)
+ end
end.
requeue_and_run(AckTags, State = #q{backing_queue = BQ,
backing_queue_state = BQS}) ->
{_MsgIds, BQS1} = BQ:requeue(AckTags, BQS),
- run_message_queue(State#q{backing_queue_state = BQS1}).
+ run_message_queue(drop_expired_msgs(State#q{backing_queue_state = BQS1})).
fetch(AckRequired, State = #q{backing_queue = BQ,
backing_queue_state = BQS}) ->
@@ -1064,7 +1074,7 @@ handle_call({basic_get, ChPid, NoAck}, _From,
State = #q{q = #amqqueue{name = QName}}) ->
AckRequired = not NoAck,
State1 = ensure_expiry_timer(State),
- case fetch(AckRequired, drop_expired_msgs(State1)) of
+ case fetch(AckRequired, State1) of
{empty, State2} ->
reply(empty, State2);
{{Message, IsDelivered, AckTag}, State2} ->
@@ -1137,7 +1147,7 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, From,
handle_call(stat, _From, State) ->
State1 = #q{backing_queue = BQ, backing_queue_state = BQS} =
- drop_expired_msgs(ensure_expiry_timer(State)),
+ ensure_expiry_timer(State),
reply({ok, BQ:len(BQS), consumer_count()}, State1);
handle_call({delete, IfUnused, IfEmpty}, From,
@@ -1216,8 +1226,7 @@ handle_cast(_, State = #q{delayed_stop = DS}) when DS =/= undefined ->
handle_cast({run_backing_queue, Mod, Fun},
State = #q{backing_queue = BQ, backing_queue_state = BQS}) ->
- noreply(run_message_queue(
- State#q{backing_queue_state = BQ:invoke(Mod, Fun, BQS)}));
+ noreply(State#q{backing_queue_state = BQ:invoke(Mod, Fun, BQS)});
handle_cast({deliver, Delivery = #delivery{sender = Sender}, Delivered, Flow},
State = #q{senders = Senders}) ->