PpgSyntheticDataset(
num_pts: int = 250, params: dict | None = None, path: str = Path(tempfile.gettempdir()) / "ppg-synthetic", **kwargs
)
PPG synthetic dataset creates 1-lead PPG signal using physioKIT.
Parameters:
-
num_pts
(int, default:
250
)
–
Number of patients. Defaults to 250.
-
params
(dict | None, default:
None
)
–
PPG synthetic parameters (PpgSyntheticParams). Defaults to None.
-
path
(str, default:
Path(gettempdir()) / 'ppg-synthetic'
)
–
Path to dataset. Defaults to Path(tempfile.gettempdir()) / "ppg-synthetic".
Example:
import heartkit as hk
# Create synthetic PPG dataset:
# - 10 patients
# - 500 Hz sample rate
# - 10 sec duration
# - heart rate between 40 and 120 bpm
# - frequency modulation between 0.2 and 0.4
# - IBI randomness between 0.05 and 0.15
# - no noise
ds = hk.datasets.PpgSyntheticDataset(
num_pts=10,
params=dict(
sample_rate=500,
duration=10,
heart_rate=(40, 120),
frequency_modulation=(0.2, 0.4),
ibi_randomness=(0.05, 0.15),
noise_multiplier=(0, 0),
)
)
with ds.patient_data(ds.patient_ids[0]) as pt:
ppg = pt["data"][:]
segs = pt["segmentations"][:]
fids = pt["fiducials"][:]
# END WITH
Source code in heartkit/datasets/ppg_synthetic.py
def __init__(
    self,
    num_pts: int = 250,
    params: dict | None = None,
    path: str | Path = Path(tempfile.gettempdir()) / "ppg-synthetic",
    **kwargs,
) -> None:
    """PPG synthetic dataset creates 1-lead PPG signal using physioKIT.

    Args:
        num_pts (int, optional): Number of patients. Defaults to 250.
        params (dict | None, optional): PPG synthetic parameters (PpgSyntheticParams). Defaults to None.
        path (str | Path, optional): Path to dataset. Defaults to Path(tempfile.gettempdir()) / "ppg-synthetic".
        **kwargs: Forwarded unchanged to the parent class constructor.

    Example:

    ```python
    import heartkit as hk

    # Create synthetic PPG dataset:
    #   - 10 patients
    #   - 500 Hz sample rate
    #   - 10 sec duration
    #   - heart rate between 40 and 120 bpm
    #   - frequency modulation between 0.2 and 0.4
    #   - IBI randomness between 0.05 and 0.15
    #   - no noise
    ds = hk.datasets.PpgSyntheticDataset(
        num_pts=10,
        params=dict(
            sample_rate=500,
            duration=10,
            heart_rate=(40, 120),
            frequency_modulation=(0.2, 0.4),
            ibi_randomness=(0.05, 0.15),
            noise_multiplier=(0, 0),
        )
    )

    # patient_data is a context-manager method, so it is called, not indexed.
    with ds.patient_data(ds.patient_ids[0]) as pt:
        ppg = pt["data"][:]
        segs = pt["segmentations"][:]
        fids = pt["fiducials"][:]
    # END WITH
    ```
    """
    super().__init__(path=path, **kwargs)
    # Noise generator starts unset; add_noise creates it on first use.
    self._noise_gen = None
    self._num_pts = num_pts
    # A missing (or empty) params dict means "use PpgSyntheticParams defaults".
    self.params = PpgSyntheticParams(**(params or {}))
|
Attributes
patient_ids
property
Get dataset patient IDs
Returns:
Functions
get_train_patient_ids
get_train_patient_ids() -> npt.NDArray
Get dataset training patient IDs
Returns:
-
npt.NDArray
–
Patient IDs
Source code in heartkit/datasets/ppg_synthetic.py
def get_train_patient_ids(self) -> npt.NDArray:
    """Get dataset training patient IDs

    The training split is the leading ``int(0.80 * num_pts)`` entries of
    ``patient_ids``.

    Returns:
        npt.NDArray: patient IDs
    """
    return self.patient_ids[: int(0.80 * self._num_pts)]
|
get_test_patient_ids
get_test_patient_ids() -> npt.NDArray
Get dataset patient IDs reserved for testing only
Returns:
-
npt.NDArray
–
Patient IDs
Source code in heartkit/datasets/ppg_synthetic.py
def get_test_patient_ids(self) -> npt.NDArray:
    """Get dataset patient IDs reserved for testing only

    The test split is everything in ``patient_ids`` after the leading
    ``int(0.80 * num_pts)`` entries.

    Returns:
        npt.NDArray: patient IDs
    """
    return self.patient_ids[int(0.80 * self._num_pts) :]
|
pt_key
Get patient key
Source code in heartkit/datasets/ppg_synthetic.py
def pt_key(self, patient_id: int):
    """Get patient key: the ID as a decimal string zero-padded to at least five characters."""
    return format(patient_id, "05d")
|
build_cache
Build cache
Source code in heartkit/datasets/ppg_synthetic.py
def build_cache(self):
    """Build cache

    Loads every patient through ``load_patient_data`` (dispatched via
    ``process_map``) and replaces ``self._cached_data`` with a dict mapping
    ``pt_key(patient_id)`` to that patient's data.
    """
    logger.info(f"Creating synthetic dataset cache with {self._num_pts} patients")
    patient_ids = self.patient_ids
    pts_data = process_map(self.load_patient_data, patient_ids, desc=f"Building {self.name} cache")
    # Key each entry by its real patient ID, not its position in the result
    # list, so patient_data() lookups hit even when IDs are not exactly 0..N-1.
    # NOTE(review): relies on process_map returning results in input order — confirm.
    self._cached_data = {self.pt_key(pt_id): pt_data for pt_id, pt_data in zip(patient_ids, pts_data)}
|
patient_data
patient_data(patient_id: int) -> Generator[PatientData, None, None]
Get patient data
Parameters:
-
patient_id
(int)
–
Patient ID
Yields:
-
PatientData
–
Patient data
Source code in heartkit/datasets/ppg_synthetic.py
@contextlib.contextmanager
def patient_data(self, patient_id: int) -> Generator[PatientData, None, None]:
    """Get patient data

    Context manager: when the dataset is cacheable the entry is served from
    the cache (which is built first if the patient's key is missing);
    otherwise the patient is loaded directly.

    Args:
        patient_id (int): Patient ID

    Yields:
        PatientData: Patient data
    """
    key = self.pt_key(patient_id)
    if not self.cacheable:
        yield self.load_patient_data(patient_id)
        return
    # END IF
    if key not in self._cached_data:
        self.build_cache()
    # END IF
    yield self._cached_data[key]
|
signal_generator
signal_generator(
patient_generator: PatientGenerator, frame_size: int, samples_per_patient: int = 1, target_rate: int | None = None
) -> Generator[npt.NDArray, None, None]
Generate frames using patient generator.
Parameters:
-
patient_generator
(PatientGenerator)
–
Generator that yields patient data.
-
frame_size
(int)
–
Frame size
-
samples_per_patient
(int, default:
1
)
–
Samples per patient. Defaults to 1.
-
target_rate
(int | None, default:
None
)
–
Target rate. Defaults to None.
Yields:
-
npt.NDArray
–
Input frame of length frame_size (singleton axes are squeezed out)
Source code in heartkit/datasets/ppg_synthetic.py
def signal_generator(
    self,
    patient_generator: PatientGenerator,
    frame_size: int,
    samples_per_patient: int = 1,
    target_rate: int | None = None,
) -> Generator[npt.NDArray, None, None]:
    """Generate frames using patient generator.

    For each patient, ``samples_per_patient`` windows are cut at random
    offsets from the patient's ``"data"`` signal, passed through
    ``add_noise`` and, if ``target_rate`` differs from the dataset sampling
    rate, resampled and trimmed to ``frame_size`` samples.

    Args:
        patient_generator (PatientGenerator): Generator that yields patient data.
        frame_size (int): Frame size
        samples_per_patient (int, optional): Samples per patient. Defaults to 1.
        target_rate (int | None, optional): Target rate. Defaults to None,
            meaning the dataset's own sampling rate (no resampling).

    Yields:
        npt.NDArray: One frame per sample; singleton axes are squeezed out
            before the frame is yielded.

    Raises:
        ValueError: Propagated from ``np.random.randint`` when a patient's
            signal is shorter than the required window.
    """
    if target_rate is None:
        target_rate = self.sampling_rate
    # Number of source samples needed so that, after resampling to
    # target_rate, at least frame_size samples remain.
    input_size = int(np.ceil((self.sampling_rate / target_rate) * frame_size))
    for pt in patient_generator:
        with self.patient_data(pt) as h5:
            # Copy the signal out so it can be used after the context exits.
            data: h5py.Dataset = h5["data"][:]
        # END WITH
        for _ in range(samples_per_patient):
            # randint's upper bound is exclusive: +1 makes the last valid
            # start reachable and lets a signal of exactly input_size
            # samples work instead of raising.
            start = np.random.randint(0, data.shape[0] - input_size + 1)
            x = data[start : start + input_size].squeeze()
            x = np.nan_to_num(x).astype(np.float32)
            x = self.add_noise(x)
            if self.sampling_rate != target_rate:
                x = pk.signal.resample_signal(x, self.sampling_rate, target_rate, axis=0)
                # Resampling may produce extra samples because of the ceil above.
                x = x[:frame_size]
            # END IF
            yield x
|
download
download(num_workers: int | None = None, force: bool = False)
Download dataset
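Parameters:
-
num_workers
(int | None, default:
None
)
–
Number of parallel workers. Defaults to None.
-
force
(bool, default:
False
)
–
Force redownload. Defaults to False.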
Source code in heartkit/datasets/ppg_synthetic.py
| def download(self, num_workers: int | None = None, force: bool = False):
"""Download dataset
Args:
num_workers (int | None, optional): # parallel workers. Defaults to None.
force (bool, optional): Force redownload. Defaults to False.
"""
|
close
Close dataset
Source code in heartkit/datasets/ppg_synthetic.py
def close(self):
    """Close dataset

    Closes the noise generator if one was created and empties the patient
    cache. Safe to call more than once.
    """
    if self._noise_gen is not None:
        self._noise_gen.close()
        # Drop the closed generator so add_noise builds a fresh one on next
        # use and a repeated close() does not close it twice.
        self._noise_gen = None
    # END IF
    self._cached_data.clear()
|
add_noise
Add noise to PPG signal.
Source code in heartkit/datasets/ppg_synthetic.py
def add_noise(self, ppg: npt.NDArray):
    """Add noise to PPG signal.

    A noise level is drawn uniformly from ``params.noise_multiplier`` and
    handed to the noise generator together with the signal. When both bounds
    are zero the signal is returned untouched and no generator is created.

    Args:
        ppg (npt.NDArray): PPG signal.

    Returns:
        The same ``ppg`` object that was passed in.
    """
    bounds = self.params.noise_multiplier
    lower, upper = bounds[0], bounds[1]
    if lower == 0 and upper == 0:
        return ppg
    # END IF
    scale = np.random.uniform(lower, upper)
    # The noise generator is built only on first use.
    if self._noise_gen is None:
        self._noise_gen = NstdbNoise(target_rate=self.sampling_rate)
    # END IF
    # NOTE(review): the return value is discarded, so this presumably
    # modifies ppg in place — confirm against NstdbNoise.apply_noise.
    self._noise_gen.apply_noise(ppg, scale)
    return ppg
|