airflow2.0 관련하여 자료가 부족하여, 기록 겸 정리 중입니당 ><
1. airflow에서 작업
airflow Connections
2. dag script 작성
from airflow.providers.mysql.hooks.mysql import MySqlHook
default_args = {
'owner': 'airflow_dbtest',
'depends_on_past': False,
'start_date': datetime(2021, 12, 20 ,tzinfo=local_tz), #dag생성일자
'wait_for_downstream': False,
'mysql_conn_id': 'Mysql_197',
}
class ReturningMySqlOperator(BaseMySqlOperator):
def execute(self, context):
# self.log.info('Executing: %s', self.sql)
print('Executing: {}'.format(self.sql))
hook = MySqlHook(mysql_conn_id=self.mysql_conn_id,
schema=self.database)
return hook.get_first(self.sql, parameters=self.parameters)
-> mysql_conn_id : 이부분이 airflow Connections Id 와 일치 해야함
with DAG(dag_id="daily_dagtest",
default_args=default_args,
schedule_interval="0 10 * * *"
) as dag_instance:
t1_mysql_get = ReturningMySqlOperator(
task_id='t1_mysql_get',
sql=r'''
SELECT *
FROM Table_name
''',
dag=dag_instance
)
dag 안에서 ReturningMySqlOperator 호출 후 사용
sql=r''' 이곳에 쿼리 작성 '''
반응형
'Infra > MLops' 카테고리의 다른 글
[MLOps][Infra]MLOps에 jenkins CI/CD 적용 가능한 구조 고르기 (0) | 2022.01.18 |
---|---|
[airflow][crontab] 시간 설정 (0) | 2022.01.18 |
[airflow] Dag간 연결 하기 (Connecting between Dags) ⏰ (0) | 2022.01.10 |
[airflow]task Group in Airflow 2.0 ⏰ (0) | 2022.01.10 |
[airflow] slack 연결하기-많은 dag에 적용 (0) | 2022.01.10 |