diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 349 | ||||
| -rw-r--r-- | src/rabbit_misc.erl | 10 | ||||
| -rw-r--r-- | src/rabbit_mixed_queue.erl | 9 |
3 files changed, 174 insertions, 194 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 69edb64fbc..417c3f02b0 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -51,8 +51,8 @@ owner, exclusive_consumer, has_had_consumers, + mixed_state, next_msg_id, - message_buffer, round_robin}). -record(consumer, {tag, ack_required}). @@ -96,16 +96,18 @@ init(Q) -> owner = none, exclusive_consumer = none, has_had_consumers = false, + mixed_state = rabbit_mixed_queue:start_link(qname(Q), mixed), %% TODO, CHANGE ME next_msg_id = 1, - message_buffer = queue:new(), round_robin = queue:new()}, ?HIBERNATE_AFTER}. terminate(_Reason, State) -> %% FIXME: How do we cancel active subscriptions? QName = qname(State), - lists:foreach(fun (Txn) -> ok = rollback_work(Txn, QName) end, - all_tx()), - ok = purge_message_buffer(QName, State#q.message_buffer), + NewState = + lists:foldl(fun (Txn, State1) -> + rollback_transaction(Txn, State1) + end, State, all_tx()), + rabbit_mixed_queue:purge(NewState #q.mixed_state), ok = rabbit_amqqueue:internal_delete(QName). code_change(_OldVsn, State, _Extra) -> @@ -156,11 +158,10 @@ ch_record_state_transition(OldCR, NewCR) -> true -> ok end. -deliver_immediately(Message, Delivered, - State = #q{q = #amqqueue{name = QName}, - round_robin = RoundRobin, - next_msg_id = NextId}) -> - ?LOGDEBUG("AMQQUEUE ~p DELIVERY:~n~p~n", [QName, Message]), +deliver_queue(Fun, + State = #q{q = #amqqueue{name = QName}, + round_robin = RoundRobin, + next_msg_id = NextId}) -> case queue:out(RoundRobin) of {{value, QEntry = {ChPid, #consumer{tag = ConsumerTag, ack_required = AckRequired}}}, @@ -171,62 +172,103 @@ deliver_immediately(Message, Delivered, case not(AckRequired) orelse rabbit_limiter:can_send( LimiterPid, self()) of true -> - rabbit_channel:deliver( - ChPid, ConsumerTag, AckRequired, - {QName, self(), NextId, Delivered, Message}), - NewUAM = case AckRequired of - true -> dict:store(NextId, Message, UAM); - false -> UAM - end, - NewC = C#cr{unsent_message_count = Count + 1, - unacked_messages = NewUAM}, - store_ch_record(NewC), - NewConsumers = - case ch_record_state_transition(C, NewC) of - ok -> queue:in(QEntry, RoundRobinTail); - block -> block_consumers(ChPid, RoundRobinTail) - end, - {offered, AckRequired, State#q{round_robin = NewConsumers, - next_msg_id = NextId + 1}}; + case Fun(State) of + {empty, State2} -> + {empty, State2}; + {{MsgId, Msg, MsgSize, IsDelivered, AckTag, Remaining}, State2} -> + rabbit_channel:deliver( + ChPid, ConsumerTag, AckRequired, + {QName, self(), NextId, Delivered, Message}), %% TODO FIXME + NewUAM = case AckRequired of + true -> dict:store(NextId, Message, UAM); + false -> UAM + end, + NewC = C#cr{unsent_message_count = Count + 1, + unacked_messages = NewUAM}, + store_ch_record(NewC), + NewConsumers = + case ch_record_state_transition(C, NewC) of + ok -> queue:in(QEntry, RoundRobinTail); + block -> block_consumers(ChPid, RoundRobinTail) + end, + State3 = State2 #q { round_robin = NewConsumers, + next_msg_id = NextId + 1 + }, + if Remaining == 0 -> {offered, AckRequired, State3}; + true -> deliver_queue(Fun, State3) + end + end; false -> store_ch_record(C#cr{is_limit_active = true}), NewConsumers = block_consumers(ChPid, RoundRobinTail), - deliver_immediately(Message, Delivered, - State#q{round_robin = NewConsumers}) + deliver_queue(Fun, State#q{round_robin = NewConsumers}) end; {empty, _} -> {not_offered, State} end. +deliver_from_queue(State = #q { mixed_state = MS }) -> + {Res, MS2} = rabbit_mixed_queue:deliver(MS), + {Res, State #q { mixed_state = MS2 }}. + +run_message_queue(State) -> + case deliver_queue(deliver_from_queue/1, State) of + {not_offered, State2} -> + State2; + {empty, State2} -> + State2; + {offered, _AckRequired, State2} -> + State2 + end. + attempt_delivery(none, Message, State) -> - case deliver_immediately(Message, false, State) of + Fun = fun (State2) -> {{MsgId, Message, MsgSize, false, AckTag, 0}, State2} end, %% TODO FIX ME + case deliver_queue(Fun, State) of {offered, false, State1} -> {true, State1}; {offered, true, State1} -> - persist_message(none, qname(State), Message), %% DQ HERE - persist_delivery(qname(State), Message, false), %% DQ HERE - {true, State1}; + MS = rabbit_mixed_queue:publish_delivered(Message, State1 #q.mixed_state), %% TODO API CHANGE + {true, State1 #q { mixed_state = MS }}; {not_offered, State1} -> {false, State1} end; attempt_delivery(Txn, Message, State) -> - persist_message(Txn, qname(State), Message), %% DQ tx_commit and store msgid in txn map - record_pending_message(Txn, Message), %% DQ seems to be done here! - {true, State}. + MS = rabbit_mixed_queue:tx_publish(Message, State #q.mixed_state), %% TODO API CHANGE + record_pending_message(Txn, Message), + {true, State #q { mixed_state = MS }}. deliver_or_enqueue(Txn, Message, State) -> case attempt_delivery(Txn, Message, State) of {true, NewState} -> {true, NewState}; {false, NewState} -> - persist_message(Txn, qname(State), Message), %% DQ Txn must be false here - NewMB = queue:in({Message, false}, NewState#q.message_buffer), %% DQ magic here - {false, NewState#q{message_buffer = NewMB}} + %% Txn is none + MS = rabbit_mixed_queue:publish(Message, State #q.mixed_state), %% TODO API CHANGE + {false, NewState #q { mixed_state = MS }} + end. + +%% all these messages have already been delivered at least once and +%% not ack'd, but need to be either redelivered or requeued +deliver_or_requeue_n(Messages, State) -> + {AutoAcks, Remaining} = + dropwhilefoldl(deliver_or_requeue_msg/2, {[], State}, Messages), + {ok, MS} = rabbit_mixed_queue:ack(lists:reverse(AutoAcks), State #q.mixed_state), %% TODO FIXME + case Remaining of + [] -> run_message_queue(State #q { mixed_state = MS }); + _ -> {ok, MS2} = rabbit_mixed_queue:requeue(Remaining, MS), %% TODO FIXME + State #q { mixed_state = MS2 } end. -deliver_or_enqueue_n(Messages, State = #q{message_buffer = MessageBuffer}) -> - run_poke_burst(queue:join(MessageBuffer, queue:from_list(Messages)), - State). +deliver_or_requeue_msg(Message, {AcksAcc, State}) -> + Fun = fun (State2) -> {{MsgId, Message, MsgSize, true, AckTag, 0}, State2} end, %% TODO FIX ME + case deliver_queue(Fun, State) of + {offered, true, State1} -> + {true, {AcksAcc, State1}}; + {offered, false, State1} -> + {true, {[AckTag|AcksAcc], State1}}; %% TODO FIXME where does AckTag come from?! + {not_offered, State1} -> + {false, {AcksAcc, State1}} + end. block_consumers(ChPid, RoundRobin) -> %%?LOGDEBUG("~p Blocking ~p from ~p~n", [self(), ChPid, queue:to_list(RoundRobin)]), @@ -257,7 +299,7 @@ possibly_unblock(State, ChPid, Update) -> unblock -> NewRR = unblock_consumers(ChPid, NewC#cr.consumers, State#q.round_robin), - run_poke_burst(State#q{round_robin = NewRR}) + run_message_queue(State#q{round_robin = NewRR}) end end. @@ -300,9 +342,9 @@ handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder, erlang:demonitor(MonitorRef), erase({ch, ChPid}), case check_auto_delete( - deliver_or_enqueue_n( - [{Message, true} || - {_Messsage_id, Message} <- dict:to_list(UAM)], %% DQ alter all this stuff? + deliver_or_requeue_n( + [Message || + {_Messsage_id, Message} <- dict:to_list(UAM)], State#q{ exclusive_consumer = case Holder of {ChPid, _} -> none; @@ -335,26 +377,6 @@ check_exclusive_access(none, true) -> false -> in_use end. -run_poke_burst(State = #q{message_buffer = MessageBuffer}) -> - run_poke_burst(MessageBuffer, State). - -run_poke_burst(MessageBuffer, State) -> - case queue:out(MessageBuffer) of - {{value, {Message, Delivered}}, BufferTail} -> - case deliver_immediately(Message, Delivered, State) of - {offered, true, NewState} -> - persist_delivery(qname(State), Message, Delivered), %% DQ ack needed - run_poke_burst(BufferTail, NewState); - {offered, false, NewState} -> - persist_auto_ack(qname(State), Message), %% DQ record? We don't persist acks anyway now... - run_poke_burst(BufferTail, NewState); - {not_offered, NewState} -> - NewState#q{message_buffer = MessageBuffer} - end; - {empty, _} -> - State#q{message_buffer = MessageBuffer} - end. - is_unused() -> is_unused1(get()). @@ -371,62 +393,6 @@ maybe_send_reply(ChPid, Msg) -> ok = rabbit_channel:send_command(ChPid, Msg). qname(#q{q = #amqqueue{name = QName}}) -> QName. -persist_message(_Txn, _QName, #basic_message{is_persistent = false}) -> %% DQ - ok; -persist_message(Txn, QName, Message) -> - M = Message#basic_message{ - %% don't persist any recoverable decoded properties, rebuild from properties_bin on restore - content = rabbit_binary_parser:clear_decoded_content( - Message#basic_message.content)}, - persist_work(Txn, QName, - [{publish, M, {QName, M#basic_message.guid}}]). - -persist_delivery(_QName, _Message, %% DQ - true) -> - ok; -persist_delivery(_QName, #basic_message{is_persistent = false}, %% DQ - _Delivered) -> - ok; -persist_delivery(QName, #basic_message{guid = MsgId}, %% DQ - _Delivered) -> - persist_work(none, QName, [{deliver, {QName, MsgId}}]). - -persist_acks(Txn, QName, Messages) -> %% DQ - persist_work(Txn, QName, - [{ack, {QName, MsgId}} || - #basic_message{guid = MsgId, is_persistent = P} <- Messages, - P]). - -persist_auto_ack(_QName, #basic_message{is_persistent = false}) -> - ok; -persist_auto_ack(QName, #basic_message{is_persistent = true, guid = MsgId}) -> - %% auto-acks are always non-transactional - rabbit_persister:dirty_work([{ack, {QName, MsgId}}]). - -persist_work(_Txn,_QName, []) -> - ok; -persist_work(none, _QName, WorkList) -> - rabbit_persister:dirty_work(WorkList); -persist_work(Txn, QName, WorkList) -> - mark_tx_persistent(Txn), - rabbit_persister:extend_transaction({Txn, QName}, WorkList). - -commit_work(Txn, QName) -> - do_if_persistent(fun rabbit_persister:commit_transaction/1, - Txn, QName). - -rollback_work(Txn, QName) -> - do_if_persistent(fun rabbit_persister:rollback_transaction/1, - Txn, QName). - -%% optimisation: don't do unnecessary work -%% it would be nice if this was handled by the persister -do_if_persistent(F, Txn, QName) -> - case is_tx_persistent(Txn) of - false -> ok; - true -> ok = F({Txn, QName}) - end. - lookup_tx(Txn) -> case get({txn, Txn}) of undefined -> #tx{ch_pid = none, @@ -448,54 +414,52 @@ all_tx_record() -> all_tx() -> [Txn || {{txn, Txn}, _} <- get()]. -mark_tx_persistent(Txn) -> - Tx = lookup_tx(Txn), - store_tx(Txn, Tx#tx{is_persistent = true}). - is_tx_persistent(Txn) -> #tx{is_persistent = Res} = lookup_tx(Txn), Res. -record_pending_message(Txn, Message) -> - Tx = #tx{pending_messages = Pending} = lookup_tx(Txn), - store_tx(Txn, Tx#tx{pending_messages = [{Message, false} | Pending]}). +record_pending_message(Txn, Message = #basic_message { is_persistent = IsPersistent }) -> + Tx = #tx{pending_messages = Pending, is_persistent = IsPersistentTxn } = lookup_tx(Txn), + store_tx(Txn, Tx #tx { pending_messages = [{Message, false} | Pending], + is_persistent = IsPersistentTxn orelse IsPersistent + }). record_pending_acks(Txn, ChPid, MsgIds) -> Tx = #tx{pending_acks = Pending} = lookup_tx(Txn), store_tx(Txn, Tx#tx{pending_acks = [MsgIds | Pending], ch_pid = ChPid}). -process_pending(Txn, State) -> - #tx{ch_pid = ChPid, - pending_messages = PendingMessages, - pending_acks = PendingAcks} = lookup_tx(Txn), +commit_transaction(Txn, State) -> + #tx { ch_pid = ChPid, + pending_messages = PendingMessages, + pending_acks = PendingAcks + } = lookup_tx(Txn), + PendingMessagesOrdered = lists:reverse(PendingMessages), + PendingAcksOrdered = lists:append(lists:reverse(PendingAcks)), case lookup_ch(ChPid) of - not_found -> ok; - C = #cr{unacked_messages = UAM} -> - {_Acked, Remaining} = - collect_messages(lists:append(PendingAcks), UAM), - store_ch_record(C#cr{unacked_messages = Remaining}) - end, - deliver_or_enqueue_n(lists:reverse(PendingMessages), State). + not_found -> State; + C = #cr { unacked_messages = UAM } -> + {Acked, Remaining} = + collect_messages(PendingAcksAppended, UAM), + store_ch_record(C#cr{unacked_messages = Remaining}), + MS = rabbit_mixed_queue:tx_commit(PendingMessagesOrdered, + Acked, + State #q.mixed_state), + State #q { mixed_state = MS } + end. +rollback_transaction(Txn, State) -> + #tx { pending_messages = PendingMessages + } = lookup_tx(Txn), + MS = rabbit_mixed_queue:tx_cancel(lists:reverse(PendingMessages), State #q.mixed_state), + State #q { mixed_state = MS }. + +%% {A, B} = collect_messages(C, D) %% A = C `intersect` D; B = D \\ C +%% err, A = C `intersect` D , via projection through the dict that is A collect_messages(MsgIds, UAM) -> lists:mapfoldl( fun (MsgId, D) -> {dict:fetch(MsgId, D), dict:erase(MsgId, D)} end, UAM, MsgIds). -purge_message_buffer(QName, MessageBuffer) -> - Messages = - [[Message || {Message, _Delivered} <- - queue:to_list(MessageBuffer)] | - lists:map( - fun (#cr{unacked_messages = UAM}) -> - [Message || {_MessageId, Message} <- dict:to_list(UAM)] - end, - all_ch_record())], - %% the simplest, though certainly not the most obvious or - %% efficient, way to purge messages from the persister is to - %% artifically ack them. - persist_acks(none, QName, lists:append(Messages)). - infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items]. i(name, #q{q = #amqqueue{name = Name}}) -> Name; @@ -564,12 +528,11 @@ handle_call({deliver, Txn, Message}, _From, State) -> reply(Delivered, NewState); handle_call({commit, Txn}, From, State) -> - ok = commit_work(Txn, qname(State)), + NewState = commit_transaction(Txn, State), %% optimisation: we reply straight away so the sender can continue gen_server2:reply(From, ok), - NewState = process_pending(Txn, State), erase_tx(Txn), - noreply(NewState); + noreply(run_message_queue(NewState)); handle_call({notify_down, ChPid}, From, State) -> %% optimisation: we reply straight away so the sender can continue @@ -579,23 +542,25 @@ handle_call({notify_down, ChPid}, From, State) -> handle_call({basic_get, ChPid, NoAck}, _From, State = #q{q = #amqqueue{name = QName}, next_msg_id = NextId, - message_buffer = MessageBuffer}) -> - case queue:out(MessageBuffer) of - {{value, {Message, Delivered}}, BufferTail} -> + mixed_state = MS + }) -> + case rabbit_mixed_queue:deliver(MS) of + {empty, MS2} -> reply(empty, State #q { mixed_state = MS2 }); + {{MsgId, Msg, MsgSize, IsDelivered, AckTag, Remaining}, MS2} -> AckRequired = not(NoAck), - case AckRequired of - true -> - persist_delivery(QName, Message, Delivered), - C = #cr{unacked_messages = UAM} = ch_record(ChPid), - NewUAM = dict:store(NextId, Message, UAM), - store_ch_record(C#cr{unacked_messages = NewUAM}); - false -> - persist_auto_ack(QName, Message) - end, - Msg = {QName, self(), NextId, Delivered, Message}, - reply({ok, queue:len(BufferTail), Msg}, - State#q{message_buffer = BufferTail, - next_msg_id = NextId + 1}); + MS3 = + case AckRequired of + true -> + C = #cr{unacked_messages = UAM} = ch_record(ChPid), + NewUAM = dict:store(NextId, Message, UAM), + store_ch_record(C#cr{unacked_messages = NewUAM}), + MS2; + false -> + rabbit_mixed_queue:ack([AckTag], MS2) + end, + Message = {QName, self(), NextId, IsDelivered, Msg}, %% TODO, FIX UP + reply({ok, Remaining, Message}, + State#q{next_msg_id = NextId + 1}); {empty, _} -> reply(empty, State) end; @@ -630,7 +595,7 @@ handle_call({basic_consume, NoAck, ReaderPid, ChPid, LimiterPid, end, round_robin = queue:in({ChPid, Consumer}, RoundRobin)}, ok = maybe_send_reply(ChPid, OkMsg), - reply(ok, run_poke_burst(State1)) + reply(ok, run_message_queue(State1)) end end; @@ -667,27 +632,29 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From, end; handle_call(stat, _From, State = #q{q = #amqqueue{name = Name}, - message_buffer = MessageBuffer, + mixed_state = MS, round_robin = RoundRobin}) -> - reply({ok, Name, queue:len(MessageBuffer), queue:len(RoundRobin)}, State); + {Length, MS2} = rabbit_mixed_queue:length(MS), + reply({ok, Name, Length, queue:len(RoundRobin)}, State #q { mixed_state = MS2 }); handle_call({delete, IfUnused, IfEmpty}, _From, - State = #q{message_buffer = MessageBuffer}) -> + State = #q{message_buffer = MessageBuffer, mixed_state = MS}) -> IsEmpty = queue:is_empty(MessageBuffer), IsUnused = is_unused(), + {Length, MS2} = rabbit_mixed_queue:length(MS), if IfEmpty and not(IsEmpty) -> reply({error, not_empty}, State); IfUnused and not(IsUnused) -> reply({error, in_use}, State); true -> - {stop, normal, {ok, queue:len(MessageBuffer)}, State} + {stop, normal, {ok, Length}, State #q { mixed_state = MS2 }} end; -handle_call(purge, _From, State = #q{message_buffer = MessageBuffer}) -> - ok = purge_message_buffer(qname(State), MessageBuffer), - reply({ok, queue:len(MessageBuffer)}, - State#q{message_buffer = queue:new()}); +handle_call(purge, _From, State) -> + {Count, MS} = rabbit_mixed_queue:purge(State #q.mixed_state), + reply({ok, Count}, + State #q { mixed_state = MS }); handle_call({claim_queue, ReaderPid}, _From, State = #q{owner = Owner, exclusive_consumer = Holder}) -> @@ -722,23 +689,24 @@ handle_cast({ack, Txn, MsgIds, ChPid}, State) -> noreply(State); C = #cr{unacked_messages = UAM} -> {Acked, Remaining} = collect_messages(MsgIds, UAM), - persist_acks(Txn, qname(State), Acked), case Txn of none -> - store_ch_record(C#cr{unacked_messages = Remaining}); + MS = rabbit_mixed_queue:ack(Acked, State #q.mixed_state), %% TODO API + store_ch_record(C#cr{unacked_messages = Remaining}), + noreply(State #q { mixed_state = MS }); _ -> - record_pending_acks(Txn, ChPid, MsgIds) + record_pending_acks(Txn, ChPid, MsgIds), + noreply(State) end, - noreply(State) end; handle_cast({rollback, Txn}, State) -> - ok = rollback_work(Txn, qname(State)), + NewState = rollback_transaction(Txn, State), erase_tx(Txn), - noreply(State); + noreply(State2); handle_cast({redeliver, Messages}, State) -> - noreply(deliver_or_enqueue_n(Messages, State)); + noreply(ok); %% TODO - probably remove - only used by the old persister handle_cast({requeue, MsgIds, ChPid}, State) -> case lookup_ch(ChPid) of @@ -749,8 +717,7 @@ handle_cast({requeue, MsgIds, ChPid}, State) -> C = #cr{unacked_messages = UAM} -> {Messages, NewUAM} = collect_messages(MsgIds, UAM), store_ch_record(C#cr{unacked_messages = NewUAM}), - noreply(deliver_or_enqueue_n( - [{Message, true} || Message <- Messages], State)) + noreply(deliver_or_requeue_n(Messages, State)) end; handle_cast({unblock, ChPid}, State) -> diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index de7bc010b2..f90abe3ffb 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -51,6 +51,7 @@ -export([append_file/2, ensure_parent_dirs_exist/1]). -export([format_stderr/2]). -export([start_applications/1, stop_applications/1]). +-export([dropwhilefoldl/3]). -import(mnesia). -import(lists). @@ -407,3 +408,12 @@ stop_applications(Apps) -> cannot_stop_application, Apps). +dropwhilefoldl(_PredFun, Acc0, []) -> + {Acc0, []}; +dropwhilefoldl(PredFun, Acc0, [E|List]) -> + case PredFun(E, Acc0) of + {true, Acc1} -> + dropwhilefoldl(PredFun, Acc1, List); + {false, Acc1} -> + {Acc1, List} + end. diff --git a/src/rabbit_mixed_queue.erl b/src/rabbit_mixed_queue.erl index 811d140a4e..790f4b756e 100644 --- a/src/rabbit_mixed_queue.erl +++ b/src/rabbit_mixed_queue.erl @@ -87,9 +87,12 @@ deliver(State = #mqstate { mode = mixed, queue = Q, msg_buf = MsgBuf, next_write remove_noacks(Acks) -> lists:filter(fun (A) -> A /= noack end, Acks). -ack(Acks, State = #mqstate { queue = Q }) -> - ok = rabbit_disk_queue:ack(Q, remove_noacks(Acks)), - {ok, State}. +ack(Acks, State = #mqstate { queue = Q }) -> + case remove_noacks(Acks) of + [] -> {ok, State}; + AckTags -> ok = rabbit_disk_queue:ack(Q, AckTags), + {ok, State} + end. tx_publish(MsgId, Msg, _IsPersistent, State = #mqstate { mode = disk }) -> ok = rabbit_disk_queue:tx_publish(MsgId, Msg), |
