diff options
| author | Matthew Sackman <matthew@rabbitmq.com> | 2010-10-13 14:47:06 +0100 |
|---|---|---|
| committer | Matthew Sackman <matthew@rabbitmq.com> | 2010-10-13 14:47:06 +0100 |
| commit | 3eed0f099b5753d5a8314d01f8ec030bb884a5b2 (patch) | |
| tree | 93ce835953679853947729a4672c55da29afec78 /src | |
| parent | d6b780f02a700e341295dd17013d3de37c460e13 (diff) | |
| parent | f2bc8a9f1da01ef620ad14d2d56b76c7e8be7e7d (diff) | |
| download | rabbitmq-server-git-3eed0f099b5753d5a8314d01f8ec030bb884a5b2.tar.gz | |
Merging bug 23159 into default
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_amqqueue.erl | 3 | ||||
| -rw-r--r-- | src/rabbit_binary_generator.erl | 47 | ||||
| -rw-r--r-- | src/rabbit_channel.erl | 14 | ||||
| -rw-r--r-- | src/rabbit_mnesia.erl | 9 | ||||
| -rw-r--r-- | src/rabbit_reader.erl | 43 | ||||
| -rw-r--r-- | src/rabbit_router.erl | 17 |
6 files changed, 72 insertions, 61 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 4e82ac9ba1..e8730b030e 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -479,9 +479,8 @@ on_node_down(Node) -> ok. delete_queue(QueueName) -> - Post = rabbit_binding:remove_transient_for_queue(QueueName), ok = mnesia:delete({rabbit_queue, QueueName}), - Post. + rabbit_binding:remove_transient_for_queue(QueueName). pseudo_queue(QueueName, Pid) -> #amqqueue{name = QueueName, diff --git a/src/rabbit_binary_generator.erl b/src/rabbit_binary_generator.erl index 056ab1b574..722573c769 100644 --- a/src/rabbit_binary_generator.erl +++ b/src/rabbit_binary_generator.erl @@ -47,6 +47,7 @@ -export([generate_table/1, encode_properties/2]). -export([check_empty_content_body_frame_size/0]). -export([ensure_content_encoded/2, clear_encoded_content/1]). +-export([map_exception/3]). -import(lists). @@ -74,6 +75,9 @@ rabbit_types:encoded_content()). -spec(clear_encoded_content/1 :: (rabbit_types:content()) -> rabbit_types:unencoded_content()). +-spec(map_exception/3 :: (non_neg_integer(), rabbit_types:amqp_error(), + rabbit_types:protocol()) -> + {boolean(), non_neg_integer(), rabbit_framing:amqp_method()}). -endif. @@ -306,3 +310,46 @@ clear_encoded_content(Content = #content{properties = none}) -> Content; clear_encoded_content(Content = #content{}) -> Content#content{properties_bin = none, protocol = none}. + +%% NB: this function is also used by the Erlang client +map_exception(Channel, Reason, Protocol) -> + {SuggestedClose, ReplyCode, ReplyText, FailedMethod} = + lookup_amqp_exception(Reason, Protocol), + ShouldClose = SuggestedClose orelse (Channel == 0), + {ClassId, MethodId} = case FailedMethod of + {_, _} -> FailedMethod; + none -> {0, 0}; + _ -> Protocol:method_id(FailedMethod) + end, + {CloseChannel, CloseMethod} = + case ShouldClose of + true -> {0, #'connection.close'{reply_code = ReplyCode, + reply_text = ReplyText, + class_id = ClassId, + method_id = MethodId}}; + false -> {Channel, #'channel.close'{reply_code = ReplyCode, + reply_text = ReplyText, + class_id = ClassId, + method_id = MethodId}} + end, + {ShouldClose, CloseChannel, CloseMethod}. + +lookup_amqp_exception(#amqp_error{name = Name, + explanation = Expl, + method = Method}, + Protocol) -> + {ShouldClose, Code, Text} = Protocol:lookup_amqp_exception(Name), + ExplBin = amqp_exception_explanation(Text, Expl), + {ShouldClose, Code, ExplBin, Method}; +lookup_amqp_exception(Other, Protocol) -> + rabbit_log:warning("Non-AMQP exit reason '~p'~n", [Other]), + {ShouldClose, Code, Text} = + Protocol:lookup_amqp_exception(internal_error, Protocol), + {ShouldClose, Code, Text, none}. + +amqp_exception_explanation(Text, Expl) -> + ExplBin = list_to_binary(Expl), + CompleteTextBin = <<Text/binary, " - ", ExplBin/binary>>, + if size(CompleteTextBin) > 255 -> <<CompleteTextBin:252/binary, "...">>; + true -> CompleteTextBin + end. diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index fe36cef923..64f84f348e 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -877,14 +877,14 @@ handle_method(#'channel.flow'{active = false}, _, undefined -> start_limiter(State); Other -> Other end, + State1 = State#ch{limiter_pid = LimiterPid1}, ok = rabbit_limiter:block(LimiterPid1), - QPids = consumer_queues(Consumers), - Queues = [{QPid, erlang:monitor(process, QPid)} || QPid <- QPids], - ok = rabbit_amqqueue:flush_all(QPids, self()), - case Queues of - [] -> {reply, #'channel.flow_ok'{active = false}, State}; - _ -> {noreply, State#ch{limiter_pid = LimiterPid1, - blocking = dict:from_list(Queues)}} + case consumer_queues(Consumers) of + [] -> {reply, #'channel.flow_ok'{active = false}, State1}; + QPids -> Queues = [{QPid, erlang:monitor(process, QPid)} || + QPid <- QPids], + ok = rabbit_amqqueue:flush_all(QPids, self()), + {noreply, State1#ch{blocking = dict:from_list(Queues)}} end; handle_method(_MethodRecord, _Content, _State) -> diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index a321488897..d35adf1628 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -44,6 +44,9 @@ -include("rabbit.hrl"). +-define(SCHEMA_VERSION_SET, []). +-define(SCHEMA_VERSION_FILENAME, "schema_version"). + %%---------------------------------------------------------------------------- -ifdef(use_specs). @@ -91,6 +94,9 @@ init() -> ok = ensure_mnesia_running(), ok = ensure_mnesia_dir(), ok = init_db(read_cluster_nodes_config(), true), + ok = rabbit_misc:write_term_file(filename:join( + dir(), ?SCHEMA_VERSION_FILENAME), + [?SCHEMA_VERSION_SET]), ok. is_db_empty() -> @@ -241,7 +247,8 @@ ensure_mnesia_dir() -> case filelib:ensure_dir(MnesiaDir) of {error, Reason} -> throw({error, {cannot_create_mnesia_dir, MnesiaDir, Reason}}); - ok -> ok + ok -> + ok end. ensure_mnesia_running() -> diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index ff0fb8f778..e500b111b8 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -906,7 +906,7 @@ handle_exception(State = #v1{connection_state = CS}, Channel, Reason) -> send_exception(State = #v1{connection = #connection{protocol = Protocol}}, Channel, Reason) -> {ShouldClose, CloseChannel, CloseMethod} = - map_exception(Channel, Reason, Protocol), + rabbit_binary_generator:map_exception(Channel, Reason, Protocol), NewState = case ShouldClose of true -> terminate_channels(), close_connection(State); @@ -916,47 +916,6 @@ send_exception(State = #v1{connection = #connection{protocol = Protocol}}, NewState#v1.sock, CloseChannel, CloseMethod, Protocol), NewState. -map_exception(Channel, Reason, Protocol) -> - {SuggestedClose, ReplyCode, ReplyText, FailedMethod} = - lookup_amqp_exception(Reason, Protocol), - ShouldClose = SuggestedClose or (Channel == 0), - {ClassId, MethodId} = case FailedMethod of - {_, _} -> FailedMethod; - none -> {0, 0}; - _ -> Protocol:method_id(FailedMethod) - end, - {CloseChannel, CloseMethod} = - case ShouldClose of - true -> {0, #'connection.close'{reply_code = ReplyCode, - reply_text = ReplyText, - class_id = ClassId, - method_id = MethodId}}; - false -> {Channel, #'channel.close'{reply_code = ReplyCode, - reply_text = ReplyText, - class_id = ClassId, - method_id = MethodId}} - end, - {ShouldClose, CloseChannel, CloseMethod}. - -lookup_amqp_exception(#amqp_error{name = Name, - explanation = Expl, - method = Method}, - Protocol) -> - {ShouldClose, Code, Text} = Protocol:lookup_amqp_exception(Name), - ExplBin = amqp_exception_explanation(Text, Expl), - {ShouldClose, Code, ExplBin, Method}; -lookup_amqp_exception(Other, Protocol) -> - rabbit_log:warning("Non-AMQP exit reason '~p'~n", [Other]), - {ShouldClose, Code, Text} = Protocol:lookup_amqp_exception(internal_error), - {ShouldClose, Code, Text, none}. - -amqp_exception_explanation(Text, Expl) -> - ExplBin = list_to_binary(Expl), - CompleteTextBin = <<Text/binary, " - ", ExplBin/binary>>, - if size(CompleteTextBin) > 255 -> <<CompleteTextBin:252/binary, "...">>; - true -> CompleteTextBin - end. - internal_emit_stats(State = #v1{stats_timer = StatsTimer}) -> rabbit_event:notify(connection_stats, infos(?STATISTICS_KEYS, State)), State#v1{stats_timer = rabbit_event:reset_stats_timer(StatsTimer)}. diff --git a/src/rabbit_router.erl b/src/rabbit_router.erl index bd57f73726..39eac07272 100644 --- a/src/rabbit_router.erl +++ b/src/rabbit_router.erl @@ -99,15 +99,6 @@ match_routing_key(Name, RoutingKey) -> _ = '_'}}, lookup_qpids(mnesia:dirty_select(rabbit_route, [{MatchHead, [], ['$1']}])). -lookup_qpids(Queues) -> - lists:foldl( - fun (Key, Acc) -> - case mnesia:dirty_read({rabbit_queue, Key}) of - [#amqqueue{pid = QPid}] -> [QPid | Acc]; - [] -> Acc - end - end, [], lists:usort(Queues)). - %%-------------------------------------------------------------------- fold_deliveries({Pid, true},{_, Handled}) -> {true, [Pid|Handled]}; @@ -117,3 +108,11 @@ fold_deliveries({_, false},{_, Handled}) -> {true, Handled}. check_delivery(true, _ , {false, []}) -> {unroutable, []}; check_delivery(_ , true, {_ , []}) -> {not_delivered, []}; check_delivery(_ , _ , {_ , Qs}) -> {routed, Qs}. + +lookup_qpids(QNames) -> + lists:foldl(fun (QName, QPids) -> + case mnesia:dirty_read({rabbit_queue, QName}) of + [#amqqueue{pid = QPid}] -> [QPid | QPids]; + [] -> QPids + end + end, [], lists:usort(QNames)). |
