10 - Data Ingestion#

Inserting data into btrdb involves five core steps: 1. Data loading 2. Data manipulation 3. Stream creation 4. Inserting values 5. Updating metadata

This tutorial walks through each of these steps using a USGS dataset measuring the Earth’s geomagnetic field. We’ll use the USGS webservice to request and load the data, and then convert it into a format that can be inserted into the database.

[1]:
import btrdb
import urllib
import json
[2]:
db = btrdb.connect()

Data loading#

[3]:
url = 'https://geomag.usgs.gov/ws/data/?id=USGS&format=json&sampling_period=1&starttime=2021-10-25T00:26:54.177Z&endtime=2021-10-25T12:26:54.177Z'
data_str = urllib.request.urlopen(url).read()
data_dict = json.loads(data_str)
[4]:
metadata = data_dict['metadata']
timestamps = data_dict['times']
stream_data = data_dict['values']

Data manipulation#

Generating a UUID for each stream#

Here, we’ll use the uuid python library to generate a new uuid for each of the streams we want to create.

[5]:
import uuid

stream_uuids = [uuid.uuid4() for ix in range(len(stream_data))]

Define information about each stream (i.e., collection, annotations and tags)#

[6]:
collection = 'zz/insertion/example/usgs'

stream_names = []
annotations_dict = {}
tags_dict = {}
for stream in stream_data:
    name = stream['id']
    stream_names.append(name)
    annotations_dict[name] = {**metadata, **stream['metadata']}
    tags_dict[name] = {'name': name,'unit': 'nanoteslas'}

Convert time stamps to nanoseconds#

[7]:
from btrdb.utils.timez import datetime_to_ns
from datetime import datetime
[8]:
to_datetime = lambda t: datetime.strptime(t, '%Y-%m-%dT%H:%M:%S.%fZ')
ns_times = [datetime_to_ns(to_datetime(t)) for t in timestamps]

Stream creation#

[9]:
stream_objects = []
for i, name in enumerate(stream_names):
    try:
        # To create a stream, we must specify various parameters
        stream = db.create(stream_uuids[i], # UUID
                           collection, # collection
                           tags=tags_dict[name], # tags
                           annotations=annotations_dict[name] # annotations
                          )

        stream_objects.append(stream)

    # If a stream already exists with the UUID, trying to create it again will raise an error
    except Exception as e:
        if str(e) == 'a stream already exists with uuid %s'%(stream_uuids[i]):
            # Here we'll query the database to retrieve the stream
            #    so we can then insert data into it
            stream = db.stream_from_uuid(stream_uuids[i])
            stream_objects.append(stream)
        else:
            assert False, e

Inserting values#

Each point in the database is a time-value pair. Just as database queries return a list of tuples describing the [(time1, value1), (time2, value2), ...], data insertions follow the same structure.

When inserting data, you can specify one of several merge policies. Valid merge policies include - ’never’: the default, no points are merged - ’equal’: points are deduplicated if the time and value are equal - ’retain’: if two points have the same timestamp, the old one is kept - ’replace’: if two points have the same timestamp, the new one is kept

[10]:
for stream, data in zip(stream_objects, stream_data):
    points = list(zip(ns_times, data['values']))
    stream.insert(points, merge='replace')

Updating metadata#

It is often the case metadata needs to be added, updated, or further refined after a stream has already been created. This can be done using stream.update().

Here, we’ll go through the process of adding a url to indicate where the data originated from. Note that there is a ‘url’ field in the raw metadata we had inserted previously, but the value was set to ‘null’.

[11]:
streams = db.streams_in_collection(collection)
annotations, _ = streams[0].annotations()

annotations
[11]:
{'url': 'null',
 'channel': 'X',
 'station': 'USGS',
 'element': 'X',
 'network': 'NT',
 'status': '200',
 'location': 'A0',
 'generated': '2021-12-10T21:39:06Z',
 'intermagnet': '{"imo": {"iaga_code": "USGS", "name": "USGS", "coordinates": [254.764, 40.137, 1682.0]}, "reported_orientation": "XYZF", "sensor_orientation": "HDZF", "data_type": "adjusted", "sampling_period": 1, "digital_sampling_rate": 0.01}'}
[12]:
for stream in streams:
    new_annotations = {'url': url}
    stream.update(annotations=new_annotations, replace=False)

Here we can query the stream’s annotations again to confirm that the url has been stored.

[13]:
stream.annotations()[0]['url']
[13]:
'https://geomag.usgs.gov/ws/data/?id=USGS&format=json&sampling_period=1&starttime=2021-10-25T00:26:54.177Z&endtime=2021-10-25T12:26:54.177Z'

A note on the keyword replace#

Setting replace=False above allowed us to insert/update a single metadata field, without touching metadata fields that were not in the new_annotations dictionary.

If the intended behavior is to completely overhaul the metadata that was previously in place, you can do so by setting replace=True.

Here, we’ll go through an example that creates separate metadata fields for each fo the parameters stored in the intermagnet metadata field (see example above).

[14]:
for stream in streams:
    old_annotations, _ = stream.annotations()
    intermagnet = old_annotations.pop('intermagnet')
    imo = intermagnet.pop('imo')
    new_annotations = {**old_annotations, **intermagnet, **imo}

    stream.update(annotations=new_annotations, replace=True)
[15]:
stream.annotations()
[15]:
({'location': 'A0',
  'reported_orientation': 'XYZF',
  'coordinates': [254.764, 40.137, 1682.0],
  'iaga_code': 'USGS',
  'channel': 'F',
  'url': 'https://geomag.usgs.gov/ws/data/?id=USGS&format=json&sampling_period=1&starttime=2021-10-25T00:26:54.177Z&endtime=2021-10-25T12:26:54.177Z',
  'station': 'USGS',
  'data_type': 'adjusted',
  'name': 'USGS',
  'element': 'F',
  'sensor_orientation': 'HDZF',
  'digital_sampling_rate': 0.01,
  'generated': '2021-12-10T21:39:06Z',
  'network': 'NT',
  'status': 200,
  'sampling_period': 1},
 3)

Starting over#

If you make a mistake, you can start with a clean state by obliterating the streams we just created.

[16]:
for stream in db.streams_in_collection(collection):
    stream.obliterate()
[ ]: