diff options
| author | Michael Klishin <mklishin@pivotal.io> | 2015-09-01 16:06:43 +0300 |
|---|---|---|
| committer | Michael Klishin <mklishin@pivotal.io> | 2015-09-01 16:06:43 +0300 |
| commit | 758fd677eb347fce88451150d34ba24663ecb02b (patch) | |
| tree | e75ecdc394742cccbb8dc4e1d4573aebebd05767 /src | |
| parent | 09af3f6fd57fa065d3b14789edebb4d36cdcca62 (diff) | |
| parent | 5b71eb3d3207f30dff02c3b5fd976058eed7fbdc (diff) | |
| download | rabbitmq-server-git-758fd677eb347fce88451150d34ba24663ecb02b.tar.gz | |
Merge branch 'stable'
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_variable_queue.erl | 39 |
1 files changed, 23 insertions, 16 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 2224a74b59..859dc6051c 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -273,6 +273,7 @@ msg_store_clients, durable, transient_threshold, + qi_embed_msgs_below, len, %% w/o unacked bytes, %% w/o unacked @@ -370,6 +371,7 @@ {any(), binary()}}, durable :: boolean(), transient_threshold :: non_neg_integer(), + qi_embed_msgs_below :: non_neg_integer(), len :: non_neg_integer(), bytes :: non_neg_integer(), @@ -569,12 +571,13 @@ publish(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId }, MsgProps = #message_properties { needs_confirming = NeedsConfirming }, IsDelivered, _ChPid, _Flow, State = #vqstate { q1 = Q1, q3 = Q3, q4 = Q4, - next_seq_id = SeqId, - in_counter = InCount, - durable = IsDurable, - unconfirmed = UC }) -> + qi_embed_msgs_below = IndexMaxSize, + next_seq_id = SeqId, + in_counter = InCount, + durable = IsDurable, + unconfirmed = UC }) -> IsPersistent1 = IsDurable andalso IsPersistent, - MsgStatus = msg_status(IsPersistent1, IsDelivered, SeqId, Msg, MsgProps), + MsgStatus = msg_status(IsPersistent1, IsDelivered, SeqId, Msg, MsgProps, IndexMaxSize), {MsgStatus1, State1} = maybe_write_to_disk(false, false, MsgStatus, State), State2 = case ?QUEUE:is_empty(Q3) of false -> State1 #vqstate { q1 = ?QUEUE:in(m(MsgStatus1), Q1) }; @@ -593,13 +596,14 @@ publish_delivered(Msg = #basic_message { is_persistent = IsPersistent, MsgProps = #message_properties { needs_confirming = NeedsConfirming }, _ChPid, _Flow, - State = #vqstate { next_seq_id = SeqId, - out_counter = OutCount, - in_counter = InCount, - durable = IsDurable, - unconfirmed = UC }) -> + State = #vqstate { qi_embed_msgs_below = IndexMaxSize, + next_seq_id = SeqId, + out_counter = OutCount, + in_counter = InCount, + durable = IsDurable, + unconfirmed = UC }) -> IsPersistent1 = IsDurable andalso IsPersistent, - MsgStatus = msg_status(IsPersistent1, true, SeqId, Msg, MsgProps), + MsgStatus = msg_status(IsPersistent1, true, SeqId, Msg, MsgProps, IndexMaxSize), {MsgStatus1, State1} = maybe_write_to_disk(false, false, MsgStatus, State), State2 = record_pending_ack(m(MsgStatus1), State1), UC1 = gb_sets_maybe_insert(NeedsConfirming, MsgId, UC), @@ -1029,7 +1033,7 @@ gb_sets_maybe_insert(false, _Val, Set) -> Set; gb_sets_maybe_insert(true, Val, Set) -> gb_sets:add(Val, Set). msg_status(IsPersistent, IsDelivered, SeqId, - Msg = #basic_message {id = MsgId}, MsgProps) -> + Msg = #basic_message {id = MsgId}, MsgProps, IndexMaxSize) -> #msg_status{seq_id = SeqId, msg_id = MsgId, msg = Msg, @@ -1037,7 +1041,7 @@ msg_status(IsPersistent, IsDelivered, SeqId, is_delivered = IsDelivered, msg_in_store = false, index_on_disk = false, - persist_to = determine_persist_to(Msg, MsgProps), + persist_to = determine_persist_to(Msg, MsgProps, IndexMaxSize), msg_props = MsgProps}. beta_msg_status({Msg = #basic_message{id = MsgId}, @@ -1198,6 +1202,9 @@ init(IsDurable, IndexState, DeltaCount, DeltaBytes, Terms, Now = time_compat:monotonic_time(), IoBatchSize = rabbit_misc:get_env(rabbit, msg_store_io_batch_size, ?IO_BATCH_SIZE), + + {ok, IndexMaxSize} = application:get_env( + rabbit, queue_index_embed_msgs_below), State = #vqstate { q1 = ?QUEUE:new(), q2 = ?QUEUE:new(), @@ -1212,6 +1219,7 @@ init(IsDurable, IndexState, DeltaCount, DeltaBytes, Terms, msg_store_clients = {PersistentClient, TransientClient}, durable = IsDurable, transient_threshold = NextSeqId, + qi_embed_msgs_below = IndexMaxSize, len = DeltaCount1, persistent_count = DeltaCount1, @@ -1469,9 +1477,8 @@ maybe_write_to_disk(ForceMsg, ForceIndex, MsgStatus, State) -> determine_persist_to(#basic_message{ content = #content{properties = Props, properties_bin = PropsBin}}, - #message_properties{size = BodySize}) -> - {ok, IndexMaxSize} = application:get_env( - rabbit, queue_index_embed_msgs_below), + #message_properties{size = BodySize}, + IndexMaxSize) -> %% The >= is so that you can set the env to 0 and never persist %% to the index. %% |
