diff options
-rw-r--r-- | kafka/streams/processor/stream_thread.py | 13 |
1 files changed, 7 insertions, 6 deletions
diff --git a/kafka/streams/processor/stream_thread.py b/kafka/streams/processor/stream_thread.py index dd4f2ae..7c4ddff 100644 --- a/kafka/streams/processor/stream_thread.py +++ b/kafka/streams/processor/stream_thread.py @@ -142,13 +142,14 @@ class StreamThread(Process): consumer['group_id'] = self.config['application_id'] consumer['client_id'] = self.config['thread_client_id'] + '-consumer' - consumer['stream_thread_instance'] = self - consumer['replication_factor'] = self.config['replication_factor'] - consumer['num_standby_replicas'] = self.config['num_standby_replicas'] - consumer['zookeeper_connect'] = self.config['zookeeper_connect'] + consumer['partition_assignment_strategy'] = [ - StreamPartitionAssignor(**consumer) - ] + StreamPartitionAssignor( + stream_thread_instance=self, + replication_factor=self.config['replication_factor'], + num_standby_replicas=self.config['num_standby_replicas'], + zookeeper_connect=self.config['zookeeper_connect'], + **consumer)] return consumer def producer_configs(self): |