This commit is contained in:
@@ -3,7 +3,7 @@
|
|||||||
set -e
|
set -e
|
||||||
|
|
||||||
VERSION=$(grep version setup.cfg | sed 's/.*= *//')
|
VERSION=$(grep version setup.cfg | sed 's/.*= *//')
|
||||||
sed -i "s/^__version__ = .*/__version__ = '${VERSION}'/" lmwsip/__init__.py
|
sed -i "s/^__version__ = .*/__version__ = '${VERSION}'/" src/lmwsip/__init__.py
|
||||||
git add lmwsip/__init__.py
|
git add lmwsip/__init__.py
|
||||||
|
|
||||||
python setup.py test
|
python setup.py test
|
||||||
|
@@ -13,4 +13,10 @@ classifiers =
|
|||||||
Operating System :: OS Independent
|
Operating System :: OS Independent
|
||||||
|
|
||||||
[options]
|
[options]
|
||||||
|
package_dir =
|
||||||
|
= src
|
||||||
|
packages = find:
|
||||||
python_requires = >= 3.6
|
python_requires = >= 3.6
|
||||||
|
|
||||||
|
[options.packages.find]
|
||||||
|
where = src
|
||||||
|
64
src/lmwsip/run.py
Executable file
64
src/lmwsip/run.py
Executable file
@@ -0,0 +1,64 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
|
||||||
|
import sys
|
||||||
|
import getopt
|
||||||
|
import argparse
|
||||||
|
import logging
|
||||||
|
from lmwsip import LmwSip
|
||||||
|
|
||||||
|
def run(args):
|
||||||
|
logging.basicConfig(level=args.debug)
|
||||||
|
logging.debug("lmwsip.run %s" % args)
|
||||||
|
try:
|
||||||
|
lmwsip = LmwSip(host=args.host, port=args.port,
|
||||||
|
ssl=not args.unencrypted,
|
||||||
|
check_ssl=not args.acceptssl,
|
||||||
|
cleartelnet=args.cleartelnet)
|
||||||
|
except Exception as e:
|
||||||
|
print("Connect to lmw failed: %s" % e)
|
||||||
|
exit(1)
|
||||||
|
for f in args.files:
|
||||||
|
for cmd in f:
|
||||||
|
cmd = cmd.replace('{DATE}', args.date)
|
||||||
|
cmd = cmd.replace('{TIME}', args.time)
|
||||||
|
cmd = cmd.replace('\n', '\r')
|
||||||
|
print("> [%s]" % (cmd.strip('\r')))
|
||||||
|
try:
|
||||||
|
lmwsip.send(cmd)
|
||||||
|
print("< [%s]" % (lmwsip.recv().strip('\r')))
|
||||||
|
except:
|
||||||
|
pass
|
||||||
|
try:
|
||||||
|
lmwsip.closesocket()
|
||||||
|
except:
|
||||||
|
pass
|
||||||
|
|
||||||
|
def main():
|
||||||
|
lastTime=LmwSip(host=None).lasttime("H10")
|
||||||
|
parser = argparse.ArgumentParser(description="Run a sip file.")
|
||||||
|
parser.add_argument("-u", "--unencrypted", action="store_true",
|
||||||
|
help="Run a sip connection without ssl")
|
||||||
|
parser.add_argument("-a", "--acceptssl", action="store_true",
|
||||||
|
help="Accept ssl certificate")
|
||||||
|
parser.add_argument("-c", "--cleartelnet", action="store_true",
|
||||||
|
help="Clear telnet protocol in tcp session")
|
||||||
|
parser.add_argument("-H", "--host", action='store',
|
||||||
|
default="sip-lmw.rws.nl",
|
||||||
|
help="Host to connect to")
|
||||||
|
parser.add_argument("-p", "--port", action='store', type=int, default=443,
|
||||||
|
help="Port to connect to")
|
||||||
|
parser.add_argument("-d", "--date", action='store',
|
||||||
|
default=lastTime["day"],
|
||||||
|
help="Date replacement string [DD-MM-YYYY]")
|
||||||
|
parser.add_argument("-t", "--time", action='store',
|
||||||
|
default=lastTime["time_of_day"],
|
||||||
|
help="Time replacement string [HH:MM]")
|
||||||
|
parser.add_argument("-D", "--debug", action='store',
|
||||||
|
default="WARN",
|
||||||
|
help="Debug level")
|
||||||
|
parser.add_argument("files", type=argparse.FileType('r'), nargs="+",
|
||||||
|
help="Sip files to run")
|
||||||
|
run(parser.parse_args())
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
164
src/lmwsip/tests/__init__.py
Executable file
164
src/lmwsip/tests/__init__.py
Executable file
@@ -0,0 +1,164 @@
|
|||||||
|
#!/usr/bin/python
|
||||||
|
|
||||||
|
import sys
|
||||||
|
import io
|
||||||
|
import unittest
|
||||||
|
import lmwsip
|
||||||
|
import lmwsip.tests.stubSipServer
|
||||||
|
import logging
|
||||||
|
|
||||||
|
from lmwsip.tests.stubSipServer import sipServer
|
||||||
|
from lmwsip.run import run
|
||||||
|
from datetime import datetime, timedelta
|
||||||
|
from dateutil import tz
|
||||||
|
from time import sleep
|
||||||
|
|
||||||
|
class myTestArgs():
|
||||||
|
pass
|
||||||
|
|
||||||
|
class lmwsipTest(unittest.TestCase):
|
||||||
|
|
||||||
|
def setUp(self):
|
||||||
|
self.sipserver = sipServer()
|
||||||
|
self.sip = None
|
||||||
|
self.sipserver.run()
|
||||||
|
|
||||||
|
def login(self, **args):
|
||||||
|
log = logging.basicConfig(level=logging.DEBUG)
|
||||||
|
self.sip = lmwsip.LmwSip("USER", "PASS", "localhost",
|
||||||
|
self.sipserver.port, ssl=False,
|
||||||
|
log=log, **args)
|
||||||
|
|
||||||
|
def tearDown(self):
|
||||||
|
if self.sip:
|
||||||
|
self.sip.closesocket()
|
||||||
|
self.sipserver.kill()
|
||||||
|
|
||||||
|
def test_sipobj(self):
|
||||||
|
self.login()
|
||||||
|
self.assertEqual(type(self.sip), lmwsip.LmwSip)
|
||||||
|
|
||||||
|
def test_H1(self):
|
||||||
|
self.sip = lmwsip.LmwSip(host=None)
|
||||||
|
self.assertEqual(self.sip.period('H1'), 1)
|
||||||
|
|
||||||
|
def test_H10(self):
|
||||||
|
self.sip = lmwsip.LmwSip(host=None)
|
||||||
|
self.assertEqual(self.sip.period('H10'), 10)
|
||||||
|
|
||||||
|
def test_xHm0(self):
|
||||||
|
self.sip = lmwsip.LmwSip(host=None)
|
||||||
|
self.assertEqual(self.sip.period('xHm0'), 10)
|
||||||
|
|
||||||
|
def test_Noparm(self):
|
||||||
|
self.sip = lmwsip.LmwSip(host=None)
|
||||||
|
with self.assertRaises(lmwsip.LmwParmWarn):
|
||||||
|
self.assertEqual(self.sip.period('Noparm'), None)
|
||||||
|
|
||||||
|
def test_loginfail(self):
|
||||||
|
with self.assertRaises(lmwsip.LmwLoginFailure):
|
||||||
|
self.sip = lmwsip.LmwSip("FAIL", "FAIL", "localhost",
|
||||||
|
self.sipserver.port, ssl=False)
|
||||||
|
|
||||||
|
def test_ti(self):
|
||||||
|
self.login()
|
||||||
|
self.assertEqual(type(self.sip.ti()), str)
|
||||||
|
|
||||||
|
def test_telnetti(self):
|
||||||
|
self.login(cleartelnet=True)
|
||||||
|
self.assertEqual(type(self.sip.ti()), str)
|
||||||
|
|
||||||
|
def test_cmd(self):
|
||||||
|
self.login()
|
||||||
|
self.assertEqual(type(self.sip.cmd("WN", "DUMMY", "H10", "+00:59", "2020-01-01", "00:00")), str)
|
||||||
|
|
||||||
|
def test_cmderr(self):
|
||||||
|
self.login()
|
||||||
|
with self.assertRaises(lmwsip.LmwCmdWarn):
|
||||||
|
self.assertEqual(type(self.sip.cmd("NOP", "DUMMY", "H10", "+00:59", "2020-01-01", "00:00")), str)
|
||||||
|
|
||||||
|
def test_value(self):
|
||||||
|
self.login()
|
||||||
|
self.assertEqual(type(self.sip.value("WN", "DUMMY", "H10")), str)
|
||||||
|
|
||||||
|
def test_value1min(self):
|
||||||
|
self.login()
|
||||||
|
self.assertEqual(type(self.sip.value("WN", "DUMMY", "H1")), str)
|
||||||
|
|
||||||
|
def test_valueStr(self):
|
||||||
|
self.login()
|
||||||
|
self.assertEqual(type(self.sip.valueStr("WN", "DUMMY", "H10")), str)
|
||||||
|
|
||||||
|
def test_logout(self):
|
||||||
|
self.login()
|
||||||
|
self.assertEqual(self.sip.logout(), None)
|
||||||
|
|
||||||
|
def test_lmwTimeSerie(self):
|
||||||
|
self.login()
|
||||||
|
timezone = tz.gettz('GMT+1')
|
||||||
|
res = self.sip.timeSerie("WN", "DUMMY", "H10",
|
||||||
|
datetime.now(timezone)-timedelta(minutes=60),
|
||||||
|
datetime.now(timezone))
|
||||||
|
self.assertEqual(type(res.ts), list)
|
||||||
|
self.assertEqual(len(res.ts), 6)
|
||||||
|
self.assertEqual(res.ts[1][1][0], '1')
|
||||||
|
|
||||||
|
def test_roundtime(self):
|
||||||
|
self.login()
|
||||||
|
timezone = tz.gettz('GMT+1')
|
||||||
|
t1 = datetime(2020, 1, 1, 0, 10, 0, 0, timezone)
|
||||||
|
t2 = datetime(2020, 1, 1, 0, 0, 0, 1, timezone)
|
||||||
|
self.assertEqual(self.sip._roundtime_(t1, timedelta(minutes=10)), t1)
|
||||||
|
self.assertEqual(self.sip._roundtime_(t2, timedelta(minutes=10)), t1)
|
||||||
|
|
||||||
|
def test_closerecv(self):
|
||||||
|
self.login()
|
||||||
|
self.sip.send("CLOSE")
|
||||||
|
with self.assertRaises(lmwsip.LmwSipConnectError):
|
||||||
|
self.sip.recv()
|
||||||
|
|
||||||
|
def test_closeti(self):
|
||||||
|
self.login()
|
||||||
|
self.sip.send("CLOSE")
|
||||||
|
self.assertEqual(type(self.sip.ti()), str)
|
||||||
|
|
||||||
|
def test_closecmd(self):
|
||||||
|
self.login()
|
||||||
|
self.sip.send("CLOSE")
|
||||||
|
self.assertEqual(type(self.sip.cmd("WN", "DUMMY", "H10", "+00:59", "2020-01-01", "00:00")), str)
|
||||||
|
|
||||||
|
def test_reconnect(self):
|
||||||
|
self.login(reconnecttime=1)
|
||||||
|
sleep(2)
|
||||||
|
self.assertEqual(self.sip.sendrecv("LOGOUTCOUNT"), "1\r")
|
||||||
|
|
||||||
|
def test_idlereconnect(self):
|
||||||
|
self.login(idlereconnect=1)
|
||||||
|
sleep(2)
|
||||||
|
self.assertEqual(self.sip.sendrecv("LOGOUTCOUNT"), "1\r")
|
||||||
|
|
||||||
|
def test_versionstr(self):
|
||||||
|
self.assertEqual(type(lmwsip.__version__), str)
|
||||||
|
|
||||||
|
def test_run(self):
|
||||||
|
capturedOutput = io.StringIO()
|
||||||
|
sys.stdout = capturedOutput
|
||||||
|
testSipFile = io.StringIO("LI USER,PASS\rTI LMW\rLO")
|
||||||
|
testSipFile.seek(0)
|
||||||
|
args = myTestArgs()
|
||||||
|
args.debug = "DEBUG"
|
||||||
|
args.host = "localhost"
|
||||||
|
args.port = self.sipserver.port
|
||||||
|
args.unencrypted = True
|
||||||
|
args.acceptssl = True
|
||||||
|
args.cleartelnet = False
|
||||||
|
args.time = "+00:59"
|
||||||
|
args.date = "2020-01-01"
|
||||||
|
args.files = [testSipFile]
|
||||||
|
run(args)
|
||||||
|
args.files[0].close()
|
||||||
|
self.assertEqual(capturedOutput.getvalue().find("!")>= 0, True)
|
||||||
|
self.assertEqual(capturedOutput.getvalue().find("?"), -1)
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
unittest.main()
|
131
src/lmwsip/tests/stubSipServer.py
Executable file
131
src/lmwsip/tests/stubSipServer.py
Executable file
@@ -0,0 +1,131 @@
|
|||||||
|
#!/usr/bin/python
|
||||||
|
|
||||||
|
"""A stub sipserver for testing lmwsip
|
||||||
|
|
||||||
|
This is a stub sipserver that implements a small subset of the sip
|
||||||
|
protocol to perform unit tests.
|
||||||
|
|
||||||
|
Implements the following commands:
|
||||||
|
|
||||||
|
CMD> LI USER,PASS
|
||||||
|
ANS< !
|
||||||
|
|
||||||
|
CMD> TI LMW
|
||||||
|
ANS< ! 20-JAN-01 00:00:00
|
||||||
|
|
||||||
|
CMD> WN LMW,DUMMY,H10,+HH:MM,yyyy-mm-dd,HH:MM,DATA
|
||||||
|
ANS< ! 1/10,;2/10;....
|
||||||
|
|
||||||
|
CMD> WN LMW,DUMMY,H10,+HH:MM,yyyy-mm-dd,HH:MM,DATB
|
||||||
|
ANS< ! 1/10/0;2/10/0;....
|
||||||
|
|
||||||
|
CMD> WN LMW,DUMMY,H1,+HH:MM,yyyy-mm-dd,HH:MM,DATA
|
||||||
|
ANS< ! 1/10,;2/10;....
|
||||||
|
|
||||||
|
CMD> LO
|
||||||
|
ANS< !
|
||||||
|
|
||||||
|
All other commands result in a "?"
|
||||||
|
|
||||||
|
CMD> *
|
||||||
|
ANS< ? ERROR
|
||||||
|
|
||||||
|
Note:
|
||||||
|
for a WN command the time and date are ignored.
|
||||||
|
The duration is used to calculare the number of results to send.
|
||||||
|
|
||||||
|
The sip syntax for time is much flexibler.
|
||||||
|
The stub only support this format!
|
||||||
|
"""
|
||||||
|
|
||||||
|
|
||||||
|
import os
|
||||||
|
import time
|
||||||
|
import random
|
||||||
|
import socketserver
|
||||||
|
|
||||||
|
logoutcount=0
|
||||||
|
|
||||||
|
class sipProtocol(socketserver.BaseRequestHandler):
|
||||||
|
def match(self, m):
|
||||||
|
return(self.data.find(m.encode()) == 0)
|
||||||
|
|
||||||
|
def send(self, a):
|
||||||
|
a = "%s\r" % a
|
||||||
|
self.request.sendall(a.encode())
|
||||||
|
|
||||||
|
def read(self):
|
||||||
|
try:
|
||||||
|
self.data = self.request.recv(1024).strip()
|
||||||
|
except:
|
||||||
|
self.data = None
|
||||||
|
|
||||||
|
def number(self, b):
|
||||||
|
if b[0] == b'0':
|
||||||
|
return(int(b[0:1]))
|
||||||
|
else:
|
||||||
|
return(int(b[0:2]))
|
||||||
|
|
||||||
|
def meting(self, delta=10):
|
||||||
|
res = ""
|
||||||
|
sep = "! "
|
||||||
|
elem = self.data.decode().split(",")
|
||||||
|
h = self.number(elem[3][1:3])
|
||||||
|
m = self.number(elem[3][4:6])
|
||||||
|
aantal = 1+(60*h+m)//delta
|
||||||
|
if self.data[-1:] == b'A':
|
||||||
|
data = "%i/10"
|
||||||
|
else:
|
||||||
|
data = "%i/10/0"
|
||||||
|
for i in range(aantal):
|
||||||
|
res += sep+data % i
|
||||||
|
sep=";"
|
||||||
|
self.send(res)
|
||||||
|
|
||||||
|
def handle(self):
|
||||||
|
global logoutcount
|
||||||
|
self.read()
|
||||||
|
while self.data:
|
||||||
|
if self.match("LI USER,PASS"):
|
||||||
|
self.send("!")
|
||||||
|
elif self.match("TI LMW"):
|
||||||
|
self.send("! 20-JAN-01 00:00:00")
|
||||||
|
elif self.match("WN LMW,DUMMY,H10,"):
|
||||||
|
self.meting(10)
|
||||||
|
elif self.match("WN LMW,DUMMY,H1,"):
|
||||||
|
self.meting(1)
|
||||||
|
elif self.match("LOGOUTCOUNT"):
|
||||||
|
self.send(str(logoutcount))
|
||||||
|
elif self.match("LO"):
|
||||||
|
logoutcount+=1
|
||||||
|
self.send("!")
|
||||||
|
elif self.match("CLOSE"):
|
||||||
|
self.request.close()
|
||||||
|
else:
|
||||||
|
self.send("? ERROR")
|
||||||
|
self.read()
|
||||||
|
|
||||||
|
class sipServer(socketserver.TCPServer):
|
||||||
|
def __init__(self):
|
||||||
|
self.port = None
|
||||||
|
while self.port == None:
|
||||||
|
self.port = random.randint(20000, 50000)
|
||||||
|
try:
|
||||||
|
super(sipServer, self).__init__(("localhost", self.port), sipProtocol)
|
||||||
|
except:
|
||||||
|
self.port = None
|
||||||
|
|
||||||
|
def run(self):
|
||||||
|
self.pid = os.fork()
|
||||||
|
if self.pid == 0:
|
||||||
|
self.serve_forever()
|
||||||
|
|
||||||
|
def kill(self):
|
||||||
|
if self.pid != 0:
|
||||||
|
os.kill(self.pid, 15)
|
||||||
|
self.server_close()
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
s = sipServer()
|
||||||
|
s.run()
|
||||||
|
pass
|
Reference in New Issue
Block a user