summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorAlvaro Videla <videlalvaro@gmail.com>2015-10-20 12:03:04 +0200
committerAlvaro Videla <videlalvaro@gmail.com>2015-10-20 12:03:04 +0200
commit935f14921a30b495d01a067dd042cfccfcdb51bd (patch)
tree8a38c841961b8094f301e8e2b450b3d72d41a069 /src
parent5a110fd1cb88ccd59cbe334bbde48aa5d0f9705a (diff)
parent88328fe18ee5ba1cb8a01797050cc1443669be5d (diff)
downloadrabbitmq-server-git-935f14921a30b495d01a067dd042cfccfcdb51bd.tar.gz
Merge branch 'master' into rabbitmq-server-351
Diffstat (limited to 'src')
-rw-r--r--src/rabbit.erl3
-rw-r--r--src/rabbit_mirror_queue_misc.erl6
-rw-r--r--src/rabbit_mirror_queue_sync.erl39
-rw-r--r--src/rabbit_nodes.erl6
-rw-r--r--src/rabbit_priority_queue.erl21
-rw-r--r--src/rabbit_queue_decorator.erl2
6 files changed, 61 insertions, 16 deletions
diff --git a/src/rabbit.erl b/src/rabbit.erl
index d11b8d95a5..7c85685276 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -459,7 +459,8 @@ status() ->
{uptime, begin
{T,_} = erlang:statistics(wall_clock),
T div 1000
- end}],
+ end},
+ {kernel, {net_ticktime, net_kernel:get_net_ticktime()}}],
S1 ++ S2 ++ S3 ++ S4.
alarms() ->
diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl
index b8997faea5..849efa3611 100644
--- a/src/rabbit_mirror_queue_misc.erl
+++ b/src/rabbit_mirror_queue_misc.erl
@@ -287,10 +287,10 @@ promote_slave([SPid | SPids]) ->
{SPid, SPids}.
initial_queue_node(Q, DefNode) ->
- {MNode, _SNodes} = suggested_queue_nodes(Q, DefNode, all_nodes()),
+ {MNode, _SNodes} = suggested_queue_nodes(Q, DefNode, rabbit_nodes:all_running()),
MNode.
-suggested_queue_nodes(Q) -> suggested_queue_nodes(Q, all_nodes()).
+suggested_queue_nodes(Q) -> suggested_queue_nodes(Q, rabbit_nodes:all_running()).
suggested_queue_nodes(Q, All) -> suggested_queue_nodes(Q, node(), All).
%% The third argument exists so we can pull a call to
@@ -312,8 +312,6 @@ suggested_queue_nodes(Q = #amqqueue{exclusive_owner = Owner}, DefNode, All) ->
_ -> {MNode, []}
end.
-all_nodes() -> rabbit_mnesia:cluster_nodes(running).
-
policy(Policy, Q) ->
case rabbit_policy:get(Policy, Q) of
undefined -> none;
diff --git a/src/rabbit_mirror_queue_sync.erl b/src/rabbit_mirror_queue_sync.erl
index 534ef1afad..62fc718f79 100644
--- a/src/rabbit_mirror_queue_sync.erl
+++ b/src/rabbit_mirror_queue_sync.erl
@@ -352,13 +352,46 @@ batch_publish(Batch, MA, BQ, BQS) ->
BQS1 = BQ:batch_publish(Batch, none, noflow, BQS),
{MA, BQS1}.
+%% TODO
+%%
+%% The case clause in this function assumes that we are either dealing
+%% with a backing_queue that returns acktags as integers, or a
+%% priority queue.
+%% A possible fix to this would be to add a function
+%% to the BQ API where we pass a list of messages and acktags and the
+%% BQ implementation knows how to zip them together.
batch_publish_delivered(Batch, MA, BQ, BQS) ->
{AckTags, BQS1} = BQ:batch_publish_delivered(Batch, none, noflow, BQS),
MA1 =
- lists:foldl(fun ({{Msg, _}, AckTag}, MAs) ->
- [{Msg#basic_message.id, AckTag} | MAs]
- end, MA, lists:zip(Batch, AckTags)),
+ case hd(AckTags) of
+ HeadTag when is_integer(HeadTag) ->
+ lists:foldl(fun ({{Msg, _}, AckTag}, MAs) ->
+ [{msg_id(Msg), AckTag} | MAs]
+ end, MA, lists:zip(Batch, AckTags));
+ _AckTags ->
+ %% priority queue processing of acktags
+ BatchByPriority = batch_by_priority(Batch),
+ lists:foldl(fun (Acks, MAs) ->
+ {P, _AckTag} = hd(Acks),
+ Pubs = orddict:fetch(P, BatchByPriority),
+ MAs0 = zip_msgs_and_tags(Pubs, Acks),
+ MAs ++ MAs0
+ end, MA, AckTags)
+ end,
{MA1, BQS1}.
+batch_by_priority(Batch) ->
+ rabbit_priority_queue:partition_publish_delivered_batch(Batch).
+
+zip_msgs_and_tags(Pubs, AckTags) ->
+ lists:zipwith(
+ fun (Pub, AckTag) ->
+ {Msg, _Props} = Pub,
+ {msg_id(Msg), AckTag}
+ end, Pubs, AckTags).
+
props(Props) ->
Props#message_properties{needs_confirming = false}.
+
+msg_id(#basic_message{ id = Id }) ->
+ Id.
diff --git a/src/rabbit_nodes.erl b/src/rabbit_nodes.erl
index 090aacc63c..57d971715b 100644
--- a/src/rabbit_nodes.erl
+++ b/src/rabbit_nodes.erl
@@ -18,7 +18,8 @@
-export([names/1, diagnostics/1, make/1, parts/1, cookie_hash/0,
is_running/2, is_process_running/2,
- cluster_name/0, set_cluster_name/1, ensure_epmd/0]).
+ cluster_name/0, set_cluster_name/1, ensure_epmd/0,
+ all_running/0]).
-include_lib("kernel/include/inet.hrl").
@@ -42,6 +43,7 @@
-spec(cluster_name/0 :: () -> binary()).
-spec(set_cluster_name/1 :: (binary()) -> 'ok').
-spec(ensure_epmd/0 :: () -> 'ok').
+-spec(all_running/0 :: () -> [node()]).
-endif.
@@ -215,3 +217,5 @@ port_shutdown_loop(Port) ->
{Port, {exit_status, _Rc}} -> ok;
{Port, _} -> port_shutdown_loop(Port)
end.
+
+all_running() -> rabbit_mnesia:cluster_nodes(running).
diff --git a/src/rabbit_priority_queue.erl b/src/rabbit_priority_queue.erl
index e3fdddf0ca..46a3991d88 100644
--- a/src/rabbit_priority_queue.erl
+++ b/src/rabbit_priority_queue.erl
@@ -42,6 +42,9 @@
handle_pre_hibernate/1, resume/1, msg_rates/1,
info/2, invoke/3, is_duplicate/2, set_queue_mode/2]).
+%% for rabbit_mirror_queue_sync.
+-export([partition_publish_delivered_batch/1]).
+
-record(state, {bq, bqss}).
-record(passthrough, {bq, bqs}).
@@ -205,8 +208,7 @@ publish(Msg, MsgProps, IsDelivered, ChPid, Flow,
?passthrough1(publish(Msg, MsgProps, IsDelivered, ChPid, Flow, BQS)).
batch_publish(Publishes, ChPid, Flow, State = #state{bq = BQ}) ->
- PubDict = publishes_by_priority(
- Publishes, fun ({Msg, _, _}) -> Msg end),
+ PubDict = partition_publish_batch(Publishes),
lists:foldl(
fun ({Priority, Pubs}, St) ->
pick1(fun (_P, BQSN) ->
@@ -228,8 +230,7 @@ publish_delivered(Msg, MsgProps, ChPid, Flow,
?passthrough2(publish_delivered(Msg, MsgProps, ChPid, Flow, BQS)).
batch_publish_delivered(Publishes, ChPid, Flow, State = #state{bq = BQ}) ->
- PubDict = publishes_by_priority(
- Publishes, fun ({Msg, _}) -> Msg end),
+ PubDict = partition_publish_delivered_batch(Publishes),
{PrioritiesAndAcks, State1} =
lists:foldl(
fun ({Priority, Pubs}, {PriosAndAcks, St}) ->
@@ -238,7 +239,7 @@ batch_publish_delivered(Publishes, ChPid, Flow, State = #state{bq = BQ}) ->
{AckTags, BQSN1} =
BQ:batch_publish_delivered(
Pubs, ChPid, Flow, BQSN),
- {{P, AckTags}, BQSN1}
+ {priority_on_acktags(P, AckTags), BQSN1}
end, Priority, St),
{[PriosAndAcks1 | PriosAndAcks], St1}
end, {[], State}, orddict:to_list(PubDict)),
@@ -571,7 +572,15 @@ a(State = #state{bqss = BQSs}) ->
end.
%%----------------------------------------------------------------------------
-publishes_by_priority(Publishes, ExtractMsg) ->
+partition_publish_batch(Publishes) ->
+ partition_publishes(
+ Publishes, fun ({Msg, _, _}) -> Msg end).
+
+partition_publish_delivered_batch(Publishes) ->
+ partition_publishes(
+ Publishes, fun ({Msg, _}) -> Msg end).
+
+partition_publishes(Publishes, ExtractMsg) ->
lists:foldl(fun (Pub, Dict) ->
Msg = ExtractMsg(Pub),
rabbit_misc:orddict_cons(priority2(Msg), Pub, Dict)
diff --git a/src/rabbit_queue_decorator.erl b/src/rabbit_queue_decorator.erl
index 129f51d099..0c6f0820c7 100644
--- a/src/rabbit_queue_decorator.erl
+++ b/src/rabbit_queue_decorator.erl
@@ -42,7 +42,7 @@
-export([behaviour_info/1]).
behaviour_info(callbacks) ->
- [{description, 0}, {startup, 1}, {shutdown, 1}, {policy_changed, 2},
+ [{startup, 1}, {shutdown, 1}, {policy_changed, 2},
{active_for, 1}, {consumer_state_changed, 3}];
behaviour_info(_Other) ->
undefined.