3 - Working with StreamSets#

The previous tutorial demonstrated how to retrieve data from individual streams.

Here, we pull data from groups of streams known as StreamSet objects. You can think of a StreamSet as a wrapper around a list of Stream objects that simplifies workflows like getting data into a dataframe, or time-aligning values across streams.

The StreamSet class mirrors many of the methods found on regular Stream objects, including methods for transforming and serializing data to different formats.

As with a Stream, data retrieved from the BTrDB server is fully materialized in memory, so please keep this in mind. In other words, do not attempt to retrieve more data than fits in the memory available to you.
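One way to respect this limit is to estimate the size of a query before running it. The sketch below is not part of the btrdb API; the function names, the fixed sample rate, and the figure of 16 bytes per point (an 8-byte timestamp plus an 8-byte value) are all assumptions made for illustration. Python objects carry extra overhead, so treat the result as a lower bound.

```python
# Rough lower bound on the memory a query would need, computed before fetching.
# Assumptions (not from the BTrDB docs): a fixed sample rate and 16 bytes per
# point (8-byte timestamp + 8-byte value). Python objects need more than this.

NS_PER_SECOND = 1_000_000_000
BYTES_PER_POINT = 16  # assumed raw size; real in-memory objects are larger


def estimate_points(start_ns, end_ns, sample_rate_hz, n_streams):
    """Approximate number of points in [start_ns, end_ns) across all streams."""
    duration_s = (end_ns - start_ns) / NS_PER_SECOND
    return int(duration_s * sample_rate_hz * n_streams)


def estimate_bytes(start_ns, end_ns, sample_rate_hz, n_streams):
    """Lower-bound estimate of the bytes needed to hold the raw points."""
    points = estimate_points(start_ns, end_ns, sample_rate_hz, n_streams)
    return points * BYTES_PER_POINT


# One day of data from three streams, assuming a 120 Hz sample rate.
one_day_ns = 24 * 60 * 60 * NS_PER_SECOND
points = estimate_points(0, one_day_ns, 120, 3)
size_mb = estimate_bytes(0, one_day_ns, 120, 3) / 1e6
print(f"{points:,} points, at least {size_mb:.0f} MB")
```

If the estimate approaches the memory you have available, query a shorter time range or fewer streams at a time.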

Imports#

[1]:
import btrdb
from btrdb.stream import StreamSet
from tabulate import tabulate
from btrdb.utils.timez import ns_delta

Connect To Server#

To get started we’ll connect to the server and define a helper method from a previous notebook.

[2]:
# Make sure you add your API key to the config file to connect!
conn = btrdb.connect()
conn.info()
[2]:
{'majorVersion': 5, 'build': '5.11.157', 'proxy': {'proxyEndpoints': []}}
[3]:
def describe_streams(streams):
    table = [["Collection", "Name", "Units", "Version", "Earliest", "Latest"]]
    for stream in streams:
        tags = stream.tags()
        table.append([
            stream.collection, stream.name, tags["unit"], stream.version(),
            stream.earliest()[0].time, stream.latest()[0].time,
        ])
    return tabulate(table, headers="firstrow")

Helper Methods#

As noted above, a StreamSet is best thought of as a wrapper around a list of Stream objects, with methods added to help you examine your data. To create a StreamSet, pass in a list of streams; here we obtain them by selecting a few current streams from the database.

[4]:
streams = conn.streams_in_collection('sunshine/PMU1', tags={"unit": "amps"})
streamset = StreamSet(streams)
print(describe_streams(streamset))
Collection     Name    Units      Version             Earliest               Latest
-------------  ------  -------  ---------  -------------------  -------------------
sunshine/PMU1  C3MAG   amps        240481  1443715704008333000  1492220199333333000
sunshine/PMU1  C2MAG   amps        240718  1443715704008333000  1492220199333333000
sunshine/PMU1  C1MAG   amps        240380  1443715704008333000  1492220495999999000

Now that we have a StreamSet with the three streams, let’s take a look at some of the helper methods. In a Stream object, the earliest method provides the first point in the Stream. StreamSet.earliest provides a tuple containing the first point from each individual stream. The points appear in the same order as the streams (or their UUIDs) that were provided when creating the instance.

[5]:
streamset.earliest()
[5]:
(RawPoint(1443715704008333000, 0.14318889379501343),
 RawPoint(1443715704008333000, 0.14414265751838684),
 RawPoint(1443715704008333000, 0.14462821185588837))

Similarly, let’s look at the latest points in the streams.

[6]:
streamset.latest()
[6]:
(RawPoint(1492220199333333000, 19.179134368896484),
 RawPoint(1492220199333333000, 23.198545455932617),
 RawPoint(1492220495999999000, 20.29911994934082))
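Because the streams do not all end at the same time, it can be useful to work out the time window that every stream covers. The following is a minimal sketch that reuses the points printed above; the `RawPoint` namedtuple is a stand-in defined here so the example runs without a server, and the variable names are illustrative.

```python
from collections import namedtuple

# Stand-in for btrdb's RawPoint so the sketch runs without a server.
RawPoint = namedtuple("RawPoint", ["time", "value"])

# Values copied from the streamset.earliest() and streamset.latest() output.
earliest = (
    RawPoint(1443715704008333000, 0.14318889379501343),
    RawPoint(1443715704008333000, 0.14414265751838684),
    RawPoint(1443715704008333000, 0.14462821185588837),
)
latest = (
    RawPoint(1492220199333333000, 19.179134368896484),
    RawPoint(1492220199333333000, 23.198545455932617),
    RawPoint(1492220495999999000, 20.29911994934082),
)

# The shared window starts at the last "earliest" point
# and ends at the first "latest" point.
common_start = max(p.time for p in earliest)
common_end = min(p.time for p in latest)

print(common_start, common_end)
```

With a live connection, the same two expressions could be applied directly to the tuples returned by `streamset.earliest()` and `streamset.latest()`.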

Viewing Data#

Like the Stream object, the StreamSet has a values method which will return a list of lists. Each internal list contains the RawPoint instances for a given stream. Just as before we will return only a little bit of the data from the beginning of the streams.

We will start by finding the earliest time among all of the earliest points, although in this case the streams all begin at the same time.

[7]:
earliest_point = min(streamset.earliest(), key=lambda p: p.time)
earliest_point.time
[7]:
1443715704008333000

Next we will ask for the values in the streams. Stream values are returned as a list of lists of points, ordered in the same way as the streams provided on initialization. The method fetches and returns the data for each stream, so it can be thought of as a helper for querying multiple streams at once.

[8]:
start = earliest_point.time
end = start + ns_delta(milliseconds=100)
streamset.filter(start, end).values()
[8]:
[[RawPoint(1443715704008333000, 0.14318889379501343),
  RawPoint(1443715704016666000, 0.14324119687080383),
  RawPoint(1443715704024999000, 0.14320218563079834),
  RawPoint(1443715704033333000, 0.14330747723579407),
  RawPoint(1443715704041666000, 0.14341890811920166),
  RawPoint(1443715704049999000, 0.1432417333126068),
  RawPoint(1443715704058333000, 0.14306816458702087),
  RawPoint(1443715704066666000, 0.14304737746715546),
  RawPoint(1443715704074999000, 0.14306952059268951),
  RawPoint(1443715704083333000, 0.14308595657348633),
  RawPoint(1443715704091666000, 0.1431303173303604),
  RawPoint(1443715704099999000, 0.14315181970596313)],
 [RawPoint(1443715704008333000, 0.14414265751838684),
  RawPoint(1443715704016666000, 0.14403386414051056),
  RawPoint(1443715704024999000, 0.14402998983860016),
  RawPoint(1443715704033333000, 0.14409348368644714),
  RawPoint(1443715704041666000, 0.14425787329673767),
  RawPoint(1443715704049999000, 0.1443229466676712),
  RawPoint(1443715704058333000, 0.14423619210720062),
  RawPoint(1443715704066666000, 0.1443118453025818),
  RawPoint(1443715704074999000, 0.14431005716323853),
  RawPoint(1443715704083333000, 0.14416487514972687),
  RawPoint(1443715704091666000, 0.14408448338508606),
  RawPoint(1443715704099999000, 0.1440746784210205)],
 [RawPoint(1443715704008333000, 0.14462821185588837),
  RawPoint(1443715704016666000, 0.144479438662529),
  RawPoint(1443715704024999000, 0.14440062642097473),
  RawPoint(1443715704033333000, 0.14451566338539124),
  RawPoint(1443715704041666000, 0.14469324052333832),
  RawPoint(1443715704049999000, 0.14457932114601135),
  RawPoint(1443715704058333000, 0.14438524842262268),
  RawPoint(1443715704066666000, 0.14439848065376282),
  RawPoint(1443715704074999000, 0.14448127150535583),
  RawPoint(1443715704083333000, 0.14456181228160858),
  RawPoint(1443715704091666000, 0.14456859230995178),
  RawPoint(1443715704099999000, 0.14451190829277039)]]

You may have noticed that we first called the filter method and then called the values method with no arguments. The StreamSet class was designed to support a method chaining style of programming and so behaves slightly differently from the Stream.
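The chaining pattern itself can be illustrated with a small toy class. This is not btrdb's implementation: `LazyQuery` and its behavior are assumptions made for illustration, including the choice to treat the start time as inclusive and the end time as exclusive.

```python
class LazyQuery:
    """Toy stand-in: records a time filter and only selects data in values()."""

    def __init__(self, points, start=None, end=None):
        self._points = points  # list of (time, value) tuples
        self._start = start
        self._end = end

    def filter(self, start=None, end=None):
        # Return a new object so the original query is left untouched.
        return LazyQuery(self._points, start, end)

    def values(self):
        # Work happens only here; start is inclusive, end is exclusive (assumed).
        return [
            (t, v)
            for t, v in self._points
            if (self._start is None or t >= self._start)
            and (self._end is None or t < self._end)
        ]


query = LazyQuery([(0, 1.0), (10, 2.0), (20, 3.0), (30, 4.0)])
window = query.filter(start=10, end=30)  # nothing is selected yet
print(window.values())                   # the filtered data is produced here
print(len(query.values()))               # the original query is unfiltered
```

Returning a new object from `filter` is what makes the calls safe to chain: each step describes the query a little further, and only the final call produces data.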

Data is only materialized when you call a method that returns it, such as values or rows, as demonstrated below. The rows method is similar to the values method but orients the data differently: the streams are aligned according to time. The first tuple contains the points from every stream for the first time index. If a stream has no data at a given time index, then None is used as a placeholder.

[9]:
streamset.filter(start, end).rows()
[9]:
[(RawPoint(1443715704008333000, 0.14318889379501343),
  RawPoint(1443715704008333000, 0.14414265751838684),
  RawPoint(1443715704008333000, 0.14462821185588837)),
 (RawPoint(1443715704016666000, 0.14324119687080383),
  RawPoint(1443715704016666000, 0.14403386414051056),
  RawPoint(1443715704016666000, 0.144479438662529)),
 (RawPoint(1443715704024999000, 0.14320218563079834),
  RawPoint(1443715704024999000, 0.14402998983860016),
  RawPoint(1443715704024999000, 0.14440062642097473)),
 (RawPoint(1443715704033333000, 0.14330747723579407),
  RawPoint(1443715704033333000, 0.14409348368644714),
  RawPoint(1443715704033333000, 0.14451566338539124)),
 (RawPoint(1443715704041666000, 0.14341890811920166),
  RawPoint(1443715704041666000, 0.14425787329673767),
  RawPoint(1443715704041666000, 0.14469324052333832)),
 (RawPoint(1443715704049999000, 0.1432417333126068),
  RawPoint(1443715704049999000, 0.1443229466676712),
  RawPoint(1443715704049999000, 0.14457932114601135)),
 (RawPoint(1443715704058333000, 0.14306816458702087),
  RawPoint(1443715704058333000, 0.14423619210720062),
  RawPoint(1443715704058333000, 0.14438524842262268)),
 (RawPoint(1443715704066666000, 0.14304737746715546),
  RawPoint(1443715704066666000, 0.1443118453025818),
  RawPoint(1443715704066666000, 0.14439848065376282)),
 (RawPoint(1443715704074999000, 0.14306952059268951),
  RawPoint(1443715704074999000, 0.14431005716323853),
  RawPoint(1443715704074999000, 0.14448127150535583)),
 (RawPoint(1443715704083333000, 0.14308595657348633),
  RawPoint(1443715704083333000, 0.14416487514972687),
  RawPoint(1443715704083333000, 0.14456181228160858)),
 (RawPoint(1443715704091666000, 0.1431303173303604),
  RawPoint(1443715704091666000, 0.14408448338508606),
  RawPoint(1443715704091666000, 0.14456859230995178)),
 (RawPoint(1443715704099999000, 0.14315181970596313),
  RawPoint(1443715704099999000, 0.1440746784210205),
  RawPoint(1443715704099999000, 0.14451190829277039))]
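To make the None placeholder concrete, here is a minimal sketch of time alignment over plain Python lists. It is not how btrdb implements rows; `align_rows` and the `RawPoint` namedtuple are hypothetical stand-ins, and the sample timestamps are made up so that one stream has a gap.

```python
from collections import namedtuple

# Stand-in for btrdb's RawPoint so the sketch runs without a server.
RawPoint = namedtuple("RawPoint", ["time", "value"])


def align_rows(streams):
    """Turn per-stream point lists into time-aligned tuples, padding with None."""
    # Index each stream's points by timestamp for quick lookup.
    by_time = [{p.time: p for p in points} for points in streams]
    # Every timestamp seen in any stream, in ascending order.
    all_times = sorted(set().union(*(d.keys() for d in by_time)))
    return [tuple(d.get(t) for d in by_time) for t in all_times]


stream_a = [RawPoint(100, 1.0), RawPoint(200, 1.1), RawPoint(300, 1.2)]
stream_b = [RawPoint(100, 2.0), RawPoint(300, 2.2)]  # no point at t=200

for row in align_rows([stream_a, stream_b]):
    print(row)
```

The middle row pairs the point from the first stream with None, because the second stream has nothing at that timestamp.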

Let’s use the tabulate library again to better format the data rows.

[10]:
table = [["time"] + [s.name for s in streamset]]

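for row in streamset.filter(start, end).rows():
    # A row holds None for any stream with no point at this time index.
    time = max(p.time for p in row if p is not None)
    data = [time]
    for point in row:
        data.append(point.value if point is not None else None)
    table.append(data)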

print(tabulate(table, headers="firstrow"))
               time     C3MAG     C2MAG     C1MAG
-------------------  --------  --------  --------
1443715704008333000  0.143189  0.144143  0.144628
1443715704016666000  0.143241  0.144034  0.144479
1443715704024999000  0.143202  0.14403   0.144401
1443715704033333000  0.143307  0.144093  0.144516
1443715704041666000  0.143419  0.144258  0.144693
1443715704049999000  0.143242  0.144323  0.144579
1443715704058333000  0.143068  0.144236  0.144385
1443715704066666000  0.143047  0.144312  0.144398
1443715704074999000  0.14307   0.14431   0.144481
1443715704083333000  0.143086  0.144165  0.144562
1443715704091666000  0.14313   0.144084  0.144569
1443715704099999000  0.143152  0.144075  0.144512
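The time column holds nanoseconds since the Unix epoch, which is hard to read at a glance. The sketch below converts such a timestamp to a UTC datetime using only the standard library; `ns_to_utc` is a name chosen here for illustration, and Python's datetime keeps microsecond precision only, so the last three digits are dropped.

```python
from datetime import datetime, timedelta, timezone


def ns_to_utc(ns):
    """Convert nanoseconds since the Unix epoch to a UTC datetime."""
    seconds, remainder_ns = divmod(ns, 1_000_000_000)
    # Integer arithmetic avoids float rounding; sub-microsecond digits are dropped.
    return datetime.fromtimestamp(seconds, tz=timezone.utc) + timedelta(
        microseconds=remainder_ns // 1000
    )


print(ns_to_utc(1443715704008333000).isoformat())
# 2015-10-01T16:08:24.008333+00:00
```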

Transforming Data to Other Formats#

A number of methods are provided to convert the point data into objects you may already be familiar with, such as numpy arrays and pandas dataframes. Like the values method, these transformation methods materialize the data. Examples of the available transformations follow.

Numpy Arrays#

Converting to NumPy arrays produces one array per stream, holding that stream's values without the timestamps. The output is similar in structure to that of the values method.

[11]:
start = earliest_point.time
end = start + ns_delta(milliseconds=100)

streamset.filter(start, end).to_array()
[11]:
array([[0.14318889, 0.1432412 , 0.14320219, 0.14330748, 0.14341891,
        0.14324173, 0.14306816, 0.14304738, 0.14306952, 0.14308596,
        0.14313032, 0.14315182],
       [0.14414266, 0.14403386, 0.14402999, 0.14409348, 0.14425787,
        0.14432295, 0.14423619, 0.14431185, 0.14431006, 0.14416488,
        0.14408448, 0.14407468],
       [0.14462821, 0.14447944, 0.14440063, 0.14451566, 0.14469324,
        0.14457932, 0.14438525, 0.14439848, 0.14448127, 0.14456181,
        0.14456859, 0.14451191]])

Pandas Series#

Converting to pandas series produces a list of Series objects, one per stream, similar in structure to the output of the values method. Each series is indexed by time and named after its stream.

[12]:
streamset.filter(start, end).to_series()
[12]:
[2015-10-01 16:08:24.008333    0.143189
 2015-10-01 16:08:24.016666    0.143241
 2015-10-01 16:08:24.024999    0.143202
 2015-10-01 16:08:24.033333    0.143307
 2015-10-01 16:08:24.041666    0.143419
 2015-10-01 16:08:24.049999    0.143242
 2015-10-01 16:08:24.058333    0.143068
 2015-10-01 16:08:24.066666    0.143047
 2015-10-01 16:08:24.074999    0.143070
 2015-10-01 16:08:24.083333    0.143086
 2015-10-01 16:08:24.091666    0.143130
 2015-10-01 16:08:24.099999    0.143152
 Name: sunshine/PMU1/C3MAG, dtype: float64,
 2015-10-01 16:08:24.008333    0.144143
 2015-10-01 16:08:24.016666    0.144034
 2015-10-01 16:08:24.024999    0.144030
 2015-10-01 16:08:24.033333    0.144093
 2015-10-01 16:08:24.041666    0.144258
 2015-10-01 16:08:24.049999    0.144323
 2015-10-01 16:08:24.058333    0.144236
 2015-10-01 16:08:24.066666    0.144312
 2015-10-01 16:08:24.074999    0.144310
 2015-10-01 16:08:24.083333    0.144165
 2015-10-01 16:08:24.091666    0.144084
 2015-10-01 16:08:24.099999    0.144075
 Name: sunshine/PMU1/C2MAG, dtype: float64,
 2015-10-01 16:08:24.008333    0.144628
 2015-10-01 16:08:24.016666    0.144479
 2015-10-01 16:08:24.024999    0.144401
 2015-10-01 16:08:24.033333    0.144516
 2015-10-01 16:08:24.041666    0.144693
 2015-10-01 16:08:24.049999    0.144579
 2015-10-01 16:08:24.058333    0.144385
 2015-10-01 16:08:24.066666    0.144398
 2015-10-01 16:08:24.074999    0.144481
 2015-10-01 16:08:24.083333    0.144562
 2015-10-01 16:08:24.091666    0.144569
 2015-10-01 16:08:24.099999    0.144512
 Name: sunshine/PMU1/C1MAG, dtype: float64]

Pandas DataFrame#

Converting to a pandas dataframe will produce a tabular view of the data similar to calling the rows method. The resulting dataframe will be indexed by time.

[13]:
streamset.filter(start, end).to_dataframe()
[13]:
                     sunshine/PMU1/C3MAG  sunshine/PMU1/C2MAG  sunshine/PMU1/C1MAG
time
1443715704008333000             0.143189             0.144143             0.144628
1443715704016666000             0.143241             0.144034             0.144479
1443715704024999000             0.143202             0.144030             0.144401
1443715704033333000             0.143307             0.144093             0.144516
1443715704041666000             0.143419             0.144258             0.144693
1443715704049999000             0.143242             0.144323             0.144579
1443715704058333000             0.143068             0.144236             0.144385
1443715704066666000             0.143047             0.144312             0.144398
1443715704074999000             0.143070             0.144310             0.144481
1443715704083333000             0.143086             0.144165             0.144562
1443715704091666000             0.143130             0.144084             0.144569
1443715704099999000             0.143152             0.144075             0.144512

Dictionaries#

Converting to Python dictionaries produces a list of OrderedDicts, one per time index, laid out much like the output of the rows method.

[14]:
streamset.filter(start, end).to_dict()
[14]:
[OrderedDict([('time', 1443715704008333000),
              ('sunshine/PMU1/C3MAG', 0.14318889379501343),
              ('sunshine/PMU1/C2MAG', 0.14414265751838684),
              ('sunshine/PMU1/C1MAG', 0.14462821185588837)]),
 OrderedDict([('time', 1443715704016666000),
              ('sunshine/PMU1/C3MAG', 0.14324119687080383),
              ('sunshine/PMU1/C2MAG', 0.14403386414051056),
              ('sunshine/PMU1/C1MAG', 0.144479438662529)]),
 OrderedDict([('time', 1443715704024999000),
              ('sunshine/PMU1/C3MAG', 0.14320218563079834),
              ('sunshine/PMU1/C2MAG', 0.14402998983860016),
              ('sunshine/PMU1/C1MAG', 0.14440062642097473)]),
 OrderedDict([('time', 1443715704033333000),
              ('sunshine/PMU1/C3MAG', 0.14330747723579407),
              ('sunshine/PMU1/C2MAG', 0.14409348368644714),
              ('sunshine/PMU1/C1MAG', 0.14451566338539124)]),
 OrderedDict([('time', 1443715704041666000),
              ('sunshine/PMU1/C3MAG', 0.14341890811920166),
              ('sunshine/PMU1/C2MAG', 0.14425787329673767),
              ('sunshine/PMU1/C1MAG', 0.14469324052333832)]),
 OrderedDict([('time', 1443715704049999000),
              ('sunshine/PMU1/C3MAG', 0.1432417333126068),
              ('sunshine/PMU1/C2MAG', 0.1443229466676712),
              ('sunshine/PMU1/C1MAG', 0.14457932114601135)]),
 OrderedDict([('time', 1443715704058333000),
              ('sunshine/PMU1/C3MAG', 0.14306816458702087),
              ('sunshine/PMU1/C2MAG', 0.14423619210720062),
              ('sunshine/PMU1/C1MAG', 0.14438524842262268)]),
 OrderedDict([('time', 1443715704066666000),
              ('sunshine/PMU1/C3MAG', 0.14304737746715546),
              ('sunshine/PMU1/C2MAG', 0.1443118453025818),
              ('sunshine/PMU1/C1MAG', 0.14439848065376282)]),
 OrderedDict([('time', 1443715704074999000),
              ('sunshine/PMU1/C3MAG', 0.14306952059268951),
              ('sunshine/PMU1/C2MAG', 0.14431005716323853),
              ('sunshine/PMU1/C1MAG', 0.14448127150535583)]),
 OrderedDict([('time', 1443715704083333000),
              ('sunshine/PMU1/C3MAG', 0.14308595657348633),
              ('sunshine/PMU1/C2MAG', 0.14416487514972687),
              ('sunshine/PMU1/C1MAG', 0.14456181228160858)]),
 OrderedDict([('time', 1443715704091666000),
              ('sunshine/PMU1/C3MAG', 0.1431303173303604),
              ('sunshine/PMU1/C2MAG', 0.14408448338508606),
              ('sunshine/PMU1/C1MAG', 0.14456859230995178)]),
 OrderedDict([('time', 1443715704099999000),
              ('sunshine/PMU1/C3MAG', 0.14315181970596313),
              ('sunshine/PMU1/C2MAG', 0.1440746784210205),
              ('sunshine/PMU1/C1MAG', 0.14451190829277039)])]
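A list of dictionaries like this maps naturally onto row-oriented formats. As a minimal sketch, the first two rows of the output above are written as CSV with the standard library's `csv.DictWriter`; the in-memory buffer and variable names are illustrative, and the rows are copied in by hand so the example runs without a server.

```python
import csv
import io
from collections import OrderedDict

# First two rows copied from the to_dict() output above.
rows = [
    OrderedDict([("time", 1443715704008333000),
                 ("sunshine/PMU1/C3MAG", 0.14318889379501343),
                 ("sunshine/PMU1/C2MAG", 0.14414265751838684),
                 ("sunshine/PMU1/C1MAG", 0.14462821185588837)]),
    OrderedDict([("time", 1443715704016666000),
                 ("sunshine/PMU1/C3MAG", 0.14324119687080383),
                 ("sunshine/PMU1/C2MAG", 0.14403386414051056),
                 ("sunshine/PMU1/C1MAG", 0.144479438662529)]),
]

# Write to an in-memory buffer; a file opened with newline="" works the same way.
buffer = io.StringIO()
writer = csv.DictWriter(buffer, fieldnames=list(rows[0].keys()))
writer.writeheader()
writer.writerows(rows)

print(buffer.getvalue())
```

Taking the field names from the first row keeps the column order identical to the order of the streams in the StreamSet.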