Проблема
Есть ли способ в Airflow создать рабочий процесс, при котором количество задач B. * неизвестно до завершения задачи A? Я просмотрел вложенные теги, но похоже, что он может работать только со статическим набором задач, которые должны быть определены при создании Dag.
Сработают ли кинжальные триггеры? И если да, то не могли бы вы привести пример.
У меня проблема, когда невозможно узнать количество задач B, которые потребуются для расчета задачи C, пока задача A не будет завершена. Для вычисления каждой задачи B. * потребуется несколько часов, и ее нельзя объединить.
|---> Task B.1 --|
|---> Task B.2 --|
Task A ------|---> Task B.3 --|-----> Task C
| .... |
|---> Task B.N --|
Идея # 1
Мне не нравится это решение, потому что мне нужно создать блокирующий ExternalTaskSensor, и выполнение всей задачи B. * займет от 2 до 24 часов. Так что я не считаю это жизнеспособным решением. Неужто есть способ попроще? Или Airflow не предназначен для этого?
Dag 1
Task A -> TriggerDagRunOperator(Dag 2) -> ExternalTaskSensor(Dag 2, Task Dummy B) -> Task C
Dag 2 (Dynamically created DAG though python_callable in TriggerDagrunOperator)
|-- Task B.1 --|
|-- Task B.2 --|
Task Dummy A --|-- Task B.3 --|-----> Task Dummy B
| .... |
|-- Task B.N --|
Изменить 1:
На данный момент на этот вопрос все еще нет однозначного ответа . Со мной связались несколько человек, ищущих решение.
Ответы:
Вот как я сделал это с аналогичным запросом без каких-либо подтегов:
Сначала создайте метод, который возвращает любые значения, которые вы хотите
def values_function(): return values
Затем создайте метод, который будет динамически генерировать задания:
def group(number, **kwargs): #load the values if needed in the command you plan to execute dyn_value = "{{ task_instance.xcom_pull(task_ids='push_func') }}" return BashOperator( task_id='JOB_NAME_{}'.format(number), bash_command='script.sh {} {}'.format(dyn_value, number), dag=dag)
А затем объедините их:
push_func = PythonOperator( task_id='push_func', provide_context=True, python_callable=values_function, dag=dag) complete = DummyOperator( task_id='All_jobs_completed', dag=dag) for i in values_function(): push_func >> group(i) >> complete
источник
for i in values_function()
я ожидал чего-то вродеfor i in push_func_output
. Проблема в том, что я не могу найти способ получить этот вывод динамически. Результат PythonOperator после выполнения будет в Xcom, но я не знаю, могу ли я ссылаться на него из определения DAG.group
функции быть вторая цепочка зависимостей?Я придумал, как создавать рабочие процессы на основе результатов предыдущих задач.
По сути, вы хотите создать два вложенных тега со следующим:
def return_list()
)parent_dag.get_task_instances(settings.Session, start_date=parent_dag.get_active_runs()[-1])[-1]
), возможно, здесь можно было бы добавить больше фильтров.dag_id='%s.%s' % (parent_dag_name, 'test1')
Теперь я протестировал это в своей локальной установке воздушного потока, и он отлично работает. Я не знаю, будут ли проблемы с вытягивающей частью xcom, если одновременно запущено более одного экземпляра dag, но тогда вы, вероятно, либо используете уникальный ключ, либо что-то в этом роде, чтобы однозначно идентифицировать xcom ценность, которую вы хотите. Вероятно, можно было бы оптимизировать 3. шаг, чтобы быть на 100% уверенным в получении конкретной задачи текущего основного dag, но для моего использования это работает достаточно хорошо, я думаю, что для использования xcom_pull нужен только один объект task_instance.
Также я очищаю xcoms для первого субдага перед каждым выполнением, просто чтобы убедиться, что я случайно не получу неправильное значение.
Я плохо объясняю, поэтому надеюсь, что следующий код все прояснит:
test1.py
from airflow.models import DAG import logging from airflow.operators.python_operator import PythonOperator from airflow.operators.postgres_operator import PostgresOperator log = logging.getLogger(__name__) def test1(parent_dag_name, start_date, schedule_interval): dag = DAG( '%s.test1' % parent_dag_name, schedule_interval=schedule_interval, start_date=start_date, ) def return_list(): return ['test1', 'test2'] list_extract_folder = PythonOperator( task_id='list', dag=dag, python_callable=return_list ) clean_xcoms = PostgresOperator( task_id='clean_xcoms', postgres_conn_id='airflow_db', sql="delete from xcom where dag_id='{{ dag.dag_id }}'", dag=dag) clean_xcoms >> list_extract_folder return dag
test2.py
from airflow.models import DAG, settings import logging from airflow.operators.dummy_operator import DummyOperator log = logging.getLogger(__name__) def test2(parent_dag_name, start_date, schedule_interval, parent_dag=None): dag = DAG( '%s.test2' % parent_dag_name, schedule_interval=schedule_interval, start_date=start_date ) if len(parent_dag.get_active_runs()) > 0: test_list = parent_dag.get_task_instances(settings.Session, start_date=parent_dag.get_active_runs()[-1])[-1].xcom_pull( dag_id='%s.%s' % (parent_dag_name, 'test1'), task_ids='list') if test_list: for i in test_list: test = DummyOperator( task_id=i, dag=dag ) return dag
и основной рабочий процесс:
test.py
from datetime import datetime from airflow import DAG from airflow.operators.subdag_operator import SubDagOperator from subdags.test1 import test1 from subdags.test2 import test2 DAG_NAME = 'test-dag' dag = DAG(DAG_NAME, description='Test workflow', catchup=False, schedule_interval='0 0 * * *', start_date=datetime(2018, 8, 24)) test1 = SubDagOperator( subdag=test1(DAG_NAME, dag.start_date, dag.schedule_interval), task_id='test1', dag=dag ) test2 = SubDagOperator( subdag=test2(DAG_NAME, dag.start_date, dag.schedule_interval, parent_dag=dag), task_id='test2', dag=dag ) test1 >> test2
источник
_ _init_ _.py
в папку subdags. ошибка новичкаДа, это возможно, я создал пример DAG, который демонстрирует это.
import airflow from airflow.operators.python_operator import PythonOperator import os from airflow.models import Variable import logging from airflow import configuration as conf from airflow.models import DagBag, TaskInstance from airflow import DAG, settings from airflow.operators.bash_operator import BashOperator main_dag_id = 'DynamicWorkflow2' args = { 'owner': 'airflow', 'start_date': airflow.utils.dates.days_ago(2), 'provide_context': True } dag = DAG( main_dag_id, schedule_interval="@once", default_args=args) def start(*args, **kwargs): value = Variable.get("DynamicWorkflow_Group1") logging.info("Current DynamicWorkflow_Group1 value is " + str(value)) def resetTasksStatus(task_id, execution_date): logging.info("Resetting: " + task_id + " " + execution_date) dag_folder = conf.get('core', 'DAGS_FOLDER') dagbag = DagBag(dag_folder) check_dag = dagbag.dags[main_dag_id] session = settings.Session() my_task = check_dag.get_task(task_id) ti = TaskInstance(my_task, execution_date) state = ti.current_state() logging.info("Current state of " + task_id + " is " + str(state)) ti.set_state(None, session) state = ti.current_state() logging.info("Updated state of " + task_id + " is " + str(state)) def bridge1(*args, **kwargs): # You can set this value dynamically e.g., from a database or a calculation dynamicValue = 2 variableValue = Variable.get("DynamicWorkflow_Group2") logging.info("Current DynamicWorkflow_Group2 value is " + str(variableValue)) logging.info("Setting the Airflow Variable DynamicWorkflow_Group2 to " + str(dynamicValue)) os.system('airflow variables --set DynamicWorkflow_Group2 ' + str(dynamicValue)) variableValue = Variable.get("DynamicWorkflow_Group2") logging.info("Current DynamicWorkflow_Group2 value is " + str(variableValue)) # Below code prevents this bug: https://issues.apache.org/jira/browse/AIRFLOW-1460 for i in range(dynamicValue): resetTasksStatus('secondGroup_' + str(i), str(kwargs['execution_date'])) def bridge2(*args, **kwargs): # You can set this value dynamically e.g., from a database or a calculation dynamicValue = 3 variableValue = Variable.get("DynamicWorkflow_Group3") logging.info("Current DynamicWorkflow_Group3 value is " + str(variableValue)) logging.info("Setting the Airflow Variable DynamicWorkflow_Group3 to " + str(dynamicValue)) os.system('airflow variables --set DynamicWorkflow_Group3 ' + str(dynamicValue)) variableValue = Variable.get("DynamicWorkflow_Group3") logging.info("Current DynamicWorkflow_Group3 value is " + str(variableValue)) # Below code prevents this bug: https://issues.apache.org/jira/browse/AIRFLOW-1460 for i in range(dynamicValue): resetTasksStatus('thirdGroup_' + str(i), str(kwargs['execution_date'])) def end(*args, **kwargs): logging.info("Ending") def doSomeWork(name, index, *args, **kwargs): # Do whatever work you need to do # Here I will just create a new file os.system('touch /home/ec2-user/airflow/' + str(name) + str(index) + '.txt') starting_task = PythonOperator( task_id='start', dag=dag, provide_context=True, python_callable=start, op_args=[]) # Used to connect the stream in the event that the range is zero bridge1_task = PythonOperator( task_id='bridge1', dag=dag, provide_context=True, python_callable=bridge1, op_args=[]) DynamicWorkflow_Group1 = Variable.get("DynamicWorkflow_Group1") logging.info("The current DynamicWorkflow_Group1 value is " + str(DynamicWorkflow_Group1)) for index in range(int(DynamicWorkflow_Group1)): dynamicTask = PythonOperator( task_id='firstGroup_' + str(index), dag=dag, provide_context=True, python_callable=doSomeWork, op_args=['firstGroup', index]) starting_task.set_downstream(dynamicTask) dynamicTask.set_downstream(bridge1_task) # Used to connect the stream in the event that the range is zero bridge2_task = PythonOperator( task_id='bridge2', dag=dag, provide_context=True, python_callable=bridge2, op_args=[]) DynamicWorkflow_Group2 = Variable.get("DynamicWorkflow_Group2") logging.info("The current DynamicWorkflow value is " + str(DynamicWorkflow_Group2)) for index in range(int(DynamicWorkflow_Group2)): dynamicTask = PythonOperator( task_id='secondGroup_' + str(index), dag=dag, provide_context=True, python_callable=doSomeWork, op_args=['secondGroup', index]) bridge1_task.set_downstream(dynamicTask) dynamicTask.set_downstream(bridge2_task) ending_task = PythonOperator( task_id='end', dag=dag, provide_context=True, python_callable=end, op_args=[]) DynamicWorkflow_Group3 = Variable.get("DynamicWorkflow_Group3") logging.info("The current DynamicWorkflow value is " + str(DynamicWorkflow_Group3)) for index in range(int(DynamicWorkflow_Group3)): # You can make this logic anything you'd like # I chose to use the PythonOperator for all tasks # except the last task will use the BashOperator if index < (int(DynamicWorkflow_Group3) - 1): dynamicTask = PythonOperator( task_id='thirdGroup_' + str(index), dag=dag, provide_context=True, python_callable=doSomeWork, op_args=['thirdGroup', index]) else: dynamicTask = BashOperator( task_id='thirdGroup_' + str(index), bash_command='touch /home/ec2-user/airflow/thirdGroup_' + str(index) + '.txt', dag=dag) bridge2_task.set_downstream(dynamicTask) dynamicTask.set_downstream(ending_task) # If you do not connect these then in the event that your range is ever zero you will have a disconnection between your stream # and your tasks will run simultaneously instead of in your desired stream order. starting_task.set_downstream(bridge1_task) bridge1_task.set_downstream(bridge2_task) bridge2_task.set_downstream(ending_task)
Перед запуском DAG создайте эти три переменные воздушного потока.
airflow variables --set DynamicWorkflow_Group1 1 airflow variables --set DynamicWorkflow_Group2 0 airflow variables --set DynamicWorkflow_Group3 0
Вы увидите, что DAG идет от этого
К этому после того, как он побежал
Вы можете увидеть больше информации об этой группе доступности базы данных в моей статье о создании динамических рабочих процессов в Airflow .
источник
OA: «Есть ли способ в Airflow создать рабочий процесс, при котором количество задач B. * неизвестно до завершения задачи A?»
Короткий ответ - нет. Перед запуском Airflow создаст поток DAG.
При этом мы пришли к простому выводу, что у нас нет такой потребности. Если вы хотите распараллелить некоторую работу, вы должны оценить имеющиеся у вас ресурсы, а не количество элементов для обработки.
Мы сделали это так: мы динамически генерируем фиксированное количество задач, скажем 10, которые разделяют работу. Например, если нам нужно обработать 100 файлов, каждая задача будет обрабатывать 10 из них. Я отправлю код позже сегодня.
Обновить
Вот код, извините за задержку.
from datetime import datetime, timedelta import airflow from airflow.operators.dummy_operator import DummyOperator args = { 'owner': 'airflow', 'depends_on_past': False, 'start_date': datetime(2018, 1, 8), 'email': ['myemail@gmail.com'], 'email_on_failure': True, 'email_on_retry': True, 'retries': 1, 'retry_delay': timedelta(seconds=5) } dag = airflow.DAG( 'parallel_tasks_v1', schedule_interval="@daily", catchup=False, default_args=args) # You can read this from variables parallel_tasks_total_number = 10 start_task = DummyOperator( task_id='start_task', dag=dag ) # Creates the tasks dynamically. # Each one will elaborate one chunk of data. def create_dynamic_task(current_task_number): return DummyOperator( provide_context=True, task_id='parallel_task_' + str(current_task_number), python_callable=parallelTask, # your task will take as input the total number and the current number to elaborate a chunk of total elements op_args=[current_task_number, int(parallel_tasks_total_number)], dag=dag) end = DummyOperator( task_id='end', dag=dag) for page in range(int(parallel_tasks_total_number)): created_task = create_dynamic_task(page) start_task >> created_task created_task >> end
Пояснение к коду:
Здесь у нас есть одна начальная задача и одна конечная задача (обе фиктивные).
Затем из начальной задачи с циклом for мы создаем 10 задач с одним и тем же вызываемым Python. Задачи создаются в функции create_dynamic_task.
Каждому вызываемому объекту python мы передаем в качестве аргументов общее количество параллельных задач и текущий индекс задачи.
Предположим, у вас есть 1000 элементов, которые нужно проработать: первая задача получит на входе информацию о том, что она должна разработать первый кусок из 10. Он разделит 1000 элементов на 10 частей и разработает первый.
источник
parallelTask
не определена: я что-то упускаю ?Я думаю, что вы ищете динамическое создание DAG. Я столкнулся с подобной ситуацией несколько дней назад после некоторого поиска, я нашел этот блог .
Генерация динамических задач
start = DummyOperator( task_id='start', dag=dag ) end = DummyOperator( task_id='end', dag=dag) def createDynamicETL(task_id, callableFunction, args): task = PythonOperator( task_id = task_id, provide_context=True, #Eval is used since the callableFunction var is of type string #while the python_callable argument for PythonOperators only receives objects of type callable not strings. python_callable = eval(callableFunction), op_kwargs = args, xcom_push = True, dag = dag, ) return task
Настройка рабочего процесса DAG
with open('/usr/local/airflow/dags/config_files/dynamicDagConfigFile.yaml') as f: # Use safe_load instead to load the YAML file configFile = yaml.safe_load(f) # Extract table names and fields to be processed tables = configFile['tables'] # In this loop tasks are created for each table defined in the YAML file for table in tables: for table, fieldName in table.items(): # In our example, first step in the workflow for each table is to get SQL data from db. # Remember task id is provided in order to exchange data among tasks generated in dynamic way. get_sql_data_task = createDynamicETL('{}-getSQLData'.format(table), 'getSQLData', {'host': 'host', 'user': 'user', 'port': 'port', 'password': 'pass', 'dbname': configFile['dbname']}) # Second step is upload data to s3 upload_to_s3_task = createDynamicETL('{}-uploadDataToS3'.format(table), 'uploadDataToS3', {'previous_task_id': '{}-getSQLData'.format(table), 'bucket_name': configFile['bucket_name'], 'prefix': configFile['prefix']}) # This is where the magic lies. The idea is that # once tasks are generated they should linked with the # dummy operators generated in the start and end tasks. # Then you are done! start >> get_sql_data_task get_sql_data_task >> upload_to_s3_task upload_to_s3_task >> end
Так выглядит наш DAG после сборки кода
import yaml import airflow from airflow import DAG from datetime import datetime, timedelta, time from airflow.operators.python_operator import PythonOperator from airflow.operators.dummy_operator import DummyOperator start = DummyOperator( task_id='start', dag=dag ) def createDynamicETL(task_id, callableFunction, args): task = PythonOperator( task_id=task_id, provide_context=True, # Eval is used since the callableFunction var is of type string # while the python_callable argument for PythonOperators only receives objects of type callable not strings. python_callable=eval(callableFunction), op_kwargs=args, xcom_push=True, dag=dag, ) return task end = DummyOperator( task_id='end', dag=dag) with open('/usr/local/airflow/dags/config_files/dynamicDagConfigFile.yaml') as f: # use safe_load instead to load the YAML file configFile = yaml.safe_load(f) # Extract table names and fields to be processed tables = configFile['tables'] # In this loop tasks are created for each table defined in the YAML file for table in tables: for table, fieldName in table.items(): # In our example, first step in the workflow for each table is to get SQL data from db. # Remember task id is provided in order to exchange data among tasks generated in dynamic way. get_sql_data_task = createDynamicETL('{}-getSQLData'.format(table), 'getSQLData', {'host': 'host', 'user': 'user', 'port': 'port', 'password': 'pass', 'dbname': configFile['dbname']}) # Second step is upload data to s3 upload_to_s3_task = createDynamicETL('{}-uploadDataToS3'.format(table), 'uploadDataToS3', {'previous_task_id': '{}-getSQLData'.format(table), 'bucket_name': configFile['bucket_name'], 'prefix': configFile['prefix']}) # This is where the magic lies. The idea is that # once tasks are generated they should linked with the # dummy operators generated in the start and end tasks. # Then you are done! start >> get_sql_data_task get_sql_data_task >> upload_to_s3_task upload_to_s3_task >> end
Это было очень помогло, полная надежда, Это также поможет кому-то другому
источник
Я думаю, что нашел более приятное решение для этого на https://github.com/mastak/airflow_multi_dagrun , в котором используется простая постановка DagRuns в очередь путем запуска нескольких dagruns, аналогичных TriggerDagRuns . Большая часть благодарностей идет на https://github.com/mastak , хотя мне пришлось исправить некоторые детали, чтобы он работал с самым последним воздушным потоком.
В решении используется специальный оператор, запускающий несколько DagRuns :
from airflow import settings from airflow.models import DagBag from airflow.operators.dagrun_operator import DagRunOrder, TriggerDagRunOperator from airflow.utils.decorators import apply_defaults from airflow.utils.state import State from airflow.utils import timezone class TriggerMultiDagRunOperator(TriggerDagRunOperator): CREATED_DAGRUN_KEY = 'created_dagrun_key' @apply_defaults def __init__(self, op_args=None, op_kwargs=None, *args, **kwargs): super(TriggerMultiDagRunOperator, self).__init__(*args, **kwargs) self.op_args = op_args or [] self.op_kwargs = op_kwargs or {} def execute(self, context): context.update(self.op_kwargs) session = settings.Session() created_dr_ids = [] for dro in self.python_callable(*self.op_args, **context): if not dro: break if not isinstance(dro, DagRunOrder): dro = DagRunOrder(payload=dro) now = timezone.utcnow() if dro.run_id is None: dro.run_id = 'trig__' + now.isoformat() dbag = DagBag(settings.DAGS_FOLDER) trigger_dag = dbag.get_dag(self.trigger_dag_id) dr = trigger_dag.create_dagrun( run_id=dro.run_id, execution_date=now, state=State.RUNNING, conf=dro.payload, external_trigger=True, ) created_dr_ids.append(dr.id) self.log.info("Created DagRun %s, %s", dr, now) if created_dr_ids: session.commit() context['ti'].xcom_push(self.CREATED_DAGRUN_KEY, created_dr_ids) else: self.log.info("No DagRun created") session.close()
Затем вы можете отправить несколько дагрунов из вызываемой функции в вашем PythonOperator, например:
from airflow.operators.dagrun_operator import DagRunOrder from airflow.models import DAG from airflow.operators import TriggerMultiDagRunOperator from airflow.utils.dates import days_ago def generate_dag_run(**kwargs): for i in range(10): order = DagRunOrder(payload={'my_variable': i}) yield order args = { 'start_date': days_ago(1), 'owner': 'airflow', } dag = DAG( dag_id='simple_trigger', max_active_runs=1, schedule_interval='@hourly', default_args=args, ) gen_target_dag_run = TriggerMultiDagRunOperator( task_id='gen_target_dag_run', dag=dag, trigger_dag_id='common_target', python_callable=generate_dag_run )
Я создал вилку с кодом на https://github.com/flinz/airflow_multi_dagrun
источник
График заданий не создается во время выполнения. Скорее график строится, когда он берется Airflow из папки dags. Следовательно, на самом деле невозможно будет иметь другой график для задания каждый раз, когда оно выполняется. Вы можете настроить задание на построение графика на основе запроса во время загрузки . Этот график будет оставаться неизменным для каждого прогона после этого, что, вероятно, не очень полезно.
Вы можете создать график, который выполняет разные задачи при каждом запуске на основе результатов запроса, используя оператор ветви.
Я предварительно сконфигурировал набор задач, а затем взял результаты запроса и распределил их по задачам. Это, вероятно, в любом случае лучше, потому что, если ваш запрос возвращает много результатов, вы, вероятно, все равно не захотите завалить планировщик множеством параллельных задач. Чтобы быть еще безопаснее, я также использовал пул, чтобы гарантировать, что мой параллелизм не выйдет из-под контроля с неожиданно большим запросом.
""" - This is an idea for how to invoke multiple tasks based on the query results """ import logging from datetime import datetime from airflow import DAG from airflow.hooks.postgres_hook import PostgresHook from airflow.operators.mysql_operator import MySqlOperator from airflow.operators.python_operator import PythonOperator, BranchPythonOperator from include.run_celery_task import runCeleryTask ######################################################################## default_args = { 'owner': 'airflow', 'catchup': False, 'depends_on_past': False, 'start_date': datetime(2019, 7, 2, 19, 50, 00), 'email': ['rotten@stackoverflow'], 'email_on_failure': True, 'email_on_retry': False, 'retries': 0, 'max_active_runs': 1 } dag = DAG('dynamic_tasks_example', default_args=default_args, schedule_interval=None) totalBuckets = 5 get_orders_query = """ select o.id, o.customer from orders o where o.created_at >= current_timestamp at time zone 'UTC' - '2 days'::interval and o.is_test = false and o.is_processed = false """ ########################################################################################################### # Generate a set of tasks so we can parallelize the results def createOrderProcessingTask(bucket_number): return PythonOperator( task_id=f'order_processing_task_{bucket_number}', python_callable=runOrderProcessing, pool='order_processing_pool', op_kwargs={'task_bucket': f'order_processing_task_{bucket_number}'}, provide_context=True, dag=dag ) # Fetch the order arguments from xcom and doStuff() to them def runOrderProcessing(task_bucket, **context): orderList = context['ti'].xcom_pull(task_ids='get_open_orders', key=task_bucket) if orderList is not None: for order in orderList: logging.info(f"Processing Order with Order ID {order[order_id]}, customer ID {order[customer_id]}") doStuff(**op_kwargs) # Discover the orders we need to run and group them into buckets for processing def getOpenOrders(**context): myDatabaseHook = PostgresHook(postgres_conn_id='my_database_conn_id') # initialize the task list buckets tasks = {} for task_number in range(0, totalBuckets): tasks[f'order_processing_task_{task_number}'] = [] # populate the task list buckets # distribute them evenly across the set of buckets resultCounter = 0 for record in myDatabaseHook.get_records(get_orders_query): resultCounter += 1 bucket = (resultCounter % totalBuckets) tasks[f'order_processing_task_{bucket}'].append({'order_id': str(record[0]), 'customer_id': str(record[1])}) # push the order lists into xcom for task in tasks: if len(tasks[task]) > 0: logging.info(f'Task {task} has {len(tasks[task])} orders.') context['ti'].xcom_push(key=task, value=tasks[task]) else: # if we didn't have enough tasks for every bucket # don't bother running that task - remove it from the list logging.info(f"Task {task} doesn't have any orders.") del(tasks[task]) return list(tasks.keys()) ################################################################################################### # this just makes sure that there aren't any dangling xcom values in the database from a crashed dag clean_xcoms = MySqlOperator( task_id='clean_xcoms', mysql_conn_id='airflow_db', sql="delete from xcom where dag_id='{{ dag.dag_id }}'", dag=dag) # Ideally we'd use BranchPythonOperator() here instead of PythonOperator so that if our # query returns fewer results than we have buckets, we don't try to run them all. # Unfortunately I couldn't get BranchPythonOperator to take a list of results like the # documentation says it should (Airflow 1.10.2). So we call all the bucket tasks for now. get_orders_task = PythonOperator( task_id='get_orders', python_callable=getOpenOrders, provide_context=True, dag=dag ) get_orders_task.set_upstream(clean_xcoms) # set up the parallel tasks -- these are configured at compile time, not at run time: for bucketNumber in range(0, totalBuckets): taskBucket = createOrderProcessingTask(bucketNumber) taskBucket.set_upstream(get_orders_task) ###################################################################################################
источник
for tasks in tasks
цикле в моем примере я удаляю объект, который повторяю. Это плохая идея. Вместо этого получите список ключей и перебирайте его - или пропустите удаления. Точно так же, если xcom_pull возвращает None (вместо списка или пустого списка), цикл for также не выполняется. Вы можете захотеть запустить xcom_pull перед 'for', а затем проверить, является ли он None - или убедиться, что там есть хотя бы пустой список. YMMV. Удачи!open_order_task
?Не понял в чем проблема?
Вот стандартный пример. Теперь, если в функции subdag заменить
for i in range(5):
на,for i in range(random.randint(0, 10)):
то все заработает. Теперь представьте, что оператор start помещает данные в файл, и вместо случайного значения функция будет читать эти данные. Тогда оператор start повлияет на количество задач.Проблема будет только в отображении в пользовательском интерфейсе, поскольку при входе в субдаг количество задач будет равно последнему чтению из файла / базы данных / XCom на данный момент. Что автоматически дает ограничение на несколько запусков одного дага одновременно.
источник
Я нашел это сообщение Medium, которое очень похоже на этот вопрос. Однако он полон опечаток и не работает, когда я пытался его реализовать.
Мой ответ на вышеизложенное:
Если вы создаете задачи динамически, вы должны делать это , повторяя то, что не создается восходящей задачей или может быть определено независимо от этой задачи. Я узнал, что вы не можете передавать даты выполнения или другие переменные воздушного потока чему-то за пределами шаблона (например, задаче), как многие другие указывали ранее. См. Также этот пост .
источник