diff options
| author | kjnilsson <knilsson@pivotal.io> | 2019-08-30 10:45:15 +0100 |
|---|---|---|
| committer | kjnilsson <knilsson@pivotal.io> | 2019-08-30 12:03:05 +0100 |
| commit | 544d38c706ceb78619b080be2127dba2b9820211 (patch) | |
| tree | 3a437e84fcd7ff412e981b377d4c6a488eb71554 /src | |
| parent | c9002d8fc3eef554bce5d1f0080f47aac6a80893 (diff) | |
| download | rabbitmq-server-git-544d38c706ceb78619b080be2127dba2b9820211.tar.gz | |
Remove use of ra.hrl
As this is an internal header now.
Rename use of maybe/1 type to option to avoid
confusion with rabbit_types:maybe/1 which is different.
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_fifo.erl | 7 | ||||
| -rw-r--r-- | src/rabbit_fifo.hrl | 24 | ||||
| -rw-r--r-- | src/rabbit_fifo_client.erl | 23 | ||||
| -rw-r--r-- | src/rabbit_fifo_index.erl | 1 |
4 files changed, 27 insertions, 28 deletions
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl index d9566ea8f7..062fb7eee1 100644 --- a/src/rabbit_fifo.erl +++ b/src/rabbit_fifo.erl @@ -22,7 +22,6 @@ -compile(inline). -compile({no_auto_import, [apply/3]}). --include_lib("ra/include/ra.hrl"). -include("rabbit_fifo.hrl"). -include_lib("rabbit_common/include/rabbit.hrl"). @@ -68,8 +67,8 @@ ]). %% command records representing all the protocol actions that are supported --record(enqueue, {pid :: maybe(pid()), - seq :: maybe(msg_seqno()), +-record(enqueue, {pid :: option(pid()), + seq :: option(msg_seqno()), msg :: raw_msg()}). -record(checkout, {consumer_id :: consumer_id(), spec :: checkout_spec(), @@ -1604,7 +1603,7 @@ is_over_limit(#?MODULE{cfg = #cfg{max_length = MaxLength, messages_ready(State) > MaxLength orelse (BytesEnq > MaxBytes). --spec make_enqueue(maybe(pid()), maybe(msg_seqno()), raw_msg()) -> protocol(). +-spec make_enqueue(option(pid()), option(msg_seqno()), raw_msg()) -> protocol(). make_enqueue(Pid, Seq, Msg) -> #enqueue{pid = Pid, seq = Seq, msg = Msg}. -spec make_checkout(consumer_id(), diff --git a/src/rabbit_fifo.hrl b/src/rabbit_fifo.hrl index be9dc682bb..0e9de0fb10 100644 --- a/src/rabbit_fifo.hrl +++ b/src/rabbit_fifo.hrl @@ -1,4 +1,6 @@ +-type option(T) :: undefined | T. + -type raw_msg() :: term(). %% The raw message. It is opaque to rabbit_fifo. @@ -28,7 +30,7 @@ -type msg_size() :: non_neg_integer(). %% the size in bytes of the msg payload --type indexed_msg() :: {ra_index(), msg()}. +-type indexed_msg() :: {ra:index(), msg()}. -type prefix_msg() :: {'$prefix_msg', msg_header()}. @@ -93,7 +95,7 @@ -record(enqueuer, {next_seqno = 1 :: msg_seqno(), % out of order enqueues - sorted list - pending = [] :: [{msg_seqno(), ra_index(), raw_msg()}], + pending = [] :: [{msg_seqno(), ra:index(), raw_msg()}], status = up :: up | suspected_down }). @@ -101,16 +103,16 @@ {name :: atom(), resource :: rabbit_types:r('queue'), release_cursor_interval = ?RELEASE_CURSOR_EVERY :: non_neg_integer(), - dead_letter_handler :: maybe(applied_mfa()), - become_leader_handler :: maybe(applied_mfa()), - max_length :: maybe(non_neg_integer()), - max_bytes :: maybe(non_neg_integer()), + dead_letter_handler :: option(applied_mfa()), + become_leader_handler :: option(applied_mfa()), + max_length :: option(non_neg_integer()), + max_bytes :: option(non_neg_integer()), %% whether single active consumer is on or not for this queue consumer_strategy = competing :: consumer_strategy(), %% the maximum number of unsuccessful delivery attempts permitted - delivery_limit :: maybe(non_neg_integer()), - max_in_memory_length :: maybe(non_neg_integer()), - max_in_memory_bytes :: maybe(non_neg_integer()) + delivery_limit :: option(non_neg_integer()), + max_in_memory_length :: option(non_neg_integer()), + max_in_memory_bytes :: option(non_neg_integer()) }). -record(rabbit_fifo, @@ -119,7 +121,7 @@ messages = #{} :: #{msg_in_id() => indexed_msg()}, % defines the lowest message in id available in the messages map % that isn't a return - low_msg_num :: maybe(msg_in_id()), + low_msg_num :: option(msg_in_id()), % defines the next message in id to be added to the messages map next_msg_num = 1 :: msg_in_id(), % list of returned msg_in_ids - when checking out it picks from @@ -139,7 +141,7 @@ % for normal appending operations as it's backed by a map ra_indexes = rabbit_fifo_index:empty() :: rabbit_fifo_index:state(), release_cursors = lqueue:new() :: lqueue:lqueue({release_cursor, - ra_index(), #rabbit_fifo{}}), + ra:index(), #rabbit_fifo{}}), % consumers need to reflect consumer state at time of snapshot % needs to be part of snapshot consumers = #{} :: #{consumer_id() => #consumer{}}, diff --git a/src/rabbit_fifo_client.erl b/src/rabbit_fifo_client.erl index 136800cc99..7fd0a664f4 100644 --- a/src/rabbit_fifo_client.erl +++ b/src/rabbit_fifo_client.erl @@ -43,7 +43,6 @@ stat/1 ]). --include_lib("ra/include/ra.hrl"). -include_lib("rabbit_common/include/rabbit.hrl"). -define(SOFT_LIMIT, 256). @@ -63,8 +62,8 @@ delivery_count = 0 :: non_neg_integer()}). -record(state, {cluster_name :: cluster_name(), - servers = [] :: [ra_server_id()], - leader :: maybe(ra_server_id()), + servers = [] :: [ra:server_id()], + leader :: undefined | ra:server_id(), next_seq = 0 :: seq(), %% Last applied is initialise to -1 to note that no command has yet been %% applied, but allowing to resend messages if the first ones on the sequence @@ -77,7 +76,7 @@ {[seq()], [seq()], [seq()]}}, soft_limit = ?SOFT_LIMIT :: non_neg_integer(), pending = #{} :: #{seq() => - {maybe(term()), rabbit_fifo:command()}}, + {term(), rabbit_fifo:command()}}, consumer_deliveries = #{} :: #{rabbit_fifo:consumer_tag() => #consumer{}}, block_handler = fun() -> ok end :: fun(() -> term()), @@ -99,7 +98,7 @@ %% @param ClusterName the id of the cluster to interact with %% @param Servers The known servers of the queue. If the current leader is known %% ensure the leader node is at the head of the list. --spec init(cluster_name(), [ra_server_id()]) -> state(). +-spec init(cluster_name(), [ra:server_id()]) -> state(). init(ClusterName, Servers) -> init(ClusterName, Servers, ?SOFT_LIMIT). @@ -109,7 +108,7 @@ init(ClusterName, Servers) -> %% @param Servers The known servers of the queue. If the current leader is known %% ensure the leader node is at the head of the list. %% @param MaxPending size defining the max number of pending commands. --spec init(cluster_name(), [ra_server_id()], non_neg_integer()) -> state(). +-spec init(cluster_name(), [ra:server_id()], non_neg_integer()) -> state(). init(ClusterName = #resource{}, Servers, SoftLimit) -> Timeout = application:get_env(kernel, net_ticktime, 60000) + 5000, #state{cluster_name = ClusterName, @@ -117,7 +116,7 @@ init(ClusterName = #resource{}, Servers, SoftLimit) -> soft_limit = SoftLimit, timeout = Timeout}. --spec init(cluster_name(), [ra_server_id()], non_neg_integer(), fun(() -> ok), +-spec init(cluster_name(), [ra:server_id()], non_neg_integer(), fun(() -> ok), fun(() -> ok)) -> state(). init(ClusterName = #resource{}, Servers, SoftLimit, BlockFun, UnblockFun) -> Timeout = application:get_env(kernel, net_ticktime, 60000) + 5000, @@ -401,7 +400,7 @@ cancel_checkout(ConsumerTag, #state{consumer_deliveries = CDels} = State0) -> %% @doc Purges all the messages from a rabbit_fifo queue and returns the number %% of messages purged. --spec purge(ra_server_id()) -> {ok, non_neg_integer()} | {error | timeout, term()}. +-spec purge(ra:server_id()) -> {ok, non_neg_integer()} | {error | timeout, term()}. purge(Node) -> case ra:process_command(Node, rabbit_fifo:make_purge()) of {ok, {purge, Reply}, _} -> @@ -414,7 +413,7 @@ purge(Node) -> pending_size(#state{pending = Pend}) -> maps:size(Pend). --spec stat(ra_server_id()) -> +-spec stat(ra:server_id()) -> {ok, non_neg_integer(), non_neg_integer()} | {error | timeout, term()}. stat(Leader) -> @@ -458,7 +457,7 @@ update_machine_state(Node, Conf) -> %% end %% ''' %% -%% @param From the {@link ra_server_id().} of the sending process. +%% @param From the {@link ra:server_id().} of the sending process. %% @param Event the body of the `ra_event'. %% @param State the current {@module} state. %% @@ -479,7 +478,7 @@ update_machine_state(Node, Conf) -> %% <li>`MsgId' is a consumer scoped monotonically incrementing id that can be %% used to {@link settle/3.} (roughly: AMQP 0.9.1 ack) message once finished %% with them.</li> --spec handle_ra_event(ra_server_id(), ra_server_proc:ra_event_body(), state()) -> +-spec handle_ra_event(ra:server_id(), ra_server_proc:ra_event_body(), state()) -> {internal, Correlators :: [term()], actions(), state()} | {rabbit_fifo:client_msg(), state()} | eol. handle_ra_event(From, {applied, Seqs}, @@ -567,7 +566,7 @@ handle_ra_event(_Leader, {machine, eol}, _State0) -> %% @param Msg the message to enqueue. %% %% @returns `ok' --spec untracked_enqueue([ra_server_id()], term()) -> +-spec untracked_enqueue([ra:server_id()], term()) -> ok. untracked_enqueue([Node | _], Msg) -> Cmd = rabbit_fifo:make_enqueue(undefined, undefined, Msg), diff --git a/src/rabbit_fifo_index.erl b/src/rabbit_fifo_index.erl index 3bda9bab26..14ac89faff 100644 --- a/src/rabbit_fifo_index.erl +++ b/src/rabbit_fifo_index.erl @@ -10,7 +10,6 @@ map/2 ]). --include_lib("ra/include/ra.hrl"). -compile({no_auto_import, [size/1]}). %% the empty atom is a lot smaller (4 bytes) than e.g. `undefined` (13 bytes). |
