/alps/pcitool

To get this branch, use:
bzr branch http://suren.me/webbzr/alps/pcitool

« back to all changes in this revision

Viewing changes to pywrap/test_pcipywrap.py

  • Committer: Suren A. Chilingaryan
  • Date: 2016-02-23 06:20:33 UTC
  • mfrom: (346.1.18 pcitool)
  • Revision ID: csa@suren.me-20160223062033-mz8qkpm1a2oioveb
Merge Python scripting support from Vasiliy Chernov

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
import threading
 
2
import pcipywrap
 
3
import random
 
4
import os
 
5
import json
 
6
import requests
 
7
import time
 
8
 
 
9
class test_pcipywrap():
   """Interactive stress tests for the pcipywrap Python bindings.

   Every test method loops until it is interrupted with Ctrl-C
   (KeyboardInterrupt), printing what it does along the way.
   """

   def __init__(self, device, model, num_threads = 150,
   write_percentage = 0.1, register = 'test_prop2',
   server_host = 'http://localhost', server_port = 12412,
   server_message_delay = 0):
      """Prepare the environment and open the pcilib wrapper instance.

      device, model        -- forwarded to pcipywrap.Pcipywrap()
      num_threads          -- number of threads started in every test cycle
      write_percentage     -- fraction (0.0 .. 1.0) of thread runs in
                              testThreadSafeReadWrite that write instead of read
      register             -- register name passed to write_register()
      server_host,
      server_port          -- joined as '<host>:<port>' to form the URL used
                              by testServer
      server_message_delay -- seconds slept before every testServer cycle
      """
      # Fill in environment variables only when the caller has not set them.
      if 'APP_PATH' not in os.environ:
         file_dir = os.path.dirname(os.path.abspath(__file__))
         os.environ['APP_PATH'] = os.path.abspath(os.path.join(file_dir, '..', '..'))

      app_path = os.environ['APP_PATH']
      os.environ.setdefault('PCILIB_MODEL_DIR', app_path + '/xml')
      # NOTE(review): pcipywrap is already imported at module level, so this
      # value presumably only matters for libraries loaded later or for child
      # processes -- confirm.
      os.environ.setdefault('LD_LIBRARY_PATH', app_path + '/pcilib')

      random.seed()

      # create pcilib_instance
      self.pcilib = pcipywrap.Pcipywrap(device, model)
      self.num_threads = num_threads
      self.write_percentage = write_percentage
      self.register = register
      self.server_message_delay = server_message_delay
      self.server_port = server_port
      self.server_host = server_host

   def _run_thread_cycle(self, target):
      """Start self.num_threads threads running target and wait for all of them."""
      threads = [threading.Thread(target=target) for _ in range(self.num_threads)]
      for thread in threads:
         thread.start()
      for thread in threads:
         thread.join()

   def testThreadSafeReadWrite(self):
      """Concurrently read a property and write a register until Ctrl-C.

      Each thread either writes a random value to self.register or reads
      the '/test/prop2' property; the split is controlled by
      self.write_percentage.
      """
      def threadFunc():
         # random.random() is uniform on [0.0, 1.0), so write_percentage is
         # honoured exactly: 0.0 never writes and 1.0 always writes.  The
         # previous randint(0, 100) comparison drew from 101 values and was
         # therefore slightly biased towards reads.
         if random.random() < self.write_percentage:
            # NOTE(review): testServer draws values from 0..65535; the upper
            # bound 65536 used here may be one too large -- confirm against
            # the register width.
            val = random.randint(0, 65536)
            print('set value: {0}'.format(val))
            self.pcilib.write_register(val, self.register)
         else:
            # NOTE(review): the property path is hard-coded although the
            # output is labelled with self.register -- confirm that both
            # name the same value.
            ret = self.pcilib.get_property('/test/prop2')
            print('{0} : {1}'.format(self.register, ret))

      try:
         while True:
            self._run_thread_cycle(threadFunc)
            print('cycle done')
      except KeyboardInterrupt:
         print('testing done')

   def testMemoryLeak(self):
      """Call wrapper query functions in a tight loop until Ctrl-C.

      The results are only printed; memory usage has to be watched
      externally while the loop runs.
      """
      try:
         while True:
            # Other wrapper calls (get_registers_list, read_register,
            # write_register, get_property, set_property) can be exercised
            # here in the same way.
            print(self.pcilib.get_property_list('/test'))
            print(self.pcilib.get_register_info('test_prop1'))
      except KeyboardInterrupt:
         print('testing done')

   def testServer(self):
      """Send random JSON commands to the server from many threads until Ctrl-C.

      One 'help' request is sent first; afterwards every cycle sleeps for
      self.server_message_delay seconds and then starts self.num_threads
      threads that each send one randomly chosen command.
      """
      url = str(self.server_host + ':' + str(self.server_port))
      headers = {'content-type': 'application/json'}
      # payload[0] uses the key 'com' instead of 'command' and is never
      # selected below because the random index starts at 1.
      payload = [{'com': 'open', 'data2' : '12341'},
      #{'command': 'open', 'device' : '/dev/fpga0', 'model': 'test_pywrap'},
      {'command': 'help'},
      {'command': 'get_registers_list'},
      {'command': 'get_register_info', 'reg': 'reg1'},
      {'command': 'get_property_list'},
      {'command': 'read_register', 'reg': 'reg1'},
      {'command': 'write_register', 'reg': 'reg1'},
      {'command': 'get_property', 'prop': '/test/prop2'},
      {'command': 'set_property', 'prop': '/test/prop2'}]

      def sendRandomMessage():
         message_number = random.randint(1, len(payload) - 1)
         print('message number:  {0}'.format(message_number))
         # Build a private copy: the templates are shared by all threads, and
         # writing 'value' into them would let another thread replace it
         # between the assignment and the serialisation.
         message = dict(payload[message_number], value=random.randint(0, 65535))
         r = requests.get(url, data=json.dumps(message), headers=headers)
         print(json.dumps(r.json(), sort_keys=True, indent=4, separators=(',', ': ')))

      try:
         r = requests.get(url, data=json.dumps(payload[1]), headers=headers)
         print(json.dumps(r.json(), sort_keys=True, indent=3, separators=(',', ': ')))

         while True:
            time.sleep(self.server_message_delay)
            self._run_thread_cycle(sendRandomMessage)
            print('cycle done')
      except KeyboardInterrupt:
         print('testing done')
 
113
 
 
114
if __name__ == '__main__':
   # Script entry point: open the test device and run the threaded
   # read/write stress test until it is interrupted.
   tester = test_pcipywrap(
      '/dev/fpga0',
      'test_pywrap',
      num_threads = 150,
      write_percentage = 0.1,
      register = 'test_prop2',
      server_host = 'http://localhost',
      server_port = 12412,
      server_message_delay = 0
   )
   tester.testThreadSafeReadWrite()
 
119