diff options
-rw-r--r-- | docs/apidoc/KafkaClient.rst | 2 | ||||
-rw-r--r-- | kafka/__init__.py | 22 | ||||
-rw-r--r-- | kafka/client.py | 8 | ||||
-rw-r--r-- | kafka/conn.py | 3 | ||||
-rw-r--r-- | kafka/consumer/base.py | 4 | ||||
-rw-r--r-- | kafka/consumer/multiprocess.py | 5 | ||||
-rw-r--r-- | kafka/consumer/simple.py | 6 | ||||
-rw-r--r-- | test/test_package.py | 15 |
8 files changed, 51 insertions, 14 deletions
diff --git a/docs/apidoc/KafkaClient.rst b/docs/apidoc/KafkaClient.rst index 5c9d736..04f4e6e 100644 --- a/docs/apidoc/KafkaClient.rst +++ b/docs/apidoc/KafkaClient.rst @@ -1,5 +1,5 @@ KafkaClient =========== -.. autoclass:: kafka.KafkaClient +.. autoclass:: kafka.client.KafkaClient :members: diff --git a/kafka/__init__.py b/kafka/__init__.py index 2a99847..68ba597 100644 --- a/kafka/__init__.py +++ b/kafka/__init__.py @@ -4,14 +4,28 @@ __author__ = 'Dana Powers' __license__ = 'Apache License 2.0' __copyright__ = 'Copyright 2016 Dana Powers, David Arthur, and Contributors' -from kafka.client import KafkaClient as SimpleClient -from kafka.client_async import KafkaClient +from kafka.consumer import KafkaConsumer from kafka.conn import BrokerConnection from kafka.protocol import ( create_message, create_gzip_message, create_snappy_message) -from kafka.producer import SimpleProducer, KeyedProducer from kafka.partitioner import RoundRobinPartitioner, HashedPartitioner, Murmur2Partitioner -from kafka.consumer import KafkaConsumer, SimpleConsumer, MultiProcessConsumer + +# To be deprecated when KafkaProducer interface is released +from kafka.client import SimpleClient +from kafka.producer import SimpleProducer, KeyedProducer + +# deprecated in favor of KafkaConsumer +from kafka.consumer import SimpleConsumer, MultiProcessConsumer + + +import warnings +class KafkaClient(SimpleClient): + def __init__(self, *args, **kwargs): + warnings.warn('The legacy KafkaClient interface has been moved to' + ' kafka.SimpleClient - this import will break in a' + ' future release', DeprecationWarning) + super(KafkaClient, self).__init__(*args, **kwargs) + __all__ = [ 'KafkaConsumer', 'KafkaClient', 'BrokerConnection', diff --git a/kafka/client.py b/kafka/client.py index 14e71bb..a517997 100644 --- a/kafka/client.py +++ b/kafka/client.py @@ -19,11 +19,17 @@ from kafka.conn import ( ConnectionStates) from kafka.protocol import KafkaProtocol +# New KafkaClient +# this is not exposed in top-level imports yet, +# due to conflicts with legacy SimpleConsumer / SimpleProducer usage +from kafka.client_async import KafkaClient + log = logging.getLogger(__name__) -class KafkaClient(object): +# Legacy KafkaClient interface -- will be deprecated soon +class SimpleClient(object): CLIENT_ID = b'kafka-python' diff --git a/kafka/conn.py b/kafka/conn.py index 6ee5f5f..0c8d002 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -9,6 +9,7 @@ import socket import struct from threading import local import time +import warnings import six @@ -375,6 +376,8 @@ class KafkaConnection(local): in seconds. None means no timeout, so a request can block forever. """ def __init__(self, host, port, timeout=DEFAULT_SOCKET_TIMEOUT_SECONDS): + warnings.warn('KafkaConnection has been deprecated and will be' + ' removed in a future release', DeprecationWarning) super(KafkaConnection, self).__init__() self.host = host self.port = port diff --git a/kafka/consumer/base.py b/kafka/consumer/base.py index 2059d92..78f376e 100644 --- a/kafka/consumer/base.py +++ b/kafka/consumer/base.py @@ -4,6 +4,7 @@ import atexit import logging import numbers from threading import Lock +import warnings import kafka.common from kafka.common import ( @@ -46,6 +47,9 @@ class Consumer(object): auto_commit_every_n=AUTO_COMMIT_MSG_COUNT, auto_commit_every_t=AUTO_COMMIT_INTERVAL): + warnings.warn('deprecated -- this class will be removed in a future' + ' release. Use KafkaConsumer instead.', + DeprecationWarning) self.client = client self.topic = topic self.group = group diff --git a/kafka/consumer/multiprocess.py b/kafka/consumer/multiprocess.py index 9358b09..fddb269 100644 --- a/kafka/consumer/multiprocess.py +++ b/kafka/consumer/multiprocess.py @@ -4,6 +4,7 @@ from collections import namedtuple import logging from multiprocessing import Process, Manager as MPManager import time +import warnings from six.moves import queue @@ -135,6 +136,10 @@ class MultiProcessConsumer(Consumer): partitions_per_proc=0, **simple_consumer_options): + warnings.warn('This class has been deprecated and will be removed in a' + ' future release. Use KafkaConsumer instead', + DeprecationWarning) + # Initiate the base consumer class super(MultiProcessConsumer, self).__init__( client, group, topic, diff --git a/kafka/consumer/simple.py b/kafka/consumer/simple.py index 29eb480..77c99b1 100644 --- a/kafka/consumer/simple.py +++ b/kafka/consumer/simple.py @@ -7,6 +7,7 @@ except ImportError: import logging import sys import time +import warnings import six from six.moves import queue @@ -40,6 +41,8 @@ class FetchContext(object): Class for managing the state of a consumer during fetch """ def __init__(self, consumer, block, timeout): + warnings.warn('deprecated - this class will be removed in a future' + ' release', DeprecationWarning) self.consumer = consumer self.block = block @@ -116,6 +119,9 @@ class SimpleConsumer(Consumer): max_buffer_size=MAX_FETCH_BUFFER_SIZE_BYTES, iter_timeout=None, auto_offset_reset='largest'): + warnings.warn('deprecated - this class will be removed in a future' + ' release. Use KafkaConsumer instead.', + DeprecationWarning) super(SimpleConsumer, self).__init__( client, group, topic, partitions=partitions, diff --git a/test/test_package.py b/test/test_package.py index e91753c..eb53027 100644 --- a/test/test_package.py +++ b/test/test_package.py @@ -1,29 +1,28 @@ from . import unittest + class TestPackage(unittest.TestCase): def test_top_level_namespace(self): import kafka as kafka1 - self.assertEqual(kafka1.KafkaClient.__name__, "KafkaClient") - self.assertEqual(kafka1.client.__name__, "kafka.client") + self.assertEqual(kafka1.KafkaConsumer.__name__, "KafkaConsumer") + self.assertEqual(kafka1.consumer.__name__, "kafka.consumer") self.assertEqual(kafka1.codec.__name__, "kafka.codec") def test_submodule_namespace(self): import kafka.client as client1 self.assertEqual(client1.__name__, "kafka.client") - self.assertEqual(client1.KafkaClient.__name__, "KafkaClient") from kafka import client as client2 self.assertEqual(client2.__name__, "kafka.client") - self.assertEqual(client2.KafkaClient.__name__, "KafkaClient") - from kafka.client import KafkaClient as KafkaClient1 - self.assertEqual(KafkaClient1.__name__, "KafkaClient") + from kafka.client import SimpleClient as SimpleClient1 + self.assertEqual(SimpleClient1.__name__, "SimpleClient") from kafka.codec import gzip_encode as gzip_encode1 self.assertEqual(gzip_encode1.__name__, "gzip_encode") - from kafka import KafkaClient as KafkaClient2 - self.assertEqual(KafkaClient2.__name__, "KafkaClient") + from kafka import SimpleClient as SimpleClient2 + self.assertEqual(SimpleClient2.__name__, "SimpleClient") from kafka.codec import snappy_encode self.assertEqual(snappy_encode.__name__, "snappy_encode") |