summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
m---------kafka-src0
-rw-r--r--kafka/client.py2
-rw-r--r--kafka/consumer.py1
-rw-r--r--test/integration.py5
4 files changed, 3 insertions, 5 deletions
diff --git a/kafka-src b/kafka-src
-Subproject 3c27988ca4036985f4c7bef62b9bbae3f95f0fb
+Subproject 9ff4e8eb10e0ddd86f257e99d55971a13242660
diff --git a/kafka/client.py b/kafka/client.py
index 1c7fc93..8498a08 100644
--- a/kafka/client.py
+++ b/kafka/client.py
@@ -216,7 +216,6 @@ class KafkaClient(object):
return out
def send_offset_commit_request(self, group, payloads=[], fail_on_error=True, callback=None):
- raise NotImplementedError("Broker-managed offsets not supported in 0.8")
resps = self._send_broker_aware_request(payloads,
partial(KafkaProtocol.encode_offset_commit_request, group=group),
KafkaProtocol.decode_offset_commit_response)
@@ -231,7 +230,6 @@ class KafkaClient(object):
return out
def send_offset_fetch_request(self, group, payloads=[], fail_on_error=True, callback=None):
- raise NotImplementedError("Broker-managed offsets not supported in 0.8")
resps = self._send_broker_aware_request(payloads,
partial(KafkaProtocol.encode_offset_commit_fetch, group=group),
KafkaProtocol.decode_offset_fetch_response)
diff --git a/kafka/consumer.py b/kafka/consumer.py
index 4ce62e2..b9d4d9e 100644
--- a/kafka/consumer.py
+++ b/kafka/consumer.py
@@ -108,7 +108,6 @@ class SimpleConsumer(object):
partitions: list of partitions to commit, default is to commit all of them
"""
- raise NotImplementedError("Broker-managed offsets not supported in 0.8")
# short circuit if nothing happened
if self.count_since_commit == 0:
diff --git a/test/integration.py b/test/integration.py
index a1fcce7..609cfc6 100644
--- a/test/integration.py
+++ b/test/integration.py
@@ -6,6 +6,7 @@ import shlex
import shutil
import socket
import subprocess
+import sys
import tempfile
from threading import Thread, Event
import time
@@ -73,7 +74,8 @@ class KafkaFixture(Thread):
args = shlex.split("java -cp %s org.apache.zookeeper.ZooKeeperMain create /%s kafka-python" % (cp, self.zk_chroot))
proc = subprocess.Popen(args)
ret = proc.wait()
- assert ret == 0
+ if ret != 0:
+ sys.exit(1)
# Start Kafka
@@ -354,7 +356,6 @@ class TestKafkaClient(unittest.TestCase):
# Offset Tests #
####################
- @unittest.skip("No supported until 0.8.1")
def test_commit_fetch_offsets(self):
req = OffsetCommitRequest("test_commit_fetch_offsets", 0, 42, "metadata")
(resp,) = self.client.send_offset_commit_request("group", [req])