
Code written by Claude Code (robustness-hardened version)


  • The requirement is to backfill the follower-count (粉丝数) data in the original spreadsheet; a sketch of the expected input shape follows below.
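Hypothetical rows illustrating the input the script assumes (the usernames and JSON values here are made up): a url column and/or an ext_info JSON blob carrying twitterUrl, plus the 粉丝数 column to fill in.

url,ext_info,粉丝数
https://x.com/exampleuser/status/1234567890,"{""twitterUrl"": ""https://x.com/exampleuser""}",
https://twitter.com/anotheruser,,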
#!/usr/bin/env python
# -*- coding: utf-8 -*-

import json
import re
import time
import requests
import os
import pandas as pd
from tqdm import tqdm
import signal
import sys
from datetime import datetime

class TwitterFollowerExtractor:
    """
    A class to extract Twitter follower counts from a CSV file.
    """

    # API configuration
    X_RAPIDAPI_KEY = "xxx"  # placeholder; substitute your own RapidAPI key
    RAPIDAPI_HOST = "twitter-v1-1-v2-api.p.rapidapi.com"
    ENDPOINT = "https://twitter-v1-1-v2-api.p.rapidapi.com/graphql/UserByScreenName"

    def __init__(self, csv_file_path):
        """
        Initialize the extractor with the path to the CSV file.

        :param csv_file_path: The path to the CSV file.
        """
        self.csv_file_path = csv_file_path
        self.df = None
        self.progress_file = csv_file_path + '.progress'
        self.lock_file = csv_file_path + '.lock'
        self.processed_count = 0
        self.setup_signal_handlers()

    def setup_signal_handlers(self):
        """设置信号处理器以优雅地处理程序中断"""
        def signal_handler(signum, frame):
            print(f"n收到信号 {signum},正在安全退出...")
            self._cleanup_and_exit()

        signal.signal(signal.SIGINT, signal_handler)
        signal.signal(signal.SIGTERM, signal_handler)

    def _cleanup_and_exit(self):
        """清理资源并退出"""
        if os.path.exists(self.lock_file):
            os.remove(self.lock_file)
        print("程序已安全退出。")
        sys.exit(0)

    def _create_lock_file(self):
        """Create a lock file to prevent concurrent instances."""
        if os.path.exists(self.lock_file):
            print(f"Lock file {self.lock_file} detected; another instance may be running.")
            print("If you are sure no other instance is running, delete the lock file and retry.")
            return False

        try:
            with open(self.lock_file, 'w') as f:
                f.write(f"{os.getpid()}\n{datetime.now()}")
            return True
        except Exception as e:
            print(f"Failed to create lock file: {e}")
            return False

    def _save_progress(self, processed_usernames):
        """保存进度到文件"""
        try:
            with open(self.progress_file, 'w') as f:
                json.dump({
                    'processed_usernames': list(processed_usernames),
                    'processed_count': self.processed_count,
                    'last_update': datetime.now().isoformat()
                }, f, indent=2)
        except Exception as e:
            print(f"保存进度失败: {e}")

    def _load_progress(self):
        """从文件加载进度"""
        if not os.path.exists(self.progress_file):
            return set()

        try:
            with open(self.progress_file, 'r') as f:
                progress_data = json.load(f)
                processed_usernames = set(progress_data.get('processed_usernames', []))
                self.processed_count = progress_data.get('processed_count', 0)
                print(f"从断点继续: 已处理 {len(processed_usernames)} 个用户")
                return processed_usernames
        except Exception as e:
            print(f"加载进度失败: {e}")
            return set()

    def _save_single_row(self, idx, follower_count):
        """逐条保存单行数据到CSV文件"""
        try:
            # Update the DataFrame
            if '粉丝数' not in self.df.columns:
                self.df['粉丝数'] = None
            self.df.at[idx, '粉丝数'] = follower_count

            # Write the CSV immediately so progress survives a crash
            self._save_csv()
            return True
        except Exception as e:
            print(f"保存单行数据失败: {e}")
            return False

    def _extract_twitter_username(self, url):
        """Extract Twitter username from URL."""
        if not url:
            return None

        # Handle both profile URLs and status URLs
        status_pattern = r'(?:https?://)?(?:www\.)?(?:x\.com|twitter\.com)/([^/]+)/status/'
        profile_pattern = r'(?:https?://)?(?:www\.)?(?:x\.com|twitter\.com)/([^/?]+)'

        # First try to extract from status URL
        match = re.search(status_pattern, url)
        if match:
            username = match.group(1)
            # Skip if username is "status" (malformed URL)
            if username.lower() == 'status':
                return None
            return username

        # Then try to extract from profile URL
        match = re.search(profile_pattern, url)
        if match:
            username = match.group(1)
            # Remove any query parameters if present
            username = username.split('?')[0]
            # Skip if username is "status" (malformed URL)
            if username.lower() == 'status':
                return None
            return username

        return None

    def _get_follower_count(self, username):
        """Get follower count for a Twitter username using RapidAPI with retry logic."""
        if not username:
            return None

        headers = {
            "X-RapidAPI-Key": self.X_RAPIDAPI_KEY,
            "X-RapidAPI-Host": self.RAPIDAPI_HOST
        }

        # Prepare variables according to the correct API format
        variables = {
            "screen_name": username,
            "withSafetyModeUserFields": True,
            "withHighlightedLabel": True
        }

        querystring = {"variables": json.dumps(variables)}

        # Implement retry logic
        max_retries = 3
        retry_delay = 2  # seconds

        for attempt in range(max_retries):
            try:
                response = requests.get(self.ENDPOINT, headers=headers, params=querystring, timeout=15)  # avoid hanging on a stalled connection
                if response.status_code == 200:
                    data = response.json()
                    # Extract follower count from the response using the correct path
                    if "data" in data and "user" in data["data"] and data["data"]["user"]:
                        user_result = data["data"]["user"]["result"]
                        if "legacy" in user_result:
                            return user_result["legacy"]["followers_count"]
                    else:
                        print(f"No user data found for {username}")
                else:
                    print(f"API request failed for {username} (Attempt {attempt+1}/{max_retries}): Status code {response.status_code}")

                # If we're not on the last attempt, wait before retrying
                if attempt < max_retries - 1:
                    print(f"Retrying in {retry_delay} seconds...")
                    time.sleep(retry_delay)
            except Exception as e:
                print(f"Error fetching data for {username} (Attempt {attempt+1}/{max_retries}): {e}")
                # If we're not on the last attempt, wait before retrying
                if attempt < max_retries - 1:
                    print(f"Retrying in {retry_delay} seconds...")
                    time.sleep(retry_delay)

        return None

    def _backup_file(self):
        """Create a backup of the original CSV file."""
        backup_file = self.csv_file_path + '.backup'
        try:
            with open(self.csv_file_path, 'rb') as src, open(backup_file, 'wb') as dst:
                dst.write(src.read())
            print(f"Created backup of original file at {backup_file}")
        except Exception as e:
            print(f"Warning: Could not create backup file: {e}")

    def _load_csv(self):
        """Load the CSV file into a pandas DataFrame with enhanced compatibility."""
        try:
            # Try different encoding methods for better compatibility
            encodings = ['utf-8-sig', 'utf-8', 'gbk', 'gb2312', 'latin-1']
            df_loaded = False

            for encoding in encodings:
                try:
                    self.df = pd.read_csv(self.csv_file_path, encoding=encoding)
                    df_loaded = True
                    print(f"Successfully loaded CSV with {len(self.df)} rows using {encoding} encoding.")
                    break
                except UnicodeDecodeError:
                    continue
                except Exception as e:
                    print(f"Error with {encoding} encoding: {e}")
                    continue

            if not df_loaded:
                print("Failed to load CSV with any encoding method.")
                return False

            # Clean up the DataFrame columns and data
            self._clean_dataframe()
            return True
        except Exception as e:
            print(f"Error reading CSV file: {e}")
            return False

    def _clean_dataframe(self):
        """Clean the DataFrame to handle malformed data."""
        # Clean column names by removing newlines and extra whitespace
        self.df.columns = [col.strip().replace('\n', '').replace('\r', '') for col in self.df.columns]

        # Clean the '粉丝数' column if it exists
        if '粉丝数' in self.df.columns:
            # Remove newlines and extra whitespace from the follower count column
            self.df['粉丝数'] = self.df['粉丝数'].astype(str).str.strip().str.replace('\n', '').str.replace('\r', '')
            # Replace empty strings with None
            self.df['粉丝数'] = self.df['粉丝数'].replace('', None)

        # Clean other string columns
        for col in self.df.columns:
            if self.df[col].dtype == 'object':
                self.df[col] = self.df[col].astype(str).str.strip().str.replace('\n', '').str.replace('\r', '')
                # Replace 'nan' strings with None
                self.df[col] = self.df[col].replace('nan', None)

    def _save_csv(self):
        """Save the updated DataFrame back to the CSV file (called after every row)."""
        try:
            self.df.to_csv(self.csv_file_path, index=False, encoding='utf-8-sig')
        except Exception as e:
            print(f"Error saving updated CSV: {e}")
            print("Please check the backup file if needed.")

    def _generate_summary(self, processed_count):
        """Generate and print a summary of the results."""
        if '粉丝数' in self.df.columns:
            total_updated = self.df['粉丝数'].notna().sum()

            print(f"nSummary:")
            print(f"Total Twitter accounts processed: {processed_count}")
            print(f"Successfully updated follower counts: {total_updated}")
            print(f"Failed to update follower counts: {processed_count - total_updated}")

            # Print the top 10 accounts by follower count
            if total_updated > 0:
                print("\nTop 10 accounts by follower count:")
                # Sort numerically; after cleaning, the column may hold strings
                numeric_followers = pd.to_numeric(self.df['粉丝数'], errors='coerce')
                top_idx = numeric_followers.dropna().sort_values(ascending=False).head(10).index
                top_accounts = self.df.loc[top_idx]
                for _, row in top_accounts.iterrows():
                    url_value = row['url'] if 'url' in row and pd.notna(row['url']) else "N/A"
                    followers = row['粉丝数'] if pd.notna(row['粉丝数']) else 0

                    # Clean the followers value and convert to int safely
                    try:
                        # Remove any whitespace and newlines
                        followers_str = str(followers).strip()
                        if followers_str and followers_str != 'nan':
                            followers_int = int(float(followers_str))
                            print(f"- {self._extract_twitter_username(url_value)}: {followers_int} followers")
                    except (ValueError, TypeError):
                        print(f"- {self._extract_twitter_username(url_value)}: Unable to parse follower count ({followers})")

    def process_followers(self):
        """
        Main method to run the follower extraction process.
        """
        print("Starting Twitter follower count extraction...")

        if not os.path.exists(self.csv_file_path):
            print(f"Error: File {self.csv_file_path} not found.")
            return

        self._backup_file()

        if not self._load_csv():
            return

        usernames_to_process = []

        for idx, row in self.df.iterrows():
            twitter_url = None

            try:
                if 'ext_info' in row and pd.notna(row['ext_info']):
                    ext_info = json.loads(row['ext_info'])
                    if 'twitterUrl' in ext_info and ext_info['twitterUrl']:
                        twitter_url = ext_info['twitterUrl']
            except Exception as e:
                print(f"Error parsing ext_info for row {idx}: {e}")

            if not twitter_url and 'url' in self.df.columns and pd.notna(row['url']):
                twitter_url = row['url']

            if twitter_url:
                username = self._extract_twitter_username(twitter_url)
                if username:
                    usernames_to_process.append((idx, username, twitter_url))

        print(f"Found {len(usernames_to_process)} Twitter usernames to process.")

        # Create the lock file
        if not self._create_lock_file():
            return

        try:
            # Load saved progress
            processed_usernames = self._load_progress()

            # Filter out users that were already processed
            remaining_usernames = []
            for idx, username, url in usernames_to_process:
                if username not in processed_usernames:
                    # Skip rows that already have a valid follower count
                    if '粉丝数' in self.df.columns and pd.notna(self.df.at[idx, '粉丝数']):
                        existing_value = str(self.df.at[idx, '粉丝数']).strip()
                        if existing_value and existing_value not in ['#VALUE!', 'nan', '', '\n']:
                            try:
                                int(float(existing_value))
                                print(f"Skipping {username} - already has follower count: {existing_value}")
                                processed_usernames.add(username)
                                continue
                            except (ValueError, TypeError):
                                pass
                    remaining_usernames.append((idx, username, url))
                else:
                    print(f"Skipping {username} - already processed in previous run")

            print(f"需要处理的用户数: {len(remaining_usernames)} (总计: {len(usernames_to_process)})")

            # Process the remaining users
            for idx, username, url in tqdm(remaining_usernames, desc="Fetching follower counts"):
                try:
                    follower_count = self._get_follower_count(username)

                    if follower_count is not None:
                        # Save each row as soon as it is fetched
                        if self._save_single_row(idx, follower_count):
                            processed_usernames.add(username)
                            self.processed_count += 1
                            print(f"Updated {username} with {follower_count} followers")

                            # Checkpoint progress every 10 users
                            if self.processed_count % 10 == 0:
                                self._save_progress(processed_usernames)
                        else:
                            print(f"Failed to save data for {username}")
                    else:
                        print(f"Could not get follower count for {username}")
                        processed_usernames.add(username)  # Mark as processed to avoid retrying

                except Exception as e:
                    print(f"处理用户 {username} 时发生错误: {e}")
                    continue

            # Save final progress
            self._save_progress(processed_usernames)
            print(f"Process completed. Follower counts have been updated in {self.csv_file_path}.")

        finally:
            # Remove the lock file
            if os.path.exists(self.lock_file):
                os.remove(self.lock_file)

        self._generate_summary(len(usernames_to_process))

if __name__ == "__main__":
    import argparse

    parser = argparse.ArgumentParser(description='Twitter Follower Extractor')
    parser.add_argument('--clean', action='store_true', help='Clear progress files and start fresh')
    parser.add_argument('--file', default='用户活动报名信息.csv', help='Path to the CSV file')
    args = parser.parse_args()

    CSV_FILE_PATH = args.file

    extractor = TwitterFollowerExtractor(CSV_FILE_PATH)

    if args.clean:
        # Remove progress and lock files
        if os.path.exists(extractor.progress_file):
            os.remove(extractor.progress_file)
            print("Progress file removed")
        if os.path.exists(extractor.lock_file):
            os.remove(extractor.lock_file)
            print("Lock file removed")

    extractor.process_followers()
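Running the script leaves three sidecar files next to the CSV: <file>.backup (a pre-run copy), <file>.progress (the JSON checkpoint), and <file>.lock (removed on clean exit). A quick usage sketch; the filename extract_followers.py is an assumption, use whatever you saved the script as:

# First run, or resume after an interruption (progress is checkpointed every 10 users):
python extract_followers.py --file 用户活动报名信息.csv

# Discard the checkpoint and start over:
python extract_followers.py --file 用户活动报名信息.csv --clean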
