diff options
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 256 | ||||
| -rw-r--r-- | src/rabbit_reader.erl | 35 | ||||
| -rw-r--r-- | src/rabbit_writer.erl | 2 | ||||
| -rw-r--r-- | src/vm_memory_monitor.erl | 23 |
4 files changed, 162 insertions, 154 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 7002fd367c..cb59edd9b1 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -238,10 +238,11 @@ notify_decorators(Event, Props, State) when Event =:= startup; notify_decorators(Event, Props, State = #q{active_consumers = ACs, backing_queue = BQ, backing_queue_state = BQS}) -> - decorator_callback( - qname(State), notify, - [Event, [{max_active_consumer_priority, priority_queue:highest(ACs)}, - {is_empty, BQ:is_empty(BQS)} | Props]]). + P = priority_queue:highest(ACs), + decorator_callback(qname(State), notify, + [Event, [{max_active_consumer_priority, P}, + {is_empty, BQ:is_empty(BQS)} | + Props]]). decorator_callback(QName, F, A) -> %% Look up again in case policy and hence decorators have changed @@ -326,8 +327,8 @@ terminate_shutdown(Fun, State) -> _ -> ok = rabbit_memory_monitor:deregister(self()), QName = qname(State), notify_decorators(shutdown, [], State), - [emit_consumer_deleted(Ch, CTag, QName) - || {Ch, CTag, _} <- consumers(State1)], + [emit_consumer_deleted(Ch, CTag, QName) || + {Ch, CTag, _} <- consumers(State1)], State1#q{backing_queue_state = Fun(BQS)} end. @@ -485,12 +486,10 @@ send_drained(C = #cr{ch_pid = ChPid, limiter = Limiter}) -> deliver_msgs_to_consumers(_DeliverFun, true, State) -> {true, State}; deliver_msgs_to_consumers(DeliverFun, false, - State = #q{active_consumers = ActiveConsumers, - consumer_use = CUInfo}) -> + State = #q{active_consumers = ActiveConsumers}) -> case priority_queue:out_p(ActiveConsumers) of {empty, _} -> - {false, - State#q{consumer_use = update_consumer_use(CUInfo, inactive)}}; + {false, update_consumer_use(State, inactive)}; {{value, QEntry, Priority}, Tail} -> {Stop, State1} = deliver_msg_to_consumer( DeliverFun, QEntry, Priority, @@ -537,18 +536,17 @@ deliver_msg_to_consumer0(DeliverFun, unsent_message_count = Count + 1}), {Stop, State1}. -deliver_from_queue_deliver(AckRequired, State) -> - {Result, State1} = fetch(AckRequired, State), - {Result, is_empty(State1), State1}. +update_consumer_use(State = #q{consumer_use = CUInfo}, Use) -> + State#q{consumer_use = update_consumer_use1(CUInfo, Use)}. -update_consumer_use({inactive, _, _, _} = CUInfo, inactive) -> +update_consumer_use1({inactive, _, _, _} = CUInfo, inactive) -> CUInfo; -update_consumer_use({active, _, _} = CUInfo, active) -> +update_consumer_use1({active, _, _} = CUInfo, active) -> CUInfo; -update_consumer_use({active, Since, Avg}, inactive) -> +update_consumer_use1({active, Since, Avg}, inactive) -> Now = now_micros(), {inactive, Now, Now - Since, Avg}; -update_consumer_use({inactive, Since, Active, Avg}, active) -> +update_consumer_use1({inactive, Since, Active, Avg}, active) -> Now = now_micros(), {active, Now, consumer_use_avg(Active, Now - Since, Avg)}. @@ -607,10 +605,12 @@ discard(#delivery{sender = SenderPid, State1#q{backing_queue_state = BQS1}. run_message_queue(State) -> - {_IsEmpty1, State1} = deliver_msgs_to_consumers( - fun deliver_from_queue_deliver/2, - is_empty(State), State), - State1. + {_Active, State3} = deliver_msgs_to_consumers( + fun(AckRequired, State1) -> + {Result, State2} = fetch(AckRequired, State1), + {Result, is_empty(State2), State2} + end, is_empty(State), State), + State3. add_consumer({ChPid, Consumer = #consumer{args = Args}}, ActiveConsumers) -> Priority = case rabbit_misc:table_lookup(Args, <<"x-priority">>) of @@ -622,21 +622,21 @@ add_consumer({ChPid, Consumer = #consumer{args = Args}}, ActiveConsumers) -> attempt_delivery(Delivery = #delivery{sender = SenderPid, message = Message}, Props, Delivered, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> - case BQ:is_duplicate(Message, BQS) of - {false, BQS1} -> - deliver_msgs_to_consumers( - fun (true, State1 = #q{backing_queue_state = BQS2}) -> - true = BQ:is_empty(BQS2), - {AckTag, BQS3} = BQ:publish_delivered( - Message, Props, SenderPid, BQS2), - {{Message, Delivered, AckTag}, - true, State1#q{backing_queue_state = BQS3}}; - (false, State1) -> - {{Message, Delivered, undefined}, - true, discard(Delivery, State1)} - end, false, State#q{backing_queue_state = BQS1}); - {true, BQS1} -> - {true, State#q{backing_queue_state = BQS1}} + {IsDuplicate, BQS1} = BQ:is_duplicate(Message, BQS), + State1 = State#q{backing_queue_state = BQS1}, + case IsDuplicate of + false -> deliver_msgs_to_consumers( + fun (true, State2 = #q{backing_queue_state = BQS2}) -> + true = BQ:is_empty(BQS2), + {AckTag, BQS3} = BQ:publish_delivered( + Message, Props, SenderPid, BQS2), + {{Message, Delivered, AckTag}, + true, State2#q{backing_queue_state = BQS3}}; + (false, State2) -> + {{Message, Delivered, undefined}, + true, discard(Delivery, State2)} + end, false, State1); + true -> {true, State1} end. deliver_or_enqueue(Delivery = #delivery{message = Message, sender = SenderPid}, @@ -652,7 +652,7 @@ deliver_or_enqueue(Delivery = #delivery{message = Message, sender = SenderPid}, {false, State2 = #q{backing_queue = BQ, backing_queue_state = BQS}} -> BQS1 = BQ:publish(Message, Props, Delivered, SenderPid, BQS), {Dropped, State3 = #q{backing_queue_state = BQS2}} = - maybe_drop_head(State2#q{backing_queue_state = BQS1}), + maybe_drop_head(State2#q{backing_queue_state = BQS1}), QLen = BQ:len(BQS2), %% optimisation: it would be perfectly safe to always %% invoke drop_expired_msgs here, but that is expensive so @@ -738,7 +738,7 @@ possibly_unblock(State, ChPid, Update) -> end end. -unblock(State = #q{consumer_use = CUInfo}, C = #cr{limiter = Limiter}) -> +unblock(State, C = #cr{limiter = Limiter}) -> case lists:partition( fun({_P, {_ChPid, #consumer{tag = CTag}}}) -> rabbit_limiter:is_consumer_blocked(Limiter, CTag) @@ -750,49 +750,47 @@ unblock(State = #q{consumer_use = CUInfo}, C = #cr{limiter = Limiter}) -> BlockedQ = priority_queue:from_list(Blocked), UnblockedQ = priority_queue:from_list(Unblocked), update_ch_record(C#cr{blocked_consumers = BlockedQ}), - State1 = State#q{consumer_use = - update_consumer_use(CUInfo, active)}, - AC1 = priority_queue:join(State1#q.active_consumers, UnblockedQ), - State2 = State1#q{active_consumers = AC1}, + AC1 = priority_queue:join(State#q.active_consumers, UnblockedQ), + State1 = update_consumer_use(State#q{active_consumers = AC1}, + active), [notify_decorators( - consumer_unblocked, [{consumer_tag, CTag}], State2) || + consumer_unblocked, [{consumer_tag, CTag}], State1) || {_P, {_ChPid, #consumer{tag = CTag}}} <- Unblocked], - run_message_queue(State2) + run_message_queue(State1) end. should_auto_delete(#q{q = #amqqueue{auto_delete = false}}) -> false; should_auto_delete(#q{has_had_consumers = false}) -> false; should_auto_delete(State) -> is_unused(State). -handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder, +handle_ch_down(DownPid, State = #q{active_consumers = AC, + exclusive_consumer = Holder, senders = Senders}) -> - Senders1 = case pmon:is_monitored(DownPid, Senders) of - false -> Senders; - true -> credit_flow:peer_down(DownPid), - pmon:demonitor(DownPid, Senders) - end, + State1 = State#q{senders = case pmon:is_monitored(DownPid, Senders) of + false -> Senders; + true -> credit_flow:peer_down(DownPid), + pmon:demonitor(DownPid, Senders) + end}, case lookup_ch(DownPid) of not_found -> - {ok, State#q{senders = Senders1}}; + {ok, State1}; C = #cr{ch_pid = ChPid, acktags = ChAckTags, blocked_consumers = Blocked} -> QName = qname(State), - _ = remove_consumers(ChPid, Blocked, QName), %% for stats emission + AC1 = remove_consumers(ChPid, AC, QName), + _ = remove_consumers(ChPid, Blocked, QName), %% for stats emission ok = erase_ch_record(C), - State1 = State#q{ - exclusive_consumer = case Holder of - {ChPid, _} -> none; - Other -> Other - end, - active_consumers = remove_consumers( - ChPid, State#q.active_consumers, - QName), - senders = Senders1}, - case should_auto_delete(State1) of - true -> {stop, State1}; + Holder1 = case Holder of + {DownPid, _} -> none; + Other -> Other + end, + State2 = State1#q{active_consumers = AC1, + exclusive_consumer = Holder1}, + case should_auto_delete(State2) of + true -> {stop, State2}; false -> {ok, requeue_and_run(queue:to_list(ChAckTags), - ensure_expiry_timer(State1))} + ensure_expiry_timer(State2))} end end. @@ -1223,64 +1221,66 @@ handle_call({basic_get, ChPid, NoAck, LimiterPid}, _From, case fetch(AckRequired, State1) of {empty, State2} -> reply(empty, State2); - {{Message, IsDelivered, AckTag}, State2} -> - State3 = #q{backing_queue = BQ, backing_queue_state = BQS} = - case AckRequired of - true -> C = #cr{acktags = ChAckTags} = - ch_record(ChPid, LimiterPid), - ChAckTags1 = queue:in(AckTag, ChAckTags), - update_ch_record(C#cr{acktags = ChAckTags1}), - State2; - false -> State2 - end, + {{Message, IsDelivered, AckTag}, + #q{backing_queue = BQ, backing_queue_state = BQS} = State2} -> + case AckRequired of + true -> C = #cr{acktags = ChAckTags} = + ch_record(ChPid, LimiterPid), + ChAckTags1 = queue:in(AckTag, ChAckTags), + update_ch_record(C#cr{acktags = ChAckTags1}); + false -> ok + end, Msg = {QName, self(), AckTag, IsDelivered, Message}, - reply({ok, BQ:len(BQS), Msg}, State3) + reply({ok, BQ:len(BQS), Msg}, State2) end; handle_call({basic_consume, NoAck, ChPid, LimiterPid, LimiterActive, ConsumerTag, ExclusiveConsume, CreditArgs, OtherArgs, OkMsg}, - _From, State = #q{exclusive_consumer = Holder}) -> + _From, State = #q{active_consumers = AC, + exclusive_consumer = Holder}) -> case check_exclusive_access(Holder, ExclusiveConsume, State) of - in_use -> - reply({error, exclusive_consume_unavailable}, State); - ok -> - C = #cr{consumer_count = Count, - limiter = Limiter} = ch_record(ChPid, LimiterPid), - Limiter1 = case LimiterActive of - true -> rabbit_limiter:activate(Limiter); - false -> Limiter - end, - Limiter2 = case CreditArgs of - none -> Limiter1; - {Crd, Drain} -> rabbit_limiter:credit( - Limiter1, ConsumerTag, Crd, Drain) - end, - C1 = update_ch_record(C#cr{consumer_count = Count + 1, - limiter = Limiter2}), - case is_empty(State) of - true -> send_drained(C1); - false -> ok - end, - Consumer = #consumer{tag = ConsumerTag, - ack_required = not NoAck, - args = OtherArgs}, - ExclusiveConsumer = if ExclusiveConsume -> {ChPid, ConsumerTag}; - true -> Holder - end, - State1 = State#q{has_had_consumers = true, - exclusive_consumer = ExclusiveConsumer}, - ok = maybe_send_reply(ChPid, OkMsg), - emit_consumer_created(ChPid, ConsumerTag, ExclusiveConsume, - not NoAck, qname(State1), OtherArgs), - AC1 = add_consumer({ChPid, Consumer}, State1#q.active_consumers), - State2 = State1#q{active_consumers = AC1}, - notify_decorators( - basic_consume, [{consumer_tag, ConsumerTag}], State2), - reply(ok, run_message_queue(State2)) + in_use -> reply({error, exclusive_consume_unavailable}, State); + ok -> C = #cr{consumer_count = Count, + limiter = Limiter} = + ch_record(ChPid, LimiterPid), + Limiter1 = case LimiterActive of + true -> rabbit_limiter:activate(Limiter); + false -> Limiter + end, + Limiter2 = case CreditArgs of + none -> Limiter1; + {Crd, Drain} -> rabbit_limiter:credit( + Limiter1, ConsumerTag, + Crd, Drain) + end, + C1 = update_ch_record(C#cr{consumer_count = Count + 1, + limiter = Limiter2}), + case is_empty(State) of + true -> send_drained(C1); + false -> ok + end, + Consumer = #consumer{tag = ConsumerTag, + ack_required = not NoAck, + args = OtherArgs}, + AC1 = add_consumer({ChPid, Consumer}, AC), + ExclusiveConsumer = + if ExclusiveConsume -> {ChPid, ConsumerTag}; + true -> Holder + end, + State1 = State#q{active_consumers = AC1, + has_had_consumers = true, + exclusive_consumer = ExclusiveConsumer}, + ok = maybe_send_reply(ChPid, OkMsg), + emit_consumer_created(ChPid, ConsumerTag, ExclusiveConsume, + not NoAck, qname(State1), OtherArgs), + notify_decorators( + basic_consume, [{consumer_tag, ConsumerTag}], State1), + reply(ok, run_message_queue(State1)) end; handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From, - State = #q{exclusive_consumer = Holder}) -> + State = #q{active_consumers = AC, + exclusive_consumer = Holder}) -> ok = maybe_send_reply(ChPid, OkMsg), case lookup_ch(ChPid) of not_found -> @@ -1288,7 +1288,7 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From, C = #cr{consumer_count = Count, limiter = Limiter, blocked_consumers = Blocked} -> - emit_consumer_deleted(ChPid, ConsumerTag, qname(State)), + AC1 = remove_consumer(ChPid, ConsumerTag, AC), Blocked1 = remove_consumer(ChPid, ConsumerTag, Blocked), Limiter1 = case Count of 1 -> rabbit_limiter:deactivate(Limiter); @@ -1298,14 +1298,13 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From, update_ch_record(C#cr{consumer_count = Count - 1, limiter = Limiter2, blocked_consumers = Blocked1}), - State1 = State#q{ - exclusive_consumer = case Holder of - {ChPid, ConsumerTag} -> none; - _ -> Holder - end, - active_consumers = remove_consumer( - ChPid, ConsumerTag, - State#q.active_consumers)}, + Holder1 = case Holder of + {ChPid, ConsumerTag} -> none; + _ -> Holder + end, + State1 = State#q{active_consumers = AC1, + exclusive_consumer = Holder1}, + emit_consumer_deleted(ChPid, ConsumerTag, qname(State1)), notify_decorators( basic_cancel, [{consumer_tag, ConsumerTag}], State1), case should_auto_delete(State1) of @@ -1372,11 +1371,12 @@ handle_call(force_event_refresh, _From, State = #q{exclusive_consumer = Exclusive}) -> rabbit_event:notify(queue_created, infos(?CREATION_EVENT_KEYS, State)), QName = qname(State), + AllConsumers = consumers(State), case Exclusive of none -> [emit_consumer_created( Ch, CTag, false, AckRequired, QName, Args) || - {Ch, CTag, AckRequired, Args} <- consumers(State)]; - {Ch, CTag} -> [{Ch, CTag, AckRequired, Args}] = consumers(State), + {Ch, CTag, AckRequired, Args} <- AllConsumers]; + {Ch, CTag} -> [{Ch, CTag, AckRequired, Args}] = AllConsumers, emit_consumer_created( Ch, CTag, true, AckRequired, QName, Args) end, @@ -1451,21 +1451,21 @@ handle_cast({set_maximum_since_use, Age}, State) -> noreply(State); handle_cast(start_mirroring, State = #q{backing_queue = BQ, - backing_queue_state = BQS}) -> + backing_queue_state = BQS}) -> %% lookup again to get policy for init_with_existing_bq {ok, Q} = rabbit_amqqueue:lookup(qname(State)), true = BQ =/= rabbit_mirror_queue_master, %% assertion BQ1 = rabbit_mirror_queue_master, BQS1 = BQ1:init_with_existing_bq(Q, BQ, BQS), noreply(State#q{backing_queue = BQ1, - backing_queue_state = BQS1}); + backing_queue_state = BQS1}); handle_cast(stop_mirroring, State = #q{backing_queue = BQ, - backing_queue_state = BQS}) -> + backing_queue_state = BQS}) -> BQ = rabbit_mirror_queue_master, %% assertion {BQ1, BQS1} = BQ:stop_mirroring(BQS), noreply(State#q{backing_queue = BQ1, - backing_queue_state = BQS1}); + backing_queue_state = BQS1}); handle_cast({credit, ChPid, CTag, Credit, Drain}, State = #q{backing_queue = BQ, diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index da22e09266..0aa44d9f47 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -736,6 +736,14 @@ handle_input({frame_payload, Type, Channel, PayloadSize}, Data, State) -> Type, Channel, Payload, State) end; +handle_input(handshake, <<"AMQP", A, B, C, D>>, State) -> + handshake({A, B, C, D}, State); +handle_input(handshake, Other, #v1{sock = Sock}) -> + refuse_connection(Sock, {bad_header, Other}); + +handle_input(Callback, Data, _State) -> + throw({bad_input, Callback, Data}). + %% The two rules pertaining to version negotiation: %% %% * If the server cannot support the protocol specified in the @@ -744,37 +752,31 @@ handle_input({frame_payload, Type, Channel, PayloadSize}, Data, State) -> %% %% * The server MUST provide a protocol version that is lower than or %% equal to that requested by the client in the protocol header. -handle_input(handshake, <<"AMQP", 0, 0, 9, 1>>, State) -> +handshake({0, 0, 9, 1}, State) -> start_connection({0, 9, 1}, rabbit_framing_amqp_0_9_1, State); %% This is the protocol header for 0-9, which we can safely treat as %% though it were 0-9-1. -handle_input(handshake, <<"AMQP", 1, 1, 0, 9>>, State) -> +handshake({1, 1, 0, 9}, State) -> start_connection({0, 9, 0}, rabbit_framing_amqp_0_9_1, State); %% This is what most clients send for 0-8. The 0-8 spec, confusingly, %% defines the version as 8-0. -handle_input(handshake, <<"AMQP", 1, 1, 8, 0>>, State) -> +handshake({1, 1, 8, 0}, State) -> start_connection({8, 0, 0}, rabbit_framing_amqp_0_8, State); %% The 0-8 spec as on the AMQP web site actually has this as the %% protocol header; some libraries e.g., py-amqplib, send it when they %% want 0-8. -handle_input(handshake, <<"AMQP", 1, 1, 9, 1>>, State) -> +handshake({1, 1, 9, 1}, State) -> start_connection({8, 0, 0}, rabbit_framing_amqp_0_8, State); -%% ... and finally, the 1.0 spec is crystal clear! Note that the -handle_input(handshake, <<"AMQP", Id, 1, 0, 0>>, State) -> +%% ... and finally, the 1.0 spec is crystal clear! +handshake({Id, 1, 0, 0}, State) -> become_1_0(Id, State); -handle_input(handshake, <<"AMQP", A, B, C, D>>, #v1{sock = Sock}) -> - refuse_connection(Sock, {bad_version, {A, B, C, D}}); - -handle_input(handshake, Other, #v1{sock = Sock}) -> - refuse_connection(Sock, {bad_header, Other}); - -handle_input(Callback, Data, _State) -> - throw({bad_input, Callback, Data}). +handshake(Vsn, #v1{sock = Sock}) -> + refuse_connection(Sock, {bad_version, Vsn}). %% Offer a protocol version to the client. Connection.start only %% includes a major and minor version number, Luckily 0-9 and 0-9-1 @@ -818,7 +820,10 @@ handle_method0(MethodName, FieldsBin, try handle_method0(Protocol:decode_method_fields(MethodName, FieldsBin), State) - catch exit:#amqp_error{method = none} = Reason -> + catch throw:{writer_inet_error, closed} -> + maybe_emit_stats(State), + throw(connection_closed_abruptly); + exit:#amqp_error{method = none} = Reason -> handle_exception(State, 0, Reason#amqp_error{method = MethodName}); Type:Reason -> Stack = erlang:get_stacktrace(), diff --git a/src/rabbit_writer.erl b/src/rabbit_writer.erl index 34dd3d3b35..92d48e633f 100644 --- a/src/rabbit_writer.erl +++ b/src/rabbit_writer.erl @@ -272,7 +272,7 @@ assemble_frames(Channel, MethodRecord, Content, FrameMax, Protocol) -> [MethodFrame | ContentFrames]. tcp_send(Sock, Data) -> - rabbit_misc:throw_on_error(inet_error, + rabbit_misc:throw_on_error(writer_inet_error, fun () -> rabbit_net:send(Sock, Data) end). internal_send_command(Sock, Channel, MethodRecord, Protocol) -> diff --git a/src/vm_memory_monitor.erl b/src/vm_memory_monitor.erl index 369ec65596..fc4353dcb4 100644 --- a/src/vm_memory_monitor.erl +++ b/src/vm_memory_monitor.erl @@ -173,16 +173,19 @@ set_mem_limits(State, MemFraction) -> ?MEMORY_SIZE_FOR_UNKNOWN_OS; M -> M end, - UsableMemory = case get_vm_limit() of - Limit when Limit < TotalMemory -> - error_logger:warning_msg( - "Only ~pMB of ~pMB memory usable due to " - "limited address space.~n", - [trunc(V/?ONE_MB) || V <- [Limit, TotalMemory]]), - Limit; - _ -> - TotalMemory - end, + UsableMemory = + case get_vm_limit() of + Limit when Limit < TotalMemory -> + error_logger:warning_msg( + "Only ~pMB of ~pMB memory usable due to " + "limited address space.~n" + "Crashes due to memory exhaustion are possible - see~n" + "http://www.rabbitmq.com/memory.html#address-space~n", + [trunc(V/?ONE_MB) || V <- [Limit, TotalMemory]]), + Limit; + _ -> + TotalMemory + end, MemLim = trunc(MemFraction * UsableMemory), error_logger:info_msg("Memory limit set to ~pMB of ~pMB total.~n", [trunc(MemLim/?ONE_MB), trunc(TotalMemory/?ONE_MB)]), |
