batch 작업을 손쉽게 도와주는 Airflow
이번 포스팅에서는 airflow의 metadata DB, executor, operator, decorator 기본 개념과 Dynamic task mapping에 대해 정리하고자 한다.
Airflow

우선 Airflow는 기본적으로 scheduler, workers, metadata DB, webserver로 이루어져 있다.
Scheduler
Scheduler는 DAG 파일을 주기적으로 스캔하고 있다가 실행할 task가 있으면 Executor에게 전달한다.
즉, 어떤 task가 언제 실행되어야 하는지 모니터링하고 있다가 Executor에게 알려준다.
Executor
Executor는 Scheduler가 스케줄링한 task들을 실제로 어디서, 어떻게 실행할지 결정한다.
Scheduler가 "이 task 지금 실행해야 돼"라고 알려주면, Executor가 이를 받아서 실제 실행 환경에 task를 올린다.
Executor는 사용자가 정의할 수 있으며, 그 대표적인 종류는 다음과 같다.
1. SequentialExecutor
여러 task들이 있어도 한 번에 하나씩 순차적으로 실행
가장 단순한 executor로, 보통 로컬 환경에서 주로 사용한다.
2. LocalExecutor

여러 task를 멀티프로세싱으로 로컬에서 병렬 실행
별도의 외부 구성 없이 빠르게 병렬화가 가능하다는 장점이 있다.
CPU 8코어인 서버에서 동시에 8개의 task를 실행한다는 개념이다.
3. CeleryExecutor

여러 work로 task들을 분산시켜서 실행
RabbitMQ나 Redis와 같은 중간 매개체를 두고 있다.
Scheduler는 task instance를 브로커에게 보내고, Celery worker가 이를 수신해 task를 직접 실행한다.
즉, worker가 곧 실행 주체이며 worker는 상시 실행되고 있다. (=자원을 계속 잡아먹고 있다.)
4. KubernetesExecutor

task를 개별 pod를 띄워 그 위에서 실행
Scheduler가 task 실행 판단 후 Executor가 k8s API로 worker를 띄운 후 worker에서 task를 실행한다.
worker는 Celery처럼 상시 실행되고 있지 않으며, 실행할 task가 있을 때 worker가 생성된다. (=자원을 효율적으로 사용)
Metadata DB
Airflow의 Metadata DB는 Airflow 시스템의 상태를 기록하고 공유하는 중앙 저장소이다.
그럼 Scheduler, Executor, Worker는 언제 데이터를 쓰고 읽을까?
1. Scheduler
- DAG 스캔 -> 실행할 Task Instance 생성 -> 상태를 DB에 기록
- Executor에게 task 실행 요청
2. Executor
- 종류에 따라 직접 실행 or Pod/Worker에게 실행 위임
3. Worker(Celery / k8s Pod)
- DB에서 task 정보 읽음
- 실제 task 실행
- task 상태와 실행 결과를 DB에 기록
4. Web server
- Scheduler가 기록한 DAG metadata를 읽어서 UI에 보여줌
Operator
Operator는 실제로 task를 정의하는 객체이다.
종류에 따라 python 코드를 실행시킬 수도 있고, bash 명령어를 실행시킬 수도 있다.
1. PythonOperator : Python 함수 실행
2. BashOperator : shell 커멘드 실행
3. DummyOperator : 아무 작업도 하지 않고, 단순 흐름 제어용
4. DockerOperator : docker 컨테이너에서 작업 실행
5. KubernetesPodOperator : pod를 실행하여 task를 실행
Decorator
Operator와 비슷하게 task를 정의하지만, 약간의 차이점이 있다.
Operator는 만들어진 클래스에 매개변수를 넣어주면서 커스터마이징하는 방식이라면,
Decorator는 task에 대한 흐름을 함수로 직접 작성하는 방식이다.
Task 정의하는 방식
DAG 내 Task를 정의하는 방식은 크게 Operator, Decorator가 있다.
1. Operator 기반
from airflow.operators.python import PythonOperator
def my_func(**kwargs):
print("실행 중!")
task = PythonOperator(
task_id="my_task",
python_callable=my_func,
dag=dag
)
PythonOperator, BashOperator 등과 같이 Operator 클래스를 인스턴스화해 정의하는 방식이다.
task 정의 시점(=DAG 파일이 파싱될 때 =Scheduler가 DAG를 읽을 때)에 모든 정보가 명확히 정해져 있어야 한다.
파라미터를 동적으로 매핑하는 방식은 제한적으로 지원한다.
2. Decorator 기반(TaskFlow API)
from airflow.decorators import task
@task
def add(a, b):
return a + b
@task
def print_result(x):
print(f"결과: {x}")
# DAG 정의 내부에서
result = add(1, 2)
print_result(result)
add(1,2)는 task 정의 시점에는 XComArg라는 Airflow 내부 객체를 반환한다.
XComArg는 task의 출력을 나타내는 placeholder 역할을 하고, 이후 실행 시점에서 실제 값으로 resolve 된다.
DAG 정의 시에는 실행 계획만 세워지고 task 실행 시 Airflow가 값을 XCom에서 가져와 함수로 넘겨주는 deferred execution(지연 실행) 방식이다.
Dynamic Task Mapping
task들을 설계하다보면, task를 static하게 정의할 때도 있지만 동적으로 정의하고 싶을 때도 있다.
이와 같은 상황에서 사용하는 것이 Dynamic Task Mapping이다.
예를 들어, A 작업 -> B 작업 -> C(1) 작업
-> C(2) 작업 이 있다고 가정해보자.
B-C 작업은 하나의 group으로 생성되고, B-C group을 몇 개 생성할지는 A 함수의 return 값으로 정의된다.
A의 반환값이 3이면 B-C 그룹이 3개가 생기는 것이다.
A의 반환값을 토대로 B-C를 동적으로 실행시키고 싶을 때 expand_kwargs 함수를 사용한다.
num = A()
for i in range(num):
B(i)
C(i)
위와 같은 흐름이 A의 반환값을 기반으로 B와 C를 동적으로 실행시키는 흐름이다.
expand_kwargs는 여기서 A로부터 반환값을 받고 for문으로 B-C 그룹을 생성하는걸 자동으로 해준다.
@task_group(group_id='B_C_group')
def group(start: int, end: int):
B(start, end)
C(start, end)
num = A()
group.expand_kwargs(num)
위에서 A의 return 값이 [{1, 2}, {3, 4}] 라고 하면 expand_kwargs가
- {1, 2} 세트를 B와 C에 전달
- {3, 4} 세트를 B와 C에 전달 을 병렬로 바로 해준다.
이때 expand_kwargs로 전달받은 start, end 매개변수는 task 정의 시점에는 XComArg라는 객체로 정의되고,
실제 task가 실행될 때 각 값이 매핑된다. lazy evaluation으로 값이 전달된다.
반면 이와 비슷한 expand는 task를 정의할 때부터 실제 값에 접근할 수 있어야 한다.
expand는 리스트 같은 iterable 값이 task 정의 시점에 결정되어 있어야 한다.
'Data Engineering' 카테고리의 다른 글
| Elasicsearch는 저장소가 아니다 (0) | 2025.05.23 |
|---|---|
| Data Lake vs Data Warehouse: 저장소인가, 체계인가 (0) | 2025.05.23 |
| ETL이 지나고, ELT가 온다: 구조적 전환과 클라우드 시대의 흐름 (1) | 2025.05.23 |
| 병렬 처리의 함정, 그리고 최적화: DataHub 메타데이터 Ingestion 파이프라인 개선기 (0) | 2025.05.20 |
| NAVER DAN24 - CQueryHub (0) | 2025.01.12 |