diff options
| author | Simon MacMullen <simon@rabbitmq.com> | 2014-12-03 16:44:13 +0000 |
|---|---|---|
| committer | Simon MacMullen <simon@rabbitmq.com> | 2014-12-03 16:44:13 +0000 |
| commit | 6c8b49e5ee88710f5180c6149c7182376f112562 (patch) | |
| tree | d250b8ff1368a80fa5fa67cd07c904abed8468e1 /src | |
| parent | 13138e9a7c9a8ef29ce1ed092d3cd5289684a3af (diff) | |
| download | rabbitmq-server-git-6c8b49e5ee88710f5180c6149c7182376f112562.tar.gz | |
Rename msg_on_disk to msg_in_store and fix up its semantics. We now don't need to guard msg_store read and remove as we should just naturally not need to do them.
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_variable_queue.erl | 51 |
1 files changed, 24 insertions, 27 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 41c556522e..21c955db32 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -285,7 +285,7 @@ msg, is_persistent, is_delivered, - msg_on_disk, + msg_in_store, index_on_disk, msg_props }). @@ -664,7 +664,7 @@ ack([], State) -> ack([SeqId], State) -> {#msg_status { msg_id = MsgId, is_persistent = IsPersistent, - msg_on_disk = MsgOnDisk, + msg_in_store = MsgInStore, index_on_disk = IndexOnDisk }, State1 = #vqstate { index_state = IndexState, msg_store_clients = MSCState, @@ -674,7 +674,7 @@ ack([SeqId], State) -> true -> rabbit_queue_index:ack([SeqId], IndexState); false -> IndexState end, - case MsgOnDisk of + case MsgInStore of true -> ok = msg_store_remove(MSCState, IsPersistent, [MsgId]); false -> ok end, @@ -935,12 +935,10 @@ d(Delta = #delta { start_seq_id = Start, count = Count, end_seq_id = End }) m(MsgStatus = #msg_status { msg = Msg, is_persistent = IsPersistent, - msg_on_disk = MsgOnDisk, + msg_in_store = MsgInStore, index_on_disk = IndexOnDisk }) -> true = (not IsPersistent) or IndexOnDisk, - true = (not IndexOnDisk) or MsgOnDisk, - true = (Msg =/= undefined) or MsgOnDisk, - + true = (Msg =/= undefined) or MsgInStore, MsgStatus. one_if(true ) -> 1; @@ -959,27 +957,28 @@ msg_status(IsPersistent, IsDelivered, SeqId, msg = Msg, is_persistent = IsPersistent, is_delivered = IsDelivered, - msg_on_disk = false, + msg_in_store = false, index_on_disk = false, msg_props = MsgProps}. beta_msg_status({Msg = #basic_message{id = MsgId}, SeqId, MsgProps, IsPersistent, IsDelivered}) -> MS0 = beta_msg_status0(SeqId, MsgProps, IsPersistent, IsDelivered), - MS0#msg_status{msg_id = MsgId, - msg = Msg}; + MS0#msg_status{msg_id = MsgId, + msg = Msg, + msg_in_store = false}; beta_msg_status({MsgId, SeqId, MsgProps, IsPersistent, IsDelivered}) -> MS0 = beta_msg_status0(SeqId, MsgProps, IsPersistent, IsDelivered), - MS0#msg_status{msg_id = MsgId, - msg = undefined}. + MS0#msg_status{msg_id = MsgId, + msg = undefined, + msg_in_store = true}. beta_msg_status0(SeqId, MsgProps, IsPersistent, IsDelivered) -> #msg_status{seq_id = SeqId, msg = undefined, is_persistent = IsPersistent, is_delivered = IsDelivered, - msg_on_disk = true, index_on_disk = true, msg_props = MsgProps}. @@ -1012,21 +1011,21 @@ msg_store_write(MSCState, IsPersistent, MsgId, Msg) -> with_immutable_msg_store_state( MSCState, IsPersistent, fun (MSCState1) -> - ok %% rabbit_msg_store:write_flow(MsgId, Msg, MSCState1) + rabbit_msg_store:write_flow(MsgId, Msg, MSCState1) end). msg_store_read(MSCState, IsPersistent, MsgId) -> with_msg_store_state( MSCState, IsPersistent, fun (MSCState1) -> - exit(nah) %% rabbit_msg_store:read(MsgId, MSCState1) + rabbit_msg_store:read(MsgId, MSCState1) end). msg_store_remove(MSCState, IsPersistent, MsgIds) -> with_immutable_msg_store_state( MSCState, IsPersistent, fun (MCSState1) -> - ok %% rabbit_msg_store:remove(MsgIds, MCSState1) + rabbit_msg_store:remove(MsgIds, MCSState1) end). msg_store_close_fds(MSCState, IsPersistent) -> @@ -1214,7 +1213,7 @@ remove(AckRequired, MsgStatus = #msg_status { msg = Msg, is_persistent = IsPersistent, is_delivered = IsDelivered, - msg_on_disk = MsgOnDisk, + msg_in_store = MsgInStore, index_on_disk = IndexOnDisk }, State = #vqstate {ram_msg_count = RamMsgCount, out_counter = OutCount, @@ -1232,7 +1231,7 @@ remove(AckRequired, MsgStatus = #msg_status { ok = msg_store_remove(MSCState, IsPersistent, [MsgId]) end, Ack = fun () -> rabbit_queue_index:ack([SeqId], IndexState1) end, - IndexState2 = case {AckRequired, MsgOnDisk, IndexOnDisk} of + IndexState2 = case {AckRequired, MsgInStore, IndexOnDisk} of {false, true, false} -> Rem(), IndexState1; {false, true, true} -> Rem(), Ack(); _ -> IndexState1 @@ -1294,11 +1293,11 @@ remove_queue_entries(Q, {RamBytes, PCount, PBytes}, remove_queue_entries1( #msg_status { msg_id = MsgId, seq_id = SeqId, msg = Msg, - is_delivered = IsDelivered, msg_on_disk = MsgOnDisk, + is_delivered = IsDelivered, msg_in_store = MsgInStore, index_on_disk = IndexOnDisk, is_persistent = IsPersistent, msg_props = #message_properties { size = Size } }, {MsgIdsByStore, RamBytes, PBytes, Delivers, Acks}) -> - {case MsgOnDisk of + {case MsgInStore of true -> rabbit_misc:orddict_cons(IsPersistent, MsgId, MsgIdsByStore); false -> MsgIdsByStore end, @@ -1312,7 +1311,7 @@ remove_queue_entries1( %%---------------------------------------------------------------------------- maybe_write_msg_to_disk(_Force, MsgStatus = #msg_status { - msg_on_disk = true }, _MSCState) -> + msg_in_store = true }, _MSCState) -> MsgStatus; maybe_write_msg_to_disk(Force, MsgStatus = #msg_status { msg = Msg, msg_id = MsgId, @@ -1323,13 +1322,12 @@ maybe_write_msg_to_disk(Force, MsgStatus = #msg_status { %% content = rabbit_binary_parser:clear_decoded_content( %% Msg #basic_message.content)}, %% ok = msg_store_write(MSCState, IsPersistent, MsgId, Msg1), - MsgStatus #msg_status { msg_on_disk = true }; + MsgStatus; %% #msg_status { msg_on_disk = true }; maybe_write_msg_to_disk(_Force, MsgStatus, _MSCState) -> MsgStatus. maybe_write_index_to_disk(_Force, MsgStatus = #msg_status { index_on_disk = true }, IndexState) -> - true = MsgStatus #msg_status.msg_on_disk, %% ASSERTION {MsgStatus, IndexState}; maybe_write_index_to_disk(Force, MsgStatus = #msg_status { msg = Msg, @@ -1339,7 +1337,6 @@ maybe_write_index_to_disk(Force, MsgStatus = #msg_status { is_delivered = IsDelivered, msg_props = MsgProps}, IndexState) when Force orelse IsPersistent -> - true = MsgStatus #msg_status.msg_on_disk, %% ASSERTION Msg1 = Msg #basic_message { %% don't persist any recoverable decoded properties content = rabbit_binary_parser:clear_decoded_content( @@ -1431,11 +1428,11 @@ accumulate_ack_init() -> {[], orddict:new(), []}. accumulate_ack(#msg_status { seq_id = SeqId, msg_id = MsgId, is_persistent = IsPersistent, - msg_on_disk = MsgOnDisk, + msg_in_store = MsgInStore, index_on_disk = IndexOnDisk }, {IndexOnDiskSeqIdsAcc, MsgIdsByStore, AllMsgIds}) -> {cons_if(IndexOnDisk, SeqId, IndexOnDiskSeqIdsAcc), - case MsgOnDisk of + case MsgInStore of true -> rabbit_misc:orddict_cons(IsPersistent, MsgId, MsgIdsByStore); false -> MsgIdsByStore end, @@ -1835,7 +1832,7 @@ push_alphas_to_betas(Generator, Consumer, Quota, Q, State) -> {empty, _Q} -> {Quota, State}; {{value, MsgStatus}, Qa} -> - {MsgStatus1 = #msg_status { msg_on_disk = true }, + {MsgStatus1, State1 = #vqstate { ram_msg_count = RamMsgCount }} = maybe_write_to_disk(true, false, MsgStatus, State), MsgStatus2 = m(trim_msg_status(MsgStatus1)), |
