Authentication
Base URL: https://api.allium.so/api/v1/beam
All endpoints require an API key passed via the X-API-Key header:
curl -X GET https://api.allium.so/api/v1/beam \
-H "X-API-Key: ${ALLIUM_API_KEY}"
Get your API key from app.allium.so/settings/api-keys.
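The same authenticated call can be made from any HTTP client. A minimal sketch using only Python's standard library (the helper name is illustrative, not part of an official SDK):

```python
import json
import os
import urllib.request

BASE_URL = "https://api.allium.so/api/v1/beam"

def build_request(path="", method="GET", body=None):
    """Build an authenticated request; the API key is read from the environment."""
    headers = {"X-API-Key": os.environ.get("ALLIUM_API_KEY", "")}
    data = None
    if body is not None:
        headers["Content-Type"] = "application/json"
        data = json.dumps(body).encode()
    return urllib.request.Request(BASE_URL + path, data=data,
                                  headers=headers, method=method)

# req = build_request()                # GET the pipeline list
# resp = urllib.request.urlopen(req)   # uncomment to actually send the request
```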
Configuration endpoints
Create pipeline
POST /api/v1/beam
Creates a new pipeline configuration.
Request body:
{
"name": "USDC Transfer Monitor",
"description": "Monitors USDC ERC20 transfers on Base",
"tags": ["production", "base"],
"pipeline_config": {
"source": {
"type": "pubsub",
"chain": "base",
"entity": "erc20_token_transfer",
"is_zerolag": false
},
"transforms": [
{
"type": "redis_set_filter",
"filter_expr": "root = this.token_address"
}
],
"sinks": [
{
"type": "kafka",
"name": "usdc-transfers"
}
]
}
}
Response: BeamConfig (with generated id, owner_org_team_user_id, created_at, updated_at)
Filter values are managed separately from the pipeline config. After creating a pipeline, use the filter values endpoints to add values to set filter transforms.
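The request body above can be assembled programmatically before POSTing. A sketch with an illustrative helper (the field values mirror the example; the function name is not part of the API):

```python
def make_pipeline_config(name, description, tags, source, transforms, sinks):
    """Assemble a Create-pipeline request body from its parts."""
    return {
        "name": name,
        "description": description,
        "tags": tags,
        "pipeline_config": {
            "source": source,
            "transforms": transforms,
            "sinks": sinks,
        },
    }

body = make_pipeline_config(
    name="USDC Transfer Monitor",
    description="Monitors USDC ERC20 transfers on Base",
    tags=["production", "base"],
    source={"type": "pubsub", "chain": "base",
            "entity": "erc20_token_transfer", "is_zerolag": False},
    transforms=[{"type": "redis_set_filter",
                 "filter_expr": "root = this.token_address"}],
    sinks=[{"type": "kafka", "name": "usdc-transfers"}],
)
```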
List pipelines
GET /api/v1/beam
Returns all Beam pipeline configs for the authenticated user.
Response: BeamConfig[]
Get pipeline
GET /api/v1/beam/{config_id}
Returns a single pipeline config by ID.
Response: BeamConfig
Update pipeline
PUT /api/v1/beam/{config_id}
Updates an existing pipeline configuration. After updating, deploy again to apply changes.
Request body: Same as Create pipeline.
Delete pipeline
DELETE /api/v1/beam/{config_id}
Deletes the pipeline configuration and tears down any active deployment (Kafka topics, workers, subscriptions).
Deployment endpoints
Deploy pipeline
POST /api/v1/beam/{config_id}/deploy
Deploys the pipeline to start processing data. Creates Kafka topics, provisions credentials, and spins up workers.
Deployment is idempotent: call it again after updating a config to apply changes with zero downtime. Do not tear down before redeploying.
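The update-then-redeploy flow can be sketched with an injected HTTP function (the `http` callable and its signature are illustrative, not part of an official client):

```python
def apply_config_change(http, config_id, new_config):
    """PUT the updated config, then POST deploy to roll it out with zero downtime."""
    http("PUT", f"/api/v1/beam/{config_id}", body=new_config)
    return http("POST", f"/api/v1/beam/{config_id}/deploy")

# Stub that records calls instead of hitting the network:
calls = []
def fake_http(method, path, body=None):
    calls.append((method, path))
    return {"status": "deployed"}

apply_config_change(fake_http, "abc123", {"name": "updated"})
# calls == [("PUT", "/api/v1/beam/abc123"), ("POST", "/api/v1/beam/abc123/deploy")]
```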
Response: Returns deployment status and sink connection details (Kafka credentials, topic names, consumer code snippets).
Teardown pipeline
POST /api/v1/beam/{config_id}/teardown
Removes the deployed infrastructure (workers, Kafka topics, subscriptions). Use this only when you want to fully stop the pipeline. The config is preserved.
Get deployment stats
GET /api/v1/beam/{config_id}/deploy/stats
Returns worker health information for the deployment.
Response:
{
"workers_health": {
"total_workers": 3,
"healthy_workers": 3,
"unhealthy_workers": 0,
"crashing_workers": 0,
"oom_killed_workers": 0,
"workers": [
{
"name": "abc123",
"status": "Running",
"is_ready": true,
"is_crash_looping": false,
"is_oom_killed": false,
"restart_count": 0,
"start_time": "2025-01-15T10:30:00Z"
}
]
}
}
| Field | Description |
|---|---|
| total_workers | Number of worker pods |
| healthy_workers | Workers running normally |
| unhealthy_workers | Workers not ready |
| crashing_workers | Workers in a crash loop |
| oom_killed_workers | Workers killed due to out-of-memory |
| restart_count | Per-worker restart count; high values indicate instability |
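A small helper can flag unhealthy deployments from the stats payload above (the function and the restart threshold are illustrative):

```python
def deployment_is_healthy(stats, max_restarts=5):
    """True when every worker is healthy and none are restarting excessively."""
    wh = stats["workers_health"]
    if wh["healthy_workers"] < wh["total_workers"]:
        return False
    if wh["crashing_workers"] or wh["oom_killed_workers"]:
        return False
    return all(w["restart_count"] <= max_restarts for w in wh["workers"])

stats = {"workers_health": {"total_workers": 3, "healthy_workers": 3,
                            "unhealthy_workers": 0, "crashing_workers": 0,
                            "oom_killed_workers": 0,
                            "workers": [{"restart_count": 0}]}}
print(deployment_is_healthy(stats))  # True
```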
Get deployment metrics
GET /api/v1/beam/{config_id}/deploy/metrics
Returns throughput, latency, and error rate metrics for the deployment.
Query parameters:
| Parameter | Default | Description |
|---|---|---|
| time_range | 1h | Time range: 1h, 1d, or 1w |
| aggregated | false | true returns a single summary; false returns time-series data points |
| metrics | all | Comma-separated list of metric types to include |
Available metric types: messages_received, messages_sent, total_latency, processing_latency, processor_error, output_error, data_age
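The query string can be built with the standard library. A sketch (helper name is illustrative):

```python
from urllib.parse import urlencode

def metrics_url(config_id, time_range="1h", aggregated=False, metrics=None):
    """Build the deploy/metrics path with optional filters."""
    params = {"time_range": time_range, "aggregated": str(aggregated).lower()}
    if metrics:
        params["metrics"] = ",".join(metrics)
    return f"/api/v1/beam/{config_id}/deploy/metrics?{urlencode(params)}"

print(metrics_url("abc123", time_range="1d",
                  metrics=["messages_received", "messages_sent"]))
```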
Time-series (default):
{
"config_id": "abc123",
"metrics": [
{
"metric_type": "messages_received",
"points": [
{ "timestamp": "2026-03-24T07:21:40Z", "value": 0.0 },
{ "timestamp": "2026-03-24T07:22:40Z", "value": 15.3 },
{ "timestamp": "2026-03-24T07:23:40Z", "value": 22.1 }
]
},
{
"metric_type": "messages_sent",
"points": [ ]
}
]
}
Aggregated:
{
"pipeline_id": "abc123",
"time_range": "1h",
"messages_received_per_min": 411.98,
"messages_sent_per_min": 411.98,
"total_latency": {
"p50": 222.55,
"p90": 266.46,
"p99": 266.46,
"unit": "ms"
},
"processing_latency": {
"p50": 1.21,
"p90": 2.26,
"p99": 2.26,
"unit": "ms"
},
"processor_error_rate": 0.0,
"output_error_rate": 0.0,
"avg_data_age_ms": 2043.49
}
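The aggregated summary is convenient for alerting. A sketch of a threshold check over the fields above (the thresholds and function name are illustrative):

```python
def metrics_alerts(summary, max_p99_ms=1000.0, max_error_rate=0.01):
    """Return a list of alert strings for an aggregated metrics summary."""
    alerts = []
    if summary["total_latency"]["p99"] > max_p99_ms:
        alerts.append("p99 total latency above threshold")
    if summary["processor_error_rate"] > max_error_rate:
        alerts.append("processor error rate above threshold")
    if summary["output_error_rate"] > max_error_rate:
        alerts.append("output error rate above threshold")
    return alerts

summary = {"total_latency": {"p99": 266.46},
           "processor_error_rate": 0.0, "output_error_rate": 0.0}
print(metrics_alerts(summary))  # [] -- the example above is within thresholds
```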
Transform endpoints
Update transforms on a deployed pipeline. Changes are synced and the pipeline is automatically redeployed.
Update transform
PATCH /api/v1/beam/{config_id}/transforms/{transform_uid}
Updates a single transform by its UID and automatically redeploys the pipeline.
Request body:
For a V8 JavaScript transform:
{
"type": "v8",
"script": "function process(data) { return data; }"
}
For a Redis set filter transform:
{
"type": "redis_set_filter",
"filter_expr": "root = this.address"
}
Response: The updated transform object.
Replace transforms
PUT /api/v1/beam/{config_id}/transforms
Replaces the entire transforms list, handling add, remove, update, and reorder in a single call. Automatically redeploys the pipeline.
Request body:
[
{ "type": "v8", "script": "function process(d) { return d; }" },
{ "type": "redis_set_filter", "filter_expr": "root = this.from_address" }
]
Response: Array of updated transform objects.
The transforms list cannot be empty.
Filter values endpoints
Manage the values in a set filter transform. Filter values are stored separately from the pipeline config, supporting 10M+ values per filter. Changes take effect immediately — no redeploy needed.
Each endpoint requires config_id (pipeline ID) and transform_uid (the set filter transform’s unique identifier).
Use lowercase values when filtering by addresses, labels, or symbols — this is how values are normalized in the system.
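Since values are normalized to lowercase server-side, it is safest to normalize on the client before any write or membership check. A minimal helper (illustrative):

```python
def normalize_values(values):
    """Lowercase and strip values so they match the server-side normalization."""
    return [v.strip().lower() for v in values]

print(normalize_values(["0x3C499C542cEF5E3811e1192ce70d8cc03d5c3359", " USDC "]))
# ['0x3c499c542cef5e3811e1192ce70d8cc03d5c3359', 'usdc']
```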
List filter values
GET /api/v1/beam/{config_id}/transforms/{transform_uid}/filter-values
Returns a paginated list of values in the filter.
Query parameters:
| Parameter | Default | Description |
|---|---|---|
| cursor | null | Pagination cursor from a previous response |
| count | 100 | Number of values per page (1-10,000) |
Response:
{
"values": ["0x3c499c542cef5e3811e1192ce70d8cc03d5c3359", "0xc2132d05d31c914a87c6611c10748aeb04b58e8f"],
"cursor": 42,
"total_count": 1500
}
| Field | Description |
|---|---|
| values | Array of values in the current page |
| cursor | Cursor for the next page; null if there are no more pages |
| total_count | Total number of values in the filter |
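Cursor pagination can be drained with a simple loop. A sketch with an injected fetch function standing in for the GET call (the fetcher signature is illustrative):

```python
def iter_filter_values(fetch_page, count=100):
    """Yield every filter value, following the cursor until it is null."""
    cursor = None
    while True:
        page = fetch_page(cursor=cursor, count=count)
        yield from page["values"]
        cursor = page["cursor"]
        if cursor is None:
            break

# Stub standing in for GET .../filter-values:
pages = {None: {"values": ["a", "b"], "cursor": 42},
         42: {"values": ["c"], "cursor": None}}
def fake_fetch(cursor=None, count=100):
    return pages[cursor]

print(list(iter_filter_values(fake_fetch)))  # ['a', 'b', 'c']
```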
Add filter values
POST /api/v1/beam/{config_id}/transforms/{transform_uid}/filter-values
Adds values to the filter. Duplicates are ignored.
Request body:
{
"values": ["0x3c499c542cef5e3811e1192ce70d8cc03d5c3359", "0xc2132d05d31c914a87c6611c10748aeb04b58e8f"]
}
Response:
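When loading a large set (filters support 10M+ values), it is sensible to split the values across multiple POST requests. A sketch of client-side chunking (the batch size is illustrative, not a documented limit):

```python
def chunked(values, size=10_000):
    """Split a list of values into POST-sized batches."""
    for i in range(0, len(values), size):
        yield values[i:i + size]

batches = list(chunked([f"0x{i:040x}" for i in range(25_000)], size=10_000))
print([len(b) for b in batches])  # [10000, 10000, 5000]
```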
Replace filter values
PUT /api/v1/beam/{config_id}/transforms/{transform_uid}/filter-values
Atomically replaces all values in the filter. An empty list clears the filter entirely.
Request body:
{
"values": ["0x3c499c542cef5e3811e1192ce70d8cc03d5c3359"]
}
Response:
Remove filter values
DELETE /api/v1/beam/{config_id}/transforms/{transform_uid}/filter-values
Removes specific values from the filter.
Request body:
{
"values": ["0x3c499c542cef5e3811e1192ce70d8cc03d5c3359"]
}
Response:
Check filter value membership
GET /api/v1/beam/{config_id}/transforms/{transform_uid}/filter-values/check
Check whether a value exists in the filter.
Query parameters:
| Parameter | Required | Description |
|---|---|---|
| value | Yes | The value to check |
Response:
{ "value": "0x3c499c542cef5e3811e1192ce70d8cc03d5c3359", "is_member": true }
Count filter values
GET /api/v1/beam/{config_id}/transforms/{transform_uid}/filter-values/count
Returns the total number of values in the filter.
Response:
Data models
BeamConfig
| Field | Description |
|---|---|
| id | Auto-generated pipeline ID |
| name | Pipeline name |
| description | Pipeline description |
| tags | Array of string tags for organizing pipelines |
| owner_org_team_user_id | Owner organization/team/user ID |
| created_at | Creation timestamp |
| updated_at | Last update timestamp |
| pipeline_config | Pipeline configuration (source, transforms, sinks) |
See Configuration reference for full details on the pipeline_config schema (source, transforms, and sinks).
Supported chains and entities
| Chain | Source | Entities |
|---|---|---|
| Polygon | PubSub | log, decoded_log, erc20_token_transfer, erc721_token_transfer, erc1155_token_transfer |
| Base | PubSub | log, decoded_log, erc20_token_transfer, erc721_token_transfer, erc1155_token_transfer |
| Solana | PubSub | nonvoting_transaction |
| Hyperliquid | Kafka | block, trade, fill, order, misc_event |
More chains and entities are being added regularly. Check the Datastreams catalog for the latest availability, or contact support@allium.so to request support for a specific chain.