summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorAlvaro Videla <videlalvaro@gmail.com>2015-10-13 15:07:03 +0200
committerAlvaro Videla <videlalvaro@gmail.com>2015-10-13 15:07:03 +0200
commit362fe50320fde390cc50f66cb80915abdc5a7aa3 (patch)
tree3ee9d8f9613198b5b84c170288c57536f70f1937 /src
parentd1dabd10cf8bf2eb8dbb0c753c0de403ed76215e (diff)
downloadrabbitmq-server-git-362fe50320fde390cc50f66cb80915abdc5a7aa3.tar.gz
implements lazy publish delivered
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_variable_queue.erl43
1 files changed, 36 insertions, 7 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 0065c5efba..c08fa95127 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -1370,13 +1370,14 @@ read_msg(MsgId, IsPersistent, State = #vqstate{msg_store_clients = MSCState,
stats(Signs, Statuses, State) ->
stats0(expand_signs(Signs), expand_statuses(Statuses), State).
-expand_signs(ready0) -> {0, 0, true};
-expand_signs(dormant) -> {1, 0, true};
-expand_signs({A, B}) -> {A, B, false}.
+expand_signs(ready0) -> {0, 0, true};
+expand_signs(lazy_pub) -> {1, 0, true};
+expand_signs(lazy_pub_del) -> {0, 1, true};
+expand_signs({A, B}) -> {A, B, false}.
expand_statuses({none, A}) -> {false, msg_in_ram(A), A};
expand_statuses({B, none}) -> {msg_in_ram(B), false, B};
-expand_statuses({dormant, A}) -> {false , false, A};
+expand_statuses({lazy, A}) -> {false , false, A};
expand_statuses({B, A}) -> {msg_in_ram(B), msg_in_ram(A), B}.
%% In this function at least, we are religious: the variable name
@@ -1701,9 +1702,9 @@ publish1(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId },
IsPersistent1 = IsDurable andalso IsPersistent,
MsgStatus = msg_status(IsPersistent1, IsDelivered, SeqId, Msg, MsgProps, IndexMaxSize),
{MsgStatus1, State1} = PersistFun(true, true, MsgStatus, State),
- UC1 = gb_sets_maybe_insert(NeedsConfirming, MsgId, UC),
Delta1 = expand_delta(SeqId, Delta),
- stats(dormant, {dormant, m(MsgStatus1)},
+ UC1 = gb_sets_maybe_insert(NeedsConfirming, MsgId, UC),
+ stats(lazy, {lazy, m(MsgStatus1)},
State1#vqstate{ delta = Delta1,
next_seq_id = SeqId + 1,
in_counter = InCount + 1,
@@ -1718,7 +1719,8 @@ publish_delivered1(Msg = #basic_message { is_persistent = IsPersistent,
MsgProps = #message_properties {
needs_confirming = NeedsConfirming },
_ChPid, _Flow, PersistFun,
- State = #vqstate { qi_embed_msgs_below = IndexMaxSize,
+ State = #vqstate { mode = default,
+ qi_embed_msgs_below = IndexMaxSize,
next_seq_id = SeqId,
out_counter = OutCount,
in_counter = InCount,
@@ -1734,8 +1736,35 @@ publish_delivered1(Msg = #basic_message { is_persistent = IsPersistent,
out_counter = OutCount + 1,
in_counter = InCount + 1,
unconfirmed = UC1 }),
+ {SeqId, State3};
+publish_delivered1(Msg = #basic_message { is_persistent = IsPersistent,
+ id = MsgId },
+ MsgProps = #message_properties {
+ needs_confirming = NeedsConfirming },
+ _ChPid, _Flow, PersistFun,
+ State = #vqstate { mode = lazy,
+ qi_embed_msgs_below = IndexMaxSize,
+ next_seq_id = SeqId,
+ out_counter = OutCount,
+ in_counter = InCount,
+ durable = IsDurable,
+ unconfirmed = UC,
+ delta = Delta }) ->
+ IsPersistent1 = IsDurable andalso IsPersistent,
+ MsgStatus = msg_status(IsPersistent1, true, SeqId, Msg, MsgProps, IndexMaxSize),
+ {MsgStatus1, State1} = PersistFun(true, true, MsgStatus, State),
+ Delta1 = expand_delta(SeqId, Delta),
+ State2 = record_pending_ack(m(MsgStatus1), State1),
+ UC1 = gb_sets_maybe_insert(NeedsConfirming, MsgId, UC),
+ State3 = stats(lazy_pub_del, {lazy, MsgStatus1},
+ State2 #vqstate { delta = Delta1,
+ next_seq_id = SeqId + 1,
+ out_counter = OutCount + 1,
+ in_counter = InCount + 1,
+ unconfirmed = UC1 }),
{SeqId, State3}.
+
batch_publish_delivered1({Msg, MsgProps}, {ChPid, Flow, SeqIds, State}) ->
{SeqId, State1} =
publish_delivered1(Msg, MsgProps, ChPid, Flow,