Kafka

The Observo AI Kafka source enables real-time ingestion of data streams from Apache Kafka topics into the Observo AI platform, supporting scalable and secure consumption of high-throughput event data like logs or telemetry for enhanced observability and analytics.

Purpose

The Kafka source supports scalable, secure consumption of high-throughput event data, such as logs or telemetry, for monitoring and insights. This integration allows organizations to process and analyze Kafka messages to enhance system visibility and operational intelligence.

Prerequisites

Before configuring the Kafka source in Observo AI, ensure the following requirements are met to facilitate seamless data ingestion:

  • Observo AI Platform Setup:

    • The Observo Site must be installed and available.

    • Validate that the data formats carried in Kafka messages, such as JSON, Avro, or text, are supported.

  • Kafka Cluster Access:

    • Access to an Apache Kafka cluster (version 0.10 or later) with at least one topic containing data (Apache Kafka Documentation).

    • Obtain the Kafka broker addresses (e.g., broker1:9092,broker2:9092) and ensure they are reachable from Observo AI.

  • Authentication and Security:

    • Configure one of the following authentication methods, if required by the Kafka cluster:

      • None: For open clusters with no authentication.

      • SASL: Supported mechanisms include PLAIN, SCRAM-SHA-256, SCRAM-SHA-512, GSSAPI, or OAUTHBEARER. Provide username, password, or Kerberos details as needed.

      • SSL: Use SSL certificates for encryption and/or authentication. Prepare client certificates, keys, and CA certificates if required.

    • Ensure the Kafka user or credentials have permissions to read from the specified topic and consumer group (Kafka Security).

  • Network and Connectivity:

    • Ensure Observo AI can communicate with Kafka brokers over the specified ports (default: 9092 for plaintext, 9093 for SSL).

    • If using SSL or SASL, verify that firewall rules allow traffic to the brokers and that any proxies support Kafka’s binary protocol (no HTTP proxies).

| Prerequisite | Description | Notes |
| --- | --- | --- |
| Observo AI Platform | Must support Kafka source | Verify data format compatibility |
| Kafka Cluster | Active cluster with topic | Version 0.10 or later |
| Authentication | None, SASL, or SSL | Prepare credentials or certificates |
| Permissions | Read access to topic and group | Configure Kafka ACLs if needed |
| Network | Connectivity to brokers | Allow ports 9092/9093, no HTTP proxies |

How it works

Each Observo Dataplane pod uses the configuration defined in the Source to establish connections to the Kafka cluster. Observo relies on librdkafka to communicate reliably with Kafka.

Each Dataplane pod typically runs one or more Kafka consumers that join a shared consumer group. When a new pod starts or an existing one stops, the Kafka group coordinator rebalances the partitions to ensure every partition has exactly one consumer.

  • If there are more partitions than pods, some pods may consume multiple partitions.

  • If there are more pods than partitions, some pods will remain idle.

As more pods come up, they register with the Kafka cluster as consumers within the same group. The rebalance protocol ensures that partitions are redistributed.

Kafka partitions serve as the unit of load balancing. The number of partitions in a topic determines the maximum level of parallelism.

Each Dataplane pod commits offsets to Kafka (or an external store) to track progress. This ensures:

  • After a pod crash, reprocessing is limited to messages received after the last committed offset.

  • Messages are not lost if a pod terminates unexpectedly.
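The assignment behavior above can be sketched in a few lines of Python. This is illustrative only — it mimics Kafka's range-style assignor, not Observo's actual code:

```python
def assign_partitions(num_partitions, consumers):
    """Mimic Kafka's range-style assignor: spread partitions as evenly
    as possible, giving any remainder to the first consumers."""
    assignment = {c: [] for c in consumers}
    if not consumers:
        return assignment
    base, extra = divmod(num_partitions, len(consumers))
    start = 0
    for i, consumer in enumerate(sorted(consumers)):
        count = base + (1 if i < extra else 0)
        assignment[consumer] = list(range(start, start + count))
        start += count
    return assignment

# 6 partitions, 2 Dataplane pods: each pod consumes 3 partitions.
print(assign_partitions(6, ["pod-a", "pod-b"]))
# 2 partitions, 3 pods: the last pod is assigned nothing and stays idle.
print(assign_partitions(2, ["pod-a", "pod-b", "pod-c"]))
```

Because the partition count caps parallelism, size the topic's partition count to at least the expected number of Dataplane pods.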

Connecting to Azure Event Hub

The Observo Kafka Source can be used to consume events from Azure Event Hub. Note the following configuration requirements for reading from Azure Event Hub using the Kafka Source:

• Bootstrap Servers - <namespace name>.servicebus.windows.net:9093
• Group ID - The consumer group. Note that if the default group ($Default) is used it must be specified as $$Default to escape the $ used for environment variables.
• Topics - The event hub name.
• SASL Enabled - true.
• SASL Mechanism - PLAIN.
• SASL Username - $$ConnectionString (note the double $$).
• SASL Password - Set to the Event Hubs namespace connection string.
• TLS Enabled - true.
• TLS CA File - The certificate authority file.
• TLS Verify Certificate - Set to true.
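Put together, an Event Hub connection maps onto the Kafka Source fields roughly as follows. This is an illustrative sketch; `mynamespace`, the event hub name, and the connection string are placeholder values:

```python
# Illustrative mapping of Kafka Source fields for Azure Event Hub.
# "mynamespace", "my-event-hub", and the connection string are placeholders.
event_hub_source = {
    "bootstrap_servers": "mynamespace.servicebus.windows.net:9093",
    "group_id": "$$Default",                # "$" escaped as "$$"
    "topics": ["my-event-hub"],             # the event hub name
    "sasl_enabled": True,
    "sasl_mechanism": "PLAIN",
    "sasl_username": "$$ConnectionString",  # literal value, "$" escaped as "$$"
    "sasl_password": "Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=...;SharedAccessKey=...",
    "tls_enabled": True,
    "tls_verify_certificate": True,
}

for field, value in event_hub_source.items():
    print(f"{field}: {value}")
```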

Integration

The Integration section outlines default configurations. To configure Kafka as a Source in Observo AI, follow these steps:

  1. Log in to Observo AI:

    • Navigate to the Sources tab.

    • Click the “Add Sources” button and select “Create New”.

    • Choose “Kafka” from the list of available sources to begin configuration.

  2. General Settings:

    • Name: Add a unique identifier such as kafka-source-1

    • Description (Optional): Add description

    • Group ID: Consumer group identifier such as observo-consumer-group. Required for consuming messages.

      Example

      observo-consumer

    • Bootstrap Servers: A comma-separated list of Kafka bootstrap servers

      Example

      broker1:9092,broker2:9092

      10.14.22.123:9092,10.14.23.332:9092

    • Topics: Add the Kafka topic names to read events from, such as my-topic. The Add button allows multiple topic names; a name beginning with ^ is treated as a regular expression.

      Examples

      ^(key1|key2)-.+

      topic1

      topic2
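A topic entry that starts with ^ is interpreted as a regular expression, so the first example above subscribes to every topic whose name begins with key1- or key2-. A quick plain-Python sketch (illustrative only) of what that pattern matches:

```python
import re

# The regex topic pattern from the example above.
pattern = re.compile(r"^(key1|key2)-.+")

topics = ["key1-orders", "key2-logs", "key3-metrics", "key1"]
matched = [t for t in topics if pattern.match(t)]
print(matched)  # only "key1-orders" and "key2-logs" match
```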

  3. SASL Authentication (Optional):

    • SASL Enable (False): Enables SASL authentication. Only the PLAIN and SCRAM mechanisms are supported when configuring SASL authentication via the sasl.* fields. For other mechanisms, use Librdkafka Options directly to set the relevant librdkafka values (e.g., sasl.kerberos.* and so on). NOT supported on Windows.

    • SASL Mechanism: Enter the authentication mechanism.

      Examples

      SCRAM-SHA-256

      SCRAM-SHA-512

    • SASL Password: Provide the SASL password or select a stored secret.

      Example

      password

    • SASL Username: Provide the SASL username.

      Example

      username

  4. Decoding (Optional):

    • Decoding Codec: Configures how events are decoded from raw bytes. Default: Bytes

      | Option | Description |
      | --- | --- |
      | Bytes | Uses the raw bytes as-is. This is the default configuration. |
      | Protobuf | Interprets raw bytes using the Protobuf serialization format. To use this feature, you must upload a Protobuf descriptor file (.desc) as a file of type DATA. This descriptor describes the structure of your .proto message definitions. |
      | JSON | JSON-specific decoding options. |
      | Syslog | Syslog-specific decoding options. |

    • Protobuf Descriptor File: The path to the protobuf descriptor set file, which is the output of the protoc command below. Sample generation of a descriptor file:

       protoc \
       -I ./proto_def_dir/ \
       --descriptor_set_out=./proto_def_dir/proto1.desc \
       --include_imports \
       ./proto_def_dir/top_level_message.proto
      
       # ./proto_def_dir - Path containing all the .proto files
       # ./proto_def_dir/proto1.desc - Generated output binary
       # ./proto_def_dir/top_level_message.proto - Main .proto file (entry point)
      Example

      /my/path/message.desc

    • Protobuf Message Type: Specify the fully qualified message type name for Protobuf serialization.

      Example

      package.Message

    • JSON Lossy (True): Determines whether or not to replace invalid UTF-8 sequences instead of failing. When true, invalid UTF-8 sequences are replaced with the U+FFFD REPLACEMENT CHARACTER.

    • Syslog Lossy (True): Determines whether or not to replace invalid UTF-8 sequences instead of failing. When true, invalid UTF-8 sequences are replaced with the U+FFFD REPLACEMENT CHARACTER.
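The lossy behavior corresponds to replacement-mode UTF-8 decoding, as this plain-Python sketch (illustrative only) shows:

```python
raw = b"valid text \xff\xfe more text"  # contains bytes that are invalid UTF-8

# Lossy behavior (lossy = true): invalid sequences become U+FFFD.
lossy = raw.decode("utf-8", errors="replace")
print(lossy)

# Strict behavior (lossy = false): invalid sequences fail to decode.
try:
    raw.decode("utf-8")
    strict_failed = False
except UnicodeDecodeError:
    strict_failed = True
print("strict decode failed:", strict_failed)
```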

  5. Framing (Optional):

    • Framing Delimiter (Empty): The character that delimits byte sequences.

    • Framing Max Length (None): The maximum length of the byte buffer. This length does not include the trailing delimiter. By default, there is no maximum length enforced. If events are malformed, this can lead to additional resource usage as events continue to be buffered in memory, and can potentially lead to memory exhaustion in extreme cases. If there is a risk of processing malformed data, such as logs with user-controlled input, consider setting the maximum length to a reasonably large value as a safety net. This will ensure that processing is not truly unbounded.

    • Framing Method (Empty): The framing method.

      | Option | Description |
      | --- | --- |
      | Raw Bytes | Unstructured binary data without any formatting or metadata applied. |
      | GELF | Graylog Extended Log Format with structured fields and compression. |
      | JSON | Lightweight text format using key-value pairs for structured data. |
      | Syslog (RFC 3164 or RFC 5424) | Syslog framing; RFC 5424 adds structured data, improved timestamps, and extensibility. |

    • Framing Newline Delimited Max Length: (None)

    • Framing Octet Counting Max Length: (None)
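Delimiter framing with a maximum length can be sketched as follows. This is plain Python for illustration only; whether oversized frames are truncated or discarded is implementation-dependent — this sketch discards them:

```python
def frame_delimited(buffer: bytes, delimiter: bytes, max_length=None):
    """Split a byte buffer into frames on a delimiter. Frames longer
    than max_length (delimiter not counted) are discarded as a safety
    net against malformed input."""
    frames = []
    for chunk in buffer.split(delimiter):
        if not chunk:
            continue
        if max_length is not None and len(chunk) > max_length:
            continue  # drop oversized (possibly malformed) frames
        frames.append(chunk)
    return frames

data = b"event-one\nevent-two\n" + b"x" * 1000 + b"\nevent-three\n"
print(frame_delimited(data, b"\n", max_length=64))
```

This illustrates why setting Framing Max Length matters: without it, a malformed frame keeps growing in memory instead of being bounded.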

  6. TLS Configurations (Optional):

    • TLS CA: The CA certificate, provided either as an absolute path to a PEM file or as an inline PEM string.

      Example

      /etc/certs/ca.crt

    • TLS Crt File: The certificate, provided either as an absolute path to a PEM file or as an inline PEM string.

      Example

      /etc/certs/tls.crt

    • TLS Enable (False): Whether to require TLS for incoming/outgoing connections. When enabled for incoming connections, a TLS certificate is also required. See TLS Crt File for more information.

    • TLS Verify Certificate (False): Enables certificate verification. Certificates must be valid in terms of not being expired, and being issued by a trusted issuer. This verification operates in a hierarchical manner, checking validity of the certificate, the issuer of that certificate and so on until reaching a root certificate. Relevant for both incoming and outgoing connections. Do NOT set this to false unless you understand the risks of not verifying the validity of certificates.

    • TLS Key: Absolute path to a private key file used to identify this server. The key must be in DER or PEM (PKCS#8) format. Additionally, the key can be provided as an inline string in PEM format.

      Example

      /etc/certs/tls.key

    • TLS Verify Hostname (False): Enables hostname verification. Hostname used to connect to the remote host must be present in the TLS certificate presented by the remote host, either as the Common Name or as an entry in the Subject Alternative Name extension. Only relevant for outgoing connections. NOT recommended to set this to false unless you understand the risks.

  7. Advanced Settings (Optional):

    • Headers Key: Overrides the name of the log field used to add the headers to each event. Default: headers

      Example

      headers

    • Key Field: Overrides the name of the log field used to add the message key to each event. Default: message_key

      Example

      message_key

    • Topic Key (Empty): Overrides the name of the log field used to add the topic to each event.

      Example

      topic

    • Partition Key (Empty): Overrides the name of the log field used to add the partition to each event.

      Example

      partition

    • Offset Key: Overrides the name of the log field used to add the offset to each event. Default: offset

      Example

      offset

    • Commit Interval in Milliseconds: (5000)

      Examples

      5000

      10000

    • Fetch Wait Max in Milliseconds: (100)

      Examples

      50

      100

    • Session Timeout in Milliseconds: (10000)

      Examples

      5000

      10000

    • Socket Timeout in Milliseconds: (60000). Defaults to 60 seconds.

      Examples

      30000

      60000

    • Librdkafka Options (Click Add button): Key-value pairs of librdkafka configuration options.

    • Auto Offset Reset: If offsets for consumer groups do not exist, set them using this strategy. See the librdkafka documentation for the auto.offset.reset option for further clarification. Default: largest

      Examples

      smallest

      earliest

      beginning

      largest

      latest

      end

      error
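Entries added under Librdkafka Options are passed through as key-value pairs. The sketch below shows what such a pass-through might contain; the keys are standard librdkafka option names, while the chosen values are illustrative:

```python
# Illustrative key-value pairs as they would be entered under
# "Librdkafka Options". Keys are standard librdkafka settings.
librdkafka_options = {
    "auto.offset.reset": "earliest",         # start position when no committed offset exists
    "max.partition.fetch.bytes": "1048576",  # cap per-partition fetch size
    "sasl.kerberos.service.name": "kafka",   # needed only for GSSAPI/Kerberos setups
}

for key, value in librdkafka_options.items():
    print(f"{key} = {value}")
```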

  8. Parser Config:

    • Enable Source Log Parser: (False)

    • Toggle the Enable Source Log Parser switch to enable:

      • Select the appropriate parser from the Source Log Parser dropdown.

      • Add additional parsers as needed.

  9. Pattern Extractor:

  10. Archival Destination:

    • Toggle the Enable Archival on Source switch to enable.

    • Under Archival Destination, select from the list of archival destinations (required).

  11. Save and Test Configuration:

    • Save the configuration settings.

    • Verify that data is being ingested from the Kafka topic.

Example Scenarios

DataStream Inc., a fictitious technology enterprise, wants to integrate its Apache Kafka cluster, which streams JSON-formatted log data, into the Observo platform for real-time monitoring and analytics. It uses a Kafka topic named "app-logs" hosted on a secure Kafka cluster with brokers at "kafka1.datastream.com:9093,kafka2.datastream.com:9093". The configuration employs SASL SCRAM-SHA-512 for authentication and TLS for encryption, with certificate verification to ensure secure communication. The consumer group is set to manage message offsets, and default Advanced Settings are used for simplicity.

Standard Kafka Source Setup

Here is a standard Kafka Source configuration example. Only the required sections and their associated field updates are displayed in the table below:

General Settings

| Field | Value | Notes |
| --- | --- | --- |
| Name | kafka-datastream-logs | Unique identifier for the Kafka source, indicating DataStream’s log ingestion. |
| Description | Ingest JSON logs from DataStream’s Kafka topic for real-time monitoring | Optional, provides context for the source’s purpose. |
| Group ID | observo-datastream-group | Consumer group identifier for managing message offsets. |
| Bootstrap Servers | kafka1.datastream.com:9093,kafka2.datastream.com:9093 | Comma-separated list of Kafka broker addresses, using port 9093 for SSL. |
| Topics | app-logs | Kafka topic name to read events from; additional topics can be added if needed. |

SASL Authentication

| Field | Value | Notes |
| --- | --- | --- |
| SASL Enable | True | Toggle enabled to use SASL authentication, as required. |
| SASL Mechanism | SCRAM-SHA-512 | Selected authentication mechanism for secure credential exchange. |
| SASL Username | datastream_user | Username for SASL authentication, configured in the Kafka cluster. |
| SASL Password | D@t@Str3am2025! | Password for SASL authentication, securely stored or selected from a secret. |

Decoding

| Field | Value | Notes |
| --- | --- | --- |
| Decoding Codec | Bytes | Default decoding method selected from the list. |

TLS Configuration

| Field | Value | Notes |
| --- | --- | --- |
| TLS Enabled | True | Enabled to use SSL for encryption, as required for secure communication. |
| TLS CA File | /etc/certs/ca.crt | Absolute path to the CA certificate file for verifying broker certificates. |
| TLS Crt File | /etc/certs/client.crt | Absolute path to the client certificate file to identify the Observo AI consumer. |
| TLS Key | /etc/certs/client.key | Absolute path to the client private key file for the certificate. |
| TLS Verify Hostname | True | Enabled to ensure the broker hostname matches the certificate hostname. |

Framing

| Field | Value | Notes |
| --- | --- | --- |
| Framing Delimiter | Empty | Default setting, as no specific delimiter is required. |
| Framing Max Length | None | Default setting, no maximum byte buffer length specified. |
| Framing Method | Empty | Default framing method selected from the list. |
| Framing Newline Delimited Max Length | None | Default setting, not specified in this scenario. |
| Framing Octet Counting Max Length | None | Default setting, not specified in this scenario. |

Advanced Settings

| Field | Value | Notes |
| --- | --- | --- |
| Headers Key | headers | Default name for the log field storing message headers. |
| Key Field | message_key | Default name for the log field storing the message key. |
| Topic Key | Empty | Default, no override for the topic field. |
| Partition Key | Empty | Default, no override for the partition field. |
| Offset Key | offset | Default name for the log field storing the message offset. |
| Commit Interval in Milliseconds | 5000 | Default interval for committing offsets. |
| Fetch Wait Max in Milliseconds | 100 | Default maximum wait time for fetching messages. |
| Session Timeout in Milliseconds | 10000 | Default session timeout for the consumer group. |
| Socket Timeout in Milliseconds | 60000 | Default socket timeout, set to 60 seconds. |
| Librdkafka Options | None | Default, no additional librdkafka options specified. |
| Auto Offset Reset | largest | Default strategy for setting offsets if none exist (latest messages). |

Parser Config

| Field | Value | Notes |
| --- | --- | --- |
| Enable Source Log Parser | True | Enabled to parse the JSON message payload. |
| Source Log Parser | JSON Parser | Selected from the dropdown to match the JSON format of the ingested logs. |
| Additional Parsers | (None) | Not needed unless specific transformations are required. |

Test Configuration:

  • Click “Save” to store the configuration settings in Observo.

  • Verify data ingestion from the “app-logs” topic by publishing sample JSON messages to the topic and monitoring the Analytics tab in the Observo pipeline for event counts and throughput.

Notes:

  • Authentication: The SASL SCRAM-SHA-512 credentials (username: datastream_user, password: D@t@Str3am2025!) are fictional but follow secure practices. DataStream Inc. must ensure these match the Kafka cluster’s configuration.

  • TLS Configuration: The certificate and key file paths are examples assuming a secure directory on the Observo AI server. DataStream Inc. must ensure these files are accessible and valid.

  • Network: The bootstrap servers use port 9093 for SSL, and firewall rules must allow traffic from Observo to the brokers. No HTTP proxies should be used, as Kafka uses a binary protocol.

  • Troubleshooting: If issues occur (e.g., “Not authorized” or “No data ingested”), verify the topic name, Group ID, SASL credentials, TLS settings, and network connectivity using tools like Kafka Manager, as outlined in the Troubleshooting section.

  • Resources: For further details, refer to the Resources section, including Apache Kafka Documentation for configuration and Kafka Security for SASL/SSL setup.

Troubleshooting

If issues arise with the Kafka source in Observo AI, use the following steps to diagnose and resolve them:

  • Verify Configuration Settings:

    • Ensure Topics, Bootstrap Servers and Group ID are correctly configured and match the Kafka cluster setup.

    • Confirm that the topic exists and contains data (Kafka Topic Management).

  • Check Authentication:

    • For SASL, verify that the username, password, and mechanism (e.g., PLAIN, SCRAM-SHA-256) are correct.

    • For SSL, ensure client certificates, keys, and CA certificates are valid and correctly configured.

    • Check Kafka ACLs to confirm the user has read permissions for the topic and consumer group (Kafka Security).

  • Monitor Logs:

    • Check Observo AI’s Logs tab for errors or warnings related to data ingestion.

    • Use Kafka’s monitoring tools (e.g., Kafka Manager or Confluent Control Center) to verify topic activity and consumer group status (Kafka Monitoring).

  • Validate Connectivity:

    • Ensure Observo AI can reach Kafka brokers on the specified ports (9092 for plaintext, 9093 for SSL).

    • Confirm that no HTTP proxies are used, as Kafka uses a binary protocol.

    • Check firewall rules or VPC configurations to allow traffic to the brokers.

  • Common Error Messages:

    • “Not authorized to access topics”: Indicates insufficient permissions. Verify Kafka ACLs or SASL/SSL credentials.

    • “Unknown topic or partition”: Check the Topic name and ensure it exists in the Kafka cluster.

    • “No data ingested”: Confirm messages are being published to the topic and the Group ID is correct. Check From Beginning setting for offset issues.

  • Test Data Flow:

    • Verify data ingestion within the targeted Observo AI pipeline.

    • Use the Analytics tab in the targeted Observo AI pipeline to monitor data volume and ensure expected throughput.

| Issue | Possible Cause | Resolution |
| --- | --- | --- |
| Data not ingested | Incorrect Topic or Group ID | Verify topic exists and Group ID is correct |
| “Not authorized” | Insufficient permissions | Check Kafka ACLs or credentials |
| “Unknown topic” | Topic does not exist | Confirm topic name and create if needed |
| Connectivity issues | Firewall or proxy issues | Allow ports 9092/9093, remove HTTP proxies |
| No data in topic | No messages published | Verify messages are sent to topic |

Resources

For additional guidance and detailed information, refer to the following resources:
