# Kafka Python client

This module provides low-level protocol support for Apache Kafka. It implements the five basic request types
(and their responses): Produce, Fetch, MultiFetch, MultiProduce, and Offsets. Gzip and Snappy compression
are also supported.

Compatible with Apache Kafka 0.7.x. Tested against 0.7.0, 0.7.1, and 0.7.2.

http://incubator.apache.org/kafka/

# License

Copyright 2012, David Arthur. Licensed under the Apache License, v2.0. See `LICENSE`.

# Status

The current version is 0.1-alpha. The API should be fairly stable.

# Install

Install with your favorite package manager.

Pip:

```shell
git clone https://github.com/mumrah/kafka-python
pip install ./kafka-python
```

Setuptools:
```shell
git clone https://github.com/mumrah/kafka-python
easy_install ./kafka-python
```

Using `setup.py` directly:
```shell
git clone https://github.com/mumrah/kafka-python
cd kafka-python
python setup.py install
```

## Optional Snappy install

Download and build Snappy from http://code.google.com/p/snappy/downloads/list

```shell
wget http://snappy.googlecode.com/files/snappy-1.0.5.tar.gz
tar xzvf snappy-1.0.5.tar.gz
cd snappy-1.0.5
./configure
make
sudo make install
```

Install the `python-snappy` module
```shell
pip install python-snappy
```

# Tests

Some of the tests will fail if Snappy is not installed; these tests raise NotImplementedError. If you see other failures,
they might be bugs, so please report them!

## Run the unit tests

```shell
python -m test.unit
```

## Run the integration tests

First, check out the Kafka source:

```shell
git submodule init
git submodule update
cd kafka-src
./sbt update
./sbt package
```

Then, from the root directory, run the integration tests:

```shell
python -m test.integration
```

# Usage

## Send a message to a topic

```python
from kafka.client import KafkaClient
kafka = KafkaClient("localhost", 9092)
kafka.send_messages_simple("my-topic", "some message")
kafka.close()
```

## Send several messages to a topic

Same as before, just add more arguments to `send_messages_simple`:

```python
kafka = KafkaClient("localhost", 9092)
kafka.send_messages_simple("my-topic", "some message", "another message", "and another")
kafka.close()
```

## Receive some messages from a topic

Supply `get_message_set` with a `FetchRequest` and get back the messages along with a new `FetchRequest`:

```python
from kafka.client import KafkaClient, FetchRequest

kafka = KafkaClient("localhost", 9092)
req = FetchRequest("my-topic", 0, 0, 1024*1024)
(messages, req1) = kafka.get_message_set(req)
kafka.close()
```

The returned `FetchRequest` includes the offset of the next message. This makes 
paging through the queue very simple.
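
The paging pattern can be sketched without a broker by using a stand-in client (`FakeClient` below is hypothetical, purely for illustration); with a real `KafkaClient`, the `get_message_set` loop has exactly the same shape:

```python
from collections import namedtuple

FetchRequest = namedtuple("FetchRequest", ["topic", "partition", "offset", "size"])

class FakeClient:
    """Stand-in for KafkaClient that serves canned messages."""
    def __init__(self, messages):
        self._messages = messages

    def get_message_set(self, req):
        # Return a batch starting at req.offset, plus a FetchRequest
        # pointing at the next offset -- the same contract as the real client.
        batch = self._messages[req.offset:req.offset + 1]
        next_req = FetchRequest(req.topic, req.partition,
                                req.offset + len(batch), req.size)
        return batch, next_req

client = FakeClient(["one", "two", "three"])
req = FetchRequest("my-topic", 0, 0, 1024 * 1024)
collected = []
while True:
    messages, req = client.get_message_set(req)
    if not messages:
        break  # an empty batch means we've caught up
    collected.extend(messages)
print(collected)  # ['one', 'two', 'three']
```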

## Send multiple messages to multiple topics

For this we use the `send_multi_message_set` method along with `ProduceRequest` objects.

```python
kafka = KafkaClient("localhost", 9092)
req1 = ProduceRequest("my-topic-1", 0, [
    create_message_from_string("message one"),
    create_message_from_string("message two")
])
req2 = ProduceRequest("my-topic-2", 0, [
    create_message_from_string("nachricht ein"),
    create_message_from_string("nachricht zwei")
])
kafka.send_multi_message_set([req1, req2])
kafka.close()
```

## Iterate through all messages from an offset

The `iter_messages` method makes the underlying calls to `get_message_set`
and provides a generator that yields every available message:

```python
kafka = KafkaClient("localhost", 9092)
for msg in kafka.iter_messages("my-topic", 0, 0, 1024*1024):
    print(msg.payload)
kafka.close()
```

An optional `auto` argument controls auto-paging through results:

```python
kafka = KafkaClient("localhost", 9092)
for msg in kafka.iter_messages("my-topic", 0, 0, 1024*1024, False):
    print(msg.payload)
kafka.close()
```
This will only iterate through messages in the byte range (0, 1024\*1024).

## Create some compressed messages

```python
kafka = KafkaClient("localhost", 9092)
messages = [kafka.create_snappy_message("testing 1"),
            kafka.create_snappy_message("testing 2")]
req = ProduceRequest("my-topic", 1, messages)
kafka.send_message_set(req)
kafka.close()
```

## Use Kafka like a FIFO queue

Simple API: `get`, `put`, `close`.

```python
kafka = KafkaClient("localhost", 9092)
q = KafkaQueue(kafka, "my-topic", [0,1])
q.put("first")
q.put("second")
q.get() # first
q.get() # second
q.close()
kafka.close()
```

Since the producer and consumers are backed by actual `multiprocessing.Queue` instances, you can
do blocking or non-blocking puts and gets.

```python
q.put("first", block=False)
q.get(block=True, timeout=10)
```