本文承接前两篇文章,自动爬取并生成前两篇中用于生成图片的文字:
爬取知乎(zhihu)的部分问答数据,仅供本人学习使用。
import time
import requests
import json
import base64
from bs4 import BeautifulSoup
from fake_useragent import UserAgent
from faker import Faker
# Create the UserAgent and Faker generators used to randomize request metadata.
ua = UserAgent()
fake = Faker()

# Pick one random User-Agent string and one random IPv4 address. Both are
# evaluated once here and stay fixed for the rest of the run.
random_ua = ua.random
fake_ip = fake.ipv4()

# Headers sent with every HTTP request made by this script.
headers = {
    'User-Agent': random_ua,
    'referer': 'https://www.zhihu.com/',
    # Placeholder value ("your own cookie"): replace it with a real cookie
    # string before running.
    'cookie': '你自己cookie',
}

# NOTE(review): none of the requests visible in this file pass `proxies`, and a
# randomly generated IPv4 address is not expected to be a reachable proxy.
# Kept for compatibility; confirm whether it is used elsewhere.
proxies = {
    "http": f"http://{fake_ip}",
    "https": f"http://{fake_ip}",
}
def request_page(url, limit, offset, timeout=10):
    """Fetch one page of a paginated JSON endpoint and return the parsed body.

    Args:
        url: Endpoint to query.
        limit: Page size, sent as the ``limit`` query parameter.
        offset: Start position, sent as the ``offset`` query parameter.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout ``requests`` may block indefinitely on a stalled
            connection.

    Returns:
        The response body decoded as UTF-8 and parsed with ``json.loads``.

    Raises:
        requests.RequestException: On connection errors, timeouts, or an HTTP
            error status.
        ValueError: If the body is not valid JSON.
    """
    params = {'limit': limit, 'offset': offset}
    response = requests.get(url, params=params, headers=headers, timeout=timeout)
    # Fail loudly on 4xx/5xx instead of trying to parse an error page as data.
    response.raise_for_status()
    return json.loads(response.content.decode('utf-8'))
def is_base64_encoded(data):
    """Return True if *data* is a ``str`` holding strictly valid base64.

    Validation is strict (``validate=True``): any character outside the
    base64 alphabet makes the check fail. In the default lenient mode
    ``b64decode`` silently discards such characters, so ordinary text or HTML
    is frequently misreported as base64.

    Args:
        data: Value to test. Anything that is not a ``str`` yields ``False``.

    Returns:
        ``True`` when *data* decodes as base64 without error, else ``False``.
    """
    if not isinstance(data, str):
        return False
    try:
        base64.b64decode(data, validate=True)
    except ValueError:
        # binascii.Error (bad alphabet/padding) is a ValueError subclass;
        # non-ASCII input raises a plain ValueError.
        return False
    return True
# Crawl configuration: column items API, output path, and paging state.
url = 'https://www.zhihu.com/api/v4/columns/c_1315712092947886080/items'
output_file = "D:/data.txt"
limit = 10
offset = 0

# Page through the column, fetch each article, and append every "Q: ... @..."
# pair found in the article body to the output file.
try:
    # The `with` statement closes the file on every exit path, so no handler
    # below may close it manually.
    with open(output_file, "w", encoding='utf-8') as file:
        for i in range(300):
            print(f"Fetching data batch {i + 1} with offset {offset}")
            json_response = request_page(url, limit, offset)
            print(f"Received response: {json_response}")

            items = json_response['data']
            if not items:
                # An empty page means the column is exhausted; stop instead of
                # issuing the remaining requests for nothing.
                print("No more items returned; stopping.")
                break

            for data in items:
                # Throttle article requests.
                time.sleep(5)
                article_id = data['id']
                content_url = f"https://zhuanlan.zhihu.com/p/{article_id}"
                print(f"Fetching content from: {content_url}")
                # A timeout keeps one stalled connection from hanging the run.
                content = requests.get(
                    content_url, headers=headers, timeout=10
                ).content.decode('utf-8')

                # Check if content is base64 encoded
                if is_base64_encoded(content):
                    try:
                        decoded_content = base64.b64decode(content)
                    except Exception as e:
                        print(f"Failed to decode base64 content for article ID: {article_id} - {e}")
                        continue
                else:
                    decoded_content = content

                # Parse the HTML content
                soup = BeautifulSoup(decoded_content, 'html.parser')
                article_content_div = soup.find('div', class_="RichText ztext Post-RichText css-1ygg4xu")
                if article_content_div is None:
                    print(f"No content found for article ID: {article_id}")
                    continue

                article_content = article_content_div.text.strip()
                # Text before the first "Q:" is not a question, so drop it.
                for qa in article_content.split("Q:")[1:]:
                    try:
                        index = qa.index("@")
                    except ValueError as e:
                        # No "@" separator: skip this entry only. The file
                        # stays open so later entries can still be written.
                        print(f"An exception occurred while processing Q&A: {e}")
                        continue
                    question = qa[:index].strip()
                    answer = qa[index:].strip()
                    print(f'Q: {question}\n')
                    print(f'A: {answer}\n')
                    # Write to the local text file
                    file.write(f'Q: {question}\n')
                    file.write(f'A: {answer}\n\n')

            offset += limit
except Exception as e:
    # `file` may not exist here if open() itself failed, so it is not touched.
    print(f"An exception occurred during file operations: {e}")
上述代码会生成一个 data.txt 文件(代码中的输出路径为 D:/data.txt),
文件格式如下: