Skip to content

Signal Processing

This submodule contains lots of underlying signal processing functionality used by the higher-level modules. This includes filtering routines, noise injection, and transformations.

physiokit.signal.filter

filter_signal(data, lowcut=None, highcut=None, sample_rate=1000, order=2, axis=-1, forward_backward=True)

Apply SOS filter to signal using butterworth design and cascaded filter.

Parameters:

  • data (NDArray) –

    Signal

  • lowcut (float | None, default: None ) –

    Lower cutoff in Hz. Defaults to None.

  • highcut (float | None, default: None ) –

    Upper cutoff in Hz. Defaults to None.

  • sample_rate (float, default: 1000 ) –

    Sampling rate in Hz Defaults to 1000 Hz.

  • order (int, default: 2 ) –

    Filter order. Defaults to 2.

  • forward_backward (bool, default: True ) –

    Apply filter forward and backwards. Defaults to True.

Returns:

  • NDArray

    npt.NDArray: Filtered signal

Source code in physiokit/signal/filter.py
def filter_signal(
    data: npt.NDArray,
    lowcut: float | None = None,
    highcut: float | None = None,
    sample_rate: float = 1000,
    order: int = 2,
    axis: int = -1,
    forward_backward: bool = True,
) -> npt.NDArray:
    """Apply SOS filter to signal using butterworth design and cascaded filter.

    Args:
        data (npt.NDArray): Signal
        lowcut (float|None): Lower cutoff in Hz. Defaults to None.
        highcut (float|None): Upper cutoff in Hz. Defaults to None.
        sample_rate (float): Sampling rate in Hz Defaults to 1000 Hz.
        order (int, optional): Filter order. Defaults to 2.
        forward_backward (bool, optional): Apply filter forward and backwards. Defaults to True.

    Returns:
        npt.NDArray: Filtered signal
    """
    sos = get_butter_sos(lowcut=lowcut, highcut=highcut, sample_rate=sample_rate, order=order)
    if forward_backward:
        return sps.sosfiltfilt(sos, data, axis=axis)
    return sps.sosfilt(sos, data, axis=axis)

generate_arm_biquad_sos(lowcut, highcut, sample_rate, order=3, var_name='biquadFilter')

Generate ARM CMSIS second order section coefficients.

Parameters:

  • lowcut (float) –

    Lower cutoff in Hz.

  • highcut (float) –

    Upper cutoff in Hz.

  • sample_rate (float) –

    Sampling rate in Hz.

  • order (int, default: 3 ) –

    Filter order. Defaults to 3.

  • var_name (str, default: 'biquadFilter' ) –

    Variable name. Defaults to 'biquadFilter'.

Returns:

  • str ( str ) –

    ARM CMSIS second order section coefficients.

Source code in physiokit/signal/filter.py
def generate_arm_biquad_sos(
    lowcut: float,
    highcut: float,
    sample_rate: float,
    order: int = 3,
    var_name: str = "biquadFilter",
) -> str:
    """Generate ARM CMSIS second order section coefficients.

    Args:
        lowcut (float): Lower cutoff in Hz.
        highcut (float): Upper cutoff in Hz.
        sample_rate (float): Sampling rate in Hz.
        order (int, optional): Filter order. Defaults to 3.
        var_name (str, optional): Variable name. Defaults to 'biquadFilter'.

    Returns:
        str: ARM CMSIS second order section coefficients.
    """
    sos = get_butter_sos(lowcut=lowcut, highcut=highcut, sample_rate=sample_rate, order=order)
    # Each section needs to be mapped as follows:
    #   [b0, b1, b2, a0, a1, a2] -> [b0, b1, b2, -a1, -a2]
    sec_len_name = f"{var_name.upper()}_NUM_SECS"
    arm_sos = sos[:, [0, 1, 2, 4, 5]] * [1, 1, 1, -1, -1]
    coef_str = ", ".join(f"{os.linesep:<4}{c}" if i % 5 == 0 else f"{c}" for i, c in enumerate(arm_sos.flatten()))
    arm_str = (
        f"#define {sec_len_name} ({order:0d}){os.linesep}"
        f"static float32_t {var_name}State[2 * {sec_len_name}];{os.linesep}"
        f"static float32_t {var_name}[5 * {sec_len_name}] = {{ {coef_str}\n}};{os.linesep}"
    )
    return arm_str

get_butter_sos(lowcut=None, highcut=None, sample_rate=1000, order=3) cached

Compute biquad filter coefficients as SOS. This function caches. For lowpass, lowcut is required and highcut is ignored. For highpass, highcut is required and lowcut is ignored. For bandpass, both lowcut and highcut are required.

Parameters:

  • lowcut (float | None, default: None ) –

    Lower cutoff in Hz. Defaults to None.

  • highcut (float | None, default: None ) –

    Upper cutoff in Hz. Defaults to None.

  • sample_rate (float, default: 1000 ) –

    Sampling rate in Hz. Defaults to 1000 Hz.

  • order (int, default: 3 ) –

    Filter order. Defaults to 3.

Returns:

  • NDArray

    npt.NDArray: SOS

Source code in physiokit/signal/filter.py
@functools.cache
def get_butter_sos(
    lowcut: float | None = None,
    highcut: float | None = None,
    sample_rate: float = 1000,
    order: int = 3,
) -> npt.NDArray:
    """Compute biquad filter coefficients as SOS. This function caches.
    For lowpass, lowcut is required and highcut is ignored.
    For highpass, highcut is required and lowcut is ignored.
    For bandpass, both lowcut and highcut are required.

    Args:
        lowcut (float|None): Lower cutoff in Hz. Defaults to None.
        highcut (float|None): Upper cutoff in Hz. Defaults to None.
        sample_rate (float): Sampling rate in Hz. Defaults to 1000 Hz.
        order (int, optional): Filter order. Defaults to 3.

    Returns:
        npt.NDArray: SOS
    """
    nyq = sample_rate / 2
    if lowcut is not None and highcut is not None:
        freqs = [lowcut / nyq, highcut / nyq]
        btype = "bandpass"
    elif lowcut is not None:
        freqs = lowcut / nyq
        btype = "highpass"
    elif highcut is not None:
        freqs = highcut / nyq
        btype = "lowpass"
    else:
        raise ValueError("At least one of lowcut or highcut must be specified")
    sos = sps.butter(order, freqs, btype=btype, output="sos")
    return sos

moving_gradient_filter(data, sample_rate=1000, sig_window=0.1, avg_window=1.0, sig_prom_weight=1.5, mode='nearest', fval=0)

Compute moving gradient filter to identify peaks in stream of data.

Parameters:

  • data (array) –

    Data stream.

  • sample_rate (float, default: 1000 ) –

    Sampling rate in Hz. Defaults to 1000 Hz.

  • sig_window (float, default: 0.1 ) –

    Window size in seconds to compute signal gradient. Defaults to 0.1 s.

  • avg_window (float, default: 1.0 ) –

    Window size in seconds to compute average gradient. Defaults to 1.0 s.

  • sig_prom_weight (float, default: 1.5 ) –

    Weight to compute minimum signal height. Defaults to 1.5.

Returns:

  • array ( NDArray ) –

    Moving gradient filter.

Source code in physiokit/signal/filter.py
def moving_gradient_filter(
    data: npt.NDArray,
    sample_rate: float = 1000,
    sig_window: float = 0.1,
    avg_window: float = 1.0,
    sig_prom_weight: float = 1.5,
    mode: str = "nearest",
    fval=0,
) -> npt.NDArray:
    """Compute moving gradient filter to identify peaks in stream of data.

    Args:
        data (array): Data stream.
        sample_rate (float, optional): Sampling rate in Hz. Defaults to 1000 Hz.
        sig_window (float, optional): Window size in seconds to compute signal gradient. Defaults to 0.1 s.
        avg_window (float, optional): Window size in seconds to compute average gradient. Defaults to 1.0 s.
        sig_prom_weight (float, optional): Weight to compute minimum signal height. Defaults to 1.5.

    Returns:
        array: Moving gradient filter.
    """
    # Compute gradient of signal and average.
    abs_grad = np.abs(np.gradient(data))
    sig_kernel = int(np.rint(sig_window * sample_rate))
    avg_kernel = int(np.rint(avg_window * sample_rate))

    # Smooth gradients
    sig_grad = spn.uniform_filter1d(abs_grad, sig_kernel, mode=mode, cval=fval)
    avg_grad = spn.uniform_filter1d(sig_grad, avg_kernel, mode=mode, cval=fval)

    # Apply prominance weight
    min_qrs_height = sig_prom_weight * avg_grad

    # Remove baseline
    rst = sig_grad - min_qrs_height
    return rst

normalize_signal(data, eps=0.001, axis=-1)

Normalize signal about its mean and std.

Parameters:

  • data (NDArray) –

    Signal

  • eps (float, default: 0.001 ) –

    Epsilon added to st. dev. Defaults to 1e-3.

  • axis (int, default: -1 ) –

    Axis to normalize along. Defaults to -1.

Returns:

  • NDArray

    npt.NDArray: Normalized signal

Source code in physiokit/signal/filter.py
def normalize_signal(data: npt.NDArray, eps: float = 1e-3, axis: int = -1) -> npt.NDArray:
    """Normalize signal about its mean and std.

    Args:
        data (npt.NDArray): Signal
        eps (float, optional): Epsilon added to st. dev. Defaults to 1e-3.
        axis (int, optional): Axis to normalize along. Defaults to -1.

    Returns:
        npt.NDArray: Normalized signal
    """
    mu = np.nanmean(data, axis=axis)
    std = np.nanstd(data, axis=axis) + eps
    return (data - mu) / std

quotient_filter_mask(data, mask=None, iterations=2, lowcut=0.8, highcut=1.2)

Applies a quotient filter to identify outliers from list.

Parameters:

  • data (NDArray) –

    Signal

  • mask (NDArray | None, default: None ) –

    Rejection mask. Defaults to None.

  • iterations (int, default: 2 ) –
    iterations to apply. Defaults to 2.
  • lowcut (float, default: 0.8 ) –

    Lower cutoff ratio. Defaults to 0.8.

  • highcut (float, default: 1.2 ) –

    Upper cutoff ratio. Defaults to 1.2.

Returns:

  • NDArray

    npt.NDArray: Rejection mask 0=accept, 1=reject.

Source code in physiokit/signal/filter.py
def quotient_filter_mask(
    data: npt.NDArray, mask: npt.NDArray | None = None, iterations: int = 2, lowcut: float = 0.8, highcut: float = 1.2
) -> npt.NDArray:
    """Applies a quotient filter to identify outliers from list.

    Args:
        data (npt.NDArray): Signal
        mask (npt.NDArray | None, optional): Rejection mask. Defaults to None.
        iterations (int, optional): # iterations to apply. Defaults to 2.
        lowcut (float, optional): Lower cutoff ratio. Defaults to 0.8.
        highcut (float, optional): Upper cutoff ratio. Defaults to 1.2.

    Returns:
        npt.NDArray: Rejection mask 0=accept, 1=reject.
    """

    if mask is None:
        mask = np.zeros_like(data, dtype=int)

    for _ in range(iterations):
        # Get indices of intervals to be filtered
        filt_idxs = np.where(mask == 0)[0]
        if filt_idxs.size <= 1:
            break
        filt_ints = data[filt_idxs]
        # Compute quotient of each interval with the next
        filt_deltas = np.zeros(filt_ints.size)
        filt_deltas[1:] = filt_ints[:-1] / filt_ints[1:]
        filt_deltas[0] = filt_deltas[1]
        # Get indices of intervals that are outside the range
        delta_idxs = np.where((filt_deltas < lowcut) | (filt_deltas > highcut))[0]
        # Update mask with rejected intervals
        mask[filt_idxs[delta_idxs]] = 1
        # Break if no intervals are rejected
        if delta_idxs.size == 0:
            break
    # END FOR

    return mask

remove_baseline_wander(data, cutoff=0.05, quality=0.005, sample_rate=1000, axis=-1, forward_backward=True)

Remove baseline wander from signal using a notch filter.

Parameters:

  • data (NDArray) –

    Signal

  • cutoff (float, default: 0.05 ) –

    Cutoff frequency in Hz. Defaults to 0.05.

  • quality (float, default: 0.005 ) –

    Quality factor. Defaults to 0.005.

  • sample_rate (float, default: 1000 ) –

    Sampling rate in Hz. Defaults to 1000 Hz.

  • axis (int, default: -1 ) –

    Axis to filter along. Defaults to 0.

  • forward_backward (bool, default: True ) –

    Apply filter forward and backwards. Defaults to True.

Returns:

  • NDArray

    npt.NDArray: Filtered signal

Source code in physiokit/signal/filter.py
def remove_baseline_wander(
    data: npt.NDArray,
    cutoff: float = 0.05,
    quality: float = 0.005,
    sample_rate: float = 1000,
    axis: int = -1,
    forward_backward: bool = True,
) -> npt.NDArray:
    """Remove baseline wander from signal using a notch filter.

    Args:
        data (npt.NDArray): Signal
        cutoff (float, optional): Cutoff frequency in Hz. Defaults to 0.05.
        quality (float, optional): Quality factor. Defaults to 0.005.
        sample_rate (float): Sampling rate in Hz. Defaults to 1000 Hz.
        axis (int, optional): Axis to filter along. Defaults to 0.
        forward_backward (bool, optional): Apply filter forward and backwards. Defaults to True.

    Returns:
        npt.NDArray: Filtered signal
    """
    b, a = sps.iirnotch(cutoff, Q=quality, fs=sample_rate)
    if forward_backward:
        return sps.filtfilt(b, a, data, axis=axis)
    return sps.lfilter(b, a, data, axis=axis)

resample_categorical(data, sample_rate, target_rate, axis=0)

Resample categorical data using nearest neighbor.

Parameters:

  • data (NDArray) –

    Signal

  • sample_rate (float) –

    Signal sampling rate

  • target_rate (float) –

    Target sampling rate

  • axis (int, default: 0 ) –

    Axis to resample along. Defaults to 0.

Returns:

  • NDArray

    npt.NDArray: Resampled signal

Source code in physiokit/signal/filter.py
def resample_categorical(data: npt.NDArray, sample_rate: float, target_rate: float, axis: int = 0) -> npt.NDArray:
    """Resample categorical data using nearest neighbor.

    Args:
        data (npt.NDArray): Signal
        sample_rate (float): Signal sampling rate
        target_rate (float): Target sampling rate
        axis (int, optional): Axis to resample along. Defaults to 0.

    Returns:
        npt.NDArray: Resampled signal
    """
    if sample_rate == target_rate:
        return data
    ratio = target_rate / sample_rate
    actual_length = data.shape[axis]
    target_length = int(np.round(data.shape[axis] * ratio))
    interp_fn = spi.interp1d(np.arange(0, actual_length), data, kind="nearest", axis=axis)
    return interp_fn(np.arange(0, target_length)).astype(data.dtype)

resample_signal(data, sample_rate=1000, target_rate=500, axis=-1)

Resample signal using scipy FFT-based resample routine.

NOTE: For very large signals, this may be slow. Consider using resample_poly instead.

Parameters:

  • data (NDArray) –

    Signal

  • sample_rate (float, default: 1000 ) –

    Signal sampling rate. Defaults to 1000 Hz.

  • target_rate (float, default: 500 ) –

    Target sampling rate. Defaults to 500 Hz.

  • axis (int, default: -1 ) –

    Axis to resample along. Defaults to -1.

Returns:

  • NDArray

    npt.NDArray: Resampled signal

Source code in physiokit/signal/filter.py
def resample_signal(
    data: npt.NDArray, sample_rate: float = 1000, target_rate: float = 500, axis: int = -1
) -> npt.NDArray:
    """Resample signal using scipy FFT-based resample routine.

    NOTE: For very large signals, this may be slow. Consider using resample_poly instead.

    Args:
        data (npt.NDArray): Signal
        sample_rate (float): Signal sampling rate. Defaults to 1000 Hz.
        target_rate (float): Target sampling rate. Defaults to 500 Hz.
        axis (int, optional): Axis to resample along. Defaults to -1.

    Returns:
        npt.NDArray: Resampled signal
    """
    desired_length = int(np.round(data.shape[axis] * target_rate / sample_rate))
    return sps.resample(data, desired_length, axis=axis)

physiokit.signal.noise

Add various noise sources to signal.

add_baseline_wander(data, amplitude=0.1, frequency=0.05, sample_rate=1000, signal_sd=None)

Add baseline wander to signal.

Parameters:

  • data (NDArray) –

    Signal

  • amplitude (float, default: 0.1 ) –

    Amplitude in st dev. Defaults to 0.1.

  • frequency (float, default: 0.05 ) –

    Baseline wander frequency. Defaults to 0.05 Hz.

  • sample_rate (float, default: 1000 ) –

    Sample rate in Hz. Defaults to 1000 Hz.

  • signal_sd (float | None, default: None ) –

    Signal standard deviation. Defaults to None.

Returns:

  • NDArray

    npt.NDArray: Signal w/ baseline wander

Source code in physiokit/signal/noise.py
def add_baseline_wander(
    data: npt.NDArray,
    amplitude: float = 0.1,
    frequency: float = 0.05,
    sample_rate: float = 1000,
    signal_sd: float | None = None,
) -> npt.NDArray:
    """Add baseline wander to signal.

    Args:
        data (npt.NDArray): Signal
        amplitude (float, optional): Amplitude in st dev. Defaults to 0.1.
        frequency (float, optional): Baseline wander frequency. Defaults to 0.05 Hz.
        sample_rate (float, optional): Sample rate in Hz. Defaults to 1000 Hz.
        signal_sd (float|None, optional): Signal standard deviation. Defaults to None.

    Returns:
        npt.NDArray: Signal w/ baseline wander
    """
    if signal_sd is None:
        signal_sd = np.nanstd(data)

    return data + create_noise_distortions(
        len(data),
        signal_sd=signal_sd,
        sample_rate=sample_rate,
        frequencies=frequency,
        amplitudes=amplitude,
        noise_shapes="laplace",
    )

add_burst_noise(data, amplitude=1, frequency=100, num_bursts=1, sample_rate=1000, signal_sd=None)

Add high frequency burst noise to signal.

Parameters:

  • data (NDArray) –

    Signal

  • amplitude (float, default: 1 ) –

    Amplitude in st dev. Defaults to 1.

  • frequency (float, default: 100 ) –

    High frequency burst in Hz. Defaults to 100 Hz.

  • num_bursts (int, default: 1 ) –
    bursts to inject. Defaults to 1.
  • sample_rate (float, default: 1000 ) –

    Sample rate in Hz. Defaults to 1000 Hz.

  • signal_sd (float | None, default: None ) –

    Signal standard deviation. Defaults to None.

Returns:

  • NDArray

    npt.NDArray: Signal w/ burst noise

Source code in physiokit/signal/noise.py
def add_burst_noise(
    data: npt.NDArray,
    amplitude: float = 1,
    frequency: float = 100,
    num_bursts: int = 1,
    sample_rate: float = 1000,
    signal_sd: float | None = None,
) -> npt.NDArray:
    """Add high frequency burst noise to signal.

    Args:
        data (npt.NDArray): Signal
        amplitude (float, optional): Amplitude in st dev. Defaults to 1.
        frequency (float, optional): High frequency burst in Hz. Defaults to 100 Hz.
        num_bursts (int, optional): # bursts to inject. Defaults to 1.
        sample_rate (float, optional): Sample rate in Hz. Defaults to 1000 Hz.
        signal_sd (float|None, optional): Signal standard deviation. Defaults to None.

    Returns:
        npt.NDArray: Signal w/ burst noise
    """

    if signal_sd is None:
        signal_sd = np.nanstd(data)
    return data + create_noise_artifacts(
        len(data),
        signal_sd=signal_sd,
        sample_rate=sample_rate,
        frequency=frequency,
        amplitude=amplitude,
        num_artifacts=num_bursts,
        min_artifact_percent=0.001,
        max_artifact_percent=0.01,
        artifacts_shape="laplace",
    )

add_emg_noise(data, scale=1e-05, sample_rate=1000)

Add EMG noise to signal.

Parameters:

  • data (NDArray) –

    Signal

  • scale (float, default: 1e-05 ) –

    Noise scale. Defaults to 1e-5.

  • sample_rate (float, default: 1000 ) –

    Sampling rate in Hz. Defaults to 1000.

Returns:

  • NDArray

    npt.NDArray: Signal with EMG noise

Source code in physiokit/signal/noise.py
def add_emg_noise(data: npt.NDArray, scale: float = 1e-5, sample_rate: float = 1000) -> npt.NDArray:
    """Add EMG noise to signal.

    Args:
        data (npt.NDArray): Signal
        scale (float, optional): Noise scale. Defaults to 1e-5.
        sample_rate (float, optional): Sampling rate in Hz. Defaults to 1000.

    Returns:
        npt.NDArray: Signal with EMG noise
    """
    noise = np.tile(
        np.sin(np.linspace(-0.5 * np.pi, 1.5 * np.pi, int(sample_rate)) * 10 * sample_rate),
        int(np.ceil(data.size // sample_rate)),
    )
    return data + scale * noise[: data.size]

add_lead_noise(data, scale=0.001)

Add lead noise to signal.

Parameters:

  • data (NDArray) –

    Signal

  • scale (float, default: 0.001 ) –

    Noise scale. Defaults to 1.

Returns:

  • NDArray

    npt.NDArray: Signal with lead noise

Source code in physiokit/signal/noise.py
def add_lead_noise(data: npt.NDArray, scale: float = 1e-3) -> npt.NDArray:
    """Add lead noise to signal.

    Args:
        data (npt.NDArray): Signal
        scale (float, optional): Noise scale. Defaults to 1.

    Returns:
        npt.NDArray: Signal with lead noise
    """
    return data + np.random.normal(0, scale, size=data.shape)

add_motion_noise(data, amplitude=0.2, frequency=0.5, sample_rate=1000, signal_sd=None)

Add motion noise to signal.

Parameters:

  • data (NDArray) –

    Signal

  • amplitude (float, default: 0.2 ) –

    Amplitude in st dev. Defaults to 0.2.

  • frequency (float, default: 0.5 ) –

    Motion frequency in Hz. Defaults to 0.5 Hz.

  • sample_rate (float, default: 1000 ) –

    Sample rate in Hz. Defaults to 1000 Hz.

  • signal_sd (float | None, default: None ) –

    Signal standard deviation. Defaults to None.

Returns:

  • NDArray

    npt.NDArray: Signal w/ motion noise

Source code in physiokit/signal/noise.py
def add_motion_noise(
    data: npt.NDArray,
    amplitude: float = 0.2,
    frequency: float = 0.5,
    sample_rate: float = 1000,
    signal_sd: float | None = None,
) -> npt.NDArray:
    """Add motion noise to signal.

    Args:
        data (npt.NDArray): Signal
        amplitude (float, optional): Amplitude in st dev. Defaults to 0.2.
        frequency (float, optional): Motion frequency in Hz. Defaults to 0.5 Hz.
        sample_rate (float, optional): Sample rate in Hz. Defaults to 1000 Hz.
        signal_sd (float|None, optional): Signal standard deviation. Defaults to None.

    Returns:
        npt.NDArray: Signal w/ motion noise
    """
    if signal_sd is None:
        signal_sd = np.nanstd(data)

    return data + create_noise_distortions(
        len(data),
        signal_sd=signal_sd,
        sample_rate=sample_rate,
        frequencies=frequency,
        amplitudes=amplitude,
        noise_shapes="laplace",
    )

add_noise_sources(data, amplitudes, frequencies, noise_shapes, sample_rate=1000, signal_sd=None)

Add multiple noise sources to signal.

Parameters:

  • data (NDArray) –

    Signal

  • amplitudes (list[float]) –

    Amplitudes in st dev.

  • frequencies (list[float]) –

    Frequencies in Hz.

  • noise_shapes (list[str]) –

    Noise shapes.

  • sample_rate (float, default: 1000 ) –

    Sample rate in Hz. Defaults to 1000 Hz.

  • signal_sd (float | None, default: None ) –

    Signal standard deviation. Defaults to None.

Returns:

  • NDArray

    npt.NDArray: Signal w/ noise

Source code in physiokit/signal/noise.py
def add_noise_sources(
    data: npt.NDArray,
    amplitudes: list[float],
    frequencies: list[float],
    noise_shapes: list[str],
    sample_rate: float = 1000,
    signal_sd: float | None = None,
) -> npt.NDArray:
    """Add multiple noise sources to signal.

    Args:
        data (npt.NDArray): Signal
        amplitudes (list[float]): Amplitudes in st dev.
        frequencies (list[float]): Frequencies in Hz.
        noise_shapes (list[str]): Noise shapes.
        sample_rate (float, optional): Sample rate in Hz. Defaults to 1000 Hz.
        signal_sd (float|None, optional): Signal standard deviation. Defaults to None.

    Returns:
        npt.NDArray: Signal w/ noise
    """
    if signal_sd is None:
        signal_sd = np.nanstd(data)

    return data + create_noise_distortions(
        len(data),
        signal_sd=signal_sd,
        sample_rate=sample_rate,
        frequencies=frequencies,
        amplitudes=amplitudes,
        noise_shapes=noise_shapes,
    )

add_powerline_noise(data, amplitude=0.01, frequency=50, sample_rate=1000, signal_sd=None)

Add powerline noise to signal.

Parameters:

  • data (NDArray) –

    Signal

  • amplitude (float, default: 0.01 ) –

    Amplitude in st dev. Defaults to 0.01.

  • frequency (float, default: 50 ) –

    Powerline frequency in Hz. Defaults to 50 Hz.

  • sample_rate (float, default: 1000 ) –

    Sample rate in Hz. Defaults to 1000 Hz.

  • signal_sd (float | None, default: None ) –

    Signal standard deviation. Defaults to None.

Returns:

  • NDArray

    npt.NDArray: Signal w/ powerline noise

Source code in physiokit/signal/noise.py
def add_powerline_noise(
    data: npt.NDArray,
    amplitude: float = 0.01,
    frequency: float = 50,
    sample_rate: float = 1000,
    signal_sd: float | None = None,
) -> npt.NDArray:
    """Add powerline noise to signal.

    Args:
        data (npt.NDArray): Signal
        amplitude (float, optional): Amplitude in st dev. Defaults to 0.01.
        frequency (float, optional): Powerline frequency in Hz. Defaults to 50 Hz.
        sample_rate (float, optional): Sample rate in Hz. Defaults to 1000 Hz.
        signal_sd (float|None, optional): Signal standard deviation. Defaults to None.

    Returns:
        npt.NDArray: Signal w/ powerline noise
    """
    return data + create_powerline_noise(
        len(data),
        signal_sd=signal_sd,
        sample_rate=sample_rate,
        frequency=frequency,
        amplitude=amplitude,
    )

add_random_scaling(data, lower=0.5, upper=2.0)

Randomly scale signal.

Parameters:

  • data (NDArray) –

    Signal

  • lower (float, default: 0.5 ) –

    Lower bound. Defaults to 0.5.

  • upper (float, default: 2.0 ) –

    Upper bound. Defaults to 2.0.

Returns:

  • NDArray

    npt.NDArray: Signal with random scaling

Source code in physiokit/signal/noise.py
def add_random_scaling(data: npt.NDArray, lower: float = 0.5, upper: float = 2.0) -> npt.NDArray:
    """Randomly scale signal.

    Args:
        data (npt.NDArray): Signal
        lower (float, optional): Lower bound. Defaults to 0.5.
        upper (float, optional): Upper bound. Defaults to 2.0.

    Returns:
        npt.NDArray: Signal with random scaling
    """
    return data * random.uniform(lower, upper)

add_signal_attenuation()

Add signal attenuation to signal.

Source code in physiokit/signal/noise.py
def add_signal_attenuation():
    """Add signal attenuation to signal."""
    raise NotImplementedError()

add_signal_cutout()

Add cutout augmentation to signal.

Source code in physiokit/signal/noise.py
def add_signal_cutout():
    """Add cutout augmentation to signal."""
    raise NotImplementedError()

add_signal_shift(data, shift_amount=0.1)

Add signal shift augmentation to signal.

Source code in physiokit/signal/noise.py
def add_signal_shift(data: npt.NDArray, shift_amount: float = 0.1):
    """Add signal shift augmentation to signal."""
    shift_idx = random.randint(0, data.size)
    rst = data.copy()
    rst[shift_idx:] += shift_amount
    return rst

physiokit.signal.transform

compute_fft(data, sample_rate=1000, fft_len=None, window='blackman', axis=-1)

Compute FFT of RSP signal.

Parameters:

  • data (array) –

    RSP signal.

  • sample_rate (float, default: 1000 ) –

    Sampling rate in Hz. Defaults to 1000 Hz.

  • fft_len (int | None, default: None ) –

    FFT length. Defaults to None.

  • window (str, default: 'blackman' ) –

    Window to apply. Defaults to 'blackman'.

  • axis (int, default: -1 ) –

    Axis to compute FFT. Defaults to -1.

Returns:

  • tuple[NDArray, NDArray]

    tuple[array, array]: Frequencies and FFT of signal.

Source code in physiokit/signal/transform.py
def compute_fft(
    data: npt.NDArray,
    sample_rate: float = 1000,
    fft_len: int | None = None,
    window: str = "blackman",
    axis: int = -1,
) -> tuple[npt.NDArray, npt.NDArray]:
    """Compute FFT of RSP signal.

    Args:
        data (array): RSP signal.
        sample_rate (float, optional): Sampling rate in Hz. Defaults to 1000 Hz.
        fft_len (int | None, optional): FFT length. Defaults to None.
        window (str, optional): Window to apply. Defaults to 'blackman'.
        axis (int, optional): Axis to compute FFT. Defaults to -1.

    Returns:
        tuple[array, array]: Frequencies and FFT of signal.
    """
    data_len = data.shape[axis]
    if fft_len is None:
        fft_len = int(2 ** np.ceil(np.log2(data_len)))
    if window == "blackman":
        fft_win = np.blackman(data_len)
        amp_corr = 1.93
    else:
        fft_win = np.ones(data_len)
        amp_corr = 1.0
    freqs = np.fft.fftfreq(fft_len, 1 / sample_rate)
    sp = amp_corr * np.fft.fft(fft_win * data, fft_len, axis=axis) / data_len
    return freqs, sp

rescale_signal(x, old_min, old_max, new_min, new_max, clip=True)

Rescale signal to new range.

Parameters:

  • x (NDArray) –

    Signal

  • old_min (float) –

    Old minimum

  • old_max (float) –

    Old maximum

  • new_min (float) –

    New minimum

  • new_max (float) –

    New maximum

  • clip (bool, default: True ) –

    Clip values to range. Defaults to True.

Returns:

  • NDArray

    npt.NDArray: Rescaled signal

Source code in physiokit/signal/transform.py
def rescale_signal(
    x: npt.NDArray, old_min: float, old_max: float, new_min: float, new_max: float, clip: bool = True
) -> npt.NDArray:
    """Rescale signal to new range.

    Args:
        x (npt.NDArray): Signal
        old_min (float): Old minimum
        old_max (float): Old maximum
        new_min (float): New minimum
        new_max (float): New maximum
        clip (bool, optional): Clip values to range. Defaults to True.

    Returns:
        npt.NDArray: Rescaled signal
    """
    if clip:
        x = np.clip(x, old_min, old_max)
    return (x - old_min) / (old_max - old_min) * (new_max - new_min) + new_min