diff options
| author | Matthew Sackman <matthew@lshift.net> | 2009-07-02 15:38:36 +0100 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2009-07-02 15:38:36 +0100 |
| commit | 2de81da3082a0f9010c7e89001ea3df91d21da84 (patch) | |
| tree | ba69d21fddddfd313716bab94aaa9c93083bb38a /src | |
| parent | 99f80aea7d1afe8c0137dbbd3228f370cf82a91c (diff) | |
| parent | 010fc2a0cc744109eef8d6e8be34297518df5d4d (diff) | |
| download | rabbitmq-server-git-2de81da3082a0f9010c7e89001ea3df91d21da84.tar.gz | |
merge in from 21087. Behaviour is now broken because the timeout can get > 10seconds which means the memory_report timer will always fire and reset the timeout - thus the queue process will never hibernate.
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_access_control.erl | 4 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 53 | ||||
| -rw-r--r-- | src/rabbit_basic.erl | 71 | ||||
| -rw-r--r-- | src/rabbit_error_logger.erl | 10 | ||||
| -rw-r--r-- | src/rabbit_exchange.erl | 24 | ||||
| -rw-r--r-- | src/rabbit_log.erl | 12 | ||||
| -rw-r--r-- | src/rabbit_reader.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_tests.erl | 12 |
8 files changed, 132 insertions, 56 deletions
diff --git a/src/rabbit_access_control.erl b/src/rabbit_access_control.erl index 99b912ec09..6ff7a1046c 100644 --- a/src/rabbit_access_control.erl +++ b/src/rabbit_access_control.erl @@ -245,8 +245,8 @@ add_vhost(VHostPath) -> [{<<"">>, direct}, {<<"amq.direct">>, direct}, {<<"amq.topic">>, topic}, - {<<"amq.match">>, headers}, %% per 0-9-1 pdf - {<<"amq.headers">>, headers}, %% per 0-9-1 xml + {<<"amq.match">>, headers}, %% per 0-9-1 pdf + {<<"amq.headers">>, headers}, %% per 0-9-1 xml {<<"amq.fanout">>, fanout}]], ok; [_] -> diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 2bd170a265..e847b34c6c 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -36,7 +36,7 @@ -behaviour(gen_server2). -define(UNSENT_MESSAGE_LIMIT, 100). --define(HIBERNATE_AFTER, 1000). +-define(HIBERNATE_AFTER_MIN, 1000). -define(MEMORY_REPORT_TIME_INTERVAL, 10000). %% 10 seconds in milliseconds -export([start_link/1]). @@ -57,7 +57,9 @@ next_msg_id, active_consumers, blocked_consumers, - memory_report_timer + memory_report_timer, + hibernate_after, + hibernated_at }). -record(consumer, {tag, ack_required}). @@ -110,8 +112,10 @@ init(Q = #amqqueue { name = QName, durable = Durable }) -> next_msg_id = 1, active_consumers = queue:new(), blocked_consumers = queue:new(), - memory_report_timer = start_memory_timer() - }, ?HIBERNATE_AFTER}. + memory_report_timer = start_memory_timer(), + hibernate_after = ?HIBERNATE_AFTER_MIN, + hibernated_at = undefined + }, ?HIBERNATE_AFTER_MIN}. terminate(_Reason, State) -> %% FIXME: How do we cancel active subscriptions? @@ -130,14 +134,44 @@ code_change(_OldVsn, State, _Extra) -> %%---------------------------------------------------------------------------- reply(Reply, NewState = #q { memory_report_timer = undefined }) -> - {reply, Reply, start_memory_timer(NewState), ?HIBERNATE_AFTER}; + reply(Reply, start_memory_timer(NewState)); +reply(Reply, NewState = #q { hibernated_at = undefined }) -> + {reply, Reply, NewState, NewState #q.hibernate_after}; reply(Reply, NewState) -> - {reply, Reply, NewState, ?HIBERNATE_AFTER}. + NewState1 = adjust_hibernate_after(NewState), + {reply, Reply, NewState1, NewState1 #q.hibernate_after}. noreply(NewState = #q { memory_report_timer = undefined }) -> - {noreply, start_memory_timer(NewState), ?HIBERNATE_AFTER}; + noreply(start_memory_timer(NewState)); +noreply(NewState = #q { hibernated_at = undefined }) -> + {noreply, NewState, NewState #q.hibernate_after}; noreply(NewState) -> - {noreply, NewState, ?HIBERNATE_AFTER}. + NewState1 = adjust_hibernate_after(NewState), + {noreply, NewState1, NewState1 #q.hibernate_after}. + +adjust_hibernate_after(State = #q { hibernated_at = undefined }) -> + State; +adjust_hibernate_after(State = #q { hibernated_at = Then, + hibernate_after = Timeout }) -> + State1 = State #q { hibernated_at = undefined }, + NapLengthMicros = timer:now_diff(now(), Then), + TimeoutMicros = Timeout * 1000, + LowTargetMicros = TimeoutMicros * 4, + HighTargetMicros = LowTargetMicros * 4, + if + NapLengthMicros < LowTargetMicros -> + %% nap was too short, don't go to sleep as soon + State1 #q { hibernate_after = Timeout * 2 }; + + NapLengthMicros > HighTargetMicros -> + %% nap was long, try going to sleep sooner + Timeout1 = lists:max([?HIBERNATE_AFTER_MIN, round(Timeout / 2)]), + State1 #q { hibernate_after = Timeout1 }; + + true -> + %% nap and timeout seem to be in the right relationship. stay here + State1 + end. start_memory_timer() -> {ok, TRef} = timer:apply_interval(?MEMORY_REPORT_TIME_INTERVAL, @@ -861,7 +895,8 @@ handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, State) -> handle_info(timeout, State) -> %% TODO: Once we drop support for R11B-5, we can change this to %% {noreply, State, hibernate}; - State1 = stop_memory_timer(report_memory(State)), + State1 = (stop_memory_timer(report_memory(State))) + #q { hibernated_at = now() }, proc_lib:hibernate(gen_server2, enter_loop, [?MODULE, [], State1]); handle_info(Info, State) -> diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl index 63d6a4815d..f70d6067b3 100644 --- a/src/rabbit_basic.erl +++ b/src/rabbit_basic.erl @@ -34,20 +34,29 @@ -include("rabbit_framing.hrl"). -export([publish/1, message/4, message/5, message/6, delivery/4]). +-export([properties/1, publish/4, publish/7]). %%---------------------------------------------------------------------------- -ifdef(use_specs). --spec(publish/1 :: (delivery()) -> - {ok, routing_result(), [pid()]} | not_found()). +-type(properties_input() :: (amqp_properties() | [{atom(), any()}])). +-type(publish_result() :: ({ok, routing_result(), [pid()]} | not_found())). + +-spec(publish/1 :: (delivery()) -> publish_result()). -spec(delivery/4 :: (bool(), bool(), maybe(txn()), message()) -> delivery()). --spec(message/4 :: (exchange_name(), routing_key(), binary(), binary()) -> - message()). --spec(message/5 :: (exchange_name(), routing_key(), binary(), binary(), - guid()) -> message()). --spec(message/6 :: (exchange_name(), routing_key(), binary(), binary(), - guid(), bool()) -> message()). +-spec(message/4 :: (exchange_name(), routing_key(), properties_input(), + binary()) -> message()). +-spec(message/5 :: (exchange_name(), routing_key(), properties_input(), + binary(), guid()) -> message()). +-spec(message/6 :: (exchange_name(), routing_key(), properties_input(), + binary(), guid(), bool()) -> message()). +-spec(properties/1 :: (properties_input()) -> amqp_properties()). +-spec(publish/4 :: (exchange_name(), routing_key(), properties_input(), + binary()) -> publish_result()). +-spec(publish/7 :: (exchange_name(), routing_key(), bool(), bool(), + maybe(txn()), properties_input(), binary()) -> + publish_result()). -endif. @@ -67,16 +76,17 @@ delivery(Mandatory, Immediate, Txn, Message) -> #delivery{mandatory = Mandatory, immediate = Immediate, txn = Txn, sender = self(), message = Message}. -message(ExchangeName, RoutingKeyBin, ContentTypeBin, BodyBin) -> - message(ExchangeName, RoutingKeyBin, ContentTypeBin, BodyBin, rabbit_guid:guid()). +message(ExchangeName, RoutingKeyBin, RawProperties, BodyBin) -> + message(ExchangeName, RoutingKeyBin, RawProperties, BodyBin, rabbit_guid:guid()). -message(ExchangeName, RoutingKeyBin, ContentTypeBin, BodyBin, MsgId) -> - message(ExchangeName, RoutingKeyBin, ContentTypeBin, BodyBin, MsgId, false). +message(ExchangeName, RoutingKeyBin, RawProperties, BodyBin, MsgId) -> + message(ExchangeName, RoutingKeyBin, RawProperties, BodyBin, MsgId, false). -message(ExchangeName, RoutingKeyBin, ContentTypeBin, BodyBin, MsgId, IsPersistent) -> +message(ExchangeName, RoutingKeyBin, RawProperties, BodyBin, MsgId, IsPersistent) -> + Properties = properties(RawProperties), {ClassId, _MethodId} = rabbit_framing:method_id('basic.publish'), Content = #content{class_id = ClassId, - properties = #'P_basic'{content_type = ContentTypeBin}, + properties = Properties, properties_bin = none, payload_fragments_rev = [BodyBin]}, #basic_message{exchange_name = ExchangeName, @@ -84,3 +94,36 @@ message(ExchangeName, RoutingKeyBin, ContentTypeBin, BodyBin, MsgId, IsPersisten content = Content, guid = MsgId, is_persistent = IsPersistent}. + +properties(P = #'P_basic'{}) -> + P; +properties(P) when is_list(P) -> + %% Yes, this is O(length(P) * record_info(size, 'P_basic') / 2), + %% i.e. slow. Use the definition of 'P_basic' directly if + %% possible! + lists:foldl(fun ({Key, Value}, Acc) -> + case indexof(record_info(fields, 'P_basic'), Key) of + 0 -> throw({unknown_basic_property, Key}); + N -> setelement(N + 1, Acc, Value) + end + end, #'P_basic'{}, P). + +indexof(L, Element) -> indexof(L, Element, 1). + +indexof([], _Element, _N) -> 0; +indexof([Element | _Rest], Element, N) -> N; +indexof([_ | Rest], Element, N) -> indexof(Rest, Element, N + 1). + +%% Convenience function, for avoiding round-trips in calls across the +%% erlang distributed network. +publish(ExchangeName, RoutingKeyBin, Properties, BodyBin) -> + publish(ExchangeName, RoutingKeyBin, false, false, none, Properties, + BodyBin). + +%% Convenience function, for avoiding round-trips in calls across the +%% erlang distributed network. +publish(ExchangeName, RoutingKeyBin, Mandatory, Immediate, Txn, Properties, + BodyBin) -> + publish(delivery(Mandatory, Immediate, Txn, + message(ExchangeName, RoutingKeyBin, + properties(Properties), BodyBin))). diff --git a/src/rabbit_error_logger.erl b/src/rabbit_error_logger.erl index 76016a8cb2..b28574b707 100644 --- a/src/rabbit_error_logger.erl +++ b/src/rabbit_error_logger.erl @@ -31,6 +31,7 @@ -module(rabbit_error_logger). -include("rabbit.hrl"). +-include("rabbit_framing.hrl"). -define(LOG_EXCH_NAME, <<"amq.rabbitmq.log">>). @@ -75,10 +76,7 @@ publish(_Other, _Format, _Data, _State) -> publish1(RoutingKey, Format, Data, LogExch) -> {ok, _RoutingRes, _DeliveredQPids} = - rabbit_basic:publish( - rabbit_basic:delivery( - false, false, none, - rabbit_basic:message( - LogExch, RoutingKey, <<"text/plain">>, - list_to_binary(io_lib:format(Format, Data))))), + rabbit_basic:publish(LogExch, RoutingKey, false, false, none, + #'P_basic'{content_type = <<"text/plain">>}, + list_to_binary(io_lib:format(Format, Data))), ok. diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index 7d9948f06f..8fb9eae304 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -235,9 +235,9 @@ route(X = #exchange{type = topic}, RoutingKey, _Content) -> route(X = #exchange{type = headers}, _RoutingKey, Content) -> Headers = case (Content#content.properties)#'P_basic'.headers of - undefined -> []; - H -> sort_arguments(H) - end, + undefined -> []; + H -> sort_arguments(H) + end, match_bindings(X, fun (#binding{args = Spec}) -> headers_match(Spec, Headers) end); @@ -489,14 +489,14 @@ parse_x_match(Other) -> %% headers_match(Pattern, Data) -> MatchKind = case lists:keysearch(<<"x-match">>, 1, Pattern) of - {value, {_, longstr, MK}} -> parse_x_match(MK); - {value, {_, Type, MK}} -> - rabbit_log:warning("Invalid x-match field type ~p " + {value, {_, longstr, MK}} -> parse_x_match(MK); + {value, {_, Type, MK}} -> + rabbit_log:warning("Invalid x-match field type ~p " "(value ~p); expected longstr", - [Type, MK]), - default_headers_match_kind(); - _ -> default_headers_match_kind() - end, + [Type, MK]), + default_headers_match_kind(); + _ -> default_headers_match_kind() + end, headers_match(Pattern, Data, true, false, MatchKind). headers_match([], _Data, AllMatch, _AnyMatch, all) -> @@ -523,8 +523,8 @@ headers_match([{PK, PT, PV} | PRest], [{DK, DT, DV} | DRest], %% the corresponding data field. I've interpreted that to %% mean a type of "void" for the pattern field. PT == void -> {AllMatch, true}; - %% Similarly, it's not specified, but I assume that a - %% mismatched type causes a mismatched value. + %% Similarly, it's not specified, but I assume that a + %% mismatched type causes a mismatched value. PT =/= DT -> {false, AnyMatch}; PV == DV -> {AllMatch, true}; true -> {false, AnyMatch} diff --git a/src/rabbit_log.erl b/src/rabbit_log.erl index f408336e94..dd5b498b07 100644 --- a/src/rabbit_log.erl +++ b/src/rabbit_log.erl @@ -75,7 +75,7 @@ debug(Fmt, Args) when is_list(Args) -> message(Direction, Channel, MethodRecord, Content) -> gen_server:cast(?SERVER, - {message, Direction, Channel, MethodRecord, Content}). + {message, Direction, Channel, MethodRecord, Content}). info(Fmt) -> gen_server:cast(?SERVER, {info, Fmt}). @@ -112,11 +112,11 @@ handle_cast({debug, Fmt, Args}, State) -> {noreply, State}; handle_cast({message, Direction, Channel, MethodRecord, Content}, State) -> io:format("~s ch~p ~p~n", - [case Direction of - in -> "-->"; - out -> "<--" end, - Channel, - {MethodRecord, Content}]), + [case Direction of + in -> "-->"; + out -> "<--" end, + Channel, + {MethodRecord, Content}]), {noreply, State}; handle_cast({info, Fmt}, State) -> error_logger:info_msg(Fmt), diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index ef8038e7aa..426b99eba1 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -231,7 +231,7 @@ start_connection(Parent, Deb, ClientSock) -> connection_state = pre_init}, handshake, 8)) catch - Ex -> (if Ex == connection_closed_abruptly -> + Ex -> (if Ex == connection_closed_abruptly -> fun rabbit_log:warning/2; true -> fun rabbit_log:error/2 diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 05a6393b23..0b70be0c9b 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -716,7 +716,7 @@ benchmark_disk_queue() -> passed. rdq_message(MsgId, MsgBody) -> - rabbit_basic:message(x, <<>>, <<>>, MsgBody, MsgId). + rabbit_basic:message(x, <<>>, [], MsgBody, MsgId). rdq_match_message( #basic_message { guid = MsgId, content = @@ -976,20 +976,20 @@ rdq_test_mixed_queue_modes() -> {ok, MS} = rabbit_mixed_queue:init(q, true, mixed), MS2 = lists:foldl( fun (_N, MS1) -> - Msg = rabbit_basic:message(x, <<>>, <<>>, Payload), + Msg = rabbit_basic:message(x, <<>>, [], Payload), {ok, MS1a} = rabbit_mixed_queue:publish(Msg, MS1), MS1a end, MS, lists:seq(1,10)), MS4 = lists:foldl( fun (_N, MS3) -> - Msg = (rabbit_basic:message(x, <<>>, <<>>, Payload)) + Msg = (rabbit_basic:message(x, <<>>, [], Payload)) #basic_message { is_persistent = true }, {ok, MS3a} = rabbit_mixed_queue:publish(Msg, MS3), MS3a end, MS2, lists:seq(1,10)), MS6 = lists:foldl( fun (_N, MS5) -> - Msg = rabbit_basic:message(x, <<>>, <<>>, Payload), + Msg = rabbit_basic:message(x, <<>>, [], Payload), {ok, MS5a} = rabbit_mixed_queue:publish(Msg, MS5), MS5a end, MS4, lists:seq(1,10)), @@ -1050,11 +1050,11 @@ rdq_test_mixed_queue_modes() -> rdq_test_mode_conversion_mid_txn() -> Payload = <<0:(8*256)>>, MsgIdsA = lists:seq(0,9), - MsgsA = [ rabbit_basic:message(x, <<>>, <<>>, Payload, MsgId, + MsgsA = [ rabbit_basic:message(x, <<>>, [], Payload, MsgId, (0 == MsgId rem 2)) || MsgId <- MsgIdsA ], MsgIdsB = lists:seq(10,20), - MsgsB = [ rabbit_basic:message(x, <<>>, <<>>, Payload, MsgId, + MsgsB = [ rabbit_basic:message(x, <<>>, [], Payload, MsgId, (0 == MsgId rem 2)) || MsgId <- MsgIdsB ], |
