При обработке моих данных в ParDo мне нужно использовать схему JSON, хранящуюся в Google Cloud Storage. Я думаю, это может быть неопубликованная загрузка? Я читал страницы, которые они называют документацией (https://beam.apache.org/releases/pydoc/2.16.0/apache_beam.pvalue.html), и он содержит что-то о apache_beam.pvalue.AsSingleton и apache_beam.pvalue.AsSideInput, но нет результатов, если я использую их в Google, и я не могу найти ни одного пример для Python.
Как я могу прочитать файл из хранилища из ParDo? Или мне нужно загрузить неопубликованный файл в свой конвейер до ParDo, но как мне тогда использовать этот второй источник в ParDo?
[ИЗМЕНИТЬ]
Мои основные данные поступают от BQ: beam.io.Read(beam.io.BigQuerySource(…
Боковой ввод также поступает от BQ, используя тот же BigQuerySource.
Когда я затем добавляю шаг после того, как основная сторона данных вводит другие данные, я получаю некоторые странные ошибки. Я замечаю, что когда я делаю beam.Map(lambda x: x) в сторону ввода, он работает.
боковой ввод
schema_data = (p | «read schema data» >> beam.io.Read(beam.io.BigQuerySource(query=f»select * from `{schema_table}` limit 1″, use_standard_sql=True, flatten_results=True)) | beam.Map(lambda x: x) )
основные данные
source_data = (p | «read source data» >> beam.io.Read(beam.io.BigQuerySource(query=f»select {columns} from `{source_table}` limit 10″, use_standard_sql=True, flatten_results=True)))
комбинирование
validated_records = source_data | ‘record validation’ >> beam.ParDo(Validate(), pvalue.AsList(schema_data))
Источник: