์ผ | ์ | ํ | ์ | ๋ชฉ | ๊ธ | ํ |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | |||
5 | 6 | 7 | 8 | 9 | 10 | 11 |
12 | 13 | 14 | 15 | 16 | 17 | 18 |
19 | 20 | 21 | 22 | 23 | 24 | 25 |
26 | 27 | 28 | 29 | 30 | 31 |
- Airflow
- Speculative Execution
- mysql
- Spark ์ค์ต
- Spark Partitioning
- Dag
- redshift
- Docker
- DataFrame Hint
- SQL
- aws
- CI/CD
- colab
- Spark
- k8s
- off heap memory
- disk spill
- ๋น ๋ฐ์ดํฐ
- Kubernetes
- Spark SQL
- Salting
- ๋ฐ์ดํฐ ํ์ดํ๋ผ์ธ
- spark executor memory
- backfill
- AQE
- topic
- Kafka
- etl
- KDT_TIL
- Spark Caching
- Today
- Total
JUST DO IT!
Kafka ์์๋ณด๊ณ ์ค์นํด๋ณด๊ธฐ - TIL230712 ๋ณธ๋ฌธ
๐ KDT WEEK 15 DAY 3 TIL
- Kafka
- Kafka ์ํคํ ์ฒ
- Kafka ๊ธฐํ ๊ธฐ๋ฅ
- Kafka ์ค์น ๋ฐ ๊ฐ๋จ ์ค์ต
๐ฅ Kafka
์ค์๊ฐ ๋ฐ์ดํฐ๋ฅผ ์ฒ๋ฆฌํ๊ธฐ ์ํด ์ค๊ณ๋ ์คํ์์ค ๋ถ์ฐ ์คํธ๋ฆฌ๋ฐ ํ๋ซํผ
- ๋ถ์ฐ ์ํคํ ์ฒ๋ฅผ ๋ฐ๋ฅด๊ธฐ ๋๋ฌธ์ Scale Out ํํ๋ก ์ค์ผ์ผ ๊ฐ๋ฅ
- Scalability์ Fault Tolerance๋ฅผ ์ ๊ณตํ๋ Publish-Subscription(Producer-Consumer) ๋ฉ์์ง ์์คํ
- ์ค๊ฐ์ ๋ฒํผ(ํ)๋ฅผ ๋๊ณ , Producer์ Consumer๊ฐ ๋ ๋ฆฝ์ ์ผ๋ก ๋์๊ฐ๊ฒ ๋ง๋ ์์คํ
- High Throughput, Low Latency๋ก ์ค์๊ฐ ๋ฐ์ดํฐ ์ฒ๋ฆฌ์ ๋ง๊ฒ ๊ตฌํ
- ๋ฉ์์ง๋ฅผ ์ ํด์ง ๋ณด์ ๊ธฐํ(Retention period)๋์ ์ ์ฅํ๊ฒ ๋๋ค
ํน์ด์
- ๋ฉ์์ง ์์ฐ๊ณผ ์๋น๊ฐ ๋ถ๋ฆฌ๋จ์ผ๋ก์จ BackPressure(๋ฐ์ดํฐ ๊ณผ์์ฐ) ๋ฐฉ์ง
- ksqlDB๋ฅผ ํตํด SQL๋ก๋ ์ค์๊ฐ ์ด๋ฒคํธ ๋ฐ์ดํฐ ์ฒ๋ฆฌ ๊ฐ๋ฅ
- ๋ฐ์ดํฐ ๋ณต์ ๋ฐ ๋ถ์ฐ ์ปค๋ฐ ๋ก๊ทธ ๊ธฐ๋ฅ์ ์ ๊ณตํ์ฌ ์ฅ์ ๋์์ด ์ฉ์ด
- ๋ธ๋ก์ปค(์๋ฒ)๋ฅผ ์ถ๊ฐํ์ฌ ์ฝ๊ฒ ์ํ ํ์ฅ ๊ฐ๋ฅ
- ์ปค๋ฅํฐ์ ํตํฉ ๋๊ตฌ๋ก ๊ตฌ์ฑ๋ ํ๋ถํ ์์ฝ์์คํ
์ ๊ฐ์ถ๊ณ ์์ด ๋ค๋ฅธ ๋ฐ์ดํฐ ์์คํ
๋ฐ ํ๋ ์์ํฌ์ ์ฝ๊ฒ ์ฐ๋ ๊ฐ๋ฅ
- ex) Kafka Connect, Kafka Schema Registry ๋ฑ
! ๋ถ์ฐ ์์คํ
๋ถ์ฐ ์์คํ ์์๋ Fault Tolerance ๋ฑ์ ์ํด ๋ฐ์ดํฐ๋ฅผ ๋ค์์ ์๋ฒ์ ๋ถ์ฐํด์ ์ ์ฅํจ
100๋ ์๋ฒ๋ก ๊ตฌ์ฑ๋ ๋ถ์ฐ ์์คํ ์ ๋ ์ฝ๋๋ฅผ ํ๋ ์ด๋ค๋ฉด ๊ทธ ๋ ์ฝ๋๋ฅผ ๋ฐ๋ก ์ฝ์ ์ ์๋ ๊ฐ? > Consistency์ ๋ฐ๋ผ ๋ค๋ฆ
- Strong Consistency : ๋ฐ์ดํฐ๋ฅผ ์ธ ๋ ๋ชจ๋ ์๋ฒ์ ๋ณต์ ๊ฐ ์๋ฃ๋ ๋๊น์ง ๊ธฐ๋ค๋ฆฐ ๋ค, ๋ ์ฝ๋๋ฅผ ์ฝ์ ์ ์๊ฒ ๋จ
- Eventual Consistency : ๋ฐ์ดํฐ๋ฅผ ์ผ๋ถ ์๋ฒ์๋ง ๋ณต์ ํ๊ณ , ๋ฐ๋ก ๋ ์ฝ๋๋ฅผ ์ฝ์ ์ ์๋๋ก ํจ
๐ฆ Kafka ์ํคํ ์ฒ
1. Topic
Kafka์์๋ Topic์ด๋ผ๋ ๋ฐ์ดํฐ ์ด๋ฒคํธ ์คํธ๋ฆผ์ ๊ฐ์ง๊ณ ์๋ค.
Topic์์ ๋ฐ์ดํฐ๊ฐ ์์ฑ๋ ์์๋๋ก ์์ฌ ์ ์ฅ๋๋ค.
Producer(๋ฐ์ดํฐ ์์ฐ์)๊ฐ Topic์ ๋ง๋ค๊ณ Consumer(๋ฐ์ดํฐ ์๋น์)๊ฐ Topic์์ ๋ฐ์ดํฐ๋ฅผ ์ฝ๋ ๊ตฌ์กฐ์ด๋ค.
์ด๋ Consumer๋ ๋ค์๋ก ์กด์ฌํ์ฌ Topic์ ์ฝ์ด๋ค์ด๋ ๊ฒ์ด ๊ฐ๋ฅํ๋ค.
++) ์ฌ๊ธฐ์ ์ค์ํ ์ ์ Consumer๊ฐ ๋ฐ์ดํฐ๋ฅผ ์ฝ๋๋ค๊ณ ํด์ ๋ฐ์ดํฐ๊ฐ ์ฌ๋ผ์ง์ง๋ ์๋๋ค๋ ๊ฒ์ด๋ค.
Topic์ ์์ Kafka ํน์ง์ผ๋ก ์๊ฐํ๋ Retention period๊ฐ ์กด์ฌํ์ฌ, ์ด ๊ธฐ๊ฐ๋์ ๋ฐ์ดํฐ๊ฐ ์กด์ฌํ๊ฒ ๋๋ค.
๋ฐ๋ผ์ Consumer๋ณ๋ก ์ด๋ ์์น์ ๋ฐ์ดํฐ๋ฅผ ์ฝ๊ณ ์๋์ง ์์น ์ ๋ณด(Offset)๋ฅผ ์ ์งํ๊ฒ ๋๋ค.
์ด Topic์ ์์ด๋ ๋ฐ์ดํฐ๋ฅผ Message(Event)๋ผ๊ณ ํ๋ค.
- Event๋ ์ต๋ 1Mb์ ํฌ๊ธฐ์ด๋ฉฐ Header, Key, Value, Timestamp๋ฅผ ํฌํจํ ๊ตฌ์กฐ๋ฅผ ๊ฐ์ง๋ค.
- Header์๋ ๊ฒฝ๋ ๋ฉํ ๋ฐ์ดํฐ ์ ๋ณด๊ฐ ๋ด๊ธฐ๊ณ
- Key์๋ Topic ๋ฐ์ดํฐ๋ฅผ Partitioningํ ๋ ํ์ํ ๋ฐ์ดํฐ๊ฐ ํฌํจ๋๋ค.
1-1. Partitioning
ํ๋์ Topic์ ํ์ฅ์ฑ์ ์ํด ๋ค์์ Partition์ผ๋ก ๋๋์ด ์ ์ฅ๋๋ค.
๋ณดํต์ Key๊ฐ ์๋ค๋ฉด ํค์ Hashing ๊ฐ์ Partition์ ์๋ก ๋๋ ๋๋จธ์ง๋ก ๊ฒฐ์ ๋๋ค.
ํ๋์ Partition์ Fail-over๋ฅผ ์ํด Replication Partiton(๋ณต์ ๋ณธ)์ ๊ฐ๋๋ค.
์ด๋ ๊ฐ Partition๋ณ๋ก Leader์ Follower๋ก ๊ตฌ๋ถ๋์ด Leader๋ฅผ ํตํด ์ฐ๊ธฐ๊ฐ ์ด๋ค์ง๊ณ , Leader/Follower๋ฅผ ํตํด ์ฝ๊ฒ ๋๋ค.
Partition์ด ๋ณต์ ๋ณธ์ ๊ฐ๋ ์ด์ ๋ ๋ฐ์ดํฐ ์ ์ค์ ๋ง๊ธฐ ์ํ ๊ฒ ๋ฟ๋ง์๋๋ผ ๋ค์์ Consumer๊ฐ ๋ณ๋ ฌ์ ์ผ๋ก ์ฝ์ ์ ์๋๋ก ํ๊ธฐ ์ํ ๊ฒ๋ ์๋ค.
++) Partition์ ์ฉ๋
- ์ํ์ ์ผ๋ก Partition์ ์๋ฅผ ๋๋ ค, Topic์ด ๊ฐ์ง ์ ์๋ ๋ฐ์ดํฐ์ ํฌ๊ธฐ ๋๋ฆฌ๊ธฐ > Load Balancing์ผ๋ก ์ด์ด์ง
- ํน์ ํค๋ฅผ ๊ฐ์ง๊ณ ๋ ์ฝ๋๋ฅผ ๋๋๋ ๊ฒฝ์ฐ > Segmantic Partitioning
์ด๋ฌํ ์ต์ ๋ค์ ๋์ค์ Kafka์์ Topic์ ํ๋ผ๋ฏธํฐ์ ์ํด ๊ฒฐ์ ๋๋ค.
์ด๋ฆ, Partition์ ์, ๋ณต์ ๋ณธ์ ์(์๋ณธ ํฌํจ), Consistency Level, ๋ฐ์ดํฐ ๋ณด์กด ๊ธฐํ, ๋ฉ์์ง ์์ถ ๋ฐฉ์.. ๋ฑ
์ง๊ธ๊น์ง ์ค๋ช ํ๋ ๊ฐ๋ ๋ค์ด ์ฌ๋ฟ ๋ณด์ธ๋ค.
์ถ๊ฐ์ ์ธ ํ๋ผ๋ฏธํฐ๋ ๋ค์์ ๋งํฌ๋ฅผ ์ฐธ๊ณ ํ์.
https://kafka.apache.org/documentation/#topicconfigs
2. Broker
์ค์ ๋ฐ์ดํฐ๋ฅผ ์ ์ฅํ๋ ์๋ฒ
Kafka ํด๋ฌ์คํฐ๋ ๊ธฐ๋ณธ์ ์ผ๋ก ๋ค์์ Broker๋ก ๊ตฌ์ฑ๋๋ค. (Broker = Server = Node)
Broker๋ค์ด ์ค์ ๋ก Producer/Consumer๋ค๊ณผ ํต์ ์ ์ํํ๊ฒ ๋๋ฉฐ, ๋ถ๊ฐ ๊ธฐ๋ฅ์ ์ํ ๋ค๋ฅธ ์๋น์ค๋ค์ด ์ด๊ณณ์ ์ถ๊ฐ๋๋ค.
๋ํ Topic์ Partition์ ์ค์ ๋ก ๊ด๋ฆฌํ๋ฉฐ, ํ๋์ Broker์์ ์ต๋ 4000๊ฐ์ Partition์ ์ฒ๋ฆฌํ๊ณ ๊ธฐ๋กํ๋ค.
์ด๋ ๋๊ฐ ์ด๋์ Broker, Topic์ ๋ํ ์ ๋ณด๋ฅผ ๊ด๋ฆฌํ ๊น?
3. ZooKeeper / KRaft
๋ฉํ ์ ๋ณด ๊ด๋ฆฌ๋ Apache ZooKeeper์ KRaft(Kafka Raft)์ ๋ณดํต ์ฌ์ฉํ๊ฒ ๋๋ค.
- Broker ๋ฆฌ์คํธ ๊ด๋ฆฌ(๋๊ฐ Controller(Broker ๋์ฅ)์ธ๊ฐ?)
- Topic ๋ฆฌ์คํธ ๊ด๋ฆฌ(Partition ๊ด๋ฆฌ, Partition์ Replica ๊ด๋ฆฌ)
- Topic๋ณ Access Control Lists) ๊ด๋ฆฌ ๋ฑ
์ต๊ทผ์๋ ZooKeeper ๊ด๋ จ ๋ฌธ์ ๊ฐ ๋ง์ KRaft๋ก ๋ง์ด ๋์ด์ค๊ณ ์๋ค๊ณ ํ๋ค.
์ด์ ๊น์ง ์ค๋ช ํ Kafka์ ์ํคํ ์ฒ๋ฅผ ์ ์ฒด์ ์ธ ๊ทธ๋ฆผ์ผ๋ก ๋ณด๋ฉด ๋ค์๊ณผ ๊ฐ๋ค.
Partition์ ๋ค์ ์ฌ๋ฌ ๊ฐ์ Segment๋ก ๊ตฌ์ฑ๋๋ค.
Segment๋ ๋ฌผ๋ฆฌ์ ์ผ๋ก ๋์คํฌ์ ์ ์ฅ๋๋ ๋ก๊ทธ ํ์ผ๊ณผ ๊ฐ๋ค. (Immutableํ๊ณ , Append๋ง ๊ฐ๋ฅํ ํํ)
Segment๋ ์ต๋ ํฌ๊ธฐ๊ฐ ์กด์ฌํด์, ์ด ํฌ๊ธฐ๋ฅผ ๋์ด๊ฐ๋ฉด ์๋ก์ด Segment ํ์ผ์ ๋ง๋ค์ด๋ธ๋ค.
๋ฐ์ดํฐ๋ฒ ์ด์ค์์ Data Recovery๋ Replay์ ์ฌ์ฉํ๋ Commit Log๋ผ๊ณ ํ ์ ์๋ค.
๐ง Kafka ๊ธฐํ ๊ธฐ๋ฅ
1. Kafka Connect
Kafka ์์ ๋ง๋ค์ด์ง ์ค์์ง์ค ๋ฐ์ดํฐ ํ๋ธ(๋ณ๋์ ์๋ฒ ํ์)
๋ฐ์ดํฐ ์์คํ ๋ค๊ฐ์ ๋ฐ์ดํฐ๋ฅผ ์ฃผ๊ณ ๋ฐ๋ ์ฉ๋๋ก Kafka๋ฅผ ์ฌ์ฉํ ์ ์๊ฒ ๋ง๋ ๋ค.
Kafka์ Prodcuer์ Consumer ์ญํ ์ Kafka Connect๊ฐ ์คํํ๊ฒ ๋๋ ๊ฒ์ด๋ค.
์ฌ๊ธฐ์ Source Task๋ฅผ ์ํํ๋ ๊ฒ์ด Producer์ ์ญํ , Sink Task๋ฅผ ์ํํ๋ ๊ฒ์ด Consumer์ ์ญํ ์ ํ๋ค.
Kafka ์์ชฝ์ Kafka Connect๋ฅผ ๋๋ ๊ฒ์ผ๋ก, ํน๋ณํ ์ฝ๋ฉ์์ด ๊ฐ๋จํ ํ๊ฒฝ์ค์ ์ผ๋ก ๋ฐ์ดํฐ๋ฅผ ์ฝ์ด Kafka ํด๋ฌ์คํฐ์ ์ ์ฅํ๊ฑฐ๋ Kafka ํด๋ฌ์คํฐ๋ก๋ถํฐ ์ฝ์ด์์ S3์ ๊ฐ์ ์ ์ฅ์์ ์ ์ฅํ ์ ์๊ฒ ๋๋ค. (ETL)
2. Kafka Schema Registry
Kafka Topic์ ๋ค์ด์จ ๋ฐ์ดํฐ์ ํฌ๋งท์ด ์ด๋ค ํํ์ด์ด์ผ ํ๋์ง ๊ธฐ๋ก๋๋ ์ ์ฅ์ ์ญํ
๋ฐ๋ผ์ Topic ๋ฉ์์ง ๋ฐ์ดํฐ์ ๋ํ ์คํค๋ง ๊ด๋ฆฌ ๋ฐ ๊ฒ์ฆํ๋๋ฐ ์ฌ์ฉํ๊ฒ ๋๋ค.
Schema ID(์ ๋ฒ์ )๋ฅผ ์ฌ์ฉํด์ Schema Evolution(ํฌ๋งท ๋ณ๊ฒฝ)์ ์ง์ํ๋ค.
๋ณดํต ์์ถ ํฌ๋งท์ AVRO๋ฅผ ์ฌ์ฉํ๋ค.(์คํค๋ง ์๋ฒ ๋ ๊ฐ๋ฅ!)
๊ทธ๋ฆผ์ฒ๋ผ Schema Registry๊ฐ ๋ฐ๊ธํ SchemaID๋ฅผ ํตํด ์คํค๋ง ์ ๋ณด๋ฅผ Consumer๊ฐ ์ดํดํ ์ ์๊ฒ ๋๋ค.
์ด ๊ณผ์ ์์ ์ฌ์ฉํ๋ Serialization๊ณผ Deserialization์ ๋ค์๊ณผ ๊ฐ์ด ์ดํดํ๋ฉด ๋๋ค.
Serialization(์ง๋ ฌํ)
- ๊ฐ์ฒด๋ฅผ ์ ์ฅํ๊ฑฐ๋ ์ ์กํ ์ ์๋ ํํ๋ก ๋ณํํ๋ ํ๋ก์ธ์ค
- ๋ฐ์ดํฐ๋ฅผ ์ผ๋ ฌ๋ก ๋๋ ค์ ์์ถํ๊ณ ๋ฐ์ดํฐ์ ์คํค๋ง ์ ๋ณด๋ฅผ ์ถ๊ฐํ๋ ๋ฑ์ ์์
Deserialization(์ญ์ง๋ ฌํ)
- Serialized๋ ๋ฐ์ดํฐ๋ฅผ ๋ค์ ์ฌ์ฉํ ์ ์๋ ํํ๋ก ๋ณํํ๋ ํ๋ก์ธ์ค
- ๋ฐ์ดํฐ์ ์์ถ์ ํด์ ํ๊ฑฐ๋ ์คํค๋ง ์ ๋ณด๋ฅผ ํ ๋๋ก ๋ฐ์ดํฐ ํฌ๋งท ๊ฒ์ฆ ์ํ ์์
์ง๋ ฌํ์ ์ญ์ง๋ ฌํ๋ ๋ณดํต Kafka ๊ด๋ จ ๋ผ์ด๋ธ๋ฌ๋ฆฌ๋ฅผ ์ฌ์ฉํ๋ค.
3. REST Proxy
Producer, Kafka Cluster, Consumer๋ ๋ณดํต ๊ฐ์ ๋คํธ์ํฌ์์ ์ฌ์ฉํ๊ฒ ๋๋ค.
์ด๋ ์ธ๋ถ์์ Kafka Cluster์ ํน์ Topic ๋ฐ์ดํฐ๋ฅผ ์ฝ๊ฑฐ๋ ์ธ ๋ ๋ฐ์ํ๋ ๋ณด์ ๋ฌธ์ ๋ฅผ ํด๊ฒฐํ๊ธฐ ์ํ ๋ฐฉ๋ฒ์ด๋ค.
ํด๋ผ์ด์ธํธ๊ฐ API ํธ์ถ์ ์ฌ์ฉํ์ฌ Kafka๋ฅผ ์ฌ์ฉ ๊ฐ๋ฅํ๊ฒ ํ๋ฉฐ,
Message Serialization๊ณผ Deserialization์ ๋์ ์ํํ๊ณ Load Balancing๋ ์ํํ๊ฒ ๋๋ค.
Message๋ฅผ ์์ฑ ๋ฐ ์๋นํ๊ณ Topic์ ๊ด๋ฆฌํ๋ ๊ฐ๋จํ๊ณ ํ์คํ๋ ๋ฐฉ๋ฒ์ ์ ๊ณตํ๋ ๊ฒ์ด๋ค.
4. Kafka Streams
Kafka Topic์ ์๋นํ๊ณ ์์ฑํ๋ ์ค์๊ฐ ์คํธ๋ฆผ ์ฒ๋ฆฌ ๋ผ์ด๋ธ๋ฌ๋ฆฌ
Spark Streaming๊ฐ Kafka Topic์ ์ฒ๋ฆฌํ๋ ๊ฒฝ์ฐ micro batch์ ๊ฐ๊น๊ณ ,
Kafka Streams๋ Topic์ ๋ฐ์ดํฐ๋ฅผ ํ๋์ฉ ์ฒ๋ฆฌํ๋ ์กฐ๊ธ ๋ Realtime์ ๊ฐ๊น์ด ๋ฐฉ๋ฒ์ด๋ค.
5. ksqlDB
Kafka Streams๋ก ๊ตฌํ๋ ์คํธ๋ฆผ ์ฒ๋ฆฌ ๋ฐ์ดํฐ๋ฒ ์ด์ค์ด๋ค.(KSQL ๋์ฒด)
SQL๊ณผ ์ ์ฌํ ์ฟผ๋ฆฌ ์ธ์ด๋ฅผ ์ง์ํ๊ณ , ์กฐ๊ธ ํน๋ณํ๊ฒ ์ฐ์ ์ฟผ๋ฆฌ๋ผ๋ ๊ธฐ๋ฅ๋ ์ง์ํ๋ค.
์ฐ์ ์ฟผ๋ฆฌ๋ SELECT์ ๊ฐ์ ์ฟผ๋ฆฌ๋ฅผ ๋ฃ์์ ๋, ๋ฐ์ดํฐ๊ฐ ์ค์๊ฐ์ผ๋ก ๋์ฐฉํ ๋๋ง๋ค ์ ๋ฐ์ดํธ๋๋ ๊ธฐ๋ฅ์ด๋ค.
๐ฉ Kafka ์ค์น ๋ฐ ๊ฐ๋จ ์ค์ต
Kafka docker-compose ํ์ผ์ ํตํด ์ค์นํ๋ค.
Conduktor.io ํ์ฌ์์ ๋ง๋ Kafka ๋ ํฌ๋ฅผ ์ฐธ๊ณ ํ์.
https://github.com/conduktor/kafka-stack-docker-compose
๋ค์ํ yml ํ์ผ ์ค ์ฌ๋ฌ ๊ฐ์ง ๊ธฐ๋ฅ์ ๋ด๊ณ ์๋ Full Stack ๋ฒ์ ์ ์ค์นํ๋ค.
์ด ์ค์๋ conduktor๊ฐ ์ ๊ณตํ๋ ์น UI๋ ์๋ค.
git pull https://github.com/conduktor/kafka-stack-docker-compose.git
docker compose -f full-stack.yml up
๋ ๋ฒ์งธ ๋ช ๋ น์ด๋ฅผ ์ํํ๋ ๋ฐ์๋ ๋๋ต 5๋ถ ์ ๋ ๊ธฐ๋ค๋ ค์ผ ํ๋ค.
์์ ์ด ๋ชจ๋ ๋๋๋ฉด conduktor์ ์น UI์ ์ ์ํด๋ณด์.
http://localhost:8080/ ๋ก ์ ์ํ ์ ์๊ณ , ์์ด๋์ ๋น๋ฐ๋ฒํธ๋ ๊ฐ๊ฐ admin@admin.io , admin ์ด๋ค.
์ผ์ชฝ ๋ฉ๋ด์ ๋ณด์ด๋ Topics, Schema Registry, Consumer Groups ๋ฑ ์ด์ ๊ป ์์๋ดค๋ ๊ธฐ๋ฅ๋ค์ ์ฝ๊ฒ ํ์ธํ ์ ์๋ค.
์ฐธ๊ณ ๋ก ๋ก์ปฌ์ ์ค์นํ์ฌ ์ด์ฉํ๋ ํ ์คํธ์ฉ Kafka๋ก์จ ํ์ฉ๋๋ ํ๊ฒฝ์ด๋ฏ๋ก, Brokers๋ ํ๋๋ง ์กด์ฌํ๋ค.
์ด์ python์ ํ์ฉํ Kafka๋ฅผ ์ฌ์ฉํ๊ธฐ ์ํด ๊ด๋ จ ํ๊ฒฝ์ ์ค์นํด์ฃผ์.
pip3 install kafka-python
๊ทธ๋ฆฌ๊ณ producer.py๋ฅผ ์์ฑํ๋ค.
from time import sleep
from json import dumps
from kafka import KafkaProducer
# producer ์์ฑ
producer = KafkaProducer(
bootstrap_servers=['localhost:9092'], # Broker์ ํด๋นํ๋ ์๋ฒ ์
๋ ฅ
value_serializer=lambda x: dumps(x).encode('utf-8') # dumps๋ dictinary๋ฅผ json string์ผ๋ก ๋ฐ๊ฟ์ค
)
# 0.5์ด๋ง๋ค ๋ฐ์ดํฐ ๋ฃ๊ธฐ
for j in range(50):
print("Iteration", j)
data = {'counter': j}
producer.send('topic_test', value=data) # ์ด๋ฆ์ด topic_test์ธ Topic์ ํด๋น ๋ฐ์ดํฐ ์ ์ก
sleep(0.5)
์ด ํ์ด์ฌ ํ์ผ์ ์๊น git clone๋ ํด๋์ ๋ฃ๊ณ ์คํ์์ผ๋ณด์.
Kafka๊ฐ ์คํ๋์ด ์์ง ์๋ค๋ฉด "NoBrokersAvailable"์ด๋ ์๋ฌ ๋ฉ์์ง๊ฐ ๋ฐ์ํ๋ค.
๊ทธ๋ฆฌ๊ณ ์น UI๋ก ๊ฒฐ๊ณผ๋ฅผ ํ์ธํ ์ ์๋ค.
๋ณ๋ค๋ฅธ Key๋ ์ต์ ์ ์ง์ ํด์ฃผ์ง ์์ Key๋ null๋ก ๊ธฐ๋ก๋์ด์๋ค.
์ด๋ ๊ฒ Key๊ฐ Null์ธ๋ฐ, ์ฌ๋ฌ ๊ฐ์ Partition์ ๊ฐ์ง๊ณ ์๋ค๋ฉด ๋ผ์ด๋ ๋ก๋นํ์์ผ๋ก ๋ถ์ฐํ๊ฒ ๋๋ค.
์ด๋ฒ์๋ consumer.py๋ฅผ ์์ฑํด๋ณด์.
from kafka import KafkaConsumer
from json import loads
from time import sleep
# consumer ์์ฑ
consumer = KafkaConsumer(
'topic_test', # topic_test Topic์์ ๋ฐ์ดํฐ๋ฅผ ์ฝ์ด์จ๋ค.
bootstrap_servers=['localhost:9092'], # Broker ์๋ฒ ์
๋ ฅ
auto_offset_reset='earliest', # ๊ฐ์ฅ ๋จผ์ ๋ค์ด์จ ๋ฐ์ดํฐ๋ถํฐ ์ฝ๊ธฐ, ๋ฐ๋๋ 'latest'
enable_auto_commit=True, # False์ธ ๊ฒฝ์ฐ commit ํจ์๋ก ๋ช
์์ ์ผ๋ก offset ์์น๋ฅผ ์ปค๋ฐํด์ผํจ(์์ )
group_id='my-group-id', # Topic์ ๊ฐ์ด ์๋นํ Conumser ๊ทธ๋ฃน
value_deserializer=lambda x: loads(x.decode('utf-8')) # ์๊น producer์ ๋ฐ๋ ์์
)
# ์ฝ์ด์จ ๋ฐ์ดํฐ๋ฅผ ์ถ๋ ฅ
for event in consumer:
event_data = event.value
# Do whatever you want
print(event_data)
sleep(2)
consumer๋ ์ ์์ ์ผ๋ก ๋ฐ์ดํฐ๋ฅผ ์ ์ฝ์ด์จ๋ค.