AWS SQS

Integrate AWS SQS (Simple Queue Service) as a data source in the pipeline. The Observo dataplane will continuously poll the specified SQS queue, retrieve messages, process them through the pipeline, and delete them upon successful completion.

Purpose

The Observo AI AWS SQS Source enables users to ingest message-based data from Amazon SQS queues into Observo AI for real-time analysis and processing. It supports various message formats including JSON, plain text, and structured logs, making it ideal for handling application events, system notifications, and log streams delivered through SQS. This integration helps organizations build scalable event-driven architectures, streamline data pipelines, and improve operational visibility across distributed systems.

How it works

The AWS SQS source operates by continuously polling an AWS SQS queue for available messages. When messages are present in the queue, the Observo dataplane retrieves them using long polling for efficiency, processes each message through the configured Observo pipeline, and subsequently deletes the message from the queue upon successful processing. By default, each message is treated as an individual event in Observo, though various framing methods allow parsing of message content into multiple log entries or events when needed.

The source leverages SQS's visibility timeout mechanism to ensure reliable message processing. When a message is received, it becomes temporarily invisible to other consumers. If processing completes successfully within the timeout period, the message is deleted. If processing fails or exceeds the timeout, the message automatically becomes available again for reprocessing, ensuring no data loss while preventing duplicate processing under normal conditions.
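The receive, process, and delete cycle described above can be illustrated with a small in-memory model. This is a sketch of the visibility timeout semantics only, not the Observo implementation or the AWS API; `ToyQueue` and `poll_once` are hypothetical names introduced for illustration.

```python
import itertools
from dataclasses import dataclass, field


@dataclass
class ToyQueue:
    """In-memory stand-in for an SQS queue (illustration only, not the AWS API)."""

    visibility_timeout: float = 300.0
    _messages: dict = field(default_factory=dict)         # message id -> body
    _invisible_until: dict = field(default_factory=dict)  # message id -> time
    _ids: itertools.count = field(default_factory=itertools.count)

    def send(self, body):
        msg_id = next(self._ids)
        self._messages[msg_id] = body
        self._invisible_until[msg_id] = 0.0
        return msg_id

    def receive(self, now):
        """Return visible messages and hide each one for visibility_timeout seconds."""
        batch = []
        for msg_id, body in self._messages.items():
            if self._invisible_until[msg_id] <= now:
                self._invisible_until[msg_id] = now + self.visibility_timeout
                batch.append((msg_id, body))
        return batch

    def delete(self, msg_id):
        self._messages.pop(msg_id, None)
        self._invisible_until.pop(msg_id, None)


def poll_once(queue, now, process):
    """Receive, process, and delete on success; failed messages are left in place."""
    handled = []
    for msg_id, body in queue.receive(now):
        try:
            process(body)
        except Exception:
            continue  # not deleted, so it becomes visible again after the timeout
        queue.delete(msg_id)
        handled.append(body)
    return handled
```

In this model a message whose processing fails is simply not deleted, so a later poll at or after the timeout receives it again, which mirrors the reprocessing behavior described above.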

Authentication Requirements

Permissions Required

sqs:ReceiveMessage
sqs:DeleteMessage

AWS Authentication Mechanism

Observo recommends authenticating with AWS through instance profiles or IAM roles. If neither is possible, create an AWS access key for a user in your AWS account and use the Access Key ID and Secret Access Key to authenticate. Instructions for creating access keys are available in the AWS IAM documentation.

Observo evaluates AWS credentials in the following order:

  1. The Access Key and Secret Access Key options.

  2. The AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY environment variables in the Observo Dataplane deployment.

  3. Web Identity Token credentials from the environment or container (including EKS). These credentials will automatically refresh when expired.

  4. ECS credentials (IAM roles for tasks). These credentials will automatically refresh when expired.

  5. The IAM instance profile (only works if running on an EC2 instance with an instance profile/role). Requires IMDSv2 to be enabled. For EKS, you may need to increase the metadata token response hop limit to 2. These credentials will automatically refresh when expired.
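The precedence above can be sketched as a first-match lookup. This is an illustration of the ordering only. How the dataplane actually detects each source is not documented here, so the environment variable checks for steps 3 and 4 (the standard AWS SDK variable names) and the `on_ec2` flag are assumptions, and `pick_credential_source` is a hypothetical helper.

```python
import os


def pick_credential_source(access_key=None, secret_key=None, env=None, on_ec2=False):
    """Return the first credential source that applies, following the documented order."""
    env = os.environ if env is None else env

    # 1. Access Key and Secret Access Key options set on the source.
    if access_key and secret_key:
        return "configured access key"
    # 2. Environment variables in the dataplane deployment.
    if env.get("AWS_ACCESS_KEY_ID") and env.get("AWS_SECRET_ACCESS_KEY"):
        return "environment variables"
    # 3. Web identity token (assumed to be detected via the standard SDK variables).
    if env.get("AWS_WEB_IDENTITY_TOKEN_FILE") and env.get("AWS_ROLE_ARN"):
        return "web identity token"
    # 4. ECS task role (assumed to be detected via the container credentials variables).
    if env.get("AWS_CONTAINER_CREDENTIALS_RELATIVE_URI") or env.get(
        "AWS_CONTAINER_CREDENTIALS_FULL_URI"
    ):
        return "ECS task role"
    # 5. EC2 instance profile, which requires IMDSv2.
    if on_ec2:
        return "EC2 instance profile"
    return None
```

A practical consequence of this ordering is that static keys set on the source or in the environment take priority over any role attached to the pod, task, or instance.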

Role Definition Example

When using the Assume Role authentication method, you'll need to create an IAM role in your AWS account with the appropriate permissions and trust relationship. Below are examples of how to define this role:

Replace the following placeholders:

  • your-region: The AWS region where your SQS queue is located

  • your-account-id: Your AWS account ID

  • your-queue-name: The name of your SQS queue
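As a minimal sketch of a permissions policy that uses these placeholders, the snippet below builds a policy document granting only the two permissions listed under Permissions Required. The helper name `sqs_read_policy` is hypothetical, and the statement layout is an assumption based on the standard IAM policy format rather than a policy published by Observo.

```python
import json


def sqs_read_policy(region, account_id, queue_name):
    """Build an IAM policy document allowing receive and delete on one queue."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": ["sqs:ReceiveMessage", "sqs:DeleteMessage"],
                # SQS queue ARN format: arn:aws:sqs:<region>:<account-id>:<queue-name>
                "Resource": f"arn:aws:sqs:{region}:{account_id}:{queue_name}",
            }
        ],
    }


# Print the policy with the placeholders left in place for manual substitution.
print(json.dumps(sqs_read_policy("your-region", "your-account-id", "your-queue-name"), indent=2))
```

Scoping `Resource` to a single queue ARN keeps the role limited to the queue the source actually polls.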

Trust Relationship Policy

The role must have a trust relationship that allows Observo's service to assume the role you're creating in your account. Use the following trust policy:
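The general shape of such a trust policy can be sketched as follows. The principal that Observo uses is not given in this section, so `<observo-principal-arn>` is a placeholder that must be replaced with the value provided by Observo. The `trust_policy` helper is hypothetical, and the optional external ID condition is an assumption that corresponds to the External Id setting of the source.

```python
import json


def trust_policy(principal_arn, external_id=None):
    """Build a trust policy allowing one principal to assume the role."""
    statement = {
        "Effect": "Allow",
        "Principal": {"AWS": principal_arn},
        "Action": "sts:AssumeRole",
    }
    if external_id is not None:
        # Require the caller to present the matching external ID.
        statement["Condition"] = {"StringEquals": {"sts:ExternalId": external_id}}
    return {"Version": "2012-10-17", "Statement": [statement]}


# Placeholder principal: replace with the ARN supplied by Observo.
print(json.dumps(trust_policy("<observo-principal-arn>", external_id="12345"), indent=2))
```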

Prerequisites

Before configuring the AWS SQS source in Observo AI, ensure the following requirements are met to facilitate seamless data ingestion:

  • AWS Account and Permissions:

    • An active AWS account with access to the target SQS queues is required.

    • The SQS queue must be configured and actively receiving messages from your applications or services.

    • Required IAM permissions: sqs:ReceiveMessage and sqs:DeleteMessage.

  • Authentication:

    • Prepare one of the following authentication methods:

      • Auto Authentication: Use IAM roles, shared credentials, environment variables, or a JSON file.

      • Manual Authentication: Provide an AWS access key and secret key.

      • Secret Authentication: Use a stored secret within Observo AI's secure storage.

  • Network and Connectivity:

    • Ensure Observo AI can communicate with AWS SQS endpoints. If using VPC endpoints for SQS, verify their configuration.

    • Check for proxy settings or firewall rules that may affect connectivity to AWS SQS endpoints.

| Prerequisite | Description | Notes |
|---|---|---|
| Observo AI Platform | Must be installed and support SQS | Verify throughput capabilities |
| AWS Account | Active account with SQS access | Queue must be actively receiving |
| IAM Permissions | Required for SQS access | See permissions list above |
| Authentication | Auto, Manual, or Secret | Prepare credentials accordingly |
| Network | Connectivity to AWS SQS endpoints | Check VPC endpoints and proxies |

Integration

To configure AWS SQS as a source in Observo AI, follow these steps to set up and test the data flow:

  1. Log in to Observo AI:

    • Navigate to Sources Tab

    • Click on "Add Source" button and select "Create New"

    • Choose "AWS SQS" from the list of available sources to begin configuration.

  2. General Settings:

    • Name: Provide a unique identifier for the source, e.g., sqs-security-events.

    • Description (Optional): Add a brief description of the source's purpose.

    • SQS Queue Url: The URL of the SQS queue to poll for messages.

      Example

      https://sqs.us-east-2.amazonaws.com/999999999999/DummyQueue

    • Region: The AWS region where the SQS queue exists.

      Example

      us-east-1

    • Visibility Timeout Secs (Optional): The visibility timeout to use for messages, in seconds. This controls how long a message stays hidden from other consumers after it is received. If processing and deleting a message takes longer than visibility_timeout_secs, the message becomes available again to other consumers. This can happen if there is an issue between consuming a message and deleting it. Default: 300

    • SQS Delete Message (Optional): Whether to delete the message once it is processed. It can be useful to set this to false for debugging or during the initial setup. Default: True

    • SQS Poll Secs (Optional): How long to wait while polling the queue for new messages, in seconds. This value generally should not be changed unless you are instructed to do so, because available messages are always consumed regardless of the value of poll_secs. Default: 15
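Because the queue URL embeds the region, a quick check before saving can catch a mismatch between SQS Queue Url and Region. The sketch below is a hypothetical pre-flight helper, not part of Observo. It assumes the standard `https://sqs.<region>.amazonaws.com/<account-id>/<queue-name>` URL layout, so custom endpoints and other URL forms are rejected.

```python
import re
from urllib.parse import urlparse

# Standard regional SQS hostname, e.g. sqs.us-east-2.amazonaws.com
_SQS_HOST = re.compile(r"^sqs\.([a-z0-9-]+)\.amazonaws\.com$")


def parse_queue_url(url):
    """Split a standard SQS queue URL into region, account ID, and queue name."""
    parts = urlparse(url)
    match = _SQS_HOST.match(parts.hostname or "")
    segments = parts.path.strip("/").split("/")
    if parts.scheme != "https" or not match or len(segments) != 2:
        raise ValueError(f"not a standard SQS queue URL: {url!r}")
    account_id, queue_name = segments
    return {"region": match.group(1), "account_id": account_id, "queue_name": queue_name}


def region_matches(url, region):
    """Return True when the Region setting agrees with the region in the URL."""
    return parse_queue_url(url)["region"] == region
```

For the example values shown in this step, `region_matches("https://sqs.us-east-2.amazonaws.com/999999999999/DummyQueue", "us-east-1")` returns `False`, since the example URL points at us-east-2 while the example region is us-east-1. The two examples are independent illustrations, but in a real configuration the values should agree.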

  3. Authentication:

    • Access Key: Enter the AWS access key ID to use when assuming the role.

      Example

      AKIAIOSFODNN7EXAMPLE

    • Secret Access Key: Enter the AWS secret access key to use when assuming the role.

      Example

      wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY

    • Assume Role: Enter the IAM role to use to read events from SQS.

      Example

      arn:aws:iam::999999999999:role/dummy_role

    • Assume Role Region: Enter the region to be used for STS assume role.

      Example

      us-east-1

    • External Id: The external ID to use when assuming a role.

      Example

      12345

    • Credentials File Path: Specify the path to the credentials file, if you do not authenticate with an access key and secret.

      Example

      /my/aws/credentials

    • Auth Imds Connect Timeout Seconds (Optional): Connect timeout for IMDS. Default: Empty

    • Auth Imds Max Attempts (Optional): Enter number of IMDS retries for fetching tokens and metadata. Default: Empty

    • Auth Imds Read Timeout Seconds (Optional): Read timeout for IMDS. Default: Empty

    • Auth Load Timeout Secs (Optional): Enter timeout for successfully loading any credentials, in seconds.

      Example

      30

    • Auth Profile (Optional): The credentials profile to use. Used to select AWS credentials from a provided credentials file. Default: Empty

      Example

      develop

  4. Framing (Optional):

    • Framing Delimiter: The character that delimits byte sequences. Default: Empty

    • Framing Max Length: The maximum length of the byte buffer. This length does not include the trailing delimiter. By default, there is no maximum length enforced. If events are malformed, this can lead to additional resource usage as events continue to be buffered in memory, and can potentially lead to memory exhaustion in extreme cases. If there is a risk of processing malformed data, such as logs with user-controlled input, consider setting the maximum length to a reasonably large value as a safety net. This will ensure that processing is not truly unbounded. Default: None

    • Framing Method: The framing method. Default: None

      Options
      Description

      Byte Frames

      Byte frames are passed through as-is according to the underlying I/O boundaries (for example, split between messages or stream segments).

      Character Delimited

      Byte frames which are delimited by a chosen character.

      Length Delimited

      Byte frames which are prefixed by an unsigned big-endian 32-bit integer indicating the length.

      Newline Delimited

      Byte frames which are delimited by a newline character.

      Octet Counting

      Byte frames according to the octet counting format.

    • Framing Newline Delimited Max Length: The maximum length of the byte buffer. This length does not include the trailing delimiter. By default, there is no maximum length enforced. If events are malformed, this can lead to additional resource usage as events continue to be buffered in memory, and can potentially lead to memory exhaustion in extreme cases. If there is a risk of processing malformed data, such as logs with user-controlled input, consider setting the maximum length to a reasonably large value as a safety net. This will ensure that processing is not truly unbounded. Default: None

    • Framing Octet Counting Max Length: The maximum length of the byte buffer. Default: Empty
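The framing methods above can be illustrated with a few small decoders that split one message body into frames. This is a simplified sketch under stated assumptions, not the dataplane's decoder. The function names are hypothetical, empty frames are skipped, and frames longer than `max_length` are dropped, which may differ from how the product handles those cases. The octet counting decoder assumes the `<length><space><payload>` layout of RFC 6587.

```python
import struct


def split_delimited(data, delimiter=b"\n", max_length=None):
    """Character or newline delimited framing: split on a delimiter byte."""
    frames = []
    for frame in data.split(delimiter):
        if not frame:
            continue  # skip empty frames, e.g. after a trailing delimiter
        if max_length is not None and len(frame) > max_length:
            continue  # assumption: oversized frames are dropped
        frames.append(frame)
    return frames


def split_length_delimited(data):
    """Length delimited framing: each frame has a 4-byte big-endian length prefix."""
    frames, offset = [], 0
    while offset + 4 <= len(data):
        (length,) = struct.unpack_from(">I", data, offset)
        offset += 4
        if offset + length > len(data):
            raise ValueError("truncated frame")
        frames.append(data[offset:offset + length])
        offset += length
    if offset != len(data):
        raise ValueError("trailing bytes after last frame")
    return frames


def split_octet_counting(data):
    """Octet counting framing: '<length> <payload>' repeated back to back."""
    frames, offset = [], 0
    while offset < len(data):
        space = data.index(b" ", offset)    # raises ValueError if no separator
        length = int(data[offset:space])    # decimal byte count of the payload
        start = space + 1
        if start + length > len(data):
            raise ValueError("truncated frame")
        frames.append(data[start:start + length])
        offset = start + length
    return frames
```

With newline framing, a single SQS message containing several log lines yields one event per line instead of one event for the whole message.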

  5. TLS Options (Optional):

    • TLS Options CA File (Optional): Absolute path to an additional CA certificate file. The certificate must be in the DER or PEM (X.509) format. Additionally, the certificate can be provided as an inline string in PEM format. Default: Empty

      Example

      /path/to/certificate_authority.crt

    • TLS Options Crt File (Optional): Absolute path to a certificate file used to identify this server. The certificate must be in DER, PEM (X.509), or PKCS#12 format. Additionally, the certificate can be provided as an inline string in PEM format. If this is set, and is not a PKCS#12 archive, key_file must also be set. Default: Empty

      Example

      /path/to/host_certificate.crt

    • TLS Options Key File (Optional): Absolute path to a private key file used to identify this server. The key must be in DER or PEM (PKCS#8) format. Additionally, the key can be provided as an inline string in PEM format. Default: Empty

      Example

      /path/to/host_certificate.key

    • TLS Options Key Passphrase (Optional): Passphrase to unlock the encrypted key file, if applicable. Default: Empty

      Example

      ${KEY_PASSWORD_ENV_VAR}

      PassWord1

    • TLS Options Verify Hostname (Optional): Enables hostname verification. If enabled, the hostname used to connect to the remote host must be present in the TLS certificate presented by the remote host, either as the Common Name or as an entry in the Subject Alternative Name extension. Only relevant for outgoing connections. Do NOT set this to false unless you understand the risks of not verifying the remote hostname. Default: False

  6. Advanced Settings:

    • Endpoint (Optional): Custom endpoint to use instead of the standard AWS endpoint, for example for private clouds or AWS-compatible services.

      Example

      http://127.0.0.1:5000/path/to/service

    • SQS Client Concurrency (Optional): Number of concurrent tasks to create for polling the queue for messages. Defaults to the number of available CPUs on the system. Should not typically need to be changed, but it can sometimes be beneficial to raise this value when there is a high rate of messages being pushed into the queue and the messages being processed are small. In these cases, system resources may not be fully utilized without fetching more messages per second.

  7. Parser Config:

    • Enable Source Log parser: Toggle this switch to enable parsing of source logs. Default: False

      • Select the appropriate parser from the Source Log Parser dropdown.

      • Add additional parsers as needed.

  8. Pattern Extractor:

  9. Archival Destination:

    • Toggle Enable Archival on Source Switch to enable

    • Under Archival Destination, select from the list of Archival Destinations (Required)

  10. Save and Test Configuration:

    • Save the configuration settings.

    • Send sample messages to the SQS queue and verify that they are ingested and processed correctly.
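One way to send sample messages is a short script using boto3, the AWS SDK for Python. The sketch below is illustrative only. The helper names and the sample payload fields are hypothetical, boto3 must be installed separately, and the credentials used need `sqs:SendMessage` on the queue, which is not one of the permissions the source itself requires.

```python
import json
import uuid


def build_sample_entries(count=3):
    """Build entries for one SendMessageBatch call (SQS allows at most 10 per batch)."""
    if not 1 <= count <= 10:
        raise ValueError("count must be between 1 and 10")
    return [
        {
            "Id": str(i),  # must be unique within the batch
            "MessageBody": json.dumps(
                {"sample_id": str(uuid.uuid4()), "seq": i, "message": "observo sqs source test"}
            ),
        }
        for i in range(count)
    ]


def send_samples(queue_url, region, count=3):
    """Send sample messages and return the list of failed entries, if any."""
    import boto3  # third-party dependency, imported lazily

    client = boto3.client("sqs", region_name=region)
    response = client.send_message_batch(QueueUrl=queue_url, Entries=build_sample_entries(count))
    return response.get("Failed", [])
```

Calling `send_samples` with the configured queue URL and region should return an empty list on success. The `sample_id` values can then be searched for in the pipeline output to confirm ingestion.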

Troubleshooting

If issues arise with the AWS SQS source in Observo AI, use the following steps to diagnose and resolve them:

  • Verify Configuration Settings:

    • Ensure all fields such as SQS Queue Url, Region, and Visibility Timeout are correctly entered and match the AWS setup.

    • Confirm that the SQS queue is actively receiving messages and that applications are publishing to the correct queue.

  • Check Authentication:

    • Verify the authentication method:

      • For Auto authentication, ensure IAM roles, shared credentials, or environment variables are correctly configured.

      • For Manual authentication, check that the access key and secret key are valid and have not expired.

      • For Secret authentication, confirm the secret is accessible in Observo AI.

  • Validate Permissions:

    • Ensure the credentials have the required permissions: sqs:ReceiveMessage and sqs:DeleteMessage.

    • Test permissions using the AWS CLI to verify that the credentials can receive and delete messages from the queue.

  • Network and Connectivity:

    • Check for firewall rules, VPC endpoint configurations, or proxy settings that may block access to AWS SQS.

    • Test connectivity using the AWS CLI with similar proxy configurations to verify access to SQS endpoints.

    • If using VPC endpoints, ensure they are properly configured for the SQS service.

  • Common Error Messages:

    • "Inaccessible host": May indicate TLS version mismatches such as TLS 1.3 issues or DNS problems. Ensure the host supports the required TLS version and check DNS settings.

    • "Missing credentials": Verify that the authentication method is correctly configured. For IAM roles, ensure the role is assumed correctly and has the necessary trust relationship.

    • "Queue does not exist": Check the queue URL for typos and ensure the queue exists in the specified region. Verify there are no certificate validation issues.

    • "Access Denied": Confirm that the IAM policy attached to the role or user includes the required SQS permissions for the specific queue ARN.

  • Monitor Queue Activity:

    • Use the Analytics tab in the targeted Observo AI pipeline to monitor data volume and ensure expected throughput.

  • Message Processing Issues:

    • If messages are not being deleted, verify that SQS Delete Message is set to True and that processing completes within the visibility timeout period.

    • Increase Visibility Timeout Secs if processing takes longer than the current setting to prevent premature message reappearance.

    • Review Observo AI logs for parsing errors that might indicate message format issues or framing configuration problems.

| Issue | Possible Cause | Resolution |
|---|---|---|
| Messages not ingested | Incorrect queue URL or region | Verify queue URL and region configuration |
| Authentication errors | Invalid credentials or expired keys | Check authentication method and renew if needed |
| Connectivity issues | Firewall or proxy blocking access | Test network connectivity and VPC endpoints |
| "Inaccessible host" | TLS or DNS issues | Ensure TLS compatibility and check DNS |
| "Missing credentials" | Authentication misconfiguration | Verify IAM roles or manual credentials |
| "Queue does not exist" | Incorrect queue URL or certificate issues | Check queue URL and certificate settings |
| "Access Denied" | Insufficient IAM permissions | Review and update IAM policy for queue access |
| Messages reappear in queue | Visibility timeout too short | Increase visibility timeout duration |
| High processing latency | Insufficient client concurrency | Increase SQS Client Concurrency setting |
| Memory issues | Malformed messages without max length limits | Set appropriate Framing Max Length values |
