GCP GCS

Fetch observability events from GCP’s GCS system. Configure the GCS bucket to send change notifications to an PubSub topic. The Observo dataplane will then read messages from the PubSub topic and fetch the corresponding objects from the GCS bucket.

Purpose

The purpose of the Observo AI Google Cloud Storage (GCS) Collector source is to enable users to ingest data from Google Cloud Storage buckets into the Observo AI platform for analysis and processing. It supports formats like JSON, CSV, Parquet, and plain text, facilitating the collection of data such as logs, metrics, or events. This integration helps organizations streamline data pipelines, enhance observability, and support use cases like monitoring, analytics, and security by processing data from GCS in real time or through scheduled ingestion.

Prerequisites

Before configuring the Google Cloud Storage (GCS) Collector source in Observo AI, ensure the following requirements are met to facilitate seamless data ingestion:

  • Observo AI Platform Setup:

    • The Observo AI platform must be installed and operational, with support for GCS as a data source.

    • Verify that the platform supports common data formats such as JSON, CSV, Parquet, or plain text. Additional formats may require specific parser configurations.

  • Google Cloud Storage Bucket Access:

    • An active Google Cloud Storage bucket with data to be ingested is required.

    • The bucket must be configured to send OBJECT_FINALIZE events to a Google Cloud Pub/Sub topic, which Observo AI will use to poll for new objects.

    • Obtain the bucket name and Google Cloud project ID from the Google Cloud Console.

  • Authentication:

    • Prepare the following authentication method:

      • Service Account Credentials: Obtain a service account JSON key file with permissions to read from the target bucket and subscribe to the Pub/Sub topic.

      • Required permissions: Include storage.objects.get, storage.objects.list, and pubsub.subscriptions.consume. For details, see Google Cloud Access Control.

  • Network and Connectivity:

    • Ensure Observo AI can communicate with Google Cloud Storage (storage.googleapis.com) and Pub/Sub (pubsub.googleapis.com) endpoints.

    • Check for proxy settings, firewall rules, or VPC Service Controls that may affect connectivity to Google Cloud services.

Prerequisite
Description
Notes

Observo AI Platform

Must be installed and support GCS sources

Verify support for JSON, CSV, Parquet, etc.; additional parsers may be needed

GCS Bucket

Active bucket with Pub/Sub notifications

Configure OBJECT_FINALIZE events to Pub/Sub topic

Authentication

Service Account JSON key

Ensure permissions for bucket read and Pub/Sub subscription

Network

Connectivity to GCS and Pub/Sub endpoints

Check VPC Service Controls, proxies, and firewalls

Integration

To configure GCP GCS as a source in Observo AI, follow these steps to set up and test the data flow:

  1. Log in to Observo AI:

    • Navigate to the Sources tab.

    • Click the Add Source button and select Create New.

    • Choose GCP GCS from the list of available sources to begin configuration.

  2. General Settings:

    • Name: A unique identifier for the source, such as gcs-source-1.

    • Description (Optional): Provide a description for the source.

    • Project: The project name from which to pull logs , such as my-gcs-bucket.

    • Decoding (Optional): The codec to use for decoding events. Default: Bytes

      Options
      Description

      Bytes

      Encodes data as raw binary byte stream

      JSON

      Human-readable structured text using key-value pairs

      Native

      GCP-specific optimized encoding for internal efficiency

      Native JSON

      Combines GCP-native features with JSON readability

  3. PubSub Configuration:

    • Subscription: The subscription within the project which is configured to receive logs.

    • Credentials Path: Path to a [service account] credentials JSON file. Either an API key, or a path to a service account credentials JSON file can be specified. If both are unset, the GOOGLE_APPLICATION_CREDENTIALS environment variable is checked for a filename. If no filename is named, an attempt is made to fetch an instance service account for the compute instance the program is running on. If this is not on a GCE instance, then you must define it with an API key or service account credentials JSON file.

      Example

      /my/path/credentials.json

    • Acknowledgement deadline: The acknowledgement deadline, in seconds, to use for this stream. Messages that are not acknowledged when this deadline expires may be retransmitted. Default: 600

    • Endpoint: The endpoint from which to pull data. Default: https://pubsub.googleapis.com

      Example

      https://us-central1-pubsub.googleapis.com

  • Full response size: The number of messages in a response to mark a stream as “busy”. This is used to determine if more streams should be started. The GCP Pub/Sub servers send responses with 100 or more messages when the subscription is busy. Default: 100

  1. Multiline Settings (Optional):

    • Multiline Condition Pattern: Regular expression pattern that is used to determine whether or not more lines should be read. This setting must be configured in conjunction with mode.

      Example

      ^[\s]+

      \\$

      ^(INFO

      ;$

    • Multiline Mode: Specifies how log lines are grouped.

      Options
      Description

      Include +1

      Includes all lines matching the pattern and one additional line.

      Include Match

      Includes all lines matching the pattern. Useful for stack traces or continuation indicators

      Stop Before

      Groups all lines until a line matches the pattern, indicating the start of a new message

      Stop After

      Groups all lines up to and including the line that matches the pattern, which often marks the end of a message

      This setting must be configured with condition_pattern.

    • Multiline Start Pattern: Regular expression pattern that is used to match the start of a new message.

      Example

      ^[\s]+

      \\$

      ^(INFO

      ;$

    • Multiline Timeout Ms: The maximum amount of time to wait for the next additional line, in milliseconds. Once this timeout is reached, the buffered message is guaranteed to be flushed, even if incomplete.

      Examples

      1000

      600000

  2. Proxy:

    • Enable proxy (True): Enables proxying support.

    • HTTP proxy endpoint (Optional): Proxy endpoint to use when proxying HTTP traffic.

      Example

      http://foo.bar:3128

    • HTTPS proxy endpoint (Optional): Proxy endpoint to use when proxying HTTPS traffic.

      Example

      https://foo.bar:3128

    • Host list to disable proxying (Add as needed): A list of hosts to avoid proxying. Multiple patterns are allowed. Pattern Example match Domain names example.com matches requests to example.com Wildcard domains .example.com matches requests to example.com and its subdomains IP addresses 127.0.0.1 matches requests to 127.0.0.1 CIDR blocks 192.168.0.0/16 matches requests to any IP addresses in this range Splat * matches all hosts.

      Example

      http://foo.bar:3128

  3. Framing (Optional):

    • Framing Delimiter: The character that delimits byte sequences. Default: Empty

    • Framing Max Length: The maximum length of the byte buffer. This length does not include the trailing delimiter. By default, there is no maximum length enforced. If events are malformed, this can lead to additional resource usage as events continue to be buffered in memory, and can potentially lead to memory exhaustion in extreme cases. If there is a risk of processing malformed data, such as logs with user-controlled input, consider setting the maximum length to a reasonably large value as a safety net. This will ensure that processing is not truly unbounded. Default: None

    • Framing Method: The framing method. Default: None

      Options
      Description

      Byte Frames

      Byte frames are passed through as-is according to the underlying I/O boundaries (for example, split between messages or stream segments).

      Character Delimited

      Byte frames which are delimited by a chosen character.

      Length Delimited

      Byte frames which are prefixed by an unsigned big-endian 32-bit integer indicating the length.

      Newline Delimited

      Byte frames which are delimited by a newline character.

      Octet Counting

      Byte frames according to the octet counting format.

    • Framing Newline Delimited Max Length: The maximum length of the byte buffer. This length does not include the trailing delimiter. By default, there is no maximum length enforced. If events are malformed, this can lead to additional resource usage as events continue to be buffered in memory, and can potentially lead to memory exhaustion in extreme cases. If there is a risk of processing malformed data, such as logs with user-controlled input, consider setting the maximum length to a reasonably large value as a safety net. This will ensure that processing is not truly unbounded. Default: None

    • Framing Octet Counting Max Length: The maximum length of the byte buffer. Default: Empty

  4. TLS Configuration (Optional):

    • TLS CA File (Empty): Absolute path to an additional CA certificate file.

      Example

      /path/to/certificate_authority.crt

    • TLS Crt File (Empty): Absolute path to a certificate file used to identify this server.

      Example

      /path/to/host_certificate.crt

    • TLS Key File (Empty): Absolute path to a private key file used to identify this server.

      Example

      /path/to/host_certificate.key

    • TLS Key Pass (Empty): Passphrase used to unlock the encrypted key file. This has no effect unless key_file is set.

      Examples

      ${KEY_PASS_ENV_VAR}

      PassWord1

    • Verify TLS certificate (True): Enables certificate verification. If enabled, certificates must not be expired and must be issued by a trusted issuer. This verification operates in a hierarchical manner, checking that the leaf certificate (the certificate presented by the client/server) is not only valid, but that the issuer of that certificate is also valid, and so on until the verification process reaches a root certificate. Relevant for both incoming and outgoing connections. Do NOT set this to false unless you understand the risks of not verifying the validity of certificates

    • TLS Verify Hostname(True): Enables hostname verification. If enabled, the hostname used to connect to the remote host must be present in the TLS certificate presented by the remote host, either as the Common Name or as an entry in the Subject Alternative Name extension. Only relevant for outgoing connections. Do NOT set this to false unless you understand the risks of not verifying the remote hostname.

  5. Advanced Settings:

    • API key (Optional): Either an API key or a path to a service account credentials JSON file can be specified. If both are unset, the GOOGLE_APPLICATION_CREDENTIALS environment variable is checked for a filename. If no filename is named, an attempt is made to fetch an instance service account for the compute instance the program is running on. If this is not on a GCE instance, then you must define it with an API key or service account credentials JSON file.

    • Keepalive Seconds: The amount of time, in seconds, with no received activity before sending a keepalive request. If this is set larger than 60, you may see periodic errors sent from the server. Default: 60

    • Max concurrency: The maximum number of concurrent stream connections to open at once. Default: 10.

    • Poll time: How often to poll the currently active streams to see if they are all busy and so open a new stream. Default: 30

    • Retry delay: The amount of time, in seconds, to wait between retry attempts after an error. Default: Empty

  6. Parser Config:

    • Enable Source Log Parser: Disabled by default. Toggle to enable and select an appropriate parser for CrowdStrike data such as JSON parser from the Source Log Parser dropdown.

    • Add additional parsers as needed for specific data formats.

  7. Pattern Extractor:

    • Refer to Observo AI’s Pattern Extractor documentation for details on configuring pattern-based data extraction.

  8. Archival Destination:

    • Toggle Enable Archival on Source Switch to enable

    • Under Archival Destination, select from the list of Archival Destinations (Required)

  9. Save and Test Configuration:

    • Save the configuration settings in Observo AI.

    • Send sample data to the GCS bucket and verify ingestion in the Analytics tab to confirm data flow.

Example Scenarios

RetailRiser, a fictitious global retail enterprise, seeks to enhance its observability and analytics by ingesting JSON and CSV transactional logs, customer interaction data, and inventory metrics from its Google Cloud Storage (GCS) bucket, retailriser-transaction-logs, into the Observo AI platform to monitor real-time sales trends, detect customer behavior anomalies, and optimize inventory management. The bucket is configured to send OBJECT_FINALIZE events to a Google Cloud Pub/Sub topic, retailriser-log-events, with a service account ensuring secure access. The following configuration steps, adhering to the required fields in the Observo AI Integration section, outline the setup for seamless data ingestion.

Standard GCP GCS Source Setup

Here is a standard GCP GCS Source configuration example. Only the required sections and their associated field updates are displayed in the table below:

General Settings

Field
Value
Description

Name

retailriser-gcs-logs

Unique identifier for the GCS source

Description

Ingest transactional and customer data from RetailRiser's GCS bucket

Optional description of the source

Project

retailriser-project-2025

Google Cloud project ID containing the GCS bucket

Decoding

JSON

Decodes events as JSON for structured data processing

PubSub Configuration

Field
Value
Description

Subscription

projects/retailriser-project-2025/subscriptions/retailriser-log-sub

Pub/Sub subscription for receiving OBJECT_FINALIZE events

Credentials Path

/opt/observo/credentials/retailriser-service-account.json

Path to the service account JSON key file

Acknowledgement Deadline

600

Time (in seconds) for message acknowledgement to prevent retransmission

Endpoint

https://us-central1-pubsub.googleapis.com

Pub/Sub endpoint for data retrieval

Full Response Size

100

Number of messages indicating a busy stream

Multiline Settings

Field
Value
Description

Multiline Condition Pattern

^(INFO|ERROR)

Regular expression to identify log lines for multiline grouping

Multiline Mode

Include Match

Includes all lines matching the pattern, suitable for log entries

Multiline Start Pattern

^(INFO|ERROR)

Regular expression to mark the start of a new message

Multiline Timeout Ms

1000

Maximum wait time (in milliseconds) for additional lines

Proxy

Field
Value
Description

Enable Proxy

True

Enables proxy support for network traffic

HTTP Proxy Endpoint

http://proxy.retailriser.com:3128

Proxy endpoint for HTTP traffic

HTTPS Proxy Endpoint

https://proxy.retailriser.com:3128

Proxy endpoint for HTTPS traffic

Host List to Disable Proxying

*.retailriser.com

Avoids proxying for internal RetailRiser domains

Framing

Field
Value
Description

Framing Delimiter

\n

Newline character to delimit byte sequences

Framing Max Length

1048576

Maximum byte buffer length (1MB) to prevent memory issues

Framing Method

Newline Delimited

Frames data by newline characters

Framing Newline Delimited Max Length

1048576

Maximum length for newline-delimited frames

Framing Octet Counting Max Length

Empty

Not used as newline-delimited framing is selected

TLS Configuration

Field
Value
Description

TLS CA File

/opt/observo/certs/ca.crt

Path to the CA certificate file

TLS Crt File

/opt/observo/certs/retailriser.crt

Path to the server certificate file

TLS Key File

/opt/observo/certs/retailriser.key

Path to the private key file

TLS Key Pass

RetailRiser2025

Passphrase to unlock the encrypted key file

Verify TLS Certificate

True

Enables certificate verification for security

TLS Verify Hostname

True

Verifies the hostname in the TLS certificate

Advanced Settings

Field
Value
Description

API Key

None

Uses service account credentials instead

Keepalive Seconds

60

Time before sending a keepalive request

Max Concurrency

10

Maximum concurrent stream connections

Poll Time

30

Frequency (in seconds) to poll active streams

Retry Delay

5

Time (in seconds) between retry attempts after errors

Additional Configuration

  • Parser Config: Enable Source Log Parser and select the JSON parser to process transactional and customer data logs.

  • Pattern Extractor: Configure as per Observo AI’s Pattern Extractor documentation to extract fields like transaction ID, customer ID, and timestamp from JSON logs.

  • Archival Destination: Enable Archival on Source Switch and select retailriser-archive-bucket as the archival destination for long-term storage.

  • Save and Test: Save the configuration and upload sample JSON files to retailriser-transaction-logs bucket. Verify data ingestion in the Observo AI Analytics tab to confirm successful setup.

Outcome

With this configuration, RetailRiser successfully ingests transactional and customer data from its GCS bucket into Observo AI. The setup enables real-time monitoring of sales trends, anomaly detection in customer interactions, and optimized inventory management, enhancing RetailRiser's operational efficiency and decision-making capabilities.

Troubleshooting

If issues arise with the GCS source in Observo AI, use the following steps to diagnose and resolve them:

  • Verify Configuration Settings:

    • Ensure all fields, such as bucket name, Pub/Sub subscription URL, and parser settings, are correctly entered and match the Google Cloud setup.

    • Confirm that the bucket is configured to send OBJECT_FINALIZE events to the specified Pub/Sub topic.

  • Check Authentication:

    • Verify that the service account JSON key is valid and not expired.

    • Ensure the service account has the required permissions: storage.objects.get, storage.objects.list, and pubsub.subscriptions.consume.

  • Validate Permissions:

    • Confirm that the service account has access to the bucket and Pub/Sub subscription.

    • Check IAM policies in the Google Cloud Console to ensure correct permissions are assigned.

  • Network and Connectivity:

    • Check for firewall rules, proxy settings, or VPC Service Controls that may block access to storage.googleapis.com or pubsub.googleapis.com.

    • Test connectivity using the gcloud CLI or tools like curl with similar proxy configurations to verify access to GCS and Pub/Sub.

  • Common Error Messages:

    • "Inaccessible host": May indicate DNS issues or firewall restrictions. Ensure endpoints are reachable and check DNS settings.

    • "Missing credentials": Verify that the service account JSON key is correctly configured and accessible.

    • "Bucket does not exist": Check the bucket name and ensure there are no certificate validation issues. Consider adding CA certificates if needed.

  • Monitor Logs and Data:

    • Verify that data is being ingested by monitoring Pub/Sub subscription and GCS bucket activity.

    • Use the Analytics tab in the targeted Observo AI pipeline to monitor data volume and ensure expected throughput.

    • Check Observo AI logs for errors or warnings related to data ingestion from the GCS source.

Issue
Possible Cause
Resolution

Data not ingested

Incorrect bucket or Pub/Sub configuration

Verify bucket name and Pub/Sub event notifications

Authentication errors

Invalid or expired credentials

Check service account JSON key and permissions

Connectivity issues

Firewall or proxy blocking access

Test network connectivity and VPC Service Controls

"Inaccessible host"

DNS or firewall issues

Ensure endpoints are reachable and check DNS

"Missing credentials"

Authentication misconfiguration

Verify service account JSON key

"Bucket does not exist"

Incorrect bucket name or certificate issues

Check bucket name and certificate settings

Resources

For additional guidance and detailed information, refer to the following resources:

Last updated

Was this helpful?