diff options
| author | Kim van der Riet <kpvdr@apache.org> | 2014-01-09 17:19:59 +0000 |
|---|---|---|
| committer | Kim van der Riet <kpvdr@apache.org> | 2014-01-09 17:19:59 +0000 |
| commit | 643fd0c2ae8f501676be48c02c06d913a3419436 (patch) | |
| tree | 4319f35fced7d3f83aea72ef8df73f0cd7b93b86 | |
| parent | 30d213dc1e6d743f2f0abb44c8bc91868d5126b1 (diff) | |
| download | qpid-python-643fd0c2ae8f501676be48c02c06d913a3419436.tar.gz | |
QPID-5362: WIP: Linearstore: No store tools exist for examining the journals. This checkin is work-in-progress.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1556888 13f79535-47bb-0310-9956-ffa450edef68
| -rw-r--r-- | qpid/tools/src/py/qls/__init__.py | 19 | ||||
| -rw-r--r-- | qpid/tools/src/py/qls/efp.py | 145 | ||||
| -rw-r--r-- | qpid/tools/src/py/qls/err.py | 177 | ||||
| -rw-r--r-- | qpid/tools/src/py/qls/jrnl.py | 762 | ||||
| -rwxr-xr-x | qpid/tools/src/py/qpid_qls_analyze.py | 80 |
5 files changed, 1183 insertions, 0 deletions
diff --git a/qpid/tools/src/py/qls/__init__.py b/qpid/tools/src/py/qls/__init__.py new file mode 100644 index 0000000000..d8a500d9d8 --- /dev/null +++ b/qpid/tools/src/py/qls/__init__.py @@ -0,0 +1,19 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + diff --git a/qpid/tools/src/py/qls/efp.py b/qpid/tools/src/py/qls/efp.py new file mode 100644 index 0000000000..8ce160f7c7 --- /dev/null +++ b/qpid/tools/src/py/qls/efp.py @@ -0,0 +1,145 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import os +import os.path +import qls.err + +class EfpManager(object): + """ + Top level class to analyze the Qpid Linear Store (QLS) directory for the partitions that make up the + Empty File Pool (EFP). + """ + def __init__(self, directory): + if not os.path.exists(directory): + raise qls.err.InvalidQlsDirectoryNameError(directory) + self.directory = directory + self.partitions = [] + def report(self): + print 'Found', len(self.partitions), 'partition(s).' + if (len(self.partitions)) > 0: + EfpPartition.print_report_table_header() + for ptn in self.partitions: + ptn.print_report_table_line() + print + for ptn in self.partitions: + ptn.report() + def run(self, args): + for dir_entry in os.listdir(self.directory): + try: + efpp = EfpPartition(os.path.join(self.directory, dir_entry)) + efpp.scan() + self.partitions.append(efpp) + except qls.err.InvalidPartitionDirectoryNameError: + pass + +class EfpPartition(object): + """ + Class that represents a EFP partition. Each partition contains one or more Empty File Pools (EFPs). + """ + PTN_DIR_PREFIX = 'p' + EFP_DIR_NAME = 'efp' + def __init__(self, directory): + self.base_dir = os.path.basename(directory) + if self.base_dir[0] is not EfpPartition.PTN_DIR_PREFIX: + raise qls.err.InvalidPartitionDirectoryNameError(directory) + try: + self.partition_number = int(self.base_dir[1:]) + except ValueError: + raise qls.err.InvalidPartitionDirectoryNameError(directory) + self.directory = directory + self.pools = [] + self.efp_count = 0 + self.tot_file_count = 0 + self.tot_file_size_kb = 0 + def get_directory(self): + return self.directory + def get_efp_count(self): + return self.efp_count + def get_name(self): + return self.base_dir + def get_number(self): + return self.partition_number + def get_number_pools(self): + return len(self.pools) + def get_tot_file_count(self): + return self.tot_file_count + def get_tot_file_size_kb(self): + return self.tot_file_size_kb + @staticmethod + def print_report_table_header(): + print 'p_no no_efp tot_files tot_size_kb directory' + print '---- ------ --------- ----------- ---------' + def print_report_table_line(self): + print '%4d %6d %9d %11d %s' % (self.get_number(), self.get_efp_count(), self.get_tot_file_count(), + self.get_tot_file_size_kb(), self.get_directory()) + def report(self): + print 'Partition %s:' % self.base_dir + EmptyFilePool.print_report_table_header() + for pool in self.pools: + pool.print_report_table_line() + print + def scan(self): + if os.path.exists(self.directory): + efp_dir = os.path.join(self.directory, EfpPartition.EFP_DIR_NAME) + for dir_entry in os.listdir(efp_dir): + efp = EmptyFilePool(os.path.join(efp_dir, dir_entry)) + self.efp_count += 1 + self.tot_file_count += efp.get_tot_file_count() + self.tot_file_size_kb += efp.get_tot_file_size_kb() + self.pools.append(efp) + +class EmptyFilePool(object): + """ + Class that represents a single Empty File Pool within a partition. Each EFP contains pre-formatted linear store + journal files (but it may also be empty). + """ + EFP_DIR_SUFFIX = 'k' + def __init__(self, directory): + self.base_dir = os.path.basename(directory) + if self.base_dir[-1] is not EmptyFilePool.EFP_DIR_SUFFIX: + raise qls.err.InvalidEfpDirectoryNameError(directory) + try: + self.data_size_kb = int(os.path.basename(self.base_dir)[:-1]) + except ValueError: + raise qls.err.InvalidEfpDirectoryNameError(directory) + self.directory = directory + self.files = os.listdir(directory) + def get_directory(self): + return self.directory + def get_file_data_size_kb(self): + return self.data_size_kb + def get_name(self): + return self.base_dir + def get_tot_file_count(self): + return len(self.files) + def get_tot_file_size_kb(self): + return self.data_size_kb * len(self.files) + @staticmethod + def print_report_table_header(): + print 'file_size_kb file_count tot_file_size_kb efp_directory' + print '------------ ---------- ---------------- -------------' + def print_report_table_line(self): + print '%12d %10d %16d %s' % (self.get_file_data_size_kb(), self.get_tot_file_count(), + self.get_tot_file_size_kb(), self.get_directory()) + +# ============================================================================= + +if __name__ == "__main__": + print "This is a library, and cannot be executed." diff --git a/qpid/tools/src/py/qls/err.py b/qpid/tools/src/py/qls/err.py new file mode 100644 index 0000000000..11b12985cb --- /dev/null +++ b/qpid/tools/src/py/qls/err.py @@ -0,0 +1,177 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +# --- Parent classes + +class QlsError(Exception): + """Base error class for QLS errors and exceptions""" + def __init__(self): + Exception.__init__(self) + +class QlsRecordError(QlsError): + """Base error class for individual records""" + def __init__(self, file_header, record): + QlsError.__init__(self) + self.file_header = file_header + self.record = record + def __str__(self): + return 'queue="%s" file_id=0x%x record_offset=0x%x record_id=0x%x' % \ + (self.file_header.queue_name, self.file_header.file_num, self.record.file_offset, self.record.record_id) + +# --- Error classes + +class AlreadyLockedError(QlsRecordError): + """Transactional record to be locked is already locked""" + def __init__(self, file_header, record): + QlsRecordError.__init__(self, file_header, record) + def __str__(self): + return 'Transactional operation already locked in TransactionMap: ' + QlsRecordError.__str__(self) + +class DatqaSizeError(QlsError): + """Error class for Data size mismatch""" + def __init__(self, expected_size, actual_size, data_str): + QlsError.__init__(self) + self.expected_size = expected_size + self.actual_size = actual_size + self.xid_str = data_str + def __str__(self): + return 'Inconsistent data size: expected:%d; actual:%d; data="%s"' % \ + (self.expected_size, self.actual_size, self.data_str) + +class DuplicateRecordIdError(QlsRecordError): + """Duplicate Record Id in Enqueue Map""" + def __init__(self, file_header, record): + QlsRecordError.__init__(self, file_header, record) + def __str__(self): + return 'Duplicate Record Id in enqueue map: ' + QlsRecordError.__str__(self) + +class ExternalDataError(QlsRecordError): + """Data present in Enqueue record when external data flag is set""" + def __init__(self, file_header, record): + QlsRecordError.__init__(self, file_header, record) + def __str__(self): + return 'Data present in external data record: ' + QlsRecordError.__str__(self) + +class FirstRecordOffsetMismatchError(QlsRecordError): + """First Record Offset (FRO) does not match file header""" + def __init__(self, file_header, record): + QlsRecordError.__init__(self, file_header, record) + def __str__(self): + return 'First record offset mismatch: ' + QlsRecordError.__str__(self) + ' expected_offset=0x%x' % \ + self.file_header.first_record_offset + +class InvalidEfpDirectoryNameError(QlsError): + """Invalid EFP directory name - should be NNNNk, where NNNN is a number (of any length)""" + def __init__(self, directory_name): + QlsError.__init__(self) + self.directory_name = directory_name + def __str__(self): + return 'Invalid EFP directory name "%s"' % self.directory_name + +class InvalidPartitionDirectoryNameError(QlsError): + """Invalid EFP partition name - should be pNNN, where NNN is a 3-digit partition number""" + def __init__(self, directory_name): + QlsError.__init__(self) + self.directory_name = directory_name + def __str__(self): + return 'Invalid partition directory name "%s"' % self.directory_name + +class InvalidQlsDirectoryNameError(QlsError): + """Invalid QLS directory name""" + def __init__(self, directory_name): + QlsError.__init__(self) + self.directory_name = directory_name + def __str__(self): + return 'Invalid QLS directory name "%s"' % self.directory_name + +class InvalidRecordTypeError(QlsRecordError): + """Error class for any operation using an invalid record type""" + def __init__(self, file_header, record, error_msg): + QlsRecordError.__init__(self, file_header, record) + self.error_msg = error_msg + def __str__(self): + return 'Invalid record type: ' + QlsRecordError.__str__(self) + ':' + self.error_msg + +class InvalidRecordVersionError(QlsRecordError): + """Invalid record version""" + def __init__(self, file_header, record, expected_version): + QlsRecordError.__init__(self, file_header, record) + self.expected_version = expected_version + def __str__(self): + return 'Invalid record version: queue="%s" ' + QlsRecordError.__str__(self) + \ + ' ver_found=0x%x ver_expected=0x%x' % (self.record_header.version, self.expected_version) + +class NoMoreFilesInJournalError(QlsError): + """Raised when trying to obtain the next file in the journal and there are no more files""" + def __init__(self, queue_name): + QlsError.__init__(self) + self.queue_name = queue_name + def __str__(self): + return 'No more journal files in queue "%s"' % self.queue_name + +class NonTransactionalRecordError(QlsRecordError): + """Transactional operation on non-transactional record""" + def __init__(self, file_header, record, operation): + QlsRecordError.__init__(self, file_header, record) + self.operation = operation + def __str__(self): + return 'Transactional operation on non-transactional record: ' + QlsRecordError.__str__() + \ + ' operation=%s' % self.operation + +class RecordIdNotFoundError(QlsRecordError): + """Record Id not found in enqueue map""" + def __init__(self, file_header, record): + QlsRecordError.__init__(self, file_header, record) + def __str__(self): + return 'Record Id not found in enqueue map: ' + QlsRecordError.__str__() + +class RecordNotLockedError(QlsRecordError): + """Record in enqueue map is not locked""" + def __init__(self, file_header, record): + QlsRecordError.__init__(self, file_header, record) + def __str__(self): + return 'Record in enqueue map is not locked: ' + QlsRecordError.__str__() + +class UnexpectedEndOfFileError(QlsError): + """The bytes read from a file is less than that expected""" + def __init__(self, size_read, size_expected, file_offset, file_name): + QlsError.__init__(self) + self.size_read = size_read + self.size_expected = size_expected + self.file_offset = file_offset + self.file_name = file_name + def __str__(self): + return 'Tried to read %d at offset %d in file "%s"; only read %d' % \ + (self.size_read, self.file_offset, self.file_name, self.size_expected) + +class XidSizeError(QlsError): + """Error class for Xid size mismatch""" + def __init__(self, expected_size, actual_size, xid_str): + QlsError.__init__(self) + self.expected_size = expected_size + self.actual_size = actual_size + self.xid_str = xid_str + def __str__(self): + return 'Inconsistent xid size: expected:%d; actual:%d; xid="%s"' % \ + (self.expected_size, self.actual_size, self.xid_str) + +# ============================================================================= + +if __name__ == "__main__": + print "This is a library, and cannot be executed." diff --git a/qpid/tools/src/py/qls/jrnl.py b/qpid/tools/src/py/qls/jrnl.py new file mode 100644 index 0000000000..484cbb4e91 --- /dev/null +++ b/qpid/tools/src/py/qls/jrnl.py @@ -0,0 +1,762 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import os +import os.path +import qls.err +import struct +from time import gmtime, strftime + +class HighCounter(object): + def __init__(self): + self.num = 0 + def check(self, num): + if self.num < num: + self.num = num + def get(self): + return self.num + def get_next(self): + self.num += 1 + return self.num + +class JournalRecoveryManager(object): + TPL_DIR_NAME = 'tpl' + JRNL_DIR_NAME = 'jrnl' + def __init__(self, directory): + if not os.path.exists(directory): + raise qls.err.InvalidQlsDirectoryNameError(directory) + self.directory = directory + self.tpl = None + self.journals = {} + self.high_rid_counter = HighCounter() + def report(self, print_stats_flag): + if not self.tpl is None: + self.tpl.report(print_stats_flag) + for queue_name in sorted(self.journals.keys()): + self.journals[queue_name].report(print_stats_flag) + def run(self, args): + tpl_dir = os.path.join(self.directory, JournalRecoveryManager.TPL_DIR_NAME) + if os.path.exists(tpl_dir): + self.tpl = Journal(os.path.join(self.directory, tpl_dir), None) + self.tpl.recover(self.high_rid_counter) + print + jrnl_dir = os.path.join(self.directory, JournalRecoveryManager.JRNL_DIR_NAME) + if os.path.exists(jrnl_dir): + for dir_entry in os.listdir(jrnl_dir): + jrnl = Journal(os.path.join(jrnl_dir, dir_entry), self.tpl.get_outstanding_transaction_list()) + jrnl.recover(self.high_rid_counter) + self.journals[jrnl.get_queue_name()] = jrnl + if args.txn: + jrnl.reconcile_transactions(self.high_rid_counter) + print + +class EnqueueMap(object): + """ + Map of enqueued records in a QLS journal + """ + def __init__(self, journal): + self.journal = journal + self.enq_map = {} + def add(self, file_header, enq_record, locked_flag): + if enq_record.record_id in self.enq_map: + raise qls.err.DuplicateRecordIdError(self.journal.current_file_header, enq_record) + self.enq_map[enq_record.record_id] = [file_header.file_num, enq_record, locked_flag] + def contains(self, rid): + """Return True if the map contains the given rid""" + return rid in self.enq_map + def delete(self, file_header, deq_record): + if deq_record.dequeue_record_id in self.enq_map: + del self.enq_map[deq_record.dequeue_record_id] + else: + raise qls.err.RecordIdNotFoundError(file_header, deq_record) + def lock(self, file_header, dequeue_record): + if not dequeue_record.dequeue_record_id in self.enq_map: + raise qls.err.RecordIdNotFoundError(file_header, dequeue_record) + self.enq_map[dequeue_record.dequeue_record_id][2] = True + def report_str(self, show_stats, show_records): + """Return a string containing a text report for all records in the map""" + if len(self.enq_map) == 0: + return 'No enqueued records found.' + rstr = '%d enqueued records found' % len(self.enq_map) + if show_records: + rstr += ":" + rid_list = self.enq_map.keys() + rid_list.sort() + for rid in rid_list: + file_num, record, locked_flag = self.enq_map[rid] + if locked_flag: + lock_str = '[LOCKED]' + else: + lock_str = '' + rstr += '\n %d:%s %s' % (file_num, record, lock_str) + else: + rstr += '.' + return rstr + def unlock(self, file_header, dequeue_record): + """Set the transaction lock for a given record_id to False""" + if dequeue_record.dequeue_record_id in self.enq_map: + if self.enq_map[dequeue_record.dequeue_record_id][2]: + self.enq_map[dequeue_record.dequeue_record_id][2] = False + else: + raise qls.err.RecordNotLockedError(file_header, dequeue_record) + else: + raise qls.err.RecordIdNotFoundError(file_header, dequeue_record) + +class TransactionMap(object): + """ + Map of open transactions used while recovering a QLS journal + """ + def __init__(self, enq_map): + self.txn_map = {} + self.enq_map = enq_map + def abort(self, xid): + """Perform an abort operation for the given xid record""" + for file_header, record, lock_flag in self.txn_map[xid]: + if isinstance(record, DequeueRecord): + if self.enq_map.contains(record.dequeue_record_id): + self.enq_map.unlock(file_header, record) + del self.txn_map[xid] + def add(self, file_header, record): + if record.xid is None: + raise qls.err.NonTransactionalRecordError(file_header, record, 'TransactionMap.add()') + if isinstance(record, DequeueRecord): + try: + self.enq_map.lock(file_header, record) + except qls.err.RecordIdNotFoundError: + # Not in emap, look for rid in tmap - should not happen in practice + txn_op = self._find_record_id(record.xid, record.dequeue_record_id) + if txn_op != None: + if txn_op[2]: + raise qls.err.AlreadyLockedError(file_header, record) + txn_op[2] = True + if record.xid in self.txn_map: + self.txn_map[record.xid].append([file_header, record, False]) # append to existing list + else: + self.txn_map[record.xid] = [[file_header, record, False]] # create new list + def commit(self, xid): + """Perform a commit operation for the given xid record""" + mismatch_list = [] + for file_header, record, lock in self.txn_map[xid]: + if isinstance(record, EnqueueRecord): + self.enq_map.add(file_header, record, lock) # Transfer enq to emap + else: + if self.enq_map.contains(record.dequeue_record_id): + self.enq_map.unlock(file_header, record) + self.enq_map.delete(file_header, record) + else: + mismatch_list.append('0x%x' % record.dequeue_record_id) + del self.txn_map[xid] + return mismatch_list + def contains(self, xid): + """Return True if the xid exists in the map; False otherwise""" + return xid in self.txn_map + def delete(self, file_header, transaction_record): + """Remove a transaction record from the map using either a commit or abort header""" + if transaction_record.magic[-1] == 'c': + return self.commit(transaction_record.xid) + if transaction_record.magic[-1] == 'a': + self.abort(transaction_record.xid) + else: + raise qls.err.InvalidRecordTypeError(file_header, transaction_record, 'delete from Transaction Map') + def get_xid_list(self): + return self.txn_map.keys() + def report_str(self, show_stats, show_records): + """Return a string containing a text report for all records in the map""" + if len(self.txn_map) == 0: + return 'No outstanding transactions found.' + rstr = '%d outstanding transaction(s)' % len(self.txn_map) + if show_records: + rstr += ':' + for xid, list in self.txn_map.iteritems(): + rstr += '\n %s containing %d operations:' % (Utils.format_xid(xid), len(list)) + for file_header, record, locked_flag in list: + rstr += '\n %d:%s' % (file_header.file_num, record) + else: + rstr += '.' + return rstr + def _find_record_id(self, xid, record_id): + """ Search for and return map list with supplied rid.""" + if xid in self.txn_map: + for txn_op in self.txn_map[xid]: + if txn_op[1].record_id == record_id: + return txn_op + for this_xid in self.txn_map.iterkeys(): + for txn_op in self.txn_map[this_xid]: + if txn_op[1].record_id == record_id: + return txn_op + return None + +class Journal(object): + """ + Instance of a Qpid Linear Store (QLS) journal. + """ + class JournalStatistics(object): + """Journal statistics""" + def __init__(self): + self.total_record_count = 0 + self.transient_record_count = 0 + self.filler_record_count = 0 + self.enqueue_count = 0 + self.dequeue_count = 0 + self.transaction_record_count = 0 + self.transaction_enqueue_count = 0 + self.transaction_dequeue_count = 0 + self.transaction_commit_count = 0 + self.transaction_abort_count = 0 + self.transaction_operation_count = 0 + def __str__(self): + fstr = 'Total record count: %d\n' + \ + 'Transient record count: %d\n' + \ + 'Filler_record_count: %d\n' + \ + 'Enqueue_count: %d\n' + \ + 'Dequeue_count: %d\n' + \ + 'Transaction_record_count: %d\n' + \ + 'Transaction_enqueue_count: %d\n' + \ + 'Transaction_dequeue_count: %d\n' + \ + 'Transaction_commit_count: %d\n' + \ + 'Transaction_abort_count: %d\n' + \ + 'Transaction_operation_count: %d\n' + return fstr % (self.total_record_count, + self.transient_record_count, + self.filler_record_count, + self.enqueue_count, + self.dequeue_count, + self.transaction_record_count, + self.transaction_enqueue_count, + self.transaction_dequeue_count, + self.transaction_commit_count, + self.transaction_abort_count, + self.transaction_operation_count) + + def __init__(self, directory, xid_prepared_list): + self.directory = directory + self.queue_name = os.path.basename(directory) + self.files = {} + self.enq_map = EnqueueMap(self) + self.txn_map = TransactionMap(self.enq_map) + self.file_itr = None + self.current_file_header = None + self.first_rec_flag = None + self.statistics = Journal.JournalStatistics() + self.warnings = [] + self.xid_prepared_list = xid_prepared_list # This is None for the TPL instance only + def get_outstanding_transaction_list(self): + return self.txn_map.get_xid_list() + def get_queue_name(self): + return self.queue_name + def recover(self, high_rid_counter): + print 'Recovering', self.queue_name #DEBUG + self._analyze_files() + try: + while self._get_next_record(high_rid_counter): + pass + except qls.err.NoMoreFilesInJournalError: + #print '[No more files in journal]' # DEBUG + #print #DEBUG + pass + def reconcile_transactions(self, high_rid_counter): + if not self.xid_prepared_list is None: # ie This is not the TPL instance + print 'Reconcile outstanding prepared transactions:' + for xid in self.txn_map.get_xid_list(): + if xid in self.xid_prepared_list: + print ' Committing', Utils.format_xid(xid) + self.txn_map.commit(xid) + else: + print ' Aborting', Utils.format_xid(xid) + self.txn_map.abort(xid) + def report(self, print_stats_flag): + print 'Journal "%s":' % self.queue_name + if print_stats_flag: + print str(self.statistics) + print self.enq_map.report_str(True, True) + print self.txn_map.report_str(True, True) + print 'file_num p_no efp journal_file' + print '-------- ---- ----- ------------' + for file_num, file_hdr in self.files.iteritems(): + comment = '<uninitialized>' if file_hdr.file_num == 0 else '' + print '%8d %4d %4dk %s %s' % (file_num, file_hdr.partition_num, file_hdr.efp_data_size_kb, + os.path.basename(file_hdr.file_handle.name), comment) + print + #--- protected functions --- + def _analyze_files(self): + for dir_entry in os.listdir(self.directory): + dir_entry_bits = dir_entry.split('.') + if len(dir_entry_bits) == 2 and dir_entry_bits[1] == JournalRecoveryManager.JRNL_DIR_NAME: + fq_file_name = os.path.join(self.directory, dir_entry) + file_handle = open(fq_file_name) + args = Utils.load_args(file_handle, RecordHeader) + file_hdr = FileHeader(*args) + file_hdr.init(file_handle, *Utils.load_args(file_handle, FileHeader)) + if not file_hdr.is_valid(file_hdr): + break + file_hdr.load(file_handle) + Utils.skip(file_handle, file_hdr.file_header_size_sblks * Utils.SBLK_SIZE) + self.files[file_hdr.file_num] = file_hdr + self.file_itr = iter(self.files) + def _check_file(self): + if not self.current_file_header is None and not self.current_file_header.is_end_of_file(): + return + self._get_next_file() + self.current_file_header.file_handle.seek(self.current_file_header.first_record_offset) + def _get_next_file(self): + if not self.current_file_header is None: + if not self.current_file_header.file_handle.closed: # sanity check, should not be necessary + self.current_file_header.file_handle.close() + file_num = 0 + try: + while file_num == 0: + file_num = self.file_itr.next() + except StopIteration: + pass + if file_num == 0: + raise qls.err.NoMoreFilesInJournalError(self.queue_name) + self.current_file_header = self.files[file_num] + self.first_rec_flag = True + print self.current_file_header + #print '[file_num=0x%x]' % self.current_file_header.file_num #DEBUG + def _get_next_record(self, high_rid_counter): + self._check_file() + this_record = Utils.load(self.current_file_header.file_handle, RecordHeader) + if not this_record.is_valid(self.current_file_header): + return False + if self.first_rec_flag: + if this_record.file_offset != self.current_file_header.first_record_offset: + raise qls.err.FirstRecordOffsetMismatchError(self.current_file_header, this_record) + self.first_rec_flag = False + high_rid_counter.check(this_record.record_id) + self.statistics.total_record_count += 1 + if isinstance(this_record, EnqueueRecord): + self._handle_enqueue_record(this_record) + print this_record + elif isinstance(this_record, DequeueRecord): + self._handle_dequeue_record(this_record) + print this_record + elif isinstance(this_record, TransactionRecord): + self._handle_transaction_record(this_record) + print this_record + else: + self.statistics.filler_record_count += 1 + Utils.skip(self.current_file_header.file_handle, Utils.DBLK_SIZE) + return True + def _handle_enqueue_record(self, enqueue_record): + while enqueue_record.load(self.current_file_header.file_handle): + self._get_next_file() + if enqueue_record.is_external() and enqueue_record.data != None: + raise qls.err.ExternalDataError(self.current_file_header, enqueue_record) + if enqueue_record.is_transient(): + self.statistics.transient_record_count += 1 + return + if enqueue_record.xid_size > 0: + self.txn_map.add(self.current_file_header, enqueue_record) + self.statistics.transaction_operation_count += 1 + self.statistics.transaction_record_count += 1 + self.statistics.transaction_enqueue_count += 1 + else: + self.enq_map.add(self.current_file_header, enqueue_record, False) + self.statistics.enqueue_count += 1 + #print enqueue_record, # DEBUG + def _handle_dequeue_record(self, dequeue_record): + while dequeue_record.load(self.current_file_header.file_handle): + self._get_next_file() + if dequeue_record.xid_size > 0: + if self.xid_prepared_list is None: # ie this is the TPL + dequeue_record.transaction_prepared_list_flag = True + self.txn_map.add(self.current_file_header, dequeue_record) + self.statistics.transaction_operation_count += 1 + self.statistics.transaction_record_count += 1 + self.statistics.transaction_dequeue_count += 1 + else: + try: + self.enq_map.delete(self.current_file_header, dequeue_record) + except qls.err.RecordIdNotFoundError: + pass # TODO: handle missing enqueue warning here + self.statistics.dequeue_count += 1 + #print dequeue_record, # DEBUG + def _handle_transaction_record(self, transaction_record): + while transaction_record.load(self.current_file_header.file_handle): + self._get_next_file() + if transaction_record.magic[-1] == 'a': + self.statistics.transaction_abort_count += 1 + else: + self.statistics.transaction_commit_count += 1 + if self.txn_map.contains(transaction_record.xid): + mismatched_rids = self.txn_map.delete(self.current_file_header, transaction_record) + if mismatched_rids != None and len(mismatched_rids) > 0: + self.warnings.append('WARNING: transactional dequeues not found in enqueue map; rids=%s' % + mismatched_rids) + else: + self.warnings.append('WARNING: %s not found in transaction map' % \ + Utils.format_xid(transaction_record.xid)) +# if transaction_record.magic[-1] == 'c': # commits only +# self._txn_obj_list[hdr.xid] = hdr + self.statistics.transaction_record_count += 1 + #print transaction_record, # DEBUG + def _load_data(self, record): + while not record.is_complete: + record.load(self.current_file_header.file_handle) + +class RecordHeader(object): + FORMAT = '<4s2H2Q' + def __init__(self, file_offset, magic, version, user_flags, serial, record_id): + self.file_offset = file_offset + self.magic = magic + self.version = version + self.user_flags = user_flags + self.serial = serial + self.record_id = record_id + def load(self, file_handle): + pass + @staticmethod + def discriminate(args): + """Use the last char in the header magic to determine the header type""" + return _CLASSES.get(args[1][-1], RecordHeader) + def is_empty(self): + """Return True if this record is empty (ie has a magic of 0x0000""" + return self.magic == '\x00'*4 + def is_valid(self, file_header): + """Check that this record is valid""" + if self.is_empty(): + return False + if self.magic[:3] != 'QLS' or self.magic[3] not in ['a', 'c', 'd', 'e', 'f', 'x']: + return False + if self.magic[-1] != 'x': + if self.version != Utils.RECORD_VERSION: + raise qls.err.InvalidRecordVersionError(file_header, self, Utils.RECORD_VERSION) + if self.serial != file_header.serial: + #print '[serial mismatch at 0x%x]' % self.file_offset #DEBUG + #print #DEBUG + return False + return True + def __str__(self): + """Return string representation of this header""" + if self.is_empty(): + return '0x%08x: <empty>' % (self.file_offset) + if self.magic[-1] == 'x': + return '0x%08x: [X]' % (self.file_offset) + if self.magic[-1] in ['a', 'c', 'd', 'e', 'f', 'x']: + return '0x%08x: [%c v=%d f=0x%04x rid=0x%x]' % \ + (self.file_offset, self.magic[-1].upper(), self.version, self.user_flags, self.record_id) + return '0x%08x: <error, unknown magic "%s" (possible overwrite boundary?)>' % (self.file_offset, self.magic) + +class RecordTail(object): + FORMAT = '<4sL2Q' + def __init__(self, file_handle): + self.file_offset = file_handle.tell() + self.complete = False + self.read_size = struct.calcsize(RecordTail.FORMAT) + self.fbin = file_handle.read(self.read_size) + if len(self.fbin) >= self.read_size: + self.complete = True + self.xmagic, self.checksum, self.serial, self.record_id = struct.unpack(RecordTail.FORMAT, self.fbin) + def load(self, file_handle): + """Used to continue load of RecordTail object if it is split between files""" + if not self.is_complete: + self.fbin += file_handle.read(self.read_size - len(self.fbin)) + if (len(self.fbin)) >= self.read_size: + self.complete = True + self.xmagic, self.checksum, self.serial, self.record_id = struct.unpack(RecordTail.FORMAT, self.fbin) + def is_complete(self): + return self.complete + def __str__(self): + """Return a string representation of the this RecordTail instance""" + magic = Utils.inv_str(self.xmagic) + return '[%c cs=0x%x rid=0x%x]' % (magic[-1].upper(), self.checksum, self.record_id) + +class FileHeader(RecordHeader): + FORMAT = '<2H4x5QH' + def init(self, file_handle, file_offset, file_header_size_sblks, partition_num, efp_data_size_kb, + first_record_offset, timestamp_sec, timestamp_ns, file_num, queue_name_len): + self.file_handle = file_handle + self.file_header_size_sblks = file_header_size_sblks + self.partition_num = partition_num + self.efp_data_size_kb = efp_data_size_kb + self.first_record_offset = first_record_offset + self.timestamp_sec = timestamp_sec + self.timestamp_ns = timestamp_ns + self.file_num = file_num + self.queue_name_len = queue_name_len + self.queue_name = None + def load(self, file_handle): + self.queue_name = file_handle.read(self.queue_name_len) + def get_file_size(self): + """Sum of file header size and data size""" + return (self.file_header_size_sblks * Utils.SBLK_SIZE) + (self.efp_data_size_kb * 1024) + def is_end_of_file(self): + return self.file_handle.tell() >= self.get_file_size() + def timestamp_str(self): + """Get the timestamp of this record in string format""" + time = gmtime(self.timestamp_sec) + fstr = '%%a %%b %%d %%H:%%M:%%S.%09d %%Y' % (self.timestamp_ns) + return strftime(fstr, time) + def __str__(self): + """Return a string representation of the this FileHeader instance""" + return '%s fnum=%d fro=0x%08x p=%d s=%dk t=%s' % (RecordHeader.__str__(self), self.file_num, self.first_record_offset, + self.partition_num, self.efp_data_size_kb, self.timestamp_str()) + +class EnqueueRecord(RecordHeader): + FORMAT = '<2Q' + EXTERNAL_FLAG_MASK = 0x20 + TRANSIENT_FLAG_MASK = 0x10 + def init(self, file_offset, xid_size, data_size): + self.xid_size = xid_size + self.data_size = data_size + self.xid = None + self.xid_complete = False + self.data = None + self.data_complete = False + self.record_tail = None + def is_external(self): + return self.user_flags & EnqueueRecord.EXTERNAL_FLAG_MASK > 0 + def is_transient(self): + return self.user_flags & EnqueueRecord.TRANSIENT_FLAG_MASK > 0 + def load(self, file_handle): + """Return True when load is incomplete and must be called again with new file handle""" + self.xid, self.xid_complete = Utils.load_data(file_handle, self.xid, self.xid_size) + if not self.xid_complete: + return True + if self.is_external(): + self.data_complete = True + else: + self.data, self.data_complete = Utils.load_data(file_handle, self.data, self.data_size) + if not self.data_complete: + return True + if self.xid_size > 0 or self.data_size > 0: + if self.record_tail is None: + self.record_tail = RecordTail(file_handle) + elif not self.record_tail.is_complete(): + self.record_tail.load(file_handle) # Continue loading partially loaded tail + return not self.record_tail.is_complete() + return False + def _print_flags(self): + """Utility function to decode the flags field in the header and print a string representation""" + fstr = '' + if self.is_transient(): + fstr = '[TRANSIENT' + if self.is_external(): + if len(fstr) > 0: + fstr += ',EXTERNAL' + else: + fstr = '*EXTERNAL' + if len(fstr) > 0: + fstr += ']' + return fstr + def __str__(self): + """Return a string representation of the this EnqueueRecord instance""" + if self.record_tail is None: + record_tail_str = '' + else: + record_tail_str = str(self.record_tail) + return '%s %s %s %s %s' % (RecordHeader.__str__(self), Utils.format_xid(self.xid, self.xid_size), + Utils.format_data(self.data_size, self.data), record_tail_str, self._print_flags()) + +class DequeueRecord(RecordHeader): + FORMAT = '<2Q' + TXN_COMPLETE_COMMIT_FLAG = 0x10 + def init(self, file_offset, dequeue_record_id, xid_size): + self.dequeue_record_id = dequeue_record_id + self.xid_size = xid_size + self.transaction_prepared_list_flag = False + self.xid = None + self.xid_complete = False + self.record_tail = None + def is_transaction_complete_commit(self): + return self.user_flags & DequeueRecord.TXN_COMPLETE_COMMIT_FLAG > 0 + def load(self, file_handle): + """Return True when load is incomplete and must be called again with new file handle""" + self.xid, self.xid_complete = Utils.load_data(file_handle, self.xid, self.xid_size) + if not self.xid_complete: + return True + if self.xid_size > 0: + if self.record_tail is None: + self.record_tail = RecordTail(file_handle) + elif not self.record_tail.is_complete(): + self.record_tail.load(file_handle) + return not self.record_tail.is_complete() + return False + def _print_flags(self): + """Utility function to decode the flags field in the header and print a string representation""" + if self.transaction_prepared_list_flag: + if self.is_transaction_complete_commit(): + return '[COMMIT]' + else: + return '[ABORT]' + return '' + def __str__(self): + """Return a string representation of the this DequeueRecord instance""" + if self.record_tail is None: + record_tail_str = '' + else: + record_tail_str = str(self.record_tail) + return '%s %s drid=0x%x %s %s' % (RecordHeader.__str__(self), Utils.format_xid(self.xid, self.xid_size), + self.dequeue_record_id, record_tail_str, self._print_flags()) + +class TransactionRecord(RecordHeader): + FORMAT = '<Q' + def init(self, file_offset, xid_size): + self.xid_size = xid_size + self.xid = None + self.xid_complete = False + self.record_tail = None + def load(self, file_handle): + """Return True when load is incomplete and must be called again with new file handle""" + self.xid, self.xid_complete = Utils.load_data(file_handle, self.xid, self.xid_size) + if not self.xid_complete: + return True + if self.xid_size > 0: + if self.record_tail is None: + self.record_tail = RecordTail(file_handle) + elif not self.record_tail.is_complete(): + self.record_tail.load(file_handle) + return not self.record_tail.is_complete() + return False + def __str__(self): + """Return a string representation of the this TransactionRecord instance""" + if self.record_tail is None: + record_tail_str = '' + else: + record_tail_str = str(self.record_tail) + return '%s %s %s' % (RecordHeader.__str__(self), Utils.format_xid(self.xid, self.xid_size), record_tail_str) + +class Utils(object): + """Class containing utility functions for dealing with the journal""" + DBLK_SIZE = 128 + RECORD_VERSION = 2 + SBLK_SIZE = 4096 + __printchars = '0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ!"#$%&\'()*+,-./:;<=>?@[\\]^_`{|}~ ' + @staticmethod + def format_data(dsize, data): + """Format binary data for printing""" + if data == None: + return '' + # << DEBUG >> + begin = data.find('msg') + end = data.find('\0', begin) + return 'data="%s"' % data[begin:end] + # << END DEBUG + if Utils._is_printable(data): + datastr = Utils._split_str(data) + else: + datastr = Utils._hex_split_str(data) + if dsize != len(data): + raise qls.err.DataSizeError(dsize, len(data), datastr) + return 'data(%d)="%s"' % (dsize, datastr) + @staticmethod + def format_xid(xid, xidsize=None): + """Format binary XID for printing""" + if xid == None and xidsize != None: + if xidsize > 0: + raise qls.err.XidSizeError(xidsize, 0, None) + return '' + if Utils._is_printable(xid): + xidstr = '"%s"' % Utils._split_str(xid) + else: + xidstr = '0x%s' % Utils._hex_split_str(xid) + if xidsize == None: + xidsize = len(xid) + elif xidsize != len(xid): + raise qls.err.XidSizeError(xidsize, len(xid), xidstr) + return 'xid(%d)=%s' % (xidsize, xidstr) + @staticmethod + def inv_str(in_string): + """Perform a binary 1's compliment (invert all bits) on a binary string""" + istr = '' + for index in range(0, len(in_string)): + istr += chr(~ord(in_string[index]) & 0xff) + return istr + @staticmethod + def load(file_handle, klass): + """Load a record of class klass from a file""" + args = Utils.load_args(file_handle, klass) + subclass = klass.discriminate(args) + result = subclass(*args) # create instance of record + if subclass != klass: + result.init(*Utils.load_args(file_handle, subclass)) + return result + @staticmethod + def load_args(file_handle, klass): + """Load the arguments from class klass""" + size = struct.calcsize(klass.FORMAT) + foffs = file_handle.tell(), + fbin = file_handle.read(size) + if len(fbin) != size: + raise qls.err.UnexpectedEndOfFileError(len(fbin), size, foffs, file_handle.name) + return foffs + struct.unpack(klass.FORMAT, fbin) + @staticmethod + def load_data(file_handle, element, element_size): + if element_size == 0: + return element, True + if element is None: + element = file_handle.read(element_size) + else: + read_size = element_size - len(element) + element += file_handle.read(read_size) + return element, len(element) == element_size + @staticmethod + def skip(file_handle, boundary): + """Read and discard disk bytes until the next multiple of boundary""" + file_handle.read(Utils._rem_bytes_in_block(file_handle, boundary)) + #--- protected functions --- + @staticmethod + def _hex_str(in_str, begin, end): + """Return a binary string as a hex string""" + hstr = '' + for index in range(begin, end): + if Utils._is_printable(in_str[index]): + hstr += in_str[index] + else: + hstr += '\\%02x' % ord(in_str[index]) + return hstr + @staticmethod + def _hex_split_str(in_str, split_size = 50): + """Split a hex string into two parts separated by an ellipsis""" +# if len(in_str) <= split_size: +# return Utils._hex_str(in_str, 0, len(in_str)) +# return Utils._hex_str(in_str, 0, 10) + ' ... ' + Utils._hex_str(in_str, len(in_str)-10, len(in_str)) + return ''.join(x.encode('hex') for x in reversed(in_str)) + @staticmethod + def _is_printable(in_str): + """Return True if in_str in printable; False otherwise.""" + return in_str.strip(Utils.__printchars) == '' + @staticmethod + def _rem_bytes_in_block(file_handle, block_size): + """Return the remaining bytes in a block""" + foffs = file_handle.tell() + return (Utils._size_in_blocks(foffs, block_size) * block_size) - foffs + @staticmethod + def _size_in_blocks(size, block_size): + """Return the size in terms of data blocks""" + return int((size + block_size - 1) / block_size) + @staticmethod + def _split_str(in_str, split_size = 50): + """Split a string into two parts separated by an ellipsis if it is longer than split_size""" + if len(in_str) < split_size: + return in_str + return in_str[:25] + ' ... ' + in_str[-25:] + +# ============================================================================= + +_CLASSES = { + 'a': TransactionRecord, + 'c': TransactionRecord, + 'd': DequeueRecord, + 'e': EnqueueRecord, +} + +if __name__ == '__main__': + print 'This is a library, and cannot be executed.' diff --git a/qpid/tools/src/py/qpid_qls_analyze.py b/qpid/tools/src/py/qpid_qls_analyze.py new file mode 100755 index 0000000000..b00a91cfca --- /dev/null +++ b/qpid/tools/src/py/qpid_qls_analyze.py @@ -0,0 +1,80 @@ +#!/usr/bin/env python +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import argparse +import os +import os.path +from qls import efp +from qls import jrnl + +class QqpdLinearStoreAnalyzer(object): + """ + Top-level store analyzer. Will analyze the directory in args.qls_dir as the top-level Qpid Linear Store (QLS) + directory. The following may be analyzed: + * The Empty File Pool (if --efp is specified in the arguments) + * The Linear Store + * The Transaction Prepared List (TPL) + """ + QLS_ANALYZE_VERSION = '0.1' + def __init__(self): + self.args = None + self._process_args() + self.efp_manager = efp.EfpManager(self.args.qls_dir) + self.jrnl_recovery_mgr = jrnl.JournalRecoveryManager(self.args.qls_dir) + def _analyze_efp(self): + self.efp_manager.run(self.args) + def _analyze_journals(self): + self.jrnl_recovery_mgr.run(self.args) + def _process_args(self): + parser = argparse.ArgumentParser(description = 'Qpid Linear Store Analyzer') + parser.add_argument('qls_dir', metavar='DIR', + help='Qpid Linear Store (QLS) directory to be analyzed') + parser.add_argument('--efp', action='store_true', + help='Analyze the Emtpy File Pool (EFP) and show stats') + parser.add_argument('--stats', action='store_true', + help='Print journal record stats') + parser.add_argument('--txn', action='store_true', + help='Reconcile incomplete transactions') + parser.add_argument('--version', action='version', version='%(prog)s ' + + QqpdLinearStoreAnalyzer.QLS_ANALYZE_VERSION) + self.args = parser.parse_args() + if not os.path.exists(self.args.qls_dir): + parser.error('Journal path "%s" does not exist' % self.args.qls_dir) + def report(self): + if self.args.efp: + self.efp_manager.report() + self.jrnl_recovery_mgr.report(self.args.stats) + def run(self): + if self.args.efp: + self._analyze_efp() + self._analyze_journals() + +#============================================================================== +# main program +#============================================================================== + +if __name__ == "__main__": + # TODO: Remove this in due course + print 'WARNING: This program is still a work in progress and is largely untested.' + print '* USE AT YOUR OWN RISK *' + print + M = QqpdLinearStoreAnalyzer() + M.run() + M.report() |
