Spaces:
Runtime error
Runtime error
"""Mbox parser.

Contains simple parser for mbox files.

"""
from pathlib import Path | |
from typing import Any, Dict, List | |
from gpt_index.readers.file.base_parser import BaseParser | |
class MboxParser(BaseParser):
    """Mbox parser.

    Extract messages from mailbox files.
    Returns string including date, subject, sender, receiver and
    content for each message.

    """

    # Template applied to every message; the placeholders are filled in by
    # ``parse_file`` via ``str.format``.
    DEFAULT_MESSAGE_FORMAT: str = (
        "Date: {_date}\n"
        "From: {_from}\n"
        "To: {_to}\n"
        "Subject: {_subject}\n"
        "Content: {_content}"
    )

    def __init__(
        self,
        *args: Any,
        max_count: int = 0,
        message_format: str = DEFAULT_MESSAGE_FORMAT,
        **kwargs: Any
    ) -> None:
        """Init params.

        Args:
            *args: Positional arguments forwarded to the base class.
            max_count: Maximum number of messages to return from one file.
                Values of 0 or less mean "no limit".
            message_format: ``str.format`` template receiving the keyword
                fields ``_date``, ``_from``, ``_to``, ``_subject`` and
                ``_content``.
            **kwargs: Keyword arguments forwarded to the base class.

        """
        super().__init__(*args, **kwargs)
        self.max_count = max_count
        self.message_format = message_format

    def _init_parser(self) -> Dict:
        """Initialize parser.

        Only verifies that ``beautifulsoup4`` is importable; no parser
        configuration is produced.

        Returns:
            An empty dict.

        Raises:
            ImportError: If ``beautifulsoup4`` is not installed.

        """
        try:
            from bs4 import BeautifulSoup  # noqa: F401
        except ImportError as err:
            raise ImportError(
                "`beautifulsoup4` package not found: `pip install beautifulsoup4`"
            ) from err
        return {}

    @staticmethod
    def _extract_plain_payload(msg: Any) -> bytes:
        """Return the decoded plain-text body of ``msg`` as bytes.

        For multipart messages the first ``text/plain`` part that is not an
        attachment is used. An empty bytes object is returned when no such
        part exists or when the payload cannot be decoded, so the caller
        never sees an unset or stale body.
        """
        if not msg.is_multipart():
            return msg.get_payload(decode=True) or b""

        for part in msg.walk():
            # ``str(None)`` is "None", so a missing header never matches.
            disposition = str(part.get("Content-Disposition"))
            if (
                part.get_content_type() == "text/plain"
                and "attachment" not in disposition
            ):
                return part.get_payload(decode=True) or b""
        return b""

    def parse_file(self, filepath: Path, errors: str = "ignore") -> List[str]:
        """Parse file into string.

        Args:
            filepath: Path of the mbox file to read.
            errors: Currently unused by this implementation; kept so the
                method signature stays unchanged.

        Returns:
            One formatted string per message, in mailbox order, truncated
            to ``self.max_count`` entries when that limit is positive.

        """
        # Imported lazily so the module can be loaded without these being
        # needed until a file is actually parsed.
        import mailbox
        from email.parser import BytesParser
        from email.policy import default

        from bs4 import BeautifulSoup

        results: List[str] = []

        # Parse each raw message with the modern email policy.
        bytes_parser = BytesParser(policy=default).parse
        mbox = mailbox.mbox(filepath, factory=bytes_parser)  # type: ignore
        try:
            for msg in mbox:
                content = self._extract_plain_payload(msg)

                # Strip any markup and collapse runs of whitespace. The
                # parser is named explicitly so results do not depend on
                # which optional parsers happen to be installed.
                soup = BeautifulSoup(content, "html.parser")
                stripped_content = " ".join(soup.get_text().split())

                # Format message to include date, sender, receiver and subject
                results.append(
                    self.message_format.format(
                        _date=msg["date"],
                        _from=msg["from"],
                        _to=msg["to"],
                        _subject=msg["subject"],
                        _content=stripped_content,
                    )
                )

                # Stop once the optional message limit has been reached.
                if self.max_count > 0 and len(results) >= self.max_count:
                    break
        finally:
            # Release the underlying file handle even if parsing fails.
            mbox.close()

        return results