í¬ì¤í ê°ì
본 í¬ì¤í ì Apache Airflow(ìì´íë¡ì°)ì ëí´ì ì 리íë Airflow ìë¦¬ì¦ í¬ì¤í ì ëë¤.
Airflow í¬ì¤í ììë ìëì ê°ì ììë¡ Airflowì ëí´ì ì 리í´ë³´ë ¤ê³ í©ëë¤.
- Airflowë 무ìì¸ê°? Airflow ì¤ì¹ ë°©ë²ê³¼ ê°ë¨í ìì ( https://lsjsj92.tistory.com/631 )
- Airflow branch(ë¶ê¸°) ìì ë° airflow íì´ì¬(Python) operator íì© ê°ë¨ ìì ( https://lsjsj92.tistory.com/632 )
- Airflow를 íì©í 머ì ë¬ë ( Machine Learning ) ìì (본 í¬ì¤í )
- Airflow slack ë©ì¸ì§ë¡ ìë ë°ê¸° ìì (https://lsjsj92.tistory.com/634)
ìì ê°ì´ ì´ 4ê°ì§ì ê¸ì ìì±í ìì ì´ë©° 본 ê¸ì Airflow ì¸ ë²ì§¸ í¬ì¤í ì¼ë¡ Airflow를 íì©í Machine Learning ìì (example) ì½ë를 ì´í´ë³´ë í¬ì¤í ì ëë¤.
í´ë¹ í¬ì¤í ì ìì±íë©´ì ì°¸ê³ í ê¸ì ìëì ê°ìµëë¤.
- https://github.com/apache/airflow
- https://airflow.apache.org/docs/apache-airflow/stable/installation/index.html
- https://airflow.apache.org/docs/apache-airflow/stable/start/local.html
- https://airflow.apache.org/docs/apache-airflow/stable/concepts/dags.html
- https://airflow.apache.org/docs/apache-airflow/stable/concepts/xcoms.html
- https://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/operators/index.html
- https://www.kaggle.com/c/titanic
본 í¬ì¤í ì리ì¦ìì ì¬ì©ëë ì½ëë ìë github repoìì íì¸íì¤ ì ììµëë¤.
GitHub - lsjsj92/airflow_tutorial: python airflow tutorial and example
python airflow tutorial and example. Contribute to lsjsj92/airflow_tutorial development by creating an account on GitHub.
github.com
í¬ì¤í 본문
í¬ì¤í ê°ìììë ì¸ê¸íìë¯ì´ ì´ë² í¬ì¤í ì airflow를 íì©í 머ì ë¬ë(Machine Learning) ìì (example)를 ì´í´ë´ ëë¤.
본 í¬ì¤í ìì ì§íí ì ì Airflowì Python íê²½ì ë¤ìê³¼ ê°ìµëë¤.
- Airflow ì¤ì¹ë ê²½ë¡ : /Users/user_name/airflow
- Python ë²ì : Python3.8
- Airflow ë²ì : 2.2.3
- OS : Mac Pro
Airflow Machine Learning ìì
본 í¬ì¤í ììë 머ì ë¬ë(Machine Learning) ì½ë를 Airflow DAGì íì©í©ëë¤. 본 í¬ì¤í ìì ì¬ì©í ë°ì´í°ë kaggleìì ì ê³µí´ì£¼ë íì´íë(titanic) ë°ì´í°ë¥¼ íì©í©ëë¤. íì´íë ë°ì´í°ë ìºê¸ì ì 문í ë ë§ì´ ì í ì ìë ë°ì´í° ì ì´ë©° ìì¡´ ì¬ë¶ë¥¼ ì측íëë° ì¬ì©íë ë°ì´í° ì ì ëë¤. íì´íë ë°ì´í°ë í¬ì¤í ê°ìì ìë ìºê¸ íì´íë ë§í¬ìì ë¤ì´ ë°ì¼ì¤ ì ììµëë¤.
ëí, 본 í¬ì¤í ìì ì¬ì©í ì ì²´ ì½ëë í¬ì¤í ê°ìì ì¸ê¸í ì githubì ì ì²´ ìì¤ ì½ëê° ì ë¡ë ëì´ ìì¼ë ì°¸ê³ íìë©´ ëê² ìµëë¤.
airflow íì¼ êµ¬ì¡°ë ìë ì¬ì§ê³¼ ê°ìµëë¤.

Titanic machine learning code ì¤ëª
íì´íë ë°ì´í° ì ì íì©í 머ì ë¬ë ì½ëë ìë¹í ë§ìµëë¤. kaggleìë ë¤ìí ìì ê° ì¬ë¼ììê³ ì¢ì ì±ë¥ì ë³´ì¬ì£¼ë ë¤ìí ìì ê° ììµëë¤. 본 ì½ëììë íì´ëí± ëª¨ë¸ì ì§ì¤í기 ë³´ë¤ë airflow íê²½ììì 머ì ë¬ë ì½ë를 ëë ¤ë³´ë ê²ì´ 목ì ì´ê¸° ë문ì ë§¤ì° ê°ë¨íê² ë°ì´í°ë¥¼ ì²ë¦¬íê³ ëª¨ë¸ë ê°ë¨íê² ì¬ì©íë¤ë ê²ì 미리 ìë ¤ë립ëë¤.
ì ê° êµ¬ì±í íì´íë 머ì ë¬ë ì½ë 구ì±ì ë¤ìê³¼ ê°ìµëë¤.

ê° íì¼ì ëí ì¤ëª ì ìëì ê°ìµëë¤.
- config.py
- Titanic machine learningì ë리기 ìí ê°ì¢ ì í ê°ì ì¤ì
- dataio.py
- ë°ì´í°ë¥¼ ì½ì´ì¤ê±°ë íì ë°ì´í°ë¥¼ ì¶ì¶íë ìì ì ìí
- model.py
- 머ì ë¬ë ëª¨ë¸ ì í
- machine learning model training ìí
- preprocess.py
- ê°ì¢ ì ì²ë¦¬ ìì ìí
- titanic.py
- titanic machine learningì ë©ì¸ íì¼
ì¬ê¸°ì ì¬ì¤ì ë©ì¸ ì½ëë titanic.py íì¼ì´ê¸° ë문ì titanic.py íì¼ì ë´ì©ë§ ì´í´ë³´ê² ìµëë¤.
titanic.py
class TitanicMain(TitanicPreprocess, PathConfig, TitanicModeling, DataIOSteam):
def __init__(self):
TitanicPreprocess.__init__(self)
PathConfig.__init__(self)
TitanicModeling.__init__(self)
DataIOSteam.__init__(self)
def prepro_data(self, f_name, **kwargs):
# fname = train.csv
data = self.get_data(self.titanic_path, f_name)
data = self.run_preprocessing(data)
data.to_csv(f"{self.titanic_path}/prepro_titanic.csv", index=False)
kwargs['task_instance'].xcom_push(key='prepro_csv', value=f"{self.titanic_path}/prepro_titanic")
return "end prepro"
def run_modeling(self, n_estimator, flag, **kwargs):
# n_estimator = 100
f_name = kwargs["task_instance"].xcom_pull(key='prepro_csv')
data = self.get_data(self.titanic_path, f_name, flag)
X, y = self.get_X_y(data)
model_info = self.run_sklearn_modeling(X, y, n_estimator)
kwargs['task_instance'].xcom_push(key='result_msg', value=model_info)
return "end modeling"
titanic.py ì½ëìë titanic ë°ì´í°ë¥¼ ê°ì ¸ì¤ê³ ì ì²ë¦¬ë¥¼ ìííê³ ë¨¸ì ë¬ë ëª¨ë¸ íë ¨ì ìííë 모ë ì¼ë ¨ì ìì ì´ ë´ê²¨ ììµëë¤. ì´ 2ê°ì í¨ìë¡ êµ¬ì±ëì´ ìì¼ë©° ê° í¨ìë Airflow DAG íì¼ ììì taskë¡ êµ¬ì±ë©ëë¤.
íì´íë 머ì ë¬ë ì½ëê° ëìëë ììë ìëì ê°ìµëë¤.
- ìµì´ íì´íë ë°ì´í°ë¥¼ load ( prepro_data )
- íì´íë ë°ì´í° ì ì²ë¦¬ ë° prepro_csvë¡ ì ì¥ ( prepro_data )
- preprocessingì ê±°ì¹ íì¼ì ê²½ë¡ë¥¼ Airflow XComì ì ì¥ ( prepro_data )
- machine learning modeling ìí ( run_modeling )
- Airflow XComìì ì ì¥ë ì ì²ë¦¬ ë°ì´í° ê²½ë¡ë¥¼ ê°ì ¸ì´( run_modeling )
- titanic ì ì²ë¦¬ ë°ì´í° load ( run_modeling )
- model training ( run_modeling )
- model ì ë³´ Airflow XComì ì ì¥ ( run_modeling )
- ì¢ ë£
Airflow machine learning DAG ìì ì½ë
ë¤ìì Airflow DAG codeì ëí´ì ì´í´ë´ ëë¤. 기본ì ì¼ë¡ ì í¬ì¤í ìì ì´í´ë³¸ DAGì ëì¼í©ëë¤. ë¤ë§ ìì ëìíë ì½ëê° ë¨¸ì ë¬ë ì½ëì¼ ë¿ì ëë¤. ê·¸ ì½ëë ìëì ê°ìµëë¤.
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python_operator import PythonOperator
from airflow.utils.trigger_rule import TriggerRule
from MLproject.titanic import *
titanic = TitanicMain()
def print_result(**kwargs):
r = kwargs["task_instance"].xcom_pull(key='result_msg')
print("message : ", r)
with DAG( **dag_args ) as dag:
start = BashOperator(
task_id='start',
bash_command='echo "start!"',
)
prepro_task = PythonOperator(
task_id='preprocessing',
python_callable=titanic.prepro_data,
op_kwargs={'f_name': "train"}
)
modeling_task = PythonOperator(
task_id='modeling',
python_callable=titanic.run_modeling,
op_kwargs={'n_estimator': 100, 'flag' : True}
)
msg = PythonOperator(
task_id='msg',
python_callable=print_result
)
complete = BashOperator(
task_id='complete_bash',
bash_command='echo "complete~!"',
)
start >> prepro_task >> modeling_task >> msg >> complete
본 DAG ì½ëìì íµì¬ ì½ë ì¤ëª ì ìëì ê°ìµëë¤.
1. íì ë¼ì´ë¸ë¬ë¦¬ import
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python_operator import PythonOperator
from airflow.utils.trigger_rule import TriggerRule
from MLproject.titanic import *
titanic = TitanicMain()
본 airflow dag ìì ììë Pythonì íì©í©ëë¤. ë°ë¼ì íìí Operatorì¸ Python operatorì bash operator를 ê°ì ¸ìµëë¤.
ê·¸ë¦¬ê³ ììì ì ìí íì´íë 머ì ë¬ë ì½ë를 ì¤íí ì½ë를 import íê³ ê°ì²´ë¥¼ ìì±í©ëë¤.
2. Python Operator ì¤ì
ë¤ìì¼ë¡ Python operator를 ì¬ì©í´ ê° task를 ì ìí©ëë¤.
start = BashOperator(
task_id='start',
bash_command='echo "start!"',
)
prepro_task = PythonOperator(
task_id='preprocessing',
python_callable=titanic.prepro_data,
op_kwargs={'f_name': "train"}
)
modeling_task = PythonOperator(
task_id='modeling',
python_callable=titanic.run_modeling,
op_kwargs={'n_estimator': 100, 'flag' : True}
)
msg = PythonOperator(
task_id='msg',
python_callable=print_result
)
complete = BashOperator(
task_id='complete_bash',
bash_command='echo "complete~!"',
)
ì¬ê¸°ì 3ê°ì Python operator를 ì¬ì©í´ì 3ê°ì task를 ì ìí©ëë¤. ê° taskì ëí ì¤ëª ì ìëì ê°ìµëë¤.
- prepro_task
- íì´íë ë°ì´í°ë¥¼ ì ì²ë¦¬ ìííë DAG task
- titanic.pyì ìë prepro_data í¨ì를 python_callableì ì¬ì©
- prepro_data í¨ììì ì¬ì©íë parameter f_nameì op_kwargsë¡ ë³´ë´ì¤
- modeling_task
- ì ì²ë¦¬ ë íì´íë ë°ì´í°ë¥¼ íì©í´ 머ì ë¬ë 모ë¸ë§ ìí
- titanic.pyì ìë run_modeling í¨ì를 python_callableì ì¬ì©
- run_modeling í¨ììì ì¬ì©íë n_estimatorì flag를 op_kwargsë¡ ë³´ë´ì¤
- msg
- ìì ë°ì ê²°ê³¼ ê°ì XComìì ê°ì ¸ìì ì¶ë ¥
ì¦, titanic.pyìì ì ìë í¨ì를 python operatorì python_callableì ì§ì íê³ í´ë¹ í¨ì를 ëìíëë¡ ì§ì í©ëë¤.
ê° taskë íì´íë ë°ì´í° ì ì²ë¦¬, íì´íìµ ë°ì´í°ë¥¼ íì©í machine learning model training ìí, 결과를 ì¶ë ¥íë ìí ì ë´ë¹íê³ ììµëë¤.
3. task relationship ì¤ì
ë§ì§ë§ì¼ë¡ dag taskì relationshipì ì¤ì í©ëë¤.
start >> prepro_task >> modeling_task >> msg >> complete
ì relationshipì ë³´ë©´ ê° taskë ììëë¡ ì§íë©ëë¤.
- ìì
- íì´íë ë°ì´í° ì ì²ë¦¬ task ìí
- ì ì²ë¦¬ ë íì´íë ë°ì´í° 머ì ë¬ë 모ë¸ë§ ìí
- ë©ì¸ì§ ì¶ë ¥
- ìë£
ì¤í ê²°ê³¼
ì DAG를 Airflow Schedulerì ë±ë¡ìí¤ê³ ì¤íìì¼ ë´ ëë¤. 먼ì , command line ìì DAGê° ì ëë¡ ë±ë¡ì´ ëìëì§ íì¸í´ë´ ëë¤. ëª ë ¹ì´ë aiflow dags list, airflow tasks list {dag-id} --tree ì ëë¤.
aiflow dags list
airflow tasks list {dag-id} --tree


ì ë±ë¡ëìë¤ë©´ ìì ê°ì´ aiflow dag listì ëì¤ë ê²ì íì¸í ì ììµëë¤.
ì´ì airflow web server를 ì¤íìì¼ web uiìì íì¸í´ë´ ëë¤.

ë°©ê¸ ë±ë¡í dagì¸ tutorial-ml-opë¼ë dagê° ë±ë¡ë ê²ì íì¸í ì ììµëë¤.
í´ë¹ dag를 í´ë¦í´ë³´ë©´ tree, graph ë± ë¤ìí íë©´ì íì¸í ì ììê²ëë¤.
ì´ì í´ë¹ dag를 ì¤íìì¼ë´ ëë¤. ì¤íìí¤ë©´ ìê¹ ì ìí task relationshipì²ë¼ ììëë¡ taskê° ì¤íëë ê²ì íì¸í ì ììµëë¤.
ì¤íí ê²°ê³¼ë ìëì ê°ì´ graphì tree ííë¡ íì¸í ì ììµëë¤.


ê·¸ ì¸ìë Gantt íì ë¤ì´ê°ë©´ ì´ëì ì¼ë§ë ìê°ì´ ê±¸ë ¸ëì§ ì ë³´ë íì¸í ì ììµëë¤.

ì machine learning dag를 ì¤íìí¤ë©´ 맨 ë§ì§ë§ì msg taskìì 결과를 printí©ëë¤.
ì´ ê²°ê³¼ ê°ì airflow XCom ê²°ê³¼ ê°ì pullë¡ ê°ì ¸ìì ì¶ë ¥í´ì£¼ëë°ì. ê·¸ ê²°ê³¼ë ìë ì¬ì§ê³¼ ê°ìµëë¤.

ì ê° ì¤ì í íì´íë 머ì ë¬ë ë©ì¸ì§ë modelì metricê³¼ parameter ê°ì ì¶ë ¥íëë¡ íììµëë¤.
scoreë accuracy를 기ì¤ì¼ë¡ íìì¼ë©° ì ê° ì¤íìí¨ machine learning 모ë¸ì íì´íë ìì¡´ ì측 ê²°ê³¼ì ëí´ 80% ì íë를 ë³´ììµëë¤.
기í ë¤ë¥¸ ë°©ë²
ì´ë ê² í¨ìë¡ ì¤íìí¤ì§ ìê³ bash shellì ì´ì©íë Bash Operator를 ì´ì©í´ì ë°ë¡ ì¤íìí¬ ìë ììµëë¤.
bash_command = """
python3 /user/leesoojin/airflow/MLProject/titanic.py
"""
dag_args = dict(
dag_id="tutorial-ml-op",
default_args=default_args,
description='tutorial DAG ml',
schedule_interval=timedelta(minutes=50),
start_date=datetime(2022, 2, 1),
tags=['example-sj'],
)
with DAG( **dag_args ) as dag:
train = BashOperator(
task_id='train',
bash_command=bash_command
)
train
ë§ë¬´ë¦¬
본 í¬ì¤í ììë airflow dag ìì ì¤ ë¨¸ì ë¬ë(Machine Learning) ìì 를 ì´í´ë³´ììµëë¤.
ëêµ°ê°ìê² ëìì´ ëì길 ë°ë¼ë©° ì½ì´ì£¼ì ì ê°ì¬í©ëë¤.
ì ìê² ì°ë½ì 주ìê³ ì¶ì¼ì ê²ì´ ìì¼ìë¤ë©´
- Linkedin : https://www.linkedin.com/in/lsjsj92/
- github : https://github.com/lsjsj92
- ë¸ë¡ê·¸ ëê¸ ëë ë°©ëª ë¡
ì¼ë¡ ì°ë½ì£¼ìë©´ ê°ì¬íê² ìµëë¤.