diff options
| author | Michael Klishin <michael@clojurewerkz.org> | 2017-07-26 22:24:24 +0300 |
|---|---|---|
| committer | Michael Klishin <michael@clojurewerkz.org> | 2017-07-26 22:24:24 +0300 |
| commit | 8e05c4dbf362f61d2e4c12de59ae1e3997f2fcd2 (patch) | |
| tree | 82b83e71c4fd8f02ec70f4c74e09aeeee7c69ef1 /test | |
| parent | 1f5240617b424b0d36e56f2d6608ed529b723184 (diff) | |
| parent | 2b2ea1173bafb01349b180c76f1e52fef5856dd0 (diff) | |
| download | rabbitmq-server-git-8e05c4dbf362f61d2e4c12de59ae1e3997f2fcd2.tar.gz | |
Merge branch 'master' into rabbitmq-server-1303
Diffstat (limited to 'test')
| -rw-r--r-- | test/channel_operation_timeout_test_queue.erl | 2275 |
1 files changed, 74 insertions, 2201 deletions
diff --git a/test/channel_operation_timeout_test_queue.erl b/test/channel_operation_timeout_test_queue.erl index 09d7ab4e95..247477bf40 100644 --- a/test/channel_operation_timeout_test_queue.erl +++ b/test/channel_operation_timeout_test_queue.erl @@ -26,13 +26,9 @@ set_ram_duration_target/2, ram_duration/1, needs_timeout/1, timeout/1, handle_pre_hibernate/1, resume/1, msg_rates/1, info/2, invoke/3, is_duplicate/2, set_queue_mode/2, - zip_msgs_and_acks/4, multiple_routing_keys/0]). - + zip_msgs_and_acks/4]). -export([start/2, stop/1]). -%% exported for testing only --export([start_msg_store/3, stop_msg_store/1, init/6]). - %%---------------------------------------------------------------------------- %% This test backing queue follows the variable queue implementation, with %% the exception that it will introduce infinite delays on some operations if @@ -64,6 +60,7 @@ unacked_bytes, persistent_count, %% w unacked persistent_bytes, %% w unacked + delta_transient_bytes, %% target_ram_count, ram_msg_count, %% w/o unacked @@ -88,6 +85,12 @@ %% default queue or lazy queue mode, + %% number of reduce_memory_usage executions, once it + %% reaches a threshold the queue will manually trigger a runtime GC + %% see: maybe_execute_gc/1 + memory_reduction_run_count, + %% Queue data is grouped by VHost. We need to store it + %% to work with queue index. virtual_host }). @@ -108,22 +111,18 @@ -record(delta, { start_seq_id, %% start_seq_id is inclusive count, + transient, end_seq_id %% end_seq_id is exclusive }). --define(HEADER_GUESS_SIZE, 100). %% see determine_persist_to/2 --define(PERSISTENT_MSG_STORE, msg_store_persistent). --define(TRANSIENT_MSG_STORE, msg_store_transient). --define(QUEUE, lqueue). --define(TIMEOUT_TEST_MSG, <<"timeout_test_msg!">>). --define(EMPTY_START_FUN_STATE, {fun (ok) -> finished end, ok}). -include("rabbit.hrl"). -include("rabbit_framing.hrl"). -%%---------------------------------------------------------------------------- +-define(QUEUE, lqueue). +-define(TIMEOUT_TEST_MSG, <<"timeout_test_msg!">>). --rabbit_upgrade({multiple_routing_keys, local, []}). +%%---------------------------------------------------------------------------- -type seq_id() :: non_neg_integer(). @@ -191,2220 +190,118 @@ %% Duplicated from rabbit_backing_queue -spec ack([ack()], state()) -> {[rabbit_guid:guid()], state()}. --spec multiple_routing_keys() -> 'ok'. - --define(BLANK_DELTA, #delta { start_seq_id = undefined, - count = 0, - end_seq_id = undefined }). --define(BLANK_DELTA_PATTERN(Z), #delta { start_seq_id = Z, - count = 0, - end_seq_id = Z }). - --define(MICROS_PER_SECOND, 1000000.0). - -%% We're sampling every 5s for RAM duration; a half life that is of -%% the same order of magnitude is probably about right. --define(RATE_AVG_HALF_LIFE, 5.0). - -%% We will recalculate the #rates{} every time we get asked for our -%% RAM duration, or every N messages published, whichever is -%% sooner. We do this since the priority calculations in -%% rabbit_amqqueue_process need fairly fresh rates. --define(MSGS_PER_RATE_CALC, 100). - %%---------------------------------------------------------------------------- %% Public API %%---------------------------------------------------------------------------- start(VHost, DurableQueues) -> - {AllTerms, StartFunState} = rabbit_queue_index:start(VHost, DurableQueues), - %% Group recovery terms by vhost. - ClientRefs = [Ref || Terms <- AllTerms, - Terms /= non_clean_shutdown, - begin - Ref = proplists:get_value(persistent_ref, Terms), - Ref =/= undefined - end], - start_msg_store(VHost, ClientRefs, StartFunState), - {ok, AllTerms}. + rabbit_variable_queue:start(VHost, DurableQueues). stop(VHost) -> - ok = stop_msg_store(VHost), - ok = rabbit_queue_index:stop(VHost). - -start_msg_store(VHost, Refs, StartFunState) when is_list(Refs); Refs == undefined -> - rabbit_log:info("Starting message stores for vhost '~s'~n", [VHost]), - {ok, _} = rabbit_vhost_msg_store:start(VHost, - ?TRANSIENT_MSG_STORE, - undefined, - ?EMPTY_START_FUN_STATE), - {ok, _} = rabbit_vhost_msg_store:start(VHost, - ?PERSISTENT_MSG_STORE, - Refs, - StartFunState), - rabbit_log:info("Message stores for vhost '~s' are started~n", [VHost]). - -stop_msg_store(VHost) -> - rabbit_vhost_msg_store:stop(VHost, ?TRANSIENT_MSG_STORE), - rabbit_vhost_msg_store:stop(VHost, ?PERSISTENT_MSG_STORE), - ok. - + rabbit_variable_queue:stop(VHost). init(Queue, Recover, Callback) -> - init( - Queue, Recover, Callback, - fun (MsgIds, ActionTaken) -> - msgs_written_to_disk(Callback, MsgIds, ActionTaken) - end, - fun (MsgIds) -> msg_indices_written_to_disk(Callback, MsgIds) end, - fun (MsgIds) -> msgs_and_indices_written_to_disk(Callback, MsgIds) end). - -init(#amqqueue { name = QueueName, durable = IsDurable }, new, - AsyncCallback, MsgOnDiskFun, MsgIdxOnDiskFun, MsgAndIdxOnDiskFun) -> - IndexState = rabbit_queue_index:init(QueueName, - MsgIdxOnDiskFun, MsgAndIdxOnDiskFun), - VHost = QueueName#resource.virtual_host, - init(IsDurable, IndexState, 0, 0, [], - case IsDurable of - true -> msg_store_client_init(?PERSISTENT_MSG_STORE, - MsgOnDiskFun, AsyncCallback, - VHost); - false -> undefined - end, - msg_store_client_init(?TRANSIENT_MSG_STORE, undefined, AsyncCallback, VHost), VHost); - -%% We can be recovering a transient queue if it crashed -init(#amqqueue { name = QueueName, durable = IsDurable }, Terms, - AsyncCallback, MsgOnDiskFun, MsgIdxOnDiskFun, MsgAndIdxOnDiskFun) -> - {PRef, RecoveryTerms} = process_recovery_terms(Terms), - VHost = QueueName#resource.virtual_host, - {PersistentClient, ContainsCheckFun} = - case IsDurable of - true -> C = msg_store_client_init(?PERSISTENT_MSG_STORE, PRef, - MsgOnDiskFun, AsyncCallback, - VHost), - {C, fun (MsgId) when is_binary(MsgId) -> - rabbit_msg_store:contains(MsgId, C); - (#basic_message{is_persistent = Persistent}) -> - Persistent - end}; - false -> {undefined, fun(_MsgId) -> false end} - end, - TransientClient = msg_store_client_init(?TRANSIENT_MSG_STORE, - undefined, AsyncCallback, - VHost), - {DeltaCount, DeltaBytes, IndexState} = - rabbit_queue_index:recover( - QueueName, RecoveryTerms, - rabbit_vhost_msg_store:successfully_recovered_state(?PERSISTENT_MSG_STORE, VHost), - ContainsCheckFun, MsgIdxOnDiskFun, MsgAndIdxOnDiskFun), - init(IsDurable, IndexState, DeltaCount, DeltaBytes, RecoveryTerms, - PersistentClient, TransientClient, VHost). - -process_recovery_terms(Terms=non_clean_shutdown) -> - {rabbit_guid:gen(), Terms}; -process_recovery_terms(Terms) -> - case proplists:get_value(persistent_ref, Terms) of - undefined -> {rabbit_guid:gen(), []}; - PRef -> {PRef, Terms} - end. - -terminate(_Reason, State) -> - State1 = #vqstate { persistent_count = PCount, - persistent_bytes = PBytes, - index_state = IndexState, - msg_store_clients = {MSCStateP, MSCStateT}, - virtual_host = VHost } = - purge_pending_ack(true, State), - PRef = case MSCStateP of - undefined -> undefined; - _ -> ok = rabbit_msg_store:client_terminate(MSCStateP), - rabbit_msg_store:client_ref(MSCStateP) - end, - ok = rabbit_msg_store:client_delete_and_terminate(MSCStateT), - Terms = [{persistent_ref, PRef}, - {persistent_count, PCount}, - {persistent_bytes, PBytes}], - a(State1 #vqstate { index_state = rabbit_queue_index:terminate( - VHost, Terms, IndexState), - msg_store_clients = undefined }). - -%% the only difference between purge and delete is that delete also -%% needs to delete everything that's been delivered and not ack'd. -delete_and_terminate(_Reason, State) -> - %% Normally when we purge messages we interact with the qi by - %% issues delivers and acks for every purged message. In this case - %% we don't need to do that, so we just delete the qi. - State1 = purge_and_index_reset(State), - State2 = #vqstate { msg_store_clients = {MSCStateP, MSCStateT} } = - purge_pending_ack_delete_and_terminate(State1), - case MSCStateP of - undefined -> ok; - _ -> rabbit_msg_store:client_delete_and_terminate(MSCStateP) - end, - rabbit_msg_store:client_delete_and_terminate(MSCStateT), - a(State2 #vqstate { msg_store_clients = undefined }). - -delete_crashed(#amqqueue{name = QName}) -> - ok = rabbit_queue_index:erase(QName). - -purge(State = #vqstate { len = Len, qi_pending_ack= QPA }) -> + rabbit_variable_queue:init(Queue, Recover, Callback). + +terminate(Reason, State) -> + rabbit_variable_queue:terminate(Reason, State). + +delete_and_terminate(Reason, State) -> + rabbit_variable_queue:delete_and_terminate(Reason, State). + +delete_crashed(Q) -> + rabbit_variable_queue:delete_crashed(Q). + +purge(State = #vqstate { qi_pending_ack= QPA }) -> maybe_delay(QPA), - case is_pending_ack_empty(State) of - true -> - {Len, purge_and_index_reset(State)}; - false -> - {Len, purge_when_pending_acks(State)} - end. + rabbit_variable_queue:purge(State). -purge_acks(State) -> a(purge_pending_ack(false, State)). +purge_acks(State) -> + rabbit_variable_queue:purge_acks(State). publish(Msg, MsgProps, IsDelivered, ChPid, Flow, State) -> - State1 = - publish1(Msg, MsgProps, IsDelivered, ChPid, Flow, - fun maybe_write_to_disk/4, - State), - a(reduce_memory_use(maybe_update_rates(State1))). + rabbit_variable_queue:publish(Msg, MsgProps, IsDelivered, ChPid, Flow, State). batch_publish(Publishes, ChPid, Flow, State) -> - {ChPid, Flow, State1} = - lists:foldl(fun batch_publish1/2, {ChPid, Flow, State}, Publishes), - State2 = ui(State1), - a(reduce_memory_use(maybe_update_rates(State2))). + rabbit_variable_queue:batch_publish(Publishes, ChPid, Flow, State). publish_delivered(Msg, MsgProps, ChPid, Flow, State) -> - {SeqId, State1} = - publish_delivered1(Msg, MsgProps, ChPid, Flow, - fun maybe_write_to_disk/4, - State), - {SeqId, a(reduce_memory_use(maybe_update_rates(State1)))}. + rabbit_variable_queue:publish_delivered(Msg, MsgProps, ChPid, Flow, State). batch_publish_delivered(Publishes, ChPid, Flow, State) -> - {ChPid, Flow, SeqIds, State1} = - lists:foldl(fun batch_publish_delivered1/2, - {ChPid, Flow, [], State}, Publishes), - State2 = ui(State1), - {lists:reverse(SeqIds), a(reduce_memory_use(maybe_update_rates(State2)))}. + rabbit_variable_queue:batch_publish_delivered(Publishes, ChPid, Flow, State). discard(_MsgId, _ChPid, _Flow, State) -> State. -drain_confirmed(State = #vqstate { confirmed = C }) -> - case gb_sets:is_empty(C) of - true -> {[], State}; %% common case - false -> {gb_sets:to_list(C), State #vqstate { - confirmed = gb_sets:new() }} - end. +drain_confirmed(State) -> + rabbit_variable_queue:drain_confirmed(State). dropwhile(Pred, State) -> - {MsgProps, State1} = - remove_by_predicate(Pred, State), - {MsgProps, a(State1)}. + rabbit_variable_queue:dropwhile(Pred, State). fetchwhile(Pred, Fun, Acc, State) -> - {MsgProps, Acc1, State1} = - fetch_by_predicate(Pred, Fun, Acc, State), - {MsgProps, Acc1, a(State1)}. + rabbit_variable_queue:fetchwhile(Pred, Fun, Acc, State). fetch(AckRequired, State) -> - case queue_out(State) of - {empty, State1} -> - {empty, a(State1)}; - {{value, MsgStatus}, State1} -> - %% it is possible that the message wasn't read from disk - %% at this point, so read it in. - {Msg, State2} = read_msg(MsgStatus, State1), - {AckTag, State3} = remove(AckRequired, MsgStatus, State2), - {{Msg, MsgStatus#msg_status.is_delivered, AckTag}, a(State3)} - end. + rabbit_variable_queue:fetch(AckRequired, State). drop(AckRequired, State) -> - case queue_out(State) of - {empty, State1} -> - {empty, a(State1)}; - {{value, MsgStatus}, State1} -> - {AckTag, State2} = remove(AckRequired, MsgStatus, State1), - {{MsgStatus#msg_status.msg_id, AckTag}, a(State2)} - end. - -ack([], State) -> - {[], State}; -%% optimisation: this head is essentially a partial evaluation of the -%% general case below, for the single-ack case. -ack([SeqId], State) -> - {#msg_status { msg_id = MsgId, - is_persistent = IsPersistent, - msg_in_store = MsgInStore, - index_on_disk = IndexOnDisk }, - State1 = #vqstate { index_state = IndexState, - msg_store_clients = MSCState, - ack_out_counter = AckOutCount }} = - remove_pending_ack(true, SeqId, State), - IndexState1 = case IndexOnDisk of - true -> rabbit_queue_index:ack([SeqId], IndexState); - false -> IndexState - end, - case MsgInStore of - true -> ok = msg_store_remove(MSCState, IsPersistent, [MsgId]); - false -> ok - end, - {[MsgId], - a(State1 #vqstate { index_state = IndexState1, - ack_out_counter = AckOutCount + 1 })}; -ack(AckTags, State) -> - {{IndexOnDiskSeqIds, MsgIdsByStore, AllMsgIds}, - State1 = #vqstate { index_state = IndexState, - msg_store_clients = MSCState, - ack_out_counter = AckOutCount }} = - lists:foldl( - fun (SeqId, {Acc, State2}) -> - {MsgStatus, State3} = remove_pending_ack(true, SeqId, State2), - {accumulate_ack(MsgStatus, Acc), State3} - end, {accumulate_ack_init(), State}, AckTags), - IndexState1 = rabbit_queue_index:ack(IndexOnDiskSeqIds, IndexState), - remove_msgs_by_id(MsgIdsByStore, MSCState), - {lists:reverse(AllMsgIds), - a(State1 #vqstate { index_state = IndexState1, - ack_out_counter = AckOutCount + length(AckTags) })}. - -requeue(AckTags, #vqstate { mode = default, - delta = Delta, - q3 = Q3, - q4 = Q4, - in_counter = InCounter, - len = Len, - qi_pending_ack = QPA } = State) -> - maybe_delay(QPA), - {SeqIds, Q4a, MsgIds, State1} = queue_merge(lists:sort(AckTags), Q4, [], - beta_limit(Q3), - fun publish_alpha/2, State), - {SeqIds1, Q3a, MsgIds1, State2} = queue_merge(SeqIds, Q3, MsgIds, - delta_limit(Delta), - fun publish_beta/2, State1), - {Delta1, MsgIds2, State3} = delta_merge(SeqIds1, Delta, MsgIds1, - State2), - MsgCount = length(MsgIds2), - {MsgIds2, a(reduce_memory_use( - maybe_update_rates( - State3 #vqstate { delta = Delta1, - q3 = Q3a, - q4 = Q4a, - in_counter = InCounter + MsgCount, - len = Len + MsgCount })))}; -requeue(AckTags, #vqstate { mode = lazy, - delta = Delta, - q3 = Q3, - in_counter = InCounter, - len = Len, - qi_pending_ack = QPA } = State) -> + rabbit_variable_queue:drop(AckRequired, State). + +ack(List, State) -> + rabbit_variable_queue:ack(List, State). + +requeue(AckTags, #vqstate { qi_pending_ack = QPA } = State) -> maybe_delay(QPA), - {SeqIds, Q3a, MsgIds, State1} = queue_merge(lists:sort(AckTags), Q3, [], - delta_limit(Delta), - fun publish_beta/2, State), - {Delta1, MsgIds1, State2} = delta_merge(SeqIds, Delta, MsgIds, - State1), - MsgCount = length(MsgIds1), - {MsgIds1, a(reduce_memory_use( - maybe_update_rates( - State2 #vqstate { delta = Delta1, - q3 = Q3a, - in_counter = InCounter + MsgCount, - len = Len + MsgCount })))}. + rabbit_variable_queue:requeue(AckTags, State). ackfold(MsgFun, Acc, State, AckTags) -> - {AccN, StateN} = - lists:foldl(fun(SeqId, {Acc0, State0}) -> - MsgStatus = lookup_pending_ack(SeqId, State0), - {Msg, State1} = read_msg(MsgStatus, State0), - {MsgFun(Msg, SeqId, Acc0), State1} - end, {Acc, State}, AckTags), - {AccN, a(StateN)}. - -fold(Fun, Acc, State = #vqstate{index_state = IndexState}) -> - {Its, IndexState1} = lists:foldl(fun inext/2, {[], IndexState}, - [msg_iterator(State), - disk_ack_iterator(State), - ram_ack_iterator(State), - qi_ack_iterator(State)]), - ifold(Fun, Acc, Its, State#vqstate{index_state = IndexState1}). - -len(#vqstate { len = Len, qi_pending_ack = QPA }) -> + rabbit_variable_queue:ackfold(MsgFun, Acc, State, AckTags). + +fold(Fun, Acc, State) -> + rabbit_variable_queue:fold(Fun, Acc, State). + +len(#vqstate { qi_pending_ack = QPA } = State) -> maybe_delay(QPA), - Len. + rabbit_variable_queue:len(State). is_empty(State) -> 0 == len(State). depth(State) -> - len(State) + count_pending_acks(State). - -set_ram_duration_target( - DurationTarget, State = #vqstate { - rates = #rates { in = AvgIngressRate, - out = AvgEgressRate, - ack_in = AvgAckIngressRate, - ack_out = AvgAckEgressRate }, - target_ram_count = TargetRamCount }) -> - Rate = - AvgEgressRate + AvgIngressRate + AvgAckEgressRate + AvgAckIngressRate, - TargetRamCount1 = - case DurationTarget of - infinity -> infinity; - _ -> trunc(DurationTarget * Rate) %% msgs = sec * msgs/sec - end, - State1 = State #vqstate { target_ram_count = TargetRamCount1 }, - a(case TargetRamCount1 == infinity orelse - (TargetRamCount =/= infinity andalso - TargetRamCount1 >= TargetRamCount) of - true -> State1; - false -> reduce_memory_use(State1) - end). - -maybe_update_rates(State = #vqstate{ in_counter = InCount, - out_counter = OutCount }) - when InCount + OutCount > ?MSGS_PER_RATE_CALC -> - update_rates(State); -maybe_update_rates(State) -> - State. - -update_rates(State = #vqstate{ in_counter = InCount, - out_counter = OutCount, - ack_in_counter = AckInCount, - ack_out_counter = AckOutCount, - rates = #rates{ in = InRate, - out = OutRate, - ack_in = AckInRate, - ack_out = AckOutRate, - timestamp = TS }}) -> - Now = erlang:monotonic_time(), - - Rates = #rates { in = update_rate(Now, TS, InCount, InRate), - out = update_rate(Now, TS, OutCount, OutRate), - ack_in = update_rate(Now, TS, AckInCount, AckInRate), - ack_out = update_rate(Now, TS, AckOutCount, AckOutRate), - timestamp = Now }, - - State#vqstate{ in_counter = 0, - out_counter = 0, - ack_in_counter = 0, - ack_out_counter = 0, - rates = Rates }. - -update_rate(Now, TS, Count, Rate) -> - Time = erlang:convert_time_unit(Now - TS, native, micro_seconds) / - ?MICROS_PER_SECOND, - if - Time == 0 -> Rate; - true -> rabbit_misc:moving_average(Time, ?RATE_AVG_HALF_LIFE, - Count / Time, Rate) - end. - -ram_duration(State) -> - State1 = #vqstate { rates = #rates { in = AvgIngressRate, - out = AvgEgressRate, - ack_in = AvgAckIngressRate, - ack_out = AvgAckEgressRate }, - ram_msg_count = RamMsgCount, - ram_msg_count_prev = RamMsgCountPrev, - ram_pending_ack = RPA, - qi_pending_ack = QPA, - ram_ack_count_prev = RamAckCountPrev } = - update_rates(State), - - RamAckCount = gb_trees:size(RPA) + gb_trees:size(QPA), - - Duration = %% msgs+acks / (msgs+acks/sec) == sec - case lists:all(fun (X) -> X < 0.01 end, - [AvgEgressRate, AvgIngressRate, - AvgAckEgressRate, AvgAckIngressRate]) of - true -> infinity; - false -> (RamMsgCountPrev + RamMsgCount + - RamAckCount + RamAckCountPrev) / - (4 * (AvgEgressRate + AvgIngressRate + - AvgAckEgressRate + AvgAckIngressRate)) - end, - - {Duration, State1}. - -needs_timeout(#vqstate { index_state = IndexState }) -> - case rabbit_queue_index:needs_sync(IndexState) of - confirms -> timed; - other -> idle; - false -> false - end. - -timeout(State = #vqstate { index_state = IndexState }) -> - State #vqstate { index_state = rabbit_queue_index:sync(IndexState) }. - -handle_pre_hibernate(State = #vqstate { index_state = IndexState }) -> - State #vqstate { index_state = rabbit_queue_index:flush(IndexState) }. - -resume(State) -> a(reduce_memory_use(State)). - -msg_rates(#vqstate { rates = #rates { in = AvgIngressRate, - out = AvgEgressRate } }) -> - {AvgIngressRate, AvgEgressRate}. - -info(messages_ready_ram, #vqstate{ram_msg_count = RamMsgCount}) -> - RamMsgCount; -info(messages_unacknowledged_ram, #vqstate{ram_pending_ack = RPA, - qi_pending_ack = QPA}) -> - gb_trees:size(RPA) + gb_trees:size(QPA); -info(messages_ram, State) -> - info(messages_ready_ram, State) + info(messages_unacknowledged_ram, State); -info(messages_persistent, #vqstate{persistent_count = PersistentCount}) -> - PersistentCount; -info(message_bytes, #vqstate{bytes = Bytes, - unacked_bytes = UBytes}) -> - Bytes + UBytes; -info(message_bytes_ready, #vqstate{bytes = Bytes}) -> - Bytes; -info(message_bytes_unacknowledged, #vqstate{unacked_bytes = UBytes}) -> - UBytes; -info(message_bytes_ram, #vqstate{ram_bytes = RamBytes}) -> - RamBytes; -info(message_bytes_persistent, #vqstate{persistent_bytes = PersistentBytes}) -> - PersistentBytes; -info(head_message_timestamp, #vqstate{ - q3 = Q3, - q4 = Q4, - ram_pending_ack = RPA, - qi_pending_ack = QPA}) -> - head_message_timestamp(Q3, Q4, RPA, QPA); -info(disk_reads, #vqstate{disk_read_count = Count}) -> - Count; -info(disk_writes, #vqstate{disk_write_count = Count}) -> - Count; -info(backing_queue_status, #vqstate { - q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4, - mode = Mode, - len = Len, - target_ram_count = TargetRamCount, - next_seq_id = NextSeqId, - rates = #rates { in = AvgIngressRate, - out = AvgEgressRate, - ack_in = AvgAckIngressRate, - ack_out = AvgAckEgressRate }}) -> - - [ {mode , Mode}, - {q1 , ?QUEUE:len(Q1)}, - {q2 , ?QUEUE:len(Q2)}, - {delta , Delta}, - {q3 , ?QUEUE:len(Q3)}, - {q4 , ?QUEUE:len(Q4)}, - {len , Len}, - {target_ram_count , TargetRamCount}, - {next_seq_id , NextSeqId}, - {avg_ingress_rate , AvgIngressRate}, - {avg_egress_rate , AvgEgressRate}, - {avg_ack_ingress_rate, AvgAckIngressRate}, - {avg_ack_egress_rate , AvgAckEgressRate} ]; -info(Item, _) -> - throw({bad_argument, Item}). - -invoke(?MODULE, Fun, State) -> Fun(?MODULE, State); -invoke( _, _, State) -> State. - -is_duplicate(_Msg, State) -> {false, State}. - -set_queue_mode(Mode, State = #vqstate { mode = Mode }) -> - State; -set_queue_mode(lazy, State = #vqstate { - target_ram_count = TargetRamCount }) -> - %% To become a lazy queue we need to page everything to disk first. - State1 = convert_to_lazy(State), - %% restore the original target_ram_count - a(State1 #vqstate { mode = lazy, target_ram_count = TargetRamCount }); -set_queue_mode(default, State) -> - %% becoming a default queue means loading messages from disk like - %% when a queue is recovered. - a(maybe_deltas_to_betas(State #vqstate { mode = default })); -set_queue_mode(_, State) -> - State. - -zip_msgs_and_acks(Msgs, AckTags, Accumulator, _State) -> - lists:foldl(fun ({{#basic_message{ id = Id }, _Props}, AckTag}, Acc) -> - [{Id, AckTag} | Acc] - end, Accumulator, lists:zip(Msgs, AckTags)). - -convert_to_lazy(State) -> - State1 = #vqstate { delta = Delta, q3 = Q3, len = Len } = - set_ram_duration_target(0, State), - case Delta#delta.count + ?QUEUE:len(Q3) == Len of - true -> - State1; - false -> - %% When pushing messages to disk, we might have been - %% blocked by the msg_store, so we need to see if we have - %% to wait for more credit, and then keep paging messages. - %% - %% The amqqueue_process could have taken care of this, but - %% between the time it receives the bump_credit msg and - %% calls BQ:resume to keep paging messages to disk, some - %% other request may arrive to the BQ which at this moment - %% is not in a proper state for a lazy BQ (unless all - %% messages have been paged to disk already). - wait_for_msg_store_credit(), - convert_to_lazy(State1) - end. - -wait_for_msg_store_credit() -> - case credit_flow:blocked() of - true -> receive - {bump_credit, Msg} -> - credit_flow:handle_bump_msg(Msg) - end; - false -> ok - end. - -%% Get the Timestamp property of the first msg, if present. This is -%% the one with the oldest timestamp among the heads of the pending -%% acks and unread queues. We can't check disk_pending_acks as these -%% are paged out - we assume some will soon be paged in rather than -%% forcing it to happen. Pending ack msgs are included as they are -%% regarded as unprocessed until acked, this also prevents the result -%% apparently oscillating during repeated rejects. Q3 is only checked -%% when Q4 is empty as any Q4 msg will be earlier. -head_message_timestamp(Q3, Q4, RPA, QPA) -> - HeadMsgs = [ HeadMsgStatus#msg_status.msg || - HeadMsgStatus <- - [ get_qs_head([Q4, Q3]), - get_pa_head(RPA), - get_pa_head(QPA) ], - HeadMsgStatus /= undefined, - HeadMsgStatus#msg_status.msg /= undefined ], - - Timestamps = - [Timestamp || HeadMsg <- HeadMsgs, - Timestamp <- [rabbit_basic:extract_timestamp( - HeadMsg#basic_message.content)], - Timestamp /= undefined - ], - - case Timestamps == [] of - true -> ''; - false -> lists:min(Timestamps) - end. - -get_qs_head(Qs) -> - catch lists:foldl( - fun (Q, Acc) -> - case get_q_head(Q) of - undefined -> Acc; - Val -> throw(Val) - end - end, undefined, Qs). - -get_q_head(Q) -> - get_collection_head(Q, fun ?QUEUE:is_empty/1, fun ?QUEUE:peek/1). - -get_pa_head(PA) -> - get_collection_head(PA, fun gb_trees:is_empty/1, fun gb_trees:smallest/1). - -get_collection_head(Col, IsEmpty, GetVal) -> - case IsEmpty(Col) of - false -> - {_, MsgStatus} = GetVal(Col), - MsgStatus; - true -> undefined - end. - -%%---------------------------------------------------------------------------- -%% Minor helpers -%%---------------------------------------------------------------------------- -a(State = #vqstate { q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4, - mode = default, - len = Len, - bytes = Bytes, - unacked_bytes = UnackedBytes, - persistent_count = PersistentCount, - persistent_bytes = PersistentBytes, - ram_msg_count = RamMsgCount, - ram_bytes = RamBytes}) -> - E1 = ?QUEUE:is_empty(Q1), - E2 = ?QUEUE:is_empty(Q2), - ED = Delta#delta.count == 0, - E3 = ?QUEUE:is_empty(Q3), - E4 = ?QUEUE:is_empty(Q4), - LZ = Len == 0, - - %% if q1 has messages then q3 cannot be empty. See publish/6. - true = E1 or not E3, - %% if q2 has messages then we have messages in delta (paged to - %% disk). See push_alphas_to_betas/2. - true = E2 or not ED, - %% if delta has messages then q3 cannot be empty. This is enforced - %% by paging, where min([?SEGMENT_ENTRY_COUNT, len(q3)]) messages - %% are always kept on RAM. - true = ED or not E3, - %% if the queue length is 0, then q3 and q4 must be empty. - true = LZ == (E3 and E4), - - true = Len >= 0, - true = Bytes >= 0, - true = UnackedBytes >= 0, - true = PersistentCount >= 0, - true = PersistentBytes >= 0, - true = RamMsgCount >= 0, - true = RamMsgCount =< Len, - true = RamBytes >= 0, - true = RamBytes =< Bytes + UnackedBytes, - - State; -a(State = #vqstate { q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4, - mode = lazy, - len = Len, - bytes = Bytes, - unacked_bytes = UnackedBytes, - persistent_count = PersistentCount, - persistent_bytes = PersistentBytes, - ram_msg_count = RamMsgCount, - ram_bytes = RamBytes}) -> - E1 = ?QUEUE:is_empty(Q1), - E2 = ?QUEUE:is_empty(Q2), - ED = Delta#delta.count == 0, - E3 = ?QUEUE:is_empty(Q3), - E4 = ?QUEUE:is_empty(Q4), - LZ = Len == 0, - L3 = ?QUEUE:len(Q3), - - %% q1 must always be empty, since q1 only gets messages during - %% publish, but for lazy queues messages go straight to delta. - true = E1, - - %% q2 only gets messages from q1 when push_alphas_to_betas is - %% called for a non empty delta, which won't be the case for a - %% lazy queue. This means q2 must always be empty. - true = E2, - - %% q4 must always be empty, since q1 only gets messages during - %% publish, but for lazy queues messages go straight to delta. - true = E4, - - %% if the queue is empty, then delta is empty and q3 is empty. - true = LZ == (ED and E3), - - %% There should be no messages in q1, q2, and q4 - true = Delta#delta.count + L3 == Len, - - true = Len >= 0, - true = Bytes >= 0, - true = UnackedBytes >= 0, - true = PersistentCount >= 0, - true = PersistentBytes >= 0, - true = RamMsgCount >= 0, - true = RamMsgCount =< Len, - true = RamBytes >= 0, - true = RamBytes =< Bytes + UnackedBytes, - - State. - -d(Delta = #delta { start_seq_id = Start, count = Count, end_seq_id = End }) - when Start + Count =< End -> - Delta. - -m(MsgStatus = #msg_status { is_persistent = IsPersistent, - msg_in_store = MsgInStore, - index_on_disk = IndexOnDisk }) -> - true = (not IsPersistent) or IndexOnDisk, - true = msg_in_ram(MsgStatus) or MsgInStore, - MsgStatus. - -one_if(true ) -> 1; -one_if(false) -> 0. - -cons_if(true, E, L) -> [E | L]; -cons_if(false, _E, L) -> L. - -gb_sets_maybe_insert(false, _Val, Set) -> Set; -gb_sets_maybe_insert(true, Val, Set) -> gb_sets:add(Val, Set). - -msg_status(IsPersistent, IsDelivered, SeqId, - Msg = #basic_message {id = MsgId}, MsgProps, IndexMaxSize) -> - #msg_status{seq_id = SeqId, - msg_id = MsgId, - msg = Msg, - is_persistent = IsPersistent, - is_delivered = IsDelivered, - msg_in_store = false, - index_on_disk = false, - persist_to = determine_persist_to(Msg, MsgProps, IndexMaxSize), - msg_props = MsgProps}. - -beta_msg_status({Msg = #basic_message{id = MsgId}, - SeqId, MsgProps, IsPersistent, IsDelivered}) -> - MS0 = beta_msg_status0(SeqId, MsgProps, IsPersistent, IsDelivered), - MS0#msg_status{msg_id = MsgId, - msg = Msg, - persist_to = queue_index, - msg_in_store = false}; - -beta_msg_status({MsgId, SeqId, MsgProps, IsPersistent, IsDelivered}) -> - MS0 = beta_msg_status0(SeqId, MsgProps, IsPersistent, IsDelivered), - MS0#msg_status{msg_id = MsgId, - msg = undefined, - persist_to = msg_store, - msg_in_store = true}. - -beta_msg_status0(SeqId, MsgProps, IsPersistent, IsDelivered) -> - #msg_status{seq_id = SeqId, - msg = undefined, - is_persistent = IsPersistent, - is_delivered = IsDelivered, - index_on_disk = true, - msg_props = MsgProps}. - -trim_msg_status(MsgStatus) -> - case persist_to(MsgStatus) of - msg_store -> MsgStatus#msg_status{msg = undefined}; - queue_index -> MsgStatus - end. - -with_msg_store_state({MSCStateP, MSCStateT}, true, Fun) -> - {Result, MSCStateP1} = Fun(MSCStateP), - {Result, {MSCStateP1, MSCStateT}}; -with_msg_store_state({MSCStateP, MSCStateT}, false, Fun) -> - {Result, MSCStateT1} = Fun(MSCStateT), - {Result, {MSCStateP, MSCStateT1}}. - -with_immutable_msg_store_state(MSCState, IsPersistent, Fun) -> - {Res, MSCState} = with_msg_store_state(MSCState, IsPersistent, - fun (MSCState1) -> - {Fun(MSCState1), MSCState1} - end), - Res. - -msg_store_client_init(MsgStore, MsgOnDiskFun, Callback, VHost) -> - msg_store_client_init(MsgStore, rabbit_guid:gen(), MsgOnDiskFun, - Callback, VHost). - -msg_store_client_init(MsgStore, Ref, MsgOnDiskFun, Callback, VHost) -> - CloseFDsFun = msg_store_close_fds_fun(MsgStore =:= ?PERSISTENT_MSG_STORE), - rabbit_vhost_msg_store:client_init(VHost, - MsgStore, Ref, MsgOnDiskFun, - fun () -> Callback(?MODULE, CloseFDsFun) end). - -msg_store_write(MSCState, IsPersistent, MsgId, Msg) -> - with_immutable_msg_store_state( - MSCState, IsPersistent, - fun (MSCState1) -> - rabbit_msg_store:write_flow(MsgId, Msg, MSCState1) - end). - -msg_store_read(MSCState, IsPersistent, MsgId) -> - with_msg_store_state( - MSCState, IsPersistent, - fun (MSCState1) -> - rabbit_msg_store:read(MsgId, MSCState1) - end). - -msg_store_remove(MSCState, IsPersistent, MsgIds) -> - with_immutable_msg_store_state( - MSCState, IsPersistent, - fun (MCSState1) -> - rabbit_msg_store:remove(MsgIds, MCSState1) - end). - -msg_store_close_fds(MSCState, IsPersistent) -> - with_msg_store_state( - MSCState, IsPersistent, - fun (MSCState1) -> rabbit_msg_store:close_all_indicated(MSCState1) end). - -msg_store_close_fds_fun(IsPersistent) -> - fun (?MODULE, State = #vqstate { msg_store_clients = MSCState }) -> - {ok, MSCState1} = msg_store_close_fds(MSCState, IsPersistent), - State #vqstate { msg_store_clients = MSCState1 } - end. - -maybe_write_delivered(false, _SeqId, IndexState) -> - IndexState; -maybe_write_delivered(true, SeqId, IndexState) -> - rabbit_queue_index:deliver([SeqId], IndexState). - -betas_from_index_entries(List, TransientThreshold, DelsAndAcksFun, State) -> - {Filtered, Delivers, Acks, RamReadyCount, RamBytes} = - lists:foldr( - fun ({_MsgOrId, SeqId, _MsgProps, IsPersistent, IsDelivered} = M, - {Filtered1, Delivers1, Acks1, RRC, RB} = Acc) -> - case SeqId < TransientThreshold andalso not IsPersistent of - true -> {Filtered1, - cons_if(not IsDelivered, SeqId, Delivers1), - [SeqId | Acks1], RRC, RB}; - false -> MsgStatus = m(beta_msg_status(M)), - HaveMsg = msg_in_ram(MsgStatus), - Size = msg_size(MsgStatus), - case is_msg_in_pending_acks(SeqId, State) of - false -> {?QUEUE:in_r(MsgStatus, Filtered1), - Delivers1, Acks1, - RRC + one_if(HaveMsg), - RB + one_if(HaveMsg) * Size}; - true -> Acc %% [0] - end - end - end, {?QUEUE:new(), [], [], 0, 0}, List), - {Filtered, RamReadyCount, RamBytes, DelsAndAcksFun(Delivers, Acks, State)}. -%% [0] We don't increase RamBytes here, even though it pertains to -%% unacked messages too, since if HaveMsg then the message must have -%% been stored in the QI, thus the message must have been in -%% qi_pending_ack, thus it must already have been in RAM. - -is_msg_in_pending_acks(SeqId, #vqstate { ram_pending_ack = RPA, - disk_pending_ack = DPA, - qi_pending_ack = QPA }) -> - (gb_trees:is_defined(SeqId, RPA) orelse - gb_trees:is_defined(SeqId, DPA) orelse - gb_trees:is_defined(SeqId, QPA)). - -expand_delta(SeqId, ?BLANK_DELTA_PATTERN(X)) -> - d(#delta { start_seq_id = SeqId, count = 1, end_seq_id = SeqId + 1 }); -expand_delta(SeqId, #delta { start_seq_id = StartSeqId, - count = Count } = Delta) - when SeqId < StartSeqId -> - d(Delta #delta { start_seq_id = SeqId, count = Count + 1 }); -expand_delta(SeqId, #delta { count = Count, - end_seq_id = EndSeqId } = Delta) - when SeqId >= EndSeqId -> - d(Delta #delta { count = Count + 1, end_seq_id = SeqId + 1 }); -expand_delta(_SeqId, #delta { count = Count } = Delta) -> - d(Delta #delta { count = Count + 1 }). - -%%---------------------------------------------------------------------------- -%% Internal major helpers for Public API -%%---------------------------------------------------------------------------- - -init(IsDurable, IndexState, DeltaCount, DeltaBytes, Terms, - PersistentClient, TransientClient, VHost) -> - {LowSeqId, NextSeqId, IndexState1} = rabbit_queue_index:bounds(IndexState), - - {DeltaCount1, DeltaBytes1} = - case Terms of - non_clean_shutdown -> {DeltaCount, DeltaBytes}; - _ -> {proplists:get_value(persistent_count, - Terms, DeltaCount), - proplists:get_value(persistent_bytes, - Terms, DeltaBytes)} - end, - Delta = case DeltaCount1 == 0 andalso DeltaCount /= undefined of - true -> ?BLANK_DELTA; - false -> d(#delta { start_seq_id = LowSeqId, - count = DeltaCount1, - end_seq_id = NextSeqId }) - end, - Now = erlang:monotonic_time(), - IoBatchSize = rabbit_misc:get_env(rabbit, msg_store_io_batch_size, - ?IO_BATCH_SIZE), - - {ok, IndexMaxSize} = application:get_env( - rabbit, queue_index_embed_msgs_below), - State = #vqstate { - q1 = ?QUEUE:new(), - q2 = ?QUEUE:new(), - delta = Delta, - q3 = ?QUEUE:new(), - q4 = ?QUEUE:new(), - next_seq_id = NextSeqId, - ram_pending_ack = gb_trees:empty(), - disk_pending_ack = gb_trees:empty(), - qi_pending_ack = gb_trees:empty(), - index_state = IndexState1, - msg_store_clients = {PersistentClient, TransientClient}, - durable = IsDurable, - transient_threshold = NextSeqId, - qi_embed_msgs_below = IndexMaxSize, - - len = DeltaCount1, - persistent_count = DeltaCount1, - bytes = DeltaBytes1, - persistent_bytes = DeltaBytes1, - - target_ram_count = infinity, - ram_msg_count = 0, - ram_msg_count_prev = 0, - ram_ack_count_prev = 0, - ram_bytes = 0, - unacked_bytes = 0, - out_counter = 0, - in_counter = 0, - rates = blank_rates(Now), - msgs_on_disk = gb_sets:new(), - msg_indices_on_disk = gb_sets:new(), - unconfirmed = gb_sets:new(), - confirmed = gb_sets:new(), - ack_out_counter = 0, - ack_in_counter = 0, - disk_read_count = 0, - disk_write_count = 0, - - io_batch_size = IoBatchSize, - - mode = default, - virtual_host = VHost }, - a(maybe_deltas_to_betas(State)). - -blank_rates(Now) -> - #rates { in = 0.0, - out = 0.0, - ack_in = 0.0, - ack_out = 0.0, - timestamp = Now}. - -in_r(MsgStatus = #msg_status { msg = undefined }, - State = #vqstate { mode = default, q3 = Q3, q4 = Q4 }) -> - case ?QUEUE:is_empty(Q4) of - true -> State #vqstate { q3 = ?QUEUE:in_r(MsgStatus, Q3) }; - false -> {Msg, State1 = #vqstate { q4 = Q4a }} = - read_msg(MsgStatus, State), - MsgStatus1 = MsgStatus#msg_status{msg = Msg}, - stats(ready0, {MsgStatus, MsgStatus1}, - State1 #vqstate { q4 = ?QUEUE:in_r(MsgStatus1, Q4a) }) - end; -in_r(MsgStatus, - State = #vqstate { mode = default, q4 = Q4 }) -> - State #vqstate { q4 = ?QUEUE:in_r(MsgStatus, Q4) }; -%% lazy queues -in_r(MsgStatus = #msg_status { seq_id = SeqId }, - State = #vqstate { mode = lazy, q3 = Q3, delta = Delta}) -> - case ?QUEUE:is_empty(Q3) of - true -> - {_MsgStatus1, State1} = - maybe_write_to_disk(true, true, MsgStatus, State), - State2 = stats(ready0, {MsgStatus, none}, State1), - Delta1 = expand_delta(SeqId, Delta), - State2 #vqstate{ delta = Delta1 }; - false -> - State #vqstate { q3 = ?QUEUE:in_r(MsgStatus, Q3) } - end. - -queue_out(State = #vqstate { mode = default, q4 = Q4 }) -> - case ?QUEUE:out(Q4) of - {empty, _Q4} -> - case fetch_from_q3(State) of - {empty, _State1} = Result -> Result; - {loaded, {MsgStatus, State1}} -> {{value, MsgStatus}, State1} - end; - {{value, MsgStatus}, Q4a} -> - {{value, MsgStatus}, State #vqstate { q4 = Q4a }} - end; -%% lazy queues -queue_out(State = #vqstate { mode = lazy }) -> - case fetch_from_q3(State) of - {empty, _State1} = Result -> Result; - {loaded, {MsgStatus, State1}} -> {{value, MsgStatus}, State1} - end. - -read_msg(#msg_status{msg = undefined, - msg_id = MsgId, - is_persistent = IsPersistent}, State) -> - read_msg(MsgId, IsPersistent, State); -read_msg(#msg_status{msg = Msg}, State) -> - {Msg, State}. - -read_msg(MsgId, IsPersistent, State = #vqstate{msg_store_clients = MSCState, - disk_read_count = Count}) -> - {{ok, Msg = #basic_message {}}, MSCState1} = - msg_store_read(MSCState, IsPersistent, MsgId), - {Msg, State #vqstate {msg_store_clients = MSCState1, - disk_read_count = Count + 1}}. - -stats(Signs, Statuses, State) -> - stats0(expand_signs(Signs), expand_statuses(Statuses), State). - -expand_signs(ready0) -> {0, 0, true}; -expand_signs(lazy_pub) -> {1, 0, true}; -expand_signs({A, B}) -> {A, B, false}. - -expand_statuses({none, A}) -> {false, msg_in_ram(A), A}; -expand_statuses({B, none}) -> {msg_in_ram(B), false, B}; -expand_statuses({lazy, A}) -> {false , false, A}; -expand_statuses({B, A}) -> {msg_in_ram(B), msg_in_ram(A), B}. - -%% In this function at least, we are religious: the variable name -%% contains "Ready" or "Unacked" iff that is what it counts. If -%% neither is present it counts both. -stats0({DeltaReady, DeltaUnacked, ReadyMsgPaged}, - {InRamBefore, InRamAfter, MsgStatus}, - State = #vqstate{len = ReadyCount, - bytes = ReadyBytes, - ram_msg_count = RamReadyCount, - persistent_count = PersistentCount, - unacked_bytes = UnackedBytes, - ram_bytes = RamBytes, - persistent_bytes = PersistentBytes}) -> - S = msg_size(MsgStatus), - DeltaTotal = DeltaReady + DeltaUnacked, - DeltaRam = case {InRamBefore, InRamAfter} of - {false, false} -> 0; - {false, true} -> 1; - {true, false} -> -1; - {true, true} -> 0 - end, - DeltaRamReady = case DeltaReady of - 1 -> one_if(InRamAfter); - -1 -> -one_if(InRamBefore); - 0 when ReadyMsgPaged -> DeltaRam; - 0 -> 0 - end, - DeltaPersistent = DeltaTotal * one_if(MsgStatus#msg_status.is_persistent), - State#vqstate{len = ReadyCount + DeltaReady, - ram_msg_count = RamReadyCount + DeltaRamReady, - persistent_count = PersistentCount + DeltaPersistent, - bytes = ReadyBytes + DeltaReady * S, - unacked_bytes = UnackedBytes + DeltaUnacked * S, - ram_bytes = RamBytes + DeltaRam * S, - persistent_bytes = PersistentBytes + DeltaPersistent * S}. - -msg_size(#msg_status{msg_props = #message_properties{size = Size}}) -> Size. - -msg_in_ram(#msg_status{msg = Msg}) -> Msg =/= undefined. - -%% first param: AckRequired -remove(true, MsgStatus = #msg_status { - seq_id = SeqId, - is_delivered = IsDelivered, - index_on_disk = IndexOnDisk }, - State = #vqstate {out_counter = OutCount, - index_state = IndexState}) -> - %% Mark it delivered if necessary - IndexState1 = maybe_write_delivered( - IndexOnDisk andalso not IsDelivered, - SeqId, IndexState), - - State1 = record_pending_ack( - MsgStatus #msg_status { - is_delivered = true }, State), - - State2 = stats({-1, 1}, {MsgStatus, MsgStatus}, State1), - - {SeqId, maybe_update_rates( - State2 #vqstate {out_counter = OutCount + 1, - index_state = IndexState1})}; - -%% This function body has the same behaviour as remove_queue_entries/3 -%% but instead of removing messages based on a ?QUEUE, this removes -%% just one message, the one referenced by the MsgStatus provided. -remove(false, MsgStatus = #msg_status { - seq_id = SeqId, - msg_id = MsgId, - is_persistent = IsPersistent, - is_delivered = IsDelivered, - msg_in_store = MsgInStore, - index_on_disk = IndexOnDisk }, - State = #vqstate {out_counter = OutCount, - index_state = IndexState, - msg_store_clients = MSCState}) -> - %% Mark it delivered if necessary - IndexState1 = maybe_write_delivered( - IndexOnDisk andalso not IsDelivered, - SeqId, IndexState), - - %% Remove from msg_store and queue index, if necessary - case MsgInStore of - true -> ok = msg_store_remove(MSCState, IsPersistent, [MsgId]); - false -> ok - end, - - IndexState2 = - case IndexOnDisk of - true -> rabbit_queue_index:ack([SeqId], IndexState1); - false -> IndexState1 - end, - - State1 = stats({-1, 0}, {MsgStatus, none}, State), - - {undefined, maybe_update_rates( - State1 #vqstate {out_counter = OutCount + 1, - index_state = IndexState2})}. - -%% This function exists as a way to improve dropwhile/2 -%% performance. The idea of having this function is to optimise calls -%% to rabbit_queue_index by batching delivers and acks, instead of -%% sending them one by one. -%% -%% Instead of removing every message as their are popped from the -%% queue, it first accumulates them and then removes them by calling -%% remove_queue_entries/3, since the behaviour of -%% remove_queue_entries/3 when used with -%% process_delivers_and_acks_fun(deliver_and_ack) is the same as -%% calling remove(false, MsgStatus, State). -%% -%% remove/3 also updates the out_counter in every call, but here we do -%% it just once at the end. -remove_by_predicate(Pred, State = #vqstate {out_counter = OutCount}) -> - {MsgProps, QAcc, State1} = - collect_by_predicate(Pred, ?QUEUE:new(), State), - State2 = - remove_queue_entries( - QAcc, process_delivers_and_acks_fun(deliver_and_ack), State1), - %% maybe_update_rates/1 is called in remove/2 for every - %% message. Since we update out_counter only once, we call it just - %% there. - {MsgProps, maybe_update_rates( - State2 #vqstate { - out_counter = OutCount + ?QUEUE:len(QAcc)})}. - -%% This function exists as a way to improve fetchwhile/4 -%% performance. The idea of having this function is to optimise calls -%% to rabbit_queue_index by batching delivers, instead of sending them -%% one by one. -%% -%% Fun is the function passed to fetchwhile/4 that's -%% applied to every fetched message and used to build the fetchwhile/4 -%% result accumulator FetchAcc. -fetch_by_predicate(Pred, Fun, FetchAcc, - State = #vqstate { - index_state = IndexState, - out_counter = OutCount}) -> - {MsgProps, QAcc, State1} = - collect_by_predicate(Pred, ?QUEUE:new(), State), - - {Delivers, FetchAcc1, State2} = - process_queue_entries(QAcc, Fun, FetchAcc, State1), - - IndexState1 = rabbit_queue_index:deliver(Delivers, IndexState), - - {MsgProps, FetchAcc1, maybe_update_rates( - State2 #vqstate { - index_state = IndexState1, - out_counter = OutCount + ?QUEUE:len(QAcc)})}. - -%% We try to do here the same as what remove(true, State) does but -%% processing several messages at the same time. The idea is to -%% optimize rabbit_queue_index:deliver/2 calls by sending a list of -%% SeqIds instead of one by one, thus process_queue_entries1 will -%% accumulate the required deliveries, will record_pending_ack for -%% each message, and will update stats, like remove/2 does. -%% -%% For the meaning of Fun and FetchAcc arguments see -%% fetch_by_predicate/4 above. -process_queue_entries(Q, Fun, FetchAcc, State) -> - ?QUEUE:foldl(fun (MsgStatus, Acc) -> - process_queue_entries1(MsgStatus, Fun, Acc) - end, - {[], FetchAcc, State}, Q). - -process_queue_entries1( - #msg_status { seq_id = SeqId, is_delivered = IsDelivered, - index_on_disk = IndexOnDisk} = MsgStatus, - Fun, - {Delivers, FetchAcc, State}) -> - {Msg, State1} = read_msg(MsgStatus, State), - State2 = record_pending_ack( - MsgStatus #msg_status { - is_delivered = true }, State1), - {cons_if(IndexOnDisk andalso not IsDelivered, SeqId, Delivers), - Fun(Msg, SeqId, FetchAcc), - stats({-1, 1}, {MsgStatus, MsgStatus}, State2)}. - -collect_by_predicate(Pred, QAcc, State) -> - case queue_out(State) of - {empty, State1} -> - {undefined, QAcc, State1}; - {{value, MsgStatus = #msg_status { msg_props = MsgProps }}, State1} -> - case Pred(MsgProps) of - true -> collect_by_predicate(Pred, ?QUEUE:in(MsgStatus, QAcc), - State1); - false -> {MsgProps, QAcc, in_r(MsgStatus, State1)} - end - end. - -%%---------------------------------------------------------------------------- -%% Helpers for Public API purge/1 function -%%---------------------------------------------------------------------------- + rabbit_variable_queue:depth(State). -%% The difference between purge_when_pending_acks/1 -%% vs. purge_and_index_reset/1 is that the first one issues a deliver -%% and an ack to the queue index for every message that's being -%% removed, while the later just resets the queue index state. -purge_when_pending_acks(State) -> - State1 = purge1(process_delivers_and_acks_fun(deliver_and_ack), State), - a(State1). +set_ram_duration_target(DurationTarget, State) -> + rabbit_variable_queue:set_ram_duration_target(DurationTarget, State). -purge_and_index_reset(State) -> - State1 = purge1(process_delivers_and_acks_fun(none), State), - a(reset_qi_state(State1)). - -%% This function removes messages from each of {q1, q2, q3, q4}. -%% -%% With remove_queue_entries/3 q1 and q4 are emptied, while q2 and q3 -%% are specially handled by purge_betas_and_deltas/2. -%% -%% purge_betas_and_deltas/2 loads messages from the queue index, -%% filling up q3 and in some cases moving messages form q2 to q3 while -%% reseting q2 to an empty queue (see maybe_deltas_to_betas/2). The -%% messages loaded into q3 are removed by calling -%% remove_queue_entries/3 until there are no more messages to be read -%% from the queue index. Messages are read in batches from the queue -%% index. -purge1(AfterFun, State = #vqstate { q4 = Q4}) -> - State1 = remove_queue_entries(Q4, AfterFun, State), - - State2 = #vqstate {q1 = Q1} = - purge_betas_and_deltas(AfterFun, State1#vqstate{q4 = ?QUEUE:new()}), - - State3 = remove_queue_entries(Q1, AfterFun, State2), - - a(State3#vqstate{q1 = ?QUEUE:new()}). - -reset_qi_state(State = #vqstate{index_state = IndexState}) -> - State#vqstate{index_state = - rabbit_queue_index:reset_state(IndexState)}. - -is_pending_ack_empty(State) -> - count_pending_acks(State) =:= 0. - -count_pending_acks(#vqstate { ram_pending_ack = RPA, - disk_pending_ack = DPA, - qi_pending_ack = QPA }) -> - gb_trees:size(RPA) + gb_trees:size(DPA) + gb_trees:size(QPA). - -purge_betas_and_deltas(DelsAndAcksFun, State = #vqstate { mode = Mode }) -> - State0 = #vqstate { q3 = Q3 } = - case Mode of - lazy -> maybe_deltas_to_betas(DelsAndAcksFun, State); - _ -> State - end, - - case ?QUEUE:is_empty(Q3) of - true -> State0; - false -> State1 = remove_queue_entries(Q3, DelsAndAcksFun, State0), - purge_betas_and_deltas(DelsAndAcksFun, - maybe_deltas_to_betas( - DelsAndAcksFun, - State1#vqstate{q3 = ?QUEUE:new()})) - end. - -remove_queue_entries(Q, DelsAndAcksFun, - State = #vqstate{msg_store_clients = MSCState}) -> - {MsgIdsByStore, Delivers, Acks, State1} = - ?QUEUE:foldl(fun remove_queue_entries1/2, - {orddict:new(), [], [], State}, Q), - remove_msgs_by_id(MsgIdsByStore, MSCState), - DelsAndAcksFun(Delivers, Acks, State1). - -remove_queue_entries1( - #msg_status { msg_id = MsgId, seq_id = SeqId, is_delivered = IsDelivered, - msg_in_store = MsgInStore, index_on_disk = IndexOnDisk, - is_persistent = IsPersistent} = MsgStatus, - {MsgIdsByStore, Delivers, Acks, State}) -> - {case MsgInStore of - true -> rabbit_misc:orddict_cons(IsPersistent, MsgId, MsgIdsByStore); - false -> MsgIdsByStore - end, - cons_if(IndexOnDisk andalso not IsDelivered, SeqId, Delivers), - cons_if(IndexOnDisk, SeqId, Acks), - stats({-1, 0}, {MsgStatus, none}, State)}. - -process_delivers_and_acks_fun(deliver_and_ack) -> - fun (Delivers, Acks, State = #vqstate { index_state = IndexState }) -> - IndexState1 = - rabbit_queue_index:ack( - Acks, rabbit_queue_index:deliver(Delivers, IndexState)), - State #vqstate { index_state = IndexState1 } - end; -process_delivers_and_acks_fun(_) -> - fun (_, _, State) -> - State - end. - -%%---------------------------------------------------------------------------- -%% Internal gubbins for publishing -%%---------------------------------------------------------------------------- - -publish1(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId }, - MsgProps = #message_properties { needs_confirming = NeedsConfirming }, - IsDelivered, _ChPid, _Flow, PersistFun, - State = #vqstate { q1 = Q1, q3 = Q3, q4 = Q4, - mode = default, - qi_embed_msgs_below = IndexMaxSize, - next_seq_id = SeqId, - in_counter = InCount, - durable = IsDurable, - unconfirmed = UC }) -> - IsPersistent1 = IsDurable andalso IsPersistent, - MsgStatus = msg_status(IsPersistent1, IsDelivered, SeqId, Msg, MsgProps, IndexMaxSize), - {MsgStatus1, State1} = PersistFun(false, false, MsgStatus, State), - State2 = case ?QUEUE:is_empty(Q3) of - false -> State1 #vqstate { q1 = ?QUEUE:in(m(MsgStatus1), Q1) }; - true -> State1 #vqstate { q4 = ?QUEUE:in(m(MsgStatus1), Q4) } - end, - InCount1 = InCount + 1, - UC1 = gb_sets_maybe_insert(NeedsConfirming, MsgId, UC), - stats({1, 0}, {none, MsgStatus1}, - State2#vqstate{ next_seq_id = SeqId + 1, - in_counter = InCount1, - unconfirmed = UC1 }); -publish1(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId }, - MsgProps = #message_properties { needs_confirming = NeedsConfirming }, - IsDelivered, _ChPid, _Flow, PersistFun, - State = #vqstate { mode = lazy, - qi_embed_msgs_below = IndexMaxSize, - next_seq_id = SeqId, - in_counter = InCount, - durable = IsDurable, - unconfirmed = UC, - delta = Delta }) -> - IsPersistent1 = IsDurable andalso IsPersistent, - MsgStatus = msg_status(IsPersistent1, IsDelivered, SeqId, Msg, MsgProps, IndexMaxSize), - {MsgStatus1, State1} = PersistFun(true, true, MsgStatus, State), - Delta1 = expand_delta(SeqId, Delta), - UC1 = gb_sets_maybe_insert(NeedsConfirming, MsgId, UC), - stats(lazy_pub, {lazy, m(MsgStatus1)}, - State1#vqstate{ delta = Delta1, - next_seq_id = SeqId + 1, - in_counter = InCount + 1, - unconfirmed = UC1 }). - -batch_publish1({Msg, MsgProps, IsDelivered}, {ChPid, Flow, State}) -> - {ChPid, Flow, publish1(Msg, MsgProps, IsDelivered, ChPid, Flow, - fun maybe_prepare_write_to_disk/4, State)}. - -publish_delivered1(Msg = #basic_message { is_persistent = IsPersistent, - id = MsgId }, - MsgProps = #message_properties { - needs_confirming = NeedsConfirming }, - _ChPid, _Flow, PersistFun, - State = #vqstate { mode = default, - qi_embed_msgs_below = IndexMaxSize, - next_seq_id = SeqId, - out_counter = OutCount, - in_counter = InCount, - durable = IsDurable, - unconfirmed = UC }) -> - IsPersistent1 = IsDurable andalso IsPersistent, - MsgStatus = msg_status(IsPersistent1, true, SeqId, Msg, MsgProps, IndexMaxSize), - {MsgStatus1, State1} = PersistFun(false, false, MsgStatus, State), - State2 = record_pending_ack(m(MsgStatus1), State1), - UC1 = gb_sets_maybe_insert(NeedsConfirming, MsgId, UC), - State3 = stats({0, 1}, {none, MsgStatus1}, - State2 #vqstate { next_seq_id = SeqId + 1, - out_counter = OutCount + 1, - in_counter = InCount + 1, - unconfirmed = UC1 }), - {SeqId, State3}; -publish_delivered1(Msg = #basic_message { is_persistent = IsPersistent, - id = MsgId }, - MsgProps = #message_properties { - needs_confirming = NeedsConfirming }, - _ChPid, _Flow, PersistFun, - State = #vqstate { mode = lazy, - qi_embed_msgs_below = IndexMaxSize, - next_seq_id = SeqId, - out_counter = OutCount, - in_counter = InCount, - durable = IsDurable, - unconfirmed = UC }) -> - IsPersistent1 = IsDurable andalso IsPersistent, - MsgStatus = msg_status(IsPersistent1, true, SeqId, Msg, MsgProps, IndexMaxSize), - {MsgStatus1, State1} = PersistFun(true, true, MsgStatus, State), - State2 = record_pending_ack(m(MsgStatus1), State1), - UC1 = gb_sets_maybe_insert(NeedsConfirming, MsgId, UC), - State3 = stats({0, 1}, {none, MsgStatus1}, - State2 #vqstate { next_seq_id = SeqId + 1, - out_counter = OutCount + 1, - in_counter = InCount + 1, - unconfirmed = UC1 }), - {SeqId, State3}. - -batch_publish_delivered1({Msg, MsgProps}, {ChPid, Flow, SeqIds, State}) -> - {SeqId, State1} = - publish_delivered1(Msg, MsgProps, ChPid, Flow, - fun maybe_prepare_write_to_disk/4, - State), - {ChPid, Flow, [SeqId | SeqIds], State1}. - -maybe_write_msg_to_disk(_Force, MsgStatus = #msg_status { - msg_in_store = true }, State) -> - {MsgStatus, State}; -maybe_write_msg_to_disk(Force, MsgStatus = #msg_status { - msg = Msg, msg_id = MsgId, - is_persistent = IsPersistent }, - State = #vqstate{ msg_store_clients = MSCState, - disk_write_count = Count}) - when Force orelse IsPersistent -> - case persist_to(MsgStatus) of - msg_store -> ok = msg_store_write(MSCState, IsPersistent, MsgId, - prepare_to_store(Msg)), - {MsgStatus#msg_status{msg_in_store = true}, - State#vqstate{disk_write_count = Count + 1}}; - queue_index -> {MsgStatus, State} - end; -maybe_write_msg_to_disk(_Force, MsgStatus, State) -> - {MsgStatus, State}. - -%% Due to certain optimizations made inside -%% rabbit_queue_index:pre_publish/7 we need to have two separate -%% functions for index persistence. This one is only used when paging -%% during memory pressure. We didn't want to modify -%% maybe_write_index_to_disk/3 because that function is used in other -%% places. -maybe_batch_write_index_to_disk(_Force, - MsgStatus = #msg_status { - index_on_disk = true }, State) -> - {MsgStatus, State}; -maybe_batch_write_index_to_disk(Force, - MsgStatus = #msg_status { - msg = Msg, - msg_id = MsgId, - seq_id = SeqId, - is_persistent = IsPersistent, - is_delivered = IsDelivered, - msg_props = MsgProps}, - State = #vqstate { - target_ram_count = TargetRamCount, - disk_write_count = DiskWriteCount, - index_state = IndexState}) - when Force orelse IsPersistent -> - {MsgOrId, DiskWriteCount1} = - case persist_to(MsgStatus) of - msg_store -> {MsgId, DiskWriteCount}; - queue_index -> {prepare_to_store(Msg), DiskWriteCount + 1} - end, - IndexState1 = rabbit_queue_index:pre_publish( - MsgOrId, SeqId, MsgProps, IsPersistent, IsDelivered, - TargetRamCount, IndexState), - {MsgStatus#msg_status{index_on_disk = true}, - State#vqstate{index_state = IndexState1, - disk_write_count = DiskWriteCount1}}; -maybe_batch_write_index_to_disk(_Force, MsgStatus, State) -> - {MsgStatus, State}. - -maybe_write_index_to_disk(_Force, MsgStatus = #msg_status { - index_on_disk = true }, State) -> - {MsgStatus, State}; -maybe_write_index_to_disk(Force, MsgStatus = #msg_status { - msg = Msg, - msg_id = MsgId, - seq_id = SeqId, - is_persistent = IsPersistent, - is_delivered = IsDelivered, - msg_props = MsgProps}, - State = #vqstate{target_ram_count = TargetRamCount, - disk_write_count = DiskWriteCount, - index_state = IndexState}) - when Force orelse IsPersistent -> - {MsgOrId, DiskWriteCount1} = - case persist_to(MsgStatus) of - msg_store -> {MsgId, DiskWriteCount}; - queue_index -> {prepare_to_store(Msg), DiskWriteCount + 1} - end, - IndexState1 = rabbit_queue_index:publish( - MsgOrId, SeqId, MsgProps, IsPersistent, TargetRamCount, - IndexState), - IndexState2 = maybe_write_delivered(IsDelivered, SeqId, IndexState1), - {MsgStatus#msg_status{index_on_disk = true}, - State#vqstate{index_state = IndexState2, - disk_write_count = DiskWriteCount1}}; - -maybe_write_index_to_disk(_Force, MsgStatus, State) -> - {MsgStatus, State}. - -maybe_write_to_disk(ForceMsg, ForceIndex, MsgStatus, State) -> - {MsgStatus1, State1} = maybe_write_msg_to_disk(ForceMsg, MsgStatus, State), - maybe_write_index_to_disk(ForceIndex, MsgStatus1, State1). - -maybe_prepare_write_to_disk(ForceMsg, ForceIndex, MsgStatus, State) -> - {MsgStatus1, State1} = maybe_write_msg_to_disk(ForceMsg, MsgStatus, State), - maybe_batch_write_index_to_disk(ForceIndex, MsgStatus1, State1). - -determine_persist_to(#basic_message{ - content = #content{properties = Props, - properties_bin = PropsBin}}, - #message_properties{size = BodySize}, - IndexMaxSize) -> - %% The >= is so that you can set the env to 0 and never persist - %% to the index. - %% - %% We want this to be fast, so we avoid size(term_to_binary()) - %% here, or using the term size estimation from truncate.erl, both - %% of which are too slow. So instead, if the message body size - %% goes over the limit then we avoid any other checks. - %% - %% If it doesn't we need to decide if the properties will push - %% it past the limit. If we have the encoded properties (usual - %% case) we can just check their size. If we don't (message came - %% via the direct client), we make a guess based on the number of - %% headers. - case BodySize >= IndexMaxSize of - true -> msg_store; - false -> Est = case is_binary(PropsBin) of - true -> BodySize + size(PropsBin); - false -> #'P_basic'{headers = Hs} = Props, - case Hs of - undefined -> 0; - _ -> length(Hs) - end * ?HEADER_GUESS_SIZE + BodySize - end, - case Est >= IndexMaxSize of - true -> msg_store; - false -> queue_index - end - end. - -persist_to(#msg_status{persist_to = To}) -> To. - -prepare_to_store(Msg) -> - Msg#basic_message{ - %% don't persist any recoverable decoded properties - content = rabbit_binary_parser:clear_decoded_content( - Msg #basic_message.content)}. +ram_duration(State) -> + rabbit_variable_queue:ram_duration(State). -%%---------------------------------------------------------------------------- -%% Internal gubbins for acks -%%---------------------------------------------------------------------------- +needs_timeout(State) -> + rabbit_variable_queue:needs_timeout(State). -record_pending_ack(#msg_status { seq_id = SeqId } = MsgStatus, - State = #vqstate { ram_pending_ack = RPA, - disk_pending_ack = DPA, - qi_pending_ack = QPA, - ack_in_counter = AckInCount}) -> - Insert = fun (Tree) -> gb_trees:insert(SeqId, MsgStatus, Tree) end, - {RPA1, DPA1, QPA1} = - case {msg_in_ram(MsgStatus), persist_to(MsgStatus)} of - {false, _} -> {RPA, Insert(DPA), QPA}; - {_, queue_index} -> {RPA, DPA, Insert(QPA)}; - {_, msg_store} -> {Insert(RPA), DPA, QPA} - end, - State #vqstate { ram_pending_ack = RPA1, - disk_pending_ack = DPA1, - qi_pending_ack = QPA1, - ack_in_counter = AckInCount + 1}. - -lookup_pending_ack(SeqId, #vqstate { ram_pending_ack = RPA, - disk_pending_ack = DPA, - qi_pending_ack = QPA}) -> - case gb_trees:lookup(SeqId, RPA) of - {value, V} -> V; - none -> case gb_trees:lookup(SeqId, DPA) of - {value, V} -> V; - none -> gb_trees:get(SeqId, QPA) - end - end. - -%% First parameter = UpdateStats -remove_pending_ack(true, SeqId, State) -> - {MsgStatus, State1} = remove_pending_ack(false, SeqId, State), - {MsgStatus, stats({0, -1}, {MsgStatus, none}, State1)}; -remove_pending_ack(false, SeqId, State = #vqstate{ram_pending_ack = RPA, - disk_pending_ack = DPA, - qi_pending_ack = QPA}) -> - case gb_trees:lookup(SeqId, RPA) of - {value, V} -> RPA1 = gb_trees:delete(SeqId, RPA), - {V, State #vqstate { ram_pending_ack = RPA1 }}; - none -> case gb_trees:lookup(SeqId, DPA) of - {value, V} -> - DPA1 = gb_trees:delete(SeqId, DPA), - {V, State#vqstate{disk_pending_ack = DPA1}}; - none -> - QPA1 = gb_trees:delete(SeqId, QPA), - {gb_trees:get(SeqId, QPA), - State#vqstate{qi_pending_ack = QPA1}} - end - end. - -purge_pending_ack(KeepPersistent, - State = #vqstate { index_state = IndexState, - msg_store_clients = MSCState }) -> - {IndexOnDiskSeqIds, MsgIdsByStore, State1} = purge_pending_ack1(State), - case KeepPersistent of - true -> remove_transient_msgs_by_id(MsgIdsByStore, MSCState), - State1; - false -> IndexState1 = - rabbit_queue_index:ack(IndexOnDiskSeqIds, IndexState), - remove_msgs_by_id(MsgIdsByStore, MSCState), - State1 #vqstate { index_state = IndexState1 } - end. - -purge_pending_ack_delete_and_terminate( - State = #vqstate { index_state = IndexState, - msg_store_clients = MSCState }) -> - {_, MsgIdsByStore, State1} = purge_pending_ack1(State), - IndexState1 = rabbit_queue_index:delete_and_terminate(IndexState), - remove_msgs_by_id(MsgIdsByStore, MSCState), - State1 #vqstate { index_state = IndexState1 }. - -purge_pending_ack1(State = #vqstate { ram_pending_ack = RPA, - disk_pending_ack = DPA, - qi_pending_ack = QPA }) -> - F = fun (_SeqId, MsgStatus, Acc) -> accumulate_ack(MsgStatus, Acc) end, - {IndexOnDiskSeqIds, MsgIdsByStore, _AllMsgIds} = - rabbit_misc:gb_trees_fold( - F, rabbit_misc:gb_trees_fold( - F, rabbit_misc:gb_trees_fold( - F, accumulate_ack_init(), RPA), DPA), QPA), - State1 = State #vqstate { ram_pending_ack = gb_trees:empty(), - disk_pending_ack = gb_trees:empty(), - qi_pending_ack = gb_trees:empty()}, - {IndexOnDiskSeqIds, MsgIdsByStore, State1}. - -%% MsgIdsByStore is an orddict with two keys: -%% -%% true: holds a list of Persistent Message Ids. -%% false: holds a list of Transient Message Ids. -%% -%% When we call orddict:to_list/1 we get two sets of msg ids, where -%% IsPersistent is either true for persistent messages or false for -%% transient ones. The msg_store_remove/3 function takes this boolean -%% flag to determine from which store the messages should be removed -%% from. -remove_msgs_by_id(MsgIdsByStore, MSCState) -> - [ok = msg_store_remove(MSCState, IsPersistent, MsgIds) - || {IsPersistent, MsgIds} <- orddict:to_list(MsgIdsByStore)]. - -remove_transient_msgs_by_id(MsgIdsByStore, MSCState) -> - case orddict:find(false, MsgIdsByStore) of - error -> ok; - {ok, MsgIds} -> ok = msg_store_remove(MSCState, false, MsgIds) - end. - -accumulate_ack_init() -> {[], orddict:new(), []}. - -accumulate_ack(#msg_status { seq_id = SeqId, - msg_id = MsgId, - is_persistent = IsPersistent, - msg_in_store = MsgInStore, - index_on_disk = IndexOnDisk }, - {IndexOnDiskSeqIdsAcc, MsgIdsByStore, AllMsgIds}) -> - {cons_if(IndexOnDisk, SeqId, IndexOnDiskSeqIdsAcc), - case MsgInStore of - true -> rabbit_misc:orddict_cons(IsPersistent, MsgId, MsgIdsByStore); - false -> MsgIdsByStore - end, - [MsgId | AllMsgIds]}. +timeout(State) -> + rabbit_variable_queue:timeout(State). -%%---------------------------------------------------------------------------- -%% Internal plumbing for confirms (aka publisher acks) -%%---------------------------------------------------------------------------- +handle_pre_hibernate(State) -> + rabbit_variable_queue:handle_pre_hibernate(State). -record_confirms(MsgIdSet, State = #vqstate { msgs_on_disk = MOD, - msg_indices_on_disk = MIOD, - unconfirmed = UC, - confirmed = C }) -> - State #vqstate { - msgs_on_disk = rabbit_misc:gb_sets_difference(MOD, MsgIdSet), - msg_indices_on_disk = rabbit_misc:gb_sets_difference(MIOD, MsgIdSet), - unconfirmed = rabbit_misc:gb_sets_difference(UC, MsgIdSet), - confirmed = gb_sets:union(C, MsgIdSet) }. - -msgs_written_to_disk(Callback, MsgIdSet, ignored) -> - Callback(?MODULE, - fun (?MODULE, State) -> record_confirms(MsgIdSet, State) end); -msgs_written_to_disk(Callback, MsgIdSet, written) -> - Callback(?MODULE, - fun (?MODULE, State = #vqstate { msgs_on_disk = MOD, - msg_indices_on_disk = MIOD, - unconfirmed = UC }) -> - Confirmed = gb_sets:intersection(UC, MsgIdSet), - record_confirms(gb_sets:intersection(MsgIdSet, MIOD), - State #vqstate { - msgs_on_disk = - gb_sets:union(MOD, Confirmed) }) - end). - -msg_indices_written_to_disk(Callback, MsgIdSet) -> - Callback(?MODULE, - fun (?MODULE, State = #vqstate { msgs_on_disk = MOD, - msg_indices_on_disk = MIOD, - unconfirmed = UC }) -> - Confirmed = gb_sets:intersection(UC, MsgIdSet), - record_confirms(gb_sets:intersection(MsgIdSet, MOD), - State #vqstate { - msg_indices_on_disk = - gb_sets:union(MIOD, Confirmed) }) - end). - -msgs_and_indices_written_to_disk(Callback, MsgIdSet) -> - Callback(?MODULE, - fun (?MODULE, State) -> record_confirms(MsgIdSet, State) end). +resume(State) -> rabbit_variable_queue:resume(State). -%%---------------------------------------------------------------------------- -%% Internal plumbing for requeue -%%---------------------------------------------------------------------------- +msg_rates(State) -> + rabbit_variable_queue:msg_rates(State). -publish_alpha(#msg_status { msg = undefined } = MsgStatus, State) -> - {Msg, State1} = read_msg(MsgStatus, State), - MsgStatus1 = MsgStatus#msg_status { msg = Msg }, - {MsgStatus1, stats({1, -1}, {MsgStatus, MsgStatus1}, State1)}; -publish_alpha(MsgStatus, State) -> - {MsgStatus, stats({1, -1}, {MsgStatus, MsgStatus}, State)}. - -publish_beta(MsgStatus, State) -> - {MsgStatus1, State1} = maybe_write_to_disk(true, false, MsgStatus, State), - MsgStatus2 = m(trim_msg_status(MsgStatus1)), - {MsgStatus2, stats({1, -1}, {MsgStatus, MsgStatus2}, State1)}. - -%% Rebuild queue, inserting sequence ids to maintain ordering -queue_merge(SeqIds, Q, MsgIds, Limit, PubFun, State) -> - queue_merge(SeqIds, Q, ?QUEUE:new(), MsgIds, - Limit, PubFun, State). - -queue_merge([SeqId | Rest] = SeqIds, Q, Front, MsgIds, - Limit, PubFun, State) - when Limit == undefined orelse SeqId < Limit -> - case ?QUEUE:out(Q) of - {{value, #msg_status { seq_id = SeqIdQ } = MsgStatus}, Q1} - when SeqIdQ < SeqId -> - %% enqueue from the remaining queue - queue_merge(SeqIds, Q1, ?QUEUE:in(MsgStatus, Front), MsgIds, - Limit, PubFun, State); - {_, _Q1} -> - %% enqueue from the remaining list of sequence ids - {MsgStatus, State1} = msg_from_pending_ack(SeqId, State), - {#msg_status { msg_id = MsgId } = MsgStatus1, State2} = - PubFun(MsgStatus, State1), - queue_merge(Rest, Q, ?QUEUE:in(MsgStatus1, Front), [MsgId | MsgIds], - Limit, PubFun, State2) - end; -queue_merge(SeqIds, Q, Front, MsgIds, - _Limit, _PubFun, State) -> - {SeqIds, ?QUEUE:join(Front, Q), MsgIds, State}. - -delta_merge([], Delta, MsgIds, State) -> - {Delta, MsgIds, State}; -delta_merge(SeqIds, Delta, MsgIds, State) -> - lists:foldl(fun (SeqId, {Delta0, MsgIds0, State0}) -> - {#msg_status { msg_id = MsgId } = MsgStatus, State1} = - msg_from_pending_ack(SeqId, State0), - {_MsgStatus, State2} = - maybe_write_to_disk(true, true, MsgStatus, State1), - {expand_delta(SeqId, Delta0), [MsgId | MsgIds0], - stats({1, -1}, {MsgStatus, none}, State2)} - end, {Delta, MsgIds, State}, SeqIds). - -%% Mostly opposite of record_pending_ack/2 -msg_from_pending_ack(SeqId, State) -> - {#msg_status { msg_props = MsgProps } = MsgStatus, State1} = - remove_pending_ack(false, SeqId, State), - {MsgStatus #msg_status { - msg_props = MsgProps #message_properties { needs_confirming = false } }, - State1}. - -beta_limit(Q) -> - case ?QUEUE:peek(Q) of - {value, #msg_status { seq_id = SeqId }} -> SeqId; - empty -> undefined - end. - -delta_limit(?BLANK_DELTA_PATTERN(_X)) -> undefined; -delta_limit(#delta { start_seq_id = StartSeqId }) -> StartSeqId. +info(Info, State) -> + rabbit_variable_queue:info(Info, State). -%%---------------------------------------------------------------------------- -%% Iterator -%%---------------------------------------------------------------------------- +invoke(Module, Fun, State) -> rabbit_variable_queue:invoke(Module, Fun, State). -ram_ack_iterator(State) -> - {ack, gb_trees:iterator(State#vqstate.ram_pending_ack)}. - -disk_ack_iterator(State) -> - {ack, gb_trees:iterator(State#vqstate.disk_pending_ack)}. - -qi_ack_iterator(State) -> - {ack, gb_trees:iterator(State#vqstate.qi_pending_ack)}. - -msg_iterator(State) -> istate(start, State). - -istate(start, State) -> {q4, State#vqstate.q4, State}; -istate(q4, State) -> {q3, State#vqstate.q3, State}; -istate(q3, State) -> {delta, State#vqstate.delta, State}; -istate(delta, State) -> {q2, State#vqstate.q2, State}; -istate(q2, State) -> {q1, State#vqstate.q1, State}; -istate(q1, _State) -> done. - -next({ack, It}, IndexState) -> - case gb_trees:next(It) of - none -> {empty, IndexState}; - {_SeqId, MsgStatus, It1} -> Next = {ack, It1}, - {value, MsgStatus, true, Next, IndexState} - end; -next(done, IndexState) -> {empty, IndexState}; -next({delta, #delta{start_seq_id = SeqId, - end_seq_id = SeqId}, State}, IndexState) -> - next(istate(delta, State), IndexState); -next({delta, #delta{start_seq_id = SeqId, - end_seq_id = SeqIdEnd} = Delta, State}, IndexState) -> - SeqIdB = rabbit_queue_index:next_segment_boundary(SeqId), - SeqId1 = lists:min([SeqIdB, SeqIdEnd]), - {List, IndexState1} = rabbit_queue_index:read(SeqId, SeqId1, IndexState), - next({delta, Delta#delta{start_seq_id = SeqId1}, List, State}, IndexState1); -next({delta, Delta, [], State}, IndexState) -> - next({delta, Delta, State}, IndexState); -next({delta, Delta, [{_, SeqId, _, _, _} = M | Rest], State}, IndexState) -> - case is_msg_in_pending_acks(SeqId, State) of - false -> Next = {delta, Delta, Rest, State}, - {value, beta_msg_status(M), false, Next, IndexState}; - true -> next({delta, Delta, Rest, State}, IndexState) - end; -next({Key, Q, State}, IndexState) -> - case ?QUEUE:out(Q) of - {empty, _Q} -> next(istate(Key, State), IndexState); - {{value, MsgStatus}, QN} -> Next = {Key, QN, State}, - {value, MsgStatus, false, Next, IndexState} - end. - -inext(It, {Its, IndexState}) -> - case next(It, IndexState) of - {empty, IndexState1} -> - {Its, IndexState1}; - {value, MsgStatus1, Unacked, It1, IndexState1} -> - {[{MsgStatus1, Unacked, It1} | Its], IndexState1} - end. - -ifold(_Fun, Acc, [], State) -> - {Acc, State}; -ifold(Fun, Acc, Its, State) -> - [{MsgStatus, Unacked, It} | Rest] = - lists:sort(fun ({#msg_status{seq_id = SeqId1}, _, _}, - {#msg_status{seq_id = SeqId2}, _, _}) -> - SeqId1 =< SeqId2 - end, Its), - {Msg, State1} = read_msg(MsgStatus, State), - case Fun(Msg, MsgStatus#msg_status.msg_props, Unacked, Acc) of - {stop, Acc1} -> - {Acc1, State}; - {cont, Acc1} -> - {Its1, IndexState1} = inext(It, {Rest, State1#vqstate.index_state}), - ifold(Fun, Acc1, Its1, State1#vqstate{index_state = IndexState1}) - end. +is_duplicate(Msg, State) -> rabbit_variable_queue:is_duplicate(Msg, State). -%%---------------------------------------------------------------------------- -%% Phase changes -%%---------------------------------------------------------------------------- +set_queue_mode(Mode, State) -> + rabbit_variable_queue:set_queue_mode(Mode, State). -reduce_memory_use(State = #vqstate { target_ram_count = infinity }) -> - State; -reduce_memory_use(State = #vqstate { - mode = default, - ram_pending_ack = RPA, - ram_msg_count = RamMsgCount, - target_ram_count = TargetRamCount, - io_batch_size = IoBatchSize, - rates = #rates { in = AvgIngress, - out = AvgEgress, - ack_in = AvgAckIngress, - ack_out = AvgAckEgress } }) -> - - State1 = #vqstate { q2 = Q2, q3 = Q3 } = - case chunk_size(RamMsgCount + gb_trees:size(RPA), TargetRamCount) of - 0 -> State; - %% Reduce memory of pending acks and alphas. The order is - %% determined based on which is growing faster. Whichever - %% comes second may very well get a quota of 0 if the - %% first manages to push out the max number of messages. - S1 -> Funs = case ((AvgAckIngress - AvgAckEgress) > - (AvgIngress - AvgEgress)) of - true -> [fun limit_ram_acks/2, - fun push_alphas_to_betas/2]; - false -> [fun push_alphas_to_betas/2, - fun limit_ram_acks/2] - end, - {_, State2} = lists:foldl(fun (ReduceFun, {QuotaN, StateN}) -> - ReduceFun(QuotaN, StateN) - end, {S1, State}, Funs), - State2 - end, - - State3 = - case chunk_size(?QUEUE:len(Q2) + ?QUEUE:len(Q3), - permitted_beta_count(State1)) of - S2 when S2 >= IoBatchSize -> - %% There is an implicit, but subtle, upper bound here. We - %% may shuffle a lot of messages from Q2/3 into delta, but - %% the number of these that require any disk operation, - %% namely index writing, i.e. messages that are genuine - %% betas and not gammas, is bounded by the credit_flow - %% limiting of the alpha->beta conversion above. - push_betas_to_deltas(S2, State1); - _ -> - State1 - end, - %% See rabbitmq-server-290 for the reasons behind this GC call. - garbage_collect(), - State3; -%% When using lazy queues, there are no alphas, so we don't need to -%% call push_alphas_to_betas/2. -reduce_memory_use(State = #vqstate { - mode = lazy, - ram_pending_ack = RPA, - ram_msg_count = RamMsgCount, - target_ram_count = TargetRamCount }) -> - State1 = #vqstate { q3 = Q3 } = - case chunk_size(RamMsgCount + gb_trees:size(RPA), TargetRamCount) of - 0 -> State; - S1 -> {_, State2} = limit_ram_acks(S1, State), - State2 - end, - - State3 = - case chunk_size(?QUEUE:len(Q3), - permitted_beta_count(State1)) of - 0 -> - State1; - S2 -> - push_betas_to_deltas(S2, State1) - end, - garbage_collect(), - State3. - -limit_ram_acks(0, State) -> - {0, ui(State)}; -limit_ram_acks(Quota, State = #vqstate { ram_pending_ack = RPA, - disk_pending_ack = DPA }) -> - case gb_trees:is_empty(RPA) of - true -> - {Quota, ui(State)}; - false -> - {SeqId, MsgStatus, RPA1} = gb_trees:take_largest(RPA), - {MsgStatus1, State1} = - maybe_prepare_write_to_disk(true, false, MsgStatus, State), - MsgStatus2 = m(trim_msg_status(MsgStatus1)), - DPA1 = gb_trees:insert(SeqId, MsgStatus2, DPA), - limit_ram_acks(Quota - 1, - stats({0, 0}, {MsgStatus, MsgStatus2}, - State1 #vqstate { ram_pending_ack = RPA1, - disk_pending_ack = DPA1 })) - end. - -permitted_beta_count(#vqstate { len = 0 }) -> - infinity; -permitted_beta_count(#vqstate { mode = lazy, - target_ram_count = TargetRamCount}) -> - TargetRamCount; -permitted_beta_count(#vqstate { target_ram_count = 0, q3 = Q3 }) -> - lists:min([?QUEUE:len(Q3), rabbit_queue_index:next_segment_boundary(0)]); -permitted_beta_count(#vqstate { q1 = Q1, - q4 = Q4, - target_ram_count = TargetRamCount, - len = Len }) -> - BetaDelta = Len - ?QUEUE:len(Q1) - ?QUEUE:len(Q4), - lists:max([rabbit_queue_index:next_segment_boundary(0), - BetaDelta - ((BetaDelta * BetaDelta) div - (BetaDelta + TargetRamCount))]). - -chunk_size(Current, Permitted) - when Permitted =:= infinity orelse Permitted >= Current -> - 0; -chunk_size(Current, Permitted) -> - Current - Permitted. - -fetch_from_q3(State = #vqstate { mode = default, - q1 = Q1, - q2 = Q2, - delta = #delta { count = DeltaCount }, - q3 = Q3, - q4 = Q4 }) -> - case ?QUEUE:out(Q3) of - {empty, _Q3} -> - {empty, State}; - {{value, MsgStatus}, Q3a} -> - State1 = State #vqstate { q3 = Q3a }, - State2 = case {?QUEUE:is_empty(Q3a), 0 == DeltaCount} of - {true, true} -> - %% q3 is now empty, it wasn't before; - %% delta is still empty. So q2 must be - %% empty, and we know q4 is empty - %% otherwise we wouldn't be loading from - %% q3. As such, we can just set q4 to Q1. - true = ?QUEUE:is_empty(Q2), %% ASSERTION - true = ?QUEUE:is_empty(Q4), %% ASSERTION - State1 #vqstate { q1 = ?QUEUE:new(), q4 = Q1 }; - {true, false} -> - maybe_deltas_to_betas(State1); - {false, _} -> - %% q3 still isn't empty, we've not - %% touched delta, so the invariants - %% between q1, q2, delta and q3 are - %% maintained - State1 - end, - {loaded, {MsgStatus, State2}} - end; -%% lazy queues -fetch_from_q3(State = #vqstate { mode = lazy, - delta = #delta { count = DeltaCount }, - q3 = Q3 }) -> - case ?QUEUE:out(Q3) of - {empty, _Q3} when DeltaCount =:= 0 -> - {empty, State}; - {empty, _Q3} -> - fetch_from_q3(maybe_deltas_to_betas(State)); - {{value, MsgStatus}, Q3a} -> - State1 = State #vqstate { q3 = Q3a }, - {loaded, {MsgStatus, State1}} - end. - -maybe_deltas_to_betas(State) -> - AfterFun = process_delivers_and_acks_fun(deliver_and_ack), - maybe_deltas_to_betas(AfterFun, State). - -maybe_deltas_to_betas(_DelsAndAcksFun, - State = #vqstate {delta = ?BLANK_DELTA_PATTERN(X) }) -> - State; -maybe_deltas_to_betas(DelsAndAcksFun, - State = #vqstate { - q2 = Q2, - delta = Delta, - q3 = Q3, - index_state = IndexState, - ram_msg_count = RamMsgCount, - ram_bytes = RamBytes, - disk_read_count = DiskReadCount, - transient_threshold = TransientThreshold }) -> - #delta { start_seq_id = DeltaSeqId, - count = DeltaCount, - end_seq_id = DeltaSeqIdEnd } = Delta, - DeltaSeqId1 = - lists:min([rabbit_queue_index:next_segment_boundary(DeltaSeqId), - DeltaSeqIdEnd]), - {List, IndexState1} = rabbit_queue_index:read(DeltaSeqId, DeltaSeqId1, - IndexState), - {Q3a, RamCountsInc, RamBytesInc, State1} = - betas_from_index_entries(List, TransientThreshold, - DelsAndAcksFun, - State #vqstate { index_state = IndexState1 }), - State2 = State1 #vqstate { ram_msg_count = RamMsgCount + RamCountsInc, - ram_bytes = RamBytes + RamBytesInc, - disk_read_count = DiskReadCount + RamCountsInc }, - case ?QUEUE:len(Q3a) of - 0 -> - %% we ignored every message in the segment due to it being - %% transient and below the threshold - maybe_deltas_to_betas( - DelsAndAcksFun, - State2 #vqstate { - delta = d(Delta #delta { start_seq_id = DeltaSeqId1 })}); - Q3aLen -> - Q3b = ?QUEUE:join(Q3, Q3a), - case DeltaCount - Q3aLen of - 0 -> - %% delta is now empty, but it wasn't before, so - %% can now join q2 onto q3 - State2 #vqstate { q2 = ?QUEUE:new(), - delta = ?BLANK_DELTA, - q3 = ?QUEUE:join(Q3b, Q2) }; - N when N > 0 -> - Delta1 = d(#delta { start_seq_id = DeltaSeqId1, - count = N, - end_seq_id = DeltaSeqIdEnd }), - State2 #vqstate { delta = Delta1, - q3 = Q3b } - end - end. - -push_alphas_to_betas(Quota, State) -> - {Quota1, State1} = - push_alphas_to_betas( - fun ?QUEUE:out/1, - fun (MsgStatus, Q1a, - State0 = #vqstate { q3 = Q3, delta = #delta { count = 0 } }) -> - State0 #vqstate { q1 = Q1a, q3 = ?QUEUE:in(MsgStatus, Q3) }; - (MsgStatus, Q1a, State0 = #vqstate { q2 = Q2 }) -> - State0 #vqstate { q1 = Q1a, q2 = ?QUEUE:in(MsgStatus, Q2) } - end, Quota, State #vqstate.q1, State), - {Quota2, State2} = - push_alphas_to_betas( - fun ?QUEUE:out_r/1, - fun (MsgStatus, Q4a, State0 = #vqstate { q3 = Q3 }) -> - State0 #vqstate { q3 = ?QUEUE:in_r(MsgStatus, Q3), q4 = Q4a } - end, Quota1, State1 #vqstate.q4, State1), - {Quota2, State2}. - -push_alphas_to_betas(_Generator, _Consumer, Quota, _Q, - State = #vqstate { ram_msg_count = RamMsgCount, - target_ram_count = TargetRamCount }) - when Quota =:= 0 orelse - TargetRamCount =:= infinity orelse - TargetRamCount >= RamMsgCount -> - {Quota, ui(State)}; -push_alphas_to_betas(Generator, Consumer, Quota, Q, State) -> - %% We consume credits from the message_store whenever we need to - %% persist a message to disk. See: - %% rabbit_variable_queue:msg_store_write/4. So perhaps the - %% msg_store is trying to throttle down our queue. - case credit_flow:blocked() of - true -> {Quota, ui(State)}; - false -> case Generator(Q) of - {empty, _Q} -> - {Quota, ui(State)}; - {{value, MsgStatus}, Qa} -> - {MsgStatus1, State1} = - maybe_prepare_write_to_disk(true, false, MsgStatus, - State), - MsgStatus2 = m(trim_msg_status(MsgStatus1)), - State2 = stats( - ready0, {MsgStatus, MsgStatus2}, State1), - State3 = Consumer(MsgStatus2, Qa, State2), - push_alphas_to_betas(Generator, Consumer, Quota - 1, - Qa, State3) - end - end. - -push_betas_to_deltas(Quota, State = #vqstate { mode = default, - q2 = Q2, - delta = Delta, - q3 = Q3}) -> - PushState = {Quota, Delta, State}, - {Q3a, PushState1} = push_betas_to_deltas( - fun ?QUEUE:out_r/1, - fun rabbit_queue_index:next_segment_boundary/1, - Q3, PushState), - {Q2a, PushState2} = push_betas_to_deltas( - fun ?QUEUE:out/1, - fun (Q2MinSeqId) -> Q2MinSeqId end, - Q2, PushState1), - {_, Delta1, State1} = PushState2, - State1 #vqstate { q2 = Q2a, - delta = Delta1, - q3 = Q3a }; -%% In the case of lazy queues we want to page as many messages as -%% possible from q3. -push_betas_to_deltas(Quota, State = #vqstate { mode = lazy, - delta = Delta, - q3 = Q3}) -> - PushState = {Quota, Delta, State}, - {Q3a, PushState1} = push_betas_to_deltas( - fun ?QUEUE:out_r/1, - fun (Q2MinSeqId) -> Q2MinSeqId end, - Q3, PushState), - {_, Delta1, State1} = PushState1, - State1 #vqstate { delta = Delta1, - q3 = Q3a }. - - -push_betas_to_deltas(Generator, LimitFun, Q, PushState) -> - case ?QUEUE:is_empty(Q) of - true -> - {Q, PushState}; - false -> - {value, #msg_status { seq_id = MinSeqId }} = ?QUEUE:peek(Q), - {value, #msg_status { seq_id = MaxSeqId }} = ?QUEUE:peek_r(Q), - Limit = LimitFun(MinSeqId), - case MaxSeqId < Limit of - true -> {Q, PushState}; - false -> push_betas_to_deltas1(Generator, Limit, Q, PushState) - end - end. - -push_betas_to_deltas1(_Generator, _Limit, Q, {0, Delta, State}) -> - {Q, {0, Delta, ui(State)}}; -push_betas_to_deltas1(Generator, Limit, Q, {Quota, Delta, State}) -> - case Generator(Q) of - {empty, _Q} -> - {Q, {Quota, Delta, ui(State)}}; - {{value, #msg_status { seq_id = SeqId }}, _Qa} - when SeqId < Limit -> - {Q, {Quota, Delta, ui(State)}}; - {{value, MsgStatus = #msg_status { seq_id = SeqId }}, Qa} -> - {#msg_status { index_on_disk = true }, State1} = - maybe_batch_write_index_to_disk(true, MsgStatus, State), - State2 = stats(ready0, {MsgStatus, none}, State1), - Delta1 = expand_delta(SeqId, Delta), - push_betas_to_deltas1(Generator, Limit, Qa, - {Quota - 1, Delta1, State2}) - end. - -%% Flushes queue index batch caches and updates queue index state. -ui(#vqstate{index_state = IndexState, - target_ram_count = TargetRamCount} = State) -> - IndexState1 = rabbit_queue_index:flush_pre_publish_cache( - TargetRamCount, IndexState), - State#vqstate{index_state = IndexState1}. +zip_msgs_and_acks(Msgs, AckTags, Accumulator, State) -> + rabbit_variable_queue:zip_msgs_and_acks(Msgs, AckTags, Accumulator, State). %% Delay maybe_delay(QPA) -> @@ -2430,27 +327,3 @@ is_timeout_test([#msg_status{ _ -> is_timeout_test(Rem) end; is_timeout_test([_|Rem]) -> is_timeout_test(Rem). - -%%---------------------------------------------------------------------------- -%% Upgrading -%%---------------------------------------------------------------------------- - -multiple_routing_keys() -> - transform_storage( - fun ({basic_message, ExchangeName, Routing_Key, Content, - MsgId, Persistent}) -> - {ok, {basic_message, ExchangeName, [Routing_Key], Content, - MsgId, Persistent}}; - (_) -> {error, corrupt_message} - end), - ok. - - -%% Assumes message store is not running -transform_storage(TransformFun) -> - transform_store(?PERSISTENT_MSG_STORE, TransformFun), - transform_store(?TRANSIENT_MSG_STORE, TransformFun). - -transform_store(Store, TransformFun) -> - rabbit_msg_store:force_recovery(rabbit_mnesia:dir(), Store), - rabbit_msg_store:transform_dir(rabbit_mnesia:dir(), Store, TransformFun). |
