diff options
author | Arnaud Cogoluègnes <acogoluegnes@gmail.com> | 2021-09-27 09:52:48 +0200 |
---|---|---|
committer | Arnaud Cogoluègnes <acogoluegnes@gmail.com> | 2021-09-27 09:52:48 +0200 |
commit | b758edc64247ef8b33bf8c897b37013d9eef99ec (patch) | |
tree | 7363f632f390b14377e756abff52ff0acfdf252c | |
parent | c78b04307bf87eb604ebe6b19abefaa9d22984dc (diff) | |
download | rabbitmq-server-git-rabbitmq-server-3508-web-stomp-stream-consuming.tar.gz |
Handle no-context delivery in web stomprabbitmq-server-3508-web-stomp-stream-consuming
To support messages from streams, which do not have a
context (for credit flow).
References rabbitmq/rabbitmq-stomp#138
Fixes #3508
-rw-r--r-- | deps/rabbitmq_stomp/src/rabbit_stomp_processor.erl | 2 | ||||
-rw-r--r-- | deps/rabbitmq_web_stomp/src/rabbit_web_stomp_handler.erl | 9 |
2 files changed, 10 insertions, 1 deletions
diff --git a/deps/rabbitmq_stomp/src/rabbit_stomp_processor.erl b/deps/rabbitmq_stomp/src/rabbit_stomp_processor.erl index 30a0b94b35..4cbdaf10ba 100644 --- a/deps/rabbitmq_stomp/src/rabbit_stomp_processor.erl +++ b/deps/rabbitmq_stomp/src/rabbit_stomp_processor.erl @@ -827,7 +827,7 @@ send_delivery(Delivery = #'basic.deliver'{consumer_tag = ConsumerTag}, NewState. notify_received(undefined) -> - %% no notification for quorum queues + %% no notification for quorum queues and streams ok; notify_received(DeliveryCtx) -> %% notification for flow control diff --git a/deps/rabbitmq_web_stomp/src/rabbit_web_stomp_handler.erl b/deps/rabbitmq_web_stomp/src/rabbit_web_stomp_handler.erl index 3a1bcd42cb..ebec3da671 100644 --- a/deps/rabbitmq_web_stomp/src/rabbit_web_stomp_handler.erl +++ b/deps/rabbitmq_web_stomp/src/rabbit_web_stomp_handler.erl @@ -183,6 +183,15 @@ websocket_info({Delivery = #'basic.deliver'{}, DeliveryCtx, ProcState0), {ok, State#state{ proc_state = ProcState }}; +websocket_info({Delivery = #'basic.deliver'{}, + #amqp_msg{props = Props, payload = Payload}}, + State=#state{ proc_state = ProcState0 }) -> + ProcState = rabbit_stomp_processor:send_delivery(Delivery, + Props, + Payload, + undefined, + ProcState0), + {ok, State#state{ proc_state = ProcState }}; websocket_info(#'basic.cancel'{consumer_tag = Ctag}, State=#state{ proc_state = ProcState0 }) -> case rabbit_stomp_processor:cancel_consumer(Ctag, ProcState0) of |