Introduction

Cherry is a Python library for building blockchain data pipelines.

It is designed to make building production-ready blockchain data pipelines easy.

Getting Started

See the Getting Started section of the docs.

Features

  • Pure Python library. No need for YAML, SQL, TOML, etc.
  • High-level datasets API and flexible pipeline API.
  • High-performance, low-cost, and uniform data access. Use advanced providers without platform lock-in.
  • Included functionality to decode, validate, and transform blockchain data, all implemented in Rust for performance.
  • Write transformations using polars, pyarrow, datafusion, pandas, duckdb, or any other pyarrow-compatible library.
  • Schema inference automatically creates output tables.
  • Keep datasets fresh with continuous ingestion.
  • Parallelized execution: the next batch of data is fetched while your processing function runs, and database writes happen in parallel. No need to hand-optimize anything.
  • Included library of transformations.
  • Included functionality to implement crash resistance.

Data providers

Provider         | Ethereum (EVM) | Solana (SVM)
HyperSync        | ✓              | –
SQD              | ✓              | ✓ (beta)
Yellowstone-GRPC | –              | ✓

Supported output formats

  • ClickHouse
  • Iceberg
  • Deltalake
  • DuckDB
  • Arrow Datasets
  • Parquet

Examples

See the examples on GitHub and the Getting Started section.

License

Licensed under either of

  • Apache License, Version 2.0 (LICENSE-APACHE or http://www.apache.org/licenses/LICENSE-2.0)
  • MIT license (LICENSE-MIT or http://opensource.org/licenses/MIT)

at your option.

Getting Started

This section explains how to install and use cherry.

Installation

Cherry is published to PyPI as cherry-etl.

Core tooling that is used with cherry is published as cherry-core.

You can add them to your Python project like this:

pip install cherry-etl cherry-core

Or if you are using uv:

uv add cherry-etl cherry-core

Import them in your Python scripts like this:

import cherry_etl
import cherry_core

Using datasets

The easiest way to start using cherry is the datasets feature, which makes creating pipelines easy.

There are datasets for both SVM and EVM under the cherry_etl.datasets module.

from cherry_core import ingest
from cherry_etl import config as cc
from cherry_etl import datasets
from cherry_etl.pipeline import run_pipeline
import asyncio
import duckdb

# create in-memory duckdb database
db = duckdb.connect()

async def sync_data():
    # configure a data provider (see the Data Providers section)
    # provider_kind and provider_url are placeholders for your chosen provider
    provider = ingest.ProviderConfig(
        kind=provider_kind,
        url=provider_url,
    )

    # write data to duckdb
    writer = cc.Writer(
        kind=cc.WriterKind.DUCKDB,
        config=cc.DuckdbWriterConfig(
            connection=db.cursor(),
        ),
    )

    # Create the pipeline using the blocks dataset
    pipeline = datasets.evm.blocks(provider, writer, 18123123, 18123200)

    # Run the pipeline
    await run_pipeline(pipeline_name="blocks", pipeline=pipeline)

asyncio.run(sync_data())

data = db.sql("SELECT * FROM blocks LIMIT 20")
print(data)

Here is the full example.

Other dataset examples

Writing custom pipelines

Cherry is able to do much more than just the datasets API.

Read the writing custom pipelines section.

Data Providers

Everything in cherry works the same regardless of which data provider you choose.

You can change the provider by just changing provider_kind and provider_url arguments when constructing a ProviderConfig object.

from cherry_core import ingest

provider = ingest.ProviderConfig(
    kind=my_provider_kind,
    url=my_provider_url,
)

Check out the section for the specific provider you want to use for more info.

Data availability

Provider         | Ethereum (EVM) | Solana (SVM)
HyperSync        | ✓              | –
SQD              | ✓              | ✓ (beta)
Yellowstone-GRPC | –              | ✓

Full ProviderConfig

class ProviderKind(str, Enum):
    SQD = "sqd"
    HYPERSYNC = "hypersync"
    YELLOWSTONE_GRPC = "yellowstone_grpc"

@dataclass
class ProviderConfig:
    kind: ProviderKind
    url: Optional[str] = None
    bearer_token: Optional[str] = None
    max_num_retries: Optional[int] = None
    retry_backoff_ms: Optional[int] = None
    retry_base_ms: Optional[int] = None
    retry_ceiling_ms: Optional[int] = None
    req_timeout_millis: Optional[int] = None
    stop_on_head: bool = False
    head_poll_interval_millis: Optional[int] = None
    buffer_size: Optional[int] = None
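
As an illustration, a config tuned for continuous ingestion with retries might look like the sketch below (the URL and the exact values are illustrative, not recommendations):

from cherry_core import ingest

provider = ingest.ProviderConfig(
    kind=ingest.ProviderKind.SQD,
    url="https://portal.sqd.dev/datasets/ethereum-mainnet",
    max_num_retries=5,
    retry_base_ms=500,
    req_timeout_millis=30_000,
    stop_on_head=False,  # keep following the chain head for continuous ingestion
    head_poll_interval_millis=1_000,
)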

HyperSync

HyperSync is a purpose-built, high-performance data retrieval layer that gives developers unprecedented access to blockchain data. Built from the ground up in Rust, HyperSync serves as an alternative to traditional JSON-RPC endpoints, offering dramatically faster queries and more flexible data access patterns.

HyperSync currently supports 70+ EVM chains.

Read more about it in the official documentation.

Example ProviderConfig

from cherry_core import ingest

provider = ingest.ProviderConfig(
    kind=ingest.ProviderKind.HYPERSYNC,
    bearer_token="<Your Hypersync API Token>",
    url="https://eth.hypersync.xyz",
)

You can get a URL for the desired chain from the supported networks page.

Getting API token

HyperSync is currently free and unlimited, but it is transitioning to API tokens.

It is recommended to get an API token and use it with your queries.

SQD

SQD Network is a decentralized query engine optimized for batch extraction of large volumes of data. It currently serves historical on-chain data ingested from 100+ EVM and Substrate networks, as well as Solana (in beta), Tron, Starknet and Fuel. The data is comprehensive: for example, on EVM it includes event logs, transaction receipts, traces and per-transaction state diffs.

SQD currently supports 100+ EVM chains.

SQD has beta support for Solana.

Warning: Data starts from slot 317617480 when using Solana with SQD.

Read more at the official documentation.

Example ProviderConfig

from cherry_core import ingest

provider = ingest.ProviderConfig(
    kind=ingest.ProviderKind.SQD,
    url="https://portal.sqd.dev/datasets/solana-beta",
)

You can replace solana-beta with any dataset name from the supported networks page.

For example, you can use https://portal.sqd.dev/datasets/arbitrum-nova to get Arbitrum Nova data.

Deploying portals

Access to the SQD portal API is free but slightly limited at the moment.

You can deploy your own portal to get unlimited access. Contact us on Telegram for help with deploying and maintaining portal instances.

Yellowstone-GRPC

Support for yellowstone-grpc is still partial. We recommend using SQD instead.

You can read more about yellowstone_grpc here and here.

Example ProviderConfig

from cherry_core import ingest

provider = ingest.ProviderConfig(
    kind=ingest.ProviderKind.YELLOWSTONE_GRPC,
    bearer_token="<Your X-Token>",
    url="<URL to a Yellowstone GRPC endpoint>",
)

Writers

Writers control how data is written to the output after it is ingested from the provider and processed by the configured steps.

Supported output formats

  • ClickHouse
  • Iceberg
  • Deltalake
  • DuckDB
  • Arrow Datasets
  • Parquet

Full Writer configuration API

class WriterKind(str, Enum):
    CLICKHOUSE = "clickhouse"
    ICEBERG = "iceberg"
    DELTA_LAKE = "delta_lake"
    PYARROW_DATASET = "pyarrow_dataset"
    DUCKDB = "duckdb"

@dataclass
class Writer:
    kind: WriterKind
    config: (
        ClickHouseWriterConfig
        | IcebergWriterConfig
        | DeltaLakeWriterConfig
        | PyArrowDatasetWriterConfig
        | DuckdbWriterConfig
    )

See specific writer pages to learn more about your writer of choice.

If you don't have a preference, we highly recommend duckdb for development and testing as it is very solid and easy to use.

ClickHouse

Config

@dataclass
class ClickHouseWriterConfig:
    client: ClickHouseClient
    codec: Dict[str, Dict[str, str]] = field(default_factory=dict)
    order_by: Dict[str, List[str]] = field(default_factory=dict)
    engine: str = "MergeTree()"
    skip_index: Dict[str, List[ClickHouseSkipIndex]] = field(default_factory=dict)
    anchor_table: Optional[str] = None

Dict[str, _] fields are generally configuration keyed by table name. For example, codec["my_table"]["my_column"] gives the codec to use for the my_column column of the my_table table.

Example

from cherry_etl import config as cc

clickhouse_client = await clickhouse_connect.get_async_client(
    host=os.environ.get("CLICKHOUSE_HOST", "localhost"),
    port=int(os.environ.get("CLICKHOUSE_PORT", "8123")),
    username=os.environ.get("CLICKHOUSE_USER", "default"),
    password=os.environ.get("CLICKHOUSE_PASSWORD", "clickhouse"),
    database=os.environ.get("CLICKHOUSE_DATABASE", "blockchain"),
)

writer = cc.Writer(
    kind=cc.WriterKind.CLICKHOUSE,
    config=cc.ClickHouseWriterConfig(
        client=clickhouse_client,
        order_by={"transfers": ["block_number"]},
        codec={"transfers": {"data": "ZSTD"}},
        skip_index={
            "transfers": [
                cc.ClickHouseSkipIndex(
                    name="log_addr_idx",
                    val="address",
                    type_="bloom_filter(0.01)",
                    granularity=1,
                ),
                cc.ClickHouseSkipIndex(
                    name="from_addr_idx",
                    val="from",
                    type_="bloom_filter(0.01)",
                    granularity=1,
                ),
                cc.ClickHouseSkipIndex(
                    name="to_addr_idx",
                    val="to",
                    type_="bloom_filter(0.01)",
                    granularity=1,
                ),
            ]
        },
    ),
)

Anchor table

All tables are written in parallel, but the anchor table is written separately so it can be used to implement crash resistance.

DeltaLake

Config

@dataclass
class DeltaLakeWriterConfig:
    data_uri: str
    partition_by: Dict[str, list[str]] = field(default_factory=dict)
    storage_options: Optional[Dict[str, str]] = None
    writer_properties: Optional[deltalake.WriterProperties] = None
    anchor_table: Optional[str] = None

Dict[str, _] fields are generally configuration keyed by table name. For example, partition_by["my_table"] gives the list of columns to partition the my_table table by.

Example

data_uri = "./data"

writer = cc.Writer(
    kind=cc.WriterKind.DELTA_LAKE,
    config=cc.DeltaLakeWriterConfig(
        data_uri=data_uri,
    ),
)
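
A variant with per-table partitioning might look like this (the transfers table and block_number column are hypothetical names):

writer = cc.Writer(
    kind=cc.WriterKind.DELTA_LAKE,
    config=cc.DeltaLakeWriterConfig(
        data_uri=data_uri,
        # partition the hypothetical transfers table by block_number
        partition_by={"transfers": ["block_number"]},
    ),
)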

Anchor table

All tables are written in parallel, but the anchor table is written separately so it can be used to implement crash resistance.

DuckDB

We highly recommend duckdb for development and testing as it is very solid and easy to use.

Read their Python docs.

Config

@dataclass
class DuckdbWriterConfig:
    connection: duckdb.DuckDBPyConnection

Warning: Do not pass a duckdb.DuckDBPyConnection directly if you are also using it while the pipeline is running; it is not thread-safe. Use connection.cursor() to create a cursor and pass that instead.

Example

# create an in-memory duckdb database
connection = duckdb.connect()

writer = cc.Writer(
    kind=cc.WriterKind.DUCKDB,
    config=cc.DuckdbWriterConfig(
        connection=connection.cursor(),
    ),
)

Iceberg

Config

from pyiceberg.catalog import Catalog as IcebergCatalog

@dataclass
class IcebergWriterConfig:
    namespace: str
    catalog: IcebergCatalog
    write_location: str

See pyiceberg docs.

Example

from cherry_etl import config as cc
from pyiceberg.catalog.sql import SqlCatalog

catalog = SqlCatalog(
    name="cherry",
    uri="postgresql+psycopg2://postgres:postgres@localhost/iceberg",
    warehouse="s3://blockchain-data",
    **{
        "s3.endpoint": "http://localhost:9000",
        "s3.access-key-id": "minioadmin",
        "s3.secret-access-key": "minioadmin",
    },
)

writer = cc.Writer(
    kind=cc.WriterKind.ICEBERG,
    config=cc.IcebergWriterConfig(
        namespace="my_namespace",
        catalog=catalog,
        write_location="s3://blockchain-data/",
    ),
)

PyArrowDataset (Parquet)

See the documentation to learn more about arrow datasets.

This is the writer you want if you just want a directory of parquet files.

Config

@dataclass
class PyArrowDatasetWriterConfig:
    base_dir: str
    basename_template: Optional[str] = None
    partitioning: Dict[str, pa_dataset.Partitioning | list[str]] = field(
        default_factory=dict
    )
    partitioning_flavor: Dict[str, str] = field(default_factory=dict)
    filesystem: Optional[pa_fs.FileSystem] = None
    file_options: Optional[pa_dataset.FileWriteOptions] = None
    use_threads: bool = True
    max_partitions: int = 1024
    max_open_files: int = 1024
    max_rows_per_file: int = 0
    min_rows_per_group: int = 0
    max_rows_per_group: int = 1024 * 1024
    create_dir: bool = True
    anchor_table: Optional[str] = None

See pyarrow docs for more explanation about the parameters.

Example

from cherry_etl import config as cc

base_dir = "./data"

writer = cc.Writer(
    kind=cc.WriterKind.PYARROW_DATASET,
    config=cc.PyArrowDatasetWriterConfig(
        base_dir=base_dir,
    ),
)
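
A variant with per-table partitioning might look like this (the blocks table and number column are hypothetical names):

writer = cc.Writer(
    kind=cc.WriterKind.PYARROW_DATASET,
    config=cc.PyArrowDatasetWriterConfig(
        base_dir=base_dir,
        # write the hypothetical blocks table into hive-style directories keyed by number
        partitioning={"blocks": ["number"]},
        partitioning_flavor={"blocks": "hive"},
    ),
)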

Datasets

Datasets are pre-built, high-level abstractions that make it easy to extract common blockchain data. They are implemented as helper functions that construct complete Pipeline objects with predefined schemas and transformations.

Available Datasets

Ethereum (EVM) Datasets

Dataset | Description | Use Cases
Blocks | Extract block headers and metadata | Block analysis, network statistics, gas analysis
Address Appearances | Track all address appearances in traces | Contract interactions, address relationships, contract creation tracking
All Contracts | Information about all contracts | Contract deployment analysis

Solana (SVM) Datasets

Dataset | Description | Use Cases
Token Balances | Track token account balances | Token holdings, transfers, token program analysis

Usage Pattern

All datasets follow a similar usage pattern:

from cherry_etl import datasets
from cherry_etl.pipeline import run_pipeline

# Create a pipeline using a dataset
pipeline = datasets.evm.blocks(  # or any other dataset
    provider=provider,
    writer=writer,
    from_block=18123123,  # or from_slot for Solana
    to_block=18123200     # or to_slot for Solana
)

# Run the pipeline
await run_pipeline(pipeline_name="dataset_name", pipeline=pipeline)

Common Features

All datasets share these common features:

  • Predefined Schemas: Each dataset has a well-defined output schema
  • Optimized Performance: Leverages Rust-based core components
  • Parallel Processing: Data ingestion and processing happen in parallel
  • Crash Resistance: Built-in support for crash recovery
  • Continuous Ingestion: Can keep datasets fresh with continuous updates

Data Providers

Datasets work with any supported data provider:

  • EVM Chains: HyperSync, SQD
  • Solana: SQD (beta), Yellowstone-GRPC

Output Formats

Datasets can write to any supported output format:

  • ClickHouse
  • Iceberg
  • Deltalake
  • DuckDB
  • Arrow Datasets
  • Parquet

Writing Custom Datasets

While the built-in datasets cover common use cases, you can also create custom datasets by:

  1. Defining your schema
  2. Creating transformation steps
  3. Building a pipeline configuration

See the Writing Custom Pipelines section for more details.

Notes

  • Datasets are inspired by cryo
  • Each dataset is optimized for its specific use case
  • Datasets handle all the complexity of data extraction and transformation
  • You can combine datasets with custom pipeline steps for advanced use cases

Ethereum (EVM)

Address Appearances Dataset

The address appearances dataset tracks all occurrences of addresses in transaction traces, including their relationships to the transactions. This is useful for analyzing address interactions, contract creations, and address relationships in the blockchain.

Usage

from cherry_etl import datasets
from cherry_etl.pipeline import run_pipeline

# Create a pipeline for address appearances
pipeline = datasets.evm.address_appearances(
    provider=provider,
    writer=writer,
    from_block=18123123,  # Starting block number
    to_block=18123200     # Ending block number
)

# Run the pipeline
await run_pipeline(pipeline_name="address_appearances", pipeline=pipeline)

Output Schema

Field | Type | Description
block_number | uint64 | Block number where the address appeared
block_hash | binary | Hash of the block
transaction_hash | binary | Hash of the transaction
address | binary | The address that appeared
relationship | string | The relationship of the address to the trace. Possible values:
  - call_from: Address that initiated the call
  - call_to: Address that received the call
  - factory: Contract factory address
  - suicide: Address that was self-destructed
  - suicide_refund: Address that received refund from self-destruct
  - author: Address that authored the block
  - create: Address that was created

Example Queries

Find All Contract Creations by an Address

SELECT 
    block_number,
    transaction_hash,
    address as created_contract
FROM address_appearances
WHERE relationship = 'create'
    AND address = '0x...';  -- Replace with the address you're interested in

Get Address Interaction Timeline

SELECT 
    block_number,
    transaction_hash,
    relationship,
    address
FROM address_appearances
WHERE address = '0x...'  -- Replace with the address you're interested in
ORDER BY block_number, transaction_hash;

Count Address Relationships

SELECT 
    relationship,
    COUNT(*) as count
FROM address_appearances
WHERE address = '0x...'  -- Replace with the address you're interested in
GROUP BY relationship;

Notes

  • The dataset requires trace data from the blockchain
  • The dataset captures all address appearances in transaction traces, including:
    • Direct calls between contracts
    • Contract creations
    • Self-destruct operations
    • Block author addresses
  • This dataset is particularly useful for:
    • Tracking contract interactions
    • Analyzing address relationships
    • Monitoring contract creation patterns
    • Understanding address behavior in the blockchain

Blocks Dataset

The blocks dataset allows you to extract block headers from any EVM chain. This is one of the most fundamental datasets for blockchain analysis.

Usage

from cherry_etl import datasets
from cherry_etl.pipeline import run_pipeline

# Create a pipeline for blocks
pipeline = datasets.evm.blocks(
    provider=provider,
    writer=writer,
    from_block=18123123,  # Starting block number
    to_block=18123200     # Ending block number
)

# Run the pipeline
await run_pipeline(pipeline_name="blocks", pipeline=pipeline)

Output Schema

Field | Type | Description
number | uint64 | Block number
hash | string | Block hash
parent_hash | string | Hash of the parent block
nonce | string | Block nonce
logs_bloom | string | Bloom filter for logs
transactions_root | string | Merkle root of transactions
state_root | string | Merkle root of state
receipts_root | string | Merkle root of receipts
miner | string | Address of the block miner
difficulty | Decimal128(38, 0) | Block difficulty
total_difficulty | Decimal128(38, 0) | Cumulative difficulty
extra_data | string | Extra data field
size | Decimal128(38, 0) | Block size in bytes
gas_limit | Decimal128(38, 0) | Gas limit for the block
gas_used | Decimal128(38, 0) | Gas used in the block
timestamp | Decimal128(38, 0) | Block timestamp
uncles | list(binary) | List of uncle block hashes
base_fee_per_gas | Decimal128(38, 0) | Base fee per gas (EIP-1559)
withdrawals_root | string | Root hash of withdrawals (Shanghai upgrade)

Example Queries

Get Latest Block

SELECT * FROM blocks ORDER BY number DESC LIMIT 1;

Get Blocks in a Time Range

SELECT * FROM blocks 
WHERE timestamp BETWEEN 1704067200 AND 1704153600;  -- a 24-hour window starting 2024-01-01 00:00 UTC

Get Block Statistics

SELECT 
    MIN(number) as first_block,
    MAX(number) as last_block,
    AVG(gas_used) as avg_gas_used,
    AVG(size) as avg_block_size
FROM blocks;

Notes

  • The dataset includes all fields from the EVM block header
  • Timestamps are in Unix timestamp format
  • The base_fee_per_gas field is only available for blocks after the London hard fork (EIP-1559)
  • The withdrawals_root field is only available for blocks after the Shanghai upgrade

All Contracts Dataset

The all_contracts dataset tracks all contracts ever created on any EVM chain, including their creation details. This dataset is essential for analyzing contract deployments.

Usage

from cherry_etl import datasets
from cherry_etl.pipeline import run_pipeline

# Create a pipeline for all contracts
pipeline = datasets.evm.all_contracts(
    provider=provider,
    writer=writer,
    from_block=18123123,  # Starting block number
    to_block=18123200     # Ending block number
)

# Run the pipeline
await run_pipeline(pipeline_name="all_contracts", pipeline=pipeline)

Output Schema

Field | Type | Description
block_number | uint64 | Block number where the contract was created
transaction_hash | binary | Hash of the transaction that created the contract
address | binary | The contract's address

Example Queries

Find Recently Created Contracts

SELECT 
    block_number,
    transaction_hash,
    address as contract_address
FROM all_contracts
ORDER BY block_number DESC
LIMIT 10;

Get Contract Creation Statistics

SELECT
    COUNT(*) as contracts_created
FROM all_contracts;

Notes

  • This dataset captures all contract creations on any EVM chain
  • The dataset includes both contract creation transactions and contract creation through internal transactions
  • This dataset is particularly useful for:
    • Tracking contract deployments
    • Analyzing contract creation patterns
    • Identifying contract factories
    • Monitoring contract ecosystem growth

Solana (SVM)

Token Balances Dataset

The token balances dataset tracks token account balances on the Solana blockchain. This dataset is essential for analyzing token holdings, transfers, and token-related activities on Solana.
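
Usage

A sketch following the common usage pattern above; the exact entry point (datasets.svm.token_balances) and the slot arguments are assumptions based on that pattern, so check the examples on GitHub for the exact call:

from cherry_etl import datasets
from cherry_etl.pipeline import run_pipeline

# Create a pipeline for token balances (function name assumed from the common usage pattern)
pipeline = datasets.svm.token_balances(
    provider=provider,
    writer=writer,
    from_slot=330000000,  # Starting slot
    to_slot=330000100,    # Ending slot
)

# Run the pipeline
await run_pipeline(pipeline_name="token_balances", pipeline=pipeline)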

Starting from where you left off

Since the pipeline is created purely in Python at runtime, it is possible to set the from_block parameter programmatically.

For example, when using duckdb, you can create a function like this to get the last written block:

import traceback

import duckdb

def get_start_block(con: duckdb.DuckDBPyConnection) -> int:
    try:
        res = con.sql("SELECT MAX(block_number) FROM my_table").fetchone()
        # MAX returns NULL when the table is empty, so check the value as well
        if res is not None and res[0] is not None:
            return int(res[0])
        else:
            return 0
    except Exception:
        print(f"failed to get start block from db: {traceback.format_exc()}")
        return 0

Then you can use it like this when creating the pipeline object:

pipeline = datasets.evm.blocks(
    provider,
    writer,
    from_block=get_start_block(duckdb_connection),
    to_block=to_block,
)

Crash resistance

Some writers like duckdb are naturally crash-resistant because writes run inside a transaction.

Other writers are naturally crash-resistant as long as there is only one output table. If there are multiple output tables, you can utilize the anchor_table parameter to implement crash-resistance.

Output tables are written in parallel for performance, but the anchor_table is always written last. So you can prune the non-anchor tables at startup and resume from the maximum block of the anchor_table (see the Starting from where you left off section), as sketched below.
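
Here is a minimal sketch of that startup pruning, assuming a Delta Lake output where each table lives under data_uri/<table_name>, blocks is the anchor table, and the other tables carry a block_number column (all of these names are illustrative):

from cherry_etl import config as cc
from deltalake import DeltaTable
import pyarrow.compute as pc

# configure the writer so "blocks" is written last, as the anchor table
writer = cc.Writer(
    kind=cc.WriterKind.DELTA_LAKE,
    config=cc.DeltaLakeWriterConfig(
        data_uri="./data",
        anchor_table="blocks",
    ),
)

# at startup: find how far the anchor table got before the crash
anchor = DeltaTable("./data/blocks")
max_block = pc.max(anchor.to_pyarrow_table()["number"]).as_py()

# prune rows the non-anchor tables may have written past that point
transfers = DeltaTable("./data/transfers")
transfers.delete(f"block_number > {max_block}")

# then resume the pipeline with from_block=max_block + 1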

Logging

Python code uses the standard logging module, so it can be configured according to the Python docs.
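
For example, a minimal setup that prints INFO-level logs from the Python side:

import logging

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(name)s %(levelname)s %(message)s",
)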

Set the RUST_LOG environment variable according to the env_logger docs in order to see logs from the Rust modules.

To run an example with trace-level logging for the Rust modules:

RUST_LOG=trace uv run examples/path/to/my/example

Writing Custom Pipelines

There are three main components of a pipeline:

  • First we get the data from the provider; the data we get depends on the query we use.
  • Then we run the steps one by one on the incoming data to transform it.
  • Finally, the transformed data is written to the output using the writer.

We already covered Provider and Writer in the previous pages so we will cover writing queries and steps in this section.

Example

Here is a simple example pipeline that ingests and decodes ERC20 transfers into duckdb: source code.

Query

The query is used to specify the data we get from the provider.

It consists of:

  • from_block, the block to start from
  • to_block (optional), the block to stop at (inclusive).
  • request fields
  • field selection

Request Fields

The request fields are named after tables, e.g. logs.

import cherry_core
from cherry_core import ingest

query = ingest.Query(
    kind=ingest.QueryKind.EVM,
    params=ingest.evm.Query(
        from_block=from_block,
        to_block=to_block,
        logs=[
            ingest.evm.LogRequest(
                # dex.event_signature is a placeholder for the event signature string to match
                topic0=[cherry_core.evm_signature_to_topic0(dex.event_signature)],
                include_blocks=True,
                include_transactions=True,
            )
        ],
        fields=request_fields,
    ),
)

This example creates an EVM query looking for logs matching a single LogRequest.

include_ fields determine if related data should be included. For example, include_blocks=True means the response will contain the blocks table with the blocks containing the matched logs.

Log Filtering

The core filtering behaviors are:

  1. Between multiple LogRequest objects: When multiple LogRequest objects are provided, logs matching ANY of them will be returned (OR logic).

  2. Between different fields in the same LogRequest: When multiple fields are specified (e.g., address AND topic0), logs matching ALL criteria will be returned (AND logic).

  3. Within a single field: When multiple values are provided for one field (e.g., multiple addresses in the address field), logs matching ANY of those values will be returned (OR logic).

OR Mode: Separate Log Requests

Use separate log requests to match events that satisfy ANY of the conditions:

logs=[
    # Will match USDC Transfer OR WETH Approval
    LogRequest(address=[USDC], topic0=[TRANSFER_TOPIC]),
    LogRequest(address=[WETH], topic0=[APPROVAL_TOPIC])
]

Results:

┌─────────────┬──────────┐
│ address     ┆ topic0   │
╞═════════════╪══════════╡
│ USDC        ┆ Transfer │  # Only USDC Transfers
│ WETH        ┆ Approval │  # Only WETH Approvals
└─────────────┴──────────┘

CROSS JOIN: Multiple Values in Single Request

Put multiple values in a single request to match ALL combinations:

logs=[
    LogRequest(
        address=[USDC, WETH],
        topic0=[TRANSFER_TOPIC, APPROVAL_TOPIC]
    )
]

Results:

┌─────────────┬──────────┐
│ address     ┆ topic0   │
╞═════════════╪══════════╡
│ WETH        ┆ Approval │  # All possible combinations
│ WETH        ┆ Transfer │  # of addresses and topics
│ USDC        ┆ Approval │  # will be matched
│ USDC        ┆ Transfer │
└─────────────┴──────────┘

Common Use Cases

  1. Track specific events from multiple contracts:

    logs=[
        LogRequest(
            address=[CONTRACT_A, CONTRACT_B, CONTRACT_C],
            topic0=[SPECIFIC_EVENT_TOPIC]
        )
    ]
    
  2. Track multiple events from a specific contract:

    logs=[
        LogRequest(
            address=[SPECIFIC_CONTRACT],
            topic0=[EVENT_1_TOPIC, EVENT_2_TOPIC, EVENT_3_TOPIC]
        )
    ]
    
  3. Track specific event combinations:

    logs=[
        LogRequest(address=[CONTRACT_A], topic0=[EVENT_X_TOPIC]),
        LogRequest(address=[CONTRACT_B], topic0=[EVENT_Y_TOPIC])
    ]
    

Filtering by Indexed Parameters (topic1, topic2, etc.)

For EVM events, indexed parameters are stored in the topics array (topic1, topic2, etc.). You can filter by these parameters to get more specific results:

# Filter Transfer events where "from" address is specific
from_address = "0x000000000000000000000000a0b86991c6218b36c1d19d4a2e9eb0ce3606eb48"  # Padded to 32 bytes
transfer_topic0 = cherry_core.evm_signature_to_topic0("Transfer(address,address,uint256)")

query = ingest.Query(
    kind=ingest.QueryKind.EVM,
    params=ingest.evm.Query(
        from_block=20123123,
        to_block=20123223,
        logs=[
            ingest.evm.LogRequest(
                topic0=[transfer_topic0],
                topic1=[from_address],  # Filter by 'from' address (first indexed parameter)
                include_blocks=True,
            )
        ],
        fields=ingest.evm.Fields(
            # Fields definition...
        ),
    ),
)

Important notes about indexed parameters:

  1. Topic values should be padded to 32 bytes (for addresses, prepend with zeros)
  2. For addresses, the correct format is: "0x000000000000000000000000" + address[2:] (see the sketch after this list)
  3. Parameter ordering follows the indexed parameters in the event signature
  4. Non-indexed parameters are encoded in the data field and can't be filtered directly
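
A small helper along these lines (hypothetical, not part of cherry) shows the padding from note 2:

def address_to_topic(address: str) -> str:
    # left-pad a 20-byte hex address to the 32-byte topic format
    return "0x000000000000000000000000" + address[2:]

from_address = address_to_topic("0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48")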

Field Selection

Query.fields specifies which columns of each table we want to select.

request_fields = ingest.evm.Fields(
    block=ingest.evm.BlockFields(number=True, timestamp=True),
    transaction=ingest.evm.TransactionFields(
        block_number=True, transaction_index=True, hash=True, from_=True, to=True
    ),
    log=ingest.evm.LogFields(
        block_number=True,
        transaction_index=True,
        log_index=True,
        address=True,
        data=True,
        topic0=True,
        topic1=True,
        topic2=True,
        topic3=True,
    ),
)

This means we want to get blocks.number, blocks.timestamp, transactions.block_number ... fields in the response.

Minimizing the fields we request can make a massive difference in performance and response body size, both of which can drastically affect the cost of running infrastructure.

Modern providers like SQD and HyperSync fully utilize this selection so it makes a big difference.

Data Schema

Each chain format has its own Arrow schema. All providers return data according to that schema.

Ethereum (EVM)

All providers return data according to this schema when queried using ingest.evm.Query.

source code.

blocks

Field Name | Data Type | Nullable
number | UInt64 | Yes
hash | Binary | Yes
parent_hash | Binary | Yes
nonce | Binary | Yes
sha3_uncles | Binary | Yes
logs_bloom | Binary | Yes
transactions_root | Binary | Yes
state_root | Binary | Yes
receipts_root | Binary | Yes
miner | Binary | Yes
difficulty | Decimal256(76, 0) | Yes
total_difficulty | Decimal256(76, 0) | Yes
extra_data | Binary | Yes
size | Decimal256(76, 0) | Yes
gas_limit | Decimal256(76, 0) | Yes
gas_used | Decimal256(76, 0) | Yes
timestamp | Decimal256(76, 0) | Yes
uncles | List(Binary) | Yes
base_fee_per_gas | Decimal256(76, 0) | Yes
blob_gas_used | Decimal256(76, 0) | Yes
excess_blob_gas | Decimal256(76, 0) | Yes
parent_beacon_block_root | Binary | Yes
withdrawals_root | Binary | Yes
withdrawals | List(Struct(index: UInt64, validator_index: UInt64, address: Binary, amount: Decimal256(76, 0))) | Yes
l1_block_number | UInt64 | Yes
send_count | Decimal256(76, 0) | Yes
send_root | Binary | Yes
mix_hash | Binary | Yes

transactions

Field Name | Data Type | Nullable
block_hash | Binary | Yes
block_number | UInt64 | Yes
from | Binary | Yes
gas | Decimal256(76, 0) | Yes
gas_price | Decimal256(76, 0) | Yes
hash | Binary | Yes
input | Binary | Yes
nonce | Decimal256(76, 0) | Yes
to | Binary | Yes
transaction_index | UInt64 | Yes
value | Decimal256(76, 0) | Yes
v | UInt8 | Yes
r | Binary | Yes
s | Binary | Yes
max_priority_fee_per_gas | Decimal256(76, 0) | Yes
max_fee_per_gas | Decimal256(76, 0) | Yes
chain_id | Decimal256(76, 0) | Yes
cumulative_gas_used | Decimal256(76, 0) | Yes
effective_gas_price | Decimal256(76, 0) | Yes
gas_used | Decimal256(76, 0) | Yes
contract_address | Binary | Yes
logs_bloom | Binary | Yes
type | UInt8 | Yes
root | Binary | Yes
status | UInt8 | Yes
sighash | Binary | Yes
y_parity | Boolean | Yes
access_list | List(Struct(address: Binary, storage_keys: List(Binary))) | Yes
l1_fee | Decimal256(76, 0) | Yes
l1_gas_price | Decimal256(76, 0) | Yes
l1_gas_used | Decimal256(76, 0) | Yes
l1_fee_scalar | Decimal256(76, 0) | Yes
gas_used_for_l1 | Decimal256(76, 0) | Yes
max_fee_per_blob_gas | Decimal256(76, 0) | Yes
blob_versioned_hashes | List(Binary) | Yes
deposit_nonce | Decimal256(76, 0) | Yes
blob_gas_price | Decimal256(76, 0) | Yes
deposit_receipt_version | Decimal256(76, 0) | Yes
blob_gas_used | Decimal256(76, 0) | Yes
l1_base_fee_scalar | Decimal256(76, 0) | Yes
l1_blob_base_fee | Decimal256(76, 0) | Yes
l1_blob_base_fee_scalar | Decimal256(76, 0) | Yes
l1_block_number | UInt64 | Yes
mint | Decimal256(76, 0) | Yes
source_hash | Binary | Yes

logs

Field Name | Data Type | Nullable
removed | Boolean | Yes
log_index | UInt64 | Yes
transaction_index | UInt64 | Yes
transaction_hash | Binary | Yes
block_hash | Binary | Yes
block_number | UInt64 | Yes
address | Binary | Yes
data | Binary | Yes
topic0 | Binary | Yes
topic1 | Binary | Yes
topic2 | Binary | Yes
topic3 | Binary | Yes

traces

Field Name | Data Type | Nullable
from | Binary | Yes
to | Binary | Yes
call_type | Utf8 | Yes
gas | Decimal256(76, 0) | Yes
input | Binary | Yes
init | Binary | Yes
value | Decimal256(76, 0) | Yes
author | Binary | Yes
reward_type | Utf8 | Yes
block_hash | Binary | Yes
block_number | UInt64 | Yes
address | Binary | Yes
code | Binary | Yes
gas_used | Decimal256(76, 0) | Yes
output | Binary | Yes
subtraces | UInt64 | Yes
trace_address | List(UInt64) | Yes
transaction_hash | Binary | Yes
transaction_position | UInt64 | Yes
type | Utf8 | Yes
error | Utf8 | Yes
sighash | Binary | Yes
action_address | Binary | Yes
balance | Decimal256(76, 0) | Yes
refund_address | Binary | Yes

Solana (SVM)

All providers return data according to this schema when queried using ingest.svm.Query.

source code.

blocks

Field Name | Data Type | Nullable
slot | UInt64 | Yes
hash | Binary | Yes
parent_slot | UInt64 | Yes
parent_hash | Binary | Yes
height | UInt64 | Yes
timestamp | Int64 | Yes

rewards

Field Name | Data Type | Nullable
block_slot | UInt64 | Yes
block_hash | Binary | Yes
pubkey | Binary | Yes
lamports | Int64 | Yes
post_balance | UInt64 | Yes
reward_type | Utf8 | Yes
commission | UInt8 | Yes

token_balances

Field Name | Data Type | Nullable
block_slot | UInt64 | Yes
block_hash | Binary | Yes
transaction_index | UInt32 | Yes
account | Binary | Yes
pre_mint | Binary | Yes
post_mint | Binary | Yes
pre_decimals | UInt16 | Yes
post_decimals | UInt16 | Yes
pre_program_id | Binary | Yes
post_program_id | Binary | Yes
pre_owner | Binary | Yes
post_owner | Binary | Yes
pre_amount | UInt64 | Yes
post_amount | UInt64 | Yes

balances

Field Name | Data Type | Nullable
block_slot | UInt64 | Yes
block_hash | Binary | Yes
transaction_index | UInt32 | Yes
account | Binary | Yes
pre | UInt64 | Yes
post | UInt64 | Yes

logs

Field Name | Data Type | Nullable
block_slot | UInt64 | Yes
block_hash | Binary | Yes
transaction_index | UInt32 | Yes
log_index | UInt32 | Yes
instruction_address | List(UInt32) | Yes
program_id | Binary | Yes
kind | Utf8 | Yes
message | Utf8 | Yes

transactions

Field Name | Data Type | Nullable
block_slot | UInt64 | Yes
block_hash | Binary | Yes
transaction_index | UInt32 | Yes
signature | Binary | Yes
version | Int8 | Yes
account_keys | List(Binary) | Yes
address_table_lookups | List(Struct(account_key: Binary, writable_indexes: List(UInt64), readonly_indexes: List(UInt64))) | Yes
num_readonly_signed_accounts | UInt32 | Yes
num_readonly_unsigned_accounts | UInt32 | Yes
num_required_signatures | UInt32 | Yes
recent_blockhash | Binary | Yes
signatures | List(Binary) | Yes
err | Utf8 | Yes
fee | UInt64 | Yes
compute_units_consumed | UInt64 | Yes
loaded_readonly_addresses | List(Binary) | Yes
loaded_writable_addresses | List(Binary) | Yes
fee_payer | Binary | Yes
has_dropped_log_messages | Boolean | Yes

instructions

Field Name | Data Type | Nullable
block_slot | UInt64 | Yes
block_hash | Binary | Yes
transaction_index | UInt32 | Yes
instruction_address | List(UInt32) | Yes
program_id | Binary | Yes
a0 | Binary | Yes
a1 | Binary | Yes
a2 | Binary | Yes
a3 | Binary | Yes
a4 | Binary | Yes
a5 | Binary | Yes
a6 | Binary | Yes
a7 | Binary | Yes
a8 | Binary | Yes
a9 | Binary | Yes
rest_of_accounts | List(Binary) | Yes
data | Binary | Yes
d1 | Binary | Yes
d2 | Binary | Yes
d4 | Binary | Yes
d8 | Binary | Yes
error | Utf8 | Yes
compute_units_consumed | UInt64 | Yes
is_committed | Boolean | Yes
has_dropped_log_messages | Boolean | Yes

Steps

Steps are run between the provider and writer to modify the data or do other arbitrary actions.

There are built-in steps that can be configured and there is a mechanism to write custom steps which enables running arbitrary python code on the data as a part of the pipeline.

Base58Encode

Encode all binary fields as Solana (Bitcoin) style base58.

@dataclass
class Base58EncodeConfig:
    tables: Optional[list[str]] = None

The tables parameter can be used to specify which tables should be encoded. All tables are encoded if it is left as None.
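
A usage sketch, assuming the corresponding step kind is named StepKind.BASE58_ENCODE:

from cherry_etl import config as cc

cc.Step(
    kind=cc.StepKind.BASE58_ENCODE,  # enum name assumed
    config=cc.Base58EncodeConfig(
        tables=["transactions", "instructions"],
    ),
),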

Cast

Cast columns to specified arrow types.

@dataclass
class CastConfig:
    table_name: str
    mappings: Dict[str, pa.DataType]
    allow_decode_fail: bool = False

When allow_decode_fail is set to True, this step will write null if it fails to cast a value instead of erroring out.

Example

Cast transfers.block_timestamp to int64 type.

from cherry_etl import config as cc

cc.Step(
    kind=cc.StepKind.CAST,
    config=cc.CastConfig(
        table_name="transfers",
        mappings={"block_timestamp": pa.int64()},
    ),
),

CastByType

Cast all columns with the from_type to to_type.

@dataclass
class CastByTypeConfig:
    from_type: pa.DataType
    to_type: pa.DataType
    allow_decode_fail: bool = False

When allow_decode_fail is set to True, this step will write null if it fails to cast a value instead of erroring out.

EvmDecodeEvents

Decode EVM events using the given signature.

@dataclass
class EvmDecodeEventsConfig:
    event_signature: str
    allow_decode_fail: bool = False
    input_table: str = "logs"
    output_table: str = "decoded_logs"
    hstack: bool = True

allow_decode_fail controls whether the decoder should fail when a row fails to decode, or write null instead.

hstack means the raw logs and decoded logs tables are horizontally stacked together, so decoded values have raw log fields like block_number, address, topic0, etc. next to them.

Example

from cherry_etl import config as cc

cc.Step(
    kind=cc.StepKind.EVM_DECODE_EVENTS,
    config=cc.EvmDecodeEventsConfig(
        event_signature="Transfer(address indexed from, address indexed to, uint256 amount)",
        output_table="transfers",
    ),
),

HexEncode

Encode all binary fields as hex strings.

@dataclass
class HexEncodeConfig:
    tables: Optional[list[str]] = None
    prefixed: bool = True

The tables parameter can be used to specify which tables should be encoded. All tables are encoded if it is left as None.

The prefixed parameter chooses between prefixed hex ('0x...', ETH style) and plain hex encoding.
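
A usage sketch, assuming the corresponding step kind is named StepKind.HEX_ENCODE:

from cherry_etl import config as cc

cc.Step(
    kind=cc.StepKind.HEX_ENCODE,  # enum name assumed
    config=cc.HexEncodeConfig(
        prefixed=True,
    ),
),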

U256ToBinary

Convert all Decimal256 columns to trimmed, big-endian encoded binary values. This is the same binary representation used by ETH RPC and other ETH-related tooling.

@dataclass
class U256ToBinaryConfig:
    tables: Optional[list[str]] = None

The tables parameter can be used to specify which tables should be encoded. All tables are encoded if it is left as None.

Writing Custom Steps

Custom steps can be specified like so:

from cherry_etl import config as cc

cc.Step(
    kind=cc.StepKind.CUSTOM,
    config=cc.CustomStepConfig(
        runner=my_custom_step,
    ),
),

where my_custom_step is an arbitrary Python function with this signature:

def my_custom_step(data: Dict[str, polars.DataFrame], _: Any) -> Dict[str, polars.DataFrame]:
    pass

This mechanism can be used to enrich the data with external sources like eth_call or IPFS.

It can also be used to just join the input tables like this:

def join_data(data: Dict[str, polars.DataFrame], _: Any) -> Dict[str, polars.DataFrame]:
    blocks = data["blocks"]
    transfers = data["transfers"]

    bn = blocks.get_column("number")
    logger.info(f"processing data from: {bn.min()} to: {bn.max()}")

    blocks = blocks.select(
        polars.col("number").alias("block_number"),
        polars.col("timestamp").alias("block_timestamp"),
    )
    out = transfers.join(blocks, on="block_number")

    return {"transfers": out}

Appendix

Polars and Decimal types (UInt256)

Note: This section is irrelevant if you are using datasets since datasets handle this internally.

Polars doesn't properly support the Decimal256 type yet, so it is not possible to work with Ethereum uint256 values natively. We prefer to cast these values to Decimal128 if possible, and cast them to binary or string if they don't fit.

You can use the built-in CastByType step to cast all Decimal256 columns to Decimal128 like this:

from cherry_etl import config as cc

cc.Step(
    kind=cc.StepKind.CAST_BY_TYPE,
    config=cc.CastByTypeConfig(
        from_type=pa.decimal256(76, 0),
        # can also use `pa.string()` as `to_type` here to get string values like '123213'
        to_type=pa.decimal128(38, 0),
    ),
),

Or you can use the U256ToBinary step to cast all uint256 columns to big-endian (ETH-compatible) encoded binary values:

from cherry_etl import config as cc

cc.Step(
    kind=cc.StepKind.U256_TO_BINARY,
    config=cc.U256ToBinaryConfig(),
)

Some output formats also don't support Decimal256, but all of them support Decimal128, so this can be useful if you are having trouble writing Decimal256 to your output.