#!/usr/bin/env python3
"""
Content Aggregation Script for OpenClaw Daily Digest
Unifies Reddit, News, and Twitter content into structured JSON
"""

import json
import sys
import os
from datetime import datetime, timedelta, timezone
from typing import List, Dict, Any

sys.path.insert(0, os.path.join(os.path.dirname(__file__), 'sources'))

from reddit_fetcher import fetch_reddit_content
from news_fetcher import fetch_news_content

# Two titles count as the same story when their word overlap exceeds this.
_TITLE_OVERLAP_THRESHOLD = 0.8
# Maximum excerpt length (characters) before "..." is appended.
_EXCERPT_LIMIT = 150
# Number of stories kept per section of the digest.
_TOP_STORIES_PER_SECTION = 8
# Where the digest is written when no path is given on the command line.
_DEFAULT_OUTPUT_FILE = "/home/openclaw/.openclaw/workspace/automations/openclaw-digest/output/digest.json"


def _title_overlap(words_a: set, words_b: set) -> float:
    """Return shared-word count divided by the larger set's size (both non-empty)."""
    return len(words_a & words_b) / max(len(words_a), len(words_b))


def _as_number(value: Any, default: float = 0.0) -> float:
    """Return value as a float, or default when it is None or not numeric."""
    try:
        return float(value)
    except (TypeError, ValueError):
        return default


def _truncate(text: Any, limit: int = _EXCERPT_LIMIT) -> str:
    """Return text cut to limit characters, with "..." appended if it was cut."""
    text = text or ''
    if len(text) > limit:
        return text[:limit] + "..."
    return text


def deduplicate_stories(items: List[Dict]) -> List[Dict]:
    """Remove duplicate stories based on URL and title similarity.

    An item is dropped when its normalized URL (lower-cased, query string
    removed) was already seen, or when more than 80% of its title words
    overlap with an item that was already kept. The first occurrence wins
    and input order is preserved.

    Args:
        items: Story dicts; the 'url' and 'title' keys are read if present.

    Returns:
        A new list containing the items that were kept.
    """
    seen_urls = set()
    unique = []
    # Word set of each kept item's title, parallel to `unique`, so the sets
    # are built once instead of on every comparison.
    kept_title_words = []

    for item in items:
        # NOTE(review): dropping the whole query string also merges URLs
        # whose identity lives in the query (e.g. ".../item?id=1" and
        # ".../item?id=2") - confirm that is acceptable for the sources used.
        url = (item.get('url') or '').lower().split('?')[0]

        # Items without a URL must not all collapse into a single "" entry.
        if url and url in seen_urls:
            continue

        title_words = set((item.get('title') or '').lower().split())
        is_duplicate = bool(title_words) and any(
            _title_overlap(title_words, other) > _TITLE_OVERLAP_THRESHOLD
            for other in kept_title_words
            if other
        )
        if is_duplicate:
            continue

        if url:
            seen_urls.add(url)
        unique.append(item)
        kept_title_words.append(title_words)

    return unique


def score_relevance(item: Dict) -> float:
    """Score story relevance for ranking (higher sorts first).

    Items carrying both 'score' and 'num_comments' use Reddit-style
    weighting, items carrying 'points' use Hacker News weighting, and
    everything else gets a fixed base score of 50. Items with more than 50
    comments or more than 100 points receive an extra 100.

    Args:
        item: Story dict; missing or non-numeric fields count as 0
            ('upvote_ratio' falls back to 0.5).

    Returns:
        The relevance score.
    """
    comments = _as_number(item.get('num_comments'))
    points = _as_number(item.get('points'))

    if 'score' in item and 'num_comments' in item:
        # Reddit-style scoring
        score = (
            _as_number(item.get('score')) * 0.5
            + comments * 1.5
            + _as_number(item.get('upvote_ratio', 0.5), 0.5) * 50
        )
    elif 'points' in item:
        # Hacker News scoring
        score = points * 1.0 + comments * 2.0
    else:
        # Default: news articles get medium base score
        score = 50.0

    # Boost for high-engagement content
    if comments > 50 or points > 100:
        score += 100

    return score


def format_reddit_story(story: Dict) -> str:
    """Format a Reddit story for v2 HTML email - Spark-safe inline styles

    Args:
        story: Reddit post dict; reads 'score', 'num_comments', 'selftext',
            'link_flair_text', 'title' and 'author'.

    Returns:
        The formatted fragment for this story.
    """
    # NOTE(review): the HTML tags and inline styles this docstring refers to
    # are not present in the string literals as seen here (they look like
    # they were stripped from this copy). Only the visible text is
    # reproduced - restore the markup from the original file.
    # NOTE(review): title/author/excerpt are interpolated unescaped; if the
    # fetchers return raw (not already HTML-escaped) text, apply html.escape.
    engagement = []
    if story.get('score'):
        engagement.append(f"↑ {story['score']}")
    if story.get('num_comments'):
        engagement.append(f"💬 {story['num_comments']}")

    # Shorter excerpt for cleaner look
    excerpt = _truncate(story.get('selftext'))

    flair = story.get('link_flair_text') or ''
    title = story.get('title') or ''
    if flair:
        title = f"[{flair}] {title}"
    author = story.get('author') or 'unknown'

    engagement_html = f"\n\n{' · '.join(engagement)}\n\n" if engagement else ""
    excerpt_html = f"\n\n{excerpt}\n\n" if excerpt else ""

    return f'''
Reddit

{title}

u/{author}

{excerpt_html} {engagement_html}
'''


def format_news_story(story: Dict) -> str:
    """Format a news story for v2 HTML email - Spark-safe inline styles

    Args:
        story: News item dict; reads 'source', 'points', 'num_comments',
            'summary' and 'title'.

    Returns:
        The formatted fragment for this story.
    """
    source = story.get('source') or 'News'

    # NOTE(review): tag_color / tag_bg are not referenced by the visible
    # template; presumably they fed inline styles in markup that is missing
    # from this copy. Kept so the markup can be restored - confirm.
    tag_color = '#74b9ff'
    tag_bg = 'rgba(116,185,255,0.15)'
    if 'GitHub' in source:
        tag_color = '#a29bfe'
        tag_bg = 'rgba(139,148,158,0.15)'
    elif 'Hacker' in source:
        tag_color = '#ff9f43'
        tag_bg = 'rgba(255,102,0,0.15)'

    engagement = []
    if story.get('points'):
        engagement.append(f"↑ {story['points']}")
    if story.get('num_comments'):
        engagement.append(f"💬 {story['num_comments']}")

    # Shorter excerpt for cleaner look
    excerpt = _truncate(story.get('summary'))
    title = story.get('title') or ''

    engagement_html = f"\n\n{' · '.join(engagement)}\n\n" if engagement else ""
    excerpt_html = f"\n\n{excerpt}\n\n" if excerpt else ""

    # NOTE(review): values are interpolated unescaped; see the matching note
    # in format_reddit_story.
    return f'''
{source}

{title}

{excerpt_html} {engagement_html}
'''


def format_story_text(story: Dict) -> str:
    """Format a story for plain-text email

    Args:
        story: Reddit or news dict; reads 'title', 'url', 'author',
            'score'/'points', 'num_comments' and 'selftext'/'summary'.

    Returns:
        Newline-joined lines for the story, ending with an empty line.
    """
    lines = [
        f"📌 {story.get('title', '')}",
        f" Link: {story.get('url', '')}",
    ]

    if story.get('author'):
        lines.append(f" Author: {story.get('author')}")

    if story.get('score') or story.get('points'):
        score = story.get('score') or story.get('points', 0)
        lines.append(f" Score: {score} upvotes")

    if story.get('num_comments'):
        lines.append(f" Comments: {story.get('num_comments')}")

    excerpt = _truncate(story.get('selftext') or story.get('summary'))
    if excerpt:
        lines.append(f" {excerpt}")

    lines.append("")  # Empty line between stories
    return "\n".join(lines)


def aggregate_content(hours: int = 24) -> Dict[str, Any]:
    """Main aggregation function

    Fetches Reddit and news content, removes duplicates across both, ranks
    the remainder with score_relevance and formats the top stories of each
    source as HTML and plain text. Progress is printed to stdout.

    Args:
        hours: Time window passed through to both fetchers.

    Returns:
        Dict with the keys 'meta', 'stats', 'content' and 'formatted'.
    """
    print(f"🦀 Aggregating OpenClaw content from last {hours} hours...")
    print("=" * 50)

    # Fetch from all sources
    print("\n📥 Fetching Reddit content...")
    reddit_data = fetch_reddit_content(hours=hours)

    print("\n📥 Fetching news content...")
    news_data = fetch_news_content(hours=hours)

    # Twitter placeholder (will be added when API is configured)
    twitter_data = {
        "source": "twitter",
        "total_items": 0,
        "tweets": [],
        "note": "X/Twitter integration requires API setup",
    }

    # Combine all items for deduplication, tagging each with its origin so
    # the ranked list can be split back into sections afterwards.
    all_items = []
    all_items.extend({**item, '_source': 'reddit'} for item in reddit_data.get('all_posts', []))
    all_items.extend({**item, '_source': 'news'} for item in news_data.get('all_items', []))

    # Deduplicate
    print("\n🧹 Deduplicating stories...")
    unique_items = deduplicate_stories(all_items)
    print(f" Removed {len(all_items) - len(unique_items)} duplicates")

    # Sort by relevance score
    unique_items.sort(key=score_relevance, reverse=True)

    # Split back into sections
    reddit_top = [item for item in unique_items if item.get('_source') == 'reddit'][:_TOP_STORIES_PER_SECTION]
    news_top = [item for item in unique_items if item.get('_source') == 'news'][:_TOP_STORIES_PER_SECTION]

    # Generate HTML sections
    reddit_html = '\n'.join(format_reddit_story(s) for s in reddit_top)
    news_html = '\n'.join(format_news_story(s) for s in news_top)
    # NOTE(review): like the story templates, this literal shows no markup
    # in this copy - restore the original tags if they were stripped.
    twitter_html = '\n\n🚧 X/Twitter integration coming soon\n\n'

    # Generate text sections
    reddit_text = '\n'.join(format_story_text(s) for s in reddit_top) if reddit_top else "No new Reddit posts today."
    news_text = '\n'.join(format_story_text(s) for s in news_top) if news_top else "No new news articles today."
    twitter_text = "🚧 X/Twitter integration coming soon - requires API setup\n"

    # One timestamp for both fields so they cannot disagree across midnight.
    # tzinfo is dropped to keep the naive-UTC format utcnow() used to emit.
    now_utc = datetime.now(timezone.utc).replace(tzinfo=None)

    # Build result
    result = {
        "meta": {
            "generated_at": now_utc.isoformat(),
            "time_window_hours": hours,
            "date": now_utc.strftime("%A, %B %d, %Y"),
        },
        "stats": {
            "reddit_count": reddit_data.get('total_posts', 0),
            "news_count": news_data.get('total_items', 0),
            "twitter_count": 0,
            "total_unique": len(unique_items),
        },
        "content": {
            "reddit": reddit_data,
            "news": news_data,
            "twitter": twitter_data,
        },
        "formatted": {
            "reddit_html": reddit_html,
            "news_html": news_html,
            "twitter_html": twitter_html,
            "reddit_text": reddit_text,
            "news_text": news_text,
            "twitter_text": twitter_text,
        },
    }

    print("\n" + "=" * 50)
    print("✅ Aggregation complete!")
    print(f" Reddit posts: {result['stats']['reddit_count']}")
    print(f" News items: {result['stats']['news_count']}")
    print(f" Total unique: {result['stats']['total_unique']}")

    return result


def main() -> None:
    """Run the aggregation from the command line.

    Usage: script [hours] [output_file]. Defaults to a 24 hour window and
    the digest.json path under the OpenClaw workspace.
    """
    hours = int(sys.argv[1]) if len(sys.argv) > 1 else 24
    output_file = sys.argv[2] if len(sys.argv) > 2 else _DEFAULT_OUTPUT_FILE

    result = aggregate_content(hours=hours)

    # Create the target directory on first run instead of failing in open().
    output_dir = os.path.dirname(output_file)
    if output_dir:
        os.makedirs(output_dir, exist_ok=True)

    with open(output_file, 'w', encoding='utf-8') as f:
        json.dump(result, f, indent=2)

    print(f"\n📄 Output saved to: {output_file}")


if __name__ == "__main__":
    main()