1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
|
"""
This file provides a single function `serialize_in_chunks()` which can serialize a
Graph into a number of NT files with a maximum number of triples or maximum file size.
There is an option to preserve any prefixes declared for the original graph in the first
file, which will be a Turtle file.
"""
from contextlib import ExitStack, contextmanager
from pathlib import Path
from typing import TYPE_CHECKING, BinaryIO, Generator, Optional, Tuple
from rdflib.graph import Graph
from rdflib.plugins.serializers.nt import _nt_row
# from rdflib.term import Literal
# if TYPE_CHECKING:
# from rdflib.graph import _TriplePatternType
__all__ = ["serialize_in_chunks"]
def serialize_in_chunks(
    g: Graph,
    max_triples: int = 10000,
    max_file_size_kb: Optional[int] = None,
    file_name_stem: str = "chunk",
    output_dir: Optional[Path] = None,
    write_prefixes: bool = False,
) -> None:
    """
    Serializes a given Graph into a series of N-Triples files, each limited
    either by size or by number of triples.

    Only one chunk file is held open at a time: the current chunk is closed
    before the next one is created.

    :param g:
        The graph to serialize.

    :param max_triples:
        Maximum size per NT file in triples.
        Equivalent to lines in file.
        If the graph holds no more than this many triples, a single file
        named ``<file_name_stem>_all.nt`` is written instead of numbered chunks.

        If both this parameter and max_file_size_kb are set, max_file_size_kb will be used.

    :param max_file_size_kb:
        Maximum size per NT file in kB (1,000 bytes).
        A new file is started when adding the next triple would reach this size.

    :param file_name_stem:
        Prefix of each file name.
        e.g. "chunk" = chunk_000001.nt, chunk_000002.nt...

    :param output_dir:
        The directory you want the files to be written to.
        Defaults to the current working directory.

    :param write_prefixes:
        The first file created (``<file_name_stem>_000000.ttl``) is a Turtle file
        containing original graph prefixes. Numbered NT chunks then start at 1
        instead of 0.

    :raises ValueError:
        If ``output_dir`` is not an existing directory, or if a single
        serialized triple is larger than ``max_file_size_kb``.

    See ``../test/test_tools/test_chunk_serializer.py`` for examples of this in use.
    """
    # Normalise once so every later path operation works on a real Path.
    out_dir = Path.cwd() if output_dir is None else Path(output_dir)

    if not out_dir.is_dir():
        raise ValueError(
            "If you specify an output_dir, it must actually be a directory!"
        )

    @contextmanager
    def _start_new_file(file_no: int) -> Generator[Tuple[Path, BinaryIO], None, None]:
        # Opens the numbered chunk file for binary writing and closes it on exit.
        fp = out_dir / f"{file_name_stem}_{str(file_no).zfill(6)}.nt"
        with open(fp, "wb") as fh:
            yield fp, fh

    def _serialize_prefixes(g: Graph) -> str:
        # One sorted "PREFIX <name>: <iri>" line per namespace bound on the graph.
        pres = [f"PREFIX {k}: <{v}>" for k, v in g.namespace_manager.namespaces()]
        return "\n".join(sorted(pres)) + "\n"

    def _rotate(stack: ExitStack, file_no: int) -> BinaryIO:
        # Close whichever chunk is currently open *before* opening the next one,
        # so the number of simultaneously open files never exceeds one.
        stack.close()
        _, fh = stack.enter_context(_start_new_file(file_no))
        return fh

    if write_prefixes:
        with open(
            out_dir / f"{file_name_stem}_000000.ttl", "w", encoding="utf-8"
        ) as fh:
            fh.write(_serialize_prefixes(g))

    # Number 0 is taken by the prefixes file when one is written.
    first_file_no = 1 if write_prefixes else 0

    # The stack guarantees the last open chunk is closed, even on error.
    with ExitStack() as xstack:
        if max_file_size_kb is not None:
            max_file_size = max_file_size_kb * 1000
            file_no = first_file_no
            bytes_written = 0
            for i, t in enumerate(g.triples((None, None, None))):
                row_bytes = _nt_row(t).encode("utf-8")
                if len(row_bytes) > max_file_size:
                    raise ValueError(
                        f"cannot write triple {t!r} as its serialized size of "
                        f"{len(row_bytes) / 1000} kB exceeds "
                        f"max_file_size_kb = {max_file_size_kb}"
                    )
                if i == 0:
                    fhb = _rotate(xstack, file_no)
                elif (bytes_written + len(row_bytes)) >= max_file_size:
                    # This triple would reach the limit: roll over to a new file.
                    file_no += 1
                    fhb = _rotate(xstack, file_no)
                    bytes_written = 0
                bytes_written += fhb.write(row_bytes)
        elif len(g) <= max_triples:
            # the graph is no larger than max so just NT serialize the whole thing
            g.serialize(
                destination=out_dir / f"{file_name_stem}_all.nt",
                format="nt",
            )
        else:
            # the graph is larger than max_triples: start a new file every
            # max_triples triples
            file_no = first_file_no
            for i, t in enumerate(g.triples((None, None, None))):
                if i % max_triples == 0:
                    fhb = _rotate(xstack, file_no)
                    file_no += 1
                fhb.write(_nt_row(t).encode("utf-8"))
    return
|