Kafka Producer ๋ฐ Consumer ์ค์ตํด๋ณด๊ธฐ with Python (+ksqlDB) - TIL230713
๐ KDT WEEK 15 DAY 4 TIL
- Kafka ๊ธฐ๋ณธ ํ๋ก๊ทธ๋๋ฐ
- Topic ์์ฑ ๋ฐ ํ๋ผ๋ฏธํฐ ์ค์
- Consumer ์์ฑ ๋ฐ ํ๋ผ๋ฏธํฐ ์ค์
- ksqlDB
๐ฅ Kafka ๊ธฐ๋ณธ ํ๋ก๊ทธ๋๋ฐ
๋์ Kafka ์ค์น ํ๊ฒฝ์ conduktor์ฌ๊ฐ ์ ๊ณตํ๋ Repo๋ฅผ git cloneํด์ ๊ฐ์ ธ์๋ค.
์ด ๊ธ์ ์ด ํ๊ฒฝ์ ๊ธฐ๋ฐ์ผ๋ก ์ค๋ช ํ๋ค.
์ฐธ๊ณ : https://sunhokimdev.tistory.com/66
Kafka ์์๋ณด๊ณ ์ค์นํด๋ณด๊ธฐ - TIL230712
๐ KDT WEEK 15 DAY 3 TIL Kafka Kafka ์ํคํ ์ฒ Kafka ๊ธฐํ ๊ธฐ๋ฅ Kafka ์ค์น ๋ฐ ๊ฐ๋จ ์ค์ต ๐ฅ Kafka ์ค์๊ฐ ๋ฐ์ดํฐ๋ฅผ ์ฒ๋ฆฌํ๊ธฐ ์ํด ์ค๊ณ๋ ์คํ์์ค ๋ถ์ฐ ์คํธ๋ฆฌ๋ฐ ํ๋ซํผ ๋ถ์ฐ ์ํคํ ์ฒ๋ฅผ ๋ฐ๋ฅด๊ธฐ ๋๋ฌธ์ Sca
sunhokimdev.tistory.com
ํฐ๋ฏธ๋์์ ๊ฐ๋จํ๊ฒ Kafka๋ฅผ ์ฌ์ฉํด๋ณด๋ ์ค์ต์ ํด๋ณด์.
1. Topics ๋ฆฌ์คํธํ๊ธฐ
docker ps # Broker์ ํด๋นํ๋ ์ปจํ
์ด๋์ ID ํ์ธ
docker exec -it Broker_ContainerID sh # ์๋ก ์คํ
docker-compose.yml์ ์์ฑ๋ Broker์ ์ด๋ฆ์ ์ฐพ์ ๊ทธ ์ปจํ ์ด๋ ID๋ฅผ ์ฌ์ฉํ๋ฉด ๋๋ค.
๋์ ๊ฐ์ ํ๊ฒฝ์ผ๋ก Kafka๋ฅผ ๊ตฌ์ถํ๋ค๋ฉด, kafka1์ ์ปจํ ์ด๋ ID๋ฅผ ์ฌ์ฉํ๋ฉด ๋๋ค.
์์์ ๋ค์์ ๋ช ๋ น์ด๋ฅผ ์ ๋ ฅํ๋ฉด ํ์ฌ ๊ฐ์ง๊ณ ์๋ Topic์ ๋ฆฌ์คํธํ ์ ์๋ค.
kafka-topics --bootstrap-server kafka1:9092 --list
kafka-topics์ผ๋ก Topic์ ๊ด๋ จ๋ ๋ช ๋ น๋ค์ ์ํํ ์ ์๋ค.
--bootstrap-server๋ ์ฌ์ฉํ Broker๋ฅผ ์ง์ ํ๋ ๋ช ๋ น์ด์ธ๋ฐ, ๋ด ํ๊ฒฝ์์๋ Broker๊ฐ ๋์ปค ์ปจํ ์ด๋์ ๋ค์ด๊ฐ์๋ค.
๋ฐ๋ผ์ Broker ์ปจํ ์ด๋์ ํธ์คํธ ์ด๋ฆ๊ณผ Broker๊ฐ ๋๊ณ ์๋ ํฌํธ๋ฒํธ๋ฅผ ๋ฃ์ด ์ฌ์ฉํ๋ค.
docker-connect-*๋ Kafka Connect๊ฐ ์ฌ์ฉํ๋ ์ผ์ข ์ DB๊ฐ์ Topic๋ค์ด๋ค.
__consumer_offsets์ ์ด๋ค consumer๊ฐ ์ด๋ค topic์ ์ด๋ค ๋ฐ์ดํฐ๊น์ง ์ฝ์๋์ง ๊ธฐ๋ก๋๋ topic์ด๋ค.
์ฌ๊ธฐ์ ๋ฆฌ์คํธ๋ ์ด๋ฆ์ ํ์ธํด์, ์ง์ฐ๊ณ ์ถ์ ํ ํฝ์ ๋ค์ ๋ช ๋ น์ด๋ก ์ง์ธ ์ ์๋ค.
kafka-topics --bootstrap-server kafka1:9092 --delete --topic ํ ํฝ์ด๋ฆ
2. Producer์ Consumer ์ฌ์ฉํด๋ณด๊ธฐ
์์์ ๊ฐ๊ฐ Producer์ Consumer๋ฅผ ์ฌ์ฉํ๋ ๋ฐฉ๋ฒ์ ๋ค์๊ณผ ๊ฐ์ด ์ ๋ ฅํ๋ฉด ๋๋ค.
kafka-console-producer --bootstrap-server kafka1:9092 --topic test_console # Producer
kafka-console-consumer --bootstrap-server kafka1:9092 --topic test_console # Consumer
test_console์ ์ฌ์ฉํ๊ณ ์ ํ๋ topic์ ์ด๋ฆ์ด ๋๋ค.
ํฐ๋ฏธ๋ ๋ ๊ฐ๋ฅผ ๋์์ ์ด์ด์ ๊ฐ๊ฐ ์์ ์ง์ ํ ํ, Producer์ Consumer๋ฅผ ์ ๋ช ๋ น์ด๋ก ํ๋์ฉ ์ฌ์ฉํด๋ณด์.
๊ทธ๋ฆฌ๊ณ Producer์์ ๊ฐ์ ์ ๋ ฅํ ๋๋ง๋ค ์ค์๊ฐ์ผ๋ก Consumer์ ๋ฐ์๋๋ ๊ฒ์ ํ์ธํ ์ ์๋ค!
๐ฆ Topic ์์ฑ ๋ฐ Topic ํ๋ผ๋ฏธํฐ ์ค์
์ด์ ๊น์ง๋ ์๋์ผ๋ก Topic์ด ์์ฑ๋๋๋กํ์ฌ ๋ฐ๋ก ํ๋ผ๋ฏธํฐ๋ฅผ ์ค์ ํ์ง ์์๋ค.
Topic์ ํ๋ผ๋ฏธํฐ๋ฅผ ์ค์ ํจ์ผ๋ก์จ ํํฐ์ ์ ์์ ๋ณต์ ๋ณธ์ ์๋ ์กฐ์ ํ ์ ์๋ค.
Producer ํ์ด์ฌ ํ์ผ์์์ ์๋ก์ด Topic์ ์์ฑํ๊ณ ๋ฐ์ดํฐ๋ฅผ ๋ฃ๋ ์ค์ต์ ํด๋ณด์๋ค.
๋จผ์ producer๋ก ์ฌ์ฉํ ํ์ด์ฌ ํ์ผ์ ํ๋ ๋ง๋ค์๋ค.
fake_person_producer.py
import uuid
import json
from typing import List
from person import Person # ๋ชจ๋ธ์ด ์๋ person.py์์ Person ํด๋์ค ์ฐธ์กฐ
from faker import Faker # ๊ฐ์ง ์ฌ๋ ๋ฐ์ดํฐ๋ฅผ ๋ง๋๋๋ฐ ์ฌ์ฉ
from kafka.admin import NewTopic
from kafka.errors import TopicAlreadyExistsError # Topic์ด ์ด๋ฏธ ์กด์ฌํ ๊ฒฝ์ฐ ์๋ฌ
from kafka import KafkaAdminClient
from kafka.producer import KafkaProducer
# ์๋ก์ด Topic์ ์์ฑํ๋ ํจ์
# ์ธ์๋ ์์๋๋ก Broker ๋ฆฌ์คํธ, Topic ์ด๋ฆ, ํํฐ์
์, ๋ณต์ ๋ณธ ์
def create_topic(bootstrap_servers, name, partitions, replica=1):
client = KafkaAdminClient(bootstrap_servers=bootstrap_servers)
try:
topic = NewTopic(
name=name,
num_partitions=partitions,
replication_factor=replica)
client.create_topics([topic])
except TopicAlreadyExistsError as e: # Topic์ด ์ด๋ฏธ ์กด์ฌํ๋ ๊ฒฝ์ฐ ์๋ฌ ๋ฐ์ํ์ง๋ง, ๊ทธ๋ฅ passํ๋๋ก ๊ตฌํ
print(e)
pass
finally:
client.close()
def main():
topic_name = "fake_people"
bootstrap_servers = ["localhost:9092"]
# fake_people ์ด๋ฆ์ผ๋ก 4๊ฐ์ ํํฐ์
์ ๊ฐ๋ ์๋ก์ด Topic ์์ฑ
create_topic(bootstrap_servers, topic_name, 4)
faker = Faker()
producer = KafkaProducer(
bootstrap_servers=bootstrap_servers,
client_id="Fake_Person_Producer", # Producer์ ์ด๋ฆ ์ฃผ๊ธฐ
)
# faker ๋ผ์ด๋ธ๋ฌ๋ฆฌ๋ฅผ ์ฌ์ฉํด์ ๋๋คํ ์ฌ๋ 100๋ช
๋ฐ์ดํฐ Topic์ ๋ณด๋ด๊ธฐ
for _ in range(100):
person = Person(id=str(uuid.uuid4()), name=faker.name(), title=faker.job().title())
producer.send(
topic=topic_name,
key=person.title.lower().replace(r's+', '-').encode('utf-8'), # ์๋ฌธ์๋ก ๋ฐ๊พธ๊ณ , ๊ณต๋ฐฑ์ด ์ฌ๋ฌ ๊ฐ์ธ ๊ฒฝ์ฐ '-'๋ก ๋ฐ๊ฟ
value=person.json().encode('utf-8')) # json์ผ๋ก ๋ฐ๊พธ๊ณ utf-8๋ก ์ธ์ฝ๋ฉ
producer.flush()
if __name__ == '__main__':
main()
faker๋ก ์์ฑํ title(์ง์ )์ ๊ธฐ์ค์ผ๋ก Key๋ก ๋ง๋ค์๋ค.
๋์ค์ Key๊ฐ ๋์ผํ ๋ฐ์ดํฐ๋ผ๋ฆฌ ๋ฌถ์ฌ ํํฐ์ ์ผ๋ก ๋๋๊ณ , ์ด์ ๊ด๋ จ๋ ํจ์จ์ ์ธ ์์ ์ด ๊ฐ๋ฅํ ์๋ ์์ ๊ฒ์ด๋ค.
person.py
from pydantic import BaseModel
class Person(BaseModel):
id: str
name: str
title: str
์ด ๋๊ฐ์ ํ์ผ์ Kafka๊ฐ ์ค์น๋ ํด๋์ ๋ฃ๊ณ ์คํํด๋ณด์.
python fake_persone_producer.py
์คํ์ด ์๋ฃ๋ ํ, ์น UI์์ fake_people Topic์ ํ์ธํด๋ณด๋ฉด ๋ค์ด๊ฐ ๋ฐ์ดํฐ๋ฅผ ํ์ธํ ์ ์๋ค.
Key, Value ๋ชจ๋ ์ค์ ํ ๊ฐ๋๋ก ๋ค์ด๊ฐ๋ค.
๐ฉ Consumer ์์ฑ ๋ฐ ํ๋ผ๋ฏธํฐ ์ค์
Consumer์ ์๋ณด๋ค Partition์ ์๊ฐ ๋ ๋ง์ ๊ฒฝ์ฐ, Partition์ ๋ผ์ด๋ ๋ก๋น ๋ฐฉ์์ผ๋ก Consumer๋ค์๊ฒ ํ ๋น๋๋ค.
์ด๋, ๊ธฐ๋ณธ์ ์ผ๋ก ํ Partition์ ํ Consumer์๊ฒ๋ง ํ ๋น๋๋ค.
๋จผ์ , ๋ฉ์์ง๋ฅผ ์ฝ๋ ๋ณด์ฅ ๋ฐฉ์์๋ Message Processing Guarantee์๋ ์ธ ๊ฐ์ง๊ฐ ์๋ค.
- Exactly Once : ๊ฐ Message๊ฐ Consumer์๊ฒ ์ ํํ ํ ๋ฒ๋ง ์ ๋ฌ๋๋ค๋ ๊ฒ์ ๋ณด์ฅํ๋ค. > ๊ตฌํ ์ด๋ ค์
- At Least Once : Consumer์๊ฒ ์ ์ด๋ ํ ๋ฒ ์ด์ ์ ๋ฌ๋๋๋ก ๋ณด์ฅํ์ง๋ง, ์ค๋ณต๋ ์๋ ์๋ค.
- At Most Once(Default) : ๋ฉ์์ง๊ฐ ์ ๋ฌ๋ ์๋ ์๊ณ , ์๋ ์๋ ์์ง๋ง ์ค๋ณต ์ ๋ฌ๋๋ ๊ฒฝ์ฐ๋ ์๋ค.
์ด ๋ฐฉ์๋ค ์ค์์ ๋์ ํ๊ฒฝ์ด๋ ๋ฐ์ดํฐ๋ค์ ๊ณ ๋ คํ์ฌ ํ ๊ฐ์ง๋ฅผ ์ ํํด์ ๊ตฌํํ๊ฒ ๋๋ค.
Consumer๋ enable_auto_commit ์ต์ ์ด True์ธ ๊ฒฝ์ฐ์ False ๊ฒฝ์ฐ๋ก ๋๋์ด์ ๋ง๋ค์๋ค.
> enable_auto_commit = True์ธ Consumer์ ๊ฒฝ์ฐ
autocommit_consumer.py
import json
from kafka.consumer import KafkaConsumer
def key_deserializer(key):
return key.decode('utf-8')
# Producer์ Serializer์ ๋ฐ๋๋ก ๋์
def value_deserializer(value):
return json.loads(value.decode('utf-8'))
def main():
topic_name = "fake_people"
bootstrap_servers = ["localhost:9092"]
consumer_group_id = "fake_people_group"
consumer = KafkaConsumer(
bootstrap_servers=bootstrap_servers,
group_id=consumer_group_id,
key_deserializer=key_deserializer,
value_deserializer=value_deserializer,
auto_offset_reset='earliest', # ์ฒ์ ์คํ๋ ๋ ๊ฐ์ฅ ์์ ์๋ ๊ธฐ๋ก๋ถํฐ ์ฝ์ด์ด
enable_auto_commit=True
)
consumer.subscribe([topic_name])
for record in consumer:
print(f"""
Consumed person {record.value} with key '{record.key}'
from partition {record.partition} at offset {record.offset}
""")
if __name__ == '__main__':
main()
subscribeํ Topic๋ค๋ก๋ถํฐ Producer์ ์ง๋ ฌํํ ๋ฐ์ดํฐ๋ฅผ ๋ฐ์ ์ญ์ง๋ ฌํํ๊ณ , printํ๋ค.
auto_commit ์ต์ ์ด True์ด๊ธฐ ๋๋ฌธ์, offset๊ณผ Partition ์ ๋ณด๋ค์ ์์์ Commitํ๋ค.
> enable_auto_commit = False์ธ Consumer์ ๊ฒฝ์ฐ
manualcommit_consumer.py
import json
from kafka import TopicPartition, OffsetAndMetadata # ๋ ๊ฐ์ ์๋ก์ด ๋ชจ๋ ํ์
from kafka.consumer import KafkaConsumer
def key_deserializer(key):
return key.decode('utf-8')
def value_deserializer(value):
return json.loads(value.decode('utf-8'))
def main():
topic_name = "fake_people"
bootstrap_servers = ["localhost:9092"]
consumer_group_id = "manual_fake_people_group" # ์๊น์๋ ๋ค๋ฅธ ๊ทธ๋ฃน
consumer = KafkaConsumer(
bootstrap_servers=bootstrap_servers,
group_id=consumer_group_id,
key_deserializer=key_deserializer,
value_deserializer=value_deserializer,
auto_offset_reset='earliest',
enable_auto_commit=False # autocommit์ด False
)
consumer.subscribe([topic_name])
for record in consumer:
print(f"""
Consumed person {record.value} with key '{record.key}'
from partition {record.partition} at offset {record.offset}
""")
topic_partition = TopicPartition(record.topic, record.partition) # Topic์ ํํฐ์
์ ๋ณด๋ฅผ ์ฝ๊ธฐ
offset = OffsetAndMetadata(record.offset + 1, record.timestamp) # offset ์์น +1ํ๊ณ , timestamp๋ ์
๋ฐ์ดํธ
consumer.commit({
topic_partition: offset # ํํฐ์
์์น์ offset ์ ๋ณด COMMIT
})
if __name__ == '__main__':
main()
์๋ก์ด ๋ ๊ฐ์ง ๋ชจ๋์ ์ถ๊ฐํ๊ณ , ๋ช ์ค์ ๋ ์ถ๊ฐ๋์๋ค.
๊ฐ๋ฐ์๊ฐ ์ง์ Commit์ด ๊ฐ๋ฅํ๋๋ก ์ฝ๋๋ฅผ ์์ฑํด์ผ ํ๋ค.
TopicPartition ๋ชจ๋๋ก partition ์ ๋ณด๋ฅผ ๋ด๊ณ , OffsetAndMetadata ๋ชจ๋๋ก offset ์ ๋ณด๋ฅผ ๋ด์ ๊ฐ๊ฐ Key์ Value๋ก Commit ํ๋ค.
๋์ผํ ๊ฒฐ๊ณผ๊ฐ ๋ํ๋๋ค๋ฉด ์ฑ๊ณต์ด๋ค.
์น UI์์ ์์ฑ๋ Consumer Group์ ํ์ธํ ์ ์๋ค.
autocommit_consumer.py๋ก ์์ฑ๋ fake_people_group์ ์๊น ๋์์ ๋ฉ์ท๊ธฐ ๋๋ฌธ์ State๊ฐ EMPTY๋ก ๋ํ๋๋ค.
manualcommit_consumer.py๋ก ์์ฑ๋ manual_fake_people_group์ ์ง๊ธ๋ ๋์์ ํ๊ณ ์์ด์,
State๊ฐ STABLE์ด๊ณ , Members๋ ํ๋ ์๋ ๊ฒ์ผ๋ก ๋ํ๋๊ณ ์๋ค.
manual_fake_people_group์ Partition ์ ๋ณด์๋ ์ด๋ค ๊ฒฐ๊ณผ๊ฐ ์์๋์ง ๋๋ต ๋ํ๋๋ค.
๐จ ksqlDB
REST API๋ ksql ํด๋ผ์ด์ธํธ ํด์ ์ฌ์ฉํ๋ฉด Topic์ ํ ์ด๋ธ์ฒ๋ผ SQL ์กฐ์์ด ๊ฐ๋ฅํด์ง๋ค.
CREATE STREAM ๋ช ๋ น์ด๋ก ์๋ก์ด Stream์ ํ๋ ์์ฑํด์, ํ ์ด๋ธ์ฒ๋ผ ๋ค๋ฃฐ ์ ์๋ค.
์ด ๋ฐฉ๋ฒ ๋ํ ํ๋์ Consumer๋ก ๋์ํ๋ ๊ฒ์ผ๋ก ์ทจ๊ธํด์, Consumer Group๊ณผ Consumer ID๋ฅผ ์๋์ผ๋ก ๊ฐ๊ฒ ๋๋ค.
ํ์ง๋ง Kafka Topic์ ๋ณ๊ฐ์ ksqlDB๋ฅผ ์ํ Topic์ด ์กด์ฌํ์ฌ ๊ทธ ๋ฐ์ ์ ์ฅ๋๋ฏ๋ก Consumer Group ์ชฝ์ ๋ํ๋์ง๋ ์๋๋ค.
docker ps # ksqldb-server ์ปจํ
์ด๋ ID ์ฐพ๊ธฐ
docker exec -it ContainerID sh # ksqldb-server ์์ ์ ์
ksql # ksql ์ ์
์ฌ๊ธฐ์ ์๊น ์์ฑํ๋ Topic์ ๋ฐ์ดํฐ๋ฅผ ํ์ธํด๋ณด์.
CREATE STREAM my_stream (id STRING, name STRING, title STRING) with (kafka_topic='fake_people', value_format='JSON');
SELECT * FROM my_stream;
fake_people Topic ์ ๋ณด๋ฅผ ํ ๋๋ก id, name, title ๋ฐ์ดํฐ๋ฅผ ๊ฐ์ ธ์จ๋ค.
์ฌ๊ธฐ์ Producer๊ฐ ๋ฐ์ดํฐ๋ฅผ ์์ฑํ ๋์ ์๊ฐ ๋ฐ์ดํฐ๋ฅผ ์๋ก์ด ์ปฌ๋ผ์ผ๋ก ๋ถ์ฌ๋ณด์.
SELECT *, ROWTIME FROM my_stream;
ROWTIME์ ์ฌ์ฉํ๋ฉด ๊ทธ ์ญํ ์ ํด์ค๋ค.
์ฐธ๊ณ ๋ก ๋ค์ EMIT CHANGES ์ ์ถ๊ฐํ๋ฉด ์๊น ์ฝ์ด์จ ๋ฐ์ดํฐ ๋ค๋ก ์๋ก์ด ๋ฐ์ดํฐ๋ง ์ฝ์ด์จ๋ค.