JUST DO IT!

Kafka ์•Œ์•„๋ณด๊ณ  ์„ค์น˜ํ•ด๋ณด๊ธฐ - TIL230712 ๋ณธ๋ฌธ

TIL

Kafka ์•Œ์•„๋ณด๊ณ  ์„ค์น˜ํ•ด๋ณด๊ธฐ - TIL230712

sunhokimDev 2023. 7. 14. 20:57

๐Ÿ“š KDT WEEK 15 DAY 3 TIL

  • Kafka
  • Kafka ์•„ํ‚คํ…์ฒ˜
  • Kafka ๊ธฐํƒ€ ๊ธฐ๋Šฅ
  • Kafka ์„ค์น˜ ๋ฐ ๊ฐ„๋‹จ ์‹ค์Šต

 

๐ŸŸฅ Kafka

 

 

์‹ค์‹œ๊ฐ„ ๋ฐ์ดํ„ฐ๋ฅผ ์ฒ˜๋ฆฌํ•˜๊ธฐ ์œ„ํ•ด ์„ค๊ณ„๋œ ์˜คํ”ˆ์†Œ์Šค ๋ถ„์‚ฐ ์ŠคํŠธ๋ฆฌ๋ฐ ํ”Œ๋žซํผ

  • ๋ถ„์‚ฐ ์•„ํ‚คํ…์ฒ˜๋ฅผ ๋”ฐ๋ฅด๊ธฐ ๋•Œ๋ฌธ์— Scale Out ํ˜•ํƒœ๋กœ ์Šค์ผ€์ผ ๊ฐ€๋Šฅ
  • Scalability์™€ Fault Tolerance๋ฅผ ์ œ๊ณตํ•˜๋Š” Publish-Subscription(Producer-Consumer) ๋ฉ”์‹œ์ง• ์‹œ์Šคํ…œ
    • ์ค‘๊ฐ„์— ๋ฒ„ํผ(ํ)๋ฅผ ๋‘๊ณ , Producer์™€ Consumer๊ฐ€ ๋…๋ฆฝ์ ์œผ๋กœ ๋Œ์•„๊ฐ€๊ฒŒ ๋งŒ๋“  ์‹œ์Šคํ…œ
  • High Throughput, Low Latency๋กœ ์‹ค์‹œ๊ฐ„ ๋ฐ์ดํ„ฐ ์ฒ˜๋ฆฌ์— ๋งž๊ฒŒ ๊ตฌํ˜„
  • ๋ฉ”์‹œ์ง€๋ฅผ ์ •ํ•ด์ง„ ๋ณด์œ ๊ธฐํ•œ(Retention period)๋™์•ˆ ์ €์žฅํ•˜๊ฒŒ ๋œ๋‹ค

 

ํŠน์ด์ 

  • ๋ฉ”์‹œ์ง€ ์ƒ์‚ฐ๊ณผ ์†Œ๋น„๊ฐ€ ๋ถ„๋ฆฌ๋จ์œผ๋กœ์จ BackPressure(๋ฐ์ดํ„ฐ ๊ณผ์ƒ์‚ฐ) ๋ฐฉ์ง€
  • ksqlDB๋ฅผ ํ†ตํ•ด SQL๋กœ๋„ ์‹ค์‹œ๊ฐ„ ์ด๋ฒคํŠธ ๋ฐ์ดํ„ฐ ์ฒ˜๋ฆฌ ๊ฐ€๋Šฅ
  • ๋ฐ์ดํ„ฐ ๋ณต์ œ ๋ฐ ๋ถ„์‚ฐ ์ปค๋ฐ‹ ๋กœ๊ทธ ๊ธฐ๋Šฅ์„ ์ œ๊ณตํ•˜์—ฌ ์žฅ์•  ๋Œ€์‘์ด ์šฉ์ด
  • ๋ธŒ๋กœ์ปค(์„œ๋ฒ„)๋ฅผ ์ถ”๊ฐ€ํ•˜์—ฌ ์‰ฝ๊ฒŒ ์ˆ˜ํ‰ ํ™•์žฅ ๊ฐ€๋Šฅ
  • ์ปค๋„ฅํ„ฐ์™€ ํ†ตํ•ฉ ๋„๊ตฌ๋กœ ๊ตฌ์„ฑ๋œ ํ’๋ถ€ํ•œ ์—์ฝ”์‹œ์Šคํ…œ์„ ๊ฐ–์ถ”๊ณ  ์žˆ์–ด ๋‹ค๋ฅธ ๋ฐ์ดํ„ฐ ์‹œ์Šคํ…œ ๋ฐ ํ”„๋ ˆ์ž„์›Œํฌ์— ์‰ฝ๊ฒŒ ์—ฐ๋™ ๊ฐ€๋Šฅ
    • ex) Kafka Connect, Kafka Schema Registry ๋“ฑ

 

! ๋ถ„์‚ฐ ์‹œ์Šคํ…œ

๋ถ„์‚ฐ ์‹œ์Šคํ…œ์—์„œ๋Š” Fault Tolerance ๋“ฑ์„ ์œ„ํ•ด ๋ฐ์ดํ„ฐ๋ฅผ ๋‹ค์ˆ˜์˜ ์„œ๋ฒ„์— ๋ถ„์‚ฐํ•ด์„œ ์ €์žฅํ•จ

100๋Œ€ ์„œ๋ฒ„๋กœ ๊ตฌ์„ฑ๋œ ๋ถ„์‚ฐ ์‹œ์Šคํ…œ์— ๋ ˆ์ฝ”๋“œ๋ฅผ ํ•˜๋‚˜ ์“ด๋‹ค๋ฉด ๊ทธ ๋ ˆ์ฝ”๋“œ๋ฅผ ๋ฐ”๋กœ ์ฝ์„ ์ˆ˜ ์žˆ๋Š” ๊ฐ€? > Consistency์— ๋”ฐ๋ผ ๋‹ค๋ฆ„

 

  • Strong Consistency : ๋ฐ์ดํ„ฐ๋ฅผ ์“ธ ๋•Œ ๋ชจ๋“  ์„œ๋ฒ„์— ๋ณต์ œ๊ฐ€ ์™„๋ฃŒ๋  ๋•Œ๊นŒ์ง€ ๊ธฐ๋‹ค๋ฆฐ ๋’ค, ๋ ˆ์ฝ”๋“œ๋ฅผ ์ฝ์„ ์ˆ˜ ์žˆ๊ฒŒ ๋จ
  • Eventual Consistency : ๋ฐ์ดํ„ฐ๋ฅผ ์ผ๋ถ€ ์„œ๋ฒ„์—๋งŒ ๋ณต์ œํ•˜๊ณ , ๋ฐ”๋กœ ๋ ˆ์ฝ”๋“œ๋ฅผ ์ฝ์„ ์ˆ˜ ์žˆ๋„๋ก ํ•จ

 


 

๐ŸŸฆ Kafka ์•„ํ‚คํ…์ฒ˜

 

1. Topic

Kafka์—์„œ๋Š” Topic์ด๋ผ๋Š” ๋ฐ์ดํ„ฐ ์ด๋ฒคํŠธ ์ŠคํŠธ๋ฆผ์„ ๊ฐ€์ง€๊ณ  ์žˆ๋‹ค. 

Topic์•ˆ์— ๋ฐ์ดํ„ฐ๊ฐ€ ์ƒ์„ฑ๋œ ์ˆœ์„œ๋Œ€๋กœ ์Œ“์—ฌ ์ €์žฅ๋œ๋‹ค.

 

Producer(๋ฐ์ดํ„ฐ ์ƒ์‚ฐ์ž)๊ฐ€ Topic์„ ๋งŒ๋“ค๊ณ  Consumer(๋ฐ์ดํ„ฐ ์†Œ๋น„์ž)๊ฐ€ Topic์—์„œ ๋ฐ์ดํ„ฐ๋ฅผ ์ฝ๋Š” ๊ตฌ์กฐ์ด๋‹ค.

์ด๋•Œ Consumer๋Š” ๋‹ค์ˆ˜๋กœ ์กด์žฌํ•˜์—ฌ Topic์„ ์ฝ์–ด๋“ค์ด๋Š” ๊ฒƒ์ด ๊ฐ€๋Šฅํ•˜๋‹ค.

 

++) ์—ฌ๊ธฐ์„œ ์ค‘์š”ํ•œ ์ ์€ Consumer๊ฐ€ ๋ฐ์ดํ„ฐ๋ฅผ ์ฝ๋Š”๋‹ค๊ณ ํ•ด์„œ ๋ฐ์ดํ„ฐ๊ฐ€ ์‚ฌ๋ผ์ง€์ง€๋Š” ์•Š๋Š”๋‹ค๋Š” ๊ฒƒ์ด๋‹ค.

Topic์€ ์•ž์„œ Kafka ํŠน์ง•์œผ๋กœ ์†Œ๊ฐœํ–ˆ๋˜ Retention period๊ฐ€ ์กด์žฌํ•˜์—ฌ, ์ด ๊ธฐ๊ฐ„๋™์•ˆ ๋ฐ์ดํ„ฐ๊ฐ€ ์กด์žฌํ•˜๊ฒŒ ๋œ๋‹ค.

๋”ฐ๋ผ์„œ Consumer๋ณ„๋กœ ์–ด๋Š ์œ„์น˜์˜ ๋ฐ์ดํ„ฐ๋ฅผ ์ฝ๊ณ  ์žˆ๋Š”์ง€ ์œ„์น˜ ์ •๋ณด(Offset)๋ฅผ ์œ ์ง€ํ•˜๊ฒŒ ๋œ๋‹ค.

 

์ด Topic์— ์Œ“์ด๋Š” ๋ฐ์ดํ„ฐ๋ฅผ Message(Event)๋ผ๊ณ  ํ•œ๋‹ค.

  • Event๋Š” ์ตœ๋Œ€ 1Mb์˜ ํฌ๊ธฐ์ด๋ฉฐ Header, Key, Value, Timestamp๋ฅผ ํฌํ•จํ•œ ๊ตฌ์กฐ๋ฅผ ๊ฐ€์ง„๋‹ค.
  • Header์—๋Š” ๊ฒฝ๋Ÿ‰ ๋ฉ”ํƒ€ ๋ฐ์ดํ„ฐ ์ •๋ณด๊ฐ€ ๋‹ด๊ธฐ๊ณ 
  • Key์—๋Š” Topic ๋ฐ์ดํ„ฐ๋ฅผ Partitioningํ•  ๋•Œ ํ•„์š”ํ•œ ๋ฐ์ดํ„ฐ๊ฐ€ ํฌํ•จ๋œ๋‹ค.

 

1-1. Partitioning

 

ํ•˜๋‚˜์˜ Topic์€ ํ™•์žฅ์„ฑ์„ ์œ„ํ•ด ๋‹ค์ˆ˜์˜ Partition์œผ๋กœ ๋‚˜๋‰˜์–ด ์ €์žฅ๋œ๋‹ค.

๋ณดํ†ต์€ Key๊ฐ€ ์žˆ๋‹ค๋ฉด ํ‚ค์˜ Hashing ๊ฐ’์„ Partition์˜ ์ˆ˜๋กœ ๋‚˜๋ˆˆ ๋‚˜๋จธ์ง€๋กœ ๊ฒฐ์ •๋œ๋‹ค.

 

 

Topic์˜ Partition

 

ํ•˜๋‚˜์˜ Partition์€ Fail-over๋ฅผ ์œ„ํ•ด Replication Partiton(๋ณต์ œ๋ณธ)์„ ๊ฐ–๋Š”๋‹ค.

์ด๋•Œ ๊ฐ Partition๋ณ„๋กœ Leader์™€ Follower๋กœ ๊ตฌ๋ถ„๋˜์–ด Leader๋ฅผ ํ†ตํ•ด ์“ฐ๊ธฐ๊ฐ€ ์ด๋ค„์ง€๊ณ , Leader/Follower๋ฅผ ํ†ตํ•ด ์ฝ๊ฒŒ ๋œ๋‹ค.

Partition์ด ๋ณต์ œ๋ณธ์„ ๊ฐ–๋Š” ์ด์œ ๋Š” ๋ฐ์ดํ„ฐ ์œ ์‹ค์„ ๋ง‰๊ธฐ ์œ„ํ•œ ๊ฒƒ ๋ฟ๋งŒ์•„๋‹ˆ๋ผ ๋‹ค์ˆ˜์˜ Consumer๊ฐ€ ๋ณ‘๋ ฌ์ ์œผ๋กœ ์ฝ์„ ์ˆ˜ ์žˆ๋„๋ก ํ•˜๊ธฐ ์œ„ํ•œ ๊ฒƒ๋„ ์žˆ๋‹ค.

 

++) Partition์˜ ์šฉ๋„

  • ์ˆ˜ํ‰์ ์œผ๋กœ Partition์˜ ์ˆ˜๋ฅผ ๋Š˜๋ ค, Topic์ด ๊ฐ€์งˆ ์ˆ˜ ์žˆ๋Š” ๋ฐ์ดํ„ฐ์˜ ํฌ๊ธฐ ๋Š˜๋ฆฌ๊ธฐ > Load Balancing์œผ๋กœ ์ด์–ด์ง
  • ํŠน์ • ํ‚ค๋ฅผ ๊ฐ€์ง€๊ณ  ๋ ˆ์ฝ”๋“œ๋ฅผ ๋‚˜๋ˆ„๋Š” ๊ฒฝ์šฐ > Segmantic Partitioning

 

์ด๋Ÿฌํ•œ ์˜ต์…˜๋“ค์€ ๋‚˜์ค‘์— Kafka์—์„œ Topic์˜ ํŒŒ๋ผ๋ฏธํ„ฐ์— ์˜ํ•ด ๊ฒฐ์ •๋œ๋‹ค.

 

์ด๋ฆ„, Partition์˜ ์ˆ˜, ๋ณต์ œ๋ณธ์˜ ์ˆ˜(์›๋ณธ ํฌํ•จ), Consistency Level, ๋ฐ์ดํ„ฐ ๋ณด์กด ๊ธฐํ•œ, ๋ฉ”์‹œ์ง€ ์••์ถ• ๋ฐฉ์‹.. ๋“ฑ

 

์ง€๊ธˆ๊นŒ์ง€ ์„ค๋ช…ํ–ˆ๋˜ ๊ฐœ๋…๋“ค์ด ์—ฌ๋Ÿฟ ๋ณด์ธ๋‹ค.

์ถ”๊ฐ€์ ์ธ ํŒŒ๋ผ๋ฏธํ„ฐ๋Š” ๋‹ค์Œ์˜ ๋งํฌ๋ฅผ ์ฐธ๊ณ ํ•˜์ž.

https://kafka.apache.org/documentation/#topicconfigs

 

Apache Kafka

Apache Kafka: A Distributed Streaming Platform.

kafka.apache.org

 

2. Broker

์‹ค์ œ ๋ฐ์ดํ„ฐ๋ฅผ ์ €์žฅํ•˜๋Š” ์„œ๋ฒ„

Kafka ํด๋Ÿฌ์Šคํ„ฐ๋Š” ๊ธฐ๋ณธ์ ์œผ๋กœ ๋‹ค์ˆ˜์˜ Broker๋กœ ๊ตฌ์„ฑ๋œ๋‹ค. (Broker = Server = Node)

Broker๋“ค์ด ์‹ค์ œ๋กœ Producer/Consumer๋“ค๊ณผ ํ†ต์‹ ์„ ์ˆ˜ํ–‰ํ•˜๊ฒŒ ๋˜๋ฉฐ, ๋ถ€๊ฐ€ ๊ธฐ๋Šฅ์„ ์œ„ํ•œ ๋‹ค๋ฅธ ์„œ๋น„์Šค๋“ค์ด ์ด๊ณณ์— ์ถ”๊ฐ€๋œ๋‹ค.

๋˜ํ•œ Topic์˜ Partition์„ ์‹ค์ œ๋กœ ๊ด€๋ฆฌํ•˜๋ฉฐ, ํ•˜๋‚˜์˜ Broker์—์„œ ์ตœ๋Œ€ 4000๊ฐœ์˜ Partition์„ ์ฒ˜๋ฆฌํ•˜๊ณ  ๊ธฐ๋กํ•œ๋‹ค.

 

์ถœ์ฒ˜ : http://cloudurable.com/

 

์ด๋•Œ ๋ˆ„๊ฐ€ ์–ด๋””์— Broker, Topic์— ๋Œ€ํ•œ ์ •๋ณด๋ฅผ ๊ด€๋ฆฌํ• ๊นŒ?

 

3. ZooKeeper / KRaft

๋ฉ”ํƒ€ ์ •๋ณด ๊ด€๋ฆฌ๋Š” Apache ZooKeeper์™€ KRaft(Kafka Raft)์„ ๋ณดํ†ต ์‚ฌ์šฉํ•˜๊ฒŒ ๋œ๋‹ค.

  • Broker ๋ฆฌ์ŠคํŠธ ๊ด€๋ฆฌ(๋ˆ„๊ฐ€ Controller(Broker ๋Œ€์žฅ)์ธ๊ฐ€?)
  • Topic ๋ฆฌ์ŠคํŠธ ๊ด€๋ฆฌ(Partition ๊ด€๋ฆฌ, Partition์˜ Replica ๊ด€๋ฆฌ)
  • Topic๋ณ„ Access Control Lists) ๊ด€๋ฆฌ ๋“ฑ

์ตœ๊ทผ์—๋Š” ZooKeeper ๊ด€๋ จ ๋ฌธ์ œ๊ฐ€ ๋งŽ์•„ KRaft๋กœ ๋งŽ์ด ๋„˜์–ด์˜ค๊ณ  ์žˆ๋‹ค๊ณ  ํ•œ๋‹ค.

 

์ด์ œ๊นŒ์ง€ ์„ค๋ช…ํ•œ Kafka์˜ ์•„ํ‚คํ…์ฒ˜๋ฅผ ์ „์ฒด์ ์ธ ๊ทธ๋ฆผ์œผ๋กœ ๋ณด๋ฉด ๋‹ค์Œ๊ณผ ๊ฐ™๋‹ค.

 

์ถœ์ฒ˜: https://mail-narayank.medium.com/

 

Partition์€ ๋‹ค์‹œ ์—ฌ๋Ÿฌ ๊ฐœ์˜ Segment๋กœ ๊ตฌ์„ฑ๋œ๋‹ค.

Segment๋Š” ๋ฌผ๋ฆฌ์ ์œผ๋กœ ๋””์Šคํฌ์— ์ €์žฅ๋˜๋Š” ๋กœ๊ทธ ํŒŒ์ผ๊ณผ ๊ฐ™๋‹ค. (Immutableํ•˜๊ณ , Append๋งŒ ๊ฐ€๋Šฅํ•œ ํ˜•ํƒœ)

Segment๋Š” ์ตœ๋Œ€ ํฌ๊ธฐ๊ฐ€ ์กด์žฌํ•ด์„œ, ์ด ํฌ๊ธฐ๋ฅผ ๋„˜์–ด๊ฐ€๋ฉด ์ƒˆ๋กœ์šด Segment ํŒŒ์ผ์„ ๋งŒ๋“ค์–ด๋‚ธ๋‹ค.

๋ฐ์ดํ„ฐ๋ฒ ์ด์Šค์—์„œ Data Recovery๋‚˜ Replay์— ์‚ฌ์šฉํ•˜๋Š” Commit Log๋ผ๊ณ  ํ•  ์ˆ˜ ์žˆ๋‹ค.

 

 


 

๐ŸŸง Kafka ๊ธฐํƒ€ ๊ธฐ๋Šฅ

 

1. Kafka Connect

Kafka ์œ„์— ๋งŒ๋“ค์–ด์ง„ ์ค‘์•™์ง‘์ค‘ ๋ฐ์ดํ„ฐ ํ—ˆ๋ธŒ(๋ณ„๋„์˜ ์„œ๋ฒ„ ํ•„์š”)

๋ฐ์ดํ„ฐ ์‹œ์Šคํ…œ๋“ค๊ฐ„์˜ ๋ฐ์ดํ„ฐ๋ฅผ ์ฃผ๊ณ  ๋ฐ›๋Š” ์šฉ๋„๋กœ Kafka๋ฅผ ์‚ฌ์šฉํ•  ์ˆ˜ ์žˆ๊ฒŒ ๋งŒ๋“ ๋‹ค.

Kafka์˜ Prodcuer์™€ Consumer ์—ญํ• ์„ Kafka Connect๊ฐ€ ์‹คํ–‰ํ•˜๊ฒŒ ๋˜๋Š” ๊ฒƒ์ด๋‹ค.

 

Kafka Connect, ์ถœ์ฒ˜ : kakao tech

 

์—ฌ๊ธฐ์„œ Source Task๋ฅผ ์ˆ˜ํ–‰ํ•˜๋Š” ๊ฒƒ์ด Producer์˜ ์—ญํ• , Sink Task๋ฅผ ์ˆ˜ํ–‰ํ•˜๋Š” ๊ฒƒ์ด Consumer์˜ ์—ญํ• ์„ ํ•œ๋‹ค.

 

Kafka ์–‘์ชฝ์— Kafka Connect๋ฅผ ๋‘๋Š” ๊ฒƒ์œผ๋กœ, ํŠน๋ณ„ํ•œ ์ฝ”๋”ฉ์—†์ด ๊ฐ„๋‹จํ•œ ํ™˜๊ฒฝ์„ค์ •์œผ๋กœ ๋ฐ์ดํ„ฐ๋ฅผ ์ฝ์–ด Kafka ํด๋Ÿฌ์Šคํ„ฐ์— ์ €์žฅํ•˜๊ฑฐ๋‚˜ Kafka ํด๋Ÿฌ์Šคํ„ฐ๋กœ๋ถ€ํ„ฐ ์ฝ์–ด์™€์„œ S3์™€ ๊ฐ™์€ ์ €์žฅ์†Œ์— ์ €์žฅํ•  ์ˆ˜ ์žˆ๊ฒŒ ๋œ๋‹ค. (ETL)

 

2. Kafka Schema Registry

Kafka Topic์— ๋“ค์–ด์˜จ ๋ฐ์ดํ„ฐ์˜ ํฌ๋งท์ด ์–ด๋–ค ํ˜•ํƒœ์ด์–ด์•ผ ํ•˜๋Š”์ง€ ๊ธฐ๋ก๋˜๋Š” ์ €์žฅ์†Œ ์—ญํ• 

๋”ฐ๋ผ์„œ Topic ๋ฉ”์‹œ์ง€ ๋ฐ์ดํ„ฐ์— ๋Œ€ํ•œ ์Šคํ‚ค๋งˆ ๊ด€๋ฆฌ ๋ฐ ๊ฒ€์ฆํ•˜๋Š”๋ฐ ์‚ฌ์šฉํ•˜๊ฒŒ ๋œ๋‹ค.

 

Schema ID(์™€ ๋ฒ„์ „)๋ฅผ ์‚ฌ์šฉํ•ด์„œ Schema Evolution(ํฌ๋งท ๋ณ€๊ฒฝ)์„ ์ง€์›ํ•œ๋‹ค.

๋ณดํ†ต ์••์ถ• ํฌ๋งท์€ AVRO๋ฅผ ์‚ฌ์šฉํ•œ๋‹ค.(์Šคํ‚ค๋งˆ ์ž„๋ฒ ๋“œ ๊ฐ€๋Šฅ!)

 

 

 

Kafka Schema Registry ๊ตฌ์กฐ

 

๊ทธ๋ฆผ์ฒ˜๋Ÿผ Schema Registry๊ฐ€ ๋ฐœ๊ธ‰ํ•œ SchemaID๋ฅผ ํ†ตํ•ด ์Šคํ‚ค๋งˆ ์ •๋ณด๋ฅผ Consumer๊ฐ€ ์ดํ•ดํ•  ์ˆ˜ ์žˆ๊ฒŒ ๋œ๋‹ค.

์ด ๊ณผ์ •์—์„œ ์‚ฌ์šฉํ•˜๋Š” Serialization๊ณผ Deserialization์€ ๋‹ค์Œ๊ณผ ๊ฐ™์ด ์ดํ•ดํ•˜๋ฉด ๋œ๋‹ค.

 

Serialization(์ง๋ ฌํ™”)

  • ๊ฐ์ฒด๋ฅผ ์ €์žฅํ•˜๊ฑฐ๋‚˜ ์ „์†กํ•  ์ˆ˜ ์žˆ๋Š” ํ˜•ํƒœ๋กœ ๋ณ€ํ™˜ํ•˜๋Š” ํ”„๋กœ์„ธ์Šค
  • ๋ฐ์ดํ„ฐ๋ฅผ ์ผ๋ ฌ๋กœ ๋Š˜๋ ค์„œ ์••์ถ•ํ•˜๊ณ  ๋ฐ์ดํ„ฐ์˜ ์Šคํ‚ค๋งˆ ์ •๋ณด๋ฅผ ์ถ”๊ฐ€ํ•˜๋Š” ๋“ฑ์˜ ์ž‘์—…

 

Deserialization(์—ญ์ง๋ ฌํ™”)

  • Serialized๋œ ๋ฐ์ดํ„ฐ๋ฅผ ๋‹ค์‹œ ์‚ฌ์šฉํ•  ์ˆ˜ ์žˆ๋Š” ํ˜•ํƒœ๋กœ ๋ณ€ํ™˜ํ•˜๋Š” ํ”„๋กœ์„ธ์Šค
  • ๋ฐ์ดํ„ฐ์˜ ์••์ถ•์„ ํ•ด์ œํ•˜๊ฑฐ๋‚˜ ์Šคํ‚ค๋งˆ ์ •๋ณด๋ฅผ ํ† ๋Œ€๋กœ ๋ฐ์ดํ„ฐ ํฌ๋งท ๊ฒ€์ฆ ์ˆ˜ํ–‰ ์ž‘์—…

 

์ง๋ ฌํ™”์™€ ์—ญ์ง๋ ฌํ™”๋Š” ๋ณดํ†ต Kafka ๊ด€๋ จ ๋ผ์ด๋ธŒ๋Ÿฌ๋ฆฌ๋ฅผ ์‚ฌ์šฉํ•œ๋‹ค.

 

3. REST Proxy

Producer, Kafka Cluster, Consumer๋Š” ๋ณดํ†ต ๊ฐ™์€ ๋„คํŠธ์›Œํฌ์—์„œ ์‚ฌ์šฉํ•˜๊ฒŒ ๋œ๋‹ค.

์ด๋•Œ ์™ธ๋ถ€์—์„œ Kafka Cluster์˜ ํŠน์ • Topic ๋ฐ์ดํ„ฐ๋ฅผ ์ฝ๊ฑฐ๋‚˜ ์“ธ ๋•Œ ๋ฐœ์ƒํ•˜๋Š” ๋ณด์•ˆ ๋ฌธ์ œ๋ฅผ ํ•ด๊ฒฐํ•˜๊ธฐ ์œ„ํ•œ ๋ฐฉ๋ฒ•์ด๋‹ค.

 

ํด๋ผ์ด์–ธํŠธ๊ฐ€ API ํ˜ธ์ถœ์„ ์‚ฌ์šฉํ•˜์—ฌ Kafka๋ฅผ ์‚ฌ์šฉ ๊ฐ€๋Šฅํ•˜๊ฒŒ ํ•˜๋ฉฐ,

Message Serialization๊ณผ Deserialization์„ ๋Œ€์‹  ์ˆ˜ํ–‰ํ•˜๊ณ  Load Balancing๋„ ์ˆ˜ํ–‰ํ•˜๊ฒŒ ๋œ๋‹ค.

Message๋ฅผ ์ƒ์„ฑ ๋ฐ ์†Œ๋น„ํ•˜๊ณ  Topic์„ ๊ด€๋ฆฌํ•˜๋Š” ๊ฐ„๋‹จํ•˜๊ณ  ํ‘œ์ค€ํ™”๋œ ๋ฐฉ๋ฒ•์„ ์ œ๊ณตํ•˜๋Š” ๊ฒƒ์ด๋‹ค.

 

4. Kafka Streams

Kafka Topic์„ ์†Œ๋น„ํ•˜๊ณ  ์ƒ์„ฑํ•˜๋Š” ์‹ค์‹œ๊ฐ„ ์ŠคํŠธ๋ฆผ ์ฒ˜๋ฆฌ ๋ผ์ด๋ธŒ๋Ÿฌ๋ฆฌ

 

Spark Streaming๊ฐ€ Kafka Topic์„ ์ฒ˜๋ฆฌํ•˜๋Š” ๊ฒฝ์šฐ micro batch์— ๊ฐ€๊น๊ณ ,

Kafka Streams๋Š” Topic์˜ ๋ฐ์ดํ„ฐ๋ฅผ ํ•˜๋‚˜์”ฉ ์ฒ˜๋ฆฌํ•˜๋Š” ์กฐ๊ธˆ ๋” Realtime์— ๊ฐ€๊นŒ์šด ๋ฐฉ๋ฒ•์ด๋‹ค.

 

5. ksqlDB

Kafka Streams๋กœ ๊ตฌํ˜„๋œ ์ŠคํŠธ๋ฆผ ์ฒ˜๋ฆฌ ๋ฐ์ดํ„ฐ๋ฒ ์ด์Šค์ด๋‹ค.(KSQL ๋Œ€์ฒด)

SQL๊ณผ ์œ ์‚ฌํ•œ ์ฟผ๋ฆฌ ์–ธ์–ด๋ฅผ ์ง€์›ํ•˜๊ณ , ์กฐ๊ธˆ ํŠน๋ณ„ํ•˜๊ฒŒ ์—ฐ์† ์ฟผ๋ฆฌ๋ผ๋Š” ๊ธฐ๋Šฅ๋„ ์ง€์›ํ•œ๋‹ค.

์—ฐ์† ์ฟผ๋ฆฌ๋Š” SELECT์™€ ๊ฐ™์€ ์ฟผ๋ฆฌ๋ฅผ ๋„ฃ์—ˆ์„ ๋•Œ, ๋ฐ์ดํ„ฐ๊ฐ€ ์‹ค์‹œ๊ฐ„์œผ๋กœ ๋„์ฐฉํ•  ๋•Œ๋งˆ๋‹ค ์—…๋ฐ์ดํŠธ๋˜๋Š” ๊ธฐ๋Šฅ์ด๋‹ค.

 


 

๐ŸŸฉ Kafka ์„ค์น˜ ๋ฐ ๊ฐ„๋‹จ ์‹ค์Šต

 

Kafka docker-compose ํŒŒ์ผ์„ ํ†ตํ•ด ์„ค์น˜ํ•œ๋‹ค.

 

Conduktor.io ํšŒ์‚ฌ์—์„œ ๋งŒ๋“  Kafka ๋ ˆํฌ๋ฅผ ์ฐธ๊ณ ํ•˜์ž.

https://github.com/conduktor/kafka-stack-docker-compose

 

GitHub - conduktor/kafka-stack-docker-compose: docker compose files to create a fully working kafka stack

docker compose files to create a fully working kafka stack - GitHub - conduktor/kafka-stack-docker-compose: docker compose files to create a fully working kafka stack

github.com

 

๋‹ค์–‘ํ•œ yml ํŒŒ์ผ ์ค‘ ์—ฌ๋Ÿฌ ๊ฐ€์ง€ ๊ธฐ๋Šฅ์„ ๋‹ด๊ณ ์žˆ๋Š” Full Stack ๋ฒ„์ „์„ ์„ค์น˜ํ•œ๋‹ค.

์ด ์ค‘์—๋Š” conduktor๊ฐ€ ์ œ๊ณตํ•˜๋Š” ์›น UI๋„ ์žˆ๋‹ค.

 

git pull https://github.com/conduktor/kafka-stack-docker-compose.git
docker compose -f full-stack.yml up

 

๋‘ ๋ฒˆ์งธ ๋ช…๋ น์–ด๋ฅผ ์ˆ˜ํ–‰ํ•˜๋Š” ๋ฐ์—๋Š” ๋Œ€๋žต 5๋ถ„ ์ •๋„ ๊ธฐ๋‹ค๋ ค์•ผ ํ•œ๋‹ค.

 

์ž‘์—…์ด ๋ชจ๋‘ ๋๋‚˜๋ฉด conduktor์˜ ์›น UI์— ์ ‘์†ํ•ด๋ณด์ž.

 http://localhost:8080/ ๋กœ ์ ‘์†ํ•  ์ˆ˜ ์žˆ๊ณ , ์•„์ด๋””์™€ ๋น„๋ฐ€๋ฒˆํ˜ธ๋Š” ๊ฐ๊ฐ admin@admin.io , admin ์ด๋‹ค.

 

์›น UI ํ™”๋ฉด

 

์™ผ์ชฝ ๋ฉ”๋‰ด์— ๋ณด์ด๋Š” Topics, Schema Registry, Consumer Groups ๋“ฑ ์ด์ œ๊ป ์•Œ์•„๋ดค๋˜ ๊ธฐ๋Šฅ๋“ค์„ ์‰ฝ๊ฒŒ ํ™•์ธํ•  ์ˆ˜ ์žˆ๋‹ค.

์ฐธ๊ณ ๋กœ ๋กœ์ปฌ์— ์„ค์น˜ํ•˜์—ฌ ์šด์šฉํ•˜๋Š” ํ…Œ์ŠคํŠธ์šฉ Kafka๋กœ์จ ํ™œ์šฉ๋˜๋Š” ํ™˜๊ฒฝ์ด๋ฏ€๋กœ, Brokers๋„ ํ•˜๋‚˜๋งŒ ์กด์žฌํ•œ๋‹ค.

 

์ด์ œ python์„ ํ™œ์šฉํ•œ Kafka๋ฅผ ์‚ฌ์šฉํ•˜๊ธฐ ์œ„ํ•ด ๊ด€๋ จ ํ™˜๊ฒฝ์„ ์„ค์น˜ํ•ด์ฃผ์ž.

pip3 install kafka-python

 

๊ทธ๋ฆฌ๊ณ  producer.py๋ฅผ ์ž‘์„ฑํ•œ๋‹ค.

 

from time import sleep
from json import dumps
from kafka import KafkaProducer

# producer ์ƒ์„ฑ
producer = KafkaProducer(
 bootstrap_servers=['localhost:9092'], # Broker์— ํ•ด๋‹นํ•˜๋Š” ์„œ๋ฒ„ ์ž…๋ ฅ
 value_serializer=lambda x: dumps(x).encode('utf-8') # dumps๋Š” dictinary๋ฅผ json string์œผ๋กœ ๋ฐ”๊ฟ”์คŒ
)

# 0.5์ดˆ๋งˆ๋‹ค ๋ฐ์ดํ„ฐ ๋„ฃ๊ธฐ
for j in range(50):
 print("Iteration", j)
 data = {'counter': j}
 producer.send('topic_test', value=data) # ์ด๋ฆ„์ด topic_test์ธ Topic์— ํ•ด๋‹น ๋ฐ์ดํ„ฐ ์ „์†ก
 sleep(0.5)

 

์ด ํŒŒ์ด์ฌ ํŒŒ์ผ์„ ์•„๊นŒ git clone๋œ ํด๋”์— ๋„ฃ๊ณ  ์‹คํ–‰์‹œ์ผœ๋ณด์ž.

 

 

Kafka๊ฐ€ ์‹คํ–‰๋˜์–ด ์žˆ์ง€ ์•Š๋‹ค๋ฉด "NoBrokersAvailable"์ด๋ž€ ์—๋Ÿฌ ๋ฉ”์‹œ์ง€๊ฐ€ ๋ฐœ์ƒํ•œ๋‹ค.

๊ทธ๋ฆฌ๊ณ  ์›น UI๋กœ ๊ฒฐ๊ณผ๋ฅผ ํ™•์ธํ•  ์ˆ˜ ์žˆ๋‹ค.

 

topic_test๋ผ๋Š” topic์ด ์ƒ๊ฒผ๋‹ค.

 

topic_test ๋‚ด๋ถ€ ๋ชจ์Šต

 

๋ณ„๋‹ค๋ฅธ Key๋‚˜ ์˜ต์…˜์„ ์ง€์ •ํ•ด์ฃผ์ง€ ์•Š์•„ Key๋Š” null๋กœ ๊ธฐ๋ก๋˜์–ด์žˆ๋‹ค.

์ด๋ ‡๊ฒŒ Key๊ฐ€ Null์ธ๋ฐ, ์—ฌ๋Ÿฌ ๊ฐœ์˜ Partition์„ ๊ฐ€์ง€๊ณ  ์žˆ๋‹ค๋ฉด ๋ผ์šด๋“œ ๋กœ๋นˆํ˜•์‹์œผ๋กœ ๋ถ„์‚ฐํ•˜๊ฒŒ ๋œ๋‹ค.

 

์ด๋ฒˆ์—๋Š” consumer.py๋ฅผ ์ž‘์„ฑํ•ด๋ณด์ž.

 

from kafka import KafkaConsumer
from json import loads
from time import sleep

# consumer ์ƒ์„ฑ
consumer = KafkaConsumer(
 'topic_test', # topic_test Topic์—์„œ ๋ฐ์ดํ„ฐ๋ฅผ ์ฝ์–ด์˜จ๋‹ค.
 bootstrap_servers=['localhost:9092'], # Broker ์„œ๋ฒ„ ์ž…๋ ฅ
 auto_offset_reset='earliest', # ๊ฐ€์žฅ ๋จผ์ € ๋“ค์–ด์˜จ ๋ฐ์ดํ„ฐ๋ถ€ํ„ฐ ์ฝ๊ธฐ, ๋ฐ˜๋Œ€๋Š” 'latest'
 enable_auto_commit=True, # False์ธ ๊ฒฝ์šฐ commit ํ•จ์ˆ˜๋กœ ๋ช…์‹œ์ ์œผ๋กœ offset ์œ„์น˜๋ฅผ ์ปค๋ฐ‹ํ•ด์•ผํ•จ(์•ˆ์ „)
 group_id='my-group-id', # Topic์„ ๊ฐ™์ด ์†Œ๋น„ํ•  Conumser ๊ทธ๋ฃน
 value_deserializer=lambda x: loads(x.decode('utf-8')) # ์•„๊นŒ producer์™€ ๋ฐ˜๋Œ€ ์ž‘์—…
)

# ์ฝ์–ด์˜จ ๋ฐ์ดํ„ฐ๋ฅผ ์ถœ๋ ฅ
for event in consumer:
 event_data = event.value
 # Do whatever you want
 print(event_data)
 sleep(2)

 

 

consumer๋„ ์ •์ƒ์ ์œผ๋กœ ๋ฐ์ดํ„ฐ๋ฅผ ์ž˜ ์ฝ์–ด์˜จ๋‹ค.