diff options
Diffstat (limited to 'kafka/streams/kafka.py')
-rw-r--r-- | kafka/streams/kafka.py | 18 |
1 files changed, 17 insertions, 1 deletions
diff --git a/kafka/streams/kafka.py b/kafka/streams/kafka.py index bc7dcec..9eb71e3 100644 --- a/kafka/streams/kafka.py +++ b/kafka/streams/kafka.py @@ -23,6 +23,7 @@ import uuid import kafka.streams.errors as Errors from .processor.stream_thread import StreamThread +from .processor.partition_group import partition_grouper from .utils import AtomicInteger log = logging.getLogger(__name__) @@ -82,7 +83,22 @@ class KafkaStreams(object): DEFAULT_CONFIG = { 'application_id': None, 'bootstrap_servers': None, + 'client_id': 'kafka-python-streams', + 'zookeeper_connect': '', + 'state_dir': '/tmp/kafka-streams', + 'replication_factor': 1, + 'timestamp_extractor': lambda x: x.timestamp, + 'partition_grouper': partition_grouper, + 'key_serializer': None, + 'key_deserializer': None, + 'value_serializer': None, + 'value_deserializer': None, + 'commit_interval_ms': 30000, + 'poll_ms': 100, 'num_stream_threads': 1, + 'num_standby_replicas': 0, + 'buffered_records_per_partition': 1000, + 'state_cleanup_delay_ms': 60000, } def __init__(self, builder, **configs): @@ -173,7 +189,7 @@ class KafkaStreams(object): self._state = STOPPED log.info('Stopped Kafka Stream process') - def setUncaughtExceptionHandler(self, handler): + def set_uncaught_exception_handler(self, handler): """Sets the handler invoked when a stream thread abruptly terminates due to an uncaught exception. |