117 lines
2.7 KiB
Python
Executable File
117 lines
2.7 KiB
Python
Executable File
#!/usr/bin/python
|
|
|
|
"""A stub sipserver for testing lmwsip
|
|
|
|
This is a stub sipserver that implements a small subset of the sip
|
|
protocol to perform unit tests.
|
|
|
|
Implements the following commands:
|
|
|
|
CMD> LI USER,PASS
|
|
ANS< !
|
|
|
|
CMD> TI LMW
|
|
ANS< ! 20-JAN-01 00:00:00
|
|
|
|
CMD> WN LMW,DUMMY,D10,+HH:MM,yyyy-mm-dd,HH:MM,DATA
|
|
ANS< ! 1/10,;2/10;....
|
|
|
|
CMD> WN LMW,DUMMY,D10,+HH:MM,yyyy-mm-dd,HH:MM,DATB
|
|
ANS< ! 1/10/0;2/10/0;....
|
|
|
|
CMD> LO
|
|
ANS< !
|
|
|
|
All other commands result in a "?"
|
|
|
|
CMD> *
|
|
ANS< ? ERROR
|
|
|
|
Note:
|
|
for a WN command the time and date are ignored.
|
|
The duration is used to calculare the number of results to send.
|
|
|
|
The sip syntax for time is much flexibler.
|
|
The stub only support this format!
|
|
"""
|
|
|
|
|
|
import os
|
|
import time
|
|
import random
|
|
import socketserver
|
|
|
|
class sipProtocol(socketserver.BaseRequestHandler):
|
|
|
|
def match(self, m):
|
|
return(self.data.find(m.encode()) == 0)
|
|
|
|
def send(self, a):
|
|
a = "%s\r" % a
|
|
self.request.sendall(a.encode())
|
|
|
|
def read(self):
|
|
try:
|
|
self.data = self.request.recv(1024).strip()
|
|
except:
|
|
self.data = None
|
|
|
|
def meting(self):
|
|
res = ""
|
|
sep = "! "
|
|
if self.data[18:19] == b'0':
|
|
h = int(self.data[19:20].decode())
|
|
else:
|
|
h = int(self.data[18:20].decode())
|
|
if self.data[21:22] == b'0':
|
|
m = int(self.data[22:23].decode())
|
|
else:
|
|
m = int(self.data[21:23].decode())
|
|
aantal = 1+(60*h+m)//10
|
|
if self.data[-1:] == b'A':
|
|
data = "%i/10"
|
|
else:
|
|
data = "%i/10/0"
|
|
for i in range(aantal):
|
|
res += sep+data % i
|
|
sep=";"
|
|
self.send(res)
|
|
|
|
def handle(self):
|
|
self.read()
|
|
while self.data:
|
|
if self.match("LI USER,PASS"):
|
|
self.send("!")
|
|
elif self.match("TI LMW"):
|
|
self.send("! 20-JAN-01 00:00:00")
|
|
elif self.match("WN LMW,DUMMY,D10"):
|
|
self.meting()
|
|
elif self.match("LO"):
|
|
self.send("!")
|
|
else:
|
|
self.send("? ERROR")
|
|
self.read()
|
|
|
|
class sipServer(socketserver.TCPServer):
|
|
def __init__(self):
|
|
self.port = None
|
|
while self.port == None:
|
|
self.port = random.randint(20000, 50000)
|
|
try:
|
|
super(sipServer, self).__init__(("localhost", self.port), sipProtocol)
|
|
except:
|
|
self.port = None
|
|
|
|
def run(self):
|
|
self.pid = os.fork()
|
|
if self.pid == 0:
|
|
self.serve_forever()
|
|
|
|
def kill(self):
|
|
if self.pid != 0:
|
|
os.kill(self.pid, 15)
|
|
self.server_close()
|
|
|
|
if __name__ == '__main__':
|
|
pass
|