diff options
| author | Alexandru Scvortov <alexandru@rabbitmq.com> | 2010-12-08 12:31:31 +0000 |
|---|---|---|
| committer | Alexandru Scvortov <alexandru@rabbitmq.com> | 2010-12-08 12:31:31 +0000 |
| commit | d5b92f6705b041a3ba89315ed27997b6ff139130 (patch) | |
| tree | bc2cef0cd4741e52711e61af29b002e045f5810c | |
| parent | f6a62256f284304daec565b933269e83bbd4e4f3 (diff) | |
| parent | 1ffd85e580b5d5e8e7bb6b7123126933aa1cf168 (diff) | |
| download | rabbitmq-server-git-d5b92f6705b041a3ba89315ed27997b6ff139130.tar.gz | |
merge bug23482 into default (Detect already-running broker)
| -rw-r--r-- | Makefile | 3 | ||||
| -rw-r--r-- | ebin/rabbit_app.in | 1 | ||||
| -rw-r--r-- | packaging/macports/Portfile.in | 14 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 5 | ||||
| -rw-r--r-- | src/rabbit_channel.erl | 89 | ||||
| -rw-r--r-- | src/rabbit_invariable_queue.erl | 314 | ||||
| -rw-r--r-- | src/rabbit_msg_store.erl | 4 | ||||
| -rw-r--r-- | src/rabbit_persister.erl | 496 | ||||
| -rw-r--r-- | src/rabbit_tests.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 239 | ||||
| -rw-r--r-- | src/rabbit_writer.erl | 41 |
11 files changed, 193 insertions, 1015 deletions
@@ -178,9 +178,6 @@ start-rabbit-on-node: all stop-rabbit-on-node: all echo "rabbit:stop()." | $(ERL_CALL) -force-snapshot: all - echo "rabbit_persister:force_snapshot()." | $(ERL_CALL) - set-memory-alarm: all echo "alarm_handler:set_alarm({vm_memory_high_watermark, []})." | \ $(ERL_CALL) diff --git a/ebin/rabbit_app.in b/ebin/rabbit_app.in index 17d05a9919..6c33ef8b16 100644 --- a/ebin/rabbit_app.in +++ b/ebin/rabbit_app.in @@ -6,7 +6,6 @@ {registered, [rabbit_amqqueue_sup, rabbit_log, rabbit_node_monitor, - rabbit_persister, rabbit_router, rabbit_sup, rabbit_tcp_client_sup]}, diff --git a/packaging/macports/Portfile.in b/packaging/macports/Portfile.in index ce6b1e34a7..f8417b83b9 100644 --- a/packaging/macports/Portfile.in +++ b/packaging/macports/Portfile.in @@ -7,6 +7,8 @@ version @VERSION@ categories net maintainers paperplanes.de:meyer rabbitmq.com:tonyg openmaintainer platforms darwin +supported_archs noarch + description The RabbitMQ AMQP Server long_description \ RabbitMQ is an implementation of AMQP, the emerging standard for \ @@ -31,17 +33,13 @@ checksums \ depends_lib port:erlang depends_build port:libxslt -platform darwin 7 { - depends_build-append port:py25-simplejson - build.args PYTHON=${prefix}/bin/python2.5 -} platform darwin 8 { - depends_build-append port:py25-simplejson - build.args PYTHON=${prefix}/bin/python2.5 + depends_build-append port:py26-simplejson + build.args PYTHON=${prefix}/bin/python2.6 } platform darwin 9 { - depends_build-append port:py25-simplejson - build.args PYTHON=${prefix}/bin/python2.5 + depends_build-append port:py26-simplejson + build.args PYTHON=${prefix}/bin/python2.6 } # no need for simplejson on Snow Leopard or higher diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 3dbd2b2299..25859c22f9 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -491,9 +491,8 @@ attempt_delivery(#delivery{txn = none, {AckTag, BQS1} = BQ:publish_delivered( AckRequired, Message, - ?BASE_MESSAGE_PROPERTIES - #message_properties{ - needs_confirming = NeedsConfirming}, + (?BASE_MESSAGE_PROPERTIES)#message_properties{ + needs_confirming = NeedsConfirming}, BQS), {{Message, false, AckTag}, true, State1#q{backing_queue_state = BQS1}} diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index b2e6658b29..0c8ad00ae3 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -49,7 +49,7 @@ uncommitted_ack_q, unacked_message_q, username, virtual_host, most_recently_declared_queue, consumer_mapping, blocking, queue_collector_pid, stats_timer, - confirm_enabled, published_count, confirm_multiple, confirm_tref, + confirm_enabled, publish_seqno, confirm_multiple, confirm_tref, held_confirms, unconfirmed, queues_for_msg}). -define(MAX_PERMISSION_CACHE_SIZE, 12). @@ -191,7 +191,7 @@ init([Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid, queue_collector_pid = CollectorPid, stats_timer = StatsTimer, confirm_enabled = false, - published_count = 0, + publish_seqno = 0, confirm_multiple = false, held_confirms = gb_sets:new(), unconfirmed = gb_sets:new(), @@ -460,20 +460,19 @@ send_or_enqueue_ack(undefined, _QPid, State) -> send_or_enqueue_ack(_MsgSeqNo, _QPid, State = #ch{confirm_enabled = false}) -> State; send_or_enqueue_ack(MsgSeqNo, QPid, State = #ch{confirm_multiple = false}) -> - do_if_unconfirmed( - MsgSeqNo, QPid, - fun(MSN, State1 = #ch{writer_pid = WriterPid}) -> - ok = rabbit_writer:send_command( - WriterPid, #'basic.ack'{delivery_tag = MSN}), - State1 - end, State); + do_if_unconfirmed(MsgSeqNo, QPid, + fun(MSN, State1 = #ch{writer_pid = WriterPid}) -> + ok = rabbit_writer:send_command( + WriterPid, #'basic.ack'{ + delivery_tag = MSN}), + State1 + end, State); send_or_enqueue_ack(MsgSeqNo, QPid, State = #ch{confirm_multiple = true}) -> - do_if_unconfirmed( - MsgSeqNo, QPid, - fun(MSN, State1 = #ch{held_confirms = As}) -> - start_confirm_timer( - State1#ch{held_confirms = gb_sets:add(MSN, As)}) - end, State). + do_if_unconfirmed(MsgSeqNo, QPid, + fun(MSN, State1 = #ch{held_confirms = As}) -> + start_confirm_timer( + State1#ch{held_confirms = gb_sets:add(MSN, As)}) + end, State). do_if_unconfirmed(MsgSeqNo, QPid, ConfirmFun, State = #ch{unconfirmed = UC, @@ -484,9 +483,8 @@ do_if_unconfirmed(MsgSeqNo, QPid, ConfirmFun, Unconfirmed1 = gb_sets:delete(MsgSeqNo, UC), case QPid of undefined -> - ConfirmFun(MsgSeqNo, - State#ch{unconfirmed = Unconfirmed1}); - _ -> + ConfirmFun(MsgSeqNo, State#ch{unconfirmed = Unconfirmed1}); + _ -> {ok, Qs} = dict:find(MsgSeqNo, QFM), Qs1 = sets:del_element(QPid, Qs), case sets:size(Qs1) of @@ -499,7 +497,8 @@ do_if_unconfirmed(MsgSeqNo, QPid, ConfirmFun, dict:store(MsgSeqNo, Qs1, QFM)} end end; - false -> State + false -> + State end. handle_method(#'channel.open'{}, _, State = #ch{state = starting}) -> @@ -537,11 +536,11 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, {MsgSeqNo, State1} = case ConfirmEnabled of false -> {undefined, State}; - true -> Count = State#ch.published_count, - {Count, - State#ch{published_count = Count + 1, + true -> SeqNo = State#ch.publish_seqno, + {SeqNo, + State#ch{publish_seqno = SeqNo + 1, unconfirmed = - gb_sets:add(Count, State#ch.unconfirmed)}} + gb_sets:add(SeqNo, State#ch.unconfirmed)}} end, Message = #basic_message{exchange_name = ExchangeName, routing_key = RoutingKey, @@ -1359,35 +1358,27 @@ stop_confirm_timer(State = #ch{confirm_tref = TRef}) -> State#ch{confirm_tref = undefined}. flush_multiple(State = #ch{writer_pid = WriterPid, - held_confirms = Cs, - unconfirmed = UC}) -> + held_confirms = Cs}) -> case gb_sets:is_empty(Cs) of - true -> State; + true -> State#ch{confirm_tref = undefined}; false -> [First | Rest] = gb_sets:to_list(Cs), - [rabbit_writer:send_command(WriterPid, - #'basic.ack'{delivery_tag = T}) || - T <- case Rest of - [] -> [First]; - _ -> flush_multiple( - First, Rest, WriterPid, - case gb_sets:is_empty(UC) of - false -> gb_sets:smallest(UC); - true -> gb_sets:largest(Cs) + 1 - end) - end], + {Mult, Inds} = find_consecutive_sequence(First, Rest), + ok = rabbit_writer:send_command( + WriterPid, + #'basic.ack'{delivery_tag = Mult, multiple = true}), + ok = lists:foldl( + fun(T, ok) -> rabbit_writer:send_command( + WriterPid, + #'basic.ack'{delivery_tag = T}) + end, ok, Inds), State#ch{held_confirms = gb_sets:new(), confirm_tref = undefined} end. -flush_multiple(Prev, [Cur | Rest], WriterPid, SNA) -> - ExpNext = Prev + 1, - case {SNA >= Cur, Cur} of - {true, ExpNext} -> flush_multiple(Cur, Rest, WriterPid, SNA); - _ -> flush_multiple(Prev, [], WriterPid, SNA), - [Cur | Rest] - end; -flush_multiple(Prev, [], WriterPid, _) -> - ok = rabbit_writer:send_command(WriterPid, - #'basic.ack'{delivery_tag = Prev, - multiple = true}), - []. +%% Find longest sequence of consecutive numbers at the beginning. +find_consecutive_sequence(Last, []) -> + {Last, []}; +find_consecutive_sequence(Last, [N | Ns]) when N == (Last + 1) -> + find_consecutive_sequence(N, Ns); +find_consecutive_sequence(Last, Ns) -> + {Last, Ns}. diff --git a/src/rabbit_invariable_queue.erl b/src/rabbit_invariable_queue.erl deleted file mode 100644 index 5a0532eac1..0000000000 --- a/src/rabbit_invariable_queue.erl +++ /dev/null @@ -1,314 +0,0 @@ -%% The contents of this file are subject to the Mozilla Public License -%% Version 1.1 (the "License"); you may not use this file except in -%% compliance with the License. You may obtain a copy of the License at -%% http://www.mozilla.org/MPL/ -%% -%% Software distributed under the License is distributed on an "AS IS" -%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the -%% License for the specific language governing rights and limitations -%% under the License. -%% -%% The Original Code is RabbitMQ. -%% -%% The Initial Developers of the Original Code are LShift Ltd, -%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. -%% -%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, -%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd -%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial -%% Technologies LLC, and Rabbit Technologies Ltd. -%% -%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift -%% Ltd. Portions created by Cohesive Financial Technologies LLC are -%% Copyright (C) 2007-2010 Cohesive Financial Technologies -%% LLC. Portions created by Rabbit Technologies Ltd are Copyright -%% (C) 2007-2010 Rabbit Technologies Ltd. -%% -%% All Rights Reserved. -%% -%% Contributor(s): ______________________________________. -%% - --module(rabbit_invariable_queue). - --export([init/3, terminate/1, delete_and_terminate/1, purge/1, publish/3, - publish_delivered/4, fetch/2, ack/2, tx_publish/4, tx_ack/3, - dropwhile/2, tx_rollback/2, tx_commit/4, requeue/3, len/1, is_empty/1, - set_ram_duration_target/2, ram_duration/1, needs_idle_timeout/1, - idle_timeout/1, handle_pre_hibernate/1, status/1]). - --export([start/1, stop/0]). - --behaviour(rabbit_backing_queue). - --include("rabbit.hrl"). - --record(iv_state, { queue, qname, durable, len, pending_ack }). --record(tx, { pending_messages, pending_acks, is_persistent }). - --ifdef(use_specs). - --type(ack() :: rabbit_guid:guid() | 'blank_ack'). --type(state() :: #iv_state { queue :: queue(), - qname :: rabbit_amqqueue:name(), - len :: non_neg_integer(), - pending_ack :: dict() - }). --include("rabbit_backing_queue_spec.hrl"). - --endif. - -start(DurableQueues) -> - ok = rabbit_sup:start_child(rabbit_persister, [DurableQueues]). - -stop() -> - ok = rabbit_sup:stop_child(rabbit_persister). - -init(QName, IsDurable, Recover) -> - Q = queue:from_list(case IsDurable andalso Recover of - true -> rabbit_persister:queue_content(QName); - false -> [] - end), - #iv_state { queue = Q, - qname = QName, - durable = IsDurable, - len = queue:len(Q), - pending_ack = dict:new() }. - -terminate(State) -> - State #iv_state { queue = queue:new(), len = 0, pending_ack = dict:new() }. - -delete_and_terminate(State = #iv_state { qname = QName, durable = IsDurable, - pending_ack = PA }) -> - ok = persist_acks(QName, IsDurable, none, dict:fetch_keys(PA), PA), - {_PLen, State1} = purge(State), - terminate(State1). - -purge(State = #iv_state { queue = Q, qname = QName, durable = IsDurable, - len = Len }) -> - %% We do not purge messages pending acks. - {AckTags, PA} = - rabbit_misc:queue_fold( - fun ({#basic_message { is_persistent = false }, - _MsgProps, _IsDelivered}, Acc) -> - Acc; - ({Msg = #basic_message { guid = Guid }, MsgProps, IsDelivered}, - {AckTagsN, PAN}) -> - ok = persist_delivery(QName, IsDurable, IsDelivered, Msg), - {[Guid | AckTagsN], store_ack(Msg, MsgProps, PAN)} - end, {[], dict:new()}, Q), - ok = persist_acks(QName, IsDurable, none, AckTags, PA), - {Len, State #iv_state { len = 0, queue = queue:new() }}. - -publish(Msg, MsgProps, State = #iv_state { queue = Q, - qname = QName, - durable = IsDurable, - len = Len }) -> - ok = persist_message(QName, IsDurable, none, Msg, MsgProps), - State #iv_state { queue = enqueue(Msg, MsgProps, false, Q), len = Len + 1 }. - -publish_delivered(false, _Msg, _MsgProps, State) -> - {blank_ack, State}; -publish_delivered(true, Msg = #basic_message { guid = Guid }, - MsgProps, - State = #iv_state { qname = QName, durable = IsDurable, - len = 0, pending_ack = PA }) -> - ok = persist_message(QName, IsDurable, none, Msg, MsgProps), - ok = persist_delivery(QName, IsDurable, false, Msg), - {Guid, State #iv_state { pending_ack = store_ack(Msg, MsgProps, PA) }}. - -dropwhile(_Pred, State = #iv_state { len = 0 }) -> - State; -dropwhile(Pred, State = #iv_state { queue = Q }) -> - {{value, {Msg, MsgProps, IsDelivered}}, Q1} = queue:out(Q), - case Pred(MsgProps) of - true -> {_, State1} = fetch_internal(false, Q1, Msg, MsgProps, - IsDelivered, State), - dropwhile(Pred, State1); - false -> State - end. - -fetch(_AckRequired, State = #iv_state { len = 0 }) -> - {empty, State}; -fetch(AckRequired, State = #iv_state { queue = Q }) -> - {{value, {Msg, MsgProps, IsDelivered}}, Q1} = queue:out(Q), - fetch_internal(AckRequired, Q1, Msg, MsgProps, IsDelivered, State). - -fetch_internal(AckRequired, Q1, - Msg = #basic_message { guid = Guid }, - MsgProps, IsDelivered, - State = #iv_state { len = Len, - qname = QName, - durable = IsDurable, - pending_ack = PA }) -> - Len1 = Len - 1, - ok = persist_delivery(QName, IsDurable, IsDelivered, Msg), - PA1 = store_ack(Msg, MsgProps, PA), - {AckTag, PA2} = case AckRequired of - true -> {Guid, PA1}; - false -> ok = persist_acks(QName, IsDurable, none, - [Guid], PA1), - {blank_ack, PA} - end, - {{Msg, IsDelivered, AckTag, Len1}, - State #iv_state { queue = Q1, len = Len1, pending_ack = PA2 }}. - -ack(AckTags, State = #iv_state { qname = QName, durable = IsDurable, - pending_ack = PA }) -> - ok = persist_acks(QName, IsDurable, none, AckTags, PA), - PA1 = remove_acks(AckTags, PA), - State #iv_state { pending_ack = PA1 }. - -tx_publish(Txn, Msg, MsgProps, State = #iv_state { qname = QName, - durable = IsDurable }) -> - Tx = #tx { pending_messages = Pubs } = lookup_tx(Txn), - store_tx(Txn, Tx #tx { pending_messages = [{Msg, MsgProps} | Pubs] }), - ok = persist_message(QName, IsDurable, Txn, Msg, MsgProps), - State. - -tx_ack(Txn, AckTags, State = #iv_state { qname = QName, durable = IsDurable, - pending_ack = PA }) -> - Tx = #tx { pending_acks = Acks } = lookup_tx(Txn), - store_tx(Txn, Tx #tx { pending_acks = [AckTags | Acks] }), - ok = persist_acks(QName, IsDurable, Txn, AckTags, PA), - State. - -tx_rollback(Txn, State = #iv_state { qname = QName }) -> - #tx { pending_acks = AckTags } = lookup_tx(Txn), - ok = do_if_persistent(fun rabbit_persister:rollback_transaction/1, - Txn, QName), - erase_tx(Txn), - {lists:flatten(AckTags), State}. - -tx_commit(Txn, Fun, MsgPropsFun, State = #iv_state { qname = QName, - pending_ack = PA, - queue = Q, - len = Len }) -> - #tx { pending_acks = AckTags, pending_messages = PubsRev } = lookup_tx(Txn), - ok = do_if_persistent(fun rabbit_persister:commit_transaction/1, - Txn, QName), - erase_tx(Txn), - Fun(), - AckTags1 = lists:flatten(AckTags), - PA1 = remove_acks(AckTags1, PA), - {Q1, Len1} = lists:foldr(fun ({Msg, MsgProps}, {QN, LenN}) -> - {enqueue(Msg, MsgPropsFun(MsgProps), - false, QN), - LenN + 1} - end, {Q, Len}, PubsRev), - {AckTags1, State #iv_state { pending_ack = PA1, queue = Q1, len = Len1 }}. - -requeue(AckTags, MsgPropsFun, State = #iv_state { pending_ack = PA, - queue = Q, - len = Len }) -> - %% We don't need to touch the persister here - the persister will - %% already have these messages published and delivered as - %% necessary. The complication is that the persister's seq_id will - %% now be wrong, given the position of these messages in our queue - %% here. However, the persister's seq_id is only used for sorting - %% on startup, and requeue is silent as to where the requeued - %% messages should appear, thus the persister is permitted to sort - %% based on seq_id, even though it'll likely give a different - %% order to the last known state of our queue, prior to shutdown. - {Q1, Len1} = lists:foldl( - fun (Guid, {QN, LenN}) -> - {Msg = #basic_message {}, MsgProps} - = dict:fetch(Guid, PA), - {enqueue(Msg, MsgPropsFun(MsgProps), true, QN), - LenN + 1} - end, {Q, Len}, AckTags), - PA1 = remove_acks(AckTags, PA), - State #iv_state { pending_ack = PA1, queue = Q1, len = Len1 }. - -enqueue(Msg, MsgProps, IsDelivered, Q) -> - queue:in({Msg, MsgProps, IsDelivered}, Q). - -len(#iv_state { len = Len }) -> Len. - -is_empty(State) -> 0 == len(State). - -set_ram_duration_target(_DurationTarget, State) -> State. - -ram_duration(State) -> {0, State}. - -needs_idle_timeout(_State) -> false. - -idle_timeout(State) -> State. - -handle_pre_hibernate(State) -> State. - -status(_State) -> []. - -%%---------------------------------------------------------------------------- - -remove_acks(AckTags, PA) -> lists:foldl(fun dict:erase/2, PA, AckTags). - -store_ack(Msg = #basic_message { guid = Guid }, MsgProps, PA) -> - dict:store(Guid, {Msg, MsgProps}, PA). - -%%---------------------------------------------------------------------------- - -lookup_tx(Txn) -> - case get({txn, Txn}) of - undefined -> #tx { pending_messages = [], - pending_acks = [], - is_persistent = false }; - V -> V - end. - -store_tx(Txn, Tx) -> - put({txn, Txn}, Tx). - -erase_tx(Txn) -> - erase({txn, Txn}). - -mark_tx_persistent(Txn) -> - store_tx(Txn, (lookup_tx(Txn)) #tx { is_persistent = true }). - -is_tx_persistent(Txn) -> - (lookup_tx(Txn)) #tx.is_persistent. - -do_if_persistent(F, Txn, QName) -> - ok = case is_tx_persistent(Txn) of - false -> ok; - true -> F({Txn, QName}) - end. - -%%---------------------------------------------------------------------------- - -persist_message(QName, true, Txn, Msg = #basic_message { - is_persistent = true }, MsgProps) -> - Msg1 = Msg #basic_message { - %% don't persist any recoverable decoded properties - content = rabbit_binary_parser:clear_decoded_content( - Msg #basic_message.content)}, - persist_work(Txn, QName, - [{publish, Msg1, MsgProps, - {QName, Msg1 #basic_message.guid}}]); -persist_message(_QName, _IsDurable, _Txn, _Msg, _MsgProps) -> - ok. - -persist_delivery(QName, true, false, #basic_message { is_persistent = true, - guid = Guid }) -> - persist_work(none, QName, [{deliver, {QName, Guid}}]); -persist_delivery(_QName, _IsDurable, _IsDelivered, _Msg) -> - ok. - -persist_acks(QName, true, Txn, AckTags, PA) -> - persist_work(Txn, QName, - [{ack, {QName, Guid}} || Guid <- AckTags, - begin - {Msg, _MsgProps} - = dict:fetch(Guid, PA), - Msg #basic_message.is_persistent - end]); -persist_acks(_QName, _IsDurable, _Txn, _AckTags, _PA) -> - ok. - -persist_work(_Txn,_QName, []) -> - ok; -persist_work(none, _QName, WorkList) -> - rabbit_persister:dirty_work(WorkList); -persist_work(Txn, QName, WorkList) -> - mark_tx_persistent(Txn), - rabbit_persister:extend_transaction({Txn, QName}, WorkList). diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index fea7d4a86b..e8b4e8e270 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -144,13 +144,13 @@ -type(startup_fun_state() :: {(fun ((A) -> 'finished' | {rabbit_guid:guid(), non_neg_integer(), A})), A}). --type(guid_fun() :: fun ((gb_set()) -> any())). +-type(maybe_guid_fun() :: 'undefined' | fun ((gb_set()) -> any())). -spec(start_link/4 :: (atom(), file:filename(), [binary()] | 'undefined', startup_fun_state()) -> rabbit_types:ok_pid_or_error()). -spec(successfully_recovered_state/1 :: (server()) -> boolean()). --spec(client_init/3 :: (server(), client_ref(), guid_fun()) -> +-spec(client_init/3 :: (server(), client_ref(), maybe_guid_fun()) -> client_msstate()). -spec(client_terminate/1 :: (client_msstate()) -> 'ok'). -spec(client_delete_and_terminate/1 :: (client_msstate()) -> 'ok'). diff --git a/src/rabbit_persister.erl b/src/rabbit_persister.erl deleted file mode 100644 index 11056c8e12..0000000000 --- a/src/rabbit_persister.erl +++ /dev/null @@ -1,496 +0,0 @@ -%% The contents of this file are subject to the Mozilla Public License -%% Version 1.1 (the "License"); you may not use this file except in -%% compliance with the License. You may obtain a copy of the License at -%% http://www.mozilla.org/MPL/ -%% -%% Software distributed under the License is distributed on an "AS IS" -%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the -%% License for the specific language governing rights and limitations -%% under the License. -%% -%% The Original Code is RabbitMQ. -%% -%% The Initial Developers of the Original Code are LShift Ltd, -%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. -%% -%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, -%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd -%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial -%% Technologies LLC, and Rabbit Technologies Ltd. -%% -%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift -%% Ltd. Portions created by Cohesive Financial Technologies LLC are -%% Copyright (C) 2007-2010 Cohesive Financial Technologies -%% LLC. Portions created by Rabbit Technologies Ltd are Copyright -%% (C) 2007-2010 Rabbit Technologies Ltd. -%% -%% All Rights Reserved. -%% -%% Contributor(s): ______________________________________. -%% - --module(rabbit_persister). - --behaviour(gen_server). - --export([start_link/1]). - --export([init/1, handle_call/3, handle_cast/2, handle_info/2, - terminate/2, code_change/3]). - --export([transaction/1, extend_transaction/2, dirty_work/1, - commit_transaction/1, rollback_transaction/1, - force_snapshot/0, queue_content/1]). - --include("rabbit.hrl"). - --define(SERVER, ?MODULE). - --define(LOG_BUNDLE_DELAY, 5). --define(COMPLETE_BUNDLE_DELAY, 2). - --define(PERSISTER_LOG_FORMAT_VERSION, {2, 6}). - --record(pstate, {log_handle, entry_count, deadline, - pending_logs, pending_replies, snapshot}). - -%% two tables for efficient persistency -%% one maps a key to a message -%% the other maps a key to one or more queues. -%% The aim is to reduce the overload of storing a message multiple times -%% when it appears in several queues. --record(psnapshot, {transactions, messages, queues, next_seq_id}). - -%%---------------------------------------------------------------------------- - --ifdef(use_specs). - --type(pkey() :: rabbit_guid:guid()). --type(pmsg() :: {rabbit_amqqueue:name(), pkey()}). - --type(work_item() :: - {publish, - rabbit_types:message(), rabbit_types:message_properties(), pmsg()} | - {deliver, pmsg()} | - {ack, pmsg()}). - --spec(start_link/1 :: ([rabbit_amqqueue:name()]) -> - rabbit_types:ok_pid_or_error()). --spec(transaction/1 :: ([work_item()]) -> 'ok'). --spec(extend_transaction/2 :: - ({rabbit_types:txn(), rabbit_amqqueue:name()}, [work_item()]) - -> 'ok'). --spec(dirty_work/1 :: ([work_item()]) -> 'ok'). --spec(commit_transaction/1 :: - ({rabbit_types:txn(), rabbit_amqqueue:name()}) -> 'ok'). --spec(rollback_transaction/1 :: - ({rabbit_types:txn(), rabbit_amqqueue:name()}) -> 'ok'). --spec(force_snapshot/0 :: () -> 'ok'). --spec(queue_content/1 :: - (rabbit_amqqueue:name()) -> [{rabbit_types:message(), boolean()}]). - --endif. - -%%---------------------------------------------------------------------------- - -start_link(DurableQueues) -> - gen_server:start_link({local, ?SERVER}, ?MODULE, [DurableQueues], []). - -transaction(MessageList) -> - ?LOGDEBUG("transaction ~p~n", [MessageList]), - TxnKey = rabbit_guid:guid(), - gen_server:call(?SERVER, {transaction, TxnKey, MessageList}, infinity). - -extend_transaction(TxnKey, MessageList) -> - ?LOGDEBUG("extend_transaction ~p ~p~n", [TxnKey, MessageList]), - gen_server:cast(?SERVER, {extend_transaction, TxnKey, MessageList}). - -dirty_work(MessageList) -> - ?LOGDEBUG("dirty_work ~p~n", [MessageList]), - gen_server:cast(?SERVER, {dirty_work, MessageList}). - -commit_transaction(TxnKey) -> - ?LOGDEBUG("commit_transaction ~p~n", [TxnKey]), - gen_server:call(?SERVER, {commit_transaction, TxnKey}, infinity). - -rollback_transaction(TxnKey) -> - ?LOGDEBUG("rollback_transaction ~p~n", [TxnKey]), - gen_server:cast(?SERVER, {rollback_transaction, TxnKey}). - -force_snapshot() -> - gen_server:call(?SERVER, force_snapshot, infinity). - -queue_content(QName) -> - gen_server:call(?SERVER, {queue_content, QName}, infinity). - -%%-------------------------------------------------------------------- - -init([DurableQueues]) -> - process_flag(trap_exit, true), - FileName = base_filename(), - ok = filelib:ensure_dir(FileName), - Snapshot = #psnapshot{transactions = dict:new(), - messages = ets:new(messages, []), - queues = ets:new(queues, [ordered_set]), - next_seq_id = 0}, - LogHandle = - case disk_log:open([{name, rabbit_persister}, - {head, current_snapshot(Snapshot)}, - {file, FileName}]) of - {ok, LH} -> LH; - {repaired, LH, {recovered, Recovered}, {badbytes, Bad}} -> - WarningFun = if - Bad > 0 -> fun rabbit_log:warning/2; - true -> fun rabbit_log:info/2 - end, - WarningFun("Repaired persister log - ~p recovered, ~p bad~n", - [Recovered, Bad]), - LH - end, - {Res, NewSnapshot} = - internal_load_snapshot(LogHandle, DurableQueues, Snapshot), - case Res of - ok -> - ok = take_snapshot(LogHandle, NewSnapshot); - {error, Reason} -> - rabbit_log:error("Failed to load persister log: ~p~n", [Reason]), - ok = take_snapshot_and_save_old(LogHandle, NewSnapshot) - end, - State = #pstate{log_handle = LogHandle, - entry_count = 0, - deadline = infinity, - pending_logs = [], - pending_replies = [], - snapshot = NewSnapshot}, - {ok, State}. - -handle_call({transaction, Key, MessageList}, From, State) -> - NewState = internal_extend(Key, MessageList, State), - do_noreply(internal_commit(From, Key, NewState)); -handle_call({commit_transaction, TxnKey}, From, State) -> - do_noreply(internal_commit(From, TxnKey, State)); -handle_call(force_snapshot, _From, State) -> - do_reply(ok, flush(true, State)); -handle_call({queue_content, QName}, _From, - State = #pstate{snapshot = #psnapshot{messages = Messages, - queues = Queues}}) -> - MatchSpec= [{{{QName,'$1'}, '$2', '$3', '$4'}, [], - [{{'$4', '$1', '$2', '$3'}}]}], - do_reply([{ets:lookup_element(Messages, K, 2), MP, D} || - {_, K, D, MP} <- lists:sort(ets:select(Queues, MatchSpec))], - State); -handle_call(_Request, _From, State) -> - {noreply, State}. - -handle_cast({rollback_transaction, TxnKey}, State) -> - do_noreply(internal_rollback(TxnKey, State)); -handle_cast({dirty_work, MessageList}, State) -> - do_noreply(internal_dirty_work(MessageList, State)); -handle_cast({extend_transaction, TxnKey, MessageList}, State) -> - do_noreply(internal_extend(TxnKey, MessageList, State)); -handle_cast(_Msg, State) -> - {noreply, State}. - -handle_info(timeout, State = #pstate{deadline = infinity}) -> - State1 = flush(true, State), - {noreply, State1, hibernate}; -handle_info(timeout, State) -> - do_noreply(flush(State)); -handle_info(_Info, State) -> - {noreply, State}. - -terminate(_Reason, State = #pstate{log_handle = LogHandle}) -> - flush(State), - disk_log:close(LogHandle), - ok. - -code_change(_OldVsn, State, _Extra) -> - {ok, flush(State)}. - -%%-------------------------------------------------------------------- - -internal_extend(Key, MessageList, State) -> - log_work(fun (ML) -> {extend_transaction, Key, ML} end, - MessageList, State). - -internal_dirty_work(MessageList, State) -> - log_work(fun (ML) -> {dirty_work, ML} end, - MessageList, State). - -internal_commit(From, Key, State = #pstate{snapshot = Snapshot}) -> - Unit = {commit_transaction, Key}, - NewSnapshot = internal_integrate1(Unit, Snapshot), - complete(From, Unit, State#pstate{snapshot = NewSnapshot}). - -internal_rollback(Key, State = #pstate{snapshot = Snapshot}) -> - Unit = {rollback_transaction, Key}, - NewSnapshot = internal_integrate1(Unit, Snapshot), - log(State#pstate{snapshot = NewSnapshot}, Unit). - -complete(From, Item, State = #pstate{deadline = ExistingDeadline, - pending_logs = Logs, - pending_replies = Waiting}) -> - State#pstate{deadline = compute_deadline( - ?COMPLETE_BUNDLE_DELAY, ExistingDeadline), - pending_logs = [Item | Logs], - pending_replies = [From | Waiting]}. - -%% This is made to limit disk usage by writing messages only once onto -%% disk. We keep a table associating pkeys to messages, and provided -%% the list of messages to output is left to right, we can guarantee -%% that pkeys will be a backreference to a message in memory when a -%% "tied" is met. -log_work(CreateWorkUnit, MessageList, - State = #pstate{ - snapshot = Snapshot = #psnapshot{messages = Messages}}) -> - Unit = CreateWorkUnit( - rabbit_misc:map_in_order( - fun (M = {publish, Message, MsgProps, QK = {_QName, PKey}}) -> - case ets:lookup(Messages, PKey) of - [_] -> {tied, MsgProps, QK}; - [] -> ets:insert(Messages, {PKey, Message}), - M - end; - (M) -> M - end, - MessageList)), - NewSnapshot = internal_integrate1(Unit, Snapshot), - log(State#pstate{snapshot = NewSnapshot}, Unit). - -log(State = #pstate{deadline = ExistingDeadline, pending_logs = Logs}, - Message) -> - State#pstate{deadline = compute_deadline(?LOG_BUNDLE_DELAY, - ExistingDeadline), - pending_logs = [Message | Logs]}. - -base_filename() -> - rabbit_mnesia:dir() ++ "/rabbit_persister.LOG". - -take_snapshot(LogHandle, OldFileName, Snapshot) -> - ok = disk_log:sync(LogHandle), - %% current_snapshot is the Head (ie. first thing logged) - ok = disk_log:reopen(LogHandle, OldFileName, current_snapshot(Snapshot)). - -take_snapshot(LogHandle, Snapshot) -> - OldFileName = lists:flatten(base_filename() ++ ".previous"), - file:delete(OldFileName), - rabbit_log:info("Rolling persister log to ~p~n", [OldFileName]), - ok = take_snapshot(LogHandle, OldFileName, Snapshot). - -take_snapshot_and_save_old(LogHandle, Snapshot) -> - {MegaSecs, Secs, MicroSecs} = erlang:now(), - Timestamp = MegaSecs * 1000000 + Secs * 1000 + MicroSecs, - OldFileName = lists:flatten(io_lib:format("~s.saved.~p", - [base_filename(), Timestamp])), - rabbit_log:info("Saving persister log in ~p~n", [OldFileName]), - ok = take_snapshot(LogHandle, OldFileName, Snapshot). - -maybe_take_snapshot(Force, State = #pstate{entry_count = EntryCount, - log_handle = LH, - snapshot = Snapshot}) -> - {ok, MaxWrapEntries} = application:get_env(persister_max_wrap_entries), - if - Force orelse EntryCount >= MaxWrapEntries -> - ok = take_snapshot(LH, Snapshot), - State#pstate{entry_count = 0}; - true -> - State - end. - -later_ms(DeltaMilliSec) -> - {MegaSec, Sec, MicroSec} = now(), - %% Note: not normalised. Unimportant for this application. - {MegaSec, Sec, MicroSec + (DeltaMilliSec * 1000)}. - -%% Result = B - A, more or less -time_diff({B1, B2, B3}, {A1, A2, A3}) -> - (B1 - A1) * 1000000 + (B2 - A2) + (B3 - A3) / 1000000.0 . - -compute_deadline(TimerDelay, infinity) -> - later_ms(TimerDelay); -compute_deadline(_TimerDelay, ExistingDeadline) -> - ExistingDeadline. - -compute_timeout(infinity) -> - {ok, HibernateAfter} = application:get_env(persister_hibernate_after), - HibernateAfter; -compute_timeout(Deadline) -> - DeltaMilliSec = time_diff(Deadline, now()) * 1000.0, - if - DeltaMilliSec =< 1 -> - 0; - true -> - round(DeltaMilliSec) - end. - -do_noreply(State = #pstate{deadline = Deadline}) -> - {noreply, State, compute_timeout(Deadline)}. - -do_reply(Reply, State = #pstate{deadline = Deadline}) -> - {reply, Reply, State, compute_timeout(Deadline)}. - -flush(State) -> flush(false, State). - -flush(ForceSnapshot, State = #pstate{pending_logs = PendingLogs, - pending_replies = Waiting, - log_handle = LogHandle}) -> - State1 = if PendingLogs /= [] -> - disk_log:alog(LogHandle, lists:reverse(PendingLogs)), - State#pstate{entry_count = State#pstate.entry_count + 1}; - true -> - State - end, - State2 = maybe_take_snapshot(ForceSnapshot, State1), - if Waiting /= [] -> - ok = disk_log:sync(LogHandle), - lists:foreach(fun (From) -> gen_server:reply(From, ok) end, - Waiting); - true -> - ok - end, - State2#pstate{deadline = infinity, - pending_logs = [], - pending_replies = []}. - -current_snapshot(_Snapshot = #psnapshot{transactions = Ts, - messages = Messages, - queues = Queues, - next_seq_id = NextSeqId}) -> - %% Avoid infinite growth of the table by removing messages not - %% bound to a queue anymore - PKeys = ets:foldl(fun ({{_QName, PKey}, _Delivered, - _MsgProps, _SeqId}, S) -> - sets:add_element(PKey, S) - end, sets:new(), Queues), - prune_table(Messages, fun (Key) -> sets:is_element(Key, PKeys) end), - InnerSnapshot = {{txns, Ts}, - {messages, ets:tab2list(Messages)}, - {queues, ets:tab2list(Queues)}, - {next_seq_id, NextSeqId}}, - ?LOGDEBUG("Inner snapshot: ~p~n", [InnerSnapshot]), - {persist_snapshot, {vsn, ?PERSISTER_LOG_FORMAT_VERSION}, - term_to_binary(InnerSnapshot)}. - -prune_table(Tab, Pred) -> - true = ets:safe_fixtable(Tab, true), - ok = prune_table(Tab, Pred, ets:first(Tab)), - true = ets:safe_fixtable(Tab, false). - -prune_table(_Tab, _Pred, '$end_of_table') -> ok; -prune_table(Tab, Pred, Key) -> - case Pred(Key) of - true -> ok; - false -> ets:delete(Tab, Key) - end, - prune_table(Tab, Pred, ets:next(Tab, Key)). - -internal_load_snapshot(LogHandle, - DurableQueues, - Snapshot = #psnapshot{messages = Messages, - queues = Queues}) -> - {K, [Loaded_Snapshot | Items]} = disk_log:chunk(LogHandle, start), - case check_version(Loaded_Snapshot) of - {ok, StateBin} -> - {{txns, Ts}, {messages, Ms}, {queues, Qs}, - {next_seq_id, NextSeqId}} = binary_to_term(StateBin), - true = ets:insert(Messages, Ms), - true = ets:insert(Queues, Qs), - Snapshot1 = replay(Items, LogHandle, K, - Snapshot#psnapshot{ - transactions = Ts, - next_seq_id = NextSeqId}), - %% Remove all entries for queues that no longer exist. - %% Note that the 'messages' table is pruned when the next - %% snapshot is taken. - DurableQueuesSet = sets:from_list(DurableQueues), - prune_table(Snapshot1#psnapshot.queues, - fun ({QName, _PKey}) -> - sets:is_element(QName, DurableQueuesSet) - end), - %% uncompleted transactions are discarded - this is TRTTD - %% since we only get into this code on node restart, so - %% any uncompleted transactions will have been aborted. - {ok, Snapshot1#psnapshot{transactions = dict:new()}}; - {error, Reason} -> {{error, Reason}, Snapshot} - end. - -check_version({persist_snapshot, {vsn, ?PERSISTER_LOG_FORMAT_VERSION}, - StateBin}) -> - {ok, StateBin}; -check_version({persist_snapshot, {vsn, Vsn}, _StateBin}) -> - {error, {unsupported_persister_log_format, Vsn}}; -check_version(_Other) -> - {error, unrecognised_persister_log_format}. - -replay([], LogHandle, K, Snapshot) -> - case disk_log:chunk(LogHandle, K) of - {K1, Items} -> - replay(Items, LogHandle, K1, Snapshot); - {K1, Items, Badbytes} -> - rabbit_log:warning("~p bad bytes recovering persister log~n", - [Badbytes]), - replay(Items, LogHandle, K1, Snapshot); - eof -> Snapshot - end; -replay([Item | Items], LogHandle, K, Snapshot) -> - NewSnapshot = internal_integrate_messages(Item, Snapshot), - replay(Items, LogHandle, K, NewSnapshot). - -internal_integrate_messages(Items, Snapshot) -> - lists:foldl(fun (Item, Snap) -> internal_integrate1(Item, Snap) end, - Snapshot, Items). - -internal_integrate1({extend_transaction, Key, MessageList}, - Snapshot = #psnapshot {transactions = Transactions}) -> - Snapshot#psnapshot{transactions = rabbit_misc:dict_cons(Key, MessageList, - Transactions)}; -internal_integrate1({rollback_transaction, Key}, - Snapshot = #psnapshot{transactions = Transactions}) -> - Snapshot#psnapshot{transactions = dict:erase(Key, Transactions)}; -internal_integrate1({commit_transaction, Key}, - Snapshot = #psnapshot{transactions = Transactions, - messages = Messages, - queues = Queues, - next_seq_id = SeqId}) -> - case dict:find(Key, Transactions) of - {ok, MessageLists} -> - ?LOGDEBUG("persist committing txn ~p~n", [Key]), - NextSeqId = - lists:foldr( - fun (ML, SeqIdN) -> - perform_work(ML, Messages, Queues, SeqIdN) end, - SeqId, MessageLists), - Snapshot#psnapshot{transactions = dict:erase(Key, Transactions), - next_seq_id = NextSeqId}; - error -> - Snapshot - end; -internal_integrate1({dirty_work, MessageList}, - Snapshot = #psnapshot{messages = Messages, - queues = Queues, - next_seq_id = SeqId}) -> - Snapshot#psnapshot{next_seq_id = perform_work(MessageList, Messages, - Queues, SeqId)}. - -perform_work(MessageList, Messages, Queues, SeqId) -> - lists:foldl(fun (Item, NextSeqId) -> - perform_work_item(Item, Messages, Queues, NextSeqId) - end, SeqId, MessageList). - -perform_work_item({publish, Message, MsgProps, QK = {_QName, PKey}}, - Messages, Queues, NextSeqId) -> - true = ets:insert(Messages, {PKey, Message}), - true = ets:insert(Queues, {QK, false, MsgProps, NextSeqId}), - NextSeqId + 1; - -perform_work_item({tied, MsgProps, QK}, _Messages, Queues, NextSeqId) -> - true = ets:insert(Queues, {QK, false, MsgProps, NextSeqId}), - NextSeqId + 1; - -perform_work_item({deliver, QK}, _Messages, Queues, NextSeqId) -> - true = ets:update_element(Queues, QK, {2, true}), - NextSeqId; - -perform_work_item({ack, QK}, _Messages, Queues, NextSeqId) -> - true = ets:delete(Queues, QK), - NextSeqId. diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 8b58b822a4..b9edad9a3a 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -1898,7 +1898,7 @@ test_variable_queue_ack_limiting(VQ0) -> VQ6 = check_variable_queue_status( rabbit_variable_queue:set_ram_duration_target(0, VQ5), [{len, Len div 2}, - {target_ram_item_count, 0}, + {target_ram_count, 0}, {ram_msg_count, 0}, {ram_ack_count, 0}]), diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 7c676164b6..0db5116559 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -31,7 +31,7 @@ -module(rabbit_variable_queue). --export([init/5, init/3, terminate/1, delete_and_terminate/1, +-export([init/3, terminate/1, delete_and_terminate/1, purge/1, publish/3, publish_delivered/4, fetch/2, ack/2, tx_publish/4, tx_ack/3, tx_rollback/2, tx_commit/4, requeue/3, len/1, is_empty/1, dropwhile/2, @@ -42,7 +42,7 @@ -export([start/1, stop/0]). %% exported for testing only --export([start_msg_store/2, stop_msg_store/0]). +-export([start_msg_store/2, stop_msg_store/0, init/5]). %%---------------------------------------------------------------------------- %% Definitions: @@ -158,7 +158,7 @@ %% The conversion from alphas to betas is also chunked, but only to %% ensure no more than ?IO_BATCH_SIZE alphas are converted to betas at %% any one time. This further smooths the effects of changes to the -%% target_ram_item_count and ensures the queue remains responsive +%% target_ram_count and ensures the queue remains responsive %% even when there is a large amount of IO work to do. The %% idle_timeout callback is utilised to ensure that conversions are %% done as promptly as possible whilst ensuring the queue remains @@ -256,7 +256,7 @@ len, persistent_count, - target_ram_item_count, + target_ram_count, ram_msg_count, ram_msg_count_prev, ram_ack_count_prev, @@ -351,7 +351,7 @@ persistent_count :: non_neg_integer(), transient_threshold :: non_neg_integer(), - target_ram_item_count :: non_neg_integer() | 'infinity', + target_ram_count :: non_neg_integer() | 'infinity', ram_msg_count :: non_neg_integer(), ram_msg_count_prev :: non_neg_integer(), ram_index_count :: non_neg_integer(), @@ -734,26 +734,24 @@ len(#vqstate { len = Len }) -> Len. is_empty(State) -> 0 == len(State). -set_ram_duration_target(DurationTarget, - State = #vqstate { - rates = - #rates { avg_egress = AvgEgressRate, - avg_ingress = AvgIngressRate }, - ack_rates = - #rates { avg_egress = AvgAckEgressRate, - avg_ingress = AvgAckIngressRate }, - target_ram_item_count = TargetRamItemCount }) -> +set_ram_duration_target( + DurationTarget, State = #vqstate { + rates = #rates { avg_egress = AvgEgressRate, + avg_ingress = AvgIngressRate }, + ack_rates = #rates { avg_egress = AvgAckEgressRate, + avg_ingress = AvgAckIngressRate }, + target_ram_count = TargetRamCount }) -> Rate = AvgEgressRate + AvgIngressRate + AvgAckEgressRate + AvgAckIngressRate, - TargetRamItemCount1 = + TargetRamCount1 = case DurationTarget of infinity -> infinity; _ -> trunc(DurationTarget * Rate) %% msgs = sec * msgs/sec end, - State1 = State #vqstate { target_ram_item_count = TargetRamItemCount1 }, - a(case TargetRamItemCount1 == infinity orelse - (TargetRamItemCount =/= infinity andalso - TargetRamItemCount1 >= TargetRamItemCount) of + State1 = State #vqstate { target_ram_count = TargetRamCount1 }, + a(case TargetRamCount1 == infinity orelse + (TargetRamCount =/= infinity andalso + TargetRamCount1 >= TargetRamCount) of true -> State1; false -> reduce_memory_use(State1) end). @@ -829,40 +827,39 @@ idle_timeout(State) -> a(reduce_memory_use(tx_commit_index(State))). handle_pre_hibernate(State = #vqstate { index_state = IndexState }) -> State #vqstate { index_state = rabbit_queue_index:flush(IndexState) }. -status(#vqstate { q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4, - len = Len, - pending_ack = PA, - ram_ack_index = RAI, - on_sync = #sync { funs = From }, - target_ram_item_count = TargetRamItemCount, - ram_msg_count = RamMsgCount, - ram_index_count = RamIndexCount, - next_seq_id = NextSeqId, - persistent_count = PersistentCount, - rates = #rates { - avg_egress = AvgEgressRate, - avg_ingress = AvgIngressRate }, - ack_rates = #rates { - avg_egress = AvgAckEgressRate, - avg_ingress = AvgAckIngressRate } }) -> - [ {q1 , queue:len(Q1)}, - {q2 , bpqueue:len(Q2)}, - {delta , Delta}, - {q3 , bpqueue:len(Q3)}, - {q4 , queue:len(Q4)}, - {len , Len}, - {pending_acks , dict:size(PA)}, - {outstanding_txns , length(From)}, - {target_ram_item_count , TargetRamItemCount}, - {ram_msg_count , RamMsgCount}, - {ram_ack_count , gb_trees:size(RAI)}, - {ram_index_count , RamIndexCount}, - {next_seq_id , NextSeqId}, - {persistent_count , PersistentCount}, - {avg_ingress_rate , AvgIngressRate}, - {avg_egress_rate , AvgEgressRate}, - {avg_ack_ingress_rate , AvgAckIngressRate}, - {avg_ack_egress_rate , AvgAckEgressRate} ]. +status(#vqstate { + q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4, + len = Len, + pending_ack = PA, + ram_ack_index = RAI, + on_sync = #sync { funs = From }, + target_ram_count = TargetRamCount, + ram_msg_count = RamMsgCount, + ram_index_count = RamIndexCount, + next_seq_id = NextSeqId, + persistent_count = PersistentCount, + rates = #rates { avg_egress = AvgEgressRate, + avg_ingress = AvgIngressRate }, + ack_rates = #rates { avg_egress = AvgAckEgressRate, + avg_ingress = AvgAckIngressRate } }) -> + [ {q1 , queue:len(Q1)}, + {q2 , bpqueue:len(Q2)}, + {delta , Delta}, + {q3 , bpqueue:len(Q3)}, + {q4 , queue:len(Q4)}, + {len , Len}, + {pending_acks , dict:size(PA)}, + {outstanding_txns , length(From)}, + {target_ram_count , TargetRamCount}, + {ram_msg_count , RamMsgCount}, + {ram_ack_count , gb_trees:size(RAI)}, + {ram_index_count , RamIndexCount}, + {next_seq_id , NextSeqId}, + {persistent_count , PersistentCount}, + {avg_ingress_rate , AvgIngressRate}, + {avg_egress_rate , AvgEgressRate}, + {avg_ack_ingress_rate, AvgAckIngressRate}, + {avg_ack_egress_rate , AvgAckEgressRate} ]. %%---------------------------------------------------------------------------- %% Minor helpers @@ -1056,37 +1053,37 @@ init(IsDurable, IndexState, DeltaCount, Terms, end, Now = now(), State = #vqstate { - q1 = queue:new(), - q2 = bpqueue:new(), - delta = Delta, - q3 = bpqueue:new(), - q4 = queue:new(), - next_seq_id = NextSeqId, - pending_ack = dict:new(), - ram_ack_index = gb_trees:empty(), - index_state = IndexState1, - msg_store_clients = {PersistentClient, TransientClient}, - on_sync = ?BLANK_SYNC, - durable = IsDurable, - transient_threshold = NextSeqId, - - len = DeltaCount1, - persistent_count = DeltaCount1, - - target_ram_item_count = infinity, - ram_msg_count = 0, - ram_msg_count_prev = 0, - ram_ack_count_prev = 0, - ram_index_count = 0, - out_counter = 0, - in_counter = 0, - msgs_on_disk = gb_sets:new(), - msg_indices_on_disk = gb_sets:new(), - unconfirmed = gb_sets:new(), - ack_out_counter = 0, - ack_in_counter = 0, - rates = blank_rate(Now, DeltaCount1), - ack_rates = blank_rate(Now, 0) }, + q1 = queue:new(), + q2 = bpqueue:new(), + delta = Delta, + q3 = bpqueue:new(), + q4 = queue:new(), + next_seq_id = NextSeqId, + pending_ack = dict:new(), + ram_ack_index = gb_trees:empty(), + index_state = IndexState1, + msg_store_clients = {PersistentClient, TransientClient}, + on_sync = ?BLANK_SYNC, + durable = IsDurable, + transient_threshold = NextSeqId, + + len = DeltaCount1, + persistent_count = DeltaCount1, + + target_ram_count = infinity, + ram_msg_count = 0, + ram_msg_count_prev = 0, + ram_ack_count_prev = 0, + ram_index_count = 0, + out_counter = 0, + in_counter = 0, + rates = blank_rate(Now, DeltaCount1), + msgs_on_disk = gb_sets:new(), + msg_indices_on_disk = gb_sets:new(), + unconfirmed = gb_sets:new(), + ack_out_counter = 0, + ack_in_counter = 0, + ack_rates = blank_rate(Now, 0) }, a(maybe_deltas_to_betas(State)). blank_rate(Timestamp, IngressLength) -> @@ -1443,7 +1440,7 @@ msg_indices_written_to_disk(QPid, GuidSet) -> %% though the conversion function for that is called as necessary. The %% reason is twofold. Firstly, this is safe because the conversion is %% only ever necessary just after a transition to a -%% target_ram_item_count of zero or after an incremental alpha->beta +%% target_ram_count of zero or after an incremental alpha->beta %% conversion. In the former case the conversion is performed straight %% away (i.e. any betas present at the time are converted to deltas), %% and in the latter case the need for a conversion is flagged up @@ -1454,51 +1451,41 @@ msg_indices_written_to_disk(QPid, GuidSet) -> %% perpetually reporting the need for a conversion when no such %% conversion is needed. That in turn could cause an infinite loop. reduce_memory_use(_AlphaBetaFun, _BetaGammaFun, _BetaDeltaFun, _AckFun, - State = #vqstate {target_ram_item_count = infinity}) -> + State = #vqstate {target_ram_count = infinity}) -> {false, State}; reduce_memory_use(AlphaBetaFun, BetaGammaFun, BetaDeltaFun, AckFun, State = #vqstate { - ram_ack_index = RamAckIndex, - ram_msg_count = RamMsgCount, - target_ram_item_count = TargetRamItemCount, - rates = #rates { - avg_ingress = AvgIngress, - avg_egress = AvgEgress }, - ack_rates = #rates { - avg_ingress = AvgAckIngress, - avg_egress = AvgAckEgress } }) -> + ram_ack_index = RamAckIndex, + ram_msg_count = RamMsgCount, + target_ram_count = TargetRamCount, + rates = #rates { avg_ingress = AvgIngress, + avg_egress = AvgEgress }, + ack_rates = #rates { avg_ingress = AvgAckIngress, + avg_egress = AvgAckEgress } + }) -> {Reduce, State1} = case chunk_size(RamMsgCount + gb_trees:size(RamAckIndex), - TargetRamItemCount) of - 0 -> - {false, State}; - S1 -> - ReduceFuns = - case (AvgAckIngress - AvgAckEgress) > - (AvgIngress - AvgEgress) of - true -> - %% ACKs are growing faster than the queue, - %% push messages from there first. - [AckFun, AlphaBetaFun]; - false -> - %% The queue is growing faster than the - %% acks, push queue messages first. - [AlphaBetaFun, AckFun] - end, - {_, State2} = - %% Both reduce functions get a chance to reduce - %% memory. The second may very well get a quota of - %% 0 if the first function managed to push out the - %% maximum number of messages. - lists:foldl( - fun (ReduceFun, {QuotaN, StateN}) -> - ReduceFun(QuotaN, StateN) - end, {S1, State}, ReduceFuns), - {true, State2} + TargetRamCount) of + 0 -> {false, State}; + %% Reduce memory of pending acks and alphas. The order is + %% determined based on which is growing faster. Whichever + %% comes second may very well get a quota of 0 if the + %% first manages to push out the max number of messages. + S1 -> {_, State2} = + lists:foldl(fun (ReduceFun, {QuotaN, StateN}) -> + ReduceFun(QuotaN, StateN) + end, + {S1, State}, + case (AvgAckIngress - AvgAckEgress) > + (AvgIngress - AvgEgress) of + true -> [AckFun, AlphaBetaFun]; + false -> [AlphaBetaFun, AckFun] + end), + {true, State2} end, - case State1 #vqstate.target_ram_item_count of + case State1 #vqstate.target_ram_count of 0 -> {Reduce, BetaDeltaFun(State1)}; _ -> case chunk_size(State1 #vqstate.ram_index_count, permitted_ram_index_count(State1)) of @@ -1694,11 +1681,11 @@ maybe_push_q4_to_betas(Quota, State = #vqstate { q4 = Q4 }) -> maybe_push_alphas_to_betas(_Generator, _Consumer, Quota, _Q, State = #vqstate { - ram_msg_count = RamMsgCount, - target_ram_item_count = TargetRamItemCount }) + ram_msg_count = RamMsgCount, + target_ram_count = TargetRamCount }) when Quota =:= 0 orelse - TargetRamItemCount =:= infinity orelse - TargetRamItemCount >= RamMsgCount -> + TargetRamCount =:= infinity orelse + TargetRamCount >= RamMsgCount -> {Quota, State}; maybe_push_alphas_to_betas(Generator, Consumer, Quota, Q, State) -> case Generator(Q) of diff --git a/src/rabbit_writer.erl b/src/rabbit_writer.erl index 1b4710c6c3..0159609d9a 100644 --- a/src/rabbit_writer.erl +++ b/src/rabbit_writer.erl @@ -182,7 +182,7 @@ call(Pid, Msg) -> %--------------------------------------------------------------------------- -assemble_frames(Channel, MethodRecord, Protocol) -> +assemble_frame(Channel, MethodRecord, Protocol) -> ?LOGMESSAGE(out, Channel, MethodRecord, none), rabbit_binary_generator:build_simple_method_frame( Channel, MethodRecord, Protocol). @@ -197,17 +197,34 @@ assemble_frames(Channel, MethodRecord, Content, FrameMax, Protocol) -> Channel, Content, FrameMax, Protocol), [MethodFrame | ContentFrames]. +%% We optimise delivery of small messages. Content-bearing methods +%% require at least three frames. Small messages always fit into +%% that. We hand their frames to the Erlang network functions in one +%% go, which may lead to somewhat more efficient processing in the +%% runtime and a greater chance of coalescing into fewer TCP packets. +%% +%% By contrast, for larger messages, split across many frames, we want +%% to allow interleaving of frames on different channels. Hence we +%% hand them to the Erlang network functions one frame at a time. +send_frames(Fun, Sock, Frames) when length(Frames) =< 3 -> + Fun(Sock, Frames); +send_frames(Fun, Sock, Frames) -> + lists:foldl(fun (Frame, ok) -> Fun(Sock, Frame); + (_Frame, Other) -> Other + end, ok, Frames). + tcp_send(Sock, Data) -> rabbit_misc:throw_on_error(inet_error, fun () -> rabbit_net:send(Sock, Data) end). internal_send_command(Sock, Channel, MethodRecord, Protocol) -> - ok = tcp_send(Sock, assemble_frames(Channel, MethodRecord, Protocol)). + ok = tcp_send(Sock, assemble_frame(Channel, MethodRecord, Protocol)). internal_send_command(Sock, Channel, MethodRecord, Content, FrameMax, Protocol) -> - ok = tcp_send(Sock, assemble_frames(Channel, MethodRecord, - Content, FrameMax, Protocol)). + ok = send_frames(fun tcp_send/2, Sock, + assemble_frames(Channel, MethodRecord, + Content, FrameMax, Protocol)). %% gen_tcp:send/2 does a selective receive of {inet_reply, Sock, %% Status} to obtain the result. That is bad when it is called from @@ -231,19 +248,19 @@ internal_send_command_async(MethodRecord, #wstate{sock = Sock, channel = Channel, protocol = Protocol}) -> - true = port_cmd(Sock, assemble_frames(Channel, MethodRecord, Protocol)), - ok. + ok = port_cmd(Sock, assemble_frame(Channel, MethodRecord, Protocol)). internal_send_command_async(MethodRecord, Content, #wstate{sock = Sock, channel = Channel, frame_max = FrameMax, protocol = Protocol}) -> - true = port_cmd(Sock, assemble_frames(Channel, MethodRecord, - Content, FrameMax, Protocol)), - ok. + ok = send_frames(fun port_cmd/2, Sock, + assemble_frames(Channel, MethodRecord, + Content, FrameMax, Protocol)). port_cmd(Sock, Data) -> - try rabbit_net:port_command(Sock, Data) - catch error:Error -> exit({writer, send_failed, Error}) - end. + true = try rabbit_net:port_command(Sock, Data) + catch error:Error -> exit({writer, send_failed, Error}) + end, + ok. |
