Skip to content

SporeDB API Reference

Core

sporedb.SporeDB

SporeDB(data_root: str | Path = './sporedb_data', *, endpoint: str | None = None, api_key: str | None = None)

Primary entry point for SporeDB operations.

Composes storage, ingestion, analytics, export, and query layers behind a single high-level API so scientists never interact with internal store objects directly.

Parameters:

  • data_root (str | Path, default: './sporedb_data' ) –

    Path to local data directory. Defaults to "./sporedb_data".

  • endpoint (str | None, default: None ) –

    Cloud API endpoint URL. If provided, operates in cloud mode.

  • api_key (str | None, default: None ) –

    API key for cloud authentication. Required when endpoint is set.

Raises:

  • ValueError

    If endpoint is provided without api_key.

Example

>>> with SporeDB("./my_data") as db:
...     batch = db.create_batch("CHO-Run-001", strain="CHO-K1")
...     result = db.import_csv("telemetry.csv", "CHO-Run-001")
...     df = db.get_telemetry(result.batch_id)

Source code in src/sporedb/client.py
def __init__(
    self,
    data_root: str | Path = "./sporedb_data",
    *,
    endpoint: str | None = None,
    api_key: str | None = None,
) -> None:
    """Initialize a SporeDB client in local or cloud mode.

    Args:
        data_root: Path to the local data directory. Defaults to
            ``"./sporedb_data"``. Ignored when ``endpoint`` is given.
        endpoint: Cloud API endpoint URL. If provided, the instance
            operates in cloud mode and delegates to a ``CloudClient``.
        api_key: API key for cloud authentication. Required when
            ``endpoint`` is set.

    Raises:
        ValueError: If ``endpoint`` is provided without ``api_key``.
    """
    self._engine: StorageEngine | None
    self._batches: BatchStore | None
    self._timeseries: TimeSeriesStore | None
    if endpoint is not None:
        # Validate configuration *before* the lazy import so that a
        # missing httpx dependency cannot mask the real configuration
        # error (previously the import ran first and an ImportError
        # would shadow this ValueError).
        if api_key is None:
            raise ValueError(
                "api_key is required when using cloud mode (endpoint=...)"
            )
        # Cloud mode -- lazy import to avoid httpx dependency when local-only
        from sporedb.cloud_client import CloudClient

        self._cloud: CloudClient | None = CloudClient(endpoint, api_key)
        self._engine = None
        self._batches = None
        self._timeseries = None
    else:
        self._cloud = None
        self._engine = StorageEngine(data_root)
        self._batches = BatchStore(self._engine)
        self._timeseries = TimeSeriesStore(self._engine)

is_cloud property

is_cloud: bool

Return True if this instance delegates to the cloud tier.

align

align(batch_ids: list[UUID], signal: str = 'OD600') -> DataFrame

Align multiple batch runs by phase boundary for comparison.

Detects phases for each batch, then aligns them by elapsed time from the exponential phase boundary.

Parameters:

  • batch_ids (list[UUID]) –

    List of batch UUIDs to align.

  • signal (str, default: 'OD600' ) –

    Telemetry variable used for phase detection and alignment. Defaults to "OD600".

Returns:

  • DataFrame –

    A pandas.DataFrame with aligned time-series data indexed
    by elapsed hours from the exponential phase boundary.

Source code in src/sporedb/client.py
def align(
    self,
    batch_ids: list[UUID],
    signal: str = "OD600",
) -> pd.DataFrame:
    """Align multiple batch runs by phase boundary for comparison.

    For every batch, phases are detected on *signal*; the runs are then
    anchored on the exponential-phase boundary so their elapsed times
    are directly comparable.

    Args:
        batch_ids: List of batch UUIDs to align.
        signal: Telemetry variable used for phase detection and
            alignment. Defaults to ``"OD600"``.

    Returns:
        A :class:`pandas.DataFrame` with aligned time-series data indexed
        by elapsed hours from the exponential phase boundary.
    """
    if self._cloud is not None:
        return self._cloud.align(batch_ids, signal=signal)
    from sporedb.analytics.alignment import align as _align
    from sporedb.analytics.models import PhaseType

    telemetry_by_batch: dict[str, pd.DataFrame] = {}
    phases_by_batch: dict[str, list[Any]] = {}
    for batch_id in batch_ids:
        key = str(batch_id)
        telemetry_by_batch[key] = self.get_telemetry(batch_id)
        phases_by_batch[key] = self.detect_phases(batch_id, signal=signal)

    return _align(
        telemetry_by_batch,
        phases_by_batch,
        anchor_phase=PhaseType.EXPONENTIAL,
        variables=[signal],
    )

close

close() -> None

Close the underlying storage engine or cloud client.

Source code in src/sporedb/client.py
def close(self) -> None:
    """Release resources held by the active backend (cloud or local)."""
    remote = self._cloud
    if remote is not None:
        remote.close()
        return
    if self._engine is not None:
        self._engine.close()

compute_metrics

compute_metrics(batch_id: UUID) -> list[BatchMetrics]

Compute derived bioprocess metrics for a batch.

Runs phase detection first, then calculates kinetic parameters (growth rate, productivity, yields) for each detected phase.

Parameters:

  • batch_id (UUID) –

    UUID of the batch to analyze.

Returns:

  • list[BatchMetrics] –

    A list of BatchMetrics, one per detected phase.

Source code in src/sporedb/client.py
def compute_metrics(
    self,
    batch_id: UUID,
) -> list[BatchMetrics]:
    """Compute derived bioprocess metrics for a batch.

    Runs phase detection first, then calculates kinetic parameters
    (growth rate, productivity, yields) for each detected phase.

    Args:
        batch_id: UUID of the batch to analyze.

    Returns:
        A list of :class:`BatchMetrics`, one per detected phase.
    """
    # NOTE(review): unlike the sibling analytics methods there is no
    # explicit ``self._cloud`` branch here; cloud handling happens only
    # inside the two delegating calls below, while the metric
    # computation itself always runs locally. Confirm this is intended.
    from sporedb.analytics.metrics import compute_batch_metrics

    df = self.get_telemetry(batch_id)
    phases = self.detect_phases(batch_id)
    return compute_batch_metrics(df, phases, batch_id)

create_batch

create_batch(name: str, *, strain: str | None = None, media: str | None = None, scale_liters: float | None = None, operator: str | None = None, tags: list[str] | None = None, inoculation: datetime | None = None) -> Batch

Create a new batch.

Constructs a Batch model internally from keyword arguments and persists it via the batch store.

Parameters:

  • name (str) –

    Human-readable batch identifier (e.g. "CHO-Run-001").

  • strain (str | None, default: None ) –

    Organism strain name.

  • media (str | None, default: None ) –

    Growth media description.

  • scale_liters (float | None, default: None ) –

    Bioreactor working volume in liters.

  • operator (str | None, default: None ) –

    Name of the operator running the batch.

  • tags (list[str] | None, default: None ) –

    Optional list of free-form tags for categorization.

  • inoculation (datetime | None, default: None ) –

    Inoculation timestamp (timezone-aware).

Returns:

  • Batch –

    The newly created Batch with a generated batch_id.

Example

batch = db.create_batch( ... "CHO-Run-001", strain="CHO-K1", scale_liters=5.0 ... )

Source code in src/sporedb/client.py
def create_batch(
    self,
    name: str,
    *,
    strain: str | None = None,
    media: str | None = None,
    scale_liters: float | None = None,
    operator: str | None = None,
    tags: list[str] | None = None,
    inoculation: datetime | None = None,
) -> Batch:
    """Create and persist a new batch record.

    Builds a ``Batch`` model from the keyword arguments and stores it
    through the batch store (or the cloud client in cloud mode).

    Args:
        name: Human-readable batch identifier (e.g. ``"CHO-Run-001"``).
        strain: Organism strain name.
        media: Growth media description.
        scale_liters: Bioreactor working volume in liters.
        operator: Name of the operator running the batch.
        tags: Optional list of free-form tags for categorization.
        inoculation: Inoculation timestamp (timezone-aware).

    Returns:
        The newly created :class:`Batch` with a generated ``batch_id``.

    Example:
        >>> batch = db.create_batch(
        ...     "CHO-Run-001", strain="CHO-K1", scale_liters=5.0
        ... )
    """
    if self._cloud is not None:
        return self._cloud.create_batch(
            name,
            strain=strain,
            media=media,
            scale_liters=scale_liters,
            operator=operator,
            tags=tags,
            inoculation=inoculation,
        )
    batch_metadata = BatchMetadata(
        strain=strain,
        media=media,
        scale_liters=scale_liters,
        operator=operator,
    )
    new_batch = Batch(
        name=name,
        lifecycle=BatchLifecycle.INOCULATED,
        timestamps=CanonicalTimestamps(inoculation=inoculation),
        metadata=batch_metadata,
        tags=tags or [],
    )
    assert self._batches is not None
    return self._batches.create_batch(new_batch)

create_golden_profile

create_golden_profile(batch_ids: list[UUID], variables: list[str], signal: str = 'OD600', metadata: dict[str, Any] | None = None) -> GoldenBatchProfile

Create a golden batch reference profile from aligned runs.

Aligns the given batches and computes mean/std trajectories across the specified variables.

Parameters:

  • batch_ids (list[UUID]) –

    UUIDs of the reference batches to include.

  • variables (list[str]) –

    Telemetry variable names to include in the profile.

  • signal (str, default: 'OD600' ) –

    Variable used for phase-based alignment. Defaults to "OD600".

  • metadata (dict[str, Any] | None, default: None ) –

    Optional metadata dict stored with the profile.

Returns:

  • GoldenBatchProfile –

    A GoldenBatchProfile with mean and standard deviation trajectories.

Raises:

  • NotImplementedError

    If called in cloud mode.

Source code in src/sporedb/client.py
def create_golden_profile(
    self,
    batch_ids: list[UUID],
    variables: list[str],
    signal: str = "OD600",
    metadata: dict[str, Any] | None = None,
) -> GoldenBatchProfile:
    """Create a golden batch reference profile from aligned runs.

    Phase-aligns the given batches on *signal*, then computes mean and
    standard-deviation trajectories for *variables*.

    Args:
        batch_ids: UUIDs of the reference batches to include.
        variables: Telemetry variable names to include in the profile.
        signal: Variable used for phase-based alignment. Defaults to
            ``"OD600"``.
        metadata: Optional metadata dict stored with the profile.

    Returns:
        A :class:`GoldenBatchProfile` with mean and standard deviation
        trajectories.

    Raises:
        NotImplementedError: If called in cloud mode.
    """
    if self._cloud is not None:
        raise NotImplementedError(
            "Golden batch profiling not yet supported in cloud mode"
        )
    from sporedb.analytics.golden_batch import (
        create_golden_profile as _create_golden_profile,
    )

    aligned = self.align(batch_ids, signal=signal)
    reference_names = [str(batch_id) for batch_id in batch_ids]
    return _create_golden_profile(
        aligned, reference_names, variables, metadata=metadata
    )

delete_batch

delete_batch(batch_id: UUID) -> bool

Delete a batch.

Parameters:

  • batch_id (UUID) –

    UUID of the batch to delete.

Returns:

  • bool

    True if the batch existed and was deleted, False otherwise.

Source code in src/sporedb/client.py
def delete_batch(self, batch_id: UUID) -> bool:
    """Delete a batch.

    Args:
        batch_id: UUID of the batch to delete.

    Returns:
        ``True`` if the batch existed and was deleted, ``False`` otherwise.
    """
    remote = self._cloud
    if remote is not None:
        return remote.delete_batch(batch_id)
    store = self._batches
    assert store is not None
    return store.delete_batch(batch_id)

detect_phases

detect_phases(batch_id: UUID, signal: str = 'OD600', min_size: int = 10) -> list[PhaseAnnotation]

Run PELT changepoint detection on a batch's telemetry.

Uses the ruptures library with an RBF kernel cost function to identify changepoints in the specified signal and classify the resulting segments as growth phases.

Parameters:

  • batch_id (UUID) –

    UUID of the batch to analyze.

  • signal (str, default: 'OD600' ) –

    Telemetry variable to analyze. Defaults to "OD600".

  • min_size (int, default: 10 ) –

    Minimum segment length for PELT. Defaults to 10.

Returns:

  • list[PhaseAnnotation] –

    A list of PhaseAnnotation objects describing the detected growth
    phases (lag, exponential, stationary, decline).

Example

phases = db.detect_phases(batch_id) for p in phases: ... print(f"{p.phase_type.value}: {p.start_ts} - {p.end_ts}")

Source code in src/sporedb/client.py
def detect_phases(
    self,
    batch_id: UUID,
    signal: str = "OD600",
    min_size: int = 10,
) -> list[PhaseAnnotation]:
    """Run PELT changepoint detection on a batch's telemetry.

    Uses the ``ruptures`` library with an RBF kernel cost function to
    identify changepoints in the specified signal and classify the
    resulting segments as growth phases. Detected phases are persisted
    via the phase store.

    Args:
        batch_id: UUID of the batch to analyze.
        signal: Telemetry variable to analyze. Defaults to ``"OD600"``.
        min_size: Minimum segment length for PELT. Defaults to ``10``.

    Returns:
        A list of :class:`PhaseAnnotation` objects describing the
        detected growth phases (lag, exponential, stationary, decline).

    Example:
        >>> phases = db.detect_phases(batch_id)
        >>> for p in phases:
        ...     print(f"{p.phase_type.value}: {p.start_ts} - {p.end_ts}")
    """
    if self._cloud is not None:
        return self._cloud.detect_phases(batch_id, signal=signal, min_size=min_size)
    from sporedb.analytics.detector import PhaseDetector
    from sporedb.analytics.models import DetectionConfig
    from sporedb.analytics.phase_store import PhaseStore

    config = DetectionConfig(signal_variable=signal, min_size=min_size)
    telemetry = self.get_telemetry(batch_id)
    detected = PhaseDetector(config).detect(telemetry, batch_id)
    # Persist detected phases via PhaseStore so later calls can reuse them.
    assert self._engine is not None
    PhaseStore(self._engine).save_phases(batch_id, detected)
    return detected

detect_phases_online

detect_phases_online(batch_id: UUID, signal: str = 'OD600', *, hazard_rate: float = 1 / 250, threshold: float = 0.5) -> list[PhaseAnnotation]

Run Bayesian Online Changepoint Detection on a batch.

Uses BOCPD (Adams & MacKay 2007) for real-time / streaming-style phase detection. Results are persisted via PhaseStore.

Parameters:

  • batch_id (UUID) –

    UUID of the batch to analyze.

  • signal (str, default: 'OD600' ) –

    Telemetry variable to analyze. Defaults to "OD600".

  • hazard_rate (float, default: 1 / 250 ) –

    Prior probability of a changepoint at each step. Defaults to 1/250.

  • threshold (float, default: 0.5 ) –

    Posterior probability threshold for declaring a changepoint. Defaults to 0.5.

Returns:

  • list[PhaseAnnotation] –

    A list of PhaseAnnotation objects.

Raises:

  • NotImplementedError

    If called in cloud mode.

Source code in src/sporedb/client.py
def detect_phases_online(
    self,
    batch_id: UUID,
    signal: str = "OD600",
    *,
    hazard_rate: float = 1 / 250,
    threshold: float = 0.5,
) -> list[PhaseAnnotation]:
    """Run Bayesian Online Changepoint Detection on a batch.

    Uses BOCPD (Adams & MacKay 2007) for real-time / streaming-style
    phase detection. Results are persisted via the phase store.

    Args:
        batch_id: UUID of the batch to analyze.
        signal: Telemetry variable to analyze. Defaults to ``"OD600"``.
        hazard_rate: Prior probability of a changepoint at each step.
            Defaults to ``1/250``.
        threshold: Posterior probability threshold for declaring a
            changepoint. Defaults to ``0.5``.

    Returns:
        A list of :class:`PhaseAnnotation` objects.

    Raises:
        NotImplementedError: If called in cloud mode.
    """
    if self._cloud is not None:
        raise NotImplementedError(
            "Online phase detection not yet supported in cloud mode"
        )
    from sporedb.analytics.bocpd import BOCPDDetector
    from sporedb.analytics.models import BOCPDConfig
    from sporedb.analytics.phase_store import PhaseStore

    detector = BOCPDDetector(
        BOCPDConfig(
            signal_variable=signal,
            hazard_rate=hazard_rate,
            threshold=threshold,
        )
    )
    telemetry = self.get_telemetry(batch_id)
    results = detector.detect_batch(telemetry, batch_id)
    # Persist so subsequent analytics can reuse the annotations.
    assert self._engine is not None
    PhaseStore(self._engine).save_phases(batch_id, results)
    return results

export

export(batch_id: UUID, format: str = 'csv', output_path: str | Path | None = None) -> bytes | None

Export batch data in the specified format.

Parameters:

  • batch_id (UUID) –

    Batch to export.

  • format (str, default: 'csv' ) –

    "csv", "parquet", or "arrow".

  • output_path (str | Path | None, default: None ) –

    If given, write to file and return None.

Returns:

  • bytes | None

    Serialized bytes, or None when output_path is provided.

Source code in src/sporedb/client.py
def export(
    self,
    batch_id: UUID,
    format: str = "csv",
    output_path: str | Path | None = None,
) -> bytes | None:
    """Export batch data in the specified format.

    Args:
        batch_id: Batch to export.
        format: ``"csv"``, ``"parquet"``, or ``"arrow"``.
        output_path: If given, write to file and return ``None``.

    Returns:
        Serialized bytes, or ``None`` when *output_path* is provided.

    Raises:
        NotImplementedError: If called in cloud mode.
    """
    if self._cloud is not None:
        raise NotImplementedError("Export not yet supported in cloud mode")
    from sporedb.export import export_batch

    assert self._engine is not None
    return export_batch(
        batch_id,
        self._engine,
        format=format,
        output_path=Path(output_path) if output_path else None,
    )

get_assay

get_assay(batch_id: UUID) -> DataFrame

Return assay measurements for a batch as a pandas DataFrame.

Parameters:

  • batch_id (UUID) –

    UUID of the batch to retrieve assay data for.

Returns:

  • DataFrame –

    A pandas.DataFrame with assay measurement rows.

Source code in src/sporedb/client.py
def get_assay(self, batch_id: UUID) -> pd.DataFrame:
    """Return assay measurements for a batch as a pandas DataFrame.

    Args:
        batch_id: UUID of the batch to retrieve assay data for.

    Returns:
        A :class:`pandas.DataFrame` with assay measurement rows.
    """
    remote = self._cloud
    if remote is not None:
        return remote.get_assay(batch_id)
    store = self._timeseries
    assert store is not None
    return store.get_assay(batch_id)

get_batch

get_batch(batch_id: UUID) -> Batch | None

Retrieve a batch by its ID, or None if not found.

Parameters:

  • batch_id (UUID) –

    UUID of the batch to retrieve.

Returns:

  • Batch | None –

    The Batch if found, otherwise None.

Source code in src/sporedb/client.py
def get_batch(self, batch_id: UUID) -> Batch | None:
    """Retrieve a batch by its ID, or ``None`` if not found.

    Args:
        batch_id: UUID of the batch to retrieve.

    Returns:
        The :class:`Batch` if found, otherwise ``None``.
    """
    remote = self._cloud
    if remote is not None:
        return remote.get_batch(batch_id)
    store = self._batches
    assert store is not None
    return store.get_batch(batch_id)

get_telemetry

get_telemetry(batch_id: UUID) -> DataFrame

Return telemetry data for a batch as a pandas DataFrame.

Parameters:

  • batch_id (UUID) –

    UUID of the batch to retrieve telemetry for.

Returns:

  • DataFrame –

    A pandas.DataFrame with columns ts, variable, value, and unit.

Source code in src/sporedb/client.py
def get_telemetry(self, batch_id: UUID) -> pd.DataFrame:
    """Return telemetry data for a batch as a pandas DataFrame.

    Args:
        batch_id: UUID of the batch to retrieve telemetry for.

    Returns:
        A :class:`pandas.DataFrame` with columns ``ts``, ``variable``,
        ``value``, and ``unit``.
    """
    remote = self._cloud
    if remote is not None:
        return remote.get_telemetry(batch_id)
    store = self._timeseries
    assert store is not None
    return store.get_telemetry(batch_id)

get_unified_view

get_unified_view(batch_id: UUID) -> DataFrame

Return combined telemetry + assay data for a batch.

Parameters:

  • batch_id (UUID) –

    UUID of the batch.

Returns:

  • DataFrame –

    A pandas.DataFrame with telemetry and assay data merged
    via an ASOF JOIN on timestamp.

Raises:

  • NotImplementedError

    If called in cloud mode.

Source code in src/sporedb/client.py
def get_unified_view(self, batch_id: UUID) -> pd.DataFrame:
    """Return combined telemetry + assay data for a batch.

    Args:
        batch_id: UUID of the batch.

    Returns:
        A :class:`pandas.DataFrame` with telemetry and assay data merged
        via an ASOF JOIN on timestamp.

    Raises:
        NotImplementedError: If called in cloud mode.
    """
    if self._cloud is not None:
        raise NotImplementedError("Unified view not yet supported in cloud mode")
    store = self._timeseries
    assert store is not None
    return store.get_unified_view(batch_id)

import_csv

import_csv(file_path: str | Path, batch_name: str, inoculation_ts: datetime | None = None) -> ImportResult

Import a CSV file into SporeDB.

Creates a batch automatically and returns an :class:ImportResult.

Parameters:

  • file_path (str | Path) –

    Path to the CSV file on disk.

  • batch_name (str) –

    Human-readable name for the new batch.

  • inoculation_ts (datetime | None, default: None ) –

    Optional inoculation timestamp (timezone-aware).

Returns:

  • ImportResult –

    An ImportResult with row count, column mappings, and timing.

Raises:

  • NotImplementedError

    If called in cloud mode.

Example

result = db.import_csv("telemetry.csv", "CHO-Run-001") print(f"Imported {result.rows_imported} rows")

Source code in src/sporedb/client.py
def import_csv(
    self,
    file_path: str | Path,
    batch_name: str,
    inoculation_ts: datetime | None = None,
) -> ImportResult:
    """Import a CSV file into SporeDB.

    A batch named *batch_name* is created automatically and the file's
    rows are ingested into it.

    Args:
        file_path: Path to the CSV file on disk.
        batch_name: Human-readable name for the new batch.
        inoculation_ts: Optional inoculation timestamp (timezone-aware).

    Returns:
        An :class:`ImportResult` with row count, column mappings, and timing.

    Raises:
        NotImplementedError: If called in cloud mode.

    Example:
        >>> result = db.import_csv("telemetry.csv", "CHO-Run-001")
        >>> print(f"Imported {result.rows_imported} rows")
    """
    if self._cloud is not None:
        raise NotImplementedError("CSV import not yet supported in cloud mode")
    from sporedb.ingestion.csv_reader import import_csv as _import_csv

    engine = self._engine
    assert engine is not None
    return _import_csv(
        Path(file_path),
        batch_name,
        engine,
        inoculation_ts=inoculation_ts,
    )

import_excel

import_excel(file_path: str | Path, batch_name: str, inoculation_ts: datetime | None = None) -> ImportResult | list[ImportResult]

Import an Excel file into SporeDB.

Creates a batch automatically and returns an :class:ImportResult (or a list when multiple sheets are present).

Parameters:

  • file_path (str | Path) –

    Path to the Excel file on disk.

  • batch_name (str) –

    Human-readable name for the new batch.

  • inoculation_ts (datetime | None, default: None ) –

    Optional inoculation timestamp (timezone-aware).

Returns:

  • ImportResult | list[ImportResult] –

    An ImportResult for single-sheet files, or a list of ImportResult
    when the workbook contains multiple sheets.

Raises:

  • NotImplementedError

    If called in cloud mode.

Source code in src/sporedb/client.py
def import_excel(
    self,
    file_path: str | Path,
    batch_name: str,
    inoculation_ts: datetime | None = None,
) -> ImportResult | list[ImportResult]:
    """Import an Excel file into SporeDB.

    A batch named *batch_name* is created automatically; multi-sheet
    workbooks produce one result per sheet.

    Args:
        file_path: Path to the Excel file on disk.
        batch_name: Human-readable name for the new batch.
        inoculation_ts: Optional inoculation timestamp (timezone-aware).

    Returns:
        An :class:`ImportResult` for single-sheet files, or a list of
        :class:`ImportResult` when the workbook contains multiple sheets.

    Raises:
        NotImplementedError: If called in cloud mode.
    """
    if self._cloud is not None:
        raise NotImplementedError("Excel import not yet supported in cloud mode")
    from sporedb.ingestion.excel_reader import import_excel as _import_excel

    engine = self._engine
    assert engine is not None
    return _import_excel(
        Path(file_path),
        batch_name,
        engine,
        inoculation_ts=inoculation_ts,
    )

list_batches

list_batches() -> list[Batch]

Return all batches.

Returns:

  • list[Batch]

    A list of all :class:Batch records in the store.

Source code in src/sporedb/client.py
def list_batches(self) -> list[Batch]:
    """Return all batches.

    Returns:
        A list of all :class:`Batch` records in the store.
    """
    remote = self._cloud
    if remote is not None:
        return remote.list_batches()
    store = self._batches
    assert store is not None
    return store.list_batches()

predict_pat

predict_pat(batch_id: UUID, sensor: SoftSensor) -> DataFrame

Run a PAT soft-sensor and return predictions merged with telemetry.

Retrieves telemetry, extracts the sensor's input variables, calls sensor.predict(), and returns the original telemetry DataFrame with predicted rows appended.

Parameters:

  • batch_id (UUID) –

    UUID of the batch to predict on.

  • sensor (SoftSensor) –

    A :class:SoftSensor model instance.

Returns:

  • DataFrame –

    A pandas.DataFrame combining original telemetry with
    predicted values.

Raises:

  • NotImplementedError

    If called in cloud mode.

Source code in src/sporedb/client.py
def predict_pat(
    self,
    batch_id: UUID,
    sensor: SoftSensor,
) -> pd.DataFrame:
    """Run a PAT soft-sensor and return predictions merged with telemetry.

    Retrieves telemetry, applies the soft sensor to it via
    ``apply_soft_sensor``, and appends the predicted rows to the
    original telemetry DataFrame.

    Args:
        batch_id: UUID of the batch to predict on.
        sensor: A :class:`SoftSensor` model instance.

    Returns:
        A :class:`pandas.DataFrame` combining original telemetry with
        predicted values.

    Raises:
        NotImplementedError: If called in cloud mode.
    """
    if self._cloud is not None:
        raise NotImplementedError("PAT prediction not yet supported in cloud mode")
    from sporedb.analytics.pat import apply_soft_sensor

    telemetry = self.get_telemetry(batch_id)
    predicted = apply_soft_sensor(sensor, telemetry)
    return pd.concat([telemetry, predicted], ignore_index=True)

query

query(dsl_query: str) -> DataFrame

Execute a bioprocess DSL query and return results as a DataFrame.

The query string is parsed via the Lark-based grammar, compiled to parameterized DuckDB SQL, and executed against the storage engine.

Parameters:

  • dsl_query (str) –

    A SporeDB DSL query string (PromQL-style syntax).

Returns:

  • DataFrame –

    A pandas.DataFrame with the query results.

Example

df = db.query("SELECT OD600 FROM batch WHERE name = 'CHO-Run-001'")

Source code in src/sporedb/client.py
def query(self, dsl_query: str) -> pd.DataFrame:
    """Execute a bioprocess DSL query and return results as a DataFrame.

    The query string is parsed via the Lark-based grammar, compiled to
    parameterized DuckDB SQL, and executed against the storage engine.

    Args:
        dsl_query: A SporeDB DSL query string (PromQL-style syntax).

    Returns:
        A :class:`pandas.DataFrame` with the query results.

    Example:
        >>> df = db.query("SELECT OD600 FROM batch WHERE name = 'CHO-Run-001'")
    """
    if self._cloud is not None:
        return self._cloud.query(dsl_query)
    from sporedb.query import DuckDBCompiler, parse_query

    parsed = parse_query(dsl_query)
    sql, params = DuckDBCompiler().compile(parsed)
    engine = self._engine
    assert engine is not None
    return engine.con.execute(sql, params).fetchdf()

score_batch

score_batch(profile: GoldenBatchProfile, batch_id: UUID) -> BatchScore

Score a batch against a golden batch profile.

Parameters:

  • profile (GoldenBatchProfile) –

    The :class:GoldenBatchProfile to compare against.

  • batch_id (UUID) –

    UUID of the batch to score.

Returns:

  • BatchScore –

    A BatchScore with a 0–100 similarity score derived from
    Dynamic Time Warping distance.

Raises:

  • NotImplementedError

    If called in cloud mode.

Source code in src/sporedb/client.py
def score_batch(
    self,
    profile: GoldenBatchProfile,
    batch_id: UUID,
) -> BatchScore:
    """Score a batch against a golden batch profile.

    Args:
        profile: The :class:`GoldenBatchProfile` to compare against.
        batch_id: UUID of the batch to score.

    Returns:
        A :class:`BatchScore` with a 0--100 similarity score
        derived from Dynamic Time Warping distance.

    Raises:
        NotImplementedError: If called in cloud mode.
    """
    if self._cloud is not None:
        raise NotImplementedError("Batch scoring not yet supported in cloud mode")
    from sporedb.analytics.golden_batch import (
        extract_batch_trajectory,
        score_against_profile,
    )

    telemetry = self.get_telemetry(batch_id)
    observed = extract_batch_trajectory(telemetry, profile.variables)
    return score_against_profile(profile, observed, batch_id)

Data Models

sporedb.Batch

Bases: BaseModel

A fermentation batch record.

Represents a single bioreactor run with its metadata, lifecycle state, canonical timestamps, and tags.

Attributes:

  • batch_id (UUID) –

    UUIDv7 identifier (auto-generated if not provided).

  • name (str) –

    Human-readable batch name (e.g. "CHO-Run-001").

  • lifecycle (BatchLifecycle) –

    Current lifecycle state. Defaults to PLANNED.

  • timestamps (CanonicalTimestamps) –

    Canonical timestamps for key events.

  • metadata (BatchMetadata) –

    Strain, media, scale, and operator metadata.

  • tags (list[str]) –

    Free-form tags for categorization and filtering.

  • created_at (datetime) –

    Creation timestamp (auto-set to current UTC time).

  • updated_at (datetime) –

    Last-modified timestamp (auto-set to current UTC time).

Example

from sporedb.models.batch import Batch batch = Batch(name="CHO-Run-001") print(batch.batch_id)

sporedb.BatchMetadata

Bases: BaseModel

Metadata describing the conditions of a fermentation batch.

Attributes:

  • strain (str | None) –

    Organism strain name (e.g. "CHO-K1", "E. coli BL21").

  • media (str | None) –

    Growth media description (e.g. "DMEM + 10% FBS").

  • scale_liters (float | None) –

    Bioreactor working volume in liters.

  • operator (str | None) –

    Name of the operator running the batch.

  • extra (dict[str, str | int | float | bool]) –

    Additional key-value metadata. Values must be scalar types.

sporedb.BatchLifecycle

Bases: StrEnum

Lifecycle states for a fermentation batch.

A batch progresses through these states from planning to completion: PLANNED -> INOCULATED -> RUNNING -> HARVESTED (or ABORTED).

sporedb.CanonicalTimestamps

Bases: BaseModel

Key timestamps in a fermentation batch lifecycle.

Attributes:

  • inoculation (datetime | None) –

    When the bioreactor was inoculated.

  • feed_start (datetime | None) –

    When feed addition began (fed-batch).

  • induction (datetime | None) –

    When gene expression was induced.

  • harvest (datetime | None) –

    When the batch was harvested.

sporedb.ImportResult

Bases: BaseModel

Result of a data import operation.

Returned by :meth:~sporedb.SporeDB.import_csv and :meth:~sporedb.SporeDB.import_excel to report import statistics.

Attributes:

  • batch_id (UUID) –

    UUID of the batch that was created or updated.

  • rows_imported (int) –

    Total number of rows successfully imported.

  • columns_mapped (dict[str, str]) –

    Mapping of source column names to SporeDB variable names.

  • units_converted (dict[str, tuple[str, str]]) –

    Mapping of variable names to (source_unit, target_unit) tuples where unit conversion was applied.

  • warnings (list[str]) –

    List of warning messages generated during import.

  • elapsed_seconds (float) –

    Wall-clock time for the import operation.

Example

result = db.import_csv("telemetry.csv", "CHO-Run-001") print( ... f"Imported {result.rows_imported} rows in {result.elapsed_seconds:.2f}s" ... )

sporedb.TelemetryRecord

Bases: BaseModel

A single telemetry data point from a bioreactor sensor.

Represents one time-stamped measurement from an online sensor (e.g. dissolved oxygen, pH, temperature, optical density).

Attributes:

  • batch_id (UUID) –

    UUID of the batch this record belongs to.

  • ts (datetime) –

    Measurement timestamp (must be timezone-aware).

  • variable (str) –

    Sensor variable name (e.g. "OD600", "dissolved_oxygen").

  • value (float) –

    Measured value.

  • unit (str | None) –

    Unit of measurement (e.g. "%", "deg_C").

Raises:

  • ValueError

    If ts is not timezone-aware.

Phase Detection

sporedb.DetectionConfig

Bases: BaseModel

Configuration for changepoint detection algorithms.

Controls PELT algorithm parameters used by :class:~sporedb.SporeDB.detect_phases.

Attributes:

  • signal_variable (str) –

    Telemetry variable to analyze. Defaults to "OD600".

  • kernel (str) –

    Cost function kernel for ruptures. Defaults to "rbf".

  • min_size (int) –

    Minimum segment length. Defaults to 10.

  • penalty (float | None) –

    Penalty value for PELT. Auto-calibrated if None.

  • smoothing_window (int) –

    Rolling average window applied before detection. Defaults to 5.

Example

from sporedb.analytics.models import DetectionConfig
config = DetectionConfig(signal_variable="pH", min_size=20)

sporedb.PhaseAnnotation

Bases: BaseModel

A detected or manually annotated phase boundary in a batch run.

Attributes:

  • annotation_id (UUID) –

    UUIDv7 identifier (auto-generated).

  • batch_id (UUID) –

    UUID of the batch this annotation belongs to.

  • phase_type (PhaseType) –

    The :class:PhaseType of this segment.

  • start_ts (datetime) –

    Start timestamp of the phase (must be timezone-aware).

  • end_ts (datetime) –

    End timestamp of the phase (must be timezone-aware).

  • signal_variable (str) –

    The telemetry variable that was analyzed.

  • confidence (float) –

    Detection confidence score (0.0 to 1.0). Defaults to 0.0.

  • metadata (dict[str, object]) –

    Additional metadata (e.g. algorithm parameters).

Raises:

  • ValueError

    If start_ts or end_ts is not timezone-aware.

sporedb.PhaseType

Bases: StrEnum

Growth phases in a bioprocess batch.

Each phase corresponds to a distinct segment of the growth curve:

  • LAG: Initial adaptation period after inoculation.
  • EXPONENTIAL: Rapid cell growth at maximum specific growth rate.
  • STATIONARY: Growth rate equals death rate; nutrient limitation.
  • DECLINE: Cell viability decreasing; nutrient depletion.
  • UNKNOWN: Phase could not be classified.

sporedb.BatchMetrics

Bases: BaseModel

Computed kinetic metrics for a specific phase of a batch run.

Attributes:

  • batch_id (UUID) –

    UUID of the batch.

  • phase_type (PhaseType) –

    The growth phase these metrics apply to.

  • mu (float | None) –

    Specific growth rate in h^-1.

  • qp (float | None) –

    Volumetric productivity in g/L/h.

  • yx_s (float | None) –

    Biomass yield coefficient (g biomass / g substrate).

  • yp_s (float | None) –

    Product yield coefficient (g product / g substrate).

  • r_squared (float | None) –

    Regression fit quality (0.0 to 1.0).

  • signal_variable (str) –

    Telemetry variable used for computation. Defaults to "OD600".

sporedb.GoldenBatchProfile

Bases: BaseModel

Reference trajectory from top-N aligned batches for golden batch scoring.

Stores the mean and standard deviation of aligned time-series trajectories for a set of reference (golden) batches.

Attributes:

  • profile_id (UUID) –

    UUIDv7 identifier (auto-generated).

  • variables (list[str]) –

    List of telemetry variable names in the profile.

  • mean_trajectory (list[list[float]]) –

    Mean trajectory matrix (n_timepoints x n_variables).

  • std_trajectory (list[list[float]]) –

    Standard deviation matrix (same shape as mean).

  • elapsed_hours (list[float]) –

    Elapsed time values for each row (n_timepoints,).

  • source_batch_ids (list[str]) –

    String UUIDs of the batches used to build this profile.

  • metadata (dict[str, object]) –

    Optional metadata (e.g. creation date, notes).

Assay & Measurements

sporedb.AssayMeasurement

Bases: BaseModel

An offline assay measurement for a batch.

Represents a single analytical measurement taken outside the bioreactor (e.g. HPLC, cell count, LC-MS).

Attributes:

  • batch_id (UUID) –

    UUID of the batch this measurement belongs to.

  • ts (datetime) –

    Sampling timestamp (must be timezone-aware).

  • variable (str) –

    Measured quantity name (e.g. "glucose", "viable_cells").

  • value (float) –

    Measured value.

  • uncertainty (float) –

    Measurement uncertainty (1 sigma). Defaults to 0.0.

  • unit (str | None) –

    Unit of measurement (e.g. "g/L").

  • method (str | None) –

    Analytical method used (e.g. "HPLC", "cell_count").

Raises:

  • ValueError

    If ts is not timezone-aware.

sporedb.UncertainValue

Bases: BaseModel

A measurement with associated uncertainty (1 sigma).

Attributes:

  • value (float) –

    The measured value.

  • uncertainty (float) –

    One standard deviation uncertainty. Defaults to 0.0.

  • unit (str) –

    Unit of measurement (e.g. "g/L", "cells/mL").

to_ufloat

to_ufloat() -> object

Convert to uncertainties.ufloat for error propagation.

Source code in src/sporedb/models/assay.py
def to_ufloat(self) -> object:
    """Return this measurement as an ``uncertainties`` ufloat.

    Enables first-class error propagation through ordinary arithmetic
    on the returned object.
    """
    # Imported lazily so the optional dependency is only required when used.
    from uncertainties import ufloat as _make_ufloat

    return _make_ufloat(self.value, self.uncertainty)

sporedb.UnitOperation

Bases: BaseModel

A single processing step in a batch's lineage (DAG node).

Each unit operation represents one step in the bioprocess workflow (e.g. seed train, fermentation, centrifugation). Operations form a directed acyclic graph (DAG) via parent_ids.

Attributes:

  • operation_id (UUID) –

    UUIDv7 identifier (auto-generated).

  • batch_id (UUID) –

    UUID of the batch this operation belongs to.

  • name (str) –

    Operation name (e.g. "seed_train", "centrifugation").

  • operation_type (str) –

    Category (e.g. "upstream", "downstream", "analytical").

  • parent_ids (list[UUID]) –

    UUIDs of parent operations in the DAG.

  • started_at (datetime | None) –

    When this operation started (timezone-aware).

  • ended_at (datetime | None) –

    When this operation completed (timezone-aware).

  • parameters (dict[str, str | int | float | bool]) –

    Process parameters as key-value pairs.

Raises:

  • ValueError

    If started_at or ended_at is not timezone-aware.

Storage

sporedb.StorageEngine

StorageEngine(data_root: Path | str)

Manages DuckDB connection and data root directory.

Uses an in-memory DuckDB instance that reads/writes Parquet files directly. The data_root directory is created if it does not exist.

Parameters:

  • data_root (Path | str) –

    Path to the directory where Parquet files are stored. Created automatically if it does not exist.

Source code in src/sporedb/storage/engine.py
def __init__(self, data_root: Path | str) -> None:
    """Bind the engine to ``data_root``, creating the directory if needed."""
    root = Path(data_root)
    root.mkdir(parents=True, exist_ok=True)
    self.data_root = root
    # Connection is created lazily by the ``con`` property.
    self._con: duckdb.DuckDBPyConnection | None = None
    # Serializes connection creation and teardown across threads.
    self._lock = threading.Lock()
    self._closed = False

con property

con: DuckDBPyConnection

Lazy-initialize and return the DuckDB connection.

Thread-safe via _lock. Raises RuntimeError after close().

close

close() -> None

Close the DuckDB connection if open. Further access raises RuntimeError.

Source code in src/sporedb/storage/engine.py
def close(self) -> None:
    """Close the DuckDB connection if open. Further access raises RuntimeError."""
    with self._lock:
        live = self._con
        if live is not None:
            live.close()
        # Mark closed even when no connection was ever opened.
        self._con = None
        self._closed = True

sporedb.BatchStore

BatchStore(engine: StorageEngine)

Batch CRUD operations backed by a Parquet catalog file.

Uses PyArrow for direct catalog reads/writes (small file), and DuckDB for search queries with predicate pushdown.

Parameters:

  • engine (StorageEngine) –

    A :class:StorageEngine instance providing the DuckDB connection and data root path.

Source code in src/sporedb/storage/batch_store.py
def __init__(self, engine: StorageEngine) -> None:
    """Bind the store to ``engine`` and derive the on-disk Parquet layout."""
    self._layout = ParquetLayout(engine.data_root)
    self._engine = engine

create_batch

create_batch(batch: Batch) -> Batch

Persist a new batch to the catalog.

Raises ValueError if batch_id already exists.

Source code in src/sporedb/storage/batch_store.py
def create_batch(self, batch: Batch) -> Batch:
    """Persist a new batch to the catalog.

    Raises ValueError if batch_id already exists.
    """
    from sporedb.storage._locking import parquet_lock

    flat_row = _batch_to_flat_dict(batch)
    addition = pa.Table.from_pylist([flat_row], schema=CATALOG_SCHEMA)
    catalog = self._layout.batches_catalog()

    # Lock covers the read-check-write cycle so concurrent writers cannot
    # race on the catalog file.
    with parquet_lock(catalog):
        if not catalog.exists():
            merged = addition
        else:
            current = pq.read_table(catalog, schema=CATALOG_SCHEMA)  # type: ignore[no-untyped-call]
            if str(batch.batch_id) in current.column("batch_id").to_pylist():
                raise ValueError(f"Batch {batch.batch_id} already exists")
            merged = pa.concat_tables([current, addition])

        _atomic_write_table(merged, catalog)
    return batch

delete_batch

delete_batch(batch_id: UUID) -> bool

Remove a batch from the catalog. Returns True if found and deleted.

Source code in src/sporedb/storage/batch_store.py
def delete_batch(self, batch_id: UUID) -> bool:
    """Remove a batch from the catalog. Returns True if found and deleted.

    The read-modify-write cycle runs under ``parquet_lock`` so a
    concurrent ``create_batch`` (which takes the same lock) cannot be
    lost between our read and rewrite of the catalog file.
    """
    from sporedb.storage._locking import parquet_lock

    catalog_path = self._layout.batches_catalog()
    if not catalog_path.exists():
        return False

    with parquet_lock(catalog_path):
        if not catalog_path.exists():
            # Catalog removed by a concurrent writer while we waited.
            return False

        table = pq.read_table(catalog_path, schema=CATALOG_SCHEMA)  # type: ignore[no-untyped-call]
        bid_str = str(batch_id)
        rows: list[dict[str, Any]] = []
        found = False

        # Rebuild the catalog without the matching row.
        for i in range(table.num_rows):
            row = {col: table.column(col)[i].as_py() for col in table.column_names}
            if row["batch_id"] == bid_str:
                found = True
            else:
                rows.append(row)

        if not found:
            return False

        if rows:
            new_table = pa.Table.from_pylist(rows, schema=CATALOG_SCHEMA)
            _atomic_write_table(new_table, catalog_path)
        else:
            # All batches deleted -- remove catalog file
            catalog_path.unlink()

    return True

get_batch

get_batch(batch_id: UUID) -> Batch | None

Retrieve a batch by ID. Returns None if not found.

Source code in src/sporedb/storage/batch_store.py
def get_batch(self, batch_id: UUID) -> Batch | None:
    """Retrieve a batch by ID. Returns None if not found."""
    catalog = self._layout.batches_catalog()
    if not catalog.exists():
        return None

    table = pq.read_table(catalog, schema=CATALOG_SCHEMA)  # type: ignore[no-untyped-call]
    target = str(batch_id)
    id_column = table.column("batch_id")
    for idx in range(table.num_rows):
        if id_column[idx].as_py() != target:
            continue
        # Materialize only the matching row.
        row = {name: table.column(name)[idx].as_py() for name in table.column_names}
        return _flat_dict_to_batch(row)
    return None

list_batches

list_batches() -> list[Batch]

Return all batches in the catalog. Empty list if no catalog exists.

Source code in src/sporedb/storage/batch_store.py
def list_batches(self) -> list[Batch]:
    """Return all batches in the catalog. Empty list if no catalog exists."""
    catalog = self._layout.batches_catalog()
    if not catalog.exists():
        return []
    loaded = pq.read_table(catalog, schema=CATALOG_SCHEMA)  # type: ignore[no-untyped-call]
    return _table_to_batches(loaded)

search_batches

search_batches(filter: BatchFilter | None = None) -> list[Batch]

Search batches using compound filter conditions via DuckDB.

All filter values are passed as parameterized query parameters to prevent SQL injection.

Source code in src/sporedb/storage/batch_store.py
def search_batches(self, filter: BatchFilter | None = None) -> list[Batch]:
    """Search batches using compound filter conditions via DuckDB.

    All filter values are passed as parameterized query parameters
    to prevent SQL injection.
    """
    catalog = self._layout.batches_catalog()
    if not catalog.exists():
        return []

    # No filter (or an empty one) degenerates to a full listing.
    if filter is None:
        return self.list_batches()
    clauses, params = filter.to_sql_clauses()
    if not clauses:
        return self.list_batches()

    where = " AND ".join(clauses)
    sql = f"SELECT * FROM read_parquet(?) WHERE {where}"
    # First parameter is always the file path
    cursor = self._engine.con.execute(sql, [str(catalog), *params])
    names = [descriptor[0] for descriptor in cursor.description]

    return [
        _flat_dict_to_batch(dict(zip(names, values, strict=False)))
        for values in cursor.fetchall()
    ]

update_batch

update_batch(batch: Batch) -> Batch

Update a batch in the catalog. Sets updated_at to now(UTC).

Reads all rows, replaces the matching batch_id, and rewrites.

Source code in src/sporedb/storage/batch_store.py
def update_batch(self, batch: Batch) -> Batch:
    """Update a batch in the catalog. Sets updated_at to now(UTC).

    Reads all rows, replaces the matching batch_id, and rewrites.
    The read-modify-write cycle runs under ``parquet_lock`` so a
    concurrent ``create_batch`` (which takes the same lock) cannot
    be lost between our read and rewrite.

    Raises:
        ValueError: If the batch does not exist in the catalog.
    """
    from sporedb.storage._locking import parquet_lock

    catalog_path = self._layout.batches_catalog()
    if not catalog_path.exists():
        msg = f"Batch {batch.batch_id} not found"
        raise ValueError(msg)

    with parquet_lock(catalog_path):
        table = pq.read_table(catalog_path, schema=CATALOG_SCHEMA)  # type: ignore[no-untyped-call]
        bid_str = str(batch.batch_id)
        found = False
        rows: list[dict[str, Any]] = []

        for i in range(table.num_rows):
            row = {col: table.column(col)[i].as_py() for col in table.column_names}
            if row["batch_id"] == bid_str:
                # Stamp the modification time on the model being persisted.
                batch.updated_at = datetime.now(UTC)
                rows.append(_batch_to_flat_dict(batch))
                found = True
            else:
                rows.append(row)

        if not found:
            msg = f"Batch {batch.batch_id} not found"
            raise ValueError(msg)

        new_table = pa.Table.from_pylist(rows, schema=CATALOG_SCHEMA)
        _atomic_write_table(new_table, catalog_path)
    return batch

sporedb.TimeSeriesStore

TimeSeriesStore(engine: StorageEngine)

Storage for telemetry and assay time-series data.

Uses Parquet files organized by batch_id (Hive partitioning) and DuckDB ASOF JOIN for unified temporal views.

Parameters:

  • engine (StorageEngine) –

    A :class:StorageEngine instance providing the DuckDB connection and data root path.

Source code in src/sporedb/storage/ts_store.py
def __init__(self, engine: StorageEngine) -> None:
    """Bind the time-series store to ``engine`` and its Parquet layout."""
    self._layout = ParquetLayout(engine.data_root)
    self._engine = engine

append_assay

append_assay(records: list[AssayMeasurement]) -> int

Append assay measurements to batch Parquet file. Returns count appended.

All records must share the same batch_id.

Source code in src/sporedb/storage/ts_store.py
def append_assay(self, records: list[AssayMeasurement]) -> int:
    """Append assay measurements to batch Parquet file. Returns count appended.

    All records must share the same batch_id.
    """
    if not records:
        return 0

    # Mixed-batch appends would corrupt the per-batch partitioning.
    batch_ids = {rec.batch_id for rec in records}
    if len(batch_ids) != 1:
        raise ValueError(
            f"All records must share the same batch_id; got {batch_ids}"
        )

    table = _records_to_table(records, _ASSAY_SCHEMA, _serialize_assay)
    target = self._layout.assay_file(records[0].batch_id)
    return _append_to_parquet(target, table)

append_telemetry

append_telemetry(records: list[TelemetryRecord]) -> int

Append telemetry records to batch Parquet file. Returns count appended.

All records must share the same batch_id.

Source code in src/sporedb/storage/ts_store.py
def append_telemetry(self, records: list[TelemetryRecord]) -> int:
    """Append telemetry records to batch Parquet file. Returns count appended.

    All records must share the same batch_id.
    """
    if not records:
        return 0

    # Mixed-batch appends would corrupt the per-batch partitioning.
    batch_ids = {rec.batch_id for rec in records}
    if len(batch_ids) != 1:
        raise ValueError(
            f"All records must share the same batch_id; got {batch_ids}"
        )

    table = _records_to_table(records, _TELEMETRY_SCHEMA, _serialize_telemetry)
    target = self._layout.telemetry_file(records[0].batch_id)
    return _append_to_parquet(target, table)

get_assay

get_assay(batch_id: UUID) -> DataFrame

Get all assay data for a batch. Returns empty DataFrame if none.

Source code in src/sporedb/storage/ts_store.py
def get_assay(self, batch_id: UUID) -> pd.DataFrame:
    """Get all assay data for a batch. Returns empty DataFrame if none."""
    path = self._layout.assay_file(batch_id)
    if path.exists():
        return pq.read_table(path, schema=_ASSAY_SCHEMA).to_pandas()  # type: ignore[no-untyped-call, no-any-return]
    return pd.DataFrame()

get_assay_as_uncertain

get_assay_as_uncertain(batch_id: UUID, variable: str) -> list[UncertainValue]

Get assay measurements as UncertainValue objects.

Used for uncertainty propagation.

Source code in src/sporedb/storage/ts_store.py
def get_assay_as_uncertain(
    self, batch_id: UUID, variable: str
) -> list[UncertainValue]:
    """Get assay measurements as UncertainValue objects.

    Used for uncertainty propagation.

    Notes:
        ``unit`` is nullable in storage (``AssayMeasurement.unit`` is
        ``str | None``) but ``UncertainValue.unit`` is a plain ``str``,
        so null units (None/NaN after the Parquet round-trip) are
        normalized to ``""`` instead of failing validation.
    """
    df = self.get_assay(batch_id)
    if df.empty:
        return []
    filtered = df[df["variable"] == variable]

    results: list[UncertainValue] = []
    for _, row in filtered.iterrows():
        unit = row.get("unit", "")
        if not isinstance(unit, str):
            # Present-but-null units come back as None/NaN; coerce to "".
            unit = ""
        results.append(
            UncertainValue(
                value=row["value"],
                uncertainty=row["uncertainty"],
                unit=unit,
            )
        )
    return results

get_telemetry

get_telemetry(batch_id: UUID) -> DataFrame

Get all telemetry for a batch. Returns empty DataFrame if none.

Source code in src/sporedb/storage/ts_store.py
def get_telemetry(self, batch_id: UUID) -> pd.DataFrame:
    """Get all telemetry for a batch. Returns empty DataFrame if none."""
    path = self._layout.telemetry_file(batch_id)
    if path.exists():
        return pq.read_table(path, schema=_TELEMETRY_SCHEMA).to_pandas()  # type: ignore[no-untyped-call, no-any-return]
    return pd.DataFrame()

get_unified_view

get_unified_view(batch_id: UUID) -> DataFrame

ASOF JOIN telemetry and assay for a unified time-series view.

Links each assay measurement to the nearest prior telemetry timestamp. Uses DuckDB ASOF JOIN for efficient temporal alignment.

Source code in src/sporedb/storage/ts_store.py
def get_unified_view(self, batch_id: UUID) -> pd.DataFrame:
    """ASOF JOIN telemetry and assay for a unified time-series view.

    Links each assay measurement to the nearest prior telemetry timestamp.
    Uses DuckDB ASOF JOIN for efficient temporal alignment.
    """
    assay_path = self._layout.assay_file(batch_id)
    telemetry_path = self._layout.telemetry_file(batch_id)

    # Both sides are required for the join; otherwise there is nothing
    # to align.
    if not (telemetry_path.exists() and assay_path.exists()):
        return pd.DataFrame()

    # File paths are constructed from validated UUID objects (T-03-01 mitigation).
    # Paths passed as parameterized values, not interpolated into SQL
    # (T-03-02 mitigation).
    join_sql = """
        SELECT
            a.ts       AS assay_ts,
            a.variable AS analyte,
            a.value    AS assay_value,
            a.uncertainty AS assay_uncertainty,
            a.unit     AS assay_unit,
            a.method   AS assay_method,
            t.ts       AS telemetry_ts,
            t.variable AS sensor,
            t.value    AS sensor_value,
            t.unit     AS sensor_unit
        FROM read_parquet(?) a
        ASOF JOIN read_parquet(?) t
            ON a.ts >= t.ts
        ORDER BY a.ts
    """
    params = [str(assay_path), str(telemetry_path)]
    return self._engine.con.execute(join_sql, params).fetchdf()

sporedb.LineageStore

LineageStore(engine: StorageEngine)

Storage and traversal for process lineage DAG.

Persists unit operations as Parquet files per batch, with parent_ids encoding DAG edges. Supports BFS traversal in both upstream and downstream directions.

Parameters:

  • engine (StorageEngine) –

    A :class:StorageEngine instance providing the DuckDB connection and data root path.

Source code in src/sporedb/storage/lineage_store.py
def __init__(self, engine: StorageEngine) -> None:
    """Bind the lineage store to ``engine`` and its Parquet layout."""
    self._layout = ParquetLayout(engine.data_root)
    self._engine = engine

add_operation

add_operation(operation: UnitOperation) -> UnitOperation

Add a unit operation to the lineage DAG. Returns the operation.

Source code in src/sporedb/storage/lineage_store.py
def add_operation(self, operation: UnitOperation) -> UnitOperation:
    """Add a unit operation to the lineage DAG. Returns the operation.

    Serializes the operation into a one-row Arrow table and appends it
    to the batch's lineage Parquet file under ``parquet_lock``.
    """
    from sporedb.storage._locking import parquet_lock

    row = _serialize_operation(operation)
    # Build the one-row table column-by-column in schema order
    # (comprehension replaces the manual append loop).
    arrays = [
        pa.array([row[field.name]], type=field.type)
        for field in _LINEAGE_SCHEMA
    ]
    new_table = pa.table(arrays, schema=_LINEAGE_SCHEMA)

    path = self._layout.lineage_file(operation.batch_id)
    path.parent.mkdir(parents=True, exist_ok=True)
    with parquet_lock(path):
        if path.exists():
            existing = pq.read_table(path, schema=_LINEAGE_SCHEMA)  # type: ignore[no-untyped-call]
            combined = pa.concat_tables([existing, new_table])
        else:
            combined = new_table
        _atomic_write_table(combined, path)
    return operation

get_downstream

get_downstream(operation_id: UUID, batch_id: UUID) -> list[UnitOperation]

Get all downstream operations from a given operation (BFS traversal).

Source code in src/sporedb/storage/lineage_store.py
def get_downstream(self, operation_id: UUID, batch_id: UUID) -> list[UnitOperation]:
    """Get all downstream operations from a given operation (BFS traversal).

    Each reachable descendant appears exactly once in the result, even
    when it is reachable through multiple parents (diamond-shaped DAGs).
    """
    all_ops = self.get_operations(batch_id)

    visited: set[UUID] = set()
    discovered: set[UUID] = {operation_id}  # enqueued/emitted at least once
    queue = [operation_id]
    result: list[UnitOperation] = []

    while queue:
        current_id = queue.pop(0)
        if current_id in visited:
            continue
        visited.add(current_id)

        for op in all_ops:
            # Guard on ``discovered`` (not ``visited``): ``visited`` only
            # grows on dequeue, so a node reachable via two parents would
            # otherwise be appended to ``result`` twice.
            if current_id in op.parent_ids and op.operation_id not in discovered:
                discovered.add(op.operation_id)
                result.append(op)
                queue.append(op.operation_id)

    return result

get_operations

get_operations(batch_id: UUID) -> list[UnitOperation]

Get all operations for a batch. Returns empty list if none.

Source code in src/sporedb/storage/lineage_store.py
def get_operations(self, batch_id: UUID) -> list[UnitOperation]:
    """Get all operations for a batch. Returns empty list if none."""
    path = self._layout.lineage_file(batch_id)
    if not path.exists():
        return []
    frame = pq.read_table(path, schema=_LINEAGE_SCHEMA).to_pandas()  # type: ignore[no-untyped-call]
    return [_deserialize_operation(rec.to_dict()) for _, rec in frame.iterrows()]

get_upstream

get_upstream(operation_id: UUID, batch_id: UUID) -> list[UnitOperation]

Get all upstream (ancestor) operations from a given operation.

Source code in src/sporedb/storage/lineage_store.py
def get_upstream(self, operation_id: UUID, batch_id: UUID) -> list[UnitOperation]:
    """Get all upstream (ancestor) operations from a given operation.

    Each ancestor appears exactly once in the result, even when it is
    reachable through multiple children (diamond-shaped DAGs).
    """
    all_ops = self.get_operations(batch_id)
    ops_by_id = {op.operation_id: op for op in all_ops}

    visited: set[UUID] = set()
    discovered: set[UUID] = {operation_id}  # enqueued/emitted at least once
    queue = [operation_id]
    result: list[UnitOperation] = []

    while queue:
        current_id = queue.pop(0)
        if current_id in visited:
            continue
        visited.add(current_id)

        current_op = ops_by_id.get(current_id)
        if current_op is None:
            continue

        for parent_id in current_op.parent_ids:
            # Guard on ``discovered`` (not ``visited``): ``visited`` only
            # grows on dequeue, so a shared ancestor reachable via two
            # children would otherwise be appended to ``result`` twice.
            if parent_id not in discovered:
                discovered.add(parent_id)
                parent_op = ops_by_id.get(parent_id)
                if parent_op:
                    result.append(parent_op)
                    queue.append(parent_id)

    return result

Cloud

sporedb.CloudClient

CloudClient(endpoint: str, api_key: str, timeout: float = 30.0)

HTTP-based SporeDB client for the cloud tier.

Mirrors the method signatures of :class:sporedb.client.SporeDB so that switching between local and cloud mode is transparent to callers.

Parameters:

  • endpoint (str) –

    Base URL of the SporeDB cloud instance (e.g. https://cloud.sporedb.io).

  • api_key (str) –

    JWT bearer token for authentication.

  • timeout (float, default: 30.0 ) –

    HTTP request timeout in seconds.

Source code in src/sporedb/cloud_client.py
def __init__(
    self,
    endpoint: str,
    api_key: str,
    timeout: float = 30.0,
) -> None:
    """Configure the HTTP client for a SporeDB cloud instance."""
    self._endpoint = endpoint.rstrip("/")
    if not self._endpoint.startswith("https://"):
        import warnings

        warnings.warn(
            f"SporeDB cloud endpoint uses insecure HTTP: {self._endpoint}. "
            "API key will be transmitted in plaintext. Use HTTPS in production.",
            UserWarning,
            stacklevel=2,
        )
    # All requests share the bearer token and the /api/v1 prefix.
    auth_headers = {"Authorization": f"Bearer {api_key}"}
    self._client = httpx.Client(
        base_url=f"{self._endpoint}/api/v1",
        headers=auth_headers,
        timeout=timeout,
    )

align

align(batch_ids: list[UUID], signal: str = 'OD600') -> DataFrame

Align multiple batch runs via the cloud API.

Source code in src/sporedb/cloud_client.py
def align(
    self,
    batch_ids: list[UUID],
    signal: str = "OD600",
) -> pd.DataFrame:
    """Align multiple batch runs via the cloud API."""
    payload = {
        "batch_ids": [str(bid) for bid in batch_ids],
        "signal": signal,
    }
    response = self._client.post("/analytics/align", json=payload)
    self._raise_for_status(response)
    return pd.DataFrame(response.json())

close

close() -> None

Close the underlying HTTP client.

Source code in src/sporedb/cloud_client.py
def close(self) -> None:
    """Release the underlying HTTP client and its pooled connections."""
    self._client.close()

compute_metrics

compute_metrics(batch_id: UUID, signal: str = 'OD600', min_size: int = 10) -> list[dict[str, Any]]

Compute batch metrics via the cloud API.

Returns a list of metric dicts (one per detected phase).

Source code in src/sporedb/cloud_client.py
def compute_metrics(
    self,
    batch_id: UUID,
    signal: str = "OD600",
    min_size: int = 10,
) -> list[dict[str, Any]]:
    """Compute batch metrics via the cloud API.

    Returns a list of metric dicts (one per detected phase).
    """
    body = {
        "batch_id": str(batch_id),
        "signal": signal,
        "min_size": min_size,
    }
    response = self._client.post("/analytics/metrics", json=body)
    self._raise_for_status(response)
    payload = response.json()
    return payload["metrics"]  # type: ignore[no-any-return]

create_batch

create_batch(name: str, *, strain: str | None = None, media: str | None = None, scale_liters: float | None = None, operator: str | None = None, tags: list[str] | None = None, inoculation: datetime | None = None) -> Batch

Create a new batch via the cloud API.

Source code in src/sporedb/cloud_client.py
def create_batch(
    self,
    name: str,
    *,
    strain: str | None = None,
    media: str | None = None,
    scale_liters: float | None = None,
    operator: str | None = None,
    tags: list[str] | None = None,
    inoculation: datetime | None = None,
) -> Batch:
    """Create a new batch via the cloud API."""
    # Only explicitly provided fields go into the metadata dict.
    metadata: dict[str, Any] = {
        key: value
        for key, value in (
            ("strain", strain),
            ("media", media),
            ("scale_liters", scale_liters),
            ("operator", operator),
        )
        if value is not None
    }
    if inoculation is not None:
        metadata["inoculation"] = inoculation.isoformat()

    body: dict[str, Any] = {"name": name}
    if metadata:
        body["metadata"] = metadata
    if tags:
        body["tags"] = tags

    response = self._client.post("/batches/", json=body)
    self._raise_for_status(response)
    return self._batch_from_response(response.json())

delete_batch

delete_batch(batch_id: UUID) -> bool

Delete a batch. Returns True if it existed.

Source code in src/sporedb/cloud_client.py
def delete_batch(self, batch_id: UUID) -> bool:
    """Delete a batch. Returns ``True`` if it existed."""
    resp = self._client.delete(f"/batches/{batch_id}")
    if resp.status_code == 404:
        # Already absent -- report "did not exist" rather than raising.
        return False
    self._raise_for_status(resp)
    return True

detect_phases

detect_phases(batch_id: UUID, signal: str = 'OD600', min_size: int = 10) -> list[Any]

Run phase detection via the cloud API.

Source code in src/sporedb/cloud_client.py
def detect_phases(
    self,
    batch_id: UUID,
    signal: str = "OD600",
    min_size: int = 10,
) -> list[Any]:
    """Run phase detection via the cloud API."""
    # Lazy import keeps the analytics models optional at module load.
    from sporedb.analytics.models import PhaseAnnotation

    payload = {
        "batch_id": str(batch_id),
        "signal": signal,
        "min_size": min_size,
    }
    response = self._client.post("/analytics/detect-phases", json=payload)
    self._raise_for_status(response)
    return [PhaseAnnotation.model_validate(entry) for entry in response.json()]

detect_phases_online

detect_phases_online(batch_id: UUID, signal: str = 'OD600', *, hazard_rate: float = 0.004, threshold: float = 0.5) -> list[Any]

Run BOCPD online phase detection via the cloud API.

Source code in src/sporedb/cloud_client.py
def detect_phases_online(
    self,
    batch_id: UUID,
    signal: str = "OD600",
    *,
    hazard_rate: float = 0.004,
    threshold: float = 0.5,
) -> list[Any]:
    """Run BOCPD online phase detection via the cloud API."""
    body = {
        "batch_id": str(batch_id),
        "signal": signal,
        "hazard_rate": hazard_rate,
        "threshold": threshold,
    }
    response = self._client.post("/analytics/detect-phases-online", json=body)
    self._raise_for_status(response)
    return response.json()  # type: ignore[no-any-return]

export

export(batch_id: UUID, format: str = 'csv') -> bytes

Export batch data via the cloud API.

Parameters:

  • batch_id (UUID) –

    Batch to export.

  • format (str, default: 'csv' ) –

    "csv" or "arrow".

Returns:

  • bytes

    Raw bytes of the exported data.

Source code in src/sporedb/cloud_client.py
def export(
    self,
    batch_id: UUID,
    format: str = "csv",
) -> bytes:
    """Export batch data via the cloud API.

    Args:
        batch_id: Batch to export.
        format: ``"csv"`` or ``"arrow"``.

    Returns:
        Raw bytes of the exported data.
    """
    response = self._client.get(
        f"/data/export/{batch_id}", params={"format": format}
    )
    self._raise_for_status(response)
    return response.content

get_assay

get_assay(batch_id: UUID) -> DataFrame

Return assay measurements for a batch as a pandas DataFrame.

Source code in src/sporedb/cloud_client.py
def get_assay(self, batch_id: UUID) -> pd.DataFrame:
    """Return assay measurements for a batch as a pandas DataFrame."""
    resp = self._client.get(f"/data/assay/{batch_id}")
    self._raise_for_status(resp)
    # The server streams Parquet bytes; decode them entirely in memory.
    return pd.read_parquet(BytesIO(resp.content))

get_batch

get_batch(batch_id: UUID) -> Batch | None

Retrieve a batch by ID, or None if not found.

Source code in src/sporedb/cloud_client.py
def get_batch(self, batch_id: UUID) -> Batch | None:
    """Retrieve a batch by ID, or ``None`` if not found."""
    resp = self._client.get(f"/batches/{batch_id}")
    # Missing batches are a normal outcome, not an error.
    if resp.status_code == 404:
        return None
    self._raise_for_status(resp)
    return self._batch_from_response(resp.json())

get_telemetry

get_telemetry(batch_id: UUID) -> DataFrame

Return telemetry data for a batch as a pandas DataFrame.

Source code in src/sporedb/cloud_client.py
def get_telemetry(self, batch_id: UUID) -> pd.DataFrame:
    """Return telemetry data for a batch as a pandas DataFrame."""
    resp = self._client.get(f"/data/telemetry/{batch_id}")
    self._raise_for_status(resp)
    # The server streams Parquet bytes; decode them entirely in memory.
    return pd.read_parquet(BytesIO(resp.content))

list_batches

list_batches() -> list[Batch]

Return all batches for the authenticated tenant.

Source code in src/sporedb/cloud_client.py
def list_batches(self) -> list[Batch]:
    """Return all batches for the authenticated tenant."""
    resp = self._client.get("/batches/")
    self._raise_for_status(resp)
    return [self._batch_from_response(item) for item in resp.json()]

query

query(dsl_query: str) -> DataFrame

Execute a bioprocess DSL query via the cloud API.

Source code in src/sporedb/cloud_client.py
def query(self, dsl_query: str) -> pd.DataFrame:
    """Execute a bioprocess DSL query via the cloud API."""
    response = self._client.post("/query/execute", json={"query": dsl_query})
    self._raise_for_status(response)
    payload = response.json()

    # A bare list of row dicts maps straight onto a DataFrame.
    if isinstance(payload, list):
        return pd.DataFrame(payload)
    # Handle structured response with columns/rows
    if "columns" in payload and "rows" in payload:
        return pd.DataFrame(payload["rows"], columns=payload["columns"])
    return pd.DataFrame(payload)