summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--include/rabbit.hrl2
-rw-r--r--src/rabbit_amqqueue.erl14
-rw-r--r--src/rabbit_amqqueue_process.erl74
-rw-r--r--src/rabbit_backing_queue.erl8
-rw-r--r--src/rabbit_variable_queue.erl70
5 files changed, 121 insertions, 47 deletions
diff --git a/include/rabbit.hrl b/include/rabbit.hrl
index b9abd78857..be0d4b8233 100644
--- a/include/rabbit.hrl
+++ b/include/rabbit.hrl
@@ -74,6 +74,8 @@
-record(event, {type, props, timestamp}).
+-record(msg_properties, {expiry}).
+
%%----------------------------------------------------------------------------
-define(COPYRIGHT_MESSAGE, "Copyright (C) 2007-2010 LShift Ltd., Cohesive Financial Technologies LLC., and Rabbit Technologies Ltd.").
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 0cdb4fff08..d960444faf 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -57,6 +57,7 @@
-include_lib("stdlib/include/qlc.hrl").
-define(EXPIRES_TYPE, long).
+-define(TTL_TYPE, long).
%%----------------------------------------------------------------------------
@@ -308,7 +309,8 @@ check_declare_arguments(QueueName, Args) ->
precondition_failed,
"invalid arg '~s' for ~s: ~w",
[Key, rabbit_misc:rs(QueueName), Error])
- end || {Key, Fun} <- [{<<"x-expires">>, fun check_expires_argument/1}]],
+ end || {Key, Fun} <- [{<<"x-expires">>, fun check_expires_argument/1},
+ {<<"x-message-ttl">>, fun check_message_ttl_argument/1}]],
ok.
check_expires_argument(undefined) ->
@@ -321,6 +323,16 @@ check_expires_argument({?EXPIRES_TYPE, _Expires}) ->
check_expires_argument(_) ->
{error, expires_not_of_type_long}.
+check_message_ttl_argument(undefined) ->
+ ok;
+check_message_ttl_argument({?TTL_TYPE, TTL})
+ when is_integer(TTL) andalso TTL > 0 ->
+ ok;
+check_message_ttl_argument({?TTL_TYPE, _TTL}) ->
+ {error, ttl_zero_or_less};
+check_message_ttl_argument(_) ->
+ {error, ttl_not_of_type_long}.
+
list(VHostPath) ->
mnesia:dirty_match_object(
rabbit_queue,
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 2cab7136a6..b6f898cece 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -39,6 +39,8 @@
-define(SYNC_INTERVAL, 5). %% milliseconds
-define(RAM_DURATION_UPDATE_INTERVAL, 5000).
+-define(BASE_MSG_PROPERTIES, #msg_properties{expiry = undefined}).
+
-export([start_link/1, info_keys/0]).
-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
@@ -60,7 +62,8 @@
sync_timer_ref,
rate_timer_ref,
expiry_timer_ref,
- stats_timer
+ stats_timer,
+ ttl
}).
-record(consumer, {tag, ack_required}).
@@ -122,6 +125,7 @@ init(Q) ->
sync_timer_ref = undefined,
rate_timer_ref = undefined,
expiry_timer_ref = undefined,
+ ttl = undefined,
stats_timer = rabbit_event:init_stats_timer()}, hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
@@ -144,12 +148,21 @@ code_change(_OldVsn, State, _Extra) ->
%%----------------------------------------------------------------------------
+init_queue_state(State) ->
+ init_expires(init_ttl(State)).
+
init_expires(State = #q{q = #amqqueue{arguments = Arguments}}) ->
case rabbit_misc:table_lookup(Arguments, <<"x-expires">>) of
{long, Expires} -> ensure_expiry_timer(State#q{expires = Expires});
undefined -> State
end.
+init_ttl(State = #q{q = #amqqueue{arguments = Arguments}}) ->
+ case rabbit_misc:table_lookup(Arguments, <<"x-message-ttl">>) of
+ {long, Ttl} -> State#q{ttl=Ttl};
+ undefined -> State
+ end.
+
declare(Recover, From,
State = #q{q = Q = #amqqueue{name = QName, durable = IsDurable},
backing_queue = BQ, backing_queue_state = undefined}) ->
@@ -167,7 +180,7 @@ declare(Recover, From,
queue_created,
[{Item, i(Item, State)} ||
Item <- ?CREATION_EVENT_KEYS]),
- noreply(init_expires(State#q{backing_queue_state = BQS}));
+ noreply(init_queue_state(State#q{backing_queue_state = BQS}));
Q1 -> {stop, normal, {existing, Q1}, State}
end.
@@ -393,7 +406,7 @@ deliver_from_queue_pred(IsEmpty, _State) ->
deliver_from_queue_deliver(AckRequired, false,
State = #q{backing_queue = BQ,
backing_queue_state = BQS}) ->
- {{Message, IsDelivered, AckTag, Remaining}, BQS1} =
+ {{Message, _MsgProperties, IsDelivered, AckTag, Remaining}, BQS1} =
BQ:fetch(AckRequired, BQS),
{{Message, IsDelivered, AckTag}, 0 == Remaining,
State #q { backing_queue_state = BQS1 }}.
@@ -418,7 +431,8 @@ attempt_delivery(none, _ChPid, Message, State = #q{backing_queue = BQ}) ->
attempt_delivery(Txn, ChPid, Message, State = #q{backing_queue = BQ,
backing_queue_state = BQS}) ->
record_current_channel_tx(ChPid, Txn),
- {true, State#q{backing_queue_state = BQ:tx_publish(Txn, Message, BQS)}}.
+ MsgProperties = new_msg_properties(State),
+ {true, State#q{backing_queue_state = BQ:tx_publish(Txn, Message, MsgProperties, BQS)}}.
deliver_or_enqueue(Txn, ChPid, Message, State = #q{backing_queue = BQ}) ->
case attempt_delivery(Txn, ChPid, Message, State) of
@@ -426,13 +440,28 @@ deliver_or_enqueue(Txn, ChPid, Message, State = #q{backing_queue = BQ}) ->
{true, NewState};
{false, NewState} ->
%% Txn is none and no unblocked channels with consumers
- BQS = BQ:publish(Message, State #q.backing_queue_state),
+ MsgProperties = new_msg_properties(State),
+ BQS = BQ:publish(Message, MsgProperties, State #q.backing_queue_state),
{false, NewState#q{backing_queue_state = BQS}}
end.
requeue_and_run(AckTags, State = #q{backing_queue = BQ}) ->
+ MsgPropsFun = reset_msg_expiry_fun(State),
maybe_run_queue_via_backing_queue(
- fun (BQS) -> BQ:requeue(AckTags, BQS) end, State).
+ fun (BQS) -> BQ:requeue(AckTags, MsgPropsFun, BQS) end, State).
+
+fetch(AckRequired, State = #q{backing_queue_state = BQS,
+ backing_queue = BQ}) ->
+ case BQ:fetch(AckRequired, BQS) of
+ {empty, BQS1} -> {empty, State#q{backing_queue_state = BQS1}};
+ {{Message, MsgProperties, IsDelivered, AckTag, Remaining}, BQS1} ->
+ case msg_expired(MsgProperties) of
+ true ->
+ fetch(AckRequired, State#q{backing_queue_state = BQS1});
+ false ->
+ {{Message, IsDelivered, AckTag, Remaining}, State#q{backing_queue_state = BQS1}}
+ end
+ end.
add_consumer(ChPid, Consumer, Queue) -> queue:in({ChPid, Consumer}, Queue).
@@ -530,7 +559,7 @@ maybe_run_queue_via_backing_queue(Fun, State = #q{backing_queue_state = BQS}) ->
commit_transaction(Txn, From, ChPid, State = #q{backing_queue = BQ,
backing_queue_state = BQS}) ->
{AckTags, BQS1} =
- BQ:tx_commit(Txn, fun () -> gen_server2:reply(From, ok) end, BQS),
+ BQ:tx_commit(Txn, fun () -> gen_server2:reply(From, ok) end, reset_msg_expiry_fun(State), BQS),
%% ChPid must be known here because of the participant management
%% by the channel.
C = #cr{acktags = ChAckTags} = lookup_ch(ChPid),
@@ -549,6 +578,26 @@ rollback_transaction(Txn, ChPid, State = #q{backing_queue = BQ,
subtract_acks(A, B) when is_list(B) ->
lists:foldl(fun sets:del_element/2, A, B).
+msg_expired(_MsgProperties = #msg_properties{expiry = undefined}) ->
+ false;
+msg_expired(_MsgProperties = #msg_properties{expiry=Expiry}) ->
+ Now = timer:now_diff(now(), {0,0,0}),
+ Now > Expiry.
+
+reset_msg_expiry_fun(State) ->
+ fun(MsgProps) ->
+ MsgProps#msg_properties{expiry=calculate_msg_expiry(State)}
+ end.
+
+new_msg_properties(State) ->
+ #msg_properties{expiry = calculate_msg_expiry(State)}.
+
+calculate_msg_expiry(_State = #q{ttl = undefined}) ->
+ undefined;
+calculate_msg_expiry(_State = #q{ttl = Ttl}) ->
+ Now = timer:now_diff(now(), {0,0,0}),
+ Now + (Ttl * 1000).
+
infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items].
i(name, #q{q = #amqqueue{name = Name}}) -> Name;
@@ -670,13 +719,12 @@ handle_call({notify_down, ChPid}, _From, State) ->
end;
handle_call({basic_get, ChPid, NoAck}, _From,
- State = #q{q = #amqqueue{name = QName},
- backing_queue_state = BQS, backing_queue = BQ}) ->
+ State = #q{q = #amqqueue{name = QName}}) ->
AckRequired = not NoAck,
State1 = ensure_expiry_timer(State),
- case BQ:fetch(AckRequired, BQS) of
- {empty, BQS1} -> reply(empty, State1#q{backing_queue_state = BQS1});
- {{Message, IsDelivered, AckTag, Remaining}, BQS1} ->
+ case fetch(AckRequired, State1) of
+ {empty, State2} -> reply(empty, State2);
+ {{Message, IsDelivered, AckTag, Remaining}, State2} ->
case AckRequired of
true -> C = #cr{acktags = ChAckTags} = ch_record(ChPid),
store_ch_record(
@@ -684,7 +732,7 @@ handle_call({basic_get, ChPid, NoAck}, _From,
false -> ok
end,
Msg = {QName, self(), AckTag, IsDelivered, Message},
- reply({ok, Remaining, Msg}, State1#q{backing_queue_state = BQS1})
+ reply({ok, Remaining, Msg}, State2)
end;
handle_call({basic_consume, NoAck, ChPid, LimiterPid,
diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl
index 2230c507e9..cc7f857188 100644
--- a/src/rabbit_backing_queue.erl
+++ b/src/rabbit_backing_queue.erl
@@ -62,7 +62,7 @@ behaviour_info(callbacks) ->
{purge, 1},
%% Publish a message.
- {publish, 2},
+ {publish, 3},
%% Called for messages which have already been passed straight
%% out to a client. The queue will be empty for these calls
@@ -77,7 +77,7 @@ behaviour_info(callbacks) ->
{ack, 2},
%% A publish, but in the context of a transaction.
- {tx_publish, 3},
+ {tx_publish, 4},
%% Acks, but in the context of a transaction.
{tx_ack, 3},
@@ -89,11 +89,11 @@ behaviour_info(callbacks) ->
%% Commit a transaction. The Fun passed in must be called once
%% the messages have really been commited. This CPS permits the
%% possibility of commit coalescing.
- {tx_commit, 3},
+ {tx_commit, 4},
%% Reinsert messages into the queue which have already been
%% delivered and were pending acknowledgement.
- {requeue, 2},
+ {requeue, 3},
%% How long is my queue?
{len, 1},
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 0f52eee84f..4e978fd5ad 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -32,9 +32,9 @@
-module(rabbit_variable_queue).
-export([init/3, terminate/1, delete_and_terminate/1,
- purge/1, publish/2, publish_delivered/3, fetch/2, ack/2,
- tx_publish/3, tx_ack/3, tx_rollback/2, tx_commit/3,
- requeue/2, len/1, is_empty/1,
+ purge/1, publish/3, publish_delivered/3, fetch/2, ack/2,
+ tx_publish/4, tx_ack/3, tx_rollback/2, tx_commit/4,
+ requeue/3, len/1, is_empty/1,
set_ram_duration_target/2, ram_duration/1,
needs_idle_timeout/1, idle_timeout/1, handle_pre_hibernate/1,
status/1]).
@@ -248,7 +248,8 @@
is_persistent,
is_delivered,
msg_on_disk,
- index_on_disk
+ index_on_disk,
+ msg_properties
}).
-record(delta,
@@ -490,8 +491,8 @@ purge(State = #vqstate { q4 = Q4, index_state = IndexState, len = Len }) ->
ram_index_count = 0,
persistent_count = 0 })}.
-publish(Msg, State) ->
- {_SeqId, State1} = publish(Msg, false, false, State),
+publish(Msg, MsgProperties, State) ->
+ {_SeqId, State1} = publish(Msg, MsgProperties, false, false, State),
a(reduce_memory_use(State1)).
publish_delivered(false, _Msg, State = #vqstate { len = 0 }) ->
@@ -505,7 +506,7 @@ publish_delivered(true, Msg = #basic_message { is_persistent = IsPersistent },
pending_ack = PA,
durable = IsDurable }) ->
IsPersistent1 = IsDurable andalso IsPersistent,
- MsgStatus = (msg_status(IsPersistent1, SeqId, Msg))
+ MsgStatus = (msg_status(IsPersistent1, SeqId, Msg, #msg_properties{}))
#msg_status { is_delivered = true },
{MsgStatus1, State1} = maybe_write_to_disk(false, false, MsgStatus, State),
PA1 = record_pending_ack(m(MsgStatus1), PA),
@@ -532,7 +533,8 @@ fetch(AckRequired, State = #vqstate { q4 = Q4,
{{value, MsgStatus = #msg_status {
msg = Msg, guid = Guid, seq_id = SeqId,
is_persistent = IsPersistent, is_delivered = IsDelivered,
- msg_on_disk = MsgOnDisk, index_on_disk = IndexOnDisk }},
+ msg_on_disk = MsgOnDisk, index_on_disk = IndexOnDisk,
+ msg_properties = MsgProperties }},
Q4a} ->
%% 1. Mark it delivered if necessary
@@ -563,7 +565,7 @@ fetch(AckRequired, State = #vqstate { q4 = Q4,
PCount1 = PCount - one_if(IsPersistent andalso not AckRequired),
Len1 = Len - 1,
- {{Msg, IsDelivered, AckTag, Len1},
+ {{Msg, MsgProperties, IsDelivered, AckTag, Len1},
a(State #vqstate { q4 = Q4a,
ram_msg_count = RamMsgCount - 1,
out_counter = OutCount + 1,
@@ -579,12 +581,13 @@ ack(AckTags, State) ->
AckTags, State)).
tx_publish(Txn, Msg = #basic_message { is_persistent = IsPersistent },
+ MsgProperties,
State = #vqstate { durable = IsDurable,
msg_store_clients = MSCState }) ->
Tx = #tx { pending_messages = Pubs } = lookup_tx(Txn),
- store_tx(Txn, Tx #tx { pending_messages = [Msg | Pubs] }),
+ store_tx(Txn, Tx #tx { pending_messages = [{Msg, MsgProperties} | Pubs] }),
a(case IsPersistent andalso IsDurable of
- true -> MsgStatus = msg_status(true, undefined, Msg),
+ true -> MsgStatus = msg_status(true, undefined, Msg, MsgProperties),
{#msg_status { msg_on_disk = true }, MSCState1} =
maybe_write_msg_to_disk(false, MsgStatus, MSCState),
State #vqstate { msg_store_clients = MSCState1 };
@@ -606,10 +609,14 @@ tx_rollback(Txn, State = #vqstate { durable = IsDurable }) ->
end,
{lists:append(AckTags), a(State)}.
-tx_commit(Txn, Fun, State = #vqstate { durable = IsDurable }) ->
+tx_commit(Txn, Fun, MsgPropsFun, State = #vqstate { durable = IsDurable }) ->
#tx { pending_acks = AckTags, pending_messages = Pubs } = lookup_tx(Txn),
erase_tx(Txn),
- PubsOrdered = lists:reverse(Pubs),
+ F = fun({Msg, MsgProperties}) ->
+ {Msg, MsgPropsFun(MsgProperties)}
+ end,
+ PubsProcessed = lists:map(F, Pubs),
+ PubsOrdered = lists:reverse(PubsProcessed),
AckTags1 = lists:append(AckTags),
PersistentGuids = persistent_guids(PubsOrdered),
HasPersistentPubs = PersistentGuids =/= [],
@@ -624,18 +631,21 @@ tx_commit(Txn, Fun, State = #vqstate { durable = IsDurable }) ->
HasPersistentPubs, PubsOrdered, AckTags1, Fun, State)
end)}.
-requeue(AckTags, State) ->
+requeue(AckTags, MsgPropsFun, State) ->
a(reduce_memory_use(
ack(fun rabbit_msg_store:release/2,
- fun (#msg_status { msg = Msg }, State1) ->
- {_SeqId, State2} = publish(Msg, true, false, State1),
+ fun (#msg_status { msg = Msg,
+ msg_properties = MsgProperties }, State1) ->
+ {_SeqId, State2} =
+ publish(Msg, MsgPropsFun(MsgProperties), true, false, State1),
State2;
- ({IsPersistent, Guid}, State1) ->
+ ({IsPersistent, Guid, MsgProperties}, State1) ->
#vqstate { msg_store_clients = MSCState } = State1,
{{ok, Msg = #basic_message{}}, MSCState1} =
read_from_msg_store(MSCState, IsPersistent, Guid),
State2 = State1 #vqstate { msg_store_clients = MSCState1 },
- {_SeqId, State3} = publish(Msg, true, true, State2),
+ {_SeqId, State3} = publish(Msg, MsgPropsFun(MsgProperties),
+ true, true, State2),
State3
end,
AckTags, State))).
@@ -783,10 +793,11 @@ one_if(false) -> 0.
cons_if(true, E, L) -> [E | L];
cons_if(false, _E, L) -> L.
-msg_status(IsPersistent, SeqId, Msg = #basic_message { guid = Guid }) ->
+msg_status(IsPersistent, SeqId, Msg = #basic_message { guid = Guid }, MsgProperties) ->
#msg_status { seq_id = SeqId, guid = Guid, msg = Msg,
is_persistent = IsPersistent, is_delivered = false,
- msg_on_disk = false, index_on_disk = false }.
+ msg_on_disk = false, index_on_disk = false,
+ msg_properties = MsgProperties }.
find_msg_store(true) -> ?PERSISTENT_MSG_STORE;
find_msg_store(false) -> ?TRANSIENT_MSG_STORE.
@@ -914,7 +925,7 @@ tx_commit_post_msg_store(HasPersistentPubs, Pubs, AckTags, Fun,
true -> [AckTag || AckTag <- AckTags,
case dict:fetch(AckTag, PA) of
#msg_status {} -> false;
- {IsPersistent, _Guid} -> IsPersistent
+ {IsPersistent, _Guid, _MsgProperties} -> IsPersistent
end];
false -> []
end,
@@ -946,10 +957,10 @@ tx_commit_index(State = #vqstate { on_sync = #sync {
Pubs = lists:append(lists:reverse(SPubs)),
{SeqIds, State1 = #vqstate { index_state = IndexState }} =
lists:foldl(
- fun (Msg = #basic_message { is_persistent = IsPersistent },
+ fun ({Msg = #basic_message { is_persistent = IsPersistent }, MsgProperties},
{SeqIdsAcc, State2}) ->
IsPersistent1 = IsDurable andalso IsPersistent,
- {SeqId, State3} = publish(Msg, false, IsPersistent1, State2),
+ {SeqId, State3} = publish(Msg, MsgProperties, false, IsPersistent1, State2),
{cons_if(IsPersistent1, SeqId, SeqIdsAcc), State3}
end, {PAcks, ack(Acks, State)}, Pubs),
IndexState1 = rabbit_queue_index:sync(SeqIds, IndexState),
@@ -996,7 +1007,7 @@ remove_queue_entries1(
%%----------------------------------------------------------------------------
publish(Msg = #basic_message { is_persistent = IsPersistent },
- IsDelivered, MsgOnDisk,
+ MsgProperties, IsDelivered, MsgOnDisk,
State = #vqstate { q1 = Q1, q3 = Q3, q4 = Q4,
next_seq_id = SeqId,
len = Len,
@@ -1005,8 +1016,9 @@ publish(Msg = #basic_message { is_persistent = IsPersistent },
durable = IsDurable,
ram_msg_count = RamMsgCount }) ->
IsPersistent1 = IsDurable andalso IsPersistent,
- MsgStatus = (msg_status(IsPersistent1, SeqId, Msg))
- #msg_status { is_delivered = IsDelivered, msg_on_disk = MsgOnDisk },
+ MsgStatus = (msg_status(IsPersistent1, SeqId, Msg, MsgProperties))
+ #msg_status { is_delivered = IsDelivered,
+ msg_on_disk = MsgOnDisk},
{MsgStatus1, State1} = maybe_write_to_disk(false, false, MsgStatus, State),
State2 = case bpqueue:is_empty(Q3) of
false -> State1 #vqstate { q1 = queue:in(m(MsgStatus1), Q1) };
@@ -1073,9 +1085,9 @@ maybe_write_to_disk(ForceMsg, ForceIndex, MsgStatus,
record_pending_ack(#msg_status { guid = Guid, seq_id = SeqId,
is_persistent = IsPersistent,
- msg_on_disk = MsgOnDisk } = MsgStatus, PA) ->
+ msg_on_disk = MsgOnDisk, msg_properties = MsgProperties } = MsgStatus, PA) ->
AckEntry = case MsgOnDisk of
- true -> {IsPersistent, Guid};
+ true -> {IsPersistent, Guid, MsgProperties};
false -> MsgStatus
end,
dict:store(SeqId, AckEntry, PA).
@@ -1128,7 +1140,7 @@ accumulate_ack(_SeqId, #msg_status { is_persistent = false, %% ASSERTIONS
msg_on_disk = false,
index_on_disk = false }, Acc) ->
Acc;
-accumulate_ack(SeqId, {IsPersistent, Guid}, {SeqIdsAcc, Dict}) ->
+accumulate_ack(SeqId, {IsPersistent, Guid, _MsgProperties}, {SeqIdsAcc, Dict}) ->
{cons_if(IsPersistent, SeqId, SeqIdsAcc),
rabbit_misc:orddict_cons(find_msg_store(IsPersistent), Guid, Dict)}.