# Bifrost quickstart

This guide takes you from zero to writing and reading prediction data in under 5 minutes.
## Prerequisites

- `scouter-ml` installed (`pip install scouter-ml`)
- A running Scouter server with gRPC enabled (default port: 50051)
- For reading: `pyarrow` installed. Optional: `polars`, `pandas`.
## 1. Define your schema

Your schema is a Pydantic `BaseModel`. Every field becomes an Arrow column in the underlying Delta Lake table.

```python
from datetime import datetime
from typing import Optional

from pydantic import BaseModel


class PredictionRecord(BaseModel):
    user_id: str
    model_name: str
    prediction: float
    confidence: float
    feature_1: float
    feature_2: float
    label: Optional[str] = None
    predicted_at: datetime
```

## 2. Create a TableConfig

`TableConfig` converts your Pydantic model into an Arrow schema and computes a fingerprint for schema enforcement.
```python
from scouter.bifrost import TableConfig

table_config = TableConfig(
    model=PredictionRecord,            # (1)
    catalog="production",              # (2)
    schema_name="ml",
    table="credit_predictions",
    partition_columns=["model_name"],  # (3)
)

print(table_config.fqn)              # "production.ml.credit_predictions"
print(table_config.fingerprint_str)  # "a1b2c3d4..." (32-char hex)
```

1. Pass the class, not an instance.
2. `catalog.schema_name.table` forms the fully-qualified table name and determines the Delta Lake storage path.
3. Optional. These columns are used for server-side partitioning beyond the automatic `scouter_partition_date`.
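The fingerprint changes whenever field names or types change, which is what lets the server catch schema drift. Scouter's actual hashing scheme is internal; as a loose illustration of the idea only, a 32-character hex fingerprint over a field layout could be derived like this (`schema_fingerprint` is a made-up helper, not part of the library):

```python
import hashlib


def schema_fingerprint(fields: dict) -> str:
    """Hash a canonical rendering of (name, type) pairs into a 32-char hex digest.

    Illustrative only -- Scouter's real fingerprint algorithm is internal.
    Sorting makes the digest independent of field declaration order.
    """
    canonical = ";".join(f"{name}:{dtype}" for name, dtype in sorted(fields.items()))
    return hashlib.md5(canonical.encode()).hexdigest()


fp = schema_fingerprint({"user_id": "utf8", "prediction": "float64"})
print(len(fp))  # 32: any change to a name or type produces a different digest
```

The useful property is that two sides holding the same model class compute the same string, so a mismatch can be rejected before any data moves.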
## 3. Write data

```python
from scouter.bifrost import DatasetProducer, WriteConfig
from scouter import GrpcConfig

producer = DatasetProducer(
    table_config=table_config,
    transport=GrpcConfig(server_uri="localhost:50051"),
    write_config=WriteConfig(  # (1)
        batch_size=1000,
        scheduled_delay_secs=30,
    ),
)
```

1. `WriteConfig` is optional. Defaults: `batch_size=1000`, `scheduled_delay_secs=30`.
```python
from datetime import datetime, timezone

for i in range(5000):
    record = PredictionRecord(
        user_id=f"user_{i}",
        model_name="credit_v2",
        prediction=0.85,
        confidence=0.92,
        feature_1=1.23,
        feature_2=4.56,
        predicted_at=datetime.now(timezone.utc),
    )
    producer.insert(record)  # (1)
```

1. `insert()` calls `record.model_dump_json()` and sends the JSON string through a channel. It does not block and returns in under 1 microsecond.
Data is automatically batched and sent to the server when either condition is met:

- The internal queue reaches `batch_size` (1000 by default)
- `scheduled_delay_secs` (30s by default) have elapsed since the last publish
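The two triggers can be sketched with a toy buffer. This is illustrative only, not Scouter's actual writer (which runs the timer check in a background task):

```python
import time


class BatchSketch:
    """Flush when the buffer hits batch_size OR scheduled_delay_secs elapse."""

    def __init__(self, batch_size=1000, scheduled_delay_secs=30):
        self.batch_size = batch_size
        self.scheduled_delay_secs = scheduled_delay_secs
        self.buffer = []
        self.published = []  # each entry stands in for one gRPC publish
        self.last_publish = time.monotonic()

    def insert(self, record):
        self.buffer.append(record)
        if len(self.buffer) >= self.batch_size:  # size trigger
            self.flush()

    def tick(self):
        # Called periodically by a background task: time trigger.
        if self.buffer and time.monotonic() - self.last_publish >= self.scheduled_delay_secs:
            self.flush()

    def flush(self):
        if self.buffer:
            self.published.append(list(self.buffer))
            self.buffer.clear()
        self.last_publish = time.monotonic()


b = BatchSketch(batch_size=3, scheduled_delay_secs=30)
for i in range(7):
    b.insert(i)
# Two full batches published; one record still buffered awaiting the timer.
print([len(batch) for batch in b.published], len(b.buffer))  # [3, 3] 1
```

The practical consequence: a trickle of inserts below `batch_size` still reaches the server within roughly `scheduled_delay_secs`, while a burst is published immediately in full batches.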
## 4. Read data

Create a `DatasetClient` to query your data. The client is bound to a specific table via `TableConfig` and validates the schema fingerprint on construction.

```python
from scouter.bifrost import DatasetClient

client = DatasetClient(
    transport=GrpcConfig(server_uri="localhost:50051"),
    table_config=table_config,  # (1)
)
```

1. Reuse the same `TableConfig` from the write side. The client validates the fingerprint against the server on construction.
### Strict read — get Pydantic models back

```python
records = client.read()  # (1)

for record in records[:5]:
    print(f"{record.user_id}: {record.prediction:.2f} (confidence: {record.confidence:.2f})")
```

1. Returns a `list[PredictionRecord]`. Each row is validated through `PredictionRecord.model_validate()`.
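Conceptually, a strict read turns each row into a dict and validates it into your model, so malformed rows fail loudly instead of producing silently wrong values. A stripped-down, stdlib-only sketch of that per-row step (the real path uses Pydantic's `model_validate`; `validate_row` below is a made-up stand-in):

```python
from dataclasses import dataclass


@dataclass
class Record:
    user_id: str
    prediction: float


def validate_row(row: dict) -> Record:
    """Reject rows with missing or mistyped fields, loosely mimicking
    what Pydantic's model_validate does for each returned row."""
    if not isinstance(row.get("user_id"), str):
        raise ValueError(f"bad user_id: {row.get('user_id')!r}")
    if not isinstance(row.get("prediction"), float):
        raise ValueError(f"bad prediction: {row.get('prediction')!r}")
    return Record(user_id=row["user_id"], prediction=row["prediction"])


records = [
    validate_row(r)
    for r in [
        {"user_id": "user_0", "prediction": 0.85},
        {"user_id": "user_1", "prediction": 0.91},
    ]
]
print(records[0].user_id)  # user_0
```

The trade-off versus the SQL path below: strict reads cost a validation pass per row but hand you fully typed objects; DataFrame reads skip that and keep data columnar.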
### SQL query — get DataFrames

```python
# Get a QueryResult (Arrow IPC bytes wrapper)
result = client.sql("SELECT * FROM production.ml.credit_predictions WHERE confidence > 0.9")

# Convert to your preferred format
arrow_table = result.to_arrow()  # pyarrow.Table
polars_df = result.to_polars()   # polars.DataFrame
pandas_df = result.to_pandas()   # pandas.DataFrame
```

The SQL interface supports everything DataFusion supports — joins, CTEs, window functions, aggregations:

```python
# Aggregation
result = client.sql("""
    SELECT model_name, COUNT(*) as cnt, AVG(confidence) as avg_conf
    FROM production.ml.credit_predictions
    GROUP BY model_name
""")
df = result.to_polars()
print(df)
```

## 5. Shutdown

```python
producer.shutdown()  # (1)
```

1. Flushes any remaining data, cancels background tasks, and cleans up. Always call this on application exit.
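Because inserts are buffered, records still sitting in the queue are lost if the process exits without a final flush. One robust habit is to guarantee the shutdown call with `try`/`finally`. A runnable sketch using a stand-in class (`StubProducer` is made up here so the example is self-contained; in real code you would use your `DatasetProducer`):

```python
class StubProducer:
    """Stand-in for DatasetProducer: buffers inserts, flushes on shutdown."""

    def __init__(self):
        self.buffer = []
        self.sent = []

    def insert(self, record):
        self.buffer.append(record)

    def shutdown(self):
        # Flush whatever remains before exit.
        self.sent.extend(self.buffer)
        self.buffer.clear()


producer = StubProducer()
try:
    for i in range(3):
        producer.insert(i)  # application work
finally:
    producer.shutdown()  # runs even if the loop raises

print(len(producer.sent), len(producer.buffer))  # 3 0
```

In a web service, the same guarantee usually comes from the framework's shutdown hook instead, as in the FastAPI lifespan pattern below.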
## Unified Client

If you need both read and write access to the same table, use `Bifrost` instead of creating separate `DatasetProducer` and `DatasetClient` instances:

```python
from scouter.bifrost import Bifrost, TableConfig
from scouter import GrpcConfig

bifrost = Bifrost(
    table_config=TableConfig(
        model=PredictionRecord,
        catalog="production",
        schema_name="ml",
        table="credit_predictions",
    ),
    transport=GrpcConfig(server_uri="localhost:50051"),
)

# Write
bifrost.insert(record)
bifrost.flush()

# Read
records = bifrost.read()
result = bifrost.sql("SELECT * FROM production.ml.credit_predictions WHERE confidence > 0.9")

# Access underlying clients for the full API
bifrost.producer.register()
bifrost.client.list_datasets()

bifrost.shutdown()
```

## FastAPI Integration

The typical pattern for production use:
```python
from contextlib import asynccontextmanager

from fastapi import FastAPI, Request
from pydantic import BaseModel
from scouter.bifrost import DatasetProducer, TableConfig, WriteConfig
from scouter import GrpcConfig


class PredictionRecord(BaseModel):
    user_id: str
    prediction: float
    model_version: str


@asynccontextmanager
async def lifespan(app: FastAPI):
    app.state.producer = DatasetProducer(
        table_config=TableConfig(
            model=PredictionRecord,
            catalog="prod",
            schema_name="ml",
            table="predictions",
        ),
        transport=GrpcConfig(server_uri="scouter.internal:50051"),
    )
    yield
    app.state.producer.shutdown()


app = FastAPI(lifespan=lifespan)


class PredictRequest(BaseModel):
    user_id: str
    features: dict


@app.post("/predict")
def predict(request: Request, payload: PredictRequest):
    prediction = model.predict(payload.features)  # your model

    # Non-blocking -- returns immediately
    request.app.state.producer.insert(
        PredictionRecord(
            user_id=payload.user_id,
            prediction=prediction,
            model_version="v2.1",
        )
    )

    return {"prediction": prediction}
```

## Next Steps
- Writing Data — batching behavior, backpressure, shutdown patterns
- Reading Data — `DatasetClient`, `QueryResult`, SQL reference
- Schema Reference — type mapping, fingerprinting, `TableConfig` utilities