diff options
| author | Matthias Radestock <matthias@rabbitmq.com> | 2011-09-17 10:51:22 +0100 |
|---|---|---|
| committer | Matthias Radestock <matthias@rabbitmq.com> | 2011-09-17 10:51:22 +0100 |
| commit | 739ee88e529aa0ffcd5e37d24a2d58b92f3d177d (patch) | |
| tree | 94d29ec6773117871fa71d5c1389dde416e69aa1 /src | |
| parent | 1299f4acd0f282967191a7d69b444887e055b603 (diff) | |
| download | rabbitmq-server-git-739ee88e529aa0ffcd5e37d24a2d58b92f3d177d.tar.gz | |
work in progress
This doesn't even compile; but I want to stash it away so I can cherry
pick some of the refactorings and tweaks for 'default'.
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 257 |
1 files changed, 117 insertions, 140 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index d41db65a79..0cd03daf48 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -42,7 +42,6 @@ backing_queue, backing_queue_state, active_consumers, - blocked_consumers, expires, sync_timer_ref, rate_timer_ref, @@ -60,6 +59,7 @@ monitor_ref, acktags, consumer_count, + blocked_consumers, limiter, is_limit_active, unsent_message_count}). @@ -124,7 +124,6 @@ init(Q) -> backing_queue = backing_queue_module(Q), backing_queue_state = undefined, active_consumers = queue:new(), - blocked_consumers = queue:new(), expires = undefined, sync_timer_ref = undefined, rate_timer_ref = undefined, @@ -150,7 +149,6 @@ init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS, backing_queue = BQ, backing_queue_state = BQS, active_consumers = queue:new(), - blocked_consumers = queue:new(), expires = undefined, sync_timer_ref = undefined, rate_timer_ref = RateTRef, @@ -340,6 +338,7 @@ ch_record(ChPid) -> monitor_ref = MonitorRef, acktags = sets:new(), consumer_count = 0, + blocked_consumers = queue:new(), is_limit_active = false, limiter = rabbit_limiter:make_token(), unsent_message_count = 0}, @@ -348,19 +347,18 @@ ch_record(ChPid) -> C = #cr{} -> C end. -store_ch_record(C = #cr{ch_pid = ChPid}) -> - put({ch, ChPid}, C). - -maybe_store_ch_record(C = #cr{consumer_count = ConsumerCount, - acktags = ChAckTags, - unsent_message_count = UnsentMessageCount}) -> +update_ch_record(C = #cr{consumer_count = ConsumerCount, + acktags = ChAckTags, + unsent_message_count = UnsentMessageCount}) -> case {sets:size(ChAckTags), ConsumerCount, UnsentMessageCount} of - {0, 0, 0} -> ok = erase_ch_record(C), - false; - _ -> store_ch_record(C), - true + {0, 0, 0} -> ok = erase_ch_record(C); + _ -> ok = store_ch_record(C) end. +store_ch_record(C = #cr{ch_pid = ChPid}) -> + put({ch, ChPid}, C), + ok. + erase_ch_record(#cr{ch_pid = ChPid, limiter = Limiter, monitor_ref = MonitorRef}) -> @@ -371,6 +369,9 @@ erase_ch_record(#cr{ch_pid = ChPid, all_ch_record() -> [C || {{ch, _}, C} <- get()]. +block_consumer(C = #cr{blocked_consumers = Blocked}, QEntry) -> + update_ch_record(C#cr{blocked_consumers = queue:in(QEntry, Blocked)}). + is_ch_blocked(#cr{unsent_message_count = Count, is_limit_active = Limited}) -> Limited orelse Count >= ?UNSENT_MESSAGE_LIMIT. @@ -382,67 +383,56 @@ ch_record_state_transition(OldCR, NewCR) -> end. deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun}, FunAcc, - State = #q{q = #amqqueue{name = QName}, - active_consumers = ActiveConsumers, - blocked_consumers = BlockedConsumers}) -> - case queue:out(ActiveConsumers) of - {{value, QEntry = {ChPid, #consumer{tag = ConsumerTag, - ack_required = AckRequired}}}, - ActiveConsumersTail} -> - C = #cr{limiter = Limiter, - unsent_message_count = Count, - acktags = ChAckTags} = ch_record(ChPid), - IsMsgReady = PredFun(FunAcc, State), - case (IsMsgReady andalso - rabbit_limiter:can_send(Limiter, self(), AckRequired)) of - true -> - {{Message, IsDelivered, AckTag}, FunAcc1, State1} = - DeliverFun(AckRequired, FunAcc, State), - rabbit_channel:deliver( - ChPid, ConsumerTag, AckRequired, - {QName, self(), AckTag, IsDelivered, Message}), - ChAckTags1 = - case AckRequired of - true -> sets:add_element(AckTag, ChAckTags); - false -> ChAckTags - end, - NewC = C#cr{unsent_message_count = Count + 1, - acktags = ChAckTags1}, - true = maybe_store_ch_record(NewC), - {NewActiveConsumers, NewBlockedConsumers} = - case ch_record_state_transition(C, NewC) of - ok -> {queue:in(QEntry, ActiveConsumersTail), - BlockedConsumers}; - block -> {ActiveConsumers1, BlockedConsumers1} = - move_consumers(ChPid, - ActiveConsumersTail, - BlockedConsumers), - {ActiveConsumers1, - queue:in(QEntry, BlockedConsumers1)} - end, - State2 = State1#q{ - active_consumers = NewActiveConsumers, - blocked_consumers = NewBlockedConsumers}, - deliver_msgs_to_consumers(Funs, FunAcc1, State2); - %% if IsMsgReady then we've hit the limiter - false when IsMsgReady -> - true = maybe_store_ch_record(C#cr{is_limit_active = true}), - {NewActiveConsumers, NewBlockedConsumers} = - move_consumers(ChPid, - ActiveConsumers, - BlockedConsumers), - deliver_msgs_to_consumers( - Funs, FunAcc, - State#q{active_consumers = NewActiveConsumers, - blocked_consumers = NewBlockedConsumers}); - false -> - %% no message was ready, so we don't need to block anyone - {FunAcc, State} - end; - {empty, _} -> - {FunAcc, State} + State = #q{active_consumers = ActiveConsumers}) -> + case PredFun(FunAcc, State) of + false -> {FunAcc, State}; + true -> case queue:out(ActiveConsumers) of + {empty, _} -> + {FunAcc, State}; + {{value, QEntry}, Tail} -> + {FunAcc1, State1} = + deliver_msg_to_consumer( + DeliverFun, QEntry, + FunAcc, State#q{active_consumers = Tail}), + deliver_msgs_to_consumers(Funs, FunAcc1, State1) + end + end. + +deliver_msg_to_consumer(DeliverFun, E = {ChPid, Consumer}, FunAcc, State) -> + C = ch_record(ChPid), + case is_ch_blocked(C) of + true -> block_consumer(C, E), + {FunAcc, State}; + false -> case rabbit_limiter:can_send(C#cr.limiter, self(), + Consumer#consumer.ack_required) of + false -> block_consumer(C#cr{is_limit_active = true}, E), + {FunAcc, State}; + true -> AC1 = queue:in(E, State#q.active_consumers), + deliver_msg_to_consumer( + DeliverFun, Consumer, C, FunAcc, + State#q{active_consumers = AC1}) + end end. +deliver_msg_to_consumer(DeliverFun, + #consumer{tag = ConsumerTag, + ack_required = AckRequired}, + C = #cr{ch_pid = ChPid, + acktags = ChAckTags, + unsent_message_count = Count}, + FunAcc, State = #q{q = #amqqueue{name = QName}}) -> + {{Message, IsDelivered, AckTag}, FunAcc1, State1} = + DeliverFun(AckRequired, FunAcc, State), + rabbit_channel:deliver(ChPid, ConsumerTag, AckRequired, + {QName, self(), AckTag, IsDelivered, Message}), + ChAckTags1 = case AckRequired of + true -> sets:add_element(AckTag, ChAckTags); + false -> ChAckTags + end, + update_ch_record(C#cr{acktags = ChAckTags1, + unsent_message_count = Count + 1}), + {FunAcc1, State1}. + deliver_from_queue_pred(IsEmpty, _State) -> not IsEmpty. deliver_from_queue_deliver(AckRequired, false, State) -> @@ -560,11 +550,9 @@ deliver_or_enqueue(Delivery = #delivery{message = Message, maybe_record_confirm_message(Confirm, State1), case Delivered of true -> State2; - false -> BQS1 = - BQ:publish(Message, - (message_properties(State)) #message_properties{ - needs_confirming = needs_confirming(Confirm)}, - ChPid, BQS), + false -> Props = (message_properties(State)) #message_properties{ + needs_confirming = needs_confirming(Confirm)}, + BQS1 = BQ:publish(Message, Props, ChPid, BQS), ensure_ttl_timer(State2#q{backing_queue_state = BQS1}) end. @@ -576,6 +564,15 @@ requeue_and_run(AckTags, State = #q{backing_queue = BQ, ttl=TTL}) -> BQS1 end, State). +process_acks(ChPid, AckTags, State, Fun) -> + case lookup_ch(ChPid) of + not_found -> + noreply(State); + C = #cr{acktags = ChAckTags} -> + update_ch_record(C#cr{acktags = subtract_acks(ChAckTags, AckTags)}), + noreply(Fun(State)) + end. + fetch(AckRequired, State = #q{backing_queue_state = BQS, backing_queue = BQ}) -> {Result, BQS1} = BQ:fetch(AckRequired, BQS), @@ -608,17 +605,16 @@ possibly_unblock(State, ChPid, Update) -> not_found -> State; C -> - NewC = Update(C), - maybe_store_ch_record(NewC), - case ch_record_state_transition(C, NewC) of - ok -> State; - unblock -> {NewBlockedConsumers, NewActiveConsumers} = - move_consumers(ChPid, - State#q.blocked_consumers, - State#q.active_consumers), - run_message_queue( - State#q{active_consumers = NewActiveConsumers, - blocked_consumers = NewBlockedConsumers}) + C1 = Update(C), + case ch_record_state_transition(C, C1) of + ok -> update_ch_record(C1), + State; + unblock -> #cr{blocked_consumers = Consumers} = C1, + update_ch_record( + C1#cr{blocked_consumers = queue:new()}), + AC1 = queue:join(State#q.active_consumers, + Consumers), + run_message_queue(State#q{active_consumers = AC1}) end end. @@ -946,10 +942,8 @@ handle_call({basic_get, ChPid, NoAck}, _From, State3 = case AckRequired of true -> C = #cr{acktags = ChAckTags} = ch_record(ChPid), - true = maybe_store_ch_record( - C#cr{acktags = - sets:add_element(AckTag, - ChAckTags)}), + ChAckTags1 = sets:add_element(AckTag, ChAckTags), + update_ch_record(C#cr{acktags = ChAckTags1}), State2; false -> State2 end, @@ -968,9 +962,8 @@ handle_call({basic_consume, NoAck, ChPid, Limiter, C = #cr{consumer_count = ConsumerCount} = ch_record(ChPid), Consumer = #consumer{tag = ConsumerTag, ack_required = not NoAck}, - true = maybe_store_ch_record( - C#cr{consumer_count = ConsumerCount +1, - limiter = Limiter}), + C1 = C#cr{consumer_count = ConsumerCount +1, + limiter = Limiter}, ok = case ConsumerCount of 0 -> rabbit_limiter:register(Limiter, self()); _ -> ok @@ -981,17 +974,14 @@ handle_call({basic_consume, NoAck, ChPid, Limiter, State1 = State#q{has_had_consumers = true, exclusive_consumer = ExclusiveConsumer}, ok = maybe_send_reply(ChPid, OkMsg), + E = {ChPid, Consumer}, State2 = case is_ch_blocked(C) of - true -> State1#q{ - blocked_consumers = - add_consumer(ChPid, Consumer, - State1#q.blocked_consumers)}; - false -> run_message_queue( - State1#q{ - active_consumers = - add_consumer(ChPid, Consumer, - State1#q.active_consumers)}) + true -> block_consumer(C1, E), + State1; + false -> update_ch_record(C1), + AC1 = queue:in(E, State1#q.active_consumers), + run_message_queue(State1#q{active_consumers = AC1}) end, emit_consumer_created(ChPid, ConsumerTag, ExclusiveConsume, not NoAck), @@ -1007,7 +997,7 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From, C = #cr{consumer_count = ConsumerCount, limiter = Limiter} -> C1 = C#cr{consumer_count = ConsumerCount -1}, - maybe_store_ch_record( + update_ch_record( case ConsumerCount of 1 -> ok = rabbit_limiter:unregister(Limiter, self()), C1#cr{limiter = rabbit_limiter:make_token()}; @@ -1057,14 +1047,8 @@ handle_call(purge, _From, State = #q{backing_queue = BQ, handle_call({requeue, AckTags, ChPid}, From, State) -> gen_server2:reply(From, ok), - case lookup_ch(ChPid) of - not_found -> - noreply(State); - C = #cr{acktags = ChAckTags} -> - ChAckTags1 = subtract_acks(ChAckTags, AckTags), - maybe_store_ch_record(C#cr{acktags = ChAckTags1}), - noreply(requeue_and_run(AckTags, State)) - end. + noreply(process_acks(ChPid, AckTags, State, + fun (State1) -> requeue_and_run(AckTags, State1) end)). handle_cast({run_backing_queue, Mod, Fun}, State) -> noreply(run_backing_queue(Mod, Fun, State)); @@ -1073,33 +1057,26 @@ handle_cast({deliver, Delivery}, State) -> %% Asynchronous, non-"mandatory", non-"immediate" deliver mode. noreply(deliver_or_enqueue(Delivery, State)); -handle_cast({ack, AckTags, ChPid}, - State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> - case lookup_ch(ChPid) of - not_found -> - noreply(State); - C = #cr{acktags = ChAckTags} -> - maybe_store_ch_record(C#cr{acktags = subtract_acks( - ChAckTags, AckTags)}), - {_Guids, BQS1} = BQ:ack(AckTags, BQS), - noreply(State#q{backing_queue_state = BQS1}) - end; - -handle_cast({reject, AckTags, Requeue, ChPid}, - State = #q{backing_queue = BQ, - backing_queue_state = BQS}) -> - case lookup_ch(ChPid) of - not_found -> - noreply(State); - C = #cr{acktags = ChAckTags} -> - ChAckTags1 = subtract_acks(ChAckTags, AckTags), - maybe_store_ch_record(C#cr{acktags = ChAckTags1}), - noreply(case Requeue of - true -> requeue_and_run(AckTags, State); - false -> {_Guids, BQS1} = BQ:ack(AckTags, BQS), - State#q{backing_queue_state = BQS1} - end) - end; +handle_cast({ack, AckTags, ChPid}, State) -> + noreply(process_acks(ChPid, AckTags, State, + fun (State1 = #q{backing_queue = BQ, + backing_queue_state = BQS}) -> + {_Guids, BQS1} = BQ:ack(AckTags, BQS), + State1#q{backing_queue_state = BQS1} + end)); + +handle_cast({reject, AckTags, Requeue, ChPid}, State) -> + noreply(process_acks(ChPid, AckTags, State, + fun (State1 = #q{backing_queue = BQ, + backing_queue_state = BQS}) -> + case Requeue of + true -> requeue_and_run(AckTags, State1); + false -> {_Guids, BQS1} = + BQ:ack(AckTags, BQS), + State1#q{ + backing_queue_state = BQS1} + end + end)); handle_cast(delete_immediately, State) -> {stop, normal, State}; |
