fastify-kafkajs is a Fastify plugin which adds support for KafkaJS - a modern Apache Kafka client written in pure JavaScript, without librdkafka
bindings.
- Decorates the Fastify instance with
kafka
object exposing aclient
(KafkaJS client instance),producer
(a single, pre-connected KafkaJS producer) andconsumers
(array of consumers declared in plugin options). - Disconnects the producer and consumers when the Fastify
onClose
hook is triggered. - Has 100% test coverage.
npm install fastify-kafkajs
const app = fastify();
app.register(fastifyKafkaJS, {
clientConfig: {
brokers: ['localhost:9092'],
clientId: 'demo-app'
},
consumers: [
{
consumerConfig: {
groupId: 'example-consumer-group'
},
subscription: {
topics: ['test-topic'],
fromBeginning: false
},
runConfig: {
eachMessage: async ({ message }) => {
console.log(`Consumed message: ${message.value}`);
}
}
}
]
});
app.post('/produce', async (request, reply) => {
return app.kafka.producer.send({
topic: 'test-topic',
messages: [{ key: 'key1', value: randomUUID() }]
});
});
A complete example is available in the example
folder.
By default, fastify-kafkajs
disconnects the producer and consumers when the application's onClose
hook is triggered. You can opt-out of this default behavior by setting the ignoreOnClose
option to false
.
In order to ensure that the onClose
hook is triggered when the process receives SIGINT
or SIGTERM
(thus allowing fastify-kafkajs
to disconnect the producer and consumers and preventing delayed consumer group re-balancing), usage of fastify-graceful-shutdown plugin is recommended.
npm install fastify-graceful-shutdown
import fastifyGracefulShutdown from 'fastify-graceful-shutdown';
fastify.register(fastifyGracefulShutdown);
The config object passed as a second parameter passed to register() is optional (since 3 out of 4 keys have default values, and a KafkaJS producer can be initialized with default values when provided with empty config) and has the following schema:
interface FastifyKafkaJSOptions {
/**
* KafkaJS client config
* @default {
* brokers: ['localhost:9092'],
* clientId: 'fastify-kafkajs'
* }
*/
clientConfig?: KafkaConfig;
/**
* KafkaJS producer config
*/
producerConfig?: ProducerConfig;
/**
* Array of objects describing consumers
* @default []
*/
consumers?: FastifyKafkaJSConsumerDeclaration[];
/**
* Ignore the default onClose handled which closes the producer
* and all consumers. If set to true, you will have to manage
* closing the producer and the consumers yourself.
* @default false
*/
ignoreOnClose?: boolean;
}
where FastifyKafkaJSConsumerDeclaration
has the following keys:
consumerConfig
- Consumer config passed toKafkaJS.consumer()
during consumer creationsubscription
- parameter passed toConsumer.subscribe()
runConfig
- config object passed toConsumer.run()
Upon being registered, fastify-kafkajs decorates the FastifyInstance
with kafka
exposing the following keys:
client
- a KafkaJS client instanceproducer
- a single, pre-connected KafkaJS producer (initialized with config specified inclientConfig
options key)consumers
- array of pre-connected, subscribed and started KafkaJS consumers, declared insubscribers
options key