diff options
| author | Emile Joubert <emile@rabbitmq.com> | 2013-06-12 11:16:10 +0100 |
|---|---|---|
| committer | Emile Joubert <emile@rabbitmq.com> | 2013-06-12 11:16:10 +0100 |
| commit | 6b2571b094a3aaca5dc04cb82870f5aff88d0fba (patch) | |
| tree | 7219162f17c0cc6317e6903c5a02d414e63a721c /src | |
| parent | cd4b0d8276cb1ce29d0f323f0c093ae87d8b43af (diff) | |
| parent | 4a7ee069f49ca0aaac2624fc408c332f17e953a7 (diff) | |
| download | rabbitmq-server-git-6b2571b094a3aaca5dc04cb82870f5aff88d0fba.tar.gz | |
Merged bug25581
Diffstat (limited to 'src')
| -rw-r--r-- | src/app_utils.erl | 2 | ||||
| -rw-r--r-- | src/priority_queue.erl | 2 | ||||
| -rw-r--r-- | src/rabbit.erl | 4 | ||||
| -rw-r--r-- | src/rabbit_access_control.erl | 7 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 24 | ||||
| -rw-r--r-- | src/rabbit_direct.erl | 30 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_misc.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 6 | ||||
| -rw-r--r-- | src/rabbit_misc.erl | 13 | ||||
| -rw-r--r-- | src/rabbit_node_monitor.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_nodes.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_plugins.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_queue_index.erl | 13 | ||||
| -rw-r--r-- | src/rabbit_tests.erl | 15 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 7 | ||||
| -rw-r--r-- | src/rabbit_vm.erl | 2 |
16 files changed, 91 insertions, 42 deletions
diff --git a/src/app_utils.erl b/src/app_utils.erl index 8da436c016..b102ce7578 100644 --- a/src/app_utils.erl +++ b/src/app_utils.erl @@ -93,7 +93,7 @@ app_dependency_order(RootApps, StripUnreachable) -> %% Private API wait_for_application(Application) -> - case lists:keymember(Application, 1, application:which_applications()) of + case lists:keymember(Application, 1, rabbit_misc:which_applications()) of true -> ok; false -> timer:sleep(1000), wait_for_application(Application) diff --git a/src/priority_queue.erl b/src/priority_queue.erl index 0dc198195e..18e1e8d984 100644 --- a/src/priority_queue.erl +++ b/src/priority_queue.erl @@ -51,7 +51,7 @@ -type(q() :: pqueue()). -type(priority() :: integer() | 'infinity'). --type(squeue() :: {queue, [any()], [any()]}). +-type(squeue() :: {queue, [any()], [any()], non_neg_integer()}). -type(pqueue() :: squeue() | {pqueue, [{priority(), squeue()}]}). -spec(new/0 :: () -> pqueue()). diff --git a/src/rabbit.erl b/src/rabbit.erl index 3cfa21ba9b..450a7f32d0 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -393,7 +393,7 @@ await_startup() -> status() -> S1 = [{pid, list_to_integer(os:getpid())}, - {running_applications, application:which_applications(infinity)}, + {running_applications, rabbit_misc:which_applications()}, {os, os:type()}, {erlang_version, erlang:system_info(system_version)}, {memory, rabbit_vm:memory()}], @@ -421,7 +421,7 @@ status() -> is_running() -> is_running(node()). -is_running(Node) -> rabbit_nodes:is_running(Node, rabbit). +is_running(Node) -> rabbit_nodes:is_process_running(Node, rabbit). environment() -> lists:keysort(1, [P || P = {K, _} <- application:get_all_env(rabbit), diff --git a/src/rabbit_access_control.erl b/src/rabbit_access_control.erl index 5b92a5c3f4..b4179ec5aa 100644 --- a/src/rabbit_access_control.erl +++ b/src/rabbit_access_control.erl @@ -68,8 +68,11 @@ check_vhost_access(User = #user{ username = Username, auth_backend = Module }, VHostPath) -> check_access( fun() -> - rabbit_vhost:exists(VHostPath) andalso - Module:check_vhost_access(User, VHostPath) + %% TODO this could be an andalso shortcut under >R13A + case rabbit_vhost:exists(VHostPath) of + false -> false; + true -> Module:check_vhost_access(User, VHostPath) + end end, Module, "access to vhost '~s' refused for user '~s'", [VHostPath, Username]). diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index d2f4a178d5..8441bd3c55 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -898,6 +898,7 @@ make_dead_letter_msg(Msg = #basic_message{content = Content, end, ReasonBin = list_to_binary(atom_to_list(Reason)), TimeSec = rabbit_misc:now_ms() div 1000, + PerMsgTTL = per_msg_ttl_header(Content#content.properties), HeadersFun2 = fun (Headers) -> %% The first routing key is the one specified in the @@ -908,13 +909,25 @@ make_dead_letter_msg(Msg = #basic_message{content = Content, {<<"queue">>, longstr, QName}, {<<"time">>, timestamp, TimeSec}, {<<"exchange">>, longstr, Exchange#resource.name}, - {<<"routing-keys">>, array, RKs1}], + {<<"routing-keys">>, array, RKs1}] ++ PerMsgTTL, HeadersFun1(rabbit_basic:prepend_table_header(<<"x-death">>, Info, Headers)) end, - Content1 = rabbit_basic:map_headers(HeadersFun2, Content), - Msg#basic_message{exchange_name = DLX, id = rabbit_guid:gen(), - routing_keys = DeathRoutingKeys, content = Content1}. + Content1 = #content{properties = Props} = + rabbit_basic:map_headers(HeadersFun2, Content), + Content2 = Content1#content{properties = + Props#'P_basic'{expiration = undefined}}, + Msg#basic_message{exchange_name = DLX, + id = rabbit_guid:gen(), + routing_keys = DeathRoutingKeys, + content = Content2}. + +per_msg_ttl_header(#'P_basic'{expiration = undefined}) -> + []; +per_msg_ttl_header(#'P_basic'{expiration = Expiration}) -> + [{<<"original-expiration">>, longstr, Expiration}]; +per_msg_ttl_header(_) -> + []. now_micros() -> timer:now_diff(now(), {0,0,0}). @@ -1052,7 +1065,8 @@ handle_call({init, Recover}, From, gen_server2:reply(From, not_found), case Recover of new -> rabbit_log:warning( - "Queue ~p exclusive owner went away~n", [QName]); + "Queue ~p exclusive owner went away~n", + [rabbit_misc:rs(QName)]); _ -> ok end, BQS = bq_init(BQ, Q, Recover), diff --git a/src/rabbit_direct.erl b/src/rabbit_direct.erl index 53144f3fa4..769c86c3af 100644 --- a/src/rabbit_direct.erl +++ b/src/rabbit_direct.erl @@ -35,8 +35,10 @@ {rabbit_types:username(), rabbit_types:password()}), rabbit_types:vhost(), rabbit_types:protocol(), pid(), rabbit_event:event_props()) -> - {'ok', {rabbit_types:user(), - rabbit_framing:amqp_table()}}). + rabbit_types:ok_or_error2( + {rabbit_types:user(), rabbit_framing:amqp_table()}, + 'broker_not_found_on_node' | 'auth_failure' | + 'access_refused')). -spec(start_channel/9 :: (rabbit_channel:channel_number(), pid(), pid(), string(), rabbit_types:protocol(), rabbit_types:user(), rabbit_types:vhost(), @@ -76,21 +78,23 @@ connect(User = #user{}, VHost, Protocol, Pid, Infos) -> end; connect({Username, Password}, VHost, Protocol, Pid, Infos) -> - connect0(check_user_pass_login, Username, Password, VHost, Protocol, Pid, - Infos); + connect0(fun () -> rabbit_access_control:check_user_pass_login( + Username, Password) end, + VHost, Protocol, Pid, Infos); connect(Username, VHost, Protocol, Pid, Infos) -> - connect0(check_user_login, Username, [], VHost, Protocol, Pid, Infos). + connect0(fun () -> rabbit_access_control:check_user_login( + Username, []) end, + VHost, Protocol, Pid, Infos). -connect0(FunctionName, U, P, VHost, Protocol, Pid, Infos) -> +connect0(AuthFun, VHost, Protocol, Pid, Infos) -> case rabbit:is_running() of - true -> - case rabbit_access_control:FunctionName(U, P) of - {ok, User} -> connect(User, VHost, Protocol, Pid, Infos); - {refused, _M, _A} -> {error, auth_failure} - end; - false -> - {error, broker_not_found_on_node} + true -> case AuthFun() of + {ok, User} -> connect(User, VHost, Protocol, Pid, + Infos); + {refused, _M, _A} -> {error, auth_failure} + end; + false -> {error, broker_not_found_on_node} end. diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl index 5607bfa9a4..529351a2bd 100644 --- a/src/rabbit_mirror_queue_misc.erl +++ b/src/rabbit_mirror_queue_misc.erl @@ -204,8 +204,6 @@ start_child(Name, MirrorNode, Q) -> report_deaths(_MirrorPid, _IsMaster, _QueueName, []) -> ok; report_deaths(MirrorPid, IsMaster, QueueName, DeadPids) -> - rabbit_event:notify(queue_mirror_deaths, [{name, QueueName}, - {pids, DeadPids}]), rabbit_log:info("Mirrored-queue (~s): ~s ~s saw deaths of mirrors ~s~n", [rabbit_misc:rs(QueueName), case IsMaster of diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index 964b0eb4f9..49ba94eea5 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -41,15 +41,13 @@ %%---------------------------------------------------------------------------- --define(CREATION_EVENT_KEYS, +-define(INFO_KEYS, [pid, name, master_pid, is_synchronised ]). --define(INFO_KEYS, ?CREATION_EVENT_KEYS). - -define(SYNC_INTERVAL, 25). %% milliseconds -define(RAM_DURATION_UPDATE_INTERVAL, 5000). -define(DEATH_TIMEOUT, 20000). %% 20 seconds @@ -124,8 +122,6 @@ init(Q = #amqqueue { name = QName }) -> depth_delta = undefined }, - rabbit_event:notify(queue_slave_created, - infos(?CREATION_EVENT_KEYS, State)), ok = gm:broadcast(GM, request_depth), {ok, State, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index c36fb147c8..a1e95fd5ee 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -61,7 +61,7 @@ -export([multi_call/2]). -export([os_cmd/1]). -export([gb_sets_difference/2]). --export([version/0]). +-export([version/0, which_applications/0]). -export([sequence_error/1]). -export([json_encode/1, json_decode/1, json_to_term/1, term_to_json/1]). -export([check_expiry/1]). @@ -232,6 +232,7 @@ -spec(os_cmd/1 :: (string()) -> string()). -spec(gb_sets_difference/2 :: (gb_set(), gb_set()) -> gb_set()). -spec(version/0 :: () -> string()). +-spec(which_applications/0 :: () -> [{atom(), string(), string()}]). -spec(sequence_error/1 :: ([({'error', any()} | any())]) -> {'error', any()} | any()). -spec(json_encode/1 :: (any()) -> {'ok', string()} | {'error', any()}). @@ -985,6 +986,16 @@ version() -> {ok, VSN} = application:get_key(rabbit, vsn), VSN. +%% application:which_applications(infinity) is dangerous, since it can +%% cause deadlocks on shutdown. So we have to use a timeout variant, +%% but w/o creating spurious timeout errors. +which_applications() -> + try + application:which_applications() + catch + exit:{timeout, _} -> [] + end. + sequence_error([T]) -> T; sequence_error([{error, _} = Error | _]) -> Error; sequence_error([_ | Rest]) -> sequence_error(Rest). diff --git a/src/rabbit_node_monitor.erl b/src/rabbit_node_monitor.erl index 7d844c72d2..7fcd1f9990 100644 --- a/src/rabbit_node_monitor.erl +++ b/src/rabbit_node_monitor.erl @@ -465,4 +465,4 @@ alive_nodes(Nodes) -> [N || N <- Nodes, pong =:= net_adm:ping(N)]. alive_rabbit_nodes() -> alive_rabbit_nodes(rabbit_mnesia:cluster_nodes(all)). alive_rabbit_nodes(Nodes) -> - [N || N <- alive_nodes(Nodes), rabbit_nodes:is_process_running(N, rabbit)]. + [N || N <- alive_nodes(Nodes), rabbit:is_running(N)]. diff --git a/src/rabbit_nodes.erl b/src/rabbit_nodes.erl index 5640f12add..b85646d211 100644 --- a/src/rabbit_nodes.erl +++ b/src/rabbit_nodes.erl @@ -96,7 +96,7 @@ cookie_hash() -> base64:encode_to_string(erlang:md5(atom_to_list(erlang:get_cookie()))). is_running(Node, Application) -> - case rpc:call(Node, application, which_applications, [infinity]) of + case rpc:call(Node, rabbit_misc, which_applications, []) of {badrpc, _} -> false; Apps -> proplists:is_defined(Application, Apps) end. diff --git a/src/rabbit_plugins.erl b/src/rabbit_plugins.erl index 58c906eb0a..6f6515b0e7 100644 --- a/src/rabbit_plugins.erl +++ b/src/rabbit_plugins.erl @@ -47,7 +47,7 @@ setup() -> active() -> {ok, ExpandDir} = application:get_env(rabbit, plugins_expand_dir), InstalledPlugins = [ P#plugin.name || P <- list(ExpandDir) ], - [App || {App, _, _} <- application:which_applications(), + [App || {App, _, _} <- rabbit_misc:which_applications(), lists:member(App, InstalledPlugins)]. %% @doc Get the list of plugins which are ready to be enabled. diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index 847d39c143..956412566a 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -1027,7 +1027,18 @@ journal_minus_segment1({no_pub, del, ack}, {?PUB, no_del, no_ack}) -> journal_minus_segment1({no_pub, del, ack}, {?PUB, del, no_ack}) -> {{no_pub, no_del, ack}, 0}; journal_minus_segment1({no_pub, del, ack}, {?PUB, del, ack}) -> - {undefined, -1}. + {undefined, -1}; + +%% Missing segment. If flush_journal/1 is interrupted after deleting +%% the segment but before truncating the journal we can get these +%% cases: a delivery and an acknowledgement in the journal, or just an +%% acknowledgement in the journal, but with no segment. In both cases +%% we have really forgotten the message; so ignore what's in the +%% journal. +journal_minus_segment1({no_pub, no_del, ack}, undefined) -> + {undefined, 0}; +journal_minus_segment1({no_pub, del, ack}, undefined) -> + {undefined, 0}. %%---------------------------------------------------------------------------- %% upgrade diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index f32fe74086..21c54f3ebd 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -2393,6 +2393,7 @@ test_variable_queue() -> fun test_variable_queue_ack_limiting/1, fun test_variable_queue_purge/1, fun test_variable_queue_requeue/1, + fun test_variable_queue_requeue_ram_beta/1, fun test_variable_queue_fold/1]], passed. @@ -2491,6 +2492,20 @@ test_variable_queue_requeue(VQ0) -> {empty, VQ3} = rabbit_variable_queue:fetch(true, VQ2), VQ3. +%% requeue from ram_pending_ack into q3, move to delta and then empty queue +test_variable_queue_requeue_ram_beta(VQ0) -> + Count = rabbit_queue_index:next_segment_boundary(0)*2 + 2, + VQ1 = rabbit_tests:variable_queue_publish(false, Count, VQ0), + {VQ2, AcksR} = variable_queue_fetch(Count, false, false, Count, VQ1), + {Back, Front} = lists:split(Count div 2, AcksR), + {_, VQ3} = rabbit_variable_queue:requeue(erlang:tl(Back), VQ2), + VQ4 = rabbit_variable_queue:set_ram_duration_target(0, VQ3), + {_, VQ5} = rabbit_variable_queue:requeue([erlang:hd(Back)], VQ4), + VQ6 = requeue_one_by_one(Front, VQ5), + {VQ7, AcksAll} = variable_queue_fetch(Count, false, true, Count, VQ6), + {_, VQ8} = rabbit_variable_queue:ack(AcksAll, VQ7), + VQ8. + test_variable_queue_purge(VQ0) -> LenDepth = fun (VQ) -> {rabbit_variable_queue:len(VQ), diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index f7c6c7295a..5b39c2c61d 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -1363,11 +1363,8 @@ publish_alpha(MsgStatus, State) -> {MsgStatus, inc_ram_msg_count(State)}. publish_beta(MsgStatus, State) -> - {#msg_status { msg = Msg} = MsgStatus1, - #vqstate { ram_msg_count = RamMsgCount } = State1} = - maybe_write_to_disk(true, false, MsgStatus, State), - {MsgStatus1, State1 #vqstate { - ram_msg_count = RamMsgCount + one_if(Msg =/= undefined) }}. + {MsgStatus1, State1} = maybe_write_to_disk(true, false, MsgStatus, State), + {m(trim_msg_status(MsgStatus1)), State1}. %% Rebuild queue, inserting sequence ids to maintain ordering queue_merge(SeqIds, Q, MsgIds, Limit, PubFun, State) -> diff --git a/src/rabbit_vm.erl b/src/rabbit_vm.erl index c28b0cd578..e97824b9ad 100644 --- a/src/rabbit_vm.erl +++ b/src/rabbit_vm.erl @@ -99,7 +99,7 @@ bytes(Words) -> Words * erlang:system_info(wordsize). plugin_sups() -> lists:append([plugin_sup(App) || - {App, _, _} <- application:which_applications(), + {App, _, _} <- rabbit_misc:which_applications(), is_plugin(atom_to_list(App))]). plugin_sup(App) -> |
