JUST DO IT!

Hands-on with Kafka Producer and Consumer in Python (+ksqlDB) - TIL230713


sunhokimDev 2023. 7. 14. 22:19

📚 KDT WEEK 15 DAY 4 TIL

  • Kafka basic programming
  • Creating a Topic and setting its parameters
  • Creating a Consumer and setting its parameters
  • ksqlDB

🟥 Kafka basic programming

 

My Kafka environment comes from the repo provided by Conduktor, which I pulled with git clone.

This post assumes that environment.

Reference: https://sunhokimdev.tistory.com/66 (Getting to Know and Installing Kafka - TIL230712)

 

Let's try out Kafka from the terminal with a simple exercise.

 

1. Listing Topics

 

docker ps # Check the ID of the container running the Broker
docker exec -it Broker_ContainerID sh # Open a shell in that container

 

Look up the Broker's name in docker-compose.yml and use the ID of that container.

If you set up Kafka with the same environment as mine, use the container ID of kafka1.

 

Connected to the Broker container's shell.

 

Entering the following command in the shell lists the Topics that currently exist.

kafka-topics --bootstrap-server kafka1:9092 --list

 

kafka-topics runs Topic-related commands.

--bootstrap-server is the option that specifies which Broker to use. In my environment the Broker runs inside a Docker container,

so I pass the Broker container's hostname and the port the Broker is listening on.

 

The output shows which topics exist.

 

The docker-connect-* Topics are used by Kafka Connect as a kind of internal database.

__consumer_offsets is the Topic that records how far each consumer has read in each topic.
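To make the idea of "how far each consumer has read" concrete, here is a minimal in-memory sketch. It does not talk to Kafka at all: OffsetStore and poll are hypothetical names invented for this illustration, and a plain Python list stands in for a single-partition topic.

```python
class OffsetStore:
    """Toy stand-in for __consumer_offsets: (group, topic, partition) -> next offset."""

    def __init__(self):
        self._next_offset = {}

    def committed(self, group, topic, partition):
        # A group that has never committed starts from offset 0 in this toy model.
        return self._next_offset.get((group, topic, partition), 0)

    def commit(self, group, topic, partition, next_offset):
        self._next_offset[(group, topic, partition)] = next_offset


def poll(store, log, group, topic="test_console", partition=0, max_records=2):
    """Read up to max_records from where the group left off, then commit."""
    start = store.committed(group, topic, partition)
    batch = log[start:start + max_records]
    store.commit(group, topic, partition, start + len(batch))
    return batch


log = ["a", "b", "c"]  # stand-in for the records in one partition
store = OffsetStore()
print(poll(store, log, "group-1"))  # first two records
print(poll(store, log, "group-1"))  # continues after the committed offset
print(poll(store, log, "group-2"))  # a different group starts from the beginning
```

Each group keeps its own position, which is why two groups can read the same topic independently.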

 

Check the names in this list; any topic you want to delete can be removed with the following command.

 

kafka-topics --bootstrap-server kafka1:9092 --delete --topic <topic-name>

 

2. Using a Producer and a Consumer

 

From the shell, start a Producer and a Consumer with the following commands.

 

kafka-console-producer --bootstrap-server kafka1:9092 --topic test_console # Producer
kafka-console-consumer --bootstrap-server kafka1:9092 --topic test_console # Consumer

 

test_console is the name of the topic to use.

Open two terminals, enter the Broker shell in each, and run the Producer in one and the Consumer in the other with the commands above.

Each value typed into the Producer then shows up in the Consumer in real time!

 

Producer

 

Consumer

 

The data written to the Topic is also visible in the web UI.

 


 

🟦 Creating a Topic and setting its parameters

So far the Topics were created automatically, so I never set any parameters.

Setting Topic parameters lets you control things like the number of partitions and the number of replicas.

 

As an exercise, I created a new Topic from inside a Python Producer script and wrote data to it.

 

First, I created a Python file to act as the producer.

 

fake_person_producer.py

import re
import uuid

from faker import Faker  # generates fake person data
from kafka import KafkaAdminClient
from kafka.admin import NewTopic
from kafka.errors import TopicAlreadyExistsError  # raised when the Topic already exists
from kafka.producer import KafkaProducer

from person import Person  # Person model defined in person.py


# Create a new Topic.
# Arguments, in order: Broker list, Topic name, number of partitions, replication factor
def create_topic(bootstrap_servers, name, partitions, replica=1):
    client = KafkaAdminClient(bootstrap_servers=bootstrap_servers)
    try:
        topic = NewTopic(
            name=name,
            num_partitions=partitions,
            replication_factor=replica)
        client.create_topics([topic])
    except TopicAlreadyExistsError as e:  # the Topic already exists: report it and carry on
        print(e)
    finally:
        client.close()


def main():
    topic_name = "fake_people"
    bootstrap_servers = ["localhost:9092"]

    # Create a new Topic named fake_people with 4 partitions
    create_topic(bootstrap_servers, topic_name, 4)

    faker = Faker()
    producer = KafkaProducer(
        bootstrap_servers=bootstrap_servers,
        client_id="Fake_Person_Producer",  # give the Producer a name
    )

    # Use faker to send 100 random people to the Topic
    for _ in range(100):
        person = Person(id=str(uuid.uuid4()), name=faker.name(), title=faker.job().title())

        producer.send(
            topic=topic_name,
            # lowercase the title and replace each run of whitespace with '-'
            # (str.replace does not understand regex patterns, so re.sub is used)
            key=re.sub(r'\s+', '-', person.title.lower()).encode('utf-8'),
            # serialize to JSON, then encode as UTF-8
            # (.json() is the pydantic v1 API; pydantic v2 prefers model_dump_json())
            value=person.json().encode('utf-8'))

    producer.flush()

if __name__ == '__main__':
    main()

 

The Key is derived from the title (job) generated by faker.

Records with the same Key end up in the same partition, which may later make it possible to process related data more efficiently.
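The sketch below shows why identical Keys land in the same partition: the Key is hashed and the hash is reduced modulo the number of partitions. This is only an illustration under assumptions. normalize_key and pick_partition are hypothetical helpers, and zlib.crc32 is a stand-in hash; Kafka's default partitioner uses murmur2, so the actual partition numbers will differ.

```python
import re
import zlib


def normalize_key(title: str) -> bytes:
    """Lowercase the title and collapse each run of whitespace into one hyphen."""
    return re.sub(r"\s+", "-", title.strip().lower()).encode("utf-8")


def pick_partition(key: bytes, num_partitions: int) -> int:
    """Hash the key and reduce it modulo the partition count.

    zlib.crc32 is a stand-in here, not the hash Kafka actually uses.
    """
    return zlib.crc32(key) % num_partitions


for title in ["Data Engineer", "data   engineer", "Civil Engineer"]:
    key = normalize_key(title)
    print(key, "->", pick_partition(key, 4))
```

The first two titles normalize to the same Key, so they always map to the same partition number, whatever that number happens to be.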

 

person.py

from pydantic import BaseModel

class Person(BaseModel):
    id: str
    name: str
    title: str

 

Put these two files in the folder where Kafka is installed and run the producer.

 

python fake_person_producer.py

 

The result as shown in the web UI

 

Once the run finishes, open the fake_people Topic in the web UI to see the data that was written.

Both the Key and the Value went in exactly as configured.

 


  

🟩 Creating a Consumer and setting its parameters

 

When there are more Partitions than Consumers, the Partitions are assigned to the Consumers in round-robin fashion.

By default, each Partition is assigned to only one Consumer within a Consumer Group.
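A round-robin assignment like the one described above can be sketched in a few lines. This is a simplified illustration, not Kafka's own implementation: assign_round_robin is a hypothetical function, and in a real cluster the assignment is negotiated by the consumer group using the client's configured assignment strategy.

```python
def assign_round_robin(partitions, consumers):
    """Deal partitions out to consumers one at a time, like dealing cards."""
    assignment = {consumer: [] for consumer in consumers}
    for i, partition in enumerate(sorted(partitions)):
        owner = consumers[i % len(consumers)]
        assignment[owner].append(partition)
    return assignment


# 4 partitions (as in the fake_people Topic) shared by 2 consumers
print(assign_round_robin(range(4), ["consumer-a", "consumer-b"]))
```

Every partition appears in exactly one consumer's list, which matches the rule that a Partition has a single owner within the group.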

 

First, there are three kinds of Message Processing Guarantee that describe how messages are delivered to a Consumer.

 

  • Exactly Once : Guarantees that each Message is delivered to the Consumer exactly once. > Hard to implement
  • At Least Once : Guarantees that each Message is delivered to the Consumer at least once, but duplicates are possible.
  • At Most Once(Default) : A Message may or may not be delivered, but it is never delivered more than once.
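One common way to see the difference between At Most Once and At Least Once is the order of "commit the offset" and "process the message" on the Consumer side. The simulation below is a sketch under that assumption; consume is a hypothetical function, no Kafka is involved, and the "crash" is simulated exactly once, between the two steps.

```python
def consume(messages, commit_first, crash_at):
    """Simulate a consumer that crashes once between its two steps, then restarts.

    commit_first=True  -> commit, then process (at-most-once behaviour)
    commit_first=False -> process, then commit (at-least-once behaviour)
    """
    committed = 0      # next offset to read after a restart
    processed = []     # messages whose processing actually happened
    crashed = False
    while committed < len(messages):
        offset = committed
        steps = ["commit", "process"] if commit_first else ["process", "commit"]
        for i, step in enumerate(steps):
            if step == "commit":
                committed = offset + 1
            else:
                processed.append(messages[offset])
            # Crash once, right after the first step for the chosen offset.
            if i == 0 and offset == crash_at and not crashed:
                crashed = True
                break  # restart: resume from the last committed offset
    return processed


print(consume(["a", "b", "c"], commit_first=True, crash_at=1))   # "b" is lost
print(consume(["a", "b", "c"], commit_first=False, crash_at=1))  # "b" is processed twice
```

Committing first can lose a message but never repeats one; processing first never loses a message but can repeat one.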

 

Message Processing Guarantee

 

You pick one of these and implement it, depending on your environment and data.

 

I wrote two Consumers: one with the enable_auto_commit option set to True and one with it set to False.

 

> Consumer with enable_auto_commit = True

autocommit_consumer.py

import json
from kafka.consumer import KafkaConsumer


def key_deserializer(key):
    return key.decode('utf-8')


# Works as the inverse of the Producer's serializer
def value_deserializer(value):
    return json.loads(value.decode('utf-8'))


def main():
    topic_name = "fake_people"
    bootstrap_servers = ["localhost:9092"]
    consumer_group_id = "fake_people_group"

    consumer = KafkaConsumer(
        bootstrap_servers=bootstrap_servers,
        group_id=consumer_group_id,
        key_deserializer=key_deserializer,
        value_deserializer=value_deserializer,
        auto_offset_reset='earliest', # on the first run, start reading from the earliest record
        enable_auto_commit=True
    )

    consumer.subscribe([topic_name])
    for record in consumer:
        print(f"""
            Consumed person {record.value} with key '{record.key}'
            from partition {record.partition} at offset {record.offset}
        """)


if __name__ == '__main__':
    main()

 

The Consumer receives the data that the Producer serialized from the subscribed Topics, deserializes it, and prints it.

Because enable_auto_commit is True, the Consumer commits the offset for each Partition on its own.
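The serialize/deserialize round trip mentioned above can be checked without a Broker. The sketch below reuses the two deserializers from autocommit_consumer.py; the sample person dict and the raw_key/raw_value names are made up for illustration and stand in for what the Producer would send.

```python
import json


def key_deserializer(key: bytes) -> str:
    return key.decode("utf-8")


def value_deserializer(value: bytes) -> dict:
    return json.loads(value.decode("utf-8"))


# What the Producer side does: text -> UTF-8 bytes, object -> JSON -> UTF-8 bytes
person = {"id": "123", "name": "Jane Doe", "title": "Data Engineer"}  # made-up sample
raw_key = "data-engineer".encode("utf-8")
raw_value = json.dumps(person).encode("utf-8")

# What the Consumer side does: the same steps in reverse
print(key_deserializer(raw_key))
print(value_deserializer(raw_value))
```

If the deserializers do not mirror the Producer's encoding, the Consumer fails while decoding rather than while reading, so it helps to test the pair together like this.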

 

Because the offset is updated automatically, the offsets differ on every run.

 

> Consumer with enable_auto_commit = False

manualcommit_consumer.py

import json

from kafka import TopicPartition, OffsetAndMetadata # two additional classes are needed
from kafka.consumer import KafkaConsumer


def key_deserializer(key):
    return key.decode('utf-8')


def value_deserializer(value):
    return json.loads(value.decode('utf-8'))


def main():
    topic_name = "fake_people"
    bootstrap_servers = ["localhost:9092"]
    consumer_group_id = "manual_fake_people_group" # a different group from before

    consumer = KafkaConsumer(
        bootstrap_servers=bootstrap_servers,
        group_id=consumer_group_id,
        key_deserializer=key_deserializer,
        value_deserializer=value_deserializer,
        auto_offset_reset='earliest',
        enable_auto_commit=False # auto-commit turned off
    )

    consumer.subscribe([topic_name])
    for record in consumer:
        print(f"""
            Consumed person {record.value} with key '{record.key}'
            from partition {record.partition} at offset {record.offset}
        """)

        topic_partition = TopicPartition(record.topic, record.partition) # identify the Topic partition this record came from
        offset = OffsetAndMetadata(record.offset + 1, record.timestamp) # commit the next offset to read (+1); the timestamp goes into the metadata field
        consumer.commit({
            topic_partition: offset # COMMIT the offset for this partition
        })

if __name__ == '__main__':
    main()

 

Two new classes are imported and a few lines are added.

With auto-commit off, the developer has to write the Commit code explicitly.

 

TopicPartition holds the partition information and OffsetAndMetadata holds the offset information; they are committed as the Key and the Value of a dict, respectively.
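The reason for committing record.offset + 1 is that a committed offset means "the next offset to read", not "the last offset read". The sketch below builds such a commit map for a batch of records. It is a stand-alone illustration: Record is a hypothetical namedtuple standing in for the consumer's record objects, and plain tuples stand in for TopicPartition.

```python
from collections import namedtuple

# Hypothetical stand-in for a consumed record (only the fields used here)
Record = namedtuple("Record", ["topic", "partition", "offset"])


def offsets_to_commit(records):
    """Return {(topic, partition): next offset to read} for a batch of records."""
    commits = {}
    for record in records:
        tp = (record.topic, record.partition)
        # Keep the highest offset seen per partition, plus one.
        commits[tp] = max(commits.get(tp, 0), record.offset + 1)
    return commits


batch = [
    Record("fake_people", 0, 5),
    Record("fake_people", 0, 6),
    Record("fake_people", 2, 11),
]
print(offsets_to_commit(batch))
```

Committing once per batch with a map like this is a common alternative to committing after every single record, at the cost of reprocessing more records after a crash.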

 

With the manual Commit in place, the result was the same as before.

 

If you see the same result, it worked.

 

The Consumer Groups that were created can be seen in the web UI.

 

Consumer Groups

 

fake_people_group, created by autocommit_consumer.py, shows State EMPTY because that consumer was stopped earlier.

manual_fake_people_group, created by manualcommit_consumer.py, is still running,

so its State is STABLE and it shows one Member.

 

 

Partition information for manual_fake_people_group

 

The Partition view for manual_fake_people_group also gives a rough picture of what has been consumed.

 


 

🟨 ksqlDB

With the REST API or the ksql client tool, a Topic can be queried with SQL as if it were a table.

The CREATE STREAM statement creates a new Stream that can be handled like a table.

A query run this way is also treated as a Consumer, so it automatically gets a Consumer Group and a Consumer ID.

However, they are stored under a separate Topic that Kafka keeps for ksqlDB, so they do not show up in the Consumer Groups list.

 

docker ps # Find the ksqldb-server container ID
docker exec -it ContainerID sh # Open a shell in ksqldb-server
ksql # Start the ksql CLI

 

Connected to ksql

 

Now let's look at the data in the Topic created earlier.

 

CREATE STREAM my_stream (id STRING, name STRING, title STRING) with (kafka_topic='fake_people', value_format='JSON');
SELECT * FROM my_stream;

 

This reads the id, name, and title fields from the fake_people Topic.

 

Result of the SELECT query

 

Next, let's add the time at which the Producer created each record as a new column.

 

SELECT *, ROWTIME FROM my_stream;

 

ROWTIME provides exactly that.
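ROWTIME comes back as a large integer: the record timestamp in milliseconds since the Unix epoch. If a readable time is easier to work with, a small conversion like the one below can help. rowtime_to_utc is a hypothetical helper name, and the sketch assumes the value has been copied out of the query result by hand.

```python
from datetime import datetime, timezone


def rowtime_to_utc(rowtime_ms: int) -> datetime:
    """Convert a ROWTIME value (milliseconds since the epoch) to a UTC datetime."""
    return datetime.fromtimestamp(rowtime_ms / 1000, tz=timezone.utc)


print(rowtime_to_utc(0).isoformat())  # 1970-01-01T00:00:00+00:00
```

Keeping the result timezone-aware avoids silently mixing local time with UTC when comparing records.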

 

Note that appending EMIT CHANGES makes the query return only data that arrives after what has already been read.

 

LIMIT works the same way as well.