Skip to content

io

ascii

Helpers for loading numeric data from ASCII files.

extract_data_paths_from_dir(dir_path, file_pattern='*')

List data files in a directory and return their sorted paths.

Hidden files (names starting with '.' or '__') are excluded. The returned paths are sorted lexicographically by file name.

Parameters:

Name Type Description Default
dir_path str | Path

Path to the directory containing data files.

required
file_pattern str

Glob pattern to filter files (e.g. '*.dat', '*.xye').

'*'

Returns:

Type Description
list[str]

Sorted absolute paths to the matching data files.

Raises:

Type Description
FileNotFoundError

If dir_path does not exist or is not a directory.

ValueError

If no matching data files are found.

Source code in src/easydiffraction/io/ascii.py
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
def extract_data_paths_from_dir(
    dir_path: str | Path,
    file_pattern: str = '*',
) -> list[str]:
    """
    List data files in a directory and return their sorted paths.

    Hidden files (names starting with ``'.'`` or ``'__'``) are excluded.
    The returned paths are sorted lexicographically by file name.

    Parameters
    ----------
    dir_path : str | Path
        Path to the directory containing data files.
    file_pattern : str, default='*'
        Glob pattern to filter files (e.g. ``'*.dat'``, ``'*.xye'``).

    Returns
    -------
    list[str]
        Sorted absolute paths to the matching data files.

    Raises
    ------
    FileNotFoundError
        If *dir_path* does not exist or is not a directory.
    ValueError
        If no matching data files are found.
    """
    dir_path = Path(dir_path)
    if not dir_path.is_dir():
        raise FileNotFoundError(f'Directory not found: {dir_path}')

    paths = sorted(
        str(p)
        for p in dir_path.glob(file_pattern)
        if p.is_file() and not p.name.startswith('.') and not p.name.startswith('__')
    )

    if not paths:
        raise ValueError(f"No files matching '{file_pattern}' found in directory: {dir_path}")

    return paths

extract_data_paths_from_zip(zip_path)

Extract all files from a ZIP archive and return their paths.

Files are extracted into a temporary directory that persists for the lifetime of the process. The returned paths are sorted lexicographically by file name so that numbered data files (e.g. scan_001.dat, scan_002.dat) appear in natural order. Hidden files and directories (names starting with '.' or '__') are excluded.

Parameters:

Name Type Description Default
zip_path str | Path

Path to the ZIP archive.

required

Returns:

Type Description
list[str]

Sorted absolute paths to the extracted data files.

Raises:

Type Description
FileNotFoundError

If zip_path does not exist.

ValueError

If the archive contains no usable data files.

Source code in src/easydiffraction/io/ascii.py
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
def extract_data_paths_from_zip(zip_path: str | Path) -> list[str]:
    """
    Extract all files from a ZIP archive and return their paths.

    Files are extracted into a temporary directory that persists for the
    lifetime of the process.  The returned paths are sorted
    lexicographically by file name so that numbered data files (e.g.
    ``scan_001.dat``, ``scan_002.dat``) appear in natural order. Hidden
    files and directories (names starting with ``'.'`` or ``'__'``) are
    excluded.

    Parameters
    ----------
    zip_path : str | Path
        Path to the ZIP archive.

    Returns
    -------
    list[str]
        Sorted absolute paths to the extracted data files.

    Raises
    ------
    FileNotFoundError
        If *zip_path* does not exist.
    ValueError
        If the archive contains no usable data files.
    """
    zip_path = Path(zip_path)
    if not zip_path.exists():
        raise FileNotFoundError(f'ZIP file not found: {zip_path}')

    # TODO: Unify mkdir with other uses in the code
    extract_dir = Path(tempfile.mkdtemp(prefix='ed_zip_'))

    with zipfile.ZipFile(zip_path, 'r') as zf:
        zf.extractall(extract_dir)

    paths = sorted(
        str(p)
        for p in extract_dir.rglob('*')
        if p.is_file() and not p.name.startswith('.') and not p.name.startswith('__')
    )

    if not paths:
        raise ValueError(f'No data files found in ZIP archive: {zip_path}')

    return paths

extract_metadata(file_path, pattern)

Extract a single numeric value from a file using a regex pattern.

The entire file content is searched (not just the header). The first match is used. The regex must contain exactly one capture group whose match is convertible to float.

Parameters:

Name Type Description Default
file_path str | Path

Path to the input file.

required
pattern str

Regex with one capture group that matches the numeric value.

required

Returns:

Type Description
float | None

The extracted value, or None if the pattern did not match or the captured text could not be converted to float.

Source code in src/easydiffraction/io/ascii.py
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
def extract_metadata(
    file_path: str | Path,
    pattern: str,
) -> float | None:
    """
    Extract a single numeric value from a file using a regex pattern.

    The entire file content is searched (not just the header).  The
    **first** match is used.  The regex must contain exactly one capture
    group whose match is convertible to ``float``.

    Parameters
    ----------
    file_path : str | Path
        Path to the input file.
    pattern : str
        Regex with one capture group that matches the numeric value.

    Returns
    -------
    float | None
        The extracted value, or ``None`` if the pattern did not match or
        the captured text could not be converted to float.
    """
    import re

    content = Path(file_path).read_text(encoding='utf-8', errors='ignore')
    match = re.search(pattern, content, re.MULTILINE)
    if match is None:
        return None
    try:
        return float(match.group(1))
    except (ValueError, IndexError):
        return None

load_numeric_block(data_path)

Load a numeric block from an ASCII file, skipping header lines.

Read the file and try numpy.loadtxt starting from the first line, then the second, etc., until the load succeeds. This allows files with an arbitrary number of non-numeric header lines to be parsed without prior knowledge of the format.

Parameters:

Name Type Description Default
data_path str | Path

Path to the ASCII data file.

required

Returns:

Type Description
ndarray

2-D array of the parsed numeric data.

Raises:

Type Description
IOError

If no contiguous numeric block can be found in the file.

Source code in src/easydiffraction/io/ascii.py
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
def load_numeric_block(data_path: str | Path) -> np.ndarray:
    """
    Load a numeric block from an ASCII file, skipping header lines.

    Read the file and try ``numpy.loadtxt`` starting from the first
    line, then the second, etc., until the load succeeds.  This allows
    files with an arbitrary number of non-numeric header lines to be
    parsed without prior knowledge of the format.

    Parameters
    ----------
    data_path : str | Path
        Path to the ASCII data file.

    Returns
    -------
    np.ndarray
        2-D array of the parsed numeric data.

    Raises
    ------
    IOError
        If no contiguous numeric block can be found in the file.
    """
    data_path = Path(data_path)
    lines = data_path.read_text().splitlines()

    last_error: Exception | None = None
    for start in range(len(lines)):
        try:
            return np.loadtxt(StringIO('\n'.join(lines[start:])))
        except Exception as e:  # noqa: BLE001
            last_error = e

    raise IOError(
        f'Failed to read numeric data from {data_path}: {last_error}',
    ) from last_error

cif

handler

Minimal CIF tag handler used by descriptors/parameters.

CifHandler

Canonical CIF handler used by descriptors/parameters.

Holds CIF tags (names) and attaches to an owning descriptor so it can derive a stable uid if needed.

Source code in src/easydiffraction/io/cif/handler.py
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
class CifHandler:
    """
    Canonical CIF handler used by descriptors/parameters.

    Holds CIF tags (names) and attaches to an owning descriptor so it
    can derive a stable uid if needed.
    """

    def __init__(self, *, names: list[str]) -> None:
        self._names = names
        self._owner = None  # set by attach

    def attach(self, owner: object) -> None:
        """Attach to a descriptor or parameter instance."""
        self._owner = owner

    @property
    def names(self) -> list[str]:
        """List of CIF tag names associated with the owner."""
        return self._names

    @property
    def uid(self) -> str | None:
        """Unique identifier taken from the owner, if attached."""
        if self._owner is None:
            return None
        return self._owner.unique_name
attach(owner)

Attach to a descriptor or parameter instance.

Source code in src/easydiffraction/io/cif/handler.py
20
21
22
def attach(self, owner: object) -> None:
    """
    Remember *owner* as the object this handler belongs to.

    The reference is stored on ``self._owner``, replacing any previous
    value.
    """
    self._owner = owner
names property

List of CIF tag names associated with the owner.

uid property

Unique identifier taken from the owner, if attached.

parse

document_from_path(path)

Read a CIF document from a file path.

Source code in src/easydiffraction/io/cif/parse.py
7
8
9
def document_from_path(path: str) -> gemmi.cif.Document:
    """
    Parse the CIF file at *path* into a gemmi document.

    Thin wrapper around ``gemmi.cif.read_file``.
    """
    document = gemmi.cif.read_file(path)
    return document

document_from_string(text)

Read a CIF document from a raw text string.

Source code in src/easydiffraction/io/cif/parse.py
12
13
14
def document_from_string(text: str) -> gemmi.cif.Document:
    """
    Parse raw CIF *text* into a gemmi document.

    Thin wrapper around ``gemmi.cif.read_string``.
    """
    document = gemmi.cif.read_string(text)
    return document

name_from_block(block)

Extract a model name from the CIF block name.

Source code in src/easydiffraction/io/cif/parse.py
22
23
24
25
def name_from_block(block: gemmi.cif.Block) -> str:
    """
    Return the name of a CIF block, to be used as the model name.

    The name is passed through unchanged.
    """
    # TODO: Need validator or normalization?
    model_name = block.name
    return model_name

pick_sole_block(doc)

Pick the sole data block from a CIF document.

Source code in src/easydiffraction/io/cif/parse.py
17
18
19
def pick_sole_block(doc: gemmi.cif.Document) -> gemmi.cif.Block:
    """
    Return the single data block of a CIF document.

    Delegates to ``doc.sole_block()``.
    """
    only_block = doc.sole_block()
    return only_block

serialize

analysis_to_cif(analysis)

Render analysis metadata, aliases, and constraints to CIF.

Source code in src/easydiffraction/io/cif/serialize.py
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
def analysis_to_cif(analysis: object) -> str:
    """
    Render the analysis section as CIF text.

    The output contains, in order: the fitting engine line, the fit
    mode, the aliases and the constraints, each separated by a blank
    line. The joint-fit experiments section is appended (after another
    blank line) only when its CIF text is non-empty.
    """
    minimizer_text = format_value(analysis.current_minimizer)
    sections: list[str] = [
        f'_analysis.fitting_engine  {minimizer_text}',
        analysis.fit_mode.as_cif,
        '',
        analysis.aliases.as_cif,
        '',
        analysis.constraints.as_cif,
    ]

    joint_fit_cif = analysis.joint_fit_experiments.as_cif
    if joint_fit_cif:
        sections.extend(['', joint_fit_cif])

    return '\n'.join(sections)

category_collection_from_cif(self, block)

Populate a CategoryCollection from a CIF loop.

Parameters:

Name Type Description Default
self CategoryCollection

The collection instance to populate.

required
block Block

Parsed CIF block to read the loop from.

required

Raises:

Type Description
ValueError

If the collection has no _item_type defined.

Source code in src/easydiffraction/io/cif/serialize.py
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
def category_collection_from_cif(
    self: CategoryCollection,
    block: gemmi.cif.Block,
) -> None:
    """
    Populate a CategoryCollection from a CIF loop.

    The loop is located by probing the CIF names of the parameters of a
    freshly created item. When a loop is found, ``self._items`` is
    replaced by one new default item per loop row, and each item's
    parameters are filled from the loop columns whose tags match one of
    the parameter's CIF names. When no loop is found, a debug message
    is logged and the collection is left as it was.

    Parameters
    ----------
    self : CategoryCollection
        The collection instance to populate.
    block : gemmi.cif.Block
        Parsed CIF block to read the loop from.

    Raises
    ------
    ValueError
        If the collection has no ``_item_type`` defined.
    """
    # TODO: Find a better way and then remove TODO in the AtomSite
    #  class
    # TODO: Rename to _item_cls?
    if self._item_type is None:
        raise ValueError('Child class is not defined.')

    # Create a temporary instance to access its parameters and
    # parameter CIF names
    category_item = self._item_type()

    # Iterate over category parameters and their possible CIF names
    # trying to find the whole loop it belongs to inside the CIF block
    def _get_loop(block: object, category_item: object) -> object | None:
        """Return the first loop found via any parameter CIF name."""
        for param in category_item.parameters:
            for name in param._cif_handler.names:
                loop = block.find_loop(name).get_loop()
                if loop is not None:
                    return loop
        return None

    loop = _get_loop(block, category_item)

    # If no loop found, keep the current items and stop here
    if loop is None:
        log.debug(f'No loop found for category {self}.')
        return

    # Get 2D array of loop values (as strings), one row per loop row
    # and one column per loop tag
    num_rows = loop.length()
    num_cols = loop.width()
    array = np.array(loop.values, dtype=str).reshape(num_rows, num_cols)

    # Pre-create default items in the collection (this discards any
    # items the collection held before)
    self._items = [self._item_type() for _ in range(num_rows)]

    # Set parent for each item to enable identity resolution.
    # object.__setattr__ is called directly, so any __setattr__ defined
    # on the item class is not invoked for this assignment.
    for item in self._items:
        object.__setattr__(item, '_parent', self)

    # Set those items' parameters, which are present in the loop
    for row_idx in range(num_rows):
        current_item = self._items[row_idx]
        for param in current_item.parameters:
            # Try the parameter's CIF names in order; the first one
            # that is a tag of the loop is used (see ``break`` below)
            for cif_name in param._cif_handler.names:
                if cif_name in loop.tags:
                    col_idx = loop.tags.index(cif_name)

                    # TODO: The following is duplication of
                    #  param_from_cif
                    raw = array[row_idx][col_idx]

                    # If numeric, parse with uncertainty if present
                    if param._value_type == DataTypes.NUMERIC:
                        u = str_to_ufloat(raw)
                        param.value = u.n
                        if not np.isnan(u.s) and hasattr(param, 'uncertainty'):
                            param.uncertainty = u.s  # type: ignore[attr-defined]
                            param.free = True  # Mark as free if uncertainty is present

                    # If string, strip one pair of matching surrounding
                    # quotes (single or double) if present
                    # TODO: Make a helper function for this
                    elif param._value_type == DataTypes.STRING:
                        if len(raw) >= 2 and raw[0] == raw[-1] and raw[0] in {"'", '"'}:
                            param.value = raw[1:-1]
                        else:
                            param.value = raw

                    # Other types are not supported; the parameter
                    # keeps its current value
                    else:
                        log.debug(f'Unrecognized type: {param._value_type}')

                    # A matching tag was handled: skip remaining names
                    break

category_collection_to_cif(collection, max_display=20)

Render a CategoryCollection-like object to CIF text.

Uses the first item to build the loop header, then emits one row for each item.

Source code in src/easydiffraction/io/cif/serialize.py
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
def category_collection_to_cif(
    collection: object,
    max_display: Optional[int] = 20,
) -> str:
    """
    Render a CategoryCollection-like object to CIF text.

    Uses the first item to build the loop header (the first CIF name of
    each of its parameters), then emits one row per item.

    Parameters
    ----------
    collection : object
        Object supporting ``len()`` and ``values()``; each value must
        expose ``parameters`` whose elements have ``_cif_handler.names``
        and ``value``.
    max_display : Optional[int], default=20
        Maximum number of rows to emit. When the collection holds more
        items, only the first and last ``max_display // 2`` rows are
        written, separated by a ``'...'`` line. ``None`` disables the
        limit.

    Returns
    -------
    str
        CIF loop text, or an empty string for an empty collection.
    """
    if not len(collection):
        return ''

    # Materialise the items once instead of rebuilding the list for
    # every emitted row.
    items = list(collection.values())

    def _format_row(item: object) -> str:
        """Join the formatted parameter values of one item."""
        return ' '.join(format_value(p.value) for p in item.parameters)

    # Header
    lines: list[str] = ['loop_']
    for p in items[0].parameters:
        tags = p._cif_handler.names  # type: ignore[attr-defined]
        lines.append(tags[0])

    # Rows
    # Limit number of displayed rows if requested; ``None`` means that
    # no limit applies.
    if max_display is not None and len(collection) > max_display:
        half_display = max(max_display // 2, 0)
        lines.extend(_format_row(item) for item in items[:half_display])
        lines.append('...')
        # Explicit start index: ``items[-0:]`` would select every item
        # when ``half_display`` is zero.
        tail_start = len(items) - half_display
        lines.extend(_format_row(item) for item in items[tail_start:])
    # No limit
    else:
        lines.extend(_format_row(item) for item in items)

    return '\n'.join(lines)

category_item_from_cif(self, block, idx=0)

Populate each parameter from CIF block at given loop index.

Source code in src/easydiffraction/io/cif/serialize.py
292
293
294
295
296
297
298
299
def category_item_from_cif(
    self: CategoryItem,
    block: gemmi.cif.Block,
    idx: int = 0,
) -> None:
    """
    Fill every parameter of the item from a CIF block.

    Each parameter's ``from_cif`` is called with *block* and the row
    index *idx*.
    """
    for descriptor in self.parameters:
        descriptor.from_cif(block, idx=idx)

category_item_to_cif(item)

Render a CategoryItem-like object to CIF text.

Expects item.parameters iterable of params with _cif_handler.names and value.

Source code in src/easydiffraction/io/cif/serialize.py
74
75
76
77
78
79
80
81
82
83
84
def category_item_to_cif(item: object) -> str:
    """
    Render a CategoryItem-like object to CIF text.

    Every element of ``item.parameters`` is rendered with
    ``param_to_cif`` and the resulting lines are joined with newlines.
    """
    rendered = [param_to_cif(parameter) for parameter in item.parameters]
    return '\n'.join(rendered)

datablock_collection_to_cif(collection)

Render a collection of datablocks by joining their CIF blocks.

Source code in src/easydiffraction/io/cif/serialize.py
156
157
158
def datablock_collection_to_cif(collection: object) -> str:
    """
    Render a collection of datablocks to CIF text.

    The ``as_cif`` text of each value in the collection is joined with
    one blank line between consecutive blocks.
    """
    separator = '\n\n'
    return separator.join(datablock.as_cif for datablock in collection.values())

datablock_item_to_cif(datablock)

Render a DatablockItem-like object to CIF text.

Emits a data_ header and then concatenates category CIF sections.

Source code in src/easydiffraction/io/cif/serialize.py
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
def datablock_item_to_cif(datablock: object) -> str:
    """
    Render a DatablockItem-like object to CIF text.

    The output starts with a ``data_<entry name>`` header, followed by
    the CIF text of every ``CategoryItem`` attribute and then of every
    ``CategoryCollection`` attribute of the datablock, all separated by
    blank lines. Attributes are visited in ``vars(datablock)`` order.
    """
    # Imported here rather than at module level to avoid import-time
    # cycles
    from easydiffraction.core.category import CategoryCollection
    from easydiffraction.core.category import CategoryItem

    sections: list[str] = [f'data_{datablock._identity.datablock_entry_name}']

    # Single categories first, loop-style collections afterwards
    for category_type in (CategoryItem, CategoryCollection):
        for attribute in vars(datablock).values():
            if isinstance(attribute, category_type):
                sections.append(attribute.as_cif)

    return '\n\n'.join(sections)

experiment_to_cif(experiment)

Render an experiment: datablock part plus measured data.

Source code in src/easydiffraction/io/cif/serialize.py
204
205
206
def experiment_to_cif(experiment: object) -> str:
    """
    Render an experiment to CIF text.

    The experiment is rendered exactly like any other datablock, by
    delegating to ``datablock_item_to_cif``.
    """
    cif_text = datablock_item_to_cif(experiment)
    return cif_text

format_value(value)

Format a single CIF value for output.

Note: The precision must be high enough that the minimizer's finite-difference Jacobian probes (typically ~1e-8 relative) survive the float→string→float round-trip through CIF.

Source code in src/easydiffraction/io/cif/serialize.py
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
def format_value(value: object) -> str:
    """
    Format a single CIF value for output.

    Integers are converted to floats; floats are written right-aligned
    in fixed-point notation. Strings are right-aligned as well and are
    wrapped in double quotes first when they contain a space or a tab.
    Any other value is rendered with ``str()``.

    .. note::
        The precision must be high enough so that the minimizer's
        finite-difference Jacobian probes (typically ~1e-8 relative)
        survive the float→string→float round-trip through CIF.
    """
    width, precision = 12, 8

    # Text: quote when it contains whitespace, then right-align
    if isinstance(value, str):
        has_whitespace = ' ' in value or '\t' in value
        text = f'"{value}"' if has_whitespace else value
        return f'{text:>{width}s}'

    # Numbers: integers are promoted so that they share the float layout
    if isinstance(value, int):
        value = float(value)
    if isinstance(value, float):
        return f'{value:>{width}.{precision}f}'

    # Everything else: fallback
    return str(value)

param_from_cif(self, block, idx=0)

Populate a single descriptor from a CIF block.

Parameters:

Name Type Description Default
self GenericDescriptorBase

The descriptor instance to populate.

required
block Block

Parsed CIF block to read values from.

required
idx int

Row index used when the tag belongs to a loop.

0
Source code in src/easydiffraction/io/cif/serialize.py
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
def param_from_cif(
    self: GenericDescriptorBase,
    block: gemmi.cif.Block,
    idx: int = 0,
) -> None:
    """
    Populate a single descriptor from a CIF block.

    The descriptor's CIF names are tried in order and the first one for
    which the block yields values is used. If none yields values, the
    descriptor is left untouched.

    Parameters
    ----------
    self : GenericDescriptorBase
        The descriptor instance to populate.
    block : gemmi.cif.Block
        Parsed CIF block to read values from.
    idx : int, default=0
        Row index used when the tag belongs to a loop.
    """
    # Names are listed in order of preference: stop at the first tag
    # that yields any value.
    for tag in self._cif_handler.names:
        matches = list(block.find_values(tag))
        if matches:
            break
    else:
        # Nothing found under any name: keep the default value.
        return

    raw = matches[idx]

    # Numeric: parse the value together with an optional uncertainty
    if self._value_type == DataTypes.NUMERIC:
        parsed = str_to_ufloat(raw)
        self.value = parsed.n
        if not np.isnan(parsed.s) and hasattr(self, 'uncertainty'):
            self.uncertainty = parsed.s  # type: ignore[attr-defined]
            self.free = True  # an uncertainty marks the parameter as free
        return

    # String: drop one pair of matching surrounding quotes, if any
    if self._value_type == DataTypes.STRING:
        is_quoted = len(raw) >= 2 and raw[0] == raw[-1] and raw[0] in {"'", '"'}
        self.value = raw[1:-1] if is_quoted else raw
        return

    # Other types are not supported
    log.debug(f'Unrecognized type: {self._value_type}')

param_to_cif(param)

Render a single descriptor/parameter to a CIF line.

Expects param to expose _cif_handler.names and value.

Source code in src/easydiffraction/io/cif/serialize.py
63
64
65
66
67
68
69
70
71
def param_to_cif(param: object) -> str:
    """
    Render a single descriptor/parameter to a CIF line.

    The line consists of the first name in ``param._cif_handler.names``
    followed by a space and the formatted ``param.value``.
    """
    primary_tag: str = param._cif_handler.names[0]  # type: ignore[attr-defined]
    formatted = format_value(param.value)
    return f'{primary_tag} {formatted}'

project_info_to_cif(info)

Render ProjectInfo to CIF text (id, title, description).

Source code in src/easydiffraction/io/cif/serialize.py
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
def project_info_to_cif(info: object) -> str:
    """
    Render ProjectInfo to CIF text.

    Emits the project id, title, description, creation time and
    last-modification time, one tag per line. A title or description
    containing a space is wrapped in single quotes. A description that
    is longer than 60 characters, or that contains a line break, is
    written as a semicolon-delimited text field instead, because a
    quoted CIF string cannot span several lines.

    Parameters
    ----------
    info : object
        Object exposing ``name``, ``title``, ``description`` and the
        datetime attributes ``_created`` and ``_last_modified``.

    Returns
    -------
    str
        The ``_project.*`` lines joined with newlines.
    """
    name = f'{info.name}'

    title = f'{info.title}'
    if ' ' in title:
        title = f"'{title}'"

    description = f'{info.description}'
    spans_lines = '\n' in description or '\r' in description
    if len(info.description) > 60 or spans_lines:
        # Long or multi-line text goes into a semicolon text field
        description = f'\n;\n{info.description}\n;'
    elif ' ' in description:
        description = f"'{description}'"

    created = f"'{info._created.strftime('%d %b %Y %H:%M:%S')}'"
    last_modified = f"'{info._last_modified.strftime('%d %b %Y %H:%M:%S')}'"

    return (
        f'_project.id               {name}\n'
        f'_project.title            {title}\n'
        f'_project.description      {description}\n'
        f'_project.created          {created}\n'
        f'_project.last_modified    {last_modified}'
    )

project_to_cif(project)

Render a whole project by concatenating sections when present.

Source code in src/easydiffraction/io/cif/serialize.py
188
189
190
191
192
193
194
195
196
197
198
199
200
201
def project_to_cif(project: object) -> str:
    """
    Render a whole project to CIF text.

    Sections are emitted in the order info, structures, experiments,
    analysis, summary. The info section is included whenever the
    attribute exists; the others only when the attribute is present
    and truthy. Empty section texts are dropped and the rest is joined
    with blank lines.
    """
    sections: list[str] = []

    if hasattr(project, 'info'):
        sections.append(project.info.as_cif)

    # These sections expose their CIF text as an attribute
    for attr_name in ('structures', 'experiments'):
        if getattr(project, attr_name, None):
            sections.append(getattr(project, attr_name).as_cif)

    # These sections expose their CIF text through a method call
    for attr_name in ('analysis', 'summary'):
        if getattr(project, attr_name, None):
            sections.append(getattr(project, attr_name).as_cif())

    return '\n\n'.join(filter(None, sections))

summary_to_cif(_summary)

Render a summary CIF block (placeholder for now).

Source code in src/easydiffraction/io/cif/serialize.py
226
227
228
def summary_to_cif(_summary: object) -> str:
    """
    Render a summary CIF block.

    Placeholder for now: the argument is ignored and a fixed text is
    returned.
    """
    placeholder = 'To be added...'
    return placeholder