#!/usr/bin/env python # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. # """ A set of tests that can be run against a foreign AMQP 1.0 broker. RUNNING WITH A FOREIGN BROKER: 1. Start the broker 2. Create persistent queues named: interop-a interop-b interop-q tx-1 tx-2 3. Export the environment variable QPID_INTEROP_URL with the URL to connect to your broker in the form [user[:password]@]host[:port] 4. From the build directory run this test: ctest -VV -R interop_tests If QPID_INTEROP_URL is not set, a qpidd broker will be started for the test. """ import os, sys, shutil, subprocess import qpid_messaging as qm from brokertest import * URL='QPID_INTEROP_URL' class InteropTest(BrokerTest): def setUp(self): super(InteropTest, self).setUp() self.url = os.environ[URL] self.connect_opts = ['--broker', self.url, '--connection-options', '{protocol:amqp1.0}'] def connect(self, **kwargs): """Python connection to interop URL""" c = qm.Connection.establish(self.url, protocol='amqp1.0', **kwargs) self.teardown_add(c) return c def drain(self, queue, connection=None): """ Drain a queue to make sure it is empty. Throw away the messages. """ c = connection or self.connect() r = c.session().receiver(queue) try: while True: r.fetch(timeout=0) r.session.acknowledge() except qm.Empty: pass r.close() def clear_queue(self, queue, connection=None, properties=None, durable=False): """ Make empty queue, prefix with self.id(). Create if needed, drain if needed @return queue name. """ queue = "interop-%s" % queue c = connection or self.connect() props = {'create':'always'} if durable: props['node'] = {'durable':True} if properties: props.update(properties) self.drain("%s;%s" % (queue, props), c) return queue class SimpleTest(InteropTest): """Simple test to check the broker is responding.""" def test_send_receive_python(self): c = self.connect() q = self.clear_queue('q', c) s = c.session() s.sender(q).send('foo') self.assertEqual('foo', s.receiver(q).fetch().content) def test_send_receive_cpp(self): q = self.clear_queue('q') args = ['-b', self.url, '-a', q] self.check_output(['qpid-send', '--content-string=cpp_foo'] + args) self.assertEqual('cpp_foo', self.check_output(['qpid-receive'] + args).strip()) class PythonTxTest(InteropTest): def tx_simple_setup(self): """Start a transaction, remove messages from queue a, add messages to queue b""" c = self.connect() qa, qb = self.clear_queue('a', c, durable=True), self.clear_queue('b', c, durable=True) # Send messages to a, no transaction. sa = c.session().sender(qa+";{create:always,node:{durable:true}}") tx_msgs = ['x', 'y', 'z'] for m in tx_msgs: sa.send(qm.Message(content=m, durable=True)) # Receive messages from a, in transaction. tx = c.session(transactional=True) txr = tx.receiver(qa) self.assertEqual(tx_msgs, [txr.fetch(1).content for i in xrange(3)]) tx.acknowledge() # Send messages to b, transactional, mixed with non-transactional. sb = c.session().sender(qb+";{create:always,node:{durable:true}}") txs = tx.sender(qb) msgs = [str(i) for i in xrange(3)] for tx_m, m in zip(tx_msgs, msgs): txs.send(tx_m); sb.send(m) tx.sync() return tx, qa, qb def test_tx_simple_commit(self): tx, qa, qb = self.tx_simple_setup() s = self.connect().session() assert_browse(s, qa, []) assert_browse(s, qb, ['0', '1', '2']) tx.commit() assert_browse(s, qa, []) assert_browse(s, qb, ['0', '1', '2', 'x', 'y', 'z']) def test_tx_simple_rollback(self): tx, qa, qb = self.tx_simple_setup() s = self.connect().session() assert_browse(s, qa, []) assert_browse(s, qb, ['0', '1', '2']) tx.rollback() assert_browse(s, qa, ['x', 'y', 'z']) assert_browse(s, qb, ['0', '1', '2']) def test_tx_sequence(self): tx = self.connect().session(transactional=True) notx = self.connect().session() q = self.clear_queue('q', tx.connection, durable=True) s = tx.sender(q) r = tx.receiver(q) s.send('a') tx.commit() assert_browse(notx, q, ['a']) s.send('b') tx.commit() assert_browse(notx, q, ['a', 'b']) self.assertEqual('a', r.fetch().content) tx.acknowledge(); tx.commit() assert_browse(notx, q, ['b']) s.send('z') tx.rollback() assert_browse(notx, q, ['b']) self.assertEqual('b', r.fetch().content) tx.acknowledge(); tx.rollback() assert_browse(notx, q, ['b']) class CppTxTest(InteropTest): def test_txtest2(self): self.popen(["qpid-txtest2"] + self.connect_opts).assert_exit_ok() def test_send_receive(self): q = self.clear_queue('q', durable=True) sender = self.popen(["qpid-send", "--address", q, "--messages=100", "--tx=10", "--durable=yes"] + self.connect_opts) receiver = self.popen(["qpid-receive", "--address", q, "--messages=90", "--timeout=10", "--tx=10"] + self.connect_opts) sender.assert_exit_ok() receiver.assert_exit_ok() expect = [long(i) for i in range(91, 101)] sn = lambda m: m.properties["sn"] assert_browse(self.connect().session(), q, expect, transform=sn) if __name__ == "__main__": if not BrokerTest.amqp_tx_supported: BrokerTest.amqp_tx_warning() print "Skipping interop_tests" sys.exit(0) outdir = "interop_tests.tmp" shutil.rmtree(outdir, True) cmd = ["qpid-python-test", "-m", "interop_tests", "-DOUTDIR=%s"%outdir] + sys.argv[1:] if "QPID_PORT" in os.environ: del os.environ["QPID_PORT"] if os.environ.get(URL): os.execvp(cmd[0], cmd) else: dir = os.getcwd() class StartBroker(BrokerTest): def start_qpidd(self): pass test = StartBroker('start_qpidd') class Config: def __init__(self): self.defines = { 'OUTDIR': outdir } test.configure(Config()) test.setUp() os.environ[URL] = test.broker().host_port() os.chdir(dir) p = subprocess.Popen(cmd) status = p.wait() test.tearDown() sys.exit(status)