path: root/kafka/streams/processor/context.py

"""
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
"""

import kafka.errors as Errors

NONEXIST_TOPIC = '__null_topic__'


class ProcessorContext(object):
    """Processor-facing view of a single stream task.

    Exposes metadata for the record currently being processed (topic,
    partition, offset, timestamp), access to registered state stores,
    forwarding of records to downstream child processors, and commit /
    schedule requests against the owning task.
    """

    def __init__(self, task_id, task, record_collector, state_mgr, **config):
        self.config = config
        self._task = task
        self.record_collector = record_collector
        self.task_id = task_id
        self.state_mgr = state_mgr

        # TODO: self.metrics = metrics (no metrics object is passed in yet)
        self.key_serializer = config['key_serializer']
        self.value_serializer = config['value_serializer']
        self.key_deserializer = config['key_deserializer']
        self.value_deserializer = config['value_deserializer']
        self._initialized = False

    def initialized(self):
        """Mark initialization complete; state stores may no longer be registered."""
        self._initialized = True

    @property
    def application_id(self):
        return self._task.application_id

    def state_dir(self):
        return self.state_mgr.base_dir()

    def register(self, state_store, logging_enabled, state_restore_callback):
        if self._initialized:
            raise Errors.IllegalStateError('Can only create state stores during initialization.')

        self.state_mgr.register(state_store, logging_enabled, state_restore_callback)
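
    # Illustrative sketch (assumption, not part of the original module): a
    # state store would typically register itself from its own init(),
    # passing a restore callback used to replay changelog records. The
    # init() method and _restore helper are hypothetical names here.
    #
    #     def init(self, context):
    #         context.register(self, logging_enabled=True,
    #                          state_restore_callback=self._restore)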

    def get_state_store(self, name):
        """
        Raises TopologyBuilderError if an attempt is made to access this state store from an unknown node
        """
        node = self._task.node()
        if not node:
            raise Errors.TopologyBuilderError('Accessing from an unknown node')

        # TODO: restore this once we fix the ValueGetter initialization issue
        #if (!node.stateStores.contains(name))
        #    throw new TopologyBuilderException("Processor " + node.name() + " has no access to StateStore " + name);

        return self.state_mgr.get_store(name)

    def topic(self):
        if self._task.record() is None:
            raise Errors.IllegalStateError('This should not happen: topic() may only be called while a record is being processed')

        topic = self._task.record().topic()
        if topic == NONEXIST_TOPIC:
            return None
        else:
            return topic

    def partition(self):
        if self._task.record() is None:
            raise Errors.IllegalStateError('This should not happen: partition() may only be called while a record is being processed')
        return self._task.record().partition()

    def offset(self):
        if self._task.record() is None:
            raise Errors.IllegalStateError('This should not happen: offset() may only be called while a record is being processed')
        return self._task.record().offset()

    def timestamp(self):
        if self._task.record() is None:
            raise Errors.IllegalStateError('This should not happen: timestamp() may only be called while a record is being processed')
        return self._task.record().timestamp

    def forward(self, key, value, child_index=None, child_name=None):
        self._task.forward(key, value, child_index=child_index, child_name=child_name)

    def commit(self):
        self._task.need_commit()

    def schedule(self, interval):
        self._task.schedule(interval)
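

# Illustrative sketch (assumption, not part of the original module): how a
# processor implementation might use this context while handling a record.
# The WordCountProcessor class, its init()/process() methods, the
# 'word-counts' store name, and the store's get()/put() methods are
# hypothetical names used only for this example.
#
#     class WordCountProcessor(object):
#         def init(self, context):
#             self.context = context
#             self.store = context.get_state_store('word-counts')
#
#         def process(self, key, value):
#             # Record metadata is only valid while a record is in flight.
#             source = (self.context.topic(), self.context.partition(),
#                       self.context.offset())
#             count = (self.store.get(value) or 0) + 1
#             self.store.put(value, count)
#             self.context.forward(value, count)
#             self.context.commit()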