Module arus_stream_metawear.corrector

Source code
class MetawearTimestampCorrector(object):
    def __init__(self, sr):
        self._current_nofix_ts = None
        self._current_noloss_ts = None
        self._current_withloss_ts = None
        self._sample_interval = 1.0 / sr
        self._real_world_offset = None
        self._last_real_world_ts = None

    def _get_current_nofix_ts_in_seconds(self, data):
        return data['epoch'] / 1000.0

    def _get_real_world_offset(self, data, current_real_world_ts):
        return current_real_world_ts - \
            self._get_current_nofix_ts_in_seconds(data)

    def _apply_no_fix(self, data):
        return self._get_current_nofix_ts_in_seconds(data) + self._real_world_offset

    def _apply_fix_noloss(self, data, previous_noloss_ts):
        if previous_noloss_ts == None:
            current_noloss_ts = self._apply_no_fix(data)
        else:
            current_noloss_ts = previous_noloss_ts + self._sample_interval
        return current_noloss_ts

    def _apply_fix_withloss(self, data, previous_withloss_ts, current_real_world_ts, previous_real_world_ts):
        if previous_withloss_ts == None:
            current_withloss_ts = self._apply_fix_noloss(
                data, previous_withloss_ts)
        elif current_real_world_ts - previous_real_world_ts <= 2 * self._sample_interval:
            current_withloss_ts = self._apply_fix_noloss(
                data, previous_withloss_ts)
        elif abs(previous_withloss_ts - self._get_current_nofix_ts_in_seconds(data)) < 2 * self._sample_interval:
            # if there is no loss occuring
            current_withloss_ts = self._apply_fix_noloss(
                data, previous_withloss_ts)
        else:
            # data loss occurs
            current_nofix_ts = self._apply_no_fix(data)
            current_noloss_ts = self._apply_fix_noloss(
                data, previous_withloss_ts)
            if current_nofix_ts - current_noloss_ts > 2 * self._sample_interval:
                # late for more than two intervals, renew timestamp
                current_withloss_ts = current_nofix_ts
            else:
                current_withloss_ts = current_noloss_ts
        return current_withloss_ts

    def correct(self, data, current_real_world_ts):
        if self._real_world_offset == None:
            self._real_world_offset = self._get_real_world_offset(
                data, current_real_world_ts)
        if self._last_real_world_ts == None:
            self._last_real_world_ts = current_real_world_ts
        self._current_nofix_ts = self._apply_no_fix(data)
        self._current_noloss_ts = self._apply_fix_noloss(
            data, self._current_noloss_ts)
        self._current_withloss_ts = self._apply_fix_withloss(
            data, self._current_withloss_ts, current_real_world_ts, self._last_real_world_ts)
        diff_rw = current_real_world_ts - self._last_real_world_ts
        self._last_real_world_ts = current_real_world_ts
        return self._current_nofix_ts, self._current_noloss_ts, self._current_withloss_ts, diff_rw

Classes

class MetawearTimestampCorrector (sr)
Source code
class MetawearTimestampCorrector(object):
    def __init__(self, sr):
        self._current_nofix_ts = None
        self._current_noloss_ts = None
        self._current_withloss_ts = None
        self._sample_interval = 1.0 / sr
        self._real_world_offset = None
        self._last_real_world_ts = None

    def _get_current_nofix_ts_in_seconds(self, data):
        return data['epoch'] / 1000.0

    def _get_real_world_offset(self, data, current_real_world_ts):
        return current_real_world_ts - \
            self._get_current_nofix_ts_in_seconds(data)

    def _apply_no_fix(self, data):
        return self._get_current_nofix_ts_in_seconds(data) + self._real_world_offset

    def _apply_fix_noloss(self, data, previous_noloss_ts):
        if previous_noloss_ts == None:
            current_noloss_ts = self._apply_no_fix(data)
        else:
            current_noloss_ts = previous_noloss_ts + self._sample_interval
        return current_noloss_ts

    def _apply_fix_withloss(self, data, previous_withloss_ts, current_real_world_ts, previous_real_world_ts):
        if previous_withloss_ts == None:
            current_withloss_ts = self._apply_fix_noloss(
                data, previous_withloss_ts)
        elif current_real_world_ts - previous_real_world_ts <= 2 * self._sample_interval:
            current_withloss_ts = self._apply_fix_noloss(
                data, previous_withloss_ts)
        elif abs(previous_withloss_ts - self._get_current_nofix_ts_in_seconds(data)) < 2 * self._sample_interval:
            # if there is no loss occuring
            current_withloss_ts = self._apply_fix_noloss(
                data, previous_withloss_ts)
        else:
            # data loss occurs
            current_nofix_ts = self._apply_no_fix(data)
            current_noloss_ts = self._apply_fix_noloss(
                data, previous_withloss_ts)
            if current_nofix_ts - current_noloss_ts > 2 * self._sample_interval:
                # late for more than two intervals, renew timestamp
                current_withloss_ts = current_nofix_ts
            else:
                current_withloss_ts = current_noloss_ts
        return current_withloss_ts

    def correct(self, data, current_real_world_ts):
        if self._real_world_offset == None:
            self._real_world_offset = self._get_real_world_offset(
                data, current_real_world_ts)
        if self._last_real_world_ts == None:
            self._last_real_world_ts = current_real_world_ts
        self._current_nofix_ts = self._apply_no_fix(data)
        self._current_noloss_ts = self._apply_fix_noloss(
            data, self._current_noloss_ts)
        self._current_withloss_ts = self._apply_fix_withloss(
            data, self._current_withloss_ts, current_real_world_ts, self._last_real_world_ts)
        diff_rw = current_real_world_ts - self._last_real_world_ts
        self._last_real_world_ts = current_real_world_ts
        return self._current_nofix_ts, self._current_noloss_ts, self._current_withloss_ts, diff_rw

Methods

def correct(self, data, current_real_world_ts)
Source code
def correct(self, data, current_real_world_ts):
    if self._real_world_offset == None:
        self._real_world_offset = self._get_real_world_offset(
            data, current_real_world_ts)
    if self._last_real_world_ts == None:
        self._last_real_world_ts = current_real_world_ts
    self._current_nofix_ts = self._apply_no_fix(data)
    self._current_noloss_ts = self._apply_fix_noloss(
        data, self._current_noloss_ts)
    self._current_withloss_ts = self._apply_fix_withloss(
        data, self._current_withloss_ts, current_real_world_ts, self._last_real_world_ts)
    diff_rw = current_real_world_ts - self._last_real_world_ts
    self._last_real_world_ts = current_real_world_ts
    return self._current_nofix_ts, self._current_noloss_ts, self._current_withloss_ts, diff_rw