API Reference

This page provides comprehensive API documentation for the HPC-Inference package.

hpc_inference.datasets

High-performance dataset loaders for image data.

ImageFolderDataset

Bases: IterableDataset

Loads images from a folder in a streaming fashion, with support for distributed processing.

  • Handles common image extensions
  • Loads images as RGB by default (configurable via color_mode)
  • Validates images using PIL if validate=True
  • Supports distributed processing with rank-based file partitioning
  • Supports multi-worker data loading within each rank

Returns (uuid, processed_data) tuples where:

  • uuid: the filename (without path) used as the identifier
  • processed_data: a tensor (single model) or dict of tensors (multi-model)

__init__(image_dir, preprocess=None, color_mode='RGB', validate=False, rank=None, world_size=None, evenly_distribute=True, stagger=False, uuid_mode='filename')

Parameters:

  • image_dir (Union[str, Path], required): Path to the image folder.
  • preprocess (Optional[Union[Callable, Dict[str, Callable]]], default None): Transform(s) to apply to images. If callable, single-model preprocessing; if dict, {model_name: preprocess_fn} for multi-model; if None, the PIL image is returned as-is.
  • color_mode (str, default 'RGB'): Color mode for PIL.Image.convert.
  • validate (bool, default False): If True, validates images in the directory using PIL.
  • rank (Optional[int], default None): Current process rank for distributed processing.
  • world_size (Optional[int], default None): Total number of processes for distributed processing.
  • evenly_distribute (bool, default True): Whether to distribute files evenly based on size. If False, files are distributed in a round-robin manner.
  • stagger (bool, default False): Whether to stagger the start of each worker.
  • uuid_mode (Literal['filename', 'relative', 'fullpath', 'hash'], default 'filename'): How to generate UUIDs from image paths. "filename" uses just the filename (image001.jpg); "relative" uses the path relative to image_dir (subfolder/image001.jpg); "fullpath" uses the full absolute path; "hash" uses a hash of the full path.
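
Example (a hedged usage sketch; the folder path, transform, and DataLoader settings below are placeholders, not package defaults):

from torch.utils.data import DataLoader
from torchvision import transforms
from hpc_inference.datasets import ImageFolderDataset

# Placeholder single-model preprocessing; any callable mapping PIL image -> tensor works.
preprocess = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
])

dataset = ImageFolderDataset(
    "/data/images",        # image_dir (placeholder path)
    preprocess=preprocess,
    rank=0,
    world_size=1,          # single-process run
    uuid_mode="filename",
)

loader = DataLoader(dataset, batch_size=32, num_workers=4)
for uuids, images in loader:
    # uuids: list of filenames; images: stacked tensor, e.g. shape [32, 3, 224, 224]
    pass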

__iter__()

Iterate over images, yielding (uuid, processed_image) tuples. Supports multi-worker data loading within each rank.

__len__()

Return approximate length (actual length depends on worker assignment).

parse_image(img_path)

Load and process a single image.

validate_PIL(file_path) staticmethod

Validates if the file can be opened by PIL. Returns True if valid, False otherwise.

validate_image_files(image_files, max_workers=16) classmethod

Validates a list of image files using PIL.

ParquetEmbeddingDataset

Bases: IterableDataset

Loads pre-computed embeddings from Parquet files in a streaming fashion.

  • Reads embedding vectors from Parquet files
  • Supports distributed processing with rank-based file partitioning
  • Supports multi-worker data loading within each rank
  • Optimized for loading numerical data (embeddings) rather than images

Returns (uuid, embedding) tuples where:

  • uuid: unique identifier from the UUID column in the Parquet file
  • embedding: NumPy array or tensor containing the embedding vector

__init__(parquet_files, col_uuid='uuid', col_embedding='embedding', rank=None, world_size=None, evenly_distribute=True, read_batch_size=1000, read_columns=None, stagger=False, return_tensor=True)

Parameters:

  • parquet_files (List[Union[str, Path]], required): List of paths to Parquet files containing embedding data.
  • col_uuid (str, default 'uuid'): Name of the UUID column in the Parquet files.
  • col_embedding (str, default 'embedding'): Name of the embedding column in the Parquet files.
  • rank (Optional[int], default None): Current process rank for distributed processing.
  • world_size (Optional[int], default None): Total number of processes for distributed processing.
  • evenly_distribute (bool, default True): Whether to distribute files evenly based on size. If False, files are distributed in a round-robin manner.
  • read_batch_size (int, default 1000): Number of rows to read from Parquet at a time.
  • read_columns (Optional[List[str]], default None): List of column names to read from Parquet. If None, reads all columns. Typically includes ["uuid", "embedding"].
  • stagger (bool, default False): Whether to stagger the start of each worker.
  • return_tensor (bool, default True): If True, converts embeddings to PyTorch tensors; if False, keeps them as NumPy arrays.
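
Example (a hedged usage sketch; the file paths and batch size are placeholders, column names are the documented defaults):

from torch.utils.data import DataLoader
from hpc_inference.datasets import ParquetEmbeddingDataset

files = [
    "/data/embeddings/part-0000.parquet",  # placeholder paths
    "/data/embeddings/part-0001.parquet",
]

dataset = ParquetEmbeddingDataset(
    files,
    col_uuid="uuid",
    col_embedding="embedding",
    rank=0,
    world_size=1,
    return_tensor=True,
)

loader = DataLoader(dataset, batch_size=256)
for uuids, embeddings in loader:
    # uuids: list of identifiers; embeddings: batched tensor (return_tensor=True)
    pass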

__iter__()

Iterate over Parquet files and yield embedding data. Supports multi-worker data loading within each rank.

process_parquet_file(file_path)

Process a single Parquet file and yield embedding data.

Parameters:

  • file_path (str, required): Path to the Parquet file to process.

Yields:

  • Tuple[str, Any]: Tuples of (uuid, embedding) for each row in the file.

ParquetImageDataset

Bases: IterableDataset

Loads images from Parquet files in a streaming fashion, with support for distributed processing.

  • Reads image data from Parquet files containing encoded image bytes
  • Supports distributed processing with rank-based file partitioning
  • Supports multi-worker data loading within each rank
  • Handles both single-model and multi-model preprocessing
  • Provides staggered worker starts and load balancing

Returns (uuid, processed_data) tuples where:

  • uuid: unique identifier from the UUID column in the Parquet file
  • processed_data: a tensor (single model) or dict of tensors (multi-model)

__init__(parquet_files, col_uuid='uuid', rank=None, world_size=None, evenly_distribute=True, decode_fn=None, preprocess=None, read_batch_size=128, read_columns=None, stagger=False, processed_files_log=None)

Parameters:

  • parquet_files (List[Union[str, Path]], required): List of paths to Parquet files containing image data.
  • col_uuid (str, default 'uuid'): Name of the UUID column in the Parquet files.
  • rank (Optional[int], default None): Current process rank for distributed processing.
  • world_size (Optional[int], default None): Total number of processes for distributed processing.
  • evenly_distribute (bool, default True): Whether to distribute files evenly based on size. If False, files are distributed in a round-robin manner.
  • decode_fn (Optional[Callable], default None): Function to decode image bytes to a PIL Image. Required for image processing.
  • preprocess (Optional[Union[Callable, Dict[str, Callable]]], default None): Transform(s) to apply to images. If callable, single-model preprocessing; if dict, {model_name: preprocess_fn} for multi-model; if None, the decoded image is returned as-is.
  • read_batch_size (int, default 128): Number of rows to read from Parquet at a time.
  • read_columns (Optional[List[str]], default None): List of column names to read from Parquet. If None, reads all columns. Typically includes ["uuid", "image", "original_size", "resized_size"].
  • stagger (bool, default False): Whether to stagger the start of each worker.
  • processed_files_log (Optional[Union[str, Path]], default None): Path to a log file for tracking processed files. Optional.
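
Example (a hedged multi-model sketch; the Parquet path and transforms are placeholders, and passing decode_image as decode_fn assumes decode_fn receives the row dictionary rather than raw bytes):

from torch.utils.data import DataLoader
from torchvision import transforms
from hpc_inference.datasets import ParquetImageDataset, multi_model_collate
from hpc_inference.utils import decode_image

# Placeholder per-model transforms.
preprocess = {
    "clip": transforms.Compose([transforms.Resize((224, 224)), transforms.ToTensor()]),
    "resnet": transforms.Compose([transforms.Resize((256, 256)), transforms.ToTensor()]),
}

dataset = ParquetImageDataset(
    ["/data/images/part-0000.parquet"],                           # placeholder path
    decode_fn=lambda row: decode_image(row, return_type="pil"),   # assumption: decode_fn gets the row dict
    preprocess=preprocess,
    read_columns=["uuid", "image", "original_size"],
    rank=0,
    world_size=1,
)

loader = DataLoader(dataset, batch_size=64, collate_fn=multi_model_collate)
for uuids, batch_dict in loader:
    clip_batch = batch_dict["clip"]      # batched tensor for the "clip" transform
    resnet_batch = batch_dict["resnet"]  # batched tensor for the "resnet" transform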

__iter__()

Iterate over Parquet files and yield processed data. Supports multi-worker data loading within each rank.

log_processed_file(file_path)

Log a processed file to the log file if configured.

parse_batch_data(batch_df)

Parse a batch of data from Parquet DataFrame.

Parameters:

  • batch_df (DataFrame, required): DataFrame containing batch data from a Parquet file.

Yields:

  • Tuple[str, Any]: Tuples of (uuid, processed_data) for each row in the batch.

process_parquet_file(file_path)

Process a single Parquet file and yield processed data.

Parameters:

  • file_path (str, required): Path to the Parquet file to process.

Yields:

  • Tuple[str, Any]: Tuples of (uuid, processed_data) for each valid row in the file.

multi_model_collate(batch)

Collate function for batches where each sample is (uuid, {model_name: tensor, ...}).

Parameters:

  • batch (List[Tuple[str, Dict[str, Any]]], required): List of (uuid, processed_data_dict) tuples from the dataset.

Returns:

  • Tuple[List[str], Dict[str, Tensor]]: Tuple containing uuids (a list of UUID strings) and batch_dict (a dict mapping model names to batched tensors).
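
An illustrative example (hedged: it assumes the per-model tensors are stacked along a new batch dimension, as "batched tensors" suggests):

>>> import torch
>>> batch = [
...     ("img_001", {"clip": torch.zeros(3, 224, 224), "resnet": torch.zeros(3, 256, 256)}),
...     ("img_002", {"clip": torch.ones(3, 224, 224), "resnet": torch.ones(3, 256, 256)}),
... ]
>>> uuids, batch_dict = multi_model_collate(batch)
>>> uuids
['img_001', 'img_002']
>>> batch_dict["clip"].shape
torch.Size([2, 3, 224, 224])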

hpc_inference.inference

Inference modules for various model types.

hpc_inference.utils

Utility functions for HPC inference.

assign_files_to_rank(rank, world_size, files, evenly_distribute=True)

Assign files to the current rank based on the world size.

This function ensures that each rank gets a unique set of files to process. The files can be distributed evenly based on their size (LPT algorithm) or simply by their order. This is useful for large datasets where some files may be significantly larger than others.

Parameters:

  • rank (int, required): Current process rank (0-indexed).
  • world_size (int, required): Total number of processes across all ranks.
  • files (List[Union[str, Path]], required): List of file paths to distribute across ranks.
  • evenly_distribute (bool, default True): Whether to distribute files evenly based on file size. If True, uses the Longest Processing Time (LPT) algorithm for load balancing; if False, uses simple round-robin distribution.

Returns:

  • List[str]: List of file paths assigned to the given rank.

Raises:

  • ValueError: If rank is negative or >= world_size.
  • FileNotFoundError: If any file in the list doesn't exist when evenly_distribute=True.

Examples:

>>> files = ["/path/file1.parquet", "/path/file2.parquet", "/path/file3.parquet"]
>>> assign_files_to_rank(0, 2, files, evenly_distribute=False)
["/path/file1.parquet", "/path/file3.parquet"]
>>> assign_files_to_rank(1, 2, files, evenly_distribute=False) 
["/path/file2.parquet"]

assign_indices_to_rank(rank, world_size, total_items, evenly_distribute=True)

Assign item indices to the current rank based on the world size.

This function calculates which range of indices each rank should process when working with datasets that can be split by index ranges.

Parameters:

  • rank (int, required): Current process rank (0-indexed).
  • world_size (int, required): Total number of processes across all ranks.
  • total_items (int, required): Total number of items to distribute across ranks.
  • evenly_distribute (bool, default True): Whether to distribute items evenly. If True, each rank gets approximately the same number of items; if False, a simple round-robin style distribution is used.

Returns:

  • Tuple[int, int]: Tuple of (start_idx, end_idx) representing the range of indices for this rank. The range is [start_idx, end_idx) (end_idx is exclusive).

Raises:

  • ValueError: If rank is negative or >= world_size, or if total_items is negative.

Examples:

>>> assign_indices_to_rank(0, 3, 10, evenly_distribute=True)
(0, 4)  # Rank 0 gets indices 0,1,2,3
>>> assign_indices_to_rank(1, 3, 10, evenly_distribute=True) 
(4, 7)  # Rank 1 gets indices 4,5,6
>>> assign_indices_to_rank(2, 3, 10, evenly_distribute=True)
(7, 10) # Rank 2 gets indices 7,8,9

decode_image(row_data, return_type='pil')

Decode an image stored as raw bytes in a row dictionary into a NumPy array or PIL Image.

The function expects the row dictionary to contain an "image" key with raw image bytes, and either an "original_size" or "resized_size" key specifying the (height, width) of the image.

It assumes the image has RGB channels, and reshapes the byte buffer accordingly.

Parameters:

  • row_data (Union[Dict[str, Any], Series], required): Dictionary containing image data and metadata. "image" holds the raw image bytes (bytes or bytearray); "original_size" or "resized_size" holds the (height, width) tuple of the image.
  • return_type (Literal['numpy', 'pil'], default 'pil'): Type of the returned image. Can be "numpy" for a NumPy array or "pil" for a PIL Image.

Returns:

  • Optional[Union[ndarray, Image]]: Decoded image as a NumPy array (RGB format) or PIL Image, or None if decoding fails.

Raises:

  • Nothing is raised: the function returns None on any decoding error, with a warning logged.

Notes
  • Returns None if the image size does not match the expected dimensions.
  • Logs a warning if decoding fails.
  • Converts BGR to RGB channel order automatically.

Examples:

>>> row = {"image": image_bytes, "original_size": (224, 224)}
>>> img = decode_image(row, return_type="pil")
>>> isinstance(img, Image.Image)
True
>>> img_array = decode_image(row, return_type="numpy")
>>> img_array.shape
(224, 224, 3)

format_time(seconds)

Format seconds into a human-readable time string.

Parameters:

  • seconds (float, required): Time duration in seconds.

Returns:

  • str: Formatted time string (e.g., "1h 30m 45.67s", "2m 15.34s", "12.45s").

Examples:

>>> format_time(3661.5)
'1h 1m 1.50s'
>>> format_time(125.67)
'2m 5.67s'
>>> format_time(45.23)
'45.23s'

get_distributed_info()

Get distributed training information from environment variables.

This function extracts rank and world size from common distributed training environment variables (SLURM, torchrun, etc.).

Returns:

  • Tuple[int, int]: Tuple of (rank, world_size) extracted from environment variables. If no distributed environment is detected, returns (0, 1).

Examples:

>>> # In SLURM environment
>>> get_distributed_info()
(2, 8)  # rank=2, world_size=8
>>> # In single process environment  
>>> get_distributed_info()
(0, 1)  # rank=0, world_size=1

load_config(config_path)

Load YAML configuration file.

Parameters:

  • config_path (Union[str, Path], required): Path to the YAML configuration file.

Returns:

  • Dict[str, Any]: Parsed configuration as a dictionary.

Raises:

  • FileNotFoundError: If the configuration file doesn't exist.
  • YAMLError: If the YAML file is malformed.

Examples:

>>> config = load_config("config.yaml")
>>> print(config["batch_size"])
32

multi_model_collate(batch)

Collate function for batches where each sample is (uuid, {model_name: tensor, ...}).

Parameters:

  • batch (List[Tuple[str, Dict[str, Any]]], required): List of (uuid, processed_data_dict) tuples from the dataset.

Returns:

  • Tuple[List[str], Dict[str, Tensor]]: Tuple containing uuids (a list of UUID strings) and batch_dict (a dict mapping model names to batched tensors).

pil_image_collate(batch)

Custom collate function for batches containing PIL Images.

This function is required when working with datasets that return PIL Images because PyTorch's default collate function only handles tensors, numpy arrays, numbers, dicts, and lists - not PIL Image objects.

Parameters:

  • batch (List[Tuple[str, Any]], required): List of (uuid, image) tuples, where each tuple contains a UUID string and a PIL Image object.

Returns:

  • Tuple[List[str], List[Any]]: Tuple containing uuids (a list of UUID strings from the batch) and images (a list of PIL Image objects from the batch).

Examples:

>>> from PIL import Image
>>> batch = [("img1.jpg", Image.new("RGB", (100, 100))), 
...           ("img2.jpg", Image.new("RGB", (200, 200)))]
>>> uuids, images = pil_image_collate(batch)
>>> print(uuids)
['img1.jpg', 'img2.jpg']
>>> print([img.size for img in images])
[(100, 100), (200, 200)]

save_emb_to_parquet(uuids, embeddings, path, compression='zstd')

Save embeddings (as a dict of column names to tensors/arrays) and uuids to a Parquet file.

Parameters:

  • uuids (List[str], required): List of unique identifiers corresponding to each embedding.
  • embeddings (Dict[str, Union[Tensor, ndarray]], required): Dictionary mapping column names to torch.Tensor or np.ndarray. Each tensor/array should have the same first dimension as len(uuids).
  • path (Union[str, Path], required): Output Parquet file path.
  • compression (str, default 'zstd'): Compression type for the Parquet file. Other options: "snappy", "gzip", "brotli", "lz4", "uncompressed".

Raises:

  • ValueError: If embeddings have mismatched dimensions or inputs are empty.
  • IOError: If the file cannot be written.

Examples:

>>> uuids = ["img_001", "img_002", "img_003"]
>>> embeddings = {
...     "clip_emb": torch.randn(3, 512),
...     "resnet_emb": torch.randn(3, 2048)
... }
>>> save_emb_to_parquet(uuids, embeddings, "embeddings.parquet")

validate_distributed_setup(rank, world_size)

Validate distributed training setup parameters.

Parameters:

  • rank (int, required): Process rank to validate.
  • world_size (int, required): World size to validate.

Returns:

  • bool: True if the setup is valid, False otherwise.

Raises:

  • ValueError: If the parameters are invalid.
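
A brief hedged sketch combining this check with get_distributed_info (the print is only illustrative):

from hpc_inference.utils import get_distributed_info, validate_distributed_setup

rank, world_size = get_distributed_info()
if validate_distributed_setup(rank, world_size):
    print(f"Running as rank {rank} of {world_size}")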

Quick Reference

Core Classes

  • ImageFolderDataset - Process images from directory structures
  • ParquetImageDataset - Handle compressed image data in Parquet format
  • ParquetEmbeddingDataset - Stream pre-computed embeddings from Parquet files

Utility Functions

  • GPU utilization monitoring functions
  • Performance profiling utilities
  • SLURM integration helpers

Configuration

  • Model configuration templates
  • Preprocessing pipelines
  • Batch processing optimization