summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_fifo.erl66
-rw-r--r--src/rabbit_fifo.hrl11
-rw-r--r--src/rabbit_fifo_v0.erl18
3 files changed, 69 insertions, 26 deletions
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl
index 4982f008c2..b62d38f67e 100644
--- a/src/rabbit_fifo.erl
+++ b/src/rabbit_fifo.erl
@@ -124,7 +124,8 @@ init(#{name := Name,
update_config(Conf, State) ->
DLH = maps:get(dead_letter_handler, Conf, undefined),
BLH = maps:get(become_leader_handler, Conf, undefined),
- SHI = maps:get(release_cursor_interval, Conf, ?RELEASE_CURSOR_EVERY),
+ RCI = maps:get(release_cursor_interval, Conf, ?RELEASE_CURSOR_EVERY),
+ Overflow = maps:get(overflow_strategy, Conf, drop_head),
MaxLength = maps:get(max_length, Conf, undefined),
MaxBytes = maps:get(max_bytes, Conf, undefined),
MaxMemoryLength = maps:get(max_in_memory_length, Conf, undefined),
@@ -137,16 +138,17 @@ update_config(Conf, State) ->
competing
end,
Cfg = State#?MODULE.cfg,
- RCI = case State#?MODULE.cfg of
+ RCISpec = case State#?MODULE.cfg of
#cfg{release_cursor_interval = undefined} ->
- {SHI, SHI};
+ {RCI, RCI};
#cfg{release_cursor_interval = {_, C}} ->
- {SHI, C}
+ {RCI, C}
end,
- State#?MODULE{cfg = Cfg#cfg{release_cursor_interval = RCI,
+ State#?MODULE{cfg = Cfg#cfg{release_cursor_interval = RCISpec,
dead_letter_handler = DLH,
become_leader_handler = BLH,
+ overflow_strategy = Overflow,
max_length = MaxLength,
max_bytes = MaxBytes,
max_in_memory_length = MaxMemoryLength,
@@ -464,14 +466,51 @@ apply(_, #purge_nodes{nodes = Nodes}, State0) ->
{State, ok, Effects};
apply(Meta, #update_config{config = Conf}, State) ->
checkout(Meta, update_config(Conf, State), []);
-apply(_Meta, {machine_version, 0, 1}, V0State0) ->
+apply(_Meta, {machine_version, 0, 1}, V0State) ->
+ State = convert_v0_to_v1(V0State),
+ {State, ok, []}.
+
+convert_v0_to_v1(V0State0) ->
V0State = rabbit_fifo_v0:normalize_for_v1(V0State0),
- %% quick hack to "convert" the state from version one
- State = setelement(1, V0State, ?MODULE),
V0Msgs = rabbit_fifo_v0:get_field(messages, V0State),
V1Msgs = lqueue:from_list(lists:sort(maps:to_list(V0Msgs))),
- %% TODOD each release cursor needs converting too!
- {State#?MODULE{messages = V1Msgs}, ok, []}.
+ Cfg = #cfg{name = rabbit_fifo_v0:get_cfg_field(name, V0State),
+ resource = rabbit_fifo_v0:get_cfg_field(resource, V0State),
+ release_cursor_interval = rabbit_fifo_v0:get_cfg_field(release_cursor_interval, V0State),
+ dead_letter_handler = rabbit_fifo_v0:get_cfg_field(dead_letter_handler, V0State),
+ become_leader_handler = rabbit_fifo_v0:get_cfg_field(become_leader_handler, V0State),
+ %% TODO: what if policy enabling reject_publish was applied before conversion?
+ overflow_strategy = drop_head,
+ max_length = rabbit_fifo_v0:get_cfg_field(max_length, V0State),
+ max_bytes = rabbit_fifo_v0:get_cfg_field(max_bytes, V0State),
+ consumer_strategy = rabbit_fifo_v0:get_cfg_field(consumer_strategy, V0State),
+ delivery_limit = rabbit_fifo_v0:get_cfg_field(delivery_limit, V0State),
+ max_in_memory_length = rabbit_fifo_v0:get_cfg_field(max_in_memory_length, V0State),
+ max_in_memory_bytes = rabbit_fifo_v0:get_cfg_field(max_in_memory_bytes, V0State)
+ },
+
+ V0Cursors = rabbit_fifo_v0:get_field(release_cursors, V0State),
+ Cursors = lqueue:from_list(
+ [{I, convert_v0_to_v1(C)} ||
+ {I, C} <- lqueue:to_list(V0Cursors)]),
+ %% need to covert each release cursor
+ #?MODULE{cfg = Cfg,
+ messages = V1Msgs,
+ next_msg_num = rabbit_fifo_v0:get_field(next_msg_num, V0State),
+ returns = rabbit_fifo_v0:get_field(returns, V0State),
+ enqueue_count = rabbit_fifo_v0:get_field(enqueue_count, V0State),
+ enqueuers = rabbit_fifo_v0:get_field(enqueuers, V0State),
+ ra_indexes = rabbit_fifo_v0:get_field(ra_indexes, V0State),
+ release_cursors = Cursors,
+ consumers = rabbit_fifo_v0:get_field(consumers, V0State),
+ service_queue = rabbit_fifo_v0:get_field(service_queue, V0State),
+ prefix_msgs = rabbit_fifo_v0:get_field(prefix_msgs, V0State),
+ msg_bytes_enqueue = rabbit_fifo_v0:get_field(msg_bytes_enqueue, V0State),
+ msg_bytes_checkout = rabbit_fifo_v0:get_field(msg_bytes_checkout, V0State),
+ waiting_consumers = rabbit_fifo_v0:get_field(waiting_consumers, V0State),
+ msg_bytes_in_memory = rabbit_fifo_v0:get_field(msg_bytes_in_memory, V0State),
+ msgs_ready_in_memory = rabbit_fifo_v0:get_field(msgs_ready_in_memory, V0State)
+ }.
purge_node(Node, State, Effects) ->
lists:foldl(fun(Pid, {S0, E0}) ->
@@ -1022,9 +1061,6 @@ enqueue(RaftIdx, RawMsg, #?MODULE{messages = Messages,
end,
State = add_bytes_enqueue(Header, State1),
State#?MODULE{messages = lqueue:in({NextMsgNum, Msg}, Messages),
- %% this is probably only done to record it when low_msg_num
- %% is undefined
- % low_msg_num = min(LowMsgNum, NextMsgNum),
next_msg_num = NextMsgNum + 1}.
append_to_master_index(RaftIdx,
@@ -1654,8 +1690,6 @@ maybe_queue_consumer(ConsumerId, #consumer{credit = Credit},
dehydrate_state(#?MODULE{messages = Messages,
consumers = Consumers,
returns = Returns,
- % low_msg_num = Low,
- % next_msg_num = Next,
prefix_msgs = {PRCnt, PrefRet0, PPCnt, PrefMsg0},
waiting_consumers = Waiting0} = State) ->
RCnt = lqueue:len(Returns),
@@ -1799,7 +1833,7 @@ add_bytes_settle(#{size := Bytes}, State) ->
add_bytes_return(Bytes,
#?MODULE{msg_bytes_checkout = Checkout,
- msg_bytes_enqueue = Enqueue} = State)
+ msg_bytes_enqueue = Enqueue} = State)
when is_integer(Bytes) ->
State#?MODULE{msg_bytes_checkout = Checkout - Bytes,
msg_bytes_enqueue = Enqueue + Bytes};
diff --git a/src/rabbit_fifo.hrl b/src/rabbit_fifo.hrl
index ec93f480dd..e337540c35 100644
--- a/src/rabbit_fifo.hrl
+++ b/src/rabbit_fifo.hrl
@@ -104,7 +104,12 @@
{next_seqno = 1 :: msg_seqno(),
% out of order enqueues - sorted list
pending = [] :: [{msg_seqno(), ra:index(), raw_msg()}],
- status = up :: up | suspected_down
+ status = up :: up |
+ suspected_down |
+ %% it is useful to have a record of when this was blocked
+ %% so that we can retry sending the block effect if
+ %% the publisher did not receive the initial one
+ {blocked, At :: ra:index()}
}).
-record(cfg,
@@ -113,6 +118,7 @@
release_cursor_interval :: option({non_neg_integer(), non_neg_integer()}),
dead_letter_handler :: option(applied_mfa()),
become_leader_handler :: option(applied_mfa()),
+ overflow_strategy = drop_head :: drop_head | reject_publish,
max_length :: option(non_neg_integer()),
max_bytes :: option(non_neg_integer()),
%% whether single active consumer is on or not for this queue
@@ -131,9 +137,6 @@
{cfg :: #cfg{},
% unassigned messages
messages = lqueue:new() :: lqueue:queue(),
- % defines the lowest message in id available in the messages map
- % that isn't a return
- low_msg_num :: option(msg_in_id()),
% defines the next message in id to be added to the messages map
next_msg_num = 1 :: msg_in_id(),
% list of returned msg_in_ids - when checking out it picks from
diff --git a/src/rabbit_fifo_v0.erl b/src/rabbit_fifo_v0.erl
index ad261317c9..95f665d0a9 100644
--- a/src/rabbit_fifo_v0.erl
+++ b/src/rabbit_fifo_v0.erl
@@ -56,6 +56,7 @@
normalize_for_v1/1,
%% getters for coversions
get_field/2,
+ get_cfg_field/2,
%% protocol helpers
make_enqueue/3,
@@ -611,12 +612,12 @@ tick(_Ts, #?STATE{cfg = #cfg{name = Name,
-spec overview(state()) -> map().
overview(#?STATE{consumers = Cons,
- enqueuers = Enqs,
- release_cursors = Cursors,
- enqueue_count = EnqCount,
- msg_bytes_enqueue = EnqueueBytes,
- msg_bytes_checkout = CheckoutBytes,
- cfg = Cfg} = State) ->
+ enqueuers = Enqs,
+ release_cursors = Cursors,
+ enqueue_count = EnqCount,
+ msg_bytes_enqueue = EnqueueBytes,
+ msg_bytes_checkout = CheckoutBytes,
+ cfg = Cfg} = State) ->
Conf = #{name => Cfg#cfg.name,
resource => Cfg#cfg.resource,
release_cursor_interval => Cfg#cfg.release_cursor_interval,
@@ -1776,6 +1777,11 @@ get_field(Field, State) ->
Index = record_index_of(Field, Fields),
element(Index, State).
+get_cfg_field(Field, #?STATE{cfg = Cfg} ) ->
+ Fields = record_info(fields, cfg),
+ Index = record_index_of(Field, Fields),
+ element(Index, Cfg).
+
record_index_of(F, Fields) ->
index_of(2, F, Fields).