Kafka
Configuration Options
Required Options
bootstrap_servers(required)
A comma-separated list of host and port pairs for the brokers in a "bootstrap" Kafka cluster. A Kafka client connects to these addresses initially to bootstrap itself and discover the rest of the cluster.
Type | Syntax | Default | Example |
---|---|---|---|
string | literal | required | `"10.14.22.123:9092,10.14.23.332:9092"` |
inputs(required)
A list of upstream source or transform IDs. Wildcards (`*`) are supported. See configuration for more info.

Type | Syntax | Default | Example |
---|---|---|---|
array | literal | required | `["my-source-or-transform-id", "prefix-*"]` |
encoding(required)
Configures the encoding specific sink behavior.
Type | Syntax | Default | Example |
---|---|---|---|
hash | literal | required | |
topic(required)
The Kafka topic name to write events to.
Type | Syntax | Default | Example |
---|---|---|---|
string | template | required | `"topic-1234"`, `"logs-{{unit}}-%Y-%m-%d"` |
type(required)
The component type. This is a required field for all components and tells Vector which component to use.
Type | Syntax | Default | Example |
---|---|---|---|
string | literal | required | `"kafka"` |
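Putting the required options together, a minimal sink configuration might look like the following sketch. The component ID `my_kafka_sink`, the input ID, the broker addresses, and the `encoding.codec` value are all illustrative:

```toml
# Minimal kafka sink configuration (illustrative values).
[sinks.my_kafka_sink]
type = "kafka"
inputs = ["my-source-or-transform-id"]
bootstrap_servers = "10.14.22.123:9092,10.14.23.332:9092"
topic = "topic-1234"

# The required encoding hash configures how events are serialized;
# "json" is one commonly used codec.
encoding.codec = "json"
```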
Advanced Options
key_field(optional)
The log field name or tags key to use for the topic key. If the field does not exist in the log or in tags, a blank value will be used. If unspecified, the key is not sent. Kafka uses a hash of the key to choose the partition or uses round-robin if the record has no key.
Type | Syntax | Default | Example |
---|---|---|---|
string | literal | none | `"user_id"` |
librdkafka_options(optional)
Advanced options. See librdkafka documentation for details.
Type | Syntax | Default | Example |
---|---|---|---|
hash | literal | none | `{"client.id": "${ENV_VAR}", "fetch.error.backoff.ms": "1000", "socket.send.buffer.bytes": "100"}` |
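As a sketch, `key_field` and `librdkafka_options` could be combined as below. The component ID is illustrative, and the option values are simply the examples from the tables above, not recommendations:

```toml
[sinks.my_kafka_sink]
# ...required options omitted...
# Use the log field "user_id" as the Kafka message key.
key_field = "user_id"

# Options passed straight through to librdkafka.
[sinks.my_kafka_sink.librdkafka_options]
"client.id" = "${ENV_VAR}"
"fetch.error.backoff.ms" = "1000"
"socket.send.buffer.bytes" = "100"
```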
message_timeout_ms(optional)
Local message timeout.
Type | Syntax | Default | Example |
---|---|---|---|
uint | literal | 300000 | 150000, 450000 |
sasl(optional)
Options for SASL/SCRAM authentication support.
Type | Syntax | Default | Example |
---|---|---|---|
hash | literal | none | |
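A hedged sketch of SASL/SCRAM settings follows; the option names (`enabled`, `username`, `password`, `mechanism`) and all values are assumptions, so check the reference for your Vector version before relying on them:

```toml
# SASL/SCRAM authentication (option names and values are illustrative).
[sinks.my_kafka_sink.sasl]
enabled = true
username = "vector-producer"
password = "${KAFKA_PASSWORD}"   # illustrative environment variable
mechanism = "SCRAM-SHA-256"
```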
socket_timeout_ms(optional)
Default timeout for network requests.
Type | Syntax | Default | Example |
---|---|---|---|
uint | literal | 60000 | 30000, 60000 |
buffer(optional)
Configures the sink specific buffer behavior.
Type | Syntax | Default | Example |
---|---|---|---|
hash | literal | none | |
batch(optional)
Configures the sink batching behavior.
Type | Syntax | Default | Example |
---|---|---|---|
hash | literal | none | |
compression(optional)
The compression strategy used to compress the encoded event data before transmission.
Some cloud storage API clients and browsers handle decompression transparently, so files may not always appear to be compressed depending on how they are accessed.
Type | Syntax | Default | Example |
---|---|---|---|
string | literal | `"none"` | |
healthcheck(optional)
Health check options for the sink.
Type | Syntax | Default | Example |
---|---|---|---|
hash | literal | none | |
tls(optional)
Configures the TLS options for the sink's connections to the Kafka brokers.
Type | Syntax | Default | Example |
---|---|---|---|
hash | literal | none | |
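A minimal TLS sketch is shown below; the certificate option names (`ca_file`, `crt_file`, `key_file`) are assumptions based on common Vector conventions, and the paths are illustrative:

```toml
# TLS for connections to the Kafka brokers (paths are illustrative).
[sinks.my_kafka_sink.tls]
enabled = true
ca_file = "/etc/vector/certs/ca.pem"
crt_file = "/etc/vector/certs/client.pem"
key_file = "/etc/vector/certs/client.key"
```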
How it Works
librdkafka
The `kafka` sink uses `librdkafka` under the hood. This is a battle-tested, high-performance, and reliable library that facilitates communication with Kafka. Because Vector produces static MUSL builds, this dependency is packaged with Vector, meaning you do not need to install it.
State
This component is stateless, meaning its behavior is consistent across each input.
Health checks
Health checks ensure that the downstream service is accessible and ready to accept data. This check is performed upon sink initialization. If the health check fails, an error is logged and Vector proceeds to start.
Transport Layer Security (TLS)
TLS for the sink's connections to the Kafka brokers is configured via the `tls.*` options described above.
Buffers and batches
This component buffers and batches data. Vector does not treat buffering and batching as global concepts; it treats them as sink-specific concepts. This isolates sinks, ensuring that service disruptions are contained and delivery guarantees are honored.
Batches are flushed when one of two conditions is met:

- The batch age meets or exceeds the configured `timeout_secs`.
- The batch size meets or exceeds the configured `max_size` or `max_events`.

Buffers are controlled via the `buffer.*` options.
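For example, the two flush conditions and the buffer could be tuned as in this sketch; the component ID and all values are illustrative, and the buffer option names (`type`, `max_events`, `when_full`) are assumptions based on Vector's common sink buffer schema:

```toml
# Flush a batch after 1000 events or 1 second, whichever comes first.
[sinks.my_kafka_sink.batch]
max_events = 1000
timeout_secs = 1

# Buffer up to 500 events in memory; block upstream when full.
[sinks.my_kafka_sink.buffer]
type = "memory"
max_events = 500
when_full = "block"
```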