diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_amqqueue.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_channel.erl | 42 | ||||
| -rw-r--r-- | src/rabbit_misc.erl | 9 |
4 files changed, 29 insertions, 26 deletions
diff --git a/src/rabbit.erl b/src/rabbit.erl index a0cc14367f..1e044e0a8d 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -243,7 +243,7 @@ run_boot_step({StepName, Attributes}) -> end, case [MFA || {mfa, MFA} <- Attributes] of [] -> - io:format("progress -- ~s~n", [Description]); + io:format("-- ~s~n", [Description]); MFAs -> io:format("starting ~-60s ...", [Description]), [case catch apply(M,F,A) of diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 36123fbd19..77fed28d84 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -312,7 +312,7 @@ requeue(QPid, MsgIds, ChPid) -> gen_server2:cast(QPid, {requeue, MsgIds, ChPid}). ack(QPid, Txn, MsgIds, ChPid) -> - gen_server2:cast(QPid, {ack, Txn, MsgIds, ChPid}). + gen_server2:pcast(QPid, 8, {ack, Txn, MsgIds, ChPid}). commit_all(QPids, Txn) -> safe_pmap_ok( diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 6afd0bc9a7..7ca6955326 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -526,24 +526,24 @@ handle_method(#'basic.recover'{requeue = false}, _, State = #ch{ transaction_id = none, writer_pid = WriterPid, unacked_message_q = UAMQ }) -> - lists:foreach( - fun ({_DeliveryTag, none, _Msg}) -> - %% Was sent as a basic.get_ok. Don't redeliver - %% it. FIXME: appropriate? - ok; - ({DeliveryTag, ConsumerTag, - {QName, QPid, MsgId, _Redelivered, Message}}) -> - %% Was sent as a proper consumer delivery. Resend it as - %% before. - %% - %% FIXME: What should happen if the consumer's been - %% cancelled since? - %% - %% FIXME: should we allocate a fresh DeliveryTag? - ok = internal_deliver( + ok = rabbit_misc:queue_fold( + fun ({_DeliveryTag, none, _Msg}, ok) -> + %% Was sent as a basic.get_ok. Don't redeliver + %% it. FIXME: appropriate? + ok; + ({DeliveryTag, ConsumerTag, + {QName, QPid, MsgId, _Redelivered, Message}}, ok) -> + %% Was sent as a proper consumer delivery. Resend + %% it as before. + %% + %% FIXME: What should happen if the consumer's been + %% cancelled since? + %% + %% FIXME: should we allocate a fresh DeliveryTag? + internal_deliver( WriterPid, false, ConsumerTag, DeliveryTag, {QName, QPid, MsgId, true, Message}) - end, queue:to_list(UAMQ)), + end, ok, UAMQ), %% No answer required, apparently! {noreply, State}; @@ -872,7 +872,7 @@ rollback_and_notify(State) -> notify_queues(internal_rollback(State)). fold_per_queue(F, Acc0, UAQ) -> - D = lists:foldl( + D = rabbit_misc:queue_fold( fun ({_DTag, _CTag, {_QName, QPid, MsgId, _Redelivered, _Message}}, D) -> %% dict:append would be simpler and avoid the @@ -883,7 +883,7 @@ fold_per_queue(F, Acc0, UAQ) -> fun (MsgIds) -> [MsgId | MsgIds] end, [MsgId], D) - end, dict:new(), queue:to_list(UAQ)), + end, dict:new(), UAQ), dict:fold(fun (QPid, MsgIds, Acc) -> F(QPid, MsgIds, Acc) end, Acc0, D). @@ -912,9 +912,9 @@ consumer_queues(Consumers) -> notify_limiter(undefined, _Acked) -> ok; notify_limiter(LimiterPid, Acked) -> - case lists:foldl(fun ({_, none, _}, Acc) -> Acc; - ({_, _, _}, Acc) -> Acc + 1 - end, 0, queue:to_list(Acked)) of + case rabbit_misc:queue_fold(fun ({_, none, _}, Acc) -> Acc; + ({_, _, _}, Acc) -> Acc + 1 + end, 0, Acked) of 0 -> ok; Count -> rabbit_limiter:ack(LimiterPid, Count) end. diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index 2b5fe4c746..172e27f460 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -97,7 +97,7 @@ -spec(enable_cover/1 :: (string()) -> ok_or_error()). -spec(report_cover/1 :: (string()) -> 'ok'). -spec(throw_on_error/2 :: - (atom(), thunk({error, any()} | {ok, A} | A)) -> A). + (atom(), thunk({error, any()} | {ok, A} | A)) -> A). -spec(with_exit_handler/2 :: (thunk(A), thunk(A)) -> A). -spec(filter_exit_map/2 :: (fun ((A) -> B), [A]) -> [B]). -spec(with_user/2 :: (username(), thunk(A)) -> A). @@ -340,6 +340,9 @@ intersperse(Sep, [E|T]) -> [E, Sep | intersperse(Sep, T)]. %% This is a modified version of Luke Gorrie's pmap - %% http://lukego.livejournal.com/6753.html - that doesn't care about %% the order in which results are received. +%% +%% WARNING: This is is deliberately lightweight rather than robust -- if F +%% throws, upmap will hang forever, so make sure F doesn't throw! upmap(F, L) -> Parent = self(), Ref = make_ref(), @@ -428,7 +431,7 @@ append_file(File, _, Suffix) -> ensure_parent_dirs_exist(Filename) -> case filelib:ensure_dir(Filename) of ok -> ok; - {error, Reason} -> + {error, Reason} -> throw({error, {cannot_create_parent_dirs, Filename, Reason}}) end. @@ -493,6 +496,6 @@ ceil(N) -> queue_fold(Fun, Init, Q) -> case queue:out(Q) of - {empty, _Q} -> Init; + {empty, _Q} -> Init; {{value, V}, Q1} -> queue_fold(Fun, Fun(V, Init), Q1) end. |
