Kafka

The Observo AI Kafka destination streams telemetry data (logs, metrics, and traces) to Apache Kafka topics in real time. It supports formats such as JSON, Avro, and Protobuf, secure authentication (SASL, Kerberos, or SSL), and compression (Gzip, Snappy) for scalable data integration and processing.

Purpose

The purpose of the Observo AI Kafka Destination is to enable the routing and delivery of telemetry data (such as logs, metrics, or traces) from an Observo AI Site to a Kafka cluster for further processing, storage, or analysis. It allows organizations to integrate Observo AI’s data observability and routing capabilities with Apache Kafka, a distributed streaming platform, to stream data to Kafka topics in real-time. This facilitates use cases such as:

  • Real-Time Data Processing: Sending telemetry data to Kafka for processing by downstream applications, such as analytics platforms, SIEM systems, or machine learning pipelines.

  • Centralized Data Collection: Aggregating data from multiple sources into a Kafka topic for unified storage or distribution to other systems.

  • Data Transformation and Enrichment: Leveraging Observo AI’s pipeline capabilities to structure, filter, mask, or enrich data before forwarding it to a Kafka topic.

  • Scalable Data Integration: Supporting high-volume, high-throughput environments by streaming data to Kafka, which can handle large-scale, distributed data workloads.

By configuring a Kafka destination, Observo AI users can seamlessly integrate with Kafka-based ecosystems, delivering data in formats like JSON, Avro, or Protobuf, with support for authentication (SASL, Kerberos, or SSL) and compression (Gzip or Snappy) for secure and efficient data transfer.

Prerequisites

Before configuring a Kafka destination in Observo AI, ensure the following requirements are met:

  • Observo AI Account: You must have an active Observo AI account with administrative access to the Observo console.

  • Kafka Cluster: A running Kafka cluster such as Apache Kafka, Confluent Cloud, or Azure Event Hubs with at least one accessible broker. Obtain the list of bootstrap servers such as localhost:9092.

  • Topic Configuration: A Kafka topic must be created in the cluster to receive data. Ensure you have the topic name and appropriate permissions to publish events.

  • Authentication Details: If the Kafka cluster requires authentication, prepare the necessary credentials such as SASL/PLAIN username and password, Kerberos keytab, or SSL certificates.

  • Network Access: Ensure your Observo Site (data plane) has network connectivity to the Kafka bootstrap servers, with no firewall rules blocking outbound TCP traffic (typically port 9092). Kafka uses a binary protocol over TCP and does not support HTTP proxies, so direct connectivity is required.

  • Data Schema: Understand the expected data format for the Kafka topic such as JSON, Avro, or Protobuf. Ensure compatibility with the destination topic’s schema.

  • Observo Site Deployment: A functional Observo Site must be deployed in your environment (on-premises or cloud) to handle data routing. Refer to the Observo AI documentation for deployment instructions.
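The network-access prerequisite above is easy to sanity-check before configuring the destination. A minimal sketch (the broker host and port are placeholders; substitute your own bootstrap servers):

```python
import socket

def broker_reachable(host: str, port: int, timeout: float = 5.0) -> bool:
    """Return True if a TCP connection to the Kafka broker can be opened."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False

# Example (placeholder address): broker_reachable("localhost", 9092)
```

A `False` result usually points to a firewall rule or a wrong bootstrap-server address; remember that Kafka requires direct TCP connectivity and cannot be reached through an HTTP proxy.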

| Prerequisite | Description | Notes |
| --- | --- | --- |
| Observo AI Platform | The Observo AI Site must be installed and available. | Verify support for JSON, Avro, or Protobuf payloads. |
| Network | Connectivity to Kafka broker ports. | Default port: TCP 9092; check firewall rules. |
| Authentication | SASL, Kerberos, or SSL setup if enabled. | Provide credentials, keytab, or SSL certificates as needed. |
| Kafka Topic | A pre-existing topic in the Kafka cluster. | Ensure write permissions for the topic. |

Integration

To configure a Kafka destination in Observo AI for sending telemetry data, follow these steps:

  • Log in to Observo AI:

    • Navigate to the Destinations tab in the Observo AI interface.

    • Click on the "Add Destination" button and select "Create New".

    • Choose "Kafka" from the list of available destinations.

  • General Settings:

    • Name: Add a unique identifier, such as kafka-dest-1.

    • Description (Optional): Provide a description for the destination.

    • Bootstrap Servers: Enter the list of Kafka bootstrap servers. Multiple servers can be comma-separated.

      Examples

      10.14.22.123:9092

      10.14.23.132:9092

    • Topic: Specify the Kafka topic to publish events to.

      Examples

      topic-1234

      logs-{{unit}}-%Y-%m-%d
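The templated topic example above (logs-{{unit}}-%Y-%m-%d) combines an event-field placeholder with strftime date tokens. How such a template resolves can be sketched as follows (the rendering logic is illustrative, not Observo's actual implementation):

```python
import re
from datetime import datetime, timezone

def render_topic(template: str, event: dict, now: datetime) -> str:
    """Substitute {{field}} placeholders from the event, then expand strftime tokens."""
    resolved = re.sub(r"\{\{(\w+)\}\}", lambda m: str(event.get(m.group(1), "")), template)
    return now.strftime(resolved)

event = {"unit": "payments"}
ts = datetime(2025, 6, 1, tzinfo=timezone.utc)
print(render_topic("logs-{{unit}}-%Y-%m-%d", event, ts))  # logs-payments-2025-06-01
```

Templated topics like this let a single destination fan events out to per-team or per-day topics, at the cost of requiring those topics to exist (or auto-creation to be enabled) on the cluster.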

  • Acknowledgement:

    • Acknowledgements Enabled (False): Whether end-to-end acknowledgements are enabled. When enabled, any source connected to this destination that supports end-to-end acknowledgements will wait for events to be acknowledged by the sink before acknowledging them at the source.

  • SASL Authentication:

    • SASL Enable (False): Enables SASL authentication. Only the PLAIN and SCRAM mechanisms are supported when configuring SASL authentication via sasl.*. For other mechanisms, use Librdkafka Options directly to set librdkafka-specific values (e.g., sasl.kerberos.*). Not supported on Windows.

    • SASL Mechanism (Optional): Enter the authentication mechanism.

      Examples

      SCRAM-SHA-256

      SCRAM-SHA-512

    • SASL Password (Optional): Provide the SASL password or select a stored secret.

      Example

      password

    • SASL Username (Optional): Provide the SASL username.

      Example

      username
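The SASL settings above correspond to standard librdkafka-style producer properties. A sketch of the equivalent client-side configuration (values are placeholders, and the property names shown are librdkafka's, not Observo-specific):

```python
def kafka_sasl_config(bootstrap: str, mechanism: str, username: str, password: str) -> dict:
    """Build a librdkafka-style configuration dict for a SASL/SCRAM producer."""
    return {
        "bootstrap.servers": bootstrap,
        "security.protocol": "SASL_SSL",  # SASL credentials should travel over TLS
        "sasl.mechanism": mechanism,      # e.g. SCRAM-SHA-256 or SCRAM-SHA-512
        "sasl.username": username,
        "sasl.password": password,
    }

conf = kafka_sasl_config("broker1:9092,broker2:9092", "SCRAM-SHA-512", "user", "secret")
```

Verifying that a plain Kafka client can authenticate with the same values is a quick way to isolate credential problems from pipeline problems.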

  • Encoding:

    • Encoding Codec: The codec to use for encoding events. Default: JSON Encoding.

      All codecs expose the following common sub-options:

        • Encoding Avro Schema (Optional): The Avro schema. Example: { "type": "record", "name": "log", "fields": [{ "name": "message", "type": "string" }] }

        • Encoding Metric Tag Values (Select): Controls how metric tag values are encoded.
          • Tag values will be exposed as single strings (default)
          • Tags exposed as arrays of strings
          Note: When set to single, only the last non-bare value of tags will be displayed with the metric. When set to full, all metric tags will be exposed as separate assignments.

        • Encoding Timestamp Format (Select): RFC3339 format or UNIX format.

      Codec-specific sub-options:

        • JSON Encoding:
          • Pretty JSON (False): Format JSON with indentation and line breaks for better readability.

        • logfmt Encoding: No codec-specific sub-options.

        • Apache Avro Encoding:
          • Avro Schema: Specify the Apache Avro schema definition for serializing events. Example: { "type": "record", "name": "log", "fields": [{ "name": "message", "type": "string" }] }

        • Newline Delimited JSON Encoding: No codec-specific sub-options.

        • No encoding: No codec-specific sub-options.

        • Plain text encoding: No codec-specific sub-options.

        • Parquet:
          • Include Raw Log (False): Capture the complete log message as an additional field (observo_record) alongside the given schema. Example: In addition to the Parquet schema, the Parquet file will contain a field named "observo_record".
          • Parquet Schema: Enter the Parquet schema for encoding. Example: message root { optional binary stream; optional binary time; optional group kubernetes { optional binary pod_name; optional binary pod_id; optional binary docker_id; optional binary container_hash; optional binary container_image; optional group labels { optional binary pod-template-hash; } } }

        • Common Event Format (CEF):
          • CEF Device Event Class ID: Provide a unique identifier for categorizing the type of event (maximum 1023 characters). Example: login-failure
          • CEF Device Product: Specify the product name that generated the event (maximum 63 characters). Example: Log Analyzer
          • CEF Device Vendor: Specify the vendor name that produced the event (maximum 63 characters). Example: Observo
          • CEF Device Version: Specify the version of the product that generated the event (maximum 31 characters). Example: 1.0.0
          • CEF Extensions (Add): Define custom key-value pairs for additional event data fields in CEF format.
          • CEF Name: Provide a human-readable description of the event (maximum 512 characters). Example: cef.name
          • CEF Severity: Indicate the importance of the event with a value from 0 (lowest) to 10 (highest). Example: 5
          • CEF Version (Select): Specify which version of the CEF specification to use for formatting: CEF specification version 0.1 or 1.x.

        • CSV Format:
          • CSV Fields (Add): Specify the field names to include as columns in the CSV output and their order. Examples: timestamp, host, message
          • CSV Buffer Capacity (Optional): Set the internal buffer size (in bytes) used when writing CSV data. Example: 8192
          • CSV Delimiter (Optional): Set the character that separates fields in the CSV output. Example: ,
          • Enable Double Quote Escapes (True): When enabled, quotes in field data are escaped by doubling them. When disabled, an escape character is used instead.
          • CSV Escape Character (Optional): Set the character used to escape quotes when double quoting is disabled.
          • CSV Quote Character (Optional): Set the character used for quoting fields in the CSV output. Example: "
          • CSV Quoting Style (Optional): Control when field values should be wrapped in quote characters. Options: always quote all fields, quote only when necessary, never use quotes, or quote all non-numeric fields.

        • Protocol Buffers:
          • Protobuf Message Type: Specify the fully qualified message type name for Protobuf serialization. Example: package.Message
          • Protobuf Descriptor File: Specify the path to the compiled protobuf descriptor file (.desc). Example: /path/to/descriptor.desc

        • Graylog Extended Log Format (GELF): No codec-specific sub-options.
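The difference between the JSON and Newline Delimited JSON codecs above is framing: NDJSON writes one compact JSON object per line, which is what most line-oriented Kafka consumers expect. A minimal sketch of NDJSON encoding:

```python
import json

def encode_ndjson(events: list[dict]) -> bytes:
    """Serialize events as newline-delimited JSON (one compact object per line)."""
    return "".join(json.dumps(e, separators=(",", ":")) + "\n" for e in events).encode()

events = [
    {"message": "login ok", "host": "web-1"},
    {"message": "login failed", "host": "web-2"},
]
payload = encode_ndjson(events)

# Each line decodes back to the original event:
decoded = [json.loads(line) for line in payload.decode().splitlines()]
```

Whichever codec you choose, make sure consumers of the topic agree on the framing; mixing pretty-printed JSON into a stream parsed line by line is a common source of "invalid payload" errors.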

  • Batching Configuration:

    • Batch Max Bytes: Set the maximum batch size in bytes. Default: Empty.

    • Batch Max Events: Set the maximum number of events per batch. Default: Empty.

    • Batch Timeout Seconds: Set the maximum time, in seconds, to wait before sending a batch. Default: 1.
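The three batching limits above interact: a batch is flushed as soon as any one of max bytes, max events, or the timeout is reached. That decision can be sketched as follows (class and field names are illustrative, not Observo internals):

```python
import time

class Batch:
    """Toy batch that flushes on whichever limit is hit first."""
    def __init__(self, max_bytes=None, max_events=None, timeout_s=1.0):
        self.max_bytes, self.max_events, self.timeout_s = max_bytes, max_events, timeout_s
        self.events, self.size, self.started = [], 0, time.monotonic()

    def add(self, event: bytes) -> None:
        self.events.append(event)
        self.size += len(event)

    def should_flush(self, now=None) -> bool:
        now = time.monotonic() if now is None else now
        return (
            (self.max_bytes is not None and self.size >= self.max_bytes)
            or (self.max_events is not None and len(self.events) >= self.max_events)
            or (now - self.started >= self.timeout_s)
        )

b = Batch(max_events=2, timeout_s=60)
b.add(b"event-1")
assert not b.should_flush()
b.add(b"event-2")
assert b.should_flush()  # max_events reached before the timeout
```

Larger batches reduce broker round-trips at the cost of latency; the timeout bounds how stale a partially filled batch can get.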

  • TLS Configuration (Optional):

    • TLS CA: The CA certificate provided as an inline string in PEM format.

    • TLS CRT: The certificate as a string in PEM format.

    • TLS Enabled (False): Whether to require TLS for incoming/outgoing connections. When enabled for incoming connections, a TLS certificate is also required. See TLS CRT File for more information.

    • TLS Key: The key provided as a string in PEM format.

    • TLS Verify Certificate (False): Enables certificate verification. Certificates must be valid (not expired) and issued by a trusted authority. Verification operates hierarchically, checking the certificate, then its issuer, and so on up to a root certificate. Relevant for both incoming and outgoing connections. Do NOT set this to false unless you understand the risks of not verifying certificate validity.

    • TLS Verify Hostname (False): Enables hostname verification. Hostname used to connect to the remote host must be present in the TLS certificate presented by the remote host, either as the Common Name or as an entry in the Subject Alternative Name extension. Only relevant for outgoing connections. NOT recommended to set this to false unless you understand the risks.

  • Buffering Configuration (Optional):

    • Buffer Type: Specifies the buffering mechanism for event delivery. Default: Empty

      Options:

      • Memory: High-performance, in-memory buffering.
        • Max Events: The maximum number of events allowed in the buffer. Default: 500
        • When Full: Event handling behavior when the buffer is full. Default: Block
          • Block: Wait for free space in the buffer. This applies backpressure up the topology, signalling that sources should slow down the acceptance/consumption of events. No data is lost, but data will pile up at the edge.
          • Drop Newest: Drop the event instead of waiting for free space in the buffer. The event is intentionally dropped. Typically used when performance is the highest priority and it is preferable to lose events temporarily rather than slow down the acceptance/consumption of events.

      • Disk: Lower-performance, less costly, on-disk buffering.
        • Max Bytes Size: The maximum buffer size in bytes. Must be at least 268435488.
        • When Full: Same Block / Drop Newest behavior as for the Memory buffer.
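The Block versus Drop Newest behaviors above can be sketched with a bounded buffer (this is a toy model; a real Block implementation would wait for space rather than raise):

```python
from collections import deque

class BoundedBuffer:
    """Toy buffer: 'drop_newest' discards overflow; 'block' signals backpressure."""
    def __init__(self, max_events: int, when_full: str = "block"):
        self.q = deque()
        self.max_events = max_events
        self.when_full = when_full
        self.dropped = 0

    def push(self, event) -> bool:
        if len(self.q) >= self.max_events:
            if self.when_full == "drop_newest":
                self.dropped += 1  # event intentionally discarded
                return False
            raise BlockingIOError("buffer full; caller must wait (backpressure)")
        self.q.append(event)
        return True

buf = BoundedBuffer(max_events=2, when_full="drop_newest")
assert buf.push("a") and buf.push("b")
assert buf.push("c") is False and buf.dropped == 1
```

Choose Block when completeness matters more than latency, and Drop Newest when a stalled Kafka cluster must never slow down the sources feeding the pipeline.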

  • Advanced Settings:

    • Compression: Supported compression types for Kafka. Default: No compression.

      Options

      Gzip

      LZ4

      No compression

      Snappy

      Zstandard

    • Headers Key (Optional): The log field name to use for the Kafka headers. If omitted, no headers will be written. Example: headers.

    • Key Field (Optional): The log field name or tags key to use for the topic key. If the field does not exist in the log or in tags, a blank value will be used. If unspecified, the key is not sent. Kafka uses a hash of the key to choose the partition or uses round-robin if the record has no key. Example: user_id

    • Librdkafka Options (Add): A librdkafka configuration option. (Add key/value pairs as needed)

    • Message Timeout Ms: Local message timeout, in milliseconds. Default: 300000

    • Socket Timeout Ms: Default timeout, in milliseconds, for network requests. Default: 60000

  • Save and Test Configuration:

    • Save the configuration settings in Observo AI.

    • Send test data through the pipeline and verify that it appears in the Kafka topic using a Kafka consumer or monitoring tool.

    • Check the Observo console’s logs for pipeline status and confirm successful data delivery.

Example Scenarios

RealTime Insights, a fictitious company specializing in real-time analytics, aims to integrate Observo with a Confluent Cloud Kafka cluster to stream security telemetry data for downstream processing. The Kafka cluster is configured with SASL authentication using the SCRAM-SHA-512 mechanism, and data will be sent to a specific topic with JSON encoding and Unix timestamps. The configuration ensures secure and efficient data delivery to support their analytics pipeline.

Standard Kafka Destination Setup

Here is a standard Kafka Destination configuration example. Only the required sections and their associated field updates are displayed in the table below:

General Settings

| Field | Value | Description |
| --- | --- | --- |
| Name | kafka-dest-realtime-1 | Unique identifier for the destination. |
| Description | Streams security telemetry data to Confluent Cloud Kafka for real-time analytics. | Provides context for the destination's purpose. |
| Bootstrap Servers | broker1.confluent.cloud:9092,broker2.confluent.cloud:9092 | Comma-separated list of Confluent Cloud Kafka bootstrap servers. |
| Topic | security-logs-2025 | Kafka topic to publish security telemetry events. |

SASL Authentication

| Field | Value | Description |
| --- | --- | --- |
| SASL Enable | True | Enables SASL authentication for secure access to the Kafka cluster. |
| SASL Mechanism | SCRAM-SHA-512 | Specifies SCRAM-SHA-512 as the SASL authentication mechanism. |
| SASL Username | realtime-user | Username for authenticating with the Confluent Cloud Kafka cluster. |

Encoding

| Field | Value | Description |
| --- | --- | --- |
| Encoding Codec | JSON Encoding | Specifies JSON as the encoding format for events sent to Kafka. |
| Encoding Metric Tag Values | Single Strings | Specifies that metric tag values are exposed as single strings, aligning with the default setting. |
| Encoding Timestamp Format | Unix Timestamp | Specifies Unix Timestamp format for event timestamps, overriding the default RFC 3339. |

Save and Test Configuration:

  • Save the settings, send test data through the pipeline, and verify ingestion in the security-logs-2025 topic using a Kafka consumer. This confirms the configuration is saved, data flows, and events appear in the Kafka topic.

Notes:

  • Ensure the Confluent Cloud Kafka cluster is accessible at broker1.confluent.cloud:9092,broker2.confluent.cloud:9092 and the credentials (realtime-user, C0nflu3nt#2025!) have write permissions to the security-logs-2025 topic.

  • Verify network connectivity over TCP port 9092 and check firewall rules to allow outbound traffic from Observo AI to the Kafka brokers.

  • Confirm the JSON payload aligns with the topic’s expected schema; use Observo’s transform capabilities to adjust data if needed.

  • Monitor Observo AI’s logs and use a Kafka consumer tool such as Confluent Cloud’s UI or kcat to verify data ingestion and troubleshoot errors.

  • For enhanced security, consider enabling TLS in the TLS Configuration section, as Confluent Cloud supports it by default.

This configuration enables RealTime Insights to stream security telemetry data from Observo to their Confluent Cloud Kafka cluster for real-time analytics.
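Given the scenario's choice of JSON encoding with Unix timestamps, an event published to security-logs-2025 would look roughly like this (the field names and values are illustrative, not a required schema):

```python
import json
from datetime import datetime, timezone

# Illustrative security event; "timestamp" carries Unix seconds per the
# Encoding Timestamp Format setting in the example above.
event = {
    "timestamp": int(datetime(2025, 6, 1, 12, 0, tzinfo=timezone.utc).timestamp()),
    "source": "firewall-01",
    "message": "denied inbound tcp 203.0.113.7:445",
}
payload = json.dumps(event, separators=(",", ":")).encode()
```

A downstream consumer can then parse each message with any JSON library without needing an RFC 3339 date parser.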

Troubleshooting

Common issues and solutions when configuring a Kafka destination:

  • Connection Errors:

    • Issue: The Observo Site cannot connect to the Kafka broker.

    • Solution: Verify the bootstrap server addresses and ports such as localhost:9092. Ensure network connectivity and check firewall rules to allow outbound TCP traffic to the Kafka cluster. Test connectivity using tools like telnet or kcat.

  • Authentication Failures:

    • Issue: The Kafka broker rejects requests due to invalid credentials.

    • Solution: Double-check the authentication settings such as SASL/PLAIN credentials or Kerberos keytab. Ensure the keytab file is accessible on all Observo Site nodes and the principal is correct. For Kerberos, verify the /etc/krb5.conf file has rdns = false.

  • Data Format Issues:

    • Issue: The Kafka topic receives data in an incorrect format or schema.

    • Solution: Confirm the topic’s expected data format such as JSON, Avro, or Protobuf. Use Observo’s transform capabilities to align data with the required schema. Check Observo console logs for parsing errors.

  • Backpressure Issues:

    • Issue: The pipeline stalls due to Kafka exerting backpressure.

    • Solution: Check the buffer's When Full setting (Block or Drop Newest). If using a Disk buffer, ensure sufficient disk space for queued events. Adjust batching settings (e.g., Batch Max Events or Batch Timeout Seconds) to reduce load on the Kafka broker.

  • Pipeline Failures:

    • Issue: The pipeline fails to deliver data to the Kafka destination.

    • Solution: Review pipeline configuration for errors in source or transform settings. Check Observo logs for detailed error messages and ensure the Observo Site is operational.

| Issue | Possible Cause | Resolution |
| --- | --- | --- |
| No data forwarded | Incorrect broker address/port or network block | Verify bootstrap server configuration and network connectivity |
| Invalid payload format | Unsupported data format or schema mismatch | Use the appropriate encoding for JSON/Avro/Protobuf payloads |
| High CPU usage | Large batch sizes or frequent flushes | Adjust batching settings such as Batch Max Events |
| Authentication errors | Incorrect credentials or keytab | Verify SASL, Kerberos, or SSL settings |
| TLS errors | Certificate or version mismatch | Check TLS settings and certificate files |

Resources

For additional guidance and detailed information, refer to the following resources:

  • Kafka Documentation: Apache Kafka Documentation for in-depth Kafka setup and management.

  • Best Practices:

    • Use compression such as Gzip or Snappy to optimize data transfer and reduce bandwidth usage.

    • Configure TLS for secure communication with the Kafka cluster unless explicitly unsupported.

    • For high-volume environments, ensure multiple bootstrap servers are specified to distribute load and improve resilience.
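The bandwidth benefit behind the compression best practice is easy to demonstrate on repetitive telemetry (the exact ratio varies with content; gzip is shown because it is in the Python standard library):

```python
import gzip
import json

# Repetitive JSON log lines, typical of telemetry, compress very well.
batch = "\n".join(
    json.dumps({"level": "INFO", "msg": "request handled", "status": 200, "seq": i})
    for i in range(1000)
).encode()

compressed = gzip.compress(batch)
ratio = len(compressed) / len(batch)
print(f"raw={len(batch)}B gzip={len(compressed)}B ratio={ratio:.2f}")
assert ratio < 0.5  # repetitive telemetry typically shrinks dramatically
```

Snappy and LZ4 trade some of this ratio for lower CPU cost, which is often the better fit for high-throughput Kafka pipelines.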
