finmlkit.bar.data_model module

class finmlkit.bar.data_model.FootprintData(bar_timestamps: ndarray[tuple[int, ...], dtype[int64]], price_tick: float, price_levels: ndarray[tuple[int, ...], dtype[ndarray[tuple[int, ...], dtype[int32]]]] | List[ndarray[tuple[int, ...], dtype[int32]]], buy_volumes: ndarray[tuple[int, ...], dtype[ndarray[tuple[int, ...], dtype[float32]]]] | List[ndarray[tuple[int, ...], dtype[float32]]], sell_volumes: ndarray[tuple[int, ...], dtype[ndarray[tuple[int, ...], dtype[float32]]]] | List[ndarray[tuple[int, ...], dtype[float32]]], buy_ticks: ndarray[tuple[int, ...], dtype[ndarray[tuple[int, ...], dtype[int32]]]] | List[ndarray[tuple[int, ...], dtype[int32]]], sell_ticks: ndarray[tuple[int, ...], dtype[ndarray[tuple[int, ...], dtype[int32]]]] | List[ndarray[tuple[int, ...], dtype[int32]]], buy_imbalances: ndarray[tuple[int, ...], dtype[ndarray[tuple[int, ...], dtype[bool]]]] | List[ndarray[tuple[int, ...], dtype[bool]]], sell_imbalances: ndarray[tuple[int, ...], dtype[ndarray[tuple[int, ...], dtype[bool]]]] | List[ndarray[tuple[int, ...], dtype[bool]]], cot_price_levels: ndarray[tuple[int, ...], dtype[int32]] | None = None, sell_imbalances_sum: ndarray[tuple[int, ...], dtype[uint16]] | None = None, buy_imbalances_sum: ndarray[tuple[int, ...], dtype[uint16]] | None = None, imb_max_run_signed: ndarray[tuple[int, ...], dtype[int16]] | None = None, vp_skew: ndarray[tuple[int, ...], dtype[float64]] | None = None, vp_gini: ndarray[tuple[int, ...], dtype[float64]] | None = None, _datetime_index: Series = None)[source]

Bases: object

Container for dynamic memory footprint calculations including trade volumes, price levels, and imbalance information.

Parameters:
  • bar_timestamps – Timestamps of each bar in nanoseconds.

  • price_tick – Price tick size.

  • price_levels – Array of price levels per bar.

  • buy_volumes – Buy volumes per price level.

  • sell_volumes – Sell volumes per price level.

  • buy_ticks – Number of buy ticks per price level.

  • sell_ticks – Number of sell ticks per price level.

  • buy_imbalances – Buy imbalance flags per price level.

  • sell_imbalances – Sell imbalance flags per price level.

  • cot_price_levels – Optional Commitment of Traders price levels.

  • sell_imbalances_sum – Optional total sell imbalance counts per bar.

  • buy_imbalances_sum – Optional total buy imbalance counts per bar.

  • imb_max_run_signed – Optional longest signed imbalance run for each bar.

  • vp_skew – Optional volume profile skew for each bar (positive = buy pressure above VWAP).

  • vp_gini – Optional volume profile Gini coefficient for each bar (0 = concentrated, →1 = even).
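The parameters above describe a ragged layout: one inner array per bar, each with its own number of price levels. A minimal pure-Python sketch of that layout follows. The values are made up, and two points are assumptions rather than facts from the library source: that price_levels holds prices in tick units, and that the per-bar imbalance sums are counts of True flags.

```python
# Two bars; each bar has its own number of price levels (ragged layout).
price_tick = 0.5
price_levels = [[200, 201, 202], [201, 202]]   # assumed: prices in tick units
buy_volumes = [[1.0, 0.5, 2.0], [0.3, 1.2]]
sell_volumes = [[0.8, 1.5, 0.1], [0.9, 0.4]]
buy_imbalances = [[False, False, True], [False, True]]
sell_imbalances = [[False, True, False], [True, False]]

# Recover actual prices for the first bar from tick units.
prices_bar0 = [level * price_tick for level in price_levels[0]]

# Per-bar imbalance counts (assumed meaning of buy/sell_imbalances_sum).
buy_imbalances_sum = [sum(flags) for flags in buy_imbalances]
sell_imbalances_sum = [sum(flags) for flags in sell_imbalances]
```

In the real class the inner containers are NumPy arrays with the dtypes listed in the signature; lists are used here only to keep the sketch dependency-free.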

__init__(bar_timestamps: ndarray[tuple[int, ...], dtype[int64]], price_tick: float, price_levels: ndarray[tuple[int, ...], dtype[ndarray[tuple[int, ...], dtype[int32]]]] | List[ndarray[tuple[int, ...], dtype[int32]]], buy_volumes: ndarray[tuple[int, ...], dtype[ndarray[tuple[int, ...], dtype[float32]]]] | List[ndarray[tuple[int, ...], dtype[float32]]], sell_volumes: ndarray[tuple[int, ...], dtype[ndarray[tuple[int, ...], dtype[float32]]]] | List[ndarray[tuple[int, ...], dtype[float32]]], buy_ticks: ndarray[tuple[int, ...], dtype[ndarray[tuple[int, ...], dtype[int32]]]] | List[ndarray[tuple[int, ...], dtype[int32]]], sell_ticks: ndarray[tuple[int, ...], dtype[ndarray[tuple[int, ...], dtype[int32]]]] | List[ndarray[tuple[int, ...], dtype[int32]]], buy_imbalances: ndarray[tuple[int, ...], dtype[ndarray[tuple[int, ...], dtype[bool]]]] | List[ndarray[tuple[int, ...], dtype[bool]]], sell_imbalances: ndarray[tuple[int, ...], dtype[ndarray[tuple[int, ...], dtype[bool]]]] | List[ndarray[tuple[int, ...], dtype[bool]]], cot_price_levels: ndarray[tuple[int, ...], dtype[int32]] | None = None, sell_imbalances_sum: ndarray[tuple[int, ...], dtype[uint16]] | None = None, buy_imbalances_sum: ndarray[tuple[int, ...], dtype[uint16]] | None = None, imb_max_run_signed: ndarray[tuple[int, ...], dtype[int16]] | None = None, vp_skew: ndarray[tuple[int, ...], dtype[float64]] | None = None, vp_gini: ndarray[tuple[int, ...], dtype[float64]] | None = None, _datetime_index: Series = None) → None
_datetime_index: Series = None
bar_timestamps: ndarray[tuple[int, ...], dtype[int64]]
buy_imbalances: ndarray[tuple[int, ...], dtype[ndarray[tuple[int, ...], dtype[bool]]]] | List[ndarray[tuple[int, ...], dtype[bool]]]
buy_imbalances_sum: ndarray[tuple[int, ...], dtype[uint16]] | None = None
buy_ticks: ndarray[tuple[int, ...], dtype[ndarray[tuple[int, ...], dtype[int32]]]] | List[ndarray[tuple[int, ...], dtype[int32]]]
buy_volumes: ndarray[tuple[int, ...], dtype[ndarray[tuple[int, ...], dtype[float32]]]] | List[ndarray[tuple[int, ...], dtype[float32]]]
cast_to_numba_list()[source]

Convert internal arrays to NumbaList for JIT-compatible processing.

cast_to_numpy()[source]

Convert internal lists to NumPy arrays for general-purpose processing.

cot_price_levels: ndarray[tuple[int, ...], dtype[int32]] | None = None
classmethod from_numba(data: Tuple, price_tick: float) → FootprintData[source]

Create a FootprintData object from Numba-based output.

Parameters:
  • data – Output tuple from comp_bar_footprint.

  • price_tick – Tick size for price levels.

Returns:

A validated FootprintData instance.

Raises:

ValueError – If data length is inconsistent.

get_df()[source]

Convert the footprint data into a pandas DataFrame.

Returns:

A DataFrame with structured footprint information.

imb_max_run_signed: ndarray[tuple[int, ...], dtype[int16]] | None = None
is_valid() → bool[source]

Check if all internal arrays are consistent.

Returns:

True if valid, False otherwise.
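What "consistent" means is not spelled out here. One plausible reading, sketched below in plain Python, is that every per-bar field has one entry per bar and that all fields agree on the number of price levels within each bar. The function name arrays_consistent is hypothetical and is not part of finmlkit.

```python
def arrays_consistent(bar_timestamps, *per_bar_fields):
    """Return True if each field has one entry per bar and all fields
    agree on the number of price levels inside every bar."""
    n_bars = len(bar_timestamps)
    if any(len(field) != n_bars for field in per_bar_fields):
        return False
    for bar in range(n_bars):
        level_counts = {len(field[bar]) for field in per_bar_fields}
        if len(level_counts) > 1:
            return False
    return True

# Two bars with 2 and 1 price levels respectively.
ok = arrays_consistent([1, 2], [[10, 11], [12]], [[0.5, 0.1], [0.7]])
# Second field is missing a level in the first bar.
bad = arrays_consistent([1, 2], [[10, 11], [12]], [[0.5], [0.7]])
```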

memory_usage()[source]

Calculate the approximate memory usage of this object in MB.

price_levels: ndarray[tuple[int, ...], dtype[ndarray[tuple[int, ...], dtype[int32]]]] | List[ndarray[tuple[int, ...], dtype[int32]]]
price_tick: float
sell_imbalances: ndarray[tuple[int, ...], dtype[ndarray[tuple[int, ...], dtype[bool]]]] | List[ndarray[tuple[int, ...], dtype[bool]]]
sell_imbalances_sum: ndarray[tuple[int, ...], dtype[uint16]] | None = None
sell_ticks: ndarray[tuple[int, ...], dtype[ndarray[tuple[int, ...], dtype[int32]]]] | List[ndarray[tuple[int, ...], dtype[int32]]]
sell_volumes: ndarray[tuple[int, ...], dtype[ndarray[tuple[int, ...], dtype[float32]]]] | List[ndarray[tuple[int, ...], dtype[float32]]]
vp_gini: ndarray[tuple[int, ...], dtype[float64]] | None = None
vp_skew: ndarray[tuple[int, ...], dtype[float64]] | None = None
class finmlkit.bar.data_model.TradesData(ts: ndarray[tuple[int, ...], dtype[_ScalarType_co]], px: ndarray[tuple[int, ...], dtype[_ScalarType_co]], qty: ndarray[tuple[int, ...], dtype[_ScalarType_co]], id: ndarray[tuple[int, ...], dtype[_ScalarType_co]] = None, *, is_buyer_maker: ndarray[tuple[int, ...], dtype[_ScalarType_co]] = None, side=None, dt_index: DatetimeIndex | None = None, timestamp_unit: str | None = None, preprocess: bool = False, proc_res: str | None = None, name=None)[source]

Bases: object

Preprocessor class for raw trades data, designed for efficient bar building and financial analysis.

This class handles standardization of column names, timestamp conversion, trade merging, side inference, and data validation for consistent processing across different data sources. It serves as the primary data preparation component for high-frequency trading analysis and bar construction workflows.

In high-frequency trading data, raw trades often come in various formats with inconsistent timestamps, split trades at the same price level, missing side information, and data integrity issues. This class addresses these challenges by providing a robust preprocessing pipeline that:

  • Normalizes timestamps to nanosecond precision for consistent temporal analysis

  • Merges split trades that occur at identical timestamps and price levels to reduce noise

  • Infers trade sides (buyer/seller initiated) when not explicitly provided

  • Validates data integrity by detecting trade ID discontinuities and temporal inconsistencies

  • Provides efficient storage via compressed HDF5 format with monthly partitioning

  • Enables time-slice queries with multiprocessing support for large datasets

The preprocessing pipeline follows these steps when preprocess=True:

  1. Timestamp Conversion: Convert to nanosecond precision from various units (s, ms, μs, ns)

  2. Data Sorting: Sort by trade ID first to detect gaps, then by timestamp for chronological order

  3. Trade Merging: Aggregate trades with identical timestamps and prices.

  4. Resolution Processing: Optionally round timestamps to specified resolution (e.g., millisecond)

  5. Side Inference: Determine trade direction from price movements when side data is unavailable
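Steps 1 and 3 of the pipeline above can be sketched in a few lines of plain Python. This is an illustration under stated assumptions, not the library's implementation: the helper names to_ns and merge_same_time_and_price are hypothetical, timestamps are assumed to be integers, and merged quantities are assumed to be summed.

```python
NS_PER_UNIT = {"s": 1_000_000_000, "ms": 1_000_000, "us": 1_000, "ns": 1}

def to_ns(timestamps, unit):
    """Scale integer timestamps in the given unit to nanoseconds."""
    return [int(t) * NS_PER_UNIT[unit] for t in timestamps]

def merge_same_time_and_price(ts_ns, px, qty):
    """Sum the quantities of trades sharing the same (timestamp, price)."""
    # A stable sort by timestamp keeps arrival order within a timestamp.
    rows = sorted(zip(ts_ns, px, qty), key=lambda row: row[0])
    merged = {}  # dicts preserve insertion order
    for t, p, q in rows:
        merged[(t, p)] = merged.get((t, p), 0.0) + q
    return [(t, p, q) for (t, p), q in merged.items()]

# The first two trades share a timestamp and price, so they are merged.
ts_ns = to_ns([1609459200000, 1609459200000, 1609459201000], "ms")
merged = merge_same_time_and_price(ts_ns, [100.0, 100.0, 100.5], [1.0, 0.5, 2.0])
```

The library's own aggregation lives in finmlkit.bar.utils.merge_split_trades(), whose signature is not shown in this section.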

The class supports monthly HDF5 partitioning for efficient storage and retrieval of large datasets. Each month’s data is stored under /trades/YYYY-MM with accompanying metadata for fast range queries. This approach enables handling multi-terabyte datasets while maintaining query performance.

Data Integrity Monitoring: The class tracks discontinuities in trade IDs and timestamps, computing missing data percentages and flagging potential data quality issues. This is crucial for ensuring reliable downstream analysis.

Tip

For optimal performance with large datasets (more than 10 GB of trades), enable preprocessing and use HDF5 storage with compression. The class processes data in chunks to limit memory use and can use multiprocessing when loading data.

Note

Trade side inference uses the tick rule and other heuristics when explicit side information is unavailable. For critical applications, prefer data sources with explicit buyer/seller flags.
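The two routes to a side vector can be sketched as follows. Both functions are hypothetical illustrations and not the library's comp_trade_side_vector(). The first assumes the common convention that a buyer-maker trade was initiated by the seller. The second is the textbook tick rule, with an assumed default side for the first trade.

```python
def side_from_maker_flags(is_buyer_maker):
    """Buyer is maker -> the seller was the aggressor -> side -1."""
    return [-1 if flag else 1 for flag in is_buyer_maker]

def tick_rule(prices, initial_side=1):
    """Uptick -> buy (+1), downtick -> sell (-1), no change -> carry the
    previous side. initial_side is an assumption for the first trade."""
    sides = []
    last_side = initial_side
    prev_price = None
    for price in prices:
        if prev_price is not None:
            if price > prev_price:
                last_side = 1
            elif price < prev_price:
                last_side = -1
        sides.append(last_side)
        prev_price = price
    return sides

explicit = side_from_maker_flags([True, False])
inferred = tick_rule([100.0, 100.5, 100.5, 99.8, 99.8, 100.0])
```

The tick rule can only guess the aggressor from price changes, which is why explicit flags are preferable when available.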

Parameters:
  • ts (NDArray) – Array of timestamps in various units (s, ms, μs, ns). Must be numeric and monotonic.

  • px (NDArray) – Array of trade prices as floating-point values.

  • qty (NDArray) – Array of trade quantities/amounts as floating-point values.

  • id (NDArray, optional) – Array of unique trade identifiers for data validation. Required if preprocess=True.

  • is_buyer_maker (NDArray, optional) – Boolean array indicating buyer-maker status (True if buyer is maker). If provided, used for accurate side determination.

  • side (NDArray, optional) – Pre-computed trade side array (-1: sell, 1: buy). Used when loading from HDF5.

  • dt_index (pd.DatetimeIndex, optional) – Pre-computed datetime index. If None, created from timestamps.

  • timestamp_unit (str, optional) – Explicit timestamp unit (‘s’, ‘ms’, ‘us’, ‘ns’). Auto-inferred if None.

  • preprocess (bool, optional) – Enable full preprocessing pipeline. Default: False.

  • proc_res (str, optional) – Target timestamp resolution for rounding (‘ms’, ‘us’). Default: None (no rounding).

  • name (str, optional) – Instance name for logging purposes. Default: None.

Raises:
  • TypeError – If input arrays are not numpy ndarrays or have incompatible types.

  • ValueError – If required columns are missing, timestamp format is invalid, or preprocessing fails.

Examples

Basic usage with preprocessing:

>>> import numpy as np
>>> import pandas as pd
>>> from finmlkit.bar.data_model import TradesData
>>> # Raw trades data
>>> timestamps = np.array([1609459200000, 1609459201000, 1609459202000])  # ms
>>> prices = np.array([100.0, 100.5, 99.8])
>>> quantities = np.array([1.5, 2.0, 0.8])
>>> trade_ids = np.array([1001, 1002, 1003])
>>>
>>> # Create TradesData with preprocessing
>>> trades = TradesData(timestamps, prices, quantities, trade_ids,
...                     timestamp_unit='ms', preprocess=True, name='BTCUSD')
>>> print(f"Processed {len(trades.data)} trades")
Processed 3 trades

Loading from HDF5 with time filtering:

>>> # Load specific time range with multiprocessing
>>> trades = TradesData.load_trades_h5('trades.h5',
...                                     start_time='2021-01-01',
...                                     end_time='2021-01-31',
...                                     enable_multiprocessing=True)
>>> trades.set_view_range('2021-01-15', '2021-01-20')
>>> subset = trades.data  # Only data in view range

See also

  • finmlkit.bar.base.BarBuilderBase – Uses TradesData for constructing various bar types.

  • finmlkit.bar.utils.merge_split_trades() – Core function for trade aggregation.

  • finmlkit.bar.utils.comp_trade_side_vector() – Trade side inference algorithm.

__init__(ts: ndarray[tuple[int, ...], dtype[_ScalarType_co]], px: ndarray[tuple[int, ...], dtype[_ScalarType_co]], qty: ndarray[tuple[int, ...], dtype[_ScalarType_co]], id: ndarray[tuple[int, ...], dtype[_ScalarType_co]] = None, *, is_buyer_maker: ndarray[tuple[int, ...], dtype[_ScalarType_co]] = None, side=None, dt_index: DatetimeIndex | None = None, timestamp_unit: str | None = None, preprocess: bool = False, proc_res: str | None = None, name=None)[source]

Initialize the TradesData with raw trades data.

Parameters:
  • ts – Array of timestamps.

  • px – Array of prices.

  • qty – Array of trade quantities (amounts).

  • id – Array of trade IDs.

  • is_buyer_maker – Optional array of side information: True if the buyer is the maker, False otherwise. If None, side information is inferred from the data.

  • side – Optional array of market order side information (-1: sell, 1: buy). Needed when loading from an HDF5 store.

  • dt_index – Optional DatetimeIndex for the trades data. If provided, it is used as the index. Needed when loading from an HDF5 store.

  • timestamp_unit – Optional timestamp unit (e.g., ‘ms’, ‘us’, ‘ns’); inferred if None.

  • proc_res – Optional processing resolution for timestamps (e.g., ‘ms’ reduces microsecond timestamps to millisecond resolution).

  • preprocess – If True, runs the preprocessing pipeline (sorting, merging split trades, etc.).

  • name – Optional name for the trades data instance (used for logging).

Raises:

ValueError – If required columns are missing or timestamp format is invalid.

_add_trade_side_info() → None[source]

Extract trade side information from the trades data.

Returns:

None - modifies the trades DataFrame in place to include a ‘side’ column.

_apply_timestamp_resolution(proc_res: str | None) → None[source]

Apply processing resolution to timestamps if specified.

Parameters:

proc_res – Target processing resolution for timestamps.

Raises:

ValueError – If processing resolution is invalid.
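A minimal sketch of what applying a processing resolution to nanosecond timestamps could look like. It assumes truncation (flooring) to the target resolution, which matches the "cuts" wording used for proc_res but is not confirmed against the source; the helper name and the accepted set of resolutions are also assumptions.

```python
NS_PER_RES = {"ms": 1_000_000, "us": 1_000}  # assumed set of valid resolutions

def truncate_to_resolution(ts_ns, proc_res):
    """Floor nanosecond timestamps to the given resolution."""
    if proc_res is None:
        return list(ts_ns)  # no resolution requested: leave unchanged
    if proc_res not in NS_PER_RES:
        raise ValueError(f"Invalid processing resolution: {proc_res!r}")
    step = NS_PER_RES[proc_res]
    return [t - t % step for t in ts_ns]

raw = [1_609_459_200_123_456_789]
as_ms = truncate_to_resolution(raw, "ms")
as_us = truncate_to_resolution(raw, "us")
```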

_convert_timestamps_to_ns()[source]

Convert timestamps to nanosecond representation.

Raises:

ValueError – If timestamp format is invalid.

_infer_timestamp_unit() → str[source]

Infer the unit of timestamps in the trades data if not explicitly provided.

Returns:

Inferred or provided timestamp unit.
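The inference rule itself is not documented here. A common heuristic, shown below as an assumption and not as the library's algorithm, classifies Unix epoch timestamps by magnitude: present-day values have about 10 digits in seconds, 13 in milliseconds, 16 in microseconds, and 19 in nanoseconds.

```python
def infer_unit_by_magnitude(timestamp):
    """Guess the unit of a Unix epoch timestamp from its magnitude.
    The thresholds are only meaningful for present-day dates."""
    value = abs(int(timestamp))
    if value < 10**11:
        return "s"
    if value < 10**14:
        return "ms"
    if value < 10**17:
        return "us"
    return "ns"

# 2021-01-01 00:00:00 UTC expressed in each unit.
units = [infer_unit_by_magnitude(1609459200 * 10**k) for k in (0, 3, 6, 9)]
```

Passing timestamp_unit explicitly avoids relying on any such guess.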

classmethod _keys_for_timerange(store: HDFStore, start: Timestamp | None, end: Timestamp | None) → list[str][source]

Internal helper – determine which monthly groups intersect the [start, end] interval by consulting the per‑group metadata.
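The interval test behind such a lookup can be sketched without an HDF5 store. The dictionary below stands in for the per-group metadata; its shape (key mapped to first and last timestamp) and the function name are assumptions made for illustration.

```python
from datetime import datetime

def keys_for_timerange(group_bounds, start=None, end=None):
    """Return keys whose [first, last] span intersects [start, end].
    A bound of None means the range is open on that side."""
    selected = []
    for key, (first, last) in sorted(group_bounds.items()):
        if start is not None and last < start:
            continue  # group ends before the range begins
        if end is not None and first > end:
            continue  # group begins after the range ends
        selected.append(key)
    return selected

bounds = {
    "/trades/2021-01": (datetime(2021, 1, 1), datetime(2021, 1, 31, 23, 59)),
    "/trades/2021-02": (datetime(2021, 2, 1), datetime(2021, 2, 28, 23, 59)),
    "/trades/2021-03": (datetime(2021, 3, 1), datetime(2021, 3, 31, 23, 59)),
}
hits = keys_for_timerange(bounds, datetime(2021, 1, 20), datetime(2021, 2, 10))
```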

_merge_trades()[source]

Merge trades that occur at the same timestamp and price level.

_sort_trades() → None[source]

Sort trades by timestamp to ensure correct order for processing. Also performs data integrity checks by identifying discontinuities in trade IDs.

_validate_data()[source]

Check for gaps in trade IDs.
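A gap check of this kind can be sketched as follows, assuming trade IDs are consecutive integers when no data is missing. The function name and the definition of the missing percentage (missing IDs divided by the number of IDs expected between the first and last ID) are illustrative assumptions.

```python
def find_id_gaps(trade_ids):
    """Return (gaps, missing_pct), where each gap is the pair of IDs
    immediately before and after a run of missing IDs."""
    ids = sorted(trade_ids)
    gaps = [(a, b) for a, b in zip(ids, ids[1:]) if b - a > 1]
    missing = sum(b - a - 1 for a, b in gaps)
    expected = ids[-1] - ids[0] + 1 if ids else 0
    missing_pct = 100.0 * missing / expected if expected else 0.0
    return gaps, missing_pct

# IDs 1003 and 1004 are absent: 2 missing out of 6 expected.
gaps, missing_pct = find_id_gaps([1001, 1002, 1005, 1006])
```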

property data: DataFrame

Get the processed trades data as a DataFrame, respecting the active view range.

Returns the full dataset if no view range is set, or a time-filtered subset otherwise. The DataFrame includes columns: timestamp, price, amount, and optionally side.

Returns:

DataFrame containing trades data with datetime index.

property end_date

Get the end date of the trades data.

Returns:

End date as a pandas Timestamp.

classmethod load_trades_h5(filepath: str, *, key: str | None = None, start_time: str | Timestamp | None = None, end_time: str | Timestamp | None = None, n_workers: int | None = None, enable_multiprocessing: bool = True, min_groups_for_mp: int = 2) → TradesData[source]

Load trades from HDF5 storage with optional multiprocessing and time filtering.

Supports three loading modes:

  1. Single month: Load specific monthly partition using key parameter

  2. Time range: Auto-discover monthly groups intersecting [start_time, end_time]

  3. Filtered month: Combine key with time range for constrained loading

Multiprocessing is used when enable_multiprocessing is True and at least min_groups_for_mp monthly groups must be loaded, which can speed up loading of large time ranges.

Parameters:
  • filepath – Path to HDF5 file containing trades data.

  • key – Specific monthly key to load (e.g., “2021-03”). If None, uses time range discovery.

  • start_time – Start time for filtering (string or Timestamp). None for no start limit.

  • end_time – End time for filtering (string or Timestamp). None for no end limit.

  • n_workers – Number of worker processes. If None, uses CPU count - 1.

  • enable_multiprocessing – Enable parallel loading for multiple groups. Default: True.

  • min_groups_for_mp – Minimum groups required to trigger multiprocessing. Default: 2.

Returns:

TradesData instance with loaded and concatenated data.

Raises:
  • KeyError – If specified key doesn’t exist or no groups match the time range.

  • ValueError – If no data is successfully loaded from any group.
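How the three multiprocessing parameters might interact is sketched below. The function plan_workers is hypothetical; the default of CPU count minus one and the group threshold come from the parameter descriptions, while capping the worker count at the number of groups is an added assumption.

```python
import os

def plan_workers(n_groups, n_workers=None, enable_multiprocessing=True,
                 min_groups_for_mp=2):
    """Return the number of processes to use for loading n_groups groups."""
    if not enable_multiprocessing or n_groups < min_groups_for_mp:
        return 1  # load sequentially in the current process
    if n_workers is None:
        n_workers = max(1, (os.cpu_count() or 2) - 1)
    # Assumption: never start more workers than there are groups.
    return max(1, min(n_workers, n_groups))

single_month = plan_workers(1)
full_year = plan_workers(12, n_workers=4)
```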

Examples

Load specific month:

>>> trades = TradesData.load_trades_h5('data.h5', key='2021-03')

Load time range with multiprocessing:

>>> trades = TradesData.load_trades_h5('data.h5',
...                                     start_time='2021-01-01',
...                                     end_time='2021-12-31',
...                                     n_workers=4)
property orig_timestamp_unit: str

Get the timestamp unit used for processing.

Returns:

Timestamp unit string.

save_h5(filepath: str, *, month_key: str | None = None, complib: str = 'blosc:lz4', complevel: int = 1, mode: str = 'a', chunksize: int = 1000000, overwrite_month: bool = True) → str[source]

Persist trades data to HDF5 format with monthly partitioning and compression.

Stores data under /trades/YYYY-MM groups for efficient range queries. Each month includes metadata for fast discovery and data integrity information when available.

Parameters:
  • filepath – Destination HDF5 file path. Parent directories created automatically.

  • month_key – Override automatic monthly key derivation (format: “YYYY-MM”).

  • complib – Compression library (“blosc:lz4”, “blosc:zstd”, “zlib”). Default: “blosc:lz4”.

  • complevel – Compression level (0-9). Higher values increase compression ratio. Default: 1.

  • mode – File access mode (“a” for append, “w” for overwrite). Default: “a”.

  • chunksize – Row chunk size for writing large datasets. Default: 1,000,000.

  • overwrite_month – Prompt for confirmation when overwriting existing monthly data. Default: True.

Returns:

Full HDF5 key path used for storage (e.g., “/trades/2021-03”).

Raises:

ValueError – If user declines to overwrite existing data or if data format is invalid.
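Deriving a /trades/YYYY-MM key from a nanosecond timestamp can be sketched with the standard library. The helper name is hypothetical, and interpreting timestamps as UTC is an assumption; the library may derive the key differently.

```python
from datetime import datetime, timezone

def month_key_from_ns(ts_ns):
    """Build the '/trades/YYYY-MM' key for a nanosecond epoch timestamp,
    interpreting the timestamp as UTC (assumption)."""
    seconds = ts_ns // 1_000_000_000
    moment = datetime.fromtimestamp(seconds, tz=timezone.utc)
    return f"/trades/{moment:%Y-%m}"

january = month_key_from_ns(1_609_459_200_000_000_000)  # 2021-01-01 00:00 UTC
march = month_key_from_ns(1_614_556_800_000_000_000)    # 2021-03-01 00:00 UTC
```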

set_view_range(start: Timestamp | str, end: Timestamp | str)[source]

Set the active view range for data access, enabling efficient time-slice analysis.

Parameters:
  • start – Start timestamp for the view range. Accepts string or pd.Timestamp.

  • end – End timestamp for the view range. Accepts string or pd.Timestamp.

Raises:

ValueError – If the start timestamp is not before the end timestamp.
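The validation described above can be sketched with standard-library datetimes in place of pd.Timestamp. The function normalize_view_range is a hypothetical stand-in and accepts only ISO-format strings, which is narrower than what pandas parses.

```python
from datetime import datetime

def normalize_view_range(start, end):
    """Parse ISO strings if needed and require start < end."""
    start_dt = datetime.fromisoformat(start) if isinstance(start, str) else start
    end_dt = datetime.fromisoformat(end) if isinstance(end, str) else end
    if not start_dt < end_dt:
        raise ValueError("start must be before end")
    return start_dt, end_dt

view_start, view_end = normalize_view_range("2021-01-15", "2021-01-20")

# A reversed range is rejected.
try:
    normalize_view_range("2021-01-20", "2021-01-15")
    rejected = False
except ValueError:
    rejected = True
```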

property start_date

Get the start date of the trades data.

Returns:

Start date as a pandas Timestamp.

finmlkit.bar.data_model._is_notebook_environment() → bool[source]

Detect if we’re running in a Jupyter notebook environment.

Returns:

True if in notebook, False otherwise
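A widely used idiom for this kind of detection is to ask IPython which shell is active. The sketch below shows that idiom; whether finmlkit uses the same check is an assumption.

```python
def is_notebook_environment():
    """Return True when running inside a Jupyter kernel (ZMQ shell)."""
    try:
        from IPython import get_ipython
    except ImportError:
        return False  # IPython is not installed, so this is not a notebook
    shell = get_ipython()  # None outside an IPython session
    return shell is not None and shell.__class__.__name__ == "ZMQInteractiveShell"

in_notebook = is_notebook_environment()
```

A terminal IPython session reports a different shell class, so it is treated as a non-notebook environment.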

finmlkit.bar.data_model._load_single_h5_group(args: Tuple[str, str, str | None]) → DataFrame[source]

Helper function to load a single HDF5 group in a separate process.

Parameters:

args – Tuple of (filepath, h5_key, where_clause)

Returns:

DataFrame with the loaded data