357 lines
11 KiB
Python
357 lines
11 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
IMAP Email CLI - Python version
|
|
Supports: check, fetch, search, mark-read, mark-unread, list-mailboxes
|
|
"""
|
|
import imaplib
|
|
import email
|
|
import os
|
|
import sys
|
|
import argparse
|
|
import json
|
|
from pathlib import Path
|
|
from email.header import decode_header
|
|
from datetime import datetime, timedelta
|
|
import re
|
|
|
|
def load_env_file(path):
    """Load ``KEY=VALUE`` pairs from a dotenv-style file into ``os.environ``.

    Blank lines, comment lines (``#`` as the first non-blank character), lines
    without ``=`` and lines with an empty key are skipped.  Keys and values are
    stripped of surrounding whitespace, and one pair of matching single or
    double quotes around a value is removed.  Values from the file overwrite
    variables that are already set in the environment.

    Args:
        path: ``pathlib.Path`` of the file to read.

    Returns:
        A dict of the variables that were set; empty when the file is missing.
    """
    loaded = {}
    if not path.exists():
        return loaded

    for raw_line in path.read_text(encoding='utf-8').splitlines():
        line = raw_line.strip()
        # Test the stripped line so indented comments are not exported.
        if not line or line.startswith('#') or '=' not in line:
            continue

        key, _, value = line.partition('=')
        key = key.strip()
        if not key:
            # "=value" has no usable name; setting os.environ[''] would fail.
            continue

        value = value.strip()
        if len(value) >= 2 and value[0] == value[-1] and value[0] in ('"', "'"):
            value = value[1:-1]

        os.environ[key] = value
        loaded[key] = value

    return loaded


# Load .env from the directory above the one containing this script.
env_file = Path(__file__).parent.parent / '.env'
load_env_file(env_file)
|
|
|
|
def connect_imap():
    """Open an SSL IMAP connection and log in.

    Settings are read from the environment: ``IMAP_HOST`` (default
    ``imap.gmail.com``), ``IMAP_PORT`` (default ``993``), ``IMAP_USER`` and
    ``IMAP_PASS``.

    Returns:
        An ``imaplib.IMAP4_SSL`` object on which ``login`` has succeeded.

    Raises:
        SystemExit: with status 1 when the credentials are missing or
            ``IMAP_PORT`` is not an integer (a message is written to stderr).
        Exception: whatever ``imaplib`` raises while connecting or logging in
            is propagated unchanged.
    """
    host = os.environ.get('IMAP_HOST', 'imap.gmail.com')
    user = os.environ.get('IMAP_USER')
    password = os.environ.get('IMAP_PASS')

    if not user or not password:
        print("Error: IMAP_USER and IMAP_PASS must be set", file=sys.stderr)
        sys.exit(1)

    raw_port = os.environ.get('IMAP_PORT', '993')
    try:
        port = int(raw_port)
    except ValueError:
        # Report a bad setting the same way as missing credentials instead of
        # letting a bare ValueError traceback escape.
        print(f"Error: IMAP_PORT must be an integer, got {raw_port!r}", file=sys.stderr)
        sys.exit(1)

    mail = imaplib.IMAP4_SSL(host, port)
    try:
        mail.login(user, password)
    except BaseException:
        # Do not leave the socket open when authentication fails.
        mail.shutdown()
        raise
    return mail
|
|
|
|
def decode_header_value(value):
    """Decode an RFC 2047 encoded header value into a plain string.

    Args:
        value: Raw header value (for example the result of ``msg.get``);
            may be ``None`` or empty.

    Returns:
        The decoded text, or ``""`` for a falsy input.  Bytes that cannot be
        decoded are dropped.  A value whose encoded words cannot be parsed is
        returned as ``str(value)``.
    """
    if not value:
        return ""

    from email.errors import HeaderParseError

    try:
        decoded_parts = decode_header(value)
    except HeaderParseError:
        # Malformed encoded word: show the raw header rather than crash.
        return str(value)

    pieces = []
    for part, charset in decoded_parts:
        if not isinstance(part, bytes):
            pieces.append(part)
            continue
        try:
            pieces.append(part.decode(charset or 'utf-8', errors='ignore'))
        except LookupError:
            # Charset label Python does not know (e.g. "unknown-8bit").
            pieces.append(part.decode('utf-8', errors='ignore'))

    # decode_header keeps the whitespace that separates encoded and plain
    # segments, so the pieces are concatenated as-is; joining with a space
    # would double it and would split adjacent encoded words.
    return ''.join(pieces)
|
|
|
|
def parse_time_filter(time_str):
    """Convert a relative time filter such as ``30m``, ``2h`` or ``7d``.

    The whole string (ignoring surrounding whitespace) must be a number
    followed by exactly one unit letter: ``m`` minutes, ``h`` hours or ``d``
    days.  Requiring a full match stops inputs like ``3months`` from being
    read as three minutes.

    Args:
        time_str: The filter text; may be ``None`` or empty.

    Returns:
        A naive local ``datetime`` that many units before now, or ``None``
        when the text is missing, malformed, or out of range.
    """
    if not time_str:
        return None

    match = re.fullmatch(r'(\d+)([mhd])', time_str.strip())
    if not match:
        return None

    amount = int(match.group(1))
    unit_keyword = {'m': 'minutes', 'h': 'hours', 'd': 'days'}[match.group(2)]

    try:
        return datetime.now() - timedelta(**{unit_keyword: amount})
    except OverflowError:
        # Absurdly large amounts cannot be represented; treat as invalid.
        return None
|
|
|
|
def cmd_check(args):
    """List the most recent messages in a mailbox, newest first.

    Args:
        args: Parsed arguments with ``mailbox`` (default ``INBOX``),
            ``limit`` (default 10) and ``recent`` (e.g. ``30m``, ``2h``,
            ``7d``).

    The mailbox is opened read-only and headers are fetched with
    ``BODY.PEEK`` so that listing messages does not mark them as read.

    The identifier printed after ``UID:`` is the value returned by
    ``mail.search``; it is what the other commands pass to ``fetch`` and
    ``store``.

    Only the calendar date of the ``--recent`` cut-off is sent to the server
    (``%d-%b-%Y``), so sub-day windows select everything since that date.

    Raises:
        SystemExit: with status 1 on an invalid limit, when the mailbox
            cannot be opened, or when the search fails.
    """
    try:
        limit = int(args.limit) if args.limit else 10
    except ValueError:
        print(f"Error: invalid --limit value: {args.limit!r}", file=sys.stderr)
        sys.exit(1)
    if limit <= 0:
        # A limit of 0 would slice [-0:] and return every message.
        print("Error: --limit must be a positive integer", file=sys.stderr)
        sys.exit(1)

    # Search criteria
    criteria = ['ALL']
    if args.recent:
        since_date = parse_time_filter(args.recent)
        if since_date:
            criteria = ['SINCE', since_date.strftime('%d-%b-%Y')]
        else:
            print(f"Warning: ignoring unrecognised --recent value {args.recent!r}",
                  file=sys.stderr)

    mailbox = args.mailbox or 'INBOX'
    mail = connect_imap()
    try:
        status, _ = mail.select(mailbox, readonly=True)
        if status != 'OK':
            print(f"Error: cannot open mailbox {mailbox!r}", file=sys.stderr)
            sys.exit(1)

        try:
            status, messages = mail.search(None, *criteria)
            if status != 'OK':
                print("Error searching mailbox", file=sys.stderr)
                sys.exit(1)

            msg_nums = messages[0].split() if messages and messages[0] else []
            if not msg_nums:
                print("No messages found")
                return

            # Keep the last N identifiers returned by the search.
            msg_nums = msg_nums[-limit:]
            print(f"Showing {len(msg_nums)} most recent messages:\n")

            for num in reversed(msg_nums):
                typ, data = mail.fetch(
                    num, '(FLAGS BODY.PEEK[HEADER.FIELDS (FROM SUBJECT DATE)])')
                if typ != 'OK' or not data or not isinstance(data[0], tuple):
                    continue

                header_text = data[0][1].decode('utf-8', errors='ignore')
                msg = email.message_from_string(header_text)

                from_addr = decode_header_value(msg.get('From', ''))
                subject = decode_header_value(msg.get('Subject', ''))
                date = msg.get('Date', '')

                # FLAGS may arrive before the header literal (first tuple
                # element) or after it (trailing bytes item), so inspect
                # every non-literal piece of the response.
                envelope = b' '.join(
                    item[0] if isinstance(item, tuple) else item
                    for item in data if item
                )
                unread = ' ' if b'\\Seen' in envelope else '●'

                print(f"{unread} UID: {num.decode()}")
                print(f" From: {from_addr}")
                print(f" Subject: {subject}")
                print(f" Date: {date}")
                print()
        finally:
            mail.close()
    finally:
        mail.logout()
|
|
|
|
def _decode_text_part(part):
    """Return the decoded text of a message part, or ``""`` if it is empty.

    The part's declared charset is used when Python knows it; otherwise the
    payload is decoded as UTF-8.  Undecodable bytes are dropped.
    """
    payload = part.get_payload(decode=True)
    if not payload:
        return ''
    charset = part.get_content_charset() or 'utf-8'
    try:
        return payload.decode(charset, errors='ignore')
    except LookupError:
        return payload.decode('utf-8', errors='ignore')


def _pick_body(msg, prefer_html):
    """Choose the text to display for ``msg``.

    For a non-multipart message the single payload is returned.  For a
    multipart message the first non-empty, non-attachment ``text/html`` part
    is returned when ``prefer_html`` is true and one exists; otherwise the
    first such ``text/plain`` part, or ``""`` when there is none.
    """
    if not msg.is_multipart():
        return _decode_text_part(msg)

    found = {}
    for part in msg.walk():
        content_type = part.get_content_type()
        if content_type not in ('text/plain', 'text/html') or content_type in found:
            continue
        if part.get_content_disposition() == 'attachment':
            continue
        text = _decode_text_part(part)
        if text:
            found[content_type] = text

    if prefer_html and 'text/html' in found:
        return found['text/html']
    return found.get('text/plain', '')


def cmd_fetch(args):
    """Print the headers and body of one message.

    Args:
        args: Parsed arguments with ``uid`` (message identifier passed to
            ``mail.fetch``), ``mailbox`` (default ``INBOX``) and ``html``
            (show the HTML part instead of the plain-text one when present).

    NOTE(review): the mailbox is selected read-write as before, so the server
    may flag the message as seen when its full content is fetched; confirm
    this is the intended behaviour.

    Raises:
        SystemExit: with status 1 when no identifier is given, the mailbox
            cannot be opened, or the message is not found.
    """
    if not args.uid:
        print("Error: UID required", file=sys.stderr)
        sys.exit(1)

    mailbox = args.mailbox or 'INBOX'
    mail = connect_imap()
    try:
        status, _ = mail.select(mailbox)
        if status != 'OK':
            print(f"Error: cannot open mailbox {mailbox!r}", file=sys.stderr)
            sys.exit(1)

        try:
            uid = args.uid
            typ, data = mail.fetch(uid, '(RFC822)')

            if typ != 'OK' or not data or not isinstance(data[0], tuple):
                print(f"Error: Message {uid} not found", file=sys.stderr)
                sys.exit(1)

            msg = email.message_from_bytes(data[0][1])

            print(f"From: {decode_header_value(msg.get('From', ''))}")
            print(f"To: {decode_header_value(msg.get('To', ''))}")
            print(f"Subject: {decode_header_value(msg.get('Subject', ''))}")
            print(f"Date: {msg.get('Date', '')}")
            print("-" * 80)

            body = _pick_body(msg, args.html)
            if body:
                print(body)
        finally:
            # Runs on sys.exit and on errors too, so the session never leaks.
            mail.close()
    finally:
        mail.logout()
|
|
|
|
def _quote_search_term(text):
    """Wrap ``text`` in double quotes, escaping backslashes and quotes."""
    return '"' + text.replace('\\', '\\\\').replace('"', '\\"') + '"'


def cmd_search(args):
    """Search a mailbox and list the matching messages, newest first.

    Args:
        args: Parsed arguments with ``mailbox`` (default ``INBOX``),
            ``unseen``, ``seen``, ``from_``, ``subject``, ``since``,
            ``before``, ``recent`` and ``limit`` (default 20).

    All given filters are combined; with no filter every message matches.
    The mailbox is opened read-only and headers are fetched with
    ``BODY.PEEK`` so that searching does not mark messages as read.

    The identifier printed after ``UID:`` is the value returned by
    ``mail.search``; it is what the other commands pass to ``fetch`` and
    ``store``.

    Only the calendar date of the ``--recent`` cut-off is sent to the server
    (``%d-%b-%Y``), so sub-day windows select everything since that date.

    Raises:
        SystemExit: with status 1 on an invalid limit, when the mailbox
            cannot be opened, or when the search fails.
    """
    try:
        limit = int(args.limit) if args.limit else 20
    except ValueError:
        print(f"Error: invalid --limit value: {args.limit!r}", file=sys.stderr)
        sys.exit(1)
    if limit <= 0:
        # A limit of 0 would slice [-0:] and return every message.
        print("Error: --limit must be a positive integer", file=sys.stderr)
        sys.exit(1)

    # Build search criteria
    criteria = []
    if args.unseen:
        criteria.append('UNSEEN')
    if args.seen:
        criteria.append('SEEN')
    if args.from_:
        criteria.extend(['FROM', _quote_search_term(args.from_)])
    if args.subject:
        criteria.extend(['SUBJECT', _quote_search_term(args.subject)])
    if args.since:
        criteria.extend(['SINCE', args.since])
    if args.before:
        criteria.extend(['BEFORE', args.before])
    if args.recent:
        since_date = parse_time_filter(args.recent)
        if since_date:
            criteria.extend(['SINCE', since_date.strftime('%d-%b-%Y')])
        else:
            print(f"Warning: ignoring unrecognised --recent value {args.recent!r}",
                  file=sys.stderr)

    if not criteria:
        criteria = ['ALL']

    mailbox = args.mailbox or 'INBOX'
    mail = connect_imap()
    try:
        status, _ = mail.select(mailbox, readonly=True)
        if status != 'OK':
            print(f"Error: cannot open mailbox {mailbox!r}", file=sys.stderr)
            sys.exit(1)

        try:
            status, messages = mail.search(None, *criteria)
            if status != 'OK':
                print("Error searching mailbox", file=sys.stderr)
                sys.exit(1)

            msg_nums = messages[0].split() if messages and messages[0] else []
            if not msg_nums:
                print("No messages found")
                return

            # Keep the last N identifiers returned by the search.
            msg_nums = msg_nums[-limit:]
            print(f"Found {len(msg_nums)} messages:\n")

            for num in reversed(msg_nums):
                typ, data = mail.fetch(
                    num, '(FLAGS BODY.PEEK[HEADER.FIELDS (FROM SUBJECT DATE)])')
                if typ != 'OK' or not data or not isinstance(data[0], tuple):
                    continue

                header_text = data[0][1].decode('utf-8', errors='ignore')
                msg = email.message_from_string(header_text)

                from_addr = decode_header_value(msg.get('From', ''))
                subject = decode_header_value(msg.get('Subject', ''))
                date = msg.get('Date', '')

                # FLAGS may arrive before the header literal (first tuple
                # element) or after it (trailing bytes item), so inspect
                # every non-literal piece of the response.
                envelope = b' '.join(
                    item[0] if isinstance(item, tuple) else item
                    for item in data if item
                )
                unread = ' ' if b'\\Seen' in envelope else '●'

                print(f"{unread} UID: {num.decode()}")
                print(f" From: {from_addr}")
                print(f" Subject: {subject}")
                print(f" Date: {date}")
                print()
        finally:
            mail.close()
    finally:
        mail.logout()
|
|
|
|
def cmd_mark_read(args):
    """Add the ``\\Seen`` flag to one or more messages.

    Args:
        args: Parsed arguments with ``uids`` (identifiers passed to
            ``mail.store``) and ``mailbox`` (default ``INBOX``).

    Each identifier is stored separately; a success line is printed only
    when the server answers ``OK``, otherwise an error goes to stderr.

    Raises:
        SystemExit: with status 1 when no identifier is given, the mailbox
            cannot be opened, or any store request is rejected.
    """
    if not args.uids:
        print("Error: At least one UID required", file=sys.stderr)
        sys.exit(1)

    mailbox = args.mailbox or 'INBOX'
    any_failed = False

    mail = connect_imap()
    try:
        status, _ = mail.select(mailbox)
        if status != 'OK':
            print(f"Error: cannot open mailbox {mailbox!r}", file=sys.stderr)
            sys.exit(1)

        try:
            for uid in args.uids:
                status, _ = mail.store(uid, '+FLAGS', '\\Seen')
                if status == 'OK':
                    print(f"Marked {uid} as read")
                else:
                    any_failed = True
                    print(f"Error: could not mark {uid} as read", file=sys.stderr)
        finally:
            mail.close()
    finally:
        mail.logout()

    if any_failed:
        sys.exit(1)
|
|
|
|
def cmd_mark_unread(args):
    """Remove the ``\\Seen`` flag from one or more messages.

    Args:
        args: Parsed arguments with ``uids`` (identifiers passed to
            ``mail.store``) and ``mailbox`` (default ``INBOX``).

    Each identifier is stored separately; a success line is printed only
    when the server answers ``OK``, otherwise an error goes to stderr.

    Raises:
        SystemExit: with status 1 when no identifier is given, the mailbox
            cannot be opened, or any store request is rejected.
    """
    if not args.uids:
        print("Error: At least one UID required", file=sys.stderr)
        sys.exit(1)

    mailbox = args.mailbox or 'INBOX'
    any_failed = False

    mail = connect_imap()
    try:
        status, _ = mail.select(mailbox)
        if status != 'OK':
            print(f"Error: cannot open mailbox {mailbox!r}", file=sys.stderr)
            sys.exit(1)

        try:
            for uid in args.uids:
                status, _ = mail.store(uid, '-FLAGS', '\\Seen')
                if status == 'OK':
                    print(f"Marked {uid} as unread")
                else:
                    any_failed = True
                    print(f"Error: could not mark {uid} as unread", file=sys.stderr)
        finally:
            mail.close()
    finally:
        mail.logout()

    if any_failed:
        sys.exit(1)
|
|
|
|
def cmd_list_mailboxes(args):
    """Print the name of every mailbox returned by ``mail.list()``.

    Args:
        args: Parsed arguments (unused).

    Each response line is parsed as ``(flags) delimiter name``.  The
    delimiter may be any quoted string or ``NIL``, so servers that do not
    use ``/`` as the hierarchy separator are listed too.  Names are printed
    exactly as the server sends them, minus surrounding quotes.

    Raises:
        SystemExit: with status 1 when the server rejects the request.
    """
    entry_pattern = re.compile(
        r'\((?P<flags>[^)]*)\)\s+(?P<delim>"(?:[^"\\]|\\.)*"|NIL)\s+(?P<name>.*)',
        re.IGNORECASE | re.DOTALL,
    )

    mail = connect_imap()
    try:
        status, mailboxes = mail.list()
        if status != 'OK':
            print("Error listing mailboxes", file=sys.stderr)
            sys.exit(1)

        print("Available mailboxes:")
        for entry in mailboxes:
            if not entry:
                continue

            # A tuple entry carries the name as a separate literal.
            literal_name = None
            if isinstance(entry, tuple):
                entry, literal_name = entry[0], entry[1]

            match = entry_pattern.match(entry.decode('utf-8', errors='replace'))
            if not match:
                continue

            if literal_name is not None:
                name = literal_name.decode('utf-8', errors='replace')
            else:
                name = match.group('name').strip()
                if len(name) >= 2 and name[0] == '"' and name[-1] == '"':
                    # Drop the quotes and undo backslash escapes inside them.
                    name = re.sub(r'\\(.)', r'\1', name[1:-1])

            print(f" {name}")
    finally:
        mail.logout()
|
|
|
|
def _positive_int(text):
    """argparse ``type=`` callable that accepts only integers greater than 0."""
    try:
        value = int(text)
    except ValueError:
        raise argparse.ArgumentTypeError(f"expected a positive integer, got {text!r}")
    if value <= 0:
        raise argparse.ArgumentTypeError(f"expected a positive integer, got {text!r}")
    return value


def main():
    """Parse the command line and run the selected sub-command.

    Sub-commands: ``check``, ``fetch``, ``search``, ``mark-read``,
    ``mark-unread`` and ``list-mailboxes``.

    ``--limit`` is validated while parsing, and ``--seen`` / ``--unseen``
    are mutually exclusive, so bad input is rejected before any connection
    is made.  IMAP and network errors raised by a sub-command are reported
    on stderr instead of as a traceback.

    Raises:
        SystemExit: status 1 when no sub-command is given or a sub-command
            fails; status 2 when argparse rejects the arguments.
    """
    parser = argparse.ArgumentParser(description='IMAP Email CLI')
    subparsers = parser.add_subparsers(dest='command', help='Command to execute')

    # check
    check_parser = subparsers.add_parser('check', help='Check for new/unread emails')
    check_parser.add_argument('--limit', type=_positive_int, help='Max results', default='10')
    check_parser.add_argument('--mailbox', help='Mailbox name', default='INBOX')
    check_parser.add_argument('--recent', help='Only show emails from last X time (e.g., 30m, 2h, 7d)')

    # fetch
    fetch_parser = subparsers.add_parser('fetch', help='Fetch full email content')
    fetch_parser.add_argument('uid', help='Email UID')
    fetch_parser.add_argument('--mailbox', help='Mailbox name', default='INBOX')
    fetch_parser.add_argument('--html', action='store_true', help='Show HTML content')

    # search
    search_parser = subparsers.add_parser('search', help='Search emails')
    # A message cannot be both read and unread, so the flags exclude each other.
    seen_group = search_parser.add_mutually_exclusive_group()
    seen_group.add_argument('--unseen', action='store_true', help='Only unread messages')
    seen_group.add_argument('--seen', action='store_true', help='Only read messages')
    search_parser.add_argument('--from', dest='from_', help='From address contains')
    search_parser.add_argument('--subject', help='Subject contains')
    search_parser.add_argument('--recent', help='From last X time (e.g., 30m, 2h, 7d)')
    search_parser.add_argument('--since', help='After date (DD-Mon-YYYY)')
    search_parser.add_argument('--before', help='Before date (DD-Mon-YYYY)')
    search_parser.add_argument('--limit', type=_positive_int, help='Max results', default='20')
    search_parser.add_argument('--mailbox', help='Mailbox name', default='INBOX')

    # mark-read
    read_parser = subparsers.add_parser('mark-read', help='Mark message(s) as read')
    read_parser.add_argument('uids', nargs='+', help='Email UIDs')
    read_parser.add_argument('--mailbox', help='Mailbox name', default='INBOX')

    # mark-unread
    unread_parser = subparsers.add_parser('mark-unread', help='Mark message(s) as unread')
    unread_parser.add_argument('uids', nargs='+', help='Email UIDs')
    unread_parser.add_argument('--mailbox', help='Mailbox name', default='INBOX')

    # list-mailboxes
    subparsers.add_parser('list-mailboxes', help='List all mailboxes')

    args = parser.parse_args()

    if not args.command:
        parser.print_help()
        sys.exit(1)

    # Execute command
    handlers = {
        'check': cmd_check,
        'fetch': cmd_fetch,
        'search': cmd_search,
        'mark-read': cmd_mark_read,
        'mark-unread': cmd_mark_unread,
        'list-mailboxes': cmd_list_mailboxes,
    }
    try:
        handlers[args.command](args)
    except imaplib.IMAP4.error as exc:
        print(f"IMAP error: {exc}", file=sys.stderr)
        sys.exit(1)
    except OSError as exc:
        print(f"Error: {exc}", file=sys.stderr)
        sys.exit(1)


if __name__ == '__main__':
    main()
|