summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@rabbitmq.com>2010-05-21 17:18:44 +0100
committerMatthew Sackman <matthew@rabbitmq.com>2010-05-21 17:18:44 +0100
commit05b140a3c0820ae62a4201bceb60cd364a3a4eb0 (patch)
tree68e48d6bf1ced866ecc068e8d09052394f0d7d4f /src
parent69bc3e33a6840e66ef539441d6845024d30d0260 (diff)
downloadrabbitmq-server-git-05b140a3c0820ae62a4201bceb60cd364a3a4eb0.tar.gz
Stop persistent msgs in non-durable queues from reaching disk
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_variable_queue.erl45
1 files changed, 29 insertions, 16 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index cc876b5e6a..b10703b4ac 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -192,7 +192,8 @@
persistent_store,
persistent_count,
transient_threshold,
- pending_ack
+ pending_ack,
+ durable
}).
-record(msg_status,
@@ -268,7 +269,8 @@
persistent_store :: pid() | atom(),
persistent_count :: non_neg_integer(),
transient_threshold :: non_neg_integer(),
- pending_ack :: dict()
+ pending_ack :: dict(),
+ durable :: boolean()
}).
-include("rabbit_backing_queue_spec.hrl").
@@ -360,7 +362,8 @@ init(QueueName, IsDurable, _Recover) ->
persistent_store = PersistentStore,
persistent_count = DeltaCount1,
transient_threshold = NextSeqId,
- pending_ack = dict:new()
+ pending_ack = dict:new(),
+ durable = IsDurable
},
maybe_deltas_to_betas(State).
@@ -433,9 +436,11 @@ publish_delivered(true, Msg = #basic_message { guid = Guid,
msg_store_clients = MSCState,
persistent_store = PersistentStore,
persistent_count = PCount,
- pending_ack = PA }) ->
+ pending_ack = PA,
+ durable = IsDurable }) ->
MsgStatus = #msg_status {
- msg = Msg, guid = Guid, seq_id = SeqId, is_persistent = IsPersistent,
+ msg = Msg, guid = Guid, seq_id = SeqId,
+ is_persistent = IsDurable andalso IsPersistent,
is_delivered = true, msg_on_disk = false, index_on_disk = false },
{MsgStatus1, MSCState1} = maybe_write_msg_to_disk(PersistentStore, false,
MsgStatus, MSCState),
@@ -575,7 +580,8 @@ ack(AckTags, State = #vqstate { index_state = IndexState,
tx_publish(Txn,
Msg = #basic_message { is_persistent = true, guid = Guid },
State = #vqstate { msg_store_clients = MSCState,
- persistent_store = PersistentStore }) ->
+ persistent_store = PersistentStore,
+ durable = true }) ->
MsgStatus = #msg_status {
msg = Msg, guid = Guid, seq_id = undefined, is_persistent = true,
is_delivered = false, msg_on_disk = false, index_on_disk = false },
@@ -591,13 +597,19 @@ tx_ack(Txn, AckTags, State) ->
ack_in_tx(Txn, AckTags),
State.
-tx_rollback(Txn, State = #vqstate { persistent_store = PersistentStore }) ->
+tx_rollback(Txn, State = #vqstate { persistent_store = PersistentStore,
+ durable = IsDurable }) ->
#tx { pending_acks = AckTags, pending_messages = Pubs } = lookup_tx(Txn),
erase_tx(Txn),
- ok = rabbit_msg_store:remove(PersistentStore, persistent_guids(Pubs)),
+ ok = case IsDurable of
+ true -> rabbit_msg_store:remove(PersistentStore,
+ persistent_guids(Pubs));
+ false -> ok
+ end,
{lists:flatten(AckTags), State}.
-tx_commit(Txn, Fun, State = #vqstate { persistent_store = PersistentStore }) ->
+tx_commit(Txn, Fun, State = #vqstate { persistent_store = PersistentStore,
+ durable = IsDurable }) ->
%% If we are a non-durable queue, or we have no persistent pubs,
%% we can skip the msg_store loop.
#tx { pending_acks = AckTags, pending_messages = Pubs } = lookup_tx(Txn),
@@ -605,7 +617,7 @@ tx_commit(Txn, Fun, State = #vqstate { persistent_store = PersistentStore }) ->
PubsOrdered = lists:reverse(Pubs),
AckTags1 = lists:flatten(AckTags),
PersistentGuids = persistent_guids(PubsOrdered),
- IsTransientPubs = [] == PersistentGuids,
+ IsTransientPubs = (not IsDurable) orelse [] == PersistentGuids,
{AckTags1,
case IsTransientPubs orelse
?TRANSIENT_MSG_STORE == PersistentStore of
@@ -965,18 +977,18 @@ tx_commit_post_msg_store(IsTransientPubs, Pubs, AckTags, Fun, State =
tx_commit_index(State = #vqstate { on_sync = {_, _, []} }) ->
State;
tx_commit_index(State = #vqstate { on_sync = {SAcks, SPubs, SFuns},
- persistent_store = PersistentStore }) ->
+ durable = IsDurable }) ->
Acks = lists:flatten(SAcks),
State1 = ack(Acks, State),
- IsPersistentStore = ?PERSISTENT_MSG_STORE == PersistentStore,
Pubs = lists:flatten(lists:reverse(SPubs)),
{SeqIds, State2 = #vqstate { index_state = IndexState }} =
lists:foldl(
fun (Msg = #basic_message { is_persistent = IsPersistent },
{SeqIdsAcc, StateN}) ->
{SeqId, StateN1} =
- publish(Msg, false, IsPersistent, StateN),
- {case IsPersistentStore andalso IsPersistent of
+ publish(Msg, false, IsDurable andalso IsPersistent,
+ StateN),
+ {case IsDurable andalso IsPersistent of
true -> [SeqId | SeqIdsAcc];
false -> SeqIdsAcc
end, StateN1}
@@ -1172,9 +1184,10 @@ test_keep_msg_in_ram(SeqId, #vqstate { target_ram_msg_count = TargetRamMsgCount,
publish(Msg = #basic_message { is_persistent = IsPersistent, guid = Guid },
IsDelivered, MsgOnDisk, State =
#vqstate { next_seq_id = SeqId, len = Len, in_counter = InCount,
- persistent_count = PCount }) ->
+ persistent_count = PCount, durable = IsDurable }) ->
MsgStatus = #msg_status {
- msg = Msg, guid = Guid, seq_id = SeqId, is_persistent = IsPersistent,
+ msg = Msg, guid = Guid, seq_id = SeqId,
+ is_persistent = IsDurable andalso IsPersistent,
is_delivered = IsDelivered, msg_on_disk = MsgOnDisk,
index_on_disk = false },
PCount1 = PCount + case IsPersistent of