| author | Dana Powers <dana.powers@gmail.com> | 2016-07-31 15:22:42 -0700 |
|---|---|---|
| committer | Dana Powers <dana.powers@gmail.com> | 2016-09-24 14:01:05 -0700 |
| commit | c72b69ba58eb88ce90ddb3db6e3ff2ff7cb2ab64 (patch) | |
| tree | 6958da7aa675201beb1be6c878fc8ff7f154c02d | |
| parent | 26fe1f2e296aa78e0fe79c01f0b974dfc8741246 (diff) | |
| download | kafka-python-c72b69ba58eb88ce90ddb3db6e3ff2ff7cb2ab64.tar.gz | |
Fixes to kafka.streams -- simple source -> sink topology works
| -rw-r--r-- | kafka/streams/kafka.py | 18 |
|---|---|---|
| -rw-r--r-- | kafka/streams/processor/assignment/subscription_info.py | 7 |
| -rw-r--r-- | kafka/streams/processor/assignment/task_assignor.py | 2 |
| -rw-r--r-- | kafka/streams/processor/node.py | 23 |
| -rw-r--r-- | kafka/streams/processor/partition_group.py | 14 |
| -rw-r--r-- | kafka/streams/processor/record_collector.py | 2 |
| -rw-r--r-- | kafka/streams/processor/record_queue.py | 14 |
| -rw-r--r-- | kafka/streams/processor/stream_partition_assignor.py | 28 |
| -rw-r--r-- | kafka/streams/processor/stream_task.py | 6 |
| -rw-r--r-- | kafka/streams/processor/stream_thread.py | 90 |
| -rw-r--r-- | kafka/streams/processor/task.py | 30 |
| -rw-r--r-- | kafka/streams/processor/topology_builder.py | 2 |
12 files changed, 157 insertions, 79 deletions
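
The commit message describes a working "simple source -> sink topology". As a rough illustration of what that means for users of this module, the sketch below wires one source topic directly to one sink topic and runs it through `KafkaStreams` (from kafka/streams/kafka.py in the diff below). The builder class name, the `add_source`/`add_sink` methods, the parent wiring argument, and the `start()`/`close()` lifecycle are assumptions modeled on the Java Kafka Streams client; only `KafkaStreams` and its config keys appear in this patch.

```python
# Minimal sketch, not taken from this patch: TopologyBuilder, add_source,
# add_sink, and start() are assumed/hypothetical names modeled on the Java
# Kafka Streams API; only KafkaStreams and its config keys are shown in the diff.
from kafka.streams.kafka import KafkaStreams
from kafka.streams.processor.topology_builder import TopologyBuilder  # class name assumed

builder = TopologyBuilder()
# Consume raw bytes from 'input-topic' and forward each record, unmodified,
# to 'output-topic' -- the "simple source -> sink topology" from the commit message.
builder.add_source('source', 'input-topic')
builder.add_sink('sink', 'output-topic', 'source')

streams = KafkaStreams(
    builder,
    application_id='simple-pipe',        # required; DEFAULT_CONFIG leaves it as None
    bootstrap_servers='localhost:9092',  # required
    num_stream_threads=1,
)
streams.start()   # lifecycle methods assumed; only set_uncaught_exception_handler() is visible here
# ... let it run, then shut down cleanly ...
streams.close()
```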
diff --git a/kafka/streams/kafka.py b/kafka/streams/kafka.py index bc7dcec..9eb71e3 100644 --- a/kafka/streams/kafka.py +++ b/kafka/streams/kafka.py @@ -23,6 +23,7 @@ import uuid import kafka.streams.errors as Errors from .processor.stream_thread import StreamThread +from .processor.partition_group import partition_grouper from .utils import AtomicInteger log = logging.getLogger(__name__) @@ -82,7 +83,22 @@ class KafkaStreams(object): DEFAULT_CONFIG = { 'application_id': None, 'bootstrap_servers': None, + 'client_id': 'kafka-python-streams', + 'zookeeper_connect': '', + 'state_dir': '/tmp/kafka-streams', + 'replication_factor': 1, + 'timestamp_extractor': lambda x: x.timestamp, + 'partition_grouper': partition_grouper, + 'key_serializer': None, + 'key_deserializer': None, + 'value_serializer': None, + 'value_deserializer': None, + 'commit_interval_ms': 30000, + 'poll_ms': 100, 'num_stream_threads': 1, + 'num_standby_replicas': 0, + 'buffered_records_per_partition': 1000, + 'state_cleanup_delay_ms': 60000, } def __init__(self, builder, **configs): @@ -173,7 +189,7 @@ class KafkaStreams(object): self._state = STOPPED log.info('Stopped Kafka Stream process') - def setUncaughtExceptionHandler(self, handler): + def set_uncaught_exception_handler(self, handler): """Sets the handler invoked when a stream thread abruptly terminates due to an uncaught exception. diff --git a/kafka/streams/processor/assignment/subscription_info.py b/kafka/streams/processor/assignment/subscription_info.py index c7fce14..3452f43 100644 --- a/kafka/streams/processor/assignment/subscription_info.py +++ b/kafka/streams/processor/assignment/subscription_info.py @@ -20,6 +20,7 @@ import json import logging from kafka.streams.errors import TaskAssignmentError +from kafka.streams.processor.partition_group import TaskId log = logging.getLogger(__name__) @@ -53,9 +54,9 @@ class SubscriptionInfo(object): if decoded['version'] != cls.CURRENT_VERSION: raise TaskAssignmentError('unable to decode subscription data: version=' + str(cls.version)) - decoded['prev_tasks'] = set(decoded['prev_tasks']) - decoded['standby_tasks'] = set(decoded['standby_tasks']) + decoded['prev_tasks'] = set([TaskId(*item) for item in decoded['prev_tasks']]) + decoded['standby_tasks'] = set([TaskId(*item) for item in decoded['standby_tasks']]) return cls(decoded['process_id'], decoded['prev_tasks'], decoded['standby_tasks'], decoded['version']) except Exception as e: - raise TaskAssignmentError('unable to decode subscription data', e) + raise TaskAssignmentError('unable to decode subscription data', data, e) diff --git a/kafka/streams/processor/assignment/task_assignor.py b/kafka/streams/processor/assignment/task_assignor.py index 8cb6c4e..a295faf 100644 --- a/kafka/streams/processor/assignment/task_assignor.py +++ b/kafka/streams/processor/assignment/task_assignor.py @@ -135,7 +135,7 @@ class TaskAssignor(object): return False def compute_addition_cost(self, task, state): - cost = len(state.assignedTasks) // state.capacity + cost = len(state.assigned_tasks) // state.capacity if task in state.prev_assigned_tasks: if task in state.prev_active_tasks: diff --git a/kafka/streams/processor/node.py b/kafka/streams/processor/node.py index 90b0a6a..c672066 100644 --- a/kafka/streams/processor/node.py +++ b/kafka/streams/processor/node.py @@ -26,7 +26,8 @@ class ProcessorNode(object): self.name = name # Could we construct a Processor here if the processor is just a function? 
- assert isinstance(processor, Processor), 'processor must subclass Processor' + if processor is not None: + assert isinstance(processor, Processor), 'processor must subclass Processor' self.processor = processor self.children = [] @@ -47,11 +48,11 @@ class ProcessorNode(object): class SourceNode(ProcessorNode): - def __init__(self, name, key_deserializer, val_deserializer): + def __init__(self, name, key_deserializer, value_deserializer): super(SourceNode, self).__init__(name) self.key_deserializer = key_deserializer - self.val_deserializer = val_deserializer + self.value_deserializer = value_deserializer self.context = None def deserialize_key(self, topic, data): @@ -62,7 +63,7 @@ class SourceNode(ProcessorNode): def deserialize_value(self, topic, data): if self.value_deserializer is None: return data - return self.val_deserializer.deserialize(topic, data) + return self.value_deserializer.deserialize(topic, data) def init(self, context): self.context = context @@ -70,8 +71,8 @@ class SourceNode(ProcessorNode): # if deserializers are null, get the default ones from the context if self.key_deserializer is None: self.key_deserializer = self.context.key_deserializer - if self.val_deserializer is None: - self.val_deserializer = self.context.value_deserializer + if self.value_deserializer is None: + self.value_deserializer = self.context.value_deserializer """ // if value deserializers are for {@code Change} values, set the inner deserializer when necessary @@ -90,12 +91,12 @@ class SourceNode(ProcessorNode): class SinkNode(ProcessorNode): - def __init__(self, name, topic, key_serializer, val_serializer, partitioner): + def __init__(self, name, topic, key_serializer, value_serializer, partitioner): super(SinkNode, self).__init__(name) self.topic = topic self.key_serializer = key_serializer - self.val_serializer = val_serializer + self.value_serializer = value_serializer self.partitioner = partitioner self.context = None @@ -108,8 +109,8 @@ class SinkNode(ProcessorNode): # if serializers are null, get the default ones from the context if self.key_serializer is None: self.key_serializer = self.context.key_serializer - if self.val_serializer is None: - self.val_serializer = self.context.value_serializer + if self.value_serializer is None: + self.value_serializer = self.context.value_serializer """ // if value serializers are for {@code Change} values, set the inner serializer when necessary @@ -124,7 +125,7 @@ class SinkNode(ProcessorNode): collector.send(self.topic, key=key, value=value, timestamp_ms=self.context.timestamp(), key_serializer=self.key_serializer, - val_serializer=self.val_serializer, + value_serializer=self.value_serializer, partitioner=self.partitioner) def close(self): diff --git a/kafka/streams/processor/partition_group.py b/kafka/streams/processor/partition_group.py index 6d5ea3f..4e726a9 100644 --- a/kafka/streams/processor/partition_group.py +++ b/kafka/streams/processor/partition_group.py @@ -29,13 +29,13 @@ TaskId = collections.namedtuple('TaskId', 'topic_group_id partition_id') class RecordInfo(object): def __init__(self): - self.queue = RecordQueue() + self.queue = None def node(self): - return self.queue.source() + return self.queue.source def partition(self): - return self.queue.partition() + return self.queue.partition def queue(self): return self.queue @@ -57,13 +57,13 @@ class PartitionGroup(object): Returns: (timestamp, ConsumerRecord) """ - record = None + timestamp = record = queue = None if self._queues_by_time: _, queue = heapq.heappop(self._queues_by_time) # 
get the first record from this queue. - record = queue.poll() + timestamp, record = queue.poll() if queue: heapq.heappush(self._queues_by_time, (queue.timestamp(), queue)) @@ -73,7 +73,7 @@ class PartitionGroup(object): if record: self._total_buffered -= 1 - return record + return timestamp, record def add_raw_records(self, partition, raw_records): """Adds raw records to this partition group @@ -124,7 +124,7 @@ class PartitionGroup(object): def top_queue_size(self): if not self._queues_by_time: return 0 - return self._queues_by_time[0].size() + return self._queues_by_time[0][1].size() # XXX RecordQueue.__len__ def close(self): self._queues_by_time = [] diff --git a/kafka/streams/processor/record_collector.py b/kafka/streams/processor/record_collector.py index de59b10..332e923 100644 --- a/kafka/streams/processor/record_collector.py +++ b/kafka/streams/processor/record_collector.py @@ -42,11 +42,13 @@ class RecordCollector(object): if key_serializer: key_bytes = key_serializer(topic, key) else: + assert key is None or isinstance(key, bytes), key key_bytes = key if value_serializer: val_bytes = value_serializer(topic, value) else: + assert value is None or isinstance(value, bytes), value val_bytes = value if partition is None and partitioner is not None: diff --git a/kafka/streams/processor/record_queue.py b/kafka/streams/processor/record_queue.py index ba22b76..debeaf5 100644 --- a/kafka/streams/processor/record_queue.py +++ b/kafka/streams/processor/record_queue.py @@ -84,7 +84,7 @@ class RecordQueue(object): Arguments: raw_records (list of ConsumerRecord): the raw records - timestamp_extractor (TimestampExtractor) + timestamp_extractor (callable): given a record, return a timestamp Returns: the size of this queue """ @@ -98,12 +98,12 @@ class RecordQueue(object): raw_record.offset, raw_record.timestamp, 0, # TimestampType.CREATE_TIME, + key, value, raw_record.checksum, raw_record.serialized_key_size, - raw_record.serialized_value_size, - key, value) + raw_record.serialized_value_size) - timestamp = timestamp_extractor.extract(record) + timestamp = timestamp_extractor(record) # validate that timestamp must be non-negative if timestamp < 0: @@ -125,12 +125,12 @@ class RecordQueue(object): return self.size() def poll(self): - """Get the next StampedRecord from the queue + """Get the next Record from the queue - Returns: StampedRecord + Returns: (timestamp, record) """ if not self.fifo_queue: - return None + return None, None elem = self.fifo_queue.popleft() self.time_tracker.remove_element(elem) diff --git a/kafka/streams/processor/stream_partition_assignor.py b/kafka/streams/processor/stream_partition_assignor.py index dfe7e18..711a58d 100644 --- a/kafka/streams/processor/stream_partition_assignor.py +++ b/kafka/streams/processor/stream_partition_assignor.py @@ -36,12 +36,14 @@ log = logging.getLogger(__name__) class AssignedPartition(object): - def __init__(self, task_id, partition): + def __init__(self, task_id, tp): self.task_id = task_id - self.partition = partition + self.tp = tp + self.topic = tp.topic + self.partition = tp.partition def __cmp__(self, that): - return cmp(self.partition, that.partition) + return cmp(self.tp, that.tp) class SubscriptionUpdates(object): @@ -53,7 +55,7 @@ class SubscriptionUpdates(object): self.updated_topic_subscriptions = set() def update_topics(self, topic_names): - self.updatedTopicSubscriptions.clear() + self.updated_topic_subscriptions.clear() self.updated_topic_subscriptions.update(topic_names) def get_updates(self): @@ -192,7 +194,6 @@ class 
StreamPartitionAssignor(AbstractPartitionAssignor): Returns: {member_id: ConsumerProtocolMemberAssignment} """ - import pdb; pdb.set_trace() consumers_by_client = {} states = {} subscription_updates = SubscriptionUpdates() @@ -320,11 +321,12 @@ class StreamPartitionAssignor(AbstractPartitionAssignor): task_ids.append(task_id) num_consumers = len(consumers) - standby = {} i = 0 for consumer in consumers: assigned_partitions = [] + standby = {} + active = [] num_task_ids = len(task_ids) j = i @@ -342,7 +344,6 @@ class StreamPartitionAssignor(AbstractPartitionAssignor): j += num_consumers assigned_partitions.sort() - active = [] active_partitions = collections.defaultdict(list) for partition in assigned_partitions: active.append(partition.task_id) @@ -355,9 +356,6 @@ class StreamPartitionAssignor(AbstractPartitionAssignor): data.encode()) i += 1 - active.clear() - standby.clear() - # if ZK is specified, validate the internal topics again self.prepare_topic(self.internal_source_topic_to_task_ids, False, True) # change log topics should be compacted @@ -367,7 +365,7 @@ class StreamPartitionAssignor(AbstractPartitionAssignor): def on_assignment(self, assignment): partitions = [TopicPartition(topic, partition) - for topic, topic_partitions in assignment.partition_assignment + for topic, topic_partitions in assignment.assignment for partition in topic_partitions] partitions.sort() @@ -375,13 +373,13 @@ class StreamPartitionAssignor(AbstractPartitionAssignor): info = AssignmentInfo.decode(assignment.user_data) self.standby_tasks = info.standby_tasks - partition_to_task_ids = {} + new_partition_to_task_ids = {} task_iter = iter(info.active_tasks) for partition in partitions: - task_ids = self.partition_to_task_ids.get(partition) + task_ids = new_partition_to_task_ids.get(partition) if task_ids is None: task_ids = set() - self.partition_to_task_ids[partition] = task_ids + new_partition_to_task_ids[partition] = task_ids try: task_ids.add(next(task_iter)) @@ -390,7 +388,7 @@ class StreamPartitionAssignor(AbstractPartitionAssignor): "failed to find a task id for the partition=%s" ", partitions=%d, assignment_info=%s" % (partition, len(partitions), info)) - self.partition_to_task_ids = partition_to_task_ids + self.partition_to_task_ids = new_partition_to_task_ids def ensure_copartitioning(self, copartition_groups, internal_topic_groups, metadata): internal_topics = set() diff --git a/kafka/streams/processor/stream_task.py b/kafka/streams/processor/stream_task.py index 5c073d6..7732e74 100644 --- a/kafka/streams/processor/stream_task.py +++ b/kafka/streams/processor/stream_task.py @@ -74,7 +74,7 @@ class StreamTask(AbstractTask): queue = self.create_record_queue(partition, source) partition_queues[partition] = queue - self.partition_group = PartitionGroup(partition_queues, self.config['timestamp_extractor_class']) + self.partition_group = PartitionGroup(partition_queues, self.config['timestamp_extractor']) # initialize the consumed offset cache self.consumed_offsets = {} @@ -119,7 +119,7 @@ class StreamTask(AbstractTask): """ with self._process_lock: # get the next record to process - record = self.partition_group.next_record(self._record_info) + timestamp, record = self.partition_group.next_record(self._record_info) # if there is no record to process, return immediately if record is None: @@ -206,7 +206,7 @@ class StreamTask(AbstractTask): for partition, offset in self.consumed_offsets.items(): consumed_offsets_and_metadata[partition] = OffsetAndMetadata(offset + 1) 
self.state_mgr.put_offset_limit(partition, offset + 1) - self.consumer.commit_sync(consumed_offsets_and_metadata) + self.consumer.commit(consumed_offsets_and_metadata) self._commit_offset_needed = False self._commit_requested = False diff --git a/kafka/streams/processor/stream_thread.py b/kafka/streams/processor/stream_thread.py index 021fb0a..dd4f2ae 100644 --- a/kafka/streams/processor/stream_thread.py +++ b/kafka/streams/processor/stream_thread.py @@ -39,7 +39,7 @@ log = logging.getLogger(__name__) STREAM_THREAD_ID_SEQUENCE = AtomicInteger(0) -class StreamThread(object):#Process): +class StreamThread(Process): DEFAULT_CONFIG = { 'application_id': None, # required 'bootstrap_servers': None, # required @@ -60,12 +60,24 @@ class StreamThread(object):#Process): 'auto_offset_reset': 'earliest', 'enable_auto_commit': False, 'max_poll_records': 1000, + 'replication_factor': 1, + 'num_standby_replicas': 0, + 'buffered_records_per_partition': 1000, + 'zookeeper_connect': None, + 'timestamp_extractor': lambda x: x.timestamp, + } + PRODUCER_OVERRIDES = { + 'linger_ms': 100 + } + CONSUMER_OVERRIDES = { + 'max_poll_records': 1000, + 'auto_offset_reset': 'earliest', + 'enable_auto_commit': False, } def __init__(self, builder, **configs): stream_id = STREAM_THREAD_ID_SEQUENCE.increment() - #super(StreamThread, self).__init__(name='StreamThread-' + str(stream_id)) - self.name = 'StreamThread-' + str(stream_id) + super(StreamThread, self).__init__(name='StreamThread-' + str(stream_id)) self.config = copy.copy(self.DEFAULT_CONFIG) for key in self.config: @@ -79,7 +91,6 @@ class StreamThread(object):#Process): log.warning('Unrecognized configs: %s', configs.keys()) self.builder = builder - self._partition_grouper = self.config.get('partition_grouper', partition_grouper) self.source_topics = builder.source_topics() self.topic_pattern = builder.source_topic_pattern() self.partition_assignor = None @@ -102,21 +113,67 @@ class StreamThread(object):#Process): def client_id(self): return self.config['client_id'] + @property + def partition_grouper(self): + return self.config['partition_grouper'] + + def consumer_configs(self, restore=False): + consumer = {} + for key in KafkaConsumer.DEFAULT_CONFIG: + if key in self.config: + consumer[key] = self.config[key] + if key in self.CONSUMER_OVERRIDES: + if (key in consumer + and consumer[key] != self.CONSUMER_OVERRIDES[key]): + log.warning('Overriding KafkaConsumer configs: %s=%s', + key, self.CONSUMER_OVERRIDES[key]) + consumer[key] = self.CONSUMER_OVERRIDES[key] + + assert not consumer['enable_auto_commit'], ( + 'Unexpected user-specified consumer config enable_auto_commit,' + ' as the streams client will always turn off auto committing.') + + if restore: + if 'group_id' in consumer: + consumer.pop('group_id') + restore_id = self.config['thread_client_id'] + '-restore-consumer' + consumer['client_id'] = restore_id + return consumer + + consumer['group_id'] = self.config['application_id'] + consumer['client_id'] = self.config['thread_client_id'] + '-consumer' + consumer['stream_thread_instance'] = self + consumer['replication_factor'] = self.config['replication_factor'] + consumer['num_standby_replicas'] = self.config['num_standby_replicas'] + consumer['zookeeper_connect'] = self.config['zookeeper_connect'] + consumer['partition_assignment_strategy'] = [ + StreamPartitionAssignor(**consumer) + ] + return consumer + + def producer_configs(self): + producer = {} + for key in KafkaProducer.DEFAULT_CONFIG: + if key in self.config: + producer[key] = self.config[key] + if 
key in self.PRODUCER_OVERRIDES: + if (key in producer + and producer[key] != self.PRODUCER_OVERRIDES[key]): + log.warning('Overriding KafkaProducer configs: %s=%s', + key, self.PRODUCER_OVERRIDES[key]) + producer[key] = self.PRODUCER_OVERRIDES[key] + producer['client_id'] = self.config['thread_client_id'] + '-producer' + return producer + def initialize(self): assert not self._running log.info('Creating producer client for stream thread [%s]', self.name) #client_supplier = self.config['client_supplier'] - self.producer = KafkaProducer(**self.config) + self.producer = KafkaProducer(**self.producer_configs()) log.info('Creating consumer client for stream thread [%s]', self.name) - assignor = StreamPartitionAssignor( - stream_thread_instance=self, **self.config) - self.consumer = KafkaConsumer( - partition_assignment_strategy=[assignor], **self.config) + self.consumer = KafkaConsumer(**self.consumer_configs()) log.info('Creating restore consumer client for stream thread [%s]', self.name) - restore_assignor = StreamPartitionAssignor( - stream_thread_instance=self, **self.config) - self.restore_consumer = KafkaConsumer( - partition_assignment_strategy=[restore_assignor], **self.config) + self.restore_consumer = KafkaConsumer(**self.consumer_configs(restore=True)) # initialize the task list self._active_tasks = {} @@ -255,7 +312,7 @@ class StreamThread(object):#Process): start_process = time.time() total_num_buffered += task.process() - requires_poll = requires_poll or task.requires_poll() + requires_poll = requires_poll or task.requires_poll latency_ms = (time.time() - start_process) * 1000 #self._sensors.process_time_sensor.record(latency_ms) @@ -457,8 +514,7 @@ class StreamThread(object):#Process): topology = self.builder.build(self.config['application_id'], task_id.topic_group_id) - return StreamTask(task_id, self.config['application_id'], - partitions, topology, + return StreamTask(task_id, partitions, topology, self.consumer, self.producer, self.restore_consumer, **self.config) # self._sensors @@ -535,7 +591,7 @@ class StreamThread(object):#Process): checkpointed_offsets = {} # create the standby tasks - for task_id, partitions in self.partition_assignor.standby_tasks().items(): + for task_id, partitions in self.partition_assignor.standby_tasks.items(): task = self._create_standby_task(task_id, partitions) if task: self._standby_tasks[task_id] = task diff --git a/kafka/streams/processor/task.py b/kafka/streams/processor/task.py index 67a276b..4ed1c30 100644 --- a/kafka/streams/processor/task.py +++ b/kafka/streams/processor/task.py @@ -43,11 +43,12 @@ class AbstractTask(object): consumer, restore_consumer, is_standby, **config): """Raises ProcessorStateError if the state manager cannot be created""" self.id = task_id - self.application_id = self.config['application_id'] + self.application_id = config['application_id'] self.partitions = set(partitions) self.topology = topology self.consumer = consumer self.processor_context = None + self.state_mgr = None # create the processor state manager """ @@ -78,7 +79,8 @@ class AbstractTask(object): Raises ProcessorStateError if there is an error while closing the state manager """ try: - self.state_mgr.close(self.record_collector_offsets()) + if self.state_mgr is not None: + self.state_mgr.close(self.record_collector_offsets()) except Exception as e: raise ProcessorStateError('Error while closing the state manager', e) @@ -126,17 +128,17 @@ class StreamTask(AbstractTask): partition_queues = {} for partition in partitions: - source = 
self.topology.source(partition.topic()) + source = self.topology.source(partition.topic) queue = self._create_record_queue(partition, source) partition_queues[partition] = queue - self.partition_group = PartitionGroup(partition_queues, self.config['timestamp_extractor_class']) + self.partition_group = PartitionGroup(partition_queues, config['timestamp_extractor']) # initialize the consumed offset cache self.consumed_offsets = {} # create the RecordCollector that maintains the produced offsets - self.record_collector = RecordCollector(self.producer) + self.record_collector = RecordCollector(producer) log.info('Creating restoration consumer client for stream task #%s', self.id) @@ -144,7 +146,7 @@ class StreamTask(AbstractTask): self.processor_context = ProcessorContext(self.id, self, self.record_collector, self.state_mgr, **config) # initialize the state stores - self.initialize_state_stores() + #self.initialize_state_stores() # initialize the task by initializing all its processor nodes in the topology for node in self.topology.processors(): @@ -175,7 +177,7 @@ class StreamTask(AbstractTask): """ with self._process_lock: # get the next record to process - record = self.partition_group.next_record(self._record_info) + timestamp, record = self.partition_group.next_record(self._record_info) # if there is no record to process, return immediately if record is None: @@ -203,7 +205,7 @@ class StreamTask(AbstractTask): # after processing this record, if its partition queue's # buffered size has been decreased to the threshold, we can then # resume the consumption on this partition - if self._record_info.queue().size() == self.max_buffered_size: + if self._record_info.queue.size() == self.max_buffered_size: self.consumer.resume(partition) self.requires_poll = True @@ -251,7 +253,8 @@ class StreamTask(AbstractTask): def commit(self): """Commit the current task state""" # 1) flush local state - self.state_mgr.flush() + if self.state_mgr is not None: + self.state_mgr.flush() # 2) flush produced records in the downstream and change logs of local states self.record_collector.flush() @@ -260,9 +263,10 @@ class StreamTask(AbstractTask): if self._commit_offset_needed: consumed_offsets_and_metadata = {} for partition, offset in self.consumed_offsets.items(): - consumed_offsets_and_metadata[partition] = OffsetAndMetadata(offset + 1) - self.state_mgr.put_offset_limit(partition, offset + 1) - self.consumer.commit_sync(consumed_offsets_and_metadata) + consumed_offsets_and_metadata[partition] = OffsetAndMetadata(offset + 1, b'') + if self.state_mgr is not None: + self.state_mgr.put_offset_limit(partition, offset + 1) + self.consumer.commit(consumed_offsets_and_metadata) self._commit_offset_needed = False self._commit_requested = False @@ -319,7 +323,7 @@ class StreamTask(AbstractTask): def forward(self, key, value, child_index=None, child_name=None): this_node = self._curr_node try: - children = this_node.children() + children = this_node.children if child_index is not None: children = [children[child_index]] diff --git a/kafka/streams/processor/topology_builder.py b/kafka/streams/processor/topology_builder.py index 223cda9..3a2679a 100644 --- a/kafka/streams/processor/topology_builder.py +++ b/kafka/streams/processor/topology_builder.py @@ -108,7 +108,7 @@ class SinkNodeFactory(NodeFactory): self.partitioner = partitioner def build(self, application_id): - if self.topic in self.builder.internal_topics: + if self.topic in self.builder.internal_topic_names: sink_name = application_id + '-' + self.topic else: 
sink_name = self.topic |
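
One behavioral change worth calling out from the hunks above: `RecordQueue.poll()` and `PartitionGroup.next_record()` now return a `(timestamp, record)` pair, with `(None, None)` signaling an empty queue, instead of a single stamped record. Below is a hedged sketch of the consuming pattern using the `RecordInfo` accessors shown in partition_group.py; the final dispatch call is hypothetical, and in the real code the forwarding happens inside `StreamTask.process()`.

```python
# Sketch of the new (timestamp, record) contract; `partition_group` and
# `record_info` stand in for the PartitionGroup and RecordInfo that
# StreamTask builds internally -- this is not a standalone script.
timestamp, record = partition_group.next_record(record_info)
if record is None:
    # Nothing buffered in any partition queue; the stream thread should go
    # back to polling the consumer for more raw records.
    pass
else:
    source_node = record_info.node()       # SourceNode owning the record's queue
    partition = record_info.partition()    # TopicPartition the record came from
    # Hypothetical dispatch -- StreamTask.process() performs the real forwarding
    # through the processor context with `timestamp` attached.
    source_node.process(record.key, record.value)
```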