summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlvaro Videla <videlalvaro@gmail.com>2015-10-10 17:54:14 +0200
committerAlvaro Videla <videlalvaro@gmail.com>2015-10-10 17:54:14 +0200
commit8f2bc0848376f15a745e3a9879b3a47504f440db (patch)
tree29ee93269b1360119ab324a2bc862287968d4d5d
parenta3490dc3acd3a624f0cc51ece84ebcd65fb9235e (diff)
downloadrabbitmq-server-git-8f2bc0848376f15a745e3a9879b3a47504f440db.tar.gz
publish/6 for lazy queues
-rw-r--r--src/rabbit_variable_queue.erl30
1 files changed, 29 insertions, 1 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 317eeaf710..65e829521b 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -567,6 +567,7 @@ publish(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId },
MsgProps = #message_properties { needs_confirming = NeedsConfirming },
IsDelivered, _ChPid, _Flow,
State = #vqstate { q1 = Q1, q3 = Q3, q4 = Q4,
+ mode = default,
qi_embed_msgs_below = IndexMaxSize,
next_seq_id = SeqId,
in_counter = InCount,
@@ -585,7 +586,13 @@ publish(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId },
State2#vqstate{ next_seq_id = SeqId + 1,
in_counter = InCount1,
unconfirmed = UC1 }),
- a(reduce_memory_use(maybe_update_rates(State3))).
+ a(reduce_memory_use(maybe_update_rates(State3)));
+publish(Msg, MsgProps, IsDelivered, _ChPid, _Flow,
+ State = #vqstate { mode = lazy }) ->
+ State1 = lazy_publish(Msg, MsgProps, IsDelivered, _ChPid, _Flow,
+ fun maybe_write_to_disk/4,
+ State),
+ a(reduce_memory_use(maybe_update_rates(State1))).
publish_delivered(Msg = #basic_message { is_persistent = IsPersistent,
id = MsgId },
@@ -1389,10 +1396,12 @@ stats(Signs, Statuses, State) ->
stats0(expand_signs(Signs), expand_statuses(Statuses), State).
expand_signs(ready0) -> {0, 0, true};
+expand_signs(dormant) -> {1, 0, true};
expand_signs({A, B}) -> {A, B, false}.
expand_statuses({none, A}) -> {false, msg_in_ram(A), A};
expand_statuses({B, none}) -> {msg_in_ram(B), false, B};
+expand_statuses({dormant, A}) -> {false , false, A};
expand_statuses({B, A}) -> {msg_in_ram(B), msg_in_ram(A), B}.
%% In this function at least, we are religious: the variable name
@@ -1680,6 +1689,25 @@ process_delivers_and_acks_fun(_) ->
%%----------------------------------------------------------------------------
%% Internal gubbins for publishing
%%----------------------------------------------------------------------------
+lazy_publish(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId },
+ MsgProps = #message_properties { needs_confirming = NeedsConfirming },
+ IsDelivered, _ChPid, _Flow, PersistFun,
+ State = #vqstate { qi_embed_msgs_below = IndexMaxSize,
+ next_seq_id = SeqId,
+ in_counter = InCount,
+ durable = IsDurable,
+ unconfirmed = UC,
+ delta = Delta }) ->
+ IsPersistent1 = IsDurable andalso IsPersistent,
+ MsgStatus = msg_status(IsPersistent1, IsDelivered, SeqId, Msg, MsgProps, IndexMaxSize),
+ {MsgStatus1, State1} = PersistFun(true, true, MsgStatus, State),
+ UC1 = gb_sets_maybe_insert(NeedsConfirming, MsgId, UC),
+ Delta1 = expand_delta(SeqId, Delta),
+ stats(dormant, {dormant, m(MsgStatus1)},
+ State1#vqstate{ delta = Delta1,
+ next_seq_id = SeqId + 1,
+ in_counter = InCount + 1,
+ unconfirmed = UC1 }).
maybe_write_msg_to_disk(_Force, MsgStatus = #msg_status {
msg_in_store = true }, State) ->