summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--kafka/streams/processor/stream_thread.py13
1 files changed, 7 insertions, 6 deletions
diff --git a/kafka/streams/processor/stream_thread.py b/kafka/streams/processor/stream_thread.py
index dd4f2ae..7c4ddff 100644
--- a/kafka/streams/processor/stream_thread.py
+++ b/kafka/streams/processor/stream_thread.py
@@ -142,13 +142,14 @@ class StreamThread(Process):
consumer['group_id'] = self.config['application_id']
consumer['client_id'] = self.config['thread_client_id'] + '-consumer'
- consumer['stream_thread_instance'] = self
- consumer['replication_factor'] = self.config['replication_factor']
- consumer['num_standby_replicas'] = self.config['num_standby_replicas']
- consumer['zookeeper_connect'] = self.config['zookeeper_connect']
+
consumer['partition_assignment_strategy'] = [
- StreamPartitionAssignor(**consumer)
- ]
+ StreamPartitionAssignor(
+ stream_thread_instance=self,
+ replication_factor=self.config['replication_factor'],
+ num_standby_replicas=self.config['num_standby_replicas'],
+ zookeeper_connect=self.config['zookeeper_connect'],
+ **consumer)]
return consumer
def producer_configs(self):