summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorAlexandru Scvortov <alexandru@rabbitmq.com>2010-09-02 18:14:46 +0100
committerAlexandru Scvortov <alexandru@rabbitmq.com>2010-09-02 18:14:46 +0100
commit6a258c35498d38b3fcf434129fe6d75adb41cdb1 (patch)
treef62c732f4dffd1bf305202cb9ee16d17e94a3f65 /src
parent0d726de461fd6afc4755ce03a803181fec47e51c (diff)
downloadrabbitmq-server-git-6a258c35498d38b3fcf434129fe6d75adb41cdb1.tar.gz
moved most of the confirm logic from amqqueue_process to variable_queue
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_amqqueue_process.erl68
-rw-r--r--src/rabbit_variable_queue.erl94
2 files changed, 83 insertions, 79 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index ce5c8162a1..3df59de6b1 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -61,9 +61,7 @@
rate_timer_ref,
expiry_timer_ref,
stats_timer,
- guid_to_channel,
- msgs_on_disk,
- msg_indices_on_disk
+ guid_to_channel
}).
-record(consumer, {tag, ack_required}).
@@ -126,9 +124,7 @@ init(Q) ->
rate_timer_ref = undefined,
expiry_timer_ref = undefined,
stats_timer = rabbit_event:init_stats_timer(),
- guid_to_channel = dict:new(),
- msgs_on_disk = gb_sets:new(),
- msg_indices_on_disk = gb_sets:new()}, hibernate,
+ guid_to_channel = dict:new()}, hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
terminate(shutdown, State = #q{backing_queue = BQ}) ->
@@ -406,17 +402,13 @@ deliver_from_queue_deliver(AckRequired, false,
{{Message, IsDelivered, AckTag}, 0 == Remaining,
State #q { backing_queue_state = BQS1 }}.
-confirm_message_internal(Guid, State = #q { guid_to_channel = GTC,
- msgs_on_disk = MOD,
- msg_indices_on_disk = MIOD }) ->
+confirm_message_internal(Guid, State = #q { guid_to_channel = GTC }) ->
case dict:find(Guid, GTC) of
{ok, {_ , undefined}} -> ok;
{ok, {ChPid, MsgSeqNo}} -> rabbit_channel:confirm(ChPid, MsgSeqNo);
_ -> ok
end,
- State #q { guid_to_channel = dict:erase(Guid, GTC),
- msgs_on_disk = gb_sets:delete_any(Guid, MOD),
- msg_indices_on_disk = gb_sets:delete_any(Guid, MIOD) }.
+ State #q { guid_to_channel = dict:erase(Guid, GTC) }.
maybe_record_confirm_message(undefined, _, _, State) ->
State;
@@ -553,7 +545,15 @@ maybe_send_reply(ChPid, Msg) -> ok = rabbit_channel:send_command(ChPid, Msg).
qname(#q{q = #amqqueue{name = QName}}) -> QName.
maybe_run_queue_via_backing_queue(Fun, State = #q{backing_queue_state = BQS}) ->
- run_message_queue(State#q{backing_queue_state = Fun(BQS)}).
+ case Fun(BQS) of
+ {BQS1, {confirm, Guids}} ->
+ State1 = lists:foldl(fun (Guid, State0) ->
+ confirm_message_internal(Guid, State0) end,
+ State, Guids),
+ State1 #q { backing_queue_state = BQS1};
+ BQS1 ->
+ run_message_queue(State#q{backing_queue_state = BQS1})
+ end.
commit_transaction(Txn, From, ChPid, State = #q{backing_queue = BQ,
backing_queue_state = BQS}) ->
@@ -851,48 +851,6 @@ handle_cast({ack, Txn, AckTags, ChPid},
noreply(State#q{backing_queue_state = BQS1})
end;
-%% Called when variable queue gets ack from a consumer.
-handle_cast({confirm_messages, Guids}, State) ->
- noreply(lists:foldl(fun (Guid, State0) ->
- confirm_message_internal(Guid, State0)
- end, State, Guids));
-
-handle_cast({msgs_written_to_disk, Guids},
- State = #q{guid_to_channel = GTC,
- msgs_on_disk = MOD,
- msg_indices_on_disk = MIOD}) ->
- GuidSet = gb_sets:from_list(
- lists:filter(fun(Guid) ->
- dict:is_key(Guid, GTC)
- end, Guids)),
- ToConfirmMsgs = gb_sets:intersection(GuidSet, MIOD),
- gb_sets:fold(fun (Guid, State0) ->
- confirm_message_internal(Guid, State0)
- end, State, ToConfirmMsgs),
- noreply(State#q{msgs_on_disk =
- gb_sets:difference(gb_sets:union(MOD, GuidSet),
- ToConfirmMsgs),
- msg_indices_on_disk =
- gb_sets:difference(MIOD, ToConfirmMsgs)});
-
-handle_cast({msg_indices_written_to_disk, Guids},
- State = #q{guid_to_channel = GTC,
- msgs_on_disk = MOD,
- msg_indices_on_disk = MIOD}) ->
- GuidSet = gb_sets:from_list(
- lists:filter(fun(Guid) ->
- dict:is_key(Guid, GTC)
- end, Guids)),
- ToConfirmMsgs = gb_sets:intersection(GuidSet, MOD),
- gb_sets:fold(fun (Guid, State0) ->
- confirm_message_internal(Guid, State0)
- end, State, ToConfirmMsgs),
- noreply(State#q{msgs_on_disk =
- gb_sets:difference(MOD, ToConfirmMsgs),
- msg_indices_on_disk =
- gb_sets:difference(gb_sets:union(MIOD, GuidSet),
- ToConfirmMsgs)});
-
handle_cast({reject, AckTags, Requeue, ChPid},
State = #q{backing_queue = BQ, backing_queue_state = BQS}) ->
case lookup_ch(ChPid) of
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 0b0cec46ed..448e8c93af 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -31,7 +31,7 @@
-module(rabbit_variable_queue).
--export([init/3, init/4, terminate/1, delete_and_terminate/1,
+-export([init/3, terminate/1, delete_and_terminate/1,
purge/1, publish/2, publish_delivered/3, fetch/2, ack/2,
tx_publish/3, tx_ack/3, tx_rollback/2, tx_commit/3,
requeue/2, len/1, is_empty/1,
@@ -237,8 +237,9 @@
out_counter,
in_counter,
rates,
- confirm_functions
- }).
+ msgs_on_disk,
+ msg_indices_on_disk
+ }).
-record(rates, { egress, ingress, avg_egress, avg_ingress, timestamp }).
@@ -324,7 +325,8 @@
out_counter :: non_neg_integer(),
in_counter :: non_neg_integer(),
rates :: rates(),
- confirm_functions :: {any(), any(), any()} }).
+ msgs_on_disk :: gb_set(),
+ msg_indices_on_disk :: gb_set()}).
-include("rabbit_backing_queue_spec.hrl").
@@ -371,20 +373,6 @@ stop_msg_store() ->
ok = rabbit_sup:stop_child(?TRANSIENT_MSG_STORE).
init(QueueName, IsDurable, Recover) ->
- Self = self(),
- init(QueueName, IsDurable, Recover,
- { fun(Guids) -> %% index-on-disk fun
- gen_server2:cast(Self,
- {msg_indices_written_to_disk, Guids})
- end,
- fun (Guids) -> %% msg-on-disk fun
- gen_server2:cast(Self, {msgs_written_to_disk, Guids})
- end,
- fun (Guids) -> %% ack-received fun
- gen_server2:cast(Self, {confirm_messages, Guids})
- end }).
-
-init(QueueName, IsDurable, Recover, {IndicesOnDisk, MsgsOnDisk, _} = CF) ->
{DeltaCount, Terms, IndexState} =
rabbit_queue_index:init(
QueueName, Recover,
@@ -392,7 +380,7 @@ init(QueueName, IsDurable, Recover, {IndicesOnDisk, MsgsOnDisk, _} = CF) ->
fun (Guid) ->
rabbit_msg_store:contains(?PERSISTENT_MSG_STORE, Guid)
end,
- IndicesOnDisk),
+ fun msg_indices_written_to_disk/1),
{LowSeqId, NextSeqId, IndexState1} = rabbit_queue_index:bounds(IndexState),
{PRef, TRef, Terms1} =
@@ -416,9 +404,12 @@ init(QueueName, IsDurable, Recover, {IndicesOnDisk, MsgsOnDisk, _} = CF) ->
false -> undefined
end,
+ Self = self(),
rabbit_msg_store:register_sync_callback(
?PERSISTENT_MSG_STORE,
- MsgsOnDisk),
+ fun (Guids) ->
+ msgs_written_to_disk(Self, Guids)
+ end),
TransientClient = rabbit_msg_store:client_init(?TRANSIENT_MSG_STORE, TRef),
State = #vqstate {
@@ -451,7 +442,8 @@ init(QueueName, IsDurable, Recover, {IndicesOnDisk, MsgsOnDisk, _} = CF) ->
avg_egress = 0.0,
avg_ingress = 0.0,
timestamp = Now },
- confirm_functions = CF},
+ msgs_on_disk = gb_sets:new(),
+ msg_indices_on_disk = gb_sets:new()},
a(maybe_deltas_to_betas(State)).
terminate(State) ->
@@ -1126,8 +1118,7 @@ remove_pending_ack(KeepPersistent,
ack(_MsgStoreFun, _Fun, [], State) ->
State;
-ack(MsgStoreFun, Fun, AckTags,
- State = #vqstate { confirm_functions = {_, _, AcksReceived} }) ->
+ack(MsgStoreFun, Fun, AckTags, State) ->
{{SeqIds, GuidsByStore}, State1 = #vqstate { index_state = IndexState,
persistent_count = PCount }} =
lists:foldl(
@@ -1139,7 +1130,7 @@ ack(MsgStoreFun, Fun, AckTags,
end, {{[], orddict:new()}, State}, AckTags),
IndexState1 = rabbit_queue_index:ack(SeqIds, IndexState),
ok = orddict:fold(fun (MsgStore, Guids, ok) ->
- AcksReceived(Guids),
+ confirm_messages(Guids),
MsgStoreFun(MsgStore, Guids)
end, ok, GuidsByStore),
PCount1 = PCount - case orddict:find(?PERSISTENT_MSG_STORE, GuidsByStore) of
@@ -1157,6 +1148,61 @@ accumulate_ack(SeqId, {IsPersistent, Guid}, {SeqIdsAcc, Dict}) ->
{cons_if(IsPersistent, SeqId, SeqIdsAcc),
rabbit_misc:orddict_cons(find_msg_store(IsPersistent), Guid, Dict)}.
+
+%%----------------------------------------------------------------------------
+%% Internal plumbing for confirms (aka publisher acks)
+%%----------------------------------------------------------------------------
+
+confirm_messages(Guids) ->
+ Self = self(),
+ spawn(fun() -> rabbit_amqqueue:maybe_run_queue_via_backing_queue(
+ Self,
+ fun (State = #vqstate { msgs_on_disk = MOD,
+ msg_indices_on_disk = MIOD }) ->
+ { State #vqstate {
+ msgs_on_disk =
+ gb_sets:difference(MOD, gb_sets:from_list(Guids)),
+ msg_indices_on_disk =
+ gb_sets:delete_any(MIOD, gb_sets:from_list(Guids)) },
+ {confirm, Guids} }
+ end)
+ end).
+
+msgs_written_to_disk(QPid, Guids) ->
+ spawn(fun() -> rabbit_amqqueue:maybe_run_queue_via_backing_queue(
+ QPid,
+ fun(State = #vqstate { msgs_on_disk = MOD,
+ msg_indices_on_disk = MIOD}) ->
+ GuidSet = gb_sets:from_list(Guids),
+ ToConfirmMsgs = gb_sets:intersection(GuidSet, MIOD),
+ { State #vqstate {
+ msgs_on_disk =
+ gb_sets:difference(gb_sets:union(MOD, GuidSet),
+ ToConfirmMsgs),
+ msg_indices_on_disk =
+ gb_sets:difference(MIOD, ToConfirmMsgs) },
+ {confirm, gb_sets:to_list(ToConfirmMsgs)} }
+ end)
+ end).
+
+msg_indices_written_to_disk(Guids) ->
+ Self = self(),
+ spawn(fun() -> rabbit_amqqueue:maybe_run_queue_via_backing_queue(
+ Self,
+ fun(State = #vqstate { msgs_on_disk = MOD,
+ msg_indices_on_disk = MIOD}) ->
+ GuidSet = gb_sets:from_list(Guids),
+ ToConfirmMsgs = gb_sets:intersection(GuidSet, MOD),
+ { State #vqstate {
+ msgs_on_disk =
+ gb_sets:difference(MOD, ToConfirmMsgs),
+ msg_indices_on_disk =
+ gb_sets:difference(gb_sets:union(MIOD, GuidSet),
+ ToConfirmMsgs) },
+ {confirm, gb_sets:to_list(ToConfirmMsgs)} }
+ end)
+ end).
+
%%----------------------------------------------------------------------------
%% Phase changes
%%----------------------------------------------------------------------------