Почему срабатывает мое окно времени обработки, а время события — нет

Я изо всех сил пытаюсь заставить запускать триггеры на основе времени события для моего конвейера лучей apache, но, похоже, могу запускать запуск окна со временем обработки.

Мой конвейер довольно прост:

  1. Я получаю пакеты точек данных, которые включают отметки времени миллисекундного уровня от чтения pubsub с отметкой времени немного раньше, чем самая ранняя точка пакетных данных. Данные группируются, чтобы уменьшить рабочую нагрузку на стороне клиента и расходы на pubsub.

  2. Я извлекаю временные метки второго уровня и применяю временные метки к отдельным точкам данных

  3. Я закрываю данные для обработки и избегаю использования глобального окна.

  4. Я группирую данные по секундам для последующей категоризации по секундам потоковых данных.

  5. В конце концов, я использую скользящие окна в отсортированных по категориям секундах, чтобы условно отправлять одно из двух сообщений в pubsub один раз в секунду.

Моя проблема, похоже, находится на шаге 3.

Я пытаюсь использовать ту же стратегию работы с окнами на этапе 3, которую я в конечном итоге буду использовать на этапе 5, чтобы выполнить расчет скользящего среднего на категоризированных секундах.

Я пробовал возиться с параметрами withTimestampCombiner (TimestampCombiner.EARLIEST), но, похоже, это не решает проблему.

Я читал о методе .withEarlyFirings, используемом для времени события, но похоже, что он имитирует мою существующую работу. В идеале я мог бы полагаться на водяной знак, проходящий через конец окна, и включать позднее срабатывание.

// De-Batching The Pubsub Message static public class UnpackDataPoints extends DoFn<String,String>{ @ProcessElement public void processElement(@Element String c, OutputReceiver<String> out) { JsonArray packedData = new JsonParser().parse(c).getAsJsonArray(); DateTimeFormatter dtf = DateTimeFormat.forPattern(«EEE dd MMM YYYY HH:mm:ss:SSS zzz»); for (JsonElement acDataPoint: packedData){ String hereData = acDataPoint.toString(); DateTime date = dtf.parseDateTime(acDataPoint.getAsJsonObject().get(«Timestamp»).getAsString()); Instant eventTimeStamp = date.toInstant(); out.outputWithTimestamp(hereData,eventTimeStamp); } } } // Extracting The Second static public class ExtractTimeStamp extends DoFn<String,KV<String,String>> { @ProcessElement public void processElement(ProcessContext ctx ,@Element String c, OutputReceiver<KV<String,String>> out) { JsonObject accDataObject = new JsonParser().parse(c).getAsJsonObject(); String milliString = accDataObject.get(«Timestamp»).getAsString(); String secondString = StringUtils.left(milliString,24); accDataObject.addProperty(«noMiliTimeStamp», secondString); String updatedAccData = accDataObject.toString(); KV<String,String> outputKV = KV.of(secondString,updatedAccData); out.output(outputKV); } } // The Pipeline & Windowing Pipeline pipeline = Pipeline.create(options); PCollection<String> dataPoints = pipeline .apply(«Read from Pubsub», PubsubIO.readStrings() .fromTopic(«projects/????/topics/???») .withTimestampAttribute(«messageTimestamp»)) .apply(«Extract Individual Data Points»,ParDo.of(new UnpackDataPoints())); /// This is the event time window that doesn’t fire for some reason /* PCollection<String> windowedDataPoints = dataPoints.apply( Window.<String>into(SlidingWindows.of(Duration.standardSeconds(5)).every(Duration.standardSeconds(1))) // .triggering(AfterWatermark.pastEndOfWindow()) .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane() .plusDelayOf(TWO_MINUTES)) //.triggering(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardSeconds(2))) .discardingFiredPanes() .withTimestampCombiner(TimestampCombiner.EARLIEST) .withAllowedLateness(Duration.standardSeconds(1))); */ ///// Temporary Work Around, this does fire but data is out of order PCollection<String> windowedDataPoints = dataPoints.apply( Window.<String>into(FixedWindows.of(Duration.standardMinutes(120))) .triggering( AfterProcessingTime.pastFirstElementInPane() .plusDelayOf(Duration.standardSeconds(5))) .discardingFiredPanes() .withTimestampCombiner(TimestampCombiner.EARLIEST) .withAllowedLateness(Duration.standardSeconds(1))); PCollection<KV<String, String>> TimeStamped = windowedDataPoints .apply( «Pulling Out The Second For Aggregates», ParDo.of(new ExtractTimeStamp())); PCollection<KV<String, Iterable<String>>> TimeStampedGrouped = TimeStamped.apply(«Group By Key»,GroupByKey.create()); PCollection<KV<String, Iterable<String>>> testing = TimeStampedGrouped.apply(«testingIsh», ParDo.of(new LogKVIterable()));

Когда я использую первую оконную стратегию, которая закомментирована, мой конвейер работает бесконечно, и получение данных и LogKVIterable ParDo никогда ничего не возвращает, когда я использую время обработки, LogKVIterable запускается и регистрируется в консоли.

Вы проверили, правильно ли установлены ваши временные метки? Возможно, ваш водяной знак не продвигается должным образом, если временные метки не анализируются должным образом …   —  person Sam-U_L-L-L    schedule 28.09.2019

Вы видите, что показатели актуальности данных / системного лага не отстают от вашего конвейера?   —  person Sam-U_L-L-L    schedule 30.09.2019

Источник: ledsshop.ru

Стиль жизни - Здоровье!