A Beam pipeline has three components that data flows through in order:
```
source → transforms[] → sinks[]
```
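Putting the pieces together, a complete pipeline config might look like the sketch below. The top-level key names (`source`, `transforms`, `sinks`) are an assumption based on the component names; each component is documented in the sections that follow.

```json
{
  "source": {
    "chain": "polygon",
    "entity": "log"
  },
  "transforms": [
    {
      "type": "redis_set_filter",
      "filter_expr": "root = this.address"
    },
    {
      "type": "v8",
      "script": "function transform(record) { record.parsed = true; return record; }"
    }
  ],
  "sinks": [
    {
      "type": "kafka",
      "name": "my-output-topic"
    }
  ]
}
```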
## Source
Beam sources connect to Allium’s Datastreams. Select a blockchain and entity type to stream.
```json
{
  "chain": "polygon",
  "entity": "log",
  "is_zerolag": false
}
```
| Field | Required | Description |
|---|---|---|
| `chain` | Yes | Blockchain to source data from |
| `entity` | Yes | Entity type to stream (see below) |
| `is_zerolag` | No | Enable tip-of-chain mode to stream data from the absolute tip of the blockchain, before finality. Lower latency, but may include data from reorged blocks. Default: `false`. |
Check the Datastreams catalog for supported chains and entities, or contact support@allium.so to request a specific chain or entity.
## Transforms

Two transform types are available. You can chain multiple transforms together; data flows through them in order.
### Set filter
Filter data by matching field values against a set. Only records whose extracted field value exists in your set pass through. Sets can hold 10M+ values.
```json
{
  "type": "redis_set_filter",
  "filter_expr": "root = this.address"
}
```
| Field | Description |
|---|---|
| `filter_expr` | Bloblang expression to extract the field value for filtering |
Filter values are managed separately from the pipeline config — add, remove, replace, and browse values through the UI or the filter values API. This decouples value management from pipeline configuration and allows sets to scale to millions of entries.
When filtering by addresses, labels, or symbols, use lowercase values. This is how these values are normalized in our system.
Bloblang filter_expr examples:
```
root = this.address        # Filter by contract address
root = this.topic0         # Filter by event signature
root = this.from_address   # Filter by sender address
root = this.to_address     # Filter by recipient address
```
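Bloblang also supports string concatenation, so a single set filter can match on a composite key. A sketch, assuming your filter values are stored in the same `address:topic0` form:

```
root = this.address + ":" + this.topic0   # Composite key: contract address + event signature
```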
Log entity field reference:
| Field | Description |
|---|---|
| `address` | Contract address that emitted the log |
| `topic0` | First topic (usually the event signature hash) |
| `topic1` | Second topic (first indexed parameter) |
| `topic2` | Third topic (second indexed parameter) |
| `topic3` | Fourth topic (third indexed parameter) |
| `from_address` | Transaction sender |
| `to_address` | Transaction recipient |
| `data` | Non-indexed event data |
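For example, to stream only ERC-20 `Transfer` events, extract `topic0`:

```json
{
  "type": "redis_set_filter",
  "filter_expr": "root = this.topic0"
}
```

Then add `0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef` (the keccak256 hash of `Transfer(address,address,uint256)`, lowercased per the note above) as a filter value.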
#### Managing filter values
You can manage filter values through the UI or the filter values API.
In the UI, click **Edit** on a set filter node to open the filter set editor. From there you can:
- Browse values with paginated navigation (100 per page)
- Add individual values via the text input
- Remove values with the trash icon
- Check membership by entering a value and seeing an instant result indicator
- Bulk upload from `.txt` or `.csv` files with add, remove, or replace modes (see the example below)
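For bulk uploads, a newline-delimited file with one value per line is assumed here (check the editor for the exact accepted format). For example, a `.txt` file of contract addresses, lowercased per the note above:

```
0xdac17f958d2ee523a2206206994597c13d831ec7
0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48
```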
Filter values persist across pipeline deploys and teardowns. They are only deleted when the transform or pipeline config is deleted.
### JavaScript (v8)
Transform data using JavaScript. Your function receives each record and can modify, enrich, or reshape it. Return `null` to drop a record.
```json
{
  "type": "v8",
  "script": "function transform(record) { record.parsed = true; return record; }"
}
```
| Field | Description |
|---|---|
| `script` | JavaScript code that processes each record |
Example — add a transfer size tag:
```javascript
function transform(record) {
  const amount = parseFloat(record.amount || "0");
  if (amount > 1000000) {
    record.size_tag = "whale";
  } else if (amount > 10000) {
    record.size_tag = "medium";
  } else {
    record.size_tag = "small";
  }
  return record;
}
```
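Because returning `null` drops the record, a v8 transform can also act as a fine-grained filter. A minimal sketch that keeps only large transfers, reusing the `amount` field from the example above:

```javascript
function transform(record) {
  const amount = parseFloat(record.amount || "0");
  // Returning null drops the record from the pipeline
  if (amount <= 10000) {
    return null;
  }
  return record;
}
```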
## Sinks
Sinks define where your processed data is delivered.
```json
{
  "type": "kafka",
  "name": "my-output-topic"
}
```
| Field | Description |
|---|---|
| `type` | Output type: `kafka`, `sns`, or `external_kafka` (see below) |
| `name` | Topic name suffix for the output (for `external_kafka`, this is the topic name on your cluster) |
**Kafka sinks** — topic naming: `beam.{config_id}.{name}`. After deployment, you receive connection credentials and consumer code snippets in Python and TypeScript. See our Kafka integration guide for more on consuming Kafka data.
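As a rough sketch of what a consumer can look like with the kafkajs library (not necessarily the snippet you receive; the broker address, credentials, SASL mechanism, group ID, and `config_id` below are placeholders for your deployment's connection details):

```javascript
const { Kafka } = require("kafkajs");

// All connection values below are placeholders; use the credentials
// you receive after deployment.
const kafka = new Kafka({
  clientId: "beam-consumer",
  brokers: ["<bootstrap-server>:9092"],
  ssl: true,
  sasl: {
    mechanism: "scram-sha-512", // assumption: match the mechanism your credentials specify
    username: "<username>",
    password: "<password>",
  },
});

async function main() {
  const consumer = kafka.consumer({ groupId: "beam-example-group" });
  await consumer.connect();
  // Topics follow the beam.{config_id}.{name} pattern
  await consumer.subscribe({ topic: "beam.<config_id>.my-output-topic" });
  await consumer.run({
    eachMessage: async ({ message }) => {
      // Assumption: records are delivered as JSON-encoded message values
      const record = JSON.parse(message.value.toString());
      console.log(record);
    },
  });
}

main().catch(console.error);
```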
**SNS sinks** — topic naming: `beam-{config_id}-{name}`. See our SNS integration guide for details on consuming SNS data.
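An SNS sink uses the same config shape, with `type` set to `sns`:

```json
{
  "type": "sns",
  "name": "my-output-topic"
}
```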
### External Kafka sinks
Deliver data to your own Kafka cluster instead of Allium-managed infrastructure.
```json
{
  "type": "external_kafka",
  "name": "my-external-topic",
  "bootstrap_servers": "broker1.example.com:9092,broker2.example.com:9092",
  "username_secret_id": "my-kafka-username",
  "password_secret_id": "my-kafka-password",
  "auth_method": "SCRAM-SHA-512"
}
```
| Field | Required | Description |
|---|---|---|
| `type` | Yes | Must be `external_kafka` |
| `name` | Yes | Kafka topic name on your cluster |
| `bootstrap_servers` | Yes | Comma-separated list of broker addresses |
| `username_secret_id` | Yes | Name of the organization secret containing the SASL username |
| `password_secret_id` | Yes | Name of the organization secret containing the SASL password |
| `auth_method` | No | SASL authentication mechanism: `PLAIN` (default) or `SCRAM-SHA-512` |
Authentication uses SASL_SSL with either the `PLAIN` (default) or `SCRAM-SHA-512` mechanism.
If a referenced secret is updated after the pipeline was last deployed, Beam will flag the sink as stale. You must redeploy the pipeline for the new secret value to take effect.
More sinks coming soon: Pub/Sub, ClickHouse, Postgres, GCS, NATS, RabbitMQ. Contact support@allium.so if you need a specific sink type.
## Best practices
- Filter early — apply set filters before JavaScript transforms to reduce data volume
- Start simple — begin with basic filtering, add transforms incrementally
- Monitor after deploy — always check worker health after deployment
- Test transforms — validate that JavaScript scripts handle edge cases (missing fields, unexpected types); see the sketch after this list
- Use descriptive names — clear pipeline and sink names help track multiple pipelines
- Lowercase addresses — always use lowercase when filtering by addresses, labels, or symbols
- Redeploy, don’t tear down — when updating a pipeline, just redeploy. Redeploys are idempotent, with zero downtime.
- Manage filter values separately — use the UI or filter values API to add/remove values without touching the pipeline config
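On the testing point above, a defensive transform validates field presence and types before using them. A minimal sketch with illustrative field names:

```javascript
function transform(record) {
  // Drop records without a usable string address instead of throwing at runtime
  if (typeof record.address !== "string" || record.address.length === 0) {
    return null;
  }
  // Normalize to lowercase, matching how addresses are stored elsewhere
  record.address = record.address.toLowerCase();
  return record;
}
```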
## Troubleshooting
### Unhealthy workers
If workers show as unhealthy:
- Check that the pipeline config is valid (source chain/entity exists, filter expressions are syntactically correct)
- Verify source data is available on the selected chain and entity
- Update the config and redeploy (no teardown needed)
### OOM-killed workers
If workers are being killed after running out of memory:
- Simplify JavaScript transforms to reduce memory usage
- Add more aggressive filtering earlier in the pipeline to reduce data volume
- Contact support@allium.so for resource limit adjustments
### Crashing workers
If workers are in a crash loop:
- Check JavaScript scripts for syntax errors
- Verify filter expressions are valid Bloblang
- Review transform logic for runtime errors (e.g., accessing undefined properties)