Files
lmwsip/lmwsip.py
2019-02-12 22:00:49 +01:00

218 lines
6.1 KiB
Python

"""Module to support the LmwSip class
See: LmwSip"""
import socket
import ssl
import time
import re
class LmwSip:
    """Client for the LMW Standard Interface Protocol (SIP).

    This class implements a connection to the Rijkswaterstaat Meetnet
    Water (LMW) with the Standard Interface Protocol using the
    Rijkswaterstaat Meetnet Infrastructuur (RMI).

    https://waterberichtgeving.rws.nl/water-en-weer/metingen

    Support for:
        ti
        cmd(wn, vw, as)
    """

    def __init__(self, user, password, host="sip-lmw.rws.nl", port=443,
                 meetnet="LMW", ssl=True, check_ssl=True):
        """LmwSip(user, password, [host], [port], [meetnet], [ssl], [check_ssl])

        user: Lmw user name
        password: Lmw password
        host(optional): Default sip-lmw.rws.nl
        port(optional): Default 443
        meetnet(optional): Default LMW
        ssl(optional): Default True; wrap the connection in TLS
        check_ssl(optional): Default True; only toggles the hostname
            check of the TLS context

        Opens the connection and logs in. If the login fails the socket
        is closed again before the exception propagates.
        """
        self.user = user
        self.password = password
        self.host = host
        self.port = port
        self.meetnet = meetnet
        self.ssl = ssl
        self.check_ssl = check_ssl
        self.connect()
        try:
            self.login()
        except Exception:
            # The caller never receives the object when the constructor
            # raises, so nobody else could release the socket.
            self.close()
            raise

    def lasttime(self, parameter, now=None):
        """lasttime(parameter, [now])

        Calculate the time stamp of the last measurement that should be
        available for the given parameter.

        parameter: <lmw parameter (e.g. H10)>
        now(optional): seconds since the epoch; default time.time()

        Returns a dictionary with the keys "day" (dd-mm-YYYY) and
        "time_of_day" (HH:MM), expressed in GMT + 1.
        """
        #
        # Find the last valid measurement window.
        #
        # 10 minute parameters (name contains "10"):
        #   The measurement of 12:00 is available at 12:05:30.
        #   Before 12:05:30 we should use 11:50:00, so at 12:05:29 we
        #   subtract 15:29 from the time.
        #
        # Other parameters (e.g. H1) use one minute windows that become
        # available after 30 seconds.
        #
        if now is None:
            now = time.time()
        if "10" in parameter:
            period, delay = 600, 330
        else:
            period, delay = 60, 30
        elapsed = now % period
        window_start = now - elapsed
        if elapsed < delay:
            # The current window is not published yet: use the previous one.
            window_start -= period
        #
        # The stamp is formatted with gmtime (GMT), so one hour is added
        # because the result is expressed in GMT + 1 (MET).
        #
        stamp = time.gmtime(window_start + 3600)
        return {"day": time.strftime("%d-%m-%Y", stamp),
                "time_of_day": time.strftime("%H:%M", stamp)}

    def connect(self):
        """connect()

        Connects to lmw with tcp using the values of the object creation.
        When ssl is enabled the tcp socket is wrapped in a TLS socket; the
        tcp socket is closed again if setting up TLS fails.
        """
        self._tcp = socket.create_connection((self.host, self.port))
        if self.ssl:
            try:
                self._context = ssl.create_default_context()
                self._context.check_hostname = self.check_ssl
                self._ssl = self._context.wrap_socket(
                    self._tcp, server_hostname=self.host)
            except Exception:
                self._tcp.close()
                raise
            self._socket = self._ssl
        else:
            self._socket = self._tcp

    def close(self):
        """close()

        Close the socket (if any). Does not send a logout command.
        """
        sock = getattr(self, "_socket", None)
        if sock is not None:
            sock.close()

    def send(self, sipcmd):
        """send(sipcmd)

        Send a sip command (a string) to the server, encoded as ascii.
        """
        self._socket.sendall(sipcmd.encode('ascii'))

    def recv(self):
        """recv()

        Receive an answer from the sip server.

        Reads until a chunk containing a carriage return arrives or the
        peer closes the connection.

        Returns the raw bytes (empty when the connection was closed
        before anything was received).
        """
        # NOTE(review): assumes every answer is terminated with "\r", as
        # the [2:-1] slicing in ti() and cmd() suggests - confirm against
        # the protocol specification.
        chunks = []
        while True:
            # An explicit buffer size is mandatory for plain sockets.
            chunk = self._socket.recv(4096)
            if not chunk:
                break
            chunks.append(chunk)
            if b"\r" in chunk:
                break
        return b"".join(chunks)

    def _exchange(self, sipcmd):
        """Send one command and return the decoded (ascii) answer."""
        self.send(sipcmd)
        return self.recv().decode('ascii')

    def login(self):
        """login()

        Login lmw using the object creation user, password.
        Raises a LmwLoginFailure exception on failure (any answer that
        does not start with "!", including an empty answer).
        """
        reply = self._exchange("LI " + self.user + "," + self.password + "\r")
        if not reply.startswith('!'):
            raise LmwLoginFailure(self.user + ":" + reply)

    def ti(self):
        """ti()

        Request the time from lmw and returns the string.
        Raises a LmwCmdWarn on failure.
        """
        reply = self._exchange("TI " + self.meetnet + "\r")
        if not reply.startswith('!'):
            raise LmwCmdWarn("TI " + self.meetnet + ":" + reply)
        # Drop the two leading status characters and the trailing terminator.
        return reply[2:-1]

    def cmd(self, process, location, parameter, time_delta, day,
            time_of_day, cmd_type="DATA"):
        """cmd(process, location, parameter, time_delta, day, time_of_day)

        Send a cmd to LMW and returns the lmw string

        process: <WN|VW|AS>
        location: <lmw location (e.g. HOEK)>
        parameter: <lmw parameter (e.g. H10)>
        time_delta: <Time window (max 23:59, e.g. +01:00)>
        day: <Date>
        time_of_day: <Time>
        cmd_type: [DATA|DATB|OORS|OORB|""] (not sent for process AS)

        Example:
            lmw.cmd("WN", "HOEK", "H10", "+01:00", "13-08-2018", "16:00")

        Returns:
            The LMW answer string
        Raises a LmwCmdWarn when the answer does not start with "!".
        """
        fields = [self.meetnet, location, parameter, time_delta, day,
                  time_of_day]
        if process != "AS":
            fields.append(cmd_type)
        reply = self._exchange(process + " " + ",".join(fields) + "\r")
        if not reply.startswith('!'):
            raise LmwCmdWarn(reply)
        # Drop the two leading status characters and the trailing terminator.
        return reply[2:-1]

    def value(self, process, location, parameter, day=None,
              time_of_day=None):
        """value(process, location, parameter, [day], [time_of_day])

        Parameters:
            process: <WN|VW|AS>
            location: <lmw location (e.g. HOEK)>
            parameter: <lmw parameter (e.g. H10)>
            day: [date; default taken from lasttime(parameter)]
            time_of_day: [time; default taken from lasttime(parameter)]

        The default returns the last value.

        Example:
            lmw.value("WN", "HOEK", "H10")

        Returns a single string value; the empty string when the answer
        is one of the place holders "99999" or "-999999999".
        """
        if day is None or time_of_day is None:
            last = self.lasttime(parameter)
            if day is None:
                day = last["day"]
            if time_of_day is None:
                time_of_day = last["time_of_day"]
        res = self.cmd(process, location, parameter, "+00:00", day,
                       time_of_day, "DATA")
        # Keep only the part in front of the "/" separator.
        value = re.sub(r"/.*$", "", res)
        if value in ("99999", "-999999999"):
            value = ""
        #
        # We should check the "kwaliteit" (quality)
        #
        return value

    def logout(self):
        """logout()

        Logs off by sending the LO command. The socket stays open; call
        close() to release it.
        """
        self.send("LO\r")
class LmwLoginFailure(Exception):
    """Exception raised by LmwSip when the login is rejected.

    LmwSip.login() raises it with the message "<user>:<server answer>".
    """
class LmwCmdWarn(Warning):
    """Exception raised by LmwSip when a command is not answered with "!".

    Raised by LmwSip.ti() and LmwSip.cmd(); the message carries the
    server answer.
    """