diff options
| author | Alvaro Videla <videlalvaro@gmail.com> | 2015-10-20 12:03:04 +0200 |
|---|---|---|
| committer | Alvaro Videla <videlalvaro@gmail.com> | 2015-10-20 12:03:04 +0200 |
| commit | 935f14921a30b495d01a067dd042cfccfcdb51bd (patch) | |
| tree | 8a38c841961b8094f301e8e2b450b3d72d41a069 /src | |
| parent | 5a110fd1cb88ccd59cbe334bbde48aa5d0f9705a (diff) | |
| parent | 88328fe18ee5ba1cb8a01797050cc1443669be5d (diff) | |
| download | rabbitmq-server-git-935f14921a30b495d01a067dd042cfccfcdb51bd.tar.gz | |
Merge branch 'master' into rabbitmq-server-351
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit.erl | 3 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_misc.erl | 6 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_sync.erl | 39 | ||||
| -rw-r--r-- | src/rabbit_nodes.erl | 6 | ||||
| -rw-r--r-- | src/rabbit_priority_queue.erl | 21 | ||||
| -rw-r--r-- | src/rabbit_queue_decorator.erl | 2 |
6 files changed, 61 insertions, 16 deletions
diff --git a/src/rabbit.erl b/src/rabbit.erl index d11b8d95a5..7c85685276 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -459,7 +459,8 @@ status() -> {uptime, begin {T,_} = erlang:statistics(wall_clock), T div 1000 - end}], + end}, + {kernel, {net_ticktime, net_kernel:get_net_ticktime()}}], S1 ++ S2 ++ S3 ++ S4. alarms() -> diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl index b8997faea5..849efa3611 100644 --- a/src/rabbit_mirror_queue_misc.erl +++ b/src/rabbit_mirror_queue_misc.erl @@ -287,10 +287,10 @@ promote_slave([SPid | SPids]) -> {SPid, SPids}. initial_queue_node(Q, DefNode) -> - {MNode, _SNodes} = suggested_queue_nodes(Q, DefNode, all_nodes()), + {MNode, _SNodes} = suggested_queue_nodes(Q, DefNode, rabbit_nodes:all_running()), MNode. -suggested_queue_nodes(Q) -> suggested_queue_nodes(Q, all_nodes()). +suggested_queue_nodes(Q) -> suggested_queue_nodes(Q, rabbit_nodes:all_running()). suggested_queue_nodes(Q, All) -> suggested_queue_nodes(Q, node(), All). %% The third argument exists so we can pull a call to @@ -312,8 +312,6 @@ suggested_queue_nodes(Q = #amqqueue{exclusive_owner = Owner}, DefNode, All) -> _ -> {MNode, []} end. -all_nodes() -> rabbit_mnesia:cluster_nodes(running). - policy(Policy, Q) -> case rabbit_policy:get(Policy, Q) of undefined -> none; diff --git a/src/rabbit_mirror_queue_sync.erl b/src/rabbit_mirror_queue_sync.erl index 534ef1afad..62fc718f79 100644 --- a/src/rabbit_mirror_queue_sync.erl +++ b/src/rabbit_mirror_queue_sync.erl @@ -352,13 +352,46 @@ batch_publish(Batch, MA, BQ, BQS) -> BQS1 = BQ:batch_publish(Batch, none, noflow, BQS), {MA, BQS1}. +%% TODO +%% +%% The case clause in this function assumes that we are either dealing +%% with a backing_queue that returns acktags as integers, or a +%% priority queue. +%% A possible fix to this would be to add a function +%% to the BQ API where we pass a list of messages and acktags and the +%% BQ implementation knows how to zip them together. batch_publish_delivered(Batch, MA, BQ, BQS) -> {AckTags, BQS1} = BQ:batch_publish_delivered(Batch, none, noflow, BQS), MA1 = - lists:foldl(fun ({{Msg, _}, AckTag}, MAs) -> - [{Msg#basic_message.id, AckTag} | MAs] - end, MA, lists:zip(Batch, AckTags)), + case hd(AckTags) of + HeadTag when is_integer(HeadTag) -> + lists:foldl(fun ({{Msg, _}, AckTag}, MAs) -> + [{msg_id(Msg), AckTag} | MAs] + end, MA, lists:zip(Batch, AckTags)); + _AckTags -> + %% priority queue processing of acktags + BatchByPriority = batch_by_priority(Batch), + lists:foldl(fun (Acks, MAs) -> + {P, _AckTag} = hd(Acks), + Pubs = orddict:fetch(P, BatchByPriority), + MAs0 = zip_msgs_and_tags(Pubs, Acks), + MAs ++ MAs0 + end, MA, AckTags) + end, {MA1, BQS1}. +batch_by_priority(Batch) -> + rabbit_priority_queue:partition_publish_delivered_batch(Batch). + +zip_msgs_and_tags(Pubs, AckTags) -> + lists:zipwith( + fun (Pub, AckTag) -> + {Msg, _Props} = Pub, + {msg_id(Msg), AckTag} + end, Pubs, AckTags). + props(Props) -> Props#message_properties{needs_confirming = false}. + +msg_id(#basic_message{ id = Id }) -> + Id. diff --git a/src/rabbit_nodes.erl b/src/rabbit_nodes.erl index 090aacc63c..57d971715b 100644 --- a/src/rabbit_nodes.erl +++ b/src/rabbit_nodes.erl @@ -18,7 +18,8 @@ -export([names/1, diagnostics/1, make/1, parts/1, cookie_hash/0, is_running/2, is_process_running/2, - cluster_name/0, set_cluster_name/1, ensure_epmd/0]). + cluster_name/0, set_cluster_name/1, ensure_epmd/0, + all_running/0]). -include_lib("kernel/include/inet.hrl"). @@ -42,6 +43,7 @@ -spec(cluster_name/0 :: () -> binary()). -spec(set_cluster_name/1 :: (binary()) -> 'ok'). -spec(ensure_epmd/0 :: () -> 'ok'). +-spec(all_running/0 :: () -> [node()]). -endif. @@ -215,3 +217,5 @@ port_shutdown_loop(Port) -> {Port, {exit_status, _Rc}} -> ok; {Port, _} -> port_shutdown_loop(Port) end. + +all_running() -> rabbit_mnesia:cluster_nodes(running). diff --git a/src/rabbit_priority_queue.erl b/src/rabbit_priority_queue.erl index e3fdddf0ca..46a3991d88 100644 --- a/src/rabbit_priority_queue.erl +++ b/src/rabbit_priority_queue.erl @@ -42,6 +42,9 @@ handle_pre_hibernate/1, resume/1, msg_rates/1, info/2, invoke/3, is_duplicate/2, set_queue_mode/2]). +%% for rabbit_mirror_queue_sync. +-export([partition_publish_delivered_batch/1]). + -record(state, {bq, bqss}). -record(passthrough, {bq, bqs}). @@ -205,8 +208,7 @@ publish(Msg, MsgProps, IsDelivered, ChPid, Flow, ?passthrough1(publish(Msg, MsgProps, IsDelivered, ChPid, Flow, BQS)). batch_publish(Publishes, ChPid, Flow, State = #state{bq = BQ}) -> - PubDict = publishes_by_priority( - Publishes, fun ({Msg, _, _}) -> Msg end), + PubDict = partition_publish_batch(Publishes), lists:foldl( fun ({Priority, Pubs}, St) -> pick1(fun (_P, BQSN) -> @@ -228,8 +230,7 @@ publish_delivered(Msg, MsgProps, ChPid, Flow, ?passthrough2(publish_delivered(Msg, MsgProps, ChPid, Flow, BQS)). batch_publish_delivered(Publishes, ChPid, Flow, State = #state{bq = BQ}) -> - PubDict = publishes_by_priority( - Publishes, fun ({Msg, _}) -> Msg end), + PubDict = partition_publish_delivered_batch(Publishes), {PrioritiesAndAcks, State1} = lists:foldl( fun ({Priority, Pubs}, {PriosAndAcks, St}) -> @@ -238,7 +239,7 @@ batch_publish_delivered(Publishes, ChPid, Flow, State = #state{bq = BQ}) -> {AckTags, BQSN1} = BQ:batch_publish_delivered( Pubs, ChPid, Flow, BQSN), - {{P, AckTags}, BQSN1} + {priority_on_acktags(P, AckTags), BQSN1} end, Priority, St), {[PriosAndAcks1 | PriosAndAcks], St1} end, {[], State}, orddict:to_list(PubDict)), @@ -571,7 +572,15 @@ a(State = #state{bqss = BQSs}) -> end. %%---------------------------------------------------------------------------- -publishes_by_priority(Publishes, ExtractMsg) -> +partition_publish_batch(Publishes) -> + partition_publishes( + Publishes, fun ({Msg, _, _}) -> Msg end). + +partition_publish_delivered_batch(Publishes) -> + partition_publishes( + Publishes, fun ({Msg, _}) -> Msg end). + +partition_publishes(Publishes, ExtractMsg) -> lists:foldl(fun (Pub, Dict) -> Msg = ExtractMsg(Pub), rabbit_misc:orddict_cons(priority2(Msg), Pub, Dict) diff --git a/src/rabbit_queue_decorator.erl b/src/rabbit_queue_decorator.erl index 129f51d099..0c6f0820c7 100644 --- a/src/rabbit_queue_decorator.erl +++ b/src/rabbit_queue_decorator.erl @@ -42,7 +42,7 @@ -export([behaviour_info/1]). behaviour_info(callbacks) -> - [{description, 0}, {startup, 1}, {shutdown, 1}, {policy_changed, 2}, + [{startup, 1}, {shutdown, 1}, {policy_changed, 2}, {active_for, 1}, {consumer_state_changed, 3}]; behaviour_info(_Other) -> undefined. |
