diff options
| -rw-r--r-- | .github/ISSUE_TEMPLATE.md | 43 | ||||
| -rw-r--r-- | .github/PULL_REQUEST_TEMPLATE.md | 43 | ||||
| -rw-r--r-- | CONTRIBUTING.md | 104 | ||||
| -rw-r--r-- | Makefile | 7 | ||||
| -rw-r--r-- | docs/rabbitmq.conf.example | 144 | ||||
| -rw-r--r-- | docs/rabbitmq.config.example | 134 | ||||
| -rw-r--r-- | priv/schema/rabbit.schema | 27 | ||||
| -rw-r--r-- | rabbitmq-components.mk | 18 | ||||
| -rw-r--r-- | src/gm.erl | 29 | ||||
| -rw-r--r-- | src/rabbit.erl | 8 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 30 | ||||
| -rw-r--r-- | src/rabbit_channel.erl | 95 | ||||
| -rw-r--r-- | src/rabbit_limiter.erl | 16 | ||||
| -rw-r--r-- | src/rabbit_looking_glass.erl | 20 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 3 | ||||
| -rw-r--r-- | src/rabbit_msg_store.erl | 24 | ||||
| -rw-r--r-- | src/rabbit_node_monitor.erl | 12 | ||||
| -rw-r--r-- | src/rabbit_priority_queue.erl | 34 | ||||
| -rw-r--r-- | src/rabbit_queue_consumers.erl | 6 | ||||
| -rw-r--r-- | src/rabbit_queue_location_validator.erl | 5 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 94 | ||||
| -rw-r--r-- | src/rabbit_version.erl | 6 | ||||
| -rw-r--r-- | src/rabbit_vm.erl | 57 | ||||
| -rw-r--r-- | src/tcp_listener_sup.erl | 3 | ||||
| -rw-r--r-- | test/clustering_management_SUITE.erl | 12 | ||||
| -rw-r--r-- | test/config_schema_SUITE_data/rabbit.snippets | 20 |
26 files changed, 669 insertions, 325 deletions
diff --git a/.github/ISSUE_TEMPLATE.md b/.github/ISSUE_TEMPLATE.md new file mode 100644 index 0000000000..70b54cd818 --- /dev/null +++ b/.github/ISSUE_TEMPLATE.md @@ -0,0 +1,43 @@ +Thank you for using RabbitMQ and for taking the time to report an +issue. + +## Does This Belong to GitHub or RabbitMQ Mailing List? + +*Important:* please first read the `CONTRIBUTING.md` document in the +root of this repository. It will help you determine whether your +feedback should be directed to the RabbitMQ mailing list [1] instead. + +## Please Help Maintainers and Contributors Help You + +In order for the RabbitMQ team to investigate your issue, please provide +**as much as possible** of the following details: + +* RabbitMQ version +* Erlang version +* RabbitMQ server and client application log files +* A runnable code sample, terminal transcript or detailed set of + instructions that can be used to reproduce the issue +* RabbitMQ plugin information via `rabbitmq-plugins list` +* Client library version (for all libraries used) +* Operating system, version, and patch level + +Running the `rabbitmq-collect-env` [2] script can provide most of the +information needed. Please make the archive available via a third-party +service and note that **the script does not attempt to scrub any +sensitive data**. + +If your issue involves RabbitMQ management UI or HTTP API, please also provide +the following: + + * Browser and its version + * What management UI page was used (if applicable) + * How the HTTP API requests performed can be reproduced with `curl` + * Operating system on which you are running your browser, and its version + * Errors reported in the JavaScript console (if any) + +This information **greatly speeds up issue investigation** (or makes it +possible to investigate it at all). Please help project maintainers and +contributors to help you by providing it! + +1. https://groups.google.com/forum/#!forum/rabbitmq-users +2. https://github.com/rabbitmq/support-tools/blob/master/scripts/rabbitmq-collect-env diff --git a/.github/PULL_REQUEST_TEMPLATE.md b/.github/PULL_REQUEST_TEMPLATE.md new file mode 100644 index 0000000000..4bd618567b --- /dev/null +++ b/.github/PULL_REQUEST_TEMPLATE.md @@ -0,0 +1,43 @@ +## Proposed Changes + +Please describe the big picture of your changes here to communicate to the +RabbitMQ team why we should accept this pull request. If it fixes a bug or +resolves a feature request, be sure to link to that issue. + +A pull request that doesn't explain **why** the change was made has a much +lower chance of being accepted. + +If English isn't your first language, don't worry about it and try to +communicate the problem you are trying to solve to the best of your abilities. +As long as we can understand the intent, it's all good. + +## Types of Changes + +What types of changes does your code introduce to this project? +_Put an `x` in the boxes that apply_ + +- [ ] Bugfix (non-breaking change which fixes issue #NNNN) +- [ ] New feature (non-breaking change which adds functionality) +- [ ] Breaking change (fix or feature that would cause existing functionality to not work as expected) +- [ ] Documentation (correction or otherwise) +- [ ] Cosmetics (whitespace, appearance) + +## Checklist + +_Put an `x` in the boxes that apply. You can also fill these out after creating +the PR. If you're unsure about any of them, don't hesitate to ask on the +mailing list. We're here to help! This is simply a reminder of what we are +going to look for before merging your code._ + +- [ ] I have read the `CONTRIBUTING.md` document +- [ ] I have signed the CA (see https://cla.pivotal.io/sign/rabbitmq) +- [ ] All tests pass locally with my changes +- [ ] I have added tests that prove my fix is effective or that my feature works +- [ ] I have added necessary documentation (if appropriate) +- [ ] Any dependent changes have been merged and published in related repositories + +## Further Comments + +If this is a relatively large or complex change, kick off the discussion by +explaining why you chose the solution you did and what alternatives you +considered, etc. diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index daaea42b18..29c7fa2748 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -1,29 +1,80 @@ +Thank you for using RabbitMQ and for taking the time to contribute to the project. +This document has two main parts: + + * when and how to file GitHub issues for RabbitMQ projects + * how to submit pull requests + +They intend to save you and RabbitMQ maintainers some time, so please +take a moment to read through them. + ## Overview -RabbitMQ projects use pull requests to discuss, collaborate on and accept code contributions. -Pull requests is the primary place of discussing code changes. +### GitHub issues -## How to Contribute +The RabbitMQ team uses GitHub issues for _specific actionable items_ that +engineers can work on. This assumes the following: -The process is fairly standard: +* GitHub issues are not used for questions, investigations, root cause + analysis, discussions of potential issues, etc (as defined by this team) +* Enough information is provided by the reporter for maintainers to work with - * Fork the repository or repositories you plan on contributing to - * Clone [RabbitMQ umbrella repository](https://github.com/rabbitmq/rabbitmq-public-umbrella) - * `cd umbrella`, `make co` - * Create a branch with a descriptive name in the relevant repositories - * Make your changes, run tests, commit with a [descriptive message](http://tbaggery.com/2008/04/19/a-note-about-git-commit-messages.html), push to your fork - * Submit pull requests with an explanation what has been changed and **why** - * Submit a filled out and signed [Contributor Agreement](https://github.com/rabbitmq/ca#how-to-submit) if needed (see below) - * Be patient. We will get to your pull request eventually +The team receives many questions through various venues every single +day. Frequently, these questions do not include the necessary details +the team needs to begin useful work. GitHub issues can very quickly +turn into a something impossible to navigate and make sense +of. Because of this, questions, investigations, root cause analysis, +and discussions of potential features are all considered to be +[mailing list][rmq-users] material. If you are unsure where to begin, +the [RabbitMQ users mailing list][rmq-users] is the right place. -If what you are going to work on is a substantial change, please first ask the core team -of their opinion on [RabbitMQ mailing list](https://groups.google.com/forum/#!forum/rabbitmq-users). +Getting all the details necessary to reproduce an issue, make a +conclusion or even form a hypothesis about what's happening can take a +fair amount of time. Please help others help you by providing a way to +reproduce the behavior you're observing, or at least sharing as much +relevant information as possible on the [RabbitMQ users mailing +list][rmq-users]. +Please provide versions of the software used: -## Code of Conduct + * RabbitMQ server + * Erlang + * Operating system version (and distribution, if applicable) + * All client libraries used + * RabbitMQ plugins (if applicable) -See [CODE_OF_CONDUCT.md](./CODE_OF_CONDUCT.md). +The following information greatly helps in investigating and reproducing issues: + + * RabbitMQ server logs + * A code example or terminal transcript that can be used to reproduce + * Full exception stack traces (a single line message is not enough!) + * `rabbitmqctl report` and `rabbitmqctl environment` output + * Other relevant details about the environment and workload, e.g. a traffic capture + * Feel free to edit out hostnames and other potentially sensitive information. + +To make collecting much of this and other environment information, use +the [`rabbitmq-collect-env`][rmq-collect-env] script. It will produce an archive with +server logs, operating system logs, output of certain diagnostics commands and so on. +Please note that **no effort is made to scrub any information that may be sensitive**. + +### Pull Requests +RabbitMQ projects use pull requests to discuss, collaborate on and accept code contributions. +Pull requests is the primary place of discussing code changes. + +Here's the recommended workflow: + + * [Fork the repository][github-fork] or repositories you plan on contributing to. If multiple + repositories are involved in addressing the same issue, please use the same branch name + in each repository + * Create a branch with a descriptive name in the relevant repositories + * Make your changes, run tests (usually with `make tests`), commit with a + [descriptive message][git-commit-msgs], push to your fork + * Submit pull requests with an explanation what has been changed and **why** + * Submit a filled out and signed [Contributor Agreement][ca-agreement] if needed (see below) + * Be patient. We will get to your pull request eventually + +If what you are going to work on is a substantial change, please first +ask the core team for their opinion on the [RabbitMQ users mailing list][rmq-users]. ## Running Tests @@ -49,15 +100,24 @@ Finally, will run all suites. +## Code of Conduct -## Contributor Agreement +See [CODE_OF_CONDUCT.md](./CODE_OF_CONDUCT.md). -If you want to contribute a non-trivial change, please submit a signed copy of our -[Contributor Agreement](https://github.com/rabbitmq/ca#how-to-submit) around the time -you submit your pull request. This will make it much easier (in some cases, possible) -for the RabbitMQ team at Pivotal to merge your contribution. +## Contributor Agreement +If you want to contribute a non-trivial change, please submit a signed +copy of our [Contributor Agreement][ca-agreement] around the time you +submit your pull request. This will make it much easier (in some +cases, possible) for the RabbitMQ team at Pivotal to merge your +contribution. ## Where to Ask Questions -If something isn't clear, feel free to ask on our [mailing list](https://groups.google.com/forum/#!forum/rabbitmq-users). +If something isn't clear, feel free to ask on our [mailing list][rmq-users]. + +[rmq-collect-env]: https://github.com/rabbitmq/support-tools/blob/master/scripts/rabbitmq-collect-env +[git-commit-msgs]: https://goo.gl/xwWq +[rmq-users]: https://groups.google.com/forum/#!forum/rabbitmq-users +[ca-agreement]: https://cla.pivotal.io/sign/rabbitmq +[github-fork]: https://help.github.com/articles/fork-a-repo/ @@ -12,7 +12,7 @@ define PROJECT_ENV {tcp_listeners, [5672]}, {num_tcp_acceptors, 10}, {ssl_listeners, []}, - {num_ssl_acceptors, 1}, + {num_ssl_acceptors, 10}, {ssl_options, []}, {vm_memory_high_watermark, 0.4}, {vm_memory_high_watermark_paging_ratio, 0.5}, @@ -25,6 +25,7 @@ define PROJECT_ENV %% breaks the QPid Java client {frame_max, 131072}, {channel_max, 0}, + {connection_max, infinity}, {heartbeat, 60}, {msg_store_file_size_limit, 16777216}, {fhc_write_buffering, true}, @@ -123,7 +124,9 @@ define PROJECT_ENV %% either "stop_node" or "continue". %% by default we choose to not terminate the entire node if one %% vhost had to shut down, see server#1158 and server#1280 - {vhost_restart_strategy, continue} + {vhost_restart_strategy, continue}, + %% {global, prefetch count} + {default_consumer_prefetch, {false, 0}} ] endef diff --git a/docs/rabbitmq.conf.example b/docs/rabbitmq.conf.example index eb45bffbf5..e6d1849e47 100644 --- a/docs/rabbitmq.conf.example +++ b/docs/rabbitmq.conf.example @@ -2,11 +2,16 @@ # RabbbitMQ broker section # ====================================== -## Network Connectivity +## Related doc guide: http://rabbitmq.com/configure.html. See +## http://rabbitmq.com/documentation.html for documentation ToC. + +## Networking ## ==================== ## +## Related doc guide: http://rabbitmq.com/networking.html. +## ## By default, RabbitMQ will listen on all interfaces, using -## the standard (reserved) AMQP port. +## the standard (reserved) AMQP 0-9-1 and 1.0 port. ## # listeners.tcp.default = 5672 @@ -24,20 +29,20 @@ # listeners.tcp.other_ip = 10.10.10.10:5672 -## SSL listeners are configured in the same fashion as TCP listeners, +## TLS listeners are configured in the same fashion as TCP listeners, ## including the option to control the choice of interface. ## # listeners.ssl.default = 5671 ## Number of Erlang processes that will accept connections for the TCP -## and SSL listeners. +## and TLS listeners. ## # num_acceptors.tcp = 10 # num_acceptors.ssl = 1 ## Maximum time for AMQP 0-8/0-9/0-9-1 handshake (after socket connection -## and SSL handshake), in milliseconds. +## and TLS handshake), in milliseconds. ## # handshake_timeout = 10000 @@ -48,10 +53,12 @@ # reverse_dns_lookups = true ## -## Security / AAA +## Security, Access Control ## ============== ## +## Related doc guide: http://rabbitmq.com/access-control.html. + ## The default "guest" user is only permitted to access the server ## via a loopback interface (e.g. localhost). ## {loopback_users, [<<"guest">>]}, @@ -62,8 +69,9 @@ ## guest user from anywhere on the network. # loopback_users.guest = false -## Configuring SSL. -## See http://www.rabbitmq.com/ssl.html for full documentation. +## TLS configuration. +## +## Related doc guide: http://rabbitmq.com/ssl.html. ## # ssl_options.verify = verify_peer # ssl_options.fail_if_no_peer_cert = false @@ -76,8 +84,12 @@ ## Alternative backends are provided by plugins, such as rabbitmq-auth-backend-ldap. ## ## NB: These settings require certain plugins to be enabled. -## See http://www.rabbitmq.com/plugins.html and http://rabbitmq.com/access-control.html -## for details. +## +## Related doc guides: +## +## * http://rabbitmq.com/plugins.html +## * http://rabbitmq.com/access-control.html +## # auth_backends.1 = rabbit_auth_backend_internal @@ -90,8 +102,10 @@ ## perform authentication and authorisation by deferring to an ## external LDAP server. ## -## For more information about configuring the LDAP backend, see -## http://www.rabbitmq.com/ldap.html and http://rabbitmq.com/access-control.html. +## Relevant doc guides: +## +## * http://rabbitmq.com/ldap.html +## * http://rabbitmq.com/access-control.html ## ## uses LDAP for both authentication and authorisation # auth_backends.1 = rabbit_auth_backend_ldap @@ -109,14 +123,14 @@ ## 'AMQPLAIN', and 'EXTERNAL' Additional mechanisms can be added via ## plugins. ## -## See http://www.rabbitmq.com/authentication.html for more details. +## Related doc guide: http://rabbitmq.com/authentication.html. ## # auth_mechanisms.1 = PLAIN # auth_mechanisms.2 = AMQPLAIN ## The rabbitmq-auth-mechanism-ssl plugin makes it possible to ## authenticate a user based on the client's x509 (TLS) certificate. -## See http://www.rabbitmq.com/authentication.html for more info. +## Related doc guide: http://rabbitmq.com/authentication.html. ## ## To use auth-mechanism-ssl, the EXTERNAL mechanism should ## be enabled: @@ -132,16 +146,16 @@ # auth_mechanisms.1 = EXTERNAL ## This pertains to both the rabbitmq-auth-mechanism-ssl plugin and -## STOMP ssl_cert_login configurations. See the rabbitmq_stomp +## STOMP ssl_cert_login configurations. See the RabbitMQ STOMP plugin ## configuration section later in this file and the README in ## https://github.com/rabbitmq/rabbitmq-auth-mechanism-ssl for further ## details. ## -## To use the SSL cert's CN instead of its DN as the username +## To use the TLS cert's CN instead of its DN as the username ## # ssl_cert_login_from = common_name -## SSL handshake timeout, in milliseconds. +## TLS handshake timeout, in milliseconds. ## # ssl_handshake_timeout = 5000 @@ -166,9 +180,8 @@ ## ## On first start RabbitMQ will create a vhost and a user. These -## config items control what gets created. See -## http://www.rabbitmq.com/access-control.html for further -## information about vhosts and access control. +## config items control what gets created. +## Relevant doc guide: http://rabbitmq.com/access-control.html ## # default_vhost = / # default_user = guest @@ -181,7 +194,7 @@ ## Tags for default user ## ## For more details about tags, see the documentation for the -## Management Plugin at http://www.rabbitmq.com/management.html. +## Management Plugin at http://rabbitmq.com/management.html. ## # default_user_tags.administrator = true @@ -195,7 +208,10 @@ ## ## Set the default AMQP 0-9-1 heartbeat interval (in seconds). -## See http://rabbitmq.com/heartbeats.html for more details. +## Related doc guides: +## +## * http://rabbitmq.com/heartbeats.html +## * http://rabbitmq.com/networking.html ## # heartbeat = 600 @@ -213,10 +229,12 @@ ## # channel_max = 128 -## Customising Socket Options. +## Customising TCP Listener (Socket) Configuration. +## +## Related doc guides: ## -## See (http://www.erlang.org/doc/man/inet.html#setopts-2) for -## further documentation. +## * http://rabbitmq.com/networking.html +## * http://www.erlang.org/doc/man/inet.html#setopts-2 ## # tcp_listen_options.backlog = 128 @@ -227,7 +245,7 @@ ## Resource Limits & Flow Control ## ============================== ## -## See http://www.rabbitmq.com/memory.html for full details. +## Related doc guide: http://rabbitmq.com/memory.html. ## Memory-based Flow Control threshold. ## @@ -263,13 +281,13 @@ ## ## Another alternative is to configure queues to page all messages (both ## persistent and transient) to disk as quickly -## as possible, see http://www.rabbitmq.com/lazy-queues.html. +## as possible, see http://rabbitmq.com/lazy-queues.html. ## # vm_memory_high_watermark_paging_ratio = 0.5 -## Selects Erlang VM memory consumption calculation strategy. Can be `rss` or `erlang`, -## `rss` is the default. Introduced in 3.6.11. -## See https://github.com/rabbitmq/rabbitmq-server/issues/1223 for background. +## Selects Erlang VM memory consumption calculation strategy. Can be `allocated`, `rss` or `legacy` (aliased as `erlang`), +## Introduced in 3.6.11. `rss` is the default as of 3.6.12. +## See https://github.com/rabbitmq/rabbitmq-server/issues/1223 and rabbitmq/rabbitmq-common#224 for background. # vm_memory_calculation_strategy = rss ## Interval (in milliseconds) at which we perform the check of the memory @@ -317,10 +335,10 @@ ## # mirroring_sync_batch_size = 4096 -## Make clustering happen *automatically* at startup - only applied +## Make clustering happen *automatically* at startup. Only applied ## to nodes that have just been reset or started for the first time. -## See http://www.rabbitmq.com/clustering.html#auto-config for -## further details. +## +## Relevant doc guide: http://rabbitmq.com//cluster-formation.html ## # autocluster.peer_discovery_backend = rabbit_peer_discovery_classic_config @@ -386,8 +404,8 @@ ## # mnesia_table_loading_retry_limit = 10 -## Size in bytes below which to embed messages in the queue index. See -## http://www.rabbitmq.com/persistence-conf.html +## Size in bytes below which to embed messages in the queue index. +## Related doc guide: http://rabbitmq.com/persistence-conf.html ## # queue_index_embed_msgs_below = 4096 @@ -423,7 +441,7 @@ ## ---------------------------------------------------------------------------- ## Advanced Erlang Networking/Clustering Options. ## -## See http://www.rabbitmq.com/clustering.html for details +## Related doc guide: http://rabbitmq.com/clustering.html ## ---------------------------------------------------------------------------- # ====================================== @@ -435,17 +453,17 @@ ## ---------------------------------------------------------------------------- ## RabbitMQ Management Plugin ## -## See http://www.rabbitmq.com/management.html for details +## Related doc guide: http://rabbitmq.com/management.html. ## ---------------------------------------------------------------------------- # ======================================= # Management section # ======================================= -## Pre-Load schema definitions from the following JSON file. See -## http://www.rabbitmq.com/management.html#load-definitions +## Preload schema definitions from the following JSON file. +## Related doc guide: http://rabbitmq.com/management.html#load-definitions. ## -# management.load_definitions = /path/to/schema.json +# management.load_definitions = /path/to/exported/definitions.json ## Log all requests to the management HTTP API to a file. ## @@ -453,11 +471,10 @@ ## Change the port on which the HTTP listener listens, ## specifying an interface for the web server to bind to. -## Also set the listener to use SSL and provide SSL options. +## Also set the listener to use TLS and provide TLS options. ## -# QA: Maybe use IP type like in tcp_listener? -# management.listener.port = 12345 +# management.listener.port = 15672 # management.listener.ip = 127.0.0.1 # management.listener.ssl = true @@ -466,12 +483,12 @@ # management.listener.ssl_opts.keyfile = /path/to/key.pem ## One of 'basic', 'detailed' or 'none'. See -## http://www.rabbitmq.com/management.html#fine-stats for more details. +## http://rabbitmq.com/management.html#fine-stats for more details. # management.rates_mode = basic ## Configure how long aggregated data (such as message rates and queue ## lengths) is retained. Please read the plugin's documentation in -## http://www.rabbitmq.com/management.html#configuration for more +## http://rabbitmq.com/management.html#configuration for more ## details. ## Your can use 'minute', 'hour' and 'day' keys or integer key (in seconds) # management.sample_retention_policies.global.minute = 5 @@ -486,23 +503,23 @@ ## ---------------------------------------------------------------------------- ## RabbitMQ Shovel Plugin ## -## See http://www.rabbitmq.com/shovel.html for details +## Related doc guide: http://rabbitmq.com/shovel.html ## ---------------------------------------------------------------------------- ## Shovel plugin config example is defined in additional.config file ## ---------------------------------------------------------------------------- -## RabbitMQ Stomp Adapter +## RabbitMQ STOMP Plugin ## -## See http://www.rabbitmq.com/stomp.html for details +## Related doc guide: http://rabbitmq.com/stomp.html ## ---------------------------------------------------------------------------- # ======================================= # STOMP section # ======================================= -## Network Configuration - the format is generally the same as for the broker +## Network Configuration. The format is generally the same as for the core broker. ## # stomp.listeners.tcp.default = 61613 @@ -511,14 +528,14 @@ # stomp.listeners.ssl.default = 61614 ## Number of Erlang processes that will accept connections for the TCP -## and SSL listeners. +## and TLS listeners. ## # stomp.num_acceptors.tcp = 10 # stomp.num_acceptors.ssl = 1 -## Additional SSL options +## Additional TLS options -## Extract a name from the client's certificate when using SSL. +## Extract a name from the client's certificate when using TLS. ## # stomp.ssl_cert_login = true @@ -531,11 +548,11 @@ # stomp.default_user = guest # stomp.default_pass = guest -## If a default user is configured, or you have configured use SSL client +## If a default user is configured, or you have configured use TLS client ## certificate based authentication, you can choose to allow clients to ## omit the CONNECT frame entirely. If set to true, the client is ## automatically connected as the default user or user supplied in the -## SSL certificate whenever the first frame sent on a session is not a +## TLS certificate whenever the first frame sent on a session is not a ## CONNECT frame. ## # stomp.implicit_connect = true @@ -603,12 +620,12 @@ # mqtt.listeners.ssl.default = 1884 ## Number of Erlang processes that will accept connections for the TCP -## and SSL listeners. +## and TLS listeners. ## # mqtt.num_acceptors.tcp = 10 # mqtt.num_acceptors.ssl = 1 -## TCP/Socket options (as per the broker configuration). +## TCP listener options (as per the broker configuration). ## # mqtt.tcp_listen_options.backlog = 128 # mqtt.tcp_listen_options.nodelay = true @@ -626,12 +643,11 @@ ## ---------------------------------------------------------------------------- ## RabbitMQ AMQP 1.0 Support ## -## See https://github.com/rabbitmq/rabbitmq-amqp1.0/blob/stable/README.md -## for details +## See https://github.com/rabbitmq/rabbitmq-amqp1.0/blob/stable/README.md. ## ---------------------------------------------------------------------------- # ======================================= -# AMQP_1 section +# AMQP 1.0 section # ======================================= @@ -689,7 +705,7 @@ ## ---------------------------------------------------------------------------- ## RabbitMQ LDAP Plugin ## -## See http://www.rabbitmq.com/ldap.html for details. +## Related doc guide: http://rabbitmq.com/ldap.html. ## ## ---------------------------------------------------------------------------- @@ -710,7 +726,7 @@ ## You can define multiple servers # auth_ldap.servers.2 = your-other-server -## Connect to the LDAP server using SSL +## Connect to the LDAP server using TLS ## # auth_ldap.use_ssl = false @@ -788,9 +804,9 @@ ## ## The LDAP plugin can perform a variety of queries against your -## LDAP server to determine questions of authorisation. See -## http://www.rabbitmq.com/ldap.html#authorisation for more -## information. +## LDAP server to determine questions of authorisation. +## +## Related doc guide: http://rabbitmq.com/ldap.html#authorisation. ## Following configuration should be defined in additional.config file ## DO NOT UNCOMMENT THIS LINES! diff --git a/docs/rabbitmq.config.example b/docs/rabbitmq.config.example index 60af5b7eea..3d41ae138c 100644 --- a/docs/rabbitmq.config.example +++ b/docs/rabbitmq.config.example @@ -2,14 +2,16 @@ %% ---------------------------------------------------------------------------- %% RabbitMQ Sample Configuration File. %% -%% See http://www.rabbitmq.com/configure.html for details. +%% Related doc guide: http://www.rabbitmq.com/configure.html. See +%% http://rabbitmq.com/documentation.html for documentation ToC. %% ---------------------------------------------------------------------------- [ {rabbit, [%% - %% Network Connectivity + %% Networking %% ==================== %% + %% Related doc guide: http://www.rabbitmq.com/networking.html. %% By default, RabbitMQ will listen on all interfaces, using %% the standard (reserved) AMQP port. @@ -22,19 +24,19 @@ %% {tcp_listeners, [{"127.0.0.1", 5672}, %% {"::1", 5672}]}, - %% SSL listeners are configured in the same fashion as TCP listeners, + %% TLS listeners are configured in the same fashion as TCP listeners, %% including the option to control the choice of interface. %% %% {ssl_listeners, [5671]}, %% Number of Erlang processes that will accept connections for the TCP - %% and SSL listeners. + %% and TLS listeners. %% %% {num_tcp_acceptors, 10}, %% {num_ssl_acceptors, 1}, %% Maximum time for AMQP 0-8/0-9/0-9-1 handshake (after socket connection - %% and SSL handshake), in milliseconds. + %% and TLS handshake), in milliseconds. %% %% {handshake_timeout, 10000}, @@ -45,9 +47,10 @@ %% {reverse_dns_lookups, false}, %% - %% Security / AAA - %% ============== + %% Security, Access Control + %% ======================== %% + %% Related doc guide: http://www.rabbitmq.com/access-control.html. %% The default "guest" user is only permitted to access the server %% via a loopback interface (e.g. localhost). @@ -57,8 +60,10 @@ %% guest user from anywhere on the network. %% {loopback_users, []}, - %% Configuring SSL. - %% See http://www.rabbitmq.com/ssl.html for full documentation. + + %% TLS configuration. + %% + %% Related doc guide: http://www.rabbitmq.com/ssl.html. %% %% {ssl_options, [{cacertfile, "/path/to/testca/cacert.pem"}, %% {certfile, "/path/to/server/cert.pem"}, @@ -71,7 +76,7 @@ %% 'AMQPLAIN'. Additional mechanisms can be added via %% plugins. %% - %% See http://www.rabbitmq.com/authentication.html for more details. + %% Related doc guide: http://www.rabbitmq.com/authentication.html. %% %% {auth_mechanisms, ['PLAIN', 'AMQPLAIN']}, @@ -84,10 +89,10 @@ %% rabbitmq_auth_backend_ldap plugins. %% %% NB: These options require that the relevant plugin is enabled. - %% See http://www.rabbitmq.com/plugins.html for further details. + %% Related doc guide: http://www.rabbitmq.com/plugins.html for further details. %% The RabbitMQ-auth-mechanism-ssl plugin makes it possible to - %% authenticate a user based on the client's SSL certificate. + %% authenticate a user based on the client's TLS certificate. %% %% To use auth-mechanism-ssl, add to or replace the auth_mechanisms %% list with the entry 'EXTERNAL'. @@ -112,11 +117,11 @@ %% https://github.com/rabbitmq/rabbitmq-auth-mechanism-ssl for further %% details. %% - %% To use the SSL cert's CN instead of its DN as the username + %% To use the TLS cert's CN instead of its DN as the username %% %% {ssl_cert_login_from, distinguished_name}, - %% SSL handshake timeout, in milliseconds. + %% TLS handshake timeout, in milliseconds. %% %% {ssl_handshake_timeout, 5000}, @@ -138,7 +143,7 @@ %% {password_hashing_module, rabbit_password_hashing_sha256}, %% Configuration entry encryption. - %% See http://www.rabbitmq.com/configure.html#configuration-encryption + %% Related doc guide: http://www.rabbitmq.com/configure.html#configuration-encryption %% %% To specify the passphrase in the configuration file: %% @@ -175,8 +180,7 @@ %% Tags for default user %% - %% For more details about tags, see the documentation for the - %% Management Plugin at http://www.rabbitmq.com/management.html. + %% Related doc guide: http://www.rabbitmq.com/management.html. %% %% {default_user_tags, [administrator]}, @@ -188,7 +192,11 @@ %% Sets the default AMQP 0-9-1 heartbeat timeout in seconds. %% Values lower than 6 can produce false positives and are not %% recommended. - %% See http://www.rabbitmq.com/heartbeats.html. + %% + %% Related doc guides: + %% + %% * http://www.rabbitmq.com/heartbeats.html + %% * http://www.rabbitmq.com/networking.html %% %% {heartbeat, 60}, @@ -206,9 +214,28 @@ %% %% {channel_max, 0}, + %% Set the max permissible number of client connections to the node. + %% `infinity` means "no limit". + %% + %% This limit applies to client connections to all listeners (regardless of + %% the protocol, whether TLS is used and so on). CLI tools and inter-node + %% connections are exempt. + %% + %% When client connections are rapidly opened in succession, it is possible + %% for the total connection count to go slightly higher than the configured limit. + %% The limit works well as a general safety measure. + %% + %% Clients that are hitting the limit will see their TCP connections fail or time out. + %% + %% Introduced in 3.6.13. + %% + %% Related doc guide: http://www.rabbitmq.com/networking.html. + %% + %% {connection_max, infinity}, + %% TCP socket options. %% - %% See http://www.rabbitmq.com/networking.html. + %% Related doc guide: http://www.rabbitmq.com/networking.html. %% %% {tcp_listen_options, [{backlog, 128}, %% {nodelay, true}, @@ -218,7 +245,7 @@ %% Resource Limits & Flow Control %% ============================== %% - %% See http://www.rabbitmq.com/memory.html for full details. + %% Related doc guide: http://www.rabbitmq.com/memory.html, http://www.rabbitmq.com/memory-use.html. %% Memory-based Flow Control threshold. %% @@ -258,9 +285,9 @@ %% %% {vm_memory_high_watermark_paging_ratio, 0.5}, - %% Selects Erlang VM memory consumption calculation strategy. Can be `rss` or `erlang`, - %% `rss` is the default. Introduced in 3.6.11. - %% See https://github.com/rabbitmq/rabbitmq-server/issues/1223 for background. + %% Selects Erlang VM memory consumption calculation strategy. Can be `allocated`, `rss` or `legacy` (aliased as `erlang`), + %% Introduced in 3.6.11. `rss` is the default as of 3.6.12. + %% See https://github.com/rabbitmq/rabbitmq-server/issues/1223 and rabbitmq/rabbitmq-common#224 for background. %% {vm_memory_calculation_strategy, rss}, %% Interval (in milliseconds) at which we perform the check of the memory @@ -298,12 +325,13 @@ %% * <<"min-masters">> %% * <<"client-local">> %% * <<"random">> - %% See https://www.rabbitmq.com/ha.html#queue-master-location + %% + %% Related doc guide: https://www.rabbitmq.com/ha.html#queue-master-location %% %% {queue_master_locator, <<"client-local">>}, %% Batch size (number of messages) used during eager queue mirror synchronisation. - %% See https://www.rabbitmq.com/ha.html#batch-sync. When average message size is relatively large + %% Related doc guide: https://www.rabbitmq.com/ha.html#batch-sync. When average message size is relatively large %% (say, 10s of kilobytes or greater), reducing this value will decrease peak amount %% of RAM used by newly joining nodes that need eager synchronisation. %% @@ -321,7 +349,7 @@ %% {server_properties, []}, %% How to respond to cluster partitions. - %% See http://www.rabbitmq.com/partitions.html + %% Related doc guide: http://www.rabbitmq.com/partitions.html %% %% {cluster_partition_handling, ignore}, @@ -333,7 +361,7 @@ %% Make clustering happen *automatically* at startup - only applied %% to nodes that have just been reset or started for the first time. - %% See http://www.rabbitmq.com/clustering.html#auto-config + %% Related doc guide: http://www.rabbitmq.com/clustering.html#auto-config %% %% {cluster_nodes, {['rabbit@my.host.com'], disc}}, @@ -385,12 +413,12 @@ %% {mnesia_table_loading_retry_timeout, 30000}, %% Size in bytes below which to embed messages in the queue index. - %% See http://www.rabbitmq.com/persistence-conf.html + %% Related doc guide: http://www.rabbitmq.com/persistence-conf.html %% %% {queue_index_embed_msgs_below, 4096}, %% Maximum number of queue index entries to keep in journal - %% See http://www.rabbitmq.com/persistence-conf.html. + %% Related doc guide: http://www.rabbitmq.com/persistence-conf.html. %% %% {queue_index_max_journal_entries, 32768}, @@ -415,7 +443,7 @@ %% Increasing these values may help with throughput but also can be dangerous: %% high credit flow values are no different from not having flow control at all. %% - %% See https://www.rabbitmq.com/blog/2015/10/06/new-credit-flow-settings-on-rabbitmq-3-5-5/ + %% Related doc guide: https://www.rabbitmq.com/blog/2015/10/06/new-credit-flow-settings-on-rabbitmq-3-5-5/ %% and http://alvaro-videla.com/2013/09/rabbitmq-internals-credit-flow-for-erlang-processes.html. %% %% {credit_flow_default_credit, {400, 200}}, @@ -473,7 +501,7 @@ %% ---------------------------------------------------------------------------- %% Advanced Erlang Networking/Clustering Options. %% - %% See http://www.rabbitmq.com/clustering.html for details + %% Related doc guide: http://www.rabbitmq.com/clustering.html %% ---------------------------------------------------------------------------- {kernel, [%% Sets the net_kernel tick time. @@ -486,22 +514,22 @@ %% ---------------------------------------------------------------------------- %% RabbitMQ Management Plugin %% - %% See http://www.rabbitmq.com/management.html for details + %% Related doc guide: http://www.rabbitmq.com/management.html %% ---------------------------------------------------------------------------- {rabbitmq_management, - [%% Pre-Load schema definitions from the following JSON file. See + [%% Preload schema definitions from a previously exported definitions file. See %% http://www.rabbitmq.com/management.html#load-definitions %% - %% {load_definitions, "/path/to/schema.json"}, + %% {load_definitions, "/path/to/exported/definitions.json"}, - %% Log all requests to the management HTTP API to a file. + %% Log all requests to the management HTTP API to a directory. %% - %% {http_log_dir, "/path/to/access.log"}, + %% {http_log_dir, "/path/to/rabbitmq/logs/http"}, %% Change the port on which the HTTP listener listens, %% specifying an interface for the web server to bind to. - %% Also set the listener to use SSL and provide SSL options. + %% Also set the listener to use TLS and provide TLS options. %% %% {listener, [{port, 12345}, %% {ip, "127.0.0.1"}, @@ -528,7 +556,7 @@ %% ---------------------------------------------------------------------------- %% RabbitMQ Shovel Plugin %% - %% See http://www.rabbitmq.com/shovel.html for details + %% Related doc guide: http://www.rabbitmq.com/shovel.html %% ---------------------------------------------------------------------------- {rabbitmq_shovel, @@ -594,9 +622,9 @@ ]}, %% ---------------------------------------------------------------------------- - %% RabbitMQ Stomp Adapter + %% RabbitMQ STOMP Plugin %% - %% See http://www.rabbitmq.com/stomp.html for details + %% Related doc guide: http://www.rabbitmq.com/stomp.html %% ---------------------------------------------------------------------------- {rabbitmq_stomp, @@ -606,18 +634,18 @@ %% {tcp_listeners, [{"127.0.0.1", 61613}, %% {"::1", 61613}]}, - %% Listen for SSL connections on a specific port. + %% Listen for TLS connections on a specific port. %% {ssl_listeners, [61614]}, %% Number of Erlang processes that will accept connections for the TCP - %% and SSL listeners. + %% and TLS listeners. %% %% {num_tcp_acceptors, 10}, %% {num_ssl_acceptors, 1}, - %% Additional SSL options + %% Additional TLS options - %% Extract a name from the client's certificate when using SSL. + %% Extract a name from the client's certificate when using TLS. %% %% {ssl_cert_login, true}, @@ -630,11 +658,11 @@ %% {default_user, [{login, "guest"}, %% {passcode, "guest"}]}, - %% If a default user is configured, or you have configured use SSL client + %% If a default user is configured, or you have configured use TLS client %% certificate based authentication, you can choose to allow clients to %% omit the CONNECT frame entirely. If set to true, the client is %% automatically connected as the default user or user supplied in the - %% SSL certificate whenever the first frame sent on a session is not a + %% TLS certificate whenever the first frame sent on a session is not a %% CONNECT frame. %% %% {implicit_connect, true}, @@ -651,10 +679,10 @@ ]}, %% ---------------------------------------------------------------------------- - %% RabbitMQ MQTT Adapter + %% RabbitMQ MQTT Plugin + %% + %% Related doc guide: https://github.com/rabbitmq/rabbitmq-mqtt/blob/stable/README.md %% - %% See https://github.com/rabbitmq/rabbitmq-mqtt/blob/stable/README.md - %% for details %% ---------------------------------------------------------------------------- {rabbitmq_mqtt, @@ -727,8 +755,8 @@ %% ---------------------------------------------------------------------------- %% RabbitMQ AMQP 1.0 Support %% - %% See https://github.com/rabbitmq/rabbitmq-amqp1.0/blob/stable/README.md - %% for details + %% Related doc guide: https://github.com/rabbitmq/rabbitmq-amqp1.0/blob/stable/README.md + %% %% ---------------------------------------------------------------------------- {rabbitmq_amqp1_0, @@ -748,7 +776,7 @@ %% ---------------------------------------------------------------------------- %% RabbitMQ LDAP Plugin %% - %% See http://www.rabbitmq.com/ldap.html for details. + %% Related doc guide: http://www.rabbitmq.com/ldap.html. %% %% ---------------------------------------------------------------------------- @@ -763,7 +791,7 @@ %% %% {servers, ["your-server-name-goes-here"]}, - %% Connect to the LDAP server using SSL + %% Connect to the LDAP server using TLS %% %% {use_ssl, false}, diff --git a/priv/schema/rabbit.schema b/priv/schema/rabbit.schema index 38edd6e1ca..9a0d54a2ad 100644 --- a/priv/schema/rabbit.schema +++ b/priv/schema/rabbit.schema @@ -523,10 +523,29 @@ end}. %% Set the max permissible number of channels per connection. %% 0 means "no limit". %% -%% {channel_max, 128}, +%% {channel_max, 0}, {mapping, "channel_max", "rabbit.channel_max", [{datatype, integer}]}. +%% Set the max permissible number of client connections per node. +%% `infinity` means "no limit". +%% +%% {connection_max, infinity}, + +{mapping, "connection_max", "rabbit.connection_max", + [{datatype, [{atom, infinity}, integer]}]}. + +{translation, "rabbit.connection_max", + fun(Conf) -> + case cuttlefish:conf_get("connection_max", Conf, undefined) of + undefined -> cuttlefish:unset(); + infinity -> infinity; + Val when is_integer(Val) -> Val; + _ -> cuttlefish:invalid("should be a non-negative integer") + end + end +}. + %% Customising Socket Options. %% %% See (http://www.erlang.org/doc/man/inet.html#setopts-2) for @@ -686,13 +705,13 @@ end}. {mapping, "memory_monitor_interval", "rabbit.memory_monitor_interval", [{datatype, integer}]}. -%% When set to rss, RabbitMQ will display the memory usage as reported -%% by the operating system (RSS value), not by the Erlang VM. +%% Selects Erlang VM memory consumption calculation strategy. +%% Can be `allocated`, `rss` or `legacy` (aliased as `erlang`). %% %% {vm_memory_calculation_strategy, rss}, {mapping, "vm_memory_calculation_strategy", "rabbit.vm_memory_calculation_strategy", - [{datatype, {enum, [rss, erlang]}}]}. + [{datatype, {enum, [rss, erlang, allocated, legacy]}}]}. %% Set disk free limit (in bytes). Once free disk space reaches this %% lower bound, a disk alarm will be set - see the documentation diff --git a/rabbitmq-components.mk b/rabbitmq-components.mk index 13cb82c333..9567477aa0 100644 --- a/rabbitmq-components.mk +++ b/rabbitmq-components.mk @@ -38,6 +38,8 @@ endif # base of the topic branch. dep_amqp_client = git_rmq rabbitmq-erlang-client $(current_rmq_ref) $(base_rmq_ref) master +dep_amqp10_client = git_rmq rabbitmq-amqp1.0-client $(current_rmq_ref) $(base_rmq_ref) master +dep_amqp10_common = git_rmq rabbitmq-amqp1.0-common $(current_rmq_ref) $(base_rmq_ref) master dep_rabbit = git_rmq rabbitmq-server $(current_rmq_ref) $(base_rmq_ref) master dep_rabbit_common = git_rmq rabbitmq-common $(current_rmq_ref) $(base_rmq_ref) master dep_rabbitmq_amqp1_0 = git_rmq rabbitmq-amqp1.0 $(current_rmq_ref) $(base_rmq_ref) master @@ -78,6 +80,7 @@ dep_rabbitmq_peer_discovery_common = git_rmq rabbitmq-peer-discovery-common $ dep_rabbitmq_peer_discovery_consul = git_rmq rabbitmq-peer-discovery-consul $(current_rmq_ref) $(base_rmq_ref) master dep_rabbitmq_peer_discovery_etcd = git_rmq rabbitmq-peer-discovery-etcd $(current_rmq_ref) $(base_rmq_ref) master dep_rabbitmq_peer_discovery_k8s = git_rmq rabbitmq-peer-discovery-k8s $(current_rmq_ref) $(base_rmq_ref) master +dep_rabbitmq_random_exchange = git_rmq rabbitmq-random-exchange $(current_rmq_ref) $(base_rmq_ref) master dep_rabbitmq_recent_history_exchange = git_rmq rabbitmq-recent-history-exchange $(current_rmq_ref) $(base_rmq_ref) master dep_rabbitmq_routing_node_stamp = git_rmq rabbitmq-routing-node-stamp $(current_rmq_ref) $(base_rmq_ref) master dep_rabbitmq_rtopic_exchange = git_rmq rabbitmq-rtopic-exchange $(current_rmq_ref) $(base_rmq_ref) master @@ -107,14 +110,18 @@ dep_rabbitmq_public_umbrella = git_rmq rabbitmq-public-umbrella $(curre # all projects use the same versions. It avoids conflicts and makes it # possible to work with rabbitmq-public-umbrella. -dep_cowboy_commit = 1.1.2 -dep_mochiweb = git git://github.com/basho/mochiweb.git v2.9.0p2 -dep_ranch_commit = 1.3.2 +dep_cowboy = hex 1.1.2 +dep_jsx = hex 2.8.2 +dep_lager = hex 3.5.1 +dep_ranch = hex 1.3.2 +dep_ranch_proxy_protocol = hex 1.4.2 +dep_recon = hex 2.3.2 + dep_sockjs = git https://github.com/rabbitmq/sockjs-erlang.git 405990ea62353d98d36dbf5e1e64942d9b0a1daf -dep_webmachine_commit = 1.10.8p2 -dep_ranch_proxy_protocol = git git://github.com/heroku/ranch_proxy_protocol.git 1.4.2 RABBITMQ_COMPONENTS = amqp_client \ + amqp10_common \ + amqp10_client \ rabbit \ rabbit_common \ rabbitmq_amqp1_0 \ @@ -155,6 +162,7 @@ RABBITMQ_COMPONENTS = amqp_client \ rabbitmq_peer_discovery_consul \ rabbitmq_peer_discovery_etcd \ rabbitmq_peer_discovery_k8s \ + rabbitmq_random_exchange \ rabbitmq_recent_history_exchange \ rabbitmq_routing_node_stamp \ rabbitmq_rtopic_exchange \ diff --git a/src/gm.erl b/src/gm.erl index 0da190a57d..c53e9bb007 100644 --- a/src/gm.erl +++ b/src/gm.erl @@ -399,7 +399,6 @@ -define(FORCE_GC_TIMER, 250). -define(VERSION_START, 0). -define(SETS, ordsets). --define(DICT, orddict). -record(state, { self, @@ -824,8 +823,8 @@ handle_msg({catchup, Left, MembersStateLeft}, members_state = MembersState }) when MembersState =/= undefined -> MembersStateLeft1 = build_members_state(MembersStateLeft), - AllMembers = lists:usort(?DICT:fetch_keys(MembersState) ++ - ?DICT:fetch_keys(MembersStateLeft1)), + AllMembers = lists:usort(maps:keys(MembersState) ++ + maps:keys(MembersStateLeft1)), {MembersState1, Activity} = lists:foldl( fun (Id, MembersStateActivity) -> @@ -995,21 +994,21 @@ is_member_alias(Member, Self, View) -> dead_member_id({dead, Member}) -> Member. store_view_member(VMember = #view_member { id = Id }, {Ver, View}) -> - {Ver, ?DICT:store(Id, VMember, View)}. + {Ver, maps:put(Id, VMember, View)}. with_view_member(Fun, View, Id) -> store_view_member(Fun(fetch_view_member(Id, View)), View). -fetch_view_member(Id, {_Ver, View}) -> ?DICT:fetch(Id, View). +fetch_view_member(Id, {_Ver, View}) -> maps:get(Id, View). -find_view_member(Id, {_Ver, View}) -> ?DICT:find(Id, View). +find_view_member(Id, {_Ver, View}) -> maps:find(Id, View). -blank_view(Ver) -> {Ver, ?DICT:new()}. +blank_view(Ver) -> {Ver, maps:new()}. -alive_view_members({_Ver, View}) -> ?DICT:fetch_keys(View). +alive_view_members({_Ver, View}) -> maps:keys(View). all_known_members({_Ver, View}) -> - ?DICT:fold( + maps:fold( fun (Member, #view_member { aliases = Aliases }, Acc) -> ?SETS:to_list(Aliases) ++ [Member | Acc] end, [], View). @@ -1374,24 +1373,24 @@ with_member_acc(Fun, Id, {MembersState, Acc}) -> {store_member(Id, MemberState, MembersState), Acc1}. find_member_or_blank(Id, MembersState) -> - case ?DICT:find(Id, MembersState) of + case maps:find(Id, MembersState) of {ok, Result} -> Result; error -> blank_member() end. -erase_member(Id, MembersState) -> ?DICT:erase(Id, MembersState). +erase_member(Id, MembersState) -> maps:remove(Id, MembersState). blank_member() -> #member { pending_ack = queue:new(), last_pub = -1, last_ack = -1 }. -blank_member_state() -> ?DICT:new(). +blank_member_state() -> maps:new(). store_member(Id, MemberState, MembersState) -> - ?DICT:store(Id, MemberState, MembersState). + maps:put(Id, MemberState, MembersState). -prepare_members_state(MembersState) -> ?DICT:to_list(MembersState). +prepare_members_state(MembersState) -> maps:to_list(MembersState). -build_members_state(MembersStateList) -> ?DICT:from_list(MembersStateList). +build_members_state(MembersStateList) -> maps:from_list(MembersStateList). make_member(GroupName) -> {case dirty_read_group(GroupName) of diff --git a/src/rabbit.erl b/src/rabbit.erl index cc1e0e08c4..0d0ff2f9fc 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -23,6 +23,7 @@ status/0, is_running/0, alarms/0, is_running/1, environment/0, rotate_logs/0, force_event_refresh/1, start_fhc/0]). + -export([start/2, stop/1, prep_stop/1]). -export([start_apps/1, start_apps/2, stop_apps/1]). -export([log_locations/0, config_files/0, decrypt_config/2]). %% for testing and mgmt-agent @@ -230,6 +231,7 @@ %%---------------------------------------------------------------------------- +-type restart_type() :: 'permanent' | 'transient' | 'temporary'. %% this really should be an abstract type -type log_location() :: string(). -type param() :: atom(). @@ -267,7 +269,7 @@ -spec recover() -> 'ok'. -spec start_apps([app_name()]) -> 'ok'. -spec start_apps([app_name()], - #{app_name() => permanent|transient|temporary}) -> 'ok'. + #{app_name() => restart_type()}) -> 'ok'. -spec stop_apps([app_name()]) -> 'ok'. %%---------------------------------------------------------------------------- @@ -506,7 +508,7 @@ stop_and_halt() -> start_apps(Apps) -> start_apps(Apps, #{}). -start_apps(Apps, AppModes) -> +start_apps(Apps, RestartTypes) -> app_utils:load_applications(Apps), ConfigEntryDecoder = case application:get_env(rabbit, config_entry_decoder) of @@ -547,7 +549,7 @@ start_apps(Apps, AppModes) -> end, ok = app_utils:start_applications(OrderedApps, handle_app_error(could_not_start), - AppModes). + RestartTypes). %% This function retrieves the correct IoDevice for requesting %% input. The problem with using the default IoDevice is that diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index ed88e69378..b51375f9fb 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -22,6 +22,7 @@ -define(SYNC_INTERVAL, 200). %% milliseconds -define(RAM_DURATION_UPDATE_INTERVAL, 5000). +-define(CONSUMER_BIAS_RATIO, 2.0). %% i.e. consume 100% faster -export([info_keys/0]). @@ -1072,27 +1073,27 @@ emit_consumer_deleted(ChPid, ConsumerTag, QName, ActingUser) -> %%---------------------------------------------------------------------------- -prioritise_call(Msg, _From, _Len, _State) -> +prioritise_call(Msg, _From, _Len, State) -> case Msg of info -> 9; {info, _Items} -> 9; consumers -> 9; stat -> 7; - {basic_consume, _, _, _, _, _, _, _, _, _, _} -> 1; - {basic_cancel, _, _, _} -> 1; + {basic_consume, _, _, _, _, _, _, _, _, _} -> consumer_bias(State, 0, 2); + {basic_cancel, _, _, _} -> consumer_bias(State, 0, 2); _ -> 0 end. -prioritise_cast(Msg, _Len, _State) -> +prioritise_cast(Msg, _Len, State) -> case Msg of delete_immediately -> 8; {delete_exclusive, _Pid} -> 8; {set_ram_duration_target, _Duration} -> 8; {set_maximum_since_use, _Age} -> 8; {run_backing_queue, _Mod, _Fun} -> 6; - {ack, _AckTags, _ChPid} -> 3; %% [1] - {resume, _ChPid} -> 2; - {notify_sent, _ChPid, _Credit} -> 1; + {ack, _AckTags, _ChPid} -> 4; %% [1] + {resume, _ChPid} -> 3; + {notify_sent, _ChPid, _Credit} -> consumer_bias(State, 0, 2); _ -> 0 end. @@ -1104,6 +1105,16 @@ prioritise_cast(Msg, _Len, _State) -> %% stack are optimised for that) and to make things easier to reason %% about. Finally, we prioritise ack over resume since it should %% always reduce memory use. +%% bump_reduce_memory_use is prioritised over publishes, because sending +%% credit to self is hard to reason about. Consumers can continue while +%% reduce_memory_use is in progress. + +consumer_bias(#q{backing_queue = BQ, backing_queue_state = BQS}, Low, High) -> + case BQ:msg_rates(BQS) of + {0.0, _} -> Low; + {Ingress, Egress} when Egress / Ingress < ?CONSUMER_BIAS_RATIO -> High; + {_, _} -> Low + end. prioritise_info(Msg, _Len, #q{q = #amqqueue{exclusive_owner = DownPid}}) -> case Msg of @@ -1113,6 +1124,7 @@ prioritise_info(Msg, _Len, #q{q = #amqqueue{exclusive_owner = DownPid}}) -> {drop_expired, _Version} -> 8; emit_stats -> 7; sync_timeout -> 6; + bump_reduce_memory_use -> 1; _ -> 0 end. @@ -1503,6 +1515,10 @@ handle_info({bump_credit, Msg}, State = #q{backing_queue = BQ, %% rabbit_variable_queue:msg_store_write/4. credit_flow:handle_bump_msg(Msg), noreply(State#q{backing_queue_state = BQ:resume(BQS)}); +handle_info(bump_reduce_memory_use, State = #q{backing_queue = BQ, + backing_queue_state = BQS}) -> + put(waiting_bump, false), + noreply(State#q{backing_queue_state = BQ:resume(BQS)}); handle_info(Info, State) -> {stop, {unhandled_info, Info}, State}. diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 1167ac66f9..c671438ce8 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -186,10 +186,21 @@ -define(INFO_KEYS, ?CREATION_EVENT_KEYS ++ ?STATISTICS_KEYS -- [pid]). --define(INCR_STATS(Incs, Measure, State), +-define(INCR_STATS(Type, Key, Inc, Measure, State), case rabbit_event:stats_level(State, #ch.stats_timer) of - fine -> incr_stats(Incs, Measure); - _ -> ok + fine -> + rabbit_core_metrics:channel_stats(Type, Measure, {self(), Key}, Inc), + %% Keys in the process dictionary are used to clean up the core metrics + put({Type, Key}, none); + _ -> + ok + end). + +-define(INCR_STATS(Type, Key, Inc, Measure), + begin + rabbit_core_metrics:channel_stats(Type, Measure, {self(), Key}, Inc), + %% Keys in the process dictionary are used to clean up the core metrics + put({Type, Key}, none) end). %%---------------------------------------------------------------------------- @@ -378,7 +389,16 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost, true -> flow; false -> noflow end, - + {ok, {Global, Prefetch}} = application:get_env(rabbit, default_consumer_prefetch), + Limiter0 = rabbit_limiter:new(LimiterPid), + Limiter = case {Global, Prefetch} of + {true, 0} -> + rabbit_limiter:unlimit_prefetch(Limiter0); + {true, _} -> + rabbit_limiter:limit_prefetch(Limiter0, Prefetch, 0); + _ -> + Limiter0 + end, State = #ch{state = starting, protocol = Protocol, channel = Channel, @@ -386,7 +406,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost, writer_pid = WriterPid, conn_pid = ConnPid, conn_name = ConnName, - limiter = rabbit_limiter:new(LimiterPid), + limiter = Limiter, tx = none, next_tag = 1, unacked_message_q = queue:new(), @@ -407,7 +427,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost, mandatory = dtree:empty(), capabilities = Capabilities, trace_state = rabbit_trace:init(VHost), - consumer_prefetch = 0, + consumer_prefetch = Prefetch, reply_consumer = none, delivery_flow = Flow, interceptor_state = undefined}, @@ -1258,8 +1278,12 @@ handle_method(#'basic.qos'{prefetch_size = Size}, _, _State) when Size /= 0 -> "prefetch_size!=0 (~w)", [Size]); handle_method(#'basic.qos'{global = false, - prefetch_count = PrefetchCount}, _, State) -> - {reply, #'basic.qos_ok'{}, State#ch{consumer_prefetch = PrefetchCount}}; + prefetch_count = PrefetchCount}, + _, State = #ch{limiter = Limiter}) -> + %% Ensures that if default was set, it's overriden + Limiter1 = rabbit_limiter:unlimit_prefetch(Limiter), + {reply, #'basic.qos_ok'{}, State#ch{consumer_prefetch = PrefetchCount, + limiter = Limiter1}}; handle_method(#'basic.qos'{global = true, prefetch_count = 0}, @@ -1632,7 +1656,7 @@ basic_return(#basic_message{exchange_name = ExchangeName, content = Content}, State = #ch{protocol = Protocol, writer_pid = WriterPid}, Reason) -> - ?INCR_STATS([{exchange_stats, ExchangeName, 1}], return_unroutable, State), + ?INCR_STATS(exchange_stats, ExchangeName, 1, return_unroutable, State), {_Close, ReplyCode, ReplyText} = Protocol:lookup_amqp_exception(Reason), ok = rabbit_writer:send_command( WriterPid, @@ -1669,14 +1693,14 @@ record_sent(ConsumerTag, AckRequired, user = #user{username = Username}, conn_name = ConnName, channel = ChannelNum}) -> - ?INCR_STATS([{queue_stats, QName, 1}], case {ConsumerTag, AckRequired} of - {none, true} -> get; - {none, false} -> get_no_ack; - {_ , true} -> deliver; - {_ , false} -> deliver_no_ack - end, State), + ?INCR_STATS(queue_stats, QName, 1, case {ConsumerTag, AckRequired} of + {none, true} -> get; + {none, false} -> get_no_ack; + {_ , true} -> deliver; + {_ , false} -> deliver_no_ack + end, State), case Redelivered of - true -> ?INCR_STATS([{queue_stats, QName, 1}], redeliver, State); + true -> ?INCR_STATS(queue_stats, QName, 1, redeliver, State); false -> ok end, rabbit_trace:tap_out(Msg, ConnName, ChannelNum, Username, TraceState), @@ -1721,11 +1745,11 @@ ack(Acked, State = #ch{queue_names = QNames}) -> foreach_per_queue( fun (QPid, MsgIds) -> ok = rabbit_amqqueue:ack(QPid, MsgIds, self()), - ?INCR_STATS(case maps:find(QPid, QNames) of - {ok, QName} -> Count = length(MsgIds), - [{queue_stats, QName, Count}]; - error -> [] - end, ack, State) + case maps:find(QPid, QNames) of + {ok, QName} -> Count = length(MsgIds), + ?INCR_STATS(queue_stats, QName, Count, ack, State); + error -> ok + end end, Acked), ok = notify_limiter(State#ch.limiter, Acked). @@ -1790,7 +1814,7 @@ deliver_to_queues({#delivery{message = #basic_message{exchange_name = XName}, confirm = false, mandatory = false}, []}, State) -> %% optimisation - ?INCR_STATS([{exchange_stats, XName, 1}], publish, State), + ?INCR_STATS(exchange_stats, XName, 1, publish, State), State; deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{ exchange_name = XName}, @@ -1827,11 +1851,15 @@ deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{ Message, State1), State3 = process_routing_confirm( Confirm, DeliveredQPids, MsgSeqNo, XName, State2), - ?INCR_STATS([{exchange_stats, XName, 1} | - [{queue_exchange_stats, {QName, XName}, 1} || - QPid <- DeliveredQPids, - {ok, QName} <- [maps:find(QPid, QNames1)]]], - publish, State3), + case rabbit_event:stats_level(State3, #ch.stats_timer) of + fine -> + ?INCR_STATS(exchange_stats, XName, 1, publish), + [?INCR_STATS(queue_exchange_stats, {QName, XName}, 1, publish) || + QPid <- DeliveredQPids, + {ok, QName} <- [maps:find(QPid, QNames1)]]; + _ -> + ok + end, State3. process_routing_mandatory(false, _, _MsgSeqNo, _Msg, State) -> @@ -1874,7 +1902,7 @@ send_confirms_and_nacks(State = #ch{tx = none, confirmed = C, rejected = R}) -> ok -> ConfirmMsgSeqNos = lists:foldl( fun ({MsgSeqNo, XName}, MSNs) -> - ?INCR_STATS([{exchange_stats, XName, 1}], + ?INCR_STATS(exchange_stats, XName, 1, confirm, State), [MsgSeqNo | MSNs] end, [], lists:append(C)), @@ -1977,17 +2005,14 @@ i(Item, _) -> name(#ch{conn_name = ConnName, channel = Channel}) -> list_to_binary(rabbit_misc:format("~s (~p)", [ConnName, Channel])). -incr_stats(Incs, Measure) -> - [begin - rabbit_core_metrics:channel_stats(Type, Measure, {self(), Key}, Inc), - %% Keys in the process dictionary are used to clean up the core metrics - put({Type, Key}, none) - end || {Type, Key, Inc} <- Incs]. - emit_stats(State) -> emit_stats(State, []). emit_stats(State, Extra) -> [{reductions, Red} | Coarse0] = infos(?STATISTICS_KEYS, State), + %% First metric must be `idle_since` (if available), as expected by + %% `rabbit_mgmt_format:format_channel_stats`. This is a performance + %% optimisation that avoids traversing the whole list when only + %% one element has to be formatted. rabbit_core_metrics:channel_stats(self(), Extra ++ Coarse0), rabbit_core_metrics:channel_stats(reductions, self(), Red). diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index e81b10a555..93bacc568d 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -184,7 +184,7 @@ %% 'Notify' is a boolean that indicates whether a queue should be %% notified of a change in the limit or volume that may allow it to %% deliver more messages via the limiter's channel. - queues = orddict:new(), % QPid -> {MonitorRef, Notify} + queues = maps:new(), % QPid -> {MonitorRef, Notify} volume = 0}). %% mode is of type credit_mode() @@ -402,28 +402,28 @@ prefetch_limit_reached(#lim{prefetch_count = Limit, volume = Volume}) -> Limit =/= 0 andalso Volume >= Limit. remember_queue(QPid, State = #lim{queues = Queues}) -> - case orddict:is_key(QPid, Queues) of + case maps:is_key(QPid, Queues) of false -> MRef = erlang:monitor(process, QPid), - State#lim{queues = orddict:store(QPid, {MRef, false}, Queues)}; + State#lim{queues = maps:put(QPid, {MRef, false}, Queues)}; true -> State end. forget_queue(QPid, State = #lim{queues = Queues}) -> - case orddict:find(QPid, Queues) of + case maps:find(QPid, Queues) of {ok, {MRef, _}} -> true = erlang:demonitor(MRef), - State#lim{queues = orddict:erase(QPid, Queues)}; + State#lim{queues = maps:remove(QPid, Queues)}; error -> State end. limit_queue(QPid, State = #lim{queues = Queues}) -> UpdateFun = fun ({MRef, _}) -> {MRef, true} end, - State#lim{queues = orddict:update(QPid, UpdateFun, Queues)}. + State#lim{queues = maps:update_with(QPid, UpdateFun, Queues)}. notify_queues(State = #lim{ch_pid = ChPid, queues = Queues}) -> {QList, NewQueues} = - orddict:fold(fun (_QPid, {_, false}, Acc) -> Acc; + maps:fold(fun (_QPid, {_, false}, Acc) -> Acc; (QPid, {MRef, true}, {L, D}) -> - {[QPid | L], orddict:store(QPid, {MRef, false}, D)} + {[QPid | L], maps:put(QPid, {MRef, false}, D)} end, {[], Queues}, Queues), case length(QList) of 0 -> ok; diff --git a/src/rabbit_looking_glass.erl b/src/rabbit_looking_glass.erl index 702fb41eb8..6cd53dbc23 100644 --- a/src/rabbit_looking_glass.erl +++ b/src/rabbit_looking_glass.erl @@ -17,6 +17,7 @@ -module(rabbit_looking_glass). -ignore_xref([{lg, trace, 4}]). +-ignore_xref([{maps, from_list, 1}]). -export([boot/0]). -export([connections/0]). @@ -27,12 +28,21 @@ boot() -> ok; Value -> Input = parse_value(Value), - rabbit_log:info("Enabling Looking Glass profiler, input value: ~p", [Input]), + rabbit_log:info( + "Enabling Looking Glass profiler, input value: ~p", + [Input] + ), {ok, _} = application:ensure_all_started(looking_glass), - lg:trace(Input, lg_file_tracer, "traces.lz4", #{ - mode => profile, - running => true, - send => true}) + lg:trace( + Input, + lg_file_tracer, + "traces.lz4", + maps:from_list([ + {mode, profile}, + {running, true}, + {send, true}] + ) + ) end. parse_value(Value) -> diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index 6139099ed1..65a13f03c0 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -374,6 +374,9 @@ handle_info({bump_credit, Msg}, State) -> credit_flow:handle_bump_msg(Msg), noreply(State); +handle_info(bump_reduce_memory_use, State) -> + noreply(State); + %% In the event of a short partition during sync we can detect the %% master's 'death', drop out of sync, and then receive sync messages %% which were still in flight. Ignore them. diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index fe78075d0f..76976ad771 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -780,7 +780,7 @@ init([Type, BaseDir, ClientRefs, StartupFunState]) -> sync_timer_ref = undefined, sum_valid_data = 0, sum_file_size = 0, - pending_gc_completion = orddict:new(), + pending_gc_completion = maps:new(), gc_pid = GCPid, file_handles_ets = FileHandlesEts, file_summary_ets = FileSummaryEts, @@ -1269,7 +1269,7 @@ contains_message(MsgId, From, gen_server2:reply(From, false), State; #msg_location { file = File } -> - case orddict:is_key(File, Pending) of + case maps:is_key(File, Pending) of true -> add_to_pending_gc_completion( {contains, MsgId, From}, File, State); false -> gen_server2:reply(From, true), @@ -1280,16 +1280,16 @@ contains_message(MsgId, From, add_to_pending_gc_completion( Op, File, State = #msstate { pending_gc_completion = Pending }) -> State #msstate { pending_gc_completion = - rabbit_misc:orddict_cons(File, Op, Pending) }. + rabbit_misc:maps_cons(File, Op, Pending) }. run_pending(Files, State) -> lists:foldl( fun (File, State1 = #msstate { pending_gc_completion = Pending }) -> - Pending1 = orddict:erase(File, Pending), + Pending1 = maps:remove(File, Pending), lists:foldl( fun run_pending_action/2, State1 #msstate { pending_gc_completion = Pending1 }, - lists:reverse(orddict:fetch(File, Pending))) + lists:reverse(maps:get(File, Pending))) end, State, Files). run_pending_action({read, MsgId, From}, State) -> @@ -1320,9 +1320,9 @@ adjust_valid_total_size(File, Delta, State = #msstate { [{#file_summary.valid_total_size, Delta}]), State #msstate { sum_valid_data = SumValid + Delta }. -orddict_store(Key, Val, Dict) -> - false = orddict:is_key(Key, Dict), - orddict:store(Key, Val, Dict). +maps_store(Key, Val, Dict) -> + false = maps:is_key(Key, Dict), + maps:put(Key, Val, Dict). update_pending_confirms(Fun, CRef, State = #msstate { clients = Clients, @@ -1860,7 +1860,7 @@ maybe_compact(State = #msstate { sum_valid_data = SumValid, %% complete traversal of FileSummaryEts. First = ets:first(FileSummaryEts), case First =:= '$end_of_table' orelse - orddict:size(Pending) >= ?MAXIMUM_SIMULTANEOUS_GC_FILES of + maps:size(Pending) >= ?MAXIMUM_SIMULTANEOUS_GC_FILES of true -> State; false -> @@ -1869,8 +1869,8 @@ maybe_compact(State = #msstate { sum_valid_data = SumValid, not_found -> State; {Src, Dst} -> - Pending1 = orddict_store(Dst, [], - orddict_store(Src, [], Pending)), + Pending1 = maps_store(Dst, [], + maps_store(Src, [], Pending)), State1 = close_handle(Src, close_handle(Dst, State)), true = ets:update_element(FileSummaryEts, Src, {#file_summary.locked, true}), @@ -1926,7 +1926,7 @@ delete_file_if_empty(File, State = #msstate { 0 -> true = ets:update_element(FileSummaryEts, File, {#file_summary.locked, true}), ok = rabbit_msg_store_gc:delete(GCPid, File), - Pending1 = orddict_store(File, [], Pending), + Pending1 = maps_store(File, [], Pending), close_handle(File, State #msstate { pending_gc_completion = Pending1 }); _ -> State diff --git a/src/rabbit_node_monitor.erl b/src/rabbit_node_monitor.erl index 8bf79f7130..565676af3c 100644 --- a/src/rabbit_node_monitor.erl +++ b/src/rabbit_node_monitor.erl @@ -350,7 +350,7 @@ init([]) -> subscribers = pmon:new(), partitions = [], guid = rabbit_guid:gen(), - node_guids = orddict:new(), + node_guids = maps:new(), autoheal = rabbit_autoheal:init()})}. handle_call(partitions, _From, State = #state{partitions = Partitions}) -> @@ -405,17 +405,17 @@ handle_cast({node_up, Node, NodeType, GUID}, State = #state{guid = MyGUID, node_guids = GUIDs}) -> cast(Node, {announce_guid, node(), MyGUID}), - GUIDs1 = orddict:store(Node, GUID, GUIDs), + GUIDs1 = maps:put(Node, GUID, GUIDs), handle_cast({node_up, Node, NodeType}, State#state{node_guids = GUIDs1}); handle_cast({announce_guid, Node, GUID}, State = #state{node_guids = GUIDs}) -> - {noreply, State#state{node_guids = orddict:store(Node, GUID, GUIDs)}}; + {noreply, State#state{node_guids = maps:put(Node, GUID, GUIDs)}}; handle_cast({check_partial_partition, Node, Rep, NodeGUID, MyGUID, RepGUID}, State = #state{guid = MyGUID, node_guids = GUIDs}) -> case lists:member(Node, rabbit_mnesia:cluster_nodes(running)) andalso - orddict:find(Node, GUIDs) =:= {ok, NodeGUID} of + maps:find(Node, GUIDs) =:= {ok, NodeGUID} of true -> spawn_link( %%[1] fun () -> case rpc:call(Node, rabbit, is_running, []) of @@ -560,10 +560,10 @@ handle_info({nodedown, Node, Info}, State = #state{guid = MyGUID, cast(N, {check_partial_partition, Node, node(), DownGUID, CheckGUID, MyGUID}) end, - case orddict:find(Node, GUIDs) of + case maps:find(Node, GUIDs) of {ok, DownGUID} -> Alive = rabbit_mnesia:cluster_nodes(running) -- [node(), Node], - [case orddict:find(N, GUIDs) of + [case maps:find(N, GUIDs) of {ok, CheckGUID} -> Check(N, CheckGUID, DownGUID); error -> ok end || N <- Alive]; diff --git a/src/rabbit_priority_queue.erl b/src/rabbit_priority_queue.erl index 41e65e8a1f..9f9ef5adca 100644 --- a/src/rabbit_priority_queue.erl +++ b/src/rabbit_priority_queue.erl @@ -207,13 +207,13 @@ publish(Msg, MsgProps, IsDelivered, ChPid, Flow, ?passthrough1(publish(Msg, MsgProps, IsDelivered, ChPid, Flow, BQS)). batch_publish(Publishes, ChPid, Flow, State = #state{bq = BQ, bqss = [{MaxP, _} |_]}) -> - PubDict = partition_publish_batch(Publishes, MaxP), + PubMap = partition_publish_batch(Publishes, MaxP), lists:foldl( fun ({Priority, Pubs}, St) -> pick1(fun (_P, BQSN) -> BQ:batch_publish(Pubs, ChPid, Flow, BQSN) end, Priority, St) - end, State, orddict:to_list(PubDict)); + end, State, maps:to_list(PubMap)); batch_publish(Publishes, ChPid, Flow, State = #passthrough{bq = BQ, bqs = BQS}) -> ?passthrough1(batch_publish(Publishes, ChPid, Flow, BQS)). @@ -229,7 +229,7 @@ publish_delivered(Msg, MsgProps, ChPid, Flow, ?passthrough2(publish_delivered(Msg, MsgProps, ChPid, Flow, BQS)). batch_publish_delivered(Publishes, ChPid, Flow, State = #state{bq = BQ, bqss = [{MaxP, _} |_]}) -> - PubDict = partition_publish_delivered_batch(Publishes, MaxP), + PubMap = partition_publish_delivered_batch(Publishes, MaxP), {PrioritiesAndAcks, State1} = lists:foldl( fun ({Priority, Pubs}, {PriosAndAcks, St}) -> @@ -241,7 +241,7 @@ batch_publish_delivered(Publishes, ChPid, Flow, State = #state{bq = BQ, bqss = [ {priority_on_acktags(P, AckTags), BQSN1} end, Priority, St), {[PriosAndAcks1 | PriosAndAcks], St1} - end, {[], State}, orddict:to_list(PubDict)), + end, {[], State}, maps:to_list(PubMap)), {lists:reverse(PrioritiesAndAcks), State1}; batch_publish_delivered(Publishes, ChPid, Flow, State = #passthrough{bq = BQ, bqs = BQS}) -> @@ -327,7 +327,7 @@ ackfold(MsgFun, Acc, State = #state{bq = BQ}, AckTags) -> AckTagsByPriority = partition_acktags(AckTags), fold2( fun (P, BQSN, AccN) -> - case orddict:find(P, AckTagsByPriority) of + case maps:find(P, AckTagsByPriority) of {ok, ATagsN} -> {AccN1, BQSN1} = BQ:ackfold(MsgFun, AccN, BQSN, ATagsN), {priority_on_acktags(P, AccN1), BQSN1}; @@ -439,7 +439,7 @@ zip_msgs_and_acks(Msgs, AckTags, Accumulator, #state{bqss = [{MaxP, _} |_]}) -> MsgsByPriority = partition_publish_delivered_batch(Msgs, MaxP), lists:foldl(fun (Acks, MAs) -> {P, _AckTag} = hd(Acks), - Pubs = orddict:fetch(P, MsgsByPriority), + Pubs = maps:get(P, MsgsByPriority), MAs0 = zip_msgs_and_acks(Pubs, Acks), MAs ++ MAs0 end, Accumulator, AckTags); @@ -527,7 +527,7 @@ fold_min2(Fun, State) -> fold_by_acktags2(Fun, AckTags, State) -> AckTagsByPriority = partition_acktags(AckTags), fold_append2(fun (P, BQSN) -> - case orddict:find(P, AckTagsByPriority) of + case maps:find(P, AckTagsByPriority) of {ok, AckTagsN} -> Fun(AckTagsN, BQSN); error -> {[], BQSN} end @@ -597,11 +597,11 @@ partition_publishes(Publishes, ExtractMsg, MaxP) -> Partitioned = lists:foldl(fun (Pub, Dict) -> Msg = ExtractMsg(Pub), - rabbit_misc:orddict_cons(priority(Msg, MaxP), Pub, Dict) - end, orddict:new(), Publishes), - orddict:map(fun (_P, RevPubs) -> - lists:reverse(RevPubs) - end, Partitioned). + rabbit_misc:maps_cons(priority(Msg, MaxP), Pub, Dict) + end, maps:new(), Publishes), + maps:map(fun (_P, RevPubs) -> + lists:reverse(RevPubs) + end, Partitioned). priority_bq(Priority, [{MaxP, _} | _] = BQSs) -> @@ -625,14 +625,14 @@ add_maybe_infinity(infinity, _) -> infinity; add_maybe_infinity(_, infinity) -> infinity; add_maybe_infinity(A, B) -> A + B. -partition_acktags(AckTags) -> partition_acktags(AckTags, orddict:new()). +partition_acktags(AckTags) -> partition_acktags(AckTags, maps:new()). partition_acktags([], Partitioned) -> - orddict:map(fun (_P, RevAckTags) -> - lists:reverse(RevAckTags) - end, Partitioned); + maps:map(fun (_P, RevAckTags) -> + lists:reverse(RevAckTags) + end, Partitioned); partition_acktags([{P, AckTag} | Rest], Partitioned) -> - partition_acktags(Rest, rabbit_misc:orddict_cons(P, AckTag, Partitioned)). + partition_acktags(Rest, rabbit_misc:maps_cons(P, AckTag, Partitioned)). priority_on_acktags(P, AckTags) -> [case Tag of diff --git a/src/rabbit_queue_consumers.erl b/src/rabbit_queue_consumers.erl index f13a46fcf3..0fe3065fe8 100644 --- a/src/rabbit_queue_consumers.erl +++ b/src/rabbit_queue_consumers.erl @@ -254,9 +254,9 @@ subtract_acks(ChPid, AckTags, State) -> not_found; C = #cr{acktags = ChAckTags, limiter = Lim} -> {CTagCounts, AckTags2} = subtract_acks( - AckTags, [], orddict:new(), ChAckTags), + AckTags, [], maps:new(), ChAckTags), {Unblocked, Lim2} = - orddict:fold( + maps:fold( fun (CTag, Count, {UnblockedN, LimN}) -> {Unblocked1, LimN1} = rabbit_limiter:ack_from_queue(LimN, CTag, Count), @@ -278,7 +278,7 @@ subtract_acks([T | TL] = AckTags, Prefix, CTagCounts, AckQ) -> case queue:out(AckQ) of {{value, {T, CTag}}, QTail} -> subtract_acks(TL, Prefix, - orddict:update_counter(CTag, 1, CTagCounts), QTail); + maps:update_with(CTag, fun (Old) -> Old + 1 end, 1, CTagCounts), QTail); {{value, V}, QTail} -> subtract_acks(AckTags, [V | Prefix], CTagCounts, QTail); {empty, _} -> diff --git a/src/rabbit_queue_location_validator.erl b/src/rabbit_queue_location_validator.erl index 2b0f8c7a0f..e70bcd314a 100644 --- a/src/rabbit_queue_location_validator.erl +++ b/src/rabbit_queue_location_validator.erl @@ -54,7 +54,6 @@ module(#amqqueue{} = Q) -> undefined -> no_location_strategy; Mode -> module(Mode) end; - module(Strategy) when is_binary(Strategy) -> case rabbit_registry:binary_to_type(Strategy) of {error, not_found} -> no_location_strategy; @@ -68,4 +67,6 @@ module(Strategy) when is_binary(Strategy) -> _ -> no_location_strategy end - end. + end; +module(Strategy) -> + module(rabbit_data_coercion:to_binary(Strategy)). diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index ac3d60bb11..49f4d8d3ed 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -829,7 +829,7 @@ set_ram_duration_target( (TargetRamCount =/= infinity andalso TargetRamCount1 >= TargetRamCount) of true -> State1; - false -> maybe_reduce_memory_use(State1) + false -> reduce_memory_use(State1) end). maybe_update_rates(State = #vqstate{ in_counter = InCount, @@ -911,7 +911,7 @@ timeout(State = #vqstate { index_state = IndexState }) -> handle_pre_hibernate(State = #vqstate { index_state = IndexState }) -> State #vqstate { index_state = rabbit_queue_index:flush(IndexState) }. -resume(State) -> a(maybe_reduce_memory_use(State)). +resume(State) -> a(reduce_memory_use(State)). msg_rates(#vqstate { rates = #rates { in = AvgIngressRate, out = AvgEgressRate } }) -> @@ -1776,7 +1776,7 @@ remove_queue_entries(Q, DelsAndAcksFun, State = #vqstate{msg_store_clients = MSCState}) -> {MsgIdsByStore, Delivers, Acks, State1} = ?QUEUE:foldl(fun remove_queue_entries1/2, - {orddict:new(), [], [], State}, Q), + {maps:new(), [], [], State}, Q), remove_msgs_by_id(MsgIdsByStore, MSCState), DelsAndAcksFun(Delivers, Acks, State1). @@ -1786,7 +1786,7 @@ remove_queue_entries1( is_persistent = IsPersistent} = MsgStatus, {MsgIdsByStore, Delivers, Acks, State}) -> {case MsgInStore of - true -> rabbit_misc:orddict_cons(IsPersistent, MsgId, MsgIdsByStore); + true -> rabbit_misc:maps_cons(IsPersistent, MsgId, MsgIdsByStore); false -> MsgIdsByStore end, cons_if(IndexOnDisk andalso not IsDelivered, SeqId, Delivers), @@ -2143,27 +2143,27 @@ purge_pending_ack1(State = #vqstate { ram_pending_ack = RPA, qi_pending_ack = gb_trees:empty()}, {IndexOnDiskSeqIds, MsgIdsByStore, State1}. -%% MsgIdsByStore is an orddict with two keys: +%% MsgIdsByStore is an map with two keys: %% %% true: holds a list of Persistent Message Ids. %% false: holds a list of Transient Message Ids. %% -%% When we call orddict:to_list/1 we get two sets of msg ids, where +%% When we call maps:to_list/1 we get two sets of msg ids, where %% IsPersistent is either true for persistent messages or false for %% transient ones. The msg_store_remove/3 function takes this boolean %% flag to determine from which store the messages should be removed %% from. remove_msgs_by_id(MsgIdsByStore, MSCState) -> [ok = msg_store_remove(MSCState, IsPersistent, MsgIds) - || {IsPersistent, MsgIds} <- orddict:to_list(MsgIdsByStore)]. + || {IsPersistent, MsgIds} <- maps:to_list(MsgIdsByStore)]. remove_transient_msgs_by_id(MsgIdsByStore, MSCState) -> - case orddict:find(false, MsgIdsByStore) of + case maps:find(false, MsgIdsByStore) of error -> ok; {ok, MsgIds} -> ok = msg_store_remove(MSCState, false, MsgIds) end. -accumulate_ack_init() -> {[], orddict:new(), []}. +accumulate_ack_init() -> {[], maps:new(), []}. accumulate_ack(#msg_status { seq_id = SeqId, msg_id = MsgId, @@ -2173,7 +2173,7 @@ accumulate_ack(#msg_status { seq_id = SeqId, {IndexOnDiskSeqIdsAcc, MsgIdsByStore, AllMsgIds}) -> {cons_if(IndexOnDisk, SeqId, IndexOnDiskSeqIdsAcc), case MsgInStore of - true -> rabbit_misc:orddict_cons(IsPersistent, MsgId, MsgIdsByStore); + true -> rabbit_misc:maps_cons(IsPersistent, MsgId, MsgIdsByStore); false -> MsgIdsByStore end, [MsgId | AllMsgIds]}. @@ -2407,45 +2407,79 @@ reduce_memory_use(State = #vqstate { out = AvgEgress, ack_in = AvgAckIngress, ack_out = AvgAckEgress } }) -> - State1 = #vqstate { q2 = Q2, q3 = Q3 } = + {CreditDiscBound, _} =rabbit_misc:get_env(rabbit, + msg_store_credit_disc_bound, + ?CREDIT_DISC_BOUND), + {NeedResumeA2B, State1} = {_, #vqstate { q2 = Q2, q3 = Q3 }} = case chunk_size(RamMsgCount + gb_trees:size(RPA), TargetRamCount) of - 0 -> State; + 0 -> {false, State}; %% Reduce memory of pending acks and alphas. The order is %% determined based on which is growing faster. Whichever %% comes second may very well get a quota of 0 if the %% first manages to push out the max number of messages. - S1 -> Funs = case ((AvgAckIngress - AvgAckEgress) > + A2BChunk -> + %% In case there are few messages to be sent to a message store + %% and many messages to be embedded to the queue index, + %% we should limit the number of messages to be flushed + %% to avoid blocking the process. + A2BChunkActual = case A2BChunk > CreditDiscBound * 2 of + true -> CreditDiscBound * 2; + false -> A2BChunk + end, + Funs = case ((AvgAckIngress - AvgAckEgress) > (AvgIngress - AvgEgress)) of true -> [fun limit_ram_acks/2, fun push_alphas_to_betas/2]; false -> [fun push_alphas_to_betas/2, fun limit_ram_acks/2] end, - {_, State2} = lists:foldl(fun (ReduceFun, {QuotaN, StateN}) -> + {Quota, State2} = lists:foldl(fun (ReduceFun, {QuotaN, StateN}) -> ReduceFun(QuotaN, StateN) - end, {S1, State}, Funs), - State2 + end, {A2BChunkActual, State}, Funs), + {(Quota == 0) andalso (A2BChunk > A2BChunkActual), State2} end, - - State3 = + Permitted = permitted_beta_count(State1), + {NeedResumeB2D, State3} = %% If there are more messages with their queue position held in RAM, %% a.k.a. betas, in Q2 & Q3 than IoBatchSize, %% write their queue position to disk, a.k.a. push_betas_to_deltas case chunk_size(?QUEUE:len(Q2) + ?QUEUE:len(Q3), - permitted_beta_count(State1)) of - S2 when S2 >= IoBatchSize -> - %% There is an implicit, but subtle, upper bound here. We - %% may shuffle a lot of messages from Q2/3 into delta, but - %% the number of these that require any disk operation, - %% namely index writing, i.e. messages that are genuine - %% betas and not gammas, is bounded by the credit_flow - %% limiting of the alpha->beta conversion above. - push_betas_to_deltas(S2, State1); + Permitted) of + B2DChunk when B2DChunk >= IoBatchSize -> + %% Same as for alphas to betas. Limit a number of messages + %% to be flushed to disk at once to avoid blocking the process. + B2DChunkActual = case B2DChunk > CreditDiscBound * 2 of + true -> CreditDiscBound * 2; + false -> B2DChunk + end, + StateBD = push_betas_to_deltas(B2DChunkActual, State1), + {B2DChunk > B2DChunkActual, StateBD}; _ -> - State1 + {false, State1} end, - %% See rabbitmq-server-290 for the reasons behind this GC call. - garbage_collect(), + %% We can be blocked by the credit flow, or limited by a batch size, + %% or finished with flushing. + %% If blocked by the credit flow - the credit grant will resume processing, + %% if limited by a batch - the batch continuation message should be sent. + %% The continuation message will be prioritised over publishes, + %% but not cinsumptions, so the queue can make progess. + Blocked = credit_flow:blocked(), + case {Blocked, NeedResumeA2B orelse NeedResumeB2D} of + %% Credit bump will continue paging + {true, _} -> ok; + %% Finished with paging + {false, false} -> ok; + %% Planning next batch + {false, true} -> + %% We don't want to use self-credit-flow, because it's harder to + %% reason about. So the process sends a (prioritised) message to + %% itself and sets a waiting_bump value to keep the message box clean + case get(waiting_bump) of + true -> ok; + _ -> self() ! bump_reduce_memory_use, + put(waiting_bump, true) + end + end, State3; %% When using lazy queues, there are no alphas, so we don't need to %% call push_alphas_to_betas/2. diff --git a/src/rabbit_version.erl b/src/rabbit_version.erl index 3d69040b04..710e89922e 100644 --- a/src/rabbit_version.erl +++ b/src/rabbit_version.erl @@ -197,10 +197,10 @@ categorise_by_scope(Version) when is_list(Version) -> rabbit_misc:all_module_attributes(rabbit_upgrade), {Name, Scope, _Requires} <- Attributes, lists:member(Name, Version)], - orddict:to_list( + maps:to_list( lists:foldl(fun ({Scope, Name}, CatVersion) -> - rabbit_misc:orddict_cons(Scope, Name, CatVersion) - end, orddict:new(), Categorised)). + rabbit_misc:maps_cons(Scope, Name, CatVersion) + end, maps:new(), Categorised)). dir() -> rabbit_mnesia:dir(). diff --git a/src/rabbit_vm.erl b/src/rabbit_vm.erl index 88062dc32a..b2474a6b38 100644 --- a/src/rabbit_vm.erl +++ b/src/rabbit_vm.erl @@ -51,8 +51,6 @@ memory() -> 0 end, MgmtDbETS = ets_memory([rabbit_mgmt_storage]), - VMTotal = vm_memory_monitor:get_process_memory(), - [{total, ErlangTotal}, {processes, Processes}, {ets, ETS}, @@ -62,48 +60,59 @@ memory() -> {system, System}] = erlang:memory([total, processes, ets, atom, binary, code, system]), - Unaccounted = case VMTotal - ErlangTotal of - GTZ when GTZ > 0 -> GTZ; - _LTZ -> 0 + Strategy = vm_memory_monitor:get_memory_calculation_strategy(), + {Allocated, VMTotal} = case Strategy of + erlang -> {ErlangTotal, ErlangTotal}; + allocated -> + Alloc = recon_alloc:memory(allocated), + {Alloc, Alloc}; + rss -> + Alloc = recon_alloc:memory(allocated), + Vm = vm_memory_monitor:get_process_memory(current), + {Alloc, Vm} end, + AllocatedUnused = max(Allocated - ErlangTotal, 0), + OSReserved = max(VMTotal - Allocated, 0), + OtherProc = Processes - ConnsReader - ConnsWriter - ConnsChannel - ConnsOther - Qs - QsSlave - MsgIndexProc - Plugins - MgmtDbProc - MetricsProc, [ %% Connections - {connection_readers, ConnsReader}, - {connection_writers, ConnsWriter}, - {connection_channels, ConnsChannel}, - {connection_other, ConnsOther}, + {connection_readers, ConnsReader}, + {connection_writers, ConnsWriter}, + {connection_channels, ConnsChannel}, + {connection_other, ConnsOther}, %% Queues - {queue_procs, Qs}, - {queue_slave_procs, QsSlave}, + {queue_procs, Qs}, + {queue_slave_procs, QsSlave}, %% Processes - {plugins, Plugins}, - {other_proc, lists:max([0, OtherProc])}, %% [1] + {plugins, Plugins}, + {other_proc, lists:max([0, OtherProc])}, %% [1] %% Metrics - {metrics, MetricsETS + MetricsProc}, - {mgmt_db, MgmtDbETS + MgmtDbProc}, + {metrics, MetricsETS + MetricsProc}, + {mgmt_db, MgmtDbETS + MgmtDbProc}, %% ETS - {mnesia, MnesiaETS}, - {other_ets, ETS - MnesiaETS - MetricsETS - MgmtDbETS - MsgIndexETS}, + {mnesia, MnesiaETS}, + {other_ets, ETS - MnesiaETS - MetricsETS - MgmtDbETS - MsgIndexETS}, %% Messages (mostly, some binaries are not messages) - {binary, Bin}, - {msg_index, MsgIndexETS + MsgIndexProc}, + {binary, Bin}, + {msg_index, MsgIndexETS + MsgIndexProc}, %% System - {code, Code}, - {atom, Atom}, - {other_system, System - ETS - Bin - Code - Atom + Unaccounted}, - - {total, VMTotal} + {code, Code}, + {atom, Atom}, + {other_system, System - ETS - Bin - Code - Atom}, + {allocated_unused, AllocatedUnused}, + {reserved_unallocated, OSReserved}, + {total, VMTotal} ]. %% [1] - erlang:memory(processes) can be less than the sum of its %% parts. Rather than display something nonsensical, just silence any diff --git a/src/tcp_listener_sup.erl b/src/tcp_listener_sup.erl index 14654535d6..7cb1214c8e 100644 --- a/src/tcp_listener_sup.erl +++ b/src/tcp_listener_sup.erl @@ -49,10 +49,11 @@ start_link(IPAddress, Port, Transport, SocketOpts, ProtoSup, ProtoOpts, OnStartu init({IPAddress, Port, Transport, SocketOpts, ProtoSup, ProtoOpts, OnStartup, OnShutdown, ConcurrentAcceptorCount, Label}) -> {ok, AckTimeout} = application:get_env(rabbit, ssl_handshake_timeout), + MaxConnections = rabbit_misc:get_env(rabbit, connection_max, infinity), {ok, {{one_for_all, 10, 10}, [ ranch:child_spec({acceptor, IPAddress, Port}, ConcurrentAcceptorCount, Transport, [{port, Port}, {ip, IPAddress}, - {max_connections, infinity}, + {max_connections, MaxConnections}, {ack_timeout, AckTimeout}, {connection_type, supervisor}|SocketOpts], ProtoSup, ProtoOpts), diff --git a/test/clustering_management_SUITE.erl b/test/clustering_management_SUITE.erl index 8bf8a9a8b8..e04fff2182 100644 --- a/test/clustering_management_SUITE.erl +++ b/test/clustering_management_SUITE.erl @@ -532,7 +532,9 @@ erlang_config(Config) -> ok = start_app(Hare), assert_clustered([Rabbit, Hare]), - %% If we use an invalid node name, the node fails to start. + %% If we use an invalid node type, the node fails to start. + %% The Erlang VM has stopped after previous rabbit app failure + rabbit_ct_broker_helpers:start_node(Config, Hare), ok = stop_app(Hare), ok = reset(Hare), ok = rpc:call(Hare, application, set_env, @@ -543,7 +545,7 @@ erlang_config(Config) -> %% If we use an invalid node type, the node fails to start. %% The Erlang VM has stopped after previous rabbit app failure - ok = rabbit_ct_broker_helpers:start_node(Config, Hare), + rabbit_ct_broker_helpers:start_node(Config, Hare), ok = stop_app(Hare), ok = reset(Hare), ok = rpc:call(Hare, application, set_env, @@ -554,7 +556,7 @@ erlang_config(Config) -> %% If we use an invalid cluster_nodes conf, the node fails to start. %% The Erlang VM has stopped after previous rabbit app failure - ok = rabbit_ct_broker_helpers:start_node(Config, Hare), + rabbit_ct_broker_helpers:start_node(Config, Hare), ok = stop_app(Hare), ok = reset(Hare), ok = rpc:call(Hare, application, set_env, @@ -564,7 +566,7 @@ erlang_config(Config) -> assert_not_clustered(Rabbit), %% The Erlang VM has stopped after previous rabbit app failure - ok = rabbit_ct_broker_helpers:start_node(Config, Hare), + rabbit_ct_broker_helpers:start_node(Config, Hare), ok = stop_app(Hare), ok = reset(Hare), ok = rpc:call(Hare, application, set_env, @@ -703,6 +705,8 @@ assert_failure(Fun) -> {error, Reason} -> Reason; {error_string, Reason} -> Reason; {badrpc, {'EXIT', Reason}} -> Reason; + %% Failure to start an app result in node shutdown + {badrpc, nodedown} -> nodedown; {badrpc_multi, Reason, _Nodes} -> Reason; Other -> error({expected_failure, Other}) end. diff --git a/test/config_schema_SUITE_data/rabbit.snippets b/test/config_schema_SUITE_data/rabbit.snippets index 52babbd34a..f978ee0b41 100644 --- a/test/config_schema_SUITE_data/rabbit.snippets +++ b/test/config_schema_SUITE_data/rabbit.snippets @@ -143,6 +143,26 @@ tcp_listen_options.exit_on_close = false", [{rabbit, [{vm_memory_calculation_strategy, erlang}]}], []}, + {vm_memory_calculation_strategy, "vm_memory_calculation_strategy = allocated", + [{rabbit, + [{vm_memory_calculation_strategy, allocated}]}], + []}, + {vm_memory_calculation_strategy, "vm_memory_calculation_strategy = legacy", + [{rabbit, + [{vm_memory_calculation_strategy, legacy}]}], + []}, + {connection_max, + "connection_max = 999", + [{rabbit,[{connection_max, 999}]}], + []}, + {connection_max, + "connection_max = infinity", + [{rabbit,[{connection_max, infinity}]}], + []}, + {channel_max, + "channel_max = 16", + [{rabbit,[{channel_max, 16}]}], + []}, {listeners_tcp_ip, "listeners.tcp.1 = 192.168.1.99:5672", [{rabbit,[{tcp_listeners,[{"192.168.1.99",5672}]}]}], |
