diff options
| author | Alexandru Scvortov <alexandru@rabbitmq.com> | 2010-09-02 18:14:46 +0100 |
|---|---|---|
| committer | Alexandru Scvortov <alexandru@rabbitmq.com> | 2010-09-02 18:14:46 +0100 |
| commit | 6a258c35498d38b3fcf434129fe6d75adb41cdb1 (patch) | |
| tree | f62c732f4dffd1bf305202cb9ee16d17e94a3f65 /src | |
| parent | 0d726de461fd6afc4755ce03a803181fec47e51c (diff) | |
| download | rabbitmq-server-git-6a258c35498d38b3fcf434129fe6d75adb41cdb1.tar.gz | |
moved most of the confirm logic from amqqueue_process to variable_queue
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 68 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 94 |
2 files changed, 83 insertions, 79 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index ce5c8162a1..3df59de6b1 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -61,9 +61,7 @@ rate_timer_ref, expiry_timer_ref, stats_timer, - guid_to_channel, - msgs_on_disk, - msg_indices_on_disk + guid_to_channel }). -record(consumer, {tag, ack_required}). @@ -126,9 +124,7 @@ init(Q) -> rate_timer_ref = undefined, expiry_timer_ref = undefined, stats_timer = rabbit_event:init_stats_timer(), - guid_to_channel = dict:new(), - msgs_on_disk = gb_sets:new(), - msg_indices_on_disk = gb_sets:new()}, hibernate, + guid_to_channel = dict:new()}, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. terminate(shutdown, State = #q{backing_queue = BQ}) -> @@ -406,17 +402,13 @@ deliver_from_queue_deliver(AckRequired, false, {{Message, IsDelivered, AckTag}, 0 == Remaining, State #q { backing_queue_state = BQS1 }}. -confirm_message_internal(Guid, State = #q { guid_to_channel = GTC, - msgs_on_disk = MOD, - msg_indices_on_disk = MIOD }) -> +confirm_message_internal(Guid, State = #q { guid_to_channel = GTC }) -> case dict:find(Guid, GTC) of {ok, {_ , undefined}} -> ok; {ok, {ChPid, MsgSeqNo}} -> rabbit_channel:confirm(ChPid, MsgSeqNo); _ -> ok end, - State #q { guid_to_channel = dict:erase(Guid, GTC), - msgs_on_disk = gb_sets:delete_any(Guid, MOD), - msg_indices_on_disk = gb_sets:delete_any(Guid, MIOD) }. + State #q { guid_to_channel = dict:erase(Guid, GTC) }. maybe_record_confirm_message(undefined, _, _, State) -> State; @@ -553,7 +545,15 @@ maybe_send_reply(ChPid, Msg) -> ok = rabbit_channel:send_command(ChPid, Msg). qname(#q{q = #amqqueue{name = QName}}) -> QName. maybe_run_queue_via_backing_queue(Fun, State = #q{backing_queue_state = BQS}) -> - run_message_queue(State#q{backing_queue_state = Fun(BQS)}). + case Fun(BQS) of + {BQS1, {confirm, Guids}} -> + State1 = lists:foldl(fun (Guid, State0) -> + confirm_message_internal(Guid, State0) end, + State, Guids), + State1 #q { backing_queue_state = BQS1}; + BQS1 -> + run_message_queue(State#q{backing_queue_state = BQS1}) + end. commit_transaction(Txn, From, ChPid, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> @@ -851,48 +851,6 @@ handle_cast({ack, Txn, AckTags, ChPid}, noreply(State#q{backing_queue_state = BQS1}) end; -%% Called when variable queue gets ack from a consumer. -handle_cast({confirm_messages, Guids}, State) -> - noreply(lists:foldl(fun (Guid, State0) -> - confirm_message_internal(Guid, State0) - end, State, Guids)); - -handle_cast({msgs_written_to_disk, Guids}, - State = #q{guid_to_channel = GTC, - msgs_on_disk = MOD, - msg_indices_on_disk = MIOD}) -> - GuidSet = gb_sets:from_list( - lists:filter(fun(Guid) -> - dict:is_key(Guid, GTC) - end, Guids)), - ToConfirmMsgs = gb_sets:intersection(GuidSet, MIOD), - gb_sets:fold(fun (Guid, State0) -> - confirm_message_internal(Guid, State0) - end, State, ToConfirmMsgs), - noreply(State#q{msgs_on_disk = - gb_sets:difference(gb_sets:union(MOD, GuidSet), - ToConfirmMsgs), - msg_indices_on_disk = - gb_sets:difference(MIOD, ToConfirmMsgs)}); - -handle_cast({msg_indices_written_to_disk, Guids}, - State = #q{guid_to_channel = GTC, - msgs_on_disk = MOD, - msg_indices_on_disk = MIOD}) -> - GuidSet = gb_sets:from_list( - lists:filter(fun(Guid) -> - dict:is_key(Guid, GTC) - end, Guids)), - ToConfirmMsgs = gb_sets:intersection(GuidSet, MOD), - gb_sets:fold(fun (Guid, State0) -> - confirm_message_internal(Guid, State0) - end, State, ToConfirmMsgs), - noreply(State#q{msgs_on_disk = - gb_sets:difference(MOD, ToConfirmMsgs), - msg_indices_on_disk = - gb_sets:difference(gb_sets:union(MIOD, GuidSet), - ToConfirmMsgs)}); - handle_cast({reject, AckTags, Requeue, ChPid}, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> case lookup_ch(ChPid) of diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 0b0cec46ed..448e8c93af 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -31,7 +31,7 @@ -module(rabbit_variable_queue). --export([init/3, init/4, terminate/1, delete_and_terminate/1, +-export([init/3, terminate/1, delete_and_terminate/1, purge/1, publish/2, publish_delivered/3, fetch/2, ack/2, tx_publish/3, tx_ack/3, tx_rollback/2, tx_commit/3, requeue/2, len/1, is_empty/1, @@ -237,8 +237,9 @@ out_counter, in_counter, rates, - confirm_functions - }). + msgs_on_disk, + msg_indices_on_disk + }). -record(rates, { egress, ingress, avg_egress, avg_ingress, timestamp }). @@ -324,7 +325,8 @@ out_counter :: non_neg_integer(), in_counter :: non_neg_integer(), rates :: rates(), - confirm_functions :: {any(), any(), any()} }). + msgs_on_disk :: gb_set(), + msg_indices_on_disk :: gb_set()}). -include("rabbit_backing_queue_spec.hrl"). @@ -371,20 +373,6 @@ stop_msg_store() -> ok = rabbit_sup:stop_child(?TRANSIENT_MSG_STORE). init(QueueName, IsDurable, Recover) -> - Self = self(), - init(QueueName, IsDurable, Recover, - { fun(Guids) -> %% index-on-disk fun - gen_server2:cast(Self, - {msg_indices_written_to_disk, Guids}) - end, - fun (Guids) -> %% msg-on-disk fun - gen_server2:cast(Self, {msgs_written_to_disk, Guids}) - end, - fun (Guids) -> %% ack-received fun - gen_server2:cast(Self, {confirm_messages, Guids}) - end }). - -init(QueueName, IsDurable, Recover, {IndicesOnDisk, MsgsOnDisk, _} = CF) -> {DeltaCount, Terms, IndexState} = rabbit_queue_index:init( QueueName, Recover, @@ -392,7 +380,7 @@ init(QueueName, IsDurable, Recover, {IndicesOnDisk, MsgsOnDisk, _} = CF) -> fun (Guid) -> rabbit_msg_store:contains(?PERSISTENT_MSG_STORE, Guid) end, - IndicesOnDisk), + fun msg_indices_written_to_disk/1), {LowSeqId, NextSeqId, IndexState1} = rabbit_queue_index:bounds(IndexState), {PRef, TRef, Terms1} = @@ -416,9 +404,12 @@ init(QueueName, IsDurable, Recover, {IndicesOnDisk, MsgsOnDisk, _} = CF) -> false -> undefined end, + Self = self(), rabbit_msg_store:register_sync_callback( ?PERSISTENT_MSG_STORE, - MsgsOnDisk), + fun (Guids) -> + msgs_written_to_disk(Self, Guids) + end), TransientClient = rabbit_msg_store:client_init(?TRANSIENT_MSG_STORE, TRef), State = #vqstate { @@ -451,7 +442,8 @@ init(QueueName, IsDurable, Recover, {IndicesOnDisk, MsgsOnDisk, _} = CF) -> avg_egress = 0.0, avg_ingress = 0.0, timestamp = Now }, - confirm_functions = CF}, + msgs_on_disk = gb_sets:new(), + msg_indices_on_disk = gb_sets:new()}, a(maybe_deltas_to_betas(State)). terminate(State) -> @@ -1126,8 +1118,7 @@ remove_pending_ack(KeepPersistent, ack(_MsgStoreFun, _Fun, [], State) -> State; -ack(MsgStoreFun, Fun, AckTags, - State = #vqstate { confirm_functions = {_, _, AcksReceived} }) -> +ack(MsgStoreFun, Fun, AckTags, State) -> {{SeqIds, GuidsByStore}, State1 = #vqstate { index_state = IndexState, persistent_count = PCount }} = lists:foldl( @@ -1139,7 +1130,7 @@ ack(MsgStoreFun, Fun, AckTags, end, {{[], orddict:new()}, State}, AckTags), IndexState1 = rabbit_queue_index:ack(SeqIds, IndexState), ok = orddict:fold(fun (MsgStore, Guids, ok) -> - AcksReceived(Guids), + confirm_messages(Guids), MsgStoreFun(MsgStore, Guids) end, ok, GuidsByStore), PCount1 = PCount - case orddict:find(?PERSISTENT_MSG_STORE, GuidsByStore) of @@ -1157,6 +1148,61 @@ accumulate_ack(SeqId, {IsPersistent, Guid}, {SeqIdsAcc, Dict}) -> {cons_if(IsPersistent, SeqId, SeqIdsAcc), rabbit_misc:orddict_cons(find_msg_store(IsPersistent), Guid, Dict)}. + +%%---------------------------------------------------------------------------- +%% Internal plumbing for confirms (aka publisher acks) +%%---------------------------------------------------------------------------- + +confirm_messages(Guids) -> + Self = self(), + spawn(fun() -> rabbit_amqqueue:maybe_run_queue_via_backing_queue( + Self, + fun (State = #vqstate { msgs_on_disk = MOD, + msg_indices_on_disk = MIOD }) -> + { State #vqstate { + msgs_on_disk = + gb_sets:difference(MOD, gb_sets:from_list(Guids)), + msg_indices_on_disk = + gb_sets:delete_any(MIOD, gb_sets:from_list(Guids)) }, + {confirm, Guids} } + end) + end). + +msgs_written_to_disk(QPid, Guids) -> + spawn(fun() -> rabbit_amqqueue:maybe_run_queue_via_backing_queue( + QPid, + fun(State = #vqstate { msgs_on_disk = MOD, + msg_indices_on_disk = MIOD}) -> + GuidSet = gb_sets:from_list(Guids), + ToConfirmMsgs = gb_sets:intersection(GuidSet, MIOD), + { State #vqstate { + msgs_on_disk = + gb_sets:difference(gb_sets:union(MOD, GuidSet), + ToConfirmMsgs), + msg_indices_on_disk = + gb_sets:difference(MIOD, ToConfirmMsgs) }, + {confirm, gb_sets:to_list(ToConfirmMsgs)} } + end) + end). + +msg_indices_written_to_disk(Guids) -> + Self = self(), + spawn(fun() -> rabbit_amqqueue:maybe_run_queue_via_backing_queue( + Self, + fun(State = #vqstate { msgs_on_disk = MOD, + msg_indices_on_disk = MIOD}) -> + GuidSet = gb_sets:from_list(Guids), + ToConfirmMsgs = gb_sets:intersection(GuidSet, MOD), + { State #vqstate { + msgs_on_disk = + gb_sets:difference(MOD, ToConfirmMsgs), + msg_indices_on_disk = + gb_sets:difference(gb_sets:union(MIOD, GuidSet), + ToConfirmMsgs) }, + {confirm, gb_sets:to_list(ToConfirmMsgs)} } + end) + end). + %%---------------------------------------------------------------------------- %% Phase changes %%---------------------------------------------------------------------------- |
