#!/usr/bin/env python3
"""Parse AI_EMAIL / AI_CONTENT pairs from imap check script output.

Usage: pass the path of a captured output file as the first argument.
A JSON list of ``{"from", "subject", "urls", "content"}`` objects is
printed to stdout; ``[]`` is printed when no path is given.
"""

import json
import re
import sys
from email.header import decode_header

# Senders to exclude
BLOCKLIST = [
    'googlenews-noreply@google.com',
]

# URL patterns to skip (ads, tracking, social, email)
URL_SKIP = re.compile(
    r'unsubscribe|mailto|twitter\.com|instagram\.com|facebook\.com|'
    r'youtube\.com/unsubscribe|youtube\.com/channel|'
    r'utm_source=dlvr|utm_medium=email|'
    r'genstore|typeform|pigment\.com|maton\.ai|'
    r'youtu\.be|medium\.com/@|linkedin\.com/posts|'
    r'cdn-cgi|imageproxy',
    re.IGNORECASE
)

# Quoted-printable soft line break: "=" immediately followed by a newline.
_SOFT_BREAK_RE = re.compile(r'=\r?\n')
# One or more consecutive "=XX" escapes; decoded together so that multi-byte
# UTF-8 sequences (e.g. "=C3=A9") come out as a single character.
_QP_HEX_RUN_RE = re.compile(r'(?:=[0-9A-Fa-f]{2})+')

_URL_RE = re.compile(r'https?://[^\s"<>)\]\']+')
_MARKDOWN_LINK_RE = re.compile(r'\[([^\]]+)\]\((https?://[^\s"<>)\]\']+)\)')
_TRAILING_PUNCT = '.,;)\'"'

# Query parameters removed from extracted URLs (in addition to any "utm_*").
_TRACKING_PARAMS = frozenset({'_bhlid', 'jwt_token', 'ref'})

# URL substrings that mark image/asset links. Markdown links use the shorter
# list; bare URLs are additionally filtered on common asset path segments.
_MARKDOWN_SKIP_FRAGMENTS = ('.jpg', '.png', '.gif', '.jpeg', '.svg', '/cdn-cgi/')
_PLAIN_SKIP_FRAGMENTS = _MARKDOWN_SKIP_FRAGMENTS + ('/image/', '/media/', '/assets/')

# Lines that are pure boilerplate. They are matched against stripped lines.
_NOISE_PATTERNS = [
    re.compile(pattern, re.IGNORECASE)
    for pattern in (
        r'^[-=_\*\^|>.@#]+$',  # Separator lines
        r'^read online$',
        r'^sign up$',
        r'^advertise$',
        r'^sponsor$',
        r'^view all$',
        r'^unsubscribe$',
        r'^\d+ more$',
    )
]

_EMAIL_PREFIX = 'AI_EMAIL:'
_CONTENT_PREFIX = 'AI_CONTENT:'
# Summary lines of the input that never belong to an email body.
_STATUS_PREFIXES = ('STATUS:', 'TOTAL:', 'LAST_UID:', 'RECENT:', 'AI_COUNT:')


def decode_subject(subject):
    """Decode an RFC 2047 encoded subject header into plain text.

    Falls back to the raw (stripped) subject when the header cannot be
    parsed. A part that names an unknown charset is decoded as UTF-8
    instead of discarding the whole decoded subject.
    """
    try:
        parts = decode_header(subject)
    except Exception:
        # Deliberate best effort: a malformed header must not abort parsing.
        return subject.strip()

    decoded = []
    for part, charset in parts:
        if isinstance(part, bytes):
            try:
                decoded.append(part.decode(charset or 'utf-8', errors='replace'))
            except LookupError:
                decoded.append(part.decode('utf-8', errors='replace'))
        else:
            decoded.append(part)
    return ''.join(decoded).strip()


def decode_qp(content):
    """Decode quoted-printable content before URL extraction.

    Soft line breaks are joined and ``=XX`` escapes are replaced by the
    characters they encode. Literal ``=`` characters that remain (including
    those produced by decoding ``=3D``) are kept, so URL query strings
    survive intact.
    """
    # Soft line breaks must be removed first so that escapes and URLs split
    # across lines are rejoined before anything else looks at them.
    content = _SOFT_BREAK_RE.sub('', content)

    # NOTE(review): heuristics carried over unchanged from the earlier
    # implementation; presumably they target artifacts of mangled soft line
    # breaks - confirm against real input before tightening or removing.
    content = re.sub(r'=[a-z] ', '', content)
    content = re.sub(r'=[a-z]$', '', content, flags=re.MULTILINE)

    def _decode_run(match):
        raw = bytes.fromhex(match.group(0).replace('=', ''))
        return raw.decode('utf-8', errors='replace')

    return _QP_HEX_RUN_RE.sub(_decode_run, content)


def _is_tracking_param(param):
    """Return True if a single ``name=value`` query parameter is tracking noise."""
    name, has_value, _value = param.partition('=')
    if name.startswith('utm_'):
        return True
    return bool(has_value) and name in _TRACKING_PARAMS


def _strip_tracking(url):
    """Remove tracking query parameters while keeping the URL well formed."""
    head, question_mark, rest = url.partition('?')
    if not question_mark:
        return url
    query, hash_mark, fragment = rest.partition('#')
    kept = [p for p in query.split('&') if p and not _is_tracking_param(p)]
    cleaned = head
    if kept:
        cleaned += '?' + '&'.join(kept)
    return cleaned + hash_mark + fragment


def _normalize_url(url, skip_fragments):
    """Clean one candidate URL; return None when it should be discarded."""
    url = url.rstrip(_TRAILING_PUNCT)
    if URL_SKIP.search(url):
        return None
    url = _strip_tracking(url).rstrip(_TRAILING_PUNCT)
    # Must be reasonably long to be a real article.
    if len(url) <= 15:
        return None
    lowered = url.lower()
    if any(fragment in lowered for fragment in skip_fragments):
        return None
    return url


def extract_urls(content, max_urls=5):
    """Extract clean article URLs from email content (after full QP decoding).

    Markdown-style links are collected first, then bare URLs. Each URL is
    stripped of trailing punctuation and tracking parameters, filtered
    against ``URL_SKIP`` and image/asset markers, and de-duplicated on its
    cleaned form.

    Args:
        content: Raw email body text.
        max_urls: Maximum number of URLs to return.

    Returns:
        A list of at most ``max_urls`` URLs in order of first appearance
        (markdown links before bare URLs).
    """
    content = decode_qp(content)

    candidates = [
        (url, _MARKDOWN_SKIP_FRAGMENTS)
        for _text, url in _MARKDOWN_LINK_RE.findall(content)
    ]
    candidates.extend(
        (url, _PLAIN_SKIP_FRAGMENTS) for url in _URL_RE.findall(content)
    )

    seen = set()
    clean = []
    for raw_url, skip_fragments in candidates:
        url = _normalize_url(raw_url, skip_fragments)
        # De-duplicate on the cleaned URL so links that differ only in
        # tracking parameters are reported once.
        if url is None or url in seen:
            continue
        seen.add(url)
        clean.append(url)
    return clean[:max_urls]


def clean_content(content):
    """Best-effort text extraction from messy email body.

    Decodes quoted-printable escapes, drops HTML tags and markup artifacts,
    reduces markdown links to their text, discards short or boilerplate
    lines, and returns at most 3000 characters.
    """
    content = decode_qp(content)

    # Remove HTML tags
    content = re.sub(r'<[^>]+>', ' ', content)

    # Remove email formatting artifacts
    content = re.sub(r'\[\[.*?\]\]', ' ', content)        # [[markup]]
    content = re.sub(r'\{\{.*?\}\}', ' ', content)        # {{markup}}
    content = re.sub(r'\{\|\|.*?\|\|\}', ' ', content)    # {||markup||}
    content = re.sub(r'\^[^\^]+\^', ' ', content)         # ^markup^
    content = re.sub(r'~~[^~]+~~', ' ', content)          # ~~markup~~
    content = re.sub(r'__[^_]+__', ' ', content)          # __markup__

    # Remove base64/encoded blocks
    content = re.sub(r'[A-Za-z0-9+/]{60,}', '', content)

    # Convert markdown links to just text
    content = re.sub(r'\[([^\]]+)\]\([^\)]+\)', r'\1', content)

    # Clean up whitespace
    content = re.sub(r'[ \t]+', ' ', content)
    content = re.sub(r'\n{3,}', '\n\n', content)

    filtered = []
    for line in (raw_line.strip() for raw_line in content.split('\n')):
        if len(line) < 20:
            continue
        # Lines made up only of punctuation/underscores carry no text.
        if re.match(r'^[\W_]+\s*[\W_]+$', line):
            continue
        if any(pattern.match(line) for pattern in _NOISE_PATTERNS):
            continue
        filtered.append(line[:300])

    result = '\n'.join(filtered)

    # Final cleanup: blank out header-like lines.
    result = re.sub(
        r'^\s*(by|from|to|subject|date):.*$',
        '',
        result,
        flags=re.IGNORECASE | re.MULTILINE,
    )
    return result[:3000].strip()


def is_blocked(sender):
    """Return True if ``sender`` contains any BLOCKLIST entry (case-insensitive)."""
    return any(b in sender.lower() for b in BLOCKLIST)


def _build_record(sender, subject, content_lines):
    """Assemble the output dictionary for one email."""
    raw = '\n'.join(content_lines)
    return {
        'from': sender,
        'subject': decode_subject(subject),
        'urls': extract_urls(raw),
        'content': clean_content(raw),
    }


def parse_emails(text):
    """Split the input text into structured email records.

    An ``AI_EMAIL:`` line (``sender | subject``) starts a new email and an
    ``AI_CONTENT:`` line starts its body; following lines are appended to
    the body except for status summary lines. Emails without a body or from
    a blocked sender are omitted.

    Returns:
        A list of dictionaries with keys ``from``, ``subject``, ``urls``
        and ``content``.
    """
    emails = []
    sender = None
    subject = None
    content_lines = []
    in_content = False

    def flush():
        if sender and in_content and not is_blocked(sender):
            emails.append(_build_record(sender, subject, content_lines))

    for line in text.split('\n'):
        if line.startswith(_EMAIL_PREFIX):
            flush()
            meta = line[len(_EMAIL_PREFIX):]
            raw_sender, _sep, raw_subject = meta.partition(' | ')
            sender = raw_sender.strip()
            subject = raw_subject.strip()
            content_lines = []
            in_content = False
        elif line.startswith(_CONTENT_PREFIX) and sender:
            body = line[len(_CONTENT_PREFIX):]
            # Drop the single separator character after the prefix only when
            # it is whitespace, so "AI_CONTENT:text" keeps its first letter.
            if body[:1].isspace():
                body = body[1:]
            content_lines = [body]
            in_content = True
        elif in_content and not line.startswith(_STATUS_PREFIXES):
            content_lines.append(line)

    # Last one
    flush()
    return emails


def main(argv=None):
    """Read the file named by ``argv[1]`` and print the parsed emails as JSON.

    Prints ``[]`` when no file argument is supplied. Returns the process
    exit status.
    """
    argv = sys.argv if argv is None else argv
    if len(argv) < 2:
        print("[]")
        return 0

    # NOTE(review): input is assumed to be UTF-8; undecodable bytes are
    # replaced rather than raising.
    with open(argv[1], 'r', encoding='utf-8', errors='replace') as f:
        text = f.read()

    print(json.dumps(parse_emails(text), indent=2))
    return 0


if __name__ == "__main__":
    sys.exit(main())