Я пытаюсь переместить файлы s3 из «не удаляющего» контейнера (то есть я не могу удалить файлы) в GCS, используя поток воздуха. Я не могу гарантировать, что новые файлы будут там каждый день, но я должен проверять новые файлы каждый день.
Моя проблема заключается в динамическом создании subdags. Если есть файлы ARE, мне нужны подзадачи. Если нет файлов, мне не нужны подзадачи. Моя проблема в настройках вверх / вниз по течению. В моем коде он обнаруживает файлы, но не запускает подзадачи, как они должны. Я что-то упустил.
вот мой код:
from airflow import models
from airflow.utils.helpers import chain
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.operators.python_operator import PythonOperator, BranchPythonOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.subdag_operator import SubDagOperator
from airflow.contrib.operators.s3_to_gcs_operator import S3ToGoogleCloudStorageOperator
from airflow.utils import dates
from airflow.models import Variable
import logging
args = {
'owner': 'Airflow',
'start_date': dates.days_ago(1),
'email': ['sinistersparrow1701@gmail.com'],
'email_on_failure': True,
'email_on_success': True,
}
bucket = 'mybucket'
prefix = 'myprefix/'
LastBDEXDate = int(Variable.get("last_publish_date"))
maxdate = LastBDEXDate
files = []
parent_dag = models.DAG(
dag_id='My_Ingestion',
default_args=args,
schedule_interval='@daily',
catchup=False
)
def Check_For_Files(**kwargs):
s3 = S3Hook(aws_conn_id='S3_BOX')
s3.get_conn()
bucket = bucket
LastBDEXDate = int(Variable.get("last_publish_date"))
maxdate = LastBDEXDate
files = s3.list_keys(bucket_name=bucket, prefix='myprefix/file')
for file in files:
print(file)
print(file.split("_")[-2])
print(file.split("_")[-2][-8:]) ##proves I can see a date in the file name is ok.
maxdate = maxdate if maxdate > int(file.split("_")[-2][-8:]) else int(file.split("_")[-2][-8:])
if maxdate > LastBDEXDate:
return 'Start_Process'
return 'finished'
def create_subdag(dag_parent, dag_id_child_prefix, file_name):
# dag params
dag_id_child = '%s.%s' % (dag_parent.dag_id, dag_id_child_prefix)
# dag
subdag = models.DAG(dag_id=dag_id_child,
default_args=args,
schedule_interval=None)
# operators
s3_to_gcs_op = S3ToGoogleCloudStorageOperator(
task_id=dag_id_child,
bucket=bucket,
prefix=file_name,
dest_gcs_conn_id='GCP_Account',
dest_gcs='gs://my_files/To_Process/',
replace=False,
gzip=True,
dag=subdag)
return subdag
def create_subdag_operator(dag_parent, filename, index):
tid_subdag = 'file_{}'.format(index)
subdag = create_subdag(dag_parent, tid_subdag, filename)
sd_op = SubDagOperator(task_id=tid_subdag, dag=dag_parent, subdag=subdag)
return sd_op
def create_subdag_operators(dag_parent, file_list):
subdags = [create_subdag_operator(dag_parent, file, file_list.index(file)) for file in file_list]
# chain subdag-operators together
chain(*subdags)
return subdags
check_for_files = BranchPythonOperator(
task_id='Check_for_s3_Files',
provide_context=True,
python_callable=Check_For_Files,
dag=parent_dag
)
finished = DummyOperator(
task_id='finished',
dag=parent_dag
)
decision_to_continue = DummyOperator(
task_id='Start_Process',
dag=parent_dag
)
if len(files) > 0:
subdag_ops = create_subdag_operators(parent_dag, files)
check_for_files >> decision_to_continue >> subdag_ops[0] >> subdag_ops[-1] >> finished
check_for_files >> finished
python
airflow
directed-acyclic-graphs
arcee123
источник
источник
spark
задания или какой-либоpython
сценарий, и что вы используете для его выполнения, какlivy
и какой-либо другой методfiles
пустой список?Ответы:
Ниже приведен рекомендуемый способ создания динамического DAG или вспомогательного DAG в потоке воздуха, хотя есть и другие способы, но я думаю, что это в значительной степени применимо к вашей проблеме.
Сначала создайте файл,
(yaml/csv)
который включает в себя список всехs3
файлов и местоположений, в вашем случае вы написали функцию для сохранения их в списке, я бы сказал, сохранить их в отдельномyaml
файле и загрузить его во время выполнения в airflow env, а затем создать DAGs.Ниже приведен пример
yaml
файла:dynamicDagConfigFile.yaml
Вы можете изменить свою
Check_For_Files
функцию, чтобы сохранить их вyaml
файле.Теперь мы можем перейти к динамическому созданию даг:
Сначала определите две задачи, используя фиктивные операторы: начало и конец задачи. Такие задачи - те, в которых мы собираемся опираться
DAG
, динамически создавая задачи между ними:Динамический DAG: мы будем использовать
PythonOperators
в воздушном потоке. Функция должна получить в качестве аргументов идентификатор задачи; выполняемая функция python, то есть python_callable для оператора Python; и набор аргументов, которые будут использоваться во время выполнения.Включите аргумент в
task id
. Таким образом, мы можем обмениваться данными между задачами, сгенерированными динамически, например, с помощьюXCOM
.Вы можете указать свою операционную функцию в этом динамическом dag, как
s3_to_gcs_op
.Наконец, в зависимости от местоположения, присутствующего в файле yaml, вы можете создать динамический даг, сначала прочитайте
yaml
файл, как показано ниже, и создайте динамический даг:Окончательное определение DAG:
Идея в том, что
Полный код воздушного потока в заказе:
источник
upload_s3_toGCS
не будет существовать, и возникнет ошибка в потоке воздуха.yaml
файла, как только все эти файлы будут загружены в GCS, таким образом, в файле будут присутствовать только новые файлыyaml
. И в случае, если нет новых файлов,yaml
файл будет пустым, и динамическая метка не будет создана. Вот почемуyaml
файл является гораздо лучшим вариантом по сравнению с хранением файлов в списке.yaml
файл также поможет в ведении журнала файлов s3 таким образом, если предполагается, что часть файла s3 не была загружена в GCS, вы также можете сохранить флаг, соответствующий этому файлу, и затем повторить их при следующем запуске DAG.if
перед DAG условие, которое проверит наличие новых файлов вyaml
файлах, если появятся новые файлы, запустите его, в противном случае пропустите его.