"""Explorer for Polona corpus.
Finds files and text regions containing the target words
and generates dataframes for use in the plotter.
"""
#!/usr/bin/env python
# coding: utf-8
import json
import logging
from multiprocessing import Pool, cpu_count
from pathlib import Path
import morfeusz2
import pandas as pd
import regex as re
from tqdm import tqdm
# Size of every multiprocessing pool created in this module: one worker per
# CPU reported by the operating system.
ncore = cpu_count()

# Root logging is configured at import time. The threshold is INFO, so the
# ``logger.debug`` calls in this module stay silent unless the level is
# lowered by the application.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    handlers=[logging.StreamHandler()],
)
logger = logging.getLogger(__name__)
class PolonaExplorer:
"""Explorer the polona2 corpus in METS/MODS format."""
def __init__(
self,
targetwords: list,
data_path: str,
out_path: str,
metadata_file_path: str,
part: str = "region",
) -> None:
self.word_targets = targetwords
self.part = part
full_word_list = []
self.data_path = Path(data_path)
self.out_path = Path(out_path)
self.metadata_file = Path(metadata_file_path)
self.all_word_targets = {}
polish_generate = morfeusz2.Morfeusz()
for base_word in targetwords:
forms = polish_generate.generate(base_word)
sub_words = []
for elem in forms:
sub_words.append(elem[0])
full_word_list.append(elem[0])
self.all_word_targets[base_word] = (
re.compile(
r"(?<![\p{L}\p{N}])(?:"
+ "|".join(map(re.escape, sub_words))
+ r")(?![\p{L}\p{N}])"
),
re.compile(
r"<pc:TextEquiv\b[^>]*>\s*<pc:Unicode>"
r"(?=(?:(?!</pc:Unicode>).)*?(?<![\p{L}\p{N}])(?:"
+ "|".join(map(re.escape, sub_words))
+ r")(?![\p{L}\p{N}]))"
r"(?P<text>(?:(?!</pc:Unicode>).)*)"
r"</pc:Unicode>\s*</pc:TextEquiv>\s*</pc:TextRegion>",
re.IGNORECASE | re.DOTALL,
),
)
self.searchString = "|".join(full_word_list)
logger.debug(
f"""Got word list: {self.word_targets}.
\n\t Searching for: {self.searchString}
"""
)
with open(
self.out_path / "parameters.json", "w", encoding="utf8"
) as parameter_file:
data = {"word_list": self.word_targets, "search_string": self.searchString}
json.dump(data, parameter_file, ensure_ascii=False)
def _find_text(self, file) -> Path | None:
"""Find files with any of the target words."""
with open(file, "r") as f:
content = f.read()
if re.search(self.searchString, content):
return file
else:
return None
def _find_pattern(self, file) -> dict:
"""Find Text Regions using target words in PAGE XML formated files."""
with open(file, "r") as f:
content = f.read()
results = {"file": file.stem}
for base_word, reg_pattern in self.all_word_targets.items():
matches = [m.group("text") for m in reg_pattern[1].finditer(content)]
if len(matches) > 0:
results.update({base_word: matches})
else:
results.update({base_word: None})
return results
def _find_words(self, file) -> dict:
"""Find target words usage statistics."""
with open(file, "r") as f:
content = f.read()
results = {"file": file.stem}
for key, reg_pattern in self.all_word_targets.items():
key_results = re.findall(reg_pattern[0], content.lower())
results.update({key: key_results})
return results
[docs]
def get_file_stats(self) -> None:
"""Generate target words usage corpora.
Generates one file (part=page) with all files
containing at least one target word, or two files (part=region)
files with number of found words or extracted text regions
containing at least one of the target words.
"""
text_files = list(self.data_path.glob("**/OCR-D-TXT/*"))
if self.part == "page":
with Pool(ncore) as pool:
containsAnyWords = list(
tqdm(pool.imap(self._find_text, text_files), total=len(text_files))
)
self.contains_words = [x for x in containsAnyWords if x]
with open(
self.out_path / "files_with_target_words.csv",
"w",
) as file:
data_to_write = "\n".join([x.as_posix() for x in self.contains_words])
file.write(data_to_write)
elif self.part == "region":
with Pool(ncore) as pool:
containsSpecificWords = list(
tqdm(pool.imap(self._find_words, text_files), total=len(text_files))
)
ocr_files = list(self.data_path.glob("**/OCR-D-OCR/*"))
with Pool(ncore) as pool:
textparts = list(
tqdm(pool.imap(self._find_pattern, ocr_files), total=len(ocr_files))
)
self.word_surroundings = [x for x in textparts if x]
with open(self.out_path / "word_stats.json", "w") as file:
json.dump([x for x in containsSpecificWords if x], file)
with open(
self.out_path / "word_surroundings.json",
"w",
) as file:
json.dump([x for x in textparts if x], file)
def _create_df(self, file: Path) -> pd.DataFrame:
"""Generate dataframe with text content and metadata.
The ID is given by the original identifier from the polona2 archive.
"""
with open(file, "r") as datafile:
text = "".join(datafile.readlines())
temp_df = pd.DataFrame([text], columns=["text"])
temp_df.insert(0, "id", file.stem.split("_")[1])
temp_df.insert(0, "page", int(file.stem.split("_")[2].split("-")[-1]))
return temp_df
[docs]
def generate_dataframe(self) -> Path:
"""Generate dataframe of all found page texts.
Uses metadata information to include publication date, title,
place and more for the found periodicals. ID denotes the original
identifier from the polona2 archive. Fragments contains the
text data identified to contain fitting text by the original archive.
"""
if self.part == "page":
with Pool(ncore) as pool:
df_list = list(
tqdm(
pool.imap(self._create_df, self.contains_words),
total=len(self.contains_words),
)
)
df = pd.concat(df_list)
lentext = df.text.apply(lambda x: len(x.split()))
df.insert(0, "textlength", lentext)
logger.debug(
f"""
Collected {df.shape[0]} texts from pages with fitting text.\n
\tReducing metadata to used IDs.
"""
)
elif self.part == "region":
df_in = (
pd.DataFrame(self.word_surroundings)
.dropna(subset=self.word_targets, how="all")
.reset_index(drop=True)
)
entries = []
for word in self.word_targets:
temp_df = df_in[["file", word]].dropna()
temp_df = temp_df.explode(word)
for idx, row in temp_df.iterrows():
entries.append((row["file"], row[word]))
df = pd.DataFrame(entries, columns=["file", "text"])
doc_id = df.file.apply(lambda x: x.split("_")[1])
doc_page = df.file.apply(lambda x: int(x.split("_")[2].split("-")[-1]))
df.insert(0, "page", doc_page)
df.insert(0, "id", doc_id)
df = df.drop("file", axis=1)
lentext = df.text.apply(lambda x: len(x.split()))
df.insert(0, "textlength", lentext)
logger.debug(
f"""
Collected {df.shape[0]} texts from regions with fitting text.\n
\tReducing metadata to used IDs.
"""
)
metadata = pd.read_json(self.metadata_file, lines=True)
used_ids = df["id"].unique()
reduced_metadata = metadata.query("id.isin(@used_ids)").reset_index(drop=True)
reduced_metadata.insert(0, "year", reduced_metadata.date.dt.year)
assert len(used_ids) == reduced_metadata.shape[0], (
"Found different ids in metadata file."
)
df_merged = df.merge(reduced_metadata, on="id", how="inner")
df_merged.to_json(
self.out_path / f"polona_matchin_text_{self.part}.json",
orient="records",
lines=True,
date_format="iso",
)
return self.out_path / f"polona_matching_text_{self.part}.json"