Evaluation
How manyLatents dispatches algorithms, evaluates embeddings, and applies sampling. The core engine lives in experiment.py; evaluation helpers live in evaluate.py.
Algorithm Dispatch
manyLatents uses a two-level dispatch system to handle both LatentModule (fit/transform) and LightningModule (training loop) algorithms through a unified interface.
Algorithm Resolution
run_experiment() determines which algorithm type to instantiate from the algorithm dict:
if "latent" in algorithms and algorithms["latent"] is not None:
    algorithm = instantiate_algorithm(algorithms["latent"], datamodule)
elif "lightning" in algorithms and algorithms["lightning"] is not None:
    algorithm = instantiate_algorithm(algorithms["lightning"], datamodule)
else:
    raise ValueError("No algorithm specified in configuration")
Only one of algorithms["latent"] or algorithms["lightning"] should be set per run. The key determines which path is taken.
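The dispatch rule can be mirrored in a few lines. The dict shapes and `_target_` paths below are illustrative placeholders, not real module names:

```python
# Hypothetical algorithm dicts, shaped like what run_experiment() receives.
latent_run = {"latent": {"_target_": "manylatents.algorithms.pca.PCA"}, "lightning": None}
lightning_run = {"latent": None, "lightning": {"_target_": "manylatents.algorithms.ae.AE"}}

def resolve_path(algorithms: dict) -> str:
    """Mirror run_experiment()'s dispatch: the non-None key selects the path."""
    if algorithms.get("latent") is not None:
        return "latent"
    elif algorithms.get("lightning") is not None:
        return "lightning"
    raise ValueError("No algorithm specified in configuration")
```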
Execution
run_experiment() routes via isinstance() checks (the former execute_step() logic is now inlined in run_experiment()):
if isinstance(algorithm, LatentModule):
    algorithm.fit(train_tensor, train_labels)
    latents = algorithm.transform(test_tensor)
elif isinstance(algorithm, LightningModule):
    trainer.fit(algorithm, datamodule=datamodule)
    latents = algorithm.encode(test_tensor)
LatentModule path: Direct fit() on training data, then transform() on test data. Labels are passed for supervised modules (e.g., ClassifierModule) and ignored by unsupervised ones.
LightningModule path: Full Lightning training loop via trainer.fit(), optional pretrained checkpoint loading, model evaluation via evaluate(), then embedding extraction via encode().
Evaluation: evaluate() in evaluate.py
The unified evaluate() function (in evaluate.py) handles both metric formats:
def evaluate(
    embeddings,
    *,
    dataset=None,
    module=None,
    metrics=None,   # list[str] OR dict[str, DictConfig]
    sampling=None,  # dict of instantiated samplers
    cache_dir=None,
    cache=None,
) -> dict[str, Any]:
| Metric format | Path | When used |
|---|---|---|
| list[str] (registry names) | _evaluate_registry() | Python API (run(metrics=["trustworthiness"])) |
| dict[str, DictConfig] (Hydra configs) | _evaluate_hydra() | CLI path (configs with _target_ and at fields) |
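The format dispatch can be sketched as follows. This is a toy version: the real evaluate() instantiates and runs the metrics rather than returning path labels:

```python
def evaluate_sketch(embeddings, *, metrics=None):
    """Toy version of evaluate()'s format dispatch: a list of registry
    names routes to the registry path, a dict of configs to the Hydra
    path. Returns the chosen path per metric instead of metric values."""
    if isinstance(metrics, list):
        return {name: "_evaluate_registry" for name in metrics}
    if isinstance(metrics, dict):
        return {name: "_evaluate_hydra" for name in metrics}
    return {}
```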
For LightningModule runs, _evaluate_lightningmodule() in experiment.py handles model-level metrics (trainer.test()), then evaluate() runs on the extracted embeddings.
Sampling Strategies
Large datasets can make metric computation expensive. manyLatents provides pluggable sampling strategies that subsample embeddings and datasets before evaluation.
Protocol
All strategies implement the SamplingStrategy protocol:
class SamplingStrategy(Protocol):
    def sample(
        self,
        embeddings: np.ndarray,
        dataset: object,
        n_samples: Optional[int] = None,
        fraction: Optional[float] = None,
        seed: int = 42,
    ) -> Tuple[np.ndarray, object, np.ndarray]:
        # Returns (subsampled_embeddings, subsampled_dataset, indices)
        ...
The returned dataset is a deep copy with subsampled data, latitude, longitude, and population_label attributes (when present).
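A minimal strategy satisfying the protocol might look like this. It is a sketch, not the shipped RandomSampling; the attribute list mirrors the deep-copy behavior described above:

```python
import copy
from typing import Optional, Tuple

import numpy as np

class SketchRandomSampling:
    """Minimal SamplingStrategy sketch (illustrative only; the real
    RandomSampling lives in manylatents.utils.sampling)."""

    def __init__(self, seed: int = 42):
        self.seed = seed

    def sample(
        self,
        embeddings: np.ndarray,
        dataset: object,
        n_samples: Optional[int] = None,
        fraction: Optional[float] = None,
        seed: Optional[int] = None,
    ) -> Tuple[np.ndarray, object, np.ndarray]:
        rng = np.random.default_rng(self.seed if seed is None else seed)
        n = len(embeddings)
        if n_samples is None:
            n_samples = max(1, int(n * (fraction or 1.0)))
        indices = rng.choice(n, size=n_samples, replace=False)
        # Deep-copy the dataset and subsample array attributes when present.
        sub_ds = copy.deepcopy(dataset)
        for attr in ("data", "latitude", "longitude", "population_label"):
            if hasattr(sub_ds, attr):
                setattr(sub_ds, attr, getattr(sub_ds, attr)[indices])
        return embeddings[indices], sub_ds, indices
```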
Available Strategies
| Strategy | Config | Use Case |
|---|---|---|
| RandomSampling | sampling/random | Default. Uniform random without replacement |
| StratifiedSampling | sampling/stratified | Preserves label distribution across strata |
| FarthestPointSampling | sampling/farthest_point | Maximum coverage of embedding space |
| FixedIndexSampling | (programmatic) | Reproducible cross-setting comparisons |
Configuration
Sampling is configured under top-level sampling in Hydra, keyed by output name:
# Pre-fit: subsample dataset before algorithm fitting
sampling:
  dataset:
    _target_: manylatents.utils.sampling.RandomSampling
    seed: 42
    fraction: 0.5

# Post-fit: subsample embeddings before metric evaluation
sampling:
  embedding:
    _target_: manylatents.utils.sampling.RandomSampling
    seed: 42
    fraction: 0.1
Deterministic Indices
RandomSampling.get_indices() precomputes indices for reproducible comparisons:
sampler = RandomSampling(seed=42)
indices = sampler.get_indices(data, fraction=0.1)
np.save('shared_indices.npy', indices)
# Reuse across runs
fixed = FixedIndexSampling(indices=np.load('shared_indices.npy'))
emb_sub, ds_sub, _ = fixed.sample(embeddings, dataset)
How Sampling Integrates
In evaluate(), post-fit sampling runs before any metrics:
# sampling is a dict of pre-instantiated sampler objects
if sampling is not None:
    for output_name, sampler in sampling.items():
        if output_name == "dataset":
            continue  # pre-fit sampling handled in run_experiment()
        indices = sampler.get_indices(outputs[output_name])
        outputs[output_name] = outputs[output_name][indices]
Pre-fit sampling (sampling["dataset"]) runs in run_experiment() before fit(), reducing the data the algorithm sees. Post-fit sampling (e.g., sampling["embedding"]) runs in evaluate() before metrics. If no sampling is configured, metrics run on the full dataset.
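The pre-fit side of this split can be summarized with a small helper. This is a hypothetical sketch of the described behavior; the actual hook lives inside run_experiment():

```python
def apply_prefit_sampling(sampling, train_data):
    """Apply only the 'dataset' sampler before fit(); post-fit keys such
    as 'embedding' are left for evaluate(). Sketch, not the real code."""
    sampler = (sampling or {}).get("dataset")
    if sampler is None:
        return train_data  # nothing configured: fit on the full dataset
    indices = sampler.get_indices(train_data)
    return train_data[indices]
```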
Shared Cache
Several metrics need k-nearest neighbors, SVD decompositions, or eigenvalue computations. Computing these per-metric would be redundant. manyLatents pre-warms a shared cache dict and passes it to all metrics.
How It Works
evaluate() uses the config sleuther (extract_k_requirements, in evaluate.py) to discover all k/n_neighbors values from metric configs, then calls prewarm_cache() (also in evaluate.py) to compute kNN and eigenvalues once with max(k):
# 1. Sleuther extracts requirements from metric configs
reqs = extract_k_requirements(metric_cfgs)
# reqs = {"emb_k": {5, 10, 25}, "data_k": {10, 25}, "spectral": True}
# 2. Pre-warm cache with optimal k values
cache = prewarm_cache(metric_cfgs, embeddings, dataset, module)
# cache is keyed by id(data) for kNN, "eigenvalues" for spectral
# 3. All metrics receive the same cache dict
result = metric_fn(embeddings=emb, dataset=ds, module=module, cache=cache)
compute_knn with cache
compute_knn() uses the cache dict to avoid redundant computation. If a cached result exists with k >= requested k, it slices and returns immediately:
from manylatents.utils.metrics import compute_knn
cache = {}
# First call: computes kNN with k=25
dists, idxs = compute_knn(data, k=25, cache=cache)
# Second call: reuses cached result, slices to k=10
dists, idxs = compute_knn(data, k=10, cache=cache) # instant
compute_knn() automatically selects the fastest backend: FAISS-GPU > FAISS-CPU > sklearn.
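The cache contract can be sketched with a brute-force backend. This is illustrative only: the real compute_knn() picks a FAISS/sklearn backend, but the hit-then-slice logic is the same idea:

```python
import numpy as np

def compute_knn_sketch(data, k, cache):
    """Sketch of the cache contract: a cached result with cached_k >= k is
    sliced and returned; otherwise recompute with brute force and store."""
    key = id(data)  # cache is keyed by id(data), as described above
    if key in cache:
        dists, idxs = cache[key]
        if dists.shape[1] >= k:
            return dists[:, :k], idxs[:, :k]  # cache hit: slice, no recompute
    # Brute-force kNN (fine for a sketch; not the real backend selection)
    d2 = ((data[:, None, :] - data[None, :, :]) ** 2).sum(-1)
    idxs = np.argsort(d2, axis=1)[:, :k]
    dists = np.sqrt(np.take_along_axis(d2, idxs, axis=1))
    cache[key] = (dists, idxs)
    return dists, idxs
```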
SVD Cache
compute_svd_cache() batches local SVD computation with GPU acceleration (torch) when CUDA is available, falling back to CPU numpy. Results are stored in the same cache dict.
Metric Protocol
All metrics receive cache= as a keyword argument. Metrics that need kNN call compute_knn(..., cache=cache) internally — the cache ensures no redundant computation. Extension metrics that don't accept cache= are handled gracefully via a TypeError fallback.
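The fallback can be sketched with a hypothetical wrapper (the real handling lives inside evaluate(); this simply retries without cache= when the call raises TypeError):

```python
def call_metric(metric_fn, **kwargs):
    """Sketch of the graceful fallback for extension metrics that do not
    accept cache=: try with it, retry without it on TypeError."""
    try:
        return metric_fn(**kwargs)
    except TypeError:
        kwargs.pop("cache", None)
        return metric_fn(**kwargs)
```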
Metric Expansion
flatten_and_unroll_metrics() handles list-valued parameters via Cartesian product:
# This config:
trustworthiness:
  _target_: manylatents.metrics.trustworthiness.Trustworthiness
  _partial_: true
  n_neighbors: [5, 10, 20]

# Expands to three separate evaluations:
#   trustworthiness__n_neighbors_5
#   trustworthiness__n_neighbors_10
#   trustworthiness__n_neighbors_20
This expansion happens before kNN extraction, so all k values from expanded metrics contribute to the shared cache.
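The expansion can be sketched with itertools.product. A simplified version over plain dicts (the real flatten_and_unroll_metrics operates on Hydra configs and may differ in naming details):

```python
from itertools import product

def unroll(name, cfg):
    """Expand list-valued parameters into one config per combination,
    suffixing the metric name as <name>__<key>_<value>. Sketch only."""
    list_keys = [k for k, v in cfg.items()
                 if isinstance(v, list) and not k.startswith("_")]
    if not list_keys:
        return {name: cfg}
    out = {}
    for combo in product(*(cfg[k] for k in list_keys)):
        sub = {**cfg, **dict(zip(list_keys, combo))}
        suffix = "__".join(f"{k}_{v}" for k, v in zip(list_keys, combo))
        out[f"{name}__{suffix}"] = sub
    return out
```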