#!/usr/bin/env python
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#

import errno, os, time
from brokertest import *

from qpid import compat, session
from qpid.util import connect
from qpid.connection import Connection
from qpid.datatypes import Message, uuid4
from qpid.queue import Empty


class StoreTests(BrokerTest):
    """Check that durable broker state survives a broker restart.

    Each test declares durable entities (exchange, queue, binding, or a
    prepared distributed transaction), restarts the broker through
    cycle_broker(), and then verifies that the entities are still there.
    """

    # Status values compared against the ``status`` field of dtx_* results.
    # Only XA_OK is used by the tests visible in this file.
    XA_RBROLLBACK = 1
    XA_RBTIMEOUT = 2
    XA_OK = 0

    # Class-wide counter used by xid() to give every xid a distinct branch id.
    tx_counter = 0

    def configure(self, config):
        """Remember the test configuration, then defer to BrokerTest."""
        self.config = config
        self.defines = self.config.defines
        BrokerTest.configure(self, config)

    def setup_connection(self):
        """Return a new, not yet started, Connection to the current broker."""
        socket = connect(self._broker.host(), self._broker.port())
        return Connection(sock=socket)

    def setup_session(self):
        """Start self.conn and return a new session with a random name."""
        self.conn.start()
        return self.conn.session(str(uuid4()))

    def start_session(self):
        """(Re)create self.conn and self.ssn against the current broker."""
        self.conn = self.setup_connection()
        self.ssn = self.setup_session()

    def setUp(self):
        """Start a broker and open a connection and session to it."""
        BrokerTest.setUp(self)
        self._broker = self.broker()
        self.start_session()

    def cycle_broker(self):
        """Tear the broker down, start a new one and reconnect to it."""
        # tearDown resets working dir; change it back after, even when
        # tearDown itself fails, so later tests are not run from the wrong
        # directory.
        d = os.getcwd()
        try:
            BrokerTest.tearDown(self)
        finally:
            os.chdir(d)
        self._broker = self.broker()
        self.start_session()

    def xid(self, txid):
        """Return a session xid for global id *txid* with a unique branch id."""
        StoreTests.tx_counter += 1
        branchqual = "v%s" % StoreTests.tx_counter
        return self.ssn.xid(format=0, global_id=txid, branch_id=branchqual)

    def _best_effort(self, command, **kwargs):
        """Run a session command, tolerating failure.

        Used for pre-test cleanup of entities that may not exist.  On any
        error the session is re-created, because the failed command leaves
        the old session unusable.

        Pass the command as ``self.ssn.<name>`` at the call site so that it
        is looked up on whatever session is current at that moment.
        """
        try:
            command(**kwargs)
        except Exception:
            # restart the session busted from the exception
            self.start_session()

    def testDurableExchange(self):
        # A durable exchange must still exist after a broker restart.
        self._best_effort(self.ssn.exchange_delete, exchange="DE1")

        self.ssn.exchange_declare(exchange="DE1", type="direct", durable=True)
        response = self.ssn.exchange_query(name="DE1")
        self.assertTrue(response.durable)
        self.assertFalse(response.not_found)

        # Cycle the broker and make sure the exchange recovers
        self.cycle_broker()
        response = self.ssn.exchange_query(name="DE1")
        self.assertTrue(response.durable)
        self.assertFalse(response.not_found)

        self.ssn.exchange_delete(exchange="DE1")

    def testDurableQueues(self):
        # A durable queue must still exist after a broker restart.
        self._best_effort(self.ssn.queue_delete, queue="DQ1")

        self.ssn.queue_declare(queue="DQ1", durable=True)
        response = self.ssn.queue_query(queue="DQ1")
        self.assertEqual("DQ1", response.queue)
        self.assertTrue(response.durable)

        # Cycle the broker and make sure the queue recovers
        self.cycle_broker()
        response = self.ssn.queue_query(queue="DQ1")
        self.assertEqual("DQ1", response.queue)
        self.assertTrue(response.durable)

        self.ssn.queue_delete(queue="DQ1")

    def testDurableBindings(self):
        # A binding between a durable exchange and a durable queue must
        # still exist after a broker restart.
        self._best_effort(self.ssn.exchange_unbind,
                          queue="DB_Q1", exchange="DB_E1", binding_key="K1")
        self._best_effort(self.ssn.exchange_delete, exchange="DB_E1")
        self._best_effort(self.ssn.queue_delete, queue="DB_Q1")

        self.ssn.queue_declare(queue="DB_Q1", durable=True)
        self.ssn.exchange_declare(exchange="DB_E1", type="direct",
                                  durable=True)
        self.ssn.exchange_bind(exchange="DB_E1", queue="DB_Q1",
                               binding_key="K1")

        # Cycle the broker and make sure the binding recovers
        self.cycle_broker()
        response = self.ssn.exchange_bound(exchange="DB_E1", queue="DB_Q1",
                                           binding_key="K1")
        self.assertFalse(response.exchange_not_found)
        self.assertFalse(response.queue_not_found)
        self.assertFalse(response.queue_not_matched)
        self.assertFalse(response.key_not_matched)

        self.ssn.exchange_unbind(queue="DB_Q1", exchange="DB_E1",
                                 binding_key="K1")
        self.ssn.exchange_delete(exchange="DB_E1")
        self.ssn.queue_delete(queue="DB_Q1")

    def testDtxRecoverPrepared(self):
        # A prepared (but not committed) distributed transaction must be
        # reported as in doubt after a broker restart, its message must not
        # be visible until the transaction is committed, and it must be
        # delivered once it is.
        self._best_effort(self.ssn.exchange_unbind,
                          queue="Dtx_Q", exchange="Dtx_E", binding_key="Dtx")
        self._best_effort(self.ssn.exchange_delete, exchange="Dtx_E")
        self._best_effort(self.ssn.queue_delete, queue="Dtx_Q")

        self.ssn.queue_declare(queue="Dtx_Q", auto_delete=False, durable=True)
        self.ssn.exchange_declare(exchange="Dtx_E", type="direct",
                                  durable=True)
        self.ssn.exchange_bind(exchange="Dtx_E", queue="Dtx_Q",
                               binding_key="Dtx")

        txid = self.xid("DtxRecoverPrepared")
        self.ssn.dtx_select()
        self.ssn.dtx_start(xid=txid)
        # 2 = delivery_mode.persistent
        dp = self.ssn.delivery_properties(routing_key="Dtx_Q",
                                          delivery_mode=2)
        self.ssn.message_transfer(message=Message(dp, "transactional message"))
        self.ssn.dtx_end(xid=txid)
        self.assertEqual(self.XA_OK, self.ssn.dtx_prepare(xid=txid).status)

        # Cycle the broker and make sure the xid is there, the message is not
        # queued.
        self.cycle_broker()

        # The txid should be recovered and in doubt.  Every in-doubt xid is
        # required to be ours, and there must be at least one.
        xids = self.ssn.dtx_recover().in_doubt
        xid_matched = False
        for x in xids:
            self.assertEqual(txid.format, x.format)
            self.assertEqual(txid.global_id, x.global_id)
            self.assertEqual(txid.branch_id, x.branch_id)
            xid_matched = True
        self.assertTrue(xid_matched)

        self.ssn.message_subscribe(destination="dtx_msgs", queue="Dtx_Q",
                                   accept_mode=1, acquire_mode=0)
        # NOTE(review): presumably unit=1 is byte credit and unit=0 is message
        # credit -- confirm against the AMQP 0-10 message.flow definition.
        # 0xFFFFFFFF has the same value as the former Python-2-only literal
        # 0xFFFFFFFFL.
        self.ssn.message_flow(unit=1, value=0xFFFFFFFF,
                              destination="dtx_msgs")
        self.ssn.message_flow(unit=0, value=10, destination="dtx_msgs")
        message_arrivals = self.ssn.incoming("dtx_msgs")

        # self.fail() is used instead of "assert False" so the checks are not
        # stripped when the tests run under "python -O".
        try:
            message_arrivals.get(timeout=1)
        except Empty:
            pass
        else:
            self.fail('Message present in queue before commit')

        # The restart gave us a new session, so dtx must be selected again.
        self.ssn.dtx_select()
        self.assertEqual(
            self.XA_OK,
            self.ssn.dtx_commit(xid=txid, one_phase=False).status)

        try:
            msg = message_arrivals.get(timeout=1)
        except Empty:
            self.fail('Message should be present after dtx commit but is not')
        self.assertEqual("transactional message", msg.body)

        self.ssn.exchange_unbind(queue="Dtx_Q", exchange="Dtx_E",
                                 binding_key="Dtx")
        self.ssn.exchange_delete(exchange="Dtx_E")
        self.ssn.queue_delete(queue="Dtx_Q")