Skip to content

util

Utility functions and classes used by other modules in CorpusTools.

ArgumentError

Bases: Exception

This exception is raised when argument errors occur.

Source code in /home/anders/projects/CorpusTools/corpustools/util.py
44
45
class ArgumentError(Exception):
    """Raised when an argument error occurs."""

ConversionError

Bases: Exception

Raise this exception when conversion errors occur.

Source code in /home/anders/projects/CorpusTools/corpustools/util.py
48
49
class ConversionError(Exception):
    """Raised when a conversion error occurs."""

ExecutableMissingError

Bases: Exception

This exception is raised when wanted executables are missing.

Source code in /home/anders/projects/CorpusTools/corpustools/util.py
40
41
class ExecutableMissingError(Exception):
    """Raised when a wanted executable is missing."""

ExternalCommandRunner

Class to run external command through subprocess.

Attributes:

Name Type Description
stdout

save the stdout of the command here.

stderr

save the stderr of the command here.

returncode

save the returncode of the command here.

Source code in /home/anders/projects/CorpusTools/corpustools/util.py
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
class ExternalCommandRunner:
    """Class to run external command through subprocess.

    Attributes:
        stdout: what the latest command wrote to stdout, as returned by
            Popen.communicate(). None until run() has completed.
        stderr: what the latest command wrote to stderr, as returned by
            Popen.communicate(). None until run() has completed.
        returncode: the returncode of the latest command. None until
            run() has completed.
    """

    def __init__(self):
        """Initialise the ExternalCommandRunner class."""
        self.stdout = None
        self.stderr = None
        self.returncode = None

    def run(self, command, cwd=None, to_stdin=None):
        """Run the command, save the result.

        Args:
            command (list[str]): the program and its arguments, handed to
                subprocess.Popen as is. command[0] is used as the program
                name in the error message.
            cwd (str or None): directory to run the command in, handed to
                subprocess.Popen as is.
            to_stdin (bytes or None): data sent to the stdin of the command.

        Raises:
            ExecutableMissingError: if subprocess.Popen raises OSError when
                starting the command.
        """
        try:
            subp = subprocess.Popen(
                command,
                stdin=subprocess.PIPE,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                cwd=cwd,
            )
        except OSError as error:
            # Chain the OSError so the real reason (missing file, no
            # permission, ...) is still visible in the traceback.
            raise ExecutableMissingError(
                f"Please install {command[0]}, can not continue without it."
            ) from error

        # communicate() feeds stdin, waits for the command to finish and
        # collects both output streams.
        self.stdout, self.stderr = subp.communicate(to_stdin)
        self.returncode = subp.returncode

__init__()

Initialise the ExternalCommandRunner class.

Source code in /home/anders/projects/CorpusTools/corpustools/util.py
301
302
303
304
305
def __init__(self):
    """Initialise the ExternalCommandRunner class."""
    self.stdout = None
    self.stderr = None
    self.returncode = None

run(command, cwd=None, to_stdin=None)

Run the command, save the result.

Source code in /home/anders/projects/CorpusTools/corpustools/util.py
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
def run(self, command, cwd=None, to_stdin=None):
    """Run the command, save the result.

    The output and exit status of the command are stored in self.stdout,
    self.stderr and self.returncode.

    Args:
        command (list[str]): the program and its arguments, handed to
            subprocess.Popen as is. command[0] is used as the program
            name in the error message.
        cwd (str or None): directory to run the command in, handed to
            subprocess.Popen as is.
        to_stdin (bytes or None): data sent to the stdin of the command.

    Raises:
        ExecutableMissingError: if subprocess.Popen raises OSError when
            starting the command.
    """
    try:
        subp = subprocess.Popen(
            command,
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            cwd=cwd,
        )
    except OSError as error:
        # Chain the OSError so the real reason (missing file, no
        # permission, ...) is still visible in the traceback.
        raise ExecutableMissingError(
            f"Please install {command[0]}, can not continue without it."
        ) from error

    # communicate() feeds stdin, waits for the command to finish and
    # collects both output streams.
    self.stdout, self.stderr = subp.communicate(to_stdin)
    self.returncode = subp.returncode

SetupError

Bases: Exception

This exception is raised when setup is faulty.

Source code in /home/anders/projects/CorpusTools/corpustools/util.py
36
37
class SetupError(Exception):
    """Raised when the setup is faulty."""

basename_noext(fname, ext)

Get the basename without the extension.

Parameters:

Name Type Description Default
fname str

path to the file.

required
ext str

the extension that should be removed.

required

Returns:

Type Description
str

fname without the extension.

Source code in /home/anders/projects/CorpusTools/corpustools/util.py
65
66
67
68
69
70
71
72
73
74
75
def basename_noext(fname, ext):
    """Get the basename without the extension.

    The last len(ext) characters of the basename are cut off; it is not
    checked that the basename really ends with ext.

    Args:
        fname (str): path to the file.
        ext (str): the extension that should be removed.

    Returns:
        (str): the basename of fname without the extension.
    """
    basename = os.path.basename(fname)
    if not ext:
        # Slicing with [:-0] means [:0], which would throw away the whole
        # name instead of leaving it untouched.
        return basename

    return basename[: -len(ext)]

executable_in_path(program)

Check if program is in path.

Parameters:

Name Type Description Default
program str

name of the program

required

Returns:

Type Description
bool

True if program is found, False otherwise.

Source code in /home/anders/projects/CorpusTools/corpustools/util.py
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
def executable_in_path(program):
    """Tell whether program can be found and executed.

    A program given with a directory part is checked as it stands. A bare
    program name is checked against each candidate that
    path_possibilities produces for it.

    Args:
        program (str): name of the program

    Returns:
        (bool): True if program is found, False otherwise.
    """
    directory = os.path.dirname(program)
    if directory:
        return is_executable(program)

    for candidate in path_possibilities(program):
        if is_executable(candidate):
            return True

    return False

get_lang_resource(lang, resource, fallback=None)

Get a language resource.

Parameters:

Name Type Description Default
lang str

the language of the resource.

required
resource str

the resource that is needed.

required
fallback str or None

the fallback resource. Default is None.

None

Returns:

Type Description
str

path to the resource or fallback.

Source code in /home/anders/projects/CorpusTools/corpustools/util.py
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
def get_lang_resource(lang, resource, fallback=None):
    """Look up a resource belonging to a language below $GTHOME/langs.

    Args:
        lang (str): the language of the resource.
        resource (str): the resource that is needed.
        fallback (str or None): what to return when the resource does not
            exist. Default is None.

    Returns:
        (str): path to the resource if it exists, otherwise fallback.
    """
    resource_path = os.path.join(os.environ["GTHOME"], "langs", lang, resource)
    return resource_path if os.path.exists(resource_path) else fallback

get_preprocess_command(lang)

Get the complete preprocess command for lang.

Parameters:

Name Type Description Default
lang str

the language

required

Returns:

Type Description
list[str]

the complete preprocess command.

Source code in /home/anders/projects/CorpusTools/corpustools/util.py
188
189
190
191
192
193
194
195
196
197
198
199
200
201
def get_preprocess_command(lang):
    """Build the complete preprocess command for lang.

    The abbreviation file of lang is used when it exists, otherwise the
    one belonging to sme.

    Args:
        lang (str): the language

    Returns:
        (list[str]): the complete preprocess command.
    """
    gthome = os.environ["GTHOME"]
    script = os.path.join(gthome, "gt/script/preprocess")
    sanity_check([script])

    abbr_resource = "tools/preprocess/abbr.txt"
    default_abbr = get_lang_resource("sme", abbr_resource)
    abbr_path = get_lang_resource(lang, abbr_resource, default_abbr)

    return [script, f"--abbr={abbr_path}"]

human_readable_filesize(num, suffix='B')

Returns human readable filesize

Source code in /home/anders/projects/CorpusTools/corpustools/util.py
326
327
328
329
330
331
332
333
def human_readable_filesize(num, suffix="B"):
    """Format a size using binary prefixes (Ki, Mi, Gi, ...).

    Args:
        num (int or float): the size to format.
        suffix (str): the unit appended after the prefix. Default is "B".

    Returns:
        (str): the size with one decimal, followed by prefix and suffix.
    """
    # https://stackoverflow.com/questions/1094841/get-human-readable-version-of-file-size
    binary_prefixes = ("", "Ki", "Mi", "Gi", "Ti", "Pi", "Ei", "Zi")
    value = num
    for prefix in binary_prefixes:
        fits = abs(value) < 1024.0
        if fits:
            return f"{value:3.1f}{prefix}{suffix}"
        value = value / 1024.0

    # Anything that is still too large is reported in Yi.
    return f"{value:.1f}Yi{suffix}"

ignored(*exceptions)

Ignore exceptions.

Source code in /home/anders/projects/CorpusTools/corpustools/util.py
283
284
285
286
287
288
289
@contextmanager
def ignored(*exceptions):
    """Context manager that swallows the given exception types.

    Args:
        *exceptions: the exception classes that should be ignored when
            raised inside the with block. Other exceptions pass through.
    """
    try:
        yield
    except exceptions:
        # Leaving the generator normally tells contextmanager that the
        # exception has been handled.
        return

is_executable(fullpath)

Check if the program in fullpath is executable.

Parameters:

Name Type Description Default
fullpath str

the path to the program or script.

required

Returns:

Type Description
bool

True if fullpath contains an executable, False otherwise.

Source code in /home/anders/projects/CorpusTools/corpustools/util.py
107
108
109
110
111
112
113
114
115
116
def is_executable(fullpath):
    """Tell whether fullpath is a regular file that may be executed.

    Args:
        fullpath (str): the path to the program or script.

    Returns:
        (bool): True if fullpath contains an executable, False otherwise.
    """
    if not os.path.isfile(fullpath):
        return False

    return os.access(fullpath, os.X_OK)

lineno()

Return the current line number in our program.

Source code in /home/anders/projects/CorpusTools/corpustools/util.py
204
205
206
def lineno():
    """Give the line number of the place this function is called from.

    Returns:
        (int): the current line number in the calling frame.
    """
    # f_back is the frame of whoever called lineno().
    caller = inspect.currentframe().f_back
    return caller.f_lineno

name_to_unicode(filename)

Turn a filename to a unicode string.

Parameters:

Name Type Description Default
filename str

name of the file

required

Returns:

Type Description
str

A unicode string.

Source code in /home/anders/projects/CorpusTools/corpustools/util.py
259
260
261
262
263
264
265
266
267
268
269
270
271
def name_to_unicode(filename):
    """Turn a filename to a unicode string.

    On Windows the filename is returned unchanged. On other platforms a
    bytes filename is decoded as utf-8, while a filename that already is
    a str is returned as is (str has no decode method in Python 3).

    Args:
        filename (str or bytes): name of the file

    Returns:
        (str): A unicode string.
    """
    if platform.system() == "Windows":
        return filename

    if isinstance(filename, bytes):
        return filename.decode("utf-8")

    return filename

note(msg)

Print msg to stderr.

Parameters:

Name Type Description Default
msg str

the message

required
Source code in /home/anders/projects/CorpusTools/corpustools/util.py
274
275
276
277
278
279
280
def note(msg):
    """Write msg, followed by a newline, to stderr.

    Args:
        msg (str): the message
    """
    target = sys.stderr
    print(msg, file=target)

path_possibilities(program)

Generate the possible full paths to program, one for each directory in $PATH.

Parameters:

Name Type Description Default
program str

name of the program or script.

required

Yields:

Type Description
str

possible fullpath to the program

Source code in /home/anders/projects/CorpusTools/corpustools/util.py
119
120
121
122
123
124
125
126
127
128
129
130
131
def path_possibilities(program):
    """Build the candidate paths to program, one per entry in $PATH.

    Nothing is checked here; each $PATH entry, stripped of surrounding
    double quotes, is simply joined with program.

    Args:
        program (str): name of the program or script.

    Returns:
        (Iterator[str]): the possible fullpaths to the program
    """
    directories = os.environ["PATH"].split(os.pathsep)
    return (
        os.path.join(directory.strip('"'), program) for directory in directories
    )

print_element(element, level, indent, out)

Format an html document.

This function formats html documents for readability, to see the structure of the given document. It ruins white space in text parts.

Parameters:

Name Type Description Default
element etree._Element

the element to format.

required
level int

indicate at what level this element is.

required
indent int

indicate how many spaces this element should be indented

required
out stream

a buffer where the formatted element is written.

required
Source code in /home/anders/projects/CorpusTools/corpustools/util.py
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
def print_element(element, level, indent, out):
    """Write an indented view of an html element and its descendants.

    The output is meant for reading the structure of a document. White
    space in text parts is not preserved: text and tail are stripped and
    written on lines of their own.

    Args:
        element (etree._Element): the element to format.
        level (int): indicate at what level this element is.
        indent (int): indicate how many spaces each level is indented
        out (stream): a buffer where the formatted element is written.
    """
    tag = element.tag.replace("{http://www.w3.org/1999/xhtml}", "")
    margin = " " * (level * indent)

    # Start tag with its attributes.
    out.write(margin)
    out.write(f"<{tag}")
    for name, value in element.attrib.items():
        for piece in (" ", name, '="', value, '"'):
            out.write(piece)
    out.write(">\n")

    # Text directly inside the element, one level deeper.
    text = "" if element.text is None else element.text.strip()
    if text != "":
        out.write(" " * ((level + 1) * indent))
        out.write(text)
        out.write("\n")

    for child in element:
        print_element(child, level + 1, indent, out)

    # End tag.
    out.write(margin)
    out.write(f"</{tag}>\n")

    # Text following the element; the top level element has none written.
    if not level > 0:
        return
    tail = element.tail
    if tail is None or tail.strip() == "":
        return
    for _ in range((level - 1) * indent):
        out.write(" ")
    out.write(tail.strip())
    out.write("\n")

print_frame(debug='', *args)

Print debug output.

Source code in /home/anders/projects/CorpusTools/corpustools/util.py
52
53
54
55
56
57
58
59
60
61
62
def print_frame(debug="", *args):
    """Print a debug line to stderr, tagged with the caller's position.

    The line holds the line number and function name of the caller,
    then debug and every extra argument, all separated by spaces.

    Args:
        debug: the first value printed after the caller information.
        *args: more values to print.
    """
    # Index 0 of the stack is this function, index 1 is the caller.
    caller_record = inspect.stack()[1]
    info = inspect.getframeinfo(caller_record[0])

    print(info.lineno, info.function, debug, *args, file=sys.stderr, end=" ")
    print(file=sys.stderr)

replace_all(replacements, string)

Replace unwanted strings with wanted strings.

Parameters:

Name Type Description Default
replacements list of tuple

unwanted:wanted string pairs.

required
string str

the string where replacements should be done.

required

Returns:

Type Description
str

string with replaced strings.

Source code in /home/anders/projects/CorpusTools/corpustools/util.py
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
def replace_all(replacements, string):
    """Apply a series of replacements to a string, one after the other.

    Args:
        replacements (list of tuple): unwanted:wanted string pairs.
        string (str): the string where replacements should be done.

    Returns:
        (str): string with replaced strings.
    """
    result = string
    for pair in replacements:
        old, new = pair
        result = result.replace(old, new)

    return result

run_in_parallel(function, max_workers, file_list, msg_format=_PARA_DEFAULT_MSG_FORMAT, *args, **kwargs)

Run function as many times as there are files in the file_list, in parallel. Each invocation gets one element of the file_list.

Conceptually, it's like function(file) for file in file_list, but in parallel. Uses a ProcessPoolExecutor with max_workers.

Any additional arguments (positional or keyword) given to run_in_parallel, will be passed along to the function.

Parameters:

Name Type Description Default
function Callable

The function to call. The first argument to the function is the file path.

required
max_workers int

How many worker processes to use

required
file_list list[str]

The list of files (full paths)

required
Source code in /home/anders/projects/CorpusTools/corpustools/util.py
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
def run_in_parallel(
    function,
    max_workers,
    file_list,
    msg_format=_PARA_DEFAULT_MSG_FORMAT,
    *args,
    **kwargs,
):
    """Run function as many times as there are files in the `file_list`,
    in parallel. Each invocation gets one element of the `file_list`.

    Conceptually, it's like `function(file) for file in file_list`, but
    in parallel. Uses a ProcessPoolExecutor with `max_workers`. The files
    are submitted in order of increasing size, and one progress line is
    printed each time a file is finished.

    Any additional arguments (positional or keyword) given to
    `run_in_parallel`, will be passed along to the `function`.

    Args:
        function (Callable): The function to call. The first argument to
            the function is the file path.
        max_workers (int): How many worker processes to use
        file_list (list[str]): The list of files (full paths)
        msg_format (str): Template for the progress line. It is filled in
            with `.format()` using the keyword fields filename,
            file_number, nfiles, bytes_processed, bytes_total,
            processing_speed, timeleft and status.
    """
    file_list = [(file, os.path.getsize(file)) for file in file_list]
    file_list.sort(key=lambda entry: entry[1])
    total_size = sum(filesize for _, filesize in file_list)

    nfiles = len(file_list)
    n_failed = 0
    t0 = time.monotonic_ns()
    print(f"Processing {nfiles} files ({human_readable_filesize(total_size)}) "
          f"in parallel using {max_workers} workers")

    futures = {}  # future -> (filepath, filesize)

    try:
        with concurrent.futures.ProcessPoolExecutor(max_workers=max_workers) as pool:
            for file, filesize in file_list:
                fut = pool.submit(function, file, *args, **kwargs)
                futures[fut] = (file, filesize)

            completed_bytes = 0

            completed = concurrent.futures.as_completed(futures)
            for i, future in enumerate(completed, start=1):
                # Futures still left in the dict are the ones not finished.
                (filename, filesize) = futures.pop(future)
                completed_bytes += filesize
                bytes_remaining = total_size - completed_bytes
                secs_passed = (time.monotonic_ns() - t0) / 1_000_000_000
                if secs_passed > 0:
                    bytes_processed_per_sec = completed_bytes / secs_passed
                else:
                    bytes_processed_per_sec = 0

                # anders: this is so crude as to almost be pointless
                # but -- due to the way it works, it at least gives more of
                # an upper bound than a lower bound quite quickly into the
                # processing, which at least is something
                # -> because in the beginning, bytes_processed_per_sec doesn't
                # take into account that there are other processes also
                # working, which means bytes_completed is an underestimate on
                # how many bytes of processing has been done in total
                # -> but the more files are completed, the better the estimate
                # is going to be
                if bytes_processed_per_sec > 0:
                    est_remaining_seconds = int(
                        bytes_remaining / bytes_processed_per_sec
                    )
                else:
                    # Only empty files have been completed so far (the
                    # smallest files are submitted first), so there is no
                    # speed to base an estimate on. Report 0 rather than
                    # dividing by zero.
                    est_remaining_seconds = 0

                exc = future.exception()
                if exc is None:
                    status = "done"
                else:
                    status = "FAILED"
                    n_failed += 1

                msg = msg_format.format(
                    filename=filename,
                    file_number=i,
                    nfiles=nfiles,
                    bytes_processed=human_readable_filesize(completed_bytes),
                    bytes_total=human_readable_filesize(total_size),
                    processing_speed=human_readable_filesize(bytes_processed_per_sec),
                    timeleft=human_readable_timespan(est_remaining_seconds),
                    status=status,
                )
                print(msg)
                if exc is not None:
                    print(exc)
                    # No exception is being handled at this point, so
                    # format_exc() would only give "NoneType: None".
                    # Format the exception of the future itself.
                    print(
                        "".join(
                            traceback.format_exception(
                                type(exc), exc, exc.__traceback__
                            )
                        )
                    )
    except concurrent.futures.process.BrokenProcessPool:
        n_remaining = len(futures)
        n_done = nfiles - n_remaining - n_failed
        print("error: Processing was terminated unexpectedly!")
        print(f"{n_done} files were completed, {n_failed} files failed, and ")
        print(f"{n_remaining} didn't start processing, and still remains")
    except KeyboardInterrupt:
        n_remaining = len(futures)
        n_done = nfiles - n_remaining - n_failed
        print("Cancelled by user")
        print(f"{n_done} files were completed, {n_failed} files failed, and ")
        print(f"{n_remaining} didn't start processing, and still remains")
    else:
        n_ok = nfiles - n_failed
        print(f"all done. {n_ok} files ok, {n_failed} failed")

sanity_check(program_list)

Look for programs and files that are needed to do the analysis.

If they don't exist, raise an exception.

Source code in /home/anders/projects/CorpusTools/corpustools/util.py
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
def sanity_check(program_list):
    """Verify that GTHOME is set and that all needed programs can be run.

    Args:
        program_list (list[str]): the programs and files that are needed
            to do the analysis.

    Raises:
        SetupError: if the environment variable GTHOME is not set.
        ExecutableMissingError: if one of the programs is not found by
            executable_in_path.
    """
    if os.environ.get("GTHOME") is None:
        raise SetupError(
            "You have to set the environment variable GTHOME "
            "to your checkout of langtech/trunk!"
        )

    for program in program_list:
        if not executable_in_path(program):
            raise ExecutableMissingError(
                f"Please install {program}, can not continue without it."
            )

sort_by_value(table, reverse=False)

Sort the table by value.

Parameters:

Name Type Description Default
table dict

the dictionary that should be sorted.

required
reverse bool

whether or not to sort in reverse

False

Returns:

Type Description
list of tuple

the (key, value) pairs of table, sorted by value.

Source code in /home/anders/projects/CorpusTools/corpustools/util.py
78
79
80
81
82
83
84
85
86
87
88
def sort_by_value(table, reverse=False):
    """Order the items of a dictionary by their values.

    Args:
        table (dict): the dictionary that should be sorted.
        reverse (bool): whether or not to sort in reverse

    Returns:
        (list of tuple): the (key, value) pairs of table, sorted by value.
    """
    pairs = list(table.items())
    pairs.sort(key=operator.itemgetter(1), reverse=reverse)
    return pairs