diff options
| author | Robert Gemmell <robbie@apache.org> | 2016-07-05 21:55:35 +0000 |
|---|---|---|
| committer | Robert Gemmell <robbie@apache.org> | 2016-07-05 21:55:35 +0000 |
| commit | f160cb6566c17945f7ebc4f3a752b2cc6a051685 (patch) | |
| tree | 809f04fc1967c22e5abc52de07602555bed0e920 /qpid/cpp/management/python/lib | |
| parent | ebb276cca41582b73223b55eff9f2d4386f4f746 (diff) | |
| download | qpid-python-f160cb6566c17945f7ebc4f3a752b2cc6a051685.tar.gz | |
QPID-7207: remove cpp and python subdirs from svn trunk, they have migrated to their own git repositories
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1751566 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/management/python/lib')
18 files changed, 0 insertions, 8384 deletions
diff --git a/qpid/cpp/management/python/lib/.gitignore b/qpid/cpp/management/python/lib/.gitignore deleted file mode 100644 index 628d81888c..0000000000 --- a/qpid/cpp/management/python/lib/.gitignore +++ /dev/null @@ -1,22 +0,0 @@ - -# -# -# -# -# http://www.apache.org/licenses/LICENSE-2.0 -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# "License"); you may not use this file except in compliance -# KIND, either express or implied. See the License for the -# Licensed to the Apache Software Foundation (ASF) under one -# Unless required by applicable law or agreed to in writing, -# distributed with this work for additional information -# or more contributor license agreements. See the NOTICE file -# regarding copyright ownership. The ASF licenses this file -# software distributed under the License is distributed on an -# specific language governing permissions and limitations -# to you under the Apache License, Version 2.0 (the -# under the License. -# with the License. You may obtain a copy of the License at -/qpid-configc -/qpid-hac -/qpid-routec diff --git a/qpid/cpp/management/python/lib/README.txt b/qpid/cpp/management/python/lib/README.txt deleted file mode 100644 index cabeb1be02..0000000000 --- a/qpid/cpp/management/python/lib/README.txt +++ /dev/null @@ -1,4 +0,0 @@ -To run these programs, please set PYTHONPATH to include: - - qpid/python - qpid/extras/qmf/src/py diff --git a/qpid/cpp/management/python/lib/qlslibs/__init__.py b/qpid/cpp/management/python/lib/qlslibs/__init__.py deleted file mode 100644 index d8a500d9d8..0000000000 --- a/qpid/cpp/management/python/lib/qlslibs/__init__.py +++ /dev/null @@ -1,19 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - diff --git a/qpid/cpp/management/python/lib/qlslibs/analyze.py b/qpid/cpp/management/python/lib/qlslibs/analyze.py deleted file mode 100644 index 8c5de05b9e..0000000000 --- a/qpid/cpp/management/python/lib/qlslibs/analyze.py +++ /dev/null @@ -1,606 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -""" -Module: qlslibs.analyze - -Classes for recovery and analysis of a Qpid Linear Store (QLS). -""" - -import os.path -import qlslibs.err -import qlslibs.jrnl -import qlslibs.utils - -class HighCounter(object): - def __init__(self): - self.num = 0 - def check(self, num): - if self.num < num: - self.num = num - def get(self): - return self.num - def get_next(self): - self.num += 1 - return self.num - -class JournalRecoveryManager(object): - TPL_DIR_NAME = 'tpl2' - JRNL_DIR_NAME = 'jrnl2' - def __init__(self, directory, args): - if not os.path.exists(directory): - raise qlslibs.err.InvalidQlsDirectoryNameError(directory) - self.directory = directory - self.args = args - self.tpl = None - self.journals = {} - self.high_rid_counter = HighCounter() - self.prepared_list = None - def report(self): - self._reconcile_transactions(self.prepared_list, self.args.txn) - if self.tpl is not None: - self.tpl.report(self.args) - for queue_name in sorted(self.journals.keys()): - self.journals[queue_name].report(self.args) - def run(self): - tpl_dir = os.path.join(self.directory, JournalRecoveryManager.TPL_DIR_NAME) - if os.path.exists(tpl_dir): - self.tpl = Journal(tpl_dir, None, self.args) - self.tpl.recover(self.high_rid_counter) - if self.args.show_recovery_recs or self.args.show_all_recs: - print - jrnl_dir = os.path.join(self.directory, JournalRecoveryManager.JRNL_DIR_NAME) - self.prepared_list = self.tpl.txn_map.get_prepared_list() if self.tpl is not None else {} - if os.path.exists(jrnl_dir): - for dir_entry in sorted(os.listdir(jrnl_dir)): - jrnl = Journal(os.path.join(jrnl_dir, dir_entry), self.prepared_list, self.args) - jrnl.recover(self.high_rid_counter) - self.journals[jrnl.get_queue_name()] = jrnl - if self.args.show_recovery_recs or self.args.show_all_recs: - print - print - def _reconcile_transactions(self, prepared_list, txn_flag): - print 'Transaction reconciliation report:' - print '==================================' - print 'Transaction Prepared List (TPL) contains %d open transaction(s):' % len(prepared_list) - for xid in prepared_list.keys(): - commit_flag = prepared_list[xid] - if commit_flag is None: - status = '[Prepared, neither committed nor aborted - assuming commit]' - elif commit_flag: - status = '[Prepared, but interrupted during commit phase]' - else: - status = '[Prepared, but interrupted during abort phase]' - print ' ', qlslibs.utils.format_xid(xid), status - if prepared_list[xid] is None: # Prepared, but not committed or aborted - enqueue_record = self.tpl.get_txn_map_record(xid)[0][1] - dequeue_record = qlslibs.utils.create_record(qlslibs.jrnl.DequeueRecord.MAGIC, \ - qlslibs.jrnl.DequeueRecord.TXN_COMPLETE_COMMIT_FLAG, \ - self.tpl.current_journal_file, \ - self.high_rid_counter.get_next(), \ - enqueue_record.record_id, xid, None) - if txn_flag: - self.tpl.add_record(dequeue_record) - print - print 'Open transactions found in queues:' - print '----------------------------------' - for queue_name in sorted(self.journals.keys()): - self.journals[queue_name].reconcile_transactions(prepared_list, txn_flag) - print - if len(prepared_list) > 0: - print 'Creating commit records for the following prepared transactions in TPL:' - for xid in prepared_list.keys(): - print ' ', qlslibs.utils.format_xid(xid) - transaction_record = qlslibs.utils.create_record(qlslibs.jrnl.TransactionRecord.MAGIC_COMMIT, 0, \ - self.tpl.current_journal_file, \ - self.high_rid_counter.get_next(), None, xid, None) - if txn_flag: - self.tpl.add_record(transaction_record) - print - -class EnqueueMap(object): - """ - Map of enqueued records in a QLS journal - """ - def __init__(self, journal): - self.journal = journal - self.enq_map = {} - def add(self, journal_file, enq_record, locked_flag): - if enq_record.record_id in self.enq_map: - raise qlslibs.err.DuplicateRecordIdError(self.journal.current_file_header, enq_record) - self.enq_map[enq_record.record_id] = [journal_file, enq_record, locked_flag] - def contains(self, rid): - """Return True if the map contains the given rid""" - return rid in self.enq_map - def delete(self, journal_file, deq_record): - if deq_record.dequeue_record_id in self.enq_map: - enq_list = self.enq_map[deq_record.dequeue_record_id] - del self.enq_map[deq_record.dequeue_record_id] - return enq_list - else: - raise qlslibs.err.RecordIdNotFoundError(journal_file.file_header, deq_record) - def get(self, record_id): - if record_id in self.enq_map: - return self.enq_map[record_id] - return None - def lock(self, journal_file, dequeue_record): - if dequeue_record.dequeue_record_id not in self.enq_map: - raise qlslibs.err.RecordIdNotFoundError(journal_file.file_header, dequeue_record) - self.enq_map[dequeue_record.dequeue_record_id][2] = True - def report_str(self, args): - """Return a string containing a text report for all records in the map""" - if len(self.enq_map) == 0: - return 'No enqueued records found.' - rstr = '%d enqueued records found' % len(self.enq_map) - if args.show_recovered_recs: - rstr += ":" - rid_list = self.enq_map.keys() - rid_list.sort() - for rid in rid_list: - journal_file, record, locked_flag = self.enq_map[rid] - rstr += '\n 0x%x:' % journal_file.file_header.file_num - rstr += record.to_string(args.show_xids, args.show_data, args.txtest) - if locked_flag: - rstr += ' [LOCKED]' - else: - rstr += '.' - return rstr - def unlock(self, journal_file, dequeue_record): - """Set the transaction lock for a given record_id to False""" - if dequeue_record.dequeue_record_id in self.enq_map: - if self.enq_map[dequeue_record.dequeue_record_id][2]: - self.enq_map[dequeue_record.dequeue_record_id][2] = False - else: - raise qlslibs.err.RecordNotLockedError(journal_file.file_header, dequeue_record) - else: - raise qlslibs.err.RecordIdNotFoundError(journal_file.file_header, dequeue_record) - -class TransactionMap(object): - """ - Map of open transactions used while recovering a QLS journal - """ - def __init__(self, enq_map): - self.txn_map = {} - self.enq_map = enq_map - def abort(self, xid): - """Perform an abort operation for the given xid record""" - for journal_file, record, _ in self.txn_map[xid]: - if isinstance(record, qlslibs.jrnl.DequeueRecord): - if self.enq_map.contains(record.dequeue_record_id): - self.enq_map.unlock(journal_file, record) - else: - journal_file.decr_enq_cnt(record) - del self.txn_map[xid] - def add(self, journal_file, record): - if record.xid is None: - raise qlslibs.err.NonTransactionalRecordError(journal_file.file_header, record, 'TransactionMap.add()') - if isinstance(record, qlslibs.jrnl.DequeueRecord): - try: - self.enq_map.lock(journal_file, record) - except qlslibs.err.RecordIdNotFoundError: - # Not in emap, look for rid in tmap - should not happen in practice - txn_op = self._find_record_id(record.xid, record.dequeue_record_id) - if txn_op != None: - if txn_op[2]: - raise qlslibs.err.AlreadyLockedError(journal_file.file_header, record) - txn_op[2] = True - if record.xid in self.txn_map: - self.txn_map[record.xid].append([journal_file, record, False]) # append to existing list - else: - self.txn_map[record.xid] = [[journal_file, record, False]] # create new list - def commit(self, xid): - """Perform a commit operation for the given xid record""" - mismatch_list = [] - for journal_file, record, lock in self.txn_map[xid]: - if isinstance(record, qlslibs.jrnl.EnqueueRecord): - self.enq_map.add(journal_file, record, lock) # Transfer enq to emap - else: - if self.enq_map.contains(record.dequeue_record_id): - self.enq_map.unlock(journal_file, record) - self.enq_map.delete(journal_file, record)[0].decr_enq_cnt(record) - else: - mismatch_list.append('0x%x' % record.dequeue_record_id) - del self.txn_map[xid] - return mismatch_list - def contains(self, xid): - """Return True if the xid exists in the map; False otherwise""" - return xid in self.txn_map - def delete(self, journal_file, transaction_record): - """Remove a transaction record from the map using either a commit or abort header""" - if transaction_record.magic[-1] == 'c': - return self.commit(transaction_record.xid) - if transaction_record.magic[-1] == 'a': - self.abort(transaction_record.xid) - else: - raise qlslibs.err.InvalidRecordTypeError(journal_file.file_header, transaction_record, - 'delete from Transaction Map') - def get(self, xid): - if xid in self.txn_map: - return self.txn_map[xid] - return None - def get_prepared_list(self): - """ - Prepared list is a map of xid(key) to one of None, True or False. These represent respectively: - None: prepared, but neither committed or aborted (interrupted before commit or abort) - False: prepared and aborted (interrupted before abort complete) - True: prepared and committed (interrupted before commit complete) - """ - prepared_list = {} - for xid in self.get_xid_list(): - for _, record, _ in self.txn_map[xid]: - if isinstance(record, qlslibs.jrnl.EnqueueRecord): - prepared_list[xid] = None - else: - prepared_list[xid] = record.is_transaction_complete_commit() - return prepared_list - def get_xid_list(self): - return self.txn_map.keys() - def report_str(self, args): - """Return a string containing a text report for all records in the map""" - if len(self.txn_map) == 0: - return 'No outstanding transactions found.' - rstr = '%d outstanding transaction(s)' % len(self.txn_map) - if args.show_recovered_recs: - rstr += ':' - for xid, op_list in self.txn_map.iteritems(): - rstr += '\n %s containing %d operations:' % (qlslibs.utils.format_xid(xid), len(op_list)) - for journal_file, record, _ in op_list: - rstr += '\n 0x%x:' % journal_file.file_header.file_num - rstr += record.to_string(args.show_xids, args.show_data, args.txtest) - else: - rstr += '.' - return rstr - def _find_record_id(self, xid, record_id): - """ Search for and return map list with supplied rid.""" - if xid in self.txn_map: - for txn_op in self.txn_map[xid]: - if txn_op[1].record_id == record_id: - return txn_op - for this_xid in self.txn_map.iterkeys(): - for txn_op in self.txn_map[this_xid]: - if txn_op[1].record_id == record_id: - return txn_op - return None - -class JournalStatistics(object): - """Journal statistics""" - def __init__(self): - self.total_record_count = 0 - self.transient_record_count = 0 - self.filler_record_count = 0 - self.enqueue_count = 0 - self.dequeue_count = 0 - self.transaction_record_count = 0 - self.transaction_enqueue_count = 0 - self.transaction_dequeue_count = 0 - self.transaction_commit_count = 0 - self.transaction_abort_count = 0 - self.transaction_operation_count = 0 - def __str__(self): - fstr = 'Total record count: %d\n' + \ - 'Transient record count: %d\n' + \ - 'Filler_record_count: %d\n' + \ - 'Enqueue_count: %d\n' + \ - 'Dequeue_count: %d\n' + \ - 'Transaction_record_count: %d\n' + \ - 'Transaction_enqueue_count: %d\n' + \ - 'Transaction_dequeue_count: %d\n' + \ - 'Transaction_commit_count: %d\n' + \ - 'Transaction_abort_count: %d\n' + \ - 'Transaction_operation_count: %d\n' - return fstr % (self.total_record_count, - self.transient_record_count, - self.filler_record_count, - self.enqueue_count, - self.dequeue_count, - self.transaction_record_count, - self.transaction_enqueue_count, - self.transaction_dequeue_count, - self.transaction_commit_count, - self.transaction_abort_count, - self.transaction_operation_count) - -class Journal(object): - """ - Instance of a Qpid Linear Store (QLS) journal. - """ - JRNL_SUFFIX = 'jrnl' - def __init__(self, directory, xid_prepared_list, args): - self.directory = directory - self.queue_name = os.path.basename(directory) - self.files = {} - self.file_num_list = None - self.file_num_itr = None - self.enq_map = EnqueueMap(self) - self.txn_map = TransactionMap(self.enq_map) - self.current_journal_file = None - self.first_rec_flag = None - self.statistics = JournalStatistics() - self.xid_prepared_list = xid_prepared_list # This is None for the TPL instance only - self.args = args - self.last_record_offset = None # TODO: Move into JournalFile - self.num_filler_records_required = None # TODO: Move into JournalFile - self.fill_to_offset = None - def add_record(self, record): - """Used for reconciling transactions only - called from JournalRecoveryManager._reconcile_transactions()""" - if isinstance(record, qlslibs.jrnl.EnqueueRecord) or isinstance(record, qlslibs.jrnl.DequeueRecord): - if record.xid_size > 0: - self.txn_map.add(self.current_journal_file, record) - else: - self.enq_map.add(self.current_journal_file, record, False) - elif isinstance(record, qlslibs.jrnl.TransactionRecord): - self.txn_map.delete(self.current_journal_file, record) - else: - raise qlslibs.err.InvalidRecordTypeError(self.current_journal_file, record, 'add to Journal') - def get_enq_map_record(self, rid): - return self.enq_map.get(rid) - def get_txn_map_record(self, xid): - return self.txn_map.get(xid) - def get_outstanding_txn_list(self): - return self.txn_map.get_xid_list() - def get_queue_name(self): - return self.queue_name - def recover(self, high_rid_counter): - print 'Recovering %s...' % self.queue_name, - self._analyze_files() - try: - while self._get_next_record(high_rid_counter): - pass - self._check_alignment() - except qlslibs.err.NoMoreFilesInJournalError: - print 'No more files in journal' - except qlslibs.err.FirstRecordOffsetMismatchError as err: - print '0x%08x: **** FRO ERROR: queue=\"%s\" fid=0x%x fro actual=0x%08x expected=0x%08x' % \ - (err.get_expected_fro(), err.get_queue_name(), err.get_file_number(), err.get_record_offset(), - err.get_expected_fro()) - print 'done' - def reconcile_transactions(self, prepared_list, txn_flag): - xid_list = self.txn_map.get_xid_list() - if len(xid_list) > 0: - print self.queue_name, 'contains', len(xid_list), 'open transaction(s):' - for xid in xid_list: - if xid in prepared_list.keys(): - commit_flag = prepared_list[xid] - if commit_flag is None: - print ' ', qlslibs.utils.format_xid(xid), '- Assuming commit after prepare' - if txn_flag: - self.txn_map.commit(xid) - elif commit_flag: - print ' ', qlslibs.utils.format_xid(xid), '- Completing interrupted commit operation' - if txn_flag: - self.txn_map.commit(xid) - else: - print ' ', qlslibs.utils.format_xid(xid), '- Completing interrupted abort operation' - if txn_flag: - self.txn_map.abort(xid) - else: - print ' ', qlslibs.utils.format_xid(xid), '- Ignoring, not in prepared transaction list' - if txn_flag: - self.txn_map.abort(xid) - def report(self, args): - print 'Journal "%s":' % self.queue_name - print '=' * (11 + len(self.queue_name)) - if args.stats: - print str(self.statistics) - print self.enq_map.report_str(args) - print self.txn_map.report_str(args) - JournalFile.report_header() - for file_num in sorted(self.files.keys()): - self.files[file_num].report() - #TODO: move this to JournalFile, append to file info - if self.num_filler_records_required is not None and self.fill_to_offset is not None: - print '0x%x:0x%08x: %d filler records required for DBLK alignment to 0x%08x' % \ - (self.current_journal_file.file_header.file_num, self.last_record_offset, - self.num_filler_records_required, self.fill_to_offset) - print - #--- protected functions --- - def _analyze_files(self): - for dir_entry in os.listdir(self.directory): - dir_entry_bits = dir_entry.split('.') - if len(dir_entry_bits) == 2 and dir_entry_bits[1] == Journal.JRNL_SUFFIX: - fq_file_name = os.path.join(self.directory, dir_entry) - file_handle = open(fq_file_name) - args = qlslibs.utils.load_args(file_handle, qlslibs.jrnl.RecordHeader) - file_hdr = qlslibs.jrnl.FileHeader(*args) - file_hdr.init(file_handle, *qlslibs.utils.load_args(file_handle, qlslibs.jrnl.FileHeader)) - if file_hdr.is_header_valid(file_hdr): - file_hdr.load(file_handle) - if file_hdr.is_valid(False): - qlslibs.utils.skip(file_handle, - file_hdr.file_header_size_sblks * qlslibs.utils.DEFAULT_SBLK_SIZE) - self.files[file_hdr.file_num] = JournalFile(file_hdr) - self.file_num_list = sorted(self.files.keys()) - self.file_num_itr = iter(self.file_num_list) - def _check_alignment(self): # TODO: Move into JournalFile - if self.last_record_offset is None: # Empty file, _check_file() never run - return - remaining_sblks = self.last_record_offset % qlslibs.utils.DEFAULT_SBLK_SIZE - if remaining_sblks == 0: - self.num_filler_records_required = 0 - else: - self.num_filler_records_required = (qlslibs.utils.DEFAULT_SBLK_SIZE - remaining_sblks) / \ - qlslibs.utils.DEFAULT_DBLK_SIZE - self.fill_to_offset = self.last_record_offset + \ - (self.num_filler_records_required * qlslibs.utils.DEFAULT_DBLK_SIZE) - if self.args.show_recovery_recs or self.args.show_all_recs: - print '0x%x:0x%08x: %d filler records required for DBLK alignment to 0x%08x' % \ - (self.current_journal_file.file_header.file_num, self.last_record_offset, - self.num_filler_records_required, self.fill_to_offset) - def _check_file(self): - if self.current_journal_file is not None: - if not self.current_journal_file.file_header.is_end_of_file(): - return True - if self.current_journal_file.file_header.is_end_of_file(): - self.last_record_offset = self.current_journal_file.file_header.file_handle.tell() - if not self._get_next_file(): - return False - fhdr = self.current_journal_file.file_header - fhdr.file_handle.seek(fhdr.first_record_offset) - return True - def _get_next_file(self): - if self.current_journal_file is not None: - file_handle = self.current_journal_file.file_header.file_handle - if not file_handle.closed: # sanity check, should not be necessary - file_handle.close() - file_num = 0 - try: - while file_num == 0: - file_num = self.file_num_itr.next() - except StopIteration: - pass - if file_num == 0: - return False - self.current_journal_file = self.files[file_num] - self.first_rec_flag = True - if self.args.show_recovery_recs or self.args.show_all_recs: - file_header = self.current_journal_file.file_header - print '0x%x:%s' % (file_header.file_num, file_header.to_string()) - return True - def _get_next_record(self, high_rid_counter): - if not self._check_file(): - return False - self.last_record_offset = self.current_journal_file.file_header.file_handle.tell() - this_record = qlslibs.utils.load(self.current_journal_file.file_header.file_handle, qlslibs.jrnl.RecordHeader) - if not this_record.is_header_valid(self.current_journal_file.file_header): - return False - if self.first_rec_flag: - if this_record.file_offset != self.current_journal_file.file_header.first_record_offset: - raise qlslibs.err.FirstRecordOffsetMismatchError(self.current_journal_file.file_header, this_record) - self.first_rec_flag = False - self.statistics.total_record_count += 1 - start_journal_file = self.current_journal_file - if isinstance(this_record, qlslibs.jrnl.EnqueueRecord): - ok_flag = self._handle_enqueue_record(this_record, start_journal_file) - high_rid_counter.check(this_record.record_id) - if self.args.show_recovery_recs or self.args.show_all_recs: - print '0x%x:%s' % (start_journal_file.file_header.file_num, \ - this_record.to_string(self.args.show_xids, self.args.show_data, self.args.txtest)) - elif isinstance(this_record, qlslibs.jrnl.DequeueRecord): - ok_flag = self._handle_dequeue_record(this_record, start_journal_file) - high_rid_counter.check(this_record.record_id) - if self.args.show_recovery_recs or self.args.show_all_recs: - print '0x%x:%s' % (start_journal_file.file_header.file_num, this_record.to_string(self.args.show_xids, None, None)) - elif isinstance(this_record, qlslibs.jrnl.TransactionRecord): - ok_flag = self._handle_transaction_record(this_record, start_journal_file) - high_rid_counter.check(this_record.record_id) - if self.args.show_recovery_recs or self.args.show_all_recs: - print '0x%x:%s' % (start_journal_file.file_header.file_num, this_record.to_string(self.args.show_xids, None, None)) - else: - self.statistics.filler_record_count += 1 - ok_flag = True - if self.args.show_all_recs: - print '0x%x:%s' % (start_journal_file.file_header.file_num, this_record) - qlslibs.utils.skip(self.current_journal_file.file_header.file_handle, qlslibs.utils.DEFAULT_DBLK_SIZE) - return ok_flag - def _handle_enqueue_record(self, enqueue_record, start_journal_file): - while enqueue_record.load(self.current_journal_file.file_header.file_handle): - if not self._get_next_file(): - enqueue_record.truncated_flag = True - return False - if not enqueue_record.is_valid(start_journal_file): - return False - if enqueue_record.is_external() and enqueue_record.data != None: - raise qlslibs.err.ExternalDataError(self.current_journal_file.file_header, enqueue_record) - if enqueue_record.is_transient(): - self.statistics.transient_record_count += 1 - return True - if enqueue_record.xid_size > 0: - self.txn_map.add(start_journal_file, enqueue_record) - self.statistics.transaction_operation_count += 1 - self.statistics.transaction_record_count += 1 - self.statistics.transaction_enqueue_count += 1 - else: - self.enq_map.add(start_journal_file, enqueue_record, False) - start_journal_file.incr_enq_cnt() - self.statistics.enqueue_count += 1 - return True - def _handle_dequeue_record(self, dequeue_record, start_journal_file): - while dequeue_record.load(self.current_journal_file.file_header.file_handle): - if not self._get_next_file(): - dequeue_record.truncated_flag = True - return False - if not dequeue_record.is_valid(start_journal_file): - return False - if dequeue_record.xid_size > 0: - if self.xid_prepared_list is None: # ie this is the TPL - dequeue_record.transaction_prepared_list_flag = True - elif not self.enq_map.contains(dequeue_record.dequeue_record_id): - dequeue_record.warnings.append('NOT IN EMAP') # Only for non-TPL records - self.txn_map.add(start_journal_file, dequeue_record) - self.statistics.transaction_operation_count += 1 - self.statistics.transaction_record_count += 1 - self.statistics.transaction_dequeue_count += 1 - else: - try: - self.enq_map.delete(start_journal_file, dequeue_record)[0].decr_enq_cnt(dequeue_record) - except qlslibs.err.RecordIdNotFoundError: - dequeue_record.warnings.append('NOT IN EMAP') - self.statistics.dequeue_count += 1 - return True - def _handle_transaction_record(self, transaction_record, start_journal_file): - while transaction_record.load(self.current_journal_file.file_header.file_handle): - if not self._get_next_file(): - transaction_record.truncated_flag = True - return False - if not transaction_record.is_valid(start_journal_file): - return False - if transaction_record.magic[-1] == 'a': # Abort - self.statistics.transaction_abort_count += 1 - elif transaction_record.magic[-1] == 'c': # Commit - self.statistics.transaction_commit_count += 1 - else: - raise InvalidRecordTypeError('Unknown transaction record magic \'%s\'' % transaction_record.magic) - if self.txn_map.contains(transaction_record.xid): - self.txn_map.delete(self.current_journal_file, transaction_record) - else: - transaction_record.warnings.append('NOT IN TMAP') -# if transaction_record.magic[-1] == 'c': # commits only -# self._txn_obj_list[hdr.xid] = hdr - self.statistics.transaction_record_count += 1 - return True - def _load_data(self, record): - while not record.is_complete: - record.load(self.current_journal_file.file_handle) - -class JournalFile(object): - def __init__(self, file_header): - self.file_header = file_header - self.enq_cnt = 0 - self.deq_cnt = 0 - self.num_filler_records_required = None - def incr_enq_cnt(self): - self.enq_cnt += 1 - def decr_enq_cnt(self, record): - if self.enq_cnt <= self.deq_cnt: - raise qlslibs.err.EnqueueCountUnderflowError(self.file_header, record) - self.deq_cnt += 1 - def get_enq_cnt(self): - return self.enq_cnt - self.deq_cnt - def is_outstanding_enq(self): - return self.enq_cnt > self.deq_cnt - @staticmethod - def report_header(): - print 'file_num enq_cnt p_no efp journal_file' - print '-------- ------- ---- ----- ------------' - def report(self): - comment = '<uninitialized>' if self.file_header.file_num == 0 else '' - file_num_str = '0x%x' % self.file_header.file_num - print '%8s %7d %4d %4dk %s %s' % (file_num_str, self.get_enq_cnt(), self.file_header.partition_num, - self.file_header.efp_data_size_kb, - os.path.basename(self.file_header.file_handle.name), comment) diff --git a/qpid/cpp/management/python/lib/qlslibs/efp.py b/qpid/cpp/management/python/lib/qlslibs/efp.py deleted file mode 100644 index 1c751c3d06..0000000000 --- a/qpid/cpp/management/python/lib/qlslibs/efp.py +++ /dev/null @@ -1,327 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -""" -Module: qlslibs.efp - -Contains empty file pool (EFP) classes. -""" - -import os -import os.path -import qlslibs.err -import shutil -import uuid - -class EfpManager(object): - """ - Top level class to analyze the Qpid Linear Store (QLS) directory for the partitions that make up the - Empty File Pool (EFP). - """ - def __init__(self, directory, disk_space_required_kb): - if not os.path.exists(directory): - raise qlslibs.err.InvalidQlsDirectoryNameError(directory) - self.directory = directory - self.disk_space_required_kb = disk_space_required_kb - self.efp_partitions = [] - self.efp_pools = {} - self.total_num_files = 0 - self.total_cum_file_size_kb = 0 - self.current_efp_partition = None - def add_file_pool(self, file_size_kb, num_files): - """ Add an EFP in the specified partition of the specified size containing the specified number of files """ - dir_name = EmptyFilePool.get_directory_name(file_size_kb) - print 'Adding pool \'%s\' to partition %s' % (dir_name, self.current_efp_partition.partition_number) - self.total_cum_file_size_kb += self.current_efp_partition.create_new_efp(file_size_kb, num_files) - self.total_num_files += num_files - def freshen_file_pool(self, file_size_kb, num_files): - """ Freshen an EFP in the specified partition and of the specified size to the specified number of files """ - if self.current_efp_partition is None: - partition_list = self.efp_partitions - partition_str = 'all partitions' - else: - partition_list = [self.current_efp_partition] - partition_str = 'partition %d' % self.current_efp_partition.partition_number - if file_size_kb is None: - pool_str = 'all pools' - else: - pool_str = 'pool \'%s\'' % EmptyFilePool.get_directory_name(int(file_size_kb)) - print 'Freshening %s in %s to %d files' % (pool_str, partition_str, num_files) - for self.current_efp_partition in partition_list: # Partition objects - if file_size_kb is None: - file_size_list = self.current_efp_partition.efp_pools.keys() - else: - file_size_list = ['%sk' % file_size_kb] - for file_size in file_size_list: - efp = self.current_efp_partition.efp_pools[file_size] - num_files_needed = num_files - efp.get_tot_file_count() - if num_files_needed > 0: - self.current_efp_partition.create_new_efp_files(qlslibs.utils.efp_directory_size(file_size), - num_files_needed) - else: - print ' WARNING: Pool %s in partition %s already contains %d files: no action taken' % \ - (self.current_efp_partition.efp_pools[file_size].size_str, - self.current_efp_partition.partition_number, efp.get_num_files()) - def remove_file_pool(self, file_size_kb): - """ Remove an existing EFP from the specified partition and of the specified size """ - dir_name = EmptyFilePool.get_directory_name(file_size_kb) - print 'Removing pool \'%s\' from partition %s' % (dir_name, self.current_efp_partition.partition_number) - self.efp_partitions.remove(self.current_efp_partition) - shutil.rmtree(os.path.join(self.current_efp_partition.efp_directory, dir_name)) - def report(self): - print 'Empty File Pool (EFP) report' - print '============================' - print 'Found', len(self.efp_partitions), 'partition(s)' - if (len(self.efp_partitions)) > 0: - sorted_efp_partitions = sorted(self.efp_partitions, key=lambda x: x.partition_number) - EfpPartition.print_report_table_header() - for ptn in sorted_efp_partitions: - ptn.print_report_table_line() - print - for ptn in sorted_efp_partitions: - ptn.report() - def run(self, arg_tup): - self._analyze_efp() - if arg_tup is not None: - _, arg_file_size, arg_num_files, arg_add, arg_remove, arg_freshen, arg_list = arg_tup - self._check_args(arg_tup) - if arg_add: - self.add_file_pool(int(arg_file_size), int(arg_num_files)) - if arg_remove: - self.remove_file_pool(int(arg_file_size)) - if arg_freshen: - self.freshen_file_pool(arg_file_size, int(arg_num_files)) - if arg_list: - self.report() - def _analyze_efp(self): - for dir_entry in os.listdir(self.directory): - try: - efp_partition = EfpPartition(os.path.join(self.directory, dir_entry), self.disk_space_required_kb) - efp_partition.scan() - self.efp_partitions.append(efp_partition) - for efpl in efp_partition.efp_pools.iterkeys(): - if efpl not in self.efp_pools: - self.efp_pools[efpl] = [] - self.efp_pools[efpl].append(efp_partition.efp_pools[efpl]) - self.total_num_files += efp_partition.tot_file_count - self.total_cum_file_size_kb += efp_partition.tot_file_size_kb - except qlslibs.err.InvalidPartitionDirectoryNameError: - pass - def _check_args(self, arg_tup): - """ Value check of args. The names of partitions and pools are validated against the discovered instances """ - arg_partition, arg_file_size, _, arg_add, arg_remove, arg_freshen, _ = arg_tup - if arg_partition is not None: - try: - if arg_partition[0] == 'p': # string partition name, eg 'p001' - partition_num = int(arg_partition[1:]) - else: # numeric partition, eg '1' - partition_num = int(arg_partition) - found = False - for partition in self.efp_partitions: - if partition.partition_number == partition_num: - self.current_efp_partition = partition - found = True - break - if not found: - raise qlslibs.err.PartitionDoesNotExistError(arg_partition) - except ValueError: - raise qlslibs.err.InvalidPartitionDirectoryNameError(arg_partition) - if self.current_efp_partition is not None: - pool_list = self.current_efp_partition.efp_pools.keys() - efp_directory_name = EmptyFilePool.get_directory_name(int(arg_file_size)) - if arg_add and efp_directory_name in pool_list: - raise qlslibs.err.PoolDirectoryAlreadyExistsError(efp_directory_name) - if (arg_remove or arg_freshen) and efp_directory_name not in pool_list: - raise qlslibs.err.PoolDirectoryDoesNotExistError(efp_directory_name) - -class EfpPartition(object): - """ - Class that represents a EFP partition. Each partition contains one or more Empty File Pools (EFPs). - """ - PTN_DIR_PREFIX = 'p' - EFP_DIR_NAME = 'efp' - def __init__(self, directory, disk_space_required_kb): - self.directory = directory - self.partition_number = None - self.efp_pools = {} - self.tot_file_count = 0 - self.tot_file_size_kb = 0 - self._validate_partition_directory(disk_space_required_kb) - def create_new_efp_files(self, file_size_kb, num_files): - """ Create new EFP files in this partition """ - dir_name = EmptyFilePool.get_directory_name(file_size_kb) - if dir_name in self.efp_pools.keys(): - efp = self.efp_pools[dir_name] - else: - efp = EmptyFilePool(os.path.join(self.directory, EfpPartition.EFP_DIR_NAME), dir_name) - this_tot_file_size_kb = efp.create_new_efp_files(num_files) - self.tot_file_size_kb += this_tot_file_size_kb - self.tot_file_count += num_files - return this_tot_file_size_kb - @staticmethod - def print_report_table_header(): - print 'p_no no_efp tot_files tot_size_kb directory' - print '---- ------ --------- ----------- ---------' - def print_report_table_line(self): - print '%4d %6d %9d %11d %s' % (self.partition_number, len(self.efp_pools), self.tot_file_count, - self.tot_file_size_kb, self.directory) - def report(self): - print 'Partition %s:' % os.path.basename(self.directory) - if len(self.efp_pools) > 0: - EmptyFilePool.print_report_table_header() - for dir_name in self.efp_pools.keys(): - self.efp_pools[dir_name].print_report_table_line() - else: - print '<empty - no EFPs found in this partition>' - print - def scan(self): - if os.path.exists(self.directory): - efp_dir = os.path.join(self.directory, EfpPartition.EFP_DIR_NAME) - for dir_entry in os.listdir(efp_dir): - efp = EmptyFilePool(os.path.join(efp_dir, dir_entry), self.partition_number) - efp.scan() - self.tot_file_count += efp.get_tot_file_count() - self.tot_file_size_kb += efp.get_tot_file_size_kb() - self.efp_pools[dir_entry] = efp - def _validate_partition_directory(self, disk_space_required_kb): - if os.path.basename(self.directory)[0] is not EfpPartition.PTN_DIR_PREFIX: - raise qlslibs.err.InvalidPartitionDirectoryNameError(self.directory) - try: - self.partition_number = int(os.path.basename(self.directory)[1:]) - except ValueError: - raise qlslibs.err.InvalidPartitionDirectoryNameError(self.directory) - if not qlslibs.utils.has_write_permission(self.directory): - raise qlslibs.err.WritePermissionError(self.directory) - if disk_space_required_kb is not None: - space_avail = qlslibs.utils.get_avail_disk_space(self.directory) - if space_avail < (disk_space_required_kb * 1024): - raise qlslibs.err.InsufficientSpaceOnDiskError(self.directory, space_avail, - disk_space_required_kb * 1024) - -class EmptyFilePool(object): - """ - Class that represents a single Empty File Pool within a partition. Each EFP contains pre-formatted linear store - journal files (but it may also be empty). - """ - EFP_DIR_SUFFIX = 'k' - EFP_JRNL_EXTENTION = '.jrnl' - EFP_INUSE_DIRNAME = 'in_use' - EFP_RETURNED_DIRNAME = 'returned' - def __init__(self, directory, partition_number): - self.base_dir_name = os.path.basename(directory) - self.directory = directory - self.partition_number = partition_number - self.data_size_kb = None - self.efp_files = [] - self.in_use_files = [] - self.returned_files = [] - self._validate_efp_directory() - def create_new_efp_files(self, num_files): - """ Create one or more new empty journal files of the prescribed size for this EFP """ - this_total_file_size = 0 - for _ in range(num_files): - this_total_file_size += self._create_new_efp_file() - return this_total_file_size - def get_directory(self): - return self.directory - @staticmethod - def get_directory_name(file_size_kb): - """ Static function to create an EFP directory name from the size of the files it contains """ - return '%dk' % file_size_kb - def get_tot_file_count(self): - return len(self.efp_files) - def get_tot_file_size_kb(self): - return self.data_size_kb * len(self.efp_files) - @staticmethod - def print_report_table_header(): - print ' ---------- efp ------------ --------- in_use ---------- -------- returned ---------' - print 'data_size_kb file_count tot_file_size_kb file_count tot_file_size_kb file_count tot_file_size_kb efp_directory' - print '------------ ---------- ---------------- ---------- ---------------- ---------- ---------------- -------------' - def print_report_table_line(self): - print '%12d %10d %16d %10d %16d %10d %16d %s' % (self.data_size_kb, len(self.efp_files), - self.data_size_kb * len(self.efp_files), - len(self.in_use_files), - self.data_size_kb * len(self.in_use_files), - len(self.returned_files), - self.data_size_kb * len(self.returned_files), - self.get_directory()) - def scan(self): - for efp_file in os.listdir(self.directory): - if efp_file == self.EFP_INUSE_DIRNAME: - for in_use_file in os.listdir(os.path.join(self.directory, self.EFP_INUSE_DIRNAME)): - self.in_use_files.append(in_use_file) - continue - if efp_file == self.EFP_RETURNED_DIRNAME: - for returned_file in os.listdir(os.path.join(self.directory, self.EFP_RETURNED_DIRNAME)): - self.returned_files.append(returned_file) - continue - if self._validate_efp_file(os.path.join(self.directory, efp_file)): - self.efp_files.append(efp_file) - def _add_efp_file(self, efp_file_name): - """ Add a single journal file of the appropriate size to this EFP. No file size check is made here. """ - self.efp_files.append(efp_file_name) - def _create_new_efp_file(self): - """ Create a single new empty journal file of the prescribed size for this EFP """ - file_name = str(uuid.uuid4()) + EmptyFilePool.EFP_JRNL_EXTENTION - file_header = qlslibs.jrnl.FileHeader(0, qlslibs.jrnl.FileHeader.MAGIC, qlslibs.utils.DEFAULT_RECORD_VERSION, - 0, 0, 0) - file_header.init(None, None, qlslibs.utils.DEFAULT_HEADER_SIZE_SBLKS, self.partition_number, self.data_size_kb, - 0, 0, 0, 0, 0) - efh = file_header.encode() - efh_bytes = len(efh) - file_handle = open(os.path.join(self.directory, file_name), 'wb') - file_handle.write(efh) - file_handle.write('\xff' * (qlslibs.utils.DEFAULT_SBLK_SIZE - efh_bytes)) - file_handle.write('\x00' * (int(self.data_size_kb) * 1024)) - file_handle.close() - fqfn = os.path.join(self.directory, file_name) - self._add_efp_file(fqfn) - return os.path.getsize(fqfn) - def _validate_efp_directory(self): - if self.base_dir_name[-1] is not EmptyFilePool.EFP_DIR_SUFFIX: - raise qlslibs.err.InvalidEfpDirectoryNameError(self.directory) - try: - self.data_size_kb = int(os.path.basename(self.base_dir_name)[:-1]) - except ValueError: - raise qlslibs.err.InvalidEfpDirectoryNameError(self.directory) - def _validate_efp_file(self, efp_file): - file_size = os.path.getsize(efp_file) - expected_file_size = (self.data_size_kb * 1024) + qlslibs.utils.DEFAULT_SBLK_SIZE - if file_size != expected_file_size: - print 'WARNING: File %s not of correct size (size=%d, expected=%d): Ignoring' % (efp_file, file_size, - expected_file_size) - return False - file_handle = open(efp_file) - args = qlslibs.utils.load_args(file_handle, qlslibs.jrnl.RecordHeader) - file_hdr = qlslibs.jrnl.FileHeader(*args) - file_hdr.init(file_handle, *qlslibs.utils.load_args(file_handle, qlslibs.jrnl.FileHeader)) - if not file_hdr.is_header_valid(file_hdr): - file_handle.close() - return False - file_hdr.load(file_handle) - file_handle.close() - if not file_hdr.is_valid(True): - return False - return True - - -# ============================================================================= - -if __name__ == "__main__": - print "This is a library, and cannot be executed." diff --git a/qpid/cpp/management/python/lib/qlslibs/err.py b/qpid/cpp/management/python/lib/qlslibs/err.py deleted file mode 100644 index f47632ce6a..0000000000 --- a/qpid/cpp/management/python/lib/qlslibs/err.py +++ /dev/null @@ -1,261 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -""" -Module: qlslibs.err - -Contains error classes. -""" - -# --- Parent classes - -class QlsError(Exception): - """Base error class for QLS errors and exceptions""" - def __init__(self): - Exception.__init__(self) - def __str__(self): - return '' - -class QlsRecordError(QlsError): - """Base error class for individual records""" - def __init__(self, file_header, record): - QlsError.__init__(self) - self.file_header = file_header - self.record = record - def get_expected_fro(self): - return self.file_header.first_record_offset - def get_file_number(self): - return self.file_header.file_num - def get_queue_name(self): - return self.file_header.queue_name - def get_record_id(self): - return self.record.record_id - def get_record_offset(self): - return self.record.file_offset - def __str__(self): - return 'queue="%s" file_id=0x%x record_offset=0x%x record_id=0x%x' % \ - (self.file_header.queue_name, self.file_header.file_num, self.record.file_offset, self.record.record_id) - -# --- Error classes - -class AlreadyLockedError(QlsRecordError): - """Transactional record to be locked is already locked""" - def __init__(self, file_header, record): - QlsRecordError.__init__(self, file_header, record) - def __str__(self): - return 'Transactional operation already locked in TransactionMap: ' + QlsRecordError.__str__(self) - -class DataSizeError(QlsError): - """Error class for Data size mismatch""" - def __init__(self, expected_size, actual_size, data_str): - QlsError.__init__(self) - self.expected_size = expected_size - self.actual_size = actual_size - self.xid_str = data_str - def __str__(self): - return 'Inconsistent data size: expected:%d; actual:%d; data="%s"' % \ - (self.expected_size, self.actual_size, self.data_str) - -class DuplicateRecordIdError(QlsRecordError): - """Duplicate Record Id in Enqueue Map""" - def __init__(self, file_header, record): - QlsRecordError.__init__(self, file_header, record) - def __str__(self): - return 'Duplicate Record Id in enqueue map: ' + QlsRecordError.__str__(self) - -class EnqueueCountUnderflowError(QlsRecordError): - """Attempted to decrement enqueue count past 0""" - def __init__(self, file_header, record): - QlsRecordError.__init__(self, file_header, record) - def __str__(self): - return 'Enqueue record count underflow: ' + QlsRecordError.__str__(self) - -class ExternalDataError(QlsRecordError): - """Data present in Enqueue record when external data flag is set""" - def __init__(self, file_header, record): - QlsRecordError.__init__(self, file_header, record) - def __str__(self): - return 'Data present in external data record: ' + QlsRecordError.__str__(self) - -class FirstRecordOffsetMismatchError(QlsRecordError): - """First Record Offset (FRO) does not match file header""" - def __init__(self, file_header, record): - QlsRecordError.__init__(self, file_header, record) - def __str__(self): - return 'First record offset mismatch: ' + QlsRecordError.__str__(self) + ' expected_offset=0x%x' % \ - self.file_header.first_record_offset - -class InsufficientSpaceOnDiskError(QlsError): - """Insufficient space on disk""" - def __init__(self, directory, space_avail, space_requried): - QlsError.__init__(self) - self.directory = directory - self.space_avail = space_avail - self.space_required = space_requried - def __str__(self): - return 'Insufficient space on disk: directory=%s; avail_space=%d required_space=%d' % \ - (self.directory, self.space_avail, self.space_required) - -class InvalidClassError(QlsError): - """Invalid class name or type""" - def __init__(self, class_name): - QlsError.__init__(self) - self.class_name = class_name - def __str__(self): - return 'Invalid class name "%s"' % self.class_name - -class InvalidEfpDirectoryNameError(QlsError): - """Invalid EFP directory name - should be NNNNk, where NNNN is a number (of any length)""" - def __init__(self, directory_name): - QlsError.__init__(self) - self.directory_name = directory_name - def __str__(self): - return 'Invalid EFP directory name "%s"' % self.directory_name - -#class InvalidFileSizeString(QlsError): -# """Invalid file size string""" -# def __init__(self, file_size_string): -# QlsError.__init__(self) -# self.file_size_string = file_size_string -# def __str__(self): -# return 'Invalid file size string "%s"' % self.file_size_string - -class InvalidPartitionDirectoryNameError(QlsError): - """Invalid EFP partition name - should be pNNN, where NNN is a 3-digit partition number""" - def __init__(self, directory_name): - QlsError.__init__(self) - self.directory_name = directory_name - def __str__(self): - return 'Invalid partition directory name "%s"' % self.directory_name - -class InvalidQlsDirectoryNameError(QlsError): - """Invalid QLS directory name""" - def __init__(self, directory_name): - QlsError.__init__(self) - self.directory_name = directory_name - def __str__(self): - return 'Invalid QLS directory name "%s"' % self.directory_name - -class InvalidRecordTypeError(QlsRecordError): - """Error class for any operation using an invalid record type""" - def __init__(self, file_header, record, error_msg): - QlsRecordError.__init__(self, file_header, record) - self.error_msg = error_msg - def __str__(self): - return 'Invalid record type: ' + QlsRecordError.__str__(self) + ':' + self.error_msg - -class InvalidRecordVersionError(QlsRecordError): - """Invalid record version""" - def __init__(self, file_header, record, expected_version): - QlsRecordError.__init__(self, file_header, record) - self.expected_version = expected_version - def __str__(self): - return 'Invalid record version: queue="%s" ' + QlsRecordError.__str__(self) + \ - ' ver_found=0x%x ver_expected=0x%x' % (self.record_header.version, self.expected_version) - -class NoMoreFilesInJournalError(QlsError): - """Raised when trying to obtain the next file in the journal and there are no more files""" - def __init__(self, queue_name): - QlsError.__init__(self) - self.queue_name = queue_name - def __str__(self): - return 'No more journal files in queue "%s"' % self.queue_name - -class NonTransactionalRecordError(QlsRecordError): - """Transactional operation on non-transactional record""" - def __init__(self, file_header, record, operation): - QlsRecordError.__init__(self, file_header, record) - self.operation = operation - def __str__(self): - return 'Transactional operation on non-transactional record: ' + QlsRecordError.__str__() + \ - ' operation=%s' % self.operation - -class PartitionDoesNotExistError(QlsError): - """Partition name does not exist on disk""" - def __init__(self, partition_directory): - QlsError.__init__(self) - self.partition_directory = partition_directory - def __str__(self): - return 'Partition %s does not exist' % self.partition_directory - -class PoolDirectoryAlreadyExistsError(QlsError): - """Pool directory already exists""" - def __init__(self, pool_directory): - QlsError.__init__(self) - self.pool_directory = pool_directory - def __str__(self): - return 'Pool directory %s already exists' % self.pool_directory - -class PoolDirectoryDoesNotExistError(QlsError): - """Pool directory does not exist""" - def __init__(self, pool_directory): - QlsError.__init__(self) - self.pool_directory = pool_directory - def __str__(self): - return 'Pool directory %s does not exist' % self.pool_directory - -class RecordIdNotFoundError(QlsRecordError): - """Record Id not found in enqueue map""" - def __init__(self, file_header, record): - QlsRecordError.__init__(self, file_header, record) - def __str__(self): - return 'Record Id not found in enqueue map: ' + QlsRecordError.__str__() - -class RecordNotLockedError(QlsRecordError): - """Record in enqueue map is not locked""" - def __init__(self, file_header, record): - QlsRecordError.__init__(self, file_header, record) - def __str__(self): - return 'Record in enqueue map is not locked: ' + QlsRecordError.__str__() - -class UnexpectedEndOfFileError(QlsError): - """The bytes read from a file is less than that expected""" - def __init__(self, size_read, size_expected, file_offset, file_name): - QlsError.__init__(self) - self.size_read = size_read - self.size_expected = size_expected - self.file_offset = file_offset - self.file_name = file_name - def __str__(self): - return 'Tried to read %d at offset %d in file "%s"; only read %d' % \ - (self.size_read, self.file_offset, self.file_name, self.size_expected) - -class WritePermissionError(QlsError): - """No write permission""" - def __init__(self, directory): - QlsError.__init__(self) - self.directory = directory - def __str__(self): - return 'No write permission in directory %s' % self.directory - -class XidSizeError(QlsError): - """Error class for Xid size mismatch""" - def __init__(self, expected_size, actual_size, xid_str): - QlsError.__init__(self) - self.expected_size = expected_size - self.actual_size = actual_size - self.xid_str = xid_str - def __str__(self): - return 'Inconsistent xid size: expected:%d; actual:%d; xid="%s"' % \ - (self.expected_size, self.actual_size, self.xid_str) - -# ============================================================================= - -if __name__ == "__main__": - print "This is a library, and cannot be executed." diff --git a/qpid/cpp/management/python/lib/qlslibs/jrnl.py b/qpid/cpp/management/python/lib/qlslibs/jrnl.py deleted file mode 100644 index 5e65890393..0000000000 --- a/qpid/cpp/management/python/lib/qlslibs/jrnl.py +++ /dev/null @@ -1,394 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -""" -Module: qlslibs.jrnl - -Contains journal record classes. -""" - -import qlslibs.err -import qlslibs.utils -import string -import struct -import time - -class RecordHeader(object): - FORMAT = '<4s2H2Q' - def __init__(self, file_offset, magic, version, user_flags, serial, record_id): - self.file_offset = file_offset - self.magic = magic - self.version = version - self.user_flags = user_flags - self.serial = serial - self.record_id = record_id - self.warnings = [] - self.truncated_flag = False - def encode(self): - return struct.pack(RecordHeader.FORMAT, self.magic, self.version, self.user_flags, self.serial, self.record_id) - def load(self, file_handle): - pass - @staticmethod - def discriminate(args): - """Use the last char in the header magic to determine the header type""" - return CLASSES.get(args[1][-1], RecordHeader) - def is_empty(self): - """Return True if this record is empty (ie has a magic of 0x0000""" - return self.magic == '\x00'*4 - def is_header_valid(self, file_header): - """Check that this record is valid""" - if self.is_empty(): - return False - if self.magic[:3] != 'QLS' or self.magic[3] not in ['a', 'c', 'd', 'e', 'f', 'x']: - return False - if self.magic[-1] != 'x': - if self.version != qlslibs.utils.DEFAULT_RECORD_VERSION: - raise qlslibs.err.InvalidRecordVersionError(file_header, self, qlslibs.utils.DEFAULT_RECORD_VERSION) - if self.serial != file_header.serial: - return False - return True - def to_rh_string(self): - """Return string representation of this header""" - if self.is_empty(): - return '0x%08x: <empty>' % (self.file_offset) - if self.magic[-1] == 'x': - return '0x%08x: [X]' % (self.file_offset) - if self.magic[-1] in ['a', 'c', 'd', 'e', 'f', 'x']: - return '0x%08x: [%c v=%d f=0x%04x rid=0x%x]' % \ - (self.file_offset, self.magic[-1].upper(), self.version, self.user_flags, self.record_id) - return '0x%08x: <error, unknown magic "%s" (possible overwrite boundary?)>' % (self.file_offset, self.magic) - def _get_warnings(self): - warn_str = '' - for warn in self.warnings: - warn_str += '<%s>' % warn - return warn_str - -class RecordTail(object): - FORMAT = '<4sL2Q' - def __init__(self, file_handle): # TODO - clumsy, only allows reading from disk. Move all disk stuff to laod() - self.file_offset = file_handle.tell() if file_handle is not None else 0 - self.complete = False - self.read_size = struct.calcsize(RecordTail.FORMAT) - self.fbin = file_handle.read(self.read_size) if file_handle is not None else None - self.valid_flag = None - if self.fbin is not None and len(self.fbin) >= self.read_size: - self.complete = True - self.xmagic, self.checksum, self.serial, self.record_id = struct.unpack(RecordTail.FORMAT, self.fbin) - def load(self, file_handle): - """Used to continue load of RecordTail object if it is split between files""" - if not self.is_complete: - self.fbin += file_handle.read(self.read_size - len(self.fbin)) - if (len(self.fbin)) >= self.read_size: - self.complete = True - self.xmagic, self.checksum, self.serial, self.record_id = struct.unpack(RecordTail.FORMAT, self.fbin) - def is_complete(self): - return self.complete - def is_valid(self, record): - if self.valid_flag is None: - if not self.complete: - return False - self.valid_flag = qlslibs.utils.inv_str(self.xmagic) == record.magic and \ - self.serial == record.serial and \ - self.record_id == record.record_id and \ - qlslibs.utils.adler32(record.checksum_encode()) == self.checksum - return self.valid_flag - def to_string(self): - """Return a string representation of the this RecordTail instance""" - if self.valid_flag is not None: - if not self.valid_flag: - return '[INVALID RECORD TAIL]' - magic = qlslibs.utils.inv_str(self.xmagic) - magic_char = magic[-1].upper() if magic[-1] in string.printable else '?' - return '[%c cs=0x%08x rid=0x%x]' % (magic_char, self.checksum, self.record_id) - -class FileHeader(RecordHeader): - FORMAT = '<2H4x5QH' - MAGIC = 'QLSf' - def init(self, file_handle, _, file_header_size_sblks, partition_num, efp_data_size_kb, first_record_offset, - timestamp_sec, timestamp_ns, file_num, queue_name_len): - self.file_handle = file_handle - self.file_header_size_sblks = file_header_size_sblks - self.partition_num = partition_num - self.efp_data_size_kb = efp_data_size_kb - self.first_record_offset = first_record_offset - self.timestamp_sec = timestamp_sec - self.timestamp_ns = timestamp_ns - self.file_num = file_num - self.queue_name_len = queue_name_len - self.queue_name = None - def encode(self): - if self.queue_name is None: - return RecordHeader.encode(self) + struct.pack(self.FORMAT, self.file_header_size_sblks, \ - self.partition_num, self.efp_data_size_kb, \ - self.first_record_offset, self.timestamp_sec, \ - self.timestamp_ns, self.file_num, 0) - return RecordHeader.encode(self) + struct.pack(self.FORMAT, self.file_header_size_sblks, self.partition_num, \ - self.efp_data_size_kb, self.first_record_offset, \ - self.timestamp_sec, self.timestamp_ns, self.file_num, \ - self.queue_name_len) + self.queue_name - def get_file_size(self): - """Sum of file header size and data size""" - return (self.file_header_size_sblks * qlslibs.utils.DEFAULT_SBLK_SIZE) + (self.efp_data_size_kb * 1024) - def load(self, file_handle): - self.queue_name = file_handle.read(self.queue_name_len) - def is_end_of_file(self): - return self.file_handle.tell() >= self.get_file_size() - def is_valid(self, is_empty): - if not RecordHeader.is_header_valid(self, self): - return False - if self.file_handle is None or self.file_header_size_sblks == 0 or self.partition_num == 0 or \ - self.efp_data_size_kb == 0: - return False - if is_empty: - if self.first_record_offset != 0 or self.timestamp_sec != 0 or self.timestamp_ns != 0 or \ - self.file_num != 0 or self.queue_name_len != 0: - return False - else: - if self.first_record_offset == 0 or self.timestamp_sec == 0 or self.timestamp_ns == 0 or \ - self.file_num == 0 or self.queue_name_len == 0: - return False - if self.queue_name is None: - return False - if len(self.queue_name) != self.queue_name_len: - return False - return True - def timestamp_str(self): - """Get the timestamp of this record in string format""" - now = time.gmtime(self.timestamp_sec) - fstr = '%%a %%b %%d %%H:%%M:%%S.%09d %%Y' % (self.timestamp_ns) - return time.strftime(fstr, now) - def to_string(self): - """Return a string representation of the this FileHeader instance""" - return '%s fnum=0x%x fro=0x%08x p=%d s=%dk t=%s %s' % (self.to_rh_string(), self.file_num, - self.first_record_offset, self.partition_num, - self.efp_data_size_kb, self.timestamp_str(), - self._get_warnings()) - -class EnqueueRecord(RecordHeader): - FORMAT = '<2Q' - MAGIC = 'QLSe' - EXTERNAL_FLAG_MASK = 0x20 - TRANSIENT_FLAG_MASK = 0x10 - def init(self, _, xid_size, data_size): - self.xid_size = xid_size - self.data_size = data_size - self.xid = None - self.xid_complete = False - self.data = None - self.data_complete = False - self.record_tail = None - def checksum_encode(self): # encode excluding record tail - cs_bytes = RecordHeader.encode(self) + struct.pack(self.FORMAT, self.xid_size, self.data_size) - if self.xid is not None: - cs_bytes += self.xid - if self.data is not None: - cs_bytes += self.data - return cs_bytes - def is_external(self): - return self.user_flags & EnqueueRecord.EXTERNAL_FLAG_MASK > 0 - def is_transient(self): - return self.user_flags & EnqueueRecord.TRANSIENT_FLAG_MASK > 0 - def is_valid(self, journal_file): - if not RecordHeader.is_header_valid(self, journal_file.file_header): - return False - if not (self.xid_complete and self.data_complete): - return False - if self.xid_size > 0 and len(self.xid) != self.xid_size: - return False - if self.data_size > 0 and len(self.data) != self.data_size: - return False - if self.xid_size > 0 or self.data_size > 0: - if self.record_tail is None: - return False - if not self.record_tail.is_valid(self): - return False - return True - def load(self, file_handle): - """Return True when load is incomplete and must be called again with new file handle""" - self.xid, self.xid_complete = qlslibs.utils.load_data(file_handle, self.xid, self.xid_size) - if not self.xid_complete: - return True - if self.is_external(): - self.data_complete = True - else: - self.data, self.data_complete = qlslibs.utils.load_data(file_handle, self.data, self.data_size) - if not self.data_complete: - return True - if self.xid_size > 0 or self.data_size > 0: - if self.record_tail is None: - self.record_tail = RecordTail(file_handle) - elif not self.record_tail.is_complete(): - self.record_tail.load(file_handle) # Continue loading partially loaded tail - if self.record_tail.is_complete(): - self.record_tail.is_valid(self) - else: - return True - return False - def to_string(self, show_xid_flag, show_data_flag, txtest_flag): - """Return a string representation of the this EnqueueRecord instance""" - if self.truncated_flag: - return '%s xid(%d) data(%d) [Truncated, no more files in journal]' % (RecordHeader.__str__(self), - self.xid_size, self.data_size) - if self.record_tail is None: - record_tail_str = '' - else: - record_tail_str = self.record_tail.to_string() - return '%s %s %s %s %s %s' % (self.to_rh_string(), - qlslibs.utils.format_xid(self.xid, self.xid_size, show_xid_flag), - qlslibs.utils.format_data(self.data, self.data_size, show_data_flag, txtest_flag), - record_tail_str, self._print_flags(), self._get_warnings()) - def _print_flags(self): - """Utility function to decode the flags field in the header and print a string representation""" - fstr = '' - if self.is_transient(): - fstr = '[TRANSIENT' - if self.is_external(): - if len(fstr) > 0: - fstr += ',EXTERNAL' - else: - fstr = '*EXTERNAL' - if len(fstr) > 0: - fstr += ']' - return fstr - -class DequeueRecord(RecordHeader): - FORMAT = '<2Q' - MAGIC = 'QLSd' - TXN_COMPLETE_COMMIT_FLAG = 0x10 - def init(self, _, dequeue_record_id, xid_size): - self.dequeue_record_id = dequeue_record_id - self.xid_size = xid_size - self.transaction_prepared_list_flag = False - self.xid = None - self.xid_complete = False - self.record_tail = None - def checksum_encode(self): # encode excluding record tail - return RecordHeader.encode(self) + struct.pack(self.FORMAT, self.dequeue_record_id, self.xid_size) + \ - self.xid - def is_transaction_complete_commit(self): - return self.user_flags & DequeueRecord.TXN_COMPLETE_COMMIT_FLAG > 0 - def is_valid(self, journal_file): - if not RecordHeader.is_header_valid(self, journal_file.file_header): - return False - if self.xid_size > 0: - if not self.xid_complete: - return False - if self.xid_size > 0 and len(self.xid) != self.xid_size: - return False - if self.record_tail is None: - return False - if not self.record_tail.is_valid(self): - return False - return True - def load(self, file_handle): - """Return True when load is incomplete and must be called again with new file handle""" - self.xid, self.xid_complete = qlslibs.utils.load_data(file_handle, self.xid, self.xid_size) - if not self.xid_complete: - return True - if self.xid_size > 0: - if self.record_tail is None: - self.record_tail = RecordTail(file_handle) - elif not self.record_tail.is_complete(): - self.record_tail.load(file_handle) - if self.record_tail.is_complete(): - self.record_tail.is_valid(self) - else: - return True - return False - def to_string(self, show_xid_flag, _u1, _u2): - """Return a string representation of the this DequeueRecord instance""" - if self.truncated_flag: - return '%s xid(%d) drid=0x%x [Truncated, no more files in journal]' % (RecordHeader.__str__(self), - self.xid_size, - self.dequeue_record_id) - if self.record_tail is None: - record_tail_str = '' - else: - record_tail_str = self.record_tail.to_string() - return '%s drid=0x%x %s %s %s %s' % (self.to_rh_string(), self.dequeue_record_id, - qlslibs.utils.format_xid(self.xid, self.xid_size, show_xid_flag), - record_tail_str, self._print_flags(), self._get_warnings()) - def _print_flags(self): - """Utility function to decode the flags field in the header and print a string representation""" - if self.transaction_prepared_list_flag: - if self.is_transaction_complete_commit(): - return '[COMMIT]' - else: - return '[ABORT]' - return '' - -class TransactionRecord(RecordHeader): - FORMAT = '<Q' - MAGIC_ABORT = 'QLSa' - MAGIC_COMMIT = 'QLSc' - def init(self, _, xid_size): - self.xid_size = xid_size - self.xid = None - self.xid_complete = False - self.record_tail = None - def checksum_encode(self): # encode excluding record tail - return RecordHeader.encode(self) + struct.pack(self.FORMAT, self.xid_size) + self.xid - def is_valid(self, journal_file): - if not RecordHeader.is_header_valid(self, journal_file.file_header): - return False - if not self.xid_complete or len(self.xid) != self.xid_size: - return False - if self.record_tail is None: - return False - if not self.record_tail.is_valid(self): - return False - return True - def load(self, file_handle): - """Return True when load is incomplete and must be called again with new file handle""" - self.xid, self.xid_complete = qlslibs.utils.load_data(file_handle, self.xid, self.xid_size) - if not self.xid_complete: - return True - if self.xid_size > 0: - if self.record_tail is None: - self.record_tail = RecordTail(file_handle) - elif not self.record_tail.is_complete(): - self.record_tail.load(file_handle) - if self.record_tail.is_complete(): - self.record_tail.is_valid(self) - else: - return True - return False - def to_string(self, show_xid_flag, _u1, _u2): - """Return a string representation of the this TransactionRecord instance""" - if self.truncated_flag: - return '%s xid(%d) [Truncated, no more files in journal]' % (RecordHeader.__str__(self), self.xid_size) - if self.record_tail is None: - record_tail_str = '' - else: - record_tail_str = self.record_tail.to_string() - return '%s %s %s %s' % (self.to_rh_string(), - qlslibs.utils.format_xid(self.xid, self.xid_size, show_xid_flag), - record_tail_str, self._get_warnings()) - -# ============================================================================= - -CLASSES = { - 'a': TransactionRecord, - 'c': TransactionRecord, - 'd': DequeueRecord, - 'e': EnqueueRecord, -} - -if __name__ == '__main__': - print 'This is a library, and cannot be executed.' diff --git a/qpid/cpp/management/python/lib/qlslibs/utils.py b/qpid/cpp/management/python/lib/qlslibs/utils.py deleted file mode 100644 index dfa760a839..0000000000 --- a/qpid/cpp/management/python/lib/qlslibs/utils.py +++ /dev/null @@ -1,216 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -""" -Module: qlslibs.utils - -Contains helper functions for qpid_qls_analyze. -""" - -import os -import qlslibs.jrnl -import stat -import string -import struct -import subprocess -import zlib - -DEFAULT_DBLK_SIZE = 128 -DEFAULT_SBLK_SIZE = 4096 # 32 dblks -DEFAULT_SBLK_SIZE_KB = DEFAULT_SBLK_SIZE / 1024 -DEFAULT_RECORD_VERSION = 2 -DEFAULT_HEADER_SIZE_SBLKS = 1 - -def adler32(data): - """return the adler32 checksum of data""" - return zlib.adler32(data) & 0xffffffff - -def create_record(magic, uflags, journal_file, record_id, dequeue_record_id, xid, data): - """Helper function to construct a record with xid, data (where applicable) and consistent tail with checksum""" - record_class = qlslibs.jrnl.CLASSES.get(magic[-1]) - record = record_class(0, magic, DEFAULT_RECORD_VERSION, uflags, journal_file.file_header.serial, record_id) - xid_length = len(xid) if xid is not None else 0 - if isinstance(record, qlslibs.jrnl.EnqueueRecord): - data_length = len(data) if data is not None else 0 - record.init(None, xid_length, data_length) - elif isinstance(record, qlslibs.jrnl.DequeueRecord): - record.init(None, dequeue_record_id, xid_length) - elif isinstance(record, qlslibs.jrnl.TransactionRecord): - record.init(None, xid_length) - else: - raise qlslibs.err.InvalidClassError(record.__class__.__name__) - if xid is not None: - record.xid = xid - record.xid_complete = True - if data is not None: - record.data = data - record.data_complete = True - record.record_tail = _mk_record_tail(record) - return record - -def efp_directory_size(directory_name): - """"Decode the directory name in the format NNNk to a numeric size, where NNN is a number string""" - try: - if directory_name[-1] == 'k': - return int(directory_name[:-1]) - except ValueError: - pass - return 0 - -def format_data(data, data_size=None, show_data_flag=True, txtest_flag=False): - """Format binary data for printing""" - return _format_binary(data, data_size, show_data_flag, 'data', qlslibs.err.DataSizeError, False, txtest_flag) - -def format_xid(xid, xid_size=None, show_xid_flag=True): - """Format binary XID for printing""" - return _format_binary(xid, xid_size, show_xid_flag, 'xid', qlslibs.err.XidSizeError, True, False) - -def get_avail_disk_space(path): - df_proc = subprocess.Popen(["df", path], stdout=subprocess.PIPE) - output = df_proc.communicate()[0] - return int(output.split('\n')[1].split()[3]) - -def has_write_permission(path): - stat_info = os.stat(path) - return bool(stat_info.st_mode & stat.S_IRGRP) - -def inv_str(in_string): - """Perform a binary 1's compliment (invert all bits) on a binary string""" - istr = '' - for index in range(0, len(in_string)): - istr += chr(~ord(in_string[index]) & 0xff) - return istr - -def load(file_handle, klass): - """Load a record of class klass from a file""" - args = load_args(file_handle, klass) - subclass = klass.discriminate(args) - result = subclass(*args) # create instance of record - if subclass != klass: - result.init(*load_args(file_handle, subclass)) - return result - -def load_args(file_handle, klass): - """Load the arguments from class klass""" - size = struct.calcsize(klass.FORMAT) - foffs = file_handle.tell(), - fbin = file_handle.read(size) - if len(fbin) != size: - raise qlslibs.err.UnexpectedEndOfFileError(len(fbin), size, foffs, file_handle.name) - return foffs + struct.unpack(klass.FORMAT, fbin) - -def load_data(file_handle, element, element_size): - """Read element_size bytes of binary data from file_handle into element""" - if element_size == 0: - return element, True - if element is None: - element = file_handle.read(element_size) - else: - read_size = element_size - len(element) - element += file_handle.read(read_size) - return element, len(element) == element_size - -def skip(file_handle, boundary): - """Read and discard disk bytes until the next multiple of boundary""" - if not file_handle.closed: - file_handle.read(_rem_bytes_in_block(file_handle, boundary)) - -#--- protected functions --- - -def _format_binary(bin_str, bin_size, show_bin_flag, prefix, err_class, hex_num_flag, txtest_flag): - """Format binary XID for printing""" - if bin_str is None and bin_size is not None: - if bin_size > 0: - raise err_class(bin_size, len(bin_str), bin_str) - return '' - if bin_size is None: - bin_size = len(bin_str) - elif bin_size != len(bin_str): - raise err_class(bin_size, len(bin_str), bin_str) - out_str = '%s(%d)' % (prefix, bin_size) - if txtest_flag: - out_str += '=\'%s\'' % _txtest_msg_str(bin_str) - elif show_bin_flag: - if _is_printable(bin_str): - binstr = '"%s"' % _split_str(bin_str) - elif hex_num_flag: - binstr = '0x%s' % _str_to_hex_num(bin_str) - else: - binstr = _hex_split_str(bin_str, 50, 10, 10) - out_str += '=\'%s\'' % binstr - return out_str - -def _hex_str(in_str, begin, end): - """Return a binary string as a hex string""" - hstr = '' - for index in range(begin, end): - if _is_printable(in_str[index]): - hstr += in_str[index] - else: - hstr += '\\%02x' % ord(in_str[index]) - return hstr - -def _hex_split_str(in_str, split_size, head_size, tail_size): - """Split a hex string into two parts separated by an ellipsis""" - if len(in_str) <= split_size: - return _hex_str(in_str, 0, len(in_str)) - return _hex_str(in_str, 0, head_size) + ' ... ' + _hex_str(in_str, len(in_str)-tail_size, len(in_str)) - -def _txtest_msg_str(bin_str): - """Extract the message number used in qpid-txtest""" - msg_index = bin_str.find('msg') - if msg_index >= 0: - end_index = bin_str.find('\x00', msg_index) - assert end_index >= 0 - return bin_str[msg_index:end_index] - return None - -def _is_printable(in_str): - """Return True if in_str in printable; False otherwise.""" - for this_char in in_str: - if this_char not in string.letters and this_char not in string.digits and this_char not in string.punctuation: - return False - return True - -def _mk_record_tail(record): - record_tail = qlslibs.jrnl.RecordTail(None) - record_tail.xmagic = inv_str(record.magic) - record_tail.checksum = adler32(record.checksum_encode()) - record_tail.serial = record.serial - record_tail.record_id = record.record_id - return record_tail - -def _rem_bytes_in_block(file_handle, block_size): - """Return the remaining bytes in a block""" - foffs = file_handle.tell() - return (_size_in_blocks(foffs, block_size) * block_size) - foffs - -def _size_in_blocks(size, block_size): - """Return the size in terms of data blocks""" - return int((size + block_size - 1) / block_size) - -def _split_str(in_str, split_size = 50): - """Split a string into two parts separated by an ellipsis if it is longer than split_size""" - if len(in_str) < split_size: - return in_str - return in_str[:25] + ' ... ' + in_str[-25:] - -def _str_to_hex_num(in_str): - """Turn a string into a hex number representation, little endian assumed (ie LSB is first, MSB is last)""" - return ''.join(x.encode('hex') for x in reversed(in_str)) diff --git a/qpid/cpp/management/python/lib/qmf/__init__.py b/qpid/cpp/management/python/lib/qmf/__init__.py deleted file mode 100644 index 31d5a2ef58..0000000000 --- a/qpid/cpp/management/python/lib/qmf/__init__.py +++ /dev/null @@ -1,18 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# diff --git a/qpid/cpp/management/python/lib/qmf/console.py b/qpid/cpp/management/python/lib/qmf/console.py deleted file mode 100644 index 405c5dcb62..0000000000 --- a/qpid/cpp/management/python/lib/qmf/console.py +++ /dev/null @@ -1,4054 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -""" Console API for Qpid Management Framework """ - -import os -import platform -import qpid -import struct -import socket -import re -import sys -from qpid.datatypes import UUID -from qpid.datatypes import timestamp -from qpid.datatypes import datetime -from qpid.exceptions import Closed -from qpid.session import SessionDetached -from qpid.connection import Connection, ConnectionFailed, Timeout -from qpid.datatypes import Message, RangedSet, UUID -from qpid.util import connect, ssl, URL -from qpid.codec010 import StringCodec as Codec -from threading import Lock, Condition, Thread, Semaphore -from Queue import Queue, Empty -from time import time, strftime, gmtime, sleep -from cStringIO import StringIO - -#import qpid.log -#qpid.log.enable(name="qpid.io.cmd", level=qpid.log.DEBUG) - -#=================================================================================================== -# CONSOLE -#=================================================================================================== -class Console: - """ To access the asynchronous operations, a class must be derived from - Console with overrides of any combination of the available methods. """ - - def brokerConnected(self, broker): - """ Invoked when a connection is established to a broker """ - pass - - def brokerConnectionFailed(self, broker): - """ Invoked when a connection to a broker fails """ - pass - - def brokerDisconnected(self, broker): - """ Invoked when the connection to a broker is lost """ - pass - - def newPackage(self, name): - """ Invoked when a QMF package is discovered. """ - pass - - def newClass(self, kind, classKey): - """ Invoked when a new class is discovered. Session.getSchema can be - used to obtain details about the class.""" - pass - - def newAgent(self, agent): - """ Invoked when a QMF agent is discovered. """ - pass - - def delAgent(self, agent): - """ Invoked when a QMF agent disconects. """ - pass - - def objectProps(self, broker, record): - """ Invoked when an object is updated. """ - pass - - def objectStats(self, broker, record): - """ Invoked when an object is updated. """ - pass - - def event(self, broker, event): - """ Invoked when an event is raised. """ - pass - - def heartbeat(self, agent, timestamp): - """ Invoked when an agent heartbeat is received. """ - pass - - def brokerInfo(self, broker): - """ Invoked when the connection sequence reaches the point where broker information is available. """ - pass - - def methodResponse(self, broker, seq, response): - """ Invoked when a method response from an asynchronous method call is received. """ - pass - - -#=================================================================================================== -# BrokerURL -#=================================================================================================== -class BrokerURL(URL): - def __init__(self, *args, **kwargs): - URL.__init__(self, *args, **kwargs) - if self.port is None: - if self.scheme == URL.AMQPS: - self.port = 5671 - else: - self.port = 5672 - self.authName = None - self.authPass = None - if self.user: - self.authName = str(self.user) - if self.password: - self.authPass = str(self.password) - - def name(self): - return str(self) - - def match(self, host, port): - return socket.getaddrinfo(self.host, self.port)[0][4] == socket.getaddrinfo(host, port)[0][4] - -#=================================================================================================== -# Object -#=================================================================================================== -class Object(object): - """ - This class defines a 'proxy' object representing a real managed object on an agent. - Actions taken on this proxy are remotely affected on the real managed object. - """ - def __init__(self, agent, schema, codec=None, prop=None, stat=None, v2Map=None, agentName=None, kwargs={}): - self._agent = agent - self._session = None - self._broker = None - if agent: - self._session = agent.session - self._broker = agent.broker - self._schema = schema - self._properties = [] - self._statistics = [] - self._currentTime = None - self._createTime = None - self._deleteTime = 0 - self._objectId = None - if v2Map: - self.v2Init(v2Map, agentName) - return - - if self._agent: - self._currentTime = codec.read_uint64() - self._createTime = codec.read_uint64() - self._deleteTime = codec.read_uint64() - self._objectId = ObjectId(codec) - if codec: - if prop: - notPresent = self._parsePresenceMasks(codec, schema) - for property in schema.getProperties(): - if property.name in notPresent: - self._properties.append((property, None)) - else: - self._properties.append((property, self._session._decodeValue(codec, property.type, self._broker))) - if stat: - for statistic in schema.getStatistics(): - self._statistics.append((statistic, self._session._decodeValue(codec, statistic.type, self._broker))) - else: - for property in schema.getProperties(): - if property.optional: - self._properties.append((property, None)) - else: - self._properties.append((property, self._session._defaultValue(property, self._broker, kwargs))) - for statistic in schema.getStatistics(): - self._statistics.append((statistic, self._session._defaultValue(statistic, self._broker, kwargs))) - - def v2Init(self, omap, agentName): - if omap.__class__ != dict: - raise Exception("QMFv2 object data must be a map/dict") - if '_values' not in omap: - raise Exception("QMFv2 object must have '_values' element") - - values = omap['_values'] - for prop in self._schema.getProperties(): - if prop.name in values: - if prop.type == 10: # Reference - self._properties.append((prop, ObjectId(values[prop.name], agentName=agentName))) - else: - self._properties.append((prop, values[prop.name])) - for stat in self._schema.getStatistics(): - if stat.name in values: - self._statistics.append((stat, values[stat.name])) - if '_subtypes' in omap: - self._subtypes = omap['_subtypes'] - if '_object_id' in omap: - self._objectId = ObjectId(omap['_object_id'], agentName=agentName) - else: - self._objectId = None - - self._currentTime = omap.get("_update_ts", 0) - self._createTime = omap.get("_create_ts", 0) - self._deleteTime = omap.get("_delete_ts", 0) - - def getAgent(self): - """ Return the agent from which this object was sent """ - return self._agent - - def getBroker(self): - """ Return the broker from which this object was sent """ - return self._broker - - def getV2RoutingKey(self): - """ Get the QMFv2 routing key to address this object """ - return self._agent.getV2RoutingKey() - - def getObjectId(self): - """ Return the object identifier for this object """ - return self._objectId - - def getClassKey(self): - """ Return the class-key that references the schema describing this object. """ - return self._schema.getKey() - - def getSchema(self): - """ Return the schema that describes this object. """ - return self._schema - - def getMethods(self): - """ Return a list of methods available for this object. """ - return self._schema.getMethods() - - def getTimestamps(self): - """ Return the current, creation, and deletion times for this object. """ - return self._currentTime, self._createTime, self._deleteTime - - def isDeleted(self): - """ Return True iff this object has been deleted. """ - return self._deleteTime != 0 - - def isManaged(self): - """ Return True iff this object is a proxy for a managed object on an agent. """ - return self._objectId and self._agent - - def getIndex(self): - """ Return a string describing this object's primary key. """ - if self._objectId.isV2: - return self._objectId.getObject() - result = u"" - for prop, value in self._properties: - if prop.index: - if result != u"": - result += u":" - try: - valstr = unicode(self._session._displayValue(value, prop.type)) - except Exception, e: - valstr = u"<undecodable>" - result += valstr - return result - - def getProperties(self): - """ Return a list of object properties """ - return self._properties - - def getStatistics(self): - """ Return a list of object statistics """ - return self._statistics - - def mergeUpdate(self, newer): - """ Replace properties and/or statistics with a newly received update """ - if not self.isManaged(): - raise Exception("Object is not managed") - if self._objectId != newer._objectId: - raise Exception("Objects with different object-ids") - if len(newer.getProperties()) > 0: - self._properties = newer.getProperties() - if len(newer.getStatistics()) > 0: - self._statistics = newer.getStatistics() - self._currentTime = newer._currentTime - self._deleteTime = newer._deleteTime - - def update(self): - """ Contact the agent and retrieve the lastest property and statistic values for this object. """ - if not self.isManaged(): - raise Exception("Object is not managed") - obj = self._agent.getObjects(_objectId=self._objectId) - if obj: - self.mergeUpdate(obj[0]) - else: - raise Exception("Underlying object no longer exists") - - def __repr__(self): - if self.isManaged(): - id = self.getObjectId().__repr__() - else: - id = "unmanaged" - key = self.getClassKey() - return key.getPackageName() + ":" + key.getClassName() +\ - "[" + id + "] " + self.getIndex().encode("utf8") - - def __getattr__(self, name): - for method in self._schema.getMethods(): - if name == method.name: - return lambda *args, **kwargs : self._invoke(name, args, kwargs) - for prop, value in self._properties: - if name == prop.name: - return value - if name == "_" + prop.name + "_" and prop.type == 10: # Dereference references - deref = self._agent.getObjects(_objectId=value) - if len(deref) != 1: - return None - else: - return deref[0] - for stat, value in self._statistics: - if name == stat.name: - return value - - # - # Check to see if the name is in the schema. If so, return None (i.e. this is a not-present attribute) - # - for prop in self._schema.getProperties(): - if name == prop.name: - return None - for stat in self._schema.getStatistics(): - if name == stat.name: - return None - raise Exception("Type Object has no attribute '%s'" % name) - - def __setattr__(self, name, value): - if name[0] == '_': - super.__setattr__(self, name, value) - return - - for prop, unusedValue in self._properties: - if name == prop.name: - newprop = (prop, value) - newlist = [] - for old, val in self._properties: - if name == old.name: - newlist.append(newprop) - else: - newlist.append((old, val)) - self._properties = newlist - return - super.__setattr__(self, name, value) - - def _parseDefault(self, typ, val): - try: - if typ in (2, 3, 4): # 16, 32, 64 bit numbers - val = int(val, 0) - elif typ == 11: # bool - val = val.lower() in ("t", "true", "1", "yes", "y") - elif typ == 15: # map - val = eval(val) - except: - pass - return val - - def _handleDefaultArguments(self, method, args, kwargs): - count = len([x for x in method.arguments if x.dir.find("I") != -1]) - for kwarg in kwargs.keys(): - if not [x for x in method.arguments if x.dir.find("I") != -1 and \ - x.name == kwarg]: - del kwargs[kwarg] - - # If there were not enough args supplied, add any defaulted arguments - # from the schema (starting at the end) until we either get enough - # arguments or run out of defaults - while count > len(args) + len(kwargs): - for arg in reversed(method.arguments): - if arg.dir.find("I") != -1 and getattr(arg, "default") is not None and \ - arg.name not in kwargs: - # add missing defaulted value to the kwargs dict - kwargs[arg.name] = self._parseDefault(arg.type, arg.default) - break - else: - # no suitable defaulted args found, end the while loop - break - - return count - - def _sendMethodRequest(self, name, args, kwargs, synchronous=False, timeWait=None): - for method in self._schema.getMethods(): - if name == method.name: - aIdx = 0 - sendCodec = Codec() - seq = self._session.seqMgr._reserve((method, synchronous)) - - count = self._handleDefaultArguments(method, args, kwargs) - if count != len(args) + len(kwargs): - raise Exception("Incorrect number of arguments: expected %d, got %d" % (count, len(args) + len(kwargs))) - - if self._agent.isV2: - # - # Compose and send a QMFv2 method request - # - call = {} - call['_object_id'] = self._objectId.asMap() - call['_method_name'] = name - argMap = {} - for arg in method.arguments: - if arg.dir.find("I") != -1: - # If any kwargs match this schema arg, insert them in the proper place - if arg.name in kwargs: - argMap[arg.name] = kwargs[arg.name] - elif aIdx < len(args): - argMap[arg.name] = args[aIdx] - aIdx += 1 - call['_arguments'] = argMap - - dp = self._broker.amqpSession.delivery_properties() - dp.routing_key = self.getV2RoutingKey() - mp = self._broker.amqpSession.message_properties() - mp.content_type = "amqp/map" - if self._broker.saslUser: - mp.user_id = self._broker.saslUser - mp.correlation_id = str(seq) - mp.app_id = "qmf2" - mp.reply_to = self._broker.amqpSession.reply_to("qmf.default.direct", self._broker.v2_direct_queue) - mp.application_headers = {'qmf.opcode':'_method_request'} - sendCodec.write_map(call) - smsg = Message(dp, mp, sendCodec.encoded) - exchange = "qmf.default.direct" - - else: - # - # Associate this sequence with the agent hosting the object so we can correctly - # route the method-response - # - agent = self._broker.getAgent(self._broker.getBrokerBank(), self._objectId.getAgentBank()) - self._broker._setSequence(seq, agent) - - # - # Compose and send a QMFv1 method request - # - self._broker._setHeader(sendCodec, 'M', seq) - self._objectId.encode(sendCodec) - self._schema.getKey().encode(sendCodec) - sendCodec.write_str8(name) - - for arg in method.arguments: - if arg.dir.find("I") != -1: - self._session._encodeValue(sendCodec, args[aIdx], arg.type) - aIdx += 1 - smsg = self._broker._message(sendCodec.encoded, "agent.%d.%s" % - (self._objectId.getBrokerBank(), self._objectId.getAgentBank())) - exchange = "qpid.management" - - if synchronous: - try: - self._broker.cv.acquire() - self._broker.syncInFlight = True - finally: - self._broker.cv.release() - self._broker._send(smsg, exchange) - return seq - return None - - def _invoke(self, name, args, kwargs): - if not self.isManaged(): - raise Exception("Object is not managed") - if "_timeout" in kwargs: - timeout = kwargs["_timeout"] - else: - timeout = self._broker.SYNC_TIME - - if "_async" in kwargs and kwargs["_async"]: - sync = False - if "_timeout" not in kwargs: - timeout = None - else: - sync = True - - # Remove special "meta" kwargs before handing to _sendMethodRequest() to process - if "_timeout" in kwargs: del kwargs["_timeout"] - if "_async" in kwargs: del kwargs["_async"] - - seq = self._sendMethodRequest(name, args, kwargs, sync, timeout) - if seq: - if not sync: - return seq - self._broker.cv.acquire() - try: - starttime = time() - while self._broker.syncInFlight and self._broker.error == None: - self._broker.cv.wait(timeout) - if time() - starttime > timeout: - raise RuntimeError("Timed out waiting for method to respond") - finally: - self._session.seqMgr._release(seq) - self._broker.cv.release() - if self._broker.error != None: - errorText = self._broker.error - self._broker.error = None - raise Exception(errorText) - return self._broker.syncResult - raise Exception("Invalid Method (software defect) [%s]" % name) - - def _encodeUnmanaged(self, codec): - codec.write_uint8(20) - codec.write_str8(self._schema.getKey().getPackageName()) - codec.write_str8(self._schema.getKey().getClassName()) - codec.write_bin128(self._schema.getKey().getHash()) - - # emit presence masks for optional properties - mask = 0 - bit = 0 - for prop, value in self._properties: - if prop.optional: - if bit == 0: - bit = 1 - if value: - mask |= bit - bit = bit << 1 - if bit == 256: - bit = 0 - codec.write_uint8(mask) - mask = 0 - if bit != 0: - codec.write_uint8(mask) - - # encode properties - for prop, value in self._properties: - if value != None: - self._session._encodeValue(codec, value, prop.type) - - # encode statistics - for stat, value in self._statistics: - self._session._encodeValue(codec, value, stat.type) - - def _parsePresenceMasks(self, codec, schema): - excludeList = [] - bit = 0 - for property in schema.getProperties(): - if property.optional: - if bit == 0: - mask = codec.read_uint8() - bit = 1 - if (mask & bit) == 0: - excludeList.append(property.name) - bit *= 2 - if bit == 256: - bit = 0 - return excludeList - - -#=================================================================================================== -# Session -#=================================================================================================== -class Session: - """ - An instance of the Session class represents a console session running - against one or more QMF brokers. A single instance of Session is needed - to interact with the management framework as a console. - """ - _CONTEXT_SYNC = 1 - _CONTEXT_STARTUP = 2 - _CONTEXT_MULTIGET = 3 - - DEFAULT_GET_WAIT_TIME = 60 - - ENCODINGS = { - str: 7, - timestamp: 8, - datetime: 8, - int: 9, - long: 9, - float: 13, - UUID: 14, - Object: 20, - list: 21 - } - - - def __init__(self, console=None, rcvObjects=True, rcvEvents=True, rcvHeartbeats=True, - manageConnections=False, userBindings=False): - """ - Initialize a session. If the console argument is provided, the - more advanced asynchronous features are available. If console is - defaulted, the session will operate in a simpler, synchronous manner. - - The rcvObjects, rcvEvents, and rcvHeartbeats arguments are meaningful only if 'console' - is provided. They control whether object updates, events, and agent-heartbeats are - subscribed to. If the console is not interested in receiving one or more of the above, - setting the argument to False will reduce tha bandwidth used by the API. - - If manageConnections is set to True, the Session object will manage connections to - the brokers. This means that if a broker is unreachable, it will retry until a connection - can be established. If a connection is lost, the Session will attempt to reconnect. - - If manageConnections is set to False, the user is responsible for handing failures. In - this case, an unreachable broker will cause addBroker to raise an exception. - - If userBindings is set to False (the default) and rcvObjects is True, the console will - receive data for all object classes. If userBindings is set to True, the user must select - which classes the console shall receive by invoking the bindPackage or bindClass methods. - This allows the console to be configured to receive only information that is relavant to - a particular application. If rcvObjects id False, userBindings has no meaning. - """ - self.console = console - self.brokers = [] - self.schemaCache = SchemaCache() - self.seqMgr = SequenceManager() - self.cv = Condition() - self.syncSequenceList = [] - self.getResult = [] - self.getSelect = [] - self.error = None - self.rcvObjects = rcvObjects - self.rcvEvents = rcvEvents - self.rcvHeartbeats = rcvHeartbeats - self.userBindings = userBindings - if self.console == None: - self.rcvObjects = False - self.rcvEvents = False - self.rcvHeartbeats = False - self.v1BindingKeyList, self.v2BindingKeyList = self._bindingKeys() - self.manageConnections = manageConnections - # callback filters: - self.agent_filter = [] # (vendor, product, instance) || v1-agent-label-str - self.class_filter = [] # (pkg, class) - self.event_filter = [] # (pkg, event) - self.agent_heartbeat_min = 10 # minimum agent heartbeat timeout interval - self.agent_heartbeat_miss = 3 # # of heartbeats to miss before deleting agent - - if self.userBindings and not self.console: - raise Exception("userBindings can't be set unless a console is provided.") - - def close(self): - """ Releases all resources held by the session. Must be called by the - application when it is done with the Session object. - """ - self.cv.acquire() - try: - while len(self.brokers): - b = self.brokers.pop() - try: - b._shutdown() - except: - pass - finally: - self.cv.release() - - def _getBrokerForAgentAddr(self, agent_addr): - try: - self.cv.acquire() - key = (1, agent_addr) - for b in self.brokers: - if key in b.agents: - return b - finally: - self.cv.release() - return None - - - def _getAgentForAgentAddr(self, agent_addr): - try: - self.cv.acquire() - key = agent_addr - for b in self.brokers: - if key in b.agents: - return b.agents[key] - finally: - self.cv.release() - return None - - - def __repr__(self): - return "QMF Console Session Manager (brokers: %d)" % len(self.brokers) - - - def addBroker(self, target="localhost", timeout=None, mechanisms=None, sessTimeout=None, **connectArgs): - """ Connect to a Qpid broker. Returns an object of type Broker. - Will raise an exception if the session is not managing the connection and - the connection setup to the broker fails. - """ - if isinstance(target, BrokerURL): - url = target - else: - url = BrokerURL(target) - broker = Broker(self, url.host, url.port, mechanisms, url.authName, url.authPass, - ssl = url.scheme == URL.AMQPS, connTimeout=timeout, sessTimeout=sessTimeout, **connectArgs) - - self.brokers.append(broker) - return broker - - - def delBroker(self, broker): - """ Disconnect from a broker, and deallocate the broker proxy object. The - 'broker' argument is the object returned from the addBroker call. Errors - are ignored. - """ - broker._shutdown() - self.brokers.remove(broker) - del broker - - - def getPackages(self): - """ Get the list of known QMF packages """ - for broker in self.brokers: - broker._waitForStable() - return self.schemaCache.getPackages() - - - def getClasses(self, packageName): - """ Get the list of known classes within a QMF package """ - for broker in self.brokers: - broker._waitForStable() - return self.schemaCache.getClasses(packageName) - - - def getSchema(self, classKey): - """ Get the schema for a QMF class """ - for broker in self.brokers: - broker._waitForStable() - return self.schemaCache.getSchema(classKey) - - - def bindPackage(self, packageName): - """ Filter object and event callbacks to only those elements of the - specified package. Also filters newPackage and newClass callbacks to the - given package. Only valid if userBindings is True. - """ - if not self.userBindings: - raise Exception("userBindings option must be set for this Session.") - if not self.rcvObjects and not self.rcvEvents: - raise Exception("Session needs to be configured to receive events or objects.") - v1keys = ["console.obj.*.*.%s.#" % packageName, "console.event.*.*.%s.#" % packageName] - v2keys = ["agent.ind.data.%s.#" % packageName.replace(".", "_"), - "agent.ind.event.%s.#" % packageName.replace(".", "_"),] - if (packageName, None) not in self.class_filter: - self.class_filter.append((packageName, None)) - if (packageName, None) not in self.event_filter: - self.event_filter.append((packageName, None)) - self.v1BindingKeyList.extend(v1keys) - self.v2BindingKeyList.extend(v2keys) - for broker in self.brokers: - if broker.isConnected(): - for v1key in v1keys: - broker.amqpSession.exchange_bind(exchange="qpid.management", queue=broker.topicName, binding_key=v1key) - if broker.brokerSupportsV2: - for v2key in v2keys: - # data indications should arrive on the unsolicited indication queue - broker.amqpSession.exchange_bind(exchange="qmf.default.topic", queue=broker.v2_topic_queue_ui, binding_key=v2key) - - - def bindClass(self, pname, cname=None): - """ Filter object callbacks to only those objects of the specified package - and optional class. Will also filter newPackage/newClass callbacks to the - specified package and class. Only valid if userBindings is True and - rcvObjects is True. - """ - if not self.userBindings: - raise Exception("userBindings option must be set for this Session.") - if not self.rcvObjects: - raise Exception("Session needs to be configured with rcvObjects=True.") - if cname is not None: - v1key = "console.obj.*.*.%s.%s.#" % (pname, cname) - v2key = "agent.ind.data.%s.%s.#" % (pname.replace(".", "_"), cname.replace(".", "_")) - else: - v1key = "console.obj.*.*.%s.#" % pname - v2key = "agent.ind.data.%s.#" % pname.replace(".", "_") - self.v1BindingKeyList.append(v1key) - self.v2BindingKeyList.append(v2key) - if (pname, cname) not in self.class_filter: - self.class_filter.append((pname, cname)) - for broker in self.brokers: - if broker.isConnected(): - broker.amqpSession.exchange_bind(exchange="qpid.management", queue=broker.topicName, binding_key=v1key) - if broker.brokerSupportsV2: - # data indications should arrive on the unsolicited indication queue - broker.amqpSession.exchange_bind(exchange="qmf.default.topic", queue=broker.v2_topic_queue_ui, binding_key=v2key) - - - def bindClassKey(self, classKey): - """ Filter object callbacks to only those objects of the specified - class. Will also filter newPackage/newClass callbacks to the specified - package and class. Only valid if userBindings is True and rcvObjects is - True. - """ - pname = classKey.getPackageName() - cname = classKey.getClassName() - self.bindClass(pname, cname) - - def bindEvent(self, pname, ename=None): - """ Filter event callbacks only from a particular class by package and - event name, or all events in a package if ename=None. Will also filter - newPackage/newClass callbacks to the specified package and class. Only - valid if userBindings is True and rcvEvents is True. - """ - if not self.userBindings: - raise Exception("userBindings option must be set for this Session.") - if not self.rcvEvents: - raise Exception("Session needs to be configured with rcvEvents=True.") - if ename is not None: - v1key = "console.event.*.*.%s.%s.#" % (pname, ename) - v2key = "agent.ind.event.%s.%s.#" % (pname.replace(".", "_"), ename.replace(".", "_")) - else: - v1key = "console.event.*.*.%s.#" % pname - v2key = "agent.ind.event.%s.#" % pname.replace(".", "_") - self.v1BindingKeyList.append(v1key) - self.v2BindingKeyList.append(v2key) - if (pname, ename) not in self.event_filter: - self.event_filter.append((pname, ename)) - for broker in self.brokers: - if broker.isConnected(): - broker.amqpSession.exchange_bind(exchange="qpid.management", queue=broker.topicName, binding_key=v1key) - if broker.brokerSupportsV2: - # event indications should arrive on the unsolicited indication queue - broker.amqpSession.exchange_bind(exchange="qmf.default.topic", queue=broker.v2_topic_queue_ui, binding_key=v2key) - - def bindEventKey(self, eventKey): - """ Filter event callbacks only from a particular class key. Will also - filter newPackage/newClass callbacks to the specified package and - class. Only valid if userBindings is True and rcvEvents is True. - """ - pname = eventKey.getPackageName() - ename = eventKey.getClassName() - self.bindEvent(pname, ename) - - def bindAgent(self, vendor=None, product=None, instance=None, label=None): - """ Receive heartbeats, newAgent and delAgent callbacks only for those - agent(s) that match the passed identification criteria: - V2 agents: vendor, optionally product and instance strings - V1 agents: the label string. - Only valid if userBindings is True. - """ - if not self.userBindings: - raise Exception("Session not configured for binding specific agents.") - if vendor is None and label is None: - raise Exception("Must specify at least a vendor (V2 agents)" - " or label (V1 agents).") - - if vendor: # V2 agent identification - if product is not None: - v2key = "agent.ind.heartbeat.%s.%s.#" % (vendor.replace(".", "_"), product.replace(".", "_")) - else: - v2key = "agent.ind.heartbeat.%s.#" % vendor.replace(".", "_") - self.v2BindingKeyList.append(v2key) - - # allow wildcards - only add filter if a non-wildcarded component is given - if vendor == "*": - vendor = None - if product == "*": - product = None - if instance == "*": - instance = None - if vendor or product or instance: - if (vendor, product, instance) not in self.agent_filter: - self.agent_filter.append((vendor, product, instance)) - - for broker in self.brokers: - if broker.isConnected(): - if broker.brokerSupportsV2: - # heartbeats should arrive on the heartbeat queue - broker.amqpSession.exchange_bind(exchange="qmf.default.topic", - queue=broker.v2_topic_queue_hb, - binding_key=v2key) - elif label != "*": # non-wildcard V1 agent label - # V1 format heartbeats do not have any agent identifier in the routing - # key, so we cannot filter them by bindings. - if label not in self.agent_filter: - self.agent_filter.append(label) - - - def getAgents(self, broker=None): - """ Get a list of currently known agents """ - brokerList = [] - if broker == None: - for b in self.brokers: - brokerList.append(b) - else: - brokerList.append(broker) - - for b in brokerList: - b._waitForStable() - agentList = [] - for b in brokerList: - for a in b.getAgents(): - agentList.append(a) - return agentList - - - def makeObject(self, classKey, **kwargs): - """ Create a new, unmanaged object of the schema indicated by classKey """ - schema = self.getSchema(classKey) - if schema == None: - raise Exception("Schema not found for classKey") - return Object(None, schema, None, True, True, kwargs) - - - def getObjects(self, **kwargs): - """ Get a list of objects from QMF agents. - All arguments are passed by name(keyword). - - The class for queried objects may be specified in one of the following ways: - - _schema = <schema> - supply a schema object returned from getSchema. - _key = <key> - supply a classKey from the list returned by getClasses. - _class = <name> - supply a class name as a string. If the class name exists - in multiple packages, a _package argument may also be supplied. - _objectId = <id> - get the object referenced by the object-id - - If objects should be obtained from only one agent, use the following argument. - Otherwise, the query will go to all agents. - - _agent = <agent> - supply an agent from the list returned by getAgents. - - If the get query is to be restricted to one broker (as opposed to all connected brokers), - add the following argument: - - _broker = <broker> - supply a broker as returned by addBroker. - - The default timeout for this synchronous operation is 60 seconds. To change the timeout, - use the following argument: - - _timeout = <time in seconds> - - If additional arguments are supplied, they are used as property selectors. For example, - if the argument name="test" is supplied, only objects whose "name" property is "test" - will be returned in the result. - """ - if "_broker" in kwargs: - brokerList = [] - brokerList.append(kwargs["_broker"]) - else: - brokerList = self.brokers - for broker in brokerList: - broker._waitForStable() - if broker.isConnected(): - if "_package" not in kwargs or "_class" not in kwargs or \ - kwargs["_package"] != "org.apache.qpid.broker" or \ - kwargs["_class"] != "agent": - self.getObjects(_package = "org.apache.qpid.broker", _class = "agent", - _agent = broker.getAgent(1,0)) - - agentList = [] - if "_agent" in kwargs: - agent = kwargs["_agent"] - if agent.broker not in brokerList: - raise Exception("Supplied agent is not accessible through the supplied broker") - if agent.broker.isConnected(): - agentList.append(agent) - else: - if "_objectId" in kwargs: - oid = kwargs["_objectId"] - for broker in brokerList: - for agent in broker.getAgents(): - if agent.getBrokerBank() == oid.getBrokerBank() and agent.getAgentBank() == oid.getAgentBank(): - agentList.append(agent) - else: - for broker in brokerList: - for agent in broker.getAgents(): - if agent.broker.isConnected(): - agentList.append(agent) - - if len(agentList) == 0: - return [] - - # - # We now have a list of agents to query, start the queries and gather the results. - # - request = SessionGetRequest(len(agentList)) - for agent in agentList: - agent.getObjects(request, **kwargs) - timeout = 60 - if '_timeout' in kwargs: - timeout = kwargs['_timeout'] - request.wait(timeout) - return request.result - - - def addEventFilter(self, **kwargs): - """Filter unsolicited events based on package and event name. - QMF v2 also can filter on vendor, product, and severity values. - - By default, a console receives unsolicted events by binding to: - - qpid.management/console.event.# (v1) - - qmf.default.topic/agent.ind.event.# (v2) - - A V1 event filter binding uses the pattern: - - qpid.management/console.event.*.*[.<package>[.<event>]].# - - A V2 event filter binding uses the pattern: - - qmf.default.topic/agent.ind.event.<Vendor|*>.<Product|*>.<severity|*>.<package|*>.<event|*>.# - """ - package = kwargs.get("package", "*") - event = kwargs.get("event", "*") - vendor = kwargs.get("vendor", "*") - product = kwargs.get("product", "*") - severity = kwargs.get("severity", "*") - - if package == "*" and event != "*": - raise Exception("'package' parameter required if 'event' parameter" - " supplied") - - # V1 key - can only filter on package (and event) - if package == "*": - key = "console.event.*.*." + str(package) - if event != "*": - key += "." + str(event) - key += ".#" - - if key not in self.v1BindingKeyList: - self.v1BindingKeyList.append(key) - try: - # remove default wildcard binding - self.v1BindingKeyList.remove("console.event.#") - except: - pass - - # V2 key - escape any "." in the filter strings - - key = "agent.ind.event." + str(package).replace(".", "_") \ - + "." + str(event).replace(".", "_") \ - + "." + str(severity).replace(".", "_") \ - + "." + str(vendor).replace(".", "_") \ - + "." + str(product).replace(".", "_") \ - + ".#" - - if key not in self.v2BindingKeyList: - self.v2BindingKeyList.append(key) - try: - # remove default wildcard binding - self.v2BindingKeyList.remove("agent.ind.event.#") - except: - pass - - if package != "*": - if event != "*": - f = (package, event) - else: - f = (package, None) - if f not in self.event_filter: - self.event_filter.append(f) - - - def addAgentFilter(self, vendor, product=None): - """ Deprecate - use bindAgent() instead - """ - self.addHeartbeatFilter(vendor=vendor, product=product) - - def addHeartbeatFilter(self, **kwargs): - """ Deprecate - use bindAgent() instead. - """ - vendor = kwargs.get("vendor") - product = kwargs.get("product") - if vendor is None: - raise Exception("vendor parameter required!") - - # V1 heartbeats do not have any agent identifier - we cannot - # filter them by agent. - - # build the binding key - escape "."s... - key = "agent.ind.heartbeat." + str(vendor).replace(".", "_") - if product is not None: - key += "." + str(product).replace(".", "_") - key += ".#" - - if key not in self.v2BindingKeyList: - self.v2BindingKeyList.append(key) - self.agent_filter.append((vendor, product, None)) - - # be sure we don't ever filter the local broker - local_broker_key = "agent.ind.heartbeat." + "org.apache".replace(".", "_") \ - + "." + "qpidd".replace(".", "_") + ".#" - if local_broker_key not in self.v2BindingKeyList: - self.v2BindingKeyList.append(local_broker_key) - - # remove the wildcard key if present - try: - self.v2BindingKeyList.remove("agent.ind.heartbeat.#") - except: - pass - - def _bindingKeys(self): - v1KeyList = [] - v2KeyList = [] - v1KeyList.append("schema.#") - # note well: any binding that starts with 'agent.ind.heartbeat' will be - # bound to the heartbeat queue, otherwise it will be bound to the - # unsolicited indication queue. See _decOutstanding() for the binding. - if not self.userBindings: - if self.rcvObjects and self.rcvEvents and self.rcvHeartbeats: - v1KeyList.append("console.#") - v2KeyList.append("agent.ind.data.#") - v2KeyList.append("agent.ind.event.#") - v2KeyList.append("agent.ind.heartbeat.#") - else: - # need heartbeats for V2 newAgent()/delAgent() - v2KeyList.append("agent.ind.heartbeat.#") - if self.rcvObjects: - v1KeyList.append("console.obj.#") - v2KeyList.append("agent.ind.data.#") - else: - v1KeyList.append("console.obj.*.*.org.apache.qpid.broker.agent") - if self.rcvEvents: - v1KeyList.append("console.event.#") - v2KeyList.append("agent.ind.event.#") - else: - v1KeyList.append("console.event.*.*.org.apache.qpid.broker.agent") - if self.rcvHeartbeats: - v1KeyList.append("console.heartbeat.#") - else: - # mandatory bindings - v1KeyList.append("console.obj.*.*.org.apache.qpid.broker.agent") - v1KeyList.append("console.event.*.*.org.apache.qpid.broker.agent") - v1KeyList.append("console.heartbeat.#") # no way to turn this on later - v2KeyList.append("agent.ind.heartbeat.org_apache.qpidd.#") - - return (v1KeyList, v2KeyList) - - - def _handleBrokerConnect(self, broker): - if self.console: - for agent in broker.getAgents(): - self._newAgentCallback(agent) - self.console.brokerConnected(broker) - - - def _handleBrokerDisconnect(self, broker): - if self.console: - for agent in broker.getAgents(): - self._delAgentCallback(agent) - self.console.brokerDisconnected(broker) - - - def _handleBrokerResp(self, broker, codec, seq): - broker.brokerId = codec.read_uuid() - if self.console != None: - self.console.brokerInfo(broker) - - # Send a package request - # (effectively inc and dec outstanding by not doing anything) - sendCodec = Codec() - seq = self.seqMgr._reserve(self._CONTEXT_STARTUP) - broker._setHeader(sendCodec, 'P', seq) - smsg = broker._message(sendCodec.encoded) - broker._send(smsg) - - - def _handlePackageInd(self, broker, codec, seq): - pname = str(codec.read_str8()) - notify = self.schemaCache.declarePackage(pname) - if notify and self.console != None: - self._newPackageCallback(pname) - - # Send a class request - broker._incOutstanding() - sendCodec = Codec() - seq = self.seqMgr._reserve(self._CONTEXT_STARTUP) - broker._setHeader(sendCodec, 'Q', seq) - sendCodec.write_str8(pname) - smsg = broker._message(sendCodec.encoded) - broker._send(smsg) - - - def _handleCommandComplete(self, broker, codec, seq, agent): - code = codec.read_uint32() - text = codec.read_str8() - context = self.seqMgr._release(seq) - if context == self._CONTEXT_STARTUP: - broker._decOutstanding() - elif context == self._CONTEXT_SYNC and seq == broker.syncSequence: - try: - broker.cv.acquire() - broker.syncInFlight = False - broker.cv.notify() - finally: - broker.cv.release() - elif context == self._CONTEXT_MULTIGET and seq in self.syncSequenceList: - try: - self.cv.acquire() - self.syncSequenceList.remove(seq) - if len(self.syncSequenceList) == 0: - self.cv.notify() - finally: - self.cv.release() - - if agent: - agent._handleV1Completion(seq, code, text) - - - def _handleClassInd(self, broker, codec, seq): - kind = codec.read_uint8() - classKey = ClassKey(codec) - classKey._setType(kind) - schema = self.schemaCache.getSchema(classKey) - - if not schema: - # Send a schema request for the unknown class - broker._incOutstanding() - sendCodec = Codec() - seq = self.seqMgr._reserve(self._CONTEXT_STARTUP) - broker._setHeader(sendCodec, 'S', seq) - classKey.encode(sendCodec) - smsg = broker._message(sendCodec.encoded) - broker._send(smsg) - - - def _handleHeartbeatInd(self, broker, codec, seq, msg): - brokerBank = 1 - agentBank = 0 - dp = msg.get("delivery_properties") - if dp: - key = dp["routing_key"] - if key: - keyElements = key.split(".") - if len(keyElements) == 4: - brokerBank = int(keyElements[2]) - agentBank = int(keyElements[3]) - else: - # If there's no routing key in the delivery properties, - # assume the message is from the broker. - brokerBank = 1 - agentBank = 0 - - agent = broker.getAgent(brokerBank, agentBank) - if self.rcvHeartbeats and self.console != None and agent != None: - timestamp = codec.read_uint64() - self._heartbeatCallback(agent, timestamp) - - - def _handleSchemaResp(self, broker, codec, seq, agent_addr): - kind = codec.read_uint8() - classKey = ClassKey(codec) - classKey._setType(kind) - _class = SchemaClass(kind, classKey, codec, self) - new_pkg, new_cls = self.schemaCache.declareClass(classKey, _class) - ctx = self.seqMgr._release(seq) - if ctx: - broker._decOutstanding() - if self.console != None: - if new_pkg: - self._newPackageCallback(classKey.getPackageName()) - if new_cls: - self._newClassCallback(kind, classKey) - - if agent_addr and (agent_addr.__class__ == str or agent_addr.__class__ == unicode): - agent = self._getAgentForAgentAddr(agent_addr) - if agent: - agent._schemaInfoFromV2Agent() - - - def _v2HandleHeartbeatInd(self, broker, mp, ah, content): - try: - agentName = ah["qmf.agent"] - values = content["_values"] - - if '_timestamp' in values: - timestamp = values["_timestamp"] - else: - timestamp = values['timestamp'] - - if '_heartbeat_interval' in values: - interval = values['_heartbeat_interval'] - else: - interval = values['heartbeat_interval'] - - epoch = 0 - if '_epoch' in values: - epoch = values['_epoch'] - elif 'epoch' in values: - epoch = values['epoch'] - except Exception,e: - return - - if self.agent_filter: - # only allow V2 agents that satisfy the filter - v = agentName.split(":", 2) - if len(v) != 3 or ((v[0], None, None) not in self.agent_filter - and (v[0], v[1], None) not in self.agent_filter - and (v[0], v[1], v[2]) not in self.agent_filter): - return - - ## - ## We already have the "local-broker" agent in our list as ['0']. - ## - if '_vendor' in values and values['_vendor'] == 'apache.org' and \ - '_product' in values and values['_product'] == 'qpidd': - agent = broker.getBrokerAgent() - else: - agent = broker.getAgent(1, agentName) - if agent == None: - agent = Agent(broker, agentName, "QMFv2 Agent", True, interval) - agent.setEpoch(epoch) - broker._addAgent(agentName, agent) - else: - agent.touch() - if self.rcvHeartbeats and self.console and agent: - self._heartbeatCallback(agent, timestamp) - agent.update_schema_timestamp(values.get("_schema_updated", 0)) - - - def _v2HandleAgentLocateRsp(self, broker, mp, ah, content): - self._v2HandleHeartbeatInd(broker, mp, ah, content) - - - def _handleError(self, error): - try: - self.cv.acquire() - if len(self.syncSequenceList) > 0: - self.error = error - self.syncSequenceList = [] - self.cv.notify() - finally: - self.cv.release() - - - def _selectMatch(self, object): - """ Check the object against self.getSelect to check for a match """ - for key, value in self.getSelect: - for prop, propval in object.getProperties(): - if key == prop.name and value != propval: - return False - return True - - - def _decodeValue(self, codec, typecode, broker=None): - """ Decode, from the codec, a value based on its typecode. """ - if typecode == 1: data = codec.read_uint8() # U8 - elif typecode == 2: data = codec.read_uint16() # U16 - elif typecode == 3: data = codec.read_uint32() # U32 - elif typecode == 4: data = codec.read_uint64() # U64 - elif typecode == 6: data = codec.read_str8() # SSTR - elif typecode == 7: data = codec.read_str16() # LSTR - elif typecode == 8: data = codec.read_int64() # ABSTIME - elif typecode == 9: data = codec.read_uint64() # DELTATIME - elif typecode == 10: data = ObjectId(codec) # REF - elif typecode == 11: data = codec.read_uint8() != 0 # BOOL - elif typecode == 12: data = codec.read_float() # FLOAT - elif typecode == 13: data = codec.read_double() # DOUBLE - elif typecode == 14: data = codec.read_uuid() # UUID - elif typecode == 16: data = codec.read_int8() # S8 - elif typecode == 17: data = codec.read_int16() # S16 - elif typecode == 18: data = codec.read_int32() # S32 - elif typecode == 19: data = codec.read_int64() # S63 - elif typecode == 15: data = codec.read_map() # FTABLE - elif typecode == 20: # OBJECT - # Peek at the type, and if it is still 20 pull it decode. If - # Not, call back into self. - inner_type_code = codec.read_uint8() - if inner_type_code == 20: - classKey = ClassKey(codec) - schema = self.schemaCache.getSchema(classKey) - if not schema: - return None - data = Object(self, broker, schema, codec, True, True, False) - else: - data = self._decodeValue(codec, inner_type_code, broker) - elif typecode == 21: data = codec.read_list() # List - elif typecode == 22: #Array - #taken from codec10.read_array - sc = Codec(codec.read_vbin32()) - count = sc.read_uint32() - type = sc.read_uint8() - data = [] - while count > 0: - data.append(self._decodeValue(sc,type,broker)) - count -= 1 - else: - raise ValueError("Invalid type code: %d" % typecode) - return data - - - def _encodeValue(self, codec, value, typecode): - """ Encode, into the codec, a value based on its typecode. """ - if typecode == 1: codec.write_uint8 (int(value)) # U8 - elif typecode == 2: codec.write_uint16 (int(value)) # U16 - elif typecode == 3: codec.write_uint32 (long(value)) # U32 - elif typecode == 4: codec.write_uint64 (long(value)) # U64 - elif typecode == 6: codec.write_str8 (value) # SSTR - elif typecode == 7: codec.write_str16 (value) # LSTR - elif typecode == 8: codec.write_int64 (long(value)) # ABSTIME - elif typecode == 9: codec.write_uint64 (long(value)) # DELTATIME - elif typecode == 10: value.encode (codec) # REF - elif typecode == 11: codec.write_uint8 (int(value)) # BOOL - elif typecode == 12: codec.write_float (float(value)) # FLOAT - elif typecode == 13: codec.write_double (float(value)) # DOUBLE - elif typecode == 14: codec.write_uuid (value.bytes) # UUID - elif typecode == 16: codec.write_int8 (int(value)) # S8 - elif typecode == 17: codec.write_int16 (int(value)) # S16 - elif typecode == 18: codec.write_int32 (int(value)) # S32 - elif typecode == 19: codec.write_int64 (int(value)) # S64 - elif typecode == 20: value._encodeUnmanaged(codec) # OBJECT - elif typecode == 15: codec.write_map (value) # FTABLE - elif typecode == 21: codec.write_list (value) # List - elif typecode == 22: # Array - sc = Codec() - self._encodeValue(sc, len(value), 3) - if len(value) > 0: - ltype = self.encoding(value[0]) - self._encodeValue(sc,ltype,1) - for o in value: - self._encodeValue(sc, o, ltype) - codec.write_vbin32(sc.encoded) - else: - raise ValueError ("Invalid type code: %d" % typecode) - - - def encoding(self, value): - return self._encoding(value.__class__) - - - def _encoding(self, klass): - if Session.ENCODINGS.has_key(klass): - return self.ENCODINGS[klass] - for base in klass.__bases__: - result = self._encoding(base) - if result != None: - return result - - - def _displayValue(self, value, typecode): - """ """ - if typecode == 1: return unicode(value) - elif typecode == 2: return unicode(value) - elif typecode == 3: return unicode(value) - elif typecode == 4: return unicode(value) - elif typecode == 6: return value - elif typecode == 7: return value - elif typecode == 8: return unicode(strftime("%c", gmtime(value / 1000000000))) - elif typecode == 9: return unicode(value) - elif typecode == 10: return unicode(value.__repr__()) - elif typecode == 11: - if value: return u"T" - else: return u"F" - elif typecode == 12: return unicode(value) - elif typecode == 13: return unicode(value) - elif typecode == 14: return unicode(value.__repr__()) - elif typecode == 15: return unicode(value.__repr__()) - elif typecode == 16: return unicode(value) - elif typecode == 17: return unicode(value) - elif typecode == 18: return unicode(value) - elif typecode == 19: return unicode(value) - elif typecode == 20: return unicode(value.__repr__()) - elif typecode == 21: return unicode(value.__repr__()) - elif typecode == 22: return unicode(value.__repr__()) - else: - raise ValueError ("Invalid type code: %d" % typecode) - - - def _defaultValue(self, stype, broker=None, kwargs={}): - """ """ - typecode = stype.type - if typecode == 1: return 0 - elif typecode == 2: return 0 - elif typecode == 3: return 0 - elif typecode == 4: return 0 - elif typecode == 6: return "" - elif typecode == 7: return "" - elif typecode == 8: return 0 - elif typecode == 9: return 0 - elif typecode == 10: return ObjectId(None) - elif typecode == 11: return False - elif typecode == 12: return 0.0 - elif typecode == 13: return 0.0 - elif typecode == 14: return UUID(bytes=[0 for i in range(16)]) - elif typecode == 15: return {} - elif typecode == 16: return 0 - elif typecode == 17: return 0 - elif typecode == 18: return 0 - elif typecode == 19: return 0 - elif typecode == 21: return [] - elif typecode == 22: return [] - elif typecode == 20: - try: - if "classKeys" in kwargs: - keyList = kwargs["classKeys"] - else: - keyList = None - classKey = self._bestClassKey(stype.refPackage, stype.refClass, keyList) - if classKey: - return self.makeObject(classKey, broker, kwargs) - except: - pass - return None - else: - raise ValueError ("Invalid type code: %d" % typecode) - - - def _bestClassKey(self, pname, cname, preferredList): - """ """ - if pname == None or cname == None: - if len(preferredList) == 0: - return None - return preferredList[0] - for p in preferredList: - if p.getPackageName() == pname and p.getClassName() == cname: - return p - clist = self.getClasses(pname) - for c in clist: - if c.getClassName() == cname: - return c - return None - - - def _sendMethodRequest(self, broker, schemaKey, objectId, name, argList): - """ This is a legacy function that is used by qpid-tool to invoke methods - using the broker, objectId and schema. - Methods are now invoked on the object itself. - """ - objs = self.getObjects(_objectId=objectId) - if objs: - return objs[0]._sendMethodRequest(name, argList, {}) - return None - - def _newPackageCallback(self, pname): - """ - Invokes the console.newPackage() callback if the callback is present and - the package is not filtered. - """ - if self.console: - if len(self.class_filter) == 0 and len(self.event_filter) == 0: - self.console.newPackage(pname) - else: - for x in self.class_filter: - if x[0] == pname: - self.console.newPackage(pname) - return - - for x in self.event_filter: - if x[0] == pname: - self.console.newPackage(pname) - return - - - def _newClassCallback(self, ctype, ckey): - """ - Invokes the console.newClass() callback if the callback is present and the - class is not filtered. - """ - if self.console: - if ctype == ClassKey.TYPE_DATA: - if (len(self.class_filter) == 0 - or (ckey.getPackageName(), ckey.getClassName()) in self.class_filter): - self.console.newClass(ctype, ckey) - elif ctype == ClassKey.TYPE_EVENT: - if (len(self.event_filter) == 0 - or (ckey.getPackageName(), ckey.getClassName()) in self.event_filter): - self.console.newClass(ctype, ckey) - else: # old class keys did not contain type info, check both filters - if ((len(self.class_filter) == 0 and len(self.event_filter) == 0) - or (ckey.getPackageName(), ckey.getClassName()) in self.class_filter - or (ckey.getPackageName(), ckey.getClassName()) in self.event_filter): - self.console.newClass(ctype, ckey) - - def _agentAllowed(self, agentName, isV2): - """ True if the agent is NOT filtered. - """ - if self.agent_filter: - if isV2: - v = agentName.split(":", 2) - return ((len(v) > 2 and (v[0], v[1], v[2]) in self.agent_filter) - or (len(v) > 1 and (v[0], v[1], None) in self.agent_filter) - or (v and (v[0], None, None) in self.agent_filter)); - else: - return agentName in self.agent_filter - return True - - def _heartbeatCallback(self, agent, timestamp): - """ - Invokes the console.heartbeat() callback if the callback is present and the - agent is not filtered. - """ - if self.console and self.rcvHeartbeats: - if ((agent.isV2 and self._agentAllowed(agent.agentBank, True)) - or ((not agent.isV2) and self._agentAllowed(agent.label, False))): - self.console.heartbeat(agent, timestamp) - - def _newAgentCallback(self, agent): - """ - Invokes the console.newAgent() callback if the callback is present and the - agent is not filtered. - """ - if self.console: - if ((agent.isV2 and self._agentAllowed(agent.agentBank, True)) - or ((not agent.isV2) and self._agentAllowed(agent.label, False))): - self.console.newAgent(agent) - - def _delAgentCallback(self, agent): - """ - Invokes the console.delAgent() callback if the callback is present and the - agent is not filtered. - """ - if self.console: - if ((agent.isV2 and self._agentAllowed(agent.agentBank, True)) - or ((not agent.isV2) and self._agentAllowed(agent.label, False))): - self.console.delAgent(agent) - -#=================================================================================================== -# SessionGetRequest -#=================================================================================================== -class SessionGetRequest(object): - """ - This class is used to track get-object queries at the Session level. - """ - def __init__(self, agentCount): - self.agentCount = agentCount - self.result = [] - self.cv = Condition() - self.waiting = True - - def __call__(self, **kwargs): - """ - Callable entry point for gathering collected objects. - """ - try: - self.cv.acquire() - if 'qmf_object' in kwargs: - self.result.append(kwargs['qmf_object']) - elif 'qmf_complete' in kwargs or 'qmf_exception' in kwargs: - self.agentCount -= 1 - if self.agentCount == 0: - self.waiting = None - self.cv.notify() - finally: - self.cv.release() - - def wait(self, timeout): - starttime = time() - try: - self.cv.acquire() - while self.waiting: - if (time() - starttime) > timeout: - raise Exception("Timed out after %d seconds" % timeout) - self.cv.wait(1) - finally: - self.cv.release() - - -#=================================================================================================== -# SchemaCache -#=================================================================================================== -class SchemaCache(object): - """ - The SchemaCache is a data structure that stores learned schema information. - """ - def __init__(self): - """ - Create a map of schema packages and a lock to protect this data structure. - Note that this lock is at the bottom of any lock hierarchy. If it is held, no other - lock in the system should attempt to be acquired. - """ - self.packages = {} - self.lock = Lock() - - def getPackages(self): - """ Get the list of known QMF packages """ - list = [] - try: - self.lock.acquire() - for package in self.packages: - list.append(package) - finally: - self.lock.release() - return list - - def getClasses(self, packageName): - """ Get the list of known classes within a QMF package """ - list = [] - try: - self.lock.acquire() - if packageName in self.packages: - for pkey in self.packages[packageName]: - if isinstance(self.packages[packageName][pkey], SchemaClass): - list.append(self.packages[packageName][pkey].getKey()) - elif self.packages[packageName][pkey] is not None: - # schema not present yet, but we have schema type - list.append(ClassKey({"_package_name": packageName, - "_class_name": pkey[0], - "_hash": pkey[1], - "_type": self.packages[packageName][pkey]})) - finally: - self.lock.release() - return list - - def getSchema(self, classKey): - """ Get the schema for a QMF class, return None if schema not available """ - pname = classKey.getPackageName() - pkey = classKey.getPackageKey() - try: - self.lock.acquire() - if pname in self.packages: - if (pkey in self.packages[pname] and - isinstance(self.packages[pname][pkey], SchemaClass)): - # hack: value may be schema type info if schema not available - return self.packages[pname][pkey] - finally: - self.lock.release() - return None - - def declarePackage(self, pname): - """ Maybe add a package to the cache. Return True if package was added, None if it pre-existed. """ - try: - self.lock.acquire() - if pname in self.packages: - return None - self.packages[pname] = {} - finally: - self.lock.release() - return True - - def declareClass(self, classKey, classDef=None): - """ Add a class definition to the cache, if supplied. Return a pair - indicating if the package or class is new. - """ - new_package = False - new_class = False - pname = classKey.getPackageName() - pkey = classKey.getPackageKey() - try: - self.lock.acquire() - if pname not in self.packages: - self.packages[pname] = {} - new_package = True - packageMap = self.packages[pname] - if pkey not in packageMap or not isinstance(packageMap[pkey], SchemaClass): - if classDef is not None: - new_class = True - packageMap[pkey] = classDef - elif classKey.getType() is not None: - # hack: don't indicate "new_class" to caller unless the classKey type - # information is present. "new_class" causes the console.newClass() - # callback to be invoked, which -requires- a valid classKey type! - new_class = True - # store the type for the getClasses() method: - packageMap[pkey] = classKey.getType() - - finally: - self.lock.release() - return (new_package, new_class) - - -#=================================================================================================== -# ClassKey -#=================================================================================================== -class ClassKey: - """ A ClassKey uniquely identifies a class from the schema. """ - - TYPE_DATA = "_data" - TYPE_EVENT = "_event" - - def __init__(self, constructor): - if constructor.__class__ == str: - # construct from __repr__ string - try: - # supports two formats: - # type present = P:C:T(H) - # no type present = P:C(H) - tmp = constructor.split(":") - if len(tmp) == 3: - self.pname, self.cname, rem = tmp - self.type, hsh = rem.split("(") - else: - self.pname, rem = tmp - self.cname, hsh = rem.split("(") - self.type = None - hsh = hsh.strip(")") - hexValues = hsh.split("-") - h0 = int(hexValues[0], 16) - h1 = int(hexValues[1], 16) - h2 = int(hexValues[2], 16) - h3 = int(hexValues[3], 16) - h4 = int(hexValues[4][0:4], 16) - h5 = int(hexValues[4][4:12], 16) - self.hash = UUID(bytes=struct.pack("!LHHHHL", h0, h1, h2, h3, h4, h5)) - except: - raise Exception("Invalid ClassKey format") - elif constructor.__class__ == dict: - # construct from QMFv2 map - try: - self.pname = constructor['_package_name'] - self.cname = constructor['_class_name'] - self.hash = constructor['_hash'] - self.type = constructor.get('_type') - except: - raise Exception("Invalid ClassKey map format %s" % str(constructor)) - else: - # construct from codec - codec = constructor - self.pname = str(codec.read_str8()) - self.cname = str(codec.read_str8()) - self.hash = UUID(bytes=codec.read_bin128()) - # old V1 codec did not include "type" - self.type = None - - def encode(self, codec): - # old V1 codec did not include "type" - codec.write_str8(self.pname) - codec.write_str8(self.cname) - codec.write_bin128(self.hash.bytes) - - def asMap(self): - m = {'_package_name': self.pname, - '_class_name': self.cname, - '_hash': self.hash} - if self.type is not None: - m['_type'] = self.type - return m - - def getPackageName(self): - return self.pname - - def getClassName(self): - return self.cname - - def getHash(self): - return self.hash - - def getType(self): - return self.type - - def getHashString(self): - return str(self.hash) - - def getPackageKey(self): - return (self.cname, self.hash) - - def __repr__(self): - if self.type is None: - return self.pname + ":" + self.cname + "(" + self.getHashString() + ")" - return self.pname + ":" + self.cname + ":" + self.type + "(" + self.getHashString() + ")" - - def _setType(self, _type): - if _type == 2 or _type == ClassKey.TYPE_EVENT: - self.type = ClassKey.TYPE_EVENT - else: - self.type = ClassKey.TYPE_DATA - - def __hash__(self): - ss = self.pname + self.cname + self.getHashString() - return ss.__hash__() - - def __eq__(self, other): - return self.__repr__() == other.__repr__() - -#=================================================================================================== -# SchemaClass -#=================================================================================================== -class SchemaClass: - """ """ - CLASS_KIND_TABLE = 1 - CLASS_KIND_EVENT = 2 - - def __init__(self, kind, key, codec, session): - self.kind = kind - self.classKey = key - self.properties = [] - self.statistics = [] - self.methods = [] - self.arguments = [] - self.session = session - - hasSupertype = 0 #codec.read_uint8() - if self.kind == self.CLASS_KIND_TABLE: - propCount = codec.read_uint16() - statCount = codec.read_uint16() - methodCount = codec.read_uint16() - if hasSupertype == 1: - self.superTypeKey = ClassKey(codec) - else: - self.superTypeKey = None ; - for idx in range(propCount): - self.properties.append(SchemaProperty(codec)) - for idx in range(statCount): - self.statistics.append(SchemaStatistic(codec)) - for idx in range(methodCount): - self.methods.append(SchemaMethod(codec)) - - elif self.kind == self.CLASS_KIND_EVENT: - argCount = codec.read_uint16() - if (hasSupertype): - self.superTypeKey = ClassKey(codec) - else: - self.superTypeKey = None ; - for idx in range(argCount): - self.arguments.append(SchemaArgument(codec, methodArg=False)) - - def __repr__(self): - if self.kind == self.CLASS_KIND_TABLE: - kindStr = "Table" - elif self.kind == self.CLASS_KIND_EVENT: - kindStr = "Event" - else: - kindStr = "Unsupported" - result = "%s Class: %s " % (kindStr, self.classKey.__repr__()) - return result - - def getKey(self): - """ Return the class-key for this class. """ - return self.classKey - - def getProperties(self): - """ Return the list of properties for the class. """ - if (self.superTypeKey == None): - return self.properties - else: - return self.properties + self.session.getSchema(self.superTypeKey).getProperties() - - def getStatistics(self): - """ Return the list of statistics for the class. """ - if (self.superTypeKey == None): - return self.statistics - else: - return self.statistics + self.session.getSchema(self.superTypeKey).getStatistics() - - def getMethods(self): - """ Return the list of methods for the class. """ - if (self.superTypeKey == None): - return self.methods - else: - return self.methods + self.session.getSchema(self.superTypeKey).getMethods() - - def getArguments(self): - """ Return the list of events for the class. """ - """ Return the list of methods for the class. """ - if (self.superTypeKey == None): - return self.arguments - else: - return self.arguments + self.session.getSchema(self.superTypeKey).getArguments() - - -#=================================================================================================== -# SchemaProperty -#=================================================================================================== -class SchemaProperty: - """ """ - def __init__(self, codec): - map = codec.read_map() - self.name = str(map["name"]) - self.type = map["type"] - self.access = str(map["access"]) - self.index = map["index"] != 0 - self.optional = map["optional"] != 0 - self.refPackage = None - self.refClass = None - self.unit = None - self.min = None - self.max = None - self.maxlen = None - self.desc = None - - for key, value in map.items(): - if key == "unit" : self.unit = value - elif key == "min" : self.min = value - elif key == "max" : self.max = value - elif key == "maxlen" : self.maxlen = value - elif key == "desc" : self.desc = value - elif key == "refPackage" : self.refPackage = value - elif key == "refClass" : self.refClass = value - - def __repr__(self): - return self.name - - -#=================================================================================================== -# SchemaStatistic -#=================================================================================================== -class SchemaStatistic: - """ """ - def __init__(self, codec): - map = codec.read_map() - self.name = str(map["name"]) - self.type = map["type"] - self.unit = None - self.desc = None - - for key, value in map.items(): - if key == "unit" : self.unit = value - elif key == "desc" : self.desc = value - - def __repr__(self): - return self.name - - -#=================================================================================================== -# SchemaMethod -#=================================================================================================== -class SchemaMethod: - """ """ - def __init__(self, codec): - map = codec.read_map() - self.name = str(map["name"]) - argCount = map["argCount"] - if "desc" in map: - self.desc = map["desc"] - else: - self.desc = None - self.arguments = [] - - for idx in range(argCount): - self.arguments.append(SchemaArgument(codec, methodArg=True)) - - def __repr__(self): - result = self.name + "(" - first = True - for arg in self.arguments: - if arg.dir.find("I") != -1: - if first: - first = False - else: - result += ", " - result += arg.name - result += ")" - return result - - -#=================================================================================================== -# SchemaArgument -#=================================================================================================== -class SchemaArgument: - """ """ - def __init__(self, codec, methodArg): - map = codec.read_map() - self.name = str(map["name"]) - self.type = map["type"] - if methodArg: - self.dir = str(map["dir"]).upper() - self.unit = None - self.min = None - self.max = None - self.maxlen = None - self.desc = None - self.default = None - self.refPackage = None - self.refClass = None - - for key, value in map.items(): - if key == "unit" : self.unit = value - elif key == "min" : self.min = value - elif key == "max" : self.max = value - elif key == "maxlen" : self.maxlen = value - elif key == "desc" : self.desc = value - elif key == "default" : self.default = value - elif key == "refPackage" : self.refPackage = value - elif key == "refClass" : self.refClass = value - - -#=================================================================================================== -# ObjectId -#=================================================================================================== -class ObjectId: - """ Object that represents QMF object identifiers """ - def __init__(self, constructor, first=0, second=0, agentName=None): - if constructor.__class__ == dict: - self.isV2 = True - self.agentName = agentName - self.agentEpoch = 0 - if '_agent_name' in constructor: self.agentName = constructor['_agent_name'] - if '_agent_epoch' in constructor: self.agentEpoch = constructor['_agent_epoch'] - if '_object_name' not in constructor: - raise Exception("QMFv2 OBJECT_ID must have the '_object_name' field.") - self.objectName = constructor['_object_name'] - else: - self.isV2 = None - if not constructor: - first = first - second = second - else: - first = constructor.read_uint64() - second = constructor.read_uint64() - self.agentName = str(first & 0x000000000FFFFFFF) - self.agentEpoch = (first & 0x0FFF000000000000) >> 48 - self.objectName = str(second) - - def _create(cls, agent_name, object_name, epoch=0): - oid = {"_agent_name": agent_name, - "_object_name": object_name, - "_agent_epoch": epoch} - return cls(oid) - create = classmethod(_create) - - def __cmp__(self, other): - if other == None or not isinstance(other, ObjectId) : - return 1 - - if self.objectName < other.objectName: - return -1 - if self.objectName > other.objectName: - return 1 - - if self.agentName < other.agentName: - return -1 - if self.agentName > other.agentName: - return 1 - - if self.agentEpoch < other.agentEpoch: - return -1 - if self.agentEpoch > other.agentEpoch: - return 1 - return 0 - - def __repr__(self): - return "%d-%d-%d-%s-%s" % (self.getFlags(), self.getSequence(), - self.getBrokerBank(), self.getAgentBank(), self.getObject()) - - def index(self): - return self.__repr__() - - def getFlags(self): - return 0 - - def getSequence(self): - return self.agentEpoch - - def getBrokerBank(self): - return 1 - - def getAgentBank(self): - return self.agentName - - def getV2RoutingKey(self): - if self.agentName == '0': - return "broker" - return self.agentName - - def getObject(self): - return self.objectName - - def isDurable(self): - return self.getSequence() == 0 - - def encode(self, codec): - first = (self.agentEpoch << 48) + (1 << 28) - second = 0 - - try: - first += int(self.agentName) - except: - pass - - try: - second = int(self.objectName) - except: - pass - - codec.write_uint64(first) - codec.write_uint64(second) - - def asMap(self): - omap = {'_agent_name': self.agentName, '_object_name': self.objectName} - if self.agentEpoch != 0: - omap['_agent_epoch'] = self.agentEpoch - return omap - - def __hash__(self): - return self.__repr__().__hash__() - - def __eq__(self, other): - return self.__repr__().__eq__(other) - - -#=================================================================================================== -# MethodResult -#=================================================================================================== -class MethodResult(object): - """ """ - def __init__(self, status, text, outArgs): - """ """ - self.status = status - self.text = text - self.outArgs = outArgs - - def __getattr__(self, name): - if name in self.outArgs: - return self.outArgs[name] - - def __repr__(self): - return "%s (%d) - %s" % (self.text, self.status, self.outArgs) - - -#=================================================================================================== -# Broker -#=================================================================================================== -class Broker(Thread): - """ This object represents a connection (or potential connection) to a QMF broker. """ - SYNC_TIME = 60 - nextSeq = 1 - - # for connection recovery - DELAY_MIN = 1 - DELAY_MAX = 128 - DELAY_FACTOR = 2 - - class _q_item: - """ Broker-private class to encapsulate data sent to the broker thread - queue. - """ - type_wakeup = 0 - type_v1msg = 1 - type_v2msg = 2 - - def __init__(self, typecode, data): - self.typecode = typecode - self.data = data - - def __init__(self, session, host, port, authMechs, authUser, authPass, - ssl=False, connTimeout=None, sessTimeout=None, **connectArgs): - """ Create a broker proxy and setup a connection to the broker. Will raise - an exception if the connection fails and the session is not configured to - retry connection setup (manageConnections = False). - - Spawns a thread to manage the broker connection. Call _shutdown() to - shutdown the thread when releasing the broker. - """ - Thread.__init__(self) - self.session = session - self.host = host - self.port = port - self.mechanisms = authMechs - self.ssl = ssl - if connTimeout is not None: - connTimeout = float(connTimeout) - self.connTimeout = connTimeout - if sessTimeout is not None: - sessTimeout = float(sessTimeout) - else: - sessTimeout = self.SYNC_TIME - self.sessTimeout = sessTimeout - self.authUser = authUser - self.authPass = authPass - self.saslUser = None - self.cv = Condition() - self.seqToAgentMap = {} - self.error = None - self.conn_exc = None # exception hit by _tryToConnect() - self.brokerId = None - self.connected = False - self.brokerAgent = None - self.brokerSupportsV2 = None - self.rcv_queue = Queue() # for msg received on session - self.conn = None - self.amqpSession = None - self.amqpSessionId = "%s.%d.%d" % (platform.uname()[1], os.getpid(), Broker.nextSeq) - Broker.nextSeq += 1 - self.last_age_check = time() - self.connectArgs = connectArgs - # thread control - self.setDaemon(True) - self.setName("Thread for broker: %s:%d" % (host, port)) - self.canceled = False - self.ready = Semaphore(0) - self.start() - if not self.session.manageConnections: - # wait for connection setup to complete in subthread. - # On failure, propagate exception to caller - self.ready.acquire() - if self.conn_exc: - self._shutdown() # wait for the subthread to clean up... - raise self.conn_exc - # connection up - wait for stable... - try: - self._waitForStable() - agent = self.getBrokerAgent() - if agent: - agent.getObjects(_class="agent") - except: - self._shutdown() # wait for the subthread to clean up... - raise - - - def isConnected(self): - """ Return True if there is an active connection to the broker. """ - return self.connected - - def getError(self): - """ Return the last error message seen while trying to connect to the broker. """ - return self.error - - def getBrokerId(self): - """ Get broker's unique identifier (UUID) """ - return self.brokerId - - def getBrokerBank(self): - """ Return the broker-bank value. This is the value that the broker assigns to - objects within its control. This value appears as a field in the ObjectId - of objects created by agents controlled by this broker. """ - return 1 - - def getAgent(self, brokerBank, agentBank): - """ Return the agent object associated with a particular broker and agent bank value.""" - bankKey = str(agentBank) - try: - self.cv.acquire() - if bankKey in self.agents: - return self.agents[bankKey] - finally: - self.cv.release() - return None - - def getBrokerAgent(self): - return self.brokerAgent - - def getSessionId(self): - """ Get the identifier of the AMQP session to the broker """ - return self.amqpSessionId - - def getAgents(self): - """ Get the list of agents reachable via this broker """ - try: - self.cv.acquire() - return self.agents.values() - finally: - self.cv.release() - - def getAmqpSession(self): - """ Get the AMQP session object for this connected broker. """ - return self.amqpSession - - def getUrl(self): - """ """ - return BrokerURL(host=self.host, port=self.port) - - def getFullUrl(self, noAuthIfGuestDefault=True): - """ """ - if self.ssl: - scheme = "amqps" - else: - scheme = "amqp" - if self.authUser == "" or \ - (noAuthIfGuestDefault and self.authUser == "guest" and self.authPass == "guest"): - return BrokerURL(scheme=scheme, host=self.host, port=(self.port or 5672)) - else: - return BrokerURL(scheme=scheme, user=self.authUser, password=self.authPass, host=self.host, port=(self.port or 5672)) - - def __repr__(self): - if self.connected: - return "Broker connected at: %s" % self.getUrl() - else: - return "Disconnected Broker" - - def _setSequence(self, sequence, agent): - try: - self.cv.acquire() - self.seqToAgentMap[sequence] = agent - finally: - self.cv.release() - - def _clearSequence(self, sequence): - try: - self.cv.acquire() - self.seqToAgentMap.pop(sequence) - finally: - self.cv.release() - - def _tryToConnect(self): - """ Connect to the broker. Returns True if connection setup completes - successfully, otherwise returns False and sets self.error/self.conn_exc - with error info. Does not raise exceptions. - """ - self.error = None - self.conn_exc = None - try: - try: - self.cv.acquire() - self.agents = {} - finally: - self.cv.release() - - self.topicBound = False - self.syncInFlight = False - self.syncRequest = 0 - self.syncResult = None - self.reqsOutstanding = 1 - - try: - if self.amqpSession: - self.amqpSession.close() - except: - pass - self.amqpSession = None - - try: - if self.conn: - self.conn.close(5) - except: - pass - self.conn = None - - sock = connect(self.host, self.port) - sock.settimeout(5) - oldTimeout = sock.gettimeout() - sock.settimeout(self.connTimeout) - connSock = None - force_blocking = False - if self.ssl: - # Bug (QPID-4337): the "old" implementation of python SSL - # fails if the socket is set to non-blocking (which settimeout() - # may change). - if sys.version_info[:2] < (2, 6): # 2.6+ uses openssl - it's ok - force_blocking = True - sock.setblocking(1) - certfile = None - if 'ssl_certfile' in self.connectArgs: - certfile = self.connectArgs['ssl_certfile'] - keyfile = None - if 'ssl_keyfile' in self.connectArgs: - keyfile = self.connectArgs['ssl_keyfile'] - connSock = ssl(sock, certfile=certfile, keyfile=keyfile) - else: - connSock = sock - if not 'service' in self.connectArgs: - self.connectArgs['service'] = 'qpidd' - self.conn = Connection(connSock, username=self.authUser, password=self.authPass, - mechanism = self.mechanisms, host=self.host, - **self.connectArgs) - def aborted(): - raise Timeout("Waiting for connection to be established with broker") - oldAborted = self.conn.aborted - self.conn.aborted = aborted - self.conn.start() - - # Bug (QPID-4337): don't enable non-blocking (timeouts) for old SSL - if not force_blocking: - sock.settimeout(oldTimeout) - self.conn.aborted = oldAborted - uid = self.conn.user_id - if uid.__class__ == tuple and len(uid) == 2: - self.saslUser = uid[1] - elif type(uid) is str: - self.saslUser = uid; - else: - self.saslUser = None - - # prevent topic queues from filling up (and causing the agents to - # disconnect) by discarding the oldest queued messages when full. - topic_queue_options = {"qpid.policy_type":"ring"} - - self.replyName = "reply-%s" % self.amqpSessionId - self.amqpSession = self.conn.session(self.amqpSessionId) - self.amqpSession.timeout = self.sessTimeout - self.amqpSession.auto_sync = True - self.amqpSession.queue_declare(queue=self.replyName, exclusive=True, auto_delete=True) - self.amqpSession.exchange_bind(exchange="amq.direct", - queue=self.replyName, binding_key=self.replyName) - self.amqpSession.message_subscribe(queue=self.replyName, destination="rdest", - accept_mode=self.amqpSession.accept_mode.none, - acquire_mode=self.amqpSession.acquire_mode.pre_acquired) - self.amqpSession.incoming("rdest").listen(self._v1Cb, self._exceptionCb) - self.amqpSession.message_set_flow_mode(destination="rdest", flow_mode=self.amqpSession.flow_mode.window) - self.amqpSession.message_flow(destination="rdest", unit=self.amqpSession.credit_unit.byte, value=0xFFFFFFFFL) - self.amqpSession.message_flow(destination="rdest", unit=self.amqpSession.credit_unit.message, value=200) - - self.topicName = "topic-%s" % self.amqpSessionId - self.amqpSession.queue_declare(queue=self.topicName, exclusive=True, - auto_delete=True, - arguments=topic_queue_options) - self.amqpSession.message_subscribe(queue=self.topicName, destination="tdest", - accept_mode=self.amqpSession.accept_mode.none, - acquire_mode=self.amqpSession.acquire_mode.pre_acquired) - self.amqpSession.incoming("tdest").listen(self._v1Cb, self._exceptionCb) - self.amqpSession.message_set_flow_mode(destination="tdest", flow_mode=self.amqpSession.flow_mode.window) - self.amqpSession.message_flow(destination="tdest", unit=self.amqpSession.credit_unit.byte, value=0xFFFFFFFFL) - self.amqpSession.message_flow(destination="tdest", unit=self.amqpSession.credit_unit.message, value=200) - - ## - ## Check to see if the broker has QMFv2 exchanges configured - ## - direct_result = self.amqpSession.exchange_query("qmf.default.direct") - topic_result = self.amqpSession.exchange_query("qmf.default.topic") - self.brokerSupportsV2 = not (direct_result.not_found or topic_result.not_found) - - try: - self.cv.acquire() - self.agents = {} - self.brokerAgent = Agent(self, 0, "BrokerAgent", isV2=self.brokerSupportsV2) - self.agents['0'] = self.brokerAgent - finally: - self.cv.release() - - ## - ## Set up connectivity for QMFv2 - ## - if self.brokerSupportsV2: - # set up 3 queues: - # 1 direct queue - for responses destined to this console. - # 2 topic queues - one for heartbeats (hb), one for unsolicited data - # and event indications (ui). - self.v2_direct_queue = "qmfc-v2-%s" % self.amqpSessionId - self.amqpSession.queue_declare(queue=self.v2_direct_queue, exclusive=True, auto_delete=True) - self.v2_topic_queue_ui = "qmfc-v2-ui-%s" % self.amqpSessionId - self.amqpSession.queue_declare(queue=self.v2_topic_queue_ui, - exclusive=True, auto_delete=True, - arguments=topic_queue_options) - self.v2_topic_queue_hb = "qmfc-v2-hb-%s" % self.amqpSessionId - self.amqpSession.queue_declare(queue=self.v2_topic_queue_hb, - exclusive=True, auto_delete=True, - arguments=topic_queue_options) - - self.amqpSession.exchange_bind(exchange="qmf.default.direct", - queue=self.v2_direct_queue, binding_key=self.v2_direct_queue) - ## Other bindings here... - - self.amqpSession.message_subscribe(queue=self.v2_direct_queue, destination="v2dest", - accept_mode=self.amqpSession.accept_mode.none, - acquire_mode=self.amqpSession.acquire_mode.pre_acquired) - self.amqpSession.incoming("v2dest").listen(self._v2Cb, self._exceptionCb) - self.amqpSession.message_set_flow_mode(destination="v2dest", flow_mode=self.amqpSession.flow_mode.window) - self.amqpSession.message_flow(destination="v2dest", unit=self.amqpSession.credit_unit.byte, value=0xFFFFFFFFL) - self.amqpSession.message_flow(destination="v2dest", unit=self.amqpSession.credit_unit.message, value=50) - - self.amqpSession.message_subscribe(queue=self.v2_topic_queue_ui, destination="v2TopicUI", - accept_mode=self.amqpSession.accept_mode.none, - acquire_mode=self.amqpSession.acquire_mode.pre_acquired) - self.amqpSession.incoming("v2TopicUI").listen(self._v2Cb, self._exceptionCb) - self.amqpSession.message_set_flow_mode(destination="v2TopicUI", flow_mode=self.amqpSession.flow_mode.window) - self.amqpSession.message_flow(destination="v2TopicUI", unit=self.amqpSession.credit_unit.byte, value=0xFFFFFFFFL) - self.amqpSession.message_flow(destination="v2TopicUI", unit=self.amqpSession.credit_unit.message, value=25) - - - self.amqpSession.message_subscribe(queue=self.v2_topic_queue_hb, destination="v2TopicHB", - accept_mode=self.amqpSession.accept_mode.none, - acquire_mode=self.amqpSession.acquire_mode.pre_acquired) - self.amqpSession.incoming("v2TopicHB").listen(self._v2Cb, self._exceptionCb) - self.amqpSession.message_set_flow_mode(destination="v2TopicHB", flow_mode=self.amqpSession.flow_mode.window) - self.amqpSession.message_flow(destination="v2TopicHB", unit=self.amqpSession.credit_unit.byte, value=0xFFFFFFFFL) - self.amqpSession.message_flow(destination="v2TopicHB", unit=self.amqpSession.credit_unit.message, value=100) - - codec = Codec() - self._setHeader(codec, 'B') - msg = self._message(codec.encoded) - self._send(msg) - - return True # connection complete - - except Exception, e: - self.error = "Exception during connection setup: %s - %s" % (e.__class__.__name__, e) - self.conn_exc = e - if self.session.console: - self.session.console.brokerConnectionFailed(self) - return False # connection failed - - def _updateAgent(self, obj): - """ - Just received an object of class "org.apache.qpid.broker:agent", which - represents a V1 agent. Add or update the list of agent proxies. - """ - bankKey = str(obj.agentBank) - agent = None - if obj._deleteTime == 0: - try: - self.cv.acquire() - if bankKey not in self.agents: - # add new agent only if label is not filtered - if len(self.session.agent_filter) == 0 or obj.label in self.session.agent_filter: - agent = Agent(self, obj.agentBank, obj.label) - self.agents[bankKey] = agent - finally: - self.cv.release() - if agent and self.session.console: - self.session._newAgentCallback(agent) - else: - try: - self.cv.acquire() - agent = self.agents.pop(bankKey, None) - if agent: - agent.close() - finally: - self.cv.release() - if agent and self.session.console: - self.session._delAgentCallback(agent) - - def _addAgent(self, name, agent): - try: - self.cv.acquire() - self.agents[name] = agent - finally: - self.cv.release() - if self.session.console: - self.session._newAgentCallback(agent) - - def _ageAgents(self): - if (time() - self.last_age_check) < self.session.agent_heartbeat_min: - # don't age if it's too soon - return - self.cv.acquire() - try: - to_delete = [] - to_notify = [] - for key in self.agents: - if self.agents[key].isOld(): - to_delete.append(key) - for key in to_delete: - agent = self.agents.pop(key) - agent.close() - to_notify.append(agent) - self.last_age_check = time() - finally: - self.cv.release() - if self.session.console: - for agent in to_notify: - self.session._delAgentCallback(agent) - - def _v2SendAgentLocate(self, predicate=[]): - """ - Broadcast an agent-locate request to cause all agents in the domain to tell us who they are. - """ - # @todo: send locate only to those agents in agent_filter? - dp = self.amqpSession.delivery_properties() - dp.routing_key = "console.request.agent_locate" - mp = self.amqpSession.message_properties() - mp.content_type = "amqp/list" - if self.saslUser: - mp.user_id = self.saslUser - mp.app_id = "qmf2" - mp.reply_to = self.amqpSession.reply_to("qmf.default.direct", self.v2_direct_queue) - mp.application_headers = {'qmf.opcode':'_agent_locate_request'} - sendCodec = Codec() - sendCodec.write_list(predicate) - msg = Message(dp, mp, sendCodec.encoded) - self._send(msg, "qmf.default.topic") - - def _setHeader(self, codec, opcode, seq=0): - """ Compose the header of a management message. """ - codec.write_uint8(ord('A')) - codec.write_uint8(ord('M')) - codec.write_uint8(ord('2')) - codec.write_uint8(ord(opcode)) - codec.write_uint32(seq) - - def _checkHeader(self, codec): - """ Check the header of a management message and extract the opcode and class. """ - try: - octet = chr(codec.read_uint8()) - if octet != 'A': - return None, None - octet = chr(codec.read_uint8()) - if octet != 'M': - return None, None - octet = chr(codec.read_uint8()) - if octet != '2': - return None, None - opcode = chr(codec.read_uint8()) - seq = codec.read_uint32() - return opcode, seq - except: - return None, None - - def _message (self, body, routing_key="broker", ttl=None): - dp = self.amqpSession.delivery_properties() - dp.routing_key = routing_key - if ttl: - dp.ttl = ttl - mp = self.amqpSession.message_properties() - mp.content_type = "x-application/qmf" - if self.saslUser: - mp.user_id = self.saslUser - mp.reply_to = self.amqpSession.reply_to("amq.direct", self.replyName) - return Message(dp, mp, body) - - def _send(self, msg, dest="qpid.management"): - self.amqpSession.message_transfer(destination=dest, message=msg) - - def _disconnect(self, err_info=None): - """ Called when the remote broker has disconnected. Re-initializes all - state associated with the broker. - """ - # notify any waiters, and callback - self.cv.acquire() - try: - if err_info is not None: - self.error = err_info - _agents = self.agents - self.agents = {} - for agent in _agents.itervalues(): - agent.close() - self.syncInFlight = False - self.reqsOutstanding = 0 - self.cv.notifyAll() - finally: - self.cv.release() - - if self.session.console: - for agent in _agents.itervalues(): - self.session._delAgentCallback(agent) - - def _shutdown(self, _timeout=10): - """ Disconnect from a broker, and release its resources. Errors are - ignored. - """ - if self.isAlive(): - # kick the thread - self.canceled = True - self.rcv_queue.put(Broker._q_item(Broker._q_item.type_wakeup, None)) - self.join(_timeout) - - # abort any pending transactions and delete agents - self._disconnect("broker shutdown") - - try: - if self.amqpSession: - self.amqpSession.close(); - except: - pass - self.amqpSession = None - try: - if self.conn: - self.conn.close(_timeout) - except: - pass - self.conn = None - self.connected = False - - def _waitForStable(self): - try: - self.cv.acquire() - if not self.connected: - return - if self.reqsOutstanding == 0: - return - self.syncInFlight = True - starttime = time() - while self.reqsOutstanding != 0: - self.cv.wait(self.SYNC_TIME) - if time() - starttime > self.SYNC_TIME: - raise RuntimeError("Timed out waiting for broker to synchronize") - finally: - self.cv.release() - - def _incOutstanding(self): - try: - self.cv.acquire() - self.reqsOutstanding += 1 - finally: - self.cv.release() - - def _decOutstanding(self): - try: - self.cv.acquire() - self.reqsOutstanding -= 1 - if self.reqsOutstanding == 0 and not self.topicBound: - self.topicBound = True - for key in self.session.v1BindingKeyList: - self.amqpSession.exchange_bind(exchange="qpid.management", - queue=self.topicName, binding_key=key) - if self.brokerSupportsV2: - # do not drop heartbeat indications when under load from data - # or event indications. Put heartbeats on their own dedicated - # queue. - # - for key in self.session.v2BindingKeyList: - if key.startswith("agent.ind.heartbeat"): - self.amqpSession.exchange_bind(exchange="qmf.default.topic", - queue=self.v2_topic_queue_hb, - binding_key=key) - else: - self.amqpSession.exchange_bind(exchange="qmf.default.topic", - queue=self.v2_topic_queue_ui, - binding_key=key) - # solicit an agent locate now, after we bind to agent.ind.data, - # because the agent locate will cause the agent to publish a - # data indication - and now we're able to receive it! - self._v2SendAgentLocate() - - - if self.reqsOutstanding == 0 and self.syncInFlight: - self.syncInFlight = False - self.cv.notify() - finally: - self.cv.release() - - def _v1Cb(self, msg): - """ Callback from session receive thread for V1 messages - """ - self.rcv_queue.put(Broker._q_item(Broker._q_item.type_v1msg, msg)) - - def _v1Dispatch(self, msg): - try: - self._v1DispatchProtected(msg) - except Exception, e: - print "EXCEPTION in Broker._v1Cb:", e - import traceback - traceback.print_exc() - - def _v1DispatchProtected(self, msg): - """ - This is the general message handler for messages received via the QMFv1 exchanges. - """ - try: - agent = None - agent_addr = None - mp = msg.get("message_properties") - ah = mp.application_headers - if ah and 'qmf.agent' in ah: - agent_addr = ah['qmf.agent'] - - if not agent_addr: - # - # See if we can determine the agent identity from the routing key - # - dp = msg.get("delivery_properties") - rkey = None - if dp and dp.routing_key: - rkey = dp.routing_key - items = rkey.split('.') - if len(items) >= 4: - if items[0] == 'console' and items[3].isdigit(): - agent_addr = str(items[3]) # The QMFv1 Agent Bank - if agent_addr != None and agent_addr in self.agents: - agent = self.agents[agent_addr] - - codec = Codec(msg.body) - alreadyTried = None - while True: - opcode, seq = self._checkHeader(codec) - - if not agent and not alreadyTried: - alreadyTried = True - try: - self.cv.acquire() - if seq in self.seqToAgentMap: - agent = self.seqToAgentMap[seq] - finally: - self.cv.release() - - if opcode == None: break - if opcode == 'b': self.session._handleBrokerResp (self, codec, seq) - elif opcode == 'p': self.session._handlePackageInd (self, codec, seq) - elif opcode == 'q': self.session._handleClassInd (self, codec, seq) - elif opcode == 's': self.session._handleSchemaResp (self, codec, seq, agent_addr) - elif opcode == 'h': self.session._handleHeartbeatInd (self, codec, seq, msg) - elif opcode == 'z': self.session._handleCommandComplete (self, codec, seq, agent) - elif agent: - agent._handleQmfV1Message(opcode, seq, mp, ah, codec) - agent.touch() # mark agent as being alive - - finally: # always ack the message! - try: - # ignore failures as the session may be shutting down... - self.amqpSession.receiver._completed.add(msg.id) - self.amqpSession.channel.session_completed(self.amqpSession.receiver._completed) - except: - pass - - - def _v2Cb(self, msg): - """ Callback from session receive thread for V2 messages - """ - self.rcv_queue.put(Broker._q_item(Broker._q_item.type_v2msg, msg)) - - def _v2Dispatch(self, msg): - try: - self._v2DispatchProtected(msg) - except Exception, e: - print "EXCEPTION in Broker._v2Cb:", e - import traceback - traceback.print_exc() - - def _v2DispatchProtected(self, msg): - """ - This is the general message handler for messages received via QMFv2 exchanges. - """ - try: - mp = msg.get("message_properties") - ah = mp["application_headers"] - codec = Codec(msg.body) - - if 'qmf.opcode' in ah: - opcode = ah['qmf.opcode'] - if mp.content_type == "amqp/list": - try: - content = codec.read_list() - if not content: - content = [] - except: - # malformed list - ignore - content = None - elif mp.content_type == "amqp/map": - try: - content = codec.read_map() - if not content: - content = {} - except: - # malformed map - ignore - content = None - else: - content = None - - if content != None: - ## - ## Directly handle agent heartbeats and agent locate responses as these are broker-scope (they are - ## used to maintain the broker's list of agent proxies. - ## - if opcode == '_agent_heartbeat_indication': self.session._v2HandleHeartbeatInd(self, mp, ah, content) - elif opcode == '_agent_locate_response': self.session._v2HandleAgentLocateRsp(self, mp, ah, content) - else: - ## - ## All other opcodes are agent-scope and are forwarded to the agent proxy representing the sender - ## of the message. - ## - # the broker's agent is mapped to index ['0'] - agentName = ah['qmf.agent'] - v = agentName.split(":") - if agentName == 'broker' or (len(v) >= 2 and v[0] == 'apache.org' - and v[1] == 'qpidd'): - agentName = '0' - if agentName in self.agents: - agent = self.agents[agentName] - agent._handleQmfV2Message(opcode, mp, ah, content) - agent.touch() - - finally: # always ack the message! - try: - # ignore failures as the session may be shutting down... - self.amqpSession.receiver._completed.add(msg.id) - self.amqpSession.channel.session_completed(self.amqpSession.receiver._completed) - except: - pass - - def _exceptionCb(self, data): - """ Exception notification callback from session receive thread. - """ - self.cv.acquire() - try: - self.connected = False - self.error = "exception received from messaging layer: %s" % str(data) - finally: - self.cv.release() - self.rcv_queue.put(Broker._q_item(Broker._q_item.type_wakeup, None)) - - def run(self): - """ Main body of the running thread. """ - - # First, attempt a connection. In the unmanaged case, - # failure to connect needs to cause the Broker() - # constructor to raise an exception. - delay = self.DELAY_MIN - while not self.canceled: - if self._tryToConnect(): # connection up - break - # unmanaged connection - fail & wake up constructor - if not self.session.manageConnections: - self.ready.release() - return - # managed connection - try again - count = 0 - while not self.canceled and count < delay: - sleep(1) - count += 1 - if delay < self.DELAY_MAX: - delay *= self.DELAY_FACTOR - - if self.canceled: - self.ready.release() - return - - # connection successful! - self.cv.acquire() - try: - self.connected = True - finally: - self.cv.release() - - self.session._handleBrokerConnect(self) - self.ready.release() - - while not self.canceled: - - try: - item = self.rcv_queue.get(timeout=self.session.agent_heartbeat_min) - except Empty: - item = None - - while not self.canceled and item is not None: - - if not self.connected: - # connection failure - while item: - # drain the queue - try: - item = self.rcv_queue.get(block=False) - except Empty: - item = None - break - - self._disconnect() # clean up any pending agents - self.session._handleError(self.error) - self.session._handleBrokerDisconnect(self) - - if not self.session.manageConnections: - return # do not attempt recovery - - # retry connection setup - delay = self.DELAY_MIN - while not self.canceled: - if self._tryToConnect(): - break - # managed connection - try again - count = 0 - while not self.canceled and count < delay: - sleep(1) - count += 1 - if delay < self.DELAY_MAX: - delay *= self.DELAY_FACTOR - - if self.canceled: - return - - # connection successful! - self.cv.acquire() - try: - self.connected = True - finally: - self.cv.release() - - self.session._handleBrokerConnect(self) - - elif item.typecode == Broker._q_item.type_v1msg: - self._v1Dispatch(item.data) - elif item.typecode == Broker._q_item.type_v2msg: - self._v2Dispatch(item.data) - - try: - item = self.rcv_queue.get(block=False) - except Empty: - item = None - - # queue drained, age the agents... - if not self.canceled: - self._ageAgents() - -#=================================================================================================== -# Agent -#=================================================================================================== -class Agent: - """ - This class represents a proxy for a remote agent being managed - """ - def __init__(self, broker, agentBank, label, isV2=False, interval=0): - self.broker = broker - self.session = broker.session - self.schemaCache = self.session.schemaCache - self.brokerBank = broker.getBrokerBank() - self.agentBank = str(agentBank) - self.label = label - self.isV2 = isV2 - self.heartbeatInterval = 0 - if interval: - if interval < self.session.agent_heartbeat_min: - self.heartbeatInterval = self.session.agent_heartbeat_min - else: - self.heartbeatInterval = interval - self.lock = Lock() - self.seqMgr = self.session.seqMgr - self.contextMap = {} - self.unsolicitedContext = RequestContext(self, self) - self.lastSeenTime = time() - self.closed = None - self.epoch = 0 - self.schema_timestamp = None - - - def _checkClosed(self): - if self.closed: - raise Exception("Agent is disconnected") - - - def __call__(self, **kwargs): - """ - This is the handler for unsolicited stuff received from the agent - """ - if 'qmf_object' in kwargs: - if self.session.console: - obj = kwargs['qmf_object'] - if self.session.class_filter and obj.getClassKey(): - # slow path: check classKey against event_filter - pname = obj.getClassKey().getPackageName() - cname = obj.getClassKey().getClassName() - if ((pname, cname) not in self.session.class_filter - and (pname, None) not in self.session.class_filter): - return - if obj.getProperties(): - self.session.console.objectProps(self.broker, obj) - if obj.getStatistics(): - # QMFv2 objects may also contain statistic updates - self.session.console.objectStats(self.broker, obj) - elif 'qmf_object_stats' in kwargs: - if self.session.console: - obj = kwargs['qmf_object_stats'] - if len(self.session.class_filter) == 0: - self.session.console.objectStats(self.broker, obj) - elif obj.getClassKey(): - # slow path: check classKey against event_filter - pname = obj.getClassKey().getPackageName() - cname = obj.getClassKey().getClassName() - if ((pname, cname) in self.session.class_filter - or (pname, None) in self.session.class_filter): - self.session.console.objectStats(self.broker, obj) - elif 'qmf_event' in kwargs: - if self.session.console: - event = kwargs['qmf_event'] - if len(self.session.event_filter) == 0: - self.session.console.event(self.broker, event) - elif event.classKey: - # slow path: check classKey against event_filter - pname = event.classKey.getPackageName() - ename = event.classKey.getClassName() - if ((pname, ename) in self.session.event_filter - or (pname, None) in self.session.event_filter): - self.session.console.event(self.broker, event) - elif 'qmf_schema_id' in kwargs: - ckey = kwargs['qmf_schema_id'] - new_pkg, new_cls = self.session.schemaCache.declareClass(ckey) - if self.session.console: - if new_pkg: - self.session._newPackageCallback(ckey.getPackageName()) - if new_cls: - # translate V2's string based type value to legacy - # integer value for backward compatibility - cls_type = ckey.getType() - if str(cls_type) == ckey.TYPE_DATA: - cls_type = 1 - elif str(cls_type) == ckey.TYPE_EVENT: - cls_type = 2 - self.session._newClassCallback(cls_type, ckey) - - def touch(self): - if self.heartbeatInterval: - self.lastSeenTime = time() - - - def setEpoch(self, epoch): - self.epoch = epoch - - def update_schema_timestamp(self, timestamp): - """ Check the latest schema timestamp from the agent V2 heartbeat. Issue a - query for all packages & classes should the timestamp change. - """ - self.lock.acquire() - try: - if self.schema_timestamp == timestamp: - return - self.schema_timestamp = timestamp - - context = RequestContext(self, self) - sequence = self.seqMgr._reserve(context) - - self.contextMap[sequence] = context - context.setSequence(sequence) - - finally: - self.lock.release() - - self._v2SendSchemaIdQuery(sequence, {}) - - - def epochMismatch(self, epoch): - if epoch == 0 or self.epoch == 0: - return None - if epoch == self.epoch: - return None - return True - - - def isOld(self): - if self.heartbeatInterval == 0: - return None - if time() - self.lastSeenTime > (self.session.agent_heartbeat_miss * self.heartbeatInterval): - return True - return None - - - def close(self): - self.closed = True - copy = {} - try: - self.lock.acquire() - for seq in self.contextMap: - copy[seq] = self.contextMap[seq] - finally: - self.lock.release() - - for seq in copy: - context = copy[seq] - context.cancel("Agent disconnected") - self.seqMgr._release(seq) - - - def __repr__(self): - if self.isV2: - ver = "v2" - else: - ver = "v1" - return "Agent(%s) at bank %d.%s (%s)" % (ver, self.brokerBank, self.agentBank, self.label) - - - def getBroker(self): - return self.broker - - - def getBrokerBank(self): - return self.brokerBank - - - def getAgentBank(self): - return self.agentBank - - - def getV2RoutingKey(self): - if self.agentBank == '0': - return 'broker' - return self.agentBank - - - def getObjects(self, notifiable=None, **kwargs): - """ Get a list of objects from QMF agents. - All arguments are passed by name(keyword). - - If 'notifiable' is None (default), this call will block until completion or timeout. - If supplied, notifiable is assumed to be a callable object that will be called when the - list of queried objects arrives. The single argument to the call shall be a list of - the returned objects. - - The class for queried objects may be specified in one of the following ways: - - _schema = <schema> - supply a schema object returned from getSchema. - _key = <key> - supply a classKey from the list returned by getClasses. - _class = <name> - supply a class name as a string. If the class name exists - in multiple packages, a _package argument may also be supplied. - _objectId = <id> - get the object referenced by the object-id - - The default timeout for this synchronous operation is 60 seconds. To change the timeout, - use the following argument: - - _timeout = <time in seconds> - - If additional arguments are supplied, they are used as property selectors. For example, - if the argument name="test" is supplied, only objects whose "name" property is "test" - will be returned in the result. - """ - self._checkClosed() - if notifiable: - if not callable(notifiable): - raise Exception("notifiable object must be callable") - - # - # Isolate the selectors from the kwargs - # - selectors = {} - for key in kwargs: - value = kwargs[key] - if key[0] != '_': - selectors[key] = value - - # - # Allocate a context to track this asynchronous request. - # - context = RequestContext(self, notifiable, selectors) - sequence = self.seqMgr._reserve(context) - try: - self.lock.acquire() - self.contextMap[sequence] = context - context.setSequence(sequence) - finally: - self.lock.release() - - # - # Compose and send the query message to the agent using the appropriate protocol for the - # agent's QMF version. - # - if self.isV2: - self._v2SendGetQuery(sequence, kwargs) - else: - self.broker._setSequence(sequence, self) - self._v1SendGetQuery(sequence, kwargs) - - # - # If this is a synchronous call, block and wait for completion. - # - if not notifiable: - timeout = 60 - if '_timeout' in kwargs: - timeout = kwargs['_timeout'] - context.waitForSignal(timeout) - if context.exception: - raise Exception(context.exception) - result = context.queryResults - return result - - - def _clearContext(self, sequence): - try: - self.lock.acquire() - try: - self.contextMap.pop(sequence) - self.seqMgr._release(sequence) - except KeyError: - pass # @todo - shouldn't happen, log a warning. - finally: - self.lock.release() - - - def _schemaInfoFromV2Agent(self): - """ - We have just received new schema information from this agent. Check to see if there's - more work that can now be done. - """ - try: - self.lock.acquire() - copy_of_map = {} - for item in self.contextMap: - copy_of_map[item] = self.contextMap[item] - finally: - self.lock.release() - - self.unsolicitedContext.reprocess() - for context in copy_of_map: - copy_of_map[context].reprocess() - - - def _handleV1Completion(self, sequence, code, text): - """ - Called if one of this agent's V1 commands completed - """ - context = None - try: - self.lock.acquire() - if sequence in self.contextMap: - context = self.contextMap[sequence] - finally: - self.lock.release() - - if context: - if code != 0: - ex = "Error %d: %s" % (code, text) - context.setException(ex) - context.signal() - self.broker._clearSequence(sequence) - - - def _v1HandleMethodResp(self, codec, seq): - """ - Handle a QMFv1 method response - """ - code = codec.read_uint32() - text = codec.read_str16() - outArgs = {} - self.broker._clearSequence(seq) - pair = self.seqMgr._release(seq) - if pair == None: - return - method, synchronous = pair - if code == 0: - for arg in method.arguments: - if arg.dir.find("O") != -1: - outArgs[arg.name] = self.session._decodeValue(codec, arg.type, self.broker) - result = MethodResult(code, text, outArgs) - if synchronous: - try: - self.broker.cv.acquire() - self.broker.syncResult = result - self.broker.syncInFlight = False - self.broker.cv.notify() - finally: - self.broker.cv.release() - else: - if self.session.console: - self.session.console.methodResponse(self.broker, seq, result) - - - def _v1HandleEventInd(self, codec, seq): - """ - Handle a QMFv1 event indication - """ - event = Event(self, codec) - self.unsolicitedContext.doEvent(event) - - - def _v1HandleContentInd(self, codec, sequence, prop=False, stat=False): - """ - Handle a QMFv1 content indication - """ - classKey = ClassKey(codec) - schema = self.schemaCache.getSchema(classKey) - if not schema: - return - - obj = Object(self, schema, codec, prop, stat) - if classKey.getPackageName() == "org.apache.qpid.broker" and classKey.getClassName() == "agent" and prop: - self.broker._updateAgent(obj) - - context = self.unsolicitedContext - try: - self.lock.acquire() - if sequence in self.contextMap: - context = self.contextMap[sequence] - finally: - self.lock.release() - - context.addV1QueryResult(obj, prop, stat) - - - def _v2HandleDataInd(self, mp, ah, content): - """ - Handle a QMFv2 data indication from the agent. Note: called from context - of the Broker thread. - """ - if content.__class__ != list: - return - - if mp.correlation_id: - try: - self.lock.acquire() - sequence = int(mp.correlation_id) - if sequence not in self.contextMap: - return - context = self.contextMap[sequence] - finally: - self.lock.release() - else: - context = self.unsolicitedContext - - kind = "_data" - if "qmf.content" in ah: - kind = ah["qmf.content"] - if kind == "_data": - for omap in content: - context.addV2QueryResult(omap) - context.processV2Data() - if 'partial' not in ah: - context.signal() - - elif kind == "_event": - for omap in content: - event = Event(self, v2Map=omap) - if event.classKey is None or event.schema: - # schema optional or present - context.doEvent(event) - else: - # schema not optional and not present - if context.addPendingEvent(event): - self._v2SendSchemaRequest(event.classKey) - - elif kind == "_schema_id": - for sid in content: - try: - ckey = ClassKey(sid) - except: - # @todo: log error - ckey = None - if ckey is not None: - # @todo: for now, the application cannot directly send a query for - # _schema_id. This request _must_ have been initiated by the framework - # in order to update the schema cache. - context.notifiable(qmf_schema_id=ckey) - - - def _v2HandleMethodResp(self, mp, ah, content): - """ - Handle a QMFv2 method response from the agent - """ - context = None - sequence = None - if mp.correlation_id: - try: - self.lock.acquire() - seq = int(mp.correlation_id) - finally: - self.lock.release() - else: - return - - pair = self.seqMgr._release(seq) - if pair == None: - return - method, synchronous = pair - - result = MethodResult(0, 'OK', content['_arguments']) - if synchronous: - try: - self.broker.cv.acquire() - self.broker.syncResult = result - self.broker.syncInFlight = False - self.broker.cv.notify() - finally: - self.broker.cv.release() - else: - if self.session.console: - self.session.console.methodResponse(self.broker, seq, result) - - def _v2HandleException(self, mp, ah, content): - """ - Handle a QMFv2 exception - """ - context = None - if mp.correlation_id: - try: - self.lock.acquire() - seq = int(mp.correlation_id) - finally: - self.lock.release() - else: - return - - values = {} - if '_values' in content: - values = content['_values'] - - code = 7 - text = "error" - if 'error_code' in values: - code = values['error_code'] - if 'error_text' in values: - text = values['error_text'] - - pair = self.seqMgr._release(seq) - if pair == None: - return - - if pair.__class__ == RequestContext: - pair.cancel(text) - return - - method, synchronous = pair - - result = MethodResult(code, text, {}) - if synchronous: - try: - self.broker.cv.acquire() - self.broker.syncResult = result - self.broker.syncInFlight = False - self.broker.cv.notify() - finally: - self.broker.cv.release() - else: - if self.session.console: - self.session.console.methodResponse(self.broker, seq, result) - - - def _v1SendGetQuery(self, sequence, kwargs): - """ - Send a get query to a QMFv1 agent. - """ - # - # Build the query map - # - query = {} - if '_class' in kwargs: - query['_class'] = kwargs['_class'] - if '_package' in kwargs: - query['_package'] = kwargs['_package'] - elif '_key' in kwargs: - key = kwargs['_key'] - query['_class'] = key.getClassName() - query['_package'] = key.getPackageName() - elif '_objectId' in kwargs: - query['_objectid'] = kwargs['_objectId'].__repr__() - - # - # Construct and transmit the message - # - sendCodec = Codec() - self.broker._setHeader(sendCodec, 'G', sequence) - sendCodec.write_map(query) - smsg = self.broker._message(sendCodec.encoded, "agent.%d.%s" % (self.brokerBank, self.agentBank)) - self.broker._send(smsg) - - - def _v2SendQuery(self, query, sequence): - """ - Given a query map, construct and send a V2 Query message. - """ - dp = self.broker.amqpSession.delivery_properties() - dp.routing_key = self.getV2RoutingKey() - mp = self.broker.amqpSession.message_properties() - mp.content_type = "amqp/map" - if self.broker.saslUser: - mp.user_id = self.broker.saslUser - mp.correlation_id = str(sequence) - mp.app_id = "qmf2" - mp.reply_to = self.broker.amqpSession.reply_to("qmf.default.direct", self.broker.v2_direct_queue) - mp.application_headers = {'qmf.opcode':'_query_request'} - sendCodec = Codec() - sendCodec.write_map(query) - msg = Message(dp, mp, sendCodec.encoded) - self.broker._send(msg, "qmf.default.direct") - - - def _v2SendGetQuery(self, sequence, kwargs): - """ - Send a get query to a QMFv2 agent. - """ - # - # Build the query map - # - query = {'_what': 'OBJECT'} - if '_class' in kwargs: - schemaMap = {'_class_name': kwargs['_class']} - if '_package' in kwargs: - schemaMap['_package_name'] = kwargs['_package'] - query['_schema_id'] = schemaMap - elif '_key' in kwargs: - query['_schema_id'] = kwargs['_key'].asMap() - elif '_objectId' in kwargs: - query['_object_id'] = kwargs['_objectId'].asMap() - - self._v2SendQuery(query, sequence) - - - def _v2SendSchemaIdQuery(self, sequence, kwargs): - """ - Send a query for all schema ids to a QMFv2 agent. - """ - # - # Build the query map - # - query = {'_what': 'SCHEMA_ID'} - # @todo - predicate support. For now, return all known schema ids. - - self._v2SendQuery(query, sequence) - - - def _v2SendSchemaRequest(self, schemaId): - """ - Send a query to an agent to request details on a particular schema class. - IMPORTANT: This function currently sends a QMFv1 schema-request to the address of - the agent. The agent will send its response to amq.direct/<our-key>. - Eventually, this will be converted to a proper QMFv2 schema query. - """ - sendCodec = Codec() - seq = self.seqMgr._reserve(None) - self.broker._setHeader(sendCodec, 'S', seq) - schemaId.encode(sendCodec) - smsg = self.broker._message(sendCodec.encoded, self.agentBank) - self.broker._send(smsg, "qmf.default.direct") - - - def _handleQmfV1Message(self, opcode, seq, mp, ah, codec): - """ - Process QMFv1 messages arriving from an agent. Note well: this method is - called from the context of the Broker thread. - """ - if opcode == 'm': self._v1HandleMethodResp(codec, seq) - elif opcode == 'e': self._v1HandleEventInd(codec, seq) - elif opcode == 'c': self._v1HandleContentInd(codec, seq, prop=True) - elif opcode == 'i': self._v1HandleContentInd(codec, seq, stat=True) - elif opcode == 'g': self._v1HandleContentInd(codec, seq, prop=True, stat=True) - - - def _handleQmfV2Message(self, opcode, mp, ah, content): - """ - Process QMFv2 messages arriving from an agent. Note well: this method is - called from the context of the Broker thread. - """ - if opcode == '_data_indication': self._v2HandleDataInd(mp, ah, content) - elif opcode == '_query_response': self._v2HandleDataInd(mp, ah, content) - elif opcode == '_method_response': self._v2HandleMethodResp(mp, ah, content) - elif opcode == '_exception': self._v2HandleException(mp, ah, content) - - -#=================================================================================================== -# RequestContext -#=================================================================================================== -class RequestContext(object): - """ - This class tracks an asynchronous request sent to an agent. - TODO: Add logic for client-side selection and filtering deleted objects from get-queries - """ - def __init__(self, agent, notifiable, selectors={}): - self.sequence = None - self.agent = agent - self.schemaCache = self.agent.schemaCache - self.notifiable = notifiable - self.selectors = selectors - self.startTime = time() - self.rawQueryResults = [] - self.queryResults = [] - self.pendingEvents = {} - self.exception = None - self.waitingForSchema = None - self.pendingSignal = None - self.cv = Condition() - self.blocked = notifiable == None - - - def setSequence(self, sequence): - self.sequence = sequence - - - def addV1QueryResult(self, data, has_props, has_stats): - values = {} - if has_props: - for prop, val in data.getProperties(): - values[prop.name] = val - if has_stats: - for stat, val in data.getStatistics(): - values[stat.name] = val - for key in values: - val = values[key] - if key in self.selectors and val != self.selectors[key]: - return - - if self.notifiable: - if has_props: - self.notifiable(qmf_object=data) - if has_stats: - self.notifiable(qmf_object_stats=data) - else: - self.queryResults.append(data) - - - def addV2QueryResult(self, data): - values = data['_values'] - for key in values: - val = values[key] - if key in self.selectors: - sel_val = self.selectors[key] - if sel_val.__class__ == ObjectId: - val = ObjectId(val, agentName=self.agent.getAgentBank()) - if val != sel_val: - return - self.rawQueryResults.append(data) - - def addPendingEvent(self, event): - """ Stores a received event that is pending a schema. Returns True if this - event is the first instance of a given schema identifier. - """ - self.cv.acquire() - try: - if event.classKey in self.pendingEvents: - self.pendingEvents[event.classKey].append((event, time())) - return False - self.pendingEvents[event.classKey] = [(event, time())] - return True - finally: - self.cv.release() - - def processPendingEvents(self): - """ Walk the pending events looking for schemas that are now - available. Remove any events that now have schema, and process them. - """ - keysToDelete = [] - events = [] - self.cv.acquire() - try: - for key in self.pendingEvents.iterkeys(): - schema = self.schemaCache.getSchema(key) - if schema: - keysToDelete.append(key) - for item in self.pendingEvents[key]: - # item is (timestamp, event-obj) tuple. - # hack: I have no idea what a valid lifetime for an event - # should be. 60 seconds??? - if (time() - item[1]) < 60: - item[0].schema = schema - events.append(item[0]) - for key in keysToDelete: - self.pendingEvents.pop(key) - finally: - self.cv.release() - for event in events: - self.doEvent(event) - - def doEvent(self, data): - if self.notifiable: - self.notifiable(qmf_event=data) - - - def setException(self, ex): - self.exception = ex - - - def getAge(self): - return time() - self.startTime - - - def cancel(self, exception): - self.setException(exception) - try: - self.cv.acquire() - self.blocked = None - self.waitingForSchema = None - self.cv.notify() - finally: - self.cv.release() - self._complete() - - - def waitForSignal(self, timeout): - try: - self.cv.acquire() - while self.blocked: - if (time() - self.startTime) > timeout: - self.exception = "Request timed out after %d seconds" % timeout - return - self.cv.wait(1) - finally: - self.cv.release() - - - def signal(self): - try: - self.cv.acquire() - if self.waitingForSchema: - self.pendingSignal = True - return - else: - self.blocked = None - self.cv.notify() - finally: - self.cv.release() - self._complete() - - - def _complete(self): - if self.notifiable: - if self.exception: - self.notifiable(qmf_exception=self.exception) - else: - self.notifiable(qmf_complete=True) - - if self.sequence: - self.agent._clearContext(self.sequence) - - - def processV2Data(self): - """ - Attempt to make progress on the entries in the raw_query_results queue. If an entry has a schema - that is in our schema cache, process it. Otherwise, send a request for the schema information - to the agent that manages the object. - """ - schemaId = None - queryResults = [] - try: - self.cv.acquire() - if self.waitingForSchema: - return - while (not self.waitingForSchema) and len(self.rawQueryResults) > 0: - head = self.rawQueryResults[0] - schemaId = self._getSchemaIdforV2ObjectLH(head) - schema = self.schemaCache.getSchema(schemaId) - if schema: - obj = Object(self.agent, schema, v2Map=head, agentName=self.agent.agentBank) - queryResults.append(obj) - self.rawQueryResults.pop(0) - else: - self.waitingForSchema = True - finally: - self.cv.release() - - if self.waitingForSchema: - self.agent._v2SendSchemaRequest(schemaId) - - for result in queryResults: - key = result.getClassKey() - if key.getPackageName() == "org.apache.qpid.broker" and key.getClassName() == "agent": - self.agent.broker._updateAgent(result) - if self.notifiable: - self.notifiable(qmf_object=result) - else: - self.queryResults.append(result) - - complete = None - try: - self.cv.acquire() - if not self.waitingForSchema and self.pendingSignal: - self.blocked = None - self.cv.notify() - complete = True - finally: - self.cv.release() - - if complete: - self._complete() - - - def reprocess(self): - """ - New schema information has been added to the schema-cache. Clear our 'waiting' status - and see if we can make more progress on any pending inbound events/objects. - """ - try: - self.cv.acquire() - self.waitingForSchema = None - finally: - self.cv.release() - self.processV2Data() - self.processPendingEvents() - - def _getSchemaIdforV2ObjectLH(self, data): - """ - Given a data map, extract the schema-identifier. - """ - if data.__class__ != dict: - return None - if '_schema_id' in data: - return ClassKey(data['_schema_id']) - return None - - -#=================================================================================================== -# Event -#=================================================================================================== -class Event: - """ """ - def __init__(self, agent, codec=None, v2Map=None): - self.agent = agent - self.session = agent.session - self.broker = agent.broker - - if isinstance(v2Map,dict): - self.isV2 = True - self.classKey = None - self.schema = None - try: - self.arguments = v2Map["_values"] - self.timestamp = long(v2Map["_timestamp"]) - self.severity = v2Map["_severity"] - if "_schema_id" in v2Map: - self.classKey = ClassKey(v2Map["_schema_id"]) - self.classKey._setType(ClassKey.TYPE_EVENT) - except: - raise Exception("Invalid event object: %s " % str(v2Map)) - if self.classKey is not None: - self.schema = self.session.schemaCache.getSchema(self.classKey) - - elif codec is not None: - self.isV2 = None - self.classKey = ClassKey(codec) - self.classKey._setType(ClassKey.TYPE_EVENT) - self.timestamp = codec.read_int64() - self.severity = codec.read_uint8() - self.arguments = {} - self.schema = self.session.schemaCache.getSchema(self.classKey) - if not self.schema: - return - for arg in self.schema.arguments: - self.arguments[arg.name] = self.session._decodeValue(codec, arg.type, - self.broker) - else: - raise Exception("No constructor for event object.") - - - def __repr__(self): - if self.schema == None: - return "<uninterpretable>" - out = strftime("%c", gmtime(self.timestamp / 1000000000)) - out += " " + self._sevName() + " " + self.classKey.getPackageName() + ":" + self.classKey.getClassName() - out += " broker=" + str(self.broker.getUrl()) - for arg in self.schema.arguments: - disp = self.session._displayValue(self.arguments[arg.name], arg.type).encode("utf8") - if " " in disp: - disp = "\"" + disp + "\"" - out += " " + arg.name + "=" + disp - return out - - def _sevName(self): - if self.severity == 0 : return "EMER " - if self.severity == 1 : return "ALERT" - if self.severity == 2 : return "CRIT " - if self.severity == 3 : return "ERROR" - if self.severity == 4 : return "WARN " - if self.severity == 5 : return "NOTIC" - if self.severity == 6 : return "INFO " - if self.severity == 7 : return "DEBUG" - return "INV-%d" % self.severity - - def getClassKey(self): - return self.classKey - - def getArguments(self): - return self.arguments - - def getTimestamp(self): - return self.timestamp - - def getSchema(self): - return self.schema - - -#=================================================================================================== -# SequenceManager -#=================================================================================================== -class SequenceManager: - """ Manage sequence numbers for asynchronous method calls """ - def __init__(self): - self.lock = Lock() - self.sequence = long(time()) # pseudo-randomize the start - self.pending = {} - - def _reserve(self, data): - """ Reserve a unique sequence number """ - try: - self.lock.acquire() - result = self.sequence - self.sequence = self.sequence + 1 - self.pending[result] = data - finally: - self.lock.release() - return result - - def _release(self, seq): - """ Release a reserved sequence number """ - data = None - try: - self.lock.acquire() - if seq in self.pending: - data = self.pending[seq] - del self.pending[seq] - finally: - self.lock.release() - return data - - -#=================================================================================================== -# DebugConsole -#=================================================================================================== -class DebugConsole(Console): - """ """ - def brokerConnected(self, broker): - print "brokerConnected:", broker - - def brokerConnectionFailed(self, broker): - print "brokerConnectionFailed:", broker - - def brokerDisconnected(self, broker): - print "brokerDisconnected:", broker - - def newPackage(self, name): - print "newPackage:", name - - def newClass(self, kind, classKey): - print "newClass:", kind, classKey - - def newAgent(self, agent): - print "newAgent:", agent - - def delAgent(self, agent): - print "delAgent:", agent - - def objectProps(self, broker, record): - print "objectProps:", record - - def objectStats(self, broker, record): - print "objectStats:", record - - def event(self, broker, event): - print "event:", event - - def heartbeat(self, agent, timestamp): - print "heartbeat:", agent - - def brokerInfo(self, broker): - print "brokerInfo:", broker - diff --git a/qpid/cpp/management/python/lib/qpidstore/__init__.py b/qpid/cpp/management/python/lib/qpidstore/__init__.py deleted file mode 100644 index d8a500d9d8..0000000000 --- a/qpid/cpp/management/python/lib/qpidstore/__init__.py +++ /dev/null @@ -1,19 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - diff --git a/qpid/cpp/management/python/lib/qpidstore/janal.py b/qpid/cpp/management/python/lib/qpidstore/janal.py deleted file mode 100644 index 1a892aca60..0000000000 --- a/qpid/cpp/management/python/lib/qpidstore/janal.py +++ /dev/null @@ -1,617 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -import jerr, jrnl -import os.path, sys - - -#== class EnqMap ============================================================== - -class EnqMap(object): - """Class for maintaining a map of enqueued records, indexing the rid against hdr, fid and transaction lock""" - - def __init__(self): - """Constructor""" - self.__map = {} - - def __str__(self): - """Print the contents of the map""" - return self.report(True, True) - - def add(self, fid, hdr, lock = False): - """Add a new record into the map""" - if hdr.rid in self.__map: - raise jerr.DuplicateRidError(hdr.rid) - self.__map[hdr.rid] = [fid, hdr, lock] - - def contains(self, rid): - """Return True if the map contains the given rid""" - return rid in self.__map - - def delete(self, rid): - """Delete the rid and its associated data from the map""" - if rid in self.__map: - if self.get_lock(rid): - raise jerr.DeleteLockedRecordError(rid) - del self.__map[rid] - else: - raise jerr.JWarning("ERROR: Deleting non-existent rid from EnqMap: rid=0x%x" % rid) - - def get(self, rid): - """Return a list [fid, hdr, lock] for the given rid""" - if self.contains(rid): - return self.__map[rid] - return None - - def get_fid(self, rid): - """Return the fid for the given rid""" - if self.contains(rid): - return self.__map[rid][0] - return None - - def get_hdr(self, rid): - """Return the header record for the given rid""" - if self.contains(rid): - return self.__map[rid][1] - return None - - def get_lock(self, rid): - """Return the transaction lock value for the given rid""" - if self.contains(rid): - return self.__map[rid][2] - return None - - def get_rec_list(self): - """Return a list of tuples (fid, hdr, lock) for all entries in the map""" - return self.__map.values() - - def lock(self, rid): - """Set the transaction lock for a given rid to True""" - if rid in self.__map: - if not self.__map[rid][2]: # locked - self.__map[rid][2] = True - else: - raise jerr.AlreadyLockedError(rid) - else: - raise jerr.JWarning("ERROR: Locking non-existent rid in EnqMap: rid=0x%x" % rid) - - def report(self, show_stats, show_records): - """Return a string containing a text report for all records in the map""" - if len(self.__map) == 0: - return "No enqueued records found." - rstr = "%d enqueued records found" % len(self.__map) - if show_records: - rstr += ":" - rid_list = self.__map.keys() - rid_list.sort() - for rid in rid_list: - if self.__map[rid][2]: - lock_str = " [LOCKED]" - else: - lock_str = "" - rstr += "\n lfid=%d %s %s" % (rec[0], rec[1], lock_str) - else: - rstr += "." - return rstr - - def rids(self): - """Return a list of rids in the map""" - return self.__map.keys() - - def size(self): - """Return the number of entries in the map""" - return len(self.__map) - - def unlock(self, rid): - """Set the transaction lock for a given rid to False""" - if rid in self.__map: - if self.__map[rid][2]: - self.__map[rid][2] = False - else: - raise jerr.NotLockedError(rid) - else: - raise jerr.NonExistentRecordError("unlock", rid) - - -#== class TxnMap ============================================================== - -class TxnMap(object): - """Transaction map, which maps xids to a list of outstanding actions""" - - def __init__(self, emap): - """Constructor, requires an existing EnqMap instance""" - self.__emap = emap - self.__map = {} - - def __str__(self): - """Print the contents of the map""" - return self.report(True, True) - - def add(self, fid, hdr): - """Add a new transactional record into the map""" - if isinstance(hdr, jrnl.DeqRec): - try: - self.__emap.lock(hdr.deq_rid) - except jerr.JWarning: - # Not in emap, look for rid in tmap - l = self.find_rid(hdr.deq_rid, hdr.xid) - if l != None: - if l[2]: - raise jerr.AlreadyLockedError(hdr.deq_rid) - l[2] = True - if hdr.xid in self.__map: - self.__map[hdr.xid].append([fid, hdr, False]) # append to existing list - else: - self.__map[hdr.xid] = [[fid, hdr, False]] # create new list - - def contains(self, xid): - """Return True if the xid exists in the map; False otherwise""" - return xid in self.__map - - def delete(self, hdr): - """Remove a transaction record from the map using either a commit or abort header""" - if hdr.magic[-1] == "c": - return self._commit(hdr.xid) - if hdr.magic[-1] == "a": - self._abort(hdr.xid) - else: - raise jerr.InvalidRecordTypeError("delete from TxnMap", hdr.magic, hdr.rid) - - def find_rid(self, rid, xid_hint = None): - """ Search for and return map list with supplied rid. If xid_hint is supplied, try that xid first""" - if xid_hint != None and self.contains(xid_hint): - for l in self.__map[xid_hint]: - if l[1].rid == rid: - return l - for xid in self.__map.iterkeys(): - if xid_hint == None or xid != xid_hint: - for l in self.__map[xid]: - if l[1].rid == rid: - return l - - def get(self, xid): - """Return a list of operations for the given xid""" - if self.contains(xid): - return self.__map[xid] - - def report(self, show_stats, show_records): - """Return a string containing a text report for all records in the map""" - if len(self.__map) == 0: - return "No outstanding transactions found." - rstr = "%d outstanding transactions found" % len(self.__map) - if show_records: - rstr += ":" - for xid, tup in self.__map.iteritems(): - rstr += "\n xid=%s:" % jrnl.Utils.format_xid(xid) - for i in tup: - rstr += "\n %s" % str(i[1]) - else: - rstr += "." - return rstr - - def size(self): - """Return the number of xids in the map""" - return len(self.__map) - - def xids(self): - """Return a list of xids in the map""" - return self.__map.keys() - - def _abort(self, xid): - """Perform an abort operation for the given xid record""" - for _, hdr, _ in self.__map[xid]: - if isinstance(hdr, jrnl.DeqRec): - try: - self.__emap.unlock(hdr.deq_rid) - except jerr.NonExistentRecordError, err: # Not in emap, look in current transaction op list (TPL) - found_rid = False - for _, hdr1, _ in self.__map[xid]: - if isinstance(hdr1, jrnl.EnqRec) and hdr1.rid == hdr.deq_rid: - found_rid = True - break - if not found_rid: # Not found in current transaction op list, re-throw error - raise err - del self.__map[xid] - - def _commit(self, xid): - """Perform a commit operation for the given xid record""" - mismatch_list = [] - for fid, hdr, lock in self.__map[xid]: - if isinstance(hdr, jrnl.EnqRec): - self.__emap.add(fid, hdr, lock) # Transfer enq to emap - else: - if self.__emap.contains(hdr.deq_rid): - self.__emap.unlock(hdr.deq_rid) - self.__emap.delete(hdr.deq_rid) - else: - mismatch_list.append("0x%x" % hdr.deq_rid) - del self.__map[xid] - return mismatch_list - -#== class JrnlAnalyzer ======================================================== - -class JrnlAnalyzer(object): - """ - This class analyzes a set of journal files and determines which is the last to be written - (the newest file), and hence which should be the first to be read for recovery (the oldest - file). - - The analysis is performed on construction; the contents of the JrnlInfo object passed provide - the recovery details. - """ - - def __init__(self, jinf): - """Constructor""" - self.__oldest = None - self.__jinf = jinf - self.__flist = self._analyze() - - def __str__(self): - """String representation of this JrnlAnalyzer instance, will print out results of analysis.""" - ostr = "Journal files analyzed in directory %s (* = earliest full):\n" % self.__jinf.get_current_dir() - if self.is_empty(): - ostr += " <All journal files are empty>\n" - else: - for tup in self.__flist: - tmp = " " - if tup[0] == self.__oldest[0]: - tmp = "*" - ostr += " %s %s: owi=%-5s rid=0x%x, fro=0x%x ts=%s\n" % (tmp, os.path.basename(tup[1]), tup[2], - tup[3], tup[4], tup[5]) - for i in range(self.__flist[-1][0] + 1, self.__jinf.get_num_jrnl_files()): - ostr += " %s.%04x.jdat: <empty>\n" % (self.__jinf.get_jrnl_base_name(), i) - return ostr - - # Analysis - - def get_oldest_file(self): - """Return a tuple (ordnum, jfn, owi, rid, fro, timestamp) for the oldest data file found in the journal""" - return self.__oldest - - def get_oldest_file_index(self): - """Return the ordinal number of the oldest data file found in the journal""" - if self.is_empty(): - return None - return self.__oldest[0] - - def is_empty(self): - """Return true if the analysis found that the journal file has never been written to""" - return len(self.__flist) == 0 - - def _analyze(self): - """Perform the journal file analysis by reading and comparing the file headers of each journal data file""" - owi_found = False - flist = [] - for i in range(0, self.__jinf.get_num_jrnl_files()): - jfn = os.path.join(self.__jinf.get_current_dir(), "%s.%04x.jdat" % (self.__jinf.get_jrnl_base_name(), i)) - fhandle = open(jfn) - fhdr = jrnl.Utils.load(fhandle, jrnl.Hdr) - if fhdr.empty(): - break - this_tup = (i, jfn, fhdr.owi(), fhdr.rid, fhdr.fro, fhdr.timestamp_str()) - flist.append(this_tup) - if i == 0: - init_owi = fhdr.owi() - self.__oldest = this_tup - elif fhdr.owi() != init_owi and not owi_found: - self.__oldest = this_tup - owi_found = True - return flist - - -#== class JrnlReader ==================================================== - -class JrnlReader(object): - """ - This class contains an Enqueue Map (emap), a transaction map (tmap) and a transaction - object list (txn_obj_list) which are populated by reading the journals from the oldest - to the newest and analyzing each record. The JrnlInfo and JrnlAnalyzer - objects supplied on construction provide the information used for the recovery. - - The analysis is performed on construction. - """ - - def __init__(self, jinfo, jra, qflag = False, rflag = False, vflag = False): - """Constructor, which reads all """ - self._jinfo = jinfo - self._jra = jra - self._qflag = qflag - self._rflag = rflag - self._vflag = vflag - - # test callback functions for CSV tests - self._csv_store_chk = None - self._csv_start_cb = None - self._csv_enq_cb = None - self._csv_deq_cb = None - self._csv_txn_cb = None - self._csv_end_cb = None - - self._emap = EnqMap() - self._tmap = TxnMap(self._emap) - self._txn_obj_list = {} - - self._file = None - self._file_hdr = None - self._file_num = None - self._first_rec_flag = None - self._fro = None - self._last_file_flag = None - self._start_file_num = None - self._file_hdr_owi = None - self._warning = [] - - self._abort_cnt = 0 - self._commit_cnt = 0 - self._msg_cnt = 0 - self._rec_cnt = 0 - self._txn_msg_cnt = 0 - - def __str__(self): - """Print out all the undequeued records""" - return self.report(True, self._rflag) - - def emap(self): - """Get the enqueue map""" - return self._emap - - def get_abort_cnt(self): - """Get the cumulative number of transactional aborts found""" - return self._abort_cnt - - def get_commit_cnt(self): - """Get the cumulative number of transactional commits found""" - return self._commit_cnt - - def get_msg_cnt(self): - """Get the cumulative number of messages found""" - return self._msg_cnt - - def get_rec_cnt(self): - """Get the cumulative number of journal records (including fillers) found""" - return self._rec_cnt - - def is_last_file(self): - """Return True if the last file is being read""" - return self._last_file_flag - - def report(self, show_stats = True, show_records = False): - """Return a string containing a report on the file analysis""" - rstr = self._emap.report(show_stats, show_records) + "\n" + self._tmap.report(show_stats, show_records) - #TODO - print size analysis here - ie how full, sparse, est. space remaining before enq threshold - return rstr - - def run(self): - """Perform the read of the journal""" - if self._csv_start_cb != None and self._csv_start_cb(self._csv_store_chk): - return - if self._jra.is_empty(): - return - stop = self._advance_jrnl_file(*self._jra.get_oldest_file()) - while not stop and not self._get_next_record(): - pass - if self._csv_end_cb != None and self._csv_end_cb(self._csv_store_chk): - return - if not self._qflag: - print - - def set_callbacks(self, csv_store_chk, csv_start_cb = None, csv_enq_cb = None, csv_deq_cb = None, csv_txn_cb = None, - csv_end_cb = None): - """Set callbacks for checks to be made at various points while reading the journal""" - self._csv_store_chk = csv_store_chk - self._csv_start_cb = csv_start_cb - self._csv_enq_cb = csv_enq_cb - self._csv_deq_cb = csv_deq_cb - self._csv_txn_cb = csv_txn_cb - self._csv_end_cb = csv_end_cb - - def tmap(self): - """Return the transaction map""" - return self._tmap - - def get_txn_msg_cnt(self): - """Get the cumulative transactional message count""" - return self._txn_msg_cnt - - def txn_obj_list(self): - """Get a cumulative list of transaction objects (commits and aborts)""" - return self._txn_obj_list - - def _advance_jrnl_file(self, *oldest_file_info): - """Rotate to using the next journal file. Return False if the operation was successful, True if there are no - more files to read.""" - fro_seek_flag = False - if len(oldest_file_info) > 0: - self._start_file_num = self._file_num = oldest_file_info[0] - self._fro = oldest_file_info[4] - fro_seek_flag = True # jump to fro to start reading - if not self._qflag and not self._rflag: - if self._vflag: - print "Recovering journals..." - else: - print "Recovering journals", - if self._file != None and self._is_file_full(): - self._file.close() - self._file_num = self._incr_file_num() - if self._file_num == self._start_file_num: - return True - if self._start_file_num == 0: - self._last_file_flag = self._file_num == self._jinfo.get_num_jrnl_files() - 1 - else: - self._last_file_flag = self._file_num == self._start_file_num - 1 - if self._file_num < 0 or self._file_num >= self._jinfo.get_num_jrnl_files(): - raise jerr.BadFileNumberError(self._file_num) - jfn = os.path.join(self._jinfo.get_current_dir(), "%s.%04x.jdat" % - (self._jinfo.get_jrnl_base_name(), self._file_num)) - self._file = open(jfn) - self._file_hdr = jrnl.Utils.load(self._file, jrnl.Hdr) - if fro_seek_flag and self._file.tell() != self._fro: - self._file.seek(self._fro) - self._first_rec_flag = True - if not self._qflag: - if self._rflag: - print jfn, ": ", self._file_hdr - elif self._vflag: - print "* Reading %s" % jfn - else: - print ".", - sys.stdout.flush() - return False - - def _check_owi(self, hdr): - """Return True if the header's owi indicator matches that of the file header record; False otherwise. This can - indicate whether the last record in a file has been read and now older records which have not yet been - overwritten are now being read.""" - return self._file_hdr_owi == hdr.owi() - - def _is_file_full(self): - """Return True if the current file is full (no more write space); false otherwise""" - return self._file.tell() >= self._jinfo.get_jrnl_file_size_bytes() - - def _get_next_record(self): - """Get the next record in the file for analysis""" - if self._is_file_full(): - if self._advance_jrnl_file(): - return True - try: - hdr = jrnl.Utils.load(self._file, jrnl.Hdr) - except: - return True - if hdr.empty(): - return True - if hdr.check(): - return True - self._rec_cnt += 1 - self._file_hdr_owi = self._file_hdr.owi() - if self._first_rec_flag: - if self._file_hdr.fro != hdr.foffs: - raise jerr.FirstRecordOffsetMismatch(self._file_hdr.fro, hdr.foffs) - else: - if self._rflag: - print " * fro ok: 0x%x" % self._file_hdr.fro - self._first_rec_flag = False - stop = False - if isinstance(hdr, jrnl.EnqRec): - stop = self._handle_enq_rec(hdr) - elif isinstance(hdr, jrnl.DeqRec): - stop = self._handle_deq_rec(hdr) - elif isinstance(hdr, jrnl.TxnRec): - stop = self._handle_txn_rec(hdr) - wstr = "" - for warn in self._warning: - wstr += " (%s)" % warn - if self._rflag: - print " > %s %s" % (hdr, wstr) - self._warning = [] - return stop - - def _handle_deq_rec(self, hdr): - """Process a dequeue ("RHMd") record""" - if self._load_rec(hdr): - return True - - # Check OWI flag - if not self._check_owi(hdr): - self._warning.append("WARNING: OWI mismatch - could be overwrite boundary.") - return True - # Test hook - if self._csv_deq_cb != None and self._csv_deq_cb(self._csv_store_chk, hdr): - return True - - try: - if hdr.xid == None: - self._emap.delete(hdr.deq_rid) - else: - self._tmap.add(self._file_hdr.fid, hdr) - except jerr.JWarning, warn: - self._warning.append(str(warn)) - return False - - def _handle_enq_rec(self, hdr): - """Process a dequeue ("RHMe") record""" - if self._load_rec(hdr): - return True - - # Check extern flag - if hdr.extern and hdr.data != None: - raise jerr.ExternFlagDataError(hdr) - # Check OWI flag - if not self._check_owi(hdr): - self._warning.append("WARNING: OWI mismatch - could be overwrite boundary.") - return True - # Test hook - if self._csv_enq_cb != None and self._csv_enq_cb(self._csv_store_chk, hdr): - return True - - if hdr.xid == None: - self._emap.add(self._file_hdr.fid, hdr) - else: - self._txn_msg_cnt += 1 - self._tmap.add(self._file_hdr.fid, hdr) - self._msg_cnt += 1 - return False - - def _handle_txn_rec(self, hdr): - """Process a transaction ("RHMa or RHMc") record""" - if self._load_rec(hdr): - return True - - # Check OWI flag - if not self._check_owi(hdr): - self._warning.append("WARNING: OWI mismatch - could be overwrite boundary.") - return True - # Test hook - if self._csv_txn_cb != None and self._csv_txn_cb(self._csv_store_chk, hdr): - return True - - if hdr.magic[-1] == "a": - self._abort_cnt += 1 - else: - self._commit_cnt += 1 - - if self._tmap.contains(hdr.xid): - mismatched_rids = self._tmap.delete(hdr) - if mismatched_rids != None and len(mismatched_rids) > 0: - self._warning.append("WARNING: transactional dequeues not found in enqueue map; rids=%s" % - mismatched_rids) - else: - self._warning.append("WARNING: %s not found in transaction map" % jrnl.Utils.format_xid(hdr.xid)) - if hdr.magic[-1] == "c": # commits only - self._txn_obj_list[hdr.xid] = hdr - return False - - def _incr_file_num(self): - """Increment the number of files read with wraparound (ie after file n-1, go to 0)""" - self._file_num += 1 - if self._file_num >= self._jinfo.get_num_jrnl_files(): - self._file_num = 0 - return self._file_num - - def _load_rec(self, hdr): - """Load a single record for the given header. There may be arbitrarily large xids and data components.""" - while not hdr.complete(): - if self._advance_jrnl_file(): - return True - hdr.load(self._file) - return False - -# ============================================================================= - -if __name__ == "__main__": - print "This is a library, and cannot be executed." diff --git a/qpid/cpp/management/python/lib/qpidstore/jerr.py b/qpid/cpp/management/python/lib/qpidstore/jerr.py deleted file mode 100644 index 448f881ce3..0000000000 --- a/qpid/cpp/management/python/lib/qpidstore/jerr.py +++ /dev/null @@ -1,219 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -# == Warnings ================================================================= - -class JWarning(Exception): - """Class to convey a warning""" - def __init__(self, err): - """Constructor""" - Exception.__init__(self, err) - -# == Errors =================================================================== - -class AllJrnlFilesEmptyCsvError(Exception): - """All journal files are empty (never been written)""" - def __init__(self, tnum, exp_num_msgs): - """Constructor""" - Exception.__init__(self, "[CSV %d] All journal files are empty, but test expects %d msg(s)." % - (tnum, exp_num_msgs)) - -class AlreadyLockedError(Exception): - """Error class for trying to lock a record that is already locked""" - def __init__(self, rid): - """Constructor""" - Exception.__init__(self, "Locking record which is already locked in EnqMap: rid=0x%x" % rid) - -class BadFileNumberError(Exception): - """Error class for incorrect or unexpected file number""" - def __init__(self, file_num): - """Constructor""" - Exception.__init__(self, "Bad file number %d" % file_num) - -class DataSizeError(Exception): - """Error class for data size mismatch""" - def __init__(self, exp_size, act_size, data_str): - """Constructor""" - Exception.__init__(self, "Inconsistent data size: expected:%d; actual:%d; data=\"%s\"" % - (exp_size, act_size, data_str)) - -class DeleteLockedRecordError(Exception): - """Error class for deleting a locked record from the enqueue map""" - def __init__(self, rid): - """Constructor""" - Exception.__init__(self, "Deleting locked record from EnqMap: rid=0x%s" % rid) - -class DequeueNonExistentEnqueueError(Exception): - """Error class for attempting to dequeue a non-existent enqueue record (rid)""" - def __init__(self, deq_rid): - """Constructor""" - Exception.__init__(self, "Dequeuing non-existent enqueue record: rid=0x%s" % deq_rid) - -class DuplicateRidError(Exception): - """Error class for placing duplicate rid into enqueue map""" - def __init__(self, rid): - """Constructor""" - Exception.__init__(self, "Adding duplicate record to EnqMap: rid=0x%x" % rid) - -class EndianMismatchError(Exception): - """Error class mismatched record header endian flag""" - def __init__(self, exp_endianness): - """Constructor""" - Exception.__init__(self, "Endian mismatch: expected %s, but current record is %s" % - self.endian_str(exp_endianness)) - #@staticmethod - def endian_str(endianness): - """Return a string tuple for the endianness error message""" - if endianness: - return "big", "little" - return "little", "big" - endian_str = staticmethod(endian_str) - -class ExternFlagDataError(Exception): - """Error class for the extern flag being set and the internal size > 0""" - def __init__(self, hdr): - """Constructor""" - Exception.__init__(self, "Message data found (msg size > 0) on record with external flag set: hdr=%s" % hdr) - -class ExternFlagCsvError(Exception): - """External flag mismatch between record and CSV test file""" - def __init__(self, tnum, exp_extern_flag): - """Constructor""" - Exception.__init__(self, "[CSV %d] External flag mismatch: expected %s" % (tnum, exp_extern_flag)) - -class ExternFlagWithDataCsvError(Exception): - """External flag set and Message data found""" - def __init__(self, tnum): - """Constructor""" - Exception.__init__(self, "[CSV %d] Message data found on record with external flag set" % tnum) - -class FillExceedsFileSizeError(Exception): - """Internal error from a fill operation which will exceed the specified file size""" - def __init__(self, cur_size, file_size): - """Constructor""" - Exception.__init__(self, "Filling to size %d > max file size %d" % (cur_size, file_size)) - -class FillSizeError(Exception): - """Internal error from a fill operation that did not match the calculated end point in the file""" - def __init__(self, cur_posn, exp_posn): - """Constructor""" - Exception.__init__(self, "Filled to size %d > expected file posn %d" % (cur_posn, exp_posn)) - -class FirstRecordOffsetMismatch(Exception): - """Error class for file header fro mismatch with actual record""" - def __init__(self, fro, actual_offs): - """Constructor""" - Exception.__init__(self, "File header first record offset mismatch: fro=0x%x; actual offs=0x%x" % - (fro, actual_offs)) - -class InvalidHeaderVersionError(Exception): - """Error class for invalid record header version""" - def __init__(self, exp_ver, act_ver): - """Constructor""" - Exception.__init__(self, "Invalid header version: expected:%d, actual:%d." % (exp_ver, act_ver)) - -class InvalidRecordTypeError(Exception): - """Error class for any operation using an invalid record type""" - def __init__(self, operation, magic, rid): - """Constructor""" - Exception.__init__(self, "Invalid record type for operation: operation=%s record magic=%s, rid=0x%x" % - (operation, magic, rid)) - -class InvalidRecordTailError(Exception): - """Error class for invalid record tail""" - def __init__(self, magic_err, rid_err, rec): - """Constructor""" - Exception.__init__(self, " > %s *INVALID TAIL RECORD (%s)*" % (rec, self.tail_err_str(magic_err, rid_err))) - #@staticmethod - def tail_err_str(magic_err, rid_err): - """Return a string indicating the tail record error(s)""" - estr = "" - if magic_err: - estr = "magic bad" - if rid_err: - estr += ", " - if rid_err: - estr += "rid mismatch" - return estr - tail_err_str = staticmethod(tail_err_str) - -class NonExistentRecordError(Exception): - """Error class for any operation on an non-existent record""" - def __init__(self, operation, rid): - """Constructor""" - Exception.__init__(self, "Operation on non-existent record: operation=%s; rid=0x%x" % (operation, rid)) - -class NotLockedError(Exception): - """Error class for unlocking a record which is not locked in the first place""" - def __init__(self, rid): - """Constructor""" - Exception.__init__(self, "Unlocking record which is not locked in EnqMap: rid=0x%x" % rid) - -class JournalSpaceExceededError(Exception): - """Error class for when journal space of resized journal is too small to contain the transferred records""" - def __init__(self): - """Constructor""" - Exception.__init__(self, "Ran out of journal space while writing records") - -class MessageLengthCsvError(Exception): - """Message length mismatch between record and CSV test file""" - def __init__(self, tnum, exp_msg_len, actual_msg_len): - """Constructor""" - Exception.__init__(self, "[CSV %d] Message length mismatch: expected %d; found %d" % - (tnum, exp_msg_len, actual_msg_len)) - -class NumMsgsCsvError(Exception): - """Number of messages found mismatched with CSV file""" - def __init__(self, tnum, exp_num_msgs, actual_num_msgs): - """Constructor""" - Exception.__init__(self, "[CSV %s] Incorrect number of messages: expected %d, found %d" % - (tnum, exp_num_msgs, actual_num_msgs)) - -class TransactionCsvError(Exception): - """Transaction mismatch between record and CSV file""" - def __init__(self, tnum, exp_transactional): - """Constructor""" - Exception.__init__(self, "[CSV %d] Transaction mismatch: expected %s" % (tnum, exp_transactional)) - -class UnexpectedEndOfFileError(Exception): - """Error class for unexpected end-of-file during reading""" - def __init__(self, exp_size, curr_offs): - """Constructor""" - Exception.__init__(self, "Unexpected end-of-file: expected file size:%d; current offset:%d" % - (exp_size, curr_offs)) - -class XidLengthCsvError(Exception): - """Message Xid length mismatch between record and CSV file""" - def __init__(self, tnum, exp_xid_len, actual_msg_len): - """Constructor""" - Exception.__init__(self, "[CSV %d] Message XID mismatch: expected %d; found %d" % - (tnum, exp_xid_len, actual_msg_len)) - -class XidSizeError(Exception): - """Error class for Xid size mismatch""" - def __init__(self, exp_size, act_size, xid_str): - """Constructor""" - Exception.__init__(self, "Inconsistent xid size: expected:%d; actual:%d; xid=\"%s\"" % - (exp_size, act_size, xid_str)) - -# ============================================================================= - -if __name__ == "__main__": - print "This is a library, and cannot be executed." - diff --git a/qpid/cpp/management/python/lib/qpidstore/jrnl.py b/qpid/cpp/management/python/lib/qpidstore/jrnl.py deleted file mode 100644 index 7c4d6de4a9..0000000000 --- a/qpid/cpp/management/python/lib/qpidstore/jrnl.py +++ /dev/null @@ -1,794 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -import jerr -import os.path, sys, xml.parsers.expat -from struct import pack, unpack, calcsize -from time import gmtime, strftime - -# TODO: Get rid of these! Use jinf instance instead -DBLK_SIZE = 128 -SBLK_SIZE = 4 * DBLK_SIZE - -# TODO - this is messy - find a better way to handle this -# This is a global, but is set directly by the calling program -JRNL_FILE_SIZE = None - -#== class Utils ====================================================================== - -class Utils(object): - """Class containing utility functions for dealing with the journal""" - - __printchars = "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ!\"#$%&'()*+,-./:;<=>?@[\\]^_`{\|}~ " - - # The @staticmethod declarations are not supported in RHEL4 (python 2.3.x) - # When RHEL4 support ends, restore these declarations and remove the older - # staticmethod() declaration. - - #@staticmethod - def format_data(dsize, data): - """Format binary data for printing""" - if data == None: - return "" - if Utils._is_printable(data): - datastr = Utils._split_str(data) - else: - datastr = Utils._hex_split_str(data) - if dsize != len(data): - raise jerr.DataSizeError(dsize, len(data), datastr) - return "data(%d)=\"%s\" " % (dsize, datastr) - format_data = staticmethod(format_data) - - #@staticmethod - def format_xid(xid, xidsize=None): - """Format binary XID for printing""" - if xid == None and xidsize != None: - if xidsize > 0: - raise jerr.XidSizeError(xidsize, 0, None) - return "" - if Utils._is_printable(xid): - xidstr = Utils._split_str(xid) - else: - xidstr = Utils._hex_split_str(xid) - if xidsize == None: - xidsize = len(xid) - elif xidsize != len(xid): - raise jerr.XidSizeError(xidsize, len(xid), xidstr) - return "xid(%d)=\"%s\" " % (xidsize, xidstr) - format_xid = staticmethod(format_xid) - - #@staticmethod - def inv_str(string): - """Perform a binary 1's compliment (invert all bits) on a binary string""" - istr = "" - for index in range(0, len(string)): - istr += chr(~ord(string[index]) & 0xff) - return istr - inv_str = staticmethod(inv_str) - - #@staticmethod - def load(fhandle, klass): - """Load a record of class klass from a file""" - args = Utils._load_args(fhandle, klass) - subclass = klass.discriminate(args) - result = subclass(*args) # create instance of record - if subclass != klass: - result.init(fhandle, *Utils._load_args(fhandle, subclass)) - result.skip(fhandle) - return result - load = staticmethod(load) - - #@staticmethod - def load_file_data(fhandle, size, data): - """Load the data portion of a message from file""" - if size == 0: - return (data, True) - if data == None: - loaded = 0 - else: - loaded = len(data) - foverflow = fhandle.tell() + size - loaded > JRNL_FILE_SIZE - if foverflow: - rsize = JRNL_FILE_SIZE - fhandle.tell() - else: - rsize = size - loaded - fbin = fhandle.read(rsize) - if data == None: - data = unpack("%ds" % (rsize), fbin)[0] - else: - data = data + unpack("%ds" % (rsize), fbin)[0] - return (data, not foverflow) - load_file_data = staticmethod(load_file_data) - - #@staticmethod - def rem_bytes_in_blk(fhandle, blk_size): - """Return the remaining bytes in a block""" - foffs = fhandle.tell() - return Utils.size_in_bytes_to_blk(foffs, blk_size) - foffs - rem_bytes_in_blk = staticmethod(rem_bytes_in_blk) - - #@staticmethod - def size_in_blks(size, blk_size): - """Return the size in terms of data blocks""" - return int((size + blk_size - 1) / blk_size) - size_in_blks = staticmethod(size_in_blks) - - #@staticmethod - def size_in_bytes_to_blk(size, blk_size): - """Return the bytes remaining until the next block boundary""" - return Utils.size_in_blks(size, blk_size) * blk_size - size_in_bytes_to_blk = staticmethod(size_in_bytes_to_blk) - - #@staticmethod - def _hex_split_str(in_str, split_size = 50): - """Split a hex string into two parts separated by an ellipsis""" - if len(in_str) <= split_size: - return Utils._hex_str(in_str, 0, len(in_str)) -# if len(in_str) > split_size + 25: -# return Utils._hex_str(in_str, 0, 10) + " ... " + Utils._hex_str(in_str, 55, 65) + " ... " + \ -# Utils._hex_str(in_str, len(in_str)-10, len(in_str)) - return Utils._hex_str(in_str, 0, 10) + " ... " + Utils._hex_str(in_str, len(in_str)-10, len(in_str)) - _hex_split_str = staticmethod(_hex_split_str) - - #@staticmethod - def _hex_str(in_str, begin, end): - """Return a binary string as a hex string""" - hstr = "" - for index in range(begin, end): - if Utils._is_printable(in_str[index]): - hstr += in_str[index] - else: - hstr += "\\%02x" % ord(in_str[index]) - return hstr - _hex_str = staticmethod(_hex_str) - - #@staticmethod - def _is_printable(in_str): - """Return True if in_str in printable; False otherwise.""" - return in_str.strip(Utils.__printchars) == "" - _is_printable = staticmethod(_is_printable) - - #@staticmethod - def _load_args(fhandle, klass): - """Load the arguments from class klass""" - size = calcsize(klass.FORMAT) - foffs = fhandle.tell(), - fbin = fhandle.read(size) - if len(fbin) != size: - raise jerr.UnexpectedEndOfFileError(size, len(fbin)) - return foffs + unpack(klass.FORMAT, fbin) - _load_args = staticmethod(_load_args) - - #@staticmethod - def _split_str(in_str, split_size = 50): - """Split a string into two parts separated by an ellipsis if it is longer than split_size""" - if len(in_str) < split_size: - return in_str - return in_str[:25] + " ... " + in_str[-25:] - _split_str = staticmethod(_split_str) - - -#== class Hdr ================================================================= - -class Hdr: - """Class representing the journal header records""" - - FORMAT = "=4sBBHQ" - HDR_VER = 1 - OWI_MASK = 0x01 - BIG_ENDIAN = sys.byteorder == "big" - REC_BOUNDARY = DBLK_SIZE - - def __init__(self, foffs, magic, ver, endn, flags, rid): - """Constructor""" -# Sizeable.__init__(self) - self.foffs = foffs - self.magic = magic - self.ver = ver - self.endn = endn - self.flags = flags - self.rid = long(rid) - - def __str__(self): - """Return string representation of this header""" - if self.empty(): - return "0x%08x: <empty>" % (self.foffs) - if self.magic[-1] == "x": - return "0x%08x: [\"%s\"]" % (self.foffs, self.magic) - if self.magic[-1] in ["a", "c", "d", "e", "f", "x"]: - return "0x%08x: [\"%s\" v=%d e=%d f=0x%04x rid=0x%x]" % (self.foffs, self.magic, self.ver, self.endn, - self.flags, self.rid) - return "0x%08x: <error, unknown magic \"%s\" (possible overwrite boundary?)>" % (self.foffs, self.magic) - - #@staticmethod - def discriminate(args): - """Use the last char in the header magic to determine the header type""" - return _CLASSES.get(args[1][-1], Hdr) - discriminate = staticmethod(discriminate) - - def empty(self): - """Return True if this record is empty (ie has a magic of 0x0000""" - return self.magic == "\x00"*4 - - def encode(self): - """Encode the header into a binary string""" - return pack(Hdr.FORMAT, self.magic, self.ver, self.endn, self.flags, self.rid) - - def owi(self): - """Return the OWI (overwrite indicator) for this header""" - return self.flags & self.OWI_MASK != 0 - - def skip(self, fhandle): - """Read and discard the remainder of this record""" - fhandle.read(Utils.rem_bytes_in_blk(fhandle, self.REC_BOUNDARY)) - - def check(self): - """Check that this record is valid""" - if self.empty() or self.magic[:3] != "RHM" or self.magic[3] not in ["a", "c", "d", "e", "f", "x"]: - return True - if self.magic[-1] != "x": - if self.ver != self.HDR_VER: - raise jerr.InvalidHeaderVersionError(self.HDR_VER, self.ver) - if bool(self.endn) != self.BIG_ENDIAN: - raise jerr.EndianMismatchError(self.BIG_ENDIAN) - return False - - -#== class FileHdr ============================================================= - -class FileHdr(Hdr): - """Class for file headers, found at the beginning of journal files""" - - FORMAT = "=2H4x3Q" - REC_BOUNDARY = SBLK_SIZE - - def __str__(self): - """Return a string representation of the this FileHdr instance""" - return "%s fid=%d lid=%d fro=0x%08x t=%s" % (Hdr.__str__(self), self.fid, self.lid, self.fro, - self.timestamp_str()) - - def encode(self): - """Encode this class into a binary string""" - return Hdr.encode(self) + pack(FileHdr.FORMAT, self.fid, self.lid, self.fro, self.time_sec, self.time_ns) - - def init(self, fhandle, foffs, fid, lid, fro, time_sec, time_ns): - """Initialize this instance to known values""" - self.fid = fid - self.lid = lid - self.fro = fro - self.time_sec = time_sec - self.time_ns = time_ns - - def timestamp(self): - """Get the timestamp of this record as a tuple (secs, nsecs)""" - return (self.time_sec, self.time_ns) - - def timestamp_str(self): - """Get the timestamp of this record in string format""" - time = gmtime(self.time_sec) - fstr = "%%a %%b %%d %%H:%%M:%%S.%09d %%Y" % (self.time_ns) - return strftime(fstr, time) - - -#== class DeqRec ============================================================== - -class DeqRec(Hdr): - """Class for a dequeue record""" - - FORMAT = "=QQ" - - def __str__(self): - """Return a string representation of the this DeqRec instance""" - return "%s %sdrid=0x%x" % (Hdr.__str__(self), Utils.format_xid(self.xid, self.xidsize), self.deq_rid) - - def init(self, fhandle, foffs, deq_rid, xidsize): - """Initialize this instance to known values""" - self.deq_rid = deq_rid - self.xidsize = xidsize - self.xid = None - self.deq_tail = None - self.xid_complete = False - self.tail_complete = False - self.tail_bin = None - self.tail_offs = 0 - self.load(fhandle) - - def encode(self): - """Encode this class into a binary string""" - buf = Hdr.encode(self) + pack(DeqRec.FORMAT, self.deq_rid, self.xidsize) - if self.xidsize > 0: - fmt = "%ds" % (self.xidsize) - buf += pack(fmt, self.xid) - buf += self.deq_tail.encode() - return buf - - def load(self, fhandle): - """Load the remainder of this record (after the header has been loaded""" - if self.xidsize == 0: - self.xid_complete = True - self.tail_complete = True - else: - if not self.xid_complete: - (self.xid, self.xid_complete) = Utils.load_file_data(fhandle, self.xidsize, self.xid) - if self.xid_complete and not self.tail_complete: - ret = Utils.load_file_data(fhandle, calcsize(RecTail.FORMAT), self.tail_bin) - self.tail_bin = ret[0] - if ret[1]: - self.deq_tail = RecTail(self.tail_offs, *unpack(RecTail.FORMAT, self.tail_bin)) - magic_err = self.deq_tail.magic_inv != Utils.inv_str(self.magic) - rid_err = self.deq_tail.rid != self.rid - if magic_err or rid_err: - raise jerr.InvalidRecordTailError(magic_err, rid_err, self) - self.skip(fhandle) - self.tail_complete = ret[1] - return self.complete() - - def complete(self): - """Returns True if the entire record is loaded, False otherwise""" - return self.xid_complete and self.tail_complete - - -#== class TxnRec ============================================================== - -class TxnRec(Hdr): - """Class for a transaction commit/abort record""" - - FORMAT = "=Q" - - def __str__(self): - """Return a string representation of the this TxnRec instance""" - return "%s %s" % (Hdr.__str__(self), Utils.format_xid(self.xid, self.xidsize)) - - def init(self, fhandle, foffs, xidsize): - """Initialize this instance to known values""" - self.xidsize = xidsize - self.xid = None - self.tx_tail = None - self.xid_complete = False - self.tail_complete = False - self.tail_bin = None - self.tail_offs = 0 - self.load(fhandle) - - def encode(self): - """Encode this class into a binary string""" - return Hdr.encode(self) + pack(TxnRec.FORMAT, self.xidsize) + pack("%ds" % self.xidsize, self.xid) + \ - self.tx_tail.encode() - - def load(self, fhandle): - """Load the remainder of this record (after the header has been loaded""" - if not self.xid_complete: - ret = Utils.load_file_data(fhandle, self.xidsize, self.xid) - self.xid = ret[0] - self.xid_complete = ret[1] - if self.xid_complete and not self.tail_complete: - ret = Utils.load_file_data(fhandle, calcsize(RecTail.FORMAT), self.tail_bin) - self.tail_bin = ret[0] - if ret[1]: - self.tx_tail = RecTail(self.tail_offs, *unpack(RecTail.FORMAT, self.tail_bin)) - magic_err = self.tx_tail.magic_inv != Utils.inv_str(self.magic) - rid_err = self.tx_tail.rid != self.rid - if magic_err or rid_err: - raise jerr.InvalidRecordTailError(magic_err, rid_err, self) - self.skip(fhandle) - self.tail_complete = ret[1] - return self.complete() - - def complete(self): - """Returns True if the entire record is loaded, False otherwise""" - return self.xid_complete and self.tail_complete - - -#== class EnqRec ============================================================== - -class EnqRec(Hdr): - """Class for a enqueue record""" - - FORMAT = "=QQ" - TRANSIENT_MASK = 0x10 - EXTERN_MASK = 0x20 - - def __str__(self): - """Return a string representation of the this EnqRec instance""" - return "%s %s%s %s %s" % (Hdr.__str__(self), Utils.format_xid(self.xid, self.xidsize), - Utils.format_data(self.dsize, self.data), self.enq_tail, self.print_flags()) - - def encode(self): - """Encode this class into a binary string""" - buf = Hdr.encode(self) + pack(EnqRec.FORMAT, self.xidsize, self.dsize) - if self.xidsize > 0: - buf += pack("%ds" % self.xidsize, self.xid) - if self.dsize > 0: - buf += pack("%ds" % self.dsize, self.data) - if self.xidsize > 0 or self.dsize > 0: - buf += self.enq_tail.encode() - return buf - - def init(self, fhandle, foffs, xidsize, dsize): - """Initialize this instance to known values""" - self.xidsize = xidsize - self.dsize = dsize - self.transient = self.flags & self.TRANSIENT_MASK > 0 - self.extern = self.flags & self.EXTERN_MASK > 0 - self.xid = None - self.data = None - self.enq_tail = None - self.xid_complete = False - self.data_complete = False - self.tail_complete = False - self.tail_bin = None - self.tail_offs = 0 - self.load(fhandle) - - def load(self, fhandle): - """Load the remainder of this record (after the header has been loaded""" - if not self.xid_complete: - ret = Utils.load_file_data(fhandle, self.xidsize, self.xid) - self.xid = ret[0] - self.xid_complete = ret[1] - if self.xid_complete and not self.data_complete: - if self.extern: - self.data_complete = True - else: - ret = Utils.load_file_data(fhandle, self.dsize, self.data) - self.data = ret[0] - self.data_complete = ret[1] - if self.data_complete and not self.tail_complete: - ret = Utils.load_file_data(fhandle, calcsize(RecTail.FORMAT), self.tail_bin) - self.tail_bin = ret[0] - if ret[1]: - self.enq_tail = RecTail(self.tail_offs, *unpack(RecTail.FORMAT, self.tail_bin)) - magic_err = self.enq_tail.magic_inv != Utils.inv_str(self.magic) - rid_err = self.enq_tail.rid != self.rid - if magic_err or rid_err: - raise jerr.InvalidRecordTailError(magic_err, rid_err, self) - self.skip(fhandle) - self.tail_complete = ret[1] - return self.complete() - - def complete(self): - """Returns True if the entire record is loaded, False otherwise""" - return self.xid_complete and self.data_complete and self.tail_complete - - def print_flags(self): - """Utility function to decode the flags field in the header and print a string representation""" - fstr = "" - if self.transient: - fstr = "*TRANSIENT" - if self.extern: - if len(fstr) > 0: - fstr += ",EXTERNAL" - else: - fstr = "*EXTERNAL" - if len(fstr) > 0: - fstr += "*" - return fstr - - -#== class RecTail ============================================================= - -class RecTail: - """Class for a record tail - for all records where either an XID or data separate the header from the end of the - record""" - - FORMAT = "=4sQ" - - def __init__(self, foffs, magic_inv, rid): - """Initialize this instance to known values""" - self.foffs = foffs - self.magic_inv = magic_inv - self.rid = long(rid) - - def __str__(self): - """Return a string representation of the this RecTail instance""" - magic = Utils.inv_str(self.magic_inv) - return "[\"%s\" rid=0x%x]" % (magic, self.rid) - - def encode(self): - """Encode this class into a binary string""" - return pack(RecTail.FORMAT, self.magic_inv, self.rid) - - -#== class JrnlInfo ============================================================ - -class JrnlInfo(object): - """ - This object reads and writes journal information files (<basename>.jinf). Methods are provided - to read a file, query its properties and reset just those properties necessary for normalizing - and resizing a journal. - - Normalizing: resetting the directory and/or base filename to different values. This is necessary - if a set of journal files is copied from one location to another before being restored, as the - value of the path in the file no longer matches the actual path. - - Resizing: If the journal geometry parameters (size and number of journal files) changes, then the - .jinf file must reflect these changes, as this file is the source of information for journal - recovery. - - NOTE: Data size vs File size: There are methods which return the data size and file size of the - journal files. - - +-------------+--------------------/ /----------+ - | File header | File data | - +-------------+--------------------/ /----------+ - | | | - | |<---------- Data size ---------->| - |<------------------ File Size ---------------->| - - Data size: The size of the data content of the journal, ie that part which stores the data records. - - File size: The actual disk size of the journal including data and the file header which precedes the - data. - - The file header is fixed to 1 sblk, so file size = jrnl size + sblk size. - """ - - def __init__(self, jdir, bfn = "JournalData"): - """Constructor""" - self.__jdir = jdir - self.__bfn = bfn - self.__jinf_dict = {} - self._read_jinf() - - def __str__(self): - """Create a string containing all of the journal info contained in the jinf file""" - ostr = "Journal info file %s:\n" % os.path.join(self.__jdir, "%s.jinf" % self.__bfn) - for key, val in self.__jinf_dict.iteritems(): - ostr += " %s = %s\n" % (key, val) - return ostr - - def normalize(self, jdir = None, bfn = None): - """Normalize the directory (ie reset the directory path to match the actual current location) for this - jinf file""" - if jdir == None: - self.__jinf_dict["directory"] = self.__jdir - else: - self.__jdir = jdir - self.__jinf_dict["directory"] = jdir - if bfn != None: - self.__bfn = bfn - self.__jinf_dict["base_filename"] = bfn - - def resize(self, num_jrnl_files = None, jrnl_file_size = None): - """Reset the journal size information to allow for resizing the journal""" - if num_jrnl_files != None: - self.__jinf_dict["number_jrnl_files"] = num_jrnl_files - if jrnl_file_size != None: - self.__jinf_dict["jrnl_file_size_sblks"] = jrnl_file_size * self.get_jrnl_dblk_size_bytes() - - def write(self, jdir = None, bfn = None): - """Write the .jinf file""" - self.normalize(jdir, bfn) - if not os.path.exists(self.get_jrnl_dir()): - os.makedirs(self.get_jrnl_dir()) - fhandle = open(os.path.join(self.get_jrnl_dir(), "%s.jinf" % self.get_jrnl_base_name()), "w") - fhandle.write("<?xml version=\"1.0\" ?>\n") - fhandle.write("<jrnl>\n") - fhandle.write(" <journal_version value=\"%d\" />\n" % self.get_jrnl_version()) - fhandle.write(" <journal_id>\n") - fhandle.write(" <id_string value=\"%s\" />\n" % self.get_jrnl_id()) - fhandle.write(" <directory value=\"%s\" />\n" % self.get_jrnl_dir()) - fhandle.write(" <base_filename value=\"%s\" />\n" % self.get_jrnl_base_name()) - fhandle.write(" </journal_id>\n") - fhandle.write(" <creation_time>\n") - fhandle.write(" <seconds value=\"%d\" />\n" % self.get_creation_time()[0]) - fhandle.write(" <nanoseconds value=\"%d\" />\n" % self.get_creation_time()[1]) - fhandle.write(" <string value=\"%s\" />\n" % self.get_creation_time_str()) - fhandle.write(" </creation_time>\n") - fhandle.write(" <journal_file_geometry>\n") - fhandle.write(" <number_jrnl_files value=\"%d\" />\n" % self.get_num_jrnl_files()) - fhandle.write(" <auto_expand value=\"%s\" />\n" % str.lower(str(self.get_auto_expand()))) - fhandle.write(" <jrnl_file_size_sblks value=\"%d\" />\n" % self.get_jrnl_data_size_sblks()) - fhandle.write(" <JRNL_SBLK_SIZE value=\"%d\" />\n" % self.get_jrnl_sblk_size_dblks()) - fhandle.write(" <JRNL_DBLK_SIZE value=\"%d\" />\n" % self.get_jrnl_dblk_size_bytes()) - fhandle.write(" </journal_file_geometry>\n") - fhandle.write(" <cache_geometry>\n") - fhandle.write(" <wcache_pgsize_sblks value=\"%d\" />\n" % self.get_wr_buf_pg_size_sblks()) - fhandle.write(" <wcache_num_pages value=\"%d\" />\n" % self.get_num_wr_buf_pgs()) - fhandle.write(" <JRNL_RMGR_PAGE_SIZE value=\"%d\" />\n" % self.get_rd_buf_pg_size_sblks()) - fhandle.write(" <JRNL_RMGR_PAGES value=\"%d\" />\n" % self.get_num_rd_buf_pgs()) - fhandle.write(" </cache_geometry>\n") - fhandle.write("</jrnl>\n") - fhandle.close() - - # Journal ID - - def get_jrnl_version(self): - """Get the journal version""" - return self.__jinf_dict["journal_version"] - - def get_jrnl_id(self): - """Get the journal id""" - return self.__jinf_dict["id_string"] - - def get_current_dir(self): - """Get the current directory of the store (as opposed to that value saved in the .jinf file)""" - return self.__jdir - - def get_jrnl_dir(self): - """Get the journal directory stored in the .jinf file""" - return self.__jinf_dict["directory"] - - def get_jrnl_base_name(self): - """Get the base filename - that string used to name the journal files <basefilename>-nnnn.jdat and - <basefilename>.jinf""" - return self.__jinf_dict["base_filename"] - - # Journal creation time - - def get_creation_time(self): - """Get journal creation time as a tuple (secs, nsecs)""" - return (self.__jinf_dict["seconds"], self.__jinf_dict["nanoseconds"]) - - def get_creation_time_str(self): - """Get journal creation time as a string""" - return self.__jinf_dict["string"] - - # --- Files and geometry --- - - def get_num_jrnl_files(self): - """Get number of data files in the journal""" - return self.__jinf_dict["number_jrnl_files"] - - def get_auto_expand(self): - """Return True if auto-expand is enabled; False otherwise""" - return self.__jinf_dict["auto_expand"] - - def get_jrnl_sblk_size_dblks(self): - """Get the journal softblock size in dblks""" - return self.__jinf_dict["JRNL_SBLK_SIZE"] - - def get_jrnl_sblk_size_bytes(self): - """Get the journal softblock size in bytes""" - return self.get_jrnl_sblk_size_dblks() * self.get_jrnl_dblk_size_bytes() - - def get_jrnl_dblk_size_bytes(self): - """Get the journal datablock size in bytes""" - return self.__jinf_dict["JRNL_DBLK_SIZE"] - - def get_jrnl_data_size_sblks(self): - """Get the data capacity (excluding the file headers) for one journal file in softblocks""" - return self.__jinf_dict["jrnl_file_size_sblks"] - - def get_jrnl_data_size_dblks(self): - """Get the data capacity (excluding the file headers) for one journal file in datablocks""" - return self.get_jrnl_data_size_sblks() * self.get_jrnl_sblk_size_dblks() - - def get_jrnl_data_size_bytes(self): - """Get the data capacity (excluding the file headers) for one journal file in bytes""" - return self.get_jrnl_data_size_dblks() * self.get_jrnl_dblk_size_bytes() - - def get_jrnl_file_size_sblks(self): - """Get the size of one journal file on disk (including the file headers) in softblocks""" - return self.get_jrnl_data_size_sblks() + 1 - - def get_jrnl_file_size_dblks(self): - """Get the size of one journal file on disk (including the file headers) in datablocks""" - return self.get_jrnl_file_size_sblks() * self.get_jrnl_sblk_size_dblks() - - def get_jrnl_file_size_bytes(self): - """Get the size of one journal file on disk (including the file headers) in bytes""" - return self.get_jrnl_file_size_dblks() * self.get_jrnl_dblk_size_bytes() - - def get_tot_jrnl_data_size_sblks(self): - """Get the size of the entire jouranl's data capacity (excluding the file headers) for all files together in - softblocks""" - return self.get_num_jrnl_files() * self.get_jrnl_data_size_bytes() - - def get_tot_jrnl_data_size_dblks(self): - """Get the size of the entire jouranl's data capacity (excluding the file headers) for all files together in - datablocks""" - return self.get_num_jrnl_files() * self.get_jrnl_data_size_dblks() - - def get_tot_jrnl_data_size_bytes(self): - """Get the size of the entire jouranl's data capacity (excluding the file headers) for all files together in - bytes""" - return self.get_num_jrnl_files() * self.get_jrnl_data_size_bytes() - - # Read and write buffers - - def get_wr_buf_pg_size_sblks(self): - """Get the size of the write buffer pages in softblocks""" - return self.__jinf_dict["wcache_pgsize_sblks"] - - def get_wr_buf_pg_size_dblks(self): - """Get the size of the write buffer pages in datablocks""" - return self.get_wr_buf_pg_size_sblks() * self.get_jrnl_sblk_size_dblks() - - def get_wr_buf_pg_size_bytes(self): - """Get the size of the write buffer pages in bytes""" - return self.get_wr_buf_pg_size_dblks() * self.get_jrnl_dblk_size_bytes() - - def get_num_wr_buf_pgs(self): - """Get the number of write buffer pages""" - return self.__jinf_dict["wcache_num_pages"] - - def get_rd_buf_pg_size_sblks(self): - """Get the size of the read buffer pages in softblocks""" - return self.__jinf_dict["JRNL_RMGR_PAGE_SIZE"] - - def get_rd_buf_pg_size_dblks(self): - """Get the size of the read buffer pages in datablocks""" - return self.get_rd_buf_pg_size_sblks * self.get_jrnl_sblk_size_dblks() - - def get_rd_buf_pg_size_bytes(self): - """Get the size of the read buffer pages in bytes""" - return self.get_rd_buf_pg_size_dblks * self.get_jrnl_dblk_size_bytes() - - def get_num_rd_buf_pgs(self): - """Get the number of read buffer pages""" - return self.__jinf_dict["JRNL_RMGR_PAGES"] - - def _read_jinf(self): - """Read and initialize this instance from an existing jinf file located at the directory named in the - constructor - called by the constructor""" - fhandle = open(os.path.join(self.__jdir, "%s.jinf" % self.__bfn), "r") - parser = xml.parsers.expat.ParserCreate() - parser.StartElementHandler = self._handle_xml_start_elt - parser.CharacterDataHandler = self._handle_xml_char_data - parser.EndElementHandler = self._handle_xml_end_elt - parser.ParseFile(fhandle) - fhandle.close() - - def _handle_xml_start_elt(self, name, attrs): - """Callback for handling XML start elements. Used by the XML parser.""" - # bool values - if name == "auto_expand": - self.__jinf_dict[name] = attrs["value"] == "true" - # long values - elif name == "seconds" or \ - name == "nanoseconds": - self.__jinf_dict[name] = long(attrs["value"]) - # int values - elif name == "journal_version" or \ - name == "number_jrnl_files" or \ - name == "jrnl_file_size_sblks" or \ - name == "JRNL_SBLK_SIZE" or \ - name == "JRNL_DBLK_SIZE" or \ - name == "wcache_pgsize_sblks" or \ - name == "wcache_num_pages" or \ - name == "JRNL_RMGR_PAGE_SIZE" or \ - name == "JRNL_RMGR_PAGES": - self.__jinf_dict[name] = int(attrs["value"]) - # strings - elif "value" in attrs: - self.__jinf_dict[name] = attrs["value"] - - def _handle_xml_char_data(self, data): - """Callback for handling character data (ie within <elt>...</elt>). The jinf file does not use this in its - data. Used by the XML parser.""" - pass - - def _handle_xml_end_elt(self, name): - """Callback for handling XML end elements. Used by XML parser.""" - pass - - -#============================================================================== - -_CLASSES = { - "a": TxnRec, - "c": TxnRec, - "d": DeqRec, - "e": EnqRec, - "f": FileHdr -} - -if __name__ == "__main__": - print "This is a library, and cannot be executed." diff --git a/qpid/cpp/management/python/lib/qpidtoollibs/__init__.py b/qpid/cpp/management/python/lib/qpidtoollibs/__init__.py deleted file mode 100644 index 2815bac22f..0000000000 --- a/qpid/cpp/management/python/lib/qpidtoollibs/__init__.py +++ /dev/null @@ -1,22 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -from qpidtoollibs.broker import * -from qpidtoollibs.disp import * - diff --git a/qpid/cpp/management/python/lib/qpidtoollibs/broker.py b/qpid/cpp/management/python/lib/qpidtoollibs/broker.py deleted file mode 100644 index fca6680067..0000000000 --- a/qpid/cpp/management/python/lib/qpidtoollibs/broker.py +++ /dev/null @@ -1,486 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -import sys -from qpidtoollibs.disp import TimeLong -try: - from uuid import uuid4 -except ImportError: - from qpid.datatypes import uuid4 - -class BrokerAgent(object): - """ - Proxy for a manageable Qpid Broker - Invoke with an opened qpid.messaging.Connection - or qpid_messaging.Connection - """ - def __init__(self, conn): - # Use the Message class from the same module as conn which could be qpid.messaging - # or qpid_messaging - self.message_class = sys.modules[conn.__class__.__module__].Message - self.conn = conn - self.sess = self.conn.session() - self.reply_to = "qmf.default.topic/direct.%s;{node:{type:topic}}" % str(uuid4()) - self.reply_rx = self.sess.receiver(self.reply_to) - self.reply_rx.capacity = 10 - self.tx = self.sess.sender("qmf.default.direct/broker") - self.next_correlator = 1 - - def close(self): - """ - Close the proxy session. This will not affect the connection used in creating the object. - """ - self.sess.close() - - def _method(self, method, arguments=None, addr="org.apache.qpid.broker:broker:amqp-broker", timeout=10): - props = {'method' : 'request', - 'qmf.opcode' : '_method_request', - 'x-amqp-0-10.app-id' : 'qmf2'} - correlator = str(self.next_correlator) - self.next_correlator += 1 - - content = {'_object_id' : {'_object_name' : addr}, - '_method_name' : method, - '_arguments' : arguments or {}} - - message = self.message_class( - content, reply_to=self.reply_to, correlation_id=correlator, - properties=props, subject="broker") - self.tx.send(message) - response = self.reply_rx.fetch(timeout) - self.sess.acknowledge() - if response.properties['qmf.opcode'] == '_exception': - raise Exception("Exception from Agent: %r" % response.content['_values']) - if response.properties['qmf.opcode'] != '_method_response': - raise Exception("bad response: %r" % response.properties) - return response.content['_arguments'] - - def _sendRequest(self, opcode, content): - props = {'method' : 'request', - 'qmf.opcode' : opcode, - 'x-amqp-0-10.app-id' : 'qmf2'} - correlator = str(self.next_correlator) - self.next_correlator += 1 - message = self.message_class( - content, reply_to=self.reply_to, correlation_id=correlator, - properties=props, subject="broker") - self.tx.send(message) - return correlator - - def _doClassQuery(self, class_name): - query = {'_what' : 'OBJECT', - '_schema_id' : {'_class_name' : class_name}} - correlator = self._sendRequest('_query_request', query) - response = self.reply_rx.fetch(10) - if response.properties['qmf.opcode'] != '_query_response': - raise Exception("bad response") - items = [] - done = False - while not done: - for item in response.content: - items.append(item) - if 'partial' in response.properties: - response = self.reply_rx.fetch(10) - else: - done = True - self.sess.acknowledge() - return items - - def _doNameQuery(self, object_id): - query = {'_what' : 'OBJECT', '_object_id' : {'_object_name' : object_id}} - correlator = self._sendRequest('_query_request', query) - response = self.reply_rx.fetch(10) - if response.properties['qmf.opcode'] != '_query_response': - raise Exception("bad response") - items = [] - done = False - while not done: - for item in response.content: - items.append(item) - if 'partial' in response.properties: - response = self.reply_rx.fetch(10) - else: - done = True - self.sess.acknowledge() - if len(items) == 1: - return items[0] - return None - - def _getAllBrokerObjects(self, cls): - items = self._doClassQuery(cls.__name__.lower()) - objs = [] - for item in items: - objs.append(cls(self, item)) - return objs - - def _getBrokerObject(self, cls, oid): - obj = self._doNameQuery(oid) - if obj: - return cls(self, obj) - return None - - def _getSingleObject(self, cls): - # - # getAllBrokerObjects is used instead of getBrokerObject(Broker, 'amqp-broker') because - # of a bug that used to be in the broker whereby by-name queries did not return the - # object timestamps. - # - objects = self._getAllBrokerObjects(cls) - if objects: return objects[0] - return None - - def getBroker(self): - """ - Get the Broker object that contains broker-scope statistics and operations. - """ - return self._getSingleObject(Broker) - - - def getCluster(self): - return self._getSingleObject(Cluster) - - def getHaBroker(self): - return self._getSingleObject(HaBroker) - - def getAllConnections(self): - return self._getAllBrokerObjects(Connection) - - def getConnection(self, oid): - return self._getBrokerObject(Connection, "org.apache.qpid.broker:connection:%s" % oid) - - def getAllSessions(self): - return self._getAllBrokerObjects(Session) - - def getSession(self, oid): - return self._getBrokerObject(Session, "org.apache.qpid.broker:session:%s" % oid) - - def getAllSubscriptions(self): - return self._getAllBrokerObjects(Subscription) - - def getSubscription(self, oid): - return self._getBrokerObject(Subscription, "org.apache.qpid.broker:subscription:%s" % oid) - - def getAllExchanges(self): - return self._getAllBrokerObjects(Exchange) - - def getExchange(self, name): - return self._getBrokerObject(Exchange, "org.apache.qpid.broker:exchange:%s" % name) - - def getAllQueues(self): - return self._getAllBrokerObjects(Queue) - - def getQueue(self, name): - return self._getBrokerObject(Queue, "org.apache.qpid.broker:queue:%s" % name) - - def getAllBindings(self): - return self._getAllBrokerObjects(Binding) - - def getAllLinks(self): - return self._getAllBrokerObjects(Link) - - def getAcl(self): - return self._getSingleObject(Acl) - - def getMemory(self): - return self._getSingleObject(Memory) - - def echo(self, sequence = 1, body = "Body"): - """Request a response to test the path to the management broker""" - args = {'sequence' : sequence, 'body' : body} - return self._method('echo', args) - - def connect(self, host, port, durable, authMechanism, username, password, transport): - """Establish a connection to another broker""" - pass - - def queueMoveMessages(self, srcQueue, destQueue, qty): - """Move messages from one queue to another""" - self._method("queueMoveMessages", {'srcQueue':srcQueue,'destQueue':destQueue,'qty':qty}) - - def queueRedirect(self, sourceQueue, targetQueue): - """Enable/disable delivery redirect for indicated queues""" - self._method("queueRedirect", {'sourceQueue':sourceQueue,'targetQueue':targetQueue}) - - def setLogLevel(self, level): - """Set the log level""" - self._method("setLogLevel", {'level':level}) - - def getLogLevel(self): - """Get the log level""" - return self._method('getLogLevel') - - def setTimestampConfig(self, receive): - """Set the message timestamping configuration""" - self._method("setTimestampConfig", {'receive':receive}) - - def getTimestampConfig(self): - """Get the message timestamping configuration""" - return self._method('getTimestampConfig') - - def setLogHiresTimestamp(self, logHires): - """Set the high resolution timestamp in logs""" - self._method("setLogHiresTimestamp", {'logHires':logHires}) - - def getLogHiresTimestamp(self): - """Get the high resolution timestamp in logs""" - return self._method('getLogHiresTimestamp') - - def addExchange(self, exchange_type, name, options={}, **kwargs): - properties = {} - properties['exchange-type'] = exchange_type - for k,v in options.items(): - properties[k] = v - for k,v in kwargs.items(): - properties[k] = v - args = {'type': 'exchange', - 'name': name, - 'properties': properties, - 'strict': True} - self._method('create', args) - - def delExchange(self, name): - args = {'type': 'exchange', 'name': name} - self._method('delete', args) - - def addQueue(self, name, options={}, **kwargs): - properties = options - for k,v in kwargs.items(): - properties[k] = v - args = {'type': 'queue', - 'name': name, - 'properties': properties, - 'strict': True} - self._method('create', args) - - def delQueue(self, name, if_empty=True, if_unused=True): - options = {'if_empty': if_empty, - 'if_unused': if_unused} - - args = {'type': 'queue', - 'name': name, - 'options': options} - self._method('delete', args) - - def bind(self, exchange, queue, key="", options={}, **kwargs): - properties = options - for k,v in kwargs.items(): - properties[k] = v - args = {'type': 'binding', - 'name': "%s/%s/%s" % (exchange, queue, key), - 'properties': properties, - 'strict': True} - self._method('create', args) - - def unbind(self, exchange, queue, key, **kwargs): - args = {'type': 'binding', - 'name': "%s/%s/%s" % (exchange, queue, key), - 'strict': True} - self._method('delete', args) - - def reloadAclFile(self): - self._method('reloadACLFile', {}, "org.apache.qpid.acl:acl:org.apache.qpid.broker:broker:amqp-broker") - - def acl_lookup(self, userName, action, aclObj, aclObjName, propMap): - args = {'userId': userName, - 'action': action, - 'object': aclObj, - 'objectName': aclObjName, - 'propertyMap': propMap} - return self._method('Lookup', args, "org.apache.qpid.acl:acl:org.apache.qpid.broker:broker:amqp-broker") - - def acl_lookupPublish(self, userName, exchange, key): - args = {'userId': userName, - 'exchangeName': exchange, - 'routingKey': key} - return self._method('LookupPublish', args, "org.apache.qpid.acl:acl:org.apache.qpid.broker:broker:amqp-broker") - - def Redirect(self, sourceQueue, targetQueue): - args = {'sourceQueue': sourceQueue, - 'targetQueue': targetQueue} - return self._method('queueRedirect', args, "org.apache.qpid.broker:broker:amqp-broker") - - def create(self, _type, name, properties={}, strict=False): - """Create an object of the specified type""" - args = {'type': _type, - 'name': name, - 'properties': properties, - 'strict': strict} - return self._method('create', args) - - def delete(self, _type, name, options): - """Delete an object of the specified type""" - args = {'type': _type, - 'name': name, - 'options': options} - return self._method('delete', args) - - def list(self, _type): - """List objects of the specified type""" - return [i["_values"] for i in self._doClassQuery(_type.lower())] - - def query(self, _type, oid): - """Query the current state of an object""" - return self._getBrokerObject(self, _type, oid) - - -class EventHelper(object): - def eventAddress(self, pkg='*', cls='*', sev='*'): - return "qmf.default.topic/agent.ind.event.%s.%s.%s.#" % (pkg.replace('.', '_'), cls, sev) - - def event(self, msg): - return BrokerEvent(msg) - - -class BrokerEvent(object): - def __init__(self, msg): - self.msg = msg - self.content = msg.content[0] - self.values = self.content['_values'] - self.schema_id = self.content['_schema_id'] - self.name = "%s:%s" % (self.schema_id['_package_name'], self.schema_id['_class_name']) - - def __repr__(self): - rep = "%s %s" % (TimeLong(self.getTimestamp()), self.name) - for k,v in self.values.items(): - rep = rep + " %s=%s" % (k, v) - return rep - - def __getattr__(self, key): - if key not in self.values: - return None - value = self.values[key] - return value - - def getAttributes(self): - return self.values - - def getTimestamp(self): - return self.content['_timestamp'] - - -class BrokerObject(object): - def __init__(self, broker, content): - self.broker = broker - self.content = content - self.values = content['_values'] - - def __getattr__(self, key): - if key not in self.values: - return None - value = self.values[key] - if value.__class__ == dict and '_object_name' in value: - full_name = value['_object_name'] - colon = full_name.find(':') - if colon > 0: - full_name = full_name[colon+1:] - colon = full_name.find(':') - if colon > 0: - return full_name[colon+1:] - return value - - def getObjectId(self): - return self.content['_object_id']['_object_name'] - - def getAttributes(self): - return self.values - - def getCreateTime(self): - return self.content['_create_ts'] - - def getDeleteTime(self): - return self.content['_delete_ts'] - - def getUpdateTime(self): - return self.content['_update_ts'] - - def update(self): - """ - Reload the property values from the agent. - """ - refreshed = self.broker._getBrokerObject(self.__class__, self.getObjectId()) - if refreshed: - self.content = refreshed.content - self.values = self.content['_values'] - else: - raise Exception("No longer exists on the broker") - -class Broker(BrokerObject): - def __init__(self, broker, values): - BrokerObject.__init__(self, broker, values) - -class Cluster(BrokerObject): - def __init__(self, broker, values): - BrokerObject.__init__(self, broker, values) - -class HaBroker(BrokerObject): - def __init__(self, broker, values): - BrokerObject.__init__(self, broker, values) - -class Memory(BrokerObject): - def __init__(self, broker, values): - BrokerObject.__init__(self, broker, values) - -class Connection(BrokerObject): - def __init__(self, broker, values): - BrokerObject.__init__(self, broker, values) - - def close(self): - self.broker._method("close", {}, "org.apache.qpid.broker:connection:%s" % self.address) - -class Session(BrokerObject): - def __init__(self, broker, values): - BrokerObject.__init__(self, broker, values) - -class Subscription(BrokerObject): - def __init__(self, broker, values): - BrokerObject.__init__(self, broker, values) - - def __repr__(self): - return "subscription name undefined" - -class Exchange(BrokerObject): - def __init__(self, broker, values): - BrokerObject.__init__(self, broker, values) - -class Binding(BrokerObject): - def __init__(self, broker, values): - BrokerObject.__init__(self, broker, values) - - def __repr__(self): - return "Binding key: %s" % self.values['bindingKey'] - -class Queue(BrokerObject): - def __init__(self, broker, values): - BrokerObject.__init__(self, broker, values) - - def purge(self, request): - """Discard all or some messages on a queue""" - self.broker._method("purge", {'request':request}, "org.apache.qpid.broker:queue:%s" % self.name) - - def reroute(self, request, useAltExchange, exchange, filter={}): - """Remove all or some messages on this queue and route them to an exchange""" - self.broker._method("reroute", {'request':request,'useAltExchange':useAltExchange,'exchange':exchange,'filter':filter}, - "org.apache.qpid.broker:queue:%s" % self.name) - -class Link(BrokerObject): - def __init__(self, broker, values): - BrokerObject.__init__(self, broker, values) - -class Acl(BrokerObject): - def __init__(self, broker, values): - BrokerObject.__init__(self, broker, values) diff --git a/qpid/cpp/management/python/lib/qpidtoollibs/config.py b/qpid/cpp/management/python/lib/qpidtoollibs/config.py deleted file mode 100644 index 9168215ac3..0000000000 --- a/qpid/cpp/management/python/lib/qpidtoollibs/config.py +++ /dev/null @@ -1,36 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -"""Utilities for managing configuration files""" -import os - -QPID_ENV_PREFIX="QPID_" - -def parse_qpidd_conf(config_file): - """Parse a qpidd.conf configuration file into a dictionary""" - f = open(config_file) - try: - clean = filter(None, [line.split("#")[0].strip() for line in f]) # Strip comments and blanks - def item(line): return [x.strip() for x in line.split("=")] - config = dict(item(line) for line in clean if "=" in line) - finally: f.close() - def name(env_name): return env_name[len(QPID_ENV_PREFIX):].lower() - env = dict((name(i[0]), i[1]) for i in os.environ.iteritems() if i[0].startswith(QPID_ENV_PREFIX)) - config.update(env) # Environment takes precedence - return config diff --git a/qpid/cpp/management/python/lib/qpidtoollibs/disp.py b/qpid/cpp/management/python/lib/qpidtoollibs/disp.py deleted file mode 100644 index 1b7419ba2c..0000000000 --- a/qpid/cpp/management/python/lib/qpidtoollibs/disp.py +++ /dev/null @@ -1,270 +0,0 @@ -#!/usr/bin/env python - -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -from time import strftime, gmtime - -def YN(val): - if val: - return 'Y' - return 'N' - -def Commas(value): - sval = str(value) - result = "" - while True: - if len(sval) == 0: - return result - left = sval[:-3] - right = sval[-3:] - result = right + result - if len(left) > 0: - result = ',' + result - sval = left - -def TimeLong(value): - return strftime("%c", gmtime(value / 1000000000)) - -def TimeShort(value): - return strftime("%X", gmtime(value / 1000000000)) - - -class Header: - """ """ - NONE = 1 - KMG = 2 - YN = 3 - Y = 4 - TIME_LONG = 5 - TIME_SHORT = 6 - DURATION = 7 - COMMAS = 8 - - def __init__(self, text, format=NONE): - self.text = text - self.format = format - - def __repr__(self): - return self.text - - def __str__(self): - return self.text - - def formatted(self, value): - try: - if value == None: - return '' - if self.format == Header.NONE: - return value - if self.format == Header.KMG: - return self.num(value) - if self.format == Header.YN: - if value: - return 'Y' - return 'N' - if self.format == Header.Y: - if value: - return 'Y' - return '' - if self.format == Header.TIME_LONG: - return TimeLong(value) - if self.format == Header.TIME_SHORT: - return TimeShort(value) - if self.format == Header.DURATION: - if value < 0: value = 0 - sec = value / 1000000000 - min = sec / 60 - hour = min / 60 - day = hour / 24 - result = "" - if day > 0: - result = "%dd " % day - if hour > 0 or result != "": - result += "%dh " % (hour % 24) - if min > 0 or result != "": - result += "%dm " % (min % 60) - result += "%ds" % (sec % 60) - return result - if self.format == Header.COMMAS: - return Commas(value) - except: - return "?" - - def numCell(self, value, tag): - fp = float(value) / 1000. - if fp < 10.0: - return "%1.2f%c" % (fp, tag) - if fp < 100.0: - return "%2.1f%c" % (fp, tag) - return "%4d%c" % (value / 1000, tag) - - def num(self, value): - if value < 1000: - return "%4d" % value - if value < 1000000: - return self.numCell(value, 'k') - value /= 1000 - if value < 1000000: - return self.numCell(value, 'm') - value /= 1000 - return self.numCell(value, 'g') - - -class Display: - """ Display formatting for QPID Management CLI """ - - def __init__(self, spacing=2, prefix=" "): - self.tableSpacing = spacing - self.tablePrefix = prefix - self.timestampFormat = "%X" - - def formattedTable(self, title, heads, rows): - fRows = [] - for row in rows: - fRow = [] - col = 0 - for cell in row: - fRow.append(heads[col].formatted(cell)) - col += 1 - fRows.append(fRow) - headtext = [] - for head in heads: - headtext.append(head.text) - self.table(title, headtext, fRows) - - def table(self, title, heads, rows): - """ Print a table with autosized columns """ - - # Pad the rows to the number of heads - for row in rows: - diff = len(heads) - len(row) - for idx in range(diff): - row.append("") - - print title - if len (rows) == 0: - return - colWidth = [] - col = 0 - line = self.tablePrefix - for head in heads: - width = len (head) - for row in rows: - text = row[col] - if text.__class__ == str: - text = text.decode('utf-8') - cellWidth = len(unicode(text)) - if cellWidth > width: - width = cellWidth - colWidth.append (width + self.tableSpacing) - line = line + head - if col < len (heads) - 1: - for i in range (colWidth[col] - len (head)): - line = line + " " - col = col + 1 - print line - line = self.tablePrefix - for width in colWidth: - line = line + "=" * width - line = line[:255] - print line - - for row in rows: - line = self.tablePrefix - col = 0 - for width in colWidth: - text = row[col] - if text.__class__ == str: - text = text.decode('utf-8') - line = line + unicode(text) - if col < len (heads) - 1: - for i in range (width - len(unicode(text))): - line = line + " " - col = col + 1 - print line - - def do_setTimeFormat (self, fmt): - """ Select timestamp format """ - if fmt == "long": - self.timestampFormat = "%c" - elif fmt == "short": - self.timestampFormat = "%X" - - def timestamp (self, nsec): - """ Format a nanosecond-since-the-epoch timestamp for printing """ - return strftime (self.timestampFormat, gmtime (nsec / 1000000000)) - - def duration(self, nsec): - if nsec < 0: nsec = 0 - sec = nsec / 1000000000 - min = sec / 60 - hour = min / 60 - day = hour / 24 - result = "" - if day > 0: - result = "%dd " % day - if hour > 0 or result != "": - result += "%dh " % (hour % 24) - if min > 0 or result != "": - result += "%dm " % (min % 60) - result += "%ds" % (sec % 60) - return result - -class Sortable: - """ """ - def __init__(self, row, sortIndex): - self.row = row - self.sortIndex = sortIndex - if sortIndex >= len(row): - raise Exception("sort index exceeds row boundary") - - def __cmp__(self, other): - return cmp(self.row[self.sortIndex], other.row[self.sortIndex]) - - def getRow(self): - return self.row - -class Sorter: - """ """ - def __init__(self, heads, rows, sortCol, limit=0, inc=True): - col = 0 - for head in heads: - if head.text == sortCol: - break - col += 1 - if col == len(heads): - raise Exception("sortCol '%s', not found in headers" % sortCol) - - list = [] - for row in rows: - list.append(Sortable(row, col)) - list.sort() - if not inc: - list.reverse() - count = 0 - self.sorted = [] - for row in list: - self.sorted.append(row.getRow()) - count += 1 - if count == limit: - break - - def getSorted(self): - return self.sorted |
