0 - PredictiveGrid in Python#

A Quick Start Notebook#

This notebook is a quick start to working with data using PredictiveGrid’s Python API. It illustrates the basic, key functionality of the API to give you a running start working with data in the platform.

The very first step is to import all the required packages. Below is a list of basic imports that you can copy and paste into your own notebooks to get going. The external python libraries (matplotlib, numpy, etc.) have wonderful, extensive documentation that you should look up if you want to explore all their functionalities or just unstick yourself. The %matplotlib inline command ensures that plots will be rendered in the notebook itself, rather than in a pop-up window.

[1]:
# PredictiveGrid imports
import btrdb # Platform Python bindings
from btrdb.utils import timez # helpful sub-package for handling time
from btrdb.stream import StreamSet # subpackage with light wrapper for working with multiple streams

# External Python libraries
import numpy as np # scientific computing package
import matplotlib.pyplot as plt # plotting package
import pandas as pd # data analysis library
from tabulate import tabulate # creating & printing neat tables
from datetime import datetime, timedelta # for working with time

%matplotlib inline

Next, we must connect to the database. The connect function optionally accepts endpoint and apikey string arguments, which can be passed like:

btrdb.connect(endpoint_string, apikey=key_string)

When running a notebook on JupyterHub, these do not need to be passed.

[2]:
conn = btrdb.connect()
conn.info()
[2]:
{'majorVersion': 5, 'build': '5.12.5', 'proxy': {'proxyEndpoints': []}}

Find Collections#

The streams in the database are organized into collections, which can be thought of as heirarchical paths such as CALIFORNIA/SanFrancisco/91405/Sensor1 but are internally just strings. A single collection, defined by a path-like name, can contain any number of individual streams. It is best to name collections and organize streams into them in some logically consistent and descriptive way to facilitate searching (think about creating a file heirarchy to organize data files and mirror this in the collection name).

Let us query all and print out some of the collections in this cluster. We will print out the individual collection names to reflect the implicit heirarchical data organization they encode.

[41]:
collections = conn.list_collections()
print('Found', len(collections), 'collections')

collections.sort()

# We limit the number of collections so we don't print too many
for i, c in enumerate(collections[0:3]):
    levels = c.split('/')
    for j, l in enumerate(levels):
        if i == 0:
            pass
        elif l in collections[i-1]:
            continue
        print(j*' ','->', l)
Found 486 collections
 -> Health
  -> EKG
   -> patient001
 -> POW
  -> EPFL
  -> GridSweep

We can also limit the search to collections with a certain prefix (you may want to change the prefix below to obtain interesting results on your particular allocation).

[4]:
prefix = 'sunshine/PMU1'
collections = conn.list_collections(prefix)

print('Found', len(collections), 'collections')
print(collections)
Found 1 collections
['sunshine/PMU1']

Find Streams#

Now that we have a collection, the next step is to find individual streams (though we could have done this directly, if we know what we want). As you may recall, each stream represents a single time series within the database, which contains the atomic, (time, value) pairs, and has some associated metadata.

To get the streams, we will use the streams_in_collection method. This takes an optional collection prefix argument. If we pass in no argument, we will get all streams in the database. Let us work with the collection we found previously. Note that the method returns an iterator, which we convert to a list below.

[5]:
collection = collections[0]
streams = list(conn.streams_in_collection(collection))
print('Found', len(streams), 'streams in collection', collection)
Found 13 streams in collection sunshine/PMU1

Recall that streams have metadata, which can help us understand what the stream is. The convenience function below will obtain and pretilly print a set of streams and their metadata.

[6]:
def describe_streams(streams):
    table = [["Collection", "Name", "Units", "Version", "UUID"]]
    for stream in streams:
        # Get all the tags for this stream.
        tags = stream.tags()
        table.append([
            stream.collection, stream.name, tags["unit"], stream.version(), stream.uuid
        ])
    return tabulate(table, headers="firstrow")

print(describe_streams(streams))
Collection     Name    Units      Version  UUID
-------------  ------  -------  ---------  ------------------------------------
sunshine/PMU1  LSTATE  mask        243640  6ffb2e7e-273c-4963-9143-b416923980b0
sunshine/PMU1  C1ANG   deg         240607  d625793b-721f-46e2-8b8c-18f882366eeb
sunshine/PMU1  C3MAG   amps        240481  fb61e4d1-3e17-48ee-bdf3-43c54b03d7c8
sunshine/PMU1  C2MAG   amps        240718  d765f128-4c00-4226-bacf-0de8ebb090b5
sunshine/PMU1  C1MAG   amps        240380  1187af71-2d54-49d4-9027-bae5d23c4bda
sunshine/PMU1  C3ANG   deg         240781  0be8a8f4-3b45-4fe3-b77c-1cbdadb92039
sunshine/PMU1  L3ANG   deg         240862  e4efd9f6-9932-49b6-9799-90815507aed0
sunshine/PMU1  L2ANG   deg         240662  886203ca-d3e8-4fca-90cc-c88dfd0283d4
sunshine/PMU1  L3MAG   volts       229263  b2936212-253e-488a-87f6-a9927042031f
sunshine/PMU1  L1ANG   deg         229265  51840b07-297a-42e5-a73a-290c0a47bddb
sunshine/PMU1  C2ANG   deg         229263  97de3802-d38d-403c-96af-d23b874b5e95
sunshine/PMU1  L1MAG   volts       229266  35bdb8dc-bf18-4523-85ca-8ebe384bd9b5
sunshine/PMU1  L2MAG   volts       229264  d4cfa9a6-e11a-4370-9eda-16e80773ce8c

We can also search for streams in terms of their metadata tags, by passing an optional tag arguement to the same streams_in_collection method. Let us get all the streams with units of volts.

[7]:
streams = conn.streams_in_collection(collection, tags={"unit": "volts"})
print(describe_streams(streams))
Collection     Name    Units      Version  UUID
-------------  ------  -------  ---------  ------------------------------------
sunshine/PMU1  L3MAG   volts       229263  b2936212-253e-488a-87f6-a9927042031f
sunshine/PMU1  L1MAG   volts       229266  35bdb8dc-bf18-4523-85ca-8ebe384bd9b5
sunshine/PMU1  L2MAG   volts       229264  d4cfa9a6-e11a-4370-9eda-16e80773ce8c

The API also supports SQL queries to find streams and metadata, which you can learn more about in this notebook.

Work with a stream#

Let us work with the first stream in the above list of streams. We will look at its metadata and learn how to query time series data from the stream.

[8]:
stream = streams[0]
print(stream)
<Stream collection=sunshine/PMU1 name=L3MAG>

Let’s checkout the stream’s metadata. Note that stream.tags() returns a dictionary of tags, while stream.annotations returns a tuple, where the first element is the annotation version and the second is the dictionary of annotations.

[9]:
print('COLLECTION:', stream.collection)

print('\nTAGS:\n', stream.tags().keys())
print('\nANNOTATIONS:\n', stream.annotations()[0].keys(),)
COLLECTION: sunshine/PMU1

TAGS:
 dict_keys(['unit', 'ingress', 'distiller', 'name'])

ANNOTATIONS:
 dict_keys(['location', 'impedance'])

Data Types from Time Series Queries#

Before we move on, let’s refresh some concepts on BTrDB data types.

There are two data types (represented by Python objects) that we may obtain when querying time series data from a stream. These are:

  1. RawPoints — RawPoints represent the atomic data type in a time series: a paired timestamp and value that is the raw data of the series. Recall that these are stored at the absolute bottom of the BTrDB tree.

  2. StatPoints — These are stored in the internal nodes of the BTrDB tree and contain precomputed statistical aggregates of the raw data that lies beneath them.

Querying RawPoints will be slower than querying StatPoints. Between StatPoints, it is faster to query those that reside higher up in the tree, corresponding to longer durations of raw data and correspondingly larger numbers of RawPoints.

Data Time Range#

There are a couple useful methods for understanding the time range covered by a stream. We can do this by getting the earliest and latest RawPoints in the stream, as shown below.

A few things: queries for time series Points always return the Point(s) requested along with a *version number*. For most introductory users, the version is not useful, though you can read more about it in our docs. Below, we discard the version and just work with the returned RawPoint.

Time stamps in BTrDB are integers which represent nanoseconds since the Unix epoch (January 1, 1970). These can be easily converted to more readable time stamps using utility functions in the timez package (read the docs here).

[10]:
# Get the earliest & latest RawPoints in the stream
earliest, version = stream.earliest()
latest, version = stream.latest()

# Get the time from the RawPoints
earliest_time = earliest.time
latest_time = latest.time

print('Stream starts at: ', timez.ns_to_datetime(earliest_time))
print('Stream ends at: ', timez.ns_to_datetime(latest_time))
Stream starts at:  2015-10-01 16:08:24.008333+00:00
Stream ends at:  2017-04-15 01:41:35.999999+00:00

Querying Data#

There are three different methods to query data from a stream. These are documented & described below (in order of increasing query speed). 1. stream.values(start, end): This call will return all RawPoints in the stream between the start and end time. 2. stream.windows(start, end, width, depth=0): This call will return StatPoints spanning (and summarizing) windows of length width between the start and end times. The widtharguement is an integer specifying the window length in nanoseconds. The optional integer depth parameter trades off the precision of the window width for efficiency. It is specified as a power of \(2\) in nanoseconds (so depth=30 would make the windows accurate to roughly one second). 3. stream.aligned_windows(start, end, pointwidth): Like stream.windows, this call will return StatPoints corresponding to windows of a given width between the start and end times. However, this time, the width is specified as an integer power of \(2\) in nanoseconds in the pointwidth argument.

To reiterate, values returns RawPoints, while windows and aligned_windows return StatPoints.

All these queries return a list of tuples, where each tuples contains a Point (RawPoint or StatPoint) and a version number. Further, as mentioned, the time stamps in the Points are in the inconvenient nanoseconds since the Unix epoch format.

Overall, we need a function to convert this returned data into a more conducive format. We choose to convert the RawPoints to a pandas Series, and the StatPoints to a pandas DataFrame, which are easy to work with. The functions to do this are defined below (these can be easily modified to produce a numpy.array if desired).

[11]:
def rawpoints_to_series(rawpts_response):
    times, values = [], []
    for rawpoint, version in rawpts_response:
        times.append(timez.ns_to_datetime(rawpoint.time))
        values.append(rawpoint.value)
    return pd.Series(index=times, data=values)

def statpoints_to_dataframe(statpoints):
    attributes = ['min','mean','max','stddev','count','time',]

    df = pd.DataFrame([[getattr(p, attr) for attr in attributes] for p, version in statpoints],
                     columns=attributes)

    df['datetime'] = [timez.ns_to_datetime(t) for t in df['time']]
    return df.set_index('datetime')

stream.values#

Let us choose a start and end time for a values query. CAUTION: Most streams contain very high frequency data, so don’t use too long a duration when querying raw data.

[12]:
start = datetime(2015, 10, 10)
end = start + timedelta(seconds=5)
[13]:
# Query the data
rawpts_response = stream.values(start, end)
# Convert it to a pandas Series
data = rawpoints_to_series(rawpts_response)

It is very easy to plot the data in a pandas series. Just call plot()! Here we pass a few aesthetic arguments to the call.

[14]:
data.plot(linestyle='--', marker='o', figsize=(15, 5));
../_images/tutorials_quickstart_29_0.png

stream.windows#

Let us choose a longer time range and a window length for a windows query, which will return StatPoints. Remeber that the window width must be specified as an integer number of nanoseconds.

[15]:
start = datetime(2015, 10, 10)
end = start + timedelta(hours=24)
width = timedelta(minutes=1)
width_ns = int(1e9*width.seconds)
[16]:
# Query the data
statpts_response = stream.windows(start, end, width_ns)
# Convert it to a pandas Series
data = statpoints_to_dataframe(statpts_response)

You can think of a pandas DataFrame as an excel file. In this case, the rows are timestamps (coressponding to the windows) and the columns contain the different aggregates.

We can visualize the dataframe structure and entries with the head() call.

[17]:
data.head()
[17]:
min mean max stddev count time
datetime
2015-10-10 00:00:00+00:00 7059.082520 7070.945893 7077.476562 2.470590 7200 1444435200000000000
2015-10-10 00:01:00+00:00 7043.672852 7058.894961 7069.110840 3.957179 7200 1444435260000000000
2015-10-10 00:02:00+00:00 7034.760254 7053.560129 7060.030762 4.170064 7200 1444435320000000000
2015-10-10 00:03:00+00:00 7041.854004 7053.804716 7058.428223 2.277033 7200 1444435380000000000
2015-10-10 00:04:00+00:00 7040.025879 7055.781075 7059.692383 1.901117 7200 1444435440000000000

Let us plot the resulting data. We can call .plot() on the DataFrame, as we did earlier, but instead, we will manually plot only some of the columns, for easier and greater control of the visualization.

[18]:
plt.figure(figsize=(15, 5))
# Plot the mean of the windows
plt.plot(data.index, data['mean'], linewidth=2, color='red', linestyle='--', marker='o')
# Show the range of the minimum and maximum over the windows
plt.fill_between(data.index, data['min'], data['max'], color='lightgrey')
[18]:
<matplotlib.collections.PolyCollection at 0x7f981c0be080>
../_images/tutorials_quickstart_36_1.png

stream.aligned_windows#

Let us use the same start and end times in an aligned_windows query. Now, however, the window width must be an integer power of \(2\) nanoseconds, termed a pointwidth. The following function will convert an arbitrary timedelta into the nearest pointwidth, and return the error (in seconds) between the input and returned widths.

[19]:
def to_nearest_pointwidth(dt):
    dt_ns = 1e9*dt.seconds
    prevpw = int(np.floor(np.log2(dt_ns)))
    nextpw = int(np.ceil(np.log2(dt_ns)))

    preverr = (dt_ns-(2**prevpw)) / 1e9
    nexterr = ((2**nextpw)-dt_ns) / 1e9

    if preverr <= nexterr:
        return prevpw, preverr
    else:
        return nextpw, nexterr

print(to_nearest_pointwidth(timedelta(seconds=1)))
(30, 0.073741824)
[20]:
start = datetime(2015, 10, 10)
end = start + timedelta(hours=24)
width = timedelta(minutes=1)
pw, err = to_nearest_pointwidth(width)

print('pointwidth', pw, 'with error', err, 'seconds')
pointwidth 36 with error 8.719476736 seconds
[21]:
# Query the data
statpts_response = stream.aligned_windows(start, end, pw)
# Convert it to a pandas Series
data = statpoints_to_dataframe(statpts_response)
[22]:
plt.figure(figsize=(15, 5))
# Plot the mean of the windows
plt.plot(data.index, data['mean'], linewidth=2, color='red', linestyle='--', marker='o')
# Show the range of the minimum and maximum over the windows
plt.fill_between(data.index, data['min'], data['max'], color='lightgrey')
[22]:
<matplotlib.collections.PolyCollection at 0x7f981c144f60>
../_images/tutorials_quickstart_41_1.png

What if we specify a longer window?

[23]:
start = datetime(2015, 10, 10)
end = start + timedelta(hours=24)
width = timedelta(minutes=10)
pw, err = to_nearest_pointwidth(width)

print('pointwidth', pw, 'with error', err, 'seconds')

# Query the data
statpts_response = stream.aligned_windows(start, end, pw)
# Convert it to a pandas Series
data = statpoints_to_dataframe(statpts_response)

plt.figure(figsize=(15, 5))
# Plot the mean of the windows
plt.plot(data.index, data['mean'], linewidth=2, color='red', linestyle='--', marker='o')
# Show the range of the minimum and maximum over the windows
plt.fill_between(data.index, data['min'], data['max'], color='lightgrey');
pointwidth 39 with error 50.244186112 seconds
../_images/tutorials_quickstart_43_1.png

Work with multiple streams using streamsets#

We often want to query data from a bunch of streams. Of course, we could do this manually by iterating through the individual streams and using the methods described. However, we also have the option of using a `StreamSet <https://btrdb.readthedocs.io/en/latest/working/streamsets.html>`__, which is a light wrapper around a list of streams.

The following cells show a few examples for working with StreamSets. For more, see this notebook. Notice that when querying StreamSets, version numbers are no longer returned, as they were for queries on single streams.

To begin, we use the same collection as before, and get a list of streams, which we then convert to a StreamSet.

[24]:
streams = conn.streams_in_collection(collection, tags={"unit": "volts"})
streamset = StreamSet(streams)

We can now do some queries on all streams at once. For example…

[25]:
earliests = streamset.earliest() # Returns a list of RawPoints, one for each stream
latests = streamset.latest() # Returns a list of RawPoints, one for each stream

for i in range(len(streamset)):
    # Get the times for this stream
    earliest_time = earliests[i].time
    latest_time = latests[i].time
    print('Stream', streamset[i].name)
    print('starts at: ', timez.ns_to_datetime(earliest_time))
    print('ends at: ', timez.ns_to_datetime(latest_time))
    print('--------------------')
Stream L3MAG
starts at:  2015-10-01 16:08:24.008333+00:00
ends at:  2017-04-15 01:41:35.999999+00:00
--------------------
Stream L1MAG
starts at:  2015-10-01 16:08:24.008333+00:00
ends at:  2017-04-15 01:37:11.333333+00:00
--------------------
Stream L2MAG
starts at:  2015-10-01 16:08:24.008333+00:00
ends at:  2017-04-15 01:41:35.999999+00:00
--------------------

Let us make a values (ie raw data) query on this StreamSet. Notice the difference in the query form: we use a function chaining approach to first set the start and end times. Then, the values are requested.

[26]:
start = datetime(2015, 10, 10)
end = start + timedelta(seconds=5)
data = streamset.filter(start, end).values()

The result data, is a list of lists, each containing the RawPoints for one of the streams in the StreamSet.

To make the data more workable, we can instead query the result as a pandas DataFrame, as shown below:

[27]:
data = streamset.filter(start, end).to_dataframe()
[28]:
data.head()
[28]:
sunshine/PMU1/L3MAG sunshine/PMU1/L1MAG sunshine/PMU1/L2MAG
time
1444435200008333000 7068.010742 7161.215332 7076.146484
1444435200016666000 7068.098633 7161.130371 7076.227539
1444435200024999000 7068.275879 7161.411133 7076.442383
1444435200033333000 7068.182617 7161.659668 7076.458984
1444435200041666000 7067.982422 7161.583008 7076.268066

The timestamps still need to be converted, which we do below:

[31]:
def to_human_time(time_ns):
    time_human = []
    for time in time_ns:
        time_human.append(timez.ns_to_datetime(time))
    return time_human

data.index = to_human_time(data.index)
[32]:
data.head()
[32]:
sunshine/PMU1/L3MAG sunshine/PMU1/L1MAG sunshine/PMU1/L2MAG
2015-10-10 00:00:00.008333+00:00 7068.010742 7161.215332 7076.146484
2015-10-10 00:00:00.016666+00:00 7068.098633 7161.130371 7076.227539
2015-10-10 00:00:00.024999+00:00 7068.275879 7161.411133 7076.442383
2015-10-10 00:00:00.033333+00:00 7068.182617 7161.659668 7076.458984
2015-10-10 00:00:00.041666+00:00 7067.982422 7161.583008 7076.268066

And we can plot!

[36]:
data.plot(figsize=(15, 5), linewidth=3)
[36]:
<AxesSubplot:>
../_images/tutorials_quickstart_57_1.png

And that is it! You’ve had a basic introduction to working with PredictiveGrid in Python. You are ready to start working with your data and conducting some interesting analyses.

As you go, you will want to learn about and use more advanced platform capabilities. Make sure to look at the notebooks here containing both tutorials and analytics demos. You can also read the blog, which shares user stories, capabilities, etc. Also, read the docs!

THE END#

[ ]: