summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/tests/legacystore
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2014-04-07 21:22:55 +0000
committerAlan Conway <aconway@apache.org>2014-04-07 21:22:55 +0000
commita6f044f40d70b73c320cc909169e3518909365e2 (patch)
treea23df30ce4b52f1d23e55a0afa0e90c1fa20ffd6 /qpid/cpp/src/tests/legacystore
parent8ea0f79d78edd0a0825547ecc618e3fa63a2b93f (diff)
downloadqpid-python-a6f044f40d70b73c320cc909169e3518909365e2.tar.gz
QPID-5560: HA tests do not use AMQP 1.0
The HA tests were using only AMQP 0-10. Modified the tests to use AMQP 1.0 if available (still use 0-10 if 1.0 is not available) Fixed bugs uncovered both in the tests and in the AMQP 1.0 implementation. Summary of changes: - brokertest.py: configurable support for of swig vs. native and amqp0-10 vs. 1.0 - default to swig+amqp1.0 if swig is available, native+amqp0-10 otherwise - qpidtoollibs/broker.py: enable use of swig client with BrokerAgent - Swig python client: - support for passing client_properties/properties. - expose AddressHelper pn_data read/write as PnData helper class - set sender/receiver capacity on creation - limited disposition support - rejected messages. - support for additional timeout parameters - expose messaging::Logger, allow log configuration to be set from python. - ha_tests.py: - bind, delete policies not supported by AMQP 1.0, switched to using BrokerAgent QMF. - pass protocol:amqp1.0 connection-option to c++ test clients (qpid-send, qpid-receive) - TX tests forsce use of 0-10 protocol (but still with Swig client if enabled.) - Broker fixes: - Queue::Settings::isTemporary was set in the 0-10 SessionAdapter, moved to Broker::createQueue. - broker::amqp::Session was always setting an exclusive owner in createQueue git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1585588 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src/tests/legacystore')
-rw-r--r--qpid/cpp/src/tests/legacystore/python_tests/client_persistence.py45
-rw-r--r--qpid/cpp/src/tests/legacystore/python_tests/resize.py45
-rw-r--r--qpid/cpp/src/tests/legacystore/python_tests/store_test.py79
3 files changed, 89 insertions, 80 deletions
diff --git a/qpid/cpp/src/tests/legacystore/python_tests/client_persistence.py b/qpid/cpp/src/tests/legacystore/python_tests/client_persistence.py
index 3c62740a62..37c12601be 100644
--- a/qpid/cpp/src/tests/legacystore/python_tests/client_persistence.py
+++ b/qpid/cpp/src/tests/legacystore/python_tests/client_persistence.py
@@ -17,15 +17,20 @@
# under the License.
#
+import os
+
from brokertest import EXPECT_EXIT_OK
from store_test import StoreTest, Qmf, store_args
from qpid.messaging import *
+import qpid.messaging, brokertest
+brokertest.qm = qpid.messaging # FIXME aconway 2014-04-04: Tests fail with SWIG client.
+
class ExchangeQueueTests(StoreTest):
"""
Simple tests of the broker exchange and queue types
"""
-
+
def test_direct_exchange(self):
"""Test Direct exchange."""
broker = self.broker(store_args(), name="test_direct_exchange", expect=EXPECT_EXIT_OK)
@@ -34,11 +39,11 @@ class ExchangeQueueTests(StoreTest):
broker.send_message("a", msg1)
broker.send_message("b", msg2)
broker.terminate()
-
+
broker = self.broker(store_args(), name="test_direct_exchange")
self.check_message(broker, "a", msg1, True)
self.check_message(broker, "b", msg2, True)
-
+
def test_topic_exchange(self):
"""Test Topic exchange."""
broker = self.broker(store_args(), name="test_topic_exchange", expect=EXPECT_EXIT_OK)
@@ -56,17 +61,17 @@ class ExchangeQueueTests(StoreTest):
msg2 = Message("Message2", durable=True, correlation_id="Msg0004")
snd2.send(msg2)
broker.terminate()
-
+
broker = self.broker(store_args(), name="test_topic_exchange")
self.check_message(broker, "a", msg1, True)
self.check_message(broker, "b", msg1, True)
self.check_messages(broker, "c", [msg1, msg2], True)
self.check_message(broker, "d", msg2, True)
self.check_message(broker, "e", msg2, True)
-
-
+
+
def test_legacy_lvq(self):
- """Test legacy LVQ."""
+ """Test legacy LVQ."""
broker = self.broker(store_args(), name="test_lvq", expect=EXPECT_EXIT_OK)
ma1 = Message("A1", durable=True, correlation_id="Msg0005", properties={"qpid.LVQ_key":"A"})
ma2 = Message("A2", durable=True, correlation_id="Msg0006", properties={"qpid.LVQ_key":"A"})
@@ -77,7 +82,7 @@ class ExchangeQueueTests(StoreTest):
broker.send_messages("lvq-test", [mb1, ma1, ma2, mb2, mb3, mc1],
xprops="arguments:{\"qpid.last_value_queue\":True}")
broker.terminate()
-
+
broker = self.broker(store_args(), name="test_lvq", expect=EXPECT_EXIT_OK)
ssn = self.check_messages(broker, "lvq-test", [ma2, mb3, mc1], empty=True, ack=False)
# Add more messages while subscriber is active (no replacement):
@@ -89,11 +94,11 @@ class ExchangeQueueTests(StoreTest):
broker.send_messages("lvq-test", [mc2, mc3, ma3, ma4, mc4], session=ssn)
ssn.acknowledge()
broker.terminate()
-
+
broker = self.broker(store_args(), name="test_lvq")
self.check_messages(broker, "lvq-test", [ma4, mc4], True)
-
-
+
+
def test_fanout_exchange(self):
"""Test Fanout Exchange"""
broker = self.broker(store_args(), name="test_fanout_exchange", expect=EXPECT_EXIT_OK)
@@ -107,7 +112,7 @@ class ExchangeQueueTests(StoreTest):
msg2 = Message("Msg2", durable=True, correlation_id="Msg0002")
snd.send(msg2)
broker.terminate()
-
+
broker = self.broker(store_args(), name="test_fanout_exchange")
self.check_messages(broker, "q1", [msg1, msg2], True)
self.check_messages(broker, "q2", [msg1, msg2], True)
@@ -124,18 +129,18 @@ class ExchangeQueueTests(StoreTest):
m2 = rcv.fetch()
ssn.acknowledge(message=m2, disposition=Disposition(REJECTED))
broker.terminate()
-
+
broker = self.broker(store_args(), name="test_message_reject")
qmf = Qmf(broker)
assert qmf.queue_message_count("tmr") == 0
-
+
def test_route(self):
""" Test the recovery of a route (link and bridge objects."""
broker = self.broker(store_args(), name="test_route", expect=EXPECT_EXIT_OK)
qmf = Qmf(broker)
qmf_broker_obj = qmf.get_objects("broker")[0]
-
+
# create a "link"
link_args = {"host":"a.fake.host.com", "port":9999, "durable":True,
"authMechanism":"PLAIN", "username":"guest", "password":"guest",
@@ -143,16 +148,16 @@ class ExchangeQueueTests(StoreTest):
result = qmf_broker_obj.create("link", "test-link", link_args, False)
self.assertEqual(result.status, 0, result)
link = qmf.get_objects("link")[0]
-
+
# create bridge
bridge_args = {"link":"test-link", "src":"amq.direct", "dest":"amq.fanout",
"key":"my-key", "durable":True}
result = qmf_broker_obj.create("bridge", "test-bridge", bridge_args, False);
self.assertEqual(result.status, 0, result)
bridge = qmf.get_objects("bridge")[0]
-
+
broker.terminate()
-
+
# recover the link and bridge
broker = self.broker(store_args(), name="test_route")
qmf = Qmf(broker)
@@ -189,7 +194,7 @@ class AlternateExchangePropertyTests(StoreTest):
self.assertTrue(qmf.query_exchange("testExch", alt_exchange_name = "altExch"),
"Alternate exchange property not found or is incorrect on exchange \"testExch\".")
qmf.close()
-
+
def test_queue(self):
"""Queue alternate exchange property persistexchangeNamece test"""
broker = self.broker(store_args(), name="test_queue", expect=EXPECT_EXIT_OK)
@@ -226,7 +231,7 @@ class RedeliveredTests(StoreTest):
msg = Message(msg_content, durable=True)
broker.send_message("testQueue", msg)
broker.terminate()
-
+
broker = self.broker(store_args(), name="test_broker_recovery")
rcv_msg = broker.get_message("testQueue")
self.assertEqual(msg_content, rcv_msg.content)
diff --git a/qpid/cpp/src/tests/legacystore/python_tests/resize.py b/qpid/cpp/src/tests/legacystore/python_tests/resize.py
index 469e0f6730..e719b755da 100644
--- a/qpid/cpp/src/tests/legacystore/python_tests/resize.py
+++ b/qpid/cpp/src/tests/legacystore/python_tests/resize.py
@@ -26,8 +26,11 @@ from qpid.datatypes import uuid4
from store_test import StoreTest, store_args
from qpid.messaging import Message
+import qpid.messaging, brokertest
+brokertest.qm = qpid.messaging # TODO aconway 2014-04-04: Tests fail with SWIG client.
+
class ResizeTest(StoreTest):
-
+
resize_tool = os.getenv("QPID_STORE_RESIZE_TOOL", "qpid-store-resize")
print resize_tool
def _resize_store(self, store_dir, queue_name, resize_num_files, resize_file_size, exp_fail):
@@ -47,21 +50,21 @@ class ResizeTest(StoreTest):
finally:
p.stdout.close()
return res
-
+
def _resize_test(self, queue_name, num_msgs, msg_size, resize_num_files, resize_file_size, init_num_files = 8,
init_file_size = 24, exp_fail = False, wait_time = None):
# Using a sender will force the creation of an empty persistent queue which is needed for some tests
broker = self.broker(store_args(), name="broker", expect=EXPECT_EXIT_OK, wait=wait_time)
ssn = broker.connect().session()
snd = ssn.sender("%s; {create:always, node:{durable:True}}" % queue_name)
-
+
msgs = []
for index in range(0, num_msgs):
msg = Message(self.make_message(index, msg_size), durable=True, id=uuid4(), correlation_id="msg-%04d"%index)
msgs.append(msg)
snd.send(msg)
broker.terminate()
-
+
res = self._resize_store(os.path.join(self.dir, "broker", "rhm", "jrnl"), queue_name, resize_num_files,
resize_file_size, exp_fail)
if res != 0:
@@ -70,95 +73,95 @@ class ResizeTest(StoreTest):
self.fail("ERROR: Resize operation failed with return code %d" % res)
elif exp_fail:
self.fail("ERROR: Resize operation succeeded, but a failure was expected")
-
+
broker = self.broker(store_args(), name="broker")
self.check_messages(broker, queue_name, msgs, True)
-
- # TODO: Check the physical files to check number and size are as expected.
+
+ # TODO: Check the physical files to check number and size are as expected.
class SimpleTest(ResizeTest):
"""
Simple tests of the resize utility for resizing a journal to larger and smaller sizes.
"""
-
+
def test_empty_store_same(self):
self._resize_test(queue_name = "empty_store_same",
num_msgs = 0, msg_size = 0,
init_num_files = 8, init_file_size = 24,
resize_num_files = 8, resize_file_size = 24)
-
+
def test_empty_store_up(self):
self._resize_test(queue_name = "empty_store_up",
num_msgs = 0, msg_size = 0,
init_num_files = 8, init_file_size = 24,
resize_num_files = 16, resize_file_size = 48)
-
+
def test_empty_store_down(self):
self._resize_test(queue_name = "empty_store_down",
num_msgs = 0, msg_size = 0,
init_num_files = 8, init_file_size = 24,
resize_num_files = 6, resize_file_size = 12)
-
-# TODO: Put into long tests, make sure there is > 128GB free disk space
+
+# TODO: Put into long tests, make sure there is > 128GB free disk space
# def test_empty_store_max(self):
# self._resize_test(queue_name = "empty_store_max",
# num_msgs = 0, msg_size = 0,
# init_num_files = 8, init_file_size = 24,
# resize_num_files = 64, resize_file_size = 32768,
# wait_time = 120)
-
+
def test_empty_store_min(self):
self._resize_test(queue_name = "empty_store_min",
num_msgs = 0, msg_size = 0,
init_num_files = 8, init_file_size = 24,
resize_num_files = 4, resize_file_size = 1)
-
+
def test_basic_up(self):
self._resize_test(queue_name = "basic_up",
num_msgs = 100, msg_size = 10000,
init_num_files = 8, init_file_size = 24,
resize_num_files = 16, resize_file_size = 48)
-
+
def test_basic_down(self):
self._resize_test(queue_name = "basic_down",
num_msgs = 100, msg_size = 10000,
init_num_files = 8, init_file_size = 24,
resize_num_files = 4, resize_file_size = 15)
-
+
def test_basic_low(self):
self._resize_test(queue_name = "basic_low",
num_msgs = 100, msg_size = 10000,
init_num_files = 8, init_file_size = 24,
resize_num_files = 4, resize_file_size = 4,
exp_fail = True)
-
+
def test_basic_under(self):
self._resize_test(queue_name = "basic_under",
num_msgs = 100, msg_size = 10000,
init_num_files = 8, init_file_size = 24,
resize_num_files = 4, resize_file_size = 3,
exp_fail = True)
-
+
def test_very_large_msg_up(self):
self._resize_test(queue_name = "very_large_msg_up",
num_msgs = 4, msg_size = 2000000,
init_num_files = 8, init_file_size = 24,
resize_num_files = 16, resize_file_size = 48)
-
+
def test_very_large_msg_down(self):
self._resize_test(queue_name = "very_large_msg_down",
num_msgs = 4, msg_size = 2000000,
init_num_files = 16, init_file_size = 64,
resize_num_files = 16, resize_file_size = 48)
-
+
def test_very_large_msg_low(self):
self._resize_test(queue_name = "very_large_msg_low",
num_msgs = 4, msg_size = 2000000,
init_num_files = 8, init_file_size = 24,
resize_num_files = 7, resize_file_size = 20,
exp_fail = True)
-
+
def test_very_large_msg_under(self):
self._resize_test(queue_name = "very_large_msg_under",
num_msgs = 4, msg_size = 2000000,
diff --git a/qpid/cpp/src/tests/legacystore/python_tests/store_test.py b/qpid/cpp/src/tests/legacystore/python_tests/store_test.py
index 2fcab4e38e..cc846aefd4 100644
--- a/qpid/cpp/src/tests/legacystore/python_tests/store_test.py
+++ b/qpid/cpp/src/tests/legacystore/python_tests/store_test.py
@@ -22,10 +22,13 @@ from brokertest import BrokerTest
from qpid.messaging import Empty
from qmf.console import Session
-
+import qpid.messaging, brokertest
+brokertest.qm = qpid.messaging # TODO aconway 2014-04-04: Tests fail with SWIG client.
+
+
def store_args(store_dir = None):
"""Return the broker args necessary to load the async store"""
- assert BrokerTest.store_lib
+ assert BrokerTest.store_lib
if store_dir == None:
return []
return ["--store-dir", store_dir]
@@ -51,7 +54,7 @@ class Qmf:
else:
amqp_session.exchange_declare(exchange=exchange_name, type=exchange_type, passive=passive, durable=durable,
arguments=arguments)
-
+
def add_queue(self, queue_name, alt_exchange_name=None, passive=False, durable=False, arguments = None):
"""Add a new queue"""
amqp_session = self.__broker.getAmqpSession()
@@ -62,7 +65,7 @@ class Qmf:
durable=durable, arguments=arguments)
else:
amqp_session.queue_declare(queue_name, passive=passive, durable=durable, arguments=arguments)
-
+
def delete_queue(self, queue_name):
"""Delete an existing queue"""
amqp_session = self.__broker.getAmqpSession()
@@ -84,24 +87,24 @@ class Qmf:
return found
except Exception:
return False
-
+
def query_exchange(self, exchange_name, alt_exchange_name=None):
"""Test for the presence of an exchange, and optionally whether it has an alternate exchange set to a known
value."""
return self._query(exchange_name, "exchange", "org.apache.qpid.broker", alt_exchange_name)
-
+
def query_queue(self, queue_name, alt_exchange_name=None):
"""Test for the presence of an exchange, and optionally whether it has an alternate exchange set to a known
value."""
return self._query(queue_name, "queue", "org.apache.qpid.broker", alt_exchange_name)
-
+
def queue_message_count(self, queue_name):
"""Query the number of messages on a queue"""
queue_list = self.__session.getObjects(_class="queue", _name=queue_name)
if len(queue_list):
return queue_list[0].msgDepth
-
+
def queue_empty(self, queue_name):
"""Check if a queue is empty (has no messages waiting)"""
return self.queue_message_count(queue_name) == 0
@@ -109,7 +112,7 @@ class Qmf:
def get_objects(self, target_class, target_package="org.apache.qpid.broker"):
return self.__session.getObjects(_class=target_class, _package=target_package)
-
+
def close(self):
self.__session.delBroker(self.__broker)
self.__session = None
@@ -119,7 +122,7 @@ class StoreTest(BrokerTest):
"""
This subclass of BrokerTest adds some convenience test/check functions
"""
-
+
def _chk_empty(self, queue, receiver):
"""Check if a queue is empty (has no more messages)"""
try:
@@ -141,9 +144,9 @@ class StoreTest(BrokerTest):
else:
buff += chr(ord('a') + (index % 26))
return buff + msg
-
+
# Functions for formatting address strings
-
+
@staticmethod
def _fmt_csv(string_list, list_braces = None):
"""Format a list using comma-separation. Braces are optionally added."""
@@ -163,16 +166,16 @@ class StoreTest(BrokerTest):
if list_braces != None:
str_ += list_braces[1]
return str_
-
+
def _fmt_map(self, string_list):
"""Format a map {l1, l2, l3, ...} from a string list. Each item in the list must be a formatted map
element('key:val')."""
- return self._fmt_csv(string_list, list_braces="{}")
-
+ return self._fmt_csv(string_list, list_braces="{}")
+
def _fmt_list(self, string_list):
"""Format a list [l1, l2, l3, ...] from a string list."""
- return self._fmt_csv(string_list, list_braces="[]")
-
+ return self._fmt_csv(string_list, list_braces="[]")
+
def addr_fmt(self, node_name, **kwargs):
"""Generic AMQP to new address formatter. Takes common (but not all) AMQP options and formats an address
string."""
@@ -190,12 +193,12 @@ class StoreTest(BrokerTest):
x_declare_list = kwargs.get("x_declare_list", [])
x_bindings_list = kwargs.get("x_bindings_list", [])
x_subscribe_list = kwargs.get("x_subscribe_list", [])
-
+
node_flag = not link and (node_type != None or durable or len(x_declare_list) > 0 or len(x_bindings_list) > 0)
link_flag = link and (link_name != None or durable or link_reliability != None or len(x_declare_list) > 0 or
len(x_bindings_list) > 0 or len(x_subscribe_list) > 0)
assert not (node_flag and link_flag)
-
+
opt_str_list = []
if create_policy != None:
opt_str_list.append("create: %s" % create_policy)
@@ -231,7 +234,7 @@ class StoreTest(BrokerTest):
if len(opt_str_list) > 0:
addr_str += "; %s" % self._fmt_map(opt_str_list)
return addr_str
-
+
def snd_addr(self, node_name, **kwargs):
""" Create a send (node) address"""
# Get keyword args
@@ -245,7 +248,7 @@ class StoreTest(BrokerTest):
ftd_size = kwargs.get("ftd_size")
policy = kwargs.get("policy", "flow-to-disk")
exchage_type = kwargs.get("exchage_type")
-
+
create_policy = None
if auto_create:
create_policy = "always"
@@ -265,10 +268,10 @@ class StoreTest(BrokerTest):
x_declare_list.append("arguments: %s" % self._fmt_map(queue_policy))
if exchage_type != None:
x_declare_list.append("type: %s" % exchage_type)
-
+
return self.addr_fmt(node_name, topic=topic, create_policy=create_policy, delete_policy=delete_policy,
node_type=node_type, durable=durable, x_declare_list=x_declare_list)
-
+
def rcv_addr(self, node_name, **kwargs):
""" Create a receive (link) address"""
# Get keyword args
@@ -282,7 +285,7 @@ class StoreTest(BrokerTest):
ftd_count = kwargs.get("ftd_count")
ftd_size = kwargs.get("ftd_size")
policy = kwargs.get("policy", "flow-to-disk")
-
+
create_policy = None
if auto_create:
create_policy = "always"
@@ -291,7 +294,7 @@ class StoreTest(BrokerTest):
delete_policy = "always"
mode = None
if browse:
- mode = "browse"
+ mode = "browse"
x_declare_list = ["\"exclusive\": %s" % exclusive]
if ftd_count != None or ftd_size != None:
queue_policy = ["\'qpid.policy_type\': %s" % policy]
@@ -308,11 +311,11 @@ class StoreTest(BrokerTest):
return self.addr_fmt(node_name, create_policy=create_policy, delete_policy=delete_policy, mode=mode, link=True,
link_name=link_name, durable=durable, x_declare_list=x_declare_list,
x_bindings_list=x_bindings_list, link_reliability=reliability)
-
+
def check_message(self, broker, queue, exp_msg, transactional=False, empty=False, ack=True, browse=False):
"""Check that a message is on a queue by dequeuing it and comparing it to the expected message"""
return self.check_messages(broker, queue, [exp_msg], transactional, empty, ack, browse)
-
+
def check_messages(self, broker, queue, exp_msg_list, transactional=False, empty=False, ack=True, browse=False,
emtpy_flag=False):
"""Check that messages is on a queue by dequeuing them and comparing them to the expected messages"""
@@ -341,8 +344,8 @@ class StoreTest(BrokerTest):
if transactional:
ssn.commit()
return ssn
-
-
+
+
# Functions for finding strings in the broker log file (or other files)
@staticmethod
@@ -353,7 +356,7 @@ class StoreTest(BrokerTest):
return file_handle.read()
finally:
file_handle.close()
-
+
def _get_hits(self, broker, search):
"""Find all occurrences of the search in the broker log (eliminating possible duplicates from msgs on multiple
queues)"""
@@ -361,9 +364,9 @@ class StoreTest(BrokerTest):
hits = []
for hit in search.findall(self._read_file(broker.log)):
if hit not in hits:
- hits.append(hit)
+ hits.append(hit)
return hits
-
+
def _reconsile_hits(self, broker, ftd_msgs, release_hits):
"""Remove entries from list release_hits if they match the message id in ftd_msgs. Check for remaining
release_hits."""
@@ -382,35 +385,33 @@ class StoreTest(BrokerTest):
for hit in release_hits:
err += " %s\n" % hit
self.assert_(False, err)
-
+
def check_msg_release(self, broker, ftd_msgs):
""" Check for 'Content released' messages in broker log for messages in ftd_msgs"""
hits = self._get_hits(broker, re.compile("debug Message id=\"[0-9a-f-]{36}\"; pid=0x[0-9a-f]+: "
"Content released$", re.MULTILINE))
self._reconsile_hits(broker, ftd_msgs, hits)
-
+
def check_msg_release_on_commit(self, broker, ftd_msgs):
""" Check for 'Content released on commit' messages in broker log for messages in ftd_msgs"""
hits = self._get_hits(broker, re.compile("debug Message id=\"[0-9a-f-]{36}\"; pid=0x[0-9a-f]+: "
"Content released on commit$", re.MULTILINE))
self._reconsile_hits(broker, ftd_msgs, hits)
-
+
def check_msg_release_on_recover(self, broker, ftd_msgs):
""" Check for 'Content released after recovery' messages in broker log for messages in ftd_msgs"""
hits = self._get_hits(broker, re.compile("debug Message id=\"[0-9a-f-]{36}\"; pid=0x[0-9a-f]+: "
"Content released after recovery$", re.MULTILINE))
self._reconsile_hits(broker, ftd_msgs, hits)
-
+
def check_msg_block(self, broker, ftd_msgs):
"""Check for 'Content release blocked' messages in broker log for messages in ftd_msgs"""
hits = self._get_hits(broker, re.compile("debug Message id=\"[0-9a-f-]{36}\"; pid=0x[0-9a-f]+: "
"Content release blocked$", re.MULTILINE))
self._reconsile_hits(broker, ftd_msgs, hits)
-
+
def check_msg_block_on_commit(self, broker, ftd_msgs):
"""Check for 'Content release blocked' messages in broker log for messages in ftd_msgs"""
hits = self._get_hits(broker, re.compile("debug Message id=\"[0-9a-f-]{36}\"; pid=0x[0-9a-f]+: "
"Content release blocked on commit$", re.MULTILINE))
self._reconsile_hits(broker, ftd_msgs, hits)
-
-