Source code for polonaexplorer.explorer

"""Explorer for Polona corpus.

Finds files and text regions containing target words
and generates dataframes for use in plotter.
"""
#!/usr/bin/env python
# coding: utf-8

import json
import logging
from multiprocessing import Pool, cpu_count
from pathlib import Path

import morfeusz2
import pandas as pd
import regex as re
from tqdm import tqdm

ncore = cpu_count()

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    handlers=[logging.StreamHandler()],
)

logger = logging.getLogger(__name__)


[docs] class PolonaExplorer: """Explorer the polona2 corpus in METS/MODS format.""" def __init__( self, targetwords: list, data_path: str, out_path: str, metadata_file_path: str, part: str = "region", ) -> None: self.word_targets = targetwords self.part = part full_word_list = [] self.data_path = Path(data_path) self.out_path = Path(out_path) self.metadata_file = Path(metadata_file_path) self.all_word_targets = {} polish_generate = morfeusz2.Morfeusz() for base_word in targetwords: forms = polish_generate.generate(base_word) sub_words = [] for elem in forms: sub_words.append(elem[0]) full_word_list.append(elem[0]) self.all_word_targets[base_word] = ( re.compile( r"(?<![\p{L}\p{N}])(?:" + "|".join(map(re.escape, sub_words)) + r")(?![\p{L}\p{N}])" ), re.compile( r"<pc:TextEquiv\b[^>]*>\s*<pc:Unicode>" r"(?=(?:(?!</pc:Unicode>).)*?(?<![\p{L}\p{N}])(?:" + "|".join(map(re.escape, sub_words)) + r")(?![\p{L}\p{N}]))" r"(?P<text>(?:(?!</pc:Unicode>).)*)" r"</pc:Unicode>\s*</pc:TextEquiv>\s*</pc:TextRegion>", re.IGNORECASE | re.DOTALL, ), ) self.searchString = "|".join(full_word_list) logger.debug( f"""Got word list: {self.word_targets}. \n\t Searching for: {self.searchString} """ ) with open( self.out_path / "parameters.json", "w", encoding="utf8" ) as parameter_file: data = {"word_list": self.word_targets, "search_string": self.searchString} json.dump(data, parameter_file, ensure_ascii=False) def _find_text(self, file) -> Path | None: """Find files with any of the target words.""" with open(file, "r") as f: content = f.read() if re.search(self.searchString, content): return file else: return None def _find_pattern(self, file) -> dict: """Find Text Regions using target words in PAGE XML formated files.""" with open(file, "r") as f: content = f.read() results = {"file": file.stem} for base_word, reg_pattern in self.all_word_targets.items(): matches = [m.group("text") for m in reg_pattern[1].finditer(content)] if len(matches) > 0: results.update({base_word: matches}) else: results.update({base_word: None}) return results def _find_words(self, file) -> dict: """Find target words usage statistics.""" with open(file, "r") as f: content = f.read() results = {"file": file.stem} for key, reg_pattern in self.all_word_targets.items(): key_results = re.findall(reg_pattern[0], content.lower()) results.update({key: key_results}) return results
[docs] def get_file_stats(self) -> None: """Generate target words usage corpora. Generates one file (part=page) with all files containing at least one target word, or two files (part=region) files with number of found words or extracted text regions containing at least one of the target words. """ text_files = list(self.data_path.glob("**/OCR-D-TXT/*")) if self.part == "page": with Pool(ncore) as pool: containsAnyWords = list( tqdm(pool.imap(self._find_text, text_files), total=len(text_files)) ) self.contains_words = [x for x in containsAnyWords if x] with open( self.out_path / "files_with_target_words.csv", "w", ) as file: data_to_write = "\n".join([x.as_posix() for x in self.contains_words]) file.write(data_to_write) elif self.part == "region": with Pool(ncore) as pool: containsSpecificWords = list( tqdm(pool.imap(self._find_words, text_files), total=len(text_files)) ) ocr_files = list(self.data_path.glob("**/OCR-D-OCR/*")) with Pool(ncore) as pool: textparts = list( tqdm(pool.imap(self._find_pattern, ocr_files), total=len(ocr_files)) ) self.word_surroundings = [x for x in textparts if x] with open(self.out_path / "word_stats.json", "w") as file: json.dump([x for x in containsSpecificWords if x], file) with open( self.out_path / "word_surroundings.json", "w", ) as file: json.dump([x for x in textparts if x], file)
def _create_df(self, file: Path) -> pd.DataFrame: """Generate dataframe with text content and metadata. The ID is given by the original identifier from the polona2 archive. """ with open(file, "r") as datafile: text = "".join(datafile.readlines()) temp_df = pd.DataFrame([text], columns=["text"]) temp_df.insert(0, "id", file.stem.split("_")[1]) temp_df.insert(0, "page", int(file.stem.split("_")[2].split("-")[-1])) return temp_df
[docs] def generate_dataframe(self) -> Path: """Generate dataframe of all found page texts. Uses metadata information to include publication date, title, place and more for the found periodicals. ID denotes the original identifier from the polona2 archive. Fragments contains the text data identified to contain fitting text by the original archive. """ if self.part == "page": with Pool(ncore) as pool: df_list = list( tqdm( pool.imap(self._create_df, self.contains_words), total=len(self.contains_words), ) ) df = pd.concat(df_list) lentext = df.text.apply(lambda x: len(x.split())) df.insert(0, "textlength", lentext) logger.debug( f""" Collected {df.shape[0]} texts from pages with fitting text.\n \tReducing metadata to used IDs. """ ) elif self.part == "region": df_in = ( pd.DataFrame(self.word_surroundings) .dropna(subset=self.word_targets, how="all") .reset_index(drop=True) ) entries = [] for word in self.word_targets: temp_df = df_in[["file", word]].dropna() temp_df = temp_df.explode(word) for idx, row in temp_df.iterrows(): entries.append((row["file"], row[word])) df = pd.DataFrame(entries, columns=["file", "text"]) doc_id = df.file.apply(lambda x: x.split("_")[1]) doc_page = df.file.apply(lambda x: int(x.split("_")[2].split("-")[-1])) df.insert(0, "page", doc_page) df.insert(0, "id", doc_id) df = df.drop("file", axis=1) lentext = df.text.apply(lambda x: len(x.split())) df.insert(0, "textlength", lentext) logger.debug( f""" Collected {df.shape[0]} texts from regions with fitting text.\n \tReducing metadata to used IDs. """ ) metadata = pd.read_json(self.metadata_file, lines=True) used_ids = df["id"].unique() reduced_metadata = metadata.query("id.isin(@used_ids)").reset_index(drop=True) reduced_metadata.insert(0, "year", reduced_metadata.date.dt.year) assert len(used_ids) == reduced_metadata.shape[0], ( "Found different ids in metadata file." ) df_merged = df.merge(reduced_metadata, on="id", how="inner") df_merged.to_json( self.out_path / f"polona_matchin_text_{self.part}.json", orient="records", lines=True, date_format="iso", ) return self.out_path / f"polona_matching_text_{self.part}.json"