summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/rabbit.erl14
-rw-r--r--src/rabbit_amqqueue_process.erl52
-rw-r--r--src/rabbit_backing_queue.erl2
-rw-r--r--src/rabbit_exchange_type_invalid.erl3
-rw-r--r--src/rabbit_variable_queue.erl12
5 files changed, 44 insertions, 39 deletions
diff --git a/src/rabbit.erl b/src/rabbit.erl
index 55736b287e..c004c4899f 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -719,13 +719,13 @@ erts_version_check() ->
print_banner() ->
{ok, Product} = application:get_key(id),
{ok, Version} = application:get_key(vsn),
- io:format("~n## ## ~s ~s. ~s"
- "~n## ## ~s"
- "~n##########"
- "~n###### ## Logs: ~s"
- "~n########## ~s"
- "~n"
- "~n Starting broker...",
+ io:format("~n ~s ~s. ~s"
+ "~n ## ## ~s"
+ "~n ## ##"
+ "~n ########## Logs: ~s"
+ "~n ###### ## ~s"
+ "~n ##########"
+ "~n Starting broker...",
[Product, Version, ?COPYRIGHT_MESSAGE, ?INFORMATION_MESSAGE,
log_location(kernel), log_location(sasl)]).
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 35f1949321..35a28b6bac 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -497,10 +497,9 @@ deliver_msg_to_consumer(DeliverFun, NewLimiter,
{Stop, State1}.
deliver_from_queue_deliver(AckRequired, State) ->
- {Result, State1} = fetch(AckRequired, State),
- State2 = #q{backing_queue = BQ, backing_queue_state = BQS} =
- drop_expired_msgs(State1),
- {Result, BQ:is_empty(BQS), State2}.
+ {Result, State1 = #q{backing_queue = BQ, backing_queue_state = BQS}} =
+ fetch(AckRequired, State),
+ {Result, BQ:is_empty(BQS), State1}.
confirm_messages([], State) ->
State;
@@ -547,13 +546,11 @@ discard(#delivery{sender = SenderPid,
BQS1 = BQ:discard(MsgId, SenderPid, BQS),
State1#q{backing_queue_state = BQS1}.
-run_message_queue(State) ->
- State1 = #q{backing_queue = BQ, backing_queue_state = BQS} =
- drop_expired_msgs(State),
- {_IsEmpty1, State2} = deliver_msgs_to_consumers(
+run_message_queue(State = #q{backing_queue = BQ, backing_queue_state = BQS}) ->
+ {_IsEmpty1, State1} = deliver_msgs_to_consumers(
fun deliver_from_queue_deliver/2,
- BQ:is_empty(BQS), State1),
- State2.
+ BQ:is_empty(BQS), State),
+ State1.
attempt_delivery(Delivery = #delivery{sender = SenderPid, message = Message},
Props, Delivered, State = #q{backing_queue = BQ,
@@ -587,20 +584,32 @@ deliver_or_enqueue(Delivery = #delivery{message = Message, sender = SenderPid},
{false, State2 = #q{ttl = 0, dlx = undefined}} ->
discard(Delivery, State2);
{false, State2 = #q{backing_queue = BQ, backing_queue_state = BQS}} ->
+ IsEmpty = BQ:is_empty(BQS),
BQS1 = BQ:publish(Message, Props, Delivered, SenderPid, BQS),
- ensure_ttl_timer(Props#message_properties.expiry,
- State2#q{backing_queue_state = BQS1})
+ State3 = State2#q{backing_queue_state = BQS1},
+ %% optimisation: it would be perfectly safe to always
+ %% invoke drop_expired_msgs here, but that is expensive so
+ %% we only do that IFF the new message ends up at the head
+ %% of the queue (because the queue was empty) and has an
+ %% expiry. Only then may it need expiring straight away,
+ %% or, if expiry is not due yet, the expiry timer may need
+ %% (re)scheduling.
+ case {IsEmpty, Props#message_properties.expiry} of
+ {false, _} -> State3;
+ {true, undefined} -> State3;
+ {true, _} -> drop_expired_msgs(State3)
+ end
end.
requeue_and_run(AckTags, State = #q{backing_queue = BQ,
backing_queue_state = BQS}) ->
{_MsgIds, BQS1} = BQ:requeue(AckTags, BQS),
- run_message_queue(State#q{backing_queue_state = BQS1}).
+ run_message_queue(drop_expired_msgs(State#q{backing_queue_state = BQS1})).
fetch(AckRequired, State = #q{backing_queue = BQ,
backing_queue_state = BQS}) ->
{Result, BQS1} = BQ:fetch(AckRequired, BQS),
- {Result, State#q{backing_queue_state = BQS1}}.
+ {Result, drop_expired_msgs(State#q{backing_queue_state = BQS1})}.
ack(AckTags, ChPid, State) ->
subtract_acks(ChPid, AckTags, State,
@@ -746,7 +755,13 @@ calculate_msg_expiry(#basic_message{content = Content}, TTL) ->
drop_expired_msgs(State = #q{backing_queue_state = BQS,
backing_queue = BQ }) ->
- Now = now_micros(),
+ case BQ:is_empty(BQS) of
+ true -> State;
+ false -> drop_expired_msgs(now_micros(), State)
+ end.
+
+drop_expired_msgs(Now, State = #q{backing_queue_state = BQS,
+ backing_queue = BQ }) ->
ExpirePred = fun (#message_properties{expiry = Exp}) -> Now >= Exp end,
{Props, State1} =
with_dlx(
@@ -1091,7 +1106,7 @@ handle_call({basic_get, ChPid, NoAck}, _From,
State = #q{q = #amqqueue{name = QName}}) ->
AckRequired = not NoAck,
State1 = ensure_expiry_timer(State),
- case fetch(AckRequired, drop_expired_msgs(State1)) of
+ case fetch(AckRequired, State1) of
{empty, State2} ->
reply(empty, State2);
{{Message, IsDelivered, AckTag}, State2} ->
@@ -1174,7 +1189,7 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, From,
handle_call(stat, _From, State) ->
State1 = #q{backing_queue = BQ, backing_queue_state = BQS} =
- drop_expired_msgs(ensure_expiry_timer(State)),
+ ensure_expiry_timer(State),
reply({ok, BQ:len(BQS), consumer_count()}, State1);
handle_call({delete, IfUnused, IfEmpty}, From,
@@ -1253,8 +1268,7 @@ handle_cast(_, State = #q{delayed_stop = DS}) when DS =/= undefined ->
handle_cast({run_backing_queue, Mod, Fun},
State = #q{backing_queue = BQ, backing_queue_state = BQS}) ->
- noreply(run_message_queue(
- State#q{backing_queue_state = BQ:invoke(Mod, Fun, BQS)}));
+ noreply(State#q{backing_queue_state = BQ:invoke(Mod, Fun, BQS)});
handle_cast({deliver, Delivery = #delivery{sender = Sender}, Delivered, Flow},
State = #q{senders = Senders}) ->
diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl
index c2b52a7c2c..2f247448c7 100644
--- a/src/rabbit_backing_queue.erl
+++ b/src/rabbit_backing_queue.erl
@@ -18,8 +18,6 @@
-ifdef(use_specs).
--export_type([async_callback/0]).
-
%% We can't specify a per-queue ack/state with callback signatures
-type(ack() :: any()).
-type(state() :: any()).
diff --git a/src/rabbit_exchange_type_invalid.erl b/src/rabbit_exchange_type_invalid.erl
index 4a48a45885..6b07351a47 100644
--- a/src/rabbit_exchange_type_invalid.erl
+++ b/src/rabbit_exchange_type_invalid.erl
@@ -24,8 +24,7 @@
add_binding/3, remove_bindings/3, assert_args_equivalence/2]).
description() ->
- [{name, <<"invalid">>},
- {description,
+ [{description,
<<"Dummy exchange type, to be used when the intended one is not found.">>
}].
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 5d463f575b..f7c6c7295a 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -262,8 +262,6 @@
durable,
transient_threshold,
- async_callback,
-
len,
persistent_count,
@@ -356,8 +354,6 @@
durable :: boolean(),
transient_threshold :: non_neg_integer(),
- async_callback :: rabbit_backing_queue:async_callback(),
-
len :: non_neg_integer(),
persistent_count :: non_neg_integer(),
@@ -426,7 +422,7 @@ init(Queue, Recover, AsyncCallback) ->
init(#amqqueue { name = QueueName, durable = IsDurable }, false,
AsyncCallback, MsgOnDiskFun, MsgIdxOnDiskFun) ->
IndexState = rabbit_queue_index:init(QueueName, MsgIdxOnDiskFun),
- init(IsDurable, IndexState, 0, [], AsyncCallback,
+ init(IsDurable, IndexState, 0, [],
case IsDurable of
true -> msg_store_client_init(?PERSISTENT_MSG_STORE,
MsgOnDiskFun, AsyncCallback);
@@ -454,7 +450,7 @@ init(#amqqueue { name = QueueName, durable = true }, true,
rabbit_msg_store:contains(MsgId, PersistentClient)
end,
MsgIdxOnDiskFun),
- init(true, IndexState, DeltaCount, Terms1, AsyncCallback,
+ init(true, IndexState, DeltaCount, Terms1,
PersistentClient, TransientClient).
terminate(_Reason, State) ->
@@ -1004,7 +1000,7 @@ update_rate(Now, Then, Count, {OThen, OCount}) ->
%% Internal major helpers for Public API
%%----------------------------------------------------------------------------
-init(IsDurable, IndexState, DeltaCount, Terms, AsyncCallback,
+init(IsDurable, IndexState, DeltaCount, Terms,
PersistentClient, TransientClient) ->
{LowSeqId, NextSeqId, IndexState1} = rabbit_queue_index:bounds(IndexState),
@@ -1030,8 +1026,6 @@ init(IsDurable, IndexState, DeltaCount, Terms, AsyncCallback,
durable = IsDurable,
transient_threshold = NextSeqId,
- async_callback = AsyncCallback,
-
len = DeltaCount1,
persistent_count = DeltaCount1,