# Copyright (c) 2023, TU Wien, Department of Geodesy and Geoinformation.
# All rights reserved.
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
# * Neither the name of the Vienna University of Technology,
# Department of Geodesy and Geoinformation nor the
# names of its contributors may be used to endorse or promote products
# derived from this software without specific prior written permission.
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL VIENNA UNIVERSITY OF TECHNOLOGY,
# DEPARTMENT OF GEODESY AND GEOINFORMATION BE LIABLE FOR ANY
# DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
# (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
# ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""
Abstract and concrete classes for reading and writing time series in
NetCDF files using the Climate and Forecast (CF) Metadata Conventions
(http://cfconventions.org/).
"""
import os
import warnings
from abc import ABC, abstractmethod
import netCDF4
import numpy as np
import pandas as pd
from pygeobase.io_base import GriddedTsBase
from pygeogrids import CellGrid
from pynetcf.base import Dataset
from pynetcf.base import DatasetError
class DatasetTs(Dataset, ABC):
"""
Abstract class providing common methods for NetCDF time series
representations such as the orthogonal multidimensional array, the
contiguous ragged array and the indexed ragged array. Use one of the
concrete subclasses OrthoMultiTs, ContiguousRaggedTs or IndexedRaggedTs.
Implemented according to the NetCDF CF-conventions 1.6.
Parameters
----------
filename : string
Filename of the netCDF file. If it already exists, it will be opened
read-only unless the append keyword is set. If the overwrite
keyword is set, the file will be overwritten.
n_loc : int, optional
Number of locations for which this netCDF file contains time series;
only required for a new file.
loc_dim_name : string, optional
name of the location dimension
obs_dim_name : string, optional
name of the observations dimension
loc_ids_name : string, optional
name of variable that has the location ids stored
loc_descr_name : string, optional
name of variable that has additional location information
stored
time_units : string, optional
units the time axis is given in.
Default: "days since 1900-01-01 00:00:00"
time_var : string, optional
name of time variable
Default: time
lat_var : string, optional
name of latitude variable
Default: lat
lon_var : string, optional
name of longitude variable
Default: lon
alt_var : string, optional
name of altitude variable
Default: alt
unlim_chunksize : int, optional
chunksize to use along unlimited dimensions, other chunksizes
will be calculated by the netCDF library
read_bulk : boolean, optional
if set to True the data of all locations is read into memory,
and subsequent calls to "read" read from the cache and not from
disk; this makes reading complete files faster
read_dates : boolean, optional
if False, dates will not be read automatically but only on specific
request; useful for bulk reading because the netCDF num2date
routine is currently very slow for big datasets
"""
def __init__(self,
filename,
n_loc=None,
loc_dim_name="locations",
obs_dim_name="time",
loc_ids_name="location_id",
loc_descr_name="location_description",
time_units="days since 1900-01-01 00:00:00",
time_var="time",
lat_var="lat",
lon_var="lon",
alt_var="alt",
unlim_chunksize=None,
read_bulk=False,
read_dates=True,
**kwargs):
super(DatasetTs, self).__init__(filename, **kwargs)
self.n_loc = n_loc
# dimension names
self.obs_dim_name = obs_dim_name
self.loc_dim_name = loc_dim_name
# location names
self.loc_ids_name = loc_ids_name
self.loc_descr_name = loc_descr_name
# time, time units and location
self.time_var = time_var
self.time_units = time_units
self.lat_var = lat_var
self.lon_var = lon_var
self.alt_var = alt_var
self.unlim_chunksize = unlim_chunksize
if unlim_chunksize is not None:
self.unlim_chunksize = [unlim_chunksize]
self.write_offset = None
# variable which lists the variables that should not be
# considered time series even if they have the correct dimension
self.not_timeseries = [self.time_var]
# location ids, to be read upon first reading operation
self.loc_ids_var = None
# date variables; for OrthoMultiTs they can be stored once
# since they are the same for all variables in a file
self.constant_dates = True
self.dates = None
self.read_dates_auto = read_dates
if self.mode == "r":
self.read_bulk = read_bulk
else:
self.read_bulk = False
# cache location id during reading
self.prev_loc_id = None
# if read bulk is activated the arrays will be read into the
# local variables dict if it is not activated the data will be read
# from the netCDF variables
if not self.read_bulk:
self.variables = self.dataset.variables
else:
self.variables = {}
@abstractmethod
def _init_dimensions(self):
"""
Initializes the dimensions.
"""
pass
@abstractmethod
def _init_lookup(self):
"""
Initializes variables for the lookup between locations and entries in
the time series.
"""
pass
def _init_location_variables(self):
"""
Initialize location information: longitude, latitude and altitude.
"""
self.write_var(self.lon_var,
data=None,
dim=self.loc_dim_name,
attr={
"standard_name": "longitude",
"long_name": "location longitude",
"units": "degrees_east",
"valid_range": (-180.0, 180.0)
},
dtype=np.float32)
self.write_var(self.lat_var,
data=None,
dim=self.loc_dim_name,
attr={
"standard_name": "latitude",
"long_name": "location latitude",
"units": "degrees_north",
"valid_range": (-90.0, 90.0)
},
dtype=np.float32)
self.write_var(self.alt_var,
data=None,
dim=self.loc_dim_name,
attr={
"standard_name": "height",
"long_name": "vertical distance above the "
"surface",
"units": "m",
"positive": "up",
"axis": "Z"
},
dtype=np.float32)
def _init_location_id_and_time(self):
"""
Initialize the dimensions and variables that are the basis of
the format.
"""
# make variable that contains the location id
self.write_var(self.loc_ids_name,
data=None,
dim=self.loc_dim_name,
dtype=np.int64)
self.write_var(self.loc_descr_name,
data=None,
dim=self.loc_dim_name,
dtype=str)
# initialize time variable
self.write_var(self.time_var,
data=None,
dim=self.obs_dim_name,
attr={
"standard_name": "time",
"long_name": "time of measurement",
"units": self.time_units
},
dtype=np.float64,
chunksizes=self.unlim_chunksize)
def _read_loc_ids(self, force=False):
"""
Load location ids.
"""
if self.loc_ids_var is None or force:
loc_ids_var = self.dataset.variables[self.loc_ids_name][:]
self.loc_ids_var = np.ma.masked_array(loc_ids_var)
def _find_free_index_pos(self):
"""
If the index is not yet filled completely this function
gets the id of the first free position.
This function depends on the masked array being used if no
data is yet in the file.
Returns
-------
idx : int
First free index position.
Raises
------
IOError
If no free index is found.
"""
self._read_loc_ids()
masked = np.where(np.atleast_1d(self.loc_ids_var.mask))[0]
# all indexes already filled
if len(masked) == 0:
if self.dataset.dimensions[self.loc_dim_name].isunlimited():
idx = self.loc_ids_var.size
else:
raise IOError("No free index available")
else:
idx = np.min(masked)
return idx
def _get_loc_id_index(self, loc_id):
"""
Gets index of location id in location ids variable.
Parameters
----------
loc_id : int or numpy.ndarray
Location id.
Returns
-------
loc_id_index : int
Location id index.
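Notes
-----
The lookup is a sort plus binary search. As a worked example, for
stored ids [300, 100, 200] and loc_id = 200: argsort gives
[1, 2, 0], searching 200 in the sorted ids [100, 200, 300] yields
position 1, and loc_ids_sorted[1] = 2 is the index of 200 in the
unsorted location id variable.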
"""
self._read_loc_ids()
loc_id = np.atleast_1d(loc_id)
# check if the location ids are all actually in the location id
# variable
in1d = np.in1d(loc_id, self.loc_ids_var.data, assume_unique=True)
if loc_id[in1d].size != loc_id.size:
raise IOError("Location not yet defined")
loc_ids_sorted = np.argsort(self.loc_ids_var.data)
ypos = np.searchsorted(self.loc_ids_var[loc_ids_sorted], loc_id)
try:
loc_id_index = loc_ids_sorted[ypos]
# loc_id_index = np.where(loc_id == self.loc_ids_var[:, None])[0]
except IndexError:
# Location not yet defined:
raise IOError("Location not yet defined")
if loc_id_index.size != loc_id.size:
raise IOError(
f"Index problem {loc_id_index.size} elements found for {loc_id.size} locations")
if loc_id.size == 1:
loc_id_index = loc_id_index[0]
return loc_id_index
def _add_location(self, loc_id, lon, lat, alt=None, loc_descr=None):
"""
Add a new location to the dataset.
Parameters
----------
loc_id : int or numpy.array
location id
lon : float or numpy.array
longitudes of location
lat : float or numpy.array
latitudes of location
alt : float or numpy.array
altitude of location
loc_descr : string or numpy.array
location description
"""
if type(loc_id) != np.ndarray:
loc_id = np.array([loc_id])
if type(lon) != np.ndarray:
lon = np.array([lon])
# netCDF library can not handle arrays of length 1 that contain only a
# None value
if lon.size == 1 and lon[0] is None:
lon = None
if type(lat) != np.ndarray:
lat = np.array([lat])
# netCDF library can not handle arrays of length 1 that contain only a
# None value
if lat.size == 1 and lat[0] is None:
lat = None
if alt is not None:
if type(alt) != np.ndarray:
alt = np.array([alt])
# netCDF library can not handle arrays of length 1 that contain
# only a None value
if alt.size == 1 and alt[0] is None:
alt = None
# remove location ids that are already in the file
locations = np.ma.compressed(
self.dataset.variables[self.loc_ids_name][:])
loc_count = len(locations)
if loc_count > 0:
loc_ids_new = np.invert(np.in1d(loc_id, locations))
if len(np.nonzero(loc_ids_new)[0]) == 0:
# no new locations to add
return None
else:
loc_ids_new = slice(None, None, None)
# find free index position for limited location dimension
idx = self._find_free_index_pos()
index = np.arange(len(loc_id[loc_ids_new])) + idx
self.dataset.variables[self.loc_ids_name][index] = loc_id[loc_ids_new]
if lon is not None:
self.dataset.variables[self.lon_var][index] = lon[loc_ids_new]
if lat is not None:
self.dataset.variables[self.lat_var][index] = lat[loc_ids_new]
if alt is not None:
self.dataset.variables[self.alt_var][index] = alt[loc_ids_new]
if loc_descr is not None:
if type(loc_descr) != np.ndarray:
loc_descr = np.array(loc_descr)
if len(index) == 1:
index = int(index[0])
loc_ids_new = 0
self.dataset.variables[self.loc_descr_name][index] = str(
loc_descr)
else:
self.dataset.variables[self.loc_descr_name][index] = loc_descr[
loc_ids_new].astype(object)
# update location ids variable after adding location
self._read_loc_ids(force=True)
return idx
def _get_all_ts_variables(self):
"""
Gets all variable names that use self.obs_dim_name as one of their
dimensions, indicating that they are time series observations. This
does not include the self.time_var variable.
Returns
-------
variables : list
List of variable names.
"""
ts_var = []
for variable_name in self.dataset.variables:
if variable_name not in self.not_timeseries:
if self.obs_dim_name in \
self.dataset.variables[variable_name].dimensions:
ts_var.append(variable_name)
return ts_var
@abstractmethod
def _get_index_of_ts(self, loc_id):
"""
Get the index of a time series.
Parameters
----------
loc_id : int
Location id.
Returns
-------
loc_id_index : int
Location id index.
"""
pass
@abstractmethod
def _get_loc_ix_from_obs_ix(self, obs_ix):
"""
Get location index from observation index. In case of OrthoMultiTs
all measurements are taken at the same time and therefore all
location ids are affected.
Parameters
----------
obs_ix : int
Observation index.
Returns
-------
loc_ix : int
Location index.
"""
pass
def read_time(self, loc_id):
"""
Read the time stamps for the given location id.
In this case the location id is irrelevant since all
locations share the same time stamps.
"""
return self.dataset.variables[self.time_var][:]
def read_dates(self, loc_id):
"""
Read time stamps and convert them.
"""
self.dates = netCDF4.num2date(
self.read_time(loc_id),
units=self.dataset.variables[self.time_var].units,
calendar="standard",
only_use_cftime_datetimes=False,
only_use_python_datetimes=True)
return self.dates.astype("datetime64[ns]")
def _read_var_ts(self, loc_id, var):
"""
Read a time series of a variable at a given location id
Parameters
----------
loc_id : int
id of location, can be a grid point id or some other id
var : string
name of variable to read
"""
if self.prev_loc_id != loc_id:
index = self._get_index_of_ts(loc_id)
self.prev_loc_index = index
else:
index = self.prev_loc_index
self.prev_loc_id = loc_id
if self.read_bulk:
if var not in self.variables.keys():
self.variables[var] = self.dataset.variables[var][:]
return self.variables[var][index]
def read(self, variables, loc_id, dates_direct=False):
"""
Read time series of variables.
Parameters
----------
variables : list or string
Name(s) of the variables to read.
loc_id : int
Location id.
dates_direct : boolean, optional
if True the dates are read directly from the netCDF file
without conversion to datetime
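Examples
--------
A sketch, assuming an already opened file ``nc`` and an illustrative
variable name "sm":

>>> ts = nc.read(["sm"], loc_id=1)
>>> sorted(ts.keys())
['sm', 'time']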
"""
if not isinstance(variables, list):
variables = [variables]
ts = {}
for variable in variables:
data = self._read_var_ts(loc_id, variable)
ts[variable] = data
if not dates_direct:
# only read dates if they should be read automatically
if self.read_dates_auto:
# only read dates if they have not been read
# or if they are different for each location id which is
# the case if self.constant_dates is set to False
if self.dates is None:
self.read_dates(loc_id)
if not self.constant_dates:
self.read_dates(loc_id)
ts["time"] = self.dates
else:
if self.read_dates_auto:
# read the raw time stamps without datetime conversion
ts["time"] = self.read_time(loc_id)
return ts
def read_all(self, loc_id, dates_direct=False):
"""
Read time series of all time series variables at a given location id.
Parameters
----------
loc_id : int
id of location, can be a grid point id or some other id
dates_direct : boolean, optional
if True the dates are read directly from the netCDF file
without conversion to datetime
Returns
-------
time_series : dict
keys of var and time with numpy.arrays as values
"""
ts = self.read(self._get_all_ts_variables(),
loc_id,
dates_direct=dates_direct)
return ts
def extend_time(self, dates, direct=False):
"""
Extend the time dimension and variable by the given dates.
Parameters
----------
dates : numpy.array of datetime objects or floats
Timestamps.
direct : boolean
If True, the dates are already converted into floating
point numbers of the correct magnitude.
"""
if direct:
self.append_var(self.time_var, dates)
else:
units = self.dataset.variables[self.time_var].units
self.append_var(
self.time_var,
netCDF4.date2num(dates, units=units, calendar="standard"))
def get_time_variable_overlap(self, dates):
"""
Figure out if a new date array has an overlap with the already
existing time variable.
Return the index of the existing time variable where the new dates
should be located.
At the moment this only handles cases where all dates are new or none
are new.
Parameters
----------
dates : list
List of datetime objects
Returns
-------
indexes : numpy.ndarray
Array of indexes that overlap
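Notes
-----
As a worked example: if the time variable already holds 10 dates and
all given dates are found in it, their existing positions are
returned; if none are found, a single index [10] (the current size
of the time variable) is returned.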
"""
timevar = self.dataset.variables[self.time_var]
if timevar.size == 0:
indexes = np.array([0])
else:
try:
indexes = netCDF4.date2index(dates, timevar)
except ValueError:
indexes = np.array([timevar.size])
return indexes
@abstractmethod
def write(self,
loc_id,
data,
dates,
loc_descr=None,
lon=None,
lat=None,
alt=None,
fill_values=None,
attributes=None,
dates_direct=False):
"""
Write time series data; if the location does not exist yet, it is
added to the file. For this data format it is assumed that in each
write/append cycle the same amount of data is added.
Parameters
----------
loc_id : int
Location id.
data : dict
Dictionary with variable names as keys and
numpy.ndarrays as values.
dates: numpy.ndarray
Array of datetime objects.
attributes : dict, optional
Dictionary of attributes that should be added to the netCDF
variables. Can also be a dict of dicts with one entry per
variable name as in the data dict.
dates_direct : boolean
If True, the dates are already converted into floating
point numbers of the correct magnitude.
"""
pass
def write_all(self,
loc_ids,
data,
dates,
loc_descrs=None,
lons=None,
lats=None,
alts=None,
fill_values=None,
attributes=None,
dates_direct=False):
"""
Write time series data in bulk. For this the user has to provide
a 2D array with shape (n_loc, dates.size) that is filled with
the time series of all grid points in the file.
Parameters
----------
loc_ids : numpy.ndarray
location ids along the first axis of the data array
data : dict
dictionary with variable names as keys and 2D numpy.arrays as
values
dates: numpy.ndarray
Array of datetime objects with same size as second dimension of
data arrays.
attributes : dict, optional
Dictionary of attributes that should be added to the netCDF
variables. Can also be a dict of dicts with one entry per
variable name as in the data dict.
dates_direct : boolean
If True, the dates are already converted into floating
point numbers of the correct magnitude.
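Examples
--------
A sketch, assuming a file ``nc`` opened for writing with n_loc=2 and
an illustrative variable name "sm":

>>> import numpy as np
>>> from datetime import datetime
>>> dates = np.array([datetime(2020, 1, 1), datetime(2020, 1, 2)])
>>> data = {"sm": np.array([[0.1, 0.2],
...                         [0.3, 0.4]])}  # shape (n_loc, dates.size)
>>> nc.write_all(np.array([1, 2]), data, dates,
...              lons=np.array([14.0, 15.0]),
...              lats=np.array([45.0, 46.0]))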
"""
if self.n_loc != loc_ids.size:
raise ValueError(
"loc_ids must have the same size as the number of locations in the file")
for key in data:
if data[key].shape[1] != dates.size:
raise IOError(
f"Timestamps and second dimension of dataset {key} must have the same size")
if data[key].shape[0] != self.n_loc:
raise IOError(
f"First dimension of dataset {key} must have the same size as the number of locations in the file")
# make sure zip works even if one of the parameters is not given
if lons is None:
lons = np.repeat(None, self.n_loc)
if lats is None:
lats = np.repeat(None, self.n_loc)
if alts is None:
alts = np.repeat(None, self.n_loc)
# find out if attributes is a dict to be used for all variables or if
# there is a dictionary of attributes for each variable
unique_attr = False
if attributes is not None:
if sorted(data.keys()) == sorted(attributes.keys()):
unique_attr = True
self._add_location(loc_ids, lons, lats, alts, loc_descrs)
for key in data:
internal_attributes = {"name": key, "coordinates": "lat lon alt"}
if isinstance(fill_values, dict):
internal_attributes["_FillValue"] = fill_values[key]
if attributes is not None:
if unique_attr:
variable_attributes = attributes[key]
else:
variable_attributes = attributes
internal_attributes.update(variable_attributes)
if self.unlim_chunksize is None:
chunksizes = None
else:
chunksizes = [self.n_loc, self.unlim_chunksize[0]]
self.write_var(key,
data=None,
dim=(self.loc_dim_name, self.obs_dim_name),
attr=internal_attributes,
dtype=data[key].dtype,
chunksizes=chunksizes)
if self.write_offset is None:
# current shape tells us how many elements are already
# in the file
self.write_offset = self.dataset.variables[key].shape[1]
_slice = slice(self.write_offset, self.write_offset + dates.size,
None)
self.dataset.variables[key][:, _slice] = data[key]
# fill up time variable
if dates_direct:
self.dataset.variables[self.time_var][self.write_offset:] = dates
else:
units = self.dataset.variables[self.time_var].units
self.dataset.variables[self.time_var][self.write_offset:] = \
netCDF4.date2num(dates, units=units, calendar="standard")
class OrthoMultiTs(DatasetTs):
"""
Implementation of the Orthogonal multidimensional array representation
of time series according to the NetCDF CF-conventions 1.6.
Parameters
----------
filename : string
Filename of the netCDF file. If it already exists, it will be opened
read-only unless the append keyword is set. If the overwrite
keyword is set, the file will be overwritten.
n_loc : int, optional
Number of locations for which this netCDF file contains time series;
only required for a new file.
loc_dim_name : string, optional
name of the location dimension
obs_dim_name : string, optional
name of the observations dimension
loc_ids_name : string, optional
name of variable that has the location ids stored
loc_descr_name : string, optional
name of variable that has additional location information
stored
time_units : string, optional
units the time axis is given in.
Default: "days since 1900-01-01 00:00:00"
time_var : string, optional
name of time variable
Default: time
lat_var : string, optional
name of latitude variable
Default: lat
lon_var : string, optional
name of longitude variable
Default: lon
alt_var : string, optional
name of altitude variable
Default: alt
unlim_chunksize : int, optional
chunksize to use along unlimited dimensions, other chunksizes
will be calculated by the netCDF library
read_bulk : boolean, optional
if set to True the data of all locations is read into memory,
and subsequent calls to "read" read from the cache and not from
disk; this makes reading complete files faster
read_dates : boolean, optional
if False, dates will not be read automatically but only on specific
request; useful for bulk reading because the netCDF num2date
routine is currently very slow for big datasets
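Examples
--------
A minimal write/read round trip, as a sketch; the file name and the
variable name "sm" are only illustrative:

>>> import numpy as np
>>> from datetime import datetime
>>> dates = np.array([datetime(2020, 1, 1), datetime(2020, 1, 2)])
>>> nc = OrthoMultiTs("example.nc", n_loc=3, mode="w")
>>> nc.write(1, {"sm": np.array([0.1, 0.2])}, dates, lon=14.0, lat=45.0)
>>> nc.close()
>>> nc = OrthoMultiTs("example.nc")
>>> ts = nc.read_all(1)  # dict with keys "sm" and "time"
>>> nc.close()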
"""
def __init__(self,
filename,
n_loc=None,
loc_dim_name="locations",
obs_dim_name="time",
loc_ids_name="location_id",
loc_descr_name="location_description",
time_units="days since 1900-01-01 00:00:00",
time_var="time",
lat_var="lat",
lon_var="lon",
alt_var="alt",
unlim_chunksize=None,
read_bulk=False,
read_dates=True,
**kwargs):
super(OrthoMultiTs,
self).__init__(filename, n_loc, loc_dim_name, obs_dim_name,
loc_ids_name, loc_descr_name, time_units,
time_var, lat_var, lon_var, alt_var,
unlim_chunksize, read_bulk, read_dates, **kwargs)
# initialize dimensions and index_variable
if self.mode == "w":
self._init_dimensions()
self._init_lookup()
self._init_location_variables()
self._init_location_id_and_time()
self.global_attr["featureType"] = "timeSeries"
def _init_dimensions(self):
"""
Initializes the dimensions.
"""
if self.n_loc is None:
raise ValueError(
"The number of locations has to be set for a new OrthoMultiTs file")
self.create_dim(self.loc_dim_name, self.n_loc)
self.create_dim(self.obs_dim_name, None)
def _init_lookup(self):
"""
Initializes variables for the lookup between locations and entries in
the time series.
"""
# nothing to be done for OrthoMultiTs
pass
def _get_index_of_ts(self, loc_id):
"""
Get the index of a time series.
Parameters
----------
loc_id : int
Location id.
Returns
-------
loc_id_index : int
Location id index.
"""
try:
loc_id_index = self._get_loc_id_index(loc_id)
except IOError:
raise IOError(f"Index for location id #{loc_id} not found")
_slice = (loc_id_index, slice(None, None, None))
return _slice
def _get_loc_ix_from_obs_ix(self, obs_ix):
"""
Get location index from observation index. In case of OrthoMultiTs
all measurements are taken at the same time and therefore all
location ids are affected.
Parameters
----------
obs_ix : int
Observation index.
Returns
-------
loc_ix : int
Location index.
"""
return self.read_var(self.loc_ids_name)
def write(self,
loc_id,
data,
dates,
loc_descr=None,
lon=None,
lat=None,
alt=None,
fill_values=None,
attributes=None,
dates_direct=False):
"""
Write time series data; if the location does not exist yet, it is
added to the file. For this data format it is assumed that in each
write/append cycle the same amount of data is added.
Parameters
----------
loc_id : int
Location id.
data : dict
Dictionary with variable names as keys and
numpy.ndarrays as values.
dates: numpy.ndarray
Array of datetime objects.
attributes : dict, optional
Dictionary of attributes that should be added to the netCDF
variables. Can also be a dict of dicts with one entry per
variable name as in the data dict.
dates_direct : boolean
If True, the dates are already converted into floating
point numbers of the correct magnitude.
"""
try:
idx = self._get_loc_id_index(loc_id)
except IOError:
_ = self._add_location(loc_id, lon, lat, alt, loc_descr)
idx = self._get_loc_id_index(loc_id)
# find out if attributes is a dict to be used for all variables or if
# there is a dictionary of attributes for each variable
unique_attr = False
if attributes is not None:
if sorted(data.keys()) == sorted(attributes.keys()):
unique_attr = True
for key in data:
if data[key].size != dates.size:
raise IOError(
f"Timestamps and dataset {key} must have the same size")
overlap_indexes = self.get_time_variable_overlap(dates)
if len(dates) != len(overlap_indexes):
self.extend_time(dates, direct=dates_direct)
self.length_before_extend = overlap_indexes[-1]
else:
self.length_before_extend = 0
for key in data:
internal_attributes = {"name": key, "coordinates": "lat lon alt"}
if isinstance(fill_values, dict):
internal_attributes["_FillValue"] = fill_values[key]
if attributes is not None:
if unique_attr:
variable_attributes = attributes[key]
else:
variable_attributes = attributes
internal_attributes.update(variable_attributes)
if self.unlim_chunksize is None:
chunksizes = None
else:
chunksizes = [self.n_loc, self.unlim_chunksize[0]]
self.write_var(key,
data=None,
dim=(self.loc_dim_name, self.obs_dim_name),
attr=internal_attributes,
dtype=data[key].dtype,
chunksizes=chunksizes)
if self.write_offset is None:
# find start of elements that are not yet filled with values
_slice_new = slice(self.length_before_extend, None, None)
masked = \
np.where(
self.dataset.variables[key][idx, _slice_new].mask)[0]
# all indexes already filled
if len(masked) == 0:
raise IOError("No free data slots available")
else:
self.write_offset = np.min(
masked) + self.length_before_extend
_slice = slice(self.write_offset, None, None)
# has to be reshaped to 2 dimensions because it is written
# into a 2d variable; otherwise the netCDF library gets
# confused, which might be a bug in netCDF
self.dataset.variables[key][idx, _slice] = \
data[key].reshape(1, data[key].size)
class ContiguousRaggedTs(DatasetTs):
"""
Class that represents a contiguous ragged array representation of
time series according to NetCDF CF-conventions 1.6.
Parameters
----------
filename : string
Filename of the netCDF file. If it already exists, it will be opened
read-only unless the append keyword is set. If the overwrite
keyword is set, the file will be overwritten.
n_loc : int, optional
Number of locations for which this netCDF file contains time series;
only required for a new file.
n_obs : int, optional
Total number of observations that will be saved into this netCDF
file; only required for a new file.
obs_loc_lut : string, optional
variable name in the netCDF file that contains the lookup between
observations and locations
loc_dim_name : string, optional
name of the location dimension
obs_dim_name : string, optional
name of the observations dimension
loc_ids_name : string, optional
name of variable that has the location ids stored
loc_descr_name : string, optional
name of variable that has additional location information
stored
time_units : string, optional
units the time axis is given in.
Default: "days since 1900-01-01 00:00:00"
time_var : string, optional
name of time variable
Default: time
lat_var : string, optional
name of latitude variable
Default: lat
lon_var : string, optional
name of longitude variable
Default: lon
alt_var : string, optional
name of altitude variable
Default: alt
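Examples
--------
A sketch of writing one time series into a new file; the file name
and the variable name "sm" are only illustrative:

>>> import numpy as np
>>> from datetime import datetime
>>> dates = np.array([datetime(2020, 1, d) for d in (1, 2, 3)])
>>> nc = ContiguousRaggedTs("crts.nc", n_loc=2, n_obs=5, mode="w")
>>> nc.write(1, {"sm": np.array([0.1, 0.2, 0.3])}, dates,
...          lon=14.0, lat=45.0)
>>> nc.close()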
"""
def __init__(self,
filename,
n_loc=None,
n_obs=None,
obs_loc_lut="row_size",
obs_dim_name="obs",
**kwargs):
self.n_obs = n_obs
self.obs_loc_lut = obs_loc_lut
super(ContiguousRaggedTs, self).__init__(filename,
n_loc=n_loc,
obs_dim_name=obs_dim_name,
**kwargs)
if self.mode == "w":
self._init_dimensions()
self._init_lookup()
self._init_location_variables()
self._init_location_id_and_time()
self.global_attr["featureType"] = "timeSeries"
self.constant_dates = False
def _init_dimensions(self):
"""
Initializes the dimensions.
"""
self.create_dim(self.loc_dim_name, self.n_loc)
self.create_dim(self.obs_dim_name, self.n_obs)
def _init_lookup(self):
"""
Initializes variables for the lookup between locations and entries in
the time series.
"""
attr = {
"long_name": "number of observations at this location",
"sample_dimension": self.obs_dim_name
}
self.write_var(self.obs_loc_lut,
data=None,
dim=self.loc_dim_name,
dtype=np.int64,
attr=attr,
chunksizes=self.unlim_chunksize)
def _get_index_of_ts(self, loc_id):
"""
Get slice object for time series at location loc_id.
Parameters
----------
loc_id : int
Location id.
Returns
-------
slice_obj : slice
Slice object with start and end of time series.
Raises
------
IOError
If location id could not be found.
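Notes
-----
The slice is computed from the cumulative sum of the lookup
variable. As a worked example, with row_size = [3, 2, 4] the time
series of the second location occupies slice(3, 5) along the
observation dimension.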
"""
try:
loc_id_index = self._get_loc_id_index(loc_id)
except IOError:
raise IOError(
f"Index of time series for location id #{loc_id} not found")
if self.read_bulk and self.obs_loc_lut not in self.variables:
self.variables[self.obs_loc_lut] = np.array(
self.dataset.variables[self.obs_loc_lut][:])
start = np.sum(self.variables[self.obs_loc_lut][:loc_id_index])
end = np.sum(self.variables[self.obs_loc_lut][:loc_id_index + 1])
return slice(start, end)
def _get_loc_ix_from_obs_ix(self, obs_ix):
"""
Get location index from observation index.
Parameters
----------
obs_ix : int
Observation index.
Returns
-------
loc_id_index : int
Location id index.
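Notes
-----
The observation index is binned into the cumulative row sizes. As a
worked example, with row_size = [3, 2, 4] the bins are [0, 3, 5, 9];
np.digitize(4, bins) returns 2, so the location index is 1.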
"""
bins = np.hstack((0, np.cumsum(self.variables[self.obs_loc_lut])))
loc_id_index = np.digitize(obs_ix, bins) - 1
return loc_id_index
def read_time(self, loc_id):
"""
Read the time stamps for the given location id. In this case it
works like a normal time series variable.
Returns
-------
time_var : numpy.ndarray
Time stamps of the time series as floating point numbers.
"""
return self._read_var_ts(loc_id, self.time_var)
def write(self,
loc_id,
data,
dates,
loc_descr=None,
lon=None,
lat=None,
alt=None,
fill_values=None,
attributes=None,
dates_direct=False):
"""
Write time series data; if the location does not exist yet, it is
added to the file.
Parameters
----------
loc_id : int
Location id.
data : dict
Dictionary with variable names as keys and
numpy.ndarrays as values.
dates: numpy.array
Array of datetime objects.
attributes : dict, optional
Dictionary of attributes that should be added to the netCDF
variables. Can also be a dict of dicts with one entry per
variable name as in the data dict.
dates_direct : boolean
If True, the dates are already converted into floating
point numbers of the correct magnitude.
"""
try:
idx = self._get_loc_id_index(loc_id)
except IOError:
idx = self._add_location(loc_id, lon, lat, alt, loc_descr)
# find out if attributes is a dict to be used for all variables or if
# there is a dictionary of attributes for each variable
unique_attr = False
if attributes is not None:
if sorted(data.keys()) == sorted(attributes.keys()):
unique_attr = True
for key in data:
if data[key].size != dates.size:
raise IOError(
f"Timestamps and dataset {key} must have the same size")
# add number of new elements to index_var
self.dataset.variables[self.obs_loc_lut][idx] = dates.size
index = self._get_index_of_ts(loc_id)
for key in data:
internal_attributes = {
"name": key,
"coordinates": "time lat lon alt"
}
if isinstance(fill_values, dict):
internal_attributes["_FillValue"] = fill_values[key]
if attributes is not None:
if unique_attr:
variable_attributes = attributes[key]
else:
variable_attributes = attributes
internal_attributes.update(variable_attributes)
self.write_var(key,
data=None,
dim=self.obs_dim_name,
attr=internal_attributes,
dtype=data[key].dtype,
chunksizes=self.unlim_chunksize)
self.dataset.variables[key][index] = data[key]
if dates_direct:
self.dataset.variables[self.time_var][index] = dates
else:
units = self.dataset.variables[self.time_var].units
self.dataset.variables[self.time_var][index] = \
netCDF4.date2num(dates, units=units, calendar="standard")
class IndexedRaggedTs(DatasetTs):
"""
Class that represents an indexed ragged array representation of time
series according to NetCDF CF-conventions 1.6.
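Notes
-----
In the indexed ragged layout each observation stores the index of
the location it belongs to in the lookup variable (default
"locationIndex"), so time series of different locations can be
appended in interleaved order. For example, locationIndex =
[0, 0, 1, 0] assigns observations 0, 1 and 3 to the first location
and observation 2 to the second.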
"""
def __init__(self,
filename,
n_loc=None,
obs_loc_lut="locationIndex",
**kwargs):
self.obs_loc_lut = obs_loc_lut
super(IndexedRaggedTs, self).__init__(filename, n_loc=n_loc, **kwargs)
if self.mode == "w":
self._init_dimensions()
self._init_lookup()
self._init_location_variables()
self._init_location_id_and_time()
self.global_attr["featureType"] = "timeSeries"
self.not_timeseries.append(self.obs_loc_lut)
self.constant_dates = False
def _init_dimensions(self):
"""
Initializes the dimensions.
"""
self.create_dim(self.loc_dim_name, self.n_loc)
self.create_dim(self.obs_dim_name, None)
def _init_lookup(self):
"""
Initializes variables for the lookup between locations and entries
in the time series.
"""
attr = {
"long_name": "which location this observation is for",
"instance_dimension": self.loc_dim_name
}
self.write_var(self.obs_loc_lut,
data=None,
dim=self.obs_dim_name,
dtype=np.int64,
attr=attr,
chunksizes=self.unlim_chunksize)
def _get_index_of_ts(self, loc_id):
"""
Get the indexes of the time series for a location id.
Parameters
----------
loc_id : int
Location id.
Returns
-------
index : numpy.ndarray
Indexes of the observations belonging to the location id.
Raises
------
IOError
If location id could not be found.
"""
try:
loc_ix = self._get_loc_id_index(loc_id)
except IOError:
msg = f"Time series for Location #{loc_id.__str__()} not found."
raise IOError(msg)
if self.read_bulk and self.obs_loc_lut not in self.variables:
self.variables[self.obs_loc_lut] = self.dataset.variables[
self.obs_loc_lut][:]
index = np.where(self.variables[self.obs_loc_lut] == loc_ix)[0]
if len(index) == 0:
msg = f"Time series for Location #{loc_id.__str__()} not found."
raise IOError(msg)
return index
def _get_loc_ix_from_obs_ix(self, obs_ix):
"""
Get location index from observation index.
Parameters
----------
obs_ix : int
Observation index.
Returns
-------
loc_ix : int
Location index.
"""
return self.variables[self.obs_loc_lut][obs_ix]
def read_time(self, loc_id):
"""
Read the time stamps for the given location id. In this case it
works like a normal time series variable.
Returns
-------
time_var : numpy.ndarray
Time stamps of the time series as floating point numbers.
"""
return self._read_var_ts(loc_id, self.time_var)
def write(self,
loc_id,
data,
dates,
loc_descr=None,
lon=None,
lat=None,
alt=None,
fill_values=None,
attributes=None,
dates_direct=False):
"""
Write time series data; if the location does not exist yet, it is added to the file.
Parameters
----------
loc_id : int or numpy.ndarray
location id, if it is an array the location ids have to match the
data in the data dictionary and in the dates array. In this way data for more than
one point can be written into the file at once.
data : dict or numpy.recarray
dictionary with variable names as keys and numpy.arrays as values
dates: numpy.array
array of datetime objects
attributes : dict, optional
Dictionary of attributes that should be added to the netCDF
variables. Can also be a dict of dicts with one entry per
variable name as in the data dict.
dates_direct : boolean
If True, the dates are already converted into floating
point numbers of the correct magnitude.
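Examples
--------
A sketch writing observations for two locations at once, assuming a
file ``nc`` opened for writing; names are illustrative:

>>> import numpy as np
>>> from datetime import datetime
>>> loc_id = np.array([1, 1, 2])
>>> dates = np.array([datetime(2020, 1, 1), datetime(2020, 1, 2),
...                   datetime(2020, 1, 1)])
>>> nc.write(loc_id, {"sm": np.array([0.1, 0.2, 0.3])}, dates,
...          lon=np.array([14.0, 14.0, 15.0]),
...          lat=np.array([45.0, 45.0, 46.0]))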
"""
if isinstance(data, np.ndarray):
field_names = data.dtype.names
else:
field_names = data.keys()
# we always want to work with arrays
loc_id = np.atleast_1d(loc_id)
if len(loc_id) == 1:
loc_id = loc_id.repeat(dates.size)
(loc_ids_uniq, loc_ids_uniq_index,
loc_ids_uniq_lookup) = np.unique(loc_id,
return_index=True,
return_inverse=True)
lon = np.atleast_1d(lon)
lon_uniq = lon[loc_ids_uniq_index]
lat = np.atleast_1d(lat)
lat_uniq = lat[loc_ids_uniq_index]
if alt is not None:
alt = np.atleast_1d(alt)
alt_uniq = alt[loc_ids_uniq_index]
else:
alt_uniq = None
if loc_descr is not None:
loc_descr = np.atleast_1d(loc_descr)
loc_descr_uniq = loc_descr[loc_ids_uniq_index]
else:
loc_descr_uniq = None
try:
idx = self._get_loc_id_index(loc_id)
except IOError:
idx = self._add_location(loc_ids_uniq, lon_uniq, lat_uniq,
alt_uniq, loc_descr_uniq)
idx = self._get_loc_id_index(loc_id)
# find out if attributes is a dict to be used for all variables or if
# there is a dictionary of attributes for each variable
unique_attr = False
if attributes is not None:
if sorted(field_names) == sorted(attributes.keys()):
unique_attr = True
for key in field_names:
if data[key].size != dates.size:
raise DatasetError(
f"timestamps and dataset {key} must have the same size")
# append the location index of each new observation to the lookup
indices = np.atleast_1d(idx)
self.append_var(self.obs_loc_lut, indices)
index = np.arange(len(self.variables[
self.obs_loc_lut]))[len(self.variables[self.obs_loc_lut]) -
len(indices):]
for key in field_names:
internal_attributes = {
"name": key,
"coordinates": "time lat lon alt"
}
if isinstance(fill_values, dict):
internal_attributes["_FillValue"] = fill_values[key]
if attributes is not None:
if unique_attr:
variable_attributes = attributes[key]
else:
variable_attributes = attributes
internal_attributes.update(variable_attributes)
# does nothing if variable exists already
self.write_var(key,
data=None,
dim=self.obs_dim_name,
attr=internal_attributes,
dtype=data[key].dtype,
chunksizes=self.unlim_chunksize)
self.dataset.variables[key][index] = data[key]
if dates_direct:
self.dataset.variables[self.time_var][index] = dates
else:
units = self.dataset.variables[self.time_var].units
self.dataset.variables[self.time_var][index] = \
netCDF4.date2num(dates, units=units, calendar="standard")
class GriddedNcTs(GriddedTsBase):
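"""
Base class for gridded NetCDF time series datasets, wrapping one of
the time series I/O classes (e.g. OrthoMultiTs) in a pygeobase
GriddedTsBase. Additional keyword arguments handled here are
parameters, offsets, scale_factors, dtypes, autoscale and automask;
everything else is passed on to GriddedTsBase.
"""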
def __init__(self, *args, **kwargs):
self.parameters = kwargs.pop("parameters", None)
self.offsets = kwargs.pop("offsets", None)
self.scale_factors = kwargs.pop("scale_factors", None)
self.dtypes = kwargs.pop("dtypes", None)
self.autoscale = kwargs.pop("autoscale", True)
self.automask = kwargs.pop("automask", True)
super(GriddedNcTs, self).__init__(*args, **kwargs)
self.ioclass_kws.update({
"autoscale": self.autoscale,
"automask": self.automask
})
self.dates = None
if self.ioclass == OrthoMultiTs:
self.read_dates = False
else:
self.read_dates = True
def _open(self, gp):
"""
Open file.
Parameters
----------
gp : int
Grid point.
Returns
-------
success : boolean
Flag if opening the file was successful.
"""
success = True
cell = self.grid.gpi2cell(gp)
filename = os.path.join(self.path,
f"{self.fn_format.format(cell)}.nc")
if self.mode == "r":
if self.previous_cell != cell:
self.close()
try:
self.fid = self.ioclass(filename,
mode=self.mode,
**self.ioclass_kws)
self.previous_cell = cell
except (IOError, RuntimeError):
success = False
self.fid = None
warnings.warn(f"I/O error {filename}", RuntimeWarning)
if self.mode in ["w", "a"]:
if self.previous_cell != cell:
self.close()
try:
if self.mode == "w":
if "n_loc" not in self.ioclass_kws:
n_loc = self.grid.grid_points_for_cell(
cell)[0].size
self.ioclass_kws["n_loc"] = n_loc
self.fid = self.ioclass(filename,
mode=self.mode,
**self.ioclass_kws)
self.previous_cell = cell
self.ioclass_kws.pop("n_loc", None)
except (IOError, RuntimeError):
success = False
self.fid = None
warnings.warn(f"I/O error {filename}", RuntimeWarning)
return success
def _read_gp(self, gpi, period=None, **kwargs):
"""
Read data for a given grid point index. Additional keyword
arguments are passed to ioclass.read.
Parameters
----------
gpi : int
Grid point index.
period : list, optional
2 element list containing datetimes [start, end].
Returns
-------
ts : pandas.DataFrame
Time series data.
"""
if self.mode in ["w", "a"]:
raise IOError("trying to read file is in write/append mode")
if not self._open(gpi):
return None
if self.parameters is None:
data = self.fid.read_all(gpi, **kwargs)
else:
data = self.fid.read(self.parameters, gpi, **kwargs)
if self.dates is None or self.read_dates:
if "dates_direct" in kwargs:
self.dates = self.fid.read_time(gpi)
else:
self.dates = self.fid.read_dates(gpi)
time = self.dates
# remove time column from dataframe, only index should contain time
try:
data.pop("time")
except KeyError:
# if the time value is not found then do nothing
pass
ts = pd.DataFrame(data, index=time)
if period is not None:
ts = ts[period[0]:period[1]]
if self.dtypes is not None:
for dtype_column in self.dtypes:
if dtype_column in ts.columns:
try:
ts[dtype_column] = ts[dtype_column].astype(
self.dtypes[dtype_column])
except ValueError:
raise ValueError(
"Dtype conversion did not work. Try turning off automatic masking."
)
if self.scale_factors is not None:
for scale_column in self.scale_factors:
if scale_column in ts.columns:
ts[scale_column] *= self.scale_factors[scale_column]
if self.offsets is not None:
for offset_column in self.offsets:
if offset_column in ts.columns:
ts[offset_column] += self.offsets[offset_column]
return ts
def _write_gp(self, gp, data, **kwargs):
"""
Write data for a given grid point.
Parameters
----------
gp : int
Grid point.
data : pandas.DataFrame
Time series data to write. Index has to be a pandas.DatetimeIndex.
"""
if self.mode == "r":
raise IOError("trying to write but file is in read mode")
self._open(gp)
lon, lat = self.grid.gpi2lonlat(gp)
ds = data.to_dict("series")
for key in ds:
ds[key] = ds[key].values
self.fid.write(gp,
ds,
data.index.to_pydatetime(),
lon=lon,
lat=lat,
**kwargs)
class GriddedNcOrthoMultiTs(GriddedNcTs):
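"""
Gridded NetCDF time series using the OrthoMultiTs I/O class for each
cell file.

Examples
--------
A sketch, assuming a pygeogrids cell grid; the grid points and the
data path are only illustrative:

>>> from pygeogrids.grids import BasicGrid
>>> grid = BasicGrid([14.0, 15.0], [45.0, 46.0]).to_cell_grid(5.0)
>>> ds = GriddedNcOrthoMultiTs("/path/to/data", grid)
>>> ts = ds.read(14.0, 45.0)  # pandas.DataFrame indexed by time
"""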
def __init__(self, *args, **kwargs):
kwargs["ioclass"] = OrthoMultiTs
super(GriddedNcOrthoMultiTs, self).__init__(*args, **kwargs)
class GriddedNcContiguousRaggedTs(GriddedNcTs):
def __init__(self, *args, **kwargs):
kwargs["ioclass"] = ContiguousRaggedTs
super(GriddedNcContiguousRaggedTs, self).__init__(*args, **kwargs)
class GriddedNcIndexedRaggedTs(GriddedNcTs):
def __init__(self, *args, **kwargs):
kwargs["ioclass"] = IndexedRaggedTs
super(GriddedNcIndexedRaggedTs, self).__init__(*args, **kwargs)
def write_cell(self, cell, gpi, data, datefield):
"""
Write complete data set into cell file.
Parameters
----------
cell : int
Cell number.
gpi : numpy.ndarray
Location ids.
data : dict or numpy.recarray
Dictionary with variable names as keys and numpy.arrays as values.
datefield : string
Field in the data dict that contains dates in the correct format.
"""
if not isinstance(self.grid, CellGrid):
raise TypeError("Associated grid is not of type pygeogrids.CellGrid.")
if self.mode != "w":
raise ValueError("File not opened in write mode.")
tmp_cell = np.unique(self.grid.arrcell[gpi])
if tmp_cell.size > 1 or tmp_cell != cell:
raise ValueError("GPIs do not correspond to given cell.")
lons = self.grid.arrlon[gpi]
lats = self.grid.arrlat[gpi]
filename = os.path.join(self.path,
f"{self.fn_format.format(cell)}.nc")
if os.path.isfile(filename):
mode = "a"
else:
mode = "w"
if self.previous_cell != cell:
self.close()
self.previous_cell = cell
if self.mode == "w":
if "n_loc" not in self.ioclass_kws:
n_loc = self.grid.grid_points_for_cell(cell)[0].size
self.ioclass_kws["n_loc"] = n_loc
self.fid = self.ioclass(filename, mode=mode, **self.ioclass_kws)
self.ioclass_kws.pop("n_loc", None)
if not isinstance(data, dict):
data = {key: data[key] for key in data.dtype.names}
dates = data[datefield]
del data[datefield]
self.fid.write(gpi, data, dates, lon=lons, lat=lats, dates_direct=True)
self.close()