import os import re import threading import time from lxml import etree import requests from bs4 import BeautifulSoup from database import MyDataBase from utils import make_user_agent
注意:database和utils是自己写的。没有注释,不懂就问
先运行CrawlWangYiYunSinger,否则数据库中没有歌手表(artists),后面的ThreadController无法读取歌手!
大概几万个,很快下载完
utils.make_user_agent()返回的是{"User-Agent":"..."}
database数据库的包,复制即用:Python操作Mysql数据库-CSDN博客
声明:内容只用于学习交流,不可用于任何商业用途!
一、日志
方便查看爬取情况
class Logger:
    """Minimal file logger that appends timestamped lines to ``<path>/logs.txt``.

    Every message written through :meth:`info` is also echoed to stdout.
    """

    def __init__(self, path):
        """Remember the log directory and make sure the log file exists.

        Args:
            path: Directory that holds ``logs.txt``; created when missing.
        """
        self.path = path
        self.log_path = path + "/logs.txt"
        self.create()

    def create_parent(self):
        """Create the log directory, including any missing parents."""
        # exist_ok avoids the check-then-create race when several loggers
        # are set up for the same directory at roughly the same time.
        os.makedirs(self.path, exist_ok=True)

    def create(self):
        """Ensure the directory and the log file exist, keeping existing content."""
        self.create_parent()
        # Append mode creates the file when it is absent but never truncates
        # it, so a log that another logger already wrote to is left intact.
        with open(self.log_path, "a", encoding="utf-8"):
            pass

    def clear(self):
        """Truncate the log file to zero length."""
        with open(self.log_path, "w", encoding="utf-8"):
            pass

    def delete(self):
        """Remove the log file.

        Raises:
            FileNotFoundError: If the log file does not exist.
        """
        os.remove(self.log_path)

    def info(self, content):
        """Append ``content`` with a timestamp prefix and print the same line.

        Args:
            content: Message text; formatted into the line with an f-string.
        """
        stamp = time.strftime("[%Y-%m-%d %H:%M:%S]")
        line = f"{stamp}\t{content}"
        with open(self.log_path, "a", encoding="utf-8") as f:
            f.write(f"{line}\n")
        print(line)
二、爬取歌手到数据库
class CrawlWangYiYunSinger(threading.Thread):
    """Thread that scrapes artist listing pages and stores artists in the database.

    For every combination of the listing ``id`` and ``initial`` query
    parameters, the page is fetched and each artist link found on it is
    inserted into the ``artists`` table as ``(artist_id, artist)``.
    """

    # Seconds to wait on a request before giving up; without a timeout a
    # single stalled connection would block the whole crawl forever.
    REQUEST_TIMEOUT = 10

    def __init__(self):
        """Prepare request headers and the database handle (no I/O to the site)."""
        # ``run`` is overridden below, so no ``target`` needs to be passed.
        super().__init__()
        self.cookie = (
            '_iuqxldmzr_=32; _ntes_nnid=0e6e1606eb78758c48c3fc823c6c57dd,1527314455632; '
            '_ntes_nuid=0e6e1606eb78758c48c3fc823c6c57dd; __utmc=94650624; __utmz=94650624.1527314456.1.1.'
            'utmcsr=(direct)|utmccn=(direct)|utmcmd=(none); WM_TID=blBrSVohtue8%2B6VgDkxOkJ2G0VyAgyOY;'
            ' JSESSIONID-WYYY=Du06y%5Csx0ddxxx8n6G6Dwk97Dhy2vuMzYDhQY8D%2BmW3vlbshKsMRxS%2BJYEnvCCh%5CKY'
            'x2hJ5xhmAy8W%5CT%2BKqwjWnTDaOzhlQj19AuJwMttOIh5T%5C05uByqO%2FWM%2F1ZS9sqjslE2AC8YD7h7Tt0Shufi'
            '2d077U9tlBepCx048eEImRkXDkr%3A1527321477141; __utma=94650624.1687343966.1527314456.1527314456'
            '.1527319890.2; __utmb=94650624.3.10.1527319890'
        )
        self.agent = make_user_agent()['User-Agent']
        self.headers = {
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8',
            'Accept-Encoding': 'gzip, deflate',
            'Accept-Language': 'zh-CN,zh;q=0.9',
            'Connection': 'keep-alive',
            'Cookie': self.cookie,
            'Host': 'music.163.com',
            'Referer': 'http://music.163.com/',
            'Upgrade-Insecure-Requests': '1',
            'User-Agent': self.agent,
        }
        self.DB = MyDataBase()
        self.artists_sheet = "artists"

    def create_artists_table(self):
        """Connect to the database and create the artists table."""
        self.DB.connect()
        kwargs = {
            "id": "int primary key auto_increment",
            "artist_id": "varchar(128)",
            "artist": "varchar(128)",
        }
        self.DB.create_table(self.artists_sheet, kwargs)

    def get_artist(self, url):
        """Fetch one listing page and insert every artist link found on it.

        Args:
            url: Listing page URL.

        Raises:
            requests.RequestException: On network failure, timeout, or an
                HTTP error status.
        """
        r = requests.get(url, headers=self.headers, timeout=self.REQUEST_TIMEOUT)
        # Fail loudly on HTTP errors instead of parsing an error page and
        # silently finding no artists.
        r.raise_for_status()
        soup = BeautifulSoup(r.text, 'html.parser')
        for artist in soup.find_all('a', attrs={'class': 'nm nm-icn f-thide s-fc0'}):
            artist_name = artist.string
            artist_id = artist['href'].replace('/artist?id=', '').strip()
            data = [artist_id, artist_name]
            self.DB.insert_data(self.artists_sheet, field=("artist_id", "artist"), data=data)

    def get_artist_url(self):
        """Visit every ``id`` x ``initial`` listing page; a failed page is reported and skipped."""
        # Values for the ``id`` query parameter.
        ids = [1001, 1002, 1003, 2001, 2002, 2003, 6001, 6002, 6003,
               7001, 7002, 7003, 4001, 4002, 4003]
        # Values for the ``initial`` query parameter: -1, 0, then 65..90.
        initials = [-1, 0, *range(65, 91)]
        for _id in ids:
            for initial in initials:
                url = f'http://music.163.com/discover/artist/cat?id={_id}&initial={initial}'
                try:
                    self.get_artist(url)
                except Exception as err:
                    print("获取错误:", err)

    def run(self):
        """Thread entry point: create the table, crawl, and always close the database."""
        self.create_artists_table()
        try:
            self.get_artist_url()
        except Exception as err:
            print(err)
        finally:
            # The connection opened in create_artists_table() was never
            # released before; close it whether or not the crawl succeeded.
            self.DB.close()
三、爬取单个歌手的音乐的子线程
class CrawlWangYiYunSingerMusic(threading.Thread):
    """Worker thread that downloads the hot songs of a single artist.

    The artist page is scraped for song ids. Songs whose play button text is
    ``播放`` are downloaded and recorded in ``downloaded_sheet``; all other
    songs are recorded in ``undownload_sheet``. ``flag`` becomes ``True``
    once :meth:`run` has finished, whether it succeeded or not.
    """

    # Seconds to wait on a request before giving up.
    REQUEST_TIMEOUT = 10

    # Column definitions shared by the downloaded / not-downloaded tables.
    _TABLE_COLUMNS = {
        "id": "int primary key auto_increment",
        "artist_id": "varchar(128)",
        "music_id": "varchar(128)",
        "artist": "varchar(128)",
        "title": "varchar(128)",
    }
    # Field order used when inserting a song record into either table.
    _RECORD_FIELDS = ("artist_id", "music_id", "artist", "title")

    def __init__(self, artist_id, artist, database, num=None, save_path="F:/wyy/"):
        """Store the artist details and build URLs, paths, logger and DB handle.

        Args:
            artist_id: Artist id used in the artist page URL.
            artist: Artist name; used for the folder and file names.
            database: Accepted for call compatibility but not used; the
                worker creates its own ``MyDataBase`` instance.
            num: Database row number of the artist, used only in log lines.
            save_path: Root folder for downloads and the log file.
        """
        # ``run`` is overridden below, so no ``target`` needs to be passed.
        super().__init__()
        self.artist_id = artist_id
        self.artist = artist
        self.headers = {
            'Referer': 'http://music.163.com',
            'Host': 'music.163.com',
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8',
            'User-Agent': make_user_agent()["User-Agent"],
        }
        self.url = 'https://music.163.com/song?id='
        self.download_url = 'https://link.hhtjim.com/163/'
        self.artist_url = f'https://music.163.com/artist?id={self.artist_id}'
        self.save_path = save_path
        self.unknown_singer_songs_path = self.save_path + "/未知/"
        self.Logger = Logger(self.save_path)
        self.num = num  # database row number of the artist
        self.flag = False
        self.DB = MyDataBase()
        self.downloaded_sheet = "downloaded_sheet"
        self.undownload_sheet = "undownload_sheet"

    def make_file(self):
        """Create the download root and the fallback folder when missing."""
        for folder in (self.save_path, self.unknown_singer_songs_path):
            if not os.path.exists(folder):
                # exist_ok: another worker may create the folder between
                # the check above and this call.
                os.makedirs(folder, exist_ok=True)
                self.Logger.info(f"文件夹{folder}\t创建成功!")

    def make_artist_file(self):
        """Return the artist's folder, creating it if needed.

        Returns:
            The artist folder path, or the fallback "unknown" folder when
            the artist folder cannot be created.
        """
        artist_path = self.save_path + "/" + self.artist
        try:
            os.makedirs(artist_path, exist_ok=True)
            return artist_path
        except (OSError, ValueError) as err:
            self.Logger.info(f"{artist_path}创建失败:{err}!")
            return self.unknown_singer_songs_path

    def _ensure_table(self, name):
        """Create table ``name`` with the shared song columns unless it exists."""
        if name in self.DB.get_tables():
            return
        self.DB.create_table(name, dict(self._TABLE_COLUMNS))

    def create_downloaded_table(self):
        """Create the table of downloaded songs when it does not exist."""
        self._ensure_table(self.downloaded_sheet)

    def create_undownload_table(self):
        """Create the table of songs that were not downloaded when it does not exist."""
        self._ensure_table(self.undownload_sheet)

    def save_downloaded_music_info(self, data):
        """Insert ``[artist_id, music_id, artist, title]`` into the downloaded table."""
        self.DB.insert_data(self.downloaded_sheet, self._RECORD_FIELDS, data)

    def save_undownload_music_info(self, data):
        """Insert ``[artist_id, music_id, artist, title]`` into the not-downloaded table."""
        self.DB.insert_data(self.undownload_sheet, self._RECORD_FIELDS, data)

    def check_save(self, tbname, music_id, title, artist):
        """Report whether ``music_id`` is already recorded in ``tbname``.

        Args:
            tbname: Table to search.
            music_id: Song id; interpolated into the SQL condition, so it
                must be a trusted, numeric value.
            title: Song title, used only for the log line.
            artist: Artist name, used only for the log line.

        Returns:
            ``True`` when any returned row contains ``music_id``, else ``False``.
        """
        records = self.DB.select_table_record(tbname, f"where music_id={music_id}")
        # Look at every returned row and always answer with a real bool.
        if any(music_id in record for record in records):
            self.Logger.info(f"已下载:{music_id}\t<<{title}>>\t{artist}")
            return True
        return False

    def process_music_url_path(self, music_id, title):
        """Build the download URL and target file path for one song.

        Returns:
            ``(music_url, music_path, artist_path)``.
        """
        artist_path = self.make_artist_file()
        music_url = f"{self.download_url}{music_id}.mp3"
        music_path = f"{artist_path}/{title}_{self.artist}.mp3"
        return music_url, music_path, artist_path

    def process_music_id(self):
        """Scrape the artist page and process every hot song found on it.

        A failure on one song is logged and does not stop the remaining songs.

        Raises:
            requests.RequestException: When the artist page cannot be fetched.
        """
        resp = requests.get(self.artist_url, headers=self.headers,
                            timeout=self.REQUEST_TIMEOUT)
        resp.raise_for_status()
        html = etree.HTML(resp.text)
        if html is None:
            return
        for href in html.xpath("//*[@id='hotsong-list']//a/@href"):
            parts = href.split("=")
            # The id ends up inside a SQL condition string, so skip links
            # that do not carry a purely numeric id.
            if len(parts) < 2 or not parts[1].isdigit():
                continue
            music_id = parts[1]
            try:
                self._process_song(music_id)
            except Exception as err:
                self.Logger.info(f"MUSIC_ID:{music_id}\t处理失败:{err}")

    def _process_song(self, music_id):
        """Download or record a single song according to its play-button text."""
        vip, title, artist = self.process_url(music_id)
        data = [self.artist_id, music_id, self.artist, title]
        if vip == "播放":
            artist_path = self.make_artist_file()
            if not self.check_save(self.downloaded_sheet, music_id, title, artist):
                self.download_music(music_id, title, artist_path)
                self.save_downloaded_music_info(data)
        elif not self.check_save(self.undownload_sheet, music_id, title, artist):
            self.save_undownload_music_info(data)

    def process_url(self, music_id):
        """Fetch a song page and extract play-button text, title and artist.

        Returns:
            ``(vip, title, artist)`` with all whitespace removed; ``/`` is
            replaced by ``-`` in title and artist and ``*`` by ``x`` in the
            title. Missing elements yield ``""``, ``"无"`` and ``"无"``.
        """
        url = f"{self.url}{music_id}"
        response = requests.get(url, headers=make_user_agent(),
                                timeout=self.REQUEST_TIMEOUT).text
        # Strip the comment markers so markup inside HTML comments is parsed too.
        resp = response.replace('<!--', '').replace('-->', '')
        soup = BeautifulSoup(resp, "html.parser")
        vip_h = soup.find("a", attrs={"data-res-action": "play"})  # play button, may be absent
        title_h = soup.find("div", attrs={"class": "tit"})  # song title
        singer_h = soup.find_all("a", attrs={"class": "s-fc7"})  # artist is read from the second match
        vip = vip_h.text if vip_h else ""
        title = title_h.text if title_h else "无"
        # Index 1 is only valid when at least two links were found.
        artist = singer_h[1].text if len(singer_h) > 1 else "无"
        vip = re.sub(r'[\s]+', '', vip)
        title = re.sub(r'[\s]+', '', title).replace("/", "-").replace("*", "x")
        artist = re.sub(r'[\s]+', '', artist).replace("/", "-")
        return vip, title, artist

    def download_music(self, music_id, title, artist_path):
        """Download one song into ``artist_path`` and log it.

        Raises:
            requests.RequestException: On network failure, timeout, or an
                HTTP error status; nothing is written in that case.
        """
        music_url = f"{self.download_url}{music_id}.mp3"
        response = requests.get(music_url, timeout=self.REQUEST_TIMEOUT)
        # Never store an HTTP error body as an .mp3 file.
        response.raise_for_status()
        music_path = f"{artist_path}/{title}_{self.artist}.mp3"
        with open(music_path, 'wb') as file:
            file.write(response.content)
        self.Logger.info(
            f"【{self.num}】ARTIST_ID:{self.artist_id}\tMUSIC_ID:{music_id}:\t<<{title}>>\t{self.artist}")

    def run(self):
        """Thread entry point; ``flag`` is set to ``True`` however the work ends."""
        try:
            self.make_file()
            self.DB.connect()
            try:
                self.create_downloaded_table()
                self.create_undownload_table()
                self.process_music_id()
            finally:
                self.DB.close()
        except Exception as err:
            print(err)
        finally:
            # The controller waits on this flag, so it must be set even
            # when folder or database setup fails.
            self.flag = True
四、写一个控制子线程的主线程
class ThreadController:
    """Launches one download worker per artist row and limits how many run at once.

    Artist rows with ``id`` in ``[start, end)`` are read from the ``artists``
    table one at a time; each becomes a ``CrawlWangYiYunSingerMusic`` thread.
    """

    def __init__(self, save_path: str, start=1, end=10, size=10, length=10, poll_interval=0.1):
        """Store the limits and create the logger and database handle.

        Args:
            save_path: Root folder for downloads and the log file.
            start: First artist row id to process.
            end: Row id at which to stop (exclusive).
            size: Maximum number of worker threads tracked at once.
            length: Stored and logged only; not otherwise used by this class.
            poll_interval: Seconds to sleep between checks while waiting
                for workers to finish.
        """
        self.save_path = save_path
        self.start = start
        self.end = end
        self.size = size
        self.length = length  # per-thread data limit (only logged)
        self.poll_interval = poll_interval
        self.thread_dict = {}
        self.Logger = Logger(self.save_path)
        self.tag = 1
        self.db = MyDataBase()
        self.Logger.info(
            f"\n已开启线程管理!\n前线程上限:{size}!\n线程数据上限:{length}!\n线程起始位置:{self.start}-{self.end}!")

    def add_thread(self, tag, t):
        """Track worker ``t`` under the numeric ``tag``."""
        self.thread_dict[tag] = t

    def remove_thread(self):
        """Stop tracking every worker that has finished."""
        for tag, worker in list(self.thread_dict.items()):
            # A worker that died before setting its flag is also finished;
            # without the is_alive() check it would be waited on forever.
            if worker.flag or not worker.is_alive():
                del self.thread_dict[tag]
                self.Logger.info(f"{tag}号线程已结束!")

    def operation(self):
        """Start the next worker, or reap finished ones once all were started.

        Returns:
            ``True`` when every artist has been handled and no worker is
            still tracked; a falsy value otherwise.
        """
        if self.start < self.end:
            data = self.db.select_table_record("artists", f"where id={self.start}")
            if data:
                i, artist_id, artist = data[0]
                wyys = CrawlWangYiYunSingerMusic(database=self.db, artist_id=artist_id, artist=artist,
                                                 num=i, save_path=self.save_path)
                wyys.start()
                self.Logger.info(f"{self.tag}号线程已开启!")
                self.add_thread(self.tag, wyys)
                self.tag += 1
            else:
                # Row ids can have gaps; skip a missing id instead of
                # failing on data[0].
                self.Logger.info(f"artists表中不存在id={self.start}的记录,已跳过!")
            self.start += 1
            return False
        if not self.thread_dict:
            return True
        self.remove_thread()
        return False

    def run(self):
        """Run the scheduling loop until all workers have finished."""
        self.db.connect()
        try:
            while True:
                if len(self.thread_dict) >= self.size:
                    self.remove_thread()
                    if len(self.thread_dict) >= self.size:
                        # Still full: wait instead of spinning on the CPU.
                        time.sleep(self.poll_interval)
                    continue
                if self.operation():
                    break
                if self.start >= self.end and self.thread_dict:
                    # Everything is started; wait for the remaining workers.
                    time.sleep(self.poll_interval)
        finally:
            self.db.close()
        self.Logger.info("线程全部结束!")