#!/usr/bin/env python3
"""
Content Aggregation Script for OpenClaw Daily Digest
Unifies Reddit, News, and Twitter content into structured JSON
"""

import json
import sys
import os
from datetime import datetime, timedelta
from typing import List, Dict, Any

# Make the sibling 'sources' directory importable for the fetcher modules below.
sys.path.insert(0, os.path.join(os.path.dirname(__file__), 'sources'))

from reddit_fetcher import fetch_reddit_content
from news_fetcher import fetch_news_content

# Two titles count as the same story when strictly more than this fraction
# of the longer title's distinct words also appear in the other title.
_TITLE_OVERLAP_THRESHOLD = 0.8


def _normalize_url(item: Dict) -> str:
    """Return the item's URL as a comparison key ('' when absent).

    The key is lower-cased and has its fragment, query string and trailing
    slash removed.
    NOTE(review): dropping the query string also conflates pages that are
    distinguished only by a query parameter - confirm this is acceptable
    for the sources in use.
    """
    raw = item.get('url') or ''
    return str(raw).strip().lower().split('#')[0].split('?')[0].rstrip('/')


def _title_words(item: Dict) -> frozenset:
    """Return the set of lower-cased, whitespace-separated words in the title."""
    title = item.get('title') or ''
    return frozenset(str(title).lower().split())


def _numeric_field(item: Dict, key: str, default: float = 0.0) -> float:
    """Return item[key] as a float, or *default* if missing, None or non-numeric."""
    try:
        return float(item.get(key, default))
    except (TypeError, ValueError):
        return default


def deduplicate_stories(items: List[Dict]) -> List[Dict]:
    """Remove duplicate stories based on URL and title similarity.

    An item is dropped when its normalized URL matches an already kept item,
    or when its title shares more than 80% of its words with the title of an
    already kept item. The first occurrence wins and input order is kept.

    Args:
        items: Story dicts; the 'url' and 'title' keys are read, both optional.

    Returns:
        A new list containing the retained items (the same dict objects).
    """
    seen_urls = set()
    kept_word_sets = []  # parallel to `unique`; avoids re-splitting kept titles
    unique = []

    for item in items:
        url = _normalize_url(item)
        # Items without a URL must not all collapse onto the empty key, so the
        # URL check only applies when there is a URL to compare.
        if url and url in seen_urls:
            continue

        words = _title_words(item)
        is_duplicate = bool(words) and any(
            other
            and len(words & other) / max(len(words), len(other))
            > _TITLE_OVERLAP_THRESHOLD
            for other in kept_word_sets
        )
        if is_duplicate:
            continue

        if url:
            seen_urls.add(url)
        kept_word_sets.append(words)
        unique.append(item)

    return unique


def score_relevance(item: Dict) -> float:
    """Score story relevance for ranking (higher ranks first).

    Scoring depends on which keys the item carries:
      * 'score' and 'num_comments' (Reddit-style):
        score * 0.5 + num_comments * 1.5 + upvote_ratio * 50, where a missing
        upvote_ratio counts as 0.5.
      * 'points' (Hacker News-style): points * 1.0 + num_comments * 2.0.
      * otherwise: a flat base score of 50.

    Any item with more than 50 comments or more than 100 points gets an extra
    100. Missing, None or non-numeric values fall back to their defaults
    instead of raising.

    Args:
        item: Story dict as described above.

    Returns:
        The relevance score.
    """
    comments = _numeric_field(item, 'num_comments')
    points = _numeric_field(item, 'points')

    if 'score' in item and 'num_comments' in item:
        # Reddit-style scoring
        score = (
            _numeric_field(item, 'score') * 0.5
            + comments * 1.5
            + _numeric_field(item, 'upvote_ratio', 0.5) * 50
        )
    elif 'points' in item:
        # Hacker News scoring
        score = points * 1.0 + comments * 2.0
    else:
        # Default: news articles get medium base score
        score = 50.0

    # Boost for high-engagement content
    if comments > 50 or points > 100:
        score += 100

    return score


def format_reddit_story(story: Dict) -> str:
    """Format a Reddit story for v2 HTML email - Spark-safe inline styles"""
engagement = [] if story.get('score'): engagement.append(f"โ {story['score']}") if story.get('num_comments'): engagement.append(f"๐ฌ {story['num_comments']}") # Shorter excerpt for cleaner look excerpt = story.get('selftext', '')[:150] if len(story.get('selftext', '')) > 150: excerpt += "..." flair = story.get('link_flair_text', '') title = story.get('title', '') if flair: title = f"[{flair}] {title}" engagement_html = f"
{' ยท '.join(engagement)}
" if engagement else "" excerpt_html = f"{excerpt}
" if excerpt else "" return f'''
Reddit
{title}u/{story.get('author', 'unknown')} {excerpt_html} {engagement_html} |
{' ยท '.join(engagement)}
" if engagement else "" excerpt_html = f"{excerpt}
" if excerpt else "" return f'''
{source}
{story.get('title', '')}{excerpt_html} {engagement_html} |
๐ง X/Twitter integration coming soon
' # Generate text sections reddit_text = '\n'.join([format_story_text(s) for s in reddit_top]) if reddit_top else "No new Reddit posts today." news_text = '\n'.join([format_story_text(s) for s in news_top]) if news_top else "No new news articles today." twitter_text = "๐ง X/Twitter integration coming soon - requires API setup\n" # Build result result = { "meta": { "generated_at": datetime.utcnow().isoformat(), "time_window_hours": hours, "date": datetime.utcnow().strftime("%A, %B %d, %Y") }, "stats": { "reddit_count": reddit_data.get('total_posts', 0), "news_count": news_data.get('total_items', 0), "twitter_count": 0, "total_unique": len(unique_items) }, "content": { "reddit": reddit_data, "news": news_data, "twitter": twitter_data }, "formatted": { "reddit_html": reddit_html, "news_html": news_html, "twitter_html": twitter_html, "reddit_text": reddit_text, "news_text": news_text, "twitter_text": twitter_text } } print("\n" + "=" * 50) print(f"โ Aggregation complete!") print(f" Reddit posts: {result['stats']['reddit_count']}") print(f" News items: {result['stats']['news_count']}") print(f" Total unique: {result['stats']['total_unique']}") return result if __name__ == "__main__": hours = int(sys.argv[1]) if len(sys.argv) > 1 else 24 output_file = sys.argv[2] if len(sys.argv) > 2 else "/home/openclaw/.openclaw/workspace/automations/openclaw-digest/output/digest.json" result = aggregate_content(hours=hours) with open(output_file, 'w') as f: json.dump(result, f, indent=2) print(f"\n๐ Output saved to: {output_file}")