目标:在中国的股票市场上盈利,每周都有单个股票盈利2%,月总盈利超过2%
计划实现方式:Pycharm + Anaconda3 + Python3 + Django + AKShare + MongoDB
目前采用的实现方式:Pycharm + Anaconda3 + Python3 + Flask + AKShare
以后可能会用到 :MongoDB , SQLAlchemy ,baostock ,Tushare
机器学习 会在以后的实践中逐步用到。
实现方式
上一篇文章写了采集的方法。本篇文章包含完整代码和调用代码。
采用后台执行的方式。
gupiao.py 如下:
import akshare as ak
import threading
import datetime
import os
from threading import Thread
def get_start():
start_stock_daily()
# 这里就是核心了,调用这部分就会自动下载 深圳A股 的所有股票的历史记录
def start_stock_daily(indicator="A股列表", folder="sz_a", prefix="sz"):
file_path = "D:/work/data/" + folder + "/"
file_path_name = get_sz_a(file_path, indicator)
print(file_path_name)
num = 0
with open(file_path_name, "r", encoding='UTF-8') as stock_lines:
for stock_line in stock_lines.readlines():
num = num + 1
if num == 1:
continue
stock_line_arr = stock_line.split("|")
symbol = prefix + stock_line_arr[5]
print("股票信息=" + symbol + "||" + stock_line_arr[6])
stock_csv = get_stock_daily(file_path, symbol)
print("stock_csv=" + stock_csv)
# 获得深圳主板A股列表,每天获取一次不重复获取
# file_path 需要全路径,以 | 进行间隔
# indicator 可选参数 "A股列表", "B股列表", "AB股列表", "上市公司列表", "主板", "中小企业板", "创业板"
def get_sz_a(file_path, indicator="A股列表"):
today = datetime.datetime.today()
file_name = "sz_a_" + today.strftime('%Y%m%d') + ".csv"
if not os.path.exists(file_path): # 如果路径不存在则创建
os.makedirs(file_path)
if os.path.exists(file_path + file_name):
print("今日已经获取无需再次获取," + today.strftime('%Y%m%d'))
return file_path + file_name
stock_info_sz_df = ak.stock_info_sz_name_code(indicator=indicator)
stock_info_sz_df.to_csv(file_path + file_name, sep="|")
print('获取深圳主板A股列表并存储为CSV!' + today.strftime('%Y%m%d'))
return file_path + file_name
# 根据股票代码获取股票历史数据
# symbol 股票代码 需要前缀 sh 上海 sz 深圳,例如:sz300846
def get_stock_daily(file_path, symbol):
stock_zh_a_daily_hfq_df = ak.stock_zh_a_daily(symbol=symbol) # 返回不复权的数据
file_name = symbol + '.csv'
stock_zh_a_daily_hfq_df.to_csv(file_path + file_name)
return file_path + file_name
调用下载的部分,注意命名我随便写的,请根据情况自己修改,App.py 如下:
from flask import Flask
import akshare as ak
import gupiao
import datetime
import os
from concurrent.futures import ThreadPoolExecutor
import time
executor = ThreadPoolExecutor(2)
app = Flask(__name__)
@app.route('/test_thread')
def test_thread():
executor.submit(gupiao.get_start)
return "thread is running at background !!!"
if __name__ == '__main__':
app.run()
使用 Flask 框架,生成一个项目,然后创建一个gupiao.py 在 app.py 中调用,然后运行项目。
在浏览器里面访问 http://127.0.0.1:5000/test_thread
就能在后台看到如图的画面,整个深圳A股的下载时间大约在2个小时到3个小时。
股票历史数据
下载到本地如图
股票历史数据
爬取数据部分就完成了,之后就是筛选了。