summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorRob Harrop <rharrop@vmware.com>2010-09-21 14:25:19 +0100
committerRob Harrop <rharrop@vmware.com>2010-09-21 14:25:19 +0100
commit95937eccabb50d5e5cc190d1b9f577ccb9a13b51 (patch)
tree2cba2252c78beeb7883c072b3ac4b6e77f0258aa /src
parent42cd6a4e553e44dd09d2d8004cf7ea3eac7da25c (diff)
parent453b60dc6a45ef50209f5831f70bbd2a330ee88f (diff)
downloadrabbitmq-server-git-95937eccabb50d5e5cc190d1b9f577ccb9a13b51.tar.gz
merge with default
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_amqqueue.erl14
-rw-r--r--src/rabbit_amqqueue_process.erl84
-rw-r--r--src/rabbit_backing_queue.erl10
-rw-r--r--src/rabbit_invariable_queue.erl63
-rw-r--r--src/rabbit_queue_index.erl86
-rw-r--r--src/rabbit_tests.erl36
-rw-r--r--src/rabbit_variable_queue.erl93
7 files changed, 263 insertions, 123 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 3e677c3809..ec25a871d4 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -57,6 +57,7 @@
-include_lib("stdlib/include/qlc.hrl").
-define(EXPIRES_TYPES, [byte, short, signedint, long]).
+-define(TTL_TYPES, [byte, short, signedint, long]).
%%----------------------------------------------------------------------------
@@ -309,7 +310,8 @@ check_declare_arguments(QueueName, Args) ->
precondition_failed,
"invalid arg '~s' for ~s: ~w",
[Key, rabbit_misc:rs(QueueName), Error])
- end || {Key, Fun} <- [{<<"x-expires">>, fun check_expires_argument/1}]],
+ end || {Key, Fun} <- [{<<"x-expires">>, fun check_expires_argument/1},
+ {<<"x-message-ttl">>, fun check_message_ttl_argument/1}]],
ok.
check_expires_argument(undefined) ->
@@ -322,6 +324,16 @@ check_expires_argument({Type, Expires}) when Expires > 0 ->
check_expires_argument({_Type, _Expires}) ->
{error, expires_zero_or_less}.
+check_message_ttl_argument(undefined) ->
+ ok;
+check_message_ttl_argument({Type, TTL}) when TTL > 0 ->
+ case lists:member(Type, ?TTL_TYPES) of
+ true -> ok;
+ false -> {error, {ttl_not_of_acceptable_type, Type, TTL}}
+ end;
+check_message_ttl_argument({_Type, _TTL}) ->
+ {error, ttl_zero_or_less}.
+
list(VHostPath) ->
mnesia:dirty_match_object(
rabbit_queue,
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index d15a6eb3a9..429f664498 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -39,6 +39,8 @@
-define(SYNC_INTERVAL, 5). %% milliseconds
-define(RAM_DURATION_UPDATE_INTERVAL, 5000).
+-define(BASE_MSG_PROPERTIES, #msg_properties{expiry = undefined}).
+
-export([start_link/1, info_keys/0]).
-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
@@ -61,7 +63,8 @@
sync_timer_ref,
rate_timer_ref,
expiry_timer_ref,
- stats_timer
+ stats_timer,
+ ttl
}).
-record(consumer, {tag, ack_required}).
@@ -123,6 +126,7 @@ init(Q) ->
sync_timer_ref = undefined,
rate_timer_ref = undefined,
expiry_timer_ref = undefined,
+ ttl = undefined,
stats_timer = rabbit_event:init_stats_timer()}, hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
@@ -145,12 +149,21 @@ code_change(_OldVsn, State, _Extra) ->
%%----------------------------------------------------------------------------
+init_queue_state(State) ->
+ init_expires(init_ttl(State)).
+
init_expires(State = #q{q = #amqqueue{arguments = Arguments}}) ->
case rabbit_misc:table_lookup(Arguments, <<"x-expires">>) of
{_Type, Expires} -> ensure_expiry_timer(State#q{expires = Expires});
undefined -> State
end.
+init_ttl(State = #q{q = #amqqueue{arguments = Arguments}}) ->
+ case rabbit_misc:table_lookup(Arguments, <<"x-message-ttl">>) of
+ {_Type, Ttl} -> State#q{ttl=Ttl};
+ undefined -> State
+ end.
+
declare(Recover, From,
State = #q{q = Q = #amqqueue{name = QName, durable = IsDurable},
backing_queue = BQ, backing_queue_state = undefined,
@@ -165,7 +178,8 @@ declare(Recover, From,
self(), {rabbit_amqqueue,
set_ram_duration_target, [self()]}),
BQS = BQ:init(QName, IsDurable, Recover),
- State1 = init_expires(State#q{backing_queue_state = BQS}),
+ State1 = init_queue_state(
+ State#q{backing_queue_state = BQS}),
rabbit_event:notify(queue_created,
infos(?CREATION_EVENT_KEYS, State1)),
rabbit_event:if_enabled(StatsTimer,
@@ -387,13 +401,10 @@ deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun}, FunAcc,
deliver_from_queue_pred(IsEmpty, _State) ->
not IsEmpty.
-deliver_from_queue_deliver(AckRequired, false,
- State = #q{backing_queue = BQ,
- backing_queue_state = BQS}) ->
- {{Message, IsDelivered, AckTag, Remaining}, BQS1} =
- BQ:fetch(AckRequired, BQS),
- {{Message, IsDelivered, AckTag}, 0 == Remaining,
- State #q { backing_queue_state = BQS1 }}.
+deliver_from_queue_deliver(AckRequired, false, State) ->
+ {{Message, IsDelivered, AckTag, Remaining}, State1} =
+ fetch(AckRequired, State),
+ {{Message, IsDelivered, AckTag}, 0 == Remaining, State1}.
run_message_queue(State = #q{backing_queue = BQ, backing_queue_state = BQS}) ->
Funs = {fun deliver_from_queue_pred/2,
@@ -415,7 +426,8 @@ attempt_delivery(none, _ChPid, Message, State = #q{backing_queue = BQ}) ->
attempt_delivery(Txn, ChPid, Message, State = #q{backing_queue = BQ,
backing_queue_state = BQS}) ->
record_current_channel_tx(ChPid, Txn),
- {true, State#q{backing_queue_state = BQ:tx_publish(Txn, Message, BQS)}}.
+ MsgProperties = new_msg_properties(State),
+ {true, State#q{backing_queue_state = BQ:tx_publish(Txn, Message, MsgProperties, BQS)}}.
deliver_or_enqueue(Txn, ChPid, Message, State = #q{backing_queue = BQ}) ->
case attempt_delivery(Txn, ChPid, Message, State) of
@@ -423,13 +435,28 @@ deliver_or_enqueue(Txn, ChPid, Message, State = #q{backing_queue = BQ}) ->
{true, NewState};
{false, NewState} ->
%% Txn is none and no unblocked channels with consumers
- BQS = BQ:publish(Message, State #q.backing_queue_state),
+ MsgProperties = new_msg_properties(State),
+ BQS = BQ:publish(Message, MsgProperties, State #q.backing_queue_state),
{false, NewState#q{backing_queue_state = BQS}}
end.
requeue_and_run(AckTags, State = #q{backing_queue = BQ}) ->
+ MsgPropsFun = reset_msg_expiry_fun(State),
maybe_run_queue_via_backing_queue(
- fun (BQS) -> BQ:requeue(AckTags, BQS) end, State).
+ fun (BQS) -> BQ:requeue(AckTags, MsgPropsFun, BQS) end, State).
+
+fetch(AckRequired, State = #q{backing_queue_state = BQS,
+ backing_queue = BQ}) ->
+ case BQ:fetch(AckRequired, BQS) of
+ {empty, BQS1} -> {empty, State#q{backing_queue_state = BQS1}};
+ {{Message, MsgProperties, IsDelivered, AckTag, Remaining}, BQS1} ->
+ case msg_expired(MsgProperties) of
+ true ->
+ fetch(AckRequired, State#q{backing_queue_state = BQS1});
+ false ->
+ {{Message, IsDelivered, AckTag, Remaining}, State#q{backing_queue_state = BQS1}}
+ end
+ end.
add_consumer(ChPid, Consumer, Queue) -> queue:in({ChPid, Consumer}, Queue).
@@ -527,7 +554,7 @@ maybe_run_queue_via_backing_queue(Fun, State = #q{backing_queue_state = BQS}) ->
commit_transaction(Txn, From, ChPid, State = #q{backing_queue = BQ,
backing_queue_state = BQS}) ->
{AckTags, BQS1} =
- BQ:tx_commit(Txn, fun () -> gen_server2:reply(From, ok) end, BQS),
+ BQ:tx_commit(Txn, fun () -> gen_server2:reply(From, ok) end, reset_msg_expiry_fun(State), BQS),
%% ChPid must be known here because of the participant management
%% by the channel.
C = #cr{acktags = ChAckTags} = lookup_ch(ChPid),
@@ -546,6 +573,26 @@ rollback_transaction(Txn, ChPid, State = #q{backing_queue = BQ,
subtract_acks(A, B) when is_list(B) ->
lists:foldl(fun sets:del_element/2, A, B).
+msg_expired(_MsgProperties = #msg_properties{expiry = undefined}) ->
+ false;
+msg_expired(_MsgProperties = #msg_properties{expiry=Expiry}) ->
+ Now = timer:now_diff(now(), {0,0,0}),
+ Now > Expiry.
+
+reset_msg_expiry_fun(State) ->
+ fun(MsgProps) ->
+ MsgProps#msg_properties{expiry=calculate_msg_expiry(State)}
+ end.
+
+new_msg_properties(State) ->
+ #msg_properties{expiry = calculate_msg_expiry(State)}.
+
+calculate_msg_expiry(_State = #q{ttl = undefined}) ->
+ undefined;
+calculate_msg_expiry(_State = #q{ttl = Ttl}) ->
+ Now = timer:now_diff(now(), {0,0,0}),
+ Now + (Ttl * 1000).
+
infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items].
i(name, #q{q = #amqqueue{name = Name}}) -> Name;
@@ -689,13 +736,12 @@ handle_call({notify_down, ChPid}, _From, State) ->
end;
handle_call({basic_get, ChPid, NoAck}, _From,
- State = #q{q = #amqqueue{name = QName},
- backing_queue_state = BQS, backing_queue = BQ}) ->
+ State = #q{q = #amqqueue{name = QName}}) ->
AckRequired = not NoAck,
State1 = ensure_expiry_timer(State),
- case BQ:fetch(AckRequired, BQS) of
- {empty, BQS1} -> reply(empty, State1#q{backing_queue_state = BQS1});
- {{Message, IsDelivered, AckTag, Remaining}, BQS1} ->
+ case fetch(AckRequired, State1) of
+ {empty, State2} -> reply(empty, State2);
+ {{Message, IsDelivered, AckTag, Remaining}, State2} ->
case AckRequired of
true -> C = #cr{acktags = ChAckTags} = ch_record(ChPid),
store_ch_record(
@@ -703,7 +749,7 @@ handle_call({basic_get, ChPid, NoAck}, _From,
false -> ok
end,
Msg = {QName, self(), AckTag, IsDelivered, Message},
- reply({ok, Remaining, Msg}, State1#q{backing_queue_state = BQS1})
+ reply({ok, Remaining, Msg}, State2)
end;
handle_call({basic_consume, NoAck, ChPid, LimiterPid,
diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl
index 2230c507e9..5cb7836822 100644
--- a/src/rabbit_backing_queue.erl
+++ b/src/rabbit_backing_queue.erl
@@ -62,12 +62,12 @@ behaviour_info(callbacks) ->
{purge, 1},
%% Publish a message.
- {publish, 2},
+ {publish, 3},
%% Called for messages which have already been passed straight
%% out to a client. The queue will be empty for these calls
%% (i.e. saves the round trip through the backing queue).
- {publish_delivered, 3},
+ {publish_delivered, 4},
%% Produce the next message.
{fetch, 2},
@@ -77,7 +77,7 @@ behaviour_info(callbacks) ->
{ack, 2},
%% A publish, but in the context of a transaction.
- {tx_publish, 3},
+ {tx_publish, 4},
%% Acks, but in the context of a transaction.
{tx_ack, 3},
@@ -89,11 +89,11 @@ behaviour_info(callbacks) ->
%% Commit a transaction. The Fun passed in must be called once
%% the messages have really been commited. This CPS permits the
%% possibility of commit coalescing.
- {tx_commit, 3},
+ {tx_commit, 4},
%% Reinsert messages into the queue which have already been
%% delivered and were pending acknowledgement.
- {requeue, 2},
+ {requeue, 3},
%% How long is my queue?
{len, 1},
diff --git a/src/rabbit_invariable_queue.erl b/src/rabbit_invariable_queue.erl
index 4e0dad8422..3eea7becce 100644
--- a/src/rabbit_invariable_queue.erl
+++ b/src/rabbit_invariable_queue.erl
@@ -31,9 +31,9 @@
-module(rabbit_invariable_queue).
--export([init/3, terminate/1, delete_and_terminate/1, purge/1, publish/2,
- publish_delivered/3, fetch/2, ack/2, tx_publish/3, tx_ack/3,
- tx_rollback/2, tx_commit/3, requeue/2, len/1, is_empty/1,
+-export([init/3, terminate/1, delete_and_terminate/1, purge/1, publish/3,
+ publish_delivered/4, fetch/2, ack/2, tx_publish/4, tx_ack/3,
+ tx_rollback/2, tx_commit/4, requeue/3, len/1, is_empty/1,
set_ram_duration_target/2, ram_duration/1, needs_idle_timeout/1,
idle_timeout/1, handle_pre_hibernate/1, status/1]).
@@ -89,47 +89,52 @@ purge(State = #iv_state { queue = Q, qname = QName, durable = IsDurable,
%% We do not purge messages pending acks.
{AckTags, PA} =
rabbit_misc:queue_fold(
- fun ({#basic_message { is_persistent = false }, _IsDelivered}, Acc) ->
+ fun ({#basic_message { is_persistent = false }, _MsgProps, _IsDelivered},
+ Acc) ->
Acc;
- ({Msg = #basic_message { guid = Guid }, IsDelivered},
+ ({Msg = #basic_message { guid = Guid }, MsgProps, IsDelivered},
{AckTagsN, PAN}) ->
ok = persist_delivery(QName, IsDurable, IsDelivered, Msg),
- {[Guid | AckTagsN], dict:store(Guid, Msg, PAN)}
+ {[Guid | AckTagsN], store_ack(Msg, MsgProps, PAN)}
end, {[], dict:new()}, Q),
ok = persist_acks(QName, IsDurable, none, AckTags, PA),
{Len, State #iv_state { len = 0, queue = queue:new() }}.
-publish(Msg, State = #iv_state { queue = Q, qname = QName, durable = IsDurable,
- len = Len }) ->
+publish(Msg, MsgProps, State = #iv_state { queue = Q,
+ qname = QName,
+ durable = IsDurable,
+ len = Len }) ->
ok = persist_message(QName, IsDurable, none, Msg),
- State #iv_state { queue = queue:in({Msg, false}, Q), len = Len + 1 }.
+ Q1 = enqueue(Msg, MsgProps, false, Q),
+ State #iv_state { queue = Q1, len = Len + 1 }.
-publish_delivered(false, _Msg, State) ->
+publish_delivered(false, _Msg, _MsgProps, State) ->
{blank_ack, State};
publish_delivered(true, Msg = #basic_message { guid = Guid },
+ MsgProps,
State = #iv_state { qname = QName, durable = IsDurable,
len = 0, pending_ack = PA }) ->
ok = persist_message(QName, IsDurable, none, Msg),
ok = persist_delivery(QName, IsDurable, false, Msg),
- {Guid, State #iv_state { pending_ack = dict:store(Guid, Msg, PA) }}.
+ {Guid, State #iv_state { pending_ack = store_ack(Msg, MsgProps, PA) }}.
fetch(_AckRequired, State = #iv_state { len = 0 }) ->
{empty, State};
fetch(AckRequired, State = #iv_state { len = Len, queue = Q, qname = QName,
durable = IsDurable,
pending_ack = PA }) ->
- {{value, {Msg = #basic_message { guid = Guid }, IsDelivered}}, Q1} =
- queue:out(Q),
+ {{value, {Msg = #basic_message { guid = Guid }, MsgProps, IsDelivered}},
+ Q1} = queue:out(Q),
Len1 = Len - 1,
ok = persist_delivery(QName, IsDurable, IsDelivered, Msg),
- PA1 = dict:store(Guid, Msg, PA),
+ PA1 = store_ack(Msg, MsgProps, PA),
{AckTag, PA2} = case AckRequired of
true -> {Guid, PA1};
false -> ok = persist_acks(QName, IsDurable, none,
[Guid], PA1),
{blank_ack, PA}
end,
- {{Msg, IsDelivered, AckTag, Len1},
+ {{Msg, MsgProps, IsDelivered, AckTag, Len1},
State #iv_state { queue = Q1, len = Len1, pending_ack = PA2 }}.
ack(AckTags, State = #iv_state { qname = QName, durable = IsDurable,
@@ -138,10 +143,10 @@ ack(AckTags, State = #iv_state { qname = QName, durable = IsDurable,
PA1 = remove_acks(AckTags, PA),
State #iv_state { pending_ack = PA1 }.
-tx_publish(Txn, Msg, State = #iv_state { qname = QName,
+tx_publish(Txn, Msg, MsgProps, State = #iv_state { qname = QName,
durable = IsDurable }) ->
Tx = #tx { pending_messages = Pubs } = lookup_tx(Txn),
- store_tx(Txn, Tx #tx { pending_messages = [Msg | Pubs] }),
+ store_tx(Txn, Tx #tx { pending_messages = [{Msg, MsgProps} | Pubs] }),
ok = persist_message(QName, IsDurable, Txn, Msg),
State.
@@ -159,7 +164,8 @@ tx_rollback(Txn, State = #iv_state { qname = QName }) ->
erase_tx(Txn),
{lists:flatten(AckTags), State}.
-tx_commit(Txn, Fun, State = #iv_state { qname = QName, pending_ack = PA,
+tx_commit(Txn, Fun, MsgPropsFun,
+ State = #iv_state { qname = QName, pending_ack = PA,
queue = Q, len = Len }) ->
#tx { pending_acks = AckTags, pending_messages = PubsRev } = lookup_tx(Txn),
ok = do_if_persistent(fun rabbit_persister:commit_transaction/1,
@@ -168,12 +174,14 @@ tx_commit(Txn, Fun, State = #iv_state { qname = QName, pending_ack = PA,
Fun(),
AckTags1 = lists:flatten(AckTags),
PA1 = remove_acks(AckTags1, PA),
- {Q1, Len1} = lists:foldr(fun (Msg, {QN, LenN}) ->
- {queue:in({Msg, false}, QN), LenN + 1}
+ {Q1, Len1} = lists:foldr(fun ({Msg, MsgProps}, {QN, LenN}) ->
+ MsgProps1 = MsgPropsFun(MsgProps),
+ QN = enqueue(Msg, MsgProps1, false, Q),
+ {QN, LenN + 1}
end, {Q, Len}, PubsRev),
{AckTags1, State #iv_state { pending_ack = PA1, queue = Q1, len = Len1 }}.
-requeue(AckTags, State = #iv_state { pending_ack = PA, queue = Q,
+requeue(AckTags, MsgPropsFun, State = #iv_state { pending_ack = PA, queue = Q,
len = Len }) ->
%% We don't need to touch the persister here - the persister will
%% already have these messages published and delivered as
@@ -186,12 +194,18 @@ requeue(AckTags, State = #iv_state { pending_ack = PA, queue = Q,
%% order to the last known state of our queue, prior to shutdown.
{Q1, Len1} = lists:foldl(
fun (Guid, {QN, LenN}) ->
- {ok, Msg = #basic_message {}} = dict:find(Guid, PA),
- {queue:in({Msg, true}, QN), LenN + 1}
+ {ok, {Msg = #basic_message {}, MsgProps}}
+ = dict:find(Guid, PA),
+ MsgProps1 = MsgPropsFun(MsgProps),
+ {enqueue(Msg, MsgProps1, true, QN), LenN + 1}
end, {Q, Len}, AckTags),
PA1 = remove_acks(AckTags, PA),
State #iv_state { pending_ack = PA1, queue = Q1, len = Len1 }.
+enqueue(Msg, MsgProps, IsDelivered, Q) ->
+ I = {Msg, MsgProps, IsDelivered},
+ queue:in(I, Q).
+
len(#iv_state { len = Len }) -> Len.
is_empty(State) -> 0 == len(State).
@@ -212,6 +226,9 @@ status(_State) -> [].
remove_acks(AckTags, PA) -> lists:foldl(fun dict:erase/2, PA, AckTags).
+store_ack(Msg = #basic_message { guid = Guid}, MsgProps, PA) ->
+ dict:store(Guid, {Msg, MsgProps}, PA).
+
%%----------------------------------------------------------------------------
lookup_tx(Txn) ->
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index d6b8bb2889..b5e92dcaad 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -31,7 +31,7 @@
-module(rabbit_queue_index).
--export([init/4, terminate/2, delete_and_terminate/1, publish/4,
+-export([init/4, terminate/2, delete_and_terminate/1, publish/5,
deliver/2, ack/2, sync/2, flush/1, read/3,
next_segment_boundary/1, bounds/1, recover/1]).
@@ -98,7 +98,7 @@
%% and seeding the message store on start up.
%%
%% Note that in general, the representation of a message's state as
-%% the tuple: {('no_pub'|{Guid, IsPersistent}), ('del'|'no_del'),
+%% the tuple: {('no_pub'|{Guid, MsgProperties, IsPersistent}), ('del'|'no_del'),
%% ('ack'|'no_ack')} is richer than strictly necessary for most
%% operations. However, for startup, and to ensure the safe and
%% correct combination of journal entries with entries read from the
@@ -141,14 +141,19 @@
-define(REL_SEQ_ONLY_ENTRY_LENGTH_BYTES, 2).
%% publish record is binary 1 followed by a bit for is_persistent,
-%% then 14 bits of rel seq id, and 128 bits of md5sum msg id
+%% then 14 bits of rel seq id, 64 bits for message expiry and 128 bits
+%% of md5sum msg id
-define(PUBLISH_PREFIX, 1).
-define(PUBLISH_PREFIX_BITS, 1).
+-define(EXPIRY_BYTES, 8).
+-define(EXPIRY_BITS, (?EXPIRY_BYTES * 8)).
+-define(NO_EXPIRY, 0).
+
-define(GUID_BYTES, 16). %% md5sum is 128 bit or 16 bytes
-define(GUID_BITS, (?GUID_BYTES * 8)).
-%% 16 bytes for md5sum + 2 for seq, bits and prefix
--define(PUBLISH_RECORD_LENGTH_BYTES, ?GUID_BYTES + 2).
+%% 16 bytes for md5sum + 8 for expiry + 2 for seq, bits and prefix
+-define(PUBLISH_RECORD_LENGTH_BYTES, ?GUID_BYTES + ?EXPIRY_BYTES + 2).
%% 1 publish, 1 deliver, 1 ack per msg
-define(SEGMENT_TOTAL_SIZE, ?SEGMENT_ENTRY_COUNT *
@@ -157,7 +162,7 @@
%% ---- misc ----
--define(PUB, {_, _}). %% {Guid, IsPersistent}
+-define(PUB, {_, _, _}). %% {Guid, MsgProperties, IsPersistent}
-define(READ_MODE, [binary, raw, read, {read_ahead, ?SEGMENT_TOTAL_SIZE}]).
@@ -198,15 +203,15 @@
{'undefined' | non_neg_integer(), [any()], qistate()}).
-spec(terminate/2 :: ([any()], qistate()) -> qistate()).
-spec(delete_and_terminate/1 :: (qistate()) -> qistate()).
--spec(publish/4 :: (rabbit_guid:guid(), seq_id(), boolean(), qistate()) ->
- qistate()).
+-spec(publish/5 :: (rabbit_guid:guid(), seq_id(), msg_properties(),
+ boolean(), qistate()) -> qistate()).
-spec(deliver/2 :: ([seq_id()], qistate()) -> qistate()).
-spec(ack/2 :: ([seq_id()], qistate()) -> qistate()).
-spec(sync/2 :: ([seq_id()], qistate()) -> qistate()).
-spec(flush/1 :: (qistate()) -> qistate()).
-spec(read/3 :: (seq_id(), seq_id(), qistate()) ->
- {[{rabbit_guid:guid(), seq_id(), boolean(), boolean()}],
- qistate()}).
+ {[{rabbit_guid:guid(), seq_id(), msg_properties(),
+ boolean(), boolean()}], qistate()}).
-spec(next_segment_boundary/1 :: (seq_id()) -> seq_id()).
-spec(bounds/1 :: (qistate()) ->
{non_neg_integer(), non_neg_integer(), qistate()}).
@@ -245,15 +250,18 @@ delete_and_terminate(State) ->
ok = rabbit_misc:recursive_delete([Dir]),
State1.
-publish(Guid, SeqId, IsPersistent, State) when is_binary(Guid) ->
+publish(Guid, SeqId, MsgProperties, IsPersistent, State)
+ when is_binary(Guid) ->
?GUID_BYTES = size(Guid),
{JournalHdl, State1} = get_journal_handle(State),
ok = file_handle_cache:append(
JournalHdl, [<<(case IsPersistent of
true -> ?PUB_PERSIST_JPREFIX;
false -> ?PUB_TRANS_JPREFIX
- end):?JPREFIX_BITS, SeqId:?SEQ_BITS>>, Guid]),
- maybe_flush_journal(add_to_journal(SeqId, {Guid, IsPersistent}, State1)).
+ end):?JPREFIX_BITS,
+ SeqId:?SEQ_BITS>>,
+ create_pub_record_body(Guid, MsgProperties)]),
+ maybe_flush_journal(add_to_journal(SeqId, {Guid, MsgProperties, IsPersistent}, State1)).
deliver(SeqIds, State) ->
deliver_or_ack(del, SeqIds, State).
@@ -282,7 +290,7 @@ flush(State) -> flush_journal(State).
read(StartEnd, StartEnd, State) ->
{[], State};
-read(Start, End, State = #qistate { segments = Segments,
+read(Start, End, State = #qistate { segments = Segments,
dir = Dir }) when Start =< End ->
%% Start is inclusive, End is exclusive.
LowerB = {StartSeg, _StartRelSeq} = seq_id_to_seg_and_rel_seq_id(Start),
@@ -449,7 +457,7 @@ recover_segment(ContainsCheckFun, CleanShutdown,
{SegEntries1, UnackedCountDelta} =
segment_plus_journal(SegEntries, JEntries),
array:sparse_foldl(
- fun (RelSeq, {{Guid, _IsPersistent}, Del, no_ack}, Segment1) ->
+ fun (RelSeq, {{Guid, _MsgProperties, _IsPersistent}, Del, no_ack}, Segment1) ->
recover_message(ContainsCheckFun(Guid), CleanShutdown,
Del, RelSeq, Segment1)
end,
@@ -502,7 +510,7 @@ queue_index_walker_reader(QueueName, Gatherer) ->
State = #qistate { segments = Segments, dir = Dir } =
recover_journal(blank_state(QueueName, false)),
[ok = segment_entries_foldr(
- fun (_RelSeq, {{Guid, true}, _IsDelivered, no_ack}, ok) ->
+ fun (_RelSeq, {{Guid, _MsgProps, true}, _IsDelivered, no_ack}, ok) ->
gatherer:in(Gatherer, {Guid, 1});
(_RelSeq, _Value, Acc) ->
Acc
@@ -512,6 +520,28 @@ queue_index_walker_reader(QueueName, Gatherer) ->
ok = gatherer:finish(Gatherer).
%%----------------------------------------------------------------------------
+%% expiry/binary manipulation
+%%----------------------------------------------------------------------------
+
+create_pub_record_body(Guid, #msg_properties{expiry = Expiry}) ->
+ [Guid, expiry_to_binary(Expiry)].
+
+expiry_to_binary(undefined) ->
+ <<?NO_EXPIRY:?EXPIRY_BITS>>;
+expiry_to_binary(Expiry) ->
+ <<Expiry:?EXPIRY_BITS>>.
+
+read_pub_record_body(Hdl) ->
+ {ok, Bin} = file_handle_cache:read(Hdl, ?GUID_BYTES + ?EXPIRY_BYTES),
+ <<GuidNum:?GUID_BITS, Expiry:?EXPIRY_BITS>> = Bin,
+ <<Guid:?GUID_BYTES/binary>> = <<GuidNum:?GUID_BITS>>,
+ Exp = case Expiry of
+ ?NO_EXPIRY -> undefined;
+ X -> X
+ end,
+ {Guid, #msg_properties{expiry = Exp}}.
+
+%%----------------------------------------------------------------------------
%% journal manipulation
%%----------------------------------------------------------------------------
@@ -631,14 +661,9 @@ load_journal_entries(State = #qistate { journal_handle = Hdl }) ->
?ACK_JPREFIX ->
load_journal_entries(add_to_journal(SeqId, ack, State));
_ ->
- case file_handle_cache:read(Hdl, ?GUID_BYTES) of
- {ok, <<GuidNum:?GUID_BITS>>} ->
- %% work around for binary data
- %% fragmentation. See
- %% rabbit_msg_file:read_next/2
- <<Guid:?GUID_BYTES/binary>> =
- <<GuidNum:?GUID_BITS>>,
- Publish = {Guid, case Prefix of
+ case read_pub_record_body(Hdl) of
+ {Guid, MsgProperties} ->
+ Publish = {Guid, MsgProperties, case Prefix of
?PUB_PERSIST_JPREFIX -> true;
?PUB_TRANS_JPREFIX -> false
end},
@@ -739,11 +764,12 @@ write_entry_to_segment(RelSeq, {Pub, Del, Ack}, Hdl) ->
ok = case Pub of
no_pub ->
ok;
- {Guid, IsPersistent} ->
+ {Guid, MsgProperties, IsPersistent} ->
file_handle_cache:append(
Hdl, [<<?PUBLISH_PREFIX:?PUBLISH_PREFIX_BITS,
(bool_to_int(IsPersistent)):1,
- RelSeq:?REL_SEQ_BITS>>, Guid])
+ RelSeq:?REL_SEQ_BITS>>,
+ create_pub_record_body(Guid, MsgProperties)])
end,
ok = case {Del, Ack} of
{no_del, no_ack} ->
@@ -763,10 +789,10 @@ read_bounded_segment(Seg, {StartSeg, StartRelSeq}, {EndSeg, EndRelSeq},
{Messages, Segments}, Dir) ->
Segment = segment_find_or_new(Seg, Dir, Segments),
{segment_entries_foldr(
- fun (RelSeq, {{Guid, IsPersistent}, IsDelivered, no_ack}, Acc)
+ fun (RelSeq, {{Guid, MsgProperties, IsPersistent}, IsDelivered, no_ack}, Acc)
when (Seg > StartSeg orelse StartRelSeq =< RelSeq) andalso
(Seg < EndSeg orelse EndRelSeq >= RelSeq) ->
- [ {Guid, reconstruct_seq_id(StartSeg, RelSeq),
+ [ {Guid, reconstruct_seq_id(StartSeg, RelSeq), MsgProperties,
IsPersistent, IsDelivered == del} | Acc ];
(_RelSeq, _Value, Acc) ->
Acc
@@ -798,8 +824,8 @@ load_segment_entries(KeepAcked, Hdl, SegEntries, UnackedCount) ->
IsPersistentNum:1, RelSeq:?REL_SEQ_BITS>>} ->
%% because we specify /binary, and binaries are complete
%% bytes, the size spec is in bytes, not bits.
- {ok, Guid} = file_handle_cache:read(Hdl, ?GUID_BYTES),
- Obj = {{Guid, 1 == IsPersistentNum}, no_del, no_ack},
+ {Guid, MsgProperties} = read_pub_record_body(Hdl),
+ Obj = {{Guid, MsgProperties, 1 == IsPersistentNum}, no_del, no_ack},
SegEntries1 = array:set(RelSeq, Obj, SegEntries),
load_segment_entries(KeepAcked, Hdl, SegEntries1,
UnackedCount + 1);
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index a72656b73b..fdead8f9e0 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -1414,6 +1414,7 @@ test_backing_queue() ->
application:set_env(rabbit, msg_store_file_size_limit,
FileSizeLimit, infinity),
passed = test_queue_index(),
+ passed = test_queue_index_props(),
passed = test_variable_queue(),
passed = test_queue_recover(),
application:set_env(rabbit, queue_index_max_journal_entries,
@@ -1639,7 +1640,7 @@ queue_index_publish(SeqIds, Persistent, Qi) ->
fun (SeqId, {QiN, SeqIdsGuidsAcc, MSCStateN}) ->
Guid = rabbit_guid:guid(),
QiM = rabbit_queue_index:publish(
- Guid, SeqId, Persistent, QiN),
+ Guid, SeqId, #msg_properties{}, Persistent, QiN),
{ok, MSCStateM} = rabbit_msg_store:write(MsgStore, Guid,
Guid, MSCStateN),
{QiM, [{SeqId, Guid} | SeqIdsGuidsAcc], MSCStateM}
@@ -1651,12 +1652,27 @@ queue_index_publish(SeqIds, Persistent, Qi) ->
verify_read_with_published(_Delivered, _Persistent, [], _) ->
ok;
verify_read_with_published(Delivered, Persistent,
- [{Guid, SeqId, Persistent, Delivered}|Read],
+ [{Guid, SeqId, _Props, Persistent, Delivered}|Read],
[{SeqId, Guid}|Published]) ->
verify_read_with_published(Delivered, Persistent, Read, Published);
verify_read_with_published(_Delivered, _Persistent, _Read, _Published) ->
ko.
+test_queue_index_props() ->
+ with_empty_test_queue(
+ fun(Qi0) ->
+ Guid = rabbit_guid:guid(),
+ Props = #msg_properties{expiry=12345},
+ Qi1 = rabbit_queue_index:publish(Guid, 1, Props, true, Qi0),
+ {[{Guid, 1, Props, _, _}], Qi2} = rabbit_queue_index:read(1, 2, Qi1),
+ Qi2
+ end),
+
+ ok = rabbit_variable_queue:stop(),
+ ok = rabbit_variable_queue:start([]),
+
+ passed.
+
test_queue_index() ->
SegmentSize = rabbit_queue_index:next_segment_boundary(0),
TwoSegs = SegmentSize + SegmentSize,
@@ -1786,18 +1802,19 @@ variable_queue_publish(IsPersistent, Count, VQ) ->
fun (_N, VQN) ->
rabbit_variable_queue:publish(
rabbit_basic:message(
- rabbit_misc:r(<<>>, exchange, <<>>),
+ rabbit_misc:r(<<>>, exchange, <<>>),
<<>>, #'P_basic'{delivery_mode = case IsPersistent of
true -> 2;
false -> 1
- end}, <<>>), VQN)
+ end}, <<>>),
+ #msg_properties{}, VQN)
end, VQ, lists:seq(1, Count)).
variable_queue_fetch(Count, IsPersistent, IsDelivered, Len, VQ) ->
lists:foldl(fun (N, {VQN, AckTagsAcc}) ->
Rem = Len - N,
{{#basic_message { is_persistent = IsPersistent },
- IsDelivered, AckTagN, Rem}, VQM} =
+ _Props, IsDelivered, AckTagN, Rem}, VQM} =
rabbit_variable_queue:fetch(true, VQN),
{VQM, [AckTagN | AckTagsAcc]}
end, {VQ, []}, lists:seq(1, Count)).
@@ -1837,6 +1854,7 @@ test_variable_queue_dynamic_duration_change(VQ0) ->
%% squeeze and relax queue
Churn = Len div 32,
VQ2 = publish_fetch_and_ack(Churn, Len, VQ1),
+
{Duration, VQ3} = rabbit_variable_queue:ram_duration(VQ2),
VQ7 = lists:foldl(
fun (Duration1, VQ4) ->
@@ -1859,7 +1877,7 @@ publish_fetch_and_ack(0, _Len, VQ0) ->
VQ0;
publish_fetch_and_ack(N, Len, VQ0) ->
VQ1 = variable_queue_publish(false, 1, VQ0),
- {{_Msg, false, AckTag, Len}, VQ2} = rabbit_variable_queue:fetch(true, VQ1),
+ {{_Msg, _MsgProps, false, AckTag, Len}, VQ2} = rabbit_variable_queue:fetch(true, VQ1),
publish_fetch_and_ack(N-1, Len, rabbit_variable_queue:ack([AckTag], VQ2)).
test_variable_queue_partial_segments_delta_thing(VQ0) ->
@@ -1923,7 +1941,7 @@ test_variable_queue_all_the_bits_not_covered_elsewhere1(VQ0) ->
Count, VQ4),
_VQ6 = rabbit_variable_queue:terminate(VQ5),
VQ7 = rabbit_variable_queue:init(test_queue(), true, true),
- {{_Msg1, true, _AckTag1, Count1}, VQ8} =
+ {{_Msg1, _Props, true, _AckTag1, Count1}, VQ8} =
rabbit_variable_queue:fetch(true, VQ7),
VQ9 = variable_queue_publish(false, 1, VQ8),
VQ10 = rabbit_variable_queue:set_ram_duration_target(0, VQ9),
@@ -1935,7 +1953,7 @@ test_variable_queue_all_the_bits_not_covered_elsewhere2(VQ0) ->
VQ1 = rabbit_variable_queue:set_ram_duration_target(0, VQ0),
VQ2 = variable_queue_publish(false, 4, VQ1),
{VQ3, AckTags} = variable_queue_fetch(2, false, false, 4, VQ2),
- VQ4 = rabbit_variable_queue:requeue(AckTags, VQ3),
+ VQ4 = rabbit_variable_queue:requeue(AckTags, fun(X) -> X end, VQ3),
VQ5 = rabbit_variable_queue:idle_timeout(VQ4),
_VQ6 = rabbit_variable_queue:terminate(VQ5),
VQ7 = rabbit_variable_queue:init(test_queue(), true, true),
@@ -1969,7 +1987,7 @@ test_queue_recover() ->
rabbit_amqqueue:basic_get(Q1, self(), false),
exit(QPid1, shutdown),
VQ1 = rabbit_variable_queue:init(QName, true, true),
- {{_Msg1, true, _AckTag1, CountMinusOne}, VQ2} =
+ {{_Msg1, _Props, true, _AckTag1, CountMinusOne}, VQ2} =
rabbit_variable_queue:fetch(true, VQ1),
_VQ3 = rabbit_variable_queue:delete_and_terminate(VQ2),
rabbit_amqqueue:internal_delete(QName)
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 30d3a8aec1..ddcf958f8c 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -32,9 +32,9 @@
-module(rabbit_variable_queue).
-export([init/3, terminate/1, delete_and_terminate/1,
- purge/1, publish/2, publish_delivered/3, fetch/2, ack/2,
- tx_publish/3, tx_ack/3, tx_rollback/2, tx_commit/3,
- requeue/2, len/1, is_empty/1,
+ purge/1, publish/3, publish_delivered/4, fetch/2, ack/2,
+ tx_publish/4, tx_ack/3, tx_rollback/2, tx_commit/4,
+ requeue/3, len/1, is_empty/1,
set_ram_duration_target/2, ram_duration/1,
needs_idle_timeout/1, idle_timeout/1, handle_pre_hibernate/1,
status/1]).
@@ -248,7 +248,8 @@
is_persistent,
is_delivered,
msg_on_disk,
- index_on_disk
+ index_on_disk,
+ msg_properties
}).
-record(delta,
@@ -490,13 +491,14 @@ purge(State = #vqstate { q4 = Q4, index_state = IndexState, len = Len }) ->
ram_index_count = 0,
persistent_count = 0 })}.
-publish(Msg, State) ->
- {_SeqId, State1} = publish(Msg, false, false, State),
+publish(Msg, MsgProperties, State) ->
+ {_SeqId, State1} = publish(Msg, MsgProperties, false, false, State),
a(reduce_memory_use(State1)).
-publish_delivered(false, _Msg, State = #vqstate { len = 0 }) ->
+publish_delivered(false, _Msg, _MsgProps, State = #vqstate { len = 0 }) ->
{blank_ack, a(State)};
publish_delivered(true, Msg = #basic_message { is_persistent = IsPersistent },
+ MsgProps,
State = #vqstate { len = 0,
next_seq_id = SeqId,
out_counter = OutCount,
@@ -505,7 +507,7 @@ publish_delivered(true, Msg = #basic_message { is_persistent = IsPersistent },
pending_ack = PA,
durable = IsDurable }) ->
IsPersistent1 = IsDurable andalso IsPersistent,
- MsgStatus = (msg_status(IsPersistent1, SeqId, Msg))
+ MsgStatus = (msg_status(IsPersistent1, SeqId, Msg, MsgProps))
#msg_status { is_delivered = true },
{MsgStatus1, State1} = maybe_write_to_disk(false, false, MsgStatus, State),
PA1 = record_pending_ack(m(MsgStatus1), PA),
@@ -532,7 +534,8 @@ fetch(AckRequired, State = #vqstate { q4 = Q4,
{{value, MsgStatus = #msg_status {
msg = Msg, guid = Guid, seq_id = SeqId,
is_persistent = IsPersistent, is_delivered = IsDelivered,
- msg_on_disk = MsgOnDisk, index_on_disk = IndexOnDisk }},
+ msg_on_disk = MsgOnDisk, index_on_disk = IndexOnDisk,
+ msg_properties = MsgProperties }},
Q4a} ->
%% 1. Mark it delivered if necessary
@@ -563,7 +566,7 @@ fetch(AckRequired, State = #vqstate { q4 = Q4,
PCount1 = PCount - one_if(IsPersistent andalso not AckRequired),
Len1 = Len - 1,
- {{Msg, IsDelivered, AckTag, Len1},
+ {{Msg, MsgProperties, IsDelivered, AckTag, Len1},
a(State #vqstate { q4 = Q4a,
ram_msg_count = RamMsgCount - 1,
out_counter = OutCount + 1,
@@ -579,12 +582,13 @@ ack(AckTags, State) ->
AckTags, State)).
tx_publish(Txn, Msg = #basic_message { is_persistent = IsPersistent },
+ MsgProperties,
State = #vqstate { durable = IsDurable,
msg_store_clients = MSCState }) ->
Tx = #tx { pending_messages = Pubs } = lookup_tx(Txn),
- store_tx(Txn, Tx #tx { pending_messages = [Msg | Pubs] }),
+ store_tx(Txn, Tx #tx { pending_messages = [{Msg, MsgProperties} | Pubs] }),
a(case IsPersistent andalso IsDurable of
- true -> MsgStatus = msg_status(true, undefined, Msg),
+ true -> MsgStatus = msg_status(true, undefined, Msg, MsgProperties),
{#msg_status { msg_on_disk = true }, MSCState1} =
maybe_write_msg_to_disk(false, MsgStatus, MSCState),
State #vqstate { msg_store_clients = MSCState1 };
@@ -606,10 +610,14 @@ tx_rollback(Txn, State = #vqstate { durable = IsDurable }) ->
end,
{lists:append(AckTags), a(State)}.
-tx_commit(Txn, Fun, State = #vqstate { durable = IsDurable }) ->
+tx_commit(Txn, Fun, MsgPropsFun, State = #vqstate { durable = IsDurable }) ->
#tx { pending_acks = AckTags, pending_messages = Pubs } = lookup_tx(Txn),
erase_tx(Txn),
- PubsOrdered = lists:reverse(Pubs),
+ F = fun({Msg, MsgProperties}) ->
+ {Msg, MsgPropsFun(MsgProperties)}
+ end,
+ PubsProcessed = lists:map(F, Pubs),
+ PubsOrdered = lists:reverse(PubsProcessed),
AckTags1 = lists:append(AckTags),
PersistentGuids = persistent_guids(PubsOrdered),
HasPersistentPubs = PersistentGuids =/= [],
@@ -624,18 +632,21 @@ tx_commit(Txn, Fun, State = #vqstate { durable = IsDurable }) ->
HasPersistentPubs, PubsOrdered, AckTags1, Fun, State)
end)}.
-requeue(AckTags, State) ->
+requeue(AckTags, MsgPropsFun, State) ->
a(reduce_memory_use(
ack(fun rabbit_msg_store:release/2,
- fun (#msg_status { msg = Msg }, State1) ->
- {_SeqId, State2} = publish(Msg, true, false, State1),
+ fun (#msg_status { msg = Msg,
+ msg_properties = MsgProperties }, State1) ->
+ {_SeqId, State2} =
+ publish(Msg, MsgPropsFun(MsgProperties), true, false, State1),
State2;
- ({IsPersistent, Guid}, State1) ->
+ ({IsPersistent, Guid, MsgProperties}, State1) ->
#vqstate { msg_store_clients = MSCState } = State1,
{{ok, Msg = #basic_message{}}, MSCState1} =
read_from_msg_store(MSCState, IsPersistent, Guid),
State2 = State1 #vqstate { msg_store_clients = MSCState1 },
- {_SeqId, State3} = publish(Msg, true, true, State2),
+ {_SeqId, State3} = publish(Msg, MsgPropsFun(MsgProperties),
+ true, true, State2),
State3
end,
AckTags, State))).
@@ -783,10 +794,11 @@ one_if(false) -> 0.
cons_if(true, E, L) -> [E | L];
cons_if(false, _E, L) -> L.
-msg_status(IsPersistent, SeqId, Msg = #basic_message { guid = Guid }) ->
+msg_status(IsPersistent, SeqId, Msg = #basic_message { guid = Guid }, MsgProperties) ->
#msg_status { seq_id = SeqId, guid = Guid, msg = Msg,
is_persistent = IsPersistent, is_delivered = false,
- msg_on_disk = false, index_on_disk = false }.
+ msg_on_disk = false, index_on_disk = false,
+ msg_properties = MsgProperties }.
find_msg_store(true) -> ?PERSISTENT_MSG_STORE;
find_msg_store(false) -> ?TRANSIENT_MSG_STORE.
@@ -821,12 +833,13 @@ store_tx(Txn, Tx) -> put({txn, Txn}, Tx).
erase_tx(Txn) -> erase({txn, Txn}).
persistent_guids(Pubs) ->
- [Guid || #basic_message { guid = Guid, is_persistent = true } <- Pubs].
+ [Guid ||
+ {#basic_message { guid = Guid, is_persistent = true }, _MsgProps} <- Pubs].
betas_from_index_entries(List, TransientThreshold, IndexState) ->
{Filtered, Delivers, Acks} =
lists:foldr(
- fun ({Guid, SeqId, IsPersistent, IsDelivered},
+ fun ({Guid, SeqId, MsgProperties, IsPersistent, IsDelivered},
{Filtered1, Delivers1, Acks1}) ->
case SeqId < TransientThreshold andalso not IsPersistent of
true -> {Filtered1,
@@ -838,7 +851,8 @@ betas_from_index_entries(List, TransientThreshold, IndexState) ->
is_persistent = IsPersistent,
is_delivered = IsDelivered,
msg_on_disk = true,
- index_on_disk = true
+ index_on_disk = true,
+ msg_properties = MsgProperties
}) | Filtered1],
Delivers1,
Acks1}
@@ -914,7 +928,7 @@ tx_commit_post_msg_store(HasPersistentPubs, Pubs, AckTags, Fun,
true -> [AckTag || AckTag <- AckTags,
case dict:fetch(AckTag, PA) of
#msg_status {} -> false;
- {IsPersistent, _Guid} -> IsPersistent
+ {IsPersistent, _Guid, _MsgProperties} -> IsPersistent
end];
false -> []
end,
@@ -946,10 +960,10 @@ tx_commit_index(State = #vqstate { on_sync = #sync {
Pubs = lists:append(lists:reverse(SPubs)),
{SeqIds, State1 = #vqstate { index_state = IndexState }} =
lists:foldl(
- fun (Msg = #basic_message { is_persistent = IsPersistent },
+ fun ({Msg = #basic_message { is_persistent = IsPersistent }, MsgProperties},
{SeqIdsAcc, State2}) ->
IsPersistent1 = IsDurable andalso IsPersistent,
- {SeqId, State3} = publish(Msg, false, IsPersistent1, State2),
+ {SeqId, State3} = publish(Msg, MsgProperties, false, IsPersistent1, State2),
{cons_if(IsPersistent1, SeqId, SeqIdsAcc), State3}
end, {PAcks, ack(Acks, State)}, Pubs),
IndexState1 = rabbit_queue_index:sync(SeqIds, IndexState),
@@ -996,7 +1010,7 @@ remove_queue_entries1(
%%----------------------------------------------------------------------------
publish(Msg = #basic_message { is_persistent = IsPersistent },
- IsDelivered, MsgOnDisk,
+ MsgProperties, IsDelivered, MsgOnDisk,
State = #vqstate { q1 = Q1, q3 = Q3, q4 = Q4,
next_seq_id = SeqId,
len = Len,
@@ -1005,8 +1019,9 @@ publish(Msg = #basic_message { is_persistent = IsPersistent },
durable = IsDurable,
ram_msg_count = RamMsgCount }) ->
IsPersistent1 = IsDurable andalso IsPersistent,
- MsgStatus = (msg_status(IsPersistent1, SeqId, Msg))
- #msg_status { is_delivered = IsDelivered, msg_on_disk = MsgOnDisk },
+ MsgStatus = (msg_status(IsPersistent1, SeqId, Msg, MsgProperties))
+ #msg_status { is_delivered = IsDelivered,
+ msg_on_disk = MsgOnDisk},
{MsgStatus1, State1} = maybe_write_to_disk(false, false, MsgStatus, State),
State2 = case bpqueue:is_empty(Q3) of
false -> State1 #vqstate { q1 = queue:in(m(MsgStatus1), Q1) };
@@ -1045,12 +1060,18 @@ maybe_write_index_to_disk(_Force, MsgStatus = #msg_status {
true = MsgStatus #msg_status.msg_on_disk, %% ASSERTION
{MsgStatus, IndexState};
maybe_write_index_to_disk(Force, MsgStatus = #msg_status {
- guid = Guid, seq_id = SeqId,
+ guid = Guid,
+ seq_id = SeqId,
is_persistent = IsPersistent,
- is_delivered = IsDelivered }, IndexState)
+ is_delivered = IsDelivered,
+ msg_properties = MsgProperties},
+ IndexState)
when Force orelse IsPersistent ->
true = MsgStatus #msg_status.msg_on_disk, %% ASSERTION
- IndexState1 = rabbit_queue_index:publish(Guid, SeqId, IsPersistent,
+ IndexState1 = rabbit_queue_index:publish(Guid,
+ SeqId,
+ MsgProperties,
+ IsPersistent,
IndexState),
{MsgStatus #msg_status { index_on_disk = true },
maybe_write_delivered(IsDelivered, SeqId, IndexState1)};
@@ -1073,9 +1094,9 @@ maybe_write_to_disk(ForceMsg, ForceIndex, MsgStatus,
record_pending_ack(#msg_status { guid = Guid, seq_id = SeqId,
is_persistent = IsPersistent,
- msg_on_disk = MsgOnDisk } = MsgStatus, PA) ->
+ msg_on_disk = MsgOnDisk, msg_properties = MsgProperties } = MsgStatus, PA) ->
AckEntry = case MsgOnDisk of
- true -> {IsPersistent, Guid};
+ true -> {IsPersistent, Guid, MsgProperties};
false -> MsgStatus
end,
dict:store(SeqId, AckEntry, PA).
@@ -1128,7 +1149,7 @@ accumulate_ack(_SeqId, #msg_status { is_persistent = false, %% ASSERTIONS
msg_on_disk = false,
index_on_disk = false }, Acc) ->
Acc;
-accumulate_ack(SeqId, {IsPersistent, Guid}, {SeqIdsAcc, Dict}) ->
+accumulate_ack(SeqId, {IsPersistent, Guid, _MsgProperties}, {SeqIdsAcc, Dict}) ->
{cons_if(IsPersistent, SeqId, SeqIdsAcc),
rabbit_misc:orddict_cons(find_msg_store(IsPersistent), Guid, Dict)}.