A Beam pipeline has three components that data flows through in order:
source → transforms[] → sinks[]

Source

Beam sources connect to Allium’s Datastreams. Select a blockchain and entity type to stream.
{
  "chain": "polygon",
  "entity": "log",
  "is_zerolag": false
}
| Field | Required | Description |
| --- | --- | --- |
| chain | Yes | Blockchain to source data from |
| entity | Yes | Entity type to stream (see below) |
| is_zerolag | No | Enable tip-of-chain mode to stream data from the absolute tip of the blockchain, before finality. Lower latency but may include data from reorged blocks. Default: false. |
Check the Datastreams catalog for supported chains and entities, or contact support@allium.so to request a specific chain or entity.

Transforms

Two transform types are available. You can chain multiple transforms together — data flows through them in order.

Set filter

Filter data by matching field values against a set. Only records whose extracted field value exists in your set pass through. Sets can hold 10M+ values.
{
  "type": "redis_set_filter",
  "filter_expr": "root = this.address"
}
| Field | Description |
| --- | --- |
| filter_expr | Bloblang expression to extract the field value for filtering |
Filter values are managed separately from the pipeline config — add, remove, replace, and browse values through the UI or the filter values API. This decouples value management from pipeline configuration and allows sets to scale to millions of entries.
When filtering by addresses, labels, or symbols, use lowercase values, since these fields are normalized to lowercase in our system.
Bloblang filter_expr examples:
root = this.address        # Filter by contract address
root = this.topic0         # Filter by event signature
root = this.from_address   # Filter by sender address
root = this.to_address     # Filter by recipient address
Log entity field reference:
| Field | Description |
| --- | --- |
| address | Contract address that emitted the log |
| topic0 | First topic (usually the event signature hash) |
| topic1 | Second topic (first indexed parameter) |
| topic2 | Third topic (second indexed parameter) |
| topic3 | Fourth topic (third indexed parameter) |
| from_address | Transaction sender |
| to_address | Transaction recipient |
| data | Non-indexed event data |
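To make the set-filter semantics concrete, here is a minimal JavaScript sketch of what the filter does for a log record: extract one field (as a filter_expr would) and pass the record only if its lowercase value is present in the set. The contract address shown is just an example value, and the real filter runs server-side with the expression evaluated as Bloblang.

```javascript
// Illustrative sketch of set-filter semantics, not the actual implementation.
// filterSet holds lowercase values, matching how values are normalized.
const filterSet = new Set([
  "0x7ceb23fd6bc0add59e62ac25578270cff1b9f619", // example contract address
]);

// Equivalent of filter_expr "root = this.address":
const extract = (record) => record.address;

function passesFilter(record) {
  const value = extract(record);
  return typeof value === "string" && filterSet.has(value.toLowerCase());
}

// Example log records:
const kept = passesFilter({ address: "0x7CEB23FD6BC0ADD59E62AC25578270CFF1B9F619" });
const dropped = passesFilter({ address: "0x0000000000000000000000000000000000000000" });
// kept === true, dropped === false
```

Note that the sketch lowercases on lookup only as a safety net; the set itself should already contain lowercase values, per the normalization note above.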

Managing filter values

You can manage filter values through the UI or the filter values API. In the UI, click Edit on a set filter node to open the filter set editor.
[Screenshot: Beam filter set editor with paginated values]
From there you can:
  • Browse values with paginated navigation (100 per page)
  • Add individual values via the text input
  • Remove values with the trash icon
  • Check membership by entering a value and seeing an instant result indicator
  • Bulk upload from .txt or .csv files with add, remove, or replace modes
[Screenshot: Filter value membership check with green indicator]
[Screenshot: Bulk upload options: add to set, remove from set, replace entire set]
Filter values persist across pipeline deploys and teardowns. They are only deleted when the transform or pipeline config is deleted.
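Before a bulk upload, it can help to normalize the file client-side so the values match Beam's lowercase normalization. A minimal sketch (the input is shown inline; in practice it would come from your .txt or .csv file):

```javascript
// Sketch: prepare a values file for bulk upload. One value per line,
// lowercased and de-duplicated to match how filter values are normalized.
function prepareFilterValues(fileText) {
  const seen = new Set();
  for (const line of fileText.split(/\r?\n/)) {
    const value = line.trim().toLowerCase();
    if (value) seen.add(value); // skip blank lines
  }
  return [...seen];
}

const values = prepareFilterValues("0xABC...\n0xabc...\n\n0xDEF...\n");
// values: ["0xabc...", "0xdef..."]
```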

JavaScript (v8)

Transform data using JavaScript. Your function receives each record and can modify, enrich, or reshape it. Return null to drop a record.
{
  "type": "v8",
  "script": "function transform(record) { record.parsed = true; return record; }"
}
| Field | Description |
| --- | --- |
| script | JavaScript code that processes each record |
Example — add a transfer size tag:
function transform(record) {
  const amount = parseFloat(record.amount || "0");
  if (amount > 1000000) {
    record.size_tag = "whale";
  } else if (amount > 10000) {
    record.size_tag = "medium";
  } else {
    record.size_tag = "small";
  }
  return record;
}
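Returning null drops the record entirely, so one script can filter and transform in a single step. A sketch, assuming a transfer-like record with an amount field as in the example above:

```javascript
// Drop dust transfers and tag the rest. Returning null removes the
// record from the pipeline entirely.
function transform(record) {
  const amount = parseFloat(record.amount || "0");
  if (amount < 1) {
    return null; // record is dropped
  }
  record.size_tag =
    amount > 1000000 ? "whale" : amount > 10000 ? "medium" : "small";
  return record;
}
```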

Sinks

Sinks define where your processed data is delivered.
{
  "type": "kafka",
  "name": "my-output-topic"
}
| Field | Description |
| --- | --- |
| type | Output type: kafka, sns, or external_kafka (see below) |
| name | Topic name suffix for the output (for external_kafka, this is the topic name on your cluster) |
Kafka sinks — Topic naming: beam.{config_id}.{name}. After deployment, you receive connection credentials and consumer code snippets in Python and TypeScript. See our Kafka integration guide for more on consuming Kafka data.
SNS sinks — Topic naming: beam-{config_id}-{name}. See our SNS integration guide for details on consuming SNS data.
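As a sanity check on the naming schemes above, a small illustrative helper that derives the delivered topic name from a sink config:

```javascript
// Derive the delivered topic name from a sink config.
// Kafka sinks: beam.{config_id}.{name}
// SNS sinks:   beam-{config_id}-{name}
// External Kafka: the name is used as-is on your cluster.
function topicName(sinkType, configId, name) {
  if (sinkType === "kafka") return `beam.${configId}.${name}`;
  if (sinkType === "sns") return `beam-${configId}-${name}`;
  return name;
}

topicName("kafka", "abc123", "my-output-topic"); // "beam.abc123.my-output-topic"
topicName("sns", "abc123", "my-output-topic");   // "beam-abc123-my-output-topic"
```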

External Kafka sinks

Deliver data to your own Kafka cluster instead of Allium-managed infrastructure.
[Screenshot: Beam editor with external Kafka sink]
[Screenshot: External Kafka configuration fields]
{
  "type": "external_kafka",
  "name": "my-external-topic",
  "bootstrap_servers": "broker1.example.com:9092,broker2.example.com:9092",
  "username_secret_id": "my-kafka-username",
  "password_secret_id": "my-kafka-password",
  "auth_method": "SCRAM-SHA-512"
}
| Field | Required | Description |
| --- | --- | --- |
| type | Yes | Must be external_kafka |
| name | Yes | Kafka topic name on your cluster |
| bootstrap_servers | Yes | Comma-separated list of broker addresses |
| username_secret_id | Yes | Name of the organization secret containing the SASL username |
| password_secret_id | Yes | Name of the organization secret containing the SASL password |
| auth_method | No | SASL authentication mechanism: PLAIN (default) or SCRAM-SHA-512 |
Authentication uses SASL_SSL with either PLAIN (default) or SCRAM-SHA-512 mechanism.
If a referenced secret is updated after the pipeline was last deployed, Beam will flag the sink as stale. You must redeploy the pipeline for the new secret value to take effect.
More sinks coming soon: Pub/Sub, ClickHouse, Postgres, GCS, NATS, RabbitMQ. Contact support@allium.so if you need a specific sink type.

Best practices

  1. Filter early — apply set filters before JavaScript transforms to reduce data volume
  2. Start simple — begin with basic filtering, add transforms incrementally
  3. Monitor after deploy — always check worker health after deployment
  4. Test transforms — validate JavaScript scripts handle edge cases (missing fields, unexpected types)
  5. Use descriptive names — clear pipeline and sink names help track multiple pipelines
  6. Lowercase addresses — always use lowercase when filtering by addresses, labels, or symbols
  7. Redeploy, don’t tear down — when updating a pipeline, just redeploy. Redeploys are idempotent and incur zero downtime.
  8. Manage filter values separately — use the UI or filter values API to add/remove values without touching the pipeline config
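Practice 4 can be as simple as running the script locally against a handful of edge-case records before deploying. A minimal sketch (the transform shown is a stand-in for your own):

```javascript
// Minimal edge-case harness for a transform script. Run locally with node
// before deploying the pipeline.
function transform(record) {
  const amount = parseFloat(record.amount || "0"); // tolerate a missing field
  record.size_tag = amount > 10000 ? "large" : "small";
  return record;
}

const edgeCases = [
  {},                       // missing amount
  { amount: null },         // null value
  { amount: "not-a-num" },  // unexpected type
  { amount: "50000" },      // happy path
];

for (const record of edgeCases) {
  const out = transform({ ...record });
  // A transform should return either null (drop) or a well-formed record.
  if (out !== null && typeof out.size_tag !== "string") {
    throw new Error(`bad output for ${JSON.stringify(record)}`);
  }
}
```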

Troubleshooting

Unhealthy workers

If workers show as unhealthy:
  1. Check that the pipeline config is valid (source chain/entity exists, filter expressions are syntactically correct)
  2. Verify source data is available on the selected chain and entity
  3. Update the config and redeploy (no teardown needed)

OOM killed workers

If workers are being killed for out-of-memory:
  1. Simplify JavaScript transforms to reduce memory usage
  2. Add more aggressive filtering earlier in the pipeline to reduce data volume
  3. Contact support@allium.so for resource limit adjustments

Crashing workers

If workers are in a crash loop:
  1. Check JavaScript scripts for syntax errors
  2. Verify filter expressions are valid Bloblang
  3. Review transform logic for runtime errors (e.g., accessing undefined properties)
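For item 3, optional chaining with defaults guards against undefined properties, a common cause of crash loops in transform scripts. The decoded field below is hypothetical, purely to illustrate the pattern:

```javascript
// Defensive field access in a transform: optional chaining yields
// undefined instead of throwing, and nullish defaults keep the
// output well-formed even when nested fields are absent.
function transform(record) {
  // record.decoded is a hypothetical nested field that may be absent;
  // without ?. this would crash with "Cannot read properties of undefined".
  record.event_name = record.decoded?.name ?? "unknown";
  record.arg_count = record.decoded?.args?.length ?? 0;
  return record;
}
```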