summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@rabbitmq.com>2011-04-28 15:05:55 +0100
committerMatthew Sackman <matthew@rabbitmq.com>2011-04-28 15:05:55 +0100
commit78759491b8cb7f4d006bd021fcc0cb0dde08e413 (patch)
treef02544bb60347224cf86a49133b4ae10eb1ab72d
parent20710489718efdf22412bb0316957719f0783059 (diff)
parentca1ad94464bde375a054162e2ee91a688ab9e7dd (diff)
downloadrabbitmq-server-git-78759491b8cb7f4d006bd021fcc0cb0dde08e413.tar.gz
Merging default into bug24004
-rw-r--r--include/rabbit_backing_queue_spec.hrl31
-rw-r--r--include/rabbit_exchange_type_spec.hrl2
-rw-r--r--packaging/RPMS/Fedora/rabbitmq-server.spec3
-rw-r--r--packaging/common/rabbitmq-server.init3
-rwxr-xr-xpackaging/common/rabbitmq-server.ocf8
-rw-r--r--packaging/debs/Debian/debian/changelog6
-rwxr-xr-xscripts/rabbitmq-env3
-rw-r--r--scripts/rabbitmq-service.bat1
-rw-r--r--src/gm.erl11
-rw-r--r--src/gm_soak_test.erl7
-rw-r--r--src/rabbit.erl23
-rw-r--r--src/rabbit_amqqueue.erl29
-rw-r--r--src/rabbit_amqqueue_process.erl174
-rw-r--r--src/rabbit_backing_queue.erl37
-rw-r--r--src/rabbit_binding.erl234
-rw-r--r--src/rabbit_channel.erl1
-rw-r--r--src/rabbit_control.erl44
-rw-r--r--src/rabbit_direct.erl15
-rw-r--r--src/rabbit_exchange.erl34
-rw-r--r--src/rabbit_exchange_type.erl9
-rw-r--r--src/rabbit_exchange_type_direct.erl3
-rw-r--r--src/rabbit_exchange_type_fanout.erl3
-rw-r--r--src/rabbit_exchange_type_headers.erl3
-rw-r--r--src/rabbit_exchange_type_topic.erl16
-rw-r--r--src/rabbit_misc.erl40
-rw-r--r--src/rabbit_mnesia.erl5
-rw-r--r--src/rabbit_msg_store.erl151
-rw-r--r--src/rabbit_prelaunch.erl9
-rw-r--r--src/rabbit_queue_index.erl6
-rw-r--r--src/rabbit_reader.erl3
-rw-r--r--src/rabbit_ssl.erl83
-rw-r--r--src/rabbit_tests.erl220
-rw-r--r--src/rabbit_upgrade_functions.erl6
-rw-r--r--src/rabbit_variable_queue.erl140
-rw-r--r--src/supervisor2.erl25
-rw-r--r--src/test_sup.erl2
36 files changed, 732 insertions, 658 deletions
diff --git a/include/rabbit_backing_queue_spec.hrl b/include/rabbit_backing_queue_spec.hrl
index b2bf6bbbce..d9296bf631 100644
--- a/include/rabbit_backing_queue_spec.hrl
+++ b/include/rabbit_backing_queue_spec.hrl
@@ -25,23 +25,24 @@
-type(message_properties_transformer() ::
fun ((rabbit_types:message_properties())
-> rabbit_types:message_properties())).
--type(async_callback() :: fun ((fun ((state()) -> state())) -> 'ok')).
--type(sync_callback() :: fun ((fun ((state()) -> state())) -> 'ok' | 'error')).
+-type(async_callback() :: fun ((atom(), fun ((atom(), state()) -> state())) -> 'ok')).
+-type(sync_callback() :: fun ((atom(), fun ((atom(), state()) -> state())) -> 'ok' | 'error')).
-spec(start/1 :: ([rabbit_amqqueue:name()]) -> 'ok').
-spec(stop/0 :: () -> 'ok').
--spec(init/5 :: (rabbit_amqqueue:name(), is_durable(), attempt_recovery(),
+-spec(init/4 :: (rabbit_types:amqqueue(), attempt_recovery(),
async_callback(), sync_callback()) -> state()).
-spec(terminate/1 :: (state()) -> state()).
-spec(delete_and_terminate/1 :: (state()) -> state()).
-spec(purge/1 :: (state()) -> {purged_msg_count(), state()}).
--spec(publish/3 :: (rabbit_types:basic_message(),
- rabbit_types:message_properties(), state()) -> state()).
--spec(publish_delivered/4 :: (true, rabbit_types:basic_message(),
- rabbit_types:message_properties(), state())
+-spec(publish/4 :: (rabbit_types:basic_message(),
+ rabbit_types:message_properties(), pid(), state()) ->
+ state()).
+-spec(publish_delivered/5 :: (true, rabbit_types:basic_message(),
+ rabbit_types:message_properties(), pid(), state())
-> {ack(), state()};
(false, rabbit_types:basic_message(),
- rabbit_types:message_properties(), state())
+ rabbit_types:message_properties(), pid(), state())
-> {undefined, state()}).
-spec(drain_confirmed/1 :: (state()) -> {[rabbit_guid:guid()], state()}).
-spec(dropwhile/2 ::
@@ -49,16 +50,17 @@
-> state()).
-spec(fetch/2 :: (true, state()) -> {fetch_result(ack()), state()};
(false, state()) -> {fetch_result(undefined), state()}).
--spec(ack/2 :: ([ack()], state()) -> state()).
--spec(tx_publish/4 :: (rabbit_types:txn(), rabbit_types:basic_message(),
- rabbit_types:message_properties(), state()) -> state()).
+-spec(ack/2 :: ([ack()], state()) -> {[rabbit_guid:guid()], state()}).
+-spec(tx_publish/5 :: (rabbit_types:txn(), rabbit_types:basic_message(),
+ rabbit_types:message_properties(), pid(), state()) ->
+ state()).
-spec(tx_ack/3 :: (rabbit_types:txn(), [ack()], state()) -> state()).
-spec(tx_rollback/2 :: (rabbit_types:txn(), state()) -> {[ack()], state()}).
-spec(tx_commit/4 ::
(rabbit_types:txn(), fun (() -> any()),
message_properties_transformer(), state()) -> {[ack()], state()}).
-spec(requeue/3 :: ([ack()], message_properties_transformer(), state())
- -> state()).
+ -> {[rabbit_guid:guid()], state()}).
-spec(len/1 :: (state()) -> non_neg_integer()).
-spec(is_empty/1 :: (state()) -> boolean()).
-spec(set_ram_duration_target/2 ::
@@ -68,3 +70,8 @@
-spec(idle_timeout/1 :: (state()) -> state()).
-spec(handle_pre_hibernate/1 :: (state()) -> state()).
-spec(status/1 :: (state()) -> [{atom(), any()}]).
+-spec(invoke/3 :: (atom(), fun ((atom(), A) -> A), state()) -> state()).
+-spec(is_duplicate/3 ::
+ (rabbit_types:txn(), rabbit_types:basic_message(), state()) ->
+ {'false'|'published'|'discarded', state()}).
+-spec(discard/3 :: (rabbit_types:basic_message(), pid(), state()) -> state()).
diff --git a/include/rabbit_exchange_type_spec.hrl b/include/rabbit_exchange_type_spec.hrl
index 45c475d88d..c80cc1966d 100644
--- a/include/rabbit_exchange_type_spec.hrl
+++ b/include/rabbit_exchange_type_spec.hrl
@@ -21,8 +21,6 @@
-> rabbit_router:match_result()).
-spec(validate/1 :: (rabbit_types:exchange()) -> 'ok').
-spec(create/2 :: (boolean(), rabbit_types:exchange()) -> 'ok').
--spec(recover/2 :: (rabbit_types:exchange(),
- [rabbit_types:binding()]) -> 'ok').
-spec(delete/3 :: (boolean(), rabbit_types:exchange(),
[rabbit_types:binding()]) -> 'ok').
-spec(add_binding/3 :: (boolean(), rabbit_types:exchange(),
diff --git a/packaging/RPMS/Fedora/rabbitmq-server.spec b/packaging/RPMS/Fedora/rabbitmq-server.spec
index 45af770ace..f9e9df8be5 100644
--- a/packaging/RPMS/Fedora/rabbitmq-server.spec
+++ b/packaging/RPMS/Fedora/rabbitmq-server.spec
@@ -120,6 +120,9 @@ done
rm -rf %{buildroot}
%changelog
+* Thu Apr 7 2011 Alexandru Scvortov <alexandru@rabbitmq.com> 2.4.1-1
+- New Upstream Release
+
* Tue Mar 22 2011 Alexandru Scvortov <alexandru@rabbitmq.com> 2.4.0-1
- New Upstream Release
diff --git a/packaging/common/rabbitmq-server.init b/packaging/common/rabbitmq-server.init
index f3bdc3d2ad..d8a7a94d56 100644
--- a/packaging/common/rabbitmq-server.init
+++ b/packaging/common/rabbitmq-server.init
@@ -28,6 +28,7 @@ INIT_LOG_DIR=/var/log/rabbitmq
LOCK_FILE= # This is filled in when building packages
test -x $DAEMON || exit 0
+test -x $CONTROL || exit 0
RETVAL=0
set -e
@@ -94,7 +95,7 @@ status_rabbitmq() {
rotate_logs_rabbitmq() {
set +e
- $DAEMON rotate_logs ${ROTATE_SUFFIX}
+ $CONTROL rotate_logs ${ROTATE_SUFFIX}
if [ $? != 0 ] ; then
RETVAL=1
fi
diff --git a/packaging/common/rabbitmq-server.ocf b/packaging/common/rabbitmq-server.ocf
index 94999d0edf..d58c48ed52 100755
--- a/packaging/common/rabbitmq-server.ocf
+++ b/packaging/common/rabbitmq-server.ocf
@@ -103,9 +103,9 @@ The IP Port for rabbitmq-server to listen on
<parameter name="config_file" unique="0" required="0">
<longdesc lang="en">
-Location of the config file
+Location of the config file (without the .config suffix)
</longdesc>
-<shortdesc lang="en">Config file path</shortdesc>
+<shortdesc lang="en">Config file path (without the .config suffix)</shortdesc>
<content type="string" default="" />
</parameter>
@@ -189,8 +189,8 @@ rabbit_validate_partial() {
}
rabbit_validate_full() {
- if [ ! -z $RABBITMQ_CONFIG_FILE ] && [ ! -e $RABBITMQ_CONFIG_FILE ]; then
- ocf_log err "rabbitmq-server config_file $RABBITMQ_CONFIG_FILE does not exist or is not a file";
+ if [ ! -z $RABBITMQ_CONFIG_FILE ] && [ ! -e "${RABBITMQ_CONFIG_FILE}.config" ]; then
+ ocf_log err "rabbitmq-server config_file ${RABBITMQ_CONFIG_FILE}.config does not exist or is not a file";
exit $OCF_ERR_INSTALLED;
fi
diff --git a/packaging/debs/Debian/debian/changelog b/packaging/debs/Debian/debian/changelog
index 2ca5074f64..0383b955d9 100644
--- a/packaging/debs/Debian/debian/changelog
+++ b/packaging/debs/Debian/debian/changelog
@@ -1,3 +1,9 @@
+rabbitmq-server (2.4.1-1) lucid; urgency=low
+
+ * New Upstream Release
+
+ -- Alexandru Scvortov <alexandru@rabbitmq.com> Thu, 07 Apr 2011 16:49:22 +0100
+
rabbitmq-server (2.4.0-1) lucid; urgency=low
* New Upstream Release
diff --git a/scripts/rabbitmq-env b/scripts/rabbitmq-env
index 3e17394981..a2ef8d3ceb 100755
--- a/scripts/rabbitmq-env
+++ b/scripts/rabbitmq-env
@@ -37,7 +37,8 @@ RABBITMQ_HOME="${SCRIPT_DIR}/.."
NODENAME=rabbit@${HOSTNAME%%.*}
# Load configuration from the rabbitmq.conf file
-if [ -f /etc/rabbitmq/rabbitmq.conf ]; then
+if [ -f /etc/rabbitmq/rabbitmq.conf ] && \
+ [ ! -f /etc/rabbitmq/rabbitmq-env.conf ] ; then
echo -n "WARNING: ignoring /etc/rabbitmq/rabbitmq.conf -- "
echo "location has moved to /etc/rabbitmq/rabbitmq-env.conf"
fi
diff --git a/scripts/rabbitmq-service.bat b/scripts/rabbitmq-service.bat
index aa428a8c4e..b2aa4f58c9 100644
--- a/scripts/rabbitmq-service.bat
+++ b/scripts/rabbitmq-service.bat
@@ -227,6 +227,7 @@ set ERLANG_SERVICE_ARGUMENTS=!ERLANG_SERVICE_ARGUMENTS:"=\"!
-stopaction "rabbit:stop_and_halt()." ^
-sname !RABBITMQ_NODENAME! ^
!CONSOLE_FLAG! ^
+-comment "A robust and scalable messaging broker" ^
-args "!ERLANG_SERVICE_ARGUMENTS!" > NUL
goto END
diff --git a/src/gm.erl b/src/gm.erl
index 5b3623cf81..8b7dc70c83 100644
--- a/src/gm.erl
+++ b/src/gm.erl
@@ -516,7 +516,8 @@ flush(Server) ->
init([GroupName, Module, Args]) ->
- random:seed(now()),
+ {MegaSecs, Secs, MicroSecs} = now(),
+ random:seed(MegaSecs, Secs, MicroSecs),
gen_server2:cast(self(), join),
Self = self(),
{ok, #state { self = Self,
@@ -1010,7 +1011,7 @@ prune_or_create_group(Self, GroupName) ->
fun () -> GroupNew = #gm_group { name = GroupName,
members = [Self],
version = 0 },
- case mnesia:read(?GROUP_TABLE, GroupName) of
+ case mnesia:read({?GROUP_TABLE, GroupName}) of
[] ->
mnesia:write(GroupNew),
GroupNew;
@@ -1028,7 +1029,7 @@ record_dead_member_in_group(Member, GroupName) ->
{atomic, Group} =
mnesia:sync_transaction(
fun () -> [Group1 = #gm_group { members = Members, version = Ver }] =
- mnesia:read(?GROUP_TABLE, GroupName),
+ mnesia:read({?GROUP_TABLE, GroupName}),
case lists:splitwith(
fun (Member1) -> Member1 =/= Member end, Members) of
{_Members1, []} -> %% not found - already recorded dead
@@ -1048,7 +1049,7 @@ record_new_member_in_group(GroupName, Left, NewMember, Fun) ->
mnesia:sync_transaction(
fun () ->
[#gm_group { members = Members, version = Ver } = Group1] =
- mnesia:read(?GROUP_TABLE, GroupName),
+ mnesia:read({?GROUP_TABLE, GroupName}),
{Prefix, [Left | Suffix]} =
lists:splitwith(fun (M) -> M =/= Left end, Members),
Members1 = Prefix ++ [Left, NewMember | Suffix],
@@ -1067,7 +1068,7 @@ erase_members_in_group(Members, GroupName) ->
fun () ->
[Group1 = #gm_group { members = [_|_] = Members1,
version = Ver }] =
- mnesia:read(?GROUP_TABLE, GroupName),
+ mnesia:read({?GROUP_TABLE, GroupName}),
case Members1 -- DeadMembers of
Members1 -> Group1;
Members2 -> Group2 =
diff --git a/src/gm_soak_test.erl b/src/gm_soak_test.erl
index 1f8832a6b2..dae42ac7b8 100644
--- a/src/gm_soak_test.erl
+++ b/src/gm_soak_test.erl
@@ -35,7 +35,7 @@ with_state(Fun) ->
inc() ->
case 1 + get(count) of
- 100000 -> Now = os:timestamp(),
+ 100000 -> Now = now(),
Start = put(ts, Now),
Diff = timer:now_diff(Now, Start),
Rate = 100000 / (Diff / 1000000),
@@ -48,7 +48,7 @@ joined([], Members) ->
io:format("Joined ~p (~p members)~n", [self(), length(Members)]),
put(state, dict:from_list([{Member, empty} || Member <- Members])),
put(count, 0),
- put(ts, os:timestamp()),
+ put(ts, now()),
ok.
members_changed([], Births, Deaths) ->
@@ -101,7 +101,8 @@ terminate([], Reason) ->
spawn_member() ->
spawn_link(
fun () ->
- random:seed(now()),
+ {MegaSecs, Secs, MicroSecs} = now(),
+ random:seed(MegaSecs, Secs, MicroSecs),
%% start up delay of no more than 10 seconds
timer:sleep(random:uniform(10000)),
{ok, Pid} = gm:start_link(?MODULE, ?MODULE, []),
diff --git a/src/rabbit.erl b/src/rabbit.erl
index 807e9e7d58..e6e80b4aac 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -27,13 +27,16 @@
%%---------------------------------------------------------------------------
%% Boot steps.
--export([maybe_insert_default_data/0, boot_delegate/0]).
+-export([maybe_insert_default_data/0, boot_delegate/0, recover/0]).
+
+-rabbit_boot_step({pre_boot, [{description, "rabbit boot start"}]}).
-rabbit_boot_step({codec_correctness_check,
[{description, "codec correctness check"},
{mfa, {rabbit_binary_generator,
check_empty_content_body_frame_size,
[]}},
+ {requires, pre_boot},
{enables, external_infrastructure}]}).
-rabbit_boot_step({database,
@@ -45,11 +48,13 @@
[{description, "file handle cache server"},
{mfa, {rabbit_sup, start_restartable_child,
[file_handle_cache]}},
+ {requires, pre_boot},
{enables, worker_pool}]}).
-rabbit_boot_step({worker_pool,
[{description, "worker pool"},
{mfa, {rabbit_sup, start_child, [worker_pool_sup]}},
+ {requires, pre_boot},
{enables, external_infrastructure}]}).
-rabbit_boot_step({external_infrastructure,
@@ -123,15 +128,9 @@
{requires, core_initialized},
{enables, routing_ready}]}).
--rabbit_boot_step({exchange_recovery,
- [{description, "exchange recovery"},
- {mfa, {rabbit_exchange, recover, []}},
- {requires, empty_db_check},
- {enables, routing_ready}]}).
-
--rabbit_boot_step({queue_sup_queue_recovery,
- [{description, "queue supervisor and queue recovery"},
- {mfa, {rabbit_amqqueue, start, []}},
+-rabbit_boot_step({recovery,
+ [{description, "exchange, queue and binding recovery"},
+ {mfa, {rabbit, recover, []}},
{requires, empty_db_check},
{enables, routing_ready}]}).
@@ -186,6 +185,7 @@
-spec(maybe_insert_default_data/0 :: () -> 'ok').
-spec(boot_delegate/0 :: () -> 'ok').
+-spec(recover/0 :: () -> 'ok').
-endif.
@@ -464,6 +464,9 @@ boot_delegate() ->
{ok, Count} = application:get_env(rabbit, delegate_count),
rabbit_sup:start_child(delegate_sup, [Count]).
+recover() ->
+ rabbit_binding:recover(rabbit_exchange:recover(), rabbit_amqqueue:start()).
+
maybe_insert_default_data() ->
case rabbit_mnesia:is_db_empty() of
true -> insert_default_data();
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index c7391965d7..7bb90fd9d9 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -30,7 +30,7 @@
%% internal
-export([internal_declare/2, internal_delete/1,
- run_backing_queue/2, run_backing_queue_async/2,
+ run_backing_queue/3, run_backing_queue_async/3,
sync_timeout/1, update_ram_duration/1, set_ram_duration_target/2,
set_maximum_since_use/2, maybe_expire/1, drop_expired/1,
emit_stats/1]).
@@ -57,7 +57,7 @@
-type(queue_or_not_found() :: rabbit_types:amqqueue() | 'not_found').
--spec(start/0 :: () -> 'ok').
+-spec(start/0 :: () -> [name()]).
-spec(stop/0 :: () -> 'ok').
-spec(declare/5 ::
(name(), boolean(), boolean(),
@@ -141,10 +141,12 @@
rabbit_types:connection_exit() |
fun ((boolean()) -> rabbit_types:ok_or_error('not_found') |
rabbit_types:connection_exit())).
--spec(run_backing_queue/2 ::
- (pid(), (fun ((A) -> {[rabbit_types:msg_id()], A}))) -> 'ok').
--spec(run_backing_queue_async/2 ::
- (pid(), (fun ((A) -> {[rabbit_types:msg_id()], A}))) -> 'ok').
+-spec(run_backing_queue/3 ::
+ (pid(), atom(),
+ (fun ((atom(), A) -> {[rabbit_types:msg_id()], A}))) -> 'ok').
+-spec(run_backing_queue_async/3 ::
+ (pid(), atom(),
+ (fun ((atom(), A) -> {[rabbit_types:msg_id()], A}))) -> 'ok').
-spec(sync_timeout/1 :: (pid()) -> 'ok').
-spec(update_ram_duration/1 :: (pid()) -> 'ok').
-spec(set_ram_duration_target/2 :: (pid(), number() | 'infinity') -> 'ok').
@@ -166,8 +168,7 @@ start() ->
{rabbit_amqqueue_sup,
{rabbit_amqqueue_sup, start_link, []},
transient, infinity, supervisor, [rabbit_amqqueue_sup]}),
- _RealDurableQueues = recover_durable_queues(DurableQueues),
- ok.
+ recover_durable_queues(DurableQueues).
stop() ->
ok = supervisor:terminate_child(rabbit_sup, rabbit_amqqueue_sup),
@@ -187,8 +188,8 @@ find_durable_queues() ->
recover_durable_queues(DurableQueues) ->
Qs = [start_queue_process(Q) || Q <- DurableQueues],
- [Q || Q <- Qs,
- gen_server2:call(Q#amqqueue.pid, {init, true}, infinity) == Q].
+ [QName || Q = #amqqueue{name = QName, pid = Pid} <- Qs,
+ gen_server2:call(Pid, {init, true}, infinity) == {new, Q}].
declare(QueueName, Durable, AutoDelete, Args, Owner) ->
ok = check_declare_arguments(QueueName, Args),
@@ -439,11 +440,11 @@ internal_delete(QueueName) ->
end
end).
-run_backing_queue(QPid, Fun) ->
- gen_server2:call(QPid, {run_backing_queue, Fun}, infinity).
+run_backing_queue(QPid, Mod, Fun) ->
+ gen_server2:call(QPid, {run_backing_queue, Mod, Fun}, infinity).
-run_backing_queue_async(QPid, Fun) ->
- gen_server2:cast(QPid, {run_backing_queue, Fun}).
+run_backing_queue_async(QPid, Mod, Fun) ->
+ gen_server2:cast(QPid, {run_backing_queue, Mod, Fun}).
sync_timeout(QPid) ->
gen_server2:cast(QPid, sync_timeout).
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 3f5758ce93..110817a955 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -137,8 +137,7 @@ code_change(_OldVsn, State, _Extra) ->
%%----------------------------------------------------------------------------
declare(Recover, From,
- State = #q{q = Q = #amqqueue{name = QName, durable = IsDurable},
- backing_queue = BQ, backing_queue_state = undefined,
+ State = #q{q = Q, backing_queue = BQ, backing_queue_state = undefined,
stats_timer = StatsTimer}) ->
case rabbit_amqqueue:internal_declare(Q, Recover) of
not_found -> {stop, normal, not_found, State};
@@ -149,7 +148,7 @@ declare(Recover, From,
ok = rabbit_memory_monitor:register(
self(), {rabbit_amqqueue,
set_ram_duration_target, [self()]}),
- BQS = bq_init(BQ, QName, IsDurable, Recover),
+ BQS = bq_init(BQ, Q, Recover),
State1 = process_args(State#q{backing_queue_state = BQS}),
rabbit_event:notify(queue_created,
infos(?CREATION_EVENT_KEYS, State1)),
@@ -159,17 +158,17 @@ declare(Recover, From,
Q1 -> {stop, normal, {existing, Q1}, State}
end.
-bq_init(BQ, QName, IsDurable, Recover) ->
+bq_init(BQ, Q, Recover) ->
Self = self(),
- BQ:init(QName, IsDurable, Recover,
- fun (Fun) ->
- rabbit_amqqueue:run_backing_queue_async(Self, Fun)
+ BQ:init(Q, Recover,
+ fun (Mod, Fun) ->
+ rabbit_amqqueue:run_backing_queue_async(Self, Mod, Fun)
end,
- fun (Fun) ->
+ fun (Mod, Fun) ->
rabbit_misc:with_exit_handler(
fun () -> error end,
fun () ->
- rabbit_amqqueue:run_backing_queue(Self, Fun)
+ rabbit_amqqueue:run_backing_queue(Self, Mod, Fun)
end)
end).
@@ -428,11 +427,19 @@ confirm_messages(MsgIds, State = #q{msg_id_to_channel = MTC}) ->
{CMs, MTC0}
end
end, {gb_trees:empty(), MTC}, MsgIds),
- gb_trees:map(fun(ChPid, MsgSeqNos) ->
- rabbit_channel:confirm(ChPid, MsgSeqNos)
- end, CMs),
+ gb_trees_foreach(fun(ChPid, MsgSeqNos) ->
+ rabbit_channel:confirm(ChPid, MsgSeqNos)
+ end, CMs),
State#q{msg_id_to_channel = MTC1}.
+gb_trees_foreach(_, none) ->
+ ok;
+gb_trees_foreach(Fun, {Key, Val, It}) ->
+ Fun(Key, Val),
+ gb_trees_foreach(Fun, gb_trees:next(It));
+gb_trees_foreach(Fun, Tree) ->
+ gb_trees_foreach(Fun, gb_trees:next(gb_trees:iterator(Tree))).
+
gb_trees_cons(Key, Value, Tree) ->
case gb_trees:lookup(Key, Tree) of
{value, Values} -> gb_trees:update(Key, [Value | Values], Tree);
@@ -469,45 +476,70 @@ run_message_queue(State) ->
{_IsEmpty1, State2} = deliver_msgs_to_consumers(Funs, IsEmpty, State1),
State2.
-attempt_delivery(#delivery{txn = none,
- sender = ChPid,
- message = Message,
- msg_seq_no = MsgSeqNo} = Delivery,
- State = #q{backing_queue = BQ}) ->
+attempt_delivery(Delivery = #delivery{txn = none,
+ sender = ChPid,
+ message = Message,
+ msg_seq_no = MsgSeqNo},
+ State = #q{backing_queue = BQ, backing_queue_state = BQS}) ->
Confirm = should_confirm_message(Delivery, State),
case Confirm of
immediately -> rabbit_channel:confirm(ChPid, [MsgSeqNo]);
_ -> ok
end,
- PredFun = fun (IsEmpty, _State) -> not IsEmpty end,
- DeliverFun =
- fun (AckRequired, false, State1 = #q{backing_queue_state = BQS}) ->
- %% we don't need an expiry here because messages are
- %% not being enqueued, so we use an empty
- %% message_properties.
- {AckTag, BQS1} =
- BQ:publish_delivered(
- AckRequired, Message,
- (?BASE_MESSAGE_PROPERTIES)#message_properties{
- needs_confirming = needs_confirming(Confirm)},
- BQS),
- {{Message, false, AckTag}, true,
- State1#q{backing_queue_state = BQS1}}
- end,
- {Delivered, State1} =
- deliver_msgs_to_consumers({ PredFun, DeliverFun }, false, State),
- {Delivered, Confirm, State1};
-attempt_delivery(#delivery{txn = Txn,
- sender = ChPid,
- message = Message} = Delivery,
- State = #q{backing_queue = BQ,
- backing_queue_state = BQS}) ->
- store_ch_record((ch_record(ChPid))#cr{txn = Txn}),
- BQS1 = BQ:tx_publish(Txn, Message, ?BASE_MESSAGE_PROPERTIES, BQS),
- {true, should_confirm_message(Delivery, State),
- State#q{backing_queue_state = BQS1}}.
+ case BQ:is_duplicate(none, Message, BQS) of
+ {false, BQS1} ->
+ PredFun = fun (IsEmpty, _State) -> not IsEmpty end,
+ DeliverFun =
+ fun (AckRequired, false,
+ State1 = #q{backing_queue_state = BQS2}) ->
+ %% we don't need an expiry here because
+ %% messages are not being enqueued, so we use
+ %% an empty message_properties.
+ {AckTag, BQS3} =
+ BQ:publish_delivered(
+ AckRequired, Message,
+ (?BASE_MESSAGE_PROPERTIES)#message_properties{
+ needs_confirming = needs_confirming(Confirm)},
+ ChPid, BQS2),
+ {{Message, false, AckTag}, true,
+ State1#q{backing_queue_state = BQS3}}
+ end,
+ {Delivered, State2} =
+ deliver_msgs_to_consumers({ PredFun, DeliverFun }, false,
+ State#q{backing_queue_state = BQS1}),
+ {Delivered, Confirm, State2};
+ {Duplicate, BQS1} ->
+ %% if the message has previously been seen by the BQ then
+ %% it must have been seen under the same circumstances as
+ %% now: i.e. if it is now a deliver_immediately then it
+ %% must have been before.
+ Delivered = case Duplicate of
+ published -> true;
+ discarded -> false
+ end,
+ {Delivered, Confirm, State#q{backing_queue_state = BQS1}}
+ end;
+attempt_delivery(Delivery = #delivery{txn = Txn,
+ sender = ChPid,
+ message = Message},
+ State = #q{backing_queue = BQ, backing_queue_state = BQS}) ->
+ Confirm = should_confirm_message(Delivery, State),
+ case BQ:is_duplicate(Txn, Message, BQS) of
+ {false, BQS1} ->
+ store_ch_record((ch_record(ChPid))#cr{txn = Txn}),
+ BQS2 = BQ:tx_publish(Txn, Message, ?BASE_MESSAGE_PROPERTIES, ChPid,
+ BQS1),
+ {true, Confirm, State#q{backing_queue_state = BQS2}};
+ {Duplicate, BQS1} ->
+ Delivered = case Duplicate of
+ published -> true;
+ discarded -> false
+ end,
+ {Delivered, Confirm, State#q{backing_queue_state = BQS1}}
+ end.
-deliver_or_enqueue(Delivery = #delivery{message = Message}, State) ->
+deliver_or_enqueue(Delivery = #delivery{message = Message,
+ sender = ChPid}, State) ->
{Delivered, Confirm, State1} = attempt_delivery(Delivery, State),
State2 = #q{backing_queue = BQ, backing_queue_state = BQS} =
maybe_record_confirm_message(Confirm, State1),
@@ -517,14 +549,17 @@ deliver_or_enqueue(Delivery = #delivery{message = Message}, State) ->
BQ:publish(Message,
(message_properties(State)) #message_properties{
needs_confirming = needs_confirming(Confirm)},
- BQS),
+ ChPid, BQS),
ensure_ttl_timer(State2#q{backing_queue_state = BQS1})
end.
requeue_and_run(AckTags, State = #q{backing_queue = BQ, ttl=TTL}) ->
run_backing_queue(
- fun (BQS) -> BQ:requeue(AckTags, reset_msg_expiry_fun(TTL), BQS) end,
- State).
+ BQ, fun (M, BQS) ->
+ {_MsgIds, BQS1} =
+ M:requeue(AckTags, reset_msg_expiry_fun(TTL), BQS),
+ BQS1
+ end, State).
fetch(AckRequired, State = #q{backing_queue_state = BQS,
backing_queue = BQ}) ->
@@ -627,10 +662,11 @@ maybe_send_reply(ChPid, Msg) -> ok = rabbit_channel:send_command(ChPid, Msg).
qname(#q{q = #amqqueue{name = QName}}) -> QName.
backing_queue_idle_timeout(State = #q{backing_queue = BQ}) ->
- run_backing_queue(fun (BQS) -> BQ:idle_timeout(BQS) end, State).
+ run_backing_queue(BQ, fun (M, BQS) -> M:idle_timeout(BQS) end, State).
-run_backing_queue(Fun, State = #q{backing_queue_state = BQS}) ->
- run_message_queue(State#q{backing_queue_state = Fun(BQS)}).
+run_backing_queue(Mod, Fun, State = #q{backing_queue = BQ,
+ backing_queue_state = BQS}) ->
+ run_message_queue(State#q{backing_queue_state = BQ:invoke(Mod, Fun, BQS)}).
commit_transaction(Txn, From, C = #cr{acktags = ChAckTags},
State = #q{backing_queue = BQ,
@@ -654,6 +690,12 @@ rollback_transaction(Txn, C, State = #q{backing_queue = BQ,
subtract_acks(A, B) when is_list(B) ->
lists:foldl(fun sets:del_element/2, A, B).
+discard_delivery(#delivery{sender = ChPid,
+ message = Message},
+ State = #q{backing_queue = BQ,
+ backing_queue_state = BQS}) ->
+ State#q{backing_queue_state = BQ:discard(Message, ChPid, BQS)}.
+
reset_msg_expiry_fun(TTL) ->
fun(MsgProps) ->
MsgProps#message_properties{expiry = calculate_msg_expiry(TTL)}
@@ -760,11 +802,11 @@ emit_consumer_deleted(ChPid, ConsumerTag) ->
prioritise_call(Msg, _From, _State) ->
case Msg of
- info -> 9;
- {info, _Items} -> 9;
- consumers -> 9;
- {run_backing_queue, _Fun} -> 6;
- _ -> 0
+ info -> 9;
+ {info, _Items} -> 9;
+ consumers -> 9;
+ {run_backing_queue, _Mod, _Fun} -> 6;
+ _ -> 0
end.
prioritise_cast(Msg, _State) ->
@@ -780,7 +822,7 @@ prioritise_cast(Msg, _State) ->
{reject, _AckTags, _Requeue, _ChPid} -> 7;
{notify_sent, _ChPid} -> 7;
{unblock, _ChPid} -> 7;
- {run_backing_queue, _Fun} -> 6;
+ {run_backing_queue, _Mod, _Fun} -> 6;
sync_timeout -> 6;
_ -> 0
end.
@@ -799,14 +841,14 @@ handle_call({init, Recover}, From,
true -> erlang:monitor(process, Owner),
declare(Recover, From, State);
false -> #q{backing_queue = BQ, backing_queue_state = undefined,
- q = #amqqueue{name = QName, durable = IsDurable}} = State,
+ q = #amqqueue{name = QName} = Q} = State,
gen_server2:reply(From, not_found),
case Recover of
true -> ok;
_ -> rabbit_log:warning(
"Queue ~p exclusive owner went away~n", [QName])
end,
- BQS = bq_init(BQ, QName, IsDurable, Recover),
+ BQS = bq_init(BQ, Q, Recover),
%% Rely on terminate to delete the queue.
{stop, normal, State#q{backing_queue_state = BQS}}
end;
@@ -840,7 +882,7 @@ handle_call({deliver_immediately, Delivery}, _From, State) ->
{Delivered, Confirm, State1} = attempt_delivery(Delivery, State),
reply(Delivered, case Delivered of
true -> maybe_record_confirm_message(Confirm, State1);
- false -> State1
+ false -> discard_delivery(Delivery, State1)
end);
handle_call({deliver, Delivery}, From, State) ->
@@ -996,12 +1038,12 @@ handle_call({requeue, AckTags, ChPid}, From, State) ->
noreply(requeue_and_run(AckTags, State))
end;
-handle_call({run_backing_queue, Fun}, _From, State) ->
- reply(ok, run_backing_queue(Fun, State)).
+handle_call({run_backing_queue, Mod, Fun}, _From, State) ->
+ reply(ok, run_backing_queue(Mod, Fun, State)).
-handle_cast({run_backing_queue, Fun}, State) ->
- noreply(run_backing_queue(Fun, State));
+handle_cast({run_backing_queue, Mod, Fun}, State) ->
+ noreply(run_backing_queue(Mod, Fun, State));
handle_cast(sync_timeout, State) ->
noreply(backing_queue_idle_timeout(State#q{sync_timer_ref = undefined}));
@@ -1020,7 +1062,7 @@ handle_cast({ack, Txn, AckTags, ChPid},
case Txn of
none -> ChAckTags1 = subtract_acks(ChAckTags, AckTags),
NewC = C#cr{acktags = ChAckTags1},
- BQS1 = BQ:ack(AckTags, BQS),
+ {_Guids, BQS1} = BQ:ack(AckTags, BQS),
{NewC, State#q{backing_queue_state = BQS1}};
_ -> BQS1 = BQ:tx_ack(Txn, AckTags, BQS),
{C#cr{txn = Txn},
@@ -1041,7 +1083,7 @@ handle_cast({reject, AckTags, Requeue, ChPid},
maybe_store_ch_record(C#cr{acktags = ChAckTags1}),
noreply(case Requeue of
true -> requeue_and_run(AckTags, State);
- false -> BQS1 = BQ:ack(AckTags, BQS),
+ false -> {_Guids, BQS1} = BQ:ack(AckTags, BQS),
State#q{backing_queue_state = BQS1}
end)
end;
diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl
index 0ca8d260ef..0955a0804b 100644
--- a/src/rabbit_backing_queue.erl
+++ b/src/rabbit_backing_queue.erl
@@ -35,19 +35,18 @@ behaviour_info(callbacks) ->
%% Initialise the backing queue and its state.
%%
%% Takes
- %% 1. the queue name
- %% 2. a boolean indicating whether the queue is durable
- %% 3. a boolean indicating whether the queue is an existing queue
+ %% 1. the amqqueue record
+ %% 2. a boolean indicating whether the queue is an existing queue
%% that should be recovered
- %% 4. an asynchronous callback which accepts a function of type
+ %% 3. an asynchronous callback which accepts a function of type
%% backing-queue-state to backing-queue-state. This callback
%% function can be safely invoked from any process, which
%% makes it useful for passing messages back into the backing
%% queue, especially as the backing queue does not have
%% control of its own mailbox.
- %% 5. a synchronous callback. Same as the asynchronous callback
+ %% 4. a synchronous callback. Same as the asynchronous callback
%% but waits for completion and returns 'error' on error.
- {init, 5},
+ {init, 4},
%% Called on queue shutdown when queue isn't being deleted.
{terminate, 1},
@@ -61,12 +60,12 @@ behaviour_info(callbacks) ->
{purge, 1},
%% Publish a message.
- {publish, 3},
+ {publish, 4},
%% Called for messages which have already been passed straight
%% out to a client. The queue will be empty for these calls
%% (i.e. saves the round trip through the backing queue).
- {publish_delivered, 4},
+ {publish_delivered, 5},
%% Return ids of messages which have been confirmed since
%% the last invocation of this function (or initialisation).
@@ -109,7 +108,7 @@ behaviour_info(callbacks) ->
{ack, 2},
%% A publish, but in the context of a transaction.
- {tx_publish, 4},
+ {tx_publish, 5},
%% Acks, but in the context of a transaction.
{tx_ack, 3},
@@ -165,7 +164,25 @@ behaviour_info(callbacks) ->
%% Exists for debugging purposes, to be able to expose state via
%% rabbitmqctl list_queues backing_queue_status
- {status, 1}
+ {status, 1},
+
+ %% Passed a function to be invoked with the relevant backing
+ %% queue's state. Useful for when the backing queue or other
+ %% components need to pass functions into the backing queue.
+ {invoke, 3},
+
+ %% Called prior to a publish or publish_delivered call. Allows
+ %% the BQ to signal that it's already seen this message (and in
+ %% what capacity - i.e. was it published previously or discarded
+ %% previously) and thus the message should be dropped.
+ {is_duplicate, 3},
+
+ %% Called to inform the BQ about messages which have reached the
+ %% queue, but are not going to be further passed to BQ for some
+ %% reason. Note that this is may be invoked for messages for
+ %% which BQ:is_duplicate/2 has already returned {'published' |
+ %% 'discarded', BQS}.
+ {discard, 3}
];
behaviour_info(_Other) ->
undefined.
diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl
index 6167790e58..dc119fbd5e 100644
--- a/src/rabbit_binding.erl
+++ b/src/rabbit_binding.erl
@@ -17,7 +17,7 @@
-module(rabbit_binding).
-include("rabbit.hrl").
--export([recover/0, exists/1, add/1, remove/1, add/2, remove/2, list/1]).
+-export([recover/2, exists/1, add/1, add/2, remove/1, remove/2, list/1]).
-export([list_for_source/1, list_for_destination/1,
list_for_source_and_destination/2]).
-export([new_deletions/0, combine_deletions/2, add_deletion/3,
@@ -38,24 +38,24 @@
-type(bind_errors() :: rabbit_types:error('source_not_found' |
'destination_not_found' |
'source_and_destination_not_found')).
--type(bind_res() :: 'ok' | bind_errors()).
+-type(bind_ok_or_error() :: 'ok' | bind_errors() |
+ rabbit_types:error('binding_not_found')).
+-type(bind_res() :: bind_ok_or_error() | rabbit_misc:const(bind_ok_or_error())).
-type(inner_fun() ::
fun((rabbit_types:exchange(),
rabbit_types:exchange() | rabbit_types:amqqueue()) ->
rabbit_types:ok_or_error(rabbit_types:amqp_error()))).
-type(bindings() :: [rabbit_types:binding()]).
--type(add_res() :: bind_res() | rabbit_misc:const(bind_res())).
--type(bind_or_error() :: bind_res() | rabbit_types:error('binding_not_found')).
--type(remove_res() :: bind_or_error() | rabbit_misc:const(bind_or_error())).
-opaque(deletions() :: dict()).
--spec(recover/0 :: () -> [rabbit_types:binding()]).
+-spec(recover/2 :: ([rabbit_exchange:name()], [rabbit_amqqueue:name()]) ->
+ 'ok').
-spec(exists/1 :: (rabbit_types:binding()) -> boolean() | bind_errors()).
--spec(add/1 :: (rabbit_types:binding()) -> add_res()).
--spec(remove/1 :: (rabbit_types:binding()) -> remove_res()).
--spec(add/2 :: (rabbit_types:binding(), inner_fun()) -> add_res()).
--spec(remove/2 :: (rabbit_types:binding(), inner_fun()) -> remove_res()).
+-spec(add/1 :: (rabbit_types:binding()) -> bind_res()).
+-spec(add/2 :: (rabbit_types:binding(), inner_fun()) -> bind_res()).
+-spec(remove/1 :: (rabbit_types:binding()) -> bind_res()).
+-spec(remove/2 :: (rabbit_types:binding(), inner_fun()) -> bind_res()).
-spec(list/1 :: (rabbit_types:vhost()) -> bindings()).
-spec(list_for_source/1 ::
(rabbit_types:binding_source()) -> bindings()).
@@ -93,14 +93,36 @@
destination_name, destination_kind,
routing_key, arguments]).
-recover() ->
- rabbit_misc:table_fold(
- fun (Route = #route{binding = B}, Acc) ->
- {_, ReverseRoute} = route_with_reverse(Route),
- ok = mnesia:write(rabbit_route, Route, write),
- ok = mnesia:write(rabbit_reverse_route, ReverseRoute, write),
- [B | Acc]
- end, [], rabbit_durable_route).
+recover(XNames, QNames) ->
+ XNameSet = sets:from_list(XNames),
+ QNameSet = sets:from_list(QNames),
+ rabbit_misc:table_filter(
+ fun (Route) ->
+ mnesia:read({rabbit_semi_durable_route, Route}) =:= []
+ end,
+ fun (Route, true) ->
+ ok = mnesia:write(rabbit_semi_durable_route, Route, write);
+ (_Route, false) ->
+ ok
+ end, rabbit_durable_route),
+ rabbit_misc:table_filter(
+ fun (#route{binding = #binding{destination = Dst =
+ #resource{kind = Kind}}}) ->
+ sets:is_element(Dst, case Kind of
+ exchange -> XNameSet;
+ queue -> QNameSet
+ end)
+ end,
+ fun (R = #route{binding = B = #binding{source = Src}}, Tx) ->
+ case Tx of
+ true -> ok = sync_transient_route(R, fun mnesia:write/3);
+ false -> ok
+ end,
+ {ok, X} = rabbit_exchange:lookup(Src),
+ rabbit_exchange:callback(X, add_binding, [Tx, X, B])
+ end,
+ rabbit_semi_durable_route),
+ ok.
exists(Binding) ->
binding_action(
@@ -110,8 +132,6 @@ exists(Binding) ->
add(Binding) -> add(Binding, fun (_Src, _Dst) -> ok end).
-remove(Binding) -> remove(Binding, fun (_Src, _Dst) -> ok end).
-
add(Binding, InnerFun) ->
binding_action(
Binding,
@@ -120,51 +140,49 @@ add(Binding, InnerFun) ->
%% in general, we want to fail on that in preference to
%% anything else
case InnerFun(Src, Dst) of
- ok ->
- case mnesia:read({rabbit_route, B}) of
- [] -> ok = sync_binding(B, all_durable([Src, Dst]),
- fun mnesia:write/3),
- fun (Tx) ->
- ok = rabbit_exchange:callback(
- Src, add_binding, [Tx, Src, B]),
- rabbit_event:notify_if(
- not Tx, binding_created, info(B))
- end;
- [_] -> fun rabbit_misc:const_ok/1
- end;
- {error, _} = Err ->
- rabbit_misc:const(Err)
+ ok -> case mnesia:read({rabbit_route, B}) of
+ [] -> add(Src, Dst, B);
+ [_] -> fun rabbit_misc:const_ok/1
+ end;
+ {error, _} = Err -> rabbit_misc:const(Err)
end
end).
+add(Src, Dst, B) ->
+ [SrcDurable, DstDurable] = [durable(E) || E <- [Src, Dst]],
+ case (not (SrcDurable andalso DstDurable) orelse
+ mnesia:read({rabbit_durable_route, B}) =:= []) of
+ true -> ok = sync_route(#route{binding = B}, SrcDurable, DstDurable,
+ fun mnesia:write/3),
+ fun (Tx) -> ok = rabbit_exchange:callback(Src, add_binding,
+ [Tx, Src, B]),
+ rabbit_event:notify_if(not Tx, binding_created,
+ info(B))
+ end;
+ false -> rabbit_misc:const({error, binding_not_found})
+ end.
+
+remove(Binding) -> remove(Binding, fun (_Src, _Dst) -> ok end).
+
remove(Binding, InnerFun) ->
binding_action(
Binding,
fun (Src, Dst, B) ->
- Result =
- case mnesia:match_object(rabbit_route, #route{binding = B},
- write) of
- [] ->
- {error, binding_not_found};
- [_] ->
- case InnerFun(Src, Dst) of
- ok ->
- ok = sync_binding(B, all_durable([Src, Dst]),
- fun mnesia:delete_object/3),
- {ok, maybe_auto_delete(B#binding.source,
- [B], new_deletions())};
- {error, _} = E ->
- E
- end
- end,
- case Result of
- {error, _} = Err ->
- rabbit_misc:const(Err);
- {ok, Deletions} ->
- fun (Tx) -> ok = process_deletions(Deletions, Tx) end
+ case mnesia:read(rabbit_route, B, write) of
+ [] -> rabbit_misc:const({error, binding_not_found});
+ [_] -> case InnerFun(Src, Dst) of
+ ok -> remove(Src, Dst, B);
+ {error, _} = Err -> rabbit_misc:const(Err)
+ end
end
end).
+remove(Src, Dst, B) ->
+ ok = sync_route(#route{binding = B}, durable(Src), durable(Dst),
+ fun mnesia:delete_object/3),
+ Deletions = maybe_auto_delete(B#binding.source, [B], new_deletions()),
+ fun (Tx) -> ok = process_deletions(Deletions, Tx) end.
+
list(VHostPath) ->
VHostResource = rabbit_misc:r(VHostPath, '_'),
Route = #route{binding = #binding{source = VHostResource,
@@ -222,32 +240,31 @@ has_for_source(SrcName) ->
%% we need to check for durable routes here too in case a bunch of
%% routes to durable queues have been removed temporarily as a
%% result of a node failure
- contains(rabbit_route, Match) orelse contains(rabbit_durable_route, Match).
+ contains(rabbit_route, Match) orelse
+ contains(rabbit_semi_durable_route, Match).
remove_for_source(SrcName) ->
+ Match = #route{binding = #binding{source = SrcName, _ = '_'}},
+ Routes = lists:usort(
+ mnesia:match_object(rabbit_route, Match, write) ++
+ mnesia:match_object(rabbit_durable_route, Match, write)),
[begin
- ok = mnesia:delete_object(rabbit_reverse_route,
- reverse_route(Route), write),
- ok = delete_forward_routes(Route),
+ sync_route(Route, fun mnesia:delete_object/3),
Route#route.binding
- end || Route <- mnesia:match_object(
- rabbit_route,
- #route{binding = #binding{source = SrcName,
- _ = '_'}},
- write)].
+ end || Route <- Routes].
-remove_for_destination(DstName) ->
- remove_for_destination(DstName, fun delete_forward_routes/1).
+remove_for_destination(Dst) ->
+ remove_for_destination(
+ Dst, fun (R) -> sync_route(R, fun mnesia:delete_object/3) end).
-remove_transient_for_destination(DstName) ->
- remove_for_destination(DstName, fun delete_transient_forward_routes/1).
+remove_transient_for_destination(Dst) ->
+ remove_for_destination(
+ Dst, fun (R) -> sync_transient_route(R, fun mnesia:delete_object/3) end).
%%----------------------------------------------------------------------------
-all_durable(Resources) ->
- lists:all(fun (#exchange{durable = D}) -> D;
- (#amqqueue{durable = D}) -> D
- end, Resources).
+durable(#exchange{durable = D}) -> D;
+durable(#amqqueue{durable = D}) -> D.
binding_action(Binding = #binding{source = SrcName,
destination = DstName,
@@ -259,31 +276,36 @@ binding_action(Binding = #binding{source = SrcName,
Fun(Src, Dst, Binding#binding{args = SortedArgs})
end).
-sync_binding(Binding, Durable, Fun) ->
- ok = case Durable of
- true -> Fun(rabbit_durable_route,
- #route{binding = Binding}, write);
- false -> ok
- end,
- {Route, ReverseRoute} = route_with_reverse(Binding),
+sync_route(R, Fun) -> sync_route(R, true, true, Fun).
+
+sync_route(Route, true, true, Fun) ->
+ ok = Fun(rabbit_durable_route, Route, write),
+ sync_route(Route, false, true, Fun);
+
+sync_route(Route, false, true, Fun) ->
+ ok = Fun(rabbit_semi_durable_route, Route, write),
+ sync_route(Route, false, false, Fun);
+
+sync_route(Route, _SrcDurable, false, Fun) ->
+ sync_transient_route(Route, Fun).
+
+sync_transient_route(Route, Fun) ->
ok = Fun(rabbit_route, Route, write),
- ok = Fun(rabbit_reverse_route, ReverseRoute, write),
- ok.
+ ok = Fun(rabbit_reverse_route, reverse_route(Route), write).
call_with_source_and_destination(SrcName, DstName, Fun) ->
SrcTable = table_for_resource(SrcName),
DstTable = table_for_resource(DstName),
- ErrFun = fun (Err) -> rabbit_misc:const(Err) end,
+ ErrFun = fun (Err) -> rabbit_misc:const({error, Err}) end,
rabbit_misc:execute_mnesia_tx_with_tail(
fun () ->
case {mnesia:read({SrcTable, SrcName}),
mnesia:read({DstTable, DstName})} of
{[Src], [Dst]} -> Fun(Src, Dst);
- {[], [_] } -> ErrFun({error, source_not_found});
- {[_], [] } -> ErrFun({error, destination_not_found});
- {[], [] } -> ErrFun({error,
- source_and_destination_not_found})
- end
+ {[], [_] } -> ErrFun(source_not_found);
+ {[_], [] } -> ErrFun(destination_not_found);
+ {[], [] } -> ErrFun(source_and_destination_not_found)
+ end
end).
table_for_resource(#resource{kind = exchange}) -> rabbit_exchange;
@@ -296,22 +318,15 @@ continue('$end_of_table') -> false;
continue({[_|_], _}) -> true;
continue({[], Continuation}) -> continue(mnesia:select(Continuation)).
-remove_for_destination(DstName, FwdDeleteFun) ->
- Bindings =
- [begin
- Route = reverse_route(ReverseRoute),
- ok = FwdDeleteFun(Route),
- ok = mnesia:delete_object(rabbit_reverse_route,
- ReverseRoute, write),
- Route#route.binding
- end || ReverseRoute
- <- mnesia:match_object(
- rabbit_reverse_route,
- reverse_route(#route{
- binding = #binding{
- destination = DstName,
- _ = '_'}}),
- write)],
+remove_for_destination(DstName, DeleteFun) ->
+ Match = reverse_route(
+ #route{binding = #binding{destination = DstName, _ = '_'}}),
+ ReverseRoutes = mnesia:match_object(rabbit_reverse_route, Match, write),
+ Bindings = [begin
+ Route = reverse_route(ReverseRoute),
+ ok = DeleteFun(Route),
+ Route#route.binding
+ end || ReverseRoute <- ReverseRoutes],
group_bindings_fold(fun maybe_auto_delete/3, new_deletions(),
lists:keysort(#binding.source, Bindings)).
@@ -344,19 +359,6 @@ maybe_auto_delete(XName, Bindings, Deletions) ->
end,
add_deletion(XName, Entry, Deletions1).
-delete_forward_routes(Route) ->
- ok = mnesia:delete_object(rabbit_route, Route, write),
- ok = mnesia:delete_object(rabbit_durable_route, Route, write).
-
-delete_transient_forward_routes(Route) ->
- ok = mnesia:delete_object(rabbit_route, Route, write).
-
-route_with_reverse(#route{binding = Binding}) ->
- route_with_reverse(Binding);
-route_with_reverse(Binding = #binding{}) ->
- Route = #route{binding = Binding},
- {Route, reverse_route(Route)}.
-
reverse_route(#route{binding = Binding}) ->
#reverse_route{reverse_binding = reverse_binding(Binding)};
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 5099bf3fbe..0c12614cc6 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -156,6 +156,7 @@ ready_for_close(Pid) ->
init([Channel, ReaderPid, WriterPid, ConnPid, Protocol, User, VHost,
Capabilities, CollectorPid, StartLimiterFun]) ->
+ process_flag(trap_exit, true),
ok = pg_local:join(rabbit_channels, self()),
StatsTimer = rabbit_event:init_stats_timer(),
State = #ch{state = starting,
diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl
index 8364ecd8d7..1af91f4c3a 100644
--- a/src/rabbit_control.erl
+++ b/src/rabbit_control.erl
@@ -127,6 +127,8 @@ usage() ->
io:format("~s", [rabbit_ctl_usage:usage()]),
quit(1).
+%%----------------------------------------------------------------------------
+
action(stop, Node, [], _Opts, Inform) ->
Inform("Stopping and halting node ~p", [Node]),
call(Node, {rabbit, stop_and_halt, []});
@@ -159,6 +161,10 @@ action(force_cluster, Node, ClusterNodeSs, _Opts, Inform) ->
[Node, ClusterNodes]),
rpc_call(Node, rabbit_mnesia, force_cluster, [ClusterNodes]);
+action(wait, Node, [], _Opts, Inform) ->
+ Inform("Waiting for ~p", [Node]),
+ wait_for_application(Node, ?WAIT_FOR_VM_ATTEMPTS);
+
action(status, Node, [], _Opts, Inform) ->
Inform("Status of node ~p", [Node]),
case call(Node, {rabbit, status, []}) of
@@ -292,18 +298,15 @@ action(list_permissions, Node, [], Opts, Inform) ->
VHost = proplists:get_value(?VHOST_OPT, Opts),
Inform("Listing permissions in vhost ~p", [VHost]),
display_list(call(Node, {rabbit_auth_backend_internal,
- list_vhost_permissions, [VHost]}));
+ list_vhost_permissions, [VHost]})).
-action(wait, Node, [], _Opts, Inform) ->
- Inform("Waiting for ~p", [Node]),
- wait_for_application(Node, ?WAIT_FOR_VM_ATTEMPTS).
+%%----------------------------------------------------------------------------
wait_for_application(Node, Attempts) ->
case rpc_call(Node, application, which_applications, [infinity]) of
- {badrpc, _} = E -> NewAttempts = Attempts - 1,
- case NewAttempts of
+ {badrpc, _} = E -> case Attempts of
0 -> E;
- _ -> wait_for_application0(Node, NewAttempts)
+ _ -> wait_for_application0(Node, Attempts - 1)
end;
Apps -> case proplists:is_defined(rabbit, Apps) of
%% We've seen the node up; if it goes down
@@ -382,12 +385,9 @@ rpc_call(Node, Mod, Fun, Args) ->
%% characters. We don't escape characters above 127, since they may
%% form part of UTF-8 strings.
-escape(Atom) when is_atom(Atom) ->
- escape(atom_to_list(Atom));
-escape(Bin) when is_binary(Bin) ->
- escape(binary_to_list(Bin));
-escape(L) when is_list(L) ->
- escape_char(lists:reverse(L), []).
+escape(Atom) when is_atom(Atom) -> escape(atom_to_list(Atom));
+escape(Bin) when is_binary(Bin) -> escape(binary_to_list(Bin));
+escape(L) when is_list(L) -> escape_char(lists:reverse(L), []).
escape_char([$\\ | T], Acc) ->
escape_char(T, [$\\, $\\ | Acc]);
@@ -402,19 +402,15 @@ escape_char([], Acc) ->
prettify_amqp_table(Table) ->
[{escape(K), prettify_typed_amqp_value(T, V)} || {K, T, V} <- Table].
-prettify_typed_amqp_value(Type, Value) ->
- case Type of
- longstr -> escape(Value);
- table -> prettify_amqp_table(Value);
- array -> [prettify_typed_amqp_value(T, V) || {T, V} <- Value];
- _ -> Value
- end.
+prettify_typed_amqp_value(longstr, Value) -> escape(Value);
+prettify_typed_amqp_value(table, Value) -> prettify_amqp_table(Value);
+prettify_typed_amqp_value(array, Value) -> [prettify_typed_amqp_value(T, V) ||
+ {T, V} <- Value];
+prettify_typed_amqp_value(_Type, Value) -> Value.
%% the slower shutdown on windows required to flush stdout
quit(Status) ->
case os:type() of
- {unix, _} ->
- halt(Status);
- {win32, _} ->
- init:stop(Status)
+ {unix, _} -> halt(Status);
+ {win32, _} -> init:stop(Status)
end.
diff --git a/src/rabbit_direct.erl b/src/rabbit_direct.erl
index 0810c762c6..0dac18d1fe 100644
--- a/src/rabbit_direct.erl
+++ b/src/rabbit_direct.erl
@@ -16,7 +16,7 @@
-module(rabbit_direct).
--export([boot/0, connect/4, start_channel/8]).
+-export([boot/0, connect/5, start_channel/8, disconnect/1]).
-include("rabbit.hrl").
@@ -25,7 +25,8 @@
-ifdef(use_specs).
-spec(boot/0 :: () -> 'ok').
--spec(connect/4 :: (binary(), binary(), binary(), rabbit_types:protocol()) ->
+-spec(connect/5 :: (binary(), binary(), binary(), rabbit_types:protocol(),
+ rabbit_event:event_props()) ->
{'ok', {rabbit_types:user(),
rabbit_framing:amqp_table()}}).
-spec(start_channel/8 ::
@@ -33,6 +34,8 @@
rabbit_types:user(), rabbit_types:vhost(), rabbit_framing:amqp_table(),
pid()) -> {'ok', pid()}).
+-spec(disconnect/1 :: (rabbit_event:event_props()) -> 'ok').
+
-endif.
%%----------------------------------------------------------------------------
@@ -50,13 +53,14 @@ boot() ->
%%----------------------------------------------------------------------------
-connect(Username, Password, VHost, Protocol) ->
+connect(Username, Password, VHost, Protocol, Infos) ->
case lists:keymember(rabbit, 1, application:which_applications()) of
true ->
try rabbit_access_control:user_pass_login(Username, Password) of
#user{} = User ->
try rabbit_access_control:check_vhost_access(User, VHost) of
- ok -> {ok, {User,
+ ok -> rabbit_event:notify(connection_created, Infos),
+ {ok, {User,
rabbit_reader:server_properties(Protocol)}}
catch
exit:#amqp_error{name = access_refused} ->
@@ -77,3 +81,6 @@ start_channel(Number, ClientChannelPid, ConnPid, Protocol, User, VHost,
[{direct, Number, ClientChannelPid, ConnPid, Protocol, User, VHost,
Capabilities, Collector}]),
{ok, ChannelPid}.
+
+disconnect(Infos) ->
+ rabbit_event:notify(connection_closed, Infos).
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index 9d9b07aff4..421117736d 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -36,7 +36,7 @@
-type(type() :: atom()).
-type(fun_name() :: atom()).
--spec(recover/0 :: () -> 'ok').
+-spec(recover/0 :: () -> [name()]).
-spec(callback/3:: (rabbit_types:exchange(), fun_name(), [any()]) -> 'ok').
-spec(declare/6 ::
(name(), type(), boolean(), boolean(), boolean(),
@@ -83,25 +83,19 @@
-define(INFO_KEYS, [name, type, durable, auto_delete, internal, arguments]).
recover() ->
- Xs = rabbit_misc:table_fold(
- fun (X, Acc) ->
- ok = mnesia:write(rabbit_exchange, X, write),
- [X | Acc]
- end, [], rabbit_durable_exchange),
- Bs = rabbit_binding:recover(),
- recover_with_bindings(
- lists:keysort(#binding.source, Bs),
- lists:keysort(#exchange.name, Xs), []).
-
-recover_with_bindings([B = #binding{source = XName} | Rest],
- Xs = [#exchange{name = XName} | _],
- Bindings) ->
- recover_with_bindings(Rest, Xs, [B | Bindings]);
-recover_with_bindings(Bs, [X = #exchange{type = Type} | Xs], Bindings) ->
- (type_to_module(Type)):recover(X, Bindings),
- recover_with_bindings(Bs, Xs, []);
-recover_with_bindings([], [], []) ->
- ok.
+ Xs = rabbit_misc:table_filter(
+ fun (#exchange{name = XName}) ->
+ mnesia:read({rabbit_exchange, XName}) =:= []
+ end,
+ fun (X, Tx) ->
+ case Tx of
+ true -> ok = mnesia:write(rabbit_exchange, X, write);
+ false -> ok
+ end,
+ rabbit_exchange:callback(X, create, [Tx, X])
+ end,
+ rabbit_durable_exchange),
+ [XName || #exchange{name = XName} <- Xs].
callback(#exchange{type = XType}, Fun, Args) ->
apply(type_to_module(XType), Fun, Args).
diff --git a/src/rabbit_exchange_type.erl b/src/rabbit_exchange_type.erl
index 547583e9ac..cd96407cc7 100644
--- a/src/rabbit_exchange_type.erl
+++ b/src/rabbit_exchange_type.erl
@@ -26,16 +26,13 @@ behaviour_info(callbacks) ->
%% called BEFORE declaration, to check args etc; may exit with #amqp_error{}
{validate, 1},
- %% called after declaration when previously absent
+ %% called after declaration and recovery
{create, 2},
- %% called when recovering
- {recover, 2},
-
- %% called after exchange deletion.
+ %% called after exchange (auto)deletion.
{delete, 3},
- %% called after a binding has been added
+ %% called after a binding has been added or recovered
{add_binding, 3},
%% called after bindings have been deleted.
diff --git a/src/rabbit_exchange_type_direct.erl b/src/rabbit_exchange_type_direct.erl
index 349c2f6ee4..40078b1a5f 100644
--- a/src/rabbit_exchange_type_direct.erl
+++ b/src/rabbit_exchange_type_direct.erl
@@ -20,7 +20,7 @@
-behaviour(rabbit_exchange_type).
-export([description/0, route/2]).
--export([validate/1, create/2, recover/2, delete/3,
+-export([validate/1, create/2, delete/3,
add_binding/3, remove_bindings/3, assert_args_equivalence/2]).
-include("rabbit_exchange_type_spec.hrl").
@@ -41,7 +41,6 @@ route(#exchange{name = Name},
validate(_X) -> ok.
create(_Tx, _X) -> ok.
-recover(_X, _Bs) -> ok.
delete(_Tx, _X, _Bs) -> ok.
add_binding(_Tx, _X, _B) -> ok.
remove_bindings(_Tx, _X, _Bs) -> ok.
diff --git a/src/rabbit_exchange_type_fanout.erl b/src/rabbit_exchange_type_fanout.erl
index bc5293c81d..f32ef91773 100644
--- a/src/rabbit_exchange_type_fanout.erl
+++ b/src/rabbit_exchange_type_fanout.erl
@@ -20,7 +20,7 @@
-behaviour(rabbit_exchange_type).
-export([description/0, route/2]).
--export([validate/1, create/2, recover/2, delete/3, add_binding/3,
+-export([validate/1, create/2, delete/3, add_binding/3,
remove_bindings/3, assert_args_equivalence/2]).
-include("rabbit_exchange_type_spec.hrl").
@@ -40,7 +40,6 @@ route(#exchange{name = Name}, _Delivery) ->
validate(_X) -> ok.
create(_Tx, _X) -> ok.
-recover(_X, _Bs) -> ok.
delete(_Tx, _X, _Bs) -> ok.
add_binding(_Tx, _X, _B) -> ok.
remove_bindings(_Tx, _X, _Bs) -> ok.
diff --git a/src/rabbit_exchange_type_headers.erl b/src/rabbit_exchange_type_headers.erl
index d3529b0657..139feb04f8 100644
--- a/src/rabbit_exchange_type_headers.erl
+++ b/src/rabbit_exchange_type_headers.erl
@@ -21,7 +21,7 @@
-behaviour(rabbit_exchange_type).
-export([description/0, route/2]).
--export([validate/1, create/2, recover/2, delete/3, add_binding/3,
+-export([validate/1, create/2, delete/3, add_binding/3,
remove_bindings/3, assert_args_equivalence/2]).
-include("rabbit_exchange_type_spec.hrl").
@@ -114,7 +114,6 @@ headers_match([{PK, PT, PV} | PRest], [{DK, DT, DV} | DRest],
validate(_X) -> ok.
create(_Tx, _X) -> ok.
-recover(_X, _Bs) -> ok.
delete(_Tx, _X, _Bs) -> ok.
add_binding(_Tx, _X, _B) -> ok.
remove_bindings(_Tx, _X, _Bs) -> ok.
diff --git a/src/rabbit_exchange_type_topic.erl b/src/rabbit_exchange_type_topic.erl
index ffd1e58395..74c566b803 100644
--- a/src/rabbit_exchange_type_topic.erl
+++ b/src/rabbit_exchange_type_topic.erl
@@ -21,7 +21,7 @@
-behaviour(rabbit_exchange_type).
-export([description/0, route/2]).
--export([validate/1, create/2, recover/2, delete/3, add_binding/3,
+-export([validate/1, create/2, delete/3, add_binding/3,
remove_bindings/3, assert_args_equivalence/2]).
-include("rabbit_exchange_type_spec.hrl").
@@ -49,12 +49,6 @@ route(#exchange{name = X},
validate(_X) -> ok.
create(_Tx, _X) -> ok.
-recover(_Exchange, Bs) ->
- rabbit_misc:execute_mnesia_transaction(
- fun () ->
- lists:foreach(fun (B) -> internal_add_binding(B) end, Bs)
- end).
-
delete(true, #exchange{name = X}, _Bs) ->
trie_remove_all_edges(X),
trie_remove_all_bindings(X),
@@ -188,10 +182,10 @@ follow_down(X, CurNode, AccFun, Acc, Words = [W | RestW]) ->
end.
trie_child(X, Node, Word) ->
- case mnesia:read(rabbit_topic_trie_edge,
- #trie_edge{exchange_name = X,
- node_id = Node,
- word = Word}) of
+ case mnesia:read({rabbit_topic_trie_edge,
+ #trie_edge{exchange_name = X,
+ node_id = Node,
+ word = Word}}) of
[#topic_trie_edge{node_id = NextNode}] -> {ok, NextNode};
[] -> error
end.
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index 6962317f5b..d82ef7f3ff 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -38,7 +38,7 @@
-export([ensure_ok/2]).
-export([makenode/1, nodeparts/1, cookie_hash/0, tcp_name/3]).
-export([upmap/2, map_in_order/2]).
--export([table_fold/3]).
+-export([table_filter/3]).
-export([dirty_read_all/1, dirty_foreach_key/2, dirty_dump_log/1]).
-export([read_term_file/1, write_term_file/2, write_file/3]).
-export([append_file/2, ensure_parent_dirs_exist/1]).
@@ -48,8 +48,7 @@
-export([sort_field_table/1]).
-export([pid_to_string/1, string_to_pid/1]).
-export([version_compare/2, version_compare/3]).
--export([recursive_delete/1, recursive_copy/2, dict_cons/3, orddict_cons/3,
- unlink_and_capture_exit/1]).
+-export([recursive_delete/1, recursive_copy/2, dict_cons/3, orddict_cons/3]).
-export([get_options/2]).
-export([all_module_attributes/1, build_acyclic_graph/3]).
-export([now_ms/0]).
@@ -146,7 +145,8 @@
-> atom()).
-spec(upmap/2 :: (fun ((A) -> B), [A]) -> [B]).
-spec(map_in_order/2 :: (fun ((A) -> B), [A]) -> [B]).
--spec(table_fold/3 :: (fun ((any(), A) -> A), A, atom()) -> A).
+-spec(table_filter/3:: (fun ((A) -> boolean()), fun ((A, boolean()) -> 'ok'),
+ atom()) -> [A]).
-spec(dirty_read_all/1 :: (atom()) -> [any()]).
-spec(dirty_foreach_key/2 :: (fun ((any()) -> any()), atom())
-> 'ok' | 'aborted').
@@ -179,7 +179,6 @@
-> rabbit_types:ok_or_error({file:filename(), file:filename(), any()})).
-spec(dict_cons/3 :: (any(), any(), dict()) -> dict()).
-spec(orddict_cons/3 :: (any(), any(), orddict:orddict()) -> orddict:orddict()).
--spec(unlink_and_capture_exit/1 :: (pid()) -> 'ok').
-spec(get_options/2 :: ([optdef()], [string()])
-> {[string()], [{string(), any()}]}).
-spec(all_module_attributes/1 :: (atom()) -> [{atom(), [term()]}]).
@@ -462,20 +461,23 @@ map_in_order(F, L) ->
lists:reverse(
lists:foldl(fun (E, Acc) -> [F(E) | Acc] end, [], L)).
-%% Fold over each entry in a table, executing the cons function in a
-%% transaction. This is often far more efficient than wrapping a tx
-%% around the lot.
+%% Apply a pre-post-commit function to all entries in a table that
+%% satisfy a predicate, and return those entries.
%%
%% We ignore entries that have been modified or removed.
-table_fold(F, Acc0, TableName) ->
+table_filter(Pred, PrePostCommitFun, TableName) ->
lists:foldl(
- fun (E, Acc) -> execute_mnesia_transaction(
- fun () -> case mnesia:match_object(TableName, E, read) of
- [] -> Acc;
- _ -> F(E, Acc)
- end
- end)
- end, Acc0, dirty_read_all(TableName)).
+ fun (E, Acc) ->
+ case execute_mnesia_transaction(
+ fun () -> mnesia:match_object(TableName, E, read) =/= []
+ andalso Pred(E) end,
+ fun (false, _Tx) -> false;
+ (true, Tx) -> PrePostCommitFun(E, Tx), true
+ end) of
+ false -> Acc;
+ true -> [E | Acc]
+ end
+ end, [], dirty_read_all(TableName)).
dirty_read_all(TableName) ->
mnesia:dirty_select(TableName, [{'$1',[],['$1']}]).
@@ -773,12 +775,6 @@ dict_cons(Key, Value, Dict) ->
orddict_cons(Key, Value, Dict) ->
orddict:update(Key, fun (List) -> [Value | List] end, [Value], Dict).
-unlink_and_capture_exit(Pid) ->
- unlink(Pid),
- receive {'EXIT', Pid, _} -> ok
- after 0 -> ok
- end.
-
%% Separate flags and options from arguments.
%% get_options([{flag, "-q"}, {option, "-p", "/"}],
%% ["set_permissions","-p","/","guest",
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index fbcf07ae77..77b06d0c08 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -187,6 +187,11 @@ table_definitions() ->
{attributes, record_info(fields, route)},
{disc_copies, [node()]},
{match, #route{binding = binding_match(), _='_'}}]},
+ {rabbit_semi_durable_route,
+ [{record_name, route},
+ {attributes, record_info(fields, route)},
+ {type, ordered_set},
+ {match, #route{binding = binding_match(), _='_'}}]},
{rabbit_route,
[{record_name, route},
{attributes, record_info(fields, route)},
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index bb26de64a3..3f4162cdd9 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -21,7 +21,7 @@
-export([start_link/4, successfully_recovered_state/1,
client_init/4, client_terminate/1, client_delete_and_terminate/1,
client_ref/1, close_all_indicated/1,
- write/3, read/2, contains/2, remove/2, release/2, sync/3]).
+ write/3, read/2, contains/2, remove/2, sync/3]).
-export([sync/1, set_maximum_since_use/2,
has_readers/2, combine_files/3, delete_file/2]). %% internal
@@ -67,7 +67,6 @@
gc_pid, %% pid of our GC
file_handles_ets, %% tid of the shared file handles table
file_summary_ets, %% tid of the file summary table
- dedup_cache_ets, %% tid of dedup cache table
cur_file_cache_ets, %% tid of current file cache table
dying_clients, %% set of dying clients
clients, %% map of references of all registered clients
@@ -87,7 +86,6 @@
gc_pid,
file_handles_ets,
file_summary_ets,
- dedup_cache_ets,
cur_file_cache_ets
}).
@@ -130,7 +128,6 @@
gc_pid :: pid(),
file_handles_ets :: ets:tid(),
file_summary_ets :: ets:tid(),
- dedup_cache_ets :: ets:tid(),
cur_file_cache_ets :: ets:tid()}).
-type(msg_ref_delta_gen(A) ::
fun ((A) -> 'finished' |
@@ -153,7 +150,6 @@
{rabbit_types:ok(msg()) | 'not_found', client_msstate()}).
-spec(contains/2 :: (rabbit_types:msg_id(), client_msstate()) -> boolean()).
-spec(remove/2 :: ([rabbit_types:msg_id()], client_msstate()) -> 'ok').
--spec(release/2 :: ([rabbit_types:msg_id()], client_msstate()) -> 'ok').
-spec(sync/3 ::
([rabbit_types:msg_id()], fun (() -> any()), client_msstate()) -> 'ok').
@@ -396,7 +392,7 @@ successfully_recovered_state(Server) ->
client_init(Server, Ref, MsgOnDiskFun, CloseFDsFun) ->
{IState, IModule, Dir, GCPid,
- FileHandlesEts, FileSummaryEts, DedupCacheEts, CurFileCacheEts} =
+ FileHandlesEts, FileSummaryEts, CurFileCacheEts} =
gen_server2:call(
Server, {new_client_state, Ref, MsgOnDiskFun, CloseFDsFun}, infinity),
#client_msstate { server = Server,
@@ -408,7 +404,6 @@ client_init(Server, Ref, MsgOnDiskFun, CloseFDsFun) ->
gc_pid = GCPid,
file_handles_ets = FileHandlesEts,
file_summary_ets = FileSummaryEts,
- dedup_cache_ets = DedupCacheEts,
cur_file_cache_ets = CurFileCacheEts }.
client_terminate(CState = #client_msstate { client_ref = Ref }) ->
@@ -429,27 +424,16 @@ write(MsgId, Msg,
ok = server_cast(CState, {write, CRef, MsgId}).
read(MsgId,
- CState = #client_msstate { dedup_cache_ets = DedupCacheEts,
- cur_file_cache_ets = CurFileCacheEts }) ->
- %% 1. Check the dedup cache
- case fetch_and_increment_cache(DedupCacheEts, MsgId) of
- not_found ->
- %% 2. Check the cur file cache
- case ets:lookup(CurFileCacheEts, MsgId) of
- [] ->
- Defer = fun() ->
- {server_call(CState, {read, MsgId}), CState}
- end,
- case index_lookup_positive_ref_count(MsgId, CState) of
- not_found -> Defer();
- MsgLocation -> client_read1(MsgLocation, Defer, CState)
- end;
- [{MsgId, Msg, _CacheRefCount}] ->
- %% Although we've found it, we don't know the
- %% refcount, so can't insert into dedup cache
- {{ok, Msg}, CState}
+ CState = #client_msstate { cur_file_cache_ets = CurFileCacheEts }) ->
+ %% Check the cur file cache
+ case ets:lookup(CurFileCacheEts, MsgId) of
+ [] ->
+ Defer = fun() -> {server_call(CState, {read, MsgId}), CState} end,
+ case index_lookup_positive_ref_count(MsgId, CState) of
+ not_found -> Defer();
+ MsgLocation -> client_read1(MsgLocation, Defer, CState)
end;
- Msg ->
+ [{MsgId, Msg, _CacheRefCount}] ->
{{ok, Msg}, CState}
end.
@@ -457,8 +441,6 @@ contains(MsgId, CState) -> server_call(CState, {contains, MsgId}).
remove([], _CState) -> ok;
remove(MsgIds, CState = #client_msstate { client_ref = CRef }) ->
server_cast(CState, {remove, CRef, MsgIds}).
-release([], _CState) -> ok;
-release(MsgIds, CState) -> server_cast(CState, {release, MsgIds}).
sync(MsgIds, K, CState) -> server_cast(CState, {sync, MsgIds, K}).
sync(Server) ->
@@ -517,7 +499,6 @@ client_read2(false, _Right,
client_read3(#msg_location { msg_id = MsgId, file = File }, Defer,
CState = #client_msstate { file_handles_ets = FileHandlesEts,
file_summary_ets = FileSummaryEts,
- dedup_cache_ets = DedupCacheEts,
gc_pid = GCPid,
client_ref = Ref }) ->
Release =
@@ -574,8 +555,8 @@ client_read3(#msg_location { msg_id = MsgId, file = File }, Defer,
%% Could the msg_store now mark the file to be
%% closed? No: marks for closing are issued only
%% when the msg_store has locked the file.
- {Msg, CState2} = %% This will never be the current file
- read_from_disk(MsgLocation, CState1, DedupCacheEts),
+ %% This will never be the current file
+ {Msg, CState2} = read_from_disk(MsgLocation, CState1),
Release(), %% this MUST NOT fail with badarg
{{ok, Msg}, CState2};
#msg_location {} = MsgLocation -> %% different file!
@@ -639,7 +620,6 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) ->
%% CleanShutdown <=> msg location index and file_summary both
%% recovered correctly.
- DedupCacheEts = ets:new(rabbit_msg_store_dedup_cache, [set, public]),
FileHandlesEts = ets:new(rabbit_msg_store_shared_file_handles,
[ordered_set, public]),
CurFileCacheEts = ets:new(rabbit_msg_store_cur_file, [set, public]),
@@ -669,7 +649,6 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) ->
gc_pid = GCPid,
file_handles_ets = FileHandlesEts,
file_summary_ets = FileSummaryEts,
- dedup_cache_ets = DedupCacheEts,
cur_file_cache_ets = CurFileCacheEts,
dying_clients = sets:new(),
clients = Clients,
@@ -720,14 +699,12 @@ handle_call({new_client_state, CRef, MsgOnDiskFun, CloseFDsFun}, _From,
index_module = IndexModule,
file_handles_ets = FileHandlesEts,
file_summary_ets = FileSummaryEts,
- dedup_cache_ets = DedupCacheEts,
cur_file_cache_ets = CurFileCacheEts,
clients = Clients,
gc_pid = GCPid }) ->
Clients1 = dict:store(CRef, {MsgOnDiskFun, CloseFDsFun}, Clients),
- reply({IndexState, IndexModule, Dir, GCPid,
- FileHandlesEts, FileSummaryEts, DedupCacheEts, CurFileCacheEts},
- State #msstate { clients = Clients1 });
+ reply({IndexState, IndexModule, Dir, GCPid, FileHandlesEts, FileSummaryEts,
+ CurFileCacheEts}, State #msstate { clients = Clients1 });
handle_call({client_terminate, CRef}, _From, State) ->
reply(ok, clear_client(CRef, State));
@@ -781,12 +758,6 @@ handle_cast({remove, CRef, MsgIds}, State) ->
noreply(maybe_compact(client_confirm(CRef, gb_sets:from_list(MsgIds),
removed, State1)));
-handle_cast({release, MsgIds}, State =
- #msstate { dedup_cache_ets = DedupCacheEts }) ->
- lists:foreach(
- fun (MsgId) -> decrement_cache(DedupCacheEts, MsgId) end, MsgIds),
- noreply(State);
-
handle_cast({sync, MsgIds, K},
State = #msstate { current_file = CurFile,
current_file_handle = CurHdl,
@@ -840,7 +811,6 @@ terminate(_Reason, State = #msstate { index_state = IndexState,
gc_pid = GCPid,
file_handles_ets = FileHandlesEts,
file_summary_ets = FileSummaryEts,
- dedup_cache_ets = DedupCacheEts,
cur_file_cache_ets = CurFileCacheEts,
clients = Clients,
dir = Dir }) ->
@@ -856,7 +826,7 @@ terminate(_Reason, State = #msstate { index_state = IndexState,
State3 = close_all_handles(State1),
ok = store_file_summary(FileSummaryEts, Dir),
[true = ets:delete(T) ||
- T <- [FileSummaryEts, DedupCacheEts, FileHandlesEts, CurFileCacheEts]],
+ T <- [FileSummaryEts, FileHandlesEts, CurFileCacheEts]],
IndexModule:terminate(IndexState),
ok = store_recovery_terms([{client_refs, dict:fetch_keys(Clients)},
{index_module, IndexModule}], Dir),
@@ -978,26 +948,18 @@ write_message(MsgId, Msg,
sum_valid_data = SumValid + TotalSize,
sum_file_size = SumFileSize + TotalSize }).
-read_message(MsgId, From,
- State = #msstate { dedup_cache_ets = DedupCacheEts }) ->
+read_message(MsgId, From, State) ->
case index_lookup_positive_ref_count(MsgId, State) of
- not_found ->
- gen_server2:reply(From, not_found),
- State;
- MsgLocation ->
- case fetch_and_increment_cache(DedupCacheEts, MsgId) of
- not_found -> read_message1(From, MsgLocation, State);
- Msg -> gen_server2:reply(From, {ok, Msg}),
- State
- end
+ not_found -> gen_server2:reply(From, not_found),
+ State;
+ MsgLocation -> read_message1(From, MsgLocation, State)
end.
-read_message1(From, #msg_location { msg_id = MsgId, ref_count = RefCount,
- file = File, offset = Offset } = MsgLoc,
+read_message1(From, #msg_location { msg_id = MsgId, file = File,
+ offset = Offset } = MsgLoc,
State = #msstate { current_file = CurFile,
current_file_handle = CurHdl,
file_summary_ets = FileSummaryEts,
- dedup_cache_ets = DedupCacheEts,
cur_file_cache_ets = CurFileCacheEts }) ->
case File =:= CurFile of
true -> {Msg, State1} =
@@ -1010,10 +972,8 @@ read_message1(From, #msg_location { msg_id = MsgId, ref_count = RefCount,
true -> file_handle_cache:flush(CurHdl);
false -> ok
end,
- read_from_disk(MsgLoc, State, DedupCacheEts);
+ read_from_disk(MsgLoc, State);
[{MsgId, Msg1, _CacheRefCount}] ->
- ok = maybe_insert_into_cache(
- DedupCacheEts, RefCount, MsgId, Msg1),
{Msg1, State}
end,
gen_server2:reply(From, {ok, Msg}),
@@ -1023,17 +983,14 @@ read_message1(From, #msg_location { msg_id = MsgId, ref_count = RefCount,
case Locked of
true -> add_to_pending_gc_completion({read, MsgId, From},
File, State);
- false -> {Msg, State1} =
- read_from_disk(MsgLoc, State, DedupCacheEts),
+ false -> {Msg, State1} = read_from_disk(MsgLoc, State),
gen_server2:reply(From, {ok, Msg}),
State1
end
end.
-read_from_disk(#msg_location { msg_id = MsgId, ref_count = RefCount,
- file = File, offset = Offset,
- total_size = TotalSize },
- State, DedupCacheEts) ->
+read_from_disk(#msg_location { msg_id = MsgId, file = File, offset = Offset,
+ total_size = TotalSize }, State) ->
{Hdl, State1} = get_read_handle(File, State),
{ok, Offset} = file_handle_cache:position(Hdl, Offset),
{ok, {MsgId, Msg}} =
@@ -1049,7 +1006,6 @@ read_from_disk(#msg_location { msg_id = MsgId, ref_count = RefCount,
{proc_dict, get()}
]}}
end,
- ok = maybe_insert_into_cache(DedupCacheEts, RefCount, MsgId, Msg),
{Msg, State1}.
contains_message(MsgId, From,
@@ -1068,8 +1024,7 @@ contains_message(MsgId, From,
end.
remove_message(MsgId, CRef,
- State = #msstate { file_summary_ets = FileSummaryEts,
- dedup_cache_ets = DedupCacheEts }) ->
+ State = #msstate { file_summary_ets = FileSummaryEts }) ->
case should_mask_action(CRef, MsgId, State) of
{true, _Location} ->
State;
@@ -1090,8 +1045,7 @@ remove_message(MsgId, CRef,
%% don't remove from CUR_FILE_CACHE_ETS_NAME here
%% because there may be further writes in the mailbox
%% for the same msg.
- 1 -> ok = remove_cache_entry(DedupCacheEts, MsgId),
- case ets:lookup(FileSummaryEts, File) of
+ 1 -> case ets:lookup(FileSummaryEts, File) of
[#file_summary { locked = true }] ->
add_to_pending_gc_completion(
{remove, MsgId, CRef}, File, State);
@@ -1101,8 +1055,7 @@ remove_message(MsgId, CRef,
File, adjust_valid_total_size(File, -TotalSize,
State))
end;
- _ -> ok = decrement_cache(DedupCacheEts, MsgId),
- ok = Dec(),
+ _ -> ok = Dec(),
State
end
end.
@@ -1325,12 +1278,6 @@ list_sorted_file_names(Dir, Ext) ->
%% message cache helper functions
%%----------------------------------------------------------------------------
-maybe_insert_into_cache(DedupCacheEts, RefCount, MsgId, Msg)
- when RefCount > 1 ->
- update_msg_cache(DedupCacheEts, MsgId, Msg);
-maybe_insert_into_cache(_DedupCacheEts, _RefCount, _MsgId, _Msg) ->
- ok.
-
update_msg_cache(CacheEts, MsgId, Msg) ->
case ets:insert_new(CacheEts, {MsgId, Msg, 1}) of
true -> ok;
@@ -1339,34 +1286,6 @@ update_msg_cache(CacheEts, MsgId, Msg) ->
fun () -> update_msg_cache(CacheEts, MsgId, Msg) end)
end.
-remove_cache_entry(DedupCacheEts, MsgId) ->
- true = ets:delete(DedupCacheEts, MsgId),
- ok.
-
-fetch_and_increment_cache(DedupCacheEts, MsgId) ->
- case ets:lookup(DedupCacheEts, MsgId) of
- [] ->
- not_found;
- [{_MsgId, Msg, _RefCount}] ->
- safe_ets_update_counter_ok(
- DedupCacheEts, MsgId, {3, +1},
- %% someone has deleted us in the meantime, insert us
- fun () -> ok = update_msg_cache(DedupCacheEts, MsgId, Msg) end),
- Msg
- end.
-
-decrement_cache(DedupCacheEts, MsgId) ->
- true = safe_ets_update_counter(
- DedupCacheEts, MsgId, {3, -1},
- fun (N) when N =< 0 -> true = ets:delete(DedupCacheEts, MsgId);
- (_N) -> true
- end,
- %% MsgId is not in there because although it's been
- %% delivered, it's never actually been read (think:
- %% persistent message held in RAM)
- fun () -> true end),
- ok.
-
%%----------------------------------------------------------------------------
%% index
%%----------------------------------------------------------------------------
@@ -1592,8 +1511,8 @@ build_index(Gatherer, Left, [],
sum_file_size = SumFileSize }) ->
case gatherer:out(Gatherer) of
empty ->
+ unlink(Gatherer),
ok = gatherer:stop(Gatherer),
- ok = rabbit_misc:unlink_and_capture_exit(Gatherer),
ok = index_delete_by_file(undefined, State),
Offset = case ets:lookup(FileSummaryEts, Left) of
[] -> 0;
@@ -1972,7 +1891,10 @@ copy_messages(WorkList, InitOffset, FinalOffset, SourceHdl, DestinationHdl,
force_recovery(BaseDir, Store) ->
Dir = filename:join(BaseDir, atom_to_list(Store)),
- ok = file:delete(filename:join(Dir, ?CLEAN_FILENAME)),
+ case file:delete(filename:join(Dir, ?CLEAN_FILENAME)) of
+ ok -> ok;
+ {error, enoent} -> ok
+ end,
recover_crashed_compactions(BaseDir),
ok.
@@ -2007,7 +1929,10 @@ transform_msg_file(FileOld, FileNew, TransformFun) ->
rabbit_msg_file:scan(
RefOld, filelib:file_size(FileOld),
fun({MsgId, _Size, _Offset, BinMsg}, ok) ->
- {ok, MsgNew} = TransformFun(binary_to_term(BinMsg)),
+ {ok, MsgNew} = case binary_to_term(BinMsg) of
+ <<>> -> {ok, <<>>}; %% dying client marker
+ Msg -> TransformFun(Msg)
+ end,
{ok, _} = rabbit_msg_file:append(RefNew, MsgId, MsgNew),
ok
end, ok),
diff --git a/src/rabbit_prelaunch.erl b/src/rabbit_prelaunch.erl
index 9874ba03b4..f7218fbd6b 100644
--- a/src/rabbit_prelaunch.erl
+++ b/src/rabbit_prelaunch.erl
@@ -259,8 +259,13 @@ duplicate_node_check(NodeStr) ->
terminate(?ERROR_CODE);
false -> ok
end;
- {error, EpmdReason} -> terminate("unexpected epmd error: ~p~n",
- [EpmdReason])
+ {error, EpmdReason} ->
+ terminate("epmd error for host ~p: ~p (~s)~n",
+ [NodeHost, EpmdReason,
+ case EpmdReason of
+ address -> "unable to establish tcp connection";
+ _ -> inet:format_error(EpmdReason)
+ end])
end.
terminate(Fmt, Args) ->
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index 367953b897..aaf3df7844 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -514,8 +514,8 @@ queue_index_walker({start, DurableQueues}) when is_list(DurableQueues) ->
queue_index_walker({next, Gatherer}) when is_pid(Gatherer) ->
case gatherer:out(Gatherer) of
empty ->
+ unlink(Gatherer),
ok = gatherer:stop(Gatherer),
- ok = rabbit_misc:unlink_and_capture_exit(Gatherer),
finished;
{value, {MsgId, Count}} ->
{MsgId, Count, {next, Gatherer}}
@@ -1036,8 +1036,8 @@ foreach_queue_index(Funs) ->
end)
end || QueueDirName <- QueueDirNames],
empty = gatherer:out(Gatherer),
- ok = gatherer:stop(Gatherer),
- ok = rabbit_misc:unlink_and_capture_exit(Gatherer).
+ unlink(Gatherer),
+ ok = gatherer:stop(Gatherer).
transform_queue(Dir, Gatherer, {JournalFun, SegmentFun}) ->
ok = transform_file(filename:join(Dir, ?JOURNAL_FILENAME), JournalFun),
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 609bb43ffe..42af91a8f2 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -681,7 +681,8 @@ handle_method0(#'connection.open'{virtual_host = VHostPath},
State#v1{connection_state = running,
connection = NewConnection}),
rabbit_event:notify(connection_created,
- infos(?CREATION_EVENT_KEYS, State1)),
+ [{type, network} |
+ infos(?CREATION_EVENT_KEYS, State1)]),
rabbit_event:if_enabled(StatsTimer,
fun() -> internal_emit_stats(State1) end),
State1;
diff --git a/src/rabbit_ssl.erl b/src/rabbit_ssl.erl
index 1953b6b85c..e0defa9e96 100644
--- a/src/rabbit_ssl.erl
+++ b/src/rabbit_ssl.erl
@@ -89,8 +89,8 @@ find_by_type(Type, {rdnSequence, RDNs}) ->
case [V || #'AttributeTypeAndValue'{type = T, value = V}
<- lists:flatten(RDNs),
T == Type] of
- [{printableString, S}] -> S;
- [] -> not_found
+ [Val] -> format_asn1_value(Val);
+ [] -> not_found
end.
%%--------------------------------------------------------------------------
@@ -162,12 +162,85 @@ escape_rdn_value([C | S], middle) ->
format_asn1_value({ST, S}) when ST =:= teletexString; ST =:= printableString;
ST =:= universalString; ST =:= utf8String;
ST =:= bmpString ->
- if is_binary(S) -> binary_to_list(S);
- true -> S
- end;
+ format_directory_string(ST, S);
format_asn1_value({utcTime, [Y1, Y2, M1, M2, D1, D2, H1, H2,
Min1, Min2, S1, S2, $Z]}) ->
io_lib:format("20~c~c-~c~c-~c~cT~c~c:~c~c:~c~cZ",
[Y1, Y2, M1, M2, D1, D2, H1, H2, Min1, Min2, S1, S2]);
format_asn1_value(V) ->
io_lib:format("~p", [V]).
+
+%% DirectoryString { INTEGER : maxSize } ::= CHOICE {
+%% teletexString TeletexString (SIZE (1..maxSize)),
+%% printableString PrintableString (SIZE (1..maxSize)),
+%% bmpString BMPString (SIZE (1..maxSize)),
+%% universalString UniversalString (SIZE (1..maxSize)),
+%% uTF8String UTF8String (SIZE (1..maxSize)) }
+%%
+%% Precise definitions of printable / teletexString are hard to come
+%% by. This is what I reconstructed:
+%%
+%% printableString:
+%% "intended to represent the limited character sets available to
+%% mainframe input terminals"
+%% A-Z a-z 0-9 ' ( ) + , - . / : = ? [space]
+%% http://msdn.microsoft.com/en-us/library/bb540814(v=vs.85).aspx
+%%
+%% teletexString:
+%% "a sizable volume of software in the world treats TeletexString
+%% (T61String) as a simple 8-bit string with mostly Windows Latin 1
+%% (superset of iso-8859-1) encoding"
+%% http://www.mail-archive.com/asn1@asn1.org/msg00460.html
+%%
+%% (However according to that link X.680 actually defines
+%% TeletexString in some much more involved and crazy way. I suggest
+%% we treat it as ISO-8859-1 since Erlang does not support Windows
+%% Latin 1).
+%%
+%% bmpString:
+%% UCS-2 according to RFC 3641. Hence cannot represent Unicode
+%% characters above 65535 (outside the "Basic Multilingual Plane").
+%%
+%% universalString:
+%% UCS-4 according to RFC 3641.
+%%
+%% utf8String:
+%% UTF-8 according to RFC 3641.
+%%
+%% Within Rabbit we assume UTF-8 encoding. Since printableString is a
+%% subset of ASCII it is also a subset of UTF-8. The others need
+%% converting. Fortunately since the Erlang SSL library does the
+%% decoding for us (albeit into a weird format, see below), we just
+%% need to handle encoding into UTF-8. Note also that utf8Strings come
+%% back as binary.
+%%
+%% Note for testing: the default Ubuntu configuration for openssl will
+%% only create printableString or teletexString types no matter what
+%% you do. Edit string_mask in the [req] section of
+%% /etc/ssl/openssl.cnf to change this (see comments there). You
+%% probably also need to set utf8 = yes to get it to accept UTF-8 on
+%% the command line. Also note I could not get openssl to generate a
+%% universalString.
+
+format_directory_string(printableString, S) -> S;
+format_directory_string(teletexString, S) -> utf8_list_from(S);
+format_directory_string(bmpString, S) -> utf8_list_from(S);
+format_directory_string(universalString, S) -> utf8_list_from(S);
+format_directory_string(utf8String, S) -> binary_to_list(S).
+
+utf8_list_from(S) ->
+ binary_to_list(
+ unicode:characters_to_binary(flatten_ssl_list(S), utf32, utf8)).
+
+%% The Erlang SSL implementation invents its own representation for
+%% non-ascii strings - looking like [97,{0,0,3,187}] (that's LATIN
+%% SMALL LETTER A followed by GREEK SMALL LETTER LAMDA). We convert
+%% this into a list of unicode characters, which we can tell
+%% unicode:characters_to_binary is utf32.
+
+flatten_ssl_list(L) -> [flatten_ssl_list_item(I) || I <- L].
+
+flatten_ssl_list_item({A, B, C, D}) ->
+ A * (1 bsl 24) + B * (1 bsl 16) + C * (1 bsl 8) + D;
+flatten_ssl_list_item(N) when is_number (N) ->
+ N.
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index f4376293b7..93a5f73222 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -598,39 +598,37 @@ test_topic_matching() ->
exchange_op_callback(X, create, []),
%% add some bindings
- Bindings = lists:map(
- fun ({Key, Q}) ->
- #binding{source = XName,
- key = list_to_binary(Key),
- destination = #resource{virtual_host = <<"/">>,
- kind = queue,
- name = list_to_binary(Q)}}
- end, [{"a.b.c", "t1"},
- {"a.*.c", "t2"},
- {"a.#.b", "t3"},
- {"a.b.b.c", "t4"},
- {"#", "t5"},
- {"#.#", "t6"},
- {"#.b", "t7"},
- {"*.*", "t8"},
- {"a.*", "t9"},
- {"*.b.c", "t10"},
- {"a.#", "t11"},
- {"a.#.#", "t12"},
- {"b.b.c", "t13"},
- {"a.b.b", "t14"},
- {"a.b", "t15"},
- {"b.c", "t16"},
- {"", "t17"},
- {"*.*.*", "t18"},
- {"vodka.martini", "t19"},
- {"a.b.c", "t20"},
- {"*.#", "t21"},
- {"#.*.#", "t22"},
- {"*.#.#", "t23"},
- {"#.#.#", "t24"},
- {"*", "t25"},
- {"#.b.#", "t26"}]),
+ Bindings = [#binding{source = XName,
+ key = list_to_binary(Key),
+ destination = #resource{virtual_host = <<"/">>,
+ kind = queue,
+ name = list_to_binary(Q)}} ||
+ {Key, Q} <- [{"a.b.c", "t1"},
+ {"a.*.c", "t2"},
+ {"a.#.b", "t3"},
+ {"a.b.b.c", "t4"},
+ {"#", "t5"},
+ {"#.#", "t6"},
+ {"#.b", "t7"},
+ {"*.*", "t8"},
+ {"a.*", "t9"},
+ {"*.b.c", "t10"},
+ {"a.#", "t11"},
+ {"a.#.#", "t12"},
+ {"b.b.c", "t13"},
+ {"a.b.b", "t14"},
+ {"a.b", "t15"},
+ {"b.c", "t16"},
+ {"", "t17"},
+ {"*.*.*", "t18"},
+ {"vodka.martini", "t19"},
+ {"a.b.c", "t20"},
+ {"*.#", "t21"},
+ {"#.*.#", "t22"},
+ {"*.#.#", "t23"},
+ {"#.#.#", "t24"},
+ {"*", "t25"},
+ {"#.b.#", "t26"}]],
lists:foreach(fun (B) -> exchange_op_callback(X, add_binding, [B]) end,
Bindings),
@@ -669,22 +667,23 @@ test_topic_matching() ->
ordsets:from_list(RemovedBindings))),
%% test some matches
- test_topic_expect_match(X,
- [{"a.b.c", ["t2", "t6", "t10", "t12", "t18", "t20", "t22",
- "t23", "t24", "t26"]},
- {"a.b", ["t3", "t6", "t7", "t8", "t9", "t12", "t15",
- "t22", "t23", "t24", "t26"]},
- {"a.b.b", ["t3", "t6", "t7", "t12", "t14", "t18", "t22",
- "t23", "t24", "t26"]},
- {"", ["t6", "t17", "t24"]},
- {"b.c.c", ["t6", "t18", "t22", "t23", "t24", "t26"]},
- {"a.a.a.a.a", ["t6", "t12", "t22", "t23", "t24"]},
- {"vodka.gin", ["t6", "t8", "t22", "t23", "t24"]},
- {"vodka.martini", ["t6", "t8", "t22", "t23", "t24"]},
- {"b.b.c", ["t6", "t10", "t13", "t18", "t22", "t23",
- "t24", "t26"]},
- {"nothing.here.at.all", ["t6", "t22", "t23", "t24"]},
- {"oneword", ["t6", "t22", "t23", "t24", "t25"]}]),
+ test_topic_expect_match(
+ X,
+ [{"a.b.c", ["t2", "t6", "t10", "t12", "t18", "t20", "t22",
+ "t23", "t24", "t26"]},
+ {"a.b", ["t3", "t6", "t7", "t8", "t9", "t12", "t15",
+ "t22", "t23", "t24", "t26"]},
+ {"a.b.b", ["t3", "t6", "t7", "t12", "t14", "t18", "t22",
+ "t23", "t24", "t26"]},
+ {"", ["t6", "t17", "t24"]},
+ {"b.c.c", ["t6", "t18", "t22", "t23", "t24", "t26"]},
+ {"a.a.a.a.a", ["t6", "t12", "t22", "t23", "t24"]},
+ {"vodka.gin", ["t6", "t8", "t22", "t23", "t24"]},
+ {"vodka.martini", ["t6", "t8", "t22", "t23", "t24"]},
+ {"b.b.c", ["t6", "t10", "t13", "t18", "t22", "t23",
+ "t24", "t26"]},
+ {"nothing.here.at.all", ["t6", "t22", "t23", "t24"]},
+ {"oneword", ["t6", "t22", "t23", "t24", "t25"]}]),
%% remove the entire exchange
exchange_op_callback(X, delete, [RemainingBindings]),
@@ -701,9 +700,14 @@ test_topic_expect_match(X, List) ->
lists:foreach(
fun ({Key, Expected}) ->
BinKey = list_to_binary(Key),
+ Message = rabbit_basic:message(X#exchange.name, BinKey,
+ #'P_basic'{}, <<>>),
Res = rabbit_exchange_type_topic:route(
- X, #delivery{message = #basic_message{routing_keys =
- [BinKey]}}),
+ X, #delivery{mandatory = false,
+ immediate = false,
+ txn = none,
+ sender = self(),
+ message = Message}),
ExpectedRes = lists:map(
fun (Q) -> #resource{virtual_host = <<"/">>,
kind = queue,
@@ -1178,9 +1182,15 @@ test_server_status() ->
passed.
-test_spawn(Receiver) ->
+test_writer(Pid) ->
+ receive
+ shutdown -> ok;
+ {send_command, Method} -> Pid ! Method, test_writer(Pid)
+ end.
+
+test_spawn() ->
Me = self(),
- Writer = spawn(fun () -> Receiver(Me) end),
+ Writer = spawn(fun () -> test_writer(Me) end),
{ok, Ch} = rabbit_channel:start_link(
1, Me, Writer, Me, rabbit_framing_amqp_0_9_1,
user(<<"guest">>), <<"/">>, [], self(),
@@ -1198,20 +1208,9 @@ user(Username) ->
impl = #internal_user{username = Username,
is_admin = true}}.
-test_statistics_receiver(Pid) ->
- receive
- shutdown ->
- ok;
- {send_command, Method} ->
- Pid ! Method,
- test_statistics_receiver(Pid)
- end.
-
test_statistics_event_receiver(Pid) ->
receive
- Foo ->
- Pid ! Foo,
- test_statistics_event_receiver(Pid)
+ Foo -> Pid ! Foo, test_statistics_event_receiver(Pid)
end.
test_statistics_receive_event(Ch, Matcher) ->
@@ -1228,17 +1227,8 @@ test_statistics_receive_event1(Ch, Matcher) ->
after 1000 -> throw(failed_to_receive_event)
end.
-test_confirms_receiver(Pid) ->
- receive
- shutdown ->
- ok;
- {send_command, Method} ->
- Pid ! Method,
- test_confirms_receiver(Pid)
- end.
-
test_confirms() ->
- {_Writer, Ch} = test_spawn(fun test_confirms_receiver/1),
+ {_Writer, Ch} = test_spawn(),
DeclareBindDurableQueue =
fun() ->
rabbit_channel:do(Ch, #'queue.declare'{durable = true}),
@@ -1264,10 +1254,9 @@ test_confirms() ->
QPid1 = Q1#amqqueue.pid,
%% Enable confirms
rabbit_channel:do(Ch, #'confirm.select'{}),
- receive #'confirm.select_ok'{} ->
- ok
- after 1000 ->
- throw(failed_to_enable_confirms)
+ receive
+ #'confirm.select_ok'{} -> ok
+ after 1000 -> throw(failed_to_enable_confirms)
end,
%% Publish a message
rabbit_channel:do(Ch, #'basic.publish'{exchange = <<"amq.direct">>,
@@ -1279,25 +1268,19 @@ test_confirms() ->
QPid1 ! boom,
%% Wait for a nack
receive
- #'basic.nack'{} ->
- ok;
- #'basic.ack'{} ->
- throw(received_ack_instead_of_nack)
- after 2000 ->
- throw(did_not_receive_nack)
+ #'basic.nack'{} -> ok;
+ #'basic.ack'{} -> throw(received_ack_instead_of_nack)
+ after 2000 -> throw(did_not_receive_nack)
end,
receive
- #'basic.ack'{} ->
- throw(received_ack_when_none_expected)
- after 1000 ->
- ok
+ #'basic.ack'{} -> throw(received_ack_when_none_expected)
+ after 1000 -> ok
end,
%% Cleanup
rabbit_channel:do(Ch, #'queue.delete'{queue = QName2}),
- receive #'queue.delete_ok'{} ->
- ok
- after 1000 ->
- throw(failed_to_cleanup_queue)
+ receive
+ #'queue.delete_ok'{} -> ok
+ after 1000 -> throw(failed_to_cleanup_queue)
end,
unlink(Ch),
ok = rabbit_channel:shutdown(Ch),
@@ -1311,7 +1294,7 @@ test_statistics() ->
%% by far the most complex code though.
%% Set up a channel and queue
- {_Writer, Ch} = test_spawn(fun test_statistics_receiver/1),
+ {_Writer, Ch} = test_spawn(),
rabbit_channel:do(Ch, #'queue.declare'{}),
QName = receive #'queue.declare_ok'{queue = Q0} ->
Q0
@@ -1462,18 +1445,8 @@ test_delegates_sync(SecondaryNode) ->
passed.
-test_queue_cleanup_receiver(Pid) ->
- receive
- shutdown ->
- ok;
- {send_command, Method} ->
- Pid ! Method,
- test_queue_cleanup_receiver(Pid)
- end.
-
-
test_queue_cleanup(_SecondaryNode) ->
- {_Writer, Ch} = test_spawn(fun test_queue_cleanup_receiver/1),
+ {_Writer, Ch} = test_spawn(),
rabbit_channel:do(Ch, #'queue.declare'{ queue = ?CLEANUP_QUEUE_NAME }),
receive #'queue.declare_ok'{queue = ?CLEANUP_QUEUE_NAME} ->
ok
@@ -1813,8 +1786,6 @@ test_msg_store() ->
true = msg_store_contains(true, MsgIds2ndHalf, MSCState2),
%% read the second half again
MSCState3 = msg_store_read(MsgIds2ndHalf, MSCState2),
- %% release the second half, just for fun (aka code coverage)
- ok = rabbit_msg_store:release(MsgIds2ndHalf, MSCState3),
%% read the second half again, just for fun (aka code coverage)
MSCState4 = msg_store_read(MsgIds2ndHalf, MSCState3),
ok = rabbit_msg_store:client_terminate(MSCState4),
@@ -2101,9 +2072,9 @@ test_queue_index() ->
passed.
-variable_queue_init(QName, IsDurable, Recover) ->
- rabbit_variable_queue:init(QName, IsDurable, Recover,
- fun nop/1, fun nop/1, fun nop/2, fun nop/1).
+variable_queue_init(Q, Recover) ->
+ rabbit_variable_queue:init(
+ Q, Recover, fun nop/1, fun nop/1, fun nop/2, fun nop/1).
variable_queue_publish(IsPersistent, Count, VQ) ->
lists:foldl(
@@ -2115,7 +2086,7 @@ variable_queue_publish(IsPersistent, Count, VQ) ->
true -> 2;
false -> 1
end}, <<>>),
- #message_properties{}, VQN)
+ #message_properties{}, self(), VQN)
end, VQ, lists:seq(1, Count)).
variable_queue_fetch(Count, IsPersistent, IsDelivered, Len, VQ) ->
@@ -2133,9 +2104,13 @@ assert_prop(List, Prop, Value) ->
assert_props(List, PropVals) ->
[assert_prop(List, Prop, Value) || {Prop, Value} <- PropVals].
+test_amqqueue(Durable) ->
+ (rabbit_amqqueue:pseudo_queue(test_queue(), self()))
+ #amqqueue { durable = Durable }.
+
with_fresh_variable_queue(Fun) ->
ok = empty_test_queue(),
- VQ = variable_queue_init(test_queue(), true, false),
+ VQ = variable_queue_init(test_amqqueue(true), false),
S0 = rabbit_variable_queue:status(VQ),
assert_props(S0, [{q1, 0}, {q2, 0},
{delta, {delta, undefined, 0, undefined}},
@@ -2193,7 +2168,7 @@ test_dropwhile(VQ0) ->
rabbit_basic:message(
rabbit_misc:r(<<>>, exchange, <<>>),
<<>>, #'P_basic'{}, <<>>),
- #message_properties{expiry = N}, VQN)
+ #message_properties{expiry = N}, self(), VQN)
end, VQ0, lists:seq(1, Count)),
%% drop the first 5 messages
@@ -2237,7 +2212,7 @@ test_variable_queue_dynamic_duration_change(VQ0) ->
%% drain
{VQ8, AckTags} = variable_queue_fetch(Len, false, false, Len, VQ7),
- VQ9 = rabbit_variable_queue:ack(AckTags, VQ8),
+ {_Guids, VQ9} = rabbit_variable_queue:ack(AckTags, VQ8),
{empty, VQ10} = rabbit_variable_queue:fetch(true, VQ9),
VQ10.
@@ -2247,7 +2222,7 @@ publish_fetch_and_ack(0, _Len, VQ0) ->
publish_fetch_and_ack(N, Len, VQ0) ->
VQ1 = variable_queue_publish(false, 1, VQ0),
{{_Msg, false, AckTag, Len}, VQ2} = rabbit_variable_queue:fetch(true, VQ1),
- VQ3 = rabbit_variable_queue:ack([AckTag], VQ2),
+ {_Guids, VQ3} = rabbit_variable_queue:ack([AckTag], VQ2),
publish_fetch_and_ack(N-1, Len, VQ3).
test_variable_queue_partial_segments_delta_thing(VQ0) ->
@@ -2281,7 +2256,7 @@ test_variable_queue_partial_segments_delta_thing(VQ0) ->
{len, HalfSegment + 1}]),
{VQ8, AckTags1} = variable_queue_fetch(HalfSegment + 1, true, false,
HalfSegment + 1, VQ7),
- VQ9 = rabbit_variable_queue:ack(AckTags ++ AckTags1, VQ8),
+ {_Guids, VQ9} = rabbit_variable_queue:ack(AckTags ++ AckTags1, VQ8),
%% should be empty now
{empty, VQ10} = rabbit_variable_queue:fetch(true, VQ9),
VQ10.
@@ -2310,7 +2285,7 @@ test_variable_queue_all_the_bits_not_covered_elsewhere1(VQ0) ->
{VQ5, _AckTags1} = variable_queue_fetch(Count, false, false,
Count, VQ4),
_VQ6 = rabbit_variable_queue:terminate(VQ5),
- VQ7 = variable_queue_init(test_queue(), true, true),
+ VQ7 = variable_queue_init(test_amqqueue(true), true),
{{_Msg1, true, _AckTag1, Count1}, VQ8} =
rabbit_variable_queue:fetch(true, VQ7),
VQ9 = variable_queue_publish(false, 1, VQ8),
@@ -2323,17 +2298,18 @@ test_variable_queue_all_the_bits_not_covered_elsewhere2(VQ0) ->
VQ1 = rabbit_variable_queue:set_ram_duration_target(0, VQ0),
VQ2 = variable_queue_publish(false, 4, VQ1),
{VQ3, AckTags} = variable_queue_fetch(2, false, false, 4, VQ2),
- VQ4 = rabbit_variable_queue:requeue(AckTags, fun(X) -> X end, VQ3),
+ {_Guids, VQ4} =
+ rabbit_variable_queue:requeue(AckTags, fun(X) -> X end, VQ3),
VQ5 = rabbit_variable_queue:idle_timeout(VQ4),
_VQ6 = rabbit_variable_queue:terminate(VQ5),
- VQ7 = variable_queue_init(test_queue(), true, true),
+ VQ7 = variable_queue_init(test_amqqueue(true), true),
{empty, VQ8} = rabbit_variable_queue:fetch(false, VQ7),
VQ8.
test_queue_recover() ->
Count = 2 * rabbit_queue_index:next_segment_boundary(0),
TxID = rabbit_guid:guid(),
- {new, #amqqueue { pid = QPid, name = QName }} =
+ {new, #amqqueue { pid = QPid, name = QName } = Q} =
rabbit_amqqueue:declare(test_queue(), true, false, [], none),
[begin
Msg = rabbit_basic:message(rabbit_misc:r(<<>>, exchange, <<>>),
@@ -2349,7 +2325,7 @@ test_queue_recover() ->
after 10000 -> exit(timeout_waiting_for_queue_death)
end,
rabbit_amqqueue:stop(),
- ok = rabbit_amqqueue:start(),
+ rabbit_amqqueue:start(),
rabbit_amqqueue:with_or_die(
QName,
fun (Q1 = #amqqueue { pid = QPid1 }) ->
@@ -2357,7 +2333,7 @@ test_queue_recover() ->
{ok, CountMinusOne, {QName, QPid1, _AckTag, true, _Msg}} =
rabbit_amqqueue:basic_get(Q1, self(), false),
exit(QPid1, shutdown),
- VQ1 = variable_queue_init(QName, true, true),
+ VQ1 = variable_queue_init(Q, true),
{{_Msg1, true, _AckTag1, CountMinusOne}, VQ2} =
rabbit_variable_queue:fetch(true, VQ1),
_VQ3 = rabbit_variable_queue:delete_and_terminate(VQ2),
diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl
index 7567c29ef3..842c3b4fac 100644
--- a/src/rabbit_upgrade_functions.erl
+++ b/src/rabbit_upgrade_functions.erl
@@ -26,6 +26,7 @@
-rabbit_upgrade({internal_exchanges, mnesia, []}).
-rabbit_upgrade({user_to_internal_user, mnesia, [hash_passwords]}).
-rabbit_upgrade({topic_trie, mnesia, []}).
+-rabbit_upgrade({semi_durable_route, mnesia, []}).
%% -------------------------------------------------------------------
@@ -37,6 +38,7 @@
-spec(internal_exchanges/0 :: () -> 'ok').
-spec(user_to_internal_user/0 :: () -> 'ok').
-spec(topic_trie/0 :: () -> 'ok').
+-spec(semi_durable_route/0 :: () -> 'ok').
-endif.
@@ -101,6 +103,10 @@ topic_trie() ->
{attributes, [trie_binding, value]},
{type, ordered_set}]).
+semi_durable_route() ->
+ create(rabbit_semi_durable_route, [{record_name, route},
+ {attributes, [binding, value]}]).
+
%%--------------------------------------------------------------------
transform(TableName, Fun, FieldList) ->
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 7a1102e5be..7a3c17a29c 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -16,18 +16,19 @@
-module(rabbit_variable_queue).
--export([init/5, terminate/1, delete_and_terminate/1,
- purge/1, publish/3, publish_delivered/4, drain_confirmed/1,
- fetch/2, ack/2, tx_publish/4, tx_ack/3, tx_rollback/2, tx_commit/4,
+-export([init/4, terminate/1, delete_and_terminate/1,
+ purge/1, publish/4, publish_delivered/5, drain_confirmed/1,
+ fetch/2, ack/2, tx_publish/5, tx_ack/3, tx_rollback/2, tx_commit/4,
requeue/3, len/1, is_empty/1, dropwhile/2,
set_ram_duration_target/2, ram_duration/1,
needs_idle_timeout/1, idle_timeout/1, handle_pre_hibernate/1,
- status/1, multiple_routing_keys/0]).
+ status/1, invoke/3, is_duplicate/3, discard/3,
+ multiple_routing_keys/0]).
-export([start/1, stop/0]).
%% exported for testing only
--export([start_msg_store/2, stop_msg_store/0, init/7]).
+-export([start_msg_store/2, stop_msg_store/0, init/6]).
%%----------------------------------------------------------------------------
%% Definitions:
@@ -408,15 +409,15 @@ stop_msg_store() ->
ok = rabbit_sup:stop_child(?PERSISTENT_MSG_STORE),
ok = rabbit_sup:stop_child(?TRANSIENT_MSG_STORE).
-init(QueueName, IsDurable, Recover, AsyncCallback, SyncCallback) ->
- init(QueueName, IsDurable, Recover, AsyncCallback, SyncCallback,
+init(Queue, Recover, AsyncCallback, SyncCallback) ->
+ init(Queue, Recover, AsyncCallback, SyncCallback,
fun (MsgIds, ActionTaken) ->
msgs_written_to_disk(AsyncCallback, MsgIds, ActionTaken)
end,
fun (MsgIds) -> msg_indices_written_to_disk(AsyncCallback, MsgIds) end).
-init(QueueName, IsDurable, false, AsyncCallback, SyncCallback,
- MsgOnDiskFun, MsgIdxOnDiskFun) ->
+init(#amqqueue { name = QueueName, durable = IsDurable }, false,
+ AsyncCallback, SyncCallback, MsgOnDiskFun, MsgIdxOnDiskFun) ->
IndexState = rabbit_queue_index:init(QueueName, MsgIdxOnDiskFun),
init(IsDurable, IndexState, 0, [], AsyncCallback, SyncCallback,
case IsDurable of
@@ -426,8 +427,8 @@ init(QueueName, IsDurable, false, AsyncCallback, SyncCallback,
end,
msg_store_client_init(?TRANSIENT_MSG_STORE, undefined, AsyncCallback));
-init(QueueName, true, true, AsyncCallback, SyncCallback,
- MsgOnDiskFun, MsgIdxOnDiskFun) ->
+init(#amqqueue { name = QueueName, durable = true }, true,
+ AsyncCallback, SyncCallback, MsgOnDiskFun, MsgIdxOnDiskFun) ->
Terms = rabbit_queue_index:shutdown_terms(QueueName),
{PRef, TRef, Terms1} =
case [persistent_ref, transient_ref] -- proplists:get_keys(Terms) of
@@ -517,13 +518,14 @@ purge(State = #vqstate { q4 = Q4,
ram_index_count = 0,
persistent_count = PCount1 })}.
-publish(Msg, MsgProps, State) ->
+publish(Msg, MsgProps, _ChPid, State) ->
{_SeqId, State1} = publish(Msg, MsgProps, false, false, State),
a(reduce_memory_use(State1)).
publish_delivered(false, #basic_message { id = MsgId },
#message_properties { needs_confirming = NeedsConfirming },
- State = #vqstate { async_callback = Callback, len = 0 }) ->
+ _ChPid, State = #vqstate { async_callback = Callback,
+ len = 0 }) ->
case NeedsConfirming of
true -> blind_confirm(Callback, gb_sets:singleton(MsgId));
false -> ok
@@ -533,13 +535,13 @@ publish_delivered(true, Msg = #basic_message { is_persistent = IsPersistent,
id = MsgId },
MsgProps = #message_properties {
needs_confirming = NeedsConfirming },
- State = #vqstate { len = 0,
- next_seq_id = SeqId,
- out_counter = OutCount,
- in_counter = InCount,
- persistent_count = PCount,
- durable = IsDurable,
- unconfirmed = UC }) ->
+ _ChPid, State = #vqstate { len = 0,
+ next_seq_id = SeqId,
+ out_counter = OutCount,
+ in_counter = InCount,
+ persistent_count = PCount,
+ durable = IsDurable,
+ unconfirmed = UC }) ->
IsPersistent1 = IsDurable andalso IsPersistent,
MsgStatus = (msg_status(IsPersistent1, SeqId, Msg, MsgProps))
#msg_status { is_delivered = true },
@@ -665,13 +667,14 @@ internal_fetch(AckRequired, MsgStatus = #msg_status {
persistent_count = PCount1 })}.
ack(AckTags, State) ->
- a(ack(fun msg_store_remove/3,
- fun (_, State0) -> State0 end,
- AckTags, State)).
+ {MsgIds, State1} = ack(fun msg_store_remove/3,
+ fun (_, State0) -> State0 end,
+ AckTags, State),
+ {MsgIds, a(State1)}.
tx_publish(Txn, Msg = #basic_message { is_persistent = IsPersistent }, MsgProps,
- State = #vqstate { durable = IsDurable,
- msg_store_clients = MSCState }) ->
+ _ChPid, State = #vqstate { durable = IsDurable,
+ msg_store_clients = MSCState }) ->
Tx = #tx { pending_messages = Pubs } = lookup_tx(Txn),
store_tx(Txn, Tx #tx { pending_messages = [{Msg, MsgProps} | Pubs] }),
case IsPersistent andalso IsDurable of
@@ -727,8 +730,8 @@ requeue(AckTags, MsgPropsFun, State) ->
(MsgPropsFun(MsgProps)) #message_properties {
needs_confirming = false }
end,
- a(reduce_memory_use(
- ack(fun msg_store_release/3,
+ {MsgIds, State1} =
+ ack(fun (_, _, _) -> ok end,
fun (#msg_status { msg = Msg, msg_props = MsgProps }, State1) ->
{_SeqId, State2} = publish(Msg, MsgPropsFun1(MsgProps),
true, false, State1),
@@ -742,7 +745,8 @@ requeue(AckTags, MsgPropsFun, State) ->
true, true, State2),
State3
end,
- AckTags, State))).
+ AckTags, State),
+ {MsgIds, a(reduce_memory_use(State1))}.
len(#vqstate { len = Len }) -> Len.
@@ -880,6 +884,13 @@ status(#vqstate {
{avg_ack_ingress_rate, AvgAckIngressRate},
{avg_ack_egress_rate , AvgAckEgressRate} ].
+invoke(?MODULE, Fun, State) ->
+ Fun(?MODULE, State).
+
+is_duplicate(_Txn, _Msg, State) -> {false, State}.
+
+discard(_Msg, _ChPid, State) -> State.
+
%%----------------------------------------------------------------------------
%% Minor helpers
%%----------------------------------------------------------------------------
@@ -954,8 +965,8 @@ msg_store_client_init(MsgStore, MsgOnDiskFun, Callback) ->
msg_store_client_init(MsgStore, Ref, MsgOnDiskFun, Callback) ->
CloseFDsFun = msg_store_close_fds_fun(MsgStore =:= ?PERSISTENT_MSG_STORE),
- rabbit_msg_store:client_init(
- MsgStore, Ref, MsgOnDiskFun, fun () -> Callback(CloseFDsFun) end).
+ rabbit_msg_store:client_init(MsgStore, Ref, MsgOnDiskFun,
+ fun () -> Callback(?MODULE, CloseFDsFun) end).
msg_store_write(MSCState, IsPersistent, MsgId, Msg) ->
with_immutable_msg_store_state(
@@ -972,11 +983,6 @@ msg_store_remove(MSCState, IsPersistent, MsgIds) ->
MSCState, IsPersistent,
fun (MCSState1) -> rabbit_msg_store:remove(MsgIds, MCSState1) end).
-msg_store_release(MSCState, IsPersistent, MsgIds) ->
- with_immutable_msg_store_state(
- MSCState, IsPersistent,
- fun (MCSState1) -> rabbit_msg_store:release(MsgIds, MCSState1) end).
-
msg_store_sync(MSCState, IsPersistent, MsgIds, Fun) ->
with_immutable_msg_store_state(
MSCState, IsPersistent,
@@ -988,7 +994,7 @@ msg_store_close_fds(MSCState, IsPersistent) ->
fun (MSCState1) -> rabbit_msg_store:close_all_indicated(MSCState1) end).
msg_store_close_fds_fun(IsPersistent) ->
- fun (State = #vqstate { msg_store_clients = MSCState }) ->
+ fun (?MODULE, State = #vqstate { msg_store_clients = MSCState }) ->
{ok, MSCState1} = msg_store_close_fds(MSCState, IsPersistent),
State #vqstate { msg_store_clients = MSCState1 }
end.
@@ -1134,7 +1140,8 @@ blank_rate(Timestamp, IngressLength) ->
msg_store_callback(PersistentMsgIds, Pubs, AckTags, Fun, MsgPropsFun,
AsyncCallback, SyncCallback) ->
- case SyncCallback(fun (StateN) ->
+ case SyncCallback(?MODULE,
+ fun (?MODULE, StateN) ->
tx_commit_post_msg_store(true, Pubs, AckTags,
Fun, MsgPropsFun, StateN)
end) of
@@ -1197,20 +1204,21 @@ tx_commit_index(State = #vqstate { on_sync = #sync {
Acks = lists:append(SAcks),
Pubs = [{Msg, Fun(MsgProps)} || {Fun, PubsN} <- lists:reverse(SPubs),
{Msg, MsgProps} <- lists:reverse(PubsN)],
- {SeqIds, State1 = #vqstate { index_state = IndexState }} =
+ {_MsgIds, State1} = ack(Acks, State),
+ {SeqIds, State2 = #vqstate { index_state = IndexState }} =
lists:foldl(
fun ({Msg = #basic_message { is_persistent = IsPersistent },
MsgProps},
- {SeqIdsAcc, State2}) ->
+ {SeqIdsAcc, State3}) ->
IsPersistent1 = IsDurable andalso IsPersistent,
- {SeqId, State3} =
- publish(Msg, MsgProps, false, IsPersistent1, State2),
- {cons_if(IsPersistent1, SeqId, SeqIdsAcc), State3}
- end, {PAcks, ack(Acks, State)}, Pubs),
+ {SeqId, State4} =
+ publish(Msg, MsgProps, false, IsPersistent1, State3),
+ {cons_if(IsPersistent1, SeqId, SeqIdsAcc), State4}
+ end, {PAcks, State1}, Pubs),
IndexState1 = rabbit_queue_index:sync(SeqIds, IndexState),
[ Fun() || Fun <- lists:reverse(SFuns) ],
reduce_memory_use(
- State1 #vqstate { index_state = IndexState1, on_sync = ?BLANK_SYNC }).
+ State2 #vqstate { index_state = IndexState1, on_sync = ?BLANK_SYNC }).
purge_betas_and_deltas(LensByStore,
State = #vqstate { q3 = Q3,
@@ -1357,7 +1365,7 @@ remove_pending_ack(KeepPersistent,
State = #vqstate { pending_ack = PA,
index_state = IndexState,
msg_store_clients = MSCState }) ->
- {PersistentSeqIds, MsgIdsByStore} =
+ {PersistentSeqIds, MsgIdsByStore, _AllMsgIds} =
dict:fold(fun accumulate_ack/3, accumulate_ack_init(), PA),
State1 = State #vqstate { pending_ack = dict:new(),
ram_ack_index = gb_trees:empty() },
@@ -1376,9 +1384,9 @@ remove_pending_ack(KeepPersistent,
end.
ack(_MsgStoreFun, _Fun, [], State) ->
- State;
+ {[], State};
ack(MsgStoreFun, Fun, AckTags, State) ->
- {{PersistentSeqIds, MsgIdsByStore},
+ {{PersistentSeqIds, MsgIdsByStore, AllMsgIds},
State1 = #vqstate { index_state = IndexState,
msg_store_clients = MSCState,
persistent_count = PCount,
@@ -1398,21 +1406,24 @@ ack(MsgStoreFun, Fun, AckTags, State) ->
|| {IsPersistent, MsgIds} <- orddict:to_list(MsgIdsByStore)],
PCount1 = PCount - find_persistent_count(sum_msg_ids_by_store_to_len(
orddict:new(), MsgIdsByStore)),
- State1 #vqstate { index_state = IndexState1,
- persistent_count = PCount1,
- ack_out_counter = AckOutCount + length(AckTags) }.
+ {lists:reverse(AllMsgIds),
+ State1 #vqstate { index_state = IndexState1,
+ persistent_count = PCount1,
+ ack_out_counter = AckOutCount + length(AckTags) }}.
-accumulate_ack_init() -> {[], orddict:new()}.
+accumulate_ack_init() -> {[], orddict:new(), []}.
accumulate_ack(_SeqId, #msg_status { is_persistent = false, %% ASSERTIONS
msg_on_disk = false,
- index_on_disk = false },
- {PersistentSeqIdsAcc, MsgIdsByStore}) ->
- {PersistentSeqIdsAcc, MsgIdsByStore};
+ index_on_disk = false,
+ msg_id = MsgId },
+ {PersistentSeqIdsAcc, MsgIdsByStore, AllMsgIds}) ->
+ {PersistentSeqIdsAcc, MsgIdsByStore, [MsgId | AllMsgIds]};
accumulate_ack(SeqId, {IsPersistent, MsgId, _MsgProps},
- {PersistentSeqIdsAcc, MsgIdsByStore}) ->
+ {PersistentSeqIdsAcc, MsgIdsByStore, AllMsgIds}) ->
{cons_if(IsPersistent, SeqId, PersistentSeqIdsAcc),
- rabbit_misc:orddict_cons(IsPersistent, MsgId, MsgIdsByStore)}.
+ rabbit_misc:orddict_cons(IsPersistent, MsgId, MsgIdsByStore),
+ [MsgId | AllMsgIds]}.
find_persistent_count(LensByStore) ->
case orddict:find(true, LensByStore) of
@@ -1456,14 +1467,16 @@ needs_index_sync(#vqstate { msg_indices_on_disk = MIOD,
not (gb_sets:is_empty(UC) orelse gb_sets:is_subset(UC, MIOD)).
blind_confirm(Callback, MsgIdSet) ->
- Callback(fun (State) -> record_confirms(MsgIdSet, State) end).
+ Callback(?MODULE,
+ fun (?MODULE, State) -> record_confirms(MsgIdSet, State) end).
msgs_written_to_disk(Callback, MsgIdSet, removed) ->
blind_confirm(Callback, MsgIdSet);
msgs_written_to_disk(Callback, MsgIdSet, written) ->
- Callback(fun (State = #vqstate { msgs_on_disk = MOD,
- msg_indices_on_disk = MIOD,
- unconfirmed = UC }) ->
+ Callback(?MODULE,
+ fun (?MODULE, State = #vqstate { msgs_on_disk = MOD,
+ msg_indices_on_disk = MIOD,
+ unconfirmed = UC }) ->
Confirmed = gb_sets:intersection(UC, MsgIdSet),
record_confirms(gb_sets:intersection(MsgIdSet, MIOD),
State #vqstate {
@@ -1472,9 +1485,10 @@ msgs_written_to_disk(Callback, MsgIdSet, written) ->
end).
msg_indices_written_to_disk(Callback, MsgIdSet) ->
- Callback(fun (State = #vqstate { msgs_on_disk = MOD,
- msg_indices_on_disk = MIOD,
- unconfirmed = UC }) ->
+ Callback(?MODULE,
+ fun (?MODULE, State = #vqstate { msgs_on_disk = MOD,
+ msg_indices_on_disk = MIOD,
+ unconfirmed = UC }) ->
Confirmed = gb_sets:intersection(UC, MsgIdSet),
record_confirms(gb_sets:intersection(MsgIdSet, MOD),
State #vqstate {
diff --git a/src/supervisor2.erl b/src/supervisor2.erl
index 1a240856ce..ec1ee9cd90 100644
--- a/src/supervisor2.erl
+++ b/src/supervisor2.erl
@@ -38,6 +38,9 @@
%% child is a supervisor and it exits normally (i.e. with reason of
%% 'shutdown') then the child's parent also exits normally.
%%
+%% 5) normal, and {shutdown, _} exit reasons are all treated the same
+%% (i.e. are regarded as normal exits)
+%%
%% All modifications are (C) 2010-2011 VMware, Inc.
%%
%% %CopyrightBegin%
@@ -544,17 +547,12 @@ do_restart({RestartType, Delay}, Reason, Child, State) ->
do_restart(permanent, Reason, Child, State) ->
report_error(child_terminated, Reason, Child, State#state.name),
restart(Child, State);
-do_restart(intrinsic, normal, Child, State) ->
- {shutdown, state_del_child(Child, State)};
-do_restart(intrinsic, shutdown, Child = #child{child_type = supervisor},
- State) ->
- {shutdown, state_del_child(Child, State)};
-do_restart(_, normal, Child, State) ->
- NState = state_del_child(Child, State),
- {ok, NState};
-do_restart(_, shutdown, Child, State) ->
- NState = state_del_child(Child, State),
- {ok, NState};
+do_restart(Type, normal, Child, State) ->
+ del_child_and_maybe_shutdown(Type, Child, State);
+do_restart(Type, {shutdown, _}, Child, State) ->
+ del_child_and_maybe_shutdown(Type, Child, State);
+do_restart(Type, shutdown, Child = #child{child_type = supervisor}, State) ->
+ del_child_and_maybe_shutdown(Type, Child, State);
do_restart(Type, Reason, Child, State) when Type =:= transient orelse
Type =:= intrinsic ->
report_error(child_terminated, Reason, Child, State#state.name),
@@ -564,6 +562,11 @@ do_restart(temporary, Reason, Child, State) ->
NState = state_del_child(Child, State),
{ok, NState}.
+del_child_and_maybe_shutdown(intrinsic, Child, State) ->
+ {shutdown, state_del_child(Child, State)};
+del_child_and_maybe_shutdown(_, Child, State) ->
+ {ok, state_del_child(Child, State)}.
+
restart(Child, State) ->
case add_restart(State) of
{ok, NState} ->
diff --git a/src/test_sup.erl b/src/test_sup.erl
index b4df1fd042..150235da9b 100644
--- a/src/test_sup.erl
+++ b/src/test_sup.erl
@@ -45,8 +45,8 @@ test_supervisor_delayed_restart(SupPid) ->
with_sup(RestartStrategy, Fun) ->
{ok, SupPid} = supervisor2:start_link(?MODULE, [RestartStrategy]),
Res = Fun(SupPid),
+ unlink(SupPid),
exit(SupPid, shutdown),
- rabbit_misc:unlink_and_capture_exit(SupPid),
Res.
init([RestartStrategy]) ->