summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2009-07-02 15:38:36 +0100
committerMatthew Sackman <matthew@lshift.net>2009-07-02 15:38:36 +0100
commit2de81da3082a0f9010c7e89001ea3df91d21da84 (patch)
treeba69d21fddddfd313716bab94aaa9c93083bb38a /src
parent99f80aea7d1afe8c0137dbbd3228f370cf82a91c (diff)
parent010fc2a0cc744109eef8d6e8be34297518df5d4d (diff)
downloadrabbitmq-server-git-2de81da3082a0f9010c7e89001ea3df91d21da84.tar.gz
merge in from 21087. Behaviour is now broken because the timeout can get > 10seconds which means the memory_report timer will always fire and reset the timeout - thus the queue process will never hibernate.
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_access_control.erl4
-rw-r--r--src/rabbit_amqqueue_process.erl53
-rw-r--r--src/rabbit_basic.erl71
-rw-r--r--src/rabbit_error_logger.erl10
-rw-r--r--src/rabbit_exchange.erl24
-rw-r--r--src/rabbit_log.erl12
-rw-r--r--src/rabbit_reader.erl2
-rw-r--r--src/rabbit_tests.erl12
8 files changed, 132 insertions, 56 deletions
diff --git a/src/rabbit_access_control.erl b/src/rabbit_access_control.erl
index 99b912ec09..6ff7a1046c 100644
--- a/src/rabbit_access_control.erl
+++ b/src/rabbit_access_control.erl
@@ -245,8 +245,8 @@ add_vhost(VHostPath) ->
[{<<"">>, direct},
{<<"amq.direct">>, direct},
{<<"amq.topic">>, topic},
- {<<"amq.match">>, headers}, %% per 0-9-1 pdf
- {<<"amq.headers">>, headers}, %% per 0-9-1 xml
+ {<<"amq.match">>, headers}, %% per 0-9-1 pdf
+ {<<"amq.headers">>, headers}, %% per 0-9-1 xml
{<<"amq.fanout">>, fanout}]],
ok;
[_] ->
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 2bd170a265..e847b34c6c 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -36,7 +36,7 @@
-behaviour(gen_server2).
-define(UNSENT_MESSAGE_LIMIT, 100).
--define(HIBERNATE_AFTER, 1000).
+-define(HIBERNATE_AFTER_MIN, 1000).
-define(MEMORY_REPORT_TIME_INTERVAL, 10000). %% 10 seconds in milliseconds
-export([start_link/1]).
@@ -57,7 +57,9 @@
next_msg_id,
active_consumers,
blocked_consumers,
- memory_report_timer
+ memory_report_timer,
+ hibernate_after,
+ hibernated_at
}).
-record(consumer, {tag, ack_required}).
@@ -110,8 +112,10 @@ init(Q = #amqqueue { name = QName, durable = Durable }) ->
next_msg_id = 1,
active_consumers = queue:new(),
blocked_consumers = queue:new(),
- memory_report_timer = start_memory_timer()
- }, ?HIBERNATE_AFTER}.
+ memory_report_timer = start_memory_timer(),
+ hibernate_after = ?HIBERNATE_AFTER_MIN,
+ hibernated_at = undefined
+ }, ?HIBERNATE_AFTER_MIN}.
terminate(_Reason, State) ->
%% FIXME: How do we cancel active subscriptions?
@@ -130,14 +134,44 @@ code_change(_OldVsn, State, _Extra) ->
%%----------------------------------------------------------------------------
reply(Reply, NewState = #q { memory_report_timer = undefined }) ->
- {reply, Reply, start_memory_timer(NewState), ?HIBERNATE_AFTER};
+ reply(Reply, start_memory_timer(NewState));
+reply(Reply, NewState = #q { hibernated_at = undefined }) ->
+ {reply, Reply, NewState, NewState #q.hibernate_after};
reply(Reply, NewState) ->
- {reply, Reply, NewState, ?HIBERNATE_AFTER}.
+ NewState1 = adjust_hibernate_after(NewState),
+ {reply, Reply, NewState1, NewState1 #q.hibernate_after}.
noreply(NewState = #q { memory_report_timer = undefined }) ->
- {noreply, start_memory_timer(NewState), ?HIBERNATE_AFTER};
+ noreply(start_memory_timer(NewState));
+noreply(NewState = #q { hibernated_at = undefined }) ->
+ {noreply, NewState, NewState #q.hibernate_after};
noreply(NewState) ->
- {noreply, NewState, ?HIBERNATE_AFTER}.
+ NewState1 = adjust_hibernate_after(NewState),
+ {noreply, NewState1, NewState1 #q.hibernate_after}.
+
+adjust_hibernate_after(State = #q { hibernated_at = undefined }) ->
+ State;
+adjust_hibernate_after(State = #q { hibernated_at = Then,
+ hibernate_after = Timeout }) ->
+ State1 = State #q { hibernated_at = undefined },
+ NapLengthMicros = timer:now_diff(now(), Then),
+ TimeoutMicros = Timeout * 1000,
+ LowTargetMicros = TimeoutMicros * 4,
+ HighTargetMicros = LowTargetMicros * 4,
+ if
+ NapLengthMicros < LowTargetMicros ->
+ %% nap was too short, don't go to sleep as soon
+ State1 #q { hibernate_after = Timeout * 2 };
+
+ NapLengthMicros > HighTargetMicros ->
+ %% nap was long, try going to sleep sooner
+ Timeout1 = lists:max([?HIBERNATE_AFTER_MIN, round(Timeout / 2)]),
+ State1 #q { hibernate_after = Timeout1 };
+
+ true ->
+ %% nap and timeout seem to be in the right relationship. stay here
+ State1
+ end.
start_memory_timer() ->
{ok, TRef} = timer:apply_interval(?MEMORY_REPORT_TIME_INTERVAL,
@@ -861,7 +895,8 @@ handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, State) ->
handle_info(timeout, State) ->
%% TODO: Once we drop support for R11B-5, we can change this to
%% {noreply, State, hibernate};
- State1 = stop_memory_timer(report_memory(State)),
+ State1 = (stop_memory_timer(report_memory(State)))
+ #q { hibernated_at = now() },
proc_lib:hibernate(gen_server2, enter_loop, [?MODULE, [], State1]);
handle_info(Info, State) ->
diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl
index 63d6a4815d..f70d6067b3 100644
--- a/src/rabbit_basic.erl
+++ b/src/rabbit_basic.erl
@@ -34,20 +34,29 @@
-include("rabbit_framing.hrl").
-export([publish/1, message/4, message/5, message/6, delivery/4]).
+-export([properties/1, publish/4, publish/7]).
%%----------------------------------------------------------------------------
-ifdef(use_specs).
--spec(publish/1 :: (delivery()) ->
- {ok, routing_result(), [pid()]} | not_found()).
+-type(properties_input() :: (amqp_properties() | [{atom(), any()}])).
+-type(publish_result() :: ({ok, routing_result(), [pid()]} | not_found())).
+
+-spec(publish/1 :: (delivery()) -> publish_result()).
-spec(delivery/4 :: (bool(), bool(), maybe(txn()), message()) -> delivery()).
--spec(message/4 :: (exchange_name(), routing_key(), binary(), binary()) ->
- message()).
--spec(message/5 :: (exchange_name(), routing_key(), binary(), binary(),
- guid()) -> message()).
--spec(message/6 :: (exchange_name(), routing_key(), binary(), binary(),
- guid(), bool()) -> message()).
+-spec(message/4 :: (exchange_name(), routing_key(), properties_input(),
+ binary()) -> message()).
+-spec(message/5 :: (exchange_name(), routing_key(), properties_input(),
+ binary(), guid()) -> message()).
+-spec(message/6 :: (exchange_name(), routing_key(), properties_input(),
+ binary(), guid(), bool()) -> message()).
+-spec(properties/1 :: (properties_input()) -> amqp_properties()).
+-spec(publish/4 :: (exchange_name(), routing_key(), properties_input(),
+ binary()) -> publish_result()).
+-spec(publish/7 :: (exchange_name(), routing_key(), bool(), bool(),
+ maybe(txn()), properties_input(), binary()) ->
+ publish_result()).
-endif.
@@ -67,16 +76,17 @@ delivery(Mandatory, Immediate, Txn, Message) ->
#delivery{mandatory = Mandatory, immediate = Immediate, txn = Txn,
sender = self(), message = Message}.
-message(ExchangeName, RoutingKeyBin, ContentTypeBin, BodyBin) ->
- message(ExchangeName, RoutingKeyBin, ContentTypeBin, BodyBin, rabbit_guid:guid()).
+message(ExchangeName, RoutingKeyBin, RawProperties, BodyBin) ->
+ message(ExchangeName, RoutingKeyBin, RawProperties, BodyBin, rabbit_guid:guid()).
-message(ExchangeName, RoutingKeyBin, ContentTypeBin, BodyBin, MsgId) ->
- message(ExchangeName, RoutingKeyBin, ContentTypeBin, BodyBin, MsgId, false).
+message(ExchangeName, RoutingKeyBin, RawProperties, BodyBin, MsgId) ->
+ message(ExchangeName, RoutingKeyBin, RawProperties, BodyBin, MsgId, false).
-message(ExchangeName, RoutingKeyBin, ContentTypeBin, BodyBin, MsgId, IsPersistent) ->
+message(ExchangeName, RoutingKeyBin, RawProperties, BodyBin, MsgId, IsPersistent) ->
+ Properties = properties(RawProperties),
{ClassId, _MethodId} = rabbit_framing:method_id('basic.publish'),
Content = #content{class_id = ClassId,
- properties = #'P_basic'{content_type = ContentTypeBin},
+ properties = Properties,
properties_bin = none,
payload_fragments_rev = [BodyBin]},
#basic_message{exchange_name = ExchangeName,
@@ -84,3 +94,36 @@ message(ExchangeName, RoutingKeyBin, ContentTypeBin, BodyBin, MsgId, IsPersisten
content = Content,
guid = MsgId,
is_persistent = IsPersistent}.
+
+properties(P = #'P_basic'{}) ->
+ P;
+properties(P) when is_list(P) ->
+ %% Yes, this is O(length(P) * record_info(size, 'P_basic') / 2),
+ %% i.e. slow. Use the definition of 'P_basic' directly if
+ %% possible!
+ lists:foldl(fun ({Key, Value}, Acc) ->
+ case indexof(record_info(fields, 'P_basic'), Key) of
+ 0 -> throw({unknown_basic_property, Key});
+ N -> setelement(N + 1, Acc, Value)
+ end
+ end, #'P_basic'{}, P).
+
+indexof(L, Element) -> indexof(L, Element, 1).
+
+indexof([], _Element, _N) -> 0;
+indexof([Element | _Rest], Element, N) -> N;
+indexof([_ | Rest], Element, N) -> indexof(Rest, Element, N + 1).
+
+%% Convenience function, for avoiding round-trips in calls across the
+%% erlang distributed network.
+publish(ExchangeName, RoutingKeyBin, Properties, BodyBin) ->
+ publish(ExchangeName, RoutingKeyBin, false, false, none, Properties,
+ BodyBin).
+
+%% Convenience function, for avoiding round-trips in calls across the
+%% erlang distributed network.
+publish(ExchangeName, RoutingKeyBin, Mandatory, Immediate, Txn, Properties,
+ BodyBin) ->
+ publish(delivery(Mandatory, Immediate, Txn,
+ message(ExchangeName, RoutingKeyBin,
+ properties(Properties), BodyBin))).
diff --git a/src/rabbit_error_logger.erl b/src/rabbit_error_logger.erl
index 76016a8cb2..b28574b707 100644
--- a/src/rabbit_error_logger.erl
+++ b/src/rabbit_error_logger.erl
@@ -31,6 +31,7 @@
-module(rabbit_error_logger).
-include("rabbit.hrl").
+-include("rabbit_framing.hrl").
-define(LOG_EXCH_NAME, <<"amq.rabbitmq.log">>).
@@ -75,10 +76,7 @@ publish(_Other, _Format, _Data, _State) ->
publish1(RoutingKey, Format, Data, LogExch) ->
{ok, _RoutingRes, _DeliveredQPids} =
- rabbit_basic:publish(
- rabbit_basic:delivery(
- false, false, none,
- rabbit_basic:message(
- LogExch, RoutingKey, <<"text/plain">>,
- list_to_binary(io_lib:format(Format, Data))))),
+ rabbit_basic:publish(LogExch, RoutingKey, false, false, none,
+ #'P_basic'{content_type = <<"text/plain">>},
+ list_to_binary(io_lib:format(Format, Data))),
ok.
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index 7d9948f06f..8fb9eae304 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -235,9 +235,9 @@ route(X = #exchange{type = topic}, RoutingKey, _Content) ->
route(X = #exchange{type = headers}, _RoutingKey, Content) ->
Headers = case (Content#content.properties)#'P_basic'.headers of
- undefined -> [];
- H -> sort_arguments(H)
- end,
+ undefined -> [];
+ H -> sort_arguments(H)
+ end,
match_bindings(X, fun (#binding{args = Spec}) ->
headers_match(Spec, Headers)
end);
@@ -489,14 +489,14 @@ parse_x_match(Other) ->
%%
headers_match(Pattern, Data) ->
MatchKind = case lists:keysearch(<<"x-match">>, 1, Pattern) of
- {value, {_, longstr, MK}} -> parse_x_match(MK);
- {value, {_, Type, MK}} ->
- rabbit_log:warning("Invalid x-match field type ~p "
+ {value, {_, longstr, MK}} -> parse_x_match(MK);
+ {value, {_, Type, MK}} ->
+ rabbit_log:warning("Invalid x-match field type ~p "
"(value ~p); expected longstr",
- [Type, MK]),
- default_headers_match_kind();
- _ -> default_headers_match_kind()
- end,
+ [Type, MK]),
+ default_headers_match_kind();
+ _ -> default_headers_match_kind()
+ end,
headers_match(Pattern, Data, true, false, MatchKind).
headers_match([], _Data, AllMatch, _AnyMatch, all) ->
@@ -523,8 +523,8 @@ headers_match([{PK, PT, PV} | PRest], [{DK, DT, DV} | DRest],
%% the corresponding data field. I've interpreted that to
%% mean a type of "void" for the pattern field.
PT == void -> {AllMatch, true};
- %% Similarly, it's not specified, but I assume that a
- %% mismatched type causes a mismatched value.
+ %% Similarly, it's not specified, but I assume that a
+ %% mismatched type causes a mismatched value.
PT =/= DT -> {false, AnyMatch};
PV == DV -> {AllMatch, true};
true -> {false, AnyMatch}
diff --git a/src/rabbit_log.erl b/src/rabbit_log.erl
index f408336e94..dd5b498b07 100644
--- a/src/rabbit_log.erl
+++ b/src/rabbit_log.erl
@@ -75,7 +75,7 @@ debug(Fmt, Args) when is_list(Args) ->
message(Direction, Channel, MethodRecord, Content) ->
gen_server:cast(?SERVER,
- {message, Direction, Channel, MethodRecord, Content}).
+ {message, Direction, Channel, MethodRecord, Content}).
info(Fmt) ->
gen_server:cast(?SERVER, {info, Fmt}).
@@ -112,11 +112,11 @@ handle_cast({debug, Fmt, Args}, State) ->
{noreply, State};
handle_cast({message, Direction, Channel, MethodRecord, Content}, State) ->
io:format("~s ch~p ~p~n",
- [case Direction of
- in -> "-->";
- out -> "<--" end,
- Channel,
- {MethodRecord, Content}]),
+ [case Direction of
+ in -> "-->";
+ out -> "<--" end,
+ Channel,
+ {MethodRecord, Content}]),
{noreply, State};
handle_cast({info, Fmt}, State) ->
error_logger:info_msg(Fmt),
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index ef8038e7aa..426b99eba1 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -231,7 +231,7 @@ start_connection(Parent, Deb, ClientSock) ->
connection_state = pre_init},
handshake, 8))
catch
- Ex -> (if Ex == connection_closed_abruptly ->
+ Ex -> (if Ex == connection_closed_abruptly ->
fun rabbit_log:warning/2;
true ->
fun rabbit_log:error/2
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 05a6393b23..0b70be0c9b 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -716,7 +716,7 @@ benchmark_disk_queue() ->
passed.
rdq_message(MsgId, MsgBody) ->
- rabbit_basic:message(x, <<>>, <<>>, MsgBody, MsgId).
+ rabbit_basic:message(x, <<>>, [], MsgBody, MsgId).
rdq_match_message(
#basic_message { guid = MsgId, content =
@@ -976,20 +976,20 @@ rdq_test_mixed_queue_modes() ->
{ok, MS} = rabbit_mixed_queue:init(q, true, mixed),
MS2 = lists:foldl(
fun (_N, MS1) ->
- Msg = rabbit_basic:message(x, <<>>, <<>>, Payload),
+ Msg = rabbit_basic:message(x, <<>>, [], Payload),
{ok, MS1a} = rabbit_mixed_queue:publish(Msg, MS1),
MS1a
end, MS, lists:seq(1,10)),
MS4 = lists:foldl(
fun (_N, MS3) ->
- Msg = (rabbit_basic:message(x, <<>>, <<>>, Payload))
+ Msg = (rabbit_basic:message(x, <<>>, [], Payload))
#basic_message { is_persistent = true },
{ok, MS3a} = rabbit_mixed_queue:publish(Msg, MS3),
MS3a
end, MS2, lists:seq(1,10)),
MS6 = lists:foldl(
fun (_N, MS5) ->
- Msg = rabbit_basic:message(x, <<>>, <<>>, Payload),
+ Msg = rabbit_basic:message(x, <<>>, [], Payload),
{ok, MS5a} = rabbit_mixed_queue:publish(Msg, MS5),
MS5a
end, MS4, lists:seq(1,10)),
@@ -1050,11 +1050,11 @@ rdq_test_mixed_queue_modes() ->
rdq_test_mode_conversion_mid_txn() ->
Payload = <<0:(8*256)>>,
MsgIdsA = lists:seq(0,9),
- MsgsA = [ rabbit_basic:message(x, <<>>, <<>>, Payload, MsgId,
+ MsgsA = [ rabbit_basic:message(x, <<>>, [], Payload, MsgId,
(0 == MsgId rem 2))
|| MsgId <- MsgIdsA ],
MsgIdsB = lists:seq(10,20),
- MsgsB = [ rabbit_basic:message(x, <<>>, <<>>, Payload, MsgId,
+ MsgsB = [ rabbit_basic:message(x, <<>>, [], Payload, MsgId,
(0 == MsgId rem 2))
|| MsgId <- MsgIdsB ],