diff options
| author | Alvaro Videla <videlalvaro@gmail.com> | 2015-10-13 15:07:03 +0200 |
|---|---|---|
| committer | Alvaro Videla <videlalvaro@gmail.com> | 2015-10-13 15:07:03 +0200 |
| commit | 362fe50320fde390cc50f66cb80915abdc5a7aa3 (patch) | |
| tree | 3ee9d8f9613198b5b84c170288c57536f70f1937 /src | |
| parent | d1dabd10cf8bf2eb8dbb0c753c0de403ed76215e (diff) | |
| download | rabbitmq-server-git-362fe50320fde390cc50f66cb80915abdc5a7aa3.tar.gz | |
implements lazy publish delivered
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_variable_queue.erl | 43 |
1 files changed, 36 insertions, 7 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 0065c5efba..c08fa95127 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -1370,13 +1370,14 @@ read_msg(MsgId, IsPersistent, State = #vqstate{msg_store_clients = MSCState, stats(Signs, Statuses, State) -> stats0(expand_signs(Signs), expand_statuses(Statuses), State). -expand_signs(ready0) -> {0, 0, true}; -expand_signs(dormant) -> {1, 0, true}; -expand_signs({A, B}) -> {A, B, false}. +expand_signs(ready0) -> {0, 0, true}; +expand_signs(lazy_pub) -> {1, 0, true}; +expand_signs(lazy_pub_del) -> {0, 1, true}; +expand_signs({A, B}) -> {A, B, false}. expand_statuses({none, A}) -> {false, msg_in_ram(A), A}; expand_statuses({B, none}) -> {msg_in_ram(B), false, B}; -expand_statuses({dormant, A}) -> {false , false, A}; +expand_statuses({lazy, A}) -> {false , false, A}; expand_statuses({B, A}) -> {msg_in_ram(B), msg_in_ram(A), B}. %% In this function at least, we are religious: the variable name @@ -1701,9 +1702,9 @@ publish1(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId }, IsPersistent1 = IsDurable andalso IsPersistent, MsgStatus = msg_status(IsPersistent1, IsDelivered, SeqId, Msg, MsgProps, IndexMaxSize), {MsgStatus1, State1} = PersistFun(true, true, MsgStatus, State), - UC1 = gb_sets_maybe_insert(NeedsConfirming, MsgId, UC), Delta1 = expand_delta(SeqId, Delta), - stats(dormant, {dormant, m(MsgStatus1)}, + UC1 = gb_sets_maybe_insert(NeedsConfirming, MsgId, UC), + stats(lazy, {lazy, m(MsgStatus1)}, State1#vqstate{ delta = Delta1, next_seq_id = SeqId + 1, in_counter = InCount + 1, @@ -1718,7 +1719,8 @@ publish_delivered1(Msg = #basic_message { is_persistent = IsPersistent, MsgProps = #message_properties { needs_confirming = NeedsConfirming }, _ChPid, _Flow, PersistFun, - State = #vqstate { qi_embed_msgs_below = IndexMaxSize, + State = #vqstate { mode = default, + qi_embed_msgs_below = IndexMaxSize, next_seq_id = SeqId, out_counter = OutCount, in_counter = InCount, @@ -1734,8 +1736,35 @@ publish_delivered1(Msg = #basic_message { is_persistent = IsPersistent, out_counter = OutCount + 1, in_counter = InCount + 1, unconfirmed = UC1 }), + {SeqId, State3}; +publish_delivered1(Msg = #basic_message { is_persistent = IsPersistent, + id = MsgId }, + MsgProps = #message_properties { + needs_confirming = NeedsConfirming }, + _ChPid, _Flow, PersistFun, + State = #vqstate { mode = lazy, + qi_embed_msgs_below = IndexMaxSize, + next_seq_id = SeqId, + out_counter = OutCount, + in_counter = InCount, + durable = IsDurable, + unconfirmed = UC, + delta = Delta }) -> + IsPersistent1 = IsDurable andalso IsPersistent, + MsgStatus = msg_status(IsPersistent1, true, SeqId, Msg, MsgProps, IndexMaxSize), + {MsgStatus1, State1} = PersistFun(true, true, MsgStatus, State), + Delta1 = expand_delta(SeqId, Delta), + State2 = record_pending_ack(m(MsgStatus1), State1), + UC1 = gb_sets_maybe_insert(NeedsConfirming, MsgId, UC), + State3 = stats(lazy_pub_del, {lazy, MsgStatus1}, + State2 #vqstate { delta = Delta1, + next_seq_id = SeqId + 1, + out_counter = OutCount + 1, + in_counter = InCount + 1, + unconfirmed = UC1 }), {SeqId, State3}. + batch_publish_delivered1({Msg, MsgProps}, {ChPid, Flow, SeqIds, State}) -> {SeqId, State1} = publish_delivered1(Msg, MsgProps, ChPid, Flow, |
