summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMichael Klishin <michael@clojurewerkz.org>2016-01-20 17:35:52 +0300
committerMichael Klishin <michael@clojurewerkz.org>2016-01-20 17:35:52 +0300
commit1f9c3492a793f0dfba6be7b84727bf850f9ba1e4 (patch)
tree6e2bb11803645829a3cbad556d38eca74a34003e /src
parent3d1f30e4824f7ba50576617ab3e966509d6d46c4 (diff)
parent8021fef20327c90faa35ceebd23bab1ddc3c9708 (diff)
downloadrabbitmq-server-git-1f9c3492a793f0dfba6be7b84727bf850f9ba1e4.tar.gz
Merge branch 'stable'
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_queue_index.erl52
1 files changed, 20 insertions, 32 deletions
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index 099c181e03..981d8e74ff 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -306,24 +306,9 @@ delete_and_terminate(State) ->
State1.
pre_publish(MsgOrId, SeqId, MsgProps, IsPersistent, IsDelivered, JournalSizeHint,
- State = #qistate{unconfirmed = UC,
- unconfirmed_msg = UCM,
- pre_publish_cache = PPC,
+ State = #qistate{pre_publish_cache = PPC,
delivered_cache = DC}) ->
- MsgId = case MsgOrId of
- #basic_message{id = Id} -> Id;
- Id when is_binary(Id) -> Id
- end,
- ?MSG_ID_BYTES = size(MsgId),
-
- State1 =
- case {MsgProps#message_properties.needs_confirming, MsgOrId} of
- {true, MsgId} -> UC1 = gb_sets:add_element(MsgId, UC),
- State#qistate{unconfirmed = UC1};
- {true, _} -> UCM1 = gb_sets:add_element(MsgId, UCM),
- State#qistate{unconfirmed_msg = UCM1};
- {false, _} -> State
- end,
+ State1 = maybe_needs_confirming(MsgProps, MsgOrId, State),
{Bin, MsgBin} = create_pub_record_body(MsgOrId, MsgProps),
@@ -377,23 +362,10 @@ flush_delivered_cache(State = #qistate{delivered_cache = DC}) ->
State1 = deliver(lists:reverse(DC), State),
State1#qistate{delivered_cache = []}.
-publish(MsgOrId, SeqId, MsgProps, IsPersistent, JournalSizeHint,
- State = #qistate{unconfirmed = UC,
- unconfirmed_msg = UCM}) ->
- MsgId = case MsgOrId of
- #basic_message{id = Id} -> Id;
- Id when is_binary(Id) -> Id
- end,
- ?MSG_ID_BYTES = size(MsgId),
+publish(MsgOrId, SeqId, MsgProps, IsPersistent, JournalSizeHint, State) ->
{JournalHdl, State1} =
get_journal_handle(
- case {MsgProps#message_properties.needs_confirming, MsgOrId} of
- {true, MsgId} -> UC1 = gb_sets:add_element(MsgId, UC),
- State#qistate{unconfirmed = UC1};
- {true, _} -> UCM1 = gb_sets:add_element(MsgId, UCM),
- State#qistate{unconfirmed_msg = UCM1};
- {false, _} -> State
- end),
+ maybe_needs_confirming(MsgProps, MsgOrId, State)),
file_handle_cache_stats:update(queue_index_journal_write),
{Bin, MsgBin} = create_pub_record_body(MsgOrId, MsgProps),
ok = file_handle_cache:append(
@@ -407,6 +379,22 @@ publish(MsgOrId, SeqId, MsgProps, IsPersistent, JournalSizeHint,
JournalSizeHint,
add_to_journal(SeqId, {IsPersistent, Bin, MsgBin}, State1)).
+maybe_needs_confirming(MsgProps, MsgOrId,
+ State = #qistate{unconfirmed = UC,
+ unconfirmed_msg = UCM}) ->
+ MsgId = case MsgOrId of
+ #basic_message{id = Id} -> Id;
+ Id when is_binary(Id) -> Id
+ end,
+ ?MSG_ID_BYTES = size(MsgId),
+ case {MsgProps#message_properties.needs_confirming, MsgOrId} of
+ {true, MsgId} -> UC1 = gb_sets:add_element(MsgId, UC),
+ State#qistate{unconfirmed = UC1};
+ {true, _} -> UCM1 = gb_sets:add_element(MsgId, UCM),
+ State#qistate{unconfirmed_msg = UCM1};
+ {false, _} -> State
+ end.
+
deliver(SeqIds, State) ->
deliver_or_ack(del, SeqIds, State).