import scrapy
from scrapy.crawler import CrawlerProcess
from datetime import datetime
import re
import os
from supabase import create_client, Client
import html

SUPABASE_URL = os.environ.get("NEXT_PUBLIC_SUPABASE_URL")
SUPABASE_KEY = os.environ.get("SUPABASE_SERVICE_KEY")
SUPABASE_BUCKET = os.environ.get("NEXT_PUBLIC_SUPABASE_STORAGE_BUCKET")

supabase: Client = create_client(SUPABASE_URL, SUPABASE_KEY)


class PNPContentSpider(scrapy.Spider):
    name = 'pnp_content_spider'
    start_urls = ['https://www.pnp.ac.id', 'https://penerimaan.pnp.ac.id']
    excluded_subdomains = [
        'akt.pnp.ac.id', 'an.pnp.ac.id', 'bing.pnp.ac.id', 'elektro.pnp.ac.id',
        'me.pnp.ac.id', 'sipil.pnp.ac.id', 'ti.pnp.ac.id'
    ]
    custom_settings = {
        'DOWNLOAD_DELAY': 1,
        'RETRY_TIMES': 3,
        'HTTPCACHE_ENABLED': False,
        'ROBOTSTXT_OBEY': True,
        'CONCURRENT_REQUESTS': 1,
        'RETRY_ENABLED': True,
        'USER_AGENT': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
    }

    def clean_text(self, text: str) -> str:
        """Clean and normalize text content"""
        if not text:
            return ""
        # Decode HTML entities
        text = html.unescape(text)
        # Remove extra whitespace and normalize
        text = ' '.join(text.split())
        # Fix common mojibake (UTF-8 quotes/dashes mis-decoded as Windows-1252);
        # the specific sequences are replaced first, the bare 'â€' catch-all last
        text = text.replace('â€œ', '"').replace('â€™', "'")
        text = text.replace('â€”', '—').replace('â€“', '–')
        text = text.replace('â€', '"')
        return text.strip()

    def format_paragraph(self, text: str) -> str:
        text = self.clean_text(text)
        sentences = re.split(r'(?<=[.!?]) +', text)
        paragraph = ''
        word_count = 0
        for sentence in sentences:
            words = sentence.split()
            word_count += len(words)
            paragraph += sentence + ' '
            if 50 <= word_count <= 150:
                break
        return paragraph.strip()

    def parse(self, response):
        self.logger.info(f"Processing main page: {response.url}")
        nav_items = response.css('ul.wp-block-navigation__container > li.wp-block-navigation-item')
        for item in nav_items:
            main_title = item.css('a.wp-block-navigation-item__content span.wp-block-navigation-item__label::text').get()
            if not main_title:
                main_title = item.css('a.wp-block-navigation-item__content::text').get('').strip()
            main_link = item.css('a.wp-block-navigation-item__content::attr(href)').get()
            if main_link and not main_link.startswith('#'):
                main_link = response.urljoin(main_link)
                if "jurusan" in main_link.lower():
                    continue
                yield scrapy.Request(
                    main_link,
                    callback=self.parse_content,
                    meta={'page_title': main_title, 'menu_path': main_title}
                )
            submenus = item.css('ul.wp-block-navigation__submenu-container > li.wp-block-navigation-item')
            for submenu in submenus:
                submenu_title = submenu.css('a.wp-block-navigation-item__content span.wp-block-navigation-item__label::text').get()
                if not submenu_title:
                    submenu_title = submenu.css('a.wp-block-navigation-item__content::text').get('').strip()
                submenu_link = submenu.css('a.wp-block-navigation-item__content::attr(href)').get()
                if submenu_link and not submenu_link.startswith('#'):
                    submenu_link = response.urljoin(submenu_link)
                    if "jurusan" in submenu_link.lower():
                        continue
                    menu_path = f"{main_title} > {submenu_title}" if main_title else submenu_title
                    yield scrapy.Request(
                        submenu_link,
                        callback=self.parse_content,
                        meta={'page_title': submenu_title, 'menu_path': menu_path}
                    )

    def extract_leadership_info(self, response):
        """Extract leadership information from the special leadership page"""
        self.logger.info("Extracting leadership information from special page")
        leaders_data = []
        # Try multiple table selectors based on the HTML structure shown
        tables = response.css('table, .wp-block-table table, .entry-content table, tbody')
        if tables:
            # Process each table
            for table_idx, table in enumerate(tables):
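                # Assumed layout (matching the selectors above): each table describes one
                # official, with rows shaped either as "Label | : | Value" (3 cells) or
                # "Label | Value" (2 cells); both shapes are handled below.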
                self.logger.info(f"Processing table {table_idx + 1}")
                rows = table.css('tr')
                if not rows:
                    continue

                leader_info = {}
                position_title = ""

                # Look for position title (like "DIREKTUR")
                title_elements = table.css('strong, .position-title, th')
                for title_elem in title_elements:
                    title_text = self.clean_text(' '.join(title_elem.css('*::text').getall()))
                    if any(pos in title_text.upper() for pos in ['DIREKTUR', 'WAKIL DIREKTUR', 'KETUA', 'SEKRETARIS']):
                        position_title = title_text
                        break

                # Extract key-value pairs from table rows
                for row in rows:
                    cells = row.css('td, th')
                    if len(cells) >= 3:
                        # Format: Label | : | Value (3 columns)
                        key = self.clean_text(' '.join(cells[0].css('*::text').getall()))
                        separator = self.clean_text(' '.join(cells[1].css('*::text').getall()))
                        value = self.clean_text(' '.join(cells[2].css('*::text').getall()))
                        if key and value and separator == ":":
                            leader_info[key] = value
                    elif len(cells) == 2:
                        # Format: Label | Value (2 columns)
                        key = self.clean_text(' '.join(cells[0].css('*::text').getall()))
                        value = self.clean_text(' '.join(cells[1].css('*::text').getall()))
                        if key and value and key != value:
                            # Strip any colon from the label (e.g. "Nama:")
                            clean_key = key.replace(':', '').strip()
                            leader_info[clean_key] = value

                # Add position title if found
                if position_title:
                    leader_info['Posisi'] = position_title

                # If we found structured data, add it
                if leader_info:
                    leaders_data.append(leader_info)
                    self.logger.info(f"Extracted leader data: {list(leader_info.keys())}")

        # Fallback: extract from general content structure
        if not leaders_data:
            self.logger.info("No table data found, trying general content extraction")
            # Look for profile sections
            profile_sections = response.css('.wp-block-group, .entry-content > div, .profile-section')
            for section in profile_sections:
                section_text = self.clean_text(' '.join(section.css('*::text').getall()))
                # Check if this section contains leadership info
                if any(keyword in section_text.lower() for keyword in ['direktur', 'wakil direktur', 'dr.', 's.t.', 'm.kom', 'nidn']):
                    # Try to extract structured info from the text
                    leader_info = {'description': section_text}
                    # Try to extract specific details using regex
                    name_match = re.search(
                        r'(Dr\.|Ir\.|Prof\.)?\s*([A-Z][a-z]+(?:\s+[A-Z][a-z]+)*),?\s*(S\.T\.|M\.Kom|M\.T\.|S\.E\.|M\.M\.)*',
                        section_text
                    )
                    if name_match:
                        leader_info['Nama'] = name_match.group(0).strip()
                    nidn_match = re.search(r'NIDN[:\s]*(\d+)', section_text)
                    if nidn_match:
                        leader_info['NIDN'] = nidn_match.group(1)
                    leaders_data.append(leader_info)

        return leaders_data

    def format_leadership_content(self, leaders_data):
        """Format leadership data into readable content"""
        formatted_content = []
        for idx, leader in enumerate(leaders_data, 1):
            if isinstance(leader, dict):
                if 'description' in leader and len(leader) == 1:
                    # Simple description format
                    content = f"## Pimpinan {idx}\n\n{leader['description']}"
                else:
                    # Structured data format
                    position = leader.get("Posisi", f"Pimpinan {idx}")
                    content = f"## {position}\n\n"
                    # Format key information in a logical order
                    ordered_keys = ['Nama', 'NIDN', 'Jabatan Akademik', 'Jurusan', 'Program Studi']
                    # Add ordered information first
                    for key in ordered_keys:
                        if key in leader:
                            content += f"**{key}**: {leader[key]}\n\n"
                    # Add remaining information
                    for key, value in leader.items():
                        if key not in ordered_keys and key not in ['Posisi', 'description']:
                            content += f"**{key}**: {value}\n\n"
                    # Add description if exists
                    if 'description' in leader:
                        content += f"\n{leader['description']}\n\n"
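                # Note: `content` is only assigned inside the isinstance() guard above,
                # so the append below stays inside that branch as well.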
                formatted_content.append(content.strip())
        return formatted_content

    def parse_content(self, response):
        page_title = response.meta.get('page_title', 'Unknown Page')
        menu_path = response.meta.get('menu_path', '')
        if page_title == 'Unknown Page':
            page_title = self.clean_text(response.css('h1.entry-title::text, h1.page-title::text').get(''))

        self.logger.info(f"Extracting content from: {response.url} ({page_title})")
        paragraphs = []
        is_leadership_page = (
            ("pimpinan" in response.url.lower() or "pimpinan" in page_title.lower())
            and "pnp.ac.id" in response.url
        )

        # 🔹 Special case: PNP leadership ("pimpinan") page
        if is_leadership_page:
            self.logger.info("Detected leadership page - using special extraction")
            leaders_data = self.extract_leadership_info(response)
            self.logger.info(f"Found {len(leaders_data)} leadership entries")
            if leaders_data:
                paragraphs = self.format_leadership_content(leaders_data)
                # Also extract any additional content from the page
                additional_content = self.extract_general_content(response)
                if additional_content:
                    paragraphs.extend(["## Informasi Tambahan"] + additional_content)
            else:
                # Fall back to general content extraction
                self.logger.warning("Leadership extraction failed, falling back to general extraction")
                paragraphs = self.extract_general_content(response)
        else:
            # 🔹 Normal content extraction
            paragraphs = self.extract_general_content(response)

        # Create final content
        content_text = self.create_final_content(page_title, response.url, paragraphs)

        # Add table data if any (skip for leadership pages to avoid duplication)
        if not is_leadership_page:
            table_content = self.extract_table_data(response)
            if table_content:
                content_text += "\n\n## Data Tabel\n\n" + table_content

        # Upload to Supabase
        filename = self.upload_content(page_title, content_text)

        yield {
            'url': response.url,
            'title': page_title,
            'menu_path': menu_path,
            'uploaded_as': filename,
            'timestamp': datetime.now().isoformat(),
            'content_length': len(content_text),
            'leadership_page': is_leadership_page
        }

        # Continue with additional scraping if needed
        # (process_additional_links is a generator, so its requests must be re-yielded)
        yield from self.process_additional_links(response, menu_path)

    def extract_general_content(self, response):
        """Extract general content from the page"""
        paragraphs = []
        content_selectors = [
            'div.entry-content', 'article.post', 'main.site-main', 'div.content',
            'div.main-content', 'div#content', 'div.page-content'
        ]
        for selector in content_selectors:
            content_area = response.css(selector)
            if content_area:
                elems = content_area.css('p, h1, h2, h3, h4, h5, h6, li, div.wp-block-group')
                for elem in elems:
                    text = self.clean_text(' '.join(elem.css('*::text').getall()))
                    if text and len(text.split()) >= 5:
                        # Add links if any
                        links = elem.css('a::attr(href)').getall()
                        for link in links:
                            if link and not link.startswith('#'):
                                text += f" (Link: {response.urljoin(link)})"
                        paragraphs.append(text)
                if paragraphs:
                    break

        # Fallback: extract from body
        if not paragraphs:
            body_texts = response.css('body *::text').getall()
            combined_text = self.clean_text(' '.join(body_texts))
            if combined_text:
                # Split into meaningful chunks
                sentences = re.split(r'(?<=[.!?])\s+', combined_text)
                current_para = ""
                for sentence in sentences:
                    if len((current_para + " " + sentence).split()) <= 50:
                        current_para += " " + sentence
                    else:
                        if current_para.strip():
                            paragraphs.append(current_para.strip())
                        current_para = sentence
                if current_para.strip():
                    paragraphs.append(current_para.strip())

        # Format paragraphs
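        # (keep only substantive paragraphs: fragments under 10 words are dropped, the
        # rest are trimmed to roughly 50-150 words by format_paragraph())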
        formatted_paragraphs = []
        for para in paragraphs:
            if len(para.split()) >= 10:
                formatted_paragraphs.append(self.format_paragraph(para))
        return formatted_paragraphs

    def extract_table_data(self, response):
        """Extract and format table data"""
        tables = response.css('table')
        table_output = []
        for table_idx, table in enumerate(tables):
            table_rows = []
            for row in table.css('tr'):
                cells = row.css('th, td')
                row_data = []
                for cell in cells:
                    cell_text = self.clean_text(' '.join(cell.css('*::text').getall()))
                    if link := cell.css('a::attr(href)').get():
                        cell_text += f" (Link: {response.urljoin(link)})"
                    if cell_text:
                        row_data.append(cell_text)
                if row_data:
                    table_rows.append(" | ".join(row_data))
            if table_rows:
                table_output.append(f"### Tabel {table_idx + 1}\n\n" + "\n".join(table_rows))
        return "\n\n".join(table_output)

    def create_final_content(self, page_title, url, paragraphs):
        """Create the final formatted content"""
        return f"""# {page_title}

**Tanggal**: {datetime.now().strftime('%d %B %Y')}
**URL**: {url}

{chr(10).join(paragraphs)}"""

    def upload_content(self, page_title, content_text):
        """Upload content to Supabase"""
        safe_title = re.sub(r'[^\w\s-]', '', page_title).strip().lower()
        safe_title = re.sub(r'[-\s]+', '-', safe_title)
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        filename = f"{safe_title}_{timestamp}.txt"
        try:
            supabase.storage.from_(SUPABASE_BUCKET).upload(
                path=filename,
                file=content_text.encode('utf-8'),
                file_options={"content-type": "text/plain; charset=utf-8"}
            )
            self.logger.info(f"Uploaded {filename} successfully.")
            return filename
        except Exception as e:
            self.logger.error(f"Upload error for {filename}: {str(e)}")
            return f"failed_{filename}"

    def process_additional_links(self, response, menu_path):
        """Process additional links from the same domain"""
        current_domain = response.url.split('//')[1].split('/')[0]
        if 'pnp.ac.id' not in current_domain:
            header_links = []
            for sel in ['header a::attr(href)', 'nav a::attr(href)', '.navbar a::attr(href)']:
                header_links.extend(response.css(sel).getall())
            for link in set(link for link in header_links if link and not link.startswith(('#', 'javascript:'))):
                full_link = response.urljoin(link)
                if current_domain in full_link:
                    yield scrapy.Request(
                        url=full_link,
                        callback=self.parse_content,
                        meta={'page_title': 'Header Link', 'menu_path': f"{menu_path} > Header"}
                    )


if __name__ == '__main__':
    process = CrawlerProcess({
        'USER_AGENT': 'PNPBot/1.0',
        'DOWNLOAD_DELAY': 2,
        'ROBOTSTXT_OBEY': True,
        'LOG_LEVEL': 'INFO',
        'CONCURRENT_REQUESTS': 1,
        'DOWNLOAD_TIMEOUT': 100,
        'RETRY_TIMES': 3,
        'HTTPCACHE_ENABLED': False,
        'FEED_EXPORT_ENCODING': 'utf-8'
    })
    process.crawl(PNPContentSpider)
    process.start()
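
    # Optional post-crawl sanity check — a minimal sketch, assuming the supabase-py
    # storage client exposes list() on a bucket (verify against the installed version).
    # It only logs what ended up in the bucket; remove it if you don't need the check.
    try:
        uploaded = supabase.storage.from_(SUPABASE_BUCKET).list()
        print(f"Bucket '{SUPABASE_BUCKET}' now holds {len(uploaded)} objects:")
        for obj in uploaded:
            print(f" - {obj.get('name')}")
    except Exception as e:
        print(f"Could not list bucket contents: {e}")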