Архитектура компонентов
- Topic
- Broker
- Producer
- Consumer
Kafka API
- Основные API Kafka
- Какова роль Producer API?
- Какова роль Consumer API?
- Какова роль Connector API?
- Какова роль Streams API?
- Какова роль Transactions API?
- Какова роль Quota API?
- Какова роль AdminClient API?
Kafka Consumer
- Для чего нужен координатор группы?
- Для чего нужен Consumer heartbeat thread?
- Как Kafka обрабатывает сообщения?
- Как Kafka обрабатывает задержку консюмера?
- Для чего нужны методы subscribe() и poll()?
- Для чего нужен метод position()?
- Для чего нужны методы commitSync() и commitAsync()?
Другие вопросы
- Для чего нужен идемпотентный продюсер?
- Для чего нужен интерфейс Partitioner?
- Для чего нужен Broker log cleaner thread?
- Для чего нужен Kafka Mirror Maker?
- Для чего нужна Schema Registry?
- Для чего нужен Streams DSL?
- Как Kafka обеспечивает версионирование сообщений?
- Как потребители получают сообщения от брокера?
Сравнение с другими компонентами и системами
- В чем разница между Kafka Consumer и Kafka Stream?
- В чем разница между Kafka Streams и Apache Flink?
- В чем разница между Kafka и Flume?
- В чем разница между Kafka и RabbitMQ?
Это распределённая система с открытым исходным кодом, разработанная для высокоскоростной передачи больших объёмов данных с минимальной задержкой.
- Персистентность данных
- Высокая производительность
- Независимость пайплайнов обработки
- Возможность просмотреть историю записей заново
- Гибкость в использовании
- λ-архитектура или k-архитектура
- Стриминг больших данных
- Много клиентов (producer и consumer)
- Требуется кратное масштабирование
- Это не брокер сообщений
- Отложенные сообщения
- DLQ
- AMQP / MQTT
- TTL на сообщение
- Очереди с приоритетами
- Producer (Производитель) — приложение, которое публикует сообщения в топики Kafka
- Consumer (Потребитель) — приложение, которое подписывается на топики и читает сообщения
- Broker (Брокер) — сервер Kafka, который принимает, хранит и распределяет сообщения. В кластере Kafka может быть несколько брокеров
- Topic (Топик) — логическое разделение, по которому организуются данные. Производители отправляют сообщения в топики, а потребители читают из них
- Partition (Раздел) — каждый топик разделён на партиции для параллельной обработки. Сообщения в партициях упорядочены
- Zookeeper — сервис, используемый Kafka для управления состоянием кластера и координации брокеров. Однако в новых версиях Kafka отказывается от Zookeeper в пользу собственного механизма метаданных KRaft (Kafka Raft). Это новая внутренняя архитектура метаданных Kafka, которая устраняет зависимость от Zookeeper. Она основана на Raft-консенсусе, позволяя Kafka брокерам самостоятельно управлять метаданными и координировать взаимодействие между собой.
- Топик разбит на партиции — сообщения в топике распределяются по партициям для более эффективной параллельной обработки и хранения
- Партиции хранятся на диске — Kafka сохраняет данные на диск, что позволяет долговременно хранить сообщения
- Партиции делятся на сегменты — сегмент представляет собой обычный файл на диске, сегменты делятся на пассивные и активный. Запись происходит в активный сегмент
- Данные удаляются либо по времени, либо по размеру. Удаление происходит посегментно, с самого старого сегмента
- retention.bytes - по максимальному размеру
- retention.ms - по времени
- Сообщение можно быстро найти по его Offset — каждому сообщению в партиции присваивается уникальный смещающий индекс (offset), по которому можно легко найти сообщение
replication.factor
- Описание: Количество реплик для каждой партиции топика
- Пример:
replication.factor=3
min.insync.replicas
- Описание: Минимальное количество синхронизированных реплик
- Пример:
min.insync.replicas=2
retention.ms
- Описание: Время хранения сообщений в топике в миллисекундах
- Пример:
retention.ms=604800000
(7 дней)
retention.bytes
- Описание: Максимальный объём данных в топике, после чего старые сообщения удаляются
- Пример:
retention.bytes=10737418240
(10 GB)
segment.bytes
- Описание: Размер сегмента логов топика
- Пример:
segment.bytes=1073741824
(1 GB)
cleanup.policy
- Описание: Как Kafka обрабатывает старые сообщения
- Значения:
delete
,compact
- Пример:
cleanup.policy=delete
num.partitions
- Описание: Количество партиций в топике
- Пример:
num.partitions=3
- У каждой партиции свой лидер — в Kafka для каждой партиции в топике назначается лидер-брокер, который отвечает за запись и чтение данных
- Сообщения пишутся в лидера — производители отправляют сообщения напрямую в брокер-лидер партиции
- Данные реплицируются между брокерами — для обеспечения отказоустойчивости Kafka реплицирует данные партиций на другие брокеры, которые становятся репликами
- Автоматический фейловер лидера — в случае сбоя брокера-лидера Kafka автоматически назначает новый лидер из числа реплик, обеспечивая бесшовную работу системы
min.insync.replicas
- Описание: Минимальное количество синхронизированных реплик для подтверждения записи
- Пример:
min.insync.replicas=2
unclean.leader.election.enable
- Описание: Разрешает выбор лидера из неактуальных реплик, если нет синхронизированных реплик
- Пример:
unclean.leader.election.enable=false
log.dirs
- Описание: Директория на диске, где хранятся логи партиций
- Пример:
log.dirs=/var/lib/kafka/logs
log.retention.hours
- Описание: Максимальное время хранения данных в логах
- Пример:
log.retention.hours=168
(7 дней)
log.segment.bytes
- Описание: Максимальный размер сегмента лога, после чего создаётся новый
- Пример:
log.segment.bytes=1073741824
(1 GB)
num.network.threads
- Описание: Количество потоков для обработки сетевых запросов
- Пример:
num.network.threads=3
num.io.threads
- Описание: Количество потоков для ввода-вывода
- Пример:
num.io.threads=8
socket.send.buffer.bytes
- Описание: Размер буфера для отправки данных по сети
- Пример:
socket.send.buffer.bytes=102400
message.max.bytes
- Описание: Максимальный размер сообщения, которое брокер может принять
- Пример:
message.max.bytes=1048576
(1 MB)
replica.fetch.max.bytes
- Описание: Максимальный размер данных для запроса реплики
- Пример:
replica.fetch.max.bytes=1048576
(1 MB)
ssl.keystore.location
- Описание: Путь к хранилищу ключей SSL
- Пример:
ssl.keystore.location=/var/private/ssl/kafka.keystore.jks
ssl.truststore.location
- Описание: Путь к хранилищу доверенных сертификатов
- Пример:
ssl.truststore.location=/var/private/ssl/kafka.truststore.jks
- Создание сообщения (Record): Продюсер формирует сообщение, содержащее ключ (необязательный), значение и метаданные, такие как время отправки. Сообщение отправляется в топик (Topic), который состоит из одной или нескольких партиций
- Выбор партиции: Если ключ сообщения указан, Kafka использует его для хеширования и определения, в какую партицию записать сообщение (сообщения с одинаковым ключом попадают в одну и ту же партицию). Если ключа нет, Kafka распределяет сообщения по партициям с помощью round-robin или по другим правилам
- Отправка сообщений в буфер (Batching): Для повышения производительности продюсер Kafka не отправляет каждое сообщение по отдельности, а группирует несколько сообщений в пакеты (batching), прежде чем отправить их брокеру. Это снижает сетевые задержки и нагрузку на брокера
- Сжатие (Compression): Для уменьшения объёма передаваемых данных продюсер может сжимать сообщения с использованием таких алгоритмов, как GZIP, Snappy или LZ4. Сжатие снижает нагрузку на сеть и хранение, но добавляет небольшие накладные расходы на процессор
- Асинхронная отправка: Продюсер отправляет пакеты сообщений асинхронно. Это означает, что сообщения записываются в буфер памяти и отправляются брокеру, не ожидая завершения предыдущих операций. Это повышает пропускную способность
- Подтверждения (Acknowledgments): Kafka позволяет настраивать уровень подтверждений от брокеров
- Ретрай и идемпотентность: Если отправка сообщения не удалась, продюсер может повторить попытку отправки (ретрай). Также можно включить идемпотентный режим продюсера, что предотвращает повторную отправку одного и того же сообщения в случае сбоя, обеспечивая отправку уникального сообщения один раз
- Error handling: Продюсер обрабатывает ошибки при отправке сообщений. В зависимости от настроек продюсер может попытаться переотправить сообщение или сообщить о проблеме через callback
- Продюсер выбирает партицию для сообщения
- Продюсер выбирает уровень гарантии доставки
- В продюсере можно тюнить производительность
- Описание: Указывает адреса брокеров Kafka, к которым продюсер должен подключаться для отправки сообщений
- Пример:
bootstrap.servers: localhost:9092,localhost:9093
- Зачем это нужно: Kafka продюсер использует эти брокеры для получения метаданных о кластере (например, информация о топиках и партициях). Эти брокеры служат точками входа в кластер Kafka.
Продюсер должен преобразовывать (сериализовать) данные в байтовый формат перед отправкой в Kafka
- Ключевая настройка для сериализации ключа:
key.serializer
- Пример:
key.serializer: org.apache.kafka.common.serialization.StringSerializer
- Ключевая настройка для сериализации значения:
value.serializer
- Пример:
value.serializer: org.apache.kafka.common.serialization.StringSerializer
Варианты сериализаторов:
StringSerializer
для строкByteArraySerializer
для массива байтовLongSerializer
для чисел- Также можно реализовать свои собственные сериализаторы
Продюсер Kafka отправляет сообщения асинхронно, и для этого используется буферизация сообщений
- batch.size: Размер одного пакета (batch), который продюсер отправляет брокеру
- Описание: Определяет количество байтов сообщений, которые могут быть буферизованы в одном пакете перед отправкой брокеру
- Пример:
"batch.size": 16384
(16 KB) - Зачем это нужно: Большие пакеты могут повысить производительность, но могут увеличить задержки
- linger.ms: Максимальное время ожидания перед отправкой пакета
- Описание: Продюсер может немного подождать, пока буфер накопит сообщения, чтобы отправить больше данных за один раз
- Пример:
linger.ms: 5
(время ожидания 5 мс) - Зачем это нужно: Позволяет продюсеру собирать больше сообщений в пакете перед отправкой, что может улучшить эффективность использования сети
- buffer.memory: Размер выделенной памяти для буферизации сообщений
- Описание: Общий объем памяти, который продюсер может использовать для хранения сообщений, ожидающих отправки
- Пример:
buffer.memory: 33554432
(32 MB) - Зачем это нужно: Если буфер заполняется, продюсер приостанавливает отправку сообщений, пока буфер не освободится
Продюсер может сжимать сообщения для уменьшения объема передаваемых данных
- compression.type
- Описание: Указывает тип сжатия для сообщений
- Пример:
compression.type: gzip
(варианты: none, gzip, snappy, lz4, zstd) - Зачем это нужно: Сжатие уменьшает объем данных, передаваемых по сети, что может снизить нагрузку на сеть и хранилище, особенно при больших объемах сообщений. Однако это может потребовать дополнительных ресурсов на сжатие/разжатие
- partitioner.class
- Описание: определяет логику, по которой продюсер выбирает партицию для каждого сообщения
- Примеры:
- если настройка не задана, по умолчанию используется
DefaultPartitioner
, который может распределять сообщения по партициям равномерно или на основе ключа сообщения partitioner.class: o.a.k.clients.producer.RoundRobinPartitioner
использует метод Round Robin для распределения сообщенийpartitioner.class: o.a.k.clients.producer.UniformStickyPartitioner
равномерно отправляет сообщения, привязываясь к партиции на короткий промежуток времени, чтобы уменьшить нагрузку на брокеры
- если настройка не задана, по умолчанию используется
Настройка определяет, как много брокеров должны подтвердить получение сообщения перед тем, как продюсер будет считать его успешно отправленным
- acks
- Описание: Определяет количество подтверждений от брокеров
- Значения:
0
: Продюсер не ждёт подтверждений (самая быстрая отправка, но высокий риск потери сообщений)1
: Продюсер ждёт подтверждения от лидера партицииall
(или-1
): Продюсер ждёт подтверждений от всех реплик (наибольшая надежность, но увеличенные задержки)
- Пример:
acks: all
- Зачем это нужно: Позволяет выбрать баланс между скоростью и надежностью отправки данных.
- Количество повторных попыток (retries):
- Описание: Определяет, сколько раз продюсер должен попытаться отправить сообщение при неудаче
- Пример:
retries: 3
- Зачем это нужно: Если произошёл временный сбой, продюсер может попытаться повторить отправку сообщений, что увеличивает шанс доставки
- Идемпотентность продюсера (enable.idempotence):
- Описание: Включение идемпотентного режима, что предотвращает дублирование сообщений при сбоях
- Пример:
enable.idempotence: true
- Зачем это нужно: Гарантирует, что каждое сообщение будет доставлено ровно один раз
- Максимальный размер сообщения (max.request.size):
- Описание: Максимальный размер сообщения, которое продюсер может отправить брокеру
- Пример:
max.request.size: 1048576
(1 MB) - Зачем это нужно: Ограничивает размер сообщений, которые могут быть отправлены, чтобы избежать перегрузки сети и брокеров.
- Таймаут ожидания подтверждений (request.timeout.ms):
- Описание: Максимальное время ожидания подтверждения от брокера
- Пример:
request.timeout.ms: 30000
(30 секунд) - Зачем это нужно: Помогает избежать бесконечного ожидания ответа от брокера в случае его сбоя
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
public class KafkaStringArrayProducer {
public static void main(String[] args) {
// Настройки Kafka Producer
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// Создание Kafka Producer
KafkaProducer<String, String[]> producer = new KafkaProducer<>(props);
String key = "user123";
String[] value = {"message1", "message2", "message3"};
// Создание записи и добавление заголовков
ProducerRecord<String, String> record = new ProducerRecord<>("my_topic", key, value);
record.headers().add("traceId", "someTraceId");
// Отправка сообщения в Kafka
producer.send(record, (metadata, exception) -> {
if (exception != null) {
System.out.println("Ошибка при отправке сообщения: " + exception.getMessage());
} else {
System.out.println("Сообщение отправлено в топик " + metadata.topic() + " с партицией " + metadata.partition());
}
});
producer.close();
}
}
acks=all
retries=3
compression.type=gzip
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.config.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.kafka.producer.Producer;
import org.springframework.kafka.producer.ProducerRecord;
import java.util.HashMap;
import java.util.Map;
@EnableKafka
@Configuration
public class KafkaProducerConfig {
@Autowired
private KafkaProperties kafkaProperties;
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getServer());
props.put(ProducerConfig.CLIENT_ID_CONFIG, kafkaProperties.getProducerId());
props.put(
ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
"com.example.configuration.kafka.KafkaProducerLoggingInterceptor"
);
if ("SASL_SSL".equals(kafkaProperties.getSecurityProtocol())) {
props.put("ssl.truststore.location", kafkaProperties.getSslTrustStoreLocation());
props.put("ssl.truststore.password", kafkaProperties.getSslTrustStorePassword());
props.put("ssl.truststore.type", kafkaProperties.getSslTrustStoreType());
props.put("ssl.keystore.type", kafkaProperties.getSslKeyStoreType());
props.put("sasl.mechanism", kafkaProperties.getSaslMechanism());
props.put("security.protocol", kafkaProperties.getSecurityProtocol());
props.put("sasl.jaas.config", kafkaProperties.getJaasConfigCompiled());
}
return props;
}
@Bean
public ProducerFactory<String, String> producerFactory() {
var stringSerializerKey = new StringSerializer();
stringSerializerKey.configure(Map.of("key.serializer.encoding", "UTF-8"), true);
stringSerializerKey.configure(Map.of("serializer.encoding", "UTF-8"), true);
var stringSerializerValue = new StringSerializer();
stringSerializerValue.configure(Map.of("value.serializer.encoding", "UTF-8"), false);
stringSerializerValue.configure(Map.of("serializer.encoding", "UTF-8"), false);
return new DefaultKafkaProducerFactory<>(producerConfigs(), stringSerializerKey, stringSerializerValue);
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
@Service
public class KafkaProducerService {
private final KafkaTemplate<String, String> kafkaTemplate;
public KafkaProducerService(KafkaTemplate<String, String> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
public void sendMessage(String message, String key, String topic) {
try {
log.info("Sending message {}", data);
kafkaTemplate.send(topic, key, message);
log.info("Successfully send message {}", data);
} catch (Exception ex) {
log.error("Failed send message to {} topic by key {}", key, topic);
throw ex;
}
}
}
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
@RestController
@RequestMapping("/kafka")
public class KafkaController {
@Autowired
private KafkaProducerService kafkaProducerService;
@PostMapping("/send")
public String sendMessage(@RequestParam String message, @RequestParam String key, @RequestParam String topic) {
kafkaProducerService.sendMessage(message, key, topic);
return "Message sent to Kafka!";
}
}
spring:
cloud:
stream:
bindings:
output:
destination: my_topic
kafka:
binder:
brokers: localhost:9092
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Service;
@Service
@EnableBinding(Source.class) // Подключение к каналу сообщений
public class KafkaStreamProducer {
private final Source source;
public KafkaStreamProducer(Source source) {
this.source = source;
}
public void sendMessage(String message) {
Message<String> msg = MessageBuilder.withPayload(message).build();
source.output().send(msg); // Отправка сообщения в Kafka
}
}
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
@RestController
@RequestMapping("/kafka-stream")
public class KafkaStreamController {
@Autowired
private KafkaStreamProducer kafkaStreamProducer;
@PostMapping("/send")
public String sendMessage(@RequestParam String message) {
kafkaStreamProducer.sendMessage(message);
return "Message sent to Kafka via Spring Cloud Stream!";
}
}
Потребители используют Kafka Consumer API для взаимодействия с брокерами Kafka. Они получают сообщения и обрабатывают их согласно своей логике. Потребители могут быть объединены в группы Consumer Groups.
- "Smart" консюмер
- Консюмер опрашивает кафку
- Консюмер отвечает за гарантию обработки
- Автоматические фейловер в консюмер-группе
- Независимая обработка разными консюмер-группе
Kafka использует концепцию Consumer Groups, что позволяет нескольким потребителям работать вместе, чтобы параллельно обрабатывать данные из топиков. Каждый потребитель в группе обрабатывает только часть данных из топика, обеспечивая масштабируемость и балансировку нагрузки.
- Все сообщения из одного Kafka Topic делятся между всеми потребителями в группе
- Если в группе несколько потребителей, Kafka гарантирует, что каждая партиция топика будет обрабатываться только одним потребителем
- В случае если один из потребителей выходит из строя, его партиции автоматически перераспределяются между оставшимися активными потребителями
Потребитель отслеживает offset каждой партиции, чтобы понимать, с какого сообщения продолжать чтение. Смещение — это уникальный идентификатор каждого сообщения в партиции.
Потребители могут хранить offset в Kafka или вне её (например, в базе данных или файловой системе). Если потребитель отключается, он может возобновить обработку с того места, где остановился, прочитав сохранённый offset.
Потребители используют метод poll() для опроса Kafka на наличие новых сообщений. Это асинхронный процесс, и Kafka будет отправлять потребителю доступные сообщения по мере их поступления.
- Потребитель может указывать тайм-аут, после которого метод poll() вернёт пустой результат, если сообщений нет.
- Потребитель должен обрабатывать сообщения, а затем снова опрашивать Kafka для получения новых данных.
- Инициализация: Потребитель подключается к Kafka-брокерам и присоединяется к consumer group. Он получает информацию о партиции топика, который будет читать.
- Подписка на топик: Потребитель подписывается на определённые топики с помощью метода
subscribe()
. - Опрос: Потребитель вызывает метод
poll()
для получения новых сообщений. Если в очереди есть сообщения, они передаются потребителю для обработки. - Обработка сообщений: Потребитель обрабатывает сообщения, извлекая полезную информацию из каждого.
- Подтверждение обработки: После обработки сообщения потребитель подтверждает обработку с помощью
commit()
. Это обновляет offset, позволяя потребителю продолжить чтение с места, на котором остановился. - Обработка ошибок: В случае ошибки потребитель может решить, как повторить обработку сообщения (например, с использованием механизма повторных попыток).
- Завершение работы: Когда потребитель завершает обработку, он выходит из consumer group и может закрыть соединение с Kafka.
- bootstrap.servers — список брокеров, к которым будет подключаться потребитель
- group.id — идентификатор группы потребителей
- auto.offset.reset — настройка поведения при отсутствии offset (
earliest
для чтения с самого начала илиlatest
для чтения с конца) - enable.auto.commit — указывает, должен ли потребитель автоматически коммитить offset. Если
false
, потребитель должен делать это вручную - auto.commit.interval.ms — определяет интервал времени между автоматическими коммитами offset сообщений, если включена автоматическая фиксация
- max.poll.records — максимальное количество сообщений, которые потребитель будет получать за один вызов
poll()
- session.timeout.ms — максимальное время без общения с Kafka перед тем, как потребитель считается недоступным
- client.rack — используется для указания серверной стойки или дата-центра. Это особенно важно в случае, если у вас есть распределённая инфраструктура Kafka с несколькими стойками или дата-центрами, где сообщения могут быть реплицированы между разными физическими местоположениями (например, несколькими дата-центрами).
Rack — это метка, которая идентифицирует физическое местоположение брокеров Kafka. В Kafka можно задать rack для каждого брокера
с помощью параметра broker.rack
, чтобы управлять репликацией данных, предпочтительно размещая реплики на разных физических машинах или в разных дата-центрах.
Преимущества использования client.rack
- Снижение задержек: Kafka будет предпочитать, чтобы данные попадали в тот же rack, где находится клиент, что уменьшает время отклика
- Повышенная отказоустойчивость: С правильной настройкой client.rack и broker.rack можно улучшить отказоустойчивость за счет размещения реплик в разных физически удаленных местах
- Лучшее использование ресурсов: Правильное распределение нагрузки по rack помогает избежать перегрузки одного физического местоположения
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Collections;
public class KafkaConsumerExample {
public static void main(String[] args) {
String bootstrapServers = "localhost:9092";
String groupId = "my-consumer-group";
String topic = "my-topic";
// Настройки Consumer
Map<String, Object> consumerConfigs = new HashMap<>();
consumerConfigs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
consumerConfigs.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
consumerConfigs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
consumerConfigs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
consumerConfigs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
// Создание Consumer
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerConfigs);
// Подписка на тему
consumer.subscribe(Collections.singletonList(topic));
try {
// Чтение сообщений из Kafka
while (true) {
var records = consumer.poll(Duration.ofSeconds(1));
records.forEach(record -> System.out.println("Received message: " + record.value()));
}
} finally {
consumer.close();
}
}
}
At least once
Чтобы гарантировать обработку сообщений хотя бы один раз, нужно коммитить после обработки.
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Collections;
public class KafkaConsumerAtLeastOnce {
public static void main(String[] args) {
try {
// Чтение сообщений
while (true) {
var records = consumer.poll(Duration.ofSeconds(1)); // Ожидание 1 секунду для получения сообщений
process(records);
consumer.commitAsync(); // Commit после обработки
}
} finally {
consumer.close(); // Закрытие consumer
}
}
}
At most once
Чтобы гарантировать обработку сообщений не более одного раза, нужно коммитить до обработки или включить авто-подтверждение смещений
enable.auto.commit=true
.
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Collections;
public class KafkaConsumerAtLeastOnce {
public static void main(String[] args) {
try {
// Чтение сообщений
while (true) {
var records = consumer.poll(Duration.ofSeconds(1)); // Ожидание 1 секунду для получения сообщений
consumer.commitAsync(); // Commit перед обработкой
process(records);
}
} finally {
consumer.close(); // Закрытие consumer
}
}
}
@EnableKafka
@Configuration
public class KafkaConsumerConfig {
@Autowired
private KafkaProperties kafkaProperties;
@Bean
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> configs = new HashMap<>();
configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getServer());
configs.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaProperties.getConsumerGroupId());
configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return configs;
}
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
ConcurrentMessageListenerContainerFactory<String, String> factory = new ConcurrentMessageListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
}
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Service;
@Service
public class KafkaConsumer {
@KafkaListener(topics = "my_topic", groupId = "group_id")
public void listen(@Payload String message,
@Header("traceId") String traceId,
@Header("correlationId") String correlationId) {
System.out.println("Received message: " + message);
System.out.println("Trace ID: " + traceId);
System.out.println("Correlation ID: " + correlationId);
}
}
At least once
spring:
kafka:
consumer:
enable-auto-commit: false # Отключение авто-commit
auto-offset-reset: earliest # Начинать чтение с самого начала (если нет смещения)
group-id: my-consumer-group
max-poll-records: 500 # Максимальное количество сообщений для обработки за один раз
listener:
ack-mode: manual # Ручное подтверждение
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.listener.MessageListener;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.kafka.listener.config.DefaultMessageListenerContainer;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.MessageListener;
import org.springframework.kafka.listener.MessageListenerContainer;
@EnableKafka
public class AtLeastOnceConsumer {
@KafkaListener(topics = "my-topic", groupId = "my-consumer-group")
public void listen(String message, Acknowledgment acknowledgment) {
System.out.println("Received message: " + message);
// Обработка сообщения
// Подтверждение смещения вручную после успешной обработки
acknowledgment.acknowledge();
}
}
At most once
spring:
kafka:
consumer:
enable-auto-commit: true # Включение авто-commit
group-id: my-consumer-group
auto-offset-reset: earliest # Начинать чтение с самого начала
max-poll-records: 100 # Максимальное количество сообщений для обработки за один раз
import org.springframework.kafka.annotation.KafkaListener;
public class AtMostOnceConsumer {
@KafkaListener(topics = "my-topic", groupId = "my-consumer-group")
public void listen(String message) {
System.out.println("Received message: " + message);
// Обработка сообщения...
// Смещение будет автоматически зафиксировано после получения сообщения
}
}
spring:
cloud:
stream:
bindings:
input:
destination: my-topic
group: my-consumer-group
content-type: application/json
kafka:
binder:
brokers: localhost:9092
auto-create-topics: false
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Service;
@Service
@EnableBinding(KafkaProcessor.class) // Указывает на интерфейс, с которым связывается этот сервис
public class KafkaConsumerService {
// Метод будет слушать сообщения из указанного канала
@StreamListener("input")
public void handle(@Payload String message) {
System.out.println("Received message: " + message);
}
}
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;
public interface KafkaProcessor {
@Input("input") // Имя канала, которое мы используем в application.yml
SubscribableChannel input();
}
At least once
spring:
cloud:
stream:
bindings:
input:
destination: my-topic
group: my-consumer-group
content-type: application/json
consumer:
ackMode: manual # Ручное подтверждение
maxAttempts: 3 # Максимальное количество попыток
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.Message;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.stereotype.Component;
@Component
@EnableBinding(Sink.class) // Sink - это интерфейс, предоставляющий Binding для входных сообщений
public class AtLeastOnceConsumer {
@StreamListener(Sink.INPUT)
public void handleMessage(Message<String> message, @Header(name = "kafka_offset") String offset) {
// Обработка сообщения
System.out.println("Received message: " + message.getPayload());
// После успешной обработки подтверждаем сообщение
// Spring Cloud Stream автоматически подтвердит сообщение после завершения метода
// благодаря ackMode=manual и настроенному acknowledgment
}
}
At most once
spring:
cloud:
stream:
bindings:
input:
destination: my-topic
group: my-consumer-group
content-type: application/json
consumer:
ackMode: batch # Автоматическое подтверждение после пакета сообщений
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;
@Component
@EnableBinding(Sink.class)
public class AtMostOnceConsumer {
@StreamListener(Sink.INPUT)
public void handleMessage(Message<String> message) {
// Обработка сообщения
System.out.println("Received message: " + message.getPayload());
// Смещение будет автоматически зафиксировано после получения сообщения
}
}
Mostly Once
Это гибридный режим, который стремится быть чем-то средним между At least once и At most once. Он предполагает, что сообщения будут доставлены обычно один раз, но иногда, в случае сбоев, может быть обработано больше одного раза. Для реализации такого режима в Spring Cloud Stream потребуется дополнительная логика, например, фильтрация дублированных сообщений или использование уникальных идентификаторов сообщений.
В рамках Spring Cloud Stream, можно обработать Mostly Once с использованием уникальных идентификаторов сообщений или кеширования состояния, чтобы отфильтровать повторно обработанные сообщения.
spring:
cloud:
stream:
bindings:
input:
destination: my-topic
group: my-consumer-group
content-type: application/json
consumer:
ackMode: manual # Ручное подтверждение
maxAttempts: 3 # Максимальное количество попыток
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.Message;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.stereotype.Component;
import java.util.HashSet;
import java.util.Set;
@Component
@EnableBinding(Sink.class)
public class MostlyOnceConsumer {
private Set<String> processedMessageIds = new HashSet<>();
@StreamListener(Sink.INPUT)
public void handleMessage(Message<String> message, @Header("messageId") String messageId) {
if (processedMessageIds.contains(messageId)) {
System.out.println("Duplicate message: " + messageId);
return; // Пропускаем дублированное сообщение
}
// Обработка сообщения
System.out.println("Received message: " + message.getPayload());
// Добавляем идентификатор в обработанные
processedMessageIds.add(messageId);
// После успешной обработки подтверждаем сообщение вручную
// Spring Cloud Stream подтвердит сообщение после выполнения метода
}
}
- Producer API
- Consumer API
- Streams API
- Connector API
Используется для публикации потока сообщений в топики Kafka. Он управляет партицированием сообщений, сжатием и балансировкой нагрузки между несколькими брокерами. Продюсер также отвечает за повторные неудачные попытки публикации и может быть настроен на различные уровни гарантий доставки.
Обеспечивает механизм для потребления сообщений топиков. Оно позволяет приложениям и микросервисам читать данные, поступающие в Kafka, и обрабатывать их для дальнейшего использования, будь то хранение, анализ или реактивная обработка.
Connector API в Apache Kafka является частью Kafka Connect, которая представляет собой инфраструктуру для интеграции внешних систем с Kafka. Connector API играет ключевую роль в упрощении процесса подключения различных источников данных и систем-приемников к Kafka, предоставляя возможность автоматического перемещения данных между ними.
Это компонент Apache Kafka, предназначенный для создания приложений и микросервисов, которые обрабатывают потоки данных в реальном времени. Его основная роль заключается в том, чтобы позволить разработчикам легко обрабатывать и анализировать данные, поступающие в виде непрерывных потоков из топиков. Kafka Streams API предоставляет высокоуровневый интерфейс для выполнения таких операций, как фильтрация, агрегация, объединение данных и вычисление оконных функций.
Kafka Transactions API позволяет выполнять атомарные обновления для нескольких топиков. Он включает exactly-once гарантию для приложений, которые читают данные из одного топика и пишут в другой. Это особенно полезно для приложений потоковой обработки, которым необходимо гарантировать, что каждое входное событие влияет на выходные данные ровно один раз, даже в случае сбоев.
Quota API позволяет настраивать квоты для каждого клиента для ограничения скорости создания или потребления данных, чтобы один клиент не потреблял слишком много ресурсов брокера. Это помогает обеспечить справедливое распределение ресурсов и предотвратить сценарии отказа в обслуживании.
AdminClient API предоставляет операции для управления топиками, брокерами, конфигурацией и другими объектами Kafka. Его можно использовать для создания, удаления и описания топиков, управления списками ACL, получения информации о кластере и программного выполнения других административных задач.
Координатор группы отвечает за управление группами потребителей. Он управляет членством в группах потребителей, назначает партиции потребителям внутри группы и управляет фиксацией смещения. Когда потребитель присоединяется к группе или покидает ее, координатор группы запускает перебалансировку для переназначения партиций среди оставшихся потребителей.
Consumer Heartbeat Thread отвечает за отправку периодических сигналов брокеру Kafka (в частности, координатору группы). Эти сигналы указывают на то, что потребитель жив и все еще является частью группы потребителей. Если потребитель не отправляет данные сигналы в течение настроенного периода, он считается неживым, и координатор группы инициирует перебалансировку для переназначения его партиций другим потребителям в группе.
Kafka поддерживает два основных способа обработки сообщений:
- Queue: каждое сообщение обрабатывается одним потребителем в группе потребителей. Это достигается за счет наличия в группе нескольких потребителей, каждый из которых считывает данные из отдельных партиций.
- Publish-Subscribe: все сообщения обрабатываются всеми потребителями. Это достигается за счет того, что каждый потребитель находится в своей собственной группе потребителей, что позволяет всем потребителям читать все сообщения.
Задержка (лаг) консюмера в Kafka относится к разнице между оффсетом последнего созданного сообщения и оффсетом последнего полученного сообщения. Kafka предоставляет инструменты и API для мониторинга задержек консюмеров такие, как инструмент командной строки Kafka Consumer Groups и API AdminClient. Высокая задержка консюмеров может указывать на проблемы с производительностью или недостаточную пропускную способность консюмеров. Kafka не обрабатывает задержки автоматически, но предоставляет информацию, необходимую приложениям для принятия решений о масштабировании или оптимизации производительности.
Метод subscribe() используется для подписки на один или несколько топиков. Фактически он не извлекает никаких данных. Метод poll(), с другой стороны, используется для извлечения данных из топиков. Он возвращает записи, которые были опубликованы с момента последнего запроса топиков и партиций. Метод poll() обычно вызывается в цикле для непрерывного получения данных.
Метод position() возвращает смещение следующей записи, которая будет извлечена для данной партиции. Это полезно для отслеживания хода получения данных и может использоваться в сочетании с методом committed(), чтобы определить насколько сильно потребитель отстал от своего последнего комита оффсета. Эта информация может быть ценной для мониторинга и управления показателями потребителей.
Эти методы используются для фиксации смещений:
- commitSync(): синхронно фиксирует последнее смещение, возвращенное poll(). Он будет повторять попытку до тех пор, пока не завершится успешно или не столкнется с непроверяемой ошибкой.
- commitAsync(): асинхронно фиксирует смещения. Он не повторяет попытку при сбое, что делает его более быстрым, но менее надежным, чем commitSync(). Выбор между этими методами зависит от баланса между производительностью и надежностью, требуемого приложением.
Идемпотентный продюсер гарантирует exactly-once гарантию доставки, предотвращая дублирование записей в Kafka в случае повторных попыток отправки сообщений. Это важно для поддержания целостности данных и правильности их обработки в системе, особенно в распределенных системах, где могут возникать ошибки связи или сбои.
Интерфейс Partitioner в Producer API определяет в какую партицию топика будет отправлено сообщение. Partitioner по-умолчанию использует хэш ключа (если он присутствует) для выбора партиции, гарантируя, что сообщения с одним и тем же ключом всегда отправляются в одну и ту же партицию. Могут быть реализованы пользовательские Partitioner для управления распределением сообщений по партициям на основе определенной бизнес-логики или характеристик данных.
Поток очистки журнала в Kafka отвечает за выполнение сжатия журнала. Сжатие журнала - это механизм, при котором Kafka удаляет избыточные записи, сохраняя только последнее значение для каждого ключа. Это полезно в тех случаях, когда требуется только последнее обновление для данного ключа, например, для обслуживания changelog или состояния БД. Программа очистки журналов периодически запускается для сжатия соответствующих партиций.
Это инструмент, позволяющий реплицировать данные между кластерами Kafka, потенциально находящихся в разных дата-центрах. Он работает, потребляя данные из одного кластера и передавая в другой. Можно использовать для создания резервной копии данных, объединения данных из нескольких дата-центров в единое хранилище или для переноса данных между кластерами.
Kafka Schema Registry предоставляет RESTful интерфейс для хранения и извлечения схем Avro. Schema Registry используется совместно с Kafka для обеспечения совместимости схем данных между производителями и потребителями. Это особенно полезно при разработке моделей данных с течением времени, сохраняя обратную и прямую совместимость.
Kafka Streams DSL предоставляет высокоуровневый API для операций потоковой обработки. Он позволяет разработчикам описывать сложную логику обработки, такую как фильтрация, преобразование, агрегирование и объединение потоков данных. DSL абстрагирует многие низкоуровневые детали потоковой обработки, упрощая создание и обслуживание приложений потоковой обработки.
Сама по себе Kafka не обеспечивает версионирование сообщений напрямую, но предоставляет механизмы, позволяющие реализовывать управление версиями. Одним из распространенных подходов является включение поля версии в схему сообщения. Для более сложных задач управления версиями используются реестры схем (например, Confluent Schema Registry), которые могут управлять изменением схемы и совместимостью.
Kafka использует pull-модель для извлечения сообщений. Потребители запрашивают сообщения у брокеров, а не брокеры отправляют сообщения потребителям. Это позволяет потребителям контролировать скорость, с которой они получают сообщения. Потребители отправляют запросы на получение данных от брокера, указывая топик, партицию и начальное смещение для каждой партиции. Брокер отвечает сообщениями с объемом до указанного максимального предела в байтах.
Kafka Streams и Apache Flink — это два мощных инструмента для обработки потоков данных в режиме реального времени, но они различаются по архитектуре, возможностям и сценариям применения.
Критерий | Kafka Streams | Apache Flink |
---|---|---|
Архитектура | Встроенная библиотека, работающая внутри приложения. Зависит от Kafka. | Независимая распределенная система потоковой обработки данных с возможностью интеграции с различными источниками и приемниками данных. |
Обработка данных | Обрабатывает потоки событий непосредственно из Kafka. Подходит для обработки событий и транзакционных данных с минимальной задержкой. | Поддерживает как потоковую (streaming), так и пакетную (batch) обработку данных. Специализируется на сложной обработке событий с гибкими возможностями управления состоянием. |
Зависимость от Kafka | Построена исключительно вокруг Kafka. Требует Kafka для получения и отправки данных. | Работает с широким спектром источников данных (Kafka, HDFS, базы данных и т. д.). Kafka — лишь один из многих источников. |
Установка | Легко интегрируется в существующее Java/Scala-приложение как библиотека. Не требует развертывания кластеров. | Требует отдельного кластера для выполнения, что подходит для высокопроизводительных распределенных систем. |
Управление состоянием | Встроенное состояние с использованием RocksDB, также поддержка репликации состояния. | Имеет развитую систему управления состоянием, поддерживает сложные функции восстановления состояния и обработки данных. |
Гарантия доставки | Поддерживает "at-least-once" и "exactly-once" семантику, когда Kafka настроена соответствующим образом. | Имеет гибкие гарантии доставки: поддержка "exactly-once", "at-least-once" и "at-most-once". |
Масштабируемость | Масштабируется автоматически вместе с Kafka-партициями. Каждая инстанция потребителя Kafka обрабатывает свою партицию. | Поддерживает масштабирование на уровне задач (task), с более гибкой моделью масштабирования и управления ресурсами. |
Обработка событий | Подходит для обработки событий с низкой задержкой и транзакционными требованиями. | Специализируется на сложной обработке событий, таких как windowing, агрегирование и работа с изменяющимся состоянием. Поддерживает сложные аналитические операции. |
Инструменты и API | Легковесная библиотека с простыми API для работы с потоками данных. Основные операции — фильтрация, маппинг, объединение потоков, windowing. | Продвинутая система с богатыми API для сложных вычислений, поддерживающая потоковую и пакетную обработку, обработку событий и контроль сложных бизнес-процессов. |
Требования к ресурсам | Менее ресурсоемка, так как не требует отдельного кластера. Работает в рамках JVM-приложения. | Требует более высоких вычислительных ресурсов, так как выполняется на отдельном кластере и поддерживает высокую степень параллелизма. |
- Если вы уже используете Kafka и вам нужна легковесная библиотека для обработки данных непосредственно внутри вашего приложения.
- Для сценариев с низкой задержкой, где данные приходят из Kafka и должны быть быстро обработаны с минимальными накладными расходами.
- Если вам нужно встроить обработку потоков данных в существующую Java/Scala программу без необходимости развертывания отдельных кластеров.
- Если вы работаете с потоковой и пакетной обработкой данных, где источники и приемники могут быть не только Kafka, но и другие системы (например, HDFS, базы данных).
- Для сложных задач обработки событий, требующих управления состоянием, временных окон, аналитики и восстановления после сбоев.
- Если ваш проект требует высокой производительности, гибкости, точных гарантий доставки и распределенной обработки в кластере.
- Kafka Streams — это идеальный выбор, если ваша инфраструктура уже основана на Kafka, и вам нужна быстрая и легковесная обработка потоков данных.
- Apache Flink — это мощный инструмент для сложных аналитических задач, потоковой обработки данных в режиме реального времени с поддержкой сложных схем обработки, который предоставляет больше возможностей для работы с разнообразными источниками данных.
Kafka Consumer - это клиент, который читает данные из топика и производит некоторую обработку. Обычно используется для простых сценариев получения данных. Kafka Stream, с другой стороны, более подвинутый клиент, который может потреблять, обрабатывать и класть данные обратно в Kafka. Он предоставляет DSL для сложных операций потоковой обработки, таких как фильтрация, преобразование, агрегирование и объединение потоков.
Apache Kafka и Apache Flume — это два популярных инструмента для обработки и передачи данных, однако они имеют разные цели и архитектуры. Вот основные различия между ними:
- Kafka: Это распределенная платформа для потоковой передачи данных, которая обеспечивает высокую пропускную способность и низкую задержку для обработки больших объемов данных. Kafka используется для создания стриминговых приложений и обработки данных в реальном времени. Она может быть использована для передачи логов, событий, метрик и других данных, требующих высокой доступности и масштабируемости.
- Flume: Это распределенная система для сбора, агрегации и передачи логов и событий. Flume обычно используется для доставки логов с серверов в HDFS, HBase или другие системы хранения данных. Его основное предназначение — это сбор данных из различных источников (например, лог-файлов) и передача их в системы хранения или аналитики.
- Kafka: В Kafka данные отправляются в топики и партиции, которые могут быть независимо прочитаны несколькими потребителями. Kafka ориентирована на высокую пропускную способность и масштабируемость. Это решает задачу обработки потоковых данных и событий в реальном времени.
- Flume: Flume состоит из источников (sources), каналов (channels) и приемников (sinks). Источник получает данные, канал их буферизует, а приемник отправляет их в конечную систему. Flume использует систему "event-based" и часто применяется для сбора логов.
- Kafka: Kafka сохраняет сообщения на диске в течение длительного времени (по умолчанию — до 7 дней) в топиках. Потребители могут читать данные в любой момент времени, и Kafka поддерживает концепцию сохранения и ретрансляции данных.
- Flume: Flume не имеет встроенного механизма долговременного хранения. Он просто передает данные в назначенные места хранения (например, HDFS). Данные в Flume не сохраняются долго, и если система хранения не доступна, они теряются.
- Kafka: Kafka предназначен для работы с высокими объемами данных. Он поддерживает масштабируемость как по производителям, так и по потребителям, и может обрабатывать миллионы сообщений в секунду с минимальной задержкой.
- Flume: Flume может быть менее масштабируемым по сравнению с Kafka и больше ориентирован на сбор логов и событий с различных источников. Хотя Flume тоже может обрабатывать большие объемы данных, он не предназначен для работы с такими большими потоками, как Kafka.
- Kafka: Используется для стриминга данных, аналитики в реальном времени, интеграции различных систем, работы с большими данными и построения событийных приложений.
- Flume: Используется для сбора, агрегации и передачи логов и событий в системы хранения, такие как HDFS, HBase, или внешние системы. Это идеальный выбор для организации потоков логирования и мониторинга.
- Kafka: Kafka поддерживает широкий спектр интеграций и может быть использован с различными системами для построения распределенных приложений и аналитических решений.
- Flume: Flume ориентирован на интеграцию с Hadoop-экосистемой, и основное его использование — это интеграция с HDFS, HBase и другими хранилищами данных в этой экосистеме.
- Kafka: Kafka поддерживает много потребителей, которые могут читать из одного и того же топика независимо, а также возможность повторного прочтения данных.
- Flume: Flume имеет фиксированную схему доставки данных и не поддерживает такую гибкость, как Kafka в части потребителей и обработки.
- Kafka: Kafka поддерживает гарантии доставки с различными уровнями подтверждения (acknowledgment), а также может обеспечивать доставку сообщений точно один раз (exactly-once semantics).
- Flume: Flume обеспечивает базовые гарантии доставки, но они менее строгие, чем у Kafka, и больше ориентированы на устойчивость к сбоям, а не на гарантированную доставку.
RabbitMQ и Apache Kafka — это две популярные системы обмена сообщениями, каждая из которых имеет свои особенности и используется для разных типов приложений. Вот основные различия между ними:
- RabbitMQ использует очереди сообщений. Сообщения отправляются в очередь, и один потребитель извлекает сообщение из очереди для обработки.
- Apache Kafka использует топики и партиции. Сообщения отправляются в топики, которые могут быть разделены на партиции, и несколько потребителей могут читать эти сообщения в любом порядке. Kafka ориентирован на большие потоки данных и масштабируемость.
- RabbitMQ: Сообщения передаются в очереди, и каждый потребитель получает одно сообщение. Сообщения могут быть подтверждены (acknowledged) или отклонены (rejected). RabbitMQ гарантирует, что сообщение будет доставлено хотя бы одному потребителю.
- Kafka: Сообщения сохраняются в топиках на длительный срок, и потребители могут читать их в любом порядке. Kafka гарантирует доставку сообщений всем потребителям, если они подписаны на топик, и может позволить многократное чтение старых сообщений.
- RabbitMQ: Предоставляет подтверждения доставки и может повторно отправить сообщение, если потребитель не подтвердил его получение. Можно настроить разные уровни надежности (например, за счет использования подтверждений или транзакций).
- Kafka: Сообщения сохраняются на диске, что позволяет потребителям считывать их в любое время. Kafka гарантирует доставку сообщений при определенной конфигурации репликации и сохранения.
- RabbitMQ: Лучше подходит для небольших и средних систем, где требуется высокая надежность и гарантированная доставка. Он поддерживает горизонтальное масштабирование, но требует дополнительных усилий для настройки и управления.
- Kafka: Отличается высокой производительностью и возможностью обработки больших объемов данных. Kafka легко масштабируется за счет партиционирования и репликации данных.
- RabbitMQ: Один потребитель получает одно сообщение. Если потребитель не успевает обработать сообщение, оно может быть повторно отправлено.
- Kafka: Потребители могут читать сообщения независимо друг от друга. Kafka сохраняет все сообщения в топиках, и потребители могут читать их в любое время. Kafka также поддерживает концепцию групп потребителей, где каждый потребитель группы обрабатывает разные партиции.
- RabbitMQ: Идеален для обработки запросов и ответов, распределенных приложений, микросервисов с гарантией доставки, бизнес-процессов с очередями задач.
- Kafka: Используется для обработки потоков данных, интеграции с большими данными, записи журналов, мониторинга, обработки событий в реальном времени и сохранения больших объемов данных для последующего анализа.
- RabbitMQ: Один производитель отправляет сообщения в очередь, и несколько потребителей могут обрабатывать эти сообщения.
- Kafka: Множество производителей могут отправлять сообщения в топики, и несколько потребителей могут читать их одновременно, поддерживая масштабируемость.
- RabbitMQ: Сообщения удаляются из очереди после их обработки потребителем. Хранение сообщений обычно краткосрочное.
- Kafka: Сообщения сохраняются на диске в топиках до тех пор, пока не истечет срок хранения (по конфигурации). Это позволяет повторно читать данные.