Skip to content

Photoplethysmography (PPG)

Photoplethysmography (PPG) is a non-invasive optical technique used to measure blood volume changes in the microvascular bed of tissue. PPG signals are often used to measure heart rate, heart rate variability (HRV), respiratory rate, and oxygen saturation (SpO2). In PhysioKit, we provide a variety of routines for processing PPG signals.

physiokit.ppg.clean

clean(data, lowcut=0.5, highcut=4, sample_rate=1000, order=3, axis=-1, forward_backward=True)

Clean PPG signal using biquad filter.

By default applies a 3rd order Butterworth filter between 0.5 and 4 Hz.

Parameters:

  • data (NDArray) –

    Signal

  • lowcut (float, default: 0.5 ) –

    Lower cutoff in Hz. Defaults to 0.5 Hz.

  • highcut (float, default: 4 ) –

    Upper cutoff in Hz. Defaults to 4 Hz.

  • sample_rate (float, default: 1000 ) –

    Sample rate in Hz. Defaults to 1000 Hz.

  • order (int, default: 3 ) –

    Filter order. Defaults to 3 (3rd order Butterworth filter).

  • axis (int, default: -1 ) –

    Axis to apply against. Defaults to -1.

  • forward_backward (bool, default: True ) –

    Apply filter forward and backward. Defaults to True.

Returns:

  • NDArray

    npt.NDArray: Cleaned signal

Source code in physiokit/ppg/clean.py
def clean(
    data: npt.NDArray,
    lowcut: float = 0.5,
    highcut: float = 4,
    sample_rate: float = 1000,
    order: int = 3,
    axis: int = -1,
    forward_backward: bool = True,
) -> npt.NDArray:
    """Clean PPG signal using biquad filter.

    By default applies a 3rd order Butterworth filter between 0.5 and 4 Hz.

    Args:
        data (npt.NDArray): Signal
        lowcut (float, optional): Lower cutoff in Hz. Defaults to 0.5 Hz.
        highcut (float, optional): Upper cutoff in Hz. Defaults to 4 Hz.
        sample_rate (float, optional): Sample rate in Hz. Defaults to 1000 Hz.
        order (int, optional): Filter order. Defaults to 3 (3rd order Butterworth filter).
        axis (int, optional): Axis to apply against. Defaults to -1.
        forward_backward (bool, optional): Apply filter forward and backward. Defaults to True.

    Returns:
        npt.NDArray: Cleaned signal
    """

    return filter_signal(
        data=data,
        lowcut=lowcut,
        highcut=highcut,
        sample_rate=sample_rate,
        order=order,
        axis=axis,
        forward_backward=forward_backward,
    )

physiokit.ppg.metrics

compute_heart_rate(data, sample_rate=1000, method='fft', **kwargs)

Compute heart rate in BPM from PPG signal.

Parameters:

  • data (array) –

    PPG signal.

  • sample_rate (float, default: 1000 ) –

    Sampling rate in Hz. Defaults to 1000 Hz.

  • method (str, default: 'fft' ) –

    Method to compute heart rate. Defaults to 'fft'.

  • **kwargs (dict, default: {} ) –

    Keyword arguments to pass to method.

Returns:

Source code in physiokit/ppg/metrics.py
def compute_heart_rate(
    data: npt.NDArray, sample_rate: float = 1000, method: str = "fft", **kwargs: dict
) -> tuple[float, float]:
    """Compute heart rate in BPM from PPG signal.

    Args:
        data (array): PPG signal.
        sample_rate (float, optional): Sampling rate in Hz. Defaults to 1000 Hz.
        method (str, optional): Method to compute heart rate. Defaults to 'fft'.
        **kwargs (dict): Keyword arguments to pass to method.

    Returns:
        float: Heart rate in BPM.
    """
    match method:
        case "fft":
            bpm, qos = compute_heart_rate_from_fft(data=data, sample_rate=sample_rate, **kwargs)
        case "peak":
            bpm, qos = compute_heart_rate_from_peaks(data=data, sample_rate=sample_rate, **kwargs)
        case _:
            raise NotImplementedError(f"Heart rate computation method {method} not implemented.")
    # END MATCH
    return bpm, qos

compute_heart_rate_from_fft(data, sample_rate=1000, lowcut=0.5, highcut=4.0)

Compute heart rate from FFT of PPG signal.

Parameters:

  • data (array) –

    PPG signal.

  • sample_rate (float, default: 1000 ) –

    Sampling rate in Hz. Defaults to 1000 Hz.

  • lowcut (float, default: 0.5 ) –

    Lowcut frequency in Hz. Defaults to 0.5 Hz.

  • highcut (float, default: 4.0 ) –

    Highcut frequency in Hz. Defaults to 4.0 Hz.

Returns:

  • tuple[float, float]

    tuple[float, float]: Heart rate (BPM) and qos metric.

Source code in physiokit/ppg/metrics.py
def compute_heart_rate_from_fft(
    data: npt.NDArray, sample_rate: float = 1000, lowcut: float = 0.5, highcut: float = 4.0
) -> tuple[float, float]:
    """Compute heart rate from FFT of PPG signal.

    Args:
        data (array): PPG signal.
        sample_rate (float, optional): Sampling rate in Hz. Defaults to 1000 Hz.
        lowcut (float, optional): Lowcut frequency in Hz. Defaults to 0.5 Hz.
        highcut (float, optional): Highcut frequency in Hz. Defaults to 4.0 Hz.

    Returns:
        tuple[float, float]: Heart rate (BPM) and qos metric.
    """
    freqs, sp = compute_fft(data, sample_rate)
    l_idx = np.where(freqs >= lowcut)[0][0]
    r_idx = np.where(freqs >= highcut)[0][0]
    freqs = freqs[l_idx:r_idx]
    ps = 2 * np.abs(sp[l_idx:r_idx])
    fft_pk_idx = np.argmax(ps)
    bpm = 60 * freqs[fft_pk_idx]
    qos = ps[fft_pk_idx] / np.sum(ps)
    return bpm, qos

compute_heart_rate_from_peaks(data, sample_rate=1000, min_rr=0.3, max_rr=2.0, min_delta=0.3)

Compute heart rate from peaks of PPG signal.

Parameters:

  • data (array) –

    PPG signal.

  • sample_rate (float, default: 1000 ) –

    Sampling rate in Hz. Defaults to 1000 Hz.

  • min_rr (float, default: 0.3 ) –

    Minimum RR interval in seconds. Defaults to 0.3 s.

  • max_rr (float, default: 2.0 ) –

    Maximum RR interval in seconds. Defaults to 2.0 s.

  • min_delta (float, default: 0.3 ) –

    Minimum RR interval delta. Defaults to 0.3.

Returns:

  • tuple[float, float]

    tuple[float, float]: Heart rate (BPM) and qos metric.

Source code in physiokit/ppg/metrics.py
def compute_heart_rate_from_peaks(
    data: npt.NDArray, sample_rate: float = 1000, min_rr: float = 0.3, max_rr: float = 2.0, min_delta: float = 0.3
) -> tuple[float, float]:
    """Compute heart rate from peaks of PPG signal.

    Args:
        data (array): PPG signal.
        sample_rate (float, optional): Sampling rate in Hz. Defaults to 1000 Hz.
        min_rr (float, optional): Minimum RR interval in seconds. Defaults to 0.3 s.
        max_rr (float, optional): Maximum RR interval in seconds. Defaults to 2.0 s.
        min_delta (float, optional): Minimum RR interval delta. Defaults to 0.3.

    Returns:
        tuple[float, float]: Heart rate (BPM) and qos metric.
    """
    peaks = find_peaks(data=data, sample_rate=sample_rate)
    rri = compute_rr_intervals(peaks=peaks)
    rmask = filter_rr_intervals(rr_ints=rri, sample_rate=sample_rate, min_rr=min_rr, max_rr=max_rr, min_delta=min_delta)
    bpm = 60 / (np.nanmean(rri[rmask == 0]) / sample_rate)
    qos = rmask[rmask == 0].size / rmask.size
    return bpm, qos

compute_spo2_from_perfusion(dc1, ac1, dc2, ac2, coefs=(1, 0, 0))

Compute SpO2 from ratio of perfusion indexes (AC/DC).

Device Coefficients
  • MAX30101: [1.5958422, -34.6596622, 112.6898759]
  • MAX8614X: [-16.666666, 8.333333, 100]

Parameters:

  • dc1 (float) –

    DC component of 1st PPG signal (e.g RED).

  • ac1 (float) –

    AC component of 1st PPG signal (e.g RED).

  • dc2 (float) –

    DC component of 2nd PPG signal (e.g. IR).

  • ac2 (float) –

    AC component of 2nd PPG signal (e.g. IR).

  • coefs (tuple[float, float, float], default: (1, 0, 0) ) –

    Calibration coefficients. Defaults to (1, 0, 0).

Returns:

  • float ( float ) –

    SpO2 value clipped to [50, 100].

Source code in physiokit/ppg/metrics.py
def compute_spo2_from_perfusion(
    dc1: float, ac1: float, dc2: float, ac2: float, coefs: tuple[float, float, float] = (1, 0, 0)
) -> float:
    """Compute SpO2 from ratio of perfusion indexes (AC/DC).

    Device Coefficients:
        * MAX30101: [1.5958422, -34.6596622, 112.6898759]
        * MAX8614X: [-16.666666, 8.333333, 100]

    Args:
        dc1 (float): DC component of 1st PPG signal (e.g RED).
        ac1 (float): AC component of 1st PPG signal (e.g RED).
        dc2 (float): DC component of 2nd PPG signal (e.g. IR).
        ac2 (float): AC component of 2nd PPG signal (e.g. IR).
        coefs (tuple[float, float, float], optional): Calibration coefficients. Defaults to (1, 0, 0).

    Returns:
        float: SpO2 value clipped to [50, 100].
    """
    r = (ac1 / dc1) / (ac2 / dc2)
    spo2 = coefs[0] * r**2 + coefs[1] * r + coefs[2]
    return max(min(spo2, 100), 50)

compute_spo2_in_frequency(ppg1, ppg2, coefs=(1, 0, 0), sample_rate=1000, lowcut=0.5, highcut=4.0, order=3)

Compute SpO2 from PPG signals in frequency domain.

Parameters:

  • ppg1 (array) –

    1st PPG signal (e.g RED).

  • ppg2 (array) –

    2nd PPG signal (e.g. IR).

  • coefs (tuple[float, float, float], default: (1, 0, 0) ) –

    Calibration coefficients. Defaults to (1, 0, 0).

  • sample_rate (float, default: 1000 ) –

    Sampling rate in Hz. Defaults to 1000 Hz.

  • lowcut (float, default: 0.5 ) –

    Lowcut frequency in Hz. Defaults to 0.5 Hz.

  • highcut (float, default: 4.0 ) –

    Highcut frequency in Hz. Defaults to 4.0 Hz.

  • order (int, default: 3 ) –

    Order of filter. Defaults to 3.

Returns:

  • float ( float ) –

    SpO2 value

Source code in physiokit/ppg/metrics.py
def compute_spo2_in_frequency(
    ppg1: npt.NDArray,
    ppg2: npt.NDArray,
    coefs: tuple[float, float, float] = (1, 0, 0),
    sample_rate: float = 1000,
    lowcut: float = 0.5,
    highcut: float = 4.0,
    order: int = 3,
) -> float:
    """Compute SpO2 from PPG signals in frequency domain.

    Args:
        ppg1 (array): 1st PPG signal (e.g RED).
        ppg2 (array): 2nd PPG signal (e.g. IR).
        coefs (tuple[float, float, float], optional): Calibration coefficients. Defaults to (1, 0, 0).
        sample_rate (float, optional): Sampling rate in Hz. Defaults to 1000 Hz.
        lowcut (float, optional): Lowcut frequency in Hz. Defaults to 0.5 Hz.
        highcut (float, optional): Highcut frequency in Hz. Defaults to 4.0 Hz.
        order (int, optional): Order of filter. Defaults to 3.

    Returns:
        float: SpO2 value
    """

    # Compute DC
    ppg1_dc = np.mean(ppg1)
    ppg2_dc = np.mean(ppg2)

    # Bandpass filter
    ppg1_clean = filter_signal(
        data=ppg1, lowcut=lowcut, highcut=highcut, sample_rate=sample_rate, order=order, forward_backward=True
    )
    ppg2_clean = filter_signal(
        data=ppg2, lowcut=lowcut, highcut=highcut, sample_rate=sample_rate, order=order, forward_backward=True
    )

    # Compute AC via FFT
    freqs, ppg1_fft = compute_fft(ppg1_clean, sample_rate=sample_rate)
    freqs, ppg2_fft = compute_fft(ppg2_clean, sample_rate=sample_rate)

    l_idx = np.where(freqs >= lowcut)[0][0]
    r_idx = np.where(freqs >= highcut)[0][0]

    freqs = freqs[l_idx:r_idx]
    ppg1_ps = 2 * np.abs(ppg1_fft[l_idx:r_idx])
    ppg2_ps = 2 * np.abs(ppg2_fft[l_idx:r_idx])

    # Find peak
    fft_pk_idx = np.argmax(ppg1_ps + ppg2_ps)

    # Compute AC
    ppg1_ac = ppg1_ps[fft_pk_idx]
    ppg2_ac = ppg2_ps[fft_pk_idx]

    spo2 = compute_spo2_from_perfusion(dc1=ppg1_dc, ac1=ppg1_ac, dc2=ppg2_dc, ac2=ppg2_ac, coefs=coefs)

    return spo2

compute_spo2_in_time(ppg1, ppg2, coefs=(1, 0, 0), sample_rate=1000, lowcut=0.5, highcut=4, order=3)

Compute SpO2 from PPG signals in time domain.

Parameters:

  • ppg1 (array) –

    1st PPG signal (e.g RED).

  • ppg2 (array) –

    2nd PPG signal (e.g. IR).

  • coefs (tuple[float, float, float], default: (1, 0, 0) ) –

    Calibration coefficients. Defaults to (1, 0, 0).

  • sample_rate (float, default: 1000 ) –

    Sampling rate in Hz. Defaults to 1000 Hz.

  • lowcut (float, default: 0.5 ) –

    Lowcut frequency in Hz. Defaults to 0.5 Hz.

  • highcut (float, default: 4 ) –

    Highcut frequency in Hz. Defaults to 4.0 Hz.

  • order (int, default: 3 ) –

    Order of filter. Defaults to 3.

Returns:

  • float ( float ) –

    SpO2 value

Source code in physiokit/ppg/metrics.py
def compute_spo2_in_time(
    ppg1: npt.NDArray,
    ppg2: npt.NDArray,
    coefs: tuple[float, float, float] = (1, 0, 0),
    sample_rate: float = 1000,
    lowcut: float = 0.5,
    highcut: float = 4,
    order: int = 3,
) -> float:
    """Compute SpO2 from PPG signals in time domain.

    Args:
        ppg1 (array): 1st PPG signal (e.g RED).
        ppg2 (array): 2nd PPG signal (e.g. IR).
        coefs (tuple[float, float, float], optional): Calibration coefficients. Defaults to (1, 0, 0).
        sample_rate (float, optional): Sampling rate in Hz. Defaults to 1000 Hz.
        lowcut (float, optional): Lowcut frequency in Hz. Defaults to 0.5 Hz.
        highcut (float, optional): Highcut frequency in Hz. Defaults to 4.0 Hz.
        order (int, optional): Order of filter. Defaults to 3.

    Returns:
        float: SpO2 value
    """

    # Compute DC
    ppg1_dc = np.mean(ppg1)
    ppg2_dc = np.mean(ppg2)

    # Bandpass filter
    ppg1_clean = filter_signal(
        data=ppg1, lowcut=lowcut, highcut=highcut, sample_rate=sample_rate, order=order, forward_backward=True
    )

    ppg2_clean = filter_signal(
        data=ppg2, lowcut=lowcut, highcut=highcut, sample_rate=sample_rate, order=order, forward_backward=True
    )

    # Compute AC via RMS
    ppg1_ac = np.sqrt(np.mean(ppg1_clean**2))
    ppg2_ac = np.sqrt(np.mean(ppg2_clean**2))

    spo2 = compute_spo2_from_perfusion(dc1=ppg1_dc, ac1=ppg1_ac, dc2=ppg2_dc, ac2=ppg2_ac, coefs=coefs)
    return spo2

derive_respiratory_rate(ppg, peaks, troughs=None, rri=None, sample_rate=1000, method='rifv', lowcut=0.1, highcut=1.0, order=3, threshold=0.85, interpolate_method='linear')

Derive respiratory rate from PPG signal using given method.

Parameters:

  • ppg (array) –

    PPG signal.

  • peaks (array) –

    Peaks of PPG signal.

  • troughs (array, default: None ) –

    Troughs of PPG signal. Defaults to None.

  • rri (array, default: None ) –

    Respiratory interval. Defaults to None.

  • sample_rate (float, default: 1000 ) –

    Sampling rate in Hz. Defaults to 1000 Hz.

  • method (str, default: 'rifv' ) –

    Method to compute respiratory rate. Defaults to 'riav'.

  • lowcut (float, default: 0.1 ) –

    Lowcut frequency in Hz. Defaults to 0.1 Hz.

  • highcut (float, default: 1.0 ) –

    Highcut frequency in Hz. Defaults to 1.0 Hz.

  • order (int, default: 3 ) –

    Order of filter. Defaults to 3.

  • threshold (float, default: 0.85 ) –

    Threshold for peak detection. Defaults to 0.85.

  • interpolate_method (str, default: 'linear' ) –

    Interpolation method. Defaults to 'linear'.

Returns:

  • tuple[float, float]

    tuple[float, float]: Respiratory rate (BPM) and qos metric.

Source code in physiokit/ppg/metrics.py
def derive_respiratory_rate(
    ppg: npt.NDArray,
    peaks: npt.NDArray,
    troughs: npt.NDArray | None = None,
    rri: npt.NDArray | None = None,
    sample_rate: float = 1000,
    method: Literal["riav", "riiv", "rifv"] = "rifv",
    lowcut: float = 0.1,
    highcut: float = 1.0,
    order: int = 3,
    threshold: float | None = 0.85,
    interpolate_method: str = "linear",
) -> tuple[float, float]:
    """Derive respiratory rate from PPG signal using given method.

    Args:
        ppg (array): PPG signal.
        peaks (array): Peaks of PPG signal.
        troughs (array, optional): Troughs of PPG signal. Defaults to None.
        rri (array, optional): Respiratory interval. Defaults to None.
        sample_rate (float, optional): Sampling rate in Hz. Defaults to 1000 Hz.
        method (str, optional): Method to compute respiratory rate. Defaults to 'riav'.
        lowcut (float, optional): Lowcut frequency in Hz. Defaults to 0.1 Hz.
        highcut (float, optional): Highcut frequency in Hz. Defaults to 1.0 Hz.
        order (int, optional): Order of filter. Defaults to 3.
        threshold (float, optional): Threshold for peak detection. Defaults to 0.85.
        interpolate_method (str, optional): Interpolation method. Defaults to 'linear'.

    Returns:
        tuple[float, float]: Respiratory rate (BPM) and qos metric.
    """
    if peaks.size < 4:
        raise ValueError("At least 4 peaks are required to compute respiratory rate")

    ts = np.arange(peaks[0], peaks[-1], 1)
    match method:
        case "riav":
            rsp = ppg[peaks] - ppg[troughs]
        case "riiv":
            rsp = ppg[peaks]
        case "rifv":
            rsp = rri
        case _:
            raise ValueError(f"Method {method} not implemented")
    rsp = scipy.interpolate.interp1d(peaks, rsp, kind=interpolate_method, fill_value="extrapolate")(ts)
    rsp = filter_signal(rsp, lowcut=lowcut, highcut=highcut, sample_rate=sample_rate, order=order)

    freqs, rsp_sp = compute_fft(rsp, sample_rate=sample_rate)
    l_idx = np.where(freqs >= lowcut)[0][0]
    r_idx = np.where(freqs >= highcut)[0][0]
    rsp_ps = 2 * np.abs(rsp_sp)
    freqs = freqs[l_idx:r_idx]
    rsp_ps = rsp_ps[l_idx:r_idx]

    fft_pk_idx = np.argmax(rsp_ps)
    if threshold is not None:
        fft_pk_indices = np.where(rsp_ps > threshold * rsp_ps[fft_pk_idx])[0]
    else:
        fft_pk_indices = [fft_pk_idx]

    rsp_bpm_weights = rsp_ps[fft_pk_indices]
    tgt_pwr = np.sum(rsp_bpm_weights)
    qos = tgt_pwr / np.sum(rsp_ps)
    rsp_bpm = 60 * np.sum(rsp_bpm_weights * freqs[fft_pk_indices]) / tgt_pwr
    return rsp_bpm, qos

physiokit.ppg.peaks

compute_rr_intervals(peaks)

Compute RR intervals from R peaks.

Parameters:

  • peaks (array) –

    R peaks.

Returns:

  • NDArray

    npt.NDArray: RR intervals.

Source code in physiokit/ppg/peaks.py
def compute_rr_intervals(
    peaks: npt.NDArray,
) -> npt.NDArray:
    """Compute RR intervals from R peaks.

    Args:
        peaks (array): R peaks.

    Returns:
        npt.NDArray: RR intervals.
    """

    rr_ints = np.diff(peaks)
    if rr_ints.size == 0:
        return rr_ints
    rr_ints = np.hstack((rr_ints[0], rr_ints))
    return rr_ints

filter_peaks(peaks, sample_rate=1000, min_rr=0.3, max_rr=2.0, min_delta=0.3)

Filter out peaks with RR intervals outside of normal range.

Parameters:

  • peaks (array) –

    Systolic peaks.

  • sample_rate (float, default: 1000 ) –

    Sampling rate in Hz. Defaults to 1000 Hz.

  • min_rr (float, default: 0.3 ) –

    Minimum RR interval in seconds. Defaults to 0.3 s.

  • max_rr (float, default: 2.0 ) –

    Maximum RR interval in seconds. Defaults to 2.0 s.

  • min_delta (float, default: 0.3 ) –

    Minimum RR interval delta. Defaults to 0.3.

Returns:

  • NDArray

    npt.NDArray: Filtered peaks.

Source code in physiokit/ppg/peaks.py
def filter_peaks(
    peaks: npt.NDArray,
    sample_rate: float = 1000,
    min_rr: float = 0.3,
    max_rr: float = 2.0,
    min_delta: float | None = 0.3,
) -> npt.NDArray:
    """Filter out peaks with RR intervals outside of normal range.

    Args:
        peaks (array): Systolic peaks.
        sample_rate (float, optional): Sampling rate in Hz. Defaults to 1000 Hz.
        min_rr (float, optional): Minimum RR interval in seconds. Defaults to 0.3 s.
        max_rr (float, optional): Maximum RR interval in seconds. Defaults to 2.0 s.
        min_delta (float, optional): Minimum RR interval delta. Defaults to 0.3.

    Returns:
        npt.NDArray: Filtered peaks.
    """
    if peaks.size <= 1:
        return peaks
    # Capture RR intervals
    rr_ints = np.diff(peaks)
    rr_ints = np.hstack((rr_ints[0], rr_ints))

    # Filter out peaks with RR intervals outside of normal range
    rr_mask = np.where((rr_ints < min_rr * sample_rate) | (rr_ints > max_rr * sample_rate), 1, 0)

    # Filter out peaks that deviate more than delta
    if min_delta is not None:
        rr_mask = quotient_filter_mask(rr_ints, mask=rr_mask, lowcut=1 - min_delta, highcut=1 + min_delta)
    filt_peaks = peaks[np.where(rr_mask == 0)[0]]
    return filt_peaks

filter_rr_intervals(rr_ints, sample_rate=1000, min_rr=0.3, max_rr=2.0, min_delta=0.3)

Filter out peaks with RR intervals outside of normal range.

Parameters:

  • rr_ints (array) –

    RR intervals.

  • sample_rate (float, default: 1000 ) –

    Sampling rate in Hz. Defaults to 1000 Hz.

  • min_rr (float, default: 0.3 ) –

    Minimum RR interval in seconds. Defaults to 0.3 s.

  • max_rr (float, default: 2.0 ) –

    Maximum RR interval in seconds. Defaults to 2.0 s.

  • min_delta (float, default: 0.3 ) –

    Minimum RR interval delta. Defaults to 0.3.

Returns:

  • NDArray

    npt.NDArray: Filtered RR intervals.

Source code in physiokit/ppg/peaks.py
def filter_rr_intervals(
    rr_ints: npt.NDArray, sample_rate: float = 1000, min_rr: float = 0.3, max_rr: float = 2.0, min_delta: float = 0.3
) -> npt.NDArray:
    """Filter out peaks with RR intervals outside of normal range.

    Args:
        rr_ints (array): RR intervals.
        sample_rate (float, optional): Sampling rate in Hz. Defaults to 1000 Hz.
        min_rr (float, optional): Minimum RR interval in seconds. Defaults to 0.3 s.
        max_rr (float, optional): Maximum RR interval in seconds. Defaults to 2.0 s.
        min_delta (float, optional): Minimum RR interval delta. Defaults to 0.3.

    Returns:
        npt.NDArray: Filtered RR intervals.
    """
    if rr_ints.size == 0:
        return np.array([])

    # Filter out peaks with RR intervals outside of normal range
    rr_mask = np.where((rr_ints < min_rr * sample_rate) | (rr_ints > max_rr * sample_rate), 1, 0)

    # Filter out peaks that deviate more than delta
    rr_mask = quotient_filter_mask(rr_ints, mask=rr_mask, lowcut=1 - min_delta, highcut=1 + min_delta)

    return rr_mask

find_peaks(data, sample_rate=1000, peak_window=0.111, beat_window=0.667, beat_offset=0.02, peak_delay=0.3)

Find systolic peaks in PPG signal.

Implementation based on Elgendi M, Norton I, Brearley M, Abbott D, Schuurmans D (2013) Systolic Peak Detection in Acceleration Photoplethysmograms Measured from Emergency Responders in Tropical Conditions. PLoS ONE 8(10): e76585. doi:10.1371/journal.pone.0076585. Assumes input data is bandpass filtered with a lowcut of .5 Hz and a highcut of 8 Hz.

Parameters:

  • data (array) –

    PPG signal.

  • sample_rate (float, default: 1000 ) –

    Sampling rate in Hz. Defaults to 1000 Hz.

  • peak_window (float, default: 0.111 ) –

    Peak window in seconds. Defaults to 0.111 s.

  • beat_window (float, default: 0.667 ) –

    Beat window in seconds. Defaults to 0.667 s.

  • beat_offset (float, default: 0.02 ) –

    Beat offset in seconds. Defaults to 0.02 s.

  • peak_delay (float, default: 0.3 ) –

    Peak delay in seconds. Defaults to 0.3 s.

Returns:

  • NDArray

    npt.NDArray: Peak locations.

Source code in physiokit/ppg/peaks.py
def find_peaks(
    data: npt.NDArray,
    sample_rate: float = 1000,
    peak_window: float = 0.111,
    beat_window: float = 0.667,
    beat_offset: float = 0.02,
    peak_delay: float = 0.3,
) -> npt.NDArray:
    """Find systolic peaks in PPG signal.

    Implementation based on Elgendi M, Norton I, Brearley M, Abbott D, Schuurmans D (2013) Systolic Peak Detection in
    Acceleration Photoplethysmograms Measured from Emergency Responders in Tropical Conditions. PLoS ONE 8(10): e76585.
    doi:10.1371/journal.pone.0076585.
    Assumes input data is bandpass filtered with a lowcut of .5 Hz and a highcut of 8 Hz.

    Args:
        data (array): PPG signal.
        sample_rate (float, optional): Sampling rate in Hz. Defaults to 1000 Hz.
        peak_window (float, optional): Peak window in seconds. Defaults to 0.111 s.
        beat_window (float, optional): Beat window in seconds. Defaults to 0.667 s.
        beat_offset (float, optional): Beat offset in seconds. Defaults to 0.02 s.
        peak_delay (float, optional): Peak delay in seconds. Defaults to 0.3 s.

    Returns:
        npt.NDArray: Peak locations.
    """

    # Clip negative values and square the signal
    sqrd = np.where(data > 0, data**2, 0)

    # Apply 1st moving average filter
    ma_peak_kernel = int(np.rint(peak_window * sample_rate))
    ma_peak = spn.uniform_filter1d(sqrd, ma_peak_kernel, mode="nearest")

    # Apply 2nd moving average filter
    ma_beat_kernel = int(np.rint(beat_window * sample_rate))
    ma_beat = spn.uniform_filter1d(sqrd, ma_beat_kernel, mode="nearest")

    # Thresholds
    min_height = ma_beat + beat_offset * np.mean(sqrd)
    min_width = int(np.rint(peak_window * sample_rate))
    min_delay = int(np.rint(peak_delay * sample_rate))

    # Identify wave boundaries
    waves = ma_peak > min_height
    beg_waves = np.where(np.logical_and(np.logical_not(waves[0:-1]), waves[1:]))[0]
    end_waves = np.where(np.logical_and(waves[0:-1], np.logical_not(waves[1:])))[0]
    end_waves = end_waves[end_waves > beg_waves[0]]

    # Identify systolic peaks
    peaks = []
    for i in range(min(beg_waves.size, end_waves.size)):
        beg, end = beg_waves[i], end_waves[i]
        peak = beg + np.argmax(data[beg:end])
        peak_width = end - beg
        peak_delay = peak - peaks[-1] if peaks else min_delay

        # Enforce minimum length and delay between peaks
        if (peak_width < min_width) or (peak_delay < min_delay):
            continue
        peaks.append(peak)
    # END FOR

    return np.array(peaks, dtype=int)

physiokit.ppg.synthesize

synthesize(signal_length=10000, sample_rate=1000, heart_rate=60, frequency_modulation=0.3, ibi_randomness=0.1)

Generate synthetic PPG signal. Utilize pk.signal.noise methods to make more realistic.

Parameters:

  • signal_length (int, default: 10000 ) –

    Length of signal in samples. Defaults to 10000.

  • sample_rate (float, default: 1000 ) –

    Sample rate in Hz. Defaults to 1000 Hz.

  • heart_rate (float, default: 60 ) –

    Heart rate in BPM. Defaults to 60 BPM.

  • frequency_modulation (float, default: 0.3 ) –

    Frequency modulation strength [0,1]. Defaults to 0.3.

  • ibi_randomness (float, default: 0.1 ) –

    IBI randomness in range [0,1]. Defaults to 0.1.

Returns:

  • tuple[NDArray, NDArray, NDArray]

    npt.NDArray: Synthetic PPG, segmentation mask, fiducial mask

Source code in physiokit/ppg/synthesize.py
def synthesize(
    signal_length: int = 10000,
    sample_rate: float = 1000,
    heart_rate: float = 60,
    frequency_modulation: float = 0.3,
    ibi_randomness: float = 0.1,
) -> tuple[npt.NDArray, npt.NDArray, npt.NDArray]:
    """Generate synthetic PPG signal. Utilize pk.signal.noise methods to make more realistic.

    Args:
        signal_length (int, optional): Length of signal in samples. Defaults to 10000.
        sample_rate (float, optional): Sample rate in Hz. Defaults to 1000 Hz.
        heart_rate (float, optional): Heart rate in BPM. Defaults to 60 BPM.
        frequency_modulation (float, optional): Frequency modulation strength [0,1]. Defaults to 0.3.
        ibi_randomness (float, optional): IBI randomness in range [0,1]. Defaults to 0.1.

    Returns:
        npt.NDArray: Synthetic PPG, segmentation mask, fiducial mask
    """
    duration = signal_length / sample_rate
    period = 60 / heart_rate  # in seconds
    n_period = int(np.rint(duration / period) + 1)
    periods = np.ones(n_period) * period

    # Mark onset of each wave in seconds
    x_onset = np.cumsum(periods)
    x_onset -= x_onset[0]  # make sure seconds start at zero

    # Add respiratory sinus arrythmia (RSA)
    periods, x_onset = _frequency_modulation(
        periods,
        x_onset,
        modulation_frequency=0.05,
        modulation_strength=frequency_modulation,
    )

    # Modulate onset of each wave randomly ~[0, ibi_randomness]
    x_onset = _random_x_offset(x_onset, ibi_randomness)
    y_onset = np.random.normal(0, 0.1, n_period)

    # Create systolic peaks within the waves in seconds
    x_sys = x_onset + np.random.normal(0.175, 0.01, n_period) * periods
    y_sys = y_onset + np.random.normal(1.5, 0.15, n_period)

    # Create dicrotic notches within the waves in seconds
    x_notch = x_onset + np.random.normal(0.4, 0.001, n_period) * periods
    y_notch = y_sys * np.random.normal(0.49, 0.01, n_period)

    # Create diastolic peaks within the waves in seconds
    x_dia = x_onset + np.random.normal(0.45, 0.001, n_period) * periods
    y_dia = y_sys * np.random.normal(0.51, 0.01, n_period)

    # Convert seconds to sample
    x_onset_n = np.ceil(x_onset * sample_rate).astype(int)
    x_sys_n = np.ceil(x_sys * sample_rate).astype(int)
    x_notch_n = np.ceil(x_notch * sample_rate).astype(int)
    x_dia_n = np.ceil(x_dia * sample_rate).astype(int)

    # Concatenate all landmarks and sort them
    x_all = np.concatenate((x_onset_n, x_sys_n, x_notch_n, x_dia_n))
    x_all.sort(kind="mergesort")

    y_all = np.zeros(n_period * 4)
    y_all[0::4] = y_onset
    y_all[1::4] = y_sys
    y_all[2::4] = y_notch
    y_all[3::4] = y_dia

    # Interpolate a continuous signal between the landmarks (i.e., Cartesian coordinates).
    samples = np.arange(int(np.ceil(duration * sample_rate)))

    # Create fiducial mask
    fids = np.zeros(len(samples), dtype=np.int32)
    fids[x_sys_n[x_sys_n < fids.size]] = PpgFiducial.systolic_peak
    fids[x_notch_n[x_notch_n < fids.size]] = PpgFiducial.dicrotic_notch
    fids[x_dia_n[x_dia_n < fids.size]] = PpgFiducial.diastolic_peak

    # Create segmentation mask
    x_sys_seg = np.concatenate((x_onset_n, x_dia_n - 1))
    x_sys_seg.sort(kind="mergesort")
    segs = np.full(len(samples), fill_value=PpgSegment.diastolic, dtype=np.int32)
    for i in range(len(x_sys_seg) // 2):
        segs[x_sys_seg[2 * i] : x_sys_seg[2 * i + 1]] = PpgSegment.systolic

    # Interpolate
    interp_function = scipy.interpolate.Akima1DInterpolator(x_all, y_all)
    ppg = interp_function(samples)

    ppg = ppg[:signal_length]
    segs = segs[:signal_length]
    fids = fids[:signal_length]

    return ppg, segs, fids