diff options
| author | Matthew Sackman <matthew@lshift.net> | 2010-04-20 14:15:51 +0100 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2010-04-20 14:15:51 +0100 |
| commit | 1541193155b4dd1870111fcb4f7a61ab11b5d369 (patch) | |
| tree | be31ad66f21d5fc74c9f9f20266f911224227dc6 /src | |
| parent | 7531d0f281c1a7a652336e31dbcaeb502b6715ab (diff) | |
| download | rabbitmq-server-git-1541193155b4dd1870111fcb4f7a61ab11b5d369.tar.gz | |
Switched the From to a Fun and hence CPS
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 3 | ||||
| -rw-r--r-- | src/rabbit_backing_queue.erl | 5 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 26 |
3 files changed, 19 insertions, 15 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 697282fe3f..b10baacb46 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -469,7 +469,8 @@ maybe_run_queue_via_backing_queue(Fun, State = #q{backing_queue_state = BQS}) -> commit_transaction(Txn, From, ChPid, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> - {AckTags, BQS1} = BQ:tx_commit(Txn, From, BQS), + {AckTags, BQS1} = + BQ:tx_commit(Txn, fun () -> gen_server2:reply(From, ok) end, BQS), %% ChPid must be known here because of the participant management %% by the channel. C = #cr{acktags = ChAckTags} = lookup_ch(ChPid), diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl index 8e7de95e1b..7090d9cc59 100644 --- a/src/rabbit_backing_queue.erl +++ b/src/rabbit_backing_queue.erl @@ -83,7 +83,10 @@ behaviour_info(callbacks) -> {tx_rollback, 2}, %% Commit these publishes and acktags. The publishes you will - %% have previously seen in calls to tx_publish. + %% have previously seen in calls to tx_publish, and the acks in + %% calls to tx_ack. The Fun passed in must be called once the + %% messages have really been commited. This CPS permits the + %% possibility of commit coalescing. {tx_commit, 3}, %% Reinsert messages into the queue which have already been diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 9328164b64..d53e6e3f55 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -180,7 +180,7 @@ end_seq_id %% note the end_seq_id is always >, not >= }). --record(tx, {pending_messages, pending_acks}). +-record(tx, { pending_messages, pending_acks }). %% When we discover, on publish, that we should write some indices to %% disk for some betas, the RAM_INDEX_BATCH_SIZE sets the number of @@ -228,7 +228,7 @@ avg_ingress_rate :: float(), rate_timestamp :: {integer(), integer(), integer()}, len :: non_neg_integer(), - on_sync :: {[[ack()]], [[guid()]], [{pid(), any()}]}, + on_sync :: {[[ack()]], [[guid()]], [fun (() -> any())]}, msg_store_clients :: 'undefined' | {{any(), binary()}, {any(), binary()}}, persistent_store :: pid() | atom(), persistent_count :: non_neg_integer(), @@ -552,7 +552,7 @@ tx_rollback(Txn, State = #vqstate { persistent_store = PersistentStore }) -> ok = rabbit_msg_store:remove(PersistentStore, persistent_guids(Pubs)), {lists:flatten(AckTags), State}. -tx_commit(Txn, From, State = #vqstate { persistent_store = PersistentStore }) -> +tx_commit(Txn, Fun, State = #vqstate { persistent_store = PersistentStore }) -> %% If we are a non-durable queue, or we have no persistent pubs, %% we can skip the msg_store loop. #tx { pending_acks = AckTags, pending_messages = Pubs } = lookup_tx(Txn), @@ -566,12 +566,12 @@ tx_commit(Txn, From, State = #vqstate { persistent_store = PersistentStore }) -> ?TRANSIENT_MSG_STORE == PersistentStore of true -> tx_commit_post_msg_store( - IsTransientPubs, PubsOrdered, AckTags1, From, State); + IsTransientPubs, PubsOrdered, AckTags1, Fun, State); false -> ok = rabbit_msg_store:sync( ?PERSISTENT_MSG_STORE, PersistentGuids, msg_store_callback(PersistentGuids, IsTransientPubs, - PubsOrdered, AckTags1, From)), + PubsOrdered, AckTags1, Fun)), State end}. @@ -881,7 +881,7 @@ should_force_index_to_disk(State = %% Internal major helpers for Public API %%---------------------------------------------------------------------------- -msg_store_callback(PersistentGuids, IsTransientPubs, Pubs, AckTags, From) -> +msg_store_callback(PersistentGuids, IsTransientPubs, Pubs, AckTags, Fun) -> Self = self(), fun() -> spawn( @@ -895,14 +895,14 @@ msg_store_callback(PersistentGuids, IsTransientPubs, Pubs, AckTags, From) -> Self, fun (StateN) -> tx_commit_post_msg_store( IsTransientPubs, Pubs, - AckTags, From, StateN) + AckTags, Fun, StateN) end) end) end) end. -tx_commit_post_msg_store(IsTransientPubs, Pubs, AckTags, From, State = - #vqstate { on_sync = OnSync = {SAcks, SPubs, SFroms}, +tx_commit_post_msg_store(IsTransientPubs, Pubs, AckTags, Fun, State = + #vqstate { on_sync = OnSync = {SAcks, SPubs, SFuns}, persistent_store = PersistentStore, pending_ack = PA }) -> %% If we are a non-durable queue, or (no persisent pubs, and no @@ -918,16 +918,16 @@ tx_commit_post_msg_store(IsTransientPubs, Pubs, AckTags, From, State = (_AckTag, false) -> false end, true, AckTags)) of true -> State1 = tx_commit_index(State #vqstate { - on_sync = {[], [Pubs], [From]} }), + on_sync = {[], [Pubs], [Fun]} }), State1 #vqstate { on_sync = OnSync }; false -> State #vqstate { on_sync = { [AckTags | SAcks], [Pubs | SPubs], - [From | SFroms] }} + [Fun | SFuns] }} end. tx_commit_index(State = #vqstate { on_sync = {_, _, []} }) -> State; -tx_commit_index(State = #vqstate { on_sync = {SAcks, SPubs, SFroms}, +tx_commit_index(State = #vqstate { on_sync = {SAcks, SPubs, SFuns}, persistent_store = PersistentStore }) -> Acks = lists:flatten(SAcks), State1 = ack(Acks, State), @@ -946,7 +946,7 @@ tx_commit_index(State = #vqstate { on_sync = {SAcks, SPubs, SFroms}, end, {Acks, State1}, Pubs), IndexState1 = rabbit_queue_index:sync_seq_ids(SeqIds, IndexState), - [ gen_server2:reply(From, ok) || From <- lists:reverse(SFroms) ], + [ Fun() || Fun <- lists:reverse(SFuns) ], State2 #vqstate { index_state = IndexState1, on_sync = {[], [], []} }. delete1(_PersistentStore, _TransientThreshold, NextSeqId, Count, DeltaSeqId, |
