일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | |||
5 | 6 | 7 | 8 | 9 | 10 | 11 |
12 | 13 | 14 | 15 | 16 | 17 | 18 |
19 | 20 | 21 | 22 | 23 | 24 | 25 |
26 | 27 | 28 | 29 | 30 | 31 |
- off heap memory
- redshift
- KDT_TIL
- k8s
- etl
- Docker
- spark executor memory
- colab
- Spark SQL
- 빅데이터
- Airflow
- SQL
- CI/CD
- Kafka
- 데이터 파이프라인
- topic
- Dag
- Spark Caching
- Spark
- Speculative Execution
- Spark Partitioning
- disk spill
- mysql
- AQE
- Salting
- aws
- DataFrame Hint
- Spark 실습
- Kubernetes
- backfill
- Today
- Total
목록Airflow (9)
JUST DO IT!
📚 KDT WEEK 12 DAY 4 TIL Airflow 운영 Airflow 대안 🟥 프로덕션 사용을 위한 Airflow 환경 설정 1. airflow.cfg core 섹션의 dags_folder가 들어있는 디렉토리 설정 dag_dir_list_interval에 설정된 dag 폴더 스캔 주기 설정(초단위) 모든 DAG를 한번씩 실행해보기 때문에, 이게 곤란한 DAG들은 airflowignore 활용 2. DB의 정보를 주기적으로 백업하는게 좋다. Airflow의 기본 DB는 Sqlite이지만 Postgres나 MySQL로 바꾸는 경우, airflow.cfg의 core 섹션의 sql_alchemy_conn 설정 변경과 Executor 변경필요 3. Authentication과 보안 기본으로 주어지는 어드민..
📚 KDT WEEK 12 DAY 3 TIL Task Grouping Dynamic Dags 📦 Task Grouping Task 수가 많은 DAG라면 Task들을 성격에 따라 관리할 수 있음 Airflow 2.0에서부터는 SubDAG 대신 Task Grouping 사용 ex) start > 파일 1, 2, 3 다운로드 Task > 파일 1, 2, 3 Process Task > end 구조로 DAG를 작성했다. from airflow.models.dag import DAG from airflow.operators.empty import EmptyOperator from airflow.operators.bash import BashOperator from airflow.utils.task_group impo..
📐 Airflow에서 Jinja Template 사용하기 Jinja Template를 사용하면 Airflow의 Variables나 execution_date 등을 코드 내에서 쉽게 사용하기 좋다. Jinja Template을 지원하는 Operator의 Parameter에서만 사용 가능하다. BashOperator에서는 bash_command에서 Jinja Template를 사용할 수 있다. 아래는 그 예시이다. # BashOperator를 사용하여 템플릿 작업 정의 task1 = BashOperator( task_id='task1', bash_command='echo "{{ ds }}"', # ds로 execution_date 나타냄 dag=dag ) # 동적 매개변수가 있는 다른 템플릿 작업 정의 ta..
📚 KDT WEEK 12 DAY 2 TIL Airflow API 사용하기 사용하기 전에 먼저 Airflow API를 사용하기 위해 airflow.cfg 변경이 조금 필요하다. airflow.cfg의 api 섹션에서 auth_backend의 값을 아래와 같이 변경해야한다. [api] auth_backend = airflow.api.auth.backend.basic_auth basic_auth는 airflow api를 사용하기 위해 사용자의 아이디와 비밀번호로 인증하는 방식이다. 이 설정이 되어있는지 확인하고 싶다면 터미널에 다음의 명령어를 입력해서 확인할 수 있다. (docker) docker exec -it airflow-scheduler airflow config get-value api auth_ba..
📚 KDT WEEK 12 DAY 1 TIL Airflow에서 Slack 알림 처리하기 🟥 Slack 알림 처리하기 1. Slack API 페이지에서 알림받을 채널 설정하기 먼저, Slack API 페이지에 접속한다. https://api.slack.com/messaging/webhooks Sending messages using Incoming Webhooks Creating an Incoming Webhook gives you a unique URL to which you send a JSON payload with the message text and some options. api.slack.com 오른쪽 상단의 Your Apps를 누르고, 새로운 앱을 만든다. 첫 번째 옵션(From scratch..
📚 KDT WEEK 11 DAY 1 TIL Docker 실습 리눅스 우분투 MySQL Airflow 측면에서 바라보는 Docker 더보기 Airflow 운영상의 어려움 DAG의 수가 많아지면 데이터 품질이나 데이터 리니지 이슈 등 외에도 다른도 발생하는데, DAG 라이브러리 충돌, Worker부족, Worker 서버들의 관리와 활용도 이슈 등이 발생할 수 있다. 이런 이슈들을 해결하기 위해 사용하는 방법으로, Docker와 K8s를 많이 사용한다. 태스크나 DAG 코드를 Docker Image로 만들어서 Docker Container 형태로 실행 라이브러리와 모듈 충돌 방지 개발 환경과 프로덕션 환경을 동일하게 유지 가능 Airflow Worker를 K8s(공용 서버 클러스터)에서 필요한 대로 동적으로 ..
📚 KDT WEEK 10 DAY 4 TIL MySQL ➡️ Redshift Airflow ETL 구현 사전작업 ETL 코드 Backfill 구동 🟥 MySQL(OLTP)에서 Redshift(OLAP)로 Airflow ETL 구현해보기 프로덕션 데이터베이스(MySQL)에서 데이터 웨어하우스(Redshift)로 데이터를 전송하는 ETL을 Airflow로 구현해보자. 🛠️ 사전 작업 1. 권한 설정 먼저, 서로간의 권한 설정이 사전에 필요하다. Airflow DAG에서 S3 접근 권한 : AWS IAM User(S3 버킷 읽기, 쓰기 권한) 생성해서 access key, secret key 받기 https://sunhokimdev.tistory.com/34 > Snowflake 실습 > Snowflake에서 ..
📚 KDT WEEK 10 DAY 4 TIL Primary Key Uniqueness 보장하기 🟥 Primary Key Uniqueness 보장하기 PK : 테이블에서 하나의 레코드를 유일하게 지칭할 수 있는 필드(들) 관계형 데이터베이스 시스템이 Primary key의 값이 중복 존재하는 것을 막아준다. 빅데이터 기반 데이터 웨어하우스들은 Primary key Uniqueness를 보장하지 않으므로 엔지니어가 보장해야함 날씨 데이터 API(https://openweathermap.org/api)를 가져오는 과정에서 PK Uniquness를 보장해보려고 한다. SQL 문법을 사용해서 Upsert(데이터가 있는 경우 새로 업데이트)방식을 사용할 것이다. 참고로 적재 대상 DB인 Redshift에서는 Upse..
📚 KDT WEEK 10 DAY 3 TIL Yahoo finance API Airflow DAG 코드 구현 Full Refresh 방식 구현 Incremental Update 방식 구현 Yahoo finance API를 사용하여 Airflow ETL 구현하기 먼저 Yahoo finance API를 사용하려면 yfinance 모듈을 다운로드받아야한다. 나는 Airflow를 도커 환경에서 사용하고 있으므로 도커에 설치를 해주었다. docker ps # Airflow scheduler의 Container ID를 확인하자 docker exec -it SchedulerContainerID sh # 찾은 scheduler의 Container ID를 입력해서 접속한다. # airflow 접속 후 pip install..