Skip to content

Preprocessing

PPG-specific preprocessing, augmentation, and synthetic data generation using heliaEDGE.

compressionkit.preprocessing.ppg.build_preprocessor(frame_size, epsilon=0.001)

Create preprocessing pipeline: random crop + layer normalization.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `frame_size` | `int` | Number of samples per frame after cropping. | *required* |
| `epsilon` | `float` | LayerNorm epsilon for numerical stability. | `0.001` |
Source code in compressionkit/preprocessing/ppg.py
def build_preprocessor(frame_size: int, epsilon: float = 1e-3) -> keras.layers.Layer:
    """Assemble the preprocessing pipeline: a random crop followed by layer normalization.

    Args:
        frame_size: Number of samples per frame after cropping; forwarded as the
            crop layer's ``duration``.
        epsilon: Epsilon forwarded to the normalization layer for numerical stability.

    Returns:
        An ``AugmentationPipeline`` whose layers are listed as
        ``RandomCrop1D`` then ``LayerNormalization1D``.
    """
    preprocessing = helia.layers.preprocessing

    # Build the stages in the order they are handed to the pipeline.
    crop_layer = preprocessing.RandomCrop1D(duration=frame_size, name="RandomCrop")
    norm_layer = preprocessing.LayerNormalization1D(epsilon=epsilon, name="LayerNorm")

    return preprocessing.AugmentationPipeline(layers=[crop_layer, norm_layer])

compressionkit.preprocessing.ppg.build_augmenter(noise_factor=(0.01, 0.1))

Create augmentation pipeline: Gaussian noise injection.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `noise_factor` | `tuple[float, float]` | Range `(min_std, max_std)` for random noise amplitude. | `(0.01, 0.1)` |
Source code in compressionkit/preprocessing/ppg.py
def build_augmenter(noise_factor: tuple[float, float] = (0.01, 0.1)) -> keras.layers.Layer:
    """Assemble the augmentation pipeline: a single Gaussian noise injection stage.

    Args:
        noise_factor: Range ``(min_std, max_std)`` for random noise amplitude;
            forwarded as the noise layer's ``factor``.

    Returns:
        An ``AugmentationPipeline`` containing one ``RandomGaussianNoise1D`` layer.
    """
    preprocessing = helia.layers.preprocessing

    noise_layer = preprocessing.RandomGaussianNoise1D(factor=noise_factor, name="GaussianNoise")

    return preprocessing.AugmentationPipeline(layers=[noise_layer])

compressionkit.preprocessing.ppg.generate_synthetic_ppg_batch(*, num_segments, signal_length, sample_rate, heart_rate_bpm=None, frequency_modulation=None, ibi_randomness=None, seed=1337)

Generate synthetic PPG segments via physiokit.

Parameters:

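| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `num_segments` | `int` | Number of synthetic segments to generate. | *required* |
| `signal_length` | `int` | Samples per segment. | *required* |
| `sample_rate` | `int` | Sampling rate in Hz. | *required* |
| `heart_rate_bpm` | `list[float] \| None` | Range `[low, high]` for heart rate in BPM. Falls back to `[50.0, 120.0]` when `None` or empty. | `None` |
| `frequency_modulation` | `list[float] \| None` | Range `[low, high]` for frequency modulation. Falls back to `[0.1, 0.5]` when `None` or empty. | `None` |
| `ibi_randomness` | `list[float] \| None` | Range `[low, high]` for inter-beat-interval randomness. Falls back to `[0.02, 0.2]` when `None` or empty. | `None` |
| `seed` | `int` | Random seed for reproducibility. | `1337` |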

Returns:

| Type | Description |
| --- | --- |
| `ndarray` | Array of shape `[num_segments, signal_length]` with `float32` dtype. |

Source code in compressionkit/preprocessing/ppg.py
def generate_synthetic_ppg_batch(
    *,
    num_segments: int,
    signal_length: int,
    sample_rate: int,
    heart_rate_bpm: list[float] | None = None,
    frequency_modulation: list[float] | None = None,
    ibi_randomness: list[float] | None = None,
    seed: int = 1337,
) -> np.ndarray:
    """Generate synthetic PPG segments via physiokit.

    Args:
        num_segments: Number of synthetic segments to generate.
        signal_length: Samples per segment.
        sample_rate: Sampling rate in Hz.
        heart_rate_bpm: Range ``[low, high]`` for heart rate in BPM.
        frequency_modulation: Range ``[low, high]`` for frequency modulation.
        ibi_randomness: Range ``[low, high]`` for inter-beat-interval randomness.
        seed: Random seed for reproducibility.

    Returns:
        Array of shape ``[num_segments, signal_length]`` with ``float32`` dtype.
    """
    if num_segments <= 0:
        return np.empty((0, signal_length), dtype=np.float32)

    heart_rate_bpm = heart_rate_bpm or [50.0, 120.0]
    frequency_modulation = frequency_modulation or [0.1, 0.5]
    ibi_randomness = ibi_randomness or [0.02, 0.2]

    rng = np.random.default_rng(seed)
    segments: list[np.ndarray] = []
    for _ in range(num_segments):
        hr = _sample_uniform_range(rng, heart_rate_bpm, default_low=50.0, default_high=120.0)
        fm = _sample_uniform_range(rng, frequency_modulation, default_low=0.1, default_high=0.5)
        ir = _sample_uniform_range(rng, ibi_randomness, default_low=0.02, default_high=0.2)
        signal, _, _ = pk.ppg.synthesize(
            signal_length=int(signal_length),
            sample_rate=float(sample_rate),
            heart_rate=float(hr),
            frequency_modulation=float(fm),
            ibi_randomness=float(ir),
        )
        segments.append(np.asarray(signal, dtype=np.float32).reshape(-1)[:signal_length])
    return np.stack(segments).astype(np.float32)