Source code for lavuelib.imageFileHandler

# Copyright (C) 2017  DESY, Christoph Rosemann, Notkestr. 85, D-22607 Hamburg
#
# lavue is an image viewing program for photon science imaging detectors.
# Its usual application is as a live viewer using hidra as data source.
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation in  version 2
# of the License.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor,
# Boston, MA  02110-1301, USA.
#
# Authors:
#     Christoph Rosemann <christoph.rosemann@desy.de>
#     Andre Rothkirch <andre.rothkirch@desy.de>
#     Jan Kotanski <jan.kotanski@desy.de>
#

""" this a simple file handler that loads image files
    and delivers just the actual array """

import struct
import numpy as np
import sys
import json
import logging

from . import filewriter

if sys.version_info > (3,):
    long = int

try:
    import fabio
    #: (:obj:`bool`) fabio can be imported
    FABIO = True
except ImportError:
    FABIO = False

try:
    import PIL
    import PIL.Image
    #: (:obj:`bool`) PIL can be imported
    PILLOW = True
except ImportError:
    PILLOW = False


#: (:obj:`dict` <:obj:`str`, :obj:`module`> ) nexus writer modules
WRITERS = {}
try:
    from . import h5pywriter
    WRITERS["h5py"] = h5pywriter
except Exception:
    pass
try:
    from . import h5cppwriter
    WRITERS["h5cpp"] = h5cppwriter
except Exception:
    pass


logger = logging.getLogger("lavue")


[docs]class NexusFieldHandler(object): """Nexus file handler class. Reads image from file and returns the numpy array.""" def __init__(self, fname=None, writer=None): """ constructor :param fname: file name :type fname: :obj:`str` :param writer: h5 writer module: "h5cpp" or "h5py" :type writer: :obj:`str` """ #: (:obj:`any`) module image object self.__image = None #: (:obj:`numpy.ndarray`) image data self.__data = None #: (:obj:`str`) file name self.__fname = fname #: (:obj:`dict` <:obj:`str`, :obj:`dict` <:obj:`str`, :obj:`any`>>) #: image field dictionary self.__fields = {} # (:class:`lavuelib.filewriter.root`) nexus file root self.__root = None # (:class:`lavuelib.filewriter.FTFile') nexus file object self.__fl = None #: (:obj:`str`) json dictionary with metadata or empty string self.__metadata = "" if not writer: if "h5cpp" in WRITERS.keys(): writer = "h5cpp" else: writer = "h5py" if writer not in WRITERS.keys(): raise Exception("Writer '%s' cannot be opened" % writer) wrmodule = WRITERS[writer.lower()] if fname: try: self.__fl = filewriter.open_file( fname, writer=wrmodule, readonly=True, libver='latest', swmr=(True if writer in ["h5py", "h5cpp"] else False) ) except Exception: try: self.__fl = filewriter.open_file( fname, writer=wrmodule, readonly=True) except Exception: raise Exception("File '%s' cannot be opened \n" % (fname)) # except Exception as e: # raise Exception("File '%s' cannot be opened %s\n" # % (fname, str(e))) self.__root = self.__fl.root()
[docs] def frombuffer(self, membuffer, fname=None, writer=None): """ constructor :param membuffer: memory buffer :type membuffer: :obj:`bytes` or :obj:`io.BytesIO` :param fname: file name :type fname: :obj:`str` :param writer: h5 writer module: "h5py" or "h5py" :type writer: :obj:`str` """ if fname is not None: self.__fname = fname if not writer: if "h5cpp" in WRITERS.keys() and \ WRITERS["h5cpp"].is_image_file_supported(): writer = "h5cpp" elif "h5py" in WRITERS.keys() and \ WRITERS["h5py"].is_image_file_supported(): writer = "h5py" elif "h5cpp" in WRITERS.keys(): writer = "h5cpp" else: writer = "h5py" if writer not in WRITERS.keys(): raise Exception("Writer '%s' cannot be opened" % writer) wrmodule = WRITERS[writer.lower()] try: self.__fl = filewriter.load_file( membuffer, fname=self.__fname, writer=wrmodule, readonly=True, libver='latest', swmr=(True if writer in ["h5py", "h5cpp"] else False) ) except Exception: try: self.__fl = filewriter.load_file( membuffer, fname, writer=wrmodule, readonly=True) except Exception: raise Exception( "File '%s' cannot be loaded \n" % (self.__fname)) self.__root = self.__fl.root()
[docs] def findImageFields(self): """ provides a dictionary with of all image fields :returns: dictionary of the field names and the field objects :rtype: :obj:`dict` <:obj:`str`, :obj:`dict` <:obj:`str`, :obj:`any`>> """ #: (:obj:`dict` <:obj:`str`, :obj:`dict` <:obj:`str`, :obj:`any`>>) #: image field dictionary self.__fields = {} self.__parseimages(self.__root) return self.__fields
@classmethod def __getpath(cls, path): """ converts full_path with NX_classes into nexus_path :param path: nexus full_path :type path: :obj:`str` """ spath = path.split("/") return "/".join( [(dr if ":" not in dr else dr.split(":")[0]) for dr in spath]) def __addimage(self, node, tgpath): """adds the node into the description list :param node: nexus node :type node: :class:`lavuelib.filewriter.FTField` or \ :class:`lavuelib.filewriter.FTGroup` :param path: path of the link target or `None` :type path: :obj:`str` """ desc = {} path = filewriter.first(node.path) desc["full_path"] = str(path) desc["nexus_path"] = str(self.__getpath(path)) if hasattr(node, "shape"): desc["shape"] = list(node.shape or []) else: return if len(desc["shape"]) < 2: return if hasattr(node, "dtype"): desc["dtype"] = str(node.dtype) else: return if node.is_valid: desc["node"] = node else: return self.__fields[desc["nexus_path"]] = desc def __parseimages(self, node, tgpath=None): """parses the node and add it into the description list :param node: nexus node :type node: :class:`lavuelib.filewriter.FTField` or \ :class:`lavuelib.filewriter.FTGroup` or :param path: path of the link target or `None` :type path: :obj:`str` """ self.__addimage(node, tgpath) names = [] if isinstance(node, filewriter.FTGroup): names = [ (ch.name, str(ch.target_path) if hasattr(ch, "target_path") else None) for ch in filewriter.get_links(node)] for nm in names: try: ch = node.open(nm[0]) self.__parseimages(ch, nm[1]) # except Exception: # pass finally: pass
[docs] def getNode(self, field=None): """ get node :param field: field path :type field: :obj:`str` :returns: nexus field node :rtype: :class:`lavuelib.filewriter.FTField` """ node = None if field is not None: sfield = str(field).split("/") node = self.__root for name in sfield: if name: node = node.open(name) else: node = self.__fl.default_field() return node
[docs] @classmethod def getFrameCount(cls, node, growing=0, refresh=True): """ provides the last frame number :param node: nexus field node :type node: :class:`lavuelib.filewriter.FTField` :param growing: growing dimension :type growing: :obj:`int` :param refresh: refresh image node :type refresh: :obj:`bool` :returns: a number of frames :rtype: :obj:`int` """ if refresh: node.refresh() if node: shape = node.shape if shape: if len(shape) > growing and growing > -1: return shape[growing] return 0
[docs] @classmethod def getMetaData(cls, node, mdata=None, maxrec=4): """ provides the image metadata :returns: JSON dictionary with image metadata :rtype: :obj:`str` """ metadata = mdata if mdata is not None else {} pgroup = node.parent mnames = ['distance', 'wavelength', 'x_pixel_size', 'y_pixel_size', 'beam_center_x', 'beam_center_y'] names = pgroup.names() for nm in mnames: if nm in names and nm not in metadata.keys(): fld = pgroup.open(nm) value = cls.extract(fld.read()) if isinstance(value, np.ndarray): continue try: atts = [at.name for at in fld.attributes] if "units" in atts: units = cls.extract(fld.attributes["units"].read()) if isinstance(units, np.ndarray): continue if units: metadata[nm] = (value, units) else: metadata[nm] = value else: metadata[nm] = value except Exception as e: # print(str(e)) logger.warning(str(e)) metadata[nm] = value lfld = pgroup.open_link(node.name) slpath = str(lfld.target_path).rsplit(":/", 1) if len(slpath) > 1: file_path = slpath[0] lpath = slpath[1] else: file_path = None lpath = slpath[0] opath = "/".join([gr.split(":")[0] for gr in node.path.split("/") if gr != '.']) lpath = lpath.replace("//", "/") lfile = lfld.getfilename(lfld) if lfile is not None and \ maxrec and lfile == file_path and str(lpath) != str(opath): try: root = None par = pgroup child = node while not root: if isinstance(par, filewriter.FTFile): root = child else: child = par par = par.parent grps = str(lpath).split("/") lnode = root for gr in grps: if not gr: continue lnode = lnode.open(gr) cls.getMetaData(lnode, metadata, maxrec - 1) except Exception as e: # print(str(e)) logger.warning(str(e)) return json.dumps(metadata)
[docs] @classmethod def extract(cls, value): if isinstance(value, np.ndarray): if len(value.shape) == 1 and value.shape[0] == 1: value = value[0] return value
[docs] @classmethod def getImage(cls, node, frame=-1, growing=0, refresh=True): """parses the field and add it into the description list :param node: nexus field node :type node: :class:`lavuelib.filewriter.FTField` :param frame: frame to take, the last one is -1 :type frame: :obj:`int` :param growing: growing dimension :type growing: :obj:`int` :param refresh: refresh image node :type refresh: :obj:`bool` :returns: get the image :rtype: :class:`numpy.ndarray` """ if refresh: node.refresh() if node: shape = node.shape if shape: if frame is None: return node.read() if len(shape) == 2: return node[...] elif len(shape) == 3: if growing == 0: if frame < 0 or shape[0] > frame: return node[frame, :, :] elif growing == 1: if frame < 0 or shape[1] > frame: return node[:, frame, :] else: if frame < 0 or shape[2] > frame: return node[:, :, frame] elif len(shape) == 4: if growing == 0: if frame < 0 or shape[0] > frame: return node[frame, :, :, :] elif growing == 1: if frame < 0 or shape[1] > frame: return node[:, frame, :, :] elif growing == 2: if frame < 0 or shape[2] > frame: return node[:, :, frame, :] else: if frame < 0 or shape[3] > frame: return node[:, :, :, frame]
[docs]class ImageFileHandler(object): """Simple file handler class. Reads image from file and returns the numpy array.""" def __init__(self, fname): """ constructor :param fname: file name :type fname: :obj:`str` """ #: (:obj:`any`) module image object self.__image = None #: (:obj:`numpy.ndarray`) image data self.__data = None #: (:obj:`str`) json dictionary with metadata or empty string self.__metadata = "" try: if FABIO: self.__image = fabio.open(fname) self.__data = self.__image.data if self.__data is None: raise Exception("no data") if "_array_data.header_convention" \ in self.__image.header.keys(): rmeta = self.__image.header["_array_data.header_contents"] self.__metadata = CBFLoader().metadata(rmeta) elif PILLOW: self.__image = PIL.Image.open(fname) self.__data = np.array(self.__image) except Exception as e: logger.debug(str(e)) try: if FABIO and PILLOW: self.__image = PIL.Image.open(fname) self.__data = np.array(self.__image) except Exception as e: logger.debug(str(e)) try: self.__image = np.fromfile(str(fname), dtype='uint8') if fname.endswith(".cbf"): self.__data = CBFLoader().load(self.__image) self.__metadata = CBFLoader().metadata(self.__image) else: self.__data = TIFLoader().load(self.__image) except Exception as e: # print(str(e)) logger.warning(str(e))
[docs] def getImage(self): """ provides the image data :returns: image data :rtype: :class:`numpy.ndarray` """ return self.__data
[docs] def getMetaData(self): """ provides the image metadata :returns: JSON dictionary with image metadata :rtype: :obj:`str` """ return self.__metadata
[docs]class CBFLoader(object): """ CBF loader """
[docs] @classmethod def metadata(cls, flbuffer, premeta=None): """ extract header_contents from CBF file image data :param flbuffer: numpy array with CBF file image data :type flbuffer: :class:`numpy.ndarray` or :obj:`str` :param premeta: metadata to be merged :type premeta: :obj:`dict` <:obj:`str`, :obj:`any`> :returns: metadata dictionary :rtype: :class:`numpy.ndarray` """ headerstart = b'_array_data.header_convention' headerend = b'_array_data.data' mdata = {} try: if hasattr(flbuffer, "tostring"): try: hspos = flbuffer.tostring().index(headerstart) // \ flbuffer.itemsize except Exception: hspos = 0 hepos = flbuffer.tostring().index(headerend) // \ flbuffer.itemsize flbuffer = flbuffer[hspos:hepos + 1].tostring() except Exception as e: # print(str(e)) logger.warning(str(e)) try: header = str(flbuffer, "utf8").strip() except Exception: header = str(flbuffer).strip() sheader = [hd[2:] for hd in str(header).split("\r\n") if hd.startswith("# ")] if sheader and sheader[0].startswith("Detector: "): mdata["detector"] = " ".join(sheader[0].split(" ", 1)[1:]) if sheader and len(sheader) > 1: mdata["timestamp"] = sheader[1] for hd in sheader[2:]: try: if hd.startswith("Silicon sensor, thickness "): dt = hd.split(" ")[3:] if dt and len(dt) == 2: dt = (float(dt[0]), dt[1]) mdata["silicon_sensor_thickness"] = dt else: dt = hd.split(" ") name = dt[0].strip().lower() if name.endswith(":"): name = name[:-1] if dt: dt = dt[1:] if dt and dt[0] == '=': dt = dt[1:] if dt and len(dt) == 1: try: dt = [float(dt[0])][0] except ValueError: dt = dt[0] elif dt and len(dt) == 2: try: dt[0] = float(dt[0]) try: dt[1] = float(dt[1]) except ValueError: dt = tuple(dt) except ValueError: pass elif dt and len(dt) == 3: try: dt[0] = float(dt[0]) try: dt[1] = float(dt[1]) try: dt[2] = float(dt[2]) except ValueError: dt = [(dt[0], dt[2]), (dt[1], dt[2])] except ValueError: if dt[1] == 'x': try: dt = [dt[0], float(dt[2])] except ValueError: pass except ValueError: try: dt = [(float(dt[0].strip("(), ")), dt[2]), (float(dt[1].strip("(), ")), dt[2])] except Exception: pass elif dt and len(dt) == 5: if dt[2] == 'x': try: dt = [(float(dt[0]), dt[1]), (float(dt[3]), dt[4])] except ValueError: pass elif dt[3] == '=': try: dt = [dt[0], dt[1], {dt[2].strip("(), "): float(dt[4].strip("(), "))}] except ValueError: pass mdata[name] = dt except Exception as e: # print(str(e)) logger.warning(str(e)) if mdata: if premeta: mdata.update(premeta) return json.dumps(mdata) else: return json.dumps(premeta) if premeta else ""
[docs] @classmethod def load(cls, flbuffer): """ loads CBF file image data into numpy array :param flbuffer: numpy array with CBF file image data :type flbuffer: :class:`numpy.ndarray` :returns: image data :rtype: :class:`numpy.ndarray` """ image = np.array([0]) inpoint = np.array([26, 4, 213], dtype='uint8') # array with '--CIF-BINARY-FORMAT-SECTION---' outpoint = np.array( [45, 45, 67, 73, 70, 45, 66, 73, 78, 65, 82, 89, 45, 70, 79, 82, 77, 65, 84, 45, 83, 69, 67, 84, 73, 79, 78, 45, 45, 45], dtype='uint8') flag = 0 # check if byte offset compress ('x-CBF_BYTE_OFFSET') boc = np.array( [120, 45, 67, 66, 70, 95, 66, 89, 84, 69, 95, 79, 70, 70, 83, 69, 84], dtype='uint8') try: # iscbf flbuffer.tostring().index(boc.tostring()) // flbuffer.itemsize except Exception: flag = 1 # additional parms for cross check if decompress worked out # ('X-Binary-Number-of-Elements:') dset_num_ele = np.array( [88, 45, 66, 105, 110, 97, 114, 121, 45, 78, 117, 109, 98, 101, 114, 45, 111, 102, 45, 69, 108, 101, 109, 101, 110, 116, 115, 58], dtype='uint8') # array with 'X-Binary-Size-Fastest-Dimension:' dset_fast_dim = np.array( [88, 45, 66, 105, 110, 97, 114, 121, 45, 83, 105, 122, 101, 45, 70, 97, 115, 116, 101, 115, 116, 45, 68, 105, 109, 101, 110, 115, 105, 111, 110, 58], dtype='uint8') # array with 'X-Binary-Size-Second-Dimension:' dset_sec_dim = np.array( [88, 45, 66, 105, 110, 97, 114, 121, 45, 83, 105, 122, 101, 45, 83, 101, 99, 111, 110, 100, 45, 68, 105, 109, 101, 110, 115, 105, 111, 110, 58], dtype='uint8') # array with 'X-Binary-Size-Padding:' dset_pad = np.array( [88, 45, 66, 105, 110, 97, 114, 121, 45, 83, 105, 122, 101, 45, 80, 97, 100, 100, 105, 110, 103, 58], dtype='uint8') # search for data stream start if flag == 0: try: idstart = flbuffer.tostring().index( inpoint.tostring()) // flbuffer.itemsize idstart += inpoint.size except Exception: flag = 1 try: idstop = flbuffer.tostring().index( outpoint.tostring()) // flbuffer.itemsize idstop -= 3 # cr / extra -1 due to '10B' -- linefeed except Exception: flag = 1 vals = np.zeros(4, dtype='int') spos = np.zeros(5, dtype='int') spos[4] = idstart try: spos[0] = flbuffer.tostring().index( dset_num_ele.tostring()) // flbuffer.itemsize spos[1] = flbuffer.tostring().index( dset_fast_dim.tostring()) // flbuffer.itemsize spos[2] = flbuffer.tostring().index( dset_sec_dim.tostring()) // flbuffer.itemsize spos[3] = flbuffer.tostring().index( dset_pad.tostring()) // flbuffer.itemsize # by A.R., Apr 24, 2017 vals[0] = int( flbuffer[ spos[0] + dset_num_ele.size:spos[1] - 2].tostring()) vals[1] = int( flbuffer[ spos[1] + dset_fast_dim.size:spos[2] - 2].tostring()) vals[2] = int( flbuffer[ spos[2] + dset_sec_dim.size:spos[3] - 2].tostring()) vals[3] = int( flbuffer[ spos[3] + dset_pad.size:spos[4] - 8].tostring()) except Exception: flag = 1 if flag == 0: image = 0 image = cls._decompress_cbf_c( flbuffer[idstart:idstop + 1], vals) else: image = np.array([0]) return np.transpose(image)
@classmethod def _decompress_cbf_c(cls, stream, vals): """ decompresses CBF :param stream: a part of cbf data :type stream: :class:`numpy.ndarray` :param val: decompress parameters, i.e. n_out, xdim, ydum padding :type val: :class:`numpy.ndarray` :returns: image data :rtype: :class:`numpy.ndarray` """ xdim = long(487) ydim = 619 padding = long(4095) n_out = xdim * ydim # simply assume content fits here if vals.size == 4 and sum(vals) != 0: xdim = vals[1] ydim = vals[2] padding = vals[3] n_out = vals[0] flbuffer = np.zeros(stream.size, dtype='int32') + stream mymap = np.zeros(stream.size, dtype='uint8') + 1 isvalid = np.zeros(stream.size, dtype='uint8') + 1 id_relevant = np.where(stream == 128) # overcome issue if 128 exists in padding (seems so that # either this does not happened before or padding was 0 in any case) try: idd = np.where(id_relevant < (flbuffer.size - padding)) id_relevant = id_relevant[idd] except Exception: pass for dummy, dummy2 in enumerate(id_relevant): for j, i in enumerate(dummy2): if mymap[i] != 0: if stream[i + 1] != 0 or stream[i + 2] != 128: mymap[i:i + 3] = 0 isvalid[i + 1:i + 3] = 0 delta = flbuffer[i + 1] + flbuffer[i + 2] * 256 if delta > 32768: delta -= 65536 flbuffer[i] = delta else: mymap[i:i + 7] = 0 isvalid[i + 1:i + 7] = 0 # delta=sum(np.multiply(flbuffer[i+3:i+7], # np.array([1,256,65536,16777216],dtype='int64'))) delta = ( np.multiply( flbuffer[i + 3:i + 7], np.array([1, 256, 65536, 16777216], dtype='int64'))).sum() if delta > 2147483648: delta -= 4294967296 flbuffer[i] = delta try: id8sign = np.where((stream > 128) & (mymap != 0)) flbuffer[id8sign] -= 256 # print ("adjusting 8Bit vals") # for i, j in enumerate(stream): # if j > 128 and mymap[i] !=0: # flbuffer[i]=flbuffer[i]-256 # print stream[0:11] # print flbuffer[0:11] except Exception: pass try: # print sum(isvalid) #should be 305548 idd = np.where(isvalid != 0) flbuffer = flbuffer[idd] except Exception: pass # print stream[0:11] # print flbuffer[0:11] res = np.cumsum(flbuffer, dtype='int32') # print max(res) if res.size - padding != n_out: return np.array([0]) # by A.R., Apr 24, 2017 # return res[0:n_out].reshape(xdim, ydim) return res[0:n_out].reshape(xdim, ydim, order='F')
[docs]class TIFLoader(object): """ TIF loader """
[docs] @classmethod def load(cls, flbuffer): """ loads TIF file image data into numpy array :param flbuffer: numpy array with TIF file image data :type flbuffer: :class:`numpy.ndarray` :returns: image data :rtype: :class:`numpy.ndarray` """ image = np.float64(-1) # define unsigned default if undefined - i.e. like MAR165 data sample_format = 1 flbuffer_endian = 'none' if sum(abs(flbuffer[0:2] - [73, 73])) == 0: flbuffer_endian = "<" # little if sum(abs(flbuffer[0:2] - [77, 77])) == 0: flbuffer_endian = ">" # big if flbuffer_endian == "none": return image # or better to raise exception? numfortiff = np.uint16( struct.unpack_from(flbuffer_endian + "H", flbuffer[2:4])[0]) if numfortiff != 42: return image # or better to raise exception? ifd_off = np.uint32( struct.unpack_from(flbuffer_endian + "I", flbuffer[4:8])[0]) # # jump to/eval image file directory (ifd) num_of_ifd = np.uint16( struct.unpack_from( flbuffer_endian + "H", flbuffer[ifd_off:ifd_off + 2])[0]) for ifd_entry in range(num_of_ifd): field_tag = np.uint16( struct.unpack_from( flbuffer_endian + "H", flbuffer[ifd_off + 2 + ifd_entry * 12:ifd_off + 4 + ifd_entry * 12])[0]) field_type = np.uint16( struct.unpack_from( flbuffer_endian + "H", flbuffer[ifd_off + 4 + ifd_entry * 12:ifd_off + 6 + ifd_entry * 12])[0]) # num_vals = np.uint32( # struct.unpack_from( # flbuffer_endian + "I", # flbuffer[ifd_off + 6 + ifd_entry * 12:ifd_off + 10 # + ifd_entry * 12])[0]) # given tiff 6.0 there are 12 type entries, currently not all of # them are accounted, A.R. val_or_off = 0 if field_type == 1: # check flbuffer addressing! val_or_off = np.uint8( struct.unpack_from( flbuffer_endian + "B", flbuffer[ifd_off + 10 + ifd_entry * 12:ifd_off + 15 + ifd_entry * 12])[0]) if field_type == 3: val_or_off = np.uint16( struct.unpack_from( flbuffer_endian + "H", flbuffer[ifd_off + 10 + ifd_entry * 12:ifd_off + 15 + ifd_entry * 12])[0]) if field_type == 4: val_or_off = np.uint32( struct.unpack_from( flbuffer_endian + "I", flbuffer[ifd_off + 10 + ifd_entry * 12:ifd_off + 15 + ifd_entry * 12])[0]) if field_type == 8: val_or_off = np.int16( struct.unpack_from( flbuffer_endian + "h", flbuffer[ifd_off + 10 + ifd_entry * 12:ifd_off + 15 + ifd_entry * 12])[0]) if field_type == 9: val_or_off = np.int32( struct.unpack_from( flbuffer_endian + "i", flbuffer[ifd_off + 10 + ifd_entry * 12:ifd_off + 15 + ifd_entry * 12])[0]) if field_type == 11: val_or_off = np.float32( struct.unpack_from( flbuffer_endian + "f", flbuffer[ifd_off + 10 + ifd_entry * 12:ifd_off + 15 + ifd_entry * 12])[0]) # eval (hopefully) tags needed to allow for getting an image if field_tag == 256: width = int(val_or_off) if field_tag == 257: length = int(val_or_off) if field_tag == 258: bit_per_sample = int(val_or_off) # compression scheme - return invalid if NOT none, # i.e. only uncompressed data is supported (forever!?) if field_tag == 259: if val_or_off != 1: return image # photometric interpretation - 2 denotes RGB which is refused # otherwise don't mind/care ... if field_tag == 262: if val_or_off == 2: return image if field_tag == 273: strip_offsets = val_or_off # likely equals image width # if field_tag == 278: # rows_per_strip = val_or_off if field_tag == 279: strip_byte_counts = val_or_off if field_tag == 339: sample_format = val_or_off next_idf = np.uint32( struct.unpack_from( flbuffer_endian + "I", flbuffer[ifd_off + 2 + (ifd_entry + 1) * 12: ifd_off + 6 + (ifd_entry + 1) * 12] )[0]) if next_idf != 0: # print('another ifd exists ... NOT read') pass if width * length * bit_per_sample // 8 != strip_byte_counts: return image if sample_format == 1 and bit_per_sample == 8: image = np.uint8( struct.unpack_from( flbuffer_endian + str(width * length) + "B", flbuffer[strip_offsets:strip_offsets + strip_byte_counts + 1])) if sample_format == 1 and bit_per_sample == 16: image = np.uint16( struct.unpack_from( flbuffer_endian + str(width * length) + "H", flbuffer[strip_offsets:strip_offsets + strip_byte_counts + 1])) if sample_format == 1 and bit_per_sample == 32: image = np.uint32( struct.unpack_from( flbuffer_endian + str(width * length) + "I", flbuffer[strip_offsets:strip_offsets + strip_byte_counts + 1])) # if sample_format == 2 and bit_per_sample == 8: # image=np.int8(struct.unpack_from( # flbuffer_endian+str(width*length)+"b", # flbuffer[strip_offsets:strip_offsets # +strip_byte_counts+1])) if sample_format == 2 and bit_per_sample == 16: image = np.int16( struct.unpack_from( flbuffer_endian + str(width * length) + "h", flbuffer[strip_offsets:strip_offsets + strip_byte_counts + 1])) if sample_format == 2 and bit_per_sample == 32: image = np.int32( struct.unpack_from( flbuffer_endian + str(width * length) + "i", flbuffer[strip_offsets:strip_offsets + strip_byte_counts + 1])) if sample_format == 3 and bit_per_sample == 32: image = np.float32( struct.unpack_from( flbuffer_endian + str(width * length) + "f", flbuffer[strip_offsets:strip_offsets + strip_byte_counts + 1])) try: return np.transpose(image.reshape(width, length, order='F')) except Exception: return np.transpose(image)
if __name__ == "__main__": # filename = # '/afs/desy.de/user/r/rothkirc/public/P03/lost_00001_00001.tif' # input # file name # input file name filename = '/afs/desy.de/user/r/rothkirc/public/20131129_eugen/' \ + 'mar165_agbeh_00001.tif' tmp = np.fromfile(filename, dtype='uint8') # read all content as unit 8 resu = TIFLoader().load(tmp) print("Return value shape and dtype") print("%s %s" % (resu.shape, resu.dtype))