summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--kafka/client_async.py166
1 files changed, 83 insertions, 83 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py
index 2c6413a..e94b65d 100644
--- a/kafka/client_async.py
+++ b/kafka/client_async.py
@@ -41,12 +41,92 @@ log = logging.getLogger('kafka.client')
class KafkaClient(object):
"""
- A network client for asynchronous request/response network i/o.
- This is an internal class used to implement the
- user-facing producer and consumer clients.
+ A network client for asynchronous request/response network I/O.
+
+ This is an internal class used to implement the user-facing producer and
+ consumer clients.
This class is not thread-safe!
+
+ Keyword Arguments:
+ bootstrap_servers: 'host[:port]' string (or list of 'host[:port]'
+ strings) that the consumer should contact to bootstrap initial
+ cluster metadata. This does not have to be the full node list.
+ It just needs to have at least one broker that will respond to a
+ Metadata API Request. Default port is 9092. If no servers are
+ specified, will default to localhost:9092.
+ client_id (str): a name for this client. This string is passed in
+ each request to servers and can be used to identify specific
+ server-side log entries that correspond to this client. Also
+ submitted to GroupCoordinator for logging with respect to
+ consumer group administration. Default: 'kafka-python-{version}'
+ reconnect_backoff_ms (int): The amount of time in milliseconds to
+ wait before attempting to reconnect to a given host.
+ Default: 50.
+ request_timeout_ms (int): Client request timeout in milliseconds.
+ Default: 40000.
+ retry_backoff_ms (int): Milliseconds to backoff when retrying on
+ errors. Default: 100.
+ max_in_flight_requests_per_connection (int): Requests are pipelined
+ to kafka brokers up to this number of maximum requests per
+ broker connection. Default: 5.
+ receive_buffer_bytes (int): The size of the TCP receive buffer
+ (SO_RCVBUF) to use when reading data. Default: None (relies on
+ system defaults). Java client defaults to 32768.
+ send_buffer_bytes (int): The size of the TCP send buffer
+ (SO_SNDBUF) to use when sending data. Default: None (relies on
+ system defaults). Java client defaults to 131072.
+ socket_options (list): List of tuple-arguments to socket.setsockopt
+ to apply to broker connection sockets. Default:
+ [(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)]
+ metadata_max_age_ms (int): The period of time in milliseconds after
+ which we force a refresh of metadata even if we haven't seen any
+ partition leadership changes to proactively discover any new
+ brokers or partitions. Default: 300000
+ security_protocol (str): Protocol used to communicate with brokers.
+ Valid values are: PLAINTEXT, SSL. Default: PLAINTEXT.
+ ssl_context (ssl.SSLContext): pre-configured SSLContext for wrapping
+ socket connections. If provided, all other ssl_* configurations
+ will be ignored. Default: None.
+ ssl_check_hostname (bool): flag to configure whether ssl handshake
+ should verify that the certificate matches the brokers hostname.
+ default: true.
+ ssl_cafile (str): optional filename of ca file to use in certificate
+ veriication. default: none.
+ ssl_certfile (str): optional filename of file in pem format containing
+ the client certificate, as well as any ca certificates needed to
+ establish the certificate's authenticity. default: none.
+ ssl_keyfile (str): optional filename containing the client private key.
+ default: none.
+ ssl_password (str): optional password to be used when loading the
+ certificate chain. default: none.
+ ssl_crlfile (str): optional filename containing the CRL to check for
+ certificate expiration. By default, no CRL check is done. When
+ providing a file, only the leaf certificate will be checked against
+ this CRL. The CRL can only be checked with Python 3.4+ or 2.7.9+.
+ default: none.
+ api_version (tuple): specify which kafka API version to use. Accepted
+ values are: (0, 8, 0), (0, 8, 1), (0, 8, 2), (0, 9), (0, 10)
+ If None, KafkaClient will attempt to infer the broker
+ version by probing various APIs. Default: None
+ api_version_auto_timeout_ms (int): number of milliseconds to throw a
+ timeout exception from the constructor when checking the broker
+ api version. Only applies if api_version is None
+ selector (selectors.BaseSelector): Provide a specific selector
+ implementation to use for I/O multiplexing.
+ Default: selectors.DefaultSelector
+ metrics (kafka.metrics.Metrics): Optionally provide a metrics
+ instance for capturing network IO stats. Default: None.
+ metric_group_prefix (str): Prefix for metric names. Default: ''
+ sasl_mechanism (str): string picking sasl mechanism when security_protocol
+ is SASL_PLAINTEXT or SASL_SSL. Currently only PLAIN is supported.
+ Default: None
+ sasl_plain_username (str): username for sasl PLAIN authentication.
+ Default: None
+ sasl_plain_password (str): password for sasl PLAIN authentication.
+ Default: None
"""
+
DEFAULT_CONFIG = {
'bootstrap_servers': 'localhost',
'client_id': 'kafka-python-' + __version__,
@@ -84,86 +164,6 @@ class KafkaClient(object):
]
def __init__(self, **configs):
- """Initialize an asynchronous kafka client
-
- Keyword Arguments:
- bootstrap_servers: 'host[:port]' string (or list of 'host[:port]'
- strings) that the consumer should contact to bootstrap initial
- cluster metadata. This does not have to be the full node list.
- It just needs to have at least one broker that will respond to a
- Metadata API Request. Default port is 9092. If no servers are
- specified, will default to localhost:9092.
- client_id (str): a name for this client. This string is passed in
- each request to servers and can be used to identify specific
- server-side log entries that correspond to this client. Also
- submitted to GroupCoordinator for logging with respect to
- consumer group administration. Default: 'kafka-python-{version}'
- reconnect_backoff_ms (int): The amount of time in milliseconds to
- wait before attempting to reconnect to a given host.
- Default: 50.
- request_timeout_ms (int): Client request timeout in milliseconds.
- Default: 40000.
- retry_backoff_ms (int): Milliseconds to backoff when retrying on
- errors. Default: 100.
- max_in_flight_requests_per_connection (int): Requests are pipelined
- to kafka brokers up to this number of maximum requests per
- broker connection. Default: 5.
- receive_buffer_bytes (int): The size of the TCP receive buffer
- (SO_RCVBUF) to use when reading data. Default: None (relies on
- system defaults). Java client defaults to 32768.
- send_buffer_bytes (int): The size of the TCP send buffer
- (SO_SNDBUF) to use when sending data. Default: None (relies on
- system defaults). Java client defaults to 131072.
- socket_options (list): List of tuple-arguments to socket.setsockopt
- to apply to broker connection sockets. Default:
- [(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)]
- metadata_max_age_ms (int): The period of time in milliseconds after
- which we force a refresh of metadata even if we haven't seen any
- partition leadership changes to proactively discover any new
- brokers or partitions. Default: 300000
- security_protocol (str): Protocol used to communicate with brokers.
- Valid values are: PLAINTEXT, SSL. Default: PLAINTEXT.
- ssl_context (ssl.SSLContext): pre-configured SSLContext for wrapping
- socket connections. If provided, all other ssl_* configurations
- will be ignored. Default: None.
- ssl_check_hostname (bool): flag to configure whether ssl handshake
- should verify that the certificate matches the brokers hostname.
- default: true.
- ssl_cafile (str): optional filename of ca file to use in certificate
- veriication. default: none.
- ssl_certfile (str): optional filename of file in pem format containing
- the client certificate, as well as any ca certificates needed to
- establish the certificate's authenticity. default: none.
- ssl_keyfile (str): optional filename containing the client private key.
- default: none.
- ssl_password (str): optional password to be used when loading the
- certificate chain. default: none.
- ssl_crlfile (str): optional filename containing the CRL to check for
- certificate expiration. By default, no CRL check is done. When
- providing a file, only the leaf certificate will be checked against
- this CRL. The CRL can only be checked with Python 3.4+ or 2.7.9+.
- default: none.
- api_version (tuple): specify which kafka API version to use. Accepted
- values are: (0, 8, 0), (0, 8, 1), (0, 8, 2), (0, 9), (0, 10)
- If None, KafkaClient will attempt to infer the broker
- version by probing various APIs. Default: None
- api_version_auto_timeout_ms (int): number of milliseconds to throw a
- timeout exception from the constructor when checking the broker
- api version. Only applies if api_version is None
- selector (selectors.BaseSelector): Provide a specific selector
- implementation to use for I/O multiplexing.
- Default: selectors.DefaultSelector
- metrics (kafka.metrics.Metrics): Optionally provide a metrics
- instance for capturing network IO stats. Default: None.
- metric_group_prefix (str): Prefix for metric names. Default: ''
- sasl_mechanism (str): string picking sasl mechanism when security_protocol
- is SASL_PLAINTEXT or SASL_SSL. Currently only PLAIN is supported.
- Default: None
- sasl_plain_username (str): username for sasl PLAIN authentication.
- Default: None
- sasl_plain_password (str): password for sasl PLAIN authentication.
- Default: None
- """
self.config = copy.copy(self.DEFAULT_CONFIG)
for key in self.config:
if key in configs: