AWS SQS
Integrate AWS SQS (Simple Queue Service) as a data source in the pipeline. The Observo dataplane will continuously poll the specified SQS queue, retrieve messages, process them through the pipeline, and delete them upon successful completion.
Purpose
The Observo AI AWS SQS Source enables users to ingest message-based data from Amazon SQS queues into Observo AI for real-time analysis and processing. It supports various message formats including JSON, plain text, and structured logs, making it ideal for handling application events, system notifications, and log streams delivered through SQS. This integration helps organizations build scalable event-driven architectures, streamline data pipelines, and improve operational visibility across distributed systems.
How it works
The AWS SQS source operates by continuously polling an AWS SQS queue for available messages. When messages are present in the queue, the Observo dataplane retrieves them using long polling for efficiency, processes each message through the configured Observo pipeline, and subsequently deletes the message from the queue upon successful processing. By default, each message is treated as an individual event in Observo, though various framing methods allow parsing of message content into multiple log entries or events when needed.
The source relies on SQS's visibility timeout mechanism for reliable message processing. When a message is received, it becomes temporarily invisible to other consumers. If processing completes successfully within the timeout period, the message is deleted. If processing fails or exceeds the timeout, the message automatically becomes available again for reprocessing. This prevents data loss and, under normal conditions, avoids duplicate processing; a message whose processing outlasts the timeout may be delivered more than once.
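The receive, process, and delete cycle described above can be illustrated with a small in-memory model. This is a sketch only: `InMemoryQueue` and `poll_once` are hypothetical names, time is passed in explicitly as `now`, and nothing here calls the real SQS API or reflects Observo's internal implementation.

```python
import itertools


class InMemoryQueue:
    """Toy stand-in for an SQS queue (hypothetical, not the real SQS API)."""

    def __init__(self, visibility_timeout_secs):
        self.visibility_timeout_secs = visibility_timeout_secs
        self._messages = {}         # message id -> body
        self._invisible_until = {}  # message id -> time it becomes visible again
        self._ids = itertools.count(1)

    def send(self, body):
        msg_id = next(self._ids)
        self._messages[msg_id] = body
        self._invisible_until[msg_id] = 0.0
        return msg_id

    def receive(self, now):
        """Return the first visible message and hide it for the timeout period."""
        for msg_id, body in self._messages.items():
            if self._invisible_until[msg_id] <= now:
                self._invisible_until[msg_id] = now + self.visibility_timeout_secs
                return msg_id, body
        return None

    def delete(self, msg_id):
        self._messages.pop(msg_id, None)
        self._invisible_until.pop(msg_id, None)


def poll_once(queue, process, now):
    """Receive one message, process it, and delete it only on success."""
    received = queue.receive(now)
    if received is None:
        return "empty"
    msg_id, body = received
    try:
        process(body)
    except Exception:
        return "failed"  # message stays in the queue and reappears after the timeout
    queue.delete(msg_id)
    return "deleted"


def failing(body):
    raise ValueError("simulated pipeline error")


def succeeding(body):
    return None


queue = InMemoryQueue(visibility_timeout_secs=300)
queue.send('{"event": "login"}')

first = poll_once(queue, failing, now=0)         # processing fails
hidden = poll_once(queue, succeeding, now=100)   # message is still invisible
retried = poll_once(queue, succeeding, now=301)  # visible again, now succeeds
after = poll_once(queue, succeeding, now=302)    # queue is empty
print(first, hidden, retried, after)
```

In this model the failed message is not lost: it is simply hidden until the timeout passes and is then picked up again.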
Authentication Requirements
Permissions Required
sqs:ReceiveMessage
sqs:DeleteMessage
AWS Authentication Mechanism
Observo recommends using instance profiles or roles to authenticate with AWS. If neither is possible, you can create an AWS access key for any user in your AWS account and use the Access Key ID and Secret Access Key to authenticate. Instructions on creating the access keys can be found here.
Observo evaluates AWS credentials in the following order:
The Access Key and Secret Access Key options.
The AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY environment variables in the Observo Dataplane deployment.
Web Identity Token credentials from the environment or container (including EKS). These credentials will automatically refresh when expired.
ECS credentials (IAM roles for tasks). These credentials will automatically refresh when expired.
The IAM instance profile (only works if running on an EC2 instance with an instance profile/role). Requires IMDSv2 to be enabled. For EKS, you may need to increase the metadata token response hop limit to 2. These credentials will automatically refresh when expired.
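The evaluation order above behaves like a first-match chain. The sketch below illustrates that idea only. `pick_credential_source` is a hypothetical helper. The environment variables checked for web identity and ECS are the ones AWS SDKs conventionally use, and it is an assumption that the Observo dataplane detects them in the same way.

```python
def pick_credential_source(options, env, on_ec2_with_profile=False):
    """Return the name of the first credential source that appears configured.

    The order mirrors the list above; the detection checks are simplified
    assumptions, not Observo's actual logic.
    """
    # 1. Access Key and Secret Access Key options set on the source.
    if options.get("access_key") and options.get("secret_access_key"):
        return "configured access key"
    # 2. Static keys from the dataplane's environment.
    if env.get("AWS_ACCESS_KEY_ID") and env.get("AWS_SECRET_ACCESS_KEY"):
        return "environment variables"
    # 3. Web identity token (for example, IAM roles for service accounts on EKS).
    if env.get("AWS_WEB_IDENTITY_TOKEN_FILE") and env.get("AWS_ROLE_ARN"):
        return "web identity token"
    # 4. ECS task role credentials.
    if env.get("AWS_CONTAINER_CREDENTIALS_RELATIVE_URI") or env.get(
        "AWS_CONTAINER_CREDENTIALS_FULL_URI"
    ):
        return "ecs task role"
    # 5. EC2 instance profile (requires IMDSv2 to be reachable).
    if on_ec2_with_profile:
        return "instance profile"
    return None


# Explicit options win even when environment variables are also present.
chosen = pick_credential_source(
    {"access_key": "AKIAIOSFODNN7EXAMPLE", "secret_access_key": "example-secret"},
    {"AWS_ACCESS_KEY_ID": "other", "AWS_SECRET_ACCESS_KEY": "other"},
)
print(chosen)
```

A practical consequence of this ordering is that leftover static keys in the source configuration or the environment take precedence over an instance profile or role.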
Role Definition Example
When using the Assume Role authentication method, you'll need to create an IAM role in your AWS account with the appropriate permissions and trust relationship. Below are examples of how to define this role:
Replace the following placeholders:
your-region: The AWS region where your SQS queue is located
your-account-id: Your AWS account ID
your-queue-name: The name of your SQS queue
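As a sketch of how these placeholders fit together, the snippet below assembles a permissions policy document for a single queue and prints it as JSON. `build_sqs_policy` is a hypothetical helper. The default actions are the two listed under Permissions Required; the additional actions named in the Prerequisites section can be passed in if your setup needs them.

```python
import json


def build_sqs_policy(region, account_id, queue_name, actions=None):
    """Build an IAM permissions policy scoped to one SQS queue."""
    if actions is None:
        # Minimal set from "Permissions Required".
        actions = ["sqs:ReceiveMessage", "sqs:DeleteMessage"]
    queue_arn = f"arn:aws:sqs:{region}:{account_id}:{queue_name}"
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": list(actions),
                "Resource": queue_arn,
            }
        ],
    }


# Placeholders are left as-is; substitute your own values.
policy = build_sqs_policy("your-region", "your-account-id", "your-queue-name")
print(json.dumps(policy, indent=2))
```

Scoping `Resource` to the queue ARN, rather than `*`, limits the role to the one queue the source polls.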
Trust Relationship Policy
The role must have a trust relationship that allows Observo's service to assume the role you're creating in your account. Use the following trust policy:
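The principal that must be trusted is specific to your Observo deployment and is not given here. The sketch below shows only the general shape of an IAM trust policy. `build_trust_policy` is a hypothetical helper and the principal ARN is a placeholder, not Observo's real principal. The optional external ID condition corresponds to the External Id setting of the source.

```python
import json


def build_trust_policy(principal_arn, external_id=None):
    """Build an IAM trust policy that lets one principal assume the role."""
    statement = {
        "Effect": "Allow",
        "Principal": {"AWS": principal_arn},
        "Action": "sts:AssumeRole",
    }
    if external_id is not None:
        # Only callers that present this external ID may assume the role.
        statement["Condition"] = {"StringEquals": {"sts:ExternalId": external_id}}
    return {"Version": "2012-10-17", "Statement": [statement]}


# PLACEHOLDER principal: replace with the principal supplied for your deployment.
trust = build_trust_policy(
    "arn:aws:iam::111111111111:role/placeholder-principal",
    external_id="12345",
)
print(json.dumps(trust, indent=2))
```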
Prerequisites
Before configuring the AWS SQS source in Observo AI, ensure the following requirements are met to facilitate seamless data ingestion:
AWS Account and Permissions:
An active AWS account with access to the target SQS queues is required.
The SQS queue must be configured and actively receiving messages from your applications or services.
Required IAM permissions:
For SQS: sqs:ReceiveMessage, sqs:DeleteMessage, sqs:ChangeMessageVisibility, sqs:GetQueueAttributes, and sqs:GetQueueUrl (Amazon SQS Permissions).
Authentication:
Prepare one of the following authentication methods:
Auto Authentication: Use IAM roles, shared credentials, environment variables, or a JSON file.
Manual Authentication: Provide an AWS access key and secret key.
Secret Authentication: Use a stored secret within Observo AI's secure storage.
Network and Connectivity:
Ensure Observo AI can communicate with AWS SQS endpoints. If using VPC endpoints for SQS, verify their configuration.
Check for proxy settings or firewall rules that may affect connectivity to AWS SQS endpoints.
Requirement | Details | Notes
Observo AI Platform | Must be installed and support SQS | Verify throughput capabilities
AWS Account | Active account with SQS access | Queue must be actively receiving messages
IAM Permissions | Required for SQS access | See permissions list above
Authentication | Auto, Manual, or Secret | Prepare credentials accordingly
Network | Connectivity to AWS SQS endpoints | Check VPC endpoints and proxies
Integration
To configure AWS SQS as a source in Observo AI, follow these steps to set up and test the data flow:
Log in to Observo AI:
Navigate to the Sources tab.
Click the "Add Source" button and select "Create New".
Choose "AWS SQS" from the list of available sources to begin configuration.
General Settings:
Name: Provide a unique identifier for the source, e.g., sqs-security-events.
Description (Optional): Add a brief description of the source's purpose.
SQS Queue Url: The URL of the SQS queue to poll for messages.
Example: https://sqs.us-east-2.amazonaws.com/999999999999/DummyQueue
Region: The AWS region where the SQS queue exists.
Example: us-east-1
Visibility Timeout Secs (Optional): The visibility timeout to use for messages, in seconds. This controls how long a message stays unavailable to other consumers after it is received. If a received message is not processed and deleted within visibility_timeout_secs, it becomes available again for another consumer. This can happen if there is an issue between consuming a message and deleting it. Default: 300
SQS Delete Message (Optional): Whether to delete the message once it is processed. It can be useful to set this to false for debugging or during the initial setup. Default: True
SQS Poll Secs (Optional): How long to wait while polling the queue for new messages, in seconds. This generally should not be changed unless instructed to do so: when messages are available, they are always consumed, regardless of the value of poll_secs. Default: 15
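Because the region is embedded in a standard SQS queue URL, a quick consistency check between SQS Queue Url and Region can catch a common misconfiguration before saving. The sketch below uses hypothetical helper names and assumes the standard `https://sqs.<region>.amazonaws.com/<account-id>/<queue-name>` URL form; custom endpoints will not match this pattern.

```python
from urllib.parse import urlparse


def parse_queue_url(queue_url):
    """Split a standard SQS queue URL into region, account ID, and queue name."""
    parsed = urlparse(queue_url)
    host_parts = (parsed.hostname or "").split(".")
    path_parts = parsed.path.strip("/").split("/")
    if len(host_parts) < 4 or host_parts[0] != "sqs" or len(path_parts) != 2:
        raise ValueError(f"not a standard SQS queue URL: {queue_url}")
    return {
        "region": host_parts[1],
        "account_id": path_parts[0],
        "queue_name": path_parts[1],
    }


def region_matches(queue_url, configured_region):
    """Return True when the Region setting agrees with the queue URL."""
    return parse_queue_url(queue_url)["region"] == configured_region


info = parse_queue_url("https://sqs.us-east-2.amazonaws.com/999999999999/DummyQueue")
print(info)
print(region_matches("https://sqs.us-east-2.amazonaws.com/999999999999/DummyQueue", "us-east-1"))
```

With the example values shown in the field descriptions, the URL points at us-east-2, so a Region of us-east-1 would be flagged as a mismatch.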
Authentication:
Access Key: Enter the AWS access key ID to use when assuming the role.
Example: AKIAIOSFODNN7EXAMPLE
Secret Access Key: Enter the AWS secret access key to use when assuming the role.
Example: wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY
Assume Role: Enter the IAM role to use to read events from SQS.
Example: arn:aws:iam::999999999999:role/dummy_role
Assume Role Region: Enter the region to use for the STS assume-role call.
Example: us-east-1
External Id: The external ID to use when assuming a role.
Example: 12345
Credentials File Path: Specify the path to the credentials file (if you do not use an access key and secret to authenticate).
Example: /my/aws/credentials
Auth Imds Connect Timeout Seconds (Optional): Connect timeout for IMDS. Default: Empty
Auth Imds Max Attempts (Optional): Enter number of IMDS retries for fetching tokens and metadata. Default: Empty
Auth Imds Read Timeout Seconds (Optional): Read timeout for IMDS. Default: Empty
Auth Load Timeout Secs (Optional): Enter timeout for successfully loading any credentials, in seconds.
Example: 30
Auth Profile (Optional): The credentials profile to use. Used to select AWS credentials from a provided credentials file. Default: Empty
Example: develop
Framing (Optional):
Framing Delimiter: The character that delimits byte sequences. Default: Empty
Framing Max Length: The maximum length of the byte buffer. This length does not include the trailing delimiter. By default, there is no maximum length enforced. If events are malformed, this can lead to additional resource usage as events continue to be buffered in memory, and can potentially lead to memory exhaustion in extreme cases. If there is a risk of processing malformed data, such as logs with user-controlled input, consider setting the maximum length to a reasonably large value as a safety net. This will ensure that processing is not truly unbounded. Default: None
Framing Method: The framing method. Default: None
Option | Description
Byte Frames | Byte frames are passed through as-is according to the underlying I/O boundaries (for example, split between messages or stream segments).
Character Delimited | Byte frames which are delimited by a chosen character.
Length Delimited | Byte frames which are prefixed by an unsigned big-endian 32-bit integer indicating the length.
Newline Delimited | Byte frames which are delimited by a newline character.
Octet Counting | Byte frames according to the octet counting format.
Framing Newline Delimited Max Length: The maximum length of the byte buffer. This length does not include the trailing delimiter. By default, there is no maximum length enforced. If events are malformed, this can lead to additional resource usage as events continue to be buffered in memory, and can potentially lead to memory exhaustion in extreme cases. If there is a risk of processing malformed data, such as logs with user-controlled input, consider setting the maximum length to a reasonably large value as a safety net. This will ensure that processing is not truly unbounded. Default: None
Framing Octet Counting Max Length: The maximum length of the byte buffer. Default: Empty
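To make the framing methods concrete, the sketch below splits a message body into frames using simplified versions of four of them. The function names are hypothetical and this is not the dataplane's implementation. Two assumptions are made: frames longer than the maximum length are dropped (the settings above do not specify what happens to them), and octet counting uses the decimal length, space, then payload layout.

```python
import struct


def frame_character_delimited(data, delimiter, max_length=None):
    """Split on a delimiter; the trailing delimiter is not part of the frame."""
    frames = data.split(delimiter)
    if frames and frames[-1] == b"":
        frames.pop()  # input ended with the delimiter
    if max_length is not None:
        # Assumption: oversized frames are dropped rather than truncated.
        frames = [frame for frame in frames if len(frame) <= max_length]
    return frames


def frame_newline_delimited(data, max_length=None):
    """Newline framing is character framing with a newline delimiter."""
    return frame_character_delimited(data, b"\n", max_length)


def frame_length_delimited(data):
    """Each frame is prefixed by an unsigned big-endian 32-bit length."""
    frames, offset = [], 0
    while offset + 4 <= len(data):
        (length,) = struct.unpack_from(">I", data, offset)
        offset += 4
        frames.append(data[offset:offset + length])
        offset += length
    return frames


def frame_octet_counting(data):
    """Each frame is '<decimal length><space><payload>'."""
    frames, offset = [], 0
    while offset < len(data):
        space = data.index(b" ", offset)
        length = int(data[offset:space])
        start = space + 1
        frames.append(data[start:start + length])
        offset = start + length
    return frames


newline_frames = frame_newline_delimited(b"first\nsecond\n")
limited_frames = frame_newline_delimited(b"ok\nmuch too long\n", max_length=5)
length_frames = frame_length_delimited(
    struct.pack(">I", 2) + b"hi" + struct.pack(">I", 3) + b"abc"
)
octet_frames = frame_octet_counting(b"5 hello3 abc")
print(newline_frames, limited_frames, length_frames, octet_frames)
```

With newline framing, a single SQS message containing several log lines becomes several events, which is the case the framing options are meant to handle.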
TLS Options (Optional):
TLS Options CA File: Absolute path to an additional CA certificate file. The certificate must be in the DER or PEM (X.509) format. Additionally, the certificate can be provided as an inline string in PEM format. Default: Empty
Example: /path/to/certificate_authority.crt
TLS Options Crt File: Absolute path to a certificate file used to identify this server. The certificate must be in DER, PEM (X.509), or PKCS#12 format. Additionally, the certificate can be provided as an inline string in PEM format. If this is set, and is not a PKCS#12 archive, key_file must also be set. Default: Empty
Example: /path/to/host_certificate.crt
TLS Options Key File: Absolute path to a private key file used to identify this server. The key must be in DER or PEM (PKCS#8) format. Additionally, the key can be provided as an inline string in PEM format. Default: Empty
Example: /path/to/host_certificate.key
TLS Options Key Passphrase: Passphrase to unlock the encrypted key file, if applicable. Default: Empty
Examples: ${KEY_PASSWORD_ENV_VAR}, PassWord1
TLS Options Verify Hostname: Enables hostname verification. If enabled, the hostname used to connect to the remote host must be present in the TLS certificate presented by the remote host, either as the Common Name or as an entry in the Subject Alternative Name extension. Only relevant for outgoing connections. Do NOT set this to false unless you understand the risks of not verifying the remote hostname. Default: False
Advanced Settings:
Endpoint (Optional): Custom endpoint if not using standard AWS such as for private clouds or AWS-compatible services.
Example: http://127.0.0.0:5000/path/to/service
SQS Client Concurrency (Optional): Number of concurrent tasks to create for polling the queue for messages. Defaults to the number of available CPUs on the system. Should not typically need to be changed, but it can sometimes be beneficial to raise this value when there is a high rate of messages being pushed into the queue and the messages being processed are small. In these cases, system resources may not be fully utilized without fetching more messages per second.
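The effect of the concurrency setting can be pictured as several workers draining the same queue at once. The sketch below is a simplified model using threads and an in-memory queue. `drain_with_workers` is a hypothetical name, and the default of one worker per CPU mirrors the description above rather than any verified internal behavior.

```python
import os
import queue
import threading


def drain_with_workers(messages, handle, concurrency=None):
    """Process all messages using a fixed number of concurrent workers."""
    # Default to the number of available CPUs, as the setting describes.
    workers = concurrency or os.cpu_count() or 1
    pending = queue.Queue()
    for message in messages:
        pending.put(message)

    results = []
    results_lock = threading.Lock()

    def worker():
        while True:
            try:
                message = pending.get_nowait()
            except queue.Empty:
                return  # nothing left to fetch
            outcome = handle(message)
            with results_lock:
                results.append(outcome)

    threads = [threading.Thread(target=worker) for _ in range(workers)]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    return results


processed = drain_with_workers(["a", "b", "c", "d"], str.upper, concurrency=2)
print(sorted(processed))
```

Completion order is not deterministic with more than one worker, which is why the results are sorted before printing.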
Parser Config:
Enable Source Log Parser: Disabled by default (False).
Toggle the Enable Source Log Parser switch to enable it.
Select the appropriate parser from the Source Log Parser dropdown.
Add additional parsers as needed.
Pattern Extractor:
See Pattern Extractor for details.
Archival Destination:
Toggle the Enable Archival on Source switch to enable archival.
Under Archival Destination, select from the list of Archival Destinations (Required)
Save and Test Configuration:
Save the configuration settings.
Send sample messages to the SQS queue and verify that they are ingested and processed correctly.
Troubleshooting
If issues arise with the AWS SQS source in Observo AI, use the following steps to diagnose and resolve them:
Verify Configuration Settings:
Ensure all fields such as SQS Queue Url, Region, and Visibility Timeout are correctly entered and match the AWS setup.
Confirm that the SQS queue is actively receiving messages and that applications are publishing to the correct queue.
Check Authentication:
Verify the authentication method:
For Auto authentication, ensure IAM roles, shared credentials, or environment variables are correctly configured.
For Manual authentication, check that the access key and secret key are valid and have not expired.
For Secret authentication, confirm the secret is accessible in Observo AI.
Validate Permissions:
Ensure the credentials have the required permissions:
SQS: sqs:ReceiveMessage, sqs:DeleteMessage, sqs:ChangeMessageVisibility, sqs:GetQueueAttributes, sqs:GetQueueUrl (Amazon SQS Permissions).
Test permissions using the AWS CLI to verify that the credentials can receive and delete messages from the queue.
Network and Connectivity:
Check for firewall rules, VPC endpoint configurations, or proxy settings that may block access to AWS SQS.
Test connectivity using the AWS CLI with similar proxy configurations to verify access to SQS endpoints.
If using VPC endpoints, ensure they are properly configured for the SQS service.
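As a first connectivity check from the host running the dataplane, a plain TCP connection to the regional SQS endpoint on port 443 can separate DNS and firewall problems from credential problems. The helpers below are a hypothetical sketch. They assume the standard public endpoint naming `sqs.<region>.amazonaws.com` and do not go through an HTTP proxy, so a failure behind a mandatory proxy is expected.

```python
import socket


def sqs_endpoint_host(region):
    """Return the standard public SQS endpoint host name for a region."""
    return f"sqs.{region}.amazonaws.com"


def can_reach(host, port=443, timeout_secs=5.0):
    """Return True if a TCP connection to host:port can be opened."""
    try:
        with socket.create_connection((host, port), timeout=timeout_secs):
            return True
    except OSError:
        # Covers DNS resolution failures, refused connections, and timeouts.
        return False


host = sqs_endpoint_host("us-east-1")
print(host)
# To run the check from the dataplane host:
#   print(can_reach(host))
```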
Common Error Messages:
"Inaccessible host": May indicate TLS version mismatches such as TLS 1.3 issues or DNS problems. Ensure the host supports the required TLS version and check DNS settings.
"Missing credentials": Verify that the authentication method is correctly configured. For IAM roles, ensure the role is assumed correctly and has the necessary trust relationship.
"Queue does not exist": Check the queue URL for typos and ensure the queue exists in the specified region. Verify there are no certificate validation issues.
"Access Denied": Confirm that the IAM policy attached to the role or user includes the required SQS permissions for the specific queue ARN.
Monitor Queue Activity:
Use the Analytics tab in the targeted Observo AI pipeline to monitor data volume and ensure expected throughput.
Message Processing Issues:
If messages are not being deleted, verify that SQS Delete Message is set to True and that processing completes within the visibility timeout period.
Increase Visibility Timeout Secs if processing takes longer than the current setting to prevent premature message reappearance.
Review Observo AI logs for parsing errors that might indicate message format issues or framing configuration problems.
Issue | Possible Cause | Resolution
Messages not ingested | Incorrect queue URL or region | Verify queue URL and region configuration
Authentication errors | Invalid credentials or expired keys | Check authentication method and renew if needed
Connectivity issues | Firewall or proxy blocking access | Test network connectivity and VPC endpoints
"Inaccessible host" | TLS or DNS issues | Ensure TLS compatibility and check DNS
"Missing credentials" | Authentication misconfiguration | Verify IAM roles or manual credentials
"Queue does not exist" | Incorrect queue URL or certificate issues | Check queue URL and certificate settings
"Access Denied" | Insufficient IAM permissions | Review and update IAM policy for queue access
Messages reappear in queue | Visibility timeout too short | Increase visibility timeout duration
High processing latency | Insufficient client concurrency | Increase SQS Client Concurrency setting
Memory issues | Malformed messages without max length limits | Set appropriate Framing Max Length values
Resources
For additional guidance and detailed information, refer to the following resources:
Best Practices:
Configure Dead Letter Queues (DLQ) on your SQS queue to capture messages that fail processing repeatedly.
Monitor queue depth and age of oldest message to detect processing bottlenecks early.
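A dead letter queue is attached to a source queue through the queue's RedrivePolicy attribute, which is a JSON string naming the DLQ's ARN and the number of receives allowed before a message is moved. The sketch below only builds that string. `build_redrive_policy` is a hypothetical helper, the DLQ ARN is a placeholder, and the default of 5 receives is an illustrative assumption rather than a recommendation from Observo.

```python
import json


def build_redrive_policy(dlq_arn, max_receive_count=5):
    """Build the JSON string used as an SQS queue's RedrivePolicy attribute."""
    if max_receive_count < 1:
        raise ValueError("max_receive_count must be at least 1")
    return json.dumps(
        {
            "deadLetterTargetArn": dlq_arn,
            "maxReceiveCount": max_receive_count,
        }
    )


# Placeholder ARN; substitute the ARN of your own dead letter queue.
redrive_policy = build_redrive_policy(
    "arn:aws:sqs:your-region:your-account-id:your-queue-name-dlq"
)
print(redrive_policy)
```

Each failed processing attempt that lets the visibility timeout expire counts as one receive, so a very low count can move messages to the DLQ after only brief pipeline interruptions.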

