Module arus_components_hci.streams

Source code
from arus.core.stream import Stream
import PySimpleGUI as sg
from datetime import datetime
from arus.core.libs.date import parse_timestamp
import pandas as pd
from arus.core.libs.mhealth_format.data import segment_annotation
import threading


class AnnotatorGUIStream(Stream):
    """Annotator stream to asyncly load real-time annotations from an interactive GUI interface

    This class inherits `Stream` class to load annotations coming in real-time from an interactive GUI.

    Examples:
        1. Loading annotations every 10 seconds asynchronously from an interactive GUI as users select different annotation labels and print out each one instantly.

        ```python
        labels = ['Sitting', "Standing", "Walking", "Jumping", "Climbing", "Lying"]
        stream = AnnotatorGUIStream(labels,
                                    window_size=5, start_time=None)
        stream.start(scheduler='thread')
        for data in stream.get_iterator():
            print(data)
        ```
    """

    def __init__(self, labels, window_size, start_time=None, name='annotator-gui', scheduler='thread'):
        super().__init__(data_source=labels, window_size=window_size,
                         start_time=start_time, name=name, scheduler=scheduler)
        self._start_time = parse_timestamp(
            datetime.now()) if self._start_time is None else self._start_time
        self._annotations = {}

    def _init_annotator_gui(self, labels):
        layout = []
        row_layout = []
        i = 0
        for label in labels:
            i = i + 1
            if i % 5 == 0:
                layout.append(row_layout)
                row_layout = []
            label_button = sg.Button(button_text=label, key="LABEL_" + label,
                                     button_color=("white", "red"), font='Arial', size=(10, 2))
            row_layout.append(label_button)
            self._annotations[label_button.ButtonText] = {
                "START_TIME": [],
                "STOP_TIME": [],
                "Element": label_button
            }
        layout.append(row_layout)
        layout.append([sg.CloseButton('Close')])
        window = sg.Window("Annotator pad").layout(layout)
        while True:
            event, values = window.read()
            if event in (None, 'Close'):   # if user closes window or clicks cancel
                break
            if 'LABEL' in event:
                data = event.replace('LABEL_', '')
                self._update_button(data)
                self._update_annotations(data)

    def _send_data(self, st):
        while True:
            current_time = parse_timestamp(datetime.now())
            if current_time - st >= pd.Timedelta(self._window_size, 's'):
                # format annotatons
                data = {'HEADER_TIME_STAMP': [], 'START_TIME': [],
                        'STOP_TIME': [], 'LABEL_NAME': []}
                for label, times in self._annotations.items():
                    stop_time_list = times['STOP_TIME'].copy()
                    if len(times['START_TIME']) > len(stop_time_list):
                        stop_time_list.append(current_time)
                    for start_time, stop_time in zip(times['START_TIME'], stop_time_list):
                        data['HEADER_TIME_STAMP'].append(start_time)
                        data['START_TIME'].append(start_time)
                        data['STOP_TIME'].append(stop_time)
                        data['LABEL_NAME'].append(label)
                data = pd.DataFrame.from_dict(data, orient='columns')
                data = data.sort_values(by=['START_TIME'])
                if data.empty:
                    st = current_time
                    self._put_data_in_queue('No annotations available')
                    continue
                data = segment_annotation(
                    data, start_time=st, stop_time=current_time)
                if data.empty:
                    self._put_data_in_queue('No annotations available')
                else:
                    self._put_data_in_queue(data)
                st = current_time

    def _update_button(self, label):
        current_color = self._annotations[label]["Element"].ButtonColor
        if current_color[1] == "red":
            self._annotations[label]["Element"].Update(
                button_color=('white', 'green'))
        else:
            self._annotations[label]["Element"].Update(
                button_color=('white', 'red'))

    def _update_annotations(self, label):
        ongoing = len(self._annotations[label]['START_TIME']) > len(
            self._annotations[label]['STOP_TIME'])
        if ongoing:
            self._annotations[label]['STOP_TIME'].append(
                parse_timestamp(datetime.now()))
        else:
            self._annotations[label]['START_TIME'].append(
                parse_timestamp(datetime.now()))

    def load_(self, obj_toload):
        labels = obj_toload
        sender_thread = threading.Thread(
            target=self._send_data, name=self.name+"-sender", args=(self._start_time,))
        sender_thread.start()
        self._init_annotator_gui(labels)

Classes

class AnnotatorGUIStream (labels, window_size, start_time=None, name='annotator-gui', scheduler='thread')

Annotator stream to asyncly load real-time annotations from an interactive GUI interface

This class inherits Stream class to load annotations coming in real-time from an interactive GUI.

Examples

  1. Loading annotations every 10 seconds asynchronously from an interactive GUI as users select different annotation labels and print out each one instantly.
labels = ['Sitting', "Standing", "Walking", "Jumping", "Climbing", "Lying"]
stream = AnnotatorGUIStream(labels,
                            window_size=5, start_time=None)
stream.start(scheduler='thread')
for data in stream.get_iterator():
    print(data)

Args

data_source : object
An object that may be loaded into memory. The type of the object is decided by the implementation of subclass.
window_size : float
Number of seconds. Each data in the queue would be a short chunk of data lasting window_size seconds loaded from the data_source.
start_time : str or datetime or datetime64 or pandas.Timestamp, optional
The start time of data source. This is used to sync between multiple streams. If it is None, the default value would be extracted from the first sample of the loaded data.
name : str, optional
The name of the data stream will also be used as the name of the sub-thread that is used to load data. Defaults to 'default-stream'.
scheduler : str, optional
The scheduler used to load the data source. It can be either 'thread' or 'sync'. Defaults to 'thread'.
Source code
class AnnotatorGUIStream(Stream):
    """Annotator stream to asyncly load real-time annotations from an interactive GUI interface

    This class inherits `Stream` class to load annotations coming in real-time from an interactive GUI.

    Examples:
        1. Loading annotations every 10 seconds asynchronously from an interactive GUI as users select different annotation labels and print out each one instantly.

        ```python
        labels = ['Sitting', "Standing", "Walking", "Jumping", "Climbing", "Lying"]
        stream = AnnotatorGUIStream(labels,
                                    window_size=5, start_time=None)
        stream.start(scheduler='thread')
        for data in stream.get_iterator():
            print(data)
        ```
    """

    def __init__(self, labels, window_size, start_time=None, name='annotator-gui', scheduler='thread'):
        super().__init__(data_source=labels, window_size=window_size,
                         start_time=start_time, name=name, scheduler=scheduler)
        self._start_time = parse_timestamp(
            datetime.now()) if self._start_time is None else self._start_time
        self._annotations = {}

    def _init_annotator_gui(self, labels):
        layout = []
        row_layout = []
        i = 0
        for label in labels:
            i = i + 1
            if i % 5 == 0:
                layout.append(row_layout)
                row_layout = []
            label_button = sg.Button(button_text=label, key="LABEL_" + label,
                                     button_color=("white", "red"), font='Arial', size=(10, 2))
            row_layout.append(label_button)
            self._annotations[label_button.ButtonText] = {
                "START_TIME": [],
                "STOP_TIME": [],
                "Element": label_button
            }
        layout.append(row_layout)
        layout.append([sg.CloseButton('Close')])
        window = sg.Window("Annotator pad").layout(layout)
        while True:
            event, values = window.read()
            if event in (None, 'Close'):   # if user closes window or clicks cancel
                break
            if 'LABEL' in event:
                data = event.replace('LABEL_', '')
                self._update_button(data)
                self._update_annotations(data)

    def _send_data(self, st):
        while True:
            current_time = parse_timestamp(datetime.now())
            if current_time - st >= pd.Timedelta(self._window_size, 's'):
                # format annotatons
                data = {'HEADER_TIME_STAMP': [], 'START_TIME': [],
                        'STOP_TIME': [], 'LABEL_NAME': []}
                for label, times in self._annotations.items():
                    stop_time_list = times['STOP_TIME'].copy()
                    if len(times['START_TIME']) > len(stop_time_list):
                        stop_time_list.append(current_time)
                    for start_time, stop_time in zip(times['START_TIME'], stop_time_list):
                        data['HEADER_TIME_STAMP'].append(start_time)
                        data['START_TIME'].append(start_time)
                        data['STOP_TIME'].append(stop_time)
                        data['LABEL_NAME'].append(label)
                data = pd.DataFrame.from_dict(data, orient='columns')
                data = data.sort_values(by=['START_TIME'])
                if data.empty:
                    st = current_time
                    self._put_data_in_queue('No annotations available')
                    continue
                data = segment_annotation(
                    data, start_time=st, stop_time=current_time)
                if data.empty:
                    self._put_data_in_queue('No annotations available')
                else:
                    self._put_data_in_queue(data)
                st = current_time

    def _update_button(self, label):
        current_color = self._annotations[label]["Element"].ButtonColor
        if current_color[1] == "red":
            self._annotations[label]["Element"].Update(
                button_color=('white', 'green'))
        else:
            self._annotations[label]["Element"].Update(
                button_color=('white', 'red'))

    def _update_annotations(self, label):
        ongoing = len(self._annotations[label]['START_TIME']) > len(
            self._annotations[label]['STOP_TIME'])
        if ongoing:
            self._annotations[label]['STOP_TIME'].append(
                parse_timestamp(datetime.now()))
        else:
            self._annotations[label]['START_TIME'].append(
                parse_timestamp(datetime.now()))

    def load_(self, obj_toload):
        labels = obj_toload
        sender_thread = threading.Thread(
            target=self._send_data, name=self.name+"-sender", args=(self._start_time,))
        sender_thread.start()
        self._init_annotator_gui(labels)

Ancestors

  • arus.core.stream.Stream

Methods

def load_(self, obj_toload)

Implement this in the sub class.

You may use Stream._put_data_in_queue method to put the loaded data into the queue. Must use None as stop signal for the data queue iterator.

Raises

NotImplementedError
Must implement in subclass.
Source code
def load_(self, obj_toload):
    labels = obj_toload
    sender_thread = threading.Thread(
        target=self._send_data, name=self.name+"-sender", args=(self._start_time,))
    sender_thread.start()
    self._init_annotator_gui(labels)