Я пытаюсь реализовать инкрементную загрузку данных для извлечения данных из rds postgres в другой postgres rds
Я использую воздушный поток для реализации ETL. Итак, прочитав некоторое время о макросах воздушного потока, я решил, что настрою инкрементный поток с переменными воздушного потока по умолчанию.
Итак, алгоритм такой,
если моя предыдущая дата выполнения None или null или »: выберите данные с начала времени (в нашем случае это год назад), иначе выберите конец предыдущей даты выполнения, если
Примечание: следующий код предназначен сначала для понимания переменных по умолчанию, и это еще не реализовано для проблемы, о которой я упоминал выше.
Соответствующий код для этого показан ниже. Когда я запускаю даг в первый раз, я всегда заканчиваю тем, что печатаю «Нет» для переменной предыдущей успешной даты выполнения, а не историческую дату, как то, что я упомянул. Я не могу понять этого. Любые идеи по этому поводу будут большим подспорьем
from datetime import * from airflow import DAG from airflow.operators.python_operator import PythonOperator from airflow.operators.dummy_operator import DummyOperator from airflow.utils.dates import days_ago default_args={‘owner’:’airflow’,’start_date’: days_ago(1),’depends_on_past’:’False’} dag = DAG(‘jinja_trial_10’,default_args=default_args,schedule_interval=timedelta(minutes=5)) def printexecutiontimes(**kwargs): executiondate = kwargs.get(‘execution_date’) previoussuccessfulexecutiondate = kwargs.get(‘prev_execution_date_success’) previousexecutiondate = kwargs.get(‘prev_ds_nodash’) if (previoussuccessfulexecutiondate == ‘None’ or previoussuccessfulexecutiondate is None): previoussuccessfulexecutiondate = datetime.strftime(datetime.now() — timedelta(days = 365),’%Y-%m-%d’) print(‘Execution Date : {0}’.format(executiondate)) print(‘Previous successful execution date : {0}’.format(previoussuccessfulexecutiondate)) print(‘Previous execution date : {0}’.format(previousexecutiondate)) print(‘hello’) task_start = DummyOperator(task_id = ‘start’,dag=dag) jinja_task= PythonOperator(task_id = ‘TryingoutJinjatemplates’, python_callable =printexecutiontimes, provide_context = ‘True’, dag=dag ) task_end = DummyOperator(task_id = ‘end’,dag=dag) task_start >>jinja_task >> task_end
Источник: