From 71149592670f7592886751a9a866459bef0f12cc Mon Sep 17 00:00:00 2001 From: Justin Ross Date: Thu, 21 Apr 2016 12:31:34 +0000 Subject: QPID-7207: Create independent cpp and python subtrees, with content from tools and extras git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1740289 13f79535-47bb-0310-9956-ffa450edef68 --- .../management/python/lib/qpidstore/__init__.py | 19 + qpid/cpp/management/python/lib/qpidstore/janal.py | 617 ++++++++++++++++ qpid/cpp/management/python/lib/qpidstore/jerr.py | 219 ++++++ qpid/cpp/management/python/lib/qpidstore/jrnl.py | 794 +++++++++++++++++++++ 4 files changed, 1649 insertions(+) create mode 100644 qpid/cpp/management/python/lib/qpidstore/__init__.py create mode 100644 qpid/cpp/management/python/lib/qpidstore/janal.py create mode 100644 qpid/cpp/management/python/lib/qpidstore/jerr.py create mode 100644 qpid/cpp/management/python/lib/qpidstore/jrnl.py (limited to 'qpid/cpp/management/python/lib/qpidstore') diff --git a/qpid/cpp/management/python/lib/qpidstore/__init__.py b/qpid/cpp/management/python/lib/qpidstore/__init__.py new file mode 100644 index 0000000000..d8a500d9d8 --- /dev/null +++ b/qpid/cpp/management/python/lib/qpidstore/__init__.py @@ -0,0 +1,19 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + diff --git a/qpid/cpp/management/python/lib/qpidstore/janal.py b/qpid/cpp/management/python/lib/qpidstore/janal.py new file mode 100644 index 0000000000..1a892aca60 --- /dev/null +++ b/qpid/cpp/management/python/lib/qpidstore/janal.py @@ -0,0 +1,617 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import jerr, jrnl +import os.path, sys + + +#== class EnqMap ============================================================== + +class EnqMap(object): + """Class for maintaining a map of enqueued records, indexing the rid against hdr, fid and transaction lock""" + + def __init__(self): + """Constructor""" + self.__map = {} + + def __str__(self): + """Print the contents of the map""" + return self.report(True, True) + + def add(self, fid, hdr, lock = False): + """Add a new record into the map""" + if hdr.rid in self.__map: + raise jerr.DuplicateRidError(hdr.rid) + self.__map[hdr.rid] = [fid, hdr, lock] + + def contains(self, rid): + """Return True if the map contains the given rid""" + return rid in self.__map + + def delete(self, rid): + """Delete the rid and its associated data from the map""" + if rid in self.__map: + if self.get_lock(rid): + raise jerr.DeleteLockedRecordError(rid) + del self.__map[rid] + else: + raise jerr.JWarning("ERROR: Deleting non-existent rid from EnqMap: rid=0x%x" % rid) + + def get(self, rid): + """Return a list [fid, hdr, lock] for the given rid""" + if self.contains(rid): + return self.__map[rid] + return None + + def get_fid(self, rid): + """Return the fid for the given rid""" + if self.contains(rid): + return self.__map[rid][0] + return None + + def get_hdr(self, rid): + """Return the header record for the given rid""" + if self.contains(rid): + return self.__map[rid][1] + return None + + def get_lock(self, rid): + """Return the transaction lock value for the given rid""" + if self.contains(rid): + return self.__map[rid][2] + return None + + def get_rec_list(self): + """Return a list of tuples (fid, hdr, lock) for all entries in the map""" + return self.__map.values() + + def lock(self, rid): + """Set the transaction lock for a given rid to True""" + if rid in self.__map: + if not self.__map[rid][2]: # locked + self.__map[rid][2] = True + else: + raise jerr.AlreadyLockedError(rid) + else: + raise jerr.JWarning("ERROR: Locking non-existent rid in EnqMap: rid=0x%x" % rid) + + def report(self, show_stats, show_records): + """Return a string containing a text report for all records in the map""" + if len(self.__map) == 0: + return "No enqueued records found." + rstr = "%d enqueued records found" % len(self.__map) + if show_records: + rstr += ":" + rid_list = self.__map.keys() + rid_list.sort() + for rid in rid_list: + if self.__map[rid][2]: + lock_str = " [LOCKED]" + else: + lock_str = "" + rstr += "\n lfid=%d %s %s" % (rec[0], rec[1], lock_str) + else: + rstr += "." + return rstr + + def rids(self): + """Return a list of rids in the map""" + return self.__map.keys() + + def size(self): + """Return the number of entries in the map""" + return len(self.__map) + + def unlock(self, rid): + """Set the transaction lock for a given rid to False""" + if rid in self.__map: + if self.__map[rid][2]: + self.__map[rid][2] = False + else: + raise jerr.NotLockedError(rid) + else: + raise jerr.NonExistentRecordError("unlock", rid) + + +#== class TxnMap ============================================================== + +class TxnMap(object): + """Transaction map, which maps xids to a list of outstanding actions""" + + def __init__(self, emap): + """Constructor, requires an existing EnqMap instance""" + self.__emap = emap + self.__map = {} + + def __str__(self): + """Print the contents of the map""" + return self.report(True, True) + + def add(self, fid, hdr): + """Add a new transactional record into the map""" + if isinstance(hdr, jrnl.DeqRec): + try: + self.__emap.lock(hdr.deq_rid) + except jerr.JWarning: + # Not in emap, look for rid in tmap + l = self.find_rid(hdr.deq_rid, hdr.xid) + if l != None: + if l[2]: + raise jerr.AlreadyLockedError(hdr.deq_rid) + l[2] = True + if hdr.xid in self.__map: + self.__map[hdr.xid].append([fid, hdr, False]) # append to existing list + else: + self.__map[hdr.xid] = [[fid, hdr, False]] # create new list + + def contains(self, xid): + """Return True if the xid exists in the map; False otherwise""" + return xid in self.__map + + def delete(self, hdr): + """Remove a transaction record from the map using either a commit or abort header""" + if hdr.magic[-1] == "c": + return self._commit(hdr.xid) + if hdr.magic[-1] == "a": + self._abort(hdr.xid) + else: + raise jerr.InvalidRecordTypeError("delete from TxnMap", hdr.magic, hdr.rid) + + def find_rid(self, rid, xid_hint = None): + """ Search for and return map list with supplied rid. If xid_hint is supplied, try that xid first""" + if xid_hint != None and self.contains(xid_hint): + for l in self.__map[xid_hint]: + if l[1].rid == rid: + return l + for xid in self.__map.iterkeys(): + if xid_hint == None or xid != xid_hint: + for l in self.__map[xid]: + if l[1].rid == rid: + return l + + def get(self, xid): + """Return a list of operations for the given xid""" + if self.contains(xid): + return self.__map[xid] + + def report(self, show_stats, show_records): + """Return a string containing a text report for all records in the map""" + if len(self.__map) == 0: + return "No outstanding transactions found." + rstr = "%d outstanding transactions found" % len(self.__map) + if show_records: + rstr += ":" + for xid, tup in self.__map.iteritems(): + rstr += "\n xid=%s:" % jrnl.Utils.format_xid(xid) + for i in tup: + rstr += "\n %s" % str(i[1]) + else: + rstr += "." + return rstr + + def size(self): + """Return the number of xids in the map""" + return len(self.__map) + + def xids(self): + """Return a list of xids in the map""" + return self.__map.keys() + + def _abort(self, xid): + """Perform an abort operation for the given xid record""" + for _, hdr, _ in self.__map[xid]: + if isinstance(hdr, jrnl.DeqRec): + try: + self.__emap.unlock(hdr.deq_rid) + except jerr.NonExistentRecordError, err: # Not in emap, look in current transaction op list (TPL) + found_rid = False + for _, hdr1, _ in self.__map[xid]: + if isinstance(hdr1, jrnl.EnqRec) and hdr1.rid == hdr.deq_rid: + found_rid = True + break + if not found_rid: # Not found in current transaction op list, re-throw error + raise err + del self.__map[xid] + + def _commit(self, xid): + """Perform a commit operation for the given xid record""" + mismatch_list = [] + for fid, hdr, lock in self.__map[xid]: + if isinstance(hdr, jrnl.EnqRec): + self.__emap.add(fid, hdr, lock) # Transfer enq to emap + else: + if self.__emap.contains(hdr.deq_rid): + self.__emap.unlock(hdr.deq_rid) + self.__emap.delete(hdr.deq_rid) + else: + mismatch_list.append("0x%x" % hdr.deq_rid) + del self.__map[xid] + return mismatch_list + +#== class JrnlAnalyzer ======================================================== + +class JrnlAnalyzer(object): + """ + This class analyzes a set of journal files and determines which is the last to be written + (the newest file), and hence which should be the first to be read for recovery (the oldest + file). + + The analysis is performed on construction; the contents of the JrnlInfo object passed provide + the recovery details. + """ + + def __init__(self, jinf): + """Constructor""" + self.__oldest = None + self.__jinf = jinf + self.__flist = self._analyze() + + def __str__(self): + """String representation of this JrnlAnalyzer instance, will print out results of analysis.""" + ostr = "Journal files analyzed in directory %s (* = earliest full):\n" % self.__jinf.get_current_dir() + if self.is_empty(): + ostr += " \n" + else: + for tup in self.__flist: + tmp = " " + if tup[0] == self.__oldest[0]: + tmp = "*" + ostr += " %s %s: owi=%-5s rid=0x%x, fro=0x%x ts=%s\n" % (tmp, os.path.basename(tup[1]), tup[2], + tup[3], tup[4], tup[5]) + for i in range(self.__flist[-1][0] + 1, self.__jinf.get_num_jrnl_files()): + ostr += " %s.%04x.jdat: \n" % (self.__jinf.get_jrnl_base_name(), i) + return ostr + + # Analysis + + def get_oldest_file(self): + """Return a tuple (ordnum, jfn, owi, rid, fro, timestamp) for the oldest data file found in the journal""" + return self.__oldest + + def get_oldest_file_index(self): + """Return the ordinal number of the oldest data file found in the journal""" + if self.is_empty(): + return None + return self.__oldest[0] + + def is_empty(self): + """Return true if the analysis found that the journal file has never been written to""" + return len(self.__flist) == 0 + + def _analyze(self): + """Perform the journal file analysis by reading and comparing the file headers of each journal data file""" + owi_found = False + flist = [] + for i in range(0, self.__jinf.get_num_jrnl_files()): + jfn = os.path.join(self.__jinf.get_current_dir(), "%s.%04x.jdat" % (self.__jinf.get_jrnl_base_name(), i)) + fhandle = open(jfn) + fhdr = jrnl.Utils.load(fhandle, jrnl.Hdr) + if fhdr.empty(): + break + this_tup = (i, jfn, fhdr.owi(), fhdr.rid, fhdr.fro, fhdr.timestamp_str()) + flist.append(this_tup) + if i == 0: + init_owi = fhdr.owi() + self.__oldest = this_tup + elif fhdr.owi() != init_owi and not owi_found: + self.__oldest = this_tup + owi_found = True + return flist + + +#== class JrnlReader ==================================================== + +class JrnlReader(object): + """ + This class contains an Enqueue Map (emap), a transaction map (tmap) and a transaction + object list (txn_obj_list) which are populated by reading the journals from the oldest + to the newest and analyzing each record. The JrnlInfo and JrnlAnalyzer + objects supplied on construction provide the information used for the recovery. + + The analysis is performed on construction. + """ + + def __init__(self, jinfo, jra, qflag = False, rflag = False, vflag = False): + """Constructor, which reads all """ + self._jinfo = jinfo + self._jra = jra + self._qflag = qflag + self._rflag = rflag + self._vflag = vflag + + # test callback functions for CSV tests + self._csv_store_chk = None + self._csv_start_cb = None + self._csv_enq_cb = None + self._csv_deq_cb = None + self._csv_txn_cb = None + self._csv_end_cb = None + + self._emap = EnqMap() + self._tmap = TxnMap(self._emap) + self._txn_obj_list = {} + + self._file = None + self._file_hdr = None + self._file_num = None + self._first_rec_flag = None + self._fro = None + self._last_file_flag = None + self._start_file_num = None + self._file_hdr_owi = None + self._warning = [] + + self._abort_cnt = 0 + self._commit_cnt = 0 + self._msg_cnt = 0 + self._rec_cnt = 0 + self._txn_msg_cnt = 0 + + def __str__(self): + """Print out all the undequeued records""" + return self.report(True, self._rflag) + + def emap(self): + """Get the enqueue map""" + return self._emap + + def get_abort_cnt(self): + """Get the cumulative number of transactional aborts found""" + return self._abort_cnt + + def get_commit_cnt(self): + """Get the cumulative number of transactional commits found""" + return self._commit_cnt + + def get_msg_cnt(self): + """Get the cumulative number of messages found""" + return self._msg_cnt + + def get_rec_cnt(self): + """Get the cumulative number of journal records (including fillers) found""" + return self._rec_cnt + + def is_last_file(self): + """Return True if the last file is being read""" + return self._last_file_flag + + def report(self, show_stats = True, show_records = False): + """Return a string containing a report on the file analysis""" + rstr = self._emap.report(show_stats, show_records) + "\n" + self._tmap.report(show_stats, show_records) + #TODO - print size analysis here - ie how full, sparse, est. space remaining before enq threshold + return rstr + + def run(self): + """Perform the read of the journal""" + if self._csv_start_cb != None and self._csv_start_cb(self._csv_store_chk): + return + if self._jra.is_empty(): + return + stop = self._advance_jrnl_file(*self._jra.get_oldest_file()) + while not stop and not self._get_next_record(): + pass + if self._csv_end_cb != None and self._csv_end_cb(self._csv_store_chk): + return + if not self._qflag: + print + + def set_callbacks(self, csv_store_chk, csv_start_cb = None, csv_enq_cb = None, csv_deq_cb = None, csv_txn_cb = None, + csv_end_cb = None): + """Set callbacks for checks to be made at various points while reading the journal""" + self._csv_store_chk = csv_store_chk + self._csv_start_cb = csv_start_cb + self._csv_enq_cb = csv_enq_cb + self._csv_deq_cb = csv_deq_cb + self._csv_txn_cb = csv_txn_cb + self._csv_end_cb = csv_end_cb + + def tmap(self): + """Return the transaction map""" + return self._tmap + + def get_txn_msg_cnt(self): + """Get the cumulative transactional message count""" + return self._txn_msg_cnt + + def txn_obj_list(self): + """Get a cumulative list of transaction objects (commits and aborts)""" + return self._txn_obj_list + + def _advance_jrnl_file(self, *oldest_file_info): + """Rotate to using the next journal file. Return False if the operation was successful, True if there are no + more files to read.""" + fro_seek_flag = False + if len(oldest_file_info) > 0: + self._start_file_num = self._file_num = oldest_file_info[0] + self._fro = oldest_file_info[4] + fro_seek_flag = True # jump to fro to start reading + if not self._qflag and not self._rflag: + if self._vflag: + print "Recovering journals..." + else: + print "Recovering journals", + if self._file != None and self._is_file_full(): + self._file.close() + self._file_num = self._incr_file_num() + if self._file_num == self._start_file_num: + return True + if self._start_file_num == 0: + self._last_file_flag = self._file_num == self._jinfo.get_num_jrnl_files() - 1 + else: + self._last_file_flag = self._file_num == self._start_file_num - 1 + if self._file_num < 0 or self._file_num >= self._jinfo.get_num_jrnl_files(): + raise jerr.BadFileNumberError(self._file_num) + jfn = os.path.join(self._jinfo.get_current_dir(), "%s.%04x.jdat" % + (self._jinfo.get_jrnl_base_name(), self._file_num)) + self._file = open(jfn) + self._file_hdr = jrnl.Utils.load(self._file, jrnl.Hdr) + if fro_seek_flag and self._file.tell() != self._fro: + self._file.seek(self._fro) + self._first_rec_flag = True + if not self._qflag: + if self._rflag: + print jfn, ": ", self._file_hdr + elif self._vflag: + print "* Reading %s" % jfn + else: + print ".", + sys.stdout.flush() + return False + + def _check_owi(self, hdr): + """Return True if the header's owi indicator matches that of the file header record; False otherwise. This can + indicate whether the last record in a file has been read and now older records which have not yet been + overwritten are now being read.""" + return self._file_hdr_owi == hdr.owi() + + def _is_file_full(self): + """Return True if the current file is full (no more write space); false otherwise""" + return self._file.tell() >= self._jinfo.get_jrnl_file_size_bytes() + + def _get_next_record(self): + """Get the next record in the file for analysis""" + if self._is_file_full(): + if self._advance_jrnl_file(): + return True + try: + hdr = jrnl.Utils.load(self._file, jrnl.Hdr) + except: + return True + if hdr.empty(): + return True + if hdr.check(): + return True + self._rec_cnt += 1 + self._file_hdr_owi = self._file_hdr.owi() + if self._first_rec_flag: + if self._file_hdr.fro != hdr.foffs: + raise jerr.FirstRecordOffsetMismatch(self._file_hdr.fro, hdr.foffs) + else: + if self._rflag: + print " * fro ok: 0x%x" % self._file_hdr.fro + self._first_rec_flag = False + stop = False + if isinstance(hdr, jrnl.EnqRec): + stop = self._handle_enq_rec(hdr) + elif isinstance(hdr, jrnl.DeqRec): + stop = self._handle_deq_rec(hdr) + elif isinstance(hdr, jrnl.TxnRec): + stop = self._handle_txn_rec(hdr) + wstr = "" + for warn in self._warning: + wstr += " (%s)" % warn + if self._rflag: + print " > %s %s" % (hdr, wstr) + self._warning = [] + return stop + + def _handle_deq_rec(self, hdr): + """Process a dequeue ("RHMd") record""" + if self._load_rec(hdr): + return True + + # Check OWI flag + if not self._check_owi(hdr): + self._warning.append("WARNING: OWI mismatch - could be overwrite boundary.") + return True + # Test hook + if self._csv_deq_cb != None and self._csv_deq_cb(self._csv_store_chk, hdr): + return True + + try: + if hdr.xid == None: + self._emap.delete(hdr.deq_rid) + else: + self._tmap.add(self._file_hdr.fid, hdr) + except jerr.JWarning, warn: + self._warning.append(str(warn)) + return False + + def _handle_enq_rec(self, hdr): + """Process a dequeue ("RHMe") record""" + if self._load_rec(hdr): + return True + + # Check extern flag + if hdr.extern and hdr.data != None: + raise jerr.ExternFlagDataError(hdr) + # Check OWI flag + if not self._check_owi(hdr): + self._warning.append("WARNING: OWI mismatch - could be overwrite boundary.") + return True + # Test hook + if self._csv_enq_cb != None and self._csv_enq_cb(self._csv_store_chk, hdr): + return True + + if hdr.xid == None: + self._emap.add(self._file_hdr.fid, hdr) + else: + self._txn_msg_cnt += 1 + self._tmap.add(self._file_hdr.fid, hdr) + self._msg_cnt += 1 + return False + + def _handle_txn_rec(self, hdr): + """Process a transaction ("RHMa or RHMc") record""" + if self._load_rec(hdr): + return True + + # Check OWI flag + if not self._check_owi(hdr): + self._warning.append("WARNING: OWI mismatch - could be overwrite boundary.") + return True + # Test hook + if self._csv_txn_cb != None and self._csv_txn_cb(self._csv_store_chk, hdr): + return True + + if hdr.magic[-1] == "a": + self._abort_cnt += 1 + else: + self._commit_cnt += 1 + + if self._tmap.contains(hdr.xid): + mismatched_rids = self._tmap.delete(hdr) + if mismatched_rids != None and len(mismatched_rids) > 0: + self._warning.append("WARNING: transactional dequeues not found in enqueue map; rids=%s" % + mismatched_rids) + else: + self._warning.append("WARNING: %s not found in transaction map" % jrnl.Utils.format_xid(hdr.xid)) + if hdr.magic[-1] == "c": # commits only + self._txn_obj_list[hdr.xid] = hdr + return False + + def _incr_file_num(self): + """Increment the number of files read with wraparound (ie after file n-1, go to 0)""" + self._file_num += 1 + if self._file_num >= self._jinfo.get_num_jrnl_files(): + self._file_num = 0 + return self._file_num + + def _load_rec(self, hdr): + """Load a single record for the given header. There may be arbitrarily large xids and data components.""" + while not hdr.complete(): + if self._advance_jrnl_file(): + return True + hdr.load(self._file) + return False + +# ============================================================================= + +if __name__ == "__main__": + print "This is a library, and cannot be executed." diff --git a/qpid/cpp/management/python/lib/qpidstore/jerr.py b/qpid/cpp/management/python/lib/qpidstore/jerr.py new file mode 100644 index 0000000000..448f881ce3 --- /dev/null +++ b/qpid/cpp/management/python/lib/qpidstore/jerr.py @@ -0,0 +1,219 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +# == Warnings ================================================================= + +class JWarning(Exception): + """Class to convey a warning""" + def __init__(self, err): + """Constructor""" + Exception.__init__(self, err) + +# == Errors =================================================================== + +class AllJrnlFilesEmptyCsvError(Exception): + """All journal files are empty (never been written)""" + def __init__(self, tnum, exp_num_msgs): + """Constructor""" + Exception.__init__(self, "[CSV %d] All journal files are empty, but test expects %d msg(s)." % + (tnum, exp_num_msgs)) + +class AlreadyLockedError(Exception): + """Error class for trying to lock a record that is already locked""" + def __init__(self, rid): + """Constructor""" + Exception.__init__(self, "Locking record which is already locked in EnqMap: rid=0x%x" % rid) + +class BadFileNumberError(Exception): + """Error class for incorrect or unexpected file number""" + def __init__(self, file_num): + """Constructor""" + Exception.__init__(self, "Bad file number %d" % file_num) + +class DataSizeError(Exception): + """Error class for data size mismatch""" + def __init__(self, exp_size, act_size, data_str): + """Constructor""" + Exception.__init__(self, "Inconsistent data size: expected:%d; actual:%d; data=\"%s\"" % + (exp_size, act_size, data_str)) + +class DeleteLockedRecordError(Exception): + """Error class for deleting a locked record from the enqueue map""" + def __init__(self, rid): + """Constructor""" + Exception.__init__(self, "Deleting locked record from EnqMap: rid=0x%s" % rid) + +class DequeueNonExistentEnqueueError(Exception): + """Error class for attempting to dequeue a non-existent enqueue record (rid)""" + def __init__(self, deq_rid): + """Constructor""" + Exception.__init__(self, "Dequeuing non-existent enqueue record: rid=0x%s" % deq_rid) + +class DuplicateRidError(Exception): + """Error class for placing duplicate rid into enqueue map""" + def __init__(self, rid): + """Constructor""" + Exception.__init__(self, "Adding duplicate record to EnqMap: rid=0x%x" % rid) + +class EndianMismatchError(Exception): + """Error class mismatched record header endian flag""" + def __init__(self, exp_endianness): + """Constructor""" + Exception.__init__(self, "Endian mismatch: expected %s, but current record is %s" % + self.endian_str(exp_endianness)) + #@staticmethod + def endian_str(endianness): + """Return a string tuple for the endianness error message""" + if endianness: + return "big", "little" + return "little", "big" + endian_str = staticmethod(endian_str) + +class ExternFlagDataError(Exception): + """Error class for the extern flag being set and the internal size > 0""" + def __init__(self, hdr): + """Constructor""" + Exception.__init__(self, "Message data found (msg size > 0) on record with external flag set: hdr=%s" % hdr) + +class ExternFlagCsvError(Exception): + """External flag mismatch between record and CSV test file""" + def __init__(self, tnum, exp_extern_flag): + """Constructor""" + Exception.__init__(self, "[CSV %d] External flag mismatch: expected %s" % (tnum, exp_extern_flag)) + +class ExternFlagWithDataCsvError(Exception): + """External flag set and Message data found""" + def __init__(self, tnum): + """Constructor""" + Exception.__init__(self, "[CSV %d] Message data found on record with external flag set" % tnum) + +class FillExceedsFileSizeError(Exception): + """Internal error from a fill operation which will exceed the specified file size""" + def __init__(self, cur_size, file_size): + """Constructor""" + Exception.__init__(self, "Filling to size %d > max file size %d" % (cur_size, file_size)) + +class FillSizeError(Exception): + """Internal error from a fill operation that did not match the calculated end point in the file""" + def __init__(self, cur_posn, exp_posn): + """Constructor""" + Exception.__init__(self, "Filled to size %d > expected file posn %d" % (cur_posn, exp_posn)) + +class FirstRecordOffsetMismatch(Exception): + """Error class for file header fro mismatch with actual record""" + def __init__(self, fro, actual_offs): + """Constructor""" + Exception.__init__(self, "File header first record offset mismatch: fro=0x%x; actual offs=0x%x" % + (fro, actual_offs)) + +class InvalidHeaderVersionError(Exception): + """Error class for invalid record header version""" + def __init__(self, exp_ver, act_ver): + """Constructor""" + Exception.__init__(self, "Invalid header version: expected:%d, actual:%d." % (exp_ver, act_ver)) + +class InvalidRecordTypeError(Exception): + """Error class for any operation using an invalid record type""" + def __init__(self, operation, magic, rid): + """Constructor""" + Exception.__init__(self, "Invalid record type for operation: operation=%s record magic=%s, rid=0x%x" % + (operation, magic, rid)) + +class InvalidRecordTailError(Exception): + """Error class for invalid record tail""" + def __init__(self, magic_err, rid_err, rec): + """Constructor""" + Exception.__init__(self, " > %s *INVALID TAIL RECORD (%s)*" % (rec, self.tail_err_str(magic_err, rid_err))) + #@staticmethod + def tail_err_str(magic_err, rid_err): + """Return a string indicating the tail record error(s)""" + estr = "" + if magic_err: + estr = "magic bad" + if rid_err: + estr += ", " + if rid_err: + estr += "rid mismatch" + return estr + tail_err_str = staticmethod(tail_err_str) + +class NonExistentRecordError(Exception): + """Error class for any operation on an non-existent record""" + def __init__(self, operation, rid): + """Constructor""" + Exception.__init__(self, "Operation on non-existent record: operation=%s; rid=0x%x" % (operation, rid)) + +class NotLockedError(Exception): + """Error class for unlocking a record which is not locked in the first place""" + def __init__(self, rid): + """Constructor""" + Exception.__init__(self, "Unlocking record which is not locked in EnqMap: rid=0x%x" % rid) + +class JournalSpaceExceededError(Exception): + """Error class for when journal space of resized journal is too small to contain the transferred records""" + def __init__(self): + """Constructor""" + Exception.__init__(self, "Ran out of journal space while writing records") + +class MessageLengthCsvError(Exception): + """Message length mismatch between record and CSV test file""" + def __init__(self, tnum, exp_msg_len, actual_msg_len): + """Constructor""" + Exception.__init__(self, "[CSV %d] Message length mismatch: expected %d; found %d" % + (tnum, exp_msg_len, actual_msg_len)) + +class NumMsgsCsvError(Exception): + """Number of messages found mismatched with CSV file""" + def __init__(self, tnum, exp_num_msgs, actual_num_msgs): + """Constructor""" + Exception.__init__(self, "[CSV %s] Incorrect number of messages: expected %d, found %d" % + (tnum, exp_num_msgs, actual_num_msgs)) + +class TransactionCsvError(Exception): + """Transaction mismatch between record and CSV file""" + def __init__(self, tnum, exp_transactional): + """Constructor""" + Exception.__init__(self, "[CSV %d] Transaction mismatch: expected %s" % (tnum, exp_transactional)) + +class UnexpectedEndOfFileError(Exception): + """Error class for unexpected end-of-file during reading""" + def __init__(self, exp_size, curr_offs): + """Constructor""" + Exception.__init__(self, "Unexpected end-of-file: expected file size:%d; current offset:%d" % + (exp_size, curr_offs)) + +class XidLengthCsvError(Exception): + """Message Xid length mismatch between record and CSV file""" + def __init__(self, tnum, exp_xid_len, actual_msg_len): + """Constructor""" + Exception.__init__(self, "[CSV %d] Message XID mismatch: expected %d; found %d" % + (tnum, exp_xid_len, actual_msg_len)) + +class XidSizeError(Exception): + """Error class for Xid size mismatch""" + def __init__(self, exp_size, act_size, xid_str): + """Constructor""" + Exception.__init__(self, "Inconsistent xid size: expected:%d; actual:%d; xid=\"%s\"" % + (exp_size, act_size, xid_str)) + +# ============================================================================= + +if __name__ == "__main__": + print "This is a library, and cannot be executed." + diff --git a/qpid/cpp/management/python/lib/qpidstore/jrnl.py b/qpid/cpp/management/python/lib/qpidstore/jrnl.py new file mode 100644 index 0000000000..7c4d6de4a9 --- /dev/null +++ b/qpid/cpp/management/python/lib/qpidstore/jrnl.py @@ -0,0 +1,794 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import jerr +import os.path, sys, xml.parsers.expat +from struct import pack, unpack, calcsize +from time import gmtime, strftime + +# TODO: Get rid of these! Use jinf instance instead +DBLK_SIZE = 128 +SBLK_SIZE = 4 * DBLK_SIZE + +# TODO - this is messy - find a better way to handle this +# This is a global, but is set directly by the calling program +JRNL_FILE_SIZE = None + +#== class Utils ====================================================================== + +class Utils(object): + """Class containing utility functions for dealing with the journal""" + + __printchars = "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ!\"#$%&'()*+,-./:;<=>?@[\\]^_`{\|}~ " + + # The @staticmethod declarations are not supported in RHEL4 (python 2.3.x) + # When RHEL4 support ends, restore these declarations and remove the older + # staticmethod() declaration. + + #@staticmethod + def format_data(dsize, data): + """Format binary data for printing""" + if data == None: + return "" + if Utils._is_printable(data): + datastr = Utils._split_str(data) + else: + datastr = Utils._hex_split_str(data) + if dsize != len(data): + raise jerr.DataSizeError(dsize, len(data), datastr) + return "data(%d)=\"%s\" " % (dsize, datastr) + format_data = staticmethod(format_data) + + #@staticmethod + def format_xid(xid, xidsize=None): + """Format binary XID for printing""" + if xid == None and xidsize != None: + if xidsize > 0: + raise jerr.XidSizeError(xidsize, 0, None) + return "" + if Utils._is_printable(xid): + xidstr = Utils._split_str(xid) + else: + xidstr = Utils._hex_split_str(xid) + if xidsize == None: + xidsize = len(xid) + elif xidsize != len(xid): + raise jerr.XidSizeError(xidsize, len(xid), xidstr) + return "xid(%d)=\"%s\" " % (xidsize, xidstr) + format_xid = staticmethod(format_xid) + + #@staticmethod + def inv_str(string): + """Perform a binary 1's compliment (invert all bits) on a binary string""" + istr = "" + for index in range(0, len(string)): + istr += chr(~ord(string[index]) & 0xff) + return istr + inv_str = staticmethod(inv_str) + + #@staticmethod + def load(fhandle, klass): + """Load a record of class klass from a file""" + args = Utils._load_args(fhandle, klass) + subclass = klass.discriminate(args) + result = subclass(*args) # create instance of record + if subclass != klass: + result.init(fhandle, *Utils._load_args(fhandle, subclass)) + result.skip(fhandle) + return result + load = staticmethod(load) + + #@staticmethod + def load_file_data(fhandle, size, data): + """Load the data portion of a message from file""" + if size == 0: + return (data, True) + if data == None: + loaded = 0 + else: + loaded = len(data) + foverflow = fhandle.tell() + size - loaded > JRNL_FILE_SIZE + if foverflow: + rsize = JRNL_FILE_SIZE - fhandle.tell() + else: + rsize = size - loaded + fbin = fhandle.read(rsize) + if data == None: + data = unpack("%ds" % (rsize), fbin)[0] + else: + data = data + unpack("%ds" % (rsize), fbin)[0] + return (data, not foverflow) + load_file_data = staticmethod(load_file_data) + + #@staticmethod + def rem_bytes_in_blk(fhandle, blk_size): + """Return the remaining bytes in a block""" + foffs = fhandle.tell() + return Utils.size_in_bytes_to_blk(foffs, blk_size) - foffs + rem_bytes_in_blk = staticmethod(rem_bytes_in_blk) + + #@staticmethod + def size_in_blks(size, blk_size): + """Return the size in terms of data blocks""" + return int((size + blk_size - 1) / blk_size) + size_in_blks = staticmethod(size_in_blks) + + #@staticmethod + def size_in_bytes_to_blk(size, blk_size): + """Return the bytes remaining until the next block boundary""" + return Utils.size_in_blks(size, blk_size) * blk_size + size_in_bytes_to_blk = staticmethod(size_in_bytes_to_blk) + + #@staticmethod + def _hex_split_str(in_str, split_size = 50): + """Split a hex string into two parts separated by an ellipsis""" + if len(in_str) <= split_size: + return Utils._hex_str(in_str, 0, len(in_str)) +# if len(in_str) > split_size + 25: +# return Utils._hex_str(in_str, 0, 10) + " ... " + Utils._hex_str(in_str, 55, 65) + " ... " + \ +# Utils._hex_str(in_str, len(in_str)-10, len(in_str)) + return Utils._hex_str(in_str, 0, 10) + " ... " + Utils._hex_str(in_str, len(in_str)-10, len(in_str)) + _hex_split_str = staticmethod(_hex_split_str) + + #@staticmethod + def _hex_str(in_str, begin, end): + """Return a binary string as a hex string""" + hstr = "" + for index in range(begin, end): + if Utils._is_printable(in_str[index]): + hstr += in_str[index] + else: + hstr += "\\%02x" % ord(in_str[index]) + return hstr + _hex_str = staticmethod(_hex_str) + + #@staticmethod + def _is_printable(in_str): + """Return True if in_str in printable; False otherwise.""" + return in_str.strip(Utils.__printchars) == "" + _is_printable = staticmethod(_is_printable) + + #@staticmethod + def _load_args(fhandle, klass): + """Load the arguments from class klass""" + size = calcsize(klass.FORMAT) + foffs = fhandle.tell(), + fbin = fhandle.read(size) + if len(fbin) != size: + raise jerr.UnexpectedEndOfFileError(size, len(fbin)) + return foffs + unpack(klass.FORMAT, fbin) + _load_args = staticmethod(_load_args) + + #@staticmethod + def _split_str(in_str, split_size = 50): + """Split a string into two parts separated by an ellipsis if it is longer than split_size""" + if len(in_str) < split_size: + return in_str + return in_str[:25] + " ... " + in_str[-25:] + _split_str = staticmethod(_split_str) + + +#== class Hdr ================================================================= + +class Hdr: + """Class representing the journal header records""" + + FORMAT = "=4sBBHQ" + HDR_VER = 1 + OWI_MASK = 0x01 + BIG_ENDIAN = sys.byteorder == "big" + REC_BOUNDARY = DBLK_SIZE + + def __init__(self, foffs, magic, ver, endn, flags, rid): + """Constructor""" +# Sizeable.__init__(self) + self.foffs = foffs + self.magic = magic + self.ver = ver + self.endn = endn + self.flags = flags + self.rid = long(rid) + + def __str__(self): + """Return string representation of this header""" + if self.empty(): + return "0x%08x: " % (self.foffs) + if self.magic[-1] == "x": + return "0x%08x: [\"%s\"]" % (self.foffs, self.magic) + if self.magic[-1] in ["a", "c", "d", "e", "f", "x"]: + return "0x%08x: [\"%s\" v=%d e=%d f=0x%04x rid=0x%x]" % (self.foffs, self.magic, self.ver, self.endn, + self.flags, self.rid) + return "0x%08x: " % (self.foffs, self.magic) + + #@staticmethod + def discriminate(args): + """Use the last char in the header magic to determine the header type""" + return _CLASSES.get(args[1][-1], Hdr) + discriminate = staticmethod(discriminate) + + def empty(self): + """Return True if this record is empty (ie has a magic of 0x0000""" + return self.magic == "\x00"*4 + + def encode(self): + """Encode the header into a binary string""" + return pack(Hdr.FORMAT, self.magic, self.ver, self.endn, self.flags, self.rid) + + def owi(self): + """Return the OWI (overwrite indicator) for this header""" + return self.flags & self.OWI_MASK != 0 + + def skip(self, fhandle): + """Read and discard the remainder of this record""" + fhandle.read(Utils.rem_bytes_in_blk(fhandle, self.REC_BOUNDARY)) + + def check(self): + """Check that this record is valid""" + if self.empty() or self.magic[:3] != "RHM" or self.magic[3] not in ["a", "c", "d", "e", "f", "x"]: + return True + if self.magic[-1] != "x": + if self.ver != self.HDR_VER: + raise jerr.InvalidHeaderVersionError(self.HDR_VER, self.ver) + if bool(self.endn) != self.BIG_ENDIAN: + raise jerr.EndianMismatchError(self.BIG_ENDIAN) + return False + + +#== class FileHdr ============================================================= + +class FileHdr(Hdr): + """Class for file headers, found at the beginning of journal files""" + + FORMAT = "=2H4x3Q" + REC_BOUNDARY = SBLK_SIZE + + def __str__(self): + """Return a string representation of the this FileHdr instance""" + return "%s fid=%d lid=%d fro=0x%08x t=%s" % (Hdr.__str__(self), self.fid, self.lid, self.fro, + self.timestamp_str()) + + def encode(self): + """Encode this class into a binary string""" + return Hdr.encode(self) + pack(FileHdr.FORMAT, self.fid, self.lid, self.fro, self.time_sec, self.time_ns) + + def init(self, fhandle, foffs, fid, lid, fro, time_sec, time_ns): + """Initialize this instance to known values""" + self.fid = fid + self.lid = lid + self.fro = fro + self.time_sec = time_sec + self.time_ns = time_ns + + def timestamp(self): + """Get the timestamp of this record as a tuple (secs, nsecs)""" + return (self.time_sec, self.time_ns) + + def timestamp_str(self): + """Get the timestamp of this record in string format""" + time = gmtime(self.time_sec) + fstr = "%%a %%b %%d %%H:%%M:%%S.%09d %%Y" % (self.time_ns) + return strftime(fstr, time) + + +#== class DeqRec ============================================================== + +class DeqRec(Hdr): + """Class for a dequeue record""" + + FORMAT = "=QQ" + + def __str__(self): + """Return a string representation of the this DeqRec instance""" + return "%s %sdrid=0x%x" % (Hdr.__str__(self), Utils.format_xid(self.xid, self.xidsize), self.deq_rid) + + def init(self, fhandle, foffs, deq_rid, xidsize): + """Initialize this instance to known values""" + self.deq_rid = deq_rid + self.xidsize = xidsize + self.xid = None + self.deq_tail = None + self.xid_complete = False + self.tail_complete = False + self.tail_bin = None + self.tail_offs = 0 + self.load(fhandle) + + def encode(self): + """Encode this class into a binary string""" + buf = Hdr.encode(self) + pack(DeqRec.FORMAT, self.deq_rid, self.xidsize) + if self.xidsize > 0: + fmt = "%ds" % (self.xidsize) + buf += pack(fmt, self.xid) + buf += self.deq_tail.encode() + return buf + + def load(self, fhandle): + """Load the remainder of this record (after the header has been loaded""" + if self.xidsize == 0: + self.xid_complete = True + self.tail_complete = True + else: + if not self.xid_complete: + (self.xid, self.xid_complete) = Utils.load_file_data(fhandle, self.xidsize, self.xid) + if self.xid_complete and not self.tail_complete: + ret = Utils.load_file_data(fhandle, calcsize(RecTail.FORMAT), self.tail_bin) + self.tail_bin = ret[0] + if ret[1]: + self.deq_tail = RecTail(self.tail_offs, *unpack(RecTail.FORMAT, self.tail_bin)) + magic_err = self.deq_tail.magic_inv != Utils.inv_str(self.magic) + rid_err = self.deq_tail.rid != self.rid + if magic_err or rid_err: + raise jerr.InvalidRecordTailError(magic_err, rid_err, self) + self.skip(fhandle) + self.tail_complete = ret[1] + return self.complete() + + def complete(self): + """Returns True if the entire record is loaded, False otherwise""" + return self.xid_complete and self.tail_complete + + +#== class TxnRec ============================================================== + +class TxnRec(Hdr): + """Class for a transaction commit/abort record""" + + FORMAT = "=Q" + + def __str__(self): + """Return a string representation of the this TxnRec instance""" + return "%s %s" % (Hdr.__str__(self), Utils.format_xid(self.xid, self.xidsize)) + + def init(self, fhandle, foffs, xidsize): + """Initialize this instance to known values""" + self.xidsize = xidsize + self.xid = None + self.tx_tail = None + self.xid_complete = False + self.tail_complete = False + self.tail_bin = None + self.tail_offs = 0 + self.load(fhandle) + + def encode(self): + """Encode this class into a binary string""" + return Hdr.encode(self) + pack(TxnRec.FORMAT, self.xidsize) + pack("%ds" % self.xidsize, self.xid) + \ + self.tx_tail.encode() + + def load(self, fhandle): + """Load the remainder of this record (after the header has been loaded""" + if not self.xid_complete: + ret = Utils.load_file_data(fhandle, self.xidsize, self.xid) + self.xid = ret[0] + self.xid_complete = ret[1] + if self.xid_complete and not self.tail_complete: + ret = Utils.load_file_data(fhandle, calcsize(RecTail.FORMAT), self.tail_bin) + self.tail_bin = ret[0] + if ret[1]: + self.tx_tail = RecTail(self.tail_offs, *unpack(RecTail.FORMAT, self.tail_bin)) + magic_err = self.tx_tail.magic_inv != Utils.inv_str(self.magic) + rid_err = self.tx_tail.rid != self.rid + if magic_err or rid_err: + raise jerr.InvalidRecordTailError(magic_err, rid_err, self) + self.skip(fhandle) + self.tail_complete = ret[1] + return self.complete() + + def complete(self): + """Returns True if the entire record is loaded, False otherwise""" + return self.xid_complete and self.tail_complete + + +#== class EnqRec ============================================================== + +class EnqRec(Hdr): + """Class for a enqueue record""" + + FORMAT = "=QQ" + TRANSIENT_MASK = 0x10 + EXTERN_MASK = 0x20 + + def __str__(self): + """Return a string representation of the this EnqRec instance""" + return "%s %s%s %s %s" % (Hdr.__str__(self), Utils.format_xid(self.xid, self.xidsize), + Utils.format_data(self.dsize, self.data), self.enq_tail, self.print_flags()) + + def encode(self): + """Encode this class into a binary string""" + buf = Hdr.encode(self) + pack(EnqRec.FORMAT, self.xidsize, self.dsize) + if self.xidsize > 0: + buf += pack("%ds" % self.xidsize, self.xid) + if self.dsize > 0: + buf += pack("%ds" % self.dsize, self.data) + if self.xidsize > 0 or self.dsize > 0: + buf += self.enq_tail.encode() + return buf + + def init(self, fhandle, foffs, xidsize, dsize): + """Initialize this instance to known values""" + self.xidsize = xidsize + self.dsize = dsize + self.transient = self.flags & self.TRANSIENT_MASK > 0 + self.extern = self.flags & self.EXTERN_MASK > 0 + self.xid = None + self.data = None + self.enq_tail = None + self.xid_complete = False + self.data_complete = False + self.tail_complete = False + self.tail_bin = None + self.tail_offs = 0 + self.load(fhandle) + + def load(self, fhandle): + """Load the remainder of this record (after the header has been loaded""" + if not self.xid_complete: + ret = Utils.load_file_data(fhandle, self.xidsize, self.xid) + self.xid = ret[0] + self.xid_complete = ret[1] + if self.xid_complete and not self.data_complete: + if self.extern: + self.data_complete = True + else: + ret = Utils.load_file_data(fhandle, self.dsize, self.data) + self.data = ret[0] + self.data_complete = ret[1] + if self.data_complete and not self.tail_complete: + ret = Utils.load_file_data(fhandle, calcsize(RecTail.FORMAT), self.tail_bin) + self.tail_bin = ret[0] + if ret[1]: + self.enq_tail = RecTail(self.tail_offs, *unpack(RecTail.FORMAT, self.tail_bin)) + magic_err = self.enq_tail.magic_inv != Utils.inv_str(self.magic) + rid_err = self.enq_tail.rid != self.rid + if magic_err or rid_err: + raise jerr.InvalidRecordTailError(magic_err, rid_err, self) + self.skip(fhandle) + self.tail_complete = ret[1] + return self.complete() + + def complete(self): + """Returns True if the entire record is loaded, False otherwise""" + return self.xid_complete and self.data_complete and self.tail_complete + + def print_flags(self): + """Utility function to decode the flags field in the header and print a string representation""" + fstr = "" + if self.transient: + fstr = "*TRANSIENT" + if self.extern: + if len(fstr) > 0: + fstr += ",EXTERNAL" + else: + fstr = "*EXTERNAL" + if len(fstr) > 0: + fstr += "*" + return fstr + + +#== class RecTail ============================================================= + +class RecTail: + """Class for a record tail - for all records where either an XID or data separate the header from the end of the + record""" + + FORMAT = "=4sQ" + + def __init__(self, foffs, magic_inv, rid): + """Initialize this instance to known values""" + self.foffs = foffs + self.magic_inv = magic_inv + self.rid = long(rid) + + def __str__(self): + """Return a string representation of the this RecTail instance""" + magic = Utils.inv_str(self.magic_inv) + return "[\"%s\" rid=0x%x]" % (magic, self.rid) + + def encode(self): + """Encode this class into a binary string""" + return pack(RecTail.FORMAT, self.magic_inv, self.rid) + + +#== class JrnlInfo ============================================================ + +class JrnlInfo(object): + """ + This object reads and writes journal information files (.jinf). Methods are provided + to read a file, query its properties and reset just those properties necessary for normalizing + and resizing a journal. + + Normalizing: resetting the directory and/or base filename to different values. This is necessary + if a set of journal files is copied from one location to another before being restored, as the + value of the path in the file no longer matches the actual path. + + Resizing: If the journal geometry parameters (size and number of journal files) changes, then the + .jinf file must reflect these changes, as this file is the source of information for journal + recovery. + + NOTE: Data size vs File size: There are methods which return the data size and file size of the + journal files. + + +-------------+--------------------/ /----------+ + | File header | File data | + +-------------+--------------------/ /----------+ + | | | + | |<---------- Data size ---------->| + |<------------------ File Size ---------------->| + + Data size: The size of the data content of the journal, ie that part which stores the data records. + + File size: The actual disk size of the journal including data and the file header which precedes the + data. + + The file header is fixed to 1 sblk, so file size = jrnl size + sblk size. + """ + + def __init__(self, jdir, bfn = "JournalData"): + """Constructor""" + self.__jdir = jdir + self.__bfn = bfn + self.__jinf_dict = {} + self._read_jinf() + + def __str__(self): + """Create a string containing all of the journal info contained in the jinf file""" + ostr = "Journal info file %s:\n" % os.path.join(self.__jdir, "%s.jinf" % self.__bfn) + for key, val in self.__jinf_dict.iteritems(): + ostr += " %s = %s\n" % (key, val) + return ostr + + def normalize(self, jdir = None, bfn = None): + """Normalize the directory (ie reset the directory path to match the actual current location) for this + jinf file""" + if jdir == None: + self.__jinf_dict["directory"] = self.__jdir + else: + self.__jdir = jdir + self.__jinf_dict["directory"] = jdir + if bfn != None: + self.__bfn = bfn + self.__jinf_dict["base_filename"] = bfn + + def resize(self, num_jrnl_files = None, jrnl_file_size = None): + """Reset the journal size information to allow for resizing the journal""" + if num_jrnl_files != None: + self.__jinf_dict["number_jrnl_files"] = num_jrnl_files + if jrnl_file_size != None: + self.__jinf_dict["jrnl_file_size_sblks"] = jrnl_file_size * self.get_jrnl_dblk_size_bytes() + + def write(self, jdir = None, bfn = None): + """Write the .jinf file""" + self.normalize(jdir, bfn) + if not os.path.exists(self.get_jrnl_dir()): + os.makedirs(self.get_jrnl_dir()) + fhandle = open(os.path.join(self.get_jrnl_dir(), "%s.jinf" % self.get_jrnl_base_name()), "w") + fhandle.write("\n") + fhandle.write("\n") + fhandle.write(" \n" % self.get_jrnl_version()) + fhandle.write(" \n") + fhandle.write(" \n" % self.get_jrnl_id()) + fhandle.write(" \n" % self.get_jrnl_dir()) + fhandle.write(" \n" % self.get_jrnl_base_name()) + fhandle.write(" \n") + fhandle.write(" \n") + fhandle.write(" \n" % self.get_creation_time()[0]) + fhandle.write(" \n" % self.get_creation_time()[1]) + fhandle.write(" \n" % self.get_creation_time_str()) + fhandle.write(" \n") + fhandle.write(" \n") + fhandle.write(" \n" % self.get_num_jrnl_files()) + fhandle.write(" \n" % str.lower(str(self.get_auto_expand()))) + fhandle.write(" \n" % self.get_jrnl_data_size_sblks()) + fhandle.write(" \n" % self.get_jrnl_sblk_size_dblks()) + fhandle.write(" \n" % self.get_jrnl_dblk_size_bytes()) + fhandle.write(" \n") + fhandle.write(" \n") + fhandle.write(" \n" % self.get_wr_buf_pg_size_sblks()) + fhandle.write(" \n" % self.get_num_wr_buf_pgs()) + fhandle.write(" \n" % self.get_rd_buf_pg_size_sblks()) + fhandle.write(" \n" % self.get_num_rd_buf_pgs()) + fhandle.write(" \n") + fhandle.write("\n") + fhandle.close() + + # Journal ID + + def get_jrnl_version(self): + """Get the journal version""" + return self.__jinf_dict["journal_version"] + + def get_jrnl_id(self): + """Get the journal id""" + return self.__jinf_dict["id_string"] + + def get_current_dir(self): + """Get the current directory of the store (as opposed to that value saved in the .jinf file)""" + return self.__jdir + + def get_jrnl_dir(self): + """Get the journal directory stored in the .jinf file""" + return self.__jinf_dict["directory"] + + def get_jrnl_base_name(self): + """Get the base filename - that string used to name the journal files -nnnn.jdat and + .jinf""" + return self.__jinf_dict["base_filename"] + + # Journal creation time + + def get_creation_time(self): + """Get journal creation time as a tuple (secs, nsecs)""" + return (self.__jinf_dict["seconds"], self.__jinf_dict["nanoseconds"]) + + def get_creation_time_str(self): + """Get journal creation time as a string""" + return self.__jinf_dict["string"] + + # --- Files and geometry --- + + def get_num_jrnl_files(self): + """Get number of data files in the journal""" + return self.__jinf_dict["number_jrnl_files"] + + def get_auto_expand(self): + """Return True if auto-expand is enabled; False otherwise""" + return self.__jinf_dict["auto_expand"] + + def get_jrnl_sblk_size_dblks(self): + """Get the journal softblock size in dblks""" + return self.__jinf_dict["JRNL_SBLK_SIZE"] + + def get_jrnl_sblk_size_bytes(self): + """Get the journal softblock size in bytes""" + return self.get_jrnl_sblk_size_dblks() * self.get_jrnl_dblk_size_bytes() + + def get_jrnl_dblk_size_bytes(self): + """Get the journal datablock size in bytes""" + return self.__jinf_dict["JRNL_DBLK_SIZE"] + + def get_jrnl_data_size_sblks(self): + """Get the data capacity (excluding the file headers) for one journal file in softblocks""" + return self.__jinf_dict["jrnl_file_size_sblks"] + + def get_jrnl_data_size_dblks(self): + """Get the data capacity (excluding the file headers) for one journal file in datablocks""" + return self.get_jrnl_data_size_sblks() * self.get_jrnl_sblk_size_dblks() + + def get_jrnl_data_size_bytes(self): + """Get the data capacity (excluding the file headers) for one journal file in bytes""" + return self.get_jrnl_data_size_dblks() * self.get_jrnl_dblk_size_bytes() + + def get_jrnl_file_size_sblks(self): + """Get the size of one journal file on disk (including the file headers) in softblocks""" + return self.get_jrnl_data_size_sblks() + 1 + + def get_jrnl_file_size_dblks(self): + """Get the size of one journal file on disk (including the file headers) in datablocks""" + return self.get_jrnl_file_size_sblks() * self.get_jrnl_sblk_size_dblks() + + def get_jrnl_file_size_bytes(self): + """Get the size of one journal file on disk (including the file headers) in bytes""" + return self.get_jrnl_file_size_dblks() * self.get_jrnl_dblk_size_bytes() + + def get_tot_jrnl_data_size_sblks(self): + """Get the size of the entire jouranl's data capacity (excluding the file headers) for all files together in + softblocks""" + return self.get_num_jrnl_files() * self.get_jrnl_data_size_bytes() + + def get_tot_jrnl_data_size_dblks(self): + """Get the size of the entire jouranl's data capacity (excluding the file headers) for all files together in + datablocks""" + return self.get_num_jrnl_files() * self.get_jrnl_data_size_dblks() + + def get_tot_jrnl_data_size_bytes(self): + """Get the size of the entire jouranl's data capacity (excluding the file headers) for all files together in + bytes""" + return self.get_num_jrnl_files() * self.get_jrnl_data_size_bytes() + + # Read and write buffers + + def get_wr_buf_pg_size_sblks(self): + """Get the size of the write buffer pages in softblocks""" + return self.__jinf_dict["wcache_pgsize_sblks"] + + def get_wr_buf_pg_size_dblks(self): + """Get the size of the write buffer pages in datablocks""" + return self.get_wr_buf_pg_size_sblks() * self.get_jrnl_sblk_size_dblks() + + def get_wr_buf_pg_size_bytes(self): + """Get the size of the write buffer pages in bytes""" + return self.get_wr_buf_pg_size_dblks() * self.get_jrnl_dblk_size_bytes() + + def get_num_wr_buf_pgs(self): + """Get the number of write buffer pages""" + return self.__jinf_dict["wcache_num_pages"] + + def get_rd_buf_pg_size_sblks(self): + """Get the size of the read buffer pages in softblocks""" + return self.__jinf_dict["JRNL_RMGR_PAGE_SIZE"] + + def get_rd_buf_pg_size_dblks(self): + """Get the size of the read buffer pages in datablocks""" + return self.get_rd_buf_pg_size_sblks * self.get_jrnl_sblk_size_dblks() + + def get_rd_buf_pg_size_bytes(self): + """Get the size of the read buffer pages in bytes""" + return self.get_rd_buf_pg_size_dblks * self.get_jrnl_dblk_size_bytes() + + def get_num_rd_buf_pgs(self): + """Get the number of read buffer pages""" + return self.__jinf_dict["JRNL_RMGR_PAGES"] + + def _read_jinf(self): + """Read and initialize this instance from an existing jinf file located at the directory named in the + constructor - called by the constructor""" + fhandle = open(os.path.join(self.__jdir, "%s.jinf" % self.__bfn), "r") + parser = xml.parsers.expat.ParserCreate() + parser.StartElementHandler = self._handle_xml_start_elt + parser.CharacterDataHandler = self._handle_xml_char_data + parser.EndElementHandler = self._handle_xml_end_elt + parser.ParseFile(fhandle) + fhandle.close() + + def _handle_xml_start_elt(self, name, attrs): + """Callback for handling XML start elements. Used by the XML parser.""" + # bool values + if name == "auto_expand": + self.__jinf_dict[name] = attrs["value"] == "true" + # long values + elif name == "seconds" or \ + name == "nanoseconds": + self.__jinf_dict[name] = long(attrs["value"]) + # int values + elif name == "journal_version" or \ + name == "number_jrnl_files" or \ + name == "jrnl_file_size_sblks" or \ + name == "JRNL_SBLK_SIZE" or \ + name == "JRNL_DBLK_SIZE" or \ + name == "wcache_pgsize_sblks" or \ + name == "wcache_num_pages" or \ + name == "JRNL_RMGR_PAGE_SIZE" or \ + name == "JRNL_RMGR_PAGES": + self.__jinf_dict[name] = int(attrs["value"]) + # strings + elif "value" in attrs: + self.__jinf_dict[name] = attrs["value"] + + def _handle_xml_char_data(self, data): + """Callback for handling character data (ie within ...). The jinf file does not use this in its + data. Used by the XML parser.""" + pass + + def _handle_xml_end_elt(self, name): + """Callback for handling XML end elements. Used by XML parser.""" + pass + + +#============================================================================== + +_CLASSES = { + "a": TxnRec, + "c": TxnRec, + "d": DeqRec, + "e": EnqRec, + "f": FileHdr +} + +if __name__ == "__main__": + print "This is a library, and cannot be executed." -- cgit v1.2.1