diff options
| author | Rob Harrop <rob@rabbitmq.com> | 2010-11-25 16:08:47 +0000 |
|---|---|---|
| committer | Rob Harrop <rob@rabbitmq.com> | 2010-11-25 16:08:47 +0000 |
| commit | 6c50d609f32995e223e22ab286bdf9b3c03ed952 (patch) | |
| tree | 19beabaca231db2cb1c958f3322c826cc434ad41 | |
| parent | b0c2cac6b7d7b3efb9df6d39dfaba5ca7b90fc1b (diff) | |
| parent | dadd84b373cdabd2c0c2c350527a97d0a5ff887a (diff) | |
| download | rabbitmq-server-git-6c50d609f32995e223e22ab286bdf9b3c03ed952.tar.gz | |
merge with default
| -rw-r--r-- | include/rabbit.hrl | 5 | ||||
| -rw-r--r-- | include/rabbit_backing_queue_spec.hrl | 3 | ||||
| -rw-r--r-- | src/rabbit_amqqueue.erl | 23 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 173 | ||||
| -rw-r--r-- | src/rabbit_basic.erl | 14 | ||||
| -rw-r--r-- | src/rabbit_channel.erl | 223 | ||||
| -rw-r--r-- | src/rabbit_msg_store.erl | 199 | ||||
| -rw-r--r-- | src/rabbit_queue_index.erl | 52 | ||||
| -rw-r--r-- | src/rabbit_router.erl | 11 | ||||
| -rw-r--r-- | src/rabbit_tests.erl | 45 | ||||
| -rw-r--r-- | src/rabbit_types.erl | 3 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 149 |
12 files changed, 671 insertions, 229 deletions
diff --git a/include/rabbit.hrl b/include/rabbit.hrl index a1987fb292..fccfad9708 100644 --- a/include/rabbit.hrl +++ b/include/rabbit.hrl @@ -69,12 +69,13 @@ is_persistent}). -record(ssl_socket, {tcp, ssl}). --record(delivery, {mandatory, immediate, txn, sender, message}). +-record(delivery, {mandatory, immediate, txn, sender, message, + msg_seq_no}). -record(amqp_error, {name, explanation = "", method = none}). -record(event, {type, props, timestamp}). --record(message_properties, {expiry}). +-record(message_properties, {expiry, needs_confirming = false}). %%---------------------------------------------------------------------------- diff --git a/include/rabbit_backing_queue_spec.hrl b/include/rabbit_backing_queue_spec.hrl index 20230b2447..f67c6f46d1 100644 --- a/include/rabbit_backing_queue_spec.hrl +++ b/include/rabbit_backing_queue_spec.hrl @@ -37,6 +37,7 @@ -type(attempt_recovery() :: boolean()). -type(purged_msg_count() :: non_neg_integer()). -type(ack_required() :: boolean()). +-type(confirm_required() :: boolean()). -type(message_properties_transformer() :: fun ((rabbit_types:message_properties()) -> rabbit_types:message_properties())). @@ -57,7 +58,7 @@ (fun ((rabbit_types:message_properties()) -> boolean()), state()) -> state()). -spec(fetch/2 :: (ack_required(), state()) -> {fetch_result(), state()}). --spec(ack/2 :: ([ack()], state()) -> state()). +-spec(ack/2 :: ([ack()], state()) -> {[rabbit_guid:guid()], state()}). -spec(tx_publish/4 :: (rabbit_types:txn(), rabbit_types:basic_message(), rabbit_types:message_properties(), state()) -> state()). -spec(tx_ack/3 :: (rabbit_types:txn(), [ack()], state()) -> state()). diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index fa4175441b..70d8f2dda7 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -34,6 +34,7 @@ -export([start/0, stop/0, declare/5, delete_immediately/1, delete/3, purge/1]). -export([internal_declare/2, internal_delete/1, maybe_run_queue_via_backing_queue/2, + maybe_run_queue_via_backing_queue_async/2, update_ram_duration/1, set_ram_duration_target/2, set_maximum_since_use/2, maybe_expire/1, drop_expired/1]). -export([pseudo_queue/2]). @@ -156,7 +157,9 @@ (name()) -> rabbit_types:ok_or_error('not_found') | rabbit_types:connection_exit()). -spec(maybe_run_queue_via_backing_queue/2 :: - (pid(), (fun ((A) -> A))) -> 'ok'). + (pid(), (fun ((A) -> A | {any(), A}))) -> 'ok'). +-spec(maybe_run_queue_via_backing_queue_async/2 :: + (pid(), (fun ((A) -> A | {any(), A}))) -> 'ok'). -spec(update_ram_duration/1 :: (pid()) -> 'ok'). -spec(set_ram_duration_target/2 :: (pid(), number() | 'infinity') -> 'ok'). -spec(set_maximum_since_use/2 :: (pid(), non_neg_integer()) -> 'ok'). @@ -380,16 +383,13 @@ delete(#amqqueue{ pid = QPid }, IfUnused, IfEmpty) -> purge(#amqqueue{ pid = QPid }) -> delegate_call(QPid, purge, infinity). -deliver(QPid, #delivery{immediate = true, - txn = Txn, sender = ChPid, message = Message}) -> - gen_server2:call(QPid, {deliver_immediately, Txn, Message, ChPid}, - infinity); -deliver(QPid, #delivery{mandatory = true, - txn = Txn, sender = ChPid, message = Message}) -> - gen_server2:call(QPid, {deliver, Txn, Message, ChPid}, infinity), +deliver(QPid, Delivery = #delivery{immediate = true}) -> + gen_server2:call(QPid, {deliver_immediately, Delivery}, infinity); +deliver(QPid, Delivery = #delivery{mandatory = true}) -> + gen_server2:call(QPid, {deliver, Delivery}, infinity), true; -deliver(QPid, #delivery{txn = Txn, sender = ChPid, message = Message}) -> - gen_server2:cast(QPid, {deliver, Txn, Message, ChPid}), +deliver(QPid, Delivery) -> + gen_server2:cast(QPid, {deliver, Delivery}), true. requeue(QPid, MsgIds, ChPid) -> @@ -466,6 +466,9 @@ internal_delete(QueueName) -> maybe_run_queue_via_backing_queue(QPid, Fun) -> gen_server2:call(QPid, {maybe_run_queue_via_backing_queue, Fun}, infinity). +maybe_run_queue_via_backing_queue_async(QPid, Fun) -> + gen_server2:cast(QPid, {maybe_run_queue_via_backing_queue, Fun}). + update_ram_duration(QPid) -> gen_server2:cast(QPid, update_ram_duration). diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index a999fe582c..3dbd2b2299 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -39,7 +39,8 @@ -define(SYNC_INTERVAL, 5). %% milliseconds -define(RAM_DURATION_UPDATE_INTERVAL, 5000). --define(BASE_MESSAGE_PROPERTIES, #message_properties{expiry = undefined}). +-define(BASE_MESSAGE_PROPERTIES, + #message_properties{expiry = undefined, needs_confirming = false}). -export([start_link/1, info_keys/0]). @@ -64,6 +65,7 @@ rate_timer_ref, expiry_timer_ref, stats_timer, + guid_to_channel, ttl, ttl_timer_ref }). @@ -128,7 +130,8 @@ init(Q) -> rate_timer_ref = undefined, expiry_timer_ref = undefined, ttl = undefined, - stats_timer = rabbit_event:init_stats_timer()}, hibernate, + stats_timer = rabbit_event:init_stats_timer(), + guid_to_channel = dict:new()}, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. terminate(shutdown, State = #q{backing_queue = BQ}) -> @@ -373,11 +376,13 @@ deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun}, FunAcc, rabbit_channel:deliver( ChPid, ConsumerTag, AckRequired, {QName, self(), AckTag, IsDelivered, Message}), - ChAckTags1 = case AckRequired of - true -> sets:add_element( - AckTag, ChAckTags); - false -> ChAckTags - end, + {State2, ChAckTags1} = + case AckRequired of + true -> {State1, + sets:add_element(AckTag, ChAckTags)}; + false -> {confirm_message(Message, State1), + ChAckTags} + end, NewC = C#cr{unsent_message_count = Count + 1, acktags = ChAckTags1}, true = maybe_store_ch_record(NewC), @@ -393,10 +398,10 @@ deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun}, FunAcc, {ActiveConsumers1, queue:in(QEntry, BlockedConsumers1)} end, - State2 = State1#q{ + State3 = State2#q{ active_consumers = NewActiveConsumers, blocked_consumers = NewBlockedConsumers}, - deliver_msgs_to_consumers(Funs, FunAcc1, State2); + deliver_msgs_to_consumers(Funs, FunAcc1, State3); %% if IsMsgReady then we've hit the limiter false when IsMsgReady -> true = maybe_store_ch_record(C#cr{is_limit_active = true}), @@ -424,6 +429,39 @@ deliver_from_queue_deliver(AckRequired, false, State) -> fetch(AckRequired, State), {{Message, IsDelivered, AckTag}, 0 == Remaining, State1}. +confirm_messages(Guids, State) -> + lists:foldl(fun confirm_message_by_guid/2, State, Guids). + +confirm_message_by_guid(Guid, State = #q{guid_to_channel = GTC}) -> + case dict:find(Guid, GTC) of + {ok, {_ , undefined}} -> ok; + {ok, {ChPid, MsgSeqNo}} -> rabbit_channel:confirm(ChPid, MsgSeqNo); + _ -> ok + end, + State#q{guid_to_channel = dict:erase(Guid, GTC)}. + +confirm_message(#basic_message{guid = Guid}, State) -> + confirm_message_by_guid(Guid, State). + +record_confirm_message(#delivery{msg_seq_no = undefined}, State) -> + State; +record_confirm_message(#delivery{sender = ChPid, + msg_seq_no = MsgSeqNo, + message = #basic_message { + is_persistent = true, + guid = Guid}}, + State = + #q{guid_to_channel = GTC, + q = #amqqueue{durable = true}}) -> + State#q{guid_to_channel = dict:store(Guid, {ChPid, MsgSeqNo}, GTC)}; +record_confirm_message(_Delivery, State) -> + State. + +ack_by_acktags(AckTags, State = #q{backing_queue = BQ, + backing_queue_state = BQS}) -> + {AckdGuids, BQS1} = BQ:ack(AckTags, BQS), + confirm_messages(AckdGuids, State#q{backing_queue_state = BQS1}). + run_message_queue(State) -> Funs = {fun deliver_from_queue_pred/2, fun deliver_from_queue_deliver/3}, @@ -433,7 +471,17 @@ run_message_queue(State) -> {_IsEmpty1, State2} = deliver_msgs_to_consumers(Funs, IsEmpty, State1), State2. -attempt_delivery(none, _ChPid, Message, State = #q{backing_queue = BQ}) -> +attempt_delivery(#delivery{txn = none, + sender = ChPid, + message = Message, + msg_seq_no = MsgSeqNo}, + State = #q{backing_queue = BQ, q = Q}) -> + NeedsConfirming = Message#basic_message.is_persistent andalso + Q#amqqueue.durable, + case NeedsConfirming of + false -> rabbit_channel:confirm(ChPid, MsgSeqNo); + _ -> ok + end, PredFun = fun (IsEmpty, _State) -> not IsEmpty end, DeliverFun = fun (AckRequired, false, State1 = #q{backing_queue_state = BQS}) -> @@ -441,29 +489,37 @@ attempt_delivery(none, _ChPid, Message, State = #q{backing_queue = BQ}) -> %% not being enqueued, so we use an empty %% message_properties. {AckTag, BQS1} = - BQ:publish_delivered(AckRequired, Message, - ?BASE_MESSAGE_PROPERTIES, BQS), + BQ:publish_delivered( + AckRequired, Message, + ?BASE_MESSAGE_PROPERTIES + #message_properties{ + needs_confirming = NeedsConfirming}, + BQS), {{Message, false, AckTag}, true, State1#q{backing_queue_state = BQS1}} end, deliver_msgs_to_consumers({ PredFun, DeliverFun }, false, State); -attempt_delivery(Txn, ChPid, Message, State = #q{backing_queue = BQ, - backing_queue_state = BQS}) -> +attempt_delivery(#delivery{txn = Txn, + sender = ChPid, + message = Message}, + State = #q{backing_queue = BQ, + backing_queue_state = BQS}) -> record_current_channel_tx(ChPid, Txn), {true, State#q{backing_queue_state = BQ:tx_publish(Txn, Message, ?BASE_MESSAGE_PROPERTIES, BQS)}}. -deliver_or_enqueue(Txn, ChPid, Message, State = #q{backing_queue = BQ}) -> - case attempt_delivery(Txn, ChPid, Message, State) of - {true, NewState} -> - {true, NewState}; - {false, NewState} -> - %% Txn is none and no unblocked channels with consumers - BQS = BQ:publish(Message, - message_properties(State), - State #q.backing_queue_state), - {false, ensure_ttl_timer(NewState#q{backing_queue_state = BQS})} +deliver_or_enqueue(Delivery, State) -> + case attempt_delivery(Delivery, record_confirm_message(Delivery, State)) of + {true, State1} -> + {true, State1}; + {false, State1 = #q{backing_queue = BQ, backing_queue_state = BQS}} -> + #delivery{message = Message, msg_seq_no = MsgSeqNo} = Delivery, + BQS1 = BQ:publish(Message, + (message_properties(State)) #message_properties{ + needs_confirming = (MsgSeqNo =/= undefined)}, + BQS), + {false, ensure_ttl_timer(State1#q{backing_queue_state = BQS1})} end. requeue_and_run(AckTags, State = #q{backing_queue = BQ, ttl=TTL}) -> @@ -566,7 +622,12 @@ maybe_send_reply(ChPid, Msg) -> ok = rabbit_channel:send_command(ChPid, Msg). qname(#q{q = #amqqueue{name = QName}}) -> QName. maybe_run_queue_via_backing_queue(Fun, State = #q{backing_queue_state = BQS}) -> - run_message_queue(State#q{backing_queue_state = Fun(BQS)}). + {BQS2, State1} = + case Fun(BQS) of + {{confirm, Guids}, BQS1} -> {BQS1, confirm_messages(Guids, State)}; + BQS1 -> {BQS1, State} + end, + run_message_queue(State1#q{backing_queue_state = BQS2}). commit_transaction(Txn, From, ChPid, State = #q{backing_queue = BQ, backing_queue_state = BQS, @@ -745,7 +806,8 @@ handle_call(consumers, _From, [{ChPid, ConsumerTag, AckRequired} | Acc] end, [], queue:join(ActiveConsumers, BlockedConsumers)), State); -handle_call({deliver_immediately, Txn, Message, ChPid}, _From, State) -> +handle_call({deliver_immediately, Delivery = #delivery{message = Message}}, + _From, State) -> %% Synchronous, "immediate" delivery mode %% %% FIXME: Is this correct semantics? @@ -759,12 +821,16 @@ handle_call({deliver_immediately, Txn, Message, ChPid}, _From, State) -> %% just all ready-to-consume queues get the message, with unready %% queues discarding the message? %% - {Delivered, NewState} = attempt_delivery(Txn, ChPid, Message, State), - reply(Delivered, NewState); - -handle_call({deliver, Txn, Message, ChPid}, _From, State) -> + {Delivered, State1} = + attempt_delivery(Delivery, record_confirm_message(Delivery, State)), + reply(Delivered, case Delivered of + true -> State1; + false -> confirm_message(Message, State1) + end); + +handle_call({deliver, Delivery}, _From, State) -> %% Synchronous, "mandatory" delivery mode - {Delivered, NewState} = deliver_or_enqueue(Txn, ChPid, Message, State), + {Delivered, NewState} = deliver_or_enqueue(Delivery, State), reply(Delivered, NewState); handle_call({commit, Txn, ChPid}, From, State) -> @@ -790,15 +856,18 @@ handle_call({basic_get, ChPid, NoAck}, _From, {empty, State2} -> reply(empty, State2); {{Message, IsDelivered, AckTag, Remaining}, State2} -> - case AckRequired of - true -> C = #cr{acktags = ChAckTags} = ch_record(ChPid), - true = maybe_store_ch_record( - C#cr{acktags = sets:add_element(AckTag, - ChAckTags)}); - false -> ok - end, + State3 = + case AckRequired of + true -> C = #cr{acktags = ChAckTags} = ch_record(ChPid), + true = maybe_store_ch_record( + C#cr{acktags = + sets:add_element(AckTag, + ChAckTags)}), + State2; + false -> confirm_message(Message, State2) + end, Msg = {QName, self(), AckTag, IsDelivered, Message}, - reply({ok, Remaining, Msg}, State2) + reply({ok, Remaining, Msg}, State3) end; handle_call({basic_consume, NoAck, ChPid, LimiterPid, @@ -911,9 +980,13 @@ handle_call({requeue, AckTags, ChPid}, From, State) -> handle_call({maybe_run_queue_via_backing_queue, Fun}, _From, State) -> reply(ok, maybe_run_queue_via_backing_queue(Fun, State)). -handle_cast({deliver, Txn, Message, ChPid}, State) -> + +handle_cast({maybe_run_queue_via_backing_queue, Fun}, State) -> + noreply(maybe_run_queue_via_backing_queue(Fun, State)); + +handle_cast({deliver, Delivery}, State) -> %% Asynchronous, non-"mandatory", non-"immediate" deliver mode. - {_Delivered, NewState} = deliver_or_enqueue(Txn, ChPid, Message, State), + {_Delivered, NewState} = deliver_or_enqueue(Delivery, State), noreply(NewState); handle_cast({ack, Txn, AckTags, ChPid}, @@ -922,18 +995,21 @@ handle_cast({ack, Txn, AckTags, ChPid}, not_found -> noreply(State); C = #cr{acktags = ChAckTags} -> - {C1, BQS1} = + {C1, State1} = case Txn of none -> ChAckTags1 = subtract_acks(ChAckTags, AckTags), - {C#cr{acktags = ChAckTags1}, BQ:ack(AckTags, BQS)}; - _ -> {C#cr{txn = Txn}, BQ:tx_ack(Txn, AckTags, BQS)} + NewC = C#cr{acktags = ChAckTags1}, + NewState = ack_by_acktags(AckTags, State), + {NewC, NewState}; + _ -> BQS1 = BQ:tx_ack(Txn, AckTags, BQS), + {C#cr{txn = Txn}, + State#q{backing_queue_state = BQS1}} end, maybe_store_ch_record(C1), - noreply(State#q{backing_queue_state = BQS1}) + noreply(State1) end; -handle_cast({reject, AckTags, Requeue, ChPid}, - State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> +handle_cast({reject, AckTags, Requeue, ChPid}, State) -> case lookup_ch(ChPid) of not_found -> noreply(State); @@ -942,8 +1018,7 @@ handle_cast({reject, AckTags, Requeue, ChPid}, maybe_store_ch_record(C#cr{acktags = ChAckTags1}), noreply(case Requeue of true -> requeue_and_run(AckTags, State); - false -> BQS1 = BQ:ack(AckTags, BQS), - State #q { backing_queue_state = BQS1 } + false -> ack_by_acktags(AckTags, State) end) end; diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl index 3841298244..1ac39b6538 100644 --- a/src/rabbit_basic.erl +++ b/src/rabbit_basic.erl @@ -33,7 +33,7 @@ -include("rabbit.hrl"). -include("rabbit_framing.hrl"). --export([publish/1, message/4, properties/1, delivery/4]). +-export([publish/1, message/4, properties/1, delivery/5]). -export([publish/4, publish/7]). -export([build_content/2, from_content/1]). -export([is_message_persistent/1]). @@ -50,9 +50,10 @@ -spec(publish/1 :: (rabbit_types:delivery()) -> publish_result()). --spec(delivery/4 :: +-spec(delivery/5 :: (boolean(), boolean(), rabbit_types:maybe(rabbit_types:txn()), - rabbit_types:message()) -> rabbit_types:delivery()). + rabbit_types:message(), undefined | integer()) -> + rabbit_types:delivery()). -spec(message/4 :: (rabbit_exchange:name(), rabbit_router:routing_key(), properties_input(), binary()) -> @@ -88,9 +89,9 @@ publish(Delivery = #delivery{ Other end. -delivery(Mandatory, Immediate, Txn, Message) -> +delivery(Mandatory, Immediate, Txn, Message, MsgSeqNo) -> #delivery{mandatory = Mandatory, immediate = Immediate, txn = Txn, - sender = self(), message = Message}. + sender = self(), message = Message, msg_seq_no = MsgSeqNo}. build_content(Properties, BodyBin) -> %% basic.publish hasn't changed so we can just hard-code amqp_0_9_1 @@ -157,7 +158,8 @@ publish(ExchangeName, RoutingKeyBin, Mandatory, Immediate, Txn, Properties, BodyBin) -> publish(delivery(Mandatory, Immediate, Txn, message(ExchangeName, RoutingKeyBin, - properties(Properties), BodyBin))). + properties(Properties), BodyBin), + undefined)). is_message_persistent(#content{properties = #'P_basic'{ delivery_mode = Mode}}) -> diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 6bed63a3cb..b2e6658b29 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -38,7 +38,7 @@ -export([start_link/7, do/2, do/3, shutdown/1]). -export([send_command/2, deliver/4, flushed/2]). -export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1]). --export([emit_stats/1, flush/1]). +-export([emit_stats/1, flush/1, flush_multiple_acks/1, confirm/2]). -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2, handle_pre_hibernate/1, prioritise_call/3, @@ -48,7 +48,9 @@ start_limiter_fun, transaction_id, tx_participants, next_tag, uncommitted_ack_q, unacked_message_q, username, virtual_host, most_recently_declared_queue, - consumer_mapping, blocking, queue_collector_pid, stats_timer}). + consumer_mapping, blocking, queue_collector_pid, stats_timer, + confirm_enabled, published_count, confirm_multiple, confirm_tref, + held_confirms, unconfirmed, queues_for_msg}). -define(MAX_PERMISSION_CACHE_SIZE, 12). @@ -70,6 +72,8 @@ -define(INFO_KEYS, ?CREATION_EVENT_KEYS ++ ?STATISTICS_KEYS -- [pid]). +-define(FLUSH_MULTIPLE_ACKS_INTERVAL, 1000). + %%---------------------------------------------------------------------------- -ifdef(use_specs). @@ -99,6 +103,8 @@ -spec(info_all/0 :: () -> [rabbit_types:infos()]). -spec(info_all/1 :: (rabbit_types:info_keys()) -> [rabbit_types:infos()]). -spec(emit_stats/1 :: (pid()) -> 'ok'). +-spec(flush_multiple_acks/1 :: (pid()) -> 'ok'). +-spec(confirm/2 ::(pid(), non_neg_integer()) -> 'ok'). -endif. @@ -153,6 +159,12 @@ emit_stats(Pid) -> flush(Pid) -> gen_server2:call(Pid, flush). +flush_multiple_acks(Pid) -> + gen_server2:cast(Pid, flush_multiple_acks). + +confirm(Pid, MsgSeqNo) -> + gen_server2:cast(Pid, {confirm, MsgSeqNo, self()}). + %%--------------------------------------------------------------------------- init([Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid, @@ -177,7 +189,13 @@ init([Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid, consumer_mapping = dict:new(), blocking = dict:new(), queue_collector_pid = CollectorPid, - stats_timer = StatsTimer}, + stats_timer = StatsTimer, + confirm_enabled = false, + published_count = 0, + confirm_multiple = false, + held_confirms = gb_sets:new(), + unconfirmed = gb_sets:new(), + queues_for_msg = dict:new()}, rabbit_event:notify(channel_created, infos(?CREATION_EVENT_KEYS, State)), rabbit_event:if_enabled(StatsTimer, fun() -> internal_emit_stats(State) end), @@ -261,21 +279,38 @@ handle_cast(emit_stats, State = #ch{stats_timer = StatsTimer}) -> internal_emit_stats(State), {noreply, State#ch{stats_timer = rabbit_event:reset_stats_timer(StatsTimer)}, - hibernate}. - -handle_info({'DOWN', _MRef, process, QPid, _Reason}, State) -> + hibernate}; + +handle_cast(flush_multiple_acks, State) -> + {noreply, flush_multiple(State)}; + +handle_cast({confirm, MsgSeqNo, From}, State) -> + {noreply, send_or_enqueue_ack(MsgSeqNo, From, State)}. + +handle_info({'DOWN', _MRef, process, QPid, _Reason}, + State = #ch{queues_for_msg = QFM}) -> + State1 = dict:fold( + fun(Msg, QPids, State0 = #ch{queues_for_msg = QFM0}) -> + Qs = sets:del_element(QPid, QPids), + case sets:size(Qs) of + 0 -> send_or_enqueue_ack(Msg, QPid, State0); + _ -> State0#ch{queues_for_msg = + dict:store(Msg, Qs, QFM0)} + end + end, State, QFM), erase_queue_stats(QPid), - {noreply, queue_blocked(QPid, State), hibernate}. + {noreply, queue_blocked(QPid, State1), hibernate}. -handle_pre_hibernate(State = #ch{stats_timer = StatsTimer}) -> +handle_pre_hibernate(State = #ch{stats_timer = StatsTimer}) -> ok = clear_permission_cache(), + State1 = flush_multiple(State), rabbit_event:if_enabled(StatsTimer, fun () -> internal_emit_stats( State, [{idle_since, now()}]) end), - {hibernate, - State#ch{stats_timer = rabbit_event:stop_stats_timer(StatsTimer)}}. + StatsTimer1 = rabbit_event:stop_stats_timer(StatsTimer), + {hibernate, State1#ch{stats_timer = StatsTimer1}}. terminate(_Reason, State = #ch{state = terminating}) -> terminate(State); @@ -420,6 +455,53 @@ queue_blocked(QPid, State = #ch{blocking = Blocking}) -> State#ch{blocking = Blocking1} end. +send_or_enqueue_ack(undefined, _QPid, State) -> + State; +send_or_enqueue_ack(_MsgSeqNo, _QPid, State = #ch{confirm_enabled = false}) -> + State; +send_or_enqueue_ack(MsgSeqNo, QPid, State = #ch{confirm_multiple = false}) -> + do_if_unconfirmed( + MsgSeqNo, QPid, + fun(MSN, State1 = #ch{writer_pid = WriterPid}) -> + ok = rabbit_writer:send_command( + WriterPid, #'basic.ack'{delivery_tag = MSN}), + State1 + end, State); +send_or_enqueue_ack(MsgSeqNo, QPid, State = #ch{confirm_multiple = true}) -> + do_if_unconfirmed( + MsgSeqNo, QPid, + fun(MSN, State1 = #ch{held_confirms = As}) -> + start_confirm_timer( + State1#ch{held_confirms = gb_sets:add(MSN, As)}) + end, State). + +do_if_unconfirmed(MsgSeqNo, QPid, ConfirmFun, + State = #ch{unconfirmed = UC, + queues_for_msg = QFM}) -> + %% clears references to MsgSeqNo and does ConfirmFun + case gb_sets:is_element(MsgSeqNo, UC) of + true -> + Unconfirmed1 = gb_sets:delete(MsgSeqNo, UC), + case QPid of + undefined -> + ConfirmFun(MsgSeqNo, + State#ch{unconfirmed = Unconfirmed1}); + _ -> + {ok, Qs} = dict:find(MsgSeqNo, QFM), + Qs1 = sets:del_element(QPid, Qs), + case sets:size(Qs1) of + 0 -> ConfirmFun(MsgSeqNo, + State#ch{ + queues_for_msg = + dict:erase(MsgSeqNo, QFM), + unconfirmed = Unconfirmed1}); + _ -> State#ch{queues_for_msg = + dict:store(MsgSeqNo, Qs1, QFM)} + end + end; + false -> State + end. + handle_method(#'channel.open'{}, _, State = #ch{state = starting}) -> {reply, #'channel.open_ok'{}, State#ch{state = running}}; @@ -442,9 +524,9 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, routing_key = RoutingKey, mandatory = Mandatory, immediate = Immediate}, - Content, State = #ch{virtual_host = VHostPath, - transaction_id = TxnKey, - writer_pid = WriterPid}) -> + Content, State = #ch{virtual_host = VHostPath, + transaction_id = TxnKey, + confirm_enabled = ConfirmEnabled}) -> ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin), check_write_permitted(ExchangeName, State), Exchange = rabbit_exchange:lookup_or_die(ExchangeName), @@ -452,6 +534,15 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, %% certain to want to look at delivery-mode and priority. DecodedContent = rabbit_binary_parser:ensure_content_decoded(Content), IsPersistent = is_message_persistent(DecodedContent), + {MsgSeqNo, State1} + = case ConfirmEnabled of + false -> {undefined, State}; + true -> Count = State#ch.published_count, + {Count, + State#ch{published_count = Count + 1, + unconfirmed = + gb_sets:add(Count, State#ch.unconfirmed)}} + end, Message = #basic_message{exchange_name = ExchangeName, routing_key = RoutingKey, content = DecodedContent, @@ -460,18 +551,16 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, {RoutingRes, DeliveredQPids} = rabbit_exchange:publish( Exchange, - rabbit_basic:delivery(Mandatory, Immediate, TxnKey, Message)), - case RoutingRes of - routed -> ok; - unroutable -> ok = basic_return(Message, WriterPid, no_route); - not_delivered -> ok = basic_return(Message, WriterPid, no_consumers) - end, + rabbit_basic:delivery(Mandatory, Immediate, TxnKey, Message, + MsgSeqNo)), + State2 = process_routing_result(RoutingRes, DeliveredQPids, + MsgSeqNo, Message, State1), maybe_incr_stats([{ExchangeName, 1} | [{{QPid, ExchangeName}, 1} || - QPid <- DeliveredQPids]], publish, State), + QPid <- DeliveredQPids]], publish, State2), {noreply, case TxnKey of - none -> State; - _ -> add_tx_participants(DeliveredQPids, State) + none -> State2; + _ -> add_tx_participants(DeliveredQPids, State2) end}; handle_method(#'basic.ack'{delivery_tag = DeliveryTag, @@ -860,6 +949,11 @@ handle_method(#'queue.purge'{queue = QueueNameBin, return_ok(State, NoWait, #'queue.purge_ok'{message_count = PurgedMessageCount}); + +handle_method(#'tx.select'{}, _, #ch{confirm_enabled = true}) -> + rabbit_misc:protocol_error( + precondition_failed, "cannot switch from confirm to tx mode", []); + handle_method(#'tx.select'{}, _, State = #ch{transaction_id = none}) -> {reply, #'tx.select_ok'{}, new_tx(State)}; @@ -880,6 +974,25 @@ handle_method(#'tx.rollback'{}, _, #ch{transaction_id = none}) -> handle_method(#'tx.rollback'{}, _, State) -> {reply, #'tx.rollback_ok'{}, internal_rollback(State)}; +handle_method(#'confirm.select'{}, _, #ch{transaction_id = TxId}) + when TxId =/= none -> + rabbit_misc:protocol_error( + precondition_failed, "cannot switch from tx to confirm mode", []); + +handle_method(#'confirm.select'{multiple = Multiple, nowait = NoWait}, + _, State = #ch{confirm_enabled = false}) -> + return_ok(State#ch{confirm_enabled = true, confirm_multiple = Multiple}, + NoWait, #'confirm.select_ok'{}); + +handle_method(#'confirm.select'{multiple = Multiple, nowait = NoWait}, + _, State = #ch{confirm_enabled = true, + confirm_multiple = Multiple}) -> + return_ok(State, NoWait, #'confirm.select_ok'{}); + +handle_method(#'confirm.select'{}, _, #ch{confirm_enabled = true}) -> + rabbit_misc:protocol_error( + precondition_failed, "cannot change confirm_multiple setting", []); + handle_method(#'channel.flow'{active = true}, _, State = #ch{limiter_pid = LimiterPid}) -> LimiterPid1 = case rabbit_limiter:unblock(LimiterPid) of @@ -1107,6 +1220,22 @@ is_message_persistent(Content) -> IsPersistent end. +process_routing_result(unroutable, _, MsgSeqNo, Message, State) -> + ok = basic_return(Message, State#ch.writer_pid, no_route), + send_or_enqueue_ack(MsgSeqNo, undefined, State); +process_routing_result(not_delivered, _, MsgSeqNo, Message, State) -> + ok = basic_return(Message, State#ch.writer_pid, no_consumers), + send_or_enqueue_ack(MsgSeqNo, undefined, State); +process_routing_result(routed, [], MsgSeqNo, _, State) -> + send_or_enqueue_ack(MsgSeqNo, undefined, State); +process_routing_result(routed, _, undefined, _, State) -> + State; +process_routing_result(routed, QPids, MsgSeqNo, _, + State = #ch{queues_for_msg = QFM}) -> + QFM1 = dict:store(MsgSeqNo, sets:from_list(QPids), QFM), + [maybe_monitor(QPid) || QPid <- QPids], + State#ch{queues_for_msg = QFM1}. + lock_message(true, MsgStruct, State = #ch{unacked_message_q = UAMQ}) -> State#ch{unacked_message_q = queue:in(MsgStruct, UAMQ)}; lock_message(false, _MsgStruct, State) -> @@ -1128,7 +1257,8 @@ internal_deliver(WriterPid, Notify, ConsumerTag, DeliveryTag, false -> rabbit_writer:send_command(WriterPid, M, Content) end. -terminate(_State) -> +terminate(State) -> + stop_confirm_timer(State), pg_local:leave(rabbit_channels, self()), rabbit_event:notify(channel_closed, [{pid, self()}]). @@ -1214,3 +1344,50 @@ erase_queue_stats(QPid) -> erase({queue_stats, QPid}), [erase({queue_exchange_stats, QX}) || {{queue_exchange_stats, QX = {QPid0, _}}, _} <- get(), QPid =:= QPid0]. + +start_confirm_timer(State = #ch{confirm_tref = undefined}) -> + {ok, TRef} = timer:apply_after(?FLUSH_MULTIPLE_ACKS_INTERVAL, + ?MODULE, flush_multiple_acks, [self()]), + State#ch{confirm_tref = TRef}; +start_confirm_timer(State) -> + State. + +stop_confirm_timer(State = #ch{confirm_tref = undefined}) -> + State; +stop_confirm_timer(State = #ch{confirm_tref = TRef}) -> + {ok, cancel} = timer:cancel(TRef), + State#ch{confirm_tref = undefined}. + +flush_multiple(State = #ch{writer_pid = WriterPid, + held_confirms = Cs, + unconfirmed = UC}) -> + case gb_sets:is_empty(Cs) of + true -> State; + false -> [First | Rest] = gb_sets:to_list(Cs), + [rabbit_writer:send_command(WriterPid, + #'basic.ack'{delivery_tag = T}) || + T <- case Rest of + [] -> [First]; + _ -> flush_multiple( + First, Rest, WriterPid, + case gb_sets:is_empty(UC) of + false -> gb_sets:smallest(UC); + true -> gb_sets:largest(Cs) + 1 + end) + end], + State#ch{held_confirms = gb_sets:new(), + confirm_tref = undefined} + end. + +flush_multiple(Prev, [Cur | Rest], WriterPid, SNA) -> + ExpNext = Prev + 1, + case {SNA >= Cur, Cur} of + {true, ExpNext} -> flush_multiple(Cur, Rest, WriterPid, SNA); + _ -> flush_multiple(Prev, [], WriterPid, SNA), + [Cur | Rest] + end; +flush_multiple(Prev, [], WriterPid, _) -> + ok = rabbit_writer:send_command(WriterPid, + #'basic.ack'{delivery_tag = Prev, + multiple = true}), + []. diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index fd84109bf9..fea7d4a86b 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -34,7 +34,7 @@ -behaviour(gen_server2). -export([start_link/4, successfully_recovered_state/1, - client_init/2, client_terminate/1, client_delete_and_terminate/1, + client_init/3, client_terminate/1, client_delete_and_terminate/1, client_ref/1, write/3, read/2, contains/2, remove/2, release/2, sync/3]). @@ -83,7 +83,9 @@ cur_file_cache_ets, %% tid of current file cache table client_refs, %% set of references of all registered clients successfully_recovered, %% boolean: did we recover state? - file_size_limit %% how big are our files allowed to get? + file_size_limit, %% how big are our files allowed to get? + client_ondisk_callback, %% client ref to callback function mapping + cref_to_guids %% client ref to synced messages mapping }). -record(client_msstate, @@ -138,16 +140,18 @@ file_handles_ets :: ets:tid(), file_summary_ets :: ets:tid(), dedup_cache_ets :: ets:tid(), - cur_file_cache_ets :: ets:tid() }). + cur_file_cache_ets :: ets:tid()}). -type(startup_fun_state() :: {(fun ((A) -> 'finished' | {rabbit_guid:guid(), non_neg_integer(), A})), A}). +-type(guid_fun() :: fun ((gb_set()) -> any())). -spec(start_link/4 :: (atom(), file:filename(), [binary()] | 'undefined', startup_fun_state()) -> rabbit_types:ok_pid_or_error()). -spec(successfully_recovered_state/1 :: (server()) -> boolean()). --spec(client_init/2 :: (server(), client_ref()) -> client_msstate()). +-spec(client_init/3 :: (server(), client_ref(), guid_fun()) -> + client_msstate()). -spec(client_terminate/1 :: (client_msstate()) -> 'ok'). -spec(client_delete_and_terminate/1 :: (client_msstate()) -> 'ok'). -spec(client_ref/1 :: (client_msstate()) -> client_ref()). @@ -334,10 +338,11 @@ start_link(Server, Dir, ClientRefs, StartupFunState) -> successfully_recovered_state(Server) -> gen_server2:call(Server, successfully_recovered_state, infinity). -client_init(Server, Ref) -> +client_init(Server, Ref, MsgOnDiskFun) -> {IState, IModule, Dir, GCPid, FileHandlesEts, FileSummaryEts, DedupCacheEts, CurFileCacheEts} = - gen_server2:call(Server, {new_client_state, Ref}, infinity), + gen_server2:call(Server, {new_client_state, Ref, MsgOnDiskFun}, + infinity), #client_msstate { server = Server, client_ref = Ref, file_handle_cache = dict:new(), @@ -350,9 +355,9 @@ client_init(Server, Ref) -> dedup_cache_ets = DedupCacheEts, cur_file_cache_ets = CurFileCacheEts }. -client_terminate(CState) -> +client_terminate(CState = #client_msstate { client_ref = Ref }) -> close_all_handles(CState), - ok = server_call(CState, client_terminate). + ok = server_call(CState, {client_terminate, Ref}). client_delete_and_terminate(CState = #client_msstate { client_ref = Ref }) -> close_all_handles(CState), @@ -361,9 +366,10 @@ client_delete_and_terminate(CState = #client_msstate { client_ref = Ref }) -> client_ref(#client_msstate { client_ref = Ref }) -> Ref. write(Guid, Msg, - CState = #client_msstate { cur_file_cache_ets = CurFileCacheEts }) -> + CState = #client_msstate { cur_file_cache_ets = CurFileCacheEts, + client_ref = CRef }) -> ok = update_msg_cache(CurFileCacheEts, Guid, Msg), - ok = server_cast(CState, {write, Guid}). + ok = server_cast(CState, {write, CRef, Guid}). read(Guid, CState = #client_msstate { dedup_cache_ets = DedupCacheEts, @@ -392,7 +398,8 @@ read(Guid, contains(Guid, CState) -> server_call(CState, {contains, Guid}). remove([], _CState) -> ok; -remove(Guids, CState) -> server_cast(CState, {remove, Guids}). +remove(Guids, CState = #client_msstate { client_ref = CRef }) -> + server_cast(CState, {remove, CRef, Guids}). release([], _CState) -> ok; release(Guids, CState) -> server_cast(CState, {release, Guids}). sync(Guids, K, CState) -> server_cast(CState, {sync, Guids, K}). @@ -519,6 +526,13 @@ client_read3(#msg_location { guid = Guid, file = File }, Defer, end end. +clear_client_callback(CRef, + State = #msstate { client_ondisk_callback = CODC, + cref_to_guids = CTG }) -> + State #msstate { client_ondisk_callback = dict:erase(CRef, CODC), + cref_to_guids = dict:erase(CRef, CTG)}. + + %%---------------------------------------------------------------------------- %% gen_server callbacks %%---------------------------------------------------------------------------- @@ -586,7 +600,9 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) -> cur_file_cache_ets = CurFileCacheEts, client_refs = ClientRefs1, successfully_recovered = CleanShutdown, - file_size_limit = FileSizeLimit + file_size_limit = FileSizeLimit, + client_ondisk_callback = dict:new(), + cref_to_guids = dict:new() }, %% If we didn't recover the msg location index then we need to @@ -615,10 +631,10 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) -> prioritise_call(Msg, _From, _State) -> case Msg of - successfully_recovered_state -> 7; - {new_client_state, _Ref} -> 7; - {read, _Guid} -> 2; - _ -> 0 + successfully_recovered_state -> 7; + {new_client_state, _Ref, _MODC} -> 7; + {read, _Guid} -> 2; + _ -> 0 end. prioritise_cast(Msg, _State) -> @@ -633,22 +649,29 @@ prioritise_cast(Msg, _State) -> handle_call(successfully_recovered_state, _From, State) -> reply(State #msstate.successfully_recovered, State); -handle_call({new_client_state, CRef}, _From, - State = #msstate { dir = Dir, - index_state = IndexState, - index_module = IndexModule, - file_handles_ets = FileHandlesEts, - file_summary_ets = FileSummaryEts, - dedup_cache_ets = DedupCacheEts, - cur_file_cache_ets = CurFileCacheEts, - client_refs = ClientRefs, - gc_pid = GCPid }) -> +handle_call({new_client_state, CRef, Callback}, _From, + State = #msstate { dir = Dir, + index_state = IndexState, + index_module = IndexModule, + file_handles_ets = FileHandlesEts, + file_summary_ets = FileSummaryEts, + dedup_cache_ets = DedupCacheEts, + cur_file_cache_ets = CurFileCacheEts, + client_refs = ClientRefs, + client_ondisk_callback = CODC, + gc_pid = GCPid }) -> + CODC1 = case Callback of + undefined -> CODC; + _ -> dict:store(CRef, Callback, CODC) + end, reply({IndexState, IndexModule, Dir, GCPid, FileHandlesEts, FileSummaryEts, DedupCacheEts, CurFileCacheEts}, - State #msstate { client_refs = sets:add_element(CRef, ClientRefs) }); + State #msstate { client_refs = sets:add_element(CRef, ClientRefs), + client_ondisk_callback = CODC1 }); -handle_call(client_terminate, _From, State) -> - reply(ok, State); +handle_call({client_terminate, CRef}, _From, + State) -> + reply(ok, clear_client_callback(CRef, State)); handle_call({read, Guid}, From, State) -> State1 = read_message(Guid, From, State), @@ -660,43 +683,63 @@ handle_call({contains, Guid}, From, State) -> handle_cast({client_delete, CRef}, State = #msstate { client_refs = ClientRefs }) -> - noreply( - State #msstate { client_refs = sets:del_element(CRef, ClientRefs) }); + State1 = clear_client_callback(CRef, State), + noreply(State1 #msstate { + client_refs = sets:del_element(CRef, ClientRefs) }); + +handle_cast({write, CRef, Guid}, + State = #msstate { sum_valid_data = SumValid, + file_summary_ets = FileSummaryEts, + current_file = CurFile, + cur_file_cache_ets = CurFileCacheEts, + client_ondisk_callback = CODC, + cref_to_guids = CTG }) -> -handle_cast({write, Guid}, - State = #msstate { sum_valid_data = SumValid, - file_summary_ets = FileSummaryEts, - cur_file_cache_ets = CurFileCacheEts }) -> true = 0 =< ets:update_counter(CurFileCacheEts, Guid, {3, -1}), [{Guid, Msg, _CacheRefCount}] = ets:lookup(CurFileCacheEts, Guid), - case index_lookup(Guid, State) of + CTG1 = case dict:find(CRef, CODC) of + {ok, _} -> dict:update(CRef, fun(Guids) -> + gb_sets:add(Guid, Guids) + end, + gb_sets:empty(), CTG); + error -> CTG + end, + State1 = State #msstate { cref_to_guids = CTG1 }, + case index_lookup(Guid, State1) of not_found -> - write_message(Guid, Msg, State); + write_message(Guid, Msg, State1); #msg_location { ref_count = 0, file = File, total_size = TotalSize } -> case ets:lookup(FileSummaryEts, File) of [#file_summary { locked = true }] -> - ok = index_delete(Guid, State), - write_message(Guid, Msg, State); + ok = index_delete(Guid, State1), + write_message(Guid, Msg, State1); [#file_summary {}] -> - ok = index_update_ref_count(Guid, 1, State), + ok = index_update_ref_count(Guid, 1, State1), [_] = ets:update_counter( FileSummaryEts, File, [{#file_summary.valid_total_size, TotalSize}]), - noreply(State #msstate { + noreply(State1 #msstate { sum_valid_data = SumValid + TotalSize }) end; - #msg_location { ref_count = RefCount } -> + #msg_location { ref_count = RefCount, file = File } -> %% We already know about it, just update counter. Only %% update field otherwise bad interaction with concurrent GC - ok = index_update_ref_count(Guid, RefCount + 1, State), - noreply(State) + ok = index_update_ref_count(Guid, RefCount + 1, State1), + CTG2 = case {dict:find(CRef, CODC), File} of + {{ok, _}, CurFile} -> CTG1; + {{ok, Fun}, _} -> Fun(gb_sets:singleton(Guid)), + CTG; + _ -> CTG1 + end, + noreply(State #msstate { cref_to_guids = CTG2 }) end; -handle_cast({remove, Guids}, State) -> +handle_cast({remove, CRef, Guids}, State) -> State1 = lists:foldl( fun (Guid, State2) -> remove_message(Guid, State2) end, State, Guids), - noreply(maybe_compact(State1)); + State2 = client_confirm(CRef, gb_sets:from_list(Guids), State1), + noreply(maybe_compact(State2)); handle_cast({release, Guids}, State = #msstate { dedup_cache_ets = DedupCacheEts }) -> @@ -794,14 +837,19 @@ reply(Reply, State) -> {State1, Timeout} = next_state(State), {reply, Reply, State1, Timeout}. -next_state(State = #msstate { on_sync = [], sync_timer_ref = undefined }) -> - {State, hibernate}; -next_state(State = #msstate { sync_timer_ref = undefined }) -> - {start_sync_timer(State), 0}; -next_state(State = #msstate { on_sync = [] }) -> - {stop_sync_timer(State), hibernate}; -next_state(State) -> - {State, 0}. +next_state(State = #msstate { sync_timer_ref = undefined, + on_sync = Syncs, + cref_to_guids = CTG }) -> + case {Syncs, dict:size(CTG)} of + {[], 0} -> {State, hibernate}; + _ -> {start_sync_timer(State), 0} + end; +next_state(State = #msstate { on_sync = Syncs, + cref_to_guids = CTG }) -> + case {Syncs, dict:size(CTG)} of + {[], 0} -> {stop_sync_timer(State), hibernate}; + _ -> {State, 0} + end. start_sync_timer(State = #msstate { sync_timer_ref = undefined }) -> {ok, TRef} = timer:apply_after(?SYNC_INTERVAL, ?MODULE, sync, [self()]), @@ -813,15 +861,23 @@ stop_sync_timer(State = #msstate { sync_timer_ref = TRef }) -> {ok, cancel} = timer:cancel(TRef), State #msstate { sync_timer_ref = undefined }. -internal_sync(State = #msstate { current_file_handle = CurHdl, - on_sync = Syncs }) -> +internal_sync(State = #msstate { current_file_handle = CurHdl, + on_sync = Syncs, + cref_to_guids = CTG }) -> State1 = stop_sync_timer(State), - case Syncs of - [] -> State1; - _ -> ok = file_handle_cache:sync(CurHdl), - lists:foreach(fun (K) -> K() end, lists:reverse(Syncs)), - State1 #msstate { on_sync = [] } - end. + CGs = dict:fold(fun (CRef, Guids, NS) -> + case gb_sets:is_empty(Guids) of + true -> NS; + false -> [{CRef, Guids} | NS] + end + end, [], CTG), + if Syncs =:= [] andalso CGs =:= [] -> ok; + true -> file_handle_cache:sync(CurHdl) + end, + lists:foreach(fun (K) -> K() end, lists:reverse(Syncs)), + [client_confirm(CRef, Guids, State1) || {CRef, Guids} <- CGs], + State1 #msstate { cref_to_guids = dict:new(), on_sync = [] }. + write_message(Guid, Msg, State = #msstate { current_file_handle = CurHdl, @@ -999,6 +1055,25 @@ orddict_store(Key, Val, Dict) -> false = orddict:is_key(Key, Dict), orddict:store(Key, Val, Dict). +client_confirm(CRef, Guids, + State = #msstate { client_ondisk_callback = CODC, + cref_to_guids = CTG }) -> + case dict:find(CRef, CODC) of + {ok, Fun} -> Fun(Guids), + CTG1 = case dict:find(CRef, CTG) of + {ok, Gs} -> + Guids1 = gb_sets:difference(Gs, Guids), + case gb_sets:is_empty(Guids1) of + true -> dict:erase(CRef, CTG); + false -> dict:store(CRef, Guids1, CTG) + end; + error -> CTG + end, + State #msstate { cref_to_guids = CTG1 }; + error -> State + end. + + %%---------------------------------------------------------------------------- %% file helper functions %%---------------------------------------------------------------------------- diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index 43dbf9d4cb..b646f330cc 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -31,7 +31,7 @@ -module(rabbit_queue_index). --export([init/1, shutdown_terms/1, recover/4, +-export([init/2, shutdown_terms/1, recover/5, terminate/2, delete_and_terminate/1, publish/5, deliver/2, ack/2, sync/2, flush/1, read/3, next_segment_boundary/1, bounds/1, recover/1]). @@ -174,7 +174,7 @@ %%---------------------------------------------------------------------------- -record(qistate, { dir, segments, journal_handle, dirty_count, - max_journal_entries }). + max_journal_entries, on_sync, unsynced_guids }). -record(segment, { num, path, journal_entries, unacked }). @@ -195,21 +195,24 @@ })). -type(seq_id() :: integer()). -type(seg_dict() :: {dict(), [segment()]}). +-type(on_sync_fun() :: fun ((gb_set()) -> ok)). -type(qistate() :: #qistate { dir :: file:filename(), segments :: 'undefined' | seg_dict(), journal_handle :: hdl(), dirty_count :: integer(), - max_journal_entries :: non_neg_integer() + max_journal_entries :: non_neg_integer(), + on_sync :: on_sync_fun(), + unsynced_guids :: [rabbit_guid:guid()] }). -type(startup_fun_state() :: - {(fun ((A) -> 'finished' | {rabbit_guid:guid(), non_neg_integer(), A})), + {fun ((A) -> 'finished' | {rabbit_guid:guid(), non_neg_integer(), A}), A}). -type(shutdown_terms() :: [any()]). --spec(init/1 :: (rabbit_amqqueue:name()) -> qistate()). +-spec(init/2 :: (rabbit_amqqueue:name(), on_sync_fun()) -> qistate()). -spec(shutdown_terms/1 :: (rabbit_amqqueue:name()) -> shutdown_terms()). --spec(recover/4 :: (rabbit_amqqueue:name(), shutdown_terms(), boolean(), - fun ((rabbit_guid:guid()) -> boolean())) -> +-spec(recover/5 :: (rabbit_amqqueue:name(), shutdown_terms(), boolean(), + fun ((rabbit_guid:guid()) -> boolean()), on_sync_fun()) -> {'undefined' | non_neg_integer(), qistate()}). -spec(terminate/2 :: ([any()], qistate()) -> qistate()). -spec(delete_and_terminate/1 :: (qistate()) -> qistate()). @@ -227,8 +230,8 @@ -spec(next_segment_boundary/1 :: (seq_id()) -> seq_id()). -spec(bounds/1 :: (qistate()) -> {non_neg_integer(), non_neg_integer(), qistate()}). --spec(recover/1 :: - ([rabbit_amqqueue:name()]) -> {[[any()]], startup_fun_state()}). +-spec(recover/1 :: ([rabbit_amqqueue:name()]) -> + {[[any()]], startup_fun_state()}). -spec(add_queue_ttl/0 :: () -> 'ok'). @@ -239,10 +242,10 @@ %% public API %%---------------------------------------------------------------------------- -init(Name) -> +init(Name, OnSyncFun) -> State = #qistate { dir = Dir } = blank_state(Name), false = filelib:is_file(Dir), %% is_file == is file or dir - State. + State #qistate { on_sync = OnSyncFun }. shutdown_terms(Name) -> #qistate { dir = Dir } = blank_state(Name), @@ -251,13 +254,14 @@ shutdown_terms(Name) -> {ok, Terms1} -> Terms1 end. -recover(Name, Terms, MsgStoreRecovered, ContainsCheckFun) -> +recover(Name, Terms, MsgStoreRecovered, ContainsCheckFun, OnSyncFun) -> State = #qistate { dir = Dir } = blank_state(Name), + State1 = State #qistate { on_sync = OnSyncFun }, CleanShutdown = detect_clean_shutdown(Dir), case CleanShutdown andalso MsgStoreRecovered of true -> RecoveredCounts = proplists:get_value(segments, Terms, []), - init_clean(RecoveredCounts, State); - false -> init_dirty(CleanShutdown, ContainsCheckFun, State) + init_clean(RecoveredCounts, State1); + false -> init_dirty(CleanShutdown, ContainsCheckFun, State1) end. terminate(Terms, State) -> @@ -270,9 +274,13 @@ delete_and_terminate(State) -> ok = rabbit_misc:recursive_delete([Dir]), State1. -publish(Guid, SeqId, MsgProps, IsPersistent, State) when is_binary(Guid) -> +publish(Guid, SeqId, MsgProps, IsPersistent, + State = #qistate { unsynced_guids = UnsyncedGuids }) + when is_binary(Guid) -> ?GUID_BYTES = size(Guid), - {JournalHdl, State1} = get_journal_handle(State), + {JournalHdl, State1} = get_journal_handle( + State #qistate { + unsynced_guids = [Guid | UnsyncedGuids] }), ok = file_handle_cache:append( JournalHdl, [<<(case IsPersistent of true -> ?PUB_PERSIST_JPREFIX; @@ -303,7 +311,7 @@ sync(_SeqIds, State = #qistate { journal_handle = JournalHdl }) -> %% seqids not being in the journal, provided the transaction isn't %% emptied (handled above anyway). ok = file_handle_cache:sync(JournalHdl), - State. + notify_sync(State). flush(State = #qistate { dirty_count = 0 }) -> State; flush(State) -> flush_journal(State). @@ -393,7 +401,9 @@ blank_state(QueueName) -> segments = segments_new(), journal_handle = undefined, dirty_count = 0, - max_journal_entries = MaxJournal }. + max_journal_entries = MaxJournal, + on_sync = fun (_) -> ok end, + unsynced_guids = [] }. clean_file_name(Dir) -> filename:join(Dir, ?CLEAN_FILENAME). @@ -625,7 +635,7 @@ flush_journal(State = #qistate { segments = Segments }) -> {JournalHdl, State1} = get_journal_handle(State #qistate { segments = Segments1 }), ok = file_handle_cache:clear(JournalHdl), - State1 #qistate { dirty_count = 0 }. + notify_sync(State1 #qistate { dirty_count = 0 }). append_journal_to_segment(#segment { journal_entries = JEntries, path = Path } = Segment) -> @@ -713,6 +723,10 @@ deliver_or_ack(Kind, SeqIds, State) -> add_to_journal(SeqId, Kind, StateN) end, State1, SeqIds)). +notify_sync(State = #qistate { unsynced_guids = UG, on_sync = OnSyncFun }) -> + OnSyncFun(gb_sets:from_list(UG)), + State #qistate { unsynced_guids = [] }. + %%---------------------------------------------------------------------------- %% segment manipulation %%---------------------------------------------------------------------------- diff --git a/src/rabbit_router.erl b/src/rabbit_router.erl index 00df1ce1f7..e36feb76af 100644 --- a/src/rabbit_router.erl +++ b/src/rabbit_router.erl @@ -72,7 +72,8 @@ deliver(QNames, Delivery = #delivery{mandatory = false, QPids, fun (Pid) -> rabbit_amqqueue:deliver(Pid, Delivery) end), {routed, QPids}; -deliver(QNames, Delivery) -> +deliver(QNames, Delivery = #delivery{mandatory = Mandatory, + immediate = Immediate}) -> QPids = lookup_qpids(QNames), {Success, _} = delegate:invoke(QPids, @@ -80,9 +81,11 @@ deliver(QNames, Delivery) -> rabbit_amqqueue:deliver(Pid, Delivery) end), {Routed, Handled} = - lists:foldl(fun fold_deliveries/2, {false, []}, Success), - check_delivery(Delivery#delivery.mandatory, Delivery#delivery.immediate, - {Routed, Handled}). + lists:foldl(fun fold_deliveries/2, {false, []}, Success), + case check_delivery(Mandatory, Immediate, {Routed, Handled}) of + {routed, Qs} -> {routed, Qs}; + O -> O + end. %% TODO: Maybe this should be handled by a cursor instead. %% TODO: This causes a full scan for each entry with the same source diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 27e4d925af..8b58b822a4 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -1470,12 +1470,12 @@ msg_store_remove(MsgStore, Ref, Guids) -> with_msg_store_client(MsgStore, Ref, Fun) -> rabbit_msg_store:client_terminate( - Fun(rabbit_msg_store:client_init(MsgStore, Ref))). + Fun(rabbit_msg_store:client_init(MsgStore, Ref, undefined))). foreach_with_msg_store_client(MsgStore, Ref, Fun, L) -> rabbit_msg_store:client_terminate( lists:foldl(fun (Guid, MSCState) -> Fun(Guid, MSCState) end, - rabbit_msg_store:client_init(MsgStore, Ref), L)). + rabbit_msg_store:client_init(MsgStore, Ref, undefined), L)). test_msg_store() -> restart_msg_store_empty(), @@ -1483,7 +1483,8 @@ test_msg_store() -> Guids = [guid_bin(M) || M <- lists:seq(1,100)], {Guids1stHalf, Guids2ndHalf} = lists:split(50, Guids), Ref = rabbit_guid:guid(), - MSCState = rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE, Ref), + MSCState = rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE, Ref, + undefined), %% check we don't contain any of the msgs we're about to publish false = msg_store_contains(false, Guids, MSCState), %% publish the first half @@ -1549,7 +1550,8 @@ test_msg_store() -> ([Guid|GuidsTail]) -> {Guid, 0, GuidsTail} end, Guids2ndHalf}), - MSCState5 = rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE, Ref), + MSCState5 = rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE, Ref, + undefined), %% check we have the right msgs left lists:foldl( fun (Guid, Bool) -> @@ -1558,7 +1560,8 @@ test_msg_store() -> ok = rabbit_msg_store:client_terminate(MSCState5), %% restart empty restart_msg_store_empty(), - MSCState6 = rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE, Ref), + MSCState6 = rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE, Ref, + undefined), %% check we don't contain any of the msgs false = msg_store_contains(false, Guids, MSCState6), %% publish the first half again @@ -1566,7 +1569,8 @@ test_msg_store() -> %% this should force some sort of sync internally otherwise misread ok = rabbit_msg_store:client_terminate( msg_store_read(Guids1stHalf, MSCState6)), - MSCState7 = rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE, Ref), + MSCState7 = rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE, Ref, + undefined), ok = rabbit_msg_store:remove(Guids1stHalf, MSCState7), ok = rabbit_msg_store:client_terminate(MSCState7), %% restart empty @@ -1625,12 +1629,13 @@ init_test_queue() -> Terms = rabbit_queue_index:shutdown_terms(TestQueue), PRef = proplists:get_value(persistent_ref, Terms, rabbit_guid:guid()), PersistentClient = rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE, - PRef), + PRef, undefined), Res = rabbit_queue_index:recover( TestQueue, Terms, false, fun (Guid) -> rabbit_msg_store:contains(Guid, PersistentClient) - end), + end, + fun nop/1), ok = rabbit_msg_store:client_delete_and_terminate(PersistentClient), Res. @@ -1658,7 +1663,7 @@ queue_index_publish(SeqIds, Persistent, Qi) -> true -> ?PERSISTENT_MSG_STORE; false -> ?TRANSIENT_MSG_STORE end, - MSCState = rabbit_msg_store:client_init(MsgStore, Ref), + MSCState = rabbit_msg_store:client_init(MsgStore, Ref, undefined), {A, B} = lists:foldl( fun (SeqId, {QiN, SeqIdsGuidsAcc}) -> @@ -1850,7 +1855,8 @@ assert_props(List, PropVals) -> with_fresh_variable_queue(Fun) -> ok = empty_test_queue(), - VQ = rabbit_variable_queue:init(test_queue(), true, false), + VQ = rabbit_variable_queue:init(test_queue(), true, false, + fun nop/1, fun nop/1), S0 = rabbit_variable_queue:status(VQ), assert_props(S0, [{q1, 0}, {q2, 0}, {delta, {delta, undefined, 0, undefined}}, @@ -1935,7 +1941,6 @@ test_variable_queue_dynamic_duration_change(VQ0) -> %% start by sending in a couple of segments worth Len = 2*SegmentSize, VQ1 = variable_queue_publish(false, Len, VQ0), - %% squeeze and relax queue Churn = Len div 32, VQ2 = publish_fetch_and_ack(Churn, Len, VQ1), @@ -1953,7 +1958,7 @@ test_variable_queue_dynamic_duration_change(VQ0) -> %% drain {VQ8, AckTags} = variable_queue_fetch(Len, false, false, Len, VQ7), - VQ9 = rabbit_variable_queue:ack(AckTags, VQ8), + {_, VQ9} = rabbit_variable_queue:ack(AckTags, VQ8), {empty, VQ10} = rabbit_variable_queue:fetch(true, VQ9), VQ10. @@ -1963,7 +1968,8 @@ publish_fetch_and_ack(0, _Len, VQ0) -> publish_fetch_and_ack(N, Len, VQ0) -> VQ1 = variable_queue_publish(false, 1, VQ0), {{_Msg, false, AckTag, Len}, VQ2} = rabbit_variable_queue:fetch(true, VQ1), - publish_fetch_and_ack(N-1, Len, rabbit_variable_queue:ack([AckTag], VQ2)). + {_, VQ3} = rabbit_variable_queue:ack([AckTag], VQ2), + publish_fetch_and_ack(N-1, Len, VQ3). test_variable_queue_partial_segments_delta_thing(VQ0) -> SegmentSize = rabbit_queue_index:next_segment_boundary(0), @@ -1996,7 +2002,7 @@ test_variable_queue_partial_segments_delta_thing(VQ0) -> {len, HalfSegment + 1}]), {VQ8, AckTags1} = variable_queue_fetch(HalfSegment + 1, true, false, HalfSegment + 1, VQ7), - VQ9 = rabbit_variable_queue:ack(AckTags ++ AckTags1, VQ8), + {_, VQ9} = rabbit_variable_queue:ack(AckTags ++ AckTags1, VQ8), %% should be empty now {empty, VQ10} = rabbit_variable_queue:fetch(true, VQ9), VQ10. @@ -2025,7 +2031,8 @@ test_variable_queue_all_the_bits_not_covered_elsewhere1(VQ0) -> {VQ5, _AckTags1} = variable_queue_fetch(Count, false, false, Count, VQ4), _VQ6 = rabbit_variable_queue:terminate(VQ5), - VQ7 = rabbit_variable_queue:init(test_queue(), true, true), + VQ7 = rabbit_variable_queue:init(test_queue(), true, true, + fun nop/1, fun nop/1), {{_Msg1, true, _AckTag1, Count1}, VQ8} = rabbit_variable_queue:fetch(true, VQ7), VQ9 = variable_queue_publish(false, 1, VQ8), @@ -2041,7 +2048,8 @@ test_variable_queue_all_the_bits_not_covered_elsewhere2(VQ0) -> VQ4 = rabbit_variable_queue:requeue(AckTags, fun(X) -> X end, VQ3), VQ5 = rabbit_variable_queue:idle_timeout(VQ4), _VQ6 = rabbit_variable_queue:terminate(VQ5), - VQ7 = rabbit_variable_queue:init(test_queue(), true, true), + VQ7 = rabbit_variable_queue:init(test_queue(), true, true, + fun nop/1, fun nop/1), {empty, VQ8} = rabbit_variable_queue:fetch(false, VQ7), VQ8. @@ -2071,7 +2079,8 @@ test_queue_recover() -> {ok, CountMinusOne, {QName, QPid1, _AckTag, true, _Msg}} = rabbit_amqqueue:basic_get(Q1, self(), false), exit(QPid1, shutdown), - VQ1 = rabbit_variable_queue:init(QName, true, true), + VQ1 = rabbit_variable_queue:init(QName, true, true, + fun nop/1, fun nop/1), {{_Msg1, true, _AckTag1, CountMinusOne}, VQ2} = rabbit_variable_queue:fetch(true, VQ1), _VQ3 = rabbit_variable_queue:delete_and_terminate(VQ2), @@ -2131,3 +2140,5 @@ test_configurable_server_properties() -> application:set_env(rabbit, server_properties, ServerProperties), passed. + +nop(_) -> ok. diff --git a/src/rabbit_types.erl b/src/rabbit_types.erl index b9993823d1..548014be3e 100644 --- a/src/rabbit_types.erl +++ b/src/rabbit_types.erl @@ -90,7 +90,8 @@ sender :: pid(), message :: message()}). -type(message_properties() :: - #message_properties{expiry :: pos_integer() | 'undefined'}). + #message_properties{expiry :: pos_integer() | 'undefined', + needs_confirming :: boolean()}). %% this is really an abstract type, but dialyzer does not support them -type(txn() :: rabbit_guid:guid()). diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 5ac042a252..27e482d5e2 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -31,7 +31,7 @@ -module(rabbit_variable_queue). --export([init/3, terminate/1, delete_and_terminate/1, +-export([init/5, init/3, terminate/1, delete_and_terminate/1, purge/1, publish/3, publish_delivered/4, fetch/2, ack/2, tx_publish/4, tx_ack/3, tx_rollback/2, tx_commit/4, requeue/3, len/1, is_empty/1, dropwhile/2, @@ -263,11 +263,14 @@ ram_index_count, out_counter, in_counter, + rates, + msgs_on_disk, + msg_indices_on_disk, + unconfirmed, ack_out_counter, ack_in_counter, - rates, ack_rates - }). + }). -record(rates, { egress, ingress, avg_egress, avg_ingress, timestamp }). @@ -354,9 +357,12 @@ ram_index_count :: non_neg_integer(), out_counter :: non_neg_integer(), in_counter :: non_neg_integer(), + rates :: rates(), + msgs_on_disk :: gb_set(), + msg_indices_on_disk :: gb_set(), + unconfirmed :: gb_set(), ack_out_counter :: non_neg_integer(), ack_in_counter :: non_neg_integer(), - rates :: rates(), ack_rates :: rates() }). -include("rabbit_backing_queue_spec.hrl"). @@ -403,16 +409,23 @@ stop_msg_store() -> ok = rabbit_sup:stop_child(?PERSISTENT_MSG_STORE), ok = rabbit_sup:stop_child(?TRANSIENT_MSG_STORE). -init(QueueName, IsDurable, false) -> - IndexState = rabbit_queue_index:init(QueueName), +init(QueueName, IsDurable, Recover) -> + Self = self(), + init(QueueName, IsDurable, Recover, + fun (Guids) -> msgs_written_to_disk(Self, Guids) end, + fun (Guids) -> msg_indices_written_to_disk(Self, Guids) end). + +init(QueueName, IsDurable, false, MsgOnDiskFun, MsgIdxOnDiskFun) -> + IndexState = rabbit_queue_index:init(QueueName, MsgIdxOnDiskFun), init(IsDurable, IndexState, 0, [], case IsDurable of - true -> msg_store_client_init(?PERSISTENT_MSG_STORE); + true -> msg_store_client_init(?PERSISTENT_MSG_STORE, + MsgOnDiskFun); false -> undefined end, - msg_store_client_init(?TRANSIENT_MSG_STORE)); + msg_store_client_init(?TRANSIENT_MSG_STORE, undefined)); -init(QueueName, true, true) -> +init(QueueName, true, true, MsgOnDiskFun, MsgIdxOnDiskFun) -> Terms = rabbit_queue_index:shutdown_terms(QueueName), {PRef, TRef, Terms1} = case [persistent_ref, transient_ref] -- proplists:get_keys(Terms) of @@ -422,16 +435,17 @@ init(QueueName, true, true) -> _ -> {rabbit_guid:guid(), rabbit_guid:guid(), []} end, PersistentClient = rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE, - PRef), + PRef, MsgOnDiskFun), TransientClient = rabbit_msg_store:client_init(?TRANSIENT_MSG_STORE, - TRef), + TRef, undefined), {DeltaCount, IndexState} = rabbit_queue_index:recover( QueueName, Terms1, rabbit_msg_store:successfully_recovered_state(?PERSISTENT_MSG_STORE), fun (Guid) -> rabbit_msg_store:contains(Guid, PersistentClient) - end), + end, + MsgIdxOnDiskFun), init(true, IndexState, DeltaCount, Terms1, PersistentClient, TransientClient). @@ -507,25 +521,30 @@ publish(Msg, MsgProps, State) -> publish_delivered(false, _Msg, _MsgProps, State = #vqstate { len = 0 }) -> {blank_ack, a(State)}; -publish_delivered(true, Msg = #basic_message { is_persistent = IsPersistent }, - MsgProps, +publish_delivered(true, Msg = #basic_message { is_persistent = IsPersistent, + guid = Guid }, + MsgProps = #message_properties { + needs_confirming = NeedsConfirming }, State = #vqstate { len = 0, next_seq_id = SeqId, out_counter = OutCount, in_counter = InCount, persistent_count = PCount, - durable = IsDurable }) -> + durable = IsDurable, + unconfirmed = Unconfirmed }) -> IsPersistent1 = IsDurable andalso IsPersistent, MsgStatus = (msg_status(IsPersistent1, SeqId, Msg, MsgProps)) #msg_status { is_delivered = true }, {MsgStatus1, State1} = maybe_write_to_disk(false, false, MsgStatus, State), State2 = record_pending_ack(m(MsgStatus1), State1), PCount1 = PCount + one_if(IsPersistent1), + Unconfirmed1 = gb_sets_maybe_insert(NeedsConfirming, Guid, Unconfirmed), {SeqId, a(reduce_memory_use( State2 #vqstate { next_seq_id = SeqId + 1, out_counter = OutCount + 1, in_counter = InCount + 1, - persistent_count = PCount1 }))}. + persistent_count = PCount1, + unconfirmed = Unconfirmed1 }))}. dropwhile(Pred, State) -> {_OkOrEmpty, State1} = dropwhile1(Pred, State), @@ -635,9 +654,15 @@ internal_fetch(AckRequired, MsgStatus = #msg_status { persistent_count = PCount1 })}. ack(AckTags, State) -> - a(ack(fun msg_store_remove/3, - fun (_AckEntry, State1) -> State1 end, - AckTags, State)). + {Guids, State1} = + ack(fun msg_store_remove/3, + fun ({_IsPersistent, Guid, _MsgProps}, State1) -> + remove_confirms(gb_sets:singleton(Guid), State1); + (#msg_status{msg = #basic_message{guid = Guid}}, State1) -> + remove_confirms(gb_sets:singleton(Guid), State1) + end, + AckTags, State), + {Guids, a(State1)}. tx_publish(Txn, Msg = #basic_message { is_persistent = IsPersistent }, MsgProps, State = #vqstate { durable = IsDurable, @@ -687,7 +712,7 @@ tx_commit(Txn, Fun, MsgPropsFun, end)}. requeue(AckTags, MsgPropsFun, State) -> - a(reduce_memory_use( + {_Guids, State1} = ack(fun msg_store_release/3, fun (#msg_status { msg = Msg, msg_props = MsgProps }, State1) -> {_SeqId, State2} = publish(Msg, MsgPropsFun(MsgProps), @@ -702,7 +727,8 @@ requeue(AckTags, MsgPropsFun, State) -> true, true, State2), State3 end, - AckTags, State))). + AckTags, State), + a(reduce_memory_use(State1)). len(#vqstate { len = Len }) -> Len. @@ -882,6 +908,10 @@ one_if(false) -> 0. cons_if(true, E, L) -> [E | L]; cons_if(false, _E, L) -> L. +gb_sets_maybe_insert(false, _Val, Set) -> Set; +%% when requeueing, we re-add a guid to the unconfimred set +gb_sets_maybe_insert(true, Val, Set) -> gb_sets:add(Val, Set). + msg_status(IsPersistent, SeqId, Msg = #basic_message { guid = Guid }, MsgProps) -> #msg_status { seq_id = SeqId, guid = Guid, msg = Msg, @@ -903,8 +933,8 @@ with_immutable_msg_store_state(MSCState, IsPersistent, Fun) -> end), Res. -msg_store_client_init(MsgStore) -> - rabbit_msg_store:client_init(MsgStore, rabbit_guid:guid()). +msg_store_client_init(MsgStore, MsgOnDiskFun) -> + rabbit_msg_store:client_init(MsgStore, rabbit_guid:guid(), MsgOnDiskFun). msg_store_write(MSCState, IsPersistent, Guid, Msg) -> with_immutable_msg_store_state( @@ -1050,6 +1080,9 @@ init(IsDurable, IndexState, DeltaCount, Terms, ram_index_count = 0, out_counter = 0, in_counter = 0, + msgs_on_disk = gb_sets:new(), + msg_indices_on_disk = gb_sets:new(), + unconfirmed = gb_sets:new(), ack_out_counter = 0, ack_in_counter = 0, rates = blank_rate(Now, DeltaCount1), @@ -1079,7 +1112,7 @@ msg_store_callback(PersistentGuids, Pubs, AckTags, Fun, MsgPropsFun) -> end. remove_persistent_messages(Guids) -> - PersistentClient = msg_store_client_init(?PERSISTENT_MSG_STORE), + PersistentClient = msg_store_client_init(?PERSISTENT_MSG_STORE, undefined), ok = rabbit_msg_store:remove(Guids, PersistentClient), rabbit_msg_store:client_delete_and_terminate(PersistentClient). @@ -1130,6 +1163,7 @@ tx_commit_index(State = #vqstate { on_sync = #sync { durable = IsDurable }) -> PAcks = lists:append(SPAcks), Acks = lists:append(SAcks), + {_Guids, NewState} = ack(Acks, State), Pubs = [{Msg, Fun(MsgProps)} || {Fun, PubsN} <- lists:reverse(SPubs), {Msg, MsgProps} <- lists:reverse(PubsN)], {SeqIds, State1 = #vqstate { index_state = IndexState }} = @@ -1141,7 +1175,7 @@ tx_commit_index(State = #vqstate { on_sync = #sync { {SeqId, State3} = publish(Msg, MsgProps, false, IsPersistent1, State2), {cons_if(IsPersistent1, SeqId, SeqIdsAcc), State3} - end, {PAcks, ack(Acks, State)}, Pubs), + end, {PAcks, NewState}, Pubs), IndexState1 = rabbit_queue_index:sync(SeqIds, IndexState), [ Fun() || Fun <- lists:reverse(SFuns) ], reduce_memory_use( @@ -1195,15 +1229,17 @@ sum_guids_by_store_to_len(LensByStore, GuidsByStore) -> %% Internal gubbins for publishing %%---------------------------------------------------------------------------- -publish(Msg = #basic_message { is_persistent = IsPersistent }, - MsgProps, IsDelivered, MsgOnDisk, +publish(Msg = #basic_message { is_persistent = IsPersistent, guid = Guid }, + MsgProps = #message_properties { needs_confirming = NeedsConfirming }, + IsDelivered, MsgOnDisk, State = #vqstate { q1 = Q1, q3 = Q3, q4 = Q4, next_seq_id = SeqId, len = Len, in_counter = InCount, persistent_count = PCount, durable = IsDurable, - ram_msg_count = RamMsgCount }) -> + ram_msg_count = RamMsgCount, + unconfirmed = Unconfirmed}) -> IsPersistent1 = IsDurable andalso IsPersistent, MsgStatus = (msg_status(IsPersistent1, SeqId, Msg, MsgProps)) #msg_status { is_delivered = IsDelivered, msg_on_disk = MsgOnDisk}, @@ -1213,11 +1249,13 @@ publish(Msg = #basic_message { is_persistent = IsPersistent }, true -> State1 #vqstate { q4 = queue:in(m(MsgStatus1), Q4) } end, PCount1 = PCount + one_if(IsPersistent1), + Unconfirmed1 = gb_sets_maybe_insert(NeedsConfirming, Guid, Unconfirmed), {SeqId, State2 #vqstate { next_seq_id = SeqId + 1, len = Len + 1, in_counter = InCount + 1, persistent_count = PCount1, - ram_msg_count = RamMsgCount + 1}}. + ram_msg_count = RamMsgCount + 1, + unconfirmed = Unconfirmed1 }}. maybe_write_msg_to_disk(_Force, MsgStatus = #msg_status { msg_on_disk = true }, _MSCState) -> @@ -1310,7 +1348,7 @@ remove_pending_ack(KeepPersistent, end. ack(_MsgStoreFun, _Fun, [], State) -> - State; + {[], State}; ack(MsgStoreFun, Fun, AckTags, State) -> {{SeqIds, GuidsByStore}, State1 = #vqstate { index_state = IndexState, @@ -1328,14 +1366,17 @@ ack(MsgStoreFun, Fun, AckTags, State) -> gb_trees:delete_any(SeqId, RAI)})} end, {{[], orddict:new()}, State}, AckTags), IndexState1 = rabbit_queue_index:ack(SeqIds, IndexState), - ok = orddict:fold(fun (IsPersistent, Guids, ok) -> - MsgStoreFun(MSCState, IsPersistent, Guids) - end, ok, GuidsByStore), + AckdGuids = lists:concat( + orddict:fold( + fun (IsPersistent, Guids, Gs) -> + MsgStoreFun(MSCState, IsPersistent, Guids), + [Guids | Gs] + end, [], GuidsByStore)), PCount1 = PCount - find_persistent_count(sum_guids_by_store_to_len( orddict:new(), GuidsByStore)), - State1 #vqstate { index_state = IndexState1, + {AckdGuids, State1 #vqstate { index_state = IndexState1, persistent_count = PCount1, - ack_out_counter = AckOutCount + length(AckTags) }. + ack_out_counter = AckOutCount + length(AckTags) }}. accumulate_ack(_SeqId, #msg_status { is_persistent = false, %% ASSERTIONS msg_on_disk = false, @@ -1352,6 +1393,44 @@ find_persistent_count(LensByStore) -> end. %%---------------------------------------------------------------------------- +%% Internal plumbing for confirms (aka publisher acks) +%%---------------------------------------------------------------------------- + +remove_confirms(GuidSet, State = #vqstate { msgs_on_disk = MOD, + msg_indices_on_disk = MIOD, + unconfirmed = UC }) -> + State #vqstate { msgs_on_disk = gb_sets:difference(MOD, GuidSet), + msg_indices_on_disk = gb_sets:difference(MIOD, GuidSet), + unconfirmed = gb_sets:difference(UC, GuidSet) }. + +msgs_confirmed(GuidSet, State) -> + {{confirm, gb_sets:to_list(GuidSet)}, remove_confirms(GuidSet, State)}. + +msgs_written_to_disk(QPid, GuidSet) -> + rabbit_amqqueue:maybe_run_queue_via_backing_queue_async( + QPid, fun(State = #vqstate { msgs_on_disk = MOD, + msg_indices_on_disk = MIOD, + unconfirmed = UC }) -> + msgs_confirmed(gb_sets:intersection(GuidSet, MIOD), + State #vqstate { + msgs_on_disk = + gb_sets:intersection( + gb_sets:union(MOD, GuidSet), UC) }) + end). + +msg_indices_written_to_disk(QPid, GuidSet) -> + rabbit_amqqueue:maybe_run_queue_via_backing_queue_async( + QPid, fun(State = #vqstate { msgs_on_disk = MOD, + msg_indices_on_disk = MIOD, + unconfirmed = UC }) -> + msgs_confirmed(gb_sets:intersection(GuidSet, MOD), + State #vqstate { + msg_indices_on_disk = + gb_sets:intersection( + gb_sets:union(MIOD, GuidSet), UC) }) + end). + +%%---------------------------------------------------------------------------- %% Phase changes %%---------------------------------------------------------------------------- |
