diff options
Diffstat (limited to 'qpid/cpp/management/python/lib/qlslibs')
| -rw-r--r-- | qpid/cpp/management/python/lib/qlslibs/__init__.py | 19 | ||||
| -rw-r--r-- | qpid/cpp/management/python/lib/qlslibs/analyze.py | 606 | ||||
| -rw-r--r-- | qpid/cpp/management/python/lib/qlslibs/efp.py | 327 | ||||
| -rw-r--r-- | qpid/cpp/management/python/lib/qlslibs/err.py | 261 | ||||
| -rw-r--r-- | qpid/cpp/management/python/lib/qlslibs/jrnl.py | 394 | ||||
| -rw-r--r-- | qpid/cpp/management/python/lib/qlslibs/utils.py | 216 |
6 files changed, 0 insertions, 1823 deletions
diff --git a/qpid/cpp/management/python/lib/qlslibs/__init__.py b/qpid/cpp/management/python/lib/qlslibs/__init__.py deleted file mode 100644 index d8a500d9d8..0000000000 --- a/qpid/cpp/management/python/lib/qlslibs/__init__.py +++ /dev/null @@ -1,19 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - diff --git a/qpid/cpp/management/python/lib/qlslibs/analyze.py b/qpid/cpp/management/python/lib/qlslibs/analyze.py deleted file mode 100644 index 8c5de05b9e..0000000000 --- a/qpid/cpp/management/python/lib/qlslibs/analyze.py +++ /dev/null @@ -1,606 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -""" -Module: qlslibs.analyze - -Classes for recovery and analysis of a Qpid Linear Store (QLS). -""" - -import os.path -import qlslibs.err -import qlslibs.jrnl -import qlslibs.utils - -class HighCounter(object): - def __init__(self): - self.num = 0 - def check(self, num): - if self.num < num: - self.num = num - def get(self): - return self.num - def get_next(self): - self.num += 1 - return self.num - -class JournalRecoveryManager(object): - TPL_DIR_NAME = 'tpl2' - JRNL_DIR_NAME = 'jrnl2' - def __init__(self, directory, args): - if not os.path.exists(directory): - raise qlslibs.err.InvalidQlsDirectoryNameError(directory) - self.directory = directory - self.args = args - self.tpl = None - self.journals = {} - self.high_rid_counter = HighCounter() - self.prepared_list = None - def report(self): - self._reconcile_transactions(self.prepared_list, self.args.txn) - if self.tpl is not None: - self.tpl.report(self.args) - for queue_name in sorted(self.journals.keys()): - self.journals[queue_name].report(self.args) - def run(self): - tpl_dir = os.path.join(self.directory, JournalRecoveryManager.TPL_DIR_NAME) - if os.path.exists(tpl_dir): - self.tpl = Journal(tpl_dir, None, self.args) - self.tpl.recover(self.high_rid_counter) - if self.args.show_recovery_recs or self.args.show_all_recs: - print - jrnl_dir = os.path.join(self.directory, JournalRecoveryManager.JRNL_DIR_NAME) - self.prepared_list = self.tpl.txn_map.get_prepared_list() if self.tpl is not None else {} - if os.path.exists(jrnl_dir): - for dir_entry in sorted(os.listdir(jrnl_dir)): - jrnl = Journal(os.path.join(jrnl_dir, dir_entry), self.prepared_list, self.args) - jrnl.recover(self.high_rid_counter) - self.journals[jrnl.get_queue_name()] = jrnl - if self.args.show_recovery_recs or self.args.show_all_recs: - print - print - def _reconcile_transactions(self, prepared_list, txn_flag): - print 'Transaction reconciliation report:' - print '==================================' - print 'Transaction Prepared List (TPL) contains %d open transaction(s):' % len(prepared_list) - for xid in prepared_list.keys(): - commit_flag = prepared_list[xid] - if commit_flag is None: - status = '[Prepared, neither committed nor aborted - assuming commit]' - elif commit_flag: - status = '[Prepared, but interrupted during commit phase]' - else: - status = '[Prepared, but interrupted during abort phase]' - print ' ', qlslibs.utils.format_xid(xid), status - if prepared_list[xid] is None: # Prepared, but not committed or aborted - enqueue_record = self.tpl.get_txn_map_record(xid)[0][1] - dequeue_record = qlslibs.utils.create_record(qlslibs.jrnl.DequeueRecord.MAGIC, \ - qlslibs.jrnl.DequeueRecord.TXN_COMPLETE_COMMIT_FLAG, \ - self.tpl.current_journal_file, \ - self.high_rid_counter.get_next(), \ - enqueue_record.record_id, xid, None) - if txn_flag: - self.tpl.add_record(dequeue_record) - print - print 'Open transactions found in queues:' - print '----------------------------------' - for queue_name in sorted(self.journals.keys()): - self.journals[queue_name].reconcile_transactions(prepared_list, txn_flag) - print - if len(prepared_list) > 0: - print 'Creating commit records for the following prepared transactions in TPL:' - for xid in prepared_list.keys(): - print ' ', qlslibs.utils.format_xid(xid) - transaction_record = qlslibs.utils.create_record(qlslibs.jrnl.TransactionRecord.MAGIC_COMMIT, 0, \ - self.tpl.current_journal_file, \ - self.high_rid_counter.get_next(), None, xid, None) - if txn_flag: - self.tpl.add_record(transaction_record) - print - -class EnqueueMap(object): - """ - Map of enqueued records in a QLS journal - """ - def __init__(self, journal): - self.journal = journal - self.enq_map = {} - def add(self, journal_file, enq_record, locked_flag): - if enq_record.record_id in self.enq_map: - raise qlslibs.err.DuplicateRecordIdError(self.journal.current_file_header, enq_record) - self.enq_map[enq_record.record_id] = [journal_file, enq_record, locked_flag] - def contains(self, rid): - """Return True if the map contains the given rid""" - return rid in self.enq_map - def delete(self, journal_file, deq_record): - if deq_record.dequeue_record_id in self.enq_map: - enq_list = self.enq_map[deq_record.dequeue_record_id] - del self.enq_map[deq_record.dequeue_record_id] - return enq_list - else: - raise qlslibs.err.RecordIdNotFoundError(journal_file.file_header, deq_record) - def get(self, record_id): - if record_id in self.enq_map: - return self.enq_map[record_id] - return None - def lock(self, journal_file, dequeue_record): - if dequeue_record.dequeue_record_id not in self.enq_map: - raise qlslibs.err.RecordIdNotFoundError(journal_file.file_header, dequeue_record) - self.enq_map[dequeue_record.dequeue_record_id][2] = True - def report_str(self, args): - """Return a string containing a text report for all records in the map""" - if len(self.enq_map) == 0: - return 'No enqueued records found.' - rstr = '%d enqueued records found' % len(self.enq_map) - if args.show_recovered_recs: - rstr += ":" - rid_list = self.enq_map.keys() - rid_list.sort() - for rid in rid_list: - journal_file, record, locked_flag = self.enq_map[rid] - rstr += '\n 0x%x:' % journal_file.file_header.file_num - rstr += record.to_string(args.show_xids, args.show_data, args.txtest) - if locked_flag: - rstr += ' [LOCKED]' - else: - rstr += '.' - return rstr - def unlock(self, journal_file, dequeue_record): - """Set the transaction lock for a given record_id to False""" - if dequeue_record.dequeue_record_id in self.enq_map: - if self.enq_map[dequeue_record.dequeue_record_id][2]: - self.enq_map[dequeue_record.dequeue_record_id][2] = False - else: - raise qlslibs.err.RecordNotLockedError(journal_file.file_header, dequeue_record) - else: - raise qlslibs.err.RecordIdNotFoundError(journal_file.file_header, dequeue_record) - -class TransactionMap(object): - """ - Map of open transactions used while recovering a QLS journal - """ - def __init__(self, enq_map): - self.txn_map = {} - self.enq_map = enq_map - def abort(self, xid): - """Perform an abort operation for the given xid record""" - for journal_file, record, _ in self.txn_map[xid]: - if isinstance(record, qlslibs.jrnl.DequeueRecord): - if self.enq_map.contains(record.dequeue_record_id): - self.enq_map.unlock(journal_file, record) - else: - journal_file.decr_enq_cnt(record) - del self.txn_map[xid] - def add(self, journal_file, record): - if record.xid is None: - raise qlslibs.err.NonTransactionalRecordError(journal_file.file_header, record, 'TransactionMap.add()') - if isinstance(record, qlslibs.jrnl.DequeueRecord): - try: - self.enq_map.lock(journal_file, record) - except qlslibs.err.RecordIdNotFoundError: - # Not in emap, look for rid in tmap - should not happen in practice - txn_op = self._find_record_id(record.xid, record.dequeue_record_id) - if txn_op != None: - if txn_op[2]: - raise qlslibs.err.AlreadyLockedError(journal_file.file_header, record) - txn_op[2] = True - if record.xid in self.txn_map: - self.txn_map[record.xid].append([journal_file, record, False]) # append to existing list - else: - self.txn_map[record.xid] = [[journal_file, record, False]] # create new list - def commit(self, xid): - """Perform a commit operation for the given xid record""" - mismatch_list = [] - for journal_file, record, lock in self.txn_map[xid]: - if isinstance(record, qlslibs.jrnl.EnqueueRecord): - self.enq_map.add(journal_file, record, lock) # Transfer enq to emap - else: - if self.enq_map.contains(record.dequeue_record_id): - self.enq_map.unlock(journal_file, record) - self.enq_map.delete(journal_file, record)[0].decr_enq_cnt(record) - else: - mismatch_list.append('0x%x' % record.dequeue_record_id) - del self.txn_map[xid] - return mismatch_list - def contains(self, xid): - """Return True if the xid exists in the map; False otherwise""" - return xid in self.txn_map - def delete(self, journal_file, transaction_record): - """Remove a transaction record from the map using either a commit or abort header""" - if transaction_record.magic[-1] == 'c': - return self.commit(transaction_record.xid) - if transaction_record.magic[-1] == 'a': - self.abort(transaction_record.xid) - else: - raise qlslibs.err.InvalidRecordTypeError(journal_file.file_header, transaction_record, - 'delete from Transaction Map') - def get(self, xid): - if xid in self.txn_map: - return self.txn_map[xid] - return None - def get_prepared_list(self): - """ - Prepared list is a map of xid(key) to one of None, True or False. These represent respectively: - None: prepared, but neither committed or aborted (interrupted before commit or abort) - False: prepared and aborted (interrupted before abort complete) - True: prepared and committed (interrupted before commit complete) - """ - prepared_list = {} - for xid in self.get_xid_list(): - for _, record, _ in self.txn_map[xid]: - if isinstance(record, qlslibs.jrnl.EnqueueRecord): - prepared_list[xid] = None - else: - prepared_list[xid] = record.is_transaction_complete_commit() - return prepared_list - def get_xid_list(self): - return self.txn_map.keys() - def report_str(self, args): - """Return a string containing a text report for all records in the map""" - if len(self.txn_map) == 0: - return 'No outstanding transactions found.' - rstr = '%d outstanding transaction(s)' % len(self.txn_map) - if args.show_recovered_recs: - rstr += ':' - for xid, op_list in self.txn_map.iteritems(): - rstr += '\n %s containing %d operations:' % (qlslibs.utils.format_xid(xid), len(op_list)) - for journal_file, record, _ in op_list: - rstr += '\n 0x%x:' % journal_file.file_header.file_num - rstr += record.to_string(args.show_xids, args.show_data, args.txtest) - else: - rstr += '.' - return rstr - def _find_record_id(self, xid, record_id): - """ Search for and return map list with supplied rid.""" - if xid in self.txn_map: - for txn_op in self.txn_map[xid]: - if txn_op[1].record_id == record_id: - return txn_op - for this_xid in self.txn_map.iterkeys(): - for txn_op in self.txn_map[this_xid]: - if txn_op[1].record_id == record_id: - return txn_op - return None - -class JournalStatistics(object): - """Journal statistics""" - def __init__(self): - self.total_record_count = 0 - self.transient_record_count = 0 - self.filler_record_count = 0 - self.enqueue_count = 0 - self.dequeue_count = 0 - self.transaction_record_count = 0 - self.transaction_enqueue_count = 0 - self.transaction_dequeue_count = 0 - self.transaction_commit_count = 0 - self.transaction_abort_count = 0 - self.transaction_operation_count = 0 - def __str__(self): - fstr = 'Total record count: %d\n' + \ - 'Transient record count: %d\n' + \ - 'Filler_record_count: %d\n' + \ - 'Enqueue_count: %d\n' + \ - 'Dequeue_count: %d\n' + \ - 'Transaction_record_count: %d\n' + \ - 'Transaction_enqueue_count: %d\n' + \ - 'Transaction_dequeue_count: %d\n' + \ - 'Transaction_commit_count: %d\n' + \ - 'Transaction_abort_count: %d\n' + \ - 'Transaction_operation_count: %d\n' - return fstr % (self.total_record_count, - self.transient_record_count, - self.filler_record_count, - self.enqueue_count, - self.dequeue_count, - self.transaction_record_count, - self.transaction_enqueue_count, - self.transaction_dequeue_count, - self.transaction_commit_count, - self.transaction_abort_count, - self.transaction_operation_count) - -class Journal(object): - """ - Instance of a Qpid Linear Store (QLS) journal. - """ - JRNL_SUFFIX = 'jrnl' - def __init__(self, directory, xid_prepared_list, args): - self.directory = directory - self.queue_name = os.path.basename(directory) - self.files = {} - self.file_num_list = None - self.file_num_itr = None - self.enq_map = EnqueueMap(self) - self.txn_map = TransactionMap(self.enq_map) - self.current_journal_file = None - self.first_rec_flag = None - self.statistics = JournalStatistics() - self.xid_prepared_list = xid_prepared_list # This is None for the TPL instance only - self.args = args - self.last_record_offset = None # TODO: Move into JournalFile - self.num_filler_records_required = None # TODO: Move into JournalFile - self.fill_to_offset = None - def add_record(self, record): - """Used for reconciling transactions only - called from JournalRecoveryManager._reconcile_transactions()""" - if isinstance(record, qlslibs.jrnl.EnqueueRecord) or isinstance(record, qlslibs.jrnl.DequeueRecord): - if record.xid_size > 0: - self.txn_map.add(self.current_journal_file, record) - else: - self.enq_map.add(self.current_journal_file, record, False) - elif isinstance(record, qlslibs.jrnl.TransactionRecord): - self.txn_map.delete(self.current_journal_file, record) - else: - raise qlslibs.err.InvalidRecordTypeError(self.current_journal_file, record, 'add to Journal') - def get_enq_map_record(self, rid): - return self.enq_map.get(rid) - def get_txn_map_record(self, xid): - return self.txn_map.get(xid) - def get_outstanding_txn_list(self): - return self.txn_map.get_xid_list() - def get_queue_name(self): - return self.queue_name - def recover(self, high_rid_counter): - print 'Recovering %s...' % self.queue_name, - self._analyze_files() - try: - while self._get_next_record(high_rid_counter): - pass - self._check_alignment() - except qlslibs.err.NoMoreFilesInJournalError: - print 'No more files in journal' - except qlslibs.err.FirstRecordOffsetMismatchError as err: - print '0x%08x: **** FRO ERROR: queue=\"%s\" fid=0x%x fro actual=0x%08x expected=0x%08x' % \ - (err.get_expected_fro(), err.get_queue_name(), err.get_file_number(), err.get_record_offset(), - err.get_expected_fro()) - print 'done' - def reconcile_transactions(self, prepared_list, txn_flag): - xid_list = self.txn_map.get_xid_list() - if len(xid_list) > 0: - print self.queue_name, 'contains', len(xid_list), 'open transaction(s):' - for xid in xid_list: - if xid in prepared_list.keys(): - commit_flag = prepared_list[xid] - if commit_flag is None: - print ' ', qlslibs.utils.format_xid(xid), '- Assuming commit after prepare' - if txn_flag: - self.txn_map.commit(xid) - elif commit_flag: - print ' ', qlslibs.utils.format_xid(xid), '- Completing interrupted commit operation' - if txn_flag: - self.txn_map.commit(xid) - else: - print ' ', qlslibs.utils.format_xid(xid), '- Completing interrupted abort operation' - if txn_flag: - self.txn_map.abort(xid) - else: - print ' ', qlslibs.utils.format_xid(xid), '- Ignoring, not in prepared transaction list' - if txn_flag: - self.txn_map.abort(xid) - def report(self, args): - print 'Journal "%s":' % self.queue_name - print '=' * (11 + len(self.queue_name)) - if args.stats: - print str(self.statistics) - print self.enq_map.report_str(args) - print self.txn_map.report_str(args) - JournalFile.report_header() - for file_num in sorted(self.files.keys()): - self.files[file_num].report() - #TODO: move this to JournalFile, append to file info - if self.num_filler_records_required is not None and self.fill_to_offset is not None: - print '0x%x:0x%08x: %d filler records required for DBLK alignment to 0x%08x' % \ - (self.current_journal_file.file_header.file_num, self.last_record_offset, - self.num_filler_records_required, self.fill_to_offset) - print - #--- protected functions --- - def _analyze_files(self): - for dir_entry in os.listdir(self.directory): - dir_entry_bits = dir_entry.split('.') - if len(dir_entry_bits) == 2 and dir_entry_bits[1] == Journal.JRNL_SUFFIX: - fq_file_name = os.path.join(self.directory, dir_entry) - file_handle = open(fq_file_name) - args = qlslibs.utils.load_args(file_handle, qlslibs.jrnl.RecordHeader) - file_hdr = qlslibs.jrnl.FileHeader(*args) - file_hdr.init(file_handle, *qlslibs.utils.load_args(file_handle, qlslibs.jrnl.FileHeader)) - if file_hdr.is_header_valid(file_hdr): - file_hdr.load(file_handle) - if file_hdr.is_valid(False): - qlslibs.utils.skip(file_handle, - file_hdr.file_header_size_sblks * qlslibs.utils.DEFAULT_SBLK_SIZE) - self.files[file_hdr.file_num] = JournalFile(file_hdr) - self.file_num_list = sorted(self.files.keys()) - self.file_num_itr = iter(self.file_num_list) - def _check_alignment(self): # TODO: Move into JournalFile - if self.last_record_offset is None: # Empty file, _check_file() never run - return - remaining_sblks = self.last_record_offset % qlslibs.utils.DEFAULT_SBLK_SIZE - if remaining_sblks == 0: - self.num_filler_records_required = 0 - else: - self.num_filler_records_required = (qlslibs.utils.DEFAULT_SBLK_SIZE - remaining_sblks) / \ - qlslibs.utils.DEFAULT_DBLK_SIZE - self.fill_to_offset = self.last_record_offset + \ - (self.num_filler_records_required * qlslibs.utils.DEFAULT_DBLK_SIZE) - if self.args.show_recovery_recs or self.args.show_all_recs: - print '0x%x:0x%08x: %d filler records required for DBLK alignment to 0x%08x' % \ - (self.current_journal_file.file_header.file_num, self.last_record_offset, - self.num_filler_records_required, self.fill_to_offset) - def _check_file(self): - if self.current_journal_file is not None: - if not self.current_journal_file.file_header.is_end_of_file(): - return True - if self.current_journal_file.file_header.is_end_of_file(): - self.last_record_offset = self.current_journal_file.file_header.file_handle.tell() - if not self._get_next_file(): - return False - fhdr = self.current_journal_file.file_header - fhdr.file_handle.seek(fhdr.first_record_offset) - return True - def _get_next_file(self): - if self.current_journal_file is not None: - file_handle = self.current_journal_file.file_header.file_handle - if not file_handle.closed: # sanity check, should not be necessary - file_handle.close() - file_num = 0 - try: - while file_num == 0: - file_num = self.file_num_itr.next() - except StopIteration: - pass - if file_num == 0: - return False - self.current_journal_file = self.files[file_num] - self.first_rec_flag = True - if self.args.show_recovery_recs or self.args.show_all_recs: - file_header = self.current_journal_file.file_header - print '0x%x:%s' % (file_header.file_num, file_header.to_string()) - return True - def _get_next_record(self, high_rid_counter): - if not self._check_file(): - return False - self.last_record_offset = self.current_journal_file.file_header.file_handle.tell() - this_record = qlslibs.utils.load(self.current_journal_file.file_header.file_handle, qlslibs.jrnl.RecordHeader) - if not this_record.is_header_valid(self.current_journal_file.file_header): - return False - if self.first_rec_flag: - if this_record.file_offset != self.current_journal_file.file_header.first_record_offset: - raise qlslibs.err.FirstRecordOffsetMismatchError(self.current_journal_file.file_header, this_record) - self.first_rec_flag = False - self.statistics.total_record_count += 1 - start_journal_file = self.current_journal_file - if isinstance(this_record, qlslibs.jrnl.EnqueueRecord): - ok_flag = self._handle_enqueue_record(this_record, start_journal_file) - high_rid_counter.check(this_record.record_id) - if self.args.show_recovery_recs or self.args.show_all_recs: - print '0x%x:%s' % (start_journal_file.file_header.file_num, \ - this_record.to_string(self.args.show_xids, self.args.show_data, self.args.txtest)) - elif isinstance(this_record, qlslibs.jrnl.DequeueRecord): - ok_flag = self._handle_dequeue_record(this_record, start_journal_file) - high_rid_counter.check(this_record.record_id) - if self.args.show_recovery_recs or self.args.show_all_recs: - print '0x%x:%s' % (start_journal_file.file_header.file_num, this_record.to_string(self.args.show_xids, None, None)) - elif isinstance(this_record, qlslibs.jrnl.TransactionRecord): - ok_flag = self._handle_transaction_record(this_record, start_journal_file) - high_rid_counter.check(this_record.record_id) - if self.args.show_recovery_recs or self.args.show_all_recs: - print '0x%x:%s' % (start_journal_file.file_header.file_num, this_record.to_string(self.args.show_xids, None, None)) - else: - self.statistics.filler_record_count += 1 - ok_flag = True - if self.args.show_all_recs: - print '0x%x:%s' % (start_journal_file.file_header.file_num, this_record) - qlslibs.utils.skip(self.current_journal_file.file_header.file_handle, qlslibs.utils.DEFAULT_DBLK_SIZE) - return ok_flag - def _handle_enqueue_record(self, enqueue_record, start_journal_file): - while enqueue_record.load(self.current_journal_file.file_header.file_handle): - if not self._get_next_file(): - enqueue_record.truncated_flag = True - return False - if not enqueue_record.is_valid(start_journal_file): - return False - if enqueue_record.is_external() and enqueue_record.data != None: - raise qlslibs.err.ExternalDataError(self.current_journal_file.file_header, enqueue_record) - if enqueue_record.is_transient(): - self.statistics.transient_record_count += 1 - return True - if enqueue_record.xid_size > 0: - self.txn_map.add(start_journal_file, enqueue_record) - self.statistics.transaction_operation_count += 1 - self.statistics.transaction_record_count += 1 - self.statistics.transaction_enqueue_count += 1 - else: - self.enq_map.add(start_journal_file, enqueue_record, False) - start_journal_file.incr_enq_cnt() - self.statistics.enqueue_count += 1 - return True - def _handle_dequeue_record(self, dequeue_record, start_journal_file): - while dequeue_record.load(self.current_journal_file.file_header.file_handle): - if not self._get_next_file(): - dequeue_record.truncated_flag = True - return False - if not dequeue_record.is_valid(start_journal_file): - return False - if dequeue_record.xid_size > 0: - if self.xid_prepared_list is None: # ie this is the TPL - dequeue_record.transaction_prepared_list_flag = True - elif not self.enq_map.contains(dequeue_record.dequeue_record_id): - dequeue_record.warnings.append('NOT IN EMAP') # Only for non-TPL records - self.txn_map.add(start_journal_file, dequeue_record) - self.statistics.transaction_operation_count += 1 - self.statistics.transaction_record_count += 1 - self.statistics.transaction_dequeue_count += 1 - else: - try: - self.enq_map.delete(start_journal_file, dequeue_record)[0].decr_enq_cnt(dequeue_record) - except qlslibs.err.RecordIdNotFoundError: - dequeue_record.warnings.append('NOT IN EMAP') - self.statistics.dequeue_count += 1 - return True - def _handle_transaction_record(self, transaction_record, start_journal_file): - while transaction_record.load(self.current_journal_file.file_header.file_handle): - if not self._get_next_file(): - transaction_record.truncated_flag = True - return False - if not transaction_record.is_valid(start_journal_file): - return False - if transaction_record.magic[-1] == 'a': # Abort - self.statistics.transaction_abort_count += 1 - elif transaction_record.magic[-1] == 'c': # Commit - self.statistics.transaction_commit_count += 1 - else: - raise InvalidRecordTypeError('Unknown transaction record magic \'%s\'' % transaction_record.magic) - if self.txn_map.contains(transaction_record.xid): - self.txn_map.delete(self.current_journal_file, transaction_record) - else: - transaction_record.warnings.append('NOT IN TMAP') -# if transaction_record.magic[-1] == 'c': # commits only -# self._txn_obj_list[hdr.xid] = hdr - self.statistics.transaction_record_count += 1 - return True - def _load_data(self, record): - while not record.is_complete: - record.load(self.current_journal_file.file_handle) - -class JournalFile(object): - def __init__(self, file_header): - self.file_header = file_header - self.enq_cnt = 0 - self.deq_cnt = 0 - self.num_filler_records_required = None - def incr_enq_cnt(self): - self.enq_cnt += 1 - def decr_enq_cnt(self, record): - if self.enq_cnt <= self.deq_cnt: - raise qlslibs.err.EnqueueCountUnderflowError(self.file_header, record) - self.deq_cnt += 1 - def get_enq_cnt(self): - return self.enq_cnt - self.deq_cnt - def is_outstanding_enq(self): - return self.enq_cnt > self.deq_cnt - @staticmethod - def report_header(): - print 'file_num enq_cnt p_no efp journal_file' - print '-------- ------- ---- ----- ------------' - def report(self): - comment = '<uninitialized>' if self.file_header.file_num == 0 else '' - file_num_str = '0x%x' % self.file_header.file_num - print '%8s %7d %4d %4dk %s %s' % (file_num_str, self.get_enq_cnt(), self.file_header.partition_num, - self.file_header.efp_data_size_kb, - os.path.basename(self.file_header.file_handle.name), comment) diff --git a/qpid/cpp/management/python/lib/qlslibs/efp.py b/qpid/cpp/management/python/lib/qlslibs/efp.py deleted file mode 100644 index 1c751c3d06..0000000000 --- a/qpid/cpp/management/python/lib/qlslibs/efp.py +++ /dev/null @@ -1,327 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -""" -Module: qlslibs.efp - -Contains empty file pool (EFP) classes. -""" - -import os -import os.path -import qlslibs.err -import shutil -import uuid - -class EfpManager(object): - """ - Top level class to analyze the Qpid Linear Store (QLS) directory for the partitions that make up the - Empty File Pool (EFP). - """ - def __init__(self, directory, disk_space_required_kb): - if not os.path.exists(directory): - raise qlslibs.err.InvalidQlsDirectoryNameError(directory) - self.directory = directory - self.disk_space_required_kb = disk_space_required_kb - self.efp_partitions = [] - self.efp_pools = {} - self.total_num_files = 0 - self.total_cum_file_size_kb = 0 - self.current_efp_partition = None - def add_file_pool(self, file_size_kb, num_files): - """ Add an EFP in the specified partition of the specified size containing the specified number of files """ - dir_name = EmptyFilePool.get_directory_name(file_size_kb) - print 'Adding pool \'%s\' to partition %s' % (dir_name, self.current_efp_partition.partition_number) - self.total_cum_file_size_kb += self.current_efp_partition.create_new_efp(file_size_kb, num_files) - self.total_num_files += num_files - def freshen_file_pool(self, file_size_kb, num_files): - """ Freshen an EFP in the specified partition and of the specified size to the specified number of files """ - if self.current_efp_partition is None: - partition_list = self.efp_partitions - partition_str = 'all partitions' - else: - partition_list = [self.current_efp_partition] - partition_str = 'partition %d' % self.current_efp_partition.partition_number - if file_size_kb is None: - pool_str = 'all pools' - else: - pool_str = 'pool \'%s\'' % EmptyFilePool.get_directory_name(int(file_size_kb)) - print 'Freshening %s in %s to %d files' % (pool_str, partition_str, num_files) - for self.current_efp_partition in partition_list: # Partition objects - if file_size_kb is None: - file_size_list = self.current_efp_partition.efp_pools.keys() - else: - file_size_list = ['%sk' % file_size_kb] - for file_size in file_size_list: - efp = self.current_efp_partition.efp_pools[file_size] - num_files_needed = num_files - efp.get_tot_file_count() - if num_files_needed > 0: - self.current_efp_partition.create_new_efp_files(qlslibs.utils.efp_directory_size(file_size), - num_files_needed) - else: - print ' WARNING: Pool %s in partition %s already contains %d files: no action taken' % \ - (self.current_efp_partition.efp_pools[file_size].size_str, - self.current_efp_partition.partition_number, efp.get_num_files()) - def remove_file_pool(self, file_size_kb): - """ Remove an existing EFP from the specified partition and of the specified size """ - dir_name = EmptyFilePool.get_directory_name(file_size_kb) - print 'Removing pool \'%s\' from partition %s' % (dir_name, self.current_efp_partition.partition_number) - self.efp_partitions.remove(self.current_efp_partition) - shutil.rmtree(os.path.join(self.current_efp_partition.efp_directory, dir_name)) - def report(self): - print 'Empty File Pool (EFP) report' - print '============================' - print 'Found', len(self.efp_partitions), 'partition(s)' - if (len(self.efp_partitions)) > 0: - sorted_efp_partitions = sorted(self.efp_partitions, key=lambda x: x.partition_number) - EfpPartition.print_report_table_header() - for ptn in sorted_efp_partitions: - ptn.print_report_table_line() - print - for ptn in sorted_efp_partitions: - ptn.report() - def run(self, arg_tup): - self._analyze_efp() - if arg_tup is not None: - _, arg_file_size, arg_num_files, arg_add, arg_remove, arg_freshen, arg_list = arg_tup - self._check_args(arg_tup) - if arg_add: - self.add_file_pool(int(arg_file_size), int(arg_num_files)) - if arg_remove: - self.remove_file_pool(int(arg_file_size)) - if arg_freshen: - self.freshen_file_pool(arg_file_size, int(arg_num_files)) - if arg_list: - self.report() - def _analyze_efp(self): - for dir_entry in os.listdir(self.directory): - try: - efp_partition = EfpPartition(os.path.join(self.directory, dir_entry), self.disk_space_required_kb) - efp_partition.scan() - self.efp_partitions.append(efp_partition) - for efpl in efp_partition.efp_pools.iterkeys(): - if efpl not in self.efp_pools: - self.efp_pools[efpl] = [] - self.efp_pools[efpl].append(efp_partition.efp_pools[efpl]) - self.total_num_files += efp_partition.tot_file_count - self.total_cum_file_size_kb += efp_partition.tot_file_size_kb - except qlslibs.err.InvalidPartitionDirectoryNameError: - pass - def _check_args(self, arg_tup): - """ Value check of args. The names of partitions and pools are validated against the discovered instances """ - arg_partition, arg_file_size, _, arg_add, arg_remove, arg_freshen, _ = arg_tup - if arg_partition is not None: - try: - if arg_partition[0] == 'p': # string partition name, eg 'p001' - partition_num = int(arg_partition[1:]) - else: # numeric partition, eg '1' - partition_num = int(arg_partition) - found = False - for partition in self.efp_partitions: - if partition.partition_number == partition_num: - self.current_efp_partition = partition - found = True - break - if not found: - raise qlslibs.err.PartitionDoesNotExistError(arg_partition) - except ValueError: - raise qlslibs.err.InvalidPartitionDirectoryNameError(arg_partition) - if self.current_efp_partition is not None: - pool_list = self.current_efp_partition.efp_pools.keys() - efp_directory_name = EmptyFilePool.get_directory_name(int(arg_file_size)) - if arg_add and efp_directory_name in pool_list: - raise qlslibs.err.PoolDirectoryAlreadyExistsError(efp_directory_name) - if (arg_remove or arg_freshen) and efp_directory_name not in pool_list: - raise qlslibs.err.PoolDirectoryDoesNotExistError(efp_directory_name) - -class EfpPartition(object): - """ - Class that represents a EFP partition. Each partition contains one or more Empty File Pools (EFPs). - """ - PTN_DIR_PREFIX = 'p' - EFP_DIR_NAME = 'efp' - def __init__(self, directory, disk_space_required_kb): - self.directory = directory - self.partition_number = None - self.efp_pools = {} - self.tot_file_count = 0 - self.tot_file_size_kb = 0 - self._validate_partition_directory(disk_space_required_kb) - def create_new_efp_files(self, file_size_kb, num_files): - """ Create new EFP files in this partition """ - dir_name = EmptyFilePool.get_directory_name(file_size_kb) - if dir_name in self.efp_pools.keys(): - efp = self.efp_pools[dir_name] - else: - efp = EmptyFilePool(os.path.join(self.directory, EfpPartition.EFP_DIR_NAME), dir_name) - this_tot_file_size_kb = efp.create_new_efp_files(num_files) - self.tot_file_size_kb += this_tot_file_size_kb - self.tot_file_count += num_files - return this_tot_file_size_kb - @staticmethod - def print_report_table_header(): - print 'p_no no_efp tot_files tot_size_kb directory' - print '---- ------ --------- ----------- ---------' - def print_report_table_line(self): - print '%4d %6d %9d %11d %s' % (self.partition_number, len(self.efp_pools), self.tot_file_count, - self.tot_file_size_kb, self.directory) - def report(self): - print 'Partition %s:' % os.path.basename(self.directory) - if len(self.efp_pools) > 0: - EmptyFilePool.print_report_table_header() - for dir_name in self.efp_pools.keys(): - self.efp_pools[dir_name].print_report_table_line() - else: - print '<empty - no EFPs found in this partition>' - print - def scan(self): - if os.path.exists(self.directory): - efp_dir = os.path.join(self.directory, EfpPartition.EFP_DIR_NAME) - for dir_entry in os.listdir(efp_dir): - efp = EmptyFilePool(os.path.join(efp_dir, dir_entry), self.partition_number) - efp.scan() - self.tot_file_count += efp.get_tot_file_count() - self.tot_file_size_kb += efp.get_tot_file_size_kb() - self.efp_pools[dir_entry] = efp - def _validate_partition_directory(self, disk_space_required_kb): - if os.path.basename(self.directory)[0] is not EfpPartition.PTN_DIR_PREFIX: - raise qlslibs.err.InvalidPartitionDirectoryNameError(self.directory) - try: - self.partition_number = int(os.path.basename(self.directory)[1:]) - except ValueError: - raise qlslibs.err.InvalidPartitionDirectoryNameError(self.directory) - if not qlslibs.utils.has_write_permission(self.directory): - raise qlslibs.err.WritePermissionError(self.directory) - if disk_space_required_kb is not None: - space_avail = qlslibs.utils.get_avail_disk_space(self.directory) - if space_avail < (disk_space_required_kb * 1024): - raise qlslibs.err.InsufficientSpaceOnDiskError(self.directory, space_avail, - disk_space_required_kb * 1024) - -class EmptyFilePool(object): - """ - Class that represents a single Empty File Pool within a partition. Each EFP contains pre-formatted linear store - journal files (but it may also be empty). - """ - EFP_DIR_SUFFIX = 'k' - EFP_JRNL_EXTENTION = '.jrnl' - EFP_INUSE_DIRNAME = 'in_use' - EFP_RETURNED_DIRNAME = 'returned' - def __init__(self, directory, partition_number): - self.base_dir_name = os.path.basename(directory) - self.directory = directory - self.partition_number = partition_number - self.data_size_kb = None - self.efp_files = [] - self.in_use_files = [] - self.returned_files = [] - self._validate_efp_directory() - def create_new_efp_files(self, num_files): - """ Create one or more new empty journal files of the prescribed size for this EFP """ - this_total_file_size = 0 - for _ in range(num_files): - this_total_file_size += self._create_new_efp_file() - return this_total_file_size - def get_directory(self): - return self.directory - @staticmethod - def get_directory_name(file_size_kb): - """ Static function to create an EFP directory name from the size of the files it contains """ - return '%dk' % file_size_kb - def get_tot_file_count(self): - return len(self.efp_files) - def get_tot_file_size_kb(self): - return self.data_size_kb * len(self.efp_files) - @staticmethod - def print_report_table_header(): - print ' ---------- efp ------------ --------- in_use ---------- -------- returned ---------' - print 'data_size_kb file_count tot_file_size_kb file_count tot_file_size_kb file_count tot_file_size_kb efp_directory' - print '------------ ---------- ---------------- ---------- ---------------- ---------- ---------------- -------------' - def print_report_table_line(self): - print '%12d %10d %16d %10d %16d %10d %16d %s' % (self.data_size_kb, len(self.efp_files), - self.data_size_kb * len(self.efp_files), - len(self.in_use_files), - self.data_size_kb * len(self.in_use_files), - len(self.returned_files), - self.data_size_kb * len(self.returned_files), - self.get_directory()) - def scan(self): - for efp_file in os.listdir(self.directory): - if efp_file == self.EFP_INUSE_DIRNAME: - for in_use_file in os.listdir(os.path.join(self.directory, self.EFP_INUSE_DIRNAME)): - self.in_use_files.append(in_use_file) - continue - if efp_file == self.EFP_RETURNED_DIRNAME: - for returned_file in os.listdir(os.path.join(self.directory, self.EFP_RETURNED_DIRNAME)): - self.returned_files.append(returned_file) - continue - if self._validate_efp_file(os.path.join(self.directory, efp_file)): - self.efp_files.append(efp_file) - def _add_efp_file(self, efp_file_name): - """ Add a single journal file of the appropriate size to this EFP. No file size check is made here. """ - self.efp_files.append(efp_file_name) - def _create_new_efp_file(self): - """ Create a single new empty journal file of the prescribed size for this EFP """ - file_name = str(uuid.uuid4()) + EmptyFilePool.EFP_JRNL_EXTENTION - file_header = qlslibs.jrnl.FileHeader(0, qlslibs.jrnl.FileHeader.MAGIC, qlslibs.utils.DEFAULT_RECORD_VERSION, - 0, 0, 0) - file_header.init(None, None, qlslibs.utils.DEFAULT_HEADER_SIZE_SBLKS, self.partition_number, self.data_size_kb, - 0, 0, 0, 0, 0) - efh = file_header.encode() - efh_bytes = len(efh) - file_handle = open(os.path.join(self.directory, file_name), 'wb') - file_handle.write(efh) - file_handle.write('\xff' * (qlslibs.utils.DEFAULT_SBLK_SIZE - efh_bytes)) - file_handle.write('\x00' * (int(self.data_size_kb) * 1024)) - file_handle.close() - fqfn = os.path.join(self.directory, file_name) - self._add_efp_file(fqfn) - return os.path.getsize(fqfn) - def _validate_efp_directory(self): - if self.base_dir_name[-1] is not EmptyFilePool.EFP_DIR_SUFFIX: - raise qlslibs.err.InvalidEfpDirectoryNameError(self.directory) - try: - self.data_size_kb = int(os.path.basename(self.base_dir_name)[:-1]) - except ValueError: - raise qlslibs.err.InvalidEfpDirectoryNameError(self.directory) - def _validate_efp_file(self, efp_file): - file_size = os.path.getsize(efp_file) - expected_file_size = (self.data_size_kb * 1024) + qlslibs.utils.DEFAULT_SBLK_SIZE - if file_size != expected_file_size: - print 'WARNING: File %s not of correct size (size=%d, expected=%d): Ignoring' % (efp_file, file_size, - expected_file_size) - return False - file_handle = open(efp_file) - args = qlslibs.utils.load_args(file_handle, qlslibs.jrnl.RecordHeader) - file_hdr = qlslibs.jrnl.FileHeader(*args) - file_hdr.init(file_handle, *qlslibs.utils.load_args(file_handle, qlslibs.jrnl.FileHeader)) - if not file_hdr.is_header_valid(file_hdr): - file_handle.close() - return False - file_hdr.load(file_handle) - file_handle.close() - if not file_hdr.is_valid(True): - return False - return True - - -# ============================================================================= - -if __name__ == "__main__": - print "This is a library, and cannot be executed." diff --git a/qpid/cpp/management/python/lib/qlslibs/err.py b/qpid/cpp/management/python/lib/qlslibs/err.py deleted file mode 100644 index f47632ce6a..0000000000 --- a/qpid/cpp/management/python/lib/qlslibs/err.py +++ /dev/null @@ -1,261 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -""" -Module: qlslibs.err - -Contains error classes. -""" - -# --- Parent classes - -class QlsError(Exception): - """Base error class for QLS errors and exceptions""" - def __init__(self): - Exception.__init__(self) - def __str__(self): - return '' - -class QlsRecordError(QlsError): - """Base error class for individual records""" - def __init__(self, file_header, record): - QlsError.__init__(self) - self.file_header = file_header - self.record = record - def get_expected_fro(self): - return self.file_header.first_record_offset - def get_file_number(self): - return self.file_header.file_num - def get_queue_name(self): - return self.file_header.queue_name - def get_record_id(self): - return self.record.record_id - def get_record_offset(self): - return self.record.file_offset - def __str__(self): - return 'queue="%s" file_id=0x%x record_offset=0x%x record_id=0x%x' % \ - (self.file_header.queue_name, self.file_header.file_num, self.record.file_offset, self.record.record_id) - -# --- Error classes - -class AlreadyLockedError(QlsRecordError): - """Transactional record to be locked is already locked""" - def __init__(self, file_header, record): - QlsRecordError.__init__(self, file_header, record) - def __str__(self): - return 'Transactional operation already locked in TransactionMap: ' + QlsRecordError.__str__(self) - -class DataSizeError(QlsError): - """Error class for Data size mismatch""" - def __init__(self, expected_size, actual_size, data_str): - QlsError.__init__(self) - self.expected_size = expected_size - self.actual_size = actual_size - self.xid_str = data_str - def __str__(self): - return 'Inconsistent data size: expected:%d; actual:%d; data="%s"' % \ - (self.expected_size, self.actual_size, self.data_str) - -class DuplicateRecordIdError(QlsRecordError): - """Duplicate Record Id in Enqueue Map""" - def __init__(self, file_header, record): - QlsRecordError.__init__(self, file_header, record) - def __str__(self): - return 'Duplicate Record Id in enqueue map: ' + QlsRecordError.__str__(self) - -class EnqueueCountUnderflowError(QlsRecordError): - """Attempted to decrement enqueue count past 0""" - def __init__(self, file_header, record): - QlsRecordError.__init__(self, file_header, record) - def __str__(self): - return 'Enqueue record count underflow: ' + QlsRecordError.__str__(self) - -class ExternalDataError(QlsRecordError): - """Data present in Enqueue record when external data flag is set""" - def __init__(self, file_header, record): - QlsRecordError.__init__(self, file_header, record) - def __str__(self): - return 'Data present in external data record: ' + QlsRecordError.__str__(self) - -class FirstRecordOffsetMismatchError(QlsRecordError): - """First Record Offset (FRO) does not match file header""" - def __init__(self, file_header, record): - QlsRecordError.__init__(self, file_header, record) - def __str__(self): - return 'First record offset mismatch: ' + QlsRecordError.__str__(self) + ' expected_offset=0x%x' % \ - self.file_header.first_record_offset - -class InsufficientSpaceOnDiskError(QlsError): - """Insufficient space on disk""" - def __init__(self, directory, space_avail, space_requried): - QlsError.__init__(self) - self.directory = directory - self.space_avail = space_avail - self.space_required = space_requried - def __str__(self): - return 'Insufficient space on disk: directory=%s; avail_space=%d required_space=%d' % \ - (self.directory, self.space_avail, self.space_required) - -class InvalidClassError(QlsError): - """Invalid class name or type""" - def __init__(self, class_name): - QlsError.__init__(self) - self.class_name = class_name - def __str__(self): - return 'Invalid class name "%s"' % self.class_name - -class InvalidEfpDirectoryNameError(QlsError): - """Invalid EFP directory name - should be NNNNk, where NNNN is a number (of any length)""" - def __init__(self, directory_name): - QlsError.__init__(self) - self.directory_name = directory_name - def __str__(self): - return 'Invalid EFP directory name "%s"' % self.directory_name - -#class InvalidFileSizeString(QlsError): -# """Invalid file size string""" -# def __init__(self, file_size_string): -# QlsError.__init__(self) -# self.file_size_string = file_size_string -# def __str__(self): -# return 'Invalid file size string "%s"' % self.file_size_string - -class InvalidPartitionDirectoryNameError(QlsError): - """Invalid EFP partition name - should be pNNN, where NNN is a 3-digit partition number""" - def __init__(self, directory_name): - QlsError.__init__(self) - self.directory_name = directory_name - def __str__(self): - return 'Invalid partition directory name "%s"' % self.directory_name - -class InvalidQlsDirectoryNameError(QlsError): - """Invalid QLS directory name""" - def __init__(self, directory_name): - QlsError.__init__(self) - self.directory_name = directory_name - def __str__(self): - return 'Invalid QLS directory name "%s"' % self.directory_name - -class InvalidRecordTypeError(QlsRecordError): - """Error class for any operation using an invalid record type""" - def __init__(self, file_header, record, error_msg): - QlsRecordError.__init__(self, file_header, record) - self.error_msg = error_msg - def __str__(self): - return 'Invalid record type: ' + QlsRecordError.__str__(self) + ':' + self.error_msg - -class InvalidRecordVersionError(QlsRecordError): - """Invalid record version""" - def __init__(self, file_header, record, expected_version): - QlsRecordError.__init__(self, file_header, record) - self.expected_version = expected_version - def __str__(self): - return 'Invalid record version: queue="%s" ' + QlsRecordError.__str__(self) + \ - ' ver_found=0x%x ver_expected=0x%x' % (self.record_header.version, self.expected_version) - -class NoMoreFilesInJournalError(QlsError): - """Raised when trying to obtain the next file in the journal and there are no more files""" - def __init__(self, queue_name): - QlsError.__init__(self) - self.queue_name = queue_name - def __str__(self): - return 'No more journal files in queue "%s"' % self.queue_name - -class NonTransactionalRecordError(QlsRecordError): - """Transactional operation on non-transactional record""" - def __init__(self, file_header, record, operation): - QlsRecordError.__init__(self, file_header, record) - self.operation = operation - def __str__(self): - return 'Transactional operation on non-transactional record: ' + QlsRecordError.__str__() + \ - ' operation=%s' % self.operation - -class PartitionDoesNotExistError(QlsError): - """Partition name does not exist on disk""" - def __init__(self, partition_directory): - QlsError.__init__(self) - self.partition_directory = partition_directory - def __str__(self): - return 'Partition %s does not exist' % self.partition_directory - -class PoolDirectoryAlreadyExistsError(QlsError): - """Pool directory already exists""" - def __init__(self, pool_directory): - QlsError.__init__(self) - self.pool_directory = pool_directory - def __str__(self): - return 'Pool directory %s already exists' % self.pool_directory - -class PoolDirectoryDoesNotExistError(QlsError): - """Pool directory does not exist""" - def __init__(self, pool_directory): - QlsError.__init__(self) - self.pool_directory = pool_directory - def __str__(self): - return 'Pool directory %s does not exist' % self.pool_directory - -class RecordIdNotFoundError(QlsRecordError): - """Record Id not found in enqueue map""" - def __init__(self, file_header, record): - QlsRecordError.__init__(self, file_header, record) - def __str__(self): - return 'Record Id not found in enqueue map: ' + QlsRecordError.__str__() - -class RecordNotLockedError(QlsRecordError): - """Record in enqueue map is not locked""" - def __init__(self, file_header, record): - QlsRecordError.__init__(self, file_header, record) - def __str__(self): - return 'Record in enqueue map is not locked: ' + QlsRecordError.__str__() - -class UnexpectedEndOfFileError(QlsError): - """The bytes read from a file is less than that expected""" - def __init__(self, size_read, size_expected, file_offset, file_name): - QlsError.__init__(self) - self.size_read = size_read - self.size_expected = size_expected - self.file_offset = file_offset - self.file_name = file_name - def __str__(self): - return 'Tried to read %d at offset %d in file "%s"; only read %d' % \ - (self.size_read, self.file_offset, self.file_name, self.size_expected) - -class WritePermissionError(QlsError): - """No write permission""" - def __init__(self, directory): - QlsError.__init__(self) - self.directory = directory - def __str__(self): - return 'No write permission in directory %s' % self.directory - -class XidSizeError(QlsError): - """Error class for Xid size mismatch""" - def __init__(self, expected_size, actual_size, xid_str): - QlsError.__init__(self) - self.expected_size = expected_size - self.actual_size = actual_size - self.xid_str = xid_str - def __str__(self): - return 'Inconsistent xid size: expected:%d; actual:%d; xid="%s"' % \ - (self.expected_size, self.actual_size, self.xid_str) - -# ============================================================================= - -if __name__ == "__main__": - print "This is a library, and cannot be executed." diff --git a/qpid/cpp/management/python/lib/qlslibs/jrnl.py b/qpid/cpp/management/python/lib/qlslibs/jrnl.py deleted file mode 100644 index 5e65890393..0000000000 --- a/qpid/cpp/management/python/lib/qlslibs/jrnl.py +++ /dev/null @@ -1,394 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -""" -Module: qlslibs.jrnl - -Contains journal record classes. -""" - -import qlslibs.err -import qlslibs.utils -import string -import struct -import time - -class RecordHeader(object): - FORMAT = '<4s2H2Q' - def __init__(self, file_offset, magic, version, user_flags, serial, record_id): - self.file_offset = file_offset - self.magic = magic - self.version = version - self.user_flags = user_flags - self.serial = serial - self.record_id = record_id - self.warnings = [] - self.truncated_flag = False - def encode(self): - return struct.pack(RecordHeader.FORMAT, self.magic, self.version, self.user_flags, self.serial, self.record_id) - def load(self, file_handle): - pass - @staticmethod - def discriminate(args): - """Use the last char in the header magic to determine the header type""" - return CLASSES.get(args[1][-1], RecordHeader) - def is_empty(self): - """Return True if this record is empty (ie has a magic of 0x0000""" - return self.magic == '\x00'*4 - def is_header_valid(self, file_header): - """Check that this record is valid""" - if self.is_empty(): - return False - if self.magic[:3] != 'QLS' or self.magic[3] not in ['a', 'c', 'd', 'e', 'f', 'x']: - return False - if self.magic[-1] != 'x': - if self.version != qlslibs.utils.DEFAULT_RECORD_VERSION: - raise qlslibs.err.InvalidRecordVersionError(file_header, self, qlslibs.utils.DEFAULT_RECORD_VERSION) - if self.serial != file_header.serial: - return False - return True - def to_rh_string(self): - """Return string representation of this header""" - if self.is_empty(): - return '0x%08x: <empty>' % (self.file_offset) - if self.magic[-1] == 'x': - return '0x%08x: [X]' % (self.file_offset) - if self.magic[-1] in ['a', 'c', 'd', 'e', 'f', 'x']: - return '0x%08x: [%c v=%d f=0x%04x rid=0x%x]' % \ - (self.file_offset, self.magic[-1].upper(), self.version, self.user_flags, self.record_id) - return '0x%08x: <error, unknown magic "%s" (possible overwrite boundary?)>' % (self.file_offset, self.magic) - def _get_warnings(self): - warn_str = '' - for warn in self.warnings: - warn_str += '<%s>' % warn - return warn_str - -class RecordTail(object): - FORMAT = '<4sL2Q' - def __init__(self, file_handle): # TODO - clumsy, only allows reading from disk. Move all disk stuff to laod() - self.file_offset = file_handle.tell() if file_handle is not None else 0 - self.complete = False - self.read_size = struct.calcsize(RecordTail.FORMAT) - self.fbin = file_handle.read(self.read_size) if file_handle is not None else None - self.valid_flag = None - if self.fbin is not None and len(self.fbin) >= self.read_size: - self.complete = True - self.xmagic, self.checksum, self.serial, self.record_id = struct.unpack(RecordTail.FORMAT, self.fbin) - def load(self, file_handle): - """Used to continue load of RecordTail object if it is split between files""" - if not self.is_complete: - self.fbin += file_handle.read(self.read_size - len(self.fbin)) - if (len(self.fbin)) >= self.read_size: - self.complete = True - self.xmagic, self.checksum, self.serial, self.record_id = struct.unpack(RecordTail.FORMAT, self.fbin) - def is_complete(self): - return self.complete - def is_valid(self, record): - if self.valid_flag is None: - if not self.complete: - return False - self.valid_flag = qlslibs.utils.inv_str(self.xmagic) == record.magic and \ - self.serial == record.serial and \ - self.record_id == record.record_id and \ - qlslibs.utils.adler32(record.checksum_encode()) == self.checksum - return self.valid_flag - def to_string(self): - """Return a string representation of the this RecordTail instance""" - if self.valid_flag is not None: - if not self.valid_flag: - return '[INVALID RECORD TAIL]' - magic = qlslibs.utils.inv_str(self.xmagic) - magic_char = magic[-1].upper() if magic[-1] in string.printable else '?' - return '[%c cs=0x%08x rid=0x%x]' % (magic_char, self.checksum, self.record_id) - -class FileHeader(RecordHeader): - FORMAT = '<2H4x5QH' - MAGIC = 'QLSf' - def init(self, file_handle, _, file_header_size_sblks, partition_num, efp_data_size_kb, first_record_offset, - timestamp_sec, timestamp_ns, file_num, queue_name_len): - self.file_handle = file_handle - self.file_header_size_sblks = file_header_size_sblks - self.partition_num = partition_num - self.efp_data_size_kb = efp_data_size_kb - self.first_record_offset = first_record_offset - self.timestamp_sec = timestamp_sec - self.timestamp_ns = timestamp_ns - self.file_num = file_num - self.queue_name_len = queue_name_len - self.queue_name = None - def encode(self): - if self.queue_name is None: - return RecordHeader.encode(self) + struct.pack(self.FORMAT, self.file_header_size_sblks, \ - self.partition_num, self.efp_data_size_kb, \ - self.first_record_offset, self.timestamp_sec, \ - self.timestamp_ns, self.file_num, 0) - return RecordHeader.encode(self) + struct.pack(self.FORMAT, self.file_header_size_sblks, self.partition_num, \ - self.efp_data_size_kb, self.first_record_offset, \ - self.timestamp_sec, self.timestamp_ns, self.file_num, \ - self.queue_name_len) + self.queue_name - def get_file_size(self): - """Sum of file header size and data size""" - return (self.file_header_size_sblks * qlslibs.utils.DEFAULT_SBLK_SIZE) + (self.efp_data_size_kb * 1024) - def load(self, file_handle): - self.queue_name = file_handle.read(self.queue_name_len) - def is_end_of_file(self): - return self.file_handle.tell() >= self.get_file_size() - def is_valid(self, is_empty): - if not RecordHeader.is_header_valid(self, self): - return False - if self.file_handle is None or self.file_header_size_sblks == 0 or self.partition_num == 0 or \ - self.efp_data_size_kb == 0: - return False - if is_empty: - if self.first_record_offset != 0 or self.timestamp_sec != 0 or self.timestamp_ns != 0 or \ - self.file_num != 0 or self.queue_name_len != 0: - return False - else: - if self.first_record_offset == 0 or self.timestamp_sec == 0 or self.timestamp_ns == 0 or \ - self.file_num == 0 or self.queue_name_len == 0: - return False - if self.queue_name is None: - return False - if len(self.queue_name) != self.queue_name_len: - return False - return True - def timestamp_str(self): - """Get the timestamp of this record in string format""" - now = time.gmtime(self.timestamp_sec) - fstr = '%%a %%b %%d %%H:%%M:%%S.%09d %%Y' % (self.timestamp_ns) - return time.strftime(fstr, now) - def to_string(self): - """Return a string representation of the this FileHeader instance""" - return '%s fnum=0x%x fro=0x%08x p=%d s=%dk t=%s %s' % (self.to_rh_string(), self.file_num, - self.first_record_offset, self.partition_num, - self.efp_data_size_kb, self.timestamp_str(), - self._get_warnings()) - -class EnqueueRecord(RecordHeader): - FORMAT = '<2Q' - MAGIC = 'QLSe' - EXTERNAL_FLAG_MASK = 0x20 - TRANSIENT_FLAG_MASK = 0x10 - def init(self, _, xid_size, data_size): - self.xid_size = xid_size - self.data_size = data_size - self.xid = None - self.xid_complete = False - self.data = None - self.data_complete = False - self.record_tail = None - def checksum_encode(self): # encode excluding record tail - cs_bytes = RecordHeader.encode(self) + struct.pack(self.FORMAT, self.xid_size, self.data_size) - if self.xid is not None: - cs_bytes += self.xid - if self.data is not None: - cs_bytes += self.data - return cs_bytes - def is_external(self): - return self.user_flags & EnqueueRecord.EXTERNAL_FLAG_MASK > 0 - def is_transient(self): - return self.user_flags & EnqueueRecord.TRANSIENT_FLAG_MASK > 0 - def is_valid(self, journal_file): - if not RecordHeader.is_header_valid(self, journal_file.file_header): - return False - if not (self.xid_complete and self.data_complete): - return False - if self.xid_size > 0 and len(self.xid) != self.xid_size: - return False - if self.data_size > 0 and len(self.data) != self.data_size: - return False - if self.xid_size > 0 or self.data_size > 0: - if self.record_tail is None: - return False - if not self.record_tail.is_valid(self): - return False - return True - def load(self, file_handle): - """Return True when load is incomplete and must be called again with new file handle""" - self.xid, self.xid_complete = qlslibs.utils.load_data(file_handle, self.xid, self.xid_size) - if not self.xid_complete: - return True - if self.is_external(): - self.data_complete = True - else: - self.data, self.data_complete = qlslibs.utils.load_data(file_handle, self.data, self.data_size) - if not self.data_complete: - return True - if self.xid_size > 0 or self.data_size > 0: - if self.record_tail is None: - self.record_tail = RecordTail(file_handle) - elif not self.record_tail.is_complete(): - self.record_tail.load(file_handle) # Continue loading partially loaded tail - if self.record_tail.is_complete(): - self.record_tail.is_valid(self) - else: - return True - return False - def to_string(self, show_xid_flag, show_data_flag, txtest_flag): - """Return a string representation of the this EnqueueRecord instance""" - if self.truncated_flag: - return '%s xid(%d) data(%d) [Truncated, no more files in journal]' % (RecordHeader.__str__(self), - self.xid_size, self.data_size) - if self.record_tail is None: - record_tail_str = '' - else: - record_tail_str = self.record_tail.to_string() - return '%s %s %s %s %s %s' % (self.to_rh_string(), - qlslibs.utils.format_xid(self.xid, self.xid_size, show_xid_flag), - qlslibs.utils.format_data(self.data, self.data_size, show_data_flag, txtest_flag), - record_tail_str, self._print_flags(), self._get_warnings()) - def _print_flags(self): - """Utility function to decode the flags field in the header and print a string representation""" - fstr = '' - if self.is_transient(): - fstr = '[TRANSIENT' - if self.is_external(): - if len(fstr) > 0: - fstr += ',EXTERNAL' - else: - fstr = '*EXTERNAL' - if len(fstr) > 0: - fstr += ']' - return fstr - -class DequeueRecord(RecordHeader): - FORMAT = '<2Q' - MAGIC = 'QLSd' - TXN_COMPLETE_COMMIT_FLAG = 0x10 - def init(self, _, dequeue_record_id, xid_size): - self.dequeue_record_id = dequeue_record_id - self.xid_size = xid_size - self.transaction_prepared_list_flag = False - self.xid = None - self.xid_complete = False - self.record_tail = None - def checksum_encode(self): # encode excluding record tail - return RecordHeader.encode(self) + struct.pack(self.FORMAT, self.dequeue_record_id, self.xid_size) + \ - self.xid - def is_transaction_complete_commit(self): - return self.user_flags & DequeueRecord.TXN_COMPLETE_COMMIT_FLAG > 0 - def is_valid(self, journal_file): - if not RecordHeader.is_header_valid(self, journal_file.file_header): - return False - if self.xid_size > 0: - if not self.xid_complete: - return False - if self.xid_size > 0 and len(self.xid) != self.xid_size: - return False - if self.record_tail is None: - return False - if not self.record_tail.is_valid(self): - return False - return True - def load(self, file_handle): - """Return True when load is incomplete and must be called again with new file handle""" - self.xid, self.xid_complete = qlslibs.utils.load_data(file_handle, self.xid, self.xid_size) - if not self.xid_complete: - return True - if self.xid_size > 0: - if self.record_tail is None: - self.record_tail = RecordTail(file_handle) - elif not self.record_tail.is_complete(): - self.record_tail.load(file_handle) - if self.record_tail.is_complete(): - self.record_tail.is_valid(self) - else: - return True - return False - def to_string(self, show_xid_flag, _u1, _u2): - """Return a string representation of the this DequeueRecord instance""" - if self.truncated_flag: - return '%s xid(%d) drid=0x%x [Truncated, no more files in journal]' % (RecordHeader.__str__(self), - self.xid_size, - self.dequeue_record_id) - if self.record_tail is None: - record_tail_str = '' - else: - record_tail_str = self.record_tail.to_string() - return '%s drid=0x%x %s %s %s %s' % (self.to_rh_string(), self.dequeue_record_id, - qlslibs.utils.format_xid(self.xid, self.xid_size, show_xid_flag), - record_tail_str, self._print_flags(), self._get_warnings()) - def _print_flags(self): - """Utility function to decode the flags field in the header and print a string representation""" - if self.transaction_prepared_list_flag: - if self.is_transaction_complete_commit(): - return '[COMMIT]' - else: - return '[ABORT]' - return '' - -class TransactionRecord(RecordHeader): - FORMAT = '<Q' - MAGIC_ABORT = 'QLSa' - MAGIC_COMMIT = 'QLSc' - def init(self, _, xid_size): - self.xid_size = xid_size - self.xid = None - self.xid_complete = False - self.record_tail = None - def checksum_encode(self): # encode excluding record tail - return RecordHeader.encode(self) + struct.pack(self.FORMAT, self.xid_size) + self.xid - def is_valid(self, journal_file): - if not RecordHeader.is_header_valid(self, journal_file.file_header): - return False - if not self.xid_complete or len(self.xid) != self.xid_size: - return False - if self.record_tail is None: - return False - if not self.record_tail.is_valid(self): - return False - return True - def load(self, file_handle): - """Return True when load is incomplete and must be called again with new file handle""" - self.xid, self.xid_complete = qlslibs.utils.load_data(file_handle, self.xid, self.xid_size) - if not self.xid_complete: - return True - if self.xid_size > 0: - if self.record_tail is None: - self.record_tail = RecordTail(file_handle) - elif not self.record_tail.is_complete(): - self.record_tail.load(file_handle) - if self.record_tail.is_complete(): - self.record_tail.is_valid(self) - else: - return True - return False - def to_string(self, show_xid_flag, _u1, _u2): - """Return a string representation of the this TransactionRecord instance""" - if self.truncated_flag: - return '%s xid(%d) [Truncated, no more files in journal]' % (RecordHeader.__str__(self), self.xid_size) - if self.record_tail is None: - record_tail_str = '' - else: - record_tail_str = self.record_tail.to_string() - return '%s %s %s %s' % (self.to_rh_string(), - qlslibs.utils.format_xid(self.xid, self.xid_size, show_xid_flag), - record_tail_str, self._get_warnings()) - -# ============================================================================= - -CLASSES = { - 'a': TransactionRecord, - 'c': TransactionRecord, - 'd': DequeueRecord, - 'e': EnqueueRecord, -} - -if __name__ == '__main__': - print 'This is a library, and cannot be executed.' diff --git a/qpid/cpp/management/python/lib/qlslibs/utils.py b/qpid/cpp/management/python/lib/qlslibs/utils.py deleted file mode 100644 index dfa760a839..0000000000 --- a/qpid/cpp/management/python/lib/qlslibs/utils.py +++ /dev/null @@ -1,216 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -""" -Module: qlslibs.utils - -Contains helper functions for qpid_qls_analyze. -""" - -import os -import qlslibs.jrnl -import stat -import string -import struct -import subprocess -import zlib - -DEFAULT_DBLK_SIZE = 128 -DEFAULT_SBLK_SIZE = 4096 # 32 dblks -DEFAULT_SBLK_SIZE_KB = DEFAULT_SBLK_SIZE / 1024 -DEFAULT_RECORD_VERSION = 2 -DEFAULT_HEADER_SIZE_SBLKS = 1 - -def adler32(data): - """return the adler32 checksum of data""" - return zlib.adler32(data) & 0xffffffff - -def create_record(magic, uflags, journal_file, record_id, dequeue_record_id, xid, data): - """Helper function to construct a record with xid, data (where applicable) and consistent tail with checksum""" - record_class = qlslibs.jrnl.CLASSES.get(magic[-1]) - record = record_class(0, magic, DEFAULT_RECORD_VERSION, uflags, journal_file.file_header.serial, record_id) - xid_length = len(xid) if xid is not None else 0 - if isinstance(record, qlslibs.jrnl.EnqueueRecord): - data_length = len(data) if data is not None else 0 - record.init(None, xid_length, data_length) - elif isinstance(record, qlslibs.jrnl.DequeueRecord): - record.init(None, dequeue_record_id, xid_length) - elif isinstance(record, qlslibs.jrnl.TransactionRecord): - record.init(None, xid_length) - else: - raise qlslibs.err.InvalidClassError(record.__class__.__name__) - if xid is not None: - record.xid = xid - record.xid_complete = True - if data is not None: - record.data = data - record.data_complete = True - record.record_tail = _mk_record_tail(record) - return record - -def efp_directory_size(directory_name): - """"Decode the directory name in the format NNNk to a numeric size, where NNN is a number string""" - try: - if directory_name[-1] == 'k': - return int(directory_name[:-1]) - except ValueError: - pass - return 0 - -def format_data(data, data_size=None, show_data_flag=True, txtest_flag=False): - """Format binary data for printing""" - return _format_binary(data, data_size, show_data_flag, 'data', qlslibs.err.DataSizeError, False, txtest_flag) - -def format_xid(xid, xid_size=None, show_xid_flag=True): - """Format binary XID for printing""" - return _format_binary(xid, xid_size, show_xid_flag, 'xid', qlslibs.err.XidSizeError, True, False) - -def get_avail_disk_space(path): - df_proc = subprocess.Popen(["df", path], stdout=subprocess.PIPE) - output = df_proc.communicate()[0] - return int(output.split('\n')[1].split()[3]) - -def has_write_permission(path): - stat_info = os.stat(path) - return bool(stat_info.st_mode & stat.S_IRGRP) - -def inv_str(in_string): - """Perform a binary 1's compliment (invert all bits) on a binary string""" - istr = '' - for index in range(0, len(in_string)): - istr += chr(~ord(in_string[index]) & 0xff) - return istr - -def load(file_handle, klass): - """Load a record of class klass from a file""" - args = load_args(file_handle, klass) - subclass = klass.discriminate(args) - result = subclass(*args) # create instance of record - if subclass != klass: - result.init(*load_args(file_handle, subclass)) - return result - -def load_args(file_handle, klass): - """Load the arguments from class klass""" - size = struct.calcsize(klass.FORMAT) - foffs = file_handle.tell(), - fbin = file_handle.read(size) - if len(fbin) != size: - raise qlslibs.err.UnexpectedEndOfFileError(len(fbin), size, foffs, file_handle.name) - return foffs + struct.unpack(klass.FORMAT, fbin) - -def load_data(file_handle, element, element_size): - """Read element_size bytes of binary data from file_handle into element""" - if element_size == 0: - return element, True - if element is None: - element = file_handle.read(element_size) - else: - read_size = element_size - len(element) - element += file_handle.read(read_size) - return element, len(element) == element_size - -def skip(file_handle, boundary): - """Read and discard disk bytes until the next multiple of boundary""" - if not file_handle.closed: - file_handle.read(_rem_bytes_in_block(file_handle, boundary)) - -#--- protected functions --- - -def _format_binary(bin_str, bin_size, show_bin_flag, prefix, err_class, hex_num_flag, txtest_flag): - """Format binary XID for printing""" - if bin_str is None and bin_size is not None: - if bin_size > 0: - raise err_class(bin_size, len(bin_str), bin_str) - return '' - if bin_size is None: - bin_size = len(bin_str) - elif bin_size != len(bin_str): - raise err_class(bin_size, len(bin_str), bin_str) - out_str = '%s(%d)' % (prefix, bin_size) - if txtest_flag: - out_str += '=\'%s\'' % _txtest_msg_str(bin_str) - elif show_bin_flag: - if _is_printable(bin_str): - binstr = '"%s"' % _split_str(bin_str) - elif hex_num_flag: - binstr = '0x%s' % _str_to_hex_num(bin_str) - else: - binstr = _hex_split_str(bin_str, 50, 10, 10) - out_str += '=\'%s\'' % binstr - return out_str - -def _hex_str(in_str, begin, end): - """Return a binary string as a hex string""" - hstr = '' - for index in range(begin, end): - if _is_printable(in_str[index]): - hstr += in_str[index] - else: - hstr += '\\%02x' % ord(in_str[index]) - return hstr - -def _hex_split_str(in_str, split_size, head_size, tail_size): - """Split a hex string into two parts separated by an ellipsis""" - if len(in_str) <= split_size: - return _hex_str(in_str, 0, len(in_str)) - return _hex_str(in_str, 0, head_size) + ' ... ' + _hex_str(in_str, len(in_str)-tail_size, len(in_str)) - -def _txtest_msg_str(bin_str): - """Extract the message number used in qpid-txtest""" - msg_index = bin_str.find('msg') - if msg_index >= 0: - end_index = bin_str.find('\x00', msg_index) - assert end_index >= 0 - return bin_str[msg_index:end_index] - return None - -def _is_printable(in_str): - """Return True if in_str in printable; False otherwise.""" - for this_char in in_str: - if this_char not in string.letters and this_char not in string.digits and this_char not in string.punctuation: - return False - return True - -def _mk_record_tail(record): - record_tail = qlslibs.jrnl.RecordTail(None) - record_tail.xmagic = inv_str(record.magic) - record_tail.checksum = adler32(record.checksum_encode()) - record_tail.serial = record.serial - record_tail.record_id = record.record_id - return record_tail - -def _rem_bytes_in_block(file_handle, block_size): - """Return the remaining bytes in a block""" - foffs = file_handle.tell() - return (_size_in_blocks(foffs, block_size) * block_size) - foffs - -def _size_in_blocks(size, block_size): - """Return the size in terms of data blocks""" - return int((size + block_size - 1) / block_size) - -def _split_str(in_str, split_size = 50): - """Split a string into two parts separated by an ellipsis if it is longer than split_size""" - if len(in_str) < split_size: - return in_str - return in_str[:25] + ' ... ' + in_str[-25:] - -def _str_to_hex_num(in_str): - """Turn a string into a hex number representation, little endian assumed (ie LSB is first, MSB is last)""" - return ''.join(x.encode('hex') for x in reversed(in_str)) |
