diff options
| -rw-r--r-- | src/rabbit_priority_queue.erl | 552 |
1 files changed, 552 insertions, 0 deletions
diff --git a/src/rabbit_priority_queue.erl b/src/rabbit_priority_queue.erl new file mode 100644 index 0000000000..21829cee10 --- /dev/null +++ b/src/rabbit_priority_queue.erl @@ -0,0 +1,552 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is GoPivotal, Inc. +%% Copyright (c) 2014 GoPivotal, Inc. All rights reserved. +%% + +-module(rabbit_priority_queue). + +-include_lib("rabbit_common/include/rabbit.hrl"). +-include_lib("rabbit_common/include/rabbit_framing.hrl"). +-behaviour(rabbit_backing_queue). + +-rabbit_boot_step({?MODULE, + [{description, "enable priority queue"}, + {mfa, {?MODULE, enable, []}}, + {requires, pre_boot}, + {enables, kernel_ready}]}). + +-export([enable/0]). + +-export([start/1, stop/0]). + +-export([init/3, terminate/2, delete_and_terminate/2, delete_crashed/1, + purge/1, purge_acks/1, + publish/5, publish_delivered/4, discard/3, drain_confirmed/1, + dropwhile/2, fetchwhile/4, fetch/2, drop/2, ack/2, requeue/2, + ackfold/4, fold/3, len/1, is_empty/1, depth/1, + set_ram_duration_target/2, ram_duration/1, needs_timeout/1, timeout/1, + handle_pre_hibernate/1, resume/1, msg_rates/1, + info/2, invoke/3, is_duplicate/2]). + +-record(state, {bq, bqss}). +-record(passthrough, {bq, bqs}). + +%% See 'note on suffixes' below +-define(passthrough1(F), State#passthrough{bqs = BQ:F}). +-define(passthrough2(F), + {Res, BQS1} = BQ:F, {Res, State#passthrough{bqs = BQS1}}). +-define(passthrough3(F), + {Res1, Res2, BQS1} = BQ:F, {Res1, Res2, State#passthrough{bqs = BQS1}}). + +enable() -> + {ok, RealBQ} = application:get_env(rabbit, backing_queue_module), + case RealBQ of + ?MODULE -> ok; + _ -> rabbit_log:info("Priority queues enabled, real BQ is ~s~n", + [RealBQ]), + application:set_env( + rabbitmq_priority_queue, backing_queue_module, RealBQ), + application:set_env(rabbit, backing_queue_module, ?MODULE) + end. + +%%---------------------------------------------------------------------------- + +start(QNames) -> + BQ = bq(), + %% TODO this expand-collapse dance is a bit ridiculous but it's what + %% rabbit_amqqueue:recover/0 expects. We could probably simplify + %% this if we rejigged recovery a bit. + {DupNames, ExpNames} = expand_queues(QNames), + case BQ:start(ExpNames) of + {ok, ExpRecovery} -> + {ok, collapse_recovery(QNames, DupNames, ExpRecovery)}; + Else -> + Else + end. + +stop() -> + BQ = bq(), + BQ:stop(). + +%%---------------------------------------------------------------------------- + +mutate_name(P, Q = #amqqueue{name = QName = #resource{name = QNameBin}}) -> + Q#amqqueue{name = QName#resource{name = mutate_name_bin(P, QNameBin)}}. + +mutate_name_bin(P, NameBin) -> <<NameBin/binary, 0, P:8>>. + +expand_queues(QNames) -> + lists:unzip( + lists:append([expand_queue(QName) || QName <- QNames])). + +expand_queue(QName = #resource{name = QNameBin}) -> + {ok, Q} = rabbit_misc:dirty_read({rabbit_durable_queue, QName}), + case priorities(Q) of + none -> [{QName, QName}]; + Ps -> [{QName, QName#resource{name = mutate_name_bin(P, QNameBin)}} + || P <- Ps] + end. + +collapse_recovery(QNames, DupNames, Recovery) -> + NameToTerms = lists:foldl(fun({Name, RecTerm}, Dict) -> + dict:append(Name, RecTerm, Dict) + end, dict:new(), lists:zip(DupNames, Recovery)), + [dict:fetch(Name, NameToTerms) || Name <- QNames]. + +priorities(#amqqueue{arguments = Args}) -> + Ints = [long, short, signedint, byte], + case rabbit_misc:table_lookup(Args, <<"x-max-priority">>) of + {Type, Max} -> case lists:member(Type, Ints) of + false -> none; + true -> lists:reverse(lists:seq(0, Max)) + end; + _ -> none + end. + +%%---------------------------------------------------------------------------- + +init(Q, Recover, AsyncCallback) -> + BQ = bq(), + case priorities(Q) of + none -> RealRecover = case Recover of + [R] -> R; %% [0] + R -> R + end, + #passthrough{bq = BQ, + bqs = BQ:init(Q, RealRecover, AsyncCallback)}; + Ps -> Init = fun (P, Term) -> + BQ:init( + mutate_name(P, Q), Term, + fun (M, F) -> AsyncCallback(M, {P, F}) end) + end, + BQSs = case have_recovery_terms(Recover) of + false -> [{P, Init(P, Recover)} || P <- Ps]; + _ -> PsTerms = lists:zip(Ps, Recover), + [{P, Init(P, Term)} || {P, Term} <- PsTerms] + end, + #state{bq = BQ, + bqss = BQSs} + end. +%% [0] collapse_recovery has the effect of making a list of recovery +%% terms in priority order, even for non priority queues. It's easier +%% to do that and "unwrap" in init/3 than to have collapse_recovery be +%% aware of non-priority queues. + +have_recovery_terms(new) -> false; +have_recovery_terms(non_clean_shutdown) -> false; +have_recovery_terms(_) -> true. + +terminate(Reason, State = #state{bq = BQ}) -> + foreach1(fun (_P, BQSN) -> BQ:terminate(Reason, BQSN) end, State); +terminate(Reason, State = #passthrough{bq = BQ, bqs = BQS}) -> + ?passthrough1(terminate(Reason, BQS)). + +delete_and_terminate(Reason, State = #state{bq = BQ}) -> + foreach1(fun (_P, BQSN) -> + BQ:delete_and_terminate(Reason, BQSN) + end, State); +delete_and_terminate(Reason, State = #passthrough{bq = BQ, bqs = BQS}) -> + ?passthrough1(delete_and_terminate(Reason, BQS)). + +delete_crashed(Q = #amqqueue{name = QName}) -> + BQ = bq(), + case priorities(Q) of + none -> BQ:delete_crashed(Q); + Ps -> [BQ:delete_crashed(mutate_name(P, Q)) || P <- Ps] + end. + +purge(State = #state{bq = BQ}) -> + fold_add2(fun (_P, BQSN) -> BQ:purge(BQSN) end, State); +purge(State = #passthrough{bq = BQ, bqs = BQS}) -> + ?passthrough2(purge(BQS)). + +purge_acks(State = #state{bq = BQ}) -> + foreach1(fun (_P, BQSN) -> BQ:purge_acks(BQSN) end, State); +purge_acks(State = #passthrough{bq = BQ, bqs = BQS}) -> + ?passthrough1(purge_acks(BQS)). + +publish(Msg, MsgProps, IsDelivered, ChPid, State = #state{bq = BQ}) -> + pick1(fun (_P, BQSN) -> + BQ:publish(Msg, MsgProps, IsDelivered, ChPid, BQSN) + end, Msg, State); +publish(Msg, MsgProps, IsDelivered, ChPid, + State = #passthrough{bq = BQ, bqs = BQS}) -> + ?passthrough1(publish(Msg, MsgProps, IsDelivered, ChPid, BQS)). + +publish_delivered(Msg, MsgProps, ChPid, State = #state{bq = BQ}) -> + pick2(fun (P, BQSN) -> + {AckTag, BQSN1} = BQ:publish_delivered( + Msg, MsgProps, ChPid, BQSN), + {{P, AckTag}, BQSN1} + end, Msg, State); +publish_delivered(Msg, MsgProps, ChPid, + State = #passthrough{bq = BQ, bqs = BQS}) -> + ?passthrough2(publish_delivered(Msg, MsgProps, ChPid, BQS)). + +%% TODO this is a hack. The BQ api does not give us enough information +%% here - if we had the Msg we could look at its priority and forward +%% to the appropriate sub-BQ. But we don't so we are stuck. +%% +%% But fortunately VQ ignores discard/3, so we can too, *assuming we +%% are talking to VQ*. discard/3 is used by HA, but that's "above" us +%% (if in use) so we don't break that either, just some hypothetical +%% alternate BQ implementation. +discard(_MsgId, _ChPid, State = #state{}) -> + State; + %% We should have something a bit like this here: + %% pick1(fun (_P, BQSN) -> + %% BQ:discard(MsgId, ChPid, BQSN) + %% end, Msg, State); +discard(MsgId, ChPid, State = #passthrough{bq = BQ, bqs = BQS}) -> + ?passthrough1(discard(MsgId, ChPid, BQS)). + +drain_confirmed(State = #state{bq = BQ}) -> + fold_append2(fun (_P, BQSN) -> BQ:drain_confirmed(BQSN) end, State); +drain_confirmed(State = #passthrough{bq = BQ, bqs = BQS}) -> + ?passthrough2(drain_confirmed(BQS)). + +dropwhile(Pred, State = #state{bq = BQ}) -> + find2(fun (_P, BQSN) -> BQ:dropwhile(Pred, BQSN) end, undefined, State); +dropwhile(Pred, State = #passthrough{bq = BQ, bqs = BQS}) -> + ?passthrough2(dropwhile(Pred, BQS)). + +%% TODO this is a bit nasty. In the one place where fetchwhile/4 is +%% actually used the accumulator is a list of acktags, which of course +%% we need to mutate - so we do that although we are encoding an +%% assumption here. +fetchwhile(Pred, Fun, Acc, State = #state{bq = BQ}) -> + findfold3( + fun (P, BQSN, AccN) -> + {Res, AccN1, BQSN1} = BQ:fetchwhile(Pred, Fun, AccN, BQSN), + {Res, priority_on_acktags(P, AccN1), BQSN1} + end, Acc, undefined, State); +fetchwhile(Pred, Fun, Acc, State = #passthrough{bq = BQ, bqs = BQS}) -> + ?passthrough3(fetchwhile(Pred, Fun, Acc, BQS)). + +fetch(AckRequired, State = #state{bq = BQ}) -> + find2( + fun (P, BQSN) -> + case BQ:fetch(AckRequired, BQSN) of + {empty, BQSN1} -> {empty, BQSN1}; + {{Msg, Del, ATag}, BQSN1} -> {{Msg, Del, {P, ATag}}, BQSN1} + end + end, empty, State); +fetch(AckRequired, State = #passthrough{bq = BQ, bqs = BQS}) -> + ?passthrough2(fetch(AckRequired, BQS)). + +drop(AckRequired, State = #state{bq = BQ}) -> + find2(fun (P, BQSN) -> + case BQ:drop(AckRequired, BQSN) of + {empty, BQSN1} -> {empty, BQSN1}; + {{MsgId, AckTag}, BQSN1} -> {{MsgId, {P, AckTag}}, BQSN1} + end + end, empty, State); +drop(AckRequired, State = #passthrough{bq = BQ, bqs = BQS}) -> + ?passthrough2(drop(AckRequired, BQS)). + +ack(AckTags, State = #state{bq = BQ}) -> + fold_by_acktags2(fun (AckTagsN, BQSN) -> + BQ:ack(AckTagsN, BQSN) + end, AckTags, State); +ack(AckTags, State = #passthrough{bq = BQ, bqs = BQS}) -> + ?passthrough2(ack(AckTags, BQS)). + +requeue(AckTags, State = #state{bq = BQ}) -> + fold_by_acktags2(fun (AckTagsN, BQSN) -> + BQ:requeue(AckTagsN, BQSN) + end, AckTags, State); +requeue(AckTags, State = #passthrough{bq = BQ, bqs = BQS}) -> + ?passthrough2(requeue(AckTags, BQS)). + +%% Similar problem to fetchwhile/4 +ackfold(MsgFun, Acc, State = #state{bq = BQ}, AckTags) -> + AckTagsByPriority = partition_acktags(AckTags), + fold2( + fun (P, BQSN, AccN) -> + case orddict:find(P, AckTagsByPriority) of + {ok, ATagsN} -> {AccN1, BQSN1} = + BQ:ackfold(MsgFun, AccN, BQSN, ATagsN), + {priority_on_acktags(P, AccN1), BQSN1}; + error -> {AccN, BQSN} + end + end, Acc, State); +ackfold(MsgFun, Acc, State = #passthrough{bq = BQ, bqs = BQS}, AckTags) -> + ?passthrough2(ackfold(MsgFun, Acc, BQS, AckTags)). + +fold(Fun, Acc, State = #state{bq = BQ}) -> + fold2(fun (_P, BQSN, AccN) -> BQ:fold(Fun, AccN, BQSN) end, Acc, State); +fold(Fun, Acc, State = #passthrough{bq = BQ, bqs = BQS}) -> + ?passthrough2(fold(Fun, Acc, BQS)). + +len(#state{bq = BQ, bqss = BQSs}) -> + add0(fun (_P, BQSN) -> BQ:len(BQSN) end, BQSs); +len(#passthrough{bq = BQ, bqs = BQS}) -> + BQ:len(BQS). + +is_empty(#state{bq = BQ, bqss = BQSs}) -> + all0(fun (_P, BQSN) -> BQ:is_empty(BQSN) end, BQSs); +is_empty(#passthrough{bq = BQ, bqs = BQS}) -> + BQ:is_empty(BQS). + +depth(#state{bq = BQ, bqss = BQSs}) -> + add0(fun (_P, BQSN) -> BQ:depth(BQSN) end, BQSs); +depth(#passthrough{bq = BQ, bqs = BQS}) -> + BQ:depth(BQS). + +set_ram_duration_target(DurationTarget, State = #state{bq = BQ}) -> + foreach1(fun (_P, BQSN) -> + BQ:set_ram_duration_target(DurationTarget, BQSN) + end, State); +set_ram_duration_target(DurationTarget, + State = #passthrough{bq = BQ, bqs = BQS}) -> + ?passthrough1(set_ram_duration_target(DurationTarget, BQS)). + +ram_duration(State = #state{bq = BQ}) -> + fold_add2(fun (_P, BQSN) -> BQ:ram_duration(BQSN) end, State); +ram_duration(State = #passthrough{bq = BQ, bqs = BQS}) -> + ?passthrough2(ram_duration(BQS)). + +needs_timeout(#state{bq = BQ, bqss = BQSs}) -> + fold0(fun (_P, _BQSN, timed) -> timed; + (_P, BQSN, idle) -> case BQ:needs_timeout(BQSN) of + timed -> timed; + _ -> idle + end; + (_P, BQSN, false) -> BQ:needs_timeout(BQSN) + end, false, BQSs); +needs_timeout(#passthrough{bq = BQ, bqs = BQS}) -> + BQ:needs_timeout(BQS). + +timeout(State = #state{bq = BQ}) -> + foreach1(fun (_P, BQSN) -> BQ:timeout(BQSN) end, State); +timeout(State = #passthrough{bq = BQ, bqs = BQS}) -> + ?passthrough1(timeout(BQS)). + +handle_pre_hibernate(State = #state{bq = BQ}) -> + foreach1(fun (_P, BQSN) -> + BQ:handle_pre_hibernate(BQSN) + end, State); +handle_pre_hibernate(State = #passthrough{bq = BQ, bqs = BQS}) -> + ?passthrough1(handle_pre_hibernate(BQS)). + +resume(State = #state{bq = BQ}) -> + foreach1(fun (_P, BQSN) -> BQ:resume(BQSN) end, State); +resume(State = #passthrough{bq = BQ, bqs = BQS}) -> + ?passthrough1(resume(BQS)). + +msg_rates(#state{bq = BQ, bqss = BQSs}) -> + fold0(fun(_P, BQSN, {InN, OutN}) -> + {In, Out} = BQ:msg_rates(BQSN), + {InN + In, OutN + Out} + end, {0.0, 0.0}, BQSs); +msg_rates(#passthrough{bq = BQ, bqs = BQS}) -> + BQ:msg_rates(BQS). + +info(backing_queue_status, #state{bq = BQ, bqss = BQSs}) -> + fold0(fun (P, BQSN, Acc) -> + combine_status(P, BQ:info(backing_queue_status, BQSN), Acc) + end, nothing, BQSs); +info(Item, #state{bq = BQ, bqss = BQSs}) -> + fold0(fun (_P, BQSN, Acc) -> + Acc + BQ:info(Item, BQSN) + end, 0, BQSs); +info(Item, #passthrough{bq = BQ, bqs = BQS}) -> + BQ:info(Item, BQS). + +invoke(Mod, {P, Fun}, State = #state{bq = BQ}) -> + pick1(fun (_P, BQSN) -> BQ:invoke(Mod, Fun, BQSN) end, P, State); +invoke(Mod, Fun, State = #passthrough{bq = BQ, bqs = BQS}) -> + ?passthrough1(invoke(Mod, Fun, BQS)). + +is_duplicate(Msg, State = #state{bq = BQ}) -> + pick2(fun (_P, BQSN) -> BQ:is_duplicate(Msg, BQSN) end, Msg, State); +is_duplicate(Msg, State = #passthrough{bq = BQ, bqs = BQS}) -> + ?passthrough2(is_duplicate(Msg, BQS)). + +%%---------------------------------------------------------------------------- + +bq() -> + {ok, RealBQ} = application:get_env( + rabbitmq_priority_queue, backing_queue_module), + RealBQ. + +%% Note on suffixes: Many utility functions here have suffixes telling +%% you the arity of the return type of the BQ function they are +%% designed to work with. +%% +%% 0 - BQ function returns a value and does not modify state +%% 1 - BQ function just returns a new state +%% 2 - BQ function returns a 2-tuple of {Result, NewState} +%% 3 - BQ function returns a 3-tuple of {Result1, Result2, NewState} + +%% Fold over results +fold0(Fun, Acc, [{P, BQSN} | Rest]) -> fold0(Fun, Fun(P, BQSN, Acc), Rest); +fold0(_Fun, Acc, []) -> Acc. + +%% Do all BQs match? +all0(Pred, BQSs) -> fold0(fun (_P, _BQSN, false) -> false; + (P, BQSN, true) -> Pred(P, BQSN) + end, true, BQSs). + +%% Sum results +add0(Fun, BQSs) -> fold0(fun (P, BQSN, Acc) -> Acc + Fun(P, BQSN) end, 0, BQSs). + +%% Apply for all states +foreach1(Fun, State = #state{bqss = BQSs}) -> + a(State#state{bqss = foreach1(Fun, BQSs, [])}). +foreach1(Fun, [{P, BQSN} | Rest], BQSAcc) -> + BQSN1 = Fun(P, BQSN), + foreach1(Fun, Rest, [{P, BQSN1} | BQSAcc]); +foreach1(_Fun, [], BQSAcc) -> + lists:reverse(BQSAcc). + +%% For a given thing, just go to its BQ +pick1(Fun, Prioritisable, #state{bqss = BQSs} = State) -> + {P, BQSN} = priority(Prioritisable, BQSs), + a(State#state{bqss = bq_store(P, Fun(P, BQSN), BQSs)}). + +%% Fold over results +fold2(Fun, Acc, State = #state{bqss = BQSs}) -> + {Res, BQSs1} = fold2(Fun, Acc, BQSs, []), + {Res, a(State#state{bqss = BQSs1})}. +fold2(Fun, Acc, [{P, BQSN} | Rest], BQSAcc) -> + {Acc1, BQSN1} = Fun(P, BQSN, Acc), + fold2(Fun, Acc1, Rest, [{P, BQSN1} | BQSAcc]); +fold2(_Fun, Acc, [], BQSAcc) -> + {Acc, lists:reverse(BQSAcc)}. + +%% Fold over results assuming results are lists and we want to append them +fold_append2(Fun, State) -> + fold2(fun (P, BQSN, Acc) -> + {Res, BQSN1} = Fun(P, BQSN), + {Res ++ Acc, BQSN1} + end, [], State). + +%% Fold over results assuming results are numbers and we want to sum them +fold_add2(Fun, State) -> + fold2(fun (P, BQSN, Acc) -> + {Res, BQSN1} = Fun(P, BQSN), + {add_maybe_infinity(Res, Acc), BQSN1} + end, 0, State). + +%% Fold over results assuming results are lists and we want to append +%% them, and also that we have some AckTags we want to pass in to each +%% invocation. +fold_by_acktags2(Fun, AckTags, State) -> + AckTagsByPriority = partition_acktags(AckTags), + fold_append2(fun (P, BQSN) -> + case orddict:find(P, AckTagsByPriority) of + {ok, AckTagsN} -> Fun(AckTagsN, BQSN); + error -> {[], BQSN} + end + end, State). + +%% For a given thing, just go to its BQ +pick2(Fun, Prioritisable, #state{bqss = BQSs} = State) -> + {P, BQSN} = priority(Prioritisable, BQSs), + {Res, BQSN1} = Fun(P, BQSN), + {Res, a(State#state{bqss = bq_store(P, BQSN1, BQSs)})}. + +%% Run through BQs in priority order until one does not return +%% {NotFound, NewState} or we have gone through them all. +find2(Fun, NotFound, State = #state{bqss = BQSs}) -> + {Res, BQSs1} = find2(Fun, NotFound, BQSs, []), + {Res, a(State#state{bqss = BQSs1})}. +find2(Fun, NotFound, [{P, BQSN} | Rest], BQSAcc) -> + case Fun(P, BQSN) of + {NotFound, BQSN1} -> find2(Fun, NotFound, Rest, [{P, BQSN1} | BQSAcc]); + {Res, BQSN1} -> {Res, lists:reverse([{P, BQSN1} | BQSAcc]) ++ Rest} + end; +find2(_Fun, NotFound, [], BQSAcc) -> + {NotFound, lists:reverse(BQSAcc)}. + +%% Run through BQs in priority order like find2 but also folding as we go. +findfold3(Fun, Acc, NotFound, State = #state{bqss = BQSs}) -> + {Res, Acc1, BQSs1} = findfold3(Fun, Acc, NotFound, BQSs, []), + {Res, Acc1, a(State#state{bqss = BQSs1})}. +findfold3(Fun, Acc, NotFound, [{P, BQSN} | Rest], BQSAcc) -> + case Fun(P, BQSN, Acc) of + {NotFound, Acc1, BQSN1} -> + findfold3(Fun, Acc1, NotFound, Rest, [{P, BQSN1} | BQSAcc]); + {Res, Acc1, BQSN1} -> + {Res, Acc1, lists:reverse([{P, BQSN1} | BQSAcc]) ++ Rest} + end; +findfold3(_Fun, Acc, NotFound, [], BQSAcc) -> + {NotFound, Acc, lists:reverse(BQSAcc)}. + +bq_fetch(P, []) -> exit({not_found, P}); +bq_fetch(P, [{P, BQSN} | _]) -> BQSN; +bq_fetch(P, [{_, _BQSN} | T]) -> bq_fetch(P, T). + +bq_store(P, BQS, BQSs) -> + [{PN, case PN of + P -> BQS; + _ -> BQSN + end} || {PN, BQSN} <- BQSs]. + +a(State = #state{bqss = BQSs}) -> + Ps = [P || {P, _} <- BQSs], + case lists:reverse(lists:usort(Ps)) of + Ps -> State; + _ -> exit({bad_order, Ps}) + end. + +%%---------------------------------------------------------------------------- + +priority(P, BQSs) when is_integer(P) -> + {P, bq_fetch(P, BQSs)}; +priority(_Msg, [{P, BQSN}]) -> + {P, BQSN}; +priority(Msg = #basic_message{content = #content{properties = Props}}, + [{P, BQSN} | Rest]) -> + #'P_basic'{priority = Priority0} = Props, + Priority = case Priority0 of + undefined -> 0; + _ when is_integer(Priority0) -> Priority0 + end, + case Priority >= P of + true -> {P, BQSN}; + false -> priority(Msg, Rest) + end. + +add_maybe_infinity(infinity, _) -> infinity; +add_maybe_infinity(_, infinity) -> infinity; +add_maybe_infinity(A, B) -> A + B. + +partition_acktags(AckTags) -> partition_acktags(AckTags, orddict:new()). + +partition_acktags([], Partitioned) -> + Partitioned; +partition_acktags([{P, AckTag} | Rest], Partitioned) -> + partition_acktags(Rest, orddict:append(P, AckTag, Partitioned)). + +priority_on_acktags(P, AckTags) -> + [case Tag of + _ when is_integer(Tag) -> {P, Tag}; + _ -> Tag + end || Tag <- AckTags]. + +combine_status(P, New, nothing) -> + [{priority_lengths, [{P, proplists:get_value(len, New)}]} | New]; +combine_status(P, New, Old) -> + Combined = [{K, cse(V, proplists:get_value(K, Old))} || {K, V} <- New], + Lens = [{P, proplists:get_value(len, New)} | + proplists:get_value(priority_lengths, Old)], + [{priority_lengths, Lens} | Combined]. + +cse(infinity, _) -> infinity; +cse(_, infinity) -> infinity; +cse(A, B) when is_number(A) -> A + B; +cse({delta, _, _, _}, _) -> {delta, todo, todo, todo}; +cse(A, B) -> exit({A, B}). |
