-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathconsumer.py
More file actions
116 lines (98 loc) · 3.53 KB
/
consumer.py
File metadata and controls
116 lines (98 loc) · 3.53 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
import os
import uuid
import argparse
import asyncio
import logging
import colorlog
from aiokafka import AIOKafkaConsumer
# ANSI styles for terminal colors
STYLES = {
"blue": "\033[1;34m",
"yellow": "\033[1;33m",
"red": "\033[1;31m",
"green": "\033[1;32m",
"magenta": "\033[1;35m",
"cyan": "\033[1;36m",
"reset": "\033[0m"
}
# Emoji + color by topic
TOPIC_META = {
"alerts": {"emoji": "🚨", "style": STYLES["red"]},
"health": {"emoji": "💚", "style": STYLES["green"]},
"transactions": {"emoji": "💸", "style": STYLES["cyan"]},
"default": {"emoji": "📩", "style": STYLES["blue"]},
}
# Colorlog setup
handler = colorlog.StreamHandler()
handler.setFormatter(colorlog.ColoredFormatter(
"%(log_color)s%(message)s",
log_colors={
"DEBUG": "cyan",
"INFO": "white",
"WARNING": "yellow",
"ERROR": "red",
"CRITICAL": "bold_red",
}
))
logger = colorlog.getLogger(__name__)
logger.addHandler(handler)
logger.setLevel(logging.INFO)
async def consume(topic: str):
bootstrap_servers = os.environ.get("KAFKA_BOOTSTRAP_SERVERS", "localhost:9092")
group_id = f"test-group-{uuid.uuid4()}"
if not bootstrap_servers:
logger.error("KAFKA_BOOTSTRAP_SERVERS environment variable is not set.")
return
logger.info(f"🔌 KAFKA_BOOTSTRAP_SERVERS: {bootstrap_servers}")
logger.info(f"📦 Kafka topic: {topic}")
logger.info(f"👥 Consumer group: {group_id}")
consumer = AIOKafkaConsumer(
topic,
bootstrap_servers=bootstrap_servers,
group_id=group_id,
auto_offset_reset='earliest'
)
await consumer.start()
try:
logger.info(f"🧲 Subscribed to {topic}, waiting for messages...\n")
async for msg in consumer:
decoded = msg.value.decode('utf-8')
partition = msg.partition
topic_name = msg.topic
# Determine style & emoji based on topic
meta = TOPIC_META.get(topic_name, TOPIC_META["default"])
emoji = meta["emoji"]
style = meta["style"]
# Override style if content indicates severity
lowered = decoded.lower()
if "error" in lowered:
style = STYLES["red"]
elif "warn" in lowered:
style = STYLES["yellow"]
elif "success" in lowered or "ok" in lowered:
style = STYLES["green"]
# Print the styled message
#logger.info(f"{style}{emoji} [{topic_name}-p{partition}]: {decoded}{STYLES['reset']}")
message_style = STYLES["yellow"]
logger.info(f"{style}{emoji} [{topic_name}-p{partition}]: {message_style}{decoded}{STYLES['reset']}")
except KeyboardInterrupt:
logger.info("👋 Consumer stopped by user.")
finally:
await consumer.stop()
logger.info("🛑 Kafka consumer stopped.")
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Kafka Consumer")
parser.add_argument(
"--topic",
type=str,
default="test-topic",
help="Kafka topic to consume messages from (default: test-topic)"
)
args = parser.parse_args()
try:
asyncio.run(consume(args.topic))
except KeyboardInterrupt:
print("\n👋 Consumer interrupted by user.")
# HELP! My neighbor's favorite topic is pets, but he always gets my cats name wrong!
# I've suggested he look on pk-kafka-kafka-bootstrap.pk-world.svc.cluster.local for my cats name
# Think you can find my cats name?