10 - Data Ingestion#
Inserting data into btrdb involves five core steps:
1. Data loading
2. Data manipulation
3. Stream creation
4. Inserting values
5. Updating metadata
This tutorial walks through each of these steps using a USGS dataset measuring the Earth’s geomagnetic field. We’ll use the USGS webservice to request and load the data, and then convert it into a format that can be inserted into the database.
[1]:
import btrdb
import urllib.request  # importing urllib alone does not load the request submodule
import json
[2]:
db = btrdb.connect()
Data loading#
[3]:
url = 'https://geomag.usgs.gov/ws/data/?id=USGS&format=json&sampling_period=1&starttime=2021-10-25T00:26:54.177Z&endtime=2021-10-25T12:26:54.177Z'
data_str = urllib.request.urlopen(url).read()
data_dict = json.loads(data_str)
[4]:
metadata = data_dict['metadata']
timestamps = data_dict['times']
stream_data = data_dict['values']
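The request URL in cell [3] is written as one long literal. As a minimal sketch, the same URL can be assembled from its query parameters with the standard library, which makes the time window easier to change. The names `base_url` and `params` are introduced here for illustration only; the parameter names and values are copied from the URL above.

```python
from urllib.parse import urlencode

# Base endpoint and query parameters, copied from the literal URL above.
base_url = "https://geomag.usgs.gov/ws/data/"
params = {
    "id": "USGS",
    "format": "json",
    "sampling_period": 1,
    "starttime": "2021-10-25T00:26:54.177Z",
    "endtime": "2021-10-25T12:26:54.177Z",
}

# safe=":" keeps the colons in the timestamps unescaped so the result
# matches the hand-written URL character for character.
built_url = base_url + "?" + urlencode(params, safe=":")
print(built_url)
```

The resulting string can be passed to urllib.request.urlopen in place of the literal.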
Data manipulation#
Generating a UUID for each stream#
Here, we’ll use Python’s uuid library to generate a new UUID for each of the streams we want to create.
[5]:
import uuid
stream_uuids = [uuid.uuid4() for ix in range(len(stream_data))]
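Note that uuid.uuid4() returns a fresh random UUID on every call, so re-running the cell above produces different UUIDs each time. If you would rather have the same stream map to the same UUID across runs, one possible alternative (not what this tutorial does) is a name-based UUID. The sketch below assumes a hypothetical collection name and hypothetical stream names; `stable_stream_uuid` is a helper invented for this example.

```python
import uuid

# Hypothetical collection and stream names, used only for illustration.
collection = "usgs/geomag"
stream_names = ["X", "Y", "Z", "F"]

def stable_stream_uuid(collection, name):
    """Derive a repeatable (version 5) UUID from the collection and stream name."""
    return uuid.uuid5(uuid.NAMESPACE_URL, f"{collection}/{name}")

stream_uuids = [stable_stream_uuid(collection, name) for name in stream_names]
for name, stream_uuid in zip(stream_names, stream_uuids):
    print(name, stream_uuid)
```

With repeatable UUIDs, a second run of the ingestion would target the streams created by the first run instead of creating new ones.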
Convert time stamps to nanoseconds#
[7]:
from btrdb.utils.timez import datetime_to_ns
from datetime import datetime
[8]:
to_datetime = lambda t: datetime.strptime(t, '%Y-%m-%dT%H:%M:%S.%fZ')
ns_times = [datetime_to_ns(to_datetime(t)) for t in timestamps]
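To sanity-check the converted values, it can help to reproduce the conversion with the standard library alone. The sketch below assumes the target is nanoseconds since the Unix epoch in UTC and that the trailing Z in the timestamps means UTC; `iso_to_ns` is a helper written for this example and is not part of btrdb.

```python
from datetime import datetime, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)

def iso_to_ns(t):
    """Convert a 'YYYY-MM-DDTHH:MM:SS.fffZ' string to integer nanoseconds since the epoch."""
    dt = datetime.strptime(t, "%Y-%m-%dT%H:%M:%S.%fZ").replace(tzinfo=timezone.utc)
    delta = dt - EPOCH
    # Integer arithmetic avoids the rounding error a float timestamp would introduce.
    whole_seconds = delta.days * 86_400 + delta.seconds
    return whole_seconds * 1_000_000_000 + delta.microseconds * 1_000

print(iso_to_ns("1970-01-01T00:00:01.000Z"))
```

Comparing a few of these values against ns_times is a quick way to catch a time zone or unit mistake before inserting anything.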
Stream creation#
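The cell below uses stream_names, collection, tags_dict, and annotations_dict, none of which are defined in the cells shown in this tutorial. The sketch here is one plausible way to build them, not the original definitions. It assumes each entry of the response's values list carries an "id" and a per-series "metadata" dictionary; the collection name, the tag keys, and the "nT" unit are hypothetical, and the sample response is a trimmed-down stand-in with placeholder numbers.

```python
# Trimmed-down stand-in for the parsed response. The per-series "id" and
# "metadata" keys are an assumption about the response layout, and the
# numeric values are placeholders rather than real measurements.
data_dict = {
    "metadata": {"status": 200, "url": None},
    "times": ["2021-10-25T00:26:54.177Z"],
    "values": [
        {"id": "X", "metadata": {"element": "X", "channel": "X"}, "values": [1.0]},
        {"id": "F", "metadata": {"element": "F", "channel": "F"}, "values": [2.0]},
    ],
}
metadata = data_dict["metadata"]
stream_data = data_dict["values"]

collection = "usgs/geomag"  # hypothetical collection name

# One stream per series, named after the series id.
stream_names = [series["id"] for series in stream_data]

# Tags: hypothetical keys and unit, chosen for illustration.
tags_dict = {name: {"name": name, "unit": "nT"} for name in stream_names}

# Annotations: response-level metadata merged with the per-series metadata.
annotations_dict = {
    series["id"]: {**metadata, **series["metadata"]} for series in stream_data
}
print(annotations_dict["X"])
```

Merging the response-level and per-series metadata this way would give each stream annotations similar in shape to the ones displayed later in this tutorial.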
[9]:
stream_objects = []
for i, name in enumerate(stream_names):
    try:
        # To create a stream, we must specify various parameters
        stream = db.create(
            stream_uuids[i],                     # UUID
            collection,                          # collection
            tags=tags_dict[name],                # tags
            annotations=annotations_dict[name],  # annotations
        )
        stream_objects.append(stream)
    # If a stream already exists with the UUID, trying to create it again will raise an error
    except Exception as e:
        if str(e) == 'a stream already exists with uuid %s' % (stream_uuids[i]):
            # Here we'll query the database to retrieve the stream
            # so we can then insert data into it
            stream = db.stream_from_uuid(stream_uuids[i])
            stream_objects.append(stream)
        else:
            # Re-raise anything we don't recognize instead of hiding it
            raise
Inserting values#
Each point in the database is a time-value pair. Just as database queries return points as a list of tuples, [(time1, value1), (time2, value2), ...], data insertions use the same structure.
When inserting data, you can specify one of several merge policies. Valid merge policies include:
- 'never': the default, no points are merged
- 'equal': points are deduplicated if the time and value are equal
- 'retain': if two points have the same timestamp, the old one is kept
- 'replace': if two points have the same timestamp, the new one is kept
[10]:
for stream, data in zip(stream_objects, stream_data):
    points = list(zip(ns_times, data['values']))
    stream.insert(points, merge='replace')
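To make the four merge policies concrete, here is a toy model in plain Python that follows the descriptions above. It is only an illustration of the described behavior on small lists, not how the database implements merging, and `merge_points` is a name invented for this sketch.

```python
def merge_points(existing, incoming, policy="never"):
    """Toy model of the merge policies described above (not BTrDB's implementation)."""
    if policy == "never":
        # Nothing is merged: duplicates are kept side by side.
        merged = existing + incoming
    elif policy == "equal":
        # Drop an incoming point only if an identical (time, value) pair exists.
        seen = set(existing)
        merged = existing + [p for p in incoming if p not in seen]
    elif policy == "retain":
        # On a timestamp collision, keep the old point.
        old_times = {t for t, _ in existing}
        merged = existing + [p for p in incoming if p[0] not in old_times]
    elif policy == "replace":
        # On a timestamp collision, keep the new point.
        new_times = {t for t, _ in incoming}
        merged = [p for p in existing if p[0] not in new_times] + incoming
    else:
        raise ValueError(f"unknown merge policy: {policy}")
    return sorted(merged, key=lambda p: p[0])

existing = [(1, 1.0), (2, 2.0)]
incoming = [(2, 5.0), (3, 3.0)]
for policy in ("never", "equal", "retain", "replace"):
    print(policy, merge_points(existing, incoming, policy))
```

In this model 'replace' is the policy that makes re-running an ingestion overwrite earlier values at the same timestamps, which matches the choice made in the cell above.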
Updating metadata#
Metadata often needs to be added, updated, or refined after a stream has been created. This can be done using stream.update().
Here, we’ll add a url annotation to indicate where the data originated. Note that the raw metadata we inserted previously already has a ‘url’ field, but its value was set to ‘null’.
[11]:
streams = db.streams_in_collection(collection)
annotations, _ = streams[0].annotations()
annotations
[11]:
{'url': 'null',
'channel': 'X',
'station': 'USGS',
'element': 'X',
'network': 'NT',
'status': '200',
'location': 'A0',
'generated': '2021-12-10T21:39:06Z',
'intermagnet': '{"imo": {"iaga_code": "USGS", "name": "USGS", "coordinates": [254.764, 40.137, 1682.0]}, "reported_orientation": "XYZF", "sensor_orientation": "HDZF", "data_type": "adjusted", "sampling_period": 1, "digital_sampling_rate": 0.01}'}
[12]:
for stream in streams:
    new_annotations = {'url': url}
    stream.update(annotations=new_annotations, replace=False)
Here we can query the stream’s annotations again to confirm that the url has been stored.
[13]:
stream.annotations()[0]['url']
[13]:
'https://geomag.usgs.gov/ws/data/?id=USGS&format=json&sampling_period=1&starttime=2021-10-25T00:26:54.177Z&endtime=2021-10-25T12:26:54.177Z'
A note on the keyword replace#
Setting replace=False above allowed us to insert/update a single metadata field, without touching metadata fields that were not in the new_annotations dictionary.
If the intended behavior is to completely overhaul the metadata that was previously in place, you can do so by setting replace=True.
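The difference between the two settings can be modeled with ordinary dictionaries. This is a toy illustration of the behavior described above, not the library's code; `apply_update` and the example URL are hypothetical.

```python
def apply_update(current, new, replace=False):
    """Toy model of how the replace keyword affects the stored annotations."""
    if replace:
        # replace=True: the new dictionary becomes the complete set of annotations.
        return dict(new)
    # replace=False: new keys are added or overwritten, all other keys are untouched.
    return {**current, **new}

current = {"url": "null", "channel": "X", "station": "USGS"}
new = {"url": "https://example.org/data"}  # hypothetical value

print(apply_update(current, new, replace=False))
print(apply_update(current, new, replace=True))
```

In this model, replace=True with a partial dictionary drops every field that is not listed, so it is safest to build the full dictionary first.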
Here, we’ll go through an example that creates a separate metadata field for each of the parameters stored in the intermagnet metadata field (see the example above).
[14]:
for stream in streams:
    old_annotations, _ = stream.annotations()
    intermagnet = old_annotations.pop('intermagnet')
    imo = intermagnet.pop('imo')
    new_annotations = {**old_annotations, **intermagnet, **imo}
    stream.update(annotations=new_annotations, replace=True)
[15]:
stream.annotations()
[15]:
({'location': 'A0',
'reported_orientation': 'XYZF',
'coordinates': [254.764, 40.137, 1682.0],
'iaga_code': 'USGS',
'channel': 'F',
'url': 'https://geomag.usgs.gov/ws/data/?id=USGS&format=json&sampling_period=1&starttime=2021-10-25T00:26:54.177Z&endtime=2021-10-25T12:26:54.177Z',
'station': 'USGS',
'data_type': 'adjusted',
'name': 'USGS',
'element': 'F',
'sensor_orientation': 'HDZF',
'digital_sampling_rate': 0.01,
'generated': '2021-12-10T21:39:06Z',
'network': 'NT',
'status': 200,
'sampling_period': 1},
3)
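The flattening step can be tried on a plain dictionary without a database connection. In the earlier output the intermagnet value is displayed as a JSON string, so this sketch decodes it first when that is the case; whether decoding is needed in practice depends on how the annotations are returned, which is an assumption here. `flatten_intermagnet` is a helper written for this example, and the sample dictionary is an abridged copy of the annotations shown earlier.

```python
import json

def flatten_intermagnet(annotations):
    """Promote the fields nested under 'intermagnet' (and its 'imo' entry) to top-level keys."""
    flat = dict(annotations)  # copy so the caller's dictionary is not modified
    intermagnet = flat.pop("intermagnet", {})
    if isinstance(intermagnet, str):
        # Assumption: the value may arrive as a JSON-encoded string.
        intermagnet = json.loads(intermagnet)
    else:
        intermagnet = dict(intermagnet)
    imo = intermagnet.pop("imo", {})
    return {**flat, **intermagnet, **imo}

sample = {
    "url": "null",
    "channel": "X",
    "station": "USGS",
    "intermagnet": '{"imo": {"iaga_code": "USGS", "name": "USGS", '
                   '"coordinates": [254.764, 40.137, 1682.0]}, '
                   '"reported_orientation": "XYZF", "sampling_period": 1}',
}
print(flatten_intermagnet(sample))
```

The returned dictionary could then be passed to stream.update with replace=True, as in the loop above.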
Starting over#
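If you make a mistake, you can start from a clean state by obliterating the streams we just created. Obliterating a stream deletes it from the database, so only do this for streams you no longer need.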
[16]:
for stream in db.streams_in_collection(collection):
    stream.obliterate()