У меня есть scenerio как ниже:
- Триггер а
Task 1
иTask 2
только тогда, когда для них доступны новые данные в исходной таблице (Афина). Триггер для Task1 и Task2 должен произойти при новом разделении данных за день. - Триггер
Task 3
только по завершенииTask 1
иTask 2
- Запустить
Task 4
только завершениеTask 3
Мой код
from airflow import DAG
from airflow.contrib.sensors.aws_glue_catalog_partition_sensor import AwsGlueCatalogPartitionSensor
from datetime import datetime, timedelta
from airflow.operators.postgres_operator import PostgresOperator
from utils import FAILURE_EMAILS
yesterday = datetime.combine(datetime.today() - timedelta(1), datetime.min.time())
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': yesterday,
'email': FAILURE_EMAILS,
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5)
}
dag = DAG('Trigger_Job', default_args=default_args, schedule_interval='@daily')
Athena_Trigger_for_Task1 = AwsGlueCatalogPartitionSensor(
task_id='athena_wait_for_Task1_partition_exists',
database_name='DB',
table_name='Table1',
expression='load_date={{ ds_nodash }}',
timeout=60,
dag=dag)
Athena_Trigger_for_Task2 = AwsGlueCatalogPartitionSensor(
task_id='athena_wait_for_Task2_partition_exists',
database_name='DB',
table_name='Table2',
expression='load_date={{ ds_nodash }}',
timeout=60,
dag=dag)
execute_Task1 = PostgresOperator(
task_id='Task1',
postgres_conn_id='REDSHIFT_CONN',
sql="/sql/flow/Task1.sql",
params={'limit': '50'},
trigger_rule='all_success',
dag=dag
)
execute_Task2 = PostgresOperator(
task_id='Task2',
postgres_conn_id='REDSHIFT_CONN',
sql="/sql/flow/Task2.sql",
params={'limit': '50'},
trigger_rule='all_success',
dag=dag
)
execute_Task3 = PostgresOperator(
task_id='Task3',
postgres_conn_id='REDSHIFT_CONN',
sql="/sql/flow/Task3.sql",
params={'limit': '50'},
trigger_rule='all_success',
dag=dag
)
execute_Task4 = PostgresOperator(
task_id='Task4',
postgres_conn_id='REDSHIFT_CONN',
sql="/sql/flow/Task4",
params={'limit': '50'},
dag=dag
)
execute_Task1.set_upstream(Athena_Trigger_for_Task1)
execute_Task2.set_upstream(Athena_Trigger_for_Task2)
execute_Task3.set_upstream(execute_Task1)
execute_Task3.set_upstream(execute_Task2)
execute_Task4.set_upstream(execute_Task3)
Каков наилучший оптимальный способ достижения этого?
Task1
иTask2
идет по кругу. Для меня данные загружаются в исходную таблицу Athena 10 AM CET.Ответы:
Я считаю, что ваш вопрос касается двух основных проблем:
schedule_interval
явно, чтобы @daily настраивал то, что вы не ожидали.краткий ответ: явно установите свой schedule_interval с форматом задания cron и время от времени проверяйте с помощью операторов датчиков
где
startime
в какое время будет начинаться ваше ежедневное задание, вendtime
какое время дня вы должны проверять, было ли выполнено событие, прежде чем помечать его как неудачное, иpoke_time
это интервал, в течение которого выsensor_operator
будете проверять, произошло ли событие.Как явным образом обратиться к задаче cron, когда вы устанавливаете dag так,
@daily
как вы это делали:из документов вы можете видеть, что вы действительно делаете:
@daily - Run once a day at midnight
Что теперь имеет смысл, почему вы получаете ошибку тайм-аута, и терпит неудачу через 5 минут, потому что вы установили
'retries': 1
и'retry_delay': timedelta(minutes=5)
. Таким образом, он пытается запустить даг в полночь, он терпит неудачу. повторная попытка через 5 минут и повторение неудачи, поэтому он помечается как неудачный.Таким образом, в основном @daily run устанавливает неявную работу cron:
Формат задания cron имеет следующий формат, и вы устанавливаете значение в
*
любое время, когда хотите сказать «все».Minute Hour Day_of_Month Month Day_of_Week
Таким образом, @daily в основном говорит, что запускайте это каждые: минуты 0 часов 0 всех days_of_month всех месяцев всех days_of_week
Таким образом, ваш случай запускается каждые: минута 0 час 10 всех days_of_month of all_months всех days_of_week. Это переводится в формате cron:
Как правильно запустить и повторить выполнение dag, когда вы зависите от внешнего события, чтобы завершить выполнение
Вы можете вызвать прерывание потока воздуха от внешнего события, используя команду
airflow trigger_dag
. это было бы возможно, если бы вы могли запускать лямбда-скрипт функции / python для настройки вашего экземпляра airflow.Если вы не можете вызвать dag извне, используйте оператор датчика, как OP, установите для него значение poke_time и установите достаточно большое количество повторных попыток.
источник
airflow trigger_dag
. это было бы возможно, если бы вы могли запускать лямбда-скрипт функции / python для настройки вашего экземпляра airflow.AwsGlueCatalogPartitionSensor
вместе с командой airflow{{ds_nodash}}
для выхода из раздела. Мой вопрос тогда, как запланировать это.