diff options
| author | Michael Klishin <michael@novemberain.com> | 2016-01-20 17:34:33 +0300 |
|---|---|---|
| committer | Michael Klishin <michael@novemberain.com> | 2016-01-20 17:34:33 +0300 |
| commit | 8021fef20327c90faa35ceebd23bab1ddc3c9708 (patch) | |
| tree | 78136018628012ef6248e38d016d793aeaa88b29 /src | |
| parent | afd30fb0a1739ab8d6c027416ce9ea146a02eb2c (diff) | |
| parent | 7a0b50ce24afa79659ae90bb0883da9ae5d1d135 (diff) | |
| download | rabbitmq-server-git-8021fef20327c90faa35ceebd23bab1ddc3c9708.tar.gz | |
Merge pull request #566 from rabbitmq/rabbitmq-server-319
Remove duplicate code in pre_publish and publish functions
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_queue_index.erl | 52 |
1 files changed, 20 insertions, 32 deletions
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index 099c181e03..981d8e74ff 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -306,24 +306,9 @@ delete_and_terminate(State) -> State1. pre_publish(MsgOrId, SeqId, MsgProps, IsPersistent, IsDelivered, JournalSizeHint, - State = #qistate{unconfirmed = UC, - unconfirmed_msg = UCM, - pre_publish_cache = PPC, + State = #qistate{pre_publish_cache = PPC, delivered_cache = DC}) -> - MsgId = case MsgOrId of - #basic_message{id = Id} -> Id; - Id when is_binary(Id) -> Id - end, - ?MSG_ID_BYTES = size(MsgId), - - State1 = - case {MsgProps#message_properties.needs_confirming, MsgOrId} of - {true, MsgId} -> UC1 = gb_sets:add_element(MsgId, UC), - State#qistate{unconfirmed = UC1}; - {true, _} -> UCM1 = gb_sets:add_element(MsgId, UCM), - State#qistate{unconfirmed_msg = UCM1}; - {false, _} -> State - end, + State1 = maybe_needs_confirming(MsgProps, MsgOrId, State), {Bin, MsgBin} = create_pub_record_body(MsgOrId, MsgProps), @@ -377,23 +362,10 @@ flush_delivered_cache(State = #qistate{delivered_cache = DC}) -> State1 = deliver(lists:reverse(DC), State), State1#qistate{delivered_cache = []}. -publish(MsgOrId, SeqId, MsgProps, IsPersistent, JournalSizeHint, - State = #qistate{unconfirmed = UC, - unconfirmed_msg = UCM}) -> - MsgId = case MsgOrId of - #basic_message{id = Id} -> Id; - Id when is_binary(Id) -> Id - end, - ?MSG_ID_BYTES = size(MsgId), +publish(MsgOrId, SeqId, MsgProps, IsPersistent, JournalSizeHint, State) -> {JournalHdl, State1} = get_journal_handle( - case {MsgProps#message_properties.needs_confirming, MsgOrId} of - {true, MsgId} -> UC1 = gb_sets:add_element(MsgId, UC), - State#qistate{unconfirmed = UC1}; - {true, _} -> UCM1 = gb_sets:add_element(MsgId, UCM), - State#qistate{unconfirmed_msg = UCM1}; - {false, _} -> State - end), + maybe_needs_confirming(MsgProps, MsgOrId, State)), file_handle_cache_stats:update(queue_index_journal_write), {Bin, MsgBin} = create_pub_record_body(MsgOrId, MsgProps), ok = file_handle_cache:append( @@ -407,6 +379,22 @@ publish(MsgOrId, SeqId, MsgProps, IsPersistent, JournalSizeHint, JournalSizeHint, add_to_journal(SeqId, {IsPersistent, Bin, MsgBin}, State1)). +maybe_needs_confirming(MsgProps, MsgOrId, + State = #qistate{unconfirmed = UC, + unconfirmed_msg = UCM}) -> + MsgId = case MsgOrId of + #basic_message{id = Id} -> Id; + Id when is_binary(Id) -> Id + end, + ?MSG_ID_BYTES = size(MsgId), + case {MsgProps#message_properties.needs_confirming, MsgOrId} of + {true, MsgId} -> UC1 = gb_sets:add_element(MsgId, UC), + State#qistate{unconfirmed = UC1}; + {true, _} -> UCM1 = gb_sets:add_element(MsgId, UCM), + State#qistate{unconfirmed_msg = UCM1}; + {false, _} -> State + end. + deliver(SeqIds, State) -> deliver_or_ack(del, SeqIds, State). |
