diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_amqqueue.erl | 46 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 9 | ||||
| -rw-r--r-- | src/rabbit_channel.erl | 120 | ||||
| -rw-r--r-- | src/rabbit_exchange.erl | 18 | ||||
| -rw-r--r-- | src/rabbit_framing_channel.erl | 34 | ||||
| -rw-r--r-- | src/rabbit_queue_index.erl | 11 | ||||
| -rw-r--r-- | src/rabbit_reader.erl | 10 | ||||
| -rw-r--r-- | src/rabbit_router.erl | 4 | ||||
| -rw-r--r-- | src/rabbit_tests.erl | 70 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 42 | ||||
| -rw-r--r-- | src/rabbit_writer.erl | 1 |
11 files changed, 189 insertions, 176 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 9871b60427..1d1ccef77e 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -37,8 +37,9 @@ update_ram_duration/1, set_ram_duration_target/2, set_maximum_since_use/2]). -export([pseudo_queue/2]). --export([lookup/1, with/2, with_or_die/2, - stat/1, stat_all/0, deliver/2, requeue/3, ack/4]). +-export([lookup/1, with/2, with_or_die/2, assert_equivalence/5, + check_exclusive_access/2, with_exclusive_access_or_die/3, + stat/1, deliver/2, requeue/3, ack/4]). -export([list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2]). -export([consumers/1, consumers_all/1]). -export([basic_get/3, basic_consume/7, basic_cancel/4]). @@ -58,7 +59,6 @@ -ifdef(use_specs). --type(qstats() :: {'ok', queue_name(), non_neg_integer(), non_neg_integer()}). -type(qlen() :: {'ok', non_neg_integer()}). -type(qfun(A) :: fun ((amqqueue()) -> A)). -type(ok_or_errors() :: @@ -66,10 +66,14 @@ -spec(start/0 :: () -> 'ok'). -spec(declare/5 :: (queue_name(), boolean(), boolean(), amqp_table(), - maybe(pid())) -> amqqueue()). + maybe(pid())) -> {'new' | 'existing', amqqueue()}). -spec(lookup/1 :: (queue_name()) -> {'ok', amqqueue()} | not_found()). -spec(with/2 :: (queue_name(), qfun(A)) -> A | not_found()). -spec(with_or_die/2 :: (queue_name(), qfun(A)) -> A). +-spec(assert_equivalence/5 :: (amqqueue(), boolean(), boolean(), amqp_table(), + maybe(pid)) -> ok). +-spec(check_exclusive_access/2 :: (amqqueue(), pid()) -> 'ok'). +-spec(with_exclusive_access_or_die/3 :: (queue_name(), pid(), qfun(A)) -> A). -spec(list/1 :: (vhost()) -> [amqqueue()]). -spec(info_keys/0 :: () -> [info_key()]). -spec(info/1 :: (amqqueue()) -> [info()]). @@ -79,8 +83,8 @@ -spec(consumers/1 :: (amqqueue()) -> [{pid(), ctag(), boolean()}]). -spec(consumers_all/1 :: (vhost()) -> [{queue_name(), pid(), ctag(), boolean()}]). --spec(stat/1 :: (amqqueue()) -> qstats()). --spec(stat_all/0 :: () -> [qstats()]). +-spec(stat/1 :: + (amqqueue()) -> {'ok', non_neg_integer(), non_neg_integer()}). -spec(delete/3 :: (amqqueue(), 'false', 'false') -> qlen(); (amqqueue(), 'true' , 'false') -> qlen() | {'error', 'in_use'}; @@ -213,6 +217,31 @@ with(Name, F) -> with_or_die(Name, F) -> with(Name, F, fun () -> rabbit_misc:not_found(Name) end). +assert_equivalence(#amqqueue{durable = Durable, auto_delete = AutoDelete} = Q, + Durable, AutoDelete, _Args, Owner) -> + check_exclusive_access(Q, Owner, strict); +assert_equivalence(#amqqueue{name = QueueName}, + _Durable, _AutoDelete, _Args, _Owner) -> + rabbit_misc:protocol_error( + not_allowed, "parameters for ~s not equivalent", + [rabbit_misc:rs(QueueName)]). + +check_exclusive_access(Q, Owner) -> check_exclusive_access(Q, Owner, lax). + +check_exclusive_access(#amqqueue{exclusive_owner = Owner}, Owner, _MatchType) -> + ok; +check_exclusive_access(#amqqueue{exclusive_owner = none}, _ReaderPid, lax) -> + ok; +check_exclusive_access(#amqqueue{name = QueueName}, _ReaderPid, _MatchType) -> + rabbit_misc:protocol_error( + resource_locked, + "cannot obtain exclusive access to locked ~s", + [rabbit_misc:rs(QueueName)]). + +with_exclusive_access_or_die(Name, ReaderPid, F) -> + with_or_die(Name, + fun (Q) -> check_exclusive_access(Q, ReaderPid), F(Q) end). + list(VHostPath) -> mnesia:dirty_match_object( rabbit_queue, @@ -247,9 +276,6 @@ consumers_all(VHostPath) -> stat(#amqqueue{pid = QPid}) -> delegate_call(QPid, stat, infinity). -stat_all() -> - lists:map(fun stat/1, rabbit_misc:dirty_read_all(rabbit_queue)). - delete(#amqqueue{ pid = QPid }, IfUnused, IfEmpty) -> delegate_call(QPid, {delete, IfUnused, IfEmpty}, infinity). @@ -395,7 +421,7 @@ delegate_call(Pid, Msg, Timeout) -> delegate:invoke(Pid, fun (P) -> gen_server2:call(P, Msg, Timeout) end). delegate_pcall(Pid, Pri, Msg, Timeout) -> - delegate:invoke(Pid, + delegate:invoke(Pid, fun (P) -> gen_server2:pcall(P, Pri, Msg, Timeout) end). delegate_pcast(Pid, Pri, Msg) -> diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 6ba4f29880..ec59095d25 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -137,7 +137,7 @@ declare(Recover, From, backing_queue = BQ, backing_queue_state = undefined}) -> case rabbit_amqqueue:internal_declare(Q, Recover) of not_found -> {stop, normal, not_found, State}; - Q -> gen_server2:reply(From, Q), + Q -> gen_server2:reply(From, {new, Q}), ok = file_handle_cache:register_callback( rabbit_amqqueue, set_maximum_since_use, [self()]), @@ -146,7 +146,7 @@ declare(Recover, From, set_ram_duration_target, [self()]}), BQS = BQ:init(QName, IsDurable, Recover), noreply(State#q{backing_queue_state = BQS}); - Q1 -> {stop, normal, Q1, State} + Q1 -> {stop, normal, {existing, Q1}, State} end. terminate_shutdown(Fun, State) -> @@ -692,11 +692,10 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From, end end; -handle_call(stat, _From, State = #q{q = #amqqueue{name = Name}, - backing_queue = BQ, +handle_call(stat, _From, State = #q{backing_queue = BQ, backing_queue_state = BQS, active_consumers = ActiveConsumers}) -> - reply({ok, Name, BQ:len(BQS), queue:len(ActiveConsumers)}, State); + reply({ok, BQ:len(BQS), queue:len(ActiveConsumers)}, State); handle_call({delete, IfUnused, IfEmpty}, _From, State = #q{backing_queue_state = BQS, backing_queue = BQ}) -> diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 8649ecc7f1..94a20fbdb1 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -284,20 +284,15 @@ terminating(Reason, State = #ch{channel = Channel, reader_pid = Reader}) -> Reader ! {channel_exit, Channel, Reason}, State#ch{state = terminating}. -return_queue_declare_ok(State, NoWait, Q) -> - NewState = State#ch{most_recently_declared_queue = - (Q#amqqueue.name)#resource.name}, +return_queue_declare_ok(#resource{name = ActualName}, + NoWait, MessageCount, ConsumerCount, State) -> + NewState = State#ch{most_recently_declared_queue = ActualName}, case NoWait of true -> {noreply, NewState}; - false -> - {ok, ActualName, MessageCount, ConsumerCount} = - rabbit_misc:with_exit_handler( - fun () -> {ok, Q#amqqueue.name, 0, 0} end, - fun () -> rabbit_amqqueue:stat(Q) end), - Reply = #'queue.declare_ok'{queue = ActualName#resource.name, - message_count = MessageCount, - consumer_count = ConsumerCount}, - {reply, Reply, NewState} + false -> Reply = #'queue.declare_ok'{queue = ActualName, + message_count = MessageCount, + consumer_count = ConsumerCount}, + {reply, Reply, NewState} end. check_resource_access(Username, Resource, Perm) -> @@ -329,19 +324,6 @@ check_write_permitted(Resource, #ch{ username = Username}) -> check_read_permitted(Resource, #ch{ username = Username}) -> check_resource_access(Username, Resource, read). -check_exclusive_access(#amqqueue{exclusive_owner = Owner}, Owner, _MatchType) -> - ok; -check_exclusive_access(#amqqueue{exclusive_owner = none}, _ReaderPid, lax) -> - ok; -check_exclusive_access(#amqqueue{name = QName}, _ReaderPid, _MatchType) -> - rabbit_misc:protocol_error( - resource_locked, - "cannot obtain exclusive access to locked ~s", [rabbit_misc:rs(QName)]). - -with_exclusive_access_or_die(QName, ReaderPid, F) -> - rabbit_amqqueue:with_or_die( - QName, fun (Q) -> check_exclusive_access(Q, ReaderPid, lax), F(Q) end). - expand_queue_name_shortcut(<<>>, #ch{ most_recently_declared_queue = <<>> }) -> rabbit_misc:protocol_error( not_found, "no previously declared queue", []); @@ -444,12 +426,9 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, Exchange, rabbit_basic:delivery(Mandatory, Immediate, TxnKey, Message)), case RoutingRes of - routed -> - ok; - unroutable -> - ok = basic_return(Message, WriterPid, no_route); - not_delivered -> - ok = basic_return(Message, WriterPid, no_consumers) + routed -> ok; + unroutable -> ok = basic_return(Message, WriterPid, no_route); + not_delivered -> ok = basic_return(Message, WriterPid, no_consumers) end, {noreply, case TxnKey of none -> State; @@ -480,7 +459,7 @@ handle_method(#'basic.get'{queue = QueueNameBin, next_tag = DeliveryTag }) -> QueueName = expand_queue_name_shortcut(QueueNameBin, State), check_read_permitted(QueueName, State), - case with_exclusive_access_or_die( + case rabbit_amqqueue:with_exclusive_access_or_die( QueueName, ReaderPid, fun (Q) -> rabbit_amqqueue:basic_get(Q, self(), NoAck) end) of {ok, MessageCount, @@ -499,7 +478,7 @@ handle_method(#'basic.get'{queue = QueueNameBin, Content), {noreply, State1#ch{next_tag = DeliveryTag + 1}}; empty -> - {reply, #'basic.get_empty'{cluster_id = <<>>}, State} + {reply, #'basic.get_empty'{}, State} end; handle_method(#'basic.consume'{queue = QueueNameBin, @@ -524,7 +503,7 @@ handle_method(#'basic.consume'{queue = QueueNameBin, %% We get the queue process to send the consume_ok on our %% behalf. This is for symmetry with basic.cancel - see %% the comment in that method for why. - case with_exclusive_access_or_die( + case rabbit_amqqueue:with_exclusive_access_or_die( QueueName, ReaderPid, fun (Q) -> rabbit_amqqueue:basic_consume( @@ -716,7 +695,7 @@ handle_method(#'queue.declare'{queue = QueueNameBin, exclusive = ExclusiveDeclare, auto_delete = AutoDelete, nowait = NoWait, - arguments = Args}, + arguments = Args} = Declare, _, State = #ch{virtual_host = VHostPath, reader_pid = ReaderPid, queue_collector_pid = CollectorPid}) -> @@ -724,37 +703,40 @@ handle_method(#'queue.declare'{queue = QueueNameBin, true -> ReaderPid; false -> none end, - %% We use this in both branches, because queue_declare may yet return an - %% existing queue. ActualNameBin = case QueueNameBin of <<>> -> rabbit_guid:binstring_guid("amq.gen"); Other -> check_name('queue', Other) end, QueueName = rabbit_misc:r(VHostPath, queue, ActualNameBin), - Q = case rabbit_amqqueue:declare(QueueName, Durable, AutoDelete, - Args, Owner) of - #amqqueue{name = QueueName, - durable = Durable1, - auto_delete = AutoDelete1} = Q1 - when Durable =:= Durable1, AutoDelete =:= AutoDelete1 -> - check_exclusive_access(Q1, Owner, strict), - check_configure_permitted(QueueName, State), - %% We need to notify the reader within the channel - %% process so that we can be sure there are no - %% outstanding exclusive queues being declared as the - %% connection shuts down. - case Owner of - none -> ok; - _ -> ok = rabbit_reader_queue_collector:register_exclusive_queue(CollectorPid, Q1) - end, - Q1; - %% non-equivalence trumps exclusivity arbitrarily - #amqqueue{name = QueueName} -> - rabbit_misc:protocol_error( - precondition_failed, "parameters for ~s not equivalent", - [rabbit_misc:rs(QueueName)]) - end, - return_queue_declare_ok(State, NoWait, Q); + check_configure_permitted(QueueName, State), + case rabbit_amqqueue:with( + QueueName, + fun (Q) -> ok = rabbit_amqqueue:assert_equivalence( + Q, Durable, AutoDelete, Args, Owner), + rabbit_amqqueue:stat(Q) + end) of + {ok, MessageCount, ConsumerCount} -> + return_queue_declare_ok(QueueName, NoWait, MessageCount, + ConsumerCount, State); + {error, not_found} -> + case rabbit_amqqueue:declare(QueueName, Durable, AutoDelete, + Args, Owner) of + {new, Q = #amqqueue{}} -> + %% We need to notify the reader within the channel + %% process so that we can be sure there are no + %% outstanding exclusive queues being declared as + %% the connection shuts down. + ok = case Owner of + none -> ok; + _ -> rabbit_reader_queue_collector:register_exclusive_queue(CollectorPid, Q) + end, + return_queue_declare_ok(QueueName, NoWait, 0, 0, State); + {existing, _Q} -> + %% must have been created between the stat and the + %% declare. Loop around again. + handle_method(Declare, none, State) + end + end; handle_method(#'queue.declare'{queue = QueueNameBin, passive = true, @@ -763,8 +745,12 @@ handle_method(#'queue.declare'{queue = QueueNameBin, reader_pid = ReaderPid}) -> QueueName = rabbit_misc:r(VHostPath, queue, QueueNameBin), check_configure_permitted(QueueName, State), - Q = with_exclusive_access_or_die(QueueName, ReaderPid, fun (Q) -> Q end), - return_queue_declare_ok(State, NoWait, Q); + {{ok, MessageCount, ConsumerCount}, #amqqueue{} = Q} = + rabbit_amqqueue:with_or_die( + QueueName, fun (Q) -> {rabbit_amqqueue:stat(Q), Q} end), + ok = rabbit_amqqueue:check_exclusive_access(Q, ReaderPid), + return_queue_declare_ok(QueueName, NoWait, MessageCount, ConsumerCount, + State); handle_method(#'queue.delete'{queue = QueueNameBin, if_unused = IfUnused, @@ -773,7 +759,7 @@ handle_method(#'queue.delete'{queue = QueueNameBin, _, State = #ch{reader_pid = ReaderPid}) -> QueueName = expand_queue_name_shortcut(QueueNameBin, State), check_configure_permitted(QueueName, State), - case with_exclusive_access_or_die( + case rabbit_amqqueue:with_exclusive_access_or_die( QueueName, ReaderPid, fun (Q) -> rabbit_amqqueue:delete(Q, IfUnused, IfEmpty) end) of {error, in_use} -> @@ -809,7 +795,7 @@ handle_method(#'queue.purge'{queue = QueueNameBin, _, State = #ch{reader_pid = ReaderPid}) -> QueueName = expand_queue_name_shortcut(QueueNameBin, State), check_read_permitted(QueueName, State), - {ok, PurgedMessageCount} = with_exclusive_access_or_die( + {ok, PurgedMessageCount} = rabbit_amqqueue:with_exclusive_access_or_die( QueueName, ReaderPid, fun (Q) -> rabbit_amqqueue:purge(Q) end), return_ok(State, NoWait, @@ -917,7 +903,9 @@ binding_action(Fun, ExchangeNameBin, QueueNameBin, RoutingKey, Arguments, ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin), check_read_permitted(ExchangeName, State), case Fun(ExchangeName, QueueName, ActualRoutingKey, Arguments, - fun (_X, Q) -> check_exclusive_access(Q, ReaderPid, lax) end) of + fun (_X, Q) -> + rabbit_amqqueue:check_exclusive_access(Q, ReaderPid) + end) of {error, exchange_not_found} -> rabbit_misc:not_found(ExchangeName); {error, queue_not_found} -> diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index 7072055cde..bd9d3d2926 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -198,7 +198,7 @@ assert_equivalence(X = #exchange{ durable = Durable, assert_equivalence(#exchange{ name = Name }, _Type, _Durable, _AutoDelete, _Args) -> rabbit_misc:protocol_error( - precondition_failed, + not_allowed, "cannot redeclare ~s with different type, durable or autodelete value", [rabbit_misc:rs(Name)]). @@ -215,7 +215,7 @@ assert_args_equivalence(#exchange{ name = Name, Ae2 = alternate_exchange_value(Args), if Ae1==Ae2 -> ok; true -> rabbit_misc:protocol_error( - precondition_failed, + not_allowed, "cannot redeclare ~s with inequivalent args", [rabbit_misc:rs(Name)]) end. @@ -335,7 +335,7 @@ delete_queue_bindings(QueueName, FwdDeleteFun) -> Module = type_to_module(Type), case IsDeleted of auto_deleted -> Module:delete(X, Bs); - no_delete -> Module:remove_bindings(X, Bs) + not_deleted -> Module:remove_bindings(X, Bs) end end, Cleanup) end. @@ -438,11 +438,11 @@ delete_binding(ExchangeName, QueueName, RoutingKey, Arguments, InnerFun) -> end) of Err = {error, _} -> Err; - {{Action, X = #exchange{ type = Type }}, B} -> + {{IsDeleted, X = #exchange{ type = Type }}, B} -> Module = type_to_module(Type), - case Action of - auto_delete -> Module:delete(X, [B]); - no_delete -> Module:remove_bindings(X, [B]) + case IsDeleted of + auto_deleted -> Module:delete(X, [B]); + not_deleted -> Module:remove_bindings(X, [B]) end end. @@ -526,10 +526,10 @@ delete(ExchangeName, IfUnused) -> end. maybe_auto_delete(Exchange = #exchange{auto_delete = false}) -> - {no_delete, Exchange}; + {not_deleted, Exchange}; maybe_auto_delete(Exchange = #exchange{auto_delete = true}) -> case conditional_delete(Exchange) of - {error, in_use} -> {no_delete, Exchange}; + {error, in_use} -> {not_deleted, Exchange}; {deleted, Exchange, []} -> {auto_deleted, Exchange} end. diff --git a/src/rabbit_framing_channel.erl b/src/rabbit_framing_channel.erl index b7c6aa96fa..bc1a2a0835 100644 --- a/src/rabbit_framing_channel.erl +++ b/src/rabbit_framing_channel.erl @@ -76,29 +76,27 @@ mainloop(ChannelPid) -> {method, MethodName, FieldsBin} = read_frame(ChannelPid), Method = rabbit_framing:decode_method_fields(MethodName, FieldsBin), case rabbit_framing:method_has_content(MethodName) of - true -> rabbit_channel:do(ChannelPid, Method, - collect_content(ChannelPid, MethodName)); + true -> {ClassId, _MethodId} = rabbit_framing:method_id(MethodName), + rabbit_channel:do(ChannelPid, Method, + collect_content(ChannelPid, ClassId)); false -> rabbit_channel:do(ChannelPid, Method) end, ?MODULE:mainloop(ChannelPid). -collect_content(ChannelPid, MethodName) -> - {ClassId, _MethodId} = rabbit_framing:method_id(MethodName), +collect_content(ChannelPid, ClassId) -> case read_frame(ChannelPid) of - {content_header, HeaderClassId, 0, BodySize, PropertiesBin} -> - if HeaderClassId == ClassId -> - Payload = collect_content_payload(ChannelPid, BodySize, []), - #content{class_id = ClassId, - properties = none, - properties_bin = PropertiesBin, - payload_fragments_rev = Payload}; - true -> - rabbit_misc:protocol_error( - command_invalid, - "expected content header for class ~w, " - "got one for class ~w instead", - [ClassId, HeaderClassId]) - end; + {content_header, ClassId, 0, BodySize, PropertiesBin} -> + Payload = collect_content_payload(ChannelPid, BodySize, []), + #content{class_id = ClassId, + properties = none, + properties_bin = PropertiesBin, + payload_fragments_rev = Payload}; + {content_header, HeaderClassId, 0, _BodySize, _PropertiesBin} -> + rabbit_misc:protocol_error( + command_invalid, + "expected content header for class ~w, " + "got one for class ~w instead", + [ClassId, HeaderClassId]); _ -> rabbit_misc:protocol_error( command_invalid, diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index 1f61111ccd..459c0fb67b 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -350,10 +350,13 @@ blank_state(QueueName) -> StrName = queue_name_to_dir_name(QueueName), Dir = filename:join(queues_dir(), StrName), ok = filelib:ensure_dir(filename:join(Dir, "nothing")), - #qistate { dir = Dir, - segments = segments_new(), - journal_handle = undefined, - dirty_count = 0 }. + {ok, MaxJournal} = + application:get_env(rabbit, queue_index_max_journal_entries), + #qistate { dir = Dir, + segments = segments_new(), + journal_handle = undefined, + dirty_count = 0, + max_journal_entries = MaxJournal }. detect_clean_shutdown(Dir) -> case file:delete(filename:join(Dir, ?CLEAN_FILENAME)) of diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index f2a903dcca..a54e0de97d 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -678,11 +678,13 @@ handle_method0(#'connection.close'{}, State = #v1{connection_state = running}) -> lists:foreach(fun rabbit_framing_channel:shutdown/1, all_channels()), maybe_close(State#v1{connection_state = closing}); -handle_method0(#'connection.close'{}, State = #v1{connection_state = CS}) +handle_method0(#'connection.close'{}, + State = #v1{connection_state = CS, + sock = Sock}) when CS =:= closing; CS =:= closed -> %% We're already closed or closing, so we don't need to cleanup %% anything. - ok = send_on_channel0(State#v1.sock, #'connection.close_ok'{}), + ok = send_on_channel0(Sock, #'connection.close_ok'{}), State; handle_method0(#'connection.close_ok'{}, State = #v1{connection_state = closed}) -> @@ -805,8 +807,8 @@ map_exception(Channel, Reason) -> ShouldClose = SuggestedClose or (Channel == 0), {ClassId, MethodId} = case FailedMethod of {_, _} -> FailedMethod; - none -> {0, 0}; - _ -> rabbit_framing:method_id(FailedMethod) + none -> {0, 0}; + _ -> rabbit_framing:method_id(FailedMethod) end, {CloseChannel, CloseMethod} = case ShouldClose of diff --git a/src/rabbit_router.erl b/src/rabbit_router.erl index 5cd15a9462..75196bc0d4 100644 --- a/src/rabbit_router.erl +++ b/src/rabbit_router.erl @@ -90,13 +90,13 @@ match_routing_key(Name, RoutingKey) -> lookup_qpids(mnesia:dirty_select(rabbit_route, [{MatchHead, [], ['$1']}])). lookup_qpids(Queues) -> - sets:fold( + lists:foldl( fun (Key, Acc) -> case mnesia:dirty_read({rabbit_queue, Key}) of [#amqqueue{pid = QPid}] -> [QPid | Acc]; [] -> Acc end - end, [], sets:from_list(Queues)). + end, [], lists:usort(Queues)). %%-------------------------------------------------------------------- diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index cffd0e7fd9..dd6a90892a 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -517,42 +517,38 @@ test_field_values() -> passed. %% Test that content frames don't exceed frame-max -test_content_framing(FrameMax, Fragments) -> +test_content_framing(FrameMax, BodyBin) -> [Header | Frames] = rabbit_binary_generator:build_simple_content_frames( 1, - #content{class_id = 0, properties_bin = <<>>, - payload_fragments_rev = Fragments}, + rabbit_binary_generator:ensure_content_encoded( + rabbit_basic:build_content(#'P_basic'{}, BodyBin)), FrameMax), %% header is formatted correctly and the size is the total of the %% fragments <<_FrameHeader:7/binary, _ClassAndWeight:4/binary, BodySize:64/unsigned, _Rest/binary>> = list_to_binary(Header), - BodySize = size(list_to_binary(Fragments)), - false = lists:any( - fun (ContentFrame) -> - FrameBinary = list_to_binary(ContentFrame), - %% assert - <<_TypeAndChannel:3/binary, - Size:32/unsigned, - _Payload:Size/binary, - 16#CE>> = FrameBinary, - size(FrameBinary) > FrameMax - end, - Frames), + BodySize = size(BodyBin), + true = lists:all( + fun (ContentFrame) -> + FrameBinary = list_to_binary(ContentFrame), + %% assert + <<_TypeAndChannel:3/binary, + Size:32/unsigned, _Payload:Size/binary, 16#CE>> = + FrameBinary, + size(FrameBinary) =< FrameMax + end, Frames), passed. test_content_framing() -> %% no content - passed = test_content_framing(4096, []), - passed = test_content_framing(4096, [<<>>]), + passed = test_content_framing(4096, <<>>), %% easily fit in one frame - passed = test_content_framing(4096, [<<"Easy">>]), + passed = test_content_framing(4096, <<"Easy">>), %% exactly one frame (empty frame = 8 bytes) - passed = test_content_framing(11, [<<"One">>]), + passed = test_content_framing(11, <<"One">>), %% more than one frame - passed = test_content_framing(20, [<<"into more than one frame">>, - <<"This will have to go">>]), + passed = test_content_framing(11, <<"More than one frame">>), passed. test_topic_match(P, R) -> @@ -954,10 +950,11 @@ test_server_status() -> Writer = spawn(fun () -> receive shutdown -> ok end end), Ch = rabbit_channel:start_link(1, self(), Writer, <<"user">>, <<"/">>, self()), - [Q, Q2] = [#amqqueue{} = rabbit_amqqueue:declare( + [Q, Q2] = [Queue || Name <- [<<"foo">>, <<"bar">>], + {new, Queue = #amqqueue{}} <- + [rabbit_amqqueue:declare( rabbit_misc:r(<<"/">>, queue, Name), - false, false, [], none) || - Name <- [<<"foo">>, <<"bar">>]], + false, false, [], none)]], ok = rabbit_amqqueue:basic_consume(Q, true, Ch, undefined, <<"ctag">>, true, undefined), @@ -1114,11 +1111,7 @@ test_memory_pressure() -> ok = test_memory_pressure_receive_flow(true), %% if we publish at this point, the channel should die - Content = #content{class_id = element(1, rabbit_framing:method_id( - 'basic.publish')), - properties = none, - properties_bin = <<>>, - payload_fragments_rev = []}, + Content = rabbit_basic:build_content(#'P_basic'{}, <<>>), ok = rabbit_channel:do(Ch0, #'basic.publish'{}, Content), expect_normal_channel_termination(MRef0, Ch0), @@ -1627,7 +1620,7 @@ test_queue_index() -> MostOfASegment = trunc(SegmentSize*0.75), stop_msg_store(), ok = empty_test_queue(), - SeqIdsA = lists:seq(0,MostOfASegment-1), + SeqIdsA = lists:seq(0, MostOfASegment-1), SeqIdsB = lists:seq(MostOfASegment, 2*MostOfASegment), {0, _Terms, Qi0} = test_queue_init(), {0, 0, Qi1} = rabbit_queue_index:bounds(Qi0), @@ -1900,17 +1893,14 @@ variable_queue_wait_for_shuffling_end(VQ) -> test_queue_recover() -> Count = 2*rabbit_queue_index:next_segment_boundary(0), TxID = rabbit_guid:guid(), - #amqqueue { pid = QPid, name = QName } = + {new, #amqqueue { pid = QPid, name = QName }} = rabbit_amqqueue:declare(test_queue(), true, false, [], none), - Msg = fun() -> rabbit_basic:message( - rabbit_misc:r(<<>>, exchange, <<>>), - <<>>, #'P_basic'{delivery_mode = 2}, <<>>) end, - Delivery = #delivery{mandatory = false, - immediate = false, - txn = TxID, - sender = self(), - message = Msg()}, - [true = rabbit_amqqueue:deliver(QPid, Delivery) || _ <- lists:seq(1, Count)], + Msg = rabbit_basic:message(rabbit_misc:r(<<>>, exchange, <<>>), + <<>>, #'P_basic'{delivery_mode = 2}, <<>>), + Delivery = #delivery{mandatory = false, immediate = false, txn = TxID, + sender = self(), message = Msg}, + [true = rabbit_amqqueue:deliver(QPid, Delivery) || + _ <- lists:seq(1, Count)], rabbit_amqqueue:commit_all([QPid], TxID, self()), exit(QPid, kill), MRef = erlang:monitor(process, QPid), diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index b7d2460bfd..5893385aa1 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -111,21 +111,18 @@ %% should be very few betas remaining, thus the transition is fast (no %% work needs to be done for the gamma -> delta transition). %% -%% The conversion of betas to gammas is done on publish, in batches of -%% exactly ?RAM_INDEX_BATCH_SIZE. This value should not be too small, -%% otherwise the frequent operations on the queues of q2 and q3 will -%% not be effectively amortised, nor should it be too big, otherwise a -%% publish will take too long as it attempts to do too much work and -%% thus stalls the queue. Therefore, it must be just right. This -%% approach is preferable to doing work on a new queue-duration -%% because converting all the indicated betas to gammas at that point -%% can be far too expensive, thus requiring batching and segmented -%% work anyway, and furthermore, if we're not getting any publishes -%% anyway then the queue is either being drained or has no -%% consumers. In the latter case, an expensive beta to delta -%% transition doesn't matter, and in the former case the queue's -%% shrinking length makes it unlikely (though not impossible) that the -%% duration will become 0. +%% The conversion of betas to gammas is done on all actions that can +%% increase the message count, such as publish and requeue, and when +%% the queue is asked to reduce its memory usage. The conversion is +%% done in batches of exactly ?RAM_INDEX_BATCH_SIZE. This value should +%% not be too small, otherwise the frequent operations on the queues +%% of q2 and q3 will not be effectively amortised (switching the +%% direction of queue access defeats amortisation), nor should it be +%% too big, otherwise converting a batch stalls the queue for too +%% long. Therefore, it must be just right. This approach is preferable +%% to doing work on a new queue-duration because converting all the +%% indicated betas to gammas at that point can be far too expensive, +%% thus requiring batching and segmented work anyway. %% %% In the queue we only keep track of messages that are pending %% delivery. This is fine for queue purging, but can be expensive for @@ -394,6 +391,9 @@ terminate(State) -> %% the only difference between purge and delete is that delete also %% needs to delete everything that's been delivered and not ack'd. delete_and_terminate(State) -> + %% TODO: there is no need to interact with qi at all - which we do + %% as part of 'purge' and 'remove_pending_ack', other than + %% deleting it. {_PurgeCount, State1} = purge(State), State2 = #vqstate { index_state = IndexState, msg_store_clients = {{MSCStateP, PRef}, @@ -412,6 +412,9 @@ delete_and_terminate(State) -> msg_store_clients = undefined }). purge(State = #vqstate { q4 = Q4, index_state = IndexState, len = Len }) -> + %% TODO: when there are no pending acks, which is a common case, + %% we could simply wipe the qi instead of issuing delivers and + %% acks for all the messages. IndexState1 = remove_queue_entries(fun rabbit_misc:queue_fold/3, Q4, IndexState), State1 = #vqstate { q1 = Q1, index_state = IndexState2 } = @@ -1139,6 +1142,10 @@ limit_ram_index(State = #vqstate { ram_index_count = RamIndexCount }) -> {Q2a, {Reduction1, IndexState1}} = limit_ram_index(fun bpqueue:map_fold_filter_l/4, Q2, {Reduction, IndexState}), + %% TODO: we shouldn't be writing index + %% entries for messages that can never end up + %% in delta due them residing in the only + %% segment held by q3. {Q3a, {Reduction2, IndexState2}} = limit_ram_index(fun bpqueue:map_fold_filter_r/4, Q3, {Reduction1, IndexState1}), @@ -1152,9 +1159,8 @@ limit_ram_index(State = #vqstate { ram_index_count = RamIndexCount }) -> State end. -limit_ram_index(_MapFoldFilterFun, Q, {Reduction, IndexState}) - when Reduction == 0 -> - {Q, {Reduction, IndexState}}; +limit_ram_index(_MapFoldFilterFun, Q, {0, IndexState}) -> + {Q, {0, IndexState}}; limit_ram_index(MapFoldFilterFun, Q, {Reduction, IndexState}) -> MapFoldFilterFun( fun erlang:'not'/1, diff --git a/src/rabbit_writer.erl b/src/rabbit_writer.erl index 3d10dc121e..233d72913c 100644 --- a/src/rabbit_writer.erl +++ b/src/rabbit_writer.erl @@ -149,6 +149,7 @@ send_command_and_notify(W, Q, ChPid, MethodRecord, Content) -> shutdown(W) -> W ! shutdown, + rabbit_misc:unlink_and_capture_exit(W), ok. %--------------------------------------------------------------------------- |
