summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2014-12-03 16:44:13 +0000
committerSimon MacMullen <simon@rabbitmq.com>2014-12-03 16:44:13 +0000
commit6c8b49e5ee88710f5180c6149c7182376f112562 (patch)
treed250b8ff1368a80fa5fa67cd07c904abed8468e1 /src
parent13138e9a7c9a8ef29ce1ed092d3cd5289684a3af (diff)
downloadrabbitmq-server-git-6c8b49e5ee88710f5180c6149c7182376f112562.tar.gz
Rename msg_on_disk to msg_in_store and fix up its semantics. We now don't need to guard msg_store read and remove as we should just naturally not need to do them.
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_variable_queue.erl51
1 files changed, 24 insertions, 27 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 41c556522e..21c955db32 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -285,7 +285,7 @@
msg,
is_persistent,
is_delivered,
- msg_on_disk,
+ msg_in_store,
index_on_disk,
msg_props
}).
@@ -664,7 +664,7 @@ ack([], State) ->
ack([SeqId], State) ->
{#msg_status { msg_id = MsgId,
is_persistent = IsPersistent,
- msg_on_disk = MsgOnDisk,
+ msg_in_store = MsgInStore,
index_on_disk = IndexOnDisk },
State1 = #vqstate { index_state = IndexState,
msg_store_clients = MSCState,
@@ -674,7 +674,7 @@ ack([SeqId], State) ->
true -> rabbit_queue_index:ack([SeqId], IndexState);
false -> IndexState
end,
- case MsgOnDisk of
+ case MsgInStore of
true -> ok = msg_store_remove(MSCState, IsPersistent, [MsgId]);
false -> ok
end,
@@ -935,12 +935,10 @@ d(Delta = #delta { start_seq_id = Start, count = Count, end_seq_id = End })
m(MsgStatus = #msg_status { msg = Msg,
is_persistent = IsPersistent,
- msg_on_disk = MsgOnDisk,
+ msg_in_store = MsgInStore,
index_on_disk = IndexOnDisk }) ->
true = (not IsPersistent) or IndexOnDisk,
- true = (not IndexOnDisk) or MsgOnDisk,
- true = (Msg =/= undefined) or MsgOnDisk,
-
+ true = (Msg =/= undefined) or MsgInStore,
MsgStatus.
one_if(true ) -> 1;
@@ -959,27 +957,28 @@ msg_status(IsPersistent, IsDelivered, SeqId,
msg = Msg,
is_persistent = IsPersistent,
is_delivered = IsDelivered,
- msg_on_disk = false,
+ msg_in_store = false,
index_on_disk = false,
msg_props = MsgProps}.
beta_msg_status({Msg = #basic_message{id = MsgId},
SeqId, MsgProps, IsPersistent, IsDelivered}) ->
MS0 = beta_msg_status0(SeqId, MsgProps, IsPersistent, IsDelivered),
- MS0#msg_status{msg_id = MsgId,
- msg = Msg};
+ MS0#msg_status{msg_id = MsgId,
+ msg = Msg,
+ msg_in_store = false};
beta_msg_status({MsgId, SeqId, MsgProps, IsPersistent, IsDelivered}) ->
MS0 = beta_msg_status0(SeqId, MsgProps, IsPersistent, IsDelivered),
- MS0#msg_status{msg_id = MsgId,
- msg = undefined}.
+ MS0#msg_status{msg_id = MsgId,
+ msg = undefined,
+ msg_in_store = true}.
beta_msg_status0(SeqId, MsgProps, IsPersistent, IsDelivered) ->
#msg_status{seq_id = SeqId,
msg = undefined,
is_persistent = IsPersistent,
is_delivered = IsDelivered,
- msg_on_disk = true,
index_on_disk = true,
msg_props = MsgProps}.
@@ -1012,21 +1011,21 @@ msg_store_write(MSCState, IsPersistent, MsgId, Msg) ->
with_immutable_msg_store_state(
MSCState, IsPersistent,
fun (MSCState1) ->
- ok %% rabbit_msg_store:write_flow(MsgId, Msg, MSCState1)
+ rabbit_msg_store:write_flow(MsgId, Msg, MSCState1)
end).
msg_store_read(MSCState, IsPersistent, MsgId) ->
with_msg_store_state(
MSCState, IsPersistent,
fun (MSCState1) ->
- exit(nah) %% rabbit_msg_store:read(MsgId, MSCState1)
+ rabbit_msg_store:read(MsgId, MSCState1)
end).
msg_store_remove(MSCState, IsPersistent, MsgIds) ->
with_immutable_msg_store_state(
MSCState, IsPersistent,
fun (MCSState1) ->
- ok %% rabbit_msg_store:remove(MsgIds, MCSState1)
+ rabbit_msg_store:remove(MsgIds, MCSState1)
end).
msg_store_close_fds(MSCState, IsPersistent) ->
@@ -1214,7 +1213,7 @@ remove(AckRequired, MsgStatus = #msg_status {
msg = Msg,
is_persistent = IsPersistent,
is_delivered = IsDelivered,
- msg_on_disk = MsgOnDisk,
+ msg_in_store = MsgInStore,
index_on_disk = IndexOnDisk },
State = #vqstate {ram_msg_count = RamMsgCount,
out_counter = OutCount,
@@ -1232,7 +1231,7 @@ remove(AckRequired, MsgStatus = #msg_status {
ok = msg_store_remove(MSCState, IsPersistent, [MsgId])
end,
Ack = fun () -> rabbit_queue_index:ack([SeqId], IndexState1) end,
- IndexState2 = case {AckRequired, MsgOnDisk, IndexOnDisk} of
+ IndexState2 = case {AckRequired, MsgInStore, IndexOnDisk} of
{false, true, false} -> Rem(), IndexState1;
{false, true, true} -> Rem(), Ack();
_ -> IndexState1
@@ -1294,11 +1293,11 @@ remove_queue_entries(Q, {RamBytes, PCount, PBytes},
remove_queue_entries1(
#msg_status { msg_id = MsgId, seq_id = SeqId, msg = Msg,
- is_delivered = IsDelivered, msg_on_disk = MsgOnDisk,
+ is_delivered = IsDelivered, msg_in_store = MsgInStore,
index_on_disk = IndexOnDisk, is_persistent = IsPersistent,
msg_props = #message_properties { size = Size } },
{MsgIdsByStore, RamBytes, PBytes, Delivers, Acks}) ->
- {case MsgOnDisk of
+ {case MsgInStore of
true -> rabbit_misc:orddict_cons(IsPersistent, MsgId, MsgIdsByStore);
false -> MsgIdsByStore
end,
@@ -1312,7 +1311,7 @@ remove_queue_entries1(
%%----------------------------------------------------------------------------
maybe_write_msg_to_disk(_Force, MsgStatus = #msg_status {
- msg_on_disk = true }, _MSCState) ->
+ msg_in_store = true }, _MSCState) ->
MsgStatus;
maybe_write_msg_to_disk(Force, MsgStatus = #msg_status {
msg = Msg, msg_id = MsgId,
@@ -1323,13 +1322,12 @@ maybe_write_msg_to_disk(Force, MsgStatus = #msg_status {
%% content = rabbit_binary_parser:clear_decoded_content(
%% Msg #basic_message.content)},
%% ok = msg_store_write(MSCState, IsPersistent, MsgId, Msg1),
- MsgStatus #msg_status { msg_on_disk = true };
+ MsgStatus; %% #msg_status { msg_on_disk = true };
maybe_write_msg_to_disk(_Force, MsgStatus, _MSCState) ->
MsgStatus.
maybe_write_index_to_disk(_Force, MsgStatus = #msg_status {
index_on_disk = true }, IndexState) ->
- true = MsgStatus #msg_status.msg_on_disk, %% ASSERTION
{MsgStatus, IndexState};
maybe_write_index_to_disk(Force, MsgStatus = #msg_status {
msg = Msg,
@@ -1339,7 +1337,6 @@ maybe_write_index_to_disk(Force, MsgStatus = #msg_status {
is_delivered = IsDelivered,
msg_props = MsgProps}, IndexState)
when Force orelse IsPersistent ->
- true = MsgStatus #msg_status.msg_on_disk, %% ASSERTION
Msg1 = Msg #basic_message {
%% don't persist any recoverable decoded properties
content = rabbit_binary_parser:clear_decoded_content(
@@ -1431,11 +1428,11 @@ accumulate_ack_init() -> {[], orddict:new(), []}.
accumulate_ack(#msg_status { seq_id = SeqId,
msg_id = MsgId,
is_persistent = IsPersistent,
- msg_on_disk = MsgOnDisk,
+ msg_in_store = MsgInStore,
index_on_disk = IndexOnDisk },
{IndexOnDiskSeqIdsAcc, MsgIdsByStore, AllMsgIds}) ->
{cons_if(IndexOnDisk, SeqId, IndexOnDiskSeqIdsAcc),
- case MsgOnDisk of
+ case MsgInStore of
true -> rabbit_misc:orddict_cons(IsPersistent, MsgId, MsgIdsByStore);
false -> MsgIdsByStore
end,
@@ -1835,7 +1832,7 @@ push_alphas_to_betas(Generator, Consumer, Quota, Q, State) ->
{empty, _Q} ->
{Quota, State};
{{value, MsgStatus}, Qa} ->
- {MsgStatus1 = #msg_status { msg_on_disk = true },
+ {MsgStatus1,
State1 = #vqstate { ram_msg_count = RamMsgCount }} =
maybe_write_to_disk(true, false, MsgStatus, State),
MsgStatus2 = m(trim_msg_status(MsgStatus1)),