diff options
| author | Matthew Sackman <matthew@rabbitmq.com> | 2010-05-21 17:18:44 +0100 |
|---|---|---|
| committer | Matthew Sackman <matthew@rabbitmq.com> | 2010-05-21 17:18:44 +0100 |
| commit | 05b140a3c0820ae62a4201bceb60cd364a3a4eb0 (patch) | |
| tree | 68e48d6bf1ced866ecc068e8d09052394f0d7d4f /src | |
| parent | 69bc3e33a6840e66ef539441d6845024d30d0260 (diff) | |
| download | rabbitmq-server-git-05b140a3c0820ae62a4201bceb60cd364a3a4eb0.tar.gz | |
Stop persistent msgs in non-durable queues from reaching disk
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_variable_queue.erl | 45 |
1 files changed, 29 insertions, 16 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index cc876b5e6a..b10703b4ac 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -192,7 +192,8 @@ persistent_store, persistent_count, transient_threshold, - pending_ack + pending_ack, + durable }). -record(msg_status, @@ -268,7 +269,8 @@ persistent_store :: pid() | atom(), persistent_count :: non_neg_integer(), transient_threshold :: non_neg_integer(), - pending_ack :: dict() + pending_ack :: dict(), + durable :: boolean() }). -include("rabbit_backing_queue_spec.hrl"). @@ -360,7 +362,8 @@ init(QueueName, IsDurable, _Recover) -> persistent_store = PersistentStore, persistent_count = DeltaCount1, transient_threshold = NextSeqId, - pending_ack = dict:new() + pending_ack = dict:new(), + durable = IsDurable }, maybe_deltas_to_betas(State). @@ -433,9 +436,11 @@ publish_delivered(true, Msg = #basic_message { guid = Guid, msg_store_clients = MSCState, persistent_store = PersistentStore, persistent_count = PCount, - pending_ack = PA }) -> + pending_ack = PA, + durable = IsDurable }) -> MsgStatus = #msg_status { - msg = Msg, guid = Guid, seq_id = SeqId, is_persistent = IsPersistent, + msg = Msg, guid = Guid, seq_id = SeqId, + is_persistent = IsDurable andalso IsPersistent, is_delivered = true, msg_on_disk = false, index_on_disk = false }, {MsgStatus1, MSCState1} = maybe_write_msg_to_disk(PersistentStore, false, MsgStatus, MSCState), @@ -575,7 +580,8 @@ ack(AckTags, State = #vqstate { index_state = IndexState, tx_publish(Txn, Msg = #basic_message { is_persistent = true, guid = Guid }, State = #vqstate { msg_store_clients = MSCState, - persistent_store = PersistentStore }) -> + persistent_store = PersistentStore, + durable = true }) -> MsgStatus = #msg_status { msg = Msg, guid = Guid, seq_id = undefined, is_persistent = true, is_delivered = false, msg_on_disk = false, index_on_disk = false }, @@ -591,13 +597,19 @@ tx_ack(Txn, AckTags, State) -> ack_in_tx(Txn, AckTags), State. -tx_rollback(Txn, State = #vqstate { persistent_store = PersistentStore }) -> +tx_rollback(Txn, State = #vqstate { persistent_store = PersistentStore, + durable = IsDurable }) -> #tx { pending_acks = AckTags, pending_messages = Pubs } = lookup_tx(Txn), erase_tx(Txn), - ok = rabbit_msg_store:remove(PersistentStore, persistent_guids(Pubs)), + ok = case IsDurable of + true -> rabbit_msg_store:remove(PersistentStore, + persistent_guids(Pubs)); + false -> ok + end, {lists:flatten(AckTags), State}. -tx_commit(Txn, Fun, State = #vqstate { persistent_store = PersistentStore }) -> +tx_commit(Txn, Fun, State = #vqstate { persistent_store = PersistentStore, + durable = IsDurable }) -> %% If we are a non-durable queue, or we have no persistent pubs, %% we can skip the msg_store loop. #tx { pending_acks = AckTags, pending_messages = Pubs } = lookup_tx(Txn), @@ -605,7 +617,7 @@ tx_commit(Txn, Fun, State = #vqstate { persistent_store = PersistentStore }) -> PubsOrdered = lists:reverse(Pubs), AckTags1 = lists:flatten(AckTags), PersistentGuids = persistent_guids(PubsOrdered), - IsTransientPubs = [] == PersistentGuids, + IsTransientPubs = (not IsDurable) orelse [] == PersistentGuids, {AckTags1, case IsTransientPubs orelse ?TRANSIENT_MSG_STORE == PersistentStore of @@ -965,18 +977,18 @@ tx_commit_post_msg_store(IsTransientPubs, Pubs, AckTags, Fun, State = tx_commit_index(State = #vqstate { on_sync = {_, _, []} }) -> State; tx_commit_index(State = #vqstate { on_sync = {SAcks, SPubs, SFuns}, - persistent_store = PersistentStore }) -> + durable = IsDurable }) -> Acks = lists:flatten(SAcks), State1 = ack(Acks, State), - IsPersistentStore = ?PERSISTENT_MSG_STORE == PersistentStore, Pubs = lists:flatten(lists:reverse(SPubs)), {SeqIds, State2 = #vqstate { index_state = IndexState }} = lists:foldl( fun (Msg = #basic_message { is_persistent = IsPersistent }, {SeqIdsAcc, StateN}) -> {SeqId, StateN1} = - publish(Msg, false, IsPersistent, StateN), - {case IsPersistentStore andalso IsPersistent of + publish(Msg, false, IsDurable andalso IsPersistent, + StateN), + {case IsDurable andalso IsPersistent of true -> [SeqId | SeqIdsAcc]; false -> SeqIdsAcc end, StateN1} @@ -1172,9 +1184,10 @@ test_keep_msg_in_ram(SeqId, #vqstate { target_ram_msg_count = TargetRamMsgCount, publish(Msg = #basic_message { is_persistent = IsPersistent, guid = Guid }, IsDelivered, MsgOnDisk, State = #vqstate { next_seq_id = SeqId, len = Len, in_counter = InCount, - persistent_count = PCount }) -> + persistent_count = PCount, durable = IsDurable }) -> MsgStatus = #msg_status { - msg = Msg, guid = Guid, seq_id = SeqId, is_persistent = IsPersistent, + msg = Msg, guid = Guid, seq_id = SeqId, + is_persistent = IsDurable andalso IsPersistent, is_delivered = IsDelivered, msg_on_disk = MsgOnDisk, index_on_disk = false }, PCount1 = PCount + case IsPersistent of |
