EcgSyntheticDataset(
num_pts: int = 250,
leads: list[int] | None = None,
params: dict | None = None,
path: str = Path(tempfile.gettempdir()) / "ecg-synthetic",
**kwargs
)
Synthetic ECG dataset that generates 12-lead ECG signals using physioKIT.
Parameters:
-
num_pts
(int, default:
250
)
–
Number of patients. Defaults to 250.
-
leads
(list[int] | None, default:
None
)
–
Leads to use. Defaults to None.
-
params
(dict | None, default:
None
)
–
ECG synthetic parameters for EcgSyntheticParams. Defaults to None.
-
path
(str, default:
Path(gettempdir()) / 'ecg-synthetic'
)
–
Path to store dataset. Defaults to Path(tempfile.gettempdir()) / "ecg-synthetic".
Example:
import heartkit as hk
ds = hk.datasets.EcgSyntheticDataset(
num_pts=100,
params=dict(
sample_rate=1000, # Hz
duration=10, # seconds
heart_rate=(40, 120),
)
)
with ds.patient_data(patient_id=ds.patient_ids[0]) as pt:
ecg = pt["data"][:]
segs = pt["segmentations"][:]
fids = pt["fiducials"][:]
# END WITH
Source code in heartkit/datasets/ecg_synthetic.py
def __init__(
    self,
    num_pts: int = 250,
    leads: list[int] | None = None,
    params: dict | None = None,
    path: str = Path(tempfile.gettempdir()) / "ecg-synthetic",
    **kwargs,
) -> None:
    """Synthetic ECG dataset that generates 12-lead ECG signals using physioKIT.

    Args:
        num_pts (int, optional): Number of patients. Defaults to 250.
        leads (list[int] | None, optional): Leads to use. When None (or empty),
            lead indices 0 through 11 are used. Defaults to None.
        params (dict | None, optional): Keyword arguments forwarded to
            EcgSyntheticParams. When None (or empty), EcgSyntheticParams is
            constructed without arguments. Defaults to None.
        path (str, optional): Path to store dataset. Defaults to
            Path(tempfile.gettempdir()) / "ecg-synthetic" (a Path object).
        **kwargs: Additional keyword arguments passed to the parent initializer.

    Example:

    ```python
    import heartkit as hk

    ds = hk.datasets.EcgSyntheticDataset(
        num_pts=100,
        params=dict(
            sample_rate=1000,  # Hz
            duration=10,  # seconds
            heart_rate=(40, 120),
        )
    )

    with ds.patient_data(patient_id=ds.patient_ids[0]) as pt:
        ecg = pt["data"][:]
        segs = pt["segmentations"][:]
        fids = pt["fiducials"][:]
    # END WITH
    ```
    """
    super().__init__(path=path, **kwargs)
    # The noise generator starts unset; add_noise assigns it on first use.
    self._noise_gen = None
    self._num_pts = num_pts
    if leads:
        self.leads = leads
    else:
        self.leads = list(range(12))
    # END IF
    param_kwargs = params if params else {}
    self.params = EcgSyntheticParams(**param_kwargs)
Attributes
patient_ids
property
Get dataset patient IDs
Returns:
Functions
get_train_patient_ids
get_train_patient_ids() -> npt.NDArray
Get dataset training patient IDs
Returns:
-
npt.NDArray
–
patient IDs
Source code in heartkit/datasets/ecg_synthetic.py
def get_train_patient_ids(self) -> npt.NDArray:
    """Get dataset training patient IDs

    The training split is the leading 80% of `patient_ids`; the split index
    is `int(0.80 * num_pts)`, i.e. truncated toward zero.

    Returns:
        npt.NDArray: patient IDs
    """
    split_idx = int(self._num_pts * 0.80)
    train_ids = self.patient_ids[:split_idx]
    return train_ids
get_test_patient_ids
get_test_patient_ids() -> npt.NDArray
Get dataset patient IDs reserved for testing only
Returns:
-
npt.NDArray
–
patient IDs
Source code in heartkit/datasets/ecg_synthetic.py
def get_test_patient_ids(self) -> npt.NDArray:
    """Get dataset patient IDs reserved for testing only

    The test split is whatever follows the leading 80% of `patient_ids`; the
    split index is `int(0.80 * num_pts)`, i.e. truncated toward zero.

    Returns:
        npt.NDArray: patient IDs
    """
    split_idx = int(self._num_pts * 0.80)
    test_ids = self.patient_ids[split_idx:]
    return test_ids
pt_key
Get patient key
Source code in heartkit/datasets/ecg_synthetic.py
def pt_key(self, patient_id: int):
    """Get patient key

    The key is the patient ID rendered as a decimal string, zero-padded to at
    least five digits (e.g. 7 -> "00007").
    """
    key = format(patient_id, "05d")
    return key
build_cache
Build in-memory cache to speed up data access
Source code in heartkit/datasets/ecg_synthetic.py
def build_cache(self):
    """Build in-memory cache to speed up data access

    Loads every patient in `patient_ids` with `load_patient_data` (dispatched
    through `process_map`) and replaces `_cached_data` with a dict that maps
    each patient's key (see `pt_key`) to its loaded data.
    """
    logger.info(f"Creating synthetic dataset cache with {self._num_pts} patients")
    patient_ids = self.patient_ids
    pts_data = process_map(self.load_patient_data, patient_ids, desc=f"Building {self.name} cache")
    # Key each entry by the actual patient ID rather than its position in the
    # result list: patient_data() looks entries up with pt_key(patient_id), so
    # positional keys are only correct when patient_ids is exactly 0..N-1.
    # NOTE(review): relies on process_map returning results in input order — confirm.
    self._cached_data = {self.pt_key(pt_id): pt_data for pt_id, pt_data in zip(patient_ids, pts_data)}
patient_data
patient_data(patient_id: int) -> Generator[PatientData, None, None]
Get access to patient data
Patient data contains following fields:
- data: ECG signal of shape (12, N)
- segmentations: Segmentation of ECG signal
- fiducials: Fiducials of ECG signal
Parameters:
-
patient_id
(int)
–
Patient ID
Returns:
-
Generator[PatientData, None, None]
–
Patient data
Source code in heartkit/datasets/ecg_synthetic.py
@contextlib.contextmanager
def patient_data(self, patient_id: int) -> Generator[PatientData, None, None]:
    """Get access to patient data

    Context manager; use it in a `with` statement. When the dataset is
    cacheable, the data is served from the in-memory cache, which is built
    first if the patient's key is not present yet. Otherwise the patient is
    loaded on every call.

    Patient data contains following fields:
    - data: ECG signal of shape (12, N)
    - segmentations: Segmentation of ECG signal
    - fiducials: Fiducials of ECG signal

    Args:
        patient_id (int): Patient ID

    Yields:
        PatientData: Patient data
    """
    key = self.pt_key(patient_id)
    if not self.cacheable:
        # Uncached path: load directly, nothing is retained.
        yield self.load_patient_data(patient_id)
        return
    # END IF
    cache_miss = key not in self._cached_data
    if cache_miss:
        self.build_cache()
    # END IF
    yield self._cached_data[key]
signal_generator
signal_generator(
patient_generator: PatientGenerator, frame_size: int, samples_per_patient: int = 1, target_rate: int | None = None
) -> Generator[npt.NDArray, None, None]
Generate frames using patient generator.
Parameters:
-
patient_generator
(PatientGenerator)
–
Generator that yields patient data.
-
frame_size
(int)
–
Frame size.
-
samples_per_patient
(int, default:
1
)
–
Samples per patient. Defaults to 1.
-
target_rate
(int | None, default:
None
)
–
Target rate. Defaults to None.
Returns:
-
Generator[npt.NDArray, None, None]
–
Generator of input frames, each holding frame_size samples
Source code in heartkit/datasets/ecg_synthetic.py
def signal_generator(
    self,
    patient_generator: PatientGenerator,
    frame_size: int,
    samples_per_patient: int = 1,
    target_rate: int | None = None,
) -> Generator[npt.NDArray, None, None]:
    """Generate frames using patient generator.

    For each patient yielded by `patient_generator`, the patient's "data"
    array is read once and `samples_per_patient` frames are drawn from it.
    Each frame takes a random lead from `self.leads` and a random window,
    passes the window through `add_noise`, and resamples it to `target_rate`
    when that differs from the dataset sampling rate.

    Args:
        patient_generator (PatientGenerator): Generator that yields patient IDs;
            each item is passed to `patient_data`.
        frame_size (int): Frame size, in samples at `target_rate`.
        samples_per_patient (int, optional): Samples per patient. Defaults to 1.
        target_rate (int | None, optional): Target rate. When None, the dataset
            sampling rate is used and no resampling happens. Defaults to None.

    Yields:
        npt.NDArray: One frame holding `frame_size` samples of a single lead.

    Raises:
        ValueError: Raised by `np.random.randint` when a patient's signal holds
            fewer samples than one frame requires.
    """
    if target_rate is None:
        target_rate = self.sampling_rate
    # END IF
    # Number of native-rate samples needed to cover frame_size samples at target_rate.
    input_size = int(np.ceil((self.sampling_rate / target_rate) * frame_size))
    for pt in patient_generator:
        with self.patient_data(pt) as h5:
            # Copy the data out so it stays usable after the context exits.
            data = h5["data"][:]
        # END WITH
        # Largest start index for which a full window still fits in the signal.
        max_start = data.shape[1] - input_size
        for _ in range(samples_per_patient):
            lead = random.choice(self.leads)
            # randint's upper bound is exclusive: the +1 makes the final window
            # reachable and lets a signal of exactly input_size samples be used
            # instead of raising.
            start = np.random.randint(0, max_start + 1)
            x = data[lead, start : start + input_size].squeeze()
            x = np.nan_to_num(x).astype(np.float32)
            x = self.add_noise(x)
            if self.sampling_rate != target_rate:
                x = pk.signal.resample_signal(x, self.sampling_rate, target_rate, axis=0)
                # input_size is rounded up, so trim any surplus samples.
                x = x[:frame_size]
            # END IF
            yield x
        # END FOR
    # END FOR
download
download(num_workers: int | None = None, force: bool = False)
Download dataset
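Parameters:
-
num_workers
(int | None, default:
None
)
–
# parallel workers. Defaults to None.
-
force
(bool, default:
False
)
–
Force redownload. Defaults to False.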
Source code in heartkit/datasets/ecg_synthetic.py
| def download(self, num_workers: int | None = None, force: bool = False):
"""Download dataset
Args:
num_workers (int | None, optional): # parallel workers. Defaults to None.
force (bool, optional): Force redownload. Defaults to False.
"""
|
close
Close dataset
Source code in heartkit/datasets/ecg_synthetic.py
def close(self):
    """Close dataset

    Closes the noise generator if one has been created, then empties the
    in-memory patient cache.
    """
    noise_gen = self._noise_gen
    if noise_gen is not None:
        noise_gen.close()
    # END IF
    self._cached_data.clear()
add_noise
Add noise to ECG signal.
Source code in heartkit/datasets/ecg_synthetic.py
def add_noise(self, ecg: npt.NDArray):
    """Add noise to ECG signal.

    The noise level is drawn uniformly from the `noise_multiplier` range in
    `self.params`. When both ends of that range are zero the signal is returned
    untouched. The noise generator is created on first use and then reused.

    Args:
        ecg (npt.NDArray): ECG signal.

    Returns:
        npt.NDArray: The same `ecg` object that was passed in.
    """
    bounds = self.params.noise_multiplier
    lower, upper = bounds[0], bounds[1]
    noise_enabled = lower != 0 or upper != 0
    if not noise_enabled:
        return ecg
    # END IF
    level = np.random.uniform(lower, upper)
    if self._noise_gen is None:
        self._noise_gen = NstdbNoise(target_rate=self.sampling_rate)
    # END IF
    # NOTE(review): the return value of apply_noise is discarded, so this only
    # adds noise if apply_noise modifies `ecg` in place — confirm.
    self._noise_gen.apply_noise(ecg, level)
    return ecg