"""Module to connect to the dd-api-oper See: https://digitaledeltaorg.github.io/dd-oper.v201.html """ import logging import requests from datetime import datetime, timedelta class ddApiOper: """ Class that connects to the digitale delta api with a client certificate. Methods for retrieving: - The list of locations - The list of quantitiess - The quantities of a location - Values for a given process, location and quantity """ RWS_DD_API = "https://ddapi.rws.nl/dd-oper/2.0" def __init__(self, certfile=None, certkey=None, url=RWS_DD_API, checkssl=True): self.certfile = certfile self.certkey = certkey self.url = url self.checkssl = checkssl logging.debug("ddApiOper.__init__(%s)" % url) def get(self, url): logging.debug("ddApiOper.get(%s)" % url) try: req = requests.get(url, cert=(self.certfile, self.certkey)) except Exception as e: logging.exception("ddApiOper.get error: %s" % e) raise(e) if (req.status_code != 200): raise ddApiOperHttpError(req.status_code) return(req.json()) def locations(self): return(ddApiLocation(self.get(self.url + "/locations")["results"])) def quantities(self): return(ddApiQuantitie(self.get(self.url + "/quantities")["results"])) def location_quantities(self, location): return(ddApiQuantitie(self.get(self.url + "/locations/%s/quantities" % location)["results"])) def values(self, location, quantitie, process="measurement", starttime=None, endtime=None, intervalLength="10min", aspectset="minimum"): if not endtime: endtime = datetime.utcnow() if not starttime: starttime = endtime - timedelta(days=1) if type(starttime) == datetime: starttime = starttime.isoformat() + "Z" if type(endtime) == datetime: endtime = endtime.isoformat() + "Z" logging.debug("ddApiOper.values(%s, %s, %s, %s, %s, %s, %s)" % ( location, quantitie, process, starttime, endtime, intervalLength, aspectset)) return(ddApiValues(self.get(( "%s" "/locations/%s/quantities/%s" "/timeseries?&startTime=%s&endTime=%s" "&process=%s&intervalLength=%s&aspectSet=%s" ) % ( self.url, location, quantitie, starttime, endtime, process, intervalLength, aspectset )))) class ddApiResult(object): def __init__(self, data): self.data = data class ddApiLocation(ddApiResult): def __init__(self, data): super().__init__(data) self.createIndex() def createIndex(self): self.__index = {} i = 0 for loc in self.data: self.__index[loc["properties"]["locationName"]] = i i+=1 def locationNames(self): return(self.__index.keys()) def locationDetail(self, locationName): return(self.data[self.__index[locationName]]) def displayNameGlobal(self, locationName): return(self.locationDetail(locationName)["properties"]["displayNameGlobal"]) def coordinate(self, locationName): return(self.locationDetail(locationName)["geometry"]["coordinates"]) class ddApiQuantitie(ddApiResult): def quantities(self): return(self.data) class ddApiValues(ddApiResult): def provider(self): return(self.data["provider"]) def result(self, index=0): return(self.data["results"][0]) def aspectSet(self): return(self.result()["observationType"]["aspectSet"]["aspects"]) def location(self): return(self.result()["location"]) def source(self): return(self.result()["source"]) def values(self, index=0): # pprint(v1.data["results"][0]["events"][0]["aspects"][0]["points"][0]["value"]) for e in self.result()["events"]: yield (e["timeStamp"], e["aspects"][index]["points"][0]["value"]) def sip(self, index=0): r = "!" sep = " " for e in self.result()["events"]: v = e["aspects"][index]["points"][0]["value"] q = e["aspects"][index]["points"][0]["quality"] r += "%s%i/%i" % (sep, v, q) sep = ";" return(r) class ddApiOperHttpError(Exception): def __init__(self, code): self.code = code if __name__ == "__main__": pass