diff options
| author | Matthias Radestock <matthias@lshift.net> | 2010-01-19 21:45:38 +0000 |
|---|---|---|
| committer | Matthias Radestock <matthias@lshift.net> | 2010-01-19 21:45:38 +0000 |
| commit | 2c5d3fee352f71c34db6da79cbf3e696447f776f (patch) | |
| tree | d0ab27671b1e55929f5297762a23c63a37a0eeae /src | |
| parent | a9433b4bb71c81c579e6b3222603bcea529917f4 (diff) | |
| parent | 85b9f37c52e7d5b79cc722c03f423600089777c8 (diff) | |
| download | rabbitmq-server-git-2c5d3fee352f71c34db6da79cbf3e696447f776f.tar.gz | |
merge v1_7 into default
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_amqqueue.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_channel.erl | 42 | ||||
| -rw-r--r-- | src/rabbit_misc.erl | 9 |
3 files changed, 30 insertions, 23 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index d728ef6a76..515dbf6823 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -266,7 +266,7 @@ requeue(QPid, MsgIds, ChPid) -> gen_server2:cast(QPid, {requeue, MsgIds, ChPid}). ack(QPid, Txn, MsgIds, ChPid) -> - gen_server2:cast(QPid, {ack, Txn, MsgIds, ChPid}). + gen_server2:pcast(QPid, 8, {ack, Txn, MsgIds, ChPid}). commit_all(QPids, Txn) -> safe_pmap_ok( diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index c20cb16ca1..7e195d2fcb 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -526,24 +526,24 @@ handle_method(#'basic.recover'{requeue = false}, _, State = #ch{ transaction_id = none, writer_pid = WriterPid, unacked_message_q = UAMQ }) -> - lists:foreach( - fun ({_DeliveryTag, none, _Msg}) -> - %% Was sent as a basic.get_ok. Don't redeliver - %% it. FIXME: appropriate? - ok; - ({DeliveryTag, ConsumerTag, - {QName, QPid, MsgId, _Redelivered, Message}}) -> - %% Was sent as a proper consumer delivery. Resend it as - %% before. - %% - %% FIXME: What should happen if the consumer's been - %% cancelled since? - %% - %% FIXME: should we allocate a fresh DeliveryTag? - ok = internal_deliver( + ok = rabbit_misc:queue_fold( + fun ({_DeliveryTag, none, _Msg}, ok) -> + %% Was sent as a basic.get_ok. Don't redeliver + %% it. FIXME: appropriate? + ok; + ({DeliveryTag, ConsumerTag, + {QName, QPid, MsgId, _Redelivered, Message}}, ok) -> + %% Was sent as a proper consumer delivery. Resend + %% it as before. + %% + %% FIXME: What should happen if the consumer's been + %% cancelled since? + %% + %% FIXME: should we allocate a fresh DeliveryTag? + internal_deliver( WriterPid, false, ConsumerTag, DeliveryTag, {QName, QPid, MsgId, true, Message}) - end, queue:to_list(UAMQ)), + end, ok, UAMQ), %% No answer required, apparently! {noreply, State}; @@ -872,7 +872,7 @@ rollback_and_notify(State) -> notify_queues(internal_rollback(State)). fold_per_queue(F, Acc0, UAQ) -> - D = lists:foldl( + D = rabbit_misc:queue_fold( fun ({_DTag, _CTag, {_QName, QPid, MsgId, _Redelivered, _Message}}, D) -> %% dict:append would be simpler and avoid the @@ -883,7 +883,7 @@ fold_per_queue(F, Acc0, UAQ) -> fun (MsgIds) -> [MsgId | MsgIds] end, [MsgId], D) - end, dict:new(), queue:to_list(UAQ)), + end, dict:new(), UAQ), dict:fold(fun (QPid, MsgIds, Acc) -> F(QPid, MsgIds, Acc) end, Acc0, D). @@ -912,9 +912,9 @@ consumer_queues(Consumers) -> notify_limiter(undefined, _Acked) -> ok; notify_limiter(LimiterPid, Acked) -> - case lists:foldl(fun ({_, none, _}, Acc) -> Acc; - ({_, _, _}, Acc) -> Acc + 1 - end, 0, queue:to_list(Acked)) of + case rabbit_misc:queue_fold(fun ({_, none, _}, Acc) -> Acc; + ({_, _, _}, Acc) -> Acc + 1 + end, 0, Acked) of 0 -> ok; Count -> rabbit_limiter:ack(LimiterPid, Count) end. diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index d927bfb1f9..0866da3fb0 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -55,7 +55,7 @@ -export([append_file/2, ensure_parent_dirs_exist/1]). -export([format_stderr/2]). -export([start_applications/1, stop_applications/1]). --export([unfold/2, ceil/1]). +-export([unfold/2, ceil/1, queue_fold/3]). -import(mnesia). -import(lists). @@ -126,6 +126,7 @@ -spec(stop_applications/1 :: ([atom()]) -> 'ok'). -spec(unfold/2 :: (fun ((A) -> ({'true', B, A} | 'false')), A) -> {[B], A}). -spec(ceil/1 :: (number()) -> number()). +-spec(queue_fold/3 :: (fun ((any(), B) -> B), B, queue()) -> B). -endif. @@ -492,3 +493,9 @@ ceil(N) -> 0 -> N; _ -> 1 + T end. + +queue_fold(Fun, Init, Q) -> + case queue:out(Q) of + {empty, _Q} -> Init; + {{value, V}, Q1} -> queue_fold(Fun, Fun(V, Init), Q1) + end. |
