1010import shutil # 适用于Linux和macOS
1111import sys
1212import re
13- import aiohttp
14- import requests
13+ import datetime
1514from urllib .parse import urlparse , parse_qs
1615from PyQt5 .QtWidgets import QApplication
1716from FB_loginwin import win_main
1817from playwright .async_api import async_playwright
1918from FB_status import StatusWindow
2019from database_manager import db_manager
21-
20+ from concurrent . futures import ThreadPoolExecutor
2221
2322class Crawler :
2423 def __init__ (self , cookies , params ):
@@ -40,6 +39,8 @@ def __init__(self, cookies, params):
4039 self .supportId = ""
4140 self .post_user_cunt = 0
4241 self .post_name = ""
42+ self .eq_type = None
43+ self .device = params .get ('device' )
4344
4445 async def safe_update_status (self , text ):
4546 """安全的异步状态更新"""
@@ -527,7 +528,7 @@ async def getusers(self):
527528 if not addresses :
528529 print ("没有提供地址列表" )
529530 return
530-
531+ self . eq_type = 1
531532 for i in range (len (addresses )):
532533 url = addresses [i ].strip ()
533534 # 创建一个新的CSV文件名
@@ -545,6 +546,8 @@ async def getusers(self):
545546 groups_num_selector = '//span[@dir="auto"]/div//a[@role="link" and contains(text(), "成員")]'
546547 groups_num_function = await self .page .wait_for_selector (groups_num_selector , timeout = 10000 )
547548 self .groups_num = await groups_num_function .inner_text ()
549+ day_match = re .search (r'\d+' , self .groups_num )
550+ self .groups_num = day_match .group ()
548551 print (self .groups_num )
549552 await self .robust_update_status (f"社團名:{ self .groups_name } 社團人數:{ self .groups_num } " )
550553 except Exception as e :
@@ -590,7 +593,7 @@ async def getusers(self):
590593 'name' : text .strip (),
591594 'user_id' : user_id
592595 })
593- print (f"{ user_counter } :{ user_id } { text .strip ()} " )
596+ # print(f"{user_counter}:{user_id} {text.strip()}")
594597 await self .robust_update_status (f"{ user_counter } :{ user_id } { text .strip ()} " )
595598 in_csv_data .append ([user_id , text .strip (), self .extract_group_id (url )])
596599
@@ -616,20 +619,42 @@ async def getusers(self):
616619 if scroll_attempts >= 3 : # 连续3次没有新用户就停止
617620 print ("已加载所有用户" )
618621 break
619- # 将数据写入CSV文件
620- with open (csv_filename , 'w' , newline = '' , encoding = 'utf-8' ) as csvfile :
621- csv_writer = csv .writer (csvfile )
622- csv_writer .writerow (['userid' , 'username' , 'societiesid' ]) # 写入表头
623- csv_writer .writerows (in_csv_data ) # 写入数据
624- print (f"爬取完成,共获取 { user_counter } 个用户信息" )
622+ if len (users ) > 0 :
623+ # 将数据写入CSV文件
624+ with open (csv_filename , 'w' , newline = '' , encoding = 'utf-8' ) as csvfile :
625+ csv_writer = csv .writer (csvfile )
626+ csv_writer .writerow (['userid' , 'username' , 'societiesid' ]) # 写入表头
627+ csv_writer .writerows (in_csv_data ) # 写入数据
628+ print (f"爬取完成,共获取 { user_counter } 个用户信息" )
629+ # 使用线程池执行数据库提交(避免阻塞主线程)
630+ loop = asyncio .get_event_loop ()
631+ with ThreadPoolExecutor () as executor :
632+ await loop .run_in_executor (
633+ executor ,
634+ submit_data_to_database ,
635+ csv_filename ,
636+ i ,
637+ self .eq_type ,
638+ self .extract_group_id (url ), # 社团ID
639+ self .groups_name , # 社团名称
640+ self .groups_num , # 总成员数
641+ user_counter # 实际获取数量
642+ )
643+
644+ await self .robust_update_status (f"{ csv_filename } 数据提交完成" )
645+ upend_time = datetime .datetime .now ()
646+ db_manager .update_updata_table (self .device , len (users ), upend_time )
647+ else :
648+ print ("無用戶" )
649+
625650 # return users
626651
627652 async def getusers_fans (self ):
628653 addresses = self .params .get ('addresses' , [])
629654 if not addresses :
630655 print ("没有提供地址列表" )
631656 return
632-
657+ self . eq_type = 2
633658 for i in range (len (addresses )):
634659 url = addresses [i ].strip ()
635660 # 创建一个新的CSV文件名
@@ -703,7 +728,7 @@ async def getusers_fans(self):
703728 'name' : text .strip (),
704729 'user_id' : user_id
705730 })
706- print (f"{ user_counter } :{ user_id } { text .strip ()} " )
731+ # print(f"{user_counter}:{user_id} {text.strip()}")
707732 await self .robust_update_status (f"{ user_counter } :{ user_id } { text .strip ()} " )
708733 in_csv_data .append ([user_id , text .strip (), await self .extract_facebook_identifier (url )])
709734
@@ -729,12 +754,32 @@ async def getusers_fans(self):
729754 if scroll_attempts >= 3 : # 连续3次没有新用户就停止
730755 print ("已加载所有用户" )
731756 break
732- # 将数据写入CSV文件
733- with open (csv_filename , 'w' , newline = '' , encoding = 'utf-8' ) as csvfile :
734- csv_writer = csv .writer (csvfile )
735- csv_writer .writerow (['userid' , 'username' , 'societiesid' ]) # 写入表头
736- csv_writer .writerows (in_csv_data ) # 写入数据
737- print (f"爬取完成,共获取 { user_counter } 个用户信息" )
757+ if len (users ) > 0 :
758+ # 将数据写入CSV文件
759+ with open (csv_filename , 'w' , newline = '' , encoding = 'utf-8' ) as csvfile :
760+ csv_writer = csv .writer (csvfile )
761+ csv_writer .writerow (['userid' , 'username' , 'societiesid' ]) # 写入表头
762+ csv_writer .writerows (in_csv_data ) # 写入数据
763+ print (f"爬取完成,共获取 { user_counter } 个用户信息" )
764+ # 使用线程池执行数据库提交(避免阻塞主线程)
765+ loop = asyncio .get_event_loop ()
766+ with ThreadPoolExecutor () as executor :
767+ await loop .run_in_executor (
768+ executor ,
769+ submit_data_to_database ,
770+ csv_filename ,
771+ i ,
772+ self .eq_type ,
773+ self .extract_fans_id (url ), # 粉丝专页ID
774+ self .groups_name , # 粉丝专页名称
775+ self .groups_num , # 总粉丝数
776+ user_counter # 实际获取数量
777+ )
778+ await self .robust_update_status (f"{ csv_filename } 数据提交完成" )
779+ upend_time = datetime .datetime .now ()
780+ db_manager .update_updata_table (self .device , len (users ), upend_time )
781+ else :
782+ print ("無用戶" )
738783
739784 async def getusers_like (self ):
740785 await self .page .goto (url = "https://www.facebook.com/" , wait_until = 'load' , timeout = 50000 )
@@ -987,6 +1032,8 @@ async def get_sponsor_user(self):
9871032 }
9881033 print ('提交数据' , post_info )
9891034 db_manager .insert_post_info (post_info )
1035+ upend_time = datetime .datetime .now ()
1036+ db_manager .update_updata_table (self .device , len (users ), upend_time )
9901037 else :
9911038 print ('数据为空不提交' )
9921039 await self .robust_update_status (f"沒有成員跳過~" )
@@ -1206,6 +1253,56 @@ async def force_minimize_browser(self):
12061253 self .minimize_browser_window ()
12071254
12081255
def submit_data_to_database(csv_filename, batch_number, eq_type, societies_url_id, societies_name, total_number,
                            getnum):
    """Submit crawled user rows from a CSV staging file to the database in batches.

    Reads the CSV written by the crawler (header row skipped), inserts the rows
    in batches of 80 into either the societies-user table (eq_type == 1) or the
    fans-user table (any other value), then writes one summary record and
    deletes the CSV file. Designed to run in a worker thread (callers dispatch
    it via run_in_executor), so the blocking sleeps here are acceptable.

    Args:
        csv_filename: Path of the staging CSV produced by the crawler.
        batch_number: Index of the address being processed (log output only).
        eq_type: 1 for group (societies) data; anything else means fan-page data.
        societies_url_id: Group / fan-page identifier extracted from the URL.
        societies_name: Group / fan-page display name.
        total_number: Total member/fan count scraped from the page.
        getnum: Number of users actually collected.

    Returns:
        True on success, False if any exception occurred.
    """
    try:
        with open(csv_filename, 'r', newline='', encoding='utf-8') as csvfile:
            csv_reader = csv.reader(csvfile)
            next(csv_reader)  # skip the header row written by the crawler
            data = list(csv_reader)
        print(f"批次 {batch_number}: 开始提交数据...")

        batch_size = 80  # insert at most 80 rows per DB call
        total_batches = (len(data) + batch_size - 1) // batch_size
        for batch_index, start in enumerate(range(0, len(data), batch_size), 1):
            batch_data = data[start:start + batch_size]

            if eq_type == 1:
                # Group (societies) user table.
                success = db_manager.insert_societies_user_batch(batch_data)
            else:
                # Fan-page user table.
                success = db_manager.insert_fans_user_batch(batch_data)

            if not success:
                print(f"批次 {batch_number} 第 {batch_index} 批数据插入失败")
                continue

            print(f"批次 {batch_number} 第 {batch_index} 批数据插入成功")

            # Throttle between batches to avoid hammering the DB/service.
            # Fix: do not sleep after the final batch — the original slept
            # 8-14 s of pure dead time at the end of every submission.
            if batch_index < total_batches:
                time.sleep(random.uniform(8, 14))

        print(f"批次 {batch_number}: 提交完成,共 {len(data)} 条数据")

        # All row batches committed: write the one-line summary record.
        if eq_type == 1:
            db_manager.insert_societies_inf(societies_url_id, societies_name, total_number, getnum)
        else:
            db_manager.insert_fans_inf(societies_url_id, societies_name, total_number, getnum)

        # The CSV is only a staging artifact; remove it once the data is in the DB.
        os.remove(csv_filename)
        print(f"完成: {csv_filename}")
        return True

    except Exception as e:
        # Best-effort boundary: report and signal failure instead of crashing
        # the crawler thread that dispatched us.
        print(f"提交数据到数据库时出错: {str(e)}")
        return False
1305+
def parse_bool(type_data):
    """Coerce a loosely-typed flag value to a real bool.

    The value is stringified, lowercased and stripped, then matched against
    the accepted truthy spellings. Anything else ('false', '0', 'no', '',
    None, ...) yields False.

    Args:
        type_data: Any value (string, number, bool, None) to interpret.

    Returns:
        True if the value spells a recognized truthy token, else False.
    """
    # Fix: the original tuple listed 'yes' twice — ('true', '1', 'yes', 'yes').
    # The duplicate was a typo with no effect on behavior; it is removed here.
    return str(type_data).lower().strip() in ('true', '1', 'yes')
0 commit comments