# This class takes advantage of the fact that all message storage formats
# (v0, v1 and v2) share the same byte offsets for the Length and Magic fields.
# Let's look closely at the leading bytes of each version:
#
# V0 and V1 (Offset belongs to the MessageSet; the remaining fields to the Message):
# Offset => Int64
# BytesLength => Int32
# CRC => Int32
# Magic => Int8
# ...
#
# V2:
# BaseOffset => Int64
# Length => Int32
# PartitionLeaderEpoch => Int32
# Magic => Int8
# ...
#
# So we can iterate over batches just by knowing the offset of the Length
# field. The Magic byte is used to construct the correct class for the batch
# itself.
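#
# A minimal illustration (not part of this module) of reading those shared
# fields directly, assuming `data` holds the raw bytes of one record batch:
#
#     length, = struct.unpack_from(">i", data, 8)   # right after the Int64 offset
#     magic, = struct.unpack_from(">b", data, 16)   # after Length and CRC/epoch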
from __future__ import division

import struct

from kafka.errors import CorruptRecordException
from kafka.record.abc import ABCRecords
from kafka.record.legacy_records import LegacyRecordBatch, LegacyRecordBatchBuilder
from kafka.record.default_records import DefaultRecordBatch, DefaultRecordBatchBuilder


class MemoryRecords(ABCRecords):

    LENGTH_OFFSET = struct.calcsize(">q")
    LOG_OVERHEAD = struct.calcsize(">qi")
    MAGIC_OFFSET = struct.calcsize(">qii")
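    # For reference, these evaluate to 8, 12 and 16 bytes respectively,
    # matching the field offsets in the layout diagram above.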

    # Minimum space requirements for Record V0
    MIN_SLICE = LOG_OVERHEAD + LegacyRecordBatch.RECORD_OVERHEAD_V0

    __slots__ = ("_buffer", "_pos", "_next_slice", "_remaining_bytes")

    def __init__(self, bytes_data):
        self._buffer = bytes_data
        self._pos = 0
        # We keep one slice ahead so `has_next` will return very fast
        self._next_slice = None
        self._remaining_bytes = None
        self._cache_next()

    def size_in_bytes(self):
        return len(self._buffer)

    def valid_bytes(self):
        # We need to read the whole buffer to get the valid_bytes.
        # NOTE: in Fetcher we do the call after iteration, so should be fast
        if self._remaining_bytes is None:
            next_slice = self._next_slice
            pos = self._pos
            while self._remaining_bytes is None:
                self._cache_next()
            # Reset previous iterator position
            self._next_slice = next_slice
            self._pos = pos
        return len(self._buffer) - self._remaining_bytes

    # NOTE: we cache offsets here as kwargs for a bit more speed, as cPython
    # will use LOAD_FAST opcode in this case
    def _cache_next(self, len_offset=LENGTH_OFFSET, log_overhead=LOG_OVERHEAD):
        buffer = self._buffer
        buffer_len = len(buffer)
        pos = self._pos
        remaining = buffer_len - pos
        if remaining < log_overhead:
            # Will be re-checked in Fetcher for remaining bytes.
            self._remaining_bytes = remaining
            self._next_slice = None
            return

        length, = struct.unpack_from(
            ">i", buffer, pos + len_offset)

        slice_end = pos + log_overhead + length
        if slice_end > buffer_len:
            # Will be re-checked in Fetcher for remaining bytes
            self._remaining_bytes = remaining
            self._next_slice = None
            return

        self._next_slice = memoryview(buffer)[pos: slice_end]
        self._pos = slice_end

    def has_next(self):
        return self._next_slice is not None

    # NOTE: same cache for LOAD_FAST as above
    def next_batch(self, _min_slice=MIN_SLICE,
                   _magic_offset=MAGIC_OFFSET):
        next_slice = self._next_slice
        if next_slice is None:
            return None
        if len(next_slice) < _min_slice:
            raise CorruptRecordException(
                "Record size is less than the minimum record overhead "
                "({})".format(_min_slice - self.LOG_OVERHEAD))
        self._cache_next()
        magic, = struct.unpack_from(">b", next_slice, _magic_offset)
        if magic <= 1:
            return LegacyRecordBatch(next_slice, magic)
        else:
            return DefaultRecordBatch(next_slice)
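
# A minimal usage sketch (illustrative only, not taken from this module),
# assuming `raw` is the raw bytes payload of one partition from a fetch
# response:
#
#     records = MemoryRecords(raw)
#     while records.has_next():
#         batch = records.next_batch()
#         for record in batch:
#             ...  # process the record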


class MemoryRecordsBuilder(object):

    __slots__ = ("_builder", "_batch_size", "_buffer", "_next_offset", "_closed",
                 "_bytes_written")

    def __init__(self, magic, compression_type, batch_size):
        assert magic in [0, 1, 2], "Not supported magic"
        assert compression_type in [0, 1, 2, 3, 4], "Not valid compression type"
        if magic >= 2:
            self._builder = DefaultRecordBatchBuilder(
                magic=magic, compression_type=compression_type,
                is_transactional=False, producer_id=-1, producer_epoch=-1,
                base_sequence=-1, batch_size=batch_size)
        else:
            self._builder = LegacyRecordBatchBuilder(
                magic=magic, compression_type=compression_type,
                batch_size=batch_size)
        self._batch_size = batch_size
        self._buffer = None

        self._next_offset = 0
        self._closed = False
        self._bytes_written = 0

    def append(self, timestamp, key, value, headers=[]):
        """ Append a message to the buffer.

        Returns: RecordMetadata or None if unable to append.
        """
        if self._closed:
            return None

        offset = self._next_offset
        metadata = self._builder.append(offset, timestamp, key, value, headers)
        # A return of None means there is no space left to add a new message
        if metadata is None:
            return None

        self._next_offset += 1
        return metadata

    def close(self):
        # This method may be called multiple times on the same batch
        # (i.e., on retries), so we need to make sure we only close it out
        # once; otherwise compressed messages may be double-compressed.
        # See Issue 718.
        if not self._closed:
            self._bytes_written = self._builder.size()
            self._buffer = bytes(self._builder.build())
            self._builder = None
            self._closed = True

    def size_in_bytes(self):
        if not self._closed:
            return self._builder.size()
        else:
            return len(self._buffer)

    def compression_rate(self):
        assert self._closed
        return self.size_in_bytes() / self._bytes_written

    def is_full(self):
        if self._closed:
            return True
        else:
            return self._builder.size() >= self._batch_size

    def next_offset(self):
        return self._next_offset

    def buffer(self):
        assert self._closed
        return self._buffer
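
# A minimal usage sketch (illustrative only; the parameter values below are
# assumptions for the example, not defaults defined by this module):
#
#     builder = MemoryRecordsBuilder(magic=1, compression_type=0, batch_size=16384)
#     builder.append(timestamp=1234567890000, key=b"key", value=b"value")
#     builder.close()
#     payload = builder.buffer()  # finished batch, ready to be sent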