Airflow
Apache Airflow는 초기 에어비엔비(Airfbnb) 엔지니어링 팀에서 개발한 워크플로우(workflow) 오픈 소스 플랫폼
프로그래밍 방식으로 워크플로우를 작성, 스케줄링 및 모니터링
Airflow 아키텍처
Airflow의 구성요소
Webserver
Airflow의 로그, Scheduler에 의해 생성된 DAG 목록, Task 상태 등을 시각화(UI)
Scheduler
할당된 work들을 스케줄링. 예약된 워크플로우들을 실행(트리거링)하기 위해 executor에게 task 제공
Executor
실행중인 task 핸들링. default로는 scheduler에 할당된 모든 task 실행시키지만, production 수준에서는 실행할 task worker에 push
Workers
실제 task 실행하는 주체
Metadata Database
DAG, Task 등의 metadata 저장 및 관리
DAG Directory
DAG 저장
Airflow 주요 개념
DAG(Directed Acyclic Graph)
- 비순환 그래프(사이클x)로써, 노드와 노드가 단방향으로 연결.
- Airflow에서는 DAG 이용해 워크플로우를 구성하고 어떤 순서로 task를 실행시킬 것인지, dependency를 어떻게 표현할지 결정
- 순차적으로 task실행되며, 논리적 오류 발생 시 교착상태(deadlock) 발생
- 일반적으로 python 코드로 정의하며 $AIRFLOW_HOME/dags에 위치
- dags 폴더의 .py 이름에 'airflow', 'dag' 단어 포함 시 Web UI에 표시
- $AIRFLOW_HOME/airflow.cfg 파일에서 변경
Operator
- DAG 동작 시 실제로 실행되는 task 결정(정의)
- Operator Type
- Action Operator
- 기능이나 명령 실행
- 실제 연산 수행, 데이터 추출 및 프로세싱
- 참고: https://airflow.apache.org/docs/apache-airflow/stable/howto/operator/index.html
- Transfer Operator
- 하나의 시스템에서 다른 시스템으로 옮김(ex. 데이터를 Source에서 Destination으로 전송)
- Sensor Operator
- 조건 만족할 때까지 기다리다가 충족 시 다음 task 실행
- Action Operator
Task & Task Instance
- Task: 데이터 파이프라인에 존재하는 Operator 의미
- Operator 실행 시 task됨
- Task Instance: 데이터 파이프라인 trigger되어 실행 시 생성된 task
- task 실행 순서 정의
- 각 task는 ">>", "<<", "[]" 이용해 task의 의존성 정의하며 dag 그림
Task와 Operator
- 사용자 관점에서 두 용어는 같은 의미지만 task는 작업의 올바른 실행을 보장하기 위한 manager
- 사용자는 operator 사용해서 수행할 작업에 집중하며, airflow는 task를 통해 작업 올바르게 실행
- 사용자는 각 환경별 작업이 잘 이루어지는지 확인하기 위해 operator 내 코드 구성에 집중
- airflow는 각 operator 내의 구성 요소들이 전부 잘 맞아야 작업이 이루어지는 형태
Airflow 기본 동작 원리
- 유저가 DAG 작성
- dags 폴더 아래에 py파일 배치
- Webserver와 Scheduler가 DAG 읽어옴
- Scheduler가 Metastore 이용해 DagRun 오브젝트 생성
- DagRun은 사용자가 작성한 DAG인스턴스 (DagRun Status: Running)
- Scheduler는 DagRun 오브젝트의 인스턴스인 Task Instance Object 스케줄링
- 트리거의 조건이 맞으면 Scheduler가 Task Instance를 Executor로 보냄
- Executor는 Task Instance 실행
- 작업 완료 후 Metastore에 보고
- 완료 Task Instance는 DagRun에 업데이트됨
- Scheduler는 DAG 실행 완료 여부를 Metastore 통해 확인 후 DagRun 상태 바꿈(DagRun Status: Completed)
- Metastore가 Webserver에 업데이트하여 사용자도 확인
DAG 코드 구조
Python 코드로 정의하는 DAG 구조
라이브러리 임포트
DAG와 워크플로우 구성에 필요한 라이브러리 선언
공통 변수 정의
DAG 구성에 사용하기 위해 공통으로 사용하는 변수 정의. 변경 자주 발생 시 Variable 기능 활용
DAG 공통 속성값 정의
DAG 정의 시 필요한 공통 속성 값 정의
DAG 정의
DAG 선언하고 공통 속성값 전달
Task 정의
DAG에 포함될 각 작업(task) 정의. Operator, Sensor, Hook 등 사용
Task 배열
각 작업(Task)들의 순서 나열. '<<', '>>' 같은 Shift 연산자 사용. set_upstream, set_downstream 함수도 사용 가능
ex.
# dags/branch_without_trigger.py
import pendulum
from airflow.decorators import task
from airflow.models import DAG
from airflow.operators.empty import EmptyOperator
dag = DAG(
dag_id="branch_without_trigger",
schedule="@once",
start_date=pendulum.datetime(2019, 2, 28, tz="UTC"),
)
run_this_first = EmptyOperator(task_id="run_this_first", dag=dag)
@task.branch(task_id="branching")
def do_branching():
return "branch_a"
branching = do_branching()
branch_a = EmptyOperator(task_id="branch_a", dag=dag)
follow_branch_a = EmptyOperator(task_id="follow_branch_a", dag=dag)
branch_false = EmptyOperator(task_id="branch_false", dag=dag)
join = EmptyOperator(task_id="join", dag=dag)
run_this_first >> branching
branching >> branch_a >> follow_branch_a >> join
branching >> branch_false >> join
Airflow 장단점
장점
- Python 코드를 이용해 파이프라인 구현하므로 Python에서 구현할 수 있는 대부분의 방법 사용하여 복잡한 파이프라인 생성 가능
- Python 기반으로 쉽게 확장 가능하며 다양한 시스템과 통합 가능 (다양한 유형의 DB, Cloud Service 통합 가능)
- 데이터 인프라 관리, 데이터 웨어하우스 구축, 머신러닝/분석/실험에 사용할 데이터 환경 구성에 유용
- 스케줄링 기능으로 DAG에 정의된 특정 시점에 트리거 가능하며 최종 시점과 다음 스케줄 주기 상세히 알려줌
- 백필 기능 사용하면 과거 데이터 손쉽게 재처리 가능하여 코드 변경 후 재생성 필요한 데이터 재처리 가능
단점
- Python에 익숙하지 않으면 DAG 구성 어려움
- 초기 설치는 간단해보여도 작은 환경 변화에 오류 발생하는 경우 있어 롤백 잦음
주의사항
- Data Streaming Solution 적용에 부적합
- 초단위(그 이하) 데이터 처리 필요한 경우 사용에 부적합
- airflow는 반복적이거나 배치 테스크를 실행하는 기능에 초점 맞춰져 있음
- Data Processing Framework (Flink, Spark, Hadoop 등) 로 사용 부적절
- 데이터 프로세싱 작업에는 최적화 되어있지 않아 매우 느림
- 경우에 따라 메모리 부족 발생
- SparkSubmitOperator와 같은 operator 이용하여 데이터 프로세싱은 Spark와 같은 외부 Framework로 처리
- 파이프라인 규모 커질 시 Python코드 복잡해질 수 있음 -> 초기 사용 시점에 엄격한 관리 필요
'기타' 카테고리의 다른 글
Monitoring - Pull vs Push (1) | 2023.12.02 |
---|