diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/priority_queue.erl | 40 | ||||
| -rw-r--r-- | src/rabbit_amqqueue.erl | 4 | ||||
| -rw-r--r-- | src/rabbit_channel.erl | 4 | ||||
| -rw-r--r-- | src/rabbit_guid.erl | 26 | ||||
| -rw-r--r-- | src/rabbit_misc.erl | 43 | ||||
| -rw-r--r-- | src/rabbit_mnesia.erl | 26 | ||||
| -rw-r--r-- | src/rabbit_reader.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_tests.erl | 77 |
8 files changed, 185 insertions, 37 deletions
diff --git a/src/priority_queue.erl b/src/priority_queue.erl index 732757c41c..c74b39a957 100644 --- a/src/priority_queue.erl +++ b/src/priority_queue.erl @@ -55,7 +55,8 @@ -module(priority_queue). --export([new/0, is_queue/1, is_empty/1, len/1, to_list/1, in/2, in/3, out/1]). +-export([new/0, is_queue/1, is_empty/1, len/1, to_list/1, in/2, in/3, + out/1, join/2]). %%---------------------------------------------------------------------------- @@ -73,6 +74,7 @@ -spec(in/2 :: (any(), pqueue()) -> pqueue()). -spec(in/3 :: (any(), priority(), pqueue()) -> pqueue()). -spec(out/1 :: (pqueue()) -> {empty | {value, any()}, pqueue()}). +-spec(join/2 :: (pqueue(), pqueue()) -> pqueue()). -endif. @@ -147,6 +149,42 @@ out({pqueue, [{P, Q} | Queues]}) -> end, {R, NewQ}. +join(A, {queue, [], []}) -> + A; +join({queue, [], []}, B) -> + B; +join({queue, AIn, AOut}, {queue, BIn, BOut}) -> + {queue, BIn, AOut ++ lists:reverse(AIn, BOut)}; +join(A = {queue, _, _}, {pqueue, BPQ}) -> + {Pre, Post} = lists:splitwith(fun ({P, _}) -> P < 0 end, BPQ), + Post1 = case Post of + [] -> [ {0, A} ]; + [ {0, ZeroQueue} | Rest ] -> [ {0, join(A, ZeroQueue)} | Rest ]; + _ -> [ {0, A} | Post ] + end, + {pqueue, Pre ++ Post1}; +join({pqueue, APQ}, B = {queue, _, _}) -> + {Pre, Post} = lists:splitwith(fun ({P, _}) -> P < 0 end, APQ), + Post1 = case Post of + [] -> [ {0, B} ]; + [ {0, ZeroQueue} | Rest ] -> [ {0, join(ZeroQueue, B)} | Rest ]; + _ -> [ {0, B} | Post ] + end, + {pqueue, Pre ++ Post1}; +join({pqueue, APQ}, {pqueue, BPQ}) -> + {pqueue, merge(APQ, BPQ, [])}. + +merge([], BPQ, Acc) -> + lists:reverse(Acc, BPQ); +merge(APQ, [], Acc) -> + lists:reverse(Acc, APQ); +merge([{P, A}|As], [{P, B}|Bs], Acc) -> + merge(As, Bs, [ {P, join(A, B)} | Acc ]); +merge([{PA, A}|As], Bs = [{PB, _}|_], Acc) when PA < PB -> + merge(As, Bs, [ {PA, A} | Acc ]); +merge(As = [{_, _}|_], [{PB, B}|Bs], Acc) -> + merge(As, Bs, [ {PB, B} | Acc ]). + r2f([]) -> {queue, [], []}; r2f([_] = R) -> {queue, [], R}; r2f([X,Y]) -> {queue, [X], [Y]}; diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 4903c2c57f..f05f7880b7 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -303,10 +303,10 @@ basic_cancel(#amqqueue{pid = QPid}, ChPid, ConsumerTag, OkMsg) -> infinity). notify_sent(QPid, ChPid) -> - gen_server2:cast(QPid, {notify_sent, ChPid}). + gen_server2:pcast(QPid, 8, {notify_sent, ChPid}). unblock(QPid, ChPid) -> - gen_server2:cast(QPid, {unblock, ChPid}). + gen_server2:pcast(QPid, 8, {unblock, ChPid}). internal_delete(QueueName) -> rabbit_misc:execute_mnesia_transaction( diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 58b9423460..16b7c938ca 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -157,6 +157,10 @@ handle_cast({conserve_memory, Conserve}, State) -> State#ch.writer_pid, #'channel.flow'{active = not(Conserve)}), noreply(State). +handle_info({'EXIT', WriterPid, Reason = {writer, send_failed, _Error}}, + State = #ch{writer_pid = WriterPid}) -> + State#ch.reader_pid ! {channel_exit, State#ch.channel, Reason}, + {stop, normal, State}; handle_info({'EXIT', _Pid, Reason}, State) -> {stop, Reason, State}; diff --git a/src/rabbit_guid.erl b/src/rabbit_guid.erl index 2be005034e..b789fbd1e0 100644 --- a/src/rabbit_guid.erl +++ b/src/rabbit_guid.erl @@ -42,6 +42,7 @@ terminate/2, code_change/3]). -define(SERVER, ?MODULE). +-define(SERIAL_FILENAME, "rabbit_serial"). -record(state, {serial}). @@ -59,17 +60,28 @@ %%---------------------------------------------------------------------------- start_link() -> - %% The persister can get heavily loaded, and we don't want that to - %% impact guid generation. We therefore keep the serial in a - %% separate process rather than calling rabbit_persister:serial/0 - %% directly in the functions below. gen_server:start_link({local, ?SERVER}, ?MODULE, - [rabbit_persister:serial()], []). + [update_disk_serial()], []). + +update_disk_serial() -> + Filename = filename:join(rabbit_mnesia:dir(), ?SERIAL_FILENAME), + Serial = case rabbit_misc:read_term_file(Filename) of + {ok, [Num]} -> Num; + {error, enoent} -> rabbit_persister:serial(); + {error, Reason} -> + throw({error, {cannot_read_serial_file, Filename, Reason}}) + end, + case rabbit_misc:write_term_file(Filename, [Serial + 1]) of + ok -> ok; + {error, Reason1} -> + throw({error, {cannot_write_serial_file, Filename, Reason1}}) + end, + Serial. %% generate a guid that is monotonically increasing per process. %% %% The id is only unique within a single cluster and as long as the -%% persistent message store hasn't been deleted. +%% serial store hasn't been deleted. guid() -> %% We don't use erlang:now() here because a) it may return %% duplicates when the system clock has been rewound prior to a @@ -77,7 +89,7 @@ guid() -> %% now() to move ahead of the system time), and b) it is really %% slow since it takes a global lock and makes a system call. %% - %% rabbit_persister:serial/0, in combination with self/0 (which + %% A persisted serial number, in combination with self/0 (which %% includes the node name) uniquely identifies a process in space %% and time. We combine that with a process-local counter to give %% us a GUID that is monotonically increasing per process. diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index abf4c7ccfa..95a274e37e 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -50,9 +50,11 @@ -export([intersperse/2, upmap/2, map_in_order/2]). -export([table_foreach/2]). -export([dirty_read_all/1, dirty_foreach_key/2, dirty_dump_log/1]). +-export([read_term_file/1, write_term_file/2]). -export([append_file/2, ensure_parent_dirs_exist/1]). -export([format_stderr/2]). -export([start_applications/1, stop_applications/1]). +-export([unfold/2, ceil/1]). -import(mnesia). -import(lists). @@ -65,6 +67,8 @@ -include_lib("kernel/include/inet.hrl"). +-type(ok_or_error() :: 'ok' | {'error', any()}). + -spec(method_record_type/1 :: (tuple()) -> atom()). -spec(polite_pause/0 :: () -> 'done'). -spec(polite_pause/1 :: (non_neg_integer()) -> 'done'). @@ -88,9 +92,9 @@ -spec(r_arg/4 :: (vhost() | r(atom()), K, amqp_table(), binary()) -> undefined | r(K) when is_subtype(K, atom())). -spec(rs/1 :: (r(atom())) -> string()). --spec(enable_cover/0 :: () -> 'ok' | {'error', any()}). +-spec(enable_cover/0 :: () -> ok_or_error()). -spec(report_cover/0 :: () -> 'ok'). --spec(enable_cover/1 :: (string()) -> 'ok' | {'error', any()}). +-spec(enable_cover/1 :: (string()) -> ok_or_error()). -spec(report_cover/1 :: (string()) -> 'ok'). -spec(throw_on_error/2 :: (atom(), thunk({error, any()} | {ok, A} | A)) -> A). @@ -100,7 +104,7 @@ -spec(with_vhost/2 :: (vhost(), thunk(A)) -> A). -spec(with_user_and_vhost/3 :: (username(), vhost(), thunk(A)) -> A). -spec(execute_mnesia_transaction/1 :: (thunk(A)) -> A). --spec(ensure_ok/2 :: ('ok' | {'error', any()}, atom()) -> 'ok'). +-spec(ensure_ok/2 :: (ok_or_error(), atom()) -> 'ok'). -spec(localnode/1 :: (atom()) -> erlang_node()). -spec(tcp_name/3 :: (atom(), ip_address(), ip_port()) -> atom()). -spec(intersperse/2 :: (A, [A]) -> [A]). @@ -110,12 +114,16 @@ -spec(dirty_read_all/1 :: (atom()) -> [any()]). -spec(dirty_foreach_key/2 :: (fun ((any()) -> any()), atom()) -> 'ok' | 'aborted'). --spec(dirty_dump_log/1 :: (string()) -> 'ok' | {'error', any()}). --spec(append_file/2 :: (string(), string()) -> 'ok' | {'error', any()}). +-spec(dirty_dump_log/1 :: (string()) -> ok_or_error()). +-spec(read_term_file/1 :: (string()) -> {'ok', [any()]} | {'error', any()}). +-spec(write_term_file/2 :: (string(), [any()]) -> ok_or_error()). +-spec(append_file/2 :: (string(), string()) -> ok_or_error()). -spec(ensure_parent_dirs_exist/1 :: (string()) -> 'ok'). -spec(format_stderr/2 :: (string(), [any()]) -> 'ok'). -spec(start_applications/1 :: ([atom()]) -> 'ok'). -spec(stop_applications/1 :: ([atom()]) -> 'ok'). +-spec(unfold/2 :: (fun ((A) -> ({'true', B, A} | 'false')), A) -> {[B], A}). +-spec(ceil/1 :: (number()) -> number()). -endif. @@ -360,7 +368,9 @@ dirty_foreach_key1(F, TableName, K) -> end. dirty_dump_log(FileName) -> - {ok, LH} = disk_log:open([{name, dirty_dump_log}, {mode, read_only}, {file, FileName}]), + {ok, LH} = disk_log:open([{name, dirty_dump_log}, + {mode, read_only}, + {file, FileName}]), dirty_dump_log1(LH, disk_log:chunk(LH, start)), disk_log:close(LH). @@ -374,6 +384,12 @@ dirty_dump_log1(LH, {K, Terms, BadBytes}) -> dirty_dump_log1(LH, disk_log:chunk(LH, K)). +read_term_file(File) -> file:consult(File). + +write_term_file(File, Terms) -> + file:write_file(File, list_to_binary([io_lib:format("~w.~n", [Term]) || + Term <- Terms])). + append_file(File, Suffix) -> case file:read_file_info(File) of {ok, FInfo} -> append_file(File, FInfo#file_info.size, Suffix); @@ -444,3 +460,18 @@ stop_applications(Apps) -> cannot_stop_application, Apps). +unfold(Fun, Init) -> + unfold(Fun, [], Init). + +unfold(Fun, Acc, Init) -> + case Fun(Init) of + {true, E, I} -> unfold(Fun, [E|Acc], I); + false -> {Acc, Init} + end. + +ceil(N) -> + T = trunc(N), + case N - T of + 0 -> N; + _ -> 1 + T + end. diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index 575ecb0adc..74856c4146 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -192,28 +192,16 @@ cluster_nodes_config_filename() -> create_cluster_nodes_config(ClusterNodes) -> FileName = cluster_nodes_config_filename(), - Handle = case file:open(FileName, [write]) of - {ok, Device} -> Device; - {error, Reason} -> - throw({error, {cannot_create_cluster_nodes_config, - FileName, Reason}}) - end, - try - ok = io:write(Handle, ClusterNodes), - ok = io:put_chars(Handle, [$.]) - after - case file:close(Handle) of - ok -> ok; - {error, Reason1} -> - throw({error, {cannot_close_cluster_nodes_config, - FileName, Reason1}}) - end - end, - ok. + case rabbit_misc:write_term_file(FileName, [ClusterNodes]) of + ok -> ok; + {error, Reason} -> + throw({error, {cannot_create_cluster_nodes_config, + FileName, Reason}}) + end. read_cluster_nodes_config() -> FileName = cluster_nodes_config_filename(), - case file:consult(FileName) of + case rabbit_misc:read_term_file(FileName) of {ok, [ClusterNodes]} -> ClusterNodes; {error, enoent} -> case application:get_env(cluster_config) of diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 426b99eba1..7be9281258 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -286,7 +286,7 @@ mainloop(Parent, Deb, State = #v1{sock= Sock, recv_ref = Ref}) -> %% since this termination is initiated by our parent it is %% probably more important to exit quickly. exit(Reason); - {'EXIT', _Pid, E = {writer, send_failed, _Error}} -> + {channel_exit, _Chan, E = {writer, send_failed, _Error}} -> throw(E); {channel_exit, Channel, Reason} -> mainloop(Parent, Deb, handle_channel_exit(Channel, Reason, State)); diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index e5100ccd16..8f278c2d03 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -49,6 +49,7 @@ test_content_prop_roundtrip(Datum, Binary) -> all_tests() -> passed = test_priority_queue(), + passed = test_unfold(), passed = test_parsing(), passed = test_topic_matching(), passed = test_log_management(), @@ -75,7 +76,8 @@ test_priority_queue() -> %% 1-element priority Q Q1 = priority_queue:in(foo, 1, priority_queue:new()), - {true, false, 1, [{1, foo}], [foo]} = test_priority_queue(Q1), + {true, false, 1, [{1, foo}], [foo]} = + test_priority_queue(Q1), %% 2-element same-priority Q Q2 = priority_queue:in(bar, 1, Q1), @@ -91,6 +93,71 @@ test_priority_queue() -> Q4 = priority_queue:in(foo, -1, priority_queue:new()), {true, false, 1, [{-1, foo}], [foo]} = test_priority_queue(Q4), + %% merge 2 * 1-element no-priority Qs + Q5 = priority_queue:join(priority_queue:in(foo, Q), + priority_queue:in(bar, Q)), + {true, false, 2, [{0, foo}, {0, bar}], [foo, bar]} = + test_priority_queue(Q5), + + %% merge 1-element no-priority Q with 1-element priority Q + Q6 = priority_queue:join(priority_queue:in(foo, Q), + priority_queue:in(bar, 1, Q)), + {true, false, 2, [{1, bar}, {0, foo}], [bar, foo]} = + test_priority_queue(Q6), + + %% merge 1-element priority Q with 1-element no-priority Q + Q7 = priority_queue:join(priority_queue:in(foo, 1, Q), + priority_queue:in(bar, Q)), + {true, false, 2, [{1, foo}, {0, bar}], [foo, bar]} = + test_priority_queue(Q7), + + %% merge 2 * 1-element same-priority Qs + Q8 = priority_queue:join(priority_queue:in(foo, 1, Q), + priority_queue:in(bar, 1, Q)), + {true, false, 2, [{1, foo}, {1, bar}], [foo, bar]} = + test_priority_queue(Q8), + + %% merge 2 * 1-element different-priority Qs + Q9 = priority_queue:join(priority_queue:in(foo, 1, Q), + priority_queue:in(bar, 2, Q)), + {true, false, 2, [{2, bar}, {1, foo}], [bar, foo]} = + test_priority_queue(Q9), + + %% merge 2 * 1-element different-priority Qs (other way around) + Q10 = priority_queue:join(priority_queue:in(bar, 2, Q), + priority_queue:in(foo, 1, Q)), + {true, false, 2, [{2, bar}, {1, foo}], [bar, foo]} = + test_priority_queue(Q10), + + %% merge 2 * 2-element multi-different-priority Qs + Q11 = priority_queue:join(Q6, Q5), + {true, false, 4, [{1, bar}, {0, foo}, {0, foo}, {0, bar}], + [bar, foo, foo, bar]} = test_priority_queue(Q11), + + %% and the other way around + Q12 = priority_queue:join(Q5, Q6), + {true, false, 4, [{1, bar}, {0, foo}, {0, bar}, {0, foo}], + [bar, foo, bar, foo]} = test_priority_queue(Q12), + + %% merge with negative priorities + Q13 = priority_queue:join(Q4, Q5), + {true, false, 3, [{0, foo}, {0, bar}, {-1, foo}], [foo, bar, foo]} = + test_priority_queue(Q13), + + %% and the other way around + Q14 = priority_queue:join(Q5, Q4), + {true, false, 3, [{0, foo}, {0, bar}, {-1, foo}], [foo, bar, foo]} = + test_priority_queue(Q14), + + %% joins with empty queues: + Q1 = priority_queue:join(Q, Q1), + Q1 = priority_queue:join(Q1, Q), + + %% insert with priority into non-empty zero-priority queue + Q15 = priority_queue:in(baz, 1, Q5), + {true, false, 3, [{1, baz}, {0, foo}, {0, bar}], [baz, foo, bar]} = + test_priority_queue(Q15), + passed. priority_queue_in_all(Q, L) -> @@ -116,6 +183,14 @@ test_simple_n_element_queue(N) -> {true, false, N, ToListRes, Items} = test_priority_queue(Q), passed. +test_unfold() -> + {[], test} = rabbit_misc:unfold(fun (_V) -> false end, test), + List = lists:seq(2,20,2), + {List, 0} = rabbit_misc:unfold(fun (0) -> false; + (N) -> {true, N*2, N-1} + end, 10), + passed. + test_parsing() -> passed = test_content_properties(), passed. |
