#!/usr/bin/env python
##############################################################################
##
# This file is part of Sardana
##
# http://www.sardana-controls.org/
##
# Copyright 2011 CELLS / ALBA Synchrotron, Bellaterra, Spain
##
# Sardana is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
##
# Sardana is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
##
# You should have received a copy of the GNU Lesser General Public License
# along with Sardana. If not, see <http://www.gnu.org/licenses/>.
##
##############################################################################
"""The device pool submodule.
It contains specific part of sardana device pool"""
__all__ = ["InterruptException", "StopException", "AbortException",
"BaseElement", "ControllerClass", "ControllerLibrary",
"PoolElement", "Controller", "ComChannel", "ExpChannel",
"CTExpChannel", "ZeroDExpChannel", "OneDExpChannel",
"TwoDExpChannel", "PseudoCounter", "Motor", "PseudoMotor",
"MotorGroup", "TriggerGate",
"MeasurementGroup", "IORegister", "Instrument", "Pool",
"registerExtensions", "getChannelConfigs"]
__docformat__ = 'restructuredtext'
import copy
import operator
import os
import sys
import time
import traceback
import weakref
import numpy
import PyTango
from PyTango import DevState, AttrDataFormat, AttrQuality, DevFailed, \
DeviceProxy
from taurus import Factory, Device, Attribute
from taurus.core.taurusbasetypes import TaurusEventType
try:
from taurus.core.taurusvalidator import AttributeNameValidator as \
TangoAttributeNameValidator
except ImportError:
# TODO: For Taurus 4 compatibility
from taurus.core.tango.tangovalidator import TangoAttributeNameValidator
from taurus.core.util.log import Logger
from taurus.core.util.codecs import CodecFactory
from taurus.core.util.containers import CaselessDict
from taurus.core.util.event import EventGenerator, AttributeEventWait, \
AttributeEventIterator
from taurus.core.tango import TangoDevice, FROM_TANGO_TO_STR_TYPE
from .sardana import BaseSardanaElementContainer, BaseSardanaElement
from .motion import Moveable, MoveableSource
Ready = Standby = DevState.ON
Counting = Acquiring = Moving = DevState.MOVING
Alarm = DevState.ALARM
Fault = DevState.FAULT
CHANGE_EVT_TYPES = TaurusEventType.Change, TaurusEventType.Periodic
MOVEABLE_TYPES = 'Motor', 'PseudoMotor', 'MotorGroup'
QUALITY = {
AttrQuality.ATTR_VALID: 'VALID',
AttrQuality.ATTR_INVALID: 'INVALID',
AttrQuality.ATTR_CHANGING: 'CHANGING',
AttrQuality.ATTR_WARNING: 'WARNING',
AttrQuality.ATTR_ALARM: 'ALARM',
None: 'UNKNOWN'
}
class InterruptException(Exception):
pass
class StopException(InterruptException):
pass
class AbortException(InterruptException):
pass
[docs]class BaseElement(object):
""" The base class for elements in the Pool (Pool itself, Motor,
ControllerClass, ExpChannel all should inherit from this class directly or
indirectly)
"""
def __repr__(self):
pd = self.getPoolData()
return "{0}({1})".format(pd['type'], pd['full_name'])
def __str__(self):
return self.getName()
[docs] def serialize(self):
return self.getPoolData()
[docs] def str(self, n=0):
"""Returns a sequence of strings representing the object in
'consistent' way.
Default is to return <name>, <controller name>, <axis>
:param n: the number of elements in the tuple."""
if n == 0:
return CodecFactory.encode(('json'), self.serialize())
return self._str_tuple[:n]
def __cmp__(self, o):
return cmp(self.getPoolData()['full_name'],
o.getPoolData()['full_name'])
[docs] def getName(self):
return self.getPoolData()['name']
[docs] def getPoolObj(self):
"""Get reference to this object's Pool."""
return self._pool_obj
[docs] def getPoolData(self):
"""Get reference to this object's Pool data."""
return self._pool_data
[docs]class ControllerClass(BaseElement):
def __init__(self, **kw):
self.__dict__.update(kw)
self.path, self.f_name = os.path.split(self.file_name)
self.lib_name, self.ext = os.path.splitext(self.f_name)
def __repr__(self):
pd = self.getPoolData()
return "ControllerClass({0})".format(pd['full_name'])
[docs] def getSimpleFileName(self):
return self.f_name
[docs] def getFileName(self):
return self.file_name
[docs] def getClassName(self):
return self.getName()
[docs] def getType(self):
return self.getTypes()[0]
[docs] def getTypes(self):
return self.types
[docs] def getLib(self):
return self.f_name
[docs] def getGender(self):
return self.gender
[docs] def getModel(self):
return self.model
[docs] def getOrganization(self):
return self.organization
def __cmp__(self, o):
t = cmp(self.getType(), o.getType())
if t != 0:
return t
t = cmp(self.getGender(), o.getGender())
if t != 0:
return t
return cmp(self.getClassName(), o.getClassName())
class ControllerLibrary(BaseElement):
def __init__(self, **kw):
self.__dict__.update(kw)
def getType(self):
return self.getTypes()[0]
def getTypes(self):
return self.type
class TangoAttributeEG(Logger, EventGenerator):
"""An event generator for a 'State' attribute"""
def __init__(self, attr):
self._attr = attr
self.call__init__(Logger, 'EG', attr)
event_name = '%s EG' % (attr.getParentObj().getNormalName())
self.call__init__(EventGenerator, event_name)
self._attr.addListener(self)
def getAttribute(self):
return self._attr
def eventReceived(self, evt_src, evt_type, evt_value):
"""Event handler from Taurus"""
if evt_type not in CHANGE_EVT_TYPES:
return
if evt_value is None:
v = None
else:
v = evt_value.value
EventGenerator.fireEvent(self, v)
def read(self, force=False):
try:
self.last_val = self._attr.read(cache=not force).value
except:
self.error("Read error")
self.debug("Details:", exc_info=1)
self.last_val = None
return EventGenerator.read(self)
def readValue(self, force=False):
r = self.read(force=force)
if r is None:
# do a retry
r = self.read(force=force)
return r
def write(self, value):
self._attr.write(value, with_read=False)
def __getattr__(self, name):
return getattr(self._attr, name)
def reservedOperation(fn):
def new_fn(*args, **kwargs):
self = args[0]
wr = self.getReservedWR()
if wr is not None:
if wr().isStopped():
raise StopException("stopped before calling %s" % fn.__name__)
elif wr().isAborted():
raise AbortException("aborted before calling %s" % fn.__name__)
try:
return fn(*args, **kwargs)
except:
print("Exception occurred in reserved operation:"
" clearing events...")
self._clearEventWait()
raise
return new_fn
def get_pool_for_device(db, device):
server_devs = db.get_device_class_list(device.info().server_id)
for dev_name, klass_name in zip(server_devs[0::2], server_devs[1::2]):
if klass_name == "Pool":
return Device(dev_name)
[docs]class PoolElement(BaseElement, TangoDevice):
"""Base class for a Pool element device."""
def __init__(self, name, **kwargs):
"""PoolElement initialization."""
self._reserved = None
self._evt_wait = None
self.__go_start_time = 0
self.__go_end_time = 0
self.__go_time = 0
self._total_go_time = 0
self.call__init__(TangoDevice, name, **kwargs)
# dict<string, TangoAttributeEG>
# key : the attribute name
# value : the corresponding TangoAttributeEG
self._attrEG = CaselessDict()
# force the creation of a state attribute
self.getStateEG()
def _find_pool_obj(self):
pool = get_pool_for_device(self.getParentObj(), self.getHWObj())
return pool
def _find_pool_data(self):
pool = self._find_pool_obj()
return pool.getElementInfo(self.getFullName())._data
# Override BaseElement.getPoolObj because the reference to pool object may
# not be filled. This reference is filled when the element is obtained
# using Pool.getObject. If one obtain the element directly using Taurus
# e.g. mot = taurus.Device(<mot_name>) it won't be filled. In this case
# look for the pool object using the database information.
[docs] def getPoolObj(self):
try:
return self._pool_obj
except AttributeError:
self._pool_obj = self._find_pool_obj()
return self._pool_obj
# Override BaseElement.getPoolData because the reference to pool data may
# not be filled. This reference is filled when the element is obtained
# using Pool.getPoolData. If one obtain the element directly using Taurus
# e.g. mot = taurus.Device(<mot_name>) it won't be filled. In this case
# look for the pool object and its data using the database information.
[docs] def getPoolData(self):
try:
return self._pool_data
except AttributeError:
self._pool_data = self._find_pool_data()
return self._pool_data
[docs] def cleanUp(self):
TangoDevice.cleanUp(self)
self._reserved = None
f = self.factory()
attr_map = self._attrEG
for attr_name in attr_map.keys():
attrEG = attr_map.pop(attr_name)
attr = attrEG.getAttribute()
attrEG = None
f.removeExistingAttribute(attr)
[docs] def reserve(self, obj):
if obj is None:
self._reserved = None
return
self._reserved = weakref.ref(obj, self._unreserveCB)
def _unreserveCB(self, obj):
self.unreserve()
[docs] def unreserve(self):
self._reserved = None
[docs] def isReserved(self, obj=None):
if obj is None:
return self._reserved is not None
else:
o = self._reserved()
return o == obj
[docs] def getReservedWR(self):
return self._reserved
[docs] def getReserved(self):
if self._reserved is None:
return None
return self._reserved()
[docs] def dump_attributes(self):
attr_names = self.get_attribute_list()
req_id = self.read_attributes_asynch(attr_names)
return self.read_attributes_reply(req_id, 2000)
def _getAttrValue(self, name, force=False):
attrEG = self._getAttrEG(name)
if attrEG is None:
return None
return attrEG.readValue(force=force)
def _getAttrEG(self, name):
attrEG = self.getAttrEG(name)
if attrEG is None:
attrEG = self._createAttribute(name)
return attrEG
def _createAttribute(self, name):
attrObj = self.getAttribute(name)
if attrObj is None:
self.warning("Unable to create attribute %s" % name)
return None, None
attrEG = TangoAttributeEG(attrObj)
self._attrEG[name] = attrEG
return attrEG
def _getEventWait(self):
if self._evt_wait is None:
# create an object that waits for attribute events.
# each time we use it we have to connect and disconnect to an
# attribute
self._evt_wait = AttributeEventWait()
return self._evt_wait
def _clearEventWait(self):
self._evt_wait = None
[docs] def getStateEG(self):
return self._getAttrEG('state')
[docs] def getControllerName(self):
return self.getControllerObj().name
[docs] def getControllerObj(self):
full_ctrl_name = self.getPoolData()['controller']
return self.getPoolObj().getObj(full_ctrl_name, "Controller")
[docs] def getAxis(self):
return self.getPoolData()['axis']
[docs] def getType(self):
return self.getPoolData()['type']
[docs] def waitReady(self, timeout=None):
return self.getStateEG().waitEvent(Moving, equal=False,
timeout=timeout)
[docs] def getAttrEG(self, name):
"""Returns the TangoAttributeEG object"""
return self._attrEG.get(name)
[docs] def getAttrObj(self, name):
"""Returns the taurus.core.tangoattribute.TangoAttribute object"""
attrEG = self._attrEG.get(name)
if attrEG is None:
return None
return attrEG.getAttribute()
[docs] def getInstrumentObj(self):
return self._getAttrEG('instrument')
[docs] def getInstrumentName(self, force=False):
instr_name = self._getAttrValue('instrument', force=force)
if not instr_name:
return ''
# instr_name = instr_name[:instr_name.index('(')]
return instr_name
[docs] def getInstrument(self):
instr_name = self.getInstrumentName()
if not instr_name:
return None
return self.getPoolObj().getObj("Instrument", instr_name)
[docs] @reservedOperation
def start(self, *args, **kwargs):
evt_wait = self._getEventWait()
evt_wait.connect(self.getAttribute("state"))
evt_wait.lock()
try:
evt_wait.waitEvent(DevState.MOVING, equal=False)
self.__go_time = 0
self.__go_start_time = ts1 = time.time()
self._start(*args, **kwargs)
ts2 = time.time()
evt_wait.waitEvent(DevState.MOVING, after=ts1)
except:
evt_wait.disconnect()
raise
finally:
evt_wait.unlock()
ts2 = evt_wait.getRecordedEvents().get(DevState.MOVING, ts2)
return (ts2,)
[docs] def waitFinish(self, timeout=None, id=None):
"""Wait for the operation to finish
:param timeout: optional timeout (seconds)
:type timeout: float
:param id: id of the opertation returned by start
:type id: tuple(float)
"""
# Due to taurus-org/taurus #573 we need to divide the timeout
# in two intervals
if timeout is not None:
timeout = timeout / 2
if id is not None:
id = id[0]
evt_wait = self._getEventWait()
evt_wait.lock()
try:
evt_wait.waitEvent(DevState.MOVING, after=id, equal=False,
timeout=timeout, retries=1)
finally:
self.__go_end_time = time.time()
self.__go_time = self.__go_end_time - self.__go_start_time
evt_wait.unlock()
evt_wait.disconnect()
[docs] @reservedOperation
def go(self, *args, **kwargs):
self._total_go_time = 0
start_time = time.time()
eid = self.start(*args, **kwargs)
self.waitFinish(id=eid)
self._total_go_time = time.time() - start_time
[docs] def getLastGoTime(self):
"""Returns the time it took for last go operation"""
return self.__go_time
[docs] def getTotalLastGoTime(self):
"""Returns the time it took for last go operation, including dead time
to prepare, wait for events, etc"""
return self._total_go_time
[docs] def abort(self, wait_ready=True, timeout=None):
state = self.getStateEG()
state.lock()
try:
self.command_inout("Abort")
if wait_ready:
self.waitReady(timeout=timeout)
finally:
state.unlock()
[docs] def stop(self, wait_ready=True, timeout=None):
state = self.getStateEG()
state.lock()
try:
self.command_inout("Stop")
if wait_ready:
self.waitReady(timeout=timeout)
finally:
state.unlock()
def _information(self, tab=' '):
indent = "\n" + tab + 10 * ' '
msg = [self.getName() + ":"]
try:
# TODO: For Taurus 4 / Taurus 3 compatibility
if hasattr(self, "stateObj"):
state_value = self.stateObj.read().rvalue
# state_value is DevState enumeration (IntEnum)
state = state_value.name.capitalize()
else:
state = str(self.state()).capitalize()
except DevFailed as df:
if len(df.args):
state = df.args[0].desc
else:
e_info = sys.exc_info()[:2]
state = traceback.format_exception_only(*e_info)
except:
e_info = sys.exc_info()[:2]
state = traceback.format_exception_only(*e_info)
try:
msg.append(tab + " State: " + state)
except TypeError:
msg.append(tab + " State: " + state[0])
try:
e_info = sys.exc_info()[:2]
status = self.status()
status = status.replace('\n', indent)
except DevFailed as df:
if len(df.args):
status = df.args[0].desc
else:
e_info = sys.exc_info()[:2]
status = traceback.format_exception_only(*e_info)
except:
e_info = sys.exc_info()[:2]
status = traceback.format_exception_only(*e_info)
msg.append(tab + " Status: " + status)
return msg
[docs]class Controller(PoolElement):
""" Class encapsulating Controller functionality."""
def __init__(self, name, **kw):
"""PoolElement initialization."""
self.call__init__(PoolElement, name, **kw)
[docs] def getModuleName(self):
return self.getPoolData()['module']
[docs] def getClassName(self):
return self.getPoolData()['klass']
[docs] def getTypes(self):
return self.getPoolData()['types']
[docs] def getMainType(self):
return self.getPoolData()['main_type']
[docs] def addElement(self, elem):
axis = elem.getAxis()
self._elems[axis] = elem
self._last_axis = max(self._last_axis, axis)
[docs] def removeElement(self, elem):
axis = elem.getAxis()
del self._elems[elem.getAxis()]
if axis == self._last_axis:
self._last_axis = max(self._elems)
[docs] def getElementByAxis(self, axis):
pool = self.getPoolObj()
for _, elem in pool.getElementsOfType(self.getMainType()).items():
if (elem.controller != self.getFullName() or
elem.getAxis() != axis):
continue
return elem
[docs] def getElementByName(self, name):
pool = self.getPoolObj()
for _, elem in pool.getElementsOfType(self.getMainType()).items():
if (elem.controller != self.getFullName() or
elem.getName() != name):
continue
return elem
[docs] def getUsedAxes(self):
"""Return axes in use by this controller
:return: list of axes
:rtype: list<int>
"""
pool = self.getPoolObj()
axes = []
for _, elem in pool.getElementsOfType(self.getMainType()).items():
if elem.controller != self.getFullName():
continue
axes.append(elem.getAxis())
return sorted(axes)
[docs] def getUsedAxis(self):
msg = ("getUsedAxis is deprecated since version 2.5.0. ",
"Use getUsedAxes instead.")
self.warning(msg)
self.getUsedAxes()
[docs] def getLastUsedAxis(self):
"""Return the last used axis (the highest axis) in this controller
:return: last used axis
:rtype: int or None
"""
used_axes = self.getUsedAxes()
if len(used_axes) == 0:
return None
return max(used_axes)
def __cmp__(self, o):
return cmp(self.getName(), o.getName())
class ComChannel(PoolElement):
""" Class encapsulating CommunicationChannel functionality."""
pass
[docs]class ExpChannel(PoolElement):
""" Class encapsulating ExpChannel functionality."""
def __init__(self, name, **kw):
"""ExpChannel initialization."""
self.call__init__(PoolElement, name, **kw)
self._value_buffer = {}
[docs] def getValueObj_(self):
"""Retrurns Value attribute event generator object.
:return: Value attribute event generator
:rtype: TangoAttributeEG
..todo:: When support to Taurus 3 will be dropped provide getValueObj.
Taurus 3 TaurusDevice class already uses this name.
"""
return self._getAttrEG('value')
[docs] def getValue(self, force=False):
return self._getAttrValue('value', force=force)
[docs] def getValueBufferObj(self):
return self._getAttrEG('data')
[docs] def getValueBuffer(self):
return self._value_buffer
[docs] def valueBufferChanged(self, value_buffer):
if value_buffer is None:
return
_, value_buffer = self._codec.decode(('json', value_buffer),
ensure_ascii=True)
indexes = value_buffer["index"]
values = value_buffer["data"]
for index, value in zip(indexes, values):
self._value_buffer[index] = value
[docs]class CTExpChannel(ExpChannel):
""" Class encapsulating CTExpChannel functionality."""
pass
[docs]class ZeroDExpChannel(ExpChannel):
""" Class encapsulating ZeroDExpChannel functionality."""
pass
[docs]class OneDExpChannel(ExpChannel):
""" Class encapsulating OneDExpChannel functionality."""
pass
[docs]class TwoDExpChannel(ExpChannel):
""" Class encapsulating TwoDExpChannel functionality."""
pass
[docs]class PseudoCounter(ExpChannel):
""" Class encapsulating PseudoCounter functionality."""
pass
[docs]class TriggerGate(PoolElement):
""" Class encapsulating TriggerGate functionality."""
pass
[docs]class Motor(PoolElement, Moveable):
""" Class encapsulating Motor functionality."""
def __init__(self, name, **kw):
"""PoolElement initialization."""
self.call__init__(PoolElement, name, **kw)
self.call__init__(Moveable)
[docs] def getPosition(self, force=False):
return self._getAttrValue('position', force=force)
[docs] def getDialPosition(self, force=False):
return self._getAttrValue('dialposition', force=force)
[docs] def getVelocity(self, force=False):
return self._getAttrValue('velocity', force=force)
[docs] def getAcceleration(self, force=False):
return self._getAttrValue('acceleration', force=force)
[docs] def getDeceleration(self, force=False):
return self._getAttrValue('deceleration', force=force)
[docs] def getBaseRate(self, force=False):
return self._getAttrValue('base_rate', force=force)
[docs] def getBacklash(self, force=False):
return self._getAttrValue('backlash', force=force)
[docs] def getLimitSwitches(self, force=False):
return self._getAttrValue('limit_switches', force=force)
[docs] def getOffset(self, force=False):
return self._getAttrValue('offset', force=force)
[docs] def getStepPerUnit(self, force=False):
return self._getAttrValue('step_per_unit', force=force)
[docs] def getSign(self, force=False):
return self._getAttrValue('Sign', force=force)
[docs] def getSimulationMode(self, force=False):
return self._getAttrValue('SimulationMode', force=force)
[docs] def getPositionObj(self):
return self._getAttrEG('position')
[docs] def getDialPositionObj(self):
return self._getAttrEG('dialposition')
[docs] def getVelocityObj(self):
return self._getAttrEG('velocity')
[docs] def getAccelerationObj(self):
return self._getAttrEG('acceleration')
[docs] def getDecelerationObj(self):
return self._getAttrEG('deceleration')
[docs] def getBaseRateObj(self):
return self._getAttrEG('base_rate')
[docs] def getBacklashObj(self):
return self._getAttrEG('backlash')
[docs] def getLimitSwitchesObj(self):
return self._getAttrEG('limit_switches')
[docs] def getOffsetObj(self):
return self._getAttrEG('offset')
[docs] def getStepPerUnitObj(self):
return self._getAttrEG('step_per_unit')
[docs] def getSimulationModeObj(self):
return self._getAttrEG('step_per_unit')
[docs] def setVelocity(self, value):
return self.getVelocityObj().write(value)
[docs] def setAcceleration(self, value):
return self.getAccelerationObj().write(value)
[docs] def setDeceleration(self, value):
return self.getDecelerationObj().write(value)
[docs] def setBaseRate(self, value):
return self.getBaseRateObj().write(value)
[docs] def setBacklash(self, value):
return self.getBacklashObj().write(value)
[docs] def setOffset(self, value):
return self.getOffsetObj().write(value)
[docs] def setStepPerUnit(self, value):
return self.getStepPerUnitObj().write(value)
[docs] def setSign(self, value):
return self.getSignObj().write(value)
# -~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-
# Moveable interface
#
def _start(self, *args, **kwargs):
new_pos = args[0]
if operator.isSequenceType(new_pos):
new_pos = new_pos[0]
try:
self.write_attribute('position', new_pos)
except DevFailed as df:
for err in df:
if err.reason == 'API_AttrNotAllowed':
raise RuntimeError('%s is already moving' % self)
else:
raise
self.final_pos = new_pos
[docs] def go(self, *args, **kwargs):
start_time = time.time()
PoolElement.go(self, *args, **kwargs)
ret = self.getStateEG().readValue(), self.readPosition()
self._total_go_time = time.time() - start_time
return ret
startMove = PoolElement.start
waitMove = PoolElement.waitFinish
move = go
getLastMotionTime = PoolElement.getLastGoTime
getTotalLastMotionTime = PoolElement.getTotalLastGoTime
[docs] @reservedOperation
def iterMove(self, new_pos, timeout=None):
if operator.isSequenceType(new_pos):
new_pos = new_pos[0]
state, pos = self.getAttribute("state"), self.getAttribute("position")
evt_wait = self._getEventWait()
evt_wait.connect(state)
evt_wait.lock()
try:
# evt_wait.waitEvent(DevState.MOVING, equal=False)
time_stamp = time.time()
try:
self.getPositionObj().write(new_pos)
except DevFailed as err_traceback:
for err in err_traceback:
if err.reason == 'API_AttrNotAllowed':
raise RuntimeError('%s is already moving' % self)
else:
raise
self.final_pos = new_pos
# putting timeout=0.1 and retries=1 is a patch for the case when
# the initial moving event doesn't arrive do to an unknown
# tango/pytango error at the time
evt_wait.waitEvent(DevState.MOVING, time_stamp,
timeout=0.1, retries=1)
finally:
evt_wait.unlock()
evt_wait.disconnect()
evt_iter_wait = AttributeEventIterator(state, pos)
evt_iter_wait.lock()
try:
for evt_data in evt_iter_wait.events():
src, value = evt_data
if src == state and value != DevState.MOVING:
raise StopIteration
yield value
finally:
evt_iter_wait.unlock()
evt_iter_wait.disconnect()
[docs] def readPosition(self, force=False):
return [self.getPosition(force=force)]
[docs] def getMoveableSource(self):
return self.getPoolObj()
[docs] def getSize(self):
return 1
[docs] def getIndex(self, name):
if name.lower() == self.getName().lower():
return 0
return -1
#
# End of Moveable interface
# -~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-
def _information(self, tab=' '):
msg = PoolElement._information(self, tab=tab)
try:
position = self.read_attribute("position")
pos = str(position.value)
if position.quality != AttrQuality.ATTR_VALID:
pos += " [" + QUALITY[position.quality] + "]"
except DevFailed, df:
if len(df.args):
pos = df.args[0].desc
else:
e_info = sys.exc_info()[:2]
pos = traceback.format_exception_only(*e_info)
except:
e_info = sys.exc_info()[:2]
pos = traceback.format_exception_only(*e_info)
msg.append(tab + "Position: " + str(pos))
return msg
[docs]class PseudoMotor(PoolElement, Moveable):
""" Class encapsulating PseudoMotor functionality."""
def __init__(self, name, **kw):
"""PoolElement initialization."""
self.call__init__(PoolElement, name, **kw)
self.call__init__(Moveable)
[docs] def getPosition(self, force=False):
return self._getAttrValue('position', force=force)
[docs] def getDialPosition(self, force=False):
return self.getPosition(force=force)
[docs] def getPositionObj(self):
return self._getAttrEG('position')
[docs] def getDialPositionObj(self):
return self.getPositionObj()
# -~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-
# Moveable interface
#
def _start(self, *args, **kwargs):
new_pos = args[0]
if operator.isSequenceType(new_pos):
new_pos = new_pos[0]
try:
self.write_attribute('position', new_pos)
except DevFailed, df:
for err in df:
if err.reason == 'API_AttrNotAllowed':
raise RuntimeError('%s is already moving' % self)
else:
raise
self.final_pos = new_pos
[docs] def go(self, *args, **kwargs):
start_time = time.time()
PoolElement.go(self, *args, **kwargs)
ret = self.getStateEG().readValue(), self.readPosition()
self._total_go_time = time.time() - start_time
return ret
startMove = PoolElement.start
waitMove = PoolElement.waitFinish
move = go
getLastMotionTime = PoolElement.getLastGoTime
getTotalLastMotionTime = PoolElement.getTotalLastGoTime
[docs] def readPosition(self, force=False):
return [self.getPosition(force=force)]
[docs] def getMoveableSource(self):
return self.getPoolObj()
[docs] def getSize(self):
return 1
[docs] def getIndex(self, name):
if name.lower() == self.getName().lower():
return 0
return -1
#
# End of Moveable interface
# -~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-
def _information(self, tab=' '):
msg = PoolElement._information(self, tab=tab)
try:
position = self.read_attribute("position")
pos = str(position.value)
if position.quality != AttrQuality.ATTR_VALID:
pos += " [" + QUALITY[position.quality] + "]"
except DevFailed, df:
if len(df.args):
pos = df.args[0].desc
else:
e_info = sys.exc_info()[:2]
pos = traceback.format_exception_only(*e_info)
except:
e_info = sys.exc_info()[:2]
pos = traceback.format_exception_only(*e_info)
msg.append(tab + "Position: " + str(pos))
return msg
[docs]class MotorGroup(PoolElement, Moveable):
""" Class encapsulating MotorGroup functionality."""
def __init__(self, name, **kw):
"""PoolElement initialization."""
self.call__init__(PoolElement, name, **kw)
self.call__init__(Moveable)
def _create_str_tuple(self):
return 3 * ["TODO"]
[docs] def getMotorNames(self):
return self.getPoolData()['elements']
[docs] def hasMotor(self, name):
motor_names = map(str.lower, self.getMotorNames())
return name.lower() in motor_names
[docs] def getPosition(self, force=False):
return self._getAttrValue('position', force=force)
[docs] def getPositionObj(self):
return self._getAttrEG('position')
# -~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-
# Moveable interface
#
def _start(self, *args, **kwargs):
new_pos = args[0]
try:
self.write_attribute('position', new_pos)
except DevFailed, df:
for err in df:
if err.reason == 'API_AttrNotAllowed':
raise RuntimeError('%s is already moving' % self)
else:
raise
self.final_pos = new_pos
[docs] def go(self, *args, **kwargs):
start_time = time.time()
PoolElement.go(self, *args, **kwargs)
ret = self.getStateEG().readValue(), self.readPosition()
self._total_go_time = time.time() - start_time
return ret
startMove = PoolElement.start
waitMove = PoolElement.waitFinish
move = go
getLastMotionTime = PoolElement.getLastGoTime
getTotalLastMotionTime = PoolElement.getTotalLastGoTime
[docs] def readPosition(self, force=False):
return self.getPosition(force=force)
[docs] def getMoveableSource(self):
return self.getPoolObj()
[docs] def getSize(self):
return len(self.getMotorNames())
[docs] def getIndex(self, name):
try:
motor_names = map(str.lower, self.getMotorNames())
return motor_names.index(name.lower())
except:
return -1
#
# End of Moveable interface
# -~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-
def _information(self, tab=' '):
msg = PoolElement._information(self, tab=tab)
try:
position = self.read_attribute("position")
pos = str(position.value)
if position.quality != AttrQuality.ATTR_VALID:
pos += " [" + QUALITY[position.quality] + "]"
except DevFailed, df:
if len(df.args):
pos = df.args[0].desc
else:
e_info = sys.exc_info()[:2]
pos = traceback.format_exception_only(*e_info)
except:
e_info = sys.exc_info()[:2]
pos = traceback.format_exception_only(*e_info)
msg.append(tab + "Position: " + str(pos))
return msg
class BaseChannelInfo(object):
def __init__(self, data):
# dict<str, obj>
# channel data
self.raw_data = data
self.__dict__.update(data)
class TangoChannelInfo(BaseChannelInfo):
def __init__(self, data, info):
BaseChannelInfo.__init__(self, data)
# PyTango.AttributeInfoEx
self.set_info(info)
def has_info(self):
return self.raw_info is not None
def set_info(self, info):
self.raw_info = info
if info is None:
return
data = self.raw_data
if 'data_type' not in data:
data_type = info.data_type
try:
self.data_type = FROM_TANGO_TO_STR_TYPE[data_type]
except KeyError, e:
# For backwards compatibility:
# starting from Taurus 4.3.0 DevVoid was added to the dict
if data_type == PyTango.DevVoid:
self.data_type = None
else:
raise e
if 'shape' not in data:
shape = ()
if info.data_format == AttrDataFormat.SPECTRUM:
shape = (info.max_dim_x,)
elif info.data_format == AttrDataFormat.IMAGE:
shape = (info.max_dim_x, info.max_dim_y)
self.shape = shape
else:
shape = self.shape
self.shape = list(shape)
def __getattr__(self, name):
if self.has_info():
return getattr(self.raw_info, name)
cls_name = self.__class__.__name__
raise AttributeError("'%s' has no attribute '%s'" % (cls_name, name))
def getChannelConfigs(mgconfig, ctrls=None, sort=True):
'''
gets a list of channel configurations of the controllers of the given
measurement group configuration. It optionally filters to those channels
matching given lists of controller.
:param ctrls: (seq<str> or None) a sequence of strings to filter the
controllers. If None given, all controllers will be used
:param sort: (bool) If True (default) the returned list will be sorted
according to channel index (if given in channeldata) and
then by channelname.
:return: (list<tuple>) A list of channelname,channeldata pairs.
'''
chconfigs = []
if not mgconfig:
return []
for ctrl_name, ctrl_data in mgconfig['controllers'].items():
if ctrls is None or ctrl_name in ctrls:
for ch_name, ch_data in ctrl_data['channels'].items():
ch_data.update({'_controller_name': ctrl_name})
chconfigs.append((ch_name, ch_data))
if sort:
# sort the channel configs by index (primary sort) and then by channel
# name.
# sort by channel_name
chconfigs = sorted(chconfigs, key=lambda c: c[0])
# sort by index (give a very large index for those which don't have it)
chconfigs = sorted(chconfigs, key=lambda c: c[1].get('index', 1e16))
return chconfigs
class MGConfiguration(object):
def __init__(self, mg, data):
self._mg = weakref.ref(mg)
if isinstance(data, (str, unicode)):
data = CodecFactory().decode(('json', data), ensure_ascii=True)
self.raw_data = data
self.__dict__.update(data)
# dict<str, dict>
# where key is the channel name and value is the channel data in form
# of a dict as receveid by the MG configuration attribute
self.channels = channels = CaselessDict()
for _, ctrl_data in self.controllers.items():
for channel_name, channel_data in ctrl_data['channels'].items():
channels[channel_name] = channel_data
#####################
# @todo: the for-loops above could be replaced by something like:
# self.channels = channels = CaselessDict(getChannelConfigs(data,
# sort=False))
#####################
# seq<dict> each element is the channel data in form of a dict as
# receveid by the MG configuration attribute. This seq is just a cache
# ordered by channel index in the MG.
self.channel_list = len(channels) * [None]
for channel in channels.values():
self.channel_list[channel['index']] = channel
# dict<str, list[DeviceProxy, CaselessDict<str, dict>]>
# where key is a device name and value is a list with two elements:
# - A device proxy or None if there was an error building it
# - A dict where keys are attribute names and value is a reference to
# a dict representing channel data as received in raw data
self.tango_dev_channels = None
# Number of elements in tango_dev_channels in error (could not build
# DeviceProxy, probably)
self.tango_dev_channels_in_error = 0
# dict<str, tuple<str, str, TangoChannelInfo>>
# where key is a channel name and value is a tuple of three elements:
# - device name
# - attribute name
# - attribute information or None if there was an error trying to get
# the information
self.tango_channels_info = None
# Number of elements in tango_channels_info_in_error in error
# (could not build attribute info, probably)
self.tango_channels_info_in_error = 0
# dict<str, dict>
# where key is a channel name and data is a reference to a dict
# representing channel data as received in raw data
self.non_tango_channels = None
self.initialized = False
def _build(self):
# internal channel structure that groups channels by tango device so
# they can be read as a group minimizing this way the network requests
self.tango_dev_channels = tg_dev_chs = CaselessDict()
self.tango_dev_channels_in_error = 0
self.tango_channels_info = tg_chs_info = CaselessDict()
self.tango_channels_info_in_error = 0
self.non_tango_channels = n_tg_chs = CaselessDict()
self.cache = cache = {}
tg_attr_validator = TangoAttributeNameValidator()
for channel_name, channel_data in self.channels.items():
cache[channel_name] = None
data_source = channel_data['source']
params = tg_attr_validator.getParams(data_source)
if params is None:
# Handle NON tango channel
n_tg_chs[channel_name] = channel_data
else:
# Handle tango channel
dev_name = params['devicename'].lower()
attr_name = params['attributename'].lower()
host, port = params.get('host'), params.get('port')
if host is not None and port is not None:
dev_name = "tango://{0}:{1}/{2}".format(host, port,
dev_name)
dev_data = tg_dev_chs.get(dev_name)
if dev_data is None:
# Build tango device
dev = None
try:
dev = DeviceProxy(dev_name)
except:
self.tango_dev_channels_in_error += 1
tg_dev_chs[dev_name] = dev_data = [dev, CaselessDict()]
dev, attr_data = dev_data
attr_data[attr_name] = channel_data
# get attribute configuration
attr_info = None
if dev is None:
self.tango_channels_info_in_error += 1
else:
try:
tg_attr_info = dev.get_attribute_config_ex(attr_name)[
0]
except:
tg_attr_info = \
self._build_empty_tango_attr_info(channel_data)
self.tango_channels_info_in_error += 1
attr_info = TangoChannelInfo(channel_data, tg_attr_info)
tg_chs_info[channel_name] = dev_name, attr_name, attr_info
def _build_empty_tango_attr_info(self, channel_data):
ret = PyTango.AttributeInfoEx()
ret.name = channel_data['name']
ret.label = channel_data['label']
return ret
def prepare(self):
# first time? build everything
if self.tango_dev_channels is None:
return self._build()
# prepare missing tango devices
if self.tango_dev_channels_in_error > 0:
for dev_name, dev_data in self.tango_dev_channels.items():
if dev_data[0] is None:
try:
dev_data[0] = DeviceProxy(dev_name)
self.tango_dev_channels_in_error -= 1
except:
pass
# prepare missing tango attribute configuration
if self.tango_channels_info_in_error > 0:
for _, attr_data in self.tango_channels_info.items():
dev_name, attr_name, attr_info = attr_data
if attr_info.has_info():
continue
dev = self.tango_dev_channels[dev_name]
if dev is None:
continue
try:
tg_attr_info = dev.get_attribute_config_ex(attr_name)[0]
attr_info.set_info(tg_attr_info)
self.tango_channels_info_in_error -= 1
except:
pass
def getChannels(self):
return self.channel_list
def getChannelInfo(self, channel_name):
try:
return self.tango_channels_info[channel_name]
except:
channel_name = channel_name.lower()
for d_name, a_name, ch_info in self.tango_channels_info.values():
if ch_info.name.lower() == channel_name:
return d_name, a_name, ch_info
def getChannelsInfo(self, only_enabled=False):
"""Returns information about the channels present in the measurement
group in a form of dictionary, where key is a channel name and value is
a tuple of three elements:
- device name
- attribute name
- attribute information or None if there was an error trying to get
the information
:param only_enabled: flag to filter out disabled channels
:type only_enabled: bool
:return: dictionary with channels info
:rtype: dict<str, tuple<str, str, TangoChannelInfo>>
"""
self.prepare()
ret = CaselessDict(self.tango_channels_info)
ret.update(self.non_tango_channels)
for ch_name, (_, _, ch_info) in ret.items():
if only_enabled and not ch_info.enabled:
ret.pop(ch_name)
return ret
def getChannelsInfoList(self, only_enabled=False):
"""Returns information about the channels present in the measurement
group in a form of ordered, based on the channel index, list.
:param only_enabled: flag to filter out disabled channels
:type only_enabled: bool
:return: list with channels info
:rtype: list<TangoChannelInfo>
"""
channels_info = self.getChannelsInfo(only_enabled=only_enabled)
ret = []
for _, (_, _, ch_info) in channels_info.items():
ret.append(ch_info)
ret = sorted(ret, lambda x, y: cmp(x.index, y.index))
return ret
def getCountersInfoList(self):
channels_info = self.getChannelsInfoList()
timer_name, idx = self.timer, -1
for i, ch in enumerate(channels_info):
if ch['full_name'] == timer_name:
idx = i
break
if idx >= 0:
channels_info.pop(idx)
return channels_info
def getTangoDevChannels(self, only_enabled=False):
"""Returns Tango channels (attributes) that could be used to read
measurement group results in a form of dict where key is a device name
and value is a list with two elements:
- A device proxy or None if there was an error building it
- A dict where keys are attribute names and value is a reference to
a dict representing channel data as received in raw data
:param only_enabled: flag to filter out disabled channels
:type only_enabled: bool
:return: dict with Tango channels
:rtype: dict<str, list[DeviceProxy, CaselessDict<str, dict>]>
"""
if not only_enabled:
return self.tango_dev_channels
tango_dev_channels = copy.deepcopy(self.tango_dev_channels)
for _, dev_data in tango_dev_channels.items():
_, attrs = dev_data
for attr_name, channel_data in attrs.items():
if not channel_data["enabled"]:
attrs.pop(attr_name)
return tango_dev_channels
def read(self, parallel=True):
if parallel:
return self._read_parallel()
return self._read()
def _read_parallel(self):
self.prepare()
ret = CaselessDict(self.cache)
dev_replies = {}
# deposit read requests
tango_dev_channels = self.getTangoDevChannels(only_enabled=True)
for _, dev_data in tango_dev_channels.items():
dev, attrs = dev_data
if dev is None:
continue
try:
dev_replies[dev] = dev.read_attributes_asynch(
attrs.keys()), attrs
except:
dev_replies[dev] = None, attrs
# gather all replies
for dev, reply_data in dev_replies.items():
reply, attrs = reply_data
try:
data = dev.read_attributes_reply(reply, 0)
for data_item in data:
channel_data = attrs[data_item.name]
if data_item.has_failed:
value = None
else:
value = data_item.value
ret[channel_data['full_name']] = value
except:
for _, channel_data in attrs.items():
ret[channel_data['full_name']] = None
return ret
def _read(self):
self.prepare()
ret = CaselessDict(self.cache)
tango_dev_channels = self.getTangoDevChannels(only_enabled=True)
for _, dev_data in tango_dev_channels.items():
dev, attrs = dev_data
try:
data = dev.read_attributes(attrs.keys())
for data_item in data:
channel_data = attrs[data_item.name]
if data_item.has_failed:
value = None
else:
value = data_item.value
ret[channel_data['full_name']] = value
except:
for _, channel_data in attrs.items():
ret[channel_data['full_name']] = None
return ret
[docs]class MeasurementGroup(PoolElement):
""" Class encapsulating MeasurementGroup functionality."""
def __init__(self, name, **kw):
"""PoolElement initialization."""
self._configuration = None
self._channels = None
self._last_integ_time = None
self.call__init__(PoolElement, name, **kw)
self.__cfg_attr = self.getAttribute('configuration')
self.__cfg_attr.addListener(self.on_configuration_changed)
self._value_buffer_cb = None
self._codec = CodecFactory().getCodec("json")
[docs] def cleanUp(self):
PoolElement.cleanUp(self)
f = self.factory()
f.removeExistingAttribute(self.__cfg_attr)
def _create_str_tuple(self):
channel_names = ", ".join(self.getChannelNames())
return self.getName(), self.getTimerName(), channel_names
[docs] def getConfigurationAttrEG(self):
return self._getAttrEG('Configuration')
[docs] def setConfiguration(self, configuration):
codec = CodecFactory().getCodec('json')
f, data = codec.encode(('', configuration))
self.write_attribute('configuration', data)
def _setConfiguration(self, data):
self._configuration = MGConfiguration(self, data)
[docs] def getConfiguration(self, force=False):
if force or self._configuration is None:
data = self.getConfigurationAttrEG().readValue(force=True)
self._setConfiguration(data)
return self._configuration
[docs] def on_configuration_changed(self, evt_src, evt_type, evt_value):
if evt_type not in CHANGE_EVT_TYPES:
return
self.info("Configuration changed")
self._setConfiguration(evt_value.value)
[docs] def getTimerName(self):
return self.getTimer()['name']
[docs] def getTimer(self):
cfg = self.getConfiguration()
return cfg.channels[cfg.timer]
[docs] def getTimerValue(self):
return self.getTimerName()
[docs] def getMonitorName(self):
return self.getMonitor()['name']
[docs] def getMonitor(self):
cfg = self.getConfiguration()
return cfg.channels[cfg.monitor]
[docs] def setTimer(self, timer_name):
try:
self.getChannel(timer_name)
except KeyError:
raise Exception("%s does not contain a channel named '%s'"
% (str(self), timer_name))
cfg = self.getConfiguration().raw_data
cfg['timer'] = timer_name
import json
self.write_attribute("configuration", json.dumps(cfg))
[docs] def getChannels(self):
return self.getConfiguration().getChannels()
[docs] def getCounters(self):
cfg = self.getConfiguration()
return [c for c in self.getChannels() if c['full_name'] != cfg.timer]
[docs] def getChannelNames(self):
return [ch['name'] for ch in self.getChannels()]
[docs] def getCounterNames(self):
return [ch['name'] for ch in self.getCounters()]
[docs] def getChannelLabels(self):
return [ch['label'] for ch in self.getChannels()]
[docs] def getCounterLabels(self):
return [ch['label'] for ch in self.getCounters()]
[docs] def getChannel(self, name):
return self.getConfiguration().channels[name]
[docs] def getChannelInfo(self, name):
return self.getConfiguration().getChannelInfo(name)
[docs] def getChannelsInfo(self):
return self.getConfiguration().getChannelsInfoList()
[docs] def getChannelsEnabledInfo(self):
"""Returns information about **only enabled** channels present in the
measurement group in a form of ordered, based on the channel index,
list.
:return: list with channels info
:rtype: list<TangoChannelInfo>
"""
return self.getConfiguration().getChannelsInfoList(only_enabled=True)
[docs] def getCountersInfo(self):
return self.getConfiguration().getCountersInfoList()
[docs] def getValues(self, parallel=True):
return self.getConfiguration().read(parallel=parallel)
[docs] def getValueBuffers(self):
value_buffers = []
for channel_info in self.getChannels():
channel = Device(channel_info["full_name"])
value_buffers.append(channel.getValueBuffer())
return value_buffers
[docs] def getIntegrationTime(self):
return self._getAttrValue('IntegrationTime')
[docs] def getIntegrationTimeObj(self):
return self._getAttrEG('IntegrationTime')
[docs] def setIntegrationTime(self, ctime):
self.getIntegrationTimeObj().write(ctime)
[docs] def putIntegrationTime(self, ctime):
if self._last_integ_time == ctime:
return
self._last_integ_time = ctime
self.getIntegrationTimeObj().write(ctime)
[docs] def getAcquisitionModeObj(self):
return self._getAttrEG('AcquisitionMode')
[docs] def getAcquisitionMode(self):
return self._getAttrValue('AcquisitionMode')
[docs] def setAcquisitionMode(self, acqMode):
self.getAcquisitionModeObj().write(acqMode)
[docs] def getSynchronizationObj(self):
return self._getAttrEG('Synchronization')
[docs] def getSynchronization(self):
return self._getAttrValue('Synchronization')
[docs] def setSynchronization(self, synchronization):
codec = CodecFactory().getCodec('json')
_, data = codec.encode(('', synchronization))
self.getSynchronizationObj().write(data)
self._last_integ_time = None
# NbStarts Methods
[docs] def getNbStartsObj(self):
return self._getAttrEG('NbStarts')
[docs] def setNbStarts(self, starts):
self.getNbStartsObj().write(starts)
[docs] def getNbStarts(self):
return self._getAttrValue('NbStarts')
[docs] def getMoveableObj(self):
return self._getAttrEG('Moveable')
[docs] def getMoveable(self):
return self._getAttrValue('Moveable')
[docs] def getLatencyTimeObj(self):
return self._getAttrEG('LatencyTime')
[docs] def getLatencyTime(self):
return self._getAttrValue('LatencyTime')
[docs] def setMoveable(self, moveable=None):
if moveable is None:
moveable = 'None' # Tango attribute is of type DevString
self.getMoveableObj().write(moveable)
[docs] def valueBufferChanged(self, channel, value_buffer):
"""Receive value buffer updates, pre-process them, and call
the subscribed callback.
:param channel: channel that reports value buffer update
:type channel: ExpChannel
:param value_buffer: json encoded value buffer update, it contains
at least values and indexes
:type value_buffer: :obj:`str`
"""
if value_buffer is None:
return
_, value_buffer = self._codec.decode(('json', value_buffer),
ensure_ascii=True)
values = value_buffer["data"]
if isinstance(values[0], list):
np_values = map(numpy.array, values)
value_buffer["data"] = np_values
self._value_buffer_cb(channel, value_buffer)
[docs] def subscribeValueBuffer(self, cb=None):
"""Subscribe to channels' value buffer update events. If no
callback is passed, the default channel's callback is subscribed which
will store the data in the channel's value_buffer attribute.
:param cb: callback to be subscribed, None means subscribe the default
channel's callback
:type cb: callable
"""
for channel_info in self.getChannels():
full_name = channel_info["full_name"]
channel = Device(full_name)
value_buffer_obj = channel.getValueBufferObj()
if cb is not None:
self._value_buffer_cb = cb
value_buffer_obj.subscribeEvent(self.valueBufferChanged,
channel, False)
else:
value_buffer_obj.subscribeEvent(channel.valueBufferChanged,
with_first_event=False)
[docs] def unsubscribeValueBuffer(self, cb=None):
"""Unsubscribe from channels' value buffer events. If no callback is
passed, unsubscribe the channel's default callback.
:param cb: callback to be unsubscribed, None means unsubscribe the
default channel's callback
:type cb: callable
"""
for channel_info in self.getChannels():
full_name = channel_info["full_name"]
channel = Device(full_name)
value_buffer_obj = channel.getValueBufferObj()
if cb is not None:
value_buffer_obj.unsubscribeEvent(self.valueBufferChanged,
channel)
self._value_buffer_cb = None
else:
value_buffer_obj.unsubscribeEvent(channel.valueBufferChanged)
[docs] def enableChannels(self, channels):
'''Enable acquisition of the indicated channels.
:param channels: (seq<str>) a sequence of strings indicating
channel names
'''
self._enableChannels(channels, True)
[docs] def disableChannels(self, channels):
'''Disable acquisition of the indicated channels.
:param channels: (seq<str>) a sequence of strings indicating
channel names
'''
self._enableChannels(channels, False)
def _enableChannels(self, channels, state):
found = {}
for channel in channels:
found[channel] = False
cfg = self.getConfiguration()
for channel in cfg.getChannels():
name = channel['name']
if name in channels:
channel['enabled'] = state
found[name] = True
wrong_channels = []
for ch, f in found.items():
if f is False:
wrong_channels.append(ch)
if len(wrong_channels) > 0:
msg = 'channels: %s are not present in measurement group' % \
wrong_channels
raise Exception(msg)
self.setConfiguration(cfg.raw_data)
def _start(self, *args, **kwargs):
try:
self.Start()
except DevFailed as e:
# TODO: Workaround for CORBA timeout on measurement group start
# remove it whenever sardana-org/sardana#93 gets implemented
if e[-1].reason == "API_DeviceTimedOut":
self.error("start timed out, trying to stop")
self.stop()
self.debug("stopped")
raise e
[docs] def prepare(self):
self.command_inout("Prepare")
[docs] def count_raw(self, start_time=None):
PoolElement.go(self)
if start_time is None:
start_time = time.time()
state = self.getStateEG().readValue()
if state == Fault:
msg = "Measurement group ended acquisition with Fault state"
raise Exception(msg)
values = self.getValues()
ret = state, values
self._total_go_time = time.time() - start_time
return ret
[docs] def go(self, *args, **kwargs):
start_time = time.time()
cfg = self.getConfiguration()
cfg.prepare()
integration_time = args[0]
if integration_time is None or integration_time == 0:
return self.getStateEG().readValue(), self.getValues()
self.putIntegrationTime(integration_time)
self.setMoveable(None)
self.setNbStarts(1)
self.prepare()
return self.count_raw(start_time)
[docs] def count_continuous(self, synchronization, value_buffer_cb=None):
"""Execute measurement process according to the given synchronization
description.
:param synchronization: synchronization description
:type synchronization: list of groups with equidistant synchronizations
:param value_buffer_cb: callback on value buffer updates
:type value_buffer_cb: callable
:return: state and eventually value buffers if no callback was passed
:rtype: tuple<list<DevState>,<list>>
.. todo:: Think of unifying measure with count.
.. note:: The measure method has been included in MeasurementGroup
class on a provisional basis. Backwards incompatible changes
(up to and including removal of the method) may occur if
deemed necessary by the core developers.
"""
start_time = time.time()
cfg = self.getConfiguration()
cfg.prepare()
self.setSynchronization(synchronization)
self.subscribeValueBuffer(value_buffer_cb)
self.count_raw(start_time)
self.unsubscribeValueBuffer(value_buffer_cb)
state = self.getStateEG().readValue()
if state == Fault:
msg = "Measurement group ended acquisition with Fault state"
raise Exception(msg)
if value_buffer_cb is None:
value_buffers = self.getValueBuffers()
else:
value_buffers = None
ret = state, value_buffers
self._total_go_time = time.time() - start_time
return ret
startCount = PoolElement.start
waitCount = PoolElement.waitFinish
count = go
stopCount = PoolElement.abort
stop = PoolElement.stop
[docs]class IORegister(PoolElement):
""" Class encapsulating IORegister functionality."""
def __init__(self, name, **kw):
"""IORegister initialization."""
self.call__init__(PoolElement, name, **kw)
[docs] def getValueObj(self):
return self._getAttrEG('value')
[docs] def readValue(self, force=False):
return self._getAttrValue('value', force=force)
[docs] def startWriteValue(self, new_value, timeout=None):
try:
self.getValueObj().write(new_value)
self.final_val = new_value
except DevFailed as err_traceback:
for err in err_traceback:
if err.reason == 'API_AttrNotAllowed':
raise RuntimeError('%s is already chaging' % self)
else:
raise
[docs] def waitWriteValue(self, timeout=None):
pass
[docs] def writeValue(self, new_value, timeout=None):
self.startWriteValue(new_value, timeout=timeout)
self.waitWriteValue(timeout=timeout)
return self.getStateEG().readValue(), self.readValue()
writeIORegister = writeIOR = writeValue
readIORegister = readIOR = getValue = readValue
[docs]class Instrument(BaseElement):
def __init__(self, **kw):
self.__dict__.update(kw)
[docs] def getFullName(self):
return self.full_name
[docs] def getParentInstrument(self):
return self.getPoolObj().getObj(self.parent_instrument)
[docs] def getParentInstrumentName(self):
return self.parent_instrument
[docs] def getChildrenInstruments(self):
raise NotImplementedError
return self._children
[docs] def getElements(self):
raise NotImplementedError
return self._elements
[docs] def getType(self):
return self.klass
[docs]class Pool(TangoDevice, MoveableSource):
""" Class encapsulating device Pool functionality."""
def __init__(self, name, **kw):
self.call__init__(TangoDevice, name, **kw)
self.call__init__(MoveableSource)
self._elements = BaseSardanaElementContainer()
self.__elements_attr = self.getAttribute("Elements")
self.__elements_attr.addListener(self.on_elements_changed)
[docs] def getObject(self, element_info):
elem_type = element_info.getType()
data = element_info._data
if elem_type in ('ControllerClass', 'ControllerLibrary', 'Instrument'):
klass = globals()[elem_type]
kwargs = dict(data)
kwargs['_pool_data'] = data
kwargs['_pool_obj'] = self
return klass(**kwargs)
obj = Factory().getDevice(element_info.full_name, _pool_obj=self,
_pool_data=data)
return obj
[docs] def on_elements_changed(self, evt_src, evt_type, evt_value):
if evt_type == TaurusEventType.Error:
msg = evt_value
if isinstance(msg, DevFailed):
d = msg[0]
# skip configuration errors
if d.reason == "API_BadConfigurationProperty":
return
if d.reason in ("API_DeviceNotExported",
"API_CantConnectToDevice"):
msg = "Pool was shutdown or is inaccessible"
else:
msg = "{0}: {1}".format(d.reason, d.desc)
self.warning("Received elements error event %s", msg)
self.debug(evt_value)
return
elif evt_type not in CHANGE_EVT_TYPES:
return
try:
elems = CodecFactory().decode(evt_value.value, ensure_ascii=True)
except:
self.error("Could not decode element info")
self.info("value: '%s'", evt_value.value)
self.debug("Details:", exc_info=1)
return
elements = self.getElementsInfo()
for element_data in elems.get('new', ()):
element_data['manager'] = self
element = BaseSardanaElement(**element_data)
elements.addElement(element)
for element_data in elems.get('del', ()):
element = self.getElementInfo(element_data['full_name'])
try:
elements.removeElement(element)
except:
self.warning("Failed to remove %s", element_data)
for element_data in elems.get('change', ()):
# TODO: element is assigned but not used!! (check)
element = self._removeElement(element_data)
element = self._addElement(element_data)
return elems
def _addElement(self, element_data):
element_data['manager'] = self
element = BaseSardanaElement(**element_data)
self.getElementsInfo().addElement(element)
return element
def _removeElement(self, element_data):
name = element_data['full_name']
element = self.getElementInfo(name)
self.getElementsInfo().removeElement(element)
return element
[docs] def getElementsInfo(self):
return self._elements
[docs] def getElements(self):
return self.getElementsInfo().getElements()
[docs] def getElementInfo(self, name):
return self.getElementsInfo().getElement(name)
[docs] def getElementNamesOfType(self, elem_type):
return self.getElementsInfo().getElementNamesOfType(elem_type)
[docs] def getElementsOfType(self, elem_type):
return self.getElementsInfo().getElementsOfType(elem_type)
[docs] def getElementsWithInterface(self, interface):
return self.getElementsInfo().getElementsWithInterface(interface)
[docs] def getElementWithInterface(self, elem_name, interface):
return self.getElementsInfo().getElementWithInterface(elem_name,
interface)
[docs] def getObj(self, name, elem_type=None):
if elem_type is None:
return self.getElementInfo(name)
elif isinstance(elem_type, (str, unicode)):
elem_types = elem_type,
else:
elem_types = elem_type
name = name.lower()
for e_type in elem_types:
elems = self.getElementsOfType(e_type)
for elem in elems.values():
if elem.name.lower() == name:
return elem
elem = elems.get(name)
if elem is not None:
return elem
def __repr__(self):
return self.getNormalName()
def __str__(self):
return repr(self)
# -~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-
# MoveableSource interface
#
[docs] def getMoveable(self, names):
"""getMoveable(seq<string> names) -> Moveable
Returns a moveable object that handles all the moveable items given in
names."""
# if simple motor just return it (if the pool has it)
if isinstance(names, (str, unicode)):
names = names,
if len(names) == 1:
name = names[0]
return self.getObj(name, elem_type=MOVEABLE_TYPES)
# find a motor group that contains elements
moveable = self.__findMotorGroupWithElems(names)
# if none exists create one
if moveable is None:
mgs = self.getElementsOfType('MotorGroup')
i = 1
pid = os.getpid()
while True:
name = "_mg_ms_{0}_{1}".format(pid, i)
exists = False
for mg in mgs.values():
if mg.name == name:
exists = True
break
if not exists:
break
i += 1
moveable = self.createMotorGroup(name, names)
return moveable
def __findMotorGroupWithElems(self, names):
names_lower = map(str.lower, names)
len_names = len(names)
mgs = self.getElementsOfType('MotorGroup')
for mg in mgs.values():
mg_elems = mg.elements
if len(mg_elems) != len_names:
continue
for mg_elem, name in zip(mg_elems, names_lower):
if mg_elem.lower() != name:
break
else:
return mg
#
# End of MoveableSource interface
# -~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-
def _wait_for_element_in_container(self, container, elem_name, timeout=0.5,
contains=True):
start = time.time()
cond = True
nap = 0.01
if timeout:
nap = timeout / 10.
while cond:
elem = container.getElement(elem_name)
if contains:
if elem is not None:
return elem
else:
if elem is None:
return True
if timeout:
dt = time.time() - start
if dt > timeout:
self.info("Timed out waiting for '%s' in container",
elem_name)
return
time.sleep(nap)
[docs] def createMotorGroup(self, mg_name, elements):
params = [mg_name, ] + map(str, elements)
self.debug('trying to create motor group for elements: %s', params)
self.command_inout('CreateMotorGroup', params)
elements_info = self.getElementsInfo()
return self._wait_for_element_in_container(elements_info, mg_name)
[docs] def createMeasurementGroup(self, mg_name, elements):
params = [mg_name, ] + map(str, elements)
self.debug('trying to create measurement group: %s', params)
self.command_inout('CreateMeasurementGroup', params)
elements_info = self.getElementsInfo()
return self._wait_for_element_in_container(elements_info, mg_name)
[docs] def deleteMeasurementGroup(self, name):
return self.deleteElement(name)
[docs] def createElement(self, name, ctrl, axis=None):
ctrl_type = ctrl.types[0]
if axis is None:
last_axis = ctrl.getLastUsedAxis()
if last_axis is None:
axis = str(1)
else:
axis = str(last_axis + 1)
else:
axis = str(axis)
cmd = "CreateElement"
pars = ctrl_type, ctrl.name, axis, name
self.command_inout(cmd, pars)
elements_info = self.getElementsInfo()
return self._wait_for_element_in_container(elements_info, name)
[docs] def renameElement(self, old_name, new_name):
self.debug('trying to rename element: %s to: %s', old_name, new_name)
self.command_inout('RenameElement', [old_name, new_name])
elements_info = self.getElementsInfo()
return self._wait_for_element_in_container(elements_info, new_name,
contains=True)
[docs] def deleteElement(self, name):
self.debug('trying to delete element: %s', name)
self.command_inout('DeleteElement', name)
elements_info = self.getElementsInfo()
return self._wait_for_element_in_container(elements_info, name,
contains=False)
[docs] def createController(self, class_name, name, *props):
ctrl_class = self.getObj(class_name, elem_type='ControllerClass')
if ctrl_class is None:
raise Exception("Controller class %s not found" % class_name)
cmd = "CreateController"
pars = [ctrl_class.types[0], ctrl_class.file_name, class_name, name]
pars.extend(map(str, props))
self.command_inout(cmd, pars)
elements_info = self.getElementsInfo()
return self._wait_for_element_in_container(elements_info, name)
[docs] def deleteController(self, name):
return self.deleteElement(name)
def registerExtensions():
factory = Factory()
factory.registerDeviceClass("Pool", Pool)
hw_type_names = [
'Controller',
'ComChannel', 'Motor', 'PseudoMotor', 'TriggerGate',
'CTExpChannel', 'ZeroDExpChannel', 'OneDExpChannel', 'TwoDExpChannel',
'PseudoCounter', 'IORegister', 'MotorGroup', 'MeasurementGroup']
hw_type_map = [(name, globals()[name]) for name in hw_type_names]
for klass_name, klass in hw_type_map:
factory.registerDeviceClass(klass_name, klass)
def unregisterExtensions():
factory = Factory()
factory.unregisterDeviceClass("Pool")
hw_type_names = [
'Controller',
'ComChannel', 'Motor', 'PseudoMotor', 'TriggerGate',
'CTExpChannel', 'ZeroDExpChannel', 'OneDExpChannel', 'TwoDExpChannel',
'PseudoCounter', 'IORegister', 'MotorGroup', 'MeasurementGroup']
for klass_name in hw_type_names:
factory.unregisterDeviceClass(klass_name)