Authentication

Base URL: https://api.allium.so/api/v1/beam

All endpoints require an API key, passed via the X-API-Key header:
curl -X GET https://api.allium.so/api/v1/beam \
  -H "X-API-Key: ${ALLIUM_API_KEY}"
Get your API key from app.allium.so/settings/api-keys.
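For scripted access, the header can be attached once in a small helper. A minimal Python sketch using only the standard library (the make_request helper and the ALLIUM_API_KEY environment-variable fallback are illustrative conventions, not part of the API):

```python
import os
import urllib.request

BASE_URL = "https://api.allium.so/api/v1/beam"

def make_request(path="", method="GET", api_key=None):
    """Build an authenticated request object; the key is read from
    the ALLIUM_API_KEY environment variable unless passed explicitly."""
    key = api_key or os.environ.get("ALLIUM_API_KEY", "")
    req = urllib.request.Request(BASE_URL + path, method=method)
    req.add_header("X-API-Key", key)
    return req

# Send with: urllib.request.urlopen(make_request())
```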

Configuration endpoints

Create pipeline

POST /api/v1/beam
Creates a new pipeline configuration. Request body:
{
  "name": "USDC Transfer Monitor",
  "description": "Monitors USDC ERC20 transfers on Base",
  "tags": ["production", "base"],
  "pipeline_config": {
    "source": {
      "type": "pubsub",
      "chain": "base",
      "entity": "erc20_token_transfer",
      "is_zerolag": false
    },
    "transforms": [
      {
        "type": "redis_set_filter",
        "filter_expr": "root = this.token_address"
      }
    ],
    "sinks": [
      {
        "type": "kafka",
        "name": "usdc-transfers"
      }
    ]
  }
}
Response: BeamConfig (with generated id, owner_org_team_user_id, created_at, updated_at)
Filter values are managed separately from the pipeline config. After creating a pipeline, use the filter values endpoints to add values to set filter transforms.
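The request body above can also be assembled programmatically. A Python sketch (the build_pipeline_payload helper is hypothetical; the field names follow the example body, and the transform list is left for filter values to be attached later via the filter values endpoints):

```python
def build_pipeline_payload(name, chain, entity, sink_name,
                           description="", tags=None, filter_expr=None):
    """Assemble a Create-pipeline request body. The set filter
    transform is optional; its values are added afterwards through
    the filter values endpoints, not in this payload."""
    config = {
        "source": {"type": "pubsub", "chain": chain,
                   "entity": entity, "is_zerolag": False},
        "transforms": [],
        "sinks": [{"type": "kafka", "name": sink_name}],
    }
    if filter_expr:
        config["transforms"].append(
            {"type": "redis_set_filter", "filter_expr": filter_expr})
    return {"name": name, "description": description,
            "tags": tags or [], "pipeline_config": config}

# POST this dict as JSON to /api/v1/beam
body = build_pipeline_payload(
    "USDC Transfer Monitor", "base", "erc20_token_transfer",
    "usdc-transfers", filter_expr="root = this.token_address")
```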

List pipelines

GET /api/v1/beam
Returns all Beam pipeline configs for the authenticated user. Response: BeamConfig[]

Get pipeline

GET /api/v1/beam/{config_id}
Returns a single pipeline config by ID. Response: BeamConfig

Update pipeline

PUT /api/v1/beam/{config_id}
Updates an existing pipeline configuration. After updating, deploy again to apply changes. Request body: Same as Create pipeline.

Delete pipeline

DELETE /api/v1/beam/{config_id}
Deletes the pipeline configuration and tears down any active deployment (Kafka topics, workers, subscriptions).

Deployment endpoints

Deploy pipeline

POST /api/v1/beam/{config_id}/deploy
Deploys the pipeline and starts processing data: creates Kafka topics, provisions credentials, and spins up workers. Deployment is idempotent; call it again after updating a config to apply changes with zero downtime, without tearing down first. Response: deployment status and sink connection details (Kafka credentials, topic names, consumer code snippets).

Teardown pipeline

POST /api/v1/beam/{config_id}/teardown
Removes the deployed infrastructure (workers, Kafka topics, subscriptions). Use this only when you want to fully stop the pipeline. The config is preserved.

Get deployment stats

GET /api/v1/beam/{config_id}/deploy/stats
Returns worker health information for the deployment. Response:
{
  "workers_health": {
    "total_workers": 3,
    "healthy_workers": 3,
    "unhealthy_workers": 0,
    "crashing_workers": 0,
    "oom_killed_workers": 0,
    "workers": [
      {
        "name": "abc123",
        "status": "Running",
        "is_ready": true,
        "is_crash_looping": false,
        "is_oom_killed": false,
        "restart_count": 0,
        "start_time": "2025-01-15T10:30:00Z"
      }
    ]
  }
}
Field | Description
----- | -----------
total_workers | Number of worker pods
healthy_workers | Workers running normally
unhealthy_workers | Workers not ready
crashing_workers | Workers in a crash loop
oom_killed_workers | Workers killed due to memory pressure
restart_count | Per-worker restart count; high values indicate instability
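The stats response lends itself to a simple client-side health check. A Python sketch (the helper names and the restart threshold are illustrative choices, not API semantics):

```python
def deployment_is_healthy(stats):
    """Judge worker health from a /deploy/stats response: any
    not-ready, crash-looping, or OOM-killed worker means unhealthy."""
    wh = stats["workers_health"]
    return (wh["unhealthy_workers"] == 0
            and wh["crashing_workers"] == 0
            and wh["oom_killed_workers"] == 0
            and wh["healthy_workers"] == wh["total_workers"])

def flag_restarting_workers(stats, threshold=3):
    """List workers whose restart count suggests instability."""
    return [w["name"] for w in stats["workers_health"]["workers"]
            if w["restart_count"] >= threshold]
```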

Get deployment metrics

GET /api/v1/beam/{config_id}/deploy/metrics
Returns throughput, latency, and error rate metrics for the deployment. Query parameters:
Parameter | Default | Description
--------- | ------- | -----------
time_range | 1h | Time range: 1h, 1d, or 1w
aggregated | false | true returns a single summary; false returns time-series data points
metrics | all | Filter to specific metric types (comma-separated)
Available metric types: messages_received, messages_sent, total_latency, processing_latency, processor_error, output_error, data_age
Response:
{
  "config_id": "abc123",
  "metrics": [
    {
      "metric_type": "messages_received",
      "points": [
        { "timestamp": "2026-03-24T07:21:40Z", "value": 0.0 },
        { "timestamp": "2026-03-24T07:22:40Z", "value": 15.3 },
        { "timestamp": "2026-03-24T07:23:40Z", "value": 22.1 }
      ]
    },
    {
      "metric_type": "messages_sent",
      "points": [ ]
    }
  ]
}
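Composing the query string and summarizing a returned time series are the two common client tasks here. A Python sketch (metrics_url and average are illustrative helpers; parameter names follow the table above):

```python
from urllib.parse import urlencode

def metrics_url(base, config_id, time_range="1h",
                aggregated=False, metrics=None):
    """Compose the metrics endpoint URL; `metrics` is a list that
    becomes the comma-separated filter parameter."""
    params = {"time_range": time_range,
              "aggregated": str(aggregated).lower()}
    if metrics:
        params["metrics"] = ",".join(metrics)
    return f"{base}/{config_id}/deploy/metrics?{urlencode(params)}"

def average(points):
    """Mean value across time-series points (0.0 for an empty series)."""
    if not points:
        return 0.0
    return sum(p["value"] for p in points) / len(points)
```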

Transform endpoints

Update transforms on a deployed pipeline. Changes are synced and the pipeline is automatically redeployed.

Update a single transform

PATCH /api/v1/beam/{config_id}/transforms/{transform_uid}
Updates one transform by its UID. Automatically redeploys the pipeline. Request body:
{
  "type": "v8",
  "script": "function process(data) { return data; }"
}
Response: The updated transform object.

Replace all transforms

PUT /api/v1/beam/{config_id}/transforms
Replaces the entire transforms list. Handles add, remove, update, and reorder in a single call. Automatically redeploys the pipeline. Request body:
[
  { "type": "v8", "script": "function process(d) { return d; }" },
  { "type": "redis_set_filter", "filter_expr": "root = this.from_address" }
]
Response: Array of updated transform objects.
The transforms list cannot be empty.
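Since an empty list is rejected, a client can validate the body before sending. A small sketch (this check is client-side convenience, not an API feature; the rule that every transform carries a type field follows the examples above):

```python
def replace_transforms_body(transforms):
    """Sanity-check a PUT /transforms body: the API rejects an
    empty list, and every transform entry needs a 'type'."""
    if not transforms:
        raise ValueError("transforms list cannot be empty")
    for t in transforms:
        if "type" not in t:
            raise ValueError("every transform needs a 'type' field")
    return transforms
```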

Filter values endpoints

Manage the values in a set filter transform. Filter values are stored separately from the pipeline config, supporting 10M+ values per filter. Changes take effect immediately — no redeploy needed. Each endpoint requires config_id (pipeline ID) and transform_uid (the set filter transform’s unique identifier).
Use lowercase values when filtering by addresses, labels, or symbols — this is how values are normalized in the system.
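Normalizing client-side before any add/replace call keeps the filter consistent with that convention. A Python sketch (normalize_filter_values is an illustrative helper; it also de-duplicates, which is harmless since the API ignores duplicates anyway):

```python
def normalize_filter_values(values):
    """Lowercase, trim, and de-duplicate values before sending,
    matching how the filter store normalizes addresses, labels,
    and symbols. Preserves first-seen order."""
    seen, out = set(), []
    for v in values:
        v = v.strip().lower()
        if v and v not in seen:
            seen.add(v)
            out.append(v)
    return out
```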

List filter values

GET /api/v1/beam/{config_id}/transforms/{transform_uid}/filter-values
Returns a paginated list of values in the filter. Query parameters:
Parameter | Default | Description
--------- | ------- | -----------
cursor | null | Pagination cursor from a previous response
count | 100 | Number of values per page (1-10,000)
Response:
{
  "values": ["0x3c499c542cef5e3811e1192ce70d8cc03d5c3359", "0xc2132d05d31c914a87c6611c10748aeb04b58e8f"],
  "cursor": 42,
  "total_count": 1500
}
Field | Description
----- | -----------
values | Array of values in the current page
cursor | Cursor for the next page; null if no more pages
total_count | Total number of values in the filter
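Walking all pages is a standard cursor loop: pass the returned cursor back until it comes back null. A Python sketch (iter_filter_values is an illustrative helper; fetch_page stands in for whatever function performs the GET and returns the parsed JSON):

```python
def iter_filter_values(fetch_page, count=1000):
    """Yield every value in the filter by following pagination
    cursors. `fetch_page(cursor=..., count=...)` must perform
    GET .../filter-values and return the parsed JSON response;
    a null cursor in the response ends the scan."""
    cursor = None
    while True:
        page = fetch_page(cursor=cursor, count=count)
        yield from page["values"]
        cursor = page["cursor"]
        if cursor is None:
            break
```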

Add filter values

POST /api/v1/beam/{config_id}/transforms/{transform_uid}/filter-values
Adds values to the filter. Duplicates are ignored. Request body:
{
  "values": ["0x3c499c542cef5e3811e1192ce70d8cc03d5c3359", "0xc2132d05d31c914a87c6611c10748aeb04b58e8f"]
}
Response:
{ "added": 2 }

Replace filter values

PUT /api/v1/beam/{config_id}/transforms/{transform_uid}/filter-values
Atomically replaces all values in the filter. An empty list clears the filter entirely. Request body:
{
  "values": ["0x3c499c542cef5e3811e1192ce70d8cc03d5c3359"]
}
Response:
{ "count": 1 }

Remove filter values

DELETE /api/v1/beam/{config_id}/transforms/{transform_uid}/filter-values
Removes specific values from the filter. Request body:
{
  "values": ["0x3c499c542cef5e3811e1192ce70d8cc03d5c3359"]
}
Response:
{ "removed": 1 }

Check filter value membership

GET /api/v1/beam/{config_id}/transforms/{transform_uid}/filter-values/check
Check whether a value exists in the filter. Query parameters:
Parameter | Required | Description
--------- | -------- | -----------
value | Yes | The value to check
Response:
{ "value": "0x3c499c542cef5e3811e1192ce70d8cc03d5c3359", "is_member": true }

Count filter values

GET /api/v1/beam/{config_id}/transforms/{transform_uid}/filter-values/count
Returns the total number of values in the filter. Response:
{ "count": 1500 }

Data models

BeamConfig

Field | Description
----- | -----------
id | Auto-generated pipeline ID
name | Pipeline name
description | Pipeline description
tags | Array of string tags for organizing pipelines
owner_org_team_user_id | Owner organization/team/user ID
created_at | Creation timestamp
updated_at | Last update timestamp
pipeline_config | Pipeline configuration (source, transforms, sinks)
See Configuration reference for full details on the pipeline_config schema (source, transforms, and sinks).

Supported chains and entities

Chain | Source | Entities
----- | ------ | --------
Polygon | PubSub | log, decoded_log, erc20_token_transfer, erc721_token_transfer, erc1155_token_transfer
Base | PubSub | log, decoded_log, erc20_token_transfer, erc721_token_transfer, erc1155_token_transfer
Solana | PubSub | nonvoting_transaction
Hyperliquid | Kafka | block, trade, fill, order, misc_event
More chains and entities are being added regularly. Check the Datastreams catalog for the latest availability, or contact support@allium.so to request support for a specific chain.