#!/usr/bin/python
"""HTTPS test server that mimics a small part of the DD-OPER 2.0 API."""

import json
from os import fork, kill
from time import sleep
from random import getrandbits
from tempfile import NamedTemporaryFile
from datetime import datetime, timedelta, timezone

from dateutil.parser import parse
from dateutil.tz import gettz
from OpenSSL import crypto, SSL
from flask import Flask, request


#
# TODO: move createCert so that it is also available in the module.
#       Then it can be used as a client certificate and tests can be run
#       with a client certificate.
#       The extra subclass can then be removed as well!
#
class testServer(Flask):
    """Flask application served over TLS with a throw-away self-signed certificate."""

    def run(self, *args, **kwargs):
        """Generate a certificate, write it to temp files and start the server.

        Positional and keyword arguments are forwarded to ``Flask.run``.
        ``ssl_context`` defaults to the freshly generated certificate/key
        pair unless the caller supplies one explicitly.
        """
        self.createCert()
        self.writeCert()
        kwargs.setdefault("ssl_context", (self.certFile.name, self.keyFile.name))
        super().run(*args, **kwargs)

    def createCert(self):
        """Create a self-signed RSA certificate and store PEM bytes on ``self``.

        Sets ``self.cert`` (certificate) and ``self.key`` (private key).
        """
        CN = "*"
        k = crypto.PKey()
        k.generate_key(crypto.TYPE_RSA, 2048)
        serialnumber = getrandbits(64)

        cert = crypto.X509()
        subject = cert.get_subject()
        subject.C = "TS"
        subject.L = "Test"
        subject.ST = "Test"
        subject.O = "Test"
        subject.OU = "Test"
        subject.CN = CN
        cert.set_serial_number(serialnumber)
        # Validity window: from now until 3600 seconds from now.
        cert.gmtime_adj_notBefore(0)
        cert.gmtime_adj_notAfter(3600)
        # Self-signed: issuer equals subject.
        cert.set_issuer(cert.get_subject())
        cert.add_extensions(
            [crypto.X509Extension(b'subjectAltName', False, b'DNS:*')]
        )
        cert.set_pubkey(k)
        cert.sign(k, 'sha512')

        self.cert = crypto.dump_certificate(crypto.FILETYPE_PEM, cert)
        self.key = crypto.dump_privatekey(crypto.FILETYPE_PEM, k)

    def writeCert(self):
        """Write ``self.cert`` and ``self.key`` to named temporary files.

        The file objects are kept on ``self`` (``certFile`` / ``keyFile``)
        because a NamedTemporaryFile is removed as soon as it is closed.
        """
        self.certFile = NamedTemporaryFile()
        self.certFile.write(self.cert)
        self.certFile.flush()
        self.keyFile = NamedTemporaryFile()
        self.keyFile.write(self.key)
        self.keyFile.flush()


app = testServer(__name__)

# Supported ``intervalLength`` values and their length in seconds.
_INTERVAL_SECONDS = {"10min": 600, "1min": 60}


def timeStr(dt):
    """Return *dt* as an ISO-8601 string with a trailing ``z``.

    Timezone-aware values are converted to UTC and rendered without an
    offset, so the result never contains both an offset and the ``z``
    suffix. Naive values are rendered unchanged.
    """
    if dt.utcoffset() is not None:
        dt = dt.astimezone(timezone.utc).replace(tzinfo=None)
    return "%sz" % dt.isoformat()


def provider(responseType):
    """Return the ``provider`` section shared by all responses."""
    return {
        "name": "testServerDDOper",
        "supportUrl": "https://git.marceln.org/marceln/ddOperApi",
        "responseType": responseType,
        # Use UTC so the "z" suffix added by timeStr is truthful.
        "responseTimestamp": timeStr(datetime.now(timezone.utc)),
    }


def geometryPoint(x, y):
    """Return a GeoJSON-style ``Point`` dict with coordinates ``[x, y]``."""
    return {
        "type": "Point",
        "coordinates": [x, y],
    }


def roundTime(startTime, intervalSec):
    """Round *startTime* up to the next multiple of *intervalSec* seconds.

    A value already on an interval boundary is returned unchanged. The
    result keeps the timezone (or naivety) of *startTime*, so it stays
    comparable with other datetimes of the same kind.
    """
    t = startTime.timestamp()
    if t % intervalSec == 0:
        return startTime
    # Floor division keeps the rounding correct for negative timestamps too.
    rounded = (t // intervalSec + 1) * intervalSec
    return datetime.fromtimestamp(rounded, tz=startTime.tzinfo)


def singleEvent(eventTime, intervalSec, location, quantity, aspectSet):
    """Return one dummy event centred on *eventTime*.

    ``startTime`` / ``endTime`` lie half an interval before / after
    *eventTime*. *location*, *quantity* and *aspectSet* are accepted but
    not used in the generated event.
    """
    halfInterval = timedelta(seconds=intervalSec / 2)
    return {
        "timeStamp": timeStr(eventTime),
        "startTime": timeStr(eventTime - halfInterval),
        "endTime": timeStr(eventTime + halfInterval),
        "quality": 10,
        "value": 0,
        "additionalInfo": 0,
    }


def events(location, quantity, aspectSet, intervalLength, startTime, endTime):
    """Return dummy events from *startTime* (rounded up) until *endTime*.

    *intervalLength* must be ``"10min"`` or ``"1min"``; *endTime* is
    exclusive.

    Raises:
        ValueError: if *intervalLength* is not a supported value.
    """
    try:
        intervalSec = _INTERVAL_SECONDS[intervalLength]
    except KeyError:
        raise ValueError(
            "unsupported intervalLength: %r" % (intervalLength,)
        ) from None

    step = timedelta(seconds=intervalSec)
    ret = []
    eventTime = roundTime(startTime, intervalSec)
    while eventTime < endTime:
        ret.append(
            singleEvent(eventTime, intervalSec, location, quantity, aspectSet)
        )
        eventTime = eventTime + step
    return ret


def jsonReturn(data):
    """Serialise *data* to a JSON string."""
    return json.dumps(data)


@app.route("/")
def root():
    """Return a fixed greeting."""
    return "Hello world!\n"


@app.route("/dd-oper/2.0/locations")
def locations():
    """Return the fixed catalogue of three test locations as JSON."""
    coordinates = [(0, 0), (1, 0), (0, 1)]
    ret = {
        "provider": provider("LocationListResponse"),
        "results": [
            {
                "type": "Feature",
                "geometry": geometryPoint(x, y),
                "properties": {
                    "locationName": "test%d" % idx,
                    "locationNameSpace": "TS.DDOPER",
                    "displayNameGlobal": "Test location %d" % idx,
                },
            }
            for idx, (x, y) in enumerate(coordinates, start=1)
        ],
        "_comment": "MKK-DL: Location catalogue",
    }
    return jsonReturn(ret)


@app.route("/dd-oper/2.0/quantities")
def quantities():
    """Return the fixed catalogue of test quantities as JSON."""
    ret = {
        "provider": provider("QuantityListResponse"),
        "results": ["time", "null"],
        "_comment": "MKK-DL: Quantities catalogue",
    }
    return jsonReturn(ret)


# The <location> URL variable supplies the view's ``location`` argument.
@app.route("/dd-oper/2.0/locations/<location>/quantities")
def locationsQuantities(location):
    """Placeholder: not implemented yet, always answers with status 404."""
    return ("TODO: locations: %s\n" % location, 404)

# The <quantity> URL variable supplies the view's ``quantity`` argument.
@app.route("/dd-oper/2.0/quantities/<quantity>")
def quantitiesLocations(quantity):
    """Placeholder: not implemented yet, always answers with status 404."""
    return ("TODO: quantities: %s\n" % quantity, 404)


# <location> and <quantity> supply the view's two arguments.
@app.route("/dd-oper/2.0/locations/<location>/quantities/<quantity>/timeseries")
def values(location, quantity):
    """Return a dummy time series for *location* / *quantity* as JSON.

    Query parameters:
        startTime, endTime: required, parsed with ``dateutil.parser.parse``.
        process: defaults to ``measurement``.
        intervalLength: defaults to ``10min``.
        aspectSet: defaults to ``minimum``.

    Answers ``("parseerror time", 404)`` when either time is missing or
    cannot be parsed.
    """
    startTime = request.args.get('startTime', type=str)
    endTime = request.args.get('endTime', type=str)
    process = request.args.get('process', default='measurement', type=str)
    intervalLength = request.args.get(
        'intervalLength', default='10min', type=str
    )
    aspectSet = request.args.get('aspectSet', default="minimum", type=str)

    try:
        startTime = parse(startTime)
        endTime = parse(endTime)
    except (TypeError, ValueError, OverflowError):
        # TypeError: argument missing (None); ValueError/OverflowError:
        # the string could not be parsed as a date.
        return ("parseerror time", 404)

    ret = {
        "provider": provider("Timeseries"),
        "results": [{
            "source": {
                "process": process,
                "institution": {
                    "name": "testServerDDOper"
                },
                "location": {
                    "geometry": {
                        "type": "Point",
                        "coordinates": [0, 0]
                    }
                },
                "type": "Feature",
                "properties": {
                    "locationName": location,
                    "locationNameSpace": "TS.DDOPER",
                    "crsName": "WGS84",
                    "displayNameGlobal": "test 1"
                }
            },
            "startTime": timeStr(startTime),
            "endTime": timeStr(endTime),
            "events": events(
                location, quantity, aspectSet, intervalLength,
                startTime, endTime
            ),
            "observationType": {
                "quantityName": quantity,
                # NOTE(review): the name is fixed to "minimum" and does not
                # follow the aspectSet query parameter - confirm intent.
                "aspectSet": {
                    "name": "minimum",
                    "aspects": [
                        {
                            "name": "Average",
                            "unit": ""
                        }
                    ]
                }
            }
        }]
    }
    return jsonReturn(ret)


def forkTestServer():
    """Fork a child process that runs the test server.

    In the parent, waits one second and then returns the child's pid.
    The child never returns from this function: it ends with ``os._exit``
    once ``app.run()`` finishes or fails, so it cannot continue executing
    the caller's code (for example a test suite).
    """
    from os import _exit
    import traceback

    pid = fork()
    if pid == 0:
        exitCode = 1
        try:
            app.run()
            exitCode = 0
        except Exception:
            # Report the failure before the hard exit below discards it.
            traceback.print_exc()
        finally:
            _exit(exitCode)
    # Parent: give the server a moment to come up.
    sleep(1)
    return pid


def stopTestServer(pid):
    """Send signal 15 (SIGTERM) to *pid*; errors are printed, not raised.

    Returns an empty tuple, as before.
    """
    try:
        kill(pid, 15)
        sleep(0.01)
    except Exception as e:
        print("ERROR kill: %s" % e)
    return ()


def main():
    """Run the test server in the foreground."""
    app.run()


if __name__ == '__main__':
    main()