summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorAlvaro Videla <videlalvaro@gmail.com>2015-09-01 11:40:04 +0100
committerAlvaro Videla <videlalvaro@gmail.com>2015-09-01 13:10:19 +0100
commit725b3c7e955af5c6ae4a79add0b2f50f03a556e2 (patch)
treed1fff59fe6170629730e75414d2c5d808a3ec846 /src
parent1085e243a6df514f675b452aa96ffa23b6632ac3 (diff)
downloadrabbitmq-server-git-725b3c7e955af5c6ae4a79add0b2f50f03a556e2.tar.gz
fetches IndexMaxSize at BQ init time
Fixes #291
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_variable_queue.erl39
1 files changed, 23 insertions, 16 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 4ccd9757e0..ff1d15952f 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -273,6 +273,7 @@
msg_store_clients,
durable,
transient_threshold,
+ qi_embed_msgs_below,
len, %% w/o unacked
bytes, %% w/o unacked
@@ -371,6 +372,7 @@
{any(), binary()}},
durable :: boolean(),
transient_threshold :: non_neg_integer(),
+ qi_embed_msgs_below :: non_neg_integer(),
len :: non_neg_integer(),
bytes :: non_neg_integer(),
@@ -570,12 +572,13 @@ publish(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId },
MsgProps = #message_properties { needs_confirming = NeedsConfirming },
IsDelivered, _ChPid, _Flow,
State = #vqstate { q1 = Q1, q3 = Q3, q4 = Q4,
- next_seq_id = SeqId,
- in_counter = InCount,
- durable = IsDurable,
- unconfirmed = UC }) ->
+ qi_embed_msgs_below = IndexMaxSize,
+ next_seq_id = SeqId,
+ in_counter = InCount,
+ durable = IsDurable,
+ unconfirmed = UC }) ->
IsPersistent1 = IsDurable andalso IsPersistent,
- MsgStatus = msg_status(IsPersistent1, IsDelivered, SeqId, Msg, MsgProps),
+ MsgStatus = msg_status(IsPersistent1, IsDelivered, SeqId, Msg, MsgProps, IndexMaxSize),
{MsgStatus1, State1} = maybe_write_to_disk(false, false, MsgStatus, State),
State2 = case ?QUEUE:is_empty(Q3) of
false -> State1 #vqstate { q1 = ?QUEUE:in(m(MsgStatus1), Q1) };
@@ -594,13 +597,14 @@ publish_delivered(Msg = #basic_message { is_persistent = IsPersistent,
MsgProps = #message_properties {
needs_confirming = NeedsConfirming },
_ChPid, _Flow,
- State = #vqstate { next_seq_id = SeqId,
- out_counter = OutCount,
- in_counter = InCount,
- durable = IsDurable,
- unconfirmed = UC }) ->
+ State = #vqstate { qi_embed_msgs_below = IndexMaxSize,
+ next_seq_id = SeqId,
+ out_counter = OutCount,
+ in_counter = InCount,
+ durable = IsDurable,
+ unconfirmed = UC }) ->
IsPersistent1 = IsDurable andalso IsPersistent,
- MsgStatus = msg_status(IsPersistent1, true, SeqId, Msg, MsgProps),
+ MsgStatus = msg_status(IsPersistent1, true, SeqId, Msg, MsgProps, IndexMaxSize),
{MsgStatus1, State1} = maybe_write_to_disk(false, false, MsgStatus, State),
State2 = record_pending_ack(m(MsgStatus1), State1),
UC1 = gb_sets_maybe_insert(NeedsConfirming, MsgId, UC),
@@ -968,7 +972,7 @@ gb_sets_maybe_insert(false, _Val, Set) -> Set;
gb_sets_maybe_insert(true, Val, Set) -> gb_sets:add(Val, Set).
msg_status(IsPersistent, IsDelivered, SeqId,
- Msg = #basic_message {id = MsgId}, MsgProps) ->
+ Msg = #basic_message {id = MsgId}, MsgProps, IndexMaxSize) ->
#msg_status{seq_id = SeqId,
msg_id = MsgId,
msg = Msg,
@@ -976,7 +980,7 @@ msg_status(IsPersistent, IsDelivered, SeqId,
is_delivered = IsDelivered,
msg_in_store = false,
index_on_disk = false,
- persist_to = determine_persist_to(Msg, MsgProps),
+ persist_to = determine_persist_to(Msg, MsgProps, IndexMaxSize),
msg_props = MsgProps}.
beta_msg_status({Msg = #basic_message{id = MsgId},
@@ -1137,6 +1141,9 @@ init(IsDurable, IndexState, DeltaCount, DeltaBytes, Terms,
Now = now(),
IoBatchSize = rabbit_misc:get_env(rabbit, msg_store_io_batch_size,
?IO_BATCH_SIZE),
+
+ {ok, IndexMaxSize} = application:get_env(
+ rabbit, queue_index_embed_msgs_below),
State = #vqstate {
q1 = ?QUEUE:new(),
q2 = ?QUEUE:new(),
@@ -1151,6 +1158,7 @@ init(IsDurable, IndexState, DeltaCount, DeltaBytes, Terms,
msg_store_clients = {PersistentClient, TransientClient},
durable = IsDurable,
transient_threshold = NextSeqId,
+ qi_embed_msgs_below = IndexMaxSize,
len = DeltaCount1,
persistent_count = DeltaCount1,
@@ -1408,9 +1416,8 @@ maybe_write_to_disk(ForceMsg, ForceIndex, MsgStatus, State) ->
determine_persist_to(#basic_message{
content = #content{properties = Props,
properties_bin = PropsBin}},
- #message_properties{size = BodySize}) ->
- {ok, IndexMaxSize} = application:get_env(
- rabbit, queue_index_embed_msgs_below),
+ #message_properties{size = BodySize},
+ IndexMaxSize) ->
%% The >= is so that you can set the env to 0 and never persist
%% to the index.
%%