"""
This module contains the functions to build candlestick bars and other intra-bar features
(e.g. directional features, footprints) from raw trades data, using the outputs of the indexer
functions defined in the logic module.
"""
import numpy as np
from numba import njit
from numba import prange
from typing import Tuple, Optional
import pandas as pd
from numba.typed import List as NumbaList
from abc import ABC, abstractmethod
from numpy.typing import NDArray
import io
from .data_model import FootprintData
from .utils import comp_price_tick_size
from .data_model import TradesData
from finmlkit.utils.log import get_logger
# Module-level logger obtained from finmlkit's get_logger helper.
logger = get_logger(__name__)
class BarBuilderBase(ABC):
    r"""Abstract base class for building various types of bars (e.g., time, tick, volume, or information based bars)
    from raw trades data.

    This class serves as a template for subclasses that implement specific bar sampling strategies, enabling the
    transformation of high-frequency trade data into structured bar features suitable for financial analysis and
    machine learning.

    In financial machine learning, raw trade data (ticks) is often aggregated into bars to reduce noise, capture
    market dynamics, and create features for modeling. This builder computes standard OHLCV (Open, High, Low, Close,
    Volume) bars, directional features (e.g., buy/sell volumes) and trade size metrics. It is inspired by techniques
    from Marcos López de Prado's work on sampling methods that address uneven information arrival rates in
    high-frequency trading data.

    Subclasses must implement the abstract method :meth:`_comp_bar_close` to define how bar close timestamps and
    indices are determined (e.g., based on time intervals, tick counts, or volume thresholds). The builder uses these
    indices to aggregate trades with Numba-compiled kernels and returns the results as Pandas DataFrames.

    Bar ``i`` covers the trades with indices in ``(close_indices[i], close_indices[i + 1]]``. The first element of
    the computed close index/timestamp arrays is the opening boundary of the first bar and is not itself a bar.

    Key functionalities include:

    - :meth:`build_ohlcv`: Computes OHLCV, VWAP (Volume-Weighted Average Price), trade count, and median trade size.
    - :meth:`build_directional_features`: Calculates buy/sell splits for ticks, volume, dollar value, spread proxies,
      and cumulative imbalance extremes, revealing order flow directionality and market pressure.
    - :meth:`build_trade_size_features`: Analyzes relative trade sizes, 95th percentile sizes, the block-trade volume
      share, and a Gini-type concentration index of trade sizes, useful for detecting large orders.

    Footprint data is not built by this base class.

    Args:
        trades (TradesData): Object whose ``data`` attribute is the raw trades DataFrame with columns 'timestamp',
            'price' and 'amount' ('side' is additionally required by :meth:`build_directional_features`).

    Raises:
        KeyError: If a column required by a builder method is missing from the trades DataFrame.
        ValueError: If the core kernels receive inconsistent inputs (e.g. fewer than two bar boundaries, or a
            ``theta`` array whose length does not match the number of bars).

    See Also:
        :class:`finmlkit.bar.kit.TimeBarKit`: A concrete subclass for fixed-time interval bars.
        :class:`finmlkit.bar.kit.TickBarKit`: For bars based on tick counts.
        :class:`finmlkit.bar.kit.VolumeBarKit`: For volume-threshold bars.
    """

    def __init__(self, trades: TradesData):
        """
        Initialize the bar builder with raw trades data.

        :param trades: TradesData object; its ``data`` DataFrame (columns 'timestamp', 'price', 'amount', and
            'side' for directional features) is stored as ``self.trades_df``.
        """
        self.trades_df = trades.data
        # Lazily computed by _set_bar_close(); both arrays include the leading opening boundary.
        self._close_ts: Optional[NDArray[np.int64]] = None
        self._close_indices: Optional[NDArray[np.int64]] = None
        # Per-bar highs/lows cached by build_ohlcv().
        self._highs: Optional[NDArray[np.float64]] = None
        self._lows: Optional[NDArray[np.float64]] = None

    def __str__(self) -> str:
        """Return a readable summary of the builder's attributes and of the raw trades DataFrame info."""
        members = "\n".join(f"{k}: {v}" for k, v in self.__dict__.items())
        buf = io.StringIO()
        try:
            self.trades_df.info(buf=buf)
            info = buf.getvalue()
        except Exception:
            # Best effort: a textual summary should not fail just because DataFrame.info() does.
            info = "<unavailable>"
        return (
            f"Class: {self.__class__.__name__} with members:\n"
            f"{members}\n"
            f"Raw trades data:\n{info}"
        )

    @abstractmethod
    def _comp_bar_close(self) -> Tuple[NDArray[np.int64], NDArray[np.int64]]:
        """
        Generate bar close timestamps and indices (implemented by subclasses).

        The first element of each returned array is treated as the opening boundary of the first bar; it is
        dropped by :attr:`bar_close_timestamps` and :attr:`bar_close_indices`.

        :returns: Tuple of (close timestamps in ns, corresponding indices into the raw trades data).
        """
        pass

    def _set_bar_close(self):
        """
        Compute and cache the bar close timestamps and indices if they are not available yet.
        """
        # `or` (rather than `and`): recompute if either cache is missing, so callers never slice a None.
        if self._close_ts is None or self._close_indices is None:
            logger.info("Calculating bar close tick indices and timestamps...")
            self._close_ts, self._close_indices = self._comp_bar_close()

    @property
    def bar_close_indices(self) -> NDArray[np.int64]:
        """
        Return the bar close indices in the raw trades data.

        :return: The **bar close** indices regarding the raw trades data as a numpy array of int64
            (the leading opening boundary is excluded).
        """
        if self._close_indices is None:
            self._set_bar_close()
        return self._close_indices[1:]  # Exclude the first index as it is the bar open boundary

    @property
    def bar_close_timestamps(self) -> NDArray[np.int64]:
        """
        Return the bar close timestamps in the raw trades data.

        :return: The **bar close** ns timestamps as a numpy array of int64 (the leading open timestamp is excluded).
        """
        if self._close_ts is None:
            self._set_bar_close()
        return self._close_ts[1:]  # Exclude the first timestamp as it is the bar open timestamp

    def build_ohlcv(self) -> pd.DataFrame:
        """
        Build the OHLCV bar features using the generated indices and raw trades data.

        :returns: A dataframe with columns open, high, low, close, volume, trades, median_trade_size and vwap,
            indexed by the **bar close** timestamps (as datetimes). If the instance has an ``interval``
            attribute (seconds), it is set as the index frequency.
        """
        self._set_bar_close()  # Ensure bar close indices and timestamps are set
        ohlcv_tuple = comp_bar_ohlcv(
            self.trades_df['price'].values,
            self.trades_df['amount'].values,
            self._close_indices
        )
        self._highs, self._lows = ohlcv_tuple[1], ohlcv_tuple[2]
        logger.info("OHLCV bar calculated successfully.")

        ohlcv_df = pd.DataFrame({
            'timestamp': self.bar_close_timestamps,
            'open': ohlcv_tuple[0],
            'high': ohlcv_tuple[1],
            'low': ohlcv_tuple[2],
            'close': ohlcv_tuple[3],
            'volume': ohlcv_tuple[4],
            'trades': ohlcv_tuple[6],
            'median_trade_size': ohlcv_tuple[7],
            'vwap': ohlcv_tuple[5]
        })
        logger.info("OHLCV bar converted to DataFrame.")

        # Convert timestamps to datetime index
        ohlcv_df['timestamp'] = pd.to_datetime(ohlcv_df['timestamp'], unit='ns')
        ohlcv_df.set_index('timestamp', inplace=True)

        # if there is a self.interval attribute, set the frequency to the interval
        if hasattr(self, 'interval'):
            ohlcv_df.index.freq = pd.Timedelta(seconds=self.interval)

        return ohlcv_df

    def build_directional_features(self) -> pd.DataFrame:
        """
        Build the directional features using the generated indices and raw trades data.

        Requires a 'side' column in the trades data (cast to int8; 1 = market buy, -1 = market sell).

        :returns: A dataframe indexed by the bar close timestamps with the directional features:
            ticks_buy, ticks_sell, volume_buy, volume_sell, dollars_buy, dollars_sell, mean_spread, max_spread,
            cum_ticks_min, cum_ticks_max, cum_volume_min, cum_volume_max, cum_dollars_min, cum_dollars_max.
        """
        self._set_bar_close()
        directional_tuple = comp_bar_directional_features(
            self.trades_df['price'].values,
            self.trades_df['amount'].values,
            self._close_indices,
            self.trades_df['side'].values.astype(np.int8),
        )
        logger.info("Directional features calculated successfully.")

        directional_df = pd.DataFrame({
            'timestamp': self.bar_close_timestamps,  # Close bar timestamps convention!!
            'ticks_buy': directional_tuple[0],
            'ticks_sell': directional_tuple[1],
            'volume_buy': directional_tuple[2],
            'volume_sell': directional_tuple[3],
            'dollars_buy': directional_tuple[4],
            'dollars_sell': directional_tuple[5],
            'mean_spread': directional_tuple[6],
            'max_spread': directional_tuple[7],
            'cum_ticks_min': directional_tuple[8],
            'cum_ticks_max': directional_tuple[9],
            'cum_volume_min': directional_tuple[10],
            'cum_volume_max': directional_tuple[11],
            'cum_dollars_min': directional_tuple[12],
            'cum_dollars_max': directional_tuple[13]
        })
        logger.info("Directional features converted to DataFrame.")

        # Convert timestamps to datetime index
        directional_df['timestamp'] = pd.to_datetime(directional_df['timestamp'], unit='ns')
        directional_df.set_index('timestamp', inplace=True)

        return directional_df

    def build_trade_size_features(self, theta: Optional[NDArray[np.float64]], theta_mult: float = 5.0) -> pd.DataFrame:
        """
        Build the trade size features using the generated indices and raw trades data.

        :param theta: Typical trade size per bar (e.g., 30 day rolling median trade size). Either an array-like with
            one value per bar (numpy array, pandas Series, list) or a scalar that is broadcast to every bar.
            Must not be None.
        :param theta_mult: Multiplier for theta to define the block size threshold. Default is 5.0.
        :returns: A dataframe indexed by the bar close timestamps with the trade size features:
            mean_size_rel, size_95_rel, pct_block, size_gini.
        :raises ValueError: If ``theta`` is None or its length does not match the number of bars.
        """
        if theta is None:
            raise ValueError("theta (typical trade size per bar) must be provided; got None.")

        self._set_bar_close()  # Ensure bar close indices and timestamps are set
        n_bars = len(self._close_indices) - 1

        # The Numba kernel needs a plain float64 ndarray (not a Series/list); a scalar theta is broadcast.
        theta_arr = np.asarray(theta, dtype=np.float64)
        if theta_arr.ndim == 0:
            theta_arr = np.full(n_bars, theta_arr.item(), dtype=np.float64)

        trade_size_tuple = comp_bar_trade_size_features(
            self.trades_df['amount'].values,
            theta_arr,
            self._close_indices,
            float(theta_mult)
        )
        logger.info("Trade size features calculated successfully.")

        trade_size_df = pd.DataFrame({
            'timestamp': self.bar_close_timestamps,
            'mean_size_rel': trade_size_tuple[0],
            'size_95_rel': trade_size_tuple[1],
            'pct_block': trade_size_tuple[2],
            'size_gini': trade_size_tuple[3]
        })
        logger.info("Trade size features converted to DataFrame.")

        # Convert timestamps to datetime index
        trade_size_df['timestamp'] = pd.to_datetime(trade_size_df['timestamp'], unit='ns')
        trade_size_df.set_index('timestamp', inplace=True)

        return trade_size_df
# --------------------------------------------------------------------------------------------
# CORE FUNCTIONS
# --------------------------------------------------------------------------------------------
@njit(nogil=True, parallel=True)
def comp_bar_ohlcv(
    prices: NDArray[np.float64],
    volumes: NDArray[np.float64],
    bar_close_indices: NDArray[np.int64],
) -> tuple[
    NDArray[np.float64], NDArray[np.float64], NDArray[np.float64], NDArray[np.float64],
    NDArray[np.float32], NDArray[np.float64], NDArray[np.int64], NDArray[np.float64]
]:
    """
    Aggregate raw trades into candlestick (OHLCV) bars delimited by ``bar_close_indices``.

    Bar ``b`` spans the trades ``(bar_close_indices[b], bar_close_indices[b + 1]]``: the trade at the left
    boundary closed the previous bar, the trade at the right boundary closes this one. When both boundaries
    coincide the bar is empty; its open/high/low/close repeat the price at that boundary, while volume, VWAP,
    trade count and median trade size are 0.

    :param prices: Trade prices.
    :param volumes: Trade volumes (same length as ``prices``).
    :param bar_close_indices: Bar boundary indices into the trade arrays; the first element is the opening
        boundary of the first bar, so ``len(bar_close_indices) - 1`` bars are produced.
    :returns: Tuple of per-bar arrays, in this order:
        - open: Price of the first trade in the bar.
        - high: Highest trade price in the bar.
        - low: Lowest trade price in the bar.
        - close: Price of the closing trade.
        - volume: Total traded volume (float32).
        - vwap: Volume-weighted average price (0.0 when the volume is 0).
        - bar_trades: Number of trades in the bar.
        - bar_median_trade_size: Median trade size in the bar.
    :raises ValueError: If ``prices`` and ``volumes`` differ in length, or fewer than two boundaries are given.
    """
    if len(prices) != len(volumes):
        raise ValueError("Prices and volumes arrays must have the same length.")
    if len(bar_close_indices) < 2:
        raise ValueError("Bar close indices must contain at least two elements.")

    n_bars = len(bar_close_indices) - 1
    opens = np.zeros(n_bars, dtype=np.float64)
    highs = np.zeros(n_bars, dtype=np.float64)
    lows = np.zeros(n_bars, dtype=np.float64)
    closes = np.zeros(n_bars, dtype=np.float64)
    totals = np.zeros(n_bars, dtype=np.float32)
    counts = np.zeros(n_bars, dtype=np.int64)
    medians = np.zeros(n_bars, dtype=np.float64)
    vwaps = np.zeros(n_bars, dtype=np.float64)

    for b in prange(n_bars):
        left = bar_close_indices[b]
        right = bar_close_indices[b + 1]

        if left == right:
            # Empty bar: carry the boundary price into OHLC; volume-based outputs keep their zero init.
            carry = prices[right]
            opens[b] = carry
            highs[b] = carry
            lows[b] = carry
            closes[b] = carry
            continue

        first = left + 1  # the left boundary trade belongs to the previous bar
        peak = prices[first]
        trough = peak
        qty_sum = 0.0
        notional = 0.0

        n_trades = right - first + 1
        sizes = np.empty(n_trades, dtype=np.float64)  # per-trade sizes for the median

        for k in range(first, right + 1):
            px = prices[k]
            qty = volumes[k]
            sizes[k - first] = qty
            if px > peak:
                peak = px
            if px < trough:
                trough = px
            qty_sum += qty
            notional += px * qty

        opens[b] = prices[first]
        closes[b] = prices[right]
        highs[b] = peak
        lows[b] = trough
        totals[b] = qty_sum
        vwaps[b] = notional / qty_sum if qty_sum > 0 else 0.0
        counts[b] = n_trades
        medians[b] = np.median(sizes) if n_trades > 0 else 0.0

    return opens, highs, lows, closes, totals, vwaps, counts, medians
@njit(nogil=True, parallel=True)
def comp_bar_directional_features(
    prices: NDArray[np.float64],
    volumes: NDArray[np.float64],
    bar_close_indices: NDArray[np.int64],
    trade_sides: NDArray[np.int8]
) -> tuple[
    NDArray[np.int64], NDArray[np.int64],
    NDArray[np.float32], NDArray[np.float32],
    NDArray[np.float32], NDArray[np.float32],
    NDArray[np.float32], NDArray[np.float32],
    NDArray[np.int64], NDArray[np.int64],
    NDArray[np.float32], NDArray[np.float32],
    NDArray[np.float32], NDArray[np.float32]
]:
    """
    Compute directional bar features such as tick counts, volumes, dollars, spread proxies and cumulative flows.

    Bar ``i`` spans the trades ``(bar_close_indices[i], bar_close_indices[i + 1]]``. Trades whose side is neither
    1 nor -1 do not contribute to counts, volumes, dollars or cumulative imbalances.

    :param prices: Trade prices.
    :param volumes: Trade volumes.
    :param bar_close_indices: Bar boundary indices; the first element is the opening boundary of the first bar.
    :param trade_sides: Trade direction (1 for market buy, -1 for market sell).
    :returns: Tuple containing, per bar:
        - ticks_buy: Number of buy trades.
        - ticks_sell: Number of sell trades.
        - volume_buy: Volume of buy trades.
        - volume_sell: Volume of sell trades.
        - dollars_buy: Dollar value (price * volume) of buy trades.
        - dollars_sell: Dollar value of sell trades.
        - mean_spread: Sum of the absolute price jumps observed where the trade side changes between consecutive
          trades (a bid/ask spread proxy), divided by the number of signed trades; 0.0 if there are none.
        - max_spread: Largest such price jump (0.0 if none).
        - cum_ticks_min: Minimum running tick imbalance (buys - sells).
        - cum_ticks_max: Maximum running tick imbalance.
        - cum_volumes_min: Minimum running volume imbalance.
        - cum_volumes_max: Maximum running volume imbalance.
        - cum_dollars_min: Minimum running dollar imbalance.
        - cum_dollars_max: Maximum running dollar imbalance.
        The cumulative extremes are taken over the values after each signed trade and are 0 for bars without
        any signed trade.
    """
    n_bars = len(bar_close_indices) - 1

    ticks_buy = np.zeros(n_bars, dtype=np.int64)
    ticks_sell = np.zeros(n_bars, dtype=np.int64)
    volume_buy = np.zeros(n_bars, dtype=np.float32)
    volume_sell = np.zeros(n_bars, dtype=np.float32)
    dollars_buy = np.zeros(n_bars, dtype=np.float32)
    dollars_sell = np.zeros(n_bars, dtype=np.float32)
    max_spread = np.zeros(n_bars, dtype=np.float32)
    mean_spread = np.zeros(n_bars, dtype=np.float32)
    # Cumulative extremes: bars with no signed trade keep 0 instead of leaking large sentinel values.
    cum_ticks_min = np.zeros(n_bars, dtype=np.int64)
    cum_ticks_max = np.zeros(n_bars, dtype=np.int64)
    cum_volumes_min = np.zeros(n_bars, dtype=np.float32)
    cum_volumes_max = np.zeros(n_bars, dtype=np.float32)
    cum_dollars_min = np.zeros(n_bars, dtype=np.float32)
    cum_dollars_max = np.zeros(n_bars, dtype=np.float32)

    for i in prange(n_bars):
        start = bar_close_indices[i] + 1  # Start from the next trade (start = previous bar close)
        end = bar_close_indices[i + 1]

        n_buy = 0
        n_sell = 0
        vol_buy = 0.0
        vol_sell = 0.0
        dol_buy = 0.0
        dol_sell = 0.0

        # Running signed imbalances and their extremes (extremes become valid once has_flow is True).
        cum_ticks = 0
        cum_vol = 0.0
        cum_dol = 0.0
        ticks_lo = 0
        ticks_hi = 0
        vol_lo = 0.0
        vol_hi = 0.0
        dol_lo = 0.0
        dol_hi = 0.0
        has_flow = False

        spread_max = 0.0
        spread_sum = 0.0

        # Seed the side comparison with the previous bar's closing trade whenever this bar holds at least one
        # trade. `end >= start` (not `end > start`) also covers single-trade bars; otherwise their only trade
        # would be compared against 0 and always counted as a side flip.
        if end >= start:
            prev_sign = trade_sides[start - 1]
        else:
            prev_sign = 0

        # Iterate over trades in the current bar (start exclusive of previous close, end inclusive)
        for j in range(start, end + 1):
            sign = trade_sides[j]

            # Spread proxy: price jump between consecutive trades whose sides differ.
            if sign != prev_sign:
                spread = abs(prices[j] - prices[j - 1])
                if spread > spread_max:
                    spread_max = spread
                spread_sum += spread
            prev_sign = sign

            if sign == 1:
                dollars = prices[j] * volumes[j]
                n_buy += 1
                vol_buy += volumes[j]
                dol_buy += dollars
                cum_ticks += 1
                cum_vol += volumes[j]
                cum_dol += dollars
            elif sign == -1:
                dollars = prices[j] * volumes[j]
                n_sell += 1
                vol_sell += volumes[j]
                dol_sell += dollars
                cum_ticks -= 1
                cum_vol -= volumes[j]
                cum_dol -= dollars
            else:
                continue  # Unsigned trade: no effect on counts or imbalances

            if has_flow:
                ticks_lo = min(ticks_lo, cum_ticks)
                ticks_hi = max(ticks_hi, cum_ticks)
                vol_lo = min(vol_lo, cum_vol)
                vol_hi = max(vol_hi, cum_vol)
                dol_lo = min(dol_lo, cum_dol)
                dol_hi = max(dol_hi, cum_dol)
            else:
                ticks_lo = cum_ticks
                ticks_hi = cum_ticks
                vol_lo = cum_vol
                vol_hi = cum_vol
                dol_lo = cum_dol
                dol_hi = cum_dol
                has_flow = True

        n_signed = n_buy + n_sell

        ticks_buy[i] = n_buy
        ticks_sell[i] = n_sell
        volume_buy[i] = vol_buy
        volume_sell[i] = vol_sell
        dollars_buy[i] = dol_buy
        dollars_sell[i] = dol_sell
        max_spread[i] = spread_max
        # Guard empty / unsigned-only bars: a zero denominator would raise ZeroDivisionError under Numba's
        # default 'python' error model.
        mean_spread[i] = spread_sum / n_signed if n_signed > 0 else 0.0

        cum_ticks_min[i] = ticks_lo
        cum_ticks_max[i] = ticks_hi
        cum_volumes_min[i] = vol_lo
        cum_volumes_max[i] = vol_hi
        cum_dollars_min[i] = dol_lo
        cum_dollars_max[i] = dol_hi

    return (
        ticks_buy, ticks_sell,
        volume_buy, volume_sell,
        dollars_buy, dollars_sell,
        mean_spread, max_spread,
        cum_ticks_min, cum_ticks_max,
        cum_volumes_min, cum_volumes_max,
        cum_dollars_min, cum_dollars_max
    )
@njit(nogil=True, parallel=True)
def comp_bar_trade_size_features(
    amounts: NDArray[np.float64],
    theta: NDArray[np.float64],
    bar_close_indices: NDArray[np.int64],
    theta_mult: float
) -> tuple[NDArray[np.float32], NDArray[np.float32], NDArray[np.float32], NDArray[np.float32]]:
    """
    Compute trade size distribution features for each bar: relative mean size, relative 95th percentile size,
    block-trade volume share and a Gini-type concentration index. Answers "are there large block prints in the bar?"

    Bar ``i`` spans the trades ``(bar_close_indices[i], bar_close_indices[i + 1]]``. All four features stay NaN for
    empty bars and for bars whose reference size is unusable (``theta[i]`` zero, negative or NaN, or a non-positive
    block threshold). ``pct_block`` and ``size_gini`` also stay NaN when the bar's total volume is 0.

    :param amounts: Array of trade amounts (raw trade sizes).
    :param theta: The typical trade size per bar (e.g., 30 day rolling median trade size); one value per bar.
    :param bar_close_indices: Bar boundary indices; the first element is the opening boundary of the first bar.
    :param theta_mult: Multiplier for theta defining the block size threshold ``thr = theta * theta_mult``
        (e.g. 5 times the median trade size).
    :returns: A tuple of float32 arrays containing, per bar:
        - mean_size_rel: log1p(mean_size / thr).
        - size_95_rel: log1p(size_95 / thr), where size_95 is the 95th percentile of trade sizes.
        - pct_block: Fraction of the bar's volume carried by trades larger than thr: SUM(size_i [size_i > thr]) / volume.
        - size_gini: Gini impurity of the trade-size volume shares, 1 - SUM((size_i / volume) ** 2); 0 for a
          single trade.
        NOTE(review): the relative sizes are normalized by ``thr`` (theta * theta_mult), not by theta alone;
        confirm this is the intended normalization.
    :raises ValueError: If ``len(theta)`` differs from the number of bars.
    """
    if len(theta) != len(bar_close_indices) - 1:
        raise ValueError("Theta should match the the number of bars (len(bar_close_indices) - 1).")

    n_bars = len(bar_close_indices) - 1
    mean_size_rel = np.full(n_bars, np.nan, dtype=np.float32)
    size_95_rel = np.full(n_bars, np.nan, dtype=np.float32)
    pct_block = np.full(n_bars, np.nan, dtype=np.float32)
    size_gini = np.full(n_bars, np.nan, dtype=np.float32)

    for i in prange(n_bars):
        start = bar_close_indices[i] + 1  # Start from the next trade (start = previous bar close)
        end = bar_close_indices[i + 1]

        # Empty bar guard: features stay NaN.
        if start > end:
            continue

        thr = theta[i] * theta_mult  # Block size threshold
        # Unusable reference size (zero, negative or NaN, e.g. a rolling-median warm-up): features stay NaN.
        # Written as `not (... > 0)` so NaN is caught too; with a NaN threshold every `amount > thr` test is
        # False and pct_block would silently read 0.0. Also avoids division by a zero threshold below.
        if not (theta[i] > 0.0 and thr > 0.0):
            continue

        amounts_bar = amounts[start:end + 1]  # End inclusive
        mean_size_rel[i] = np.log1p(np.mean(amounts_bar) / thr)
        size_95_rel[i] = np.log1p(np.percentile(amounts_bar, 95) / thr)

        total_volume = amounts_bar.sum()
        if total_volume == 0:
            continue

        # Share of the bar's volume traded in block-sized prints
        block_volume = 0.0
        for amount in amounts_bar:
            if amount > thr:
                block_volume += amount
        pct_block[i] = block_volume / total_volume

        # Gini impurity of volume shares (0 for a single trade)
        if amounts_bar.size == 1:
            size_gini[i] = 0.0
        else:
            size_gini[i] = 1.0 - np.sum((amounts_bar / total_volume) ** 2)

    return mean_size_rel, size_95_rel, pct_block, size_gini