summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2016-07-31 20:41:17 -0700
committerDana Powers <dana.powers@gmail.com>2016-09-24 14:01:05 -0700
commit31993945c031826dd86ad877f2bf51a49c3d0f28 (patch)
tree32be278ce5cb17c524671bc8d7bf3a1fa3a72d79
parentc72b69ba58eb88ce90ddb3db6e3ff2ff7cb2ab64 (diff)
downloadkafka-python-31993945c031826dd86ad877f2bf51a49c3d0f28.tar.gz
Fixup kafka streams consumer configs
-rw-r--r--kafka/streams/processor/stream_thread.py13
1 files changed, 7 insertions, 6 deletions
diff --git a/kafka/streams/processor/stream_thread.py b/kafka/streams/processor/stream_thread.py
index dd4f2ae..7c4ddff 100644
--- a/kafka/streams/processor/stream_thread.py
+++ b/kafka/streams/processor/stream_thread.py
@@ -142,13 +142,14 @@ class StreamThread(Process):
consumer['group_id'] = self.config['application_id']
consumer['client_id'] = self.config['thread_client_id'] + '-consumer'
- consumer['stream_thread_instance'] = self
- consumer['replication_factor'] = self.config['replication_factor']
- consumer['num_standby_replicas'] = self.config['num_standby_replicas']
- consumer['zookeeper_connect'] = self.config['zookeeper_connect']
+
consumer['partition_assignment_strategy'] = [
- StreamPartitionAssignor(**consumer)
- ]
+ StreamPartitionAssignor(
+ stream_thread_instance=self,
+ replication_factor=self.config['replication_factor'],
+ num_standby_replicas=self.config['num_standby_replicas'],
+ zookeeper_connect=self.config['zookeeper_connect'],
+ **consumer)]
return consumer
def producer_configs(self):