import fileinput
import io
import json
import os
import pathlib
import sys
from functools import wraps
from typing import List, Optional, Union

# Needed only for the commented-out Google Drive download below:
# import google.auth
# from googleapiclient.discovery import build
# from googleapiclient.errors import HttpError
# from googleapiclient.http import MediaIoBaseDownload


class Logger(object):
    """Writes every message to the original stdout and appends it to a log file."""

    def __init__(self, filename="Default.log"):
        self.terminal = sys.stdout
        self.log = open(filename, "a")

    def write(self, message):
        self.terminal.write(message)
        self.log.write(message)

    def flush(self):
        # flush both targets so buffered log lines are not lost
        self.terminal.flush()
        self.log.flush()


def log_to_file(file_name="Default.log"):
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            # Save the current stdout and stderr
            original_stdout = sys.stdout
            original_stderr = sys.stderr
            # Redirect stdout and stderr to the log file
            logger = Logger(file_name)
            sys.stdout = logger
            sys.stderr = logger
            try:
                # Call the original function
                result = func(*args, **kwargs)
                return result
            finally:
                # Reset stdout and stderr
                sys.stdout = original_stdout
                sys.stderr = original_stderr
        return wrapper
    return decorator
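

# Illustrative usage sketch for the decorator above; "_demo_logged_print" and
# "demo_run.log" are hypothetical names chosen for this example only.
@log_to_file("demo_run.log")
def _demo_logged_print():
    # anything printed inside the decorated function is mirrored to demo_run.log
    print("this line goes to stdout and to demo_run.log")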


# doesn't work directly, need to set up Google Cloud credentials if not present
# src: https://developers.google.com/drive/api/guides/manage-downloads#download-content
# def download_file(real_file_id):
#     # dataset link: https://drive.google.com/drive/folders/1KD7v4eW2ZKQ0Re_6lXRuaaVswvS3IFIh?usp=sharing
#     """Downloads a file
#     Args:
#         real_file_id: ID of the file to download
#     Returns: IO object with location.
#
#     Load pre-authorized user credentials from the environment.
#     TODO(developer) - See https://developers.google.com/identity
#     for guides on implementing OAuth2 for the application.
#     """
#     creds, _ = google.auth.default()
#
#     try:
#         # create drive api client
#         service = build("drive", "v3", credentials=creds)
#
#         file_id = real_file_id
#
#         # pylint: disable=maybe-no-member
#         request = service.files().get_media(fileId=file_id)
#         file = io.BytesIO()
#         downloader = MediaIoBaseDownload(file, request)
#         done = False
#         while done is False:
#             status, done = downloader.next_chunk()
#             print(f"Download {int(status.progress() * 100)}.")
#
#     except HttpError as error:
#         print(f"An error occurred: {error}")
#         file = None
#
#     return file.getvalue()


def read_from_all_files(all_files_to_read: List[Union[str, pathlib.Path]], batch_size: int = 1000,
                        batch_num: Optional[int] = None,
                        encoding: str = "utf-8",
                        reading_only_specific_files: Optional[List[str]] = None) -> List:
    """
    Basic generator that yields batches of lines; leverages the built-in fileinput module
    to read all files through a single file object.
    :param all_files_to_read: list of file paths, str or Path
    :param batch_size: the maximum number of lines to yield per batch
    :param batch_num: the number of batches to yield before stopping, added later for testing
    :param encoding: text encoding used to read the files
    :param reading_only_specific_files: substrings that a file path must contain to be read
    :return: List of text lines
    """
    print("\n=========\nReading dataset\n=============")
    counter = 0
    if reading_only_specific_files:
        # keep only files whose path contains every requested substring
        # (filter with a comprehension instead of popping while iterating)
        all_files_to_read = [f_name for f_name in all_files_to_read
                             if all(x in str(f_name) for x in reading_only_specific_files)]
    print(f"\nCount of files to read...{len(all_files_to_read)}")
    all_files_to_read = sorted(all_files_to_read)
    with fileinput.input(files=all_files_to_read,
                         encoding=encoding) as f:  # built-in fileinput reads all files efficiently, handles switching internally
        batch = []
        for line in f:
            # print(f"file number: {f.fileno()}")
            # print(f"file-line number: {f.filelineno()}")
            # print(line)
            if line != '\n':
                batch.append(line)
                if len(batch) == batch_size:
                    counter += 1
                    yield batch
                    batch = []  # reset before the early-exit check so the last batch is not yielded twice
                    if batch_num and counter == batch_num:
                        break
        if batch:
            yield batch
    print(f"\nFinal counter value: {counter}")
    print("\n=========\nReading dataset done\n=============")


def read_chunks_from_file(file_path, chunk_size=4 * 1024 * 1024, encoding="utf-8"):
    """
    Helper function to yield chunk_size pieces of data read from the given file_path.
    """
    file_path = os.path.abspath(file_path)
    with open(file_path, 'r', encoding=encoding) as f:
        # the file is opened in text mode, so the end-of-file sentinel is '' (not b'')
        for chunk in iter(lambda: f.read(chunk_size), ''):
            yield chunk
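

# Illustrative usage sketch for the chunk reader above; the path is borrowed from
# the default output file used elsewhere in this module and is an assumption here.
def _demo_read_chunks():
    total_chars = 0
    for chunk in read_chunks_from_file("./dataset/combined_from_crawler-json.txt",
                                       chunk_size=1024 * 1024):
        total_chars += len(chunk)
    print(f"read {total_chars} characters in 1 MiB chunks")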


def get_all_text_dataset(path: str | pathlib.Path, file_type=".txt") -> List:
    """
    Helper function to get all files of the given format under a path or root directory;
    uses glob recursively to find files of that format.
    :param path: str or Path object, root directory for a dataset
    :param file_type: format of files to get
    :return: list of paths of all files of the specified format
    """
    files = []
    # first convert json data to text and then process text
    convert_json_data_to_text_and_process_text(dir_path="./web-scrapper",
                                               file_type=".json",
                                               output_file_path="./dataset/combined_from_crawler-json.txt")
    for txt_file in pathlib.Path(path).rglob('*' + file_type):
        files.append(txt_file)
    return files
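

# Illustrative usage sketch for the helper above; note that it also triggers the
# JSON-to-text conversion for "./web-scrapper" as a side effect. The "./dataset"
# root used here is an assumption for the example.
def _demo_collect_dataset_files():
    all_txt_files = get_all_text_dataset("./dataset", file_type=".txt")
    print(f"found {len(all_txt_files)} .txt files")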


# def get_data_batch(all_files, chunk_size=100 * 1024 * 1024, formats=".txt"):
#     for file in all_files:
#         yield from read_chunks_from_file(file)


def convert_json_data_to_text_and_process_text(dir_path, file_type=".json", output_file_path="crawler_data.txt"):
    """
    Helper function to convert JSON data to text and then process the text
    """
    with open(output_file_path, "w", encoding="utf-8") as f_out:
        for json_file in pathlib.Path(dir_path).rglob('*' + file_type):
            with open(json_file, "r", encoding="utf-8") as f:
                data = json.load(f)
                for item in data:
                    # each item is expected to expose a "text" field holding a list of strings
                    f_out.write(" ".join(item["text"]) + "\n")
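
# Sketch of the JSON shape the converter above assumes, inferred from the
# `" ".join(item["text"])` call rather than from an actual crawler output file:
# each JSON file holds a list of objects whose "text" field is a list of strings, e.g.
# [
#     {"text": ["first paragraph of a page", "second paragraph of a page"]},
#     {"text": ["another page's paragraph"]}
# ]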


if __name__ == "__main__":
    # download_file is commented out above; re-enable it (and its googleapiclient imports)
    # once Google Cloud credentials are configured.
    # download_file(real_file_id="1KD7v4eW2ZKQ0Re_6lXRuaaVswvS3IFIh")
    pass