Source code for lavuelib.motorWatchThread

# Copyright (C) 2017  DESY, Christoph Rosemann, Notkestr. 85, D-22607 Hamburg
#
# lavue is an image viewing program for photon science imaging detectors.
# Its usual application is as a live viewer using hidra as data source.
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation in  version 2
# of the License.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor,
# Boston, MA  02110-1301, USA.
#
# Authors:
#     Christoph Rosemann <christoph.rosemann@desy.de>
#     Jan Kotanski <jan.kotanski@desy.de>
#

""" motor watch thread """

from __future__ import print_function
from __future__ import unicode_literals

import time
import json
import logging

from pyqtgraph import QtCore

from .omniQThread import OmniQThread
# from .sardanaUtils import debugmethod


#: (:obj:`float`) refresh time in seconds
GLOBALREFRESHTIME = .1
#: (:obj:`float`) polling inverval in seconds
POLLINGINTERVAL = 1.

logger = logging.getLogger("lavue")


# subclass for threading
[docs]class MotorWatchThread(OmniQThread): #: (:class:`pyqtgraph.QtCore.pyqtSignal`) signal with motor status motorStatusSignal = QtCore.pyqtSignal(float, str, float, str) #: (:class:`pyqtgraph.QtCore.pyqtSignal`) watching finished watchingFinished = QtCore.pyqtSignal() def __init__(self, motor1, motor2, server=None): """ constructor :param motor1: first motor device proxy :type motor1: :class:`tango.DeviceProxy` :param motor2: second motor device proxy :type motor2: :class:`tango.DeviceProxy` :param mserver: door server device proxy :type mserver: :class:`tango.DeviceProxy` """ OmniQThread.__init__(self) #: (:obj:`bool`) execute loop flag self.__loop = False #: (:class:`tango.DeviceProxy`) first motor device proxy self.__motor1 = motor1 #: (:class:`tango.DeviceProxy`) second motor device proxy self.__motor2 = motor2 #: (:class:`tango.DeviceProxy`) door server device proxy self.__mserver = server # @debugmethod def _run(self): """ runner of the fetching thread """ self.__loop = True while self.__loop: if time: time.sleep(GLOBALREFRESHTIME) try: state1 = str(self.__motor1.state()) pos1 = float(self.__motor1.position) state2 = str(self.__motor2.state()) pos2 = float(self.__motor2.position) self.motorStatusSignal.emit(pos1, state1, pos2, state2) if self.__mserver is not None: mstate = str(self.__mserver.state()) else: if state1 == "MOVING" or state2 == "MOVING": mstate = "MOVING" elif state1 == "RUNNING" or state2 == "RUNNING": mstate = "RUNNING" else: mstate = "ON" if mstate not in ["RUNNING", "MOVING"]: self.watchingFinished.emit() except Exception as e: logger.warning(str(e)) # @debugmethod
[docs] def isWatching(self): """ is datasource source connected :returns: if datasource source connected :rtype: :obj:`bool` """ return self.__loop
# @debugmethod
[docs] def stop(self): """ stops loop """ self.__loop = False
# subclass for threading
[docs]class AttributeWatchThread(OmniQThread): #: (:class:`pyqtgraph.QtCore.pyqtSignal`) signal with attribute values attrValuesSignal = QtCore.pyqtSignal(str) #: (:class:`pyqtgraph.QtCore.pyqtSignal`) watching finished watchingFinished = QtCore.pyqtSignal() def __init__(self, aproxies, refreshtime=None): """ constructor :param refreshtime: refresh time :type refreshtime: :class:`tango.DeviceProxy` :param aproxies: attribute proxies :type aproxies: :obj:`list` <:class:`tango.DeviceProxy`> """ OmniQThread.__init__(self) #: (:obj:`bool`) execute loop flag self.__loop = False #: (:obj:`bool`) execute loop flag self.__refreshtime = refreshtime or POLLINGINTERVAL #: (:obj:`list` <:class:`tango.DeviceProxy`>) attribute proxies self.__aproxies = aproxies or [] # @debugmethod def _run(self): """ runner of the fetching thread """ self.__loop = True while self.__loop: # logger.debug("ATTR LOOP %s" % (self.__loop)) try: attrs = [] for ap in self.__aproxies: ra = ap.read() vl = ra.value if hasattr(vl, "tolist"): vl = vl.tolist() attrs.append(vl) self.attrValuesSignal.emit(str(json.dumps(attrs))) except Exception as e: logger.warning(str(e)) if time and self.__loop: time.sleep(self.__refreshtime) # @debugmethod
[docs] def isWatching(self): """ is datasource source connected :returns: if datasource source connected :rtype: :obj:`bool` """ return self.__loop
# @debugmethod
[docs] def stop(self): """ stops loop """ self.__loop = False
# logger.debug("STOP LOOP %s" % (self.__loop))