Как вызвать задачу Airflow, только когда новый раздел / данные доступны в таблице athena AWS с использованием DAG в python?

9

У меня есть scenerio как ниже:

  1. Триггер а Task 1и Task 2только тогда, когда для них доступны новые данные в исходной таблице (Афина). Триггер для Task1 и Task2 должен произойти при новом разделении данных за день.
  2. Триггер Task 3только по завершении Task 1иTask 2
  3. Запустить Task 4только завершениеTask 3

введите описание изображения здесь

Мой код

from airflow import DAG

from airflow.contrib.sensors.aws_glue_catalog_partition_sensor import AwsGlueCatalogPartitionSensor
from datetime import datetime, timedelta

from airflow.operators.postgres_operator import PostgresOperator
from utils import FAILURE_EMAILS

yesterday = datetime.combine(datetime.today() - timedelta(1), datetime.min.time())

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': yesterday,
    'email': FAILURE_EMAILS,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}

dag = DAG('Trigger_Job', default_args=default_args, schedule_interval='@daily')

Athena_Trigger_for_Task1 = AwsGlueCatalogPartitionSensor(
    task_id='athena_wait_for_Task1_partition_exists',
    database_name='DB',
    table_name='Table1',
    expression='load_date={{ ds_nodash }}',
    timeout=60,
    dag=dag)

Athena_Trigger_for_Task2 = AwsGlueCatalogPartitionSensor(
    task_id='athena_wait_for_Task2_partition_exists',
    database_name='DB',
    table_name='Table2',
    expression='load_date={{ ds_nodash }}',
    timeout=60,
    dag=dag)

execute_Task1 = PostgresOperator(
    task_id='Task1',
    postgres_conn_id='REDSHIFT_CONN',
    sql="/sql/flow/Task1.sql",
    params={'limit': '50'},
    trigger_rule='all_success',
    dag=dag
)

execute_Task2 = PostgresOperator(
    task_id='Task2',
    postgres_conn_id='REDSHIFT_CONN',
    sql="/sql/flow/Task2.sql",
    params={'limit': '50'},
    trigger_rule='all_success',
    dag=dag
)



execute_Task3 = PostgresOperator(
    task_id='Task3',
    postgres_conn_id='REDSHIFT_CONN',
    sql="/sql/flow/Task3.sql",
    params={'limit': '50'},
    trigger_rule='all_success',
    dag=dag
)

execute_Task4 = PostgresOperator(
    task_id='Task4',
    postgres_conn_id='REDSHIFT_CONN',
    sql="/sql/flow/Task4",
    params={'limit': '50'},
    dag=dag
)



execute_Task1.set_upstream(Athena_Trigger_for_Task1)
execute_Task2.set_upstream(Athena_Trigger_for_Task2)

execute_Task3.set_upstream(execute_Task1)
execute_Task3.set_upstream(execute_Task2)

execute_Task4.set_upstream(execute_Task3)

Каков наилучший оптимальный способ достижения этого?

Панкай
источник
у вас есть какие-либо проблемы с этим решением?
Бернардо Стеарнс возвращается
@ Bernardostearnsreisen, иногда Task1и Task2идет по кругу. Для меня данные загружаются в исходную таблицу Athena 10 AM CET.
pankaj
продолжая цикл, поток воздуха повторяет задачи 1 и 2 много раз, пока не преуспеет?
Бернардо Стеарнс возвращается
@Bernardostearnsreisen, да точно
pankaj
1
@Bernardostearnsreisen, я не знал, как присуждать награду :)
pankaj

Ответы:

1

Я считаю, что ваш вопрос касается двух основных проблем:

  1. забывая настраивать schedule_intervalявно, чтобы @daily настраивал то, что вы не ожидали.
  2. Как правильно запустить и повторить выполнение dag, когда вы зависите от внешнего события, чтобы завершить выполнение

краткий ответ: явно установите свой schedule_interval с форматом задания cron и время от времени проверяйте с помощью операторов датчиков

default_args={
        'retries': (endtime - starttime)*60/poke_time
}
dag = DAG('Trigger_Job', default_args=default_args, schedule_interval='0 10 * * *')
Athena_Trigger_for_Task1 = AwsGlueCatalogPartitionSensor(
     ....
    poke_time= 60*5 #<---- set a poke_time in seconds
    dag=dag)

где startimeв какое время будет начинаться ваше ежедневное задание, в endtimeкакое время дня вы должны проверять, было ли выполнено событие, прежде чем помечать его как неудачное, и poke_timeэто интервал, в течение которого вы sensor_operatorбудете проверять, произошло ли событие.

Как явным образом обратиться к задаче cron, когда вы устанавливаете dag так,@dailyкак вы это делали:

dag = DAG('Trigger_Job', default_args=default_args, schedule_interval='@daily')

из документов вы можете видеть, что вы действительно делаете: @daily - Run once a day at midnight

Что теперь имеет смысл, почему вы получаете ошибку тайм-аута, и терпит неудачу через 5 минут, потому что вы установили 'retries': 1и 'retry_delay': timedelta(minutes=5). Таким образом, он пытается запустить даг в полночь, он терпит неудачу. повторная попытка через 5 минут и повторение неудачи, поэтому он помечается как неудачный.

Таким образом, в основном @daily run устанавливает неявную работу cron:

@daily -> Run once a day at midnight -> 0 0 * * *

Формат задания cron имеет следующий формат, и вы устанавливаете значение в *любое время, когда хотите сказать «все».

Minute Hour Day_of_Month Month Day_of_Week

Таким образом, @daily в основном говорит, что запускайте это каждые: минуты 0 часов 0 всех days_of_month всех месяцев всех days_of_week

Таким образом, ваш случай запускается каждые: минута 0 час 10 всех days_of_month of all_months всех days_of_week. Это переводится в формате cron:

0 10 * * *

Как правильно запустить и повторить выполнение dag, когда вы зависите от внешнего события, чтобы завершить выполнение

  1. Вы можете вызвать прерывание потока воздуха от внешнего события, используя команду airflow trigger_dag. это было бы возможно, если бы вы могли запускать лямбда-скрипт функции / python для настройки вашего экземпляра airflow.

  2. Если вы не можете вызвать dag извне, используйте оператор датчика, как OP, установите для него значение poke_time и установите достаточно большое количество повторных попыток.

Бернардо Стернс Райзен
источник
Спасибо за это. Также, если я хочу запускать задачи, основанные на событии, а не на времени, т. Е. Как только новый раздел данных станет доступен в источнике `AWS Athena Tables`, должна быть запущена следующая задача. Тогда как мне планировать. Достаточно ли подходит мой текущий код?
pankaj
@pankaj, я вижу только две альтернативы. Я не очень разбираюсь в aws athena, но вы можете вызвать прерывание потока воздуха от внешнего события, используя команду airflow trigger_dag. это было бы возможно, если бы вы могли запускать лямбда-скрипт функции / python для настройки вашего экземпляра airflow.
Бернардо Стеарнс возвращается
другая альтернатива - более или менее то, что вы делаете, потому что у вас нет триггера на основе событий, вам нужно периодически проверять, произошло ли это событие. Таким образом, при использовании этого текущего решения будет задано задание cron для диапазона часов, запускающих даг с высокой частотой минут ... многие потерпят неудачу, но после происшествия он сможет довольно быстро поймать
Бернардо Стеарнс вновь появится в
@Bernado, я выяснил, какой пакет в Airflow вызывается AwsGlueCatalogPartitionSensorвместе с командой airflow {{ds_nodash}}для выхода из раздела. Мой вопрос тогда, как запланировать это.
pankaj
@ Benado, Можете ли вы взглянуть на мой код, в котором я реализовал вышеупомянутую проверку, и высказать свое
мнение