Compare commits
2 Commits
61997df1f3
...
08cd160713
Author | SHA1 | Date | |
---|---|---|---|
|
08cd160713 | ||
|
da80bd93d9 |
1
.gitignore
vendored
1
.gitignore
vendored
@@ -7,3 +7,4 @@ lmwsip_marceln.egg-info
|
|||||||
test/__pycache__
|
test/__pycache__
|
||||||
lmwsip.egg-info
|
lmwsip.egg-info
|
||||||
*.swp
|
*.swp
|
||||||
|
.tox
|
||||||
|
@@ -6,5 +6,5 @@ VERSION=$(grep version setup.cfg | sed 's/.*= *//')
|
|||||||
sed -i "s/^__version__ = .*/__version__ = '${VERSION}'/" src/lmwsip/__init__.py
|
sed -i "s/^__version__ = .*/__version__ = '${VERSION}'/" src/lmwsip/__init__.py
|
||||||
git add lmwsip/__init__.py
|
git add lmwsip/__init__.py
|
||||||
|
|
||||||
python setup.py test
|
tox
|
||||||
yamllint .drone.yml
|
yamllint .drone.yml
|
||||||
|
@@ -1,64 +0,0 @@
|
|||||||
#!/usr/bin/env python3
|
|
||||||
|
|
||||||
import sys
|
|
||||||
import getopt
|
|
||||||
import argparse
|
|
||||||
import logging
|
|
||||||
from lmwsip import LmwSip
|
|
||||||
|
|
||||||
def run(args):
|
|
||||||
logging.basicConfig(level=args.debug)
|
|
||||||
logging.debug("lmwsip.run %s" % args)
|
|
||||||
try:
|
|
||||||
lmwsip = LmwSip(host=args.host, port=args.port,
|
|
||||||
ssl=not args.unencrypted,
|
|
||||||
check_ssl=not args.acceptssl,
|
|
||||||
cleartelnet=args.cleartelnet)
|
|
||||||
except Exception as e:
|
|
||||||
print("Connect to lmw failed: %s" % e)
|
|
||||||
exit(1)
|
|
||||||
for f in args.files:
|
|
||||||
for cmd in f:
|
|
||||||
cmd = cmd.replace('{DATE}', args.date)
|
|
||||||
cmd = cmd.replace('{TIME}', args.time)
|
|
||||||
cmd = cmd.replace('\n', '\r')
|
|
||||||
print("> [%s]" % (cmd.strip('\r')))
|
|
||||||
try:
|
|
||||||
lmwsip.send(cmd)
|
|
||||||
print("< [%s]" % (lmwsip.recv().strip('\r')))
|
|
||||||
except:
|
|
||||||
pass
|
|
||||||
try:
|
|
||||||
lmwsip.closesocket()
|
|
||||||
except:
|
|
||||||
pass
|
|
||||||
|
|
||||||
def main():
|
|
||||||
lastTime=LmwSip(host=None).lasttime("H10")
|
|
||||||
parser = argparse.ArgumentParser(description="Run a sip file.")
|
|
||||||
parser.add_argument("-u", "--unencrypted", action="store_true",
|
|
||||||
help="Run a sip connection without ssl")
|
|
||||||
parser.add_argument("-a", "--acceptssl", action="store_true",
|
|
||||||
help="Accept ssl certificate")
|
|
||||||
parser.add_argument("-c", "--cleartelnet", action="store_true",
|
|
||||||
help="Clear telnet protocol in tcp session")
|
|
||||||
parser.add_argument("-H", "--host", action='store',
|
|
||||||
default="sip-lmw.rws.nl",
|
|
||||||
help="Host to connect to")
|
|
||||||
parser.add_argument("-p", "--port", action='store', type=int, default=443,
|
|
||||||
help="Port to connect to")
|
|
||||||
parser.add_argument("-d", "--date", action='store',
|
|
||||||
default=lastTime["day"],
|
|
||||||
help="Date replacement string [DD-MM-YYYY]")
|
|
||||||
parser.add_argument("-t", "--time", action='store',
|
|
||||||
default=lastTime["time_of_day"],
|
|
||||||
help="Time replacement string [HH:MM]")
|
|
||||||
parser.add_argument("-D", "--debug", action='store',
|
|
||||||
default="WARN",
|
|
||||||
help="Debug level")
|
|
||||||
parser.add_argument("files", type=argparse.FileType('r'), nargs="+",
|
|
||||||
help="Sip files to run")
|
|
||||||
run(parser.parse_args())
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
|
||||||
main()
|
|
@@ -1,164 +0,0 @@
|
|||||||
#!/usr/bin/python
|
|
||||||
|
|
||||||
import sys
|
|
||||||
import io
|
|
||||||
import unittest
|
|
||||||
import lmwsip
|
|
||||||
import lmwsip.tests.stubSipServer
|
|
||||||
import logging
|
|
||||||
|
|
||||||
from lmwsip.tests.stubSipServer import sipServer
|
|
||||||
from lmwsip.run import run
|
|
||||||
from datetime import datetime, timedelta
|
|
||||||
from dateutil import tz
|
|
||||||
from time import sleep
|
|
||||||
|
|
||||||
class myTestArgs():
|
|
||||||
pass
|
|
||||||
|
|
||||||
class lmwsipTest(unittest.TestCase):
|
|
||||||
|
|
||||||
def setUp(self):
|
|
||||||
self.sipserver = sipServer()
|
|
||||||
self.sip = None
|
|
||||||
self.sipserver.run()
|
|
||||||
|
|
||||||
def login(self, **args):
|
|
||||||
log = logging.basicConfig(level=logging.DEBUG)
|
|
||||||
self.sip = lmwsip.LmwSip("USER", "PASS", "localhost",
|
|
||||||
self.sipserver.port, ssl=False,
|
|
||||||
log=log, **args)
|
|
||||||
|
|
||||||
def tearDown(self):
|
|
||||||
if self.sip:
|
|
||||||
self.sip.closesocket()
|
|
||||||
self.sipserver.kill()
|
|
||||||
|
|
||||||
def test_sipobj(self):
|
|
||||||
self.login()
|
|
||||||
self.assertEqual(type(self.sip), lmwsip.LmwSip)
|
|
||||||
|
|
||||||
def test_H1(self):
|
|
||||||
self.sip = lmwsip.LmwSip(host=None)
|
|
||||||
self.assertEqual(self.sip.period('H1'), 1)
|
|
||||||
|
|
||||||
def test_H10(self):
|
|
||||||
self.sip = lmwsip.LmwSip(host=None)
|
|
||||||
self.assertEqual(self.sip.period('H10'), 10)
|
|
||||||
|
|
||||||
def test_xHm0(self):
|
|
||||||
self.sip = lmwsip.LmwSip(host=None)
|
|
||||||
self.assertEqual(self.sip.period('xHm0'), 10)
|
|
||||||
|
|
||||||
def test_Noparm(self):
|
|
||||||
self.sip = lmwsip.LmwSip(host=None)
|
|
||||||
with self.assertRaises(lmwsip.LmwParmWarn):
|
|
||||||
self.assertEqual(self.sip.period('Noparm'), None)
|
|
||||||
|
|
||||||
def test_loginfail(self):
|
|
||||||
with self.assertRaises(lmwsip.LmwLoginFailure):
|
|
||||||
self.sip = lmwsip.LmwSip("FAIL", "FAIL", "localhost",
|
|
||||||
self.sipserver.port, ssl=False)
|
|
||||||
|
|
||||||
def test_ti(self):
|
|
||||||
self.login()
|
|
||||||
self.assertEqual(type(self.sip.ti()), str)
|
|
||||||
|
|
||||||
def test_telnetti(self):
|
|
||||||
self.login(cleartelnet=True)
|
|
||||||
self.assertEqual(type(self.sip.ti()), str)
|
|
||||||
|
|
||||||
def test_cmd(self):
|
|
||||||
self.login()
|
|
||||||
self.assertEqual(type(self.sip.cmd("WN", "DUMMY", "H10", "+00:59", "2020-01-01", "00:00")), str)
|
|
||||||
|
|
||||||
def test_cmderr(self):
|
|
||||||
self.login()
|
|
||||||
with self.assertRaises(lmwsip.LmwCmdWarn):
|
|
||||||
self.assertEqual(type(self.sip.cmd("NOP", "DUMMY", "H10", "+00:59", "2020-01-01", "00:00")), str)
|
|
||||||
|
|
||||||
def test_value(self):
|
|
||||||
self.login()
|
|
||||||
self.assertEqual(type(self.sip.value("WN", "DUMMY", "H10")), str)
|
|
||||||
|
|
||||||
def test_value1min(self):
|
|
||||||
self.login()
|
|
||||||
self.assertEqual(type(self.sip.value("WN", "DUMMY", "H1")), str)
|
|
||||||
|
|
||||||
def test_valueStr(self):
|
|
||||||
self.login()
|
|
||||||
self.assertEqual(type(self.sip.valueStr("WN", "DUMMY", "H10")), str)
|
|
||||||
|
|
||||||
def test_logout(self):
|
|
||||||
self.login()
|
|
||||||
self.assertEqual(self.sip.logout(), None)
|
|
||||||
|
|
||||||
def test_lmwTimeSerie(self):
|
|
||||||
self.login()
|
|
||||||
timezone = tz.gettz('GMT+1')
|
|
||||||
res = self.sip.timeSerie("WN", "DUMMY", "H10",
|
|
||||||
datetime.now(timezone)-timedelta(minutes=60),
|
|
||||||
datetime.now(timezone))
|
|
||||||
self.assertEqual(type(res.ts), list)
|
|
||||||
self.assertEqual(len(res.ts), 6)
|
|
||||||
self.assertEqual(res.ts[1][1][0], '1')
|
|
||||||
|
|
||||||
def test_roundtime(self):
|
|
||||||
self.login()
|
|
||||||
timezone = tz.gettz('GMT+1')
|
|
||||||
t1 = datetime(2020, 1, 1, 0, 10, 0, 0, timezone)
|
|
||||||
t2 = datetime(2020, 1, 1, 0, 0, 0, 1, timezone)
|
|
||||||
self.assertEqual(self.sip._roundtime_(t1, timedelta(minutes=10)), t1)
|
|
||||||
self.assertEqual(self.sip._roundtime_(t2, timedelta(minutes=10)), t1)
|
|
||||||
|
|
||||||
def test_closerecv(self):
|
|
||||||
self.login()
|
|
||||||
self.sip.send("CLOSE")
|
|
||||||
with self.assertRaises(lmwsip.LmwSipConnectError):
|
|
||||||
self.sip.recv()
|
|
||||||
|
|
||||||
def test_closeti(self):
|
|
||||||
self.login()
|
|
||||||
self.sip.send("CLOSE")
|
|
||||||
self.assertEqual(type(self.sip.ti()), str)
|
|
||||||
|
|
||||||
def test_closecmd(self):
|
|
||||||
self.login()
|
|
||||||
self.sip.send("CLOSE")
|
|
||||||
self.assertEqual(type(self.sip.cmd("WN", "DUMMY", "H10", "+00:59", "2020-01-01", "00:00")), str)
|
|
||||||
|
|
||||||
def test_reconnect(self):
|
|
||||||
self.login(reconnecttime=1)
|
|
||||||
sleep(2)
|
|
||||||
self.assertEqual(self.sip.sendrecv("LOGOUTCOUNT"), "1\r")
|
|
||||||
|
|
||||||
def test_idlereconnect(self):
|
|
||||||
self.login(idlereconnect=1)
|
|
||||||
sleep(2)
|
|
||||||
self.assertEqual(self.sip.sendrecv("LOGOUTCOUNT"), "1\r")
|
|
||||||
|
|
||||||
def test_versionstr(self):
|
|
||||||
self.assertEqual(type(lmwsip.__version__), str)
|
|
||||||
|
|
||||||
def test_run(self):
|
|
||||||
capturedOutput = io.StringIO()
|
|
||||||
sys.stdout = capturedOutput
|
|
||||||
testSipFile = io.StringIO("LI USER,PASS\rTI LMW\rLO")
|
|
||||||
testSipFile.seek(0)
|
|
||||||
args = myTestArgs()
|
|
||||||
args.debug = "DEBUG"
|
|
||||||
args.host = "localhost"
|
|
||||||
args.port = self.sipserver.port
|
|
||||||
args.unencrypted = True
|
|
||||||
args.acceptssl = True
|
|
||||||
args.cleartelnet = False
|
|
||||||
args.time = "+00:59"
|
|
||||||
args.date = "2020-01-01"
|
|
||||||
args.files = [testSipFile]
|
|
||||||
run(args)
|
|
||||||
args.files[0].close()
|
|
||||||
self.assertEqual(capturedOutput.getvalue().find("!")>= 0, True)
|
|
||||||
self.assertEqual(capturedOutput.getvalue().find("?"), -1)
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
|
||||||
unittest.main()
|
|
@@ -1,131 +0,0 @@
|
|||||||
#!/usr/bin/python
|
|
||||||
|
|
||||||
"""A stub sipserver for testing lmwsip
|
|
||||||
|
|
||||||
This is a stub sipserver that implements a small subset of the sip
|
|
||||||
protocol to perform unit tests.
|
|
||||||
|
|
||||||
Implements the following commands:
|
|
||||||
|
|
||||||
CMD> LI USER,PASS
|
|
||||||
ANS< !
|
|
||||||
|
|
||||||
CMD> TI LMW
|
|
||||||
ANS< ! 20-JAN-01 00:00:00
|
|
||||||
|
|
||||||
CMD> WN LMW,DUMMY,H10,+HH:MM,yyyy-mm-dd,HH:MM,DATA
|
|
||||||
ANS< ! 1/10,;2/10;....
|
|
||||||
|
|
||||||
CMD> WN LMW,DUMMY,H10,+HH:MM,yyyy-mm-dd,HH:MM,DATB
|
|
||||||
ANS< ! 1/10/0;2/10/0;....
|
|
||||||
|
|
||||||
CMD> WN LMW,DUMMY,H1,+HH:MM,yyyy-mm-dd,HH:MM,DATA
|
|
||||||
ANS< ! 1/10,;2/10;....
|
|
||||||
|
|
||||||
CMD> LO
|
|
||||||
ANS< !
|
|
||||||
|
|
||||||
All other commands result in a "?"
|
|
||||||
|
|
||||||
CMD> *
|
|
||||||
ANS< ? ERROR
|
|
||||||
|
|
||||||
Note:
|
|
||||||
for a WN command the time and date are ignored.
|
|
||||||
The duration is used to calculare the number of results to send.
|
|
||||||
|
|
||||||
The sip syntax for time is much flexibler.
|
|
||||||
The stub only support this format!
|
|
||||||
"""
|
|
||||||
|
|
||||||
|
|
||||||
import os
|
|
||||||
import time
|
|
||||||
import random
|
|
||||||
import socketserver
|
|
||||||
|
|
||||||
logoutcount=0
|
|
||||||
|
|
||||||
class sipProtocol(socketserver.BaseRequestHandler):
|
|
||||||
def match(self, m):
|
|
||||||
return(self.data.find(m.encode()) == 0)
|
|
||||||
|
|
||||||
def send(self, a):
|
|
||||||
a = "%s\r" % a
|
|
||||||
self.request.sendall(a.encode())
|
|
||||||
|
|
||||||
def read(self):
|
|
||||||
try:
|
|
||||||
self.data = self.request.recv(1024).strip()
|
|
||||||
except:
|
|
||||||
self.data = None
|
|
||||||
|
|
||||||
def number(self, b):
|
|
||||||
if b[0] == b'0':
|
|
||||||
return(int(b[0:1]))
|
|
||||||
else:
|
|
||||||
return(int(b[0:2]))
|
|
||||||
|
|
||||||
def meting(self, delta=10):
|
|
||||||
res = ""
|
|
||||||
sep = "! "
|
|
||||||
elem = self.data.decode().split(",")
|
|
||||||
h = self.number(elem[3][1:3])
|
|
||||||
m = self.number(elem[3][4:6])
|
|
||||||
aantal = 1+(60*h+m)//delta
|
|
||||||
if self.data[-1:] == b'A':
|
|
||||||
data = "%i/10"
|
|
||||||
else:
|
|
||||||
data = "%i/10/0"
|
|
||||||
for i in range(aantal):
|
|
||||||
res += sep+data % i
|
|
||||||
sep=";"
|
|
||||||
self.send(res)
|
|
||||||
|
|
||||||
def handle(self):
|
|
||||||
global logoutcount
|
|
||||||
self.read()
|
|
||||||
while self.data:
|
|
||||||
if self.match("LI USER,PASS"):
|
|
||||||
self.send("!")
|
|
||||||
elif self.match("TI LMW"):
|
|
||||||
self.send("! 20-JAN-01 00:00:00")
|
|
||||||
elif self.match("WN LMW,DUMMY,H10,"):
|
|
||||||
self.meting(10)
|
|
||||||
elif self.match("WN LMW,DUMMY,H1,"):
|
|
||||||
self.meting(1)
|
|
||||||
elif self.match("LOGOUTCOUNT"):
|
|
||||||
self.send(str(logoutcount))
|
|
||||||
elif self.match("LO"):
|
|
||||||
logoutcount+=1
|
|
||||||
self.send("!")
|
|
||||||
elif self.match("CLOSE"):
|
|
||||||
self.request.close()
|
|
||||||
else:
|
|
||||||
self.send("? ERROR")
|
|
||||||
self.read()
|
|
||||||
|
|
||||||
class sipServer(socketserver.TCPServer):
|
|
||||||
def __init__(self):
|
|
||||||
self.port = None
|
|
||||||
while self.port == None:
|
|
||||||
self.port = random.randint(20000, 50000)
|
|
||||||
try:
|
|
||||||
super(sipServer, self).__init__(("localhost", self.port), sipProtocol)
|
|
||||||
except:
|
|
||||||
self.port = None
|
|
||||||
|
|
||||||
def run(self):
|
|
||||||
self.pid = os.fork()
|
|
||||||
if self.pid == 0:
|
|
||||||
self.serve_forever()
|
|
||||||
|
|
||||||
def kill(self):
|
|
||||||
if self.pid != 0:
|
|
||||||
os.kill(self.pid, 15)
|
|
||||||
self.server_close()
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
|
||||||
s = sipServer()
|
|
||||||
s.run()
|
|
||||||
pass
|
|
@@ -15,6 +15,8 @@ classifiers =
|
|||||||
[options]
|
[options]
|
||||||
package_dir =
|
package_dir =
|
||||||
= src
|
= src
|
||||||
|
install_requires =
|
||||||
|
python-dateutil
|
||||||
packages = find:
|
packages = find:
|
||||||
python_requires = >= 3.6
|
python_requires = >= 3.6
|
||||||
|
|
||||||
|
Reference in New Issue
Block a user