diff options
| -rw-r--r-- | include/rabbit.hrl | 2 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 42 | ||||
| -rw-r--r-- | src/rabbit_backing_queue.erl | 8 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 60 |
4 files changed, 75 insertions, 37 deletions
diff --git a/include/rabbit.hrl b/include/rabbit.hrl index baa2d72110..be0d4b8233 100644 --- a/include/rabbit.hrl +++ b/include/rabbit.hrl @@ -74,7 +74,7 @@ -record(event, {type, props, timestamp}). --record(msg_properties, {ttl}). +-record(msg_properties, {expiry}). %%---------------------------------------------------------------------------- diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index d52660c5ac..2cee51f70d 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -39,6 +39,8 @@ -define(SYNC_INTERVAL, 5). %% milliseconds -define(RAM_DURATION_UPDATE_INTERVAL, 5000). +-define(BASE_MSG_PROPERTIES, #msg_properties{expiry = undefined}). + -export([start_link/1, info_keys/0]). -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, @@ -60,7 +62,8 @@ sync_timer_ref, rate_timer_ref, expiry_timer_ref, - stats_timer + stats_timer, + ttl }). -record(consumer, {tag, ack_required}). @@ -122,6 +125,7 @@ init(Q) -> sync_timer_ref = undefined, rate_timer_ref = undefined, expiry_timer_ref = undefined, + ttl = undefined, stats_timer = rabbit_event:init_stats_timer()}, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. @@ -144,12 +148,21 @@ code_change(_OldVsn, State, _Extra) -> %%---------------------------------------------------------------------------- +init_queue_state(State) -> + init_expires(init_ttl(State)). + init_expires(State = #q{q = #amqqueue{arguments = Arguments}}) -> case rabbit_misc:table_lookup(Arguments, <<"x-expires">>) of {long, Expires} -> ensure_expiry_timer(State#q{expires = Expires}); undefined -> State end. +init_ttl(State = #q{q = #amqqueue{arguments = Arguments}}) -> + case rabbit_misc:table_lookup(Arguments, <<"x-message-ttl">>) of + {long, Ttl} -> State#q{ttl=Ttl}; + undefined -> State + end. + declare(Recover, From, State = #q{q = Q = #amqqueue{name = QName, durable = IsDurable}, backing_queue = BQ, backing_queue_state = undefined}) -> @@ -167,7 +180,7 @@ declare(Recover, From, queue_created, [{Item, i(Item, State)} || Item <- ?CREATION_EVENT_KEYS]), - noreply(init_expires(State#q{backing_queue_state = BQS})); + noreply(init_queue_state(State#q{backing_queue_state = BQS})); Q1 -> {stop, normal, {existing, Q1}, State} end. @@ -418,7 +431,8 @@ attempt_delivery(none, _ChPid, Message, State = #q{backing_queue = BQ}) -> attempt_delivery(Txn, ChPid, Message, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> record_current_channel_tx(ChPid, Txn), - {true, State#q{backing_queue_state = BQ:tx_publish(Txn, Message, BQS)}}. + MsgProperties = new_msg_properties(State), + {true, State#q{backing_queue_state = BQ:tx_publish(Txn, Message, MsgProperties, BQS)}}. deliver_or_enqueue(Txn, ChPid, Message, State = #q{backing_queue = BQ}) -> case attempt_delivery(Txn, ChPid, Message, State) of @@ -426,13 +440,15 @@ deliver_or_enqueue(Txn, ChPid, Message, State = #q{backing_queue = BQ}) -> {true, NewState}; {false, NewState} -> %% Txn is none and no unblocked channels with consumers - BQS = BQ:publish(Message, State #q.backing_queue_state), + MsgProperties = new_msg_properties(State), + BQS = BQ:publish(Message, MsgProperties, State #q.backing_queue_state), {false, NewState#q{backing_queue_state = BQS}} end. requeue_and_run(AckTags, State = #q{backing_queue = BQ}) -> + MsgPropsFun = reset_msg_expiry_fun(State), maybe_run_queue_via_backing_queue( - fun (BQS) -> BQ:requeue(AckTags, BQS) end, State). + fun (BQS) -> BQ:requeue(AckTags, MsgPropsFun, BQS) end, State). add_consumer(ChPid, Consumer, Queue) -> queue:in({ChPid, Consumer}, Queue). @@ -530,7 +546,7 @@ maybe_run_queue_via_backing_queue(Fun, State = #q{backing_queue_state = BQS}) -> commit_transaction(Txn, From, ChPid, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> {AckTags, BQS1} = - BQ:tx_commit(Txn, fun () -> gen_server2:reply(From, ok) end, BQS), + BQ:tx_commit(Txn, fun () -> gen_server2:reply(From, ok) end, reset_msg_expiry_fun(State), BQS), %% ChPid must be known here because of the participant management %% by the channel. C = #cr{acktags = ChAckTags} = lookup_ch(ChPid), @@ -549,6 +565,20 @@ rollback_transaction(Txn, ChPid, State = #q{backing_queue = BQ, subtract_acks(A, B) when is_list(B) -> lists:foldl(fun sets:del_element/2, A, B). +reset_msg_expiry_fun(State) -> + fun(MsgProps) -> + MsgProps#msg_properties{expiry=calculate_msg_expiry(State)} + end. + +new_msg_properties(State) -> + #msg_properties{expiry = calculate_msg_expiry(State)}. + +calculate_msg_expiry(_State = #q{ttl = undefined}) -> + undefined; +calculate_msg_expiry(_State = #q{ttl = Ttl}) -> + Now = timer:now_diff(now(), {0,0,0}), + Now - (Ttl * 1000). + infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items]. i(name, #q{q = #amqqueue{name = Name}}) -> Name; diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl index 2230c507e9..cc7f857188 100644 --- a/src/rabbit_backing_queue.erl +++ b/src/rabbit_backing_queue.erl @@ -62,7 +62,7 @@ behaviour_info(callbacks) -> {purge, 1}, %% Publish a message. - {publish, 2}, + {publish, 3}, %% Called for messages which have already been passed straight %% out to a client. The queue will be empty for these calls @@ -77,7 +77,7 @@ behaviour_info(callbacks) -> {ack, 2}, %% A publish, but in the context of a transaction. - {tx_publish, 3}, + {tx_publish, 4}, %% Acks, but in the context of a transaction. {tx_ack, 3}, @@ -89,11 +89,11 @@ behaviour_info(callbacks) -> %% Commit a transaction. The Fun passed in must be called once %% the messages have really been commited. This CPS permits the %% possibility of commit coalescing. - {tx_commit, 3}, + {tx_commit, 4}, %% Reinsert messages into the queue which have already been %% delivered and were pending acknowledgement. - {requeue, 2}, + {requeue, 3}, %% How long is my queue? {len, 1}, diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 0f52eee84f..e0bc75b7e6 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -32,9 +32,9 @@ -module(rabbit_variable_queue). -export([init/3, terminate/1, delete_and_terminate/1, - purge/1, publish/2, publish_delivered/3, fetch/2, ack/2, - tx_publish/3, tx_ack/3, tx_rollback/2, tx_commit/3, - requeue/2, len/1, is_empty/1, + purge/1, publish/3, publish_delivered/3, fetch/2, ack/2, + tx_publish/4, tx_ack/3, tx_rollback/2, tx_commit/4, + requeue/3, len/1, is_empty/1, set_ram_duration_target/2, ram_duration/1, needs_idle_timeout/1, idle_timeout/1, handle_pre_hibernate/1, status/1]). @@ -248,7 +248,8 @@ is_persistent, is_delivered, msg_on_disk, - index_on_disk + index_on_disk, + msg_properties }). -record(delta, @@ -490,8 +491,8 @@ purge(State = #vqstate { q4 = Q4, index_state = IndexState, len = Len }) -> ram_index_count = 0, persistent_count = 0 })}. -publish(Msg, State) -> - {_SeqId, State1} = publish(Msg, false, false, State), +publish(Msg, MsgProperties, State) -> + {_SeqId, State1} = publish(Msg, MsgProperties, false, false, State), a(reduce_memory_use(State1)). publish_delivered(false, _Msg, State = #vqstate { len = 0 }) -> @@ -505,7 +506,7 @@ publish_delivered(true, Msg = #basic_message { is_persistent = IsPersistent }, pending_ack = PA, durable = IsDurable }) -> IsPersistent1 = IsDurable andalso IsPersistent, - MsgStatus = (msg_status(IsPersistent1, SeqId, Msg)) + MsgStatus = (msg_status(IsPersistent1, SeqId, Msg, #msg_properties{})) #msg_status { is_delivered = true }, {MsgStatus1, State1} = maybe_write_to_disk(false, false, MsgStatus, State), PA1 = record_pending_ack(m(MsgStatus1), PA), @@ -579,12 +580,13 @@ ack(AckTags, State) -> AckTags, State)). tx_publish(Txn, Msg = #basic_message { is_persistent = IsPersistent }, + MsgProperties, State = #vqstate { durable = IsDurable, msg_store_clients = MSCState }) -> Tx = #tx { pending_messages = Pubs } = lookup_tx(Txn), - store_tx(Txn, Tx #tx { pending_messages = [Msg | Pubs] }), + store_tx(Txn, Tx #tx { pending_messages = [{Msg, MsgProperties} | Pubs] }), a(case IsPersistent andalso IsDurable of - true -> MsgStatus = msg_status(true, undefined, Msg), + true -> MsgStatus = msg_status(true, undefined, Msg, MsgProperties), {#msg_status { msg_on_disk = true }, MSCState1} = maybe_write_msg_to_disk(false, MsgStatus, MSCState), State #vqstate { msg_store_clients = MSCState1 }; @@ -606,7 +608,7 @@ tx_rollback(Txn, State = #vqstate { durable = IsDurable }) -> end, {lists:append(AckTags), a(State)}. -tx_commit(Txn, Fun, State = #vqstate { durable = IsDurable }) -> +tx_commit(Txn, Fun, MsgPropsFun, State = #vqstate { durable = IsDurable }) -> #tx { pending_acks = AckTags, pending_messages = Pubs } = lookup_tx(Txn), erase_tx(Txn), PubsOrdered = lists:reverse(Pubs), @@ -624,18 +626,21 @@ tx_commit(Txn, Fun, State = #vqstate { durable = IsDurable }) -> HasPersistentPubs, PubsOrdered, AckTags1, Fun, State) end)}. -requeue(AckTags, State) -> +requeue(AckTags, MsgPropsFun, State) -> a(reduce_memory_use( ack(fun rabbit_msg_store:release/2, - fun (#msg_status { msg = Msg }, State1) -> - {_SeqId, State2} = publish(Msg, true, false, State1), + fun (#msg_status { msg = Msg, + msg_properties = MsgProperties }, State1) -> + {_SeqId, State2} = + publish(Msg, MsgPropsFun(MsgProperties), true, false, State1), State2; - ({IsPersistent, Guid}, State1) -> + ({IsPersistent, Guid, MsgProperties}, State1) -> #vqstate { msg_store_clients = MSCState } = State1, {{ok, Msg = #basic_message{}}, MSCState1} = read_from_msg_store(MSCState, IsPersistent, Guid), State2 = State1 #vqstate { msg_store_clients = MSCState1 }, - {_SeqId, State3} = publish(Msg, true, true, State2), + {_SeqId, State3} = publish(Msg, MsgPropsFun(MsgProperties), + true, true, State2), State3 end, AckTags, State))). @@ -783,10 +788,11 @@ one_if(false) -> 0. cons_if(true, E, L) -> [E | L]; cons_if(false, _E, L) -> L. -msg_status(IsPersistent, SeqId, Msg = #basic_message { guid = Guid }) -> +msg_status(IsPersistent, SeqId, Msg = #basic_message { guid = Guid }, MsgProperties) -> #msg_status { seq_id = SeqId, guid = Guid, msg = Msg, is_persistent = IsPersistent, is_delivered = false, - msg_on_disk = false, index_on_disk = false }. + msg_on_disk = false, index_on_disk = false, + msg_properties = MsgProperties }. find_msg_store(true) -> ?PERSISTENT_MSG_STORE; find_msg_store(false) -> ?TRANSIENT_MSG_STORE. @@ -914,7 +920,7 @@ tx_commit_post_msg_store(HasPersistentPubs, Pubs, AckTags, Fun, true -> [AckTag || AckTag <- AckTags, case dict:fetch(AckTag, PA) of #msg_status {} -> false; - {IsPersistent, _Guid} -> IsPersistent + {IsPersistent, _Guid, _MsgProperties} -> IsPersistent end]; false -> [] end, @@ -946,10 +952,11 @@ tx_commit_index(State = #vqstate { on_sync = #sync { Pubs = lists:append(lists:reverse(SPubs)), {SeqIds, State1 = #vqstate { index_state = IndexState }} = lists:foldl( - fun (Msg = #basic_message { is_persistent = IsPersistent }, + fun ({Msg = #basic_message { is_persistent = IsPersistent }, MsgProperties}, {SeqIdsAcc, State2}) -> IsPersistent1 = IsDurable andalso IsPersistent, - {SeqId, State3} = publish(Msg, false, IsPersistent1, State2), + MsgProperties1 = MsgProperties, + {SeqId, State3} = publish(Msg, MsgProperties1, false, IsPersistent1, State2), {cons_if(IsPersistent1, SeqId, SeqIdsAcc), State3} end, {PAcks, ack(Acks, State)}, Pubs), IndexState1 = rabbit_queue_index:sync(SeqIds, IndexState), @@ -996,7 +1003,7 @@ remove_queue_entries1( %%---------------------------------------------------------------------------- publish(Msg = #basic_message { is_persistent = IsPersistent }, - IsDelivered, MsgOnDisk, + MsgProperties, IsDelivered, MsgOnDisk, State = #vqstate { q1 = Q1, q3 = Q3, q4 = Q4, next_seq_id = SeqId, len = Len, @@ -1005,8 +1012,9 @@ publish(Msg = #basic_message { is_persistent = IsPersistent }, durable = IsDurable, ram_msg_count = RamMsgCount }) -> IsPersistent1 = IsDurable andalso IsPersistent, - MsgStatus = (msg_status(IsPersistent1, SeqId, Msg)) - #msg_status { is_delivered = IsDelivered, msg_on_disk = MsgOnDisk }, + MsgStatus = (msg_status(IsPersistent1, SeqId, Msg, MsgProperties)) + #msg_status { is_delivered = IsDelivered, + msg_on_disk = MsgOnDisk}, {MsgStatus1, State1} = maybe_write_to_disk(false, false, MsgStatus, State), State2 = case bpqueue:is_empty(Q3) of false -> State1 #vqstate { q1 = queue:in(m(MsgStatus1), Q1) }; @@ -1073,9 +1081,9 @@ maybe_write_to_disk(ForceMsg, ForceIndex, MsgStatus, record_pending_ack(#msg_status { guid = Guid, seq_id = SeqId, is_persistent = IsPersistent, - msg_on_disk = MsgOnDisk } = MsgStatus, PA) -> + msg_on_disk = MsgOnDisk, msg_properties = MsgProperties } = MsgStatus, PA) -> AckEntry = case MsgOnDisk of - true -> {IsPersistent, Guid}; + true -> {IsPersistent, Guid, MsgProperties}; false -> MsgStatus end, dict:store(SeqId, AckEntry, PA). @@ -1128,7 +1136,7 @@ accumulate_ack(_SeqId, #msg_status { is_persistent = false, %% ASSERTIONS msg_on_disk = false, index_on_disk = false }, Acc) -> Acc; -accumulate_ack(SeqId, {IsPersistent, Guid}, {SeqIdsAcc, Dict}) -> +accumulate_ack(SeqId, {IsPersistent, Guid, _MsgProperties}, {SeqIdsAcc, Dict}) -> {cons_if(IsPersistent, SeqId, SeqIdsAcc), rabbit_misc:orddict_cons(find_msg_store(IsPersistent), Guid, Dict)}. |
