diff options
| -rw-r--r-- | include/rabbit.hrl | 2 | ||||
| -rw-r--r-- | src/rabbit_amqqueue.erl | 14 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 79 | ||||
| -rw-r--r-- | src/rabbit_backing_queue.erl | 8 | ||||
| -rw-r--r-- | src/rabbit_queue_index.erl | 80 | ||||
| -rw-r--r-- | src/rabbit_tests.erl | 38 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 87 |
7 files changed, 215 insertions, 93 deletions
diff --git a/include/rabbit.hrl b/include/rabbit.hrl index 24aa8d987c..0f0a0e87fd 100644 --- a/include/rabbit.hrl +++ b/include/rabbit.hrl @@ -74,6 +74,8 @@ -record(event, {type, props, timestamp}). +-record(msg_properties, {expiry}). + %%---------------------------------------------------------------------------- -define(COPYRIGHT_MESSAGE, "Copyright (C) 2007-2010 LShift Ltd., Cohesive Financial Technologies LLC., and Rabbit Technologies Ltd."). diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 3e677c3809..ec25a871d4 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -57,6 +57,7 @@ -include_lib("stdlib/include/qlc.hrl"). -define(EXPIRES_TYPES, [byte, short, signedint, long]). +-define(TTL_TYPES, [byte, short, signedint, long]). %%---------------------------------------------------------------------------- @@ -309,7 +310,8 @@ check_declare_arguments(QueueName, Args) -> precondition_failed, "invalid arg '~s' for ~s: ~w", [Key, rabbit_misc:rs(QueueName), Error]) - end || {Key, Fun} <- [{<<"x-expires">>, fun check_expires_argument/1}]], + end || {Key, Fun} <- [{<<"x-expires">>, fun check_expires_argument/1}, + {<<"x-message-ttl">>, fun check_message_ttl_argument/1}]], ok. check_expires_argument(undefined) -> @@ -322,6 +324,16 @@ check_expires_argument({Type, Expires}) when Expires > 0 -> check_expires_argument({_Type, _Expires}) -> {error, expires_zero_or_less}. +check_message_ttl_argument(undefined) -> + ok; +check_message_ttl_argument({Type, TTL}) when TTL > 0 -> + case lists:member(Type, ?TTL_TYPES) of + true -> ok; + false -> {error, {ttl_not_of_acceptable_type, Type, TTL}} + end; +check_message_ttl_argument({_Type, _TTL}) -> + {error, ttl_zero_or_less}. + list(VHostPath) -> mnesia:dirty_match_object( rabbit_queue, diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 91877efbd7..8d5699bee2 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -39,6 +39,8 @@ -define(SYNC_INTERVAL, 5). %% milliseconds -define(RAM_DURATION_UPDATE_INTERVAL, 5000). +-define(BASE_MSG_PROPERTIES, #msg_properties{expiry = undefined}). + -export([start_link/1, info_keys/0]). -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, @@ -61,7 +63,8 @@ sync_timer_ref, rate_timer_ref, expiry_timer_ref, - stats_timer + stats_timer, + ttl }). -record(consumer, {tag, ack_required}). @@ -123,6 +126,7 @@ init(Q) -> sync_timer_ref = undefined, rate_timer_ref = undefined, expiry_timer_ref = undefined, + ttl = undefined, stats_timer = rabbit_event:init_stats_timer()}, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. @@ -145,12 +149,21 @@ code_change(_OldVsn, State, _Extra) -> %%---------------------------------------------------------------------------- +init_queue_state(State) -> + init_expires(init_ttl(State)). + init_expires(State = #q{q = #amqqueue{arguments = Arguments}}) -> case rabbit_misc:table_lookup(Arguments, <<"x-expires">>) of {_Type, Expires} -> ensure_expiry_timer(State#q{expires = Expires}); undefined -> State end. +init_ttl(State = #q{q = #amqqueue{arguments = Arguments}}) -> + case rabbit_misc:table_lookup(Arguments, <<"x-message-ttl">>) of + {_Type, Ttl} -> State#q{ttl=Ttl}; + undefined -> State + end. + declare(Recover, From, State = #q{q = Q = #amqqueue{name = QName, durable = IsDurable}, backing_queue = BQ, backing_queue_state = undefined}) -> @@ -166,7 +179,7 @@ declare(Recover, From, BQS = BQ:init(QName, IsDurable, Recover), rabbit_event:notify(queue_created, infos(?CREATION_EVENT_KEYS, State)), - noreply(init_expires(State#q{backing_queue_state = BQS})); + noreply(init_queue_state(State#q{backing_queue_state = BQS})); Q1 -> {stop, normal, {existing, Q1}, State} end. @@ -392,10 +405,9 @@ deliver_from_queue_pred(IsEmpty, _State) -> deliver_from_queue_deliver(AckRequired, false, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> - {{Message, IsDelivered, AckTag, Remaining}, BQS1} = - BQ:fetch(AckRequired, BQS), - {{Message, IsDelivered, AckTag}, 0 == Remaining, - State #q { backing_queue_state = BQS1 }}. + {{Message, IsDelivered, AckTag, Remaining}, State1} = + fetch(AckRequired, State), + {{Message, IsDelivered, AckTag}, 0 == Remaining, State1}. run_message_queue(State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> Funs = {fun deliver_from_queue_pred/2, @@ -417,7 +429,8 @@ attempt_delivery(none, _ChPid, Message, State = #q{backing_queue = BQ}) -> attempt_delivery(Txn, ChPid, Message, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> record_current_channel_tx(ChPid, Txn), - {true, State#q{backing_queue_state = BQ:tx_publish(Txn, Message, BQS)}}. + MsgProperties = new_msg_properties(State), + {true, State#q{backing_queue_state = BQ:tx_publish(Txn, Message, MsgProperties, BQS)}}. deliver_or_enqueue(Txn, ChPid, Message, State = #q{backing_queue = BQ}) -> case attempt_delivery(Txn, ChPid, Message, State) of @@ -425,13 +438,28 @@ deliver_or_enqueue(Txn, ChPid, Message, State = #q{backing_queue = BQ}) -> {true, NewState}; {false, NewState} -> %% Txn is none and no unblocked channels with consumers - BQS = BQ:publish(Message, State #q.backing_queue_state), + MsgProperties = new_msg_properties(State), + BQS = BQ:publish(Message, MsgProperties, State #q.backing_queue_state), {false, NewState#q{backing_queue_state = BQS}} end. requeue_and_run(AckTags, State = #q{backing_queue = BQ}) -> + MsgPropsFun = reset_msg_expiry_fun(State), maybe_run_queue_via_backing_queue( - fun (BQS) -> BQ:requeue(AckTags, BQS) end, State). + fun (BQS) -> BQ:requeue(AckTags, MsgPropsFun, BQS) end, State). + +fetch(AckRequired, State = #q{backing_queue_state = BQS, + backing_queue = BQ}) -> + case BQ:fetch(AckRequired, BQS) of + {empty, BQS1} -> {empty, State#q{backing_queue_state = BQS1}}; + {{Message, MsgProperties, IsDelivered, AckTag, Remaining}, BQS1} -> + case msg_expired(MsgProperties) of + true -> + fetch(AckRequired, State#q{backing_queue_state = BQS1}); + false -> + {{Message, IsDelivered, AckTag, Remaining}, State#q{backing_queue_state = BQS1}} + end + end. add_consumer(ChPid, Consumer, Queue) -> queue:in({ChPid, Consumer}, Queue). @@ -529,7 +557,7 @@ maybe_run_queue_via_backing_queue(Fun, State = #q{backing_queue_state = BQS}) -> commit_transaction(Txn, From, ChPid, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> {AckTags, BQS1} = - BQ:tx_commit(Txn, fun () -> gen_server2:reply(From, ok) end, BQS), + BQ:tx_commit(Txn, fun () -> gen_server2:reply(From, ok) end, reset_msg_expiry_fun(State), BQS), %% ChPid must be known here because of the participant management %% by the channel. C = #cr{acktags = ChAckTags} = lookup_ch(ChPid), @@ -548,6 +576,26 @@ rollback_transaction(Txn, ChPid, State = #q{backing_queue = BQ, subtract_acks(A, B) when is_list(B) -> lists:foldl(fun sets:del_element/2, A, B). +msg_expired(_MsgProperties = #msg_properties{expiry = undefined}) -> + false; +msg_expired(_MsgProperties = #msg_properties{expiry=Expiry}) -> + Now = timer:now_diff(now(), {0,0,0}), + Now > Expiry. + +reset_msg_expiry_fun(State) -> + fun(MsgProps) -> + MsgProps#msg_properties{expiry=calculate_msg_expiry(State)} + end. + +new_msg_properties(State) -> + #msg_properties{expiry = calculate_msg_expiry(State)}. + +calculate_msg_expiry(_State = #q{ttl = undefined}) -> + undefined; +calculate_msg_expiry(_State = #q{ttl = Ttl}) -> + Now = timer:now_diff(now(), {0,0,0}), + Now + (Ttl * 1000). + infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items]. i(name, #q{q = #amqqueue{name = Name}}) -> Name; @@ -691,13 +739,12 @@ handle_call({notify_down, ChPid}, _From, State) -> end; handle_call({basic_get, ChPid, NoAck}, _From, - State = #q{q = #amqqueue{name = QName}, - backing_queue_state = BQS, backing_queue = BQ}) -> + State = #q{q = #amqqueue{name = QName}}) -> AckRequired = not NoAck, State1 = ensure_expiry_timer(State), - case BQ:fetch(AckRequired, BQS) of - {empty, BQS1} -> reply(empty, State1#q{backing_queue_state = BQS1}); - {{Message, IsDelivered, AckTag, Remaining}, BQS1} -> + case fetch(AckRequired, State1) of + {empty, State2} -> reply(empty, State2); + {{Message, IsDelivered, AckTag, Remaining}, State2} -> case AckRequired of true -> C = #cr{acktags = ChAckTags} = ch_record(ChPid), store_ch_record( @@ -705,7 +752,7 @@ handle_call({basic_get, ChPid, NoAck}, _From, false -> ok end, Msg = {QName, self(), AckTag, IsDelivered, Message}, - reply({ok, Remaining, Msg}, State1#q{backing_queue_state = BQS1}) + reply({ok, Remaining, Msg}, State2) end; handle_call({basic_consume, NoAck, ChPid, LimiterPid, diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl index 2230c507e9..cc7f857188 100644 --- a/src/rabbit_backing_queue.erl +++ b/src/rabbit_backing_queue.erl @@ -62,7 +62,7 @@ behaviour_info(callbacks) -> {purge, 1}, %% Publish a message. - {publish, 2}, + {publish, 3}, %% Called for messages which have already been passed straight %% out to a client. The queue will be empty for these calls @@ -77,7 +77,7 @@ behaviour_info(callbacks) -> {ack, 2}, %% A publish, but in the context of a transaction. - {tx_publish, 3}, + {tx_publish, 4}, %% Acks, but in the context of a transaction. {tx_ack, 3}, @@ -89,11 +89,11 @@ behaviour_info(callbacks) -> %% Commit a transaction. The Fun passed in must be called once %% the messages have really been commited. This CPS permits the %% possibility of commit coalescing. - {tx_commit, 3}, + {tx_commit, 4}, %% Reinsert messages into the queue which have already been %% delivered and were pending acknowledgement. - {requeue, 2}, + {requeue, 3}, %% How long is my queue? {len, 1}, diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index d6b8bb2889..d0b5d31ae9 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -31,7 +31,7 @@ -module(rabbit_queue_index). --export([init/4, terminate/2, delete_and_terminate/1, publish/4, +-export([init/4, terminate/2, delete_and_terminate/1, publish/5, deliver/2, ack/2, sync/2, flush/1, read/3, next_segment_boundary/1, bounds/1, recover/1]). @@ -98,7 +98,7 @@ %% and seeding the message store on start up. %% %% Note that in general, the representation of a message's state as -%% the tuple: {('no_pub'|{Guid, IsPersistent}), ('del'|'no_del'), +%% the tuple: {('no_pub'|{Guid, MsgProperties, IsPersistent}), ('del'|'no_del'), %% ('ack'|'no_ack')} is richer than strictly necessary for most %% operations. However, for startup, and to ensure the safe and %% correct combination of journal entries with entries read from the @@ -141,14 +141,19 @@ -define(REL_SEQ_ONLY_ENTRY_LENGTH_BYTES, 2). %% publish record is binary 1 followed by a bit for is_persistent, -%% then 14 bits of rel seq id, and 128 bits of md5sum msg id +%% then 14 bits of rel seq id, 64 bits for message expiry and 128 bits +%% of md5sum msg id -define(PUBLISH_PREFIX, 1). -define(PUBLISH_PREFIX_BITS, 1). +-define(EXPIRY_BYTES, 8). +-define(EXPIRY_BITS, (?EXPIRY_BYTES * 8)). +-define(NO_EXPIRY, 0). + -define(GUID_BYTES, 16). %% md5sum is 128 bit or 16 bytes -define(GUID_BITS, (?GUID_BYTES * 8)). -%% 16 bytes for md5sum + 2 for seq, bits and prefix --define(PUBLISH_RECORD_LENGTH_BYTES, ?GUID_BYTES + 2). +%% 16 bytes for md5sum + 8 for expiry + 2 for seq, bits and prefix +-define(PUBLISH_RECORD_LENGTH_BYTES, ?GUID_BYTES + ?EXPIRY_BYTES + 2). %% 1 publish, 1 deliver, 1 ack per msg -define(SEGMENT_TOTAL_SIZE, ?SEGMENT_ENTRY_COUNT * @@ -157,7 +162,7 @@ %% ---- misc ---- --define(PUB, {_, _}). %% {Guid, IsPersistent} +-define(PUB, {_, _, _}). %% {Guid, MsgProperties, IsPersistent} -define(READ_MODE, [binary, raw, read, {read_ahead, ?SEGMENT_TOTAL_SIZE}]). @@ -198,15 +203,15 @@ {'undefined' | non_neg_integer(), [any()], qistate()}). -spec(terminate/2 :: ([any()], qistate()) -> qistate()). -spec(delete_and_terminate/1 :: (qistate()) -> qistate()). --spec(publish/4 :: (rabbit_guid:guid(), seq_id(), boolean(), qistate()) -> - qistate()). +-spec(publish/5 :: (rabbit_guid:guid(), seq_id(), msg_properties(), + boolean(), qistate()) -> qistate()). -spec(deliver/2 :: ([seq_id()], qistate()) -> qistate()). -spec(ack/2 :: ([seq_id()], qistate()) -> qistate()). -spec(sync/2 :: ([seq_id()], qistate()) -> qistate()). -spec(flush/1 :: (qistate()) -> qistate()). -spec(read/3 :: (seq_id(), seq_id(), qistate()) -> - {[{rabbit_guid:guid(), seq_id(), boolean(), boolean()}], - qistate()}). + {[{rabbit_guid:guid(), seq_id(), msg_properties(), + boolean(), boolean()}], qistate()}). -spec(next_segment_boundary/1 :: (seq_id()) -> seq_id()). -spec(bounds/1 :: (qistate()) -> {non_neg_integer(), non_neg_integer(), qistate()}). @@ -245,15 +250,18 @@ delete_and_terminate(State) -> ok = rabbit_misc:recursive_delete([Dir]), State1. -publish(Guid, SeqId, IsPersistent, State) when is_binary(Guid) -> +publish(Guid, SeqId, MsgProperties, IsPersistent, State) + when is_binary(Guid) -> ?GUID_BYTES = size(Guid), {JournalHdl, State1} = get_journal_handle(State), ok = file_handle_cache:append( JournalHdl, [<<(case IsPersistent of true -> ?PUB_PERSIST_JPREFIX; false -> ?PUB_TRANS_JPREFIX - end):?JPREFIX_BITS, SeqId:?SEQ_BITS>>, Guid]), - maybe_flush_journal(add_to_journal(SeqId, {Guid, IsPersistent}, State1)). + end):?JPREFIX_BITS, + SeqId:?SEQ_BITS>>, + create_pub_record_body(Guid, MsgProperties)]), + maybe_flush_journal(add_to_journal(SeqId, {Guid, MsgProperties, IsPersistent}, State1)). deliver(SeqIds, State) -> deliver_or_ack(del, SeqIds, State). @@ -282,7 +290,7 @@ flush(State) -> flush_journal(State). read(StartEnd, StartEnd, State) -> {[], State}; -read(Start, End, State = #qistate { segments = Segments, +read(Start, End, State = #qistate { segments = Segments, dir = Dir }) when Start =< End -> %% Start is inclusive, End is exclusive. LowerB = {StartSeg, _StartRelSeq} = seq_id_to_seg_and_rel_seq_id(Start), @@ -449,7 +457,7 @@ recover_segment(ContainsCheckFun, CleanShutdown, {SegEntries1, UnackedCountDelta} = segment_plus_journal(SegEntries, JEntries), array:sparse_foldl( - fun (RelSeq, {{Guid, _IsPersistent}, Del, no_ack}, Segment1) -> + fun (RelSeq, {{Guid, _MsgProperties, _IsPersistent}, Del, no_ack}, Segment1) -> recover_message(ContainsCheckFun(Guid), CleanShutdown, Del, RelSeq, Segment1) end, @@ -512,6 +520,24 @@ queue_index_walker_reader(QueueName, Gatherer) -> ok = gatherer:finish(Gatherer). %%---------------------------------------------------------------------------- +%% expiry/binary manipulation +%%---------------------------------------------------------------------------- + +create_pub_record_body(Guid, #msg_properties{expiry = Expiry}) -> + [Guid, expiry_to_binary(Expiry)]. + +expiry_to_binary(undefined) -> + <<?NO_EXPIRY:?EXPIRY_BITS>>; +expiry_to_binary(Expiry) -> + <<Expiry:?EXPIRY_BITS>>. + +read_pub_record_body(Hdl) -> + {ok, Bin} = file_handle_cache:read(Hdl, ?GUID_BYTES + ?EXPIRY_BYTES), + <<GuidNum:?GUID_BITS, Expiry:?EXPIRY_BITS>> = Bin, + <<Guid:?GUID_BYTES/binary>> = <<GuidNum:?GUID_BITS>>, + {Guid, #msg_properties{expiry = Expiry}}. + +%%---------------------------------------------------------------------------- %% journal manipulation %%---------------------------------------------------------------------------- @@ -631,14 +657,9 @@ load_journal_entries(State = #qistate { journal_handle = Hdl }) -> ?ACK_JPREFIX -> load_journal_entries(add_to_journal(SeqId, ack, State)); _ -> - case file_handle_cache:read(Hdl, ?GUID_BYTES) of - {ok, <<GuidNum:?GUID_BITS>>} -> - %% work around for binary data - %% fragmentation. See - %% rabbit_msg_file:read_next/2 - <<Guid:?GUID_BYTES/binary>> = - <<GuidNum:?GUID_BITS>>, - Publish = {Guid, case Prefix of + case read_pub_record_body(Hdl) of + {Guid, MsgProperties} -> + Publish = {Guid, MsgProperties, case Prefix of ?PUB_PERSIST_JPREFIX -> true; ?PUB_TRANS_JPREFIX -> false end}, @@ -739,11 +760,12 @@ write_entry_to_segment(RelSeq, {Pub, Del, Ack}, Hdl) -> ok = case Pub of no_pub -> ok; - {Guid, IsPersistent} -> + {Guid, MsgProperties, IsPersistent} -> file_handle_cache:append( Hdl, [<<?PUBLISH_PREFIX:?PUBLISH_PREFIX_BITS, (bool_to_int(IsPersistent)):1, - RelSeq:?REL_SEQ_BITS>>, Guid]) + RelSeq:?REL_SEQ_BITS>>, + create_pub_record_body(Guid, MsgProperties)]) end, ok = case {Del, Ack} of {no_del, no_ack} -> @@ -763,10 +785,10 @@ read_bounded_segment(Seg, {StartSeg, StartRelSeq}, {EndSeg, EndRelSeq}, {Messages, Segments}, Dir) -> Segment = segment_find_or_new(Seg, Dir, Segments), {segment_entries_foldr( - fun (RelSeq, {{Guid, IsPersistent}, IsDelivered, no_ack}, Acc) + fun (RelSeq, {{Guid, MsgProperties, IsPersistent}, IsDelivered, no_ack}, Acc) when (Seg > StartSeg orelse StartRelSeq =< RelSeq) andalso (Seg < EndSeg orelse EndRelSeq >= RelSeq) -> - [ {Guid, reconstruct_seq_id(StartSeg, RelSeq), + [ {Guid, reconstruct_seq_id(StartSeg, RelSeq), MsgProperties, IsPersistent, IsDelivered == del} | Acc ]; (_RelSeq, _Value, Acc) -> Acc @@ -798,8 +820,8 @@ load_segment_entries(KeepAcked, Hdl, SegEntries, UnackedCount) -> IsPersistentNum:1, RelSeq:?REL_SEQ_BITS>>} -> %% because we specify /binary, and binaries are complete %% bytes, the size spec is in bytes, not bits. - {ok, Guid} = file_handle_cache:read(Hdl, ?GUID_BYTES), - Obj = {{Guid, 1 == IsPersistentNum}, no_del, no_ack}, + {Guid, MsgProperties} = read_pub_record_body(Hdl), + Obj = {{Guid, MsgProperties, 1 == IsPersistentNum}, no_del, no_ack}, SegEntries1 = array:set(RelSeq, Obj, SegEntries), load_segment_entries(KeepAcked, Hdl, SegEntries1, UnackedCount + 1); diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index a72656b73b..79792786b2 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -1414,6 +1414,7 @@ test_backing_queue() -> application:set_env(rabbit, msg_store_file_size_limit, FileSizeLimit, infinity), passed = test_queue_index(), + passed = test_queue_index_props(), passed = test_variable_queue(), passed = test_queue_recover(), application:set_env(rabbit, queue_index_max_journal_entries, @@ -1639,7 +1640,7 @@ queue_index_publish(SeqIds, Persistent, Qi) -> fun (SeqId, {QiN, SeqIdsGuidsAcc, MSCStateN}) -> Guid = rabbit_guid:guid(), QiM = rabbit_queue_index:publish( - Guid, SeqId, Persistent, QiN), + Guid, SeqId, #msg_properties{}, Persistent, QiN), {ok, MSCStateM} = rabbit_msg_store:write(MsgStore, Guid, Guid, MSCStateN), {QiM, [{SeqId, Guid} | SeqIdsGuidsAcc], MSCStateM} @@ -1651,12 +1652,27 @@ queue_index_publish(SeqIds, Persistent, Qi) -> verify_read_with_published(_Delivered, _Persistent, [], _) -> ok; verify_read_with_published(Delivered, Persistent, - [{Guid, SeqId, Persistent, Delivered}|Read], + [{Guid, SeqId, _Props, Persistent, Delivered}|Read], [{SeqId, Guid}|Published]) -> verify_read_with_published(Delivered, Persistent, Read, Published); verify_read_with_published(_Delivered, _Persistent, _Read, _Published) -> ko. +test_queue_index_props() -> + with_empty_test_queue( + fun(Qi0) -> + Guid = rabbit_guid:guid(), + Props = #msg_properties{expiry=12345}, + Qi1 = rabbit_queue_index:publish(Guid, 1, Props, true, Qi0), + {[{Guid, 1, Props, _, _}], Qi2} = rabbit_queue_index:read(1, 2, Qi1), + Qi2 + end), + + ok = rabbit_variable_queue:stop(), + ok = rabbit_variable_queue:start([]), + + passed. + test_queue_index() -> SegmentSize = rabbit_queue_index:next_segment_boundary(0), TwoSegs = SegmentSize + SegmentSize, @@ -1786,18 +1802,19 @@ variable_queue_publish(IsPersistent, Count, VQ) -> fun (_N, VQN) -> rabbit_variable_queue:publish( rabbit_basic:message( - rabbit_misc:r(<<>>, exchange, <<>>), + rabbit_misc:r(<<>>, exchange, <<>>), <<>>, #'P_basic'{delivery_mode = case IsPersistent of true -> 2; false -> 1 - end}, <<>>), VQN) + end}, <<>>), + #msg_properties{}, VQN) end, VQ, lists:seq(1, Count)). variable_queue_fetch(Count, IsPersistent, IsDelivered, Len, VQ) -> lists:foldl(fun (N, {VQN, AckTagsAcc}) -> Rem = Len - N, {{#basic_message { is_persistent = IsPersistent }, - IsDelivered, AckTagN, Rem}, VQM} = + _Props, IsDelivered, AckTagN, Rem}, VQM} = rabbit_variable_queue:fetch(true, VQN), {VQM, [AckTagN | AckTagsAcc]} end, {VQ, []}, lists:seq(1, Count)). @@ -1837,6 +1854,7 @@ test_variable_queue_dynamic_duration_change(VQ0) -> %% squeeze and relax queue Churn = Len div 32, VQ2 = publish_fetch_and_ack(Churn, Len, VQ1), + {Duration, VQ3} = rabbit_variable_queue:ram_duration(VQ2), VQ7 = lists:foldl( fun (Duration1, VQ4) -> @@ -1859,7 +1877,7 @@ publish_fetch_and_ack(0, _Len, VQ0) -> VQ0; publish_fetch_and_ack(N, Len, VQ0) -> VQ1 = variable_queue_publish(false, 1, VQ0), - {{_Msg, false, AckTag, Len}, VQ2} = rabbit_variable_queue:fetch(true, VQ1), + {{_Msg, _MsgProps, false, AckTag, Len}, VQ2} = rabbit_variable_queue:fetch(true, VQ1), publish_fetch_and_ack(N-1, Len, rabbit_variable_queue:ack([AckTag], VQ2)). test_variable_queue_partial_segments_delta_thing(VQ0) -> @@ -1923,7 +1941,7 @@ test_variable_queue_all_the_bits_not_covered_elsewhere1(VQ0) -> Count, VQ4), _VQ6 = rabbit_variable_queue:terminate(VQ5), VQ7 = rabbit_variable_queue:init(test_queue(), true, true), - {{_Msg1, true, _AckTag1, Count1}, VQ8} = + {{_Msg1, _Props, true, _AckTag1, Count1}, VQ8} = rabbit_variable_queue:fetch(true, VQ7), VQ9 = variable_queue_publish(false, 1, VQ8), VQ10 = rabbit_variable_queue:set_ram_duration_target(0, VQ9), @@ -1935,7 +1953,7 @@ test_variable_queue_all_the_bits_not_covered_elsewhere2(VQ0) -> VQ1 = rabbit_variable_queue:set_ram_duration_target(0, VQ0), VQ2 = variable_queue_publish(false, 4, VQ1), {VQ3, AckTags} = variable_queue_fetch(2, false, false, 4, VQ2), - VQ4 = rabbit_variable_queue:requeue(AckTags, VQ3), + VQ4 = rabbit_variable_queue:requeue(AckTags, fun(X) -> X end, VQ3), VQ5 = rabbit_variable_queue:idle_timeout(VQ4), _VQ6 = rabbit_variable_queue:terminate(VQ5), VQ7 = rabbit_variable_queue:init(test_queue(), true, true), @@ -1953,6 +1971,7 @@ test_queue_recover() -> sender = self(), message = Msg}, [true = rabbit_amqqueue:deliver(QPid, Delivery) || _ <- lists:seq(1, Count)], + io:format("Calling commit~n"), rabbit_amqqueue:commit_all([QPid], TxID, self()), exit(QPid, kill), MRef = erlang:monitor(process, QPid), @@ -1960,6 +1979,7 @@ test_queue_recover() -> after 10000 -> exit(timeout_waiting_for_queue_death) end, rabbit_amqqueue:stop(), + io:format("Restarting queue~n"), ok = rabbit_amqqueue:start(), rabbit_amqqueue:with_or_die( QName, @@ -1969,7 +1989,7 @@ test_queue_recover() -> rabbit_amqqueue:basic_get(Q1, self(), false), exit(QPid1, shutdown), VQ1 = rabbit_variable_queue:init(QName, true, true), - {{_Msg1, true, _AckTag1, CountMinusOne}, VQ2} = + {{_Msg1, _Props, true, _AckTag1, CountMinusOne}, VQ2} = rabbit_variable_queue:fetch(true, VQ1), _VQ3 = rabbit_variable_queue:delete_and_terminate(VQ2), rabbit_amqqueue:internal_delete(QName) diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 30d3a8aec1..aee8d47b36 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -32,9 +32,9 @@ -module(rabbit_variable_queue). -export([init/3, terminate/1, delete_and_terminate/1, - purge/1, publish/2, publish_delivered/3, fetch/2, ack/2, - tx_publish/3, tx_ack/3, tx_rollback/2, tx_commit/3, - requeue/2, len/1, is_empty/1, + purge/1, publish/3, publish_delivered/3, fetch/2, ack/2, + tx_publish/4, tx_ack/3, tx_rollback/2, tx_commit/4, + requeue/3, len/1, is_empty/1, set_ram_duration_target/2, ram_duration/1, needs_idle_timeout/1, idle_timeout/1, handle_pre_hibernate/1, status/1]). @@ -248,7 +248,8 @@ is_persistent, is_delivered, msg_on_disk, - index_on_disk + index_on_disk, + msg_properties }). -record(delta, @@ -490,8 +491,8 @@ purge(State = #vqstate { q4 = Q4, index_state = IndexState, len = Len }) -> ram_index_count = 0, persistent_count = 0 })}. -publish(Msg, State) -> - {_SeqId, State1} = publish(Msg, false, false, State), +publish(Msg, MsgProperties, State) -> + {_SeqId, State1} = publish(Msg, MsgProperties, false, false, State), a(reduce_memory_use(State1)). publish_delivered(false, _Msg, State = #vqstate { len = 0 }) -> @@ -505,7 +506,7 @@ publish_delivered(true, Msg = #basic_message { is_persistent = IsPersistent }, pending_ack = PA, durable = IsDurable }) -> IsPersistent1 = IsDurable andalso IsPersistent, - MsgStatus = (msg_status(IsPersistent1, SeqId, Msg)) + MsgStatus = (msg_status(IsPersistent1, SeqId, Msg, #msg_properties{})) #msg_status { is_delivered = true }, {MsgStatus1, State1} = maybe_write_to_disk(false, false, MsgStatus, State), PA1 = record_pending_ack(m(MsgStatus1), PA), @@ -532,7 +533,8 @@ fetch(AckRequired, State = #vqstate { q4 = Q4, {{value, MsgStatus = #msg_status { msg = Msg, guid = Guid, seq_id = SeqId, is_persistent = IsPersistent, is_delivered = IsDelivered, - msg_on_disk = MsgOnDisk, index_on_disk = IndexOnDisk }}, + msg_on_disk = MsgOnDisk, index_on_disk = IndexOnDisk, + msg_properties = MsgProperties }}, Q4a} -> %% 1. Mark it delivered if necessary @@ -563,7 +565,7 @@ fetch(AckRequired, State = #vqstate { q4 = Q4, PCount1 = PCount - one_if(IsPersistent andalso not AckRequired), Len1 = Len - 1, - {{Msg, IsDelivered, AckTag, Len1}, + {{Msg, MsgProperties, IsDelivered, AckTag, Len1}, a(State #vqstate { q4 = Q4a, ram_msg_count = RamMsgCount - 1, out_counter = OutCount + 1, @@ -579,12 +581,13 @@ ack(AckTags, State) -> AckTags, State)). tx_publish(Txn, Msg = #basic_message { is_persistent = IsPersistent }, + MsgProperties, State = #vqstate { durable = IsDurable, msg_store_clients = MSCState }) -> Tx = #tx { pending_messages = Pubs } = lookup_tx(Txn), - store_tx(Txn, Tx #tx { pending_messages = [Msg | Pubs] }), + store_tx(Txn, Tx #tx { pending_messages = [{Msg, MsgProperties} | Pubs] }), a(case IsPersistent andalso IsDurable of - true -> MsgStatus = msg_status(true, undefined, Msg), + true -> MsgStatus = msg_status(true, undefined, Msg, MsgProperties), {#msg_status { msg_on_disk = true }, MSCState1} = maybe_write_msg_to_disk(false, MsgStatus, MSCState), State #vqstate { msg_store_clients = MSCState1 }; @@ -606,10 +609,14 @@ tx_rollback(Txn, State = #vqstate { durable = IsDurable }) -> end, {lists:append(AckTags), a(State)}. -tx_commit(Txn, Fun, State = #vqstate { durable = IsDurable }) -> +tx_commit(Txn, Fun, MsgPropsFun, State = #vqstate { durable = IsDurable }) -> #tx { pending_acks = AckTags, pending_messages = Pubs } = lookup_tx(Txn), erase_tx(Txn), - PubsOrdered = lists:reverse(Pubs), + F = fun({Msg, MsgProperties}) -> + {Msg, MsgPropsFun(MsgProperties)} + end, + PubsProcessed = lists:map(F, Pubs), + PubsOrdered = lists:reverse(PubsProcessed), AckTags1 = lists:append(AckTags), PersistentGuids = persistent_guids(PubsOrdered), HasPersistentPubs = PersistentGuids =/= [], @@ -624,18 +631,21 @@ tx_commit(Txn, Fun, State = #vqstate { durable = IsDurable }) -> HasPersistentPubs, PubsOrdered, AckTags1, Fun, State) end)}. -requeue(AckTags, State) -> +requeue(AckTags, MsgPropsFun, State) -> a(reduce_memory_use( ack(fun rabbit_msg_store:release/2, - fun (#msg_status { msg = Msg }, State1) -> - {_SeqId, State2} = publish(Msg, true, false, State1), + fun (#msg_status { msg = Msg, + msg_properties = MsgProperties }, State1) -> + {_SeqId, State2} = + publish(Msg, MsgPropsFun(MsgProperties), true, false, State1), State2; - ({IsPersistent, Guid}, State1) -> + ({IsPersistent, Guid, MsgProperties}, State1) -> #vqstate { msg_store_clients = MSCState } = State1, {{ok, Msg = #basic_message{}}, MSCState1} = read_from_msg_store(MSCState, IsPersistent, Guid), State2 = State1 #vqstate { msg_store_clients = MSCState1 }, - {_SeqId, State3} = publish(Msg, true, true, State2), + {_SeqId, State3} = publish(Msg, MsgPropsFun(MsgProperties), + true, true, State2), State3 end, AckTags, State))). @@ -783,10 +793,11 @@ one_if(false) -> 0. cons_if(true, E, L) -> [E | L]; cons_if(false, _E, L) -> L. -msg_status(IsPersistent, SeqId, Msg = #basic_message { guid = Guid }) -> +msg_status(IsPersistent, SeqId, Msg = #basic_message { guid = Guid }, MsgProperties) -> #msg_status { seq_id = SeqId, guid = Guid, msg = Msg, is_persistent = IsPersistent, is_delivered = false, - msg_on_disk = false, index_on_disk = false }. + msg_on_disk = false, index_on_disk = false, + msg_properties = MsgProperties }. find_msg_store(true) -> ?PERSISTENT_MSG_STORE; find_msg_store(false) -> ?TRANSIENT_MSG_STORE. @@ -826,7 +837,7 @@ persistent_guids(Pubs) -> betas_from_index_entries(List, TransientThreshold, IndexState) -> {Filtered, Delivers, Acks} = lists:foldr( - fun ({Guid, SeqId, IsPersistent, IsDelivered}, + fun ({Guid, SeqId, MsgProperties, IsPersistent, IsDelivered}, {Filtered1, Delivers1, Acks1}) -> case SeqId < TransientThreshold andalso not IsPersistent of true -> {Filtered1, @@ -838,7 +849,8 @@ betas_from_index_entries(List, TransientThreshold, IndexState) -> is_persistent = IsPersistent, is_delivered = IsDelivered, msg_on_disk = true, - index_on_disk = true + index_on_disk = true, + msg_properties = MsgProperties }) | Filtered1], Delivers1, Acks1} @@ -914,7 +926,7 @@ tx_commit_post_msg_store(HasPersistentPubs, Pubs, AckTags, Fun, true -> [AckTag || AckTag <- AckTags, case dict:fetch(AckTag, PA) of #msg_status {} -> false; - {IsPersistent, _Guid} -> IsPersistent + {IsPersistent, _Guid, _MsgProperties} -> IsPersistent end]; false -> [] end, @@ -946,10 +958,10 @@ tx_commit_index(State = #vqstate { on_sync = #sync { Pubs = lists:append(lists:reverse(SPubs)), {SeqIds, State1 = #vqstate { index_state = IndexState }} = lists:foldl( - fun (Msg = #basic_message { is_persistent = IsPersistent }, + fun ({Msg = #basic_message { is_persistent = IsPersistent }, MsgProperties}, {SeqIdsAcc, State2}) -> IsPersistent1 = IsDurable andalso IsPersistent, - {SeqId, State3} = publish(Msg, false, IsPersistent1, State2), + {SeqId, State3} = publish(Msg, MsgProperties, false, IsPersistent1, State2), {cons_if(IsPersistent1, SeqId, SeqIdsAcc), State3} end, {PAcks, ack(Acks, State)}, Pubs), IndexState1 = rabbit_queue_index:sync(SeqIds, IndexState), @@ -996,7 +1008,7 @@ remove_queue_entries1( %%---------------------------------------------------------------------------- publish(Msg = #basic_message { is_persistent = IsPersistent }, - IsDelivered, MsgOnDisk, + MsgProperties, IsDelivered, MsgOnDisk, State = #vqstate { q1 = Q1, q3 = Q3, q4 = Q4, next_seq_id = SeqId, len = Len, @@ -1005,8 +1017,9 @@ publish(Msg = #basic_message { is_persistent = IsPersistent }, durable = IsDurable, ram_msg_count = RamMsgCount }) -> IsPersistent1 = IsDurable andalso IsPersistent, - MsgStatus = (msg_status(IsPersistent1, SeqId, Msg)) - #msg_status { is_delivered = IsDelivered, msg_on_disk = MsgOnDisk }, + MsgStatus = (msg_status(IsPersistent1, SeqId, Msg, MsgProperties)) + #msg_status { is_delivered = IsDelivered, + msg_on_disk = MsgOnDisk}, {MsgStatus1, State1} = maybe_write_to_disk(false, false, MsgStatus, State), State2 = case bpqueue:is_empty(Q3) of false -> State1 #vqstate { q1 = queue:in(m(MsgStatus1), Q1) }; @@ -1045,12 +1058,18 @@ maybe_write_index_to_disk(_Force, MsgStatus = #msg_status { true = MsgStatus #msg_status.msg_on_disk, %% ASSERTION {MsgStatus, IndexState}; maybe_write_index_to_disk(Force, MsgStatus = #msg_status { - guid = Guid, seq_id = SeqId, + guid = Guid, + seq_id = SeqId, is_persistent = IsPersistent, - is_delivered = IsDelivered }, IndexState) + is_delivered = IsDelivered, + msg_properties = MsgProperties}, + IndexState) when Force orelse IsPersistent -> true = MsgStatus #msg_status.msg_on_disk, %% ASSERTION - IndexState1 = rabbit_queue_index:publish(Guid, SeqId, IsPersistent, + IndexState1 = rabbit_queue_index:publish(Guid, + SeqId, + MsgProperties, + IsPersistent, IndexState), {MsgStatus #msg_status { index_on_disk = true }, maybe_write_delivered(IsDelivered, SeqId, IndexState1)}; @@ -1073,9 +1092,9 @@ maybe_write_to_disk(ForceMsg, ForceIndex, MsgStatus, record_pending_ack(#msg_status { guid = Guid, seq_id = SeqId, is_persistent = IsPersistent, - msg_on_disk = MsgOnDisk } = MsgStatus, PA) -> + msg_on_disk = MsgOnDisk, msg_properties = MsgProperties } = MsgStatus, PA) -> AckEntry = case MsgOnDisk of - true -> {IsPersistent, Guid}; + true -> {IsPersistent, Guid, MsgProperties}; false -> MsgStatus end, dict:store(SeqId, AckEntry, PA). @@ -1128,7 +1147,7 @@ accumulate_ack(_SeqId, #msg_status { is_persistent = false, %% ASSERTIONS msg_on_disk = false, index_on_disk = false }, Acc) -> Acc; -accumulate_ack(SeqId, {IsPersistent, Guid}, {SeqIdsAcc, Dict}) -> +accumulate_ack(SeqId, {IsPersistent, Guid, _MsgProperties}, {SeqIdsAcc, Dict}) -> {cons_if(IsPersistent, SeqId, SeqIdsAcc), rabbit_misc:orddict_cons(find_msg_store(IsPersistent), Guid, Dict)}. |
