summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2010-11-15 22:46:45 +0000
committerMatthias Radestock <matthias@rabbitmq.com>2010-11-15 22:46:45 +0000
commita5c489b17f85c935a41b618936db03125ab819dd (patch)
tree6876b27385f734227bafd04bfda5c7aee55bb380
parent19d62579c3cf28e05300ed068c95dd1d7976e885 (diff)
parent53f90c8adb1a9cfedc3dd552663d596af58dbe94 (diff)
downloadrabbitmq-server-git-a5c489b17f85c935a41b618936db03125ab819dd.tar.gz
merge bug23495 into default
-rw-r--r--Makefile12
-rw-r--r--docs/rabbitmqctl.1.xml5
-rw-r--r--generate_deps23
-rw-r--r--packaging/macports/Makefile13
-rw-r--r--packaging/macports/Portfile.in23
-rwxr-xr-xpackaging/macports/make-checksums.sh14
-rw-r--r--src/rabbit.erl87
-rw-r--r--src/rabbit_amqqueue_process.erl63
-rw-r--r--src/rabbit_connection_sup.erl22
-rw-r--r--src/rabbit_control.erl2
-rw-r--r--src/rabbit_heartbeat.erl54
-rw-r--r--src/rabbit_misc.erl53
-rw-r--r--src/rabbit_mnesia.erl93
-rw-r--r--src/rabbit_net.erl6
-rw-r--r--src/rabbit_reader.erl31
-rw-r--r--src/rabbit_upgrade.erl156
-rw-r--r--src/rabbit_upgrade_functions.erl51
17 files changed, 529 insertions, 179 deletions
diff --git a/Makefile b/Makefile
index b43808a1e8..d3f052f6ad 100644
--- a/Makefile
+++ b/Makefile
@@ -93,7 +93,7 @@ all: $(TARGETS)
$(DEPS_FILE): $(SOURCES) $(INCLUDES)
rm -f $@
- escript generate_deps $(INCLUDE_DIR) $(SOURCE_DIR) \$$\(EBIN_DIR\) $@
+ echo $(subst : ,:,$(foreach FILE,$^,$(FILE):)) | escript generate_deps $@ $(EBIN_DIR)
$(EBIN_DIR)/rabbit.app: $(EBIN_DIR)/rabbit_app.in $(BEAM_TARGETS) generate_app
escript generate_app $(EBIN_DIR) $@ < $<
@@ -267,7 +267,9 @@ $(SOURCE_DIR)/%_usage.erl:
docs_all: $(MANPAGES) $(WEB_MANPAGES)
-install: all docs_all install_dirs
+install: install_bin install_docs
+
+install_bin: all install_dirs
cp -r ebin include LICENSE LICENSE-MPL-RabbitMQ INSTALL $(TARGET_DIR)
chmod 0755 scripts/*
@@ -275,14 +277,16 @@ install: all docs_all install_dirs
cp scripts/$$script $(TARGET_DIR)/sbin; \
[ -e $(SBIN_DIR)/$$script ] || ln -s $(SCRIPTS_REL_PATH)/$$script $(SBIN_DIR)/$$script; \
done
+ mkdir -p $(TARGET_DIR)/plugins
+ echo Put your .ez plugin files in this directory. > $(TARGET_DIR)/plugins/README
+
+install_docs: docs_all install_dirs
for section in 1 5; do \
mkdir -p $(MAN_DIR)/man$$section; \
for manpage in $(DOCS_DIR)/*.$$section.gz; do \
cp $$manpage $(MAN_DIR)/man$$section; \
done; \
done
- mkdir -p $(TARGET_DIR)/plugins
- echo Put your .ez plugin files in this directory. > $(TARGET_DIR)/plugins/README
install_dirs:
@ OK=true && \
diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml
index acb99bc848..6b02abe438 100644
--- a/docs/rabbitmqctl.1.xml
+++ b/docs/rabbitmqctl.1.xml
@@ -972,6 +972,11 @@
<listitem><para>Peer port.</para></listitem>
</varlistentry>
<varlistentry>
+ <term>ssl</term>
+ <listitem><para>Boolean indicating whether the
+ connection is secured with SSL.</para></listitem>
+ </varlistentry>
+ <varlistentry>
<term>peer_cert_subject</term>
<listitem><para>The subject of the peer's SSL
certificate, in RFC4514 form.</para></listitem>
diff --git a/generate_deps b/generate_deps
index 29587b5a5f..ddfca816b4 100644
--- a/generate_deps
+++ b/generate_deps
@@ -2,18 +2,21 @@
%% -*- erlang -*-
-mode(compile).
-main([IncludeDir, ErlDir, EbinDir, TargetFile]) ->
- ErlDirContents = filelib:wildcard("*.erl", ErlDir),
- ErlFiles = [filename:join(ErlDir, FileName) || FileName <- ErlDirContents],
+%% We expect the list of Erlang source and header files to arrive on
+%% stdin, with the entries colon-separated.
+main([TargetFile, EbinDir]) ->
+ ErlsAndHrls = [ string:strip(S,left) ||
+ S <- string:tokens(io:get_line(""), ":\n")],
+ ErlFiles = [F || F <- ErlsAndHrls, lists:suffix(".erl", F)],
Modules = sets:from_list(
[list_to_atom(filename:basename(FileName, ".erl")) ||
- FileName <- ErlDirContents]),
- Headers = sets:from_list(
- [filename:join(IncludeDir, FileName) ||
- FileName <- filelib:wildcard("*.hrl", IncludeDir)]),
+ FileName <- ErlFiles]),
+ HrlFiles = [F || F <- ErlsAndHrls, lists:suffix(".hrl", F)],
+ IncludeDirs = lists:usort([filename:dirname(Path) || Path <- HrlFiles]),
+ Headers = sets:from_list(HrlFiles),
Deps = lists:foldl(
fun (Path, Deps1) ->
- dict:store(Path, detect_deps(IncludeDir, EbinDir,
+ dict:store(Path, detect_deps(IncludeDirs, EbinDir,
Modules, Headers, Path),
Deps1)
end, dict:new(), ErlFiles),
@@ -33,8 +36,8 @@ main([IncludeDir, ErlDir, EbinDir, TargetFile]) ->
ok = file:sync(Hdl),
ok = file:close(Hdl).
-detect_deps(IncludeDir, EbinDir, Modules, Headers, Path) ->
- {ok, Forms} = epp:parse_file(Path, [IncludeDir], [{use_specs, true}]),
+detect_deps(IncludeDirs, EbinDir, Modules, Headers, Path) ->
+ {ok, Forms} = epp:parse_file(Path, IncludeDirs, [{use_specs, true}]),
lists:foldl(
fun ({attribute, _LineNumber, Attribute, Behaviour}, Deps)
when Attribute =:= behaviour orelse Attribute =:= behavior ->
diff --git a/packaging/macports/Makefile b/packaging/macports/Makefile
index 3a22eef08a..ee79c95a6a 100644
--- a/packaging/macports/Makefile
+++ b/packaging/macports/Makefile
@@ -1,7 +1,9 @@
-TARBALL_DIR=../../dist
-TARBALL=$(notdir $(wildcard $(TARBALL_DIR)/rabbitmq-server-[0-9.]*.tar.gz))
+TARBALL_SRC_DIR=../../dist
+TARBALL_BIN_DIR=../../packaging/generic-unix/
+TARBALL_SRC=$(wildcard $(TARBALL_SRC_DIR)/rabbitmq-server-[0-9.]*.tar.gz)
+TARBALL_BIN=$(wildcard $(TARBALL_BIN_DIR)/rabbitmq-server-generic-unix-[0-9.]*.tar.gz)
COMMON_DIR=../common
-VERSION=$(shell echo $(TARBALL) | sed -e 's:rabbitmq-server-\(.*\)\.tar\.gz:\1:g')
+VERSION=$(shell echo $(TARBALL_SRC) | sed -e 's:rabbitmq-server-\(.*\)\.tar\.gz:\1:g')
# The URL at which things really get deployed
REAL_WEB_URL=http://www.rabbitmq.com/
@@ -23,10 +25,7 @@ dirs:
mkdir -p $(DEST)/files
$(DEST)/Portfile: Portfile.in
- for algo in md5 sha1 rmd160 ; do \
- checksum=$$(openssl $$algo $(TARBALL_DIR)/$(TARBALL) | awk '{print $$NF}') ; \
- echo "s|@$$algo@|$$checksum|g" ; \
- done >checksums.sed
+ ./make-checksums.sh $(TARBALL_SRC) $(TARBALL_BIN) > checksums.sed
sed -e "s|@VERSION@|$(VERSION)|g;s|@BASE_URL@|$(REAL_WEB_URL)|g" \
-f checksums.sed <$^ >$@
rm checksums.sed
diff --git a/packaging/macports/Portfile.in b/packaging/macports/Portfile.in
index e37a45b387..ce6b1e34a7 100644
--- a/packaging/macports/Portfile.in
+++ b/packaging/macports/Portfile.in
@@ -17,13 +17,19 @@ long_description \
homepage @BASE_URL@
master_sites @BASE_URL@releases/rabbitmq-server/v${version}/
+distfiles ${name}-${version}${extract.suffix} \
+ ${name}-generic-unix-${version}${extract.suffix}
+
checksums \
- md5 @md5@ \
- sha1 @sha1@ \
- rmd160 @rmd160@
+ ${name}-${version}${extract.suffix} md5 @md5-src@ \
+ ${name}-${version}${extract.suffix} sha1 @sha1-src@ \
+ ${name}-${version}${extract.suffix} rmd160 @rmd160-src@ \
+ ${name}-generic-unix-${version}${extract.suffix} md5 @md5-bin@ \
+ ${name}-generic-unix-${version}${extract.suffix} sha1 @sha1-bin@ \
+ ${name}-generic-unix-${version}${extract.suffix} rmd160 @rmd160-bin@
depends_lib port:erlang
-depends_build port:xmlto port:libxslt
+depends_build port:libxslt
platform darwin 7 {
depends_build-append port:py25-simplejson
@@ -49,11 +55,15 @@ set plistloc ${prefix}/etc/LaunchDaemons/org.macports.rabbitmq-server
set sbindir ${destroot}${prefix}/lib/rabbitmq/bin
set wrappersbin ${destroot}${prefix}/sbin
set realsbin ${destroot}${prefix}/lib/rabbitmq/lib/rabbitmq_server-${version}/sbin
+set mansrc ${workpath}/rabbitmq_server-${version}/share/man
+set mandest ${destroot}${prefix}/share/man
use_configure no
use_parallel_build yes
+destroot.target install_bin
+
destroot.destdir \
TARGET_DIR=${destroot}${prefix}/lib/rabbitmq/lib/rabbitmq_server-${version} \
SBIN_DIR=${sbindir} \
@@ -93,6 +103,11 @@ post-destroot {
${wrappersbin}/rabbitmq-multi
file copy ${wrappersbin}/rabbitmq-multi ${wrappersbin}/rabbitmq-server
file copy ${wrappersbin}/rabbitmq-multi ${wrappersbin}/rabbitmqctl
+
+ file copy ${mansrc}/man1/rabbitmq-multi.1.gz ${mandest}/man1/
+ file copy ${mansrc}/man1/rabbitmq-server.1.gz ${mandest}/man1/
+ file copy ${mansrc}/man1/rabbitmqctl.1.gz ${mandest}/man1/
+ file copy ${mansrc}/man5/rabbitmq.conf.5.gz ${mandest}/man5/
}
pre-install {
diff --git a/packaging/macports/make-checksums.sh b/packaging/macports/make-checksums.sh
new file mode 100755
index 0000000000..11424dfcba
--- /dev/null
+++ b/packaging/macports/make-checksums.sh
@@ -0,0 +1,14 @@
+#!/bin/bash
+# NB: this script requires bash
+tarball_src=$1
+tarball_bin=$2
+for type in src bin
+do
+ tarball_var=tarball_${type}
+ tarball=${!tarball_var}
+ for algo in md5 sha1 rmd160
+ do
+ checksum=$(openssl $algo ${tarball} | awk '{print $NF}')
+ echo "s|@$algo-$type@|$checksum|g"
+ done
+done
diff --git a/src/rabbit.erl b/src/rabbit.erl
index 8c36a9f0a4..a1dd2c2e40 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -301,49 +301,38 @@ run_boot_step({StepName, Attributes}) ->
ok
end.
-module_attributes(Module) ->
- case catch Module:module_info(attributes) of
- {'EXIT', {undef, [{Module, module_info, _} | _]}} ->
- io:format("WARNING: module ~p not found, so not scanned for boot steps.~n",
- [Module]),
- [];
- {'EXIT', Reason} ->
- exit(Reason);
- V ->
- V
- end.
-
boot_steps() ->
- AllApps = [App || {App, _, _} <- application:loaded_applications()],
- Modules = lists:usort(
- lists:append([Modules
- || {ok, Modules} <-
- [application:get_key(App, modules)
- || App <- AllApps]])),
- UnsortedSteps =
- lists:flatmap(fun (Module) ->
- [{StepName, Attributes}
- || {rabbit_boot_step, [{StepName, Attributes}]}
- <- module_attributes(Module)]
- end, Modules),
- sort_boot_steps(UnsortedSteps).
+ sort_boot_steps(rabbit_misc:all_module_attributes(rabbit_boot_step)).
+
+vertices(_Module, Steps) ->
+ [{StepName, {StepName, Atts}} || {StepName, Atts} <- Steps].
+
+edges(_Module, Steps) ->
+ [case Key of
+ requires -> {StepName, OtherStep};
+ enables -> {OtherStep, StepName}
+ end || {StepName, Atts} <- Steps,
+ {Key, OtherStep} <- Atts,
+ Key =:= requires orelse Key =:= enables].
+
+graph_build_error({vertex, duplicate, StepName}) ->
+ boot_error("Duplicate boot step name: ~w~n", [StepName]);
+graph_build_error({edge, Reason, From, To}) ->
+ boot_error(
+ "Could not add boot step dependency of ~w on ~w:~n~s",
+ [To, From,
+ case Reason of
+ {bad_vertex, V} ->
+ io_lib:format("Boot step not registered: ~w~n", [V]);
+ {bad_edge, [First | Rest]} ->
+ [io_lib:format("Cyclic dependency: ~w", [First]),
+ [io_lib:format(" depends on ~w", [Next]) || Next <- Rest],
+ io_lib:format(" depends on ~w~n", [First])]
+ end]).
sort_boot_steps(UnsortedSteps) ->
- G = digraph:new([acyclic]),
-
- %% Add vertices, with duplicate checking.
- [case digraph:vertex(G, StepName) of
- false -> digraph:add_vertex(G, StepName, Step);
- _ -> boot_error("Duplicate boot step name: ~w~n", [StepName])
- end || Step = {StepName, _Attrs} <- UnsortedSteps],
-
- %% Add edges, detecting cycles and missing vertices.
- lists:foreach(fun ({StepName, Attributes}) ->
- [add_boot_step_dep(G, StepName, PrecedingStepName)
- || {requires, PrecedingStepName} <- Attributes],
- [add_boot_step_dep(G, SucceedingStepName, StepName)
- || {enables, SucceedingStepName} <- Attributes]
- end, UnsortedSteps),
+ G = rabbit_misc:build_acyclic_graph(
+ fun vertices/2, fun edges/2, fun graph_build_error/1, UnsortedSteps),
%% Use topological sort to find a consistent ordering (if there is
%% one, otherwise fail).
@@ -365,24 +354,6 @@ sort_boot_steps(UnsortedSteps) ->
[MissingFunctions])
end.
-add_boot_step_dep(G, RunsSecond, RunsFirst) ->
- case digraph:add_edge(G, RunsSecond, RunsFirst) of
- {error, Reason} ->
- boot_error("Could not add boot step dependency of ~w on ~w:~n~s",
- [RunsSecond, RunsFirst,
- case Reason of
- {bad_vertex, V} ->
- io_lib:format("Boot step not registered: ~w~n", [V]);
- {bad_edge, [First | Rest]} ->
- [io_lib:format("Cyclic dependency: ~w", [First]),
- [io_lib:format(" depends on ~w", [Next])
- || Next <- Rest],
- io_lib:format(" depends on ~w~n", [First])]
- end]);
- _ ->
- ok
- end.
-
%%---------------------------------------------------------------------------
log_location(Type) ->
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index fe2c975b4e..75f285dffb 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -315,6 +315,25 @@ ch_record(ChPid) ->
store_ch_record(C = #cr{ch_pid = ChPid}) ->
put({ch, ChPid}, C).
+maybe_store_ch_record(C = #cr{consumer_count = ConsumerCount,
+ acktags = ChAckTags,
+ txn = Txn,
+ unsent_message_count = UnsentMessageCount}) ->
+ case {sets:size(ChAckTags), ConsumerCount, UnsentMessageCount, Txn} of
+ {0, 0, 0, none} -> ok = erase_ch_record(C),
+ false;
+ _ -> store_ch_record(C),
+ true
+ end.
+
+erase_ch_record(#cr{ch_pid = ChPid,
+ limiter_pid = LimiterPid,
+ monitor_ref = MonitorRef}) ->
+ ok = rabbit_limiter:unregister(LimiterPid, self()),
+ erlang:demonitor(MonitorRef),
+ erase({ch, ChPid}),
+ ok.
+
all_ch_record() ->
[C || {{ch, _}, C} <- get()].
@@ -361,7 +380,7 @@ deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun}, FunAcc,
end,
NewC = C#cr{unsent_message_count = Count + 1,
acktags = ChAckTags1},
- store_ch_record(NewC),
+ true = maybe_store_ch_record(NewC),
{NewActiveConsumers, NewBlockedConsumers} =
case ch_record_state_transition(C, NewC) of
ok -> {queue:in(QEntry, ActiveConsumersTail),
@@ -380,7 +399,7 @@ deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun}, FunAcc,
deliver_msgs_to_consumers(Funs, FunAcc1, State2);
%% if IsMsgReady then we've hit the limiter
false when IsMsgReady ->
- store_ch_record(C#cr{is_limit_active = true}),
+ true = maybe_store_ch_record(C#cr{is_limit_active = true}),
{NewActiveConsumers, NewBlockedConsumers} =
move_consumers(ChPid,
ActiveConsumers,
@@ -479,7 +498,7 @@ possibly_unblock(State, ChPid, Update) ->
State;
C ->
NewC = Update(C),
- store_ch_record(NewC),
+ maybe_store_ch_record(NewC),
case ch_record_state_transition(C, NewC) of
ok -> State;
unblock -> {NewBlockedConsumers, NewActiveConsumers} =
@@ -500,10 +519,8 @@ handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder}) ->
case lookup_ch(DownPid) of
not_found ->
{ok, State};
- #cr{monitor_ref = MonitorRef, ch_pid = ChPid, txn = Txn,
- acktags = ChAckTags} ->
- erlang:demonitor(MonitorRef),
- erase({ch, ChPid}),
+ C = #cr{ch_pid = ChPid, txn = Txn, acktags = ChAckTags} ->
+ ok = erase_ch_record(C),
State1 = State#q{
exclusive_consumer = case Holder of
{ChPid, _} -> none;
@@ -562,7 +579,7 @@ commit_transaction(Txn, From, ChPid, State = #q{backing_queue = BQ,
%% by the channel.
C = #cr{acktags = ChAckTags} = lookup_ch(ChPid),
ChAckTags1 = subtract_acks(ChAckTags, AckTags),
- store_ch_record(C#cr{acktags = ChAckTags1, txn = none}),
+ maybe_store_ch_record(C#cr{acktags = ChAckTags1, txn = none}),
State#q{backing_queue_state = BQS1}.
rollback_transaction(Txn, ChPid, State = #q{backing_queue = BQ,
@@ -772,8 +789,9 @@ handle_call({basic_get, ChPid, NoAck}, _From,
{{Message, IsDelivered, AckTag, Remaining}, State2} ->
case AckRequired of
true -> C = #cr{acktags = ChAckTags} = ch_record(ChPid),
- store_ch_record(
- C#cr{acktags = sets:add_element(AckTag, ChAckTags)});
+ true = maybe_store_ch_record(
+ C#cr{acktags = sets:add_element(AckTag,
+ ChAckTags)});
false -> ok
end,
Msg = {QName, self(), AckTag, IsDelivered, Message},
@@ -791,8 +809,8 @@ handle_call({basic_consume, NoAck, ChPid, LimiterPid,
C = #cr{consumer_count = ConsumerCount} = ch_record(ChPid),
Consumer = #consumer{tag = ConsumerTag,
ack_required = not NoAck},
- store_ch_record(C#cr{consumer_count = ConsumerCount +1,
- limiter_pid = LimiterPid}),
+ true = maybe_store_ch_record(C#cr{consumer_count = ConsumerCount +1,
+ limiter_pid = LimiterPid}),
ok = case ConsumerCount of
0 -> rabbit_limiter:register(LimiterPid, self());
_ -> ok
@@ -826,12 +844,15 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From,
not_found ->
ok = maybe_send_reply(ChPid, OkMsg),
reply(ok, State);
- C = #cr{consumer_count = ConsumerCount, limiter_pid = LimiterPid} ->
- store_ch_record(C#cr{consumer_count = ConsumerCount - 1}),
- case ConsumerCount of
- 1 -> ok = rabbit_limiter:unregister(LimiterPid, self());
- _ -> ok
- end,
+ C = #cr{consumer_count = ConsumerCount,
+ limiter_pid = LimiterPid} ->
+ C1 = C#cr{consumer_count = ConsumerCount -1},
+ maybe_store_ch_record(
+ case ConsumerCount of
+ 1 -> ok = rabbit_limiter:unregister(LimiterPid, self()),
+ C1#cr{limiter_pid = undefined};
+ _ -> C1
+ end),
ok = maybe_send_reply(ChPid, OkMsg),
NewState =
State#q{exclusive_consumer = cancel_holder(ChPid,
@@ -880,7 +901,7 @@ handle_call({requeue, AckTags, ChPid}, From, State) ->
noreply(State);
C = #cr{acktags = ChAckTags} ->
ChAckTags1 = subtract_acks(ChAckTags, AckTags),
- store_ch_record(C#cr{acktags = ChAckTags1}),
+ maybe_store_ch_record(C#cr{acktags = ChAckTags1}),
noreply(requeue_and_run(AckTags, State))
end;
@@ -904,7 +925,7 @@ handle_cast({ack, Txn, AckTags, ChPid},
{C#cr{acktags = ChAckTags1}, BQ:ack(AckTags, BQS)};
_ -> {C#cr{txn = Txn}, BQ:tx_ack(Txn, AckTags, BQS)}
end,
- store_ch_record(C1),
+ maybe_store_ch_record(C1),
noreply(State#q{backing_queue_state = BQS1})
end;
@@ -915,7 +936,7 @@ handle_cast({reject, AckTags, Requeue, ChPid},
noreply(State);
C = #cr{acktags = ChAckTags} ->
ChAckTags1 = subtract_acks(ChAckTags, AckTags),
- store_ch_record(C#cr{acktags = ChAckTags1}),
+ maybe_store_ch_record(C#cr{acktags = ChAckTags1}),
noreply(case Requeue of
true -> requeue_and_run(AckTags, State);
false -> BQS1 = BQ:ack(AckTags, BQS),
diff --git a/src/rabbit_connection_sup.erl b/src/rabbit_connection_sup.erl
index 495baeb240..ff3995b54a 100644
--- a/src/rabbit_connection_sup.erl
+++ b/src/rabbit_connection_sup.erl
@@ -66,7 +66,8 @@ start_link() ->
supervisor2:start_child(
SupPid,
{reader, {rabbit_reader, start_link,
- [ChannelSupSupPid, Collector, start_heartbeat_fun(SupPid)]},
+ [ChannelSupSupPid, Collector,
+ rabbit_heartbeat:start_heartbeat_fun(SupPid)]},
intrinsic, ?MAX_WAIT, worker, [rabbit_reader]}),
{ok, SupPid, ReaderPid}.
@@ -78,22 +79,3 @@ reader(Pid) ->
init([]) ->
{ok, {{one_for_all, 0, 1}, []}}.
-start_heartbeat_fun(SupPid) ->
- fun (_Sock, 0) ->
- none;
- (Sock, TimeoutSec) ->
- Parent = self(),
- {ok, Sender} =
- supervisor2:start_child(
- SupPid, {heartbeat_sender,
- {rabbit_heartbeat, start_heartbeat_sender,
- [Parent, Sock, TimeoutSec]},
- transient, ?MAX_WAIT, worker, [rabbit_heartbeat]}),
- {ok, Receiver} =
- supervisor2:start_child(
- SupPid, {heartbeat_receiver,
- {rabbit_heartbeat, start_heartbeat_receiver,
- [Parent, Sock, TimeoutSec]},
- transient, ?MAX_WAIT, worker, [rabbit_heartbeat]}),
- {Sender, Receiver}
- end.
diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl
index 6b21274529..6c0a727bdf 100644
--- a/src/rabbit_control.erl
+++ b/src/rabbit_control.erl
@@ -346,8 +346,6 @@ format_info_item([{TableEntryKey, TableEntryType, _TableEntryValue} | _] =
Value) when is_binary(TableEntryKey) andalso
is_atom(TableEntryType) ->
io_lib:format("~1000000000000p", [prettify_amqp_table(Value)]);
-format_info_item([C|_] = Value) when is_number(C), C >= 32, C =< 255 ->
- Value;
format_info_item(Value) ->
io_lib:format("~w", [Value]).
diff --git a/src/rabbit_heartbeat.erl b/src/rabbit_heartbeat.erl
index a9945af1d4..589bf7cc9d 100644
--- a/src/rabbit_heartbeat.erl
+++ b/src/rabbit_heartbeat.erl
@@ -32,7 +32,7 @@
-module(rabbit_heartbeat).
-export([start_heartbeat_sender/3, start_heartbeat_receiver/3,
- pause_monitor/1, resume_monitor/1]).
+ start_heartbeat_fun/1, pause_monitor/1, resume_monitor/1]).
-include("rabbit.hrl").
@@ -41,16 +41,28 @@
-ifdef(use_specs).
-export_type([heartbeaters/0]).
+-export_type([start_heartbeat_fun/0]).
--type(heartbeaters() :: rabbit_types:maybe({pid(), pid()})).
+-type(heartbeaters() :: {rabbit_types:maybe(pid()), rabbit_types:maybe(pid())}).
+
+-type(heartbeat_callback() :: fun (() -> any())).
+
+-type(start_heartbeat_fun() ::
+ fun((rabbit_net:socket(), non_neg_integer(), heartbeat_callback(),
+ non_neg_integer(), heartbeat_callback()) ->
+ no_return())).
-spec(start_heartbeat_sender/3 ::
- (pid(), rabbit_net:socket(), non_neg_integer()) ->
+ (rabbit_net:socket(), non_neg_integer(), heartbeat_callback()) ->
rabbit_types:ok(pid())).
-spec(start_heartbeat_receiver/3 ::
- (pid(), rabbit_net:socket(), non_neg_integer()) ->
+ (rabbit_net:socket(), non_neg_integer(), heartbeat_callback()) ->
rabbit_types:ok(pid())).
+-spec(start_heartbeat_fun/1 ::
+ (pid()) -> start_heartbeat_fun()).
+
+
-spec(pause_monitor/1 :: (heartbeaters()) -> 'ok').
-spec(resume_monitor/1 :: (heartbeaters()) -> 'ok').
@@ -58,40 +70,60 @@
%%----------------------------------------------------------------------------
-start_heartbeat_sender(_Parent, Sock, TimeoutSec) ->
+start_heartbeat_sender(Sock, TimeoutSec, SendFun) ->
%% the 'div 2' is there so that we don't end up waiting for nearly
%% 2 * TimeoutSec before sending a heartbeat in the boundary case
%% where the last message was sent just after a heartbeat.
heartbeater(
{Sock, TimeoutSec * 1000 div 2, send_oct, 0,
fun () ->
- catch rabbit_net:send(
- Sock, rabbit_binary_generator:build_heartbeat_frame()),
+ SendFun(),
continue
end}).
-start_heartbeat_receiver(Parent, Sock, TimeoutSec) ->
+start_heartbeat_receiver(Sock, TimeoutSec, ReceiveFun) ->
%% we check for incoming data every interval, and time out after
%% two checks with no change. As a result we will time out between
%% 2 and 3 intervals after the last data has been received.
heartbeater({Sock, TimeoutSec * 1000, recv_oct, 1, fun () ->
- Parent ! timeout,
+ ReceiveFun(),
stop
end}).
-pause_monitor(none) ->
+start_heartbeat_fun(SupPid) ->
+ fun (Sock, SendTimeoutSec, SendFun, ReceiveTimeoutSec, ReceiveFun) ->
+ {ok, Sender} =
+ start_heartbeater(SendTimeoutSec, SupPid, Sock,
+ SendFun, heartbeat_sender,
+ start_heartbeat_sender),
+ {ok, Receiver} =
+ start_heartbeater(ReceiveTimeoutSec, SupPid, Sock,
+ ReceiveFun, heartbeat_receiver,
+ start_heartbeat_receiver),
+ {Sender, Receiver}
+ end.
+
+pause_monitor({_Sender, none}) ->
ok;
pause_monitor({_Sender, Receiver}) ->
Receiver ! pause,
ok.
-resume_monitor(none) ->
+resume_monitor({_Sender, none}) ->
ok;
resume_monitor({_Sender, Receiver}) ->
Receiver ! resume,
ok.
%%----------------------------------------------------------------------------
+start_heartbeater(0, _SupPid, _Sock, _TimeoutFun, _Name, _Callback) ->
+ {ok, none};
+start_heartbeater(TimeoutSec, SupPid, Sock, TimeoutFun, Name, Callback) ->
+ supervisor2:start_child(
+ SupPid, {Name,
+ {rabbit_heartbeat, Callback,
+ [Sock, TimeoutSec, TimeoutFun]},
+ transient, ?MAX_WAIT, worker, [rabbit_heartbeat]}).
heartbeater(Params) ->
{ok, proc_lib:spawn_link(fun () -> heartbeater(Params, {0, 0}) end)}.
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index d5c8bd49cb..0522afdc8d 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -64,6 +64,7 @@
-export([recursive_delete/1, dict_cons/3, orddict_cons/3,
unlink_and_capture_exit/1]).
-export([get_options/2]).
+-export([all_module_attributes/1, build_acyclic_graph/4]).
-export([now_ms/0]).
-import(mnesia).
@@ -83,6 +84,12 @@
-type(optdef() :: {flag, string()} | {option, string(), any()}).
-type(channel_or_connection_exit()
:: rabbit_types:channel_exit() | rabbit_types:connection_exit()).
+-type(digraph_label() :: term()).
+-type(graph_vertex_fun() ::
+ fun ((atom(), [term()]) -> {digraph:vertex(), digraph_label()})).
+-type(graph_edge_fun() ::
+ fun ((atom(), [term()]) -> {digraph:vertex(), digraph:vertex()})).
+-type(graph_error_fun() :: fun ((any()) -> any() | no_return())).
-spec(method_record_type/1 :: (rabbit_framing:amqp_method_record())
-> rabbit_framing:amqp_method_name()).
@@ -183,6 +190,10 @@
-spec(unlink_and_capture_exit/1 :: (pid()) -> 'ok').
-spec(get_options/2 :: ([optdef()], [string()])
-> {[string()], [{string(), any()}]}).
+-spec(all_module_attributes/1 :: (atom()) -> [{atom(), [term()]}]).
+-spec(build_acyclic_graph/4 :: (graph_vertex_fun(), graph_edge_fun(),
+ graph_error_fun(), [{atom(), [term()]}]) ->
+ digraph()).
-spec(now_ms/0 :: () -> non_neg_integer()).
-endif.
@@ -725,3 +736,45 @@ get_flag(_, []) ->
now_ms() ->
timer:now_diff(now(), {0,0,0}) div 1000.
+
+module_attributes(Module) ->
+ case catch Module:module_info(attributes) of
+ {'EXIT', {undef, [{Module, module_info, _} | _]}} ->
+ io:format("WARNING: module ~p not found, so not scanned for boot steps.~n",
+ [Module]),
+ [];
+ {'EXIT', Reason} ->
+ exit(Reason);
+ V ->
+ V
+ end.
+
+all_module_attributes(Name) ->
+ Modules =
+ lists:usort(
+ lists:append(
+ [Modules || {App, _, _} <- application:loaded_applications(),
+ {ok, Modules} <- [application:get_key(App, modules)]])),
+ lists:foldl(
+ fun (Module, Acc) ->
+ case lists:append([Atts || {N, Atts} <- module_attributes(Module),
+ N =:= Name]) of
+ [] -> Acc;
+ Atts -> [{Module, Atts} | Acc]
+ end
+ end, [], Modules).
+
+
+build_acyclic_graph(VertexFun, EdgeFun, ErrorFun, Graph) ->
+ G = digraph:new([acyclic]),
+ [ case digraph:vertex(G, Vertex) of
+ false -> digraph:add_vertex(G, Vertex, Label);
+ _ -> ErrorFun({vertex, duplicate, Vertex})
+ end || {Module, Atts} <- Graph,
+ {Vertex, Label} <- VertexFun(Module, Atts) ],
+ [ case digraph:add_edge(G, From, To) of
+ {error, E} -> ErrorFun({edge, E, From, To});
+ _ -> ok
+ end || {Module, Atts} <- Graph,
+ {From, To} <- EdgeFun(Module, Atts) ],
+ G.
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index 8de2f0d664..9d17226960 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -44,9 +44,6 @@
-include("rabbit.hrl").
--define(SCHEMA_VERSION_SET, []).
--define(SCHEMA_VERSION_FILENAME, "schema_version").
-
%%----------------------------------------------------------------------------
-ifdef(use_specs).
@@ -94,9 +91,6 @@ init() ->
ok = ensure_mnesia_running(),
ok = ensure_mnesia_dir(),
ok = init_db(read_cluster_nodes_config(), true),
- ok = rabbit_misc:write_term_file(filename:join(
- dir(), ?SCHEMA_VERSION_FILENAME),
- [?SCHEMA_VERSION_SET]),
ok.
is_db_empty() ->
@@ -256,12 +250,12 @@ ensure_mnesia_dir() ->
ensure_mnesia_running() ->
case mnesia:system_info(is_running) of
yes -> ok;
- no -> throw({error, mnesia_not_running})
+ no -> throw({error, mnesia_not_running})
end.
ensure_mnesia_not_running() ->
case mnesia:system_info(is_running) of
- no -> ok;
+ no -> ok;
yes -> throw({error, mnesia_unexpectedly_running})
end.
@@ -378,28 +372,31 @@ init_db(ClusterNodes, Force) ->
end;
_ -> ok
end,
- case Nodes of
- [] ->
- case mnesia:system_info(use_dir) of
- true ->
- case check_schema_integrity() of
- ok ->
- ok;
- {error, Reason} ->
- %% NB: we cannot use rabbit_log here since
- %% it may not have been started yet
- error_logger:warning_msg(
- "schema integrity check failed: ~p~n"
- "moving database to backup location "
- "and recreating schema from scratch~n",
- [Reason]),
- ok = move_db(),
- ok = create_schema()
- end;
- false ->
- ok = create_schema()
+ case {Nodes, mnesia:system_info(use_dir),
+ mnesia:system_info(db_nodes)} of
+ {[], true, [_]} ->
+ %% True single disc node, attempt upgrade
+ wait_for_tables(),
+ case rabbit_upgrade:maybe_upgrade() of
+ ok ->
+ schema_ok_or_exit();
+ version_not_available ->
+ schema_ok_or_move()
end;
- [_|_] ->
+ {[], true, _} ->
+ %% "Master" (i.e. without config) disc node in cluster,
+ %% verify schema
+ wait_for_tables(),
+ version_ok_or_exit(rabbit_upgrade:read_version()),
+ schema_ok_or_exit();
+ {[], false, _} ->
+ %% First RAM node in cluster, start from scratch
+ ok = create_schema();
+ {[AnotherNode|_], _, _} ->
+ %% Subsequent node in cluster, catch up
+ version_ok_or_exit(rabbit_upgrade:read_version()),
+ version_ok_or_exit(
+ rpc:call(AnotherNode, rabbit_upgrade, read_version, [])),
IsDiskNode = ClusterNodes == [] orelse
lists:member(node(), ClusterNodes),
ok = wait_for_replicated_tables(),
@@ -408,7 +405,7 @@ init_db(ClusterNodes, Force) ->
true -> disc;
false -> ram
end),
- ok = ensure_schema_integrity()
+ schema_ok_or_exit()
end;
{error, Reason} ->
%% one reason we may end up here is if we try to join
@@ -418,6 +415,39 @@ init_db(ClusterNodes, Force) ->
ClusterNodes, Reason}})
end.
+schema_ok_or_move() ->
+ case check_schema_integrity() of
+ ok ->
+ ok;
+ {error, Reason} ->
+ %% NB: we cannot use rabbit_log here since it may not have been
+ %% started yet
+ error_logger:warning_msg("schema integrity check failed: ~p~n"
+ "moving database to backup location "
+ "and recreating schema from scratch~n",
+ [Reason]),
+ ok = move_db(),
+ ok = create_schema()
+ end.
+
+version_ok_or_exit({ok, DiscVersion}) ->
+ case rabbit_upgrade:desired_version() of
+ DiscVersion ->
+ ok;
+ DesiredVersion ->
+ exit({schema_mismatch, DesiredVersion, DiscVersion})
+ end;
+version_ok_or_exit({error, _}) ->
+ ok = rabbit_upgrade:write_version().
+
+schema_ok_or_exit() ->
+ case check_schema_integrity() of
+ ok ->
+ ok;
+ {error, Reason} ->
+ exit({schema_invalid, Reason})
+ end.
+
create_schema() ->
mnesia:stop(),
rabbit_misc:ensure_ok(mnesia:create_schema([node()]),
@@ -426,7 +456,8 @@ create_schema() ->
cannot_start_mnesia),
ok = create_tables(),
ok = ensure_schema_integrity(),
- ok = wait_for_tables().
+ ok = wait_for_tables(),
+ ok = rabbit_upgrade:write_version().
move_db() ->
mnesia:stop(),
diff --git a/src/rabbit_net.erl b/src/rabbit_net.erl
index 53d0d5cbf3..0940dce2f4 100644
--- a/src/rabbit_net.erl
+++ b/src/rabbit_net.erl
@@ -34,7 +34,7 @@
-export([async_recv/3, close/1, controlling_process/2,
getstat/2, peername/1, peercert/1, port_command/2,
- send/2, sockname/1]).
+ send/2, sockname/1, is_ssl/1]).
%%---------------------------------------------------------------------------
@@ -65,6 +65,7 @@
-spec(sockname/1 ::
(socket())
-> ok_val_or_error({inet:ip_address(), rabbit_networking:ip_port()})).
+-spec(is_ssl/1 :: (socket()) -> boolean()).
-spec(getstat/2 ::
(socket(), [stat_option()])
-> ok_val_or_error([{stat_option(), integer()}])).
@@ -133,3 +134,6 @@ sockname(Sock) when ?IS_SSL(Sock) ->
ssl:sockname(Sock#ssl_socket.ssl);
sockname(Sock) when is_port(Sock) ->
inet:sockname(Sock).
+
+is_ssl(Sock) ->
+ ?IS_SSL(Sock).
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 7f7bd9d847..12730ccfeb 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -65,7 +65,7 @@
-define(STATISTICS_KEYS, [pid, recv_oct, recv_cnt, send_oct, send_cnt,
send_pend, state, channels]).
--define(CREATION_EVENT_KEYS, [pid, address, port, peer_address, peer_port,
+-define(CREATION_EVENT_KEYS, [pid, address, port, peer_address, peer_port, ssl,
peer_cert_subject, peer_cert_issuer,
peer_cert_validity,
protocol, user, vhost, timeout, frame_max,
@@ -162,11 +162,7 @@
-ifdef(use_specs).
--type(start_heartbeat_fun() ::
- fun ((rabbit_net:socket(), non_neg_integer()) ->
- rabbit_heartbeat:heartbeaters())).
-
--spec(start_link/3 :: (pid(), pid(), start_heartbeat_fun()) ->
+-spec(start_link/3 :: (pid(), pid(), rabbit_heartbeat:start_heartbeat_fun()) ->
rabbit_types:ok(pid())).
-spec(info_keys/0 :: () -> rabbit_types:info_keys()).
-spec(info/1 :: (pid()) -> rabbit_types:infos()).
@@ -177,9 +173,10 @@
-spec(server_properties/0 :: () -> rabbit_framing:amqp_table()).
%% These specs only exists to add no_return() to keep dialyzer happy
--spec(init/4 :: (pid(), pid(), pid(), start_heartbeat_fun()) -> no_return()).
+-spec(init/4 :: (pid(), pid(), pid(), rabbit_heartbeat:start_heartbeat_fun())
+ -> no_return()).
-spec(start_connection/7 ::
- (pid(), pid(), pid(), start_heartbeat_fun(), any(),
+ (pid(), pid(), pid(), rabbit_heartbeat:start_heartbeat_fun(), any(),
rabbit_net:socket(),
fun ((rabbit_net:socket()) ->
rabbit_types:ok_or_error2(
@@ -771,7 +768,19 @@ handle_method0(#'connection.tune_ok'{frame_max = FrameMax,
not_allowed, "frame_max=~w > ~w max size",
[FrameMax, ?FRAME_MAX]);
true ->
- Heartbeater = SHF(Sock, ClientHeartbeat),
+ SendFun =
+ fun() ->
+ Frame = rabbit_binary_generator:build_heartbeat_frame(),
+ catch rabbit_net:send(Sock, Frame)
+ end,
+
+ Parent = self(),
+ ReceiveFun =
+ fun() ->
+ Parent ! timeout
+ end,
+ Heartbeater = SHF(Sock, ClientHeartbeat, SendFun,
+ ClientHeartbeat, ReceiveFun),
State#v1{connection_state = opening,
connection = Connection#connection{
timeout_sec = ClientHeartbeat,
@@ -839,6 +848,8 @@ i(peer_address, #v1{sock = Sock}) ->
socket_info(fun rabbit_net:peername/1, fun ({A, _}) -> A end, Sock);
i(peer_port, #v1{sock = Sock}) ->
socket_info(fun rabbit_net:peername/1, fun ({_, P}) -> P end, Sock);
+i(ssl, #v1{sock = Sock}) ->
+ rabbit_net:is_ssl(Sock);
i(peer_cert_issuer, #v1{sock = Sock}) ->
cert_info(fun rabbit_ssl:peer_cert_issuer/1, Sock);
i(peer_cert_subject, #v1{sock = Sock}) ->
@@ -889,7 +900,7 @@ cert_info(F, Sock) ->
case rabbit_net:peercert(Sock) of
nossl -> '';
{error, no_peercert} -> '';
- {ok, Cert} -> F(Cert)
+ {ok, Cert} -> list_to_binary(F(Cert))
end.
%%--------------------------------------------------------------------------
diff --git a/src/rabbit_upgrade.erl b/src/rabbit_upgrade.erl
new file mode 100644
index 0000000000..0071a08ad7
--- /dev/null
+++ b/src/rabbit_upgrade.erl
@@ -0,0 +1,156 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are Rabbit Technologies Ltd.
+%%
+%% Copyright (C) 2010 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+-module(rabbit_upgrade).
+
+-export([maybe_upgrade/0, read_version/0, write_version/0, desired_version/0]).
+
+-include("rabbit.hrl").
+
+-define(VERSION_FILENAME, "schema_version").
+-define(LOCK_FILENAME, "schema_upgrade_lock").
+
+%% -------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+-spec(maybe_upgrade/0 :: () -> 'ok' | 'version_not_available').
+-spec(read_version/0 ::
+ () -> {'ok', [any()]} | rabbit_types:error(any())).
+-spec(write_version/0 :: () -> 'ok').
+-spec(desired_version/0 :: () -> [atom()]).
+
+-endif.
+
+%% -------------------------------------------------------------------
+
+%% Try to upgrade the schema. If no information on the existing schema
+%% could be found, do nothing. rabbit_mnesia:check_schema_integrity()
+%% will catch the problem.
+maybe_upgrade() ->
+ case read_version() of
+ {ok, CurrentHeads} ->
+ G = load_graph(),
+ case unknown_heads(CurrentHeads, G) of
+ [] ->
+ case upgrades_to_apply(CurrentHeads, G) of
+ [] -> ok;
+ Upgrades -> apply_upgrades(Upgrades)
+ end;
+ Unknown ->
+ exit({future_upgrades_found, Unknown})
+ end,
+ true = digraph:delete(G),
+ ok;
+ {error, enoent} ->
+ version_not_available
+ end.
+
+read_version() ->
+ case rabbit_misc:read_term_file(schema_filename()) of
+ {ok, [Heads]} -> {ok, Heads};
+ {error, E} -> {error, E}
+ end.
+
+write_version() ->
+ ok = rabbit_misc:write_term_file(schema_filename(), [desired_version()]),
+ ok.
+
+desired_version() ->
+ G = load_graph(),
+ Version = heads(G),
+ true = digraph:delete(G),
+ Version.
+
+%% -------------------------------------------------------------------
+
+load_graph() ->
+ Upgrades = rabbit_misc:all_module_attributes(rabbit_upgrade),
+ rabbit_misc:build_acyclic_graph(
+ fun vertices/2, fun edges/2, fun graph_build_error/1, Upgrades).
+
+vertices(Module, Steps) ->
+ [{StepName, {Module, StepName}} || {StepName, _Reqs} <- Steps].
+
+edges(_Module, Steps) ->
+ [{Require, StepName} || {StepName, Requires} <- Steps, Require <- Requires].
+
+graph_build_error({vertex, duplicate, StepName}) ->
+ exit({duplicate_upgrade, StepName});
+graph_build_error({edge, E, From, To}) ->
+ exit({E, From, To}).
+
+unknown_heads(Heads, G) ->
+ [H || H <- Heads, digraph:vertex(G, H) =:= false].
+
+upgrades_to_apply(Heads, G) ->
+ %% Take all the vertices which can reach the known heads. That's
+ %% everything we've already applied. Subtract that from all
+ %% vertices: that's what we have to apply.
+ Unsorted = sets:to_list(
+ sets:subtract(
+ sets:from_list(digraph:vertices(G)),
+ sets:from_list(digraph_utils:reaching(Heads, G)))),
+ %% Form a subgraph from that list and find a topological ordering
+ %% so we can invoke them in order.
+ [element(2, digraph:vertex(G, StepName))
+ || StepName <- digraph_utils:topsort(digraph_utils:subgraph(G, Unsorted))].
+
+heads(G) ->
+ lists:sort([V || V <- digraph:vertices(G), digraph:out_degree(G, V) =:= 0]).
+
+%% -------------------------------------------------------------------
+
+apply_upgrades(Upgrades) ->
+ LockFile = lock_filename(),
+ case file:open(LockFile, [write, exclusive]) of
+ {ok, Lock} ->
+ ok = file:close(Lock),
+ info("Upgrades: ~w to apply~n", [length(Upgrades)]),
+ [apply_upgrade(Upgrade) || Upgrade <- Upgrades],
+ info("Upgrades: All applied~n", []),
+ ok = write_version(),
+ ok = file:delete(LockFile);
+ {error, eexist} ->
+ exit(previous_upgrade_failed);
+ {error, _} = Error ->
+ exit(Error)
+ end.
+
+apply_upgrade({M, F}) ->
+ info("Upgrades: Applying ~w:~w~n", [M, F]),
+ ok = apply(M, F, []).
+
+%% -------------------------------------------------------------------
+
+schema_filename() ->
+ filename:join(dir(), ?VERSION_FILENAME).
+
+lock_filename() ->
+ filename:join(dir(), ?LOCK_FILENAME).
+
+%% NB: we cannot use rabbit_log here since it may not have been
+%% started yet
+info(Msg, Args) ->
+ error_logger:info_msg(Msg, Args).
+
+dir() ->
+ rabbit_mnesia:dir().
diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl
new file mode 100644
index 0000000000..59b8705df5
--- /dev/null
+++ b/src/rabbit_upgrade_functions.erl
@@ -0,0 +1,51 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are Rabbit Technologies Ltd.
+%%
+%% Copyright (C) 2010 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+-module(rabbit_upgrade_functions).
+
+-include("rabbit.hrl").
+
+-compile([export_all]).
+
+-rabbit_upgrade({remove_user_scope, []}).
+
+%% -------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+-spec(remove_user_scope/0 :: () -> 'ok').
+
+-endif.
+
+%%--------------------------------------------------------------------
+
+remove_user_scope() ->
+ {atomic, ok} = mnesia:transform_table(
+ rabbit_user_permission,
+ fun (Perm = #user_permission{
+ permission = {permission,
+ _Scope, Conf, Write, Read}}) ->
+ Perm#user_permission{
+ permission = #permission{configure = Conf,
+ write = Write,
+ read = Read}}
+ end,
+ record_info(fields, user_permission)),
+ ok.