8 - Windows, aligned windows, and values

The term “big data” refers to data that is so large, fast or complex that it’s difficult or impossible to process using traditional methods.

This tutorial shows how to work with big time series datasets in PredictiveGrid using queries that do not require pulling every raw point.

For high-resolution streams, a simple query such as “Give me all of the data in this stream” can return enough data to overload most computing environments. This tutorial describes several ways of querying data that keep very large volumes manageable.

We’ll describe three methods for querying data in PredictiveGrid. In practice, none of these is better or worse than the others; there is a time and a place for each, and this tutorial explores when each is appropriate.

Functions used

  • stream.values()

  • stream.windows()

  • stream.aligned_windows()

[1]:
import btrdb
import pandas as pd
import numpy as np
from btrdb.utils.timez import *
from datetime import datetime, timedelta

from matplotlib import pyplot as plt
[2]:
db = btrdb.connect()

Querying data

To illustrate what we mean by big data, let’s investigate the very simple task of querying data from a single stream.

If you ask for all of the data in a stream, what will that yield?

[3]:
streams = db.streams_in_collection('sunshine/PMU1', tags={'name': 'L1MAG'})
stream = streams[0]
print('collection:\t', stream.collection)
print('stream name:\t', stream.name)

# How many points is that? Each raw point is a 64-bit timestamp plus a 64-bit value (16 bytes).
n_points = stream.count()
print('size:\t\t', round(n_points/1e9,2), 'billion points')
print('volume:\t\t', round(n_points*16/1e9,2), 'gigabytes')
collection:      sunshine/PMU1
stream name:     L1MAG
size:            5.14 billion points
volume:          82.29 gigabytes

That’s a lot of data!

Querying that much data at once would likely overload your computing environment, and even if it did not, returning it would take a long time.

*Is there a better way?*

Windows queries

Windows queries provide statistical aggregates, or “summary statistics”, of the raw data points in a given time interval. A windows query returns a time series of StatPoint objects, each paired with a stream version number (hence the p, _ unpacking in statpoints_to_dataframe below), which can be used to explore summary statistics of raw values over time.

New to StatPoints? Start with the tutorial below.

https://github.com/PingThingsIO/ni4ai-notebooks/blob/main/tutorials/5%20-%20Working%20with%20StatPoints.ipynb

[4]:
t0 = currently_as_ns()

start, _ = stream.earliest()
start = ns_to_datetime(start.time)

end, _ = stream.latest()
end = ns_to_datetime(end.time)

window = ns_delta(days=5)

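# Begin at the first midnight (UTC) after the earliest point; timedelta avoids an invalid day at month end
start_time = (start + timedelta(days=1)).replace(hour=0, minute=0, second=0, microsecond=0)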
statpoints = stream.windows(start_time, end, window)
print('Runtime: %.2f seconds'%((currently_as_ns()-t0)/1e9))
Runtime: 0.24 seconds
[5]:
def statpoints_to_dataframe(statpoints, datetime_index=True):
    attributes = ['min','mean','max','stddev','count','time',]

    df = pd.DataFrame([[getattr(p, attr) for attr in attributes] for p, _ in statpoints],
                     columns=attributes)

    if datetime_index:
        df['datetime'] = [ns_to_datetime(t) for t in df['time']]
        return df.set_index('datetime')
    else:
        return df

df = statpoints_to_dataframe(statpoints)
df.head()
[5]:
min mean max stddev count time
datetime
2015-10-02 00:00:00+00:00 116.784149 119.321719 121.366257 0.566179 51840000 1443744000000000000
2015-10-07 00:00:00+00:00 105.813171 4269.307051 7299.838379 3462.597022 51721157 1444176000000000000
2015-10-12 00:00:00+00:00 5.467874 7154.005115 7355.261230 46.374464 51816197 1444608000000000000
2015-10-17 00:00:00+00:00 6681.592773 7154.443182 7249.294922 35.945926 51840320 1445040000000000000
2015-10-22 00:00:00+00:00 6954.883789 7154.556317 7270.524902 38.951711 51840000 1445472000000000000
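For long results, building the datetime index one ns_to_datetime call at a time can be slow. A vectorized variant is sketched below using pandas’ pd.to_datetime(..., unit="ns", utc=True). To keep it runnable without a database connection, it uses a hypothetical FakeStatPoint namedtuple as a stand-in for BTrDB’s StatPoint; it relies only on the attribute names that statpoints_to_dataframe already reads and on the (point, version) pairing the query returns.

```python
from collections import namedtuple

import pandas as pd

# Hypothetical stand-in for btrdb's StatPoint, used only to make this sketch runnable
FakeStatPoint = namedtuple("FakeStatPoint", ["time", "min", "mean", "max", "count", "stddev"])

def statpoints_to_dataframe_fast(statpoints):
    """Vectorized variant: statpoints is an iterable of (StatPoint, version) pairs."""
    attributes = ["min", "mean", "max", "stddev", "count", "time"]
    df = pd.DataFrame(
        [[getattr(p, a) for a in attributes] for p, _ in statpoints],
        columns=attributes,
    )
    # Convert the whole nanosecond column at once instead of row by row
    df.index = pd.to_datetime(df["time"].to_numpy(), unit="ns", utc=True)
    df.index.name = "datetime"
    return df

# First 5-day StatPoint from the output above
pts = [(FakeStatPoint(time=1443744000000000000, min=116.784149, mean=119.321719,
                      max=121.366257, count=51840000, stddev=0.566179), 1)]
print(statpoints_to_dataframe_fast(pts))
```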

What just happened?

The query stream.windows() scanned through roughly 18 months of data and returned a tuple of StatPoint objects in about a quarter of a second.

Those 18 months are partitioned into 5-day time increments (as specified by the window parameter). Each StatPoint reports summary statistics of values observed during that time frame.

Pulling all 5+ billion raw points for the same interval would have taken much longer and, at roughly 82 GB, would have returned more data than a typical environment can hold in RAM. StatPoint objects make it feasible to scan long time intervals for trends or event signatures that warrant more detailed, granular analysis.
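The arithmetic behind that claim can be sketched with the figures printed earlier: 5.14 billion points, 16 bytes per raw point as in the volume estimate, and about 561.39 days of data split into 5-day windows. Treating each StatPoint as six 8-byte numbers is an assumption used only for the size comparison, not a statement about BTrDB’s wire format.

```python
import math

raw_points = 5.14e9          # from stream.count() above (rounded)
bytes_per_raw_point = 16     # 64-bit timestamp + 64-bit value
duration_days = 561.39       # "Data Duration" printed above
window_days = 5

n_windows = math.ceil(duration_days / window_days)
# Assumption: a StatPoint is roughly six 8-byte numbers (time, min, mean, max, stddev, count)
bytes_per_statpoint = 6 * 8

raw_gb = raw_points * bytes_per_raw_point / 1e9   # slightly below 82.29 because 5.14 is rounded
stat_kb = n_windows * bytes_per_statpoint / 1e3
print(f"{n_windows} windows: {raw_gb:.2f} GB of raw points vs {stat_kb:.1f} kB of StatPoints")
```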

What happens if we zoom in?

[6]:
t0 = currently_as_ns()

window = ns_delta(days=1)
statpoints = stream.windows(start, end, window)
print('Data Duration:',(end-start))
print('Aggregation window:', timedelta(seconds=int(window/1e9)))
print('Runtime: %.2f seconds'%((currently_as_ns()-t0)/1e9))

df = statpoints_to_dataframe(statpoints)
df.head()
Data Duration: 561 days, 9:28:47.325000
Aggregation window: 1 day, 0:00:00
Runtime: 1.39 seconds
[6]:
min mean max stddev count time
datetime
2015-10-01 16:08:24.008333+00:00 117.231087 119.139902 121.366257 0.759523 10287287 1443715704008333056
2015-10-02 16:08:24.008333+00:00 117.449776 119.251162 121.302170 0.569169 10368000 1443802104008333056
2015-10-03 16:08:24.008333+00:00 116.784149 119.547277 120.763474 0.502983 10368000 1443888504008333056
2015-10-04 16:08:24.008333+00:00 117.354645 119.350104 120.760208 0.495989 10368000 1443974904008333056
2015-10-05 16:08:24.008333+00:00 117.235992 119.055069 120.682983 0.525771 10368000 1444061304008333056

Aligned windows

Aligned windows queries return results that look very much like windows queries. The difference is that window boundaries are aligned with time windows stored inherently in the database: each window is 2^pointwidth nanoseconds wide and starts at a multiple of that width. Where a windows query may need to recompute statistical aggregates over each requested window, an aligned_windows query can leverage precomputed values.
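Why can precomputed aggregates be reused? The statistics of two adjacent windows can be merged into the statistics of their union without revisiting any raw points. The sketch below shows that merge for min, max, count, mean, and population standard deviation, using the standard pairwise formula for combining variances. It illustrates the principle only; the Stats, from_values, and merge names are hypothetical, and it is an assumption that BTrDB combines its internal aggregates in exactly this form.

```python
import math
from dataclasses import dataclass

@dataclass
class Stats:
    count: int
    mean: float
    min: float
    max: float
    stddev: float  # population standard deviation

def from_values(values):
    """Compute Stats directly from raw values."""
    n = len(values)
    mean = sum(values) / n
    var = sum((v - mean) ** 2 for v in values) / n
    return Stats(n, mean, min(values), max(values), math.sqrt(var))

def merge(a: Stats, b: Stats) -> Stats:
    """Combine the statistics of two disjoint windows into one, without raw data."""
    n = a.count + b.count
    delta = b.mean - a.mean
    mean = a.mean + delta * b.count / n
    # Sum of squared deviations of each part, plus a cross term for the shift in means
    m2 = (a.stddev ** 2) * a.count + (b.stddev ** 2) * b.count + delta ** 2 * a.count * b.count / n
    return Stats(n, mean, min(a.min, b.min), max(a.max, b.max), math.sqrt(m2 / n))

left, right = [1.0, 2.0, 3.0, 4.0], [10.0, 20.0]
print(merge(from_values(left), from_values(right)))
print(from_values(left + right))  # same statistics, computed from the raw values
```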

Let’s look at the difference in performance.

[7]:
window = ns_delta(days=1)
pw = int(np.log2(window))  # pointwidth: windows are 2**pw ns wide, so 1 day rounds down to 2**46 ns

t0 = currently_as_ns()
statpoints = stream.aligned_windows(start, end, pointwidth=pw)
print('Data Duration:',(end-start))
print('Aggregation window:', timedelta(seconds=int(window/1e9)))
print('Runtime: %.2f seconds'%((currently_as_ns()-t0)/1e9))

df = statpoints_to_dataframe(statpoints)
df.head()
Data Duration: 561 days, 9:28:47.325000
Aggregation window: 1 day, 0:00:00
Runtime: 0.01 seconds
[7]:
min mean max stddev count time
datetime
2015-10-01 07:39:15.548955+00:00 117.231087 118.795732 121.259659 0.750870 4697722 1443685155548954624
2015-10-02 03:12:04.293132+00:00 117.318283 119.183483 121.366257 0.681662 8444249 1443755524293132288
2015-10-02 22:44:53.037310+00:00 117.655342 119.438784 121.302170 0.441992 8444249 1443825893037309952
2015-10-03 18:17:41.781488+00:00 116.784149 119.591669 120.763474 0.531194 8444250 1443896261781487616
2015-10-04 13:50:30.525665+00:00 117.887520 119.463267 120.760208 0.432865 8444249 1443966630525665280

That’s much faster! One thing to note: the window width in an aligned_windows query is always a power of two nanoseconds, so the requested width is rounded down to the nearest power of two (here, 1 day becomes 2^46 ns, about 19.55 hours). Window boundaries also fall on multiples of that width, so your start time, end time, and window may all shift somewhat from what you requested.
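The widths printed in this notebook (1 day became 19.55 hours, 30 minutes became 18.33 minutes) match taking the floor of the base-2 logarithm of the requested width in nanoseconds. The sketch below reproduces that arithmetic with exact integer math; floor_pointwidth and ceil_pointwidth are hypothetical helpers, not part of the btrdb API. Using int.bit_length() avoids the floating-point edge cases that int(np.log2(x)) can hit when x is an exact power of two.

```python
def floor_pointwidth(window_ns: int) -> int:
    """Largest p with 2**p <= window_ns, i.e. floor(log2(window_ns))."""
    if window_ns <= 0:
        raise ValueError("window must be positive")
    return window_ns.bit_length() - 1

def ceil_pointwidth(window_ns: int) -> int:
    """Smallest p with 2**p >= window_ns, for windows no narrower than requested."""
    p = floor_pointwidth(window_ns)
    return p if 2**p == window_ns else p + 1

MINUTE_NS = 60 * 10**9
requests = [("1 day", 1440 * MINUTE_NS), ("6 hours", 360 * MINUTE_NS),
            ("30 minutes", 30 * MINUTE_NS), ("5 minutes", 5 * MINUTE_NS)]
for label, ns in requests:
    p = floor_pointwidth(ns)
    print(f"{label:>10}: pointwidth {p}, actual width {2**p / 1e9:,.0f} s")
```

If you would rather never get windows narrower than requested, ceil_pointwidth gives the next power of two up instead.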

[8]:
print('WINDOW DURATION')
print('\tAs specified:', timedelta(seconds=int(window/1e9)))
print('\tAs returned:', btrdb.utils.general.pointwidth(pw))


print('\n\nSTART TIME')
print('\tAs specified:', start)
print('\tAs returned:', df.index.min())


print('\n\nEND TIME')
print('\tAs specified:', end)
print('\tAs returned:', df.index.max() + pd.Timedelta(2**int(pw), unit='ns'))  # last window start + actual window width
WINDOW DURATION
        As specified: 1 day, 0:00:00
        As returned: 19.55 hours


START TIME
        As specified: 2015-10-01 16:08:24.008333+00:00
        As returned: 2015-10-01 07:39:15.548955+00:00


END TIME
        As specified: 2017-04-15 01:37:11.333333+00:00
        As returned: 2017-04-14 11:27:00.287364664+00:00
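The shifted start time is consistent with window boundaries falling on exact multiples of 2^pointwidth nanoseconds since the Unix epoch. The check below uses only numbers printed in this notebook: the requested start (the first windows timestamp, 1443715704008333056) rounded down to a multiple of 2^46 reproduces the first aligned_windows timestamp, and adding one width gives the second. The align_down name is a hypothetical helper.

```python
def align_down(t_ns: int, pointwidth: int) -> int:
    """Round a nanosecond timestamp down to a multiple of 2**pointwidth."""
    width = 1 << pointwidth
    return (t_ns // width) * width

requested_start = 1443715704008333056   # earliest point, as printed by the windows query above
pw = 46                                 # floor(log2(1 day in ns))

first = align_down(requested_start, pw)
second = first + (1 << pw)
print(first, second)  # compare with the "time" column of the aligned_windows output
```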

Getting more granular with aligned_windows

Because aligned_windows queries are so much faster, they let you query data at finer resolutions than is practical with windows.

[9]:
window = ns_delta(hours=6)
pw = int(np.log2(window))  # round down to a power-of-two width in ns

t0 = currently_as_ns()
statpoints = stream.aligned_windows(start, end, pointwidth=pw)

print('Data Duration:',(end-start))
print('Aggregation window:', btrdb.utils.general.pointwidth(pw))
print('Runtime: %.2f seconds'%((currently_as_ns()-t0)/1e9))
Data Duration: 561 days, 9:28:47.325000
Aggregation window: 4.89 hours
Runtime: 0.02 seconds
[10]:
window = ns_delta(minutes=30)
pw = int(np.log2(window))  # round down to a power-of-two width in ns

t0 = currently_as_ns()
statpoints = stream.aligned_windows(start, end, pointwidth=pw)

print('Data Duration:',(end-start))
print('Aggregation window:', btrdb.utils.general.pointwidth(pw))
print('Runtime: %.2f seconds'%((currently_as_ns()-t0)/1e9))
Data Duration: 561 days, 9:28:47.325000
Aggregation window: 18.33 minutes
Runtime: 2.13 seconds
[11]:
window = ns_delta(minutes=5)
pw = int(np.log2(window))  # round down to a power-of-two width in ns

t0 = currently_as_ns()
statpoints = stream.aligned_windows(start, end, pointwidth=pw)

print('Data Duration:',(end-start))
print('Aggregation window:', btrdb.utils.general.pointwidth(pw))
print('Runtime: %.2f seconds'%((currently_as_ns()-t0)/1e9))
Data Duration: 561 days, 9:28:47.325000
Aggregation window: 4.58 minutes
Runtime: 1.51 seconds

These last two queries took noticeably longer (1.5 to 2 seconds, versus a few hundredths of a second at coarser widths). Querying 1.5 years of data at a resolution of a few minutes is starting to push the limits of what our environment (or patience!) can handle.

It is possible to speed that up, however, by using a larger computing environment.
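One factor in that cost is simply how many StatPoints come back. As a rough estimate, the number of aligned windows is the queried duration divided by 2^pointwidth nanoseconds; the sketch applies this to the 561-day span used throughout. The estimated_windows helper is hypothetical, and the estimate ignores data gaps and edge effects from aligning the start and end.

```python
import math

def estimated_windows(duration_ns: int, pointwidth: int) -> int:
    """Rough count of aligned windows needed to span duration_ns."""
    return math.ceil(duration_ns / 2**pointwidth)

# "Data Duration: 561 days, 9:28:47.325000" from the outputs above
duration_ns = (561 * 86_400 + 9 * 3_600 + 28 * 60 + 47) * 10**9 + 325 * 10**6

for pw in (46, 44, 40, 38):
    print(f"pointwidth {pw}: ~{estimated_windows(duration_ns, pw):,} windows")
```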

When to use values

Many analytics can be performed with StatPoints alone, either by summarizing steady-state characteristics of the data at the time scale of interest or by identifying intervals that contain an “event”.
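As a sketch of the second use, windows whose standard deviation is far above typical can be flagged for a closer look with a values query. The example reuses the five 5-day StatPoints printed earlier; the flag_windows name and the 10x-median threshold are hypothetical choices for illustration.

```python
import pandas as pd

# The five 5-day StatPoints printed earlier (only the columns used here)
five_day = pd.DataFrame(
    {
        "mean": [119.321719, 4269.307051, 7154.005115, 7154.443182, 7154.556317],
        "stddev": [0.566179, 3462.597022, 46.374464, 35.945926, 38.951711],
    },
    index=pd.to_datetime(
        ["2015-10-02", "2015-10-07", "2015-10-12", "2015-10-17", "2015-10-22"], utc=True
    ),
)

def flag_windows(stats: pd.DataFrame, factor: float = 10.0) -> pd.DataFrame:
    """Return windows whose stddev exceeds `factor` times the median stddev."""
    threshold = factor * stats["stddev"].median()
    return stats[stats["stddev"] > threshold]

flagged = flag_windows(five_day)
print(flagged)
```

Here only the window starting 2015-10-07 is flagged, the one in which the mean jumps from about 119 to about 7154; that window would be a natural candidate for a full-resolution values query.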

Here, we’ll simply explore how values query runtimes grow with the length of the requested interval, to see where they start to become impractical.

[12]:
window = ns_delta(minutes=1)
start_time = datetime_to_ns(start)
end_time = start_time + window

t0 = currently_as_ns()
points = stream.values(start_time, end_time)  # raw (RawPoint, version) pairs, not StatPoints
print('Runtime: %.2f seconds'%((currently_as_ns()-t0)/1e9))
Runtime: 0.06 seconds
[13]:
window = ns_delta(minutes=10)
start_time = datetime_to_ns(start)
end_time = start_time + window

t0 = currently_as_ns()
points = stream.values(start_time, end_time)  # raw (RawPoint, version) pairs, not StatPoints
print('Runtime: %.2f seconds'%((currently_as_ns()-t0)/1e9))
Runtime: 0.17 seconds
[14]:
window = ns_delta(hours=1)
start_time = datetime_to_ns(start)
end_time = start_time + window

t0 = currently_as_ns()
points = stream.values(start_time, end_time)  # raw (RawPoint, version) pairs, not StatPoints
print('Runtime: %.2f seconds'%((currently_as_ns()-t0)/1e9))
Runtime: 0.94 seconds
[15]:
window = ns_delta(hours=3)
start_time = datetime_to_ns(start)
end_time = start_time + window

t0 = currently_as_ns()
points = stream.values(start_time, end_time)  # raw (RawPoint, version) pairs, not StatPoints
print('Runtime: %.2f seconds'%((currently_as_ns()-t0)/1e9))
Runtime: 2.64 seconds
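The runtimes above grow roughly with the length of the span (0.94 s for one hour, 2.64 s for three). Each full 1-day window earlier held 10,368,000 points, which works out to 120 samples per second. The sketch below uses that rate to estimate how many raw points each of these values queries returned, plus a lower bound on memory at 16 bytes per point; Python objects returned by the client take more memory than this. The estimate_points helper is hypothetical and assumes a constant sampling rate with no gaps.

```python
SAMPLES_PER_SECOND = 10_368_000 / 86_400  # points in a full 1-day window / seconds per day
BYTES_PER_POINT = 16                      # 64-bit timestamp + 64-bit value, before Python overhead

def estimate_points(seconds: float, rate: float = SAMPLES_PER_SECOND) -> int:
    """Approximate number of raw points in a span, assuming a constant sampling rate."""
    return round(rate * seconds)

for label, seconds in [("1 minute", 60), ("10 minutes", 600), ("1 hour", 3_600), ("3 hours", 10_800)]:
    n = estimate_points(seconds)
    print(f"{label:>10}: {n:>9,} points, at least {n * BYTES_PER_POINT / 1e6:.1f} MB")
```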

Final note…

When running values queries, be sure to check how much working memory is available in your JupyterHub instance. Bringing large amounts of data into memory can easily crash your environment, in which case you may need to shut down and move to a larger instance.
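One way to keep memory bounded is to split a long range into fixed-size chunks and process each chunk before requesting the next. The generator below is a hypothetical helper that only computes consecutive [start, end) nanosecond ranges. The commented loop shows how it might be combined with stream.values(), assuming the end time of a values query is exclusive so that adjacent chunks do not overlap; process stands in for your own per-chunk analysis.

```python
from typing import Iterator, Tuple

def chunk_ranges(start_ns: int, end_ns: int, chunk_ns: int) -> Iterator[Tuple[int, int]]:
    """Yield consecutive [lo, hi) ranges that together cover [start_ns, end_ns)."""
    if chunk_ns <= 0:
        raise ValueError("chunk_ns must be positive")
    lo = start_ns
    while lo < end_ns:
        hi = min(lo + chunk_ns, end_ns)  # last chunk may be shorter
        yield lo, hi
        lo = hi

# Example with a live connection (not run here):
# for lo, hi in chunk_ranges(datetime_to_ns(start), datetime_to_ns(end), ns_delta(hours=1)):
#     points = stream.values(lo, hi)
#     process(points)   # hypothetical per-chunk analysis; drop `points` before the next chunk

print(list(chunk_ranges(0, 10, 4)))
```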

aligned_windows queries in action

Here are some examples where we use StatPoints to home in on time intervals that are known (or likely) to be of interest for a given analytic:

  • Voltage sags: https://github.com/PingThingsIO/ni4ai-notebooks/blob/main/demo/Voltage%20Sag%20Exploration.ipynb

  • Tap changes: https://github.com/PingThingsIO/ni4ai-notebooks/blob/main/demo/Voltage%20Change%20Detection.ipynb

values queries in action

Here are examples where we use values queries to examine events that warrant full-resolution queries:

  • Spectral analysis: https://github.com/PingThingsIO/ni4ai-notebooks/blob/main/demo/PV_spectrogram.ipynb

  • Phase angle differencing: https://github.com/PingThingsIO/ni4ai-notebooks/blob/main/demo/Phase%20Angle%20Monitoring.ipynb