Kafka

Configuration Options

Required Options

bootstrap_servers (required)

A comma-separated list of host:port pairs giving the addresses of the Kafka brokers in the "bootstrap" cluster. A Kafka client connects to these brokers first in order to bootstrap itself.

Type    Syntax   Default  Example
string  literal  -        ["10.14.22.123:9092,10.14.23.332:9092"]

inputs (required)

A list of upstream source or transform IDs. Wildcards (*) are supported.

See configuration for more info.

Type    Syntax   Default  Example
array   literal  -        ["my-source-or-transform-id","prefix-*"]

encoding (required)

Configures the encoding-specific sink behavior.

Type    Syntax   Default  Example
hash    literal  -        []

topic (required)

The Kafka topic name to write events to.

Type    Syntax   Default  Example
string  literal  -        ["topic-1234","logs-{{unit}}-%Y-%m-%d"]

type (required)

The component type. This is a required field for all components and tells Vector which component to use.

Type    Syntax   Default  Example
string  literal  -        ["kafka"]
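Taken together, the required options form a minimal sink definition. The sketch below assumes Vector's TOML configuration format. The sink ID my_kafka_sink is hypothetical, the values are the examples listed above, and encoding.codec is an assumed sub-option that this section does not describe.

```toml
# Minimal kafka sink sketch; `my_kafka_sink` is a hypothetical sink ID.
[sinks.my_kafka_sink]
type = "kafka"
inputs = ["my-source-or-transform-id", "prefix-*"]
bootstrap_servers = "10.14.22.123:9092,10.14.23.332:9092"
topic = "topic-1234"

# Assumption: the encoding hash accepts a `codec` key such as "json".
encoding.codec = "json"
```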

Advanced Options

key_field (optional)

The log field name or tag key whose value is used as the Kafka record key. If the field does not exist in the log or in tags, a blank value is used. If key_field is unspecified, no key is sent. Kafka chooses the partition from a hash of the key, or uses round-robin if the record has no key.

Type    Syntax   Default  Example
string  literal  -        ["user_id"]
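As an illustration, the fragment below keys records by the user_id field from the example above. It assumes TOML configuration and a hypothetical sink ID, my_kafka_sink, with the required options omitted for brevity.

```toml
[sinks.my_kafka_sink]
# Use each event's `user_id` value as the record key. Kafka hashes the key
# to choose the partition, so events sharing a key land on the same one.
key_field = "user_id"
```

Leaving key_field out entirely sends records without a key, in which case Kafka falls back to round-robin partitioning.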

librdkafka_options (optional)

Advanced options. See librdkafka documentation for details.

Type    Syntax   Default  Example
hash    -        -        [{"client.id":"${ENV_VAR}","fetch.error.backoff.ms":"1000","socket.send.buffer.bytes":"100"}]
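The example hash above might be written as follows in a TOML configuration file. This is a sketch: my_kafka_sink is a hypothetical sink ID, and the keys and values are copied from the example rather than recommended settings.

```toml
[sinks.my_kafka_sink.librdkafka_options]
# Keys contain dots, so they must be quoted in TOML.
# Values are given as strings, as in the example above.
"client.id" = "${ENV_VAR}"
"fetch.error.backoff.ms" = "1000"
"socket.send.buffer.bytes" = "100"
```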

message_timeout_ms (optional)

Local message timeout, in milliseconds. This limits how long a produced message may wait for successful delivery.

Type    Syntax   Default  Example
uint    -        300000   [150000,450000]

sasl (optional)

Options for SASL/SCRAM authentication support.

Type    Syntax   Default  Example
hash    literal  -        []
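This section does not list the keys of the sasl hash. The sketch below assumes sub-options named enabled, mechanism, username, and password. Treat those names, the hypothetical sink ID my_kafka_sink, and the environment variable names as assumptions to check against the reference for your Vector version.

```toml
[sinks.my_kafka_sink.sasl]
# Assumed sub-option names; not confirmed by this section.
enabled = true
mechanism = "SCRAM-SHA-256"
# Hypothetical environment variables, so credentials stay out of the file.
username = "${KAFKA_USERNAME}"
password = "${KAFKA_PASSWORD}"
```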

socket_timeout_ms (optional)

Default timeout for network requests, in milliseconds.

Type    Syntax   Default  Example
uint    -        60000    [30000,60000]
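Both timeout options are plain integers in milliseconds. A sketch using the example values from the tables, with a hypothetical sink ID and the required options omitted:

```toml
[sinks.my_kafka_sink]
message_timeout_ms = 150000  # default: 300000 (5 minutes)
socket_timeout_ms = 30000    # default: 60000 (1 minute)
```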

buffer (optional)

Configures the sink-specific buffer behavior.

Type    Syntax   Default  Example
hash    literal  -        []

batch (optional)

Configures the sink batching behavior.

Type    Syntax   Default  Example
hash    -        -        []

compression (optional)

The compression strategy used to compress the encoded event data before transmission.

Some cloud storage API clients and browsers handle decompression transparently, so files may not always appear to be compressed, depending on how they are accessed.

Type    Syntax   Default  Example
string  literal  none     -

healthcheck (optional)

Health check options for the sink.

Type    Syntax   Default  Example
hash    -        -        []

tls (optional)

Configures the TLS options for outgoing connections.

Type    Syntax   Default  Example
hash    literal  -        []

How it Works

librdkafka

The kafka sink uses librdkafka under the hood, a battle-tested, high-performance, and reliable library for communicating with Kafka. Because Vector produces static MUSL builds, this dependency is packaged with Vector, so you do not need to install it.

State

This component is stateless, meaning its behavior is consistent across each input.

Health checks

Health checks ensure that the downstream service is accessible and ready to accept data. The check is performed when the sink initializes. If the health check fails, an error is logged and Vector still proceeds to start.
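If the startup check is not wanted, for example when the brokers are expected to be unreachable at boot, it can presumably be switched off through the healthcheck hash. The sketch below assumes a sub-option named enabled, which this section does not confirm, and a hypothetical sink ID.

```toml
[sinks.my_kafka_sink.healthcheck]
# Assumed sub-option name; skips the check at sink initialization.
enabled = false
```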

Transport Layer Security (TLS)

Vector uses OpenSSL for TLS protocols due to OpenSSL's maturity. You can enable and adjust TLS behavior using the tls.* options.
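The individual tls.* keys are not listed in this section. As a sketch, assuming sub-options named enabled, ca_file, crt_file, and key_file, a TLS-enabled sink might look like the following. The sink ID and all file paths are hypothetical.

```toml
[sinks.my_kafka_sink.tls]
# Assumed sub-option names; paths are placeholders.
enabled = true
ca_file = "/path/to/ca.crt"       # CA used to verify the brokers
crt_file = "/path/to/client.crt"  # client certificate
key_file = "/path/to/client.key"  # client private key
```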

Buffers and batches

This component buffers and batches data as shown in the diagram above. Vector treats buffers and batches as sink-specific concepts rather than global ones. This isolates sinks, ensuring that service disruptions are contained and delivery guarantees are honored.

Batches are flushed when either of two conditions is met:

  1. The batch age meets or exceeds the configured timeout_secs.
  2. The batch size meets or exceeds the configured max_size or max_events.

Buffers are controlled via the buffer.* options.
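The flush conditions above map onto keys of the batch hash. The sketch below uses timeout_secs and max_events as named in the list. The values are illustrative only, the sink ID is hypothetical, and the buffer sub-options shown (type and max_events) are assumptions that this section does not name.

```toml
[sinks.my_kafka_sink.batch]
timeout_secs = 1    # flush once the batch is at least this old, in seconds
max_events = 1000   # or once it holds at least this many events

[sinks.my_kafka_sink.buffer]
# Assumed sub-option names; not confirmed by this section.
type = "memory"
max_events = 500
```

Whichever batch condition is met first triggers the flush, so a low timeout_secs bounds latency while max_events bounds batch size.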