summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorD Corbacho <diana@rabbitmq.com>2019-08-30 12:17:49 +0100
committerGitHub <noreply@github.com>2019-08-30 12:17:49 +0100
commit36e323923b1a781c4a2e0d118ee6351d4dbb2d02 (patch)
tree7f7f351ca046affc51ffce8152f12629b780a111
parent7c36cc35eee734e917b22526f8091b8b93f37694 (diff)
parent544d38c706ceb78619b080be2127dba2b9820211 (diff)
downloadrabbitmq-server-git-36e323923b1a781c4a2e0d118ee6351d4dbb2d02.tar.gz
Merge pull request #2091 from rabbitmq/remove-ra-hrl
Remove use of ra.hrl
-rw-r--r--src/rabbit_fifo.erl7
-rw-r--r--src/rabbit_fifo.hrl24
-rw-r--r--src/rabbit_fifo_client.erl23
-rw-r--r--src/rabbit_fifo_index.erl1
-rw-r--r--test/rabbit_fifo_SUITE.erl1
-rw-r--r--test/rabbit_fifo_prop_SUITE.erl1
6 files changed, 27 insertions, 30 deletions
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl
index d9566ea8f7..062fb7eee1 100644
--- a/src/rabbit_fifo.erl
+++ b/src/rabbit_fifo.erl
@@ -22,7 +22,6 @@
-compile(inline).
-compile({no_auto_import, [apply/3]}).
--include_lib("ra/include/ra.hrl").
-include("rabbit_fifo.hrl").
-include_lib("rabbit_common/include/rabbit.hrl").
@@ -68,8 +67,8 @@
]).
%% command records representing all the protocol actions that are supported
--record(enqueue, {pid :: maybe(pid()),
- seq :: maybe(msg_seqno()),
+-record(enqueue, {pid :: option(pid()),
+ seq :: option(msg_seqno()),
msg :: raw_msg()}).
-record(checkout, {consumer_id :: consumer_id(),
spec :: checkout_spec(),
@@ -1604,7 +1603,7 @@ is_over_limit(#?MODULE{cfg = #cfg{max_length = MaxLength,
messages_ready(State) > MaxLength orelse (BytesEnq > MaxBytes).
--spec make_enqueue(maybe(pid()), maybe(msg_seqno()), raw_msg()) -> protocol().
+-spec make_enqueue(option(pid()), option(msg_seqno()), raw_msg()) -> protocol().
make_enqueue(Pid, Seq, Msg) ->
#enqueue{pid = Pid, seq = Seq, msg = Msg}.
-spec make_checkout(consumer_id(),
diff --git a/src/rabbit_fifo.hrl b/src/rabbit_fifo.hrl
index be9dc682bb..0e9de0fb10 100644
--- a/src/rabbit_fifo.hrl
+++ b/src/rabbit_fifo.hrl
@@ -1,4 +1,6 @@
+-type option(T) :: undefined | T.
+
-type raw_msg() :: term().
%% The raw message. It is opaque to rabbit_fifo.
@@ -28,7 +30,7 @@
-type msg_size() :: non_neg_integer().
%% the size in bytes of the msg payload
--type indexed_msg() :: {ra_index(), msg()}.
+-type indexed_msg() :: {ra:index(), msg()}.
-type prefix_msg() :: {'$prefix_msg', msg_header()}.
@@ -93,7 +95,7 @@
-record(enqueuer,
{next_seqno = 1 :: msg_seqno(),
% out of order enqueues - sorted list
- pending = [] :: [{msg_seqno(), ra_index(), raw_msg()}],
+ pending = [] :: [{msg_seqno(), ra:index(), raw_msg()}],
status = up :: up | suspected_down
}).
@@ -101,16 +103,16 @@
{name :: atom(),
resource :: rabbit_types:r('queue'),
release_cursor_interval = ?RELEASE_CURSOR_EVERY :: non_neg_integer(),
- dead_letter_handler :: maybe(applied_mfa()),
- become_leader_handler :: maybe(applied_mfa()),
- max_length :: maybe(non_neg_integer()),
- max_bytes :: maybe(non_neg_integer()),
+ dead_letter_handler :: option(applied_mfa()),
+ become_leader_handler :: option(applied_mfa()),
+ max_length :: option(non_neg_integer()),
+ max_bytes :: option(non_neg_integer()),
%% whether single active consumer is on or not for this queue
consumer_strategy = competing :: consumer_strategy(),
%% the maximum number of unsuccessful delivery attempts permitted
- delivery_limit :: maybe(non_neg_integer()),
- max_in_memory_length :: maybe(non_neg_integer()),
- max_in_memory_bytes :: maybe(non_neg_integer())
+ delivery_limit :: option(non_neg_integer()),
+ max_in_memory_length :: option(non_neg_integer()),
+ max_in_memory_bytes :: option(non_neg_integer())
}).
-record(rabbit_fifo,
@@ -119,7 +121,7 @@
messages = #{} :: #{msg_in_id() => indexed_msg()},
% defines the lowest message in id available in the messages map
% that isn't a return
- low_msg_num :: maybe(msg_in_id()),
+ low_msg_num :: option(msg_in_id()),
% defines the next message in id to be added to the messages map
next_msg_num = 1 :: msg_in_id(),
% list of returned msg_in_ids - when checking out it picks from
@@ -139,7 +141,7 @@
% for normal appending operations as it's backed by a map
ra_indexes = rabbit_fifo_index:empty() :: rabbit_fifo_index:state(),
release_cursors = lqueue:new() :: lqueue:lqueue({release_cursor,
- ra_index(), #rabbit_fifo{}}),
+ ra:index(), #rabbit_fifo{}}),
% consumers need to reflect consumer state at time of snapshot
% needs to be part of snapshot
consumers = #{} :: #{consumer_id() => #consumer{}},
diff --git a/src/rabbit_fifo_client.erl b/src/rabbit_fifo_client.erl
index 136800cc99..7fd0a664f4 100644
--- a/src/rabbit_fifo_client.erl
+++ b/src/rabbit_fifo_client.erl
@@ -43,7 +43,6 @@
stat/1
]).
--include_lib("ra/include/ra.hrl").
-include_lib("rabbit_common/include/rabbit.hrl").
-define(SOFT_LIMIT, 256).
@@ -63,8 +62,8 @@
delivery_count = 0 :: non_neg_integer()}).
-record(state, {cluster_name :: cluster_name(),
- servers = [] :: [ra_server_id()],
- leader :: maybe(ra_server_id()),
+ servers = [] :: [ra:server_id()],
+ leader :: undefined | ra:server_id(),
next_seq = 0 :: seq(),
%% Last applied is initialise to -1 to note that no command has yet been
%% applied, but allowing to resend messages if the first ones on the sequence
@@ -77,7 +76,7 @@
{[seq()], [seq()], [seq()]}},
soft_limit = ?SOFT_LIMIT :: non_neg_integer(),
pending = #{} :: #{seq() =>
- {maybe(term()), rabbit_fifo:command()}},
+ {term(), rabbit_fifo:command()}},
consumer_deliveries = #{} :: #{rabbit_fifo:consumer_tag() =>
#consumer{}},
block_handler = fun() -> ok end :: fun(() -> term()),
@@ -99,7 +98,7 @@
%% @param ClusterName the id of the cluster to interact with
%% @param Servers The known servers of the queue. If the current leader is known
%% ensure the leader node is at the head of the list.
--spec init(cluster_name(), [ra_server_id()]) -> state().
+-spec init(cluster_name(), [ra:server_id()]) -> state().
init(ClusterName, Servers) ->
init(ClusterName, Servers, ?SOFT_LIMIT).
@@ -109,7 +108,7 @@ init(ClusterName, Servers) ->
%% @param Servers The known servers of the queue. If the current leader is known
%% ensure the leader node is at the head of the list.
%% @param MaxPending size defining the max number of pending commands.
--spec init(cluster_name(), [ra_server_id()], non_neg_integer()) -> state().
+-spec init(cluster_name(), [ra:server_id()], non_neg_integer()) -> state().
init(ClusterName = #resource{}, Servers, SoftLimit) ->
Timeout = application:get_env(kernel, net_ticktime, 60000) + 5000,
#state{cluster_name = ClusterName,
@@ -117,7 +116,7 @@ init(ClusterName = #resource{}, Servers, SoftLimit) ->
soft_limit = SoftLimit,
timeout = Timeout}.
--spec init(cluster_name(), [ra_server_id()], non_neg_integer(), fun(() -> ok),
+-spec init(cluster_name(), [ra:server_id()], non_neg_integer(), fun(() -> ok),
fun(() -> ok)) -> state().
init(ClusterName = #resource{}, Servers, SoftLimit, BlockFun, UnblockFun) ->
Timeout = application:get_env(kernel, net_ticktime, 60000) + 5000,
@@ -401,7 +400,7 @@ cancel_checkout(ConsumerTag, #state{consumer_deliveries = CDels} = State0) ->
%% @doc Purges all the messages from a rabbit_fifo queue and returns the number
%% of messages purged.
--spec purge(ra_server_id()) -> {ok, non_neg_integer()} | {error | timeout, term()}.
+-spec purge(ra:server_id()) -> {ok, non_neg_integer()} | {error | timeout, term()}.
purge(Node) ->
case ra:process_command(Node, rabbit_fifo:make_purge()) of
{ok, {purge, Reply}, _} ->
@@ -414,7 +413,7 @@ purge(Node) ->
pending_size(#state{pending = Pend}) ->
maps:size(Pend).
--spec stat(ra_server_id()) ->
+-spec stat(ra:server_id()) ->
{ok, non_neg_integer(), non_neg_integer()}
| {error | timeout, term()}.
stat(Leader) ->
@@ -458,7 +457,7 @@ update_machine_state(Node, Conf) ->
%% end
%% '''
%%
-%% @param From the {@link ra_server_id().} of the sending process.
+%% @param From the {@link ra:server_id().} of the sending process.
%% @param Event the body of the `ra_event'.
%% @param State the current {@module} state.
%%
@@ -479,7 +478,7 @@ update_machine_state(Node, Conf) ->
%% <li>`MsgId' is a consumer scoped monotonically incrementing id that can be
%% used to {@link settle/3.} (roughly: AMQP 0.9.1 ack) message once finished
%% with them.</li>
--spec handle_ra_event(ra_server_id(), ra_server_proc:ra_event_body(), state()) ->
+-spec handle_ra_event(ra:server_id(), ra_server_proc:ra_event_body(), state()) ->
{internal, Correlators :: [term()], actions(), state()} |
{rabbit_fifo:client_msg(), state()} | eol.
handle_ra_event(From, {applied, Seqs},
@@ -567,7 +566,7 @@ handle_ra_event(_Leader, {machine, eol}, _State0) ->
%% @param Msg the message to enqueue.
%%
%% @returns `ok'
--spec untracked_enqueue([ra_server_id()], term()) ->
+-spec untracked_enqueue([ra:server_id()], term()) ->
ok.
untracked_enqueue([Node | _], Msg) ->
Cmd = rabbit_fifo:make_enqueue(undefined, undefined, Msg),
diff --git a/src/rabbit_fifo_index.erl b/src/rabbit_fifo_index.erl
index 3bda9bab26..14ac89faff 100644
--- a/src/rabbit_fifo_index.erl
+++ b/src/rabbit_fifo_index.erl
@@ -10,7 +10,6 @@
map/2
]).
--include_lib("ra/include/ra.hrl").
-compile({no_auto_import, [size/1]}).
%% the empty atom is a lot smaller (4 bytes) than e.g. `undefined` (13 bytes).
diff --git a/test/rabbit_fifo_SUITE.erl b/test/rabbit_fifo_SUITE.erl
index b992401575..0a0ac94e63 100644
--- a/test/rabbit_fifo_SUITE.erl
+++ b/test/rabbit_fifo_SUITE.erl
@@ -10,7 +10,6 @@
-include_lib("common_test/include/ct.hrl").
-include_lib("eunit/include/eunit.hrl").
--include_lib("ra/include/ra.hrl").
-include_lib("rabbit_common/include/rabbit.hrl").
-include("src/rabbit_fifo.hrl").
diff --git a/test/rabbit_fifo_prop_SUITE.erl b/test/rabbit_fifo_prop_SUITE.erl
index 949019a131..0b947a9d1d 100644
--- a/test/rabbit_fifo_prop_SUITE.erl
+++ b/test/rabbit_fifo_prop_SUITE.erl
@@ -8,7 +8,6 @@
-include_lib("proper/include/proper.hrl").
-include_lib("common_test/include/ct.hrl").
-include_lib("eunit/include/eunit.hrl").
--include_lib("ra/include/ra.hrl").
-include("src/rabbit_fifo.hrl").
%%%===================================================================