"""Scrapy spider that scrapes class-schedule pages from Politeknik Negeri Padang
(presensi.pnp.ac.id and elektro.pnp.ac.id), converts the timetable tables into
plain-text documents, and uploads them to Supabase Storage."""

import os
import re
from datetime import datetime
from io import StringIO

import scrapy
from scrapy.crawler import CrawlerProcess
from supabase import create_client


class PnpSpider(scrapy.Spider):
    name = 'pnp_spider'
    allowed_domains = ['presensi.pnp.ac.id', 'elektro.pnp.ac.id']
    start_urls = [
        'https://presensi.pnp.ac.id/',
        'https://elektro.pnp.ac.id/jadwal-perkuliahan-jurusan-teknik-elektro/jadwal-perkuliahan-program-studi-teknik-listrik/'
    ]
    excluded_departments = ['elektronika', 'telkom', 'listrik']

    def __init__(self, *args, **kwargs):
        super(PnpSpider, self).__init__(*args, **kwargs)
        # Initialize Supabase client
        url = os.environ.get("NEXT_PUBLIC_SUPABASE_URL")
        key = os.environ.get("SUPABASE_SERVICE_KEY")
        self.supabase = create_client(url, key)
        self.storage_bucket = os.environ.get("NEXT_PUBLIC_SUPABASE_STORAGE_BUCKET")
        self.file_buffers = {}  # Dictionary of jurusan_id -> StringIO buffer
        self.current_date = datetime.now().strftime("%Y-%m-%d")

    def closed(self, reason):
        """Upload all buffered documents to Supabase when the spider finishes."""
        print(f"Spider closing with reason: {reason}")
        print(f"Uploading {len(self.file_buffers)} files to Supabase...")
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        for jurusan_id, buffer in self.file_buffers.items():
            filename = f"{jurusan_id}_{timestamp}.txt"
            content = buffer.getvalue()
            print(f"Uploading {filename} with content length: {len(content)}")
            success = self.upload_to_supabase(filename, content)
            if success:
                print(f"✅ Successfully uploaded {filename}")
            else:
                print(f"❌ Failed to upload {filename}")
            buffer.close()

    def upload_to_supabase(self, filename, content):
        """Upload content directly to Supabase Storage"""
        try:
            # Upload new content
            res = self.supabase.storage.from_(self.storage_bucket).upload(
                path=filename,
                file=content.encode('utf-8'),
                file_options={"content-type": "text/plain"}
            )
            return True
        except Exception as e:
            print(f"Upload error: {str(e)}")
            return False

    def parse(self, response):
        # The elektro.pnp.ac.id start URL is itself a schedule page; the other
        # start URL is a landing page that links to each department.
        if 'elektro.pnp.ac.id' in response.url:
            jurusan_id = 'teknik_elektro'
            jurusan_name = 'Jurusan Teknik Elektro'
            self.parse_elektro_page(response, jurusan_id, jurusan_name)
            return
        print("Starting scrape from the main page...")
        jurusan_links = set(response.xpath('//article[contains(@class, "section")]//a/@href').getall())
        for link in jurusan_links:
            if any(excluded in link.lower() for excluded in self.excluded_departments):
                continue
            jurusan_url = response.urljoin(link)
            jurusan_id = self.extract_jurusan_id(link)
            yield scrapy.Request(jurusan_url, callback=self.parse_jurusan, meta={'jurusan_id': jurusan_id})

    def parse_elektro_page(self, response, jurusan_id, jurusan_name):
        if jurusan_id not in self.file_buffers:
            self.initialize_document_buffer(jurusan_id, jurusan_name)
        output_buffer = self.file_buffers[jurusan_id]
        tables = response.xpath('//table')
        if not tables:
            return
        for table_idx, table in enumerate(tables):
            caption_text = self.get_table_caption(table, table_idx)
            class_info = self.clean_class_info(caption_text, table)
            if not class_info:
                continue
            self.write_section_header(output_buffer, class_info)
            days = table.xpath('.//thead//th[@class="xAxis"]/text()').getall() or \
                   table.xpath('.//thead//th[contains(@class, "xAxis")]/text()').getall()
            time_slots = table.xpath('.//tbody//th[@class="yAxis"]/text()').getall() or \
                         table.xpath('.//tbody//th[contains(@class, "yAxis")]/text()').getall()
            if not days or not time_slots:
                continue
            schedule_grid = self.build_schedule_grid(days, time_slots)
            self.process_table_rows(table, schedule_grid, days, time_slots)
            self.write_schedule_to_buffer(output_buffer, schedule_grid, days, time_slots)
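    # The XPath queries above (and the similar ones in parse_jadwal below)
    # assume timetable markup roughly like the following sketch; this is an
    # illustration of the expected structure, not verbatim HTML from the site:
    #
    #   <table>
    #     <thead><tr><th class="xAxis">Senin</th><th class="xAxis">Selasa</th>...</tr></thead>
    #     <tbody>
    #       <tr><th class="yAxis">07.00-07.50</th><td rowspan="2">course...</td>...</tr>
    #       ...
    #     </tbody>
    #   </table>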
    def initialize_document_buffer(self, jurusan_id, jurusan_name):
        """Initialize a new document with proper title and metadata"""
        self.file_buffers[jurusan_id] = StringIO()
        buffer = self.file_buffers[jurusan_id]
        # Write document title and metadata
        buffer.write(f"# Jadwal Perkuliahan {jurusan_name}\n\n")
        buffer.write(f"**Jurusan:** {jurusan_name}\n")
        buffer.write(f"**Tanggal Update:** {self.current_date}\n")
        buffer.write(f"**Sumber:** Politeknik Negeri Padang\n\n")
        buffer.write("---\n\n")

    def get_table_caption(self, table, table_idx):
        """Extract and clean table caption text"""
        caption = table.xpath('.//caption//text()').getall()
        caption_text = ' '.join(caption).strip()
        if not caption_text:
            caption_text = table.xpath('preceding::h2[1]//text()|preceding::h3[1]//text()|preceding::h4[1]//text()').get()
            caption_text = caption_text.strip() if caption_text else f"Jadwal Kelas {table_idx + 1}"
        return caption_text

    def clean_class_info(self, caption_text, table):
        """Combine and clean class information"""
        thead_class_info = ' '.join(table.xpath('.//thead/tr[1]//text()').getall()).strip()
        class_info = f"{caption_text} {thead_class_info}" if thead_class_info else caption_text
        return re.sub(r'\s+', ' ', class_info).strip()

    def write_section_header(self, buffer, class_info):
        """Write a section header for each class schedule"""
        buffer.write(f"## Jadwal Perkuliahan {class_info}\n\n")
        buffer.write("Berikut adalah jadwal perkuliahan untuk kelas tersebut, diurutkan berdasarkan hari dan waktu:\n\n")

    def build_schedule_grid(self, days, time_slots):
        """Initialize the schedule grid structure"""
        return {day: {time: 'kosong' for time in time_slots} for day in days}

    def process_table_rows(self, table, schedule_grid, days, time_slots):
        """Process table rows respecting rowspans and colspans"""
        rows = table.xpath('.//tbody/tr[not(contains(@class, "foot"))]')
        active_rowspans = {}
        for row_idx, row in enumerate(rows):
            if row_idx >= len(time_slots):
                continue
            current_time = time_slots[row_idx]
            filled_columns = set()
            # Apply active rowspans
            self.apply_active_rowspans(active_rowspans, schedule_grid, days, current_time, filled_columns, row_idx)
            # Process current row cells
            cells = row.xpath('./td')
            col_idx = 0
            for cell in cells:
                while col_idx < len(days) and col_idx in filled_columns:
                    col_idx += 1
                if col_idx >= len(days):
                    break
                cell_content = self.process_cell_content(cell)
                rowspan = int(cell.xpath('./@rowspan').get() or 1)
                colspan = int(cell.xpath('./@colspan').get() or 1)
                self.update_schedule_grid(schedule_grid, days, current_time, col_idx, colspan, cell_content)
                self.update_active_rowspans(active_rowspans, row_idx, col_idx, colspan, rowspan, cell_content)
                col_idx += colspan

    def apply_active_rowspans(self, active_rowspans, schedule_grid, days, current_time, filled_columns, row_idx):
        """Apply content from cells with rowspan to current row"""
        rowspans_to_remove = []
        for (rs_col_idx, rs_row_start_idx), (rowspan_left, content) in active_rowspans.items():
            if rowspan_left > 0 and rs_col_idx < len(days):
                day = days[rs_col_idx]
                schedule_grid[day][current_time] = content
                filled_columns.add(rs_col_idx)
                active_rowspans[(rs_col_idx, rs_row_start_idx)] = (rowspan_left - 1, content)
                if rowspan_left - 1 <= 0:
                    rowspans_to_remove.append((rs_col_idx, rs_row_start_idx))
        for key in rowspans_to_remove:
            del active_rowspans[key]

    def process_cell_content(self, cell):
        """Extract and clean cell content"""
        content = ' '.join(cell.xpath('.//text()').getall()).strip()
        return 'kosong' if not content or content == '---' else content
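    # Illustrative sketch of the rowspan handling above (the values are
    # hypothetical, not taken from the site): a cell with rowspan="2" placed at
    # row 3, column 0 is recorded by update_active_rowspans() as
    #
    #   active_rowspans[(0, 3)] = (1, 'ELK1234 ...')
    #
    # and on the next row apply_active_rowspans() copies its content into
    # column 0, marks that column as filled, and removes the entry once the
    # remaining span reaches zero.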
    def update_schedule_grid(self, schedule_grid, days, current_time, col_idx, colspan, content):
        """Update schedule grid with cell content"""
        for c in range(colspan):
            current_col_idx = col_idx + c
            if current_col_idx < len(days):
                schedule_grid[days[current_col_idx]][current_time] = content

    def update_active_rowspans(self, active_rowspans, row_idx, col_idx, colspan, rowspan, content):
        """Track cells with rowspan for future rows"""
        if rowspan > 1:
            for c in range(colspan):
                active_rowspans[(col_idx + c, row_idx)] = (rowspan - 1, content)

    def format_course_entry(self, time_slots, course_info):
        """Format a course entry for optimal RAG retrieval"""
        # Parse course information
        parts = course_info.split()
        course_code = parts[0] if parts and len(parts[0]) == 7 and parts[0][:3].isalpha() and parts[0][3:].isdigit() else ""
        course_name = ""
        lecturer = ""
        room = ""
        # Extract course name, lecturer, and room
        if "_" in course_info:
            # Format: COURSE_CODE Course_Name_P Lecturer Room
            course_parts = course_info.split("_P")
            if len(course_parts) > 1:
                course_name = course_parts[0].replace(course_code, "").strip()
                remaining = course_parts[1].strip().split()
                lecturer = " ".join(remaining[:-1])
                room = remaining[-1] if remaining else ""
        else:
            # Alternative format
            course_name = " ".join(parts[1:-2]) if len(parts) > 3 else course_info.replace(course_code, "").strip()
            lecturer = parts[-2] if len(parts) > 1 else ""
            room = parts[-1] if parts else ""
        # Format time range
        time_range = self.format_time_range(time_slots)
        # Create structured information
        return {
            "time_range": time_range,
            "course_code": course_code,
            "course_name": course_name,
            "lecturer": lecturer,
            "room": room
        }

    def write_schedule_to_buffer(self, buffer, schedule_grid, days, time_slots):
        """Write each day's schedule, merging consecutive identical slots."""
        for day in days:
            current_course = None
            current_times = []
            day_schedule = []
            for time_slot in time_slots:
                course = schedule_grid[day][time_slot]
                if course == current_course:
                    current_times.append(time_slot)
                else:
                    if current_course and current_course.lower() != 'kosong':
                        time_range = self.format_time_range(current_times)
                        entry = f"- {day} {time_range} | {current_course}"
                        day_schedule.append(entry)
                    current_course = course
                    current_times = [time_slot]
            # Add the final entry of the day
            if current_course and current_course.lower() != 'kosong':
                time_range = self.format_time_range(current_times)
                entry = f"- {day} {time_range} | {current_course}"
                day_schedule.append(entry)
            # Write the collected entries to the buffer
            for entry in day_schedule:
                buffer.write(entry + "\n")
            buffer.write("\n")  # blank line between days

    def format_time_range(self, time_slots):
        """Format multiple time slots into a readable range"""
        if len(time_slots) == 1:
            return time_slots[0]
        first_start = time_slots[0].split('-')[0].strip()
        last_end = time_slots[-1].split('-')[-1].strip()
        return f"{first_start} - {last_end}"

    def extract_jurusan_id(self, link):
        match = re.search(r'department\?dep=(\d+)', link)
        return match.group(1) if match else f"unknown_{hash(link) % 1000}"

    def parse_jurusan(self, response):
        jurusan_id = response.meta.get('jurusan_id')
        jurusan_name = self.extract_title_jurusan_name(response)
        groups_days_horizontal_link = response.xpath('//td/a[contains(@href, "groups_days_horizontal") and not(contains(@href, "subgroups_days_horizontal"))]/@href').get()
        if groups_days_horizontal_link:
            groups_days_horizontal_url = response.urljoin(groups_days_horizontal_link)
            safe_jurusan_name = re.sub(r'[^\w\-_\. ]', '_', jurusan_name)
            yield scrapy.Request(groups_days_horizontal_url, callback=self.parse_jadwal, meta={'jurusan_id': safe_jurusan_name, 'jurusan_name': jurusan_name})
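    # Illustrative sketch of the plain-text document assembled by
    # initialize_document_buffer(), write_section_header(), and
    # write_schedule_to_buffer() (dates, class names, and course entries below
    # are placeholders, not scraped data):
    #
    #   # Jadwal Perkuliahan Jurusan Teknik Elektro
    #
    #   **Jurusan:** Jurusan Teknik Elektro
    #   **Tanggal Update:** 2024-01-01
    #   **Sumber:** Politeknik Negeri Padang
    #
    #   ---
    #
    #   ## Jadwal Perkuliahan <class_info>
    #
    #   Berikut adalah jadwal perkuliahan untuk kelas tersebut, ...
    #
    #   - Senin 07.00 - 08.40 | <course entry>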
    def parse_jadwal(self, response):
        jurusan_id = response.meta.get('jurusan_id')
        jurusan_name = response.meta.get('jurusan_name')
        if jurusan_id not in self.file_buffers:
            self.initialize_document_buffer(jurusan_id, jurusan_name)
        output_buffer = self.file_buffers[jurusan_id]
        tables = response.xpath('//table[contains(@id, "table_")]') or response.xpath('//table')
        for table in tables:
            caption_text = self.get_table_caption(table, 0)
            class_info = self.clean_class_info(caption_text, table)
            if not class_info:
                continue
            self.write_section_header(output_buffer, class_info)
            days = table.xpath('.//thead//th[@class="xAxis"]/text()').getall()
            time_slots = table.xpath('.//tbody/tr[not(contains(@class, "foot"))]/th[@class="yAxis"]/text()').getall()
            if not days or not time_slots:
                continue
            schedule_grid = self.build_schedule_grid(days, time_slots)
            self.process_table_rows(table, schedule_grid, days, time_slots)
            self.write_schedule_to_buffer(output_buffer, schedule_grid, days, time_slots)

    def extract_title_jurusan_name(self, response):
        title = response.xpath('//title/text()').get()
        return title.strip() if title else f"Jurusan_{response.meta.get('jurusan_id')}"


if __name__ == "__main__":
    process = CrawlerProcess(settings={
        'DOWNLOAD_DELAY': 1,
        'USER_AGENT': 'PNPBot/1.0',
        'ROBOTSTXT_OBEY': True,
        'LOG_LEVEL': 'INFO',
        'HTTPCACHE_ENABLED': False,
        'CONCURRENT_REQUESTS': 1,
        'RETRY_TIMES': 3
    })
    process.crawl(PnpSpider)
    process.start()
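# A minimal sketch of how this script is expected to be run locally, assuming
# the Supabase credentials read in __init__ are supplied via environment
# variables (the values and the file name below are placeholders):
#
#   export NEXT_PUBLIC_SUPABASE_URL="https://<project>.supabase.co"
#   export SUPABASE_SERVICE_KEY="<service-role-key>"
#   export NEXT_PUBLIC_SUPABASE_STORAGE_BUCKET="<bucket-name>"
#   python pnp_spider.py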