summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2010-01-21 13:38:55 +0000
committerMatthew Sackman <matthew@lshift.net>2010-01-21 13:38:55 +0000
commit0e6a36cab0dd953d61d42616b35128f84d39066f (patch)
treefa0880ee6307c1179b6997e5e62600b6722692f2 /src
parente1ca83601402e403106c0db14b20b98b16bccff7 (diff)
parentc462b3841e055f6e95472c4144649f7ae33dcaa7 (diff)
downloadrabbitmq-server-git-0e6a36cab0dd953d61d42616b35128f84d39066f.tar.gz
merge in from default
Diffstat (limited to 'src')
-rw-r--r--src/rabbit.erl2
-rw-r--r--src/rabbit_amqqueue.erl2
-rw-r--r--src/rabbit_channel.erl42
-rw-r--r--src/rabbit_misc.erl9
4 files changed, 29 insertions, 26 deletions
diff --git a/src/rabbit.erl b/src/rabbit.erl
index a0cc14367f..1e044e0a8d 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -243,7 +243,7 @@ run_boot_step({StepName, Attributes}) ->
end,
case [MFA || {mfa, MFA} <- Attributes] of
[] ->
- io:format("progress -- ~s~n", [Description]);
+ io:format("-- ~s~n", [Description]);
MFAs ->
io:format("starting ~-60s ...", [Description]),
[case catch apply(M,F,A) of
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 36123fbd19..77fed28d84 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -312,7 +312,7 @@ requeue(QPid, MsgIds, ChPid) ->
gen_server2:cast(QPid, {requeue, MsgIds, ChPid}).
ack(QPid, Txn, MsgIds, ChPid) ->
- gen_server2:cast(QPid, {ack, Txn, MsgIds, ChPid}).
+ gen_server2:pcast(QPid, 8, {ack, Txn, MsgIds, ChPid}).
commit_all(QPids, Txn) ->
safe_pmap_ok(
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 6afd0bc9a7..7ca6955326 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -526,24 +526,24 @@ handle_method(#'basic.recover'{requeue = false},
_, State = #ch{ transaction_id = none,
writer_pid = WriterPid,
unacked_message_q = UAMQ }) ->
- lists:foreach(
- fun ({_DeliveryTag, none, _Msg}) ->
- %% Was sent as a basic.get_ok. Don't redeliver
- %% it. FIXME: appropriate?
- ok;
- ({DeliveryTag, ConsumerTag,
- {QName, QPid, MsgId, _Redelivered, Message}}) ->
- %% Was sent as a proper consumer delivery. Resend it as
- %% before.
- %%
- %% FIXME: What should happen if the consumer's been
- %% cancelled since?
- %%
- %% FIXME: should we allocate a fresh DeliveryTag?
- ok = internal_deliver(
+ ok = rabbit_misc:queue_fold(
+ fun ({_DeliveryTag, none, _Msg}, ok) ->
+ %% Was sent as a basic.get_ok. Don't redeliver
+ %% it. FIXME: appropriate?
+ ok;
+ ({DeliveryTag, ConsumerTag,
+ {QName, QPid, MsgId, _Redelivered, Message}}, ok) ->
+ %% Was sent as a proper consumer delivery. Resend
+ %% it as before.
+ %%
+ %% FIXME: What should happen if the consumer's been
+ %% cancelled since?
+ %%
+ %% FIXME: should we allocate a fresh DeliveryTag?
+ internal_deliver(
WriterPid, false, ConsumerTag, DeliveryTag,
{QName, QPid, MsgId, true, Message})
- end, queue:to_list(UAMQ)),
+ end, ok, UAMQ),
%% No answer required, apparently!
{noreply, State};
@@ -872,7 +872,7 @@ rollback_and_notify(State) ->
notify_queues(internal_rollback(State)).
fold_per_queue(F, Acc0, UAQ) ->
- D = lists:foldl(
+ D = rabbit_misc:queue_fold(
fun ({_DTag, _CTag,
{_QName, QPid, MsgId, _Redelivered, _Message}}, D) ->
%% dict:append would be simpler and avoid the
@@ -883,7 +883,7 @@ fold_per_queue(F, Acc0, UAQ) ->
fun (MsgIds) -> [MsgId | MsgIds] end,
[MsgId],
D)
- end, dict:new(), queue:to_list(UAQ)),
+ end, dict:new(), UAQ),
dict:fold(fun (QPid, MsgIds, Acc) -> F(QPid, MsgIds, Acc) end,
Acc0, D).
@@ -912,9 +912,9 @@ consumer_queues(Consumers) ->
notify_limiter(undefined, _Acked) ->
ok;
notify_limiter(LimiterPid, Acked) ->
- case lists:foldl(fun ({_, none, _}, Acc) -> Acc;
- ({_, _, _}, Acc) -> Acc + 1
- end, 0, queue:to_list(Acked)) of
+ case rabbit_misc:queue_fold(fun ({_, none, _}, Acc) -> Acc;
+ ({_, _, _}, Acc) -> Acc + 1
+ end, 0, Acked) of
0 -> ok;
Count -> rabbit_limiter:ack(LimiterPid, Count)
end.
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index 2b5fe4c746..172e27f460 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -97,7 +97,7 @@
-spec(enable_cover/1 :: (string()) -> ok_or_error()).
-spec(report_cover/1 :: (string()) -> 'ok').
-spec(throw_on_error/2 ::
- (atom(), thunk({error, any()} | {ok, A} | A)) -> A).
+ (atom(), thunk({error, any()} | {ok, A} | A)) -> A).
-spec(with_exit_handler/2 :: (thunk(A), thunk(A)) -> A).
-spec(filter_exit_map/2 :: (fun ((A) -> B), [A]) -> [B]).
-spec(with_user/2 :: (username(), thunk(A)) -> A).
@@ -340,6 +340,9 @@ intersperse(Sep, [E|T]) -> [E, Sep | intersperse(Sep, T)].
%% This is a modified version of Luke Gorrie's pmap -
%% http://lukego.livejournal.com/6753.html - that doesn't care about
%% the order in which results are received.
+%%
+%% WARNING: This is is deliberately lightweight rather than robust -- if F
+%% throws, upmap will hang forever, so make sure F doesn't throw!
upmap(F, L) ->
Parent = self(),
Ref = make_ref(),
@@ -428,7 +431,7 @@ append_file(File, _, Suffix) ->
ensure_parent_dirs_exist(Filename) ->
case filelib:ensure_dir(Filename) of
ok -> ok;
- {error, Reason} ->
+ {error, Reason} ->
throw({error, {cannot_create_parent_dirs, Filename, Reason}})
end.
@@ -493,6 +496,6 @@ ceil(N) ->
queue_fold(Fun, Init, Q) ->
case queue:out(Q) of
- {empty, _Q} -> Init;
+ {empty, _Q} -> Init;
{{value, V}, Q1} -> queue_fold(Fun, Fun(V, Init), Q1)
end.