Python获取上市公司报告,AI分析助力投资决策

折腾了几天,通过从巨潮信息网上获取上市公司的报告,然后实现调用大语言模型的API去分析报告内容,下面把相应的代码和过程分享给对这个感兴趣的兄弟姐妹们,希望能帮到大家。

1,首先去巨潮信息网首页,右上角有个查询,输入相应的关键字就能获取上市公司的公告,比如我这里输入“变更会计师事务所

可以看到下面的内容

这些链接打开后,就是一个个的pdf报告

如何批量下载这些报告呢,可以用python去实现,

可以先找到这个pdf文件的data-id值,

然后在idm下载地址中发现下载地址都是下面的格式,最后就是data-id加pdf命名

找到这个规律后,写出python代码如下:

import os
import requests
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import TimeoutException
from bs4 import BeautifulSoup
import time

from selenium.webdriver.chrome.options import Options  # 导入Options类

def download_pdfs_after_n_pages(start_click, max_clicks, url, save_dir, temp_dir):
    # 设置Selenium选项,以无头模式运行Chrome
    options = Options()
    options.headless = True
    options.add_argument("--window-size=1920,1080")

    # 创建WebDriver实例
    driver = webdriver.Chrome(options=options)

    # 打开网页
    driver.get(url)

    # 用于存储所有页面的链接
    all_links = []

    # 设置翻页计数器
    click_counter = 0

    # 循环直到达到最大翻页次数
    while click_counter < max_clicks:
        # 如果当前点击次数大于或等于指定的开始点击次数,则开始收集链接
        if click_counter >= start_click:
            soup = BeautifulSoup(driver.page_source, 'html.parser')
            a_tags = soup.select("#fulltext-search > div:nth-child(2) > div > div > div:nth-child(3) > div.tab-content > div.el-table-box > div > div.el-table__body-wrapper.is-scrolling-none > table > tbody > tr > td.el-table_1_column_2 > div > a")
            for a in a_tags:
                href_parts = a['href'].split('&')
                announcement_id = href_parts[1].split('=')[1]
                announcement_time = href_parts[2].split('=')[1]
                pdf_url = f"http://static.cninfo.com.cn/finalpage/{announcement_time}/{announcement_id}.PDF"
                sec_name_span = a.select_one("span > span > span.secNameSuper")
                if sec_name_span:
                    file_name = sec_name_span.get('title').replace(":", "")
                    pdf_file_name = f"{file_name}.PDF"
                else:
                    pdf_file_name = f"{announcement_id}.PDF"
                all_links.append((pdf_url, pdf_file_name))

        # 检查是否存在下一页按钮
        try:
            next_button = WebDriverWait(driver, 10).until(
                EC.element_to_be_clickable((By.CLASS_NAME, 'btn-next'))
            )
        except TimeoutException:
            break

        # 如果下一页按钮存在,模拟点击,并增加点击计数器
        if next_button:
            next_button.click()
            click_counter += 1
            time.sleep(5)
        else:
            break

    # 关闭WebDriver
    driver.quit()

    # 创建新的保存目录
    new_save_dir = os.path.join(save_dir, 'new')
    os.makedirs(new_save_dir, exist_ok=True)

    # 下载PDF文件
    for link, pdf_file_name in all_links:
        # 清理文件名,移除特殊字符和大写字母A或B
        clean_file_name = "".join(char for char in pdf_file_name if char.isalnum() or char in ('.', '_'))
        clean_file_name = clean_file_name.replace('A', '').replace('B', '')
        pdf_file_path = os.path.join(new_save_dir, clean_file_name)

        # 检查临时目录中是否已存在该文件
        temp_file_path = os.path.join(temp_dir, clean_file_name)
        if not os.path.exists(temp_file_path):
            print(f"Downloading {link}")
            try:
                response = requests.get(link, stream=True)
                if response.status_code == 200:
                    with open(pdf_file_path, 'wb') as f:
                        for chunk in response.iter_content(chunk_size=8192):
                            f.write(chunk)
                    # 等待1秒钟再继续下载
                    time.sleep(1)

            except requests.exceptions.RequestException as e:
                print(f"An error occurred: {e}")

    print("Download completed.")

# 调用函数,指定不需要点击翻页就开始下载链接,且只点击一次翻页按钮(实际上不点击)
download_pdfs_after_n_pages(0, 1, 'http://www.cninfo.com.cn/new/fulltextSearch?notautosubmit=&keyWord=%E5%8F%98%E6%9B%B4%E4%BC%9A%E8%AE%A1%E5%B8%88%E4%BA%8B%E5%8A%A1%E6%89%80', r'C:\temp\123\pdf\', r'C:\temp\123\pdf\old\')

运行效果如下,自动翻页去获取dom:

将pdf下载到设定的文件夹下

2,开始将pdf转换成txt文件,代码如下:

import os
import PyPDF2

def process_pdfs_in_folder(pdf_folder_path, output_folder_path):
    # 确保输出文件夹存在
    if not os.path.exists(output_folder_path):
        os.makedirs(output_folder_path)

    # 遍历文件夹中的所有PDF文件
    for file_name in os.listdir(pdf_folder_path):
        if file_name.lower().endswith('.pdf'):
            pdf_file_path = os.path.join(pdf_folder_path, file_name)
            # 获取PDF文件名(不带扩展名)
            pdf_file_name = os.path.splitext(file_name)[0]

            try:
                # 打开PDF文件
                with open(pdf_file_path, 'rb') as file:
                    reader = PyPDF2.PdfReader(file)
                    text = ""

                    # 遍历PDF中的每一页
                    for page in reader.pages:
                        text += page.extract_text()

                # 去掉空格和回车
                text = text.replace(" ", "").replace("\n", "")

                # 将提取的文本保存到文本文件
                output_file_path = os.path.join(output_folder_path, f"{pdf_file_name}.txt")
                with open(output_file_path, 'w', encoding='utf-8') as file:
                    file.write(text)
            except PyPDF2.errors.PdfReadError as e:
                print(f"Error processing file {pdf_file_path}: {e}")

# 调用方法
pdf_folder = r'C:\temp\123\pdf\'  # 替换为PDF文件所在的文件夹路径
output_text_folder = r'C:\temp\123\txt'  # 输出文本文件的文件夹路径
process_pdfs_in_folder(pdf_folder, output_text_folder)

运行后将相应的pdf文件变成了txt文件:

3,利用python读取txt文本的内容,将文本内容发送给大语言模型,让大语言模型分析文字内容,输出相应的json格式的数据,将json数据写入到excel中,代码如下:

我这里用的是零一万物的api,目前开发者申请送60元调用额度,这个调用方法和chatgpt一样的代码,只需要换 key和模型名称就行了,然后,prompt可以要求大模型按照需求输出json格式的数据,我的prompt是这样的。

“请你根据我提供给你的文字,不用其他废话,只需要从我给的文字中提取4个字段,1,这个公告的证券代码,2这个公告的证券名称,3,这个公告聘任的2024年的会计师事务所的名称。,4,2024年聘任的会计师事务所的审计费用。将这4个字段生成json格式给我。回答只需要json格式的数据,如果没找到值就为null,其他不用废话。严格按照这下面4个字段返回数据,'证券代码', '证券名称','会计师事务所名称','审计费用'。”

import pandas as pd
import os
import json
import time
from openai import OpenAI

# 设置延迟时间,单位为秒
delay_time = 2  # 等待3秒

def chat_with_kimi(user_input):
    client = OpenAI(
        api_key="api key",
        base_url="https://api.lingyiwanwu.com/v1",
    )
    try:
        completion = client.chat.completions.create(
            model="yi-34b-chat-0205", #模型名称
            messages=[
                {"role": "system", "content": "请你根据我提供给你的文字,不用其他废话,只需要从我给的文字中提取4个字段,1,这个公告的证券代码,2这个公告的证券名称,3,这个公告聘任的2024年的会计师事务所的名称。,4,2024年聘任的会计师事务所的审计费用。将这4个字段生成json格式给我。回答只需要json格式的数据,如果没找到值就为null,其他不用废话。严格按照这下面4个字段返回数据,'证券代码', '证券名称','会计师事务所名称','审计费用'。"},
                {"role": "user", "content": user_input}
            ],
            temperature=0.3,
        )
        response = completion.choices[0].message.content
        print(f"Received response: {response}")
        time.sleep(delay_time)
        return response
    except Exception as e:
        if "Rate limit reached" in str(e):
            print("Rate limit reached. Waiting for 30 seconds before retrying.")
            time.sleep(30)  # 增加等待时间以避免频繁的API调用
            return chat_with_kimi(user_input)
        else:
            print(f"Error during API call: {e}")
            return None

# 遍历指定文件夹下的txt文件
for filename in os.listdir('C:/temp/123/txt'):
    if os.path.splitext(filename)[1].lower() == '.txt':
        try:
            with open(os.path.join('C:/temp/123/txt', filename), 'r', encoding='utf-8') as file:
                user_input = file.read()
            print(f"Processing file: {filename}")

            # 运行聊天函数获取JSON数据
            response = chat_with_kimi(user_input)
            if response is None:
                continue  # 如果API调用失败,则跳过当前文件

            # 尝试解析JSON数据
            try:
                # 移除响应中的反引号
                response_cleaned = response.replace('```json', '').replace('```', '')
                json_data = json.loads(response_cleaned)
                print(f"JSON data extracted: {json_data}")
                # 将JSON数据转换为DataFrame
                df = pd.DataFrame([json_data])

                # 检查文件是否存在
                if os.path.exists('b.xlsx'):
                    # 如果文件存在,读取现有数据
                    existing_df = pd.read_excel('b.xlsx')
                    # 将新数据追加到现有数据
                    df = pd.concat([existing_df, df], ignore_index=True)

                # 将DataFrame写入Excel文件
                df.to_excel('b.xlsx', sheet_name='sheet1', index=False)
                print(f"Data saved to b.xlsx")
            except json.JSONDecodeError:
                print("Error decoding JSON from response. Skipping this file.")

            # 删除已处理的txt文件
            os.remove(os.path.join('C:/temp/123/txt', filename))
            print(f"File {filename} has been deleted.")
        except Exception as e:  # 捕获所有可能的文件处理错误
            print(f"Error processing file {filename}: {e}")

print("Processing complete.")

运行后,就在运行的目录下生成了一个b.xlsx文件,打开文件就可以看到如下数据

我感觉用这个方法,可以分析上市公司公布的减持或者预增公告,然后让大语言模型去分析这些公告,给出一些投资建议,今天分享就是这些,希望能帮到有需要的朋友们。

最近更新

  1. docker php8.1+nginx base 镜像 dockerfile 配置

    2024-04-20 15:08:10       94 阅读
  2. Could not load dynamic library ‘cudart64_100.dll‘

    2024-04-20 15:08:10       100 阅读
  3. 在Django里面运行非项目文件

    2024-04-20 15:08:10       82 阅读
  4. Python语言-面向对象

    2024-04-20 15:08:10       91 阅读

热门阅读

  1. rpc简介

    2024-04-20 15:08:10       38 阅读
  2. Nginx基础(04)

    2024-04-20 15:08:10       30 阅读
  3. 数据仓库元数据管理

    2024-04-20 15:08:10       28 阅读
  4. MySQL常见故障现象分析及解决办法

    2024-04-20 15:08:10       32 阅读
  5. 红黑树(Red-Black Tree)

    2024-04-20 15:08:10       49 阅读
  6. 由于bug发现的现象

    2024-04-20 15:08:10       38 阅读
  7. void * 指针的作用_C

    2024-04-20 15:08:10       109 阅读
  8. 若依前端分离版中使用二维码功能

    2024-04-20 15:08:10       103 阅读