自动爬取代理IP例子
2024-08-30 01:27:05
import time
import json
import datetime
import threading
import requests
from lxml import etree
from queue import Queue # 爬取免费代理IP 来源xicidaili.com
# 多线程验证代理ip是否可用
class ProxyTest:
    """Check candidate proxies concurrently and record the usable ones.

    Proxy URLs (one ``http://ip:port`` per line) are read from
    ``input_path``, pushed onto ``request_queue`` and consumed by daemon
    worker threads.  Each worker fetches ``test_url`` through the proxy.
    A proxy counts as usable when the client address reported back in the
    response's ``cip`` field equals the proxy's own host; such proxies are
    appended to ``output_path``.

    NOTE(review): the test endpoint is expected to answer with a JSON
    object wrapped in a short JavaScript assignment (the previous code
    sliced off a 19-character prefix and one trailing character) --
    confirm against the live service.
    """

    def __init__(self, input_path="proxy.json", output_path="proxy_ok_ip.json",
                 thread_count=30):
        """Set up the request settings, the work queue and the file locations.

        Args:
            input_path: File holding the proxy URLs to check, one per line.
            output_path: File that usable proxies are appended to.
            thread_count: Number of worker threads started by ``run``.
        """
        self.test_url = "http://pv.sohu.com/cityjson?ie=utf-8"
        self.headers = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/66.0.3359.139 Safari/537.36",}
        self.request_queue = Queue()
        self.input_path = input_path
        self.output_path = output_path
        self.thread_count = thread_count
        # Serializes appends to the result file across worker threads.
        self._write_lock = threading.Lock()

    def parse_url(self, url, proxies, timeout=3):
        """Fetch ``url`` through ``proxies`` and return the decoded body."""
        return requests.get(url, headers=self.headers, proxies=proxies, timeout=timeout).content.decode()

    @staticmethod
    def _extract_client_ip(html_str):
        """Return the ``cip`` value embedded in the response, or None.

        The JSON object is located by its outermost braces rather than by
        fixed offsets, so surrounding text or trailing whitespace does not
        break the parse.  None is returned when no JSON object is found,
        when it cannot be parsed, or when it has no non-empty string
        ``cip`` entry.
        """
        start = html_str.find("{")
        end = html_str.rfind("}")
        if start == -1 or end < start:
            return None
        try:
            payload = json.loads(html_str[start:end + 1])
        except ValueError:  # json.JSONDecodeError is a ValueError subclass
            return None
        client_ip = payload.get("cip")
        if isinstance(client_ip, str) and client_ip:
            return client_ip
        return None

    @staticmethod
    def _proxy_matches(proxy_url, client_ip):
        """Return True when ``proxy_url`` points at exactly ``client_ip``.

        A plain prefix test would accept ``http://1.2.3.40:80`` for the
        address ``1.2.3.4``, so the host must be followed by the port
        separator or by the end of the URL.
        """
        prefix = "http://" + client_ip
        return proxy_url == prefix or proxy_url.startswith(prefix + ":")

    def _check_proxy(self, ip):
        """Test one proxy URL, print the outcome and save it when usable."""
        starttime = datetime.datetime.now()
        try:
            html_str = self.parse_url(self.test_url, proxies={"http": ip}, timeout=5)
        except (requests.RequestException, UnicodeDecodeError):
            # Request failed, timed out, or the body was not decodable.
            print("timeout %s" % ip)
            return
        use_time = int((datetime.datetime.now() - starttime).total_seconds())

        client_ip = self._extract_client_ip(html_str)
        if client_ip is None:
            # The response did not carry the expected JSON payload.
            print("fail %s, use time %d" % (ip, use_time))
            return

        if self._proxy_matches(ip, client_ip):
            # The endpoint saw the proxy's address: the proxy is usable.
            print("success %s, use time %d, %s" % (ip, use_time, html_str))
            with self._write_lock:
                with open(self.output_path, "a", encoding="utf-8") as f:
                    f.write(ip + "\n")
        else:
            # The endpoint saw a different address: not a high-anonymity proxy.
            print("%s invalid, use time %d" % (ip, use_time))

    def request(self):
        """Worker loop: take proxy URLs off the queue and check them forever.

        ``task_done`` is called exactly once per item, even when the check
        raises, so ``request_queue.join()`` cannot block on an item that a
        crashed worker never acknowledged.
        """
        while True:
            ip = self.request_queue.get()
            try:
                self._check_proxy(ip)
            except Exception as exc:
                # Worker boundary: report the error and keep the thread alive
                # so the remaining queue items still get processed.
                print("error %s: %s" % (ip, exc))
            finally:
                self.request_queue.task_done()

    def run(self):
        """Load the proxy list, start the workers and wait for completion."""
        # Queue every distinct, non-blank address from the input file.
        seen = set()
        with open(self.input_path, "r", encoding="utf-8") as f:
            for line in f:
                address = line.strip()
                if address and address not in seen:
                    seen.add(address)
                    self.request_queue.put(address)

        for _ in range(self.thread_count):
            # Daemon threads end together with the main thread.
            threading.Thread(target=self.request, daemon=True).start()

        # Block the main thread until every queued item has been processed.
        self.request_queue.join()
        print("主线程结束")


class Proxy:
    """Scrape free proxy listings from several sites into a text file.

    Every site has a ``start_urls_*`` method that builds its page URLs and
    a ``get_content_list_*`` method that turns one page of HTML into a
    list of ``{"ip": ..., "port": ...}`` dicts.
    """

    def __init__(self):
        """Set the HTTP headers sent with every listing request."""
        self.headers = {
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/66.0.3359.139 Safari/537.36"
        }

    @staticmethod
    def _cell_text(tr, column):
        """Return the stripped first text node of ``td[column]``, or ""."""
        texts = tr.xpath('./td[%d]/text()' % column)
        return texts[0].strip() if texts else ""

    def _parse_rows(self, html_str, row_xpath, ip_col, port_col,
                    skip_rows=0, protocol_col=None):
        """Extract ip/port pairs from the table rows matched by ``row_xpath``.

        Args:
            html_str: HTML of one listing page.
            row_xpath: XPath selecting the table rows.
            ip_col: 1-based ``td`` index holding the IP address.
            port_col: 1-based ``td`` index holding the port.
            skip_rows: Number of leading rows (e.g. a header row) to drop.
            protocol_col: When given, keep only rows whose cell in this
                column reads ``HTTP``.

        Returns:
            A list of ``{"ip": str, "port": str}`` dicts.  Rows with a
            missing ip or port cell are skipped instead of raising, and an
            empty or unparsable page yields an empty list.
        """
        if not html_str or not html_str.strip():
            return []
        try:
            html = etree.HTML(html_str)
        except etree.LxmlError:
            return []
        if html is None:
            return []

        content_list = []
        for tr in html.xpath(row_xpath)[skip_rows:]:
            if protocol_col is not None and self._cell_text(tr, protocol_col) != 'HTTP':
                continue
            ip = self._cell_text(tr, ip_col)
            port = self._cell_text(tr, port_col)
            if ip and port:
                content_list.append({"ip": ip, "port": port})
        return content_list

    def start_urls_superfastip(self):
        """Return the superfastip.com listing URLs (pages 1-10)."""
        return ["http://www.superfastip.com/welcome/freeip/%d" % i for i in range(1, 11)]

    def get_content_list_superfastip(self, html_str):
        """Parse a superfastip.com page, keeping only rows marked HTTP."""
        return self._parse_rows(
            html_str,
            '/html/body/div[3]/div/div/div[2]/div/table/tbody/tr',
            ip_col=1,
            port_col=2,
            protocol_col=4,
        )

    def start_urls_xici(self):
        """Return the xicidaili.com listing URLs (pages 1-5)."""
        return ["http://www.xicidaili.com/nn/%d" % i for i in range(1, 6)]

    def get_content_list_xici(self, html_str):
        """Parse a xicidaili.com page; the first table row is skipped."""
        return self._parse_rows(
            html_str,
            '//table[@id="ip_list"]/tr',
            ip_col=2,
            port_col=3,
            skip_rows=1,
        )

    def start_urls_kuaidaili(self):
        """Return the kuaidaili.com listing URLs (pages 1-10)."""
        return ["https://www.kuaidaili.com/free/inha/%d/" % i for i in range(1, 11)]

    def get_content_list_kuaidaili(self, html_str):
        """Parse a kuaidaili.com page."""
        return self._parse_rows(
            html_str,
            '//div[@id="list"]/table/tbody/tr',
            ip_col=1,
            port_col=2,
        )

    def start_urls_89ip(self):
        """Return the 89ip.cn listing URLs (pages 1-10)."""
        return ["http://www.89ip.cn/index_%d.html" % i for i in range(1, 11)]

    def get_content_list_89ip(self, html_str):
        """Parse an 89ip.cn page."""
        return self._parse_rows(
            html_str,
            '//div[@class="layui-form"]/table/tbody/tr',
            ip_col=1,
            port_col=2,
        )

    def parse_url(self, url, timeout=10):
        """Fetch ``url`` and return its body decoded as UTF-8.

        A timeout keeps one unresponsive site from blocking the crawl
        indefinitely.  Undecodable bytes are replaced rather than raising,
        because only the ASCII ip/port cells are of interest.
        """
        response = requests.get(url, headers=self.headers, timeout=timeout)
        return response.content.decode("utf-8", errors="replace")

    def save_content_list(self, content_list, path="proxy.json"):
        """Append the proxies as ``http://ip:port`` lines to ``path``.

        Entries repeated within ``content_list`` are written once, in
        first-seen order.
        """
        seen = set()
        lines = []
        for item in content_list:
            address = "http://%s:%s" % (item["ip"], item["port"])
            if address not in seen:
                seen.add(address)
                lines.append(address + "\n")
        with open(path, "a", encoding="utf-8") as f:
            f.writelines(lines)

    def run(self):
        """Crawl every configured site and save all proxies found.

        A page whose request fails is reported and skipped so that one
        unreachable site does not abort the rest of the crawl.
        """
        # Build the request URL lists.
        start_urls_xici = self.start_urls_xici()
        start_urls_89ip = self.start_urls_89ip()
        start_urls_kuaidaili = self.start_urls_kuaidaili()
        start_urls_superfastip = self.start_urls_superfastip()

        # Each source pairs its page URLs with the matching page parser.
        sources = (
            (start_urls_superfastip, self.get_content_list_superfastip),
            (start_urls_xici, self.get_content_list_xici),
            (start_urls_kuaidaili, self.get_content_list_kuaidaili),
            (start_urls_89ip, self.get_content_list_89ip),
        )

        all_content_list = []  # every proxy scraped so far
        for urls, get_content_list in sources:
            for url in urls:
                try:
                    html_str = self.parse_url(url)
                except requests.RequestException as exc:
                    print("skip %s: %s" % (url, exc))
                else:
                    all_content_list.extend(get_content_list(html_str))
                # Pause briefly between requests.
                time.sleep(0.2)

        print("抓取完成")
        self.save_content_list(all_content_list)


if __name__ == '__main__':
    # Scrape the proxy listings.
    spider = Proxy()
    spider.run()

    # Check which of the scraped proxies are usable.
    proxy = ProxyTest()
    proxy.run()
    print("最后可以用的代理IP在proxy_ok_ip.json")
最新文章
- JavaScript--面向对象--猜拳游戏
- 转 Microsoft's Objective-C tech started on BlackBerryOS, Tizen
- Java Volatile关键字
- Linux链接库一(动态库,静态库,库放在什么路径下)
- 一键安装GitLab7
- Linux显示全部执行中的进程
- KindEditor 修改多图片上传显示限制大小和张数
- 【转】在Ubuntu上下载、编译和安装Android最新源代码
- WindowsForm 公共控件 菜单和工具栏
- MYSQL 没有varchar(max)这个类型。
- 要不要用gzip优化前端项目
- BEX5下新建任务到待办任务
- Linux中VIM的使用
- CSS 实现隐藏滚动条同时又可以滚动
- Web开发之404小结
- jquery给按钮绑定事件
- C++ 纯虚方法
- [strace]跟踪进程的系统调用
- VS2010编译错误:是否忘记了向源中添加“#include "stdafx.h"”?
- chromium对网页获取favicon
热门文章
- 2019-11-29-dotnet-core-使用-CoreRT-将程序编译为-Native-程序
- adb shell命令模拟按键/输入input使用keycode 列表详解
- SpringCloud系列(一):Eureka 注册中心
- spring中spEL常用应用场景
- 牛客假日团队赛6 D	迷路的牛 (思维)
- 重大更新:DeepFaceLab更新至2019.12.20
- spring+mybatis事务配置(转载)
- bootstrap-table.min.js不同版本返回分页参数不同的问题
- 移动端布局基础viewport
- Python之模块和包补充