#!/usr/bin/env python
import sys
import asyncio
import websockets
import serial
import serial.tools.list_ports
import json
# Outgoing JSON messages destined for the browser; arduino_handler() takes
# items off this queue and sends them over the websocket.
queue = asyncio.Queue()
# Slider captions per pulse mode: analabels[mode][slot] is the label of
# slider slot+1 in that mode.  The literal 'none' marks a slider that is
# hidden in the mode (process_command() sends it as the display value).
analabels = [
    ['none', 'none', 'none', 'none', 'none'],                                # 0
    ['frequency', 'duration', 'delay', 'ramp ratio', 'pulse width'],         # 1
    ['frequency', 'ramp time', 'none', 'none', 'pulse width'],               # 2
    ['frequency', 'burst rate', 'duty cycle', 'ramp ratio', 'pulse width'],  # 3
    ['frequency', 'ramp time', 'on time', 'off time', 'pulse width'],        # 4
    ['frequency', 'none', 'none', 'period', 'pulse width'],                  # 5
    ['frequency 1', 'frequency 2', 'period', 'none', 'pulse width'],         # 6
    ['frequency 1', 'frequency 2', 'frequency period', 'amplitude period',
     'pulse width'],                                                         # 7
    ['frequency 1', 'frequency 2', 'period', 'none', 'pulse width'],         # 8
    ['frequency 1', 'frequency 2', 'min. amplitude', 'pulse width 1',
     'pulse width 2'],                                                       # 9
    ['frequency', 'none', 'base amplitude', 'period', 'pulse width'],        # 10
]
# Current raw slider values per pulse mode: anavalues[mode][slot].
# These are the start-up defaults; process_command() overwrites entries in
# place when the client moves a slider, so this must stay a list of lists.
anavalues = [
    [0, 0, 0, 0, 0],            # 0
    [454, 93, 0, 512, 342],     # 1
    [0, 0, 0, 0, 342],          # 2
    [454, 762, 512, 512, 342],  # 3
    [454, 46, 47, 47, 342],     # 4
    [454, 0, 0, 510, 342],      # 5
    [454, 454, 510, 0, 342],    # 6
    [454, 454, 510, 510, 342],  # 7
    [454, 454, 93, 0, 342],     # 8
    [200, 600, 511, 0, 1023],   # 9
    [454, 0, 511, 511, 342],    # 10
]
# construct JSON messages
#
def set_control(state, dum):
    """Build the JSON message that sets the caption of the "control" element.

    state -- "0"/"1" (anything int() accepts); selects "Connect" or
             "Disconnect" respectively.
    dum   -- ignored; present so all handlers in ``elements`` share one
             two-argument signature.

    Returns the message as a JSON string.
    """
    caption = ["Connect", "Disconnect"][int(state)]
    payload = {"element": "control", "value": caption}
    return json.dumps(payload)
def set_analog(index, value):
    """Build the JSON message that sets slider ``analog<index>`` to *value*.

    index -- slider number as a string (it is concatenated to "analog").
    value -- new slider value, passed through unchanged.

    Returns the message as a JSON string.
    """
    target = "analog" + index
    payload = {"element": target, "value": value}
    return json.dumps(payload)
def set_start(state, dum):
    """Build the JSON message for the "start" button and queue a status report.

    As a side effect a second message is put on ``queue`` that sets the
    "report" label to "idle" (state 0) or "pulsing" (state 1).

    state -- "0"/"1" (anything int() accepts).
    dum   -- ignored; present so all handlers in ``elements`` share one
             two-argument signature.

    Returns the JSON string that sets the "start" element to "Start" or
    "Stop".
    """
    report = json.dumps(
        {"element": "report", "value": ["idle", "pulsing"][int(state)], "type": "label"})
    # asyncio.async() is a SyntaxError since Python 3.7 ("async" became a
    # keyword); ensure_future() is its direct replacement.
    asyncio.ensure_future(queue.put(report))
    return json.dumps({"element": "start", "value": ["Start", "Stop"][int(state)]})
def set_pvariant(variant, dum):
    """Build the JSON message that sets the "pvariant" element to *variant*.

    variant -- value to show, passed through unchanged.
    dum     -- ignored; present so all handlers in ``elements`` share one
               two-argument signature.

    Returns the message as a JSON string.
    """
    payload = {"element": "pvariant", "value": variant}
    return json.dumps(payload)
def set_mode(mode, dum):
    """Build the JSON message that sets the "mode" element to *mode*.

    mode -- value to show, passed through unchanged.
    dum  -- ignored; present so all handlers in ``elements`` share one
            two-argument signature.

    Returns the message as a JSON string.
    """
    payload = {"element": "mode", "value": mode}
    return json.dumps(payload)
# Dispatch table for lines received from the Arduino: the first number on a
# line selects the function that turns the remaining two fields into a JSON
# message for the browser (see from_arduino()).
elements = {
    0: set_control,
    1: set_analog,
    2: set_start,
    3: set_pvariant,
    # no handler is registered for function code 4
    5: set_mode,
}
# value display
#
def dpy_raw(value):
    """Display helper: show the raw slider value unchanged, as a string."""
    text = str(value)
    return text
def dpy_freq(value):
    """Display helper: format a raw slider value as a pulse frequency in Hz.

    The value is scaled by 0.009775171065493646 (about 10/1023) and mapped
    quadratically, starting at 2 Hz for 0.
    NOTE(review): the previous comment advertised a 2-400 Hz range, but this
    formula yields about 500 Hz at 1023 -- confirm against the firmware.
    """
    scaled = value*0.009775171065493646
    hertz = 4.98*scaled*scaled+2
    return "%0.1f Hz"%hertz
def dpy_width(value):
    """Display helper: format a raw slider value as a pulse width in µs.

    Maps 0..1023 linearly onto 24..400 µs; the added 0.5 makes the
    truncating %d conversion round to the nearest integer.
    """
    shortest = 24
    longest = 400
    usec = shortest+value*(longest-shortest)/1023+0.5
    return "%d µs"%usec
def dpy_ratio(value):
    """Display helper: format a raw slider value (0..1023) as 0-100 %.

    The percentage is truncated, not rounded.
    """
    percent = int(100.0*value/1023)
    return "%d %%"%percent
def dpy_dur10s1(value):
    """Display helper: format a raw slider value (0..1023) as 0.1-10 s."""
    seconds = 0.1+10*value*0.99/1023
    return "%0.1f s"%seconds
def dpy_dur20s(value):
    """Display helper: format a raw slider value (0..1023) as 0-20 s."""
    seconds = 20*value/1023
    return "%0.1f s"%seconds
def dpy_brate(value):
    """Display helper: format a raw slider value as a burst rate.

    The slider is inverted and reduced to 7 bits, squared and scaled to a
    period of 0.1 s (value 1023) up to about 15.1 s (value 0); the text
    shows both the resulting rate in Hz and the period in seconds.
    *value* must be an integer because it is bit-shifted.
    """
    steps = (1023-value)>>3
    period = (steps*steps/(1075))+0.1
    rate = 1./period
    return "%0.2f Hz (%0.1f s period)"%(rate, period)
def dpy_dur10s(value):
    """Display helper: format a raw slider value (0..1023) as 0-10 s."""
    seconds = 10*value/1023
    return "%0.1f s"%seconds
def dpy_dur10m(value):
    """Display helper: format a raw slider value (0..1023) as 0-10 minutes.

    The result has the form "<minutes>m<seconds>s"; both parts are truncated
    by the %d conversions.
    """
    total_seconds = 600*value/1023
    whole_minutes = total_seconds/60
    remainder = total_seconds%60
    return "%dm%2.0ds"%(whole_minutes, remainder)
def dpy_dur2s1(value):
    """Display helper: format a raw slider value (0..1023) as 0.1-2 s."""
    seconds = 0.1+2*value*0.95/1023
    return "%0.1f s"%seconds
def dpy_dur5s1(value):
    """Display helper: format a raw slider value (0..1023) as 0.1-5 s."""
    seconds = 0.1+5*value*0.98/1023
    return "%0.1f s"%seconds
# Display formatter per pulse mode and slider: dpyfuncs[mode][slot] turns
# the raw value of slider slot+1 into the text shown next to it.  The layout
# parallels analabels; sliders labelled 'none' there use dpy_raw.
dpyfuncs = [
    [dpy_raw,  dpy_raw,     dpy_raw,    dpy_raw,    dpy_raw],    # 0
    [dpy_freq, dpy_dur10s1, dpy_dur20s, dpy_ratio,  dpy_width],  # 1
    [dpy_freq, dpy_dur10m,  dpy_raw,    dpy_raw,    dpy_width],  # 2
    [dpy_freq, dpy_brate,   dpy_ratio,  dpy_ratio,  dpy_width],  # 3
    [dpy_freq, dpy_dur10s1, dpy_dur10s, dpy_dur10s, dpy_width],  # 4
    [dpy_freq, dpy_raw,     dpy_raw,    dpy_dur2s1, dpy_width],  # 5
    [dpy_freq, dpy_freq,    dpy_dur5s1, dpy_raw,    dpy_width],  # 6
    [dpy_freq, dpy_freq,    dpy_dur5s1, dpy_dur2s1, dpy_width],  # 7
    [dpy_freq, dpy_freq,    dpy_dur10s, dpy_raw,    dpy_width],  # 8
    [dpy_freq, dpy_freq,    dpy_ratio,  dpy_width,  dpy_width],  # 9
    [dpy_freq, dpy_raw,     dpy_ratio,  dpy_dur10s, dpy_width],  # 10
]
# Human-readable name of each pulse mode, indexed by mode number; shown in
# the browser via status_message() when the mode changes.
# NOTE(review): entries 4 and 10 are both "Ramp" -- confirm this is intended.
modenames = [
    "No pulse generation",                               # 0
    "Single shot",                                       # 1
    "Constant",                                          # 2
    "Burst",                                             # 3
    "Ramp",                                              # 4
    "Amplitude modulation",                              # 5
    "Frequency modulation",                              # 6
    "Simultaneous amplitude- and frequency modulation",  # 7
    "Chirp",                                             # 8
    "Random pulses",                                     # 9
    "Ramp",                                              # 10
]
def status_message(message):
    """Queue *message* for display in the browser's "dpymode" label.

    The text is wrapped in a JSON "label" message and put on ``queue``;
    nothing is returned.  Must be called while the event loop is running.
    """
    payload = json.dumps({"element": "dpymode",
                          "value": message, "type": "label"})
    # asyncio.async() is a SyntaxError since Python 3.7 ("async" became a
    # keyword); ensure_future() is its direct replacement.
    asyncio.ensure_future(queue.put(payload))
# Currently selected pulse mode; indexes analabels, anavalues, dpyfuncs and
# modenames.  Updated by process_command() on a mode-change command.
modeindex = 0
def _queue_update(element, value, kind=None):
    """Queue one JSON UI update (element/value and optional type) for the browser."""
    payload = {"element": element, "value": value}
    if kind is not None:
        payload["type"] = kind
    # asyncio.async() is a SyntaxError since Python 3.7; ensure_future()
    # is its direct replacement.
    asyncio.ensure_future(queue.put(json.dumps(payload)))


def _serial_send(text):
    """Write *text* to the Arduino, silently doing nothing if that is impossible."""
    if ser is None:
        return
    try:
        ser.write(bytes(text, 'utf-8'))
    except OSError:
        # Deliberately best effort: the port may be closed or unplugged
        # (pySerial's SerialException derives from OSError).
        pass


def process_command(message, websocket):
    """Handle one JSON command received from the browser.

    message   -- JSON text of an object with the keys "function", "index"
                 and "value".
    websocket -- the client connection; currently unused.

    Every command is forwarded to the Arduino as "<function> <index>
    <value> \\r" (best effort).  In addition:

    * function 1, index 0 (mode change): stores the new mode
      (value // 100), shows its name and reconfigures the five sliders.
    * function 1, index > 0 (slider moved): stores the value for the
      current mode and updates the slider's value display.
    * function 0, index 1 (connect): re-sends the current mode and all five
      slider values to the Arduino, if the port is open.

    Raises IndexError if a mode change selects a mode that does not exist;
    the stored mode is left untouched in that case.
    """
    global modeindex
    command = json.loads(message)
    oper = command['function']
    index = command['index']
    value = command['value']
    _serial_send("%s %s %s \r" % (oper, index, value))

    # mode change: store current mode and adjust slider labels
    #
    if oper == 1 and index == 0:
        new_mode = int(value) // 100
        # Validate before storing: an out-of-range mode used to be stored
        # first, and a negative one silently wrapped around.
        if not 0 <= new_mode < len(modenames):
            raise IndexError("mode index %d out of range" % new_mode)
        modeindex = new_mode
        status_message(modenames[modeindex])
        for slot, text in enumerate(analabels[modeindex], start=1):
            group = "anagroup" + str(slot)
            if text == 'none':
                # slider not used in this mode: hide its group
                _queue_update(group, text, "display")
                continue
            # NOTE(review): indentation was lost in the reviewed copy; all
            # of the following is assumed to belong to the "slider is used"
            # branch -- confirm against the original layout.
            stored = anavalues[modeindex][slot - 1]
            _queue_update(group, "block", "display")
            _queue_update("analabel" + str(slot), text, "label")
            _queue_update("anadis" + str(slot), " ", "label")
            _queue_update("analog" + str(slot), str(stored))
            _queue_update("anadis" + str(slot),
                          dpyfuncs[modeindex][slot - 1](stored), "label")
            _serial_send("1 %s %s \r" % (slot, stored))

    # analog input: display value
    #
    if oper == 1 and index > 0:
        anavalues[modeindex][index - 1] = int(value)  # store analog value
        dvalue = dpyfuncs[modeindex][index - 1](int(value))
        _queue_update("anadis" + str(index), dvalue, "label")

    # connect: send all parameters
    #
    if oper == 0 and index == 1 and ser is not None and ser.is_open:
        # As before, write errors here are not suppressed.
        ser.write(bytes("1 0 %s \r" % (modeindex * 100), 'utf-8'))
        for slot in range(1, 6):
            ser.write(bytes("1 %s %s \r" % (slot, anavalues[modeindex][slot - 1]),
                            'utf-8'))
# Serial connection to the Arduino; None until arduino_connect() has opened
# a port.
ser = None
async def arduino_connect():
    """Keep trying, once per second, to open a serial connection to the Arduino.

    While ``ser`` is unset or closed, every serial port whose description
    matches 'USB' is tried at 115200 baud in non-blocking mode.  The first
    port that opens is registered with the event loop so that from_arduino()
    is called whenever data is readable, and is then published in the
    global ``ser``.  Runs forever; it ends only when cancelled.
    """
    global ser
    loop = asyncio.get_event_loop()
    while True:
        await asyncio.sleep(1.0)
        if ser is not None and ser.is_open:
            continue
        for port in serial.tools.list_ports.grep('USB'):
            print(port.device)
            candidate = None
            try:
                candidate = serial.Serial(port.device, timeout=0, baudrate=115200)
                loop.add_reader(candidate.fileno(), from_arduino, candidate)
            except Exception:
                # Best effort: try the next port.  Close a port that opened
                # but could not be registered; previously it stayed open in
                # ``ser`` without a reader and blocked every later retry.
                if candidate is not None:
                    candidate.close()
                continue
            ser = candidate
            print("Arduino on")
            status_message("Arduino ready")
            break
async def client_handler(websocket):
    """Receive messages from the browser forever and process each one.

    Every message read from *websocket* is handed to process_command().
    The loop has no exit of its own; it ends when recv() or
    process_command() raises, or when the task is cancelled.
    """
    while True:
        process_command(await websocket.recv(), websocket)
async def arduino_handler(websocket):
    """Forward queued messages to the browser forever.

    Each item taken from ``queue`` is sent over *websocket*, except the
    literal string "exit", which terminates the program via sys.exit().
    """
    while True:
        outgoing = await queue.get()
        if outgoing != "exit":
            await websocket.send(outgoing)
            continue
        sys.exit()
async def handler(websocket, path=None):
    """Serve one browser connection until any of its tasks finishes.

    Starts three tasks -- reading client commands, forwarding queued
    messages to the client, and (re)connecting to the Arduino -- and waits
    for the first of them to complete (normally because the client went
    away).  The remaining tasks are cancelled and the Arduino is told to
    take back control.

    websocket -- the client connection.
    path      -- request path; unused.  Optional so the handler works both
                 with websockets versions that pass it and with those that
                 pass only the connection.
    """
    client_task = asyncio.ensure_future(client_handler(websocket))
    arduino_task = asyncio.ensure_future(arduino_handler(websocket))
    connect_task = asyncio.ensure_future(arduino_connect())
    _finished, pending = await asyncio.wait(
        [client_task, arduino_task, connect_task],
        return_when=asyncio.FIRST_COMPLETED)
    for task in pending:
        task.cancel()
    # give back control -- only possible if an Arduino is attached; writing
    # unconditionally crashed here when no port had been opened yet.
    if ser is not None and ser.is_open:
        try:
            ser.write(b'0 0 0 \r')
        except OSError:
            # port vanished in the meantime (SerialException is an OSError)
            pass
    print("Client gone")
# Bytes of the current, not yet terminated line received from the Arduino;
# filled and reset by from_arduino().
line = b''
def from_arduino(ser):
    """Event-loop reader callback: consume one byte from the Arduino.

    Bytes are collected in the global ``line`` until a CR or LF arrives.
    A complete line must consist of three whitespace-separated fields
    "<function> <index> <value>"; the function number selects a builder in
    ``elements`` whose JSON result is queued for the browser.  Empty lines
    are skipped and malformed lines are reported and dropped.

    If reading fails, the device is treated as gone: the reader is
    unregistered, the port closed and the browser notified.

    ser -- the serial port registered for this callback.
    """
    global line
    try:
        inchar = ser.read()
    except Exception:
        print("Arduino off")
        asyncio.get_event_loop().remove_reader(ser.fileno())
        ser.close()
        status_message("Arduino disconnected")
        line = b''
        return
    if inchar not in (b'\r', b'\n'):
        line += inchar
        return
    # Take the finished line and reset the buffer *before* parsing, so that
    # a bad line cannot stay in the buffer and break every later line.
    raw, line = line, b''
    if not raw:
        return
    try:
        func, index, value = str(raw, 'utf-8').split()
        message = elements[int(func)](index, value)
    except (ValueError, KeyError, IndexError) as err:
        # wrong field count, undecodable bytes, non-numeric or unknown
        # function code, or a state outside the builder's table
        print("Ignoring malformed Arduino line %r: %s" % (raw, err))
        return
    # asyncio.async() is a SyntaxError since Python 3.7; ensure_future()
    # is its direct replacement.
    asyncio.ensure_future(queue.put(message))
# Start the websocket server on all interfaces, port 5678; each client
# connection is served by handler().
server = websockets.serve(handler, '0.0.0.0', 5678)
loop = asyncio.get_event_loop()
loop.run_until_complete(server)
# Keep the event loop running so the server keeps accepting clients.
loop.run_forever()