Module arus_stream_metawear.stream

Source code
import logging
import time

import pandas as pd
import numpy as np
from arus.core.stream import Stream
from mbientlab.metawear import cbindings, libmetawear
from pymetawear.client import MetaWearClient

from .corrector import MetawearTimestampCorrector


class MetawearStream(Stream):
    """Data stream to synchronously or asynchronously load real-time data from a metawear device.

    This class inherits `Stream` class to load metawear device data coming in real-time.
    Accelerometer samples are received through a notification callback, grouped
    into consecutive chunks of `window_size` seconds and put into the stream
    queue as dataframes.

    Examples:
        1. Loading a metawear device as dataframe every 10 seconds asynchronously and print out the head of each one.

        ```python
        addr = "D2:C6:AF:2B:DB:22"
        stream = MetawearStream(data_source=addr, sr=50,
                                grange=8, window_size=10, start_time=None, name='metawear-stream')
        stream.start(scheduler='thread')
        for data in stream.get_iterator():
            print(data.head())
        ```
    """

    def __init__(self, data_source, sr=50, grange=8, window_size=10, start_time=None, name='metawear-stream'):
        """
        Args:
            data_source (str): mac address of the metawear device.
            sr (int): the sampling rate (Hz) to be set for the given device.
            grange (int): the dynamic range (g) to be set for the given device.
            window_size (int): the duration (seconds) of each chunk of data.
            start_time (datetime, optional): the session begin time used as the
                boundary of the first chunk. If it is `None`, the first chunk
                starts at the floored timestamp of the first received sample.
            name (str, optional): see `Stream.name`.
        """
        super().__init__(data_source=data_source, name=name,
                         start_time=start_time, window_size=window_size)
        self._sr = sr
        self._grange = grange
        self._device = None
        # filled in once the device is connected, see `_setup_metawear`
        self._device_name = None
        self._corrector = MetawearTimestampCorrector(sr)
        self._chunk_buffer = []
        self._chunk_sample_count = 0
        if self._start_time is None:
            # no session begin time: the boundary is decided lazily by the
            # first sample handled in `_pack_and_send_data`
            self._chunk_boundary = None
        else:
            self._chunk_boundary = np.floor(self._start_time.timestamp())
            logging.info('Use session begin time:' +
                         self._start_time.strftime("%Y-%m-%d %H:%M:%S"))

    def get_device_name(self):
        """Look up the model name of the connected board.

        Returns:
            str: the name of the `cbindings.Model` attribute whose value equals
                the model code reported by the board, or `'NA'` if none matches.
        """
        model_code = libmetawear.mbl_mw_metawearboard_get_model(
            self._device.mw.board)
        metawear_models = cbindings.Model()
        for name in dir(metawear_models):
            if '__' in name:
                # skip dunder attributes, they are not model names
                continue
            if getattr(metawear_models, name) == model_code:
                return name
        return 'NA'

    def _setup_metawear(self, addr, max_retries=3):
        """Connect to the device and configure connection and accelerometer.

        Args:
            addr (str): mac address of the metawear device.
            max_retries (int, optional): how many times to retry after a failed
                connection attempt (one second apart) before giving up.

        Raises:
            RuntimeError: if the device could not be connected after all attempts.
        """
        last_error = None
        for attempt in range(max_retries + 1):
            try:
                self._device = MetaWearClient(addr, connect=True, debug=False)
                self._device_name = self.get_device_name()
                break
            except Exception as e:
                last_error = e
                logging.error(str(e))
                if attempt < max_retries:
                    logging.info('Retry connect to ' + addr)
                    time.sleep(1)
        else:
            # never connected: fail here instead of dereferencing a missing
            # device in the configuration calls below
            raise RuntimeError(
                'Failed to connect to metawear device ' + addr) from last_error
        logging.info("New metawear connected: {0}".format(
            self._device))
        # high frequency throughput connection setup
        self._device.settings.set_connection_parameters(
            7.5, 7.5, 0, 6000)
        # Up to 4dB for Class 2 BLE devices
        # https://github.com/hbldh/pymetawear/blob/master/pymetawear/modules/settings.py
        # https://mbientlab.com/documents/metawear/cpp/0/settings_8h.html#a335f712d5fc0587eff9671b8b105d3ed
        # Hossain AKMM, Soh WS. A comprehensive study of Bluetooth signal parameters for localization. 2007 Ieee 18th International Symposium on Personal, Indoor and Mobile Radio Communications, Vols 1-9. 2007:428-32.
        self._device.settings.set_tx_power(power=4)

        self._device.accelerometer.set_settings(
            data_rate=self._sr, data_range=self._grange)
        self._device.accelerometer.high_frequency_stream = True

    def _start_metawear(self):
        """Subscribe to accelerometer notifications of the connected device.

        Returns:
            MetawearStream: this stream itself.
        """
        logging.info('starting accelerometer module...')
        self._device.accelerometer.notifications(
            callback=self._pack_and_send_data)
        return self

    def _pack_and_send_data(self, data):
        """Handle one accelerometer notification.

        The sample is packed, assigned to the chunk window it falls in, and
        buffered. When a sample falls beyond the current window, the buffered
        chunk is put into the stream queue as a dataframe first.

        Args:
            data: the notification payload passed by the accelerometer module.
        """
        package = self._pack_data(data)
        sample_ts = package['ts_withloss']
        if self._chunk_boundary is None:
            self._chunk_boundary = np.floor(sample_ts)
        if sample_ts < self._chunk_boundary:
            # discard samples in the past
            return
        if sample_ts - self._chunk_boundary >= self._window_size:
            if self._chunk_buffer:
                # the current window is complete, send it out before moving on
                df = self._format_chunk(self._chunk_buffer)
                self._chunk_buffer = []
                self._put_data_in_queue(df)
            # move the chunk window to the one containing this sample, skipping
            # windows without any sample so the sample is never buffered under
            # a boundary it does not belong to
            n_chunks = int((sample_ts - self._chunk_boundary) /
                           self._window_size)
            self._chunk_boundary += n_chunks * self._window_size
        # timestamps are kept as floats (seconds) for the window arithmetic
        # above and only converted right before buffering
        for key in ('ts_withloss', 'ts_noloss', 'ts_nofix', 'ts_realworld'):
            package[key] = pd.Timestamp.fromtimestamp(package[key])
        self._chunk_buffer.append(package)

    def _calibrate_coord_system(self, data):
        """Map the raw axis values to the calibrated coordinate system.

        Args:
            data: the notification payload; `data['value']` exposes `x`, `y`, `z`.

        Returns:
            tuple: the calibrated `(x, y, z)` values.
        """
        # axis values are calibrated according to the coordinate system of Actigraph GT9X
        # http://www.correctline.pl/wp-content/uploads/2015/01/ActiGraph_Device_Axes_Link.png
        x = data['value'].x
        y = data['value'].y
        z = data['value'].z
        if self._device_name == 'METAMOTION_R':
            # as normal wear in the case on wrist
            return (y, -x, z)
        return (x, y, z)

    def _pack_data(self, data):
        """Build the record of one sample from a notification payload.

        Args:
            data: the notification payload passed by the accelerometer module.

        Returns:
            dict: the sample record with its index, source information,
                timestamps (seconds as numbers) and calibrated axis values.
        """
        real_world_ts = time.time()
        # presumably (original, loss-free, loss-aware) timestamps, judging by
        # the keys they are stored under -- confirm against the corrector
        ts_set = self._corrector.correct(data, real_world_ts)
        calibrated_values = self._calibrate_coord_system(data)
        package = {
            'index': self._chunk_sample_count,
            'mac_address': self._data_source,
            'stream_name': self.name,
            'device_name': self._device_name,
            'data_type': "accel",
            'ts_realworld': real_world_ts,
            'ts_nofix': ts_set[0],
            'ts_noloss': ts_set[1],
            'ts_withloss': ts_set[2],
            'x': calibrated_values[0],
            'y': calibrated_values[1],
            'z': calibrated_values[2]}
        # running index over the whole stream, it is never reset between chunks
        self._chunk_sample_count += 1
        return package

    def _format_chunk(self, chunk_buffer):
        """Convert buffered sample records into a dataframe.

        Args:
            chunk_buffer (list): sample records as built by `_pack_data`, with
                timestamps already converted to `pd.Timestamp`.

        Returns:
            pd.DataFrame: the chunk with renamed, reordered columns and an extra
                `TIME_STAMP_CHUNK_BEGIN` column holding the current chunk boundary.
        """
        df = pd.DataFrame(data=chunk_buffer)
        df = df[['ts_withloss', 'x', 'y', 'z',
                 'index', 'mac_address', 'stream_name', 'device_name', 'data_type', 'ts_realworld', 'ts_nofix', 'ts_noloss']]
        df.columns = ['HEADER_TIME_STAMP', 'X', 'Y', 'Z', 'INDEX', 'SOURCE', 'STREAM_NAME',
                      'SENSOR_TYPE', 'DATA_TYPE', 'TIME_STAMP_REAL', 'TIME_STAMP_ORIGINAL', 'TIME_STAMP_NOLOSS']
        df.insert(len(df.columns), 'TIME_STAMP_CHUNK_BEGIN',
                  pd.Timestamp.fromtimestamp(self._chunk_boundary))
        return df

    def _load_metawear(self, addr):
        """Connect to the device at `addr` and start streaming from it."""
        self._setup_metawear(addr)
        self._start_metawear()

    def load_(self, obj_toload):
        """Start loading data from the metawear device.

        Args:
            obj_toload (str): mac address of the metawear device.

        Raises:
            RuntimeError: if `obj_toload` is not a string.
        """
        if isinstance(obj_toload, str):
            addr = obj_toload
            self._load_metawear(addr)
        else:
            raise RuntimeError(
                "Data source should be the mac address of the metawear device")

Classes

class MetawearStream (data_source, sr=50, grange=8, window_size=10, start_time=None, name='metawear-stream')

Data stream to synchronously or asynchronously load real-time data from a metawear device

This class inherits Stream class to load metawear device data coming in real-time.

Examples

  1. Loading a metawear device as dataframe every 10 seconds asynchronously and print out the head of each one.
addr = "D2:C6:AF:2B:DB:22"
stream = MetawearStream(data_source=addr, sr=50,
                        grange=8, window_size=10, start_time=None, name='metawear-stream')
stream.start(scheduler='thread')
for data in stream.get_iterator():
    print(data.head())

Args

data_source : str
mac address of the metawear device.
sr : int
the sampling rate (Hz) to be set for the given device.
grange : int
the dynamic range (g) to be set for the given device.
name : str, optional
see Stream.name.
Source code
class MetawearStream(Stream):
    """Data stream to synchronously or asynchronously load real-time data from a metawear device.

    This class inherits `Stream` class to load metawear device data coming in real-time.
    Accelerometer samples are received through a notification callback, grouped
    into consecutive chunks of `window_size` seconds and put into the stream
    queue as dataframes.

    Examples:
        1. Loading a metawear device as dataframe every 10 seconds asynchronously and print out the head of each one.

        ```python
        addr = "D2:C6:AF:2B:DB:22"
        stream = MetawearStream(data_source=addr, sr=50,
                                grange=8, window_size=10, start_time=None, name='metawear-stream')
        stream.start(scheduler='thread')
        for data in stream.get_iterator():
            print(data.head())
        ```
    """

    def __init__(self, data_source, sr=50, grange=8, window_size=10, start_time=None, name='metawear-stream'):
        """
        Args:
            data_source (str): mac address of the metawear device.
            sr (int): the sampling rate (Hz) to be set for the given device.
            grange (int): the dynamic range (g) to be set for the given device.
            window_size (int): the duration (seconds) of each chunk of data.
            start_time (datetime, optional): the session begin time used as the
                boundary of the first chunk. If it is `None`, the first chunk
                starts at the floored timestamp of the first received sample.
            name (str, optional): see `Stream.name`.
        """
        super().__init__(data_source=data_source, name=name,
                         start_time=start_time, window_size=window_size)
        self._sr = sr
        self._grange = grange
        self._device = None
        # filled in once the device is connected, see `_setup_metawear`
        self._device_name = None
        self._corrector = MetawearTimestampCorrector(sr)
        self._chunk_buffer = []
        self._chunk_sample_count = 0
        if self._start_time is None:
            # no session begin time: the boundary is decided lazily by the
            # first sample handled in `_pack_and_send_data`
            self._chunk_boundary = None
        else:
            self._chunk_boundary = np.floor(self._start_time.timestamp())
            logging.info('Use session begin time:' +
                         self._start_time.strftime("%Y-%m-%d %H:%M:%S"))

    def get_device_name(self):
        """Look up the model name of the connected board.

        Returns:
            str: the name of the `cbindings.Model` attribute whose value equals
                the model code reported by the board, or `'NA'` if none matches.
        """
        model_code = libmetawear.mbl_mw_metawearboard_get_model(
            self._device.mw.board)
        metawear_models = cbindings.Model()
        for name in dir(metawear_models):
            if '__' in name:
                # skip dunder attributes, they are not model names
                continue
            if getattr(metawear_models, name) == model_code:
                return name
        return 'NA'

    def _setup_metawear(self, addr, max_retries=3):
        """Connect to the device and configure connection and accelerometer.

        Args:
            addr (str): mac address of the metawear device.
            max_retries (int, optional): how many times to retry after a failed
                connection attempt (one second apart) before giving up.

        Raises:
            RuntimeError: if the device could not be connected after all attempts.
        """
        last_error = None
        for attempt in range(max_retries + 1):
            try:
                self._device = MetaWearClient(addr, connect=True, debug=False)
                self._device_name = self.get_device_name()
                break
            except Exception as e:
                last_error = e
                logging.error(str(e))
                if attempt < max_retries:
                    logging.info('Retry connect to ' + addr)
                    time.sleep(1)
        else:
            # never connected: fail here instead of dereferencing a missing
            # device in the configuration calls below
            raise RuntimeError(
                'Failed to connect to metawear device ' + addr) from last_error
        logging.info("New metawear connected: {0}".format(
            self._device))
        # high frequency throughput connection setup
        self._device.settings.set_connection_parameters(
            7.5, 7.5, 0, 6000)
        # Up to 4dB for Class 2 BLE devices
        # https://github.com/hbldh/pymetawear/blob/master/pymetawear/modules/settings.py
        # https://mbientlab.com/documents/metawear/cpp/0/settings_8h.html#a335f712d5fc0587eff9671b8b105d3ed
        # Hossain AKMM, Soh WS. A comprehensive study of Bluetooth signal parameters for localization. 2007 Ieee 18th International Symposium on Personal, Indoor and Mobile Radio Communications, Vols 1-9. 2007:428-32.
        self._device.settings.set_tx_power(power=4)

        self._device.accelerometer.set_settings(
            data_rate=self._sr, data_range=self._grange)
        self._device.accelerometer.high_frequency_stream = True

    def _start_metawear(self):
        """Subscribe to accelerometer notifications of the connected device.

        Returns:
            MetawearStream: this stream itself.
        """
        logging.info('starting accelerometer module...')
        self._device.accelerometer.notifications(
            callback=self._pack_and_send_data)
        return self

    def _pack_and_send_data(self, data):
        """Handle one accelerometer notification.

        The sample is packed, assigned to the chunk window it falls in, and
        buffered. When a sample falls beyond the current window, the buffered
        chunk is put into the stream queue as a dataframe first.

        Args:
            data: the notification payload passed by the accelerometer module.
        """
        package = self._pack_data(data)
        sample_ts = package['ts_withloss']
        if self._chunk_boundary is None:
            self._chunk_boundary = np.floor(sample_ts)
        if sample_ts < self._chunk_boundary:
            # discard samples in the past
            return
        if sample_ts - self._chunk_boundary >= self._window_size:
            if self._chunk_buffer:
                # the current window is complete, send it out before moving on
                df = self._format_chunk(self._chunk_buffer)
                self._chunk_buffer = []
                self._put_data_in_queue(df)
            # move the chunk window to the one containing this sample, skipping
            # windows without any sample so the sample is never buffered under
            # a boundary it does not belong to
            n_chunks = int((sample_ts - self._chunk_boundary) /
                           self._window_size)
            self._chunk_boundary += n_chunks * self._window_size
        # timestamps are kept as floats (seconds) for the window arithmetic
        # above and only converted right before buffering
        for key in ('ts_withloss', 'ts_noloss', 'ts_nofix', 'ts_realworld'):
            package[key] = pd.Timestamp.fromtimestamp(package[key])
        self._chunk_buffer.append(package)

    def _calibrate_coord_system(self, data):
        """Map the raw axis values to the calibrated coordinate system.

        Args:
            data: the notification payload; `data['value']` exposes `x`, `y`, `z`.

        Returns:
            tuple: the calibrated `(x, y, z)` values.
        """
        # axis values are calibrated according to the coordinate system of Actigraph GT9X
        # http://www.correctline.pl/wp-content/uploads/2015/01/ActiGraph_Device_Axes_Link.png
        x = data['value'].x
        y = data['value'].y
        z = data['value'].z
        if self._device_name == 'METAMOTION_R':
            # as normal wear in the case on wrist
            return (y, -x, z)
        return (x, y, z)

    def _pack_data(self, data):
        """Build the record of one sample from a notification payload.

        Args:
            data: the notification payload passed by the accelerometer module.

        Returns:
            dict: the sample record with its index, source information,
                timestamps (seconds as numbers) and calibrated axis values.
        """
        real_world_ts = time.time()
        # presumably (original, loss-free, loss-aware) timestamps, judging by
        # the keys they are stored under -- confirm against the corrector
        ts_set = self._corrector.correct(data, real_world_ts)
        calibrated_values = self._calibrate_coord_system(data)
        package = {
            'index': self._chunk_sample_count,
            'mac_address': self._data_source,
            'stream_name': self.name,
            'device_name': self._device_name,
            'data_type': "accel",
            'ts_realworld': real_world_ts,
            'ts_nofix': ts_set[0],
            'ts_noloss': ts_set[1],
            'ts_withloss': ts_set[2],
            'x': calibrated_values[0],
            'y': calibrated_values[1],
            'z': calibrated_values[2]}
        # running index over the whole stream, it is never reset between chunks
        self._chunk_sample_count += 1
        return package

    def _format_chunk(self, chunk_buffer):
        """Convert buffered sample records into a dataframe.

        Args:
            chunk_buffer (list): sample records as built by `_pack_data`, with
                timestamps already converted to `pd.Timestamp`.

        Returns:
            pd.DataFrame: the chunk with renamed, reordered columns and an extra
                `TIME_STAMP_CHUNK_BEGIN` column holding the current chunk boundary.
        """
        df = pd.DataFrame(data=chunk_buffer)
        df = df[['ts_withloss', 'x', 'y', 'z',
                 'index', 'mac_address', 'stream_name', 'device_name', 'data_type', 'ts_realworld', 'ts_nofix', 'ts_noloss']]
        df.columns = ['HEADER_TIME_STAMP', 'X', 'Y', 'Z', 'INDEX', 'SOURCE', 'STREAM_NAME',
                      'SENSOR_TYPE', 'DATA_TYPE', 'TIME_STAMP_REAL', 'TIME_STAMP_ORIGINAL', 'TIME_STAMP_NOLOSS']
        df.insert(len(df.columns), 'TIME_STAMP_CHUNK_BEGIN',
                  pd.Timestamp.fromtimestamp(self._chunk_boundary))
        return df

    def _load_metawear(self, addr):
        """Connect to the device at `addr` and start streaming from it."""
        self._setup_metawear(addr)
        self._start_metawear()

    def load_(self, obj_toload):
        """Start loading data from the metawear device.

        Args:
            obj_toload (str): mac address of the metawear device.

        Raises:
            RuntimeError: if `obj_toload` is not a string.
        """
        if isinstance(obj_toload, str):
            addr = obj_toload
            self._load_metawear(addr)
        else:
            raise RuntimeError(
                "Data source should be the mac address of the metawear device")

Ancestors

  • arus.core.stream.Stream

Methods

def get_device_name(self)
Source code
def get_device_name(self):
    """Look up the model name of the connected board.

    Returns:
        str: the name of the `cbindings.Model` attribute whose value equals the
            model code reported by the board, or `'NA'` if none matches.
    """
    board = self._device.mw.board
    code = libmetawear.mbl_mw_metawearboard_get_model(board)
    models = cbindings.Model()
    # dunder attributes are not model names, leave them out of the lookup
    candidates = [attr for attr in dir(models) if '__' not in attr]
    for candidate in candidates:
        if getattr(models, candidate) == code:
            return candidate
    return 'NA'
def load_(self, obj_toload)

Implement this in the sub class.

You may use Stream._put_data_in_queue method to put the loaded data into the queue. Must use None as stop signal for the data queue iterator.

Raises

NotImplementedError
Must implement in subclass.
Source code
def load_(self, obj_toload):
    """Start loading data from the metawear device.

    Args:
        obj_toload (str): mac address of the metawear device.

    Raises:
        RuntimeError: if `obj_toload` is not a string.
    """
    # guard clause: only a mac address string is a valid data source
    if not isinstance(obj_toload, str):
        raise RuntimeError(
            "Data source should be the mac address of the metawear device")
    self._load_metawear(obj_toload)