diff options
author | David Arthur <mumrah@gmail.com> | 2013-03-01 15:32:32 -0500 |
---|---|---|
committer | David Arthur <mumrah@gmail.com> | 2013-04-02 20:19:30 -0400 |
commit | 8fab55ad198eea41513a5daac3e92a709cb39247 (patch) | |
tree | 2be166cd3cff3215860d72733146d8191053066c /kafka/client.py | |
parent | eac51e9c68c50f15962b6c785ede92cb3d512a17 (diff) | |
download | kafka-python-8fab55ad198eea41513a5daac3e92a709cb39247.tar.gz |
Removing __main__ stuff from client.py
Diffstat (limited to 'kafka/client.py')
-rw-r--r-- | kafka/client.py | 66 |
1 files changed, 0 insertions, 66 deletions
diff --git a/kafka/client.py b/kafka/client.py index 5da3919..fb99910 100644 --- a/kafka/client.py +++ b/kafka/client.py @@ -806,69 +806,3 @@ class KafkaClient(object): else: out.append(offset_fetch_response) return out - - -if __name__ == "__main__": - logging.basicConfig(level=logging.DEBUG) - - topic = "foo8" - # Bootstrap connection - conn = KafkaClient("localhost", 49720) - - # Create some Messages - messages = (KafkaProtocol.create_gzip_message(["GZIPPed"]), - KafkaProtocol.create_message("not-gzipped")) - - produce1 = ProduceRequest(topic=topic, partition=0, messages=messages) - produce2 = ProduceRequest(topic=topic, partition=1, messages=messages) - - # Send the ProduceRequest - produce_resp = conn.send_produce_request(payloads=[produce1, produce2]) - - # Check for errors - for resp in produce_resp: - if resp.error != 0: - raise Exception("ProduceRequest failed with errorcode=%d", resp.error) - print resp - - # Offset commit/fetch - #conn.send_offset_commit_request(group="group", payloads=[OffsetCommitRequest("topic-1", 0, 42, "METADATA?")]) - #conn.send_offset_fetch_request(group="group", payloads=[OffsetFetchRequest("topic-1", 0)]) - - def init_offsets(offset_response): - if offset_response.error not in (ErrorMapping.NO_ERROR, ErrorMapping.UNKNOWN_TOPIC_OR_PARTITON): - raise Exception("OffsetFetch failed: %s" % (offset_response)) - elif offset_response.error == ErrorMapping.UNKNOWN_TOPIC_OR_PARTITON: - return 0 - else: - return offset_response.offset - # Load offsets - (offset1, offset2) = conn.send_offset_fetch_request( - group="group1", - payloads=[OffsetFetchRequest(topic, 0),OffsetFetchRequest(topic, 1)], - fail_on_error=False, - callback=init_offsets - ) - print offset1, offset2 - - while True: - for resp in conn.send_fetch_request(payloads=[FetchRequest(topic=topic, partition=0, offset=offset1, max_bytes=4096)]): - i = 0 - for msg in resp.messages: - print msg - offset1 = msg.offset+1 - print offset1, conn.send_offset_commit_request(group="group1", payloads=[OffsetCommitRequest(topic, 0, offset1, "")]) - i += 1 - if i == 0: - raise StopIteration("no more messages") - - for resp in conn.send_fetch_request(payloads=[FetchRequest(topic=topic, partition=1, offset=offset2, max_bytes=4096)]): - i = 0 - for msg in resp.messages: - print msg - offset2 = msg.offset+1 - print offset2, conn.send_offset_commit_request(group="group1", payloads=[OffsetCommitRequest(topic, 1, offset2, "")]) - i += 1 - if i == 0: - raise StopIteration("no more messages") - |