from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
import time
import re
import pandas as pd
import os
# Six-digit security codes of the companies processed by this script.
# Every entry must be a plain ASCII-quoted string: the original line had one
# entry with a missing opening quote / stray trailing space and one entry
# wrapped in typographic quotes, which made the whole file unparseable.
Codes = [
    '000627',
    '300022',
    '000554',
    '000963',
    '000417',
    '000701',
    '002589',
    '002251',
    '000632',
    '000096',
]
def get_table_sse(code): #√
    """
    Save the HTML of the annual-report listing table for one security code.

    Opens the SZSE periodic-report disclosure page in Edge, types ``code``
    into the search box, opens the fourth filter drop-down, picks the
    "年报" entry, and writes the inner HTML of the result table to
    ``<code>.html`` (UTF-8) in the current working directory.

    :param code: security code to search for, e.g. '000672'
    :type code: str
    :return: None
    :rtype: None
    """
    url = 'http://www.szse.cn/disclosure/listedinfo/regular/'
    filter_selector = '.sse_outerItem:nth-child(4) .filter-option-inner-inner'
    table_selector = (
        "body > div.container.sse_content > div > "
        "div.col-lg-9.col-xxl-10 > div > "
        "div.sse_colContent.js_regular > "
        "div.table-responsive > table"
    )

    browser = webdriver.Edge()
    try:
        browser.get(url)
        browser.set_window_size(1550, 830)
        time.sleep(3)  # fixed wait for the page to finish loading

        search_box = browser.find_element(By.ID, "inputCode")
        search_box.click()
        search_box.send_keys(code)
        time.sleep(3)  # fixed wait after typing the code

        browser.find_element(By.CSS_SELECTOR, filter_selector).click()
        browser.find_element(By.LINK_TEXT, "年报").click()
        time.sleep(3)  # fixed wait for the filtered table to render

        element = browser.find_element(By.CSS_SELECTOR, table_selector)
        table_html = element.get_attribute('innerHTML')

        with open(f'{code}.html', 'w', encoding='utf-8') as f:
            f.write(table_html)
    finally:
        # Always release the browser, even when a lookup above raises;
        # otherwise every failed code leaves an Edge process behind.
        browser.quit()
def get_table_sse_codes(codes):
    """Run ``get_table_sse`` once for each security code in ``codes``, in order."""
    for security_code in codes:
        get_table_sse(security_code)
# Captures the inner HTML of every <td> cell of one table row.
# (The tags were missing from the original pattern, which therefore matched
# only empty strings.)
_TD_PATTERN = re.compile('<td.*?>(.*?)</td>', re.DOTALL)


def _cell_text(cell):
    """Return the text between the first '>' and the last '<' of ``cell``.

    A cell without any markup is returned stripped instead of being sliced,
    which would otherwise drop its last character.
    """
    start = cell.find('>') + 1
    end = cell.rfind('<')
    if start == 0 or end == -1:
        return cell.strip()
    return cell[start:end]


def get_data(tr):
    """
    Extract one announcement record from the inner HTML of a table row.

    The row is expected to hold at least four ``<td>`` cells: security code,
    company name, a link cell (``href="....pdf"`` followed by the title text
    after ``$(this))">`` and before ``</a>``), and the publication date.

    :param tr: inner HTML of one ``<tr>`` element
    :type tr: str
    :return: ``[code, name, href, title, date]``
    :rtype: list
    :raises IndexError: if the row contains fewer than four ``<td>`` cells
    """
    tds = _TD_PATTERN.findall(tr)
    #
    code = _cell_text(tds[0])
    name = _cell_text(tds[1])
    #
    link_cell = tds[2]
    s = link_cell.find('href="') + 6
    e = link_cell.find('.pdf"') + 4  # keep the '.pdf' suffix, drop the quote
    # NOTE(review): the prefix is the sse.com.cn host although the table is
    # scraped from szse.cn -- confirm the correct download host.
    href = 'http://www.sse.com.cn' + link_cell[s:e]
    s = link_cell.find('$(this))">') + 10
    e = link_cell.find('</a>')
    title = link_cell[s:e]
    #
    date = tds[3].strip()
    return [code, name, href, title, date]
# Captures the inner HTML of every <tr> row of the saved table.
# (The tags were missing from the original pattern.)
_TR_PATTERN = re.compile('<tr.*?>(.+?)</tr>', re.DOTALL)


def parse_table(fname, save=True): #√
    """
    Parse a saved listing table into a DataFrame of announcement records.

    Reads ``fname`` (UTF-8), splits it into ``<tr>`` rows, drops blank rows,
    skips the first remaining row (the header) and converts every other row
    with ``get_data``.

    :param fname: path of the saved HTML table
    :type fname: str
    :param save: when true, also write the result to a CSV file next to
        ``fname`` with the extension replaced by ``.csv``
    :type save: bool
    :return: DataFrame with columns code, name, href, title, date
        (previously nothing was returned, so the parsed table was lost
        whenever ``save`` was false)
    :rtype: pandas.DataFrame
    """
    with open(fname, encoding='utf-8') as f:
        html = f.read()
    #
    rows = [tr for tr in _TR_PATTERN.findall(html) if tr.strip() != '']
    #
    data_all = [get_data(tr) for tr in rows[1:]]
    df = pd.DataFrame(data_all, columns=['code', 'name', 'href', 'title', 'date'])
    #
    if save:
        # splitext gives the same name as the old fname[0:-5] for '.html'
        # files and stays correct for other extension lengths.
        df.to_csv(f'{os.path.splitext(fname)[0]}.csv')
    return df
import requests
def download_pdf(href, code, year):
    """
    Download a single annual report and save it as ``<code>_<year>.pdf``.

    The file is written to the current working directory. The HTTP status
    is not checked, so whatever body the server returns is what gets saved.

    :param href: download link address
    :type href: str
    :param code: security code, used as the first part of the file name
    :type code: str
    :param year: year label, used as the second part of the file name
    :type year: str
    :return: None
    :rtype: None
    """
    r = requests.get(href, allow_redirects=True)
    try:
        fname = f'{code}_{year}.pdf'
        with open(fname, 'wb') as f:
            f.write(r.content)
    finally:
        # The original wrote `f.close` / `r.close` without parentheses, so
        # neither the file nor the connection was ever closed.
        r.close()
def download_pdfs(hrefs, code, years, delay=30):
    """
    Download all reports of one company, pausing after each download.

    ``hrefs[i]`` is saved under the year label ``years[i]`` via
    ``download_pdf``.

    :param hrefs: download links of one company
    :type hrefs: list
    :param code: security code of the company
    :type code: str
    :param years: year labels, parallel to ``hrefs``
    :type years: list
    :param delay: seconds to sleep after every download (default 30, the
        previously hard-coded value)
    :type delay: float
    :return: an empty tuple (kept for compatibility with existing callers)
    :rtype: tuple
    :raises IndexError: if ``years`` is shorter than ``hrefs``
    """
    for i, href in enumerate(hrefs):
        download_pdf(href, code, years[i])
        time.sleep(delay)
    return ()
def download_pdfs_codes(list_hrefs, codes, list_years):
    """
    Download the reports of several companies, one company after another.

    For every position ``k`` in ``list_hrefs`` the links ``list_hrefs[k]``
    are passed to ``download_pdfs`` together with ``codes[k]`` and
    ``list_years[k]``.

    :return: an empty tuple
    """
    for position, company_hrefs in enumerate(list_hrefs):
        company_years = list_years[position]
        company_code = codes[position]
        download_pdfs(company_hrefs, company_code, company_years)
    return ()
# Gather the link column and the date column of the first 10 tables in
# df_all_n, then download every report. The 'date' values are passed on as
# the year labels used in the saved file names.
# NOTE(review): df_all_n is not defined in the visible part of this file;
# it must exist before this point.
hrefs = [list(df_all_n[i]['href']) for i in range(10)]
years = [list(df_all_n[i]['date']) for i in range(10)]
download_pdfs_codes(hrefs, Codes, years)
import fitz
import pandas as pd
def get_csv(doc, bounds=('公司简称 ', '办公地址')):
    """
    Return the text of the pages lying between two marker strings.

    The first page whose text contains ``bounds[0]`` is the start page
    (page 0 if it never occurs). Searching from the start page, the first
    page whose text contains ``bounds[1]`` is the last page included (the
    final page of the document if it never occurs).

    :param doc: indexable sequence of pages supporting ``len(doc)``,
        ``doc[n]`` and ``page.get_text()``
    :param bounds: ``(lower_marker, upper_marker)``
    :type bounds: tuple
    :return: concatenated text of the selected pages ('' for an empty doc)
    :rtype: str
    """
    lb, ub = bounds
    n_pages = len(doc)

    start_pageno = 0
    for n in range(n_pages):
        if lb in doc[n].get_text():
            start_pageno = n
            break

    # Exclusive end index. The original fallback was len(doc) - 1, which
    # silently dropped the last page whenever the upper marker was missing.
    end_pageno = n_pages
    for n in range(start_pageno, n_pages):
        if ub in doc[n].get_text():
            end_pageno = n + 1
            break

    return ''.join(doc[n].get_text() for n in range(start_pageno, end_pageno))
# Extract basic company information into a CSV file.
import numpy as np

na = [np.nan] * len(Codes)
# dtype=object so that the extracted strings can be stored in the columns
# that start out as NaN without an implicit float -> str upcast.
csv2 = pd.DataFrame(data={'公司简称': Codes,
                          '股票代码': na,
                          '办公地址': na,
                          '公司网址': na},
                    dtype=object)
col = csv2.columns
# One pattern per column, in column order; each captures the line that
# follows the label.
p_list = ['公司的中文简称.*?\n(.*?)\n',
          '股票代码.*?\n(.*?)\n',
          '公司办公地址.*?\n(.*?)\n',
          '公司网址.*?\n(.*?)\n']
compiled_patterns = [re.compile(p, re.DOTALL) for p in p_list]

file_list = [i for i in os.listdir() if i.endswith('.pdf') and '2023' in i]
# NOTE(review): row n is filled from file_list[n], whose order comes from
# os.listdir(); the code pre-filled in '公司简称' for a row is only a
# placeholder and is overwritten when the short name is found.
for n, filename in enumerate(file_list):
    doc = fitz.open(filename)
    try:
        csv1 = get_csv(doc)
    finally:
        doc.close()
    # Pair every pattern with its own column. The original started its
    # column counter at 1, so each value landed one column to the right of
    # its label and the last pattern always failed inside a bare `except`.
    for column, pattern in zip(col, compiled_patterns):
        found = pattern.search(csv1)
        if found:
            csv2.loc[n, column] = found.group(1)
csv2.to_csv('公司基本信息.csv')
# import pdfplumber
def _section_text(doc, lb, ub):
    """Return the text of the pages from the first one containing ``lb``
    through the first following one containing ``ub``.

    Falls back to page 0 when ``lb`` never occurs and to the final page of
    the document when ``ub`` never occurs.
    """
    n_pages = len(doc)

    start_pageno = 0
    for n in range(n_pages):
        if lb in doc[n].get_text():
            start_pageno = n
            break

    # Exclusive end index. The original fallback was len(doc) - 1, which
    # silently dropped the last page whenever the upper marker was missing.
    end_pageno = n_pages
    for n in range(start_pageno, n_pages):
        if ub in doc[n].get_text():
            end_pageno = n + 1
            break

    return ''.join(doc[n].get_text() for n in range(start_pageno, end_pageno))


def get_rev(doc, bounds=('主要会计数据和财务指标', '主要财务指标')):
    """
    Return the text of the pages lying between two marker strings.

    :param doc: indexable sequence of pages supporting ``len(doc)``,
        ``doc[n]`` and ``page.get_text()``
    :param bounds: ``(lower_marker, upper_marker)``
    :type bounds: tuple
    :return: concatenated text of the selected pages ('' for an empty doc)
    :rtype: str
    """
    lb, ub = bounds
    return _section_text(doc, lb, ub)


def get_netp(doc, bounds=('主要会计数据和财务指标', '主要财务指标')):
    """
    Return the text of the pages lying between two marker strings.

    NOTE(review): the original file contained a second, header-less copy of
    the ``get_rev`` body after its ``return`` (unreachable code), while
    later code calls an otherwise undefined ``get_netp``. The copy is
    presumably the body of this function; its default bounds were not
    visible, so the ``get_rev`` defaults are assumed -- confirm.
    """
    lb, ub = bounds
    return _section_text(doc, lb, ub)
# Patterns capturing the first token on the line after each label.
# Raw strings: '\s' is an invalid escape sequence in a normal string literal.
d_list = [r'营业收入.*?\n(.*?)\s',
          r'归属于股东的净利润.*?\n(.*?)\s']
rev_pattern = re.compile(d_list[0], re.DOTALL)
netp_pattern = re.compile(d_list[1], re.DOTALL)


def _scaled_amount(raw):
    """Parse a comma-grouped number string and return it divided by 10**8,
    rounded to two decimals."""
    return round(float(raw.replace(',', '')) / (10 ** 8), 2)


# NOTE(review): csv_rev and csv_netp are not created anywhere in the visible
# code; they must already exist as DataFrames before this loop runs.
pdf_names = [i for i in os.listdir() if i.endswith('.pdf')]  # listed once
for n, code in enumerate(Codes):
    file_list = [i for i in pdf_names if code in i]
    for fname in file_list:
        # File names are '<6-digit code>_<label>.pdf', so characters 7..10
        # are the first four characters of the label.
        year = fname[7:11]
        try:
            doc = fitz.open(fname)
        except Exception as exc:
            # Skip the file entirely; the original carried on with the
            # document of the previous file in this case.
            print(f'skip {fname}: cannot open ({exc})')
            continue
        try:
            # Extraction stays best-effort per value, but failures are
            # reported instead of being swallowed silently.
            try:
                txt11 = get_rev(doc)
                csv_rev.loc[n, year] = _scaled_amount(rev_pattern.findall(txt11)[0])
            except Exception as exc:
                print(f'{fname}: revenue not extracted ({exc!r})')
            try:
                txt12 = get_netp(doc)
                csv_netp.loc[n, year] = _scaled_amount(netp_pattern.findall(txt12)[0])
            except Exception as exc:
                print(f'{fname}: net profit not extracted ({exc!r})')
        finally:
            doc.close()

# Data tidy-up.
# Rows become the year labels. The original transposed only csv_netp,
# although both tables are re-indexed with int(label) - 1 below, which only
# makes sense on year labels.
csv_rev = csv_rev.T
csv_rev = csv_rev.sort_index(ascending=True)
csv_netp = csv_netp.T
csv_netp = csv_netp.sort_index(ascending=True)
csv_rev = csv_rev.fillna(0)
csv_netp = csv_netp.fillna(0)
# Shift every label back by one year.
index1 = [int(i) - 1 for i in csv_rev.index]
csv_rev['index1'] = index1
csv_rev = csv_rev.set_index(['index1'])
index2 = [int(i) - 1 for i in csv_netp.index]
csv_netp['index1'] = index2
csv_netp = csv_netp.set_index(['index1'])
csv_rev.to_csv('公司营业收入.csv')
csv_netp.to_csv('归属于股东的净利润.csv')
import matplotlib.pyplot as plt
import numpy as np
import tool
def draw_pics_twinx(df):
    """
    Plot revenue and net profit of one company on twin y-axes.

    Reads the columns 'year', '营业收入', '归属于股东的净利润' and 'name'
    from ``df``, draws both series against the year with value labels at
    every point, saves the figure to ``../pics/<title>.png`` and shows it.

    The return values of the ``tool`` helpers are discarded -- presumably
    they convert their argument in place; TODO confirm.

    :param df: table holding the columns listed above
    :return: None
    """
    plt.rcParams['figure.dpi'] = 200
    plt.rcParams['axes.unicode_minus'] = False  # render minus signs correctly
    plt.rcParams['font.sans-serif'] = ['SimHei']  # font able to show Chinese

    years = df['year']
    tool.to_year_list(years)
    revenue = df['营业收入']
    tool.to_num_list(revenue)
    net_profit = df['归属于股东的净利润']
    tool.to_num_list(net_profit)

    fig = plt.figure()
    ax1 = fig.subplots()

    # Left axis: revenue.
    ax1.plot(years, revenue, 'steelblue', label="营业收入", linestyle='-',
             linewidth=2, marker='o', markeredgecolor='pink',
             markersize='2', markeredgewidth=2)
    ax1.set_xlabel('年份')
    ax1.set_ylabel('营业收入')
    for idx in range(len(years)):
        plt.text(years[idx], revenue[idx], (revenue[idx]), fontsize='10')
    ax1.legend(loc=6)

    # Right axis: net profit.
    ax2 = ax1.twinx()
    ax2.plot(years, net_profit, 'orange', label="归属于股东的净利润",
             linestyle='-', linewidth=2, marker='o', markeredgecolor='teal',
             markersize='2', markeredgewidth=2)
    ax2.set_ylabel('归属于股东的净利润')
    for idx in range(len(years)):
        plt.text(years[idx], net_profit[idx], (net_profit[idx]), fontsize='10')
    ax2.legend(loc=2)

    # The title must be set last; otherwise the left y-axis shows
    # duplicated tick labels.
    title = df['name'][0] + '历年' + '财务数据'
    plt.title(title)
    plt.savefig('../pics/' + title + '.png')
    plt.show()
# 首先,感谢老师在这一学期的教学中给予的细心教导和耐心帮助。其次,我要检讨自己:这学期有几次课我因为准备省赛而未能到课,希望老师谅解。在完成这次实验报告的过程中,由于我的电脑和编程基础薄弱,遇到了很多麻烦,也因此意识到了自己的不足,但我仍尽己所能完成了这份报告。最后,再次感谢老师。