"""
Provides utilities for exporting structure objects with ``ase`` and ``pymatgen``.
Tools for reading/writings settings files
"""
import io
import tarfile
import zipfile
import functools
import numpy as np
import typing as T
from frozendict import frozendict
from sqsgenerator.fallback.attrdict import AttrDict
from sqsgenerator.core import Structure, IterationMode
from sqsgenerator.compat import FeatureNotAvailableException
from operator import attrgetter as attr, methodcaller as method
from sqsgenerator.compat import require, have_feature, get_module, Feature
from sqsgenerator.adapters import from_ase_atoms, from_pymatgen_structure, to_pymatgen_structure, to_ase_atoms
F = Feature
known_adapters = (F.ase, F.pymatgen)
compression_to_file_extension = frozendict(zip='zip', bz2='tar.bz2', gz='tar.gz', xz='tar.xz')
output_formats = {
F.pymatgen: {'cif', 'mcif', 'poscar', 'cssr', 'json', 'xsf', 'prismatic', 'yaml'}
}
def identity(x: T.Any) -> T.Any:
return x
if have_feature(F.ase):
from ase.io.formats import all_formats, get_ioformat
    # the underlying ase write function must be able to cope with file handles;
    # therefore we use get_ioformat to obtain meta-information about each format and filter out the unsuitable ones
output_formats[F.ase] = {f for f in all_formats.keys() if get_ioformat(f).acceptsfd}
def prepare_handle(fp: T.IO[bytes], feature: Feature, format: str) -> T.Union[T.IO[bytes], T.IO[str]]:
"""
Sanitizes a file-like by wrapping it in a TextIO if needed
:param fp: the file-like to sanitize
:type fp: IO[bytes]
:param feature: the file writer module "ase" or "pymatgen"
:type feature: Feature
:param format: the file format e.g "cif"
:type format: str
:return: the file handle
:rtype: IO[bytes] or IO[str]
"""
assert format in output_formats[feature]
if feature == F.pymatgen:
return io.TextIOWrapper(fp)
elif feature == F.ase:
return fp if get_ioformat(format).isbinary else io.TextIOWrapper(fp)
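# Usage sketch for prepare_handle() (the file name is hypothetical): "cif" is a
# text format, so the binary handle gets wrapped in an io.TextIOWrapper before
# the pymatgen writer sees it:
#
#   with open('structure.cif', 'wb') as fp:
#       fh = prepare_handle(fp, F.pymatgen, 'cif')  # -> io.TextIOWrapper
#       fh.write('...')
#       fh.flush()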
def default_adapter():
"""
Gets the default writer/reader module to read files
:return: the default
:rtype: Feature or None
"""
for a in known_adapters:
        if have_feature(a):
            return a
return None
def supported_formats(feature=None):
"""
    For a given feature ("ase" or "pymatgen") the supported output file formats are computed. If ``None`` is passed,
    the value of ``default_adapter()`` is used
:param feature: the writer/reader feature (default is None)
:type feature: Feature
:return: the supported output formats
:rtype: set
"""
    return output_formats.get(feature or default_adapter(), set())
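# Usage sketch, assuming ase is installed:
#
#   'cif' in supported_formats(F.ase)   # -> True
#   supported_formats()                 # formats of the default adapter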
class NonCloseableBytesIO(io.BytesIO):
"""
Class as workaround. Some ase.io writer functions close the buffer so that we cannot capture their output
I/O operation on closed buffer Exception. Therefore BytesIO with dummy
"""
def close(self) -> None:
pass
def really_close(self):
super(NonCloseableBytesIO, self).close()
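# Sketch of the behaviour this class works around: a plain io.BytesIO raises
# "ValueError: I/O operation on closed file" once a writer has closed it,
# whereas here close() is a deliberate no-op:
#
#   buf = NonCloseableBytesIO()
#   buf.close()             # ignored on purpose
#   buf.write(b'payload')   # still usable
#   buf.really_close()      # actually frees the buffer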
def capture(f):
"""
Decorator which captures the written bytes of a "write" function an. It passed the buffer as the first argument
to the wrapped function
:param f: a callable returning the written bytes
:type f: callable
:return: wrapped function
"""
@functools.wraps(f)
def _capture(*args, **kwargs):
fp = NonCloseableBytesIO()
f(fp, *args, **kwargs)
fp.seek(0)
r = fp.getvalue()
fp.really_close()
return r
return _capture
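# Usage sketch: any function writing bytes to its first argument can be turned
# into one that returns those bytes (write_greeting is hypothetical):
#
#   @capture
#   def write_greeting(fp, name):
#       fp.write(f'Hello {name}!'.encode())
#
#   write_greeting('world')  # -> b'Hello world!'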
@require(F.json, F.yaml, F.pickle, condition=any)
def dumps(o: dict, output_format: str = 'yaml') -> bytes:
"""
    Dumps a dict-like object into a byte string, using the backend specified by {output_format}
    :param o: a dict-like object which is serializable by the module specified by {output_format}
    :type o: dict
    :param output_format: backend used to store the object ("json", "pickle" or "yaml"; default is ``"yaml"``)
:type output_format: str
:return: the dumped content
:rtype: bytes
"""
f = F(output_format)
if not have_feature(f):
        raise FeatureNotAvailableException(f'The package "{output_format}" is not installed, '
                                           'consider installing it')
# for yaml format we create a simple wrapper which captures the output
def safe_dumps(d, **kwargs):
buf = io.StringIO()
get_module(F.yaml).safe_dump(d, buf, **kwargs)
return buf.getvalue()
dumpers = {
F.json: lambda d: get_module(F.json).dumps(d, indent=4),
F.pickle: lambda d: get_module(F.pickle).dumps(d),
F.yaml: lambda d: safe_dumps(d, default_flow_style=None)
}
content = dumpers[f](o)
return content if f == F.pickle else content.encode()
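# Usage sketch, assuming PyYAML is available; no exact output is implied here:
#
#   raw_yaml = dumps({'which': 'random', 'iterations': 1000})
#   raw_json = dumps({'which': 'random'}, output_format='json')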
@require(F.yaml, F.json, condition=any)
def read_settings_file(path: str, format: str = 'yaml') -> AttrDict:
"""
    Reads a file expecting {format} as the file type. This method does not process the input parameters, it merely
    reads them from the file. To obtain default values for all parameters use :py:func:`process_settings`
:param path: the file path
:type path: str
:param format: the input file-type. Possible formats are *yaml*, *json* and *pickle* (default is ``'yaml'``)
:type format: str
:return: the parsed settings
:rtype: AttrDict
"""
f = F(format)
readers = {
F.json: 'loads',
F.pickle: 'loads',
F.yaml: 'safe_load'
}
if not have_feature(f):
        raise FeatureNotAvailableException(f'The package "{format}" is not installed, '
                                           'consider installing it')
reader = getattr(get_module(f), readers[f])
    mode = 'r' if f != F.pickle else 'rb'
    with open(path, mode) as settings_file:
        content = settings_file.read()
try:
data = AttrDict(reader(content))
    except Exception as e:
        raise IOError(f'While reading the file "{path}", a "{type(e).__name__}" occurred. '
                      f'Maybe the file has the wrong format. '
                      f'I was expecting a "{format}"-file. '
                      f'You can specify a different input-file format using the "--format" option') from e
return data
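# Usage sketch (the file names are hypothetical):
#
#   settings = read_settings_file('sqs.yaml')
#   settings = read_settings_file('sqs.json', format='json')
#   settings.structure  # AttrDict allows attribute-style access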
@require(F.ase)
def read_structure_file_with_ase(fn, **kwargs) -> Structure:
import ase.io
return from_ase_atoms(ase.io.read(fn, **kwargs))
@require(F.pymatgen)
def read_structure_file_with_pymatgen(fn, **kwargs) -> Structure:
import pymatgen.core
return from_pymatgen_structure(pymatgen.core.Structure.from_file(fn, **kwargs))
@require(F.ase)
def write_structure_file_with_ase(fp, structure: Structure, format, sort=True, **kwargs):
import ase.io
ase.io.write(fp, (to_ase_atoms(structure.sorted() if sort else structure),), format=format, **kwargs)
@require(F.pymatgen)
def write_structure_file_with_pymatgen(fp, structure: Structure, format, sort=True, **kwargs):
fp.write(to_pymatgen_structure(structure.sorted() if sort else structure).to(fmt=format, **kwargs))
@require(F.ase, F.pymatgen, condition=any)
def write_structure_file(fp: T.IO[bytes], structure: Structure, format: str, writer: Feature = default_adapter(),
                         **kwargs) -> None:
"""
Write a ``sqsgenerator.core.Structure`` object into a file, with file format {format} using {writer} as backend
:param fp: file object
:type fp: IO[bytes]
:param structure: the structure to save
:type structure: Structure
:param format: file type used. Must be supported by {writer}
:type format: str
:param writer: the writer backend (default is ``default_adapter()``)
:type writer: Feature
:param kwargs: keyword arguments passed to the backends
"""
writer_funcs = {
F.ase: write_structure_file_with_ase,
F.pymatgen: write_structure_file_with_pymatgen
}
fh = prepare_handle(fp, writer, format)
writer_funcs[writer](fh, structure, format, **kwargs)
fh.flush()
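# Usage sketch, assuming ``my_structure`` is a Structure instance; the handle
# must be opened in binary mode, prepare_handle() wraps it where necessary:
#
#   with open('structure.cif', 'wb') as fp:
#       write_structure_file(fp, my_structure, 'cif', writer=F.pymatgen)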
@require(F.ase, F.pymatgen, condition=any)
def read_structure_from_file(settings: AttrDict) -> Structure:
"""
    Reads a structure object from the file specified by the settings dictionary
:param settings: the settings dictionary
:type settings: AttrDict
:return: the Structure object
:rtype: Structure
"""
reader = settings.structure.get('reader', 'ase')
available_readers = set(map(attr('value'), known_adapters))
if reader not in available_readers:
        raise FeatureNotAvailableException(f'Unknown reader specification "{reader}". '
                                           f'Available readers are {known_adapters}')
reader_kwargs = settings.structure.get('args', {})
reader_funcs = dict(ase=read_structure_file_with_ase, pymatgen=read_structure_file_with_pymatgen)
return reader_funcs[reader](settings.structure.file, **reader_kwargs)
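# Usage sketch with a hypothetical settings dictionary; the optional "args"
# entry is forwarded verbatim to the backend's read function:
#
#   settings = AttrDict({'structure': {'file': 'POSCAR', 'reader': 'ase',
#                                      'args': {'format': 'vasp'}}})
#   structure = read_structure_from_file(settings)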
# Helper to capture the backend's output into bytes
dumps_structure = capture(write_structure_file)
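# dumps_structure() therefore has the signature of write_structure_file() minus
# the leading file object and returns the written bytes instead, e.g. (sketch,
# ``my_structure`` being a Structure instance):
#
#   cif_bytes = dumps_structure(my_structure, 'cif', writer=F.ase)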
def to_dict(settings: dict) -> T.Dict[str, T.Any]:
"""
    Utility method to recursively turn a general dictionary into a JSON/YAML serializable dictionary.
    If a non-trivial object is encountered, the function searches for a ``to_dict()`` method. If no method is
    available to serialize the object, a ``TypeError`` is raised. **Attention:** the function serializes ``np.ndarray``
    by calling ``tolist()``. This is not ideal in general but fits the needs of this project
:param settings: a generic dictionary object
:type settings: dict
:return: a serializable dict
:rtype: dict
"""
    # the module-level identity() serves as the no-op converter below
converters = {
int: identity,
float: identity,
str: identity,
bool: identity,
IterationMode: str,
np.float32: float,
np.float64: float,
np.int8: int,
np.int16: int,
np.int64: int,
np.int32: int,
np.ndarray: method('tolist')
}
def _generic_to_dict(d):
td = type(d)
if isinstance(d, (tuple, list, set)):
return td(map(_generic_to_dict, d))
elif isinstance(d, dict):
            return {k: _generic_to_dict(v) for k, v in d.items()}
elif td in converters:
return converters[td](d)
elif hasattr(d, 'to_dict'):
return d.to_dict()
else:
raise TypeError(f'No converter specified for type "{td}"')
return _generic_to_dict(settings)
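# Usage sketch:
#
#   to_dict({'weights': np.array([1.0, 2.0]), 'iterations': np.int64(10)})
#   # -> {'weights': [1.0, 2.0], 'iterations': 10}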
def export_structures(structures: T.Dict[T.Any, T.Any], format: str = 'cif', output_file: str = 'sqs.result',
                      writer: T.Union[Feature, str] = 'ase', compress: T.Optional[str] = None,
                      functor: T.Callable[[T.Any], Structure] = identity) -> None:
"""
    Writes structures into files. The filenames are specified by the keys of the {structures} argument. The structures
    stored in the values will be written using the {writer} backend in {format}. If {compress} is specified the
    structures will be dumped into an archive named {output_file}. The file extension is chosen automatically.
    :param structures: a mapping of filenames to :py:class:`Structure` objects
    :type structures: dict[Any, Structure]
    :param format: output file format (default is ``'cif'``)
    :type format: str
:param output_file: the prefix of the output archive name. File extension is chosen automatically.
If {compress} is ``None`` this option is ignored (default is ``'sqs.result'``)
:type output_file: str
:param writer: the writer backend (default is ``'ase'``)
:type writer: str
:param compress: compression algorithm (``'zip'``, ``'gz'``, ``'bz2'`` or ``'xz'``) used to store the structure
files. If ``None`` the structures are written to plain files (default is ``None``)
:type compress: str or None
    :param functor: a callable which maps each value of {structures} onto a :py:class:`Structure` (default is ``identity``)
:type functor: Callable[[Any], Structure]
"""
writer = Feature(writer) if isinstance(writer, str) else writer
output_prefix = output_file
if compress:
# select the proper file-mode as well as file-name and opening method
output_archive_file_mode = f'x:{compress}' if compress != 'zip' else 'x'
output_archive_name = f'{output_prefix}.{compression_to_file_extension.get(compress)}'
open_ = tarfile.open if compress != 'zip' else zipfile.ZipFile
archive_handle = open_(output_archive_name, output_archive_file_mode)
else:
archive_handle = None
# helper method, dealing with the compression algorithms
def write_structure_dump(data: bytes, filename: str):
if not compress:
with open(filename, 'wb') as fh:
fh.write(data)
else:
if compress == 'zip':
assert isinstance(archive_handle, zipfile.ZipFile)
archive_handle.writestr(filename, data)
else:
assert isinstance(archive_handle, tarfile.TarFile)
with io.BytesIO(data) as buf:
tar_info = tarfile.TarInfo(name=filename)
tar_info.size = len(data)
archive_handle.addfile(tar_info, buf)
structures = {k: functor(v) for k, v in structures.items()}
for rank, structure in structures.items():
filename = f'{rank}.{format}'
data = dumps_structure(structure, format, writer=writer) # capture the output from the {writer} backend
write_structure_dump(data, filename)
if compress:
        # If we dumped everything into an archive, we have to close it at the end
archive_handle.close()
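# Usage sketch, assuming ``results`` maps ranks to Structure objects; this would
# produce a "sqs.result.tar.gz" archive containing one cif file per structure:
#
#   export_structures(results, format='cif', compress='gz', writer='ase')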