diff options
| author | Matthew Sackman <matthew@lshift.net> | 2010-04-16 11:14:08 +0100 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2010-04-16 11:14:08 +0100 |
| commit | e6ab220d0f26546730ce34d126e010028c8d2320 (patch) | |
| tree | b947fc534a57ab75e037959c204eed5abd882adc | |
| parent | 183085e1c386bac6342302a92ea2cf865ebdb8fc (diff) | |
| download | rabbitmq-server-git-e6ab220d0f26546730ce34d126e010028c8d2320.tar.gz | |
Thorough reworking of API - the BQ is now responsible for hanging onto unacked msgs and all details of transactions
| -rw-r--r-- | include/rabbit_backing_queue_spec.hrl | 21 | ||||
| -rw-r--r-- | src/rabbit_amqqueue.erl | 17 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 356 | ||||
| -rw-r--r-- | src/rabbit_backing_queue.erl | 11 | ||||
| -rw-r--r-- | src/rabbit_channel.erl | 4 | ||||
| -rw-r--r-- | src/rabbit_msg_store.erl | 4 | ||||
| -rw-r--r-- | src/rabbit_queue_index.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_tests.erl | 10 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 660 |
9 files changed, 526 insertions, 559 deletions
diff --git a/include/rabbit_backing_queue_spec.hrl b/include/rabbit_backing_queue_spec.hrl index 3a0f701b85..d86a538288 100644 --- a/include/rabbit_backing_queue_spec.hrl +++ b/include/rabbit_backing_queue_spec.hrl @@ -29,22 +29,25 @@ %% Contributor(s): ______________________________________. %% +-type(fetch_result() :: + %% Message, IsDelivered, AckTag, Remaining_Len + ('empty'|{basic_message(), boolean(), ack(), non_neg_integer()})). + -spec(start/1 :: ([queue_name()]) -> 'ok'). -spec(init/2 :: (queue_name(), boolean()) -> state()). -spec(terminate/1 :: (state()) -> state()). -spec(delete_and_terminate/1 :: (state()) -> state()). -spec(purge/1 :: (state()) -> {non_neg_integer(), state()}). -spec(publish/2 :: (basic_message(), state()) -> state()). --spec(publish_delivered/2 :: (basic_message(), state()) -> {ack(), state()}). --spec(fetch/1 :: (state()) -> - {('empty'|{basic_message(), boolean(), ack(), non_neg_integer()}), - state()}). +-spec(publish_delivered/3 :: + (boolean(), basic_message(), state()) -> {ack(), state()}). +-spec(fetch/2 :: (boolean(), state()) -> {fetch_result(), state()}). -spec(ack/2 :: ([ack()], state()) -> state()). --spec(tx_publish/2 :: (basic_message(), state()) -> state()). --spec(tx_rollback/2 :: ([guid()], state()) -> state()). --spec(tx_commit/4 :: ([basic_message()], [ack()], {pid(), any()}, state()) -> - {boolean(), state()}). --spec(requeue/2 :: ([{basic_message(), ack()}], state()) -> state()). +-spec(tx_publish/3 :: (txn(), basic_message(), state()) -> state()). +-spec(tx_ack/3 :: (txn(), [ack()], state()) -> state()). +-spec(tx_rollback/2 :: (txn(), state()) -> {[ack()], state()}). +-spec(tx_commit/3 :: (txn(), {pid(), any()}, state()) -> {[ack()], state()}). +-spec(requeue/2 :: ([ack()], state()) -> state()). -spec(len/1 :: (state()) -> non_neg_integer()). -spec(is_empty/1 :: (state()) -> boolean()). -spec(set_ram_duration_target/2 :: diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 6125fddcc2..cc6f08b7d4 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -43,7 +43,7 @@ -export([basic_get/3, basic_consume/8, basic_cancel/4]). -export([notify_sent/2, unblock/2, maybe_run_queue_via_backing_queue/2, flush_all/2]). --export([commit_all/2, rollback_all/2, notify_down_all/2, limit_all/3]). +-export([commit_all/3, rollback_all/3, notify_down_all/2, limit_all/3]). -export([on_node_down/1]). -import(mnesia). @@ -92,8 +92,8 @@ -spec(deliver/2 :: (pid(), delivery()) -> boolean()). -spec(requeue/3 :: (pid(), [msg_id()], pid()) -> 'ok'). -spec(ack/4 :: (pid(), maybe(txn()), [msg_id()], pid()) -> 'ok'). --spec(commit_all/2 :: ([pid()], txn()) -> ok_or_errors()). --spec(rollback_all/2 :: ([pid()], txn()) -> ok_or_errors()). +-spec(commit_all/3 :: ([pid()], txn(), pid()) -> ok_or_errors()). +-spec(rollback_all/3 :: ([pid()], txn(), pid()) -> ok_or_errors()). -spec(notify_down_all/2 :: ([pid()], pid()) -> ok_or_errors()). -spec(limit_all/3 :: ([pid()], pid(), pid() | 'undefined') -> ok_or_errors()). -spec(claim_queue/2 :: (amqqueue(), pid()) -> 'ok' | 'locked'). @@ -107,8 +107,7 @@ -spec(basic_cancel/4 :: (amqqueue(), pid(), ctag(), any()) -> 'ok'). -spec(notify_sent/2 :: (pid(), pid()) -> 'ok'). -spec(unblock/2 :: (pid(), pid()) -> 'ok'). --spec(maybe_run_queue_via_backing_queue/2 :: - (pid(), (fun ((A) -> {boolean(), A}))) -> 'ok'). +-spec(maybe_run_queue_via_backing_queue/2 :: (pid(), (fun ((A) -> A))) -> 'ok'). -spec(flush_all/2 :: ([pid()], pid()) -> 'ok'). -spec(internal_declare/2 :: (amqqueue(), boolean()) -> amqqueue()). -spec(internal_delete/1 :: (queue_name()) -> 'ok' | not_found()). @@ -298,16 +297,16 @@ requeue(QPid, MsgIds, ChPid) -> ack(QPid, Txn, MsgIds, ChPid) -> gen_server2:pcast(QPid, 7, {ack, Txn, MsgIds, ChPid}). -commit_all(QPids, Txn) -> +commit_all(QPids, Txn, ChPid) -> safe_pmap_ok( fun (QPid) -> exit({queue_disappeared, QPid}) end, - fun (QPid) -> gen_server2:call(QPid, {commit, Txn}, infinity) end, + fun (QPid) -> gen_server2:call(QPid, {commit, Txn, ChPid}, infinity) end, QPids). -rollback_all(QPids, Txn) -> +rollback_all(QPids, Txn, ChPid) -> safe_pmap_ok( fun (QPid) -> exit({queue_disappeared, QPid}) end, - fun (QPid) -> gen_server2:cast(QPid, {rollback, Txn}) end, + fun (QPid) -> gen_server2:cast(QPid, {rollback, Txn, ChPid}) end, QPids). notify_down_all(QPids, ChPid) -> diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 9697cc1347..efbc276692 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -56,7 +56,6 @@ backing_queue, backing_queue_state, backing_queue_timeout_fun, - next_msg_id, active_consumers, blocked_consumers, sync_timer_ref, @@ -65,8 +64,6 @@ -record(consumer, {tag, ack_required}). --record(tx, {ch_pid, pending_messages, pending_acks}). - %% These are held in our process dictionary -record(cr, {consumer_count, ch_pid, @@ -88,9 +85,7 @@ exclusive_consumer_tag, messages_ready, messages_unacknowledged, - messages_uncommitted, messages, - acks_uncommitted, consumers, transactions, memory, @@ -122,7 +117,6 @@ init([Q, InitBQ]) -> backing_queue = BQ, backing_queue_state = maybe_init_backing_queue(InitBQ, BQ, Q), backing_queue_timeout_fun = undefined, - next_msg_id = 1, active_consumers = queue:new(), blocked_consumers = queue:new(), sync_timer_ref = undefined, @@ -135,49 +129,39 @@ maybe_init_backing_queue( maybe_init_backing_queue(false, _BQ, _Q) -> undefined. -terminate(shutdown, #q{backing_queue_state = BQS, - backing_queue = BQ}) -> - ok = rabbit_memory_monitor:deregister(self()), - case BQS of - undefined -> ok; - _ -> BQ:terminate(BQS) - end; -terminate({shutdown, _}, #q{backing_queue_state = BQS, - backing_queue = BQ}) -> - ok = rabbit_memory_monitor:deregister(self()), - case BQS of - undefined -> ok; - _ -> BQ:terminate(BQS) - end; -terminate(_Reason, State = #q{backing_queue_state = BQS, - backing_queue = BQ}) -> +terminate(shutdown, State) -> + terminate_shutdown(terminate, State); +terminate({shutdown, _}, State) -> + terminate_shutdown(terminate, State); +terminate(_Reason, State) -> ok = rabbit_memory_monitor:deregister(self()), %% FIXME: How do we cancel active subscriptions? %% Ensure that any persisted tx messages are removed. %% TODO: wait for all in flight tx_commits to complete - case BQS of - undefined -> - ok; - _ -> - BQS1 = BQ:tx_rollback( - lists:concat([PM || #tx { pending_messages = PM } <- - all_tx_record()]), BQS), - %% Delete from disk first. If we crash at this point, when - %% a durable queue, we will be recreated at startup, - %% possibly with partial content. The alternative is much - %% worse however - if we called internal_delete first, we - %% would then have a race between the disk delete and a - %% new queue with the same name being created and - %% published to. - BQ:delete_and_terminate(BQS1) - end, - ok = rabbit_amqqueue:internal_delete(qname(State)). + State1 = terminate_shutdown(delete_and_terminate, State), + ok = rabbit_amqqueue:internal_delete(qname(State1)). code_change(_OldVsn, State, _Extra) -> {ok, State}. %%---------------------------------------------------------------------------- +terminate_shutdown(Fun, State = + #q{backing_queue = BQ, backing_queue_state = BQS}) -> + ok = rabbit_memory_monitor:deregister(self()), + case BQS of + undefined -> State; + _ -> BQS1 = lists:foldl( + fun (#cr{txn = none}, BQSN) -> + BQSN; + (#cr{txn = Txn}, BQSN) -> + {_AckTags, BQSN1} = + BQ:tx_rollback(Txn, BQSN), + BQSN1 + end, BQS, all_ch_record()), + State#q{backing_queue_state = BQ:Fun(BQS1)} + end. + reply(Reply, NewState) -> assert_invariant(NewState), {NewState1, Timeout} = next_state(NewState), @@ -248,7 +232,7 @@ ch_record(ChPid) -> C = #cr{consumer_count = 0, ch_pid = ChPid, monitor_ref = MonitorRef, - unacked_messages = dict:new(), + unacked_messages = [], is_limit_active = false, txn = none, unsent_message_count = 0}, @@ -282,8 +266,7 @@ record_current_channel_tx(ChPid, Txn) -> deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun}, FunAcc, State = #q{q = #amqqueue{name = QName}, active_consumers = ActiveConsumers, - blocked_consumers = BlockedConsumers, - next_msg_id = NextId}) -> + blocked_consumers = BlockedConsumers}) -> case queue:out(ActiveConsumers) of {{value, QEntry = {ChPid, #consumer{tag = ConsumerTag, ack_required = AckRequired}}}, @@ -299,12 +282,11 @@ deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun}, FunAcc, DeliverFun(AckRequired, FunAcc, State), rabbit_channel:deliver( ChPid, ConsumerTag, AckRequired, - {QName, self(), NextId, IsDelivered, Message}), - NewUAM = - case AckRequired of - true -> dict:store(NextId, {Message, AckTag}, UAM); - false -> UAM - end, + {QName, self(), AckTag, IsDelivered, Message}), + NewUAM = case AckRequired of + true -> [AckTag|UAM]; + false -> UAM + end, NewC = C#cr{unsent_message_count = Count + 1, unacked_messages = NewUAM}, store_ch_record(NewC), @@ -322,8 +304,7 @@ deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun}, FunAcc, end, State2 = State1#q{ active_consumers = NewActiveConsumers, - blocked_consumers = NewBlockedConsumers, - next_msg_id = NextId + 1}, + blocked_consumers = NewBlockedConsumers}, deliver_msgs_to_consumers(Funs, FunAcc1, State2); %% if IsMsgReady then we've hit the limiter false when IsMsgReady -> @@ -344,50 +325,39 @@ deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun}, FunAcc, {FunAcc, State} end. -deliver_from_queue_pred({IsEmpty, _AutoAcks}, _State) -> +deliver_from_queue_pred(IsEmpty, _State) -> not IsEmpty. -deliver_from_queue_deliver(AckRequired, {false, AutoAcks}, + +deliver_from_queue_deliver(AckRequired, false, State = #q{backing_queue_state = BQS, backing_queue = BQ}) -> - {{Message, IsDelivered, AckTag, Remaining}, BQS1} = BQ:fetch(BQS), - AutoAcks1 = case AckRequired of - true -> AutoAcks; - false -> [AckTag | AutoAcks] - end, - {{Message, IsDelivered, AckTag}, {0 == Remaining, AutoAcks1}, + {{Message, IsDelivered, AckTag, Remaining}, BQS1} = + BQ:fetch(AckRequired, BQS), + {{Message, IsDelivered, AckTag}, 0 == Remaining, State #q { backing_queue_state = BQS1 }}. run_message_queue(State = #q{backing_queue_state = BQS, backing_queue = BQ}) -> - Funs = { fun deliver_from_queue_pred/2, - fun deliver_from_queue_deliver/3 }, + Funs = {fun deliver_from_queue_pred/2, + fun deliver_from_queue_deliver/3}, IsEmpty = BQ:is_empty(BQS), - {{_IsEmpty1, AutoAcks}, State1} = - deliver_msgs_to_consumers(Funs, {IsEmpty, []}, State), - BQS1 = BQ:ack(AutoAcks, State1 #q.backing_queue_state), - State1 #q { backing_queue_state = BQS1 }. + {_IsEmpty1, State1} = deliver_msgs_to_consumers(Funs, IsEmpty, State), + State1. attempt_delivery(none, _ChPid, Message, State = #q{backing_queue = BQ}) -> PredFun = fun (IsEmpty, _State) -> not IsEmpty end, DeliverFun = - fun (AckRequired, false, State1) -> - {AckTag, State2} = - case AckRequired of - true -> - {AckTag1, BQS} = - BQ:publish_delivered( - Message, State1 #q.backing_queue_state), - {AckTag1, State1 #q { backing_queue_state = BQS }}; - false -> - {noack, State1} - end, - {{Message, false, AckTag}, true, State2} + fun (AckRequired, false, State1 = #q{backing_queue_state = BQS}) -> + {AckTag, BQS1} = + BQ:publish_delivered(AckRequired, Message, BQS), + {{Message, false, AckTag}, true, + State1#q{backing_queue_state = BQS1}} end, deliver_msgs_to_consumers({ PredFun, DeliverFun }, false, State); -attempt_delivery(Txn, ChPid, Message, State = #q{backing_queue = BQ}) -> - BQS = BQ:tx_publish(Message, State #q.backing_queue_state), - record_pending_message(Txn, ChPid, Message), - {true, State #q { backing_queue_state = BQS }}. +attempt_delivery(Txn, ChPid, Message, State = #q{backing_queue = BQ, + backing_queue_state = BQS}) -> + record_current_channel_tx(ChPid, Txn), + {true, State#q{backing_queue_state = BQ:tx_publish(Txn, Message, BQS)}}. deliver_or_enqueue(Txn, ChPid, Message, State = #q{backing_queue = BQ}) -> case attempt_delivery(Txn, ChPid, Message, State) of @@ -396,49 +366,22 @@ deliver_or_enqueue(Txn, ChPid, Message, State = #q{backing_queue = BQ}) -> {false, NewState} -> %% Txn is none and no unblocked channels with consumers BQS = BQ:publish(Message, State #q.backing_queue_state), - {false, NewState #q { backing_queue_state = BQS }} + {false, NewState#q{backing_queue_state = BQS}} end. -%% all these messages have already been delivered at least once and -%% not ack'd, but need to be either redelivered or requeued -deliver_or_requeue_n([], State) -> - State; -deliver_or_requeue_n(MsgsWithAcks, State = #q{backing_queue = BQ}) -> - Funs = { fun deliver_or_requeue_msgs_pred/2, - fun deliver_or_requeue_msgs_deliver/3 }, - {{_RemainingLengthMinusOne, AutoAcks, OutstandingMsgs}, NewState} = - deliver_msgs_to_consumers( - Funs, {length(MsgsWithAcks), [], MsgsWithAcks}, State), - BQS = BQ:ack(AutoAcks, NewState #q.backing_queue_state), - case OutstandingMsgs of - [] -> NewState #q { backing_queue_state = BQS }; - _ -> BQS1 = BQ:requeue(OutstandingMsgs, BQS), - NewState #q { backing_queue_state = BQS1 } - end. - -deliver_or_requeue_msgs_pred({Len, _AcksAcc, _MsgsWithAcks}, _State) -> - 0 < Len. -deliver_or_requeue_msgs_deliver( - false, {Len, AcksAcc, [{Message, AckTag} | MsgsWithAcks]}, State) -> - {{Message, true, noack}, {Len - 1, [AckTag | AcksAcc], MsgsWithAcks}, - State}; -deliver_or_requeue_msgs_deliver( - true, {Len, AcksAcc, [{Message, AckTag} | MsgsWithAcks]}, State) -> - {{Message, true, AckTag}, {Len - 1, AcksAcc, MsgsWithAcks}, State}. +requeue_and_run(AckTags, State = #q{backing_queue = BQ}) -> + maybe_run_queue_via_backing_queue( + fun (BQS) -> BQ:requeue(AckTags, BQS) end, State). add_consumer(ChPid, Consumer, Queue) -> queue:in({ChPid, Consumer}, Queue). remove_consumer(ChPid, ConsumerTag, Queue) -> - %% TODO: replace this with queue:filter/2 once we move to R12 - queue:from_list(lists:filter( - fun ({CP, #consumer{tag = CT}}) -> - (CP /= ChPid) or (CT /= ConsumerTag) - end, queue:to_list(Queue))). + queue:filter(fun ({CP, #consumer{tag = CT}}) -> + (CP /= ChPid) or (CT /= ConsumerTag) + end, Queue). remove_consumers(ChPid, Queue) -> - %% TODO: replace this with queue:filter/2 once we move to R12 - queue:from_list(lists:filter(fun ({CP, _}) -> CP /= ChPid end, - queue:to_list(Queue))). + queue:filter(fun ({CP, _}) -> CP /= ChPid end, Queue). move_consumers(ChPid, From, To) -> {Kept, Removed} = lists:partition(fun ({CP, _}) -> CP /= ChPid end, @@ -489,12 +432,10 @@ handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder}) -> true -> {stop, State1}; false -> State2 = case Txn of none -> State1; - _ -> rollback_transaction(Txn, State1) + _ -> rollback_transaction(Txn, ChPid, + State1) end, - {ok, deliver_or_requeue_n( - [MsgWithAck || - {_MsgId, MsgWithAck} <- dict:to_list(UAM)], - State2)} + {ok, requeue_and_run(UAM, State2)} end end. @@ -526,72 +467,34 @@ maybe_send_reply(ChPid, Msg) -> ok = rabbit_channel:send_command(ChPid, Msg). qname(#q{q = #amqqueue{name = QName}}) -> QName. maybe_run_queue_via_backing_queue(Fun, State = #q{backing_queue_state = BQS}) -> - {RunQueue, BQS1} = Fun(BQS), - State1 = State#q{backing_queue_state = BQS1}, - case RunQueue of - true -> run_message_queue(State1); - false -> State1 - end. + run_message_queue(State#q{backing_queue_state = Fun(BQS)}). -lookup_tx(Txn) -> - case get({txn, Txn}) of - undefined -> #tx{ch_pid = none, - pending_messages = [], - pending_acks = []}; - V -> V - end. - -store_tx(Txn, Tx) -> - put({txn, Txn}, Tx). - -erase_tx(Txn) -> - erase({txn, Txn}). - -all_tx_record() -> - [T || {{txn, _}, T} <- get()]. +commit_transaction(Txn, From, ChPid, State = #q{backing_queue = BQ, + backing_queue_state = BQS}) -> + {AckTags, BQS1} = BQ:tx_commit(Txn, From, BQS), + case lookup_ch(ChPid) of + not_found -> + []; + C = #cr{unacked_messages = UAM} -> + Remaining = ordsets:to_list(ordsets:subtract( + ordsets:from_list(UAM), + ordsets:from_list(AckTags))), + store_ch_record(C#cr{unacked_messages = Remaining, txn = none}) + end, + State#q{backing_queue_state = BQS1}. -record_pending_message(Txn, ChPid, Message) -> - Tx = #tx{pending_messages = Pending} = lookup_tx(Txn), - record_current_channel_tx(ChPid, Txn), - store_tx(Txn, Tx#tx{pending_messages = [Message | Pending], - ch_pid = ChPid}). +rollback_transaction(Txn, _ChPid, State = #q{backing_queue = BQ, + backing_queue_state = BQS}) -> + {_AckTags, BQS1} = BQ:tx_rollback(Txn, BQS), + %% Iff we removed acktags from the channel record on ack+txn then + %% we would add them back in here (would also require ChPid) + State#q{backing_queue_state = BQS1}. -record_pending_acks(Txn, ChPid, MsgIds) -> - Tx = #tx{pending_acks = Pending} = lookup_tx(Txn), - record_current_channel_tx(ChPid, Txn), - store_tx(Txn, Tx#tx{pending_acks = [MsgIds | Pending], - ch_pid = ChPid}). - -commit_transaction(Txn, From, State = #q{backing_queue = BQ}) -> - #tx{ch_pid = ChPid, pending_messages = PendingMessages, - pending_acks = PendingAcks} = lookup_tx(Txn), - PendingMessagesOrdered = lists:reverse(PendingMessages), - PendingAcksOrdered = lists:append(PendingAcks), - Acks = - case lookup_ch(ChPid) of - not_found -> - []; - C = #cr{unacked_messages = UAM} -> - {MsgsWithAcks, Remaining} = - collect_messages(PendingAcksOrdered, UAM), - store_ch_record(C#cr{unacked_messages = Remaining}), - [AckTag || {_Message, AckTag} <- MsgsWithAcks] - end, - {RunQueue, BQS} = BQ:tx_commit(PendingMessagesOrdered, Acks, From, - State#q.backing_queue_state), - erase_tx(Txn), - {RunQueue, State#q{backing_queue_state = BQS}}. - -rollback_transaction(Txn, State = #q{backing_queue = BQ}) -> - #tx{pending_messages = PendingMessages} = lookup_tx(Txn), - BQS = BQ:tx_rollback(PendingMessages, State #q.backing_queue_state), - erase_tx(Txn), - State#q{backing_queue_state = BQS}. - -collect_messages(MsgIds, UAM) -> - lists:mapfoldl( - fun (MsgId, D) -> {dict:fetch(MsgId, D), dict:erase(MsgId, D)} end, - UAM, MsgIds). +collect_messages(AckTags, UAM) -> + AckTagsSet = ordsets:from_list(AckTags), + UAMSet = ordsets:from_list(UAM), + {ordsets:to_list(ordsets:intersection(AckTagsSet, UAMSet)), + ordsets:to_list(ordsets:subtract(UAMSet, AckTagsSet))}. infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items]. @@ -616,22 +519,15 @@ i(exclusive_consumer_tag, #q{exclusive_consumer = {_ChPid, ConsumerTag}}) -> i(messages_ready, #q{backing_queue_state = BQS, backing_queue = BQ}) -> BQ:len(BQS); i(messages_unacknowledged, _) -> - lists:sum([dict:size(UAM) || + lists:sum([ordsets:size(UAM) || #cr{unacked_messages = UAM} <- all_ch_record()]); -i(messages_uncommitted, _) -> - lists:sum([length(Pending) || - #tx{pending_messages = Pending} <- all_tx_record()]); i(messages, State) -> lists:sum([i(Item, State) || Item <- [messages_ready, - messages_unacknowledged, - messages_uncommitted]]); -i(acks_uncommitted, _) -> - lists:sum([length(Pending) || - #tx{pending_acks = Pending} <- all_tx_record()]); + messages_unacknowledged]]); i(consumers, State) -> queue:len(State#q.active_consumers) + queue:len(State#q.blocked_consumers); i(transactions, _) -> - length(all_tx_record()); + length([ok || #cr{txn = Txn} <- all_ch_record(), Txn =/= none]); i(memory, _) -> {memory, M} = process_info(self(), memory), M; @@ -685,12 +581,9 @@ handle_call({deliver, Txn, Message, ChPid}, _From, State) -> {Delivered, NewState} = deliver_or_enqueue(Txn, ChPid, Message, State), reply(Delivered, NewState); -handle_call({commit, Txn}, From, State) -> - {RunQueue, NewState} = commit_transaction(Txn, From, State), - noreply(case RunQueue of - true -> run_message_queue(NewState); - false -> NewState - end); +handle_call({commit, Txn, ChPid}, From, State) -> + NewState = commit_transaction(Txn, From, ChPid, State), + noreply(run_message_queue(NewState)); handle_call({notify_down, ChPid}, _From, State) -> %% we want to do this synchronously, so that auto_deleted queues @@ -704,25 +597,19 @@ handle_call({notify_down, ChPid}, _From, State) -> end; handle_call({basic_get, ChPid, NoAck}, _From, - State = #q{q = #amqqueue{name = QName}, next_msg_id = NextId, + State = #q{q = #amqqueue{name = QName}, backing_queue_state = BQS, backing_queue = BQ}) -> - case BQ:fetch(BQS) of - {empty, BQS1} -> reply(empty, State #q { backing_queue_state = BQS1 }); + AckRequired = not NoAck, + case BQ:fetch(AckRequired, BQS) of + {empty, BQS1} -> reply(empty, State#q{backing_queue_state = BQS1}); {{Message, IsDelivered, AckTag, Remaining}, BQS1} -> - AckRequired = not(NoAck), - BQS2 = - case AckRequired of - true -> - C = #cr{unacked_messages = UAM} = ch_record(ChPid), - NewUAM = dict:store(NextId, {Message, AckTag}, UAM), - store_ch_record(C#cr{unacked_messages = NewUAM}), - BQS1; - false -> - BQ:ack([AckTag], BQS1) - end, - Msg = {QName, self(), NextId, IsDelivered, Message}, - reply({ok, Remaining, Msg}, - State #q { next_msg_id = NextId + 1, backing_queue_state = BQS2 }) + case AckRequired of + true -> C = #cr{unacked_messages = UAM} = ch_record(ChPid), + store_ch_record(C#cr{unacked_messages = [AckTag|UAM]}); + false -> ok + end, + reply({ok, Remaining, {QName, self(), AckTag, IsDelivered, Message}}, + State#q{backing_queue_state = BQS1}) end; handle_call({basic_consume, NoAck, ReaderPid, ChPid, LimiterPid, @@ -740,7 +627,7 @@ handle_call({basic_consume, NoAck, ReaderPid, ChPid, LimiterPid, ok -> C = #cr{consumer_count = ConsumerCount} = ch_record(ChPid), Consumer = #consumer{tag = ConsumerTag, - ack_required = not(NoAck)}, + ack_required = not NoAck}, store_ch_record(C#cr{consumer_count = ConsumerCount +1, limiter_pid = LimiterPid}), case ConsumerCount of @@ -862,37 +749,36 @@ handle_cast({deliver, Txn, Message, ChPid}, State) -> {_Delivered, NewState} = deliver_or_enqueue(Txn, ChPid, Message, State), noreply(NewState); -handle_cast({ack, Txn, MsgIds, ChPid}, State = #q{backing_queue = BQ}) -> +handle_cast({ack, Txn, AckTags, ChPid}, State = #q{backing_queue_state = BQS, + backing_queue = BQ}) -> case lookup_ch(ChPid) of not_found -> noreply(State); C = #cr{unacked_messages = UAM} -> - case Txn of - none -> - {MsgWithAcks, Remaining} = collect_messages(MsgIds, UAM), - BQS = BQ:ack([AckTag || {_Message, AckTag} <- MsgWithAcks], - State #q.backing_queue_state), - store_ch_record(C#cr{unacked_messages = Remaining}), - noreply(State #q { backing_queue_state = BQS }); - _ -> - record_pending_acks(Txn, ChPid, MsgIds), - noreply(State) - end + {AckTags1, Remaining} = collect_messages(AckTags, UAM), + {C1, BQS1} = + case Txn of + none -> {C#cr{unacked_messages = Remaining}, + BQ:ack(AckTags1, BQS)}; + _ -> {C#cr{txn = Txn}, BQ:tx_ack(Txn, AckTags1, BQS)} + end, + store_ch_record(C1), + noreply(State #q { backing_queue_state = BQS1 }) end; -handle_cast({rollback, Txn}, State) -> - noreply(rollback_transaction(Txn, State)); +handle_cast({rollback, Txn, ChPid}, State) -> + noreply(rollback_transaction(Txn, ChPid, State)); -handle_cast({requeue, MsgIds, ChPid}, State) -> +handle_cast({requeue, AckTags, ChPid}, State) -> case lookup_ch(ChPid) of not_found -> rabbit_log:warning("Ignoring requeue from unknown ch: ~p~n", [ChPid]), noreply(State); C = #cr{unacked_messages = UAM} -> - {MsgWithAcks, NewUAM} = collect_messages(MsgIds, UAM), - store_ch_record(C#cr{unacked_messages = NewUAM}), - noreply(deliver_or_requeue_n(MsgWithAcks, State)) + {AckTags1, Remaining} = collect_messages(AckTags, UAM), + store_ch_record(C#cr{unacked_messages = Remaining}), + noreply(requeue_and_run(AckTags1, State)) end; handle_cast({unblock, ChPid}, State) -> diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl index cc6fda55fb..8e7de95e1b 100644 --- a/src/rabbit_backing_queue.erl +++ b/src/rabbit_backing_queue.erl @@ -63,17 +63,20 @@ behaviour_info(callbacks) -> %% Called for messages which have already been passed straight %% out to a client. The queue will be empty for these calls %% (i.e. saves the round trip through the backing queue). - {publish_delivered, 2}, + {publish_delivered, 3}, %% Produce the next message - {fetch, 1}, + {fetch, 2}, %% Acktags supplied are for messages which can now be forgotten %% about {ack, 2}, %% A publish, but in the context of a transaction. - {tx_publish, 2}, + {tx_publish, 3}, + + %% Acks, but in the context of a transaction. + {tx_ack, 3}, %% Undo anything which has been done by the tx_publish of the %% indicated messages. @@ -81,7 +84,7 @@ behaviour_info(callbacks) -> %% Commit these publishes and acktags. The publishes you will %% have previously seen in calls to tx_publish. - {tx_commit, 4}, + {tx_commit, 3}, %% Reinsert messages into the queue which have already been %% delivered and were pending acknowledgement. diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 9aeb4623f1..7d3cd7225d 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -928,7 +928,7 @@ new_tx(State) -> internal_commit(State = #ch{transaction_id = TxnKey, tx_participants = Participants}) -> case rabbit_amqqueue:commit_all(sets:to_list(Participants), - TxnKey) of + TxnKey, self()) of ok -> ok = notify_limiter(State#ch.limiter_pid, State#ch.uncommitted_ack_q), new_tx(State); @@ -945,7 +945,7 @@ internal_rollback(State = #ch{transaction_id = TxnKey, queue:len(UAQ), queue:len(UAMQ)]), case rabbit_amqqueue:rollback_all(sets:to_list(Participants), - TxnKey) of + TxnKey, self()) of ok -> NewUAMQ = queue:join(UAQ, UAMQ), new_tx(State#ch{unacked_message_q = NewUAMQ}); {error, Errors} -> rabbit_misc:protocol_error( diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index 4ac4a16edc..74fa098039 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -338,10 +338,12 @@ read(Server, Guid, CState = end. contains(Server, Guid) -> gen_server2:call(Server, {contains, Guid}, infinity). +remove(_Server, []) -> ok; remove(Server, Guids) -> gen_server2:cast(Server, {remove, Guids}). +release(_Server, []) -> ok; release(Server, Guids) -> gen_server2:cast(Server, {release, Guids}). sync(Server, Guids, K) -> gen_server2:cast(Server, {sync, Guids, K}). -sync(Server) -> gen_server2:pcast(Server, 8, sync). %% internal +sync(Server) -> gen_server2:pcast(Server, 8, sync). %% internal gc_done(Server, Reclaimed, Source, Destination) -> gen_server2:pcast(Server, 8, {gc_done, Reclaimed, Source, Destination}). diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index f678433913..d6ef0cb8b9 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -344,6 +344,8 @@ write_delivered(SeqId, State) -> JournalHdl, <<?DEL_JPREFIX:?JPREFIX_BITS, SeqId:?SEQ_BITS>>), maybe_flush_journal(add_to_journal(SeqId, del, State1)). +write_acks([], State) -> + State; write_acks(SeqIds, State) -> {JournalHdl, State1} = get_journal_handle(State), ok = file_handle_cache:append( diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index c772271f01..4bef843596 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -1362,7 +1362,7 @@ variable_queue_fetch(Count, IsPersistent, IsDelivered, Len, VQ) -> Rem = Len - N, {{#basic_message { is_persistent = IsPersistent }, IsDelivered, AckTagN, Rem}, VQM} = - rabbit_variable_queue:fetch(VQN), + rabbit_variable_queue:fetch(true, VQN), {VQM, [AckTagN | AckTagsAcc]} end, {VQ, []}, lists:seq(1, Count)). @@ -1399,7 +1399,7 @@ test_variable_queue_dynamic_duration_change() -> VQ3 = test_variable_queue_dynamic_duration_change_f(Len1, VQ2), {VQ4, AckTags} = variable_queue_fetch(Len1, false, false, Len1, VQ3), VQ5 = rabbit_variable_queue:ack(AckTags, VQ4), - {empty, VQ6} = rabbit_variable_queue:fetch(VQ5), + {empty, VQ6} = rabbit_variable_queue:fetch(true, VQ5), %% just publish and fetch some persistent msgs, this hits the the %% partial segment path in queue_index due to the period when @@ -1408,7 +1408,7 @@ test_variable_queue_dynamic_duration_change() -> {VQ8, AckTags1} = variable_queue_fetch(20, true, false, 20, VQ7), VQ9 = rabbit_variable_queue:ack(AckTags1, VQ8), VQ10 = rabbit_variable_queue:handle_pre_hibernate(VQ9), - {empty, VQ11} = rabbit_variable_queue:fetch(VQ10), + {empty, VQ11} = rabbit_variable_queue:fetch(true, VQ10), rabbit_variable_queue:terminate(VQ11), @@ -1416,7 +1416,7 @@ test_variable_queue_dynamic_duration_change() -> test_variable_queue_dynamic_duration_change_f(Len, VQ0) -> VQ1 = variable_queue_publish(false, 1, VQ0), - {{_Msg, false, AckTag, Len}, VQ2} = rabbit_variable_queue:fetch(VQ1), + {{_Msg, false, AckTag, Len}, VQ2} = rabbit_variable_queue:fetch(true, VQ1), VQ3 = rabbit_variable_queue:ack([AckTag], VQ2), receive {duration, _, stop} -> @@ -1475,7 +1475,7 @@ test_variable_queue_partial_segments_delta_thing() -> HalfSegment + 1, VQ6), VQ8 = rabbit_variable_queue:ack(AckTags ++ AckTags1, VQ7), %% should be empty now - {empty, VQ9} = rabbit_variable_queue:fetch(VQ8), + {empty, VQ9} = rabbit_variable_queue:fetch(true, VQ8), rabbit_variable_queue:terminate(VQ9), passed. diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 60e50800d4..90e1eb6c2a 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -31,11 +31,11 @@ -module(rabbit_variable_queue). --export([init/2, terminate/1, publish/2, publish_delivered/2, - set_ram_duration_target/2, ram_duration/1, - fetch/1, ack/2, len/1, is_empty/1, purge/1, - delete_and_terminate/1, requeue/2, tx_publish/2, tx_rollback/2, - tx_commit/4, sync_callback/1, handle_pre_hibernate/1, status/1]). +-export([init/2, terminate/1, publish/2, publish_delivered/3, + set_ram_duration_target/2, ram_duration/1, fetch/2, ack/2, len/1, + is_empty/1, purge/1, delete_and_terminate/1, requeue/2, tx_publish/3, + tx_ack/3, tx_rollback/2, tx_commit/3, sync_callback/1, + handle_pre_hibernate/1, status/1]). -export([start/1]). @@ -160,13 +160,14 @@ msg_store_clients, persistent_store, persistent_count, - transient_threshold + transient_threshold, + pending_ack }). -record(msg_status, - { msg, + { seq_id, guid, - seq_id, + msg, is_persistent, is_delivered, msg_on_disk, @@ -179,6 +180,8 @@ end_seq_id %% note the end_seq_id is always >, not >= }). +-record(tx, {pending_messages, pending_acks}). + %% When we discover, on publish, that we should write some indices to %% disk for some betas, the RAM_INDEX_BATCH_SIZE sets the number of %% betas that we must be due to write indices for before we do any @@ -198,8 +201,7 @@ -type(bpqueue() :: any()). -type(seq_id() :: non_neg_integer()). --type(ack() :: {'ack_index_and_store', guid(), seq_id(), atom() | pid()} - | 'ack_not_on_disk'). +-type(ack() :: {'ack', seq_id(), guid(), boolean()} | 'blank_ack'). -type(delta() :: #delta { start_seq_id :: non_neg_integer(), count :: non_neg_integer (), @@ -234,9 +236,8 @@ }). -spec(tx_commit_post_msg_store/5 :: - (boolean(), [guid()], [ack()], {pid(), any()}, state()) -> - {boolean(), state()}). --spec(tx_commit_index/1 :: (state()) -> {boolean(), state()}). + (boolean(), [guid()], [ack()], {pid(), any()}, state()) -> state()). +-spec(tx_commit_index/1 :: (state()) -> state()). -include("rabbit_backing_queue_spec.hrl"). @@ -313,7 +314,8 @@ init(QueueName, IsDurable) -> {rabbit_msg_store:client_init(?TRANSIENT_MSG_STORE, TRef), TRef}}, persistent_store = PersistentStore, persistent_count = DeltaCount1, - transient_threshold = NextSeqId + transient_threshold = NextSeqId, + pending_ack = dict:new() }, maybe_deltas_to_betas(State). @@ -327,157 +329,185 @@ terminate(State = #vqstate { {persistent_count, PCount}], State #vqstate { index_state = rabbit_queue_index:terminate(Terms, IndexState) }. +%% the only difference between purge and delete is that delete also +%% needs to delete everything that's been delivered and not ack'd. +delete_and_terminate(State) -> + {_PurgeCount, State1 = #vqstate { + index_state = IndexState, + msg_store_clients = {{MSCStateP, PRef}, {MSCStateT, TRef}}, + persistent_store = PersistentStore, + transient_threshold = TransientThreshold }} = + purge(State), + %% flushing here is good because it deletes all full segments, + %% leaving only partial segments around. + IndexState1 = rabbit_queue_index:flush_journal(IndexState), + IndexState2 = + case rabbit_queue_index:find_lowest_seq_id_seg_and_next_seq_id( + IndexState1) of + {N, N, IndexState3} -> + IndexState3; + {DeltaSeqId, NextSeqId, IndexState3} -> + {_DeleteCount, IndexState4} = + delete1(PersistentStore, TransientThreshold, NextSeqId, 0, + DeltaSeqId, IndexState3), + IndexState4 + end, + IndexState5 = rabbit_queue_index:terminate_and_erase(IndexState2), + rabbit_msg_store:delete_client(PersistentStore, PRef), + rabbit_msg_store:delete_client(?TRANSIENT_MSG_STORE, TRef), + rabbit_msg_store:client_terminate(MSCStateP), + rabbit_msg_store:client_terminate(MSCStateT), + State1 #vqstate { index_state = IndexState5 }. + +purge(State = #vqstate { q4 = Q4, index_state = IndexState, len = Len, + persistent_store = PersistentStore }) -> + {Q4Count, IndexState1} = + remove_queue_entries(PersistentStore, fun rabbit_misc:queue_fold/3, + Q4, IndexState), + {Len, State1} = + purge1(Q4Count, State #vqstate { index_state = IndexState1, + q4 = queue:new() }), + {Len, State1 #vqstate { len = 0, ram_msg_count = 0, ram_index_count = 0, + persistent_count = 0 }}. + publish(Msg, State) -> State1 = limit_ram_index(State), {_SeqId, State2} = publish(Msg, false, false, State1), State2. -publish_delivered(Msg = #basic_message { guid = Guid, - is_persistent = IsPersistent }, +publish_delivered(false, _Msg, State = #vqstate { len = 0 }) -> + {blank_ack, State}; +publish_delivered(true, Msg = #basic_message { guid = Guid, + is_persistent = IsPersistent }, State = #vqstate { len = 0, index_state = IndexState, next_seq_id = SeqId, out_counter = OutCount, in_counter = InCount, msg_store_clients = MSCState, persistent_store = PersistentStore, - persistent_count = PCount }) -> - State1 = State #vqstate { out_counter = OutCount + 1, - in_counter = InCount + 1 }, + persistent_count = PCount, + pending_ack = PA }) -> MsgStatus = #msg_status { msg = Msg, guid = Guid, seq_id = SeqId, is_persistent = IsPersistent, is_delivered = true, msg_on_disk = false, index_on_disk = false }, {MsgStatus1, MSCState1} = maybe_write_msg_to_disk(PersistentStore, false, MsgStatus, MSCState), - State2 = State1 #vqstate { msg_store_clients = MSCState1, - persistent_count = PCount + case IsPersistent of - true -> 1; - false -> 0 - end }, - case MsgStatus1 #msg_status.msg_on_disk of - true -> - {#msg_status { index_on_disk = true }, IndexState1} = - maybe_write_index_to_disk(false, MsgStatus1, IndexState), - {{ack_index_and_store, Guid, SeqId, - find_msg_store(IsPersistent, PersistentStore)}, - State2 #vqstate { index_state = IndexState1, - next_seq_id = SeqId + 1 }}; - false -> - {ack_not_on_disk, State2} - end. - -set_ram_duration_target( - DurationTarget, State = #vqstate { avg_egress_rate = AvgEgressRate, - avg_ingress_rate = AvgIngressRate, - target_ram_msg_count = TargetRamMsgCount - }) -> - Rate = AvgEgressRate + AvgIngressRate, - TargetRamMsgCount1 = - case DurationTarget of - infinity -> undefined; - undefined -> undefined; - _ -> trunc(DurationTarget * Rate) %% msgs = sec * msgs/sec - end, - State1 = State #vqstate { target_ram_msg_count = TargetRamMsgCount1, - duration_target = DurationTarget }, - case TargetRamMsgCount1 == undefined orelse - TargetRamMsgCount1 >= TargetRamMsgCount of - true -> State1; - false -> reduce_memory_use(State1) - end. - -ram_duration(State = #vqstate { egress_rate = Egress, - ingress_rate = Ingress, - rate_timestamp = Timestamp, - in_counter = InCount, - out_counter = OutCount, - ram_msg_count = RamMsgCount, - duration_target = DurationTarget, - ram_msg_count_prev = RamMsgCountPrev }) -> - Now = now(), - {AvgEgressRate, Egress1} = update_rate(Now, Timestamp, OutCount, Egress), - {AvgIngressRate, Ingress1} = update_rate(Now, Timestamp, InCount, Ingress), - - Duration = %% msgs / (msgs/sec) == sec - case AvgEgressRate == 0 andalso AvgIngressRate == 0 of - true -> infinity; - false -> (RamMsgCountPrev + RamMsgCount) / - (2 * (AvgEgressRate + AvgIngressRate)) - end, - - {Duration, set_ram_duration_target( - DurationTarget, - State #vqstate { egress_rate = Egress1, - avg_egress_rate = AvgEgressRate, - ingress_rate = Ingress1, - avg_ingress_rate = AvgIngressRate, - rate_timestamp = Now, - ram_msg_count_prev = RamMsgCount, - out_counter = 0, in_counter = 0 })}. - -fetch(State = + State1 = State #vqstate { msg_store_clients = MSCState1, + persistent_count = PCount + case IsPersistent of + true -> 1; + false -> 0 + end, + next_seq_id = SeqId + 1, + out_counter = OutCount + 1, + in_counter = InCount + 1 }, + AckTag = {ack, SeqId, Guid, IsPersistent}, + {AckTag, + case MsgStatus1 #msg_status.msg_on_disk of + true -> + {#msg_status { index_on_disk = true }, IndexState1} = + maybe_write_index_to_disk(false, MsgStatus1, IndexState), + State1 #vqstate { index_state = IndexState1 }; + false -> + State1 #vqstate { pending_ack = + dict:store(AckTag, MsgStatus1, PA) } + end}. + +fetch(AckRequired, State = #vqstate { q4 = Q4, ram_msg_count = RamMsgCount, out_counter = OutCount, index_state = IndexState, len = Len, - persistent_store = PersistentStore }) -> + persistent_store = PersistentStore, pending_ack = PA }) -> case queue:out(Q4) of {empty, _Q4} -> - fetch_from_q3_or_delta(State); - {{value, #msg_status { + fetch_from_q3_or_delta(AckRequired, State); + {{value, MsgStatus = #msg_status { msg = Msg, guid = Guid, seq_id = SeqId, is_persistent = IsPersistent, is_delivered = IsDelivered, msg_on_disk = MsgOnDisk, index_on_disk = IndexOnDisk }}, Q4a} -> - {IndexState1, IsPersistent} = - case IndexOnDisk of - true -> - IndexState2 = - case IsDelivered of - false -> rabbit_queue_index:write_delivered( - SeqId, IndexState); - true -> IndexState - end, - {case IsPersistent of - true -> IndexState2; - false -> rabbit_queue_index:write_acks( - [SeqId], IndexState2) - end, IsPersistent}; - false -> %% If index isn't on disk, we can't be persistent - {IndexState, false} - end, + + AckTag = case AckRequired of + true -> {ack, SeqId, Guid, IsPersistent}; + false -> blank_ack + end, + + %% 1. Mark it delivered if necessary + IndexState1 = case IndexOnDisk andalso not IsDelivered of + true -> rabbit_queue_index:write_delivered( + SeqId, IndexState); + false -> IndexState + end, + + %% 2. If it's on disk and there's no Ack required, remove it MsgStore = find_msg_store(IsPersistent, PersistentStore), - AckTag = - case IsPersistent of - true -> true = MsgOnDisk, %% ASSERTION - {ack_index_and_store, Guid, SeqId, MsgStore}; - false -> ok = case MsgOnDisk of - true -> - rabbit_msg_store:remove( - MsgStore, [Guid]); - false -> ok - end, - ack_not_on_disk + IndexState2 = + case MsgOnDisk andalso not AckRequired of + true -> %% Remove from disk now + case IndexOnDisk of + true -> + ok = rabbit_msg_store:remove(MsgStore, [Guid]), + rabbit_queue_index:write_acks([SeqId], + IndexState1); + false -> + ok = case MsgOnDisk of + true -> rabbit_msg_store:remove( + MsgStore, [Guid]); + false -> ok + end, + IndexState1 + end; + false -> + IndexState1 + end, + + %% 3. If it's on disk, not persistent and an ack's + %% required then remove it from the queue index only. + IndexState3 = + case IndexOnDisk andalso AckRequired andalso not IsPersistent of + true -> rabbit_queue_index:write_acks([SeqId], IndexState2); + false -> IndexState2 end, + + %% 4. If it's not on disk and we need an Ack, add it to PA + PA1 = case AckRequired andalso not MsgOnDisk of + true -> dict:store(AckTag, MsgStatus #msg_status { + is_delivered = true }, PA); + false -> PA + end, + Len1 = Len - 1, {{Msg, IsDelivered, AckTag, Len1}, State #vqstate { q4 = Q4a, out_counter = OutCount + 1, ram_msg_count = RamMsgCount - 1, - index_state = IndexState1, len = Len1 }} + index_state = IndexState3, len = Len1, + pending_ack = PA1 }} end. ack([], State) -> State; ack(AckTags, State = #vqstate { index_state = IndexState, persistent_count = PCount, - persistent_store = PersistentStore }) -> - {GuidsByStore, SeqIds} = + persistent_store = PersistentStore, + pending_ack = PA }) -> + {GuidsByStore, SeqIds, PA1} = lists:foldl( - fun (ack_not_on_disk, Acc) -> Acc; - ({ack_index_and_store, Guid, SeqId, MsgStore}, {Dict, SeqIds}) -> - {rabbit_misc:dict_cons(MsgStore, Guid, Dict), [SeqId | SeqIds]} - end, {dict:new(), []}, AckTags), - IndexState1 = case SeqIds of - [] -> IndexState; - _ -> rabbit_queue_index:write_acks(SeqIds, IndexState) - end, + fun (blank_ack, Acc) -> Acc; + ({ack, SeqId, Guid, true}, {Dict, SeqIds, PAN}) -> + {rabbit_misc:dict_cons(PersistentStore, Guid, Dict), + [SeqId | SeqIds], PAN}; + ({ack, _SeqId, Guid, false} = AckTag, {Dict, SeqIds, PAN}) -> + case dict:find(AckTag, PAN) of + error -> + %% must be in the transient store and won't + %% be in the queue index. + {rabbit_misc:dict_cons( + ?TRANSIENT_MSG_STORE, Guid, Dict), SeqIds, PAN}; + {ok, #msg_status { index_on_disk = false, %% ASSERTIONS + msg_on_disk = false, + is_persistent = false }} -> + {Dict, SeqIds, dict:erase(AckTag, PAN)} + end + end, {dict:new(), [], PA}, AckTags), + IndexState1 = rabbit_queue_index:write_acks(SeqIds, IndexState), ok = dict:fold(fun (MsgStore, Guids, ok) -> rabbit_msg_store:remove(MsgStore, Guids) end, ok, GuidsByStore), @@ -485,90 +515,104 @@ ack(AckTags, State = #vqstate { index_state = IndexState, error -> 0; {ok, Guids} -> length(Guids) end, - State #vqstate { index_state = IndexState1, persistent_count = PCount1 }. + State #vqstate { index_state = IndexState1, persistent_count = PCount1, + pending_ack = PA1 }. -len(#vqstate { len = Len }) -> - Len. +tx_publish(Txn, + Msg = #basic_message { is_persistent = true, guid = Guid }, + State = #vqstate { msg_store_clients = MSCState, + persistent_store = PersistentStore }) -> + MsgStatus = #msg_status { + msg = Msg, guid = Guid, seq_id = undefined, is_persistent = true, + is_delivered = false, msg_on_disk = false, index_on_disk = false }, + {#msg_status { msg_on_disk = true }, MSCState1} = + maybe_write_msg_to_disk(PersistentStore, false, MsgStatus, MSCState), + publish_in_tx(Txn, Msg), + State #vqstate { msg_store_clients = MSCState1 }; +tx_publish(Txn, Msg, State) -> + publish_in_tx(Txn, Msg), + State. -is_empty(State) -> - 0 == len(State). +tx_ack(Txn, AckTags, State) -> + ack_in_tx(Txn, AckTags), + State. -purge(State = #vqstate { q4 = Q4, index_state = IndexState, len = Len, - persistent_store = PersistentStore }) -> - {Q4Count, IndexState1} = - remove_queue_entries(PersistentStore, fun rabbit_misc:queue_fold/3, - Q4, IndexState), - {Len, State1} = - purge1(Q4Count, State #vqstate { index_state = IndexState1, - q4 = queue:new() }), - {Len, State1 #vqstate { len = 0, ram_msg_count = 0, ram_index_count = 0, - persistent_count = 0 }}. +tx_rollback(Txn, State = #vqstate { persistent_store = PersistentStore }) -> + #tx { pending_acks = AckTags, pending_messages = Pubs } = lookup_tx(Txn), + erase_tx(Txn), + ok = rabbit_msg_store:remove(PersistentStore, persistent_guids(Pubs)), + {lists:flatten(AckTags), State}. -%% the only difference between purge and delete is that delete also -%% needs to delete everything that's been delivered and not ack'd. -delete_and_terminate(State) -> - {_PurgeCount, State1 = #vqstate { - index_state = IndexState, - msg_store_clients = {{MSCStateP, PRef}, {MSCStateT, TRef}}, - persistent_store = PersistentStore, - transient_threshold = TransientThreshold }} = - purge(State), - %% flushing here is good because it deletes all full segments, - %% leaving only partial segments around. - IndexState1 = rabbit_queue_index:flush_journal(IndexState), - IndexState2 = - case rabbit_queue_index:find_lowest_seq_id_seg_and_next_seq_id( - IndexState1) of - {N, N, IndexState3} -> - IndexState3; - {DeltaSeqId, NextSeqId, IndexState3} -> - {_DeleteCount, IndexState4} = - delete1(PersistentStore, TransientThreshold, NextSeqId, 0, - DeltaSeqId, IndexState3), - IndexState4 - end, - IndexState5 = rabbit_queue_index:terminate_and_erase(IndexState2), - rabbit_msg_store:delete_client(PersistentStore, PRef), - rabbit_msg_store:delete_client(?TRANSIENT_MSG_STORE, TRef), - rabbit_msg_store:client_terminate(MSCStateP), - rabbit_msg_store:client_terminate(MSCStateT), - State1 #vqstate { index_state = IndexState5 }. +tx_commit(Txn, From, State = #vqstate { persistent_store = PersistentStore }) -> + %% If we are a non-durable queue, or we have no persistent pubs, + %% we can skip the msg_store loop. + #tx { pending_acks = AckTags, pending_messages = Pubs } = lookup_tx(Txn), + erase_tx(Txn), + PubsOrdered = lists:reverse(Pubs), + AckTags1 = lists:flatten(AckTags), + PersistentGuids = persistent_guids(PubsOrdered), + IsTransientPubs = [] == PersistentGuids, + {AckTags1, + case IsTransientPubs orelse + ?TRANSIENT_MSG_STORE == PersistentStore of + true -> + tx_commit_post_msg_store( + IsTransientPubs, PubsOrdered, AckTags1, From, State); + false -> + Self = self(), + ok = + rabbit_msg_store:sync( + ?PERSISTENT_MSG_STORE, PersistentGuids, + fun () -> + ok = + rabbit_amqqueue:maybe_run_queue_via_backing_queue( + Self, + fun (StateN) -> tx_commit_post_msg_store( + IsTransientPubs, PubsOrdered, + AckTags1, From, StateN) + end) + end), + State + end}. -%% [{Msg, AckTag}] -%% We guarantee that after fetch, only persistent msgs are left on -%% disk. This means that in a requeue, we set MsgOnDisk to true, thus -%% avoiding calls to msg_store:write for persistent msgs. It also -%% means that we don't need to worry about calling msg_store:remove -%% (as ack would do) because transient msgs won't be on disk anyway, -%% thus they won't need to be removed. However, we do call -%% msg_store:release so that the cache isn't held full of msgs which -%% are now at the tail of the queue. -requeue(MsgsWithAckTags, State) -> +requeue(AckTags, State = #vqstate { persistent_store = PersistentStore, + pending_ack = PA }) -> {SeqIds, GuidsByStore, State1 = #vqstate { index_state = IndexState, - persistent_count = PCount, - persistent_store = PersistentStore }} = + persistent_count = PCount }} = lists:foldl( - fun ({Msg = #basic_message { guid = Guid }, AckTag}, - {SeqIdsAcc, Dict, StateN}) -> - {SeqIdsAcc1, Dict1, MsgOnDisk} = - case AckTag of - ack_not_on_disk -> - {SeqIdsAcc, Dict, false}; - {ack_index_and_store, Guid, SeqId, MsgStore} -> - {[SeqId | SeqIdsAcc], - rabbit_misc:dict_cons(MsgStore, Guid, Dict), - true} - end, - {_SeqId, StateN1} = - publish(Msg, true, MsgOnDisk, StateN), - {SeqIdsAcc1, Dict1, StateN1} - end, {[], dict:new(), State}, MsgsWithAckTags), - IndexState1 = - case SeqIds of - [] -> IndexState; - _ -> rabbit_queue_index:write_acks(SeqIds, IndexState) - end, + fun ({ack, SeqId, Guid, IsPersistent} = AckTag, + {SeqIdsAcc, Dict, StateN = #vqstate { + msg_store_clients = MSCStateN }}) -> + case dict:find(AckTag, PA) of + error -> + {{ok, Msg = #basic_message{}}, MSCStateN1} = + read_from_msg_store(PersistentStore, MSCStateN, + IsPersistent, Guid), + StateN1 = StateN #vqstate { + msg_store_clients = MSCStateN1 }, + {_SeqId, StateN2} = publish(Msg, true, true, StateN1), + {SeqIdsAcc1, MsgStore} = + case IsPersistent of + true -> + {[SeqId | SeqIdsAcc], PersistentStore}; + false -> + {SeqIdsAcc, ?TRANSIENT_MSG_STORE} + end, + {SeqIdsAcc1, + rabbit_misc:dict_cons(MsgStore, Guid, Dict), + StateN2}; + {ok, #msg_status { index_on_disk = false, + msg_on_disk = false, + is_persistent = false, + msg = Msg }} -> + {_SeqId, StateN1} = publish(Msg, true, false, StateN), + {SeqIdsAcc, Dict, + StateN1 #vqstate { + pending_ack = dict:erase(AckTag, PA) }} + end + end, {[], dict:new(), State}, AckTags), + IndexState1 = rabbit_queue_index:write_acks(SeqIds, IndexState), ok = dict:fold(fun (MsgStore, Guids, ok) -> rabbit_msg_store:release(MsgStore, Guids) end, ok, GuidsByStore), @@ -579,98 +623,60 @@ requeue(MsgsWithAckTags, State) -> State1 #vqstate { index_state = IndexState1, persistent_count = PCount1 }. -tx_publish(Msg = #basic_message { is_persistent = true, guid = Guid }, - State = #vqstate { msg_store_clients = MSCState, - persistent_store = PersistentStore }) -> - MsgStatus = #msg_status { - msg = Msg, guid = Guid, seq_id = undefined, is_persistent = true, - is_delivered = false, msg_on_disk = false, index_on_disk = false }, - {#msg_status { msg_on_disk = true }, MSCState1} = - maybe_write_msg_to_disk(PersistentStore, false, MsgStatus, MSCState), - State #vqstate { msg_store_clients = MSCState1 }; -tx_publish(_Msg, State) -> - State. +len(#vqstate { len = Len }) -> + Len. -tx_rollback(Pubs, State = #vqstate { persistent_store = PersistentStore }) -> - ok = case persistent_guids(Pubs) of - [] -> ok; - PP -> rabbit_msg_store:remove(PersistentStore, PP) - end, - State. +is_empty(State) -> + 0 == len(State). -tx_commit(Pubs, AckTags, From, State = - #vqstate { persistent_store = PersistentStore }) -> - %% If we are a non-durable queue, or we have no persistent pubs, - %% we can skip the msg_store loop. - PersistentGuids = persistent_guids(Pubs), - IsTransientPubs = [] == PersistentGuids, - case IsTransientPubs orelse - ?TRANSIENT_MSG_STORE == PersistentStore of - true -> - tx_commit_post_msg_store( - IsTransientPubs, Pubs, AckTags, From, State); - false -> - Self = self(), - ok = rabbit_msg_store:sync( - ?PERSISTENT_MSG_STORE, PersistentGuids, - fun () -> ok = rabbit_amqqueue:maybe_run_queue_via_backing_queue( - Self, - fun (StateN) -> tx_commit_post_msg_store( - IsTransientPubs, Pubs, - AckTags, From, StateN) - end) - end), - {false, State} +set_ram_duration_target( + DurationTarget, State = #vqstate { avg_egress_rate = AvgEgressRate, + avg_ingress_rate = AvgIngressRate, + target_ram_msg_count = TargetRamMsgCount + }) -> + Rate = AvgEgressRate + AvgIngressRate, + TargetRamMsgCount1 = + case DurationTarget of + infinity -> undefined; + undefined -> undefined; + _ -> trunc(DurationTarget * Rate) %% msgs = sec * msgs/sec + end, + State1 = State #vqstate { target_ram_msg_count = TargetRamMsgCount1, + duration_target = DurationTarget }, + case TargetRamMsgCount1 == undefined orelse + TargetRamMsgCount1 >= TargetRamMsgCount of + true -> State1; + false -> reduce_memory_use(State1) end. -tx_commit_post_msg_store(IsTransientPubs, Pubs, AckTags, From, State = - #vqstate { on_sync = OnSync = {SAcks, SPubs, SFroms}, - persistent_store = PersistentStore }) -> - %% If we are a non-durable queue, or (no persisent pubs, and no - %% persistent acks) then we can skip the queue_index loop. - DiskAcks = - lists:filter(fun (AckTag) -> AckTag /= ack_not_on_disk end, AckTags), - case PersistentStore == ?TRANSIENT_MSG_STORE orelse - (IsTransientPubs andalso [] == DiskAcks) of - true -> {Res, State1} = - tx_commit_index(State #vqstate { - on_sync = {[], [Pubs], [From]} }), - {Res, State1 #vqstate { on_sync = OnSync }}; - false -> {false, State #vqstate { on_sync = { [DiskAcks | SAcks], - [Pubs | SPubs], - [From | SFroms] }}} - end. +ram_duration(State = #vqstate { egress_rate = Egress, + ingress_rate = Ingress, + rate_timestamp = Timestamp, + in_counter = InCount, + out_counter = OutCount, + ram_msg_count = RamMsgCount, + duration_target = DurationTarget, + ram_msg_count_prev = RamMsgCountPrev }) -> + Now = now(), + {AvgEgressRate, Egress1} = update_rate(Now, Timestamp, OutCount, Egress), + {AvgIngressRate, Ingress1} = update_rate(Now, Timestamp, InCount, Ingress), -tx_commit_index(State = #vqstate { on_sync = {_, _, []} }) -> - {false, State}; -tx_commit_index(State = #vqstate { on_sync = {SAcks, SPubs, SFroms}, - persistent_store = PersistentStore }) -> - Acks = lists:flatten(SAcks), - State1 = ack(Acks, State), - AckSeqIds = lists:foldl(fun ({ack_index_and_store, _Guid, - SeqId, ?PERSISTENT_MSG_STORE}, SeqIdsAcc) -> - [SeqId | SeqIdsAcc]; - (_, SeqIdsAcc) -> - SeqIdsAcc - end, [], Acks), - IsPersistentStore = ?PERSISTENT_MSG_STORE == PersistentStore, - Pubs = lists:flatten(lists:reverse(SPubs)), - {SeqIds, State2 = #vqstate { index_state = IndexState }} = - lists:foldl( - fun (Msg = #basic_message { is_persistent = IsPersistent }, - {SeqIdsAcc, StateN}) -> - {SeqId, StateN1} = - publish(Msg, false, IsPersistent, StateN), - {case IsPersistentStore andalso IsPersistent of - true -> [SeqId | SeqIdsAcc]; - false -> SeqIdsAcc - end, StateN1} - end, {AckSeqIds, State1}, Pubs), - IndexState1 = - rabbit_queue_index:sync_seq_ids(SeqIds, IndexState), - [ gen_server2:reply(From, ok) || From <- lists:reverse(SFroms) ], - {Pubs /= [], - State2 #vqstate { index_state = IndexState1, on_sync = {[], [], []} }}. + Duration = %% msgs / (msgs/sec) == sec + case AvgEgressRate == 0 andalso AvgIngressRate == 0 of + true -> infinity; + false -> (RamMsgCountPrev + RamMsgCount) / + (2 * (AvgEgressRate + AvgIngressRate)) + end, + + {Duration, set_ram_duration_target( + DurationTarget, + State #vqstate { egress_rate = Egress1, + avg_egress_rate = AvgEgressRate, + ingress_rate = Ingress1, + avg_ingress_rate = AvgIngressRate, + rate_timestamp = Now, + ram_msg_count_prev = RamMsgCount, + out_counter = 0, in_counter = 0 })}. sync_callback(#vqstate { on_sync = {_, _, []} }) -> undefined; sync_callback(_) -> fun tx_commit_index/1. @@ -705,6 +711,27 @@ status(#vqstate { q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4, %% Minor helpers %%---------------------------------------------------------------------------- +lookup_tx(Txn) -> + case get({txn, Txn}) of + undefined -> #tx { pending_messages = [], + pending_acks = [] }; + V -> V + end. + +store_tx(Txn, Tx) -> + put({txn, Txn}, Tx). + +erase_tx(Txn) -> + erase({txn, Txn}). + +publish_in_tx(Txn, Msg) -> + Tx = #tx { pending_messages = Pubs } = lookup_tx(Txn), + store_tx(Txn, Tx #tx { pending_messages = [Msg | Pubs] }). + +ack_in_tx(Txn, AckTags) -> + Tx = #tx { pending_acks = Acks } = lookup_tx(Txn), + store_tx(Txn, Tx #tx { pending_acks = [AckTags | Acks] }). + update_rate(Now, Then, Count, {OThen, OCount}) -> %% form the avg over the current period and the previous Avg = 1000000 * ((Count + OCount) / timer:now_diff(Now, OThen)), @@ -712,7 +739,7 @@ update_rate(Now, Then, Count, {OThen, OCount}) -> persistent_guids(Pubs) -> [Guid || Obj = #basic_message { guid = Guid } <- Pubs, - Obj #basic_message.is_persistent]. + Obj #basic_message.is_persistent]. betas_from_segment_entries(List, SeqIdLimit, TransientThreshold, IndexState) -> {Filtered, IndexState1} = @@ -805,6 +832,50 @@ should_force_index_to_disk(State = %% Internal major helpers for Public API %%---------------------------------------------------------------------------- +tx_commit_post_msg_store(IsTransientPubs, Pubs, AckTags, From, State = + #vqstate { on_sync = OnSync = {SAcks, SPubs, SFroms}, + persistent_store = PersistentStore }) -> + %% If we are a non-durable queue, or (no persisent pubs, and no + %% persistent acks) then we can skip the queue_index loop. + case PersistentStore == ?TRANSIENT_MSG_STORE orelse + (IsTransientPubs andalso [] == AckTags) of %%% AGH FIX ME + true -> State1 = tx_commit_index(State #vqstate { + on_sync = {[], [Pubs], [From]} }), + State1 #vqstate { on_sync = OnSync }; + false -> State #vqstate { on_sync = { [AckTags | SAcks], + [Pubs | SPubs], + [From | SFroms] }} + end. + +tx_commit_index(State = #vqstate { on_sync = {_, _, []} }) -> + State; +tx_commit_index(State = #vqstate { on_sync = {SAcks, SPubs, SFroms}, + persistent_store = PersistentStore }) -> + Acks = lists:flatten(SAcks), + State1 = ack(Acks, State), + AckSeqIds = lists:foldl(fun ({ack, SeqId, _Guid, true}, SeqIdsAcc) -> + [SeqId | SeqIdsAcc]; + (_, SeqIdsAcc) -> + SeqIdsAcc + end, [], Acks), + IsPersistentStore = ?PERSISTENT_MSG_STORE == PersistentStore, + Pubs = lists:flatten(lists:reverse(SPubs)), + {SeqIds, State2 = #vqstate { index_state = IndexState }} = + lists:foldl( + fun (Msg = #basic_message { is_persistent = IsPersistent }, + {SeqIdsAcc, StateN}) -> + {SeqId, StateN1} = + publish(Msg, false, IsPersistent, StateN), + {case IsPersistentStore andalso IsPersistent of + true -> [SeqId | SeqIdsAcc]; + false -> SeqIdsAcc + end, StateN1} + end, {AckSeqIds, State1}, Pubs), + IndexState1 = + rabbit_queue_index:sync_seq_ids(SeqIds, IndexState), + [ gen_server2:reply(From, ok) || From <- lists:reverse(SFroms) ], + State2 #vqstate { index_state = IndexState1, on_sync = {[], [], []} }. + delete1(_PersistentStore, _TransientThreshold, NextSeqId, Count, DeltaSeqId, IndexState) when DeltaSeqId >= NextSeqId -> {Count, IndexState}; @@ -887,7 +958,8 @@ remove_queue_entries1( end, {PersistentStore, CountN + 1, GuidsByStore1, SeqIdsAcc1, IndexStateN1}. -fetch_from_q3_or_delta(State = #vqstate { +fetch_from_q3_or_delta(AckRequired, + State = #vqstate { q1 = Q1, q2 = Q2, delta = #delta { count = DeltaCount }, q3 = Q3, q4 = Q4, ram_msg_count = RamMsgCount, ram_index_count = RamIndexCount, @@ -933,7 +1005,7 @@ fetch_from_q3_or_delta(State = #vqstate { %% delta and q3 are maintained State1 end, - fetch(State2) + fetch(AckRequired, State2) end. reduce_memory_use(State = #vqstate { ram_msg_count = RamMsgCount, |
