diff options
| -rw-r--r-- | src/rabbit_tests.erl | 4 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 68 |
2 files changed, 38 insertions, 34 deletions
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 00547a2674..435fdfac5d 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -41,8 +41,8 @@ -include("rabbit_framing.hrl"). -include_lib("kernel/include/file.hrl"). --define(PERSISTENT_MSG_STORE, msg_store_persistent). --define(TRANSIENT_MSG_STORE, msg_store_transient). +-define(PERSISTENT_MSG_STORE, msg_store_persistent). +-define(TRANSIENT_MSG_STORE, msg_store_transient). test_content_prop_roundtrip(Datum, Binary) -> Types = [element(1, E) || E <- Datum], diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 1395c77fe4..1ac5be7d6c 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -507,24 +507,24 @@ publish_delivered(false, _Msg, _MsgProps, State = #vqstate { len = 0 }) -> {blank_ack, a(State)}; publish_delivered(true, Msg = #basic_message { is_persistent = IsPersistent }, MsgProps, - State = #vqstate { len = 0, - next_seq_id = SeqId, - out_counter = OutCount, - in_counter = InCount, - persistent_count = PCount, - pending_ack = PA, - durable = IsDurable }) -> + State = #vqstate { len = 0, + next_seq_id = SeqId, + out_counter = OutCount, + in_counter = InCount, + persistent_count = PCount, + pending_ack = PA, + durable = IsDurable }) -> IsPersistent1 = IsDurable andalso IsPersistent, MsgStatus = (msg_status(IsPersistent1, SeqId, Msg, MsgProps)) #msg_status { is_delivered = true }, {MsgStatus1, State1} = maybe_write_to_disk(false, false, MsgStatus, State), PA1 = record_pending_ack(m(MsgStatus1), PA), PCount1 = PCount + one_if(IsPersistent1), - {SeqId, a(State1 #vqstate { next_seq_id = SeqId + 1, - out_counter = OutCount + 1, - in_counter = InCount + 1, - persistent_count = PCount1, - pending_ack = PA1 })}. + {SeqId, a(State1 #vqstate { next_seq_id = SeqId + 1, + out_counter = OutCount + 1, + in_counter = InCount + 1, + persistent_count = PCount1, + pending_ack = PA1 })}. dropwhile(Pred, State) -> {_OkOrEmpty, State1} = dropwhile1(Pred, State), @@ -575,22 +575,26 @@ read_msg(MsgStatus = #msg_status { msg = undefined, msg_store_clients = MSCState}) -> {{ok, Msg = #basic_message {}}, MSCState1} = read_from_msg_store(MSCState, IsPersistent, Guid), - {MsgStatus #msg_status { msg = Msg }, State #vqstate { ram_msg_count = RamMsgCount + 1, msg_store_clients = MSCState1 }}; read_msg(MsgStatus, State) -> {MsgStatus, State}. -internal_fetch(AckRequired, - MsgStatus = #msg_status { - msg = Msg, guid = Guid, seq_id = SeqId, - is_persistent = IsPersistent, is_delivered = IsDelivered, - msg_on_disk = MsgOnDisk, index_on_disk = IndexOnDisk }, - State = #vqstate { - ram_msg_count = RamMsgCount, out_counter = OutCount, - index_state = IndexState, len = Len, - persistent_count = PCount, pending_ack = PA }) -> +internal_fetch(AckRequired, MsgStatus = #msg_status { + seq_id = SeqId, + guid = Guid, + msg = Msg, + is_persistent = IsPersistent, + is_delivered = IsDelivered, + msg_on_disk = MsgOnDisk, + index_on_disk = IndexOnDisk }, + State = #vqstate {ram_msg_count = RamMsgCount, + out_counter = OutCount, + index_state = IndexState, + len = Len, + persistent_count = PCount, + pending_ack = PA }) -> %% 1. Mark it delivered if necessary IndexState1 = maybe_write_delivered( IndexOnDisk andalso not IsDelivered, @@ -1108,16 +1112,16 @@ maybe_write_msg_to_disk(Force, MsgStatus = #msg_status { msg = Msg, guid = Guid, is_persistent = IsPersistent }, MSCState) when Force orelse IsPersistent -> - {ok, MSCState1} = - with_msg_store_state( - MSCState, IsPersistent, - fun (MsgStore, MSCState2) -> - Msg1 = Msg #basic_message { - %% don't persist any recoverable decoded properties - content = rabbit_binary_parser:clear_decoded_content( - Msg #basic_message.content)}, - rabbit_msg_store:write(MsgStore, Guid, Msg1, MSCState2) - end), + Msg1 = Msg #basic_message { + %% don't persist any recoverable decoded properties + content = rabbit_binary_parser:clear_decoded_content( + Msg #basic_message.content)}, + {ok, MSCState1} = with_msg_store_state( + MSCState, IsPersistent, + fun (MsgStore, MSCState2) -> + rabbit_msg_store:write(MsgStore, Guid, Msg1, + MSCState2) + end), {MsgStatus #msg_status { msg_on_disk = true }, MSCState1}; maybe_write_msg_to_disk(_Force, MsgStatus, MSCState) -> {MsgStatus, MSCState}. |
