diff options
| author | Michael Klishin <michael@novemberain.com> | 2015-09-01 16:03:42 +0300 |
|---|---|---|
| committer | Michael Klishin <michael@novemberain.com> | 2015-09-01 16:03:42 +0300 |
| commit | 5b71eb3d3207f30dff02c3b5fd976058eed7fbdc (patch) | |
| tree | 31dbff65ee5b4b290b6394fa3fadc1fd3d660f67 /src | |
| parent | 7b2dabafb1ce73c47e0f8b1c1e1d775ca51bb3e3 (diff) | |
| parent | 725b3c7e955af5c6ae4a79add0b2f50f03a556e2 (diff) | |
| download | rabbitmq-server-git-5b71eb3d3207f30dff02c3b5fd976058eed7fbdc.tar.gz | |
Merge pull request #292 from rabbitmq/rabbitmq-server-291
fetches IndexMaxSize at BQ init time
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_variable_queue.erl | 39 |
1 files changed, 23 insertions, 16 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 4ccd9757e0..ff1d15952f 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -273,6 +273,7 @@ msg_store_clients, durable, transient_threshold, + qi_embed_msgs_below, len, %% w/o unacked bytes, %% w/o unacked @@ -371,6 +372,7 @@ {any(), binary()}}, durable :: boolean(), transient_threshold :: non_neg_integer(), + qi_embed_msgs_below :: non_neg_integer(), len :: non_neg_integer(), bytes :: non_neg_integer(), @@ -570,12 +572,13 @@ publish(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId }, MsgProps = #message_properties { needs_confirming = NeedsConfirming }, IsDelivered, _ChPid, _Flow, State = #vqstate { q1 = Q1, q3 = Q3, q4 = Q4, - next_seq_id = SeqId, - in_counter = InCount, - durable = IsDurable, - unconfirmed = UC }) -> + qi_embed_msgs_below = IndexMaxSize, + next_seq_id = SeqId, + in_counter = InCount, + durable = IsDurable, + unconfirmed = UC }) -> IsPersistent1 = IsDurable andalso IsPersistent, - MsgStatus = msg_status(IsPersistent1, IsDelivered, SeqId, Msg, MsgProps), + MsgStatus = msg_status(IsPersistent1, IsDelivered, SeqId, Msg, MsgProps, IndexMaxSize), {MsgStatus1, State1} = maybe_write_to_disk(false, false, MsgStatus, State), State2 = case ?QUEUE:is_empty(Q3) of false -> State1 #vqstate { q1 = ?QUEUE:in(m(MsgStatus1), Q1) }; @@ -594,13 +597,14 @@ publish_delivered(Msg = #basic_message { is_persistent = IsPersistent, MsgProps = #message_properties { needs_confirming = NeedsConfirming }, _ChPid, _Flow, - State = #vqstate { next_seq_id = SeqId, - out_counter = OutCount, - in_counter = InCount, - durable = IsDurable, - unconfirmed = UC }) -> + State = #vqstate { qi_embed_msgs_below = IndexMaxSize, + next_seq_id = SeqId, + out_counter = OutCount, + in_counter = InCount, + durable = IsDurable, + unconfirmed = UC }) -> IsPersistent1 = IsDurable andalso IsPersistent, - MsgStatus = msg_status(IsPersistent1, true, SeqId, Msg, MsgProps), + MsgStatus = msg_status(IsPersistent1, true, SeqId, Msg, MsgProps, IndexMaxSize), {MsgStatus1, State1} = maybe_write_to_disk(false, false, MsgStatus, State), State2 = record_pending_ack(m(MsgStatus1), State1), UC1 = gb_sets_maybe_insert(NeedsConfirming, MsgId, UC), @@ -968,7 +972,7 @@ gb_sets_maybe_insert(false, _Val, Set) -> Set; gb_sets_maybe_insert(true, Val, Set) -> gb_sets:add(Val, Set). msg_status(IsPersistent, IsDelivered, SeqId, - Msg = #basic_message {id = MsgId}, MsgProps) -> + Msg = #basic_message {id = MsgId}, MsgProps, IndexMaxSize) -> #msg_status{seq_id = SeqId, msg_id = MsgId, msg = Msg, @@ -976,7 +980,7 @@ msg_status(IsPersistent, IsDelivered, SeqId, is_delivered = IsDelivered, msg_in_store = false, index_on_disk = false, - persist_to = determine_persist_to(Msg, MsgProps), + persist_to = determine_persist_to(Msg, MsgProps, IndexMaxSize), msg_props = MsgProps}. beta_msg_status({Msg = #basic_message{id = MsgId}, @@ -1137,6 +1141,9 @@ init(IsDurable, IndexState, DeltaCount, DeltaBytes, Terms, Now = now(), IoBatchSize = rabbit_misc:get_env(rabbit, msg_store_io_batch_size, ?IO_BATCH_SIZE), + + {ok, IndexMaxSize} = application:get_env( + rabbit, queue_index_embed_msgs_below), State = #vqstate { q1 = ?QUEUE:new(), q2 = ?QUEUE:new(), @@ -1151,6 +1158,7 @@ init(IsDurable, IndexState, DeltaCount, DeltaBytes, Terms, msg_store_clients = {PersistentClient, TransientClient}, durable = IsDurable, transient_threshold = NextSeqId, + qi_embed_msgs_below = IndexMaxSize, len = DeltaCount1, persistent_count = DeltaCount1, @@ -1408,9 +1416,8 @@ maybe_write_to_disk(ForceMsg, ForceIndex, MsgStatus, State) -> determine_persist_to(#basic_message{ content = #content{properties = Props, properties_bin = PropsBin}}, - #message_properties{size = BodySize}) -> - {ok, IndexMaxSize} = application:get_env( - rabbit, queue_index_embed_msgs_below), + #message_properties{size = BodySize}, + IndexMaxSize) -> %% The >= is so that you can set the env to 0 and never persist %% to the index. %% |
