/alps/pcitool

To get this branch, use:
bzr branch http://suren.me/webbzr/alps/pcitool

« back to all changes in this revision

Viewing changes to pywrap/test_pcilib.py

  • Committer: Suren A. Chilingaryan
  • Date: 2016-03-02 22:36:19 UTC
  • mfrom: (346.1.35 pcitool)
  • Revision ID: csa@suren.me-20160302223619-r1zd62x6kwbyd321
Further improvements to the Python scripting and web-interface API for register manipulation, by Vasiliy Chernov

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
1
import threading
2
 
import pcipywrap
 
2
import pcilib
3
3
import random
4
4
import os
5
5
import json
7
7
import time
8
8
from optparse import OptionParser, OptionGroup
9
9
 
10
 
class test_pcipywrap():
 
10
class test_pcilib():
11
11
   def __init__(self, 
12
12
                device, 
13
13
                model, 
35
35
      #create pcilib_instance
36
36
      self.device = device
37
37
      self.model = model
38
 
      self.pcilib = pcipywrap.Pcipywrap(device, model)
 
38
      self.pcilib = pcilib.pcilib(device, model)
39
39
      self.num_threads = num_threads
40
40
      self.write_percentage = write_percentage
41
41
      self.register = register
55
55
                {'command': 'unlock_global'},
56
56
                {'command': 'help'}]  
57
57
      r = requests.get(url, data=json.dumps(payload[message]), headers=headers)
58
 
      print json.dumps(r.json(), sort_keys=True, indent=3, separators=(',', ': '))
 
58
      print(json.dumps(r.json(), sort_keys=True, indent=3, separators=(',', ': ')))
59
59
    
60
60
   def testThreadSafeReadWrite(self):
61
61
      def threadFunc():
62
62
         if random.randint(0, 100) >= (self.write_percentage * 100):
63
63
            ret = self.pcilib.get_property(self.prop)
64
 
            print self.register, ':', ret
 
64
            print(self.register, ':', ret)
65
65
            del ret
66
66
         else:
67
67
            val = random.randint(0, 65536)
68
 
            print 'set value:', val
 
68
            print('set value:', val)
69
69
            self.pcilib.set_property(val, self.prop)
70
70
      try:
71
71
         while(1):
74
74
               thread_list[i].start()
75
75
            for i in range(0, self.num_threads):
76
76
               thread_list[i].join()
77
 
            print 'cycle done'
 
77
            print('cycle done')
78
78
      except KeyboardInterrupt:
79
 
         print 'testing done'
 
79
         print('testing done')
80
80
         pass
81
81
   
82
82
   def testMemoryLeak(self):
83
83
      try:
84
84
         while(1):
85
 
            val = random.randint(0, 8096)
86
 
            self.pcilib = pcipywrap.Pcipywrap(self.device, self.model)
87
 
            print self.pcilib.get_property_list(self.branch)
88
 
            print self.pcilib.get_register_info(self.register)
89
 
            print self.pcilib.get_registers_list();
90
 
            print self.pcilib.read_register(self.register)
91
 
            print self.pcilib.write_register(val, self.register)
92
 
            print self.pcilib.get_property(self.prop)
93
 
            print self.pcilib.set_property(val, self.prop)
 
85
            val = long(random.randint(0, 8096))
 
86
            self.pcilib = pcilib.pcilib(self.device, self.model)
 
87
            print(self.pcilib.get_property_list(self.branch))
 
88
            print(self.pcilib.get_register_info(self.register))
 
89
            print(self.pcilib.get_registers_list())
 
90
            print(self.pcilib.write_register(val, self.register))
 
91
            print(self.pcilib.read_register(self.register))
 
92
            print(self.pcilib.set_property(val, self.prop))
 
93
            print(self.pcilib.get_property(self.prop))
94
94
      except KeyboardInterrupt:
95
 
         print 'testing done'
 
95
         print('testing done')
96
96
         pass
97
97
 
98
98
   def testServer(self):
111
111
      
112
112
      def sendRandomMessage():
113
113
         message_number = random.randint(1, len(payload) - 1)
114
 
         print 'message number: ', message_number
 
114
         print('message number: ', message_number)
115
115
         payload[message_number]['value'] =  random.randint(0, 8096)
116
116
         r = requests.get(url, data=json.dumps(payload[message_number]), headers=headers)
117
 
         print json.dumps(r.json(), sort_keys=True, indent=4, separators=(',', ': '))
118
 
      
 
117
         if(r.headers['content-type'] == 'application/json'):
 
118
            print(json.dumps(r.json(), sort_keys=True, indent=3, separators=(',', ': ')))
 
119
         else:
 
120
            print(r.content)      
119
121
      try:    
120
122
         r = requests.get(url, data=json.dumps(payload[1]), headers=headers)
121
 
         print json.dumps(r.json(), sort_keys=True, indent=3, separators=(',', ': '))
 
123
         if(r.headers['content-type'] == 'application/json'):
 
124
            print(json.dumps(r.json(), sort_keys=True, indent=3, separators=(',', ': ')))
 
125
         else:
 
126
            print(r.content)
122
127
   
123
128
         while(1):
124
129
            time.sleep(self.server_message_delay)
127
132
               thread_list[i].start()
128
133
            for i in range(0, self.num_threads):
129
134
               thread_list[i].join()
130
 
            print 'cycle done'
 
135
            print('cycle done')
131
136
            
132
137
      except KeyboardInterrupt:
133
 
         print 'testing done'
 
138
         print('testing done')
134
139
         pass
135
140
 
136
141
if __name__ == '__main__':
140
145
                     type="string", dest="device", default=str('/dev/fpga0'),
141
146
                     help="FPGA device (/dev/fpga0)")                     
142
147
   parser.add_option("-m", "--model",  action="store",
143
 
                     type="string", dest="model", default=None,
 
148
                     type="string", dest="model", default=str('test'),
144
149
                     help="Memory model (autodetected)")
145
150
   parser.add_option("-t", "--threads",  action="store",
146
151
                     type="int", dest="threads", default=150,
232
237
   opts = parser.parse_args()[0]
233
238
 
234
239
   #create pcilib test instance
235
 
   lib = test_pcipywrap(opts.device,
 
240
   lib = test_pcilib(opts.device,
236
241
                        opts.model,
237
242
                        num_threads = opts.threads,
238
243
                        write_percentage = opts.write_percentage,