summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--include/rabbit.hrl2
-rw-r--r--src/rabbit_amqqueue_process.erl42
-rw-r--r--src/rabbit_backing_queue.erl8
-rw-r--r--src/rabbit_variable_queue.erl60
4 files changed, 75 insertions, 37 deletions
diff --git a/include/rabbit.hrl b/include/rabbit.hrl
index baa2d72110..be0d4b8233 100644
--- a/include/rabbit.hrl
+++ b/include/rabbit.hrl
@@ -74,7 +74,7 @@
-record(event, {type, props, timestamp}).
--record(msg_properties, {ttl}).
+-record(msg_properties, {expiry}).
%%----------------------------------------------------------------------------
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index d52660c5ac..2cee51f70d 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -39,6 +39,8 @@
-define(SYNC_INTERVAL, 5). %% milliseconds
-define(RAM_DURATION_UPDATE_INTERVAL, 5000).
+-define(BASE_MSG_PROPERTIES, #msg_properties{expiry = undefined}).
+
-export([start_link/1, info_keys/0]).
-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
@@ -60,7 +62,8 @@
sync_timer_ref,
rate_timer_ref,
expiry_timer_ref,
- stats_timer
+ stats_timer,
+ ttl
}).
-record(consumer, {tag, ack_required}).
@@ -122,6 +125,7 @@ init(Q) ->
sync_timer_ref = undefined,
rate_timer_ref = undefined,
expiry_timer_ref = undefined,
+ ttl = undefined,
stats_timer = rabbit_event:init_stats_timer()}, hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
@@ -144,12 +148,21 @@ code_change(_OldVsn, State, _Extra) ->
%%----------------------------------------------------------------------------
+init_queue_state(State) ->
+ init_expires(init_ttl(State)).
+
init_expires(State = #q{q = #amqqueue{arguments = Arguments}}) ->
case rabbit_misc:table_lookup(Arguments, <<"x-expires">>) of
{long, Expires} -> ensure_expiry_timer(State#q{expires = Expires});
undefined -> State
end.
+init_ttl(State = #q{q = #amqqueue{arguments = Arguments}}) ->
+ case rabbit_misc:table_lookup(Arguments, <<"x-message-ttl">>) of
+ {long, Ttl} -> State#q{ttl=Ttl};
+ undefined -> State
+ end.
+
declare(Recover, From,
State = #q{q = Q = #amqqueue{name = QName, durable = IsDurable},
backing_queue = BQ, backing_queue_state = undefined}) ->
@@ -167,7 +180,7 @@ declare(Recover, From,
queue_created,
[{Item, i(Item, State)} ||
Item <- ?CREATION_EVENT_KEYS]),
- noreply(init_expires(State#q{backing_queue_state = BQS}));
+ noreply(init_queue_state(State#q{backing_queue_state = BQS}));
Q1 -> {stop, normal, {existing, Q1}, State}
end.
@@ -418,7 +431,8 @@ attempt_delivery(none, _ChPid, Message, State = #q{backing_queue = BQ}) ->
attempt_delivery(Txn, ChPid, Message, State = #q{backing_queue = BQ,
backing_queue_state = BQS}) ->
record_current_channel_tx(ChPid, Txn),
- {true, State#q{backing_queue_state = BQ:tx_publish(Txn, Message, BQS)}}.
+ MsgProperties = new_msg_properties(State),
+ {true, State#q{backing_queue_state = BQ:tx_publish(Txn, Message, MsgProperties, BQS)}}.
deliver_or_enqueue(Txn, ChPid, Message, State = #q{backing_queue = BQ}) ->
case attempt_delivery(Txn, ChPid, Message, State) of
@@ -426,13 +440,15 @@ deliver_or_enqueue(Txn, ChPid, Message, State = #q{backing_queue = BQ}) ->
{true, NewState};
{false, NewState} ->
%% Txn is none and no unblocked channels with consumers
- BQS = BQ:publish(Message, State #q.backing_queue_state),
+ MsgProperties = new_msg_properties(State),
+ BQS = BQ:publish(Message, MsgProperties, State #q.backing_queue_state),
{false, NewState#q{backing_queue_state = BQS}}
end.
requeue_and_run(AckTags, State = #q{backing_queue = BQ}) ->
+ MsgPropsFun = reset_msg_expiry_fun(State),
maybe_run_queue_via_backing_queue(
- fun (BQS) -> BQ:requeue(AckTags, BQS) end, State).
+ fun (BQS) -> BQ:requeue(AckTags, MsgPropsFun, BQS) end, State).
add_consumer(ChPid, Consumer, Queue) -> queue:in({ChPid, Consumer}, Queue).
@@ -530,7 +546,7 @@ maybe_run_queue_via_backing_queue(Fun, State = #q{backing_queue_state = BQS}) ->
commit_transaction(Txn, From, ChPid, State = #q{backing_queue = BQ,
backing_queue_state = BQS}) ->
{AckTags, BQS1} =
- BQ:tx_commit(Txn, fun () -> gen_server2:reply(From, ok) end, BQS),
+ BQ:tx_commit(Txn, fun () -> gen_server2:reply(From, ok) end, reset_msg_expiry_fun(State), BQS),
%% ChPid must be known here because of the participant management
%% by the channel.
C = #cr{acktags = ChAckTags} = lookup_ch(ChPid),
@@ -549,6 +565,20 @@ rollback_transaction(Txn, ChPid, State = #q{backing_queue = BQ,
subtract_acks(A, B) when is_list(B) ->
lists:foldl(fun sets:del_element/2, A, B).
+reset_msg_expiry_fun(State) ->
+ fun(MsgProps) ->
+ MsgProps#msg_properties{expiry=calculate_msg_expiry(State)}
+ end.
+
+new_msg_properties(State) ->
+ #msg_properties{expiry = calculate_msg_expiry(State)}.
+
+calculate_msg_expiry(_State = #q{ttl = undefined}) ->
+ undefined;
+calculate_msg_expiry(_State = #q{ttl = Ttl}) ->
+ Now = timer:now_diff(now(), {0,0,0}),
+ Now - (Ttl * 1000).
+
infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items].
i(name, #q{q = #amqqueue{name = Name}}) -> Name;
diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl
index 2230c507e9..cc7f857188 100644
--- a/src/rabbit_backing_queue.erl
+++ b/src/rabbit_backing_queue.erl
@@ -62,7 +62,7 @@ behaviour_info(callbacks) ->
{purge, 1},
%% Publish a message.
- {publish, 2},
+ {publish, 3},
%% Called for messages which have already been passed straight
%% out to a client. The queue will be empty for these calls
@@ -77,7 +77,7 @@ behaviour_info(callbacks) ->
{ack, 2},
%% A publish, but in the context of a transaction.
- {tx_publish, 3},
+ {tx_publish, 4},
%% Acks, but in the context of a transaction.
{tx_ack, 3},
@@ -89,11 +89,11 @@ behaviour_info(callbacks) ->
%% Commit a transaction. The Fun passed in must be called once
%% the messages have really been commited. This CPS permits the
%% possibility of commit coalescing.
- {tx_commit, 3},
+ {tx_commit, 4},
%% Reinsert messages into the queue which have already been
%% delivered and were pending acknowledgement.
- {requeue, 2},
+ {requeue, 3},
%% How long is my queue?
{len, 1},
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 0f52eee84f..e0bc75b7e6 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -32,9 +32,9 @@
-module(rabbit_variable_queue).
-export([init/3, terminate/1, delete_and_terminate/1,
- purge/1, publish/2, publish_delivered/3, fetch/2, ack/2,
- tx_publish/3, tx_ack/3, tx_rollback/2, tx_commit/3,
- requeue/2, len/1, is_empty/1,
+ purge/1, publish/3, publish_delivered/3, fetch/2, ack/2,
+ tx_publish/4, tx_ack/3, tx_rollback/2, tx_commit/4,
+ requeue/3, len/1, is_empty/1,
set_ram_duration_target/2, ram_duration/1,
needs_idle_timeout/1, idle_timeout/1, handle_pre_hibernate/1,
status/1]).
@@ -248,7 +248,8 @@
is_persistent,
is_delivered,
msg_on_disk,
- index_on_disk
+ index_on_disk,
+ msg_properties
}).
-record(delta,
@@ -490,8 +491,8 @@ purge(State = #vqstate { q4 = Q4, index_state = IndexState, len = Len }) ->
ram_index_count = 0,
persistent_count = 0 })}.
-publish(Msg, State) ->
- {_SeqId, State1} = publish(Msg, false, false, State),
+publish(Msg, MsgProperties, State) ->
+ {_SeqId, State1} = publish(Msg, MsgProperties, false, false, State),
a(reduce_memory_use(State1)).
publish_delivered(false, _Msg, State = #vqstate { len = 0 }) ->
@@ -505,7 +506,7 @@ publish_delivered(true, Msg = #basic_message { is_persistent = IsPersistent },
pending_ack = PA,
durable = IsDurable }) ->
IsPersistent1 = IsDurable andalso IsPersistent,
- MsgStatus = (msg_status(IsPersistent1, SeqId, Msg))
+ MsgStatus = (msg_status(IsPersistent1, SeqId, Msg, #msg_properties{}))
#msg_status { is_delivered = true },
{MsgStatus1, State1} = maybe_write_to_disk(false, false, MsgStatus, State),
PA1 = record_pending_ack(m(MsgStatus1), PA),
@@ -579,12 +580,13 @@ ack(AckTags, State) ->
AckTags, State)).
tx_publish(Txn, Msg = #basic_message { is_persistent = IsPersistent },
+ MsgProperties,
State = #vqstate { durable = IsDurable,
msg_store_clients = MSCState }) ->
Tx = #tx { pending_messages = Pubs } = lookup_tx(Txn),
- store_tx(Txn, Tx #tx { pending_messages = [Msg | Pubs] }),
+ store_tx(Txn, Tx #tx { pending_messages = [{Msg, MsgProperties} | Pubs] }),
a(case IsPersistent andalso IsDurable of
- true -> MsgStatus = msg_status(true, undefined, Msg),
+ true -> MsgStatus = msg_status(true, undefined, Msg, MsgProperties),
{#msg_status { msg_on_disk = true }, MSCState1} =
maybe_write_msg_to_disk(false, MsgStatus, MSCState),
State #vqstate { msg_store_clients = MSCState1 };
@@ -606,7 +608,7 @@ tx_rollback(Txn, State = #vqstate { durable = IsDurable }) ->
end,
{lists:append(AckTags), a(State)}.
-tx_commit(Txn, Fun, State = #vqstate { durable = IsDurable }) ->
+tx_commit(Txn, Fun, MsgPropsFun, State = #vqstate { durable = IsDurable }) ->
#tx { pending_acks = AckTags, pending_messages = Pubs } = lookup_tx(Txn),
erase_tx(Txn),
PubsOrdered = lists:reverse(Pubs),
@@ -624,18 +626,21 @@ tx_commit(Txn, Fun, State = #vqstate { durable = IsDurable }) ->
HasPersistentPubs, PubsOrdered, AckTags1, Fun, State)
end)}.
-requeue(AckTags, State) ->
+requeue(AckTags, MsgPropsFun, State) ->
a(reduce_memory_use(
ack(fun rabbit_msg_store:release/2,
- fun (#msg_status { msg = Msg }, State1) ->
- {_SeqId, State2} = publish(Msg, true, false, State1),
+ fun (#msg_status { msg = Msg,
+ msg_properties = MsgProperties }, State1) ->
+ {_SeqId, State2} =
+ publish(Msg, MsgPropsFun(MsgProperties), true, false, State1),
State2;
- ({IsPersistent, Guid}, State1) ->
+ ({IsPersistent, Guid, MsgProperties}, State1) ->
#vqstate { msg_store_clients = MSCState } = State1,
{{ok, Msg = #basic_message{}}, MSCState1} =
read_from_msg_store(MSCState, IsPersistent, Guid),
State2 = State1 #vqstate { msg_store_clients = MSCState1 },
- {_SeqId, State3} = publish(Msg, true, true, State2),
+ {_SeqId, State3} = publish(Msg, MsgPropsFun(MsgProperties),
+ true, true, State2),
State3
end,
AckTags, State))).
@@ -783,10 +788,11 @@ one_if(false) -> 0.
cons_if(true, E, L) -> [E | L];
cons_if(false, _E, L) -> L.
-msg_status(IsPersistent, SeqId, Msg = #basic_message { guid = Guid }) ->
+msg_status(IsPersistent, SeqId, Msg = #basic_message { guid = Guid }, MsgProperties) ->
#msg_status { seq_id = SeqId, guid = Guid, msg = Msg,
is_persistent = IsPersistent, is_delivered = false,
- msg_on_disk = false, index_on_disk = false }.
+ msg_on_disk = false, index_on_disk = false,
+ msg_properties = MsgProperties }.
find_msg_store(true) -> ?PERSISTENT_MSG_STORE;
find_msg_store(false) -> ?TRANSIENT_MSG_STORE.
@@ -914,7 +920,7 @@ tx_commit_post_msg_store(HasPersistentPubs, Pubs, AckTags, Fun,
true -> [AckTag || AckTag <- AckTags,
case dict:fetch(AckTag, PA) of
#msg_status {} -> false;
- {IsPersistent, _Guid} -> IsPersistent
+ {IsPersistent, _Guid, _MsgProperties} -> IsPersistent
end];
false -> []
end,
@@ -946,10 +952,11 @@ tx_commit_index(State = #vqstate { on_sync = #sync {
Pubs = lists:append(lists:reverse(SPubs)),
{SeqIds, State1 = #vqstate { index_state = IndexState }} =
lists:foldl(
- fun (Msg = #basic_message { is_persistent = IsPersistent },
+ fun ({Msg = #basic_message { is_persistent = IsPersistent }, MsgProperties},
{SeqIdsAcc, State2}) ->
IsPersistent1 = IsDurable andalso IsPersistent,
- {SeqId, State3} = publish(Msg, false, IsPersistent1, State2),
+ MsgProperties1 = MsgProperties,
+ {SeqId, State3} = publish(Msg, MsgProperties1, false, IsPersistent1, State2),
{cons_if(IsPersistent1, SeqId, SeqIdsAcc), State3}
end, {PAcks, ack(Acks, State)}, Pubs),
IndexState1 = rabbit_queue_index:sync(SeqIds, IndexState),
@@ -996,7 +1003,7 @@ remove_queue_entries1(
%%----------------------------------------------------------------------------
publish(Msg = #basic_message { is_persistent = IsPersistent },
- IsDelivered, MsgOnDisk,
+ MsgProperties, IsDelivered, MsgOnDisk,
State = #vqstate { q1 = Q1, q3 = Q3, q4 = Q4,
next_seq_id = SeqId,
len = Len,
@@ -1005,8 +1012,9 @@ publish(Msg = #basic_message { is_persistent = IsPersistent },
durable = IsDurable,
ram_msg_count = RamMsgCount }) ->
IsPersistent1 = IsDurable andalso IsPersistent,
- MsgStatus = (msg_status(IsPersistent1, SeqId, Msg))
- #msg_status { is_delivered = IsDelivered, msg_on_disk = MsgOnDisk },
+ MsgStatus = (msg_status(IsPersistent1, SeqId, Msg, MsgProperties))
+ #msg_status { is_delivered = IsDelivered,
+ msg_on_disk = MsgOnDisk},
{MsgStatus1, State1} = maybe_write_to_disk(false, false, MsgStatus, State),
State2 = case bpqueue:is_empty(Q3) of
false -> State1 #vqstate { q1 = queue:in(m(MsgStatus1), Q1) };
@@ -1073,9 +1081,9 @@ maybe_write_to_disk(ForceMsg, ForceIndex, MsgStatus,
record_pending_ack(#msg_status { guid = Guid, seq_id = SeqId,
is_persistent = IsPersistent,
- msg_on_disk = MsgOnDisk } = MsgStatus, PA) ->
+ msg_on_disk = MsgOnDisk, msg_properties = MsgProperties } = MsgStatus, PA) ->
AckEntry = case MsgOnDisk of
- true -> {IsPersistent, Guid};
+ true -> {IsPersistent, Guid, MsgProperties};
false -> MsgStatus
end,
dict:store(SeqId, AckEntry, PA).
@@ -1128,7 +1136,7 @@ accumulate_ack(_SeqId, #msg_status { is_persistent = false, %% ASSERTIONS
msg_on_disk = false,
index_on_disk = false }, Acc) ->
Acc;
-accumulate_ack(SeqId, {IsPersistent, Guid}, {SeqIdsAcc, Dict}) ->
+accumulate_ack(SeqId, {IsPersistent, Guid, _MsgProperties}, {SeqIdsAcc, Dict}) ->
{cons_if(IsPersistent, SeqId, SeqIdsAcc),
rabbit_misc:orddict_cons(find_msg_store(IsPersistent), Guid, Dict)}.