helloproject-ai/text_processing/hdf2sql_parallel.py

72 lines
2.9 KiB
Python

from typing import Any
from h5py import File
from io import BytesIO
from os import cpu_count
from os.path import join, pardir, dirname
from pandas import DataFrame
from sqlite3 import connect
from concurrent.futures import ProcessPoolExecutor as PPE
from multiprocessing import Manager
from sys import path, stdout, stderr
from shutil import move
from time import time
from tqdm import tqdm
# Make the directory one level above this script importable so `settings` can
# be found. The original appended '..' relative to the *current working
# directory*, which only worked when launched from this script's folder.
# NOTE(review): assumes settings.py lives in the parent of this script's
# directory (equivalent to the old behaviour when run from this folder).
path.append(join(dirname(__file__), pardir))
from settings import datadir

temporary_dir = '/mnt/shm/'
# Input HDF5 file; resolves to '/mnt/shm/blog_text.hdf5'.
filename = join(temporary_dir, 'blog_text.hdf5')
if __name__ == '__main__':
    # Only the driver needs this in-memory copy (it is used to list the
    # top-level groups). Worker processes that re-import this module under the
    # 'spawn'/'forkserver' start methods skip it instead of each loading the
    # whole file into memory.
    with open(file=filename, mode='rb') as f:
        file_bio = BytesIO(initial_bytes=f.read())
def extract(key: str, file: str) -> tuple[list[list[Any]], list[list[Any]]]:
    """Read all blog entries and their comments stored under one top-level group.

    The function is submitted to the process pool by the driver. It opens the
    HDF5 file itself, by path, so no open handle is shared between processes.

    Args:
        key: Name of the top-level group in the HDF5 file to read.
        file: Path to the HDF5 file.

    Returns:
        A pair ``(article_table, comment_table)`` of row lists:

        * ``article_table`` rows are
          ``[entry_name, key, theme, title, date, article_text]``.
        * ``comment_table`` rows are
          ``[comment_name, blog_id, user_id, nickname, title, date, comment_text]``.

        Article and comment bodies are decoded from UTF-8 bytes.
    """
    with File(name=file, mode='r') as hdf:
        print(f'start: {key}')
        start = time()
        article_table = []
        comment_table = []
        group = hdf[key]
        for blog_entry in group.keys():
            # Resolve the entry and its article dataset once, instead of
            # re-walking the path from the file root for every access.
            entry = group[blog_entry]
            article = entry['article']
            blog_article = article[()].decode('utf-8')
            # NOTE(review): positional unpacking relies on h5py's attribute
            # iteration order (presumably name order unless the file tracks
            # creation order). Confirm it really yields theme, title, date.
            entry_theme, entry_title, entry_date = list(article.attrs.values())
            article_table.append([blog_entry, key, entry_theme, entry_title, entry_date, blog_article])
            for comment_entry, comment_text in entry['comments_dataset'].items():
                comment_article = comment_text[()].decode('utf-8')
                # NOTE(review): same attribute-order assumption as above.
                comment_blog_id, comment_user_id, comment_nickname, comment_title, comment_date = \
                    list(comment_text.attrs.values())
                comment_table.append(
                    [comment_entry, comment_blog_id, comment_user_id, comment_nickname,
                     comment_title, comment_date, comment_article])
        print(f'end: {key} at {int(time() - start)}s')
    return article_table, comment_table
if __name__ == '__main__':
    # Guard the driver. Worker processes started with the 'spawn' or
    # 'forkserver' start methods re-import this module and must not re-run it.

    # List the top-level groups and close the HDF5 handle *before* any worker
    # process is created, so workers never inherit an open HDF5 file.
    with File(name=file_bio, mode='r') as hdf5:
        blog_groups = list(hdf5.keys())

    results = []
    with PPE(max_workers=cpu_count()) as executor:
        for blog_group in blog_groups:
            # Each worker reopens the file by path.
            results.append(executor.submit(extract, blog_group, filename))

    article_tables = []
    comment_tables = []
    for job in results:
        # result() re-raises any exception raised inside the worker.
        articles, comments = job.result()
        article_tables.extend(articles)
        comment_tables.extend(comments)

    tmp_database = join(temporary_dir, 'tmp.sqlite')
    # sqlite3's connection context manager only commits or rolls back; it does
    # not close the connection. Close it explicitly before moving the file.
    connector = connect(database=tmp_database, timeout=3600)
    try:
        with connector:
            article_dataframe = DataFrame(
                data=article_tables,
                columns=['index', 'group', 'theme', 'title', 'date', 'article'])
            article_dataframe = article_dataframe.astype({"index": int})
            article_dataframe.to_sql(name='blog', con=connector, if_exists='replace', index=False)
            comment_dataframe = DataFrame(
                data=comment_tables,
                columns=['index', 'blog_id', 'user_id', 'nickname', 'title', 'date', 'article'])
            # Write the 'index' column as-is, like the blog table. Previously a
            # discarded set_index() call left pandas' RangeIndex in place, and
            # it was written as an extra 'level_0' column.
            comment_dataframe.to_sql(name='comment', con=connector, if_exists='replace', index=False)
    finally:
        connector.close()
    move(tmp_database, join(datadir(), 'blog_post.sqlite'))