# Copyright (C) 2017 DESY, Notkestr. 85, D-22607 Hamburg
#
# lavue is an image viewing program for photon science imaging detectors.
# Its usual application is as a live viewer using hidra as data source.
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation in version 2
# of the License.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor,
# Boston, MA 02110-1301, USA.
#
# Authors:
# Jan Kotanski <jan.kotanski@desy.de>
# Christoph Rosemann <christoph.rosemann@desy.de>
#
""" sardana utils """
import pickle
import json
import time
import sys
import logging
import functools
import numpy as np
# Probe for the tango bindings: prefer the modern ``tango`` package and fall
# back to the legacy ``PyTango`` name.  ``TANGO`` records whether either one
# could be imported.
#: (:obj:`bool`) tango imported
TANGO = True
try:
    import tango
except ImportError:
    try:
        import PyTango as tango
    except ImportError:
        #: (:obj:`bool`) tango imported
        TANGO = False

# python 3 has neither ``basestring`` nor ``unicode``: alias both to ``str``
# so that the isinstance() checks in this module work on both major versions
if sys.version_info > (3,):
    basestring = str
    unicode = str

#: (:class:`logging.Logger`) logger object shared by the lavue modules
logger = logging.getLogger("lavue")
def debugmethod(method):
    """ debug wrapper for methods

    The wrapper logs on the DEBUG level the positional arguments
    the method was called with and the value it returned.

    :param method: any class method
    :type method: :class:`any`
    :returns: wrapped class method
    :rtype: :class:`any`
    """
    # the effective level is inspected only once, when the method is
    # decorated: below DEBUG the method is returned unwrapped
    if logger.getEffectiveLevel() < logging.DEBUG:
        return method

    @functools.wraps(method)
    def decmethod(*args, **kwargs):
        # do not build the argument and result strings when the debug
        # records would be dropped by the logger anyway
        if not logger.isEnabledFor(logging.DEBUG):
            return method(*args, **kwargs)
        if args:
            # for a bound call the first positional argument is the instance
            name = "%s.%s.%s" % (
                args[0].__class__.__module__,
                args[0].__class__.__name__,
                method.__name__
            )
        else:
            # called without positional arguments: no instance to describe
            name = "%s.%s" % (method.__module__, method.__name__)
        margs = " with %s " % str(args[1:]) if args[1:] else ""
        logger.debug("%s: excecuted %s", name, margs)
        ret = method(*args, **kwargs)
        logger.debug(
            "%s: returns %s", name, str(ret) if ret is not None else '')
        return ret

    return decmethod
class numpyEncoder(json.JSONEncoder):

    """ json encoder which serializes numpy scalars and arrays
    as plain python numbers, booleans and (nested) lists
    """

    def default(self, obj):
        """ converts objects unknown to the standard json encoder

        :param obj: numpy scalar, numpy array or any other object
        :type obj: :obj:`object` or `any`
        :returns: python object serializable by the standard encoder
        :rtype: :obj:`int`, :obj:`float`, :obj:`list` or :obj:`bool`
        """
        # (numpy type, converter) pairs checked in this order
        converters = (
            (np.integer, int),
            (np.floating, float),
            (np.ndarray, lambda array: array.tolist()),
            (np.bool_, bool),
        )
        for nptype, convert in converters:
            if isinstance(obj, nptype):
                return convert(obj)
        # anything else is left to the base class, which raises TypeError
        return json.JSONEncoder.default(self, obj)
class SardanaUtils(object):

    """ sardanamacro server"""

    @debugmethod
    def __init__(self):
        """ constructor """
        #: (:obj:`list` <:class:`tango.DeviceProxy`>) pool tango servers
        self.__pools = []
        try:
            #: (:class:`tango.Database`) tango database
            self.__db = tango.Database()
        except Exception:
            # best effort: the object stays usable without
            # the default tango database
            self.__db = None

    @classmethod
    def openProxy(cls, device, counter=100):
        """ opens device proxy of the given device

        The device is pinged up to `counter` times with short sleeps
        between the attempts.

        :param device: device name
        :type device: :obj:`str`
        :param counter: maximal number of ping attempts
        :type counter: :obj:`int`
        :returns: DeviceProxy of device
        :rtype: :class:`tango.DeviceProxy`
        :raises: :class:`tango.DevFailed` if the last ping attempt fails
        """
        cnfServer = tango.DeviceProxy(str(device))
        for cnt in range(counter):
            if cnt > 1:
                time.sleep(0.01)
            try:
                cnfServer.ping()
                break
            except tango.DevFailed:
                time.sleep(0.01)
                if cnt == counter - 1:
                    raise
        return cnfServer

    @debugmethod
    def getMacroServer(self, door):
        """ door macro server device name

        :param door: door device name, optionally prefixed
                     with ``<host>:<port>/``
        :type door: :obj:`str`
        :returns: macroserver device proxy or `None` if no exported
                  macroserver lists the door
        :rtype: :class:`tango.DeviceProxy`
        :raises: :class:`Exception` if the door name is empty
        """
        if not door:
            raise Exception("Door '%s' cannot be found" % door)
        logger.debug("SardanaUtils.getMacroServer: Door = %s", door)
        sdoor = door.split("/")
        tangohost = None
        if len(sdoor) > 1 and ":" in sdoor[0]:
            door = "/".join(sdoor[1:])
            tangohost = sdoor[0]
        logger.debug(
            "SardanaUtils.getMacroServer: Tango Host = %s", tangohost)
        if tangohost:
            host, port = tangohost.split(":")
            db = tango.Database(host, int(port))
        elif self.__db is not None:
            db = self.__db
        else:
            # no host in the door name and no database opened
            # by the constructor: try the default database again
            db = tango.Database()
        logger.debug("SardanaUtils.getMacroServer: Database = %s", str(db))
        servers = db.get_device_exported_for_class("MacroServer").value_string
        logger.debug(
            "SardanaUtils.getMacroServer: MacroSevers = %s", str(servers))

        # door list entries are compared lower-cased, so the searched
        # names (with and without the tango host) are lower-cased as well
        doornames = [door.lower()]
        if tangohost:
            doornames.append("%s/%s" % (tangohost.lower(), door.lower()))

        ms = None
        for server in servers:
            if tangohost and ":" not in server:
                msname = "%s/%s" % (tangohost, server)
            else:
                msname = str(server)
            try:
                dp = self.openProxy(msname)
            except Exception as e:
                # an unreachable macroserver is skipped
                logger.warning("SardanaUtils.getMacroServer: %s", str(e))
                continue
            if not hasattr(dp, "DoorList"):
                continue
            doorlist = dp.DoorList
            if not doorlist:
                continue
            lst = [str(dr).lower() for dr in doorlist]
            logger.debug(
                "SardanaUtils.getMacroServer: DoorList = %s", str(lst))
            if any(name in lst for name in doornames):
                ms = dp
                logger.debug(
                    "SardanaUtils.getMacroServer: "
                    "Door MacroServer = %s", str(ms))
                break
        return ms

    @classmethod
    def pickleloads(cls, bytestr):
        """ loads pickle byte string

        :param bytestr: byte string to convert
        :type bytestr: :obj:`bytes`
        :returns: loaded bytestring
        :rtype: :obj:`any`
        """
        # NOTE(review): pickle.loads executes arbitrary code stored in
        # the byte string; it must be fed only with data from a trusted
        # source
        if sys.version_info > (3,):
            return pickle.loads(bytestr, encoding='latin1')
        else:
            return pickle.loads(bytestr)

    @debugmethod
    def getScanEnv(self, door, params=None):
        """ fetches Scan Environment Data

        :param door: door device
        :type door: :obj:`str`
        :param params: names of the environment variables to fetch
        :type params: :obj:`list` <:obj:`str`>
        :returns: JSON String with important variables
        :rtype: :obj:`str`
        """
        params = params or []
        res = {}
        msp = self.getMacroServer(door)
        rec = msp.Environment
        if rec[0] == 'pickle':
            dc = self.pickleloads(rec[1])
            if 'new' in dc:
                newenv = dc['new']
                for var in params:
                    if var in newenv:
                        res[var] = newenv[var]
        return json.dumps(res, cls=numpyEncoder)

    @debugmethod
    def getDeviceName(self, cname, db=None):
        """ finds device of give class

        :param cname: device class name
        :type cname: :obj:`str`
        :param db: tango database
        :type db: :class:`tango.Database`
        :returns: device name if exists, otherwise an empty string
        :rtype: :obj:`str`
        """
        if db is None:
            db = self.__db
        try:
            servers = db.get_device_exported_for_class(cname).value_string
        except Exception:
            # no usable database or a failed query: nothing to look through
            servers = []
        device = ''
        for server in servers:
            try:
                dp = self.openProxy(str(server))
                dp.ping()
                device = server
                break
            except tango.DevFailed:
                # not responding device: try the next one
                pass
        return device

    @debugmethod
    def setScanEnv(self, door, jdata):
        """ stores Scan Environment Data

        :param door: door device
        :type door: :obj:`str`
        :param jdata: JSON String with important variables
        :type jdata: :obj:`str`
        """
        data = json.loads(jdata)
        msp = self.getMacroServer(door)
        dc = {'new': {}}
        for var in data.keys():
            dc['new'][str(var)] = self.toString(data[var])
        try:
            pk = pickle.dumps(dc, protocol=2)
            msp.Environment = ['pickle', pk]
        except Exception:
            if sys.version_info < (3,):
                raise

            # python 3 fallback: retry with the dictionary keys
            # of the two top levels encoded as bytes
            def tobytes(key):
                return bytes(key, "utf8") if isinstance(key, unicode) else key

            if isinstance(data, dict):
                newvalue = {}
                for key, vl in dc.items():
                    if isinstance(vl, dict):
                        vl = dict(
                            (tobytes(ky), it) for ky, it in vl.items())
                    newvalue[tobytes(key)] = vl
            else:
                newvalue = dc
            pk = pickle.dumps(newvalue, protocol=2)
            msp.Environment = ['pickle', pk]

    @debugmethod
    def wait(self, name=None, proxy=None, maxcount=100):
        """ waits until the device is in the ON state
        or `maxcount` state checks have been done

        :param name: device name, used only if `proxy` is not given
        :type name: :obj:`str`
        :param proxy: door device proxy
        :type proxy: :class:`tango.DeviceProxy`
        :param maxcount: number of 0.01s to wait
        :type maxcount: :obj:`int`
        """
        if proxy is None:
            proxy = self.openProxy(name)
        for _ in range(maxcount):
            if proxy.state() == tango.DevState.ON:
                break
            time.sleep(0.01)

    @debugmethod
    def runMacro(self, door, command, wait=True):
        """ runs the given macro on the door

        :param door: door device
        :type door: :obj:`str`
        :param command: list with the macro name and its parameters
        :type command: :obj:`list` <:obj:`str`>
        :param wait: wait till macro is finished
        :type wait: :obj:`bool`
        :returns: result, error or warning
        :rtype: [:obj:`str`, :obj:`str`]
        :raises: :class:`Exception` if the macro is not known
                 to the macroserver or the door is not in
                 the ON or ALARM state
        """
        doorproxy = self.openProxy(door)
        msp = self.getMacroServer(door)
        ml = msp.MacroList
        if not len(command) or not command[0]:
            raise Exception("Macro %s not found" % str(command))
        if command[0] not in ml:
            raise Exception("Macro '%s' not found" % str(command[0]))
        state = str(doorproxy.state())
        if state not in ["ON", "ALARM"]:
            raise Exception("Door in state '%s'" % str(state))

        try:
            doorproxy.RunMacro(command)
        except tango.DevFailed as e:
            if e.args and e.args[0].reason == 'API_CommandNotAllowed':
                # the command is not allowed yet: wait and retry once
                self.wait(proxy=doorproxy)
                doorproxy.RunMacro(command)
            else:
                raise
        if not wait:
            return None, None
        self.wait(proxy=doorproxy)
        warn = doorproxy.warning
        error = doorproxy.error
        res = doorproxy.result
        return res, error or warn

    @debugmethod
    def getError(self, door):
        """ reads the error or, if it is empty, the warning of the door

        :param door: door device
        :type door: :obj:`str`
        :returns: error or warning
        :rtype: :obj:`str`
        """
        doorproxy = self.openProxy(door)
        warn = doorproxy.warning
        error = doorproxy.error
        return error or warn

    @classmethod
    def toString(cls, obj):
        """ converts list/dict/object of unicode/string to string object

        Lists and dictionaries are converted recursively,
        any other object is returned unchanged.

        :param obj: given unicode/string object
        :type obj: `any`
        :returns: string object
        :rtype: :obj:`str`
        """
        if isinstance(obj, basestring):
            return str(obj)
        elif isinstance(obj, list):
            return [cls.toString(el) for el in obj]
        elif isinstance(obj, dict):
            return dict((cls.toString(key), cls.toString(value))
                        for key, value in obj.items())
        else:
            return obj

    @debugmethod
    def getElementNames(self, door, listattr, typefilter=None):
        """ provides experimental Channels

        :param door: door device name
        :type door: :obj:`str`
        :param listattr: pool attribute with list
        :type listattr: :obj:`str`
        :param typefilter: accepted element types,
                           all types are accepted if empty
        :type typefilter: :obj:`list` <:obj:`str`>
        :returns: names from given pool listattr
        :rtype: :obj:`list` <:obj:`str`>
        """
        lst = []
        elements = []
        if not self.__pools:
            self.getPools(door)
        for pool in self.__pools:
            if hasattr(pool, listattr):
                ellist = getattr(pool, listattr)
                if ellist:
                    lst += ellist
        for elm in lst:
            if not elm:
                continue
            # each list item is a JSON string describing one element
            chan = json.loads(elm)
            if not chan or not isinstance(chan, dict):
                continue
            if typefilter and chan['type'] not in typefilter:
                continue
            elements.append(chan['name'])
        return elements

    @debugmethod
    def getPools(self, door):
        """ provides pool devices

        :param door: door device name
        :type door: :obj:`str`
        :returns: list of pool device proxies
        :rtype: :obj:`list` <:class:`tango.DeviceProxy`>
        :raises: :class:`Exception` if the door name is empty
        """
        self.__pools = []
        if not door:
            raise Exception("Door '%s' cannot be found" % door)
        sdoor = door.split("/")
        prefix = sdoor[0]
        host = None
        if ":" in prefix and len(sdoor) > 1:
            # the door name starts with "<host>:<port>"
            host = prefix.split(":")[0]
        msp = self.getMacroServer(door)
        poolNames = msp.get_property("PoolNames")["PoolNames"]
        if not poolNames:
            poolNames = []
        # pool names without a tango host get the one of the door
        poolNames = [
            "%s/%s" % (prefix, pn) if (host and ":" not in pn) else pn
            for pn in poolNames]
        self.__pools = self.getProxies(poolNames)
        return self.__pools

    @classmethod
    def getProxies(cls, names):
        """ provides proxies of given device names

        Devices for which the proxy cannot be created or which
        do not answer the ping are skipped.

        :param names: given device names
        :type names: :obj:`list` <:obj:`str`>
        :returns: list of device DeviceProxies
        :rtype: :obj:`list` <:class:`tango.DeviceProxy`>
        """
        dps = []
        for name in names:
            try:
                # the proxy creation may fail as well, so it is guarded
                # together with the ping
                dp = tango.DeviceProxy(str(name))
                dp.ping()
                dps.append(dp)
            except tango.DevFailed:
                pass
        return dps