summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorLoïc Hoguin <essen@ninenines.eu>2016-01-20 14:14:52 +0100
committerLoïc Hoguin <essen@ninenines.eu>2016-01-20 14:15:16 +0100
commit7a0b50ce24afa79659ae90bb0883da9ae5d1d135 (patch)
tree02f01afff10b1f6e7ee369b160a9e089068fe4af /src
parent6f6825a07440f7272f37bbd47dd445de631c9787 (diff)
downloadrabbitmq-server-git-7a0b50ce24afa79659ae90bb0883da9ae5d1d135.tar.gz
Remove duplicate code in pre_publish and publish functions
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_queue_index.erl52
1 files changed, 20 insertions, 32 deletions
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index 099c181e03..981d8e74ff 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -306,24 +306,9 @@ delete_and_terminate(State) ->
State1.
pre_publish(MsgOrId, SeqId, MsgProps, IsPersistent, IsDelivered, JournalSizeHint,
- State = #qistate{unconfirmed = UC,
- unconfirmed_msg = UCM,
- pre_publish_cache = PPC,
+ State = #qistate{pre_publish_cache = PPC,
delivered_cache = DC}) ->
- MsgId = case MsgOrId of
- #basic_message{id = Id} -> Id;
- Id when is_binary(Id) -> Id
- end,
- ?MSG_ID_BYTES = size(MsgId),
-
- State1 =
- case {MsgProps#message_properties.needs_confirming, MsgOrId} of
- {true, MsgId} -> UC1 = gb_sets:add_element(MsgId, UC),
- State#qistate{unconfirmed = UC1};
- {true, _} -> UCM1 = gb_sets:add_element(MsgId, UCM),
- State#qistate{unconfirmed_msg = UCM1};
- {false, _} -> State
- end,
+ State1 = maybe_needs_confirming(MsgProps, MsgOrId, State),
{Bin, MsgBin} = create_pub_record_body(MsgOrId, MsgProps),
@@ -377,23 +362,10 @@ flush_delivered_cache(State = #qistate{delivered_cache = DC}) ->
State1 = deliver(lists:reverse(DC), State),
State1#qistate{delivered_cache = []}.
-publish(MsgOrId, SeqId, MsgProps, IsPersistent, JournalSizeHint,
- State = #qistate{unconfirmed = UC,
- unconfirmed_msg = UCM}) ->
- MsgId = case MsgOrId of
- #basic_message{id = Id} -> Id;
- Id when is_binary(Id) -> Id
- end,
- ?MSG_ID_BYTES = size(MsgId),
+publish(MsgOrId, SeqId, MsgProps, IsPersistent, JournalSizeHint, State) ->
{JournalHdl, State1} =
get_journal_handle(
- case {MsgProps#message_properties.needs_confirming, MsgOrId} of
- {true, MsgId} -> UC1 = gb_sets:add_element(MsgId, UC),
- State#qistate{unconfirmed = UC1};
- {true, _} -> UCM1 = gb_sets:add_element(MsgId, UCM),
- State#qistate{unconfirmed_msg = UCM1};
- {false, _} -> State
- end),
+ maybe_needs_confirming(MsgProps, MsgOrId, State)),
file_handle_cache_stats:update(queue_index_journal_write),
{Bin, MsgBin} = create_pub_record_body(MsgOrId, MsgProps),
ok = file_handle_cache:append(
@@ -407,6 +379,22 @@ publish(MsgOrId, SeqId, MsgProps, IsPersistent, JournalSizeHint,
JournalSizeHint,
add_to_journal(SeqId, {IsPersistent, Bin, MsgBin}, State1)).
+maybe_needs_confirming(MsgProps, MsgOrId,
+ State = #qistate{unconfirmed = UC,
+ unconfirmed_msg = UCM}) ->
+ MsgId = case MsgOrId of
+ #basic_message{id = Id} -> Id;
+ Id when is_binary(Id) -> Id
+ end,
+ ?MSG_ID_BYTES = size(MsgId),
+ case {MsgProps#message_properties.needs_confirming, MsgOrId} of
+ {true, MsgId} -> UC1 = gb_sets:add_element(MsgId, UC),
+ State#qistate{unconfirmed = UC1};
+ {true, _} -> UCM1 = gb_sets:add_element(MsgId, UCM),
+ State#qistate{unconfirmed_msg = UCM1};
+ {false, _} -> State
+ end.
+
deliver(SeqIds, State) ->
deliver_or_ack(del, SeqIds, State).