diff options
author | Vasilii Chernov <vchernov@inr.ru> | 2016-02-17 18:13:36 +0100 |
---|---|---|
committer | Vasilii Chernov <vchernov@inr.ru> | 2016-02-17 18:13:36 +0100 |
commit | 52eb7f4fb76ddf99dedf44332aae7af4df76ab36 (patch) | |
tree | 607cd6617966ad9d9998ce0af87f1142de957a0c /pywrap/test_pcipywrap.py | |
parent | 9a9ffd5594a5d27bbecf6160de2c33d44870f5bd (diff) | |
download | pcitool-52eb7f4fb76ddf99dedf44332aae7af4df76ab36.tar.gz pcitool-52eb7f4fb76ddf99dedf44332aae7af4df76ab36.tar.bz2 pcitool-52eb7f4fb76ddf99dedf44332aae7af4df76ab36.tar.xz pcitool-52eb7f4fb76ddf99dedf44332aae7af4df76ab36.zip |
Add test to repository. Add GIL states to pcilib_set_value_from_pyobject
Diffstat (limited to 'pywrap/test_pcipywrap.py')
-rwxr-xr-x | pywrap/test_pcipywrap.py | 119 |
1 files changed, 119 insertions, 0 deletions
diff --git a/pywrap/test_pcipywrap.py b/pywrap/test_pcipywrap.py new file mode 100755 index 0000000..257b4a5 --- /dev/null +++ b/pywrap/test_pcipywrap.py @@ -0,0 +1,119 @@ +import threading +import pcipywrap +import random +import os +import json +import requests +import time + +class test_pcipywrap(): + def __init__(self, device, model, num_threads = 150, + write_percentage = 0.1, register = 'test_prop2', + server_host = 'http://localhost', server_port = 12412, + server_message_delay = 0): + #initialize enviroment variables + if not 'APP_PATH' in os.environ: + APP_PATH = '' + file_dir = os.path.dirname(os.path.abspath(__file__)) + APP_PATH = str(os.path.abspath(file_dir + '/../..')) + os.environ["APP_PATH"] = APP_PATH + + if not 'PCILIB_MODEL_DIR' in os.environ: + os.environ['PCILIB_MODEL_DIR'] = os.environ["APP_PATH"] + "/xml" + if not 'LD_LIBRARY_PATH' in os.environ: + os.environ['LD_LIBRARY_PATH'] = os.environ["APP_PATH"] + "/pcilib" + + random.seed() + #create pcilib_instance + self.pcilib = pcipywrap.Pcipywrap(device, model) + self.num_threads = num_threads + self.write_percentage = write_percentage + self.register = register + self.server_message_delay = server_message_delay + self.server_port = server_port + self.server_host = server_host + + def testThreadSafeReadWrite(self): + def threadFunc(): + if random.randint(0, 100) >= (self.write_percentage * 100): + ret = self.pcilib.get_property('/test/prop2') + print self.register, ':', ret + del ret + else: + val = random.randint(0, 65536) + print 'set value:', val + self.pcilib.write_register(val, self.register) + try: + while(1): + thread_list = [threading.Thread(target=threadFunc) for i in range(0, self.num_threads)] + for i in range(0, self.num_threads): + thread_list[i].start() + for i in range(0, self.num_threads): + thread_list[i].join() + print 'cycle done' + except KeyboardInterrupt: + print 'testing done' + pass + + def testMemoryLeak(self): + try: + while(1): + #print self.pcilib.create_pcilib_instance('/dev/fpga0','test_pywrap') + + print self.pcilib.get_property_list('/test') + print self.pcilib.get_register_info('test_prop1') + #print self.pcilib.get_registers_list(); + + #print self.pcilib.read_register('reg1') + #print self.pcilib.write_register(12, 'reg1') + + #print self.pcilib.get_property('/test/prop2') + #print self.pcilib.set_property(12, '/test/prop2') + except KeyboardInterrupt: + print 'testing done' + pass + + def testServer(self): + url = str(self.server_host + ':' + str(self.server_port)) + headers = {'content-type': 'application/json'} + payload =[{'com': 'open', 'data2' : '12341'}, + #{'command': 'open', 'device' : '/dev/fpga0', 'model': 'test_pywrap'}, + {'command': 'help'}, + {'command': 'get_registers_list'}, + {'command': 'get_register_info', 'reg': 'reg1'}, + {'command': 'get_property_list'}, + {'command': 'read_register', 'reg': 'reg1'}, + {'command': 'write_register', 'reg': 'reg1'}, + {'command': 'get_property', 'prop': '/test/prop2'}, + {'command': 'set_property', 'prop': '/test/prop2'}] + + def sendRandomMessage(): + message_number = random.randint(1, len(payload) - 1) + print 'message number: ', message_number + payload[message_number]['value'] = random.randint(0, 65535) + r = requests.get(url, data=json.dumps(payload[message_number]), headers=headers) + print json.dumps(r.json(), sort_keys=True, indent=4, separators=(',', ': ')) + + try: + r = requests.get(url, data=json.dumps(payload[1]), headers=headers) + print json.dumps(r.json(), sort_keys=True, indent=3, separators=(',', ': ')) + + while(1): + time.sleep(self.server_message_delay) + thread_list = [threading.Thread(target=sendRandomMessage) for i in range(0, self.num_threads)] + for i in range(0, self.num_threads): + thread_list[i].start() + for i in range(0, self.num_threads): + thread_list[i].join() + print 'cycle done' + + except KeyboardInterrupt: + print 'testing done' + pass + +if __name__ == '__main__': + lib = test_pcipywrap('/dev/fpga0','test_pywrap', num_threads = 150, + write_percentage = 0.1, register = 'test_prop2',server_host = 'http://localhost', server_port = 12412, + server_message_delay = 0) + lib.testThreadSafeReadWrite() + |