Compare commits
1 Commits
ef778c3e0a
...
clearbuf
Author | SHA1 | Date | |
---|---|---|---|
2ed0f86fd4 |
@@ -1,21 +0,0 @@
|
||||
---
|
||||
name: build
|
||||
on:
|
||||
- push
|
||||
|
||||
jobs:
|
||||
build:
|
||||
runs-on: fedora-builder
|
||||
steps:
|
||||
- name: Check out repository code
|
||||
uses: actions/checkout@v3
|
||||
- name: Build package
|
||||
run: python3 -m build
|
||||
- name: run tox
|
||||
run: tox
|
||||
- name: Upload na pypi
|
||||
env:
|
||||
TWINE_USERNAME: ${{ secrets.PYPI_USER }}
|
||||
TWINE_PASSWORD: ${{ secrets.PYPI_PASSWORD }}
|
||||
if: github.ref_type == 'tag'
|
||||
run: python3 -m twine upload dist/*
|
11
.gitignore
vendored
11
.gitignore
vendored
@@ -1,10 +1,3 @@
|
||||
*.pyc
|
||||
tmp
|
||||
__pycache__
|
||||
build
|
||||
dist
|
||||
lmwsip_marceln.egg-info
|
||||
test/__pycache__
|
||||
lmwsip.egg-info
|
||||
*.swp
|
||||
.tox
|
||||
lmwsip-test
|
||||
*.sip
|
||||
|
20
LICENSE
20
LICENSE
@@ -1,20 +0,0 @@
|
||||
Copyright (c) 2021 Rijkswaterstaat
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
of this software and associated documentation files (the "Software"), to deal
|
||||
in the Software without restriction, including without limitation the rights
|
||||
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||
copies of the Software, and to permit persons to whom the Software is
|
||||
furnished to do so, subject to the following conditions:
|
||||
|
||||
The above copyright notice and this permission notice shall be included in all
|
||||
copies or substantial portions of the Software.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
||||
SOFTWARE.
|
||||
|
92
README.md
92
README.md
@@ -3,89 +3,59 @@
|
||||
|
||||
lmwsip is a python library for the lmw sip protocol.
|
||||
|
||||
## Package
|
||||
## Library
|
||||
|
||||
The lmwsip package contains the class LmwSip to connect to the
|
||||
[LMW](https://waterberichtgeving.rws.nl/water-en-weer/metingen/lmw-info)
|
||||
meetnet using de SIP protocol. The library contains documentation
|
||||
The lmwsip.py library contains the class LmwSip to connect to the
|
||||
LMW meetnet using de SIP protocol. The library contains documentation
|
||||
how to use it.
|
||||
|
||||
## Installing
|
||||
## Tools
|
||||
### lmwsip-test
|
||||
|
||||
Just install the package with 'pip':
|
||||
A small test program containing some functions to test the library.
|
||||
|
||||
```
|
||||
pip install lmwsip
|
||||
```
|
||||
### siprun
|
||||
|
||||
## Examples
|
||||
A prgram to run SIP command files.
|
||||
See "siprun -H" for usage information.
|
||||
|
||||
### Username password
|
||||
#### hoek-h10.sip
|
||||
|
||||
All examples contain "USER", "PASS".
|
||||
A sample sip command file to request the waterlevel at hoek van holland
|
||||
from the last hour
|
||||
|
||||
## Username password
|
||||
|
||||
All files contain "USER", "PASS".
|
||||
These values should be replaced by real credentials.
|
||||
Otherwise the connection fails.
|
||||
|
||||
### Use send (low level)
|
||||
## Examples
|
||||
|
||||
### Library
|
||||
|
||||
``` python
|
||||
from lmwsip import LmwSip
|
||||
|
||||
sip = LmwSip(ssl=True, host="sip-lmw.rws.nl", port=443)
|
||||
sip.send("LI user,pass\r")
|
||||
print("< [%s]" % (sip.recv().strip('\r')))
|
||||
sip.send("TI LMW\r")
|
||||
print("< [%s]" % (sip.recv().strip('\r')))
|
||||
sip.send("LO\r")
|
||||
print("< [%s]" % (sip.recv().strip('\r')))
|
||||
lmwsip = LmwSip(ssl=True, host="sip-lmw.ad.rws.nl", port=443)
|
||||
lmwsip.send("LI user,pass\r")
|
||||
print("< [%s]" % (lmwsip.recv().strip('\r')))
|
||||
lmwsip.send("TI LMW\r")
|
||||
print("< [%s]" % (lmwsip.recv().strip('\r')))
|
||||
lmwsip.send("LO\r")
|
||||
print("< [%s]" % (lmwsip.recv().strip('\r')))
|
||||
```
|
||||
|
||||
#### Use value
|
||||
### siprun
|
||||
|
||||
``` python
|
||||
from lmwsip import LmwSip
|
||||
sip = LmwSip("USER", "PASS")
|
||||
print(sip.ti())
|
||||
print(sip.value("WN", "HOEK", "H10"))
|
||||
sip.logout()
|
||||
```
|
||||
|
||||
#### Use timeseries
|
||||
``` python
|
||||
from lmwsip import LmwSip
|
||||
from datetime import datetime, timedelta
|
||||
from pprint import pprint
|
||||
|
||||
end = datetime.now()
|
||||
start = end - timedelta(hours=1)
|
||||
|
||||
sip = LmwSip("USER", "PASS")
|
||||
pprint(sip.timeSerie("WN", "HOEK", "H10", start, end).ts)
|
||||
```
|
||||
|
||||
### lmwsip.run
|
||||
```
|
||||
$ python -m lmwsip.run /tmp/hoek-h10.sip
|
||||
$ ./siprun -h sip-lmw.ad.rws.nl -s -p 443 hoek-h10.sip
|
||||
> [LI USER,PASS]
|
||||
< [! ]
|
||||
> [TI LMW]
|
||||
< [! 08-SEP-20 12:03:27]
|
||||
> [WN LMW,HOEK,H10,-01:00,08-09-2020,11:50,DATA]
|
||||
< [! -17/50;-21/50;-24/50;-26/50;-27/50;-28/50;-28/50]
|
||||
< [! 05-SEP-19 08:24:43]
|
||||
> [WN LMW,HOEK,H10,-25:00,05-09-2019,08:10,DATA]
|
||||
< [? 0211 Waarnemingen vallen buiten opslag range]
|
||||
> [LO]
|
||||
< [! ]
|
||||
```
|
||||
|
||||
## Unit tests
|
||||
|
||||
The code containts a python unittest.
|
||||
|
||||
This code runs a dummy sip server and runs a number of test against the dummy
|
||||
server.
|
||||
|
||||
## Git pre commit hook
|
||||
|
||||
There is a pre-commit `githooks/pre-commit' with two functions:
|
||||
* Updating the `__version__` in the module from setup.py
|
||||
* Running the unit test code.
|
||||
* Running a syntaxt test.
|
||||
|
@@ -1,30 +0,0 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
from lmwsip import LmwSip
|
||||
from datetime import datetime, timedelta
|
||||
from pprint import pprint
|
||||
|
||||
def printLocPar(sip, proces, loc, par, start, end):
|
||||
try:
|
||||
r = sip.timeSerie(proces, loc, par, start, end)
|
||||
except Exception as e:
|
||||
print(e)
|
||||
print("\n%s %s %s:\n" % (proces, loc, par))
|
||||
pprint(r.ts)
|
||||
|
||||
def main():
|
||||
end = datetime.now() + timedelta(minutes=10)
|
||||
start = end - timedelta(hours=1)
|
||||
|
||||
try:
|
||||
sip = LmwSip("<user>", "<pass>")
|
||||
except Exception as e:
|
||||
print(e)
|
||||
|
||||
printLocPar(sip, "WN", "HOEK", "H10", start, end)
|
||||
printLocPar(sip, "WN", "LEG1", "Czz10", start, end)
|
||||
printLocPar(sip, "VW", "HOEK", "H10V", start, end)
|
||||
printLocPar(sip, "AS", "HOEK", "H10A", start, end)
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
@@ -1,11 +0,0 @@
|
||||
#!/bin/sh
|
||||
|
||||
set -e
|
||||
|
||||
yamllint .gitea/workflows/*yml
|
||||
|
||||
VERSION=$(grep version setup.cfg | sed 's/.*= *//')
|
||||
sed -i "s/^__version__ = .*/__version__ = '${VERSION}'/" src/lmwsip/__init__.py
|
||||
git add src/lmwsip/__init__.py
|
||||
|
||||
tox
|
326
lmwsip.py
Normal file
326
lmwsip.py
Normal file
@@ -0,0 +1,326 @@
|
||||
"""Module to support the LmwSip class
|
||||
|
||||
See: LmwSip"""
|
||||
|
||||
import socket
|
||||
import ssl
|
||||
import select
|
||||
import time
|
||||
import re
|
||||
import logging
|
||||
|
||||
class LmwSip:
|
||||
"""Class to connect to the LMW Standard Interface prototcol (sip)
|
||||
|
||||
This class iplement connection to the Rijkswaterstaat Meetnet
|
||||
Water (LMW) with the Standard Interface Protocol using the
|
||||
Rijkswaterstaat Meetnet Infrastructuur (RMI).
|
||||
|
||||
https://waterberichtgeving.rws.nl/water-en-weer/metingen
|
||||
|
||||
Support for:
|
||||
ti
|
||||
cmd(wn, vw, as)
|
||||
|
||||
"""
|
||||
|
||||
def __init__(self, user=None, password=None,
|
||||
host="sip-lmw.rws.nl", port=443, meetnet="LMW", ssl = True,
|
||||
check_ssl = True, timeout = 10, log = None, clearbuf = False):
|
||||
"""LmwSip(user, password, [host], [port], [meetnet], [ssl], [check_ssl], [timeout], [log])
|
||||
|
||||
user(optinal): Lmw user name
|
||||
password(optional): Lmw password
|
||||
host(optional): Default sip-lmw.rws.nl
|
||||
port(optional): Default 443
|
||||
meetnet(optional): Default LMW
|
||||
ssl(optional): Default true
|
||||
check_ssl(optional): true
|
||||
timeout(optional): 10
|
||||
log(optional): None
|
||||
clearbuf(optinal): None
|
||||
|
||||
Opens the connection and logs in
|
||||
"""
|
||||
self.user = user
|
||||
self.password = password
|
||||
self.host = host
|
||||
self.port = port
|
||||
self.meetnet = meetnet
|
||||
self.ssl = ssl
|
||||
self.check_ssl = check_ssl
|
||||
self.timeout = timeout
|
||||
self.clearbuf = clearbuf
|
||||
self._socket = None
|
||||
if (log != None):
|
||||
self.log = log
|
||||
self.log.debug("LmwSip.init")
|
||||
else:
|
||||
try:
|
||||
self.log = logging.getLogger("lmwsip")
|
||||
self.log.debug("LmwSip.init: Start log")
|
||||
except Exception as e:
|
||||
print("Logger failed: %s" % e)
|
||||
if (self.host != None):
|
||||
self.connect()
|
||||
if (self.user != None):
|
||||
self.login()
|
||||
|
||||
def lasttime(self, parameter):
|
||||
#
|
||||
# Find the last valid 10 minute window.
|
||||
# The measurement of 12:00 is avaiable at 12:05:30.
|
||||
# Before 12:05:30 we should use 11:50:00.
|
||||
#
|
||||
# At 12:05:29 we substract 15:29 from the time!
|
||||
#
|
||||
# Also note that we use GMT. So we should add one hour
|
||||
# because we use GMT +1 (MET, UTC-1)
|
||||
#
|
||||
if (parameter.find("10") != -1):
|
||||
now=time.time()
|
||||
dt = now%600
|
||||
if (dt < 330):
|
||||
now = 3000 + now - dt
|
||||
else:
|
||||
now = 3600 + now - dt
|
||||
else:
|
||||
#
|
||||
# e.g. H1 use 30 seconds to calculate the time.
|
||||
#
|
||||
dt = now%600
|
||||
if (dt < 30):
|
||||
now = 3540 + now - dt
|
||||
else:
|
||||
now = 3600 + now - dt
|
||||
time_of_day=time.strftime("%H:%M", time.gmtime(now))
|
||||
return { "day": time.strftime("%d-%m-%Y", time.gmtime(now)),
|
||||
"time_of_day": time.strftime("%H:%M", time.gmtime(now)) }
|
||||
|
||||
def connect(self):
|
||||
"""connect()
|
||||
|
||||
connects to lmw with tcp using the values of the object creation.
|
||||
"""
|
||||
try:
|
||||
self._tcp = socket.create_connection((self.host, self.port))
|
||||
except Exception as e:
|
||||
self.log.error("LmwSip.connect(%s, %s) failed: %s",
|
||||
self.host, self.port, e)
|
||||
raise LmwSipConnectError("LmwSip.connect: Socket create failed")
|
||||
if (self.ssl):
|
||||
try:
|
||||
self._context = ssl.create_default_context()
|
||||
self._context.check_hostname = self.check_ssl
|
||||
self._ssl = self._context.wrap_socket(self._tcp,
|
||||
server_hostname=self.host)
|
||||
self._socket = self._ssl
|
||||
except Exception as e:
|
||||
self.log.error("LmwSip.connect setup ssl failed:\n%s", e)
|
||||
raise LmwSipConnectError("LmwSip.connect: setup ssl failed")
|
||||
else:
|
||||
self._socket = self._tcp
|
||||
self._socket.settimeout(self.timeout)
|
||||
|
||||
def closesocket(self):
|
||||
"""Closes the socket and set the socket to None. Doesn't logout"""
|
||||
|
||||
try:
|
||||
self.log.debug("LmwSip.closesocket")
|
||||
self._socket.close()
|
||||
except Exception as e:
|
||||
pass
|
||||
self._socket = None
|
||||
|
||||
def emptysocket(self):
|
||||
"""emptysocket()
|
||||
|
||||
If clearbuf read all the data from the socket and ignore it.
|
||||
"""
|
||||
if self._socket != None and self.clearbuf:
|
||||
try:
|
||||
self._socket.settimeout(0)
|
||||
b = self._socket.recv(4096)
|
||||
except Exception as e:
|
||||
pass
|
||||
self._socket.settimeout(self.timeout)
|
||||
|
||||
def send(self, sipcmd):
|
||||
"""send(sipcmd)
|
||||
|
||||
send a sip command to the server
|
||||
"""
|
||||
if self._socket != None:
|
||||
try:
|
||||
logcmd = sipcmd.strip('\r')
|
||||
if re.match("^LI", logcmd, re.IGNORECASE):
|
||||
logcmd = re.sub(",.*", ", ******", logcmd)
|
||||
self.log.debug("LmwSip.send(%s)" % logcmd)
|
||||
self.emptysocket()
|
||||
self._socket.sendall(sipcmd.encode('ascii'))
|
||||
except Exception as e:
|
||||
self.log.error("LmwSip.send(%s) failed: %s", sipcmd, e)
|
||||
self.closesocket()
|
||||
raise LmwSipConnectError("LmwSip.send: Socket connection lost")
|
||||
else:
|
||||
self.log.warn("LmwSip.send: No connection")
|
||||
|
||||
def recv(self):
|
||||
"""recv()
|
||||
|
||||
recieve a answer from the sip server
|
||||
"""
|
||||
buf=""
|
||||
while self._socket != None and re.search("\r$", buf) == None:
|
||||
try:
|
||||
self.log.debug("LmwSip.recv: Waiting for data");
|
||||
b = self._socket.recv(4096).decode('utf-8')
|
||||
if (b == ""):
|
||||
self.log.error("SipLmw.recv: socket closed")
|
||||
self.closesocket()
|
||||
raise LmwSipConnectError("LmwSip.recv: Socket close")
|
||||
else:
|
||||
buf += b
|
||||
except Exception as e:
|
||||
self.log.error("SipLmw.recv: socket timeout: %s", e)
|
||||
self.closesocket()
|
||||
raise LmwSipConnectError("LmwSip.recv: Socket timeout")
|
||||
if self._socket == None:
|
||||
self.log.warn("LmwSip.recv: No connection")
|
||||
elif buf[0] != '!':
|
||||
self.log.warn("LmwSip.recv: Sip error: %s" % buf.strip('\r'))
|
||||
else:
|
||||
self.log.debug("LmwSip.recv: result: %s" % buf.strip('\r'))
|
||||
return(buf)
|
||||
|
||||
def login(self):
|
||||
"""login()
|
||||
|
||||
Login lmw using the object creation user, password.
|
||||
Raises a LmwLoginFailure exception on failure
|
||||
"""
|
||||
li="LI " + self.user + "," + self.password + "\r"
|
||||
self.send(li)
|
||||
d = self.recv()
|
||||
if (d[0] != '!'):
|
||||
raise LmwLoginFailure(self.user + ":" + d)
|
||||
|
||||
def ti(self):
|
||||
"""ti()
|
||||
|
||||
Request the time from lmw and returns the string.
|
||||
|
||||
Raises a LmwCmdWarn of failure
|
||||
"""
|
||||
ti="TI " + self.meetnet + "\r"
|
||||
self.send(ti)
|
||||
d = self.recv()
|
||||
return (d[2:-1])
|
||||
|
||||
def cmd(self, process, location, parameter, time_delta, day,
|
||||
time_of_day, cmd_type="DATA"):
|
||||
"""cmd(process, location, parameter, time_delta, day, time_of_day)
|
||||
|
||||
Send a cmd to LMW and returns the lmw string
|
||||
|
||||
process: <WN|VW|AS>
|
||||
location: <lmw location (e.g. HOEK)>
|
||||
parameter: <lmw parameter (e.g. H10)>
|
||||
time_delta: <Time windows (max 23:59, e.g. +01:00>
|
||||
day: <Date>
|
||||
time_of_day: <Time>
|
||||
cmd_type: [DATA|DATB|OORS|OORB|""]
|
||||
|
||||
Example:
|
||||
lmw.cmd("WN", "HOEK", "H10", "+01:00", "13-08-2018", "16:00")
|
||||
|
||||
Returns:
|
||||
The LMW answer string
|
||||
"""
|
||||
if (process == "AS"):
|
||||
data=""
|
||||
else:
|
||||
data="," + cmd_type
|
||||
|
||||
cmdstr=process + " " + self.meetnet + "," + location + "," + \
|
||||
parameter + "," + time_delta + "," + day + "," + \
|
||||
time_of_day + data + "\r"
|
||||
|
||||
self.send(cmdstr)
|
||||
d = self.recv()
|
||||
if (d[0] != '!'):
|
||||
raise LmwCmdWarn(cmdstr, d)
|
||||
return (d[2:-1])
|
||||
|
||||
def value(self, process, location, parameter, day = None,
|
||||
time_of_day = None):
|
||||
"""value(process, location, parameter, [day], [time_of_day]):
|
||||
|
||||
Parameters:
|
||||
process: <WN|VW|AS>
|
||||
location: <lmw location (e.g. HOEK)>
|
||||
parameter: <lmw parameter (e.g. H10)>
|
||||
day: [date = now()]
|
||||
time_of_day: [time = now()]
|
||||
|
||||
The default returns the last value.
|
||||
|
||||
Example:
|
||||
lmw.data_string("WN", "HOEK", "H10")
|
||||
|
||||
Returns a single string value or None
|
||||
"""
|
||||
if (day == None or time_of_day == None):
|
||||
last = self.lasttime(parameter)
|
||||
if (day==None):
|
||||
day=last["day"]
|
||||
if (time_of_day==None):
|
||||
time_of_day=last["time_of_day"]
|
||||
res = self.cmd(process, location, parameter, "+00:00", day,
|
||||
time_of_day, "DATA")
|
||||
value=re.sub("/.*$", "", res)
|
||||
if (value == "99999"):
|
||||
value=""
|
||||
elif (value == "-999999999"):
|
||||
value=""
|
||||
#
|
||||
# We should check the "kwaliteit"
|
||||
#
|
||||
return(value)
|
||||
|
||||
def logout(self):
|
||||
"""logout()
|
||||
|
||||
Logs of
|
||||
"""
|
||||
self.send("LO\r")
|
||||
self.closesocket()
|
||||
|
||||
|
||||
class LmwSipConnectError(Exception):
|
||||
"""Connection exceptions for LmwSip"""
|
||||
|
||||
def __init__(self, message):
|
||||
self.message = message
|
||||
|
||||
def __str__(self):
|
||||
return(self.message)
|
||||
|
||||
class LmwLoginFailure(Exception):
|
||||
"""Exception from LmwSip on login failure"""
|
||||
|
||||
def __init__(self, user, message):
|
||||
self.user = user
|
||||
self.message = message
|
||||
|
||||
def __str__(self):
|
||||
return("Login with user %s failed: %s", self.user, self.message)
|
||||
|
||||
class LmwCmdWarn(Warning):
|
||||
"""Exception fro LmwSip on a cmd"""
|
||||
def __init__(self, cmd, message):
|
||||
self.cmd = cmd
|
||||
self.message = message
|
||||
|
||||
def __str__(self):
|
||||
return("Cmd %s failed: %s", self.cmd, self.message)
|
@@ -1,6 +0,0 @@
|
||||
[build-system]
|
||||
requires = [
|
||||
"setuptools>=42",
|
||||
"wheel"
|
||||
]
|
||||
build-backend = "setuptools.build_meta"
|
25
setup.cfg
25
setup.cfg
@@ -1,25 +0,0 @@
|
||||
[metadata]
|
||||
name = lmwsip
|
||||
version = 0.9.8
|
||||
author = Marcel Nijenhof
|
||||
author_email = pypi@marceln.org
|
||||
description = Interface for the lmw sip protocol
|
||||
long_description = file: README.md
|
||||
long_description_content_type = text/markdown
|
||||
url = https://git.marceln.org/Werk/lmwsip
|
||||
classifiers =
|
||||
Programming Language :: Python :: 3
|
||||
License :: OSI Approved :: MIT License
|
||||
Operating System :: OS Independent
|
||||
Development Status :: 4 - Beta
|
||||
|
||||
[options]
|
||||
package_dir =
|
||||
= src
|
||||
install_requires =
|
||||
python-dateutil
|
||||
packages = find:
|
||||
python_requires = >= 3.6
|
||||
|
||||
[options.packages.find]
|
||||
where = src
|
4
setup.py
4
setup.py
@@ -1,4 +0,0 @@
|
||||
from setuptools import setup
|
||||
|
||||
if __name__ == "__main__":
|
||||
setup()
|
75
siprun
Executable file
75
siprun
Executable file
@@ -0,0 +1,75 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
import sys
|
||||
import getopt
|
||||
from lmwsip import LmwSip
|
||||
|
||||
def usage():
|
||||
print("siprun [-H] [-s] [-d <date>] [-t <time>] [-h <host>] [-p <port>] [<files>]")
|
||||
print("\t-H: Show usage")
|
||||
print("\t-s: SSL connection")
|
||||
print("\t-d <date>: Date replacement string (2019-02-14)")
|
||||
print("\t-t <time>: Time replacement string (17:00)")
|
||||
print("\t-h <host>: Connect to host")
|
||||
print("\t-p <port>: Connect to port")
|
||||
print("\t-<files>: LMW commando files")
|
||||
|
||||
def main():
|
||||
try:
|
||||
opts, args = getopt.getopt(sys.argv[1:], "sh:p:d:t:", ["help", "output="])
|
||||
except getopt.GetoptError as err:
|
||||
# print help information and exit:
|
||||
print(err) # will print something like "option -a not recognized"
|
||||
usage()
|
||||
sys.exit(2)
|
||||
ssl=False
|
||||
time=None
|
||||
date=None
|
||||
host=None
|
||||
port=None
|
||||
for o, a in opts:
|
||||
if o == "-H":
|
||||
usage()
|
||||
sys.exit(0)
|
||||
elif o == "-s":
|
||||
ssl=True
|
||||
elif o == "-d":
|
||||
date=a
|
||||
elif o == "-t":
|
||||
time=a
|
||||
elif o == "-h":
|
||||
host=a
|
||||
elif o == "-p":
|
||||
port=a
|
||||
if (host==None or port==None):
|
||||
print("Set host and port")
|
||||
usage()
|
||||
sys.exit(3)
|
||||
try:
|
||||
lmwsip = LmwSip(ssl=ssl, host=host, port=port)
|
||||
except Exception as e:
|
||||
print("Connect to lmw failed: %s" % e)
|
||||
exit(1)
|
||||
if (date == None or time == None):
|
||||
# We assume a 10 minut interval so we use H10
|
||||
r = lmwsip.lasttime("H10")
|
||||
if (date == None):
|
||||
date = r["day"]
|
||||
if (time == None):
|
||||
time = r["time_of_day"]
|
||||
for cmdfile in args:
|
||||
with open(cmdfile, "r") as f:
|
||||
for cmd in f:
|
||||
cmd = cmd.replace('{DATE}', date)
|
||||
cmd = cmd.replace('{TIME}', time)
|
||||
cmd = cmd.replace('\n', '\r')
|
||||
print("> [%s]" % (cmd.strip('\r')))
|
||||
try:
|
||||
lmwsip.send(cmd)
|
||||
print("< [%s]" % (lmwsip.recv().strip('\r')))
|
||||
except:
|
||||
pass
|
||||
f.close()
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
File diff suppressed because it is too large
Load Diff
@@ -1,64 +0,0 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
import sys
|
||||
import getopt
|
||||
import argparse
|
||||
import logging
|
||||
from lmwsip import LmwSip
|
||||
|
||||
def run(args):
|
||||
logging.basicConfig(level=args.debug)
|
||||
logging.debug("lmwsip.run %s" % args)
|
||||
try:
|
||||
lmwsip = LmwSip(host=args.host, port=args.port,
|
||||
ssl=not args.unencrypted,
|
||||
check_ssl=not args.acceptssl,
|
||||
cleartelnet=args.cleartelnet)
|
||||
except Exception as e:
|
||||
print("Connect to lmw failed: %s" % e)
|
||||
exit(1)
|
||||
for f in args.files:
|
||||
for cmd in f:
|
||||
cmd = cmd.replace('{DATE}', args.date)
|
||||
cmd = cmd.replace('{TIME}', args.time)
|
||||
cmd = cmd.replace('\n', '\r')
|
||||
print("> [%s]" % (cmd.strip('\r')))
|
||||
try:
|
||||
lmwsip.send(cmd)
|
||||
print("< [%s]" % (lmwsip.recv().strip('\r')))
|
||||
except:
|
||||
pass
|
||||
try:
|
||||
lmwsip.closesocket()
|
||||
except:
|
||||
pass
|
||||
|
||||
def main():
|
||||
lastTime=LmwSip(host=None).lasttime("H10")
|
||||
parser = argparse.ArgumentParser(description="Run a sip file.")
|
||||
parser.add_argument("-u", "--unencrypted", action="store_true",
|
||||
help="Run a sip connection without ssl")
|
||||
parser.add_argument("-a", "--acceptssl", action="store_true",
|
||||
help="Accept ssl certificate")
|
||||
parser.add_argument("-c", "--cleartelnet", action="store_true",
|
||||
help="Clear telnet protocol in tcp session")
|
||||
parser.add_argument("-H", "--host", action='store',
|
||||
default="sip-lmw.rws.nl",
|
||||
help="Host to connect to")
|
||||
parser.add_argument("-p", "--port", action='store', type=int, default=443,
|
||||
help="Port to connect to")
|
||||
parser.add_argument("-d", "--date", action='store',
|
||||
default=lastTime["day"],
|
||||
help="Date replacement string [DD-MM-YYYY]")
|
||||
parser.add_argument("-t", "--time", action='store',
|
||||
default=lastTime["time_of_day"],
|
||||
help="Time replacement string [HH:MM]")
|
||||
parser.add_argument("-D", "--debug", action='store',
|
||||
default="WARN",
|
||||
help="Debug level")
|
||||
parser.add_argument("files", type=argparse.FileType('r'), nargs="+",
|
||||
help="Sip files to run")
|
||||
run(parser.parse_args())
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
168
tests/main.py
168
tests/main.py
@@ -1,168 +0,0 @@
|
||||
#!/usr/bin/python
|
||||
|
||||
import sys
|
||||
import io
|
||||
import unittest
|
||||
import lmwsip
|
||||
import stubSipServer
|
||||
import logging
|
||||
|
||||
from stubSipServer import sipServer
|
||||
from lmwsip.run import run
|
||||
from datetime import datetime, timedelta
|
||||
from dateutil import tz
|
||||
from time import sleep
|
||||
|
||||
class myTestArgs():
|
||||
pass
|
||||
|
||||
class lmwsipTest(unittest.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
self.sipserver = sipServer()
|
||||
self.sip = None
|
||||
self.sipserver.run()
|
||||
|
||||
def login(self, **args):
|
||||
log = logging.basicConfig(level=logging.DEBUG)
|
||||
self.sip = lmwsip.LmwSip("USER", "PASS", "localhost",
|
||||
self.sipserver.port, ssl=False,
|
||||
log=log, **args)
|
||||
|
||||
def tearDown(self):
|
||||
if self.sip:
|
||||
self.sip.closesocket()
|
||||
self.sipserver.kill()
|
||||
|
||||
def test_sipobj(self):
|
||||
self.login()
|
||||
self.assertEqual(type(self.sip), lmwsip.LmwSip)
|
||||
|
||||
def test_H1(self):
|
||||
self.sip = lmwsip.LmwSip(host=None)
|
||||
self.assertEqual(self.sip.period('H1'), 1)
|
||||
|
||||
def test_H10(self):
|
||||
self.sip = lmwsip.LmwSip(host=None)
|
||||
self.assertEqual(self.sip.period('H10'), 10)
|
||||
|
||||
def test_xHm0(self):
|
||||
self.sip = lmwsip.LmwSip(host=None)
|
||||
self.assertEqual(self.sip.period('xHm0'), 10)
|
||||
|
||||
def test_Noparm(self):
|
||||
self.sip = lmwsip.LmwSip(host=None)
|
||||
with self.assertRaises(lmwsip.LmwParmWarn):
|
||||
self.assertEqual(self.sip.period('Noparm'), None)
|
||||
|
||||
def test_loginfail(self):
|
||||
with self.assertRaises(lmwsip.LmwLoginFailure):
|
||||
self.sip = lmwsip.LmwSip("FAIL", "FAIL", "localhost",
|
||||
self.sipserver.port, ssl=False)
|
||||
|
||||
def test_ti(self):
|
||||
self.login()
|
||||
self.assertEqual(type(self.sip.ti()), str)
|
||||
|
||||
def test_telnetti(self):
|
||||
self.login(cleartelnet=True)
|
||||
self.assertEqual(type(self.sip.ti()), str)
|
||||
|
||||
def test_cmd(self):
|
||||
self.login()
|
||||
self.assertEqual(type(self.sip.cmd("WN", "DUMMY", "H10", "+00:59", "2020-01-01", "00:00")), str)
|
||||
|
||||
def test_cmderr(self):
|
||||
self.login()
|
||||
with self.assertRaises(lmwsip.LmwCmdWarn):
|
||||
self.assertEqual(type(self.sip.cmd("NOP", "DUMMY", "H10", "+00:59", "2020-01-01", "00:00")), str)
|
||||
|
||||
def test_cmdWrite(self):
|
||||
self.login()
|
||||
self.assertEqual(type(self.sip.cmdWrite("WN", "DUMMY", "H10", "+00:20", "2020-01-01", "00:00", "35/10;33/10")), str)
|
||||
|
||||
def test_value(self):
|
||||
self.login()
|
||||
self.assertEqual(type(self.sip.value("WN", "DUMMY", "H10")), str)
|
||||
|
||||
def test_value1min(self):
|
||||
self.login()
|
||||
self.assertEqual(type(self.sip.value("WN", "DUMMY", "H1")), str)
|
||||
|
||||
def test_valueStr(self):
|
||||
self.login()
|
||||
self.assertEqual(type(self.sip.valueStr("WN", "DUMMY", "H10")), str)
|
||||
|
||||
def test_logout(self):
|
||||
self.login()
|
||||
self.assertEqual(self.sip.logout(), None)
|
||||
|
||||
def test_lmwTimeSerie(self):
|
||||
self.login()
|
||||
timezone = tz.gettz('GMT+1')
|
||||
res = self.sip.timeSerie("WN", "DUMMY", "H10",
|
||||
datetime.now(timezone)-timedelta(minutes=60),
|
||||
datetime.now(timezone))
|
||||
self.assertEqual(type(res.ts), list)
|
||||
self.assertEqual(len(res.ts), 6)
|
||||
self.assertEqual(res.ts[1][1][0], '1')
|
||||
|
||||
def test_roundtime(self):
|
||||
self.login()
|
||||
timezone = tz.gettz('GMT+1')
|
||||
t1 = datetime(2020, 1, 1, 0, 10, 0, 0, timezone)
|
||||
t2 = datetime(2020, 1, 1, 0, 0, 0, 1, timezone)
|
||||
self.assertEqual(self.sip._roundtime_(t1, timedelta(minutes=10)), t1)
|
||||
self.assertEqual(self.sip._roundtime_(t2, timedelta(minutes=10)), t1)
|
||||
|
||||
def test_closerecv(self):
|
||||
self.login()
|
||||
self.sip.send("CLOSE")
|
||||
with self.assertRaises(lmwsip.LmwSipConnectError):
|
||||
self.sip.recv()
|
||||
|
||||
def test_closeti(self):
|
||||
self.login()
|
||||
self.sip.send("CLOSE")
|
||||
self.assertEqual(type(self.sip.ti()), str)
|
||||
|
||||
def test_closecmd(self):
|
||||
self.login()
|
||||
self.sip.send("CLOSE")
|
||||
self.assertEqual(type(self.sip.cmd("WN", "DUMMY", "H10", "+00:59", "2020-01-01", "00:00")), str)
|
||||
|
||||
def test_reconnect(self):
|
||||
self.login(reconnecttime=1)
|
||||
sleep(2)
|
||||
self.assertEqual(self.sip.sendrecv("LOGOUTCOUNT"), "1\r")
|
||||
|
||||
def test_idlereconnect(self):
|
||||
self.login(idlereconnect=1)
|
||||
sleep(2)
|
||||
self.assertEqual(self.sip.sendrecv("LOGOUTCOUNT"), "1\r")
|
||||
|
||||
def test_versionstr(self):
|
||||
self.assertEqual(type(lmwsip.__version__), str)
|
||||
|
||||
def test_run(self):
|
||||
capturedOutput = io.StringIO()
|
||||
sys.stdout = capturedOutput
|
||||
testSipFile = io.StringIO("LI USER,PASS\rTI LMW\rLO")
|
||||
testSipFile.seek(0)
|
||||
args = myTestArgs()
|
||||
args.debug = "DEBUG"
|
||||
args.host = "localhost"
|
||||
args.port = self.sipserver.port
|
||||
args.unencrypted = True
|
||||
args.acceptssl = True
|
||||
args.cleartelnet = False
|
||||
args.time = "+00:59"
|
||||
args.date = "2020-01-01"
|
||||
args.files = [testSipFile]
|
||||
run(args)
|
||||
args.files[0].close()
|
||||
self.assertEqual(capturedOutput.getvalue().find("!")>= 0, True)
|
||||
self.assertEqual(capturedOutput.getvalue().find("?"), -1)
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
@@ -1,131 +0,0 @@
|
||||
#!/usr/bin/python
|
||||
|
||||
"""A stub sipserver for testing lmwsip
|
||||
|
||||
This is a stub sipserver that implements a small subset of the sip
|
||||
protocol to perform unit tests.
|
||||
|
||||
Implements the following commands:
|
||||
|
||||
CMD> LI USER,PASS
|
||||
ANS< !
|
||||
|
||||
CMD> TI LMW
|
||||
ANS< ! 20-JAN-01 00:00:00
|
||||
|
||||
CMD> WN LMW,DUMMY,H10,+HH:MM,yyyy-mm-dd,HH:MM,DATA
|
||||
ANS< ! 1/10,;2/10;....
|
||||
|
||||
CMD> WN LMW,DUMMY,H10,+HH:MM,yyyy-mm-dd,HH:MM,DATB
|
||||
ANS< ! 1/10/0;2/10/0;....
|
||||
|
||||
CMD> WN LMW,DUMMY,H1,+HH:MM,yyyy-mm-dd,HH:MM,DATA
|
||||
ANS< ! 1/10,;2/10;....
|
||||
|
||||
CMD> LO
|
||||
ANS< !
|
||||
|
||||
All other commands result in a "?"
|
||||
|
||||
CMD> *
|
||||
ANS< ? ERROR
|
||||
|
||||
Note:
|
||||
for a WN command the time and date are ignored.
|
||||
The duration is used to calculare the number of results to send.
|
||||
|
||||
The sip syntax for time is much flexibler.
|
||||
The stub only support this format!
|
||||
"""
|
||||
|
||||
|
||||
import os
|
||||
import time
|
||||
import random
|
||||
import socketserver
|
||||
|
||||
logoutcount=0
|
||||
|
||||
class sipProtocol(socketserver.BaseRequestHandler):
|
||||
def match(self, m):
|
||||
return(self.data.find(m.encode()) == 0)
|
||||
|
||||
def send(self, a):
|
||||
a = "%s\r" % a
|
||||
self.request.sendall(a.encode())
|
||||
|
||||
def read(self):
|
||||
try:
|
||||
self.data = self.request.recv(1024).strip()
|
||||
except:
|
||||
self.data = None
|
||||
|
||||
def number(self, b):
|
||||
if b[0] == b'0':
|
||||
return(int(b[0:1]))
|
||||
else:
|
||||
return(int(b[0:2]))
|
||||
|
||||
def meting(self, delta=10):
|
||||
res = ""
|
||||
sep = "! "
|
||||
elem = self.data.decode().split(",")
|
||||
h = self.number(elem[3][1:3])
|
||||
m = self.number(elem[3][4:6])
|
||||
aantal = 1+(60*h+m)//delta
|
||||
if self.data[-1:] == b'A':
|
||||
data = "%i/10"
|
||||
else:
|
||||
data = "%i/10/0"
|
||||
for i in range(aantal):
|
||||
res += sep+data % i
|
||||
sep=";"
|
||||
self.send(res)
|
||||
|
||||
def handle(self):
|
||||
global logoutcount
|
||||
self.read()
|
||||
while self.data:
|
||||
if self.match("LI USER,PASS"):
|
||||
self.send("!")
|
||||
elif self.match("TI LMW"):
|
||||
self.send("! 20-JAN-01 00:00:00")
|
||||
elif self.match("WN LMW,DUMMY,H10,"):
|
||||
self.meting(10)
|
||||
elif self.match("WN LMW,DUMMY,H1,"):
|
||||
self.meting(1)
|
||||
elif self.match("LOGOUTCOUNT"):
|
||||
self.send(str(logoutcount))
|
||||
elif self.match("LO"):
|
||||
logoutcount+=1
|
||||
self.send("!")
|
||||
elif self.match("CLOSE"):
|
||||
self.request.close()
|
||||
else:
|
||||
self.send("? ERROR")
|
||||
self.read()
|
||||
|
||||
class sipServer(socketserver.TCPServer):
|
||||
def __init__(self):
|
||||
self.port = None
|
||||
while self.port == None:
|
||||
self.port = random.randint(20000, 50000)
|
||||
try:
|
||||
super(sipServer, self).__init__(("localhost", self.port), sipProtocol)
|
||||
except:
|
||||
self.port = None
|
||||
|
||||
def run(self):
|
||||
self.pid = os.fork()
|
||||
if self.pid == 0:
|
||||
self.serve_forever()
|
||||
|
||||
def kill(self):
|
||||
if self.pid != 0:
|
||||
os.kill(self.pid, 15)
|
||||
self.server_close()
|
||||
|
||||
if __name__ == '__main__':
|
||||
s = sipServer()
|
||||
s.run()
|
||||
pass
|
Reference in New Issue
Block a user