Kafka
The Observo AI Kafka source enables real-time ingestion of data streams from Apache Kafka topics into the Observo AI platform, supporting scalable and secure consumption of high-throughput event data like logs or telemetry for enhanced observability and analytics.
Purpose
The Observo AI Kafka source supports scalable, secure consumption of high-throughput event data, such as logs or telemetry, for monitoring and insights. It allows organizations to process and analyze Kafka messages to enhance system visibility and operational intelligence.
Prerequisites
Before configuring the Kafka source in Observo AI, ensure the following requirements are met to facilitate seamless data ingestion:
Observo AI Platform Setup:
The Observo Site must be installed and available.
Validate data formats, such as JSON, Avro, or text, from Kafka messages.
Kafka Cluster Access:
Access to an Apache Kafka cluster (version 0.10 or later) with at least one topic containing data (Apache Kafka Documentation).
Obtain the Kafka broker addresses (e.g., broker1:9092,broker2:9092) and ensure they are reachable from Observo AI.
Authentication and Security:
Configure one of the following authentication methods, if required by the Kafka cluster:
None: For open clusters with no authentication.
SASL: Supported mechanisms include PLAIN, SCRAM-SHA-256, SCRAM-SHA-512, GSSAPI, or OAUTHBEARER. Provide username, password, or Kerberos details as needed.
SSL: Use SSL certificates for encryption and/or authentication. Prepare client certificates, keys, and CA certificates if required.
Ensure the Kafka user or credentials have permissions to read from the specified topic and consumer group (Kafka Security).
Network and Connectivity:
Ensure Observo AI can communicate with Kafka brokers over the specified ports (default: 9092 for plaintext, 9093 for SSL).
If using SSL or SASL, verify that firewall rules allow traffic to the brokers and that any proxies support Kafka’s binary protocol (no HTTP proxies).
| Requirement | Details |
| --- | --- |
| Observo AI Platform | Must support the Kafka source; verify data format compatibility. |
| Kafka Cluster | Active cluster with a topic; version 0.10 or later. |
| Authentication | None, SASL, or SSL; prepare credentials or certificates. |
| Permissions | Read access to the topic and consumer group; configure Kafka ACLs if needed. |
| Network | Connectivity to brokers; allow ports 9092/9093, no HTTP proxies. |
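As a pre-flight check, broker reachability can be verified with a plain TCP connection. The sketch below (the `broker1`/`broker2` endpoints are the placeholder values from above) checks the network path only; it does not speak the Kafka protocol or validate TLS/SASL settings:

```python
import socket

def broker_reachable(host: str, port: int, timeout: float = 5.0) -> bool:
    """Return True if a plain TCP connection to host:port succeeds.

    Network reachability only; Kafka protocol, TLS, and SASL are
    not exercised by this check."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False

# Split a Bootstrap Servers value into (host, port) pairs to check.
bootstrap = "broker1:9092,broker2:9092"
endpoints = [e.strip().rsplit(":", 1) for e in bootstrap.split(",")]
print(endpoints)  # [['broker1', '9092'], ['broker2', '9092']]
# for host, port in endpoints:
#     print(host, broker_reachable(host, int(port)))
```

A successful TCP connection does not guarantee authentication will succeed; it only rules out firewall and routing problems.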
How it works
Each Observo Dataplane pod uses the configuration defined in the Source to establish connections to the Kafka cluster. Observo relies on librdkafka to communicate reliably with Kafka.
Each Dataplane pod typically runs one or more Kafka consumers that join a shared consumer group. When a new pod starts or an existing one stops, the Kafka group coordinator rebalances the partitions to ensure every partition has exactly one consumer.
If there are more partitions than pods, some pods may consume multiple partitions.
If there are more pods than partitions, some pods will remain idle.
As more pods come up, they register with the Kafka cluster as consumers within the same group. The rebalance protocol ensures that partitions are redistributed.
Kafka partitions serve as the unit of load balancing. The number of partitions in a topic determines the maximum level of parallelism.
Each Dataplane pod commits offsets to Kafka (or an external storage) to track progress. This ensures:
Consumption resumes from the last committed offset after a pod restart, minimizing duplicate processing.
Messages are not lost if a pod terminates unexpectedly.
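The rebalancing rules above can be sketched as a simple round-robin assignment. This is illustrative only; the actual distribution is performed by the Kafka group coordinator and depends on the configured partition assignment strategy:

```python
def assign_partitions(num_partitions: int, pod_ids: list[str]) -> dict[str, list[int]]:
    """Round-robin partition assignment: a simplified sketch of how a
    Kafka group coordinator spreads partitions across consumers."""
    assignment: dict[str, list[int]] = {pod: [] for pod in pod_ids}
    for partition in range(num_partitions):
        # Each partition goes to exactly one consumer in the group.
        pod = pod_ids[partition % len(pod_ids)]
        assignment[pod].append(partition)
    return assignment

# More partitions than pods: each pod consumes multiple partitions.
print(assign_partitions(6, ["pod-a", "pod-b"]))
# {'pod-a': [0, 2, 4], 'pod-b': [1, 3, 5]}

# More pods than partitions: some pods stay idle.
print(assign_partitions(2, ["pod-a", "pod-b", "pod-c"]))
# {'pod-a': [0], 'pod-b': [1], 'pod-c': []}
```

Because partitions are the unit of parallelism, scaling Dataplane pods beyond the topic's partition count adds no throughput.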
Connecting to Azure Event Hub
The Observo Kafka Source can be used to consume events from Azure Event Hub. Note the following configuration requirements when reading from Azure Event Hub using the Kafka Source:
• Bootstrap Servers - <namespace name>.servicebus.windows.net:9093
• Group ID - The consumer group. Note that if the default group ($Default) is used it must be specified as $$Default to escape the $ used for environment variables.
• Topics - The event hub name.
• SASL Enabled - true.
• SASL Mechanism - PLAIN.
• SASL Username - $$ConnectionString (note the double $$).
• SASL Password - Set to the Event Hub connection string.
• TLS Enabled - true.
• TLS CA File - The certificate authority file.
• TLS Verify Certificate - Set to true.
Integration
To configure Kafka as a Source in Observo AI, follow these steps:
Log in to Observo AI:
Navigate to Sources Tab
Click the “Add Sources” button and select “Create New”
Choose “Kafka” from the list of available sources to begin configuration.
General Settings:
Name: Add a unique identifier such as kafka-source-1
Description (Optional): Add a description
Group ID: Consumer group identifier such as observo-consumer-group. Required for consuming messages.
Example: observo-consumer
Bootstrap Servers: A comma-separated list of Kafka bootstrap servers
Examples: broker1:9092,broker2:9092
10.14.22.123:9092,10.14.23.332:9092
Topics: Add the Kafka topic names to read events from, such as my-topic. The Add button allows multiple Kafka topic names.
Examples: ^(key1|key2)-.+
topic1
topic2
SASL Authentication (Optional):
SASL Enable (False): Enables SASL authentication. Only the PLAIN and SCRAM mechanisms are supported when configuring SASL authentication via these fields. For other mechanisms, use Librdkafka Options to set librdkafka-specific values directly (i.e., sasl.kerberos.* and so on). NOT supported on Windows.
SASL Mechanism: Enter the authentication mechanism.
Examples: SCRAM-SHA-256
SCRAM-SHA-512
SASL Password: Provide the SASL password or select a stored secret.
Example: password
SASL Username: Provide the SASL username.
Example: username
Decoding (Optional):
Decoding Codec: Configures how events are decoded from raw bytes. Default: Bytes

| Option | Description |
| --- | --- |
| Bytes | Uses the raw bytes as-is. This is the default configuration. |
| Protobuf | Interprets raw bytes using the Protobuf serialization format. To use this feature, you must upload a Protobuf descriptor file (.desc) as a file of type DATA. This descriptor describes the structure of your .proto message definitions. |
| JSON | JSON-specific decoding options. |
| Syslog | Syslog-specific decoding options. |
Protobuf Descriptor File: The path to the protobuf descriptor set file. This file is the output of the protoc command shown below. Sample generation of a descriptor file:

```shell
protoc \
  -I ./proto_def_dir/ \
  --descriptor_set_out=./proto_def_dir/proto1.desc \
  --include_imports \
  ./proto_def_dir/top_level_message.proto
# ./proto_def_dir                         - Path containing all the .proto files
# ./proto_def_dir/proto1.desc             - Generated output binary
# ./proto_def_dir/top_level_message.proto - Main .proto file (entry point)
```

Example: /my/path/message.desc
Protobuf Message Type: Specify the fully qualified message type name for Protobuf serialization.
Example: package.Message
JSON Lossy (True): Determines whether to replace invalid UTF-8 sequences instead of failing. When true, invalid UTF-8 sequences are replaced with the U+FFFD REPLACEMENT CHARACTER.
Syslog Lossy (True): Determines whether to replace invalid UTF-8 sequences instead of failing. When true, invalid UTF-8 sequences are replaced with the U+FFFD REPLACEMENT CHARACTER.
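The lossy behavior mirrors standard UTF-8 replacement decoding; Python's decoder shows the same semantics:

```python
# A payload containing the invalid UTF-8 byte 0xFF.
raw = b"level=info msg=\xffbad-bytes"

# Lossy = True: invalid sequences become U+FFFD instead of failing.
lossy = raw.decode("utf-8", errors="replace")
print("\ufffd" in lossy)  # True: 0xFF was replaced with U+FFFD

# Lossy = False: the event fails to decode.
try:
    raw.decode("utf-8")
except UnicodeDecodeError as exc:
    print("decode failed:", exc.reason)
```

Leaving lossy enabled (the default) trades byte-level fidelity for resilience against occasional malformed producer output.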
Framing (Optional):
Framing Delimiter (Empty): The character that delimits byte sequences.
Framing Max Length (None): The maximum length of the byte buffer. This length does not include the trailing delimiter. By default, there is no maximum length enforced. If events are malformed, this can lead to additional resource usage as events continue to be buffered in memory, and can potentially lead to memory exhaustion in extreme cases. If there is a risk of processing malformed data, such as logs with user-controlled input, consider setting the maximum length to a reasonably large value as a safety net. This will ensure that processing is not truly unbounded.
Framing Method (Empty): The framing method.

| Option | Description |
| --- | --- |
| Raw Bytes | Unstructured binary data without any formatting or metadata applied. |
| GELF | Graylog Extended Log Format with structured fields and compression. |
| JSON | Lightweight text format using key-value pairs for structured data. |
| Syslog (RFC 3164 or RFC 5424) | Traditional or modern syslog; RFC 5424 adds structured data, improved timestamps, and extensibility. |
Framing Newline Delimited Max Length (None): The maximum frame length when using newline-delimited framing.
Framing Octet Counting Max Length (None): The maximum frame length when using octet-counting framing.
TLS Configurations (Optional):
TLS CA: The CA certificate, provided either as an absolute path to a file or as an inline string in PEM format.
Example: /etc/certs/ca.crt
TLS Crt File: The client certificate, provided either as an absolute path to a file or as an inline string in PEM format.
Example: /etc/certs/tls.crt
TLS Enable (False): Whether to require TLS for incoming/outgoing connections. When enabled for incoming connections, a TLS certificate is also required. See TLS Crt File for more information.
TLS Verify Certificate (False): Enables certificate verification. Certificates must be valid: not expired and issued by a trusted issuer. Verification operates hierarchically, checking the validity of the certificate, the issuer of that certificate, and so on until reaching a root certificate. Relevant for both incoming and outgoing connections. Do NOT set this to false unless you understand the risks of not verifying the validity of certificates.
TLS Key: The private key used to identify this endpoint, provided either as an absolute path to a file or as an inline string. The key must be in DER or PEM (PKCS#8) format.
Example: /etc/certs/tls.key
TLS Verify Hostname (False): Enables hostname verification. The hostname used to connect to the remote host must be present in the TLS certificate presented by the remote host, either as the Common Name or as an entry in the Subject Alternative Name extension. Only relevant for outgoing connections. NOT recommended to set this to false unless you understand the risks.
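These TLS options correspond to standard TLS client settings. As an analogy only (using Python's ssl module, not the platform's implementation), the recommended verification defaults look like this:

```python
import ssl

# A default client context enables both of the checks the TLS
# options above control.
ctx = ssl.create_default_context()

# TLS Verify Certificate → chain validation up to a trusted root.
print(ctx.verify_mode == ssl.CERT_REQUIRED)  # True

# TLS Verify Hostname → hostname must match CN or a SAN entry.
print(ctx.check_hostname)  # True

# TLS CA / TLS Crt File / TLS Key would be loaded like this
# (paths are the placeholder examples from above):
# ctx.load_verify_locations(cafile="/etc/certs/ca.crt")
# ctx.load_cert_chain(certfile="/etc/certs/tls.crt", keyfile="/etc/certs/tls.key")
```

Disabling either check is the moral equivalent of setting `verify_mode = ssl.CERT_NONE`, which exposes the connection to man-in-the-middle attacks.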
Advanced Settings (Optional):
Headers Key: Overrides the name of the log field used to add the headers to each event. Default: headers
Example: headers
Key Field: Overrides the name of the log field used to add the message key to each event. Default: message_key
Example: message_key
Topic Key (Empty): Overrides the name of the log field used to add the topic to each event.
Example: topic
Partition Key (Empty): Overrides the name of the log field used to add the partition to each event.
Example: partition
Offset Key: Overrides the name of the log field used to add the offset to each event. Default: offset
Example: offset
Commit Interval in Milliseconds (5000): How frequently consumer offsets are committed.
Examples: 5000
10000
Fetch Wait Max in Milliseconds (100): Maximum time the broker may wait to fill a fetch response.
Examples: 50
100
Session Timeout in Milliseconds (10000): Timeout used to detect failed consumers in the group.
Examples: 5000
10000
Socket Timeout in Milliseconds (60000): Network request timeout. Defaults to 60 seconds.
Examples: 30000
60000
Librdkafka Options (Click Add button): Key-value pairs of librdkafka configuration options.
Auto Offset Reset: If offsets for the consumer group do not exist, set them using this strategy. See the librdkafka documentation for the auto.offset.reset option for further clarification. Default: largest
Examples: smallest
earliest
beginning
largest
latest
end
error
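The accepted values are aliases for three behaviors, per the librdkafka documentation for auto.offset.reset. A small sketch grouping them (the fetch.message.max.bytes entry is only an illustrative extra option):

```python
# librdkafka accepts several aliases for the same auto.offset.reset
# behavior; grouping them makes the choice clearer.
RESET_BEHAVIOR = {
    "smallest": "start",   # consume from the earliest available offset
    "earliest": "start",
    "beginning": "start",
    "largest": "end",      # consume only new messages (the default)
    "latest": "end",
    "end": "end",
    "error": "error",      # fail if no committed offset exists
}

# Additional librdkafka options are passed as plain key-value pairs:
librdkafka_options = {
    "auto.offset.reset": "earliest",
    "fetch.message.max.bytes": "1048576",
}
print(RESET_BEHAVIOR[librdkafka_options["auto.offset.reset"]])  # start
```

Note that this strategy only applies when the consumer group has no committed offset for a partition; once offsets exist, consumption always resumes from them.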
Parser Config:
Enable Source Log Parser (False):
Toggle the Enable Source Log Parser switch to enable
Select the appropriate parser from the Source Log Parser dropdown
Add additional parsers as needed
Pattern Extractor:
See Pattern Extractor for details.
Archival Destination:
Toggle the Enable Archival on Source switch to enable
Under Archival Destination, select from the list of Archival Destinations (Required)
Save and Test Configuration:
Save the configuration settings.
Verify that data is being ingested from the Kafka topic.
Example Scenarios
DataStream Inc., a fictitious Technology enterprise, wants to integrate their Apache Kafka cluster, which streams JSON-formatted log data, into the Observo platform for real-time monitoring and analytics. They use a Kafka topic named "app-logs" hosted on a secure Kafka cluster with brokers at "kafka1.datastream.com:9093, kafka2.datastream.com:9093". The configuration employs SASL SCRAM-SHA-512 for authentication and TLS for encryption, with certificate verification to ensure secure communication. The consumer group is set to manage message offsets, and default Advanced Settings are used for simplicity.
Standard Kafka Source Setup
Here is a standard Kafka Source configuration example. Only the required sections and their associated field updates are shown in the tables below:
General Settings

| Field | Value | Notes |
| --- | --- | --- |
| Name | kafka-datastream-logs | Unique identifier for the Kafka source, indicating DataStream’s log ingestion. |
| Description | Ingest JSON logs from DataStream’s Kafka topic for real-time monitoring | Optional; provides context for the source’s purpose. |
| Group ID | observo-datastream-group | Consumer group identifier for managing message offsets. |
| Bootstrap Servers | kafka1.datastream.com:9093,kafka2.datastream.com:9093 | Comma-separated list of Kafka broker addresses, using port 9093 for SSL. |
| Topics | app-logs | Kafka topic name to read events from; additional topics can be added if needed. |

SASL Authentication

| Field | Value | Notes |
| --- | --- | --- |
| SASL Enable | True | Toggle enabled to use SASL authentication, as required. |
| SASL Mechanism | SCRAM-SHA-512 | Selected authentication mechanism for secure credential exchange. |
| SASL Username | datastream_user | Username for SASL authentication, configured in the Kafka cluster. |
| SASL Password | D@t@Str3am2025! | Password for SASL authentication, securely stored or selected from a secret. |

Decoding

| Field | Value | Notes |
| --- | --- | --- |
| Decoding Codec | Bytes | Default decoding method selected from the list. |

TLS Configuration

| Field | Value | Notes |
| --- | --- | --- |
| TLS Enabled | True | Enabled to use SSL for encryption, as required for secure communication. |
| TLS CA File | /etc/certs/ca.crt | Absolute path to the CA certificate file for verifying broker certificates. |
| TLS Crt File | /etc/certs/client.crt | Absolute path to the client certificate file to identify the Observo AI consumer. |
| TLS Key | /etc/certs/client.key | Absolute path to the client private key file for the certificate. |
| TLS Verify Hostname | True | Enabled to ensure the broker hostname matches the certificate hostname. |

Framing

| Field | Value | Notes |
| --- | --- | --- |
| Framing Delimiter | Empty | Default setting, as no specific delimiter is required. |
| Framing Max Length | None | Default setting; no maximum byte buffer length specified. |
| Framing Method | Empty | Default framing method. |
| Framing Newline Delimited Max Length | None | Default setting; not specified in this scenario. |
| Framing Octet Counting Max Length | None | Default setting; not specified in this scenario. |

Advanced Settings

| Field | Value | Notes |
| --- | --- | --- |
| Headers Key | headers | Default name for the log field storing message headers. |
| Key Field | message_key | Default name for the log field storing the message key. |
| Topic Key | Empty | Default; no override for the topic field. |
| Partition Key | Empty | Default; no override for the partition field. |
| Offset Key | offset | Default name for the log field storing the message offset. |
| Commit Interval in Milliseconds | 5000 | Default interval for committing offsets. |
| Fetch Wait Max in Milliseconds | 100 | Default maximum wait time for fetching messages. |
| Session Timeout in Milliseconds | 10000 | Default session timeout for the consumer group. |
| Socket Timeout in Milliseconds | 60000 | Default socket timeout, set to 60 seconds. |
| Librdkafka Options | None | Default; no additional librdkafka options specified. |
| Auto Offset Reset | largest | Default strategy for setting offsets if none exist (latest messages). |

Parser Config

| Field | Value | Notes |
| --- | --- | --- |
| Enable Source Log Parser | True | Enabled to parse the JSON-formatted log messages. |
| Source Log Parser | JSON Parser | Selected from the dropdown to match the JSON log format. |
| Additional Parsers | (None) | Not needed unless specific transformations are required. |
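For reference, the scenario above roughly corresponds to the following librdkafka-style configuration. This is a sketch using standard librdkafka option names; the exact keys Observo sets internally from the form fields may differ:

```python
# DataStream Inc. scenario expressed as librdkafka option key-value
# pairs (values taken from the tables above; credentials fictional).
scenario_config = {
    "bootstrap.servers": "kafka1.datastream.com:9093,kafka2.datastream.com:9093",
    "group.id": "observo-datastream-group",
    "security.protocol": "sasl_ssl",        # SASL Enable + TLS Enabled
    "sasl.mechanisms": "SCRAM-SHA-512",     # SASL Mechanism
    "sasl.username": "datastream_user",
    "sasl.password": "D@t@Str3am2025!",
    "ssl.ca.location": "/etc/certs/ca.crt",            # TLS CA File
    "ssl.certificate.location": "/etc/certs/client.crt",  # TLS Crt File
    "ssl.key.location": "/etc/certs/client.key",          # TLS Key
    "auto.commit.interval.ms": "5000",      # Commit Interval
    "session.timeout.ms": "10000",          # Session Timeout
    "auto.offset.reset": "largest",         # Auto Offset Reset
}
topics = ["app-logs"]
```

Laying the settings out this way makes it easy to cross-check the Observo form fields against the Kafka cluster's own client configuration.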
Test Configuration:
Click “Save” to store the configuration settings in Observo.
Verify data ingestion from the “app-logs” topic by publishing sample JSON messages to the topic and monitoring the Analytics tab in the Observo pipeline for event counts and throughput.
Notes:
Authentication: The SASL SCRAM-SHA-512 credentials (username: datastream_user, password: D@t@Str3am2025!) are fictional but follow secure practices. DataStream Inc. must ensure these match the Kafka cluster’s configuration.
TLS Configuration: The certificate and key file paths are examples assuming a secure directory on the Observo AI server. DataStream Inc. must ensure these files are accessible and valid.
Network: The bootstrap servers use port 9093 for SSL, and firewall rules must allow traffic from Observo to the brokers. No HTTP proxies should be used, as Kafka uses a binary protocol.
Troubleshooting: If issues occur (e.g., “Not authorized” or “No data ingested”), verify the topic name, Group ID, SASL credentials, TLS settings, and network connectivity using tools like Kafka Manager, as outlined in the Troubleshooting section.
Resources: For further details, refer to the Resources section, including Apache Kafka Documentation for configuration and Kafka Security for SASL/SSL setup.
Troubleshooting
If issues arise with the Kafka source in Observo AI, use the following steps to diagnose and resolve them:
Verify Configuration Settings:
Ensure Topics, Bootstrap Servers and Group ID are correctly configured and match the Kafka cluster setup.
Confirm that the topic exists and contains data (Kafka Topic Management).
Check Authentication:
For SASL, verify that the username, password, and mechanism (e.g., PLAIN, SCRAM-SHA-256) are correct.
For SSL, ensure client certificates, keys, and CA certificates are valid and correctly configured.
Check Kafka ACLs to confirm the user has read permissions for the topic and consumer group (Kafka Security).
Monitor Logs:
Check Observo AI’s Logs tab for errors or warnings related to data ingestion.
Use Kafka’s monitoring tools (e.g., Kafka Manager or Confluent Control Center) to verify topic activity and consumer group status (Kafka Monitoring).
Validate Connectivity:
Ensure Observo AI can reach Kafka brokers on the specified ports (9092 for plaintext, 9093 for SSL).
Confirm that no HTTP proxies are used, as Kafka uses a binary protocol.
Check firewall rules or VPC configurations to allow traffic to the brokers.
Common Error Messages:
“Not authorized to access topics”: Indicates insufficient permissions. Verify Kafka ACLs or SASL/SSL credentials.
“Unknown topic or partition”: Check the Topic name and ensure it exists in the Kafka cluster.
“No data ingested”: Confirm messages are being published to the topic and the Group ID is correct. Check the Auto Offset Reset setting for offset issues.
Test Data Flow:
Verify data ingestion within the targeted Observo AI pipeline.
Use the Analytics tab in the targeted Observo AI pipeline to monitor data volume and ensure expected throughput.
| Issue | Cause | Resolution |
| --- | --- | --- |
| Data not ingested | Incorrect Topic or Group ID | Verify the topic exists and the Group ID is correct |
| “Not authorized” | Insufficient permissions | Check Kafka ACLs or credentials |
| “Unknown topic” | Topic does not exist | Confirm the topic name and create it if needed |
| Connectivity issues | Firewall or proxy issues | Allow ports 9092/9093; remove HTTP proxies |
| No data in topic | No messages published | Verify messages are sent to the topic |
Resources
For additional guidance and detailed information, refer to the following resources: