AWS S3
Integrate S3 as a data source in the pipeline. Configure the S3 bucket to send change notifications to an SQS queue. The Observo dataplane will then read messages from the SQS queue and fetch the corresponding objects from the S3 bucket.
Purpose
The Observo AI AWS S3 Source enables users to pull data from Amazon S3 buckets into Observo AI for analysis and processing, making it easier to manage large datasets. It supports formats such as JSON, CSV, and Parquet, and is designed for handling security logs and CloudTrail logs stored in S3. This integration helps organizations streamline their data pipelines, reduce storage costs, and improve efficiency in security and observability operations.
How it works
The AWS S3 source works by polling for bucket notifications published by AWS S3 to an AWS SQS queue. The messages published to the SQS queue indicate when a new object has been created in the S3 bucket. When a new notification arrives on the SQS queue, the Observo S3 source pulls the notification and processes the object it refers to. Once the object has been successfully processed through the Observo pipeline, the notification is deleted from the SQS queue. By default, the source treats each newline-separated entry in an object as a separate event in Observo.
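As an illustration, the default per-line event splitting can be sketched in Python. The function name and behavior below are a simplified stand-in, not Observo's actual implementation, which also handles decompression, framing, and multiline grouping:

```python
def object_to_events(body: bytes) -> list[str]:
    """Split a fetched S3 object into one event per newline-separated entry.

    Illustrative sketch of the default behavior described above.
    """
    text = body.decode("utf-8", errors="replace")
    # Each non-empty line becomes a separate event.
    return [line for line in text.splitlines() if line]

# A three-line object yields three events.
events = object_to_events(b'{"a":1}\n{"a":2}\n{"a":3}\n')
```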
Observo AI processes the following types of bucket notifications:
s3:ObjectCreated:*
Setup instructions for configuring S3 bucket notifications can be found here.
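The SQS messages carry AWS's standard S3 event notification JSON. A minimal sketch of extracting the bucket name and object key from such a message (the message body below is a trimmed example of AWS's documented event format; the function is illustrative):

```python
import json
from urllib.parse import unquote_plus

def parse_s3_notification(message_body: str) -> list[tuple[str, str]]:
    """Extract (bucket, key) pairs for ObjectCreated events from an S3
    event notification as delivered to SQS."""
    payload = json.loads(message_body)
    objects = []
    for record in payload.get("Records", []):
        if record.get("eventName", "").startswith("ObjectCreated"):
            bucket = record["s3"]["bucket"]["name"]
            # Object keys are URL-encoded in notifications ('+' means space).
            key = unquote_plus(record["s3"]["object"]["key"])
            objects.append((bucket, key))
    return objects

body = json.dumps({"Records": [{
    "eventName": "ObjectCreated:Put",
    "s3": {"bucket": {"name": "my-bucket"},
           "object": {"key": "logs/2024/app+log.json"}},
}]})
```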
Authentication Requirements
Permissions Required
s3:GetObject
sqs:ReceiveMessage
sqs:DeleteMessage
AWS Authentication Mechanism
Observo recommends using instance profiles or roles to authenticate with AWS. If these are not possible, you can create an AWS access key for any user in your AWS account and use the Access Key ID and Secret Access Key to authenticate. Instructions on creating access keys can be found here.
Observo evaluates AWS credentials in the following order:
The Access Key and Secret Access Key options.
The AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY environment variables in the Observo Dataplane deployment.
Web Identity Token credentials from the environment or container (including EKS). These credentials will automatically refresh when expired.
ECS credentials (IAM roles for tasks). These credentials will automatically refresh when expired.
The IAM instance profile (only works if running on an EC2 instance with an instance profile/role). Requires IMDSv2 to be enabled. For EKS, you may need to increase the metadata token response hop limit to 2. These credentials will automatically refresh when expired.
Role Definition Example
When using the Assume Role authentication method, you'll need to create an IAM role in your AWS account with the appropriate permissions and trust relationship. Below are examples of how to define this role:
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": ["s3:GetObject"],
"Resource": "arn:aws:s3:::your-bucket-name/*"
},
{
"Effect": "Allow",
"Action": ["sqs:ReceiveMessage", "sqs:DeleteMessage"],
"Resource": "arn:aws:sqs:your-region:your-account-id:your-queue-name"
}
]
}
Replace the following placeholders:
your-bucket-name: The name of your S3 bucket
your-region: The AWS region where your SQS queue is located
your-account-id: Your AWS account ID
your-queue-name: The name of your SQS queue
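If you generate this policy from a script, a small helper can substitute the placeholders. The helper below is illustrative (the values passed in the test are dummies), producing the same policy document shown above:

```python
import json

def render_policy(bucket: str, region: str, account_id: str, queue: str) -> str:
    """Build the IAM permissions policy shown above with concrete ARNs."""
    policy = {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": ["s3:GetObject"],
                "Resource": f"arn:aws:s3:::{bucket}/*",
            },
            {
                "Effect": "Allow",
                "Action": ["sqs:ReceiveMessage", "sqs:DeleteMessage"],
                "Resource": f"arn:aws:sqs:{region}:{account_id}:{queue}",
            },
        ],
    }
    return json.dumps(policy, indent=2)
```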
Trust Relationship Policy
The role must have a trust relationship that allows Observo's service to assume the role you're creating in your account. Use the following trust policy:
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {
"AWS": "ARN provided by Observo"
},
"Action": "sts:AssumeRole"
}
]
}
Prerequisites
Before configuring the AWS S3 source in Observo AI, ensure the following requirements are met to facilitate seamless data ingestion:
Observo AI Platform Setup:
The Observo AI platform must be installed and operational, with support for AWS S3 as a data source.
If processing Parquet files (.parquet, .parq, .pqt), verify that the platform supports this format, potentially requiring specific configurations.
AWS Account and Permissions:
An active AWS account with access to the target S3 buckets and SQS queues is required.
The S3 bucket must be configured to send s3:ObjectCreated:* events to an SQS queue, either directly or via an SNS topic (Amazon S3 Event Notifications).
Required IAM permissions:
For S3: s3:GetObject and s3:ListBucket.
For SQS: sqs:ReceiveMessage, sqs:DeleteMessage, sqs:ChangeMessageVisibility, sqs:GetQueueAttributes, and sqs:GetQueueUrl (Amazon SQS Permissions).
Authentication:
Prepare one of the following authentication methods:
Auto Authentication: Use IAM roles, shared credentials, environment variables, or a JSON file.
Manual Authentication: Provide an AWS access key and secret key.
Secret Authentication: Use a stored secret within Observo AI’s secure storage.
Network and Connectivity:
Ensure Observo AI can communicate with AWS services. If using VPC endpoints for S3 or SQS, verify their configuration.
Check for proxy settings or firewall rules that may affect connectivity to AWS endpoints.
In summary:
Observo AI Platform: Must be installed and support S3 sources; verify Parquet support if needed.
AWS Account: Active account with S3/SQS access; configure S3 events to SQS.
IAM Permissions: Required for S3 and SQS access; see the permissions list above.
Authentication: Auto, Manual, or Secret; prepare credentials accordingly.
Network: Connectivity to AWS services; check VPC endpoints and proxies.
Integration
To configure AWS S3 as a source in Observo AI, follow these steps to set up and test the data flow:
Log in to Observo AI:
Navigate to the Sources tab.
Click the “Add Source” button and select “Create New”.
Choose “AWS S3” from the list of available sources to begin configuration.
General Settings:
Name: A unique identifier for the source, such as s3-source-1.
Description (Optional): A description of the source.
Region: The AWS region of the S3 bucket.
Example: us-east-1
Compression: Select the compression scheme used for decompressing objects retrieved from S3, such as Gzip. Default: None
Options (select one):
Automatically determine the compression scheme: Detects the compression format based on input data.
Gzip: Widely used, DEFLATE-based, moderate compression speed.
Uncompressed: No compression applied; raw original data preserved.
ZSTD: Fast, modern compression with adjustable compression ratio.
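Automatic detection typically inspects the object's leading magic bytes. The sketch below illustrates the idea (the function is illustrative, not Observo's implementation): Gzip streams start with bytes 1f 8b, and Zstandard frames with 28 b5 2f fd:

```python
import gzip

def detect_compression(data: bytes) -> str:
    """Guess the compression scheme from the object's leading magic bytes."""
    if data[:2] == b"\x1f\x8b":
        return "gzip"
    if data[:4] == b"\x28\xb5\x2f\xfd":
        return "zstd"
    return "uncompressed"

# Compressing any payload with gzip produces the 1f 8b prefix.
sample = gzip.compress(b"hello")
```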
Authentication:
Access Key: Enter the AWS access key ID to use for assume role.
Example: AKIAIOSFODNN7EXAMPLE
Secret Access Key: Enter the AWS secret access key to use for assume role.
Example: wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY
Assume Role: Enter the IAM role to use to read events from SQS and objects from S3.
Example: arn:aws:iam::999999999999:role/dummy_role
Assume Role Region: Enter the region to be used for STS assume role.
Example: us-east-1
Credentials File Path: Specify the path to the credentials file (if you do not use an access key and secret to authenticate).
Example: /my/aws/credentials
Auth Imds Connect Timeout Seconds (Optional): Connect timeout for IMDS. Default: Empty
Auth Imds Max Attempts (Optional): Enter number of IMDS retries for fetching tokens and metadata. Default: Empty
Auth Imds Read Timeout Seconds (Optional): Read timeout for IMDS. Default: Empty
Auth Load Timeout Secs (Optional): Enter timeout for successfully loading any credentials, in seconds.
Example: 30
Auth Profile: (Optional): The credentials profile to use. Used to select AWS credentials from a provided credentials file. Default: Empty
Example: develop
SQS Configuration (Optional):
SQS Queue Url: The URL of the SQS queue to poll for bucket notifications.
Example: https://sqs.us-east-2.amazonaws.com/999999999999/DummyQueue
SQS Visibility Timeout Secs: The visibility timeout to use for messages, in seconds. This controls how long a message is left unavailable after it is received. If a message is received, and takes longer than visibility_timeout_secs to process and delete the message from the queue, it is made available again for another consumer. This can happen if there is an issue between consuming a message and deleting it. Default: Empty.
SQS Client Concurrency: Number of concurrent tasks to create for polling the queue for messages. Defaults to the number of available CPUs on the system. Should not typically need to be changed, but it can sometimes be beneficial to raise this value when there is a high rate of messages being pushed into the queue and the objects being fetched are small. In these cases, system resources may not be fully utilized without fetching more messages per second, as the SQS message consumption rate affects the S3 object retrieval rate.
Example: 5
SQS Delete Message (True): Whether to delete the message once it is processed. It can be useful to set this to false for debugging or during the initial setup.
SQS Poll Secs: How long to wait while polling the queue for new messages, in seconds. Generally should not be changed unless instructed to do so, as if messages are available, they will always be consumed, regardless of the value of poll_secs.
SQS TLS Options CA File: Absolute path to an additional CA certificate file. The certificate must be in the DER or PEM (X.509) format. Additionally, the certificate can be provided as an inline string in PEM format.
Example: /path/to/certificate_authority.crt
SQS TLS Options Crt File: Absolute path to a certificate file used to identify this server. The certificate must be in DER, PEM (X.509), or PKCS#12 format. Additionally, the certificate can be provided as an inline string in PEM format. If this is set, and is not a PKCS#12 archive, key_file must also be set.
Example: /path/to/host_certificate.crt
SQS TLS Options Key File: Absolute path to a private key file used to identify this server. The key must be in DER or PEM (PKCS#8) format. Additionally, the key can be provided as an inline string in PEM format.
Example: /path/to/host_certificate.key
SQS TLS Options Key Pass: Passphrase used to unlock the encrypted key file. This has no effect unless key_file is set.
Examples: ${KEY_PASS_ENV_VAR}, PassWord1
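The visibility-timeout behavior described above can be modeled with a toy in-memory simulation (this is an illustration of SQS semantics, not the SQS API; the clock is an integer counter rather than real time):

```python
class QueueModel:
    """Toy model of SQS visibility timeout semantics."""

    def __init__(self, visibility_timeout_secs: int):
        self.visibility_timeout_secs = visibility_timeout_secs
        self.messages = {}  # message id -> clock value when it becomes visible
        self.clock = 0

    def send(self, msg_id: str):
        self.messages[msg_id] = 0  # immediately visible

    def receive(self):
        for msg_id, visible_at in self.messages.items():
            if self.clock >= visible_at:
                # Receiving hides the message for the visibility timeout.
                self.messages[msg_id] = self.clock + self.visibility_timeout_secs
                return msg_id
        return None

    def delete(self, msg_id: str):
        self.messages.pop(msg_id, None)

q = QueueModel(visibility_timeout_secs=600)
q.send("m1")
first = q.receive()   # "m1" is delivered and hidden
second = q.receive()  # None: still within the visibility timeout
q.clock += 601        # the consumer failed to delete the message in time
third = q.receive()   # "m1" is redelivered to another consumer
```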
Multiline Settings (Optional):
Multiline Condition Pattern: Regular expression pattern that is used to determine whether or not more lines should be read. This setting must be configured in conjunction with mode.
Examples: ^[\s]+, \\$, ^(INFO|ERROR), ;$
Multiline Mode: Specifies how log lines are grouped.
Options:
Include +1: Includes all lines matching the pattern and one additional line.
Include Match: Includes all lines matching the pattern. Useful for stack traces or continuation indicators.
Stop Before: Groups all lines until a line matches the pattern, indicating the start of a new message.
Stop After: Groups all lines up to and including the line that matches the pattern, which often marks the end of a message.
This setting must be configured with condition_pattern.
Multiline Start Pattern: Regular expression pattern that is used to match the start of a new message.
Examples: ^[\s]+, \\$, ^(INFO|ERROR), ;$
Multiline Timeout Ms: The maximum amount of time to wait for the next additional line, in milliseconds. Once this timeout is reached, the buffered message is guaranteed to be flushed, even if incomplete.
Examples: 1000, 600000
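As an illustration, start-pattern grouping (each line matching the start pattern opens a new event, and following non-matching lines are appended to it) can be sketched as follows; the function is a simplified stand-in that ignores the timeout:

```python
import re

def group_multiline(lines: list[str], start_pattern: str) -> list[str]:
    """Group raw lines into events: a line matching start_pattern begins
    a new event; non-matching lines join the current one."""
    start = re.compile(start_pattern)
    events, current = [], []
    for line in lines:
        if start.search(line) and current:
            events.append("\n".join(current))
            current = []
        current.append(line)
    if current:
        events.append("\n".join(current))
    return events

lines = [
    "ERROR something failed",
    "  at frame 1",
    "  at frame 2",
    "INFO recovered",
]
events = group_multiline(lines, r"^(INFO|ERROR)")
# A stack trace and its header become one event; "INFO recovered" is another.
```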
TLS Options (Optional):
TLS Options CA File (Empty): Absolute path to an additional CA certificate file. The certificate must be in the DER or PEM (X.509) format. Additionally, the certificate can be provided as an inline string in PEM format.
Example: /path/to/certificate_authority.crt
TLS Options Crt File (Empty): Absolute path to a certificate file used to identify this server. The certificate must be in DER, PEM (X.509), or PKCS#12 format. Additionally, the certificate can be provided as an inline string in PEM format. If this is set, and is not a PKCS#12 archive, key_file must also be set.
Example: /path/to/host_certificate.crt
TLS Options Key File (Empty): Absolute path to a private key file used to identify this server. The key must be in DER or PEM (PKCS#8) format. Additionally, the key can be provided as an inline string in PEM format.
Example: /path/to/host_certificate.key
TLS Options Key Passphrase (Empty): Passphrase to unlock the encrypted key file, if applicable.
Examples: ${KEY_PASSWORD_ENV_VAR}, PassWord1
TLS Options Verify Hostname (False): Enables hostname verification. If enabled, the hostname used to connect to the remote host must be present in the TLS certificate presented by the remote host, either as the Common Name or as an entry in the Subject Alternative Name extension. Only relevant for outgoing connections. Do NOT set this to false unless you understand the risks of not verifying the remote hostname.
Framing (Optional):
Framing Delimiter: The character that delimits byte sequences. Default: Empty
Framing Max Length: The maximum length of the byte buffer. This length does not include the trailing delimiter. By default, there is no maximum length enforced. If events are malformed, this can lead to additional resource usage as events continue to be buffered in memory, and can potentially lead to memory exhaustion in extreme cases. If there is a risk of processing malformed data, such as logs with user-controlled input, consider setting the maximum length to a reasonably large value as a safety net. This will ensure that processing is not truly unbounded. Default: None
Framing Method: The framing method. Default: None
Options:
Byte Frames: Byte frames are passed through as-is according to the underlying I/O boundaries (for example, split between messages or stream segments).
Character Delimited: Byte frames which are delimited by a chosen character.
Length Delimited: Byte frames which are prefixed by an unsigned big-endian 32-bit integer indicating the length.
Newline Delimited: Byte frames which are delimited by a newline character.
Octet Counting: Byte frames according to the octet counting format.
Framing Newline Delimited Max Length: The maximum length of the byte buffer. This length does not include the trailing delimiter. By default, there is no maximum length enforced. If events are malformed, this can lead to additional resource usage as events continue to be buffered in memory, and can potentially lead to memory exhaustion in extreme cases. If there is a risk of processing malformed data, such as logs with user-controlled input, consider setting the maximum length to a reasonably large value as a safety net. This will ensure that processing is not truly unbounded. Default: None
Framing Octet Counting Max Length: The maximum length of the byte buffer. Default: Empty
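The character/newline-delimited and length-delimited methods can be sketched as follows. These are simplified illustrations (no streaming, no max-length enforcement); a real decoder keeps incomplete trailing frames buffered until more data arrives:

```python
import struct

def frame_char_delimited(buf: bytes, delimiter: bytes = b"\n") -> list[bytes]:
    """Split a byte buffer into frames on a delimiter. The delimiter is
    not part of the frame; the trailing partial frame is discarded here."""
    frames = buf.split(delimiter)
    return frames[:-1]  # last element is empty or an incomplete frame

def frame_length_delimited(buf: bytes) -> list[bytes]:
    """Split frames prefixed by an unsigned big-endian 32-bit length."""
    frames, offset = [], 0
    while offset + 4 <= len(buf):
        (length,) = struct.unpack_from(">I", buf, offset)
        if offset + 4 + length > len(buf):
            break  # incomplete frame; wait for more bytes
        frames.append(buf[offset + 4 : offset + 4 + length])
        offset += 4 + length
    return frames

newline_frames = frame_char_delimited(b"one\ntwo\nthree\n")
length_frames = frame_length_delimited(struct.pack(">I", 3) + b"abc")
```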
Advanced Settings:
Endpoint (Optional): Custom endpoint to use when not targeting standard AWS endpoints, such as for private clouds.
Example: http://127.0.0.0:5000/path/to/service
Parser Config:
Enable Source Log Parser (False):
Toggle the Enable Source Log Parser switch to enable it.
Select the appropriate parser from the Source Log Parser dropdown.
Add additional parsers as needed.
Pattern Extractor:
See Pattern Extractor for details.
Archival Destination:
Toggle the Enable Archival on Source switch to enable it.
Under Archival Destination, select from the list of Archival Destinations (Required)
Save and Test Configuration:
Save the configuration settings.
Send sample data and verify that it is ingested from the S3 bucket.
Example Scenarios
HealthCarePlus, a fictitious healthcare enterprise, operates a network of hospitals and telehealth services, generating vast amounts of patient data, medical logs, and compliance records stored in Amazon S3 buckets in JSON and Parquet formats. To enhance observability and ensure regulatory compliance, HealthCarePlus aims to ingest these logs, including patient interaction data and audit trails, into the Observo AI platform for real-time analysis and monitoring. The S3 bucket, healthcareplus-patient-logs, is configured to send s3:ObjectCreated:* events to an AWS SQS queue, healthcareplus-log-queue, to notify Observo AI of new data. A dedicated IAM role with necessary permissions ensures secure access to the bucket and queue. The configuration below outlines the steps to set up the AWS S3 source in Observo AI, adhering to the required fields specified in the Integration section of the provided document.
Standard AWS S3 Source Setup
Here is a standard AWS S3 Source configuration example. Only the required sections and their associated field updates are displayed in the table below:
General Settings
Name: healthcareplus-s3-logs (unique identifier for the S3 source)
Description: Ingest patient and compliance logs from HealthCarePlus S3 bucket (optional description of the source)
Region: us-east-1 (AWS region where the S3 bucket is located)
Compression: Gzip (decompresses Gzip-compressed objects retrieved from S3)
Authentication
Access Key: AKIAHEALTHCAREPLUS123 (AWS access key ID for authentication)
Secret Access Key: wJalrXUtnHEALTHCAREPLUSKEY (AWS secret access key for authentication)
Assume Role: arn:aws:iam::123456789012:role/healthcareplus-observo-role (IAM role for accessing SQS and S3)
Assume Role Region: us-east-1 (region for STS assume role)
Credentials File Path: /opt/observo/credentials/healthcareplus-credentials (path to credentials file; optional if using an access key)
Auth Imds Connect Timeout Seconds: 5 (connect timeout for IMDS; optional)
Auth Imds Max Attempts: 3 (number of IMDS retries for fetching tokens)
Auth Imds Read Timeout Seconds: 5 (read timeout for IMDS; optional)
Auth Load Timeout Secs: 30 (timeout for loading credentials)
Auth Profile: healthcare-prod (credentials profile from the credentials file)
SQS Configuration
SQS Queue Url: https://sqs.us-east-1.amazonaws.com/123456789012/healthcareplus-log-queue (URL of the SQS queue for bucket notifications)
SQS Visibility Timeout Secs: 600 (time in seconds a message is unavailable after being received)
SQS Client Concurrency: 5 (number of concurrent tasks for polling the queue)
SQS Delete Message: True (deletes messages after processing)
SQS Poll Secs: 10 (time in seconds to wait while polling for new messages)
SQS TLS Options CA File: /opt/observo/certs/ca.crt (path to the CA certificate file for SQS TLS)
SQS TLS Options Crt File: /opt/observo/certs/healthcareplus.crt (path to the certificate file for SQS TLS)
SQS TLS Options Key File: /opt/observo/certs/healthcareplus.key (path to the private key file for SQS TLS)
SQS TLS Options Key Pass: HealthCare2025 (passphrase to unlock the encrypted key file)
Multiline Settings
Multiline Condition Pattern: ^(INFO|ERROR) (regular expression to identify log lines for multiline grouping)
Multiline Mode: Include Match (includes all lines matching the pattern, suitable for log entries)
Multiline Start Pattern: ^(INFO|ERROR) (regular expression to mark the start of a new message)
Multiline Timeout Ms: 1000 (maximum wait time in milliseconds for additional lines)
TLS Options
TLS Options CA File: /opt/observo/certs/ca.crt (path to the CA certificate file for TLS)
TLS Options Crt File: /opt/observo/certs/healthcareplus.crt (path to the certificate file for TLS)
TLS Options Key File: /opt/observo/certs/healthcareplus.key (path to the private key file for TLS)
TLS Options Key Passphrase: HealthCare2025 (passphrase to unlock the encrypted key file)
TLS Options Verify Hostname: True (verifies the hostname in the TLS certificate)
Framing
Framing Delimiter: \n (newline character to delimit byte sequences)
Framing Max Length: 1048576 (maximum byte buffer length, 1 MB, to prevent memory issues)
Framing Method: Newline Delimited (frames data by newline characters)
Framing Newline Delimited Max Length: 1048576 (maximum length for newline-delimited frames)
Framing Octet Counting Max Length: Empty (not used because newline-delimited framing is selected)
Advanced Settings
Endpoint: None (uses standard AWS endpoints; optional)
Additional Configuration
Parser Config: Enable Source Log Parser and select the JSON parser for patient and compliance logs, with an additional Parquet parser for structured data.
Pattern Extractor: Configure as per Observo AI’s Pattern Extractor documentation to extract fields like patient ID, timestamp, and event type from JSON and Parquet logs.
Archival Destination: Enable Archival on Source Switch and select healthcareplus-archive-bucket as the archival destination for compliance retention.
Save and Test: Save the configuration and upload sample JSON and Parquet files to the healthcareplus-patient-logs bucket. Verify data ingestion in the Observo AI Analytics tab to confirm successful setup.
Outcome
With this configuration, HealthCarePlus successfully ingests patient interaction data and compliance logs from its S3 bucket into Observo AI, enabling real-time monitoring of healthcare operations, anomaly detection in patient data, and adherence to regulatory requirements, thereby improving operational efficiency and patient care quality.
Troubleshooting
If issues arise with the AWS S3 source in Observo AI, use the following steps to diagnose and resolve them:
Verify Configuration Settings:
Ensure all fields, such as the SQS queue URL, Region, and filename filter, are correctly entered and match the AWS setup.
Confirm that the S3 bucket is configured to send s3:ObjectCreated:* events to the specified SQS queue (Amazon S3 Event Notifications).
Check Authentication:
Verify the authentication method:
For Auto authentication, ensure IAM roles, shared credentials, or environment variables are correctly configured.
For Manual authentication, check that the access key and secret key are valid.
For Secret authentication, confirm the secret is accessible in Observo AI.
Validate Permissions:
Ensure the credentials have the required permissions:
S3: s3:GetObject, s3:ListBucket.
SQS: sqs:ReceiveMessage, sqs:DeleteMessage, sqs:ChangeMessageVisibility, sqs:GetQueueAttributes, sqs:GetQueueUrl (Amazon SQS Permissions).
Network and Connectivity:
Check for firewall rules, VPC endpoint configurations, or proxy settings that may block access to AWS services.
Test connectivity using the AWS CLI with similar proxy configurations to verify access to S3 and SQS.
Common Error Messages:
“Inaccessible host”: May indicate TLS version mismatches such as TLS 1.3 issues or DNS problems. Ensure the host supports the required TLS version and check DNS settings.
“Missing credentials”: Verify that the authentication method is correctly configured. For IAM roles, ensure the role is assumed correctly.
“Bucket does not exist”: Check the bucket name and ensure there are no certificate validation issues. Consider disabling “Reject unauthorized certificates” or adding CA certificates if needed.
Monitor Logs and Data:
Verify that data is being ingested by monitoring the SQS queue and S3 bucket activity.
Use the Analytics tab in the targeted Observo AI pipeline to monitor data volume and ensure expected throughput.
Common issues at a glance:
Data not ingested: incorrect queue URL or bucket configuration. Verify S3 event notifications to SQS.
Authentication errors: invalid credentials or role. Check the authentication method and permissions.
Connectivity issues: firewall or proxy blocking access. Test network connectivity and VPC endpoints.
“Inaccessible host”: TLS or DNS issues. Ensure TLS compatibility and check DNS.
“Missing credentials”: authentication misconfiguration. Verify IAM roles or manual credentials.
“Bucket does not exist”: incorrect bucket name or certificate issues. Check the bucket name and certificate settings.
Resources
For additional guidance and detailed information, refer to the following resources:
Best Practices:
Refer to general best practices for integrating S3 with data streaming platforms, such as optimizing event notifications and file filtering.