[color=rgba(0, 0, 0, 0.75)]
[url=]funcat这个包有一部分功能,但大部分通达信函数是不支持的,[/url]
甚至有一些通达信用多维向量计算的方式,funcat只做成一维数据的循环
我自己对此有需求,所以实现了一部分代码
为什么设VIP呢,毕竟是我自己设计的,包括矩阵计算、向量循环,这些我的代码计算速度至少是比别快一些的,
其次,看到这有需求的朋友,
你如果是有通达信公式需要在python里回测的,可以直接留言联系我,我有一整套优化方案,
形态识别的,AI可以实现形态的缩放,还有因子阈值的匹配度调整,而且可以成功提高胜率(我自己的已经实现 - def REF(tp1, n):
- i = 0
- ZB_l = []
- y = 0
- while i < n:
- y=list(tp1)[i]
- ZB_l.append(y)
- i=i+1
- while i < len(tp1):
- y=list(tp1)[i-n]
- ZB_l.append(y)
- i = i + 1
- # ZB_s = pd.Series(ZB_l)
- return ZB_l#[-1]s
- def zig(k,x):
- #x = 0.055
- # k = CLOSE.tolist()
- #d = df["trade_date"]
- # d = df['date'].tolist()
- # 循环前的变量初始化
- # 端点 候选点 扫描点 端点列表 拐点线列表 趋势状态
- peer_i = 0
- candidate_i = None
- scan_i = 0
- peers = [0]
- # z = np.zeros(len(k))
- state = ZIG_STATE_START
- while True:
- #print(peers)
- scan_i += 1
- if scan_i == len(k) - 1:
- # 扫描到尾部
- if candidate_i is None:
- peer_i = scan_i
- peers.append(peer_i)
- else:
- if state == ZIG_STATE_RISE:
- if k[scan_i] >= k[candidate_i]:
- peer_i = scan_i
- peers.append(peer_i)
- else:
- peer_i = candidate_i
- peers.append(peer_i)
- peer_i = scan_i
- peers.append(peer_i)
- elif state == ZIG_STATE_FALL:
- if k[scan_i] <= k[candidate_i]:
- peer_i = scan_i
- peers.append(peer_i)
- else:
- peer_i = candidate_i
- peers.append(peer_i)
- peer_i = scan_i
- peers.append(peer_i)
- break
-
- if state == ZIG_STATE_START:
- if k[scan_i] >= k[peer_i] * (1 + x):
- candidate_i = scan_i
- state = ZIG_STATE_RISE
- elif k[scan_i] <= k[peer_i] * (1 - x):
- candidate_i = scan_i
- state = ZIG_STATE_FALL
- elif state == ZIG_STATE_RISE:
- if k[scan_i] >= k[candidate_i]:
- candidate_i = scan_i
- elif k[scan_i] <= k[candidate_i]*(1-x):
- peer_i = candidate_i
- peers.append(peer_i)
- state = ZIG_STATE_FALL
- candidate_i = scan_i
- elif state == ZIG_STATE_FALL:
- if k[scan_i] <= k[candidate_i]:
- candidate_i = scan_i
- elif k[scan_i] >= k[candidate_i]*(1+x):
- peer_i = candidate_i
- peers.append(peer_i)
- state = ZIG_STATE_RISE
- candidate_i = scan_i
-
- #线性插值, 计算出zig的值
- # for i in range(len(peers) - 1):
- # peer_start_i = peers[i]
- # peer_end_i = peers[i+1]
- # start_value = k[peer_start_i]
- # end_value = k[peer_end_i]
- # a = (end_value - start_value)/(peer_end_i - peer_start_i)# 斜率
- # for j in range(peer_end_i - peer_start_i +1):
- # z[j + peer_start_i] = start_value + a*j#z是zig线
- return [k[i] for i in peers]#,[d[i] for i in peers]
-
- def LLV(s, n):
- return s.rolling(n).min()
- def EMA(s, n):
- return s.rolling(n).ema()
- def HHV(s, n):
- if type(n)==int:
- return s.rolling(n).max()
- else:#当n不是单个值而是一个序列
- result = [0]*len(s)
- n=list(n)
- for i in range(1,len(s)):
- s_temp = s[0:i]
- result[i] = s_temp[-int(n[i]):].max()#过去n天
- return result
- def CROSS(cond1, cond2):
- '''x1上穿x2'''
- return np.where(eval(cond1)>eval(cond2),1,0).tolist()
- def barslast(df):
- lst=list(df)
- if sum(lst)>0:
- first_=lst.index(1)#出现1的所有位置
- bar_slast=[]
- for i in range(first_):
- bar_slast.append(np.nan)
- for i in range(first_,len(lst)):#出现1后往后计数
- if lst[i]==1:
- count_=0
- bar_slast.append(0)
- else:
- count_+=1
- bar_slast.append(count_)
- return bar_slast
-
- # @nb.jit
- def COUNT(cond, n):
- # TODO lazy compute
- series = cond
- size = len(cond) - n
- try:
- result = np.full(size, 0, dtype=np.int)
- except :
- pass
- for i in range(size - 1, 0, -1):
- s = series[-n:]
- result[i] = len(s[s == True])
- series = series[:-1]
- return result
- # @nb.jit
- def COUNT_(cond, n):
- if type(n)!=int:
- #两列序列一一 正序对应
- result = [0]*len(cond)
- cond=list(cond)
- n=list(n)
- for i in range(1,len(cond)):
- cond_temp = cond[0:i]
- cond_n_ture=cond_temp[-n[i]:]#过去n天
- result[i] = cond_n_ture[cond_n_ture == True]
- return np.array(result)
复制代码 |