diff options
author | dcorbacho <dparracorbacho@piotal.io> | 2021-01-27 14:48:59 +0100 |
---|---|---|
committer | dcorbacho <dparracorbacho@piotal.io> | 2021-01-28 13:34:50 +0100 |
commit | 9357b7f1edacdc9cd55226a062236dc1362ea2d9 (patch) | |
tree | b3935651d8df37cf32234af70c8439ae11de8480 | |
parent | 3de198845fa28f59b9520d92d1579911af652734 (diff) | |
download | rabbitmq-server-git-stream-competing-readers.tar.gz |
Competing readers on stream queuesstream-competing-readers
-rw-r--r-- | deps/rabbit/src/rabbit_stream_queue.erl | 210 | ||||
-rw-r--r-- | deps/rabbit/test/rabbit_stream_queue_SUITE.erl | 116 |
2 files changed, 279 insertions, 47 deletions
diff --git a/deps/rabbit/src/rabbit_stream_queue.erl b/deps/rabbit/src/rabbit_stream_queue.erl index 206584fe61..fb1b0f212d 100644 --- a/deps/rabbit/src/rabbit_stream_queue.erl +++ b/deps/rabbit/src/rabbit_stream_queue.erl @@ -62,9 +62,15 @@ max :: non_neg_integer(), start_offset = 0 :: non_neg_integer(), listening_offset = 0 :: non_neg_integer(), - log :: undefined | osiris_log:state()}). - --record(stream_client, {name :: term(), + log :: undefined | osiris_log:state(), + competing :: boolean(), + pending_chunks = [] :: [non_neg_integer()], + pending_ack = #{} :: #{term() => term()}, + pending_ack_by_chunk = #{} :: #{term() => [term()]} + }). + +-record(stream_client, {name :: binary(), + reference :: term(), leader :: pid(), next_seq = 1 :: non_neg_integer(), correlation = #{} :: #{appender_seq() => {msg_id(), msg()}}, @@ -107,7 +113,7 @@ start_cluster(Q0, Node) -> amqqueue:set_type_state(Q0, Conf0)) of {ok, {error, already_started}, _} -> {protocol_error, precondition_failed, "safe queue name already in use '~s'", - [Node]}; + [maps:get(name, Conf0)]}; {ok, {created, Q}, _} -> rabbit_event:notify(queue_created, [{name, QName}, @@ -190,6 +196,13 @@ consume(Q, Spec, QState0) when ?amqqueue_is_stream(Q) -> {_, V} -> V end, + %% If we have competing consumers, offset must be ignored + Competing = case rabbit_misc:table_lookup(Args, <<"x-competing-consumers">>) of + undefined -> + false; + _ -> + true + end, rabbit_core_metrics:consumer_created(ChPid, ConsumerTag, ExclusiveConsume, not NoAck, QName, ConsumerPrefetchCount, false, @@ -199,7 +212,7 @@ consume(Q, Spec, QState0) when ?amqqueue_is_stream(Q) -> %% do maybe_send_reply(ChPid, OkMsg), QState = begin_stream(QState0, Q, ConsumerTag, Offset, - ConsumerPrefetchCount), + ConsumerPrefetchCount, Competing), {ok, QState, []}; Err -> Err @@ -214,11 +227,25 @@ get_local_pid(#{replica_pids := ReplicaPids}) -> Local. begin_stream(#stream_client{readers = Readers0} = State, - Q, Tag, Offset, Max) -> - LocalPid = get_local_pid(amqqueue:get_type_state(Q)), + Q, Tag, Offset0, Max, Competing) -> + Conf = amqqueue:get_type_state(Q), + QName = amqqueue:get_name(Q), + LocalPid = get_local_pid(Conf), + Offset = case Competing of + true -> 'first'; + false -> Offset0 + end, {ok, Seg0} = osiris:init_reader(LocalPid, Offset), NextOffset = osiris_log:next_offset(Seg0) - 1, - osiris:register_offset_listener(LocalPid, NextOffset), + case Competing of + true -> + {ok, CRC} = osiris_server_sup:get_crc(maps:get(name, Conf)), + Fun = fun(Evt) -> {'$gen_cast', {queue_event, QName, Evt}} end, + %% TODO configure number of chunks + gen_server:call(CRC, {register, self(), Tag, 2, Fun}); + false -> + osiris:register_offset_listener(LocalPid, NextOffset) + end, %% TODO: avoid double calls to the same process StartOffset = case Offset of first -> NextOffset; @@ -232,11 +259,12 @@ begin_stream(#stream_client{readers = Readers0} = State, start_offset = StartOffset, listening_offset = NextOffset, log = Seg0, - max = Max}, + max = Max, + competing = Competing}, State#stream_client{readers = Readers0#{Tag => Str0}}. cancel(_Q, ConsumerTag, OkMsg, ActingUser, #stream_client{readers = Readers0, - name = QName} = State) -> + reference = QName} = State) -> Readers = maps:remove(ConsumerTag, Readers0), rabbit_core_metrics:consumer_deleted(self(), ConsumerTag, QName), rabbit_event:notify(consumer_deleted, [{consumer_tag, ConsumerTag}, @@ -247,16 +275,21 @@ cancel(_Q, ConsumerTag, OkMsg, ActingUser, #stream_client{readers = Readers0, {ok, State#stream_client{readers = Readers}}. credit(CTag, Credit, Drain, #stream_client{readers = Readers0, - name = Name, leader = Leader} = State) -> {Readers1, Msgs} = case Readers0 of - #{CTag := #stream{credit = Credit0} = Str0} -> - Str1 = Str0#stream{credit = Credit0 + Credit}, - {Str, Msgs0} = stream_entries(Name, Leader, Str1), - {Readers0#{CTag => Str}, Msgs0}; - _ -> - {Readers0, []} - end, + #{CTag := #stream{credit = Credit0, + competing = false} = Str0} -> + Str1 = Str0#stream{credit = Credit0 + Credit}, + {Str, Msgs0} = stream_entries(Leader, Str1), + {Readers0#{CTag => Str}, Msgs0}; + #{CTag := #stream{credit = Credit0, + competing = true} = Str0} -> + Str1 = Str0#stream{credit = Credit0 + Credit}, + {Str, Msgs0} = stream_entries_by_chunks(CTag, Leader, Str1), + {Readers0#{CTag => Str}, Msgs0}; + _ -> + {Readers0, []} + end, {Readers, Actions} = case Drain of true -> @@ -286,7 +319,7 @@ deliver(QSs, #delivery{confirm = Confirm} = Delivery) -> end, {[], []}, QSs). deliver(_Confirm, #delivery{message = Msg, msg_seq_no = MsgId}, - #stream_client{name = Name, + #stream_client{reference = Name, leader = LeaderPid, writer_id = WriterId, next_seq = Seq, @@ -312,7 +345,7 @@ deliver(_Confirm, #delivery{message = Msg, msg_seq_no = MsgId}, slow = Slow}. -spec dequeue(_, _, _, client()) -> no_return(). -dequeue(_, _, _, #stream_client{name = Name}) -> +dequeue(_, _, _, #stream_client{reference = Name}) -> {protocol_error, not_implemented, "basic.get not supported by stream queues ~s", [rabbit_misc:rs(Name)]}. @@ -320,7 +353,7 @@ handle_event({osiris_written, From, _WriterId, Corrs}, State = #stream_client{correlation = Correlation0, soft_limit = SftLmt, slow = Slow0, - name = Name}) -> + reference = Name}) -> MsgIds = lists:sort(maps:fold( fun (_Seq, {I, _M}, Acc) -> [I | Acc] @@ -337,13 +370,12 @@ handle_event({osiris_written, From, _WriterId, Corrs}, slow = Slow}, [{settled, From, MsgIds}]}; handle_event({osiris_offset, _From, _Offs}, State = #stream_client{leader = Leader, - readers = Readers0, - name = Name}) -> + readers = Readers0}) -> %% offset isn't actually needed as we use the atomic to read the %% current committed {Readers, TagMsgs} = maps:fold( fun (Tag, Str0, {Acc, TM}) -> - {Str, Msgs} = stream_entries(Name, Leader, Str0), + {Str, Msgs} = stream_entries(Leader, Str0), %% HACK for now, better to just return but %% tricky with acks credits %% that also evaluate the stream @@ -355,7 +387,21 @@ handle_event({osiris_offset, _From, _Offs}, || {Tag, _LeaderPid, OffsetMsg} <- TagMsgs], {ok, State#stream_client{readers = Readers}, Deliveries}; handle_event({stream_leader_change, Pid}, State) -> - {ok, update_leader_pid(Pid, State), []}. + {ok, update_leader_pid(Pid, State), []}; +handle_event({osiris_chunk, _From, Tag, [FirstChunk | _] = Chunks}, + State = #stream_client{leader = Leader, + readers = Readers0}) -> + #stream{pending_chunks = Chunks0, + listening_offset = NextOffset} = Str0 = maps:get(Tag, Readers0), + Str1 = case FirstChunk < NextOffset of + true -> reopen_log(Str0); + false -> Str0 + end, + {Str, Msgs} = stream_entries_by_chunks(Tag, Leader, + Str1#stream{pending_chunks = Chunks0 ++ Chunks}), + Readers = Readers0#{Tag => Str}, + Deliver = [{deliver, Tag, true, Msgs}], + {ok, State#stream_client{readers = Readers}, Deliver}. is_recoverable(Q) -> Node = node(), @@ -370,20 +416,47 @@ recover(_VHost, Queues) -> {[Q | R0], F0} end, {[], []}, Queues). +handle_pending(_Name, [], Str) -> + Str; +handle_pending(Name, [MsgId | MsgIds], #stream{pending_ack = PendingAck0, + pending_ack_by_chunk = PendingAckByChunk0} = Str) -> + {ChunkId, PendingAck} = maps:take(MsgId, PendingAck0), + case maps:get(ChunkId, PendingAckByChunk0) of + #{msgs := [MsgId], + tag := Tag} -> + {ok, CRC} = osiris_server_sup:get_crc(Name), + gen_server:cast(CRC, {ack, {self(), Tag}, ChunkId}), + PendingAckByChunk = maps:remove(ChunkId, PendingAckByChunk0), + handle_pending(Name, MsgIds, Str#stream{pending_ack = PendingAck, + pending_ack_by_chunk = PendingAckByChunk}); + #{msgs := MsgIdsByChunk0} = ByChunk -> + MsgIdsByChunk = lists:delete(MsgId, MsgIdsByChunk0), + Str#stream{pending_ack = PendingAck, + pending_ack_by_chunk = maps:put(ChunkId, ByChunk#{msgs => MsgIdsByChunk}, + PendingAckByChunk0)} + end. + settle(complete, CTag, MsgIds, #stream_client{readers = Readers0, name = Name, leader = Leader} = State) -> Credit = length(MsgIds), {Readers, Msgs} = case Readers0 of - #{CTag := #stream{credit = Credit0} = Str0} -> + #{CTag := #stream{credit = Credit0, + competing = true} = Str0} -> + Str1 = handle_pending(Name, MsgIds, + Str0#stream{credit = Credit0 + Credit}), + {Str, Msgs0} = stream_entries_by_chunks(CTag, Leader, Str1), + {Readers0#{CTag => Str}, Msgs0}; + #{CTag := #stream{credit = Credit0, + competing = false} = Str0} -> Str1 = Str0#stream{credit = Credit0 + Credit}, - {Str, Msgs0} = stream_entries(Name, Leader, Str1), + {Str, Msgs0} = stream_entries(Leader, Str1), {Readers0#{CTag => Str}, Msgs0}; _ -> {Readers0, []} end, {State#stream_client{readers = Readers}, [{deliver, CTag, true, Msgs}]}; -settle(_, _, _, #stream_client{name = Name}) -> +settle(_, _, _, #stream_client{reference = Name}) -> {protocol_error, not_implemented, "basic.nack and basic.reject not supported by stream queues ~s", [rabbit_misc:rs(Name)]}. @@ -467,10 +540,12 @@ init(Q) when ?is_amqqueue(Q) -> Prefix = erlang:pid_to_list(self()) ++ "_", WriterId = rabbit_guid:binary(rabbit_guid:gen(), Prefix), {ok, SoftLimit} = application:get_env(rabbit, stream_messages_soft_limit), - #stream_client{name = amqqueue:get_name(Q), + Name = maps:get(name, amqqueue:get_type_state(Q)), + #stream_client{reference = amqqueue:get_name(Q), leader = Leader, writer_id = WriterId, - soft_limit = SoftLimit}. + soft_limit = SoftLimit, + name = Name}. close(#stream_client{readers = Readers}) -> _ = maps:map(fun (_, #stream{log = Log}) -> @@ -660,10 +735,10 @@ check_queue_exists_in_local_node(Q) -> maybe_send_reply(_ChPid, undefined) -> ok; maybe_send_reply(ChPid, Msg) -> ok = rabbit_channel:send_command(ChPid, Msg). -stream_entries(Name, Id, Str) -> - stream_entries(Name, Id, Str, []). +stream_entries(Id, Str) -> + stream_entries(Id, Str, []). -stream_entries(Name, LeaderPid, +stream_entries(LeaderPid, #stream{name = QName, credit = Credit, start_offset = StartOffs, @@ -686,7 +761,7 @@ stream_entries(Name, LeaderPid, Msg0 = binary_to_msg(QName, B), Msg = rabbit_basic:add_header(<<"x-stream-offset">>, long, O, Msg0), - {Name, LeaderPid, O, false, Msg} + {QName, LeaderPid, O, false, Msg} end || {O, B} <- Records, O >= StartOffs], @@ -701,12 +776,65 @@ stream_entries(Name, LeaderPid, false -> %% if there are fewer Msgs than Entries0 it means there were non-events %% in the log and we should recurse and try again - stream_entries(Name, LeaderPid, Str, MsgIn ++ Msgs) + stream_entries(LeaderPid, Str, MsgIn ++ Msgs) end end; -stream_entries(_Name, _Id, Str, Msgs) -> +stream_entries(_Id, Str, Msgs) -> {Str, Msgs}. +stream_entries_by_chunks(Tag, LeaderPid, Str) -> + stream_entries_by_chunks(Tag, LeaderPid, Str, []). + +stream_entries_by_chunks(_Tag, _LeaderPid, #stream{pending_chunks = []} = Str, MsgIn) -> + {Str, MsgIn}; +stream_entries_by_chunks(Tag, LeaderPid, + #stream{name = QName, + credit = Credit, + log = Seg0, + pending_ack = PendingAck0, + pending_ack_by_chunk = PendingAckByChunk0, + pending_chunks = [ChunkId | Chunks]} = Str0, MsgIn) + when Credit > 0 -> + case osiris_log:read_chunk_parsed(Seg0) of + {end_of_stream, Seg} -> + %% TODO where is my chunk? it might be older if other competing consumer just died + NextOffset = osiris_log:next_offset(Seg), + {Str0#stream{log = Seg, + listening_offset = NextOffset}, MsgIn}; + {[{Off, _} | _] = Records, Seg} when Off == ChunkId -> + Msgs = [begin + Msg0 = binary_to_msg(QName, B), + Msg = rabbit_basic:add_header(<<"x-stream-offset">>, + long, O, Msg0), + {QName, LeaderPid, O, false, Msg} + end || {O, B} <- Records], + {MsgIds, _} = lists:unzip(Records), + NumMsgs = length(Msgs), + NextOffset = osiris_log:next_offset(Seg), + PendingAck = maps:merge(PendingAck0, + maps:from_list([{MsgId, ChunkId} || MsgId <- MsgIds])), + Str = Str0#stream{credit = Credit - NumMsgs, + log = Seg, + pending_chunks = Chunks, + pending_ack_by_chunk = PendingAckByChunk0#{ChunkId => + #{msgs => MsgIds, + tag => Tag}}, + pending_ack = PendingAck, + listening_offset = NextOffset}, + case Str#stream.credit < 1 of + true -> + %% we are done here + {Str, MsgIn ++ Msgs}; + false -> + stream_entries_by_chunks(Tag, LeaderPid, Str, MsgIn ++ Msgs) + end; + {_, Seg} -> + %% Not yet the expected chunk, try again + stream_entries_by_chunks(Tag, LeaderPid, Str0#stream{log = Seg}, MsgIn) + end; +stream_entries_by_chunks(_Tag, _Id, Str, MsgIn) -> + {Str, MsgIn}. + binary_to_msg(#resource{virtual_host = VHost, kind = queue, name = QName}, Data) -> @@ -762,3 +890,13 @@ resend_all(#stream_client{leader = LeaderPid, ok = osiris:write(LeaderPid, WriterId, Seq, msg_to_iodata(Msg)) end || {Seq, Msg} <- Msgs], State. + +reopen_log(#stream_client{name = QName} = Str0) -> + Q = rabbit_amqqueue:lookup(QName), + Conf = amqqueue:get_type_state(Q), + LocalPid = get_local_pid(Conf), + {ok, Seg0} = osiris:init_reader(LocalPid, 'first'), + NextOffset = osiris_log:next_offset(Seg0) - 1, + Str0#stream{start_offset = NextOffset, + listening_offset = NextOffset, + log = Seg0}. diff --git a/deps/rabbit/test/rabbit_stream_queue_SUITE.erl b/deps/rabbit/test/rabbit_stream_queue_SUITE.erl index 47c5432cbc..02accebc19 100644 --- a/deps/rabbit/test/rabbit_stream_queue_SUITE.erl +++ b/deps/rabbit/test/rabbit_stream_queue_SUITE.erl @@ -95,7 +95,9 @@ all_tests() -> max_age_policy, max_segment_size_policy, purge, - update_retention_policy + update_retention_policy, + competing_consumers, + competing_consumers_resend ]. %% ------------------------------------------------------------------- @@ -255,8 +257,8 @@ declare_queue(Config) -> ?assertEqual({'queue.declare_ok', Q, 0, 0}, declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])), - ?assertMatch([_], rpc:call(Server, supervisor, which_children, - [osiris_server_sup])), + [{_, Sup, _, _}] = rpc:call(Server, supervisor, which_children, [osiris_server_sup_sup]), + ?assertMatch([_, _], rpc:call(Server, supervisor, which_children, [Sup])), %% Test declare an existing queue with different arguments ?assertExit(_, declare(Ch, Q, [])). @@ -1143,7 +1145,8 @@ max_age(Config) -> ?assertEqual({'queue.declare_ok', Q, 0, 0}, declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}, {<<"x-max-age">>, longstr, <<"10s">>}, - {<<"x-max-segment-size">>, long, 250}])), + {<<"x-max-segment-size">>, long, 250} + ])), Payload = << <<"1">> || _ <- lists:seq(1, 500) >>, @@ -1158,8 +1161,6 @@ max_age(Config) -> [publish(Ch, Q, Payload) || _ <- lists:seq(1, 100)], amqp_channel:wait_for_confirms(Ch, 5), - timer:sleep(5000), - Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server), qos(Ch1, 200, false), subscribe(Ch1, Q, false, 0), @@ -1452,7 +1453,7 @@ leader_locator_policy(Config) -> Ch = rabbit_ct_client_helpers:open_channel(Config, Server), ok = rabbit_ct_broker_helpers:set_policy( - Config, 0, <<"leader-locator">>, <<"leader_locator_.*">>, <<"queues">>, + Config, 0, <<"leader-locator">>, <<"leader_locator_policy.*">>, <<"queues">>, [{<<"queue-leader-locator">>, <<"random">>}]), Q = ?config(queue_name, Config), @@ -1601,7 +1602,7 @@ max_segment_size_policy(Config) -> purge(Config) -> Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename), - Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), Q = ?config(queue_name, Config), ?assertEqual({'queue.declare_ok', Q, 0, 0}, declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])), @@ -1609,6 +1610,96 @@ purge(Config) -> ?assertExit({{shutdown, {connection_closing, {server_initiated_close, 540, _}}}, _}, amqp_channel:call(Ch, #'queue.purge'{queue = Q})). +competing_consumers(Config) -> + [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + Q = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', Q, 0, 0}, + declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}, + {<<"x-max-segment-size">>, long, 10000} + ])), + + #'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}), + amqp_channel:register_confirm_handler(Ch, self()), + + Payload = << <<"1">> || _ <- lists:seq(1, 500) >>, + [publish(Ch, Q, Payload) || _ <- lists:seq(1, 10000)], + + amqp_channel:wait_for_confirms(Ch, 5), + + Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server), + qos(Ch1, 10, false), + + subscribe(Ch1, Q, false, 0, <<"ctag1">>, [{<<"x-competing-consumers">>, bool, true}]), + subscribe(Ch1, Q, false, 0, <<"ctag2">>, [{<<"x-competing-consumers">>, bool, true}]), + + receive + {#'basic.deliver'{consumer_tag = <<"ctag1">>}, + #amqp_msg{props = #'P_basic'{headers = [{<<"x-stream-offset">>, long, S1_1}]}}} -> + receive + %% This is the first message, and we should receive two chunks. We can thus + %% guarantee we get the next message on the stream in order, as nothing should + %% have crashed! + {#'basic.deliver'{consumer_tag = <<"ctag1">>}, + #amqp_msg{props = #'P_basic'{headers = [{<<"x-stream-offset">>, long, S1_2}]}}} -> + ?assert((S1_1 + 1) == S1_2), + receive + {#'basic.deliver'{consumer_tag = <<"ctag2">>}, + #amqp_msg{props = #'P_basic'{headers = [{<<"x-stream-offset">>, long, S2_3}]}}} -> + %% Probably chunks have more than 1 message, but at least we can assert this + ?assert(S1_2 < S2_3) + after 5000 -> + exit(timeout2_1) + end + after 5000 -> + exit(timeout1_2) + end + after 5000 -> + exit(timeout1_1) + end. + +competing_consumers_resend(Config) -> + [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + Q = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', Q, 0, 0}, + declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}, + {<<"x-max-segment-size">>, long, 10000} + ])), + + #'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}), + amqp_channel:register_confirm_handler(Ch, self()), + + Payload = << <<"1">> || _ <- lists:seq(1, 500) >>, + [publish(Ch, Q, Payload) || _ <- lists:seq(1, 10000)], + + amqp_channel:wait_for_confirms(Ch, 5), + + Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server), + qos(Ch1, 10, false), + + subscribe(Ch1, Q, false, 0, <<"ctag1">>, [{<<"x-competing-consumers">>, bool, true}]), + + receive + {#'basic.deliver'{consumer_tag = <<"ctag1">>}, + #amqp_msg{props = #'P_basic'{headers = [{<<"x-stream-offset">>, long, S1}]}}} -> + rabbit_ct_client_helpers:close_channel(Ch1), + Ch2 = rabbit_ct_client_helpers:open_channel(Config, Server), + qos(Ch2, 10, false), + subscribe(Ch2, Q, false, 0, <<"ctag2">>, [{<<"x-competing-consumers">>, bool, true}]), + receive + {#'basic.deliver'{consumer_tag = <<"ctag2">>}, + #amqp_msg{props = #'P_basic'{headers = [{<<"x-stream-offset">>, long, S1}]}}} -> + ok + after 5000 -> + exit(timeout2_1) + end + after 5000 -> + exit(timeout1_1) + end. + %%---------------------------------------------------------------------------- delete_queues() -> @@ -1654,13 +1745,16 @@ publish(Ch, Queue, Msg) -> payload = Msg}). subscribe(Ch, Queue, NoAck, Offset) -> + subscribe(Ch, Queue, NoAck, Offset, <<"ctag">>, []). + +subscribe(Ch, Queue, NoAck, Offset, CTag, ExtraArgs) -> amqp_channel:subscribe(Ch, #'basic.consume'{queue = Queue, no_ack = NoAck, - consumer_tag = <<"ctag">>, - arguments = [{<<"x-stream-offset">>, long, Offset}]}, + consumer_tag = CTag, + arguments = [{<<"x-stream-offset">>, long, Offset}] ++ ExtraArgs}, self()), receive - #'basic.consume_ok'{consumer_tag = <<"ctag">>} -> + #'basic.consume_ok'{consumer_tag = CTag} -> ok end. |