summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorEmile Joubert <emile@rabbitmq.com>2013-06-12 11:16:10 +0100
committerEmile Joubert <emile@rabbitmq.com>2013-06-12 11:16:10 +0100
commit6b2571b094a3aaca5dc04cb82870f5aff88d0fba (patch)
tree7219162f17c0cc6317e6903c5a02d414e63a721c /src
parentcd4b0d8276cb1ce29d0f323f0c093ae87d8b43af (diff)
parent4a7ee069f49ca0aaac2624fc408c332f17e953a7 (diff)
downloadrabbitmq-server-git-6b2571b094a3aaca5dc04cb82870f5aff88d0fba.tar.gz
Merged bug25581
Diffstat (limited to 'src')
-rw-r--r--src/app_utils.erl2
-rw-r--r--src/priority_queue.erl2
-rw-r--r--src/rabbit.erl4
-rw-r--r--src/rabbit_access_control.erl7
-rw-r--r--src/rabbit_amqqueue_process.erl24
-rw-r--r--src/rabbit_direct.erl30
-rw-r--r--src/rabbit_mirror_queue_misc.erl2
-rw-r--r--src/rabbit_mirror_queue_slave.erl6
-rw-r--r--src/rabbit_misc.erl13
-rw-r--r--src/rabbit_node_monitor.erl2
-rw-r--r--src/rabbit_nodes.erl2
-rw-r--r--src/rabbit_plugins.erl2
-rw-r--r--src/rabbit_queue_index.erl13
-rw-r--r--src/rabbit_tests.erl15
-rw-r--r--src/rabbit_variable_queue.erl7
-rw-r--r--src/rabbit_vm.erl2
16 files changed, 91 insertions, 42 deletions
diff --git a/src/app_utils.erl b/src/app_utils.erl
index 8da436c016..b102ce7578 100644
--- a/src/app_utils.erl
+++ b/src/app_utils.erl
@@ -93,7 +93,7 @@ app_dependency_order(RootApps, StripUnreachable) ->
%% Private API
wait_for_application(Application) ->
- case lists:keymember(Application, 1, application:which_applications()) of
+ case lists:keymember(Application, 1, rabbit_misc:which_applications()) of
true -> ok;
false -> timer:sleep(1000),
wait_for_application(Application)
diff --git a/src/priority_queue.erl b/src/priority_queue.erl
index 0dc198195e..18e1e8d984 100644
--- a/src/priority_queue.erl
+++ b/src/priority_queue.erl
@@ -51,7 +51,7 @@
-type(q() :: pqueue()).
-type(priority() :: integer() | 'infinity').
--type(squeue() :: {queue, [any()], [any()]}).
+-type(squeue() :: {queue, [any()], [any()], non_neg_integer()}).
-type(pqueue() :: squeue() | {pqueue, [{priority(), squeue()}]}).
-spec(new/0 :: () -> pqueue()).
diff --git a/src/rabbit.erl b/src/rabbit.erl
index 3cfa21ba9b..450a7f32d0 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -393,7 +393,7 @@ await_startup() ->
status() ->
S1 = [{pid, list_to_integer(os:getpid())},
- {running_applications, application:which_applications(infinity)},
+ {running_applications, rabbit_misc:which_applications()},
{os, os:type()},
{erlang_version, erlang:system_info(system_version)},
{memory, rabbit_vm:memory()}],
@@ -421,7 +421,7 @@ status() ->
is_running() -> is_running(node()).
-is_running(Node) -> rabbit_nodes:is_running(Node, rabbit).
+is_running(Node) -> rabbit_nodes:is_process_running(Node, rabbit).
environment() ->
lists:keysort(1, [P || P = {K, _} <- application:get_all_env(rabbit),
diff --git a/src/rabbit_access_control.erl b/src/rabbit_access_control.erl
index 5b92a5c3f4..b4179ec5aa 100644
--- a/src/rabbit_access_control.erl
+++ b/src/rabbit_access_control.erl
@@ -68,8 +68,11 @@ check_vhost_access(User = #user{ username = Username,
auth_backend = Module }, VHostPath) ->
check_access(
fun() ->
- rabbit_vhost:exists(VHostPath) andalso
- Module:check_vhost_access(User, VHostPath)
+ %% TODO this could be an andalso shortcut under >R13A
+ case rabbit_vhost:exists(VHostPath) of
+ false -> false;
+ true -> Module:check_vhost_access(User, VHostPath)
+ end
end,
Module, "access to vhost '~s' refused for user '~s'",
[VHostPath, Username]).
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index d2f4a178d5..8441bd3c55 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -898,6 +898,7 @@ make_dead_letter_msg(Msg = #basic_message{content = Content,
end,
ReasonBin = list_to_binary(atom_to_list(Reason)),
TimeSec = rabbit_misc:now_ms() div 1000,
+ PerMsgTTL = per_msg_ttl_header(Content#content.properties),
HeadersFun2 =
fun (Headers) ->
%% The first routing key is the one specified in the
@@ -908,13 +909,25 @@ make_dead_letter_msg(Msg = #basic_message{content = Content,
{<<"queue">>, longstr, QName},
{<<"time">>, timestamp, TimeSec},
{<<"exchange">>, longstr, Exchange#resource.name},
- {<<"routing-keys">>, array, RKs1}],
+ {<<"routing-keys">>, array, RKs1}] ++ PerMsgTTL,
HeadersFun1(rabbit_basic:prepend_table_header(<<"x-death">>,
Info, Headers))
end,
- Content1 = rabbit_basic:map_headers(HeadersFun2, Content),
- Msg#basic_message{exchange_name = DLX, id = rabbit_guid:gen(),
- routing_keys = DeathRoutingKeys, content = Content1}.
+ Content1 = #content{properties = Props} =
+ rabbit_basic:map_headers(HeadersFun2, Content),
+ Content2 = Content1#content{properties =
+ Props#'P_basic'{expiration = undefined}},
+ Msg#basic_message{exchange_name = DLX,
+ id = rabbit_guid:gen(),
+ routing_keys = DeathRoutingKeys,
+ content = Content2}.
+
+per_msg_ttl_header(#'P_basic'{expiration = undefined}) ->
+ [];
+per_msg_ttl_header(#'P_basic'{expiration = Expiration}) ->
+ [{<<"original-expiration">>, longstr, Expiration}];
+per_msg_ttl_header(_) ->
+ [].
now_micros() -> timer:now_diff(now(), {0,0,0}).
@@ -1052,7 +1065,8 @@ handle_call({init, Recover}, From,
gen_server2:reply(From, not_found),
case Recover of
new -> rabbit_log:warning(
- "Queue ~p exclusive owner went away~n", [QName]);
+ "Queue ~p exclusive owner went away~n",
+ [rabbit_misc:rs(QName)]);
_ -> ok
end,
BQS = bq_init(BQ, Q, Recover),
diff --git a/src/rabbit_direct.erl b/src/rabbit_direct.erl
index 53144f3fa4..769c86c3af 100644
--- a/src/rabbit_direct.erl
+++ b/src/rabbit_direct.erl
@@ -35,8 +35,10 @@
{rabbit_types:username(), rabbit_types:password()}),
rabbit_types:vhost(), rabbit_types:protocol(), pid(),
rabbit_event:event_props()) ->
- {'ok', {rabbit_types:user(),
- rabbit_framing:amqp_table()}}).
+ rabbit_types:ok_or_error2(
+ {rabbit_types:user(), rabbit_framing:amqp_table()},
+ 'broker_not_found_on_node' | 'auth_failure' |
+ 'access_refused')).
-spec(start_channel/9 ::
(rabbit_channel:channel_number(), pid(), pid(), string(),
rabbit_types:protocol(), rabbit_types:user(), rabbit_types:vhost(),
@@ -76,21 +78,23 @@ connect(User = #user{}, VHost, Protocol, Pid, Infos) ->
end;
connect({Username, Password}, VHost, Protocol, Pid, Infos) ->
- connect0(check_user_pass_login, Username, Password, VHost, Protocol, Pid,
- Infos);
+ connect0(fun () -> rabbit_access_control:check_user_pass_login(
+ Username, Password) end,
+ VHost, Protocol, Pid, Infos);
connect(Username, VHost, Protocol, Pid, Infos) ->
- connect0(check_user_login, Username, [], VHost, Protocol, Pid, Infos).
+ connect0(fun () -> rabbit_access_control:check_user_login(
+ Username, []) end,
+ VHost, Protocol, Pid, Infos).
-connect0(FunctionName, U, P, VHost, Protocol, Pid, Infos) ->
+connect0(AuthFun, VHost, Protocol, Pid, Infos) ->
case rabbit:is_running() of
- true ->
- case rabbit_access_control:FunctionName(U, P) of
- {ok, User} -> connect(User, VHost, Protocol, Pid, Infos);
- {refused, _M, _A} -> {error, auth_failure}
- end;
- false ->
- {error, broker_not_found_on_node}
+ true -> case AuthFun() of
+ {ok, User} -> connect(User, VHost, Protocol, Pid,
+ Infos);
+ {refused, _M, _A} -> {error, auth_failure}
+ end;
+ false -> {error, broker_not_found_on_node}
end.
diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl
index 5607bfa9a4..529351a2bd 100644
--- a/src/rabbit_mirror_queue_misc.erl
+++ b/src/rabbit_mirror_queue_misc.erl
@@ -204,8 +204,6 @@ start_child(Name, MirrorNode, Q) ->
report_deaths(_MirrorPid, _IsMaster, _QueueName, []) ->
ok;
report_deaths(MirrorPid, IsMaster, QueueName, DeadPids) ->
- rabbit_event:notify(queue_mirror_deaths, [{name, QueueName},
- {pids, DeadPids}]),
rabbit_log:info("Mirrored-queue (~s): ~s ~s saw deaths of mirrors ~s~n",
[rabbit_misc:rs(QueueName),
case IsMaster of
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index 964b0eb4f9..49ba94eea5 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -41,15 +41,13 @@
%%----------------------------------------------------------------------------
--define(CREATION_EVENT_KEYS,
+-define(INFO_KEYS,
[pid,
name,
master_pid,
is_synchronised
]).
--define(INFO_KEYS, ?CREATION_EVENT_KEYS).
-
-define(SYNC_INTERVAL, 25). %% milliseconds
-define(RAM_DURATION_UPDATE_INTERVAL, 5000).
-define(DEATH_TIMEOUT, 20000). %% 20 seconds
@@ -124,8 +122,6 @@ init(Q = #amqqueue { name = QName }) ->
depth_delta = undefined
},
- rabbit_event:notify(queue_slave_created,
- infos(?CREATION_EVENT_KEYS, State)),
ok = gm:broadcast(GM, request_depth),
{ok, State, hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN,
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index c36fb147c8..a1e95fd5ee 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -61,7 +61,7 @@
-export([multi_call/2]).
-export([os_cmd/1]).
-export([gb_sets_difference/2]).
--export([version/0]).
+-export([version/0, which_applications/0]).
-export([sequence_error/1]).
-export([json_encode/1, json_decode/1, json_to_term/1, term_to_json/1]).
-export([check_expiry/1]).
@@ -232,6 +232,7 @@
-spec(os_cmd/1 :: (string()) -> string()).
-spec(gb_sets_difference/2 :: (gb_set(), gb_set()) -> gb_set()).
-spec(version/0 :: () -> string()).
+-spec(which_applications/0 :: () -> [{atom(), string(), string()}]).
-spec(sequence_error/1 :: ([({'error', any()} | any())])
-> {'error', any()} | any()).
-spec(json_encode/1 :: (any()) -> {'ok', string()} | {'error', any()}).
@@ -985,6 +986,16 @@ version() ->
{ok, VSN} = application:get_key(rabbit, vsn),
VSN.
+%% application:which_applications(infinity) is dangerous, since it can
+%% cause deadlocks on shutdown. So we have to use a timeout variant,
+%% but w/o creating spurious timeout errors.
+which_applications() ->
+ try
+ application:which_applications()
+ catch
+ exit:{timeout, _} -> []
+ end.
+
sequence_error([T]) -> T;
sequence_error([{error, _} = Error | _]) -> Error;
sequence_error([_ | Rest]) -> sequence_error(Rest).
diff --git a/src/rabbit_node_monitor.erl b/src/rabbit_node_monitor.erl
index 7d844c72d2..7fcd1f9990 100644
--- a/src/rabbit_node_monitor.erl
+++ b/src/rabbit_node_monitor.erl
@@ -465,4 +465,4 @@ alive_nodes(Nodes) -> [N || N <- Nodes, pong =:= net_adm:ping(N)].
alive_rabbit_nodes() -> alive_rabbit_nodes(rabbit_mnesia:cluster_nodes(all)).
alive_rabbit_nodes(Nodes) ->
- [N || N <- alive_nodes(Nodes), rabbit_nodes:is_process_running(N, rabbit)].
+ [N || N <- alive_nodes(Nodes), rabbit:is_running(N)].
diff --git a/src/rabbit_nodes.erl b/src/rabbit_nodes.erl
index 5640f12add..b85646d211 100644
--- a/src/rabbit_nodes.erl
+++ b/src/rabbit_nodes.erl
@@ -96,7 +96,7 @@ cookie_hash() ->
base64:encode_to_string(erlang:md5(atom_to_list(erlang:get_cookie()))).
is_running(Node, Application) ->
- case rpc:call(Node, application, which_applications, [infinity]) of
+ case rpc:call(Node, rabbit_misc, which_applications, []) of
{badrpc, _} -> false;
Apps -> proplists:is_defined(Application, Apps)
end.
diff --git a/src/rabbit_plugins.erl b/src/rabbit_plugins.erl
index 58c906eb0a..6f6515b0e7 100644
--- a/src/rabbit_plugins.erl
+++ b/src/rabbit_plugins.erl
@@ -47,7 +47,7 @@ setup() ->
active() ->
{ok, ExpandDir} = application:get_env(rabbit, plugins_expand_dir),
InstalledPlugins = [ P#plugin.name || P <- list(ExpandDir) ],
- [App || {App, _, _} <- application:which_applications(),
+ [App || {App, _, _} <- rabbit_misc:which_applications(),
lists:member(App, InstalledPlugins)].
%% @doc Get the list of plugins which are ready to be enabled.
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index 847d39c143..956412566a 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -1027,7 +1027,18 @@ journal_minus_segment1({no_pub, del, ack}, {?PUB, no_del, no_ack}) ->
journal_minus_segment1({no_pub, del, ack}, {?PUB, del, no_ack}) ->
{{no_pub, no_del, ack}, 0};
journal_minus_segment1({no_pub, del, ack}, {?PUB, del, ack}) ->
- {undefined, -1}.
+ {undefined, -1};
+
+%% Missing segment. If flush_journal/1 is interrupted after deleting
+%% the segment but before truncating the journal we can get these
+%% cases: a delivery and an acknowledgement in the journal, or just an
+%% acknowledgement in the journal, but with no segment. In both cases
+%% we have really forgotten the message; so ignore what's in the
+%% journal.
+journal_minus_segment1({no_pub, no_del, ack}, undefined) ->
+ {undefined, 0};
+journal_minus_segment1({no_pub, del, ack}, undefined) ->
+ {undefined, 0}.
%%----------------------------------------------------------------------------
%% upgrade
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index f32fe74086..21c54f3ebd 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -2393,6 +2393,7 @@ test_variable_queue() ->
fun test_variable_queue_ack_limiting/1,
fun test_variable_queue_purge/1,
fun test_variable_queue_requeue/1,
+ fun test_variable_queue_requeue_ram_beta/1,
fun test_variable_queue_fold/1]],
passed.
@@ -2491,6 +2492,20 @@ test_variable_queue_requeue(VQ0) ->
{empty, VQ3} = rabbit_variable_queue:fetch(true, VQ2),
VQ3.
+%% requeue from ram_pending_ack into q3, move to delta and then empty queue
+test_variable_queue_requeue_ram_beta(VQ0) ->
+ Count = rabbit_queue_index:next_segment_boundary(0)*2 + 2,
+ VQ1 = rabbit_tests:variable_queue_publish(false, Count, VQ0),
+ {VQ2, AcksR} = variable_queue_fetch(Count, false, false, Count, VQ1),
+ {Back, Front} = lists:split(Count div 2, AcksR),
+ {_, VQ3} = rabbit_variable_queue:requeue(erlang:tl(Back), VQ2),
+ VQ4 = rabbit_variable_queue:set_ram_duration_target(0, VQ3),
+ {_, VQ5} = rabbit_variable_queue:requeue([erlang:hd(Back)], VQ4),
+ VQ6 = requeue_one_by_one(Front, VQ5),
+ {VQ7, AcksAll} = variable_queue_fetch(Count, false, true, Count, VQ6),
+ {_, VQ8} = rabbit_variable_queue:ack(AcksAll, VQ7),
+ VQ8.
+
test_variable_queue_purge(VQ0) ->
LenDepth = fun (VQ) ->
{rabbit_variable_queue:len(VQ),
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index f7c6c7295a..5b39c2c61d 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -1363,11 +1363,8 @@ publish_alpha(MsgStatus, State) ->
{MsgStatus, inc_ram_msg_count(State)}.
publish_beta(MsgStatus, State) ->
- {#msg_status { msg = Msg} = MsgStatus1,
- #vqstate { ram_msg_count = RamMsgCount } = State1} =
- maybe_write_to_disk(true, false, MsgStatus, State),
- {MsgStatus1, State1 #vqstate {
- ram_msg_count = RamMsgCount + one_if(Msg =/= undefined) }}.
+ {MsgStatus1, State1} = maybe_write_to_disk(true, false, MsgStatus, State),
+ {m(trim_msg_status(MsgStatus1)), State1}.
%% Rebuild queue, inserting sequence ids to maintain ordering
queue_merge(SeqIds, Q, MsgIds, Limit, PubFun, State) ->
diff --git a/src/rabbit_vm.erl b/src/rabbit_vm.erl
index c28b0cd578..e97824b9ad 100644
--- a/src/rabbit_vm.erl
+++ b/src/rabbit_vm.erl
@@ -99,7 +99,7 @@ bytes(Words) -> Words * erlang:system_info(wordsize).
plugin_sups() ->
lists:append([plugin_sup(App) ||
- {App, _, _} <- application:which_applications(),
+ {App, _, _} <- rabbit_misc:which_applications(),
is_plugin(atom_to_list(App))]).
plugin_sup(App) ->