from typing import Literal
from numba import njit, prange
from numpy.typing import NDArray
import numpy as np
import pandas as pd
import math
from typing import Optional, Union
[docs]
@njit(nogil=True, fastmath=True)
def comp_trade_side(price: float, prev_price: float, prev_tick: int) -> Literal[-1, 1]:
"""
Classify a trade as buy or sell using the tick rule from AFML.
:param price: Current trade price.
:param prev_price: Previous trade price.
:param prev_tick: Previous tick direction.
:returns: 1 for an upward move (buy), -1 for a downward move (sell).
"""
epsilon = 1e-12 # Small value to avoid floating point error
dp = price - prev_price
return np.sign(dp) if abs(dp) > epsilon else prev_tick
[docs]
@njit(nogil=True)
def comp_trade_side_vector(prices: NDArray[np.float64]) -> NDArray[np.int8]:
"""
Compute tick rule-based trade sides for a sequence of prices.
:param prices: Sequence of trade prices.
:returns: Sequence of trade sides (1 for buy, -1 for sell).
"""
n = len(prices)
trade_sides = np.zeros(n, dtype=np.int8)
prev_tick_sign = 0
prev_price = np.float64(prices[0]) # Explicit cast to avoid warnings
for i in range(1, n):
price = np.float64(prices[i]) # Explicit cast to avoid warnings
prev_tick_sign = comp_trade_side(price, prev_price, prev_tick_sign)
trade_sides[i] = prev_tick_sign
prev_price = price
return trade_sides
[docs]
@njit(nogil=True)
def comp_price_tick_size(prices: NDArray[np.float64]) -> float:
"""
Estimate the smallest price increment (tick size) based on trade prices.
:param prices: Array of trade prices.
:returns: Estimated price tick size. Returns 0.0 if undeterminable.
:raises ValueError: If input array is empty.
"""
if len(prices) == 0:
raise ValueError("Empty prices array")
# Limit sample size and round to mitigate floating-point errors
price_sample = np.round(prices[:min(10000, len(prices))], decimals=12)
unique_prices = np.unique(price_sample)
if len(unique_prices) <= 1:
return 0.0
diffs = np.diff(unique_prices)
scale = 10.0 ** (-np.floor(np.log10(np.min(diffs[diffs > 0]))))
int_px = np.round(unique_prices * scale).astype(np.int64)
# Calculate greatest common divisor
# tick_int = np.gcd.reduce(np.diff(int_px)) # numba does not support it
tick_int = 0
for diff_int in np.diff(int_px):
if diff_int > 0:
tick_int = diff_int if tick_int == 0 else math.gcd(tick_int, diff_int)
if tick_int == 1:
break
return tick_int / scale
[docs]
@njit(nogil=True)
def comp_price_tick_size_old(prices: NDArray[np.float64]) -> float:
"""
Legacy method to estimate tick size using median price differences.
:param prices: Array of trade prices.
:returns: Rounded tick size estimate.
:raises ValueError: If input array is empty.
"""
# Select first 10000 trades to calculate the price tick size
n_samples = len(prices)
price_sample = prices[:min(10000, n_samples)]
# raise value error if prices is empty
if len(price_sample) == 0:
raise ValueError("Empty prices array")
# Round the prices to mitigate floating-point errors
price_sample = np.round(price_sample, decimals=12)
# Get the sorted unique prices
unique_prices = np.unique(price_sample)
if len(unique_prices) <= 1:
# No variation in prices; tick size is zero
return 0.0
# Calculate the median price tick size
price_tick_size = np.median(np.diff(unique_prices))
if price_tick_size == 0.0:
# Avoid computing log10(0)
return 0.0
# Determine the exponent for adaptive rounding
exponent = np.floor(np.log10(abs(price_tick_size)))
# Specify the desired number of significant digits
desired_significant_digits = 2
# Calculate the number of decimal places to round to
ndigits = int(desired_significant_digits - 1 - exponent)
# Round the tick size adaptively based on the exponent
rounded_tick_size = round(price_tick_size, ndigits)
return rounded_tick_size
[docs]
@njit(nogil=True)
def check_timestamps_order(timestamps: NDArray[np.int64]) -> bool:
"""
Are timestamps sorted in ascending order?
:param timestamps: nanosec timestamps array
:return: True if timestamps are sorted in ascending order else False
"""
for i in range(1, len(timestamps)):
if timestamps[i] < timestamps[i - 1]:
return False
return True
[docs]
@njit(nogil=True)
def fast_sort_trades(timestamps: NDArray[np.int64],
prices: NDArray[np.float64],
amounts: NDArray[np.float32],
is_buyer_maker: Optional[NDArray[np.bool_]] = None) -> \
tuple[NDArray[np.int64], NDArray[np.float64], NDArray[np.float32], Optional[NDArray[np.bool_]]]:
"""
Fast sorting of trade data by timestamps using Numba.
For very large datasets, this is much faster than pandas sorting.
:param timestamps: nanosec timestamp array
:param prices: raw trades price array
:param amounts: raw trades amount array
:param is_buyer_maker: Optional array for trade side
:return: tuple of sorted (timestamps, prices, amounts, is_buyer_maker)
"""
idxs = np.argsort(timestamps) # quicksort algorithm
return (timestamps[idxs],
prices[idxs],
amounts[idxs],
is_buyer_maker[idxs] if is_buyer_maker is not None else None)
[docs]
@njit(nogil=True)
def merge_split_trades(
timestamps: NDArray[np.int64],
prices: NDArray[np.float64],
amounts: NDArray[np.float32],
is_buyer_maker: Optional[NDArray[np.bool_]]
) -> tuple[NDArray[np.int64], NDArray[np.float64], NDArray[np.float32], NDArray[np.int8]]:
"""
Merge split transaction trades. Inputs must already be ordered by (timestamp, price, side).
:param timestamps: nanosec timestamp array
:param prices: raw trades price array
:param amounts: raw trades amount array
:param is_buyer_maker: Optional array to compute side
:return: a tuple of arrays containing:
1. the merged trades timestamps
2. the merged trades price
3. the merged trades amount
4. the merged trades side if is_buyer_maker provided (else empty list)
"""
n = len(timestamps)
merged_timestamps = np.empty(n, dtype=np.int64)
merged_prices = np.empty(n, dtype=np.float64)
merged_amounts = np.empty(n, dtype=np.float32)
merged_side = None
with_side = is_buyer_maker is not None
if with_side:
merged_side = np.empty(n, dtype=np.int8)
# Initialize with the first trade
merged_timestamps[0] = timestamps[0]
merged_prices[0] = prices[0]
merged_amounts[0] = amounts[0]
if with_side:
merged_side[0] = -1 if is_buyer_maker[0] else 1
merged_idx = 0
for i in range(1, n):
same_trade = (timestamps[i] == merged_timestamps[merged_idx] and
abs(prices[i] - merged_prices[merged_idx]) < 1e-8)
if with_side:
same_trade &= is_buyer_maker[i] == (merged_side[merged_idx] == -1)
if same_trade:
merged_amounts[merged_idx] += amounts[i]
# Price and side remain unchanged for merged trades
else:
# New timestamp or price; store the new unique trade values
merged_idx += 1
merged_timestamps[merged_idx] = timestamps[i]
merged_prices[merged_idx] = prices[i]
merged_amounts[merged_idx] = amounts[i]
if merged_side is not None:
merged_side[merged_idx] = -1 if is_buyer_maker[i] else 1
# Trim the arrays to the actual number of merged trades
merged_timestamps = merged_timestamps[:merged_idx + 1]
merged_prices = merged_prices[:merged_idx + 1]
merged_amounts = merged_amounts[:merged_idx + 1]
if merged_side is not None:
merged_side = merged_side[:merged_idx + 1]
return (merged_timestamps,
merged_prices,
merged_amounts,
merged_side if with_side else np.empty(0, dtype=np.int8))