Collector
The Collector uses a pull-based mechanism, which offers advantages in flexibility, reliability, and scalability. In a pull-based setup, the monitoring server sends HTTP requests to endpoints exposed by the monitored applications. Each endpoint returns its current metrics, allowing the monitoring system to collect near real-time data.
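The pull model can be sketched in plain Python (illustrative only; `metrics_endpoint` is a hypothetical stand-in for an HTTP endpoint, not an Observo API):

```python
import json

# Hypothetical monitored application endpoint: returns its current
# metrics whenever the collector asks for them.
def metrics_endpoint():
    return json.dumps({"requests_total": 1024, "errors_total": 3})

# Pull-based collection: the monitoring side initiates the request and
# parses whatever the endpoint returns at that moment.
def pull_metrics(endpoint):
    body = endpoint()  # in practice, an HTTP GET against the endpoint URL
    return json.loads(body)

snapshot = pull_metrics(metrics_endpoint)
print(snapshot["requests_total"])  # → 1024
```

Because the collector initiates every request, collection frequency and retry behavior stay under the monitoring system's control rather than the data source's.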
Purpose
The purpose of the Collector source in Observo AI is to enable the platform to actively retrieve data from HTTP/S endpoints, such as Bulk APIs, by polling or making periodic requests to the specified URL. It pulls data (e.g., logs, metrics, or events in formats like JSON, CSV, or plain text) from external systems or applications into Observo AI for analysis and processing. This integration supports streamlined data pipelines, real-time monitoring, and analytics, allowing organizations to enhance observability, security, and data-driven decision-making by proactively fetching data from configured HTTP/S sources.
Prerequisites
Before configuring the Collector source in Observo AI, ensure the following requirements are met to facilitate seamless data ingestion:
Observo AI Platform Setup:
The Observo AI platform must be installed and operational, with support for the HTTP Collector (Pull) as a data source.
Verify that the platform supports common data formats such as JSON, CSV, or plain text. Additional formats may require specific parser configurations.
HTTP/S Endpoint Access:
An active HTTP/S endpoint, such as a Bulk API, must be available for Observo AI to pull data from.
Obtain the endpoint URL and any required authentication credentials such as API key, token, or basic auth username/password from the data provider.
Authentication:
Prepare one of the following authentication methods:
API Key: Obtain an API key or token from the data provider for secure access.
Basic Authentication: Provide a username and password for HTTP basic auth, if required.
Secret Authentication: Use a stored secret within Observo AI's secure storage for credentials.
Network and Connectivity:
Ensure Observo AI can communicate with the HTTP/S endpoint such as api.example.com.
Check for proxy settings, firewall rules, or VPC endpoint configurations that may affect connectivity to the external endpoint.
| Component | Requirement | Notes |
| --- | --- | --- |
| Observo AI Platform | Must be installed and support HTTP Collector (Pull) | Verify support for JSON, CSV, etc.; additional parsers may be needed |
| HTTP/S Endpoint | Active endpoint from which data can be pulled | Obtain URL and credentials from the data provider |
| Authentication | API Key, Basic Auth, or Secret | Prepare credentials as required by the endpoint |
| Network | Connectivity to the HTTP/S endpoint | Check VPC endpoints, proxies, and firewalls |
Integration
The Integration section outlines the configuration of the Collector source. To configure the Collector as a source in Observo AI, follow these steps to set up and test the data flow:
Log in to Observo AI:
Navigate to the Sources tab.
Click the Add Source button and select Create New.
Choose Collector from the list of available sources to begin configuration.
General Settings:
Name: A unique identifier for the source, such as http-collector.
Description (Optional): Provide a description for the source.
Config (Optional): A map of key-value pairs passed to the Lua script; use it to define environment variables and other parameters.
| Key (Default) | Value (Default) |
| --- | --- |
| API_TOKEN | <API Token> |
| BASE_URL | https://api.example.com |
Checkpoints (Optional): Enable checkpoints to track the last successful data collection point, which is useful for incremental collection. The values defined here seed the first run.
| Key (Default) | Value (Default) |
| --- | --- |
| PAGE | 1 |
| StartTime | 2025-08-01T00:00:00Z |
Lua Script (Modify as needed): Lua script executed by the collector. This is the place to update the checkpoint value if needed and to transform the fetched data into the expected format.

```
function start(config)
    -- local page = get_chkpt("PAGE")
    -- local page_int = tonumber(page)
    -- local query = {_page = page, _limit = config["LIMIT"]}
    fetch {
        params = {
            url = "https://mydomain.com/api/data", -- Example API endpoint
            query = {},
            method = "GET",
            headers = {
                ["Accept"] = "application/json"
            }
        },
        fn = "data",
        retry = false,
        context = {}
    }
end

-- local json = require('json')
function data(config, response, context)
    if response.status == 200 then
        -- local body = json.decode(response.body)
        emit{log = {ret = response.ret, message = response.body}}
    else
        emit{log = {message = "Failed to fetch data", status = response.status}}
    end
end
```
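The interplay between the Config map, the checkpoint store, and the fetch callback can be sketched in plain Python (illustrative only; `get_chkpt`, `set_chkpt`, `fetch`, and `emit` are Observo-provided Lua functions, stubbed here with simple stand-ins):

```python
import json

# In-memory stand-ins for Observo's Lua-side helpers (illustrative only).
_checkpoints = {"PAGE": "1"}   # seeded from the Checkpoints setting
config = {"LIMIT": 10}         # passed in from the Config setting

def get_chkpt(key):
    return _checkpoints[key]

def set_chkpt(key, value):
    _checkpoints[key] = value

def collect_once(fetch_page):
    """One polling cycle: read the cursor, fetch, emit, then advance."""
    page = int(get_chkpt("PAGE"))
    response = fetch_page(page=page, limit=int(config["LIMIT"]))
    if response["status"] == 200:
        for record in json.loads(response["body"]):
            print(record)                 # stands in for emit{log = e}
        set_chkpt("PAGE", str(page + 1))  # advance only on success

# Hypothetical endpoint returning one page of records.
def fake_fetch(page, limit):
    return {"status": 200, "body": json.dumps([{"page": page}])}

collect_once(fake_fetch)
print(get_chkpt("PAGE"))  # prints 2
```

Advancing the checkpoint only after a successful response means a failed cycle retries the same page on the next run instead of silently skipping data.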
Authentication (Optional):
Basic Authentication
Username: Username for basic authentication.
Password: Password for basic authentication.
Bearer Authentication
Auth Token: Token for bearer authentication.
OAuth 2.0
Client ID: Client ID for OAuth2 authentication.
Client Secret: Client secret for OAuth2 authentication.
Token URL: URL to get the OAuth2 token.
Scopes (Add as needed): Scopes to request for OAuth2 authentication.
Token Refresh Margin (seconds): Time in seconds before token expiry at which the token is refreshed. Default is 60 seconds.
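The refresh margin works as follows: if a token expires at time T and the margin is M seconds, a new token is requested at T − M, so a fresh token is always available before the old one lapses. A minimal sketch (plain Python; the timestamp and margin values are illustrative):

```python
def next_refresh_at(token_expiry_ts, refresh_margin_s=60):
    """Return the Unix timestamp at which the token should be refreshed."""
    return token_expiry_ts - refresh_margin_s

expiry = 1_700_000_000  # hypothetical token expiry (Unix seconds)
refresh_at = next_refresh_at(expiry, refresh_margin_s=60)
print(refresh_at)  # → 1699999940
```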
Example Scenarios
TechCorp Digital, a fictional technology consulting company, maintains a public blog API that exposes their latest articles, tutorials, and company updates. The marketing team wants to monitor blog engagement metrics and analyze content performance by ingesting this data into Observo AI. The collector is configured to fetch blog posts in paginated batches, cycling through pages 1-10 to capture recent content for analytics and reporting dashboards.
Standard Collector Setup
Here is a standard Collector Source configuration example. Only the required sections and their associated field updates are displayed in the table below:
General Settings

| Field | Value | Description |
| --- | --- | --- |
| Name | techcorp-blog-collector | Unique identifier for the Collector source. |
| Description | Ingest customer activity logs for usage and anomaly monitoring | Optional description of the source's purpose. |
| Interval | 60s | Time in seconds to pause between script executions; collecting every 60 seconds enables near real-time monitoring. |
| Config | { "LIMIT": 10 } | Config map available to the script during execution. |
| Checkpoints | { "PAGE": 1 } | Seed checkpoints used during execution and for state maintenance. |
Lua Script

```
function start(config)
    local page = get_chkpt("PAGE")
    local page_int = tonumber(page)
    fetch {
        params = {
            url = "https://api.techcorp.com/logs",
            query = {_page = page, _limit = config["LIMIT"]},
            method = "GET",
            headers = {
                ["Accept"] = "application/json"
            }
        },
        fn = "data",
        retry = false,
        context = {page = page_int}
    }
end

local json = require('json')
function data(config, response, context)
    if response.status == 200 then
        local body = json.decode(response.body)
        for i, e in ipairs(body) do
            e.idx = i
            emit{log = e}
        end
        set_chkpt("PAGE", tostring(context.page + 1))
    else
        emit{log = {message = "Failed to fetch data", status = response.status}}
    end
end
```
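The scenario above cycles through pages 1-10, while the script's `set_chkpt` call increments the page without bound. One way to wrap the cursor back to page 1, sketched in plain Python (not Observo's Lua API):

```python
def next_page(page, max_page=10):
    """Advance a 1-based page cursor, wrapping back to 1 after max_page."""
    return page % max_page + 1

print(next_page(1))   # → 2
print(next_page(10))  # → 1
```

In the Lua script this would correspond to something like `set_chkpt("PAGE", tostring(context.page % 10 + 1))` in place of the unbounded increment.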