Skip to content

ptbxl

Classes

PtbxlScpCode

PTBXL SCP codes

PtbxlDataset

PtbxlDataset(leads: list[int] | None = None, **kwargs)

PTBXL dataset consists of 21837 clinical 12-lead ECGs from 18885 patients.

Parameters:

  • leads

    (list[int] | None, default: None ) –

    Leads to use. Defaults to None.

Source code in heartkit/datasets/ptbxl.py
def __init__(self, leads: list[int] | None = None, **kwargs) -> None:
    """Set up the PTB-XL dataset wrapper.

    PTB-XL consists of 21837 clinical 12-lead ECGs from 18885 patients.

    Args:
        leads (list[int] | None, optional): Indices of the ECG leads to draw from.
            When None (or empty), all 12 leads (0-11) are used. Defaults to None.
        **kwargs: Forwarded unchanged to the parent class initializer.

    """
    super().__init__(**kwargs)
    # In-memory store of per-patient arrays, keyed by patient ID (starts empty).
    self._cached_data: dict[int, dict[str, np.ndarray]] = {}
    self.leads = leads if leads else list(range(12))

Attributes

name property
name: str

Dataset name

sampling_rate property
sampling_rate: int

Sampling rate in Hz

mean property
mean: float

Dataset mean

std property
std: float

Dataset standard deviation

patient_ids property
patient_ids: NDArray

Get dataset patient IDs

Returns:

  • NDArray

    npt.NDArray: patient IDs

Functions

get_train_patient_ids
get_train_patient_ids() -> npt.NDArray

Get dataset training patient IDs

Returns:

  • NDArray

    npt.NDArray: patient IDs

Source code in heartkit/datasets/ptbxl.py
def get_train_patient_ids(self) -> npt.NDArray:
    """Return the patient IDs that form the training partition.

    The partition is the leading 18500 entries of `patient_ids`.

    Returns:
        npt.NDArray: Training patient IDs
    """
    train_end = 18500  # fixed boundary between the train and test partitions
    train_ids = self.patient_ids[:train_end]
    return train_ids
get_test_patient_ids
get_test_patient_ids() -> npt.NDArray

Get dataset patient IDs reserved for testing only

Returns:

  • NDArray

    npt.NDArray: patient IDs

Source code in heartkit/datasets/ptbxl.py
def get_test_patient_ids(self) -> npt.NDArray:
    """Return the patient IDs held out exclusively for testing.

    The partition is every entry of `patient_ids` from position 18500 onward.

    Returns:
        npt.NDArray: Test patient IDs
    """
    test_start = 18500  # fixed boundary between the train and test partitions
    test_ids = self.patient_ids[test_start:]
    return test_ids
label_key
label_key(label_type: str = 'scp') -> str

Get label key

Parameters:

  • label_type
    (str, default: 'scp' ) –

    Label type. Defaults to "scp".

Returns:

  • str ( str ) –

    Label key

Source code in heartkit/datasets/ptbxl.py
def label_key(self, label_type: str = "scp") -> str:
    """Translate a label type into the key under which those labels are stored.

    Args:
        label_type (str, optional): Either "scp" or "beat". Defaults to "scp".

    Returns:
        str: "slabels" for "scp", "blabels" for "beat"

    Raises:
        ValueError: If `label_type` is neither "scp" nor "beat".
    """
    known_keys = (("scp", "slabels"), ("beat", "blabels"))
    for known_type, storage_key in known_keys:
        if label_type == known_type:
            return storage_key
    raise ValueError(f"Invalid label type: {label_type}")
patient_data
patient_data(patient_id: int) -> Generator[PatientData, None, None]

Get patient data

Note

If cacheable, data is cached in memory and returned as a dict. Otherwise, data is provided as HDF5 objects.

Patient Data Format
  • data: ECG data of shape (12, N)
  • slabels: SCP labels of shape (N, 2)
  • blabels: Beat labels of shape (N, 2)

Parameters:

  • patient_id
    (int) –

    Patient ID

Returns:

  • None

    Generator[PatientData, None, None]: Patient data

Source code in heartkit/datasets/ptbxl.py
@contextlib.contextmanager
def patient_data(self, patient_id: int) -> Generator[PatientData, None, None]:
    """Context manager giving access to one patient's stored arrays.

    !!! note
        When the dataset is cacheable, the arrays are read once, kept in memory
        and handed out as a plain dict. Otherwise the open HDF5 file is yielded
        and is closed again when the context exits.

    Patient Data Format:
        - data: ECG data of shape (12, N)
        - slabels: SCP labels of shape (N, 2)
        - blabels: Beat labels of shape (N, 2)

    Args:
        patient_id (int): Patient ID

    Returns:
        Generator[PatientData, None, None]: Patient data
    """
    pt_path = self.path / f"{self._pt_key(patient_id)}.h5"

    # Uncached mode: expose the live file handle for the duration of the context.
    if not self.cacheable:
        with h5py.File(pt_path, mode="r") as h5:
            yield h5
        return
    # END IF

    cached = self._cached_data.get(patient_id)
    if cached is None:
        # First access for this patient: pull every array fully into memory.
        with h5py.File(pt_path, mode="r") as h5:
            cached = {field: h5[field][:] for field in ("data", "slabels", "blabels")}
        # END WITH
        self._cached_data[patient_id] = cached
    # END IF
    yield cached
signal_generator
signal_generator(
    patient_generator: PatientGenerator, frame_size: int, samples_per_patient: int = 1, target_rate: int | None = None
) -> Generator[npt.NDArray, None, None]

Generate random frames.

Parameters:

  • patient_generator
    (PatientGenerator) –

    Generator that yields patient data.

  • frame_size
    (int) –

    Frame size

  • samples_per_patient
    (int, default: 1 ) –

    Samples per patient. Defaults to 1.

  • target_rate
    (int | None, default: None ) –

    Target rate. Defaults to None.

Returns:

  • None

    Generator[npt.NDArray, None, None]: Generator of 1-D input frames of shape (frame_size,)

Source code in heartkit/datasets/ptbxl.py
def signal_generator(
    self,
    patient_generator: PatientGenerator,
    frame_size: int,
    samples_per_patient: int = 1,
    target_rate: int | None = None,
) -> Generator[npt.NDArray, None, None]:
    """Generate random single-lead frames.

    For every patient produced by `patient_generator`, `samples_per_patient`
    frames are cut at random offsets from a randomly chosen lead in `self.leads`.
    NaNs are replaced via `np.nan_to_num` and frames are cast to float32. When
    `target_rate` differs from the dataset sampling rate, each frame is resampled
    and truncated to `frame_size`.

    Args:
        patient_generator (PatientGenerator): Generator that yields patient IDs.
        frame_size (int): Frame size (in samples at `target_rate`)
        samples_per_patient (int, optional): Samples per patient. Defaults to 1.
        target_rate (int | None, optional): Target rate. Defaults to the dataset sampling rate.

    Returns:
        Generator[npt.NDArray, None, None]: Generator of 1-D input frames
            (no trailing channel axis is added)

    Raises:
        ValueError: If a patient's recording is shorter than the window needed for one frame.
    """
    if target_rate is None:
        target_rate = self.sampling_rate

    # Number of native-rate samples required to obtain `frame_size` samples at `target_rate`.
    input_size = int(np.ceil((self.sampling_rate / target_rate) * frame_size))

    for pt in patient_generator:
        with self.patient_data(pt) as h5:
            data: npt.NDArray = h5["data"][:]
        # END WITH

        # Valid start offsets are 0..(N - input_size) inclusive. `np.random.randint`
        # excludes its upper bound, so pass the COUNT of valid offsets; otherwise a
        # recording exactly `input_size` long could never be sampled.
        num_starts = data.shape[1] - input_size + 1
        if num_starts < 1:
            raise ValueError(
                f"Patient {pt} has {data.shape[1]} samples but {input_size} are needed per frame"
            )
        # END IF

        for _ in range(samples_per_patient):
            lead = random.choice(self.leads)
            start = np.random.randint(0, num_starts)
            x = data[lead, start : start + input_size].squeeze()
            x = np.nan_to_num(x).astype(np.float32)
            if self.sampling_rate != target_rate:
                x = pk.signal.resample_signal(x, self.sampling_rate, target_rate, axis=0)
                x = x[:frame_size]  # truncate to frame size
            # END IF
            yield x
signal_label_generator
signal_label_generator(
    patient_generator: PatientGenerator,
    frame_size: int,
    samples_per_patient: int = 1,
    target_rate: int | None = None,
    label_map: dict[int, int] | None = None,
    label_type: str = "scp",
    label_format: str | None = None,
) -> Generator[tuple[npt.NDArray, int], None, None]

Generate frames w/ labels using patient generator.

Parameters:

  • patient_generator
    (PatientGenerator) –

    Patient Generator

  • frame_size
    (int) –

    Frame size

  • samples_per_patient
    (int, default: 1 ) –

    Samples per patient. Defaults to 1.

  • target_rate
    (int, default: None ) –

    Target rate. Defaults to None.

  • label_map
    (dict[int, int], default: None ) –

    Label map. Defaults to None.

  • label_type
    (str, default: 'scp' ) –

    Class type. Defaults to "scp".

  • label_format
    (str, default: None ) –

    Label format. Defaults to None.

Returns:

  • None

    Generator[tuple[npt.NDArray, int], None, None]: Generator of input data and labels

Yields:

  • tuple[npt.NDArray, int]

    Input data and label

Source code in heartkit/datasets/ptbxl.py
def signal_label_generator(
    self,
    patient_generator: PatientGenerator,
    frame_size: int,
    samples_per_patient: int = 1,
    target_rate: int | None = None,
    label_map: dict[int, int] | None = None,
    label_type: str = "scp",
    label_format: str | None = None,
) -> Generator[tuple[npt.NDArray, int], None, None]:
    """Generate frames w/ labels using patient generator.

    Each patient's raw labels (first column of the label array) are mapped
    through `label_map`; entries mapping to -1 (or missing) are ignored and
    patients with no remaining label are skipped. The second label column is
    accumulated into a per-class weight used when a single class must be drawn.

    Args:
        patient_generator (PatientGenerator): Patient Generator
        frame_size (int): Frame size
        samples_per_patient (int, optional): Samples per patient, spread evenly over the
            target classes (at least 1 per class). An iterable is used as-is as the
            per-class sample counts. Defaults to 1.
        target_rate (int, optional): Target rate. Defaults to the dataset sampling rate.
        label_map (dict[int, int], optional): Raw label -> target class (-1 to ignore).
            Required despite the None default.
        label_type (str, optional): Class type. Defaults to "scp".
        label_format (str, optional): None yields a single class value drawn by weight,
            "one_hot" yields an int32 vector with that one class set, "multi_hot" yields
            an int32 vector with all of the patient's classes set. Defaults to None.

    Returns:
        Generator[tuple[npt.NDArray, int], None, None]: Generator of input data and labels

    Yields:
        tuple[npt.NDArray, int]: Input data of shape (frame_size, 1) and label

    Raises:
        ValueError: If `label_map` is None, `label_format` is not recognized, or a
            patient's recording is shorter than the window needed for one frame.
    """
    if target_rate is None:
        target_rate = self.sampling_rate
    # END IF

    if label_map is None:
        raise ValueError("label_map must be provided to generate labeled frames")
    # END IF

    # Target labels and mapping
    tgt_labels = sorted({lbl for lbl in label_map.values() if lbl != -1})
    label_key = self.label_key(label_type)
    num_classes = len(tgt_labels)

    # If samples_per_patient is a list, then it must be the same length as nclasses
    if isinstance(samples_per_patient, Iterable):
        samples_per_tgt = samples_per_patient
    else:
        num_per_tgt = int(max(1, samples_per_patient / num_classes))
        samples_per_tgt = num_classes * [num_per_tgt]
    # END IF

    # Number of native-rate samples required to obtain `frame_size` samples at `target_rate`.
    input_size = int(np.ceil((self.sampling_rate / target_rate) * frame_size))

    for pt in patient_generator:
        # 1. Grab patient labels (fixed for all samples)
        with self.patient_data(pt) as h5:
            data = h5["data"][:]
            slabels = h5[label_key][:]
        # END WITH

        # 2. Map labels (skip entries not in class map == -1)
        pt_lbls = []
        pt_lbl_weights = []
        for row in range(slabels.shape[0]):
            label = label_map.get(int(slabels[row, 0]), -1)
            if label == -1:
                continue
            # END IF
            if label not in pt_lbls:
                pt_lbls.append(label)
                pt_lbl_weights.append(1 + slabels[row, 1])
            else:
                # Add the CURRENT row's weight to the existing class entry. The class
                # position is kept separate from `row` so the right row is read.
                lbl_idx = pt_lbls.index(label)
                pt_lbl_weights[lbl_idx] += slabels[row, 1]
            # END IF
        # END FOR
        pt_lbls = np.array(pt_lbls, dtype=np.int32)

        if pt_lbls.size == 0:
            continue
        # END IF

        if label_format == "multi_hot":
            y = np.zeros(num_classes, dtype=np.int32)
            # NOTE(review): indexes by class value, so this assumes target classes
            # are 0..num_classes-1 -- confirm against the label maps in use.
            y[pt_lbls] = 1
            num_samples = sum(samples_per_tgt[tgt_labels.index(i)] for i in pt_lbls)
        elif label_format == "one_hot":
            y = np.zeros(num_classes, dtype=np.int32)
            pt_lbl = random.choices(pt_lbls, pt_lbl_weights, k=1)[0]
            y[pt_lbl] = 1
            num_samples = samples_per_tgt[tgt_labels.index(pt_lbl)]
        elif label_format is None:
            # Its possible to have multiple labels, we assign based on weights
            y = random.choices(pt_lbls, pt_lbl_weights, k=1)[0]
            num_samples = samples_per_tgt[tgt_labels.index(y)]
        else:
            raise ValueError(f"Invalid label_format: {label_format}")
        # END IF

        # 3. Generate samples based on samples_per_tgt

        # Valid start offsets are 0..(N - input_size) inclusive. `np.random.randint`
        # excludes its upper bound, so pass the COUNT of valid offsets.
        num_starts = data.shape[1] - input_size + 1
        if num_starts < 1:
            raise ValueError(
                f"Patient {pt} has {data.shape[1]} samples but {input_size} are needed per frame"
            )
        # END IF

        for _ in range(num_samples):
            # select random lead and start index
            lead = random.choice(self.leads)
            start = np.random.randint(0, num_starts)
            # Extract frame
            x = np.nan_to_num(data[lead, start : start + input_size], posinf=0, neginf=0).astype(np.float32)
            # Resample if needed
            if self.sampling_rate != target_rate:
                x = pk.signal.resample_signal(x, self.sampling_rate, target_rate, axis=0)
                x = x[:frame_size]  # truncate to frame size
            # END IF
            x = np.reshape(x, (frame_size, 1))
            yield x, y
        # END FOR
    # END FOR
split_train_test_patients
split_train_test_patients(
    patient_ids: NDArray,
    test_size: float,
    label_map: dict[int, int] | None = None,
    label_type: str | None = None,
    label_threshold: int | None = 2,
) -> list[list[int]]

Perform train/test split on patients for given task. NOTE: We only perform inter-patient splits and not intra-patient.

Parameters:

  • patient_ids
    (NDArray) –

    Patient Ids

  • test_size
    (float) –

    Test size

  • label_map
    (dict[int, int], default: None ) –

    Label map. Defaults to None.

  • label_type
    (str, default: None ) –

    Label type. Defaults to None.

  • label_threshold
    (int, default: 2 ) –

    Label threshold. Defaults to 2.

Returns:

  • list[list[int]]

    list[list[int]]: Train and test sets of patient ids

Source code in heartkit/datasets/ptbxl.py
def split_train_test_patients(
    self,
    patient_ids: npt.NDArray,
    test_size: float,
    label_map: dict[int, int] | None = None,
    label_type: str | None = None,
    label_threshold: int | None = 2,
) -> list[list[int]]:
    """Perform train/test split on patients for given task.
    NOTE: We only perform inter-patient splits and not intra-patient.

    When both `label_map` and `label_type` are given, the split is stratified:
    each patient is assigned one of its mapped classes at random, classes with
    fewer than `label_threshold` patients are dropped, and patients left without
    a class are removed before splitting. Otherwise a plain shuffled split is done.

    Args:
        patient_ids (npt.NDArray): Patient Ids
        test_size (float): Test size
        label_map (dict[int, int], optional): Label map. Defaults to None.
        label_type (str, optional): Label type. Defaults to None.
        label_threshold (int, optional): Minimum patients per class; None disables
            the check. Defaults to 2.

    Returns:
        list[list[int]]: Train and test sets of patient ids
    """
    stratify = None
    if label_map is not None and label_type is not None:
        patients_labels = self.get_patients_labels(patient_ids, label_map=label_map, label_type=label_type)
        # Select random label for stratification or -1 if no labels
        stratify = np.array([random.choice(x) if len(x) > 0 else -1 for x in patients_labels])

        # -1 is the "no target class" sentinel, not a real class: leave it out so it
        # is never reported as a removed class.
        tgt_labels = sorted({lbl for lbl in label_map.values() if lbl != -1})

        # Remove patients w/ label counts below threshold
        if label_threshold is not None:
            for label in tgt_labels:
                class_counts = np.sum(stratify == label)
                if class_counts < label_threshold:
                    stratify[stratify == label] = -1
                    logger.warning(f"Removed class {label} w/ only {class_counts} samples")
                # END IF
            # END FOR
        # END IF

        # Remove patients w/o labels
        neg_mask = stratify == -1
        stratify = stratify[~neg_mask]
        patient_ids = patient_ids[~neg_mask]
        num_neg = neg_mask.sum()
        if num_neg > 0:
            logger.debug(f"Removed {num_neg} patients w/ no target class")
        # END IF

        # Report how many patients remain in each class
        logger.debug(f"[{self.name}] Stratify class counts:")
        for label in tgt_labels:
            class_counts = np.sum(stratify == label)
            logger.debug(f"Class {label}: {class_counts}")
        # END FOR
    # END IF

    return sklearn.model_selection.train_test_split(
        patient_ids,
        test_size=test_size,
        shuffle=True,
        stratify=stratify,
    )
filter_patients_for_labels
filter_patients_for_labels(
    patient_ids: NDArray, label_map: dict[int, int] | None = None, label_type: str | None = None
) -> npt.NDArray

Filter patients based on labels. Useful to remove patients w/o labels for task to speed up data loading.

Parameters:

  • patient_ids
    (NDArray) –

    Patient ids

  • label_map
    (dict[int, int], default: None ) –

    Label map. Defaults to None.

  • label_type
    (str, default: None ) –

    Label type. Defaults to None.

Returns:

  • NDArray

    npt.NDArray: Filtered patient ids

Source code in heartkit/datasets/ptbxl.py
def filter_patients_for_labels(
    self,
    patient_ids: npt.NDArray,
    label_map: dict[int, int] | None = None,
    label_type: str | None = None,
) -> npt.NDArray:
    """Filter patients based on labels.
    Useful to remove patients w/o labels for task to speed up data loading.

    Args:
        patient_ids (npt.NDArray): Patient ids
        label_map (dict[int, int], optional): Label map. Defaults to None.
        label_type (str, optional): Label type. Defaults to None.

    Returns:
        npt.NDArray: Filtered patient ids
    """

    if label_map is None or label_type is None:
        return patient_ids

    patients_labels = self.get_patients_labels(patient_ids, label_map, label_type)
    # Find any patient with empty list
    label_mask = np.array([len(x) > 0 for x in patients_labels])
    neg_mask = ~label_mask
    num_neg = neg_mask.sum()
    if num_neg > 0:
        logger.debug(f"Removed {num_neg} of {patient_ids.size} patients w/ no target class")
    return patient_ids[~neg_mask]
get_patients_labels
get_patients_labels(patient_ids: NDArray, label_map: dict[int, int], label_type: str = 'scp') -> list[list[int]]

Get class labels for each patient

Parameters:

  • patient_ids
    (NDArray) –

    Patient ids

  • label_map
    (dict[int, int]) –

    Label map

  • label_type
    (str, default: 'scp' ) –

    Label type. Defaults to "scp".

Returns:

  • list[list[int]]

    list[list[int]]: List of class labels per patient

Source code in heartkit/datasets/ptbxl.py
def get_patients_labels(
    self,
    patient_ids: npt.NDArray,
    label_map: dict[int, int],
    label_type: str = "scp",
) -> list[list[int]]:
    """Collect the mapped class labels of many patients.

    Runs `get_patient_labels` for every ID through `process_map`.

    Args:
        patient_ids (npt.NDArray): Patient ids
        label_map (dict[int, int]): Label map
        label_type (str, optional): Label type. Defaults to "scp".

    Returns:
        list[list[int]]: One list of class labels per patient

    """
    id_list = patient_ids.tolist()
    # Bind the shared arguments so the mapped callable only takes a patient ID.
    labeler = functools.partial(
        self.get_patient_labels,
        label_map=label_map,
        label_type=label_type,
    )
    return process_map(labeler, id_list, desc=f"Sorting {self.name} labels")
get_patient_scp_codes
get_patient_scp_codes(patient_id: int) -> list[int]

Get SCP codes for patient

Parameters:

  • patient_id
    (int) –

    Patient id

Returns:

  • list[int]

    list[int]: List of SCP codes

Source code in heartkit/datasets/ptbxl.py
def get_patient_scp_codes(self, patient_id: int) -> list[int]:
    """Return the distinct SCP codes recorded for one patient.

    Args:
        patient_id (int): Patient id

    Returns:
        list[int]: Unique SCP codes in ascending order

    """
    scp_key = self.label_key("scp")
    with self.patient_data(patient_id) as pt:
        # The first column of the label array holds the code itself.
        code_column = pt[scp_key][:, 0]
    # END WITH
    unique_codes = np.unique(code_column)
    return unique_codes.tolist()
get_patient_labels
get_patient_labels(patient_id: int, label_map: dict[int, int], label_type: str = 'scp') -> list[int]

Get class labels for patient

Parameters:

  • patient_id
    (int) –

    Patient id

Returns:

  • list[int]

    list[int]: List of class labels

Source code in heartkit/datasets/ptbxl.py
def get_patient_labels(self, patient_id: int, label_map: dict[int, int], label_type: str = "scp") -> list[int]:
    """Return the mapped class labels of one patient.

    The patient's distinct raw labels are looked up in `label_map`; raw labels
    that are missing from the map or map to -1 are left out.

    Args:
        patient_id (int): Patient id
        label_map (dict[int, int]): Raw label -> class label (-1 to ignore)
        label_type (str, optional): Label type. Defaults to "scp".

    Returns:
        list[int]: List of class labels

    """
    with self.patient_data(patient_id) as pt:
        raw_labels = pt[self.label_key(label_type)][:, 0]
    # END WITH

    mapped: list[int] = []
    for raw in np.unique(raw_labels):
        target = label_map.get(raw, -1)
        if target != -1:
            mapped.append(target)
        # END IF
    # END FOR
    return mapped
download
download(num_workers: int | None = None, force: bool = False)

Download dataset

This will download preprocessed HDF5 files from S3.

Parameters:

  • num_workers
    (int | None, default: None ) –
    Number of parallel workers. Defaults to None.
  • force
    (bool, default: False ) –

    Force redownload. Defaults to False.

Source code in heartkit/datasets/ptbxl.py
def download(self, num_workers: int | None = None, force: bool = False):
    """Download dataset

    Fetches the preprocessed HDF5 archive from S3 into the dataset directory and
    unpacks it there whenever the download helper reports that it fetched the file.

    Args:
        num_workers (int | None, optional): # parallel workers. Not used by this
            method. Defaults to None.
        force (bool, optional): Force redownload. Not used by this method.
            Defaults to False.
    """
    os.makedirs(self.path, exist_ok=True)
    archive_name = f"{self.name}.zip"
    zip_path = self.path / archive_name

    fetched = helia.utils.download_s3_file(
        key=f"{self.name}/{archive_name}",
        dst=zip_path,
        bucket="ambiq-ai-datasets",
        checksum="size",
    )
    if not fetched:
        # Nothing new was downloaded, so there is nothing to unpack.
        return

    with zipfile.ZipFile(zip_path, "r") as archive:
        archive.extractall(self.path)
download_raw_dataset
download_raw_dataset(num_workers: int | None = None, force: bool = False)

Downloads full dataset zipfile and converts into individual patient HDF5 files.

Parameters:

  • force
    (bool, default: False ) –

    Whether to force re-download if destination exists. Defaults to False.

  • num_workers
    (int, default: None ) –
    Number of parallel workers. Defaults to os.cpu_count().
Source code in heartkit/datasets/ptbxl.py
def download_raw_dataset(self, num_workers: int | None = None, force: bool = False):
    """Download the full PTB-XL zipfile and convert it into individual patient HDF5 files.

    Args:
        force (bool, optional): Re-download even if the zip already exists; also
            passed on to the conversion step. Defaults to False.
        num_workers (int, optional): # parallel workers, passed on to the conversion
            step. Defaults to None.
    """
    # 1. Fetch the raw archive unless a copy is already on disk
    logger.debug("Downloading PTB-XL dataset")
    archive_url = (
        "https://www.physionet.org/static/published-projects/ptb-xl/"
        "ptb-xl-a-large-publicly-available-electrocardiography-dataset-1.0.2.zip"
    )
    archive_path = self.path / "ptbxl.zip"
    os.makedirs(self.path, exist_ok=True)

    needs_download = force or not os.path.exists(archive_path)
    if needs_download:
        helia.utils.download_file(archive_url, archive_path, progress=True)
    else:
        logger.warning(
            f"Zip file already exists. Please delete or set `force` flag to redownload. PATH={archive_path}"
        )
    # END IF

    # 2. Extract and convert patient ECG data to H5 files
    logger.debug("Processing PTB-XL patient data")
    self._convert_dataset_zip_to_hdf5(
        zip_path=archive_path,
        force=force,
        num_workers=num_workers,
    )
    logger.debug("Finished PTB-XL patient data")
close
close()

Close dataset

Source code in heartkit/datasets/ptbxl.py
def close(self):
    """Close dataset by emptying the in-memory patient cache."""
    cache = self._cached_data
    # Cleared in place, so the same dict object stays attached to the dataset.
    cache.clear()