Initial backup 2026-02-17
This commit is contained in:
356
skills/imap-smtp-email/scripts/imap-py.py
Normal file
356
skills/imap-smtp-email/scripts/imap-py.py
Normal file
@@ -0,0 +1,356 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
IMAP Email CLI - Python version
|
||||
Supports: check, fetch, search, mark-read, mark-unread, list-mailboxes
|
||||
"""
|
||||
import imaplib
|
||||
import email
|
||||
import os
|
||||
import sys
|
||||
import argparse
|
||||
import json
|
||||
from pathlib import Path
|
||||
from email.header import decode_header
|
||||
from datetime import datetime, timedelta
|
||||
import re
|
||||
|
||||
# Load .env
|
||||
env_file = Path(__file__).parent.parent / '.env'
|
||||
if env_file.exists():
|
||||
for line in env_file.read_text().splitlines():
|
||||
if line.strip() and not line.startswith('#') and '=' in line:
|
||||
key, _, value = line.partition('=')
|
||||
os.environ[key.strip()] = value.strip()
|
||||
|
||||
def connect_imap():
|
||||
"""Connect and login to IMAP server"""
|
||||
host = os.environ.get('IMAP_HOST', 'imap.gmail.com')
|
||||
port = int(os.environ.get('IMAP_PORT', '993'))
|
||||
user = os.environ.get('IMAP_USER')
|
||||
password = os.environ.get('IMAP_PASS')
|
||||
|
||||
if not user or not password:
|
||||
print("Error: IMAP_USER and IMAP_PASS must be set", file=sys.stderr)
|
||||
sys.exit(1)
|
||||
|
||||
mail = imaplib.IMAP4_SSL(host, port)
|
||||
mail.login(user, password)
|
||||
return mail
|
||||
|
||||
def decode_header_value(value):
|
||||
"""Decode email header value"""
|
||||
if not value:
|
||||
return ""
|
||||
decoded_parts = decode_header(value)
|
||||
result = []
|
||||
for part, encoding in decoded_parts:
|
||||
if isinstance(part, bytes):
|
||||
result.append(part.decode(encoding or 'utf-8', errors='ignore'))
|
||||
else:
|
||||
result.append(part)
|
||||
return ' '.join(result)
|
||||
|
||||
def parse_time_filter(time_str):
|
||||
"""Parse time filter like '30m', '2h', '7d' into datetime"""
|
||||
match = re.match(r'(\d+)([mhd])', time_str)
|
||||
if not match:
|
||||
return None
|
||||
|
||||
value, unit = int(match.group(1)), match.group(2)
|
||||
if unit == 'm':
|
||||
delta = timedelta(minutes=value)
|
||||
elif unit == 'h':
|
||||
delta = timedelta(hours=value)
|
||||
elif unit == 'd':
|
||||
delta = timedelta(days=value)
|
||||
else:
|
||||
return None
|
||||
|
||||
return datetime.now() - delta
|
||||
|
||||
def cmd_check(args):
|
||||
"""Check for recent emails"""
|
||||
mail = connect_imap()
|
||||
mailbox = args.mailbox or 'INBOX'
|
||||
mail.select(mailbox)
|
||||
|
||||
# Search criteria
|
||||
criteria = ['ALL']
|
||||
if args.recent:
|
||||
since_date = parse_time_filter(args.recent)
|
||||
if since_date:
|
||||
date_str = since_date.strftime('%d-%b-%Y')
|
||||
criteria = [f'SINCE {date_str}']
|
||||
|
||||
# Search
|
||||
status, messages = mail.search(None, *criteria)
|
||||
if status != 'OK':
|
||||
print("Error searching mailbox", file=sys.stderr)
|
||||
sys.exit(1)
|
||||
|
||||
msg_nums = messages[0].split()
|
||||
if not msg_nums:
|
||||
print("No messages found")
|
||||
mail.close()
|
||||
mail.logout()
|
||||
return
|
||||
|
||||
# Get last N messages
|
||||
limit = int(args.limit) if args.limit else 10
|
||||
msg_nums = msg_nums[-limit:]
|
||||
|
||||
print(f"Showing {len(msg_nums)} most recent messages:\n")
|
||||
|
||||
for num in reversed(msg_nums):
|
||||
typ, data = mail.fetch(num, '(FLAGS BODY[HEADER.FIELDS (FROM SUBJECT DATE)])')
|
||||
if data[0]:
|
||||
msg_data = data[0][1].decode('utf-8', errors='ignore')
|
||||
msg = email.message_from_string(msg_data)
|
||||
|
||||
from_addr = decode_header_value(msg.get('From', ''))
|
||||
subject = decode_header_value(msg.get('Subject', ''))
|
||||
date = msg.get('Date', '')
|
||||
|
||||
# Check if unread
|
||||
flags = data[1].decode('utf-8', errors='ignore') if len(data) > 1 else ''
|
||||
unread = '●' if '\\Seen' not in flags else ' '
|
||||
|
||||
print(f"{unread} UID: {num.decode()}")
|
||||
print(f" From: {from_addr}")
|
||||
print(f" Subject: {subject}")
|
||||
print(f" Date: {date}")
|
||||
print()
|
||||
|
||||
mail.close()
|
||||
mail.logout()
|
||||
|
||||
def cmd_fetch(args):
|
||||
"""Fetch full email content"""
|
||||
if not args.uid:
|
||||
print("Error: UID required", file=sys.stderr)
|
||||
sys.exit(1)
|
||||
|
||||
mail = connect_imap()
|
||||
mailbox = args.mailbox or 'INBOX'
|
||||
mail.select(mailbox)
|
||||
|
||||
uid = args.uid
|
||||
typ, data = mail.fetch(uid, '(RFC822)')
|
||||
|
||||
if data[0] is None:
|
||||
print(f"Error: Message {uid} not found", file=sys.stderr)
|
||||
sys.exit(1)
|
||||
|
||||
msg = email.message_from_bytes(data[0][1])
|
||||
|
||||
print(f"From: {decode_header_value(msg.get('From', ''))}")
|
||||
print(f"To: {decode_header_value(msg.get('To', ''))}")
|
||||
print(f"Subject: {decode_header_value(msg.get('Subject', ''))}")
|
||||
print(f"Date: {msg.get('Date', '')}")
|
||||
print("-" * 80)
|
||||
|
||||
# Get body
|
||||
if msg.is_multipart():
|
||||
for part in msg.walk():
|
||||
content_type = part.get_content_type()
|
||||
if content_type == 'text/plain':
|
||||
body = part.get_payload(decode=True)
|
||||
if body:
|
||||
print(body.decode('utf-8', errors='ignore'))
|
||||
break
|
||||
elif content_type == 'text/html' and args.html:
|
||||
body = part.get_payload(decode=True)
|
||||
if body:
|
||||
print(body.decode('utf-8', errors='ignore'))
|
||||
break
|
||||
else:
|
||||
body = msg.get_payload(decode=True)
|
||||
if body:
|
||||
print(body.decode('utf-8', errors='ignore'))
|
||||
|
||||
mail.close()
|
||||
mail.logout()
|
||||
|
||||
def cmd_search(args):
|
||||
"""Search emails with filters"""
|
||||
mail = connect_imap()
|
||||
mailbox = args.mailbox or 'INBOX'
|
||||
mail.select(mailbox)
|
||||
|
||||
# Build search criteria
|
||||
criteria = []
|
||||
|
||||
if args.unseen:
|
||||
criteria.append('UNSEEN')
|
||||
if args.seen:
|
||||
criteria.append('SEEN')
|
||||
if args.from_:
|
||||
criteria.extend(['FROM', f'"{args.from_}"'])
|
||||
if args.subject:
|
||||
criteria.extend(['SUBJECT', f'"{args.subject}"'])
|
||||
if args.since:
|
||||
criteria.extend(['SINCE', args.since])
|
||||
if args.before:
|
||||
criteria.extend(['BEFORE', args.before])
|
||||
if args.recent:
|
||||
since_date = parse_time_filter(args.recent)
|
||||
if since_date:
|
||||
date_str = since_date.strftime('%d-%b-%Y')
|
||||
criteria.extend(['SINCE', date_str])
|
||||
|
||||
if not criteria:
|
||||
criteria = ['ALL']
|
||||
|
||||
# Search
|
||||
status, messages = mail.search(None, *criteria)
|
||||
if status != 'OK':
|
||||
print("Error searching mailbox", file=sys.stderr)
|
||||
sys.exit(1)
|
||||
|
||||
msg_nums = messages[0].split()
|
||||
if not msg_nums:
|
||||
print("No messages found")
|
||||
mail.close()
|
||||
mail.logout()
|
||||
return
|
||||
|
||||
# Get last N messages
|
||||
limit = int(args.limit) if args.limit else 20
|
||||
msg_nums = msg_nums[-limit:]
|
||||
|
||||
print(f"Found {len(msg_nums)} messages:\n")
|
||||
|
||||
for num in reversed(msg_nums):
|
||||
typ, data = mail.fetch(num, '(FLAGS BODY[HEADER.FIELDS (FROM SUBJECT DATE)])')
|
||||
if data[0]:
|
||||
msg_data = data[0][1].decode('utf-8', errors='ignore')
|
||||
msg = email.message_from_string(msg_data)
|
||||
|
||||
from_addr = decode_header_value(msg.get('From', ''))
|
||||
subject = decode_header_value(msg.get('Subject', ''))
|
||||
date = msg.get('Date', '')
|
||||
|
||||
flags = data[1].decode('utf-8', errors='ignore') if len(data) > 1 else ''
|
||||
unread = '●' if '\\Seen' not in flags else ' '
|
||||
|
||||
print(f"{unread} UID: {num.decode()}")
|
||||
print(f" From: {from_addr}")
|
||||
print(f" Subject: {subject}")
|
||||
print(f" Date: {date}")
|
||||
print()
|
||||
|
||||
mail.close()
|
||||
mail.logout()
|
||||
|
||||
def cmd_mark_read(args):
|
||||
"""Mark message(s) as read"""
|
||||
if not args.uids:
|
||||
print("Error: At least one UID required", file=sys.stderr)
|
||||
sys.exit(1)
|
||||
|
||||
mail = connect_imap()
|
||||
mailbox = args.mailbox or 'INBOX'
|
||||
mail.select(mailbox)
|
||||
|
||||
for uid in args.uids:
|
||||
mail.store(uid, '+FLAGS', '\\Seen')
|
||||
print(f"Marked {uid} as read")
|
||||
|
||||
mail.close()
|
||||
mail.logout()
|
||||
|
||||
def cmd_mark_unread(args):
|
||||
"""Mark message(s) as unread"""
|
||||
if not args.uids:
|
||||
print("Error: At least one UID required", file=sys.stderr)
|
||||
sys.exit(1)
|
||||
|
||||
mail = connect_imap()
|
||||
mailbox = args.mailbox or 'INBOX'
|
||||
mail.select(mailbox)
|
||||
|
||||
for uid in args.uids:
|
||||
mail.store(uid, '-FLAGS', '\\Seen')
|
||||
print(f"Marked {uid} as unread")
|
||||
|
||||
mail.close()
|
||||
mail.logout()
|
||||
|
||||
def cmd_list_mailboxes(args):
|
||||
"""List all mailboxes"""
|
||||
mail = connect_imap()
|
||||
|
||||
status, mailboxes = mail.list()
|
||||
if status == 'OK':
|
||||
print("Available mailboxes:")
|
||||
for mb in mailboxes:
|
||||
parts = mb.decode().split(' "/" ')
|
||||
if len(parts) >= 2:
|
||||
name = parts[1].strip('"')
|
||||
print(f" {name}")
|
||||
|
||||
mail.logout()
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(description='IMAP Email CLI')
|
||||
subparsers = parser.add_subparsers(dest='command', help='Command to execute')
|
||||
|
||||
# check
|
||||
check_parser = subparsers.add_parser('check', help='Check for new/unread emails')
|
||||
check_parser.add_argument('--limit', help='Max results', default='10')
|
||||
check_parser.add_argument('--mailbox', help='Mailbox name', default='INBOX')
|
||||
check_parser.add_argument('--recent', help='Only show emails from last X time (e.g., 30m, 2h, 7d)')
|
||||
|
||||
# fetch
|
||||
fetch_parser = subparsers.add_parser('fetch', help='Fetch full email content')
|
||||
fetch_parser.add_argument('uid', help='Email UID')
|
||||
fetch_parser.add_argument('--mailbox', help='Mailbox name', default='INBOX')
|
||||
fetch_parser.add_argument('--html', action='store_true', help='Show HTML content')
|
||||
|
||||
# search
|
||||
search_parser = subparsers.add_parser('search', help='Search emails')
|
||||
search_parser.add_argument('--unseen', action='store_true', help='Only unread messages')
|
||||
search_parser.add_argument('--seen', action='store_true', help='Only read messages')
|
||||
search_parser.add_argument('--from', dest='from_', help='From address contains')
|
||||
search_parser.add_argument('--subject', help='Subject contains')
|
||||
search_parser.add_argument('--recent', help='From last X time (e.g., 30m, 2h, 7d)')
|
||||
search_parser.add_argument('--since', help='After date (DD-Mon-YYYY)')
|
||||
search_parser.add_argument('--before', help='Before date (DD-Mon-YYYY)')
|
||||
search_parser.add_argument('--limit', help='Max results', default='20')
|
||||
search_parser.add_argument('--mailbox', help='Mailbox name', default='INBOX')
|
||||
|
||||
# mark-read
|
||||
read_parser = subparsers.add_parser('mark-read', help='Mark message(s) as read')
|
||||
read_parser.add_argument('uids', nargs='+', help='Email UIDs')
|
||||
read_parser.add_argument('--mailbox', help='Mailbox name', default='INBOX')
|
||||
|
||||
# mark-unread
|
||||
unread_parser = subparsers.add_parser('mark-unread', help='Mark message(s) as unread')
|
||||
unread_parser.add_argument('uids', nargs='+', help='Email UIDs')
|
||||
unread_parser.add_argument('--mailbox', help='Mailbox name', default='INBOX')
|
||||
|
||||
# list-mailboxes
|
||||
subparsers.add_parser('list-mailboxes', help='List all mailboxes')
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
if not args.command:
|
||||
parser.print_help()
|
||||
sys.exit(1)
|
||||
|
||||
# Execute command
|
||||
if args.command == 'check':
|
||||
cmd_check(args)
|
||||
elif args.command == 'fetch':
|
||||
cmd_fetch(args)
|
||||
elif args.command == 'search':
|
||||
cmd_search(args)
|
||||
elif args.command == 'mark-read':
|
||||
cmd_mark_read(args)
|
||||
elif args.command == 'mark-unread':
|
||||
cmd_mark_unread(args)
|
||||
elif args.command == 'list-mailboxes':
|
||||
cmd_list_mailboxes(args)
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
Reference in New Issue
Block a user