Skip to content

util

Utility functions and classes used by other modules in CorpusTools.

ArgumentError

Bases: Exception

This exception is raised when argument errors occur.

Source code in corpustools/util.py
52
53
class ArgumentError(Exception):
    """Raised when an argument error occurs."""

ConversionError

Bases: Exception

Raise this exception when conversion errors occur.

Source code in corpustools/util.py
56
57
class ConversionError(Exception):
    """Raised when a conversion error occurs."""

ExecutableMissingError

Bases: Exception

This exception is raised when wanted executables are missing.

Source code in corpustools/util.py
48
49
class ExecutableMissingError(Exception):
    """Raised when a wanted executable is missing."""

ExternalCommandRunner

Class to run external command through subprocess.

Attributes:

Name Type Description
stdout

save the stdout of the command here.

stderr

save the stderr of the command here.

returncode

save the returncode of the command here.

Source code in corpustools/util.py
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
class ExternalCommandRunner:
    """Run an external command through subprocess and keep its results.

    Attributes:
        stdout: what the command wrote to its standard output.
        stderr: what the command wrote to its standard error.
        returncode: the exit status of the command.
    """

    def __init__(self):
        """Start out with no recorded results."""
        self.stdout = self.stderr = self.returncode = None

    def run(self, command, cwd=None, to_stdin=None):
        """Execute command and record its output and exit status.

        Args:
            command: the command in a form accepted by subprocess.Popen.
            cwd: working directory handed to subprocess.Popen.
            to_stdin: data sent to the standard input of the command.

        Raises:
            ExecutableMissingError: if starting the command raises OSError.
        """
        pipe = subprocess.PIPE
        try:
            process = subprocess.Popen(
                command, stdin=pipe, stdout=pipe, stderr=pipe, cwd=cwd
            )
        except OSError:
            # "from None" hides the OSError so only the install hint is shown.
            raise ExecutableMissingError(
                f"Please install {command[0]}, can not continue without it."
            ) from None

        captured_out, captured_err = process.communicate(to_stdin)
        self.stdout = captured_out
        self.stderr = captured_err
        self.returncode = process.returncode

__init__()

Initialise the ExternalCommandRunner class.

Source code in corpustools/util.py
312
313
314
315
316
def __init__(self):
    """Start out with no recorded stdout, stderr or returncode."""
    self.stdout = self.stderr = self.returncode = None

run(command, cwd=None, to_stdin=None)

Run the command, save the result.

Source code in corpustools/util.py
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
def run(self, command, cwd=None, to_stdin=None):
    """Execute command and record its output and exit status on self.

    Args:
        command: the command in a form accepted by subprocess.Popen.
        cwd: working directory handed to subprocess.Popen.
        to_stdin: data sent to the standard input of the command.

    Raises:
        ExecutableMissingError: if starting the command raises OSError.
    """
    pipe = subprocess.PIPE
    try:
        process = subprocess.Popen(
            command, stdin=pipe, stdout=pipe, stderr=pipe, cwd=cwd
        )
    except OSError:
        # "from None" hides the OSError so only the install hint is shown.
        raise ExecutableMissingError(
            f"Please install {command[0]}, can not continue without it."
        ) from None

    captured_out, captured_err = process.communicate(to_stdin)
    self.stdout = captured_out
    self.stderr = captured_err
    self.returncode = process.returncode

SetupError

Bases: Exception

This exception is raised when setup is faulty.

Source code in corpustools/util.py
44
45
class SetupError(Exception):
    """Raised when the setup is faulty."""

basename_noext(fname, ext)

Get the basename without the extension.

Parameters:

Name Type Description Default
fname str

path to the file.

required
ext str

the extension that should be removed.

required

Returns:

Type Description
str

fname without the extension.

Source code in corpustools/util.py
73
74
75
76
77
78
79
80
81
82
83
def basename_noext(fname, ext):
    """Get the basename without the extension.

    The last ``len(ext)`` characters of the basename are cut off; no check
    is made that the basename actually ends with ``ext``.

    Args:
        fname (str): path to the file.
        ext (str): the extension that should be removed. An empty string
            leaves the basename untouched.

    Returns:
        (str): the basename of fname without the extension.
    """
    basename = os.path.basename(fname)
    # Guard the empty extension: ``basename[:-0]`` equals ``basename[:0]``,
    # which is the empty string rather than the unchanged name.
    if not ext:
        return basename
    return basename[: -len(ext)]

executable_in_path(program)

Check if program is in path.

Parameters:

Name Type Description Default
program str

name of the program

required

Returns:

Type Description
bool

True if program is found, False otherwise.

Source code in corpustools/util.py
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
def executable_in_path(program):
    """Tell whether program can be found as an executable.

    Args:
        program (str): name of the program, with or without a directory part.

    Returns:
        (bool): True if program is found, False otherwise.
    """
    # A name with a directory part is checked as given, without using $PATH.
    if os.path.dirname(program):
        return is_executable(program)

    return any(map(is_executable, path_possibilities(program)))

get_lang_resource(lang, resource, fallback=None)

Get a language resource.

Parameters:

Name Type Description Default
lang str

the language of the resource.

required
resource str

the resource that is needed.

required
fallback str or None

the fallback resource. Default is None.

None

Returns:

Type Description
str

path to the resource or fallback.

Source code in corpustools/util.py
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
def get_lang_resource(lang, resource, fallback=None):
    """Look up a language resource below $GTHOME/langs.

    Args:
        lang (str): the language of the resource.
        resource (str): the resource that is needed, relative to the
            directory of the language.
        fallback (str or None): returned when the resource does not exist.
            Default is None.

    Returns:
        (str): path to the resource if it exists, otherwise fallback.
    """
    candidate = os.path.join(os.environ["GTHOME"], "langs", lang, resource)
    return candidate if os.path.exists(candidate) else fallback

get_preprocess_command(lang)

Get the complete preprocess command for lang.

Parameters:

Name Type Description Default
lang str

the language

required

Returns:

Type Description
list[str]

the complete preprocess command.

Source code in corpustools/util.py
196
197
198
199
200
201
202
203
204
205
206
207
208
209
def get_preprocess_command(lang):
    """Build the complete preprocess command for lang.

    Args:
        lang (str): the language

    Returns:
        (list[str]): the preprocess script followed by its --abbr option.
    """
    abbr_resource = "tools/preprocess/abbr.txt"
    script = os.path.join(os.environ["GTHOME"], "gt/script/preprocess")
    sanity_check([script])

    # The abbreviation file of sme is used when lang has none of its own.
    abbr = get_lang_resource(
        lang, abbr_resource, get_lang_resource("sme", abbr_resource)
    )
    return [script, f"--abbr={abbr}"]

human_readable_filesize(num, suffix='B')

Returns human readable filesize

Source code in corpustools/util.py
337
338
339
340
341
342
343
344
def human_readable_filesize(num, suffix="B"):
    """Format num as a size with a binary prefix (Ki, Mi, ...) and suffix."""
    # https://stackoverflow.com/questions/1094841/get-human-readable-version-of-file-size
    prefixes = ("", "Ki", "Mi", "Gi", "Ti", "Pi", "Ei", "Zi")
    size = num
    for prefix in prefixes:
        if abs(size) < 1024.0:
            return f"{size:3.1f}{prefix}{suffix}"
        size = size / 1024.0
    # Anything still too large after the Zi step is reported in Yi.
    return f"{size:.1f}Yi{suffix}"

ignored(*exceptions)

Ignore exceptions.

Source code in corpustools/util.py
294
295
296
297
298
299
300
@contextmanager
def ignored(*exceptions):
    """Context manager that swallows the given exception types."""
    try:
        yield
    except exceptions:
        # Deliberately drop the exception; anything else propagates.
        return

is_executable(fullpath)

Check if the program in fullpath is executable.

Parameters:

Name Type Description Default
fullpath str

the path to the program or script.

required

Returns:

Type Description
bool

True if fullpath contains an executable, False otherwise.

Source code in corpustools/util.py
115
116
117
118
119
120
121
122
123
124
def is_executable(fullpath):
    """Tell whether fullpath is a file that may be executed.

    Args:
        fullpath (str): the path to the program or script.

    Returns:
        (bool): True if fullpath is an executable file, False otherwise.
    """
    if not os.path.isfile(fullpath):
        return False
    return os.access(fullpath, os.X_OK)

lang_resource_dirs(lang)

Get the path to the language resources.

Parameters:

Name Type Description Default
lang str

the language that modes is asked to serve.

required

Returns:

Type Description
list[Path]

The share/giella/&lt;lang&gt; directories below ~/.local, /usr/local and /usr.

Source code in corpustools/util.py
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
def lang_resource_dirs(lang: str) -> list[Path]:
    """Get the directories that may hold the resources of a language.

    Args:
        lang: the language whose resource directories are wanted.

    Returns:
        The ``share/giella/<lang>`` directory below each of the prefixes
        ``~/.local``, ``/usr/local`` and ``/usr``, in that order. The
        directories are not checked for existence.
    """
    # Path.home() is a classmethod; no throwaway Path() instance is needed.
    prefixes = (Path.home() / ".local", Path("/usr/local"), Path("/usr"))
    return [prefix / "share" / "giella" / lang for prefix in prefixes]

lineno()

Return the current line number in our program.

Source code in corpustools/util.py
212
213
214
def lineno():
    """Return the line number from which this function was called."""
    caller_frame = inspect.currentframe().f_back
    return caller_frame.f_lineno

make_digest(bytestring)

Make a md5 hash to identify possible dupes.

Source code in corpustools/util.py
461
462
463
464
465
def make_digest(bytestring: bytes) -> str:
    """Return the hexadecimal md5 digest of bytestring, used to spot dupes."""
    return hashlib.md5(bytestring).hexdigest()

name_to_unicode(filename)

Turn a filename to a unicode string.

Parameters:

Name Type Description Default
filename str

name of the file

required

Returns:

Type Description
str

A unicode string.

Source code in corpustools/util.py
270
271
272
273
274
275
276
277
278
279
280
281
282
def name_to_unicode(filename):
    """Turn a filename to a unicode string.

    Args:
        filename (str or bytes): name of the file

    Returns:
        (str): A unicode string. On Windows the filename is returned
            exactly as it was given.
    """
    # A str has no decode() on Python 3, so only utf-8 encoded bytes are
    # decoded; text that already is a str is passed through untouched.
    if platform.system() == "Windows" or isinstance(filename, str):
        return filename

    return filename.decode("utf-8")

note(msg)

Print msg to stderr.

Parameters:

Name Type Description Default
msg str

the message

required
Source code in corpustools/util.py
285
286
287
288
289
290
291
def note(msg):
    """Write msg, followed by a newline, to standard error.

    Args:
        msg (str): the message
    """
    error_stream = sys.stderr
    print(msg, file=error_stream)

path_possibilities(program)

Generate the candidate full paths to program, one for each directory in $PATH.

Parameters:

Name Type Description Default
program str

name of program or script.

required

Yields:

Type Description
str

possible fullpath to the program

Source code in corpustools/util.py
127
128
129
130
131
132
133
134
135
136
137
138
139
def path_possibilities(program):
    """Generate the candidate locations of program, one per $PATH entry.

    Args:
        program (str): name of program or script.

    Yields:
        (str): possible fullpath to the program
    """
    search_dirs = os.environ["PATH"].split(os.pathsep)
    # Entries may be wrapped in double quotes; those are stripped off.
    return (
        os.path.join(directory.strip('"'), program) for directory in search_dirs
    )

print_element(element, level, indent)

Format an html document.

This function formats html documents for readability, to see the structure of the given document. It ruins white space in text parts.

Parameters:

Name Type Description Default
element Element

the element to format.

required
level int

indicate at what level this element is.

required
indent int

indicate how many spaces this element should be indented

required

Returns: formatted element as list of strings.

Source code in corpustools/util.py
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
def print_element(element: etree.Element, level: int, indent: int) -> list[str]:
    """Format an html document.

    This function formats html documents for readability, to see
    the structure of the given document. It ruins white space in
    text parts.

    Args:
        element: the element to format.
        level: indicate at what level this element is.
        indent: indicate how many spaces this element should be indented
            for each level.

    Returns:
        The formatted element, including all its descendants, as a list
        of strings. Joining the list gives the formatted markup.
    """
    # Drop the xhtml namespace so the tags read as plain html.
    tag = element.tag.replace("{http://www.w3.org/1999/xhtml}", "")
    own_indent = " " * (level * indent)

    strings: list[str] = [own_indent, f"<{tag}"]

    for name, value in element.attrib.items():
        strings.extend((" ", name, '="', value, '"'))
    strings.append(">\n")

    if element.text is not None and element.text.strip() != "":
        strings.append(" " * ((level + 1) * indent))
        strings.append(element.text.strip())
        strings.append("\n")

    # Each child returns its own list; it has to be collected here,
    # otherwise the descendants would be missing from the result.
    for child in element:
        strings.extend(print_element(child, level + 1, indent))

    strings.append(own_indent)
    strings.append(f"</{tag}>\n")

    if level > 0 and element.tail is not None and element.tail.strip() != "":
        # The tail is indented like the parent of this element, one space
        # per list entry.
        strings.extend(" " * ((level - 1) * indent))
        strings.append(element.tail.strip())
        strings.append("\n")

    return strings

print_frame(debug='', *args)

Print debug output.

Source code in corpustools/util.py
60
61
62
63
64
65
66
67
68
69
70
def print_frame(debug="", *args):
    """Print the line number and function of the caller, then debug and args."""
    # Index 0 of the stack is this function, index 1 is its caller.
    caller_record = inspect.stack()[1]
    info = inspect.getframeinfo(caller_record[0])

    # Every item is followed by one space; the line is then ended.
    print(info.lineno, info.function, debug, *args, file=sys.stderr, end=" ")
    print(file=sys.stderr)

replace_all(replacements, string)

Replace unwanted strings with wanted strings.

Parameters:

Name Type Description Default
replacements list of tuple

unwanted:wanted string pairs.

required
string str

the string where replacements should be done.

required

Returns:

Type Description
str

string with replaced strings.

Source code in corpustools/util.py
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
def replace_all(replacements, string):
    """Apply a series of replacements to a string, one after the other.

    Args:
        replacements (list of tuple): unwanted:wanted string pairs.
        string (str): the string where replacements should be done.

    Returns:
        (str): string with replaced strings.
    """
    result = string
    for pair in replacements:
        old, new = pair
        # Later pairs work on the outcome of the earlier ones.
        result = result.replace(old, new)

    return result

run_external_command(command, instring)

Run the command with input using subprocess.

Parameters:

Name Type Description Default
command list[str]

a subprocess compatible command.

required
instring str

the input to the command.

required

Returns:

Type Description
str

Analysed text.

Raises:

Type Description
UserWarning

if the command fails.

Source code in corpustools/util.py
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
def run_external_command(command: list[str], instring: str) -> str:
    """Feed instring to command through subprocess and return its output.

    Args:
        command: a subprocess compatible command.
        instring: the input to the command; it is sent utf8 encoded.

    Returns:
        The standard output of the command, decoded as utf8.

    Raises:
        UserWarning: if the command wrote anything to standard error.
    """
    runner = ExternalCommandRunner()
    encoded_input = instring.encode("utf8")
    runner.run(command, to_stdin=encoded_input)

    errors = runner.stderr
    if errors:
        command_line = " ".join(command)
        raise UserWarning(f"{command_line} failed:\n{errors}")

    return runner.stdout.decode("utf8")

run_in_parallel(function, max_workers, file_list, file_sizes, msg_format=_PARA_DEFAULT_MSG_FORMAT, *args, **kwargs)

Run function as many times as there are files in the file_list, in parallel. Each invocation gets one element of the file_list.

Conceptually, it's like function(file) for file in file_list, but in parallel. Uses a ProcessPoolExecutor with max_workers.

Any additional arguments (positional or keyword) given to run_in_parallel, will be passed along to the function.

Parameters:

Name Type Description Default
function Callable

The function to call. The first argument to the function is the file path.

required
max_workers int

How many worker processes to use

required
file_list list[str]

The list of files (full paths)

required
Source code in corpustools/util.py
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
def run_in_parallel(
    function: Callable[["CorpusPath"], None],
    max_workers: int,
    file_list: list["CorpusPath"],
    file_sizes: list[int],
    msg_format: str = _PARA_DEFAULT_MSG_FORMAT,
    *args: Any,
    **kwargs: Any,
):
    """Run function as many times as there are files in the `file_list`,
    in parallel. Each invocation gets one element of the `file_list`.

    Conceptually, it's like `function(file) for file in file_list`, but
    in parallel. Uses a ProcessPoolExecutor with `max_workers`.

    Any additional arguments (positional or keyword) given to
    `run_in_parallel`, will be passed along to the `function`.

    A progress line is printed for every file that finishes, and a summary
    is printed at the end. A failing file does not stop the processing of
    the others; its exception and traceback are printed.

    Args:
        function (Callable): The function to call. The first argument to
            the function is the file path.
        max_workers (int): How many worker processes to use
        file_list (list[str]): The list of files (full paths)
        file_sizes (list[int]): The size of each file, in the same order as
            `file_list`. Only used for the progress report.
        msg_format (str): Template for the progress line. It is formatted
            with the fields `filename`, `file_number`, `nfiles`,
            `bytes_processed`, `bytes_total`, `processing_speed`,
            `timeleft` and `status`.
        *args: Extra positional arguments passed along to `function`.
        **kwargs: Extra keyword arguments passed along to `function`.
    """
    total_size = sum(file_sizes)
    nfiles = len(file_list)
    n_failed = 0
    t0 = time.monotonic_ns()
    print(
        f"Processing {nfiles} files ({human_readable_filesize(total_size)}) "
        f"in parallel using {max_workers} workers"
    )

    futures = {}  # future -> (filepath, filesize)

    try:
        with concurrent.futures.ProcessPoolExecutor(max_workers=max_workers) as pool:
            for file, filesize in zip(file_list, file_sizes, strict=False):
                fut = pool.submit(function, file, *args, **kwargs)
                futures[fut] = (file, filesize)

            completed_bytes = 0

            completed = concurrent.futures.as_completed(futures)
            for i, future in enumerate(completed, start=1):
                # Finished futures are removed, so what is left in the dict
                # is what has not completed yet.
                (filename, filesize) = futures.pop(future)
                completed_bytes += filesize
                bytes_remaining = total_size - completed_bytes
                secs_passed = (time.monotonic_ns() - t0) / 1_000_000_000
                # Guard against a zero time span, to avoid dividing by zero
                if secs_passed > 0:
                    bytes_processed_per_sec = completed_bytes / secs_passed
                else:
                    bytes_processed_per_sec = 0.0

                # anders: this is so crude as to almost be pointless
                # but -- due to the way it works, it at least gives more of
                # an upper bound than a lower bound quite quickly into the
                # processing, which at least is something
                # -> because in the beginning, bytes_processed_per_sec doesn't
                # take into account that there are other processes also
                # working, which means bytes_completed is an underestimate on
                # how many bytes of processing has been done in total
                # -> but the more files are completed, the better the estimate
                # is going to be
                if bytes_processed_per_sec > 0:
                    est_remaining_seconds = int(
                        bytes_remaining / bytes_processed_per_sec
                    )
                else:
                    # The speed is zero as long as only empty files have
                    # completed; no estimate can be made, so report 0
                    # instead of dividing by zero.
                    est_remaining_seconds = 0

                exc = future.exception()
                if exc is None:
                    status = "done"
                else:
                    status = "FAILED"
                    n_failed += 1

                msg = msg_format.format(
                    filename=filename,
                    file_number=i,
                    nfiles=nfiles,
                    bytes_processed=human_readable_filesize(completed_bytes),
                    bytes_total=human_readable_filesize(total_size),
                    processing_speed=human_readable_filesize(bytes_processed_per_sec),
                    timeleft=human_readable_timespan(est_remaining_seconds),
                    status=status,
                )
                print(msg)
                if exc is not None:
                    print(exc)
                    # No exception is being handled at this point, so
                    # format_exc() would have nothing to show; format the
                    # exception that the future carries instead.
                    print(
                        "".join(
                            traceback.format_exception(
                                type(exc), exc, exc.__traceback__
                            )
                        )
                    )
    except concurrent.futures.process.BrokenProcessPool:
        n_remaining = len(futures)
        n_done = nfiles - n_remaining - n_failed
        print("error: Processing was terminated unexpectedly!")
        print(f"{n_done} files were completed, {n_failed} files failed, and ")
        print(f"{n_remaining} didn't start processing, and still remains")
    except KeyboardInterrupt:
        n_remaining = len(futures)
        n_done = nfiles - n_remaining - n_failed
        print("Cancelled by user")
        print(f"{n_done} files were completed, {n_failed} files failed, and ")
        print(f"{n_remaining} didn't start processing, and still remains")
    else:
        n_ok = nfiles - n_failed
        print(f"all done. {n_ok} files ok, {n_failed} failed")

sanity_check(program_list)

Look for programs and files that are needed to do the analysis.

If they don't exist, raise an exception.

Source code in corpustools/util.py
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
def sanity_check(program_list):
    """Verify that GTHOME is set and that all needed programs exist.

    Args:
        program_list: the programs and scripts that must be executable.

    Raises:
        SetupError: if the environment variable GTHOME is not set.
        ExecutableMissingError: for the first program that is not found.
    """
    if "GTHOME" not in os.environ:
        raise SetupError(
            "You have to set the environment variable GTHOME "
            "to your checkout of langtech/trunk!"
        )

    # Programs are checked in order; checking stops at the first miss.
    missing = next(
        (
            program
            for program in program_list
            if executable_in_path(program) is False
        ),
        None,
    )
    if missing is not None:
        raise ExecutableMissingError(
            f"Please install {missing}, can not continue without it."
        )

sort_by_value(table, reverse=False)

Sort the table by value.

Parameters:

Name Type Description Default
table dict

the dictionary that should be sorted.

required
reverse bool

whether or not to sort in reverse

False

Returns:

Type Description
dict

the (key, value) pairs of the table as a list, sorted by value.

Source code in corpustools/util.py
86
87
88
89
90
91
92
93
94
95
96
def sort_by_value(table, reverse=False):
    """Order the items of table by their values.

    Args:
        table (dict): the dictionary that should be sorted.
        reverse (bool): whether or not to sort in reverse

    Returns:
        (list of tuple): the (key, value) pairs of table, sorted by value.
    """
    pairs = table.items()
    return sorted(pairs, key=lambda pair: pair[1], reverse=reverse)