summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2009-10-12 18:05:16 +0100
committerMatthew Sackman <matthew@lshift.net>2009-10-12 18:05:16 +0100
commitff94f3c3a6bff3d3b2e1bc0d256b9bc2951fe9ce (patch)
tree02ec52953a3fd2bd5c89a0fbf89ea078b86dcb9a
parenta1100da255e7186636085b934570319edb62083d (diff)
downloadrabbitmq-server-git-ff94f3c3a6bff3d3b2e1bc0d256b9bc2951fe9ce.tar.gz
in preparation for commit, need to be able to indicate in a publish that persistent msgs have already been sent to disk
-rw-r--r--src/rabbit_variable_queue.erl84
1 files changed, 62 insertions, 22 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index c6d01df1af..47f8fec366 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -140,9 +140,13 @@ init(QueueName) ->
},
maybe_load_next_segment(State).
-publish(Msg, IsDelivered, State = #vqstate { next_seq_id = SeqId,
- len = Len }) ->
+publish(Msg, IsDelivered, State) ->
+ publish(Msg, IsDelivered, false, State).
+
+publish(Msg, IsDelivered, PersistentMsgsAlreadyOnDisk,
+ State = #vqstate { next_seq_id = SeqId, len = Len }) ->
publish(test_keep_msg_in_ram(SeqId, State), Msg, SeqId, IsDelivered,
+ PersistentMsgsAlreadyOnDisk,
State #vqstate { next_seq_id = SeqId + 1, len = Len + 1 }).
set_queue_ram_duration_target(
@@ -299,17 +303,37 @@ requeue(MsgsWithAckTags, State) ->
ack(AckTags, State1).
tx_publish(Msg = #basic_message { is_persistent = true }, State) ->
- true = maybe_write_msg_to_disk(true, Msg),
+ true = maybe_write_msg_to_disk(true, false, Msg),
+ State;
+tx_publish(_Msg, State) ->
State.
tx_rollback(Pubs, State) ->
- ok = rabbit_msg_store:remove(
- [MsgId || Obj = #basic_message { guid = MsgId } <- Pubs,
- Obj #basic_message.is_persistent]),
+ ok = rabbit_msg_store:remove(persistent_msg_ids(Pubs)),
State.
+%% tx_commit(Pubs, AckTags, State) ->
+%% case persistent_msg_ids(Pubs) of
+%% [] ->
+%% do_tx_commit(Pubs, AckTags, State);
+%% PersistentMsgIds ->
+%% ok = rabbit_msg_store:sync(
+%% PersistentMsgIds,
+%% fun () -> ok = rabbit_amqqueue:tx_commit_callback(
+%% self(), Pubs, AckTags)
+%% end),
+%% State
+%% end.
+
+%% do_tx_commit(Pubs, AckTags, State) ->
+%% lists:foldl(fun (Msg, StateN) -> publish(Msg, false, StateN) end, State, Pubs).
+
%%----------------------------------------------------------------------------
+persistent_msg_ids(Pubs) ->
+ [MsgId || Obj = #basic_message { guid = MsgId } <- Pubs,
+ Obj #basic_message.is_persistent].
+
delete1(NextSeqId, Count, GammaSeqId, IndexState)
when GammaSeqId >= NextSeqId ->
{Count, IndexState};
@@ -385,9 +409,11 @@ entry_salient_details(#beta { msg_id = MsgId, seq_id = SeqId,
publish(msg, Msg = #basic_message { guid = MsgId,
is_persistent = IsPersistent },
- SeqId, IsDelivered, State = #vqstate { index_state = IndexState,
- ram_msg_count = RamMsgCount }) ->
- MsgOnDisk = maybe_write_msg_to_disk(false, Msg),
+ SeqId, IsDelivered, PersistentMsgsAlreadyOnDisk,
+ State = #vqstate { index_state = IndexState,
+ ram_msg_count = RamMsgCount }) ->
+ MsgOnDisk =
+ maybe_write_msg_to_disk(false, PersistentMsgsAlreadyOnDisk, Msg),
{IndexOnDisk, IndexState1} =
maybe_write_index_to_disk(false, IsPersistent, MsgId, SeqId,
IsDelivered, IndexState),
@@ -399,9 +425,9 @@ publish(msg, Msg = #basic_message { guid = MsgId,
publish(index, Msg = #basic_message { guid = MsgId,
is_persistent = IsPersistent },
- SeqId, IsDelivered, State = #vqstate { index_state = IndexState,
- q1 = Q1 }) ->
- true = maybe_write_msg_to_disk(true, Msg),
+ SeqId, IsDelivered, PersistentMsgsAlreadyOnDisk,
+ State = #vqstate { index_state = IndexState, q1 = Q1 }) ->
+ true = maybe_write_msg_to_disk(true, PersistentMsgsAlreadyOnDisk, Msg),
{IndexOnDisk, IndexState1} =
maybe_write_index_to_disk(false, IsPersistent, MsgId, SeqId,
IsDelivered, IndexState),
@@ -413,10 +439,10 @@ publish(index, Msg = #basic_message { guid = MsgId,
publish(neither, Msg = #basic_message { guid = MsgId,
is_persistent = IsPersistent },
- SeqId, IsDelivered,
+ SeqId, IsDelivered, PersistentMsgsAlreadyOnDisk,
State = #vqstate { index_state = IndexState, q1 = Q1, q2 = Q2,
gamma = Gamma }) ->
- true = maybe_write_msg_to_disk(true, Msg),
+ true = maybe_write_msg_to_disk(true, PersistentMsgsAlreadyOnDisk, Msg),
{true, IndexState1} =
maybe_write_index_to_disk(true, IsPersistent, MsgId, SeqId,
IsDelivered, IndexState),
@@ -550,12 +576,29 @@ reduce_memory_use(State =
_ -> State1
end.
-maybe_write_msg_to_disk(Bool, Msg = #basic_message {
- guid = MsgId, is_persistent = IsPersistent })
- when Bool orelse IsPersistent ->
+%% Bool PersistentMsgsAlreadyOnDisk IsPersistent | WriteToDisk?
+%% -----------------------------------------------+-------------
+%% false false false | false 1
+%% false false true | true 2
+%% false true false | false 3
+%% false true true | false 4
+%% true false false | true 5
+%% true false true | true 6
+%% true true false | true 7
+%% true true true | false 8
+
+%% (Bool and (not PersistentMsgsAlreadyOnDisk)) or | 5 6
+%% (Bool and (not IsPersistent)) or | 5 7
+%% ((not PersistentMsgsAlreadyOnDisk) and IsPersistent) | 2 6
+maybe_write_msg_to_disk(Bool, PersistentMsgsAlreadyOnDisk,
+ Msg = #basic_message { guid = MsgId,
+ is_persistent = IsPersistent })
+ when (Bool andalso (not PersistentMsgsAlreadyOnDisk)) orelse
+ (Bool andalso (not IsPersistent)) orelse
+ ((not PersistentMsgsAlreadyOnDisk) andalso (IsPersistent)) ->
ok = rabbit_msg_store:write(MsgId, ensure_binary_properties(Msg)),
true;
-maybe_write_msg_to_disk(_Bool, _Msg) ->
+maybe_write_msg_to_disk(_Bool, _PersistentMsgsAlreadyOnDisk, _Msg) ->
false.
maybe_write_index_to_disk(Bool, IsPersistent, MsgId, SeqId, IsDelivered,
@@ -656,10 +699,7 @@ maybe_push_alphas_to_betas(Generator, Consumer, Q, State =
seq_id = SeqId, is_delivered = IsDelivered,
msg_on_disk = MsgOnDisk, index_on_disk = IndexOnDisk }},
Qa} ->
- true = case MsgOnDisk of
- true -> true;
- false -> maybe_write_msg_to_disk(true, Msg)
- end,
+ true = maybe_write_msg_to_disk(true, MsgOnDisk, Msg),
Beta = #beta { msg_id = MsgId, seq_id = SeqId,
is_persistent = IsPersistent,
is_delivered = IsDelivered,