Skip to content

Evaluation

How manyLatents dispatches, evaluates, and samples embeddings. The core engine lives in experiment.py.

Algorithm Dispatch

manyLatents uses a two-level dispatch system to handle both LatentModule (fit/transform) and LightningModule (training loop) algorithms through a unified interface.

Algorithm Resolution

run_algorithm() determines which algorithm type to instantiate from the Hydra config:

if hasattr(cfg.algorithms, 'latent') and cfg.algorithms.latent is not None:
    algorithm = instantiate_algorithm(cfg.algorithms.latent, datamodule)
elif hasattr(cfg.algorithms, 'lightning') and cfg.algorithms.lightning is not None:
    algorithm = instantiate_algorithm(cfg.algorithms.lightning, datamodule)
else:
    raise ValueError("No algorithm specified in configuration")

Only one of algorithms/latent or algorithms/lightning should be set per run. The config group determines which path is taken.

Execution: execute_step()

execute_step() routes via isinstance() checks:

if isinstance(algorithm, LatentModule):
    algorithm.fit(train_tensor, train_labels)
    latents = algorithm.transform(test_tensor)

elif isinstance(algorithm, LightningModule):
    trainer.fit(algorithm, datamodule=datamodule)
    latents = algorithm.encode(test_tensor)

LatentModule path: Direct fit() on training data, then transform() on test data. Labels are passed for supervised modules (e.g., ClassifierModule) and ignored by unsupervised ones.

LightningModule path: Full Lightning training loop via trainer.fit(), optional pretrained checkpoint loading, model evaluation via evaluate(), then embedding extraction via encode().

Evaluation: @functools.singledispatch

The evaluate() function uses Python's @functools.singledispatch to dispatch on the first argument's type:

@functools.singledispatch
def evaluate(algorithm: Any, /, **kwargs):
    raise NotImplementedError(...)

@evaluate.register(dict)
def evaluate_embeddings(latent_outputs: dict, *, cfg, datamodule, **kwargs):
    # Handles embedding-level metrics (trustworthiness, continuity, etc.)
    ...

@evaluate.register(LightningModule)
def evaluate_lightningmodule(algorithm: LightningModule, *, cfg, trainer, datamodule, **kwargs):
    # Handles trainer.test() and model-specific metrics
    ...
Dispatch Type Handler Evaluates
dict (LatentOutputs) evaluate_embeddings() Embedding metrics (trustworthiness, continuity, kNN preservation, etc.)
LightningModule evaluate_lightningmodule() trainer.test() results + custom model metrics

Both paths are called during a LightningModule run: first evaluate_lightningmodule during execute_step(), then evaluate_embeddings on the extracted embeddings.

Pipeline Mode

run_pipeline() chains multiple steps sequentially, where step N's output embeddings become step N+1's input. The dispatch logic is reused per step via execute_step().

# PCA (1000→50) → PHATE (50→2)
uv run python -m manylatents.main experiment=my_pipeline

Sampling Strategies

Large datasets can make metric computation expensive. manyLatents provides pluggable sampling strategies that subsample embeddings and datasets before evaluation.

Protocol

All strategies implement the SamplingStrategy protocol:

class SamplingStrategy(Protocol):
    def sample(
        self,
        embeddings: np.ndarray,
        dataset: object,
        n_samples: Optional[int] = None,
        fraction: Optional[float] = None,
        seed: int = 42,
    ) -> Tuple[np.ndarray, object, np.ndarray]:
        # Returns (subsampled_embeddings, subsampled_dataset, indices)
        ...

The returned dataset is a deep copy with subsampled data, latitude, longitude, and population_label attributes (when present).

Available Strategies

Strategy Config Use Case
RandomSampling sampling/random Default. Uniform random without replacement
StratifiedSampling sampling/stratified Preserves label distribution across strata
FarthestPointSampling sampling/farthest_point Maximum coverage of embedding space
FixedIndexSampling (programmatic) Reproducible cross-setting comparisons

Configuration

Sampling is configured under metrics.sampling in Hydra:

# Random (default)
metrics:
  sampling:
    _target_: manylatents.utils.sampling.RandomSampling
    seed: 42
    fraction: 0.1

# Stratified by population label
metrics:
  sampling:
    _target_: manylatents.utils.sampling.StratifiedSampling
    stratify_by: population_label
    seed: 42
    fraction: 0.1

# Farthest point (O(n*k) — slower but better coverage)
metrics:
  sampling:
    _target_: manylatents.utils.sampling.FarthestPointSampling
    seed: 42
    fraction: 0.1

Deterministic Indices

RandomSampling.get_indices() precomputes indices without requiring data, enabling reproducible comparisons:

sampler = RandomSampling(seed=42)
indices = sampler.get_indices(n_total=1000, fraction=0.1)
np.save('shared_indices.npy', indices)

# Reuse across runs
fixed = FixedIndexSampling(indices=np.load('shared_indices.npy'))
emb_sub, ds_sub, _ = fixed.sample(embeddings, dataset)

How Sampling Integrates

In evaluate_embeddings(), sampling runs before any metrics:

sampling_cfg = cfg.metrics.get("sampling", None)
if sampling_cfg is not None:
    sampler = hydra.utils.instantiate(sampling_cfg)
    emb_sub, ds_sub, _ = sampler.sample(embeddings, ds)

All metrics then operate on the subsampled data. If no sampling config is provided, metrics run on the full dataset.

Shared Cache

Several metrics need k-nearest neighbors, SVD decompositions, or eigenvalue computations. Computing these per-metric would be redundant. manyLatents pre-warms a shared cache dict and passes it to all metrics.

How It Works

evaluate_embeddings() uses the config sleuther (extract_k_requirements) to discover all k/n_neighbors values from metric configs, then calls prewarm_cache() to compute kNN and eigenvalues once with max(k):

# 1. Sleuther extracts requirements from metric configs
reqs = extract_k_requirements(metric_cfgs)
# reqs = {"emb_k": {5, 10, 25}, "data_k": {10, 25}, "spectral": True}

# 2. Pre-warm cache with optimal k values
cache = prewarm_cache(metric_cfgs, embeddings, dataset, module)
# cache is keyed by id(data) for kNN, "eigenvalues" for spectral

# 3. All metrics receive the same cache dict
result = metric_fn(embeddings=emb, dataset=ds, module=module, cache=cache)

compute_knn with cache

compute_knn() uses the cache dict to avoid redundant computation. If a cached result exists with k >= requested k, it slices and returns immediately:

from manylatents.utils.metrics import compute_knn

cache = {}
# First call: computes kNN with k=25
dists, idxs = compute_knn(data, k=25, cache=cache)

# Second call: reuses cached result, slices to k=10
dists, idxs = compute_knn(data, k=10, cache=cache)  # instant

compute_knn() automatically selects the fastest backend: FAISS-GPU > FAISS-CPU > sklearn.

SVD Cache

compute_svd_cache() batches local SVD computation with GPU acceleration (torch) when CUDA is available, falling back to CPU numpy. Results are stored in the same cache dict.

Metric Protocol

All metrics receive cache= as a keyword argument. Metrics that need kNN call compute_knn(..., cache=cache) internally — the cache ensures no redundant computation. Extension metrics that don't accept cache= are handled gracefully via a TypeError fallback.

Metric Expansion

flatten_and_unroll_metrics() handles list-valued parameters via Cartesian product:

# This config:
trustworthiness:
  _target_: manylatents.metrics.trustworthiness.Trustworthiness
  _partial_: true
  n_neighbors: [5, 10, 20]

# Expands to three separate evaluations:
# embedding.trustworthiness__n_neighbors_5
# embedding.trustworthiness__n_neighbors_10
# embedding.trustworthiness__n_neighbors_20

This expansion happens before kNN extraction, so all k values from expanded metrics contribute to the shared cache.