基于 Python 的多平台社交媒体数据采集系统。覆盖 Twitter/X、Reddit、微博、VK 四大平台,支持 13 国语言关键词匹配与情感分析,精准筛选 20-50 岁单身男性用户。集成代理轮换、速率限制、去重清洗和 MongoDB 存储,开箱即用。
模块化流水线架构,每个环节精准可控
英/中/日/韩/西/法/德/葡/俄/阿/泰/越/印尼。每种语言 30-50 个精准关键词 + 排除词库。自动语言检测与置信度评分。[LanguageDetector]
按性别(男性优先)、年龄(20-50岁)、情感状态(单身)三层过滤。支持平台字段提取 + 文本语义分析双重验证。[UserFilter]
多代理提供商集成(Webshare/ScrapingBee/手动代理),健康检查 + 轮转策略(轮询/随机/最少使用),失败冷却机制。[ProxyManager]
MD5 内容哈希去重、HTML 标签剥离、URL 过滤、Unicode 规范化。24 小时滑动窗口去重,精准垃圾信息检测。[DataCleaner]
Docker Compose 一键启动(App + MongoDB)。环境变量配置、定时调度(cron 风格)、日志轮转。[Docker]
MongoDB 多集合存储(原始数据、处理后数据、目标用户)。索引优化、TTL 自动过期、批量导入导出。[MongoDB]
平台自适应速率限制、指数退避重试、最大错误容忍自动熔断。每个 API 调用的智能时间窗口管理。[BaseScraper]
Twitter/X(Bearer Token)、Reddit(OAuth2)、Weibo(Cookie)、VK(Access Token)。统一抽象基类,新增平台即插即用。[4 Platforms]
每种语言均配置了针对性关键词、排除词、年龄/性别提示语
覆盖四大主流社交媒体平台,统一抽象接口
Bearer Token 认证 · API v2 · 搜索推文、用户画像、趋势话题
OAuth2 认证 · 多子版块搜索 · 新帖/热门帖 · 用户画像提取
移动端 API · Cookie 认证 · 中文关键词搜索 · 用户资料获取
Access Token · 用户/帖子搜索 · 关系状态检测 · 群组成员扫描
模块化流水线设计,六层架构清晰可扩展
核心模块代码片段,展示设计思路与实现细节
class BaseScraper(abc.ABC):
    """Abstract base class for all platform scrapers.

    Concrete scrapers must implement :meth:`authenticate` and :meth:`search`.

    NOTE(review): ``rate_limit`` and ``make_request`` call
    ``self._max_requests_per_second()`` and ``self.get_proxy()``, neither of
    which is defined in this excerpt -- presumably they live in the full
    class or in each subclass; confirm before relying on this base alone.
    """

    def __init__(self, platform_name, proxy_manager=None, logger=None):
        """Store the platform name and collaborators and reset counters.

        Args:
            platform_name: Identifier of the platform this scraper targets.
            proxy_manager: Proxy manager to use; a fresh ``ProxyManager`` is
                created when a falsy value is passed.
            logger: Optional logger, kept on ``self.logger`` (may be None).
        """
        self.platform_name = platform_name
        self.proxy_manager = proxy_manager or ProxyManager()
        self.logger = logger
        self._consecutive_errors = 0
        self._max_consecutive_errors = 5
        # rate_limit() reads this before ever writing it, so it needs a
        # starting value; 0.0 means "no request has been made yet".
        self._last_request_time = 0.0

    @abc.abstractmethod
    def authenticate(self) -> bool:
        pass

    @abc.abstractmethod
    def search(self, query, **kwargs) -> Generator[Dict, None, None]:
        pass

    def rate_limit(self):
        """Apply rate limiting based on platform constraints.

        Sleeps when the previous request was less than the minimum interval
        ago, adding 0.1-0.5 s of random jitter, then records the current
        time as the last request time.
        """
        # The rate is floored at 0.1 req/s, so the interval is at most 10 s.
        min_interval = 1.0 / max(self._max_requests_per_second(), 0.1)
        elapsed = time.time() - self._last_request_time
        if elapsed < min_interval:
            time.sleep(min_interval - elapsed + random.uniform(0.1, 0.5))
        self._last_request_time = time.time()

    @staticmethod
    def _parse_retry_after(value, default=30):
        """Return a non-negative wait in seconds parsed from Retry-After.

        Falls back to *default* when the header is missing or is not a plain
        integer (the header may also carry an HTTP-date).
        """
        try:
            return max(int(value), 0)
        except (TypeError, ValueError):
            return default

    def make_request(self, method, url, headers=None, params=None, data=None,
                     use_proxy=True, max_retries=3):
        """HTTP request with proxy rotation, retry, and rate limiting.

        Args:
            method: HTTP method name passed to ``requests.request``.
            url: Target URL.
            headers: Optional request headers.
            params: Optional query-string parameters.
            data: Optional payload, sent as the JSON body.
            use_proxy: When true, a proxy is obtained via ``self.get_proxy()``.
            max_retries: Maximum number of attempts.

        Returns:
            The decoded JSON body of the first successful response, or None
            when every attempt was rate limited (429) or hit a proxy error.

        Raises:
            Whatever ``response.raise_for_status()`` raises for other HTTP
            error statuses; only ``ProxyError`` is handled here.
        """
        self.rate_limit()
        for _ in range(max_retries):
            # Bound before the try so the except clause can always inspect it.
            proxy = None
            try:
                proxy = self.get_proxy() if use_proxy else None
                response = requests.request(
                    method,
                    url,
                    headers=headers,
                    params=params,
                    json=data,
                    # NOTE(review): `...` is the placeholder published in this
                    # excerpt; the real proxies mapping built from `proxy`
                    # is not shown here and must be supplied.
                    proxies=... if proxy else None,
                    timeout=30,
                )
                if response.status_code == 429:  # rate limited
                    time.sleep(
                        self._parse_retry_after(
                            response.headers.get("Retry-After")
                        )
                    )
                    continue
                response.raise_for_status()
                return response.json()
            except requests.exceptions.ProxyError:
                # A ProxyError can also surface without one of our proxies
                # (use_proxy=False), so only report a proxy we actually hold.
                if proxy is not None:
                    self.proxy_manager.mark_failed(proxy.id)
                time.sleep(random.uniform(1, 3))
        return None
class LanguageDetector:
    """Detect language and extract gender/age/relationship keywords from text.

    NOTE(review): this is an abridged excerpt. ``latin_langs``, ``words``,
    ``kw_set``, ``check_langs``, ``text_lower`` and ``result`` are used below
    but never assigned here, and ``self.lang_data`` is never set -- the
    statements that define them appear to have been cut; restore them from
    the full module before running this code.
    """

    def detect_language(self, text: str) -> Tuple[str, float]:
        """Detect the language of text. Returns (code, confidence).

        Each candidate language gets a score; the confidence is the best
        score's share of the total, capped at 1.0. When nothing matches the
        result is ``("unknown", 0.0)``.
        """
        scores = {}

        # Check character-based patterns for non-Latin scripts: the score is
        # the number of pattern matches relative to the text length.
        for lang_code, pattern in LANG_PATTERNS.items():
            matches = pattern.findall(text)
            if matches:
                scores[lang_code] = len(matches) / max(len(text), 1)

        # Keyword-based detection for Latin-script languages.
        # NOTE(review): `words` and `kw_set` are not defined in this excerpt;
        # presumably the tokenised text and the per-language keyword set.
        for lang_code in latin_langs:
            matches = sum(1 for w in words if w in kw_set)
            if matches > 0:
                scores[lang_code] = (
                    scores.get(lang_code, 0) + matches / max(len(words), 1)
                )

        if not scores:
            # Float, to match the annotated return type.
            return ("unknown", 0.0)

        best_lang = max(scores, key=scores.get)
        confidence = scores[best_lang] / sum(scores.values())
        return (best_lang, min(confidence, 1.0))

    def extract_keywords(self, text, lang=None) -> Dict:
        """Extract target keywords, exclude keywords, age/gender hints.

        Appends one ``(keyword, lang_code)`` tuple to
        ``result["matched_keywords"]`` for every configured keyword found in
        the lower-cased text, and sets ``result["is_excluded"]`` when any
        exclude keyword is found.

        NOTE(review): `check_langs`, `text_lower` and `result` are not
        defined in this excerpt, and `lang` is unused in the visible code.
        """
        for lang_code in check_langs:
            kw_data = self.lang_data[lang_code]

            for kw in kw_data.get("keywords", []):
                if kw.lower() in text_lower:
                    # A tuple keeps keyword and language in a fixed order.
                    # The former set literal was unordered, collapsed to one
                    # element when both strings were equal, and cannot be
                    # JSON-serialised.
                    result["matched_keywords"].append((kw, lang_code))

            # Check exclude keywords (married, etc.)
            if any(
                kw.lower() in text_lower
                for kw in kw_data.get("exclude_keywords", [])
            ):
                result["is_excluded"] = True

        return result
class MongoDBStorage:
    """MongoDB storage handler for social media scraper.

    Call :meth:`connect` before any insert method; until then
    ``self.collections`` is empty and the inserts raise ``KeyError``.
    """

    def __init__(self, logger=None):
        """Read the module-level ``MONGO_CONFIG``; no connection is opened.

        Args:
            logger: Optional logger, kept on ``self.logger`` (may be None).
        """
        self.config = MONGO_CONFIG
        self.logger = logger
        self.client = None
        self.db = None
        self.collections = {}

    def connect(self) -> bool:
        """Connect to MongoDB and init collections.

        Opens the client, pings the server, selects the configured database
        and maps every entry of ``config["collections"]`` to a collection
        handle in ``self.collections``.

        Returns:
            True once the ping succeeded and the handles are in place.
            Nothing is caught here, so connection failures propagate.
        """
        self.client = MongoClient(
            self.config["uri"],
            maxPoolSize=50,
            serverSelectionTimeoutMS=5000,
        )
        # Ping before touching the database so a bad connection fails here.
        self.client.admin.command("ping")
        self.db = self.client[self.config["database"]]
        for name, col in self.config["collections"].items():
            self.collections[name] = self.db[col]
        return True

    @staticmethod
    def _utc_now():
        """Return the current UTC time as a naive datetime.

        Replaces the deprecated ``datetime.utcnow()`` while producing the
        same kind of value (naive, UTC), so stored timestamps stay
        comparable with ones written earlier.
        """
        from datetime import timezone

        return datetime.now(timezone.utc).replace(tzinfo=None)

    def _insert_stamped(self, collection_name, document):
        """Stamp *document* with ``_stored_at`` and insert it.

        The caller's dict is modified in place. Returns the inserted id as
        a string.
        """
        document["_stored_at"] = self._utc_now()
        result = self.collections[collection_name].insert_one(document)
        return str(result.inserted_id)

    def insert_raw_post(self, post) -> Optional[str]:
        """Insert a raw scraped post into the ``raw_posts`` collection.

        NOTE(review): the original summary says "with dedup", but no
        de-duplication is visible in this excerpt -- presumably a unique
        index handles it; confirm.

        Returns:
            The inserted document id as a string.
        """
        return self._insert_stamped("raw_posts", post)

    def save_filtered_user(self, user_data) -> Optional[str]:
        """Save a filtered target user record to ``filtered_users``.

        Returns:
            The inserted document id as a string.
        """
        return self._insert_stamped("filtered_users", user_data)
def run_scrape(platforms, languages, demo_mode=True, max_posts=100):
    """Run the complete scraping pipeline.

    For every platform: authenticate, run one search per language, clean the
    collected batch, then pass each cleaned post through the detector and
    the filter, saving those the filter accepts.

    NOTE(review): `proxy_manager`, `query` and `storage` are used but not
    defined in this excerpt -- presumably module-level objects or lines that
    were cut; confirm against the full module.

    Args:
        platforms: Iterable of platform names, used as keys into the
            scrapers mapping.
        languages: Sequence of language codes; the post budget is split
            evenly across them.
        demo_mode: Forwarded to ``create_scrapers``.
        max_posts: Total post budget per platform.

    Returns:
        Dict with ``total_posts`` and ``targets`` counts.
    """
    scrapers = create_scrapers(proxy_manager, demo_mode)
    detector = LanguageDetector()
    filter_engine = UserFilter()
    cleaner = DataCleaner()

    # Local accumulators. The original returned len(all), i.e. the length of
    # the builtin `all`, which raises TypeError, and never initialised
    # `platform_posts` or `targets`.
    all_posts = []
    targets = []

    for platform_name in platforms:
        scraper = scrapers[platform_name]
        scraper.authenticate()

        platform_posts = []
        for lang in languages:
            posts = scraper.run_search_query(
                query=query,
                lang=lang,
                max_results=max_posts // len(languages),
            )
            platform_posts.extend(posts)

        cleaned_posts = cleaner.clean_batch(platform_posts)
        # NOTE(review): counting cleaned posts as "total_posts" is an
        # assumption; switch to platform_posts if raw counts are wanted.
        all_posts.extend(cleaned_posts)

        for post in cleaned_posts:
            # The result is not used in this excerpt; the call is kept in
            # case process_text has side effects.
            detector.process_text(post.get("text", ""), platform_name)
            is_target, _ = filter_engine.filter_post(post)
            if is_target:
                storage.save_filtered_user(post)
                targets.append(post)

    return {"total_posts": len(all_posts), "targets": len(targets)}


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="13-Language Social Media Scraper")
    parser.add_argument("--platforms", nargs="+", choices=["twitter", "reddit", "weibo", "vk"])
    # --demo is store_true with default=True, so on its own it could never be
    # switched off; --no-demo writes False to the same destination.
    parser.add_argument("--demo", action="store_true", default=True)
    parser.add_argument("--no-demo", dest="demo", action="store_false",
                        help="Disable demo mode")
    parser.add_argument("--schedule", type=int, default=0, help="Run on schedule (minutes)")
    args = parser.parse_args()
    # NOTE(review): main() is not defined in this excerpt and `args` is not
    # passed to it -- confirm how the parsed options reach the pipeline.
    main()
完整源码已打包为 ZIP 格式。包含全部 25+ Python 模块、配置文件、Docker 部署脚本。即刻下载,马上部署运行!
解压后运行 python src/main.py --demo --list-languages 查看支持语言,或 docker compose up 一键启动