
Writing data

DatasetProducer is the write-side client for Bifrost. It maintains a persistent background queue that batches Pydantic model instances into Arrow RecordBatches and sends them to the server via gRPC.

```python
from scouter.bifrost import DatasetProducer, TableConfig, WriteConfig
from scouter import GrpcConfig

producer = DatasetProducer(
    table_config=TableConfig(
        model=MyModel,
        catalog="prod",
        schema_name="ml",
        table="predictions",
    ),
    transport=GrpcConfig(server_uri="scouter.internal:50051"),
    write_config=WriteConfig(batch_size=1000, scheduled_delay_secs=30),
)
```
| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| `table_config` | `TableConfig` | Yes | Schema, namespace, and fingerprint derived from your Pydantic model |
| `transport` | `GrpcConfig` | Yes | gRPC server connection settings |
| `write_config` | `WriteConfig` | No | Batching and flush timing. Defaults to `batch_size=1000`, `scheduled_delay_secs=30` |

WriteConfig controls when data is flushed to the server.

```python
from scouter.dataset import WriteConfig

# Flush every 500 records or every 10 seconds, whichever comes first
config = WriteConfig(batch_size=500, scheduled_delay_secs=10)
```
| Parameter | Default | Description |
| --- | --- | --- |
| `batch_size` | 1000 | Number of records that triggers an immediate flush. Minimum: 1. |
| `scheduled_delay_secs` | 30 | Maximum seconds between flushes, regardless of queue size |

Tuning guidance:

  • High-throughput inference (>1000 req/s): Use batch_size=5000, scheduled_delay_secs=10. Larger batches amortize gRPC overhead.
  • Low-throughput / latency-sensitive: Use batch_size=100, scheduled_delay_secs=5. Data reaches the server faster.
  • Batch jobs: Use batch_size=10000, scheduled_delay_secs=60. Maximize throughput.
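The guidance above follows from a simple observation: at a steady request rate, whichever of the two conditions is reached first drives the flush cadence. A small back-of-the-envelope helper (illustrative only, not part of the library) makes this concrete:

```python
def flush_trigger(req_per_sec: float, batch_size: int, delay_secs: float) -> str:
    """Return which flush condition fires first at a steady request rate."""
    time_to_fill = batch_size / req_per_sec  # seconds to accumulate a full batch
    return "batch_size" if time_to_fill <= delay_secs else "scheduled_delay"

# High-throughput inference: batches fill long before the timer fires
print(flush_trigger(1000, 5000, 10))  # batch_size
# Low-throughput service: the timer drives flushes instead
print(flush_trigger(5, 100, 5))       # scheduled_delay
```

If `scheduled_delay` is what fires in practice, lowering `scheduled_delay_secs` (not `batch_size`) is what reduces data latency.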
producer.insert(record)

insert() calls record.model_dump_json() to serialize the Pydantic model, then sends the JSON string through an unbounded Tokio channel. The call:

  • Does not block
  • Does not perform I/O
  • Does not acquire the GIL for anything beyond the initial model_dump_json() call
  • Returns in under 1 microsecond (after serialization)
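The fast path can be pictured with stdlib pieces standing in for the Rust internals. In this sketch, `json.dumps` stands in for `model_dump_json()` and `queue.SimpleQueue` stands in for the unbounded Tokio channel; the names are illustrative, not the library's API:

```python
import json
import queue

channel = queue.SimpleQueue()  # stands in for the unbounded Tokio channel

def insert(record: dict) -> None:
    # Serialize while holding the GIL (the only Python work on this path) ...
    payload = json.dumps(record)  # stands in for record.model_dump_json()
    # ... then hand off without blocking or performing any I/O.
    channel.put_nowait(payload)

insert({"score": 0.97, "label": "cat"})
print(channel.qsize())  # 1
```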

The JSON string enters a crossbeam::ArrayQueue (capacity = batch_size * 2). When the queue reaches batch_size, the event handler triggers an immediate publish cycle:

  1. Drain queue into a Vec<String>
  2. Build an Arrow RecordBatch via DynamicBatchBuilder
  3. Inject system columns (scouter_created_at, scouter_partition_date, scouter_batch_id)
  4. Serialize to Arrow IPC bytes
  5. Send via insert_batch gRPC call
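The five steps map roughly onto the following sketch. JSON bytes stand in for the Arrow IPC encoding, and the final gRPC send is elided; only the shape of the cycle is faithful:

```python
import json
import uuid
from datetime import datetime, timezone

def publish_cycle(queued: list[str]) -> bytes:
    # 1. Drain the queue (here: decode the accumulated JSON strings)
    rows = [json.loads(s) for s in queued]
    # 2-3. Build the batch and inject the system columns
    now = datetime.now(timezone.utc)
    batch_id = uuid.uuid4().hex
    for row in rows:
        row["scouter_created_at"] = now.isoformat()
        row["scouter_partition_date"] = now.date().isoformat()
        row["scouter_batch_id"] = batch_id
    # 4. Serialize (JSON bytes standing in for Arrow IPC bytes)
    return json.dumps(rows).encode()
    # 5. In the real client, the bytes go out via the insert_batch gRPC call

payload = publish_cycle(['{"score": 0.97}'])
```

Because the system columns are injected per cycle, every record in one RecordBatch shares the same `scouter_batch_id` and timestamp.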

If the internal queue is full (producer is inserting faster than the gRPC client can flush), insert() retries with exponential backoff:

| Retry | Delay |
| --- | --- |
| 1 | 100ms |
| 2 | 200ms |
| 3 | 400ms |

After 3 retries, insert() raises an error. This usually indicates one of the following:

  • batch_size is too small for your throughput
  • The server is unreachable or slow
  • Network latency is high
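The schedule doubles from a 100 ms base, so a caller hitting this error has already absorbed 700 ms of backpressure. A quick check of the arithmetic (plain Python, nothing library-specific):

```python
# Exponential backoff: 100 ms base, doubling on each retry
delays_ms = [100 * 2**attempt for attempt in range(3)]
print(delays_ms)  # [100, 200, 400]

total_ms = sum(delays_ms)
print(total_ms)   # 700 ms of backpressure before insert() gives up
```

Treat the error as a capacity signal: raise `batch_size`, slow the producer, or investigate the server, rather than retrying in a tight loop on top of the built-in backoff.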
producer.flush()

Sends a flush signal through the event channel. The event handler publishes whatever is in the queue, even if batch_size hasn't been reached. The call is non-blocking: it signals intent rather than waiting for completion.

Use flush() when you need data to reach the server before a specific point (e.g., end of a batch job, before a model swap).
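The signal-then-publish split can be sketched with a sentinel on an event queue. `FLUSH`, `pending`, and `handle_one_event` are illustrative stand-ins for the Rust event handler, not library names:

```python
import queue

FLUSH = object()  # sentinel standing in for the Flush event

events = queue.SimpleQueue()
pending = ['{"score": 0.97}']   # fewer than batch_size records queued
published: list[list[str]] = []

def flush() -> None:
    # Non-blocking: just enqueue the signal; the handler does the work later.
    events.put_nowait(FLUSH)

def handle_one_event() -> None:
    if events.get_nowait() is FLUSH and pending:
        published.append(pending.copy())  # publish even though batch_size wasn't reached
        pending.clear()

flush()
handle_one_event()
print(len(published))  # 1
```

Because flush() only enqueues the signal, a batch job that must not exit before data lands should pair flush() with shutdown(), which waits for the handler.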

producer.shutdown()

Graceful shutdown sequence:

  1. Sends a Flush event through the channel
  2. Waits 250ms for the event handler to process it
  3. Cancels the event handler task
  4. Waits 250ms for in-flight gRPC calls to complete
  5. Aborts the event handler
  6. Cancels and aborts the background flush task
  7. Drops the channel sender

After shutdown(), calling insert() or flush() raises AlreadyShutdown.
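The terminal-state guard can be sketched in a few lines. `ProducerSketch` is illustrative, and a plain `RuntimeError` stands in for the library's AlreadyShutdown error:

```python
class ProducerSketch:
    """Minimal sketch of the shutdown guard, not the real DatasetProducer."""

    def __init__(self) -> None:
        self._shutdown = False

    def insert(self, record: object) -> None:
        if self._shutdown:
            raise RuntimeError("AlreadyShutdown")
        # ... serialize and enqueue (elided) ...

    def shutdown(self) -> None:
        # flush, wait, cancel background tasks, drop the sender (elided)
        self._shutdown = True

p = ProducerSketch()
p.insert({"score": 0.97})
p.shutdown()
try:
    p.insert({"score": 0.5})
except RuntimeError as e:
    print(e)  # AlreadyShutdown
```

In long-lived services, calling shutdown() from a finally block or an atexit hook ensures the final partial batch is flushed before the process exits.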

status = producer.register()

Explicitly registers the dataset table with the server. This is optional — the producer auto-registers on the first flush. Explicit registration is useful for:

  • Startup validation: Verify the server is reachable and the schema is accepted
  • Schema conflict detection: If a table with the same name but different schema exists, registration fails immediately rather than on first flush

The server verifies the schema fingerprint. If the table already exists with the same fingerprint, registration succeeds. If the fingerprint differs, it returns an error.

| Property | Type | Description |
| --- | --- | --- |
| `fingerprint` | `str` | 32-character hex schema fingerprint |
| `namespace` | `str` | Fully-qualified name (`catalog.schema.table`) |
| `is_registered` | `bool` | Whether the dataset has been registered with the server |
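Server-side, registration reduces to a fingerprint comparison. A sketch of that decision, where the in-memory dict, the `register` helper, and the error message are all illustrative:

```python
existing: dict[str, str] = {}  # namespace -> fingerprint; stands in for the server catalog

def register(namespace: str, fingerprint: str) -> bool:
    """Return True on success; raise if the table exists with a different schema."""
    current = existing.get(namespace)
    if current is None:
        existing[namespace] = fingerprint  # first registration
        return True
    if current == fingerprint:
        return True                        # same fingerprint: idempotent success
    raise ValueError(f"schema fingerprint mismatch for {namespace}")

assert register("prod.ml.predictions", "a" * 32)
assert register("prod.ml.predictions", "a" * 32)  # re-registration succeeds
```

This is why explicit register() at startup is cheap insurance: re-registering an unchanged schema is idempotent, while a schema drift fails fast instead of surfacing on the first flush.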