Skip to content

analyser

Classes and functions to do syntactic analysis on GiellaLT xml docs.

analyse(xml_path)

Analyse a file.

Source code in corpustools/analyser.py
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
def analyse(xml_path: corpuspath.CorpusPath) -> None:
    """Analyse a file."""
    # Resolve the analyser pipeline archive and the era/variant mode
    # for this document before shelling out.
    pipeline = valid_path(xml_path.lang)
    variant = get_modename(xml_path)
    command = f"divvun-checker -a {pipeline} -n {variant}".split()

    try:
        analysed = run_external_command(command=command, instring=ccatter(xml_path))
        dependency_analysis(xml_path, analysed_text=analysed)
    except (etree.XMLSyntaxError, UserWarning) as error:
        # Report and move on: a single unparsable file must not stop a batch run.
        print("Can not parse", xml_path, file=sys.stderr)
        print("The error was:", str(error), file=sys.stderr)

analyse_in_parallel(file_list, pool_size)

Analyse files in parallel.

Source code in corpustools/analyser.py
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
def analyse_in_parallel(file_list: list[corpuspath.CorpusPath], pool_size: int):
    """Analyse files in parallel.

    Args:
        file_list: corpus paths whose .converted files will be analysed.
        pool_size: number of parallel worker processes.
    """
    print(f"Parallel analysis of {len(file_list)} files with {pool_size} workers")

    if not file_list:
        # Guard: zip(*[]) below yields no tuples, so the two-name unpacking
        # would raise ValueError on an empty list. Nothing to do anyway.
        return

    # Here we know that we are looking at the .converted file,
    enhanced_file_list: list[tuple[corpuspath.CorpusPath, int]] = [
        (file, os.path.getsize(file.converted)) for file in file_list
    ]

    # sort the file list by size, smallest first
    enhanced_file_list.sort(key=lambda entry: entry[1])

    # unzip the list of (path, size) pairs into two parallel lists:
    # one of corpus paths, one of file sizes
    corpus_paths, file_sizes = zip(*enhanced_file_list, strict=True)
    util.run_in_parallel(
        function=analyse,
        max_workers=pool_size,
        file_list=list(corpus_paths),
        file_sizes=list(file_sizes),
    )

analyse_serially(file_list)

Analyse files one by one.

Source code in corpustools/analyser.py
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
def analyse_serially(file_list: list[corpuspath.CorpusPath]):
    """Analyse files one by one.

    Args:
        file_list: corpus paths whose .converted files will be analysed.
    """
    total = len(file_list)
    print(f"Starting the analysis of {total} files")

    # enumerate(..., start=1) replaces the manual fileno counter.
    for fileno, xml_file in enumerate(file_list, start=1):
        # print some ugly banner cos i want to see progress on local
        # batch job
        util.print_frame("*" * 79)
        util.print_frame(f"Analysing {xml_file.converted} [{fileno} of {total}]")
        util.print_frame("*" * 79)
        analyse(xml_file)

dependency_analysis(path, analysed_text)

Insert dependency analysis into the body.

Source code in corpustools/analyser.py
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
def dependency_analysis(path: corpuspath.CorpusPath, analysed_text: str) -> None:
    """Insert dependency analysis into the body.

    Replaces the <body> of the converted document with a fresh body whose
    <dependency> element carries the analyser output as a CDATA section,
    then writes the result to the .analysed path.

    Args:
        path: corpus path pointing at the converted and analysed files.
        analysed_text: analyser output to embed in the document.

    Raises:
        UserWarning: if the converted document has no body, or the body
            element has no parent.
    """
    xml_file = etree.parse(path.converted)
    oldbody = xml_file.find(".//body")

    if oldbody is None:
        raise UserWarning(f"No body found in {path.converted}")

    parent = oldbody.getparent()

    if parent is None:
        raise UserWarning(f"No parent found for body in {path.converted}")

    parent.remove(oldbody)

    body = etree.SubElement(parent, "body")
    dependency = etree.SubElement(body, "dependency")
    # Assign the CDATA object directly: wrapping it in str() would store the
    # CDATA object's repr instead of the analysed text as a CDATA section.
    dependency.text = etree.CDATA(analysed_text)

    with util.ignored(OSError):
        os.makedirs(os.path.dirname(path.analysed))
    with open(path.analysed, "wb") as analysed_stream:
        analysed_stream.write(
            etree.tostring(
                xml_file, xml_declaration=True, encoding="utf8", pretty_print=True
            )
        )

get_modename(path)

Get the modename depending on the CorpusPath

Source code in corpustools/analyser.py
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
def get_modename(path: corpuspath.CorpusPath) -> str:
    """Get the modename depending on the CorpusPath"""
    # Era boundaries (exclusive on both ends) for picking a period-specific
    # analyser mode for Mari (mhr) and Hill Mari (mrj) documents.
    lower_bound = 1909
    thirties_end = 1939
    eighties_end = 1995

    if path.lang in ("mhr", "mrj"):
        raw_year = path.metadata.get_variable("year")
        if raw_year:
            year = int(raw_year)
            if lower_bound < year < thirties_end:
                return "korp-analyser-thirties"
            # Only mrj has a dedicated analyser for the mid-century period.
            if path.lang == "mrj" and thirties_end < year < eighties_end:
                return "korp-analyser-eighties"

    return "korp-analyser"

main()

Analyse files in the given directories.

Source code in corpustools/analyser.py
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
def main():
    """Analyse files in the given directories."""
    args = parse_options()

    all_paths = (
        corpuspath.make_corpus_path(xml_file.as_posix())
        for xml_file in corpuspath.collect_files(args.converted_entities, suffix=".xml")
    )
    # OCR'ed documents are excluded from syntactic analysis.
    analysable_paths = [
        path for path in all_paths if not path.metadata.get_variable("ocr")
    ]

    if args.skip_existing:
        remaining = [
            path for path in analysable_paths if not path.analysed.exists()
        ]
        n_skipped_files = len(analysable_paths) - len(remaining)
        print(
            f"--skip-existing given. Skipping {n_skipped_files} "
            "files that are already analysed"
        )
        if not remaining:
            print("nothing to do, exiting")
            raise SystemExit(0)
        analysable_paths = remaining

    try:
        if args.serial:
            analyse_serially(analysable_paths)
        else:
            analyse_in_parallel(analysable_paths, args.ncpus)
    except util.ArgumentError as error:
        print(f"Cannot do analysis\n{str(error)}", file=sys.stderr)
        raise SystemExit(1) from error

parse_options()

Parse the given options.

Source code in corpustools/analyser.py
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
def parse_options():
    """Parse the given options."""
    # Help texts hoisted out of the add_argument calls for readability.
    skip_help = (
        "Skip analysis of files that already are analysed (== already "
        "exist in the analysed/ folder)"
    )
    serial_help = (
        "When this argument is used files will be analysed one by one. "
        "Using --serial takes priority over --ncpus"
    )
    entities_help = (
        "converted files or director(y|ies) where the converted files exist"
    )

    parser = argparse.ArgumentParser(
        parents=[argparse_version.parser], description="Analyse files in parallel."
    )
    parser.add_argument("--ncpus", action=NCpus)
    parser.add_argument("--skip-existing", action="store_true", help=skip_help)
    parser.add_argument("--serial", action="store_true", help=serial_help)
    parser.add_argument("converted_entities", nargs="+", help=entities_help)

    return parser.parse_args()

valid_path(lang) cached

Check that the resources needed by modes exist.

Parameters:

Name Type Description Default
lang str

the language that modes is asked to serve.

required

Returns:

Type Description
Path

A path to the zpipe file.

Raises:

Type Description
ArgumentError

if no resources are found.

Source code in corpustools/analyser.py
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
@lru_cache(maxsize=None)
def valid_path(lang: str) -> Path:
    """Check that the resources needed by modes exist.

    Args:
        lang: the language that modes is asked to serve.

    Returns:
        A path to the zpipe file.

    Raises:
        util.ArgumentError: if no resources are found.
    """
    archive_name = f"{LANGUAGES.get(lang, lang)}.zpipe"
    candidates = (
        resource_dir / archive_name for resource_dir in lang_resource_dirs(lang)
    )
    existing = next((candidate for candidate in candidates if candidate.exists()), None)
    if existing is not None:
        return existing

    raise util.ArgumentError(f"ERROR: found no resources for {lang}")