@njit(nogil=True, parallel=True)
def roc(price: NDArray, period: int) -> NDArray:
    """
    Calculate the Rate of Change (ROC) feature.

    For every index ``i >= period`` the result is
    ``(price[i] - price[i - period]) / price[i - period] * 100``.

    :param price: np.array, an array of prices.
    :param period: int, the period over which to calculate ROC.
    :return: np.array, ROC values, allocated with ``np.empty_like(price)``.
        The first ``period`` entries are NaN, and so is every entry whose
        reference price ``price[i - period]`` is zero.
    """
    n = len(price)
    out = np.empty_like(price)
    out[:period] = np.nan  # First 'period' values are NaN

    # NOTE(review): the decorator requests parallel=True but this loop uses a
    # plain ``range``; confirm whether ``prange`` was intended here.
    for i in range(period, n):
        base = price[i - period]
        if base != 0:
            out[i] = ((price[i] - base) / base) * 100
        else:
            # The ratio is undefined for a zero reference price. Emit NaN
            # instead of dividing, which would raise ZeroDivisionError under
            # Numba's default 'python' error model.
            out[i] = np.nan
    return out
@njit(nogil=True)
def rsi_wilder(close: NDArray[np.float64], window: int) -> NDArray[np.float64]:
    """
    Relative Strength Index (RSI) with Wilder's smoothing.

    The initial average gain and loss are the plain means of the gains and
    losses over the first ``window`` price changes. Every later average is
    updated as ``((window - 1) * previous + current) / window``.

    :param close: A one-dimensional numpy array of closing prices.
    :param window: The number of periods to use for the RSI calculation.
    :return: A one-dimensional numpy array of RSI values, same length as
        `close`. Entries before index ``window`` are NaN, and so is any entry
        whose average loss is not strictly positive.
    """
    n = close.size
    out = np.empty(n, np.float64)
    out[:window] = np.nan
    if n <= window:
        # Not enough data to form even the first average.
        return out

    # ----- seed averages from the first window (indices 1 … window) -----
    gain_total = 0.0
    loss_total = 0.0
    for k in range(1, window + 1):
        change = close[k] - close[k - 1]
        if change > 0.0:
            gain_total += change
        else:
            loss_total += -change

    avg_gain = gain_total / window
    avg_loss = loss_total / window

    if avg_loss > 0:
        out[window] = 100.0 - 100.0 / (1.0 + avg_gain / avg_loss)
    else:
        out[window] = np.nan

    # ----- Wilder smoothing for the remaining bars -----
    for k in range(window + 1, n):
        change = close[k] - close[k - 1]

        gain = 0.0
        loss = 0.0
        if change > 0.0:
            gain = change
        if change < 0.0:
            loss = -change

        avg_gain = ((window - 1) * avg_gain + gain) / window
        avg_loss = ((window - 1) * avg_loss + loss) / window

        if avg_loss > 0:
            out[k] = 100.0 - 100.0 / (1.0 + avg_gain / avg_loss)
        else:
            out[k] = np.nan

    return out
@njit(nogil=True)
def stoch_k(
    close: NDArray[np.float64],
    low: NDArray[np.float64],
    high: NDArray[np.float64],
    length: int,
) -> NDArray[np.float64]:
    """
    Calculate the Stochastic Oscillator %K value.

    For each bar ``t >= length - 1`` the value is
    ``100 * (close[t] - lo) / (hi - lo)``, where ``lo`` and ``hi`` are the
    minimum of ``low`` and the maximum of ``high`` over the ``length`` bars
    ending at ``t``.

    :param close: A one-dimensional numpy array of closing prices
    :param low: A one-dimensional numpy array of low prices
    :param high: A one-dimensional numpy array of high prices
    :param length: The lookback period for the stochastic calculation
    :return: A one-dimensional numpy array of %K values, same length as input
        arrays. The first ``length - 1`` entries are NaN, and so is any entry
        where ``hi > lo`` does not hold (flat window, or NaN in the window).
    """
    n = close.size
    out = np.empty(n, np.float64)
    out[: length - 1] = np.nan
    if n < length:
        return out

    # initial window
    lo = np.min(low[:length])
    hi = np.max(high[:length])
    if hi > lo:
        out[length - 1] = 100.0 * (close[length - 1] - lo) / (hi - lo)
    else:
        out[length - 1] = np.nan

    # rolling update
    for t in range(length, n):
        leaving = t - length

        # The cached extrema can be updated incrementally only when:
        #   * the bar leaving the window was neither the min nor the max, and
        #   * no NaN is involved.
        # A NaN cached value never compares equal to the leaving bar, and a
        # NaN incoming bar is silently dropped by min()/max(), so either case
        # would leave stale extrema. Rescanning keeps every bar consistent
        # with a per-window np.min / np.max.
        rescan = (
            low[leaving] == lo
            or high[leaving] == hi
            or np.isnan(lo)
            or np.isnan(hi)
            or np.isnan(low[t])
            or np.isnan(high[t])
        )

        if rescan:
            lo = np.min(low[leaving + 1 : t + 1])
            hi = np.max(high[leaving + 1 : t + 1])
        else:
            lo = min(lo, low[t])
            hi = max(hi, high[t])

        if hi > lo:
            out[t] = 100.0 * (close[t] - lo) / (hi - lo)
        else:
            out[t] = np.nan

    return out