Skip to content

Evaluation

Signal quality metrics and artifact generation for compression evaluation.

Metrics

compressionkit.evaluation.metrics.PRD

Bases: Metric

Percent RMS difference metric with optional energy normalization.

Source code in compressionkit/evaluation/metrics.py
@keras.saving.register_keras_serializable(package="compression_kit")
class PRD(keras.metrics.Metric):
    """Percent RMS difference (PRD) metric with optional energy normalization.

    Accumulates the squared reconstruction error in ``_num`` and either the
    reference-signal energy (normalized PRD) or the element count
    (unnormalized) in ``_den``; ``result`` reports ``100 * sqrt(_num / _den)``.
    """

    def __init__(self, normalized: bool = True, name: str = "prd", **kwargs):
        """Create the metric.

        Args:
            normalized: If True, divide by the accumulated energy of the
                reference signal; otherwise divide by the element count.
            name: Metric name reported by Keras.
            **kwargs: Forwarded to ``keras.metrics.Metric``.
        """
        super().__init__(name=name, **kwargs)
        self.normalized = normalized
        self._num = self.add_weight(name="num", initializer="zeros")
        self._den = self.add_weight(name="den", initializer="zeros")

    def update_state(self, y_true, y_pred, sample_weight=None):
        # ``sample_weight`` is accepted for Metric API compatibility but not
        # used by this accumulation.
        y_true_f = keras.ops.cast(y_true, "float32")
        err = y_true_f - keras.ops.cast(y_pred, "float32")
        self._num.assign_add(keras.ops.sum(keras.ops.square(err)))
        if self.normalized:
            self._den.assign_add(keras.ops.sum(keras.ops.square(y_true_f)))
        else:
            self._den.assign_add(keras.ops.cast(keras.ops.size(y_true_f), "float32"))

    def result(self):
        # Epsilon keeps the division finite before any update has been seen.
        ratio = self._num / (self._den + keras.ops.cast(1e-8, "float32"))
        return keras.ops.cast(100.0, "float32") * keras.ops.sqrt(
            keras.ops.maximum(ratio, keras.ops.cast(0.0, "float32"))
        )

    def reset_state(self):
        self._num.assign(0.0)
        self._den.assign(0.0)

    def get_config(self):
        # Bug fix: the class is registered as serializable, but without this
        # override ``normalized`` was dropped from the saved config, so a
        # reloaded metric silently reverted to the default (normalized=True).
        config = super().get_config()
        config.update({"normalized": self.normalized})
        return config

compressionkit.evaluation.metrics.TruePRD

Bases: PRD

Normalized PRD metric (convenience alias).

Source code in compressionkit/evaluation/metrics.py
@keras.saving.register_keras_serializable(package="compression_kit")
class TruePRD(PRD):
    """Energy-normalized PRD metric (convenience alias for ``PRD(normalized=True)``)."""

    def __init__(self, name: str = "prd", **kwargs):
        # Robustness: discard any ``normalized`` entry arriving via **kwargs
        # (e.g. from a deserialized config) so it cannot collide with the
        # hard-coded keyword below and raise a duplicate-argument TypeError.
        kwargs.pop("normalized", None)
        super().__init__(normalized=True, name=name, **kwargs)

compressionkit.evaluation.metrics.compute_signal_metrics(original, reconstructed)

Compute scalar reconstruction metrics on two aligned signals.

Source code in compressionkit/evaluation/metrics.py
def compute_signal_metrics(
    original: np.ndarray,
    reconstructed: np.ndarray,
) -> dict[str, float]:
    """Compute scalar reconstruction metrics on two aligned signals.

    Args:
        original: Reference signal; flattened to 1D float32.
        reconstructed: Reconstructed signal; flattened to 1D float32.

    Returns:
        Dict with ``mse``, ``mae``, ``cosine_similarity`` and ``prd_percent``.
    """
    ref = np.asarray(original, dtype=np.float32).reshape(-1)
    est = np.asarray(reconstructed, dtype=np.float32).reshape(-1)
    residual = ref - est

    mse_val = float(np.mean(residual ** 2))
    mae_val = float(np.mean(np.abs(residual)))

    # Cosine similarity with an epsilon guard against zero-norm signals.
    scale = float(np.linalg.norm(ref) * np.linalg.norm(est) + 1e-8)
    cosine = float(np.dot(ref, est) / scale)

    # PRD: residual energy relative to reference energy, as a percentage.
    err_energy = float(np.sum(residual ** 2))
    ref_energy = float(np.sum(ref ** 2) + 1e-8)
    prd_val = float(100.0 * np.sqrt(max(err_energy / ref_energy, 0.0)))

    return {
        "mse": mse_val,
        "mae": mae_val,
        "cosine_similarity": cosine,
        "prd_percent": prd_val,
    }

compressionkit.evaluation.metrics.compute_ppg_physiokit_metrics(signal, *, sample_rate, low_hz, high_hz, order, min_peaks)

Compute HR/HRV metrics for one PPG signal using physiokit.

Source code in compressionkit/evaluation/metrics.py
def compute_ppg_physiokit_metrics(
    signal: np.ndarray,
    *,
    sample_rate: int,
    low_hz: float,
    high_hz: float,
    order: int,
    min_peaks: int,
) -> dict[str, float] | None:
    """Compute HR/HRV metrics for one PPG signal using physiokit.

    Returns ``None`` when the signal is too short, too few peaks or intervals
    are detected, or any physiokit call fails.
    """
    samples = np.asarray(signal, dtype=np.float32).reshape(-1)
    # Too few samples to filter / detect peaks meaningfully.
    if samples.size < 4:
        return None
    try:
        filtered = pk.ppg.clean(
            samples,
            lowcut=low_hz,
            highcut=high_hz,
            sample_rate=sample_rate,
            order=order,
        )
        heart_rate, quality = pk.ppg.compute_heart_rate(
            filtered, sample_rate=sample_rate, method="peak"
        )
        detected = pk.ppg.find_peaks(filtered, sample_rate=sample_rate)
        if detected is None:
            return None
        detected = np.asarray(detected).reshape(-1)
        if detected.size < min_peaks:
            return None
        intervals = np.asarray(pk.ppg.compute_rr_intervals(detected)).reshape(-1)
        if intervals.size < max(2, min_peaks - 1):
            return None
        time_domain = pk.hrv.compute_hrv_time(intervals, sample_rate=sample_rate)
        return {
            "hr_bpm": float(heart_rate),
            "hr_qos": float(quality),
            "sdnn_ms": float(time_domain.sd_nn),
            "rmssd_ms": float(time_domain.rms_sd),
            "mean_nn_ms": float(time_domain.mean_nn),
            "num_peaks": int(detected.size),
        }
    except Exception:
        # Best-effort by design: any physiokit failure means "no metrics".
        return None

compressionkit.evaluation.metrics.summarize_physiokit_alignment(originals, reconstructions, *, sample_rate, low_hz, high_hz, order, min_peaks)

Compare physiokit HR/HRV metrics between original and reconstructed signals.

Source code in compressionkit/evaluation/metrics.py
def summarize_physiokit_alignment(
    originals: np.ndarray,
    reconstructions: np.ndarray,
    *,
    sample_rate: int,
    low_hz: float,
    high_hz: float,
    order: int,
    min_peaks: int,
) -> tuple[dict[str, float] | None, list[dict[str, Any] | None]]:
    """Compare physiokit HR/HRV metrics between original and reconstructed signals."""
    per_sample: list[dict[str, Any] | None] = []
    hr_abs_errors: list[float] = []
    hr_biases: list[float] = []
    rmssd_abs_errors: list[float] = []
    sdnn_abs_errors: list[float] = []
    target_hr_values: list[float] = []
    recon_hr_values: list[float] = []
    target_rmssd_values: list[float] = []
    recon_rmssd_values: list[float] = []
    target_sdnn_values: list[float] = []
    recon_sdnn_values: list[float] = []

    for target, recon in zip(originals, reconstructions):
        target_metrics = compute_ppg_physiokit_metrics(
            target, sample_rate=sample_rate, low_hz=low_hz, high_hz=high_hz,
            order=order, min_peaks=min_peaks,
        )
        recon_metrics = compute_ppg_physiokit_metrics(
            recon, sample_rate=sample_rate, low_hz=low_hz, high_hz=high_hz,
            order=order, min_peaks=min_peaks,
        )
        if target_metrics is None or recon_metrics is None:
            per_sample.append(None)
            continue

        hr_diff = recon_metrics["hr_bpm"] - target_metrics["hr_bpm"]
        rmssd_diff = recon_metrics["rmssd_ms"] - target_metrics["rmssd_ms"]
        sdnn_diff = recon_metrics["sdnn_ms"] - target_metrics["sdnn_ms"]
        target_hr_values.append(target_metrics["hr_bpm"])
        recon_hr_values.append(recon_metrics["hr_bpm"])
        target_rmssd_values.append(target_metrics["rmssd_ms"])
        recon_rmssd_values.append(recon_metrics["rmssd_ms"])
        target_sdnn_values.append(target_metrics["sdnn_ms"])
        recon_sdnn_values.append(recon_metrics["sdnn_ms"])
        hr_abs_errors.append(abs(hr_diff))
        hr_biases.append(hr_diff)
        rmssd_abs_errors.append(abs(rmssd_diff))
        sdnn_abs_errors.append(abs(sdnn_diff))
        per_sample.append({
            "target": target_metrics,
            "reconstructed": recon_metrics,
            "delta": {
                "hr_bpm": float(hr_diff),
                "rmssd_ms": float(rmssd_diff),
                "sdnn_ms": float(sdnn_diff),
            },
        })

    valid_pairs = len(hr_abs_errors)
    if valid_pairs == 0:
        return None, per_sample

    summary = {
        "num_total_pairs": int(len(per_sample)),
        "num_valid_pairs": int(valid_pairs),
        "target_mean_hr_bpm": float(np.mean(target_hr_values)),
        "reconstructed_mean_hr_bpm": float(np.mean(recon_hr_values)),
        "hr_mae_bpm": float(np.mean(hr_abs_errors)),
        "hr_bias_bpm": float(np.mean(hr_biases)),
        "target_mean_rmssd_ms": float(np.mean(target_rmssd_values)),
        "reconstructed_mean_rmssd_ms": float(np.mean(recon_rmssd_values)),
        "rmssd_mae_ms": float(np.mean(rmssd_abs_errors)),
        "target_mean_sdnn_ms": float(np.mean(target_sdnn_values)),
        "reconstructed_mean_sdnn_ms": float(np.mean(recon_sdnn_values)),
        "sdnn_mae_ms": float(np.mean(sdnn_abs_errors)),
    }
    return summary, per_sample

Artifacts

compressionkit.evaluation.artifacts.save_sample_artifacts(sample_id, original, reconstructed, sampling_rate, run_dir, band_original=None, band_reconstructed=None, physiokit_metrics=None)

Save per-sample CSV, plot, and compute metrics for one evaluation sample.

Parameters:

Name Type Description Default
sample_id int

Index of the evaluation sample.

required
original ndarray

Original signal (1D or squeezable).

required
reconstructed ndarray

Reconstructed signal (1D or squeezable).

required
sampling_rate int

Signal sampling rate in Hz.

required
run_dir Path

Directory to write artifacts into.

required
band_original ndarray | None

Optional band-filtered original for band metrics.

None
band_reconstructed ndarray | None

Optional band-filtered reconstruction.

None
physiokit_metrics dict[str, Any] | None

Optional physiokit metrics dict for this sample.

None

Returns:

Type Description
dict[str, Any]

Dictionary with paths and metric values.

Source code in compressionkit/evaluation/artifacts.py
def save_sample_artifacts(
    sample_id: int,
    original: np.ndarray,
    reconstructed: np.ndarray,
    sampling_rate: int,
    run_dir: Path,
    band_original: np.ndarray | None = None,
    band_reconstructed: np.ndarray | None = None,
    physiokit_metrics: dict[str, Any] | None = None,
) -> dict[str, Any]:
    """Save per-sample CSV, plot, and compute metrics for one evaluation sample.

    Args:
        sample_id: Index of the evaluation sample.
        original: Original signal (1D or squeezable).
        reconstructed: Reconstructed signal (1D or squeezable).
        sampling_rate: Signal sampling rate in Hz.
        run_dir: Directory to write artifacts into (created if missing).
        band_original: Optional band-filtered original for band metrics.
        band_reconstructed: Optional band-filtered reconstruction.
        physiokit_metrics: Optional physiokit metrics dict for this sample.

    Returns:
        Dictionary with artifact paths (relative to ``run_dir``) and metric
        values.
    """
    orig_flat = original.reshape(-1)
    recon_flat = reconstructed.reshape(-1)

    # Robustness fix: create the output directory before writing; previously a
    # missing run_dir made both the CSV write and plots_dir.mkdir fail.
    run_dir.mkdir(parents=True, exist_ok=True)

    csv_rows = [
        {"time_index": idx, "original": float(o), "reconstructed": float(r)}
        for idx, (o, r) in enumerate(zip(orig_flat, recon_flat))
    ]
    csv_path = run_dir / f"sample_{sample_id:03d}.csv"
    pd.DataFrame(csv_rows).to_csv(csv_path, index=False)

    metrics = compute_signal_metrics(orig_flat, recon_flat)

    plots_dir = run_dir / "plots"
    plots_dir.mkdir(parents=True, exist_ok=True)
    ts = np.arange(len(orig_flat)) / sampling_rate
    fig, ax = plt.subplots(figsize=(8, 3))
    ax.plot(ts, orig_flat, label="Original")
    ax.plot(ts, recon_flat, label="Reconstruction", alpha=0.8)
    ax.set_title(f"Sample {sample_id}")
    ax.set_xlabel("Time (s)")
    ax.set_ylabel("Amplitude")
    ax.legend(loc="upper right")
    ax.grid(alpha=0.3)
    fig.tight_layout()
    plot_path = plots_dir / f"sample_{sample_id:03d}.png"
    fig.savefig(plot_path)
    plt.close(fig)

    results: dict[str, Any] = {
        "csv": str(csv_path.relative_to(run_dir)),
        "plot": str(plot_path.relative_to(run_dir)),
        "metrics": {
            "mse": metrics["mse"],
            "mae": metrics["mae"],
            "cosine_similarity": metrics["cosine_similarity"],
            # Consistency fix: report PRD here too; it was computed by
            # compute_signal_metrics but only surfaced in band_metrics.
            "prd_percent": metrics["prd_percent"],
        },
    }
    if band_original is not None and band_reconstructed is not None:
        band_metrics = compute_signal_metrics(band_original, band_reconstructed)
        results["band_metrics"] = {
            "mse": band_metrics["mse"],
            "mae": band_metrics["mae"],
            "cosine_similarity": band_metrics["cosine_similarity"],
            "prd_percent": band_metrics["prd_percent"],
        }
    if physiokit_metrics is not None:
        results["physiokit_metrics"] = physiokit_metrics
    return results