Source code for lavuelib.controllerClient

# Copyright (C) 2017  DESY, Notkestr. 85, D-22607 Hamburg
#
# lavue is an image viewing program for photon science imaging detectors.
# Its usual application is as a live viewer using hidra as data source.
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation in  version 2
# of the License.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor,
# Boston, MA  02110-1301, USA.
#
# Authors:
#     Jan Kotanski <jan.kotanski@desy.de>
#     Christoph Rosemann <christoph.rosemann@desy.de>
#


""" tango lavue controller client"""

from pyqtgraph import QtCore

import logging

try:
    try:
        import tango
    except ImportError:
        import PyTango as tango
    #: (:obj:`bool`) tango imported
    TANGO = True
except ImportError:
    #: (:obj:`bool`) tango imported
    TANGO = False


logger = logging.getLogger("lavue")


[docs]class TangoCB(object): """ tango attribute callback class""" def __init__(self, client, name, signal): """ constructor :param client: tango controller client :type client: :class:`str` :param name: attribute name :type name: :obj:`str` :param signal: signal to emit :type signal: :class:`pyqtgraph.QtCore.pyqtSignal` """ self.__client = client self.__name = name self.__signal = signal
[docs] def push_event(self, *args, **kwargs): '''callback method receiving the event''' event_data = args[0] if event_data.err: result = event_data.errors logger.warning(str(result)) # print(result) else: result = event_data.attr_value.value self.__signal.emit(result)
[docs]class ControllerClient(QtCore.QObject): """ lavue controller client """ #: (:class:`pyqtgraph.QtCore.pyqtSignal`) energy changed signal energyChanged = QtCore.pyqtSignal(float) #: (:class:`pyqtgraph.QtCore.pyqtSignal`) detector distance changed signal detectorDistanceChanged = QtCore.pyqtSignal(float) #: (:class:`pyqtgraph.QtCore.pyqtSignal`) detector ROIs changed signal detectorROIsChanged = QtCore.pyqtSignal(str) #: (:class:`pyqtgraph.QtCore.pyqtSignal`) beam Center X changed signal beamCenterXChanged = QtCore.pyqtSignal(float) #: (:class:`pyqtgraph.QtCore.pyqtSignal`) beam Center Y changed signal beamCenterYChanged = QtCore.pyqtSignal(float) #: (:class:`pyqtgraph.QtCore.pyqtSignal`) pixel Size X changed signal pixelSizeXChanged = QtCore.pyqtSignal(float) #: (:class:`pyqtgraph.QtCore.pyqtSignal`) pixel Size Y changed signal pixelSizeYChanged = QtCore.pyqtSignal(float) #: (:class:`pyqtgraph.QtCore.pyqtSignal`) lavueState changed signal lavueStateChanged = QtCore.pyqtSignal(str) def __init__(self, device): """ constructor :param device: tango device name :type device: :obj:`str` """ QtCore.QObject.__init__(self) #: (:class:`tango.DeviceProxy`) controller device proxy self.__dp = tango.DeviceProxy(device) #: (:obj:`str`) tango device name self.__device = device #: (:obj:`bool`) subscribe flag self.__subscribed = False
[docs] def device(self): """ provides tango device name :returns: tango device name :rtype: :obj:`str` """ return self.__device
[docs] def subscribe(self): """ subscribe callback methods """ energy_cb = TangoCB(self, "Energy", self.energyChanged) distance_cb = TangoCB( self, "DetectorDistance", self.detectorDistanceChanged) rois_cb = TangoCB( self, "DetectorROIs", self.detectorROIsChanged) centerx_cb = TangoCB( self, "BeamCenterX", self.beamCenterXChanged) centery_cb = TangoCB( self, "BeamCenterY", self.beamCenterYChanged) pixelsizex_cb = TangoCB( self, "PixelSizeX", self.pixelSizeXChanged) pixelsizey_cb = TangoCB( self, "PixelSizeY", self.pixelSizeYChanged) lavuestate_cb = TangoCB( self, "LavueState", self.lavueStateChanged) self.__energy_id = self.__dp.subscribe_event( "Energy", tango.EventType.CHANGE_EVENT, energy_cb) self.__distance_id = self.__dp.subscribe_event( "DetectorDistance", tango.EventType.CHANGE_EVENT, distance_cb) self.__rois_id = self.__dp.subscribe_event( "DetectorROIs", tango.EventType.CHANGE_EVENT, rois_cb) self.__centerx_id = self.__dp.subscribe_event( "BeamCenterX", tango.EventType.CHANGE_EVENT, centerx_cb) self.__centery_id = self.__dp.subscribe_event( "BeamCenterY", tango.EventType.CHANGE_EVENT, centery_cb) self.__pixelsizex_id = self.__dp.subscribe_event( "PixelSizeX", tango.EventType.CHANGE_EVENT, pixelsizex_cb) self.__pixelsizey_id = self.__dp.subscribe_event( "PixelSizeY", tango.EventType.CHANGE_EVENT, pixelsizey_cb) self.__lavuestate_id = self.__dp.subscribe_event( "LavueState", tango.EventType.CHANGE_EVENT, lavuestate_cb) self.__subscribed = True
[docs] def writeAttribute(self, name, value): """ writes attribute value of device :param name: attribute name :type name: :obj:`str` :param value: attribute value :type value: :obj:`any` """ self.__dp.write_attribute(name, value)
[docs] def unsubscribe(self): """ unsubscribe callback methods """ if self.__subscribed: self.__dp.unsubscribe_event(self.__energy_id) self.__dp.unsubscribe_event(self.__distance_id) self.__dp.unsubscribe_event(self.__rois_id) self.__dp.unsubscribe_event(self.__centerx_id) self.__dp.unsubscribe_event(self.__centery_id) self.__dp.unsubscribe_event(self.__pixelsizex_id) self.__dp.unsubscribe_event(self.__pixelsizey_id) self.__dp.unsubscribe_event(self.__lavuestate_id) self.__subscribed = False