#!/usr/bin/env python3
"""Parse AI_EMAIL / AI_CONTENT pairs from imap check script output."""

import json
import re
import sys


# Senders whose mail is excluded from the output. is_blocked() lower-cases the
# sender before the substring test, so entries here must be lowercase.
BLOCKLIST = ['googlenews-noreply@google.com']


# URL fragments that mark a link as noise (ads, tracking, social, email).
# Each entry is a regex alternative; a URL is skipped when any of them is
# found anywhere in it, ignoring case.
_URL_SKIP_FRAGMENTS = (
    r'unsubscribe',
    r'mailto',
    r'twitter\.com',
    r'instagram\.com',
    r'facebook\.com',
    r'youtube\.com/unsubscribe',
    r'youtube\.com/channel',
    r'utm_source=dlvr',
    r'utm_medium=email',
    r'genstore',
    r'typeform',
    r'pigment\.com',
    r'maton\.ai',
    r'youtu\.be',
    r'medium\.com/@',
    r'linkedin\.com/posts',
    r'cdn-cgi',
    r'imageproxy',
)
URL_SKIP = re.compile('|'.join(_URL_SKIP_FRAGMENTS), re.IGNORECASE)


def decode_subject(subject):
    """Decode an RFC 2047 encoded Subject header into plain text.

    Args:
        subject: Raw header value, possibly containing ``=?charset?enc?...?=``
            encoded words. ``None`` or an empty string yields ``''``.

    Returns:
        The decoded subject with surrounding whitespace stripped. If the
        header cannot be parsed at all, the raw value (stripped) is returned.
    """
    from email.errors import HeaderParseError
    from email.header import decode_header

    if not subject:
        return ''

    try:
        parts = decode_header(subject)
    except (HeaderParseError, ValueError):
        # Malformed encoded word (e.g. broken base64): fall back to the raw text.
        return subject.strip()

    pieces = []
    for part, charset in parts:
        if not isinstance(part, bytes):
            pieces.append(part)
            continue
        try:
            pieces.append(part.decode(charset or 'utf-8', errors='replace'))
        except LookupError:
            # Unknown or bogus charset label: decode this one part as UTF-8
            # instead of discarding the decoding of the whole header.
            pieces.append(part.decode('utf-8', errors='replace'))
    return ''.join(pieces).strip()


def decode_qp(content):
    """Best-effort removal of quoted-printable artifacts from *content*.

    This is a heuristic cleaner, not a strict quoted-printable decoder. The
    steps run in this order:

    1. Soft line breaks (``=`` directly before a newline) are removed so that
       wrapped URLs are rejoined.
    2. ``=20`` is dropped (not turned into a space).
    3. ``=`` followed by one lowercase letter and then a space or end of line
       is dropped.
    4. Runs of ``=XX`` hex escapes are decoded together as UTF-8 bytes, so
       multi-byte characters such as ``=C3=A9`` come out as one character.
    5. Every remaining ``=`` is removed.

    Args:
        content: Text that may contain quoted-printable escapes.

    Returns:
        The cleaned text. It never contains ``=``.
    """
    # Soft line breaks must go first, otherwise an escape split across two
    # lines would not be recognised by the later steps.
    content = re.sub(r'=\r?\n', '', content)
    content = content.replace('=20', '')

    # Leftover fragments of broken escapes: '=' + lowercase letter before a
    # space or at the end of a line.
    content = re.sub(r'=[a-z] ', '', content)
    content = re.sub(r'=[a-z]$', '', content, flags=re.MULTILINE)

    def _decode_run(match):
        # Decode the whole run at once: decoding byte-by-byte would turn every
        # byte of a multi-byte UTF-8 sequence into a replacement character.
        raw = bytes.fromhex(match.group(0).replace('=', ''))
        return raw.decode('utf-8', errors='replace')

    content = re.sub(r'(?:=[0-9A-Fa-f]{2})+', _decode_run, content)

    # NOTE(review): this also removes legitimate '=' characters, including ones
    # just decoded from '=3D' and those in URL query strings ('?id=5' becomes
    # '?id5'). Kept as-is because callers currently rely on '='-free output.
    return content.replace('=', '')


# Tracking parameters removed from every candidate URL, applied in this order.
# NOTE(review): decode_qp() strips every '=' from the text before URLs are
# matched, so the three patterns that contain '=' cannot currently match;
# only the utm_ pattern is effective. Confirm whether decode_qp should keep '='.
_TRACKING_PATTERNS = (
    re.compile(r'[?&]utm_[^&]+'),
    re.compile(r'[?&]_bhlid=\w+'),
    re.compile(r'[?&]jwt_token=\w+'),
    re.compile(r'[?&]ref=[^&]+'),
)

# Punctuation that commonly trails a URL in running text.
_URL_TRAILING_PUNCT = '.,;)\'"'

# Substrings that mark a URL as an image/asset rather than an article.
_MARKDOWN_SKIP_MARKERS = ('.jpg', '.png', '.gif', '.jpeg', '.svg', '/cdn-cgi/')
_PLAIN_SKIP_MARKERS = (
    '.jpg', '.png', '.gif', '.jpeg', '.svg',
    '/image/', '/cdn-cgi/', '/media/', '/assets/',
)

_MIN_URL_LENGTH = 15  # URLs must be longer than this to be kept
_MAX_URLS = 5         # at most this many URLs are returned


def _strip_tracking(url):
    """Remove tracking query parameters from *url* and tidy the result."""
    had_query = '?' in url
    for pattern in _TRACKING_PATTERNS:
        url = pattern.sub('', url)
    # Removing the first parameter ('?utm_...') would leave the next one
    # introduced by '&'; promote it so the query string stays well-formed.
    if had_query and '?' not in url and '&' in url:
        url = url.replace('&', '?', 1)
    return url.rstrip(_URL_TRAILING_PUNCT)


def extract_urls(content):
    """Extract up to five clean article URLs from an email body.

    The body is first passed through decode_qp() so that URLs wrapped by
    quoted-printable soft line breaks are rejoined. Markdown-style links
    (``[text](https://...)``) are collected first, then every bare
    ``http(s)://`` URL. A candidate is dropped when it matches URL_SKIP, is
    15 characters or shorter after cleaning, looks like an image/asset, or
    duplicates an already accepted URL.

    Args:
        content: Raw email body text.

    Returns:
        A list of at most five unique URLs, in order of first appearance
        (markdown links before bare URLs).
    """
    content = decode_qp(content)

    markdown_urls = re.findall(
        r'\[[^\]]+\]\((https?://[^\s"<>)\]\']+)\)', content)
    plain_urls = re.findall(r'https?://[^\s"<>)\]\']+', content)

    seen = set()
    clean = []
    sources = (
        (markdown_urls, _MARKDOWN_SKIP_MARKERS),
        (plain_urls, _PLAIN_SKIP_MARKERS),
    )
    for candidates, skip_markers in sources:
        for url in candidates:
            url = url.rstrip(_URL_TRAILING_PUNCT)
            if URL_SKIP.search(url):
                continue

            url = _strip_tracking(url)
            # De-duplicate on the cleaned URL so that the same article with
            # different tracking parameters is only reported once.
            if len(url) <= _MIN_URL_LENGTH or url in seen:
                continue

            lowered = url.lower()
            if any(marker in lowered for marker in skip_markers):
                continue

            seen.add(url)
            clean.append(url)

    return clean[:_MAX_URLS]


def clean_content(content):
    """Best-effort plain-text extraction from a messy email body.

    Processing order: quoted-printable cleanup via decode_qp(), removal of
    HTML tags and template/markup artifacts, removal of long base64-like
    runs, markdown links reduced to their link text, whitespace collapsing,
    and finally a per-line filter.

    The line filter keeps a stripped line only when it is at least 20
    characters long, is not made up purely of symbols, is not a known
    boilerplate phrase, and does not start with a header label
    (``by:``, ``from:``, ``to:``, ``subject:``, ``date:``). Because of the
    20-character minimum, the short boilerplate phrases are already dropped
    by the length check; the phrase filter is retained as a second guard.

    Args:
        content: Raw email body text.

    Returns:
        The kept lines (each capped at 300 characters) joined with newlines,
        truncated to 3000 characters and stripped.
    """
    text = decode_qp(content)

    # HTML tags and markup artifacts are replaced by a space (not deleted) so
    # that the words on either side of them do not run together.
    markup_patterns = (
        r'<[^>]+>',           # HTML tags
        r'\[\[.*?\]\]',       # [[markup]]
        r'\{\{.*?\}\}',       # {{markup}}
        r'\{\|\|.*?\|\|\}',   # {||markup||}
        r'\^[^\^]+\^',        # ^markup^
        r'~~[^~]+~~',         # ~~markup~~
        r'__[^_]+__',         # __markup__
    )
    for pattern in markup_patterns:
        text = re.sub(pattern, ' ', text)

    # Long unbroken base64-alphabet runs are encoded blobs, not prose.
    text = re.sub(r'[A-Za-z0-9+/]{60,}', '', text)

    # [text](target) -> text
    text = re.sub(r'\[([^\]]+)\]\([^\)]+\)', r'\1', text)

    text = re.sub(r'[ \t]+', ' ', text)

    symbols_only = re.compile(r'^[\W_]+\s*[\W_]+$')
    noise = re.compile(
        r'^(?:[-=_\*\^|>.@#]+|read online|sign up|advertise|sponsor|'
        r'view all|unsubscribe|\d+ more)$',
        re.IGNORECASE,
    )
    header = re.compile(r'^(?:by|from|to|subject|date):', re.IGNORECASE)

    kept = []
    for line in text.split('\n'):
        line = line.strip()
        if len(line) < 20:
            continue
        # Header-like lines are dropped here, per line, so they do not leave
        # empty lines behind in the joined result.
        if symbols_only.match(line) or noise.match(line) or header.match(line):
            continue
        kept.append(line[:300])

    return '\n'.join(kept)[:3000].strip()


def is_blocked(sender):
    """Return True when the lower-cased *sender* contains any BLOCKLIST entry."""
    for blocked in BLOCKLIST:
        if blocked in sender.lower():
            return True
    return False


_EMAIL_PREFIX = 'AI_EMAIL:'
_CONTENT_PREFIX = 'AI_CONTENT:'
# Lines with these prefixes are never treated as email body text.
_STATUS_PREFIXES = ('STATUS:', 'TOTAL:', 'LAST_UID:', 'RECENT:', 'AI_COUNT:')


def _build_record(sender, subject, content_lines):
    """Turn one parsed AI_EMAIL / AI_CONTENT pair into an output dict."""
    raw = '\n'.join(content_lines)
    return {
        'from': sender,
        'subject': decode_subject(subject),
        'urls': extract_urls(raw),
        'content': clean_content(raw),
    }


def parse_emails(text):
    """Split the check-script output into structured email records.

    The expected layout is an ``AI_EMAIL:<sender> | <subject>`` line followed
    by an ``AI_CONTENT:`` line and any number of continuation lines. An email
    is emitted only if it has a non-empty sender, an AI_CONTENT line was
    seen for it, and the sender is not blocked.

    Args:
        text: Full text of the check-script output.

    Returns:
        A list of dicts with the keys ``from``, ``subject``, ``urls`` and
        ``content``, in input order.
    """
    emails = []
    sender = None
    subject = None
    content_lines = []
    in_content = False

    def flush():
        if sender and in_content and not is_blocked(sender):
            emails.append(_build_record(sender, subject, content_lines))

    for line in text.split('\n'):
        if line.startswith(_EMAIL_PREFIX):
            # A new header closes the previous record, if there was one.
            flush()
            meta = line[len(_EMAIL_PREFIX):].split(' | ', 1)
            sender = meta[0].strip()
            subject = meta[1].strip() if len(meta) > 1 else ''
            content_lines = []
            in_content = False
        elif line.startswith(_CONTENT_PREFIX) and sender:
            body = line[len(_CONTENT_PREFIX):]
            # Drop the single separator space after the colon when present,
            # without eating a real character when it is absent.
            if body.startswith(' '):
                body = body[1:]
            content_lines = [body]
            in_content = True
        elif in_content and not line.startswith(_STATUS_PREFIXES):
            content_lines.append(line)

    flush()  # last record has no following AI_EMAIL line to close it
    return emails


def main(argv=None):
    """Read the file named by the first argument and print the records as JSON.

    Prints ``[]`` when no input path is given.

    Args:
        argv: Argument vector; defaults to ``sys.argv``.

    Returns:
        Process exit status (always 0).
    """
    argv = sys.argv if argv is None else argv
    if len(argv) < 2:
        print("[]")
        return 0

    # Undecodable bytes are replaced rather than raising, so a single bad
    # byte in a mail body cannot abort the whole run.
    with open(argv[1], 'r', encoding='utf-8', errors='replace') as f:
        text = f.read()

    print(json.dumps(parse_emails(text), indent=2))
    return 0


if __name__ == '__main__':
    sys.exit(main())