Collector

The Collector uses a pull-based mechanism, which offers advantages in flexibility, reliability, and scalability. In a pull-based setup, the monitoring server sends HTTP requests to endpoints exposed by the monitored applications. These endpoints return the current metrics, allowing the monitoring system to collect near real-time data.

Purpose

The purpose of the Collector source in Observo AI is to enable the platform to actively retrieve data from HTTP/S endpoints, such as Bulk APIs, by polling or making periodic requests to the specified URL. It pulls data (e.g., logs, metrics, or events in formats like JSON, CSV, or plain text) from external systems or applications into Observo AI for analysis and processing. This integration supports streamlined data pipelines, real-time monitoring, and analytics, allowing organizations to enhance observability, security, and data-driven decision-making by proactively fetching data from configured HTTP/S sources.

Prerequisites

Before configuring the Collector source in Observo AI, ensure the following requirements are met to facilitate seamless data ingestion:

  • Observo AI Platform Setup:

    • The Observo AI platform must be installed and operational, with support for the HTTP Collector (Pull) as a data source.

    • Verify that the platform supports common data formats such as JSON, CSV, or plain text. Additional formats may require specific parser configurations.

  • HTTP/S Endpoint Access:

• An active HTTP/S endpoint, such as a Bulk API, must be available for Observo AI to pull data from.

    • Obtain the endpoint URL and any required authentication credentials such as API key, token, or basic auth username/password from the data provider.

  • Authentication:

    • Prepare one of the following authentication methods:

      • API Key: Obtain an API key or token from the data provider for secure access.

      • Basic Authentication: Provide a username and password for HTTP basic auth, if required.

      • Secret Authentication: Use a stored secret within Observo AI's secure storage for credentials.

  • Network and Connectivity:

    • Ensure Observo AI can communicate with the HTTP/S endpoint such as api.example.com.

    • Check for proxy settings, firewall rules, or VPC endpoint configurations that may affect connectivity to the external endpoint.

| Prerequisite | Description | Notes |
| --- | --- | --- |
| Observo AI Platform | Must be installed and support HTTP Collector (Pull) | Verify support for JSON, CSV, etc.; additional parsers may be needed |
| HTTP/S Endpoint | Active endpoint from which data is pulled | Obtain URL and credentials from the data provider |
| Authentication | API Key, Basic Auth, or Secret | Prepare credentials as required by the endpoint |
| Network | Connectivity to the HTTP/S endpoint | Check VPC endpoints, proxies, and firewalls |

Integration

The Integration section outlines the configuration of the Collector source. To configure the Collector as a source in Observo AI, follow these steps to set up and test the data flow:

  1. Log in to Observo AI:

  • Navigate to the Sources tab.

  • Click the Add Source button and select Create New.

  • Choose Collector from the list of available sources to begin configuration.

  2. General Settings:

  • Name: A unique identifier for the source, such as http-collector.

  • Description (Optional): Provide a description for the source.

  • Config (Optional): Key-value configuration passed to the Lua script as a map; use it to define environment variables and other parameters.

    | Key (Default) | Value (Default) |
    | ------------------ | --------------- |
    | API_TOKEN | <API Token> |
    | BASE_URL | https://api.example.com |
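Values defined under Config are available inside the Lua script through the `config` argument of `start`. A minimal sketch, assuming the `API_TOKEN` and `BASE_URL` keys from the table above, a bearer-token API, and a hypothetical `/data` path:

```lua
-- Sketch: build the request from Config values.
-- API_TOKEN and BASE_URL come from the Config map; the /data path
-- and bearer-token scheme are illustrative assumptions.
function start(config)
  fetch {
    params = {
      url = config["BASE_URL"] .. "/data",
      query = {},
      method = "GET",
      headers = {
        ["Accept"] = "application/json",
        ["Authorization"] = "Bearer " .. config["API_TOKEN"],
      }
    },
    fn = "data",
    retry = false,
    context = {}
  }
end
```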

  • Checkpoints (Optional): Enable checkpoints to track the last successful data collection point, which is useful for incremental collection. The values entered here seed the first run.

    | Key (Default) | Value (Default) |
    | --- | --- |
    | PAGE | 1 |
    | StartTime | 2025-08-01T00:00:00Z |
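The seed values above can be read with `get_chkpt` and advanced with `set_chkpt` from inside the script. A minimal sketch using the `StartTime` checkpoint; the endpoint URL and the `since` query parameter are illustrative assumptions:

```lua
-- Sketch: use StartTime as a high-water mark for incremental collection.
-- get_chkpt/set_chkpt are the Collector's checkpoint helpers; the URL
-- and `since` parameter are hypothetical.
function start(config)
  local start_time = get_chkpt("StartTime")  -- seeded as 2025-08-01T00:00:00Z
  fetch {
    params = {
      url = "https://api.example.com/events",
      query = { since = start_time },
      method = "GET",
      headers = { ["Accept"] = "application/json" },
    },
    fn = "data",
    retry = false,
    context = {}
  }
end

function data(config, response, context)
  if response.status == 200 then
    emit{log = { message = response.body }}
    -- Persist the new high-water mark so the next run resumes from here.
    set_chkpt("StartTime", os.date("!%Y-%m-%dT%H:%M:%SZ"))
  else
    emit{log = { message = "Failed to fetch data", status = response.status }}
  end
end
```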

  • Lua Script (Modify as needed): The Lua script that collects and transforms the data. This is the place to update the checkpoint value if needed and to transform the data into the expected format.

    function start(config)
      -- local page = get_chkpt("PAGE")
      -- local page_int = tonumber(page)
      -- local query = {_page = page, _limit = config["LIMIT"]}
      fetch {
        params = {
          url = "https://mydomain.com/api/data", -- Example API endpoint
          query = {},
          method = "GET",
          headers = {
            ["Accept"] = "application/json",
          }
        },
        fn = "data",
        retry = false,
        context = {}
      }
    end

    -- local json = require('json')

    function data(config, response, context)
      if response.status == 200 then
        -- local body = json.decode(response.body)
        emit{log = { ret = response.ret, message = response.body }}
      else
        emit{log = { message = "Failed to fetch data", status = response.status }}
      end
    end
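The commented-out `json` lines in the template hint at per-record processing. A minimal sketch of a `data` callback that decodes the response body and emits one log per element, assuming the endpoint returns a JSON array:

```lua
-- Sketch: decode a JSON array and emit each element as a separate log.
-- Assumes the platform's bundled `json` module and an array-shaped response.
local json = require('json')

function data(config, response, context)
  if response.status == 200 then
    local body = json.decode(response.body)
    for i, record in ipairs(body) do
      record.idx = i          -- tag each record with its position in the batch
      emit{log = record}
    end
  else
    emit{log = { message = "Failed to fetch data", status = response.status }}
  end
end
```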
  3. Authentication (Optional):

    • Basic Authentication

      • Username: Username for basic authentication.

      • Password: Password for basic authentication.

    • Bearer Authentication

      • Auth Token: Token for bearer authentication.

    • OAuth 2.0

      • Client ID: Client ID for OAuth2 authentication.

      • Client Secret: Client secret for OAuth2 authentication.

      • Token URL: URL to get the OAuth2 token.

      • Scopes (Add as needed): Scopes to request for OAuth2 authentication.

      • Token Refresh Margin (seconds): Time in seconds before token expiry to refresh the token. Default is 60 seconds.

Example Scenarios

TechCorp Digital, a fictional technology consulting company, maintains a public blog API that exposes their latest articles, tutorials, and company updates. The marketing team wants to monitor blog engagement metrics and analyze content performance by ingesting this data into Observo AI. The collector is configured to fetch blog posts in paginated batches, cycling through pages 1-10 to capture recent content for analytics and reporting dashboards.

Standard Collector Setup

Here is a standard Collector Source configuration example. Only the required sections and their associated field updates are displayed in the table below:

General Settings

| Field | Value | Description |
| --- | --- | --- |
| Name | techcorp-blog-collector | Unique identifier for the Collector source. |
| Description | Ingest blog posts for content performance and engagement analytics | Optional description of the source's purpose. |
| Time in seconds to pause between script executions | 60s | Data collection every 60 seconds for near real-time monitoring. |
| Config | { "LIMIT": 10 } | Config map passed to the Lua script during execution; the key matches the script's `config["LIMIT"]` lookup. |
| Checkpoints | { "PAGE": 1 } | Seed checkpoint used during execution and for state maintenance; the key matches the script's `get_chkpt("PAGE")` lookup. |

Lua Script

function start(config)
  local page = get_chkpt("PAGE")
  local page_int = tonumber(page)
  fetch {
      params = {
        url = "https://api.techcorp.com/logs",
        query = {_page = page, _limit = config["LIMIT"]},
        method = "GET",
        headers = {
          ["Accept"] = "application/json"
        }
      },
      fn = "data",
      retry = false,
      context = {page = page_int}
  }
end

local json = require('json')
function data(config, response, context)
  if response.status == 200 then
      local body = json.decode(response.body)

      for i, e in ipairs(body) do
          e.idx = i
          emit{log = e}
      end
      -- Cycle through pages 1-10, wrapping back to page 1 after page 10.
      local next_page = context.page + 1
      if next_page > 10 then next_page = 1 end
      set_chkpt("PAGE", tostring(next_page))
  else
      emit{log = { message = "Failed to fetch data", status = response.status }}
  end
end
