143 lines
3.5 KiB
Python
Executable File
143 lines
3.5 KiB
Python
Executable File
#!/usr/bin/python
|
|
|
|
"""A stub sipserver for testing lmwsip
|
|
|
|
This is a stub sipserver that implements a small subset of the sip
|
|
protocol to perform unit tests.
|
|
|
|
Implements the following commands:
|
|
|
|
CMD> LI USER,PASS
|
|
ANS< !
|
|
|
|
CMD> TI LMW
|
|
ANS< ! 20-JAN-01 00:00:00
|
|
|
|
CMD> WN LMW,DUMMY,H10,STAT
|
|
ANS< ! 20-JAN-01 00:00
|
|
|
|
CMD> VW LMW,DUMMY,H10V,STAT
|
|
ANS< ! 20-JAN-01 00:00;20-JAN-01 01:00
|
|
|
|
CMD> WN LMW,DUMMY,H10,+HH:MM,yyyy-mm-dd,HH:MM,DATA
|
|
ANS< ! 1/10,;2/10;....
|
|
|
|
CMD> WN LMW,DUMMY,H10,+HH:MM,yyyy-mm-dd,HH:MM,DATB
|
|
ANS< ! 1/10/0;2/10/0;....
|
|
|
|
CMD> WN LMW,DUMMY,H1,+HH:MM,yyyy-mm-dd,HH:MM,DATA
|
|
ANS< ! 1/10,;2/10;....
|
|
|
|
CMD> LO
|
|
ANS< !
|
|
|
|
All other commands result in a "?"
|
|
|
|
CMD> *
|
|
ANS< ? ERROR
|
|
|
|
Note:
|
|
for a WN command the time and date are ignored.
|
|
The duration is used to calculare the number of results to send.
|
|
|
|
The sip syntax for time is much flexibler.
|
|
The stub only support this format!
|
|
"""
|
|
|
|
|
|
import os
|
|
import time
|
|
import random
|
|
import socketserver
|
|
|
|
logoutcount=0
|
|
|
|
class sipProtocol(socketserver.BaseRequestHandler):
|
|
def match(self, m):
|
|
return(self.data.find(m.encode()) != -1)
|
|
|
|
def send(self, a):
|
|
a = "%s\r" % a
|
|
self.request.sendall(a.encode())
|
|
|
|
def read(self):
|
|
try:
|
|
self.data = self.request.recv(1024).strip()
|
|
except:
|
|
self.data = None
|
|
|
|
def number(self, b):
|
|
if b[0] == b'0':
|
|
return(int(b[0:1]))
|
|
else:
|
|
return(int(b[0:2]))
|
|
|
|
def meting(self, delta=10):
|
|
res = ""
|
|
sep = "! "
|
|
elem = self.data.decode().split(",")
|
|
h = self.number(elem[3][1:3])
|
|
m = self.number(elem[3][4:6])
|
|
aantal = 1+(60*h+m)//delta
|
|
if self.data[-1:] == b'A':
|
|
data = "%i/10"
|
|
else:
|
|
data = "%i/10/0"
|
|
for i in range(aantal):
|
|
res += sep+data % i
|
|
sep=";"
|
|
self.send(res)
|
|
|
|
def handle(self):
|
|
global logoutcount
|
|
self.read()
|
|
while self.data:
|
|
if self.match("CLOSE"):
|
|
self.request.close()
|
|
elif self.match("LI USER,PASS"):
|
|
self.send("!")
|
|
elif self.match("TI LMW"):
|
|
self.send("! 20-JAN-01 00:00:00")
|
|
elif self.match("WN LMW,DUMMY,H10,"):
|
|
if self.match("STAT"):
|
|
self.send("! 20-JAN-01 00:00")
|
|
else:
|
|
self.meting(10)
|
|
elif self.match("VW LMW,DUMMY,H10V,STAT"):
|
|
self.send("! 20-JAN-01 00:00;20-JAN-01 01:00")
|
|
elif self.match("WN LMW,DUMMY,H1,"):
|
|
self.meting(1)
|
|
elif self.match("LOGOUTCOUNT"):
|
|
self.send(str(logoutcount))
|
|
elif self.match("LO"):
|
|
logoutcount+=1
|
|
self.send("!")
|
|
else:
|
|
self.send("? ERROR")
|
|
self.read()
|
|
|
|
class sipServer(socketserver.TCPServer):
|
|
def __init__(self):
|
|
self.port = None
|
|
while self.port == None:
|
|
self.port = random.randint(20000, 50000)
|
|
try:
|
|
super(sipServer, self).__init__(("localhost", self.port), sipProtocol)
|
|
except:
|
|
self.port = None
|
|
|
|
def run(self):
|
|
self.pid = os.fork()
|
|
if self.pid == 0:
|
|
self.serve_forever()
|
|
|
|
def kill(self):
|
|
if self.pid != 0:
|
|
os.kill(self.pid, 15)
|
|
self.server_close()
|
|
|
|
if __name__ == '__main__':
|
|
s = sipServer()
|
|
s.run()
|
|
pass
|