generated from MBI-Div-B/pytango-TemplateDeviceServer
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
6b67d57
commit 880a190
Showing
4 changed files
with
266 additions
and
132 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,140 @@ | ||
""" Driver for Brooks s-protocol """ | ||
import time | ||
import struct | ||
import logging | ||
import serial | ||
from six import b, indexbytes | ||
|
||
class Brooks(object): | ||
""" Driver for Brooks s-protocol """ | ||
def __init__(self, device, port='/dev/ttyUSB0'): | ||
self.ser = serial.Serial(port, 19200) | ||
self.ser.parity = serial.PARITY_ODD | ||
self.ser.bytesize = serial.EIGHTBITS | ||
self.ser.stopbits = serial.STOPBITS_ONE | ||
deviceid = self.comm('8280000000000b06' | ||
+ self.pack(device[-8:])) | ||
manufactor_code = '0a' | ||
device_type = deviceid[12:14] | ||
long_address = manufactor_code + device_type + deviceid[-6:] | ||
self.long_address = long_address | ||
|
||
def pack(self, input_string): | ||
""" Turns a string in packed-ascii format """ | ||
#This function lacks basic error checking.... | ||
klaf = '' | ||
for s in input_string: | ||
klaf += bin((ord(s) % 128) % 64)[2:].zfill(6) | ||
result = '' | ||
for i in range(0, 6): | ||
result = result + hex(int('' + klaf[i * 8:i * 8 + 8], | ||
2))[2:].zfill(2) | ||
return result | ||
|
||
def crc(self, command): | ||
""" Calculate crc value of command """ | ||
i = 0 | ||
while command[i:i + 2] == 'FF': | ||
i += 2 | ||
command = command[i:] | ||
n = len(command) | ||
result = 0 | ||
for i in range(0, (n//2)): | ||
byte_string = command[i*2:i*2+2] | ||
byte = int(byte_string, 16) | ||
result = byte ^ result | ||
return hex(result) | ||
|
||
def comm(self, command): | ||
""" Implements low-level details of the s-protocol """ | ||
check = str(self.crc(command)) | ||
check = check[2:].zfill(2) | ||
final_com = 'FFFFFFFF' + command + check | ||
bin_comm = '' | ||
for i in range(0, len(final_com) // 2): | ||
bin_comm += chr(int(final_com[i * 2:i * 2 + 2], 16)) | ||
bin_comm += chr(0) | ||
bytes_for_serial = b(bin_comm) | ||
error = 1 | ||
while (error > 0) and (error < 10): | ||
self.ser.write(bytes_for_serial) | ||
time.sleep(0.2) | ||
s = self.ser.read(self.ser.inWaiting()) | ||
st = '' | ||
for i in range(0, len(s)): | ||
#char = hex(ord(s[i]))[2:].zfill(2) | ||
#char = hex(s[i])[2:].zfill(2) | ||
char = hex(indexbytes(s, i))[2:].zfill(2) | ||
if not char.upper() == 'FF': | ||
st = st + char | ||
try: | ||
# delimiter = st[0:2] | ||
# address = st[2:12] | ||
command = st[12:14] | ||
byte_count = int(st[14:16], 16) | ||
response = st[16:16 + 2 * byte_count] | ||
error = 0 | ||
except ValueError: | ||
error = error + 1 | ||
response = 'Error' | ||
return response | ||
|
||
def read_flow(self): | ||
""" Read the current flow-rate """ | ||
response = self.comm('82' + self.long_address + '0100') | ||
try: # TODO: This should be handled be re-sending command | ||
#status_code = response[0:4] | ||
unit_code = int(response[4:6], 16) | ||
flow_code = response[6:] | ||
byte0 = chr(int(flow_code[0:2], 16)) | ||
byte1 = chr(int(flow_code[2:4], 16)) | ||
byte2 = chr(int(flow_code[4:6], 16)) | ||
byte3 = chr(int(flow_code[6:8], 16)) | ||
flow = struct.unpack('>f', b(byte0 + byte1 + byte2 + byte3)) | ||
value = flow[0] | ||
except ValueError: | ||
value = -1 | ||
unit_code = 171 # Satisfy assertion check, we know what is wrong | ||
# assert unit_code == 171 # Flow unit should always be mL/min | ||
# print('read flow: ' + value) | ||
return value | ||
|
||
def read_full_range(self): | ||
""" | ||
Report the full range of the device | ||
Apparantly this does not work for SLA-series... | ||
""" | ||
response = self.comm('82' + self.long_address + '980106')#Command 152 | ||
# Double check what gas-selection code really means... | ||
# currently 01 is used | ||
# status_code = response[0:4] | ||
unit_code = int(response[4:6], 16) | ||
# assert unit_code == 171 #Flow controller should always be set to mL/min | ||
|
||
flow_code = response[6:] | ||
byte0 = chr(int(flow_code[0:2], 16)) | ||
byte1 = chr(int(flow_code[2:4], 16)) | ||
byte2 = chr(int(flow_code[4:6], 16)) | ||
byte3 = chr(int(flow_code[6:8], 16)) | ||
max_flow = struct.unpack('>f', byte0 + byte1 + byte2 + byte3) | ||
return max_flow[0] | ||
|
||
def set_flow(self, flowrate): | ||
""" Set the setpoint of the flow """ | ||
ieee = struct.pack('>f', flowrate) | ||
ieee_flowrate = '' | ||
for i in range(0, 4): | ||
ieee_flowrate += hex(ord(ieee[i]))[2:].zfill(2) | ||
#39 = unit code for percent | ||
#FA = unit code for 'same unit as flowrate measurement' | ||
response = self.comm('82' + self.long_address + | ||
'ec05' + 'FA' + ieee_flowrate) | ||
status_code = response[0:4] | ||
unit_code = int(response[4:6], 16) | ||
# print('set flow: ' + flowrate) | ||
return status_code, unit_code | ||
|
||
if __name__ == '__main__': | ||
BROOKS = Brooks() | ||
print(BROOKS.long_address) | ||
print(BROOKS.read_flow()) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,126 @@ | ||
#!/usr/bin/python3 -u | ||
# -*- coding: utf-8 -*- | ||
# | ||
# This file is part of the BronkhorstPressureCtrl project | ||
# | ||
# | ||
# | ||
# Distributed under the terms of the GPL license. | ||
# See LICENSE.txt for more info. | ||
|
||
""" BronkhorstPressureCtrl PyTango Class (Serial connection) | ||
Class for controlling Bronkhorst Pressure/Flow controller via serial connection | ||
""" | ||
|
||
# PyTango imports | ||
from tango import Database, DevState, AttrWriteType | ||
from tango.server import run, Device, attribute, device_property | ||
# Additional import | ||
import Brooks as b | ||
|
||
|
||
__all__ = ["BrooksSLA", "main"] | ||
|
||
|
||
class BrooksSLA(Device): | ||
# ----------------- | ||
# Device Properties | ||
# ----------------- | ||
|
||
Port = device_property( | ||
dtype='DevString', | ||
doc='e.g., /dev/ttyBrooks' | ||
) | ||
|
||
ID = device_property( | ||
dtype='DevString', | ||
doc='12345678' | ||
) | ||
|
||
|
||
# ---------- | ||
# Attributes | ||
# ---------- | ||
|
||
Setpoint = attribute( | ||
dtype='DevDouble', | ||
access=AttrWriteType.READ_WRITE, | ||
unit="mbar", | ||
memorized=True, | ||
) | ||
Readback = attribute( | ||
dtype='DevDouble', | ||
access=AttrWriteType.READ, | ||
unit="mbar", | ||
memorized=True, | ||
) | ||
|
||
# --------------- | ||
# General methods | ||
# --------------- | ||
|
||
def init_device(self): | ||
"""Initialises the attributes and properties of the BrooksSLA.""" | ||
self.info_stream("init_device()") | ||
Device.init_device(self) | ||
self.set_state(DevState.INIT) | ||
|
||
self.info_stream("port: {:s}".format(self.Port)) | ||
self.info_stream("ID: {:s}".format(self.ID)) | ||
|
||
# connect to device | ||
self.sla = b.Brooks(self.ID, self.Port) | ||
|
||
attr = Database().get_device_attribute_property(self.get_name(), ["Setpoint"]) | ||
self.__setpoint = float(attr["Setpoint"]["__value"][0]) | ||
|
||
self.set_status("The device is in ON state") | ||
self.set_state(DevState.ON) | ||
|
||
|
||
def always_executed_hook(self): | ||
"""Method always executed before any TANGO command is executed.""" | ||
|
||
def delete_device(self): | ||
self.set_status("The device is in OFF state") | ||
self.set_state(DevState.OFF) | ||
|
||
|
||
# ------------------ | ||
# Attributes methods | ||
# ------------------ | ||
|
||
def read_Readback(self): | ||
return self.sla.read_flow() | ||
|
||
|
||
def read_Setpoint(self): | ||
return self.__setpoint | ||
|
||
def write_Setpoint(self, value): | ||
self.sla.set_flow(value) | ||
self.__setpoint=value | ||
pass | ||
|
||
# -------- | ||
# Commands | ||
# -------- | ||
|
||
def dev_state(self): | ||
self.set_status("The device is in ON state") | ||
self.debug_stream("device state: ON") | ||
return DevState.ON | ||
|
||
|
||
# ---------- | ||
# Run server | ||
# ---------- | ||
|
||
|
||
def main(args=None, **kwargs): | ||
return run((BrooksSLA,), args=args, **kwargs) | ||
|
||
|
||
if __name__ == '__main__': | ||
main() |
Oops, something went wrong.