MODULE

arus.generator

ARUS dataflow: generators.

Generators take an external data source and yield values in chunks of buffer_size samples, following the Python generator pattern.

  • Author: Qu Tang
  • Date: 01/28/2020
  • License: GNU v3
Classes
class

Generator(buffer_size=1000000)

Bases
arus.operator.Operator

Base generator class.

Note

This class should always be inherited and should not be instantiated directly. Subclasses should override the run method to work with their own data source.

Parameters
  • buffer_size (int, optional) The number of samples in each burst of streaming data.
Example

Use generator classes in the following pattern.

# Replace Generator, args, kwargs with proper names for different generator classes
gen = Generator(*args, **kwargs)
# Start generator
gen.start()
# Get chunked data
for data, context in gen.get_result():
    if data is None: # Some condition for early termination
        break
    # handle data
# Stop generator
gen.stop()
Methods
  • run(values, src, context) Implementation of data generation.
  • start() Generate bursts of streaming data.
method
start()

Generate bursts of streaming data.

Use this method, rather than run, when using a generator directly instead of through arus.Node.

method
run(values=None, src=None, context={})

Implementation of data generation.

This method must be overridden by subclasses; implementations should load data from the data source and generate chunks from it.

Examples

Developers should implement run following this template.

# You can accept data source from the `__init__` method
for data in self._load_data(self._data_sources):
    # Call this to buffer input data with `buffer_size`
    result = self._buffering(data)
    # Put the generated data into `self._result`. You should always attach the `self._context` so that it can be chained with other operators via `arus.Node`.
    self._result.put((result, self._context))
    # Implement stop condition
    if data is None or self._stop:
        break
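For instance, a minimal subclass that streams items from an in-memory list might look like the sketch below. The class name, the __init__ signature, and the _load_data helper are illustrative only; the private members (self._data_sources, self._buffering, self._result, self._context, self._stop) are the ones referenced in the template above, and the trailing None mirrors the end-of-stream condition the usage examples check for.

import arus

class ListGenerator(arus.Generator):
    """Hypothetical generator that streams items from an in-memory list."""

    def __init__(self, items, **kwargs):
        super().__init__(**kwargs)
        # Accept the data source from `__init__` as suggested above
        self._data_sources = items

    def _load_data(self, items):
        # Illustrative loader: yield each item, then None to mark the end
        yield from items
        yield None

    def run(self, values=None, src=None, context={}):
        for data in self._load_data(self._data_sources):
            # Buffer the incoming data into chunks of `buffer_size` samples
            result = self._buffering(data)
            # Attach `self._context` so the output can be chained via `arus.Node`
            self._result.put((result, self._context))
            # Stop when the source is exhausted or the generator is stopped
            if data is None or self._stop:
                break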
Parameters
  • values (optional) Not used.
  • src (optional) Not used.
  • context (optional) Not used.
class

MhealthSensorFileGenerator(*filepaths, **kwargs)

Bases
arus.generator.Generator arus.operator.Operator

Generator class for sensor files stored in mhealth format.

Note

The file paths should be sorted before being passed in; the generator loads the files one by one in the given order. A path-collection sketch follows the example below.

Parameters
  • *filepaths (str) The sensor file paths as data sources.
  • **kwargs (object) Other keyword arguments passed to the parent class (e.g., buffer_size).
Examples

Generate mhealth sensor data in chunks, each containing 10 samples.

import arus

gen = arus.MhealthSensorFileGenerator("path/to/sensor_file.csv", buffer_size=10)
gen.start()
for data, context in gen.get_result():
    if data is None: # end condition
        break
    print(data.shape[0]) # should be 10
gen.stop()
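When a recording is split across multiple files, the paths can be collected with glob and sorted before constructing the generator. A minimal sketch; the directory layout and file name pattern are assumptions, not part of the library:

import glob
import arus

# Collect the session's mhealth sensor files and sort them chronologically by name
filepaths = sorted(glob.glob("path/to/mhealth/**/*.sensor.csv", recursive=True))
gen = arus.MhealthSensorFileGenerator(*filepaths, buffer_size=1800)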
Methods
  • run(values, src, context) Implementation of data generation (Hidden).
  • start() Generate bursts of streaming data.
method
start()

Generate bursts of streaming data.

Use this method, rather than run, when using a generator directly instead of through arus.Node.

method
run(values=None, src=None, context={})

Implementation of data generation (Hidden).

class

MhealthAnnotationFileGenerator(*filepaths, **kwargs)

Bases
arus.generator.Generator arus.operator.Operator

Generator class for annotation files stored in mhealth format.

Note

The file paths should be sorted before being passed in; the generator loads the files one by one in the given order.

Parameters
  • *filepaths (str) The annotation file paths as data sources.
  • **kwargs (object) Other keyword arguments passed to the parent class (e.g., buffer_size).
Examples

Generate mhealth annotation data in chunks, each containing 10 rows of annotations.

import arus

gen = arus.MhealthAnnotationFileGenerator("path/to/annotation_file.csv", buffer_size=10)
gen.start()
for data, context in gen.get_result():
    if data is None: # end condition
        break
    print(data.shape[0]) # should be 10
gen.stop()
Methods
  • run(values, src, context) Implementation of data generation (Hidden).
  • start() Generate bursts of streaming data.
method
start()

Generate bursts of streaming data.

Use this method, rather than run, when using a generator directly instead of through arus.Node.

method
run(values=None, src=None, context={})

Implementation of data generation (Hidden).

class

RandomAccelDataGenerator(sr, grange=8, st=None, sigma=1, max_samples=None, **kwargs)

Bases
arus.generator.Generator arus.operator.Operator

Generator class for raw accelerometer data synthesized randomly.

Parameters
  • sr (int) The sampling rate in Hz.
  • grange (int, optional) The dynamic range in g.
  • st (str, datetime, numpy.datetime64, pandas.Timestamp, optional) The start timestamp of the generated data. If None, it will be the current timestamp.
  • sigma (float, optional) The variance of the Gaussian distribution from which samples are drawn.
  • max_samples (int, optional) The maximum number of samples to be generated.
  • **kwargs (object) Other keyword arguments passed to the parent class (e.g., buffer_size).
Examples

Generate accelerometer data in chunks, each containing 10 samples, for at most 100 samples (10 chunks) in total.

import datetime
import arus

gen = arus.RandomAccelDataGenerator(80, grange=8, st=datetime.datetime.now(), sigma=1.5, max_samples=100, buffer_size=10)
gen.start()
for data, context in gen.get_result():
    if data is None: # end condition
        break
    print(data.shape[0]) # should be 10
    # loop should end after 10 chunks
gen.stop()
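To check that max_samples is respected, the chunk sizes can be summed over the loop; a small sketch reusing the generator settings from the example above:

import datetime
import arus

gen = arus.RandomAccelDataGenerator(80, grange=8, st=datetime.datetime.now(), sigma=1.5, max_samples=100, buffer_size=10)
gen.start()
total = 0
for data, context in gen.get_result():
    if data is None:
        break
    total += data.shape[0]  # accumulate the number of generated samples
gen.stop()
print(total)  # expected to be at most 100 (max_samples)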
Methods
  • run(values, src, context) Implementation of data generation (Hidden).
  • start() Generate bursts of streaming data.
method
start()

Generate bursts of streaming data.

Use this method, rather than run, when using a generator directly instead of through arus.Node.

method
run(values=None, src=None, context={})

Implementation of data generation (Hidden).

class

RandomAnnotationDataGenerator(labels, duration_mu=5, duration_sigma=5, st=None, num_mu=2, num_sigma=1, max_samples=None, **kwargs)

Bases
arus.generator.Generator arus.operator.Operator

Generator class for annotation data synthesized randomly.

Parameters
  • labels (list) List of annotation labels to be randomly selected.
  • duration_mu (float, optional) The mean (in seconds) of the Gaussian distribution used to draw annotation durations.
  • duration_sigma (float, optional) The standard deviation (in seconds) of the Gaussian distribution used to draw annotation durations.
  • st (str, datetime, numpy.datetime64, pandas.Timestamp, optional) The start timestamp of the generated data. If None, it will be the current timestamp.
  • num_mu (float, optional) The mean of the Gaussian distribution used to draw the number of annotations per generation.
  • num_sigma (float, optional) The standard deviation of the Gaussian distribution used to draw the number of annotations per generation.
  • max_samples (int, optional) The maximum number of annotation rows to be generated.
  • **kwargs (object) Other keyword arguments passed to the parent class (e.g., buffer_size).
Examples

Generate annotation data in chunks, each containing 10 rows of annotations, for at most 100 rows (10 chunks) in total.

import datetime
import arus

gen = arus.RandomAnnotationDataGenerator(['Sit', 'Walk'], duration_mu=5, duration_sigma=5, st=datetime.datetime.now(), num_mu=3, num_sigma=1, max_samples=100, buffer_size=10)
gen.start()
for data, context in gen.get_result():
    if data is None: # end condition
        break
    print(data.shape[0]) # should be 10
    # loop should end after 10 chunks
gen.stop()
Methods
  • run(values, src, context) Implementation of data generation (Hidden).
  • start() Generate bursts of streaming data.
method
start()

Generate bursts of streaming data.

Use this method, rather than run, when using a generator directly instead of through arus.Node.

method
run(values=None, src=None, context={})

Implementation of data generation (Hidden).


Last update: July 16, 2020