# API Reference
This page provides comprehensive API documentation for the HPC-Inference package.
## hpc_inference.datasets
High-performance dataset loaders for image data.
### ImageFolderDataset

Bases: `IterableDataset`

Loads images from a folder in a streaming fashion, with support for distributed processing.

- Handles common image extensions
- Loads images as RGB by default (configurable via `color_mode`)
- Validates images using PIL if `validate=True`
- Supports distributed processing with rank-based file partitioning
- Supports multi-worker data loading within each rank

Returns `(uuid, processed_data)` tuples where:

- `uuid`: filename (without path) as identifier
- `processed_data`: tensor (single model) or dict of tensors (multi-model)

#### `__init__(image_dir, preprocess=None, color_mode='RGB', validate=False, rank=None, world_size=None, evenly_distribute=True, stagger=False, uuid_mode='filename')`
Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `image_dir` | `Union[str, Path]` | Path to image folder. | required |
| `preprocess` | `Optional[Union[Callable, Dict[str, Callable]]]` | Transform(s) to apply to images. If callable: single-model preprocessing. If dict: `{model_name: preprocess_fn}` for multi-model. If `None`: return the PIL image as-is. | `None` |
| `color_mode` | `str` | Color mode for `PIL.Image.convert`. | `'RGB'` |
| `validate` | `bool` | If True, validate images in the directory using PIL. | `False` |
| `rank` | `Optional[int]` | Current process rank for distributed processing. | `None` |
| `world_size` | `Optional[int]` | Total number of processes for distributed processing. | `None` |
| `evenly_distribute` | `bool` | Whether to distribute files evenly based on size. If False, files are distributed in a round-robin manner. | `True` |
| `stagger` | `bool` | Whether to stagger the start of each worker. | `False` |
| `uuid_mode` | `Literal['filename', 'relative', 'fullpath', 'hash']` | How to generate UUIDs from image paths. `"filename"`: use just the filename (`image001.jpg`). `"relative"`: use the relative path from `image_dir` (`subfolder/image001.jpg`). | `'filename'` |
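A minimal usage sketch based on the constructor signature above. The image directory, torchvision transform, and DataLoader settings are illustrative, and the import path assumes the class is exposed by `hpc_inference.datasets` as this section suggests:

```python
from pathlib import Path

from torch.utils.data import DataLoader
from torchvision import transforms

from hpc_inference.datasets import ImageFolderDataset

# Illustrative single-model preprocessing; any callable that accepts a PIL image works.
preprocess = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
])

dataset = ImageFolderDataset(
    image_dir=Path("/data/images"),  # hypothetical image folder
    preprocess=preprocess,
    validate=True,                   # skip unreadable files up front
    rank=0,
    world_size=1,                    # single-process example
)

loader = DataLoader(dataset, batch_size=32, num_workers=4)
for uuids, images in loader:
    ...  # run inference on the batched tensors
```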
#### `__iter__()`

Iterate over images, yielding `(uuid, processed_image)` tuples. Supports multi-worker data loading within each rank.

#### `__len__()`

Return the approximate length (the actual length depends on worker assignment).

#### `parse_image(img_path)`

Load and process a single image.

#### `validate_PIL(file_path)` *(staticmethod)*

Validates whether the file can be opened by PIL. Returns True if valid, False otherwise.

#### `validate_image_files(image_files, max_workers=16)` *(classmethod)*

Validates a list of image files using PIL.
### ParquetEmbeddingDataset

Bases: `IterableDataset`

Loads pre-computed embeddings from Parquet files in a streaming fashion.

- Reads embedding vectors from Parquet files
- Supports distributed processing with rank-based file partitioning
- Supports multi-worker data loading within each rank
- Optimized for loading numerical data (embeddings) rather than images

Returns `(uuid, embedding)` tuples where:

- `uuid`: unique identifier from the UUID column in Parquet
- `embedding`: NumPy array or tensor containing the embedding vector

#### `__init__(parquet_files, col_uuid='uuid', col_embedding='embedding', rank=None, world_size=None, evenly_distribute=True, read_batch_size=1000, read_columns=None, stagger=False, return_tensor=True)`
Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `parquet_files` | `List[Union[str, Path]]` | List of paths to Parquet files containing embedding data. | required |
| `col_uuid` | `str` | Name of the UUID column in Parquet files. | `'uuid'` |
| `col_embedding` | `str` | Name of the embedding column in Parquet files. | `'embedding'` |
| `rank` | `Optional[int]` | Current process rank for distributed processing. | `None` |
| `world_size` | `Optional[int]` | Total number of processes for distributed processing. | `None` |
| `evenly_distribute` | `bool` | Whether to distribute files evenly based on size. If False, files are distributed in a round-robin manner. | `True` |
| `read_batch_size` | `int` | Number of rows to read from Parquet at a time. | `1000` |
| `read_columns` | `Optional[List[str]]` | List of column names to read from Parquet. If None, reads all columns. Typically includes `["uuid", "embedding"]`. | `None` |
| `stagger` | `bool` | Whether to stagger the start of each worker. | `False` |
| `return_tensor` | `bool` | If True, convert embeddings to PyTorch tensors. If False, keep them as NumPy arrays. | `True` |
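A minimal usage sketch based on the signature above; the shard paths, column names, and batch sizes are illustrative:

```python
from glob import glob

from torch.utils.data import DataLoader

from hpc_inference.datasets import ParquetEmbeddingDataset

# Hypothetical Parquet shards containing "uuid" and "embedding" columns.
files = sorted(glob("/data/embeddings/*.parquet"))

dataset = ParquetEmbeddingDataset(
    parquet_files=files,
    col_uuid="uuid",
    col_embedding="embedding",
    read_batch_size=2000,
    return_tensor=True,  # yield torch tensors instead of NumPy arrays
)

loader = DataLoader(dataset, batch_size=256, num_workers=2)
for uuids, embeddings in loader:
    ...  # e.g. similarity search or clustering on the embedding batch
```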
#### `__iter__()`

Iterate over Parquet files and yield embedding data. Supports multi-worker data loading within each rank.

#### `process_parquet_file(file_path)`

Process a single Parquet file and yield embedding data.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `file_path` | `str` | Path to the Parquet file to process. | required |

Yields:

| Type | Description |
| --- | --- |
| `Tuple[str, Any]` | Tuples of `(uuid, embedding)` for each row in the file. |
### ParquetImageDataset

Bases: `IterableDataset`

Loads images from Parquet files in a streaming fashion, with support for distributed processing.

- Reads image data from Parquet files containing encoded image bytes
- Supports distributed processing with rank-based file partitioning
- Supports multi-worker data loading within each rank
- Handles both single-model and multi-model preprocessing
- Provides staggered worker starts and load balancing

Returns `(uuid, processed_data)` tuples where:

- `uuid`: unique identifier from the UUID column in Parquet
- `processed_data`: tensor (single model) or dict of tensors (multi-model)

#### `__init__(parquet_files, col_uuid='uuid', rank=None, world_size=None, evenly_distribute=True, decode_fn=None, preprocess=None, read_batch_size=128, read_columns=None, stagger=False, processed_files_log=None)`
Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `parquet_files` | `List[Union[str, Path]]` | List of paths to Parquet files containing image data. | required |
| `col_uuid` | `str` | Name of the UUID column in Parquet files. | `'uuid'` |
| `rank` | `Optional[int]` | Current process rank for distributed processing. | `None` |
| `world_size` | `Optional[int]` | Total number of processes for distributed processing. | `None` |
| `evenly_distribute` | `bool` | Whether to distribute files evenly based on size. If False, files are distributed in a round-robin manner. | `True` |
| `decode_fn` | `Optional[Callable]` | Function to decode image bytes to a PIL Image. Required for image processing. | `None` |
| `preprocess` | `Optional[Union[Callable, Dict[str, Callable]]]` | Transform(s) to apply to images. If callable: single-model preprocessing. If dict: `{model_name: preprocess_fn}` for multi-model. If `None`: return the decoded image as-is. | `None` |
| `read_batch_size` | `int` | Number of rows to read from Parquet at a time. | `128` |
| `read_columns` | `Optional[List[str]]` | List of column names to read from Parquet. If None, reads all columns. Typically includes `["uuid", "image", "original_size", "resized_size"]`. | `None` |
| `stagger` | `bool` | Whether to stagger the start of each worker. | `False` |
| `processed_files_log` | `Optional[Union[str, Path]]` | Path to a log file for tracking processed files. Optional. | `None` |
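A distributed usage sketch based on the signature above. The shard paths, transform, and log filename are illustrative; it is also an assumption that `decode_image` from `hpc_inference.utils` satisfies the expected `decode_fn` interface, so swap in your own decoder if your data layout differs:

```python
from glob import glob

from torch.utils.data import DataLoader
from torchvision import transforms

from hpc_inference.datasets import ParquetImageDataset
from hpc_inference.utils import decode_image, get_distributed_info

rank, world_size = get_distributed_info()
files = sorted(glob("/data/image_shards/*.parquet"))  # hypothetical shards

preprocess = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
])

dataset = ParquetImageDataset(
    parquet_files=files,
    rank=rank,
    world_size=world_size,
    decode_fn=decode_image,  # assumed to match the decode_fn interface; see note above
    preprocess=preprocess,
    read_columns=["uuid", "image", "original_size"],
    processed_files_log=f"processed_rank{rank}.log",  # optional resume log
)

loader = DataLoader(dataset, batch_size=64, num_workers=4)
for uuids, images in loader:
    ...  # run inference
```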
#### `__iter__()`

Iterate over Parquet files and yield processed data. Supports multi-worker data loading within each rank.

#### `log_processed_file(file_path)`

Log a processed file to the log file if configured.

#### `parse_batch_data(batch_df)`

Parse a batch of data from a Parquet DataFrame.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `batch_df` | `DataFrame` | DataFrame containing batch data from a Parquet file. | required |

Yields:

| Type | Description |
| --- | --- |
| `Tuple[str, Any]` | Tuples of `(uuid, processed_data)` for each row in the batch. |

#### `process_parquet_file(file_path)`

Process a single Parquet file and yield processed data.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `file_path` | `str` | Path to the Parquet file to process. | required |

Yields:

| Type | Description |
| --- | --- |
| `Tuple[str, Any]` | Tuples of `(uuid, processed_data)` for each valid row in the file. |
### multi_model_collate(batch)

Collate function for batches where each sample is `(uuid, {model_name: tensor, ...})`.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `batch` | `List[Tuple[str, Dict[str, Any]]]` | List of `(uuid, processed_data_dict)` tuples from the dataset. | required |

Returns:

| Type | Description |
| --- | --- |
| `Tuple[List[str], Dict[str, Tensor]]` | Tuple containing: `uuids` — list of UUID strings; `batch_dict` — dict mapping model names to batched tensors. |
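A sketch of multi-model batching with this collate function. The model names and transforms are illustrative, and the example assumes `multi_model_collate` is importable from `hpc_inference.datasets` as documented here:

```python
from torch.utils.data import DataLoader
from torchvision import transforms

from hpc_inference.datasets import ImageFolderDataset, multi_model_collate

# Two hypothetical per-model transforms keyed by model name.
preprocess = {
    "clip": transforms.Compose([transforms.Resize((224, 224)), transforms.ToTensor()]),
    "dino": transforms.Compose([transforms.Resize((518, 518)), transforms.ToTensor()]),
}

dataset = ImageFolderDataset("/data/images", preprocess=preprocess)
loader = DataLoader(dataset, batch_size=16, collate_fn=multi_model_collate)

for uuids, batch_dict in loader:
    clip_batch = batch_dict["clip"]  # stacked tensor for the "clip" transform
    dino_batch = batch_dict["dino"]  # stacked tensor for the "dino" transform
```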
## hpc_inference.inference
Inference modules for various model types.
## hpc_inference.utils
Utility functions for HPC inference.
### assign_files_to_rank(rank, world_size, files, evenly_distribute=True)
Assign files to the current rank based on the world size.
This method ensures that each rank gets a unique set of files to process. The files can be distributed evenly based on their size (LPT algorithm) or simply by their order. This is useful for large datasets where some files may be significantly larger than others.
Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `rank` | `int` | Current process rank (0-indexed). | required |
| `world_size` | `int` | Total number of processes across all ranks. | required |
| `files` | `List[Union[str, Path]]` | List of file paths to distribute across ranks. | required |
| `evenly_distribute` | `bool` | Whether to distribute files evenly based on file size. If True, uses the Longest Processing Time (LPT) algorithm for load balancing; if False, uses simple round-robin distribution. | `True` |

Returns:

| Type | Description |
| --- | --- |
| `List[str]` | List of file paths assigned to the given rank. |

Raises:

| Type | Description |
| --- | --- |
| `ValueError` | If rank is negative or >= world_size. |
| `FileNotFoundError` | If any file in the list doesn't exist when `evenly_distribute=True`. |
Examples:
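An illustrative sketch; the shard directory is hypothetical:

```python
from glob import glob

from hpc_inference.utils import assign_files_to_rank, get_distributed_info

rank, world_size = get_distributed_info()

# Hypothetical shard directory. With evenly_distribute=True the files must
# exist on disk, since their sizes drive the LPT load balancing.
all_files = sorted(glob("/data/shards/*.parquet"))
my_files = assign_files_to_rank(rank, world_size, all_files, evenly_distribute=True)
print(f"rank {rank}/{world_size}: assigned {len(my_files)} of {len(all_files)} files")
```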
### assign_indices_to_rank(rank, world_size, total_items, evenly_distribute=True)
Assign item indices to the current rank based on the world size.
This function calculates which range of indices each rank should process when working with datasets that can be split by index ranges.
Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `rank` | `int` | Current process rank (0-indexed). | required |
| `world_size` | `int` | Total number of processes across all ranks. | required |
| `total_items` | `int` | Total number of items to distribute across ranks. | required |
| `evenly_distribute` | `bool` | Whether to distribute items evenly. If True, each rank gets approximately the same number of items; if False, a simple round-robin-style distribution is used. | `True` |

Returns:

| Type | Description |
| --- | --- |
| `Tuple[int, int]` | Tuple of `(start_idx, end_idx)` representing the range of indices for this rank. The range is `[start_idx, end_idx)` (end_idx is exclusive). |

Raises:

| Type | Description |
| --- | --- |
| `ValueError` | If rank is negative or >= world_size, or if total_items is negative. |
Examples:
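An illustrative sketch, assuming the documented `(start_idx, end_idx)` return value:

```python
from hpc_inference.utils import assign_indices_to_rank

# Split 10,000 items across 4 ranks; each rank slices out its own index range.
world_size = 4
for rank in range(world_size):
    start, end = assign_indices_to_rank(rank, world_size, total_items=10_000)
    print(f"rank {rank}: items [{start}, {end})")  # end index is exclusive
```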
### decode_image(row_data, return_type='pil')
Decode an image stored as raw bytes in a row dictionary into a NumPy array or PIL Image.
The function expects the row dictionary to contain an "image" key with raw image bytes, and either an "original_size" or "resized_size" key specifying the (height, width) of the image.
It assumes the image has RGB channels, and reshapes the byte buffer accordingly.
Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `row_data` | `Union[Dict[str, Any], Series]` | Dictionary containing image data and metadata. `"image"`: raw image bytes (bytes or bytearray). `"original_size"` or `"resized_size"`: tuple `(height, width)` of the image. | required |
| `return_type` | `Literal['numpy', 'pil']` | Type of the returned image: `"numpy"` for a NumPy array or `"pil"` for a PIL Image. | `'pil'` |

Returns:

| Type | Description |
| --- | --- |
| `Optional[Union[ndarray, Image]]` | Decoded image as a NumPy array (RGB format) or PIL Image, or None if decoding fails. |

Raises:

| Type | Description |
| --- | --- |
| None | Does not raise on decoding errors; returns None and logs a warning instead. |

Notes:

- Returns None if the image size does not match the expected dimensions.
- Logs a warning if decoding fails.
- Converts BGR to RGB channel order automatically.
Examples:
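A sketch of the expected input, assuming the stored `"image"` value is a flat pixel buffer that gets reshaped to the recorded size (as the notes above suggest); whether your data was written in RGB or BGR order depends on how it was produced:

```python
import numpy as np

from hpc_inference.utils import decode_image

# Build a row the way the Parquet datasets describe it: raw bytes plus the
# (height, width) needed to reshape the flat buffer into an H x W x 3 image.
height, width = 256, 384
pixels = np.random.randint(0, 256, size=(height, width, 3), dtype=np.uint8)
row = {"image": pixels.tobytes(), "original_size": (height, width)}

img = decode_image(row, return_type="pil")    # PIL.Image, or None on failure
arr = decode_image(row, return_type="numpy")  # np.ndarray in RGB order
```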
### format_time(seconds)
Format seconds into a human-readable time string.
Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `seconds` | `float` | Time duration in seconds. | required |

Returns:

| Type | Description |
| --- | --- |
| `str` | Formatted time string (e.g. `"1h 30m 45.67s"`, `"2m 15.34s"`, `"12.45s"`). |
Examples:
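A short sketch; the expected outputs follow the documented format strings above:

```python
from hpc_inference.utils import format_time

print(format_time(12.45))    # "12.45s"
print(format_time(135.34))   # "2m 15.34s"
print(format_time(5445.67))  # "1h 30m 45.67s"
```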
### get_distributed_info()
Get distributed training information from environment variables.
This function extracts rank and world size from common distributed training environment variables (SLURM, torchrun, etc.).
Returns:

| Type | Description |
| --- | --- |
| `Tuple[int, int]` | Tuple of `(rank, world_size)` extracted from environment variables. If no distributed environment is detected, returns `(0, 1)`. |
Examples:
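A minimal sketch of the documented behaviour:

```python
from hpc_inference.utils import get_distributed_info

# Reads rank/world size from SLURM or torchrun environment variables;
# falls back to (0, 1) when no distributed environment is detected.
rank, world_size = get_distributed_info()
print(f"running as rank {rank} of {world_size}")
```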
### load_config(config_path)
Load YAML configuration file.
Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `config_path` | `Union[str, Path]` | Path to YAML configuration file. | required |

Returns:

| Type | Description |
| --- | --- |
| `Dict[str, Any]` | Parsed configuration as a dictionary. |

Raises:

| Type | Description |
| --- | --- |
| `FileNotFoundError` | If the configuration file doesn't exist. |
| `YAMLError` | If the YAML file is malformed. |
Examples:
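An illustrative sketch; the config path and keys are hypothetical:

```python
from hpc_inference.utils import load_config

# Hypothetical YAML file; the keys shown here are illustrative.
config = load_config("configs/clip_inference.yaml")
batch_size = config.get("batch_size", 64)
```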
### multi_model_collate(batch)
Collate function for batches where each sample is (uuid, {model_name: tensor, ...}).
Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `batch` | `List[Tuple[str, Dict[str, Any]]]` | List of `(uuid, processed_data_dict)` tuples from the dataset. | required |

Returns:

| Type | Description |
| --- | --- |
| `Tuple[List[str], Dict[str, Tensor]]` | Tuple containing: `uuids` — list of UUID strings; `batch_dict` — dict mapping model names to batched tensors. |
### pil_image_collate(batch)
Custom collate function for batches containing PIL Images.
This function is required when working with datasets that return PIL Images because PyTorch's default collate function only handles tensors, numpy arrays, numbers, dicts, and lists - not PIL Image objects.
Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `batch` | `List[Tuple[str, Any]]` | List of `(uuid, image)` tuples, where each tuple contains a UUID string and a PIL Image object. | required |

Returns:

| Type | Description |
| --- | --- |
| `Tuple[List[str], List[Any]]` | Tuple containing: `uuids` — list of UUID strings from the batch; `images` — list of PIL Image objects from the batch. |
Examples:
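A sketch of the case described above, where a dataset yields raw PIL images; the image directory is illustrative:

```python
from torch.utils.data import DataLoader

from hpc_inference.datasets import ImageFolderDataset
from hpc_inference.utils import pil_image_collate

# With preprocess=None the dataset yields raw PIL images, which PyTorch's
# default collate function cannot batch, so pil_image_collate is needed.
dataset = ImageFolderDataset("/data/images", preprocess=None)
loader = DataLoader(dataset, batch_size=8, collate_fn=pil_image_collate)

for uuids, pil_images in loader:
    ...  # e.g. pass the PIL images to a processor that does its own batching
```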
### save_emb_to_parquet(uuids, embeddings, path, compression='zstd')
Save embeddings (as a dict of column names to tensors/arrays) and uuids to a Parquet file.
Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `uuids` | `List[str]` | List of unique identifiers corresponding to each embedding. | required |
| `embeddings` | `Dict[str, Union[Tensor, ndarray]]` | Dictionary mapping column names to `torch.Tensor` or `np.ndarray`. Each tensor/array should have the same first dimension as `len(uuids)`. | required |
| `path` | `Union[str, Path]` | Output Parquet file path. | required |
| `compression` | `str` | Compression type for the Parquet file. Other options: `"snappy"`, `"gzip"`, `"brotli"`, `"lz4"`, `"uncompressed"`. | `'zstd'` |

Raises:

| Type | Description |
| --- | --- |
| `ValueError` | If embeddings have mismatched dimensions or inputs are empty. |
| `IOError` | If the file cannot be written. |
Examples:
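An illustrative sketch; the column names, embedding sizes, and output path are hypothetical:

```python
import torch

from hpc_inference.utils import save_emb_to_parquet

uuids = [f"img_{i:04d}.jpg" for i in range(128)]
embeddings = {
    "clip_embedding": torch.randn(128, 512),  # one row per uuid
    "dino_embedding": torch.randn(128, 768),
}

# Writes a zstd-compressed Parquet file with a uuid column plus one column
# per embedding; the output path is illustrative.
save_emb_to_parquet(uuids, embeddings, "embeddings_rank0.parquet", compression="zstd")
```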
### validate_distributed_setup(rank, world_size)
Validate distributed training setup parameters.
Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `rank` | `int` | Process rank to validate. | required |
| `world_size` | `int` | World size to validate. | required |

Returns:

| Type | Description |
| --- | --- |
| `bool` | True if the setup is valid, False otherwise. |

Raises:

| Type | Description |
| --- | --- |
| `ValueError` | If parameters are invalid. |
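A minimal sketch combining this check with `get_distributed_info()`:

```python
from hpc_inference.utils import get_distributed_info, validate_distributed_setup

rank, world_size = get_distributed_info()
# Raises ValueError for invalid parameters; otherwise returns True/False.
if validate_distributed_setup(rank, world_size):
    print(f"distributed setup OK: rank {rank}, world size {world_size}")
```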
## Quick Reference

### Core Classes

- `ImageFolderDataset` - Process images from directory structures
- `ParquetImageDataset` - Handle compressed image data in Parquet format

### Utility Functions

- GPU utilization monitoring functions
- Performance profiling utilities
- SLURM integration helpers

### Configuration

- Model configuration templates
- Preprocessing pipelines
- Batch processing optimization