diff options
| author | Matthew Sackman <matthew@lshift.net> | 2009-10-12 18:05:16 +0100 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2009-10-12 18:05:16 +0100 |
| commit | ff94f3c3a6bff3d3b2e1bc0d256b9bc2951fe9ce (patch) | |
| tree | 02ec52953a3fd2bd5c89a0fbf89ea078b86dcb9a | |
| parent | a1100da255e7186636085b934570319edb62083d (diff) | |
| download | rabbitmq-server-git-ff94f3c3a6bff3d3b2e1bc0d256b9bc2951fe9ce.tar.gz | |
in preparation for commit, need to be able to indicate in a publish that persistent msgs have already been sent to disk
| -rw-r--r-- | src/rabbit_variable_queue.erl | 84 |
1 files changed, 62 insertions, 22 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index c6d01df1af..47f8fec366 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -140,9 +140,13 @@ init(QueueName) -> }, maybe_load_next_segment(State). -publish(Msg, IsDelivered, State = #vqstate { next_seq_id = SeqId, - len = Len }) -> +publish(Msg, IsDelivered, State) -> + publish(Msg, IsDelivered, false, State). + +publish(Msg, IsDelivered, PersistentMsgsAlreadyOnDisk, + State = #vqstate { next_seq_id = SeqId, len = Len }) -> publish(test_keep_msg_in_ram(SeqId, State), Msg, SeqId, IsDelivered, + PersistentMsgsAlreadyOnDisk, State #vqstate { next_seq_id = SeqId + 1, len = Len + 1 }). set_queue_ram_duration_target( @@ -299,17 +303,37 @@ requeue(MsgsWithAckTags, State) -> ack(AckTags, State1). tx_publish(Msg = #basic_message { is_persistent = true }, State) -> - true = maybe_write_msg_to_disk(true, Msg), + true = maybe_write_msg_to_disk(true, false, Msg), + State; +tx_publish(_Msg, State) -> State. tx_rollback(Pubs, State) -> - ok = rabbit_msg_store:remove( - [MsgId || Obj = #basic_message { guid = MsgId } <- Pubs, - Obj #basic_message.is_persistent]), + ok = rabbit_msg_store:remove(persistent_msg_ids(Pubs)), State. +%% tx_commit(Pubs, AckTags, State) -> +%% case persistent_msg_ids(Pubs) of +%% [] -> +%% do_tx_commit(Pubs, AckTags, State); +%% PersistentMsgIds -> +%% ok = rabbit_msg_store:sync( +%% PersistentMsgIds, +%% fun () -> ok = rabbit_amqqueue:tx_commit_callback( +%% self(), Pubs, AckTags) +%% end), +%% State +%% end. + +%% do_tx_commit(Pubs, AckTags, State) -> +%% lists:foldl(fun (Msg, StateN) -> publish(Msg, false, StateN) end, State, Pubs). + %%---------------------------------------------------------------------------- +persistent_msg_ids(Pubs) -> + [MsgId || Obj = #basic_message { guid = MsgId } <- Pubs, + Obj #basic_message.is_persistent]. + delete1(NextSeqId, Count, GammaSeqId, IndexState) when GammaSeqId >= NextSeqId -> {Count, IndexState}; @@ -385,9 +409,11 @@ entry_salient_details(#beta { msg_id = MsgId, seq_id = SeqId, publish(msg, Msg = #basic_message { guid = MsgId, is_persistent = IsPersistent }, - SeqId, IsDelivered, State = #vqstate { index_state = IndexState, - ram_msg_count = RamMsgCount }) -> - MsgOnDisk = maybe_write_msg_to_disk(false, Msg), + SeqId, IsDelivered, PersistentMsgsAlreadyOnDisk, + State = #vqstate { index_state = IndexState, + ram_msg_count = RamMsgCount }) -> + MsgOnDisk = + maybe_write_msg_to_disk(false, PersistentMsgsAlreadyOnDisk, Msg), {IndexOnDisk, IndexState1} = maybe_write_index_to_disk(false, IsPersistent, MsgId, SeqId, IsDelivered, IndexState), @@ -399,9 +425,9 @@ publish(msg, Msg = #basic_message { guid = MsgId, publish(index, Msg = #basic_message { guid = MsgId, is_persistent = IsPersistent }, - SeqId, IsDelivered, State = #vqstate { index_state = IndexState, - q1 = Q1 }) -> - true = maybe_write_msg_to_disk(true, Msg), + SeqId, IsDelivered, PersistentMsgsAlreadyOnDisk, + State = #vqstate { index_state = IndexState, q1 = Q1 }) -> + true = maybe_write_msg_to_disk(true, PersistentMsgsAlreadyOnDisk, Msg), {IndexOnDisk, IndexState1} = maybe_write_index_to_disk(false, IsPersistent, MsgId, SeqId, IsDelivered, IndexState), @@ -413,10 +439,10 @@ publish(index, Msg = #basic_message { guid = MsgId, publish(neither, Msg = #basic_message { guid = MsgId, is_persistent = IsPersistent }, - SeqId, IsDelivered, + SeqId, IsDelivered, PersistentMsgsAlreadyOnDisk, State = #vqstate { index_state = IndexState, q1 = Q1, q2 = Q2, gamma = Gamma }) -> - true = maybe_write_msg_to_disk(true, Msg), + true = maybe_write_msg_to_disk(true, PersistentMsgsAlreadyOnDisk, Msg), {true, IndexState1} = maybe_write_index_to_disk(true, IsPersistent, MsgId, SeqId, IsDelivered, IndexState), @@ -550,12 +576,29 @@ reduce_memory_use(State = _ -> State1 end. -maybe_write_msg_to_disk(Bool, Msg = #basic_message { - guid = MsgId, is_persistent = IsPersistent }) - when Bool orelse IsPersistent -> +%% Bool PersistentMsgsAlreadyOnDisk IsPersistent | WriteToDisk? +%% -----------------------------------------------+------------- +%% false false false | false 1 +%% false false true | true 2 +%% false true false | false 3 +%% false true true | false 4 +%% true false false | true 5 +%% true false true | true 6 +%% true true false | true 7 +%% true true true | false 8 + +%% (Bool and (not PersistentMsgsAlreadyOnDisk)) or | 5 6 +%% (Bool and (not IsPersistent)) or | 5 7 +%% ((not PersistentMsgsAlreadyOnDisk) and IsPersistent) | 2 6 +maybe_write_msg_to_disk(Bool, PersistentMsgsAlreadyOnDisk, + Msg = #basic_message { guid = MsgId, + is_persistent = IsPersistent }) + when (Bool andalso (not PersistentMsgsAlreadyOnDisk)) orelse + (Bool andalso (not IsPersistent)) orelse + ((not PersistentMsgsAlreadyOnDisk) andalso (IsPersistent)) -> ok = rabbit_msg_store:write(MsgId, ensure_binary_properties(Msg)), true; -maybe_write_msg_to_disk(_Bool, _Msg) -> +maybe_write_msg_to_disk(_Bool, _PersistentMsgsAlreadyOnDisk, _Msg) -> false. maybe_write_index_to_disk(Bool, IsPersistent, MsgId, SeqId, IsDelivered, @@ -656,10 +699,7 @@ maybe_push_alphas_to_betas(Generator, Consumer, Q, State = seq_id = SeqId, is_delivered = IsDelivered, msg_on_disk = MsgOnDisk, index_on_disk = IndexOnDisk }}, Qa} -> - true = case MsgOnDisk of - true -> true; - false -> maybe_write_msg_to_disk(true, Msg) - end, + true = maybe_write_msg_to_disk(true, MsgOnDisk, Msg), Beta = #beta { msg_id = MsgId, seq_id = SeqId, is_persistent = IsPersistent, is_delivered = IsDelivered, |
