Infra/MLops

[airflow] mysql연결하기

뚜둔뚜둔 2022. 1. 10. 15:00

airflow2.0 관련하여 자료가 부족하여, 기록 겸 정리 중입니당 ><

 

 

1. airflow에서 작업

airflow Connections

 

admin -&gt; connections -&gt; +(add) click
DB 정보 입력 후 save

 

2. dag script 작성

 

from airflow.providers.mysql.hooks.mysql import MySqlHook

default_args = {
    'owner': 'airflow_dbtest',
    'depends_on_past': False,
    'start_date': datetime(2021, 12, 20 ,tzinfo=local_tz), #dag생성일자

    'wait_for_downstream': False,
    
    'mysql_conn_id': 'Mysql_197',

}


class ReturningMySqlOperator(BaseMySqlOperator):
    def execute(self, context):
        # self.log.info('Executing: %s', self.sql)
        print('Executing: {}'.format(self.sql))
        
        hook = MySqlHook(mysql_conn_id=self.mysql_conn_id,
                         schema=self.database)
        
        return hook.get_first(self.sql, parameters=self.parameters)
 

-> mysql_conn_id : 이부분이 airflow Connections Id 와 일치 해야함

with DAG(dag_id="daily_dagtest",
         default_args=default_args,
         schedule_interval="0 10 * * *"
        ) as dag_instance:
    

    t1_mysql_get = ReturningMySqlOperator(
        task_id='t1_mysql_get',
        sql=r'''
            SELECT *
            FROM Table_name
            ''',
        dag=dag_instance
        )

dag 안에서 ReturningMySqlOperator 호출 후 사용

sql=r''' 이곳에 쿼리 작성 '''

 

 

반응형