Kafka

Configuration Options

Required Options

bootstrap_servers (required)

A comma-separated list of host:port pairs giving the addresses of the Kafka brokers in the "bootstrap" Kafka cluster. A Kafka client connects to these brokers first in order to bootstrap itself.

Type: string
Syntax: literal
Example: ["10.14.22.123:9092,10.14.23.332:9092"]

group_id (required)

The consumer group name to be used to consume events from Kafka.

Type: string
Syntax: literal
Example: ["consumer-group-name"]

topics (required)

The Kafka topic names to read events from. A topic name that begins with ^ is treated as a regular expression.

Type: array
Syntax: literal
Example: ["^(prefix1|prefix2)-.+","topic-1","topic-2"]

type (required)

The component type. This is a required field for all components and tells Vector which component to use.

Type: string
Syntax: literal
Example: ["kafka"]
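
The four required options can be combined into a minimal source definition. The fragment below is a sketch that assumes a TOML configuration file and Vector's [sources.<id>] table layout. The component ID my_kafka_source and the broker addresses are placeholders chosen for illustration; the remaining values are taken from the examples above.

```toml
# "my_kafka_source" is a hypothetical component ID; pick any name you like.
[sources.my_kafka_source]
type = "kafka"                                             # selects the kafka component
bootstrap_servers = "10.14.22.123:9092,10.14.22.124:9092"  # placeholder broker addresses
group_id = "consumer-group-name"                           # consumer group to join
# The first entry begins with ^, so it is treated as a regular expression.
topics = ["^(prefix1|prefix2)-.+", "topic-1", "topic-2"]
```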

Advanced Options

acknowledgements (optional)

Controls whether the source waits for destination sinks to deliver the events before acknowledging receipt.

Type: bool

auto_offset_reset (optional)

If offsets for the consumer group do not exist, set them using this strategy. See the librdkafka documentation for the auto.offset.reset option for further clarification.

Type: string
Syntax: literal
Default: largest
Example: ["smallest","earliest","beginning","largest","latest","end","error"]

commit_interval_ms (optional)

How often, in milliseconds, the consumer offsets are committed (written) to offset storage.

Type: uint
Default: 5000
Example: [5000,10000]

fetch_wait_max_ms (optional)

Maximum time, in milliseconds, that the broker may wait to fill the response.

Type: uint
Default: 100
Example: [50,100]

key_field (optional)

The log field name to use for the Kafka message key.

Type: string
Syntax: literal
Default: message_key
Example: ["message_key"]

topic_key (optional)

The log field name to use for the Kafka topic.

Type: string
Syntax: literal
Default: topic
Example: ["topic"]

partition_key (optional)

The log field name to use for the Kafka partition name.

Type: string
Syntax: literal
Default: partition
Example: ["partition"]

offset_key (optional)

The log field name to use for the Kafka offset.

Type: string
Syntax: literal
Default: offset
Example: ["offset"]

headers_key (optional)

The log field name to use for the Kafka headers.

Type: string
Syntax: literal
Example: ["headers"]

librdkafka_options (optional)

Advanced options passed to librdkafka. See the librdkafka documentation for details.

Type: hash
Example: [{"client.id":"${ENV_VAR}","fetch.error.backoff.ms":"1000","socket.send.buffer.bytes":"100"}]
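
Because librdkafka option names contain dots, they need to be quoted when written as keys. The fragment below is a sketch of the example hash above in TOML form, assuming a TOML configuration file and a hypothetical component ID of my_kafka_source.

```toml
# "my_kafka_source" is a hypothetical component ID.
# Keys are quoted so that TOML does not read the dots as nested tables.
[sources.my_kafka_source.librdkafka_options]
"client.id" = "${ENV_VAR}"           # value taken from an environment variable
"fetch.error.backoff.ms" = "1000"    # values are given as strings, as in the example
"socket.send.buffer.bytes" = "100"
```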

sasl (optional)

Options for SASL/SCRAM authentication support.

Type: hash
Syntax: literal
Example: []

session_timeout_ms (optional)

The Kafka session timeout in milliseconds.

Type: uint
Default: 10000
Example: [5000,10000]

socket_timeout_ms (optional)

Default timeout, in milliseconds, for network requests.

Type: uint
Default: 60000
Example: [30000,60000]

tls (optional)

Configures the TLS options for connections to the Kafka brokers.

Type: hash
Syntax: literal
Example: []
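
As an illustration of how the offset and timing options fit together, the fragment below sets several of them explicitly. It is a sketch that assumes a TOML configuration file and a hypothetical component ID of my_kafka_source; the required options are omitted for brevity, and every value is one of the documented defaults or examples above.

```toml
# "my_kafka_source" is a hypothetical component ID; required options omitted.
[sources.my_kafka_source]
auto_offset_reset = "earliest"   # strategy used when the group has no stored offsets
commit_interval_ms = 5000        # commit offsets every 5 seconds (the default)
fetch_wait_max_ms = 100          # broker may wait up to 100 ms to fill a response
session_timeout_ms = 10000       # Kafka session timeout (the default)
socket_timeout_ms = 60000        # timeout for network requests (the default)
```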

How it Works

librdkafka

The kafka source uses librdkafka under the hood. This is a battle-tested, high-performance, and reliable library that facilitates communication with Kafka. As Vector produces static MUSL builds, this dependency is packaged with Vector, meaning you do not need to install it.

State

This component is stateless, meaning its behavior is consistent across each input.

Transport Layer Security (TLS)

Vector uses OpenSSL for TLS protocols. You can adjust TLS behavior via the tls.* options.
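
A TLS block might look like the sketch below. The sub-option names enabled and ca_file are not listed on this page and are assumptions based on Vector's general TLS settings, so check them against the TLS reference for your Vector version. The component ID and the file path are placeholders.

```toml
# "my_kafka_source" is a hypothetical component ID.
# "enabled" and "ca_file" are assumed sub-option names, not taken from this page.
[sources.my_kafka_source.tls]
enabled = true                 # turn TLS on for this source
ca_file = "/path/to/ca.crt"    # placeholder path to a CA certificate
```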

Context

By default, the kafka source augments events with helpful context keys.
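
The names of these context keys correspond to the key_field, topic_key, partition_key, offset_key, and headers_key options described above. The fragment below is a sketch that spells each one out, assuming a TOML configuration file and a hypothetical component ID of my_kafka_source; the values shown are the documented defaults, or the documented example where no default is listed.

```toml
# "my_kafka_source" is a hypothetical component ID; required options omitted.
[sources.my_kafka_source]
key_field = "message_key"      # log field that receives the Kafka message key
topic_key = "topic"            # log field that receives the topic name
partition_key = "partition"    # log field that receives the partition
offset_key = "offset"          # log field that receives the offset
headers_key = "headers"        # log field that receives the message headers (example value)
```

Changing any of these values renames the corresponding field on each event, which can help avoid collisions with fields already present in the message payload.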