Introduction
Cherry is a Python library for building blockchain data pipelines.
It is designed to make production-ready blockchain data pipelines easy to build.
Getting Started
This section explains how to install and use cherry.
Installation
Cherry is published to PyPI as cherry-etl. The core tooling used with Cherry is published as cherry-core.
You can add both to your Python project like this:
pip install cherry-etl cherry-core
Or if you are using uv:
uv add cherry-etl cherry-core
Import them in your Python scripts like this:
import cherry_etl
import cherry_core
Data Providers
Everything in Cherry works the same regardless of which data provider you choose.
You can switch providers by changing the kind and url arguments when constructing a ProviderConfig object.
from cherry_core import ingest
provider = ingest.ProviderConfig(
kind=my_provider_kind,
url=my_provider_url,
)
Check the section for the specific provider you want to use for more details.
Data availability
Provider | Ethereum (EVM) | Solana (SVM) |
---|---|---|
HyperSync | ✅ | ❌ |
SQD | ✅ | ✅ |
Yellowstone-GRPC | ❌ | ✅ |
Full ProviderConfig
class ProviderKind(str, Enum):
SQD = "sqd"
HYPERSYNC = "hypersync"
YELLOWSTONE_GRPC = "yellowstone_grpc"
@dataclass
class ProviderConfig:
kind: ProviderKind
url: Optional[str] = None
bearer_token: Optional[str] = None
max_num_retries: Optional[int] = None
retry_backoff_ms: Optional[int] = None
retry_base_ms: Optional[int] = None
retry_ceiling_ms: Optional[int] = None
req_timeout_millis: Optional[int] = None
stop_on_head: bool = False
head_poll_interval_millis: Optional[int] = None
buffer_size: Optional[int] = None
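For reference, here is a sketch of a more tuned config. The retry and timeout values below are purely illustrative, and the dataset URL is an assumption, not a recommendation:

from cherry_core import ingest

provider = ingest.ProviderConfig(
    kind=ingest.ProviderKind.SQD,
    url="https://portal.sqd.dev/datasets/ethereum-mainnet",  # assumed dataset URL, see the supported networks page
    max_num_retries=5,          # retry each failed request up to 5 times
    retry_base_ms=200,          # initial backoff between retries
    retry_backoff_ms=500,       # backoff growth step
    retry_ceiling_ms=5_000,     # cap on the backoff
    req_timeout_millis=30_000,  # per-request timeout
)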
HyperSync
HyperSync is a purpose-built, high-performance data retrieval layer that gives developers unprecedented access to blockchain data. Built from the ground up in Rust, HyperSync serves as an alternative to traditional JSON-RPC endpoints, offering dramatically faster queries and more flexible data access patterns.
HyperSync currently supports 70+ EVM chains.
Read more about it in the official documentation.
Example ProviderConfig
from cherry_core import ingest
provider = ingest.ProviderConfig(
kind=ingest.ProviderKind.HYPERSYNC,
bearer_token="<Your Hypersync API Token>",
url="https://eth.hypersync.xyz",
)
You can get a URL for the desired chain from the supported networks page.
Getting API token
HyperSync is currently free and unlimited, but it is transitioning to requiring API tokens.
It is recommended to get an API token and use it with your queries.
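For example, a minimal sketch that reads the token from an environment variable (the HYPERSYNC_API_TOKEN name is just an assumption; use whatever variable you prefer):

import os

from cherry_core import ingest

provider = ingest.ProviderConfig(
    kind=ingest.ProviderKind.HYPERSYNC,
    url="https://eth.hypersync.xyz",
    # Assumed environment variable name; set it to your HyperSync API token.
    bearer_token=os.environ.get("HYPERSYNC_API_TOKEN"),
)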
SQD
SQD Network is a decentralized query engine optimized for batch extraction of large volumes of data. It currently serves historical on-chain data ingested from 100+ EVM and Substrate networks, as well as Solana (in beta), Tron, Starknet and Fuel. The data is comprehensive: for example, on EVM it includes event logs, transaction receipts, traces and per-transaction state diffs.
SQD currently supports 100+ EVM chains and has beta support for Solana.
Warning: Solana data starts from block slot 317617480 when using SQD.
Read more at the official documentation.
Example ProviderConfig
from cherry_core import ingest
provider = ingest.ProviderConfig(
kind=ingest.ProviderKind.SQD,
url="https://portal.sqd.dev/datasets/solana-beta",
)
You can replace solana-beta with any dataset name from the supported networks page.
For example, you can use https://portal.sqd.dev/datasets/arbitrum-nova to get arbitrum-nova data, as shown below.
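from cherry_core import ingest

# Same config as above, pointed at the arbitrum-nova dataset instead of solana-beta.
provider = ingest.ProviderConfig(
    kind=ingest.ProviderKind.SQD,
    url="https://portal.sqd.dev/datasets/arbitrum-nova",
)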
Deploying portals
Access to the SQD portal API is free but slightly limited at the moment.
You can deploy your own portal to get unlimited access. Contact us on Telegram for help with deploying and maintaining portal instances.
Writers
Writers control how data is written to the output after it is ingested from the provider and processed by the configured steps.
Supported output formats
- ClickHouse
- Iceberg
- Deltalake
- DuckDB
- Arrow Datasets
- Parquet
Full Writer configuration API
class WriterKind(str, Enum):
CLICKHOUSE = "clickhouse"
ICEBERG = "iceberg"
DELTA_LAKE = "delta_lake"
PYARROW_DATASET = "pyarrow_dataset"
DUCKDB = "duckdb"
@dataclass
class Writer:
kind: WriterKind
config: (
ClickHouseWriterConfig
| IcebergWriterConfig
| DeltaLakeWriterConfig
| PyArrowDatasetWriterConfig
| DuckdbWriterConfig
)
See the specific writer pages to learn more about your writer of choice.
If you don't have a preference, we highly recommend DuckDB for development and testing as it is very solid and easy to use.
ClickHouse
Config
@dataclass
class ClickHouseWriterConfig:
client: ClickHouseClient
codec: Dict[str, Dict[str, str]] = field(default_factory=dict)
order_by: Dict[str, List[str]] = field(default_factory=dict)
engine: str = "MergeTree()"
skip_index: Dict[str, List[ClickHouseSkipIndex]] = field(default_factory=dict)
anchor_table: Optional[str] = None
Dict[str, _] fields generally hold per-table configuration. For example, codec["my_table"]["my_column"] gives the codec to use for the my_column column of the my_table table.
Example
import os

import clickhouse_connect

from cherry_etl import config as cc
clickhouse_client = await clickhouse_connect.get_async_client(
host=os.environ.get("CLICKHOUSE_HOST", "localhost"),
port=int(os.environ.get("CLICKHOUSE_PORT", "8123")),
username=os.environ.get("CLICKHOUSE_USER", "default"),
password=os.environ.get("CLICKHOUSE_PASSWORD", "clickhouse"),
database=os.environ.get("CLICKHOUSE_DATABASE", "blockchain"),
)
writer = cc.Writer(
kind=cc.WriterKind.CLICKHOUSE,
config=cc.ClickHouseWriterConfig(
client=clickhouse_client,
order_by={"transfers": ["block_number"]},
codec={"transfers": {"data": "ZSTD"}},
skip_index={
"transfers": [
cc.ClickHouseSkipIndex(
name="log_addr_idx",
val="address",
type_="bloom_filter(0.01)",
granularity=1,
),
cc.ClickHouseSkipIndex(
name="from_addr_idx",
val="from",
type_="bloom_filter(0.01)",
granularity=1,
),
cc.ClickHouseSkipIndex(
name="to_addr_idx",
val="to",
type_="bloom_filter(0.01)",
granularity=1,
),
]
},
),
)
Anchor table
All tables are written in parallel, but the anchor table is written separately so it can be used to implement crash resistance.
DeltaLake
Config
@dataclass
class DeltaLakeWriterConfig:
data_uri: str
partition_by: Dict[str, list[str]] = field(default_factory=dict)
storage_options: Optional[Dict[str, str]] = None
writer_properties: Optional[deltalake.WriterProperties] = None
anchor_table: Optional[str] = None
Dict[str, _] fields generally hold per-table configuration. For example, partition_by["my_table"] gives the list of columns to partition the my_table table by.
Example
from cherry_etl import config as cc

data_uri = "./data"
writer = cc.Writer(
kind=cc.WriterKind.DELTA_LAKE,
config=cc.DeltaLakeWriterConfig(
data_uri=data_uri,
),
)
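A sketch with per-table partitioning; the "transfers" table and its block_number column are assumptions for illustration:

writer = cc.Writer(
    kind=cc.WriterKind.DELTA_LAKE,
    config=cc.DeltaLakeWriterConfig(
        data_uri=data_uri,
        # Partition the (assumed) transfers table by its block_number column.
        partition_by={"transfers": ["block_number"]},
    ),
)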
Anchor table
All tables are written in parallel, but the anchor table is written separately so it can be used to implement crash resistance.
DuckDB
We highly recommend DuckDB for development and testing as it is very solid and easy to use.
Read the DuckDB Python docs for details.
Config
@dataclass
class DuckdbWriterConfig:
connection: duckdb.DuckDBPyConnection
Warning: Do not pass a duckdb.DuckDBPyConnection directly if you are also using it while the pipeline is running; it is not thread safe. Use connection.cursor() to create a cursor and pass that instead.
Example
import duckdb

from cherry_etl import config as cc

# create an in-memory duckdb database
connection = duckdb.connect()
writer = cc.Writer(
kind=cc.WriterKind.DUCKDB,
config=cc.DuckdbWriterConfig(
connection=connection.cursor(),
),
)
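Once the pipeline has finished you can query the same connection directly; the table name below ("transfers") is just an assumed output table:

# Inspect what the pipeline wrote (table name is an assumption).
print(connection.sql("SELECT count(*) FROM transfers").fetchone())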
Iceberg
Config
from pyiceberg.catalog import Catalog as IcebergCatalog
@dataclass
class IcebergWriterConfig:
namespace: str
catalog: IcebergCatalog
write_location: str
See pyiceberg docs.
Example
from cherry_etl import config as cc
from pyiceberg.catalog.sql import SqlCatalog
catalog = SqlCatalog(
name="cherry",
uri="postgresql+psycopg2://postgres:postgres@localhost/iceberg",
warehouse="s3://blockchain-data",
**{
"s3.endpoint": "http://localhost:9000",
"s3.access-key-id": "minioadmin",
"s3.secret-access-key": "minioadmin",
},
)
writer = cc.Writer(
kind=cc.WriterKind.ICEBERG,
config=cc.IcebergWriterConfig(
namespace="my_namespace",
catalog=catalog,
write_location="s3://blockchain-data/",
),
)
PyArrowDataset (Parquet)
See the PyArrow documentation to learn more about Arrow datasets.
This is the writer you want if you just want a directory of Parquet files.
Config
@dataclass
class PyArrowDatasetWriterConfig:
base_dir: str
basename_template: Optional[str] = None
partitioning: Dict[str, pa_dataset.Partitioning | list[str]] = field(
default_factory=dict
)
partitioning_flavor: Dict[str, str] = field(default_factory=dict)
filesystem: Optional[pa_fs.FileSystem] = None
file_options: Optional[pa_dataset.FileWriteOptions] = None
use_threads: bool = True
max_partitions: int = 1024
max_open_files: int = 1024
max_rows_per_file: int = 0
min_rows_per_group: int = 0
max_rows_per_group: int = 1024 * 1024
create_dir: bool = True
anchor_table: Optional[str] = None
See the PyArrow docs for more detail on these parameters.
Example
from cherry_etl import config as cc
base_dir = "./data"
writer = cc.Writer(
kind=cc.WriterKind.PYARROW_DATASET,
config=cc.PyArrowDatasetWriterConfig(
base_dir=base_dir,
),
)
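A sketch that also partitions one output table into hive-style directories; the "blocks" table and its number column are assumptions for illustration:

writer = cc.Writer(
    kind=cc.WriterKind.PYARROW_DATASET,
    config=cc.PyArrowDatasetWriterConfig(
        base_dir=base_dir,
        # Partition the (assumed) blocks table by its number column, hive style (number=123/...).
        partitioning={"blocks": ["number"]},
        partitioning_flavor={"blocks": "hive"},
    ),
)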
Starting from where you left off
Since the pipeline is created purely in Python at runtime, it is possible to set the from_block parameter programmatically.
For example, when using DuckDB you can create a function like this to get the last written block:
import traceback

import duckdb


def get_start_block(con: duckdb.DuckDBPyConnection) -> int:
    try:
        res = con.sql("SELECT MAX(block_number) FROM my_table").fetchone()
        if res is not None and res[0] is not None:
            return int(res[0])
        else:
            return 0
    except Exception:
        print(f"failed to get start block from db: {traceback.format_exc()}")
        return 0
Then you can use it like this when creating the pipeline object:
pipeline = datasets.evm.blocks(
provider,
writer,
from_block=get_start_block(duckdb_connection),
to_block=to_block,
)
Crash resistance
Some writers, like DuckDB, are naturally crash-resistant because writes run inside a transaction.
Other writers are naturally crash-resistant as long as there is only one output table. If there are multiple output tables, you can use the anchor_table parameter to implement crash resistance.
Output tables are written in parallel for performance, but the anchor_table is always written last. So at startup you can prune the non-anchor tables in the output database back to the max block of the anchor_table and resume from there (see the "Starting from where you left off" section).
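A minimal sketch of such a startup prune, written here as DuckDB SQL for brevity; the anchor table ("blocks" with a "number" column) and the non-anchor table ("transfers" with a "block_number" column) are assumed names:

import duckdb


def prune_non_anchor_tables(con: duckdb.DuckDBPyConnection) -> None:
    # Last block that was fully committed, according to the anchor table.
    res = con.sql("SELECT MAX(number) FROM blocks").fetchone()
    if res is None or res[0] is None:
        return
    max_block = int(res[0])
    # Drop rows the non-anchor tables may have written past the anchor point before a crash.
    con.execute("DELETE FROM transfers WHERE block_number > ?", [max_block])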
Logging
Python code uses Python's standard logging module, so it can be configured according to the Python docs.
Set the RUST_LOG environment variable according to the env_logger docs in order to see logs from the Rust modules.
To run an example with trace-level logging for the Rust modules:
RUST_LOG=trace uv run examples/path/to/my/example
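On the Python side, a minimal logging setup might look like this (a sketch; pick whatever level and format you prefer):

import logging

# Cherry's Python code logs through the standard logging module,
# so basicConfig is enough to see its messages.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(name)s %(levelname)s %(message)s",
)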
Writing Pipelines
A pipeline has three components:
- First we get the data from the provider. The data we receive depends on the query we use.
- Then the steps run one by one on the incoming data to transform it.
- Finally, the transformed data is written to the output using the writer.
We already covered providers and writers in the previous pages, so this section covers writing queries and steps.
Example
Here is a simple example pipeline that ingests and decodes ERC20 transfers into DuckDB: source code
Query
The query is used to specify the data we get from the provider.
It consists of:
- from_block, the block to start from
- to_block (optional), the block to stop at (inclusive).
- request fields
- field selection
Request Fields
The request fields are named after tables, e.g. logs:
import cherry_core
from cherry_core import ingest
query = ingest.Query(
kind=ingest.QueryKind.EVM,
params=ingest.evm.Query(
from_block=from_block,
to_block=to_block,
logs=[
ingest.evm.LogRequest(
topic0=[cherry_core.evm_signature_to_topic0(dex.event_signature)],
include_blocks=True,
include_transactions=True,
)
],
fields=request_fields,
),
)
This example creates an EVM query looking for logs matching a single LogRequest.
The include_ fields determine whether related data should be included. For example, include_blocks=True means the response will contain a blocks table with the blocks that contain the matched logs.
Log Filtering
The core filtering behaviors are:
- Between multiple LogRequest objects: when multiple LogRequest objects are provided, logs matching ANY of them will be returned (OR logic).
- Between different fields in the same LogRequest: when multiple fields are specified (e.g., address AND topic0), logs matching ALL criteria will be returned (AND logic).
- Within a single field: when multiple values are provided for one field (e.g., multiple addresses in the address field), logs matching ANY of those values will be returned (OR logic).
OR Mode: Separate Log Requests
Use separate log requests to match events that satisfy ANY of the conditions:
logs=[
# Will match USDC Transfer OR WETH Approval
LogRequest(address=[USDC], topic0=[TRANSFER_TOPIC]),
LogRequest(address=[WETH], topic0=[APPROVAL_TOPIC])
]
Results:
┌─────────────┬──────────┐
│ address ┆ topic0 │
╞═════════════╪══════════╡
│ USDC ┆ Transfer │ # Only USDC Transfers
│ WETH ┆ Approval │ # Only WETH Approvals
└─────────────┴──────────┘
CROSS JOIN: Multiple Values in Single Request
Put multiple values in a single request to match ALL combinations:
logs=[
LogRequest(
address=[USDC, WETH],
topic0=[TRANSFER_TOPIC, APPROVAL_TOPIC]
)
]
Results:
┌─────────────┬──────────┐
│ address ┆ topic0 │
╞═════════════╪══════════╡
│ WETH ┆ Approval │ # All possible combinations
│ WETH ┆ Transfer │ # of addresses and topics
│ USDC ┆ Approval │ # will be matched
│ USDC ┆ Transfer │
└─────────────┴──────────┘
Common Use Cases
- Track specific events from multiple contracts:
logs=[
    LogRequest(
        address=[CONTRACT_A, CONTRACT_B, CONTRACT_C],
        topic0=[SPECIFIC_EVENT_TOPIC],
    )
]
- Track multiple events from a specific contract:
logs=[
    LogRequest(
        address=[SPECIFIC_CONTRACT],
        topic0=[EVENT_1_TOPIC, EVENT_2_TOPIC, EVENT_3_TOPIC],
    )
]
- Track specific event combinations:
logs=[
    LogRequest(address=[CONTRACT_A], topic0=[EVENT_X_TOPIC]),
    LogRequest(address=[CONTRACT_B], topic0=[EVENT_Y_TOPIC]),
]
Filtering by Indexed Parameters (topic1, topic2, etc.)
For EVM events, indexed parameters are stored in the topics array (topic1, topic2, etc.). You can filter by these parameters to get more specific results:
import cherry_core
from cherry_core import ingest

# Filter Transfer events where "from" address is specific
from_address = "0x000000000000000000000000a0b86991c6218b36c1d19d4a2e9eb0ce3606eb48" # Padded to 32 bytes
transfer_topic0 = cherry_core.evm_signature_to_topic0("Transfer(address,address,uint256)")
query = ingest.Query(
kind=ingest.QueryKind.EVM,
params=ingest.evm.Query(
from_block=20123123,
to_block=20123223,
logs=[
ingest.evm.LogRequest(
topic0=[transfer_topic0],
topic1=[from_address], # Filter by 'from' address (first indexed parameter)
include_blocks=True,
)
],
fields=ingest.evm.Fields(
# Fields definition...
),
),
)
Important notes about indexed parameters:
- Topic values should be padded to 32 bytes (for addresses, prepend with zeros)
- For addresses, the correct format is: "0x000000000000000000000000" + address[2:]
- Parameter ordering follows the indexed parameters in the event signature
- Non-indexed parameters are encoded in the data field and can't be filtered directly
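A small helper along these lines can build the padded topic value from a plain address (this helper is not part of cherry, just a sketch):

def address_to_topic(address: str) -> str:
    # Left-pad a 20-byte hex address to the 32-byte topic format described above.
    return "0x" + "0" * 24 + address[2:].lower()


# e.g. address_to_topic("0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48")
# -> "0x000000000000000000000000a0b86991c6218b36c1d19d4a2e9eb0ce3606eb48"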
Field Selection
Query.fields specifies which columns of each table we want to select.
request_fields = ingest.evm.Fields(
block=ingest.evm.BlockFields(number=True, timestamp=True),
transaction=ingest.evm.TransactionFields(
block_number=True, transaction_index=True, hash=True, from_=True, to=True
),
log=ingest.evm.LogFields(
block_number=True,
transaction_index=True,
log_index=True,
address=True,
data=True,
topic0=True,
topic1=True,
topic2=True,
topic3=True,
),
)
This means we want the blocks.number, blocks.timestamp, transactions.block_number, ... fields in the response.
Minimizing the fields we request can make a massive difference in performance and response body size, both of which drastically affect the cost of running infrastructure.
Modern providers like SQD and HyperSync fully utilize this selection, so it makes a big difference.
Data Schema
Each chain format has its own Arrow schema. All providers return data according to these schemas.
Ethereum (EVM)
All providers return data according to this schema when queried using ingest.evm.Query.
blocks
Field Name | Data Type | Nullable |
---|---|---|
number | UInt64 | Yes |
hash | Binary | Yes |
parent_hash | Binary | Yes |
nonce | Binary | Yes |
sha3_uncles | Binary | Yes |
logs_bloom | Binary | Yes |
transactions_root | Binary | Yes |
state_root | Binary | Yes |
receipts_root | Binary | Yes |
miner | Binary | Yes |
difficulty | Decimal256(76, 0) | Yes |
total_difficulty | Decimal256(76, 0) | Yes |
extra_data | Binary | Yes |
size | Decimal256(76, 0) | Yes |
gas_limit | Decimal256(76, 0) | Yes |
gas_used | Decimal256(76, 0) | Yes |
timestamp | Decimal256(76, 0) | Yes |
uncles | List(Binary) | Yes |
base_fee_per_gas | Decimal256(76, 0) | Yes |
blob_gas_used | Decimal256(76, 0) | Yes |
excess_blob_gas | Decimal256(76, 0) | Yes |
parent_beacon_block_root | Binary | Yes |
withdrawals_root | Binary | Yes |
withdrawals | List(Struct(index: UInt64, validator_index: UInt64, address: Binary, amount: Decimal256(76, 0))) | Yes |
l1_block_number | UInt64 | Yes |
send_count | Decimal256(76, 0) | Yes |
send_root | Binary | Yes |
mix_hash | Binary | Yes |
transactions
Field Name | Data Type | Nullable |
---|---|---|
block_hash | Binary | Yes |
block_number | UInt64 | Yes |
from | Binary | Yes |
gas | Decimal256(76, 0) | Yes |
gas_price | Decimal256(76, 0) | Yes |
hash | Binary | Yes |
input | Binary | Yes |
nonce | Decimal256(76, 0) | Yes |
to | Binary | Yes |
transaction_index | UInt64 | Yes |
value | Decimal256(76, 0) | Yes |
v | UInt8 | Yes |
r | Binary | Yes |
s | Binary | Yes |
max_priority_fee_per_gas | Decimal256(76, 0) | Yes |
max_fee_per_gas | Decimal256(76, 0) | Yes |
chain_id | Decimal256(76, 0) | Yes |
cumulative_gas_used | Decimal256(76, 0) | Yes |
effective_gas_price | Decimal256(76, 0) | Yes |
gas_used | Decimal256(76, 0) | Yes |
contract_address | Binary | Yes |
logs_bloom | Binary | Yes |
type | UInt8 | Yes |
root | Binary | Yes |
status | UInt8 | Yes |
sighash | Binary | Yes |
y_parity | Boolean | Yes |
access_list | List(Struct(address: Binary, storage_keys: List(Binary))) | Yes |
l1_fee | Decimal256(76, 0) | Yes |
l1_gas_price | Decimal256(76, 0) | Yes |
l1_gas_used | Decimal256(76, 0) | Yes |
l1_fee_scalar | Decimal256(76, 0) | Yes |
gas_used_for_l1 | Decimal256(76, 0) | Yes |
max_fee_per_blob_gas | Decimal256(76, 0) | Yes |
blob_versioned_hashes | List(Binary) | Yes |
deposit_nonce | Decimal256(76, 0) | Yes |
blob_gas_price | Decimal256(76, 0) | Yes |
deposit_receipt_version | Decimal256(76, 0) | Yes |
blob_gas_used | Decimal256(76, 0) | Yes |
l1_base_fee_scalar | Decimal256(76, 0) | Yes |
l1_blob_base_fee | Decimal256(76, 0) | Yes |
l1_blob_base_fee_scalar | Decimal256(76, 0) | Yes |
l1_block_number | UInt64 | Yes |
mint | Decimal256(76, 0) | Yes |
source_hash | Binary | Yes |
logs
Field Name | Data Type | Nullable |
---|---|---|
removed | Boolean | Yes |
log_index | UInt64 | Yes |
transaction_index | UInt64 | Yes |
transaction_hash | Binary | Yes |
block_hash | Binary | Yes |
block_number | UInt64 | Yes |
address | Binary | Yes |
data | Binary | Yes |
topic0 | Binary | Yes |
topic1 | Binary | Yes |
topic2 | Binary | Yes |
topic3 | Binary | Yes |
traces
Field Name | Data Type | Nullable |
---|---|---|
from | Binary | Yes |
to | Binary | Yes |
call_type | Utf8 | Yes |
gas | Decimal256(76, 0) | Yes |
input | Binary | Yes |
init | Binary | Yes |
value | Decimal256(76, 0) | Yes |
author | Binary | Yes |
reward_type | Utf8 | Yes |
block_hash | Binary | Yes |
block_number | UInt64 | Yes |
address | Binary | Yes |
code | Binary | Yes |
gas_used | Decimal256(76, 0) | Yes |
output | Binary | Yes |
subtraces | UInt64 | Yes |
trace_address | List(UInt64) | Yes |
transaction_hash | Binary | Yes |
transaction_position | UInt64 | Yes |
type | Utf8 | Yes |
error | Utf8 | Yes |
sighash | Binary | Yes |
action_address | Binary | Yes |
balance | Decimal256(76, 0) | Yes |
refund_address | Binary | Yes |
Solana (SVM)
All providers return data according to this schema when queried using ingest.svm.Query.
blocks
Field Name | Data Type | Nullable |
---|---|---|
slot | UInt64 | Yes |
hash | Binary | Yes |
parent_slot | UInt64 | Yes |
parent_hash | Binary | Yes |
height | UInt64 | Yes |
timestamp | Int64 | Yes |
rewards
Field Name | Data Type | Nullable |
---|---|---|
block_slot | UInt64 | Yes |
block_hash | Binary | Yes |
pubkey | Binary | Yes |
lamports | Int64 | Yes |
post_balance | UInt64 | Yes |
reward_type | Utf8 | Yes |
commission | UInt8 | Yes |
token_balances
Field Name | Data Type | Nullable |
---|---|---|
block_slot | UInt64 | Yes |
block_hash | Binary | Yes |
transaction_index | UInt32 | Yes |
account | Binary | Yes |
pre_mint | Binary | Yes |
post_mint | Binary | Yes |
pre_decimals | UInt16 | Yes |
post_decimals | UInt16 | Yes |
pre_program_id | Binary | Yes |
post_program_id | Binary | Yes |
pre_owner | Binary | Yes |
post_owner | Binary | Yes |
pre_amount | UInt64 | Yes |
post_amount | UInt64 | Yes |
balances
Field Name | Data Type | Nullable |
---|---|---|
block_slot | UInt64 | Yes |
block_hash | Binary | Yes |
transaction_index | UInt32 | Yes |
account | Binary | Yes |
pre | UInt64 | Yes |
post | UInt64 | Yes |
logs
Field Name | Data Type | Nullable |
---|---|---|
block_slot | UInt64 | Yes |
block_hash | Binary | Yes |
transaction_index | UInt32 | Yes |
log_index | UInt32 | Yes |
instruction_address | List(UInt32) | Yes |
program_id | Binary | Yes |
kind | Utf8 | Yes |
message | Utf8 | Yes |
transactions
Field Name | Data Type | Nullable |
---|---|---|
block_slot | UInt64 | Yes |
block_hash | Binary | Yes |
transaction_index | UInt32 | Yes |
signature | Binary | Yes |
version | Int8 | Yes |
account_keys | List(Binary) | Yes |
address_table_lookups | List(Struct(account_key: Binary, writable_indexes: List(UInt64), readonly_indexes: List(UInt64))) | Yes |
num_readonly_signed_accounts | UInt32 | Yes |
num_readonly_unsigned_accounts | UInt32 | Yes |
num_required_signatures | UInt32 | Yes |
recent_blockhash | Binary | Yes |
signatures | List(Binary) | Yes |
err | Utf8 | Yes |
fee | UInt64 | Yes |
compute_units_consumed | UInt64 | Yes |
loaded_readonly_addresses | List(Binary) | Yes |
loaded_writable_addresses | List(Binary) | Yes |
fee_payer | Binary | Yes |
has_dropped_log_messages | Boolean | Yes |
instructions
Field Name | Data Type | Nullable |
---|---|---|
block_slot | UInt64 | Yes |
block_hash | Binary | Yes |
transaction_index | UInt32 | Yes |
instruction_address | List(UInt32) | Yes |
program_id | Binary | Yes |
a0 | Binary | Yes |
a1 | Binary | Yes |
a2 | Binary | Yes |
a3 | Binary | Yes |
a4 | Binary | Yes |
a5 | Binary | Yes |
a6 | Binary | Yes |
a7 | Binary | Yes |
a8 | Binary | Yes |
a9 | Binary | Yes |
rest_of_accounts | List(Binary) | Yes |
data | Binary | Yes |
d1 | Binary | Yes |
d2 | Binary | Yes |
d4 | Binary | Yes |
d8 | Binary | Yes |
error | Utf8 | Yes |
compute_units_consumed | UInt64 | Yes |
is_committed | Boolean | Yes |
has_dropped_log_messages | Boolean | Yes |
Steps
Steps are run between the provider and writer to modify the data or do other arbitrary actions.
There are built-in steps that can be configured, and there is also a mechanism for writing custom steps, which enables running arbitrary Python code on the data as part of the pipeline.
Base58Encode
Encode all binary fields as Solana (Bitcoin) style base58.
@dataclass
class Base58EncodeConfig:
tables: Optional[list[str]] = None
The tables parameter can be used to specify which tables should be encoded. All tables are encoded if tables is left as None.
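A usage sketch, assuming the step kind is exposed as StepKind.BASE58_ENCODE (mirroring the naming of the other steps documented below):

from cherry_etl import config as cc

cc.Step(
    kind=cc.StepKind.BASE58_ENCODE,  # assumed enum member name
    config=cc.Base58EncodeConfig(
        # Encode only these (example) tables; leave tables=None to encode everything.
        tables=["transactions", "instructions"],
    ),
),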
Cast
Cast columns to specified arrow types.
@dataclass
class CastConfig:
table_name: str
mappings: Dict[str, pa.DataType]
allow_decode_fail: bool = False
When allow_decode_fail is set to True, this step will write null if it fails to cast a value instead of erroring out.
Example
Cast transfers.block_timestamp to the int64 type.
import pyarrow as pa

from cherry_etl import config as cc
cc.Step(
kind=cc.StepKind.CAST,
config=cc.CastConfig(
table_name="transfers",
mappings={"block_timestamp": pa.int64()},
),
),
CastByType
Cast all columns of type from_type to to_type.
@dataclass
class CastByTypeConfig:
from_type: pa.DataType
to_type: pa.DataType
allow_decode_fail: bool = False
When allow_decode_fail is set to True, this step will write null if it fails to cast a value instead of erroring out.
EvmDecodeEvents
Decode EVM events using the given signature.
@dataclass
class EvmDecodeEventsConfig:
event_signature: str
allow_decode_fail: bool = False
input_table: str = "logs"
output_table: str = "decoded_logs"
hstack: bool = True
allow_decode_fail controls whether the decoder should fail when a row fails to decode or write null instead.
hstack means the raw logs and decoded logs tables are horizontally stacked together, so decoded values have the raw log values like block_number, address, topic0 etc. next to them.
Example
from cherry_etl import config as cc
cc.Step(
kind=cc.StepKind.EVM_DECODE_EVENTS,
config=cc.EvmDecodeEventsConfig(
event_signature="Transfer(address indexed from, address indexed to, uint256 amount)",
output_table="transfers",
),
),
HexEncode
Encode all binary fields as hex strings
@dataclass
class HexEncodeConfig:
tables: Optional[list[str]] = None
prefixed: bool = True
The tables parameter can be used to specify which tables should be encoded. All tables are encoded if tables is left as None.
The prefixed parameter chooses between prefixed hex ('0x..', ETH style) and regular hex encoding.
U256ToBinary
Convert all Decimal256 columns to trimmed, big-endian encoded binary values. This is the same binary representation used by ETH RPC and other Ethereum tooling.
@dataclass
class U256ToBinaryConfig:
tables: Optional[list[str]] = None
The tables parameter can be used to specify which tables should be encoded. All tables are encoded if tables is left as None.
Appendix
Polars and Decimal types (UInt256)
Note: This section is irrelevant if you are using datasets, since datasets handle this internally.
Polars doesn't properly support the Decimal256 type yet, so it is not possible to work with Ethereum uint256 values natively. We prefer to cast these values to Decimal128 when possible, and cast them to binary or string when they don't fit.
You can use the built-in CastByType step to cast all Decimal256 columns to Decimal128 like this:
import pyarrow as pa

from cherry_etl import config as cc
cc.Step(
kind=cc.StepKind.CAST_BY_TYPE,
config=cc.CastByTypeConfig(
from_type=pa.decimal256(76, 0),
# can also use `pa.string()` as `to_type` here to get string values like '123213'
to_type=pa.decimal128(38, 0),
),
),
Or you can use the U256ToBinary step to cast all uint256 columns to big-endian (ETH-compatible) encoded binary values:
from cherry_etl import config as cc
cc.Step(
kind=cc.StepKind.U256_TO_BINARY,
config=cc.U256ToBinaryConfig(),
)
Some output formats also don't support Decimal256, but all of them support Decimal128, so this can be useful if you are having trouble writing Decimal256 values to your output.