Infra/MLops
[airflow] mysql연결하기
뚜둔뚜둔
2022. 1. 10. 15:00
airflow2.0 관련하여 자료가 부족하여, 기록 겸 정리 중입니당 ><
1. airflow에서 작업
airflow Connections
2. dag script 작성
from airflow.providers.mysql.hooks.mysql import MySqlHook
default_args = {
'owner': 'airflow_dbtest',
'depends_on_past': False,
'start_date': datetime(2021, 12, 20 ,tzinfo=local_tz), #dag생성일자
'wait_for_downstream': False,
'mysql_conn_id': 'Mysql_197',
}
class ReturningMySqlOperator(BaseMySqlOperator):
def execute(self, context):
# self.log.info('Executing: %s', self.sql)
print('Executing: {}'.format(self.sql))
hook = MySqlHook(mysql_conn_id=self.mysql_conn_id,
schema=self.database)
return hook.get_first(self.sql, parameters=self.parameters)
-> mysql_conn_id : 이부분이 airflow Connections Id 와 일치 해야함
with DAG(dag_id="daily_dagtest",
default_args=default_args,
schedule_interval="0 10 * * *"
) as dag_instance:
t1_mysql_get = ReturningMySqlOperator(
task_id='t1_mysql_get',
sql=r'''
SELECT *
FROM Table_name
''',
dag=dag_instance
)
dag 안에서 ReturningMySqlOperator 호출 후 사용
sql=r''' 이곳에 쿼리 작성 '''
반응형