Composer (Airflow) DAG RunID конфликт в GCP

У нас есть одна облачная функция, основанная на облачном хранилище. Эта облачная функция сработает после загрузки файла в корзину. Когда файл загружен, функция вызывает / запускает DAG воздушного потока. Этот DAG обработает файл.

Проблема в том, что когда несколько файлов помещаются одновременно с за секунду, вызов функции завершается с ошибкой ниже,

b ‘{error: Run id manual__2020-07-31T17: 48: 15 + 00: 00 уже существует для dag id pl_imaoc_trigger_dag} n’

Чтобы решить эту проблему, мы передаем run_id как run_id: IMAOC_31072020201842766625, дату в миллисекундах.

Код:

dag_name = environ_vars[‘imaoc_meta_dag’] webserver_url = ( webserver_id + ‘/api/experimental/dags/’ + dag_name + ‘/dag_runs’ ) print(‘webserver_url: {}’.format(webserver_url)) data[‘run_id’] = _datetime.datetime.now().strftime(**»IMAOC_%d%m%Y%H%M%S%f»**) resp = map_iap_request(webserver_url,client_id,method = ‘POST’,json = data) print(‘response text:{}’.format(resp))

Но все еще не решена, и AIRFLOW_CTX_DAG_RUN_ID поступает в формате manual__2020-07-31T20: 18: 43 + 00: 00 ….

Не знаю, что делать, чтобы удалить этот конфликт и запустить DAG, если файл приходит в ту же секунду.

Есть ли возможность установить переменную AIRFLOW_CTX_DAG_RUN_ID перед запуском DAG ..?   —  person Chimmu    schedule 03.08.2020

Источник: ledsshop.ru

Стиль жизни - Здоровье!