Source code for lavuelib.imageNexusExporter

# Copyright (C) 2017  DESY, Christoph Rosemann, Notkestr. 85, D-22607 Hamburg
#
# lavue is an image viewing program for photon science imaging detectors.
# Its usual application is as a live viewer using hidra as data source.
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation in  version 2
# of the License.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor,
# Boston, MA  02110-1301, USA.
#
# Authors:
#     Jan Kotanski <jan.kotanski@desy.de>
#

""" image Nexus exporter """

import numpy as np
import os
import logging
from pyqtgraph import QtGui
from pyqtgraph.exporters import Exporter
from pyqtgraph.parametertree import Parameter

from . import filewriter

#: (:obj:`dict` <:obj:`str`, :obj:`module`> ) nexus writer modules
WRITERS = {}
try:
    from . import h5pywriter
    WRITERS["h5py"] = h5pywriter
except Exception:
    pass
try:
    from . import h5cppwriter
    WRITERS["h5cpp"] = h5cppwriter
except Exception:
    pass

__all__ = ['ImageNexusExporter']

logger = logging.getLogger("lavue")


def getcompression(compression):
    """ converts compression string to a deflate level parameter
        or list with [filterid, opt1, opt2, ...]

    :param compression: compression string
    :type compression: :obj:`str`
    :returns: deflate level parameter
              or list with [filterid, opt1, opt2, ...]
    :rtype: :obj:`int` or :obj:`list` < :obj:`int` > or `None`

    """
    if compression:
        if isinstance(compression, int) or ":" not in compression:
            level = None
            try:
                level = int(compression)
            except Exception:
                raise Exception(
                    "Error: argument compression: "
                    "invalid int value: '%s'\n" % compression)
            return level
        else:
            opts = None
            try:
                sfid, sopts = compression.split(":")
                opts = [int(sfid)]
                opts.extend([int(opt) for opt in sopts.split(",")])
            except Exception:
                raise Exception(
                    "Error: argument compression: "
                    "invalid format: '%s'\n" % compression)
            return opts
        return


[docs]class ImageNexusExporter(Exporter): """ NeXus Raw Image Exporter """ Name = "NeXus Raw Image" windows = [] allowCopy = False def __init__(self, item): """ constructor :param item: image item :param item: :class: `pyqtgraph.PlotItem` or `pyqtgraph.GraphicsScene` """ Exporter.__init__(self, item) #: (:class:`pyqtgraph.parametertree.Parameter`) exporter parameters self.params = Parameter(name='params', type='group', children=[ {'name': 'FieldName', 'type': 'str', 'value': 'data'}, {'name': 'Compression', 'type': 'str', 'value': '2'}, {'name': 'FileName', 'type': 'str', 'value': ''}, ])
[docs] def parameters(self): """ parameters :returns: exporter parameters :rtype: :class:`pyqtgraph.parametertree.Parameter` """ return self.params
[docs] def export(self, fileName=None): """ export data image to NeXus file :param fileName: output file name :rtype fileName: :obj:`str` """ if self.params['FileName']: filename = self.params['FileName'] elif fileName: filename = str(fileName) self.params['FileName'] = str(fileName) else: filename = None if "h5cpp" in WRITERS.keys(): writer = "h5cpp" else: writer = "h5py" if writer not in WRITERS.keys(): raise Exception("Writer '%s' cannot be opened" % writer) wrmodule = WRITERS[writer.lower()] if isinstance(self.item, QtGui.QGraphicsItem): scene = self.item.scene() else: scene = self.item if not hasattr(scene, "rawdata"): raise Exception( "Scene object without rawdata") rawdata = np.transpose(np.array(scene.rawdata)) if rawdata is None or not isinstance(rawdata, np.ndarray): raise Exception("Empty image") if filename is None: self.fileSaveDialog( filter=["*.nxs", "*.ndf", "*.n5", "*.nx", "*.h5", "*.hdf", "*.hd5"]) return fieldname = self.params['FieldName'] try: if os.path.exists(filename): fl = filewriter.open_file( str(filename), readonly=False, writer=wrmodule) else: fl = filewriter.create_file( str(filename), overwrite=False, writer=wrmodule) except Exception as e: logger.warning(str(e)) # print(str(e)) raise Exception( "File '%s' cannot be created \n" % (filename)) root = fl.root() if "data" in root.names(): node = root.open("data") else: node = root.create_group("data", "NXdata") if fieldname in node.names(): field = node.open(fieldname) else: fieldcompression = self.params['Compression'] if fieldcompression: opts = getcompression(fieldcompression) if isinstance(opts, int): cfilter = filewriter.data_filter(node) cfilter.rate = opts elif isinstance(opts, list) and opts: cfilter = filewriter.data_filter(node) cfilter.filterid = opts[0] cfilter.options = tuple(opts[1:]) shape = list(rawdata.shape) dtype = str(rawdata.dtype) fdshape = [0] + shape fdchunk = [1] + shape field = node.create_field( fieldname, dtype, shape=fdshape, chunk=fdchunk, dfilter=cfilter) field.grow(0, 1) field[-1, ...] = rawdata field.close() node.close() root.close() fl.close()
if WRITERS: ImageNexusExporter.register()