์ผ | ์ | ํ | ์ | ๋ชฉ | ๊ธ | ํ |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | 5 | 6 | 7 |
8 | 9 | 10 | 11 | 12 | 13 | 14 |
15 | 16 | 17 | 18 | 19 | 20 | 21 |
22 | 23 | 24 | 25 | 26 | 27 | 28 |
29 | 30 | 31 |
- Spark Caching
- off heap memory
- Spark
- Salting
- Kubernetes
- Dag
- k8s
- Kafka
- Airflow
- AQE
- Speculative Execution
- redshift
- backfill
- SQL
- spark executor memory
- Spark SQL
- Docker
- DataFrame Hint
- topic
- mysql
- CI/CD
- ๋ฐ์ดํฐ ํ์ดํ๋ผ์ธ
- Spark Partitioning
- Spark ์ค์ต
- colab
- KDT_TIL
- disk spill
- aws
- ๋น ๋ฐ์ดํฐ
- etl
- Today
- Total
JUST DO IT!
Kafka Producer ๋ฐ Consumer ์ค์ตํด๋ณด๊ธฐ with Python (+ksqlDB) - TIL230713 ๋ณธ๋ฌธ
Kafka Producer ๋ฐ Consumer ์ค์ตํด๋ณด๊ธฐ with Python (+ksqlDB) - TIL230713
sunhokimDev 2023. 7. 14. 22:19๐ KDT WEEK 15 DAY 4 TIL
- Kafka ๊ธฐ๋ณธ ํ๋ก๊ทธ๋๋ฐ
- Topic ์์ฑ ๋ฐ ํ๋ผ๋ฏธํฐ ์ค์
- Consumer ์์ฑ ๋ฐ ํ๋ผ๋ฏธํฐ ์ค์
- ksqlDB
๐ฅ Kafka ๊ธฐ๋ณธ ํ๋ก๊ทธ๋๋ฐ
๋์ Kafka ์ค์น ํ๊ฒฝ์ conduktor์ฌ๊ฐ ์ ๊ณตํ๋ Repo๋ฅผ git cloneํด์ ๊ฐ์ ธ์๋ค.
์ด ๊ธ์ ์ด ํ๊ฒฝ์ ๊ธฐ๋ฐ์ผ๋ก ์ค๋ช ํ๋ค.
์ฐธ๊ณ : https://sunhokimdev.tistory.com/66
ํฐ๋ฏธ๋์์ ๊ฐ๋จํ๊ฒ Kafka๋ฅผ ์ฌ์ฉํด๋ณด๋ ์ค์ต์ ํด๋ณด์.
1. Topics ๋ฆฌ์คํธํ๊ธฐ
docker ps # Broker์ ํด๋นํ๋ ์ปจํ
์ด๋์ ID ํ์ธ
docker exec -it Broker_ContainerID sh # ์๋ก ์คํ
docker-compose.yml์ ์์ฑ๋ Broker์ ์ด๋ฆ์ ์ฐพ์ ๊ทธ ์ปจํ ์ด๋ ID๋ฅผ ์ฌ์ฉํ๋ฉด ๋๋ค.
๋์ ๊ฐ์ ํ๊ฒฝ์ผ๋ก Kafka๋ฅผ ๊ตฌ์ถํ๋ค๋ฉด, kafka1์ ์ปจํ ์ด๋ ID๋ฅผ ์ฌ์ฉํ๋ฉด ๋๋ค.
์์์ ๋ค์์ ๋ช ๋ น์ด๋ฅผ ์ ๋ ฅํ๋ฉด ํ์ฌ ๊ฐ์ง๊ณ ์๋ Topic์ ๋ฆฌ์คํธํ ์ ์๋ค.
kafka-topics --bootstrap-server kafka1:9092 --list
kafka-topics์ผ๋ก Topic์ ๊ด๋ จ๋ ๋ช ๋ น๋ค์ ์ํํ ์ ์๋ค.
--bootstrap-server๋ ์ฌ์ฉํ Broker๋ฅผ ์ง์ ํ๋ ๋ช ๋ น์ด์ธ๋ฐ, ๋ด ํ๊ฒฝ์์๋ Broker๊ฐ ๋์ปค ์ปจํ ์ด๋์ ๋ค์ด๊ฐ์๋ค.
๋ฐ๋ผ์ Broker ์ปจํ ์ด๋์ ํธ์คํธ ์ด๋ฆ๊ณผ Broker๊ฐ ๋๊ณ ์๋ ํฌํธ๋ฒํธ๋ฅผ ๋ฃ์ด ์ฌ์ฉํ๋ค.
docker-connect-*๋ Kafka Connect๊ฐ ์ฌ์ฉํ๋ ์ผ์ข ์ DB๊ฐ์ Topic๋ค์ด๋ค.
__consumer_offsets์ ์ด๋ค consumer๊ฐ ์ด๋ค topic์ ์ด๋ค ๋ฐ์ดํฐ๊น์ง ์ฝ์๋์ง ๊ธฐ๋ก๋๋ topic์ด๋ค.
์ฌ๊ธฐ์ ๋ฆฌ์คํธ๋ ์ด๋ฆ์ ํ์ธํด์, ์ง์ฐ๊ณ ์ถ์ ํ ํฝ์ ๋ค์ ๋ช ๋ น์ด๋ก ์ง์ธ ์ ์๋ค.
kafka-topics --bootstrap-server kafka1:9092 --delete --topic ํ ํฝ์ด๋ฆ
2. Producer์ Consumer ์ฌ์ฉํด๋ณด๊ธฐ
์์์ ๊ฐ๊ฐ Producer์ Consumer๋ฅผ ์ฌ์ฉํ๋ ๋ฐฉ๋ฒ์ ๋ค์๊ณผ ๊ฐ์ด ์ ๋ ฅํ๋ฉด ๋๋ค.
kafka-console-producer --bootstrap-server kafka1:9092 --topic test_console # Producer
kafka-console-consumer --bootstrap-server kafka1:9092 --topic test_console # Consumer
test_console์ ์ฌ์ฉํ๊ณ ์ ํ๋ topic์ ์ด๋ฆ์ด ๋๋ค.
ํฐ๋ฏธ๋ ๋ ๊ฐ๋ฅผ ๋์์ ์ด์ด์ ๊ฐ๊ฐ ์์ ์ง์ ํ ํ, Producer์ Consumer๋ฅผ ์ ๋ช ๋ น์ด๋ก ํ๋์ฉ ์ฌ์ฉํด๋ณด์.
๊ทธ๋ฆฌ๊ณ Producer์์ ๊ฐ์ ์ ๋ ฅํ ๋๋ง๋ค ์ค์๊ฐ์ผ๋ก Consumer์ ๋ฐ์๋๋ ๊ฒ์ ํ์ธํ ์ ์๋ค!
๐ฆ Topic ์์ฑ ๋ฐ Topic ํ๋ผ๋ฏธํฐ ์ค์
์ด์ ๊น์ง๋ ์๋์ผ๋ก Topic์ด ์์ฑ๋๋๋กํ์ฌ ๋ฐ๋ก ํ๋ผ๋ฏธํฐ๋ฅผ ์ค์ ํ์ง ์์๋ค.
Topic์ ํ๋ผ๋ฏธํฐ๋ฅผ ์ค์ ํจ์ผ๋ก์จ ํํฐ์ ์ ์์ ๋ณต์ ๋ณธ์ ์๋ ์กฐ์ ํ ์ ์๋ค.
Producer ํ์ด์ฌ ํ์ผ์์์ ์๋ก์ด Topic์ ์์ฑํ๊ณ ๋ฐ์ดํฐ๋ฅผ ๋ฃ๋ ์ค์ต์ ํด๋ณด์๋ค.
๋จผ์ producer๋ก ์ฌ์ฉํ ํ์ด์ฌ ํ์ผ์ ํ๋ ๋ง๋ค์๋ค.
fake_person_producer.py
import uuid
import json
from typing import List
from person import Person # ๋ชจ๋ธ์ด ์๋ person.py์์ Person ํด๋์ค ์ฐธ์กฐ
from faker import Faker # ๊ฐ์ง ์ฌ๋ ๋ฐ์ดํฐ๋ฅผ ๋ง๋๋๋ฐ ์ฌ์ฉ
from kafka.admin import NewTopic
from kafka.errors import TopicAlreadyExistsError # Topic์ด ์ด๋ฏธ ์กด์ฌํ ๊ฒฝ์ฐ ์๋ฌ
from kafka import KafkaAdminClient
from kafka.producer import KafkaProducer
# ์๋ก์ด Topic์ ์์ฑํ๋ ํจ์
# ์ธ์๋ ์์๋๋ก Broker ๋ฆฌ์คํธ, Topic ์ด๋ฆ, ํํฐ์
์, ๋ณต์ ๋ณธ ์
def create_topic(bootstrap_servers, name, partitions, replica=1):
client = KafkaAdminClient(bootstrap_servers=bootstrap_servers)
try:
topic = NewTopic(
name=name,
num_partitions=partitions,
replication_factor=replica)
client.create_topics([topic])
except TopicAlreadyExistsError as e: # Topic์ด ์ด๋ฏธ ์กด์ฌํ๋ ๊ฒฝ์ฐ ์๋ฌ ๋ฐ์ํ์ง๋ง, ๊ทธ๋ฅ passํ๋๋ก ๊ตฌํ
print(e)
pass
finally:
client.close()
def main():
topic_name = "fake_people"
bootstrap_servers = ["localhost:9092"]
# fake_people ์ด๋ฆ์ผ๋ก 4๊ฐ์ ํํฐ์
์ ๊ฐ๋ ์๋ก์ด Topic ์์ฑ
create_topic(bootstrap_servers, topic_name, 4)
faker = Faker()
producer = KafkaProducer(
bootstrap_servers=bootstrap_servers,
client_id="Fake_Person_Producer", # Producer์ ์ด๋ฆ ์ฃผ๊ธฐ
)
# faker ๋ผ์ด๋ธ๋ฌ๋ฆฌ๋ฅผ ์ฌ์ฉํด์ ๋๋คํ ์ฌ๋ 100๋ช
๋ฐ์ดํฐ Topic์ ๋ณด๋ด๊ธฐ
for _ in range(100):
person = Person(id=str(uuid.uuid4()), name=faker.name(), title=faker.job().title())
producer.send(
topic=topic_name,
key=person.title.lower().replace(r's+', '-').encode('utf-8'), # ์๋ฌธ์๋ก ๋ฐ๊พธ๊ณ , ๊ณต๋ฐฑ์ด ์ฌ๋ฌ ๊ฐ์ธ ๊ฒฝ์ฐ '-'๋ก ๋ฐ๊ฟ
value=person.json().encode('utf-8')) # json์ผ๋ก ๋ฐ๊พธ๊ณ utf-8๋ก ์ธ์ฝ๋ฉ
producer.flush()
if __name__ == '__main__':
main()
faker๋ก ์์ฑํ title(์ง์ )์ ๊ธฐ์ค์ผ๋ก Key๋ก ๋ง๋ค์๋ค.
๋์ค์ Key๊ฐ ๋์ผํ ๋ฐ์ดํฐ๋ผ๋ฆฌ ๋ฌถ์ฌ ํํฐ์ ์ผ๋ก ๋๋๊ณ , ์ด์ ๊ด๋ จ๋ ํจ์จ์ ์ธ ์์ ์ด ๊ฐ๋ฅํ ์๋ ์์ ๊ฒ์ด๋ค.
person.py
from pydantic import BaseModel
class Person(BaseModel):
id: str
name: str
title: str
์ด ๋๊ฐ์ ํ์ผ์ Kafka๊ฐ ์ค์น๋ ํด๋์ ๋ฃ๊ณ ์คํํด๋ณด์.
python fake_persone_producer.py
์คํ์ด ์๋ฃ๋ ํ, ์น UI์์ fake_people Topic์ ํ์ธํด๋ณด๋ฉด ๋ค์ด๊ฐ ๋ฐ์ดํฐ๋ฅผ ํ์ธํ ์ ์๋ค.
Key, Value ๋ชจ๋ ์ค์ ํ ๊ฐ๋๋ก ๋ค์ด๊ฐ๋ค.
๐ฉ Consumer ์์ฑ ๋ฐ ํ๋ผ๋ฏธํฐ ์ค์
Consumer์ ์๋ณด๋ค Partition์ ์๊ฐ ๋ ๋ง์ ๊ฒฝ์ฐ, Partition์ ๋ผ์ด๋ ๋ก๋น ๋ฐฉ์์ผ๋ก Consumer๋ค์๊ฒ ํ ๋น๋๋ค.
์ด๋, ๊ธฐ๋ณธ์ ์ผ๋ก ํ Partition์ ํ Consumer์๊ฒ๋ง ํ ๋น๋๋ค.
๋จผ์ , ๋ฉ์์ง๋ฅผ ์ฝ๋ ๋ณด์ฅ ๋ฐฉ์์๋ Message Processing Guarantee์๋ ์ธ ๊ฐ์ง๊ฐ ์๋ค.
- Exactly Once : ๊ฐ Message๊ฐ Consumer์๊ฒ ์ ํํ ํ ๋ฒ๋ง ์ ๋ฌ๋๋ค๋ ๊ฒ์ ๋ณด์ฅํ๋ค. > ๊ตฌํ ์ด๋ ค์
- At Least Once : Consumer์๊ฒ ์ ์ด๋ ํ ๋ฒ ์ด์ ์ ๋ฌ๋๋๋ก ๋ณด์ฅํ์ง๋ง, ์ค๋ณต๋ ์๋ ์๋ค.
- At Most Once(Default) : ๋ฉ์์ง๊ฐ ์ ๋ฌ๋ ์๋ ์๊ณ , ์๋ ์๋ ์์ง๋ง ์ค๋ณต ์ ๋ฌ๋๋ ๊ฒฝ์ฐ๋ ์๋ค.
์ด ๋ฐฉ์๋ค ์ค์์ ๋์ ํ๊ฒฝ์ด๋ ๋ฐ์ดํฐ๋ค์ ๊ณ ๋ คํ์ฌ ํ ๊ฐ์ง๋ฅผ ์ ํํด์ ๊ตฌํํ๊ฒ ๋๋ค.
Consumer๋ enable_auto_commit ์ต์ ์ด True์ธ ๊ฒฝ์ฐ์ False ๊ฒฝ์ฐ๋ก ๋๋์ด์ ๋ง๋ค์๋ค.
> enable_auto_commit = True์ธ Consumer์ ๊ฒฝ์ฐ
autocommit_consumer.py
import json
from kafka.consumer import KafkaConsumer
def key_deserializer(key):
return key.decode('utf-8')
# Producer์ Serializer์ ๋ฐ๋๋ก ๋์
def value_deserializer(value):
return json.loads(value.decode('utf-8'))
def main():
topic_name = "fake_people"
bootstrap_servers = ["localhost:9092"]
consumer_group_id = "fake_people_group"
consumer = KafkaConsumer(
bootstrap_servers=bootstrap_servers,
group_id=consumer_group_id,
key_deserializer=key_deserializer,
value_deserializer=value_deserializer,
auto_offset_reset='earliest', # ์ฒ์ ์คํ๋ ๋ ๊ฐ์ฅ ์์ ์๋ ๊ธฐ๋ก๋ถํฐ ์ฝ์ด์ด
enable_auto_commit=True
)
consumer.subscribe([topic_name])
for record in consumer:
print(f"""
Consumed person {record.value} with key '{record.key}'
from partition {record.partition} at offset {record.offset}
""")
if __name__ == '__main__':
main()
subscribeํ Topic๋ค๋ก๋ถํฐ Producer์ ์ง๋ ฌํํ ๋ฐ์ดํฐ๋ฅผ ๋ฐ์ ์ญ์ง๋ ฌํํ๊ณ , printํ๋ค.
auto_commit ์ต์ ์ด True์ด๊ธฐ ๋๋ฌธ์, offset๊ณผ Partition ์ ๋ณด๋ค์ ์์์ Commitํ๋ค.
> enable_auto_commit = False์ธ Consumer์ ๊ฒฝ์ฐ
manualcommit_consumer.py
import json
from kafka import TopicPartition, OffsetAndMetadata # ๋ ๊ฐ์ ์๋ก์ด ๋ชจ๋ ํ์
from kafka.consumer import KafkaConsumer
def key_deserializer(key):
return key.decode('utf-8')
def value_deserializer(value):
return json.loads(value.decode('utf-8'))
def main():
topic_name = "fake_people"
bootstrap_servers = ["localhost:9092"]
consumer_group_id = "manual_fake_people_group" # ์๊น์๋ ๋ค๋ฅธ ๊ทธ๋ฃน
consumer = KafkaConsumer(
bootstrap_servers=bootstrap_servers,
group_id=consumer_group_id,
key_deserializer=key_deserializer,
value_deserializer=value_deserializer,
auto_offset_reset='earliest',
enable_auto_commit=False # autocommit์ด False
)
consumer.subscribe([topic_name])
for record in consumer:
print(f"""
Consumed person {record.value} with key '{record.key}'
from partition {record.partition} at offset {record.offset}
""")
topic_partition = TopicPartition(record.topic, record.partition) # Topic์ ํํฐ์
์ ๋ณด๋ฅผ ์ฝ๊ธฐ
offset = OffsetAndMetadata(record.offset + 1, record.timestamp) # offset ์์น +1ํ๊ณ , timestamp๋ ์
๋ฐ์ดํธ
consumer.commit({
topic_partition: offset # ํํฐ์
์์น์ offset ์ ๋ณด COMMIT
})
if __name__ == '__main__':
main()
์๋ก์ด ๋ ๊ฐ์ง ๋ชจ๋์ ์ถ๊ฐํ๊ณ , ๋ช ์ค์ ๋ ์ถ๊ฐ๋์๋ค.
๊ฐ๋ฐ์๊ฐ ์ง์ Commit์ด ๊ฐ๋ฅํ๋๋ก ์ฝ๋๋ฅผ ์์ฑํด์ผ ํ๋ค.
TopicPartition ๋ชจ๋๋ก partition ์ ๋ณด๋ฅผ ๋ด๊ณ , OffsetAndMetadata ๋ชจ๋๋ก offset ์ ๋ณด๋ฅผ ๋ด์ ๊ฐ๊ฐ Key์ Value๋ก Commit ํ๋ค.
๋์ผํ ๊ฒฐ๊ณผ๊ฐ ๋ํ๋๋ค๋ฉด ์ฑ๊ณต์ด๋ค.
์น UI์์ ์์ฑ๋ Consumer Group์ ํ์ธํ ์ ์๋ค.
autocommit_consumer.py๋ก ์์ฑ๋ fake_people_group์ ์๊น ๋์์ ๋ฉ์ท๊ธฐ ๋๋ฌธ์ State๊ฐ EMPTY๋ก ๋ํ๋๋ค.
manualcommit_consumer.py๋ก ์์ฑ๋ manual_fake_people_group์ ์ง๊ธ๋ ๋์์ ํ๊ณ ์์ด์,
State๊ฐ STABLE์ด๊ณ , Members๋ ํ๋ ์๋ ๊ฒ์ผ๋ก ๋ํ๋๊ณ ์๋ค.
manual_fake_people_group์ Partition ์ ๋ณด์๋ ์ด๋ค ๊ฒฐ๊ณผ๊ฐ ์์๋์ง ๋๋ต ๋ํ๋๋ค.
๐จ ksqlDB
REST API๋ ksql ํด๋ผ์ด์ธํธ ํด์ ์ฌ์ฉํ๋ฉด Topic์ ํ ์ด๋ธ์ฒ๋ผ SQL ์กฐ์์ด ๊ฐ๋ฅํด์ง๋ค.
CREATE STREAM ๋ช ๋ น์ด๋ก ์๋ก์ด Stream์ ํ๋ ์์ฑํด์, ํ ์ด๋ธ์ฒ๋ผ ๋ค๋ฃฐ ์ ์๋ค.
์ด ๋ฐฉ๋ฒ ๋ํ ํ๋์ Consumer๋ก ๋์ํ๋ ๊ฒ์ผ๋ก ์ทจ๊ธํด์, Consumer Group๊ณผ Consumer ID๋ฅผ ์๋์ผ๋ก ๊ฐ๊ฒ ๋๋ค.
ํ์ง๋ง Kafka Topic์ ๋ณ๊ฐ์ ksqlDB๋ฅผ ์ํ Topic์ด ์กด์ฌํ์ฌ ๊ทธ ๋ฐ์ ์ ์ฅ๋๋ฏ๋ก Consumer Group ์ชฝ์ ๋ํ๋์ง๋ ์๋๋ค.
docker ps # ksqldb-server ์ปจํ
์ด๋ ID ์ฐพ๊ธฐ
docker exec -it ContainerID sh # ksqldb-server ์์ ์ ์
ksql # ksql ์ ์
์ฌ๊ธฐ์ ์๊น ์์ฑํ๋ Topic์ ๋ฐ์ดํฐ๋ฅผ ํ์ธํด๋ณด์.
CREATE STREAM my_stream (id STRING, name STRING, title STRING) with (kafka_topic='fake_people', value_format='JSON');
SELECT * FROM my_stream;
fake_people Topic ์ ๋ณด๋ฅผ ํ ๋๋ก id, name, title ๋ฐ์ดํฐ๋ฅผ ๊ฐ์ ธ์จ๋ค.
์ฌ๊ธฐ์ Producer๊ฐ ๋ฐ์ดํฐ๋ฅผ ์์ฑํ ๋์ ์๊ฐ ๋ฐ์ดํฐ๋ฅผ ์๋ก์ด ์ปฌ๋ผ์ผ๋ก ๋ถ์ฌ๋ณด์.
SELECT *, ROWTIME FROM my_stream;
ROWTIME์ ์ฌ์ฉํ๋ฉด ๊ทธ ์ญํ ์ ํด์ค๋ค.
์ฐธ๊ณ ๋ก ๋ค์ EMIT CHANGES ์ ์ถ๊ฐํ๋ฉด ์๊น ์ฝ์ด์จ ๋ฐ์ดํฐ ๋ค๋ก ์๋ก์ด ๋ฐ์ดํฐ๋ง ์ฝ์ด์จ๋ค.
'TIL' ์นดํ ๊ณ ๋ฆฌ์ ๋ค๋ฅธ ๊ธ
๋จธ์ ๋ฌ๋์ ์ํ ๊ธฐ์ด ์ ํ ๋์ ์์๋ณด๊ธฐ - TIL230718 (0) | 2023.07.20 |
---|---|
jupyter์์ ๋จธ์ ๋ฌ๋ End to End ์ค์ตํด๋ณด๊ธฐ - TIL230717 (0) | 2023.07.20 |
Kafka ์์๋ณด๊ณ ์ค์นํด๋ณด๊ธฐ - TIL230712 (0) | 2023.07.14 |
๋น ๋ฐ์ดํฐ์ ์ค์๊ฐ ์ฒ๋ฆฌ -TIL230710 (0) | 2023.07.11 |
Spark์์ Parquet ๋ค๋ฃจ๊ธฐ ์ค์ต + Execution Plan ์์๋ณด๊ธฐ - TIL230706 (0) | 2023.07.09 |