代码实现以下操作:下载十年内的年报PDF文件,并完成:
1. 解析出"主要会计数据和财务指标"表格;
2. 将公司资料里的公司网址、办公地址、电子信箱,以及董秘的姓名、电话号码、电子信箱信息找出来,
保存为CSV文件;
3. 选一家公司,将其营业收入、归属于上市公司股东的净利润的十年时间序列数据用折线图展示;
4. 将10家公司十年的营业收入数据在一张图上进行展示。
#下载年报
import time
import requests
def download_pdf(href, code, year):
    """Download one PDF and save it as ``{code}_{year}.pdf`` in the working directory.

    Args:
        href: URL of the PDF to fetch.
        code: Stock code; first part of the output file name.
        year: Report year; second part of the output file name.

    Raises:
        requests.HTTPError: If the server answers with an error status. The
            file is not written in that case, so an error page never ends up
            stored under a ``.pdf`` name.
    """
    fname = f'{code}_{year}.pdf'
    # A timeout keeps one stalled connection from hanging the whole batch.
    r = requests.get(href, allow_redirects=True, timeout=60)
    try:
        r.raise_for_status()
        with open(fname, "wb") as f:
            f.write(r.content)
    finally:
        # Release the connection even when the status check or the write fails.
        r.close()
def download_pdfs(hrefs, code, years, delay=30):
    """Download several PDFs of one company, pausing after each download.

    Args:
        hrefs: URLs of the PDFs.
        code: Stock code passed on to ``download_pdf`` for the file name.
        years: Report years, positionally matching ``hrefs``.
        delay: Seconds to sleep after every download. Defaults to 30, the
            value that used to be hard-coded.

    Returns:
        An empty tuple (kept for compatibility with existing callers).

    Raises:
        IndexError: If ``years`` is shorter than ``hrefs``.
    """
    for i, href in enumerate(hrefs):
        download_pdf(href, code, years[i])
        time.sleep(delay)
    return ()
#sse包
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.common.action_chains import ActionChains
from selenium.webdriver.support import expected_conditions
from selenium.webdriver.support.wait import WebDriverWait
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.common.desired_capabilities import DesiredCapabilities
import time
import re
import pandas as pd
def get_table_sse(code):
    """Save the SSE periodic-report listing table for one stock as ``{code}.html``.

    Opens the SSE regular-disclosure page in Chrome, types ``code`` into the
    element with id ``inputCode``, opens a filter drop-down, clicks the link
    whose text is '年报', and writes the inner HTML of the result table to
    ``{code}.html`` (UTF-8) in the working directory.

    Args:
        code: Stock code typed into the search box and used for the file name.
    """
    url = 'http://www.sse.com.cn/disclosure/listedinfo/regular/'
    browser = webdriver.Chrome()
    try:
        browser.set_window_size(1552, 840)
        browser.get(url)
        time.sleep(3)  # fixed pauses give the page time to render between steps
        browser.find_element(By.ID, "inputCode").click()
        browser.find_element(By.ID, "inputCode").send_keys(code)
        time.sleep(3)
        # Fourth filter item's drop-down; presumably the report-type selector.
        selector = ".sse_outerItem:nth-child(4) .filter-option-inner-inner"
        browser.find_element(By.CSS_SELECTOR, selector).click()
        browser.find_element(By.LINK_TEXT, '年报').click()
        time.sleep(3)
        # Result table holding the report links.
        selector = "body > div.container.sse_content > div > div.col-lg-9.col-xxl-10 > div > div.sse_colContent.js_regular > div.table-responsive > table"
        element = browser.find_element(By.CSS_SELECTOR, selector)
        table_html = element.get_attribute('innerHTML')
    finally:
        # Always shut the browser down, otherwise a failed lookup leaves a
        # Chrome process behind for every company in the batch.
        browser.quit()
    fname = f'{code}.html'
    with open(fname, 'w', encoding='utf-8') as f:
        f.write(table_html)
def get_table_sse_codes(codes):
    """Run ``get_table_sse`` once for every stock code in ``codes``, in order."""
    for stock_code in codes:
        get_table_sse(stock_code)
def _cell_text(cell):
    """Return what lies between the first '>' and the last '<' of ``cell``."""
    return cell[cell.find('>') + 1:cell.rfind('<')]


def get_data(tr):
    """Extract one report entry from the inner HTML of a table row.

    The row is expected to hold at least four ``<td>`` cells: stock code and
    company name (each wrapped in one inner tag), a cell with an ``<a>`` link
    to the PDF whose opening tag ends in ``$(this))">``, and the date.

    Args:
        tr: Inner HTML of one ``<tr>`` element.

    Returns:
        ``[code, name, href, title, date]`` as strings; ``href`` is prefixed
        with ``http://www.sse.com.cn``.

    Raises:
        IndexError: If the row has fewer than four ``<td>`` cells.
    """
    # The cell pattern and the '</a>' marker below had lost their tags, which
    # made every extracted field an empty string.
    p_td = re.compile(r'<td.*?>(.*?)</td>', re.DOTALL)
    tds = p_td.findall(tr)

    code = _cell_text(tds[0])
    name = _cell_text(tds[1])

    link_cell = tds[2]
    s = link_cell.find('href="') + 6
    e = link_cell.find('.pdf"') + 4  # keep the '.pdf' suffix, drop the quote
    href = 'http://www.sse.com.cn' + link_cell[s:e]

    s = link_cell.find('$(this))">') + 10  # 10 == len('$(this))">')
    e = link_cell.find('</a>')
    title = link_cell[s:e]

    date = tds[3].strip()
    return [code, name, href, title, date]
# data = get_data(trs_new[1])
def parse_table(code, save=True):
    """Parse the saved listing table ``{code}.html`` into a DataFrame.

    Args:
        code: Stock code; the input file is ``{code}.html`` in the working
            directory.
        save: When true, also write the result to ``{code}.CSV``.

    Returns:
        A DataFrame with columns ``code``, ``name``, ``href``, ``title`` and
        ``date``, one row per table row after the first (header) row.
    """
    fname = f'{code}.html'
    with open(fname, encoding='utf-8') as f:
        html = f.read()

    # The row pattern had lost its tags and was splitting on spaces instead
    # of on <tr> elements.
    p = re.compile(r'<tr.*?>(.+?)</tr>', re.DOTALL)
    trs_new = [tr for tr in p.findall(html) if tr.strip() != '']

    # The first non-empty row is the header, so it is skipped.
    data_all = [get_data(tr) for tr in trs_new[1:]]
    df = pd.DataFrame({
        'code': [d[0] for d in data_all],
        'name': [d[1] for d in data_all],
        'href': [d[2] for d in data_all],
        'title': [d[3] for d in data_all],
        'date': [d[4] for d in data_all],
    })

    if save:
        df.to_csv(f'{fname[0:-5]}.CSV')
    return df
#筛选保留年报链接
import datetime
def filter_words(words, df, include=True):
    """Filter rows of ``df`` by substrings of the ``title`` column.

    Args:
        words: Substrings to look for.
        df: DataFrame with a ``title`` column of strings.
        include: If true, keep rows whose title contains at least one word;
            if false, keep rows whose title contains none of the words.

    Returns:
        The selected rows of ``df``.
    """
    titles = df["title"]
    if include:
        # Inner lists (not generators) so every word is tested for every title.
        mask = [any([word in title for word in words]) for title in titles]
    else:
        mask = [all([word not in title for word in words]) for title in titles]
    return df[mask]
def filter_date(star, end, df):
    """Keep the rows of ``df`` whose ``date`` lies in ``[star, end]`` (inclusive).

    Values are compared with the ordinary comparison operators.
    """
    def _within(day):
        return day >= star and day <= end

    keep = [_within(day) for day in df['date']]
    return df[keep]
def start_end_10y():
    """Return ``(start, end)`` date strings for the ten calendar years ending this year.

    ``start`` is 1 January of (current year - 9); ``end`` is 31 December of
    the current year, both formatted as ``YYYY-MM-DD``.
    """
    this_year = datetime.datetime.now().year
    return (f'{this_year-9}-01-01', f'{this_year}-12-31')
def filter_nb_10y(df, keep_words=['年报', '年度报告'], exclude_words=['摘要'], start=''):
    """Select the annual-report rows of a ten-year window.

    Args:
        df: DataFrame with ``title`` and ``date`` columns.
        keep_words: A row is kept only if its title contains one of these.
        exclude_words: A row is dropped if its title contains any of these.
        start: First date of the window as ``YYYY-...``; the window then ends
            on 31 December nine years later. When empty, the window from
            ``start_end_10y()`` is used.

    Returns:
        The filtered DataFrame.
    """
    if start == '':
        window_start, window_end = start_end_10y()
    else:
        window_start = start
        window_end = f'{int(start[0:4]) + 9}-12-31'

    selected = filter_words(keep_words, df, include=True)
    selected = filter_words(exclude_words, selected, include=False)
    return filter_date(window_start, window_end, selected)
def prepare_hrefs_years(df):
    """Return ``(hrefs, years)`` lists for downloading.

    ``hrefs`` is the ``href`` column as a list; each year is the first four
    characters of the matching ``date`` value, as an int, minus one.
    """
    years = []
    for published in df['date']:
        years.append(int(published[:4]) - 1)
    return (df['href'].to_list(), years)
#测试所写代码,成功爬取年报
from sse import get_table_sse,get_table_sse_codes,parse_table
from filter_url import filter_words,filter_date,filter_nb_10y,prepare_hrefs_years
import pandas as pd
from download import download_pdfs,download_pdf
# Stock codes of the ten companies whose annual reports are fetched.
codes=['603908','603877', '603839', '603808', '603511', '603555', '601566', '600400', '600177', '600398']
# For each company: scrape the listing table, parse it, keep the annual
# reports (excluding summaries) of the ten-year window ending this year,
# and download the PDFs.
for code in codes:
    get_table_sse(code)
    df = parse_table(code)
    csv_final=filter_nb_10y(df,keep_words=['年报','年度报告'],exclude_words=['摘要'],start='')
    hrefs,years=prepare_hrefs_years(csv_final)
    # download_pdfs returns an empty tuple, so `pdf` carries no data.
    pdf=download_pdfs(hrefs,code,years)
#解析年报,获取营业收入和归属于上市公司股东的净利润
import fitz
import pandas as pd
import re
from pprint import pprint
def get_subtxt(doc, bounds=('主要会计数据和财务指标','总资产')):
    """Return the text of the page range delimited by two marker strings.

    Args:
        doc: Indexable sequence of pages, each offering ``get_text()``.
        bounds: ``(lower, upper)`` markers. The range starts at the first page
            containing ``lower`` (page 0 if none does) and ends at the first
            page at or after it containing ``upper`` (the last page if none).

    Returns:
        The concatenated text of all pages in the range.
    """
    lower, upper = bounds
    page_count = len(doc)

    first = next(
        (n for n in range(page_count) if lower in doc[n].get_text()),
        0,
    )
    last = next(
        (n for n in range(first, page_count) if upper in doc[n].get_text()),
        page_count - 1,
    )
    return ''.join(doc[n].get_text() for n in range(first, last + 1))
def get_th_span(txt):
    """Find the first header that lists three consecutive descending years.

    A candidate is two adjacent ``<year>年`` tokens followed, after any text,
    by a third one. It is accepted when the years decrease by exactly one
    each (e.g. 2022, 2021, 2020).

    Args:
        txt: Text to search.

    Returns:
        ``(start, end)`` offsets of the accepted header within ``txt``.

    Raises:
        ValueError: If ``txt`` contains no such header.
    """
    nianfen = r'(20\d\d|199\d)\s*?年'
    # re.DOTALL lets '.' cross line breaks between the second and third year.
    p = re.compile(rf'{nianfen}\s*{nianfen}.*?{nianfen}', re.DOTALL)

    pos = 0
    while True:
        # Searching with an absolute start position keeps every offset
        # relative to txt; the previous version searched a slice and mixed
        # slice-relative with absolute offsets.
        matchobj = p.search(txt, pos)
        if matchobj is None:
            raise ValueError('no header with three consecutive years found')
        year1, year2, year3 = (int(y) for y in matchobj.groups())
        if year1 - year2 == 1 and year2 - year3 == 1:
            return matchobj.span()
        # Resume right after the first year of the rejected candidate so a
        # valid header that overlaps it is not skipped.
        pos = matchobj.end(1)
def get_bounds(txt):
    """Return ``(s, e)`` offsets of the text between the first two year headers.

    ``s`` is the end of the first header found by ``get_th_span``. ``e``
    starts one character before the second header (searched for in the text
    after the first) and is moved left until it sits on a digit.
    """
    first_header_end = get_th_span(txt)[1]
    # The second search runs on the remainder, so its start is shifted back
    # into txt coordinates.
    second_header_start = first_header_end + get_th_span(txt[first_header_end:])[0]

    s = first_header_end
    e = second_header_start - 1
    while txt[e] not in '0123456789':
        e -= 1
    return (s, e)
def get_keywords(txt):
    """Return '营业收入' followed by every CJK word that comes after a number.

    A word is a run of characters in U+2E80..U+9FFF preceded by digits and
    whitespace.
    """
    after_number = re.compile(r'\d+\s+([\u2E80-\u9FFF]+)')
    return ['营业收入'] + after_number.findall(txt)
def parse_key_fin_data(subtxt,keywords):
ss=[]
s=0
for kw in keywords:
n=subtxt.find(kw,s)
ss.append(n)
s=n+len(kw)
ss.append(len(subtxt))
data=[]
p=re.compile('\D+(?:\s+\D*)?(?:(.*)|\(.*\))?')
p2=re.compile('\s')
for n in range(len(ss)-1):
s=ss[n]
e=ss[n+1]
line=subtxt[s:e]
#获取可能换行的账户名称
matchobj=p.search(line)
account_name=p2.sub('',matchobj.group())
#获取三年数据
amnts=line[matchobj.end():].split()
#加上账户名称
amnts.insert(0,account_name)
#追加到总数据
data.append(amnts)
return data
def get_account_data(account, txt):
    """Return the first amount that follows an account name in ``txt``.

    Args:
        account: Regular-expression fragment matching the account name (it is
            inserted into the pattern unescaped, so callers may pass e.g.
            characters joined with ``\\s*``).
        txt: Text to search.

    Returns:
        The amount as written in the text: an optional minus sign, digits
        with or without thousands separators, and an optional decimal part.

    Raises:
        ValueError: If no amount follows the account name.
    """
    # Two digit forms are accepted: comma-grouped ('1,234,567') and plain
    # ('1234567'). The old pattern cut plain numbers after three digits and
    # dropped the sign of negative amounts.
    amount = r'(-?(?:\d{1,3}(?:,\d{3})+|\d+)(?:\.\d+)?)'
    p = re.compile(account + r'\D*?' + amount)
    matchobj = p.search(txt)
    if matchobj is None:
        raise ValueError(f'no amount found for account pattern {account!r}')
    return matchobj.group(1)
# Stock codes whose downloaded reports are parsed below.
codes=['603908','603877', '603839', '603808', '603511', '603555', '601566', '600400', '600177', '600398']
# For every company: collect its PDF file names, pull revenue and net profit
# out of each report, and write the figures to one CSV file.
# NOTE(review): the nesting below is reconstructed from context - confirm.
for code in codes:
    import os
    fname=[]
    # Walk the company's folder and collect the names of all PDF files in it.
    def main():
        file_path = f'C:/Users/hp/nianbao/src/nianbao/实验报告/{code}'
        folders = os.listdir(file_path) # every entry of the folder, as a list
        for file in folders:
            if(file.split('.')[-1]=='pdf'):
                fname.append(file)
    # main() only runs when the file is executed as a script; otherwise fname
    # stays empty and no report is parsed.
    if __name__ == '__main__':
        main()
    # Empty table for the results, one row per year from 2013 to 2022.
    # NOTE(review): assigning through locals() relies on this loop running at
    # module scope, where locals() is the module namespace.
    locals()[f'df_{code}']=pd.DataFrame(index=range(2013,2023),
                                        columns=['营业收入(元)','归属于上市公司股东的净利润(元)'])
    for f in fname:
        doc=fitz.open(f'C:/Users/hp/nianbao/src/nianbao/实验报告/{code}/{f}')
        # Parse the key-figures section.
        txt=get_subtxt(doc)
        revenue=get_account_data('营业收入',txt)
        # Joining the characters with \s* lets the account name match even
        # when whitespace or line breaks split it in the extracted text.
        profit=get_account_data('\s*'.join('归属于上市公司股东的净利润'),txt)
        # Use a regular expression to find the year this report belongs to.
        text=''
        # NOTE(review): doc[i] is indexed unconditionally, so this expects
        # every report to have at least 20 pages.
        for i in range(20): # read the first 20 pages of each report
            page = doc[i]
            text += page.get_text()
        p_year=re.compile('.*?(\d{4}) .*?年度报告.*?') # capture the year of the report being parsed
        year = int(p_year.findall(text)[0])
        locals()[f'df_{code}'].loc[year,'营业收入(元)']=revenue
        locals()[f'df_{code}'].loc[year,'归属于上市公司股东的净利润(元)']=profit
    locals()[f'df_{code}'].to_csv(f'C:/Users/hp/nianbao/src/nianbao/实验报告/数据/{code}.csv')
#提取相关信息
import pdfplumber
def remove(original_list):
    """Return a new list without the ``None`` and empty-string entries."""
    kept = []
    for cell in original_list:
        if cell is None:
            continue
        if cell != '':
            kept.append(cell)
    return kept
def search_table(pdf_name, search_text):
    """Return the first table on the first page mentioning ``search_text``.

    The PDF is scanned page by page for the first page whose text contains
    ``search_text``; the tables of that page are then checked cell by cell.

    Args:
        pdf_name: Path of the PDF file.
        search_text: Text to look for.

    Returns:
        The matching table as extracted by pdfplumber (a list of rows), or
        ``None`` when no page contains the text or no table on that page has
        a cell containing it.
    """
    with pdfplumber.open(pdf_name) as pdf:
        target_page = None
        for i, page in enumerate(pdf.pages):
            # Guard against pages that yield no text at all.
            text = page.extract_text() or ''
            if search_text in text:
                target_page = i + 1  # page numbers are counted from 1
                break  # stop at the first matching page
        if target_page is None:
            # Previously this fell through to `None - 1` and raised TypeError.
            return None
        tables = pdf.pages[target_page - 1].extract_tables()

    for index, table in enumerate(tables):
        for a in table:
            for b in remove(a):
                if search_text in b:
                    print("文字 '{}' 第一次出现在页数: {},出现在这一页的第{}个表格".format(search_text, target_page,index))
                    return table
    return None
# Table meant to hold the company details, one column per stock code.
# NOTE(review): nothing below fills or saves df_mes yet.
df_mes=pd.DataFrame(index=['办公地址','公司网址','电子信箱','董秘的姓名','董秘的电话','董秘的电子信箱'],
                    columns=[603908,603877, 603839, 603808, 603511, 603555, 601566, 600400, 600177, 600398])
codes=[603908,603877, 603839, 603808, 603511, 603555, 601566, 600400, 600177, 600398]
# The paths are f-strings so that {code} is actually substituted; `code` is
# whatever value an earlier loop in this file left bound.
# NOTE(review): the row indices below are fixed positions in one report's
# table layout - confirm them for each company.
table1 = search_table(f'C:/Users/hp/nianbao/src/nianbao/实验报告/{code}/2022年年度报告.pdf','办公地址')
print(table1[-2][1])  # company website
print(table1[8][1])  # office address
print(table1[11][1])  # e-mail address
# The closing quote of the path was missing here, which made the whole file
# fail to parse.
table2 = search_table(f'C:/Users/hp/nianbao/src/nianbao/实验报告/{code}/2022年年度报告.pdf','董事会秘书')
print(table2[1][1])  # board secretary's name
print(table2[3][1])  # board secretary's phone number
print(table2[5][1])  # board secretary's e-mail address
#画图--单个公司
import matplotlib.pyplot as plt
import pandas as pd
from pylab import mpl
# Two line charts for company 601566: net profit attributable to
# shareholders, then operating revenue.
mpl.rcParams['axes.unicode_minus']=False  # render minus signs correctly
x = [2013,2014,2015,2016,2017,2018,2019,2020,2021,2022]
# NOTE(review): the file is read as GBK while the extraction step writes it
# with to_csv's default encoding - presumably it is re-saved in between; confirm.
data1 = pd.read_csv('C:/Users/hp/nianbao/src/nianbao/实验报告/数据/601566.csv',header=0,index_col=0,encoding='gbk')

# Chart 1: net profit.
y1 = data1.loc[:, '归属于上市公司股东的净利润(元)']  # y values, selected by CSV column name
plt.title('601566')  # chart title
plt.rcParams['font.sans-serif'] = ['SimHei']  # font able to display Chinese text
plt.xlabel('时间')  # x-axis label
plt.ylabel('亿元')  # y-axis label
plt.plot(x, y1, marker='o', markersize=3)  # line with small point markers
plt.legend(['归属于上市公司股东的净利润'])
plt.show()
# Opened after show(), so this figure is the one the next chart draws on.
plt.figure(dpi=300,figsize=(30,8))

# Chart 2: operating revenue.
y2 = data1.loc[:, '营业收入(元)']
plt.title('601566')
plt.rcParams['font.sans-serif'] = ['SimHei']
plt.xlabel('时间')
plt.ylabel('亿元')
# Plot the revenue series; this line used to plot y1 (net profit) again.
plt.plot(x, y2, marker='o', markersize=3)
plt.legend(['营业收入(元)'])
plt.show()
plt.figure(dpi=300,figsize=(30,8))
#十家公司
# Operating revenue of all ten companies on one chart.
revenue_codes = ['603908','603877', '603839', '603808', '603511', '603555', '601566', '600400', '600177', '600398']
# NOTE(review): the title names a single company although the chart shows ten.
plt.title('603908')  # chart title
plt.rcParams['font.sans-serif'] = ['SimHei']  # font able to display Chinese text
plt.xlabel('时间')  # x-axis label
plt.ylabel('亿元')  # y-axis label
for stock_code in revenue_codes:
    frame = pd.read_csv(f'C:/Users/hp/nianbao/src/nianbao/实验报告/数据/{stock_code}.csv',header=0,index_col=0,encoding='gbk')
    # Each line carries its own code as label. The previous hand-written
    # legend listed 603839/603808 in the opposite order to the plotted
    # series, so those two lines were mislabelled.
    plt.plot(x, frame.loc[:, '营业收入(元)'], marker='o', markersize=3, label=stock_code)
plt.legend()  # built from the per-line labels
plt.show()
plt.figure(dpi=300,figsize=(30,8))
如果所写代码复杂,且没有良好的注释,那么请在这里补充解释。