summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_variable_queue.erl28
1 files changed, 13 insertions, 15 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 0c9995b978..3a283bbf6e 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -431,8 +431,7 @@ publish(Msg, State) ->
publish_delivered(false, _Msg, State = #vqstate { len = 0 }) ->
{blank_ack, State};
-publish_delivered(true, Msg = #basic_message { guid = Guid,
- is_persistent = IsPersistent },
+publish_delivered(true, Msg = #basic_message { is_persistent = IsPersistent },
State = #vqstate { len = 0,
next_seq_id = SeqId,
out_counter = OutCount,
@@ -441,9 +440,8 @@ publish_delivered(true, Msg = #basic_message { guid = Guid,
pending_ack = PA,
durable = IsDurable }) ->
IsPersistent1 = IsDurable andalso IsPersistent,
- MsgStatus = #msg_status {
- msg = Msg, guid = Guid, seq_id = SeqId, is_persistent = IsPersistent1,
- is_delivered = true, msg_on_disk = false, index_on_disk = false },
+ MsgStatus = (msg_status(IsPersistent1, SeqId, Msg))
+ #msg_status { is_delivered = true },
{MsgStatus1, State1} = maybe_write_to_disk(false, false, MsgStatus, State),
PA1 = record_pending_ack(MsgStatus1, PA),
PCount1 = maybe_inc(PCount, IsPersistent1),
@@ -532,12 +530,9 @@ fetch(AckRequired, State = #vqstate { q4 = Q4,
ack(AckTags, State) ->
ack(fun (_AckEntry, State1) -> State1 end, AckTags, State).
-tx_publish(Txn,
- Msg = #basic_message { is_persistent = true, guid = Guid },
+tx_publish(Txn, Msg = #basic_message { is_persistent = true },
State = #vqstate { msg_store_clients = MSCState, durable = true }) ->
- MsgStatus = #msg_status {
- msg = Msg, guid = Guid, seq_id = undefined, is_persistent = true,
- is_delivered = false, msg_on_disk = false, index_on_disk = false },
+ MsgStatus = msg_status(true, undefined, Msg),
{#msg_status { msg_on_disk = true }, MSCState1} =
maybe_write_msg_to_disk(false, MsgStatus, MSCState),
publish_in_tx(Txn, Msg),
@@ -691,6 +686,11 @@ maybe_inc(N, false) -> N.
maybe_dec(N, true ) -> N - 1;
maybe_dec(N, false) -> N.
+msg_status(IsPersistent, SeqId, Msg = #basic_message { guid = Guid }) ->
+ #msg_status { seq_id = SeqId, guid = Guid, msg = Msg,
+ is_persistent = IsPersistent, is_delivered = false,
+ msg_on_disk = false, index_on_disk = false }.
+
record_pending_ack(#msg_status { guid = Guid, seq_id = SeqId,
is_persistent = IsPersistent,
msg_on_disk = MsgOnDisk } = MsgStatus, PA) ->
@@ -1137,7 +1137,7 @@ test_keep_msg_in_ram(SeqId, #vqstate { target_ram_msg_count = TargetRamMsgCount,
end
end.
-publish(Msg = #basic_message { is_persistent = IsPersistent, guid = Guid },
+publish(Msg = #basic_message { is_persistent = IsPersistent },
IsDelivered, MsgOnDisk,
State = #vqstate { next_seq_id = SeqId,
len = Len,
@@ -1145,10 +1145,8 @@ publish(Msg = #basic_message { is_persistent = IsPersistent, guid = Guid },
persistent_count = PCount,
durable = IsDurable }) ->
IsPersistent1 = IsDurable andalso IsPersistent,
- MsgStatus = #msg_status {
- msg = Msg, guid = Guid, seq_id = SeqId, is_persistent = IsPersistent1,
- is_delivered = IsDelivered, msg_on_disk = MsgOnDisk,
- index_on_disk = false },
+ MsgStatus = (msg_status(IsPersistent1, SeqId, Msg))
+ #msg_status { is_delivered = IsDelivered, msg_on_disk = MsgOnDisk },
PCount1 = maybe_inc(PCount, IsPersistent1),
{SeqId, publish(test_keep_msg_in_ram(SeqId, State), MsgStatus,
State #vqstate { next_seq_id = SeqId + 1,