author    dcorbacho <dparracorbacho@piotal.io>    2021-01-27 14:48:59 +0100
committer dcorbacho <dparracorbacho@piotal.io>    2021-01-28 13:34:50 +0100
commit    9357b7f1edacdc9cd55226a062236dc1362ea2d9 (patch)
tree      b3935651d8df37cf32234af70c8439ae11de8480
parent    3de198845fa28f59b9520d92d1579911af652734 (diff)
Competing readers on stream queues (branch: stream-competing-readers)
 deps/rabbit/src/rabbit_stream_queue.erl        | 210
 deps/rabbit/test/rabbit_stream_queue_SUITE.erl | 116
 2 files changed, 279 insertions(+), 47 deletions(-)
diff --git a/deps/rabbit/src/rabbit_stream_queue.erl b/deps/rabbit/src/rabbit_stream_queue.erl
index 206584fe61..fb1b0f212d 100644
--- a/deps/rabbit/src/rabbit_stream_queue.erl
+++ b/deps/rabbit/src/rabbit_stream_queue.erl
@@ -62,9 +62,15 @@
max :: non_neg_integer(),
start_offset = 0 :: non_neg_integer(),
listening_offset = 0 :: non_neg_integer(),
- log :: undefined | osiris_log:state()}).
-
--record(stream_client, {name :: term(),
+ log :: undefined | osiris_log:state(),
+ competing :: boolean(),
+ pending_chunks = [] :: [non_neg_integer()],
+ pending_ack = #{} :: #{term() => term()},
+ pending_ack_by_chunk = #{} :: #{term() => [term()]}
+ }).
+
+-record(stream_client, {name :: binary(),
+ reference :: term(),
leader :: pid(),
next_seq = 1 :: non_neg_integer(),
correlation = #{} :: #{appender_seq() => {msg_id(), msg()}},
@@ -107,7 +113,7 @@ start_cluster(Q0, Node) ->
amqqueue:set_type_state(Q0, Conf0)) of
{ok, {error, already_started}, _} ->
{protocol_error, precondition_failed, "safe queue name already in use '~s'",
- [Node]};
+ [maps:get(name, Conf0)]};
{ok, {created, Q}, _} ->
rabbit_event:notify(queue_created,
[{name, QName},
@@ -190,6 +196,13 @@ consume(Q, Spec, QState0) when ?amqqueue_is_stream(Q) ->
{_, V} ->
V
end,
+ %% If we have competing consumers, offset must be ignored
+ Competing = case rabbit_misc:table_lookup(Args, <<"x-competing-consumers">>) of
+ undefined ->
+ false;
+ _ ->
+ true
+ end,
rabbit_core_metrics:consumer_created(ChPid, ConsumerTag, ExclusiveConsume,
not NoAck, QName,
ConsumerPrefetchCount, false,
@@ -199,7 +212,7 @@ consume(Q, Spec, QState0) when ?amqqueue_is_stream(Q) ->
%% do
maybe_send_reply(ChPid, OkMsg),
QState = begin_stream(QState0, Q, ConsumerTag, Offset,
- ConsumerPrefetchCount),
+ ConsumerPrefetchCount, Competing),
{ok, QState, []};
Err ->
Err
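As a usage illustration (modelled on the subscribe/6 helper added to the test suite below; the queue name, consumer tags and prefetch value are placeholders), a client could opt in to competing consumption as sketched here. Note that, per the comment above, the x-stream-offset argument is ignored once x-competing-consumers is set.

-include_lib("amqp_client/include/amqp_client.hrl").

%% Hypothetical helper, for illustration only.
consume_competing(Ch, Q) ->
    #'queue.declare_ok'{} =
        amqp_channel:call(Ch, #'queue.declare'{
                                 queue     = Q,
                                 durable   = true,
                                 arguments = [{<<"x-queue-type">>, longstr, <<"stream">>}]}),
    %% stream consumers require manual acks and a prefetch count
    #'basic.qos_ok'{} = amqp_channel:call(Ch, #'basic.qos'{prefetch_count = 10}),
    [amqp_channel:subscribe(
       Ch,
       #'basic.consume'{queue        = Q,
                        no_ack       = false,
                        consumer_tag = Tag,
                        %% the offset is ignored when competing consumers are enabled
                        arguments    = [{<<"x-stream-offset">>, long, 0},
                                        {<<"x-competing-consumers">>, bool, true}]},
       self())
     || Tag <- [<<"ctag1">>, <<"ctag2">>]].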
@@ -214,11 +227,25 @@ get_local_pid(#{replica_pids := ReplicaPids}) ->
Local.
begin_stream(#stream_client{readers = Readers0} = State,
- Q, Tag, Offset, Max) ->
- LocalPid = get_local_pid(amqqueue:get_type_state(Q)),
+ Q, Tag, Offset0, Max, Competing) ->
+ Conf = amqqueue:get_type_state(Q),
+ QName = amqqueue:get_name(Q),
+ LocalPid = get_local_pid(Conf),
+ Offset = case Competing of
+ true -> 'first';
+ false -> Offset0
+ end,
{ok, Seg0} = osiris:init_reader(LocalPid, Offset),
NextOffset = osiris_log:next_offset(Seg0) - 1,
- osiris:register_offset_listener(LocalPid, NextOffset),
+ case Competing of
+ true ->
+ {ok, CRC} = osiris_server_sup:get_crc(maps:get(name, Conf)),
+ Fun = fun(Evt) -> {'$gen_cast', {queue_event, QName, Evt}} end,
+ %% TODO configure number of chunks
+ gen_server:call(CRC, {register, self(), Tag, 2, Fun});
+ false ->
+ osiris:register_offset_listener(LocalPid, NextOffset)
+ end,
%% TODO: avoid double calls to the same process
StartOffset = case Offset of
first -> NextOffset;
@@ -232,11 +259,12 @@ begin_stream(#stream_client{readers = Readers0} = State,
start_offset = StartOffset,
listening_offset = NextOffset,
log = Seg0,
- max = Max},
+ max = Max,
+ competing = Competing},
State#stream_client{readers = Readers0#{Tag => Str0}}.
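To make the data flow easier to follow, here is a sketch (with made-up chunk ids and a placeholder CrcPid) of how the callback registered with the CRC above reaches this module again:

%% The Fun passed to the CRC wraps every notification so that it arrives at
%% the owning channel process as an ordinary queue event:
Fun = fun(Evt) -> {'$gen_cast', {queue_event, QName, Evt}} end,
%% e.g. a notification assigning chunks 100 and 200 to <<"ctag1">> shows up as
%%   {queue_event, QName, {osiris_chunk, CrcPid, <<"ctag1">>, [100, 200]}}
%% and is handled by the osiris_chunk clause of handle_event/2 further down.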
cancel(_Q, ConsumerTag, OkMsg, ActingUser, #stream_client{readers = Readers0,
- name = QName} = State) ->
+ reference = QName} = State) ->
Readers = maps:remove(ConsumerTag, Readers0),
rabbit_core_metrics:consumer_deleted(self(), ConsumerTag, QName),
rabbit_event:notify(consumer_deleted, [{consumer_tag, ConsumerTag},
@@ -247,16 +275,21 @@ cancel(_Q, ConsumerTag, OkMsg, ActingUser, #stream_client{readers = Readers0,
{ok, State#stream_client{readers = Readers}}.
credit(CTag, Credit, Drain, #stream_client{readers = Readers0,
- name = Name,
leader = Leader} = State) ->
{Readers1, Msgs} = case Readers0 of
- #{CTag := #stream{credit = Credit0} = Str0} ->
- Str1 = Str0#stream{credit = Credit0 + Credit},
- {Str, Msgs0} = stream_entries(Name, Leader, Str1),
- {Readers0#{CTag => Str}, Msgs0};
- _ ->
- {Readers0, []}
- end,
+ #{CTag := #stream{credit = Credit0,
+ competing = false} = Str0} ->
+ Str1 = Str0#stream{credit = Credit0 + Credit},
+ {Str, Msgs0} = stream_entries(Leader, Str1),
+ {Readers0#{CTag => Str}, Msgs0};
+ #{CTag := #stream{credit = Credit0,
+ competing = true} = Str0} ->
+ Str1 = Str0#stream{credit = Credit0 + Credit},
+ {Str, Msgs0} = stream_entries_by_chunks(CTag, Leader, Str1),
+ {Readers0#{CTag => Str}, Msgs0};
+ _ ->
+ {Readers0, []}
+ end,
{Readers, Actions} =
case Drain of
true ->
@@ -286,7 +319,7 @@ deliver(QSs, #delivery{confirm = Confirm} = Delivery) ->
end, {[], []}, QSs).
deliver(_Confirm, #delivery{message = Msg, msg_seq_no = MsgId},
- #stream_client{name = Name,
+ #stream_client{reference = Name,
leader = LeaderPid,
writer_id = WriterId,
next_seq = Seq,
@@ -312,7 +345,7 @@ deliver(_Confirm, #delivery{message = Msg, msg_seq_no = MsgId},
slow = Slow}.
-spec dequeue(_, _, _, client()) -> no_return().
-dequeue(_, _, _, #stream_client{name = Name}) ->
+dequeue(_, _, _, #stream_client{reference = Name}) ->
{protocol_error, not_implemented, "basic.get not supported by stream queues ~s",
[rabbit_misc:rs(Name)]}.
@@ -320,7 +353,7 @@ handle_event({osiris_written, From, _WriterId, Corrs},
State = #stream_client{correlation = Correlation0,
soft_limit = SftLmt,
slow = Slow0,
- name = Name}) ->
+ reference = Name}) ->
MsgIds = lists:sort(maps:fold(
fun (_Seq, {I, _M}, Acc) ->
[I | Acc]
@@ -337,13 +370,12 @@ handle_event({osiris_written, From, _WriterId, Corrs},
slow = Slow}, [{settled, From, MsgIds}]};
handle_event({osiris_offset, _From, _Offs},
State = #stream_client{leader = Leader,
- readers = Readers0,
- name = Name}) ->
+ readers = Readers0}) ->
%% offset isn't actually needed as we use the atomic to read the
%% current committed
{Readers, TagMsgs} = maps:fold(
fun (Tag, Str0, {Acc, TM}) ->
- {Str, Msgs} = stream_entries(Name, Leader, Str0),
+ {Str, Msgs} = stream_entries(Leader, Str0),
%% HACK for now, better to just return but
%% tricky with acks credits
%% that also evaluate the stream
@@ -355,7 +387,21 @@ handle_event({osiris_offset, _From, _Offs},
|| {Tag, _LeaderPid, OffsetMsg} <- TagMsgs],
{ok, State#stream_client{readers = Readers}, Deliveries};
handle_event({stream_leader_change, Pid}, State) ->
- {ok, update_leader_pid(Pid, State), []}.
+ {ok, update_leader_pid(Pid, State), []};
+handle_event({osiris_chunk, _From, Tag, [FirstChunk | _] = Chunks},
+ State = #stream_client{leader = Leader,
+ readers = Readers0}) ->
+ #stream{pending_chunks = Chunks0,
+ listening_offset = NextOffset} = Str0 = maps:get(Tag, Readers0),
+ Str1 = case FirstChunk < NextOffset of
+ true -> reopen_log(Str0);
+ false -> Str0
+ end,
+ {Str, Msgs} = stream_entries_by_chunks(Tag, Leader,
+ Str1#stream{pending_chunks = Chunks0 ++ Chunks}),
+ Readers = Readers0#{Tag => Str},
+ Deliver = [{deliver, Tag, true, Msgs}],
+ {ok, State#stream_client{readers = Readers}, Deliver}.
is_recoverable(Q) ->
Node = node(),
@@ -370,20 +416,47 @@ recover(_VHost, Queues) ->
{[Q | R0], F0}
end, {[], []}, Queues).
+handle_pending(_Name, [], Str) ->
+ Str;
+handle_pending(Name, [MsgId | MsgIds], #stream{pending_ack = PendingAck0,
+ pending_ack_by_chunk = PendingAckByChunk0} = Str) ->
+ {ChunkId, PendingAck} = maps:take(MsgId, PendingAck0),
+ case maps:get(ChunkId, PendingAckByChunk0) of
+ #{msgs := [MsgId],
+ tag := Tag} ->
+ {ok, CRC} = osiris_server_sup:get_crc(Name),
+ gen_server:cast(CRC, {ack, {self(), Tag}, ChunkId}),
+ PendingAckByChunk = maps:remove(ChunkId, PendingAckByChunk0),
+ handle_pending(Name, MsgIds, Str#stream{pending_ack = PendingAck,
+ pending_ack_by_chunk = PendingAckByChunk});
+        #{msgs := MsgIdsByChunk0} = ByChunk ->
+            MsgIdsByChunk = lists:delete(MsgId, MsgIdsByChunk0),
+            handle_pending(Name, MsgIds,
+                           Str#stream{pending_ack = PendingAck,
+                                      pending_ack_by_chunk = maps:put(ChunkId, ByChunk#{msgs => MsgIdsByChunk}, PendingAckByChunk0)})
+ end.
+
settle(complete, CTag, MsgIds, #stream_client{readers = Readers0,
name = Name,
leader = Leader} = State) ->
Credit = length(MsgIds),
{Readers, Msgs} = case Readers0 of
- #{CTag := #stream{credit = Credit0} = Str0} ->
+ #{CTag := #stream{credit = Credit0,
+ competing = true} = Str0} ->
+ Str1 = handle_pending(Name, MsgIds,
+ Str0#stream{credit = Credit0 + Credit}),
+ {Str, Msgs0} = stream_entries_by_chunks(CTag, Leader, Str1),
+ {Readers0#{CTag => Str}, Msgs0};
+ #{CTag := #stream{credit = Credit0,
+ competing = false} = Str0} ->
Str1 = Str0#stream{credit = Credit0 + Credit},
- {Str, Msgs0} = stream_entries(Name, Leader, Str1),
+ {Str, Msgs0} = stream_entries(Leader, Str1),
{Readers0#{CTag => Str}, Msgs0};
_ ->
{Readers0, []}
end,
{State#stream_client{readers = Readers}, [{deliver, CTag, true, Msgs}]};
-settle(_, _, _, #stream_client{name = Name}) ->
+settle(_, _, _, #stream_client{reference = Name}) ->
{protocol_error, not_implemented,
"basic.nack and basic.reject not supported by stream queues ~s",
[rabbit_misc:rs(Name)]}.
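A concrete example may help when reading handle_pending/3 and the competing settle clause above; only the shape of the maps is taken from the code, the offsets and tag are invented:

%% State after chunk 100 (offsets 100, 101 and 102) has been delivered
%% to <<"ctag1">>:
PendingAck        = #{100 => 100, 101 => 100, 102 => 100},
PendingAckByChunk = #{100 => #{msgs => [100, 101, 102], tag => <<"ctag1">>}},
%% Settling offsets 100 and 101 only shrinks msgs to [102]. Settling 102 hits
%% the "last message of the chunk" clause, removes both entries and casts
%%   {ack, {self(), <<"ctag1">>}, 100}
%% back to the CRC, so the chunk is no longer outstanding for this consumer.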
@@ -467,10 +540,12 @@ init(Q) when ?is_amqqueue(Q) ->
Prefix = erlang:pid_to_list(self()) ++ "_",
WriterId = rabbit_guid:binary(rabbit_guid:gen(), Prefix),
{ok, SoftLimit} = application:get_env(rabbit, stream_messages_soft_limit),
- #stream_client{name = amqqueue:get_name(Q),
+ Name = maps:get(name, amqqueue:get_type_state(Q)),
+ #stream_client{reference = amqqueue:get_name(Q),
leader = Leader,
writer_id = WriterId,
- soft_limit = SoftLimit}.
+ soft_limit = SoftLimit,
+ name = Name}.
close(#stream_client{readers = Readers}) ->
_ = maps:map(fun (_, #stream{log = Log}) ->
@@ -660,10 +735,10 @@ check_queue_exists_in_local_node(Q) ->
maybe_send_reply(_ChPid, undefined) -> ok;
maybe_send_reply(ChPid, Msg) -> ok = rabbit_channel:send_command(ChPid, Msg).
-stream_entries(Name, Id, Str) ->
- stream_entries(Name, Id, Str, []).
+stream_entries(Id, Str) ->
+ stream_entries(Id, Str, []).
-stream_entries(Name, LeaderPid,
+stream_entries(LeaderPid,
#stream{name = QName,
credit = Credit,
start_offset = StartOffs,
@@ -686,7 +761,7 @@ stream_entries(Name, LeaderPid,
Msg0 = binary_to_msg(QName, B),
Msg = rabbit_basic:add_header(<<"x-stream-offset">>,
long, O, Msg0),
- {Name, LeaderPid, O, false, Msg}
+ {QName, LeaderPid, O, false, Msg}
end || {O, B} <- Records,
O >= StartOffs],
@@ -701,12 +776,65 @@ stream_entries(Name, LeaderPid,
false ->
%% if there are fewer Msgs than Entries0 it means there were non-events
%% in the log and we should recurse and try again
- stream_entries(Name, LeaderPid, Str, MsgIn ++ Msgs)
+ stream_entries(LeaderPid, Str, MsgIn ++ Msgs)
end
end;
-stream_entries(_Name, _Id, Str, Msgs) ->
+stream_entries(_Id, Str, Msgs) ->
{Str, Msgs}.
+stream_entries_by_chunks(Tag, LeaderPid, Str) ->
+ stream_entries_by_chunks(Tag, LeaderPid, Str, []).
+
+stream_entries_by_chunks(_Tag, _LeaderPid, #stream{pending_chunks = []} = Str, MsgIn) ->
+ {Str, MsgIn};
+stream_entries_by_chunks(Tag, LeaderPid,
+ #stream{name = QName,
+ credit = Credit,
+ log = Seg0,
+ pending_ack = PendingAck0,
+ pending_ack_by_chunk = PendingAckByChunk0,
+ pending_chunks = [ChunkId | Chunks]} = Str0, MsgIn)
+ when Credit > 0 ->
+ case osiris_log:read_chunk_parsed(Seg0) of
+ {end_of_stream, Seg} ->
+            %% TODO where is my chunk? It might be older if another competing consumer has just died
+ NextOffset = osiris_log:next_offset(Seg),
+ {Str0#stream{log = Seg,
+ listening_offset = NextOffset}, MsgIn};
+ {[{Off, _} | _] = Records, Seg} when Off == ChunkId ->
+ Msgs = [begin
+ Msg0 = binary_to_msg(QName, B),
+ Msg = rabbit_basic:add_header(<<"x-stream-offset">>,
+ long, O, Msg0),
+ {QName, LeaderPid, O, false, Msg}
+ end || {O, B} <- Records],
+ {MsgIds, _} = lists:unzip(Records),
+ NumMsgs = length(Msgs),
+ NextOffset = osiris_log:next_offset(Seg),
+ PendingAck = maps:merge(PendingAck0,
+ maps:from_list([{MsgId, ChunkId} || MsgId <- MsgIds])),
+ Str = Str0#stream{credit = Credit - NumMsgs,
+ log = Seg,
+ pending_chunks = Chunks,
+ pending_ack_by_chunk = PendingAckByChunk0#{ChunkId =>
+ #{msgs => MsgIds,
+ tag => Tag}},
+ pending_ack = PendingAck,
+ listening_offset = NextOffset},
+ case Str#stream.credit < 1 of
+ true ->
+ %% we are done here
+ {Str, MsgIn ++ Msgs};
+ false ->
+ stream_entries_by_chunks(Tag, LeaderPid, Str, MsgIn ++ Msgs)
+ end;
+ {_, Seg} ->
+ %% Not yet the expected chunk, try again
+ stream_entries_by_chunks(Tag, LeaderPid, Str0#stream{log = Seg}, MsgIn)
+ end;
+stream_entries_by_chunks(_Tag, _Id, Str, MsgIn) ->
+ {Str, MsgIn}.
+
binary_to_msg(#resource{virtual_host = VHost,
kind = queue,
name = QName}, Data) ->
@@ -762,3 +890,13 @@ resend_all(#stream_client{leader = LeaderPid,
ok = osiris:write(LeaderPid, WriterId, Seq, msg_to_iodata(Msg))
end || {Seq, Msg} <- Msgs],
State.
+
+reopen_log(#stream{name = QName} = Str0) ->
+    {ok, Q} = rabbit_amqqueue:lookup(QName),
+ Conf = amqqueue:get_type_state(Q),
+ LocalPid = get_local_pid(Conf),
+ {ok, Seg0} = osiris:init_reader(LocalPid, 'first'),
+ NextOffset = osiris_log:next_offset(Seg0) - 1,
+ Str0#stream{start_offset = NextOffset,
+ listening_offset = NextOffset,
+ log = Seg0}.
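Taken together, the coordinator interactions introduced in this file are the following. The CRC process itself is not part of this diff (it presumably lives in a matching osiris branch); the names CRC, StreamName, CTag, Fun and ChunkId are the ones used above:

%% 1. Resolve the per-stream coordinator and register as a competing reader.
%%    The hard-coded 2 is the number of chunks requested per reader (see the
%%    TODO in begin_stream/6); Fun wraps notifications as queue events.
{ok, CRC} = osiris_server_sup:get_crc(StreamName),
_ = gen_server:call(CRC, {register, self(), CTag, 2, Fun}),
%% 2. The CRC pushes work as {osiris_chunk, CrcPid, CTag, ChunkIds} events,
%%    handled by handle_event/2 and read chunk by chunk via
%%    stream_entries_by_chunks/3; if a reassigned chunk id is older than the
%%    reader's current position, reopen_log/1 resets the reader to 'first'.
%% 3. Once every message of a chunk has been settled, the chunk is
%%    acknowledged back to the CRC; un-acked chunks can be re-dispatched to
%%    another competing reader, as exercised by competing_consumers_resend
%%    in the test suite.
gen_server:cast(CRC, {ack, {self(), CTag}, ChunkId}).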
diff --git a/deps/rabbit/test/rabbit_stream_queue_SUITE.erl b/deps/rabbit/test/rabbit_stream_queue_SUITE.erl
index 47c5432cbc..02accebc19 100644
--- a/deps/rabbit/test/rabbit_stream_queue_SUITE.erl
+++ b/deps/rabbit/test/rabbit_stream_queue_SUITE.erl
@@ -95,7 +95,9 @@ all_tests() ->
max_age_policy,
max_segment_size_policy,
purge,
- update_retention_policy
+ update_retention_policy,
+ competing_consumers,
+ competing_consumers_resend
].
%% -------------------------------------------------------------------
@@ -255,8 +257,8 @@ declare_queue(Config) ->
?assertEqual({'queue.declare_ok', Q, 0, 0},
declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])),
- ?assertMatch([_], rpc:call(Server, supervisor, which_children,
- [osiris_server_sup])),
+ [{_, Sup, _, _}] = rpc:call(Server, supervisor, which_children, [osiris_server_sup_sup]),
+ ?assertMatch([_, _], rpc:call(Server, supervisor, which_children, [Sup])),
%% Test declare an existing queue with different arguments
?assertExit(_, declare(Ch, Q, [])).
@@ -1143,7 +1145,8 @@ max_age(Config) ->
?assertEqual({'queue.declare_ok', Q, 0, 0},
declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>},
{<<"x-max-age">>, longstr, <<"10s">>},
- {<<"x-max-segment-size">>, long, 250}])),
+ {<<"x-max-segment-size">>, long, 250}
+ ])),
Payload = << <<"1">> || _ <- lists:seq(1, 500) >>,
@@ -1158,8 +1161,6 @@ max_age(Config) ->
[publish(Ch, Q, Payload) || _ <- lists:seq(1, 100)],
amqp_channel:wait_for_confirms(Ch, 5),
- timer:sleep(5000),
-
Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server),
qos(Ch1, 200, false),
subscribe(Ch1, Q, false, 0),
@@ -1452,7 +1453,7 @@ leader_locator_policy(Config) ->
Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
ok = rabbit_ct_broker_helpers:set_policy(
- Config, 0, <<"leader-locator">>, <<"leader_locator_.*">>, <<"queues">>,
+ Config, 0, <<"leader-locator">>, <<"leader_locator_policy.*">>, <<"queues">>,
[{<<"queue-leader-locator">>, <<"random">>}]),
Q = ?config(queue_name, Config),
@@ -1601,7 +1602,7 @@ max_segment_size_policy(Config) ->
purge(Config) ->
Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
- Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
Q = ?config(queue_name, Config),
?assertEqual({'queue.declare_ok', Q, 0, 0},
declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])),
@@ -1609,6 +1610,96 @@ purge(Config) ->
?assertExit({{shutdown, {connection_closing, {server_initiated_close, 540, _}}}, _},
amqp_channel:call(Ch, #'queue.purge'{queue = Q})).
+competing_consumers(Config) ->
+ [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ Q = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', Q, 0, 0},
+ declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>},
+ {<<"x-max-segment-size">>, long, 10000}
+ ])),
+
+ #'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}),
+ amqp_channel:register_confirm_handler(Ch, self()),
+
+ Payload = << <<"1">> || _ <- lists:seq(1, 500) >>,
+ [publish(Ch, Q, Payload) || _ <- lists:seq(1, 10000)],
+
+ amqp_channel:wait_for_confirms(Ch, 5),
+
+ Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server),
+ qos(Ch1, 10, false),
+
+ subscribe(Ch1, Q, false, 0, <<"ctag1">>, [{<<"x-competing-consumers">>, bool, true}]),
+ subscribe(Ch1, Q, false, 0, <<"ctag2">>, [{<<"x-competing-consumers">>, bool, true}]),
+
+ receive
+ {#'basic.deliver'{consumer_tag = <<"ctag1">>},
+ #amqp_msg{props = #'P_basic'{headers = [{<<"x-stream-offset">>, long, S1_1}]}}} ->
+ receive
+ %% This is the first message, and we should receive two chunks. We can thus
+ %% guarantee we get the next message on the stream in order, as nothing should
+ %% have crashed!
+ {#'basic.deliver'{consumer_tag = <<"ctag1">>},
+ #amqp_msg{props = #'P_basic'{headers = [{<<"x-stream-offset">>, long, S1_2}]}}} ->
+ ?assert((S1_1 + 1) == S1_2),
+ receive
+ {#'basic.deliver'{consumer_tag = <<"ctag2">>},
+ #amqp_msg{props = #'P_basic'{headers = [{<<"x-stream-offset">>, long, S2_3}]}}} ->
+                    %% Chunks probably contain more than one message, but we can at least assert this
+ ?assert(S1_2 < S2_3)
+ after 5000 ->
+ exit(timeout2_1)
+ end
+ after 5000 ->
+ exit(timeout1_2)
+ end
+ after 5000 ->
+ exit(timeout1_1)
+ end.
+
+competing_consumers_resend(Config) ->
+ [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ Q = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', Q, 0, 0},
+ declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>},
+ {<<"x-max-segment-size">>, long, 10000}
+ ])),
+
+ #'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}),
+ amqp_channel:register_confirm_handler(Ch, self()),
+
+ Payload = << <<"1">> || _ <- lists:seq(1, 500) >>,
+ [publish(Ch, Q, Payload) || _ <- lists:seq(1, 10000)],
+
+ amqp_channel:wait_for_confirms(Ch, 5),
+
+ Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server),
+ qos(Ch1, 10, false),
+
+ subscribe(Ch1, Q, false, 0, <<"ctag1">>, [{<<"x-competing-consumers">>, bool, true}]),
+
+ receive
+ {#'basic.deliver'{consumer_tag = <<"ctag1">>},
+ #amqp_msg{props = #'P_basic'{headers = [{<<"x-stream-offset">>, long, S1}]}}} ->
+ rabbit_ct_client_helpers:close_channel(Ch1),
+ Ch2 = rabbit_ct_client_helpers:open_channel(Config, Server),
+ qos(Ch2, 10, false),
+ subscribe(Ch2, Q, false, 0, <<"ctag2">>, [{<<"x-competing-consumers">>, bool, true}]),
+ receive
+ {#'basic.deliver'{consumer_tag = <<"ctag2">>},
+ #amqp_msg{props = #'P_basic'{headers = [{<<"x-stream-offset">>, long, S1}]}}} ->
+ ok
+ after 5000 ->
+ exit(timeout2_1)
+ end
+ after 5000 ->
+ exit(timeout1_1)
+ end.
+
%%----------------------------------------------------------------------------
delete_queues() ->
@@ -1654,13 +1745,16 @@ publish(Ch, Queue, Msg) ->
payload = Msg}).
subscribe(Ch, Queue, NoAck, Offset) ->
+ subscribe(Ch, Queue, NoAck, Offset, <<"ctag">>, []).
+
+subscribe(Ch, Queue, NoAck, Offset, CTag, ExtraArgs) ->
amqp_channel:subscribe(Ch, #'basic.consume'{queue = Queue,
no_ack = NoAck,
- consumer_tag = <<"ctag">>,
- arguments = [{<<"x-stream-offset">>, long, Offset}]},
+ consumer_tag = CTag,
+ arguments = [{<<"x-stream-offset">>, long, Offset}] ++ ExtraArgs},
self()),
receive
- #'basic.consume_ok'{consumer_tag = <<"ctag">>} ->
+ #'basic.consume_ok'{consumer_tag = CTag} ->
ok
end.