2 - Working with Streams#

This tutorial focuses on retrieving data from the individual streams of time series data.

We will also look into the Point classes which represent individual data points that make up a Stream. We will start by looking at basic metadata about the stream, and then illustrate how to query quantitative data values.

In this tutorial we will retrieve both low-level data and aggregated windows of data. Once you have mastered these concepts, we recommend working through the tutorial on windows, aligned_windows, and values to learn more about when it’s more appropriate to use windows vs values queries.

Imports#

[1]:
import btrdb
from tabulate import tabulate
from pprint import pprint

Connect To Server#

[2]:
conn = btrdb.connect()
conn.info()

[2]:
{'majorVersion': 5, 'build': '5.12.5', 'proxy': {'proxyEndpoints': []}}

Stream Basics#

Streams contain actual points of data and you can query the time series values in a number of ways. But first, let’s look at the Stream metadata for a better understanding of what is available.

Metadata Properties#

As discussed in the previous notebook, a stream has its own metadata properties such as its collection or uuid. Let’s start by defining each read-only property.

collection: The hierarchical path where a stream lives. This is used purely for organizational purposes and has no impact on the underlying data.

name: A friendly name for the stream.

uuid: A unique identifier for a given stream. This UUID cannot be changed and can be used to fetch a stream directly using the connection object.

[3]:
streams = list(conn.streams_in_collection('sunshine/PMU6', tags={"unit": "volts"}))
for stream in streams:
    print("collection:{}, name:{}, uuid:{}".format(stream.collection, stream.name, stream.uuid))

collection:sunshine/PMU6, name:L2MAG, uuid:d60fc469-a6da-4c98-8763-fd833293d955
collection:sunshine/PMU6, name:L3MAG, uuid:4833b5e0-ef30-40ed-8db8-352e79d38c28
collection:sunshine/PMU6, name:L1MAG, uuid:d3e9ed52-6db9-4b98-bfda-e1b509148e47

Metadata Methods#

In addition to the metadata properties above, each stream has a set of metadata methods. This information is exposed through instance methods rather than properties to indicate that retrieving it typically requires a round-trip to the server and that the values may change often.

<dt>tags</dt>
<dd>A dictionary containing metadata used internally by the BTrDB server. The name of a stream is also stored here; the name property is just a convenience accessor for it. Tags are intended for internal use and generally should not be edited.</dd>
<dt>annotations</dt>
<dd>A dictionary that lets you attach custom data to a stream. As an example, you could provide an annotation to indicate a point-of-contact for this data or the make/model of the sensor device. Whenever you call the annotations method, a separate property version number is also returned. Similar to the stream version, this is a monotonically increasing number, but it is only updated when the metadata changes. Annotations are well suited for user-defined metadata that may evolve over time.</dd>

Let’s look at some sample tags and annotations below.

[4]:
stream = streams[0]

print("tags")
pprint(stream.tags())

annotations, property_version = stream.annotations()
print("\nannotations")
pprint(annotations)

print("\nproperty_version")
print(property_version)

tags
{'distiller': '', 'ingress': '', 'name': 'L2MAG', 'unit': 'volts'}

annotations
{'impedance': '{"source": "PMU4", "target": "PMU6", "pos_sequence": "0.76 + '
              'j0.463", "neg_sequence": "1.782 + j1.234"}',
 'location': 'building'}

property_version
1
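Notice that the impedance annotation above is itself a JSON-encoded string. As a minimal sketch, the snippet below copies the annotations from the output above into a plain dictionary and decodes that value with the standard library. The parse_phasor helper is hypothetical and only handles the "a + jb" layout seen in this one sample; other streams may store their annotations differently.

```python
import json

# Annotations copied from the output above; with a live connection this
# dictionary would come from stream.annotations()[0].
annotations = {
    "impedance": '{"source": "PMU4", "target": "PMU6", '
                 '"pos_sequence": "0.76 + j0.463", "neg_sequence": "1.782 + j1.234"}',
    "location": "building",
}

# Annotation values are strings, so nested data has to be decoded explicitly.
impedance = json.loads(annotations["impedance"])


def parse_phasor(text):
    """Hypothetical helper: turn 'a + jb' into a Python complex number."""
    real, imag = text.split(" + j")
    return complex(float(real), float(imag))


pos_sequence = parse_phasor(impedance["pos_sequence"])
print(impedance["source"], impedance["target"], pos_sequence)
```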

Metadata Example#

As with the collections, we can write a simple helper function to display information about a Stream.

[5]:
def stream_detail(stream):
    """
    Prints detailed information about a stream.
    """
    table = [["Attribute", "Value"]]
    table.append(["collection", stream.collection])
    table.append(["version", stream.version()])

    for k,v in stream.tags().items():
        table.append(["tag/{}".format(k), v])

    for k,v in stream.annotations()[0].items():
        table.append(["annotation/{}".format(k), v])

    return tabulate(table, headers="firstrow")

print(stream_detail(stream))
Attribute             Value
--------------------  -------------------------------------------------------------------------------------------------------
collection            sunshine/PMU6
version               37988
tag/distiller
tag/name              L2MAG
tag/unit              volts
tag/ingress
annotation/location   building
annotation/impedance  {"source": "PMU4", "target": "PMU6", "pos_sequence": "0.76 + j0.463", "neg_sequence": "1.782 + j1.234"}

Stream Version#

Remember that the version of a stream changes whenever data is modified (though not when metadata is modified). This is especially useful on live data because you can pin all of your queries to a specific version of your data. Otherwise, you could potentially get constantly changing statistics as new data is appended or older data arrives out of order.

Think of the version as a snapshot of your data in time. Most of the calls we use on this page allow for an optional version argument. We have omitted it, so the default value of zero is used, which indicates the current (or latest) version.
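To make the idea of pinning concrete, here is a small sketch of a helper that reads the version once and reuses it for two different queries. The function name query_at_pinned_version is our own invention, and we are assuming that values and windows both accept the optional version keyword described above.

```python
def query_at_pinned_version(stream, start, end, width):
    """Run a raw query and a windowed query against the same stream version.

    Hypothetical helper: assumes the stream object offers version(),
    values(start, end, version=...) and windows(start, end, width, version=...).
    """
    # Read the current version once, then pass it to every query so that
    # data arriving in the meantime cannot change the results.
    pinned = stream.version()
    raw = [point for point, _ in stream.values(start, end, version=pinned)]
    stats = [point for point, _ in stream.windows(start, end, width, version=pinned)]
    return pinned, raw, stats
```

Keeping the returned version number alongside your results also makes it possible to repeat the same queries later and compare.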

Viewing Data#

Most likely you are interested in examining the stored values in each time series stream. This is fairly straightforward though it is important to note that the time series values are actually provided in a RawPoint class which has time and value attributes.

Because a stream may contain billions of values, data queries should provide a start and end time to bound the amount of data retrieved. All times are stored internally in nanoseconds with the zero value being the Unix epoch. Luckily, the btrdb library comes with many convenience functions to help with converting between nanoseconds and other datetime formats.
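To see what these nanosecond timestamps look like, the sketch below converts between datetimes and nanoseconds using only the standard library. The two helper functions are illustrative stand-ins of our own, not the btrdb convenience functions. The last lines show why timestamps are best kept as integers: a 64-bit float cannot represent every nanosecond value of this magnitude.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)
NS_PER_SECOND = 1_000_000_000


def datetime_to_ns(dt):
    """Convert a timezone-aware datetime to integer nanoseconds since the epoch."""
    delta = dt - EPOCH
    whole_seconds = delta.days * 86_400 + delta.seconds
    return whole_seconds * NS_PER_SECOND + delta.microseconds * 1_000


def ns_to_utc_datetime(ns):
    """Convert integer nanoseconds to a UTC datetime (truncated to microseconds)."""
    return EPOCH + timedelta(microseconds=ns // 1_000)


midnight = datetime_to_ns(datetime(2016, 3, 1, tzinfo=timezone.utc))
print(midnight)  # 1456790400000000000

# Integer arithmetic is exact, but a round trip through float is not.
sample = 1456790400008333000
print(int(float(sample)) == sample)  # False
```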

Let’s start with a simple example to view data in our L2MAG stream. We will call the stream’s values method which returns a sequence. Each item in the sequence is a tuple containing a RawPoint and the version of the stream at retrieval time. However, before we call the values method we will need to determine the start time for our call.

[6]:
# retrieve the first point of data and the current stream version
earliest, version = stream.earliest()
print(earliest, version)
RawPoint(1456790400008333000, 283.017822265625) 0

We can see above that the first data point in the stream has a time of 1456790400008333000 nanoseconds. Let’s convert that to a datetime so we can understand it better.

[7]:
from btrdb.utils.timez import ns_to_datetime
ns_to_datetime(earliest.time)
[7]:
datetime.datetime(2016, 3, 1, 0, 0, 0, 8333, tzinfo=<UTC>)

Now, let’s call values using our start time and ending 200 milliseconds later.

[8]:
start = earliest.time
end = earliest.time + 2e8

for point, version in stream.values(start, end):
    print(point)
RawPoint(1456790400008333000, 283.017822265625)
RawPoint(1456790400016666000, 283.0371398925781)
RawPoint(1456790400024999000, 283.05413818359375)
RawPoint(1456790400033333000, 283.05853271484375)
RawPoint(1456790400041666000, 283.0568542480469)
RawPoint(1456790400049999000, 283.06005859375)
RawPoint(1456790400058333000, 283.05340576171875)
RawPoint(1456790400066666000, 283.0347900390625)
RawPoint(1456790400074999000, 283.0113220214844)
RawPoint(1456790400083333000, 283.0077209472656)
RawPoint(1456790400091666000, 283.02532958984375)
RawPoint(1456790400099999000, 283.03515625)
RawPoint(1456790400108333000, 283.0439147949219)
RawPoint(1456790400116666000, 283.05706787109375)
RawPoint(1456790400124999000, 283.0538635253906)
RawPoint(1456790400133333000, 283.0562438964844)
RawPoint(1456790400141666000, 283.0840759277344)
RawPoint(1456790400149999000, 283.11444091796875)
RawPoint(1456790400158333000, 283.1253967285156)
RawPoint(1456790400166666000, 283.1314697265625)
RawPoint(1456790400174999000, 283.1392517089844)
RawPoint(1456790400183333000, 283.1244812011719)
RawPoint(1456790400191666000, 283.1060485839844)
RawPoint(1456790400199999000, 283.09478759765625)
RawPoint(1456790400208333000, 283.088134765625)

We can also access the time and value attributes of each point as follows:

[9]:
for point, version in stream.values(start, end):
    print(point.time, point.value)
1456790400008333000 283.017822265625
1456790400016666000 283.0371398925781
1456790400024999000 283.05413818359375
1456790400033333000 283.05853271484375
1456790400041666000 283.0568542480469
1456790400049999000 283.06005859375
1456790400058333000 283.05340576171875
1456790400066666000 283.0347900390625
1456790400074999000 283.0113220214844
1456790400083333000 283.0077209472656
1456790400091666000 283.02532958984375
1456790400099999000 283.03515625
1456790400108333000 283.0439147949219
1456790400116666000 283.05706787109375
1456790400124999000 283.0538635253906
1456790400133333000 283.0562438964844
1456790400141666000 283.0840759277344
1456790400149999000 283.11444091796875
1456790400158333000 283.1253967285156
1456790400166666000 283.1314697265625
1456790400174999000 283.1392517089844
1456790400183333000 283.1244812011719
1456790400191666000 283.1060485839844
1456790400199999000 283.09478759765625
1456790400208333000 283.088134765625
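Once the times and values are separated, ordinary Python is enough for quick sanity checks. As a rough sketch, the snippet below takes the first four points from the output above and estimates the sampling rate from the gaps between timestamps. The Point namedtuple is only a stand-in for RawPoint so that the example runs without a server connection.

```python
from collections import namedtuple

# Stand-in for RawPoint, exposing the same time and value attributes.
Point = namedtuple("Point", ["time", "value"])

points = [
    Point(1456790400008333000, 283.017822265625),
    Point(1456790400016666000, 283.0371398925781),
    Point(1456790400024999000, 283.05413818359375),
    Point(1456790400033333000, 283.05853271484375),
]

times = [p.time for p in points]
values = [p.value for p in points]

# Gap between consecutive timestamps, in nanoseconds.
intervals_ns = [later - earlier for earlier, later in zip(times, times[1:])]
mean_interval_ns = sum(intervals_ns) / len(intervals_ns)
rate_hz = 1e9 / mean_interval_ns

print(intervals_ns)
print(round(rate_hz, 3), "samples per second")
```

For these points the estimate comes out at about 120 samples per second.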

Windowed Data#

Aside from viewing the raw values, we can also query windows of data. Windowed queries return StatPoint values that contain aggregations of the data in each window rather than individual RawPoint values. Each StatPoint contains attributes for the minimum, maximum, mean, and standard deviation of the values in each window. StatPoints also report the number of raw points (i.e., the count) and the start time of the window. You can find more information about windowing data in the docs or view the API reference.

This time we will use another convenience function from btrdb.utils.timez, ns_delta, which builds nanosecond offsets from familiar units. We will use it to define a one second query range and to aggregate the data in 100 millisecond windows.

[10]:
from btrdb.utils.timez import ns_delta

start = earliest.time
end = earliest.time + ns_delta(seconds=1)   # 1 second later
width = ns_delta(milliseconds=100)          # 100 milliseconds per window

for point, version in stream.windows(start, end, width):
    print(point)
StatPoint(1456790400008333000, 283.0077209472656, 283.0376892089844, 283.06005859375, 12, 0.018201699453480432)
StatPoint(1456790400108333000, 283.0439147949219, 283.09425354003906, 283.1392517089844, 12, 0.03283400327693183)
StatPoint(1456790400208333000, 283.0471496582031, 283.06798299153644, 283.0984191894531, 12, 0.016191161279048426)
StatPoint(1456790400308333000, 282.9364318847656, 283.0061569213867, 283.05145263671875, 12, 0.03689424607932575)
StatPoint(1456790400408333000, 282.95733642578125, 283.0086212158203, 283.03564453125, 12, 0.02022968452356655)
StatPoint(1456790400508333000, 283.01397705078125, 283.04548899332684, 283.07122802734375, 12, 0.01554654929714072)
StatPoint(1456790400608333000, 283.047607421875, 283.08336639404297, 283.13568115234375, 12, 0.0278841838563787)
StatPoint(1456790400708333000, 283.089111328125, 283.13182830810547, 283.16107177734375, 12, 0.01937428873128766)
StatPoint(1456790400808333000, 283.09521484375, 283.1276219685872, 283.15911865234375, 12, 0.017036256335062058)
StatPoint(1456790400908333000, 283.0104064941406, 283.03795623779297, 283.083740234375, 12, 0.0210068369334544)
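To connect these aggregates back to the raw data, the sketch below recomputes the first window locally from the twelve raw values printed earlier, which are the points that fall within the first 100 milliseconds. We use the population standard deviation here because it agrees with the first StatPoint above; treat that as an observation from this one window rather than a documented guarantee.

```python
import statistics

# The twelve raw values from the first 100 ms of the stream, copied from
# the values output earlier on this page.
window_values = [
    283.017822265625, 283.0371398925781, 283.05413818359375,
    283.05853271484375, 283.0568542480469, 283.06005859375,
    283.05340576171875, 283.0347900390625, 283.0113220214844,
    283.0077209472656, 283.02532958984375, 283.03515625,
]

summary = {
    "min": min(window_values),
    "mean": statistics.fmean(window_values),
    "max": max(window_values),
    "count": len(window_values),
    # Population standard deviation (divides by n, not n - 1).
    "stddev": statistics.pstdev(window_values),
}

for name, value in summary.items():
    print(name, value)
```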

Let’s create a convenience function to view all of this data in a table using tabulate again.

[11]:
def stat_table(points):
    attrs = ["time", "min", "max", "mean", "stddev", "count"]
    table = [attrs]
    for p in points:
        table.append([getattr(p, attr) for attr in attrs])
    return tabulate(table, headers="firstrow")

points = [point for point, _ in stream.windows(start, end, width)]
print(stat_table(points))
               time      min      max     mean     stddev    count
-------------------  -------  -------  -------  ---------  -------
1456790400008333000  283.008  283.06   283.038  0.0182017       12
1456790400108333000  283.044  283.139  283.094  0.032834        12
1456790400208333000  283.047  283.098  283.068  0.0161912       12
1456790400308333000  282.936  283.051  283.006  0.0368942       12
1456790400408333000  282.957  283.036  283.009  0.0202297       12
1456790400508333000  283.014  283.071  283.045  0.0155465       12
1456790400608333000  283.048  283.136  283.083  0.0278842       12
1456790400708333000  283.089  283.161  283.132  0.0193743       12
1456790400808333000  283.095  283.159  283.128  0.0170363       12
1456790400908333000  283.01   283.084  283.038  0.0210068       12

While we haven’t used it in our examples, you can also optionally supply a depth argument to the windows query. The depth sets the precision of each window's duration, expressed as a power of two in nanoseconds, which can greatly speed up your queries if you don’t need to be precise down to the nanosecond level. As an example, supplying depth=30 would give roughly one-second precision (\(2^{30}\) nanoseconds is about 1.07 seconds) and take advantage of the BTrDB database structure to increase retrieval performance. Please see the docs for more information.

Aligned Windows#

The aligned windows method is an alternative aggregate query that more directly exploits the underlying database structure for the best possible performance. This method, aligned_windows, takes a pointwidth argument that defines the window width as a power of two. Like the windows method, each point returned is a statistical aggregate of all the raw data within a window, but each window has a width of \(2^{pointwidth}\) nanoseconds.

Note that when bounding the query with a time range, start is inclusive, but end is exclusive. That is, results will be returned for all windows that start in the interval \([start, end)\). If \(end < start+2^{pointwidth}\) you will not get any results. If start and end are not multiples of \(2^{pointwidth}\), the bottom pointwidth bits will be cleared, which rounds each of them down to the nearest window boundary. Each returned point contains statistical summaries of its window. Windows with no points (count=0) will be omitted.
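Because pointwidths are exponents, it helps to translate them into familiar units. The two helpers below are a small sketch of our own (they are not part of the btrdb library): one converts a pointwidth to seconds, and the other finds the largest pointwidth whose window still fits inside a desired width.

```python
NS_PER_SECOND = 1_000_000_000


def pointwidth_to_seconds(pointwidth):
    """Width of a 2**pointwidth nanosecond window, expressed in seconds."""
    return (1 << pointwidth) / NS_PER_SECOND


def largest_pointwidth_within(width_ns):
    """Largest pointwidth whose window (2**pointwidth ns) does not exceed width_ns."""
    if width_ns < 1:
        raise ValueError("width_ns must be a positive integer")
    return width_ns.bit_length() - 1


for pw in (27, 30, 52):
    seconds = pointwidth_to_seconds(pw)
    print(pw, seconds, "seconds, or", seconds / 86_400, "days")

# A 100 millisecond target fits a pointwidth of 26 (about 67 ms), since a
# pointwidth of 27 (about 134 ms) would already be wider than requested.
print(largest_pointwidth_within(100_000_000))  # 26
```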
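The alignment rule is easier to follow with numbers. The sketch below is a local model of the behavior described above, not the server's actual implementation: it clears the bottom pointwidth bits of start and end and lists the candidate window start times in between. With the start time of our stream, a one second range and a pointwidth of 27, it produces the same seven window times that appear in the aligned_windows output in this section.

```python
def align_down(ns, pointwidth):
    """Clear the bottom `pointwidth` bits, rounding down to a window boundary."""
    return (ns >> pointwidth) << pointwidth


def aligned_window_starts(start, end, pointwidth):
    """Candidate window start times for the range [start, end).

    A local model only: the server additionally omits windows that
    contain no points.
    """
    width = 1 << pointwidth
    first = align_down(start, pointwidth)
    last_exclusive = align_down(end, pointwidth)
    return list(range(first, last_exclusive, width))


start = 1456790400008333000      # time of the earliest point in our stream
end = start + 1_000_000_000      # one second later
starts = aligned_window_starts(start, end, 27)

print(len(starts))   # 7
print(starts[0])     # 1456790399996657664
```

Note that the first window begins slightly before the requested start time, which is why the aligned results can include points that a values query over the same range would not.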

[12]:
pointwidth = 27    # 2^27 nanoseconds wide
points = [point for point, _ in stream.aligned_windows(start, end, pointwidth)]
print(stat_table(points))
               time      min      max     mean     stddev    count
-------------------  -------  -------  -------  ---------  -------
1456790399996657664  283.008  283.06   283.04   0.017388        15
1456790400130875392  283.054  283.139  283.094  0.0276213       16
1456790400265093120  282.95   283.069  283.027  0.0342351       16
1456790400399310848  282.936  283.04   283.01   0.0272138       17
1456790400533528576  283.034  283.112  283.061  0.0189327       16
1456790400667746304  283.089  283.161  283.127  0.0202038       16
1456790400801964032  283.044  283.159  283.111  0.0338108       16

This can be really useful if you don't particularly care where the start/end of your windows are, or if you just want a high level statistical view.

As an example, the code below uses a very large pointwidth so that the entire dataset is summarized in just a few windows, making it easy to view statistics of all of the data. With a pointwidth of 52, each window is \(2^{52}\) nanoseconds wide, or roughly 52 days. We will use the currently_as_ns function to get the current datetime as nanoseconds from epoch to use as our end time.

[13]:
from btrdb.utils.timez import currently_as_ns

pointwidth = 52
points = [point for point, _ in stream.aligned_windows(start, currently_as_ns(), pointwidth)]
print(stat_table(points))
               time      min      max     mean    stddev      count
-------------------  -------  -------  -------  --------  ---------
1454662679640670208  240.445  290.53   285.478   1.41733  285105792
1459166279268040704  271.247  293.465  284.494   1.62677  536368941
1463669878895411200  266.341  289.085  283.935   1.7701   127976711
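If you want a single set of figures for the whole dataset, the per-window aggregates can be combined locally. The sketch below merges the three rows above using a count-weighted mean. The Agg tuple and the merge function are hypothetical helpers, and because the inputs are the rounded numbers shown in the table, the result is only approximate. Combining the standard deviations is left out to keep the example short.

```python
from collections import namedtuple

# Minimal stand-in holding the StatPoint fields needed for merging.
Agg = namedtuple("Agg", ["min", "mean", "max", "count"])


def merge(aggregates):
    """Combine window aggregates into one summary using a count-weighted mean."""
    total = sum(a.count for a in aggregates)
    if total == 0:
        raise ValueError("cannot merge aggregates that contain no points")
    weighted_mean = sum(a.mean * a.count for a in aggregates) / total
    return Agg(
        min(a.min for a in aggregates),
        weighted_mean,
        max(a.max for a in aggregates),
        total,
    )


# Values copied (already rounded) from the table above.
windows = [
    Agg(240.445, 285.478, 290.53, 285105792),
    Agg(271.247, 284.494, 293.465, 536368941),
    Agg(266.341, 283.935, 289.085, 127976711),
]

overall = merge(windows)
print(overall)
```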