-rw-r--r--  examples/benchmark.py            56
-rw-r--r--  rdflib/plugin.py                  3
-rw-r--r--  rdflib/plugins/bdboptimized.py  532
-rw-r--r--  test/test_bdboptimized.py        17
4 files changed, 608 insertions, 0 deletions
diff --git a/examples/benchmark.py b/examples/benchmark.py
new file mode 100644
index 00000000..9f7ea963
--- /dev/null
+++ b/examples/benchmark.py
@@ -0,0 +1,56 @@
+import sys
+import time
+import random
+import tempfile
+
+import rdflib
+
+def resource(n):
+ return rdflib.URIRef("urn:resource:%d"%n)
+
+def prop(n):
+ return rdflib.URIRef("urn:property:%d"%n)
+
+
+def createData(g, N, M=80):
+
+ for x in range(N):
+ g.add(( resource(random.randint(0,M)),
+ prop(random.randint(0,M)),
+ resource(random.randint(0,M))))
+
+if __name__=='__main__':
+
+ g=rdflib.Graph(sys.argv[1])
+
+ g.open(tempfile.mkdtemp(), create=True) # os.tempnam() is deprecated and unsafe
+ start=time.time()
+ #g.load("foaf890K.rdf")
+ createData(g,300000)
+ print len(g)
+ read=time.time()
+ sys.stderr.write("Reading took %.2fs\n"%(read-start))
+
+ x=set(g.subjects(prop(5), resource(5)))
+ t1=time.time()
+ sys.stderr.write("Subjects took %.2fs\n"%(t1-read))
+
+ x=set(g.predicates(resource(5), resource(5)))
+ t2=time.time()
+ sys.stderr.write("Predicates took %.2fs\n"%(t2-t1))
+
+ x=set(g.objects(resource(5), prop(5)))
+ t3=time.time()
+ sys.stderr.write("Objects took %.2fs\n"%(t3-t2))
+
+ x=set(g.subject_objects(prop(5)))
+ t4=time.time()
+ sys.stderr.write("SubjectObjects took %.2fs\n"%(t4-t3))
+
+ x=set(g.subject_predicates(resource(5)))
+ t5=time.time()
+ sys.stderr.write("SubjectPredicates took %.2fs\n"%(t5-t4))
+
+ x=set(g.predicate_objects(resource(5)))
+ t6=time.time()
+ sys.stderr.write("SubjectPredicates took %.2fs\n"%(t6-t5))
diff --git a/rdflib/plugin.py b/rdflib/plugin.py
index 1383cb1c..4a000607 100644
--- a/rdflib/plugin.py
+++ b/rdflib/plugin.py
@@ -122,6 +122,9 @@ register('IOMemory', Store,
'rdflib.plugins.memory', 'IOMemory')
register('Sleepycat', Store,
'rdflib.plugins.sleepycat', 'Sleepycat')
+register('BDBOptimized', Store,
+ 'rdflib.plugins.bdboptimized', 'BDBOptimized')
+
register('xml', Serializer,
'rdflib.plugins.serializers.rdfxml', 'XMLSerializer')
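
Usage note: once registered, the store class can be resolved through
rdflib's plugin interface or selected by name when constructing a
Graph. A minimal sketch, assuming the plugin module imports cleanly:

    from rdflib import plugin, Graph
    from rdflib.store import Store

    StoreClass = plugin.get("BDBOptimized", Store)  # resolve the class
    g = Graph(store="BDBOptimized")                 # or select by name
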
diff --git a/rdflib/plugins/bdboptimized.py b/rdflib/plugins/bdboptimized.py
new file mode 100644
index 00000000..ef069e91
--- /dev/null
+++ b/rdflib/plugins/bdboptimized.py
@@ -0,0 +1,532 @@
+import warnings
+from bsddb import db
+from urllib import pathname2url
+from os.path import exists, abspath, join
+from os import makedirs
+
+from rdflib import URIRef
+from rdflib.store import Store, NO_STORE
+
+SUPPORT_MULTIPLE_STORE_ENVIRON = False
+
+warnings.warn("The BDBOptimized store is experimental and not yet recommended for production.")
+
+if db.version() < (4,3,29):
+ warnings.warn("Your BDB library may not be supported.")
+
+import logging
+_logger = logging.getLogger(__name__)
+
+# TODO: performance testing?
+
+class NamespaceIndex:
+
+ def __init__(self, db_env):
+ self.__db_env = db_env
+ self.__namespace = db.DB(db_env)
+ self.__namespace.open('namespace.db', None, db.DB_BTREE, db.DB_CREATE | db.DB_AUTO_COMMIT)
+
+ self.__prefix = db.DB(db_env)
+ self.__prefix.open("prefix.db", None, db.DB_BTREE, db.DB_CREATE | db.DB_AUTO_COMMIT)
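+ # the two BTREEs hold the bidirectional mapping: namespace.db maps
+ # prefix -> namespace, and prefix.db maps namespace -> prefix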
+
+ def bind(self, prefix, namespace):
+ prefix = prefix.encode("utf-8")
+ namespace = namespace.encode("utf-8")
+
+ t = self.__db_env.txn_begin()
+ try:
+ bound_prefix = self.__prefix.get(namespace, txn=t)
+ if bound_prefix:
+ self.__namespace.delete(bound_prefix, txn=t)
+ self.__prefix.put(namespace, prefix, txn=t)
+ self.__namespace.put(prefix, namespace, txn=t)
+ t.commit(0)
+ except Exception, e:
+ t.abort()
+ raise
+
+ def namespaces(self):
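+ # collect all pairs first so the cursor is closed before yielding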
+ cursor = self.__namespace.cursor()
+ results = []
+ current = cursor.first()
+ while current:
+ prefix, namespace = current
+ results.append((prefix, namespace))
+ current = cursor.next()
+ cursor.close()
+ for prefix, namespace in results:
+ yield prefix, URIRef(namespace)
+
+ def prefix(self, namespace):
+ namespace = namespace.encode("utf-8")
+ t = self.__db_env.txn_begin()
+ try:
+ r = self.__prefix.get(namespace, txn=t)
+ t.commit(0)
+ return r
+ except Exception, e:
+ t.abort()
+ raise e
+
+ def namespace(self, prefix):
+ prefix = prefix.encode("utf-8")
+ t = self.__db_env.txn_begin()
+ try:
+ r = self.__namespace.get(prefix, txn=t)
+ t.commit(0)
+ return r
+ except Exception, e:
+ t.abort()
+ raise e
+
+ def close(self):
+ self.__namespace.close()
+ self.__prefix.close()
+
+class IDMap:
+ def __init__(self, db_env, node_pickler):
+ self.__db_env = db_env
+ self.__dbp = db.DB(db_env)
+ self.__dbp.open("IDMap_hash.db", None, db.DB_HASH, db.DB_CREATE | db.DB_AUTO_COMMIT)
+
+ self.__dbs = db.DB(db_env)
+ self.__dbs.open("IDMap_recno.db", None, db.DB_RECNO, db.DB_CREATE | db.DB_AUTO_COMMIT)
+
+ # pickling and un-pickling the data
+ self.__node_pickler = node_pickler
+
+ self.__loads = self.__node_pickler.loads
+ self.__dumps = self.__node_pickler.dumps
+
+ def insert(self, key):
+ # this inserts a new key if the key was not available
+ t = self.__db_env.txn_begin()
+ try:
+ k = self.__dumps(key)
+ val = self.__dbp.get(k, txn=t)
+ # the key is not found, register a new value for it
+ if val is None:
+ val = "%s" % self.__dbs.append(k, t)
+ #dbp.put("counter", counter, txn=t)
+ self.__dbp.put(k, val, txn=t)
+ t.commit(0)
+ return val
+ except Exception, e:
+ t.abort()
+ raise
+
+ def get_id(self, key):
+ k = self.__dumps(key)
+ t = self.__db_env.txn_begin()
+ try:
+ val = self.__dbp.get(k, txn=t)
+ t.commit(0)
+ return val
+ except Exception, e:
+ t.abort()
+
+ def get_var(self, num):
+ t = self.__db_env.txn_begin()
+ try:
+ val = self.__dbs.get(num, txn=t)
+ t.commit(0)
+ return self.__loads(val)
+ except Exception, e:
+ t.abort()
+
+ def close(self):
+ self.__dbp.close()
+ self.__dbs.close()
+
+ def all(self):
+ l = []
+
+ cursor = self.__dbs.cursor()
+ current = cursor.first()
+ while current:
+ try:
+ key, value = current
+ l.append((key, value))
+ current = cursor.next()
+ except Exception, e:
+ break
+
+ cursor.close()
+ return l
+
+def secondaryIndexKey(key, data):
+ # returns the first id of a '^'-joined key (currently unused helper
+ # for a BDB secondary index)
+ return data.split("^")[0]
+
+class QuadIndex:
+
+ def __init__(self, db_env, idmapper):
+ self.__db_env = db_env
+ self.__map = idmapper
+
+ self.__splitter = '^'
+
+ self.__index_list = ['spoc', 'pocs', 'ocsp', 'ospc', 'cspo', 'cpso']
+ self.__indices = self.__init_indices()
+ self.__use_index = self.__init_use_index()
+ self.__re_order = self.__init_re_order()
+ self.__open = True
+
+ def __init_indices(self):
+ indices = {}
+ for index in self.__index_list:
+ indices[index] = db.DB(self.__db_env)
+ indices[index].open("index_%s.db" % index, None, db.DB_BTREE, db.DB_CREATE | db.DB_AUTO_COMMIT)
+
+ return indices
+
+ def __init_re_order(self):
+ # create functions that changes the variable order back
+ # to s,p,o,c
+ re_order = {}
+
+ re_order['spoc'] = lambda (s,p,o,c): (s,p,o,c)
+ re_order['pocs'] = lambda (p,o,c,s): (s,p,o,c)
+ re_order['ocsp'] = lambda (o,c,s,p): (s,p,o,c)
+ re_order['ospc'] = lambda (o,s,p,c): (s,p,o,c)
+ re_order['cspo'] = lambda (c,s,p,o): (s,p,o,c)
+ re_order['cpso'] = lambda (c,p,s,o): (s,p,o,c)
+
+ return re_order
+
+ def __init_use_index(self):
+ # a dict deciding which index to use for a given pattern of bound
+ # variables: 16 bound/unbound combinations map onto the 6 indices
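+ # e.g. the pattern (s=None, p=bound, o=None, c=None), i.e.
+ # (False, True, False, False), selects 'pocs' so that the bound
+ # predicate id forms the leading prefix of the range scan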
+ use_index = {}
+
+ # spoc
+ use_index[(False, False, False, False)] = 'spoc'
+ use_index[(True, False, False, False)] = 'spoc'
+ use_index[(True, True, False, False)] = 'spoc'
+ use_index[(True, True, True, False)] = 'spoc'
+ use_index[(True, True, True, True)] = 'spoc'
+
+ # pocs
+ use_index[(False, True, False, False)] = 'pocs'
+ use_index[(False, True, True, False)] = 'pocs'
+ use_index[(False, True, True, True)] = 'pocs'
+
+ # ocsp
+ use_index[(False, False, True, False)] = 'ocsp'
+ use_index[(False, False, True, True)] = 'ocsp'
+ use_index[(True, False, True, True)] = 'ocsp'
+
+ # cspo
+ use_index[(False, False, False, True)] = 'cspo'
+ use_index[(True, False, False, True)] = 'cspo'
+ use_index[(True, True, False, True)] = 'cspo'
+
+ # cpso
+ use_index[(False, True, False, True)] = 'cpso'
+
+ # ospc
+ use_index[(True, False, True, False)] = 'ospc'
+
+ return use_index
+
+ def insert(self, (s,p,o,c)):
+ # check if the key is available,
+
+ # make sure there is a mapping for all the values
+ s_id = self.__map.insert(s)
+ p_id = self.__map.insert(p)
+ o_id = self.__map.insert(o)
+ c_id = self.__map.insert(c)
+
+ index_map = self.__init_index_map((s_id, p_id, o_id, c_id))
+
+ t = self.__db_env.txn_begin()
+ try:
+ for index in self.__indices:
+ self.__indices[index].put(index_map[index], '', txn=t)
+
+ t.commit(0)
+ except Exception, e:
+ t.abort()
+ raise
+
+ def delete(self, (s,p,o,c), txn=None):
+ (s_id, p_id, o_id, c_id) = self.__map_id((s,p,o,c))
+
+ # setup the indices
+ index_map = self.__init_index_map((s_id, p_id, o_id, c_id))
+
+ # since an index is in use within a transaction to traverse the
+ # keys to delete, the delete deadlocks when acting on that index;
+ # close the cursor in __all_prefix before yielding?
+ if txn is None:
+ t = self.__db_env.txn_begin()
+ else:
+ t = self.__db_env.txn_begin(txn)
+
+ try:
+ for index in self.__indices:
+ self.__indices[index].delete(index_map[index], txn=t, flags=0)
+ t.commit(0)
+ except Exception, e:
+ t.abort()
+ raise
+
+ # returns a mapping from index configuration to a
+ # string in the format v1^v2^v3^v4, which is used
+ # as a key in the index
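+ # e.g. ids (12, 3, 7, 1) give the 'spoc' key "12^3^7^1" and the
+ # 'pocs' key "3^7^1^12"; every quad is stored once per index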
+ def __init_index_map(self, (s_id,p_id,o_id,c_id)):
+ indices = {}
+
+ indices['spoc'] = self.__splitter.join([str(k) for k in (s_id, p_id, o_id, c_id)])
+ indices['pocs'] = self.__splitter.join([str(k) for k in (p_id, o_id, c_id, s_id)])
+ indices['ocsp'] = self.__splitter.join([str(k) for k in (o_id, c_id, s_id, p_id)])
+ indices['ospc'] = self.__splitter.join([str(k) for k in (o_id, s_id, p_id, c_id)])
+ indices['cspo'] = self.__splitter.join([str(k) for k in (c_id, s_id, p_id, o_id)])
+ indices['cpso'] = self.__splitter.join([str(k) for k in (c_id, p_id, s_id, o_id)])
+
+ return indices
+
+ # a 0 (or '0') sorts first in a BDB range query; __map_id returns
+ # the ids of the variables in the index order, with 0 marking an
+ # unbound position
+
+ def __map_id(self, (s,p,o,c)):
+ def map_id(val):
+ m = self.__map.get_id(val)
+ if m is None:
+ return 0
+ return int(m)
+
+ return [map_id(v) for v in (s,p,o,c)]
+
+ def __map_var(self, (s_id, p_id, o_id, c_id)):
+ def map_var(val):
+ v = self.__map.get_var(int(val))
+ if v is None:
+ return ''
+ return v
+
+ return tuple([map_var(v) for v in (s_id, p_id, o_id, c_id)])
+
+ def triples(self, (s,p,o,c), twopass=False):
+ # TODO: implement a twopass version where all IDs are collected before
+ # being mapped to their real values. Does this improve performance?
+ #
+ # iterates over the triples depending on the values of s,p,o,c
+ indices = {}
+
+ (s_id, p_id, o_id, c_id) = self.__map_id((s,p,o,c))
+
+ # setup the indices
+ indices = self.__init_index_map((s_id, p_id, o_id, c_id))
+
+ # get the bool map for the current configuration
+ (s_bool, p_bool, o_bool, c_bool) = [v != 0 for v in (s_id, p_id, o_id, c_id)]
+
+ current_index = self.__use_index[(s_bool, p_bool, o_bool, c_bool)]
+ prefix = indices[current_index]
+ # no bound variables: scan the whole index
+ if not (True in (s_bool, p_bool, o_bool, c_bool)):
+ prefix = ''
+ # bound variables found: strip off the trailing ^0 markers to form
+ # the prefix
+ elif self.__splitter + '0' in prefix:
+ prefix = prefix[0:prefix.find(self.__splitter + '0')]
+ # otherwise use the full key as the prefix
+
+ re_order_f = self.__re_order[current_index]
+
+ # convert the key back into the corresponding values
+ for k,v in self.__all_prefix(prefix, current_index):
+ (s,p,o,c) = self.__map_var(re_order_f(k.split(self.__splitter)))
+ yield ((s,p,o), c)
+
+ return
+
+ def contexts(self, triple=None):
+ # the triple filter is not honoured yet; yield each context once
+ seen = set()
+ for k,v in self.__all_prefix('', index='cspo'):
+ (c,s,p,o) = self.__map_var(k.split(self.__splitter))
+ if c not in seen:
+ seen.add(c)
+ yield c
+
+ def remove(self, (s,p,o,c)):
+ for ((s_t, p_t, o_t), c_t) in self.triples((s,p,o,c)):
+ self.delete((s_t, p_t, o_t, c_t))
+
+
+ def __len__(self, context=None):
+ return sum(1 for _ in self.triples((None, None, None, context)))
+
+ def __all_prefix(self, prefix, index='spoc'):
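+ # walks all keys of the given index that start with `prefix`, in
+ # BTREE order: set_range positions the cursor at the first key >=
+ # next_key, and the scan stops once keys no longer share the prefix.
+ # The cursor is re-opened for every step so it is never held open
+ # across a yield (cf. the deadlock note in delete above)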
+ next = True
+ next_key = prefix
+
+ while next:
+ c = self.__indices[index].cursor()
+ current = None
+ try:
+ current = c.set_range(next_key)
+ next = c.next()
+ if next:
+ next_key, data = next
+ except db.DBNotFoundError, e:
+ # no key >= next_key remains; fall through with current = None
+ next = None
+ # what happens when the cursor is closed and re-opened between
+ # accesses: is the lookup done again, or is the position preserved
+ # somehow? If the former, it would be better to collect the results
+ # into a list first and then yield from that list
+ c.close()
+
+ if current:
+ key, data = current
+ if key and key.startswith(prefix):
+ yield key, data
+
+ if next_key and not next_key.startswith(prefix):
+ next = None
+
+ def close(self):
+ self.__open = False
+
+ for index in self.__indices:
+ self.__indices[index].close()
+
+class BDBOptimized(Store):
+ """ An alternative BDB store implementing the index-structure proposed in:
+ http://sw.deri.org/2005/02/dexa/yars.pdf
+
+ Index structures:
+ a key -> int map and an int -> key map translate nodes to ids and
+ ids back to nodes.
+ Triple indices: spoc, pocs, ocsp, cspo, cpso, ospc
+
+ This store is both transaction and context-aware.
+ """
+
+ context_aware = True
+
+ # hmm - this was False, but if False the store cannot be used as a
+ # sink for the n3 parser. I set it to True - but this probably
+ # breaks something. [GAG 2011/08/20]
+ formula_aware = True
+
+ # TODO: transaction support
+ transaction_aware = True
+
+ def __init__(self, configuration=None, identifier=None):
+ self.__open = False
+ self.__identifier = identifier
+ self.configuration = configuration
+ self.__locks = 5000
+ self.__db_env = None
+ self.__id_mapper = None
+ self.__quad_index = None
+ self.__namespace_index = None
+ # Store.__init__ calls open if there is a configuration
+ super(BDBOptimized, self).__init__(configuration)
+
+ def __get_identifier(self):
+ return self.__identifier
+ identifier = property(__get_identifier)
+
+ def _init_db_environment(self, homeDir, create=True):
+ # NOTE: when SUPPORT_MULTIPLE_STORE_ENVIRON is enabled, the identifier
+ # is appended to the path as the location for the db; this isolates
+ # stores which share a path but have different identifiers
+
+ if SUPPORT_MULTIPLE_STORE_ENVIRON:
+ fullDir = join(homeDir,self.identifier)
+ else:
+ fullDir = homeDir
+ envsetflags = db.DB_CDB_ALLDB
+ envflags = db.DB_INIT_MPOOL | db.DB_INIT_LOCK | db.DB_THREAD | db.DB_INIT_TXN | db.DB_RECOVER
+ if not exists(fullDir):
+ if create:
+ makedirs(fullDir)
+ self.create(fullDir)
+ else:
+ return NO_STORE
+
+ db_env = db.DBEnv()
+ db_env.set_cachesize(0, 1024*1024*50) # TODO
+
+ # enable deadlock-detection
+ db_env.set_lk_detect(db.DB_LOCK_MAXLOCKS)
+
+ # increase the number of locks, this is correlated to the size (num triples) that
+ # can be added/removed with a single transaction
+ db_env.set_lk_max_locks(self.__locks)
+ db_env.set_lk_max_lockers(self.__locks)
+ db_env.set_lk_max_objects(self.__locks)
+
+ #db_env.set_lg_max(1024*1024)
+ #db_env.set_flags(envsetflags, 1)
+ db_env.open(fullDir, envflags | db.DB_CREATE, 0)
+ return db_env
+
+ def is_open(self):
+ return self.__open
+
+ def open(self, path, create=True):
+ homeDir = path
+
+ if self.__identifier is None:
+ self.__identifier = URIRef(pathname2url(abspath(homeDir)))
+
+ self.__db_env = self._init_db_environment(homeDir, create)
+ self.__open = True
+
+ self.__id_mapper = IDMap(self.__db_env, self.node_pickler)
+ self.__quad_index = QuadIndex(self.__db_env, self.__id_mapper)
+ self.__namespace_index = NamespaceIndex(self.__db_env)
+
+ def triples(self, (subject, predicate, object), context=None):
+ for result in self.__quad_index.triples((subject, predicate, object, context)):
+ yield result
+
+ def contexts(self, triple=None):
+ return self.__quad_index.contexts(triple=triple)
+
+ def add(self, (subject, predicate, object), context, quoted=False, txn=None):
+ """\
+ Add a triple to the store of triples.
+ """
+ assert self.__open, "The Store must be open."
+ Store.add(self, (subject, predicate, object), context, quoted)
+
+ self.__quad_index.insert((subject, predicate, object, context))
+
+ def remove(self, (subject, predicate, object), context, txn=None):
+ """
+ Remove the matching triples and/or context from the store. Variables
+ can be unbound by using None.
+ """
+
+ assert self.__open, "The Store must be open."
+ Store.remove(self, (subject, predicate, object), context)
+
+ self.__quad_index.remove((subject, predicate, object, context))
+
+ def bind(self, prefix, namespace):
+ return self.__namespace_index.bind(prefix, namespace)
+
+ def namespace(self, prefix):
+ return self.__namespace_index.namespace(prefix)
+
+ def prefix(self, namespace):
+ return self.__namespace_index.prefix(namespace)
+
+ def namespaces(self):
+ for r in self.__namespace_index.namespaces():
+ yield r
+
+ def __len__(self, context=None):
+ return self.__quad_index.__len__(context)
+
+ def close(self, commit_pending_transaction=True):
+ self.__open = False
+ self.__id_mapper.close()
+ self.__quad_index.close()
+ self.__namespace_index.close()
+ self.__db_env.close()
+
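Usage note: a minimal end-to-end sketch of the store, assuming a
writable directory path (the path and URIs below are placeholders):

    import rdflib

    g = rdflib.Graph("BDBOptimized")
    g.open("/tmp/bdbopt-example", create=True)
    g.add((rdflib.URIRef("urn:s"), rdflib.URIRef("urn:p"),
           rdflib.URIRef("urn:o")))
    print len(g)  # Python 2, matching the rest of this change
    g.close()
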
diff --git a/test/test_bdboptimized.py b/test/test_bdboptimized.py
new file mode 100644
index 00000000..2f01dc00
--- /dev/null
+++ b/test/test_bdboptimized.py
@@ -0,0 +1,17 @@
+import logging
+
+_logger = logging.getLogger(__name__)
+
+from test import test_graph
+from test import test_context
+
+
+class BDBOptimizedGraphTestCase(test_graph.GraphTestCase):
+ store_name = "BDBOptimized"
+ non_core = True
+ bsddb = True
+
+class BDBOptimizedStoreTestCase(test_context.ContextTestCase):
+ store = "BDBOptimized"
+ non_core = True
+ bsddb = True
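
Usage note: these cases reuse the generic graph and context test
suites; a sketch for running just this module, assuming the nose
runner used elsewhere by rdflib's tests:

    nosetests test/test_bdboptimized.py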