diff options
| author | Matthias Radestock <matthias@rabbitmq.com> | 2013-02-14 11:47:04 +0000 |
|---|---|---|
| committer | Matthias Radestock <matthias@rabbitmq.com> | 2013-02-14 11:47:04 +0000 |
| commit | 6752bb76125d50c0e69c2ea9367f017c0fca40b2 (patch) | |
| tree | a51d1eb4b80533da012ffb39f3941ba96e6c0c4b | |
| parent | 16c725b4bff4b00193eeb843bd5da328b6430f7e (diff) | |
| parent | 4790be2e427a1e5d0b429edb4166d4a71797d11c (diff) | |
| download | rabbitmq-server-git-6752bb76125d50c0e69c2ea9367f017c0fca40b2.tar.gz | |
merge default into bug23749
| -rw-r--r-- | src/rabbit.erl | 14 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 52 | ||||
| -rw-r--r-- | src/rabbit_backing_queue.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_exchange_type_invalid.erl | 3 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 12 |
5 files changed, 44 insertions, 39 deletions
diff --git a/src/rabbit.erl b/src/rabbit.erl index 55736b287e..c004c4899f 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -719,13 +719,13 @@ erts_version_check() -> print_banner() -> {ok, Product} = application:get_key(id), {ok, Version} = application:get_key(vsn), - io:format("~n## ## ~s ~s. ~s" - "~n## ## ~s" - "~n##########" - "~n###### ## Logs: ~s" - "~n########## ~s" - "~n" - "~n Starting broker...", + io:format("~n ~s ~s. ~s" + "~n ## ## ~s" + "~n ## ##" + "~n ########## Logs: ~s" + "~n ###### ## ~s" + "~n ##########" + "~n Starting broker...", [Product, Version, ?COPYRIGHT_MESSAGE, ?INFORMATION_MESSAGE, log_location(kernel), log_location(sasl)]). diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 35f1949321..35a28b6bac 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -497,10 +497,9 @@ deliver_msg_to_consumer(DeliverFun, NewLimiter, {Stop, State1}. deliver_from_queue_deliver(AckRequired, State) -> - {Result, State1} = fetch(AckRequired, State), - State2 = #q{backing_queue = BQ, backing_queue_state = BQS} = - drop_expired_msgs(State1), - {Result, BQ:is_empty(BQS), State2}. + {Result, State1 = #q{backing_queue = BQ, backing_queue_state = BQS}} = + fetch(AckRequired, State), + {Result, BQ:is_empty(BQS), State1}. confirm_messages([], State) -> State; @@ -547,13 +546,11 @@ discard(#delivery{sender = SenderPid, BQS1 = BQ:discard(MsgId, SenderPid, BQS), State1#q{backing_queue_state = BQS1}. -run_message_queue(State) -> - State1 = #q{backing_queue = BQ, backing_queue_state = BQS} = - drop_expired_msgs(State), - {_IsEmpty1, State2} = deliver_msgs_to_consumers( +run_message_queue(State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> + {_IsEmpty1, State1} = deliver_msgs_to_consumers( fun deliver_from_queue_deliver/2, - BQ:is_empty(BQS), State1), - State2. + BQ:is_empty(BQS), State), + State1. attempt_delivery(Delivery = #delivery{sender = SenderPid, message = Message}, Props, Delivered, State = #q{backing_queue = BQ, @@ -587,20 +584,32 @@ deliver_or_enqueue(Delivery = #delivery{message = Message, sender = SenderPid}, {false, State2 = #q{ttl = 0, dlx = undefined}} -> discard(Delivery, State2); {false, State2 = #q{backing_queue = BQ, backing_queue_state = BQS}} -> + IsEmpty = BQ:is_empty(BQS), BQS1 = BQ:publish(Message, Props, Delivered, SenderPid, BQS), - ensure_ttl_timer(Props#message_properties.expiry, - State2#q{backing_queue_state = BQS1}) + State3 = State2#q{backing_queue_state = BQS1}, + %% optimisation: it would be perfectly safe to always + %% invoke drop_expired_msgs here, but that is expensive so + %% we only do that IFF the new message ends up at the head + %% of the queue (because the queue was empty) and has an + %% expiry. Only then may it need expiring straight away, + %% or, if expiry is not due yet, the expiry timer may need + %% (re)scheduling. + case {IsEmpty, Props#message_properties.expiry} of + {false, _} -> State3; + {true, undefined} -> State3; + {true, _} -> drop_expired_msgs(State3) + end end. requeue_and_run(AckTags, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> {_MsgIds, BQS1} = BQ:requeue(AckTags, BQS), - run_message_queue(State#q{backing_queue_state = BQS1}). + run_message_queue(drop_expired_msgs(State#q{backing_queue_state = BQS1})). fetch(AckRequired, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> {Result, BQS1} = BQ:fetch(AckRequired, BQS), - {Result, State#q{backing_queue_state = BQS1}}. + {Result, drop_expired_msgs(State#q{backing_queue_state = BQS1})}. ack(AckTags, ChPid, State) -> subtract_acks(ChPid, AckTags, State, @@ -746,7 +755,13 @@ calculate_msg_expiry(#basic_message{content = Content}, TTL) -> drop_expired_msgs(State = #q{backing_queue_state = BQS, backing_queue = BQ }) -> - Now = now_micros(), + case BQ:is_empty(BQS) of + true -> State; + false -> drop_expired_msgs(now_micros(), State) + end. + +drop_expired_msgs(Now, State = #q{backing_queue_state = BQS, + backing_queue = BQ }) -> ExpirePred = fun (#message_properties{expiry = Exp}) -> Now >= Exp end, {Props, State1} = with_dlx( @@ -1091,7 +1106,7 @@ handle_call({basic_get, ChPid, NoAck}, _From, State = #q{q = #amqqueue{name = QName}}) -> AckRequired = not NoAck, State1 = ensure_expiry_timer(State), - case fetch(AckRequired, drop_expired_msgs(State1)) of + case fetch(AckRequired, State1) of {empty, State2} -> reply(empty, State2); {{Message, IsDelivered, AckTag}, State2} -> @@ -1174,7 +1189,7 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, From, handle_call(stat, _From, State) -> State1 = #q{backing_queue = BQ, backing_queue_state = BQS} = - drop_expired_msgs(ensure_expiry_timer(State)), + ensure_expiry_timer(State), reply({ok, BQ:len(BQS), consumer_count()}, State1); handle_call({delete, IfUnused, IfEmpty}, From, @@ -1253,8 +1268,7 @@ handle_cast(_, State = #q{delayed_stop = DS}) when DS =/= undefined -> handle_cast({run_backing_queue, Mod, Fun}, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> - noreply(run_message_queue( - State#q{backing_queue_state = BQ:invoke(Mod, Fun, BQS)})); + noreply(State#q{backing_queue_state = BQ:invoke(Mod, Fun, BQS)}); handle_cast({deliver, Delivery = #delivery{sender = Sender}, Delivered, Flow}, State = #q{senders = Senders}) -> diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl index c2b52a7c2c..2f247448c7 100644 --- a/src/rabbit_backing_queue.erl +++ b/src/rabbit_backing_queue.erl @@ -18,8 +18,6 @@ -ifdef(use_specs). --export_type([async_callback/0]). - %% We can't specify a per-queue ack/state with callback signatures -type(ack() :: any()). -type(state() :: any()). diff --git a/src/rabbit_exchange_type_invalid.erl b/src/rabbit_exchange_type_invalid.erl index 4a48a45885..6b07351a47 100644 --- a/src/rabbit_exchange_type_invalid.erl +++ b/src/rabbit_exchange_type_invalid.erl @@ -24,8 +24,7 @@ add_binding/3, remove_bindings/3, assert_args_equivalence/2]). description() -> - [{name, <<"invalid">>}, - {description, + [{description, <<"Dummy exchange type, to be used when the intended one is not found.">> }]. diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 5d463f575b..f7c6c7295a 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -262,8 +262,6 @@ durable, transient_threshold, - async_callback, - len, persistent_count, @@ -356,8 +354,6 @@ durable :: boolean(), transient_threshold :: non_neg_integer(), - async_callback :: rabbit_backing_queue:async_callback(), - len :: non_neg_integer(), persistent_count :: non_neg_integer(), @@ -426,7 +422,7 @@ init(Queue, Recover, AsyncCallback) -> init(#amqqueue { name = QueueName, durable = IsDurable }, false, AsyncCallback, MsgOnDiskFun, MsgIdxOnDiskFun) -> IndexState = rabbit_queue_index:init(QueueName, MsgIdxOnDiskFun), - init(IsDurable, IndexState, 0, [], AsyncCallback, + init(IsDurable, IndexState, 0, [], case IsDurable of true -> msg_store_client_init(?PERSISTENT_MSG_STORE, MsgOnDiskFun, AsyncCallback); @@ -454,7 +450,7 @@ init(#amqqueue { name = QueueName, durable = true }, true, rabbit_msg_store:contains(MsgId, PersistentClient) end, MsgIdxOnDiskFun), - init(true, IndexState, DeltaCount, Terms1, AsyncCallback, + init(true, IndexState, DeltaCount, Terms1, PersistentClient, TransientClient). terminate(_Reason, State) -> @@ -1004,7 +1000,7 @@ update_rate(Now, Then, Count, {OThen, OCount}) -> %% Internal major helpers for Public API %%---------------------------------------------------------------------------- -init(IsDurable, IndexState, DeltaCount, Terms, AsyncCallback, +init(IsDurable, IndexState, DeltaCount, Terms, PersistentClient, TransientClient) -> {LowSeqId, NextSeqId, IndexState1} = rabbit_queue_index:bounds(IndexState), @@ -1030,8 +1026,6 @@ init(IsDurable, IndexState, DeltaCount, Terms, AsyncCallback, durable = IsDurable, transient_threshold = NextSeqId, - async_callback = AsyncCallback, - len = DeltaCount1, persistent_count = DeltaCount1, |
