#!/usr/bin/env python
# -*- coding: utf-8 -*-
import json
import re
import time
import requests
import os
import pandas as pd
from tqdm import tqdm
import signal
import sys
from datetime import datetime
class TwitterFollowerExtractor:
"""
A class to extract Twitter follower counts from a CSV file.
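
    Typical usage (illustrative; the CSV path is an assumption):

        extractor = TwitterFollowerExtractor("accounts.csv")
        extractor.process_followers()

    Progress is written to <csv_file_path>.progress so an interrupted run can
    resume, and a <csv_file_path>.lock file guards against concurrent instances.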
"""
# API configuration
X_RAPIDAPI_KEY = "xxx"
RAPIDAPI_HOST = "twitter-v1-1-v2-api.p.rapidapi.com"
ENDPOINT = "https://twitter-v1-1-v2-api.p.rapidapi.com/graphql/UserByScreenName"
def __init__(self, csv_file_path):
"""
Initialize the extractor with the path to the CSV file.
:param csv_file_path: The path to the CSV file.
"""
self.csv_file_path = csv_file_path
self.df = None
self.progress_file = csv_file_path + '.progress'
self.lock_file = csv_file_path + '.lock'
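        # Sidecar files: <csv>.progress stores resumable state as JSON,
        # <csv>.lock marks that an instance of this script is running.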
self.processed_count = 0
self.setup_signal_handlers()
def setup_signal_handlers(self):
"""设置信号处理器以优雅地处理程序中断"""
def signal_handler(signum, frame):
print(f"n收到信号 {signum},正在安全退出...")
self._cleanup_and_exit()
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
def _cleanup_and_exit(self):
"""清理资源并退出"""
if os.path.exists(self.lock_file):
os.remove(self.lock_file)
print("程序已安全退出。")
sys.exit(0)
def _create_lock_file(self):
"""创建锁文件防止多个实例同时运行"""
if os.path.exists(self.lock_file):
print(f"检测到锁文件 {self.lock_file},可能有其他实例正在运行。")
print("如果确认没有其他实例运行,请删除锁文件后重试。")
return False
try:
with open(self.lock_file, 'w') as f:
f.write(f"{os.getpid()}n{datetime.now()}")
return True
except Exception as e:
print(f"创建锁文件失败: {e}")
return False
def _save_progress(self, processed_usernames):
"""保存进度到文件"""
try:
with open(self.progress_file, 'w') as f:
json.dump({
'processed_usernames': list(processed_usernames),
'processed_count': self.processed_count,
'last_update': datetime.now().isoformat()
}, f, indent=2)
except Exception as e:
print(f"保存进度失败: {e}")
def _load_progress(self):
"""从文件加载进度"""
if not os.path.exists(self.progress_file):
return set()
try:
with open(self.progress_file, 'r') as f:
progress_data = json.load(f)
processed_usernames = set(progress_data.get('processed_usernames', []))
self.processed_count = progress_data.get('processed_count', 0)
print(f"从断点继续: 已处理 {len(processed_usernames)} 个用户")
return processed_usernames
except Exception as e:
print(f"加载进度失败: {e}")
return set()
def _save_single_row(self, idx, follower_count):
"""逐条保存单行数据到CSV文件"""
try:
            # Update the DataFrame
if '粉丝数' not in self.df.columns:
self.df['粉丝数'] = None
self.df.at[idx, '粉丝数'] = follower_count
            # Write the CSV file to disk immediately
self._save_csv()
return True
except Exception as e:
print(f"保存单行数据失败: {e}")
return False
def _extract_twitter_username(self, url):
"""Extract Twitter username from URL."""
if not url:
return None
# Handle both profile URLs and status URLs
        status_pattern = r'(?:https?://)?(?:www\.)?(?:x\.com|twitter\.com)/([^/]+)/status/'
        profile_pattern = r'(?:https?://)?(?:www\.)?(?:x\.com|twitter\.com)/([^/?]+)'
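        # Examples (illustrative):
        #   https://x.com/someuser/status/12345   -> "someuser" (via status_pattern)
        #   https://twitter.com/someuser?lang=en  -> "someuser" (via profile_pattern)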
# First try to extract from status URL
match = re.search(status_pattern, url)
if match:
username = match.group(1)
# Skip if username is "status" (malformed URL)
if username.lower() == 'status':
return None
return username
# Then try to extract from profile URL
match = re.search(profile_pattern, url)
if match:
username = match.group(1)
# Remove any query parameters if present
username = username.split('?')[0]
# Skip if username is "status" (malformed URL)
if username.lower() == 'status':
return None
return username
return None
def _get_follower_count(self, username):
"""Get follower count for a Twitter username using RapidAPI with retry logic."""
if not username:
return None
headers = {
"X-RapidAPI-Key": self.X_RAPIDAPI_KEY,
"X-RapidAPI-Host": self.RAPIDAPI_HOST
}
# Prepare variables according to the correct API format
variables = {
"screen_name": username,
"withSafetyModeUserFields": True,
"withHighlightedLabel": True
}
querystring = {"variables": json.dumps(variables)}
# Implement retry logic
max_retries = 3
retry_delay = 2 # seconds
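        # Simple fixed-delay retry; an exponential backoff could be swapped in here
        # if rate limiting becomes an issue (an assumption, not verified against the API docs).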
for attempt in range(max_retries):
try:
                response = requests.get(self.ENDPOINT, headers=headers, params=querystring, timeout=30)  # timeout guards against hanging requests
if response.status_code == 200:
data = response.json()
# Extract follower count from the response using the correct path
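                    # Expected shape (abridged, as assumed by this parser):
                    #   {"data": {"user": {"result": {"legacy": {"followers_count": <int>}}}}}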
if "data" in data and "user" in data["data"] and data["data"]["user"]:
user_result = data["data"]["user"]["result"]
if "legacy" in user_result:
return user_result["legacy"]["followers_count"]
else:
print(f"No user data found for {username}")
else:
print(f"API request failed for {username} (Attempt {attempt+1}/{max_retries}): Status code {response.status_code}")
# If we're not on the last attempt, wait before retrying
if attempt < max_retries - 1:
print(f"Retrying in {retry_delay} seconds...")
time.sleep(retry_delay)
except Exception as e:
print(f"Error fetching data for {username} (Attempt {attempt+1}/{max_retries}): {e}")
# If we're not on the last attempt, wait before retrying
if attempt < max_retries - 1:
print(f"Retrying in {retry_delay} seconds...")
time.sleep(retry_delay)
return None
def _backup_file(self):
"""Create a backup of the original CSV file."""
backup_file = self.csv_file_path + '.backup'
try:
with open(self.csv_file_path, 'rb') as src, open(backup_file, 'wb') as dst:
dst.write(src.read())
print(f"Created backup of original file at {backup_file}")
except Exception as e:
print(f"Warning: Could not create backup file: {e}")
def _load_csv(self):
"""Load the CSV file into a pandas DataFrame with enhanced compatibility."""
try:
# Try different encoding methods for better compatibility
encodings = ['utf-8-sig', 'utf-8', 'gbk', 'gb2312', 'latin-1']
df_loaded = False
for encoding in encodings:
try:
self.df = pd.read_csv(self.csv_file_path, encoding=encoding)
df_loaded = True
print(f"Successfully loaded CSV with {len(self.df)} rows using {encoding} encoding.")
break
except UnicodeDecodeError:
continue
except Exception as e:
print(f"Error with {encoding} encoding: {e}")
continue
if not df_loaded:
print("Failed to load CSV with any encoding method.")
return False
# Clean up the DataFrame columns and data
self._clean_dataframe()
return True
except Exception as e:
print(f"Error reading CSV file: {e}")
return False
def _clean_dataframe(self):
"""Clean the DataFrame to handle malformed data."""
# Clean column names by removing newlines and extra whitespace
        self.df.columns = [col.strip().replace('\n', '').replace('\r', '') for col in self.df.columns]
# Clean the '粉丝数' column if it exists
if '粉丝数' in self.df.columns:
# Remove newlines and extra whitespace from the follower count column
            self.df['粉丝数'] = self.df['粉丝数'].astype(str).str.strip().str.replace('\n', '').str.replace('\r', '')
# Replace empty strings with None
self.df['粉丝数'] = self.df['粉丝数'].replace('', None)
# Clean other string columns
for col in self.df.columns:
if self.df[col].dtype == 'object':
                self.df[col] = self.df[col].astype(str).str.strip().str.replace('\n', '').str.replace('\r', '')
# Replace 'nan' strings with None
self.df[col] = self.df[col].replace('nan', None)
def _save_csv(self):
"""Save the updated DataFrame back to the CSV file."""
try:
self.df.to_csv(self.csv_file_path, index=False, encoding='utf-8-sig')
print(f"Process completed. Follower counts have been updated in {self.csv_file_path}.")
except Exception as e:
print(f"Error saving updated CSV: {e}")
print("Please check the backup file if needed.")
def _generate_summary(self, processed_count):
"""Generate and print a summary of the results."""
if '粉丝数' in self.df.columns:
total_updated = self.df['粉丝数'].notna().sum()
print(f"nSummary:")
print(f"Total Twitter accounts processed: {processed_count}")
print(f"Successfully updated follower counts: {total_updated}")
print(f"Failed to update follower counts: {processed_count - total_updated}")
# Print top 10 accounts by follower count
if total_updated > 0:
print("nTop 10 accounts by follower count:")
top_accounts = self.df[self.df['粉丝数'].notna()].sort_values('粉丝数', ascending=False).head(10)
for _, row in top_accounts.iterrows():
url_value = row['url'] if 'url' in row and pd.notna(row['url']) else "N/A"
followers = row['粉丝数'] if pd.notna(row['粉丝数']) else 0
# Clean the followers value and convert to int safely
try:
# Remove any whitespace and newlines
followers_str = str(followers).strip()
if followers_str and followers_str != 'nan':
followers_int = int(float(followers_str))
print(f"- {self._extract_twitter_username(url_value)}: {followers_int} followers")
except (ValueError, TypeError) as e:
print(f"- {self._extract_twitter_username(url_value)}: Unable to parse follower count ({followers})")
def process_followers(self):
"""
Main method to run the follower extraction process.
"""
print("Starting Twitter follower count extraction...")
if not os.path.exists(self.csv_file_path):
print(f"Error: File {self.csv_file_path} not found.")
return
self._backup_file()
if not self._load_csv():
return
usernames_to_process = []
for idx, row in self.df.iterrows():
twitter_url = None
try:
if 'ext_info' in row and pd.notna(row['ext_info']):
ext_info = json.loads(row['ext_info'])
if 'twitterUrl' in ext_info and ext_info['twitterUrl']:
twitter_url = ext_info['twitterUrl']
except Exception as e:
print(f"Error parsing ext_info for row {idx}: {e}")
if not twitter_url and 'url' in self.df.columns and pd.notna(row['url']):
twitter_url = row['url']
if twitter_url:
username = self._extract_twitter_username(twitter_url)
if username:
usernames_to_process.append((idx, username, twitter_url))
print(f"Found {len(usernames_to_process)} Twitter usernames to process.")
        # Create the lock file
if not self._create_lock_file():
return
try:
            # Load progress from any previous run
            processed_usernames = self._load_progress()
            # Filter out users that were already processed
remaining_usernames = []
for idx, username, url in usernames_to_process:
if username not in processed_usernames:
                    # Skip rows that already have a valid follower count
                    if '粉丝数' in self.df.columns and pd.notna(self.df.at[idx, '粉丝数']):
                        existing_value = str(self.df.at[idx, '粉丝数']).strip()
                        if existing_value and existing_value not in ['#VALUE!', 'nan', '', '\n']:
try:
int(float(existing_value))
print(f"Skipping {username} - already has follower count: {existing_value}")
processed_usernames.add(username)
continue
except (ValueError, TypeError):
pass
remaining_usernames.append((idx, username, url))
else:
print(f"Skipping {username} - already processed in previous run")
print(f"需要处理的用户数: {len(remaining_usernames)} (总计: {len(usernames_to_process)})")
# 处理剩余用户
for idx, username, url in tqdm(remaining_usernames, desc="Fetching follower counts"):
try:
follower_count = self._get_follower_count(username)
if follower_count is not None:
                        # Save this row to the CSV immediately
if self._save_single_row(idx, follower_count):
processed_usernames.add(username)
self.processed_count += 1
print(f"Updated {username} with {follower_count} followers")
                            # Save progress after every 10 processed users
if self.processed_count % 10 == 0:
self._save_progress(processed_usernames)
else:
print(f"Failed to save data for {username}")
else:
print(f"Could not get follower count for {username}")
                        processed_usernames.add(username)  # Mark as processed to avoid retrying
except Exception as e:
print(f"处理用户 {username} 时发生错误: {e}")
continue
            # Save the final progress
self._save_progress(processed_usernames)
finally:
            # Remove the lock file
if os.path.exists(self.lock_file):
os.remove(self.lock_file)
self._generate_summary(len(usernames_to_process))
if __name__ == "__main__":
import argparse
parser = argparse.ArgumentParser(description='Twitter Follower Extractor')
    parser.add_argument('--clean', action='store_true', help='Clear progress/lock files and start over')
    parser.add_argument('--file', default='用户活动报名信息.csv', help='Path to the CSV file')
args = parser.parse_args()
CSV_FILE_PATH = args.file
extractor = TwitterFollowerExtractor(CSV_FILE_PATH)
if args.clean:
        # Remove the progress file and lock file
        if os.path.exists(extractor.progress_file):
            os.remove(extractor.progress_file)
            print("Removed progress file")
        if os.path.exists(extractor.lock_file):
            os.remove(extractor.lock_file)
            print("Removed lock file")
extractor.process_followers()
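    # Example invocations (illustrative; the script filename is an assumption):
    #   python extract_followers.py --file 用户活动报名信息.csv
    #   python extract_followers.py --clean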