summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorArnaud Cogoluègnes <acogoluegnes@gmail.com>2019-01-25 17:41:12 +0100
committerArnaud Cogoluègnes <acogoluegnes@gmail.com>2019-01-25 17:41:12 +0100
commitdcf663fcf99eeefb1c0de07e7389b47988cfbd92 (patch)
treeeff2f2f31d572615bc2453378f3b760d1323bb9f
parent9eaa79d5f80ec3025ce0dbbac5e81a60437dec7c (diff)
parenta4b602567081b28c4bc53ac5995b5c054a305da9 (diff)
downloadrabbitmq-server-git-dcf663fcf99eeefb1c0de07e7389b47988cfbd92.tar.gz
Merge branch 'master' into rabbitmq-server-1838-active-field-for-consumers
Conflicts: src/rabbit_fifo.erl
-rw-r--r--docs/rabbitmq-echopid.89
-rw-r--r--docs/rabbitmq-env.conf.524
-rw-r--r--docs/rabbitmq-plugins.8104
-rw-r--r--docs/rabbitmq-server.852
-rw-r--r--docs/rabbitmq-service.842
-rw-r--r--docs/rabbitmqctl.8184
-rwxr-xr-xscripts/rabbitmq-server4
-rw-r--r--scripts/rabbitmq-server.bat4
-rw-r--r--scripts/rabbitmq-service.bat4
-rw-r--r--src/rabbit.erl6
-rw-r--r--src/rabbit_fifo.erl581
-rw-r--r--src/rabbit_fifo_index.erl26
-rw-r--r--src/rabbit_quorum_queue.erl16
-rw-r--r--test/backing_queue_SUITE.erl18
-rw-r--r--test/dynamic_qq_SUITE.erl6
-rw-r--r--test/quorum_queue_SUITE.erl69
-rw-r--r--test/quorum_queue_utils.erl9
-rw-r--r--test/rabbit_fifo_SUITE.erl21
-rw-r--r--test/rabbit_fifo_prop_SUITE.erl217
-rw-r--r--test/unit_inbroker_parallel_SUITE.erl207
20 files changed, 1054 insertions, 549 deletions
diff --git a/docs/rabbitmq-echopid.8 b/docs/rabbitmq-echopid.8
index 095bc50ccb..1ce2561ec7 100644
--- a/docs/rabbitmq-echopid.8
+++ b/docs/rabbitmq-echopid.8
@@ -14,12 +14,12 @@
.\" The Initial Developer of the Original Code is Pivotal Software, Inc.
.\" Copyright (c) 2007-2019 Pivotal Software, Inc. All rights reserved.
.\"
-.Dd April 25, 2017
+.Dd January 25, 2019
.Dt RABBITMQ-ECHOPID.BAT 8
.Os "RabbitMQ Server"
.Sh NAME
.Nm rabbitmq-echopid.bat
-.Nd return the Windows process id of the Erlang runtime hosting RabbitMQ
+.Nd returns the Windows process id of the Erlang runtime running RabbitMQ
.\" ------------------------------------------------------------------
.Sh SYNOPSIS
.\" ------------------------------------------------------------------
@@ -28,10 +28,7 @@
.\" ------------------------------------------------------------------
.Sh DESCRIPTION
.\" ------------------------------------------------------------------
-RabbitMQ is an implementation of AMQP, the emerging standard for high
-performance enterprise messaging.
-The RabbitMQ server is a robust and scalable implementation of an AMQP
-broker.
+RabbitMQ is an open source multi-protocol messaging broker.
.Pp
Running
.Nm
diff --git a/docs/rabbitmq-env.conf.5 b/docs/rabbitmq-env.conf.5
index 6b572dd50a..a97431c538 100644
--- a/docs/rabbitmq-env.conf.5
+++ b/docs/rabbitmq-env.conf.5
@@ -14,18 +14,18 @@
.\" The Initial Developer of the Original Code is Pivotal Software, Inc.
.\" Copyright (c) 2007-2019 Pivotal Software, Inc. All rights reserved.
.\"
-.Dd April 25, 2017
+.Dd January 25, 2019
.Dt RABBITMQ-ENV.CONF 5
.Os "RabbitMQ Server"
.Sh NAME
.Nm rabbitmq-env.conf
-.Nd default settings for RabbitMQ AMQP server
+.Nd environment variables used by RabbitMQ server
.\" ------------------------------------------------------------------
.Sh DESCRIPTION
.\" ------------------------------------------------------------------
.Nm
-contains variable settings that override the defaults built in to the
-RabbitMQ startup scripts.
+contains environment variables that override the defaults built in to the
+RabbitMQ scripts and CLI tools.
.Pp
The file is interpreted by the system shell, and so should consist of a
sequence of shell environment variable definitions.
@@ -58,20 +58,28 @@ prefix removed:
from the environment becomes
.Ev NODE_PORT
in
-.Nm ,
-etc.
+.Nm .
.\" ------------------------------------------------------------------
.Sh EXAMPLES
.\" ------------------------------------------------------------------
-Here is an example of a complete
+Below is an example of a minimalistic
.Nm
-file that overrides the default Erlang node name from "rabbit" to
+file that overrides the default node name prefix from "rabbit" to
"hare".
.sp
.Dl # I am a complete rabbitmq-env.conf file.
.Dl # Comment lines start with a hash character.
.Dl # This is a /bin/sh script file - use ordinary envt var syntax
.Dl NODENAME=hare
+
+In the below
+.Nm
+file RabbitMQ configuration file location is changed to "/data/services/rabbitmq/rabbitmq.conf".
+.sp
+.Dl # I am a complete rabbitmq-env.conf file.
+.Dl # Comment lines start with a hash character.
+.Dl # This is a /bin/sh script file - use ordinary envt var syntax
+.Dl CONFIG_FILE=/data/services/rabbitmq/rabbitmq.conf
.\" ------------------------------------------------------------------
.Sh SEE ALSO
.\" ------------------------------------------------------------------
diff --git a/docs/rabbitmq-plugins.8 b/docs/rabbitmq-plugins.8
index 338399b3ad..b7f2065e37 100644
--- a/docs/rabbitmq-plugins.8
+++ b/docs/rabbitmq-plugins.8
@@ -14,31 +14,41 @@
.\" The Initial Developer of the Original Code is Pivotal Software, Inc.
.\" Copyright (c) 2007-2019 Pivotal Software, Inc. All rights reserved.
.\"
-.Dd April 25, 2017
+.Dd January 25, 2019
.Dt RABBITMQ-PLUGINS 8
.Os "RabbitMQ Server"
.Sh NAME
.Nm rabbitmq-plugins
-.Nd command line for managing RabbitMQ broker plugins
+.Nd command line for managing RabbitMQ plugins
.\" ------------------------------------------------------------------
.Sh SYNOPSIS
.\" ------------------------------------------------------------------
.Nm
+.Op Fl q
+.Op Fl s
+.Op Fl l
.Op Fl n Ar node
+.Op Fl t Ar timeout
.Ar command
.Op Ar command_options
.\" ------------------------------------------------------------------
.Sh DESCRIPTION
.\" ------------------------------------------------------------------
.Nm
-is a command line tool for managing RabbitMQ broker plugins.
-It allows one to enable, disable and browse plugins.
+is a command line tool for managing RabbitMQ plugins.
+See the
+.Lk https://www.rabbitmq.com/plugins.html "RabbitMQ Plugins guide"
+for an overview of RabbitMQ plugins and how they are used.
+
+.Nm
+allows the operator to enable, disable and inspect plugins.
It must be run by a user with write permissions to the RabbitMQ
configuration directory.
.Pp
-Some plugins depend on others to work correctly.
+Plugins can depend on other plugins.
.Nm
-traverses these dependencies and enables all required plugins.
+resolves the dependencies and enables or disables all dependencies
+so that the user doesn't have to manage them explicitly.
Plugins listed on the
.Nm
command line are marked as explicitly enabled; dependent plugins are
@@ -54,12 +64,66 @@ and
commands will update the plugins file and then attempt to connect to the
broker and ensure it is running all enabled plugins.
By default if it is not possible to connect to the running broker (for
-example if it is stopped) then a warning is displayed.
-Specify
-.Fl -online
-or
+example if it is stopped) the operation will fail.
+If
+.Nm
+is used on the same host as the target node,
.Fl -offline
-to change this behaviour.
+can be specified to make
+.Nm
+resolve and update plugin state directly (without contacting the node).
+Such changes will only have an effect on next node start.
+To learn more, see the
+.Lk https://www.rabbitmq.com/plugins.html "RabbitMQ Plugins guide"
+.
+.\" ------------------------------------------------------------------
+.Sh OPTIONS
+.\" ------------------------------------------------------------------
+.Bl -tag -width Ds
+.It Fl n Ar node
+Default node is
+.Qq Pf rabbit@ Ar target-hostname ,
+where
+.Ar target-hostname
+is the local host.
+On a host named
+.Qq myserver.example.com ,
+the node name will usually be
+.Qq rabbit@myserver
+(unless
+.Ev RABBITMQ_NODENAME
+has been overridden).
+The output of
+.Qq hostname -s
+is usually the correct suffix to use after the
+.Qq @
+sign.
+See
+.Xr rabbitmq-server 8
+for details of configuring a RabbitMQ node.
+.It Fl q , -quiet
+Quiet output mode is selected.
+Informational messages are reduced when quiet mode is in effect.
+.It Fl s , -silent
+Silent output mode is selected.
+Informational messages are reduced and table headers are suppressed when silent mode is in effect.
+.It Fl t Ar timeout , Fl -timeout Ar timeout
+Operation timeout in seconds.
+Not all commands support timeouts.
+Default is
+.Cm infinity .
+.It Fl l , Fl -longnames
+Must be specified when the cluster is configured to use long (FQDN) node names.
+To learn more, see the
+.Lk https://www.rabbitmq.com/clustering.html "RabbitMQ Clustering guide"
+.It Fl -erlang-cookie Ar cookie
+Shared secret to use to authenticate to the target node.
+Prefer using a local file or the
+.Ev RABBITMQ_ERLANG_COOKIE
+environment variable instead of specifying this option on the command line.
+To learn more, see the
+.Lk https://www.rabbitmq.com/cli.html "RabbitMQ CLI Tools guide"
+.El
.\" ------------------------------------------------------------------
.Sh COMMANDS
.\" ------------------------------------------------------------------
@@ -124,9 +188,9 @@ This command lists all implicitly or explicitly enabled RabbitMQ plugins.
.It Cm enable Oo Fl -offline Oc Oo Fl -online Oc Ar plugin ...
.Bl -tag -width Ds
.It Fl -offline
-Just modify the enabled plugins file.
+Modify node's enabled plugin state directly without contacting the node.
.It Fl -online
-Treat failure to connect to the running broker as fatal.
+Treat a failure to connect to the running broker as fatal.
.It Ar plugin
One or more plugins to enable.
.El
@@ -144,9 +208,9 @@ plugins and all their dependencies:
.It Cm disable Oo Fl -offline Oc Oo Fl -online Oc Ar plugin ...
.Bl -tag -width Ds
.It Fl -offline
-Just modify the enabled plugins file.
+Modify node's enabled plugin state directly without contacting the node.
.It Fl -online
-Treat failure to connect to the running broker as fatal.
+Treat a failure to connect to the running broker as fatal.
.It Ar plugin
One or more plugins to disable.
.El
@@ -154,19 +218,19 @@ One or more plugins to disable.
Disables the specified plugins and all their dependencies.
.Pp
For example, this command disables
-.Qq amqp_client
+.Qq rabbitmq_management
and all plugins that depend on it:
.sp
-.Dl rabbitmq-plugins disable amqp_client
+.Dl rabbitmq-plugins disable rabbitmq_management
.\" ------------------------------------
.It Cm set Oo Fl -offline Oc Oo Fl -online Oc Op Ar plugin ...
.Bl -tag -width Ds
.It Fl -offline
-Just modify the enabled plugins file.
+Modify node's enabled plugin state directly without contacting the node.
.It Fl -online
-Treat failure to connect to the running broker as fatal.
+Treat a failure to connect to the running broker as fatal.
.It Ar plugin
-Zero or more plugins to enable.
+Zero or more plugins to enable.
.El
.Pp
Enables the specified plugins and all their dependencies.
diff --git a/docs/rabbitmq-server.8 b/docs/rabbitmq-server.8
index 381bc62652..8882ce62c7 100644
--- a/docs/rabbitmq-server.8
+++ b/docs/rabbitmq-server.8
@@ -14,12 +14,12 @@
.\" The Initial Developer of the Original Code is Pivotal Software, Inc.
.\" Copyright (c) 2007-2019 Pivotal Software, Inc. All rights reserved.
.\"
-.Dd April 25, 2017
+.Dd January 25, 2019
.Dt RABBITMQ-SERVER 8
.Os "RabbitMQ Server"
.Sh NAME
.Nm rabbitmq-server
-.Nd start RabbitMQ AMQP server
+.Nd starts a RabbitMQ node
.\" ------------------------------------------------------------------
.Sh SYNOPSIS
.\" ------------------------------------------------------------------
@@ -28,47 +28,55 @@
.\" ------------------------------------------------------------------
.Sh DESCRIPTION
.\" ------------------------------------------------------------------
-RabbitMQ is an implementation of AMQP, the emerging standard for high
-performance enterprise messaging.
-The RabbitMQ server is a robust and scalable implementation of an AMQP
-broker.
+RabbitMQ is an open source multi-protocol messaging broker.
.Pp
Running
.Nm
-in the foreground displays a banner message, and reports on progress in
-the startup sequence, concluding with the message
-.Qq broker running ,
-indicating that the RabbitMQ broker has been started successfully.
-To shut down the server, just terminate the process or use
+starts a RabbitMQ node in the foreground. The node will display a startup
+banner and report when startup is complete.
+To shut down the server, use service management tools or
.Xr rabbitmqctl 8 .
.\" ------------------------------------------------------------------
.Sh ENVIRONMENT
.\" ------------------------------------------------------------------
.Bl -tag -width Ds
+.It Ev RABBITMQ_CONFIG_FILE
+Defaults to
+.Pa /etc/rabbitmq/rabbitmq.conf .
+Node configuration file path.
+To learn more, see the
+.Lk https://www.rabbitmq.com/configure.html "RabbitMQ Configuration guide"
.It Ev RABBITMQ_MNESIA_BASE
Defaults to
.Pa /var/lib/rabbitmq/mnesia .
-Set this to the directory where Mnesia database files should be placed.
+Node data directory will be located (or created) in this directory.
+To learn more, see the
+.Lk https://www.rabbitmq.com/relocate.html "RabbitMQ File and Directory guide"
.It Ev RABBITMQ_LOG_BASE
Defaults to
.Pa /var/log/rabbitmq .
Log files generated by the server will be placed in this directory.
+To learn more, see the
+.Lk https://www.rabbitmq.com/logging.html "RabbitMQ Logging guide"
.It Ev RABBITMQ_NODENAME
Defaults to
-.Qq rabbit .
-This can be useful if you want to run more than one node per machine -
+.Qq rabbit@
+followed by the computed hostname.
+Can be used to run multiple nodes on the same host.
+Every node in a cluster must have a unique
.Ev RABBITMQ_NODENAME
-should be unique per erlang-node-and-machine combination.
-See the
-.Lk http://www.rabbitmq.com/clustering.html#single-machine "clustering on a single machine guide"
-for details.
+To learn more, see the
+.Lk https://www.rabbitmq.com/clustering.html "RabbitMQ Clustering guide"
.It Ev RABBITMQ_NODE_IP_ADDRESS
-By default RabbitMQ will bind to all interfaces, on IPv4 and IPv6 if
-available.
-Set this if you only want to bind to one network interface or address
+By default RabbitMQ will bind to all IPv6 and IPv4 interfaces available.
+This variable limits the node to one network interface or address
family.
+To learn more, see the
+.Lk https://www.rabbitmq.com/networking.html "RabbitMQ Networking guide"
.It Ev RABBITMQ_NODE_PORT
-Defaults to 5672.
+AMQP 0-9-1 and AMQP 1.0 port. Defaults to 5672.
+To learn more, see the
+.Lk https://www.rabbitmq.com/networking.html "RabbitMQ Networking guide"
.El
.\" ------------------------------------------------------------------
.Sh OPTIONS
diff --git a/docs/rabbitmq-service.8 b/docs/rabbitmq-service.8
index d293fba7b5..fd01bbb814 100644
--- a/docs/rabbitmq-service.8
+++ b/docs/rabbitmq-service.8
@@ -14,12 +14,12 @@
.\" The Initial Developer of the Original Code is Pivotal Software, Inc.
.\" Copyright (c) 2007-2019 Pivotal Software, Inc. All rights reserved.
.\"
-.Dd April 25, 2017
+.Dd January 25, 2019
.Dt RABBITMQ-SERVICE.BAT 8
.Os "RabbitMQ Server"
.Sh NAME
.Nm rabbitmq-service.bat
-.Nd manage RabbitMQ AMQP Windows service
+.Nd tool for managing RabbitMQ Windows service
.\" ------------------------------------------------------------------
.Sh SYNOPSIS
.\" ------------------------------------------------------------------
@@ -28,17 +28,14 @@
.\" ------------------------------------------------------------------
.Sh DESCRIPTION
.\" ------------------------------------------------------------------
-RabbitMQ is an implementation of AMQP, the emerging standard for high
-performance enterprise messaging.
-The RabbitMQ server is a robust and scalable implementation of an AMQP
-broker.
+RabbitMQ is an open source multi-protocol messaging broker.
.Pp
Running
.Nm
-allows the RabbitMQ broker to be run as a service on
-NT/2000/2003/XP/Vista® environments.
+allows the RabbitMQ broker to be run as a service in
+Windows® environments.
The RabbitMQ broker service can be started and stopped using the
-Windows® services applet.
+Windows® services panel.
.Pp
By default the service will run in the authentication context of the
local system account.
@@ -93,25 +90,28 @@ Note: Windows only. Defaults to the application data directory of the
current user. This is the location of log and database directories.
.It Ev RABBITMQ_NODENAME
Defaults to
-.Qq rabbit .
-This can be useful if you want to run more than one node per machine -
+.Qq rabbit@
+followed by the computed hostname.
+Can be used to run multiple nodes on the same host.
+Every node in a cluster must have a unique
.Ev RABBITMQ_NODENAME
-should be unique per erlang-node-and-machine combination.
-See the
-.Lk http://www.rabbitmq.com/clustering.html#single-machine clustering on a single machine guide
-for details.
+To learn more, see the
+.Lk https://www.rabbitmq.com/clustering.html "RabbitMQ Clustering guide"
.It Ev RABBITMQ_NODE_IP_ADDRESS
-By default RabbitMQ will bind to all interfaces, on IPv4 and IPv6 if
-available.
-Set this if you only want to bind to one network interface or address
+By default RabbitMQ will bind to all IPv6 and IPv4 interfaces available.
+This variable limits the node to one network interface or address
family.
+To learn more, see the
+.Lk https://www.rabbitmq.com/networking.html "RabbitMQ Networking guide"
.It Ev RABBITMQ_NODE_PORT
-Defaults to 5672.
+AMQP 0-9-1 and AMQP 1.0 port. Defaults to 5672.
+To learn more, see the
+.Lk https://www.rabbitmq.com/networking.html "RabbitMQ Networking guide"
.It Ev ERLANG_SERVICE_MANAGER_PATH
Defaults to
-.Pa C:\(rsProgram\ Files\(rserl5.5.5\(rserts-5.5.5\(rsbin
+.Pa C:\(rsProgram\ Files\(rserl{version}\(rserts-{version}\(rsbin
(or
-.Pa C:\(rsProgram\ Files\ (x86)\(rserl5.5.5\(rserts-5.5.5\(rsbin
+.Pa C:\(rsProgram\ Files\ (x86)\(rserl{version}\(rserts-{version}\(rsbin
for 64-bit environments).
This is the installation location of the Erlang service manager.
.It Ev RABBITMQ_CONSOLE_LOG
diff --git a/docs/rabbitmqctl.8 b/docs/rabbitmqctl.8
index 3e395926d0..321a069844 100644
--- a/docs/rabbitmqctl.8
+++ b/docs/rabbitmqctl.8
@@ -14,17 +14,18 @@
.\" The Initial Developer of the Original Code is Pivotal Software, Inc.
.\" Copyright (c) 2007-2019 Pivotal Software, Inc. All rights reserved.
.\"
-.Dd April 25, 2017
+.Dd January 25, 2019
.Dt RABBITMQCTL 8
.Os "RabbitMQ Server"
.Sh NAME
.Nm rabbitmqctl
-.Nd command line for managing a RabbitMQ broker
+.Nd tool for managing RabbitMQ nodes
.\" ------------------------------------------------------------------
.Sh SYNOPSIS
.\" ------------------------------------------------------------------
.Nm
.Op Fl q
+.Op Fl s
.Op Fl l
.Op Fl n Ar node
.Op Fl t Ar timeout
@@ -33,35 +34,38 @@
.\" ------------------------------------------------------------------
.Sh DESCRIPTION
.\" ------------------------------------------------------------------
-RabbitMQ is an implementation of AMQP, the emerging standard for high
-performance enterprise messaging.
-The RabbitMQ Server is a robust and scalable implementation of an AMQP
-broker.
+RabbitMQ is an open source multi-protocol messaging broker.
.Pp
.Nm
-is a command line tool for managing a RabbitMQ broker.
-It performs all actions by connecting to one of the broker's nodes.
-.Pp
-Diagnostic information is displayed if the broker was not running, could
-not be reached, or rejected the connection due to mismatching Erlang
-cookies.
+is a command line tool for managing a RabbitMQ server node.
+It performs all actions by connecting to the target RabbitMQ node
+on a dedicated CLI tool communication port and authenticating
+using a shared secret (known as the cookie file).
+.Pp
+Diagnostic information is displayed if connection failed,
+the target node was not running, or
+.Nm
+could not authenticate to the target node successfully.
+To learn more, see the
+.Lk https://www.rabbitmq.com/cli.html "RabbitMQ CLI Tools guide"
+and
+.Lk https://www.rabbitmq.com/networking.html "RabbitMQ Networking guide"
.\" ------------------------------------------------------------------
.Sh OPTIONS
.\" ------------------------------------------------------------------
.Bl -tag -width Ds
.It Fl n Ar node
Default node is
-.Qq Pf rabbit@ Ar server ,
+.Qq Pf rabbit@ Ar target-hostname ,
where
-.Ar server
+.Ar target-hostname
is the local host.
On a host named
.Qq myserver.example.com ,
-the node name of the RabbitMQ Erlang node will usually be
+the node name will usually be
.Qq rabbit@myserver
(unless
.Ev RABBITMQ_NODENAME
-has been set to some non-default value at broker startup time).
+has been overridden).
The output of
.Qq hostname -s
is usually the correct suffix to use after the
@@ -69,13 +73,13 @@ is usually the correct suffix to use after the
sign.
See
.Xr rabbitmq-server 8
-for details of configuring the RabbitMQ broker.
+for details of configuring a RabbitMQ node.
.It Fl q , -quiet
Quiet output mode is selected.
-Informational messages are suppressed when quiet mode is in effect.
+Informational messages are reduced when quiet mode is in effect.
.It Fl s , -silent
Silent output mode is selected.
-Informational messages and table headers are suppressed when silent mode is in effect.
+Informational messages are reduced and table headers are suppressed when silent mode is in effect.
.It Fl -no-table-headers
Do not output headers for tabular data.
.It Fl -dry-run
@@ -83,19 +87,20 @@ Do not run the command.
Only print information message.
.It Fl t Ar timeout , Fl -timeout Ar timeout
Operation timeout in seconds.
-Only applicable to
-.Qq list
-commands.
+Not all commands support timeouts.
Default is
.Cm infinity .
.It Fl l , Fl -longnames
-Use longnames for erlang distribution.
-If RabbitMQ broker uses long node names for erlang distribution, the
-option must be specified.
+Must be specified when the cluster is configured to use long (FQDN) node names.
+To learn more, see the
+.Lk https://www.rabbitmq.com/clustering.html "RabbitMQ Clustering guide"
.It Fl -erlang-cookie Ar cookie
-Erlang distribution cookie.
-If RabbitMQ node is using a custom erlang cookie value, the cookie value
-must be set vith this parameter.
+Shared secret to use to authenticate to the target node.
+Prefer using a local file or the
+.Ev RABBITMQ_ERLANG_COOKIE
+environment variable instead of specifying this option on the command line.
+To learn more, see the
+.Lk https://www.rabbitmq.com/cli.html "RabbitMQ CLI Tools guide"
.El
.\" ------------------------------------------------------------------
.Sh COMMANDS
@@ -194,22 +199,26 @@ Rotation is performed asynchronously, so there is no guarantee that it
will be completed when this command returns.
.\" ------------------------------------
.It Cm shutdown
-Shuts down the Erlang process on which RabbitMQ is running.
-The command is blocking and will return after the Erlang process exits.
+Shuts down the node, both RabbitMQ and its runtime.
+The command is blocking and will return after the runtime process exits.
If RabbitMQ fails to stop, it will return a non-zero exit code.
+This command infers the process pid of the target node and
+therefore can only be used to shut down nodes running on the same
+host (or broadly speaking, in the same operating system,
+e.g. in the same VM or container).
.Pp
Unlike the stop command, the shutdown command:
.Bl -bullet
.It
does not require a
.Ar pid_file
-to wait for the Erlang process to exit
+to wait for the runtime process to exit
.It
returns a non-zero exit code if RabbitMQ node is not running
.El
.Pp
-For example, to shut down the Erlang process on which RabbitMQ is
-running:
+For example, this will shut down a locally running RabbitMQ node
+with default node name:
.sp
.Dl rabbitmqctl shutdown
.\" ------------------------------------
@@ -244,7 +253,7 @@ For example, to instruct the RabbitMQ node to terminate:
.Dl rabbitmqctl stop
.\" ------------------------------------
.It Cm stop_app
-Stops the RabbitMQ application, leaving the Erlang node running.
+Stops the RabbitMQ application, leaving the runtime (Erlang VM) running.
.Pp
This command is typically run prior to performing other management
actions that require the RabbitMQ application to be stopped, e.g.\&
@@ -554,15 +563,17 @@ time it is started:
The name of the queue to synchronise.
.El
.Pp
-Instructs a mirrored queue with unsynchronised slaves to synchronise
-itself.
+Instructs a mirrored queue with unsynchronised mirrors (follower replicas)
+to synchronise them.
The queue will block while synchronisation takes place (all publishers
-to and consumers from the queue will block).
-The queue must be mirrored for this command to succeed.
+to and consumers using the queue will block or temporarily see no activity).
+This command can only be used with mirrored queues.
+To learn more, see the
+.Lk https://www.rabbitmq.com/ha.html "RabbitMQ Mirroring guide"
.Pp
-Note that unsynchronised queues from which messages are being drained
-will become synchronised eventually.
-This command is primarily useful for queues which are not being drained.
+Note that queues with unsynchronised replicas and active consumers
+will become synchronised eventually (assuming that consumers make progress).
+This command is primarily useful for queues which do not have active consumers.
.\" ------------------------------------
.It Cm cancel_sync_queue Oo Fl p Ar vhost Oc Ar queue
.Bl -tag -width Ds
@@ -594,10 +605,11 @@ For example, this sets the cluster name to
.Dl rabbitmqctl set_cluster_name london
.El
.Ss User Management
-Note that
+Note that all user management commands
.Nm
-manages the RabbitMQ internal user database.
-Users from any alternative authentication backend will not be visible to
+can only manage users in the internal RabbitMQ database.
+Users from any alternative authentication backends such as LDAP cannot be inspected
+or managed with those commands.
.Nm .
.Bl -tag -width Ds
.\" ------------------------------------
@@ -610,11 +622,11 @@ The password the created user will use to log in to the broker.
.El
.Pp
For example, this command instructs the RabbitMQ broker to create a (non-administrative) user named
-.Qq tonyg
+.Qq janeway
with (initial) password
.Qq changeit :
.sp
-.Dl rabbitmqctl add_user tonyg changeit
+.Dl rabbitmqctl add_user janeway changeit
.\" ------------------------------------
.It Cm delete_user Ar username
.Bl -tag -width Ds
@@ -623,9 +635,9 @@ The name of the user to delete.
.El
.Pp
For example, this command instructs the RabbitMQ broker to delete the user named
-.Qq tonyg :
+.Qq janeway :
.sp
-.Dl rabbitmqctl delete_user tonyg
+.Dl rabbitmqctl delete_user janeway
.\" ------------------------------------
.It Cm change_password Ar username Ar newpassword
.Bl -tag -width Ds
@@ -637,11 +649,11 @@ The new password for the user.
.Pp
For example, this command instructs the RabbitMQ broker to change the
password for the user named
-.Qq tonyg
+.Qq janeway
to
.Qq newpass :
.sp
-.Dl rabbitmqctl change_password tonyg newpass
+.Dl rabbitmqctl change_password janeway newpass
.\" ------------------------------------
.It Cm clear_password Ar username
.Bl -tag -width Ds
@@ -651,9 +663,9 @@ The name of the user whose password is to be cleared.
.Pp
For example, this command instructs the RabbitMQ broker to clear the
password for the user named
-.Qq tonyg :
+.Qq janeway :
.sp
-.Dl rabbitmqctl clear_password tonyg
+.Dl rabbitmqctl clear_password janeway
.Pp
This user now cannot log in with a password (but may be able to through
e.g. SASL EXTERNAL if configured).
@@ -667,11 +679,11 @@ The password of the user.
.El
.Pp
For example, this command instructs the RabbitMQ broker to authenticate the user named
-.Qq tonyg
+.Qq janeway
with password
.Qq verifyit :
.sp
-.Dl rabbitmqctl authenticate_user tonyg verifyit
+.Dl rabbitmqctl authenticate_user janeway verifyit
.\" ------------------------------------
.It Cm set_user_tags Ar username Op Ar tag ...
.Bl -tag -width Ds
@@ -683,10 +695,10 @@ Any existing tags will be removed.
.El
.Pp
For example, this command instructs the RabbitMQ broker to ensure the user named
-.Qq tonyg
+.Qq janeway
is an administrator:
.sp
-.Dl rabbitmqctl set_user_tags tonyg administrator
+.Dl rabbitmqctl set_user_tags janeway administrator
.Pp
This has no effect when the user logs in via AMQP, but can be used to
permit the user to manage users, virtual hosts and permissions when
@@ -694,9 +706,9 @@ the user logs in via some other means (for example with the management
plugin).
.Pp
This command instructs the RabbitMQ broker to remove any tags from the user named
-.Qq tonyg :
+.Qq janeway :
.sp
-.Dl rabbitmqctl set_user_tags tonyg
+.Dl rabbitmqctl set_user_tags janeway
.\" ------------------------------------
.It Cm list_users
Lists users.
@@ -796,14 +808,14 @@ Sets user permissions.
.Pp
For example, this command instructs the RabbitMQ broker to grant the
user named
-.Qq tonyg
+.Qq janeway
access to the virtual host called
.Qq /myvhost ,
with configure permissions on all resources whose names starts with
-.Qq tonyg- ,
+.Qq janeway- ,
and write and read permissions on all resources:
.sp
-.Dl rabbitmqctl set_permissions -p /myvhost tonyg Qo ^tonyg-.* Qc Qo .* Qc Qq .*
+.Dl rabbitmqctl set_permissions -p /myvhost janeway Qo ^janeway-.* Qc Qo .* Qc Qq .*
.\" ------------------------------------
.It Cm clear_permissions Oo Fl p Ar vhost Oc Ar username
.Bl -tag -width Ds
@@ -819,11 +831,11 @@ Sets user permissions.
.Pp
For example, this command instructs the RabbitMQ broker to deny the user
named
-.Qq tonyg
+.Qq janeway
access to the virtual host called
.Qq /myvhost :
.sp
-.Dl rabbitmqctl clear_permissions -p /myvhost tonyg
+.Dl rabbitmqctl clear_permissions -p /myvhost janeway
.\" ------------------------------------
.It Cm list_permissions Op Fl p Ar vhost
.Bl -tag -width Ds
@@ -855,11 +867,11 @@ Lists user permissions.
.Pp
For example, this command instructs the RabbitMQ broker to list all the
virtual hosts to which the user named
-.Qq tonyg
+.Qq janeway
has been granted access, and the permissions the user has for operations
on resources in these virtual hosts:
.sp
-.Dl rabbitmqctl list_user_permissions tonyg
+.Dl rabbitmqctl list_user_permissions janeway
.\" ------------------------------------
.It Cm set_topic_permissions Oo Fl p Ar vhost Oc Ar user Ar exchange Ar write Ar read
.Bl -tag -width Ds
@@ -881,22 +893,22 @@ Sets user topic permissions.
.Pp
For example, this command instructs the RabbitMQ broker to let the
user named
-.Qq tonyg
+.Qq janeway
publish and consume messages going through the
.Qq amq.topic
exchange of the
.Qq /myvhost
virtual host with a routing key starting with
-.Qq tonyg- :
+.Qq janeway- :
.sp
-.Dl rabbitmqctl set_topic_permissions -p /myvhost tonyg amq.topic Qo ^tonyg-.* Qc Qo ^tonyg-.* Qc
+.Dl rabbitmqctl set_topic_permissions -p /myvhost janeway amq.topic Qo ^janeway-.* Qc Qo ^janeway-.* Qc
.Pp
Topic permissions support variable expansion for the following variables:
username, vhost, and client_id. Note that client_id is expanded only when using MQTT.
The previous example could be made more generic by using
.Qq ^{username}-.* :
.sp
-.Dl rabbitmqctl set_topic_permissions -p /myvhost tonyg amq.topic Qo ^{username}-.* Qc Qo ^{username}-.* Qc
+.Dl rabbitmqctl set_topic_permissions -p /myvhost janeway amq.topic Qo ^{username}-.* Qc Qo ^{username}-.* Qc
.\" ------------------------------------
.It Cm clear_topic_permissions Oo Fl p Ar vhost Oc Ar username Oo Ar exchange Oc
.Bl -tag -width Ds
@@ -915,13 +927,13 @@ Clear user topic permissions.
.Pp
For example, this command instructs the RabbitMQ broker to remove topic permissions for user
named
-.Qq tonyg
+.Qq janeway
for the topic exchange
.Qq amq.topic
in the virtual host called
.Qq /myvhost :
.sp
-.Dl rabbitmqctl clear_topic_permissions -p /myvhost tonyg amq.topic
+.Dl rabbitmqctl clear_topic_permissions -p /myvhost janeway amq.topic
.\" ------------------------------------
.It Cm list_topic_permissions Op Fl p Ar vhost
.Bl -tag -width Ds
@@ -949,22 +961,22 @@ Lists user topic permissions.
.Pp
For example, this command instructs the RabbitMQ broker to list all the
virtual hosts to which the user named
-.Qq tonyg
+.Qq janeway
has been granted access, and the topic permissions the user has in these virtual hosts:
.sp
-.Dl rabbitmqctl list_topic_user_permissions tonyg
+.Dl rabbitmqctl list_topic_user_permissions janeway
.El
.Ss Parameter Management
-Certain features of RabbitMQ (such as the federation plugin) are
+Certain features of RabbitMQ (such as the Federation plugin) are
controlled by dynamic, cluster-wide
.Em parameters.
There are 2 kinds of parameters: parameters scoped to a virtual host and
global parameters.
Each vhost-scoped parameter consists of a component name, a name and a
value.
-The component name and name are strings, and the value is an Erlang term.
+The component name and name are strings, and the value is a valid JSON document.
A global parameter consists of a name and value.
-The name is a string and the value is an Erlang term.
+The name is a string and the value is an arbitrary Erlang data structure.
Parameters can be set, cleared and listed.
In general you should refer to the documentation for the feature in
question to see how to set parameters.
@@ -1283,11 +1295,11 @@ Whether the queue will be deleted automatically when no longer used.
.It Cm arguments
Queue arguments.
.It Cm policy
-Policy name applying to the queue.
+Effective policy name for the queue.
.It Cm pid
-Id of the Erlang process associated with the queue.
+Erlang process identifier of the queue.
.It Cm owner_pid
-Id of the Erlang process representing the connection which is the
+Id of the Erlang process of the connection which is the
exclusive owner of the queue.
Empty if the queue is non-exclusive.
.It Cm exclusive
@@ -1331,7 +1343,7 @@ acknowledged.
.It Cm message_bytes_ram
Like
.Cm message_bytes
-but counting only those messages which are in RAM.
+but counting only those messages which are currently held in RAM.
.It Cm message_bytes_persistent
Like
.Cm message_bytes
@@ -1353,14 +1365,16 @@ immediately deliver messages to consumers.
This can be less than 1.0 if consumers are limited by network congestion
or prefetch count.
.It Cm memory
-Bytes of memory consumed by the Erlang process associated with the
+Bytes of memory allocated by the runtime for the
queue, including stack, heap and internal structures.
.It Cm slave_pids
-If the queue is mirrored, this gives the IDs of the current slaves.
+If the queue is mirrored, this lists the IDs of the mirrors (follower replicas).
+To learn more, see the
+.Lk https://www.rabbitmq.com/ha.html "RabbitMQ Mirroring guide"
.It Cm synchronised_slave_pids
-If the queue is mirrored, this gives the IDs of the current slaves which
-are synchronised with the master - i.e. those which could take over from
-the master without message loss.
+If the queue is mirrored, this gives the IDs of the mirrors (follower replicas) which
+are synchronised with the master (leader). To learn more, see the
+.Lk https://www.rabbitmq.com/ha.html "RabbitMQ Mirroring guide"
.It Cm state
The state of the queue.
Normally
diff --git a/scripts/rabbitmq-server b/scripts/rabbitmq-server
index b205785b33..efff38c3af 100755
--- a/scripts/rabbitmq-server
+++ b/scripts/rabbitmq-server
@@ -160,9 +160,7 @@ if [ "${RABBITMQ_CONFIG_FILE_NOEX}.conf" = "${RABBITMQ_CONFIG_FILE}" ]; then
mkdir -p "${RABBITMQ_GENERATED_CONFIG_DIR}"
fi
- if [ ! -f "${RABBITMQ_SCHEMA_DIR}/rabbit.schema" ]; then
- cp "${RABBITMQ_HOME}/priv/schema/rabbit.schema" "${RABBITMQ_SCHEMA_DIR}"
- fi
+ cp -f "${RABBITMQ_HOME}/priv/schema/rabbit.schema" "${RABBITMQ_SCHEMA_DIR}"
RABBITMQ_GENERATED_CONFIG_ARG="-conf ${RABBITMQ_CONFIG_FILE} \
-conf_dir ${RABBITMQ_GENERATED_CONFIG_DIR} \
diff --git a/scripts/rabbitmq-server.bat b/scripts/rabbitmq-server.bat
index 41677775f7..9039243c62 100644
--- a/scripts/rabbitmq-server.bat
+++ b/scripts/rabbitmq-server.bat
@@ -130,9 +130,7 @@ if "!RABBITMQ_CONFIG_FILE_NOEX!.conf" == "!RABBITMQ_CONFIG_FILE!" (
mkdir "!RABBITMQ_GENERATED_CONFIG_DIR!"
)
- if not exist "!RABBITMQ_SCHEMA_DIR!\rabbit.schema" (
- copy "!RABBITMQ_HOME!\priv\schema\rabbit.schema" "!RABBITMQ_SCHEMA_DIR!\rabbit.schema"
- )
+ copy /Y "!RABBITMQ_HOME!\priv\schema\rabbit.schema" "!RABBITMQ_SCHEMA_DIR!\rabbit.schema"
set RABBITMQ_GENERATED_CONFIG_ARG=-conf "!RABBITMQ_CONFIG_FILE!" ^
-conf_dir "!RABBITMQ_GENERATED_CONFIG_DIR!" ^
diff --git a/scripts/rabbitmq-service.bat b/scripts/rabbitmq-service.bat
index 57a9587ced..08cc29f2c8 100644
--- a/scripts/rabbitmq-service.bat
+++ b/scripts/rabbitmq-service.bat
@@ -213,9 +213,7 @@ if "!RABBITMQ_CONFIG_FILE_NOEX!.conf" == "!RABBITMQ_CONFIG_FILE!" (
mkdir "!RABBITMQ_GENERATED_CONFIG_DIR!"
)
- if not exist "!RABBITMQ_SCHEMA_DIR!\rabbit.schema" (
- copy "!RABBITMQ_HOME!\priv\schema\rabbit.schema" "!RABBITMQ_SCHEMA_DIR!\rabbit.schema"
- )
+ copy /Y "!RABBITMQ_HOME!\priv\schema\rabbit.schema" "!RABBITMQ_SCHEMA_DIR!\rabbit.schema"
set RABBITMQ_GENERATED_CONFIG_ARG=-conf "!RABBITMQ_CONFIG_FILE!" ^
-conf_dir "!RABBITMQ_GENERATED_CONFIG_DIR!" ^
diff --git a/src/rabbit.erl b/src/rabbit.erl
index c8c6968ab0..44a044a4dc 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -31,7 +31,7 @@
-export([start/2, stop/1, prep_stop/1]).
-export([start_apps/1, start_apps/2, stop_apps/1]).
-export([log_locations/0, config_files/0, decrypt_config/2]). %% for testing and mgmt-agent
--export([is_booted/1]).
+-export([is_booted/1, is_booted/0, is_booting/1, is_booting/0]).
-ifdef(TEST).
@@ -696,6 +696,8 @@ await_startup(Node) ->
end
end.
+is_booting() -> is_booting(node()).
+
is_booting(Node) ->
case rpc:call(Node, erlang, whereis, [rabbit_boot]) of
{badrpc, _} = Err -> Err;
@@ -790,6 +792,8 @@ is_running() -> is_running(node()).
is_running(Node) -> rabbit_nodes:is_process_running(Node, rabbit).
+is_booted() -> is_booted(node()).
+
is_booted(Node) ->
case is_booting(Node) of
false ->
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl
index ad02205e02..612cd52dcc 100644
--- a/src/rabbit_fifo.erl
+++ b/src/rabbit_fifo.erl
@@ -38,6 +38,7 @@
% queries
query_messages_ready/1,
query_messages_checked_out/1,
+ query_messages_total/1,
query_processes/1,
query_ra_indexes/1,
query_consumer_count/1,
@@ -87,8 +88,13 @@
-type msg() :: {msg_header(), raw_msg()}.
%% message with a header map.
+-type msg_size() :: non_neg_integer().
+%% the size in bytes of the msg payload
+
-type indexed_msg() :: {ra_index(), msg()}.
+-type prefix_msg() :: {'$prefix_msg', msg_size()}.
+
-type delivery_msg() :: {msg_id(), msg()}.
%% A tuple consisting of the message id and the headered message.
@@ -157,7 +163,7 @@
-type applied_mfa() :: {module(), atom(), list()}.
% represents a partially applied module call
--define(SHADOW_COPY_INTERVAL, 4096 * 4).
+-define(SHADOW_COPY_INTERVAL, 4096 * 8).
-define(USE_AVG_HALF_LIFE, 10000.0).
-record(consumer,
@@ -202,7 +208,8 @@
next_msg_num = 1 :: msg_in_id(),
% list of returned msg_in_ids - when checking out it picks from
% this list first before taking low_msg_num
- returns = lqueue:new() :: lqueue:lqueue('$prefix_msg' | msg_in_id()),
+ returns = lqueue:new() :: lqueue:lqueue(prefix_msg() |
+ {msg_in_id(), indexed_msg()}),
% a counter of enqueues - used to trigger shadow copy points
enqueue_count = 0 :: non_neg_integer(),
% a map containing all the live processes that have ever enqueued
@@ -224,19 +231,20 @@
dead_letter_handler :: maybe(applied_mfa()),
become_leader_handler :: maybe(applied_mfa()),
%% This is a special field that is only used for snapshots
- %% It represents the number of queued messages at the time the
+ %% It represents the queued messages at the time the
%% dehydrated snapshot state was cached.
%% As release_cursors are only emitted for raft indexes where all
%% prior messages no longer contribute to the current state we can
- %% replace all message payloads at some index with a single integer
- %% to be decremented during `checkout_one' until it's 0 after which
- %% it instead takes messages from the `messages' map.
+ %% replace all message payloads with their sizes (to be used for
+ %% overflow calculations).
%% This is done so that consumers are still served in a deterministic
%% order on recovery.
- prefix_msg_counts = {0, 0} :: {Return :: non_neg_integer(),
- PrefixMsgs :: non_neg_integer()},
+ prefix_msgs = {[], []} :: {Return :: [msg_size()],
+ PrefixMsgs :: [msg_size()]},
msg_bytes_enqueue = 0 :: non_neg_integer(),
msg_bytes_checkout = 0 :: non_neg_integer(),
+ max_length :: maybe(non_neg_integer()),
+ max_bytes :: maybe(non_neg_integer()),
%% whether single active consumer is on or not for this queue
consumer_strategy = default :: default | single_active,
%% waiting consumers, one is picked active consumer is cancelled or dies
@@ -251,6 +259,8 @@
dead_letter_handler => applied_mfa(),
become_leader_handler => applied_mfa(),
shadow_copy_interval => non_neg_integer(),
+ max_length => non_neg_integer(),
+ max_bytes => non_neg_integer(),
single_active_consumer_on => boolean()}.
-export_type([protocol/0,
@@ -272,12 +282,14 @@
init(#{name := Name,
queue_resource := Resource} = Conf) ->
update_config(Conf, #state{name = Name,
- queue_resource = Resource}).
+ queue_resource = Resource}).
update_config(Conf, State) ->
DLH = maps:get(dead_letter_handler, Conf, undefined),
BLH = maps:get(become_leader_handler, Conf, undefined),
SHI = maps:get(shadow_copy_interval, Conf, ?SHADOW_COPY_INTERVAL),
+ MaxLength = maps:get(max_length, Conf, undefined),
+ MaxBytes = maps:get(max_bytes, Conf, undefined),
ConsumerStrategy = case maps:get(single_active_consumer_on, Conf, false) of
true ->
single_active;
@@ -287,6 +299,8 @@ update_config(Conf, State) ->
State#state{dead_letter_handler = DLH,
become_leader_handler = BLH,
shadow_copy_interval = SHI,
+ max_length = MaxLength,
+ max_bytes = MaxBytes,
consumer_strategy = ConsumerStrategy}.
zero(_) ->
@@ -294,59 +308,49 @@ zero(_) ->
% msg_ids are scoped per consumer
% ra_indexes holds all raft indexes for enqueues currently on queue
--spec apply(ra_machine:command_meta_data(), command(),
- state()) ->
- {state(), Reply :: term(), ra_machine:effects()}.
-apply(#{index := RaftIdx}, #enqueue{pid = From, seq = Seq,
- msg = RawMsg}, State00) ->
- case maybe_enqueue(RaftIdx, From, Seq, RawMsg, [], State00) of
- {ok, State0, Effects1} ->
- %% need to checkout before capturing the shadow copy else
- %% snapshots may not be complete
- {State, ok, Effects} = checkout(
- add_bytes_enqueue(RawMsg, State0),
- Effects1),
- append_to_master_index(RaftIdx, Effects, State);
- {duplicate, State, Effects} ->
- {State, ok, lists:reverse(Effects)}
- end;
-apply(#{index := RaftIdx},
+-spec apply(ra_machine:command_meta_data(), command(), state()) ->
+ {state(), Reply :: term(), ra_machine:effects()} |
+ {state(), Reply :: term()}.
+apply(Metadata, #enqueue{pid = From, seq = Seq,
+ msg = RawMsg}, State00) ->
+ apply_enqueue(Metadata, From, Seq, RawMsg, State00);
+apply(Meta,
#settle{msg_ids = MsgIds, consumer_id = ConsumerId},
#state{consumers = Cons0} = State) ->
case Cons0 of
#{ConsumerId := Con0} ->
% need to increment metrics before completing as any snapshot
% states taken need to include them
- complete_and_checkout(RaftIdx, MsgIds, ConsumerId,
+ complete_and_checkout(Meta, MsgIds, ConsumerId,
Con0, [], State);
_ ->
{State, ok}
end;
-apply(#{index := RaftIdx}, #discard{msg_ids = MsgIds, consumer_id = ConsumerId},
+apply(Meta, #discard{msg_ids = MsgIds, consumer_id = ConsumerId},
#state{consumers = Cons0} = State0) ->
case Cons0 of
#{ConsumerId := Con0} ->
Discarded = maps:with(MsgIds, Con0#consumer.checked_out),
Effects = dead_letter_effects(Discarded, State0, []),
- complete_and_checkout(RaftIdx, MsgIds, ConsumerId, Con0,
+ complete_and_checkout(Meta, MsgIds, ConsumerId, Con0,
Effects, State0);
_ ->
{State0, ok}
end;
-apply(_, #return{msg_ids = MsgIds, consumer_id = ConsumerId},
+apply(Meta, #return{msg_ids = MsgIds, consumer_id = ConsumerId},
#state{consumers = Cons0} = State) ->
case Cons0 of
#{ConsumerId := Con0 = #consumer{checked_out = Checked0}} ->
Checked = maps:without(MsgIds, Checked0),
Returned = maps:with(MsgIds, Checked0),
MsgNumMsgs = maps:values(Returned),
- return(ConsumerId, MsgNumMsgs, Con0, Checked, [], State);
+ return(Meta, ConsumerId, MsgNumMsgs, Con0, Checked, [], State);
_ ->
{State, ok}
end;
-apply(_, #credit{credit = NewCredit, delivery_count = RemoteDelCnt,
- drain = Drain, consumer_id = ConsumerId},
+apply(Meta, #credit{credit = NewCredit, delivery_count = RemoteDelCnt,
+ drain = Drain, consumer_id = ConsumerId},
#state{consumers = Cons0,
service_queue = ServiceQueue0} = State0) ->
case Cons0 of
@@ -359,7 +363,7 @@ apply(_, #credit{credit = NewCredit, delivery_count = RemoteDelCnt,
ServiceQueue0),
Cons = maps:put(ConsumerId, Con1, Cons0),
{State1, ok, Effects} =
- checkout(State0#state{service_queue = ServiceQueue,
+ checkout(Meta, State0#state{service_queue = ServiceQueue,
consumers = Cons}, []),
Response = {send_credit_reply, maps:size(State1#state.messages)},
%% by this point all checkouts for the updated credit value
@@ -389,59 +393,63 @@ apply(_, #credit{credit = NewCredit, delivery_count = RemoteDelCnt,
%% credit for unknown consumer - just ignore
{State0, ok}
end;
-apply(_, #checkout{spec = {dequeue, _}},
- #state{messages = M,
- prefix_msg_counts = {0, 0}} = State0) when map_size(M) == 0 ->
- %% FIXME: also check if there are returned messages
- %% TODO do we need metric visibility of empty get requests?
- {State0, {dequeue, empty}};
-apply(Meta, #checkout{spec = {dequeue, settled}, meta = ConsumerMeta,
+apply(Meta, #checkout{spec = {dequeue, Settlement},
+ meta = ConsumerMeta,
consumer_id = ConsumerId},
- State0) ->
- % TODO: this clause could probably be optimised
- State1 = update_consumer(ConsumerId, ConsumerMeta,
- {once, 1, simple_prefetch}, State0),
- % turn send msg effect into reply
- {success, _, MsgId, Msg, State2} = checkout_one(State1),
- % immediately settle
- {State, _, Effects} = apply(Meta, make_settle(ConsumerId, [MsgId]), State2),
- {State, {dequeue, {MsgId, Msg}}, Effects};
-apply(_, #checkout{spec = {dequeue, unsettled},
- meta = ConsumerMeta, consumer_id = {_, Pid} = ConsumerId},
- State0) ->
- State1 = update_consumer(ConsumerId, ConsumerMeta,
- {once, 1, simple_prefetch}, State0),
- case checkout_one(State1) of
- {success, _, MsgId, Msg, S} ->
- {S, {dequeue, {MsgId, Msg}}, [{monitor, process, Pid}]};
- {inactive, S} ->
- {S, {dequeue, empty}, [{aux, inactive}]};
- S ->
- {S, {dequeue, empty}}
+ #state{consumers = Consumers} = State0) ->
+ Exists = maps:is_key(ConsumerId, Consumers),
+ case messages_ready(State0) of
+ 0 ->
+ {State0, {dequeue, empty}};
+ _ when Exists ->
+ %% a dequeue using the same consumer_id isn't possible at this point
+ {State0, {dequeue, empty}};
+ _ ->
+ State1 = update_consumer(ConsumerId, ConsumerMeta,
+ {once, 1, simple_prefetch}, State0),
+ {success, _, MsgId, Msg, State2} = checkout_one(State1),
+ case Settlement of
+ unsettled ->
+ {_, Pid} = ConsumerId,
+ {State2, {dequeue, {MsgId, Msg}},
+ [{monitor, process, Pid}]};
+ settled ->
+ %% immediately settle the checkout
+ {State, _, Effects} = apply(Meta,
+ make_settle(ConsumerId, [MsgId]),
+ State2),
+ {State, {dequeue, {MsgId, Msg}}, Effects}
+ end
end;
-apply(_, #checkout{spec = cancel, consumer_id = ConsumerId}, State0) ->
+apply(Meta, #checkout{spec = cancel, consumer_id = ConsumerId}, State0) ->
{State, Effects} = cancel_consumer(ConsumerId, State0, []),
% TODO: here we should really demonitor the pid but _only_ if it has no
- % other consumers or enqueuers.
- checkout(State, Effects);
-apply(_, #checkout{spec = Spec, meta = Meta,
- consumer_id = {_, Pid} = ConsumerId},
+ % other consumers or enqueuers. leaving a monitor in place isn't harmful
+ % however
+ checkout(Meta, State, Effects);
+apply(Meta, #checkout{spec = Spec, meta = ConsumerMeta,
+ consumer_id = {_, Pid} = ConsumerId},
State0) ->
- State1 = update_consumer(ConsumerId, Meta, Spec, State0),
- checkout(State1, [{monitor, process, Pid}]);
+ State1 = update_consumer(ConsumerId, ConsumerMeta, Spec, State0),
+ checkout(Meta, State1, [{monitor, process, Pid}]);
apply(#{index := RaftIdx}, #purge{},
#state{ra_indexes = Indexes0,
+ returns = Returns,
messages = Messages} = State0) ->
- Total = maps:size(Messages),
- Indexes = lists:foldl(fun rabbit_fifo_index:delete/2,
+ Total = messages_ready(State0),
+ Indexes1 = lists:foldl(fun rabbit_fifo_index:delete/2,
Indexes0,
[I || {I, _} <- lists:sort(maps:values(Messages))]),
+ Indexes = lists:foldl(fun rabbit_fifo_index:delete/2,
+ Indexes1,
+ [I || {_, {I, _}} <- lqueue:to_list(Returns)]),
{State, _, Effects} =
update_smallest_raft_index(RaftIdx, Indexes0,
State0#state{ra_indexes = Indexes,
messages = #{},
returns = lqueue:new(),
msg_bytes_enqueue = 0,
+ prefix_msgs = {[], []},
low_msg_num = undefined},
[]),
%% as we're not checking out after a purge (no point) we have to
@@ -461,8 +469,6 @@ apply(_, {down, ConsumerPid, noconnection},
#consumer{checked_out = Checked0} = C,
{Co, St0, Eff}) when node(P) =:= Node ->
St = return_all(St0, Checked0),
- %% TODO: need to increment credit here
- %% with the size of the Checked map
Credit = increase_credit(C, maps:size(Checked0)),
Eff1 = ConsumerUpdateActiveFun(St, K, C, false, suspected_down, Eff),
{maps:put(K, C#consumer{suspected_down = true,
@@ -488,8 +494,8 @@ apply(_, {down, ConsumerPid, noconnection},
%% TODO: should we run a checkout here?
{State#state{consumers = Cons, enqueuers = Enqs,
waiting_consumers = WaitingConsumers}, ok, Effects2};
-apply(_, {down, Pid, _Info}, #state{consumers = Cons0,
- enqueuers = Enqs0} = State0) ->
+apply(Meta, {down, Pid, _Info}, #state{consumers = Cons0,
+ enqueuers = Enqs0} = State0) ->
% Remove any enqueuer for the same pid and enqueue any pending messages
% This should be ok as we won't see any more enqueues from this pid
State1 = case maps:take(Pid, Enqs0) of
@@ -508,8 +514,8 @@ apply(_, {down, Pid, _Info}, #state{consumers = Cons0,
{State, Effects} = lists:foldl(fun(ConsumerId, {S, E}) ->
cancel_consumer(ConsumerId, S, E)
end, {State2, Effects1}, DownConsumers),
- checkout(State, Effects);
-apply(_, {nodeup, Node}, #state{consumers = Cons0,
+ checkout(Meta, State, Effects);
+apply(Meta, {nodeup, Node}, #state{consumers = Cons0,
enqueuers = Enqs0,
service_queue = SQ0} = State0) ->
%% A node we are monitoring has come back.
@@ -538,14 +544,14 @@ apply(_, {nodeup, Node}, #state{consumers = Cons0,
(_, _, Acc) ->
Acc
end, {Cons0, SQ0, Monitors}, Cons0),
- % TODO: avoid list concat
- checkout(State0#state{consumers = Cons1, enqueuers = Enqs1,
- service_queue = SQ,
- waiting_consumers = WaitingConsumers}, Effects);
+
+ checkout(Meta, State0#state{consumers = Cons1, enqueuers = Enqs1,
+ service_queue = SQ,
+ waiting_consumers = WaitingConsumers}, Effects);
apply(_, {nodedown, _Node}, State) ->
{State, ok};
-apply(_, #update_config{config = Conf}, State) ->
- {update_config(Conf, State), ok}.
+apply(Meta, #update_config{config = Conf}, State) ->
+ checkout(Meta, update_config(Conf, State), []).
consumer_active_flag_update_function(#state{consumer_strategy = default}) ->
fun(State, ConsumerId, Consumer, Active, ActivityStatus, Effects) ->
@@ -605,7 +611,7 @@ state_enter(leader, #state{consumers = Cons,
enqueuers = Enqs,
waiting_consumers = WaitingConsumers,
name = Name,
- prefix_msg_counts = {0, 0},
+ prefix_msgs = {[], []},
become_leader_handler = BLH}) ->
% return effects to monitor all current consumers and enqueuers
Pids = lists:usort(maps:keys(Enqs)
@@ -621,10 +627,10 @@ state_enter(leader, #state{consumers = Cons,
{Mod, Fun, Args} ->
[{mod_call, Mod, Fun, Args ++ [Name]} | Effects]
end;
-state_enter(recovered, #state{prefix_msg_counts = PrefixMsgCounts})
- when PrefixMsgCounts =/= {0, 0} ->
+state_enter(recovered, #state{prefix_msgs = PrefixMsgCounts})
+ when PrefixMsgCounts =/= {[], []} ->
%% TODO: remove assertion?
- exit({rabbit_fifo, unexpected_prefix_msg_counts, PrefixMsgCounts});
+ exit({rabbit_fifo, unexpected_prefix_msgs, PrefixMsgCounts});
state_enter(eol, #state{enqueuers = Enqs,
consumers = Custs0,
waiting_consumers = WaitingConsumers0}) ->
@@ -642,14 +648,12 @@ state_enter(_, _) ->
-spec tick(non_neg_integer(), state()) -> ra_machine:effects().
tick(_Ts, #state{name = Name,
queue_resource = QName,
- messages = Messages,
- ra_indexes = Indexes,
msg_bytes_enqueue = EnqueueBytes,
msg_bytes_checkout = CheckoutBytes} = State) ->
Metrics = {Name,
- maps:size(Messages), % Ready
+ messages_ready(State),
num_checked_out(State), % checked out
- rabbit_fifo_index:size(Indexes), %% Total
+ messages_total(State),
query_consumer_count(State), % Consumers
EnqueueBytes,
CheckoutBytes},
@@ -659,17 +663,14 @@ tick(_Ts, #state{name = Name,
-spec overview(state()) -> map().
overview(#state{consumers = Cons,
enqueuers = Enqs,
- messages = Messages,
- ra_indexes = Indexes,
msg_bytes_enqueue = EnqueueBytes,
- msg_bytes_checkout = CheckoutBytes
- } = State) ->
+ msg_bytes_checkout = CheckoutBytes} = State) ->
#{type => ?MODULE,
num_consumers => maps:size(Cons),
num_checked_out => num_checked_out(State),
num_enqueuers => maps:size(Enqs),
- num_ready_messages => maps:size(Messages),
- num_messages => rabbit_fifo_index:size(Indexes),
+ num_ready_messages => messages_ready(State),
+ num_messages => messages_total(State),
enqueue_message_bytes => EnqueueBytes,
checkout_message_bytes => CheckoutBytes}.
@@ -706,13 +707,16 @@ handle_aux(_, cast, Cmd, {Name, Use0}, Log, _) ->
%%% Queries
-query_messages_ready(#state{messages = M}) ->
- M.
+query_messages_ready(State) ->
+ messages_ready(State).
query_messages_checked_out(#state{consumers = Consumers}) ->
maps:fold(fun (_, #consumer{checked_out = C}, S) ->
- maps:merge(S, maps:from_list(maps:values(C)))
- end, #{}, Consumers).
+ maps:size(C) + S
+ end, 0, Consumers).
+
+query_messages_total(State) ->
+ messages_total(State).
query_processes(#state{enqueuers = Enqs, consumers = Cons0}) ->
Cons = maps:fold(fun({_, P}, V, S) -> S#{P => V} end, #{}, Cons0),
@@ -802,6 +806,18 @@ usage(Name) when is_atom(Name) ->
%%% Internal
+messages_ready(#state{messages = M,
+ prefix_msgs = {PreR, PreM},
+ returns = R}) ->
+
+ %% TODO: optimise to avoid length/1 call
+ maps:size(M) + lqueue:len(R) + length(PreR) + length(PreM).
+
+messages_total(#state{ra_indexes = I,
+ prefix_msgs = {PreR, PreM}}) ->
+
+ rabbit_fifo_index:size(I) + length(PreR) + length(PreM).
+
update_use({inactive, _, _, _} = CUInfo, inactive) ->
CUInfo;
update_use({active, _, _} = CUInfo, active) ->
@@ -913,6 +929,35 @@ cancel_consumer0(ConsumerId,
{S0, Effects0}
end.
+apply_enqueue(#{index := RaftIdx} = Meta, From, Seq, RawMsg, State0) ->
+ Bytes = message_size(RawMsg),
+ case maybe_enqueue(RaftIdx, From, Seq, RawMsg, [], State0) of
+ {ok, State1, Effects1} ->
+ State2 = append_to_master_index(RaftIdx,
+ add_bytes_enqueue(Bytes, State1)),
+ {State, ok, Effects} = checkout(Meta, State2, Effects1),
+ {maybe_store_dehydrated_state(RaftIdx, State), ok, Effects};
+ {duplicate, State, Effects} ->
+ {State, ok, Effects}
+ end.
+
+drop_head(#state{ra_indexes = Indexes0} = State0, Effects0) ->
+ case take_next_msg(State0) of
+ {FullMsg = {_MsgId, {RaftIdxToDrop, {_Header, Msg}}},
+ State1} ->
+ Indexes = rabbit_fifo_index:delete(RaftIdxToDrop, Indexes0),
+ Bytes = message_size(Msg),
+ State = add_bytes_drop(Bytes, State1#state{ra_indexes = Indexes}),
+ Effects = dead_letter_effects(maps:put(none, FullMsg, #{}),
+ State, Effects0),
+ {State, Effects};
+ {{'$prefix_msg', Bytes}, State1} ->
+ State = add_bytes_drop(Bytes, State1),
+ {State, Effects0};
+ empty ->
+ {State0, Effects0}
+ end.
+
enqueue(RaftIdx, RawMsg, #state{messages = Messages,
low_msg_num = LowMsgNum,
next_msg_num = NextMsgNum} = State0) ->
@@ -923,19 +968,28 @@ enqueue(RaftIdx, RawMsg, #state{messages = Messages,
low_msg_num = min(LowMsgNum, NextMsgNum),
next_msg_num = NextMsgNum + 1}.
-append_to_master_index(RaftIdx, Effects,
+append_to_master_index(RaftIdx,
#state{ra_indexes = Indexes0} = State0) ->
- {State, Shadow} = incr_enqueue_count(State0),
- Indexes = rabbit_fifo_index:append(RaftIdx, Shadow, Indexes0),
- {State#state{ra_indexes = Indexes}, ok, Effects}.
+ State = incr_enqueue_count(State0),
+ Indexes = rabbit_fifo_index:append(RaftIdx, undefined, Indexes0),
+ State#state{ra_indexes = Indexes}.
incr_enqueue_count(#state{enqueue_count = C,
shadow_copy_interval = C} = State0) ->
- % time to stash a dehydrated state version
- State = State0#state{enqueue_count = 0},
- {State, dehydrate_state(State)};
+ % this will trigger a dehydrated version of the state to be stored
+ % at this raft index for potential future snapshot generation
+ State0#state{enqueue_count = 0};
incr_enqueue_count(#state{enqueue_count = C} = State) ->
- {State#state{enqueue_count = C + 1}, undefined}.
+ State#state{enqueue_count = C + 1}.
+
+maybe_store_dehydrated_state(RaftIdx, #state{enqueue_count = 0,
+ ra_indexes = Indexes} = State) ->
+ Dehydrated = dehydrate_state(State),
+ State#state{ra_indexes =
+ rabbit_fifo_index:update_if_present(RaftIdx, Dehydrated,
+ Indexes)};
+maybe_store_dehydrated_state(_RaftIdx, State) ->
+ State.
enqueue_pending(From,
@@ -950,7 +1004,8 @@ enqueue_pending(From, Enq, #state{enqueuers = Enqueuers0} = State) ->
maybe_enqueue(RaftIdx, undefined, undefined, RawMsg, Effects, State0) ->
% direct enqueue without tracking
- {ok, enqueue(RaftIdx, RawMsg, State0), Effects};
+ State = enqueue(RaftIdx, RawMsg, State0),
+ {ok, State, Effects};
maybe_enqueue(RaftIdx, From, MsgSeqNo, RawMsg, Effects0,
#state{enqueuers = Enqueuers0} = State0) ->
case maps:get(From, Enqueuers0, undefined) of
@@ -981,19 +1036,19 @@ maybe_enqueue(RaftIdx, From, MsgSeqNo, RawMsg, Effects0,
snd(T) ->
element(2, T).
-return(ConsumerId, MsgNumMsgs, Con0, Checked,
+return(Meta, ConsumerId, MsgNumMsgs, Con0, Checked,
Effects0, #state{consumers = Cons0, service_queue = SQ0} = State0) ->
Con = Con0#consumer{checked_out = Checked,
credit = increase_credit(Con0, length(MsgNumMsgs))},
{Cons, SQ, Effects} = update_or_remove_sub(ConsumerId, Con, Cons0,
SQ0, Effects0),
- State1 = lists:foldl(fun('$prefix_msg' = Msg, S0) ->
+ State1 = lists:foldl(fun({'$prefix_msg', _} = Msg, S0) ->
return_one(0, Msg, S0);
({MsgNum, Msg}, S0) ->
return_one(MsgNum, Msg, S0)
end, State0, MsgNumMsgs),
- checkout(State1#state{consumers = Cons,
- service_queue = SQ},
+ checkout(Meta, State1#state{consumers = Cons,
+ service_queue = SQ},
Effects).
% used to processes messages that are finished
@@ -1024,7 +1079,7 @@ increase_credit(#consumer{lifetime = auto,
increase_credit(#consumer{credit = Current}, Credit) ->
Current + Credit.
-complete_and_checkout(IncomingRaftIdx, MsgIds, ConsumerId,
+complete_and_checkout(#{index := IncomingRaftIdx} = Meta, MsgIds, ConsumerId,
#consumer{checked_out = Checked0} = Con0,
Effects0, #state{ra_indexes = Indexes0} = State0) ->
Checked = maps:without(MsgIds, Checked0),
@@ -1032,15 +1087,15 @@ complete_and_checkout(IncomingRaftIdx, MsgIds, ConsumerId,
MsgRaftIdxs = [RIdx || {_, {RIdx, _}} <- maps:values(Discarded)],
State1 = lists:foldl(fun({_, {_, {_, RawMsg}}}, Acc) ->
add_bytes_settle(RawMsg, Acc);
- (_, Acc) ->
- Acc
+ ({'$prefix_msg', _} = M, Acc) ->
+ add_bytes_settle(M, Acc)
end, State0, maps:values(Discarded)),
%% need to pass the length of discarded as $prefix_msgs would be filtered
%% by the above list comprehension
{State2, Effects1} = complete(ConsumerId, MsgRaftIdxs,
maps:size(Discarded),
Con0, Checked, Effects0, State1),
- {State, ok, Effects} = checkout(State2, Effects1),
+ {State, ok, Effects} = checkout(Meta, State2, Effects1),
% settle metrics are incremented separately
update_smallest_raft_index(IncomingRaftIdx, Indexes0, State, Effects).
@@ -1050,7 +1105,7 @@ dead_letter_effects(_Discarded,
Effects;
dead_letter_effects(Discarded,
#state{dead_letter_handler = {Mod, Fun, Args}}, Effects) ->
- DeadLetters = maps:fold(fun(_, {_, {_, {_, Msg}}},
+ DeadLetters = maps:fold(fun(_, {_, {_, {_Header, Msg}}},
% MsgId, MsgIdID, RaftId, Header
Acc) -> [{rejected, Msg} | Acc]
end, [], Discarded),
@@ -1062,7 +1117,6 @@ cancel_consumer_effects(ConsumerId, #state{queue_resource = QName}, Effects) ->
update_smallest_raft_index(IncomingRaftIdx, OldIndexes,
#state{ra_indexes = Indexes,
- % prefix_msg_count = 0,
messages = Messages} = State, Effects) ->
case rabbit_fifo_index:size(Indexes) of
0 when map_size(Messages) =:= 0 ->
@@ -1080,46 +1134,50 @@ update_smallest_raft_index(IncomingRaftIdx, OldIndexes,
% effects
{State, ok, Effects};
{_, {Smallest, Shadow}} when Shadow =/= undefined ->
- % ?INFO("RELEASE ~w ~w ~w~n", [IncomingRaftIdx, Smallest,
- % Shadow]),
{State, ok, [{release_cursor, Smallest, Shadow}]};
_ -> % smallest
- % no shadow taken for this index,
% no release cursor increase
{State, ok, Effects}
end
end.
-% TODO update message then update messages and returns in single operations
-return_one(0, '$prefix_msg',
+return_one(0, {'$prefix_msg', _} = Msg,
#state{returns = Returns} = State0) ->
- State0#state{returns = lqueue:in('$prefix_msg', Returns)};
+ add_bytes_return(Msg,
+ State0#state{returns = lqueue:in(Msg, Returns)});
return_one(MsgNum, {RaftId, {Header0, RawMsg}},
- #state{messages = Messages,
- returns = Returns} = State0) ->
+ #state{returns = Returns} = State0) ->
Header = maps:update_with(delivery_count,
fun (C) -> C+1 end,
1, Header0),
Msg = {RaftId, {Header, RawMsg}},
% this should not affect the release cursor in any way
add_bytes_return(RawMsg,
- State0#state{messages = maps:put(MsgNum, Msg, Messages),
- returns = lqueue:in(MsgNum, Returns)}).
+ State0#state{returns = lqueue:in({MsgNum, Msg}, Returns)}).
-return_all(State, Checked0) ->
+return_all(State0, Checked0) ->
%% need to sort the list so that we return messages in the order
%% they were checked out
Checked = lists:sort(maps:to_list(Checked0)),
- lists:foldl(fun ({_, '$prefix_msg'}, S) ->
- return_one(0, '$prefix_msg', S);
+ lists:foldl(fun ({_, {'$prefix_msg', _} = Msg}, S) ->
+ return_one(0, Msg, S);
({_, {MsgNum, Msg}}, S) ->
return_one(MsgNum, Msg, S)
- end, State, Checked).
+ end, State0, Checked).
%% checkout new messages to consumers
%% reverses the effects list
-checkout(State, Effects) ->
- checkout0(checkout_one(State), Effects, #{}).
+checkout(#{index := Index}, State0, Effects0) ->
+ {State1, _Result, Effects1} = checkout0(checkout_one(State0),
+ Effects0, #{}),
+ case evaluate_limit(State0#state.ra_indexes, false,
+ State1, Effects1) of
+ {State, true, Effects} ->
+ update_smallest_raft_index(Index, State0#state.ra_indexes,
+ State, Effects);
+ {State, false, Effects} ->
+ {State, ok, Effects}
+ end.
checkout0({success, ConsumerId, MsgId, Msg, State}, Effects, Acc0) ->
DelMsg = {MsgId, Msg},
@@ -1127,12 +1185,30 @@ checkout0({success, ConsumerId, MsgId, Msg, State}, Effects, Acc0) ->
fun (M) -> [DelMsg | M] end,
[DelMsg], Acc0),
checkout0(checkout_one(State), Effects, Acc);
-checkout0({inactive, State}, Effects0, Acc) ->
- Effects = append_send_msg_effects(Effects0, Acc),
- {State, ok, lists:reverse([{aux, inactive} | Effects])};
-checkout0(State, Effects0, Acc) ->
- Effects = append_send_msg_effects(Effects0, Acc),
- {State, ok, lists:reverse(Effects)}.
+checkout0({Activity, State0}, Effects0, Acc) ->
+ Effects1 = case Activity of
+ nochange ->
+ append_send_msg_effects(Effects0, Acc);
+ inactive ->
+ [{aux, inactive}
+ | append_send_msg_effects(Effects0, Acc)]
+ end,
+ {State0, ok, lists:reverse(Effects1)}.
+
+evaluate_limit(_OldIndexes, Result,
+ #state{max_length = undefined,
+ max_bytes = undefined} = State,
+ Effects) ->
+ {State, Result, Effects};
+evaluate_limit(OldIndexes, Result,
+ State0, Effects0) ->
+ case is_over_limit(State0) of
+ true ->
+ {State, Effects} = drop_head(State0, Effects0),
+ evaluate_limit(OldIndexes, true, State, Effects);
+ false ->
+ {State0, Result, Effects0}
+ end.
append_send_msg_effects(Effects, AccMap) when map_size(AccMap) == 0 ->
Effects;
@@ -1142,57 +1218,50 @@ append_send_msg_effects(Effects0, AccMap) ->
end, Effects0, AccMap),
[{aux, active} | Effects].
-next_checkout_message(#state{prefix_msg_counts = {PReturns, P}} = State)
- when PReturns > 0 ->
+%% next message is determined as follows:
+%% First we check if there are prefix returns
+%% Then we check if there are current returns
+%% then we check prefix msgs
+%% then we check current messages
+%%
+%% When we return it is always done to the current return queue
+%% for both prefix messages and current messages
+take_next_msg(#state{prefix_msgs = {[Bytes | Rem], P}} = State) ->
%% there are prefix returns, these should be served first
- {'$prefix_msg', State#state{prefix_msg_counts = {PReturns - 1, P}}};
-next_checkout_message(#state{returns = Returns,
- low_msg_num = Low0,
- prefix_msg_counts = {R, P},
- next_msg_num = NextMsgNum} = State) ->
+ {{'$prefix_msg', Bytes},
+ State#state{prefix_msgs = {Rem, P}}};
+take_next_msg(#state{returns = Returns,
+ low_msg_num = Low0,
+ messages = Messages0,
+ prefix_msgs = {R, P}} = State) ->
%% use peek rather than out there as the most likely case is an empty
%% queue
case lqueue:peek(Returns) of
- {value, Next} ->
- {Next, State#state{returns = lqueue:drop(Returns)}};
- empty when P == 0 ->
+ {value, NextMsg} ->
+ {NextMsg,
+ State#state{returns = lqueue:drop(Returns)}};
+ empty when P == [] ->
case Low0 of
undefined ->
- {undefined, State};
+ empty;
_ ->
- case Low0 + 1 of
- NextMsgNum ->
- %% the map will be empty after this item is removed
- {Low0, State#state{low_msg_num = undefined}};
- Low ->
- {Low0, State#state{low_msg_num = Low}}
+ {Msg, Messages} = maps:take(Low0, Messages0),
+ case maps:size(Messages) of
+ 0 ->
+ {{Low0, Msg},
+ State#state{messages = Messages,
+ low_msg_num = undefined}};
+ _ ->
+ {{Low0, Msg},
+ State#state{messages = Messages,
+ low_msg_num = Low0 + 1}}
end
end;
empty ->
+ [Bytes | Rem] = P,
%% There are prefix msgs
- {'$prefix_msg', State#state{prefix_msg_counts = {R, P - 1}}}
- end.
-
-%% next message is determined as follows:
-%% First we check if there are are prefex returns
-%% Then we check if there are current returns
-%% then we check prefix msgs
-%% then we check current messages
-%%
-%% When we return it is always done to the current return queue
-%% for both prefix messages and current messages
-take_next_msg(#state{messages = Messages0} = State0) ->
- case next_checkout_message(State0) of
- {'$prefix_msg', State} ->
- {'$prefix_msg', State, Messages0};
- {NextMsgInId, State} ->
- %% messages are available
- case maps:take(NextMsgInId, Messages0) of
- {IdxMsg, Messages} ->
- {{NextMsgInId, IdxMsg}, State, Messages};
- error ->
- error
- end
+ {{'$prefix_msg', Bytes},
+ State#state{prefix_msgs = {R, Rem}}}
end.
send_msg_effect({CTag, CPid}, Msgs) ->
@@ -1204,7 +1273,7 @@ checkout_one(#state{service_queue = SQ0,
case queue:peek(SQ0) of
{value, ConsumerId} ->
case take_next_msg(InitState) of
- {ConsumerMsg, State0, Messages} ->
+ {ConsumerMsg, State0} ->
SQ1 = queue:drop(SQ0),
%% there are consumers waiting to be serviced
%% process consumer checkout
@@ -1229,31 +1298,31 @@ checkout_one(#state{service_queue = SQ0,
update_or_remove_sub(ConsumerId, Con,
Cons0, SQ1, []),
State1 = State0#state{service_queue = SQ,
- messages = Messages,
consumers = Cons},
{State, Msg} =
case ConsumerMsg of
- '$prefix_msg' ->
- {State1, '$prefix_msg'};
+ {'$prefix_msg', _} ->
+ {add_bytes_checkout(ConsumerMsg, State1),
+ ConsumerMsg};
{_, {_, {_, RawMsg} = M}} ->
- {add_bytes_checkout(RawMsg, State1), M}
+ {add_bytes_checkout(RawMsg, State1),
+ M}
end,
{success, ConsumerId, Next, Msg, State};
error ->
%% consumer did not exist but was queued, recurse
checkout_one(InitState#state{service_queue = SQ1})
end;
- error ->
- InitState
+ empty ->
+ {nochange, InitState}
end;
empty ->
case maps:size(Messages0) of
- 0 -> InitState;
+ 0 -> {nochange, InitState};
_ -> {inactive, InitState}
end
end.
-
update_or_remove_sub(ConsumerId, #consumer{lifetime = auto,
credit = 0} = Con,
Cons, ServiceQueue, Effects) ->
@@ -1296,14 +1365,14 @@ update_consumer(ConsumerId, Meta, Spec,
%% general case, single active consumer off
update_consumer0(ConsumerId, Meta, Spec, State0);
update_consumer(ConsumerId, Meta, Spec,
- #state{consumers = Cons0,
+ #state{consumers = Cons0,
consumer_strategy = single_active} = State0)
when map_size(Cons0) == 0 ->
%% single active consumer on, no one is consuming yet
update_consumer0(ConsumerId, Meta, Spec, State0);
update_consumer(ConsumerId, Meta, {Life, Credit, Mode},
- #state{consumer_strategy = single_active,
- waiting_consumers = WaitingConsumers0} = State0) ->
+ #state{consumer_strategy = single_active,
+ waiting_consumers = WaitingConsumers0} = State0) ->
%% single active consumer on and one active consumer already
%% adding the new consumer to the waiting list
Consumer = #consumer{lifetime = Life, meta = Meta,
@@ -1323,11 +1392,10 @@ update_consumer0(ConsumerId, Meta, {Life, Credit, Mode},
%% the credit update
N = maps:size(S#consumer.checked_out),
C = max(0, Credit - N),
- S#consumer{lifetime = Life,
- credit = C}
+ S#consumer{lifetime = Life, credit = C}
end, Init, Cons0),
ServiceQueue = maybe_queue_consumer(ConsumerId, maps:get(ConsumerId, Cons),
- ServiceQueue0),
+ ServiceQueue0),
State0#state{consumers = Cons, service_queue = ServiceQueue}.
@@ -1347,13 +1415,20 @@ maybe_queue_consumer(ConsumerId, #consumer{credit = Credit},
dehydrate_state(#state{messages = Messages,
consumers = Consumers,
returns = Returns,
- prefix_msg_counts = {PrefRetCnt, MsgCount}} = State) ->
- %% TODO: optimise to avoid having to iterate the queue to get the number
- %% of current returned messages
- RetLen = lqueue:len(Returns), % O(1)
- CurReturns = length([R || R <- lqueue:to_list(Returns),
- R =/= '$prefix_msg']),
- PrefixMsgCnt = MsgCount + maps:size(Messages) - CurReturns,
+ prefix_msgs = {PrefRet0, PrefMsg0}} = State) ->
+ %% TODO: optimise this function as far as possible
+ PrefRet = lists:foldl(fun ({'$prefix_msg', Bytes}, Acc) ->
+ [Bytes | Acc];
+ ({_, {_, {_, Raw}}}, Acc) ->
+ [message_size(Raw) | Acc]
+ end,
+ lists:reverse(PrefRet0),
+ lqueue:to_list(Returns)),
+ PrefMsgs = lists:foldl(fun ({_, {_RaftIdx, {_H, Raw}}}, Acc) ->
+ [message_size(Raw) | Acc]
+ end,
+ lists:reverse(PrefMsg0),
+ lists:sort(maps:to_list(Messages))),
State#state{messages = #{},
ra_indexes = rabbit_fifo_index:empty(),
low_msg_num = undefined,
@@ -1361,14 +1436,26 @@ dehydrate_state(#state{messages = Messages,
dehydrate_consumer(C)
end, Consumers),
returns = lqueue:new(),
- %% messages include returns
- prefix_msg_counts = {RetLen + PrefRetCnt,
- PrefixMsgCnt}}.
+ prefix_msgs = {lists:reverse(PrefRet),
+ lists:reverse(PrefMsgs)}}.
dehydrate_consumer(#consumer{checked_out = Checked0} = Con) ->
- Checked = maps:map(fun (_, _) -> '$prefix_msg' end, Checked0),
+ Checked = maps:map(fun (_, {'$prefix_msg', _} = M) ->
+ M;
+ (_, {_, {_, {_, Raw}}}) ->
+ {'$prefix_msg', message_size(Raw)}
+ end, Checked0),
Con#consumer{checked_out = Checked}.
+is_over_limit(#state{max_length = undefined,
+ max_bytes = undefined}) ->
+ false;
+is_over_limit(#state{max_length = MaxLength,
+ max_bytes = MaxBytes,
+ msg_bytes_enqueue = BytesEnq} = State) ->
+
+ messages_ready(State) > MaxLength orelse (BytesEnq > MaxBytes).
+
-spec make_enqueue(maybe(pid()), maybe(msg_seqno()), raw_msg()) -> protocol().
make_enqueue(Pid, Seq, Msg) ->
#enqueue{pid = Pid, seq = Seq, msg = Msg}.
@@ -1405,10 +1492,12 @@ make_purge() -> #purge{}.
make_update_config(Config) ->
#update_config{config = Config}.
-add_bytes_enqueue(Msg, #state{msg_bytes_enqueue = Enqueue} = State) ->
- Bytes = message_size(Msg),
+add_bytes_enqueue(Bytes, #state{msg_bytes_enqueue = Enqueue} = State) ->
State#state{msg_bytes_enqueue = Enqueue + Bytes}.
+add_bytes_drop(Bytes, #state{msg_bytes_enqueue = Enqueue} = State) ->
+ State#state{msg_bytes_enqueue = Enqueue - Bytes}.
+
add_bytes_checkout(Msg, #state{msg_bytes_checkout = Checkout,
msg_bytes_enqueue = Enqueue } = State) ->
Bytes = message_size(Msg),
@@ -1428,6 +1517,8 @@ add_bytes_return(Msg, #state{msg_bytes_checkout = Checkout,
message_size(#basic_message{content = Content}) ->
#content{payload_fragments_rev = PFR} = Content,
iolist_size(PFR);
+message_size({'$prefix_msg', B}) ->
+ B;
message_size(B) when is_binary(B) ->
byte_size(B);
message_size(Msg) ->
@@ -1588,7 +1679,7 @@ enq_enq_deq_deq_settle_test() ->
{State3, {dequeue, {0, {_, first}}}, [{monitor, _, _}]} =
apply(meta(3), make_checkout(Cid, {dequeue, unsettled}, #{}),
State2),
- {_State4, {dequeue, empty}, _} =
+ {_State4, {dequeue, empty}} =
apply(meta(4), make_checkout(Cid, {dequeue, unsettled}, #{}),
State3),
ok.
@@ -1633,7 +1724,7 @@ release_cursor_test() ->
checkout_enq_settle_test() ->
Cid = {?FUNCTION_NAME, self()},
- {State1, [{monitor, _, _}]} = check(Cid, 1, test_init(test)),
+ {State1, [{monitor, _, _} | _]} = check(Cid, 1, test_init(test)),
{State2, Effects0} = enq(2, 1, first, State1),
?ASSERT_EFF({send_msg, _,
{delivery, ?FUNCTION_NAME,
@@ -1648,7 +1739,7 @@ checkout_enq_settle_test() ->
out_of_order_enqueue_test() ->
Cid = {?FUNCTION_NAME, self()},
- {State1, [{monitor, _, _}]} = check_n(Cid, 5, 5, test_init(test)),
+ {State1, [{monitor, _, _} | _]} = check_n(Cid, 5, 5, test_init(test)),
{State2, Effects2} = enq(2, 1, first, State1),
?ASSERT_EFF({send_msg, _, {delivery, _, [{_, {_, first}}]}, _}, Effects2),
% assert monitor was set up
@@ -1678,7 +1769,7 @@ out_of_order_first_enqueue_test() ->
duplicate_enqueue_test() ->
Cid = {<<"duplicate_enqueue_test">>, self()},
- {State1, [{monitor, _, _}]} = check_n(Cid, 5, 5, test_init(test)),
+ {State1, [{monitor, _, _} | _]} = check_n(Cid, 5, 5, test_init(test)),
{State2, Effects2} = enq(2, 1, first, State1),
?ASSERT_EFF({send_msg, _, {delivery, _, [{_, {_, first}}]}, _}, Effects2),
{_State3, Effects3} = enq(3, 1, first, State2),
@@ -1697,10 +1788,10 @@ return_checked_out_test() ->
{State0, [_, _]} = enq(1, 1, first, test_init(test)),
{State1, [_Monitor,
{send_msg, _, {delivery, _, [{MsgId, _}]}, ra_event},
- {aux, active}
+ {aux, active} | _
]} = check(Cid, 2, State0),
% return
- {_State2, _, [_, _]} = apply(meta(3), make_return(Cid, [MsgId]), State1),
+ {_State2, _, [_]} = apply(meta(3), make_return(Cid, [MsgId]), State1),
ok.
return_auto_checked_out_test() ->
@@ -1729,7 +1820,8 @@ cancelled_checkout_out_test() ->
{State1, _} = check_auto(Cid, 2, State0),
% cancelled checkout should return all pending messages to queue
{State2, _, _} = apply(meta(3), make_checkout(Cid, cancel, #{}), State1),
- ?assertEqual(2, maps:size(State2#state.messages)),
+ ?assertEqual(1, maps:size(State2#state.messages)),
+ ?assertEqual(1, lqueue:len(State2#state.returns)),
{State3, {dequeue, {0, {_, first}}}, _} =
apply(meta(3), make_checkout(Cid, {dequeue, settled}, #{}), State2),
@@ -1782,14 +1874,14 @@ down_with_noconnection_returns_unack_test() ->
?assertEqual(0, maps:size(State1#state.messages)),
?assertEqual(0, lqueue:len(State1#state.returns)),
{State2a, _, _} = apply(meta(3), {down, Pid, noconnection}, State1),
- ?assertEqual(1, maps:size(State2a#state.messages)),
+ ?assertEqual(0, maps:size(State2a#state.messages)),
?assertEqual(1, lqueue:len(State2a#state.returns)),
ok.
down_with_noproc_enqueuer_is_cleaned_up_test() ->
State00 = test_init(test),
Pid = spawn(fun() -> ok end),
- {State0, _, Effects0} = apply(meta(1), {enqueue, Pid, 1, first}, State00),
+ {State0, _, Effects0} = apply(meta(1), make_enqueue(Pid, 1, first), State00),
?ASSERT_EFF({monitor, process, _}, Effects0),
{State1, _, _} = apply(meta(3), {down, Pid, noproc}, State0),
% ensure there are no enqueuers
@@ -2150,7 +2242,7 @@ single_active_consumer_test() ->
% adding some consumers
AddConsumer = fun(CTag, State) ->
{NewState, _, _} = apply(
- #{},
+ meta(1),
#checkout{spec = {once, 1, simple_prefetch},
meta = #{},
consumer_id = {CTag, self()}},
@@ -2168,7 +2260,8 @@ single_active_consumer_test() ->
?assertNotEqual(false, lists:keyfind({<<"ctag4">>, self()}, 1, State1#state.waiting_consumers)),
% cancelling a waiting consumer
- {State2, _, Effects1} = apply(#{}, #checkout{spec = cancel, consumer_id = {<<"ctag3">>, self()}}, State1),
+ {State2, _, Effects1} = apply(meta(2),
+ #checkout{spec = cancel, consumer_id = {<<"ctag3">>, self()}}, State1),
% the active consumer should still be in place
?assertEqual(1, map_size(State2#state.consumers)),
?assert(maps:is_key({<<"ctag1">>, self()}, State2#state.consumers)),
@@ -2180,7 +2273,7 @@ single_active_consumer_test() ->
?assertEqual(1, length(Effects1)),
% cancelling the active consumer
- {State3, _, Effects2} = apply(#{}, #checkout{spec = cancel, consumer_id = {<<"ctag1">>, self()}}, State2),
+ {State3, _, Effects2} = apply(meta(3), #checkout{spec = cancel, consumer_id = {<<"ctag1">>, self()}}, State2),
% the second registered consumer is now the active one
?assertEqual(1, map_size(State3#state.consumers)),
?assert(maps:is_key({<<"ctag2">>, self()}, State3#state.consumers)),
@@ -2191,7 +2284,7 @@ single_active_consumer_test() ->
?assertEqual(2, length(Effects2)),
% cancelling the active consumer
- {State4, _, Effects3} = apply(#{}, #checkout{spec = cancel, consumer_id = {<<"ctag2">>, self()}}, State3),
+ {State4, _, Effects3} = apply(meta(4), #checkout{spec = cancel, consumer_id = {<<"ctag2">>, self()}}, State3),
% the last waiting consumer became the active one
?assertEqual(1, map_size(State4#state.consumers)),
?assert(maps:is_key({<<"ctag4">>, self()}, State4#state.consumers)),
@@ -2201,7 +2294,7 @@ single_active_consumer_test() ->
?assertEqual(2, length(Effects3)),
% cancelling the last consumer
- {State5, _, Effects4} = apply(#{}, #checkout{spec = cancel, consumer_id = {<<"ctag4">>, self()}}, State4),
+ {State5, _, Effects4} = apply(meta(5), #checkout{spec = cancel, consumer_id = {<<"ctag4">>, self()}}, State4),
% no active consumer anymore
?assertEqual(0, map_size(State5#state.consumers)),
% still nothing in the waiting list
@@ -2226,7 +2319,7 @@ single_active_consumer_cancel_consumer_when_channel_is_down_test() ->
% adding some consumers
AddConsumer = fun({CTag, ChannelId}, State) ->
{NewState, _, _} = apply(
- #{},
+ #{index => 1},
#checkout{spec = {once, 1, simple_prefetch},
meta = #{},
consumer_id = {CTag, ChannelId}},
@@ -2237,16 +2330,17 @@ single_active_consumer_cancel_consumer_when_channel_is_down_test() ->
[{<<"ctag1">>, Pid1}, {<<"ctag2">>, Pid2}, {<<"ctag3">>, Pid2}, {<<"ctag4">>, Pid3}]),
% the channel of the active consumer goes down
- {State2, _, Effects} = apply(#{}, {down, Pid1, doesnotmatter}, State1),
+ {State2, _, Effects} = apply(#{index => 2}, {down, Pid1, doesnotmatter}, State1),
% fell back to another consumer
?assertEqual(1, map_size(State2#state.consumers)),
% there are still waiting consumers
?assertEqual(2, length(State2#state.waiting_consumers)),
- % effects to unregister the consumer and to update the new active one (metrics) are there
+ % effects to unregister the consumer and
+ % to update the new active one (metrics) are there
?assertEqual(2, length(Effects)),
% the channel of the active consumer and a waiting consumer goes down
- {State3, _, Effects2} = apply(#{}, {down, Pid2, doesnotmatter}, State2),
+ {State3, _, Effects2} = apply(#{index => 3}, {down, Pid2, doesnotmatter}, State2),
% fell back to another consumer
?assertEqual(1, map_size(State3#state.consumers)),
% no more waiting consumer
@@ -2255,7 +2349,7 @@ single_active_consumer_cancel_consumer_when_channel_is_down_test() ->
?assertEqual(3, length(Effects2)),
% the last channel goes down
- {State4, _, Effects3} = apply(#{}, {down, Pid3, doesnotmatter}, State3),
+ {State4, _, Effects3} = apply(#{index => 4}, {down, Pid3, doesnotmatter}, State3),
% no more consumers
?assertEqual(0, map_size(State4#state.consumers)),
?assertEqual(0, length(State4#state.waiting_consumers)),
@@ -2271,10 +2365,11 @@ single_active_consumer_mark_waiting_consumers_as_suspected_when_down_noconnnecti
shadow_copy_interval => 0,
single_active_consumer_on => true}),
+ Meta = #{index => 1},
% adding some consumers
AddConsumer = fun(CTag, State) ->
{NewState, _, _} = apply(
- #{},
+ Meta,
#checkout{spec = {once, 1, simple_prefetch},
meta = #{},
consumer_id = {CTag, self()}},
@@ -2293,7 +2388,7 @@ single_active_consumer_mark_waiting_consumers_as_suspected_when_down_noconnnecti
end, State2#state.waiting_consumers),
% simulate node goes back up
- {State3, _, _} = apply(#{}, {nodeup, node(self())}, State2),
+ {State3, _, _} = apply(#{index => 2}, {nodeup, node(self())}, State2),
% all the waiting consumers should be un-suspected
?assertEqual(3, length(State3#state.waiting_consumers)),
@@ -2315,10 +2410,11 @@ single_active_consumer_state_enter_leader_include_waiting_consumers_test() ->
Pid2 = spawn(DummyFunction),
Pid3 = spawn(DummyFunction),
+ Meta = #{index => 1},
% adding some consumers
AddConsumer = fun({CTag, ChannelId}, State) ->
{NewState, _, _} = apply(
- #{},
+ Meta,
#checkout{spec = {once, 1, simple_prefetch},
meta = #{},
consumer_id = {CTag, ChannelId}},
@@ -2344,10 +2440,11 @@ single_active_consumer_state_enter_eol_include_waiting_consumers_test() ->
Pid2 = spawn(DummyFunction),
Pid3 = spawn(DummyFunction),
+ Meta = #{index => 1},
% adding some consumers
AddConsumer = fun({CTag, ChannelId}, State) ->
{NewState, _, _} = apply(
- #{},
+ Meta,
#checkout{spec = {once, 1, simple_prefetch},
meta = #{},
consumer_id = {CTag, ChannelId}},
@@ -2371,7 +2468,7 @@ query_consumers_test() ->
% adding some consumers
AddConsumer = fun(CTag, State) ->
{NewState, _, _} = apply(
- #{},
+ #{index => 1},
#checkout{spec = {once, 1, simple_prefetch},
meta = #{},
consumer_id = {CTag, self()}},
@@ -2405,11 +2502,11 @@ query_consumers_when_single_active_consumer_is_on_test() ->
atom_to_binary(?FUNCTION_NAME, utf8)),
shadow_copy_interval => 0,
single_active_consumer_on => true}),
-
+ Meta = #{index => 1},
% adding some consumers
AddConsumer = fun(CTag, State) ->
{NewState, _, _} = apply(
- #{},
+ Meta,
#checkout{spec = {once, 1, simple_prefetch},
meta = #{},
consumer_id = {CTag, self()}},
@@ -2448,7 +2545,7 @@ active_flag_updated_when_consumer_suspected_unsuspected_test() ->
% adding some consumers
AddConsumer = fun({CTag, ChannelId}, State) ->
{NewState, _, _} = apply(
- #{},
+ #{index => 1},
#checkout{spec = {once, 1, simple_prefetch},
meta = #{},
consumer_id = {CTag, ChannelId}},
@@ -2458,11 +2555,11 @@ active_flag_updated_when_consumer_suspected_unsuspected_test() ->
State1 = lists:foldl(AddConsumer, State0,
[{<<"ctag1">>, Pid1}, {<<"ctag2">>, Pid2}, {<<"ctag3">>, Pid2}, {<<"ctag4">>, Pid3}]),
- {State2, _, Effects2} = apply(doesnotmatter, {down, Pid1, noconnection}, State1),
+ {State2, _, Effects2} = apply(#{}, {down, Pid1, noconnection}, State1),
% 1 effect to update the metrics of each consumer (they belong to the same node), 1 more effect to monitor the node
?assertEqual(4 + 1, length(Effects2)),
- {_, _, Effects3} = apply(doesnotmatter, {nodeup, node(self())}, State2),
+ {_, _, Effects3} = apply(#{index => 1}, {nodeup, node(self())}, State2),
% for each consumer: 1 effect to update the metrics, 1 effect to monitor the consumer PID
?assertEqual(4 + 4, length(Effects3)).
@@ -2481,7 +2578,7 @@ active_flag_not_updated_when_consumer_suspected_unsuspected_and_single_active_co
% adding some consumers
AddConsumer = fun({CTag, ChannelId}, State) ->
{NewState, _, _} = apply(
- #{},
+ #{index => 1},
#checkout{spec = {once, 1, simple_prefetch},
meta = #{},
consumer_id = {CTag, ChannelId}},
@@ -2491,11 +2588,11 @@ active_flag_not_updated_when_consumer_suspected_unsuspected_and_single_active_co
State1 = lists:foldl(AddConsumer, State0,
[{<<"ctag1">>, Pid1}, {<<"ctag2">>, Pid2}, {<<"ctag3">>, Pid2}, {<<"ctag4">>, Pid3}]),
- {State2, _, Effects2} = apply(doesnotmatter, {down, Pid1, noconnection}, State1),
+ {State2, _, Effects2} = apply(#{}, {down, Pid1, noconnection}, State1),
% only 1 effect to monitor the node
?assertEqual(1, length(Effects2)),
- {_, _, Effects3} = apply(doesnotmatter, {nodeup, node(self())}, State2),
+ {_, _, Effects3} = apply(#{index => 1}, {nodeup, node(self())}, State2),
% for each consumer: 1 effect to monitor the consumer PID
?assertEqual(4, length(Effects3)).
diff --git a/src/rabbit_fifo_index.erl b/src/rabbit_fifo_index.erl
index f8f414f453..184002611e 100644
--- a/src/rabbit_fifo_index.erl
+++ b/src/rabbit_fifo_index.erl
@@ -4,12 +4,14 @@
empty/0,
fetch/2,
append/3,
+ update_if_present/3,
return/3,
delete/2,
size/1,
smallest/1,
next_key_after/2,
- map/2
+ map/2,
+ to_map/1
]).
-include_lib("ra/include/ra.hrl").
@@ -36,12 +38,22 @@ fetch(Key, #?MODULE{data = Data}) ->
-spec append(integer(), term(), state()) -> state().
append(Key, Value,
#?MODULE{data = Data,
- smallest = Smallest,
- largest = Largest} = State)
+ smallest = Smallest,
+ largest = Largest} = State)
when Key > Largest orelse Largest =:= undefined ->
State#?MODULE{data = maps:put(Key, Value, Data),
- smallest = ra_lib:default(Smallest, Key),
- largest = Key}.
+ smallest = ra_lib:default(Smallest, Key),
+ largest = Key}.
+
+-spec update_if_present(integer(), term(), state()) -> state().
+update_if_present(Key, Value, #?MODULE{data = Data} = State) ->
+ case Data of
+ #{Key := _} ->
+ State#?MODULE{data = maps:put(Key, Value, Data)};
+ _ ->
+ State
+ end.
+
-spec return(integer(), term(), state()) -> state().
return(Key, Value, #?MODULE{data = Data, smallest = Smallest} = State)
@@ -76,6 +88,10 @@ delete(Key, #?MODULE{data = Data} = State) ->
size(#?MODULE{data = Data}) ->
maps:size(Data).
+-spec to_map(state()) -> #{integer() => term()}.
+to_map(#?MODULE{data = Data}) ->
+ Data.
+
-spec smallest(state()) -> undefined | {integer(), term()}.
smallest(#?MODULE{smallest = undefined}) ->
undefined;
diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl
index 6c8c99516d..114af4fdfb 100644
--- a/src/rabbit_quorum_queue.erl
+++ b/src/rabbit_quorum_queue.erl
@@ -152,10 +152,15 @@ ra_machine(Q) ->
ra_machine_config(Q = #amqqueue{name = QName,
pid = {Name, _}}) ->
+ %% take the minimum value of the policy and the queue arg if present
+ MaxLength = args_policy_lookup(<<"max-length">>, fun min/2, Q),
+ MaxBytes = args_policy_lookup(<<"max-length-bytes">>, fun min/2, Q),
#{name => Name,
queue_resource => QName,
dead_letter_handler => dlx_mfa(Q),
become_leader_handler => {?MODULE, become_leader, [QName]},
+ max_length => MaxLength,
+ max_bytes => MaxBytes,
single_active_consumer_on => single_active_consumer_on(Q)}.
single_active_consumer_on(#amqqueue{arguments = QArguments}) ->
@@ -641,8 +646,10 @@ delete_member(#amqqueue{pid = {RaName, _}, name = QName}, Node) ->
%%----------------------------------------------------------------------------
dlx_mfa(Q) ->
- DLX = init_dlx(args_policy_lookup(<<"dead-letter-exchange">>, fun res_arg/2, Q), Q),
- DLXRKey = args_policy_lookup(<<"dead-letter-routing-key">>, fun res_arg/2, Q),
+ DLX = init_dlx(args_policy_lookup(<<"dead-letter-exchange">>,
+ fun res_arg/2, Q), Q),
+ DLXRKey = args_policy_lookup(<<"dead-letter-routing-key">>,
+ fun res_arg/2, Q),
{?MODULE, dead_letter_publish, [DLX, DLXRKey, Q#amqqueue.name]}.
init_dlx(undefined, _Q) ->
@@ -834,9 +841,8 @@ qnode({_, Node}) ->
Node.
check_invalid_arguments(QueueName, Args) ->
- Keys = [<<"x-expires">>, <<"x-message-ttl">>, <<"x-max-length">>,
- <<"x-max-length-bytes">>, <<"x-max-priority">>, <<"x-overflow">>,
- <<"x-queue-mode">>],
+ Keys = [<<"x-expires">>, <<"x-message-ttl">>,
+ <<"x-max-priority">>, <<"x-queue-mode">>, <<"x-overflow">>],
[case rabbit_misc:table_lookup(Args, Key) of
undefined -> ok;
_TypeVal -> rabbit_misc:protocol_error(
diff --git a/test/backing_queue_SUITE.erl b/test/backing_queue_SUITE.erl
index 5864c387c5..433bc66bff 100644
--- a/test/backing_queue_SUITE.erl
+++ b/test/backing_queue_SUITE.erl
@@ -723,7 +723,7 @@ bq_queue_recover1(Config) ->
true, false, [], none, <<"acting-user">>),
publish_and_confirm(Q, <<>>, Count),
- SupPid = rabbit_ct_broker_helpers:get_queue_sup_pid(Q),
+ SupPid = get_queue_sup_pid(Q),
true = is_pid(SupPid),
exit(SupPid, kill),
exit(QPid, kill),
@@ -751,6 +751,22 @@ bq_queue_recover1(Config) ->
end),
passed.
+%% Return the PID of the given queue's supervisor.
+get_queue_sup_pid(#amqqueue { pid = QPid, name = QName }) ->
+ VHost = QName#resource.virtual_host,
+ {ok, AmqSup} = rabbit_amqqueue_sup_sup:find_for_vhost(VHost, node(QPid)),
+ Sups = supervisor:which_children(AmqSup),
+ get_queue_sup_pid(Sups, QPid).
+
+get_queue_sup_pid([{_, SupPid, _, _} | Rest], QueuePid) ->
+ WorkerPids = [Pid || {_, Pid, _, _} <- supervisor:which_children(SupPid)],
+ case lists:member(QueuePid, WorkerPids) of
+ true -> SupPid;
+ false -> get_queue_sup_pid(Rest, QueuePid)
+ end;
+get_queue_sup_pid([], _QueuePid) ->
+ undefined.
+
variable_queue_dynamic_duration_change(Config) ->
passed = rabbit_ct_broker_helpers:rpc(Config, 0,
?MODULE, variable_queue_dynamic_duration_change1, [Config]).
diff --git a/test/dynamic_qq_SUITE.erl b/test/dynamic_qq_SUITE.erl
index d1158ef07a..fbc1e81827 100644
--- a/test/dynamic_qq_SUITE.erl
+++ b/test/dynamic_qq_SUITE.erl
@@ -133,11 +133,9 @@ force_delete_if_no_consensus(Config) ->
arguments = Args,
durable = true,
passive = true})),
- %% TODO implement a force delete
BCh2 = rabbit_ct_client_helpers:open_channel(Config, B),
- ?assertExit({{shutdown,
- {connection_closing, {server_initiated_close, 541, _}}}, _},
- amqp_channel:call(BCh2, #'queue.delete'{queue = QName})),
+ ?assertMatch(#'queue.delete_ok'{},
+ amqp_channel:call(BCh2, #'queue.delete'{queue = QName})),
ok.
takeover_on_failure(Config) ->
diff --git a/test/quorum_queue_SUITE.erl b/test/quorum_queue_SUITE.erl
index dcba910a6a..48dac3ca57 100644
--- a/test/quorum_queue_SUITE.erl
+++ b/test/quorum_queue_SUITE.erl
@@ -22,6 +22,7 @@
-import(quorum_queue_utils, [wait_for_messages_ready/3,
wait_for_messages_pending_ack/3,
+ wait_for_messages_total/3,
dirty_query/3,
ra_name/1]).
@@ -37,6 +38,7 @@ all() ->
groups() ->
[
{single_node, [], all_tests()},
+ {single_node, [], memory_tests()},
{unclustered, [], [
{cluster_size_2, [], [add_member]}
]},
@@ -51,6 +53,7 @@ groups() ->
delete_member_not_found,
delete_member]
++ all_tests()},
+ {cluster_size_2, [], memory_tests()},
{cluster_size_3, [], [
declare_during_node_down,
simple_confirm_availability_on_leader_change,
@@ -61,7 +64,8 @@ groups() ->
delete_declare,
metrics_cleanup_on_leadership_takeover,
metrics_cleanup_on_leader_crash,
- consume_in_minority]},
+ consume_in_minority
+ ]},
{cluster_size_5, [], [start_queue,
start_queue_concurrent,
quorum_cluster_size_3,
@@ -126,6 +130,11 @@ all_tests() ->
consume_redelivery_count,
subscribe_redelivery_count,
message_bytes_metrics,
+ queue_length_limit_drop_head
+ ].
+
+memory_tests() ->
+ [
memory_alarm_rolls_wal
].
@@ -240,7 +249,9 @@ declare_args(Config) ->
Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
LQ = ?config(queue_name, Config),
- declare(Ch, LQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}]),
+ declare(Ch, LQ, [{<<"x-queue-type">>, longstr, <<"quorum">>},
+ {<<"x-max-length">>, long, 2000},
+ {<<"x-max-length-bytes">>, long, 2000}]),
assert_queue_type(Server, LQ, quorum),
DQ = <<"classic-declare-args-q">>,
@@ -293,16 +304,6 @@ declare_invalid_args(Config) ->
declare(rabbit_ct_client_helpers:open_channel(Config, Server),
LQ, [{<<"x-queue-type">>, longstr, <<"quorum">>},
{<<"x-message-ttl">>, long, 2000}])),
- ?assertExit(
- {{shutdown, {server_initiated_close, 406, _}}, _},
- declare(rabbit_ct_client_helpers:open_channel(Config, Server),
- LQ, [{<<"x-queue-type">>, longstr, <<"quorum">>},
- {<<"x-max-length">>, long, 2000}])),
- ?assertExit(
- {{shutdown, {server_initiated_close, 406, _}}, _},
- declare(rabbit_ct_client_helpers:open_channel(Config, Server),
- LQ, [{<<"x-queue-type">>, longstr, <<"quorum">>},
- {<<"x-max-length-bytes">>, long, 2000}])),
?assertExit(
{{shutdown, {server_initiated_close, 406, _}}, _},
@@ -314,7 +315,7 @@ declare_invalid_args(Config) ->
{{shutdown, {server_initiated_close, 406, _}}, _},
declare(rabbit_ct_client_helpers:open_channel(Config, Server),
LQ, [{<<"x-queue-type">>, longstr, <<"quorum">>},
- {<<"x-overflow">>, longstr, <<"drop-head">>}])),
+ {<<"x-overflow">>, longstr, <<"reject-publish">>}])),
?assertExit(
{{shutdown, {server_initiated_close, 406, _}}, _},
@@ -1422,7 +1423,7 @@ metrics_cleanup_on_leadership_takeover(Config) ->
_ -> false
end
end),
- force_leader_change(Leader, Servers, QQ),
+ force_leader_change(Servers, QQ),
wait_until(fun () ->
[] =:= rpc:call(Leader, ets, lookup, [queue_coarse_metrics, QRes]) andalso
[] =:= rpc:call(Leader, ets, lookup, [queue_metrics, QRes])
@@ -2151,6 +2152,32 @@ memory_alarm_rolls_wal(Config) ->
timer:sleep(1000),
ok.
+queue_length_limit_drop_head(Config) ->
+ [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ QQ = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', QQ, 0, 0},
+ declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>},
+ {<<"x-max-length">>, long, 1}])),
+
+ RaName = ra_name(QQ),
+ ok = amqp_channel:cast(Ch,
+ #'basic.publish'{routing_key = QQ},
+ #amqp_msg{props = #'P_basic'{delivery_mode = 2},
+ payload = <<"msg1">>}),
+ ok = amqp_channel:cast(Ch,
+ #'basic.publish'{routing_key = QQ},
+ #amqp_msg{props = #'P_basic'{delivery_mode = 2},
+ payload = <<"msg2">>}),
+ wait_for_consensus(QQ, Config),
+ wait_for_messages_ready(Servers, RaName, 1),
+ wait_for_messages_pending_ack(Servers, RaName, 0),
+ wait_for_messages_total(Servers, RaName, 1),
+ ?assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = <<"msg2">>}},
+ amqp_channel:call(Ch, #'basic.get'{queue = QQ,
+ no_ack = true})).
+
%%----------------------------------------------------------------------------
declare(Ch, Q) ->
@@ -2201,6 +2228,9 @@ filter_queues(Expected, Got) ->
lists:member(K, Keys)
end, Got).
+publish_many(Ch, Queue, Count) ->
+ [publish(Ch, Queue) || _ <- lists:seq(1, Count)].
+
publish(Ch, Queue) ->
ok = amqp_channel:cast(Ch,
#'basic.publish'{routing_key = Queue},
@@ -2268,14 +2298,16 @@ wait_until(Condition, N) ->
wait_until(Condition, N - 1)
end.
-force_leader_change(Leader, Servers, Q) ->
+
+force_leader_change([Server | _] = Servers, Q) ->
RaName = ra_name(Q),
+ {ok, _, {_, Leader}} = ra:members({RaName, Server}),
[F1, _] = Servers -- [Leader],
ok = rpc:call(F1, ra, trigger_election, [{RaName, F1}]),
case ra:members({RaName, Leader}) of
{ok, _, {_, Leader}} ->
%% Leader has been re-elected
- force_leader_change(Leader, Servers, Q);
+ force_leader_change(Servers, Q);
{ok, _, _} ->
%% Leader has changed
ok
@@ -2297,3 +2329,8 @@ get_message_bytes(Leader, QRes) ->
_ ->
[]
end.
+
+wait_for_consensus(Name, Config) ->
+ Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
+ RaName = ra_name(Name),
+ {ok, _, _} = ra:members({RaName, Server}).
diff --git a/test/quorum_queue_utils.erl b/test/quorum_queue_utils.erl
index a216c220e6..6b820c7b5c 100644
--- a/test/quorum_queue_utils.erl
+++ b/test/quorum_queue_utils.erl
@@ -5,6 +5,7 @@
-export([
wait_for_messages_ready/3,
wait_for_messages_pending_ack/3,
+ wait_for_messages_total/3,
dirty_query/3,
ra_name/1
]).
@@ -17,6 +18,10 @@ wait_for_messages_pending_ack(Servers, QName, Ready) ->
wait_for_messages(Servers, QName, Ready,
fun rabbit_fifo:query_messages_checked_out/1, 60).
+wait_for_messages_total(Servers, QName, Total) ->
+ wait_for_messages(Servers, QName, Total,
+ fun rabbit_fifo:query_messages_total/1, 60).
+
wait_for_messages(Servers, QName, Number, Fun, 0) ->
Msgs = dirty_query(Servers, QName, Fun),
Totals = lists:map(fun(M) when is_map(M) ->
@@ -28,8 +33,8 @@ wait_for_messages(Servers, QName, Number, Fun, 0) ->
wait_for_messages(Servers, QName, Number, Fun, N) ->
Msgs = dirty_query(Servers, QName, Fun),
ct:pal("Got messages ~p", [Msgs]),
- case lists:all(fun(M) when is_map(M) ->
- maps:size(M) == Number;
+ case lists:all(fun(C) when is_integer(C) ->
+ C == Number;
(_) ->
false
end, Msgs) of
diff --git a/test/rabbit_fifo_SUITE.erl b/test/rabbit_fifo_SUITE.erl
index 0512e8161a..6df61d4288 100644
--- a/test/rabbit_fifo_SUITE.erl
+++ b/test/rabbit_fifo_SUITE.erl
@@ -124,7 +124,6 @@ basics(Config) ->
{ra_event, Frm, E} ->
case rabbit_fifo_client:handle_ra_event(Frm, E, FState6b) of
{internal, _, _, _FState7} ->
- ct:pal("unexpected event ~p~n", [E]),
exit({unexpected_internal_event, E});
{{delivery, Ctag, [{Mid, {_, two}}]}, FState7} ->
{ok, _S} = rabbit_fifo_client:return(Ctag, [Mid], FState7),
@@ -218,9 +217,7 @@ usage(Config) ->
% force tick and usage stats emission
ServerId ! tick_timeout,
timer:sleep(50),
- % ct:pal("ets ~w ~w ~w", [ets:tab2list(rabbit_fifo_usage), ServerId, UId]),
Use = rabbit_fifo:usage(element(1, ServerId)),
- ct:pal("Use ~w~n", [Use]),
ra:stop_server(ServerId),
?assert(Use > 0.0),
ok.
@@ -300,6 +297,7 @@ returns_after_down(Config) ->
Self ! checkout_done
end),
receive checkout_done -> ok after 1000 -> exit(checkout_done_timeout) end,
+ timer:sleep(1000),
% message should be available for dequeue
{ok, {_, {_, msg1}}, _} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F2),
ra:stop_server(ServerId),
@@ -380,7 +378,6 @@ discard(Config) ->
{ok, empty, _F4} = rabbit_fifo_client:dequeue(<<"tag1">>, settled, F3),
receive
{dead_letter, Letters} ->
- ct:pal("dead letters ~p~n", [Letters]),
[{_, msg1}] = Letters,
ok
after 500 ->
@@ -481,16 +478,13 @@ test_queries(Config) ->
F0 = rabbit_fifo_client:init(ClusterName, [ServerId], 4),
{ok, _} = rabbit_fifo_client:checkout(<<"tag">>, 1, undefined, F0),
{ok, {_, Ready}, _} = ra:local_query(ServerId,
- fun rabbit_fifo:query_messages_ready/1),
- ?assertEqual(1, maps:size(Ready)),
- ct:pal("Ready ~w~n", [Ready]),
+ fun rabbit_fifo:query_messages_ready/1),
+ ?assertEqual(1, Ready),
{ok, {_, Checked}, _} = ra:local_query(ServerId,
- fun rabbit_fifo:query_messages_checked_out/1),
- ?assertEqual(1, maps:size(Checked)),
- ct:pal("Checked ~w~n", [Checked]),
+ fun rabbit_fifo:query_messages_checked_out/1),
+ ?assertEqual(1, Checked),
{ok, {_, Processes}, _} = ra:local_query(ServerId,
- fun rabbit_fifo:query_processes/1),
- ct:pal("Processes ~w~n", [Processes]),
+ fun rabbit_fifo:query_processes/1),
?assertEqual(2, length(Processes)),
P ! stop,
ra:stop_server(ServerId),
@@ -565,7 +559,6 @@ process_ra_events(State, Acc, Wait) ->
process_ra_events0(State0, Acc, Actions0, Wait, DeliveryFun) ->
receive
{ra_event, From, Evt} ->
- % ct:pal("ra event ~w~n", [Evt]),
case rabbit_fifo_client:handle_ra_event(From, Evt, State0) of
{internal, _, Actions, State} ->
process_ra_events0(State, Acc, Actions0 ++ Actions,
@@ -588,7 +581,6 @@ discard_next_delivery(State0, Wait) ->
discard_next_delivery(State, Wait);
{{delivery, Tag, Msgs}, State1} ->
MsgIds = [element(1, M) || M <- Msgs],
- ct:pal("discarding ~p", [Msgs]),
{ok, State} = rabbit_fifo_client:discard(Tag, MsgIds,
State1),
State
@@ -605,7 +597,6 @@ return_next_delivery(State0, Wait) ->
return_next_delivery(State, Wait);
{{delivery, Tag, Msgs}, State1} ->
MsgIds = [element(1, M) || M <- Msgs],
- ct:pal("returning ~p", [Msgs]),
{ok, State} = rabbit_fifo_client:return(Tag, MsgIds,
State1),
State
diff --git a/test/rabbit_fifo_prop_SUITE.erl b/test/rabbit_fifo_prop_SUITE.erl
index a8604b46af..5643da1991 100644
--- a/test/rabbit_fifo_prop_SUITE.erl
+++ b/test/rabbit_fifo_prop_SUITE.erl
@@ -25,7 +25,17 @@ all_tests() ->
scenario1,
scenario2,
scenario3,
- scenario4
+ scenario4,
+ scenario5,
+ scenario6,
+ scenario7,
+ scenario8,
+ scenario9,
+ scenario10,
+ scenario11,
+ scenario12,
+ scenario13,
+ scenario14
].
groups() ->
@@ -73,7 +83,7 @@ scenario1(_Config) ->
make_return(C2, [1]), %% E2 in returns E1 with C2
make_settle(C2, [2]) %% E2 with C2
],
- run_snapshot_test(?FUNCTION_NAME, Commands),
+ run_snapshot_test(#{name => ?FUNCTION_NAME}, Commands),
ok.
scenario2(_Config) ->
@@ -88,7 +98,7 @@ scenario2(_Config) ->
make_settle(C1, [0]),
make_settle(C2, [0])
],
- run_snapshot_test(?FUNCTION_NAME, Commands),
+ run_snapshot_test(#{name => ?FUNCTION_NAME}, Commands),
ok.
scenario3(_Config) ->
@@ -102,7 +112,7 @@ scenario3(_Config) ->
make_settle(C1, [1]),
make_settle(C1, [2])
],
- run_snapshot_test(?FUNCTION_NAME, Commands),
+ run_snapshot_test(#{name => ?FUNCTION_NAME}, Commands),
ok.
scenario4(_Config) ->
@@ -112,19 +122,147 @@ scenario4(_Config) ->
make_enqueue(E,1,msg),
make_settle(C1, [0])
],
- run_snapshot_test(?FUNCTION_NAME, Commands),
+ run_snapshot_test(#{name => ?FUNCTION_NAME}, Commands),
+ ok.
+
+scenario5(_Config) ->
+ C1 = {<<>>, c:pid(0,505,0)},
+ E = c:pid(0,465,9),
+ Commands = [make_enqueue(E,1,<<0>>),
+ make_checkout(C1, {auto,1,simple_prefetch}),
+ make_enqueue(E,2,<<>>),
+ make_settle(C1,[0])],
+ run_snapshot_test(#{name => ?FUNCTION_NAME}, Commands),
+ ok.
+
+scenario6(_Config) ->
+ E = c:pid(0,465,9),
+ Commands = [make_enqueue(E,1,<<>>), %% 1 msg on queue - snap: prefix 1
+ make_enqueue(E,2,<<>>) %% 1. msg on queue - snap: prefix 1
+ ],
+ run_snapshot_test(#{name => ?FUNCTION_NAME,
+ max_length => 1}, Commands),
+ ok.
+
+scenario7(_Config) ->
+ C1 = {<<>>, c:pid(0,208,0)},
+ E = c:pid(0,188,0),
+ Commands = [
+ make_enqueue(E,1,<<>>),
+ make_checkout(C1, {auto,1,simple_prefetch}),
+ make_enqueue(E,2,<<>>),
+ make_enqueue(E,3,<<>>),
+ make_settle(C1,[0])],
+ run_snapshot_test(#{name => ?FUNCTION_NAME,
+ max_length => 1}, Commands),
+ ok.
+
+scenario8(_Config) ->
+ C1 = {<<>>, c:pid(0,208,0)},
+ E = c:pid(0,188,0),
+ Commands = [
+ make_enqueue(E,1,<<>>),
+ make_enqueue(E,2,<<>>),
+ make_checkout(C1, {auto,1,simple_prefetch}),
+ % make_checkout(C1, cancel),
+ {down, E, noconnection},
+ make_settle(C1, [0])],
+ run_snapshot_test(#{name => ?FUNCTION_NAME,
+ max_length => 1}, Commands),
+ ok.
+
+scenario9(_Config) ->
+ E = c:pid(0,188,0),
+ Commands = [
+ make_enqueue(E,1,<<>>),
+ make_enqueue(E,2,<<>>),
+ make_enqueue(E,3,<<>>)],
+ run_snapshot_test(#{name => ?FUNCTION_NAME,
+ max_length => 1}, Commands),
+ ok.
+
+scenario10(_Config) ->
+ C1 = {<<>>, c:pid(0,208,0)},
+ E = c:pid(0,188,0),
+ Commands = [
+ make_checkout(C1, {auto,1,simple_prefetch}),
+ make_enqueue(E,1,<<>>),
+ make_settle(C1, [0])
+ ],
+ run_snapshot_test(#{name => ?FUNCTION_NAME,
+ max_length => 1}, Commands),
+ ok.
+
+scenario11(_Config) ->
+ C1 = {<<>>, c:pid(0,215,0)},
+ E = c:pid(0,217,0),
+ Commands = [
+ make_enqueue(E,1,<<>>),
+ make_checkout(C1, {auto,1,simple_prefetch}),
+ make_checkout(C1, cancel),
+ make_enqueue(E,2,<<>>),
+ make_checkout(C1, {auto,1,simple_prefetch}),
+ make_settle(C1, [0]),
+ make_checkout(C1, cancel)
+ ],
+ run_snapshot_test(#{name => ?FUNCTION_NAME,
+ max_length => 2}, Commands),
+ ok.
+
+scenario12(_Config) ->
+ E = c:pid(0,217,0),
+ Commands = [make_enqueue(E,1,<<0>>),
+ make_enqueue(E,2,<<0>>),
+ make_enqueue(E,3,<<0>>)],
+ run_snapshot_test(#{name => ?FUNCTION_NAME,
+ max_bytes => 2}, Commands),
+ ok.
+
+scenario13(_Config) ->
+ E = c:pid(0,217,0),
+ Commands = [make_enqueue(E,1,<<0>>),
+ make_enqueue(E,2,<<>>),
+ make_enqueue(E,3,<<>>),
+ make_enqueue(E,4,<<>>)
+ ],
+ run_snapshot_test(#{name => ?FUNCTION_NAME,
+ max_length => 2}, Commands),
+ ok.
+
+scenario14(_Config) ->
+ E = c:pid(0,217,0),
+ Commands = [make_enqueue(E,1,<<0,0>>)],
+ run_snapshot_test(#{name => ?FUNCTION_NAME,
+ max_bytes => 1}, Commands),
ok.
snapshots(_Config) ->
run_proper(
fun () ->
- ?FORALL(O, ?LET(Ops, log_gen(), expand(Ops)),
- test1_prop(O))
- end, [], 1000).
-
-test1_prop(Commands) ->
- ct:pal("Commands: ~p~n", [Commands]),
- try run_snapshot_test(?FUNCTION_NAME, Commands) of
+ ?FORALL({Length, Bytes, SingleActiveConsumer},
+ frequency([{10, {0, 0, false}},
+ {5, {non_neg_integer(), non_neg_integer(),
+ boolean()}}]),
+ ?FORALL(O, ?LET(Ops, log_gen(200), expand(Ops)),
+ collect({Length, Bytes},
+ snapshots_prop(
+ config(?FUNCTION_NAME,
+ Length, Bytes,
+ SingleActiveConsumer), O))))
+ end, [], 2000).
+
+config(Name, Length, Bytes, SingleActive) ->
+ #{name => Name,
+ max_length => map_max(Length),
+ max_bytes => map_max(Bytes),
+ single_active_consumer_on => SingleActive}.
+
+map_max(0) -> undefined;
+map_max(N) -> N.
+
+snapshots_prop(Conf, Commands) ->
+ ct:pal("Commands: ~p~nConf~p~n", [Commands, Conf]),
+ try run_snapshot_test(Conf, Commands) of
_ -> true
catch
Err ->
@@ -132,10 +270,10 @@ test1_prop(Commands) ->
false
end.
-log_gen() ->
+log_gen(Size) ->
?LET(EPids, vector(2, pid_gen()),
?LET(CPids, vector(2, pid_gen()),
- resize(200,
+ resize(Size,
list(
frequency(
[{20, enqueue_gen(oneof(EPids))},
@@ -157,15 +295,17 @@ down_gen(Pid) ->
?LET(E, {down, Pid, oneof([noconnection, noproc])}, E).
enqueue_gen(Pid) ->
- ?LET(E, {enqueue, Pid, frequency([{10, enqueue},
- {1, delay}])}, E).
+ ?LET(E, {enqueue, Pid,
+ frequency([{10, enqueue},
+ {1, delay}]),
+ binary()}, E).
checkout_cancel_gen(Pid) ->
{checkout, Pid, cancel}.
checkout_gen(Pid) ->
%% pid, tag, prefetch
- ?LET(C, {checkout, {binary(), Pid}, choose(1, 10)}, C).
+ ?LET(C, {checkout, {binary(), Pid}, choose(1, 100)}, C).
-record(t, {state = rabbit_fifo:init(#{name => proper,
@@ -193,9 +333,10 @@ expand(Ops) ->
lists:reverse(Log).
-handle_op({enqueue, Pid, When}, #t{enqueuers = Enqs0,
- down = Down,
- effects = Effs} = T) ->
+handle_op({enqueue, Pid, When, Data},
+ #t{enqueuers = Enqs0,
+ down = Down,
+ effects = Effs} = T) ->
case Down of
#{Pid := noproc} ->
%% if it's a noproc then it cannot exist - can it?
@@ -204,13 +345,12 @@ handle_op({enqueue, Pid, When}, #t{enqueuers = Enqs0,
_ ->
Enqs = maps:update_with(Pid, fun (Seq) -> Seq + 1 end, 1, Enqs0),
MsgSeq = maps:get(Pid, Enqs),
- Cmd = rabbit_fifo:make_enqueue(Pid, MsgSeq, msg),
+ Cmd = rabbit_fifo:make_enqueue(Pid, MsgSeq, Data),
case When of
enqueue ->
do_apply(Cmd, T#t{enqueuers = Enqs});
delay ->
%% just put the command on the effects queue
- ct:pal("delaying ~w", [Cmd]),
T#t{effects = queue:in(Cmd, Effs)}
end
end;
@@ -308,7 +448,6 @@ enq_effs([{send_msg, P, {delivery, CTag, Msgs}, ra_event} | Rem], Q) ->
Cmd = rabbit_fifo:make_settle({CTag, P}, MsgIds),
enq_effs(Rem, queue:in(Cmd, Q));
enq_effs([_ | Rem], Q) ->
- % ct:pal("enq_effs dropping ~w~n", [E]),
enq_effs(Rem, Q).
@@ -323,29 +462,40 @@ run_proper(Fun, Args, NumTests) ->
(F, A) -> ct:pal(?LOW_IMPORTANCE, F, A)
end}])).
-run_snapshot_test(Name, Commands) ->
+run_snapshot_test(Conf, Commands) ->
%% create every incremental permuation of the commands lists
%% and run the snapshot tests against that
[begin
% ?debugFmt("~w running command to ~w~n", [?FUNCTION_NAME, lists:last(C)]),
- run_snapshot_test0(Name, C)
+ run_snapshot_test0(Conf, C)
end || C <- prefixes(Commands, 1, [])].
-run_snapshot_test0(Name, Commands) ->
+run_snapshot_test0(Conf, Commands) ->
Indexes = lists:seq(1, length(Commands)),
Entries = lists:zip(Indexes, Commands),
- {State, Effects} = run_log(test_init(Name), Entries),
+ {State, Effects} = run_log(test_init(Conf), Entries),
+    % ct:pal("beginning snapshot test run for ~w num commands ~b",
+ % [maps:get(name, Conf), length(Commands)]),
[begin
+ %% drop all entries below and including the snapshot
Filtered = lists:dropwhile(fun({X, _}) when X =< SnapIdx -> true;
(_) -> false
end, Entries),
{S, _} = run_log(SnapState, Filtered),
% assert log can be restored from any release cursor index
- ?assertEqual(State, S)
+ case S of
+ State -> ok;
+ _ ->
+ ct:pal("Snapshot tests failed run log:~n"
+ "~p~n from ~n~p~n Entries~n~p~n",
+ [Filtered, SnapState, Entries]),
+ ?assertEqual(State, S)
+ end
end || {release_cursor, SnapIdx, SnapState} <- Effects],
ok.
+%% transforms [1,2,3] into [[1,2,3], [1,2], [1]]
prefixes(Source, N, Acc) when N > length(Source) ->
lists:reverse(Acc);
prefixes(Source, N, Acc) ->
@@ -364,11 +514,12 @@ run_log(InitState, Entries) ->
end
end, {InitState, []}, Entries).
-test_init(Name) ->
- rabbit_fifo:init(#{name => Name,
- queue_resource => blah,
- shadow_copy_interval => 0,
- metrics_handler => {?MODULE, metrics_handler, []}}).
+test_init(Conf) ->
+ Default = #{queue_resource => blah,
+ shadow_copy_interval => 0,
+ metrics_handler => {?MODULE, metrics_handler, []}},
+ rabbit_fifo:init(maps:merge(Default, Conf)).
+
meta(Idx) ->
#{index => Idx, term => 1}.
diff --git a/test/unit_inbroker_parallel_SUITE.erl b/test/unit_inbroker_parallel_SUITE.erl
index 0343e7d136..581440d179 100644
--- a/test/unit_inbroker_parallel_SUITE.erl
+++ b/test/unit_inbroker_parallel_SUITE.erl
@@ -35,9 +35,15 @@ all() ->
].
groups() ->
- MaxLengthTests = [max_length_drop_head,
+ MaxLengthTests = [max_length_default,
+ max_length_bytes_default,
+ max_length_drop_head,
+ max_length_bytes_drop_head,
max_length_reject_confirm,
- max_length_drop_publish],
+ max_length_bytes_reject_confirm,
+ max_length_drop_publish,
+ max_length_drop_publish_requeue,
+ max_length_bytes_drop_publish],
[
{parallel_tests, [parallel], [
amqp_connection_refusal,
@@ -59,11 +65,16 @@ groups() ->
set_disk_free_limit_command,
set_vm_memory_high_watermark_command,
topic_matching,
+ max_message_size,
+
{queue_max_length, [], [
- {max_length_simple, [], MaxLengthTests},
- {max_length_mirrored, [], MaxLengthTests}]},
- max_message_size
- ]}
+ {max_length_classic, [], MaxLengthTests},
+ {max_length_quorum, [], [max_length_default,
+ max_length_bytes_default]
+ },
+ {max_length_mirrored, [], MaxLengthTests}
+ ]}
+ ]}
].
suite() ->
@@ -82,10 +93,23 @@ init_per_suite(Config) ->
end_per_suite(Config) ->
rabbit_ct_helpers:run_teardown_steps(Config).
+init_per_group(max_length_classic, Config) ->
+ rabbit_ct_helpers:set_config(
+ Config,
+ [{queue_args, [{<<"x-queue-type">>, longstr, <<"classic">>}]},
+ {queue_durable, false}]);
+init_per_group(max_length_quorum, Config) ->
+ rabbit_ct_helpers:set_config(
+ Config,
+ [{queue_args, [{<<"x-queue-type">>, longstr, <<"quorum">>}]},
+ {queue_durable, true}]);
init_per_group(max_length_mirrored, Config) ->
rabbit_ct_broker_helpers:set_ha_policy(Config, 0, <<"^max_length.*queue">>,
<<"all">>, [{<<"ha-sync-mode">>, <<"automatic">>}]),
- Config1 = rabbit_ct_helpers:set_config(Config, [{is_mirrored, true}]),
+ Config1 = rabbit_ct_helpers:set_config(
+ Config, [{is_mirrored, true},
+ {queue_args, [{<<"x-queue-type">>, longstr, <<"classic">>}]},
+ {queue_durable, false}]),
rabbit_ct_helpers:run_steps(Config1, []);
init_per_group(Group, Config) ->
case lists:member({group, Group}, all()) of
@@ -132,29 +156,22 @@ end_per_group(Group, Config) ->
end.
init_per_testcase(Testcase, Config) ->
- rabbit_ct_helpers:testcase_started(Config, Testcase).
-
-end_per_testcase(max_length_drop_head = Testcase, Config) ->
+ Group = proplists:get_value(name, ?config(tc_group_properties, Config)),
+ Q = rabbit_data_coercion:to_binary(io_lib:format("~p_~p", [Group, Testcase])),
+ Config1 = rabbit_ct_helpers:set_config(Config, [{queue_name, Q}]),
+ rabbit_ct_helpers:testcase_started(Config1, Testcase).
+
+end_per_testcase(Testcase, Config)
+ when Testcase == max_length_drop_publish; Testcase == max_length_bytes_drop_publish;
+ Testcase == max_length_drop_publish_requeue;
+ Testcase == max_length_reject_confirm; Testcase == max_length_bytes_reject_confirm;
+ Testcase == max_length_drop_head; Testcase == max_length_bytes_drop_head;
+ Testcase == max_length_default; Testcase == max_length_bytes_default ->
{_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
- amqp_channel:call(Ch, #'queue.delete'{queue = <<"max_length_drop_head_queue">>}),
- amqp_channel:call(Ch, #'queue.delete'{queue = <<"max_length_default_drop_head_queue">>}),
- amqp_channel:call(Ch, #'queue.delete'{queue = <<"max_length_bytes_drop_head_queue">>}),
- amqp_channel:call(Ch, #'queue.delete'{queue = <<"max_length_bytes_default_drop_head_queue">>}),
+ amqp_channel:call(Ch, #'queue.delete'{queue = ?config(queue_name, Config)}),
rabbit_ct_client_helpers:close_channels_and_connection(Config, 0),
rabbit_ct_helpers:testcase_finished(Config, Testcase);
-end_per_testcase(max_length_reject_confirm = Testcase, Config) ->
- {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
- amqp_channel:call(Ch, #'queue.delete'{queue = <<"max_length_reject_queue">>}),
- amqp_channel:call(Ch, #'queue.delete'{queue = <<"max_length_bytes_reject_queue">>}),
- rabbit_ct_client_helpers:close_channels_and_connection(Config, 0),
- rabbit_ct_helpers:testcase_finished(Config, Testcase);
-end_per_testcase(max_length_drop_publish = Testcase, Config) ->
- {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
- amqp_channel:call(Ch, #'queue.delete'{queue = <<"max_length_drop_publish_queue">>}),
- amqp_channel:call(Ch, #'queue.delete'{queue = <<"max_length_bytes_drop_publish_queue">>}),
- rabbit_ct_client_helpers:close_channels_and_connection(Config, 0),
- rabbit_ct_helpers:testcase_finished(Config, Testcase);
end_per_testcase(Testcase, Config) ->
rabbit_ct_helpers:testcase_finished(Config, Testcase).
@@ -1159,43 +1176,66 @@ set_vm_memory_high_watermark_command1(_Config) ->
)
end.
-max_length_drop_head(Config) ->
+max_length_bytes_drop_head(Config) ->
+ max_length_bytes_drop_head(Config, [{<<"x-overflow">>, longstr, <<"drop-head">>}]).
+
+max_length_bytes_default(Config) ->
+ max_length_bytes_drop_head(Config, []).
+
+max_length_bytes_drop_head(Config, ExtraArgs) ->
{_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
- QName = <<"max_length_drop_head_queue">>,
- QNameDefault = <<"max_length_default_drop_head_queue">>,
- QNameBytes = <<"max_length_bytes_drop_head_queue">>,
- QNameDefaultBytes = <<"max_length_bytes_default_drop_head_queue">>,
+ Args = ?config(queue_args, Config),
+ Durable = ?config(queue_durable, Config),
+ QName = ?config(queue_name, Config),
- MaxLengthArgs = [{<<"x-max-length">>, long, 1}],
MaxLengthBytesArgs = [{<<"x-max-length-bytes">>, long, 100}],
- OverflowArgs = [{<<"x-overflow">>, longstr, <<"drop-head">>}],
- #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName, arguments = MaxLengthArgs ++ OverflowArgs}),
- #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QNameDefault, arguments = MaxLengthArgs}),
- #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QNameBytes, arguments = MaxLengthBytesArgs ++ OverflowArgs}),
- #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QNameDefaultBytes, arguments = MaxLengthBytesArgs}),
-
- check_max_length_drops_head(Config, QName, Ch, <<"1">>, <<"2">>, <<"3">>),
- check_max_length_drops_head(Config, QNameDefault, Ch, <<"1">>, <<"2">>, <<"3">>),
+ #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName, arguments = MaxLengthBytesArgs ++ Args ++ ExtraArgs, durable = Durable}),
%% 80 bytes payload
Payload1 = << <<"1">> || _ <- lists:seq(1, 80) >>,
Payload2 = << <<"2">> || _ <- lists:seq(1, 80) >>,
Payload3 = << <<"3">> || _ <- lists:seq(1, 80) >>,
- check_max_length_drops_head(Config, QName, Ch, Payload1, Payload2, Payload3),
- check_max_length_drops_head(Config, QNameDefault, Ch, Payload1, Payload2, Payload3).
+ check_max_length_drops_head(Config, QName, Ch, Payload1, Payload2, Payload3).
+
+max_length_drop_head(Config) ->
+ max_length_drop_head(Config, [{<<"x-overflow">>, longstr, <<"drop-head">>}]).
+
+max_length_default(Config) ->
+ %% Defaults to drop_head
+ max_length_drop_head(Config, []).
+
+max_length_drop_head(Config, ExtraArgs) ->
+ {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+ Args = ?config(queue_args, Config),
+ Durable = ?config(queue_durable, Config),
+ QName = ?config(queue_name, Config),
+
+ MaxLengthArgs = [{<<"x-max-length">>, long, 1}],
+ #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName, arguments = MaxLengthArgs ++ Args ++ ExtraArgs, durable = Durable}),
+
+ check_max_length_drops_head(Config, QName, Ch, <<"1">>, <<"2">>, <<"3">>).
max_length_reject_confirm(Config) ->
{_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
- QName = <<"max_length_reject_queue">>,
- QNameBytes = <<"max_length_bytes_reject_queue">>,
+ Args = ?config(queue_args, Config),
+ QName = ?config(queue_name, Config),
+ Durable = ?config(queue_durable, Config),
MaxLengthArgs = [{<<"x-max-length">>, long, 1}],
- MaxLengthBytesArgs = [{<<"x-max-length-bytes">>, long, 100}],
OverflowArgs = [{<<"x-overflow">>, longstr, <<"reject-publish">>}],
- #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName, arguments = MaxLengthArgs ++ OverflowArgs}),
- #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QNameBytes, arguments = MaxLengthBytesArgs ++ OverflowArgs}),
+ #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName, arguments = MaxLengthArgs ++ OverflowArgs ++ Args, durable = Durable}),
#'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}),
check_max_length_drops_publish(Config, QName, Ch, <<"1">>, <<"2">>, <<"3">>),
- check_max_length_rejects(Config, QName, Ch, <<"1">>, <<"2">>, <<"3">>),
+ check_max_length_rejects(Config, QName, Ch, <<"1">>, <<"2">>, <<"3">>).
+
+max_length_bytes_reject_confirm(Config) ->
+ {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+ Args = ?config(queue_args, Config),
+ QNameBytes = ?config(queue_name, Config),
+ Durable = ?config(queue_durable, Config),
+ MaxLengthBytesArgs = [{<<"x-max-length-bytes">>, long, 100}],
+ OverflowArgs = [{<<"x-overflow">>, longstr, <<"reject-publish">>}],
+ #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QNameBytes, arguments = MaxLengthBytesArgs ++ OverflowArgs ++ Args, durable = Durable}),
+ #'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}),
%% 80 bytes payload
Payload1 = << <<"1">> || _ <- lists:seq(1, 80) >>,
@@ -1207,15 +1247,55 @@ max_length_reject_confirm(Config) ->
max_length_drop_publish(Config) ->
{_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
- QName = <<"max_length_drop_publish_queue">>,
- QNameBytes = <<"max_length_bytes_drop_publish_queue">>,
+ Args = ?config(queue_args, Config),
+ Durable = ?config(queue_durable, Config),
+ QName = ?config(queue_name, Config),
MaxLengthArgs = [{<<"x-max-length">>, long, 1}],
- MaxLengthBytesArgs = [{<<"x-max-length-bytes">>, long, 100}],
OverflowArgs = [{<<"x-overflow">>, longstr, <<"reject-publish">>}],
- #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName, arguments = MaxLengthArgs ++ OverflowArgs}),
- #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QNameBytes, arguments = MaxLengthBytesArgs ++ OverflowArgs}),
+ #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName, arguments = MaxLengthArgs ++ OverflowArgs ++ Args, durable = Durable}),
%% If confirms are not enable, publishes will still be dropped in reject-publish mode.
- check_max_length_drops_publish(Config, QName, Ch, <<"1">>, <<"2">>, <<"3">>),
+ check_max_length_drops_publish(Config, QName, Ch, <<"1">>, <<"2">>, <<"3">>).
+
+max_length_drop_publish_requeue(Config) ->
+ {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+ Args = ?config(queue_args, Config),
+ Durable = ?config(queue_durable, Config),
+ QName = ?config(queue_name, Config),
+ MaxLengthArgs = [{<<"x-max-length">>, long, 1}],
+ OverflowArgs = [{<<"x-overflow">>, longstr, <<"reject-publish">>}],
+ #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName, arguments = MaxLengthArgs ++ OverflowArgs ++ Args, durable = Durable}),
+    %% If confirms are not enabled, publishes will still be dropped in reject-publish mode.
+ check_max_length_requeue(Config, QName, Ch, <<"1">>, <<"2">>).
+
+check_max_length_requeue(Config, QName, Ch, Payload1, Payload2) ->
+ sync_mirrors(QName, Config),
+ #'basic.get_empty'{} = amqp_channel:call(Ch, #'basic.get'{queue = QName}),
+ %% A single message is published and consumed
+ amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload1}),
+ wait_for_consensus(QName, Config),
+ {#'basic.get_ok'{delivery_tag = DeliveryTag},
+ #amqp_msg{payload = Payload1}} = amqp_channel:call(Ch, #'basic.get'{queue = QName}),
+ #'basic.get_empty'{} = amqp_channel:call(Ch, #'basic.get'{queue = QName}),
+
+ %% Another message is published
+ amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload2}),
+ wait_for_consensus(QName, Config),
+ amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag,
+ multiple = false,
+ requeue = true}),
+ wait_for_consensus(QName, Config),
+ {#'basic.get_ok'{}, #amqp_msg{payload = Payload1}} = amqp_channel:call(Ch, #'basic.get'{queue = QName}),
+ {#'basic.get_ok'{}, #amqp_msg{payload = Payload2}} = amqp_channel:call(Ch, #'basic.get'{queue = QName}),
+ #'basic.get_empty'{} = amqp_channel:call(Ch, #'basic.get'{queue = QName}).
+
+max_length_bytes_drop_publish(Config) ->
+ {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+ Args = ?config(queue_args, Config),
+ Durable = ?config(queue_durable, Config),
+ QNameBytes = ?config(queue_name, Config),
+ MaxLengthBytesArgs = [{<<"x-max-length-bytes">>, long, 100}],
+ OverflowArgs = [{<<"x-overflow">>, longstr, <<"reject-publish">>}],
+ #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QNameBytes, arguments = MaxLengthBytesArgs ++ OverflowArgs ++ Args, durable = Durable}),
%% 80 bytes payload
Payload1 = << <<"1">> || _ <- lists:seq(1, 80) >>,
@@ -1229,22 +1309,38 @@ check_max_length_drops_publish(Config, QName, Ch, Payload1, Payload2, Payload3)
#'basic.get_empty'{} = amqp_channel:call(Ch, #'basic.get'{queue = QName}),
%% A single message is published and consumed
amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload1}),
+ wait_for_consensus(QName, Config),
{#'basic.get_ok'{}, #amqp_msg{payload = Payload1}} = amqp_channel:call(Ch, #'basic.get'{queue = QName}),
#'basic.get_empty'{} = amqp_channel:call(Ch, #'basic.get'{queue = QName}),
%% Message 2 is dropped, message 1 stays
amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload1}),
+ wait_for_consensus(QName, Config),
amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload2}),
+ wait_for_consensus(QName, Config),
{#'basic.get_ok'{}, #amqp_msg{payload = Payload1}} = amqp_channel:call(Ch, #'basic.get'{queue = QName}),
#'basic.get_empty'{} = amqp_channel:call(Ch, #'basic.get'{queue = QName}),
%% Messages 2 and 3 are dropped, message 1 stays
amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload1}),
+ wait_for_consensus(QName, Config),
amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload2}),
+ wait_for_consensus(QName, Config),
amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload3}),
+ wait_for_consensus(QName, Config),
{#'basic.get_ok'{}, #amqp_msg{payload = Payload1}} = amqp_channel:call(Ch, #'basic.get'{queue = QName}),
#'basic.get_empty'{} = amqp_channel:call(Ch, #'basic.get'{queue = QName}).
+wait_for_consensus(QName, Config) ->
+ case lists:keyfind(<<"x-queue-type">>, 1, ?config(queue_args, Config)) of
+ {_, _, <<"quorum">>} ->
+ Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
+ RaName = binary_to_atom(<<"%2F_", QName/binary>>, utf8),
+ {ok, _, _} = ra:members({RaName, Server});
+ _ ->
+ ok
+ end.
+
check_max_length_rejects(Config, QName, Ch, Payload1, Payload2, Payload3) ->
sync_mirrors(QName, Config),
amqp_channel:register_confirm_handler(Ch, self()),
@@ -1283,12 +1379,14 @@ check_max_length_drops_head(Config, QName, Ch, Payload1, Payload2, Payload3) ->
#'basic.get_empty'{} = amqp_channel:call(Ch, #'basic.get'{queue = QName}),
%% A single message is published and consumed
amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload1}),
+ wait_for_consensus(QName, Config),
{#'basic.get_ok'{}, #amqp_msg{payload = Payload1}} = amqp_channel:call(Ch, #'basic.get'{queue = QName}),
#'basic.get_empty'{} = amqp_channel:call(Ch, #'basic.get'{queue = QName}),
%% Message 1 is replaced by message 2
amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload1}),
amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload2}),
+ wait_for_consensus(QName, Config),
{#'basic.get_ok'{}, #amqp_msg{payload = Payload2}} = amqp_channel:call(Ch, #'basic.get'{queue = QName}),
#'basic.get_empty'{} = amqp_channel:call(Ch, #'basic.get'{queue = QName}),
@@ -1296,6 +1394,7 @@ check_max_length_drops_head(Config, QName, Ch, Payload1, Payload2, Payload3) ->
amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload1}),
amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload2}),
amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload3}),
+ wait_for_consensus(QName, Config),
{#'basic.get_ok'{}, #amqp_msg{payload = Payload3}} = amqp_channel:call(Ch, #'basic.get'{queue = QName}),
#'basic.get_empty'{} = amqp_channel:call(Ch, #'basic.get'{queue = QName}).