summaryrefslogtreecommitdiff
path: root/kafka/streams/kafka.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/streams/kafka.py')
-rw-r--r--kafka/streams/kafka.py18
1 files changed, 17 insertions, 1 deletions
diff --git a/kafka/streams/kafka.py b/kafka/streams/kafka.py
index bc7dcec..9eb71e3 100644
--- a/kafka/streams/kafka.py
+++ b/kafka/streams/kafka.py
@@ -23,6 +23,7 @@ import uuid
import kafka.streams.errors as Errors
from .processor.stream_thread import StreamThread
+from .processor.partition_group import partition_grouper
from .utils import AtomicInteger
log = logging.getLogger(__name__)
@@ -82,7 +83,22 @@ class KafkaStreams(object):
DEFAULT_CONFIG = {
'application_id': None,
'bootstrap_servers': None,
+ 'client_id': 'kafka-python-streams',
+ 'zookeeper_connect': '',
+ 'state_dir': '/tmp/kafka-streams',
+ 'replication_factor': 1,
+ 'timestamp_extractor': lambda x: x.timestamp,
+ 'partition_grouper': partition_grouper,
+ 'key_serializer': None,
+ 'key_deserializer': None,
+ 'value_serializer': None,
+ 'value_deserializer': None,
+ 'commit_interval_ms': 30000,
+ 'poll_ms': 100,
'num_stream_threads': 1,
+ 'num_standby_replicas': 0,
+ 'buffered_records_per_partition': 1000,
+ 'state_cleanup_delay_ms': 60000,
}
def __init__(self, builder, **configs):
@@ -173,7 +189,7 @@ class KafkaStreams(object):
self._state = STOPPED
log.info('Stopped Kafka Stream process')
- def setUncaughtExceptionHandler(self, handler):
+ def set_uncaught_exception_handler(self, handler):
"""Sets the handler invoked when a stream thread abruptly terminates
due to an uncaught exception.