diff options
| author | Alexandru Scvortov <alexandru@rabbitmq.com> | 2010-09-29 10:32:39 +0100 |
|---|---|---|
| committer | Alexandru Scvortov <alexandru@rabbitmq.com> | 2010-09-29 10:32:39 +0100 |
| commit | 1ec0627fab0541144dea4960977601590895afb2 (patch) | |
| tree | 49544479716a15780b1efc145e6f0f7b392458fe | |
| parent | f18e91e08b265c952b1d46b9d1b49e8216d3ab47 (diff) | |
| parent | 961b38f9564e8c20e1182d398191d456f2bc8352 (diff) | |
| download | rabbitmq-server-git-1ec0627fab0541144dea4960977601590895afb2.tar.gz | |
pull from default
| -rw-r--r-- | Makefile | 1 | ||||
| -rw-r--r-- | include/rabbit.hrl | 2 | ||||
| -rw-r--r-- | include/rabbit_backing_queue_spec.hrl | 7 | ||||
| -rw-r--r-- | src/rabbit_amqqueue.erl | 15 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 134 | ||||
| -rw-r--r-- | src/rabbit_backing_queue.erl | 4 | ||||
| -rw-r--r-- | src/rabbit_basic.erl | 14 | ||||
| -rw-r--r-- | src/rabbit_channel.erl | 299 | ||||
| -rw-r--r-- | src/rabbit_invariable_queue.erl | 12 | ||||
| -rw-r--r-- | src/rabbit_msg_store.erl | 121 | ||||
| -rw-r--r-- | src/rabbit_queue_index.erl | 53 | ||||
| -rw-r--r-- | src/rabbit_router.erl | 21 | ||||
| -rw-r--r-- | src/rabbit_tests.erl | 21 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 165 |
14 files changed, 669 insertions, 200 deletions
@@ -1,4 +1,3 @@ - TMPDIR ?= /tmp RABBITMQ_NODENAME ?= rabbit diff --git a/include/rabbit.hrl b/include/rabbit.hrl index 24aa8d987c..700523d7f7 100644 --- a/include/rabbit.hrl +++ b/include/rabbit.hrl @@ -69,7 +69,7 @@ is_persistent}). -record(ssl_socket, {tcp, ssl}). --record(delivery, {mandatory, immediate, txn, sender, message}). +-record(delivery, {mandatory, immediate, txn, sender, message, origin, msg_seq_no}). -record(amqp_error, {name, explanation, method = none}). -record(event, {type, props, timestamp}). diff --git a/include/rabbit_backing_queue_spec.hrl b/include/rabbit_backing_queue_spec.hrl index 005994f09f..c01e924688 100644 --- a/include/rabbit_backing_queue_spec.hrl +++ b/include/rabbit_backing_queue_spec.hrl @@ -43,9 +43,10 @@ -spec(terminate/1 :: (state()) -> state()). -spec(delete_and_terminate/1 :: (state()) -> state()). -spec(purge/1 :: (state()) -> {purged_msg_count(), state()}). --spec(publish/2 :: (rabbit_types:basic_message(), state()) -> state()). --spec(publish_delivered/3 :: - (ack_required(), rabbit_types:basic_message(), state()) -> {ack(), state()}). +-spec(publish/3 :: (rabbit_types:basic_message(), boolean(), state()) -> state()). +-spec(publish_delivered/4 :: + (ack_required(), rabbit_types:basic_message(), boolean(), state()) + -> {ack(), state()}). -spec(fetch/2 :: (ack_required(), state()) -> {fetch_result(), state()}). -spec(ack/2 :: ([ack()], state()) -> state()). -spec(tx_publish/3 :: (rabbit_types:txn(), rabbit_types:basic_message(), state()) -> state()). diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 42bddc5e81..d756fcb9f0 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -371,15 +371,18 @@ delete(#amqqueue{ pid = QPid }, IfUnused, IfEmpty) -> purge(#amqqueue{ pid = QPid }) -> delegate_call(QPid, purge, infinity). deliver(QPid, #delivery{immediate = true, - txn = Txn, sender = ChPid, message = Message}) -> - gen_server2:call(QPid, {deliver_immediately, Txn, Message, ChPid}, + txn = Txn, sender = ChPid, message = Message, + msg_seq_no = MsgSeqNo}) -> + gen_server2:call(QPid, {deliver_immediately, Txn, Message, MsgSeqNo, ChPid}, infinity); deliver(QPid, #delivery{mandatory = true, - txn = Txn, sender = ChPid, message = Message}) -> - gen_server2:call(QPid, {deliver, Txn, Message, ChPid}, infinity), + txn = Txn, sender = ChPid, message = Message, + msg_seq_no = MsgSeqNo}) -> + gen_server2:call(QPid, {deliver, Txn, Message, MsgSeqNo, ChPid}, infinity), true; -deliver(QPid, #delivery{txn = Txn, sender = ChPid, message = Message}) -> - gen_server2:cast(QPid, {deliver, Txn, Message, ChPid}), +deliver(QPid, #delivery{txn = Txn, sender = ChPid, message = Message, + msg_seq_no = MsgSeqNo}) -> + gen_server2:cast(QPid, {deliver, Txn, Message, MsgSeqNo, ChPid}), true. requeue(QPid, MsgIds, ChPid) -> diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 2c53a8e319..824c4401f0 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -61,7 +61,8 @@ sync_timer_ref, rate_timer_ref, expiry_timer_ref, - stats_timer + stats_timer, + guid_to_channel }). -record(consumer, {tag, ack_required}). @@ -123,7 +124,8 @@ init(Q) -> sync_timer_ref = undefined, rate_timer_ref = undefined, expiry_timer_ref = undefined, - stats_timer = rabbit_event:init_stats_timer()}, hibernate, + stats_timer = rabbit_event:init_stats_timer(), + guid_to_channel = dict:new()}, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. terminate(shutdown, State = #q{backing_queue = BQ}) -> @@ -341,11 +343,13 @@ deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun}, FunAcc, rabbit_channel:deliver( ChPid, ConsumerTag, AckRequired, {QName, self(), AckTag, IsDelivered, Message}), - ChAckTags1 = case AckRequired of - true -> sets:add_element( - AckTag, ChAckTags); - false -> ChAckTags - end, + {State2, ChAckTags1} = + case AckRequired of + true -> {State1, sets:add_element(AckTag, ChAckTags)}; + false -> {confirm_message_internal( + Message#basic_message.guid, + State1), ChAckTags} + end, NewC = C#cr{unsent_message_count = Count + 1, acktags = ChAckTags1}, store_ch_record(NewC), @@ -361,10 +365,10 @@ deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun}, FunAcc, {ActiveConsumers1, queue:in(QEntry, BlockedConsumers1)} end, - State2 = State1#q{ + State3 = State2#q{ active_consumers = NewActiveConsumers, blocked_consumers = NewBlockedConsumers}, - deliver_msgs_to_consumers(Funs, FunAcc1, State2); + deliver_msgs_to_consumers(Funs, FunAcc1, State3); %% if IsMsgReady then we've hit the limiter false when IsMsgReady -> store_ch_record(C#cr{is_limit_active = true}), @@ -395,6 +399,27 @@ deliver_from_queue_deliver(AckRequired, false, {{Message, IsDelivered, AckTag}, 0 == Remaining, State #q { backing_queue_state = BQS1 }}. +confirm_messages_internal(Guids, State) when is_list(Guids) -> + lists:foldl(fun(Guid, State0) -> + confirm_message_internal(Guid, State0) + end, State, Guids). + +confirm_message_internal(Guid, State = #q { guid_to_channel = GTC }) -> + case dict:find(Guid, GTC) of + {ok, {_ , undefined}} -> ok; + {ok, {ChPid, MsgSeqNo}} -> rabbit_channel:confirm(ChPid, MsgSeqNo); + _ -> ok + end, + State #q { guid_to_channel = dict:erase(Guid, GTC) }. + +maybe_record_confirm_message(undefined, _, _, State) -> + State; +maybe_record_confirm_message(MsgSeqNo, + #basic_message { guid = Guid }, + ChPid, State) -> + State #q { guid_to_channel = + dict:store(Guid, {ChPid, MsgSeqNo}, State#q.guid_to_channel) }. + run_message_queue(State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> Funs = {fun deliver_from_queue_pred/2, fun deliver_from_queue_deliver/3}, @@ -402,28 +427,30 @@ run_message_queue(State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> {_IsEmpty1, State1} = deliver_msgs_to_consumers(Funs, IsEmpty, State), State1. -attempt_delivery(none, _ChPid, Message, State = #q{backing_queue = BQ}) -> +attempt_delivery(none, _ChPid, Message, MsgSeqNo, State = #q{backing_queue = BQ}) -> PredFun = fun (IsEmpty, _State) -> not IsEmpty end, DeliverFun = fun (AckRequired, false, State1 = #q{backing_queue_state = BQS}) -> {AckTag, BQS1} = - BQ:publish_delivered(AckRequired, Message, BQS), + BQ:publish_delivered(AckRequired, Message, + MsgSeqNo =/= undefined, BQS), {{Message, false, AckTag}, true, State1#q{backing_queue_state = BQS1}} end, deliver_msgs_to_consumers({ PredFun, DeliverFun }, false, State); -attempt_delivery(Txn, ChPid, Message, State = #q{backing_queue = BQ, - backing_queue_state = BQS}) -> +attempt_delivery(Txn, ChPid, Message, _MSN, State = #q{backing_queue = BQ, + backing_queue_state = BQS}) -> record_current_channel_tx(ChPid, Txn), {true, State#q{backing_queue_state = BQ:tx_publish(Txn, Message, BQS)}}. -deliver_or_enqueue(Txn, ChPid, Message, State = #q{backing_queue = BQ}) -> - case attempt_delivery(Txn, ChPid, Message, State) of +deliver_or_enqueue(Txn, ChPid, Message, MsgSeqNo, State = #q{backing_queue = BQ}) -> + case attempt_delivery(Txn, ChPid, Message, MsgSeqNo, State) of {true, NewState} -> {true, NewState}; {false, NewState} -> %% Txn is none and no unblocked channels with consumers - BQS = BQ:publish(Message, State #q.backing_queue_state), + BQS = BQ:publish(Message, MsgSeqNo =/= undefined, + State #q.backing_queue_state), {false, NewState#q{backing_queue_state = BQS}} end. @@ -522,7 +549,14 @@ maybe_send_reply(ChPid, Msg) -> ok = rabbit_channel:send_command(ChPid, Msg). qname(#q{q = #amqqueue{name = QName}}) -> QName. maybe_run_queue_via_backing_queue(Fun, State = #q{backing_queue_state = BQS}) -> - run_message_queue(State#q{backing_queue_state = Fun(BQS)}). + case Fun(BQS) of + {BQS1, {confirm, Guids}} -> + run_message_queue( + confirm_messages_internal(Guids, + State #q { backing_queue_state = BQS1 })); + BQS1 -> + run_message_queue(State#q{backing_queue_state = BQS1}) + end. commit_transaction(Txn, From, ChPid, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> @@ -656,7 +690,7 @@ handle_call(consumers, _From, [{ChPid, ConsumerTag, AckRequired} | Acc] end, [], queue:join(ActiveConsumers, BlockedConsumers)), State); -handle_call({deliver_immediately, Txn, Message, ChPid}, _From, State) -> +handle_call({deliver_immediately, Txn, Message, MsgSeqNo, ChPid}, _From, State) -> %% Synchronous, "immediate" delivery mode %% %% FIXME: Is this correct semantics? @@ -670,13 +704,15 @@ handle_call({deliver_immediately, Txn, Message, ChPid}, _From, State) -> %% just all ready-to-consume queues get the message, with unready %% queues discarding the message? %% - {Delivered, NewState} = attempt_delivery(Txn, ChPid, Message, State), - reply(Delivered, NewState); + State1 = maybe_record_confirm_message(MsgSeqNo, Message, ChPid, State), + {Delivered, State2} = attempt_delivery(Txn, ChPid, Message, MsgSeqNo, State1), + reply(Delivered, State2); -handle_call({deliver, Txn, Message, ChPid}, _From, State) -> +handle_call({deliver, Txn, Message, MsgSeqNo, ChPid}, _From, State) -> %% Synchronous, "mandatory" delivery mode - {Delivered, NewState} = deliver_or_enqueue(Txn, ChPid, Message, State), - reply(Delivered, NewState); + State1 = maybe_record_confirm_message(MsgSeqNo, Message, ChPid, State), + {Delivered, State2} = deliver_or_enqueue(Txn, ChPid, Message, MsgSeqNo, State1), + reply(Delivered, State2); handle_call({commit, Txn, ChPid}, From, State) -> NewState = commit_transaction(Txn, From, ChPid, State), @@ -701,14 +737,18 @@ handle_call({basic_get, ChPid, NoAck}, _From, case BQ:fetch(AckRequired, BQS) of {empty, BQS1} -> reply(empty, State1#q{backing_queue_state = BQS1}); {{Message, IsDelivered, AckTag, Remaining}, BQS1} -> - case AckRequired of - true -> C = #cr{acktags = ChAckTags} = ch_record(ChPid), - store_ch_record( - C#cr{acktags = sets:add_element(AckTag, ChAckTags)}); - false -> ok - end, + State2 = case AckRequired of + true -> + C = #cr{acktags = ChAckTags} = ch_record(ChPid), + store_ch_record( + C#cr{acktags = sets:add_element(AckTag, ChAckTags)}), + State1; + false -> + confirm_message_internal(Message#basic_message.guid, + State1) + end, Msg = {QName, self(), AckTag, IsDelivered, Message}, - reply({ok, Remaining, Msg}, State1#q{backing_queue_state = BQS1}) + reply({ok, Remaining, Msg}, State2#q{backing_queue_state = BQS1}) end; handle_call({basic_consume, NoAck, ChPid, LimiterPid, @@ -827,10 +867,12 @@ handle_call({requeue, AckTags, ChPid}, From, State) -> handle_call({maybe_run_queue_via_backing_queue, Fun}, _From, State) -> reply(ok, maybe_run_queue_via_backing_queue(Fun, State)). -handle_cast({deliver, Txn, Message, ChPid}, State) -> + +handle_cast({deliver, Txn, Message, MsgSeqNo, ChPid}, State) -> %% Asynchronous, non-"mandatory", non-"immediate" deliver mode. - {_Delivered, NewState} = deliver_or_enqueue(Txn, ChPid, Message, State), - noreply(NewState); + State1 = maybe_record_confirm_message(MsgSeqNo, Message, ChPid, State), + {_Delivered, State2} = deliver_or_enqueue(Txn, ChPid, Message, MsgSeqNo, State1), + noreply(State2); handle_cast({ack, Txn, AckTags, ChPid}, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> @@ -838,14 +880,23 @@ handle_cast({ack, Txn, AckTags, ChPid}, not_found -> noreply(State); C = #cr{acktags = ChAckTags} -> - {C1, BQS1} = + {C1, State1} = case Txn of - none -> ChAckTags1 = subtract_acks(ChAckTags, AckTags), - {C#cr{acktags = ChAckTags1}, BQ:ack(AckTags, BQS)}; - _ -> {C#cr{txn = Txn}, BQ:tx_ack(Txn, AckTags, BQS)} + none -> + ChAckTags1 = subtract_acks(ChAckTags, AckTags), + NewC = C#cr{acktags = ChAckTags1}, + AckdGuids = BQ:seqids_to_guids(AckTags, BQS), + NewBQS = BQ:ack(AckTags, BQS), + NewState = confirm_messages_internal( + AckdGuids, + State #q { backing_queue_state = NewBQS }), + {NewC, NewState}; + _ -> + {C#cr{txn = Txn}, + State #q { backing_queue_state = BQ:tx_ack(Txn, AckTags, BQS) }} end, store_ch_record(C1), - noreply(State#q{backing_queue_state = BQS1}) + noreply(State1) end; handle_cast({reject, AckTags, Requeue, ChPid}, @@ -858,8 +909,11 @@ handle_cast({reject, AckTags, Requeue, ChPid}, store_ch_record(C#cr{acktags = ChAckTags1}), noreply(case Requeue of true -> requeue_and_run(AckTags, State); - false -> BQS1 = BQ:ack(AckTags, BQS), - State #q { backing_queue_state = BQS1 } + false -> AckdGuids = BQ:seqids_to_guids(AckTags, BQS), + BQS1 = BQ:ack(AckTags, BQS), + confirm_messages_internal( + AckdGuids, + State #q { backing_queue_state = BQS1 }) end) end; diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl index 2230c507e9..32f9f15ab0 100644 --- a/src/rabbit_backing_queue.erl +++ b/src/rabbit_backing_queue.erl @@ -62,12 +62,12 @@ behaviour_info(callbacks) -> {purge, 1}, %% Publish a message. - {publish, 2}, + {publish, 3}, %% Called for messages which have already been passed straight %% out to a client. The queue will be empty for these calls %% (i.e. saves the round trip through the backing queue). - {publish_delivered, 3}, + {publish_delivered, 4}, %% Produce the next message. {fetch, 2}, diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl index d62fc07cb0..348310d9b9 100644 --- a/src/rabbit_basic.erl +++ b/src/rabbit_basic.erl @@ -33,7 +33,7 @@ -include("rabbit.hrl"). -include("rabbit_framing.hrl"). --export([publish/1, message/4, properties/1, delivery/4]). +-export([publish/1, message/4, properties/1, delivery/5]). -export([publish/4, publish/7]). -export([build_content/2, from_content/1]). -export([is_message_persistent/1]). @@ -50,9 +50,10 @@ -spec(publish/1 :: (rabbit_types:delivery()) -> publish_result()). --spec(delivery/4 :: +-spec(delivery/5 :: (boolean(), boolean(), rabbit_types:maybe(rabbit_types:txn()), - rabbit_types:message()) -> rabbit_types:delivery()). + rabbit_types:message(), undefined | integer()) + -> rabiit_types:delivery()). -spec(message/4 :: (rabbit_exchange:name(), rabbit_router:routing_key(), properties_input(), binary()) @@ -92,9 +93,9 @@ publish(Delivery = #delivery{ Other end. -delivery(Mandatory, Immediate, Txn, Message) -> +delivery(Mandatory, Immediate, Txn, Message, MsgSeqNo) -> #delivery{mandatory = Mandatory, immediate = Immediate, txn = Txn, - sender = self(), message = Message}. + sender = self(), message = Message, msg_seq_no = MsgSeqNo}. build_content(Properties, BodyBin) -> %% basic.publish hasn't changed so we can just hard-code amqp_0_9_1 @@ -161,7 +162,8 @@ publish(ExchangeName, RoutingKeyBin, Mandatory, Immediate, Txn, Properties, BodyBin) -> publish(delivery(Mandatory, Immediate, Txn, message(ExchangeName, RoutingKeyBin, - properties(Properties), BodyBin))). + properties(Properties), BodyBin), + undefined)). is_message_persistent(#content{properties = #'P_basic'{ delivery_mode = Mode}}) -> diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index bde11f00e0..4bb1f13b02 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -38,7 +38,7 @@ -export([start_link/7, do/2, do/3, shutdown/1]). -export([send_command/2, deliver/4, flushed/2]). -export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1]). --export([emit_stats/1, flush/1]). +-export([emit_stats/1, flush/1, flush_multiple_acks/1, confirm/2]). -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2, handle_pre_hibernate/1, prioritise_call/3, @@ -48,7 +48,9 @@ start_limiter_fun, transaction_id, tx_participants, next_tag, uncommitted_ack_q, unacked_message_q, username, virtual_host, most_recently_declared_queue, - consumer_mapping, blocking, queue_collector_pid, stats_timer}). + consumer_mapping, blocking, queue_collector_pid, stats_timer, + confirm_enabled, published_count, confirm_multiple, confirm_tref, + held_confirms, need_confirming, qpid_to_msgs}). -define(MAX_PERMISSION_CACHE_SIZE, 12). @@ -69,6 +71,8 @@ -define(INFO_KEYS, ?CREATION_EVENT_KEYS ++ ?STATISTICS_KEYS -- [pid]). +-define(FLUSH_MULTIPLE_ACKS_INTERVAL, 1000). + %%---------------------------------------------------------------------------- -ifdef(use_specs). @@ -98,6 +102,8 @@ -spec(info_all/0 :: () -> [[rabbit_types:info()]]). -spec(info_all/1 :: ([rabbit_types:info_key()]) -> [[rabbit_types:info()]]). -spec(emit_stats/1 :: (pid()) -> 'ok'). +-spec(flush_multiple_acks/1 :: (pid()) -> 'ok'). +-spec(confirm/2 ::(pid(), integer()) -> 'ok'). -endif. @@ -152,6 +158,13 @@ emit_stats(Pid) -> flush(Pid) -> gen_server2:call(Pid, flush). +flush_multiple_acks(Pid) -> + gen_server2:cast(Pid, flush_multiple_acks). + +confirm(Pid, MsgSeqNo) -> + gen_server2:cast(Pid, {confirm, MsgSeqNo}). + + %%--------------------------------------------------------------------------- init([Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid, @@ -159,24 +172,30 @@ init([Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid, process_flag(trap_exit, true), ok = pg_local:join(rabbit_channels, self()), StatsTimer = rabbit_event:init_stats_timer(), - State = #ch{state = starting, - channel = Channel, - reader_pid = ReaderPid, - writer_pid = WriterPid, - limiter_pid = undefined, - start_limiter_fun = StartLimiterFun, - transaction_id = none, - tx_participants = sets:new(), - next_tag = 1, - uncommitted_ack_q = queue:new(), - unacked_message_q = queue:new(), - username = Username, - virtual_host = VHost, - most_recently_declared_queue = <<>>, - consumer_mapping = dict:new(), - blocking = dict:new(), - queue_collector_pid = CollectorPid, - stats_timer = StatsTimer}, + State = #ch{ state = starting, + channel = Channel, + reader_pid = ReaderPid, + writer_pid = WriterPid, + limiter_pid = undefined, + start_limiter_fun = StartLimiterFun, + transaction_id = none, + tx_participants = sets:new(), + next_tag = 1, + uncommitted_ack_q = queue:new(), + unacked_message_q = queue:new(), + username = Username, + virtual_host = VHost, + most_recently_declared_queue = <<>>, + consumer_mapping = dict:new(), + blocking = dict:new(), + queue_collector_pid = CollectorPid, + stats_timer = StatsTimer, + confirm_enabled = false, + published_count = 0, + confirm_multiple = false, + held_confirms = gb_sets:new(), + need_confirming = gb_sets:new(), + qpid_to_msgs = dict:new() }, rabbit_event:notify(channel_created, infos(?CREATION_EVENT_KEYS, State)), rabbit_event:if_enabled(StatsTimer, fun() -> internal_emit_stats(State) end), @@ -257,19 +276,51 @@ handle_cast({deliver, ConsumerTag, AckRequired, Msg}, handle_cast(emit_stats, State = #ch{stats_timer = StatsTimer}) -> internal_emit_stats(State), {noreply, - State#ch{stats_timer = rabbit_event:reset_stats_timer(StatsTimer)}}. - -handle_info({'DOWN', _MRef, process, QPid, _Reason}, State) -> + State#ch{stats_timer = rabbit_event:reset_stats_timer(StatsTimer)}}; + +handle_cast(flush_multiple_acks, + State = #ch{writer_pid = WriterPid, + held_confirms = As, + need_confirming = NA}) -> + handle_multiple_flush(WriterPid, As, NA), + {noreply, State #ch { held_confirms = gb_sets:new(), + confirm_tref = undefined }}; + +handle_cast({confirm, MsgSeqNo}, State) -> + {noreply, send_or_enqueue_ack(MsgSeqNo, State)}; + +handle_cast({msg_sent_to_queues, MsgSeqNo, QPids}, State) -> + {noreply, lists:foldl(fun (QPid, State0) -> + msg_sent_to_queues(MsgSeqNo, QPid, State0) + end, State, QPids)}. + + +handle_info({'DOWN', _MRef, process, QPid, _Reason}, + State = #ch{qpid_to_msgs = QTM}) -> + State1 = case dict:find(QPid, QTM) of + {ok, Msgs} -> + S = gb_sets:fold(fun (MsgSeqNo, State0) -> + send_or_enqueue_ack(MsgSeqNo, State0) + end, State, Msgs), + S #ch {qpid_to_msgs = dict:erase(QPid, QTM)}; + error -> + State + end, erase_queue_stats(QPid), - {noreply, queue_blocked(QPid, State)}. + {noreply, queue_blocked(QPid, State1)}. -handle_pre_hibernate(State = #ch{stats_timer = StatsTimer}) -> +handle_pre_hibernate(State = #ch { writer_pid = WriterPid, + held_confirms = As, + stats_timer = StatsTimer, + need_confirming = NA }) -> ok = clear_permission_cache(), - rabbit_event:if_enabled(StatsTimer, fun () -> - internal_emit_stats(State) + handle_multiple_flush(WriterPid, As, NA), + rabbit_event:if_enabled(StatsTimer, fun() -> + internal_emit_stats(State) end), - {hibernate, - State#ch{stats_timer = rabbit_event:stop_stats_timer(StatsTimer)}}. + {hibernate, State #ch { held_confirms = gb_sets:new(), + stats_timer = rabbit_event:stop_stats_timer(StatsTimer), + confirm_tref = undefined }}. terminate(_Reason, State = #ch{state = terminating}) -> terminate(State); @@ -405,6 +456,58 @@ queue_blocked(QPid, State = #ch{blocking = Blocking}) -> State#ch{blocking = Blocking1} end. +send_or_enqueue_ack(undefined, State) -> + State; +send_or_enqueue_ack(_, State = #ch{confirm_enabled = false}) -> + State; +send_or_enqueue_ack(MsgSeqNo, + State = #ch{confirm_multiple = false}) -> + do_if_not_dup(MsgSeqNo, State, + fun(MSN, S = #ch{writer_pid = WriterPid, + qpid_to_msgs = QTM}) -> + ok = rabbit_writer:send_command( + WriterPid, #'basic.ack'{delivery_tag = MSN}), + S #ch { qpid_to_msgs = + dict:map(fun (_, Msgs) -> + gb_sets:delete_any(MsgSeqNo, Msgs) + end, QTM) } + end); +send_or_enqueue_ack(MsgSeqNo, State = #ch{confirm_multiple = true}) -> + do_if_not_dup(MsgSeqNo, State, + fun(MSN, S = #ch{qpid_to_msgs = QTM}) -> + State1 = start_ack_timer(S), + State1 #ch { held_confirms = + gb_sets:add(MSN, State1#ch.held_confirms), + qpid_to_msgs = + dict:map(fun (_, Msgs) -> + gb_sets:delete_any(MsgSeqNo, + Msgs) + end, QTM) } + end). + +msg_sent_to_queues(MsgSeqNo, QPid, State = #ch{qpid_to_msgs = QTM}) -> + case dict:find(QPid, QTM) of + {ok, Msgs} -> + State #ch {qpid_to_msgs = dict:store(QPid, + gb_sets:add(MsgSeqNo, Msgs), + QTM) }; + error -> + erlang:monitor(process, QPid), + State #ch { qpid_to_msgs = dict:store(QPid, + gb_sets:add(MsgSeqNo, gb_sets:new()), + QTM) } + end. + +do_if_not_dup(MsgSeqNo, State = #ch{need_confirming = NA}, Fun) -> + case gb_sets:is_element(MsgSeqNo, NA) of + true -> + State1 = Fun(MsgSeqNo, State), + State1 #ch { need_confirming = gb_sets:delete(MsgSeqNo, NA) }; + false -> + State + end. + + handle_method(#'channel.open'{}, _, State = #ch{state = starting}) -> {reply, #'channel.open_ok'{}, State#ch{state = running}}; @@ -437,6 +540,18 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, %% certain to want to look at delivery-mode and priority. DecodedContent = rabbit_binary_parser:ensure_content_decoded(Content), IsPersistent = is_message_persistent(DecodedContent), + {MsgSeqNo, State1} + = case State#ch.confirm_enabled of + false -> + {undefined, State}; + true -> + Count = State#ch.published_count, + {Count, + State #ch { published_count = Count + 1, + need_confirming = + gb_sets:add(Count, + State#ch.need_confirming) }} + end, Message = #basic_message{exchange_name = ExchangeName, routing_key = RoutingKey, content = DecodedContent, @@ -445,18 +560,33 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, {RoutingRes, DeliveredQPids} = rabbit_exchange:publish( Exchange, - rabbit_basic:delivery(Mandatory, Immediate, TxnKey, Message)), - case RoutingRes of - routed -> ok; - unroutable -> ok = basic_return(Message, WriterPid, no_route); - not_delivered -> ok = basic_return(Message, WriterPid, no_consumers) - end, + rabbit_basic:delivery(Mandatory, Immediate, TxnKey, Message, + case IsPersistent of + true -> MsgSeqNo; + false -> undefined + end)), + State2 = case RoutingRes of + %% Confirm transient messages now + routed -> + case {IsPersistent, DeliveredQPids} of + {_, []} -> send_or_enqueue_ack(MsgSeqNo, State1); + {true, _} -> State1; + {false, _} -> send_or_enqueue_ack(MsgSeqNo, State1) + end; + %% Confirm after basic.returns + unroutable -> + ok = basic_return(Message, WriterPid, no_route), + send_or_enqueue_ack(MsgSeqNo, State1); + not_delivered -> + ok = basic_return(Message, WriterPid, no_consumers), + send_or_enqueue_ack(MsgSeqNo, State1) + end, maybe_incr_stats([{ExchangeName, 1} | [{{QPid, ExchangeName}, 1} || - QPid <- DeliveredQPids]], publish, State), + QPid <- DeliveredQPids]], publish, State2), {noreply, case TxnKey of - none -> State; - _ -> add_tx_participants(DeliveredQPids, State) + none -> State2; + _ -> add_tx_participants(DeliveredQPids, State2) end}; handle_method(#'basic.ack'{delivery_tag = DeliveryTag, @@ -844,6 +974,11 @@ handle_method(#'queue.purge'{queue = QueueNameBin, return_ok(State, NoWait, #'queue.purge_ok'{message_count = PurgedMessageCount}); + +handle_method(#'tx.select'{}, _, #ch{confirm_enabled = true}) -> + rabbit_misc:protocol_error( + precondition_failed, "cannot switch from confirm to tx mode", []); + handle_method(#'tx.select'{}, _, State = #ch{transaction_id = none}) -> {reply, #'tx.select_ok'{}, new_tx(State)}; @@ -864,6 +999,39 @@ handle_method(#'tx.rollback'{}, _, #ch{transaction_id = none}) -> handle_method(#'tx.rollback'{}, _, State) -> {reply, #'tx.rollback_ok'{}, internal_rollback(State)}; +handle_method(#'confirm.select'{}, _, #ch{transaction_id = TxId}) + when TxId =/= none -> + rabbit_misc:protocol_error( + precondition_failed, "cannot switch from tx to confirm mode", []); + +handle_method(#'confirm.select'{multiple = Multiple, nowait = NoWait}, + _, + State = #ch{confirm_enabled = false}) -> + rabbit_log:info("got confirm.select{multiple = ~p, nowait = ~p}~n", + [Multiple, NoWait]), + State1 = State #ch { confirm_enabled = true, + confirm_multiple = Multiple }, + case NoWait of + true -> {noreply, State1}; + false -> {reply, #'confirm.select_ok'{}, State1} + end; + +handle_method(#'confirm.select'{multiple = Multiple, nowait = NoWait}, + _, + State = #ch{confirm_enabled = true, + confirm_multiple = Multiple}) -> + rabbit_log:info("got a confirm.select with same options~n"), + case NoWait of + true -> {noreply, State}; + false -> {reply, #'confirm.select_ok'{}, State} + end; + +handle_method(#'confirm.select'{}, + _, + #ch{confirm_enabled = true}) -> + rabbit_misc:protocol_error( + precondition_failed, "cannot change confirm channel multiple setting", []); + handle_method(#'channel.flow'{active = true}, _, State = #ch{limiter_pid = LimiterPid}) -> LimiterPid1 = case rabbit_limiter:unblock(LimiterPid) of @@ -872,6 +1040,7 @@ handle_method(#'channel.flow'{active = true}, _, end, {reply, #'channel.flow_ok'{active = true}, State#ch{limiter_pid = LimiterPid1}}; + handle_method(#'channel.flow'{active = false}, _, State = #ch{limiter_pid = LimiterPid, consumer_mapping = Consumers}) -> @@ -1106,7 +1275,8 @@ internal_deliver(WriterPid, Notify, ConsumerTag, DeliveryTag, false -> rabbit_writer:send_command(WriterPid, M, Content) end. -terminate(_State) -> +terminate(State) -> + stop_ack_timer(State), pg_local:leave(rabbit_channels, self()), rabbit_event:notify(channel_closed, [{pid, self()}]). @@ -1186,3 +1356,54 @@ erase_queue_stats(QPid) -> erase({queue_stats, QPid}), [erase({queue_exchange_stats, QX}) || {{queue_exchange_stats, QX = {QPid0, _}}, _} <- get(), QPid =:= QPid0]. + +start_ack_timer(State = #ch{confirm_tref = undefined}) -> + {ok, TRef} = timer:apply_after(?FLUSH_MULTIPLE_ACKS_INTERVAL, + ?MODULE, flush_multiple_acks, [self()]), + State #ch { confirm_tref = TRef }; +start_ack_timer(State) -> + State. + +stop_ack_timer(State = #ch{confirm_tref = undefined}) -> + State; +stop_ack_timer(State = #ch{confirm_tref = TRef}) -> + {ok, cancel} = timer:cancel(TRef), + State #ch { confirm_tref = undefined }. + +handle_multiple_flush(WriterPid, As, NA) -> + case gb_sets:is_empty(As) of + true -> ok; + false -> flush_multiple(As, WriterPid, case gb_sets:is_empty(NA) of + false -> gb_sets:smallest(NA); + true -> gb_sets:largest(As)+1 + end) + end. + + +flush_multiple(Acks, WriterPid, SmallestNotAcked) -> + [First | Rest] = gb_sets:to_list(Acks), + Remaining = case Rest of + [] -> [First]; + _ -> flush_multiple(First, Rest, WriterPid, SmallestNotAcked) + end, + lists:foreach(fun(A) -> + ok = rabbit_writer:send_command( + WriterPid, + #'basic.ack'{delivery_tag = A}) + end, Remaining). + +flush_multiple(Prev, [Cur | Rest], WriterPid, SNA) -> + ExpNext = Prev+1, + case {SNA >= Cur, Cur} of + {true, ExpNext} -> + flush_multiple(Cur, Rest, WriterPid, SNA); + _ -> + flush_multiple(Prev, [], WriterPid, SNA), + [Cur | Rest] + end; +flush_multiple(Prev, [], WriterPid, _) -> + ok = rabbit_writer:send_command( + WriterPid, + #'basic.ack'{delivery_tag = Prev, + multiple = true}), + []. diff --git a/src/rabbit_invariable_queue.erl b/src/rabbit_invariable_queue.erl index 4e0dad8422..664ef65399 100644 --- a/src/rabbit_invariable_queue.erl +++ b/src/rabbit_invariable_queue.erl @@ -31,8 +31,8 @@ -module(rabbit_invariable_queue). --export([init/3, terminate/1, delete_and_terminate/1, purge/1, publish/2, - publish_delivered/3, fetch/2, ack/2, tx_publish/3, tx_ack/3, +-export([init/3, terminate/1, delete_and_terminate/1, purge/1, publish/3, + publish_delivered/4, fetch/2, ack/2, tx_publish/3, tx_ack/3, tx_rollback/2, tx_commit/3, requeue/2, len/1, is_empty/1, set_ram_duration_target/2, ram_duration/1, needs_idle_timeout/1, idle_timeout/1, handle_pre_hibernate/1, status/1]). @@ -99,14 +99,14 @@ purge(State = #iv_state { queue = Q, qname = QName, durable = IsDurable, ok = persist_acks(QName, IsDurable, none, AckTags, PA), {Len, State #iv_state { len = 0, queue = queue:new() }}. -publish(Msg, State = #iv_state { queue = Q, qname = QName, durable = IsDurable, - len = Len }) -> +publish(Msg, _, State = #iv_state { queue = Q, qname = QName, durable = IsDurable, + len = Len }) -> ok = persist_message(QName, IsDurable, none, Msg), State #iv_state { queue = queue:in({Msg, false}, Q), len = Len + 1 }. -publish_delivered(false, _Msg, State) -> +publish_delivered(false, _Msg, _, State) -> {blank_ack, State}; -publish_delivered(true, Msg = #basic_message { guid = Guid }, +publish_delivered(true, Msg = #basic_message { guid = Guid }, _, State = #iv_state { qname = QName, durable = IsDurable, len = 0, pending_ack = PA }) -> ok = persist_message(QName, IsDurable, none, Msg), diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index bbecbfe211..695b44250f 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -35,7 +35,8 @@ -export([start_link/4, write/4, read/3, contains/2, remove/2, release/2, sync/3, client_init/2, client_terminate/2, - client_delete_and_terminate/3, successfully_recovered_state/1]). + client_delete_and_terminate/3, successfully_recovered_state/1, + register_sync_callback/3]). -export([sync/1, gc_done/4, set_maximum_since_use/2, gc/3]). %% internal @@ -82,7 +83,9 @@ cur_file_cache_ets, %% tid of current file cache table client_refs, %% set of references of all registered clients successfully_recovered, %% boolean: did we recover state? - file_size_limit %% how big are our files allowed to get? + file_size_limit, %% how big are our files allowed to get? + client_ondisk_callback, %% client ref to callback function mapping + cref_to_guids %% client ref to synced messages mapping }). -record(client_msstate, @@ -94,7 +97,8 @@ file_handles_ets, file_summary_ets, dedup_cache_ets, - cur_file_cache_ets + cur_file_cache_ets, + client_ref }). -record(file_summary, @@ -115,7 +119,8 @@ file_handles_ets :: ets:tid(), file_summary_ets :: ets:tid(), dedup_cache_ets :: ets:tid(), - cur_file_cache_ets :: ets:tid() }). + cur_file_cache_ets :: ets:tid(), + client_ref :: rabbit_guid:guid()}). -type(startup_fun_state() :: {(fun ((A) -> 'finished' | {rabbit_guid:guid(), non_neg_integer(), A})), A}). @@ -123,8 +128,9 @@ -spec(start_link/4 :: (atom(), file:filename(), [binary()] | 'undefined', startup_fun_state()) -> rabbit_types:ok_pid_or_error()). --spec(write/4 :: (server(), rabbit_guid:guid(), msg(), client_msstate()) -> - rabbit_types:ok(client_msstate())). +-spec(write/4 :: (server(), rabbit_guid:guid(), + msg(), client_msstate()) + -> rabbit_types:ok(client_msstate())). -spec(read/3 :: (server(), rabbit_guid:guid(), client_msstate()) -> {rabbit_types:ok(msg()) | 'not_found', client_msstate()}). -spec(contains/2 :: (server(), rabbit_guid:guid()) -> boolean()). @@ -134,10 +140,10 @@ -spec(gc_done/4 :: (server(), non_neg_integer(), file_num(), file_num()) -> 'ok'). -spec(set_maximum_since_use/2 :: (server(), non_neg_integer()) -> 'ok'). --spec(client_init/2 :: (server(), binary()) -> client_msstate()). +-spec(client_init/2 :: (server(), rabbit_guid:guid()) -> client_msstate()). -spec(client_terminate/2 :: (client_msstate(), server()) -> 'ok'). -spec(client_delete_and_terminate/3 :: - (client_msstate(), server(), binary()) -> 'ok'). + (client_msstate(), server(), rabbit_guid:guid()) -> 'ok'). -spec(successfully_recovered_state/1 :: (server()) -> boolean()). -spec(gc/3 :: (non_neg_integer(), non_neg_integer(), @@ -309,9 +315,10 @@ start_link(Server, Dir, ClientRefs, StartupFunState) -> [{timeout, infinity}]). write(Server, Guid, Msg, - CState = #client_msstate { cur_file_cache_ets = CurFileCacheEts }) -> + CState = #client_msstate { cur_file_cache_ets = CurFileCacheEts, + client_ref = CRef }) -> ok = update_msg_cache(CurFileCacheEts, Guid, Msg), - {gen_server2:cast(Server, {write, Guid}), CState}. + {gen_server2:cast(Server, {write, CRef, Guid}), CState}. read(Server, Guid, CState = #client_msstate { dedup_cache_ets = DedupCacheEts, @@ -365,11 +372,12 @@ client_init(Server, Ref) -> file_handles_ets = FileHandlesEts, file_summary_ets = FileSummaryEts, dedup_cache_ets = DedupCacheEts, - cur_file_cache_ets = CurFileCacheEts }. + cur_file_cache_ets = CurFileCacheEts, + client_ref = Ref}. client_terminate(CState, Server) -> close_all_handles(CState), - ok = gen_server2:call(Server, client_terminate, infinity). + ok = gen_server2:call(Server, {client_terminate, CState}, infinity). client_delete_and_terminate(CState, Server, Ref) -> close_all_handles(CState), @@ -378,6 +386,9 @@ client_delete_and_terminate(CState, Server, Ref) -> successfully_recovered_state(Server) -> gen_server2:call(Server, successfully_recovered_state, infinity). +register_sync_callback(Server, ClientRef, Fun) -> + gen_server2:call(Server, {register_sync_callback, ClientRef, Fun}, infinity). + %%---------------------------------------------------------------------------- %% Client-side-only helpers %%---------------------------------------------------------------------------- @@ -553,7 +564,9 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) -> cur_file_cache_ets = CurFileCacheEts, client_refs = ClientRefs1, successfully_recovered = CleanShutdown, - file_size_limit = FileSizeLimit + file_size_limit = FileSizeLimit, + client_ondisk_callback = dict:new(), + cref_to_guids = dict:new() }, %% If we didn't recover the msg location index then we need to @@ -616,16 +629,28 @@ handle_call({new_client_state, CRef}, _From, handle_call(successfully_recovered_state, _From, State) -> reply(State #msstate.successfully_recovered, State); -handle_call(client_terminate, _From, State) -> - reply(ok, State). +handle_call({register_sync_callback, ClientRef, Fun}, _From, + State = #msstate { client_ondisk_callback = CODC }) -> + reply(ok, State #msstate { client_ondisk_callback = + dict:store(ClientRef, Fun, CODC) }); + +handle_call({client_terminate, #client_msstate { client_ref = CRef }}, + _From, + State = #msstate { client_ondisk_callback = CODC, + cref_to_guids = CTG }) -> + reply(ok, State #msstate { client_ondisk_callback = dict:erase(CRef, CODC), + cref_to_guids = dict:erase(CRef, CTG) }). + +handle_cast({write, CRef, Guid}, + State = #msstate { current_file_handle = CurHdl, + current_file = CurFile, + sum_valid_data = SumValid, + sum_file_size = SumFileSize, + file_summary_ets = FileSummaryEts, + cur_file_cache_ets = CurFileCacheEts, + client_ondisk_callback = CODC, + cref_to_guids = CTG}) -> -handle_cast({write, Guid}, - State = #msstate { current_file_handle = CurHdl, - current_file = CurFile, - sum_valid_data = SumValid, - sum_file_size = SumFileSize, - file_summary_ets = FileSummaryEts, - cur_file_cache_ets = CurFileCacheEts }) -> true = 0 =< ets:update_counter(CurFileCacheEts, Guid, {3, -1}), [{Guid, Msg, _CacheRefCount}] = ets:lookup(CurFileCacheEts, Guid), case index_lookup(Guid, State) of @@ -652,7 +677,12 @@ handle_cast({write, Guid}, maybe_roll_to_new_file( NextOffset, State #msstate { sum_valid_data = SumValid + TotalSize, - sum_file_size = SumFileSize + TotalSize })); + sum_file_size = SumFileSize + TotalSize, + cref_to_guids = + case dict:find(CRef, CODC) of + {ok, _} -> rabbit_misc:dict_cons(CRef, Guid, CTG); + error -> CTG + end})); #msg_location { ref_count = RefCount } -> %% We already know about it, just update counter. Only %% update field otherwise bad interaction with concurrent GC @@ -783,14 +813,19 @@ reply(Reply, State) -> {State1, Timeout} = next_state(State), {reply, Reply, State1, Timeout}. -next_state(State = #msstate { on_sync = [], sync_timer_ref = undefined }) -> - {State, hibernate}; -next_state(State = #msstate { sync_timer_ref = undefined }) -> - {start_sync_timer(State), 0}; -next_state(State = #msstate { on_sync = [] }) -> - {stop_sync_timer(State), hibernate}; -next_state(State) -> - {State, 0}. +next_state(State = #msstate { sync_timer_ref = undefined, + on_sync = OS, + cref_to_guids = CTG }) -> + case {OS, dict:size(CTG)} of + {[], 0} -> {State, hibernate}; + _ -> {start_sync_timer(State), 0} + end; +next_state(State = #msstate { on_sync = OS, + cref_to_guids = CTG }) -> + case {OS, dict:size(CTG)} of + {[], 0} -> {stop_sync_timer(State), hibernate}; + _ -> {State, 0} + end. start_sync_timer(State = #msstate { sync_timer_ref = undefined }) -> {ok, TRef} = timer:apply_after(?SYNC_INTERVAL, ?MODULE, sync, [self()]), @@ -803,14 +838,24 @@ stop_sync_timer(State = #msstate { sync_timer_ref = TRef }) -> State #msstate { sync_timer_ref = undefined }. internal_sync(State = #msstate { current_file_handle = CurHdl, - on_sync = Syncs }) -> + on_sync = Syncs, + client_ondisk_callback = CODC, + cref_to_guids = CTG }) -> State1 = stop_sync_timer(State), - case Syncs of - [] -> State1; - _ -> ok = file_handle_cache:sync(CurHdl), - lists:foreach(fun (K) -> K() end, lists:reverse(Syncs)), - State1 #msstate { on_sync = [] } - end. + State2 = case Syncs of + [] -> State1; + _ -> ok = file_handle_cache:sync(CurHdl), + lists:foreach(fun (K) -> K() end, lists:reverse(Syncs)), + State1 #msstate { on_sync = [] } + end, + dict:map(fun(CRef, Guids) -> + case dict:find(CRef, CODC) of + {ok, Fun} -> Fun(Guids); + error -> ok %% shouldn't happen + end + end, CTG), + State2 #msstate { cref_to_guids = dict:new() }. + read_message(Guid, From, State = #msstate { dedup_cache_ets = DedupCacheEts }) -> diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index 0b98290ccd..d4b613ffe5 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -31,7 +31,7 @@ -module(rabbit_queue_index). --export([init/4, terminate/2, delete_and_terminate/1, publish/4, +-export([init/5, terminate/2, delete_and_terminate/1, publish/4, deliver/2, ack/2, sync/2, flush/1, read/3, next_segment_boundary/1, bounds/1, recover/1]). @@ -166,7 +166,7 @@ %%---------------------------------------------------------------------------- -record(qistate, { dir, segments, journal_handle, dirty_count, - max_journal_entries }). + max_journal_entries, on_sync, unsynced_guids }). -record(segment, { num, path, journal_entries, unacked }). @@ -189,15 +189,18 @@ segments :: 'undefined' | seg_dict(), journal_handle :: hdl(), dirty_count :: integer(), - max_journal_entries :: non_neg_integer() + max_journal_entries :: non_neg_integer(), + on_sync :: fun (([rabbit_guid:guid()]) -> ok), + unsynced_guids :: [rabbit_guid:guid()] }). -type(startup_fun_state() :: - {(fun ((A) -> 'finished' | {rabbit_guid:guid(), non_neg_integer(), A})), - A}). + {fun ((A) -> 'finished' | {rabbit_guid:guid(), non_neg_integer(), A}), + A}). --spec(init/4 :: (rabbit_amqqueue:name(), boolean(), boolean(), - fun ((rabbit_guid:guid()) -> boolean())) -> - {'undefined' | non_neg_integer(), [any()], qistate()}). +-spec(init/5 :: (rabbit_amqqueue:name(), boolean(), boolean(), + fun ((rabbit_guid:guid()) -> boolean()), + fun (([seq_id()]) -> ok)) + -> {'undefined' | non_neg_integer(), [any()], qistate()}). -spec(terminate/2 :: ([any()], qistate()) -> qistate()). -spec(delete_and_terminate/1 :: (qistate()) -> qistate()). -spec(publish/4 :: (rabbit_guid:guid(), seq_id(), boolean(), qistate()) -> @@ -222,25 +225,28 @@ %% public API %%---------------------------------------------------------------------------- -init(Name, false, _MsgStoreRecovered, _ContainsCheckFun) -> +init(Name, false, _MsgStoreRecovered, _ContainsCheckFun, OnSyncFun) -> State = #qistate { dir = Dir } = blank_state(Name), false = filelib:is_file(Dir), %% is_file == is file or dir - {0, [], State}; + {0, [], State #qistate { on_sync = OnSyncFun, + unsynced_guids = []}}; -init(Name, true, MsgStoreRecovered, ContainsCheckFun) -> +init(Name, true, MsgStoreRecovered, ContainsCheckFun, OnSyncFun) -> State = #qistate { dir = Dir } = blank_state(Name), + State1 = State #qistate { on_sync = OnSyncFun, + unsynced_guids = [] }, Terms = case read_shutdown_terms(Dir) of {error, _} -> []; {ok, Terms1} -> Terms1 end, CleanShutdown = detect_clean_shutdown(Dir), - {Count, State1} = + {Count, State2} = case CleanShutdown andalso MsgStoreRecovered of true -> RecoveredCounts = proplists:get_value(segments, Terms, []), - init_clean(RecoveredCounts, State); - false -> init_dirty(CleanShutdown, ContainsCheckFun, State) + init_clean(RecoveredCounts, State1); + false -> init_dirty(CleanShutdown, ContainsCheckFun, State1) end, - {Count, Terms, State1}. + {Count, Terms, State2}. terminate(Terms, State) -> {SegmentCounts, State1 = #qistate { dir = Dir }} = terminate(State), @@ -260,7 +266,9 @@ publish(Guid, SeqId, IsPersistent, State) when is_binary(Guid) -> true -> ?PUB_PERSIST_JPREFIX; false -> ?PUB_TRANS_JPREFIX end):?JPREFIX_BITS, SeqId:?SEQ_BITS>>, Guid]), - maybe_flush_journal(add_to_journal(SeqId, {Guid, IsPersistent}, State1)). + State2 = State1 #qistate { unsynced_guids = + [Guid | State1#qistate.unsynced_guids] }, + maybe_flush_journal(add_to_journal(SeqId, {Guid, IsPersistent}, State2)). deliver(SeqIds, State) -> deliver_or_ack(del, SeqIds, State). @@ -272,7 +280,8 @@ sync([], State) -> State; sync(_SeqIds, State = #qistate { journal_handle = undefined }) -> State; -sync(_SeqIds, State = #qistate { journal_handle = JournalHdl }) -> +sync(_SeqIds, State = #qistate { journal_handle = JournalHdl, + on_sync = OnSyncFun }) -> %% The SeqIds here contains the SeqId of every publish and ack in %% the transaction. Ideally we should go through these seqids and %% only sync the journal if the pubs or acks appear in the @@ -282,7 +291,8 @@ sync(_SeqIds, State = #qistate { journal_handle = JournalHdl }) -> %% seqids not being in the journal, provided the transaction isn't %% emptied (handled above anyway). ok = file_handle_cache:sync(JournalHdl), - State. + OnSyncFun(State#qistate.unsynced_guids), + State#qistate { unsynced_guids = [] }. flush(State = #qistate { dirty_count = 0 }) -> State; flush(State) -> flush_journal(State). @@ -561,7 +571,9 @@ maybe_flush_journal(State = #qistate { dirty_count = DCount, maybe_flush_journal(State) -> State. -flush_journal(State = #qistate { segments = Segments }) -> +flush_journal(State = #qistate { segments = Segments, + on_sync = OnSyncFun, + unsynced_guids = UGs }) -> Segments1 = segment_fold( fun (#segment { unacked = 0, path = Path }, SegmentsN) -> @@ -576,7 +588,8 @@ flush_journal(State = #qistate { segments = Segments }) -> {JournalHdl, State1} = get_journal_handle(State #qistate { segments = Segments1 }), ok = file_handle_cache:clear(JournalHdl), - State1 #qistate { dirty_count = 0 }. + OnSyncFun(UGs), + State1 #qistate { dirty_count = 0, unsynced_guids = [] }. append_journal_to_segment(#segment { journal_entries = JEntries, path = Path } = Segment) -> diff --git a/src/rabbit_router.erl b/src/rabbit_router.erl index bd57f73726..e5ffe863a9 100644 --- a/src/rabbit_router.erl +++ b/src/rabbit_router.erl @@ -58,7 +58,8 @@ %%---------------------------------------------------------------------------- deliver(QPids, Delivery = #delivery{mandatory = false, - immediate = false}) -> + immediate = false, + msg_seq_no = MsgSeqNo}) -> %% optimisation: when Mandatory = false and Immediate = false, %% rabbit_amqqueue:deliver will deliver the message to the queue %% process asynchronously, and return true, which means all the @@ -68,9 +69,10 @@ deliver(QPids, Delivery = #delivery{mandatory = false, %% case below. delegate:invoke_no_result( QPids, fun (Pid) -> rabbit_amqqueue:deliver(Pid, Delivery) end), + maybe_inform_channel(MsgSeqNo, QPids), {routed, QPids}; -deliver(QPids, Delivery) -> +deliver(QPids, Delivery = #delivery{msg_seq_no = MsgSeqNo}) -> {Success, _} = delegate:invoke(QPids, fun (Pid) -> @@ -78,8 +80,14 @@ deliver(QPids, Delivery) -> end), {Routed, Handled} = lists:foldl(fun fold_deliveries/2, {false, []}, Success), - check_delivery(Delivery#delivery.mandatory, Delivery#delivery.immediate, - {Routed, Handled}). + case check_delivery(Delivery#delivery.mandatory, Delivery#delivery.immediate, + {Routed, Handled}) of + {routed, Qs} -> + maybe_inform_channel(MsgSeqNo, Qs), + {routed, Qs}; + O -> + O + end. %% TODO: Maybe this should be handled by a cursor instead. %% TODO: This causes a full scan for each entry with the same exchange @@ -117,3 +125,8 @@ fold_deliveries({_, false},{_, Handled}) -> {true, Handled}. check_delivery(true, _ , {false, []}) -> {unroutable, []}; check_delivery(_ , true, {_ , []}) -> {not_delivered, []}; check_delivery(_ , _ , {_ , Qs}) -> {routed, Qs}. + +maybe_inform_channel(undefined, _) -> + ok; +maybe_inform_channel(MsgSeqNo, QPids) -> + gen_server2:cast(self(), {msg_sent_to_queues, MsgSeqNo, QPids}). diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index a72656b73b..b814390048 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -75,7 +75,6 @@ all_tests() -> passed = maybe_run_cluster_dependent_tests(), passed. - maybe_run_cluster_dependent_tests() -> SecondaryNode = rabbit_misc:makenode("hare"), @@ -1608,6 +1607,9 @@ init_test_queue() -> test_queue(), true, false, fun (Guid) -> rabbit_msg_store:contains(?PERSISTENT_MSG_STORE, Guid) + end, + fun (_) -> + ok %% Sync! end). restart_test_queue(Qi) -> @@ -1790,7 +1792,8 @@ variable_queue_publish(IsPersistent, Count, VQ) -> <<>>, #'P_basic'{delivery_mode = case IsPersistent of true -> 2; false -> 1 - end}, <<>>), VQN) + end}, <<>>), + false, VQN) end, VQ, lists:seq(1, Count)). variable_queue_fetch(Count, IsPersistent, IsDelivered, Len, VQ) -> @@ -1810,7 +1813,8 @@ assert_props(List, PropVals) -> with_fresh_variable_queue(Fun) -> ok = empty_test_queue(), - VQ = rabbit_variable_queue:init(test_queue(), true, false), + VQ = rabbit_variable_queue:init(test_queue(), true, false, + fun nop/1, fun nop/1), S0 = rabbit_variable_queue:status(VQ), assert_props(S0, [{q1, 0}, {q2, 0}, {delta, {delta, undefined, 0, undefined}}, @@ -1922,7 +1926,8 @@ test_variable_queue_all_the_bits_not_covered_elsewhere1(VQ0) -> {VQ5, _AckTags1} = variable_queue_fetch(Count, false, false, Count, VQ4), _VQ6 = rabbit_variable_queue:terminate(VQ5), - VQ7 = rabbit_variable_queue:init(test_queue(), true, true), + VQ7 = rabbit_variable_queue:init(test_queue(), true, true, + fun nop/1, fun nop/1), {{_Msg1, true, _AckTag1, Count1}, VQ8} = rabbit_variable_queue:fetch(true, VQ7), VQ9 = variable_queue_publish(false, 1, VQ8), @@ -1938,7 +1943,8 @@ test_variable_queue_all_the_bits_not_covered_elsewhere2(VQ0) -> VQ4 = rabbit_variable_queue:requeue(AckTags, VQ3), VQ5 = rabbit_variable_queue:idle_timeout(VQ4), _VQ6 = rabbit_variable_queue:terminate(VQ5), - VQ7 = rabbit_variable_queue:init(test_queue(), true, true), + VQ7 = rabbit_variable_queue:init(test_queue(), true, true, + fun nop/1, fun nop/1), {empty, VQ8} = rabbit_variable_queue:fetch(false, VQ7), VQ8. @@ -1968,10 +1974,13 @@ test_queue_recover() -> {ok, CountMinusOne, {QName, QPid1, _AckTag, true, _Msg}} = rabbit_amqqueue:basic_get(Q1, self(), false), exit(QPid1, shutdown), - VQ1 = rabbit_variable_queue:init(QName, true, true), + VQ1 = rabbit_variable_queue:init(QName, true, true, + fun nop/1, fun nop/1), {{_Msg1, true, _AckTag1, CountMinusOne}, VQ2} = rabbit_variable_queue:fetch(true, VQ1), _VQ3 = rabbit_variable_queue:delete_and_terminate(VQ2), rabbit_amqqueue:internal_delete(QName) end), passed. + +nop(_) -> ok. diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 30d3a8aec1..9256b8ac89 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -31,13 +31,13 @@ -module(rabbit_variable_queue). --export([init/3, terminate/1, delete_and_terminate/1, - purge/1, publish/2, publish_delivered/3, fetch/2, ack/2, +-export([init/5, init/3, terminate/1, delete_and_terminate/1, + purge/1, publish/3, publish_delivered/4, fetch/2, ack/2, tx_publish/3, tx_ack/3, tx_rollback/2, tx_commit/3, requeue/2, len/1, is_empty/1, set_ram_duration_target/2, ram_duration/1, needs_idle_timeout/1, idle_timeout/1, handle_pre_hibernate/1, - status/1]). + status/1, seqids_to_guids/2]). -export([start/1, stop/0]). @@ -236,8 +236,11 @@ ram_index_count, out_counter, in_counter, - rates - }). + rates, + msgs_on_disk, + msg_indices_on_disk, + need_confirming + }). -record(rates, { egress, ingress, avg_egress, avg_ingress, timestamp }). @@ -322,7 +325,10 @@ ram_index_count :: non_neg_integer(), out_counter :: non_neg_integer(), in_counter :: non_neg_integer(), - rates :: rates() }). + rates :: rates(), + msgs_on_disk :: gb_set(), + msg_indices_on_disk :: gb_set(), + need_confirming :: gb_set()}). -include("rabbit_backing_queue_spec.hrl"). @@ -369,13 +375,23 @@ stop_msg_store() -> ok = rabbit_sup:stop_child(?TRANSIENT_MSG_STORE). init(QueueName, IsDurable, Recover) -> + Self = self(), + init(QueueName, IsDurable, Recover, + fun (Guids) -> + msgs_written_to_disk(Self, Guids) + end, + fun msg_indices_written_to_disk/1). + +init(QueueName, IsDurable, Recover, + MsgOnDiskFun, MsgIdxOnDiskFun) -> {DeltaCount, Terms, IndexState} = rabbit_queue_index:init( QueueName, Recover, rabbit_msg_store:successfully_recovered_state(?PERSISTENT_MSG_STORE), fun (Guid) -> rabbit_msg_store:contains(?PERSISTENT_MSG_STORE, Guid) - end), + end, + MsgIdxOnDiskFun), {LowSeqId, NextSeqId, IndexState1} = rabbit_queue_index:bounds(IndexState), {PRef, TRef, Terms1} = @@ -398,6 +414,12 @@ init(QueueName, IsDurable, Recover) -> true -> rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE, PRef); false -> undefined end, + + rabbit_msg_store:register_sync_callback( + ?PERSISTENT_MSG_STORE, + PRef, + MsgOnDiskFun), + TransientClient = rabbit_msg_store:client_init(?TRANSIENT_MSG_STORE, TRef), State = #vqstate { q1 = queue:new(), @@ -428,7 +450,10 @@ init(QueueName, IsDurable, Recover) -> ingress = {Now, DeltaCount1}, avg_egress = 0.0, avg_ingress = 0.0, - timestamp = Now } }, + timestamp = Now }, + msgs_on_disk = gb_sets:new(), + msg_indices_on_disk = gb_sets:new(), + need_confirming = gb_sets:new()}, a(maybe_deltas_to_betas(State)). terminate(State) -> @@ -490,13 +515,15 @@ purge(State = #vqstate { q4 = Q4, index_state = IndexState, len = Len }) -> ram_index_count = 0, persistent_count = 0 })}. -publish(Msg, State) -> - {_SeqId, State1} = publish(Msg, false, false, State), +publish(Msg, NeedsConfirming, State) -> + {_SeqId, State1} = publish(Msg, false, false, NeedsConfirming, State), a(reduce_memory_use(State1)). -publish_delivered(false, _Msg, State = #vqstate { len = 0 }) -> +publish_delivered(false, _Msg, _NC, State = #vqstate { len = 0 }) -> {blank_ack, a(State)}; -publish_delivered(true, Msg = #basic_message { is_persistent = IsPersistent }, +publish_delivered(true, Msg = #basic_message { is_persistent = IsPersistent, + guid = Guid }, + NeedsConfirming, State = #vqstate { len = 0, next_seq_id = SeqId, out_counter = OutCount, @@ -510,11 +537,17 @@ publish_delivered(true, Msg = #basic_message { is_persistent = IsPersistent }, {MsgStatus1, State1} = maybe_write_to_disk(false, false, MsgStatus, State), PA1 = record_pending_ack(m(MsgStatus1), PA), PCount1 = PCount + one_if(IsPersistent1), - {SeqId, a(State1 #vqstate { next_seq_id = SeqId + 1, - out_counter = OutCount + 1, - in_counter = InCount + 1, - persistent_count = PCount1, - pending_ack = PA1 })}. + {SeqId, a(State1 #vqstate { + next_seq_id = SeqId + 1, + out_counter = OutCount + 1, + in_counter = InCount + 1, + persistent_count = PCount1, + pending_ack = PA1, + need_confirming = + case NeedsConfirming of + true -> gb_sets:insert(Guid, State1#vqstate.need_confirming); + false -> State1#vqstate.need_confirming + end })}. fetch(AckRequired, State = #vqstate { q4 = Q4, ram_msg_count = RamMsgCount, @@ -628,14 +661,14 @@ requeue(AckTags, State) -> a(reduce_memory_use( ack(fun rabbit_msg_store:release/2, fun (#msg_status { msg = Msg }, State1) -> - {_SeqId, State2} = publish(Msg, true, false, State1), + {_SeqId, State2} = publish(Msg, true, false, false, State1), State2; ({IsPersistent, Guid}, State1) -> #vqstate { msg_store_clients = MSCState } = State1, {{ok, Msg = #basic_message{}}, MSCState1} = read_from_msg_store(MSCState, IsPersistent, Guid), State2 = State1 #vqstate { msg_store_clients = MSCState1 }, - {_SeqId, State3} = publish(Msg, true, true, State2), + {_SeqId, State3} = publish(Msg, true, true, false, State2), State3 end, AckTags, State))). @@ -739,6 +772,15 @@ status(#vqstate { q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4, {avg_egress_rate , AvgEgressRate}, {avg_ingress_rate , AvgIngressRate} ]. +seqids_to_guids(SeqIds, #vqstate{ pending_ack = PA }) -> + lists:foldl(fun(SeqId, Guids) -> + {ok, AckEntry} = dict:find(SeqId, PA), + [case AckEntry of + #msg_status { msg = Msg } -> Msg#basic_message.guid; + {_, Guid} -> Guid + end | Guids] + end, [], SeqIds). + %%---------------------------------------------------------------------------- %% Minor helpers %%---------------------------------------------------------------------------- @@ -949,7 +991,7 @@ tx_commit_index(State = #vqstate { on_sync = #sync { fun (Msg = #basic_message { is_persistent = IsPersistent }, {SeqIdsAcc, State2}) -> IsPersistent1 = IsDurable andalso IsPersistent, - {SeqId, State3} = publish(Msg, false, IsPersistent1, State2), + {SeqId, State3} = publish(Msg, false, IsPersistent1, false, State2), {cons_if(IsPersistent1, SeqId, SeqIdsAcc), State3} end, {PAcks, ack(Acks, State)}, Pubs), IndexState1 = rabbit_queue_index:sync(SeqIds, IndexState), @@ -995,8 +1037,9 @@ remove_queue_entries1( %% Internal gubbins for publishing %%---------------------------------------------------------------------------- -publish(Msg = #basic_message { is_persistent = IsPersistent }, - IsDelivered, MsgOnDisk, +publish(Msg = #basic_message { is_persistent = IsPersistent, + guid = Guid }, + IsDelivered, MsgOnDisk, NeedsConfirming, State = #vqstate { q1 = Q1, q3 = Q3, q4 = Q4, next_seq_id = SeqId, len = Len, @@ -1013,11 +1056,17 @@ publish(Msg = #basic_message { is_persistent = IsPersistent }, true -> State1 #vqstate { q4 = queue:in(m(MsgStatus1), Q4) } end, PCount1 = PCount + one_if(IsPersistent1), - {SeqId, State2 #vqstate { next_seq_id = SeqId + 1, - len = Len + 1, - in_counter = InCount + 1, - persistent_count = PCount1, - ram_msg_count = RamMsgCount + 1}}. + {SeqId, State2 #vqstate { + next_seq_id = SeqId + 1, + len = Len + 1, + in_counter = InCount + 1, + persistent_count = PCount1, + ram_msg_count = RamMsgCount + 1, + need_confirming = + case NeedsConfirming of + true -> gb_sets:add(Guid, State2#vqstate.need_confirming); + false -> State2#vqstate.need_confirming + end }}. maybe_write_msg_to_disk(_Force, MsgStatus = #msg_status { msg_on_disk = true }, MSCState) -> @@ -1117,11 +1166,12 @@ ack(MsgStoreFun, Fun, AckTags, State) -> ok = orddict:fold(fun (MsgStore, Guids, ok) -> MsgStoreFun(MsgStore, Guids) end, ok, GuidsByStore), + State2 = msgs_confirmed(seqids_to_guids(AckTags, State), State1), PCount1 = PCount - case orddict:find(?PERSISTENT_MSG_STORE, GuidsByStore) of error -> 0; {ok, Guids} -> length(Guids) end, - State1 #vqstate { index_state = IndexState1, + State2 #vqstate { index_state = IndexState1, persistent_count = PCount1 }. accumulate_ack(_SeqId, #msg_status { is_persistent = false, %% ASSERTIONS @@ -1132,6 +1182,65 @@ accumulate_ack(SeqId, {IsPersistent, Guid}, {SeqIdsAcc, Dict}) -> {cons_if(IsPersistent, SeqId, SeqIdsAcc), rabbit_misc:orddict_cons(find_msg_store(IsPersistent), Guid, Dict)}. + +%%---------------------------------------------------------------------------- +%% Internal plumbing for confirms (aka publisher acks) +%%---------------------------------------------------------------------------- + +msgs_confirmed(Guids, State = #vqstate { msgs_on_disk = MOD, + msg_indices_on_disk = MIOD, + need_confirming = NC }) -> + GuidSet = gb_sets:from_list(Guids), + State #vqstate { + msgs_on_disk = + gb_sets:difference(MOD, GuidSet), + msg_indices_on_disk = + gb_sets:difference(MIOD, GuidSet), + need_confirming = + gb_sets:difference(NC, GuidSet) }. + +msgs_written_to_disk(QPid, Guids) -> + spawn(fun() -> rabbit_amqqueue:maybe_run_queue_via_backing_queue( + QPid, + fun(State = #vqstate { msgs_on_disk = MOD, + msg_indices_on_disk = MIOD, + need_confirming = NC }) -> + GuidSet = gb_sets:from_list(Guids), + ToConfirmMsgs = gb_sets:intersection(GuidSet, MIOD), + MOD1 = gb_sets:intersection(gb_sets:union(MOD, GuidSet), NC), + { State #vqstate { + msgs_on_disk = + gb_sets:difference(MOD1, ToConfirmMsgs), + msg_indices_on_disk = + gb_sets:difference(MIOD, ToConfirmMsgs), + need_confirming = + gb_sets:difference(NC, ToConfirmMsgs) }, + {confirm, gb_sets:to_list(ToConfirmMsgs)} } + end) + end). + +msg_indices_written_to_disk(Guids) -> + Self = self(), + spawn(fun() -> rabbit_amqqueue:maybe_run_queue_via_backing_queue( + Self, + fun(State = #vqstate { msgs_on_disk = MOD, + msg_indices_on_disk = MIOD, + need_confirming = NC }) -> + GuidSet = gb_sets:from_list(Guids), + ToConfirmMsgs = gb_sets:intersection(GuidSet, MOD), + MIOD1 = + gb_sets:intersection(gb_sets:union(MIOD, GuidSet), NC), + { State #vqstate { + msgs_on_disk = + gb_sets:difference(MOD, ToConfirmMsgs), + msg_indices_on_disk = + gb_sets:difference(MIOD1, ToConfirmMsgs), + need_confirming = + gb_sets:difference(NC, ToConfirmMsgs) }, + {confirm, gb_sets:to_list(ToConfirmMsgs)} } + end) + end). + %%---------------------------------------------------------------------------- %% Phase changes %%---------------------------------------------------------------------------- |
