У нас есть одна облачная функция, основанная на облачном хранилище. Эта облачная функция сработает после загрузки файла в корзину. Когда файл загружен, функция вызывает / запускает DAG воздушного потока. Этот DAG обработает файл.
Проблема в том, что когда несколько файлов помещаются одновременно с за секунду, вызов функции завершается с ошибкой ниже,
b ‘{error: Run id manual__2020-07-31T17: 48: 15 + 00: 00 уже существует для dag id pl_imaoc_trigger_dag} n’
Чтобы решить эту проблему, мы передаем run_id как run_id: IMAOC_31072020201842766625, дату в миллисекундах.
Код:
dag_name = environ_vars[‘imaoc_meta_dag’] webserver_url = ( webserver_id + ‘/api/experimental/dags/’ + dag_name + ‘/dag_runs’ ) print(‘webserver_url: {}’.format(webserver_url)) data[‘run_id’] = _datetime.datetime.now().strftime(**»IMAOC_%d%m%Y%H%M%S%f»**) resp = map_iap_request(webserver_url,client_id,method = ‘POST’,json = data) print(‘response text:{}’.format(resp))
Но все еще не решена, и AIRFLOW_CTX_DAG_RUN_ID поступает в формате manual__2020-07-31T20: 18: 43 + 00: 00 ….
Не знаю, что делать, чтобы удалить этот конфликт и запустить DAG, если файл приходит в ту же секунду.
Есть ли возможность установить переменную AIRFLOW_CTX_DAG_RUN_ID перед запуском DAG ..? — person Chimmu schedule 03.08.2020
Источник: