diff options
| -rw-r--r-- | src/rabbit_fifo.erl | 66 | ||||
| -rw-r--r-- | src/rabbit_fifo.hrl | 11 | ||||
| -rw-r--r-- | src/rabbit_fifo_v0.erl | 18 |
3 files changed, 69 insertions, 26 deletions
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl index 4982f008c2..b62d38f67e 100644 --- a/src/rabbit_fifo.erl +++ b/src/rabbit_fifo.erl @@ -124,7 +124,8 @@ init(#{name := Name, update_config(Conf, State) -> DLH = maps:get(dead_letter_handler, Conf, undefined), BLH = maps:get(become_leader_handler, Conf, undefined), - SHI = maps:get(release_cursor_interval, Conf, ?RELEASE_CURSOR_EVERY), + RCI = maps:get(release_cursor_interval, Conf, ?RELEASE_CURSOR_EVERY), + Overflow = maps:get(overflow_strategy, Conf, drop_head), MaxLength = maps:get(max_length, Conf, undefined), MaxBytes = maps:get(max_bytes, Conf, undefined), MaxMemoryLength = maps:get(max_in_memory_length, Conf, undefined), @@ -137,16 +138,17 @@ update_config(Conf, State) -> competing end, Cfg = State#?MODULE.cfg, - RCI = case State#?MODULE.cfg of + RCISpec = case State#?MODULE.cfg of #cfg{release_cursor_interval = undefined} -> - {SHI, SHI}; + {RCI, RCI}; #cfg{release_cursor_interval = {_, C}} -> - {SHI, C} + {RCI, C} end, - State#?MODULE{cfg = Cfg#cfg{release_cursor_interval = RCI, + State#?MODULE{cfg = Cfg#cfg{release_cursor_interval = RCISpec, dead_letter_handler = DLH, become_leader_handler = BLH, + overflow_strategy = Overflow, max_length = MaxLength, max_bytes = MaxBytes, max_in_memory_length = MaxMemoryLength, @@ -464,14 +466,51 @@ apply(_, #purge_nodes{nodes = Nodes}, State0) -> {State, ok, Effects}; apply(Meta, #update_config{config = Conf}, State) -> checkout(Meta, update_config(Conf, State), []); -apply(_Meta, {machine_version, 0, 1}, V0State0) -> +apply(_Meta, {machine_version, 0, 1}, V0State) -> + State = convert_v0_to_v1(V0State), + {State, ok, []}. + +convert_v0_to_v1(V0State0) -> V0State = rabbit_fifo_v0:normalize_for_v1(V0State0), - %% quick hack to "convert" the state from version one - State = setelement(1, V0State, ?MODULE), V0Msgs = rabbit_fifo_v0:get_field(messages, V0State), V1Msgs = lqueue:from_list(lists:sort(maps:to_list(V0Msgs))), - %% TODOD each release cursor needs converting too! - {State#?MODULE{messages = V1Msgs}, ok, []}. + Cfg = #cfg{name = rabbit_fifo_v0:get_cfg_field(name, V0State), + resource = rabbit_fifo_v0:get_cfg_field(resource, V0State), + release_cursor_interval = rabbit_fifo_v0:get_cfg_field(release_cursor_interval, V0State), + dead_letter_handler = rabbit_fifo_v0:get_cfg_field(dead_letter_handler, V0State), + become_leader_handler = rabbit_fifo_v0:get_cfg_field(become_leader_handler, V0State), + %% TODO: what if policy enabling reject_publish was applied before conversion? + overflow_strategy = drop_head, + max_length = rabbit_fifo_v0:get_cfg_field(max_length, V0State), + max_bytes = rabbit_fifo_v0:get_cfg_field(max_bytes, V0State), + consumer_strategy = rabbit_fifo_v0:get_cfg_field(consumer_strategy, V0State), + delivery_limit = rabbit_fifo_v0:get_cfg_field(delivery_limit, V0State), + max_in_memory_length = rabbit_fifo_v0:get_cfg_field(max_in_memory_length, V0State), + max_in_memory_bytes = rabbit_fifo_v0:get_cfg_field(max_in_memory_bytes, V0State) + }, + + V0Cursors = rabbit_fifo_v0:get_field(release_cursors, V0State), + Cursors = lqueue:from_list( + [{I, convert_v0_to_v1(C)} || + {I, C} <- lqueue:to_list(V0Cursors)]), + %% need to covert each release cursor + #?MODULE{cfg = Cfg, + messages = V1Msgs, + next_msg_num = rabbit_fifo_v0:get_field(next_msg_num, V0State), + returns = rabbit_fifo_v0:get_field(returns, V0State), + enqueue_count = rabbit_fifo_v0:get_field(enqueue_count, V0State), + enqueuers = rabbit_fifo_v0:get_field(enqueuers, V0State), + ra_indexes = rabbit_fifo_v0:get_field(ra_indexes, V0State), + release_cursors = Cursors, + consumers = rabbit_fifo_v0:get_field(consumers, V0State), + service_queue = rabbit_fifo_v0:get_field(service_queue, V0State), + prefix_msgs = rabbit_fifo_v0:get_field(prefix_msgs, V0State), + msg_bytes_enqueue = rabbit_fifo_v0:get_field(msg_bytes_enqueue, V0State), + msg_bytes_checkout = rabbit_fifo_v0:get_field(msg_bytes_checkout, V0State), + waiting_consumers = rabbit_fifo_v0:get_field(waiting_consumers, V0State), + msg_bytes_in_memory = rabbit_fifo_v0:get_field(msg_bytes_in_memory, V0State), + msgs_ready_in_memory = rabbit_fifo_v0:get_field(msgs_ready_in_memory, V0State) + }. purge_node(Node, State, Effects) -> lists:foldl(fun(Pid, {S0, E0}) -> @@ -1022,9 +1061,6 @@ enqueue(RaftIdx, RawMsg, #?MODULE{messages = Messages, end, State = add_bytes_enqueue(Header, State1), State#?MODULE{messages = lqueue:in({NextMsgNum, Msg}, Messages), - %% this is probably only done to record it when low_msg_num - %% is undefined - % low_msg_num = min(LowMsgNum, NextMsgNum), next_msg_num = NextMsgNum + 1}. append_to_master_index(RaftIdx, @@ -1654,8 +1690,6 @@ maybe_queue_consumer(ConsumerId, #consumer{credit = Credit}, dehydrate_state(#?MODULE{messages = Messages, consumers = Consumers, returns = Returns, - % low_msg_num = Low, - % next_msg_num = Next, prefix_msgs = {PRCnt, PrefRet0, PPCnt, PrefMsg0}, waiting_consumers = Waiting0} = State) -> RCnt = lqueue:len(Returns), @@ -1799,7 +1833,7 @@ add_bytes_settle(#{size := Bytes}, State) -> add_bytes_return(Bytes, #?MODULE{msg_bytes_checkout = Checkout, - msg_bytes_enqueue = Enqueue} = State) + msg_bytes_enqueue = Enqueue} = State) when is_integer(Bytes) -> State#?MODULE{msg_bytes_checkout = Checkout - Bytes, msg_bytes_enqueue = Enqueue + Bytes}; diff --git a/src/rabbit_fifo.hrl b/src/rabbit_fifo.hrl index ec93f480dd..e337540c35 100644 --- a/src/rabbit_fifo.hrl +++ b/src/rabbit_fifo.hrl @@ -104,7 +104,12 @@ {next_seqno = 1 :: msg_seqno(), % out of order enqueues - sorted list pending = [] :: [{msg_seqno(), ra:index(), raw_msg()}], - status = up :: up | suspected_down + status = up :: up | + suspected_down | + %% it is useful to have a record of when this was blocked + %% so that we can retry sending the block effect if + %% the publisher did not receive the initial one + {blocked, At :: ra:index()} }). -record(cfg, @@ -113,6 +118,7 @@ release_cursor_interval :: option({non_neg_integer(), non_neg_integer()}), dead_letter_handler :: option(applied_mfa()), become_leader_handler :: option(applied_mfa()), + overflow_strategy = drop_head :: drop_head | reject_publish, max_length :: option(non_neg_integer()), max_bytes :: option(non_neg_integer()), %% whether single active consumer is on or not for this queue @@ -131,9 +137,6 @@ {cfg :: #cfg{}, % unassigned messages messages = lqueue:new() :: lqueue:queue(), - % defines the lowest message in id available in the messages map - % that isn't a return - low_msg_num :: option(msg_in_id()), % defines the next message in id to be added to the messages map next_msg_num = 1 :: msg_in_id(), % list of returned msg_in_ids - when checking out it picks from diff --git a/src/rabbit_fifo_v0.erl b/src/rabbit_fifo_v0.erl index ad261317c9..95f665d0a9 100644 --- a/src/rabbit_fifo_v0.erl +++ b/src/rabbit_fifo_v0.erl @@ -56,6 +56,7 @@ normalize_for_v1/1, %% getters for coversions get_field/2, + get_cfg_field/2, %% protocol helpers make_enqueue/3, @@ -611,12 +612,12 @@ tick(_Ts, #?STATE{cfg = #cfg{name = Name, -spec overview(state()) -> map(). overview(#?STATE{consumers = Cons, - enqueuers = Enqs, - release_cursors = Cursors, - enqueue_count = EnqCount, - msg_bytes_enqueue = EnqueueBytes, - msg_bytes_checkout = CheckoutBytes, - cfg = Cfg} = State) -> + enqueuers = Enqs, + release_cursors = Cursors, + enqueue_count = EnqCount, + msg_bytes_enqueue = EnqueueBytes, + msg_bytes_checkout = CheckoutBytes, + cfg = Cfg} = State) -> Conf = #{name => Cfg#cfg.name, resource => Cfg#cfg.resource, release_cursor_interval => Cfg#cfg.release_cursor_interval, @@ -1776,6 +1777,11 @@ get_field(Field, State) -> Index = record_index_of(Field, Fields), element(Index, State). +get_cfg_field(Field, #?STATE{cfg = Cfg} ) -> + Fields = record_info(fields, cfg), + Index = record_index_of(Field, Fields), + element(Index, Cfg). + record_index_of(F, Fields) -> index_of(2, F, Fields). |
