Jak tworzyć – i konfigurować – konsumentów Apache Kafka

Apache Kafka’s przetwarzanie danych w czasie rzeczywistym opiera się na konsumentach Kafki (więcej informacji tutaj), które odczytują wiadomości w ramach swojej infrastruktury. Producenci publikują wiadomości do Tematy Kafka, a konsumenci – często należący do grupy konsumentów – subskrybują te tematy w celu odbierania wiadomości w czasie rzeczywistym. Konsument śledzi swoją pozycję w kolejce za pomocą offsetu. Aby skonfigurować konsumenta, programiści tworzą go z odpowiednim identyfikatorem grupy, wcześniejszym przesunięciem i szczegółami. Następnie implementują pętlę dla konsumenta, aby efektywnie przetwarzać przychodzące wiadomości.

Jest to ważne zrozumienie dla każdej organizacji korzystającej z Kafki w wersji 100% open-source, gotowej do użycia w przedsiębiorstwach – a oto co należy wiedzieć.

Przykład: Tworzenie konsumenta Kafka

Proces tworzenia i konfigurowania Konsumenci Kafka przestrzega spójnych zasad we wszystkich językach programowania, z kilkoma niuansami specyficznymi dla danego języka. Ten przykład ilustruje podstawowe kroki tworzenia aplikacji Konsument Kafka w Javie.

Proszę zacząć od stworzenia pliku właściwości. Chociaż podejście programowe jest możliwe, zaleca się użycie pliku właściwości. W poniższym kodzie proszę zastąpić “MYKAFKAIPADDRESS1” rzeczywistymi adresami IP brokerów Kafka:

bootstrap.servers=MYKAFKAIPADDRESS1:9092, MYKAFKAIPADDRESS2:9092, MYKAF KAIPADDRESS3:9092

key.deserializer=org.apache.kafka.common.serialization. StringDeserializer

value.deserializer=org.apache.kafka.common.serialization.StringDeserializer

group.id=my-group

security.protocol=SASL_PLAINTEXT

sasl.mechanism=SCRAM-SHA-256

sasl.jaas.config=org.apache.kafka.common.security.scram. ScramLoginModule required \

username="[USER NAME]" \

password="[USER PASSWORD]";

Następnym krokiem jest utworzenie konsumenta. Ten przykładowy kod przygotowuje główny punkt wejścia programu, a także niezbędną pętlę przetwarzania wiadomości:

import org.apache.kafka.clients.consumer.KafkaConsumer;

import org.apache.kafka.clients.consumer. Consumer Record;

import org.apache.kafka.clients.consumer. Consumer Records;

import java.io.FileReader;

import java.io.IOException;

import java.time. Duration;

import java.util.Collections;

import java.util.Properties;

public class Consumer {

public static void main(String[] args) {

Properties kafkaProps = new Properties();

try (FileReader fileReader = new FileReader ("consumer.properties")) {

kafkaProps.load(fileReader);

} catch (IOException e) {

e.printStackTrace();

}

try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(kafkaProps)) {

consumer.subscribe (Collections.singleton("test"));

while (true) {

Consumer Records<String, String> records = consumer.poll (Duration.ofMillis(100));

for (Consumer Record<String, String>; record records) {

System.out.println(String.format("topic = %s, partition = %s, offset = %d, key = %s, value = %",

record.topic (), record.partition(), record.offset(), record.key(), record.value()));

}

}

}

}

Najczęściej używane opcje konfiguracji konsumenta Kafka

Po zakończeniu podstawowej konfiguracji programiści mają do dyspozycji szereg zaawansowanych opcji umożliwiających dostosowanie konsumentów Kafka do ich preferencji. Poniższe podsumowania podkreślają najczęściej używane opcje, natomiast Państwa dokumentacja sterownika Kafka zawiera wyczerpującą listę konfiguracji.

Niektóre z najpopularniejszych opcji konfiguracji konsumenta Kafka obejmują:

  • client.id – Identyfikuje klienta (konsumenta) dla brokerów w klastrze Kafka, aby wyjaśnić, który konsument wykonał które żądania. Zgodnie z najlepszą praktyką, konsumenci w grupie konsumenckiej powinni mieć ten sam identyfikator klienta, umożliwiając egzekwowanie limitu klienta dla tej grupy.
  • session.timeout.ms – Ta wartość kontroluje, jak długo broker będzie nasłuchiwał bicia serca konsumenta przed uznaniem go za martwego. Domyślną wartością jest 10 sekund.
  • heartbeat.interval.ms – Ta wartość kontroluje, jak często konsument wysyła bicie serca, aby poinformować brokera, że żyje i działa. Domyślna wartość to 3 sekundy.
  • Proszę zauważyć, że session.timeout.ms i heartbeat.interval.ms muszą współpracować, aby poinformować brokera o statusie konsumenta. Jako najlepszą praktykę, konsument powinien wysyłać kilka uderzeń serca na interwał limitu czasu. Na przykład domyślne ustawienia bicia serca co 3 sekundy i 10-sekundowego limitu czasu oferują zdrową strategię.
  • max.poll.interval.ms – Ustawia czas, przez jaki broker będzie czekał między wywołaniami metody ankiety i monituje konsumenta o próbę odebrania dalszych wiadomości przed uznaniem go za martwego. Domyślną wartością jest 300 sekund.
  • enable.auto.commit – Mówi konsumentowi, aby automatycznie zatwierdzał okresowe przesunięcia (w odstępie czasu określonym przez auto.commit.interval.ms). Ta konfiguracja jest domyślnie włączona.
  • fetch.min.bytes – Ustawia minimalną ilość danych, które konsument pobiera od brokera. Konsument będzie czekał na więcej danych, jeśli dostępne dane nie spełniają ustawionej ilości. Podejście to minimalizuje połączenia tam i z powrotem między konsumentami a brokerami, zwiększając przepustowość kosztem opóźnień.
  • fetch.max.wait.ms – Używany razem z fetch.min.bytes, ustawia maksymalny czas oczekiwania konsumenta przed pobraniem wiadomości od brokera.
  • max.partition.fetch.bytes – Ustawia maksymalną liczbę bajtów, które konsument pobierze na partycję, skutecznie nakładając górny limit wymagań pamięci na pobieranie wiadomości. Ważne jest, aby pamiętać o max.poll.interval przy wyborze tej wartości: zbyt duże maksimum może sprawić, że będzie więcej danych do dostarczenia niż konsumenci mogą pobrać w przedziale czasowym ankiety.
  • auto.offset.reset Mówi konsumentowi, co ma zrobić, jeśli odczyta partycję bez dostępnego przesunięcia ostatniego odczytu. Może to być ustawione na “najnowsze” lub “najwcześniejsze”, informując konsumenta, aby rozpoczął od najnowszej dostępnej wiadomości lub najwcześniejszego dostępnego offsetu.
  • partition.assignment.strategy – To informuje konsumenta-lidera grupy, jak przypisać partycje do konsumentów w jego grupie konsumentów.
  • max.poll.records – Ustawia maksymalne rekordy, które konsument pobierze w wywołaniu ankiety, oferując kontrolę nad przepustowością.

Odblokowanie potencjału Apache Kafka

Jako niezawodne, wysokowydajne, hiperskalowe rozwiązanie do strumieniowego przesyłania danych w czasie rzeczywistym, Apache Kafka o otwartym kodzie źródłowym oferuje ogromne możliwości. Rozumiejąc, jak tworzyć i konfigurować konsumentów Kafka, programiści mogą ściślej kontrolować zachowanie rozwiązania i uzyskać więcej z wdrożenia Kafka.