Source code for finmlkit.bar.kit

import numpy as np
from typing import Dict, Tuple, Any
from numpy.typing import NDArray
from .base import BarBuilderBase
from .logic import _time_bar_indexer, _tick_bar_indexer, _volume_bar_indexer, _dollar_bar_indexer, _cusum_bar_indexer, _imbalance_bar_indexer, _run_bar_indexer
from finmlkit.utils.log import get_logger
from .data_model import TradesData
import pandas as pd
logger = get_logger(__name__)


class TimeBarKit(BarBuilderBase):
    """
    Bar builder that closes bars on a fixed time period.
    """

    def __init__(self, trades: TradesData, period: pd.Timedelta):
        """
        Set up the time bar builder from raw trades and a bar period.

        :param trades: Raw trades with 'timestamp', 'price', and 'amount'.
        :param period: Duration of a single bar.
        """
        super().__init__(trades)
        # Keep the period as a number of seconds; this is the value handed to the indexer.
        self.interval = period.total_seconds()
        logger.info(f"Time bar builder initialized with interval: {self.interval} seconds.")

    def _comp_bar_close(self) -> Tuple[NDArray[np.int64], NDArray[np.int64]]:
        """
        Locate the bar closes by delegating to the time bar indexer.

        :returns: Close timestamps and corresponding close indices in the raw trades data.
        """
        ts_column = self.trades_df['timestamp']
        timestamps = ts_column.astype(np.int64).values
        bar_close = _time_bar_indexer(timestamps, self.interval)
        return bar_close
class TickBarKit(BarBuilderBase):
    """
    Bar builder that closes bars on a tick count threshold.
    """

    def __init__(self, trades: TradesData, tick_count_thrs: int):
        """
        Set up the tick bar builder from raw trades and a tick count threshold.

        :param trades: Raw trades with 'timestamp', 'price', and 'amount'.
        :param tick_count_thrs: Tick count threshold passed to the tick bar indexer.
        """
        super().__init__(trades)
        self.tick_count_thrs = tick_count_thrs
        logger.info(f"Tick bar builder initialized with tick count: {tick_count_thrs}.")

    def _comp_bar_close(self) -> Tuple[NDArray[np.int64], NDArray[np.int64]]:
        """
        Locate the bar closes by delegating to the tick bar indexer.

        :returns: Close timestamps and corresponding close indices in the raw trades data.
        """
        ts_column = self.trades_df['timestamp']
        timestamps = ts_column.astype(np.int64).values
        # Normalise whatever the indexer returns into an int64 index array.
        bar_end_idx = np.array(
            _tick_bar_indexer(timestamps, self.tick_count_thrs),
            dtype=np.int64,
        )
        return timestamps[bar_end_idx], bar_end_idx
class VolumeBarKit(BarBuilderBase):
    """
    Bar builder that closes bars on a traded volume threshold.
    """

    def __init__(self, trades: TradesData, volume_ths: float):
        """
        Set up the volume bar builder from raw trades and a volume threshold.

        :param trades: Raw trades with 'timestamp', 'price', and 'amount'.
        :param volume_ths: Volume bucket threshold passed to the volume bar indexer.
        """
        super().__init__(trades)
        self.volume_ths = volume_ths
        logger.info(f"Volume bar builder initialized with volume: {volume_ths}.")

    def _comp_bar_close(self) -> Tuple[NDArray[np.int64], NDArray[np.int64]]:
        """
        Locate the bar closes by delegating to the volume bar indexer.

        :returns: Close timestamps and corresponding close indices in the raw trades data.
        """
        trades = self.trades_df
        amounts = trades['amount'].values
        timestamps = trades['timestamp'].astype(np.int64).values
        # Normalise whatever the indexer returns into an int64 index array.
        bar_end_idx = np.array(
            _volume_bar_indexer(amounts, self.volume_ths),
            dtype=np.int64,
        )
        return timestamps[bar_end_idx], bar_end_idx
class DollarBarKit(BarBuilderBase):
    """
    Bar builder that closes bars on a dollar amount threshold.
    """

    def __init__(self, trades: TradesData, dollar_thrs: float):
        """
        Set up the dollar bar builder from raw trades and a dollar threshold.

        :param trades: Raw trades with 'timestamp', 'price', and 'amount'.
        :param dollar_thrs: Dollar amount threshold passed to the dollar bar indexer.
        """
        super().__init__(trades)
        self.dollar_thrs = dollar_thrs
        logger.info(f"Dollar bar builder initialized with dollar amount: {dollar_thrs}.")

    def _comp_bar_close(self) -> Tuple[NDArray[np.int64], NDArray[np.int64]]:
        """
        Locate the bar closes by delegating to the dollar bar indexer.

        :returns: Close timestamps and corresponding close indices in the raw trades data.
        """
        trades = self.trades_df
        timestamps = trades['timestamp'].astype(np.int64).values
        px = trades['price'].values
        qty = trades['amount'].values
        # Normalise whatever the indexer returns into an int64 index array.
        bar_end_idx = np.array(
            _dollar_bar_indexer(px, qty, self.dollar_thrs),
            dtype=np.int64,
        )
        return timestamps[bar_end_idx], bar_end_idx
class CUSUMBarKit(BarBuilderBase):
    """
    CUSUM bar builder class.
    """

    def __init__(self,
                 trades: TradesData,
                 sigma: NDArray[np.float64],
                 sigma_floor: float = 5e-4,
                 sigma_mult: float = 2.
                 ):
        """
        Initialize the CUSUM bar builder with raw trades data and threshold.

        :param trades: Raw trades with 'timestamp', 'price', and 'amount'.
        :param sigma: Standard deviation vector of the price series or a constant value for all ticks.
        :param sigma_floor: Minimum value for sigma to avoid small events.
        :param sigma_mult: The sigma multiplier for the adaptive threshold
            (lambda_th = sigma_mult * sigma). Stored as ``self.lambda_mult``.
        """
        super().__init__(trades)
        self.lambda_mult = sigma_mult
        self._sigma = sigma
        self.sigma_floor = sigma_floor

        logger.info(f"CUSUM Bar builder initialized with: sigma multiplier={sigma_mult}.")

    def _comp_bar_close(self) -> Tuple[NDArray[np.int64], NDArray[np.int64]]:
        """
        Generate CUSUM bar indices using the CUSUM bar indexer.

        :returns: Close timestamps and corresponding close indices in the raw trades data.
        """
        timestamps = self.trades_df['timestamp'].astype(np.int64).values
        prices = self.trades_df['price'].values

        # sigma is forwarded exactly as supplied by the caller.
        close_indices = _cusum_bar_indexer(
            timestamps, prices, self._sigma, self.sigma_floor, self.lambda_mult
        )
        close_indices = np.array(close_indices, dtype=np.int64)
        close_ts = timestamps[close_indices]

        return close_ts, close_indices

    def get_sigma(self) -> NDArray[np.float64]:
        """
        The sigma threshold used for the CUSUM at close indices.

        A per-tick sigma vector is sampled at the bar close indices. A constant
        (scalar) sigma cannot be indexed, so it is repeated once per bar close
        instead.

        :return: sigma vector with one entry per bar close
        """
        sigma = np.asarray(self._sigma)
        if sigma.ndim == 0:
            # Scalar sigma: the same value applies at every close index.
            return np.full(len(self.bar_close_indices), sigma, dtype=np.float64)
        return sigma[self.bar_close_indices]