summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--kafka/consumer/subscription_state.py1
-rw-r--r--kafka/coordinator/abstract.py5
2 files changed, 5 insertions, 1 deletions
diff --git a/kafka/consumer/subscription_state.py b/kafka/consumer/subscription_state.py
index 5330e9f..38d4571 100644
--- a/kafka/consumer/subscription_state.py
+++ b/kafka/consumer/subscription_state.py
@@ -172,6 +172,7 @@ class SubscriptionState(object):
for tp in assignments:
self._add_assigned_partition(tp)
self.needs_partition_assignment = False
+ log.info("Updated partition assignment: %s", assignments)
def unsubscribe(self):
self.subscription = None
diff --git a/kafka/coordinator/abstract.py b/kafka/coordinator/abstract.py
index b0413d5..78e8d74 100644
--- a/kafka/coordinator/abstract.py
+++ b/kafka/coordinator/abstract.py
@@ -264,13 +264,16 @@ class AbstractCoordinator(object):
def _handle_join_group_response(self, future, response):
error_type = Errors.for_code(response.error_code)
if error_type is Errors.NoError:
- log.debug("Joined group: %s", response)
self.member_id = response.member_id
self.generation = response.generation_id
self.rejoin_needed = False
self.protocol = response.group_protocol
+ log.info("Joined group '%s' (generation %s) with member_id %s",
+ self.group_id, self.generation, self.member_id)
#self.sensors.join_latency.record(response.requestLatencyMs())
if response.leader_id == response.member_id:
+ log.info("Elected group leader -- performing partition"
+ " assignments using %s", self.protocol)
self._on_join_leader(response).chain(future)
else:
self._on_join_follower().chain(future)