import pandas as pd
import numpy as np
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
from bs4 import BeautifulSoup
import requests
import time
import re
import json  # needed by get_adress/get_PDF below
import os    # needed by the __main__ download loop
import csv
import matplotlib.pyplot as plt
import matplotlib.font_manager as fm
from urllib import parse  # needed by get_PDF (parse.urlencode)
from urllib.parse import urljoin
# Open the SZSE periodic-report page in Edge and filter for annual reports (年度报告)
browser = webdriver.Edge()
browser.get('https://www.szse.cn/disclosure/listed/fixed/index.html')
browser.find_element(By.CSS_SELECTOR, "#select_gonggao .glyphicon").click()
browser.find_element(By.LINK_TEXT, "年度报告").click()
time.sleep(1)

# [stock code, company short name] pairs to query
firm = [['001209', '洪兴股份'], ['002003', '伟星股份'], ['002029', '七匹狼'], ['002154', '报喜鸟'],
        ['002269', '美邦服饰'], ['002291', '星期六'], ['002404', '嘉欣丝绸'], ['002486', '嘉麟杰'],
        ['002503', '搜于特']]

# For each company: search by code, then save the disclosure table's innerHTML to a file
for code, name in firm:
    with open('inner_HTML_%s.html' % name, 'w', encoding='utf-8') as f:
        browser.find_element(By.ID, "input_code").click()
        browser.find_element(By.ID, 'input_code').send_keys(code)
        time.sleep(0.5)
        browser.find_element(By.ID, "input_code").send_keys(Keys.ENTER)
        element = browser.find_element(By.ID, 'disclosure-table')
        time.sleep(0.5)
        f.write(element.get_attribute('innerHTML'))
        time.sleep(0.5)
    # clear the current stock filter before the next query
    browser.find_element(By.CSS_SELECTOR, ".selected-item:nth-child(2) > .icon-remove").click()
    time.sleep(0.5)
browser.quit()
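# BeautifulSoup and urljoin are imported above but never used in this script. A minimal
# sketch of how the saved innerHTML files could be parsed into announcement links
# (assumption: the saved table rows contain <a> tags with relative hrefs; the exact
# markup of the live page may differ):
def parse_saved_table(name):
    with open('inner_HTML_%s.html' % name, encoding='utf-8') as f:
        soup = BeautifulSoup(f.read(), 'html.parser')
    links = []
    for a in soup.find_all('a', href=True):
        # resolve relative hrefs against the SZSE site root
        links.append((a.get_text(strip=True), urljoin('https://www.szse.cn/', a['href'])))
    return links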
def get_adress(company_list):
    # Query cninfo's top-search API for one company and return its orgId/plate/code
    url = "http://www.cninfo.com.cn/new/information/topSearch/detailOfQuery"
    data = {
        'keyWord': company_list,
        'maxSecNum': 10,
        'maxListNum': 5,
    }
    hd = {
        'Host': 'www.cninfo.com.cn',
        'Origin': 'http://www.cninfo.com.cn',
        'Pragma': 'no-cache',
        'Accept-Encoding': 'gzip,deflate',
        'Connection': 'keep-alive',
        # 'Content-Length': '70',  # requests sets this automatically
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/75.0.3770.100 Safari/537.36',
        'Content-Type': 'application/x-www-form-urlencoded; charset=UTF-8',
        'Accept': 'application/json,text/plain,*/*',
        'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
    }
    r = requests.post(url, headers=hd, data=data)
    print(r.text)
    pk = json.loads(r.content.decode("utf-8"))
    orgId = pk["keyBoardList"][0]["orgId"]  # extract the request parameters
    plate = pk["keyBoardList"][0]["plate"]
    code = pk["keyBoardList"][0]["code"]
    print(orgId, plate, code)
    return orgId, plate, code
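# Quick usage sketch (requires network access; the returned values below are
# illustrative only, not real API output):
# orgId, plate, code = get_adress('伟星股份')
# -> prints something like "9900002806 szmb 002003"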
def download_PDF(url, file_name):  # download one PDF into the current company's folder
    r = requests.get(url)
    # `company` is the global set by the __main__ loop below
    with open(company + "/" + file_name + ".pdf", "wb") as f:
        f.write(r.content)
def get_PDF(orgId, plate, code):
    # Query cninfo's announcement API for the company's annual reports (category_ndbg)
    url = "http://www.cninfo.com.cn/new/hisAnnouncement/query"
    data = {
        'stock': '{},{}'.format(code, orgId),
        'tabName': 'fulltext',
        'pageSize': 30,
        'pageNum': 1,
        'column': plate,
        'category': 'category_ndbg_szsh;',
        'plate': '',
        'seDate': '',
        'searchkey': '',
        'secid': '',
        'sortName': '',
        'sortType': '',
        'isHLtitle': 'true',
    }
    hd = {
        'Host': 'www.cninfo.com.cn',
        'Origin': 'http://www.cninfo.com.cn',
        'Pragma': 'no-cache',
        'Accept-Encoding': 'gzip,deflate',
        'Connection': 'keep-alive',
        # 'Content-Length': '216',
        'User-Agent': 'Mozilla/5.0 (Windows; U; Windows NT 6.1; en-US) AppleWebKit/533.20.25 (KHTML, like Gecko) Version/5.0.4 Safari/533.20.27',
        'Content-Type': 'application/x-www-form-urlencoded; charset=UTF-8',
        'Accept': 'application/json,text/plain,*/*',
        'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
        'X-Requested-With': 'XMLHttpRequest',
        # 'Cookie': cookies
    }
    data = parse.urlencode(data)
    print(data)
    r = requests.post(url, headers=hd, data=data)
    print(r.text)
    r = json.loads(r.content.decode("utf-8"))
    reports_list = r['announcements']
    for report in reports_list:
        # skip abstracts (摘要), titles without a year, and H-share versions
        if '摘要' in report['announcementTitle'] or "20" not in report['announcementTitle']:
            continue
        if 'H' in report['announcementTitle']:
            continue
        else:  # e.g. http://static.cninfo.com.cn/finalpage/2019-03-29/1205958883.PDF
            pdf_url = "http://static.cninfo.com.cn/" + report['adjunctUrl']
            file_name = report['announcementTitle']
            print("Downloading: " + pdf_url + ", saving under ./" + company + "/" + file_name)
            download_PDF(pdf_url, file_name)
            time.sleep(2)
# The original source of `companylist` is not shown; it is reconstructed here from
# the `firm` pairs defined in the scraping step above
companylist = pd.DataFrame(firm, columns=['公司代码', '上市公司简称'])
company_list = companylist.loc[:, "上市公司简称"]
if __name__ == '__main__':
    for company in company_list[:]:
        os.makedirs(company, exist_ok=True)  # one folder per company
        orgId, plate, code = get_adress(company)
        get_PDF(orgId, plate, code)
        print("Moving on to the next company")
    print("All downloads finished!")
import re
import pandas as pd
import os
import tool  # local helper module (see the tool section further below)
import pdfplumber
def is_fin_number(string):
    # True if the string looks like a (possibly negative, comma-grouped) number
    if string == '':
        return False
    try:
        string = string.strip()
        string = string.replace(',', '')
    except:
        return False
    for s in string:
        if s.isdigit() or s == '-' or s == '.' or s == ' ' or s == '\n':
            continue
        else:
            return False
    return True
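# A few illustrative checks (outputs follow directly from the rules above):
# is_fin_number('1,234.56\n')  -> True   (commas stripped; digits, '.', '-' allowed)
# is_fin_number('-789')        -> True
# is_fin_number('营业收入')     -> False  (non-numeric characters)
# is_fin_number('')            -> False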
def get_data(row, name_mode):
    # Search one table row for a label matching name_mode, then return the first
    # financial number at or after that cell.
    rc = re.compile(name_mode, re.DOTALL)
    bound = 0
    rs = None
    for i in range(0, len(row)):
        rs = None
        try:
            rs = rc.search(row[i])  # row[i] may be None
        except:
            continue
        if rs is None:
            continue
        else:
            bound = i
            break
    if rs is None:  # the label was not found in this row
        return -1
    for i in range(bound, len(row)):
        if is_fin_number(row[i]):
            return row[i]
    return 'other row'  # label matched, but the number is not on this row
# Is this the "key accounting data and financial indicators" (主要会计数据和财务指标) page?
def is_this_page(text):
    if text is None:  # extract_text can return None on image-only pages
        return False
    mode = '\n.*?主+要+会+计+数+据+和+财+务+指+标+.*?\n'
    if re.search(mode, text) is None:
        return False
    else:
        return True
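# Example (a fragment shaped like a real section heading; illustrative only):
# is_this_page('...\n第二节 主要会计数据和财务指标\n...')  -> True
# is_this_page('第一节 重要提示')                          -> False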
def get_twin_data(fname):
    # Extract [operating revenue, net income] from one annual-report PDF.
    earnings = -1
    try:  # pdfplumber raises AssertionError on some files for unknown reasons
        with pdfplumber.open('../pdf/' + fname) as pdf:
            # locate the page holding the key-indicator section
            s = 0
            for i in range(0, len(pdf.pages)):
                text = pdf.pages[i].extract_text()
                if is_this_page(text):
                    s = i
                    break
            page_index = 0
            bound = 0
            for i in range(s, s + 2):  # the table spans at most two pages
                table = pdf.pages[i].extract_table()
                try:
                    len(table)
                except:
                    continue
                for j in range(0, len(table)):
                    e = get_data(table[j], '.*?营业收入.*?')
                    # label and number sit on different rows: search upward
                    if e == 'other row':
                        for k in range(j - 1, 0, -1):
                            for h in range(0, len(table[k])):
                                if is_fin_number(table[k][h]):
                                    e = table[k][h]
                                    break
                            if is_fin_number(e):
                                break
                    if e != -1 and e != 'other row':
                        earnings = e
                        bound = j
                        break
                if earnings == -1:
                    continue
                page_index = i
                break
            # still no operating revenue after scanning both pages
            if earnings == -1:
                return None
            net_income = -1
            ni_mode = '.*?归属于.*?(所有者|股东)?的?.?净?.?利?.?润?.*?'
            for i in range(page_index, page_index + 2):
                table = pdf.pages[i].extract_table()
                try:
                    len(table)
                except:
                    continue
                if i == page_index:  # still on the same page as the revenue row
                    for j in range(bound + 1, len(table)):
                        ni = get_data(table[j], ni_mode)
                        # label and number sit on different rows: search downward
                        if ni == 'other row':
                            for k in range(j, len(table)):
                                for h in range(0, len(table[k])):
                                    if is_fin_number(table[k][h]):
                                        net_income = table[k][h]
                                        return [earnings, net_income]
                            return 'data is at the next page'
                        elif ni != -1:
                            net_income = ni
                            break
                else:  # the table continued onto the next page
                    for j in range(0, len(table)):
                        ni = get_data(table[j], ni_mode)
                        if ni != -1:
                            net_income = ni
                            break
                if net_income == -1:
                    continue
                else:
                    return [earnings, net_income]
    except:
        print(fname + ' raised AssertionError')
'''
Usage example (get_twin_data prepends '../pdf/' itself, so pass the bare file name):
import read_data
read_data.get_twin_data('洪兴股份:2012年年度报告.PDF')
'''
# This function looks up the matching file names under the pdf directory
def read_all_data(df):
    # df is a dataframe with two columns: code_list and name_list
    filename_list = []
    year_list = []
    data_list = []
    for index, row in df.iterrows():
        for filepath, dirnames, filenames in os.walk('../pdf'):
            for filename in filenames:
                if (row['name_list'] in filename) or (row['code_list'] in filename):
                    print(filename)
                    data = get_twin_data(filename)
                    # keep only successful [revenue, net income] pairs
                    if isinstance(data, list):
                        filename_list.append(filename)
                        year_list.append(tool.get_year(filename, row['code_list']))
                        data_list.append(data)  # reuse the result instead of parsing twice
                        print(filename + ' completed')
    rt_list, ni_list = zip(*data_list)
    df_data = {'filename': filename_list, 'year': year_list,
               '营业收入': rt_list, '净利润': ni_list}
    df_data = pd.DataFrame(df_data)
    return df_data
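# A minimal driver sketch (the code_list/name_list column names are the ones
# read_all_data expects; the two firms are taken from the scraping step above):
# df_code = pd.DataFrame({'code_list': ['001209', '002003'],
#                         'name_list': ['洪兴股份', '伟星股份']})
# data = read_all_data(df_code)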
# --- tool module: shared helpers ---
import os
import re
import pandas as pd
def get_No():
    print('Before the program starts, please note:\n')
    print("1. Install the required modules first and run this program with a working network connection;\n")
    print("2. The scraping part drives the Edge browser (webdriver.Edge above); download the matching WebDriver;\n")
    print('3. Downloading takes a long time; please be patient;\n')
    print('4. The SZSE download code writes to an absolute path, so create this directory first:')
    print("D:\\CS\\py_fi\\scores_3\\nianbao\\src\\pdf\n")
    print('\n\n\n-------- Program start --------\n\n\n')
    return int(input("Please enter your index number: "))
def to_wan(num):  # convert to 万 (ten-thousands)
    return num / 10000
def to_yi(num):  # convert to 亿 (hundred-millions)
    return num / 100000000
def is_year(string):  # a four-character string is treated as a year
    if len(string) == 4:
        return True
    else:
        return False
def to_num(string):
    # Parse a report figure such as '1,234,567,890\n' into 亿, rounded to 3 decimals
    if isinstance(string, str):
        string = string.replace(',', '')
        string = string.replace('\n', '')
        return float(format(to_yi(float(string)), '.3f'))
    else:
        return string
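# Worked example of the conversion chain (values follow from the arithmetic above):
# to_num('1,234,567,890\n')  # strip commas/newline -> 1234567890 / 1e8 -> 12.346 (亿)
# to_num(12.346)             # non-strings pass through unchanged -> 12.346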
def to_year_list(str_list):  # normalize every entry to a string year, in place
    for i in range(0, len(str_list)):
        str_list[i] = str(str_list[i])
def to_num_list(str_list):  # convert every entry with to_num, in place
    for i in range(0, len(str_list)):
        str_list[i] = to_num(str_list[i])
def which_market(code):
    # Shanghai: 60x main-board A-shares, 688 STAR Market, 900 B-shares;
    # Shenzhen: 00x A-shares, 200 B-shares, 30x ChiNext
    if code[0:2] == '60' or code[0:3] == '688' or code[0:3] == '900':
        return 'sse'
    elif code[0:2] == '00' or code[0:3] == '200' or code[0:2] == '30':
        return 'szse'
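# e.g. which_market('002003') -> 'szse'; which_market('600519') -> 'sse'
# (codes matching neither exchange fall through and return None)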
def clean_pdf():
    # Remove cancelled announcements (file names containing 取消)
    for filepath, dirnames, filenames in os.walk('../pdf'):
        for filename in filenames:
            if '取消' in filename:
                os.remove('../pdf/' + filename)
                print(filename + ' deleted')
def get_year_sse(fname):
    # SSE file names look like '600000_2019_....pdf' (format implied by the regex)
    year = re.search(r'\d{6}_(\d{4}).*?\.pdf', fname, re.IGNORECASE)
    return year.group(1)
def get_year_szse(fname):
    # SZSE file names carry the year in the title, e.g. '...2019年年度报告.pdf'
    year = re.search(r'.*?(\d{4}).*?\.pdf', fname, re.IGNORECASE)
    return year.group(1)
def get_year(fname, code):
    m = which_market(code)
    if m == 'sse':
        return get_year_sse(fname)
    elif m == 'szse':
        return get_year_szse(fname)
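# Illustrative file names (shapes implied by the two regexes above):
# get_year('600000_2019_annual.pdf', '600000')      -> '2019'  (sse branch)
# get_year('洪兴股份:2012年年度报告.PDF', '001209')   -> '2012'  (szse branch)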
def be_contigious(this_data):
    # For one company's plot data, keep only the run of consecutive years
    # counting back from the most recent one.
    length = len(this_data)
    last = int(this_data['year'][length - 1])
    for i in range(length - 2, -1, -1):
        nxt = int(this_data['year'][i])
        if last - nxt != 1:  # gap found: drop everything before it
            return this_data.loc[i + 1: length]
        last = nxt  # step the comparison window back one year
    return this_data
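# Example: years [2015, 2016, 2018, 2019] have a gap before 2018, so only the
# rows for 2018 and 2019 survive; a fully consecutive frame is returned whole.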
def seperate_df(df_code, data):
    # Split the combined results into one dataframe per company, keyed by code
    seperated_data = {}
    for j, row1 in df_code.iterrows():
        name = row1['name_list']
        code = row1['code_list']
        this_data = pd.DataFrame(columns=['name', 'year', '营业收入', '净利润'])
        for i, row2 in data.iterrows():
            fn = row2['filename']
            if name in fn or code in fn:
                data_dict = {'name': name,
                             'year': row2['year'],
                             '营业收入': row2['营业收入'],
                             '净利润': row2['净利润']}
                # DataFrame.append was removed in pandas 2.0; use concat instead
                this_data = pd.concat([this_data, pd.DataFrame([data_dict])],
                                      ignore_index=True)
        # keep the return value and renumber rows so positional indexing works later
        this_data = be_contigious(this_data).reset_index(drop=True)
        seperated_data[code] = this_data
    return seperated_data
import matplotlib.pyplot as plt
import numpy as np
import tool
def draw_pics_twinx(df):
    plt.rcParams['figure.dpi'] = 200
    plt.rcParams['axes.unicode_minus'] = False    # render minus signs correctly
    plt.rcParams['font.sans-serif'] = ['SimHei']  # render Chinese labels
    x = df['year']
    tool.to_year_list(x)
    y_rt = df['营业收入']
    tool.to_num_list(y_rt)
    y_ni = df['净利润']
    tool.to_num_list(y_ni)
    fig = plt.figure()
    ax1 = fig.subplots()
    ax1.plot(x, y_rt, 'steelblue', label="营业收入", linestyle='-', linewidth=2,
             marker='o', markeredgecolor='pink', markersize=2, markeredgewidth=2)
    ax1.set_xlabel('年份')
    ax1.set_ylabel('营业收入(单位:亿元)')
    for i in range(len(x)):
        plt.text(x[i], y_rt[i], y_rt[i], fontsize=10)
    ax1.legend(loc=6)
    ax2 = ax1.twinx()  # second y-axis sharing the same x-axis
    ax2.plot(x, y_ni, 'orange', label="归属于股东的净利润", linestyle='-', linewidth=2,
             marker='o', markeredgecolor='teal', markersize=2, markeredgewidth=2)
    ax2.set_ylabel('归属于股东的净利润(单位:亿元)')
    for i in range(len(x)):
        plt.text(x[i], y_ni[i], y_ni[i], fontsize=10)
    ax2.legend(loc=2)
    # The title must come last, otherwise the left y-axis ends up with duplicated ticks
    title = df['name'][0] + '历年' + '财务数据'
    plt.title(title)
    plt.savefig('../pics/' + title + '.png')
    plt.show()
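# End-to-end usage sketch tying the pieces together (assumes df_code as in the
# read_all_data example above, and that the ../pics directory already exists):
# data = read_all_data(df_code)
# seperated_data = seperate_df(df_code, data)
# for code, one_df in seperated_data.items():
#     draw_pics_twinx(one_df)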
The charts show that the apparel and textile industries are improving overall. Over the past few years the sector has been buffeted by macroeconomic shocks; amid the volatility, challenges and opportunities coexist.

Companies such as 报喜鸟, 嘉麟杰, and 星期六 saw fairly flat revenue in their early years, but in recent years they have withstood market shocks and grown rapidly. 报喜鸟 mainly designs, produces, and sells men's suits and shirts, and is positioned as a high-end domestic suit brand. 嘉麟杰 focuses on the design, R&D, and production of mid-to-high-end functional fabrics for professional outdoor sports, with an emphasis on green, environmentally friendly advanced materials. 星期六 concentrates on womenswear for urban white-collar women, with a young, fashionable style.

By contrast, 美邦服饰's revenue has slid steadily in recent years, for several reasons. On the product side, 美邦服饰 originally took a mid-range route and was popular with young consumers, but its designs have since failed to innovate and its price positioning has blurred, making it hard to win consumers over. On the management side there are also weaknesses: inventory turnover and total asset turnover keep falling, pointing to inefficient operations.

As consumers' purchasing power and taste continue to rise, the apparel and textile industries still have considerable room to grow. The examples above suggest that sustainable, environmentally friendly production will become a new trend; moreover, sustained revenue growth requires breaking with tradition, innovating, and designing higher-quality products.

This project showed me the power of Python: after the data was scraped and processed, a tangle of information became clear and visual, which greatly aided the analysis. I still have much to learn about Python, and I thank my teacher and classmates for answering my questions along the way and lighting the path forward.