summaryrefslogtreecommitdiff
path: root/kafka/client.py
diff options
context:
space:
mode:
authorDavid Arthur <mumrah@gmail.com>2013-03-01 15:32:32 -0500
committerDavid Arthur <mumrah@gmail.com>2013-04-02 20:19:30 -0400
commit8fab55ad198eea41513a5daac3e92a709cb39247 (patch)
tree2be166cd3cff3215860d72733146d8191053066c /kafka/client.py
parenteac51e9c68c50f15962b6c785ede92cb3d512a17 (diff)
downloadkafka-python-8fab55ad198eea41513a5daac3e92a709cb39247.tar.gz
Removing __main__ stuff from client.py
Diffstat (limited to 'kafka/client.py')
-rw-r--r--kafka/client.py66
1 files changed, 0 insertions, 66 deletions
diff --git a/kafka/client.py b/kafka/client.py
index 5da3919..fb99910 100644
--- a/kafka/client.py
+++ b/kafka/client.py
@@ -806,69 +806,3 @@ class KafkaClient(object):
else:
out.append(offset_fetch_response)
return out
-
-
-if __name__ == "__main__":
- logging.basicConfig(level=logging.DEBUG)
-
- topic = "foo8"
- # Bootstrap connection
- conn = KafkaClient("localhost", 49720)
-
- # Create some Messages
- messages = (KafkaProtocol.create_gzip_message(["GZIPPed"]),
- KafkaProtocol.create_message("not-gzipped"))
-
- produce1 = ProduceRequest(topic=topic, partition=0, messages=messages)
- produce2 = ProduceRequest(topic=topic, partition=1, messages=messages)
-
- # Send the ProduceRequest
- produce_resp = conn.send_produce_request(payloads=[produce1, produce2])
-
- # Check for errors
- for resp in produce_resp:
- if resp.error != 0:
- raise Exception("ProduceRequest failed with errorcode=%d", resp.error)
- print resp
-
- # Offset commit/fetch
- #conn.send_offset_commit_request(group="group", payloads=[OffsetCommitRequest("topic-1", 0, 42, "METADATA?")])
- #conn.send_offset_fetch_request(group="group", payloads=[OffsetFetchRequest("topic-1", 0)])
-
- def init_offsets(offset_response):
- if offset_response.error not in (ErrorMapping.NO_ERROR, ErrorMapping.UNKNOWN_TOPIC_OR_PARTITON):
- raise Exception("OffsetFetch failed: %s" % (offset_response))
- elif offset_response.error == ErrorMapping.UNKNOWN_TOPIC_OR_PARTITON:
- return 0
- else:
- return offset_response.offset
- # Load offsets
- (offset1, offset2) = conn.send_offset_fetch_request(
- group="group1",
- payloads=[OffsetFetchRequest(topic, 0),OffsetFetchRequest(topic, 1)],
- fail_on_error=False,
- callback=init_offsets
- )
- print offset1, offset2
-
- while True:
- for resp in conn.send_fetch_request(payloads=[FetchRequest(topic=topic, partition=0, offset=offset1, max_bytes=4096)]):
- i = 0
- for msg in resp.messages:
- print msg
- offset1 = msg.offset+1
- print offset1, conn.send_offset_commit_request(group="group1", payloads=[OffsetCommitRequest(topic, 0, offset1, "")])
- i += 1
- if i == 0:
- raise StopIteration("no more messages")
-
- for resp in conn.send_fetch_request(payloads=[FetchRequest(topic=topic, partition=1, offset=offset2, max_bytes=4096)]):
- i = 0
- for msg in resp.messages:
- print msg
- offset2 = msg.offset+1
- print offset2, conn.send_offset_commit_request(group="group1", payloads=[OffsetCommitRequest(topic, 1, offset2, "")])
- i += 1
- if i == 0:
- raise StopIteration("no more messages")
-