summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2010-04-20 14:15:51 +0100
committerMatthew Sackman <matthew@lshift.net>2010-04-20 14:15:51 +0100
commit1541193155b4dd1870111fcb4f7a61ab11b5d369 (patch)
treebe31ad66f21d5fc74c9f9f20266f911224227dc6 /src
parent7531d0f281c1a7a652336e31dbcaeb502b6715ab (diff)
downloadrabbitmq-server-git-1541193155b4dd1870111fcb4f7a61ab11b5d369.tar.gz
Switched the From to a Fun and hence CPS
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_amqqueue_process.erl3
-rw-r--r--src/rabbit_backing_queue.erl5
-rw-r--r--src/rabbit_variable_queue.erl26
3 files changed, 19 insertions, 15 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 697282fe3f..b10baacb46 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -469,7 +469,8 @@ maybe_run_queue_via_backing_queue(Fun, State = #q{backing_queue_state = BQS}) ->
commit_transaction(Txn, From, ChPid, State = #q{backing_queue = BQ,
backing_queue_state = BQS}) ->
- {AckTags, BQS1} = BQ:tx_commit(Txn, From, BQS),
+ {AckTags, BQS1} =
+ BQ:tx_commit(Txn, fun () -> gen_server2:reply(From, ok) end, BQS),
%% ChPid must be known here because of the participant management
%% by the channel.
C = #cr{acktags = ChAckTags} = lookup_ch(ChPid),
diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl
index 8e7de95e1b..7090d9cc59 100644
--- a/src/rabbit_backing_queue.erl
+++ b/src/rabbit_backing_queue.erl
@@ -83,7 +83,10 @@ behaviour_info(callbacks) ->
{tx_rollback, 2},
%% Commit these publishes and acktags. The publishes you will
- %% have previously seen in calls to tx_publish.
+ %% have previously seen in calls to tx_publish, and the acks in
+ %% calls to tx_ack. The Fun passed in must be called once the
+ %% messages have really been commited. This CPS permits the
+ %% possibility of commit coalescing.
{tx_commit, 3},
%% Reinsert messages into the queue which have already been
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 9328164b64..d53e6e3f55 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -180,7 +180,7 @@
end_seq_id %% note the end_seq_id is always >, not >=
}).
--record(tx, {pending_messages, pending_acks}).
+-record(tx, { pending_messages, pending_acks }).
%% When we discover, on publish, that we should write some indices to
%% disk for some betas, the RAM_INDEX_BATCH_SIZE sets the number of
@@ -228,7 +228,7 @@
avg_ingress_rate :: float(),
rate_timestamp :: {integer(), integer(), integer()},
len :: non_neg_integer(),
- on_sync :: {[[ack()]], [[guid()]], [{pid(), any()}]},
+ on_sync :: {[[ack()]], [[guid()]], [fun (() -> any())]},
msg_store_clients :: 'undefined' | {{any(), binary()}, {any(), binary()}},
persistent_store :: pid() | atom(),
persistent_count :: non_neg_integer(),
@@ -552,7 +552,7 @@ tx_rollback(Txn, State = #vqstate { persistent_store = PersistentStore }) ->
ok = rabbit_msg_store:remove(PersistentStore, persistent_guids(Pubs)),
{lists:flatten(AckTags), State}.
-tx_commit(Txn, From, State = #vqstate { persistent_store = PersistentStore }) ->
+tx_commit(Txn, Fun, State = #vqstate { persistent_store = PersistentStore }) ->
%% If we are a non-durable queue, or we have no persistent pubs,
%% we can skip the msg_store loop.
#tx { pending_acks = AckTags, pending_messages = Pubs } = lookup_tx(Txn),
@@ -566,12 +566,12 @@ tx_commit(Txn, From, State = #vqstate { persistent_store = PersistentStore }) ->
?TRANSIENT_MSG_STORE == PersistentStore of
true ->
tx_commit_post_msg_store(
- IsTransientPubs, PubsOrdered, AckTags1, From, State);
+ IsTransientPubs, PubsOrdered, AckTags1, Fun, State);
false ->
ok = rabbit_msg_store:sync(
?PERSISTENT_MSG_STORE, PersistentGuids,
msg_store_callback(PersistentGuids, IsTransientPubs,
- PubsOrdered, AckTags1, From)),
+ PubsOrdered, AckTags1, Fun)),
State
end}.
@@ -881,7 +881,7 @@ should_force_index_to_disk(State =
%% Internal major helpers for Public API
%%----------------------------------------------------------------------------
-msg_store_callback(PersistentGuids, IsTransientPubs, Pubs, AckTags, From) ->
+msg_store_callback(PersistentGuids, IsTransientPubs, Pubs, AckTags, Fun) ->
Self = self(),
fun() ->
spawn(
@@ -895,14 +895,14 @@ msg_store_callback(PersistentGuids, IsTransientPubs, Pubs, AckTags, From) ->
Self, fun (StateN) ->
tx_commit_post_msg_store(
IsTransientPubs, Pubs,
- AckTags, From, StateN)
+ AckTags, Fun, StateN)
end)
end)
end)
end.
-tx_commit_post_msg_store(IsTransientPubs, Pubs, AckTags, From, State =
- #vqstate { on_sync = OnSync = {SAcks, SPubs, SFroms},
+tx_commit_post_msg_store(IsTransientPubs, Pubs, AckTags, Fun, State =
+ #vqstate { on_sync = OnSync = {SAcks, SPubs, SFuns},
persistent_store = PersistentStore,
pending_ack = PA }) ->
%% If we are a non-durable queue, or (no persisent pubs, and no
@@ -918,16 +918,16 @@ tx_commit_post_msg_store(IsTransientPubs, Pubs, AckTags, From, State =
(_AckTag, false) -> false
end, true, AckTags)) of
true -> State1 = tx_commit_index(State #vqstate {
- on_sync = {[], [Pubs], [From]} }),
+ on_sync = {[], [Pubs], [Fun]} }),
State1 #vqstate { on_sync = OnSync };
false -> State #vqstate { on_sync = { [AckTags | SAcks],
[Pubs | SPubs],
- [From | SFroms] }}
+ [Fun | SFuns] }}
end.
tx_commit_index(State = #vqstate { on_sync = {_, _, []} }) ->
State;
-tx_commit_index(State = #vqstate { on_sync = {SAcks, SPubs, SFroms},
+tx_commit_index(State = #vqstate { on_sync = {SAcks, SPubs, SFuns},
persistent_store = PersistentStore }) ->
Acks = lists:flatten(SAcks),
State1 = ack(Acks, State),
@@ -946,7 +946,7 @@ tx_commit_index(State = #vqstate { on_sync = {SAcks, SPubs, SFroms},
end, {Acks, State1}, Pubs),
IndexState1 =
rabbit_queue_index:sync_seq_ids(SeqIds, IndexState),
- [ gen_server2:reply(From, ok) || From <- lists:reverse(SFroms) ],
+ [ Fun() || Fun <- lists:reverse(SFuns) ],
State2 #vqstate { index_state = IndexState1, on_sync = {[], [], []} }.
delete1(_PersistentStore, _TransientThreshold, NextSeqId, Count, DeltaSeqId,