Skip to content

Electrocardiogram (ECG)

Electrocardiography (ECG) is a non-invasive technique used to measure the electrical activity of the heart. ECG signals are often used to measure heart rate, heart rate variability (HRV), and respiratory rate. In PhysioKit, we provide a variety of routines for processing ECG signals.

physiokit.ecg.defines

EcgFiducial

Bases: IntEnum

ECG fiducial labels

Source code in physiokit/ecg/defines.py
class EcgFiducial(IntEnum):
    """ECG fiducial labels"""

    p_wave = 8
    q_wave = 9
    q_trough = 10
    r_peak = 11
    rpr_peak = 12
    s_trough = 13
    j_point = 14
    qt_segment = 15  # end
    t_wave = 16  # end

EcgSegment

Bases: IntEnum

ECG Segment labels

Source code in physiokit/ecg/defines.py
class EcgSegment(IntEnum):
    """ECG Segment labels"""

    background = 0
    p_wave = 1
    qrs_complex = 2
    t_wave = 3
    u_wave = 4

    pr_segment = 11  # End of P-wave to start of QRS
    st_segment = 12  # End of QRS to start of T-wave
    tp_segment = 13  # End of T-wave to start of P-wave
    tp_overlap = 14  # T-on-P

physiokit.ecg.clean

ECG cleaning methods.

clean(data, lowcut=0.5, highcut=30, sample_rate=1000, order=3, axis=-1, forward_backward=True)

Clean ECG signal by applying biquad filter.

By default, a 3rd order Butterworth bandpass filter from 0.5 to 30 Hz is applied.

Parameters:

  • data (NDArray) –

    ECG signal.

  • lowcut (float, default: 0.5 ) –

    Lower cutoff in Hz. Defaults to 0.5 Hz.

  • highcut (float, default: 30 ) –

    Upper cutoff in Hz. Defaults to 30 Hz.

  • sample_rate (float, default: 1000 ) –

    Sampling rate in Hz. Defaults to 1000 Hz.

  • order (int, default: 3 ) –

    Filter order. Defaults to 3 (3rd order Butterworth filter).

  • axis (int, default: -1 ) –

    Axis to apply against. Defaults to -1.

  • forward_backward (bool, default: True ) –

    Apply filter forward and backward. Defaults to True.

Returns:

  • NDArray

    npt.NDArray: Cleaned ECG signal.

Source code in physiokit/ecg/clean.py
def clean(
    data: npt.NDArray,
    lowcut: float = 0.5,
    highcut: float = 30,
    sample_rate: float = 1000,
    order: int = 3,
    axis: int = -1,
    forward_backward: bool = True,
) -> npt.NDArray:
    """Clean ECG signal by applying biquad filter.

    By default, a 3rd order Butterworth bandpass filter from 0.5 to 30 Hz is applied.

    Args:
        data (npt.NDArray): ECG signal.
        lowcut (float, optional): Lower cutoff in Hz. Defaults to 0.5 Hz.
        highcut (float, optional): Upper cutoff in Hz. Defaults to 30 Hz.
        sample_rate (float, optional): Sampling rate in Hz. Defaults to 1000 Hz.
        order (int, optional): Filter order. Defaults to 3 (3rd order Butterworth filter).
        axis (int, optional): Axis to apply against. Defaults to -1.
        forward_backward (bool, optional): Apply filter forward and backward. Defaults to True.

    Returns:
        npt.NDArray: Cleaned ECG signal.
    """

    # Bandpass filter
    ecg_clean = filter_signal(
        data=data,
        lowcut=lowcut,
        highcut=highcut,
        sample_rate=sample_rate,
        order=order,
        axis=axis,
        forward_backward=forward_backward,
    )

    return ecg_clean

square_filter_mask(rr_ints, lowcut=300, highcut=900)

Mask out RR intervals that fall outside bounds.

Parameters:

  • rr_ints (NDArray) –

    RR-interval list in ms.

  • lowcut (float, default: 300 ) –

    Lower cutoff limit. Defaults to 300 ms.

  • highcut (float, default: 900 ) –

    Upper cutoff limit. Defaults to 900 ms.

Returns:

  • NDArray

    npt.NDArray: RR rejection mask 0=accept, 1=reject.

Source code in physiokit/ecg/clean.py
def square_filter_mask(rr_ints: npt.NDArray, lowcut: float = 300, highcut: float = 900) -> npt.NDArray:
    """Mask out RR intervals that fall outside bounds.

    Args:
        rr_ints (npt.NDArray): RR-interval list in ms.
        lowcut (float, optional): Lower cutoff limit. Defaults to 300 ms.
        highcut (float, optional): Upper cutoff limit. Defaults to 900 ms.

    Returns:
        npt.NDArray: RR rejection mask 0=accept, 1=reject.
    """
    rr_mask = np.where((rr_ints < lowcut) | (rr_ints > highcut), 1, 0)
    return rr_mask

physiokit.ecg.metrics

compute_heart_rate(data, sample_rate=1000, method='peak', **kwargs)

Compute heart rate from ECG signal.

Parameters:

  • data (array) –

    ECG signal.

  • sample_rate (float, default: 1000 ) –

    Sampling rate in Hz. Defaults to 1000 Hz.

  • method (str, default: 'peak' ) –

    Method to compute heart rate. Defaults to 'peak'.

  • **kwargs (dict, default: {} ) –

    Keyword arguments to pass to method.

Returns:

  • tuple[float, float]

    tuple[float, float]: Heart rate in BPM and qos metric.

Source code in physiokit/ecg/metrics.py
def compute_heart_rate(
    data: npt.NDArray, sample_rate: float = 1000, method: str = "peak", **kwargs: dict
) -> tuple[float, float]:
    """Compute heart rate from ECG signal.

    Args:
        data (array): ECG signal.
        sample_rate (float, optional): Sampling rate in Hz. Defaults to 1000 Hz.
        method (str, optional): Method to compute heart rate. Defaults to 'peak'.
        **kwargs (dict): Keyword arguments to pass to method.

    Returns:
        tuple[float, float]: Heart rate in BPM and qos metric.
    """
    match method:
        case "peak":
            bpm, qos = compute_heart_rate_from_peaks(data=data, sample_rate=sample_rate, **kwargs)
        case _:
            raise NotImplementedError(f"Heart rate computation method {method} not implemented.")
    # END MATH
    return bpm, qos

compute_heart_rate_from_peaks(data, sample_rate=1000, min_rr=0.3, max_rr=2.0, min_delta=0.3)

Compute heart rate from peaks of ECG signal.

Parameters:

  • data (array) –

    ECG signal.

  • sample_rate (float, default: 1000 ) –

    Sampling rate in Hz. Defaults to 1000 Hz.

Returns:

  • tuple[float, float]

    tuple[float, float]: Heart rate in BPM and qos metric.

Source code in physiokit/ecg/metrics.py
def compute_heart_rate_from_peaks(
    data: npt.NDArray,
    sample_rate: float = 1000,
    min_rr: float = 0.3,
    max_rr: float = 2.0,
    min_delta: float | None = 0.3,
) -> tuple[float, float]:
    """Compute heart rate from peaks of ECG signal.

    Args:
        data (array): ECG signal.
        sample_rate (float, optional): Sampling rate in Hz. Defaults to 1000 Hz.

    Returns:
        tuple[float, float]: Heart rate in BPM and qos metric.
    """
    peaks = find_peaks(data=data, sample_rate=sample_rate)
    rri = compute_rr_intervals(peaks=peaks)
    rmask = filter_rr_intervals(rr_ints=rri, sample_rate=sample_rate, min_rr=min_rr, max_rr=max_rr, min_delta=min_delta)
    bpm = 60 / (np.nanmean(rri[rmask == 0]) / sample_rate)
    qos = rmask[rmask == 0].size / rmask.size
    return bpm, qos

derive_respiratory_rate(peaks, rri=None, sample_rate=1000, method='rifv', lowcut=0.1, highcut=1.0, order=3, threshold=0.85, interpolate_method='linear')

Derive respiratory rate from ECG signal using given method.

Parameters:

  • peaks (array) –

    QRS peaks of ECG signal.

  • rri (array, default: None ) –

    RR intervals. Defaults to None.

  • sample_rate (float, default: 1000 ) –

    Sampling rate in Hz. Defaults to 1000 Hz.

  • method (str, default: 'rifv' ) –

    Method to compute respiratory rate. Defaults to 'riav'.

  • lowcut (float, default: 0.1 ) –

    Lowcut frequency in Hz. Defaults to 0.1 Hz.

  • highcut (float, default: 1.0 ) –

    Highcut frequency in Hz. Defaults to 1.0 Hz.

  • order (int, default: 3 ) –

    Order of filter. Defaults to 3.

  • threshold (float, default: 0.85 ) –

    Threshold for peak detection. Defaults to 0.85.

Returns:

  • tuple[float, float]

    tuple[float, float]: Respiratory rate in BPM and qos metric.

Source code in physiokit/ecg/metrics.py
def derive_respiratory_rate(
    peaks: npt.NDArray,
    rri: npt.NDArray | None = None,
    sample_rate: float = 1000,
    method: Literal["rifv"] = "rifv",
    lowcut: float = 0.1,
    highcut: float = 1.0,
    order: int = 3,
    threshold: float | None = 0.85,
    interpolate_method: str = "linear",
) -> tuple[float, float]:
    """Derive respiratory rate from ECG signal using given method.

    Args:
        peaks (array): QRS peaks of ECG signal.
        rri (array, optional): RR intervals. Defaults to None.
        sample_rate (float, optional): Sampling rate in Hz. Defaults to 1000 Hz.
        method (str, optional): Method to compute respiratory rate. Defaults to 'riav'.
        lowcut (float, optional): Lowcut frequency in Hz. Defaults to 0.1 Hz.
        highcut (float, optional): Highcut frequency in Hz. Defaults to 1.0 Hz.
        order (int, optional): Order of filter. Defaults to 3.
        threshold (float, optional): Threshold for peak detection. Defaults to 0.85.

    Returns:
        tuple[float, float]: Respiratory rate in BPM and qos metric.
    """
    if peaks.size < 4:
        raise ValueError("At least 4 peaks are required to compute respiratory rate")

    ts = np.arange(peaks[0], peaks[-1], 1)
    match method:
        case "rifv":
            rsp = rri
        case _:
            raise ValueError(f"Method {method} not implemented")
    rsp = scipy.interpolate.interp1d(peaks, rsp, kind=interpolate_method, fill_value="extrapolate")(ts)
    rsp = filter_signal(rsp, lowcut=lowcut, highcut=highcut, sample_rate=sample_rate, order=order)

    freqs, rsp_sp = compute_fft(rsp, sample_rate=sample_rate)
    l_idx = np.where(freqs >= lowcut)[0][0]
    r_idx = np.where(freqs >= highcut)[0][0]
    rsp_ps = 2 * np.abs(rsp_sp)
    freqs = freqs[l_idx:r_idx]
    rsp_ps = rsp_ps[l_idx:r_idx]

    fft_pk_idx = np.argmax(rsp_ps)
    if threshold is not None:
        fft_pk_indices = np.where(rsp_ps > threshold * rsp_ps[fft_pk_idx])[0]
    else:
        fft_pk_indices = [fft_pk_idx]

    rsp_bpm_weights = rsp_ps[fft_pk_indices]
    tgt_pwr = np.sum(rsp_bpm_weights)
    rsp_bpm = 60 * np.sum(rsp_bpm_weights * freqs[fft_pk_indices]) / tgt_pwr
    qos = tgt_pwr / np.mean(rsp_ps)

    return rsp_bpm, qos

physiokit.ecg.peaks

compute_rr_intervals(peaks)

Compute RR intervals from R peaks.

Parameters:

  • peaks (array) –

    R peaks.

Returns:

  • NDArray

    npt.NDArray: RR intervals.

Source code in physiokit/ecg/peaks.py
def compute_rr_intervals(
    peaks: npt.NDArray,
) -> npt.NDArray:
    """Compute RR intervals from R peaks.

    Args:
        peaks (array): R peaks.

    Returns:
        npt.NDArray: RR intervals.
    """

    rr_ints = np.diff(peaks)
    if rr_ints.size == 0:
        return rr_ints
    rr_ints = np.hstack((rr_ints[0], rr_ints))
    return rr_ints

filter_peaks(peaks, sample_rate=1000, min_rr=0.3, max_rr=2.0, min_delta=0.3)

Filter out peaks with RR intervals outside of normal range.

Parameters:

  • peaks (array) –

    R peaks.

  • sample_rate (float, default: 1000 ) –

    Sampling rate in Hz. Defaults to 1000 Hz.

  • min_rr (float, default: 0.3 ) –

    Minimum RR interval in seconds. Defaults to 0.3 s.

  • max_rr (float, default: 2.0 ) –

    Maximum RR interval in seconds. Defaults to 2.0 s.

  • min_delta (float, default: 0.3 ) –

    Minimum RR interval delta. Defaults to 0.3.

Returns:

  • NDArray

    npt.NDArray: Filtered peaks.

Source code in physiokit/ecg/peaks.py
def filter_peaks(
    peaks: npt.NDArray,
    sample_rate: float = 1000,
    min_rr: float = 0.3,
    max_rr: float = 2.0,
    min_delta: float | None = 0.3,
) -> npt.NDArray:
    """Filter out peaks with RR intervals outside of normal range.

    Args:
        peaks (array): R peaks.
        sample_rate (float, optional): Sampling rate in Hz. Defaults to 1000 Hz.
        min_rr (float, optional): Minimum RR interval in seconds. Defaults to 0.3 s.
        max_rr (float, optional): Maximum RR interval in seconds. Defaults to 2.0 s.
        min_delta (float, optional): Minimum RR interval delta. Defaults to 0.3.

    Returns:
        npt.NDArray: Filtered peaks.
    """

    # Capture RR intervals
    rr_ints = np.diff(peaks)
    rr_ints = np.hstack((rr_ints[0], rr_ints))

    # Filter out peaks with RR intervals outside of normal range
    rr_mask = np.where((rr_ints < min_rr * sample_rate) | (rr_ints > max_rr * sample_rate), 1, 0)

    # Filter out peaks that deviate more than delta
    if min_delta is not None:
        rr_mask = quotient_filter_mask(rr_ints, mask=rr_mask, lowcut=1 - min_delta, highcut=1 + min_delta)
    filt_peaks = peaks[np.where(rr_mask == 0)[0]]
    return filt_peaks

filter_rr_intervals(rr_ints, sample_rate=1000, min_rr=0.3, max_rr=2.0, min_delta=0.3)

Filter out peaks with RR intervals outside of normal range.

Parameters:

  • rr_ints (array) –

    RR intervals.

  • sample_rate (float, default: 1000 ) –

    Sampling rate in Hz. Defaults to 1000 Hz.

  • min_rr (float, default: 0.3 ) –

    Minimum RR interval in seconds. Defaults to 0.3 s.

  • max_rr (float, default: 2.0 ) –

    Maximum RR interval in seconds. Defaults to 2.0 s.

  • min_delta (float, default: 0.3 ) –

    Minimum RR interval delta. Defaults to 0.3.

Returns:

  • NDArray

    npt.NDArray: RR interval mask.

Source code in physiokit/ecg/peaks.py
def filter_rr_intervals(
    rr_ints: npt.NDArray,
    sample_rate: float = 1000,
    min_rr: float = 0.3,
    max_rr: float = 2.0,
    min_delta: float | None = 0.3,
) -> npt.NDArray:
    """Filter out peaks with RR intervals outside of normal range.

    Args:
        rr_ints (array): RR intervals.
        sample_rate (float, optional): Sampling rate in Hz. Defaults to 1000 Hz.
        min_rr (float, optional): Minimum RR interval in seconds. Defaults to 0.3 s.
        max_rr (float, optional): Maximum RR interval in seconds. Defaults to 2.0 s.
        min_delta (float, optional): Minimum RR interval delta. Defaults to 0.3.

    Returns:
        npt.NDArray: RR interval mask.
    """
    if rr_ints.size == 0:
        return np.array([])

    # Filter out peaks with RR intervals outside of normal range
    rr_mask = np.where((rr_ints < min_rr * sample_rate) | (rr_ints > max_rr * sample_rate), 1, 0)

    # Filter out peaks that deviate more than delta
    if min_delta is not None:
        rr_mask = quotient_filter_mask(rr_ints, mask=rr_mask, lowcut=1 - min_delta, highcut=1 + min_delta)

    return rr_mask

find_peaks(data, sample_rate=1000, qrs_window=0.1, avg_window=1.0, qrs_prom_weight=1.5, qrs_min_len_weight=0.4, qrs_min_delay=0.3)

Find R peaks in ECG signal using QRS gradient method.

Parameters:

  • data (array) –

    ECG signal.

  • sample_rate (float, default: 1000 ) –

    Sampling rate in Hz. Defaults to 1000 Hz.

  • qrs_window (float, default: 0.1 ) –

    Window size in seconds to compute QRS gradient. Defaults to 0.1 s.

  • avg_window (float, default: 1.0 ) –

    Window size in seconds to compute average gradient. Defaults to 1.0 s.

  • qrs_prom_weight (float, default: 1.5 ) –

    Weight to compute minimum QRS height. Defaults to 1.5.

  • qrs_min_len_weight (float, default: 0.4 ) –

    Weight to compute minimum QRS length. Defaults to 0.4.

  • qrs_min_delay (float, default: 0.3 ) –

    Minimum delay between QRS complexes. Defaults to 0.3 s.

Returns:

  • NDArray

    npt.NDArray: R peaks.

Source code in physiokit/ecg/peaks.py
def find_peaks(
    data: npt.NDArray,
    sample_rate: float = 1000,
    qrs_window: float = 0.1,
    avg_window: float = 1.0,
    qrs_prom_weight: float = 1.5,
    qrs_min_len_weight: float = 0.4,
    qrs_min_delay: float = 0.3,
) -> npt.NDArray:
    """Find R peaks in ECG signal using QRS gradient method.

    Args:
        data (array): ECG signal.
        sample_rate (float, optional): Sampling rate in Hz. Defaults to 1000 Hz.
        qrs_window (float, optional): Window size in seconds to compute QRS gradient. Defaults to 0.1 s.
        avg_window (float, optional): Window size in seconds to compute average gradient. Defaults to 1.0 s.
        qrs_prom_weight (float, optional): Weight to compute minimum QRS height. Defaults to 1.5.
        qrs_min_len_weight (float, optional): Weight to compute minimum QRS length. Defaults to 0.4.
        qrs_min_delay (float, optional): Minimum delay between QRS complexes. Defaults to 0.3 s.

    Returns:
        npt.NDArray: R peaks.
    """

    # Identify start and end of QRS complexes.
    qrs = (
        moving_gradient_filter(
            data, sample_rate=sample_rate, sig_window=qrs_window, avg_window=avg_window, sig_prom_weight=qrs_prom_weight
        )
        > 0
    )

    beg_qrs = np.where(np.logical_and(np.logical_not(qrs[0:-1]), qrs[1:]))[0]
    end_qrs = np.where(np.logical_and(qrs[0:-1], np.logical_not(qrs[1:])))[0]
    end_qrs = end_qrs[end_qrs > beg_qrs[0]]

    num_qrs = min(beg_qrs.size, end_qrs.size)
    min_qrs_len = np.mean(end_qrs[:num_qrs] - beg_qrs[:num_qrs]) * qrs_min_len_weight
    min_qrs_delay = int(np.rint(qrs_min_delay * sample_rate))

    peaks = []
    for i in range(num_qrs):
        beg, end = beg_qrs[i], end_qrs[i]
        peak = beg + np.argmax(data[beg:end])
        qrs_len = end - beg
        qrs_delay = peak - peaks[-1] if peaks else min_qrs_delay

        # Enforce minimum delay between peaks
        if qrs_delay < min_qrs_delay or qrs_len < min_qrs_len:
            continue
        peaks.append(peak)
    # END FOR

    return np.array(peaks, dtype=int)

physiokit.ecg.segment

apply_segmentation(data, sample_rate=1000)

Apply segmentation to ECG signal.

Source code in physiokit/ecg/segment.py
def apply_segmentation(
    data: npt.NDArray,
    sample_rate: float = 1000,
    # lead: int|None = None
) -> npt.NDArray:
    """Apply segmentation to ECG signal."""
    segs = np.zeros(data.size, dtype=int)
    segs[:] = EcgSegment.background

    qrs_segs: list[tuple[int, int, int]] = []
    p_segs: list[tuple[int, int, int]] = []
    t_segs: list[tuple[int, int, int]] = []

    # Identify QRS segments and peaks
    # qrs = clean_ecg(data, sample_rate=sample_rate, lowcut=10.0, highcut=30.0)
    qrs_segs = locate_qrs(data, sample_rate=sample_rate)
    # Extract nominal RR interval, filter out R peaks (mark as noise)
    # For each QRS segment, extract P wave and T wave
    for qrs_seg in qrs_segs:
        print("QRS", qrs_seg)
        segs[qrs_seg[0] : qrs_seg[2]] = EcgSegment.qrs_complex
        # Extract P wave and T wave
        print("P-wave")
        p_seg = locate_pwave_from_qrs_anchor(data, qrs_seg, sample_rate=sample_rate)
        if p_seg:
            segs[p_seg[0] : p_seg[2]] = EcgSegment.p_wave
            p_segs.append(p_seg)
        # END IF
        print("T-wave")
        t_seg = locate_twave_from_qrs_anchor(data, qrs_seg, sample_rate=sample_rate)
        if t_seg:
            segs[t_seg[0] : t_seg[2]] = EcgSegment.t_wave
            t_segs.append(t_seg)
        # END IF
    # END FOR
    return segs

find_pwave()

Find P wave in ECG signal

Source code in physiokit/ecg/segment.py
def find_pwave():
    """Find P wave in ECG signal"""
    raise NotImplementedError()

find_qrs()

Find QRS complex in ECG signal

Source code in physiokit/ecg/segment.py
def find_qrs():
    """Find QRS complex in ECG signal"""
    raise NotImplementedError()

find_twave()

Find T wave in ECG signal

Source code in physiokit/ecg/segment.py
def find_twave():
    """Find T wave in ECG signal"""
    raise NotImplementedError()

locate_pwave_from_qrs_anchor(data, qrs_seg, sample_rate=1000, wave_window=0.1, avg_window=0.3, wave_prom_weight=1.0, wave_min_window=0.01)

Locate P wave in ECG signal using QRS anchor method.

Parameters:

  • data (array) –

    ECG signal.

  • qrs_seg (tuple[int, int, int]) –

    QRS segment.

  • sample_rate (float, default: 1000 ) –

    Sampling rate in Hz. Defaults to 1000 Hz.

  • wave_window (float, default: 0.1 ) –

    Window size in seconds to compute wave gradient. Defaults to 0.1 s.

  • avg_window (float, default: 0.3 ) –

    Window size in seconds to compute average gradient. Defaults to 0.3 s.

  • wave_prom_weight (float, default: 1.0 ) –

    Weight to compute minimum wave height. Defaults to 1.0.

  • wave_min_window (float, default: 0.01 ) –

    Minimum wave length in seconds. Defaults to 0.05 s.

Returns:

  • tuple[int, int, int] | None

    tuple[int, int, int] | None: Wave onset, peak, and offset.

Source code in physiokit/ecg/segment.py
def locate_pwave_from_qrs_anchor(
    data: npt.NDArray,
    qrs_seg: tuple[int, int, int],
    sample_rate: float = 1000,
    wave_window: float = 0.1,
    avg_window: float = 0.3,
    wave_prom_weight: float = 1.0,
    wave_min_window: float = 0.01,
) -> tuple[int, int, int] | None:
    """Locate P wave in ECG signal using QRS anchor method.

    Args:
        data (array): ECG signal.
        qrs_seg (tuple[int, int, int]): QRS segment.
        sample_rate (float, optional): Sampling rate in Hz. Defaults to 1000 Hz.
        wave_window (float, optional): Window size in seconds to compute wave gradient. Defaults to 0.1 s.
        avg_window (float, optional): Window size in seconds to compute average gradient. Defaults to 0.3 s.
        wave_prom_weight (float, optional): Weight to compute minimum wave height. Defaults to 1.0.
        wave_min_window (float, optional): Minimum wave length in seconds. Defaults to 0.05 s.

    Returns:
        tuple[int, int, int] | None: Wave onset, peak, and offset.
    """
    # Grab window from end of QRS to 300 ms before (PQ interval)
    pq_window = int(np.rint(0.3 * sample_rate))
    # roi_offset = qrs_seg[2]
    # roi_onset = max(0, roi_offset - pq_window)
    roi_offset = qrs_seg[0]
    roi_onset = max(0, roi_offset - pq_window)

    roi = data[roi_onset:roi_offset].copy()

    # Zero out QRS region
    # qrs_offset = qrs_seg[0] - roi_onset
    # roi[qrs_offset:] = roi[qrs_offset]

    wave = _locate_wave_in_region(
        roi,
        sample_rate=sample_rate,
        wave_window=wave_window,
        avg_window=avg_window,
        wave_prom_weight=wave_prom_weight,
        wave_min_window=wave_min_window,
        reverse=True,
    )
    if wave:
        return (roi_onset + wave[0], roi_onset + wave[1], roi_onset + wave[2])
    return None

locate_qrs(data, sample_rate=1000, qrs_window=0.1, avg_window=1.0, qrs_prom_weight=1.5, qrs_min_len_weight=0.4, qrs_min_delay=0.3)

Find QRS segments in ECG signal using QRS gradient method.

Parameters:

  • data (array) –

    ECG signal.

  • sample_rate (float, default: 1000 ) –

    Sampling rate in Hz. Defaults to 1000 Hz.

  • qrs_window (float, default: 0.1 ) –

    Window size in seconds to compute QRS gradient. Defaults to 0.1 s.

  • avg_window (float, default: 1.0 ) –

    Window size in seconds to compute average gradient. Defaults to 1.0 s.

  • qrs_prom_weight (float, default: 1.5 ) –

    Weight to compute minimum QRS height. Defaults to 1.5.

  • qrs_min_len_weight (float, default: 0.4 ) –

    Weight to compute minimum QRS length. Defaults to 0.4.

  • qrs_min_delay (float, default: 0.3 ) –

    Minimum delay between QRS complexes. Defaults to 0.3 s.

Returns:

  • tuple[NDArray]

    tuple[npt.NDArray]: QRS segments

Source code in physiokit/ecg/segment.py
def locate_qrs(
    data: npt.NDArray,
    sample_rate: float = 1000,
    qrs_window: float = 0.1,
    avg_window: float = 1.0,
    qrs_prom_weight: float = 1.5,
    qrs_min_len_weight: float = 0.4,
    qrs_min_delay: float = 0.3,
) -> tuple[npt.NDArray]:
    """Find QRS segments in ECG signal using QRS gradient method.

    Args:
        data (array): ECG signal.
        sample_rate (float, optional): Sampling rate in Hz. Defaults to 1000 Hz.
        qrs_window (float, optional): Window size in seconds to compute QRS gradient. Defaults to 0.1 s.
        avg_window (float, optional): Window size in seconds to compute average gradient. Defaults to 1.0 s.
        qrs_prom_weight (float, optional): Weight to compute minimum QRS height. Defaults to 1.5.
        qrs_min_len_weight (float, optional): Weight to compute minimum QRS length. Defaults to 0.4.
        qrs_min_delay (float, optional): Minimum delay between QRS complexes. Defaults to 0.3 s.

    Returns:
        tuple[npt.NDArray]: QRS segments
    """

    # Identify start and end of QRS complexes.
    qrs = (
        moving_gradient_filter(
            data, sample_rate=sample_rate, sig_window=qrs_window, avg_window=avg_window, sig_prom_weight=qrs_prom_weight
        )
        > 0
    )
    beg_qrs = np.where(np.logical_and(np.logical_not(qrs[0:-1]), qrs[1:]))[0]
    end_qrs = np.where(np.logical_and(qrs[0:-1], np.logical_not(qrs[1:])))[0]
    end_qrs = end_qrs[end_qrs > beg_qrs[0]]

    num_qrs = min(beg_qrs.size, end_qrs.size)
    min_qrs_len = int(np.mean(end_qrs[:num_qrs] - beg_qrs[:num_qrs]) * qrs_min_len_weight)
    min_qrs_delay = int(np.rint(qrs_min_delay * sample_rate))

    peaks: list[tuple(int, int, int)] = []
    for i in range(num_qrs):
        beg, end = beg_qrs[i], end_qrs[i]
        peak = beg + np.argmax(data[beg:end])
        qrs_len = int(end - beg)
        qrs_delay = peak - peaks[-1][1] if peaks else min_qrs_delay

        # Enforce minimum delay between peaks
        if qrs_delay < min_qrs_delay or qrs_len < min_qrs_len:
            continue
        peaks.append((beg, peak, end))
    # END FOR

    return np.array(peaks, dtype=int)

locate_twave_from_qrs_anchor(data, qrs_seg, sample_rate=1000, wave_window=0.3, avg_window=0.4, wave_prom_weight=1.0, wave_min_window=0.1)

Locate T wave in ECG signal using QRS anchor method.

Parameters:

  • data (array) –

    ECG signal.

  • qrs_seg (tuple[int, int, int]) –

    QRS segment.

  • sample_rate (float, default: 1000 ) –

    Sampling rate in Hz. Defaults to 1000 Hz.

  • wave_window (float, default: 0.3 ) –

    Window size in seconds to compute wave gradient. Defaults to 0.3 s.

  • avg_window (float, default: 0.4 ) –

    Window size in seconds to compute average gradient. Defaults to 0.6 s.

  • wave_prom_weight (float, default: 1.0 ) –

    Weight to compute minimum wave height. Defaults to 1.0.

  • wave_min_window (float, default: 0.1 ) –

    Minimum wave length in seconds. Defaults to 0.1 s.

Returns:

  • tuple[int, int, int] | None

    tuple[int, int, int] | None: Wave onset, peak, and offset.

Source code in physiokit/ecg/segment.py
def locate_twave_from_qrs_anchor(
    data: npt.NDArray,
    qrs_seg: tuple[int, int, int],
    sample_rate: float = 1000,
    wave_window: float = 0.3,
    avg_window: float = 0.4,
    wave_prom_weight: float = 1.0,
    wave_min_window: float = 0.1,
) -> tuple[int, int, int] | None:
    """Locate T wave in ECG signal using QRS anchor method.

    Args:
        data (array): ECG signal.
        qrs_seg (tuple[int, int, int]): QRS segment.
        sample_rate (float, optional): Sampling rate in Hz. Defaults to 1000 Hz.
        wave_window (float, optional): Window size in seconds to compute wave gradient. Defaults to 0.3 s.
        avg_window (float, optional): Window size in seconds to compute average gradient. Defaults to 0.6 s.
        wave_prom_weight (float, optional): Weight to compute minimum wave height. Defaults to 1.0.
        wave_min_window (float, optional): Minimum wave length in seconds. Defaults to 0.1 s.

    Returns:
        tuple[int, int, int] | None: Wave onset, peak, and offset.
    """

    # Grab window from end of QRS to 400 ms after (ST interval)
    st_window = int(np.rint(0.4 * sample_rate))
    roi_onset = qrs_seg[2]
    roi_offset = min(roi_onset + st_window, data.size)
    # roi_onset = qrs_seg[0]
    # roi_offset = min(roi_onset + qt_window, data.size)
    roi = data[roi_onset:roi_offset].copy()

    # Zero out QRS region
    # qrs_offset = qrs_seg[2] - roi_onset
    # roi[:qrs_offset] = roi[qrs_offset]

    wave = _locate_wave_in_region(
        roi,
        sample_rate=sample_rate,
        wave_window=wave_window,
        avg_window=avg_window,
        wave_prom_weight=wave_prom_weight,
        wave_min_window=wave_min_window,
    )
    if wave:
        return (roi_onset + wave[0], roi_onset + wave[1], roi_onset + wave[2])
    return None

physiokit.ecg.synthesize

simulate_daubechies(signal_length=10000, sample_rate=1000, heart_rate=70)

Generate an artificial (synthetic) ECG signal of a given duration and sampling rate.

It uses a 'Daubechies' wavelet that roughly approximates a single cardiac cycle. This function is based on this script <https://github.com/diarmaidocualain/ecg_simulation>_.

Parameters:

  • signal_length (int, default: 10000 ) –

    Length of the ECG signal. Defaults to 10000.

  • sample_rate (float, default: 1000 ) –

    ECG sampling frequency. Defaults to 1000.

  • heart_rate (float, default: 70 ) –

    Heart rate in BPM. Defaults to 70.

Returns:

  • NDArray

    npt.NDArray: Synthetic ECG signal

Source code in physiokit/ecg/synthesize.py
def simulate_daubechies(signal_length: int = 10000, sample_rate: float = 1000, heart_rate: float = 70) -> npt.NDArray:
    """Generate an artificial (synthetic) ECG signal of a given duration and sampling rate.

    It uses a 'Daubechies' wavelet that roughly approximates a single cardiac cycle.
    This function is based on `this script <https://github.com/diarmaidocualain/ecg_simulation>`_.

    Args:
        signal_length (int, optional): Length of the ECG signal. Defaults to 10000.
        sample_rate (float, optional): ECG sampling frequency. Defaults to 1000.
        heart_rate (float, optional): Heart rate in BPM. Defaults to 70.

    Returns:
        npt.NDArray: Synthetic ECG signal

    """
    duration = signal_length / sample_rate

    # The "Daubechies" wavelet is a rough approximation to a real, single, cardiac cycle
    cardiac = scipy.signal.daub(10)

    # Add the gap after the pqrst when the heart is resting.
    cardiac = np.concatenate([cardiac, np.zeros(10)])

    # Caculate the number of beats in capture time period
    num_heart_beats = int(duration * heart_rate / 60)

    # Concatenate together the number of heart beats needed
    ecg = np.tile(cardiac, num_heart_beats)

    # Change amplitude
    ecg = ecg * 10

    # Resample
    ecg = scipy.ndimage.zoom(ecg, signal_length / len(ecg))

    ecg = ecg[:signal_length]

    return ecg

simulate_ecgsyn(signal_length=10000, sample_rate=256, leads=12, heart_rate=60, hr_std=1, lfhfratio=0.5, sfint=512, ti=(-70, -15, 0, 15, 100), ai=(1.2, -5, 30, -7.5, 0.75), bi=(0.25, 0.1, 0.1, 0.1, 0.4), gamma=None)

Simulate ECG using the ECGSYN algorithm.

This function is a python translation of the matlab script by McSharry & Clifford (2013) <https://physionet.org/content/ecgsyn>_.

Parameters:

  • signal_length (int, default: 10000 ) –

    Length of the ECG signal. Defaults to 10000.

  • sample_rate (float, default: 256 ) –

    ECG sampling frequency. Defaults to 256.

  • heart_rate (float, default: 60 ) –

    Mean heart rate. Defaults to 60.

  • hr_std (float, default: 1 ) –

    Heart rate standard deviation. Defaults to 1.

  • lfhfratio (float, default: 0.5 ) –

    Low frequency high frequency ratio. Defaults to 0.5.

  • sfint (float, default: 512 ) –

    Internal sampling frequency. Defaults to 512.

  • ti (tuple[int], default: (-70, -15, 0, 15, 100) ) –

    Time parameters. Defaults to (-70, -15, 0, 15, 100).

  • ai (tuple[float], default: (1.2, -5, 30, -7.5, 0.75) ) –

    Amplitude parameters. Defaults to (1.2, -5, 30, -7.5, 0.75).

  • bi (tuple[float], default: (0.25, 0.1, 0.1, 0.1, 0.4) ) –

    Width parameters. Defaults to (0.25, 0.1, 0.1, 0.1, 0.4).

  • gamma (NDArray | None, default: None ) –

    Lead modification matrix. Defaults to None.

Returns:

  • NDArray

    tuple[list[npt.NDArray], list[npt.NDArray]]: ECG signals and results

Source code in physiokit/ecg/synthesize.py
def simulate_ecgsyn(
    signal_length: int = 10000,
    sample_rate: float = 256,
    leads: int = 12,
    heart_rate: float = 60,
    hr_std: float = 1,
    lfhfratio: float = 0.5,
    sfint: float = 512,
    ti: tuple[int] = (-70, -15, 0, 15, 100),
    ai: tuple[float] = (1.2, -5, 30, -7.5, 0.75),
    bi: tuple[float] = (0.25, 0.1, 0.1, 0.1, 0.4),
    gamma: npt.NDArray | None = None,
) -> npt.NDArray:
    """Simulate ECG using the ECGSYN algorithm.

    This function is a python translation of the matlab script by `McSharry & Clifford (2013)
    <https://physionet.org/content/ecgsyn>`_.

    Args:
        signal_length (int, optional): Length of the ECG signal. Defaults to 10000.
        sample_rate (float, optional): ECG sampling frequency. Defaults to 256.
        heart_rate (float, optional): Mean heart rate. Defaults to 60.
        hr_std (float, optional): Heart rate standard deviation. Defaults to 1.
        lfhfratio (float, optional): Low frequency high frequency ratio. Defaults to 0.5.
        sfint (float, optional): Internal sampling frequency. Defaults to 512.
        ti (tuple[int], optional): Time parameters. Defaults to (-70, -15, 0, 15, 100).
        ai (tuple[float], optional): Amplitude parameters. Defaults to (1.2, -5, 30, -7.5, 0.75).
        bi (tuple[float], optional): Width parameters. Defaults to (0.25, 0.1, 0.1, 0.1, 0.4).
        gamma (npt.NDArray|None, optional): Lead modification matrix. Defaults to None.

    Returns:
        tuple[list[npt.NDArray], list[npt.NDArray]]: ECG signals and results

    """
    if gamma is None:
        gamma = np.array(
            [
                [1, 0.1, 1, 1.2, 1],
                [2, 0.2, 0.2, 0.2, 3],
                [1, -0.1, -0.8, -1.1, 2.5],
                [-1, -0.05, -0.8, -0.5, -1.2],
                [0.05, 0.05, 1, 1, 1],
                [1, -0.05, -0.1, -0.1, 3],
                [-0.5, 0.05, 0.2, 0.5, 1],
                [0.05, 0.05, 1.3, 2.5, 2],
                [1, 0.05, 1, 2, 1],
                [1.2, 0.05, 1, 2, 2],
                [1.5, 0.1, 0.8, 1, 2],
                [1.8, 0.05, 0.5, 0.1, 2],
            ]
        )
    # END IF

    if not isinstance(ti, np.ndarray):
        ti = np.array(ti)
    if not isinstance(ai, np.ndarray):
        ai = np.array(ai)
    if not isinstance(bi, np.ndarray):
        bi = np.array(bi)

    duration = signal_length / sample_rate

    # Number of beats
    N = int(np.round(duration * (heart_rate / 60)))

    ti = ti * np.pi / 180

    # Adjust extrema parameters for mean heart rate
    hrfact = np.sqrt(heart_rate / 60)
    hrfact2 = np.sqrt(hrfact)
    bi = hrfact * bi
    ti = np.array([hrfact2, hrfact, 1, hrfact, hrfact2]) * ti

    # Check that sfint is an integer multiple of sfecg
    q = np.round(sfint / sample_rate)
    qd = sfint / sample_rate
    if q != qd:
        raise ValueError(
            "Internal sampling frequency (sfint) must be an integer multiple of the ECG sampling frequency"
            " (sfecg). Your current choices are: sfecg = " + str(sample_rate) + " and sfint = " + str(sfint) + "."
        )

    # Define frequency parameters for rr process
    # flo and fhi correspond to the Mayer waves and respiratory rate respectively
    flo = 0.1
    fhi = 0.25
    flostd = 0.01
    fhistd = 0.01

    # Calculate time scales for rr and total output
    sfrr = 1
    trr = 1 / sfrr
    rrmean = 60 / heart_rate
    n = 2 ** (np.ceil(np.log2(N * rrmean / trr)))

    rr0 = _ecg_simulate_rrprocess(flo, fhi, flostd, fhistd, lfhfratio, heart_rate, hr_std, sfrr, n)

    # Upsample rr time series from 1 Hz to sfint Hz
    desired_length = int(np.round(len(rr0) * sfint / 1))
    rr = scipy.ndimage.zoom(rr0, desired_length / len(rr0))

    # Make the rrn time series
    dt = 1 / sfint
    rrn = np.zeros(len(rr))
    tecg = 0
    i = 0
    while i < len(rr):
        tecg += rr[i]
        ip = int(np.round(tecg / dt))
        rrn[i:ip] = rr[i]
        i = ip
    Nt = ip

    # Integrate system using fourth order Runge-Kutta
    x0 = np.array([1, 0, 0.04])

    # tspan is a tuple of (min, max) which defines the lower and upper bound of t in ODE
    # t_eval is the list of desired t points for ODE
    # in Matlab, ode45 can accepts both tspan and t_eval in one argument
    Tspan = [0, (Nt - 1) * dt]
    t_eval = np.linspace(0, (Nt - 1) * dt, Nt)

    # Initialize results containers
    results = []
    signals = []

    # Multichannel modification (#625):
    # --------------------------------------------------
    # Loop over the twelve leads modifying ai in the loop to generate each lead's data
    # Because these are all starting at the same position, it may make sense to grab a random
    # segment within the series to simulate random phase and to forget the initial conditions

    for i in range(leads):
        gamma_row = gamma[i, :]
        result = scipy.integrate.solve_ivp(
            fun=functools.partial(_ecg_simulate_derivsecgsyn, rr=rrn, ti=ti, sfint=sfint, ai=gamma_row * ai, bi=bi),
            t_span=Tspan,
            y0=x0,
            t_eval=t_eval,
        )
        results.append(result)
        X0 = result.y  # get signal

        # downsample to required sfecg
        X = X0[:, np.arange(0, X0.shape[1], q).astype(int)]

        # Scale signal to lie between -0.4 and 1.2 mV
        z = X[2, :].copy()
        zmin = np.min(z)
        zmax = np.max(z)
        zrange = zmax - zmin
        z = (z - zmin) * 1.6 / zrange - 0.4

        signals.append(z)
    # END FOR

    signals = np.hstack(signals)

    signals = signals[:signal_length]
    return signals

synthesize(signal_length=10000, sample_rate=1000, leads=12, heart_rate=60, preset=EcgPreset.SR, noise_multiplier=1.0, impedance=1.0, p_multiplier=1.0, t_multiplier=1.0, voltage_factor=300)

Generate synthetic ECG signal via brisk method.

Utilize pk.signal.noise methods to make more realistic.

Leads are indexed as follows

["I", "II", "III", "aVR", "aVL", "aVF", "V1", "V2", "V3", "V4", "V5", "V6"]

Parameters:

  • signal_length (int, default: 10000 ) –

    Length of the ECG signal. Defaults to 10000.

  • sample_rate (float, default: 1000 ) –

    ECG sampling frequency. Defaults to 1000.

  • leads (int, default: 12 ) –

    Number of leads. Defaults to 12.

  • heart_rate (float, default: 60 ) –

    Mean heart rate. Defaults to 60.

  • preset (EcgPreset, default: SR ) –

    ECG preset. Defaults to EcgPreset.SR.

  • noise_multiplier (float, default: 1.0 ) –

    Noise multiplier. Defaults to 1.0.

  • impedance (float, default: 1.0 ) –

    Impedance. Defaults to 1.0.

  • p_multiplier (float, default: 1.0 ) –

    P wave multiplier. Defaults to 1.0.

  • t_multiplier (float, default: 1.0 ) –

    T wave multiplier. Defaults to 1.0.

  • voltage_factor (float, default: 300 ) –

    Voltage factor. Defaults to 300.

Returns:

  • tuple[NDArray, NDArray, NDArray]

    npt.NDArray: Synthetic ECG signals

Source code in physiokit/ecg/synthesize.py
def synthesize(
    signal_length: int = 10000,
    sample_rate: float = 1000,
    leads: int = 12,
    heart_rate: float = 60,
    preset: EcgPreset = EcgPreset.SR,
    noise_multiplier: float = 1.0,
    impedance: float = 1.0,
    p_multiplier: float = 1.0,
    t_multiplier: float = 1.0,
    voltage_factor: float = 300,
) -> tuple[npt.NDArray, npt.NDArray, npt.NDArray]:
    """Generate synthetic ECG signal via brisk method.

    Utilize pk.signal.noise methods to make more realistic.

    Leads are indexed as follows:
        ["I", "II", "III", "aVR", "aVL", "aVF", "V1", "V2", "V3", "V4", "V5", "V6"]

    Args:
        signal_length (int, optional): Length of the ECG signal. Defaults to 10000.
        sample_rate (float, optional): ECG sampling frequency. Defaults to 1000.
        leads (int, optional): Number of leads. Defaults to 12.
        heart_rate (float, optional): Mean heart rate. Defaults to 60.
        preset (EcgPreset, optional): ECG preset. Defaults to EcgPreset.SR.
        noise_multiplier (float, optional): Noise multiplier. Defaults to 1.0.
        impedance (float, optional): Impedance. Defaults to 1.0.
        p_multiplier (float, optional): P wave multiplier. Defaults to 1.0.
        t_multiplier (float, optional): T wave multiplier. Defaults to 1.0.
        voltage_factor (float, optional): Voltage factor. Defaults to 300.

    Returns:
        npt.NDArray: Synthetic ECG signals
    """

    _, ecg, segs, fids, _ = simulate_brisk(
        signal_length=signal_length,
        sample_rate=sample_rate,
        leads=leads,
        heart_rate=heart_rate,
        preset=preset,
        noise_multiplier=noise_multiplier,
        impedance=impedance,
        p_multiplier=p_multiplier,
        t_multiplier=t_multiplier,
        voltage_factor=voltage_factor,
    )

    return ecg, segs, fids