关于vnpy的日内单品种策略(CtaStrategy模块),vnpy自带的strategies 文件夹里面有很多可以参考。但是涉及多品种、日间的交易的策略,同时又要求高度的灵活性,就需要使用脚本策略交易模块ScriptTrader,这方面的代码网上相对比较难找。
CtaStrategy模块的策略大部分是靠tick驱动,或者bar驱动。但是有些策略(如使用ScriptTrader模块的策略),或者是日间的策略,就必须显示地指定代码在什么时候运行。这个时候一种选择是利用Python中的schedule模块,来指定在早上9点的时候运行。
以下是一个多品种的日间的截面动量策略模板,供大家参考。
import numpy as np
from datetime import date
import datetime
import re
import schedule
import time
from vnpy.trader.utility import load_json
from vnpy.vnpy_scripttrader import init_cli_trading
from vnpy_ctp import CtpGateway
import rqdatac
rqdatac.init()
setting = load_json("connect_ctp.json")
engine = init_cli_trading([CtpGateway])
# 查询所有合约
instruments = [
# 农产品, 白糖,棉花,普麦,强麦,早籼稻,晚籼稻,粳稻,菜籽粕,油菜籽,菜籽油,棉纱,苹果,红枣,玉米,玉米淀粉,
# 黄大豆1号,黄大豆2号,豆粕,豆油,棕榈油,纤维板,胶合板,鸡蛋,粳米,生猪,花生,原木
'SR', 'CF', 'PM', 'WH', 'RI', 'LR', 'JR', 'RM', 'RS','OI', 'CY', 'AP','CJ','C','CS',
'A','B','M','Y','P', 'FB','BB', 'JD','RR','LH','PK','LG',
# 有色金属,铜,铜(BC),铝,锌,铅,镍,锡,工业硅,氧化铝,碳酸锂,多晶硅,铸造铝合金,
'CU','BC', 'AL', 'ZN','PB','NI','SN','SI','AO','LC','PS','AD',
# 黑色金属,螺纹钢,热轧卷板,不锈钢,焦炭,焦煤,铁矿石,硅铁,锰硅,线材
'RB', 'HC','SS','J','JM','I','SF','SM','WR',
# 贵金属,
'AU','AG',
# 能源化工, 原油,低硫燃料油,燃料油,石油沥青,天然橡胶,20号胶,纸浆,动力煤,PTA,甲醇,玻璃,
# 尿素,纯碱,聚乙烯,聚氯乙烯,聚丙烯,乙二醇,苯乙烯,液化石油气,短纤,合成橡胶,对二甲苯,烧碱,瓶片
'SC','LU','FU','BU','RU','NR','SP','ZC','TA','MA', 'FG','UR','SA','L','V','PP','EG','EB','PG','PF',
'BR','PX','SH','PR',
# 金融权益, 沪深300,中证500, 中证1000,上证50,
'IF','IC','IM','IH',
# 金融利率,2年期国债,5年期国债,10年期国债,30年期国债
'TS', 'TF','T','TL'
]
# 合约规模(Contract Size),也称交易单位
f_lots = {
'SR': 10, 'CF': 5, 'PM': 50, 'WH': 20, 'RI': 20, 'LR': 10, 'JR': 20, 'RM': 10, 'RS': 10, 'OI': 10, 'CY': 5,
'AP': 10, 'CJ': 5, 'C': 10, 'CS': 10,
'A': 10, 'B': 10, 'M': 10, 'Y': 10, 'P': 10, 'FB': 500, 'BB': 500, 'JD': 5, 'RR': 10, 'LH': 16, 'PK': 5, 'LG': 90,
'CU': 5, 'BC': 5, 'AL': 5, 'ZN': 5, 'PB': 5, 'NI': 1, 'SN': 1, 'SI': 5, 'AO': 20, 'LC': 1, 'PS': 3, 'AD': 10,
'RB': 10, 'HC': 10, 'SS': 5, 'J': 100, 'JM': 60, 'I': 100, 'SF': 5, 'SM': 5, 'WR': 10,
'AU': 1000, 'AG': 15,
'SC': 1000, 'LU': 10, 'FU': 50, 'BU': 10, 'RU': 10, 'NR': 10, 'SP': 10, 'ZC': 100, 'TA': 5, 'MA': 10, 'FG': 20,
'UR': 20, 'SA': 20, 'L': 5, 'V': 5, 'PP': 5, 'EG': 10, 'EB': 5, 'PG': 20, 'PF': 5,
'BR': 5, 'PX': 5, 'SH': 30, 'PR': 15,
'IF': 300, 'IC': 200, 'IM': 200, 'IH': 300,
'TS': 20000, 'TF': 10000, 'T': 10000, 'TL': 10000
}
margin_rate = 0.15
g = {}
g['ins_num'] = 10
date_today = date.today()
yesterday = date_today - datetime.timedelta(days=1)
s_date = date_today - datetime.timedelta(days=10) # to make sure there are data
def connect_to_server():
engine.connect_gateway(setting, "CTP")
def get_vn_code(code, exchange):
if exchange == 'CZCE':
ins = re.search(r'\D+', code).group()
return ins + code[-3:] + '.' + exchange
else:
return code.lower() + '.' + exchange
def subscribe():
vt_symbols = []
for ins in instruments:
dom_code = rqdatac.futures.get_dominant(ins, start_date=date_today)[0]
df = rqdatac.futures.get_contract_multiplier(ins, start_date=s_date)
vn_dom_code = get_vn_code(dom_code, df.exchange[0])
vt_symbols.append(vn_dom_code)
engine.subscribe(vt_symbols)
print("合约信息订阅成功")
def get_long_short_positions():
positions = engine.get_all_positions(use_df=False)
long_ = []
short_ = []
for p in positions:
if p.volume != 0:
if p.direction.value == '多':
long_.append(p)
else:
short_.append(p)
return long_, short_
def before_market_open():
print('Prepare for trading')
start_d = date_today - datetime.timedelta(days=10)
_long = {}
_short = {}
ins_returns = {}
for ins in instruments:
try:
price_df = rqdatac.futures.get_dominant_price(ins,start_date=start_d,end_date=date_today,frequency='1d',
fields=['close'])
close_prices = price_df.close
r_t = (close_prices[-1] - close_prices[0]) / close_prices[0]
if not np.isnan(r_t):
ins_returns[ins] = float(r_t)
except:
print('escaping, {}'.format(ins))
long_ins = []
short_ins = []
ins_sorted = sorted(ins_returns.items(), key=lambda item: item[1])
mean_rt = np.mean(list(ins_returns.values()))
for i in range(g['ins_num']):
if ins_sorted[-(i + 1)][1] > mean_rt and ins_sorted[-(i + 1)][1] > 0:
long_ins.append(ins_sorted[-(i + 1)][0])
if ins_sorted[i][1] < mean_rt and ins_sorted[i][1] < 0:
short_ins.append(ins_sorted[i][0])
g['long_ins'] = long_ins
g['short_ins'] = short_ins
def trade_sell():
# Sell first
hold_future_l, hold_future_s = get_long_short_positions()
for p in hold_future_l:
h_l = p.symbol + '.' + p.exchange.value
ins_code = re.search(r'\D+', p.symbol).group().upper()
if ins_code not in g['long_ins']:
last_price = engine.get_tick(h_l).last_price
v = p.volume
engine.sell(vt_symbol=h_l, price=last_price, volume=v)
print('以价格:{} 卖出:{}, {}手 平仓'.format(last_price, h_l, v))
for p in hold_future_s:
s_l = p.symbol + '.' + p.exchange.value
ins_code = re.search(r'\D+', p.symbol).group().upper()
if ins_code not in g['short_ins']:
last_price = engine.get_tick(s_l).last_price
v = p.volume
engine.cover(vt_symbol=s_l, price=last_price, volume=v)
print('以价格:{} 买入:{}, {}手 平仓'.format(last_price, s_l, v))
print('Finished Selling')
def get_volume_from_value(value, code, price):
return int(value / (f_lots[code] * price * margin_rate))
def get_codes_from_p(positions):
codes = []
for p in positions:
codes.append(p.symbol)
return codes
def trade_buy():
hold_future_l, hold_future_s = get_long_short_positions()
accounts = engine.get_all_accounts(use_df=False)
available_cash = accounts[0].available
max_cash = accounts[0].balance / (g['ins_num'] * 2) * 0.6
value = available_cash / (g['ins_num'] * 2 - len(hold_future_s) - len(hold_future_l))
value = min(max_cash, value)
num_long_buy = g['ins_num'] - len(hold_future_l)
num_short_buy = g['ins_num'] - len(hold_future_s)
count = 0
for ins in g['long_ins']:
dom_code = rqdatac.futures.get_dominant(ins, start_date=date_today)[0]
df = rqdatac.futures.get_contract_multiplier(ins, start_date=s_date)
vn_dom_code = get_vn_code(dom_code, df.exchange[0])
try:
if vn_dom_code not in get_codes_from_p(hold_future_l):
last_price = engine.get_tick(vn_dom_code).last_price
v = get_volume_from_value(value, ins, last_price)
engine.buy(vn_dom_code, price=last_price, volume=v)
count += 1
if count >= num_long_buy:
break
except:
print("Something wrong buying {}".format(ins))
count = 0
for ins in g['short_ins']:
dom_code = rqdatac.futures.get_dominant(ins, start_date=date_today)[0]
df = rqdatac.futures.get_contract_multiplier(ins, start_date=s_date)
vn_dom_code = get_vn_code(dom_code, df.exchange[0])
try:
if vn_dom_code not in get_codes_from_p(hold_future_s):
last_price = engine.get_tick(vn_dom_code).last_price
v = get_volume_from_value(value, ins, last_price)
engine.short(vn_dom_code, price=last_price, volume=v)
count += 1
if count >= num_short_buy:
break
except:
print("Something wrong shorting {}".format(ins))
if __name__ == '__main__':
#before_market_open()
schedule.every().day.at("22:02").do(before_market_open)
schedule.every().day.at("22:03").do(connect_to_server)
schedule.every().day.at("22:04").do(subscribe)
schedule.every().day.at("22:05").do(trade_sell)
schedule.every().day.at("22:06").do(trade_buy)
while True:
schedule.run_pending()
time.sleep(1)