diff options
| author | Matthew Sackman <matthew@lshift.net> | 2009-05-22 15:32:53 +0100 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2009-05-22 15:32:53 +0100 |
| commit | b6ec8493f9b606d4a7a76a46b2e38efc95222a74 (patch) | |
| tree | 497312c7e8f222a6dc4a47933793c71dc958e680 | |
| parent | 7b0f295c2e7acc3b6a40874d3feb5d000ea34c78 (diff) | |
| download | rabbitmq-server-git-b6ec8493f9b606d4a7a76a46b2e38efc95222a74.tar.gz | |
It compiles.
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 136 | ||||
| -rw-r--r-- | src/rabbit_misc.erl | 11 | ||||
| -rw-r--r-- | src/rabbit_mixed_queue.erl | 4 |
3 files changed, 64 insertions, 87 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 15b3a03655..8fe0d62386 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -175,12 +175,12 @@ deliver_queue(Fun, FunAcc0, case Fun(AckRequired, FunAcc0, State) of {empty, FunAcc1, State2} -> {FunAcc1, State2}; - {{MsgId, Msg, MsgSize, IsDelivered, AckTag, Remaining}, FunAcc1, State2} -> + {{Msg, IsDelivered, AckTag, Remaining}, FunAcc1, State2} -> rabbit_channel:deliver( ChPid, ConsumerTag, AckRequired, - {QName, self(), NextId, Delivered, Message}), %% TODO FIXME + {QName, self(), NextId, IsDelivered, Msg}), NewUAM = case AckRequired of - true -> dict:store(NextId, Message, UAM); + true -> dict:store(NextId, {Msg, AckTag}, UAM); false -> UAM end, NewC = C#cr{unsent_message_count = Count + 1, @@ -201,81 +201,74 @@ deliver_queue(Fun, FunAcc0, false -> store_ch_record(C#cr{is_limit_active = true}), NewConsumers = block_consumers(ChPid, RoundRobinTail), - deliver_queue(Fun, FunAcc0, State#q{round_robin = NewConsumers}) + deliver_queue(Fun, FunAcc0, State #q { round_robin = NewConsumers }) end; {empty, _} -> {FunAcc0, State} end. -deliver_from_queue(AckRequired, Acc, State = #q { mixed_state = MS }) -> +deliver_from_queue(AckRequired, Acc = undefined, State = #q { mixed_state = MS }) -> {Res, MS2} = rabbit_mixed_queue:deliver(MS), MS3 = case {Res, AckRequired} of - {empty, _} -> MS2; {_, true} -> MS2; - {{_MsgId, _Msg, _MsgSize, _IsDelivered, AckTag, _Remaining}, false} -> + {empty, _} -> MS2; + {{_Msg, _IsDelivered, AckTag, _Remaining}, false} -> {ok, MS4} = rabbit_mixed_queue:ack([AckTag], MS2), - MS3 + MS4 end, {Res, Acc, State #q { mixed_state = MS3 }}. run_message_queue(State) -> - {undefined, State2} = deliver_queue(deliver_from_queue/1, undefined, State), + {undefined, State2} = deliver_queue(fun deliver_from_queue/3, undefined, State), State2. -attempt_delivery(none, Message, State) -> +attempt_immediate_delivery(none, Msg, State) -> Fun = fun (AckRequired, false, State2) -> {AckTag, State3} = if AckRequired -> - %% TODO API CHANGE - {ok, AckTag2, MS} = rabbit_mixed_queue:publish_delivered(Message, + {ok, AckTag2, MS} = rabbit_mixed_queue:publish_delivered(Msg, State2 #q.mixed_state), {AckTag2, State2 #q { mixed_state = MS }}; true -> {noack, State2} end, - {{MsgId, Message, MsgSize, false, AckTag, 0}, true, State3} %% TODO FIX ME + {{Msg, false, AckTag, 0}, true, State3} end, deliver_queue(Fun, false, State); -attempt_delivery(Txn, Message, State) -> - {ok, MS} = rabbit_mixed_queue:tx_publish(Message, State #q.mixed_state), %% TODO API CHANGE - record_pending_message(Txn, Message), +attempt_immediate_delivery(Txn, Msg, State) -> + {ok, MS} = rabbit_mixed_queue:tx_publish(Msg, State #q.mixed_state), + record_pending_message(Txn, Msg), {true, State #q { mixed_state = MS }}. -deliver_or_enqueue(Txn, Message, State) -> - case attempt_delivery(Txn, Message, State) of +deliver_or_enqueue(Txn, Msg, State) -> + case attempt_immediate_delivery(Txn, Msg, State) of {true, NewState} -> {true, NewState}; {false, NewState} -> %% Txn is none and no unblocked channels with consumers - {ok, MS} = rabbit_mixed_queue:publish(Message, State #q.mixed_state), %% TODO API CHANGE + {ok, MS} = rabbit_mixed_queue:publish(Msg, State #q.mixed_state), {false, NewState #q { mixed_state = MS }} end. %% all these messages have already been delivered at least once and %% not ack'd, but need to be either redelivered or requeued -deliver_or_requeue_n(Messages, State) -> - {AutoAcks, Remaining} = - dropwhilefoldl(deliver_or_requeue_msg/2, {[], State}, Messages), - {ok, MS} = rabbit_mixed_queue:ack(lists:reverse(AutoAcks), State #q.mixed_state), %% TODO FIXME - case Remaining of - [] -> run_message_queue(State #q { mixed_state = MS }); - _ -> {ok, MS2} = rabbit_mixed_queue:requeue(Remaining, MS), %% TODO FIXME - State #q { mixed_state = MS2 } +deliver_or_requeue_n([], State) -> + State; +deliver_or_requeue_n(MsgsWithAcks, State) -> + {{_RemainingLengthMinusOne, AutoAcks, OutstandingMsgs}, NewState} = + deliver_queue(fun deliver_or_requeue_msgs/3, {length(MsgsWithAcks) - 1, [], MsgsWithAcks}, State), + {ok, MS} = rabbit_mixed_queue:ack(lists:reverse(AutoAcks), NewState #q.mixed_state), + case OutstandingMsgs of + [] -> run_message_queue(NewState #q { mixed_state = MS }); + _ -> {ok, MS2} = rabbit_mixed_queue:requeue(OutstandingMsgs, MS), + NewState #q { mixed_state = MS2 } end. -deliver_or_requeue_msg(Message, {AcksAcc, State}) -> %% TODO the acktag really should be within the msg here - Fun = fun (AckRequired, {false, AcksAcc}, State2) -> - AcksAcc2 = if AckRequired -> AcksAcc; - true -> [AckTag|AcksAcc] - end, - {{MsgId, Message, MsgSize, true, AckTag, 0}, {true, AcksAcc2}, State2} end, %% TODO FIX ME - case deliver_queue(Fun, {false, AcksAcc}, State) of - {{true, AcksAcc3}, State1} -> - {true, {AcksAcc3, State1}}; - {{false, AcksAcc}, State1} -> - {false, {AcksAcc, State1}} - end. +deliver_or_requeue_msgs(false, {Len, AcksAcc, [{Msg, AckTag} | MsgsWithAcks]}, State) -> + {{Msg, true, noack, Len}, {Len - 1, [AckTag|AcksAcc], MsgsWithAcks}, State}; +deliver_or_requeue_msgs(true, {Len, AcksAcc, [{Msg, AckTag} | MsgsWithAcks]}, State) -> + {{Msg, true, AckTag, Len}, {Len - 1, [AcksAcc], MsgsWithAcks}, State}. block_consumers(ChPid, RoundRobin) -> %%?LOGDEBUG("~p Blocking ~p from ~p~n", [self(), ChPid, queue:to_list(RoundRobin)]), @@ -350,8 +343,8 @@ handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder, erase({ch, ChPid}), case check_auto_delete( deliver_or_requeue_n( - [Message || %% TODO NEED TO GRAB ACKTAGS OUT OF HERE AND PASS THEM THROUGH - {_Messsage_id, Message} <- dict:to_list(UAM)], + [MsgWithAck || + {_MsgId, MsgWithAck} <- dict:to_list(UAM)], State#q{ exclusive_consumer = case Holder of {ChPid, _} -> none; @@ -421,10 +414,6 @@ all_tx_record() -> all_tx() -> [Txn || {{txn, Txn}, _} <- get()]. -is_tx_persistent(Txn) -> - #tx{is_persistent = Res} = lookup_tx(Txn), - Res. - record_pending_message(Txn, Message = #basic_message { is_persistent = IsPersistent }) -> Tx = #tx{pending_messages = Pending, is_persistent = IsPersistentTxn } = lookup_tx(Txn), store_tx(Txn, Tx #tx { pending_messages = [{Message, false} | Pending], @@ -445,12 +434,13 @@ commit_transaction(Txn, State) -> case lookup_ch(ChPid) of not_found -> State; C = #cr { unacked_messages = UAM } -> - {Acked, Remaining} = - collect_messages(PendingAcksAppended, UAM), + {MsgWithAcks, Remaining} = + collect_messages(PendingAcksOrdered, UAM), store_ch_record(C#cr{unacked_messages = Remaining}), - MS = rabbit_mixed_queue:tx_commit(PendingMessagesOrdered, - Acked, - State #q.mixed_state), + MS = rabbit_mixed_queue:tx_commit( + PendingMessagesOrdered, + lists:map(fun ({_Msg, AckTag}) -> AckTag end, MsgWithAcks), + State #q.mixed_state), State #q { mixed_state = MS } end. @@ -461,7 +451,7 @@ rollback_transaction(Txn, State) -> State #q { mixed_state = MS }. %% {A, B} = collect_messages(C, D) %% A = C `intersect` D; B = D \\ C -%% err, A = C `intersect` D , via projection through the dict that is A +%% err, A = C `intersect` D , via projection through the dict that is C collect_messages(MsgIds, UAM) -> lists:mapfoldl( fun (MsgId, D) -> {dict:fetch(MsgId, D), dict:erase(MsgId, D)} end, @@ -475,8 +465,8 @@ i(auto_delete, #q{q = #amqqueue{auto_delete = AutoDelete}}) -> AutoDelete; i(arguments, #q{q = #amqqueue{arguments = Arguments}}) -> Arguments; i(pid, _) -> self(); -i(messages_ready, #q{message_buffer = MessageBuffer}) -> - queue:len(MessageBuffer); +i(messages_ready, #q { mixed_state = MS }) -> + rabbit_mixed_queue:length(MS); i(messages_unacknowledged, _) -> lists:sum([dict:size(UAM) || #cr{unacked_messages = UAM} <- all_ch_record()]); @@ -526,7 +516,7 @@ handle_call({deliver_immediately, Txn, Message}, _From, State) -> %% just all ready-to-consume queues get the message, with unready %% queues discarding the message? %% - {Delivered, NewState} = attempt_delivery(Txn, Message, State), + {Delivered, NewState} = attempt_immediate_delivery(Txn, Message, State), reply(Delivered, NewState); handle_call({deliver, Txn, Message}, _From, State) -> @@ -553,23 +543,23 @@ handle_call({basic_get, ChPid, NoAck}, _From, }) -> case rabbit_mixed_queue:deliver(MS) of {empty, MS2} -> reply(empty, State #q { mixed_state = MS2 }); - {{MsgId, Msg, MsgSize, IsDelivered, AckTag, Remaining}, MS2} -> + {{Msg, IsDelivered, AckTag, Remaining}, MS2} -> AckRequired = not(NoAck), - MS3 = + {ok, MS3} = case AckRequired of true -> C = #cr{unacked_messages = UAM} = ch_record(ChPid), - NewUAM = dict:store(NextId, Message, UAM), + NewUAM = dict:store(NextId, {Msg, AckTag}, UAM), store_ch_record(C#cr{unacked_messages = NewUAM}), - MS2; + {ok, MS2}; false -> rabbit_mixed_queue:ack([AckTag], MS2) end, - Message = {QName, self(), NextId, IsDelivered, Msg}, %% TODO, FIX UP + Message = {QName, self(), NextId, IsDelivered, Msg}, reply({ok, Remaining, Message}, - State#q{next_msg_id = NextId + 1}); - {empty, _} -> - reply(empty, State) + State #q { next_msg_id = NextId + 1, + mixed_state = MS3 + }) end; handle_call({basic_consume, NoAck, ReaderPid, ChPid, LimiterPid, @@ -645,10 +635,10 @@ handle_call(stat, _From, State = #q{q = #amqqueue{name = Name}, reply({ok, Name, Length, queue:len(RoundRobin)}, State #q { mixed_state = MS2 }); handle_call({delete, IfUnused, IfEmpty}, _From, - State = #q{message_buffer = MessageBuffer, mixed_state = MS}) -> - IsEmpty = queue:is_empty(MessageBuffer), - IsUnused = is_unused(), + State = #q { mixed_state = MS }) -> {Length, MS2} = rabbit_mixed_queue:length(MS), + IsEmpty = Length == 0, + IsUnused = is_unused(), if IfEmpty and not(IsEmpty) -> reply({error, not_empty}, State); @@ -695,25 +685,23 @@ handle_cast({ack, Txn, MsgIds, ChPid}, State) -> not_found -> noreply(State); C = #cr{unacked_messages = UAM} -> - {Acked, Remaining} = collect_messages(MsgIds, UAM), + {MsgWithAcks, Remaining} = collect_messages(MsgIds, UAM), case Txn of none -> - MS = rabbit_mixed_queue:ack(Acked, State #q.mixed_state), %% TODO API + Acks = lists:map(fun ({_Msg, AckTag}) -> AckTag end, MsgWithAcks), + {ok, MS} = rabbit_mixed_queue:ack(Acks, State #q.mixed_state), store_ch_record(C#cr{unacked_messages = Remaining}), noreply(State #q { mixed_state = MS }); _ -> record_pending_acks(Txn, ChPid, MsgIds), noreply(State) - end, + end end; handle_cast({rollback, Txn}, State) -> NewState = rollback_transaction(Txn, State), erase_tx(Txn), - noreply(State2); - -handle_cast({redeliver, Messages}, State) -> - noreply(ok); %% TODO - probably remove - only used by the old persister + noreply(NewState); handle_cast({requeue, MsgIds, ChPid}, State) -> case lookup_ch(ChPid) of @@ -722,9 +710,9 @@ handle_cast({requeue, MsgIds, ChPid}, State) -> [ChPid]), noreply(State); C = #cr{unacked_messages = UAM} -> - {Messages, NewUAM} = collect_messages(MsgIds, UAM), %% TODO Messages must contain AckTags too + {MsgWithAcks, NewUAM} = collect_messages(MsgIds, UAM), store_ch_record(C#cr{unacked_messages = NewUAM}), - noreply(deliver_or_requeue_n(Messages, State)) + noreply(deliver_or_requeue_n(MsgWithAcks, State)) end; handle_cast({unblock, ChPid}, State) -> diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index f90abe3ffb..f207038eff 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -51,7 +51,6 @@ -export([append_file/2, ensure_parent_dirs_exist/1]). -export([format_stderr/2]). -export([start_applications/1, stop_applications/1]). --export([dropwhilefoldl/3]). -import(mnesia). -import(lists). @@ -407,13 +406,3 @@ stop_applications(Apps) -> not_started, cannot_stop_application, Apps). - -dropwhilefoldl(_PredFun, Acc0, []) -> - {Acc0, []}; -dropwhilefoldl(PredFun, Acc0, [E|List]) -> - case PredFun(E, Acc0) of - {true, Acc1} -> - dropwhilefoldl(PredFun, Acc1, List); - {false, Acc1} -> - {Acc1, List} - end. diff --git a/src/rabbit_mixed_queue.erl b/src/rabbit_mixed_queue.erl index e56e667d4f..24d0de8d59 100644 --- a/src/rabbit_mixed_queue.erl +++ b/src/rabbit_mixed_queue.erl @@ -80,8 +80,8 @@ publish_delivered(Msg = #basic_message { guid = MsgId, is_persistent = IsPersist State = #mqstate { mode = Mode, queue = Q }) when Mode =:= disk orelse IsPersistent -> ok = rabbit_disk_queue:publish(Q, MsgId, msg_to_bin(Msg)), - {MsgId, false, Ack, 0} = rabbit_disk_queue:phantom_deliver(Q), - {ok, Ack, State}; + {MsgId, false, AckTag, 0} = rabbit_disk_queue:phantom_deliver(Q), + {ok, AckTag, State}; publish_delivered(#basic_message { is_persistent = false }, State = #mqstate { mode = mixed }) -> {ok, noack, State}. |
