summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/tests/interlink_tests.py
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/tests/interlink_tests.py')
-rwxr-xr-xqpid/cpp/src/tests/interlink_tests.py331
1 files changed, 0 insertions, 331 deletions
diff --git a/qpid/cpp/src/tests/interlink_tests.py b/qpid/cpp/src/tests/interlink_tests.py
deleted file mode 100755
index 8833f06af5..0000000000
--- a/qpid/cpp/src/tests/interlink_tests.py
+++ /dev/null
@@ -1,331 +0,0 @@
-#!/usr/bin/env python
-
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-import os, signal, sys, time, imp, re, subprocess, glob, random, logging, shutil, math, unittest, random
-import traceback
-from qpid.messaging import Message, SessionError, NotFound, ConnectionError, ReceiverError, Connection, Timeout, Disposition, REJECTED, Empty
-from brokertest import *
-from ha_test import HaPort
-from threading import Thread, Lock, Condition
-from logging import getLogger, WARN, ERROR, DEBUG, INFO
-from qpidtoollibs import BrokerObject
-
-class Domain(BrokerObject):
- def __init__(self, broker, values):
- BrokerObject.__init__(self, broker, values)
-
-class Config:
- def __init__(self, broker, address="q;{create:always}", version="amqp1.0"):
- self.url = broker.host_port()
- self.address = address
- self.version = version
-
- def __str__(self):
- return "url: %s, address: %s, version: %s" % (self.url, self.address, self.version)
-
-class AmqpBrokerTest(BrokerTest):
- """
- Tests using AMQP 1.0 support
- """
- def setUp(self):
- BrokerTest.setUp(self)
- self.port_holder = HaPort(self)
- self.broker = self.amqp_broker(port_holder=self.port_holder)
- self.default_config = Config(self.broker)
- self.agent = self.broker.agent
-
- def sender(self, config, reply_to=None):
- cmd = ["qpid-send",
- "--broker", config.url,
- "--address", config.address,
- "--connection-options", "{protocol:%s}" % config.version,
- "--content-stdin", "--send-eos=1"
- ]
- if reply_to:
- cmd.append( "--reply-to=%s" % reply_to)
- return self.popen(cmd, stdin=PIPE)
-
- def receiver(self, config):
- cmd = ["qpid-receive",
- "--broker", config.url,
- "--address", config.address,
- "--connection-options", "{protocol:%r}" % config.version,
- "--timeout=10"
- ]
- return self.popen(cmd, stdout=PIPE)
-
- def ready_receiver(self, config):
- s = self.broker.connect().session()
- r = s.receiver("readyq; {create:always}")
- cmd = ["qpid-receive",
- "--broker", config.url,
- "--address", config.address,
- "--connection-options", "{protocol:%r}" % config.version,
- "--timeout=10", "--ready-address=readyq;{create:always}"
- ]
- result = self.popen(cmd, stdout=PIPE)
- r.fetch(timeout=1) # wait until receiver is actually ready
- s.acknowledge()
- r.close()
- s.close()
- return result
-
- def send_and_receive(self, send_config=None, recv_config=None, count=1000, reply_to=None, wait_for_receiver=False, debug=False):
- if debug:
- print "sender config is %s" % (send_config or self.default_config)
- print "receiver config is %s" % (recv_config or self.default_config)
- sender = self.sender(send_config or self.default_config, reply_to)
- sender._set_cloexec_flag(sender.stdin) #required for older python, see http://bugs.python.org/issue4112
- if wait_for_receiver:
- receiver = self.ready_receiver(recv_config or self.default_config)
- else:
- receiver = self.receiver(recv_config or self.default_config)
-
- messages = ["message-%s" % (i+1) for i in range(count)]
- for m in messages:
- sender.stdin.write(m + "\n")
- sender.stdin.flush()
- sender.stdin.close()
- if debug:
- c = send_config or self.default_config
- print "sent %s messages to %s sn %s" % (len(messages), c.address, c.url)
-
- if debug:
- c = recv_config or self.default_config
- print "reading messages from %s sn %s" % (c.address, c.url)
- for m in messages:
- l = receiver.stdout.readline().rstrip()
- if debug:
- print l
- assert m == l, (m, l)
-
- sender.wait()
- receiver.wait()
-
- def test_simple(self):
- self.send_and_receive()
-
- def test_translate1(self):
- self.send_and_receive(recv_config=Config(self.broker, version="amqp0-10"))
-
- def test_translate2(self):
- self.send_and_receive(send_config=Config(self.broker, version="amqp0-10"))
-
- def test_translate_with_large_routingkey(self):
- self.send_and_receive(send_config=Config(self.broker, address="amq.topic/a.%s" % ("x" * 256), version="amqp1.0"), recv_config=Config(self.broker, address="amq.topic/a.*", version="amqp0-10"), wait_for_receiver=True)
-
- def send_and_receive_empty(self, send_config=None, recv_config=None):
- sconfig = send_config or self.default_config
- rconfig = recv_config or self.default_config
- send_cmd = ["qpid-send",
- "--broker", sconfig.url,
- "--address=%s" % sconfig.address,
- "--connection-options={protocol:%s}" % sconfig.version,
- "--content-size=0",
- "--messages=1",
- "-P", "my-header=abc"
- ]
- sender = self.popen(send_cmd)
- sender.wait()
- receive_cmd = ["qpid-receive",
- "--broker", rconfig.url,
- "--address=%s" % rconfig.address,
- "--connection-options={protocol:%s}" % rconfig.version,
- "--messages=1",
- "--print-content=false", "--print-headers=true"
- ]
- receiver = self.popen(receive_cmd, stdout=PIPE)
- l = receiver.stdout.read()
- assert "my-header:abc" in l
- receiver.wait()
-
- def test_translate_empty_1(self):
- self.send_and_receive_empty(recv_config=Config(self.broker, version="amqp0-10"))
-
- def test_translate_empty_2(self):
- self.send_and_receive_empty(send_config=Config(self.broker, version="amqp0-10"))
-
- def request_response(self, reply_to, send_config=None, request_config=None, response_config=None, count=1000, wait_for_receiver=False):
- rconfig = request_config or self.default_config
- echo_cmd = ["qpid-receive",
- "--broker", rconfig.url,
- "--address=%s" % rconfig.address,
- "--connection-options={protocol:%s}" % rconfig.version,
- "--timeout=10", "--print-content=false", "--print-headers=false"
- ]
- requests = self.popen(echo_cmd)
- self.send_and_receive(send_config, response_config, count, reply_to=reply_to, wait_for_receiver=wait_for_receiver)
- requests.wait()
-
- def request_response_local(self, request_address, response_address, wait_for_receiver=False, request_version="amqp1.0", echo_version="amqp1.0"):
- self.request_response(response_address, send_config=Config(self.broker, address=request_address, version=request_version), request_config=Config(self.broker, address=request_address, version=echo_version), response_config=Config(self.broker, address=response_address, version=request_version), wait_for_receiver=wait_for_receiver)
-
- def test_request_reponse_queue(self):
- self.agent.create("queue", "q1")
- self.agent.create("queue", "q2")
- self.request_response_local("q1", "q2")
-
- def test_request_reponse_queue_translated1(self):
- self.agent.create("queue", "q1")
- self.agent.create("queue", "q2")
- self.request_response_local("q1", "q2", request_version="amqp0-10", echo_version="amqp1.0")
-
- def test_request_reponse_queue_translated2(self):
- self.agent.create("queue", "q1")
- self.agent.create("queue", "q2")
- self.request_response_local("q1", "q2", request_version="amqp1.0", echo_version="amqp0-10")
-
- def test_request_reponse_exchange(self):
- self.agent.create("queue", "q1")
- self.request_response_local("q1", "amq.fanout", wait_for_receiver=True)
-
- def test_request_reponse_exchange_translated1(self):
- self.agent.create("queue", "q1")
- self.request_response_local("q1", "amq.fanout", wait_for_receiver=True, request_version="amqp0-10", echo_version="amqp1.0")
-
- def test_request_reponse_exchange_translated2(self):
- self.agent.create("queue", "q1")
- self.request_response_local("q1", "amq.fanout", wait_for_receiver=True, request_version="amqp1.0", echo_version="amqp0-10")
-
- def test_request_reponse_exchange_with_subject(self):
- self.agent.create("queue", "q1")
- self.request_response_local("q1", "amq.topic/abc; {node:{type:topic}}", wait_for_receiver=True)
-
- def test_request_reponse_exchange_with_subject_translated1(self):
- self.agent.create("queue", "q1")
- self.request_response_local("q1", "amq.topic/abc; {node:{type:topic}}", wait_for_receiver=True, request_version="amqp0-10", echo_version="amqp1.0")
-
- def test_request_reponse_exchange_with_subject_translated2(self):
- self.agent.create("queue", "q1")
- self.request_response_local("q1", "amq.topic/abc; {node:{type:topic}}", wait_for_receiver=True, request_version="amqp1.0", echo_version="amqp0-10")
-
- def test_domain(self):
- brokerB = self.amqp_broker()
- self.agent.create("domain", "BrokerB", {"url":brokerB.host_port()})
- domains = self.agent._getAllBrokerObjects(Domain)
- assert len(domains) == 1
- assert domains[0].name == "BrokerB"
-
- def incoming_link(self, mechanism):
- brokerB = self.amqp_broker()
- agentB = brokerB.agent
- self.agent.create("queue", "q")
- agentB.create("queue", "q")
- self.agent.create("domain", "BrokerB", {"url":brokerB.host_port(), "sasl_mechanisms":mechanism})
- self.agent.create("incoming", "Link1", {"domain":"BrokerB","source":"q","target":"q"})
- #send to brokerB, receive from brokerA
- self.send_and_receive(send_config=Config(brokerB))
-
- def test_incoming_link_anonymous(self):
- self.incoming_link("ANONYMOUS")
-
- def test_incoming_link_nosasl(self):
- self.incoming_link("NONE")
-
- def test_outgoing_link(self):
- brokerB = self.amqp_broker()
- agentB = brokerB.agent
- self.agent.create("queue", "q")
- agentB.create("queue", "q")
- self.agent.create("domain", "BrokerB", {"url":brokerB.host_port(), "sasl_mechanisms":"NONE"})
- self.agent.create("outgoing", "Link1", {"domain":"BrokerB","source":"q","target":"q"})
- #send to brokerA, receive from brokerB
- self.send_and_receive(recv_config=Config(brokerB))
-
- def test_relay(self):
- brokerB = self.amqp_broker()
- agentB = brokerB.agent
- agentB.create("queue", "q")
- self.agent.create("domain", "BrokerB", {"url":brokerB.host_port(), "sasl_mechanisms":"NONE"})
- #send to q on broker B through brokerA
- self.send_and_receive(send_config=Config(self.broker, address="q@BrokerB"), recv_config=Config(brokerB))
-
- def test_reconnect(self):
- receiver_cmd = ["qpid-receive",
- "--broker", self.broker.host_port(),
- "--address=amq.fanout",
- "--connection-options={protocol:amqp1.0, reconnect:True,container_id:receiver}",
- "--timeout=10", "--print-content=true", "--print-headers=false"
- ]
- receiver = self.popen(receiver_cmd, stdout=PIPE)
-
- sender_cmd = ["qpid-send",
- "--broker", self.broker.host_port(),
- "--address=amq.fanout",
- "--connection-options={protocol:amqp1.0,reconnect:True,container_id:sender}",
- "--content-stdin", "--send-eos=1"
- ]
- sender = self.popen(sender_cmd, stdin=PIPE)
- sender._set_cloexec_flag(sender.stdin) #required for older python, see http://bugs.python.org/issue4112
-
-
- batch1 = ["message-%s" % (i+1) for i in range(10000)]
- for m in batch1:
- sender.stdin.write(m + "\n")
- sender.stdin.flush()
-
- self.broker.kill()
- self.broker = self.amqp_broker(port_holder=self.port_holder)
-
- batch2 = ["message-%s" % (i+1) for i in range(10000, 20000)]
- for m in batch2:
- sender.stdin.write(m + "\n")
- sender.stdin.flush()
-
- sender.stdin.close()
-
- last = None
- m = receiver.stdout.readline().rstrip()
- while len(m):
- last = m
- m = receiver.stdout.readline().rstrip()
- assert last == "message-20000", (last)
-
- """ Create and return a broker with AMQP 1.0 support """
- def amqp_broker(self):
- assert BrokerTest.amqp_lib, "Cannot locate AMQP 1.0 plug-in"
- self.port_holder = HaPort(self) #reserve port
- args = ["--load-module", BrokerTest.amqp_lib,
- "--socket-fd=%s" % self.port_holder.fileno,
- "--listen-disable=tcp",
- "--log-enable=trace+:Protocol",
- "--log-enable=info+"]
- return BrokerTest.broker(self, args, port=self.port_holder.port)
-
- def amqp_broker(self, port_holder=None):
- assert BrokerTest.amqp_lib, "Cannot locate AMQP 1.0 plug-in"
- if port_holder:
- args = ["--load-module", BrokerTest.amqp_lib,
- "--socket-fd=%s" % port_holder.fileno,
- "--listen-disable=tcp",
- "--log-enable=trace+:Protocol",
- "--log-enable=info+"]
- return BrokerTest.broker(self, args, port=port_holder.port)
- else:
- args = ["--load-module", BrokerTest.amqp_lib,
- "--log-enable=trace+:Protocol",
- "--log-enable=info+"]
- return BrokerTest.broker(self, args)
-
-
-if __name__ == "__main__":
- shutil.rmtree("brokertest.tmp", True)
- os.execvp("qpid-python-test",
- ["qpid-python-test", "-m", "interlink_tests"] + sys.argv[1:])