Introduction

Cherry is a Python library for building blockchain data pipelines.

It is designed to make building production-ready blockchain data pipelines easy.

Getting Started

This section explains how to install and use Cherry.

Installation

Cherry is published to PyPI as cherry-etl.

Core tooling used with Cherry is published separately as cherry-core.

You can add both to your Python project like this:

pip install cherry-etl cherry-core

Or if you are using uv:

uv add cherry-etl cherry-core

Import them in your Python scripts like this:

import cherry_etl
import cherry_core

Data Providers

Everything in Cherry works the same regardless of which data provider you choose.

You can change the provider by just changing the kind and url arguments when constructing a ProviderConfig object.

from cherry_core import ingest

provider = ingest.ProviderConfig(
    kind=my_provider_kind,
    url=my_provider_url,
)

Check out the section for the specific provider you want to use for more info.

Data availability

Provider | Ethereum (EVM) | Solana (SVM)
HyperSync | Yes | No
SQD | Yes | Yes
Yellowstone-GRPC | No | Yes

Full ProviderConfig

class ProviderKind(str, Enum):
    SQD = "sqd"
    HYPERSYNC = "hypersync"
    YELLOWSTONE_GRPC = "yellowstone_grpc"

@dataclass
class ProviderConfig:
    kind: ProviderKind
    url: Optional[str] = None
    bearer_token: Optional[str] = None
    max_num_retries: Optional[int] = None
    retry_backoff_ms: Optional[int] = None
    retry_base_ms: Optional[int] = None
    retry_ceiling_ms: Optional[int] = None
    req_timeout_millis: Optional[int] = None
    stop_on_head: bool = False
    head_poll_interval_millis: Optional[int] = None
    buffer_size: Optional[int] = None

HyperSync

HyperSync is a purpose-built, high-performance data retrieval layer that gives developers unprecedented access to blockchain data. Built from the ground up in Rust, HyperSync serves as an alternative to traditional JSON-RPC endpoints, offering dramatically faster queries and more flexible data access patterns.

HyperSync currently supports 70+ EVM chains.

Read more about it in the official documentation.

Example ProviderConfig

from cherry_core import ingest

provider = ingest.ProviderConfig(
    kind=ingest.ProviderKind.HYPERSYNC,
    bearer_token="<Your Hypersync API Token>",
    url="https://eth.hypersync.xyz",
)

You can get a URL for the desired chain from the supported networks page.

Getting an API token

HyperSync is currently free and unlimited, but it is transitioning to API tokens.

It is recommended to get an API token and use it with your queries.

SQD

SQD Network is a decentralized query engine optimized for batch extraction of large volumes of data. It currently serves historical on-chain data ingested from 100+ EVM and Substrate networks, as well as Solana (in beta), Tron, Starknet and Fuel. The data is comprehensive: for example, on EVM it includes event logs, transaction receipts, traces and per-transaction state diffs.

SQD currently supports 100+ EVM chains.

SQD has beta support for Solana.

Warning: Solana data starts from slot 317617480 when using SQD.

Read more at the official documentation.

Example ProviderConfig

from cherry_core import ingest

provider = ingest.ProviderConfig(
    kind=ingest.ProviderKind.SQD,
    url="https://portal.sqd.dev/datasets/solana-beta",
)

You can replace solana-beta with any dataset name from the supported networks page.

For example, you can use https://portal.sqd.dev/datasets/arbitrum-nova to get Arbitrum Nova data.
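
Continuing the example above, the same config pointed at the Arbitrum Nova dataset would look like this:

provider = ingest.ProviderConfig(
    kind=ingest.ProviderKind.SQD,
    url="https://portal.sqd.dev/datasets/arbitrum-nova",
)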

Deploying portals

Access to the SQD portal API is free but somewhat limited at the moment.

You can deploy your own portal to get unlimited access. You can contact us on Telegram for help with deploying and maintaining portal instances.

Writers

Writers control how data is written to the output after it is ingested from the provider and processed by the configured steps.

Supported output formats

  • ClickHouse
  • Iceberg
  • Deltalake
  • DuckDB
  • Arrow Datasets
  • Parquet

Full Writer configuration API

class WriterKind(str, Enum):
    CLICKHOUSE = "clickhouse"
    ICEBERG = "iceberg"
    DELTA_LAKE = "delta_lake"
    PYARROW_DATASET = "pyarrow_dataset"
    DUCKDB = "duckdb"

@dataclass
class Writer:
    kind: WriterKind
    config: (
        ClickHouseWriterConfig
        | IcebergWriterConfig
        | DeltaLakeWriterConfig
        | PyArrowDatasetWriterConfig
        | DuckdbWriterConfig
    )

See specific writer pages to learn more about your writer of choice.

If you don't have a preference, we highly recommend DuckDB for development and testing as it is solid and easy to use.

ClickHouse

Config

@dataclass
class ClickHouseWriterConfig:
    client: ClickHouseClient
    codec: Dict[str, Dict[str, str]] = field(default_factory=dict)
    order_by: Dict[str, List[str]] = field(default_factory=dict)
    engine: str = "MergeTree()"
    skip_index: Dict[str, List[ClickHouseSkipIndex]] = field(default_factory=dict)
    anchor_table: Optional[str] = None

Dict[str, _] fields are generally configuration keyed by table name. For example, codec["my_table"]["my_column"] gives the codec to use for the my_column column of the my_table table.

Example

import os

import clickhouse_connect

from cherry_etl import config as cc

clickhouse_client = await clickhouse_connect.get_async_client(
    host=os.environ.get("CLICKHOUSE_HOST", "localhost"),
    port=int(os.environ.get("CLICKHOUSE_PORT", "8123")),
    username=os.environ.get("CLICKHOUSE_USER", "default"),
    password=os.environ.get("CLICKHOUSE_PASSWORD", "clickhouse"),
    database=os.environ.get("CLICKHOUSE_DATABASE", "blockchain"),
)

writer = cc.Writer(
    kind=cc.WriterKind.CLICKHOUSE,
    config=cc.ClickHouseWriterConfig(
        client=clickhouse_client,
        order_by={"transfers": ["block_number"]},
        codec={"transfers": {"data": "ZSTD"}},
        skip_index={
            "transfers": [
                cc.ClickHouseSkipIndex(
                    name="log_addr_idx",
                    val="address",
                    type_="bloom_filter(0.01)",
                    granularity=1,
                ),
                cc.ClickHouseSkipIndex(
                    name="from_addr_idx",
                    val="from",
                    type_="bloom_filter(0.01)",
                    granularity=1,
                ),
                cc.ClickHouseSkipIndex(
                    name="to_addr_idx",
                    val="to",
                    type_="bloom_filter(0.01)",
                    granularity=1,
                ),
            ]
        },
    ),
)

Anchor table

All tables are written in parallel, but the anchor table is written separately (and last) so it can be used to implement crash resistance.
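
As a minimal sketch, assuming your pipeline writes a transfers table (as in the example above) alongside other tables, you could mark transfers as the anchor:

writer = cc.Writer(
    kind=cc.WriterKind.CLICKHOUSE,
    config=cc.ClickHouseWriterConfig(
        client=clickhouse_client,
        # all other tables are written first, "transfers" is written last
        anchor_table="transfers",
    ),
)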

DeltaLake

Config

@dataclass
class DeltaLakeWriterConfig:
    data_uri: str
    partition_by: Dict[str, list[str]] = field(default_factory=dict)
    storage_options: Optional[Dict[str, str]] = None
    writer_properties: Optional[deltalake.WriterProperties] = None
    anchor_table: Optional[str] = None

Dict[str, _] fields are generally configuration keyed by table name. For example, partition_by["my_table"] gives the list of columns to partition the my_table table by.

Example

data_uri = "./data"

writer = cc.Writer(
    kind=cc.WriterKind.DELTA_LAKE,
    config=cc.DeltaLakeWriterConfig(
        data_uri=data_uri,
    ),
)
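
For example, to partition a transfers table by block_number (hypothetical table and column names), partition_by could be set per table:

writer = cc.Writer(
    kind=cc.WriterKind.DELTA_LAKE,
    config=cc.DeltaLakeWriterConfig(
        data_uri=data_uri,
        # list of partition columns, keyed by table name
        partition_by={"transfers": ["block_number"]},
    ),
)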

Anchor table

All tables are written in parallel, but the anchor table is written separately (and last) so it can be used to implement crash resistance.

DuckDB

We highly recommend DuckDB for development and testing as it is solid and easy to use.

Read the DuckDB Python docs.

Config

@dataclass
class DuckdbWriterConfig:
    connection: duckdb.DuckDBPyConnection

Warning: Do not pass a duckdb.DuckDBPyConnection directly if you are also using it while the pipeline is running; it is not thread-safe. Use connection.cursor() to create a cursor and pass that instead.

Example

import duckdb

from cherry_etl import config as cc

# create an in-memory duckdb database
connection = duckdb.connect()

writer = cc.Writer(
    kind=cc.WriterKind.DUCKDB,
    config=cc.DuckdbWriterConfig(
        connection=connection.cursor(),
    ),
)

Iceberg

Config

from pyiceberg.catalog import Catalog as IcebergCatalog

@dataclass
class IcebergWriterConfig:
    namespace: str
    catalog: IcebergCatalog
    write_location: str

See pyiceberg docs.

Example

from cherry_etl import config as cc
from pyiceberg.catalog.sql import SqlCatalog

catalog = SqlCatalog(
    name="cherry",
    uri="postgresql+psycopg2://postgres:postgres@localhost/iceberg",
    warehouse="s3://blockchain-data",
    **{
        "s3.endpoint": "http://localhost:9000",
        "s3.access-key-id": "minioadmin",
        "s3.secret-access-key": "minioadmin",
    },
)

writer = cc.Writer(
    kind=cc.WriterKind.ICEBERG,
    config=cc.IcebergWriterConfig(
        namespace="my_namespace",
        catalog=catalog,
        write_location="s3://blockchain-data/",
    ),
)

PyArrowDataset (Parquet)

See the Arrow documentation to learn more about Arrow datasets.

This is the writer you want if you just want a directory of Parquet files.

Config

@dataclass
class PyArrowDatasetWriterConfig:
    base_dir: str
    basename_template: Optional[str] = None
    partitioning: Dict[str, pa_dataset.Partitioning | list[str]] = field(
        default_factory=dict
    )
    partitioning_flavor: Dict[str, str] = field(default_factory=dict)
    filesystem: Optional[pa_fs.FileSystem] = None
    file_options: Optional[pa_dataset.FileWriteOptions] = None
    use_threads: bool = True
    max_partitions: int = 1024
    max_open_files: int = 1024
    max_rows_per_file: int = 0
    min_rows_per_group: int = 0
    max_rows_per_group: int = 1024 * 1024
    create_dir: bool = True
    anchor_table: Optional[str] = None

See pyarrow docs for more explanation about the parameters.

Example

from cherry_etl import config as cc

base_dir = "./data"

writer = cc.Writer(
    kind=cc.WriterKind.PYARROW_DATASET,
    config=cc.PyArrowDatasetWriterConfig(
        base_dir=base_dir,
    ),
)
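
For example, to hive-partition a blocks table by its number column (hypothetical table and column names), partitioning and partitioning_flavor can be set per table:

writer = cc.Writer(
    kind=cc.WriterKind.PYARROW_DATASET,
    config=cc.PyArrowDatasetWriterConfig(
        base_dir=base_dir,
        # partition columns keyed by table name
        partitioning={"blocks": ["number"]},
        # use hive-style directory names like number=123
        partitioning_flavor={"blocks": "hive"},
    ),
)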

Starting from where you left off

Since the pipeline is created purely in Python at runtime, it is possible to set the from_block parameter programmatically.

For example, when using DuckDB, you can create a function like this to get the last written block:

import traceback

import duckdb


def get_start_block(con: duckdb.DuckDBPyConnection) -> int:
    try:
        res = con.sql("SELECT MAX(block_number) FROM my_table").fetchone()
        # MAX returns NULL when the table is empty, so check the value as well
        if res is not None and res[0] is not None:
            return int(res[0])
        else:
            return 0
    except Exception:
        print(f"failed to get start block from db: {traceback.format_exc()}")
        return 0

Then you can use it like this when creating the pipeline object:

pipeline = datasets.evm.blocks(
    provider,
    writer,
    from_block=get_start_block(duckdb_connection),
    to_block=to_block,
)

Crash resistance

Some writers like duckdb are naturally crash-resistant because writes run inside a transaction.

Other writers are naturally crash-resistant as long as there is only one output table. If there are multiple output tables, you can utilize the anchor_table parameter to implement crash-resistance.

Output tables are written in parallel for performance, but the anchor_table is always written last. So at startup you can prune the non-anchor tables in the output database back to the max block of anchor_table and resume from there (see the Starting from where you left off section).
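
A minimal sketch of this startup pruning, assuming a Delta Lake output under ./data with an anchor table named transfers and a secondary table named blocks, both containing a block_number column (names are illustrative):

import polars as pl
from deltalake import DeltaTable

# find the last block that was fully written (the anchor table is written last)
resume_from = (
    pl.scan_delta("./data/transfers")
    .select(pl.col("block_number").max())
    .collect()
    .item()
)

# prune any rows the non-anchor tables wrote past the anchor before the crash
DeltaTable("./data/blocks").delete(f"block_number > {resume_from}")

# then construct the pipeline with from_block=resume_from + 1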

Logging

Python code uses Python's standard logging module, so it can be configured according to the Python docs.
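
For example, a minimal configuration to see INFO-level logs from the Python side:

import logging

# print INFO and above from cherry and your own code
logging.basicConfig(level=logging.INFO)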

Set the RUST_LOG environment variable according to the env_logger docs in order to see logs from the Rust modules.

To run an example with trace level logging for rust modules:

RUST_LOG=trace uv run examples/path/to/my/example

Writing Pipelines

There are three components of a pipeline.

  • First we get the data from the Provider.
  • The data we get from the provider depends on the query we use.
  • Then we run the steps one by one on the incoming data to transform it.
  • The transformed data is written to the output using the writer in the end.

We already covered Provider and Writer in the previous pages so we will cover writing queries and steps in this section.

Example

Here is a simple example pipeline that ingests and decodes ERC20 transfers into DuckDB: source code

Query

The query is used to specify the data we get from the provider.

It consists of:

  • from_block, the block to start from
  • to_block (optional), the block to stop at (inclusive).
  • request fields
  • field selection

Request Fields

The request fields are named after tables, e.g. logs

import cherry_core
from cherry_core import ingest

query = ingest.Query(
    kind=ingest.QueryKind.EVM,
    params=ingest.evm.Query(
        from_block=from_block,
        to_block=to_block,
        logs=[
            ingest.evm.LogRequest(
                topic0=[cherry_core.evm_signature_to_topic0(dex.event_signature)],
                include_blocks=True,
                include_transactions=True,
            )
        ],
        fields=request_fields,
    ),
)

This example creates an EVM query looking for logs matching a single LogRequest.

include_ fields determine if related data should be included. For example, include_blocks=True means the response will contain the blocks table with the blocks containing the matched logs.

Log Filtering

The core filtering behaviors are:

  1. Between multiple LogRequest objects: When multiple LogRequest objects are provided, logs matching ANY of them will be returned (OR logic).

  2. Between different fields in the same LogRequest: When multiple fields are specified (e.g., address AND topic0), logs matching ALL criteria will be returned (AND logic).

  3. Within a single field: When multiple values are provided for one field (e.g., multiple addresses in the address field), logs matching ANY of those values will be returned (OR logic).

OR Mode: Separate Log Requests

Use separate log requests to match events that satisfy ANY of the conditions:

logs=[
    # Will match USDC Transfer OR WETH Approval
    LogRequest(address=[USDC], topic0=[TRANSFER_TOPIC]),
    LogRequest(address=[WETH], topic0=[APPROVAL_TOPIC])
]

Results:

┌─────────────┬──────────┐
│ address     ┆ topic0   │
╞═════════════╪══════════╡
│ USDC        ┆ Transfer │  # Only USDC Transfers
│ WETH        ┆ Approval │  # Only WETH Approvals
└─────────────┴──────────┘

CROSS JOIN: Multiple Values in Single Request

Put multiple values in a single request to match ALL combinations:

logs=[
    LogRequest(
        address=[USDC, WETH],
        topic0=[TRANSFER_TOPIC, APPROVAL_TOPIC]
    )
]

Results:

┌─────────────┬──────────┐
│ address     ┆ topic0   │
╞═════════════╪══════════╡
│ WETH        ┆ Approval │  # All possible combinations
│ WETH        ┆ Transfer │  # of addresses and topics
│ USDC        ┆ Approval │  # will be matched
│ USDC        ┆ Transfer │
└─────────────┴──────────┘

Common Use Cases

  1. Track specific events from multiple contracts:

    logs=[
        LogRequest(
            address=[CONTRACT_A, CONTRACT_B, CONTRACT_C],
            topic0=[SPECIFIC_EVENT_TOPIC]
        )
    ]
    
  2. Track multiple events from a specific contract:

    logs=[
        LogRequest(
            address=[SPECIFIC_CONTRACT],
            topic0=[EVENT_1_TOPIC, EVENT_2_TOPIC, EVENT_3_TOPIC]
        )
    ]
    
  3. Track specific event combinations:

    logs=[
        LogRequest(address=[CONTRACT_A], topic0=[EVENT_X_TOPIC]),
        LogRequest(address=[CONTRACT_B], topic0=[EVENT_Y_TOPIC])
    ]
    

Filtering by Indexed Parameters (topic1, topic2, etc.)

For EVM events, indexed parameters are stored in the topics array (topic1, topic2, etc.). You can filter by these parameters to get more specific results:

# Filter Transfer events where "from" address is specific
from_address = "0x000000000000000000000000a0b86991c6218b36c1d19d4a2e9eb0ce3606eb48"  # Padded to 32 bytes
transfer_topic0 = cherry_core.evm_signature_to_topic0("Transfer(address,address,uint256)")

query = ingest.Query(
    kind=ingest.QueryKind.EVM,
    params=ingest.evm.Query(
        from_block=20123123,
        to_block=20123223,
        logs=[
            ingest.evm.LogRequest(
                topic0=[transfer_topic0],
                topic1=[from_address],  # Filter by 'from' address (first indexed parameter)
                include_blocks=True,
            )
        ],
        fields=ingest.evm.Fields(
            # Fields definition...
        ),
    ),
)

Important notes about indexed parameters:

  1. Topic values should be padded to 32 bytes (for addresses, prepend with zeros)
  2. For addresses, the correct format is: "0x000000000000000000000000" + address[2:] (see the helper sketch after this list)
  3. Parameter ordering follows the indexed parameters in the event signature
  4. Non-indexed parameters are encoded in the data field and can't be filtered directly
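
Here is a small helper sketch for point 2, padding a 20-byte address into a 32-byte topic value (the helper name is just an example):

def address_to_topic(address: str) -> str:
    # "0xA0b8..." -> "0x000000000000000000000000a0b8..."
    return "0x" + address[2:].lower().rjust(64, "0")

from_address = address_to_topic("0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48")  # USDC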

Field Selection

Query.fields specifies which columns of each table we want to select.

request_fields = ingest.evm.Fields(
    block=ingest.evm.BlockFields(number=True, timestamp=True),
    transaction=ingest.evm.TransactionFields(
        block_number=True, transaction_index=True, hash=True, from_=True, to=True
    ),
    log=ingest.evm.LogFields(
        block_number=True,
        transaction_index=True,
        log_index=True,
        address=True,
        data=True,
        topic0=True,
        topic1=True,
        topic2=True,
        topic3=True,
    ),
)

This means we want to get blocks.number, blocks.timestamp, transactions.block_number ... fields in the response.

Minimizing the fields we request can make a massive difference in performance and response body size, both of which can drastically affect the cost of running infrastructure.

Modern providers like SQD and HyperSync fully utilize this selection so it makes a big difference.

Data Schema

Each chain format has its own Arrow schema. All providers return data according to this schema.

Ethereum (EVM)

All providers return data according to this schema when queried using ingest.evm.Query.

source code.

blocks

Field Name | Data Type | Nullable
number | UInt64 | Yes
hash | Binary | Yes
parent_hash | Binary | Yes
nonce | Binary | Yes
sha3_uncles | Binary | Yes
logs_bloom | Binary | Yes
transactions_root | Binary | Yes
state_root | Binary | Yes
receipts_root | Binary | Yes
miner | Binary | Yes
difficulty | Decimal256(76, 0) | Yes
total_difficulty | Decimal256(76, 0) | Yes
extra_data | Binary | Yes
size | Decimal256(76, 0) | Yes
gas_limit | Decimal256(76, 0) | Yes
gas_used | Decimal256(76, 0) | Yes
timestamp | Decimal256(76, 0) | Yes
uncles | List(Binary) | Yes
base_fee_per_gas | Decimal256(76, 0) | Yes
blob_gas_used | Decimal256(76, 0) | Yes
excess_blob_gas | Decimal256(76, 0) | Yes
parent_beacon_block_root | Binary | Yes
withdrawals_root | Binary | Yes
withdrawals | List(Struct(index: UInt64, validator_index: UInt64, address: Binary, amount: Decimal256(76, 0))) | Yes
l1_block_number | UInt64 | Yes
send_count | Decimal256(76, 0) | Yes
send_root | Binary | Yes
mix_hash | Binary | Yes

transactions

Field Name | Data Type | Nullable
block_hash | Binary | Yes
block_number | UInt64 | Yes
from | Binary | Yes
gas | Decimal256(76, 0) | Yes
gas_price | Decimal256(76, 0) | Yes
hash | Binary | Yes
input | Binary | Yes
nonce | Decimal256(76, 0) | Yes
to | Binary | Yes
transaction_index | UInt64 | Yes
value | Decimal256(76, 0) | Yes
v | UInt8 | Yes
r | Binary | Yes
s | Binary | Yes
max_priority_fee_per_gas | Decimal256(76, 0) | Yes
max_fee_per_gas | Decimal256(76, 0) | Yes
chain_id | Decimal256(76, 0) | Yes
cumulative_gas_used | Decimal256(76, 0) | Yes
effective_gas_price | Decimal256(76, 0) | Yes
gas_used | Decimal256(76, 0) | Yes
contract_address | Binary | Yes
logs_bloom | Binary | Yes
type | UInt8 | Yes
root | Binary | Yes
status | UInt8 | Yes
sighash | Binary | Yes
y_parity | Boolean | Yes
access_list | List(Struct(address: Binary, storage_keys: List(Binary))) | Yes
l1_fee | Decimal256(76, 0) | Yes
l1_gas_price | Decimal256(76, 0) | Yes
l1_gas_used | Decimal256(76, 0) | Yes
l1_fee_scalar | Decimal256(76, 0) | Yes
gas_used_for_l1 | Decimal256(76, 0) | Yes
max_fee_per_blob_gas | Decimal256(76, 0) | Yes
blob_versioned_hashes | List(Binary) | Yes
deposit_nonce | Decimal256(76, 0) | Yes
blob_gas_price | Decimal256(76, 0) | Yes
deposit_receipt_version | Decimal256(76, 0) | Yes
blob_gas_used | Decimal256(76, 0) | Yes
l1_base_fee_scalar | Decimal256(76, 0) | Yes
l1_blob_base_fee | Decimal256(76, 0) | Yes
l1_blob_base_fee_scalar | Decimal256(76, 0) | Yes
l1_block_number | UInt64 | Yes
mint | Decimal256(76, 0) | Yes
source_hash | Binary | Yes

logs

Field Name | Data Type | Nullable
removed | Boolean | Yes
log_index | UInt64 | Yes
transaction_index | UInt64 | Yes
transaction_hash | Binary | Yes
block_hash | Binary | Yes
block_number | UInt64 | Yes
address | Binary | Yes
data | Binary | Yes
topic0 | Binary | Yes
topic1 | Binary | Yes
topic2 | Binary | Yes
topic3 | Binary | Yes

traces

Field Name | Data Type | Nullable
from | Binary | Yes
to | Binary | Yes
call_type | Utf8 | Yes
gas | Decimal256(76, 0) | Yes
input | Binary | Yes
init | Binary | Yes
value | Decimal256(76, 0) | Yes
author | Binary | Yes
reward_type | Utf8 | Yes
block_hash | Binary | Yes
block_number | UInt64 | Yes
address | Binary | Yes
code | Binary | Yes
gas_used | Decimal256(76, 0) | Yes
output | Binary | Yes
subtraces | UInt64 | Yes
trace_address | List(UInt64) | Yes
transaction_hash | Binary | Yes
transaction_position | UInt64 | Yes
type | Utf8 | Yes
error | Utf8 | Yes
sighash | Binary | Yes
action_address | Binary | Yes
balance | Decimal256(76, 0) | Yes
refund_address | Binary | Yes

Solana (SVM)

All providers return data according to this schema when queried using ingest.svm.Query.

source code.

blocks

Field Name | Data Type | Nullable
slot | UInt64 | Yes
hash | Binary | Yes
parent_slot | UInt64 | Yes
parent_hash | Binary | Yes
height | UInt64 | Yes
timestamp | Int64 | Yes

rewards

Field Name | Data Type | Nullable
block_slot | UInt64 | Yes
block_hash | Binary | Yes
pubkey | Binary | Yes
lamports | Int64 | Yes
post_balance | UInt64 | Yes
reward_type | Utf8 | Yes
commission | UInt8 | Yes

token_balances

Field Name | Data Type | Nullable
block_slot | UInt64 | Yes
block_hash | Binary | Yes
transaction_index | UInt32 | Yes
account | Binary | Yes
pre_mint | Binary | Yes
post_mint | Binary | Yes
pre_decimals | UInt16 | Yes
post_decimals | UInt16 | Yes
pre_program_id | Binary | Yes
post_program_id | Binary | Yes
pre_owner | Binary | Yes
post_owner | Binary | Yes
pre_amount | UInt64 | Yes
post_amount | UInt64 | Yes

balances

Field Name | Data Type | Nullable
block_slot | UInt64 | Yes
block_hash | Binary | Yes
transaction_index | UInt32 | Yes
account | Binary | Yes
pre | UInt64 | Yes
post | UInt64 | Yes

logs

Field Name | Data Type | Nullable
block_slot | UInt64 | Yes
block_hash | Binary | Yes
transaction_index | UInt32 | Yes
log_index | UInt32 | Yes
instruction_address | List(UInt32) | Yes
program_id | Binary | Yes
kind | Utf8 | Yes
message | Utf8 | Yes

transactions

Field Name | Data Type | Nullable
block_slot | UInt64 | Yes
block_hash | Binary | Yes
transaction_index | UInt32 | Yes
signature | Binary | Yes
version | Int8 | Yes
account_keys | List(Binary) | Yes
address_table_lookups | List(Struct(account_key: Binary, writable_indexes: List(UInt64), readonly_indexes: List(UInt64))) | Yes
num_readonly_signed_accounts | UInt32 | Yes
num_readonly_unsigned_accounts | UInt32 | Yes
num_required_signatures | UInt32 | Yes
recent_blockhash | Binary | Yes
signatures | List(Binary) | Yes
err | Utf8 | Yes
fee | UInt64 | Yes
compute_units_consumed | UInt64 | Yes
loaded_readonly_addresses | List(Binary) | Yes
loaded_writable_addresses | List(Binary) | Yes
fee_payer | Binary | Yes
has_dropped_log_messages | Boolean | Yes

instructions

Field Name | Data Type | Nullable
block_slot | UInt64 | Yes
block_hash | Binary | Yes
transaction_index | UInt32 | Yes
instruction_address | List(UInt32) | Yes
program_id | Binary | Yes
a0 | Binary | Yes
a1 | Binary | Yes
a2 | Binary | Yes
a3 | Binary | Yes
a4 | Binary | Yes
a5 | Binary | Yes
a6 | Binary | Yes
a7 | Binary | Yes
a8 | Binary | Yes
a9 | Binary | Yes
rest_of_accounts | List(Binary) | Yes
data | Binary | Yes
d1 | Binary | Yes
d2 | Binary | Yes
d4 | Binary | Yes
d8 | Binary | Yes
error | Utf8 | Yes
compute_units_consumed | UInt64 | Yes
is_committed | Boolean | Yes
has_dropped_log_messages | Boolean | Yes

Steps

Steps are run between the provider and writer to modify the data or do other arbitrary actions.

There are built-in steps that can be configured and there is a mechanism to write custom steps which enables running arbitrary python code on the data as a part of the pipeline.
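
The custom-step configuration isn't reproduced here; as a purely illustrative sketch (the CUSTOM step kind, CustomStepConfig name, and runner parameter are assumptions, check the source code for the real API), a custom step is essentially a Python callable that receives the dict of tables and returns a modified dict:

from typing import Dict

import pyarrow as pa

from cherry_etl import config as cc


def drop_traces(data: Dict[str, pa.Table]) -> Dict[str, pa.Table]:
    # arbitrary python code over the batch, e.g. drop the traces table
    # (the exact table type passed to custom steps may differ, see the source)
    data.pop("traces", None)
    return data


# step kind and config names are assumptions, see the cherry source for the real API
step = cc.Step(
    kind=cc.StepKind.CUSTOM,
    config=cc.CustomStepConfig(runner=drop_traces),
)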

Base58Encode

Encode all binary fields as Solana (Bitcoin) style base58.

@dataclass
class Base58EncodeConfig:
    tables: Optional[list[str]] = None

The tables parameter can be used to specify which tables should be encoded. All tables are encoded if it is left as None.
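
A usage sketch, assuming the step kind is named BASE58_ENCODE like the other built-in steps (table names are just examples):

from cherry_etl import config as cc

cc.Step(
    kind=cc.StepKind.BASE58_ENCODE,
    config=cc.Base58EncodeConfig(tables=["transactions", "instructions"]),
),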

Cast

Cast columns to specified arrow types.

@dataclass
class CastConfig:
    table_name: str
    mappings: Dict[str, pa.DataType]
    allow_decode_fail: bool = False

When allow_decode_fail is set to True, this step will write null if it fails to cast a value instead of erroring out.

Example

Cast transfers.block_timestamp to int64 type.

import pyarrow as pa

from cherry_etl import config as cc

cc.Step(
    kind=cc.StepKind.CAST,
    config=cc.CastConfig(
        table_name="transfers",
        mappings={"block_timestamp": pa.int64()},
    ),
),

CastByType

Cast all columns with the from_type to to_type.

@dataclass
class CastByTypeConfig:
    from_type: pa.DataType
    to_type: pa.DataType
    allow_decode_fail: bool = False

When allow_decode_fail is set to True, this step will write null if it fails to cast a value instead of erroring out.
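
Example usage, casting all Decimal256 columns to Decimal128 (the same step is shown again in the appendix):

import pyarrow as pa

from cherry_etl import config as cc

cc.Step(
    kind=cc.StepKind.CAST_BY_TYPE,
    config=cc.CastByTypeConfig(
        from_type=pa.decimal256(76, 0),
        to_type=pa.decimal128(38, 0),
    ),
),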

EvmDecodeEvents

Decode EVM events using the given event signature.

@dataclass
class EvmDecodeEventsConfig:
    event_signature: str
    allow_decode_fail: bool = False
    input_table: str = "logs"
    output_table: str = "decoded_logs"
    hstack: bool = True

allow_decode_fail controls if the decoder should fail if a row fails to decode or if it should write null instead.

hstack means the raw logs and the decoded logs are horizontally stacked into one table, so decoded values will have the raw log fields like block_number, address, topic0, etc. next to them.

Example

from cherry_etl import config as cc

cc.Step(
    kind=cc.StepKind.EVM_DECODE_EVENTS,
    config=cc.EvmDecodeEventsConfig(
        event_signature="Transfer(address indexed from, address indexed to, uint256 amount)",
        output_table="transfers",
    ),
),

HexEncode

Encode all binary fields as hex strings

@dataclass
class HexEncodeConfig:
    tables: Optional[list[str]] = None
    prefixed: bool = True

The tables parameter can be used to specify which tables should be encoded. All tables are encoded if it is left as None.

The prefixed parameter chooses between prefixed hex ('0x..', ETH style) and plain hex encoding.
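
A usage sketch, assuming the step kind is named HEX_ENCODE like the other built-in steps (the table name is just an example):

from cherry_etl import config as cc

cc.Step(
    # encode only the decoded transfers table, using 0x-prefixed hex
    kind=cc.StepKind.HEX_ENCODE,
    config=cc.HexEncodeConfig(tables=["transfers"], prefixed=True),
),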

U256ToBinary

Convert all Decimal256 columns to trimmed, big-endian encoded binary values. This binary representation is the same one used by the ETH RPC API and Ethereum-related tooling.

@dataclass
class U256ToBinaryConfig:
    tables: Optional[list[str]] = None

The tables parameter can be used to specify which tables should be converted. All tables are converted if it is left as None.

Appendix

Polars and Decimal types (UInt256)

Note: This section is irrelevant if you are using datasets since datasets handle this internally.

Polars doesn't properly support the Decimal256 type yet, so it is not possible to work with Ethereum uint256 values natively. We prefer to cast these values to Decimal128 when possible, and cast them to binary or string when they don't fit.

You can use the built-in CastByType step to cast all Decimal256 columns to Decimal128 like this:

import pyarrow as pa

from cherry_etl import config as cc

cc.Step(
    kind=cc.StepKind.CAST_BY_TYPE,
    config=cc.CastByTypeConfig(
        from_type=pa.decimal256(76, 0),
        # can also use `pa.string()` as `to_type` here to get string values like '123213'
        to_type=pa.decimal128(38, 0),
    ),
),

Or you can use the U256ToBinary step to convert all Decimal256 columns to big-endian (ETH-compatible) binary values:

from cherry_etl import config as cc

cc.Step(
    kind=cc.StepKind.U256_TO_BINARY,
    config=cc.U256ToBinaryConfig(),
)

Some output formats also don't support Decimal256 but all of them support Decimal128 so this can be useful if you are having trouble writing Decimal256 to your output.