"""Scrape a forum board that uses SMF-style markup (e.g. Bitcointalk): collect
the board's page URLs, gather topic URLs from each page, and write every
topic's posts to a per-topic CSV under data/<board>/."""

import os
import json
import time
import munch
import requests
import argparse
import pandas as pd

from tqdm import tqdm
from datetime import date
from loguru import logger
from random import randint
from bs4 import BeautifulSoup


def get_web_component():
    # Load the pool of user agents from the local website_format.json config.
    with open("website_format.json") as json_file:
        website_format = json.load(json_file)
    website_format = munch.munchify(website_format)
    return website_format.USER_AGENTS


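# website_format.json is assumed to look roughly like the snippet below: a
# single USER_AGENTS key mapping to a list of user-agent strings (the real
# file may carry additional keys).
#
# {
#     "USER_AGENTS": [
#         "Mozilla/5.0 (Windows NT 10.0; Win64; x64) ...",
#         "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) ..."
#     ]
# }

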
def get_web_content(url, USER_AGENTS):
    # Fetch the page with a randomly chosen user agent and return parsed HTML.
    random_agent = USER_AGENTS[randint(0, len(USER_AGENTS) - 1)]
    headers = {"User-Agent": random_agent}
    req = requests.get(url, headers=headers)
    req.encoding = req.apparent_encoding
    soup = BeautifulSoup(req.text, features="lxml")
    return soup


def get_pages_urls(url, USER_AGENTS):
    soup = get_web_content(url, USER_AGENTS)

    # The board's page navigation bar lists neighbouring pages plus a link that
    # jumps 50 threads ahead (third link from the end); keep the current page
    # at the front of the list.
    first_td = soup.find("td", class_="middletext", id="toppages")
    nav_pages_links = first_td.find_all("a", class_="navPages")
    href_links = [link["href"] for link in nav_pages_links]
    next_50_link = href_links[-3]
    href_links.insert(0, url)
    return href_links, next_50_link


def get_post_urls(url, USER_AGENTS):
    soup = get_web_content(url, USER_AGENTS)

    # Topic links on a board page sit inside the windowbg cells.
    links_elements = soup.select("td.windowbg span a")
    links = [link["href"] for link in links_elements]
    return links


def loop_through_source_url(USER_AGENTS, url, num_of_pages):
    # Follow the "next 50" navigation link until enough unique page URLs have
    # been collected, with a hard cap on iterations as a safety net.
    pages_urls = []
    counter = 0
    while len(pages_urls) < num_of_pages:
        href_links, next_50_link = get_pages_urls(url, USER_AGENTS)
        pages_urls.extend(href_links)
        pages_urls = list(dict.fromkeys(pages_urls))
        url = next_50_link

        if counter == 15:
            break
        counter += 1
    return pages_urls


def loop_through_pages(USER_AGENTS, pages_urls):
    # Collect topic URLs from every board page, dropping duplicates while
    # preserving order.
    post_urls = []
    for url in tqdm(pages_urls):
        href_links = get_post_urls(url, USER_AGENTS)
        post_urls.extend(href_links)
    post_urls = list(dict.fromkeys(post_urls))
    return post_urls


def loop_through_posts(USER_AGENTS, post_urls, board, num_of_posts_start):
    # Scrape every topic (optionally resuming from an offset) and write one CSV
    # per topic; failures are logged to error_log.txt and skipped.
    for post_url in tqdm(post_urls[num_of_posts_start:]):
        time.sleep(1)
        try:
            soup = get_web_content(post_url, USER_AGENTS)
            middletext = soup.find("td", class_="middletext")
            nav_pages_links = middletext.find_all("a", class_="navPages")

            df = pd.DataFrame(
                columns=[
                    "timestamp",
                    "last_edit",
                    "author",
                    "post",
                    "topic",
                    "attachment",
                    "link",
                    "original_info",
                ]
            )

            # Multi-page topics expose extra navigation links; read every page,
            # starting with the topic's first page.
            if len(nav_pages_links) > 0:
                href_links = [link["href"] for link in nav_pages_links[:-1]]
                href_links.insert(0, post_url)
                for url in href_links:
                    df = read_subject_page(USER_AGENTS, url, df)
            else:
                df = read_subject_page(USER_AGENTS, post_url, df)

            topic_id = post_url.split("topic=")[1]
            df.to_csv(f"data/{board}/data_{topic_id}.csv", mode="w", index=False)
        except Exception as e:
            logger.error(e)
            with open(f"data/{board}/error_log.txt", "a") as f:
                f.write(f"{post_url}\n -- {e}\n")
            continue


def read_subject_page(USER_AGENTS, post_url, df):
    time.sleep(1)
    soup = get_web_content(post_url, USER_AGENTS)
    form_tag = soup.find("form", id="quickModForm")
    table_tag = form_tag.find("table", class_="bordercolor")
    td_tag = table_tag.find_all("td", class_="windowbg")
    td_tag.extend(table_tag.find_all("td", class_="windowbg2"))

    for comment in tqdm(td_tag):
        res = extract_useful_content_windowbg(comment)
        if res is not None:
            df = pd.concat([df, pd.DataFrame([res])])

    return df


def extract_useful_content_windowbg(tr_tag):
    """
    Extract one post's fields from a windowbg cell:

    - timestamp of the post (e.g. "September 11, 2023, 07:49:45 AM")
    - last edit time, if the post was edited
    - author of the post (e.g. "SupermanBitcoin"), "Anonymous" if missing
    - the post text itself
    - the topic it was posted in (e.g. "[INFO - DISCUSSION] Security Budget Problem")
    - attachment: 1 if the post contains at least one attachment (image, video etc.), else 0
    - link: 1 if the post contains at least one link, else 0
    - the raw header-and-post markup, kept for reference
    """
    headerandpost = tr_tag.find("td", class_="td_headerandpost")
    if not headerandpost:
        return None

    # The small header line holds the post timestamp and, optionally, a
    # "Last edit: ..." note.
    timestamp = headerandpost.find("div", class_="smalltext").get_text()
    timestamps = timestamp.split("Last edit: ")
    timestamp = timestamps[0].strip()
    last_edit = None
    if len(timestamps) > 1:
        if "Today " in timestamps[1]:
            # "Today at HH:MM:SS" edits carry no date, so prepend today's date.
            last_edit = (
                date.today().strftime("%B %d, %Y")
                + ", "
                + timestamps[1].split("by")[0].split("Today at")[1].strip()
            )
        else:
            last_edit = timestamps[1].split("by")[0].strip()

    poster_info_tag = tr_tag.find("td", class_="poster_info")
    anchor_tag = poster_info_tag.find("a")
    author = "Anonymous" if anchor_tag is None else anchor_tag.get_text()

    post = tr_tag.find("div", class_="post").get_text()

    topic = headerandpost.find("div", class_="subject").get_text()

    attachments = headerandpost.find("img")
    attachment = 0 if attachments is None else 1

    links = headerandpost.find("a", class_="ul")
    link = 0 if links is None else 1

    original_info = headerandpost

    return {
        "timestamp": timestamp,
        "last_edit": last_edit,
        "author": author.strip(),
        "post": post.strip(),
        "topic": topic.strip(),
        "attachment": attachment,
        "link": link,
        "original_info": original_info,
    }


def save_page_file(data, file_name):
    # Persist one URL per line so a later run can resume from the cached list.
    with open(file_name, "w") as filehandle:
        for listitem in data:
            filehandle.write("%s\n" % listitem)


def parse_args():
    parser = argparse.ArgumentParser()
    parser.add_argument("url", help="url for the extraction")
    parser.add_argument("--update", help="extract updated data", action="store_true")
    parser.add_argument("--board", help="board name")
    parser.add_argument(
        "--num_of_pages", "-pages", help="number of pages to extract", type=int
    )
    parser.add_argument(
        "--num_of_posts_start",
        "-posts",
        help="index of the post to start extracting from",
        type=int,
        default=0,
    )
    return vars(parser.parse_args())


def main(url, update, board, num_of_pages, num_of_posts_start):
    USER_AGENTS = get_web_component()

    os.makedirs(f"data/{board}/", exist_ok=True)
    pages_file_path = f"data/{board}/pages_urls.txt"
    post_file_path = f"data/{board}/post_urls.txt"

    if update:
        if os.path.exists(pages_file_path):
            os.remove(pages_file_path)
        if os.path.exists(post_file_path):
            os.remove(post_file_path)

    if not os.path.exists(pages_file_path):
        pages_urls = loop_through_source_url(USER_AGENTS, url, num_of_pages)
        save_page_file(pages_urls, pages_file_path)

    with open(pages_file_path, "r") as filehandle:
        pages_urls = [
            current_place.rstrip() for current_place in filehandle.readlines()
        ]

    if not os.path.exists(post_file_path):
        post_urls = loop_through_pages(USER_AGENTS, pages_urls)
        save_page_file(post_urls, post_file_path)

    with open(post_file_path, "r") as filehandle:
        post_urls = [current_place.rstrip() for current_place in filehandle.readlines()]

    loop_through_posts(USER_AGENTS, post_urls, board, num_of_posts_start)


if __name__ == "__main__":
    main(**parse_args())
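# Example invocation (a sketch: the script file name and URL below are
# placeholders, not taken from this repository; any board index URL with the
# markup the parser expects should work):
#
#   python scraper.py "https://bitcointalk.org/index.php?board=1.0" \
#       --board bitcoin_discussion --num_of_pages 100 --num_of_posts_start 0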