Skip to content

mesa

Classes

MesaSleepStage

MESA sleep stages

MesaDataset

MesaDataset(target_rate: int = 128, **kwargs)

MESA dataset

Parameters:

  • target_rate (int, default: 128 ) –

    Target rate. Defaults to 128.

Source code in sleepkit/datasets/mesa.py
def __init__(
    self,
    target_rate: int = 128,
    **kwargs,
) -> None:
    """Set up the MESA dataset.

    Any extra keyword arguments are handed to the parent initializer. After
    that, the dataset path is redirected to a ``mesa`` sub-folder unless its
    last component is already ``mesa``.

    Args:
        target_rate (int, optional): Target rate. Defaults to 128.

    """
    super().__init__(**kwargs)

    # Point the dataset path at the "mesa" folder when it is not already there
    last_component = self.path.parts[-1]
    if last_component != "mesa":
        self.path = self.path / "mesa"

    self.target_rate = target_rate

Attributes

subject_ids property
subject_ids: list[str]

Get dataset subject IDs

Returns:

  • list[str]

    list[str]: Subject IDs

train_subject_ids property
train_subject_ids: list[str]

Get train subject ids

test_subject_ids property
test_subject_ids: list[str]

Get test subject ids

actigraphy_signal_names property
actigraphy_signal_names: list[str]

Actigraphy signal names

psg_signal_names property
psg_signal_names: list[str]

PSG signal names

signal_names property
signal_names: list[str]

Signal names as they appear in the EDF files

Functions

uniform_subject_generator
uniform_subject_generator(
    subject_ids: list[str] | None = None, repeat: bool = True, shuffle: bool = True
) -> SubjectGenerator

Yield Subject IDs uniformly.

Parameters:

  • subject_ids (list[str], default: None ) –

    Array of subject ids. Defaults to None.

  • repeat (bool, default: True ) –

    Whether to repeat generator. Defaults to True.

  • shuffle (bool, default: True ) –

    Whether to shuffle subject ids. Defaults to True.

Returns:

  • SubjectGenerator ( SubjectGenerator ) –

    Subject generator

Source code in sleepkit/datasets/mesa.py
def uniform_subject_generator(
    self,
    subject_ids: list[str] | None = None,
    repeat: bool = True,
    shuffle: bool = True,
) -> SubjectGenerator:
    """Yield Subject IDs uniformly.

    Args:
        subject_ids (list[str], optional): Subject ids to draw from. When None,
            the dataset's own ``subject_ids`` are used. Defaults to None.
        repeat (bool, optional): Whether to repeat generator. Defaults to True.
        shuffle (bool, optional): Whether to shuffle subject ids. Defaults to True.

    Returns:
        SubjectGenerator: Subject generator
    """
    pool = self.subject_ids if subject_ids is None else subject_ids

    # The id generator works on positions; each position is mapped back to its subject id
    positions = list(range(len(pool)))
    for position in nse.utils.uniform_id_generator(positions, repeat=repeat, shuffle=shuffle):
        chosen = pool[position]
        if isinstance(chosen, bytes):
            # Byte ids are handed out as ASCII text
            yield chosen.decode("ascii")
        else:
            yield chosen
load_signal_for_subject
load_signal_for_subject(
    subject_id: str, signal_label: str, start: int = 0, data_size: int | None = None
) -> npt.NDArray[np.float32]

Load signal into memory for subject at target rate (resampling if needed) Args: subject_id (str): Subject ID signal_label (str): Signal label start (int): Start location @ target rate data_size (int): Data length @ target rate Returns: npt.NDArray[np.float32]: Signal

Source code in sleepkit/datasets/mesa.py
def load_signal_for_subject(
    self,
    subject_id: str,
    signal_label: str,
    start: int = 0,
    data_size: int | None = None,
) -> npt.NDArray[np.float32]:
    """Load signal into memory for subject at target rate (resampling if needed)

    Actigraphy signals are delegated to ``_load_actigraphy_signal_for_subject``;
    every other label is read from the subject's EDF file.

    Args:
        subject_id (str): Subject ID
        signal_label (str): Signal label
        start (int): Start location @ target rate
        data_size (int | None): Data length @ target rate. When None, the signal is
            read from ``start`` through the end of the recording.

    Returns:
        npt.NDArray[np.float32]: Signal

    Raises:
        ValueError: If ``signal_label`` is not among the EDF signal labels.
    """
    if signal_label in self.actigraphy_signal_names:
        return self._load_actigraphy_signal_for_subject(subject_id, signal_label, start, data_size)

    with pyedflib.EdfReader(self._get_subject_edf_path(subject_id)) as fp:
        signal_labels = fp.getSignalLabels()
        signal_idx = signal_labels.index(signal_label)
        sample_rate = fp.samplefrequency(signal_idx)
        # Convert positions/lengths expressed at the target rate into native samples
        rate_ratio = sample_rate / self.target_rate
        sig_start = round(start * rate_ratio)
        if data_size is None:
            # getNSamples() reports one count per signal, so pick this signal's count
            # and only request what remains after the start offset.
            sig_len = int(fp.getNSamples()[signal_idx])
            sig_duration = max(sig_len - sig_start, 0)
        else:
            sig_duration = math.ceil(data_size * rate_ratio)
        signal = fp.readSignal(signal_idx, sig_start, sig_duration, digital=False).astype(np.float32)
    # END WITH
    if sample_rate != self.target_rate:
        signal = pk.signal.resample_signal(signal, sample_rate, self.target_rate)
    if data_size is None:
        return signal
    # Rounding up the native length can yield a few extra samples after resampling
    return signal[:data_size]
extract_sleep_events
extract_sleep_events(subject_id: str) -> set[str]

Extract the distinct respiratory event labels for subject Args: subject_id (str): Subject ID Returns: set[str]: Unique event concept labels

Source code in sleepkit/datasets/mesa.py
def extract_sleep_events(self, subject_id: str) -> set[str]:
    """Extract sleep apnea events for subject
    Args:
        subject_id (str): Subject ID
    Returns:
        list[tuple[int, float, float]]: Apnea events (apnea, start_time, duration)
    """

    def get_first_element_by_tag_name(element: XmlElement, tag_name: str) -> XmlNode | None:
        """Get first element matching tag name"""
        elements = element.getElementsByTagName(tag_name)
        return elements[0] if elements else None

    def has_element_by_tag_name(element: XmlElement, tag_name: str) -> bool:
        """Check if element has child element matching tag name"""
        return bool(get_first_element_by_tag_name(element, tag_name))

    def element_has_node_value(element: XmlElement, node_value) -> bool:
        """Check if element has child node with value"""
        return any((node for node in element.childNodes if node.nodeValue == node_value))

    def is_apnea_event(event: XmlElement) -> bool:
        """Determine if event is an apnea event"""
        event_type = get_first_element_by_tag_name(event, "EventType")
        return all(
            (
                event_type is not None,
                element_has_node_value(event_type, "Respiratory|Respiratory"),
                has_element_by_tag_name(event, "EventConcept"),
                has_element_by_tag_name(event, "Duration"),
                has_element_by_tag_name(event, "Start"),
            )
        )

    xml_path = self._get_subject_xml_path(subject_id=subject_id)
    doc = xml_parse(xml_path)
    events = doc.getElementsByTagName("ScoredEvent")
    events = [event for event in events if is_apnea_event(event)]
    event_labels = set()
    for event in events:
        event_label: str = get_first_element_by_tag_name(event, "EventConcept").childNodes[0].nodeValue
        event_labels.add(event_label)
    return event_labels
extract_sleep_apneas
extract_sleep_apneas(subject_id: str) -> list[tuple[int, float, float]]

Extract sleep apnea events for subject Args: subject_id (str): Subject ID Returns: list[tuple[int, float, float]]: Apnea events (apnea, start_time, duration)

Source code in sleepkit/datasets/mesa.py
def extract_sleep_apneas(self, subject_id: str) -> list[tuple[int, float, float]]:
    """Extract sleep apnea events for subject

    Args:
        subject_id (str): Subject ID
    Returns:
        list[tuple[int, float, float]]: Apnea events (apnea, start_time, duration)
    """

    def get_first_element_by_tag_name(element: XmlElement, tag_name: str) -> XmlNode | None:
        """Get first element matching tag name"""
        elements = element.getElementsByTagName(tag_name)
        return elements[0] if elements else None

    def has_element_by_tag_name(element: XmlElement, tag_name: str) -> bool:
        """Check if element has child element matching tag name"""
        return bool(get_first_element_by_tag_name(element, tag_name))

    def element_has_node_value(element: XmlElement, node_value) -> bool:
        """Check if element has child node with value"""
        return any(node.nodeValue == node_value for node in element.childNodes)

    def is_apnea_event(event: XmlElement) -> bool:
        """Determine if event is an apnea event"""
        event_type = get_first_element_by_tag_name(event, "EventType")
        # Chain with `and` so a missing EventType is rejected before it is dereferenced;
        # building a tuple for all() would evaluate every check eagerly.
        return (
            event_type is not None
            and element_has_node_value(event_type, "Respiratory|Respiratory")
            and has_element_by_tag_name(event, "EventConcept")
            and has_element_by_tag_name(event, "Duration")
            and has_element_by_tag_name(event, "Start")
        )

    def first_text(event: XmlElement, tag_name: str) -> str:
        """Text of the first child node of the first element matching tag name"""
        return get_first_element_by_tag_name(event, tag_name).childNodes[0].nodeValue

    apnea_label_map = {
        "Hypopnea|Hypopnea": SleepApnea.hypopnea,  # Hypopnea refers to hypopnea w/ >30% reduction in airflow
        "Unsure|Unsure": SleepApnea.hypopnea,  # Unsure refers to hypopnea w/ >50% reduction in airflow
        "Central apnea|Central Apnea": SleepApnea.central,
        "Obstructive apnea|Obstructive Apnea": SleepApnea.obstructive,
        "Mixed apnea|Mixed Apnea": SleepApnea.mixed,
    }

    xml_path = self._get_subject_xml_path(subject_id=subject_id)
    doc = xml_parse(xml_path)
    events = doc.getElementsByTagName("ScoredEvent")
    events = [event for event in events if is_apnea_event(event)]
    apneas = []
    for event in events:
        event_label = first_text(event, "EventConcept")
        start_time = float(first_text(event, "Start"))
        duration = float(first_text(event, "Duration"))
        # Respiratory labels missing from the map are recorded as SleepApnea.none
        apnea = apnea_label_map.get(event_label, SleepApnea.none)
        apneas.append((apnea, start_time, duration))
    return apneas
extract_sleep_stages
extract_sleep_stages(subject_id: str) -> list[tuple[int, float, float]]

Extract sleep stages for subject Args: subject_id (str): Subject ID Returns: list[tuple[int, float, float]]: Sleep stages (stage, start_time, duration)

Source code in sleepkit/datasets/mesa.py
def extract_sleep_stages(self, subject_id: str) -> list[tuple[int, float, float]]:
    """Extract sleep stages for subject

    Args:
        subject_id (str): Subject ID
    Returns:
        list[tuple[int, float, float]]: Sleep stages (stage, start_time, duration)
    """

    def get_first_element_by_tag_name(element: XmlElement, tag_name: str) -> XmlNode | None:
        """Get first element matching tag name"""
        elements = element.getElementsByTagName(tag_name)
        return elements[0] if elements else None

    def has_element_by_tag_name(element: XmlElement, tag_name: str) -> bool:
        """Check if element has child element matching tag name"""
        return bool(get_first_element_by_tag_name(element, tag_name))

    def element_has_node_value(element: XmlElement, node_value) -> bool:
        """Check if element has child node with value"""
        return any(node.nodeValue == node_value for node in element.childNodes)

    def is_sleep_stage_event(event: XmlElement) -> bool:
        """Check if event is a sleep stage event"""
        event_type = get_first_element_by_tag_name(event, "EventType")
        # Chain with `and` so a missing EventType is rejected before it is dereferenced;
        # building a tuple for all() would evaluate every check eagerly.
        return (
            event_type is not None
            and element_has_node_value(event_type, "Stages|Stages")
            and has_element_by_tag_name(event, "EventConcept")
            and has_element_by_tag_name(event, "Duration")
            and has_element_by_tag_name(event, "Start")
        )

    def first_text(event: XmlElement, tag_name: str) -> str:
        """Text of the first child node of the first element matching tag name"""
        return get_first_element_by_tag_name(event, tag_name).childNodes[0].nodeValue

    stage_label_map = {
        0: SleepStage.wake,
        1: SleepStage.stage1,
        2: SleepStage.stage2,
        3: SleepStage.stage3,
        4: SleepStage.stage4,
        5: SleepStage.rem,
        6: SleepStage.noise,
        9: SleepStage.noise,
    }
    xml_path = self._get_subject_xml_path(subject_id=subject_id)
    doc = xml_parse(xml_path)
    events = doc.getElementsByTagName("ScoredEvent")
    events = [event for event in events if is_sleep_stage_event(event)]
    sleep_stages: list[tuple[int, float, float]] = []
    for event in events:
        stage_label = first_text(event, "EventConcept")
        start_time = float(first_text(event, "Start"))
        duration = float(first_text(event, "Duration"))
        # The numeric code is whatever follows the last "|" in the label; unmapped codes fall back to 0
        stage_code = int(stage_label.split("|")[-1])
        sleep_stage = stage_label_map.get(stage_code, 0)
        sleep_stages.append((sleep_stage, start_time, duration))
    return sleep_stages
get_subject_duration
get_subject_duration(subject_id: str) -> float

Get subject duration in seconds

Source code in sleepkit/datasets/mesa.py
def get_subject_duration(self, subject_id: str) -> float:
    """Return the duration reported by the subject's EDF file.

    Args:
        subject_id (str): Subject ID
    Returns:
        float: Value of ``getFileDuration()`` for the subject's EDF recording
    """
    edf_path = self._get_subject_edf_path(subject_id)
    with pyedflib.EdfReader(edf_path) as reader:
        file_duration = reader.getFileDuration()
    return file_duration
sleep_stages_to_mask
sleep_stages_to_mask(sleep_stages: list[tuple[int, float, float]], data_size: int) -> npt.NDArray[np.int32]

Convert sleep stages to mask array Args: sleep_stages (list[tuple[int, float, float]]): Sleep stages data_size (int): Data size Returns: npt.NDArray[np.int32]: Sleep mask

Source code in sleepkit/datasets/mesa.py
def sleep_stages_to_mask(
    self, sleep_stages: list[tuple[int, float, float]], data_size: int
) -> npt.NDArray[np.int32]:
    """Paint sleep stages onto a per-sample mask

    Args:
        sleep_stages (list[tuple[int, float, float]]): Sleep stages as (stage, start_time, duration)
        data_size (int): Length of the mask
    Returns:
        npt.NDArray[np.int32]: Sleep mask; samples not covered by any stage stay 0
    """
    mask = np.zeros(data_size, dtype=np.int32)
    for stage, onset, length in sleep_stages:
        first = int(self.target_rate * onset)
        # The slice stop is one past `first + span`, so the end sample is included
        stop = first + int(self.target_rate * length) + 1
        mask[first:stop] = stage
    return mask
apnea_events_to_mask
apnea_events_to_mask(apnea_events: list[tuple[int, float, float]], data_size: int) -> npt.NDArray[np.int32]

Convert apnea events to mask array Args: apnea_events (list[tuple[int, float, float]]): Apnea events data_size (int): Data size Returns: npt.NDArray[np.int32]: Apnea mask

Source code in sleepkit/datasets/mesa.py
def apnea_events_to_mask(
    self, apnea_events: list[tuple[int, float, float]], data_size: int
) -> npt.NDArray[np.int32]:
    """Paint apnea events onto a per-sample mask

    Args:
        apnea_events (list[tuple[int, float, float]]): Apnea events as (apnea, start_time, duration)
        data_size (int): Length of the mask
    Returns:
        npt.NDArray[np.int32]: Apnea mask; samples not covered by any event stay 0
    """
    mask = np.zeros(data_size, dtype=np.int32)
    for label, onset, length in apnea_events:
        begin = int(self.target_rate * onset)
        n_samples = int(self.target_rate * length)
        # Inclusive end: one sample past `begin + n_samples` is also written
        mask[begin : begin + n_samples + 1] = label
    return mask
download
download(num_workers: int | None = None, force: bool = False)

Download MESA dataset from the NSRR website.

Parameters:

  • num_workers (int | None, default: None ) –
    Number of parallel workers. Defaults to None.
  • force (bool, default: False ) –

    Force redownload. Defaults to False.

Source code in sleepkit/datasets/mesa.py
def download(self, num_workers: int | None = None, force: bool = False):
    """Download MESA dataset from the NSRR website.

    The NSRR database slug is taken from the last component of the dataset path
    and files are placed under that path's parent directory.

    Args:
        num_workers (int | None, optional): # parallel workers. Defaults to None.
        force (bool, optional): Force redownload. Defaults to False.
            NOTE(review): this flag is currently not forwarded to ``download_nsrr``,
            so it has no effect here; confirm whether the helper supports it.
    """

    download_nsrr(
        db_slug=self.path.stem,
        subfolder="",
        pattern="*",
        data_dir=self.path.parent,
        checksum_type="size",
        num_workers=num_workers,
    )

Functions