Files
ddOperApi/ddapioper.py
2020-02-06 08:22:20 +01:00

152 lines
4.7 KiB
Python

"""Module to connect to the dd-api-oper
See:
https://digitaledeltaorg.github.io/dd-oper.v201.html
"""
import logging
import requests
from datetime import datetime, timedelta
class ddApiOper:
"""
Class that connects to the digitale delta api with a client certificate.
Methods for retrieving:
- The list of locations
- The list of quantitiess
- The quantities of a location
- Values for a given process, location and quantity
"""
RWS_DD_API = "https://ddapi.rws.nl/dd-oper/2.0"
def __init__(self, certfile=None, certkey=None, url=RWS_DD_API, checkssl=True):
self.certfile = certfile
self.certkey = certkey
self.url = url
self.checkssl = checkssl
logging.debug("ddApiOper.__init__(%s)" % url)
def get(self, url):
logging.debug("ddApiOper.get(%s)" % url)
try:
req = requests.get(url, cert=(self.certfile, self.certkey))
except Exception as e:
logging.exception("ddApiOper.get error: %s" % e)
raise(e)
if (req.status_code != 200):
raise ddApiOperHttpError(req.status_code)
return(req.json())
def locations(self):
return(ddApiLocation(self.get(self.url + "/locations")["results"]))
def quantities(self):
return(ddApiQuantitie(self.get(self.url + "/quantities")["results"]))
def location_quantities(self, location):
return(ddApiQuantitie(self.get(self.url + "/locations/%s/quantities" % location)["results"]))
def values(self, location, quantitie, process="measurement",
starttime=None, endtime=None,
intervalLength="10min", aspectset="minimum"):
if not endtime:
endtime = datetime.utcnow()
if not starttime:
starttime = endtime - timedelta(days=1)
if type(starttime) == datetime:
starttime = starttime.isoformat() + "Z"
if type(endtime) == datetime:
endtime = endtime.isoformat() + "Z"
logging.debug("ddApiOper.values(%s, %s, %s, %s, %s, %s, %s)" % (
location, quantitie, process, starttime, endtime,
intervalLength, aspectset))
return(ddApiValues(self.get((
"%s"
"/locations/%s/quantities/%s"
"/timeseries?&startTime=%s&endTime=%s"
"&process=%s&intervalLength=%s&aspectSet=%s"
) %
(
self.url,
location, quantitie,
starttime, endtime,
process,
intervalLength, aspectset
))))
class ddApiResult(object):
def __init__(self, data):
self.data = data
class ddApiLocation(ddApiResult):
def __init__(self, data):
super().__init__(data)
self.createIndex()
def createIndex(self):
self.__index = {}
i = 0
for loc in self.data:
self.__index[loc["properties"]["locationName"]] = i
i+=1
def locationNames(self):
return(self.__index.keys())
def locationDetail(self, locationName):
return(self.data[self.__index[locationName]])
def displayNameGlobal(self, locationName):
return(self.locationDetail(locationName)["properties"]["displayNameGlobal"])
def coordinate(self, locationName):
return(self.locationDetail(locationName)["geometry"]["coordinates"])
class ddApiQuantitie(ddApiResult):
def quantities(self):
return(self.data)
class ddApiValues(ddApiResult):
def provider(self):
return(self.data["provider"])
def result(self, index=0):
return(self.data["results"][0])
def aspectSet(self):
return(self.result()["observationType"]["aspectSet"]["aspects"])
def location(self):
return(self.result()["location"])
def source(self):
return(self.result()["source"])
def values(self, index=0):
# pprint(v1.data["results"][0]["events"][0]["aspects"][0]["points"][0]["value"])
for e in self.result()["events"]:
yield (e["timeStamp"], e["aspects"][index]["points"][0]["value"])
def sip(self, index=0):
r = "!"
sep = " "
for e in self.result()["events"]:
v = e["aspects"][index]["points"][0]["value"]
q = e["aspects"][index]["points"][0]["quality"]
r += "%s%i/%i" % (sep, v, q)
sep = ";"
return(r)
class ddApiOperHttpError(Exception):
def __init__(self, code):
self.code = code
if __name__ == "__main__":
pass