最新版本 · 开源发布
🇺🇸🇨🇳🇯🇵🇰🇷🇪🇸🇫🇷 🇩🇪🇧🇷🇷🇺🇸🇦🇹🇭🇻🇳🇮🇩

13国语言社交媒体数据采集系统

🌐 13 种语言 🎯 精准过滤 🔄 代理轮换 🧠 智能分析 📊 数据清洗 🐳 Docker 部署

基于 Python 的多平台社交媒体数据采集系统。覆盖 Twitter/X、Reddit、微博、VK 四大平台, 支持 13 国语言关键词匹配与情感分析,精准筛选 20-50 岁单身男性用户。 集成代理轮换、速率限制、去重清洗和 MongoDB 存储,开箱即用。

📥 立即下载 ⚡ 核心功能 🌍 支持语言
13
支持语言
4
数据平台
25+
Python模块
50K
代码量(行)

✦ 核心功能特性 ✦

模块化流水线架构,每个环节精准可控

🌐

13 国语言关键词引擎

英/中/日/韩/西/法/德/葡/俄/阿/泰/越/印尼。每种语言 30-50 个精准关键词 + 排除词库。自动语言检测与置信度评分。

LanguageDetector
🎯

精准用户过滤

按性别(男性优先)、年龄(20-50岁)、情感状态(单身)三层过滤。支持平台字段提取 + 文本语义分析双重验证。

UserFilter
🔄

代理轮换系统

多代理提供商集成(Webshare/ScrapingBee/手动代理),健康检查 + 轮转策略(轮询/随机/最少使用),失败冷却机制。

ProxyManager
🧹

数据清洗去重

MD5 内容哈希去重、HTML 标签剥离、URL 过滤、Unicode 规范化。24 小时滑动窗口去重,精准垃圾信息检测。

DataCleaner
🐳

Docker 容器化部署

Docker Compose 一键启动(App + MongoDB)。环境变量配置、定时调度(cron 风格)、日志轮转。

Docker
📊

数据持久化存储

MongoDB 多集合存储(原始数据、处理后数据、目标用户)。索引优化、TTL 自动过期、批量导入导出。

MongoDB

速率限制与重试

平台自适应速率限制、指数退避重试、最大错误容忍自动熔断。每个 API 调用的智能时间窗口管理。

BaseScraper
🔌

多平台数据采集

Twitter/X(Bearer Token)、Reddit(OAuth2)、Weibo(Cookie)、VK(Access Token)。统一抽象基类,新增平台即插即用。

4 Platforms

13 种支持语言

每种语言均配置了针对性关键词、排除词、年龄/性别提示语

🇺🇸
English
en
35 keywords · 15 exclude
🇨🇳
中文
zh
40 keywords · 20 exclude
🇯🇵
日本語
ja
32 keywords · 16 exclude
🇰🇷
한국어
ko
30 keywords · 15 exclude
🇪🇸
Español
es
35 keywords · 14 exclude
🇫🇷
Français
fr
34 keywords · 14 exclude
🇩🇪
Deutsch
de
34 keywords · 14 exclude
🇧🇷
Português
pt
33 keywords · 14 exclude
🇷🇺
Русский
ru
34 keywords · 14 exclude
🇸🇦
العربية
ar
32 keywords · 14 exclude
🇹🇭
ไทย
th
33 keywords · 14 exclude
🇻🇳
Tiếng Việt
vi
34 keywords · 14 exclude
🇮🇩
Bahasa Indonesia
id
33 keywords · 14 exclude

数据源平台

覆盖四大主流社交媒体平台,统一抽象接口

🐦

Twitter / X

Bearer Token 认证 · API v2 · 搜索推文、用户画像、趋势话题

🤖

Reddit

OAuth2 认证 · 多子版块搜索 · 新帖/热门帖 · 用户画像提取

📱

微博 Weibo

移动端 API · Cookie 认证 · 中文关键词搜索 · 用户资料获取

💬

VK (Vkontakte)

Access Token · 用户/帖子搜索 · 关系状态检测 · 群组成员扫描

系统架构

模块化流水线设计,六层架构清晰可扩展

数据源层
🐦 Twitter
🤖 Reddit
📱 微博 Weibo
💬 VK
代理/中间件层
🔄 ProxyManager
⚡ RateLimit
🛡️ Retry + Backoff
处理器层
🌐 语言检测
🎯 用户过滤
🧠 关键词提取
🧹 数据清洗
存储层
🗄️ MongoDB
📁 JSON/CSV
📊 Parquet
输出层
📊 统计报告
🎯 目标用户
📈 趋势分析

代码预览

核心模块代码片段,展示设计思路与实现细节

🔧 抽象基类爬虫 · BaseScraper src/scraper/base.py
class BaseScraper(abc.ABC):
    """Abstract base class for all platform scrapers."""

    def __init__(self, platform_name, proxy_manager=None, logger=None):
        self.platform_name = platform_name
        self.proxy_manager = proxy_manager or ProxyManager()
        self._consecutive_errors = 0
        self._max_consecutive_errors = 5

    @abc.abstractmethod
    def authenticate(self) -> bool:
        pass

    @abc.abstractmethod
    def search(self, query, **kwargs) -> Generator[Dict, None, None]:
        pass

    def rate_limit(self):
        """Apply rate limiting based on platform constraints."""
        min_interval = 1.0 / max(self._max_requests_per_second(), 0.1)
        elapsed = time.time() - self._last_request_time
        if elapsed < min_interval:
            time.sleep(min_interval - elapsed + random.uniform(0.1, 0.5))
        self._last_request_time = time.time()

    def make_request(self, method, url, headers=None, params=None, data=None,
                         use_proxy=True, max_retries=3):
        """HTTP request with proxy rotation, retry, and rate limiting."""
        self.rate_limit()
        for attempt in range(max_retries):
            try:
                proxy = self.get_proxy() if use_proxy else None
                response = requests.request(method, url, headers=headers,
                    params=params, json=data,
                    proxies=... if proxy else None, timeout=30)
                if response.status_code == 429:  # rate limited
                    time.sleep(int(response.headers.get('Retry-After', 30)))
                    continue
                response.raise_for_status()
                return response.json()
            except requests.exceptions.ProxyError as e:
                self.proxy_manager.mark_failed(proxy.id)
                time.sleep(random.uniform(1, 3))
        return None
🌐 语言检测引擎 · LanguageDetector src/processor/language_detector.py
class LanguageDetector:
    """Detect language and extract gender/age/relationship keywords from text."""

    def detect_language(self, text: str) -> Tuple[str, float]:
        """Detect the language of text. Returns (code, confidence)."""
        scores = {}
        # Check character-based patterns for non-Latin scripts
        for lang_code, pattern in LANG_PATTERNS.items():
            matches = pattern.findall(text)
            if matches:
                scores[lang_code] = len(matches) / max(len(text), 1)

        # Keyword-based detection for Latin-script languages
        for lang_code in latin_langs:
            matches = sum(1 for w in words if w in kw_set)
            if matches > 0:
                scores[lang_code] = scores.get(lang_code, 0) + matches / max(len(words), 1)

        best_lang = max(scores, key=scores.get) if scores else "unknown"
        confidence = scores[best_lang] / sum(scores.values()) if scores else 0
        return (best_lang, min(confidence, 1.0))

    def extract_keywords(self, text, lang=None) -> Dict:
        """Extract target keywords, exclude keywords, age/gender hints."""
        for lang_code in check_langs:
            kw_data = self.lang_data[lang_code]
            for kw in kw_data.get("keywords", []):
                if kw.lower() in text_lower:
                    result["matched_keywords"].append({kw, lang_code})
            # Check exclude keywords (married, etc.)
            for kw in kw_data.get("exclude_keywords", []):
                if kw.lower() in text_lower:
                    result["is_excluded"] = True
        return result
🗄️ MongoDB 存储模块 · MongoDBStorage src/storage/mongodb.py
class MongoDBStorage:
    """MongoDB storage handler for social media scraper."""

    def __init__(self, logger=None):
        self.config = MONGO_CONFIG
        self.client = None
        self.collections = {}

    def connect(self) -> bool:
        """Connect to MongoDB and init collections."""
        self.client = MongoClient(self.config["uri"],
            maxPoolSize=50, serverSelectionTimeoutMS=5000)
        self.client.admin.command("ping")
        self.db = self.client[self.config["database"]]
        for name, col in self.config["collections"].items():
            self.collections[name] = self.db[col]
        return True

    def insert_raw_post(self, post) -> Optional[str]:
        """Insert a raw scraped post with dedup."""
        post["_stored_at"] = datetime.utcnow()
        result = self.collections["raw_posts"].insert_one(post)
        return str(result.inserted_id)

    def save_filtered_user(self, user_data) -> Optional[str]:
        """Save a filtered target user record."""
        user_data["_stored_at"] = datetime.utcnow()
        result = self.collections["filtered_users"].insert_one(user_data)
        return str(result.inserted_id)
🚀 主入口 · main.py src/main.py
def run_scrape(platforms, languages, demo_mode=True, max_posts=100):
    """Run the complete scraping pipeline."""
    scrapers = create_scrapers(proxy_manager, demo_mode)
    detector = LanguageDetector()
    filter_engine = UserFilter()
    cleaner = DataCleaner()

    for platform_name in platforms:
        scraper = scrapers[platform_name]
        scraper.authenticate()

        for lang in languages:
            posts = scraper.run_search_query(query=query, lang=lang,
                max_results=max_posts // len(languages))
            platform_posts.extend(posts)

        cleaned_posts = cleaner.clean_batch(platform_posts)

        for post in cleaned_posts:
            analysis = detector.process_text(post.get("text", ""), platform_name)
            is_target, _ = filter_engine.filter_post(post)
            if is_target:
                storage.save_filtered_user(post)

    return {"total_posts": len(all), "targets": len(targets)}


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="13-Language Social Media Scraper")
    parser.add_argument("--platforms", nargs="+", choices=["twitter", "reddit", "weibo", "vk"])
    parser.add_argument("--demo", action="store_true", default=True)
    parser.add_argument("--schedule", type=int, default=0, help="Run on schedule (minutes)")
    args = parser.parse_args()
    main()
📦

📥 下载 13国语言社交媒体数据采集系统

完整源码已打包为 ZIP 格式。包含全部 25+ Python 模块、配置文件、Docker 部署脚本。即刻下载,马上部署运行!

52 KB
文件大小
25+
Python模块
v1.0
版本
Python 3.11
运行环境
下载 ZIP 文件

解压后运行 python src/main.py --demo --list-languages 查看支持语言,或 docker compose up 一键启动