Потребителям Apache Beam KafkaIO в группе потребителей назначается уникальный идентификатор группы

Я запускаю несколько экземпляров Apache Beam KafkaIO с помощью DirectRunner, которые читаются из той же темы. Но сообщение доставляется во все запущенные экземпляры. После просмотра конфигурации Kafka, которую я нашел, к имени группы добавляется уникальный префикс, и каждый экземпляр имеет уникальное имя группы.

  1. group.id = Reader-0_offset_consumer_559337182_ моя_группа
  2. group.id = Reader-0_offset_consumer_559337345_ my_group

Таким образом, каждому экземпляру назначен уникальный group.id, и поэтому сообщения доставляются во все экземпляры.

pipeline.apply(«ReadFromKafka», KafkaIO.<String, String>read().withReadCommitted() .withConsumerConfigUpdates( new ImmutableMap.Builder<String, Object>().put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true) .put(ConsumerConfig.GROUP_ID_CONFIG, «my_group») .put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5).build()) .withKeyDeserializer(StringDeserializer.class).withValueDeserializer(StringDeserializer.class) .withBootstrapServers(servers).withTopics(Collections.singletonList(topicName)).withoutMetadata()

Итак, какую конфигурацию я должен предоставить, чтобы все потребители в группе не читали одно и то же сообщение

В чем причина для запуска нескольких экземпляров KafkaIO с DirectRunner и чтения из одной и той же темы?   —  person Aditya    schedule 21.07.2020

@AlexeyRomanenko, мы не используем GCP и запускаем его на собственном голом железе. поэтому мы не можем использовать поток данных. Итак, мы хотим масштабироваться за счет развертывания в модуле k8s и увеличения количества модулей. Но проблема здесь в том, что я вижу, так как каждому экземпляру назначается уникальный groupId, когда когда-либо я отправляю сообщение, сообщение отправляется во все группы / экземпляры. Надеюсь, это проясняет проблему   —  person Aditya    schedule 23.07.2020

Я бы не рекомендовал вам использовать DirectRunner в производстве для значительного объема данных, поскольку этот бегун должен использоваться в основном для тестирования, он содержит и выполняет множество дополнительных проверок во время работы конвейера, поэтому он может быть довольно медленным по сравнению с другими бегуны. Можно ли использовать бегуны Spark или Flink поверх распределенных кластеров Spark или Flink?   —  person Aditya    schedule 23.07.2020

@AlexeyRomanenko Нет, на данный момент у нас нет возможности использовать Spark of Flink. Кроме того, отмените отрицательный голос, так как это допустимый сценарий.   —  person Aditya    schedule 27.07.2020

Я не голосовал отрицательно, но поставил +1 к вашему посту. Я ожидаю, что у людей могут быть разные случаи, я просто рекомендую, как их лучше использовать.   —  person Aditya    schedule 27.07.2020

@Aditya, вы когда-нибудь придумывали решение этой проблемы? У меня похожая ситуация. В моем случае я хочу, чтобы тот же groupId оставался после перезапуска на случай, если Beam Job выйдет из строя. Будем признательны за любые выводы с вашей стороны. Спасибо   —  person Aditya    schedule 18.12.2020

@ user3693309 Мы перешли в поток данных.   —  person Aditya    schedule 19.12.2020

Источник: ledsshop.ru

Стиль жизни - Здоровье!