GCP Pub/Sub
Observo can ingest data records from Google Cloud Pub/Sub, a managed real-time messaging service designed for seamless message exchange between applications.
Purpose
The Observo AI GCP Pub/Sub source enables real-time ingestion of data streams from Google Cloud Pub/Sub into the Observo AI platform for processing and analysis. It supports scalable, secure message consumption from Pub/Sub topics, facilitating observability and analytics for event-driven data. This integration allows organizations to monitor and derive insights from high-throughput, real-time data published to Google Cloud Pub/Sub.
Prerequisites
Before configuring the GCP Pub/Sub source in Observo AI, ensure the following requirements are met to facilitate seamless data ingestion:
Observo AI Platform Setup:
The Observo AI Site must be installed and operational.
Verify expected data formats, such as JSON or text, from Pub/Sub messages.
Google Cloud Project:
An active Google Cloud project with Pub/Sub enabled (Set Up Pub/Sub).
A Pub/Sub topic and subscription created in the project to send and receive messages (Create a Topic and Subscription).
Note the project ID, topic ID, and subscription ID. Use fully-qualified names (e.g., projects/my-project-id/topics/my-topic-id) if the topic/subscription resides in a different project than the credentials’ origin.
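As a quick illustration of the naming rule above, short IDs can be expanded against the credential project while already-qualified names pass through unchanged. This helper is hypothetical (not part of Observo), a sketch of the convention only:

```python
def qualify(project_id: str, kind: str, resource_id: str) -> str:
    """Expand a short resource ID into a fully-qualified Pub/Sub name.

    Names already starting with "projects/" pass through unchanged, so
    resources living in a different project than the credentials keep
    their explicit project.
    """
    if resource_id.startswith("projects/"):
        return resource_id
    return f"projects/{project_id}/{kind}/{resource_id}"

# A short ID is expanded against the credential project...
print(qualify("my-project-id", "topics", "my-topic-id"))
# → projects/my-project-id/topics/my-topic-id

# ...while a fully-qualified name from another project is kept as-is.
print(qualify("my-project-id", "subscriptions",
              "projects/other-project/subscriptions/other-sub"))
# → projects/other-project/subscriptions/other-sub
```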
Service Account and Permissions:
A Google Cloud service account with appropriate roles for Pub/Sub access, such as “Pub/Sub Subscriber” (roles/pubsub.subscriber) or higher (e.g., “Pub/Sub Editor” for creating topics/subscriptions) (Google Cloud Access Control).
Download the service account credentials file (JSON key) or configure environment variables (PUBSUB_PROJECT and PUBSUB_CREDENTIALS) (Service Account Authentication).
Required permissions:
For reading: pubsub.subscriptions.consume, pubsub.topics.get.
For creating topics/subscriptions (if enabled): pubsub.topics.create, pubsub.subscriptions.create.
Network and Connectivity:
Ensure Observo AI can communicate with Google Cloud Pub/Sub APIs over HTTPS (port 443).
If using VPC Service Controls or firewalls, configure them to allow access to Pub/Sub endpoints (VPC Service Controls).
Authentication:
To read data from GCP Pub/Sub, you first need to set up a secret file for authentication. Follow these steps:
Create and download a service account JSON key with the permissions needed to read from the Pub/Sub subscription you want to ingest from. Follow the guideline here for more details.
Navigate to Settings > Files > Add Files from the Observo Manager home page. Provide a Filename and a short Description in the window. Upload or paste the secret key in the window and save.
Summary:
Observo AI Platform: must support GCP Pub/Sub; verify data format compatibility.
Google Cloud Project: active project with Pub/Sub enabled; create a topic and subscription.
Service Account: credentials with Pub/Sub roles; use a JSON key or environment variables.
Permissions: read or create access (pubsub.subscriptions.consume, etc.).
Network: HTTPS connectivity; allow port 443 and configure VPC if needed.
Integration
The Integration section outlines default configurations. To configure GCP Pub/Sub as a source in Observo AI, follow these steps to set up and test the data flow:
Log in to Observo AI:
Navigate to Sources Tab
Click on “Add Sources” button and select “Create New”
Choose “GCP Pub/Sub” from the list of available sources to begin configuration.
General Settings:
Name: Add a unique identifier such as pubsub-source-1
Description (Optional): Add a short description of the source.
Project: The GCP Project Name from which logs should be retrieved.
Examples: my-proj-id
projects/my-project-id
Subscription: The GCP Pub/Sub Subscription name within the mentioned GCP Project that is set up to receive logs.
Examples: my-subscription-id
projects/my-project-id/subscriptions/my-subscription-id
Credentials Path: The path to a JSON file containing service account credentials. Either an API key or this credentials file path can be specified. For more information on gcp_service_account_credentials, refer to: https://cloud.google.com/docs/authentication/production#manually.
Example: /my/path/credentials.json
Decoding: Select the codec to use for decoding events. Default: Bytes
Options: Bytes, JSON, Native, Native JSON, Syslog
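The practical difference between the Bytes and JSON codecs is whether the message payload stays opaque or is parsed into structured fields. A rough stdlib sketch of that difference (illustrative only, not Observo's implementation; the sample payload is made up):

```python
import json

# A hypothetical Pub/Sub message payload as received on the wire.
payload = b'{"severity": "HIGH", "event": "login_failure"}'

# Bytes codec: the payload is kept as an opaque byte string.
raw = payload

# JSON codec: the payload is parsed into structured fields, which
# downstream pipelines can then filter and transform.
record = json.loads(payload)
print(record["severity"])  # → HIGH
```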
Proxy:
Enable proxy (Enabled): Enables proxy support.
HTTP proxy endpoint: Proxy endpoint to use when proxying HTTP traffic.
Example: http://foo.bar:3128
HTTPS proxy endpoint: Proxy endpoint to use when proxying HTTPS traffic.
Host list to disable proxying: A list of hosts to avoid proxying. Multiple patterns are permissible:
Domain names: For instance, "example.com" matches requests directed at example.com.
Wildcard domains: ".example.com" includes requests to example.com and its subdomains.
IP addresses: "127.0.0.1" corresponds to requests targeting 127.0.0.1.
CIDR blocks: "192.168.0.0/16" matches requests to any IP addresses within this specified range.
Splat: "*" encompasses all hosts, matching requests to any host.
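The pattern types above can be sketched as a small matcher. This is a simplified illustration of the matching rules, not the exact implementation:

```python
import ipaddress

def bypasses_proxy(host: str, no_proxy: list) -> bool:
    """Return True if requests to `host` should skip the proxy."""
    for pattern in no_proxy:
        if pattern == "*":                      # splat: matches every host
            return True
        if pattern.startswith("."):             # wildcard domain
            bare = pattern[1:]
            if host == bare or host.endswith(pattern):
                return True
        elif "/" in pattern:                    # CIDR block
            try:
                if ipaddress.ip_address(host) in ipaddress.ip_network(pattern):
                    return True
            except ValueError:
                continue                        # host is not an IP address
        elif host == pattern:                   # exact domain name or IP
            return True
    return False

print(bypasses_proxy("api.example.com", [".example.com"]))  # → True
print(bypasses_proxy("192.168.4.7", ["192.168.0.0/16"]))    # → True
print(bypasses_proxy("example.org", ["example.com"]))       # → False
```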
Framing (Optional):
Framing Delimiter (Empty): The character that delimits byte sequences.
Framing Max Length (None): The maximum length of the byte buffer.
Framing Method (Empty): The framing method.
Options:
Bytes: Byte frames are passed through as-is according to the underlying I/O boundaries (for example, split between messages or stream segments).
Character Delimited: Byte frames which are delimited by a chosen character.
Length Delimited: Byte frames which are prefixed by an unsigned big-endian 32-bit integer indicating the length.
Newline Delimited: Byte frames which are delimited by a newline character.
Octet Counting: Byte frames according to the octet counting format.
Framing Newline Delimited Max Length: (None)
Framing Octet Counting Max Length: (None)
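To make the framing methods concrete, here is a rough sketch of length-delimited framing, where each frame is prefixed by an unsigned big-endian 32-bit length. This illustrates the wire format only, not Observo's decoder:

```python
import struct

def encode_frame(payload: bytes) -> bytes:
    """Prefix a payload with its length as an unsigned big-endian u32."""
    return struct.pack(">I", len(payload)) + payload

def decode_frames(stream: bytes) -> list:
    """Split a byte stream back into its length-delimited frames."""
    frames, offset = [], 0
    while offset + 4 <= len(stream):
        # Read the 4-byte big-endian length prefix, then slice the payload.
        (length,) = struct.unpack_from(">I", stream, offset)
        frames.append(stream[offset + 4 : offset + 4 + length])
        offset += 4 + length
    return frames

stream = encode_frame(b"first event") + encode_frame(b"second event")
print(decode_frames(stream))  # → [b'first event', b'second event']
```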
TLS Configuration (Optional):
TLS CA File: Absolute path to an additional CA certificate file, or the CA certificate itself provided as an inline string in PEM format.
Example: /etc/certs/ca.crt
TLS Crt File: Absolute path to a certificate file used to identify this server, or the certificate itself provided as an inline string in PEM format.
Example: /etc/certs/tls.crt
TLS Key File: Absolute path to a private key file used to identify this server. The key must be in DER or PEM (PKCS#8) format. Additionally, the key can be provided as an inline string in PEM format.
Example: /etc/certs/tls.key
TLS Key Pass: Passphrase used to unlock the encrypted key file. This has no effect unless key_file is set.
Examples: ${KEY_PASS_ENV_VAR}
PassWord1
TLS Verify Hostname (False): Enables hostname verification. Hostname used to connect to the remote host must be present in the TLS certificate presented by the remote host, either as the Common Name or as an entry in the Subject Alternative Name extension. Only relevant for outgoing connections. NOT recommended to set this to false unless you understand the risks.
TLS Verify Certificate (False): Enables certificate verification. Certificates must be valid in terms of not being expired, and being issued by a trusted issuer. This verification operates in a hierarchical manner, checking validity of the certificate, the issuer of that certificate and so on until reaching a root certificate. Relevant for both incoming and outgoing connections. Do NOT set this to false unless you understand the risks of not verifying the validity of certificates.
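These two switches correspond to standard TLS client behavior. As an analogy (not Observo's internals), the equivalent settings in Python's stdlib `ssl` module look like this:

```python
import ssl

# Default client context: the certificate chain and the hostname are
# both verified, mirroring the safe values of the two switches above.
strict = ssl.create_default_context()
print(strict.check_hostname)                     # → True
print(strict.verify_mode == ssl.CERT_REQUIRED)   # → True

# Disabling verification (the risky configuration the docs warn about).
# Hostname checking must be disabled before certificate verification.
lax = ssl.create_default_context()
lax.check_hostname = False       # skip hostname vs. CN/SAN check
lax.verify_mode = ssl.CERT_NONE  # skip chain and expiry validation
```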
Advanced Settings:
API key (Optional): Either an API key or a path to a service account credentials JSON file can be specified. If both are unset, the GOOGLE_APPLICATION_CREDENTIALS environment variable is checked for a filename. If that is also unset, an attempt is made to fetch the instance service account of the compute instance the program is running on. Outside of a GCE instance, you must therefore supply either an API key or a service account credentials JSON file.
Acknowledgement deadline: The acknowledgement deadline, in seconds, to use for this stream. Default: 600
Endpoint: The endpoint from which to pull data.
Example: https://pubsub.googleapis.com
Full response size: The number of messages in a response that marks a stream as “busy”. This is used to determine whether more streams should be started. Default: 100
Keepalive Seconds: The amount of time, in seconds, with no received activity before sending a keepalive request. If this is set larger than 60, you may see periodic errors sent from the server. Default: 60
Max concurrency: The maximum number of concurrent stream connections to open at once. Default: 10
Poll time: How often, in seconds, to poll the currently active streams to see whether they are all busy, in which case a new stream is opened. Default: 30
Retry delay: The amount of time, in seconds, to wait between retry attempts after an error. Default: None
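The credential lookup order described under "API key" above can be sketched as follows. The helper is hypothetical and simplified; the real lookup also probes the GCE metadata server for an instance service account:

```python
import os

def resolve_credentials(api_key=None, credentials_path=None):
    """Return (method, value) following the documented precedence:
    explicit API key, then explicit credentials file, then the
    GOOGLE_APPLICATION_CREDENTIALS env var, then the instance
    service account (represented here as a placeholder)."""
    if api_key:
        return ("api_key", api_key)
    if credentials_path:
        return ("credentials_file", credentials_path)
    env_path = os.environ.get("GOOGLE_APPLICATION_CREDENTIALS")
    if env_path:
        return ("credentials_file", env_path)
    return ("instance_service_account", "")

# An explicit credentials path wins over the environment variable.
print(resolve_credentials(credentials_path="/etc/creds.json"))
# → ('credentials_file', '/etc/creds.json')
```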
Parser Config:
Enable Source Log parser: (False)
Toggle Enable Source Log parser Switch to enable
Select appropriate Parser from the Source Log Parser dropdown
Add additional Parsers as needed
Pattern Extractor:
See Pattern Extractor for details.
Archival Destination:
Toggle Enable Archival on Source Switch to enable
Under Archival Destination, select from the list of Archival Destinations (Required)
Save and Test Configuration:
Save the configuration settings.
Verify that data is being ingested from the Pub/Sub subscription.
Example Scenarios
CloudInsights Corp, a fictitious company specializing in real-time event monitoring, aims to integrate Observo AI with Google Cloud Pub/Sub to ingest security event data from a Pub/Sub subscription for processing and analysis. The Pub/Sub topic and subscription are set up in a Google Cloud project, and a service account with appropriate permissions is configured. The Observo AI instance operates behind a corporate proxy, requiring specific proxy settings, and uses a service account JSON key for authentication. The data is expected in JSON format.
Standard GCP PubSub Source Setup
Here is a standard GCP Pub/Sub source configuration example. Only the required sections and their associated field updates are shown below:
General Settings
Description: Ingests security event data from GCP Pub/Sub for CloudInsights' real-time monitoring. (Provides context for the source’s purpose.)
Project: projects/cloudinsights-prod (Fully-qualified Google Cloud project name hosting the Pub/Sub topic and subscription.)
Subscription: projects/cloudinsights-prod/subscriptions/security-events-sub (Fully-qualified subscription name configured to receive security event messages.)
Credentials Path: /etc/observo/credentials/cloudinsights-sa.json (Path to the service account credentials JSON file for Pub/Sub authentication.)
Decoding: JSON (Specifies JSON as the codec for decoding Pub/Sub messages.)
Proxy
Enable Proxy: True (Enables proxy support for HTTP/HTTPS traffic.)
HTTPS Proxy Endpoint: https://proxy.cloudinsights.local:3128 (Proxy endpoint for HTTPS traffic to Google Cloud Pub/Sub APIs.)
Host List to Disable Proxying: *.internal.cloudinsights.local (Hosts that should bypass the proxy so internal traffic is not proxied.)
Test Configuration: Save the settings and verify data ingestion from the security-events-sub subscription using Observo’s Analytics tab. (Saves configuration, tests data flow, and confirms messages are ingested correctly.)
Notes:
Ensure the service account associated with /etc/observo/credentials/cloudinsights-sa.json has the pubsub.subscriptions.consume and pubsub.topics.get permissions for the cloudinsights-prod project.
Verify the Pub/Sub topic and subscription (projects/cloudinsights-prod/subscriptions/security-events-sub) exist and are receiving messages in JSON format.
Confirm HTTPS connectivity (port 443) to Google Cloud Pub/Sub APIs (pubsub.googleapis.com) through the proxy https://proxy.cloudinsights.local:3128.
Monitor Observo’s Logs tab and Google Cloud’s Pub/Sub monitoring tools to verify message ingestion and troubleshoot errors like “Permission denied” (check service account roles) or “Topic not found” (verify topic/subscription IDs).
Ensure the proxy configuration allows access to Pub/Sub endpoints and excludes internal hosts as specified.
This configuration enables CloudInsights Corp to ingest security event data from Google Cloud Pub/Sub into Observo AI for real-time processing and analysis.
Troubleshooting
If issues arise with the GCP Pub/Sub source in Observo AI, use the following steps to diagnose and resolve them:
Verify Configuration Settings:
Ensure Topic ID and Subscription ID are correct, using fully-qualified names (e.g., projects/my-project-id/topics/my-topic-id) if credentials originate from a different project.
Confirm that the topic and subscription exist in Google Cloud Pub/Sub (Check Pub/Sub Resources).
Check Authentication:
For Auto authentication, verify that PUBSUB_PROJECT and PUBSUB_CREDENTIALS environment variables are set correctly.
For Manual authentication, ensure the service account JSON key is valid and not expired.
For Secret authentication, confirm the secret is accessible in Observo AI’s secrets manager or external KMS.
Verify the service account has required permissions (pubsub.subscriptions.consume, pubsub.topics.get) (Google Cloud Access Control).
Monitor Logs:
Check Observo AI’s Logs tab for errors or warnings related to data ingestion.
Use Google Cloud’s Pub/Sub monitoring tools to verify message publishing and subscription activity (Monitor Pub/Sub).
Validate Connectivity:
Ensure Observo AI can reach Pub/Sub APIs over HTTPS (port 443).
If using VPC Service Controls or firewalls, confirm access to Pub/Sub endpoints (VPC Service Controls).
Common Error Messages:
“Permission denied”: Indicates insufficient permissions. Verify the service account has pubsub.subscriptions.consume and, if creating resources, pubsub.topics.create or pubsub.subscriptions.create.
“Topic or subscription not found”: Check Topic ID and Subscription ID accuracy and existence in the project.
“No data ingested”: Confirm messages are being published to the topic and the subscription is active. Check Ordered Delivery settings if enabled.
Test Data Flow:
Capture real-time events and verify ingestion.
Use the Analytics tab in the targeted Observo AI pipeline to monitor data volume and ensure expected throughput.
Data not ingested: Incorrect Topic/Subscription ID. Verify the IDs and confirm they exist in the project.
Permission denied: Insufficient service account roles. Grant a role that includes the pubsub.subscriptions.consume permission (for example, roles/pubsub.subscriber).
“Topic not found”: Incorrect or non-existent topic. Check the Topic ID and create the topic if needed.
Connectivity issues: Firewall or VPC restrictions. Allow HTTPS on port 443 and verify VPC settings.
No data in subscription: No messages published. Confirm messages are being sent to the topic.
Resources
For additional guidance and detailed information, refer to the following resources: