summaryrefslogtreecommitdiff
path: root/qpid/cpp/management/python/lib/qlslibs
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/management/python/lib/qlslibs')
-rw-r--r--qpid/cpp/management/python/lib/qlslibs/__init__.py19
-rw-r--r--qpid/cpp/management/python/lib/qlslibs/analyze.py606
-rw-r--r--qpid/cpp/management/python/lib/qlslibs/efp.py327
-rw-r--r--qpid/cpp/management/python/lib/qlslibs/err.py261
-rw-r--r--qpid/cpp/management/python/lib/qlslibs/jrnl.py394
-rw-r--r--qpid/cpp/management/python/lib/qlslibs/utils.py216
6 files changed, 0 insertions, 1823 deletions
diff --git a/qpid/cpp/management/python/lib/qlslibs/__init__.py b/qpid/cpp/management/python/lib/qlslibs/__init__.py
deleted file mode 100644
index d8a500d9d8..0000000000
--- a/qpid/cpp/management/python/lib/qlslibs/__init__.py
+++ /dev/null
@@ -1,19 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
diff --git a/qpid/cpp/management/python/lib/qlslibs/analyze.py b/qpid/cpp/management/python/lib/qlslibs/analyze.py
deleted file mode 100644
index 8c5de05b9e..0000000000
--- a/qpid/cpp/management/python/lib/qlslibs/analyze.py
+++ /dev/null
@@ -1,606 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-"""
-Module: qlslibs.analyze
-
-Classes for recovery and analysis of a Qpid Linear Store (QLS).
-"""
-
-import os.path
-import qlslibs.err
-import qlslibs.jrnl
-import qlslibs.utils
-
-class HighCounter(object):
- def __init__(self):
- self.num = 0
- def check(self, num):
- if self.num < num:
- self.num = num
- def get(self):
- return self.num
- def get_next(self):
- self.num += 1
- return self.num
-
-class JournalRecoveryManager(object):
- TPL_DIR_NAME = 'tpl2'
- JRNL_DIR_NAME = 'jrnl2'
- def __init__(self, directory, args):
- if not os.path.exists(directory):
- raise qlslibs.err.InvalidQlsDirectoryNameError(directory)
- self.directory = directory
- self.args = args
- self.tpl = None
- self.journals = {}
- self.high_rid_counter = HighCounter()
- self.prepared_list = None
- def report(self):
- self._reconcile_transactions(self.prepared_list, self.args.txn)
- if self.tpl is not None:
- self.tpl.report(self.args)
- for queue_name in sorted(self.journals.keys()):
- self.journals[queue_name].report(self.args)
- def run(self):
- tpl_dir = os.path.join(self.directory, JournalRecoveryManager.TPL_DIR_NAME)
- if os.path.exists(tpl_dir):
- self.tpl = Journal(tpl_dir, None, self.args)
- self.tpl.recover(self.high_rid_counter)
- if self.args.show_recovery_recs or self.args.show_all_recs:
- print
- jrnl_dir = os.path.join(self.directory, JournalRecoveryManager.JRNL_DIR_NAME)
- self.prepared_list = self.tpl.txn_map.get_prepared_list() if self.tpl is not None else {}
- if os.path.exists(jrnl_dir):
- for dir_entry in sorted(os.listdir(jrnl_dir)):
- jrnl = Journal(os.path.join(jrnl_dir, dir_entry), self.prepared_list, self.args)
- jrnl.recover(self.high_rid_counter)
- self.journals[jrnl.get_queue_name()] = jrnl
- if self.args.show_recovery_recs or self.args.show_all_recs:
- print
- print
- def _reconcile_transactions(self, prepared_list, txn_flag):
- print 'Transaction reconciliation report:'
- print '=================================='
- print 'Transaction Prepared List (TPL) contains %d open transaction(s):' % len(prepared_list)
- for xid in prepared_list.keys():
- commit_flag = prepared_list[xid]
- if commit_flag is None:
- status = '[Prepared, neither committed nor aborted - assuming commit]'
- elif commit_flag:
- status = '[Prepared, but interrupted during commit phase]'
- else:
- status = '[Prepared, but interrupted during abort phase]'
- print ' ', qlslibs.utils.format_xid(xid), status
- if prepared_list[xid] is None: # Prepared, but not committed or aborted
- enqueue_record = self.tpl.get_txn_map_record(xid)[0][1]
- dequeue_record = qlslibs.utils.create_record(qlslibs.jrnl.DequeueRecord.MAGIC, \
- qlslibs.jrnl.DequeueRecord.TXN_COMPLETE_COMMIT_FLAG, \
- self.tpl.current_journal_file, \
- self.high_rid_counter.get_next(), \
- enqueue_record.record_id, xid, None)
- if txn_flag:
- self.tpl.add_record(dequeue_record)
- print
- print 'Open transactions found in queues:'
- print '----------------------------------'
- for queue_name in sorted(self.journals.keys()):
- self.journals[queue_name].reconcile_transactions(prepared_list, txn_flag)
- print
- if len(prepared_list) > 0:
- print 'Creating commit records for the following prepared transactions in TPL:'
- for xid in prepared_list.keys():
- print ' ', qlslibs.utils.format_xid(xid)
- transaction_record = qlslibs.utils.create_record(qlslibs.jrnl.TransactionRecord.MAGIC_COMMIT, 0, \
- self.tpl.current_journal_file, \
- self.high_rid_counter.get_next(), None, xid, None)
- if txn_flag:
- self.tpl.add_record(transaction_record)
- print
-
-class EnqueueMap(object):
- """
- Map of enqueued records in a QLS journal
- """
- def __init__(self, journal):
- self.journal = journal
- self.enq_map = {}
- def add(self, journal_file, enq_record, locked_flag):
- if enq_record.record_id in self.enq_map:
- raise qlslibs.err.DuplicateRecordIdError(self.journal.current_file_header, enq_record)
- self.enq_map[enq_record.record_id] = [journal_file, enq_record, locked_flag]
- def contains(self, rid):
- """Return True if the map contains the given rid"""
- return rid in self.enq_map
- def delete(self, journal_file, deq_record):
- if deq_record.dequeue_record_id in self.enq_map:
- enq_list = self.enq_map[deq_record.dequeue_record_id]
- del self.enq_map[deq_record.dequeue_record_id]
- return enq_list
- else:
- raise qlslibs.err.RecordIdNotFoundError(journal_file.file_header, deq_record)
- def get(self, record_id):
- if record_id in self.enq_map:
- return self.enq_map[record_id]
- return None
- def lock(self, journal_file, dequeue_record):
- if dequeue_record.dequeue_record_id not in self.enq_map:
- raise qlslibs.err.RecordIdNotFoundError(journal_file.file_header, dequeue_record)
- self.enq_map[dequeue_record.dequeue_record_id][2] = True
- def report_str(self, args):
- """Return a string containing a text report for all records in the map"""
- if len(self.enq_map) == 0:
- return 'No enqueued records found.'
- rstr = '%d enqueued records found' % len(self.enq_map)
- if args.show_recovered_recs:
- rstr += ":"
- rid_list = self.enq_map.keys()
- rid_list.sort()
- for rid in rid_list:
- journal_file, record, locked_flag = self.enq_map[rid]
- rstr += '\n 0x%x:' % journal_file.file_header.file_num
- rstr += record.to_string(args.show_xids, args.show_data, args.txtest)
- if locked_flag:
- rstr += ' [LOCKED]'
- else:
- rstr += '.'
- return rstr
- def unlock(self, journal_file, dequeue_record):
- """Set the transaction lock for a given record_id to False"""
- if dequeue_record.dequeue_record_id in self.enq_map:
- if self.enq_map[dequeue_record.dequeue_record_id][2]:
- self.enq_map[dequeue_record.dequeue_record_id][2] = False
- else:
- raise qlslibs.err.RecordNotLockedError(journal_file.file_header, dequeue_record)
- else:
- raise qlslibs.err.RecordIdNotFoundError(journal_file.file_header, dequeue_record)
-
-class TransactionMap(object):
- """
- Map of open transactions used while recovering a QLS journal
- """
- def __init__(self, enq_map):
- self.txn_map = {}
- self.enq_map = enq_map
- def abort(self, xid):
- """Perform an abort operation for the given xid record"""
- for journal_file, record, _ in self.txn_map[xid]:
- if isinstance(record, qlslibs.jrnl.DequeueRecord):
- if self.enq_map.contains(record.dequeue_record_id):
- self.enq_map.unlock(journal_file, record)
- else:
- journal_file.decr_enq_cnt(record)
- del self.txn_map[xid]
- def add(self, journal_file, record):
- if record.xid is None:
- raise qlslibs.err.NonTransactionalRecordError(journal_file.file_header, record, 'TransactionMap.add()')
- if isinstance(record, qlslibs.jrnl.DequeueRecord):
- try:
- self.enq_map.lock(journal_file, record)
- except qlslibs.err.RecordIdNotFoundError:
- # Not in emap, look for rid in tmap - should not happen in practice
- txn_op = self._find_record_id(record.xid, record.dequeue_record_id)
- if txn_op != None:
- if txn_op[2]:
- raise qlslibs.err.AlreadyLockedError(journal_file.file_header, record)
- txn_op[2] = True
- if record.xid in self.txn_map:
- self.txn_map[record.xid].append([journal_file, record, False]) # append to existing list
- else:
- self.txn_map[record.xid] = [[journal_file, record, False]] # create new list
- def commit(self, xid):
- """Perform a commit operation for the given xid record"""
- mismatch_list = []
- for journal_file, record, lock in self.txn_map[xid]:
- if isinstance(record, qlslibs.jrnl.EnqueueRecord):
- self.enq_map.add(journal_file, record, lock) # Transfer enq to emap
- else:
- if self.enq_map.contains(record.dequeue_record_id):
- self.enq_map.unlock(journal_file, record)
- self.enq_map.delete(journal_file, record)[0].decr_enq_cnt(record)
- else:
- mismatch_list.append('0x%x' % record.dequeue_record_id)
- del self.txn_map[xid]
- return mismatch_list
- def contains(self, xid):
- """Return True if the xid exists in the map; False otherwise"""
- return xid in self.txn_map
- def delete(self, journal_file, transaction_record):
- """Remove a transaction record from the map using either a commit or abort header"""
- if transaction_record.magic[-1] == 'c':
- return self.commit(transaction_record.xid)
- if transaction_record.magic[-1] == 'a':
- self.abort(transaction_record.xid)
- else:
- raise qlslibs.err.InvalidRecordTypeError(journal_file.file_header, transaction_record,
- 'delete from Transaction Map')
- def get(self, xid):
- if xid in self.txn_map:
- return self.txn_map[xid]
- return None
- def get_prepared_list(self):
- """
- Prepared list is a map of xid(key) to one of None, True or False. These represent respectively:
- None: prepared, but neither committed or aborted (interrupted before commit or abort)
- False: prepared and aborted (interrupted before abort complete)
- True: prepared and committed (interrupted before commit complete)
- """
- prepared_list = {}
- for xid in self.get_xid_list():
- for _, record, _ in self.txn_map[xid]:
- if isinstance(record, qlslibs.jrnl.EnqueueRecord):
- prepared_list[xid] = None
- else:
- prepared_list[xid] = record.is_transaction_complete_commit()
- return prepared_list
- def get_xid_list(self):
- return self.txn_map.keys()
- def report_str(self, args):
- """Return a string containing a text report for all records in the map"""
- if len(self.txn_map) == 0:
- return 'No outstanding transactions found.'
- rstr = '%d outstanding transaction(s)' % len(self.txn_map)
- if args.show_recovered_recs:
- rstr += ':'
- for xid, op_list in self.txn_map.iteritems():
- rstr += '\n %s containing %d operations:' % (qlslibs.utils.format_xid(xid), len(op_list))
- for journal_file, record, _ in op_list:
- rstr += '\n 0x%x:' % journal_file.file_header.file_num
- rstr += record.to_string(args.show_xids, args.show_data, args.txtest)
- else:
- rstr += '.'
- return rstr
- def _find_record_id(self, xid, record_id):
- """ Search for and return map list with supplied rid."""
- if xid in self.txn_map:
- for txn_op in self.txn_map[xid]:
- if txn_op[1].record_id == record_id:
- return txn_op
- for this_xid in self.txn_map.iterkeys():
- for txn_op in self.txn_map[this_xid]:
- if txn_op[1].record_id == record_id:
- return txn_op
- return None
-
-class JournalStatistics(object):
- """Journal statistics"""
- def __init__(self):
- self.total_record_count = 0
- self.transient_record_count = 0
- self.filler_record_count = 0
- self.enqueue_count = 0
- self.dequeue_count = 0
- self.transaction_record_count = 0
- self.transaction_enqueue_count = 0
- self.transaction_dequeue_count = 0
- self.transaction_commit_count = 0
- self.transaction_abort_count = 0
- self.transaction_operation_count = 0
- def __str__(self):
- fstr = 'Total record count: %d\n' + \
- 'Transient record count: %d\n' + \
- 'Filler_record_count: %d\n' + \
- 'Enqueue_count: %d\n' + \
- 'Dequeue_count: %d\n' + \
- 'Transaction_record_count: %d\n' + \
- 'Transaction_enqueue_count: %d\n' + \
- 'Transaction_dequeue_count: %d\n' + \
- 'Transaction_commit_count: %d\n' + \
- 'Transaction_abort_count: %d\n' + \
- 'Transaction_operation_count: %d\n'
- return fstr % (self.total_record_count,
- self.transient_record_count,
- self.filler_record_count,
- self.enqueue_count,
- self.dequeue_count,
- self.transaction_record_count,
- self.transaction_enqueue_count,
- self.transaction_dequeue_count,
- self.transaction_commit_count,
- self.transaction_abort_count,
- self.transaction_operation_count)
-
-class Journal(object):
- """
- Instance of a Qpid Linear Store (QLS) journal.
- """
- JRNL_SUFFIX = 'jrnl'
- def __init__(self, directory, xid_prepared_list, args):
- self.directory = directory
- self.queue_name = os.path.basename(directory)
- self.files = {}
- self.file_num_list = None
- self.file_num_itr = None
- self.enq_map = EnqueueMap(self)
- self.txn_map = TransactionMap(self.enq_map)
- self.current_journal_file = None
- self.first_rec_flag = None
- self.statistics = JournalStatistics()
- self.xid_prepared_list = xid_prepared_list # This is None for the TPL instance only
- self.args = args
- self.last_record_offset = None # TODO: Move into JournalFile
- self.num_filler_records_required = None # TODO: Move into JournalFile
- self.fill_to_offset = None
- def add_record(self, record):
- """Used for reconciling transactions only - called from JournalRecoveryManager._reconcile_transactions()"""
- if isinstance(record, qlslibs.jrnl.EnqueueRecord) or isinstance(record, qlslibs.jrnl.DequeueRecord):
- if record.xid_size > 0:
- self.txn_map.add(self.current_journal_file, record)
- else:
- self.enq_map.add(self.current_journal_file, record, False)
- elif isinstance(record, qlslibs.jrnl.TransactionRecord):
- self.txn_map.delete(self.current_journal_file, record)
- else:
- raise qlslibs.err.InvalidRecordTypeError(self.current_journal_file, record, 'add to Journal')
- def get_enq_map_record(self, rid):
- return self.enq_map.get(rid)
- def get_txn_map_record(self, xid):
- return self.txn_map.get(xid)
- def get_outstanding_txn_list(self):
- return self.txn_map.get_xid_list()
- def get_queue_name(self):
- return self.queue_name
- def recover(self, high_rid_counter):
- print 'Recovering %s...' % self.queue_name,
- self._analyze_files()
- try:
- while self._get_next_record(high_rid_counter):
- pass
- self._check_alignment()
- except qlslibs.err.NoMoreFilesInJournalError:
- print 'No more files in journal'
- except qlslibs.err.FirstRecordOffsetMismatchError as err:
- print '0x%08x: **** FRO ERROR: queue=\"%s\" fid=0x%x fro actual=0x%08x expected=0x%08x' % \
- (err.get_expected_fro(), err.get_queue_name(), err.get_file_number(), err.get_record_offset(),
- err.get_expected_fro())
- print 'done'
- def reconcile_transactions(self, prepared_list, txn_flag):
- xid_list = self.txn_map.get_xid_list()
- if len(xid_list) > 0:
- print self.queue_name, 'contains', len(xid_list), 'open transaction(s):'
- for xid in xid_list:
- if xid in prepared_list.keys():
- commit_flag = prepared_list[xid]
- if commit_flag is None:
- print ' ', qlslibs.utils.format_xid(xid), '- Assuming commit after prepare'
- if txn_flag:
- self.txn_map.commit(xid)
- elif commit_flag:
- print ' ', qlslibs.utils.format_xid(xid), '- Completing interrupted commit operation'
- if txn_flag:
- self.txn_map.commit(xid)
- else:
- print ' ', qlslibs.utils.format_xid(xid), '- Completing interrupted abort operation'
- if txn_flag:
- self.txn_map.abort(xid)
- else:
- print ' ', qlslibs.utils.format_xid(xid), '- Ignoring, not in prepared transaction list'
- if txn_flag:
- self.txn_map.abort(xid)
- def report(self, args):
- print 'Journal "%s":' % self.queue_name
- print '=' * (11 + len(self.queue_name))
- if args.stats:
- print str(self.statistics)
- print self.enq_map.report_str(args)
- print self.txn_map.report_str(args)
- JournalFile.report_header()
- for file_num in sorted(self.files.keys()):
- self.files[file_num].report()
- #TODO: move this to JournalFile, append to file info
- if self.num_filler_records_required is not None and self.fill_to_offset is not None:
- print '0x%x:0x%08x: %d filler records required for DBLK alignment to 0x%08x' % \
- (self.current_journal_file.file_header.file_num, self.last_record_offset,
- self.num_filler_records_required, self.fill_to_offset)
- print
- #--- protected functions ---
- def _analyze_files(self):
- for dir_entry in os.listdir(self.directory):
- dir_entry_bits = dir_entry.split('.')
- if len(dir_entry_bits) == 2 and dir_entry_bits[1] == Journal.JRNL_SUFFIX:
- fq_file_name = os.path.join(self.directory, dir_entry)
- file_handle = open(fq_file_name)
- args = qlslibs.utils.load_args(file_handle, qlslibs.jrnl.RecordHeader)
- file_hdr = qlslibs.jrnl.FileHeader(*args)
- file_hdr.init(file_handle, *qlslibs.utils.load_args(file_handle, qlslibs.jrnl.FileHeader))
- if file_hdr.is_header_valid(file_hdr):
- file_hdr.load(file_handle)
- if file_hdr.is_valid(False):
- qlslibs.utils.skip(file_handle,
- file_hdr.file_header_size_sblks * qlslibs.utils.DEFAULT_SBLK_SIZE)
- self.files[file_hdr.file_num] = JournalFile(file_hdr)
- self.file_num_list = sorted(self.files.keys())
- self.file_num_itr = iter(self.file_num_list)
- def _check_alignment(self): # TODO: Move into JournalFile
- if self.last_record_offset is None: # Empty file, _check_file() never run
- return
- remaining_sblks = self.last_record_offset % qlslibs.utils.DEFAULT_SBLK_SIZE
- if remaining_sblks == 0:
- self.num_filler_records_required = 0
- else:
- self.num_filler_records_required = (qlslibs.utils.DEFAULT_SBLK_SIZE - remaining_sblks) / \
- qlslibs.utils.DEFAULT_DBLK_SIZE
- self.fill_to_offset = self.last_record_offset + \
- (self.num_filler_records_required * qlslibs.utils.DEFAULT_DBLK_SIZE)
- if self.args.show_recovery_recs or self.args.show_all_recs:
- print '0x%x:0x%08x: %d filler records required for DBLK alignment to 0x%08x' % \
- (self.current_journal_file.file_header.file_num, self.last_record_offset,
- self.num_filler_records_required, self.fill_to_offset)
- def _check_file(self):
- if self.current_journal_file is not None:
- if not self.current_journal_file.file_header.is_end_of_file():
- return True
- if self.current_journal_file.file_header.is_end_of_file():
- self.last_record_offset = self.current_journal_file.file_header.file_handle.tell()
- if not self._get_next_file():
- return False
- fhdr = self.current_journal_file.file_header
- fhdr.file_handle.seek(fhdr.first_record_offset)
- return True
- def _get_next_file(self):
- if self.current_journal_file is not None:
- file_handle = self.current_journal_file.file_header.file_handle
- if not file_handle.closed: # sanity check, should not be necessary
- file_handle.close()
- file_num = 0
- try:
- while file_num == 0:
- file_num = self.file_num_itr.next()
- except StopIteration:
- pass
- if file_num == 0:
- return False
- self.current_journal_file = self.files[file_num]
- self.first_rec_flag = True
- if self.args.show_recovery_recs or self.args.show_all_recs:
- file_header = self.current_journal_file.file_header
- print '0x%x:%s' % (file_header.file_num, file_header.to_string())
- return True
- def _get_next_record(self, high_rid_counter):
- if not self._check_file():
- return False
- self.last_record_offset = self.current_journal_file.file_header.file_handle.tell()
- this_record = qlslibs.utils.load(self.current_journal_file.file_header.file_handle, qlslibs.jrnl.RecordHeader)
- if not this_record.is_header_valid(self.current_journal_file.file_header):
- return False
- if self.first_rec_flag:
- if this_record.file_offset != self.current_journal_file.file_header.first_record_offset:
- raise qlslibs.err.FirstRecordOffsetMismatchError(self.current_journal_file.file_header, this_record)
- self.first_rec_flag = False
- self.statistics.total_record_count += 1
- start_journal_file = self.current_journal_file
- if isinstance(this_record, qlslibs.jrnl.EnqueueRecord):
- ok_flag = self._handle_enqueue_record(this_record, start_journal_file)
- high_rid_counter.check(this_record.record_id)
- if self.args.show_recovery_recs or self.args.show_all_recs:
- print '0x%x:%s' % (start_journal_file.file_header.file_num, \
- this_record.to_string(self.args.show_xids, self.args.show_data, self.args.txtest))
- elif isinstance(this_record, qlslibs.jrnl.DequeueRecord):
- ok_flag = self._handle_dequeue_record(this_record, start_journal_file)
- high_rid_counter.check(this_record.record_id)
- if self.args.show_recovery_recs or self.args.show_all_recs:
- print '0x%x:%s' % (start_journal_file.file_header.file_num, this_record.to_string(self.args.show_xids, None, None))
- elif isinstance(this_record, qlslibs.jrnl.TransactionRecord):
- ok_flag = self._handle_transaction_record(this_record, start_journal_file)
- high_rid_counter.check(this_record.record_id)
- if self.args.show_recovery_recs or self.args.show_all_recs:
- print '0x%x:%s' % (start_journal_file.file_header.file_num, this_record.to_string(self.args.show_xids, None, None))
- else:
- self.statistics.filler_record_count += 1
- ok_flag = True
- if self.args.show_all_recs:
- print '0x%x:%s' % (start_journal_file.file_header.file_num, this_record)
- qlslibs.utils.skip(self.current_journal_file.file_header.file_handle, qlslibs.utils.DEFAULT_DBLK_SIZE)
- return ok_flag
- def _handle_enqueue_record(self, enqueue_record, start_journal_file):
- while enqueue_record.load(self.current_journal_file.file_header.file_handle):
- if not self._get_next_file():
- enqueue_record.truncated_flag = True
- return False
- if not enqueue_record.is_valid(start_journal_file):
- return False
- if enqueue_record.is_external() and enqueue_record.data != None:
- raise qlslibs.err.ExternalDataError(self.current_journal_file.file_header, enqueue_record)
- if enqueue_record.is_transient():
- self.statistics.transient_record_count += 1
- return True
- if enqueue_record.xid_size > 0:
- self.txn_map.add(start_journal_file, enqueue_record)
- self.statistics.transaction_operation_count += 1
- self.statistics.transaction_record_count += 1
- self.statistics.transaction_enqueue_count += 1
- else:
- self.enq_map.add(start_journal_file, enqueue_record, False)
- start_journal_file.incr_enq_cnt()
- self.statistics.enqueue_count += 1
- return True
- def _handle_dequeue_record(self, dequeue_record, start_journal_file):
- while dequeue_record.load(self.current_journal_file.file_header.file_handle):
- if not self._get_next_file():
- dequeue_record.truncated_flag = True
- return False
- if not dequeue_record.is_valid(start_journal_file):
- return False
- if dequeue_record.xid_size > 0:
- if self.xid_prepared_list is None: # ie this is the TPL
- dequeue_record.transaction_prepared_list_flag = True
- elif not self.enq_map.contains(dequeue_record.dequeue_record_id):
- dequeue_record.warnings.append('NOT IN EMAP') # Only for non-TPL records
- self.txn_map.add(start_journal_file, dequeue_record)
- self.statistics.transaction_operation_count += 1
- self.statistics.transaction_record_count += 1
- self.statistics.transaction_dequeue_count += 1
- else:
- try:
- self.enq_map.delete(start_journal_file, dequeue_record)[0].decr_enq_cnt(dequeue_record)
- except qlslibs.err.RecordIdNotFoundError:
- dequeue_record.warnings.append('NOT IN EMAP')
- self.statistics.dequeue_count += 1
- return True
- def _handle_transaction_record(self, transaction_record, start_journal_file):
- while transaction_record.load(self.current_journal_file.file_header.file_handle):
- if not self._get_next_file():
- transaction_record.truncated_flag = True
- return False
- if not transaction_record.is_valid(start_journal_file):
- return False
- if transaction_record.magic[-1] == 'a': # Abort
- self.statistics.transaction_abort_count += 1
- elif transaction_record.magic[-1] == 'c': # Commit
- self.statistics.transaction_commit_count += 1
- else:
- raise InvalidRecordTypeError('Unknown transaction record magic \'%s\'' % transaction_record.magic)
- if self.txn_map.contains(transaction_record.xid):
- self.txn_map.delete(self.current_journal_file, transaction_record)
- else:
- transaction_record.warnings.append('NOT IN TMAP')
-# if transaction_record.magic[-1] == 'c': # commits only
-# self._txn_obj_list[hdr.xid] = hdr
- self.statistics.transaction_record_count += 1
- return True
- def _load_data(self, record):
- while not record.is_complete:
- record.load(self.current_journal_file.file_handle)
-
-class JournalFile(object):
- def __init__(self, file_header):
- self.file_header = file_header
- self.enq_cnt = 0
- self.deq_cnt = 0
- self.num_filler_records_required = None
- def incr_enq_cnt(self):
- self.enq_cnt += 1
- def decr_enq_cnt(self, record):
- if self.enq_cnt <= self.deq_cnt:
- raise qlslibs.err.EnqueueCountUnderflowError(self.file_header, record)
- self.deq_cnt += 1
- def get_enq_cnt(self):
- return self.enq_cnt - self.deq_cnt
- def is_outstanding_enq(self):
- return self.enq_cnt > self.deq_cnt
- @staticmethod
- def report_header():
- print 'file_num enq_cnt p_no efp journal_file'
- print '-------- ------- ---- ----- ------------'
- def report(self):
- comment = '<uninitialized>' if self.file_header.file_num == 0 else ''
- file_num_str = '0x%x' % self.file_header.file_num
- print '%8s %7d %4d %4dk %s %s' % (file_num_str, self.get_enq_cnt(), self.file_header.partition_num,
- self.file_header.efp_data_size_kb,
- os.path.basename(self.file_header.file_handle.name), comment)
diff --git a/qpid/cpp/management/python/lib/qlslibs/efp.py b/qpid/cpp/management/python/lib/qlslibs/efp.py
deleted file mode 100644
index 1c751c3d06..0000000000
--- a/qpid/cpp/management/python/lib/qlslibs/efp.py
+++ /dev/null
@@ -1,327 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-"""
-Module: qlslibs.efp
-
-Contains empty file pool (EFP) classes.
-"""
-
-import os
-import os.path
-import qlslibs.err
-import shutil
-import uuid
-
-class EfpManager(object):
- """
- Top level class to analyze the Qpid Linear Store (QLS) directory for the partitions that make up the
- Empty File Pool (EFP).
- """
- def __init__(self, directory, disk_space_required_kb):
- if not os.path.exists(directory):
- raise qlslibs.err.InvalidQlsDirectoryNameError(directory)
- self.directory = directory
- self.disk_space_required_kb = disk_space_required_kb
- self.efp_partitions = []
- self.efp_pools = {}
- self.total_num_files = 0
- self.total_cum_file_size_kb = 0
- self.current_efp_partition = None
- def add_file_pool(self, file_size_kb, num_files):
- """ Add an EFP in the specified partition of the specified size containing the specified number of files """
- dir_name = EmptyFilePool.get_directory_name(file_size_kb)
- print 'Adding pool \'%s\' to partition %s' % (dir_name, self.current_efp_partition.partition_number)
- self.total_cum_file_size_kb += self.current_efp_partition.create_new_efp(file_size_kb, num_files)
- self.total_num_files += num_files
- def freshen_file_pool(self, file_size_kb, num_files):
- """ Freshen an EFP in the specified partition and of the specified size to the specified number of files """
- if self.current_efp_partition is None:
- partition_list = self.efp_partitions
- partition_str = 'all partitions'
- else:
- partition_list = [self.current_efp_partition]
- partition_str = 'partition %d' % self.current_efp_partition.partition_number
- if file_size_kb is None:
- pool_str = 'all pools'
- else:
- pool_str = 'pool \'%s\'' % EmptyFilePool.get_directory_name(int(file_size_kb))
- print 'Freshening %s in %s to %d files' % (pool_str, partition_str, num_files)
- for self.current_efp_partition in partition_list: # Partition objects
- if file_size_kb is None:
- file_size_list = self.current_efp_partition.efp_pools.keys()
- else:
- file_size_list = ['%sk' % file_size_kb]
- for file_size in file_size_list:
- efp = self.current_efp_partition.efp_pools[file_size]
- num_files_needed = num_files - efp.get_tot_file_count()
- if num_files_needed > 0:
- self.current_efp_partition.create_new_efp_files(qlslibs.utils.efp_directory_size(file_size),
- num_files_needed)
- else:
- print ' WARNING: Pool %s in partition %s already contains %d files: no action taken' % \
- (self.current_efp_partition.efp_pools[file_size].size_str,
- self.current_efp_partition.partition_number, efp.get_num_files())
- def remove_file_pool(self, file_size_kb):
- """ Remove an existing EFP from the specified partition and of the specified size """
- dir_name = EmptyFilePool.get_directory_name(file_size_kb)
- print 'Removing pool \'%s\' from partition %s' % (dir_name, self.current_efp_partition.partition_number)
- self.efp_partitions.remove(self.current_efp_partition)
- shutil.rmtree(os.path.join(self.current_efp_partition.efp_directory, dir_name))
- def report(self):
- print 'Empty File Pool (EFP) report'
- print '============================'
- print 'Found', len(self.efp_partitions), 'partition(s)'
- if (len(self.efp_partitions)) > 0:
- sorted_efp_partitions = sorted(self.efp_partitions, key=lambda x: x.partition_number)
- EfpPartition.print_report_table_header()
- for ptn in sorted_efp_partitions:
- ptn.print_report_table_line()
- print
- for ptn in sorted_efp_partitions:
- ptn.report()
- def run(self, arg_tup):
- self._analyze_efp()
- if arg_tup is not None:
- _, arg_file_size, arg_num_files, arg_add, arg_remove, arg_freshen, arg_list = arg_tup
- self._check_args(arg_tup)
- if arg_add:
- self.add_file_pool(int(arg_file_size), int(arg_num_files))
- if arg_remove:
- self.remove_file_pool(int(arg_file_size))
- if arg_freshen:
- self.freshen_file_pool(arg_file_size, int(arg_num_files))
- if arg_list:
- self.report()
- def _analyze_efp(self):
- for dir_entry in os.listdir(self.directory):
- try:
- efp_partition = EfpPartition(os.path.join(self.directory, dir_entry), self.disk_space_required_kb)
- efp_partition.scan()
- self.efp_partitions.append(efp_partition)
- for efpl in efp_partition.efp_pools.iterkeys():
- if efpl not in self.efp_pools:
- self.efp_pools[efpl] = []
- self.efp_pools[efpl].append(efp_partition.efp_pools[efpl])
- self.total_num_files += efp_partition.tot_file_count
- self.total_cum_file_size_kb += efp_partition.tot_file_size_kb
- except qlslibs.err.InvalidPartitionDirectoryNameError:
- pass
- def _check_args(self, arg_tup):
- """ Value check of args. The names of partitions and pools are validated against the discovered instances """
- arg_partition, arg_file_size, _, arg_add, arg_remove, arg_freshen, _ = arg_tup
- if arg_partition is not None:
- try:
- if arg_partition[0] == 'p': # string partition name, eg 'p001'
- partition_num = int(arg_partition[1:])
- else: # numeric partition, eg '1'
- partition_num = int(arg_partition)
- found = False
- for partition in self.efp_partitions:
- if partition.partition_number == partition_num:
- self.current_efp_partition = partition
- found = True
- break
- if not found:
- raise qlslibs.err.PartitionDoesNotExistError(arg_partition)
- except ValueError:
- raise qlslibs.err.InvalidPartitionDirectoryNameError(arg_partition)
- if self.current_efp_partition is not None:
- pool_list = self.current_efp_partition.efp_pools.keys()
- efp_directory_name = EmptyFilePool.get_directory_name(int(arg_file_size))
- if arg_add and efp_directory_name in pool_list:
- raise qlslibs.err.PoolDirectoryAlreadyExistsError(efp_directory_name)
- if (arg_remove or arg_freshen) and efp_directory_name not in pool_list:
- raise qlslibs.err.PoolDirectoryDoesNotExistError(efp_directory_name)
-
-class EfpPartition(object):
- """
- Class that represents a EFP partition. Each partition contains one or more Empty File Pools (EFPs).
- """
- PTN_DIR_PREFIX = 'p'
- EFP_DIR_NAME = 'efp'
- def __init__(self, directory, disk_space_required_kb):
- self.directory = directory
- self.partition_number = None
- self.efp_pools = {}
- self.tot_file_count = 0
- self.tot_file_size_kb = 0
- self._validate_partition_directory(disk_space_required_kb)
- def create_new_efp_files(self, file_size_kb, num_files):
- """ Create new EFP files in this partition """
- dir_name = EmptyFilePool.get_directory_name(file_size_kb)
- if dir_name in self.efp_pools.keys():
- efp = self.efp_pools[dir_name]
- else:
- efp = EmptyFilePool(os.path.join(self.directory, EfpPartition.EFP_DIR_NAME), dir_name)
- this_tot_file_size_kb = efp.create_new_efp_files(num_files)
- self.tot_file_size_kb += this_tot_file_size_kb
- self.tot_file_count += num_files
- return this_tot_file_size_kb
- @staticmethod
- def print_report_table_header():
- print 'p_no no_efp tot_files tot_size_kb directory'
- print '---- ------ --------- ----------- ---------'
- def print_report_table_line(self):
- print '%4d %6d %9d %11d %s' % (self.partition_number, len(self.efp_pools), self.tot_file_count,
- self.tot_file_size_kb, self.directory)
- def report(self):
- print 'Partition %s:' % os.path.basename(self.directory)
- if len(self.efp_pools) > 0:
- EmptyFilePool.print_report_table_header()
- for dir_name in self.efp_pools.keys():
- self.efp_pools[dir_name].print_report_table_line()
- else:
- print '<empty - no EFPs found in this partition>'
- print
- def scan(self):
- if os.path.exists(self.directory):
- efp_dir = os.path.join(self.directory, EfpPartition.EFP_DIR_NAME)
- for dir_entry in os.listdir(efp_dir):
- efp = EmptyFilePool(os.path.join(efp_dir, dir_entry), self.partition_number)
- efp.scan()
- self.tot_file_count += efp.get_tot_file_count()
- self.tot_file_size_kb += efp.get_tot_file_size_kb()
- self.efp_pools[dir_entry] = efp
- def _validate_partition_directory(self, disk_space_required_kb):
- if os.path.basename(self.directory)[0] is not EfpPartition.PTN_DIR_PREFIX:
- raise qlslibs.err.InvalidPartitionDirectoryNameError(self.directory)
- try:
- self.partition_number = int(os.path.basename(self.directory)[1:])
- except ValueError:
- raise qlslibs.err.InvalidPartitionDirectoryNameError(self.directory)
- if not qlslibs.utils.has_write_permission(self.directory):
- raise qlslibs.err.WritePermissionError(self.directory)
- if disk_space_required_kb is not None:
- space_avail = qlslibs.utils.get_avail_disk_space(self.directory)
- if space_avail < (disk_space_required_kb * 1024):
- raise qlslibs.err.InsufficientSpaceOnDiskError(self.directory, space_avail,
- disk_space_required_kb * 1024)
-
-class EmptyFilePool(object):
- """
- Class that represents a single Empty File Pool within a partition. Each EFP contains pre-formatted linear store
- journal files (but it may also be empty).
- """
- EFP_DIR_SUFFIX = 'k'
- EFP_JRNL_EXTENTION = '.jrnl'
- EFP_INUSE_DIRNAME = 'in_use'
- EFP_RETURNED_DIRNAME = 'returned'
- def __init__(self, directory, partition_number):
- self.base_dir_name = os.path.basename(directory)
- self.directory = directory
- self.partition_number = partition_number
- self.data_size_kb = None
- self.efp_files = []
- self.in_use_files = []
- self.returned_files = []
- self._validate_efp_directory()
- def create_new_efp_files(self, num_files):
- """ Create one or more new empty journal files of the prescribed size for this EFP """
- this_total_file_size = 0
- for _ in range(num_files):
- this_total_file_size += self._create_new_efp_file()
- return this_total_file_size
- def get_directory(self):
- return self.directory
- @staticmethod
- def get_directory_name(file_size_kb):
- """ Static function to create an EFP directory name from the size of the files it contains """
- return '%dk' % file_size_kb
- def get_tot_file_count(self):
- return len(self.efp_files)
- def get_tot_file_size_kb(self):
- return self.data_size_kb * len(self.efp_files)
- @staticmethod
- def print_report_table_header():
- print ' ---------- efp ------------ --------- in_use ---------- -------- returned ---------'
- print 'data_size_kb file_count tot_file_size_kb file_count tot_file_size_kb file_count tot_file_size_kb efp_directory'
- print '------------ ---------- ---------------- ---------- ---------------- ---------- ---------------- -------------'
- def print_report_table_line(self):
- print '%12d %10d %16d %10d %16d %10d %16d %s' % (self.data_size_kb, len(self.efp_files),
- self.data_size_kb * len(self.efp_files),
- len(self.in_use_files),
- self.data_size_kb * len(self.in_use_files),
- len(self.returned_files),
- self.data_size_kb * len(self.returned_files),
- self.get_directory())
- def scan(self):
- for efp_file in os.listdir(self.directory):
- if efp_file == self.EFP_INUSE_DIRNAME:
- for in_use_file in os.listdir(os.path.join(self.directory, self.EFP_INUSE_DIRNAME)):
- self.in_use_files.append(in_use_file)
- continue
- if efp_file == self.EFP_RETURNED_DIRNAME:
- for returned_file in os.listdir(os.path.join(self.directory, self.EFP_RETURNED_DIRNAME)):
- self.returned_files.append(returned_file)
- continue
- if self._validate_efp_file(os.path.join(self.directory, efp_file)):
- self.efp_files.append(efp_file)
- def _add_efp_file(self, efp_file_name):
- """ Add a single journal file of the appropriate size to this EFP. No file size check is made here. """
- self.efp_files.append(efp_file_name)
- def _create_new_efp_file(self):
- """ Create a single new empty journal file of the prescribed size for this EFP """
- file_name = str(uuid.uuid4()) + EmptyFilePool.EFP_JRNL_EXTENTION
- file_header = qlslibs.jrnl.FileHeader(0, qlslibs.jrnl.FileHeader.MAGIC, qlslibs.utils.DEFAULT_RECORD_VERSION,
- 0, 0, 0)
- file_header.init(None, None, qlslibs.utils.DEFAULT_HEADER_SIZE_SBLKS, self.partition_number, self.data_size_kb,
- 0, 0, 0, 0, 0)
- efh = file_header.encode()
- efh_bytes = len(efh)
- file_handle = open(os.path.join(self.directory, file_name), 'wb')
- file_handle.write(efh)
- file_handle.write('\xff' * (qlslibs.utils.DEFAULT_SBLK_SIZE - efh_bytes))
- file_handle.write('\x00' * (int(self.data_size_kb) * 1024))
- file_handle.close()
- fqfn = os.path.join(self.directory, file_name)
- self._add_efp_file(fqfn)
- return os.path.getsize(fqfn)
- def _validate_efp_directory(self):
- if self.base_dir_name[-1] is not EmptyFilePool.EFP_DIR_SUFFIX:
- raise qlslibs.err.InvalidEfpDirectoryNameError(self.directory)
- try:
- self.data_size_kb = int(os.path.basename(self.base_dir_name)[:-1])
- except ValueError:
- raise qlslibs.err.InvalidEfpDirectoryNameError(self.directory)
- def _validate_efp_file(self, efp_file):
- file_size = os.path.getsize(efp_file)
- expected_file_size = (self.data_size_kb * 1024) + qlslibs.utils.DEFAULT_SBLK_SIZE
- if file_size != expected_file_size:
- print 'WARNING: File %s not of correct size (size=%d, expected=%d): Ignoring' % (efp_file, file_size,
- expected_file_size)
- return False
- file_handle = open(efp_file)
- args = qlslibs.utils.load_args(file_handle, qlslibs.jrnl.RecordHeader)
- file_hdr = qlslibs.jrnl.FileHeader(*args)
- file_hdr.init(file_handle, *qlslibs.utils.load_args(file_handle, qlslibs.jrnl.FileHeader))
- if not file_hdr.is_header_valid(file_hdr):
- file_handle.close()
- return False
- file_hdr.load(file_handle)
- file_handle.close()
- if not file_hdr.is_valid(True):
- return False
- return True
-
-
-# =============================================================================
-
-if __name__ == "__main__":
- print "This is a library, and cannot be executed."
diff --git a/qpid/cpp/management/python/lib/qlslibs/err.py b/qpid/cpp/management/python/lib/qlslibs/err.py
deleted file mode 100644
index f47632ce6a..0000000000
--- a/qpid/cpp/management/python/lib/qlslibs/err.py
+++ /dev/null
@@ -1,261 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-"""
-Module: qlslibs.err
-
-Contains error classes.
-"""
-
-# --- Parent classes
-
-class QlsError(Exception):
- """Base error class for QLS errors and exceptions"""
- def __init__(self):
- Exception.__init__(self)
- def __str__(self):
- return ''
-
-class QlsRecordError(QlsError):
- """Base error class for individual records"""
- def __init__(self, file_header, record):
- QlsError.__init__(self)
- self.file_header = file_header
- self.record = record
- def get_expected_fro(self):
- return self.file_header.first_record_offset
- def get_file_number(self):
- return self.file_header.file_num
- def get_queue_name(self):
- return self.file_header.queue_name
- def get_record_id(self):
- return self.record.record_id
- def get_record_offset(self):
- return self.record.file_offset
- def __str__(self):
- return 'queue="%s" file_id=0x%x record_offset=0x%x record_id=0x%x' % \
- (self.file_header.queue_name, self.file_header.file_num, self.record.file_offset, self.record.record_id)
-
-# --- Error classes
-
-class AlreadyLockedError(QlsRecordError):
- """Transactional record to be locked is already locked"""
- def __init__(self, file_header, record):
- QlsRecordError.__init__(self, file_header, record)
- def __str__(self):
- return 'Transactional operation already locked in TransactionMap: ' + QlsRecordError.__str__(self)
-
-class DataSizeError(QlsError):
- """Error class for Data size mismatch"""
- def __init__(self, expected_size, actual_size, data_str):
- QlsError.__init__(self)
- self.expected_size = expected_size
- self.actual_size = actual_size
- self.xid_str = data_str
- def __str__(self):
- return 'Inconsistent data size: expected:%d; actual:%d; data="%s"' % \
- (self.expected_size, self.actual_size, self.data_str)
-
-class DuplicateRecordIdError(QlsRecordError):
- """Duplicate Record Id in Enqueue Map"""
- def __init__(self, file_header, record):
- QlsRecordError.__init__(self, file_header, record)
- def __str__(self):
- return 'Duplicate Record Id in enqueue map: ' + QlsRecordError.__str__(self)
-
-class EnqueueCountUnderflowError(QlsRecordError):
- """Attempted to decrement enqueue count past 0"""
- def __init__(self, file_header, record):
- QlsRecordError.__init__(self, file_header, record)
- def __str__(self):
- return 'Enqueue record count underflow: ' + QlsRecordError.__str__(self)
-
-class ExternalDataError(QlsRecordError):
- """Data present in Enqueue record when external data flag is set"""
- def __init__(self, file_header, record):
- QlsRecordError.__init__(self, file_header, record)
- def __str__(self):
- return 'Data present in external data record: ' + QlsRecordError.__str__(self)
-
-class FirstRecordOffsetMismatchError(QlsRecordError):
- """First Record Offset (FRO) does not match file header"""
- def __init__(self, file_header, record):
- QlsRecordError.__init__(self, file_header, record)
- def __str__(self):
- return 'First record offset mismatch: ' + QlsRecordError.__str__(self) + ' expected_offset=0x%x' % \
- self.file_header.first_record_offset
-
-class InsufficientSpaceOnDiskError(QlsError):
- """Insufficient space on disk"""
- def __init__(self, directory, space_avail, space_requried):
- QlsError.__init__(self)
- self.directory = directory
- self.space_avail = space_avail
- self.space_required = space_requried
- def __str__(self):
- return 'Insufficient space on disk: directory=%s; avail_space=%d required_space=%d' % \
- (self.directory, self.space_avail, self.space_required)
-
-class InvalidClassError(QlsError):
- """Invalid class name or type"""
- def __init__(self, class_name):
- QlsError.__init__(self)
- self.class_name = class_name
- def __str__(self):
- return 'Invalid class name "%s"' % self.class_name
-
-class InvalidEfpDirectoryNameError(QlsError):
- """Invalid EFP directory name - should be NNNNk, where NNNN is a number (of any length)"""
- def __init__(self, directory_name):
- QlsError.__init__(self)
- self.directory_name = directory_name
- def __str__(self):
- return 'Invalid EFP directory name "%s"' % self.directory_name
-
-#class InvalidFileSizeString(QlsError):
-# """Invalid file size string"""
-# def __init__(self, file_size_string):
-# QlsError.__init__(self)
-# self.file_size_string = file_size_string
-# def __str__(self):
-# return 'Invalid file size string "%s"' % self.file_size_string
-
-class InvalidPartitionDirectoryNameError(QlsError):
- """Invalid EFP partition name - should be pNNN, where NNN is a 3-digit partition number"""
- def __init__(self, directory_name):
- QlsError.__init__(self)
- self.directory_name = directory_name
- def __str__(self):
- return 'Invalid partition directory name "%s"' % self.directory_name
-
-class InvalidQlsDirectoryNameError(QlsError):
- """Invalid QLS directory name"""
- def __init__(self, directory_name):
- QlsError.__init__(self)
- self.directory_name = directory_name
- def __str__(self):
- return 'Invalid QLS directory name "%s"' % self.directory_name
-
-class InvalidRecordTypeError(QlsRecordError):
- """Error class for any operation using an invalid record type"""
- def __init__(self, file_header, record, error_msg):
- QlsRecordError.__init__(self, file_header, record)
- self.error_msg = error_msg
- def __str__(self):
- return 'Invalid record type: ' + QlsRecordError.__str__(self) + ':' + self.error_msg
-
-class InvalidRecordVersionError(QlsRecordError):
- """Invalid record version"""
- def __init__(self, file_header, record, expected_version):
- QlsRecordError.__init__(self, file_header, record)
- self.expected_version = expected_version
- def __str__(self):
- return 'Invalid record version: queue="%s" ' + QlsRecordError.__str__(self) + \
- ' ver_found=0x%x ver_expected=0x%x' % (self.record_header.version, self.expected_version)
-
-class NoMoreFilesInJournalError(QlsError):
- """Raised when trying to obtain the next file in the journal and there are no more files"""
- def __init__(self, queue_name):
- QlsError.__init__(self)
- self.queue_name = queue_name
- def __str__(self):
- return 'No more journal files in queue "%s"' % self.queue_name
-
-class NonTransactionalRecordError(QlsRecordError):
- """Transactional operation on non-transactional record"""
- def __init__(self, file_header, record, operation):
- QlsRecordError.__init__(self, file_header, record)
- self.operation = operation
- def __str__(self):
- return 'Transactional operation on non-transactional record: ' + QlsRecordError.__str__() + \
- ' operation=%s' % self.operation
-
-class PartitionDoesNotExistError(QlsError):
- """Partition name does not exist on disk"""
- def __init__(self, partition_directory):
- QlsError.__init__(self)
- self.partition_directory = partition_directory
- def __str__(self):
- return 'Partition %s does not exist' % self.partition_directory
-
-class PoolDirectoryAlreadyExistsError(QlsError):
- """Pool directory already exists"""
- def __init__(self, pool_directory):
- QlsError.__init__(self)
- self.pool_directory = pool_directory
- def __str__(self):
- return 'Pool directory %s already exists' % self.pool_directory
-
-class PoolDirectoryDoesNotExistError(QlsError):
- """Pool directory does not exist"""
- def __init__(self, pool_directory):
- QlsError.__init__(self)
- self.pool_directory = pool_directory
- def __str__(self):
- return 'Pool directory %s does not exist' % self.pool_directory
-
-class RecordIdNotFoundError(QlsRecordError):
- """Record Id not found in enqueue map"""
- def __init__(self, file_header, record):
- QlsRecordError.__init__(self, file_header, record)
- def __str__(self):
- return 'Record Id not found in enqueue map: ' + QlsRecordError.__str__()
-
-class RecordNotLockedError(QlsRecordError):
- """Record in enqueue map is not locked"""
- def __init__(self, file_header, record):
- QlsRecordError.__init__(self, file_header, record)
- def __str__(self):
- return 'Record in enqueue map is not locked: ' + QlsRecordError.__str__()
-
-class UnexpectedEndOfFileError(QlsError):
- """The bytes read from a file is less than that expected"""
- def __init__(self, size_read, size_expected, file_offset, file_name):
- QlsError.__init__(self)
- self.size_read = size_read
- self.size_expected = size_expected
- self.file_offset = file_offset
- self.file_name = file_name
- def __str__(self):
- return 'Tried to read %d at offset %d in file "%s"; only read %d' % \
- (self.size_read, self.file_offset, self.file_name, self.size_expected)
-
-class WritePermissionError(QlsError):
- """No write permission"""
- def __init__(self, directory):
- QlsError.__init__(self)
- self.directory = directory
- def __str__(self):
- return 'No write permission in directory %s' % self.directory
-
-class XidSizeError(QlsError):
- """Error class for Xid size mismatch"""
- def __init__(self, expected_size, actual_size, xid_str):
- QlsError.__init__(self)
- self.expected_size = expected_size
- self.actual_size = actual_size
- self.xid_str = xid_str
- def __str__(self):
- return 'Inconsistent xid size: expected:%d; actual:%d; xid="%s"' % \
- (self.expected_size, self.actual_size, self.xid_str)
-
-# =============================================================================
-
-if __name__ == "__main__":
- print "This is a library, and cannot be executed."
diff --git a/qpid/cpp/management/python/lib/qlslibs/jrnl.py b/qpid/cpp/management/python/lib/qlslibs/jrnl.py
deleted file mode 100644
index 5e65890393..0000000000
--- a/qpid/cpp/management/python/lib/qlslibs/jrnl.py
+++ /dev/null
@@ -1,394 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-"""
-Module: qlslibs.jrnl
-
-Contains journal record classes.
-"""
-
-import qlslibs.err
-import qlslibs.utils
-import string
-import struct
-import time
-
-class RecordHeader(object):
- FORMAT = '<4s2H2Q'
- def __init__(self, file_offset, magic, version, user_flags, serial, record_id):
- self.file_offset = file_offset
- self.magic = magic
- self.version = version
- self.user_flags = user_flags
- self.serial = serial
- self.record_id = record_id
- self.warnings = []
- self.truncated_flag = False
- def encode(self):
- return struct.pack(RecordHeader.FORMAT, self.magic, self.version, self.user_flags, self.serial, self.record_id)
- def load(self, file_handle):
- pass
- @staticmethod
- def discriminate(args):
- """Use the last char in the header magic to determine the header type"""
- return CLASSES.get(args[1][-1], RecordHeader)
- def is_empty(self):
- """Return True if this record is empty (ie has a magic of 0x0000"""
- return self.magic == '\x00'*4
- def is_header_valid(self, file_header):
- """Check that this record is valid"""
- if self.is_empty():
- return False
- if self.magic[:3] != 'QLS' or self.magic[3] not in ['a', 'c', 'd', 'e', 'f', 'x']:
- return False
- if self.magic[-1] != 'x':
- if self.version != qlslibs.utils.DEFAULT_RECORD_VERSION:
- raise qlslibs.err.InvalidRecordVersionError(file_header, self, qlslibs.utils.DEFAULT_RECORD_VERSION)
- if self.serial != file_header.serial:
- return False
- return True
- def to_rh_string(self):
- """Return string representation of this header"""
- if self.is_empty():
- return '0x%08x: <empty>' % (self.file_offset)
- if self.magic[-1] == 'x':
- return '0x%08x: [X]' % (self.file_offset)
- if self.magic[-1] in ['a', 'c', 'd', 'e', 'f', 'x']:
- return '0x%08x: [%c v=%d f=0x%04x rid=0x%x]' % \
- (self.file_offset, self.magic[-1].upper(), self.version, self.user_flags, self.record_id)
- return '0x%08x: <error, unknown magic "%s" (possible overwrite boundary?)>' % (self.file_offset, self.magic)
- def _get_warnings(self):
- warn_str = ''
- for warn in self.warnings:
- warn_str += '<%s>' % warn
- return warn_str
-
-class RecordTail(object):
- FORMAT = '<4sL2Q'
- def __init__(self, file_handle): # TODO - clumsy, only allows reading from disk. Move all disk stuff to laod()
- self.file_offset = file_handle.tell() if file_handle is not None else 0
- self.complete = False
- self.read_size = struct.calcsize(RecordTail.FORMAT)
- self.fbin = file_handle.read(self.read_size) if file_handle is not None else None
- self.valid_flag = None
- if self.fbin is not None and len(self.fbin) >= self.read_size:
- self.complete = True
- self.xmagic, self.checksum, self.serial, self.record_id = struct.unpack(RecordTail.FORMAT, self.fbin)
- def load(self, file_handle):
- """Used to continue load of RecordTail object if it is split between files"""
- if not self.is_complete:
- self.fbin += file_handle.read(self.read_size - len(self.fbin))
- if (len(self.fbin)) >= self.read_size:
- self.complete = True
- self.xmagic, self.checksum, self.serial, self.record_id = struct.unpack(RecordTail.FORMAT, self.fbin)
- def is_complete(self):
- return self.complete
- def is_valid(self, record):
- if self.valid_flag is None:
- if not self.complete:
- return False
- self.valid_flag = qlslibs.utils.inv_str(self.xmagic) == record.magic and \
- self.serial == record.serial and \
- self.record_id == record.record_id and \
- qlslibs.utils.adler32(record.checksum_encode()) == self.checksum
- return self.valid_flag
- def to_string(self):
- """Return a string representation of the this RecordTail instance"""
- if self.valid_flag is not None:
- if not self.valid_flag:
- return '[INVALID RECORD TAIL]'
- magic = qlslibs.utils.inv_str(self.xmagic)
- magic_char = magic[-1].upper() if magic[-1] in string.printable else '?'
- return '[%c cs=0x%08x rid=0x%x]' % (magic_char, self.checksum, self.record_id)
-
-class FileHeader(RecordHeader):
- FORMAT = '<2H4x5QH'
- MAGIC = 'QLSf'
- def init(self, file_handle, _, file_header_size_sblks, partition_num, efp_data_size_kb, first_record_offset,
- timestamp_sec, timestamp_ns, file_num, queue_name_len):
- self.file_handle = file_handle
- self.file_header_size_sblks = file_header_size_sblks
- self.partition_num = partition_num
- self.efp_data_size_kb = efp_data_size_kb
- self.first_record_offset = first_record_offset
- self.timestamp_sec = timestamp_sec
- self.timestamp_ns = timestamp_ns
- self.file_num = file_num
- self.queue_name_len = queue_name_len
- self.queue_name = None
- def encode(self):
- if self.queue_name is None:
- return RecordHeader.encode(self) + struct.pack(self.FORMAT, self.file_header_size_sblks, \
- self.partition_num, self.efp_data_size_kb, \
- self.first_record_offset, self.timestamp_sec, \
- self.timestamp_ns, self.file_num, 0)
- return RecordHeader.encode(self) + struct.pack(self.FORMAT, self.file_header_size_sblks, self.partition_num, \
- self.efp_data_size_kb, self.first_record_offset, \
- self.timestamp_sec, self.timestamp_ns, self.file_num, \
- self.queue_name_len) + self.queue_name
- def get_file_size(self):
- """Sum of file header size and data size"""
- return (self.file_header_size_sblks * qlslibs.utils.DEFAULT_SBLK_SIZE) + (self.efp_data_size_kb * 1024)
- def load(self, file_handle):
- self.queue_name = file_handle.read(self.queue_name_len)
- def is_end_of_file(self):
- return self.file_handle.tell() >= self.get_file_size()
- def is_valid(self, is_empty):
- if not RecordHeader.is_header_valid(self, self):
- return False
- if self.file_handle is None or self.file_header_size_sblks == 0 or self.partition_num == 0 or \
- self.efp_data_size_kb == 0:
- return False
- if is_empty:
- if self.first_record_offset != 0 or self.timestamp_sec != 0 or self.timestamp_ns != 0 or \
- self.file_num != 0 or self.queue_name_len != 0:
- return False
- else:
- if self.first_record_offset == 0 or self.timestamp_sec == 0 or self.timestamp_ns == 0 or \
- self.file_num == 0 or self.queue_name_len == 0:
- return False
- if self.queue_name is None:
- return False
- if len(self.queue_name) != self.queue_name_len:
- return False
- return True
- def timestamp_str(self):
- """Get the timestamp of this record in string format"""
- now = time.gmtime(self.timestamp_sec)
- fstr = '%%a %%b %%d %%H:%%M:%%S.%09d %%Y' % (self.timestamp_ns)
- return time.strftime(fstr, now)
- def to_string(self):
- """Return a string representation of the this FileHeader instance"""
- return '%s fnum=0x%x fro=0x%08x p=%d s=%dk t=%s %s' % (self.to_rh_string(), self.file_num,
- self.first_record_offset, self.partition_num,
- self.efp_data_size_kb, self.timestamp_str(),
- self._get_warnings())
-
-class EnqueueRecord(RecordHeader):
- FORMAT = '<2Q'
- MAGIC = 'QLSe'
- EXTERNAL_FLAG_MASK = 0x20
- TRANSIENT_FLAG_MASK = 0x10
- def init(self, _, xid_size, data_size):
- self.xid_size = xid_size
- self.data_size = data_size
- self.xid = None
- self.xid_complete = False
- self.data = None
- self.data_complete = False
- self.record_tail = None
- def checksum_encode(self): # encode excluding record tail
- cs_bytes = RecordHeader.encode(self) + struct.pack(self.FORMAT, self.xid_size, self.data_size)
- if self.xid is not None:
- cs_bytes += self.xid
- if self.data is not None:
- cs_bytes += self.data
- return cs_bytes
- def is_external(self):
- return self.user_flags & EnqueueRecord.EXTERNAL_FLAG_MASK > 0
- def is_transient(self):
- return self.user_flags & EnqueueRecord.TRANSIENT_FLAG_MASK > 0
- def is_valid(self, journal_file):
- if not RecordHeader.is_header_valid(self, journal_file.file_header):
- return False
- if not (self.xid_complete and self.data_complete):
- return False
- if self.xid_size > 0 and len(self.xid) != self.xid_size:
- return False
- if self.data_size > 0 and len(self.data) != self.data_size:
- return False
- if self.xid_size > 0 or self.data_size > 0:
- if self.record_tail is None:
- return False
- if not self.record_tail.is_valid(self):
- return False
- return True
- def load(self, file_handle):
- """Return True when load is incomplete and must be called again with new file handle"""
- self.xid, self.xid_complete = qlslibs.utils.load_data(file_handle, self.xid, self.xid_size)
- if not self.xid_complete:
- return True
- if self.is_external():
- self.data_complete = True
- else:
- self.data, self.data_complete = qlslibs.utils.load_data(file_handle, self.data, self.data_size)
- if not self.data_complete:
- return True
- if self.xid_size > 0 or self.data_size > 0:
- if self.record_tail is None:
- self.record_tail = RecordTail(file_handle)
- elif not self.record_tail.is_complete():
- self.record_tail.load(file_handle) # Continue loading partially loaded tail
- if self.record_tail.is_complete():
- self.record_tail.is_valid(self)
- else:
- return True
- return False
- def to_string(self, show_xid_flag, show_data_flag, txtest_flag):
- """Return a string representation of the this EnqueueRecord instance"""
- if self.truncated_flag:
- return '%s xid(%d) data(%d) [Truncated, no more files in journal]' % (RecordHeader.__str__(self),
- self.xid_size, self.data_size)
- if self.record_tail is None:
- record_tail_str = ''
- else:
- record_tail_str = self.record_tail.to_string()
- return '%s %s %s %s %s %s' % (self.to_rh_string(),
- qlslibs.utils.format_xid(self.xid, self.xid_size, show_xid_flag),
- qlslibs.utils.format_data(self.data, self.data_size, show_data_flag, txtest_flag),
- record_tail_str, self._print_flags(), self._get_warnings())
- def _print_flags(self):
- """Utility function to decode the flags field in the header and print a string representation"""
- fstr = ''
- if self.is_transient():
- fstr = '[TRANSIENT'
- if self.is_external():
- if len(fstr) > 0:
- fstr += ',EXTERNAL'
- else:
- fstr = '*EXTERNAL'
- if len(fstr) > 0:
- fstr += ']'
- return fstr
-
-class DequeueRecord(RecordHeader):
- FORMAT = '<2Q'
- MAGIC = 'QLSd'
- TXN_COMPLETE_COMMIT_FLAG = 0x10
- def init(self, _, dequeue_record_id, xid_size):
- self.dequeue_record_id = dequeue_record_id
- self.xid_size = xid_size
- self.transaction_prepared_list_flag = False
- self.xid = None
- self.xid_complete = False
- self.record_tail = None
- def checksum_encode(self): # encode excluding record tail
- return RecordHeader.encode(self) + struct.pack(self.FORMAT, self.dequeue_record_id, self.xid_size) + \
- self.xid
- def is_transaction_complete_commit(self):
- return self.user_flags & DequeueRecord.TXN_COMPLETE_COMMIT_FLAG > 0
- def is_valid(self, journal_file):
- if not RecordHeader.is_header_valid(self, journal_file.file_header):
- return False
- if self.xid_size > 0:
- if not self.xid_complete:
- return False
- if self.xid_size > 0 and len(self.xid) != self.xid_size:
- return False
- if self.record_tail is None:
- return False
- if not self.record_tail.is_valid(self):
- return False
- return True
- def load(self, file_handle):
- """Return True when load is incomplete and must be called again with new file handle"""
- self.xid, self.xid_complete = qlslibs.utils.load_data(file_handle, self.xid, self.xid_size)
- if not self.xid_complete:
- return True
- if self.xid_size > 0:
- if self.record_tail is None:
- self.record_tail = RecordTail(file_handle)
- elif not self.record_tail.is_complete():
- self.record_tail.load(file_handle)
- if self.record_tail.is_complete():
- self.record_tail.is_valid(self)
- else:
- return True
- return False
- def to_string(self, show_xid_flag, _u1, _u2):
- """Return a string representation of the this DequeueRecord instance"""
- if self.truncated_flag:
- return '%s xid(%d) drid=0x%x [Truncated, no more files in journal]' % (RecordHeader.__str__(self),
- self.xid_size,
- self.dequeue_record_id)
- if self.record_tail is None:
- record_tail_str = ''
- else:
- record_tail_str = self.record_tail.to_string()
- return '%s drid=0x%x %s %s %s %s' % (self.to_rh_string(), self.dequeue_record_id,
- qlslibs.utils.format_xid(self.xid, self.xid_size, show_xid_flag),
- record_tail_str, self._print_flags(), self._get_warnings())
- def _print_flags(self):
- """Utility function to decode the flags field in the header and print a string representation"""
- if self.transaction_prepared_list_flag:
- if self.is_transaction_complete_commit():
- return '[COMMIT]'
- else:
- return '[ABORT]'
- return ''
-
-class TransactionRecord(RecordHeader):
- FORMAT = '<Q'
- MAGIC_ABORT = 'QLSa'
- MAGIC_COMMIT = 'QLSc'
- def init(self, _, xid_size):
- self.xid_size = xid_size
- self.xid = None
- self.xid_complete = False
- self.record_tail = None
- def checksum_encode(self): # encode excluding record tail
- return RecordHeader.encode(self) + struct.pack(self.FORMAT, self.xid_size) + self.xid
- def is_valid(self, journal_file):
- if not RecordHeader.is_header_valid(self, journal_file.file_header):
- return False
- if not self.xid_complete or len(self.xid) != self.xid_size:
- return False
- if self.record_tail is None:
- return False
- if not self.record_tail.is_valid(self):
- return False
- return True
- def load(self, file_handle):
- """Return True when load is incomplete and must be called again with new file handle"""
- self.xid, self.xid_complete = qlslibs.utils.load_data(file_handle, self.xid, self.xid_size)
- if not self.xid_complete:
- return True
- if self.xid_size > 0:
- if self.record_tail is None:
- self.record_tail = RecordTail(file_handle)
- elif not self.record_tail.is_complete():
- self.record_tail.load(file_handle)
- if self.record_tail.is_complete():
- self.record_tail.is_valid(self)
- else:
- return True
- return False
- def to_string(self, show_xid_flag, _u1, _u2):
- """Return a string representation of the this TransactionRecord instance"""
- if self.truncated_flag:
- return '%s xid(%d) [Truncated, no more files in journal]' % (RecordHeader.__str__(self), self.xid_size)
- if self.record_tail is None:
- record_tail_str = ''
- else:
- record_tail_str = self.record_tail.to_string()
- return '%s %s %s %s' % (self.to_rh_string(),
- qlslibs.utils.format_xid(self.xid, self.xid_size, show_xid_flag),
- record_tail_str, self._get_warnings())
-
-# =============================================================================
-
-CLASSES = {
- 'a': TransactionRecord,
- 'c': TransactionRecord,
- 'd': DequeueRecord,
- 'e': EnqueueRecord,
-}
-
-if __name__ == '__main__':
- print 'This is a library, and cannot be executed.'
diff --git a/qpid/cpp/management/python/lib/qlslibs/utils.py b/qpid/cpp/management/python/lib/qlslibs/utils.py
deleted file mode 100644
index dfa760a839..0000000000
--- a/qpid/cpp/management/python/lib/qlslibs/utils.py
+++ /dev/null
@@ -1,216 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-"""
-Module: qlslibs.utils
-
-Contains helper functions for qpid_qls_analyze.
-"""
-
-import os
-import qlslibs.jrnl
-import stat
-import string
-import struct
-import subprocess
-import zlib
-
-DEFAULT_DBLK_SIZE = 128
-DEFAULT_SBLK_SIZE = 4096 # 32 dblks
-DEFAULT_SBLK_SIZE_KB = DEFAULT_SBLK_SIZE / 1024
-DEFAULT_RECORD_VERSION = 2
-DEFAULT_HEADER_SIZE_SBLKS = 1
-
-def adler32(data):
- """return the adler32 checksum of data"""
- return zlib.adler32(data) & 0xffffffff
-
-def create_record(magic, uflags, journal_file, record_id, dequeue_record_id, xid, data):
- """Helper function to construct a record with xid, data (where applicable) and consistent tail with checksum"""
- record_class = qlslibs.jrnl.CLASSES.get(magic[-1])
- record = record_class(0, magic, DEFAULT_RECORD_VERSION, uflags, journal_file.file_header.serial, record_id)
- xid_length = len(xid) if xid is not None else 0
- if isinstance(record, qlslibs.jrnl.EnqueueRecord):
- data_length = len(data) if data is not None else 0
- record.init(None, xid_length, data_length)
- elif isinstance(record, qlslibs.jrnl.DequeueRecord):
- record.init(None, dequeue_record_id, xid_length)
- elif isinstance(record, qlslibs.jrnl.TransactionRecord):
- record.init(None, xid_length)
- else:
- raise qlslibs.err.InvalidClassError(record.__class__.__name__)
- if xid is not None:
- record.xid = xid
- record.xid_complete = True
- if data is not None:
- record.data = data
- record.data_complete = True
- record.record_tail = _mk_record_tail(record)
- return record
-
-def efp_directory_size(directory_name):
- """"Decode the directory name in the format NNNk to a numeric size, where NNN is a number string"""
- try:
- if directory_name[-1] == 'k':
- return int(directory_name[:-1])
- except ValueError:
- pass
- return 0
-
-def format_data(data, data_size=None, show_data_flag=True, txtest_flag=False):
- """Format binary data for printing"""
- return _format_binary(data, data_size, show_data_flag, 'data', qlslibs.err.DataSizeError, False, txtest_flag)
-
-def format_xid(xid, xid_size=None, show_xid_flag=True):
- """Format binary XID for printing"""
- return _format_binary(xid, xid_size, show_xid_flag, 'xid', qlslibs.err.XidSizeError, True, False)
-
-def get_avail_disk_space(path):
- df_proc = subprocess.Popen(["df", path], stdout=subprocess.PIPE)
- output = df_proc.communicate()[0]
- return int(output.split('\n')[1].split()[3])
-
-def has_write_permission(path):
- stat_info = os.stat(path)
- return bool(stat_info.st_mode & stat.S_IRGRP)
-
-def inv_str(in_string):
- """Perform a binary 1's compliment (invert all bits) on a binary string"""
- istr = ''
- for index in range(0, len(in_string)):
- istr += chr(~ord(in_string[index]) & 0xff)
- return istr
-
-def load(file_handle, klass):
- """Load a record of class klass from a file"""
- args = load_args(file_handle, klass)
- subclass = klass.discriminate(args)
- result = subclass(*args) # create instance of record
- if subclass != klass:
- result.init(*load_args(file_handle, subclass))
- return result
-
-def load_args(file_handle, klass):
- """Load the arguments from class klass"""
- size = struct.calcsize(klass.FORMAT)
- foffs = file_handle.tell(),
- fbin = file_handle.read(size)
- if len(fbin) != size:
- raise qlslibs.err.UnexpectedEndOfFileError(len(fbin), size, foffs, file_handle.name)
- return foffs + struct.unpack(klass.FORMAT, fbin)
-
-def load_data(file_handle, element, element_size):
- """Read element_size bytes of binary data from file_handle into element"""
- if element_size == 0:
- return element, True
- if element is None:
- element = file_handle.read(element_size)
- else:
- read_size = element_size - len(element)
- element += file_handle.read(read_size)
- return element, len(element) == element_size
-
-def skip(file_handle, boundary):
- """Read and discard disk bytes until the next multiple of boundary"""
- if not file_handle.closed:
- file_handle.read(_rem_bytes_in_block(file_handle, boundary))
-
-#--- protected functions ---
-
-def _format_binary(bin_str, bin_size, show_bin_flag, prefix, err_class, hex_num_flag, txtest_flag):
- """Format binary XID for printing"""
- if bin_str is None and bin_size is not None:
- if bin_size > 0:
- raise err_class(bin_size, len(bin_str), bin_str)
- return ''
- if bin_size is None:
- bin_size = len(bin_str)
- elif bin_size != len(bin_str):
- raise err_class(bin_size, len(bin_str), bin_str)
- out_str = '%s(%d)' % (prefix, bin_size)
- if txtest_flag:
- out_str += '=\'%s\'' % _txtest_msg_str(bin_str)
- elif show_bin_flag:
- if _is_printable(bin_str):
- binstr = '"%s"' % _split_str(bin_str)
- elif hex_num_flag:
- binstr = '0x%s' % _str_to_hex_num(bin_str)
- else:
- binstr = _hex_split_str(bin_str, 50, 10, 10)
- out_str += '=\'%s\'' % binstr
- return out_str
-
-def _hex_str(in_str, begin, end):
- """Return a binary string as a hex string"""
- hstr = ''
- for index in range(begin, end):
- if _is_printable(in_str[index]):
- hstr += in_str[index]
- else:
- hstr += '\\%02x' % ord(in_str[index])
- return hstr
-
-def _hex_split_str(in_str, split_size, head_size, tail_size):
- """Split a hex string into two parts separated by an ellipsis"""
- if len(in_str) <= split_size:
- return _hex_str(in_str, 0, len(in_str))
- return _hex_str(in_str, 0, head_size) + ' ... ' + _hex_str(in_str, len(in_str)-tail_size, len(in_str))
-
-def _txtest_msg_str(bin_str):
- """Extract the message number used in qpid-txtest"""
- msg_index = bin_str.find('msg')
- if msg_index >= 0:
- end_index = bin_str.find('\x00', msg_index)
- assert end_index >= 0
- return bin_str[msg_index:end_index]
- return None
-
-def _is_printable(in_str):
- """Return True if in_str in printable; False otherwise."""
- for this_char in in_str:
- if this_char not in string.letters and this_char not in string.digits and this_char not in string.punctuation:
- return False
- return True
-
-def _mk_record_tail(record):
- record_tail = qlslibs.jrnl.RecordTail(None)
- record_tail.xmagic = inv_str(record.magic)
- record_tail.checksum = adler32(record.checksum_encode())
- record_tail.serial = record.serial
- record_tail.record_id = record.record_id
- return record_tail
-
-def _rem_bytes_in_block(file_handle, block_size):
- """Return the remaining bytes in a block"""
- foffs = file_handle.tell()
- return (_size_in_blocks(foffs, block_size) * block_size) - foffs
-
-def _size_in_blocks(size, block_size):
- """Return the size in terms of data blocks"""
- return int((size + block_size - 1) / block_size)
-
-def _split_str(in_str, split_size = 50):
- """Split a string into two parts separated by an ellipsis if it is longer than split_size"""
- if len(in_str) < split_size:
- return in_str
- return in_str[:25] + ' ... ' + in_str[-25:]
-
-def _str_to_hex_num(in_str):
- """Turn a string into a hex number representation, little endian assumed (ie LSB is first, MSB is last)"""
- return ''.join(x.encode('hex') for x in reversed(in_str))