diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_amqqueue.erl | 14 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 74 | ||||
| -rw-r--r-- | src/rabbit_backing_queue.erl | 8 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 70 |
4 files changed, 119 insertions, 47 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 0cdb4fff08..d960444faf 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -57,6 +57,7 @@ -include_lib("stdlib/include/qlc.hrl"). -define(EXPIRES_TYPE, long). +-define(TTL_TYPE, long). %%---------------------------------------------------------------------------- @@ -308,7 +309,8 @@ check_declare_arguments(QueueName, Args) -> precondition_failed, "invalid arg '~s' for ~s: ~w", [Key, rabbit_misc:rs(QueueName), Error]) - end || {Key, Fun} <- [{<<"x-expires">>, fun check_expires_argument/1}]], + end || {Key, Fun} <- [{<<"x-expires">>, fun check_expires_argument/1}, + {<<"x-message-ttl">>, fun check_message_ttl_argument/1}]], ok. check_expires_argument(undefined) -> @@ -321,6 +323,16 @@ check_expires_argument({?EXPIRES_TYPE, _Expires}) -> check_expires_argument(_) -> {error, expires_not_of_type_long}. +check_message_ttl_argument(undefined) -> + ok; +check_message_ttl_argument({?TTL_TYPE, TTL}) + when is_integer(TTL) andalso TTL > 0 -> + ok; +check_message_ttl_argument({?TTL_TYPE, _TTL}) -> + {error, ttl_zero_or_less}; +check_message_ttl_argument(_) -> + {error, ttl_not_of_type_long}. + list(VHostPath) -> mnesia:dirty_match_object( rabbit_queue, diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 2cab7136a6..b6f898cece 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -39,6 +39,8 @@ -define(SYNC_INTERVAL, 5). %% milliseconds -define(RAM_DURATION_UPDATE_INTERVAL, 5000). +-define(BASE_MSG_PROPERTIES, #msg_properties{expiry = undefined}). + -export([start_link/1, info_keys/0]). -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, @@ -60,7 +62,8 @@ sync_timer_ref, rate_timer_ref, expiry_timer_ref, - stats_timer + stats_timer, + ttl }). -record(consumer, {tag, ack_required}). @@ -122,6 +125,7 @@ init(Q) -> sync_timer_ref = undefined, rate_timer_ref = undefined, expiry_timer_ref = undefined, + ttl = undefined, stats_timer = rabbit_event:init_stats_timer()}, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. @@ -144,12 +148,21 @@ code_change(_OldVsn, State, _Extra) -> %%---------------------------------------------------------------------------- +init_queue_state(State) -> + init_expires(init_ttl(State)). + init_expires(State = #q{q = #amqqueue{arguments = Arguments}}) -> case rabbit_misc:table_lookup(Arguments, <<"x-expires">>) of {long, Expires} -> ensure_expiry_timer(State#q{expires = Expires}); undefined -> State end. +init_ttl(State = #q{q = #amqqueue{arguments = Arguments}}) -> + case rabbit_misc:table_lookup(Arguments, <<"x-message-ttl">>) of + {long, Ttl} -> State#q{ttl=Ttl}; + undefined -> State + end. + declare(Recover, From, State = #q{q = Q = #amqqueue{name = QName, durable = IsDurable}, backing_queue = BQ, backing_queue_state = undefined}) -> @@ -167,7 +180,7 @@ declare(Recover, From, queue_created, [{Item, i(Item, State)} || Item <- ?CREATION_EVENT_KEYS]), - noreply(init_expires(State#q{backing_queue_state = BQS})); + noreply(init_queue_state(State#q{backing_queue_state = BQS})); Q1 -> {stop, normal, {existing, Q1}, State} end. @@ -393,7 +406,7 @@ deliver_from_queue_pred(IsEmpty, _State) -> deliver_from_queue_deliver(AckRequired, false, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> - {{Message, IsDelivered, AckTag, Remaining}, BQS1} = + {{Message, _MsgProperties, IsDelivered, AckTag, Remaining}, BQS1} = BQ:fetch(AckRequired, BQS), {{Message, IsDelivered, AckTag}, 0 == Remaining, State #q { backing_queue_state = BQS1 }}. @@ -418,7 +431,8 @@ attempt_delivery(none, _ChPid, Message, State = #q{backing_queue = BQ}) -> attempt_delivery(Txn, ChPid, Message, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> record_current_channel_tx(ChPid, Txn), - {true, State#q{backing_queue_state = BQ:tx_publish(Txn, Message, BQS)}}. + MsgProperties = new_msg_properties(State), + {true, State#q{backing_queue_state = BQ:tx_publish(Txn, Message, MsgProperties, BQS)}}. deliver_or_enqueue(Txn, ChPid, Message, State = #q{backing_queue = BQ}) -> case attempt_delivery(Txn, ChPid, Message, State) of @@ -426,13 +440,28 @@ deliver_or_enqueue(Txn, ChPid, Message, State = #q{backing_queue = BQ}) -> {true, NewState}; {false, NewState} -> %% Txn is none and no unblocked channels with consumers - BQS = BQ:publish(Message, State #q.backing_queue_state), + MsgProperties = new_msg_properties(State), + BQS = BQ:publish(Message, MsgProperties, State #q.backing_queue_state), {false, NewState#q{backing_queue_state = BQS}} end. requeue_and_run(AckTags, State = #q{backing_queue = BQ}) -> + MsgPropsFun = reset_msg_expiry_fun(State), maybe_run_queue_via_backing_queue( - fun (BQS) -> BQ:requeue(AckTags, BQS) end, State). + fun (BQS) -> BQ:requeue(AckTags, MsgPropsFun, BQS) end, State). + +fetch(AckRequired, State = #q{backing_queue_state = BQS, + backing_queue = BQ}) -> + case BQ:fetch(AckRequired, BQS) of + {empty, BQS1} -> {empty, State#q{backing_queue_state = BQS1}}; + {{Message, MsgProperties, IsDelivered, AckTag, Remaining}, BQS1} -> + case msg_expired(MsgProperties) of + true -> + fetch(AckRequired, State#q{backing_queue_state = BQS1}); + false -> + {{Message, IsDelivered, AckTag, Remaining}, State#q{backing_queue_state = BQS1}} + end + end. add_consumer(ChPid, Consumer, Queue) -> queue:in({ChPid, Consumer}, Queue). @@ -530,7 +559,7 @@ maybe_run_queue_via_backing_queue(Fun, State = #q{backing_queue_state = BQS}) -> commit_transaction(Txn, From, ChPid, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> {AckTags, BQS1} = - BQ:tx_commit(Txn, fun () -> gen_server2:reply(From, ok) end, BQS), + BQ:tx_commit(Txn, fun () -> gen_server2:reply(From, ok) end, reset_msg_expiry_fun(State), BQS), %% ChPid must be known here because of the participant management %% by the channel. C = #cr{acktags = ChAckTags} = lookup_ch(ChPid), @@ -549,6 +578,26 @@ rollback_transaction(Txn, ChPid, State = #q{backing_queue = BQ, subtract_acks(A, B) when is_list(B) -> lists:foldl(fun sets:del_element/2, A, B). +msg_expired(_MsgProperties = #msg_properties{expiry = undefined}) -> + false; +msg_expired(_MsgProperties = #msg_properties{expiry=Expiry}) -> + Now = timer:now_diff(now(), {0,0,0}), + Now > Expiry. + +reset_msg_expiry_fun(State) -> + fun(MsgProps) -> + MsgProps#msg_properties{expiry=calculate_msg_expiry(State)} + end. + +new_msg_properties(State) -> + #msg_properties{expiry = calculate_msg_expiry(State)}. + +calculate_msg_expiry(_State = #q{ttl = undefined}) -> + undefined; +calculate_msg_expiry(_State = #q{ttl = Ttl}) -> + Now = timer:now_diff(now(), {0,0,0}), + Now + (Ttl * 1000). + infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items]. i(name, #q{q = #amqqueue{name = Name}}) -> Name; @@ -670,13 +719,12 @@ handle_call({notify_down, ChPid}, _From, State) -> end; handle_call({basic_get, ChPid, NoAck}, _From, - State = #q{q = #amqqueue{name = QName}, - backing_queue_state = BQS, backing_queue = BQ}) -> + State = #q{q = #amqqueue{name = QName}}) -> AckRequired = not NoAck, State1 = ensure_expiry_timer(State), - case BQ:fetch(AckRequired, BQS) of - {empty, BQS1} -> reply(empty, State1#q{backing_queue_state = BQS1}); - {{Message, IsDelivered, AckTag, Remaining}, BQS1} -> + case fetch(AckRequired, State1) of + {empty, State2} -> reply(empty, State2); + {{Message, IsDelivered, AckTag, Remaining}, State2} -> case AckRequired of true -> C = #cr{acktags = ChAckTags} = ch_record(ChPid), store_ch_record( @@ -684,7 +732,7 @@ handle_call({basic_get, ChPid, NoAck}, _From, false -> ok end, Msg = {QName, self(), AckTag, IsDelivered, Message}, - reply({ok, Remaining, Msg}, State1#q{backing_queue_state = BQS1}) + reply({ok, Remaining, Msg}, State2) end; handle_call({basic_consume, NoAck, ChPid, LimiterPid, diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl index 2230c507e9..cc7f857188 100644 --- a/src/rabbit_backing_queue.erl +++ b/src/rabbit_backing_queue.erl @@ -62,7 +62,7 @@ behaviour_info(callbacks) -> {purge, 1}, %% Publish a message. - {publish, 2}, + {publish, 3}, %% Called for messages which have already been passed straight %% out to a client. The queue will be empty for these calls @@ -77,7 +77,7 @@ behaviour_info(callbacks) -> {ack, 2}, %% A publish, but in the context of a transaction. - {tx_publish, 3}, + {tx_publish, 4}, %% Acks, but in the context of a transaction. {tx_ack, 3}, @@ -89,11 +89,11 @@ behaviour_info(callbacks) -> %% Commit a transaction. The Fun passed in must be called once %% the messages have really been commited. This CPS permits the %% possibility of commit coalescing. - {tx_commit, 3}, + {tx_commit, 4}, %% Reinsert messages into the queue which have already been %% delivered and were pending acknowledgement. - {requeue, 2}, + {requeue, 3}, %% How long is my queue? {len, 1}, diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 0f52eee84f..4e978fd5ad 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -32,9 +32,9 @@ -module(rabbit_variable_queue). -export([init/3, terminate/1, delete_and_terminate/1, - purge/1, publish/2, publish_delivered/3, fetch/2, ack/2, - tx_publish/3, tx_ack/3, tx_rollback/2, tx_commit/3, - requeue/2, len/1, is_empty/1, + purge/1, publish/3, publish_delivered/3, fetch/2, ack/2, + tx_publish/4, tx_ack/3, tx_rollback/2, tx_commit/4, + requeue/3, len/1, is_empty/1, set_ram_duration_target/2, ram_duration/1, needs_idle_timeout/1, idle_timeout/1, handle_pre_hibernate/1, status/1]). @@ -248,7 +248,8 @@ is_persistent, is_delivered, msg_on_disk, - index_on_disk + index_on_disk, + msg_properties }). -record(delta, @@ -490,8 +491,8 @@ purge(State = #vqstate { q4 = Q4, index_state = IndexState, len = Len }) -> ram_index_count = 0, persistent_count = 0 })}. -publish(Msg, State) -> - {_SeqId, State1} = publish(Msg, false, false, State), +publish(Msg, MsgProperties, State) -> + {_SeqId, State1} = publish(Msg, MsgProperties, false, false, State), a(reduce_memory_use(State1)). publish_delivered(false, _Msg, State = #vqstate { len = 0 }) -> @@ -505,7 +506,7 @@ publish_delivered(true, Msg = #basic_message { is_persistent = IsPersistent }, pending_ack = PA, durable = IsDurable }) -> IsPersistent1 = IsDurable andalso IsPersistent, - MsgStatus = (msg_status(IsPersistent1, SeqId, Msg)) + MsgStatus = (msg_status(IsPersistent1, SeqId, Msg, #msg_properties{})) #msg_status { is_delivered = true }, {MsgStatus1, State1} = maybe_write_to_disk(false, false, MsgStatus, State), PA1 = record_pending_ack(m(MsgStatus1), PA), @@ -532,7 +533,8 @@ fetch(AckRequired, State = #vqstate { q4 = Q4, {{value, MsgStatus = #msg_status { msg = Msg, guid = Guid, seq_id = SeqId, is_persistent = IsPersistent, is_delivered = IsDelivered, - msg_on_disk = MsgOnDisk, index_on_disk = IndexOnDisk }}, + msg_on_disk = MsgOnDisk, index_on_disk = IndexOnDisk, + msg_properties = MsgProperties }}, Q4a} -> %% 1. Mark it delivered if necessary @@ -563,7 +565,7 @@ fetch(AckRequired, State = #vqstate { q4 = Q4, PCount1 = PCount - one_if(IsPersistent andalso not AckRequired), Len1 = Len - 1, - {{Msg, IsDelivered, AckTag, Len1}, + {{Msg, MsgProperties, IsDelivered, AckTag, Len1}, a(State #vqstate { q4 = Q4a, ram_msg_count = RamMsgCount - 1, out_counter = OutCount + 1, @@ -579,12 +581,13 @@ ack(AckTags, State) -> AckTags, State)). tx_publish(Txn, Msg = #basic_message { is_persistent = IsPersistent }, + MsgProperties, State = #vqstate { durable = IsDurable, msg_store_clients = MSCState }) -> Tx = #tx { pending_messages = Pubs } = lookup_tx(Txn), - store_tx(Txn, Tx #tx { pending_messages = [Msg | Pubs] }), + store_tx(Txn, Tx #tx { pending_messages = [{Msg, MsgProperties} | Pubs] }), a(case IsPersistent andalso IsDurable of - true -> MsgStatus = msg_status(true, undefined, Msg), + true -> MsgStatus = msg_status(true, undefined, Msg, MsgProperties), {#msg_status { msg_on_disk = true }, MSCState1} = maybe_write_msg_to_disk(false, MsgStatus, MSCState), State #vqstate { msg_store_clients = MSCState1 }; @@ -606,10 +609,14 @@ tx_rollback(Txn, State = #vqstate { durable = IsDurable }) -> end, {lists:append(AckTags), a(State)}. -tx_commit(Txn, Fun, State = #vqstate { durable = IsDurable }) -> +tx_commit(Txn, Fun, MsgPropsFun, State = #vqstate { durable = IsDurable }) -> #tx { pending_acks = AckTags, pending_messages = Pubs } = lookup_tx(Txn), erase_tx(Txn), - PubsOrdered = lists:reverse(Pubs), + F = fun({Msg, MsgProperties}) -> + {Msg, MsgPropsFun(MsgProperties)} + end, + PubsProcessed = lists:map(F, Pubs), + PubsOrdered = lists:reverse(PubsProcessed), AckTags1 = lists:append(AckTags), PersistentGuids = persistent_guids(PubsOrdered), HasPersistentPubs = PersistentGuids =/= [], @@ -624,18 +631,21 @@ tx_commit(Txn, Fun, State = #vqstate { durable = IsDurable }) -> HasPersistentPubs, PubsOrdered, AckTags1, Fun, State) end)}. -requeue(AckTags, State) -> +requeue(AckTags, MsgPropsFun, State) -> a(reduce_memory_use( ack(fun rabbit_msg_store:release/2, - fun (#msg_status { msg = Msg }, State1) -> - {_SeqId, State2} = publish(Msg, true, false, State1), + fun (#msg_status { msg = Msg, + msg_properties = MsgProperties }, State1) -> + {_SeqId, State2} = + publish(Msg, MsgPropsFun(MsgProperties), true, false, State1), State2; - ({IsPersistent, Guid}, State1) -> + ({IsPersistent, Guid, MsgProperties}, State1) -> #vqstate { msg_store_clients = MSCState } = State1, {{ok, Msg = #basic_message{}}, MSCState1} = read_from_msg_store(MSCState, IsPersistent, Guid), State2 = State1 #vqstate { msg_store_clients = MSCState1 }, - {_SeqId, State3} = publish(Msg, true, true, State2), + {_SeqId, State3} = publish(Msg, MsgPropsFun(MsgProperties), + true, true, State2), State3 end, AckTags, State))). @@ -783,10 +793,11 @@ one_if(false) -> 0. cons_if(true, E, L) -> [E | L]; cons_if(false, _E, L) -> L. -msg_status(IsPersistent, SeqId, Msg = #basic_message { guid = Guid }) -> +msg_status(IsPersistent, SeqId, Msg = #basic_message { guid = Guid }, MsgProperties) -> #msg_status { seq_id = SeqId, guid = Guid, msg = Msg, is_persistent = IsPersistent, is_delivered = false, - msg_on_disk = false, index_on_disk = false }. + msg_on_disk = false, index_on_disk = false, + msg_properties = MsgProperties }. find_msg_store(true) -> ?PERSISTENT_MSG_STORE; find_msg_store(false) -> ?TRANSIENT_MSG_STORE. @@ -914,7 +925,7 @@ tx_commit_post_msg_store(HasPersistentPubs, Pubs, AckTags, Fun, true -> [AckTag || AckTag <- AckTags, case dict:fetch(AckTag, PA) of #msg_status {} -> false; - {IsPersistent, _Guid} -> IsPersistent + {IsPersistent, _Guid, _MsgProperties} -> IsPersistent end]; false -> [] end, @@ -946,10 +957,10 @@ tx_commit_index(State = #vqstate { on_sync = #sync { Pubs = lists:append(lists:reverse(SPubs)), {SeqIds, State1 = #vqstate { index_state = IndexState }} = lists:foldl( - fun (Msg = #basic_message { is_persistent = IsPersistent }, + fun ({Msg = #basic_message { is_persistent = IsPersistent }, MsgProperties}, {SeqIdsAcc, State2}) -> IsPersistent1 = IsDurable andalso IsPersistent, - {SeqId, State3} = publish(Msg, false, IsPersistent1, State2), + {SeqId, State3} = publish(Msg, MsgProperties, false, IsPersistent1, State2), {cons_if(IsPersistent1, SeqId, SeqIdsAcc), State3} end, {PAcks, ack(Acks, State)}, Pubs), IndexState1 = rabbit_queue_index:sync(SeqIds, IndexState), @@ -996,7 +1007,7 @@ remove_queue_entries1( %%---------------------------------------------------------------------------- publish(Msg = #basic_message { is_persistent = IsPersistent }, - IsDelivered, MsgOnDisk, + MsgProperties, IsDelivered, MsgOnDisk, State = #vqstate { q1 = Q1, q3 = Q3, q4 = Q4, next_seq_id = SeqId, len = Len, @@ -1005,8 +1016,9 @@ publish(Msg = #basic_message { is_persistent = IsPersistent }, durable = IsDurable, ram_msg_count = RamMsgCount }) -> IsPersistent1 = IsDurable andalso IsPersistent, - MsgStatus = (msg_status(IsPersistent1, SeqId, Msg)) - #msg_status { is_delivered = IsDelivered, msg_on_disk = MsgOnDisk }, + MsgStatus = (msg_status(IsPersistent1, SeqId, Msg, MsgProperties)) + #msg_status { is_delivered = IsDelivered, + msg_on_disk = MsgOnDisk}, {MsgStatus1, State1} = maybe_write_to_disk(false, false, MsgStatus, State), State2 = case bpqueue:is_empty(Q3) of false -> State1 #vqstate { q1 = queue:in(m(MsgStatus1), Q1) }; @@ -1073,9 +1085,9 @@ maybe_write_to_disk(ForceMsg, ForceIndex, MsgStatus, record_pending_ack(#msg_status { guid = Guid, seq_id = SeqId, is_persistent = IsPersistent, - msg_on_disk = MsgOnDisk } = MsgStatus, PA) -> + msg_on_disk = MsgOnDisk, msg_properties = MsgProperties } = MsgStatus, PA) -> AckEntry = case MsgOnDisk of - true -> {IsPersistent, Guid}; + true -> {IsPersistent, Guid, MsgProperties}; false -> MsgStatus end, dict:store(SeqId, AckEntry, PA). @@ -1128,7 +1140,7 @@ accumulate_ack(_SeqId, #msg_status { is_persistent = false, %% ASSERTIONS msg_on_disk = false, index_on_disk = false }, Acc) -> Acc; -accumulate_ack(SeqId, {IsPersistent, Guid}, {SeqIdsAcc, Dict}) -> +accumulate_ack(SeqId, {IsPersistent, Guid, _MsgProperties}, {SeqIdsAcc, Dict}) -> {cons_if(IsPersistent, SeqId, SeqIdsAcc), rabbit_misc:orddict_cons(find_msg_store(IsPersistent), Guid, Dict)}. |
