summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDiana Corbacho <diana@rabbitmq.com>2019-04-02 12:26:28 +0100
committerDiana Corbacho <diana@rabbitmq.com>2019-04-03 16:32:32 +0100
commit19f8dec83bfa899cf7d43e532ca8a52c73213a97 (patch)
tree0876af93cb80b793c44c57110a33e48e55e5b68a
parent74aac975655b4d419ac53c911768048234edbb9a (diff)
downloadrabbitmq-server-git-19f8dec83bfa899cf7d43e532ca8a52c73213a97.tar.gz
Fix in-memory counters
[#164735591]
-rw-r--r--src/rabbit_fifo.erl113
-rw-r--r--test/quorum_queue_SUITE.erl77
-rw-r--r--test/rabbit_fifo_prop_SUITE.erl71
3 files changed, 198 insertions, 63 deletions
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl
index 0f957b4a19..d397459d4f 100644
--- a/src/rabbit_fifo.erl
+++ b/src/rabbit_fifo.erl
@@ -897,17 +897,20 @@ drop_head(#?MODULE{ra_indexes = Indexes0} = State0, Effects0) ->
{FullMsg = {_MsgId, {RaftIdxToDrop, {#{size := Bytes} = Header, Msg}}},
State1} ->
Indexes = rabbit_fifo_index:delete(RaftIdxToDrop, Indexes0),
- State1 = add_bytes_drop(Bytes, State1#?MODULE{ra_indexes = Indexes}),
+ State2 = add_bytes_drop(Bytes, State1#?MODULE{ra_indexes = Indexes}),
State = case Msg of
- 'empty' -> subtract_in_memory_counts(Header, State1);
- _ -> State1
+ 'empty' -> State2;
+ _ -> subtract_in_memory_counts(Header, State2)
end,
Effects = dead_letter_effects(maxlen, #{none => FullMsg},
State, Effects0),
{State, Effects};
- {{'$prefix_msg', #{size := Bytes}}, State1} ->
- State = add_bytes_drop(Bytes, State1),
- {State, Effects0};
+ {{'$prefix_msg', #{size := Bytes} = Header}, State1} ->
+ State2 = subtract_in_memory_counts(Header, add_bytes_drop(Bytes, State1)),
+ {State2, Effects0};
+ {{'$empty_msg', #{size := Bytes}}, State1} ->
+ State2 = add_bytes_drop(Bytes, State1),
+ {State2, Effects0};
empty ->
{State0, Effects0}
end.
@@ -926,10 +929,10 @@ enqueue(RaftIdx, RawMsg, #?MODULE{messages = Messages,
end,
State = add_bytes_enqueue(Size, State1),
State#?MODULE{messages = Messages#{NextMsgNum => Msg},
- % this is probably only done to record it when low_msg_num
- % is undefined
- low_msg_num = min(LowMsgNum, NextMsgNum),
- next_msg_num = NextMsgNum + 1}.
+ %% this is probably only done to record it when low_msg_num
+ %% is undefined
+ low_msg_num = min(LowMsgNum, NextMsgNum),
+ next_msg_num = NextMsgNum + 1}.
append_to_master_index(RaftIdx,
#?MODULE{ra_indexes = Indexes0} = State0) ->
@@ -1016,6 +1019,9 @@ return(Meta, ConsumerId, MsgNumMsgs, Con0, Checked,
fun({'$prefix_msg', _} = Msg, {S0, E0}) ->
return_one(0, Msg, S0, E0,
ConsumerId, Con);
+ ({'$empty_msg', _} = Msg, {S0, E0}) ->
+ return_one(0, Msg, S0, E0,
+ ConsumerId, Con);
({MsgNum, Msg}, {S0, E0}) ->
return_one(MsgNum, Msg, S0, E0,
ConsumerId, Con)
@@ -1062,6 +1068,8 @@ complete_and_checkout(#{index := IncomingRaftIdx} = Meta, MsgIds, ConsumerId,
State1 = lists:foldl(fun({_, {_, {Header, _}}}, Acc) ->
add_bytes_settle(Header, Acc);
({'$prefix_msg', Header}, Acc) ->
+ add_bytes_settle(Header, Acc);
+ ({'$empty_msg', Header}, Acc) ->
add_bytes_settle(Header, Acc)
end, State0, maps:values(Discarded)),
%% need to pass the length of discarded as $prefix_msgs would be filtered
@@ -1128,14 +1136,14 @@ find_next_cursor(Smallest, Cursors0, Potential) ->
{Potential, Cursors0}
end.
-return_one(0, {'$prefix_msg', Header0},
+return_one(0, {Tag, Header0},
#?MODULE{returns = Returns,
cfg = #cfg{delivery_limit = DeliveryLimit}} = State0,
- Effects0, ConsumerId, Con) ->
+ Effects0, ConsumerId, Con) when Tag == '$prefix_msg'; Tag == '$empty_msg'->
Header = maps:update_with(delivery_count,
fun (C) -> C+1 end,
1, Header0),
- Msg = {'$prefix_msg', Header},
+ Msg = {Tag, Header},
case maps:get(delivery_count, Header) of
DeliveryCount when DeliveryCount > DeliveryLimit ->
Checked = Con#consumer.checked_out,
@@ -1144,8 +1152,12 @@ return_one(0, {'$prefix_msg', Header0},
{add_bytes_settle(Header, State1), Effects};
_ ->
%% this should not affect the release cursor in any way
- {add_bytes_return(Msg,
- State0#?MODULE{returns = lqueue:in(Msg, Returns)}),
+ State1 = case Tag of
+ '$empty_msg' -> State0;
+ _ -> add_in_memory_counts(maps:get(size, Header), State0)
+ end,
+ {add_bytes_return(Header,
+ State1#?MODULE{returns = lqueue:in(Msg, Returns)}),
Effects0}
end;
return_one(MsgNum, {RaftId, {Header0, RawMsg}},
@@ -1165,16 +1177,16 @@ return_one(MsgNum, {RaftId, {Header0, RawMsg}},
Checked = Con#consumer.checked_out,
{State1, Effects1} = complete(ConsumerId, [RaftId], 1, Con, Checked,
Effects, State0),
- State2 = case RawMsg of
- 'empty' -> State1;
- _ -> add_in_memory_counts(maps:get(size, Header), State1)
- end,
- {add_bytes_settle(Header, State2), Effects1};
+ {add_bytes_settle(Header, State1), Effects1};
_ ->
+ State1 = case RawMsg of
+ 'empty' -> State0;
+ _ -> add_in_memory_counts(maps:get(size, Header), State0)
+ end,
%% this should not affect the release cursor in any way
- {add_bytes_return(RawMsg,
- State0#?MODULE{returns =
- lqueue:in({MsgNum, Msg}, Returns)}),
+ {add_bytes_return(Header,
+ State1#?MODULE{returns =
+ lqueue:in({MsgNum, Msg}, Returns)}),
Effects0}
end.
@@ -1184,6 +1196,8 @@ return_all(State0, Checked0, Effects0, ConsumerId, Consumer) ->
Checked = lists:sort(maps:to_list(Checked0)),
lists:foldl(fun ({_, {'$prefix_msg', _} = Msg}, {S, E}) ->
return_one(0, Msg, S, E, ConsumerId, Consumer);
+ ({_, {'$empty_msg', _} = Msg}, {S, E}) ->
+ return_one(0, Msg, S, E, ConsumerId, Consumer);
({_, {MsgNum, Msg}}, {S, E}) ->
return_one(MsgNum, Msg, S, E, ConsumerId, Consumer)
end, {State0, Effects0}, Checked).
@@ -1269,6 +1283,9 @@ append_log_effects(Effects0, AccMap) ->
%%
%% When we return it is always done to the current return queue
%% for both prefix messages and current messages
+take_next_msg(#?MODULE{prefix_msgs = {[{'$empty_msg', _} = Msg | Rem], P}} = State) ->
+ %% there are prefix returns, these should be served first
+ {Msg, State#?MODULE{prefix_msgs = {Rem, P}}};
take_next_msg(#?MODULE{prefix_msgs = {[Header | Rem], P}} = State) ->
%% there are prefix returns, these should be served first
{{'$prefix_msg', Header},
@@ -1301,10 +1318,16 @@ take_next_msg(#?MODULE{returns = Returns,
end
end;
empty ->
- [Header | Rem] = P,
- %% There are prefix msgs
- {{'$prefix_msg', Header},
- State#?MODULE{prefix_msgs = {R, Rem}}}
+ [Msg | Rem] = P,
+ case Msg of
+ {Header, 'empty'} ->
+ %% There are prefix msgs
+ {{'$empty_msg', Header},
+ State#?MODULE{prefix_msgs = {R, Rem}}};
+ Header ->
+ {{'$prefix_msg', Header},
+ State#?MODULE{prefix_msgs = {R, Rem}}}
+ end
end.
send_msg_effect({CTag, CPid}, Msgs) ->
@@ -1360,16 +1383,20 @@ checkout_one(#?MODULE{service_queue = SQ0,
consumers = Cons},
{State, Msg} =
case ConsumerMsg of
- {'$prefix_msg', _} ->
- {add_bytes_checkout(ConsumerMsg, State1),
+ {'$prefix_msg', Header} ->
+ {subtract_in_memory_counts(
+ Header, add_bytes_checkout(Header, State1)),
+ ConsumerMsg};
+ {'$empty_msg', Header} ->
+ {add_bytes_checkout(Header, State1),
ConsumerMsg};
{_, {_, {Header, 'empty'}} = M} ->
- {add_bytes_checkout(maps:get(size, Header), State1),
+ {add_bytes_checkout(Header, State1),
M};
- {_, {_, {Header, RawMsg} = M}} ->
+ {_, {_, {Header, _} = M}} ->
{subtract_in_memory_counts(
Header,
- add_bytes_checkout(RawMsg, State1)),
+ add_bytes_checkout(Header, State1)),
M}
end,
{success, ConsumerId, Next, Msg, State};
@@ -1483,13 +1510,19 @@ dehydrate_state(#?MODULE{messages = Messages,
%% TODO: optimise this function as far as possible
PrefRet = lists:foldl(fun ({'$prefix_msg', Header}, Acc) ->
[Header | Acc];
+ ({'$empty_msg', _} = Msg, Acc) ->
+ [Msg | Acc];
+ ({_, {_, {Header, 'empty'}}}, Acc) ->
+ [{'$empty_msg', Header} | Acc];
({_, {_, {Header, _}}}, Acc) ->
[Header | Acc]
end,
lists:reverse(PrefRet0),
lqueue:to_list(Returns)),
- PrefMsgs = lists:foldl(fun ({_, {_RaftIdx, {Header, _}}}, Acc) ->
- [Header| Acc]
+ PrefMsgs = lists:foldl(fun ({_, {_RaftIdx, {_, 'empty'} = Msg}}, Acc) ->
+ [Msg | Acc];
+ ({_, {_RaftIdx, {Header, _}}}, Acc) ->
+ [Header | Acc]
end,
lists:reverse(PrefMsg0),
lists:sort(maps:to_list(Messages))),
@@ -1509,6 +1542,10 @@ dehydrate_state(#?MODULE{messages = Messages,
dehydrate_consumer(#consumer{checked_out = Checked0} = Con) ->
Checked = maps:map(fun (_, {'$prefix_msg', _} = M) ->
M;
+ (_, {'$empty_msg', _} = M) ->
+ M;
+ (_, {_, {_, {Header, 'empty'}}}) ->
+ {'$empty_msg', Header};
(_, {_, {_, {Header, _}}}) ->
{'$prefix_msg', Header}
end, Checked0),
@@ -1569,18 +1606,16 @@ add_bytes_enqueue(Bytes, #?MODULE{msg_bytes_enqueue = Enqueue} = State) ->
add_bytes_drop(Bytes, #?MODULE{msg_bytes_enqueue = Enqueue} = State) ->
State#?MODULE{msg_bytes_enqueue = Enqueue - Bytes}.
-add_bytes_checkout(Msg, #?MODULE{msg_bytes_checkout = Checkout,
+add_bytes_checkout(#{size := Bytes}, #?MODULE{msg_bytes_checkout = Checkout,
msg_bytes_enqueue = Enqueue } = State) ->
- Bytes = message_size(Msg),
State#?MODULE{msg_bytes_checkout = Checkout + Bytes,
msg_bytes_enqueue = Enqueue - Bytes}.
add_bytes_settle(#{size := Bytes}, #?MODULE{msg_bytes_checkout = Checkout} = State) ->
State#?MODULE{msg_bytes_checkout = Checkout - Bytes}.
-add_bytes_return(Msg, #?MODULE{msg_bytes_checkout = Checkout,
+add_bytes_return(#{size := Bytes}, #?MODULE{msg_bytes_checkout = Checkout,
msg_bytes_enqueue = Enqueue} = State) ->
- Bytes = message_size(Msg),
State#?MODULE{msg_bytes_checkout = Checkout - Bytes,
msg_bytes_enqueue = Enqueue + Bytes}.
@@ -1600,6 +1635,8 @@ message_size(#basic_message{content = Content}) ->
iolist_size(PFR);
message_size({'$prefix_msg', #{size := B}}) ->
B;
+message_size({'$empty_msg', #{size := B}}) ->
+ B;
message_size(B) when is_binary(B) ->
byte_size(B);
message_size(Msg) ->
diff --git a/test/quorum_queue_SUITE.erl b/test/quorum_queue_SUITE.erl
index 531c5a5aed..31d6a4c21b 100644
--- a/test/quorum_queue_SUITE.erl
+++ b/test/quorum_queue_SUITE.erl
@@ -120,7 +120,8 @@ all_tests() ->
queue_length_in_memory_limit,
queue_length_in_memory_bytes_limit_basic_get,
queue_length_in_memory_bytes_limit_subscribe,
- queue_length_in_memory_bytes_limit
+ queue_length_in_memory_bytes_limit,
+ in_memory
].
memory_tests() ->
@@ -1819,7 +1820,7 @@ queue_length_limit_drop_head(Config) ->
no_ack = true})).
queue_length_in_memory_limit_basic_get(Config) ->
- [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+ [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
QQ = ?config(queue_name, Config),
@@ -1841,7 +1842,7 @@ queue_length_in_memory_limit_basic_get(Config) ->
wait_for_messages(Config, [[QQ, <<"2">>, <<"2">>, <<"0">>]]),
?assertEqual([{1, byte_size(Msg1)}],
- dirty_query(Servers, RaName, fun rabbit_fifo:query_in_memory_usage/1)),
+ dirty_query([Server], RaName, fun rabbit_fifo:query_in_memory_usage/1)),
?assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = Msg1}},
amqp_channel:call(Ch, #'basic.get'{queue = QQ,
@@ -1851,7 +1852,7 @@ queue_length_in_memory_limit_basic_get(Config) ->
no_ack = true})).
queue_length_in_memory_limit_subscribe(Config) ->
- [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+ [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
QQ = ?config(queue_name, Config),
@@ -1867,7 +1868,7 @@ queue_length_in_memory_limit_subscribe(Config) ->
wait_for_messages(Config, [[QQ, <<"2">>, <<"2">>, <<"0">>]]),
?assertEqual([{1, byte_size(Msg1)}],
- dirty_query(Servers, RaName, fun rabbit_fifo:query_in_memory_usage/1)),
+ dirty_query([Server], RaName, fun rabbit_fifo:query_in_memory_usage/1)),
subscribe(Ch, QQ, false),
receive
@@ -1878,7 +1879,7 @@ queue_length_in_memory_limit_subscribe(Config) ->
multiple = false})
end,
?assertEqual([{0, 0}],
- dirty_query(Servers, RaName, fun rabbit_fifo:query_in_memory_usage/1)),
+ dirty_query([Server], RaName, fun rabbit_fifo:query_in_memory_usage/1)),
receive
{#'basic.deliver'{delivery_tag = DeliveryTag2,
redelivered = false},
@@ -1888,7 +1889,7 @@ queue_length_in_memory_limit_subscribe(Config) ->
end.
queue_length_in_memory_limit(Config) ->
- [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+ [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
QQ = ?config(queue_name, Config),
@@ -1908,8 +1909,8 @@ queue_length_in_memory_limit(Config) ->
wait_for_messages(Config, [[QQ, <<"3">>, <<"3">>, <<"0">>]]),
?assertEqual([{2, byte_size(Msg1) + byte_size(Msg2)}],
- dirty_query(Servers, RaName, fun rabbit_fifo:query_in_memory_usage/1)),
-
+ dirty_query([Server], RaName, fun rabbit_fifo:query_in_memory_usage/1)),
+
?assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = Msg1}},
amqp_channel:call(Ch, #'basic.get'{queue = QQ,
no_ack = true})),
@@ -1919,10 +1920,10 @@ queue_length_in_memory_limit(Config) ->
wait_for_messages(Config, [[QQ, <<"3">>, <<"3">>, <<"0">>]]),
?assertEqual([{2, byte_size(Msg2) + byte_size(Msg4)}],
- dirty_query(Servers, RaName, fun rabbit_fifo:query_in_memory_usage/1)).
+ dirty_query([Server], RaName, fun rabbit_fifo:query_in_memory_usage/1)).
queue_length_in_memory_bytes_limit_basic_get(Config) ->
- [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+ [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
QQ = ?config(queue_name, Config),
@@ -1944,7 +1945,7 @@ queue_length_in_memory_bytes_limit_basic_get(Config) ->
wait_for_messages(Config, [[QQ, <<"2">>, <<"2">>, <<"0">>]]),
?assertEqual([{1, byte_size(Msg1)}],
- dirty_query(Servers, RaName, fun rabbit_fifo:query_in_memory_usage/1)),
+ dirty_query([Server], RaName, fun rabbit_fifo:query_in_memory_usage/1)),
?assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = Msg1}},
amqp_channel:call(Ch, #'basic.get'{queue = QQ,
@@ -1954,7 +1955,7 @@ queue_length_in_memory_bytes_limit_basic_get(Config) ->
no_ack = true})).
queue_length_in_memory_bytes_limit_subscribe(Config) ->
- [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+ [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
QQ = ?config(queue_name, Config),
@@ -1970,7 +1971,7 @@ queue_length_in_memory_bytes_limit_subscribe(Config) ->
wait_for_messages(Config, [[QQ, <<"2">>, <<"2">>, <<"0">>]]),
?assertEqual([{1, byte_size(Msg1)}],
- dirty_query(Servers, RaName, fun rabbit_fifo:query_in_memory_usage/1)),
+ dirty_query([Server], RaName, fun rabbit_fifo:query_in_memory_usage/1)),
subscribe(Ch, QQ, false),
receive
@@ -1981,7 +1982,7 @@ queue_length_in_memory_bytes_limit_subscribe(Config) ->
multiple = false})
end,
?assertEqual([{0, 0}],
- dirty_query(Servers, RaName, fun rabbit_fifo:query_in_memory_usage/1)),
+ dirty_query([Server], RaName, fun rabbit_fifo:query_in_memory_usage/1)),
receive
{#'basic.deliver'{delivery_tag = DeliveryTag2,
redelivered = false},
@@ -1991,7 +1992,7 @@ queue_length_in_memory_bytes_limit_subscribe(Config) ->
end.
queue_length_in_memory_bytes_limit(Config) ->
- [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+ [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
QQ = ?config(queue_name, Config),
@@ -2011,7 +2012,7 @@ queue_length_in_memory_bytes_limit(Config) ->
wait_for_messages(Config, [[QQ, <<"3">>, <<"3">>, <<"0">>]]),
?assertEqual([{2, byte_size(Msg1) + byte_size(Msg2)}],
- dirty_query(Servers, RaName, fun rabbit_fifo:query_in_memory_usage/1)),
+ dirty_query([Server], RaName, fun rabbit_fifo:query_in_memory_usage/1)),
?assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = Msg1}},
amqp_channel:call(Ch, #'basic.get'{queue = QQ,
@@ -2022,7 +2023,47 @@ queue_length_in_memory_bytes_limit(Config) ->
wait_for_messages(Config, [[QQ, <<"3">>, <<"3">>, <<"0">>]]),
?assertEqual([{2, byte_size(Msg2) + byte_size(Msg4)}],
- dirty_query(Servers, RaName, fun rabbit_fifo:query_in_memory_usage/1)).
+ dirty_query([Server], RaName, fun rabbit_fifo:query_in_memory_usage/1)).
+
+in_memory(Config) ->
+ [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ QQ = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', QQ, 0, 0},
+ declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
+
+ RaName = ra_name(QQ),
+ Msg1 = <<"msg1">>,
+ Msg2 = <<"msg11">>,
+
+ publish(Ch, QQ, Msg1),
+
+ wait_for_messages(Config, [[QQ, <<"1">>, <<"1">>, <<"0">>]]),
+ ?assertEqual([{1, byte_size(Msg1)}],
+ dirty_query([Server], RaName, fun rabbit_fifo:query_in_memory_usage/1)),
+
+ subscribe(Ch, QQ, false),
+
+ wait_for_messages(Config, [[QQ, <<"1">>, <<"0">>, <<"1">>]]),
+ ?assertEqual([{0, 0}],
+ dirty_query([Server], RaName, fun rabbit_fifo:query_in_memory_usage/1)),
+
+ publish(Ch, QQ, Msg2),
+
+ wait_for_messages(Config, [[QQ, <<"2">>, <<"0">>, <<"2">>]]),
+ ?assertEqual([{0, 0}],
+ dirty_query([Server], RaName, fun rabbit_fifo:query_in_memory_usage/1)),
+
+ receive
+ {#'basic.deliver'{delivery_tag = DeliveryTag}, #amqp_msg{}} ->
+ amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag,
+ multiple = false})
+ end,
+
+ wait_for_messages(Config, [[QQ, <<"1">>, <<"0">>, <<"1">>]]),
+ ?assertEqual([{0, 0}],
+ dirty_query([Server], RaName, fun rabbit_fifo:query_in_memory_usage/1)).
%%----------------------------------------------------------------------------
diff --git a/test/rabbit_fifo_prop_SUITE.erl b/test/rabbit_fifo_prop_SUITE.erl
index 76b8d7e715..58e6c2452c 100644
--- a/test/rabbit_fifo_prop_SUITE.erl
+++ b/test/rabbit_fifo_prop_SUITE.erl
@@ -38,7 +38,10 @@ all_tests() ->
scenario14,
scenario15,
scenario16,
- scenario17
+ scenario17,
+ scenario18,
+ scenario19,
+ scenario20
].
groups() ->
@@ -293,15 +296,65 @@ scenario17(_Config) ->
}, Commands),
ok.
+scenario18(_Config) ->
+ E = c:pid(0,176,1),
+ Commands = [make_enqueue(E,1,<<"1">>),
+ make_enqueue(E,2,<<"2">>),
+ make_enqueue(E,3,<<"3">>),
+ make_enqueue(E,4,<<"4">>),
+ make_enqueue(E,5,<<"5">>)
+ ],
+ run_snapshot_test(#{name => ?FUNCTION_NAME,
+ %% max_length => 3,
+ max_in_memory_length => 1}, Commands),
+ ok.
+
+scenario19(_Config) ->
+ C1Pid = c:pid(0,883,1),
+ C1 = {<<>>, C1Pid},
+ E = c:pid(0,176,1),
+ Commands = [make_enqueue(E,1,<<"1">>),
+ make_enqueue(E,2,<<"2">>),
+ make_checkout(C1, {auto,2,simple_prefetch}),
+ make_enqueue(E,3,<<"3">>),
+ make_settle(C1, [0, 1])
+ ],
+ run_snapshot_test(#{name => ?FUNCTION_NAME,
+ max_in_memory_bytes => 370,
+ max_in_memory_length => 1}, Commands),
+ ok.
+
+scenario20(_Config) ->
+ C1Pid = c:pid(0,883,1),
+ C1 = {<<>>, C1Pid},
+ E = c:pid(0,176,1),
+ Commands = [make_enqueue(E,1,<<>>),
+ make_enqueue(E,2,<<>>),
+ make_checkout(C1, {auto,2,simple_prefetch}),
+ {down, C1Pid, noconnection},
+ make_enqueue(E,3,<<0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0>>),
+ make_enqueue(E,4,<<0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0>>),
+ make_enqueue(E,5,<<0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0>>),
+ make_enqueue(E,6,<<0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0>>),
+ make_enqueue(E,7,<<0,0,0,0,0,0,0,0,0,0,0,0,0,0>>)
+ ],
+ run_snapshot_test(#{name => ?FUNCTION_NAME,
+ max_bytes => 97,
+ max_in_memory_length => 1}, Commands),
+ ok.
+
snapshots(_Config) ->
run_proper(
fun () ->
- ?FORALL({Length, Bytes, SingleActiveConsumer, DeliveryLimit},
- frequency([{10, {0, 0, false, 0}},
+ ?FORALL({Length, Bytes, SingleActiveConsumer, DeliveryLimit, InMemoryLength,
+ InMemoryBytes},
+ frequency([{10, {0, 0, false, 0, 0, 0}},
{5, {oneof([range(1, 10), undefined]),
oneof([range(1, 1000), undefined]),
boolean(),
- oneof([range(1, 3), undefined])
+ oneof([range(1, 3), undefined]),
+ oneof([range(1, 10), undefined]),
+ oneof([range(1, 1000), undefined])
}}]),
?FORALL(O, ?LET(Ops, log_gen(250), expand(Ops)),
collect({log_size, length(O)},
@@ -310,15 +363,19 @@ snapshots(_Config) ->
Length,
Bytes,
SingleActiveConsumer,
- DeliveryLimit), O))))
+ DeliveryLimit,
+ InMemoryLength,
+ InMemoryBytes), O))))
end, [], 2500).
-config(Name, Length, Bytes, SingleActive, DeliveryLimit) ->
+config(Name, Length, Bytes, SingleActive, DeliveryLimit, InMemoryLength, InMemoryBytes) ->
#{name => Name,
max_length => map_max(Length),
max_bytes => map_max(Bytes),
single_active_consumer_on => SingleActive,
- delivery_limit => map_max(DeliveryLimit)}.
+ delivery_limit => map_max(DeliveryLimit),
+ max_in_memory_length => map_max(InMemoryLength),
+ max_in_memory_bytes => map_max(InMemoryBytes)}.
map_max(0) -> undefined;
map_max(N) -> N.