#!/usr/bin/env python
"""Relay JSON commands between a WebSocket client and a serial-attached Arduino.

This part of the file holds the per-mode slider tables, the builders for the
JSON messages sent to the browser, the value formatters shown next to each
slider, and the handler for commands arriving from the browser.
"""

import asyncio
import json
import sys

import serial
import serial.tools.list_ports
import websockets

# Outgoing JSON messages waiting to be sent to the WebSocket client.
queue = asyncio.Queue()

# Slider captions per mode (row) and slider (column); 'none' hides the slider.
analabels = [
    ['none', 'none', 'none', 'none', 'none'],
    ['frequency', 'duration', 'delay', 'ramp ratio', 'pulse width'],
    ['frequency', 'ramp time', 'none', 'none', 'pulse width'],
    ['frequency', 'burst rate', 'duty cycle', 'ramp ratio', 'pulse width'],
    ['frequency', 'ramp time', 'on time', 'off time', 'pulse width'],
    ['frequency', 'none', 'none', 'period', 'pulse width'],
    ['frequency 1', 'frequency 2', 'period', 'none', 'pulse width'],
    ['frequency 1', 'frequency 2', 'frequency period', 'amplitude period',
     'pulse width'],
    ['frequency 1', 'frequency 2', 'period', 'none', 'pulse width'],
    ['frequency 1', 'frequency 2', 'min. amplitude', 'pulse width 1',
     'pulse width 2'],
    ['frequency', 'none', 'base amplitude', 'period', 'pulse width'],
]

# Last known raw slider values per mode; updated by process_command().
anavalues = [
    [0, 0, 0, 0, 0],
    [454, 93, 0, 512, 342],
    [0, 0, 0, 0, 342],
    [454, 762, 512, 512, 342],
    [454, 46, 47, 47, 342],
    [454, 0, 0, 510, 342],
    [454, 454, 510, 0, 342],
    [454, 454, 510, 510, 342],
    [454, 454, 93, 0, 342],
    [200, 600, 511, 0, 1023],
    [454, 0, 511, 511, 342],
]


def _enqueue(message):
    """Queue *message* (a JSON string) for the WebSocket sender.

    The queue is unbounded, so ``put_nowait`` cannot fail. It replaces the
    former ``asyncio.async(queue.put(...))``, which is a syntax error on
    Python 3.7+ and left unreferenced background tasks behind.
    """
    queue.put_nowait(message)


def _ui_message(element, value, kind=None):
    """Return the JSON text for one UI update; ``type`` is added only if given."""
    payload = {"element": element, "value": value}
    if kind is not None:
        payload["type"] = kind
    return json.dumps(payload)


def _send_serial(text):
    """Best-effort write of *text* to the Arduino; return True if it was sent.

    A missing, closed or failing port is not an error here: the UI keeps
    working without hardware and the parameters are re-sent on connect.
    """
    if ser is None or not ser.is_open:
        return False
    try:
        ser.write(bytes(text, 'utf-8'))
    except (serial.SerialException, OSError):
        return False
    return True


# construct JSON messages #
# Each builder takes the two string fields of an Arduino line (see `elements`).

def set_control(state, dum):
    """Build the caption update for the connect button ("0"/"1" -> text)."""
    return _ui_message("control", ["Connect", "Disconnect"][int(state)])


def set_analog(index, value):
    """Build the update that moves slider *index* to *value*."""
    return _ui_message("analog" + str(index), value)


def set_start(state, dum):
    """Build the start-button caption update and queue the matching report."""
    running = int(state)
    _enqueue(_ui_message("report", ["idle", "pulsing"][running], "label"))
    return _ui_message("start", ["Start", "Stop"][running])


def set_pvariant(variant, dum):
    """Build the update for the 'pvariant' element."""
    return _ui_message("pvariant", variant)


def set_mode(mode, dum):
    """Build the update for the 'mode' element."""
    return _ui_message("mode", mode)


# Function id (first field of an Arduino line) -> message builder.
elements = {
    0: set_control,
    1: set_analog,
    2: set_start,
    3: set_pvariant,
    5: set_mode,
}


# value display #
# Each formatter turns a raw slider value into the text shown beside it.
# The divisors assume a raw range of 0..1023.

def dpy_raw(value):
    ## raw input
    return str(value)


def dpy_freq(value):
    ## pulse frequency: quadratic mapping starting at 2 Hz
    # NOTE(review): the original comment said "2-400 Hz", but this formula
    # gives 500.0 Hz at value 1023 -- confirm which one is intended.
    pvalue = value*0.009775171065493646  # scale factor, approximately 10/1023
    return "%0.1f Hz" % (4.98*pvalue*pvalue + 2)


def dpy_width(value):
    ## pulse width: 24-400 µs
    usec = 24 + value*(400 - 24)/1023 + 0.5  # +0.5 so %d rounds to nearest
    return "%d µs" % usec


def dpy_ratio(value):
    ## ratio: 0-100 %
    return "%d %%" % int(100.0*value/1023)


def dpy_dur10s1(value):
    ## time: 0.1-10 s
    return "%0.1f s" % (0.1 + 10*value*0.99/1023)


def dpy_dur20s(value):
    ## time: 0-20 s
    return "%0.1f s" % (20*value/1023)


def dpy_brate(value):
    ## burst rate: 0.07-10 Hz
    cp = (1023 - value) >> 3
    period = (cp*cp/(1075)) + 0.1
    return "%0.2f Hz (%0.1f s period)" % (1./period, period)


def dpy_dur10s(value):
    ## time: 0-10 s
    return "%0.1f s" % (10*value/1023)


def dpy_dur10m(value):
    ## time: 0-10 min, shown as minutes and seconds
    seconds = 600*value/1023
    return "%dm%2.0ds" % (seconds/60, seconds % 60)


def dpy_dur2s1(value):
    ## time: 0.1-2 s
    return "%0.1f s" % (0.1 + 2*value*0.95/1023)


def dpy_dur5s1(value):
    ## time: 0.1-5 s
    return "%0.1f s" % (0.1 + 5*value*0.98/1023)


# Formatter per mode (row) and slider (column); same layout as analabels.
dpyfuncs = [
    [dpy_raw, dpy_raw, dpy_raw, dpy_raw, dpy_raw],                # 0
    [dpy_freq, dpy_dur10s1, dpy_dur20s, dpy_ratio, dpy_width],    # 1
    [dpy_freq, dpy_dur10m, dpy_raw, dpy_raw, dpy_width],          # 2
    [dpy_freq, dpy_brate, dpy_ratio, dpy_ratio, dpy_width],       # 3
    [dpy_freq, dpy_dur10s1, dpy_dur10s, dpy_dur10s, dpy_width],   # 4
    [dpy_freq, dpy_raw, dpy_raw, dpy_dur2s1, dpy_width],          # 5
    [dpy_freq, dpy_freq, dpy_dur5s1, dpy_raw, dpy_width],         # 6
    [dpy_freq, dpy_freq, dpy_dur5s1, dpy_dur2s1, dpy_width],      # 7
    [dpy_freq, dpy_freq, dpy_dur10s, dpy_raw, dpy_width],         # 8
    [dpy_freq, dpy_freq, dpy_ratio, dpy_width, dpy_width],        # 9
    [dpy_freq, dpy_raw, dpy_ratio, dpy_dur10s, dpy_width],        # 10
]

modenames = [
    "No pulse generation",
    "Single shot",
    "Constant",
    "Burst",
    "Ramp",
    "Amplitude modulation",
    "Frequency modulation",
    "Simultaneous amplitude- and frequency modulation",
    "Chirp",
    "Random pulses",
    "Ramp",
]


def status_message(message):
    """Queue *message* as the new text of the 'dpymode' label."""
    _enqueue(_ui_message("dpymode", message, "label"))


# Index into analabels/anavalues/dpyfuncs/modenames of the active mode.
modeindex = 0


def _refresh_sliders(mode):
    """Queue the UI updates for all sliders of *mode* and push their values.

    NOTE(review): the source this was restored from had lost its indentation,
    so the nesting after ``else:`` is a reconstruction. Here the caption and
    blanking updates are sent only for visible sliders, while the stored value
    is synchronised for every slider (as the connect branch of
    process_command() also does) -- confirm against the original layout.
    """
    rows = zip(analabels[mode], anavalues[mode], dpyfuncs[mode])
    for slot, (text, stored, show) in enumerate(rows, start=1):
        suffix = str(slot)
        if text == 'none':
            # The CSS display value 'none' hides the whole slider group.
            _enqueue(_ui_message("anagroup" + suffix, text, "display"))
        else:
            _enqueue(_ui_message("anagroup" + suffix, "block", "display"))
            _enqueue(_ui_message("analabel" + suffix, text, "label"))
            _enqueue(_ui_message("anadis" + suffix, " ", "label"))
        _enqueue(_ui_message("analog" + suffix, str(stored)))
        _enqueue(_ui_message("anadis" + suffix, show(stored), "label"))
        _send_serial("1 %s %s \r" % (slot, stored))


def process_command(message, websocket):
    """Handle one JSON command received from the WebSocket client.

    *message* is a JSON object with the keys 'function', 'index' and 'value'.
    The command is forwarded to the Arduino as "<function> <index> <value> \\r"
    and then interpreted locally:

    * function 1, index 0: mode change (mode number is value // 100); the
      slider captions, stored values and displays are refreshed.
    * function 1, index 1..5: slider moved; the value is stored and displayed.
    * function 0, index 1: connect; the mode and all stored values are re-sent.

    *websocket* is accepted for the caller's convenience and is not used.
    Raises whatever ``json.loads`` raises for invalid JSON and ``KeyError``
    when one of the three keys is missing.
    """
    global modeindex
    command = json.loads(message)
    oper = command['function']
    index = command['index']
    value = command['value']
    _send_serial("%s %s %s \r" % (oper, index, value))

    # mode change: store current mode and adjust slider labels #
    if oper == 1 and index == 0:
        requested = int(value)//100
        # An unknown mode would index past the tables (or wrap around when
        # negative), so the local state is left untouched in that case.
        if 0 <= requested < len(modenames):
            modeindex = requested
            status_message(modenames[modeindex])
            _refresh_sliders(modeindex)

    # analog input: display value #
    if oper == 1 and 0 < index <= len(anavalues[modeindex]):
        raw = int(value)
        anavalues[modeindex][index-1] = raw     # store analog value
        dvalue = dpyfuncs[modeindex][index-1](raw)
        _enqueue(_ui_message("anadis" + str(index), dvalue, "label"))

    # connect: send all parameters #
    if oper == 0 and index == 1 and ser is not None and ser.is_open:
        _send_serial("1 0 %s \r" % (modeindex*100))
        for slot, stored in enumerate(anavalues[modeindex], start=1):
            _send_serial("1 %s %s \r" % (slot, stored))
# The open serial port to the Arduino, or None while nothing is attached.
ser = None


async def arduino_connect():
    """Poll once per second and (re)open the Arduino port when it is missing.

    Every port whose description matches 'USB' is tried in turn. The first one
    that opens is registered with the event loop so that from_arduino() is
    called whenever the port has data to read. Runs until cancelled.
    """
    global ser
    loop = asyncio.get_event_loop()
    while True:
        await asyncio.sleep(1.0)
        if ser is not None and ser.is_open:
            continue
        for port in serial.tools.list_ports.grep('USB'):
            print(port.device)
            try:
                candidate = serial.Serial(port.device, timeout=0,
                                          baudrate=115200)
            except (serial.SerialException, OSError) as exc:
                print("Cannot open %s: %s" % (port.device, exc))
                continue
            try:
                loop.add_reader(candidate.fileno(), from_arduino, candidate)
            except Exception as exc:
                # Without a reader the port is useless; close it so the next
                # poll can retry instead of seeing an open but silent port.
                candidate.close()
                print("Cannot watch %s: %s" % (port.device, exc))
                continue
            ser = candidate
            print("Arduino on")
            status_message("Arduino ready")
            break


async def client_handler(websocket):
    """Feed every message received from the client to process_command()."""
    while True:
        message = await websocket.recv()
        process_command(message, websocket)


async def arduino_handler(websocket):
    """Send queued messages to the client; the message "exit" ends the program."""
    while True:
        message = await queue.get()
        if message == "exit":
            sys.exit()
        await websocket.send(message)


async def handler(websocket, path=None):
    """Serve one WebSocket client until any of its worker tasks finishes.

    *path* is optional because newer websockets releases call the handler
    with the connection only, while older ones also pass the request path.
    """
    tasks = [
        asyncio.ensure_future(client_handler(websocket)),
        asyncio.ensure_future(arduino_handler(websocket)),
        asyncio.ensure_future(arduino_connect()),
    ]
    try:
        done, _ = await asyncio.wait(tasks,
                                     return_when=asyncio.FIRST_COMPLETED)
    finally:
        # Also reached when this handler itself is cancelled.
        for task in tasks:
            if not task.done():
                task.cancel()
    for task in done:
        # Fetch the outcome so asyncio does not report it as never retrieved.
        if not task.cancelled() and task.exception() is not None:
            print("Client task ended: %r" % (task.exception(),))
    if ser is not None and ser.is_open:
        try:
            ser.write(b'0 0 0 \r')      # give back control
        except (serial.SerialException, OSError):
            pass    # port vanished meanwhile; nothing left to hand back
    print("Client gone")


# Bytes of the Arduino line received so far (without its terminator).
line = b''


def from_arduino(ser):
    """Event-loop reader callback: consume one byte from the Arduino.

    Bytes are collected until CR or LF. A complete line has the form
    "<function> <index> <value>"; it is turned into a JSON message by the
    matching builder in `elements` and queued for the client. Malformed lines
    are reported and dropped. A failing read is treated as a disconnect.
    """
    global line
    try:
        inchar = ser.read()
    except Exception:
        # Any read failure means the port is unusable, so tear it down.
        print("Arduino off")
        try:
            asyncio.get_event_loop().remove_reader(ser.fileno())
        except Exception as exc:
            # The descriptor may already be invalid; still close the port.
            print("Could not remove serial reader: %r" % (exc,))
        ser.close()
        status_message("Arduino disconnected")
        line = b''
        return

    if inchar not in (b'\r', b'\n'):
        line += inchar
        return

    # Reset the buffer before parsing, so that a bad line cannot stay in it
    # and make every following line fail as well.
    record, line = line, b''
    if not record:
        return
    try:
        func, index, value = str(record, 'utf-8').split()
        message = elements[int(func)](index, value)
    except (ValueError, KeyError, IndexError):
        print("Ignoring malformed Arduino line: %r" % (record,))
        return
    # The queue is unbounded, so put_nowait cannot fail; it replaces
    # asyncio.async(), which is a syntax error on Python 3.7+.
    queue.put_nowait(message)


server = websockets.serve(handler, '0.0.0.0', 5678)
loop = asyncio.get_event_loop()
loop.run_until_complete(server)
loop.run_forever()