summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_amqqueue.erl46
-rw-r--r--src/rabbit_amqqueue_process.erl9
-rw-r--r--src/rabbit_channel.erl120
-rw-r--r--src/rabbit_exchange.erl18
-rw-r--r--src/rabbit_framing_channel.erl34
-rw-r--r--src/rabbit_queue_index.erl11
-rw-r--r--src/rabbit_reader.erl10
-rw-r--r--src/rabbit_router.erl4
-rw-r--r--src/rabbit_tests.erl70
-rw-r--r--src/rabbit_variable_queue.erl42
-rw-r--r--src/rabbit_writer.erl1
11 files changed, 189 insertions, 176 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 9871b60427..1d1ccef77e 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -37,8 +37,9 @@
update_ram_duration/1, set_ram_duration_target/2,
set_maximum_since_use/2]).
-export([pseudo_queue/2]).
--export([lookup/1, with/2, with_or_die/2,
- stat/1, stat_all/0, deliver/2, requeue/3, ack/4]).
+-export([lookup/1, with/2, with_or_die/2, assert_equivalence/5,
+ check_exclusive_access/2, with_exclusive_access_or_die/3,
+ stat/1, deliver/2, requeue/3, ack/4]).
-export([list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2]).
-export([consumers/1, consumers_all/1]).
-export([basic_get/3, basic_consume/7, basic_cancel/4]).
@@ -58,7 +59,6 @@
-ifdef(use_specs).
--type(qstats() :: {'ok', queue_name(), non_neg_integer(), non_neg_integer()}).
-type(qlen() :: {'ok', non_neg_integer()}).
-type(qfun(A) :: fun ((amqqueue()) -> A)).
-type(ok_or_errors() ::
@@ -66,10 +66,14 @@
-spec(start/0 :: () -> 'ok').
-spec(declare/5 :: (queue_name(), boolean(), boolean(), amqp_table(),
- maybe(pid())) -> amqqueue()).
+ maybe(pid())) -> {'new' | 'existing', amqqueue()}).
-spec(lookup/1 :: (queue_name()) -> {'ok', amqqueue()} | not_found()).
-spec(with/2 :: (queue_name(), qfun(A)) -> A | not_found()).
-spec(with_or_die/2 :: (queue_name(), qfun(A)) -> A).
+-spec(assert_equivalence/5 :: (amqqueue(), boolean(), boolean(), amqp_table(),
+ maybe(pid)) -> ok).
+-spec(check_exclusive_access/2 :: (amqqueue(), pid()) -> 'ok').
+-spec(with_exclusive_access_or_die/3 :: (queue_name(), pid(), qfun(A)) -> A).
-spec(list/1 :: (vhost()) -> [amqqueue()]).
-spec(info_keys/0 :: () -> [info_key()]).
-spec(info/1 :: (amqqueue()) -> [info()]).
@@ -79,8 +83,8 @@
-spec(consumers/1 :: (amqqueue()) -> [{pid(), ctag(), boolean()}]).
-spec(consumers_all/1 ::
(vhost()) -> [{queue_name(), pid(), ctag(), boolean()}]).
--spec(stat/1 :: (amqqueue()) -> qstats()).
--spec(stat_all/0 :: () -> [qstats()]).
+-spec(stat/1 ::
+ (amqqueue()) -> {'ok', non_neg_integer(), non_neg_integer()}).
-spec(delete/3 ::
(amqqueue(), 'false', 'false') -> qlen();
(amqqueue(), 'true' , 'false') -> qlen() | {'error', 'in_use'};
@@ -213,6 +217,31 @@ with(Name, F) ->
with_or_die(Name, F) ->
with(Name, F, fun () -> rabbit_misc:not_found(Name) end).
+assert_equivalence(#amqqueue{durable = Durable, auto_delete = AutoDelete} = Q,
+ Durable, AutoDelete, _Args, Owner) ->
+ check_exclusive_access(Q, Owner, strict);
+assert_equivalence(#amqqueue{name = QueueName},
+ _Durable, _AutoDelete, _Args, _Owner) ->
+ rabbit_misc:protocol_error(
+ not_allowed, "parameters for ~s not equivalent",
+ [rabbit_misc:rs(QueueName)]).
+
+check_exclusive_access(Q, Owner) -> check_exclusive_access(Q, Owner, lax).
+
+check_exclusive_access(#amqqueue{exclusive_owner = Owner}, Owner, _MatchType) ->
+ ok;
+check_exclusive_access(#amqqueue{exclusive_owner = none}, _ReaderPid, lax) ->
+ ok;
+check_exclusive_access(#amqqueue{name = QueueName}, _ReaderPid, _MatchType) ->
+ rabbit_misc:protocol_error(
+ resource_locked,
+ "cannot obtain exclusive access to locked ~s",
+ [rabbit_misc:rs(QueueName)]).
+
+with_exclusive_access_or_die(Name, ReaderPid, F) ->
+ with_or_die(Name,
+ fun (Q) -> check_exclusive_access(Q, ReaderPid), F(Q) end).
+
list(VHostPath) ->
mnesia:dirty_match_object(
rabbit_queue,
@@ -247,9 +276,6 @@ consumers_all(VHostPath) ->
stat(#amqqueue{pid = QPid}) -> delegate_call(QPid, stat, infinity).
-stat_all() ->
- lists:map(fun stat/1, rabbit_misc:dirty_read_all(rabbit_queue)).
-
delete(#amqqueue{ pid = QPid }, IfUnused, IfEmpty) ->
delegate_call(QPid, {delete, IfUnused, IfEmpty}, infinity).
@@ -395,7 +421,7 @@ delegate_call(Pid, Msg, Timeout) ->
delegate:invoke(Pid, fun (P) -> gen_server2:call(P, Msg, Timeout) end).
delegate_pcall(Pid, Pri, Msg, Timeout) ->
- delegate:invoke(Pid,
+ delegate:invoke(Pid,
fun (P) -> gen_server2:pcall(P, Pri, Msg, Timeout) end).
delegate_pcast(Pid, Pri, Msg) ->
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 6ba4f29880..ec59095d25 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -137,7 +137,7 @@ declare(Recover, From,
backing_queue = BQ, backing_queue_state = undefined}) ->
case rabbit_amqqueue:internal_declare(Q, Recover) of
not_found -> {stop, normal, not_found, State};
- Q -> gen_server2:reply(From, Q),
+ Q -> gen_server2:reply(From, {new, Q}),
ok = file_handle_cache:register_callback(
rabbit_amqqueue, set_maximum_since_use,
[self()]),
@@ -146,7 +146,7 @@ declare(Recover, From,
set_ram_duration_target, [self()]}),
BQS = BQ:init(QName, IsDurable, Recover),
noreply(State#q{backing_queue_state = BQS});
- Q1 -> {stop, normal, Q1, State}
+ Q1 -> {stop, normal, {existing, Q1}, State}
end.
terminate_shutdown(Fun, State) ->
@@ -692,11 +692,10 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From,
end
end;
-handle_call(stat, _From, State = #q{q = #amqqueue{name = Name},
- backing_queue = BQ,
+handle_call(stat, _From, State = #q{backing_queue = BQ,
backing_queue_state = BQS,
active_consumers = ActiveConsumers}) ->
- reply({ok, Name, BQ:len(BQS), queue:len(ActiveConsumers)}, State);
+ reply({ok, BQ:len(BQS), queue:len(ActiveConsumers)}, State);
handle_call({delete, IfUnused, IfEmpty}, _From,
State = #q{backing_queue_state = BQS, backing_queue = BQ}) ->
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 8649ecc7f1..94a20fbdb1 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -284,20 +284,15 @@ terminating(Reason, State = #ch{channel = Channel, reader_pid = Reader}) ->
Reader ! {channel_exit, Channel, Reason},
State#ch{state = terminating}.
-return_queue_declare_ok(State, NoWait, Q) ->
- NewState = State#ch{most_recently_declared_queue =
- (Q#amqqueue.name)#resource.name},
+return_queue_declare_ok(#resource{name = ActualName},
+ NoWait, MessageCount, ConsumerCount, State) ->
+ NewState = State#ch{most_recently_declared_queue = ActualName},
case NoWait of
true -> {noreply, NewState};
- false ->
- {ok, ActualName, MessageCount, ConsumerCount} =
- rabbit_misc:with_exit_handler(
- fun () -> {ok, Q#amqqueue.name, 0, 0} end,
- fun () -> rabbit_amqqueue:stat(Q) end),
- Reply = #'queue.declare_ok'{queue = ActualName#resource.name,
- message_count = MessageCount,
- consumer_count = ConsumerCount},
- {reply, Reply, NewState}
+ false -> Reply = #'queue.declare_ok'{queue = ActualName,
+ message_count = MessageCount,
+ consumer_count = ConsumerCount},
+ {reply, Reply, NewState}
end.
check_resource_access(Username, Resource, Perm) ->
@@ -329,19 +324,6 @@ check_write_permitted(Resource, #ch{ username = Username}) ->
check_read_permitted(Resource, #ch{ username = Username}) ->
check_resource_access(Username, Resource, read).
-check_exclusive_access(#amqqueue{exclusive_owner = Owner}, Owner, _MatchType) ->
- ok;
-check_exclusive_access(#amqqueue{exclusive_owner = none}, _ReaderPid, lax) ->
- ok;
-check_exclusive_access(#amqqueue{name = QName}, _ReaderPid, _MatchType) ->
- rabbit_misc:protocol_error(
- resource_locked,
- "cannot obtain exclusive access to locked ~s", [rabbit_misc:rs(QName)]).
-
-with_exclusive_access_or_die(QName, ReaderPid, F) ->
- rabbit_amqqueue:with_or_die(
- QName, fun (Q) -> check_exclusive_access(Q, ReaderPid, lax), F(Q) end).
-
expand_queue_name_shortcut(<<>>, #ch{ most_recently_declared_queue = <<>> }) ->
rabbit_misc:protocol_error(
not_found, "no previously declared queue", []);
@@ -444,12 +426,9 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
Exchange,
rabbit_basic:delivery(Mandatory, Immediate, TxnKey, Message)),
case RoutingRes of
- routed ->
- ok;
- unroutable ->
- ok = basic_return(Message, WriterPid, no_route);
- not_delivered ->
- ok = basic_return(Message, WriterPid, no_consumers)
+ routed -> ok;
+ unroutable -> ok = basic_return(Message, WriterPid, no_route);
+ not_delivered -> ok = basic_return(Message, WriterPid, no_consumers)
end,
{noreply, case TxnKey of
none -> State;
@@ -480,7 +459,7 @@ handle_method(#'basic.get'{queue = QueueNameBin,
next_tag = DeliveryTag }) ->
QueueName = expand_queue_name_shortcut(QueueNameBin, State),
check_read_permitted(QueueName, State),
- case with_exclusive_access_or_die(
+ case rabbit_amqqueue:with_exclusive_access_or_die(
QueueName, ReaderPid,
fun (Q) -> rabbit_amqqueue:basic_get(Q, self(), NoAck) end) of
{ok, MessageCount,
@@ -499,7 +478,7 @@ handle_method(#'basic.get'{queue = QueueNameBin,
Content),
{noreply, State1#ch{next_tag = DeliveryTag + 1}};
empty ->
- {reply, #'basic.get_empty'{cluster_id = <<>>}, State}
+ {reply, #'basic.get_empty'{}, State}
end;
handle_method(#'basic.consume'{queue = QueueNameBin,
@@ -524,7 +503,7 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
%% We get the queue process to send the consume_ok on our
%% behalf. This is for symmetry with basic.cancel - see
%% the comment in that method for why.
- case with_exclusive_access_or_die(
+ case rabbit_amqqueue:with_exclusive_access_or_die(
QueueName, ReaderPid,
fun (Q) ->
rabbit_amqqueue:basic_consume(
@@ -716,7 +695,7 @@ handle_method(#'queue.declare'{queue = QueueNameBin,
exclusive = ExclusiveDeclare,
auto_delete = AutoDelete,
nowait = NoWait,
- arguments = Args},
+ arguments = Args} = Declare,
_, State = #ch{virtual_host = VHostPath,
reader_pid = ReaderPid,
queue_collector_pid = CollectorPid}) ->
@@ -724,37 +703,40 @@ handle_method(#'queue.declare'{queue = QueueNameBin,
true -> ReaderPid;
false -> none
end,
- %% We use this in both branches, because queue_declare may yet return an
- %% existing queue.
ActualNameBin = case QueueNameBin of
<<>> -> rabbit_guid:binstring_guid("amq.gen");
Other -> check_name('queue', Other)
end,
QueueName = rabbit_misc:r(VHostPath, queue, ActualNameBin),
- Q = case rabbit_amqqueue:declare(QueueName, Durable, AutoDelete,
- Args, Owner) of
- #amqqueue{name = QueueName,
- durable = Durable1,
- auto_delete = AutoDelete1} = Q1
- when Durable =:= Durable1, AutoDelete =:= AutoDelete1 ->
- check_exclusive_access(Q1, Owner, strict),
- check_configure_permitted(QueueName, State),
- %% We need to notify the reader within the channel
- %% process so that we can be sure there are no
- %% outstanding exclusive queues being declared as the
- %% connection shuts down.
- case Owner of
- none -> ok;
- _ -> ok = rabbit_reader_queue_collector:register_exclusive_queue(CollectorPid, Q1)
- end,
- Q1;
- %% non-equivalence trumps exclusivity arbitrarily
- #amqqueue{name = QueueName} ->
- rabbit_misc:protocol_error(
- precondition_failed, "parameters for ~s not equivalent",
- [rabbit_misc:rs(QueueName)])
- end,
- return_queue_declare_ok(State, NoWait, Q);
+ check_configure_permitted(QueueName, State),
+ case rabbit_amqqueue:with(
+ QueueName,
+ fun (Q) -> ok = rabbit_amqqueue:assert_equivalence(
+ Q, Durable, AutoDelete, Args, Owner),
+ rabbit_amqqueue:stat(Q)
+ end) of
+ {ok, MessageCount, ConsumerCount} ->
+ return_queue_declare_ok(QueueName, NoWait, MessageCount,
+ ConsumerCount, State);
+ {error, not_found} ->
+ case rabbit_amqqueue:declare(QueueName, Durable, AutoDelete,
+ Args, Owner) of
+ {new, Q = #amqqueue{}} ->
+ %% We need to notify the reader within the channel
+ %% process so that we can be sure there are no
+ %% outstanding exclusive queues being declared as
+ %% the connection shuts down.
+ ok = case Owner of
+ none -> ok;
+ _ -> rabbit_reader_queue_collector:register_exclusive_queue(CollectorPid, Q)
+ end,
+ return_queue_declare_ok(QueueName, NoWait, 0, 0, State);
+ {existing, _Q} ->
+ %% must have been created between the stat and the
+ %% declare. Loop around again.
+ handle_method(Declare, none, State)
+ end
+ end;
handle_method(#'queue.declare'{queue = QueueNameBin,
passive = true,
@@ -763,8 +745,12 @@ handle_method(#'queue.declare'{queue = QueueNameBin,
reader_pid = ReaderPid}) ->
QueueName = rabbit_misc:r(VHostPath, queue, QueueNameBin),
check_configure_permitted(QueueName, State),
- Q = with_exclusive_access_or_die(QueueName, ReaderPid, fun (Q) -> Q end),
- return_queue_declare_ok(State, NoWait, Q);
+ {{ok, MessageCount, ConsumerCount}, #amqqueue{} = Q} =
+ rabbit_amqqueue:with_or_die(
+ QueueName, fun (Q) -> {rabbit_amqqueue:stat(Q), Q} end),
+ ok = rabbit_amqqueue:check_exclusive_access(Q, ReaderPid),
+ return_queue_declare_ok(QueueName, NoWait, MessageCount, ConsumerCount,
+ State);
handle_method(#'queue.delete'{queue = QueueNameBin,
if_unused = IfUnused,
@@ -773,7 +759,7 @@ handle_method(#'queue.delete'{queue = QueueNameBin,
_, State = #ch{reader_pid = ReaderPid}) ->
QueueName = expand_queue_name_shortcut(QueueNameBin, State),
check_configure_permitted(QueueName, State),
- case with_exclusive_access_or_die(
+ case rabbit_amqqueue:with_exclusive_access_or_die(
QueueName, ReaderPid,
fun (Q) -> rabbit_amqqueue:delete(Q, IfUnused, IfEmpty) end) of
{error, in_use} ->
@@ -809,7 +795,7 @@ handle_method(#'queue.purge'{queue = QueueNameBin,
_, State = #ch{reader_pid = ReaderPid}) ->
QueueName = expand_queue_name_shortcut(QueueNameBin, State),
check_read_permitted(QueueName, State),
- {ok, PurgedMessageCount} = with_exclusive_access_or_die(
+ {ok, PurgedMessageCount} = rabbit_amqqueue:with_exclusive_access_or_die(
QueueName, ReaderPid,
fun (Q) -> rabbit_amqqueue:purge(Q) end),
return_ok(State, NoWait,
@@ -917,7 +903,9 @@ binding_action(Fun, ExchangeNameBin, QueueNameBin, RoutingKey, Arguments,
ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
check_read_permitted(ExchangeName, State),
case Fun(ExchangeName, QueueName, ActualRoutingKey, Arguments,
- fun (_X, Q) -> check_exclusive_access(Q, ReaderPid, lax) end) of
+ fun (_X, Q) ->
+ rabbit_amqqueue:check_exclusive_access(Q, ReaderPid)
+ end) of
{error, exchange_not_found} ->
rabbit_misc:not_found(ExchangeName);
{error, queue_not_found} ->
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index 7072055cde..bd9d3d2926 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -198,7 +198,7 @@ assert_equivalence(X = #exchange{ durable = Durable,
assert_equivalence(#exchange{ name = Name }, _Type, _Durable, _AutoDelete,
_Args) ->
rabbit_misc:protocol_error(
- precondition_failed,
+ not_allowed,
"cannot redeclare ~s with different type, durable or autodelete value",
[rabbit_misc:rs(Name)]).
@@ -215,7 +215,7 @@ assert_args_equivalence(#exchange{ name = Name,
Ae2 = alternate_exchange_value(Args),
if Ae1==Ae2 -> ok;
true -> rabbit_misc:protocol_error(
- precondition_failed,
+ not_allowed,
"cannot redeclare ~s with inequivalent args",
[rabbit_misc:rs(Name)])
end.
@@ -335,7 +335,7 @@ delete_queue_bindings(QueueName, FwdDeleteFun) ->
Module = type_to_module(Type),
case IsDeleted of
auto_deleted -> Module:delete(X, Bs);
- no_delete -> Module:remove_bindings(X, Bs)
+ not_deleted -> Module:remove_bindings(X, Bs)
end
end, Cleanup)
end.
@@ -438,11 +438,11 @@ delete_binding(ExchangeName, QueueName, RoutingKey, Arguments, InnerFun) ->
end) of
Err = {error, _} ->
Err;
- {{Action, X = #exchange{ type = Type }}, B} ->
+ {{IsDeleted, X = #exchange{ type = Type }}, B} ->
Module = type_to_module(Type),
- case Action of
- auto_delete -> Module:delete(X, [B]);
- no_delete -> Module:remove_bindings(X, [B])
+ case IsDeleted of
+ auto_deleted -> Module:delete(X, [B]);
+ not_deleted -> Module:remove_bindings(X, [B])
end
end.
@@ -526,10 +526,10 @@ delete(ExchangeName, IfUnused) ->
end.
maybe_auto_delete(Exchange = #exchange{auto_delete = false}) ->
- {no_delete, Exchange};
+ {not_deleted, Exchange};
maybe_auto_delete(Exchange = #exchange{auto_delete = true}) ->
case conditional_delete(Exchange) of
- {error, in_use} -> {no_delete, Exchange};
+ {error, in_use} -> {not_deleted, Exchange};
{deleted, Exchange, []} -> {auto_deleted, Exchange}
end.
diff --git a/src/rabbit_framing_channel.erl b/src/rabbit_framing_channel.erl
index b7c6aa96fa..bc1a2a0835 100644
--- a/src/rabbit_framing_channel.erl
+++ b/src/rabbit_framing_channel.erl
@@ -76,29 +76,27 @@ mainloop(ChannelPid) ->
{method, MethodName, FieldsBin} = read_frame(ChannelPid),
Method = rabbit_framing:decode_method_fields(MethodName, FieldsBin),
case rabbit_framing:method_has_content(MethodName) of
- true -> rabbit_channel:do(ChannelPid, Method,
- collect_content(ChannelPid, MethodName));
+ true -> {ClassId, _MethodId} = rabbit_framing:method_id(MethodName),
+ rabbit_channel:do(ChannelPid, Method,
+ collect_content(ChannelPid, ClassId));
false -> rabbit_channel:do(ChannelPid, Method)
end,
?MODULE:mainloop(ChannelPid).
-collect_content(ChannelPid, MethodName) ->
- {ClassId, _MethodId} = rabbit_framing:method_id(MethodName),
+collect_content(ChannelPid, ClassId) ->
case read_frame(ChannelPid) of
- {content_header, HeaderClassId, 0, BodySize, PropertiesBin} ->
- if HeaderClassId == ClassId ->
- Payload = collect_content_payload(ChannelPid, BodySize, []),
- #content{class_id = ClassId,
- properties = none,
- properties_bin = PropertiesBin,
- payload_fragments_rev = Payload};
- true ->
- rabbit_misc:protocol_error(
- command_invalid,
- "expected content header for class ~w, "
- "got one for class ~w instead",
- [ClassId, HeaderClassId])
- end;
+ {content_header, ClassId, 0, BodySize, PropertiesBin} ->
+ Payload = collect_content_payload(ChannelPid, BodySize, []),
+ #content{class_id = ClassId,
+ properties = none,
+ properties_bin = PropertiesBin,
+ payload_fragments_rev = Payload};
+ {content_header, HeaderClassId, 0, _BodySize, _PropertiesBin} ->
+ rabbit_misc:protocol_error(
+ command_invalid,
+ "expected content header for class ~w, "
+ "got one for class ~w instead",
+ [ClassId, HeaderClassId]);
_ ->
rabbit_misc:protocol_error(
command_invalid,
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index 1f61111ccd..459c0fb67b 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -350,10 +350,13 @@ blank_state(QueueName) ->
StrName = queue_name_to_dir_name(QueueName),
Dir = filename:join(queues_dir(), StrName),
ok = filelib:ensure_dir(filename:join(Dir, "nothing")),
- #qistate { dir = Dir,
- segments = segments_new(),
- journal_handle = undefined,
- dirty_count = 0 }.
+ {ok, MaxJournal} =
+ application:get_env(rabbit, queue_index_max_journal_entries),
+ #qistate { dir = Dir,
+ segments = segments_new(),
+ journal_handle = undefined,
+ dirty_count = 0,
+ max_journal_entries = MaxJournal }.
detect_clean_shutdown(Dir) ->
case file:delete(filename:join(Dir, ?CLEAN_FILENAME)) of
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index f2a903dcca..a54e0de97d 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -678,11 +678,13 @@ handle_method0(#'connection.close'{},
State = #v1{connection_state = running}) ->
lists:foreach(fun rabbit_framing_channel:shutdown/1, all_channels()),
maybe_close(State#v1{connection_state = closing});
-handle_method0(#'connection.close'{}, State = #v1{connection_state = CS})
+handle_method0(#'connection.close'{},
+ State = #v1{connection_state = CS,
+ sock = Sock})
when CS =:= closing; CS =:= closed ->
%% We're already closed or closing, so we don't need to cleanup
%% anything.
- ok = send_on_channel0(State#v1.sock, #'connection.close_ok'{}),
+ ok = send_on_channel0(Sock, #'connection.close_ok'{}),
State;
handle_method0(#'connection.close_ok'{},
State = #v1{connection_state = closed}) ->
@@ -805,8 +807,8 @@ map_exception(Channel, Reason) ->
ShouldClose = SuggestedClose or (Channel == 0),
{ClassId, MethodId} = case FailedMethod of
{_, _} -> FailedMethod;
- none -> {0, 0};
- _ -> rabbit_framing:method_id(FailedMethod)
+ none -> {0, 0};
+ _ -> rabbit_framing:method_id(FailedMethod)
end,
{CloseChannel, CloseMethod} =
case ShouldClose of
diff --git a/src/rabbit_router.erl b/src/rabbit_router.erl
index 5cd15a9462..75196bc0d4 100644
--- a/src/rabbit_router.erl
+++ b/src/rabbit_router.erl
@@ -90,13 +90,13 @@ match_routing_key(Name, RoutingKey) ->
lookup_qpids(mnesia:dirty_select(rabbit_route, [{MatchHead, [], ['$1']}])).
lookup_qpids(Queues) ->
- sets:fold(
+ lists:foldl(
fun (Key, Acc) ->
case mnesia:dirty_read({rabbit_queue, Key}) of
[#amqqueue{pid = QPid}] -> [QPid | Acc];
[] -> Acc
end
- end, [], sets:from_list(Queues)).
+ end, [], lists:usort(Queues)).
%%--------------------------------------------------------------------
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index cffd0e7fd9..dd6a90892a 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -517,42 +517,38 @@ test_field_values() ->
passed.
%% Test that content frames don't exceed frame-max
-test_content_framing(FrameMax, Fragments) ->
+test_content_framing(FrameMax, BodyBin) ->
[Header | Frames] =
rabbit_binary_generator:build_simple_content_frames(
1,
- #content{class_id = 0, properties_bin = <<>>,
- payload_fragments_rev = Fragments},
+ rabbit_binary_generator:ensure_content_encoded(
+ rabbit_basic:build_content(#'P_basic'{}, BodyBin)),
FrameMax),
%% header is formatted correctly and the size is the total of the
%% fragments
<<_FrameHeader:7/binary, _ClassAndWeight:4/binary,
BodySize:64/unsigned, _Rest/binary>> = list_to_binary(Header),
- BodySize = size(list_to_binary(Fragments)),
- false = lists:any(
- fun (ContentFrame) ->
- FrameBinary = list_to_binary(ContentFrame),
- %% assert
- <<_TypeAndChannel:3/binary,
- Size:32/unsigned,
- _Payload:Size/binary,
- 16#CE>> = FrameBinary,
- size(FrameBinary) > FrameMax
- end,
- Frames),
+ BodySize = size(BodyBin),
+ true = lists:all(
+ fun (ContentFrame) ->
+ FrameBinary = list_to_binary(ContentFrame),
+ %% assert
+ <<_TypeAndChannel:3/binary,
+ Size:32/unsigned, _Payload:Size/binary, 16#CE>> =
+ FrameBinary,
+ size(FrameBinary) =< FrameMax
+ end, Frames),
passed.
test_content_framing() ->
%% no content
- passed = test_content_framing(4096, []),
- passed = test_content_framing(4096, [<<>>]),
+ passed = test_content_framing(4096, <<>>),
%% easily fit in one frame
- passed = test_content_framing(4096, [<<"Easy">>]),
+ passed = test_content_framing(4096, <<"Easy">>),
%% exactly one frame (empty frame = 8 bytes)
- passed = test_content_framing(11, [<<"One">>]),
+ passed = test_content_framing(11, <<"One">>),
%% more than one frame
- passed = test_content_framing(20, [<<"into more than one frame">>,
- <<"This will have to go">>]),
+ passed = test_content_framing(11, <<"More than one frame">>),
passed.
test_topic_match(P, R) ->
@@ -954,10 +950,11 @@ test_server_status() ->
Writer = spawn(fun () -> receive shutdown -> ok end end),
Ch = rabbit_channel:start_link(1, self(), Writer, <<"user">>, <<"/">>,
self()),
- [Q, Q2] = [#amqqueue{} = rabbit_amqqueue:declare(
+ [Q, Q2] = [Queue || Name <- [<<"foo">>, <<"bar">>],
+ {new, Queue = #amqqueue{}} <-
+ [rabbit_amqqueue:declare(
rabbit_misc:r(<<"/">>, queue, Name),
- false, false, [], none) ||
- Name <- [<<"foo">>, <<"bar">>]],
+ false, false, [], none)]],
ok = rabbit_amqqueue:basic_consume(Q, true, Ch, undefined,
<<"ctag">>, true, undefined),
@@ -1114,11 +1111,7 @@ test_memory_pressure() ->
ok = test_memory_pressure_receive_flow(true),
%% if we publish at this point, the channel should die
- Content = #content{class_id = element(1, rabbit_framing:method_id(
- 'basic.publish')),
- properties = none,
- properties_bin = <<>>,
- payload_fragments_rev = []},
+ Content = rabbit_basic:build_content(#'P_basic'{}, <<>>),
ok = rabbit_channel:do(Ch0, #'basic.publish'{}, Content),
expect_normal_channel_termination(MRef0, Ch0),
@@ -1627,7 +1620,7 @@ test_queue_index() ->
MostOfASegment = trunc(SegmentSize*0.75),
stop_msg_store(),
ok = empty_test_queue(),
- SeqIdsA = lists:seq(0,MostOfASegment-1),
+ SeqIdsA = lists:seq(0, MostOfASegment-1),
SeqIdsB = lists:seq(MostOfASegment, 2*MostOfASegment),
{0, _Terms, Qi0} = test_queue_init(),
{0, 0, Qi1} = rabbit_queue_index:bounds(Qi0),
@@ -1900,17 +1893,14 @@ variable_queue_wait_for_shuffling_end(VQ) ->
test_queue_recover() ->
Count = 2*rabbit_queue_index:next_segment_boundary(0),
TxID = rabbit_guid:guid(),
- #amqqueue { pid = QPid, name = QName } =
+ {new, #amqqueue { pid = QPid, name = QName }} =
rabbit_amqqueue:declare(test_queue(), true, false, [], none),
- Msg = fun() -> rabbit_basic:message(
- rabbit_misc:r(<<>>, exchange, <<>>),
- <<>>, #'P_basic'{delivery_mode = 2}, <<>>) end,
- Delivery = #delivery{mandatory = false,
- immediate = false,
- txn = TxID,
- sender = self(),
- message = Msg()},
- [true = rabbit_amqqueue:deliver(QPid, Delivery) || _ <- lists:seq(1, Count)],
+ Msg = rabbit_basic:message(rabbit_misc:r(<<>>, exchange, <<>>),
+ <<>>, #'P_basic'{delivery_mode = 2}, <<>>),
+ Delivery = #delivery{mandatory = false, immediate = false, txn = TxID,
+ sender = self(), message = Msg},
+ [true = rabbit_amqqueue:deliver(QPid, Delivery) ||
+ _ <- lists:seq(1, Count)],
rabbit_amqqueue:commit_all([QPid], TxID, self()),
exit(QPid, kill),
MRef = erlang:monitor(process, QPid),
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index b7d2460bfd..5893385aa1 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -111,21 +111,18 @@
%% should be very few betas remaining, thus the transition is fast (no
%% work needs to be done for the gamma -> delta transition).
%%
-%% The conversion of betas to gammas is done on publish, in batches of
-%% exactly ?RAM_INDEX_BATCH_SIZE. This value should not be too small,
-%% otherwise the frequent operations on the queues of q2 and q3 will
-%% not be effectively amortised, nor should it be too big, otherwise a
-%% publish will take too long as it attempts to do too much work and
-%% thus stalls the queue. Therefore, it must be just right. This
-%% approach is preferable to doing work on a new queue-duration
-%% because converting all the indicated betas to gammas at that point
-%% can be far too expensive, thus requiring batching and segmented
-%% work anyway, and furthermore, if we're not getting any publishes
-%% anyway then the queue is either being drained or has no
-%% consumers. In the latter case, an expensive beta to delta
-%% transition doesn't matter, and in the former case the queue's
-%% shrinking length makes it unlikely (though not impossible) that the
-%% duration will become 0.
+%% The conversion of betas to gammas is done on all actions that can
+%% increase the message count, such as publish and requeue, and when
+%% the queue is asked to reduce its memory usage. The conversion is
+%% done in batches of exactly ?RAM_INDEX_BATCH_SIZE. This value should
+%% not be too small, otherwise the frequent operations on the queues
+%% of q2 and q3 will not be effectively amortised (switching the
+%% direction of queue access defeats amortisation), nor should it be
+%% too big, otherwise converting a batch stalls the queue for too
+%% long. Therefore, it must be just right. This approach is preferable
+%% to doing work on a new queue-duration because converting all the
+%% indicated betas to gammas at that point can be far too expensive,
+%% thus requiring batching and segmented work anyway.
%%
%% In the queue we only keep track of messages that are pending
%% delivery. This is fine for queue purging, but can be expensive for
@@ -394,6 +391,9 @@ terminate(State) ->
%% the only difference between purge and delete is that delete also
%% needs to delete everything that's been delivered and not ack'd.
delete_and_terminate(State) ->
+ %% TODO: there is no need to interact with qi at all - which we do
+ %% as part of 'purge' and 'remove_pending_ack', other than
+ %% deleting it.
{_PurgeCount, State1} = purge(State),
State2 = #vqstate { index_state = IndexState,
msg_store_clients = {{MSCStateP, PRef},
@@ -412,6 +412,9 @@ delete_and_terminate(State) ->
msg_store_clients = undefined }).
purge(State = #vqstate { q4 = Q4, index_state = IndexState, len = Len }) ->
+ %% TODO: when there are no pending acks, which is a common case,
+ %% we could simply wipe the qi instead of issuing delivers and
+ %% acks for all the messages.
IndexState1 = remove_queue_entries(fun rabbit_misc:queue_fold/3, Q4,
IndexState),
State1 = #vqstate { q1 = Q1, index_state = IndexState2 } =
@@ -1139,6 +1142,10 @@ limit_ram_index(State = #vqstate { ram_index_count = RamIndexCount }) ->
{Q2a, {Reduction1, IndexState1}} =
limit_ram_index(fun bpqueue:map_fold_filter_l/4,
Q2, {Reduction, IndexState}),
+ %% TODO: we shouldn't be writing index
+ %% entries for messages that can never end up
+ %% in delta due them residing in the only
+ %% segment held by q3.
{Q3a, {Reduction2, IndexState2}} =
limit_ram_index(fun bpqueue:map_fold_filter_r/4,
Q3, {Reduction1, IndexState1}),
@@ -1152,9 +1159,8 @@ limit_ram_index(State = #vqstate { ram_index_count = RamIndexCount }) ->
State
end.
-limit_ram_index(_MapFoldFilterFun, Q, {Reduction, IndexState})
- when Reduction == 0 ->
- {Q, {Reduction, IndexState}};
+limit_ram_index(_MapFoldFilterFun, Q, {0, IndexState}) ->
+ {Q, {0, IndexState}};
limit_ram_index(MapFoldFilterFun, Q, {Reduction, IndexState}) ->
MapFoldFilterFun(
fun erlang:'not'/1,
diff --git a/src/rabbit_writer.erl b/src/rabbit_writer.erl
index 3d10dc121e..233d72913c 100644
--- a/src/rabbit_writer.erl
+++ b/src/rabbit_writer.erl
@@ -149,6 +149,7 @@ send_command_and_notify(W, Q, ChPid, MethodRecord, Content) ->
shutdown(W) ->
W ! shutdown,
+ rabbit_misc:unlink_and_capture_exit(W),
ok.
%---------------------------------------------------------------------------