author     Dana Powers <dana.powers@gmail.com>    2016-07-31 15:22:42 -0700
committer  Dana Powers <dana.powers@gmail.com>    2016-09-24 14:01:05 -0700
commit     c72b69ba58eb88ce90ddb3db6e3ff2ff7cb2ab64 (patch)
tree       6958da7aa675201beb1be6c878fc8ff7f154c02d
parent     26fe1f2e296aa78e0fe79c01f0b974dfc8741246 (diff)
download   kafka-python-c72b69ba58eb88ce90ddb3db6e3ff2ff7cb2ab64.tar.gz
Fixes to kafka.streams -- simple source -> sink topology works
-rw-r--r--  kafka/streams/kafka.py                                   | 18
-rw-r--r--  kafka/streams/processor/assignment/subscription_info.py |  7
-rw-r--r--  kafka/streams/processor/assignment/task_assignor.py     |  2
-rw-r--r--  kafka/streams/processor/node.py                          | 23
-rw-r--r--  kafka/streams/processor/partition_group.py               | 14
-rw-r--r--  kafka/streams/processor/record_collector.py              |  2
-rw-r--r--  kafka/streams/processor/record_queue.py                  | 14
-rw-r--r--  kafka/streams/processor/stream_partition_assignor.py    | 28
-rw-r--r--  kafka/streams/processor/stream_task.py                   |  6
-rw-r--r--  kafka/streams/processor/stream_thread.py                 | 90
-rw-r--r--  kafka/streams/processor/task.py                          | 30
-rw-r--r--  kafka/streams/processor/topology_builder.py              |  2
12 files changed, 157 insertions(+), 79 deletions(-)
diff --git a/kafka/streams/kafka.py b/kafka/streams/kafka.py
index bc7dcec..9eb71e3 100644
--- a/kafka/streams/kafka.py
+++ b/kafka/streams/kafka.py
@@ -23,6 +23,7 @@ import uuid
import kafka.streams.errors as Errors
from .processor.stream_thread import StreamThread
+from .processor.partition_group import partition_grouper
from .utils import AtomicInteger
log = logging.getLogger(__name__)
@@ -82,7 +83,22 @@ class KafkaStreams(object):
DEFAULT_CONFIG = {
'application_id': None,
'bootstrap_servers': None,
+ 'client_id': 'kafka-python-streams',
+ 'zookeeper_connect': '',
+ 'state_dir': '/tmp/kafka-streams',
+ 'replication_factor': 1,
+ 'timestamp_extractor': lambda x: x.timestamp,
+ 'partition_grouper': partition_grouper,
+ 'key_serializer': None,
+ 'key_deserializer': None,
+ 'value_serializer': None,
+ 'value_deserializer': None,
+ 'commit_interval_ms': 30000,
+ 'poll_ms': 100,
'num_stream_threads': 1,
+ 'num_standby_replicas': 0,
+ 'buffered_records_per_partition': 1000,
+ 'state_cleanup_delay_ms': 60000,
}
def __init__(self, builder, **configs):
@@ -173,7 +189,7 @@ class KafkaStreams(object):
self._state = STOPPED
log.info('Stopped Kafka Stream process')
- def setUncaughtExceptionHandler(self, handler):
+ def set_uncaught_exception_handler(self, handler):
"""Sets the handler invoked when a stream thread abruptly terminates
due to an uncaught exception.
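
The expanded DEFAULT_CONFIG is consumed by the usual kafka-python pattern of copying the defaults and overlaying any recognized user-supplied keyword arguments (the same merge appears in StreamThread.__init__ further down). A minimal, self-contained sketch of that pattern; the names here are illustrative, not the library's API:

import copy

DEFAULT_CONFIG = {
    'application_id': None,
    'bootstrap_servers': None,
    'num_stream_threads': 1,
    'commit_interval_ms': 30000,
}

def build_config(**configs):
    # Copy defaults, then overlay recognized user-supplied keys;
    # unknown keys are left behind so the caller can warn about them.
    config = copy.copy(DEFAULT_CONFIG)
    for key in list(configs):
        if key in config:
            config[key] = configs.pop(key)
    return config, configs  # (effective config, unrecognized leftovers)

config, unknown = build_config(application_id='wordcount',
                               bootstrap_servers='localhost:9092',
                               not_a_real_option=True)
assert config['num_stream_threads'] == 1 and 'not_a_real_option' in unknown
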
diff --git a/kafka/streams/processor/assignment/subscription_info.py b/kafka/streams/processor/assignment/subscription_info.py
index c7fce14..3452f43 100644
--- a/kafka/streams/processor/assignment/subscription_info.py
+++ b/kafka/streams/processor/assignment/subscription_info.py
@@ -20,6 +20,7 @@ import json
import logging
from kafka.streams.errors import TaskAssignmentError
+from kafka.streams.processor.partition_group import TaskId
log = logging.getLogger(__name__)
@@ -53,9 +54,9 @@ class SubscriptionInfo(object):
if decoded['version'] != cls.CURRENT_VERSION:
raise TaskAssignmentError('unable to decode subscription data: version=' + str(cls.version))
- decoded['prev_tasks'] = set(decoded['prev_tasks'])
- decoded['standby_tasks'] = set(decoded['standby_tasks'])
+ decoded['prev_tasks'] = set([TaskId(*item) for item in decoded['prev_tasks']])
+ decoded['standby_tasks'] = set([TaskId(*item) for item in decoded['standby_tasks']])
return cls(decoded['process_id'], decoded['prev_tasks'], decoded['standby_tasks'], decoded['version'])
except Exception as e:
- raise TaskAssignmentError('unable to decode subscription data', e)
+ raise TaskAssignmentError('unable to decode subscription data', data, e)
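
The decode fix rebuilds TaskId namedtuples from the plain lists that a JSON round-trip produces, since JSON has no tuple type. A small sketch of that round-trip, using a stand-in TaskId with the same fields as the one imported above:

import collections
import json

# Mirrors kafka.streams.processor.partition_group.TaskId
TaskId = collections.namedtuple('TaskId', 'topic_group_id partition_id')

encoded = json.dumps({'prev_tasks': [TaskId(0, 1), TaskId(0, 2)]})
decoded = json.loads(encoded)

# The TaskIds come back as plain lists; rebuild the namedtuples
# before putting them in a set so membership tests work as expected.
prev_tasks = set(TaskId(*item) for item in decoded['prev_tasks'])
assert TaskId(0, 1) in prev_tasks
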
diff --git a/kafka/streams/processor/assignment/task_assignor.py b/kafka/streams/processor/assignment/task_assignor.py
index 8cb6c4e..a295faf 100644
--- a/kafka/streams/processor/assignment/task_assignor.py
+++ b/kafka/streams/processor/assignment/task_assignor.py
@@ -135,7 +135,7 @@ class TaskAssignor(object):
return False
def compute_addition_cost(self, task, state):
- cost = len(state.assignedTasks) // state.capacity
+ cost = len(state.assigned_tasks) // state.capacity
if task in state.prev_assigned_tasks:
if task in state.prev_active_tasks:
diff --git a/kafka/streams/processor/node.py b/kafka/streams/processor/node.py
index 90b0a6a..c672066 100644
--- a/kafka/streams/processor/node.py
+++ b/kafka/streams/processor/node.py
@@ -26,7 +26,8 @@ class ProcessorNode(object):
self.name = name
# Could we construct a Processor here if the processor is just a function?
- assert isinstance(processor, Processor), 'processor must subclass Processor'
+ if processor is not None:
+ assert isinstance(processor, Processor), 'processor must subclass Processor'
self.processor = processor
self.children = []
@@ -47,11 +48,11 @@ class ProcessorNode(object):
class SourceNode(ProcessorNode):
- def __init__(self, name, key_deserializer, val_deserializer):
+ def __init__(self, name, key_deserializer, value_deserializer):
super(SourceNode, self).__init__(name)
self.key_deserializer = key_deserializer
- self.val_deserializer = val_deserializer
+ self.value_deserializer = value_deserializer
self.context = None
def deserialize_key(self, topic, data):
@@ -62,7 +63,7 @@ class SourceNode(ProcessorNode):
def deserialize_value(self, topic, data):
if self.value_deserializer is None:
return data
- return self.val_deserializer.deserialize(topic, data)
+ return self.value_deserializer.deserialize(topic, data)
def init(self, context):
self.context = context
@@ -70,8 +71,8 @@ class SourceNode(ProcessorNode):
# if deserializers are null, get the default ones from the context
if self.key_deserializer is None:
self.key_deserializer = self.context.key_deserializer
- if self.val_deserializer is None:
- self.val_deserializer = self.context.value_deserializer
+ if self.value_deserializer is None:
+ self.value_deserializer = self.context.value_deserializer
"""
// if value deserializers are for {@code Change} values, set the inner deserializer when necessary
@@ -90,12 +91,12 @@ class SourceNode(ProcessorNode):
class SinkNode(ProcessorNode):
- def __init__(self, name, topic, key_serializer, val_serializer, partitioner):
+ def __init__(self, name, topic, key_serializer, value_serializer, partitioner):
super(SinkNode, self).__init__(name)
self.topic = topic
self.key_serializer = key_serializer
- self.val_serializer = val_serializer
+ self.value_serializer = value_serializer
self.partitioner = partitioner
self.context = None
@@ -108,8 +109,8 @@ class SinkNode(ProcessorNode):
# if serializers are null, get the default ones from the context
if self.key_serializer is None:
self.key_serializer = self.context.key_serializer
- if self.val_serializer is None:
- self.val_serializer = self.context.value_serializer
+ if self.value_serializer is None:
+ self.value_serializer = self.context.value_serializer
"""
// if value serializers are for {@code Change} values, set the inner serializer when necessary
@@ -124,7 +125,7 @@ class SinkNode(ProcessorNode):
collector.send(self.topic, key=key, value=value,
timestamp_ms=self.context.timestamp(),
key_serializer=self.key_serializer,
- val_serializer=self.val_serializer,
+ value_serializer=self.value_serializer,
partitioner=self.partitioner)
def close(self):
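
The val_serializer -> value_serializer rename also touches the fall-back-to-context logic in init(). A stand-alone sketch of that pattern, with illustrative stand-in classes rather than the module's real types:

class Context(object):
    # Application-level default (de)serializers.
    key_serializer = staticmethod(lambda topic, k: str(k).encode())
    value_serializer = staticmethod(lambda topic, v: str(v).encode())

class Sink(object):
    def __init__(self, key_serializer=None, value_serializer=None):
        self.key_serializer = key_serializer
        self.value_serializer = value_serializer

    def init(self, context):
        # If no serializer was given for this node, fall back to the
        # context-wide default, once, at init time.
        if self.key_serializer is None:
            self.key_serializer = context.key_serializer
        if self.value_serializer is None:
            self.value_serializer = context.value_serializer

sink = Sink(value_serializer=lambda topic, v: b'override')
sink.init(Context())
assert sink.key_serializer('t', 1) == b'1'         # inherited from context
assert sink.value_serializer('t', 1) == b'override'  # node-level override
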
diff --git a/kafka/streams/processor/partition_group.py b/kafka/streams/processor/partition_group.py
index 6d5ea3f..4e726a9 100644
--- a/kafka/streams/processor/partition_group.py
+++ b/kafka/streams/processor/partition_group.py
@@ -29,13 +29,13 @@ TaskId = collections.namedtuple('TaskId', 'topic_group_id partition_id')
class RecordInfo(object):
def __init__(self):
- self.queue = RecordQueue()
+ self.queue = None
def node(self):
- return self.queue.source()
+ return self.queue.source
def partition(self):
- return self.queue.partition()
+ return self.queue.partition
def queue(self):
return self.queue
@@ -57,13 +57,13 @@ class PartitionGroup(object):
Returns: (timestamp, ConsumerRecord)
"""
- record = None
+ timestamp = record = queue = None
if self._queues_by_time:
_, queue = heapq.heappop(self._queues_by_time)
# get the first record from this queue.
- record = queue.poll()
+ timestamp, record = queue.poll()
if queue:
heapq.heappush(self._queues_by_time, (queue.timestamp(), queue))
@@ -73,7 +73,7 @@ class PartitionGroup(object):
if record:
self._total_buffered -= 1
- return record
+ return timestamp, record
def add_raw_records(self, partition, raw_records):
"""Adds raw records to this partition group
@@ -124,7 +124,7 @@ class PartitionGroup(object):
def top_queue_size(self):
if not self._queues_by_time:
return 0
- return self._queues_by_time[0].size()
+ return self._queues_by_time[0][1].size() # XXX RecordQueue.__len__
def close(self):
self._queues_by_time = []
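
next_record() now returns a (timestamp, record) pair and keeps the partition queues in a min-heap keyed by timestamp, re-pushing a queue after polling it if it still holds buffered records. A minimal sketch of that heap discipline, assuming each queue exposes poll() and timestamp(); the queue class and the id() tie-breaker are illustrative, not the module's real code:

import collections
import heapq

class FakeQueue(object):
    """Stand-in for RecordQueue: FIFO of (timestamp, record) pairs."""
    def __init__(self, items):
        self.items = collections.deque(items)
    def poll(self):
        return self.items.popleft() if self.items else (None, None)
    def timestamp(self):
        return self.items[0][0] if self.items else float('inf')
    def __len__(self):
        return len(self.items)

queues_by_time = []
for q in (FakeQueue([(5, 'a'), (9, 'c')]), FakeQueue([(7, 'b')])):
    # id() breaks timestamp ties so the heap never compares queue objects
    heapq.heappush(queues_by_time, (q.timestamp(), id(q), q))

def next_record():
    if not queues_by_time:
        return None, None
    _, _, queue = heapq.heappop(queues_by_time)
    timestamp, record = queue.poll()
    if queue:  # still buffered: re-queue at its new head timestamp
        heapq.heappush(queues_by_time, (queue.timestamp(), id(queue), queue))
    return timestamp, record

assert [next_record()[1] for _ in range(3)] == ['a', 'b', 'c']
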
diff --git a/kafka/streams/processor/record_collector.py b/kafka/streams/processor/record_collector.py
index de59b10..332e923 100644
--- a/kafka/streams/processor/record_collector.py
+++ b/kafka/streams/processor/record_collector.py
@@ -42,11 +42,13 @@ class RecordCollector(object):
if key_serializer:
key_bytes = key_serializer(topic, key)
else:
+ assert key is None or isinstance(key, bytes), key
key_bytes = key
if value_serializer:
val_bytes = value_serializer(topic, value)
else:
+ assert value is None or isinstance(value, bytes), value
val_bytes = value
if partition is None and partitioner is not None:
diff --git a/kafka/streams/processor/record_queue.py b/kafka/streams/processor/record_queue.py
index ba22b76..debeaf5 100644
--- a/kafka/streams/processor/record_queue.py
+++ b/kafka/streams/processor/record_queue.py
@@ -84,7 +84,7 @@ class RecordQueue(object):
Arguments:
raw_records (list of ConsumerRecord): the raw records
- timestamp_extractor (TimestampExtractor)
+ timestamp_extractor (callable): given a record, return a timestamp
Returns: the size of this queue
"""
@@ -98,12 +98,12 @@ class RecordQueue(object):
raw_record.offset,
raw_record.timestamp,
0, # TimestampType.CREATE_TIME,
+ key, value,
raw_record.checksum,
raw_record.serialized_key_size,
- raw_record.serialized_value_size,
- key, value)
+ raw_record.serialized_value_size)
- timestamp = timestamp_extractor.extract(record)
+ timestamp = timestamp_extractor(record)
# validate that timestamp must be non-negative
if timestamp < 0:
@@ -125,12 +125,12 @@ class RecordQueue(object):
return self.size()
def poll(self):
- """Get the next StampedRecord from the queue
+ """Get the next Record from the queue
- Returns: StampedRecord
+ Returns: (timestamp, record)
"""
if not self.fifo_queue:
- return None
+ return None, None
elem = self.fifo_queue.popleft()
self.time_tracker.remove_element(elem)
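
The timestamp extractor changes from an object with an extract() method to a plain callable, and poll() now returns a (timestamp, record) pair, with (None, None) for an empty queue. A small sketch of the callable-extractor contract, using a hypothetical record namedtuple for illustration only:

import collections
import time

Record = collections.namedtuple('Record', 'topic partition offset timestamp key value')

# A timestamp extractor is now just a callable: record -> timestamp (ms).
default_extractor = lambda record: record.timestamp

def wallclock_extractor(record):
    # Alternative policy: ignore the embedded timestamp entirely.
    return int(time.time() * 1000)

rec = Record('payments', 0, 42, 1469999999000, b'k', b'v')
assert default_extractor(rec) == 1469999999000
assert wallclock_extractor(rec) > 0
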
diff --git a/kafka/streams/processor/stream_partition_assignor.py b/kafka/streams/processor/stream_partition_assignor.py
index dfe7e18..711a58d 100644
--- a/kafka/streams/processor/stream_partition_assignor.py
+++ b/kafka/streams/processor/stream_partition_assignor.py
@@ -36,12 +36,14 @@ log = logging.getLogger(__name__)
class AssignedPartition(object):
- def __init__(self, task_id, partition):
+ def __init__(self, task_id, tp):
self.task_id = task_id
- self.partition = partition
+ self.tp = tp
+ self.topic = tp.topic
+ self.partition = tp.partition
def __cmp__(self, that):
- return cmp(self.partition, that.partition)
+ return cmp(self.tp, that.tp)
class SubscriptionUpdates(object):
@@ -53,7 +55,7 @@ class SubscriptionUpdates(object):
self.updated_topic_subscriptions = set()
def update_topics(self, topic_names):
- self.updatedTopicSubscriptions.clear()
+ self.updated_topic_subscriptions.clear()
self.updated_topic_subscriptions.update(topic_names)
def get_updates(self):
@@ -192,7 +194,6 @@ class StreamPartitionAssignor(AbstractPartitionAssignor):
Returns:
{member_id: ConsumerProtocolMemberAssignment}
"""
- import pdb; pdb.set_trace()
consumers_by_client = {}
states = {}
subscription_updates = SubscriptionUpdates()
@@ -320,11 +321,12 @@ class StreamPartitionAssignor(AbstractPartitionAssignor):
task_ids.append(task_id)
num_consumers = len(consumers)
- standby = {}
i = 0
for consumer in consumers:
assigned_partitions = []
+ standby = {}
+ active = []
num_task_ids = len(task_ids)
j = i
@@ -342,7 +344,6 @@ class StreamPartitionAssignor(AbstractPartitionAssignor):
j += num_consumers
assigned_partitions.sort()
- active = []
active_partitions = collections.defaultdict(list)
for partition in assigned_partitions:
active.append(partition.task_id)
@@ -355,9 +356,6 @@ class StreamPartitionAssignor(AbstractPartitionAssignor):
data.encode())
i += 1
- active.clear()
- standby.clear()
-
# if ZK is specified, validate the internal topics again
self.prepare_topic(self.internal_source_topic_to_task_ids, False, True)
# change log topics should be compacted
@@ -367,7 +365,7 @@ class StreamPartitionAssignor(AbstractPartitionAssignor):
def on_assignment(self, assignment):
partitions = [TopicPartition(topic, partition)
- for topic, topic_partitions in assignment.partition_assignment
+ for topic, topic_partitions in assignment.assignment
for partition in topic_partitions]
partitions.sort()
@@ -375,13 +373,13 @@ class StreamPartitionAssignor(AbstractPartitionAssignor):
info = AssignmentInfo.decode(assignment.user_data)
self.standby_tasks = info.standby_tasks
- partition_to_task_ids = {}
+ new_partition_to_task_ids = {}
task_iter = iter(info.active_tasks)
for partition in partitions:
- task_ids = self.partition_to_task_ids.get(partition)
+ task_ids = new_partition_to_task_ids.get(partition)
if task_ids is None:
task_ids = set()
- self.partition_to_task_ids[partition] = task_ids
+ new_partition_to_task_ids[partition] = task_ids
try:
task_ids.add(next(task_iter))
@@ -390,7 +388,7 @@ class StreamPartitionAssignor(AbstractPartitionAssignor):
"failed to find a task id for the partition=%s"
", partitions=%d, assignment_info=%s"
% (partition, len(partitions), info))
- self.partition_to_task_ids = partition_to_task_ids
+ self.partition_to_task_ids = new_partition_to_task_ids
def ensure_copartitioning(self, copartition_groups, internal_topic_groups, metadata):
internal_topics = set()
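
on_assignment() now reads the member assignment via assignment.assignment and builds a fresh partition-to-task-id map instead of mutating the attribute it is still reading from. A stand-alone sketch of pairing sorted partitions with the active task ids carried in user_data; TopicPartition and TaskId here are stand-ins with the same fields, and setdefault collapses the get-or-create step used in the diff:

import collections

TopicPartition = collections.namedtuple('TopicPartition', 'topic partition')
TaskId = collections.namedtuple('TaskId', 'topic_group_id partition_id')

# Partitions as decoded from the assignment, and the active task ids
# carried in the user_data blob, in the same (sorted) order.
partitions = sorted([TopicPartition('src', 1), TopicPartition('src', 0)])
active_tasks = [TaskId(0, 0), TaskId(0, 1)]

new_partition_to_task_ids = {}
task_iter = iter(active_tasks)
for partition in partitions:
    task_ids = new_partition_to_task_ids.setdefault(partition, set())
    try:
        task_ids.add(next(task_iter))
    except StopIteration:
        raise RuntimeError('failed to find a task id for partition=%s' % (partition,))

assert new_partition_to_task_ids[TopicPartition('src', 0)] == {TaskId(0, 0)}
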
diff --git a/kafka/streams/processor/stream_task.py b/kafka/streams/processor/stream_task.py
index 5c073d6..7732e74 100644
--- a/kafka/streams/processor/stream_task.py
+++ b/kafka/streams/processor/stream_task.py
@@ -74,7 +74,7 @@ class StreamTask(AbstractTask):
queue = self.create_record_queue(partition, source)
partition_queues[partition] = queue
- self.partition_group = PartitionGroup(partition_queues, self.config['timestamp_extractor_class'])
+ self.partition_group = PartitionGroup(partition_queues, self.config['timestamp_extractor'])
# initialize the consumed offset cache
self.consumed_offsets = {}
@@ -119,7 +119,7 @@ class StreamTask(AbstractTask):
"""
with self._process_lock:
# get the next record to process
- record = self.partition_group.next_record(self._record_info)
+ timestamp, record = self.partition_group.next_record(self._record_info)
# if there is no record to process, return immediately
if record is None:
@@ -206,7 +206,7 @@ class StreamTask(AbstractTask):
for partition, offset in self.consumed_offsets.items():
consumed_offsets_and_metadata[partition] = OffsetAndMetadata(offset + 1)
self.state_mgr.put_offset_limit(partition, offset + 1)
- self.consumer.commit_sync(consumed_offsets_and_metadata)
+ self.consumer.commit(consumed_offsets_and_metadata)
self._commit_offset_needed = False
self._commit_requested = False
diff --git a/kafka/streams/processor/stream_thread.py b/kafka/streams/processor/stream_thread.py
index 021fb0a..dd4f2ae 100644
--- a/kafka/streams/processor/stream_thread.py
+++ b/kafka/streams/processor/stream_thread.py
@@ -39,7 +39,7 @@ log = logging.getLogger(__name__)
STREAM_THREAD_ID_SEQUENCE = AtomicInteger(0)
-class StreamThread(object):#Process):
+class StreamThread(Process):
DEFAULT_CONFIG = {
'application_id': None, # required
'bootstrap_servers': None, # required
@@ -60,12 +60,24 @@ class StreamThread(object):#Process):
'auto_offset_reset': 'earliest',
'enable_auto_commit': False,
'max_poll_records': 1000,
+ 'replication_factor': 1,
+ 'num_standby_replicas': 0,
+ 'buffered_records_per_partition': 1000,
+ 'zookeeper_connect': None,
+ 'timestamp_extractor': lambda x: x.timestamp,
+ }
+ PRODUCER_OVERRIDES = {
+ 'linger_ms': 100
+ }
+ CONSUMER_OVERRIDES = {
+ 'max_poll_records': 1000,
+ 'auto_offset_reset': 'earliest',
+ 'enable_auto_commit': False,
}
def __init__(self, builder, **configs):
stream_id = STREAM_THREAD_ID_SEQUENCE.increment()
- #super(StreamThread, self).__init__(name='StreamThread-' + str(stream_id))
- self.name = 'StreamThread-' + str(stream_id)
+ super(StreamThread, self).__init__(name='StreamThread-' + str(stream_id))
self.config = copy.copy(self.DEFAULT_CONFIG)
for key in self.config:
@@ -79,7 +91,6 @@ class StreamThread(object):#Process):
log.warning('Unrecognized configs: %s', configs.keys())
self.builder = builder
- self._partition_grouper = self.config.get('partition_grouper', partition_grouper)
self.source_topics = builder.source_topics()
self.topic_pattern = builder.source_topic_pattern()
self.partition_assignor = None
@@ -102,21 +113,67 @@ class StreamThread(object):#Process):
def client_id(self):
return self.config['client_id']
+ @property
+ def partition_grouper(self):
+ return self.config['partition_grouper']
+
+ def consumer_configs(self, restore=False):
+ consumer = {}
+ for key in KafkaConsumer.DEFAULT_CONFIG:
+ if key in self.config:
+ consumer[key] = self.config[key]
+ if key in self.CONSUMER_OVERRIDES:
+ if (key in consumer
+ and consumer[key] != self.CONSUMER_OVERRIDES[key]):
+ log.warning('Overriding KafkaConsumer configs: %s=%s',
+ key, self.CONSUMER_OVERRIDES[key])
+ consumer[key] = self.CONSUMER_OVERRIDES[key]
+
+ assert not consumer['enable_auto_commit'], (
+ 'Unexpected user-specified consumer config enable_auto_commit,'
+ ' as the streams client will always turn off auto committing.')
+
+ if restore:
+ if 'group_id' in consumer:
+ consumer.pop('group_id')
+ restore_id = self.config['thread_client_id'] + '-restore-consumer'
+ consumer['client_id'] = restore_id
+ return consumer
+
+ consumer['group_id'] = self.config['application_id']
+ consumer['client_id'] = self.config['thread_client_id'] + '-consumer'
+ consumer['stream_thread_instance'] = self
+ consumer['replication_factor'] = self.config['replication_factor']
+ consumer['num_standby_replicas'] = self.config['num_standby_replicas']
+ consumer['zookeeper_connect'] = self.config['zookeeper_connect']
+ consumer['partition_assignment_strategy'] = [
+ StreamPartitionAssignor(**consumer)
+ ]
+ return consumer
+
+ def producer_configs(self):
+ producer = {}
+ for key in KafkaProducer.DEFAULT_CONFIG:
+ if key in self.config:
+ producer[key] = self.config[key]
+ if key in self.PRODUCER_OVERRIDES:
+ if (key in producer
+ and producer[key] != self.PRODUCER_OVERRIDES[key]):
+ log.warning('Overriding KafkaProducer configs: %s=%s',
+ key, self.PRODUCER_OVERRIDES[key])
+ producer[key] = self.PRODUCER_OVERRIDES[key]
+ producer['client_id'] = self.config['thread_client_id'] + '-producer'
+ return producer
+
def initialize(self):
assert not self._running
log.info('Creating producer client for stream thread [%s]', self.name)
#client_supplier = self.config['client_supplier']
- self.producer = KafkaProducer(**self.config)
+ self.producer = KafkaProducer(**self.producer_configs())
log.info('Creating consumer client for stream thread [%s]', self.name)
- assignor = StreamPartitionAssignor(
- stream_thread_instance=self, **self.config)
- self.consumer = KafkaConsumer(
- partition_assignment_strategy=[assignor], **self.config)
+ self.consumer = KafkaConsumer(**self.consumer_configs())
log.info('Creating restore consumer client for stream thread [%s]', self.name)
- restore_assignor = StreamPartitionAssignor(
- stream_thread_instance=self, **self.config)
- self.restore_consumer = KafkaConsumer(
- partition_assignment_strategy=[restore_assignor], **self.config)
+ self.restore_consumer = KafkaConsumer(**self.consumer_configs(restore=True))
# initialize the task list
self._active_tasks = {}
@@ -255,7 +312,7 @@ class StreamThread(object):#Process):
start_process = time.time()
total_num_buffered += task.process()
- requires_poll = requires_poll or task.requires_poll()
+ requires_poll = requires_poll or task.requires_poll
latency_ms = (time.time() - start_process) * 1000
#self._sensors.process_time_sensor.record(latency_ms)
@@ -457,8 +514,7 @@ class StreamThread(object):#Process):
topology = self.builder.build(self.config['application_id'],
task_id.topic_group_id)
- return StreamTask(task_id, self.config['application_id'],
- partitions, topology,
+ return StreamTask(task_id, partitions, topology,
self.consumer, self.producer, self.restore_consumer,
**self.config) # self._sensors
@@ -535,7 +591,7 @@ class StreamThread(object):#Process):
checkpointed_offsets = {}
# create the standby tasks
- for task_id, partitions in self.partition_assignor.standby_tasks().items():
+ for task_id, partitions in self.partition_assignor.standby_tasks.items():
task = self._create_standby_task(task_id, partitions)
if task:
self._standby_tasks[task_id] = task
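
consumer_configs() and producer_configs() filter the stream config down to the keys the underlying client accepts, then apply the per-client overrides, warning when they clobber a user-supplied value. A minimal sketch of that filter-then-override merge; the accepted-key set and override values below are placeholders, not kafka-python's real defaults:

import logging

log = logging.getLogger(__name__)

# Placeholder for KafkaConsumer.DEFAULT_CONFIG: keys the client accepts.
CLIENT_ACCEPTED_KEYS = {'bootstrap_servers': None,
                        'enable_auto_commit': True,
                        'max_poll_records': 500}

OVERRIDES = {'enable_auto_commit': False, 'max_poll_records': 1000}

def client_configs(stream_config):
    client = {}
    for key in CLIENT_ACCEPTED_KEYS:
        if key in stream_config:
            client[key] = stream_config[key]       # pass through user value
        if key in OVERRIDES:
            if key in client and client[key] != OVERRIDES[key]:
                log.warning('Overriding client config: %s=%s',
                            key, OVERRIDES[key])
            client[key] = OVERRIDES[key]           # streams config always wins
    return client

cfg = client_configs({'bootstrap_servers': 'localhost:9092',
                      'enable_auto_commit': True})
assert cfg == {'bootstrap_servers': 'localhost:9092',
               'enable_auto_commit': False,
               'max_poll_records': 1000}
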
diff --git a/kafka/streams/processor/task.py b/kafka/streams/processor/task.py
index 67a276b..4ed1c30 100644
--- a/kafka/streams/processor/task.py
+++ b/kafka/streams/processor/task.py
@@ -43,11 +43,12 @@ class AbstractTask(object):
consumer, restore_consumer, is_standby, **config):
"""Raises ProcessorStateError if the state manager cannot be created"""
self.id = task_id
- self.application_id = self.config['application_id']
+ self.application_id = config['application_id']
self.partitions = set(partitions)
self.topology = topology
self.consumer = consumer
self.processor_context = None
+ self.state_mgr = None
# create the processor state manager
"""
@@ -78,7 +79,8 @@ class AbstractTask(object):
Raises ProcessorStateError if there is an error while closing the state manager
"""
try:
- self.state_mgr.close(self.record_collector_offsets())
+ if self.state_mgr is not None:
+ self.state_mgr.close(self.record_collector_offsets())
except Exception as e:
raise ProcessorStateError('Error while closing the state manager', e)
@@ -126,17 +128,17 @@ class StreamTask(AbstractTask):
partition_queues = {}
for partition in partitions:
- source = self.topology.source(partition.topic())
+ source = self.topology.source(partition.topic)
queue = self._create_record_queue(partition, source)
partition_queues[partition] = queue
- self.partition_group = PartitionGroup(partition_queues, self.config['timestamp_extractor_class'])
+ self.partition_group = PartitionGroup(partition_queues, config['timestamp_extractor'])
# initialize the consumed offset cache
self.consumed_offsets = {}
# create the RecordCollector that maintains the produced offsets
- self.record_collector = RecordCollector(self.producer)
+ self.record_collector = RecordCollector(producer)
log.info('Creating restoration consumer client for stream task #%s', self.id)
@@ -144,7 +146,7 @@ class StreamTask(AbstractTask):
self.processor_context = ProcessorContext(self.id, self, self.record_collector, self.state_mgr, **config)
# initialize the state stores
- self.initialize_state_stores()
+ #self.initialize_state_stores()
# initialize the task by initializing all its processor nodes in the topology
for node in self.topology.processors():
@@ -175,7 +177,7 @@ class StreamTask(AbstractTask):
"""
with self._process_lock:
# get the next record to process
- record = self.partition_group.next_record(self._record_info)
+ timestamp, record = self.partition_group.next_record(self._record_info)
# if there is no record to process, return immediately
if record is None:
@@ -203,7 +205,7 @@ class StreamTask(AbstractTask):
# after processing this record, if its partition queue's
# buffered size has been decreased to the threshold, we can then
# resume the consumption on this partition
- if self._record_info.queue().size() == self.max_buffered_size:
+ if self._record_info.queue.size() == self.max_buffered_size:
self.consumer.resume(partition)
self.requires_poll = True
@@ -251,7 +253,8 @@ class StreamTask(AbstractTask):
def commit(self):
"""Commit the current task state"""
# 1) flush local state
- self.state_mgr.flush()
+ if self.state_mgr is not None:
+ self.state_mgr.flush()
# 2) flush produced records in the downstream and change logs of local states
self.record_collector.flush()
@@ -260,9 +263,10 @@ class StreamTask(AbstractTask):
if self._commit_offset_needed:
consumed_offsets_and_metadata = {}
for partition, offset in self.consumed_offsets.items():
- consumed_offsets_and_metadata[partition] = OffsetAndMetadata(offset + 1)
- self.state_mgr.put_offset_limit(partition, offset + 1)
- self.consumer.commit_sync(consumed_offsets_and_metadata)
+ consumed_offsets_and_metadata[partition] = OffsetAndMetadata(offset + 1, b'')
+ if self.state_mgr is not None:
+ self.state_mgr.put_offset_limit(partition, offset + 1)
+ self.consumer.commit(consumed_offsets_and_metadata)
self._commit_offset_needed = False
self._commit_requested = False
@@ -319,7 +323,7 @@ class StreamTask(AbstractTask):
def forward(self, key, value, child_index=None, child_name=None):
this_node = self._curr_node
try:
- children = this_node.children()
+ children = this_node.children
if child_index is not None:
children = [children[child_index]]
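
commit() now builds an OffsetAndMetadata with an explicit metadata field and calls KafkaConsumer.commit() (kafka-python's synchronous commit) instead of the Java-style commit_sync. A small sketch of building that offsets map, with local namedtuples standing in for kafka-python's structs:

import collections

# Local stand-ins mirroring kafka.structs.TopicPartition / OffsetAndMetadata.
TopicPartition = collections.namedtuple('TopicPartition', 'topic partition')
OffsetAndMetadata = collections.namedtuple('OffsetAndMetadata', 'offset metadata')

consumed_offsets = {TopicPartition('src', 0): 41,
                    TopicPartition('src', 1): 7}

# Commit the *next* offset to consume (last consumed + 1), with empty metadata.
offsets_to_commit = {
    tp: OffsetAndMetadata(offset + 1, b'')
    for tp, offset in consumed_offsets.items()
}

assert offsets_to_commit[TopicPartition('src', 0)].offset == 42
# With a live consumer, this map would be passed to consumer.commit(offsets_to_commit).
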
diff --git a/kafka/streams/processor/topology_builder.py b/kafka/streams/processor/topology_builder.py
index 223cda9..3a2679a 100644
--- a/kafka/streams/processor/topology_builder.py
+++ b/kafka/streams/processor/topology_builder.py
@@ -108,7 +108,7 @@ class SinkNodeFactory(NodeFactory):
self.partitioner = partitioner
def build(self, application_id):
- if self.topic in self.builder.internal_topics:
+ if self.topic in self.builder.internal_topic_names:
sink_name = application_id + '-' + self.topic
else:
sink_name = self.topic