path: root/kafka/consumer.py
Commit message (Author, Date, Files changed, Lines changed)
* Separate consumers/producers/partitioners (Dana Powers, 2014-09-10; 1 file, -698/+0)

* Merge pull request #223 from dpkp/metadata_refactor (Dana Powers, 2014-09-08; 1 file, -6/+5)

    Metadata Refactor

    * add MetadataRequest and MetadataResponse namedtuples
    * add TopicMetadata namedtuple
    * add error codes to Topic and Partition Metadata
    * add KafkaClient.send_metadata_request() method
    * KafkaProtocol.decode_metadata_response changed to return a
      MetadataResponse object so that it is consistent with the server
      API: [broker_list, topic_list]
    * raise server exceptions in load_metadata_for_topics(*topics)
      unless topics is null (full refresh)
    * replace non-standard exceptions (LeaderUnavailable,
      PartitionUnavailable) with server-standard exceptions
      (LeaderNotAvailableError, UnknownTopicOrPartitionError)

    Conflicts: kafka/client.py, test/test_client.py,
    test/test_producer_integration.py, test/test_protocol.py

* Don't need to use callbacks for offset fetch requests (Dana Powers, 2014-09-01; 1 file, -4/+3)

* Refactor internal metadata dicts in KafkaClient (Dana Powers, 2014-09-01; 1 file, -2/+2)

    - use helper methods, not direct access
    - add get_partition_ids_for_topic
    - check for topic and partition errors during load_metadata_for_topics
    - raise LeaderNotAvailableError when the topic is being auto-created,
      or UnknownTopicOrPartitionError if auto-creation is off
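The helper-method idea in the refactor above can be sketched as follows. The `ClientMetadata` class, its dict layout, and the `LookupError` are illustrative assumptions; only the `get_partition_ids_for_topic` name comes from the commit message (the real client raises UnknownTopicOrPartitionError).

```python
# Minimal sketch of the refactor's idea: callers use a helper method
# instead of reaching into the client's internal metadata dicts.
# `ClientMetadata` and its contents are assumptions for illustration.
class ClientMetadata:
    def __init__(self):
        # topic -> {partition_id: leader}; contents are made up
        self._topic_partitions = {"my-topic": {0: "broker-1", 1: "broker-2"}}

    def get_partition_ids_for_topic(self, topic):
        # Surface a clear error for unknown topics rather than leaking
        # a raw KeyError from the internal dict.
        if topic not in self._topic_partitions:
            raise LookupError("unknown topic: %s" % topic)
        return sorted(self._topic_partitions[topic])

meta = ClientMetadata()
print(meta.get_partition_ids_for_topic("my-topic"))  # [0, 1]
```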
* Merge pull request #227 from wizzat-feature/py3 (Dana Powers, 2014-09-07; 1 file, -4/+13)

    Python 3 Support

    Conflicts: kafka/producer.py, test/test_client.py,
    test/test_client_integration.py, test/test_codec.py,
    test/test_consumer.py, test/test_consumer_integration.py,
    test/test_failover_integration.py, test/test_producer.py,
    test/test_producer_integration.py, test/test_protocol.py,
    test/test_util.py

* Fix the multiprocessing tests for Python 3 (Mark Roberts, 2014-09-03; 1 file, -1/+1)

* Cross-python iteritems (Bruno Renié, 2014-09-03; 1 file, -1/+3)

* Make all unit tests pass on py3.3/3.4 (Bruno Renié, 2014-09-03; 1 file, -2/+9)

* Minor import cleanups (Dana Powers, 2014-09-04; 1 file, -6/+6)

* Merge pull request #136 from DataDog/fix-multifetch-buffer-size (Dana Powers, 2014-08-26; 1 file, -12/+14)

    fix consumer retry logic (fixes #135)

    Conflicts: kafka/consumer.py

* fix consumer retry logic (fixes #135) (Carlo Cabanilla, 2014-02-28; 1 file, -12/+14)

    Fixes a bug under the following conditions:

    * starting buffer size is 1024, max buffer size is 2048, both set at
      the instance level
    * fetch from p0 and p1, receive a response
    * p0 has more than 1024 bytes, so the consumer doubles the buffer
      size to 2048 and marks p0 for retry
    * p1 also has more than 1024 bytes; the consumer tries to double the
      buffer size again, sees that it is already at the max, and raises
      ConsumerFetchSizeTooSmall

    The fix changes the logic to the following:

    * starting buffer size is 1024, set per partition; max buffer size
      is 2048, set at the instance level
    * fetch from p0 and p1, receive a response
    * p0 has more than 1024 bytes, so the consumer doubles p0's buffer
      size to 2048 and marks p0 for retry
    * p1 has more than 1024 bytes, so the consumer doubles p1's buffer
      size to 2048 and marks p1 for retry
    * the consumer sees that there are partitions to retry and repeats
      the parsing loop
    * p0 sends all its bytes this time; the consumer yields these messages
    * p1 sends all its bytes this time; the consumer yields these messages
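The fixed retry logic described above can be sketched as a loop over per-partition buffer sizes. All names here (`fetch_all`, the `fetch` callback returning a `(messages, truncated)` pair) are assumptions for illustration, not the library's actual API.

```python
# Sketch of the fix: each partition keeps its own fetch buffer size, so a
# truncated response doubles (and retries) only that partition's buffer.
def fetch_all(partitions, fetch, start_size=1024, max_size=2048):
    buf = {p: start_size for p in partitions}   # per-partition buffer sizes
    messages = []
    retry = set(partitions)
    while retry:
        pending = set()
        for p in retry:
            msgs, truncated = fetch(p, buf[p])
            if truncated:
                if buf[p] >= max_size:
                    # the instance-level cap only fails a partition that is
                    # already at the max and still truncated
                    raise RuntimeError("ConsumerFetchSizeTooSmall")
                buf[p] = min(buf[p] * 2, max_size)
                pending.add(p)
            else:
                messages.extend(msgs)
        retry = pending
    return messages

# fake fetch: every partition needs a 2048-byte buffer to return its message
def fake_fetch(partition, size):
    return ([], True) if size < 2048 else (["%s-msg" % partition], False)

print(sorted(fetch_all(["p0", "p1"], fake_fetch)))  # ['p0-msg', 'p1-msg']
```

With the pre-fix instance-level buffer, the second truncated partition would have raised instead of retrying.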
* also reset `self.fetch_offsets` in `fetch_last_known_offsets` (Zack Dever, 2014-08-25; 1 file, -0/+1)

* Move fetching last known offset logic to a stand-alone function (Zack Dever, 2014-08-25; 1 file, -10/+16)

    The `Consumer` class fetches the last known offsets in `__init__` if
    `auto_commit` is enabled, but it would be nice to expose this
    behavior for consumers that aren't using auto_commit. This doesn't
    change existing behavior, just exposes the ability to easily fetch
    and set the last known offsets. Once #162 or something similar lands
    this may no longer be necessary, but it looks like that might take a
    while to make it through.

* Attempt to fix Travis build. Decrease complexity of service.py in favor of in-memory logging. Address code review concerns (Mark Roberts, 2014-05-06; 1 file, -2/+2)

* Make commit() check for errors instead of simply asserting no error (Mark Roberts, 2014-04-30; 1 file, -1/+1)

* Make BrokerRequestError a base class, make subclasses for each broker error (Mark Roberts, 2014-04-30; 1 file, -9/+7)

* Various fixes (Mark Roberts, 2014-04-25; 1 file, -0/+3)

    * Bump version number to 0.9.1
    * Update readme to show supported Kafka/Python versions
    * Validate arguments in consumer.py, add initial consumer unit test
    * Make service kill() child processes when startup fails
    * Add tests for util.py, fix a Python 2.6-specific bug

* Fix last remaining test by making autocommit more intuitive (Mark Roberts, 2014-04-24; 1 file, -1/+1)

* Split out kafka version environments, default tox no longer runs any integration tests, make skipped integration also skip setupClass, implement rudimentary offset support in consumer.py (Mark Roberts, 2014-04-23; 1 file, -11/+11)

* Split up and speed up producer-based integration tests (Mark Roberts, 2014-04-17; 1 file, -1/+1)

* Commit in seek if autocommit (Mark Roberts, 2014-03-27; 1 file, -1/+4)

* Make seek(); commit(); work without commit discarding the seek change (Mark Roberts, 2014-03-25; 1 file, -0/+1)

* Resolve conflicts for #106 (Omar Ghishan, 2014-01-28; 1 file, -39/+70)

* Add doc string for SimpleConsumer._get_message() (Omar Ghishan, 2014-01-20; 1 file, -0/+6)

* Make get_messages() update and commit offsets just before returning (Omar Ghishan, 2014-01-15; 1 file, -16/+35)

* Only use timeout if it's not None (Omar Ghishan, 2014-01-15; 1 file, -4/+5)

* Store fetched offsets separately (Omar Ghishan, 2014-01-15; 1 file, -10/+14)

    Fetch requests can be repeated if we get a ConsumerFetchSizeTooSmall
    or if _fetch() is called multiple times for some reason. We don't
    want to re-fetch messages that are already in our queue, so store
    the offsets of the last enqueued messages from each partition.
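The bookkeeping this commit describes can be sketched as below. The names and structures are assumptions, illustrating only the "skip what's already enqueued" idea.

```python
# Sketch (names assumed): remember the offset just past the last message
# already enqueued per partition, so a repeated fetch of the same range
# does not enqueue duplicates.
fetch_offsets = {"p0": 0}   # next offset still needed, per partition
enqueued = []               # stands in for the consumer's internal queue

def enqueue(partition, messages):
    # messages: (offset, value) pairs from a (possibly repeated) fetch
    for offset, value in messages:
        if offset < fetch_offsets[partition]:
            continue  # already enqueued by an earlier attempt; skip it
        enqueued.append(value)
        fetch_offsets[partition] = offset + 1

enqueue("p0", [(0, "a"), (1, "b")])
enqueue("p0", [(0, "a"), (1, "b"), (2, "c")])  # retried fetch overlaps
print(enqueued)  # ['a', 'b', 'c']
```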
* Fix offset increments (Omar Ghishan, 2014-01-15; 1 file, -16/+17)

    * Increment the offset before returning a message rather than when
      putting it in the internal queue. This prevents committing the
      wrong offsets.
    * In MultiProcessConsumer, store the offset of the next message
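The first point above can be sketched as follows; the structure and names are assumptions for illustration.

```python
import queue

# Sketch of the fix: advance the partition offset when a message is
# delivered to the caller, not when it is merely buffered, so a commit
# never covers messages the caller has not actually seen.
internal = queue.Queue()
offsets = {"p0": 0}   # offset of the next message to hand out

def buffer_message(partition, offset, value):
    internal.put((partition, offset, value))  # offsets NOT advanced here

def next_message():
    partition, offset, value = internal.get_nowait()
    offsets[partition] = offset + 1           # advanced only on delivery
    return value

buffer_message("p0", 0, "a")
buffer_message("p0", 1, "b")
print(next_message(), offsets["p0"])  # a 1
```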
* Merge branch 'repr' of https://github.com/mahendra/kafka-python into mahendra-repr (mrtheb, 2014-01-14; 1 file, -0/+8)

    Conflicts: kafka/client.py, kafka/consumer.py

* Add proper string representations for each class (Mahendra M, 2013-10-08; 1 file, -0/+8)

* Exception hierarchy, invalidate more metadata on errors (Thomas Dimson, 2014-01-13; 1 file, -1/+1)

* Merge pull request #88 from rdiomar/rdiomar_changes (Omar, 2014-01-13; 1 file, -129/+140)

    Various changes/fixes, including:

    * Allow customizing socket timeouts
    * Read the correct number of bytes from kafka
    * Guarantee reading the expected number of bytes from the socket
      every time
    * Remove bufsize from client and conn
    * SimpleConsumer flow changes
    * Fix some error handling
    * Add optional upper limit to consumer fetch buffer size
    * Add and fix unit and integration tests

* Remove unnecessary brackets (Omar Ghishan, 2014-01-06; 1 file, -2/+2)

* Add a limit to fetch buffer size, and actually retry requests when fetch size is too small (Omar Ghishan, 2014-01-06; 1 file, -37/+58)

    Note: This can cause fetching a message to exceed a given timeout,
    but timeouts are not guaranteed anyway, and in this case it's the
    client's fault for not sending a big enough buffer size rather than
    the Kafka server's. This can be bad if max_fetch_size is None (no
    limit) and there is some message in Kafka that is crazy huge, but
    that is why we should have some max_fetch_size.

* Fix seek offset deltas (Omar Ghishan, 2014-01-06; 1 file, -6/+0)

    We always store the offset of the next available message, so we
    shouldn't decrement the offset deltas by an extra 1 when seeking.

* Add comments and maintain 80-character line limit (Omar Ghishan, 2014-01-06; 1 file, -7/+23)

* Add iter_timeout option to SimpleConsumer. If not None, it causes the iterator to exit when reached. Also put constant timeout values in pre-defined constants (Omar Ghishan, 2014-01-06; 1 file, -6/+22)

* Add buffer_size param description to docstring (Omar Ghishan, 2014-01-06; 1 file, -1/+2)

* Remove SimpleConsumer queue size limit since it can cause the iterator to block forever if it's reached (Omar Ghishan, 2014-01-06; 1 file, -1/+1)

* SimpleConsumer flow changes (Omar Ghishan, 2014-01-06; 1 file, -112/+70)

    * Combine partition fetch requests into a single request
    * Put the messages received in a queue and update offsets
    * Grab as many messages from the queue as requested
    * When the queue is empty, request more
    * timeout param for get_messages() is the actual timeout for getting
      those messages
    * Based on https://github.com/mumrah/kafka-python/pull/74 -
      don't increase min_bytes if the consumer fetch buffer size is too
      small

    Notes: Change MultiProcessConsumer and _mp_consume() accordingly.

    Previously, when querying each partition separately, it was possible
    to block waiting for messages on partition 0 even if there were new
    ones in partition 1. These changes allow us to block while waiting
    for messages on all partitions, and reduce the total number of kafka
    requests.

    Use Queue.Queue for the single-process queue instead of the already
    imported multiprocessing.Queue, because the latter doesn't seem to
    guarantee immediate availability of items after a put:

    >>> from multiprocessing import Queue
    >>> q = Queue()
    >>> q.put(1); q.get_nowait()
    Traceback (most recent call last):
      File "<stdin>", line 1, in <module>
      File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/queues.py", line 152, in get_nowait
        return self.get(False)
      File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/queues.py", line 134, in get
        raise Empty
    Queue.Empty
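The queue-draining flow in the list above can be sketched roughly like this; the function names and the `fetch_more` callback are assumptions, standing in for the real SimpleConsumer's combined FetchRequest.

```python
import queue

# Rough sketch: drain the shared queue until `count` messages are
# collected, issuing one combined fetch across all partitions only when
# the queue runs dry.
def get_messages(q, fetch_more, count):
    messages = []
    while len(messages) < count:
        try:
            messages.append(q.get_nowait())
        except queue.Empty:
            fetched = fetch_more()  # one request covering all partitions
            if not fetched:
                break               # nothing more to read right now
            for m in fetched:
                q.put(m)
    return messages

q = queue.Queue()
batches = iter([["m1", "m2"], ["m3"], []])
print(get_messages(q, lambda: next(batches), 3))  # ['m1', 'm2', 'm3']
```

A plain `queue.Queue` makes items visible to `get_nowait()` immediately after `put()`, which is the property the commit message's multiprocessing.Queue traceback shows is missing.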
* Reset consumer fields to original values rather than defaults in FetchContext (Omar Ghishan, 2014-01-06; 1 file, -3/+5)

* Allow None timeout in FetchContext even if block is False (Omar Ghishan, 2014-01-06; 1 file, -4/+4)

* Guarantee reading the expected number of bytes from the socket every time (Omar Ghishan, 2014-01-06; 1 file, -2/+5)

    * Remove bufsize from client and conn, since they're not actually
      enforced

    Notes: This commit changes behavior a bit by raising a
    BufferUnderflowError when no data is received for the message size,
    rather than a ConnectionError. Since bufsize in the socket is not
    actually enforced, but it is used by the consumer when creating
    requests, it is moved there until a better solution is implemented.
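Guaranteeing an exact byte count is the classic recv loop. This sketch (with an assumed `recv_exactly` name, and `EOFError` standing in for the library's BufferUnderflowError) shows why a single `recv()` call is not enough:

```python
import socket

def recv_exactly(sock, n):
    """Read exactly n bytes, looping because recv() may return fewer."""
    chunks = []
    remaining = n
    while remaining > 0:
        chunk = sock.recv(remaining)
        if not chunk:  # peer closed early, akin to BufferUnderflowError
            raise EOFError("socket closed with %d bytes outstanding" % remaining)
        chunks.append(chunk)
        remaining -= len(chunk)
    return b"".join(chunks)

# quick demonstration over a local socket pair (POSIX)
a, b = socket.socketpair()
b.sendall(b"kafka!")
print(recv_exactly(a, 6))  # b'kafka!'
```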
* Merge pull request #66 from jcrobak/fix-import-collision (David Arthur, 2014-01-08; 1 file, -0/+2)

    Enable absolute imports for modules using Queue.

* Enable absolute imports for modules using Queue (Joe Crobak, 2013-10-21; 1 file, -0/+2)

    When running on Linux with code on a case-insensitive file system,
    imports of the `Queue` module fail because Python resolves the wrong
    file (it is trying to use a relative import of `queue.py` in the
    kafka directory). This change forces absolute imports via PEP 328.
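A minimal sketch of the PEP 328 fix. The `from __future__` line is a no-op on Python 3 (shown for illustration); on Python 2 it makes bare imports absolute, so the stdlib module wins over a same-named file inside the package:

```python
from __future__ import absolute_import  # PEP 328: bare imports become absolute

# On Python 2, without the line above, a sibling kafka/queue.py (or Queue.py
# on a case-insensitive filesystem) could shadow the standard library module.
# Here we use the Python 3 stdlib name; on Python 2 it was `Queue`.
import queue

q = queue.Queue()
q.put("msg")
print(q.get_nowait())  # msg
```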
* allow for timeout to be None in SimpleConsumer.get_messages (Zack Dever, 2013-12-12; 1 file, -1/+2)

* Ensure that multiprocess consumer works in Windows (Mahendra M, 2013-10-08; 1 file, -53/+63)

* flake8 pass (pep8 and pyflakes) (mrtheb, 2013-10-03; 1 file, -23/+30)

* remove unused exception instance variables (Vetoshkin Nikita, 2013-10-03; 1 file, -1/+1)

* Cherry-pick mrtheb/kafka-python 2b016b69 (mrtheb, 2013-10-03; 1 file, -1/+4)

    Set FetchRequest MaxBytes value to bufsize instead of fetchsize
    (= MinBytes)
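The distinction the cherry-pick fixes can be sketched with a namedtuple loosely modeled on the 0.9-era protocol structs; the field names are an assumption for illustration, not the exact library definition. MaxBytes caps how much the broker returns for the partition (so it should track the consumer's `bufsize`), while MinBytes only tells the broker how much data to wait for before replying.

```python
from collections import namedtuple

# Illustrative stand-in for the fetch request payload (field names assumed).
FetchRequest = namedtuple("FetchRequest", ["topic", "partition", "offset", "max_bytes"])

bufsize = 4096   # how many bytes the client is prepared to receive
fetchsize = 64   # MinBytes: data the broker accumulates before replying

# The fix: MaxBytes comes from bufsize, not from fetchsize (MinBytes).
req = FetchRequest(topic="t0", partition=0, offset=0, max_bytes=bufsize)
print(req.max_bytes)  # 4096
```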