diff --git a/githooks/pre-commit b/githooks/pre-commit index 9118022..8c79f31 100755 --- a/githooks/pre-commit +++ b/githooks/pre-commit @@ -3,7 +3,7 @@ set -e VERSION=$(grep version setup.cfg | sed 's/.*= *//') -sed -i "s/^__version__ = .*/__version__ = '${VERSION}'/" lmwsip/__init__.py +sed -i "s/^__version__ = .*/__version__ = '${VERSION}'/" src/lmwsip/__init__.py git add lmwsip/__init__.py python setup.py test diff --git a/setup.cfg b/setup.cfg index e41e41a..aef68a0 100644 --- a/setup.cfg +++ b/setup.cfg @@ -13,4 +13,10 @@ classifiers = Operating System :: OS Independent [options] +package_dir = + = src +packages = find: python_requires = >= 3.6 + +[options.packages.find] +where = src diff --git a/lmwsip/__init__.py b/src/lmwsip/__init__.py similarity index 100% rename from lmwsip/__init__.py rename to src/lmwsip/__init__.py diff --git a/src/lmwsip/run.py b/src/lmwsip/run.py new file mode 100755 index 0000000..78bfe03 --- /dev/null +++ b/src/lmwsip/run.py @@ -0,0 +1,64 @@ +#!/usr/bin/env python3 + +import sys +import getopt +import argparse +import logging +from lmwsip import LmwSip + +def run(args): + logging.basicConfig(level=args.debug) + logging.debug("lmwsip.run %s" % args) + try: + lmwsip = LmwSip(host=args.host, port=args.port, + ssl=not args.unencrypted, + check_ssl=not args.acceptssl, + cleartelnet=args.cleartelnet) + except Exception as e: + print("Connect to lmw failed: %s" % e) + exit(1) + for f in args.files: + for cmd in f: + cmd = cmd.replace('{DATE}', args.date) + cmd = cmd.replace('{TIME}', args.time) + cmd = cmd.replace('\n', '\r') + print("> [%s]" % (cmd.strip('\r'))) + try: + lmwsip.send(cmd) + print("< [%s]" % (lmwsip.recv().strip('\r'))) + except: + pass + try: + lmwsip.closesocket() + except: + pass + +def main(): + lastTime=LmwSip(host=None).lasttime("H10") + parser = argparse.ArgumentParser(description="Run a sip file.") + parser.add_argument("-u", "--unencrypted", action="store_true", + help="Run a sip connection without ssl") + parser.add_argument("-a", "--acceptssl", action="store_true", + help="Accept ssl certificate") + parser.add_argument("-c", "--cleartelnet", action="store_true", + help="Clear telnet protocol in tcp session") + parser.add_argument("-H", "--host", action='store', + default="sip-lmw.rws.nl", + help="Host to connect to") + parser.add_argument("-p", "--port", action='store', type=int, default=443, + help="Port to connect to") + parser.add_argument("-d", "--date", action='store', + default=lastTime["day"], + help="Date replacement string [DD-MM-YYYY]") + parser.add_argument("-t", "--time", action='store', + default=lastTime["time_of_day"], + help="Time replacement string [HH:MM]") + parser.add_argument("-D", "--debug", action='store', + default="WARN", + help="Debug level") + parser.add_argument("files", type=argparse.FileType('r'), nargs="+", + help="Sip files to run") + run(parser.parse_args()) + +if __name__ == "__main__": + main() diff --git a/src/lmwsip/tests/__init__.py b/src/lmwsip/tests/__init__.py new file mode 100755 index 0000000..ce89f51 --- /dev/null +++ b/src/lmwsip/tests/__init__.py @@ -0,0 +1,164 @@ +#!/usr/bin/python + +import sys +import io +import unittest +import lmwsip +import lmwsip.tests.stubSipServer +import logging + +from lmwsip.tests.stubSipServer import sipServer +from lmwsip.run import run +from datetime import datetime, timedelta +from dateutil import tz +from time import sleep + +class myTestArgs(): + pass + +class lmwsipTest(unittest.TestCase): + + def setUp(self): + self.sipserver = sipServer() + self.sip = None + self.sipserver.run() + + def login(self, **args): + log = logging.basicConfig(level=logging.DEBUG) + self.sip = lmwsip.LmwSip("USER", "PASS", "localhost", + self.sipserver.port, ssl=False, + log=log, **args) + + def tearDown(self): + if self.sip: + self.sip.closesocket() + self.sipserver.kill() + + def test_sipobj(self): + self.login() + self.assertEqual(type(self.sip), lmwsip.LmwSip) + + def test_H1(self): + self.sip = lmwsip.LmwSip(host=None) + self.assertEqual(self.sip.period('H1'), 1) + + def test_H10(self): + self.sip = lmwsip.LmwSip(host=None) + self.assertEqual(self.sip.period('H10'), 10) + + def test_xHm0(self): + self.sip = lmwsip.LmwSip(host=None) + self.assertEqual(self.sip.period('xHm0'), 10) + + def test_Noparm(self): + self.sip = lmwsip.LmwSip(host=None) + with self.assertRaises(lmwsip.LmwParmWarn): + self.assertEqual(self.sip.period('Noparm'), None) + + def test_loginfail(self): + with self.assertRaises(lmwsip.LmwLoginFailure): + self.sip = lmwsip.LmwSip("FAIL", "FAIL", "localhost", + self.sipserver.port, ssl=False) + + def test_ti(self): + self.login() + self.assertEqual(type(self.sip.ti()), str) + + def test_telnetti(self): + self.login(cleartelnet=True) + self.assertEqual(type(self.sip.ti()), str) + + def test_cmd(self): + self.login() + self.assertEqual(type(self.sip.cmd("WN", "DUMMY", "H10", "+00:59", "2020-01-01", "00:00")), str) + + def test_cmderr(self): + self.login() + with self.assertRaises(lmwsip.LmwCmdWarn): + self.assertEqual(type(self.sip.cmd("NOP", "DUMMY", "H10", "+00:59", "2020-01-01", "00:00")), str) + + def test_value(self): + self.login() + self.assertEqual(type(self.sip.value("WN", "DUMMY", "H10")), str) + + def test_value1min(self): + self.login() + self.assertEqual(type(self.sip.value("WN", "DUMMY", "H1")), str) + + def test_valueStr(self): + self.login() + self.assertEqual(type(self.sip.valueStr("WN", "DUMMY", "H10")), str) + + def test_logout(self): + self.login() + self.assertEqual(self.sip.logout(), None) + + def test_lmwTimeSerie(self): + self.login() + timezone = tz.gettz('GMT+1') + res = self.sip.timeSerie("WN", "DUMMY", "H10", + datetime.now(timezone)-timedelta(minutes=60), + datetime.now(timezone)) + self.assertEqual(type(res.ts), list) + self.assertEqual(len(res.ts), 6) + self.assertEqual(res.ts[1][1][0], '1') + + def test_roundtime(self): + self.login() + timezone = tz.gettz('GMT+1') + t1 = datetime(2020, 1, 1, 0, 10, 0, 0, timezone) + t2 = datetime(2020, 1, 1, 0, 0, 0, 1, timezone) + self.assertEqual(self.sip._roundtime_(t1, timedelta(minutes=10)), t1) + self.assertEqual(self.sip._roundtime_(t2, timedelta(minutes=10)), t1) + + def test_closerecv(self): + self.login() + self.sip.send("CLOSE") + with self.assertRaises(lmwsip.LmwSipConnectError): + self.sip.recv() + + def test_closeti(self): + self.login() + self.sip.send("CLOSE") + self.assertEqual(type(self.sip.ti()), str) + + def test_closecmd(self): + self.login() + self.sip.send("CLOSE") + self.assertEqual(type(self.sip.cmd("WN", "DUMMY", "H10", "+00:59", "2020-01-01", "00:00")), str) + + def test_reconnect(self): + self.login(reconnecttime=1) + sleep(2) + self.assertEqual(self.sip.sendrecv("LOGOUTCOUNT"), "1\r") + + def test_idlereconnect(self): + self.login(idlereconnect=1) + sleep(2) + self.assertEqual(self.sip.sendrecv("LOGOUTCOUNT"), "1\r") + + def test_versionstr(self): + self.assertEqual(type(lmwsip.__version__), str) + + def test_run(self): + capturedOutput = io.StringIO() + sys.stdout = capturedOutput + testSipFile = io.StringIO("LI USER,PASS\rTI LMW\rLO") + testSipFile.seek(0) + args = myTestArgs() + args.debug = "DEBUG" + args.host = "localhost" + args.port = self.sipserver.port + args.unencrypted = True + args.acceptssl = True + args.cleartelnet = False + args.time = "+00:59" + args.date = "2020-01-01" + args.files = [testSipFile] + run(args) + args.files[0].close() + self.assertEqual(capturedOutput.getvalue().find("!")>= 0, True) + self.assertEqual(capturedOutput.getvalue().find("?"), -1) + +if __name__ == '__main__': + unittest.main() diff --git a/src/lmwsip/tests/stubSipServer.py b/src/lmwsip/tests/stubSipServer.py new file mode 100755 index 0000000..75d345c --- /dev/null +++ b/src/lmwsip/tests/stubSipServer.py @@ -0,0 +1,131 @@ +#!/usr/bin/python + +"""A stub sipserver for testing lmwsip + +This is a stub sipserver that implements a small subset of the sip +protocol to perform unit tests. + +Implements the following commands: + + CMD> LI USER,PASS + ANS< ! + + CMD> TI LMW + ANS< ! 20-JAN-01 00:00:00 + + CMD> WN LMW,DUMMY,H10,+HH:MM,yyyy-mm-dd,HH:MM,DATA + ANS< ! 1/10,;2/10;.... + + CMD> WN LMW,DUMMY,H10,+HH:MM,yyyy-mm-dd,HH:MM,DATB + ANS< ! 1/10/0;2/10/0;.... + + CMD> WN LMW,DUMMY,H1,+HH:MM,yyyy-mm-dd,HH:MM,DATA + ANS< ! 1/10,;2/10;.... + + CMD> LO + ANS< ! + +All other commands result in a "?" + + CMD> * + ANS< ? ERROR + +Note: + for a WN command the time and date are ignored. + The duration is used to calculare the number of results to send. + + The sip syntax for time is much flexibler. + The stub only support this format! +""" + + +import os +import time +import random +import socketserver + +logoutcount=0 + +class sipProtocol(socketserver.BaseRequestHandler): + def match(self, m): + return(self.data.find(m.encode()) == 0) + + def send(self, a): + a = "%s\r" % a + self.request.sendall(a.encode()) + + def read(self): + try: + self.data = self.request.recv(1024).strip() + except: + self.data = None + + def number(self, b): + if b[0] == b'0': + return(int(b[0:1])) + else: + return(int(b[0:2])) + + def meting(self, delta=10): + res = "" + sep = "! " + elem = self.data.decode().split(",") + h = self.number(elem[3][1:3]) + m = self.number(elem[3][4:6]) + aantal = 1+(60*h+m)//delta + if self.data[-1:] == b'A': + data = "%i/10" + else: + data = "%i/10/0" + for i in range(aantal): + res += sep+data % i + sep=";" + self.send(res) + + def handle(self): + global logoutcount + self.read() + while self.data: + if self.match("LI USER,PASS"): + self.send("!") + elif self.match("TI LMW"): + self.send("! 20-JAN-01 00:00:00") + elif self.match("WN LMW,DUMMY,H10,"): + self.meting(10) + elif self.match("WN LMW,DUMMY,H1,"): + self.meting(1) + elif self.match("LOGOUTCOUNT"): + self.send(str(logoutcount)) + elif self.match("LO"): + logoutcount+=1 + self.send("!") + elif self.match("CLOSE"): + self.request.close() + else: + self.send("? ERROR") + self.read() + +class sipServer(socketserver.TCPServer): + def __init__(self): + self.port = None + while self.port == None: + self.port = random.randint(20000, 50000) + try: + super(sipServer, self).__init__(("localhost", self.port), sipProtocol) + except: + self.port = None + + def run(self): + self.pid = os.fork() + if self.pid == 0: + self.serve_forever() + + def kill(self): + if self.pid != 0: + os.kill(self.pid, 15) + self.server_close() + +if __name__ == '__main__': + s = sipServer() + s.run() + pass