diff options
| -rw-r--r-- | src/rabbit_queue_index.erl | 52 |
1 files changed, 20 insertions, 32 deletions
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index 099c181e03..981d8e74ff 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -306,24 +306,9 @@ delete_and_terminate(State) -> State1. pre_publish(MsgOrId, SeqId, MsgProps, IsPersistent, IsDelivered, JournalSizeHint, - State = #qistate{unconfirmed = UC, - unconfirmed_msg = UCM, - pre_publish_cache = PPC, + State = #qistate{pre_publish_cache = PPC, delivered_cache = DC}) -> - MsgId = case MsgOrId of - #basic_message{id = Id} -> Id; - Id when is_binary(Id) -> Id - end, - ?MSG_ID_BYTES = size(MsgId), - - State1 = - case {MsgProps#message_properties.needs_confirming, MsgOrId} of - {true, MsgId} -> UC1 = gb_sets:add_element(MsgId, UC), - State#qistate{unconfirmed = UC1}; - {true, _} -> UCM1 = gb_sets:add_element(MsgId, UCM), - State#qistate{unconfirmed_msg = UCM1}; - {false, _} -> State - end, + State1 = maybe_needs_confirming(MsgProps, MsgOrId, State), {Bin, MsgBin} = create_pub_record_body(MsgOrId, MsgProps), @@ -377,23 +362,10 @@ flush_delivered_cache(State = #qistate{delivered_cache = DC}) -> State1 = deliver(lists:reverse(DC), State), State1#qistate{delivered_cache = []}. -publish(MsgOrId, SeqId, MsgProps, IsPersistent, JournalSizeHint, - State = #qistate{unconfirmed = UC, - unconfirmed_msg = UCM}) -> - MsgId = case MsgOrId of - #basic_message{id = Id} -> Id; - Id when is_binary(Id) -> Id - end, - ?MSG_ID_BYTES = size(MsgId), +publish(MsgOrId, SeqId, MsgProps, IsPersistent, JournalSizeHint, State) -> {JournalHdl, State1} = get_journal_handle( - case {MsgProps#message_properties.needs_confirming, MsgOrId} of - {true, MsgId} -> UC1 = gb_sets:add_element(MsgId, UC), - State#qistate{unconfirmed = UC1}; - {true, _} -> UCM1 = gb_sets:add_element(MsgId, UCM), - State#qistate{unconfirmed_msg = UCM1}; - {false, _} -> State - end), + maybe_needs_confirming(MsgProps, MsgOrId, State)), file_handle_cache_stats:update(queue_index_journal_write), {Bin, MsgBin} = create_pub_record_body(MsgOrId, MsgProps), ok = file_handle_cache:append( @@ -407,6 +379,22 @@ publish(MsgOrId, SeqId, MsgProps, IsPersistent, JournalSizeHint, JournalSizeHint, add_to_journal(SeqId, {IsPersistent, Bin, MsgBin}, State1)). +maybe_needs_confirming(MsgProps, MsgOrId, + State = #qistate{unconfirmed = UC, + unconfirmed_msg = UCM}) -> + MsgId = case MsgOrId of + #basic_message{id = Id} -> Id; + Id when is_binary(Id) -> Id + end, + ?MSG_ID_BYTES = size(MsgId), + case {MsgProps#message_properties.needs_confirming, MsgOrId} of + {true, MsgId} -> UC1 = gb_sets:add_element(MsgId, UC), + State#qistate{unconfirmed = UC1}; + {true, _} -> UCM1 = gb_sets:add_element(MsgId, UCM), + State#qistate{unconfirmed_msg = UCM1}; + {false, _} -> State + end. + deliver(SeqIds, State) -> deliver_or_ack(del, SeqIds, State). |
