diff options
author | Dana Powers <dana.powers@gmail.com> | 2016-07-31 20:41:17 -0700 |
---|---|---|
committer | Dana Powers <dana.powers@gmail.com> | 2016-09-24 14:01:05 -0700 |
commit | 31993945c031826dd86ad877f2bf51a49c3d0f28 (patch) | |
tree | 32be278ce5cb17c524671bc8d7bf3a1fa3a72d79 | |
parent | c72b69ba58eb88ce90ddb3db6e3ff2ff7cb2ab64 (diff) | |
download | kafka-python-31993945c031826dd86ad877f2bf51a49c3d0f28.tar.gz |
Fixup kafka streams consumer configs
-rw-r--r-- | kafka/streams/processor/stream_thread.py | 13 |
1 files changed, 7 insertions, 6 deletions
diff --git a/kafka/streams/processor/stream_thread.py b/kafka/streams/processor/stream_thread.py index dd4f2ae..7c4ddff 100644 --- a/kafka/streams/processor/stream_thread.py +++ b/kafka/streams/processor/stream_thread.py @@ -142,13 +142,14 @@ class StreamThread(Process): consumer['group_id'] = self.config['application_id'] consumer['client_id'] = self.config['thread_client_id'] + '-consumer' - consumer['stream_thread_instance'] = self - consumer['replication_factor'] = self.config['replication_factor'] - consumer['num_standby_replicas'] = self.config['num_standby_replicas'] - consumer['zookeeper_connect'] = self.config['zookeeper_connect'] + consumer['partition_assignment_strategy'] = [ - StreamPartitionAssignor(**consumer) - ] + StreamPartitionAssignor( + stream_thread_instance=self, + replication_factor=self.config['replication_factor'], + num_standby_replicas=self.config['num_standby_replicas'], + zookeeper_connect=self.config['zookeeper_connect'], + **consumer)] return consumer def producer_configs(self): |