对之前的小市值策略修正了几个Bug,并增加了RSRS-MV择时。同时最近又发现了QMT的如下几个问题:
-
有时候会出现pandas库找不到等问题,重启一下QMT即可。
-
似乎对于get_market_data_ex()等函数,即便事先不先下载数据,也会先自动去下载,所以不下载数据也能跑。
-
QMT似乎不适合做回测。上篇,已经提到过,QMT在回测的时候,一些函数不会动态地修改“今天”,因此,如果某个函数有end_time参数,默认的话都是返回当前的时间节点,并不会在回测模式下动态的进行一个调整。因此,在回测时候,凡是有end_time参数的,我都要根据ContextInfo.barpos 来获取时间,然后特别的指定end_time。但是对于一些不接受end_time参数的函数,如get_instrumentdetail(),我们就无从知晓其返回的是当前信息呢,还是回测过程中当时的信息。我在迅投文档上面也找不到相关信息,从这个角度来说,QMT似乎不怎么适合做回测。
以下是我使用的一个成长小市值策略回测版本的代码,加了RSRS-MV择时,修复了之前的几个bug。如果你有其他问题,也可以联系我
#coding:gbk
"""
成长小市值RSRS-MV择时策略-回测
"""
import pandas as pd
import numpy as np
import time
import statsmodels.api as sm
from datetime import timedelta, date
class A():
pass
g = A() #创建空的类的实例 用来保存委托状态
def init(ContextInfo):
g.acct = 'testS'
ContextInfo.set_account(g.acct)
g.acct_type = 'STOCK'
g.buy_code = 23
g.sell_code = 24
g.buy_stock_count = 5
g.ipo_days = 180
g.N = 18
g.M = 600
g.ref_stock = '000300.SH' # 沪深300
g.score_threshold = 0.7
g.R2_l = []
g.beta_l = []
g.rsrs = 'KEEP'
today = date.today()
yesterday = today - timedelta(days=1)
yesterday = yesterday.strftime('%Y%m%d')
g.yesterday = yesterday
initial_slope_series(ContextInfo)
# print('In init, now', end_time)
# ContextInfo.run_time('trade', '30nSecond', '2013-12-25 14:30:00')
def initial_slope_series(ContextInfo):
HS300 = ContextInfo.get_market_data_ex(['high', 'low'], \
[g.ref_stock],period='1d',end_time=g.yesterday, count = g.N + g.M)[g.ref_stock]
for i in range(g.M):
model = sm.OLS(HS300.high[i:i+g.N], sm.add_constant(HS300.low[i:i+g.N]))
result = model.fit()
g.beta_l.append(result.params[1])
g.R2_l.append(result.rsquared)
g.beta_l = g.beta_l[:-1]
g.R2_l = g.R2_l[:-1]
# 只看RSRS因子值作为买入、持有和清仓依据,前版本还加入了移动均线的上行作为条件
def get_rsrs_signal(ContextInfo):
close_data = ContextInfo.get_market_data_ex(['close'], \
[g.ref_stock],period='1d', end_time=g.yesterday, count = 20+2)[g.ref_stock]
HS300 = ContextInfo.get_market_data_ex(['high', 'low'], \
[g.ref_stock],period='1d', end_time=g.yesterday, count = g.N)[g.ref_stock]
HS300 = HS300.dropna()
model = sm.OLS(HS300.high, sm.add_constant(HS300.low))
result = model.fit()
beta = result.params[1]
g.beta_l.append(beta)
r2 = result.rsquared
g.R2_l.append(r2)
section = g.beta_l[-g.M:]
# 计算均值序列
mu = np.mean(section)
# 计算标准化RSRS指标序列
sigma = np.std(section)
beta_norm = (section[-1]-mu)/sigma
# beta_right= beta_norm*beta*r2
beta_right= beta_norm*r2
if beta_right < -g.score_threshold:
return "SELL"
elif beta_right > g.score_threshold and \
(np.mean(close_data['close'][-20:]) > np.mean(close_data['close'][-22:-2])):
return "BUY"
else:
return "KEEP"
def trade(ContextInfo):
realtime = ContextInfo.get_bar_timetag(ContextInfo.barpos)
now = timetag_to_datetime(realtime,'%H%M%S')
nowDate = timetag_to_datetime(realtime,'%Y%m%d %H:%M:%S')
# 跳过非交易时间
if now < '093000' or now > "150000":
return
account = get_trade_detail_data(g.acct, g.acct_type, 'account')
holdings = get_trade_detail_data(g.acct, g.acct_type, 'position')
g.holdings = {i.m_strInstrumentID + '.' + i.m_strExchangeID : i.m_nCanUseVolume for i in holdings}
if len(account)==0:
print(f'账号{g.acct} 未登录 请检查')
return
if '111000' >= now >= '105000':
# get dates
today = timetag_to_datetime(realtime,'%Y%m%d')
yesterday = realtime - 24 * 3600 * 1000
yesterday = timetag_to_datetime(yesterday,'%Y%m%d')
start_date = realtime - 30 * 24 * 3600 * 1000
start_date = timetag_to_datetime(start_date ,'%Y%m%d')
g.today = today
g.yesterday = yesterday
g.start_date = start_date
rsrs = get_rsrs_signal(ContextInfo)
g.rsrs = rsrs
print(nowDate, rsrs)
if rsrs == 'SELL' and len(g.holdings) > 0:
print(nowDate, 'RSRS SELL, 全部卖出')
for s in g.holdings.keys():
if not ContextInfo.is_suspended_stock(s):
passorder(g.sell_code, 1101, g.acct, s, 5, -1, g.holdings[s], 2, ContextInfo)
if '141000' >= now >= '135000':
if g.rsrs == 'SELL':
return
g.stock_pool = ContextInfo.get_stock_list_in_sector('沪深A股')
g.stock_list = prepare_stock_list(ContextInfo)
stocks_to_sell = [s for s in g.holdings.keys() if s not in g.stock_list]
for s in stocks_to_sell:
msg = f"小市值 {s} 卖出 {g.holdings[s]/100} 手"
print(nowDate, msg)
if not ContextInfo.is_suspended_stock(s):
passorder(g.sell_code, 1101, g.acct, s, 5, -1,g.holdings[s], 2, ContextInfo)
if '144000' >= now >= '142000':
if g.rsrs == 'SELL':
return
# 获取可用资金
print('可用资金', get_total_value(g.acct,'STOCK'))
stocks_to_buy = []
num_stocks_to_buy = g.buy_stock_count - len(g.holdings)
for s in g.stock_list:
if s not in g.holdings.keys():
stocks_to_buy.append(s)
if len(stocks_to_buy) >= num_stocks_to_buy:
break
g.stocks_to_buy = stocks_to_buy
if len(g.stocks_to_buy) > 0:
value = get_total_value(g.acct,'STOCK') / len(g.stocks_to_buy)
for s in g.stocks_to_buy: # 立即以最新价格下单
latest_price = ContextInfo.get_market_data_ex(['close'], \
[s],period='30m', end_time=g.today, count = 1)[s]['close'][0]
vol = value // (latest_price *100 * 1.01) # * 1.01 防止钱不够
msg = f"小市值 {s} 买入 {vol}手"
print(nowDate, msg)
passorder(g.buy_code, 1101, g.acct, s, 5, -1, vol*100, 2, ContextInfo)
else:
print(nowDate, '无需换仓')
def handlebar(ContextInfo):
trade(ContextInfo)
def prepare_stock_list(ContextInfo):
'''
选股模块,根据因子选出预持有的股票
'''
stock_pool = filter_new_stock(ContextInfo, g.stock_pool)
# 过滤科创板
stock_pool = filter_kcb_stock(ContextInfo, stock_pool)
stock_pool = filter_st_stock(ContextInfo,stock_pool)
#净利润增长率前10%
stock_pool = get_factor_filter_list(ContextInfo, stock_pool, 'PERSHAREINDEX',
'du_profit_rate', asc=False, p=0.1)
#PEG 前50%
stock_pool = get_peg_filter_list(ContextInfo, stock_pool, p=0.5)
# roe > 0.2%
stock_pool = get_factor_filter_list_ex(ContextInfo, stock_pool, 'PERSHAREINDEX',\
'du_return_on_equity', t=0.2)
# 净利润大于20000000
stock_pool = get_factor_filter_list_ex(ContextInfo, stock_pool, 'ASHAREINCOME',\
'net_profit_incl_min_int_inc', t=20000000)
# 营业收入同比增长>0
stock_pool = get_factor_filter_list_ex(ContextInfo, stock_pool, 'PERSHAREINDEX',\
'inc_revenue_rate', t=0)
# 获取小市值股票
scores = {}
price_data = ContextInfo.get_market_data_ex(['close'],stock_pool,period='1d',end_time=g.yesterday, count=1)
fieldList = ['CAPITALSTRUCTURE.total_capital']
share_data = ContextInfo.get_financial_data(fieldList, stock_pool, g.start_date, \
g.yesterday, report_type = 'report_time')
for s in stock_pool:
try:
scores[s] = price_data[s]['close'][0] * share_data[s]['total_capital'][0]
except:
print('Can not get mkt info for stock {}, skip'.format(s))
if len(scores) > 0:
df = pd.DataFrame.from_dict(scores, orient='index')
df.columns = ['cap']
df = df.sort_values(by=['cap'])
stock_pool = df.index.values.tolist()[:30]
# 过滤涨跌停, !! 这个过滤要放在最后,股票太多,调用get_market_data_ex 频率受限
stock_pool = filter_limitup_stock(ContextInfo,stock_pool)
stock_pool = filter_limitdown_stock(ContextInfo,stock_pool)
# 过滤停牌
stock_pool = filter_paused_stock(ContextInfo,stock_pool)
return stock_pool[:g.buy_stock_count]
else:
return []
def get_total_value(account_id,datatype):#(账号,账户类型)
'''
获取账户当前可用现金
'''
result = 0
result_list = get_trade_detail_data(account_id,datatype,'ACCOUNT')
for obj in result_list:
result = obj.m_dAvailable
return result
def get_peg_filter_list(ContextInfo, stock_list, p= 0.5):
score_list = []
filter_stocks = []
data = ContextInfo.get_financial_data(['PERSHAREINDEX.s_fa_eps_basic', 'PERSHAREINDEX.du_profit_rate'], stock_list, g.start_date,
g.yesterday, report_type = 'report_time')
price = ContextInfo.get_market_data_ex(['close'],stock_list,period='1d',end_time=g.yesterday, count = 1)
for s in stock_list:
try:
score_list.append(price[s]['close'][0] * data[s]['du_profit_rate'][0] / data[s]['s_fa_eps_basic'][0])
filter_stocks.append(s)
except:
print('Caculate PEG Error for stock {}, skip'.format(s))
df = pd.DataFrame(columns=['code','score'])
df['code'] = filter_stocks
df['score'] = score_list
df = df.dropna()
df = df[df['score']>0]
df.sort_values(by='score', ascending=True, inplace=True)
filter_list = list(df.code)[0:int(p*len(stock_list))]
return filter_list
def get_factor_filter_list(ContextInfo, stock_list, table, field, asc=True,p=0.3):
score_list = []
data = ContextInfo.get_financial_data([table + '.' + field], stock_list, g.start_date,
g.yesterday, report_type = 'report_time')
for s in stock_list:
score_list.append(data[s][field][0])
df = pd.DataFrame(columns=['code','score'])
df['code'] = stock_list
df['score'] = score_list
df = df.dropna()
df = df[df['score']>0]
df.sort_values(by='score', ascending=asc, inplace=True)
filter_list = list(df.code)[0:int(p*len(stock_list))]
return filter_list
def get_factor_filter_list_ex(ContextInfo, stock_list, table, field, t=0):
score_list = []
data = ContextInfo.get_financial_data([table + '.' + field], stock_list, g.start_date,
g.yesterday, report_type = 'report_time')
for s in stock_list:
score_list.append(data[s][field][0])
df = pd.DataFrame(columns=['code','score'])
df['code'] = stock_list
df['score'] = score_list
df = df.dropna()
df = df[df['score']>t]
filter_list = list(df.code)
return filter_list
#2-1 过滤停牌股票
def filter_paused_stock(ContextInfo,stock_list):
return [stock for stock in stock_list if not ContextInfo.is_suspended_stock(stock, 1)]
def filter_new_stock(ContextInfo, stock_list):
realtime = ContextInfo.get_bar_timetag(ContextInfo.barpos)
startDate = realtime - g.ipo_days * 24 * 3600 * 1000
startDate = int(timetag_to_datetime(startDate,'%Y%m%d'))
return [stock for stock in stock_list if ContextInfo.get_instrumentdetail(stock)['OpenDate'] < startDate]
#2-2 过滤ST及其他具有退市标签的股票
def filter_st_stock(ContextInfo,stock_list):
return [stock for stock in stock_list
if 'ST' not in ContextInfo.get_stock_name(stock)
and '*' not in ContextInfo.get_stock_name(stock)
and '退' not in ContextInfo.get_stock_name(stock)
and ContextInfo.get_instrumentdetail(stock)['InstrumentID'] is not None]
# 过滤掉科创板
def filter_kcb_stock(ContextInfo, stock_list):
return [stock for stock in stock_list if not stock.startswith('688')]
# 过滤涨停的股票
def filter_limitup_stock(ContextInfo, stock_list):
# 已存在于持仓的股票即使涨停也不过滤,避免此股票再次可买,但因被过滤而导致选择别的股票
filter_list = []
price_data = ContextInfo.get_market_data_ex(['close'],stock_list,period='30m', end_time=g.today, count=1)
limitup_price = ContextInfo.get_market_data_ex(['close'],stock_list,period='1d', end_time=g.yesterday, count=1)
for stock in stock_list:
if stock in g.holdings.keys():
filter_list.append(stock)
continue
if price_data[stock]['close'][0] < limitup_price[stock]['close'][0] * 1.1:
filter_list.append(stock)
return filter_list
# 过滤跌停股票
def filter_limitdown_stock(ContextInfo,stock_list):
filter_list = []
price_data = ContextInfo.get_market_data_ex(['close'],stock_list,period='30m', end_time=g.today, count=1)
limitdown_price = ContextInfo.get_market_data_ex(['close'],stock_list,period='1d', end_time=g.yesterday, count=1)
for stock in stock_list:
if price_data[stock]['close'][0] > limitdown_price[stock]['close'][0] * 0.9:
filter_list.append(stock)
return filter_list