diff options
| author | Francesco Mazzoli <francesco@rabbitmq.com> | 2012-06-12 14:42:20 +0100 |
|---|---|---|
| committer | Francesco Mazzoli <francesco@rabbitmq.com> | 2012-06-12 14:42:20 +0100 |
| commit | 72de3898771d339c0475d1ae04e0d41b44726c55 (patch) | |
| tree | 1d905d3d56df55e1761447798a4b2aeb02c26e6d /src | |
| parent | c3caad5cb996d01308f825fb8896c4160dbdee36 (diff) | |
| parent | 9ed260c716bd3a5dd62cfa8fa47d521da66820f6 (diff) | |
| download | rabbitmq-server-git-72de3898771d339c0475d1ae04e0d41b44726c55.tar.gz | |
merge default
Diffstat (limited to 'src')
| -rw-r--r-- | src/gm.erl | 87 | ||||
| -rw-r--r-- | src/rabbit.erl | 5 | ||||
| -rw-r--r-- | src/rabbit_control_main.erl | 29 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_coordinator.erl | 8 | ||||
| -rw-r--r-- | src/rabbit_tests.erl | 47 |
5 files changed, 77 insertions, 99 deletions
diff --git a/src/gm.erl b/src/gm.erl index 97c81ec635..30fcdc5d86 100644 --- a/src/gm.erl +++ b/src/gm.erl @@ -876,11 +876,9 @@ flush_broadcast_buffer(State = #state { self = Self, %% View construction and inspection %% --------------------------------------------------------------------------- -needs_view_update(ReqVer, {Ver, _View}) -> - Ver < ReqVer. +needs_view_update(ReqVer, {Ver, _View}) -> Ver < ReqVer. -view_version({Ver, _View}) -> - Ver. +view_version({Ver, _View}) -> Ver. is_member_alive({dead, _Member}) -> false; is_member_alive(_) -> true. @@ -899,17 +897,13 @@ store_view_member(VMember = #view_member { id = Id }, {Ver, View}) -> with_view_member(Fun, View, Id) -> store_view_member(Fun(fetch_view_member(Id, View)), View). -fetch_view_member(Id, {_Ver, View}) -> - ?DICT:fetch(Id, View). +fetch_view_member(Id, {_Ver, View}) -> ?DICT:fetch(Id, View). -find_view_member(Id, {_Ver, View}) -> - ?DICT:find(Id, View). +find_view_member(Id, {_Ver, View}) -> ?DICT:find(Id, View). -blank_view(Ver) -> - {Ver, ?DICT:new()}. +blank_view(Ver) -> {Ver, ?DICT:new()}. -alive_view_members({_Ver, View}) -> - ?DICT:fetch_keys(View). +alive_view_members({_Ver, View}) -> ?DICT:fetch_keys(View). all_known_members({_Ver, View}) -> ?DICT:fold( @@ -1150,10 +1144,8 @@ ensure_neighbour(Ver, Self, {RealNeighbour, MRef}, Neighbour) -> end, {Neighbour, maybe_monitor(Neighbour, Self)}. -maybe_monitor(Self, Self) -> - undefined; -maybe_monitor(Other, _Self) -> - erlang:monitor(process, get_pid(Other)). +maybe_monitor( Self, Self) -> undefined; +maybe_monitor(Other, _Self) -> erlang:monitor(process, get_pid(Other)). check_neighbours(State = #state { self = Self, left = Left, @@ -1242,23 +1234,19 @@ find_member_or_blank(Id, MembersState) -> error -> blank_member() end. -erase_member(Id, MembersState) -> - ?DICT:erase(Id, MembersState). +erase_member(Id, MembersState) -> ?DICT:erase(Id, MembersState). blank_member() -> #member { pending_ack = queue:new(), last_pub = -1, last_ack = -1 }. -blank_member_state() -> - ?DICT:new(). +blank_member_state() -> ?DICT:new(). store_member(Id, MemberState, MembersState) -> ?DICT:store(Id, MemberState, MembersState). -prepare_members_state(MembersState) -> - ?DICT:to_list(MembersState). +prepare_members_state(MembersState) -> ?DICT:to_list(MembersState). -build_members_state(MembersStateList) -> - ?DICT:from_list(MembersStateList). +build_members_state(MembersStateList) -> ?DICT:from_list(MembersStateList). make_member(GroupName) -> {case read_group(GroupName) of @@ -1280,16 +1268,12 @@ get_pids(Ids) -> [Pid || {_Version, Pid} <- Ids]. %% Activity assembly %% --------------------------------------------------------------------------- -activity_nil() -> - queue:new(). +activity_nil() -> queue:new(). -activity_cons(_Id, [], [], Tail) -> - Tail; -activity_cons(Sender, Pubs, Acks, Tail) -> - queue:in({Sender, Pubs, Acks}, Tail). +activity_cons( _Id, [], [], Tail) -> Tail; +activity_cons(Sender, Pubs, Acks, Tail) -> queue:in({Sender, Pubs, Acks}, Tail). -activity_finalise(Activity) -> - queue:to_list(Activity). +activity_finalise(Activity) -> queue:to_list(Activity). maybe_send_activity([], _State) -> ok; @@ -1393,34 +1377,25 @@ purge_confirms(Confirms) -> %% Msg transformation %% --------------------------------------------------------------------------- -acks_from_queue(Q) -> - [PubNum || {PubNum, _Msg} <- queue:to_list(Q)]. +acks_from_queue(Q) -> [PubNum || {PubNum, _Msg} <- queue:to_list(Q)]. -pubs_from_queue(Q) -> - queue:to_list(Q). +pubs_from_queue(Q) -> queue:to_list(Q). -queue_from_pubs(Pubs) -> - queue:from_list(Pubs). +queue_from_pubs(Pubs) -> queue:from_list(Pubs). -apply_acks([], Pubs) -> - Pubs; -apply_acks(List, Pubs) -> - {_, Pubs1} = queue:split(length(List), Pubs), - Pubs1. +apply_acks( [], Pubs) -> Pubs; +apply_acks(List, Pubs) -> {_, Pubs1} = queue:split(length(List), Pubs), + Pubs1. join_pubs(Q, []) -> Q; join_pubs(Q, Pubs) -> queue:join(Q, queue_from_pubs(Pubs)). -last_ack([], LA) -> - LA; -last_ack(List, LA) -> - LA1 = lists:last(List), - true = LA1 > LA, %% ASSERTION - LA1. - -last_pub([], LP) -> - LP; -last_pub(List, LP) -> - {PubNum, _Msg} = lists:last(List), - true = PubNum > LP, %% ASSERTION - PubNum. +last_ack( [], LA) -> LA; +last_ack(List, LA) -> LA1 = lists:last(List), + true = LA1 > LA, %% ASSERTION + LA1. + +last_pub( [], LP) -> LP; +last_pub(List, LP) -> {PubNum, _Msg} = lists:last(List), + true = PubNum > LP, %% ASSERTION + PubNum. diff --git a/src/rabbit.erl b/src/rabbit.erl index a9af7335b9..0d2e27b931 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -199,7 +199,8 @@ rabbit_queue_index, gen, dict, ordsets, file_handle_cache, rabbit_msg_store, array, rabbit_msg_store_ets_index, rabbit_msg_file, rabbit_exchange_type_fanout, rabbit_exchange_type_topic, mnesia, - mnesia_lib, rpc, mnesia_tm, qlc, sofs, proplists, credit_flow, pmon]). + mnesia_lib, rpc, mnesia_tm, qlc, sofs, proplists, credit_flow, pmon, + ssl_connection, ssl_record, gen_fsm, ssl]). %% HiPE compilation uses multiple cores anyway, but some bits are %% IO-bound so we can go faster if we parallelise a bit more. In @@ -263,7 +264,7 @@ maybe_hipe_compile() -> hipe_compile() -> Count = length(?HIPE_WORTHY), - io:format("HiPE compiling: |~s|~n |", + io:format("~nHiPE compiling: |~s|~n |", [string:copies("-", Count)]), T1 = erlang:now(), PidMRefs = [spawn_monitor(fun () -> [begin diff --git a/src/rabbit_control_main.erl b/src/rabbit_control_main.erl index 422458a95e..d927206bdb 100644 --- a/src/rabbit_control_main.erl +++ b/src/rabbit_control_main.erl @@ -505,12 +505,14 @@ wait_for_process_death(Pid) -> read_pid_file(PidFile, Wait) -> case {file:read_file(PidFile), Wait} of {{ok, Bin}, _} -> - S = re:replace(Bin, "\\s", "", [global, {return, list}]), - try list_to_integer(S) + S = binary_to_list(Bin), + {match, [PidS]} = re:run(S, "[^\\s]+", + [{capture, all, list}]), + try list_to_integer(PidS) catch error:badarg -> exit({error, {garbage_in_pid_file, PidFile}}) end, - S; + PidS; {{error, enoent}, true} -> timer:sleep(?EXTERNAL_CHECK_INTERVAL), read_pid_file(PidFile, Wait); @@ -522,8 +524,7 @@ read_pid_file(PidFile, Wait) -> % rpc:call(os, getpid, []) at this point process_up(Pid) -> with_os([{unix, fun () -> - system("ps -p " ++ Pid - ++ " >/dev/null 2>&1") =:= 0 + run_ps(Pid) =:= 0 end}, {win32, fun () -> Res = os:cmd("tasklist /nh /fi \"pid eq " ++ @@ -541,15 +542,17 @@ with_os(Handlers) -> Handler -> Handler() end. -% Like system(3) -system(Cmd) -> - ShCmd = "sh -c '" ++ escape_quotes(Cmd) ++ "'", - Port = erlang:open_port({spawn, ShCmd}, [exit_status,nouse_stdio]), - receive {Port, {exit_status, Status}} -> Status end. +run_ps(Pid) -> + Port = erlang:open_port({spawn, "ps -p " ++ Pid}, + [exit_status, {line, 16384}, + use_stdio, stderr_to_stdout]), + exit_loop(Port). -% Escape the quotes in a shell command so that it can be used in "sh -c 'cmd'" -escape_quotes(Cmd) -> - lists:flatten(lists:map(fun ($') -> "'\\''"; (Ch) -> Ch end, Cmd)). +exit_loop(Port) -> + receive + {Port, {exit_status, Rc}} -> Rc; + {Port, _} -> exit_loop(Port) + end. format_parse_error({_Line, Mod, Err}) -> lists:flatten(Mod:format_error(Err)). diff --git a/src/rabbit_mirror_queue_coordinator.erl b/src/rabbit_mirror_queue_coordinator.erl index 17e2ffb472..71e0507a33 100644 --- a/src/rabbit_mirror_queue_coordinator.erl +++ b/src/rabbit_mirror_queue_coordinator.erl @@ -354,7 +354,10 @@ handle_cast(request_length, State = #state { length_fun = LengthFun }) -> noreply(State); handle_cast({ensure_monitoring, Pids}, State = #state { monitors = Mons }) -> - noreply(State #state { monitors = pmon:monitor_all(Pids, Mons) }). + noreply(State #state { monitors = pmon:monitor_all(Pids, Mons) }); + +handle_cast({delete_and_terminate, Reason}, State) -> + {stop, Reason, State}. handle_info(send_gm_heartbeat, State = #state { gm = GM }) -> gm:broadcast(GM, heartbeat), @@ -402,6 +405,9 @@ handle_msg([CPid], _From, request_length = Msg) -> ok = gen_server2:cast(CPid, Msg); handle_msg([CPid], _From, {ensure_monitoring, _Pids} = Msg) -> ok = gen_server2:cast(CPid, Msg); +handle_msg([CPid], _From, {delete_and_terminate, Reason} = Msg) -> + ok = gen_server2:cast(CPid, Msg), + {stop, Reason}; handle_msg([_CPid], _From, _Msg) -> ok. diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index dc5e6ef737..f4f7492100 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -29,6 +29,7 @@ -define(PERSISTENT_MSG_STORE, msg_store_persistent). -define(TRANSIENT_MSG_STORE, msg_store_transient). -define(CLEANUP_QUEUE_NAME, <<"cleanup-queue">>). +-define(TIMEOUT, 5000). all_tests() -> ok = setup_cluster(), @@ -1062,7 +1063,7 @@ test_spawn() -> rabbit_limiter:make_token(self())), ok = rabbit_channel:do(Ch, #'channel.open'{}), receive #'channel.open_ok'{} -> ok - after 1000 -> throw(failed_to_receive_channel_open_ok) + after ?TIMEOUT -> throw(failed_to_receive_channel_open_ok) end, {Writer, Ch}. @@ -1083,7 +1084,7 @@ test_spawn_remote() -> end end), receive Res -> Res - after 1000 -> throw(failed_to_receive_result) + after ?TIMEOUT -> throw(failed_to_receive_result) end. user(Username) -> @@ -1103,13 +1104,10 @@ test_confirms() -> queue = Q0, exchange = <<"amq.direct">>, routing_key = "magic" }), - receive #'queue.bind_ok'{} -> - Q0 - after 1000 -> - throw(failed_to_bind_queue) + receive #'queue.bind_ok'{} -> Q0 + after ?TIMEOUT -> throw(failed_to_bind_queue) end - after 1000 -> - throw(failed_to_declare_queue) + after ?TIMEOUT -> throw(failed_to_declare_queue) end end, %% Declare and bind two queues @@ -1122,7 +1120,7 @@ test_confirms() -> rabbit_channel:do(Ch, #'confirm.select'{}), receive #'confirm.select_ok'{} -> ok - after 1000 -> throw(failed_to_enable_confirms) + after ?TIMEOUT -> throw(failed_to_enable_confirms) end, %% Publish a message rabbit_channel:do(Ch, #'basic.publish'{exchange = <<"amq.direct">>, @@ -1139,7 +1137,7 @@ test_confirms() -> receive #'basic.nack'{} -> ok; #'basic.ack'{} -> throw(received_ack_instead_of_nack) - after 2000 -> throw(did_not_receive_nack) + after ?TIMEOUT-> throw(did_not_receive_nack) end, receive #'basic.ack'{} -> throw(received_ack_when_none_expected) @@ -1149,7 +1147,7 @@ test_confirms() -> rabbit_channel:do(Ch, #'queue.delete'{queue = QName2}), receive #'queue.delete_ok'{} -> ok - after 1000 -> throw(failed_to_cleanup_queue) + after ?TIMEOUT -> throw(failed_to_cleanup_queue) end, unlink(Ch), ok = rabbit_channel:shutdown(Ch), @@ -1172,7 +1170,7 @@ test_statistics_receive_event1(Ch, Matcher) -> true -> Props; _ -> test_statistics_receive_event1(Ch, Matcher) end - after 1000 -> throw(failed_to_receive_event) + after ?TIMEOUT -> throw(failed_to_receive_event) end. test_statistics() -> @@ -1184,9 +1182,8 @@ test_statistics() -> %% Set up a channel and queue {_Writer, Ch} = test_spawn(), rabbit_channel:do(Ch, #'queue.declare'{}), - QName = receive #'queue.declare_ok'{queue = Q0} -> - Q0 - after 1000 -> throw(failed_to_receive_queue_declare_ok) + QName = receive #'queue.declare_ok'{queue = Q0} -> Q0 + after ?TIMEOUT -> throw(failed_to_receive_queue_declare_ok) end, {ok, Q} = rabbit_amqqueue:lookup(rabbit_misc:r(<<"/">>, queue, QName)), QPid = Q#amqqueue.pid, @@ -1266,7 +1263,7 @@ expect_event(Pid, Type) -> Pid -> ok; _ -> expect_event(Pid, Type) end - after 1000 -> throw({failed_to_receive_event, Type}) + after ?TIMEOUT -> throw({failed_to_receive_event, Type}) end. test_delegates_async(SecondaryNode) -> @@ -1290,7 +1287,7 @@ make_responder(FMsg) -> make_responder(FMsg, timeout). make_responder(FMsg, Throw) -> fun () -> receive Msg -> FMsg(Msg) - after 1000 -> throw(Throw) + after ?TIMEOUT -> throw(Throw) end end. @@ -1303,9 +1300,7 @@ await_response(Count) -> receive response -> ok, await_response(Count - 1) - after 1000 -> - io:format("Async reply not received~n"), - throw(timeout) + after ?TIMEOUT -> throw(timeout) end. must_exit(Fun) -> @@ -1372,7 +1367,7 @@ test_queue_cleanup(_SecondaryNode) -> rabbit_channel:do(Ch, #'queue.declare'{ queue = ?CLEANUP_QUEUE_NAME }), receive #'queue.declare_ok'{queue = ?CLEANUP_QUEUE_NAME} -> ok - after 1000 -> throw(failed_to_receive_queue_declare_ok) + after ?TIMEOUT -> throw(failed_to_receive_queue_declare_ok) end, rabbit_channel:shutdown(Ch), rabbit:stop(), @@ -1383,8 +1378,7 @@ test_queue_cleanup(_SecondaryNode) -> receive #'channel.close'{reply_code = ?NOT_FOUND} -> ok - after 2000 -> - throw(failed_to_receive_channel_exit) + after ?TIMEOUT -> throw(failed_to_receive_channel_exit) end, rabbit_channel:shutdown(Ch2), passed. @@ -1411,8 +1405,7 @@ test_declare_on_dead_queue(SecondaryNode) -> true = rabbit_misc:is_process_alive(Q#amqqueue.pid), {ok, 0} = rabbit_amqqueue:delete(Q, false, false), passed - after 2000 -> - throw(failed_to_create_and_kill_queue) + after ?TIMEOUT -> throw(failed_to_create_and_kill_queue) end. %%--------------------------------------------------------------------- @@ -1643,7 +1636,7 @@ on_disk_capture(OnDisk, Awaiting, Pid) -> Pid); stop -> done - after (case Awaiting of [] -> 200; _ -> 1000 end) -> + after (case Awaiting of [] -> 200; _ -> ?TIMEOUT end) -> case Awaiting of [] -> Pid ! {self(), arrived}, on_disk_capture(); _ -> Pid ! {self(), timeout} @@ -2196,7 +2189,7 @@ wait_for_confirms(Unconfirmed) -> wait_for_confirms( rabbit_misc:gb_sets_difference( Unconfirmed, gb_sets:from_list(Confirmed))) - after 5000 -> exit(timeout_waiting_for_confirm) + after ?TIMEOUT -> exit(timeout_waiting_for_confirm) end end. |
