| 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
 | # -*- coding: utf-8 -*-
# test_commit.py
# Copyright (C) 2008, 2009 Michael Trier (mtrier@gmail.com) and contributors
#
# This module is part of GitPython and is released under
# the BSD License: http://www.opensource.org/licenses/bsd-license.php
from __future__ import print_function
from git.test.lib import (
    TestBase,
    assert_equal,
    assert_not_equal,
    with_rw_repo,
    fixture_path,
    StringProcessAdapter
)
from git import (
    Commit,
    Actor,
)
from gitdb import IStream
from gitdb.util import hex_to_bin
from git.compat import (
    string_types,
    text_type
)
from io import BytesIO
import time
import sys
import re
def assert_commit_serialization(rwrepo, commit_id, print_performance_info=False):
    """traverse all commits in the history of commit identified by commit_id and check
    if the serialization works.
    :param print_performance_info: if True, we will show how fast we are"""
    ns = 0      # num serializations
    nds = 0     # num deserializations
    st = time.time()
    for cm in rwrepo.commit(commit_id).traverse():
        nds += 1
        # assert that we deserialize commits correctly, hence we get the same
        # sha on serialization
        stream = BytesIO()
        cm._serialize(stream)
        ns += 1
        streamlen = stream.tell()
        stream.seek(0)
        istream = rwrepo.odb.store(IStream(Commit.type, streamlen, stream))
        assert istream.hexsha == cm.hexsha
        nc = Commit(rwrepo, Commit.NULL_BIN_SHA, cm.tree,
                    cm.author, cm.authored_date, cm.author_tz_offset,
                    cm.committer, cm.committed_date, cm.committer_tz_offset,
                    cm.message, cm.parents, cm.encoding)
        assert nc.parents == cm.parents
        stream = BytesIO()
        nc._serialize(stream)
        ns += 1
        streamlen = stream.tell()
        stream.seek(0)
        # reuse istream
        istream.size = streamlen
        istream.stream = stream
        istream.binsha = None
        nc.binsha = rwrepo.odb.store(istream).binsha
        # if it worked, we have exactly the same contents !
        assert nc.hexsha == cm.hexsha
    # END check commits
    elapsed = time.time() - st
    if print_performance_info:
        print("Serialized %i and deserialized %i commits in %f s ( (%f, %f) commits / s"
              % (ns, nds, elapsed, ns / elapsed, nds / elapsed), file=sys.stderr)
    # END handle performance info
class TestCommit(TestBase):
    def test_bake(self):
        commit = self.rorepo.commit('2454ae89983a4496a445ce347d7a41c0bb0ea7ae')
        # commits have no dict
        self.failUnlessRaises(AttributeError, setattr, commit, 'someattr', 1)
        commit.author  # bake
        assert_equal("Sebastian Thiel", commit.author.name)
        assert_equal("byronimo@gmail.com", commit.author.email)
        assert commit.author == commit.committer
        assert isinstance(commit.authored_date, int) and isinstance(commit.committed_date, int)
        assert isinstance(commit.author_tz_offset, int) and isinstance(commit.committer_tz_offset, int)
        assert commit.message == "Added missing information to docstrings of commit and stats module\n"
    def test_stats(self):
        commit = self.rorepo.commit('33ebe7acec14b25c5f84f35a664803fcab2f7781')
        stats = commit.stats
        def check_entries(d):
            assert isinstance(d, dict)
            for key in ("insertions", "deletions", "lines"):
                assert key in d
        # END assertion helper
        assert stats.files
        assert stats.total
        check_entries(stats.total)
        assert "files" in stats.total
        for filepath, d in stats.files.items():
            check_entries(d)
        # END for each stated file
        # assure data is parsed properly
        michael = Actor._from_string("Michael Trier <mtrier@gmail.com>")
        assert commit.author == michael
        assert commit.committer == michael
        assert commit.authored_date == 1210193388
        assert commit.committed_date == 1210193388
        assert commit.author_tz_offset == 14400, commit.author_tz_offset
        assert commit.committer_tz_offset == 14400, commit.committer_tz_offset
        assert commit.message == "initial project\n"
    def test_unicode_actor(self):
        # assure we can parse unicode actors correctly
        name = "Üäöß ÄußÉ".decode("utf-8")
        assert len(name) == 9
        special = Actor._from_string(u"%s <something@this.com>" % name)
        assert special.name == name
        assert isinstance(special.name, text_type)
    def test_traversal(self):
        start = self.rorepo.commit("a4d06724202afccd2b5c54f81bcf2bf26dea7fff")
        first = self.rorepo.commit("33ebe7acec14b25c5f84f35a664803fcab2f7781")
        p0 = start.parents[0]
        p1 = start.parents[1]
        p00 = p0.parents[0]
        p10 = p1.parents[0]
        # basic branch first, depth first
        dfirst = start.traverse(branch_first=False)
        bfirst = start.traverse(branch_first=True)
        assert dfirst.next() == p0
        assert dfirst.next() == p00
        assert bfirst.next() == p0
        assert bfirst.next() == p1
        assert bfirst.next() == p00
        assert bfirst.next() == p10
        # at some point, both iterations should stop
        assert list(bfirst)[-1] == first
        stoptraverse = self.rorepo.commit("254d04aa3180eb8b8daf7b7ff25f010cd69b4e7d").traverse(as_edge=True)
        l = list(stoptraverse)
        assert len(l[0]) == 2
        # ignore self
        assert start.traverse(ignore_self=False).next() == start
        # depth
        assert len(list(start.traverse(ignore_self=False, depth=0))) == 1
        # prune
        assert start.traverse(branch_first=1, prune=lambda i, d: i == p0).next() == p1
        # predicate
        assert start.traverse(branch_first=1, predicate=lambda i, d: i == p1).next() == p1
        # traversal should stop when the beginning is reached
        self.failUnlessRaises(StopIteration, first.traverse().next)
        # parents of the first commit should be empty ( as the only parent has a null
        # sha )
        assert len(first.parents) == 0
    def test_iteration(self):
        # we can iterate commits
        all_commits = Commit.list_items(self.rorepo, self.rorepo.head)
        assert all_commits
        assert all_commits == list(self.rorepo.iter_commits())
        # this includes merge commits
        mcomit = self.rorepo.commit('d884adc80c80300b4cc05321494713904ef1df2d')
        assert mcomit in all_commits
        # we can limit the result to paths
        ltd_commits = list(self.rorepo.iter_commits(paths='CHANGES'))
        assert ltd_commits and len(ltd_commits) < len(all_commits)
        # show commits of multiple paths, resulting in a union of commits
        less_ltd_commits = list(Commit.iter_items(self.rorepo, 'master', paths=('CHANGES', 'AUTHORS')))
        assert len(ltd_commits) < len(less_ltd_commits)
    def test_iter_items(self):
        # pretty not allowed
        self.failUnlessRaises(ValueError, Commit.iter_items, self.rorepo, 'master', pretty="raw")
    def test_rev_list_bisect_all(self):
        """
        'git rev-list --bisect-all' returns additional information
        in the commit header.  This test ensures that we properly parse it.
        """
        revs = self.rorepo.git.rev_list('933d23bf95a5bd1624fbcdf328d904e1fa173474',
                                        first_parent=True,
                                        bisect_all=True)
        commits = Commit._iter_from_process_or_stream(self.rorepo, StringProcessAdapter(revs))
        expected_ids = (
            '7156cece3c49544abb6bf7a0c218eb36646fad6d',
            '1f66cfbbce58b4b552b041707a12d437cc5f400a',
            '33ebe7acec14b25c5f84f35a664803fcab2f7781',
            '933d23bf95a5bd1624fbcdf328d904e1fa173474'
        )
        for sha1, commit in zip(expected_ids, commits):
            assert_equal(sha1, commit.hexsha)
    def test_count(self):
        assert self.rorepo.tag('refs/tags/0.1.5').commit.count() == 143
    def test_list(self):
        assert isinstance(Commit.list_items(self.rorepo, '0.1.5', max_count=5)[
                          hex_to_bin('5117c9c8a4d3af19a9958677e45cda9269de1541')], Commit)
    def test_str(self):
        commit = Commit(self.rorepo, Commit.NULL_BIN_SHA)
        assert_equal(Commit.NULL_HEX_SHA, str(commit))
    def test_repr(self):
        commit = Commit(self.rorepo, Commit.NULL_BIN_SHA)
        assert_equal('<git.Commit "%s">' % Commit.NULL_HEX_SHA, repr(commit))
    def test_equality(self):
        commit1 = Commit(self.rorepo, Commit.NULL_BIN_SHA)
        commit2 = Commit(self.rorepo, Commit.NULL_BIN_SHA)
        commit3 = Commit(self.rorepo, "\1" * 20)
        assert_equal(commit1, commit2)
        assert_not_equal(commit2, commit3)
    def test_iter_parents(self):
        # should return all but ourselves, even if skip is defined
        c = self.rorepo.commit('0.1.5')
        for skip in (0, 1):
            piter = c.iter_parents(skip=skip)
            first_parent = piter.next()
            assert first_parent != c
            assert first_parent == c.parents[0]
        # END for each
    def test_base(self):
        name_rev = self.rorepo.head.commit.name_rev
        assert isinstance(name_rev, string_types)
    @with_rw_repo('HEAD', bare=True)
    def test_serialization(self, rwrepo):
        # create all commits of our repo
        assert_commit_serialization(rwrepo, '0.1.6')
    def test_serialization_unicode_support(self):
        assert Commit.default_encoding.lower() == 'utf-8'
        # create a commit with unicode in the message, and the author's name
        # Verify its serialization and deserialization
        cmt = self.rorepo.commit('0.1.6')
        assert isinstance(cmt.message, text_type)     # it automatically decodes it as such
        assert isinstance(cmt.author.name, text_type)  # same here
        cmt.message = "üäêèß".decode("utf-8")
        assert len(cmt.message) == 5
        cmt.author.name = "äüß".decode("utf-8")
        assert len(cmt.author.name) == 3
        cstream = BytesIO()
        cmt._serialize(cstream)
        cstream.seek(0)
        assert len(cstream.getvalue())
        ncmt = Commit(self.rorepo, cmt.binsha)
        ncmt._deserialize(cstream)
        assert cmt.author.name == ncmt.author.name
        assert cmt.message == ncmt.message
        # actually, it can't be printed in a shell as repr wants to have ascii only
        # it appears
        cmt.author.__repr__()
    def test_gpgsig(self):
        cmt = self.rorepo.commit()
        cmt._deserialize(open(fixture_path('commit_with_gpgsig')))
        fixture_sig = """-----BEGIN PGP SIGNATURE-----
Version: GnuPG v1.4.11 (GNU/Linux)
iQIcBAABAgAGBQJRk8zMAAoJEG5mS6x6i9IjsTEP/0v2Wx/i7dqyKban6XMIhVdj
uI0DycfXqnCCZmejidzeao+P+cuK/ZAA/b9fU4MtwkDm2USvnIOrB00W0isxsrED
sdv6uJNa2ybGjxBolLrfQcWutxGXLZ1FGRhEvkPTLMHHvVriKoNFXcS7ewxP9MBf
NH97K2wauqA+J4BDLDHQJgADCOmLrGTAU+G1eAXHIschDqa6PZMH5nInetYZONDh
3SkOOv8VKFIF7gu8X7HC+7+Y8k8U0TW0cjlQ2icinwCc+KFoG6GwXS7u/VqIo1Yp
Tack6sxIdK7NXJhV5gAeAOMJBGhO0fHl8UUr96vGEKwtxyZhWf8cuIPOWLk06jA0
g9DpLqmy/pvyRfiPci+24YdYRBua/vta+yo/Lp85N7Hu/cpIh+q5WSLvUlv09Dmo
TTTG8Hf6s3lEej7W8z2xcNZoB6GwXd8buSDU8cu0I6mEO9sNtAuUOHp2dBvTA6cX
PuQW8jg3zofnx7CyNcd3KF3nh2z8mBcDLgh0Q84srZJCPRuxRcp9ylggvAG7iaNd
XMNvSK8IZtWLkx7k3A3QYt1cN4y1zdSHLR2S+BVCEJea1mvUE+jK5wiB9S4XNtKm
BX/otlTa8pNE3fWYBxURvfHnMY4i3HQT7Bc1QjImAhMnyo2vJk4ORBJIZ1FTNIhJ
JzJMZDRLQLFvnzqZuCjE
=przd
-----END PGP SIGNATURE-----"""
        assert cmt.gpgsig == fixture_sig
        cmt.gpgsig = "<test\ndummy\nsig>"
        assert cmt.gpgsig != fixture_sig
        cstream = BytesIO()
        cmt._serialize(cstream)
        assert re.search(r"^gpgsig <test\n dummy\n sig>$", cstream.getvalue(), re.MULTILINE)
        cstream.seek(0)
        cmt.gpgsig = None
        cmt._deserialize(cstream)
        assert cmt.gpgsig == "<test\ndummy\nsig>"
        cmt.gpgsig = None
        cstream = BytesIO()
        cmt._serialize(cstream)
        assert not re.search(r"^gpgsig ", cstream.getvalue(), re.MULTILINE)
 |