from thread import * import os import random import re import string class Calibrate: cs = allocate_lock() state = 0 state_heading = ("calibrating", "ready", "calibrated", "verifying", "verified") auto = 1 # pipes to octave process octave_in = 0 octave_out = 0 def ready(self): return self.state not in (1, 3) # Protected by lock, keyed by mode raw_data = {} verify_mm = [] verify_data = {} # Not protected by lock motor = None sensor = None def __init__(self, motor, sensor): self.cs.acquire() self.motor = motor self.sensor = sensor if os.name == "posix": (self.octave_in, self.octave_out)=os.popen2('octave --silent --no-history --path :../Matlab:../Matlab/Octave') elif os.name == "nt": (self.octave_in, self.octave_out)=os.popen2(r'H:\PROGRAM\GNU\OCTAVE\\bin\bash.exe" --login -c "rxvt -e octave --silent --no-history --path :/cygdrive/c/vms/Matlab:/cygdrive/c/vms/Matlab/Octave') self.octave_in.write("gnuplot_has_frames=1;\n") self.octave_in.flush() self.cs.release() def calibrate_range_mm(self): return range(self.motor.home_mm, self.motor.alt_mm, 10) number_re = re.compile(r"([\-]?[\d]*[\.[\d]+]?)") def calibrate_thread(self): print "calibrate_thread" self.cs.acquire() assert(self.state != 1) self.state = 1 sensor = self.sensor modes = sensor.modes() self.raw_data.clear() self.verify_data.clear() self.statistics = {} for mode in modes: self.raw_data[mode] = [] self.statistics[mode] = {} self.cs.release() # Always start measurements from home self.motor.home() self.motor.wait() for x_mm in self.calibrate_range_mm(): self.motor.goto_mm(x_mm) self.motor.wait() for mode in modes: self.cs.acquire() raw_data = sensor.raw_measure(mode) self.statistics[mode]['Y pix min'] = reduce((lambda m, (_d1, y, _d2): min(m, y)), raw_data, 9999) self.statistics[mode]['Y pix max'] = reduce((lambda m, (_d1, y, _d2): max(m, y)), raw_data, -1) raw_data.sort() # in X order, not aproximately Y... self.raw_data[mode].append( raw_data ) self.cs.release() for mode in modes: self.cs.acquire() raw_data = self.raw_data[mode] self.cs.release() write_octave_data(self.octave_in, self.calibrate_range_mm(), raw_data) self.octave_in.write("kalibrering\n") self.octave_in.flush() calib_params_x = read_octave_data(self.octave_out, "XCoeff:", self.number_re, 11) print "calib_params_x", calib_params_x calib_params_y = read_octave_data(self.octave_out, "YCoeff:", self.number_re, 11) print "calib_params_y", calib_params_y # Two empty lines self.octave_out.readline() self.octave_out.readline() # Four lines with statistics self.statistics[mode]['X err mean'] = float(self.number_re.search(self.octave_out.readline()).group(0)) self.statistics[mode]['X err max'] = float(self.number_re.search(self.octave_out.readline()).group(0)) self.statistics[mode]['Y err mean'] = float(self.number_re.search(self.octave_out.readline()).group(0)) self.statistics[mode]['Y err max'] = float(self.number_re.search(self.octave_out.readline()).group(0)) print self.statistics sensor.set_calibration(mode, calib_params_x, calib_params_y) if not self.auto: self.cs.acquire() self.state = 2 self.cs.release() else: # auto continue, no not pass state 2 ! self.verify_thread() def verify_thread(self): print "verify_thread" self.cs.acquire() self.state = 3 sensor = self.sensor modes = sensor.modes() self.verify_data.clear() for mode in modes: self.verify_data[mode] = [] self.cs.release() self.cs.acquire() self.verify_mm = [0] # aways verify at 0 mm for pos in range(3): self.verify_mm.append(random.randrange(self.motor.home_mm, self.motor.alt_mm)) self.verify_mm.sort() self.cs.release() # Always start measurements from home self.motor.home() self.motor.wait() for x_mm in self.verify_mm: self.motor.goto_mm(x_mm) self.motor.wait() for mode in modes: self.cs.acquire() self.verify_data[mode].append(sensor.measure(mode)) self.cs.release() self.cs.acquire() self.state = 4 self.cs.release() def html_action(self, action, query): self.cs.acquire() for assignment in query.split('&'): (var, value) = assignment.split('=') if var != "action": exec "self.set_" + var + "(value)" if action == "CALIBRATE": if self.state in (0, 2, 4): start_new_thread(self.calibrate_thread,()) elif action == "VERIFY": if self.state in (0, 2, 4): start_new_thread(self.verify_thread,()) elif action == "REFRESH": pass self.cs.release() def html_page(self, f): # self.cs.acquire() if self.state in (1,2,3): f.write('') f.write('\n') # from HEAD to BODY f.write('
\n') f.write('| ') labels = self.statistics[modes[0]].keys() labels.sort() self.write_list(f, ' | ', labels) for mode in modes: f.write(' |
|---|---|
| ' + str(mode)) dict = self.statistics[mode] values = map((lambda label, D=dict: D[label]), labels) self.write_list(f, ' | ', values) f.write(' |
| mm') self.write_list(f, ' | ', self.calibrate_range_mm()) for mode in modes: f.write(' |
|---|---|
| ' + str(mode)) self.write_list(f, ' | ', self.raw_data[mode],
(lambda sub: string.join(map(str, sub),', '))) f.write(' |
| mm') self.write_list(f, ' | ', self.verify_mm) for mode in modes: f.write(' |
|---|---|
| ' + str(mode)) self.write_list(f, ' | ', self.verify_data[mode],
(lambda sub: string.join(map(str, sub),', '))) # self.cs.release() def write_list(self, f, delimiter, item_list, item_formater=str): for item in item_list: f.write(delimiter) f.write(item_formater(item)) # eval methods def set_auto(self, mode): if mode=="cvr": self.auto = 1 elif mode == "manual": self.auto = 0 else: print "Warning: Unexpected automode:", mode self.auto = 0 import sys # used in debug code def write_octave_data(to, xs_mm, raw_data): # TODO remove if random.randrange(2): # 0 or 1 to.write("xycalib_pa_prototyp3\n") else: to.write("xycalib_pa_prototyp4\n") to.flush() to = sys.stdout to.write("cal = [\n") for ix in range(len(xs_mm)): x_mm = xs_mm[ix] measured = raw_data[ix] y_pins = y_range( map((lambda (x,y,w): w), measured) ) # object at y_mm = 0 is wider. ys_mm = map((lambda y: 20*y), y_pins) for iy in range(len(ys_mm)): m = measured[iy] y_mm = ys_mm[iy] to.write("%d %d %d %d %d;\n" % (m[0], m[1], m[2], x_mm, y_mm)) to.write("];\n") to.flush() return def read_octave_data(rd, label, rexp, lines): ret = [] read = rd.readline() while read.find(label) == -1: read = rd.readline() for ix in range(lines): read = rd.readline() print '<', read, '>' match = rexp.search(read) assert match, "Expected number not found in '" + str(read) +"'" ret.append(float(match.group(0))) return ret def y_range(ls): (rd, rl) = y_range_(ls[0], 1, ls[1:]) return [rd] + rl def y_range_(max, maxd, ls): if ls == []: return (maxd-1, []) else: if ls[0] > max: (rd, rl) = y_range_(ls[0], 1, ls[1:]) else: (rd, rl) = y_range_(max, maxd+1, ls[1:]) return (rd-1, [rd] + rl) |