import scrapy
from scrapy.crawler import CrawlerProcess
from datetime import datetime
import re
from supabase import create_client
import os
class DosenSpider(scrapy.Spider):
    name = 'dosen_spider'
    start_urls = ['https://sipeg.pnp.ac.id/']

    custom_settings = {
        'DOWNLOAD_DELAY': 1,
        'USER_AGENT': 'PNPBot/1.0',
        'ROBOTSTXT_OBEY': True,
        'LOG_LEVEL': 'INFO',
        'CONCURRENT_REQUESTS': 1,
        'HTTPCACHE_ENABLED': False,
        'RETRY_TIMES': 3
    }
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Initialize the Supabase client from environment variables
        self.supabase = create_client(
            os.environ.get("NEXT_PUBLIC_SUPABASE_URL"),
            os.environ.get("SUPABASE_SERVICE_KEY")
        )
        self.storage_bucket = os.environ.get("NEXT_PUBLIC_SUPABASE_STORAGE_BUCKET")
        self.collected_data = []
    def parse(self, response):
        # Extract the main menu and submenu links from the landing page
        main_menu_items = response.css('li.level1')
        for menu_item in main_menu_items:
            menu_title = menu_item.css('span.bg::text').get('').strip()
            main_link = menu_item.css('a::attr(href)').get()
            if main_link:
                main_link = response.urljoin(main_link)
                # Follow the main menu link
                yield scrapy.Request(
                    url=main_link,
                    callback=self.parse_page,
                    meta={'page_title': menu_title, 'page_number': 1}
                )
            # Check for submenus
            submenus = menu_item.css('li.level2')
            for submenu in submenus:
                submenu_title = submenu.css('span.bg::text').get('').strip()
                submenu_link = submenu.css('a::attr(href)').get()
                if submenu_link:
                    submenu_link = response.urljoin(submenu_link)
                    # Follow the submenu link
                    yield scrapy.Request(
                        url=submenu_link,
                        callback=self.parse_page,
                        meta={'page_title': submenu_title, 'page_number': 1}
                    )
    def parse_page(self, response):
        page_title = response.meta.get('page_title', '')
        page_number = response.meta.get('page_number', 1)

        # Check for "data not yet available" messages
        page_text = ' '.join(response.css('body ::text').getall()).lower()
        unavailable_messages = [
            'data staf pengajar belum tersedia',
            'data staf administrasi belum tersedia',
            'data staf teknisi belum tersedia'
        ]
        if any(msg in page_text for msg in unavailable_messages):
            self.logger.info(f"Data not available on page: {response.url}")
            return

        # Look for data tables on the page
        tables = response.css('table.table-landscape, table.table, table.table-bordered')
        if tables:
            for table in tables:
                # Use the table headers to decide which kind of table this is
                headers = [h.strip() for h in table.css('th::text').getall()]
                if 'Jabatan' in headers and 'Pejabat' in headers:
                    yield from self.extract_officials_table(table, page_title)
                elif 'Nama' in headers and 'NIP' in headers:
                    # Determine the staff type from the page title
                    staff_type = self.determine_simple_staff_type(page_title)
                    yield from self.extract_staff_table(table, page_title, staff_type, page_number)
        else:
            self.logger.info(f"No tables found on page: {response.url}")
        # Improved pagination handling
        current_url = response.url
        base_url = current_url.split('?')[0] if '?' in current_url else current_url

        # Extract the p value from the current URL if it exists
        current_p = 0
        if 'p=' in current_url:
            try:
                current_p = int(current_url.split('p=')[1].split('&')[0])
            except (ValueError, IndexError):
                current_p = 0

        # Determine items per page based on staff type
        staff_type = self.determine_simple_staff_type(page_title)
        if staff_type == 'staff_pengajar':
            items_per_page = 30
        elif staff_type in ['staff_administrasi', 'staff_teknisi']:
            items_per_page = 25
        else:
            items_per_page = 0  # No pagination for officials (jabatan) pages
        # First try to get the "Next" link via XPath
        next_page = None
        next_link = response.xpath('//span[@class="table-link"]/a[contains(text(), "Next")]/@href').get()
        if next_link:
            next_page = response.urljoin(next_link)
        elif items_per_page > 0:
            # No explicit Next link: construct the next URL from the p offset
            next_p = items_per_page if current_p == 0 else current_p + items_per_page
            next_page = f"{base_url}?p={next_p}"
            self.logger.info(f"Constructed next page URL with p parameter: {next_page}")
        # Fall back to other pagination patterns if the specific method failed
        if not next_page:
            pagination_xpath_patterns = [
                '//ul[contains(@class, "pagination")]/li/a[contains(text(), "Next")]/@href',
                '//ul[contains(@class, "pagination")]/li/a[contains(text(), "»")]/@href',
                f'//ul[contains(@class, "pagination")]/li/a[contains(text(), "{page_number + 1}")]/@href',
                '//a[@class="next page-numbers"]/@href',
            ]
            for xpath in pagination_xpath_patterns:
                next_page_link = response.xpath(xpath).get()
                if next_page_link:
                    next_page = response.urljoin(next_page_link)
                    self.logger.info(f"Found next page link using XPath: {next_page}")
                    break
        # Generic query-parameter detection as a last resort
        if not next_page:
            if 'page=' in current_url:
                next_page = current_url.replace(f'page={page_number}', f'page={page_number + 1}')
            elif 'p=' in current_url:
                next_page = current_url.replace(f'p={current_p}', f'p={current_p + items_per_page}')
            elif 'halaman=' in current_url:
                next_page = current_url.replace(f'halaman={page_number}', f'halaman={page_number + 1}')
            elif 'page/' in current_url:
                next_page = current_url.replace(f'page/{page_number}', f'page/{page_number + 1}')
        if next_page:
            next_page_number = page_number + 1
            if 'p=' in next_page and items_per_page > 0:
                try:
                    p_value = int(next_page.split('p=')[1].split('&')[0])
                    next_page_number = (p_value // items_per_page) + 1
                except (ValueError, IndexError):
                    pass
            self.logger.info(f"Following to next page: {next_page} (Page {next_page_number})")
            yield scrapy.Request(
                url=next_page,
                callback=self.parse_page,
                meta={'page_title': page_title, 'page_number': next_page_number}
            )
    def determine_simple_staff_type(self, page_title):
        """Determine the staff type from the page title."""
        page_title_lower = page_title.lower()
        if any(word in page_title_lower for word in ['dosen', 'pengajar', 'akademik', 'jurusan']):
            return 'staff_pengajar'
        elif any(word in page_title_lower for word in ['administrasi', 'admin', 'tata usaha', 'pegawai']):
            return 'staff_administrasi'
        elif any(word in page_title_lower for word in ['teknisi', 'lab', 'teknik', 'laboratorium']):
            return 'staff_teknisi'
        return 'staff_lainnya'
    def extract_officials_table(self, table, page_title):
        rows = table.css('tr')
        for row in rows:
            # The period of office sits in a commented-out <td>, so pull it from the raw row HTML
            row_html = row.get()
            period_match = re.search(r'<!--\s*<td[^>]*>(.*?)</td>\s*-->', row_html)
            period = period_match.group(1).strip() if period_match else ""

            cells = row.css('td')
            if len(cells) < 3:
                continue
            number = cells[0].css('::text').get('').strip()
            position = cells[1].css('::text').get('').strip()
            official = cells[2].css('::text').get('').strip()

            item = {
                'halaman': page_title,
                'tipe': 'jabatan',
                'nomor': number,
                'jabatan': position,
                'pejabat': official,
                'periode': period
            }
            self.collected_data.append(item)
            yield item
    def extract_staff_table(self, table, page_title, staff_type, page_number):
        rows = table.css('tr')
        # Skip the header row
        rows = rows[1:] if len(rows) > 1 else []
        for row in rows:
            cells = row.css('td')
            if len(cells) < 3:
                continue
            number = cells[0].css('::text').get('').strip() if len(cells) > 0 else ""
            name_cell = cells[1] if len(cells) > 1 else None
            name = ""
            detail_url = None
            if name_cell:
                name_link = name_cell.css('a::text').get()
                name = name_link.strip() if name_link else name_cell.css('::text').get('').strip()
                detail_url = name_cell.css('a::attr(href)').get()
            nip = cells[2].css('::text').get('').strip() if len(cells) > 2 else ""
            department = cells[3].css('::text').get('').strip() if len(cells) > 3 else ""
            if not name and not nip:
                continue

            item = {
                'halaman': page_title,
                'tipe': staff_type,
                'halaman_ke': page_number,
                'nomor': number,
                'nama': name,
                'nip': nip,
                'jurusan': department,
                'detail': detail_url
            }
            self.collected_data.append(item)
            yield item
    def closed(self, reason):
        """Called when the spider closes: format the collected data and upload it to Supabase."""
        # Generate the text content
        text_content = self.generate_text_output()
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        filename = f"data_dosen_{timestamp}.txt"

        # Upload the new file to Supabase storage
        try:
            res = self.supabase.storage.from_(self.storage_bucket).upload(
                path=filename,
                file=text_content.encode('utf-8'),
                file_options={"content-type": "text/plain"},
            )
            if res:
                self.logger.info(f"Successfully uploaded {filename} to Supabase storage")
            else:
                self.logger.error(f"Failed to upload {filename} to Supabase")
        except Exception as e:
            self.logger.error(f"Error uploading to Supabase: {str(e)}")
    def generate_text_output(self):
        output = []
        output.append("# Data Dosen dan Staff PNP\n")
        output.append(f"Diperbarui pada: {datetime.now().strftime('%d %B %Y %H:%M')}\n\n")

        # Group the collected items by type
        grouped = {}
        for item in self.collected_data:
            tipe = item.get('tipe', 'lainnya')
            grouped.setdefault(tipe, []).append(item)

        section_titles = {
            'jabatan': 'Daftar Jabatan Struktural',
            'staff_pengajar': 'Daftar Dosen dan Pengajar',
            'staff_administrasi': 'Daftar Staff Administrasi',
            'staff_teknisi': 'Daftar Staff Teknisi',
            'staff_lainnya': 'Daftar Staff Lainnya'
        }

        for tipe, items in grouped.items():
            title = section_titles.get(tipe, tipe.capitalize())
            output.append(f"# {title}\n")
            output.append(f"Jumlah data: {len(items)}\n\n")
            for item in items:
                if tipe == 'jabatan':
                    paragraph = f"{item['pejabat']} menjabat sebagai {item['jabatan']}."
                    if item.get('periode'):
                        paragraph += f" Masa jabatan berlangsung selama {item['periode']}."
                else:
                    paragraph = f"{item['nama']} adalah staf dengan NIP {item['nip']}."
                    if item.get('jurusan'):
                        paragraph += f" Ia bertugas di {item['jurusan']}."
                    if item.get('detail'):
                        paragraph += f" Informasi lebih lengkap tersedia di {item['detail']}."
                output.append(paragraph + "\n\n")
        return ''.join(output)
if __name__ == '__main__':
    process = CrawlerProcess()
    process.crawl(DosenSpider)
    process.start()
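# Usage note (a minimal sketch; the variable names come from __init__ above, while the
# placeholder values and the module filename are assumptions for illustration only):
#
#   export NEXT_PUBLIC_SUPABASE_URL="https://<project-ref>.supabase.co"
#   export SUPABASE_SERVICE_KEY="<service-role-key>"
#   export NEXT_PUBLIC_SUPABASE_STORAGE_BUCKET="<bucket-name>"
#   python dosen_spider.py
#
# create_client() is expected to fail fast if the URL or key is missing, so unset
# variables surface at startup rather than at upload time.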