summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--lib/git/odb/channel.py106
-rw-r--r--lib/git/odb/db.py11
-rw-r--r--test/git/odb/__init__.py1
-rw-r--r--test/git/odb/lib.py60
-rw-r--r--test/git/odb/test_channel.py61
-rw-r--r--test/git/odb/test_db.py90
-rw-r--r--test/git/odb/test_stream.py (renamed from test/git/test_odb.py)147
-rw-r--r--test/git/odb/test_utils.py15
8 files changed, 344 insertions, 147 deletions
diff --git a/lib/git/odb/channel.py b/lib/git/odb/channel.py
new file mode 100644
index 00000000..f6469d42
--- /dev/null
+++ b/lib/git/odb/channel.py
@@ -0,0 +1,106 @@
+"""Contains a queue based channel implementation"""
+from Queue import (
+ Queue,
+ Empty,
+ Full
+ )
+
+#{ Classes
+class Channel(object):
+ """A channel is similar to a system pipe. It has a write end as well as one or
+ more read ends. If data is in the channel, it can be read; if not, the read operation
+ will block until data becomes available.
+ If the channel is closed, any read operation will result in an exception
+
+ This base class is not instantiated directly, but instead serves as constructor
+ for RWChannel pairs.
+
+ Create a new channel """
+ __slots__ = tuple()
+ def __new__(cls, *args):
+ if cls is Channel:
+ max_items = 0
+ if len(args) == 1:
+ max_items = args[0]
+ if len(args) > 1:
+ raise ValueError("Specify not more than the number of items the channel should take")
+ wc = WChannel(max_items)
+ rc = RChannel(wc)
+ return wc, rc
+ # END constructor mode
+ return object.__new__(cls)
+
+class WChannel(Channel):
+ """The write end of a channel"""
+ __slots__ = ('_closed', '_queue')
+
+ def __init__(self, max_items=0):
+ """initialize this instance, able to hold max_items at once
+ Write calls will block if the channel is full, until someone reads from it"""
+ self._closed = False
+ self._queue = Queue(max_items)
+
+
+ #{ Interface
+ def write(self, item, block=True, timeout=None):
+ """Send an item into the channel, it can be read from the read end of the
+ channel accordingly
+ :param item: Item to send
+ :param block: If True, the call will block until there is free space in the
+ channel
+ :param timeout: timeout in seconds for blocking calls.
+ :raise IOError: when writing into closed file or when writing into a non-blocking
+ full channel
+ :note: may block if the channel has a limited capacity"""
+ if self._closed:
+ raise IOError("Cannot write to a closed channel")
+
+ try:
+ self._queue.put(item, block, timeout)
+ except Full:
+ raise IOError("Capacity of the channel was exeeded")
+ # END exception handling
+
+ def close(self):
+ """Close the channel. Multiple close calls on a closed channel are no
+ an error"""
+ self._closed = True
+
+ @property
+ def closed(self):
+ """:return: True if the channel was closed"""
+ return self._closed
+ #} END interface
+
+
+class RChannel(Channel):
+ """The read-end of a corresponding write channel"""
+ __slots__ = '_wc'
+
+ def __init__(self, wchannel):
+ """Initialize this instance from its parent write channel"""
+ self._wc = wchannel
+
+
+ #{ Interface
+
+ def read(self, block=True, timeout=None):
+ """:return: an item read from the channel
+ :param block: if True, the call will block until an item is available
+ :param timeout: if positive and block is True, it will block only for the
+ given amount of seconds.
+ :raise IOError: When reading from an empty channel ( if non-blocking, or
+ if the channel is still empty after the timeout )"""
+ # if the channel is closed for writing, we never block
+ if self._wc.closed:
+ block = False
+
+ try:
+ return self._wc._queue.get(block, timeout)
+ except Empty:
+ raise IOError("Error reading from an empty channel")
+ # END handle reading
+
+ #} END interface
+
+#} END classes
diff --git a/lib/git/odb/db.py b/lib/git/odb/db.py
index d970b0bf..5d3cc6a3 100644
--- a/lib/git/odb/db.py
+++ b/lib/git/odb/db.py
@@ -142,11 +142,10 @@ class FileDBBase(object):
"""Initialize this instance to look for its files at the given root path
All subsequent operations will be relative to this path
:raise InvalidDBRoot:
- :note: The base will perform basic checking for accessability, but the subclass
- is required to verify that the root_path contains the database structure it needs"""
+ :note: The base will not perform any accessibility checking as the base
+ might not yet be accessible, but become accessible before the first
+ access."""
super(FileDBBase, self).__init__()
- if not os.path.isdir(root_path):
- raise InvalidDBRoot(root_path)
self._root_path = root_path
@@ -333,10 +332,10 @@ class GitObjectDB(LooseObjectDB):
def info(self, sha):
t = self._git.get_object_header(sha)
- return OInfo(t[0], t[1], t[2])
+ return OInfo(*t)
def stream(self, sha):
"""For now, all lookup is done by git itself"""
t = self._git.stream_object_data(sha)
- return OStream(t[0], t[1], t[2], t[3])
+ return OStream(*t)
diff --git a/test/git/odb/__init__.py b/test/git/odb/__init__.py
new file mode 100644
index 00000000..8b137891
--- /dev/null
+++ b/test/git/odb/__init__.py
@@ -0,0 +1 @@
+
diff --git a/test/git/odb/lib.py b/test/git/odb/lib.py
new file mode 100644
index 00000000..d5199748
--- /dev/null
+++ b/test/git/odb/lib.py
@@ -0,0 +1,60 @@
+"""Utilities used in ODB testing"""
+from git.odb import (
+ OStream,
+ )
+from git.odb.stream import Sha1Writer
+
+import zlib
+from cStringIO import StringIO
+
+#{ Stream Utilities
+
+class DummyStream(object):
+ def __init__(self):
+ self.was_read = False
+ self.bytes = 0
+ self.closed = False
+
+ def read(self, size):
+ self.was_read = True
+ self.bytes = size
+
+ def close(self):
+ self.closed = True
+
+ def _assert(self):
+ assert self.was_read
+
+
+class DeriveTest(OStream):
+ def __init__(self, sha, type, size, stream, *args, **kwargs):
+ self.myarg = kwargs.pop('myarg')
+ self.args = args
+
+ def _assert(self):
+ assert self.args
+ assert self.myarg
+
+
+class ZippedStoreShaWriter(Sha1Writer):
+ """Remembers everything someone writes to it"""
+ __slots__ = ('buf', 'zip')
+ def __init__(self):
+ Sha1Writer.__init__(self)
+ self.buf = StringIO()
+ self.zip = zlib.compressobj(1) # fastest
+
+ def __getattr__(self, attr):
+ return getattr(self.buf, attr)
+
+ def write(self, data):
+ alen = Sha1Writer.write(self, data)
+ self.buf.write(self.zip.compress(data))
+ return alen
+
+ def close(self):
+ self.buf.write(self.zip.flush())
+
+
+#} END stream utilities
+
diff --git a/test/git/odb/test_channel.py b/test/git/odb/test_channel.py
new file mode 100644
index 00000000..89b26582
--- /dev/null
+++ b/test/git/odb/test_channel.py
@@ -0,0 +1,61 @@
+"""Channel testing"""
+from test.testlib import *
+from git.odb.channel import *
+
+import time
+
+class TestDB(TestBase):
+
+ def test_base(self):
+ # creating channel yields a write and a read channel
+ wc, rc = Channel()
+ assert isinstance(wc, WChannel)
+ assert isinstance(rc, RChannel)
+
+ # everything else fails
+ self.failUnlessRaises(ValueError, Channel, 1, "too many args")
+
+ # TEST UNLIMITED SIZE CHANNEL - writing+reading is FIFO
+ item = 1
+ item2 = 2
+ wc.write(item)
+ wc.write(item2)
+ assert rc.read() == item
+ assert rc.read() == item2
+
+ # next read blocks, then raises - it waits a second
+ st = time.time()
+ self.failUnlessRaises(IOError, rc.read, True, 1)
+ assert time.time() - st >= 1.0
+
+ # writing to a closed channel raises
+ assert not wc.closed
+ wc.close()
+ assert wc.closed
+ wc.close() # fine
+ assert wc.closed
+
+ self.failUnlessRaises(IOError, wc.write, 1)
+
+ # reading from a closed channel never blocks
+ self.failUnlessRaises(IOError, rc.read)
+
+
+
+ # TEST LIMITED SIZE CHANNEL
+ # channel with max-items set
+ wc, rc = Channel(1)
+ wc.write(item) # fine
+
+ # blocks for a second, it's full
+ st = time.time()
+ self.failUnlessRaises(IOError, wc.write, item, True, 1)
+ assert time.time() - st >= 1.0
+
+ # get one
+ assert rc.read() == item
+
+ # it's empty, can put one again
+ wc.write(item2)
+ assert rc.read() == item2
+ wc.close()
diff --git a/test/git/odb/test_db.py b/test/git/odb/test_db.py
new file mode 100644
index 00000000..35ba8680
--- /dev/null
+++ b/test/git/odb/test_db.py
@@ -0,0 +1,90 @@
+"""Test for object db"""
+from test.testlib import *
+from lib import ZippedStoreShaWriter
+
+from git.odb import *
+from git.odb.stream import Sha1Writer
+from git import Blob
+from git.errors import BadObject
+
+
+from cStringIO import StringIO
+import os
+
+class TestDB(TestBase):
+ """Test the different db class implementations"""
+
+ # data
+ two_lines = "1234\nhello world"
+
+ all_data = (two_lines, )
+
+ def _assert_object_writing(self, db):
+ """General tests to verify object writing, compatible to ObjectDBW
+ :note: requires write access to the database"""
+ # start in 'dry-run' mode, using a simple sha1 writer
+ ostreams = (ZippedStoreShaWriter, None)
+ for ostreamcls in ostreams:
+ for data in self.all_data:
+ dry_run = ostreamcls is not None
+ ostream = None
+ if ostreamcls is not None:
+ ostream = ostreamcls()
+ assert isinstance(ostream, Sha1Writer)
+ # END create ostream
+
+ prev_ostream = db.set_ostream(ostream)
+ assert type(prev_ostream) in ostreams or prev_ostream in ostreams
+
+ istream = IStream(Blob.type, len(data), StringIO(data))
+
+ # store returns same istream instance, with new sha set
+ my_istream = db.store(istream)
+ sha = istream.sha
+ assert my_istream is istream
+ assert db.has_object(sha) != dry_run
+ assert len(sha) == 40 # for now we require 40 byte shas as default
+
+ # verify data - the slow way, we want to run code
+ if not dry_run:
+ info = db.info(sha)
+ assert Blob.type == info.type
+ assert info.size == len(data)
+
+ ostream = db.stream(sha)
+ assert ostream.read() == data
+ assert ostream.type == Blob.type
+ assert ostream.size == len(data)
+ else:
+ self.failUnlessRaises(BadObject, db.info, sha)
+ self.failUnlessRaises(BadObject, db.stream, sha)
+
+ # DIRECT STREAM COPY
+ # our data has been written in object format to the StringIO
+ # we passed as output stream. No physical database representation
+ # was created.
+ # Test direct stream copy of object streams, the result must be
+ # identical to what we fed in
+ ostream.seek(0)
+ istream.stream = ostream
+ assert istream.sha is not None
+ prev_sha = istream.sha
+
+ db.set_ostream(ZippedStoreShaWriter())
+ db.store(istream)
+ assert istream.sha == prev_sha
+ new_ostream = db.ostream()
+
+ # note: only works as long our store write uses the same compression
+ # level, which is zip
+ assert ostream.getvalue() == new_ostream.getvalue()
+ # END for each data set
+ # END for each dry_run mode
+
+ @with_bare_rw_repo
+ def test_writing(self, rwrepo):
+ ldb = LooseObjectDB(os.path.join(rwrepo.git_dir, 'objects'))
+
+ # write data
+ self._assert_object_writing(ldb)
+
diff --git a/test/git/test_odb.py b/test/git/odb/test_stream.py
index 5c8268cd..020fe6bd 100644
--- a/test/git/test_odb.py
+++ b/test/git/odb/test_stream.py
@@ -1,71 +1,20 @@
"""Test for object db"""
from test.testlib import *
-from git.odb import *
-from git.odb.utils import (
- to_hex_sha,
- to_bin_sha
+from lib import (
+ DummyStream,
+ DeriveTest,
+ Sha1Writer
)
-from git.odb.stream import Sha1Writer
+
+from git.odb import *
from git import Blob
-from git.errors import BadObject
from cStringIO import StringIO
import tempfile
import os
import zlib
-#{ Stream Utilities
-
-class DummyStream(object):
- def __init__(self):
- self.was_read = False
- self.bytes = 0
- self.closed = False
-
- def read(self, size):
- self.was_read = True
- self.bytes = size
-
- def close(self):
- self.closed = True
-
- def _assert(self):
- assert self.was_read
-
-
-class DeriveTest(OStream):
- def __init__(self, sha, type, size, stream, *args, **kwargs):
- self.myarg = kwargs.pop('myarg')
- self.args = args
-
- def _assert(self):
- assert self.args
- assert self.myarg
-
-
-class ZippedStoreShaWriter(Sha1Writer):
- """Remembers everything someone writes to it"""
- __slots__ = ('buf', 'zip')
- def __init__(self):
- Sha1Writer.__init__(self)
- self.buf = StringIO()
- self.zip = zlib.compressobj(1) # fastest
-
- def __getattr__(self, attr):
- return getattr(self.buf, attr)
-
- def write(self, data):
- alen = Sha1Writer.write(self, data)
- self.buf.write(self.zip.compress(data))
- return alen
-
- def close(self):
- self.buf.write(self.zip.flush())
-
-#} END stream utilitiess
-
-
class TestStream(TestBase):
"""Test stream classes"""
@@ -220,88 +169,4 @@ class TestStream(TestBase):
os.remove(path)
# END for each os
-
-class TestUtils(TestBase):
- def test_basics(self):
- assert to_hex_sha(Blob.NULL_HEX_SHA) == Blob.NULL_HEX_SHA
- assert len(to_bin_sha(Blob.NULL_HEX_SHA)) == 20
- assert to_hex_sha(to_bin_sha(Blob.NULL_HEX_SHA)) == Blob.NULL_HEX_SHA
-
-class TestDB(TestBase):
- """Test the different db class implementations"""
-
- # data
- two_lines = "1234\nhello world"
-
- all_data = (two_lines, )
-
- def _assert_object_writing(self, db):
- """General tests to verify object writing, compatible to ObjectDBW
- :note: requires write access to the database"""
- # start in 'dry-run' mode, using a simple sha1 writer
- ostreams = (ZippedStoreShaWriter, None)
- for ostreamcls in ostreams:
- for data in self.all_data:
- dry_run = ostreamcls is not None
- ostream = None
- if ostreamcls is not None:
- ostream = ostreamcls()
- assert isinstance(ostream, Sha1Writer)
- # END create ostream
-
- prev_ostream = db.set_ostream(ostream)
- assert type(prev_ostream) in ostreams or prev_ostream in ostreams
-
- istream = IStream(Blob.type, len(data), StringIO(data))
-
- # store returns same istream instance, with new sha set
- my_istream = db.store(istream)
- sha = istream.sha
- assert my_istream is istream
- assert db.has_object(sha) != dry_run
- assert len(sha) == 40 # for now we require 40 byte shas as default
-
- # verify data - the slow way, we want to run code
- if not dry_run:
- info = db.info(sha)
- assert Blob.type == info.type
- assert info.size == len(data)
-
- ostream = db.stream(sha)
- assert ostream.read() == data
- assert ostream.type == Blob.type
- assert ostream.size == len(data)
- else:
- self.failUnlessRaises(BadObject, db.info, sha)
- self.failUnlessRaises(BadObject, db.stream, sha)
-
- # DIRECT STREAM COPY
- # our data hase been written in object format to the StringIO
- # we pasesd as output stream. No physical database representation
- # was created.
- # Test direct stream copy of object streams, the result must be
- # identical to what we fed in
- ostream.seek(0)
- istream.stream = ostream
- assert istream.sha is not None
- prev_sha = istream.sha
-
- db.set_ostream(ZippedStoreShaWriter())
- db.store(istream)
- assert istream.sha == prev_sha
- new_ostream = db.ostream()
-
- # note: only works as long our store write uses the same compression
- # level, which is zip
- assert ostream.getvalue() == new_ostream.getvalue()
- # END for each data set
- # END for each dry_run mode
-
- @with_bare_rw_repo
- def test_writing(self, rwrepo):
- ldb = LooseObjectDB(os.path.join(rwrepo.git_dir, 'objects'))
-
- # write data
- self._assert_object_writing(ldb)
-
diff --git a/test/git/odb/test_utils.py b/test/git/odb/test_utils.py
new file mode 100644
index 00000000..34572b37
--- /dev/null
+++ b/test/git/odb/test_utils.py
@@ -0,0 +1,15 @@
+"""Test for object db"""
+from test.testlib import *
+from git import Blob
+from git.odb.utils import (
+ to_hex_sha,
+ to_bin_sha
+ )
+
+
+class TestUtils(TestBase):
+ def test_basics(self):
+ assert to_hex_sha(Blob.NULL_HEX_SHA) == Blob.NULL_HEX_SHA
+ assert len(to_bin_sha(Blob.NULL_HEX_SHA)) == 20
+ assert to_hex_sha(to_bin_sha(Blob.NULL_HEX_SHA)) == Blob.NULL_HEX_SHA
+