summaryrefslogtreecommitdiff
path: root/git/test/test_example.py
blob: 8a80aac82b258b7dc2565e0541e96a72e5a51025 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
# Copyright (C) 2010, 2011 Sebastian Thiel (byronimo@gmail.com) and contributors
#
# This module is part of GitDB and is released under
# the New BSD License: http://www.opensource.org/licenses/bsd-license.php
"""Module with examples from the tutorial section of the docs"""
from lib import TestBase, fixture_path
from git.base import IStream
from git.db.py.loose import PureLooseObjectODB
from git.util import pool

from cStringIO import StringIO

from async import IteratorReader


class TestExamples(TestBase):

    def test_base(self):
        ldb = PureLooseObjectODB(fixture_path("../../../.git/objects"))

        for sha1 in ldb.sha_iter():
            oinfo = ldb.info(sha1)
            ostream = ldb.stream(sha1)
            assert oinfo[:3] == ostream[:3]

            assert len(ostream.read()) == ostream.size
            assert ldb.has_object(oinfo.binsha)
        # END for each sha in database
        # assure we close all files
        try:
            del(ostream)
            del(oinfo)
        except UnboundLocalError:
            pass
        # END ignore exception if there are no loose objects

        data = "my data"
        istream = IStream("blob", len(data), StringIO(data))

        # the object does not yet have a sha
        assert istream.binsha is None
        ldb.store(istream)
        # now the sha is set
        assert len(istream.binsha) == 20
        assert ldb.has_object(istream.binsha)

        # async operation
        # Create a reader from an iterator
        reader = IteratorReader(ldb.sha_iter())

        # get reader for object streams
        info_reader = ldb.stream_async(reader)

        # read one
        info = info_reader.read(1)[0]

        # read all the rest until depletion
        ostreams = info_reader.read()

        # set the pool to use two threads
        pool.set_size(2)

        # synchronize the mode of operation
        pool.set_size(0)