fusion_bench.taskpool

Base Class

BaseTaskPool

Bases: BaseYAMLSerializable

Abstract base class for task pools in the FusionBench framework.

A task pool represents a collection of evaluation tasks that can be used to assess model performance across multiple benchmarks or datasets. This base class defines the common interface that all task pool implementations must follow, ensuring consistency across different task types and evaluation scenarios.

Task pools are designed to be configurable through YAML files and can be used in various model fusion and evaluation workflows. They provide a standardized way to evaluate models on multiple tasks and aggregate results.

The class inherits from BaseYAMLSerializable to support configuration management and serialization capabilities.
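
As a rough illustration of the configuration-driven workflow, the sketch below builds an equivalent configuration object in Python; the option names are placeholders rather than a fixed schema, since each concrete task pool defines its own options:

from omegaconf import OmegaConf

# Hypothetical taskpool options; concrete task pools define their own keys.
taskpool_cfg = OmegaConf.create(
    {
        "tasks": ["mnist", "cifar10"],
        "batch_size": 64,
        "num_workers": 4,
    }
)
# A concrete BaseTaskPool subclass would be constructed from such a config and
# then expose evaluate(model), returning a per-task metrics dictionary.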

Attributes:

  • _program

    Optional program reference for execution context.

  • _config_key

    Configuration key used for YAML configuration ("taskpool").

Abstract Methods

evaluate: Must be implemented by subclasses to define task-specific evaluation logic.

Example

Implementing a custom task pool:

class MyTaskPool(BaseTaskPool):
    def evaluate(self, model, **kwargs):
        results = {}
        for task_name in self.tasks:
            # Implement task-specific evaluation
            results[task_name] = self._evaluate_task(model, task_name)
        return results
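
To show how such a pool is driven end to end, the following self-contained toy mirrors the same interface; it deliberately avoids inheriting from BaseTaskPool so that it runs standalone, and its metric values are dummies:

class ToyTaskPool:
    def __init__(self, tasks):
        self.tasks = tasks

    def _evaluate_task(self, model, task_name):
        # Placeholder metrics; a real pool would run inference and compute them.
        return {"accuracy": 0.0, "loss": 0.0}

    def evaluate(self, model, **kwargs):
        return {name: self._evaluate_task(model, name) for name in self.tasks}

report = ToyTaskPool(["mnist", "cifar10"]).evaluate(model=None)
# {'mnist': {'accuracy': 0.0, 'loss': 0.0}, 'cifar10': {'accuracy': 0.0, 'loss': 0.0}}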
Source code in fusion_bench/taskpool/base_pool.py
class BaseTaskPool(BaseYAMLSerializable):
    """Abstract base class for task pools in the FusionBench framework.

    A task pool represents a collection of evaluation tasks that can be used to
    assess model performance across multiple benchmarks or datasets. This base
    class defines the common interface that all task pool implementations must
    follow, ensuring consistency across different task types and evaluation
    scenarios.

    Task pools are designed to be configurable through YAML files and can be
    used in various model fusion and evaluation workflows. They provide a
    standardized way to evaluate models on multiple tasks and aggregate results.

    The class inherits from BaseYAMLSerializable to support configuration
    management and serialization capabilities.

    Attributes:
        _program: Optional program reference for execution context.
        _config_key: Configuration key used for YAML configuration ("taskpool").

    Abstract Methods:
        evaluate: Must be implemented by subclasses to define task-specific
            evaluation logic.

    Example:
        Implementing a custom task pool:

        ```python
        class MyTaskPool(BaseTaskPool):


            def evaluate(self, model, **kwargs):
                results = {}
                for task_name in self.tasks:
                    # Implement task-specific evaluation
                    results[task_name] = self._evaluate_task(model, task_name)
                return results
        ```
    """

    _program = None
    _config_key = "taskpool"

    @abstractmethod
    def evaluate(self, model: Any, *args: Any, **kwargs: Any) -> Dict[str, Any]:
        """Evaluate a model on all tasks in the task pool and return aggregated results.

        This abstract method defines the core evaluation interface that all task pool
        implementations must provide. It should evaluate the given model on all tasks
        managed by the pool and return a structured report of the results.

        The evaluation process typically involves:
        1. Iterating through all tasks in the pool
        2. Running model inference on each task's dataset
        3. Computing task-specific metrics
        4. Aggregating results into a standardized report format

        Args:
            model: The model to evaluate. Can be any model type (PyTorch model,
                Hugging Face model, etc.) that is compatible with the specific
                task pool implementation.
            *args: Additional positional arguments that may be needed for
                task-specific evaluation procedures.
            **kwargs: Additional keyword arguments for evaluation configuration,
                such as batch_size, device, evaluation metrics, etc.

        Returns:
            Dict[str, Any]: A dictionary containing evaluation results for each task.
                The structure follows the pattern:

                ```python
                {
                    "task_name_1": {
                        "metric_1": value,
                        "metric_2": value,
                        ...
                    },
                    "task_name_2": {
                        "metric_1": value,
                        "metric_2": value,
                        ...
                    },
                    ...
                }
                ```

        Example:
            For an image classification task pool:

            ```python
            results = task_pool.evaluate(model)
            # Returns:
            # {
            #     "mnist": {
            #         "accuracy": 0.95,
            #         "loss": 0.15,
            #     },
            #     "cifar10": {
            #         "accuracy": 0.87,
            #         "loss": 0.42,
            #     }
            # }
            ```

        Raises:
            NotImplementedError: This method must be implemented by subclasses.

        Note:
            Implementations should ensure that the returned dictionary structure
            is consistent and that metric names are standardized across similar
            task types to enable meaningful comparison and aggregation.
        """
        pass

evaluate(model, *args, **kwargs) abstractmethod

Evaluate a model on all tasks in the task pool and return aggregated results.

This abstract method defines the core evaluation interface that all task pool implementations must provide. It should evaluate the given model on all tasks managed by the pool and return a structured report of the results.

The evaluation process typically involves:

  1. Iterating through all tasks in the pool
  2. Running model inference on each task's dataset
  3. Computing task-specific metrics
  4. Aggregating results into a standardized report format

Parameters:

  • model (Any) –

    The model to evaluate. Can be any model type (PyTorch model, Hugging Face model, etc.) that is compatible with the specific task pool implementation.

  • *args (Any, default: () ) –

    Additional positional arguments that may be needed for task-specific evaluation procedures.

  • **kwargs (Any, default: {} ) –

    Additional keyword arguments for evaluation configuration, such as batch_size, device, evaluation metrics, etc.

Returns:

  • Dict[str, Any]

    A dictionary containing evaluation results for each task. The structure follows the pattern:

    {
        "task_name_1": {
            "metric_1": value,
            "metric_2": value,
            ...
        },
        "task_name_2": {
            "metric_1": value,
            "metric_2": value,
            ...
        },
        ...
    }
    
Example

For an image classification task pool:

results = task_pool.evaluate(model)
# Returns:
# {
#     "mnist": {
#         "accuracy": 0.95,
#         "loss": 0.15,
#     },
#     "cifar10": {
#         "accuracy": 0.87,
#         "loss": 0.42,
#     }
# }

Raises:

  • NotImplementedError

    This method must be implemented by subclasses.

Note

Implementations should ensure that the returned dictionary structure is consistent and that metric names are standardized across similar task types to enable meaningful comparison and aggregation.
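
As a concrete illustration of why standardized metric names matter, the small sketch below averages a shared metric across tasks; the report literal is made up:

report = {
    "mnist": {"accuracy": 0.95, "loss": 0.15},
    "cifar10": {"accuracy": 0.87, "loss": 0.42},
}
# Averaging only works cleanly because every task reports the same metric name.
accuracies = [metrics["accuracy"] for metrics in report.values() if "accuracy" in metrics]
average_accuracy = sum(accuracies) / len(accuracies)  # 0.91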

Source code in fusion_bench/taskpool/base_pool.py
@abstractmethod
def evaluate(self, model: Any, *args: Any, **kwargs: Any) -> Dict[str, Any]:
    """Evaluate a model on all tasks in the task pool and return aggregated results.

    This abstract method defines the core evaluation interface that all task pool
    implementations must provide. It should evaluate the given model on all tasks
    managed by the pool and return a structured report of the results.

    The evaluation process typically involves:
    1. Iterating through all tasks in the pool
    2. Running model inference on each task's dataset
    3. Computing task-specific metrics
    4. Aggregating results into a standardized report format

    Args:
        model: The model to evaluate. Can be any model type (PyTorch model,
            Hugging Face model, etc.) that is compatible with the specific
            task pool implementation.
        *args: Additional positional arguments that may be needed for
            task-specific evaluation procedures.
        **kwargs: Additional keyword arguments for evaluation configuration,
            such as batch_size, device, evaluation metrics, etc.

    Returns:
        Dict[str, Any]: A dictionary containing evaluation results for each task.
            The structure follows the pattern:

            ```python
            {
                "task_name_1": {
                    "metric_1": value,
                    "metric_2": value,
                    ...
                },
                "task_name_2": {
                    "metric_1": value,
                    "metric_2": value,
                    ...
                },
                ...
            }
            ```

    Example:
        For an image classification task pool:

        ```python
        results = task_pool.evaluate(model)
        # Returns:
        # {
        #     "mnist": {
        #         "accuracy": 0.95,
        #         "loss": 0.15,
        #     },
        #     "cifar10": {
        #         "accuracy": 0.87,
        #         "loss": 0.42,
        #     }
        # }
        ```

    Raises:
        NotImplementedError: This method must be implemented by subclasses.

    Note:
        Implementations should ensure that the returned dictionary structure
        is consistent and that metric names are standardized across similar
        task types to enable meaningful comparison and aggregation.
    """
    pass

Vision Task Pool

NYUv2 Tasks

NYUv2TaskPool

Bases: TaskPool

Task pool for multi-task learning evaluation on the NYUv2 dataset.

This task pool provides evaluation capabilities for multi-task learning models on the NYU Depth V2 (NYUv2) dataset, which is a popular benchmark for indoor scene understanding. The dataset supports multiple computer vision tasks including semantic segmentation, depth estimation, and surface normal prediction.

The task pool is designed to work with encoder-decoder architectures where a shared encoder processes input images and task-specific decoders generate predictions for different tasks. It integrates with PyTorch Lightning for streamlined training and evaluation workflows.

Supported Tasks
  • Semantic segmentation
  • Depth estimation
  • Surface normal prediction
Source code in fusion_bench/taskpool/nyuv2_taskpool.py
class NYUv2TaskPool(TaskPool):
    """Task pool for multi-task learning evaluation on the NYUv2 dataset.

    This task pool provides evaluation capabilities for multi-task learning models
    on the NYU Depth V2 (NYUv2) dataset, which is a popular benchmark for indoor
    scene understanding. The dataset supports multiple computer vision tasks
    including semantic segmentation, depth estimation, and surface normal prediction.

    The task pool is designed to work with encoder-decoder architectures where
    a shared encoder processes input images and task-specific decoders generate
    predictions for different tasks. It integrates with PyTorch Lightning for
    streamlined training and evaluation workflows.

    Supported Tasks:
        - Semantic segmentation
        - Depth estimation
        - Surface normal prediction
    """

    _trainer: L.Trainer = None

    def __init__(self, taskpool_config: DictConfig):
        """Initialize the NYUv2 task pool with configuration settings.

        Args:
            taskpool_config: Configuration object containing all necessary
                parameters for the task pool, including:
                - data_dir: Path to the directory containing NYUv2 dataset
                - tasks: List of tasks to evaluate (e.g., ["semantic", "depth"])
                - batch_size: Batch size for evaluation data loader
                - num_workers: Number of worker processes for data loading
        """
        self.config = taskpool_config

    def load_datasets(self):
        log.info("Loading NYUv2 dataset")
        data_path = str(Path(self.config.data_dir) / "nyuv2")

        train_dataset = NYUv2(root=data_path, train=True)
        val_dataset = NYUv2(root=data_path, train=False)
        return train_dataset, val_dataset

    @property
    def trainer(self):
        if self._trainer is None:
            self._trainer = L.Trainer(devices=1)
        return self._trainer

    def get_decoders(self):
        from fusion_bench.modelpool.nyuv2_modelpool import NYUv2ModelPool

        modelpool: NYUv2ModelPool = self._program.modelpool
        decoders = nn.ModuleDict()
        for task in self.config.tasks:
            decoders[task] = modelpool.load_model(task, encoder_only=False).decoders[
                task
            ]
        return decoders

    def evaluate(self, encoder: ResnetDilated):
        model = NYUv2MTLModule(
            encoder,
            self.get_decoders(),
            tasks=self.config.tasks,
            task_weights=[1] * len(self.config.tasks),
        )
        _, val_dataset = self.load_datasets()
        val_loader = DataLoader(
            val_dataset,
            batch_size=self.config.batch_size,
            shuffle=False,
            num_workers=self.config.num_workers,
        )
        report = self.trainer.validate(model, val_loader)
        if isinstance(report, list) and len(report) == 1:
            report = report[0]
        return report
__init__(taskpool_config)

Initialize the NYUv2 task pool with configuration settings.

Parameters:

  • taskpool_config (DictConfig) –

    Configuration object containing all necessary parameters for the task pool, including:

      • data_dir: Path to the directory containing the NYUv2 dataset
      • tasks: List of tasks to evaluate (e.g., ["semantic", "depth"])
      • batch_size: Batch size for the evaluation data loader
      • num_workers: Number of worker processes for data loading

Source code in fusion_bench/taskpool/nyuv2_taskpool.py
def __init__(self, taskpool_config: DictConfig):
    """Initialize the NYUv2 task pool with configuration settings.

    Args:
        taskpool_config: Configuration object containing all necessary
            parameters for the task pool, including:
            - data_dir: Path to the directory containing NYUv2 dataset
            - tasks: List of tasks to evaluate (e.g., ["semantic", "depth"])
            - batch_size: Batch size for evaluation data loader
            - num_workers: Number of worker processes for data loading
    """
    self.config = taskpool_config
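
For orientation, a hedged construction sketch follows; the configuration values are placeholders, and note that evaluate() also pulls task-specific decoders from the running program's model pool, so in practice the pool is driven by the FusionBench program rather than used standalone:

from omegaconf import DictConfig

taskpool = NYUv2TaskPool(
    DictConfig(
        {
            "data_dir": "./data",             # expects the dataset under <data_dir>/nyuv2
            "tasks": ["semantic", "depth"],   # illustrative task names
            "batch_size": 8,
            "num_workers": 4,
        }
    )
)
# encoder is assumed to be a ResnetDilated backbone produced by a model pool.
report = taskpool.evaluate(encoder)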

CLIP Task Pool

CLIPVisionModelTaskPool

Bases: HydraConfigMixin, LightningFabricMixin, BaseTaskPool

This class is used to define the image classification task for CLIP models.

Attributes:

  • test_datasets (Union[DictConfig, Dict[str, Dataset]]) –

    The test datasets to evaluate the model on.

  • processor (Union[DictConfig, CLIPProcessor]) –

    The processor used for preprocessing the input data.

  • data_processor (Union[DictConfig, CLIPProcessor]) –

    The processor used to preprocess dataset images; if not provided, it falls back to processor.

  • clip_model (Union[DictConfig, CLIPModel]) –

    The CLIP model used for evaluation.

  • dataloader_kwargs (DictConfig) –

    Keyword arguments for the data loader.

  • layer_wise_feature_save_path (Optional[str]) –

    Path to save the layer-wise features.

  • layer_wise_feature_first_token_only (bool) –

    Boolean indicating whether to save only the first token of the features.

  • layer_wise_feature_max_num (Optional[int]) –

    Maximum number of features to save.

  • fast_dev_run (bool) –

    Boolean indicating whether to run in fast development mode.
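
A minimal construction sketch follows, under the assumption that the test datasets are already available as torch Dataset objects keyed by task name and that a plain dict is acceptable for dataloader_kwargs; the dataset object and checkpoint names are placeholders:

taskpool = CLIPVisionModelTaskPool(
    test_datasets={"cifar10": cifar10_test},    # task name -> torch Dataset (placeholder)
    processor="openai/clip-vit-base-patch32",   # loaded via CLIPProcessor.from_pretrained
    clip_model="openai/clip-vit-base-patch32",  # loaded via CLIPModel.from_pretrained
    dataloader_kwargs={"batch_size": 64, "num_workers": 4},
)
# model: a CLIPVisionModel or CLIPVisionTransformer whose features are to be evaluated.
report = taskpool.evaluate(model)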

Source code in fusion_bench/taskpool/clip_vision/taskpool.py
@auto_register_config
class CLIPVisionModelTaskPool(
    HydraConfigMixin,
    LightningFabricMixin,
    BaseTaskPool,
):
    """
    This class is used to define the image classification task for CLIP models.

    Attributes:
        test_datasets (Union[DictConfig, Dict[str, Dataset]]): The test datasets to evaluate the model on.
        processor (Union[DictConfig, CLIPProcessor]): The processor used for preprocessing the input data.
        data_processor (Union[DictConfig, CLIPProcessor]): The data processor used for processing the input data.
        clip_model (Union[DictConfig, CLIPModel]): The CLIP model used for evaluation.
        dataloader_kwargs (DictConfig): Keyword arguments for the data loader.
        layer_wise_feature_save_path (Optional[str]): Path to save the layer-wise features.
        layer_wise_feature_first_token_only (bool): Boolean indicating whether to save only the first token of the features.
        layer_wise_feature_max_num (Optional[int]): Maximum number of features to save.
        fast_dev_run (bool): Boolean indicating whether to run in fast development mode.
    """

    _is_setup = False

    # hooks and handles for saving layer-wise features
    _layer_wise_feature_save_hooks: Dict[int, LayerWiseFeatureSaver] = {}
    _layer_wise_feature_save_hook_handles: Dict[int, RemovableHandle] = {}

    _config_mapping = BaseTaskPool._config_mapping | {
        "_test_datasets": "test_datasets",
        "_processor": "processor",
        "_data_processor": "data_processor",
        "_clip_model": "clip_model",
        "_dataloader_kwargs": "dataloader_kwargs",
        "_layer_wise_feature_save_path": "layer_wise_feature_save_path",
        "fast_dev_run": "fast_dev_run",
    }

    def __init__(
        self,
        test_datasets: Union[DictConfig, Dict[str, Dataset]],
        *,
        processor: Union[str, DictConfig, CLIPProcessor],
        clip_model: Union[str, DictConfig, CLIPModel],
        data_processor: Union[DictConfig, CLIPProcessor] = None,
        dataloader_kwargs: DictConfig = None,
        layer_wise_feature_save_path: Optional[str] = None,
        layer_wise_feature_first_token_only: bool = True,
        layer_wise_feature_max_num: Optional[int] = None,
        fast_dev_run: Optional[bool] = None,
        move_to_device: bool = True,
        **kwargs,
    ):
        """
        Initialize the CLIPVisionModelTaskPool.
        """
        super().__init__(**kwargs)
        self._test_datasets = test_datasets
        self._processor = processor
        self._data_processor = data_processor
        self._clip_model = clip_model
        self._dataloader_kwargs = dataloader_kwargs or {}

        # layer-wise feature saving
        self._layer_wise_feature_save_path = layer_wise_feature_save_path
        self.layer_wise_feature_save_path = (
            Path(layer_wise_feature_save_path)
            if layer_wise_feature_save_path is not None
            else None
        )
        self.layer_wise_feature_first_token_only = layer_wise_feature_first_token_only
        self.layer_wise_feature_max_num = layer_wise_feature_max_num

        if self.fast_dev_run is None:
            self.fast_dev_run = RuntimeConstants().debug
        else:
            self.fast_dev_run = fast_dev_run

    def setup(self):
        """
        Set up the processor, data processor, CLIP model, test datasets, and data loaders.
        """
        # setup processor and clip model
        if isinstance(self._processor, str):
            self.processor = CLIPProcessor.from_pretrained(self._processor)
        elif (
            isinstance(self._processor, (dict, DictConfig))
            and "_target_" in self._processor
        ):
            self.processor = instantiate(self._processor)
        else:
            self.processor = self._processor

        if self._data_processor is None:
            self.data_processor = self.processor
        else:
            self.data_processor = (
                instantiate(self._data_processor)
                if isinstance(self._data_processor, DictConfig)
                else self._data_processor
            )

        if isinstance(self._clip_model, str):
            self.clip_model = CLIPModel.from_pretrained(self._clip_model)
        elif (
            isinstance(self._clip_model, (dict, DictConfig))
            and "_target_" in self._clip_model
        ):
            self.clip_model = instantiate(self._clip_model)
        else:
            self.clip_model = self._clip_model

        self.clip_model = self.fabric.to_device(self.clip_model)
        self.clip_model.requires_grad_(False)
        self.clip_model.eval()

        # Load the test datasets
        self.test_datasets = {
            name: instantiate(dataset) if isinstance(dataset, DictConfig) else dataset
            for name, dataset in self._test_datasets.items()
        }
        self.test_datasets = {
            name: CLIPDataset(dataset, self.data_processor)
            for name, dataset in self.test_datasets.items()
        }
        # Setup the dataloaders
        self.test_dataloaders = {
            name: DataLoader(
                dataset,
                **self._dataloader_kwargs,
                collate_fn=(
                    raw_image_collate_fn if self.data_processor is None else None
                ),
            )
            for name, dataset in self.test_datasets.items()
        }
        self.test_dataloaders = {
            name: self.fabric.setup_dataloaders(
                dataloader, move_to_device=self.move_to_device
            )
            for name, dataloader in self.test_dataloaders.items()
        }

        self._is_setup = True

    @torch.no_grad()
    def _evaluate(
        self,
        classifier: HFCLIPClassifier,
        test_loader: DataLoader,
        num_classes: int,
        task_name: str = None,
    ):
        """
        Evaluate the classifier on the test dataset (single-task evaluation).

        Args:
            classifier (HFCLIPClassifier): The classifier to evaluate.
            test_loader (DataLoader): The data loader for the test dataset.
            num_classes (int): The number of classes in the classification task.
            task_name (str): The name of the task.

        Returns:
            Dict[str, float]: A dictionary containing the accuracy and loss of the classifier on the test dataset.
        """
        accuracy: MulticlassAccuracy = Accuracy(
            task="multiclass", num_classes=num_classes
        )
        classifier.eval()
        loss_metric = MeanMetric()
        # if fast_dev_run is set, we only evaluate on a batch of the data
        if self.fast_dev_run:
            log.info("Running under fast_dev_run mode, evaluating on a single batch.")
            test_loader = itertools.islice(test_loader, 1)
        else:
            test_loader = test_loader

        pbar = tqdm(
            test_loader,
            desc=f"Evaluating {task_name}",
            leave=False,
            dynamic_ncols=True,
        )
        for batch in pbar:
            inputs, targets = batch
            outputs = classifier(
                inputs,
                return_image_embeds=True,
                return_dict=True,
                task_name=task_name,
            )
            logits: Tensor = outputs["logits"]
            if logits.device != targets.device:
                targets = targets.to(logits.device)

            loss = F.cross_entropy(logits, targets)
            loss_metric.update(loss.detach().cpu())
            acc = accuracy(logits.detach().cpu(), targets.detach().cpu())
            pbar.set_postfix(
                {
                    "accuracy": accuracy.compute().item(),
                    "loss": loss_metric.compute().item(),
                }
            )

        acc = accuracy.compute().item()
        loss = loss_metric.compute().item()
        results = {"accuracy": acc, "loss": loss}
        return results

    def evaluate(
        self,
        model: Union[CLIPVisionModel, CLIPVisionTransformer],
        name=None,
        **kwargs,
    ):
        """
        Evaluate the model on the image classification task.

        Args:
            model (Union[CLIPVisionModel, CLIPVisionTransformer]): The model to evaluate.
            name (Optional[str]): The name of the model. This will be logged into the report if not None.

        Returns:
            Dict[str, Any]: A dictionary containing the evaluation results for each task.
        """
        if not self._is_setup:
            self.setup()

        report = {}
        # CLIPVisionModel works the same with CLIPVisionTransformer, so we can use it directly
        if hasattr(model, "is_surgery_model") and model.is_surgery_model:
            log.info("running evaluation on a surgery model.")
            model: "SurgeryModelWrapper" = model
            self.clip_model.vision_model = model
        else:
            # replace the vision encoder with the model
            self.clip_model.vision_model = model
        classifier = HFCLIPClassifier(
            self.clip_model,
            processor=self.processor,
        )
        if self.move_to_device:
            classifier = cast(HFCLIPClassifier, self.fabric.to_device(classifier))
        # collect basic model information
        training_params, all_params = count_parameters(model)
        report["model_info"] = {
            "trainable_params": training_params,
            "all_params": all_params,
            "trainable_percentage": training_params / all_params,
        }
        if name is not None:
            report["model_info"]["name"] = name

        # evaluate on each task
        pbar = tqdm(
            self.test_dataloaders.items(),
            desc="Evaluating tasks",
            total=len(self.test_dataloaders),
        )
        for task_name, test_dataloader in pbar:
            classnames, templates = get_classnames_and_templates(task_name)
            self.on_task_evaluation_begin(classifier, task_name)
            classifier.set_classification_task(classnames, templates)
            result = self._evaluate(
                classifier,
                test_dataloader,
                num_classes=len(classnames),
                task_name=task_name,
            )
            report[task_name] = result
            self.on_task_evaluation_end()

        # calculate the average accuracy and loss
        if "average" not in report:
            report["average"] = {}
            accuracies = [
                value["accuracy"]
                for key, value in report.items()
                if "accuracy" in value
            ]
            if len(accuracies) > 0:
                average_accuracy = sum(accuracies) / len(accuracies)
                report["average"]["accuracy"] = average_accuracy
            losses = [value["loss"] for key, value in report.items() if "loss" in value]
            if len(losses) > 0:
                average_loss = sum(losses) / len(losses)
                report["average"]["loss"] = average_loss

        log.info(f"Evaluation Result: {report}")
        if self.fabric.is_global_zero and len(self.fabric._loggers) > 0:
            save_path = os.path.join(self.log_dir, "report.json")
            for version in itertools.count(1):
                if not os.path.exists(save_path):
                    break
                # if the file already exists, increment the version to avoid overwriting
                save_path = os.path.join(self.log_dir, f"report_{version}.json")
            with open(save_path, "w") as fp:
                json.dump(report, fp)
            log.info(f"Evaluation report saved to {save_path}")
        return report

    def on_task_evaluation_begin(self, classifier: HFCLIPClassifier, task_name: str):
        """
        Called at the beginning of task evaluation to set up hooks for saving layer-wise features.

        Args:
            classifier (HFCLIPClassifier): The classifier being evaluated.
            task_name (str): The name of the task being evaluated.
        """
        if self.layer_wise_feature_save_path is not None:
            # setup hooks for saving layer-wise features
            assert isinstance(
                classifier.clip_model.vision_model,
                (CLIPVisionTransformer, CLIPVisionModel),
            ), "Vision model is expected to be a CLIPVisionTransformer"
            vision_model = classifier.clip_model.vision_model
            if isinstance(vision_model, CLIPVisionModel):
                vision_model = vision_model.vision_model
                # assign forward hooks for each layer
            for i, layer in enumerate(vision_model.encoder.layers):
                self._layer_wise_feature_save_hooks[i] = LayerWiseFeatureSaver(
                    self.layer_wise_feature_save_path / task_name / f"layer_{i}.pth",
                    first_token_only=self.layer_wise_feature_first_token_only,
                    max_num=self.layer_wise_feature_max_num,
                )
                self._layer_wise_feature_save_hook_handles[i] = (
                    layer.register_forward_hook(self._layer_wise_feature_save_hooks[i])
                )

    def on_task_evaluation_end(self):
        """
        Called at the end of task evaluation to save features and remove hooks.
        """
        if self.layer_wise_feature_save_path is not None:
            # save features and remove hooks after evaluation
            for i, hook in self._layer_wise_feature_save_hooks.items():
                hook.save_features()
                self._layer_wise_feature_save_hook_handles[i].remove()
__init__(test_datasets, *, processor, clip_model, data_processor=None, dataloader_kwargs=None, layer_wise_feature_save_path=None, layer_wise_feature_first_token_only=True, layer_wise_feature_max_num=None, fast_dev_run=None, move_to_device=True, **kwargs)

Initialize the CLIPVisionModelTaskPool.

Source code in fusion_bench/taskpool/clip_vision/taskpool.py
def __init__(
    self,
    test_datasets: Union[DictConfig, Dict[str, Dataset]],
    *,
    processor: Union[str, DictConfig, CLIPProcessor],
    clip_model: Union[str, DictConfig, CLIPModel],
    data_processor: Union[DictConfig, CLIPProcessor] = None,
    dataloader_kwargs: DictConfig = None,
    layer_wise_feature_save_path: Optional[str] = None,
    layer_wise_feature_first_token_only: bool = True,
    layer_wise_feature_max_num: Optional[int] = None,
    fast_dev_run: Optional[bool] = None,
    move_to_device: bool = True,
    **kwargs,
):
    """
    Initialize the CLIPVisionModelTaskPool.
    """
    super().__init__(**kwargs)
    self._test_datasets = test_datasets
    self._processor = processor
    self._data_processor = data_processor
    self._clip_model = clip_model
    self._dataloader_kwargs = dataloader_kwargs or {}

    # layer-wise feature saving
    self._layer_wise_feature_save_path = layer_wise_feature_save_path
    self.layer_wise_feature_save_path = (
        Path(layer_wise_feature_save_path)
        if layer_wise_feature_save_path is not None
        else None
    )
    self.layer_wise_feature_first_token_only = layer_wise_feature_first_token_only
    self.layer_wise_feature_max_num = layer_wise_feature_max_num

    if self.fast_dev_run is None:
        self.fast_dev_run = RuntimeConstants().debug
    else:
        self.fast_dev_run = fast_dev_run
evaluate(model, name=None, **kwargs)

Evaluate the model on the image classification task.

Parameters:

  • model (Union[CLIPVisionModel, CLIPVisionTransformer]) –

    The model to evaluate.

  • name (Optional[str], default: None ) –

    The name of the model. This will be logged into the report if not None.

Returns:

  • Dict[str, Any]: A dictionary containing the evaluation results for each task.
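
Consuming the report typically looks like the following, assuming the layout produced by the implementation above with per-task entries alongside the model_info and average summaries:

report = taskpool.evaluate(vision_model, name="merged-clip-vit")  # names are placeholders
print(report["model_info"]["trainable_params"])
print(report["average"]["accuracy"])            # mean accuracy over the evaluated tasks
for task_name, metrics in report.items():
    if task_name not in ("model_info", "average"):
        print(task_name, metrics["accuracy"], metrics["loss"])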

Source code in fusion_bench/taskpool/clip_vision/taskpool.py
def evaluate(
    self,
    model: Union[CLIPVisionModel, CLIPVisionTransformer],
    name=None,
    **kwargs,
):
    """
    Evaluate the model on the image classification task.

    Args:
        model (Union[CLIPVisionModel, CLIPVisionTransformer]): The model to evaluate.
        name (Optional[str]): The name of the model. This will be logged into the report if not None.

    Returns:
        Dict[str, Any]: A dictionary containing the evaluation results for each task.
    """
    if not self._is_setup:
        self.setup()

    report = {}
    # CLIPVisionModel works the same with CLIPVisionTransformer, so we can use it directly
    if hasattr(model, "is_surgery_model") and model.is_surgery_model:
        log.info("running evaluation on a surgery model.")
        model: "SurgeryModelWrapper" = model
        self.clip_model.vision_model = model
    else:
        # replace the vision encoder with the model
        self.clip_model.vision_model = model
    classifier = HFCLIPClassifier(
        self.clip_model,
        processor=self.processor,
    )
    if self.move_to_device:
        classifier = cast(HFCLIPClassifier, self.fabric.to_device(classifier))
    # collect basic model information
    training_params, all_params = count_parameters(model)
    report["model_info"] = {
        "trainable_params": training_params,
        "all_params": all_params,
        "trainable_percentage": training_params / all_params,
    }
    if name is not None:
        report["model_info"]["name"] = name

    # evaluate on each task
    pbar = tqdm(
        self.test_dataloaders.items(),
        desc="Evaluating tasks",
        total=len(self.test_dataloaders),
    )
    for task_name, test_dataloader in pbar:
        classnames, templates = get_classnames_and_templates(task_name)
        self.on_task_evaluation_begin(classifier, task_name)
        classifier.set_classification_task(classnames, templates)
        result = self._evaluate(
            classifier,
            test_dataloader,
            num_classes=len(classnames),
            task_name=task_name,
        )
        report[task_name] = result
        self.on_task_evaluation_end()

    # calculate the average accuracy and loss
    if "average" not in report:
        report["average"] = {}
        accuracies = [
            value["accuracy"]
            for key, value in report.items()
            if "accuracy" in value
        ]
        if len(accuracies) > 0:
            average_accuracy = sum(accuracies) / len(accuracies)
            report["average"]["accuracy"] = average_accuracy
        losses = [value["loss"] for key, value in report.items() if "loss" in value]
        if len(losses) > 0:
            average_loss = sum(losses) / len(losses)
            report["average"]["loss"] = average_loss

    log.info(f"Evaluation Result: {report}")
    if self.fabric.is_global_zero and len(self.fabric._loggers) > 0:
        save_path = os.path.join(self.log_dir, "report.json")
        for version in itertools.count(1):
            if not os.path.exists(save_path):
                break
            # if the file already exists, increment the version to avoid overwriting
            save_path = os.path.join(self.log_dir, f"report_{version}.json")
        with open(save_path, "w") as fp:
            json.dump(report, fp)
        log.info(f"Evaluation report saved to {save_path}")
    return report
on_task_evaluation_begin(classifier, task_name)

Called at the beginning of task evaluation to set up hooks for saving layer-wise features.

Parameters:

  • classifier (HFCLIPClassifier) –

    The classifier being evaluated.

  • task_name (str) –

    The name of the task being evaluated.
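
For intuition, here is a minimal sketch of the forward-hook pattern this method sets up; the saver class below is a simplified stand-in, not the library's LayerWiseFeatureSaver API:

import torch

class ToyFeatureSaver:
    """Collects the outputs a module produces during forward passes."""

    def __init__(self):
        self.features = []

    def __call__(self, module, inputs, output):
        # Encoder layers often return tuples; keep only the hidden states.
        hidden = output[0] if isinstance(output, tuple) else output
        self.features.append(hidden.detach().cpu())

saver = ToyFeatureSaver()
layer = torch.nn.Linear(4, 4)                   # stands in for an encoder layer
handle = layer.register_forward_hook(saver)
layer(torch.randn(2, 4))
handle.remove()                                 # hooks are removed after evaluation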

Source code in fusion_bench/taskpool/clip_vision/taskpool.py
def on_task_evaluation_begin(self, classifier: HFCLIPClassifier, task_name: str):
    """
    Called at the beginning of task evaluation to set up hooks for saving layer-wise features.

    Args:
        classifier (HFCLIPClassifier): The classifier being evaluated.
        task_name (str): The name of the task being evaluated.
    """
    if self.layer_wise_feature_save_path is not None:
        # setup hooks for saving layer-wise features
        assert isinstance(
            classifier.clip_model.vision_model,
            (CLIPVisionTransformer, CLIPVisionModel),
        ), "Vision model is expected to be a CLIPVisionTransformer"
        vision_model = classifier.clip_model.vision_model
        if isinstance(vision_model, CLIPVisionModel):
            vision_model = vision_model.vision_model
            # assign forward hooks for each layer
        for i, layer in enumerate(vision_model.encoder.layers):
            self._layer_wise_feature_save_hooks[i] = LayerWiseFeatureSaver(
                self.layer_wise_feature_save_path / task_name / f"layer_{i}.pth",
                first_token_only=self.layer_wise_feature_first_token_only,
                max_num=self.layer_wise_feature_max_num,
            )
            self._layer_wise_feature_save_hook_handles[i] = (
                layer.register_forward_hook(self._layer_wise_feature_save_hooks[i])
            )
on_task_evaluation_end()

Called at the end of task evaluation to save features and remove hooks.

Source code in fusion_bench/taskpool/clip_vision/taskpool.py
def on_task_evaluation_end(self):
    """
    Called at the end of task evaluation to save features and remove hooks.
    """
    if self.layer_wise_feature_save_path is not None:
        # save features and remove hooks after evaluation
        for i, hook in self._layer_wise_feature_save_hooks.items():
            hook.save_features()
            self._layer_wise_feature_save_hook_handles[i].remove()
setup()

Set up the processor, data processor, CLIP model, test datasets, and data loaders.

Source code in fusion_bench/taskpool/clip_vision/taskpool.py
def setup(self):
    """
    Set up the processor, data processor, CLIP model, test datasets, and data loaders.
    """
    # setup processor and clip model
    if isinstance(self._processor, str):
        self.processor = CLIPProcessor.from_pretrained(self._processor)
    elif (
        isinstance(self._processor, (dict, DictConfig))
        and "_target_" in self._processor
    ):
        self.processor = instantiate(self._processor)
    else:
        self.processor = self._processor

    if self._data_processor is None:
        self.data_processor = self.processor
    else:
        self.data_processor = (
            instantiate(self._data_processor)
            if isinstance(self._data_processor, DictConfig)
            else self._data_processor
        )

    if isinstance(self._clip_model, str):
        self.clip_model = CLIPModel.from_pretrained(self._clip_model)
    elif (
        isinstance(self._clip_model, (dict, DictConfig))
        and "_target_" in self._clip_model
    ):
        self.clip_model = instantiate(self._clip_model)
    else:
        self.clip_model = self._clip_model

    self.clip_model = self.fabric.to_device(self.clip_model)
    self.clip_model.requires_grad_(False)
    self.clip_model.eval()

    # Load the test datasets
    self.test_datasets = {
        name: instantiate(dataset) if isinstance(dataset, DictConfig) else dataset
        for name, dataset in self._test_datasets.items()
    }
    self.test_datasets = {
        name: CLIPDataset(dataset, self.data_processor)
        for name, dataset in self.test_datasets.items()
    }
    # Setup the dataloaders
    self.test_dataloaders = {
        name: DataLoader(
            dataset,
            **self._dataloader_kwargs,
            collate_fn=(
                raw_image_collate_fn if self.data_processor is None else None
            ),
        )
        for name, dataset in self.test_datasets.items()
    }
    self.test_dataloaders = {
        name: self.fabric.setup_dataloaders(
            dataloader, move_to_device=self.move_to_device
        )
        for name, dataloader in self.test_dataloaders.items()
    }

    self._is_setup = True

SparseWEMoECLIPVisionModelTaskPool

Bases: CLIPVisionModelTaskPool

Source code in fusion_bench/taskpool/clip_vision/clip_sparse_wemoe_taskpool.py
class SparseWEMoECLIPVisionModelTaskPool(CLIPVisionModelTaskPool):

    # hooks and handles for saving layer-wise routing weights
    _layer_wise_routing_weights_save_hooks: Dict[Any, LayerWiseRoutingWeightSaver] = {}
    _layer_wise_routing_weights_save_hook_handles: Dict[Any, RemovableHandle] = {}

    _config_mapping = CLIPVisionModelTaskPool._config_mapping | {
        "_layer_wise_routing_weights_save_path": "layer_wise_routing_weights_save_path",
    }

    def __init__(
        self,
        layer_wise_routing_weights_save_path: Optional[str],
        layer_wise_routing_weights_max_num: Optional[int] = None,
        **kwargs,
    ):
        # save path for layer-wise routing weights
        self._layer_wise_routing_weights_save_path = (
            layer_wise_routing_weights_save_path
        )
        self.layer_wise_routing_weights_save_path = (
            Path(layer_wise_routing_weights_save_path)
            if layer_wise_routing_weights_save_path is not None
            else None
        )
        self.layer_wise_routing_weights_max_num = layer_wise_routing_weights_max_num
        super().__init__(**kwargs)

    def on_task_evaluation_begin(self, classifier: HFCLIPClassifier, task_name: str):
        super().on_task_evaluation_begin(classifier, task_name)
        if self.layer_wise_routing_weights_save_path is not None:
            # setup hooks for saving layer-wise routing weights
            assert isinstance(
                classifier.clip_model.vision_model,
                (CLIPVisionTransformer, CLIPVisionModel),
            ), "Vision model is expected to be a CLIPVisionTransformer"
            vision_model = classifier.clip_model.vision_model
            if isinstance(vision_model, CLIPVisionModel):
                vision_model = vision_model.vision_model
                # assign forward hooks for each layer
            shared_gate = None
            for i, layer in enumerate(vision_model.encoder.layers):
                mlp = layer.mlp
                assert isinstance(
                    mlp,
                    (SparseWeightEnsemblingMoE, SparseWeightEnsemblingMoE_ShardGate),
                ), f"MLP is expected to be a SparseWeightEnsemblingMoE or SparseWeightEnsemblingMoE_ShardGate, but got {type(mlp)}"
                # layer-wise routing weights
                hook = LayerWiseRoutingWeightSaver(
                    self.layer_wise_routing_weights_save_path
                    / task_name
                    / f"layer_{i}.pt",
                    max_num=self.layer_wise_routing_weights_max_num,
                )
                self._layer_wise_routing_weights_save_hooks[i] = hook
                if isinstance(mlp, SparseWeightEnsemblingMoE_ShardGate):
                    # if use shared gate, copy the gate to all layers to avoid multiple hooks
                    if shared_gate is None:
                        shared_gate = mlp.gate
                    mlp.gate = deepcopy(shared_gate)
                self._layer_wise_routing_weights_save_hook_handles[i] = (
                    mlp.gate.register_forward_hook(hook)
                )

    def on_task_evaluation_end(self):
        super().on_task_evaluation_end()
        if self.layer_wise_routing_weights_save_path is not None:
            # remove hooks for saving layer-wise routing weights
            for i, handle in self._layer_wise_routing_weights_save_hook_handles.items():
                self._layer_wise_routing_weights_save_hooks[i].save_routing_weights()
                self._layer_wise_routing_weights_save_hook_handles.pop(i)
                handle.remove()

RankoneMoECLIPVisionModelTaskPool

Bases: CLIPVisionModelTaskPool

Source code in fusion_bench/taskpool/clip_vision/clip_rankone_moe_taskpool.py
class RankoneMoECLIPVisionModelTaskPool(CLIPVisionModelTaskPool):

    # hooks and handles for saving layer-wise routing weights
    _layer_wise_routing_weights_save_hooks: Dict[Any, LayerWiseRoutingWeightSaver] = {}
    _layer_wise_routing_weights_save_hook_handles: Dict[Any, RemovableHandle] = {}

    _config_mapping = CLIPVisionModelTaskPool._config_mapping | {
        "_layer_wise_routing_weights_save_path": "layer_wise_routing_weights_save_path",
    }

    def __init__(
        self,
        layer_wise_routing_weights_save_path: Optional[str],
        layer_wise_routing_weights_max_num: Optional[int] = None,
        **kwargs,
    ):
        # save path for layer-wise routing weights
        self._layer_wise_routing_weights_save_path = (
            layer_wise_routing_weights_save_path
        )
        self.layer_wise_routing_weights_save_path = (
            Path(layer_wise_routing_weights_save_path)
            if layer_wise_routing_weights_save_path is not None
            else None
        )
        self.layer_wise_routing_weights_max_num = layer_wise_routing_weights_max_num
        super().__init__(**kwargs)

    def on_task_evaluation_begin(self, classifier: HFCLIPClassifier, task_name: str):
        super().on_task_evaluation_begin(classifier, task_name)
        if self.layer_wise_routing_weights_save_path is not None:
            # setup hooks for saving layer-wise routing weights
            assert isinstance(
                classifier.clip_model.vision_model,
                (CLIPVisionTransformer, CLIPVisionModel),
            ), "Vision model is expected to be a CLIPVisionTransformer"
            vision_model = classifier.clip_model.vision_model
            if isinstance(vision_model, CLIPVisionModel):
                vision_model = vision_model.vision_model
                # assign forward hooks for each layer

            for i, layer in enumerate(vision_model.encoder.layers):
                mlp = layer.mlp
                assert isinstance(
                    mlp,
                    (RankOneMoE),
                ), f"MLP is expected to be a RankOneWeightEnsemblingMoE, but got {type(mlp)}"
                # layer-wise routing weights
                hook = LayerWiseRoutingWeightSaver(
                    self.layer_wise_routing_weights_save_path
                    / task_name
                    / f"layer_{i}.pt",
                    max_num=self.layer_wise_routing_weights_max_num,
                )
                self._layer_wise_routing_weights_save_hooks[i] = hook
                self._layer_wise_routing_weights_save_hook_handles[i] = (
                    mlp.gate.register_forward_hook(hook)
                )

    def on_task_evaluation_end(self):
        super().on_task_evaluation_end()
        if self.layer_wise_routing_weights_save_path is not None:
            # remove hooks for saving layer-wise routing weights
            for i, handle in self._layer_wise_routing_weights_save_hook_handles.items():
                self._layer_wise_routing_weights_save_hooks[i].save_routing_weights()
                self._layer_wise_routing_weights_save_hook_handles.pop(i)
                handle.remove()

Natural Language Processing (NLP) Tasks

GPT-2

GPT2TextClassificationTaskPool

Bases: BaseTaskPool, LightningFabricMixin

A task pool for GPT2 text classification tasks. This class manages the tasks and provides methods for loading the test datasets and running evaluation.
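
Assuming a pool has been instantiated from its Hydra/YAML configuration, evaluation follows the common task-pool pattern; below is a hedged sketch of consuming the report, where the backbone variable stands in for a GPT2Model:

report = taskpool.evaluate(gpt2_backbone, name="merged-gpt2")  # placeholder names
print(report["average"]["accuracy"])
for task_name, metrics in report.items():
    if task_name != "average" and isinstance(metrics, dict):
        print(task_name, metrics["accuracy"], metrics["loss"])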

Source code in fusion_bench/taskpool/gpt2_text_classification.py
class GPT2TextClassificationTaskPool(BaseTaskPool, LightningFabricMixin):
    """
    A task pool for GPT2 text classification tasks.
    This class manages the tasks and provides methods for loading test dataset and evaluation.
    """

    _config_mapping = BaseTaskPool._config_mapping | {
        "_test_datasets": "test_datasets",
        "_tokenizer": "tokenizer",
        "dataloader_kwargs": "dataloader_kwargs",
        "fast_dev_run": "fast_dev_run",
    }

    def __init__(
        self,
        test_datasets: DictConfig,
        tokenizer: DictConfig,
        dataloader_kwargs: DictConfig,
        fast_dev_run: bool,
        **kwargs,
    ):
        self._test_datasets = test_datasets
        self._tokenizer = tokenizer
        self.dataloader_kwargs = dataloader_kwargs
        self.fast_dev_run = fast_dev_run
        super().__init__(**kwargs)

        self.setup()

    def setup(self):
        global tokenizer
        self.tokenizer = tokenizer = instantiate(self._tokenizer)

    def get_classifier(
        self, task_name: str, model: GPT2Model
    ) -> GPT2ForSequenceClassification:
        modelpool = self._program.modelpool
        classifier = modelpool.load_classifier(task_name)
        classifier.transformer = deepcopy(model)
        return classifier

    @torch.no_grad()
    def evaluate_single_task(
        self,
        task_name: str,
        model: GPT2Model,
        test_loader: DataLoader,
    ):
        loss_metric = MeanMetric()
        # load classifier and replace the backbone with the passed model
        model: GPT2ForSequenceClassification = self.get_classifier(task_name, model)
        accuracy = Accuracy("multiclass", num_classes=model.num_labels)
        model = self.fabric.setup(model)

        if self.config.get("fast_dev_run", False):
            log.info("Running under fast_dev_run mode, evaluating on a single batch.")
            test_loader = itertools.islice(test_loader, 1)
        else:
            test_loader = test_loader

        for batch in (
            pbar := tqdm(
                test_loader, desc="Evaluating", leave=False, dynamic_ncols=True
            )
        ):
            input_ids = batch["input_ids"]
            attention_mask = batch["attention_mask"]
            labels = batch["labels"]

            outputs = model(input_ids, attention_mask=attention_mask)
            logits = outputs.logits
            loss = F.cross_entropy(logits, labels)

            accuracy(logits.detach().cpu(), labels.detach().cpu())
            loss_metric.update(loss.detach().cpu())
            pbar.set_postfix(
                {
                    "accuracy": accuracy.compute().item(),
                    "loss": loss_metric.compute().item(),
                }
            )

        acc = accuracy.compute().item()
        loss = loss_metric.compute().item()
        results = {"accuracy": acc, "loss": loss}
        log.info(f"Results for task {task_name}: {results}")
        return results

    def get_test_dataloader(self, task_name: str):
        dataset = instantiate(self._test_datasets[task_name])
        dataloader_kwargs = {
            "shuffle": False,
        }
        dataloader_kwargs.update(self.dataloader_kwargs)
        dataloader = DataLoader(
            dataset, collate_fn=default_data_collator, **dataloader_kwargs
        )
        if self.fabric is not None:
            dataloader = self.fabric.setup_dataloaders(dataloader)
        return dataloader

    @override
    def evaluate(self, model: GPT2Model, name: str = None):
        """Evaluate the model on the test datasets.

        Args:
            model (GPT2Model): The model to evaluate.
            name (str, optional): The name of the model. Defaults to None. This is used to identify the model in the report.

        Returns:
            dict: A dictionary containing the evaluation results for each task.
        """
        report = {}
        if name is not None:
            report["name"] = name
        for task_name in (pbar := tqdm(self._test_datasets, desc="Evaluating tasks")):
            pbar.set_description(f"Evaluating task {task_name}")
            dataloader = self.get_test_dataloader(task_name)
            result = self.evaluate_single_task(task_name, model, dataloader)
            report[task_name] = result

        # calculate the average accuracy and loss
        if "average" not in report:
            report["average"] = {}
            accuracies = [
                value["accuracy"]
                for key, value in report.items()
                if isinstance(value, dict) and "accuracy" in value
            ]
            if len(accuracies) > 0:
                average_accuracy = sum(accuracies) / len(accuracies)
                report["average"]["accuracy"] = average_accuracy
            losses = [value["loss"] for key, value in report.items() if "loss" in value]
            if len(losses) > 0:
                average_loss = sum(losses) / len(losses)
                report["average"]["loss"] = average_loss

        log.info(f"Evaluation Result: {report}")
        return report
evaluate(model, name=None)

Evaluate the model on the test datasets.

Parameters:

  • model (GPT2Model) –

    The model to evaluate.

  • name (str, default: None ) –

    The name of the model. Defaults to None. This is used to identify the model in the report.

Returns:

  • dict

    A dictionary containing the evaluation results for each task.

Source code in fusion_bench/taskpool/gpt2_text_classification.py
@override
def evaluate(self, model: GPT2Model, name: str = None):
    """Evaluate the model on the test datasets.

    Args:
        model (GPT2Model): The model to evaluate.
        name (str, optional): The name of the model. Defaults to None. This is used to identify the model in the report.

    Returns:
        dict: A dictionary containing the evaluation results for each task.
    """
    report = {}
    if name is not None:
        report["name"] = name
    for task_name in (pbar := tqdm(self._test_datasets, desc="Evaluating tasks")):
        pbar.set_description(f"Evaluating task {task_name}")
        dataloader = self.get_test_dataloader(task_name)
        result = self.evaluate_single_task(task_name, model, dataloader)
        report[task_name] = result

    # calculate the average accuracy and loss
    if "average" not in report:
        report["average"] = {}
        accuracies = [
            value["accuracy"]
            for key, value in report.items()
            if isinstance(value, dict) and "accuracy" in value
        ]
        if len(accuracies) > 0:
            average_accuracy = sum(accuracies) / len(accuracies)
            report["average"]["accuracy"] = average_accuracy
        losses = [value["loss"] for key, value in report.items() if "loss" in value]
        if len(losses) > 0:
            average_loss = sum(losses) / len(losses)
            report["average"]["loss"] = average_loss

    log.info(f"Evaluation Result: {report}")
    return report

Flan-T5

fusion_bench.compat.taskpool.flan_t5_glue_text_generation.FlanT5GLUETextGenerationTask

Bases: BaseTask

Source code in fusion_bench/compat/taskpool/flan_t5_glue_text_generation.py
class FlanT5GLUETextGenerationTask(BaseTask):
    _taskpool: "FlanT5GLUETextGenerationTaskPool" = None

    @property
    def taskpool(self):
        if self._taskpool is not None:
            return self._taskpool
        else:
            raise ValueError("Taskpool not set")

    @property
    def fabric(self):
        return self.taskpool.fabric

    @property
    def tokenizer(self):
        return self.taskpool.tokenizer

    @functools.cached_property
    def dataset(self):
        log.info(f'Loading dataset: "{self.config.dataset.name}"')
        dataset = load_glue_dataset(
            self.config.dataset.name, self.tokenizer, self.taskpool.config.cache_dir
        )
        return dataset

    @functools.cached_property
    def test_dataset(self):
        return self.dataset[self.config.dataset.split]

    @property
    def test_loader(self):
        loader = DataLoader(
            self.test_dataset,
            batch_size=self.taskpool.config.batch_size,
            num_workers=self.taskpool.config.num_workers,
            shuffle=False,
            collate_fn=default_data_collator,
        )
        loader = self.fabric.setup_dataloaders(loader)
        return loader

LM-Eval-Harness Integration (LLM)

LMEvalHarnessTaskPool

Bases: BaseTaskPool, LightningFabricMixin

A task pool implementation that interfaces with the LM Evaluation Harness framework.

This class provides a wrapper around the LM Evaluation Harness (lm-eval) library, enabling evaluation of language models on various standardized benchmarks and tasks. It inherits from BaseTaskPool and LightningFabricMixin to provide distributed computing capabilities through PyTorch Lightning Fabric.

The task pool supports evaluation on multiple tasks simultaneously and provides flexible configuration options for batch processing, output formatting, and logging. It automatically handles model setup and wrapping for distributed evaluation when using Lightning Fabric.

Parameters:

  • tasks (Union[str, List[str]]) –

    A single task name or list of task names to evaluate on. Examples: "hellaswag", ["arc_easy", "arc_challenge", "hellaswag"]

  • apply_chat_template (bool, default: False ) –

    Whether to apply chat template formatting to inputs. Useful for instruction-tuned or chat models.

  • include_path (Optional[str], default: None ) –

    Path to additional task definitions or custom tasks.

  • batch_size (int, default: 1 ) –

    Number of samples to process in each batch. Larger values may improve throughput but require more memory.

  • metadata (Optional[DictConfig], default: None ) –

    Additional metadata to include in evaluation results.

  • verbosity (Optional[Literal['CRITICAL', 'ERROR', 'WARNING', 'INFO', 'DEBUG']], default: None ) –

    Logging verbosity level for the evaluation process.

  • output_path (Optional[str], default: None ) –

    Custom path for saving evaluation results. If None, results are saved to the default log directory.

  • log_samples (bool, default: False ) –

    Whether to log individual sample predictions and targets. Useful for debugging but increases output size significantly.

  • _usage_ (Optional[str], default: None ) –

    Internal usage tracking string.

  • _version_ (Optional[str], default: None ) –

    Internal version tracking string.

  • **kwargs

    Additional arguments passed to the LM Evaluation Harness.

Example
>>> taskpool = LMEvalHarnessTaskPool(
...     tasks=["arc_easy", "hellaswag"],
...     batch_size=8,
...     verbosity="INFO"
... )
>>> results = taskpool.evaluate(model)
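
As a minimal sketch of how the constructor arguments above are typically combined (`model` here stands for an already-loaded language model):

```python
# A single task can be given as a string ...
taskpool = LMEvalHarnessTaskPool(tasks="hellaswag")

# ... or several tasks as a list; they are joined into one comma-separated
# "--tasks" flag for the lm-eval command-line interface.
taskpool = LMEvalHarnessTaskPool(
    tasks=["arc_easy", "arc_challenge"],
    batch_size=8,       # larger batches trade memory for throughput
    log_samples=True,   # also dump per-sample predictions (large output)
)
taskpool.evaluate(model)  # results go to output_path, or the default log directory
```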
Source code in fusion_bench/taskpool/lm_eval_harness/taskpool.py
class LMEvalHarnessTaskPool(BaseTaskPool, LightningFabricMixin):
    """A task pool implementation that interfaces with the LM Evaluation Harness framework.

    This class provides a wrapper around the LM Evaluation Harness (lm-eval) library,
    enabling evaluation of language models on various standardized benchmarks and tasks.
    It inherits from BaseTaskPool and LightningFabricMixin to provide distributed
    computing capabilities through PyTorch Lightning Fabric.

    The task pool supports evaluation on multiple tasks simultaneously and provides
    flexible configuration options for batch processing, output formatting, and
    logging. It automatically handles model setup and wrapping for distributed
    evaluation when using Lightning Fabric.

    Args:
        tasks: A single task name or list of task names to evaluate on.
            Examples: "hellaswag", ["arc_easy", "arc_challenge", "hellaswag"]
        apply_chat_template: Whether to apply chat template formatting to inputs.
            Useful for instruction-tuned or chat models.
        include_path: Path to additional task definitions or custom tasks.
        batch_size: Number of samples to process in each batch. Larger values
            may improve throughput but require more memory.
        metadata: Additional metadata to include in evaluation results.
        verbosity: Logging verbosity level for the evaluation process.
        output_path: Custom path for saving evaluation results. If None,
            results are saved to the default log directory.
        log_samples: Whether to log individual sample predictions and targets.
            Useful for debugging but increases output size significantly.
        _usage_: Internal usage tracking string.
        _version_: Internal version tracking string.
        **kwargs: Additional arguments passed to the LM Evaluation Harness.

    Example:
        ```python
        >>> taskpool = LMEvalHarnessTaskPool(
        ...     tasks=["arc_easy", "hellaswag"],
        ...     batch_size=8,
        ...     verbosity="INFO"
        ... )
        >>> results = taskpool.evaluate(model)
        ```
    """

    def __init__(
        self,
        tasks: Union[str, List[str]],
        apply_chat_template: bool = False,
        include_path: Optional[str] = None,
        batch_size: int = 1,
        metadata: Optional[DictConfig] = None,
        verbosity: Optional[
            Literal["CRITICAL", "ERROR", "WARNING", "INFO", "DEBUG"]
        ] = None,
        output_path: Optional[str] = None,
        log_samples: bool = False,
        _usage_: Optional[str] = None,
        _version_: Optional[str] = None,
        **kwargs,
    ):
        super().__init__(_usage_=_usage_, _version_=_version_)
        self.tasks = tasks
        self.include_path = include_path
        self.batch_size = batch_size
        self.metadata = metadata
        self.apply_chat_template = apply_chat_template
        self.verbosity = verbosity
        self.kwargs = kwargs
        self.output_path = output_path
        self.log_samples = log_samples

    def evaluate(self, model, *command_line_args, **kwargs):
        """Evaluate a language model on the configured tasks using LM Evaluation Harness.

        This method wraps the model with the LM Evaluation Harness framework and
        executes evaluation on all configured tasks. It automatically handles
        command-line argument construction, model wrapping with Lightning Fabric
        for distributed evaluation, and result logging.

        The evaluation process includes:
        1. Building command-line arguments from instance configuration
        2. Setting up the LM Evaluation Harness argument parser
        3. Wrapping the model with Lightning Fabric if not already wrapped
        4. Creating an HFLM (Hugging Face Language Model) wrapper
        5. Executing the evaluation through the LM-Eval CLI interface

        Args:
            model: The language model to evaluate. Can be a Hugging Face model,
                PyTorch model, or any model compatible with the LM Evaluation Harness.
                The model will be automatically wrapped with Lightning Fabric for
                distributed evaluation if not already wrapped.
            *command_line_args: Additional positional command-line arguments
                (currently unused but preserved for interface compatibility).
            **kwargs: Additional keyword arguments that will be converted to
                command-line flags and passed to the LM Evaluation Harness.
                Keys will be prefixed with '--' and values converted to strings.

        Returns:
            None: Results are written to the configured output path and logged.

        Example:
            ```python
            >>> taskpool = LMEvalHarnessTaskPool(tasks=["arc_easy"])
            >>> taskpool.evaluate(model, limit=100, device="cuda")
            ```

        Note:
            The method leverages the LM Evaluation Harness's command-line interface
            internally, which provides standardized evaluation procedures and
            ensures compatibility with the broader evaluation ecosystem.
        """
        command_line_args = []
        if self.include_path is not None:
            command_line_args.extend(["--include_path", self.include_path])
        if isinstance(self.tasks, (list, ListConfig)):
            command_line_args.extend(["--tasks", ",".join(self.tasks)])
        elif isinstance(self.tasks, str):
            command_line_args.extend(["--tasks", self.tasks])
        if self.apply_chat_template:
            command_line_args.extend(
                ["--apply_chat_template", str(self.apply_chat_template)]
            )
        if self.batch_size is not None:
            command_line_args.extend(["--batch_size", str(self.batch_size)])
        if self.verbosity is not None:
            command_line_args.extend(["--verbosity", str(self.verbosity)])
        if self.metadata is not None:
            command_line_args.extend(["--metadata", str(self.metadata)])
        if self.output_path is None:
            command_line_args.extend(
                [
                    "--output_path",
                    os.path.join(self.log_dir, "lm_eval_results"),
                ]
            )
        else:
            command_line_args.extend(["--output_path", self.output_path])
        if self.log_samples:
            command_line_args.extend(["--log_samples"])
        for key, value in kwargs.items():
            command_line_args.extend([f"--{key}", str(value)])

        parser = setup_parser()
        check_argument_types(parser)
        args = parser.parse_args(args=command_line_args)
        log.info("LM-Eval Harness arguments:\n%s", args)

        if not lightning.fabric.is_wrapped(model):
            model = self.fabric.setup(model)
        args.model = lm_eval.models.huggingface.HFLM(pretrained=model)
        cli_evaluate(args)
evaluate(model, *command_line_args, **kwargs)

Evaluate a language model on the configured tasks using LM Evaluation Harness.

This method wraps the model with the LM Evaluation Harness framework and executes evaluation on all configured tasks. It automatically handles command-line argument construction, model wrapping with Lightning Fabric for distributed evaluation, and result logging.

The evaluation process includes:

  1. Building command-line arguments from instance configuration
  2. Setting up the LM Evaluation Harness argument parser
  3. Wrapping the model with Lightning Fabric if not already wrapped
  4. Creating an HFLM (Hugging Face Language Model) wrapper
  5. Executing the evaluation through the LM-Eval CLI interface

Parameters:

  • model

    The language model to evaluate. Can be a Hugging Face model, PyTorch model, or any model compatible with the LM Evaluation Harness. The model will be automatically wrapped with Lightning Fabric for distributed evaluation if not already wrapped.

  • *command_line_args

    Additional positional command-line arguments (currently unused but preserved for interface compatibility).

  • **kwargs

    Additional keyword arguments that will be converted to command-line flags and passed to the LM Evaluation Harness. Keys will be prefixed with '--' and values converted to strings.

Returns:

  • None

    Results are written to the configured output path and logged.

Example
>>> taskpool = LMEvalHarnessTaskPool(tasks=["arc_easy"])
>>> taskpool.evaluate(model, limit=100, device="cuda")
Note

The method leverages the LM Evaluation Harness's command-line interface internally, which provides standardized evaluation procedures and ensures compatibility with the broader evaluation ecosystem.
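
The extra keyword arguments are turned into command-line flags exactly as the source below shows; the sketch here isolates that conversion (`--limit` and `--device` are lm-eval CLI options, as in the example above):

```python
# Hedged sketch of the kwargs-to-flags conversion done inside evaluate().
kwargs = {"limit": 100, "device": "cuda"}
command_line_args = []
for key, value in kwargs.items():
    command_line_args.extend([f"--{key}", str(value)])
print(command_line_args)  # ['--limit', '100', '--device', 'cuda']
```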

Source code in fusion_bench/taskpool/lm_eval_harness/taskpool.py
def evaluate(self, model, *command_line_args, **kwargs):
    """Evaluate a language model on the configured tasks using LM Evaluation Harness.

    This method wraps the model with the LM Evaluation Harness framework and
    executes evaluation on all configured tasks. It automatically handles
    command-line argument construction, model wrapping with Lightning Fabric
    for distributed evaluation, and result logging.

    The evaluation process includes:
    1. Building command-line arguments from instance configuration
    2. Setting up the LM Evaluation Harness argument parser
    3. Wrapping the model with Lightning Fabric if not already wrapped
    4. Creating an HFLM (Hugging Face Language Model) wrapper
    5. Executing the evaluation through the LM-Eval CLI interface

    Args:
        model: The language model to evaluate. Can be a Hugging Face model,
            PyTorch model, or any model compatible with the LM Evaluation Harness.
            The model will be automatically wrapped with Lightning Fabric for
            distributed evaluation if not already wrapped.
        *command_line_args: Additional positional command-line arguments
            (currently unused but preserved for interface compatibility).
        **kwargs: Additional keyword arguments that will be converted to
            command-line flags and passed to the LM Evaluation Harness.
            Keys will be prefixed with '--' and values converted to strings.

    Returns:
        None: Results are written to the configured output path and logged.

    Example:
        ```python
        >>> taskpool = LMEvalHarnessTaskPool(tasks=["arc_easy"])
        >>> taskpool.evaluate(model, limit=100, device="cuda")
        ```

    Note:
        The method leverages the LM Evaluation Harness's command-line interface
        internally, which provides standardized evaluation procedures and
        ensures compatibility with the broader evaluation ecosystem.
    """
    command_line_args = []
    if self.include_path is not None:
        command_line_args.extend(["--include_path", self.include_path])
    if isinstance(self.tasks, (list, ListConfig)):
        command_line_args.extend(["--tasks", ",".join(self.tasks)])
    elif isinstance(self.tasks, str):
        command_line_args.extend(["--tasks", self.tasks])
    if self.apply_chat_template:
        command_line_args.extend(
            ["--apply_chat_template", str(self.apply_chat_template)]
        )
    if self.batch_size is not None:
        command_line_args.extend(["--batch_size", str(self.batch_size)])
    if self.verbosity is not None:
        command_line_args.extend(["--verbosity", str(self.verbosity)])
    if self.metadata is not None:
        command_line_args.extend(["--metadata", str(self.metadata)])
    if self.output_path is None:
        command_line_args.extend(
            [
                "--output_path",
                os.path.join(self.log_dir, "lm_eval_results"),
            ]
        )
    else:
        command_line_args.extend(["--output_path", self.output_path])
    if self.log_samples:
        command_line_args.extend(["--log_samples"])
    for key, value in kwargs.items():
        command_line_args.extend([f"--{key}", str(value)])

    parser = setup_parser()
    check_argument_types(parser)
    args = parser.parse_args(args=command_line_args)
    log.info("LM-Eval Harness arguments:\n%s", args)

    if not lightning.fabric.is_wrapped(model):
        model = self.fabric.setup(model)
    args.model = lm_eval.models.huggingface.HFLM(pretrained=model)
    cli_evaluate(args)

Task Agnostic

Utility Classes

DummyTaskPool

Bases: BaseTaskPool

A lightweight task pool implementation for debugging and development workflows.

This dummy task pool provides a minimal evaluation interface that focuses on model introspection rather than task-specific performance evaluation. It's designed for development scenarios where you need to test model fusion pipelines, validate architectures, or debug workflows without the overhead of running actual evaluation tasks.

The task pool is particularly useful when:
  • You want to verify model fusion works correctly
  • You need to check parameter counts after fusion
  • You're developing new fusion algorithms
  • You want to test infrastructure without expensive evaluations
Example
>>> taskpool = DummyTaskPool(model_save_path="/tmp/fused_model")
>>> results = taskpool.evaluate(fused_model)
>>> print(f"Model has {results['model_info']['trainable_params']} parameters")
Source code in fusion_bench/taskpool/dummy.py
class DummyTaskPool(BaseTaskPool):
    """A lightweight task pool implementation for debugging and development workflows.

    This dummy task pool provides a minimal evaluation interface that focuses on
    model introspection rather than task-specific performance evaluation. It's
    designed for development scenarios where you need to test model fusion
    pipelines, validate architectures, or debug workflows without the overhead
    of running actual evaluation tasks.

    The task pool is particularly useful when:
        - You want to verify model fusion works correctly
        - You need to check parameter counts after fusion
        - You're developing new fusion algorithms
        - You want to test infrastructure without expensive evaluations

    Example:
        ```python
        >>> taskpool = DummyTaskPool(model_save_path="/tmp/fused_model")
        >>> results = taskpool.evaluate(fused_model)
        >>> print(f"Model has {results['model_info']['trainable_params']} parameters")
        ```
    """

    def __init__(self, model_save_path: Optional[str] = None, **kwargs):
        """Initialize the dummy task pool with optional model saving capability.

        Args:
            model_save_path: Optional path where the evaluated model should be saved.
                If provided, the model will be serialized and saved to this location
                after evaluation using the separate_save utility. If None, no model
                saving will be performed.

        Example:
            ```python
            >>> # Create taskpool without saving
            >>> taskpool = DummyTaskPool()

            >>> # Create taskpool with model saving
            >>> taskpool = DummyTaskPool(model_save_path="/path/to/save/model.pth")
            ```
        """
        super().__init__(**kwargs)
        self.model_save_path = model_save_path

    def evaluate(self, model):
        """Perform lightweight evaluation and analysis of the given model.

        This method provides a minimal evaluation that focuses on model introspection
        rather than task-specific performance metrics. It performs parameter analysis,
        optionally saves the model, and returns a summary report.

        The evaluation process includes:
        1. Printing human-readable parameter information (rank-zero only)
        2. Optionally saving the model if a save path was configured
        3. Generating and returning a model summary report

        Args:
            model: The model to evaluate. Can be any PyTorch nn.Module including
                fusion models, pre-trained models, or custom architectures.

        Returns:
            dict: A model summary report containing parameter statistics and
                architecture information. See get_model_summary() for detailed
                format specification.

        Example:
            ```python
            >>> taskpool = DummyTaskPool(model_save_path="/tmp/model.pth")
            >>> model = torch.nn.Linear(10, 5)
            >>> results = taskpool.evaluate(model)
            >>> print(f"Trainable params: {results['model_info']['trainable_params']}")
            ```
        """
        if rank_zero_only.rank == 0:
            print_parameters(model, is_human_readable=True)

            if self.model_save_path is not None:
                with timeit_context(f"Saving the model to {self.model_save_path}"):
                    separate_save(model, self.model_save_path)

        return get_model_summary(model)
__init__(model_save_path=None, **kwargs)

Initialize the dummy task pool with optional model saving capability.

Parameters:

  • model_save_path (Optional[str], default: None ) –

    Optional path where the evaluated model should be saved. If provided, the model will be serialized and saved to this location after evaluation using the separate_save utility. If None, no model saving will be performed.

Example
>>> # Create taskpool without saving
>>> taskpool = DummyTaskPool()

>>> # Create taskpool with model saving
>>> taskpool = DummyTaskPool(model_save_path="/path/to/save/model.pth")
Source code in fusion_bench/taskpool/dummy.py
def __init__(self, model_save_path: Optional[str] = None, **kwargs):
    """Initialize the dummy task pool with optional model saving capability.

    Args:
        model_save_path: Optional path where the evaluated model should be saved.
            If provided, the model will be serialized and saved to this location
            after evaluation using the separate_save utility. If None, no model
            saving will be performed.

    Example:
        ```python
        >>> # Create taskpool without saving
        >>> taskpool = DummyTaskPool()

        >>> # Create taskpool with model saving
        >>> taskpool = DummyTaskPool(model_save_path="/path/to/save/model.pth")
        ```
    """
    super().__init__(**kwargs)
    self.model_save_path = model_save_path
evaluate(model)

Perform lightweight evaluation and analysis of the given model.

This method provides a minimal evaluation that focuses on model introspection rather than task-specific performance metrics. It performs parameter analysis, optionally saves the model, and returns a summary report.

The evaluation process includes:

  1. Printing human-readable parameter information (rank-zero only)
  2. Optionally saving the model if a save path was configured
  3. Generating and returning a model summary report

Parameters:

  • model

    The model to evaluate. Can be any PyTorch nn.Module including fusion models, pre-trained models, or custom architectures.

Returns:

  • dict

    A model summary report containing parameter statistics and architecture information. See get_model_summary() for detailed format specification.

Example
>>> taskpool = DummyTaskPool(model_save_path="/tmp/model.pth")
>>> model = torch.nn.Linear(10, 5)
>>> results = taskpool.evaluate(model)
>>> print(f"Trainable params: {results['model_info']['trainable_params']}")
Source code in fusion_bench/taskpool/dummy.py
def evaluate(self, model):
    """Perform lightweight evaluation and analysis of the given model.

    This method provides a minimal evaluation that focuses on model introspection
    rather than task-specific performance metrics. It performs parameter analysis,
    optionally saves the model, and returns a summary report.

    The evaluation process includes:
    1. Printing human-readable parameter information (rank-zero only)
    2. Optionally saving the model if a save path was configured
    3. Generating and returning a model summary report

    Args:
        model: The model to evaluate. Can be any PyTorch nn.Module including
            fusion models, pre-trained models, or custom architectures.

    Returns:
        dict: A model summary report containing parameter statistics and
            architecture information. See get_model_summary() for detailed
            format specification.

    Example:
        ```python
        >>> taskpool = DummyTaskPool(model_save_path="/tmp/model.pth")
        >>> model = torch.nn.Linear(10, 5)
        >>> results = taskpool.evaluate(model)
        >>> print(f"Trainable params: {results['model_info']['trainable_params']}")
        ```
    """
    if rank_zero_only.rank == 0:
        print_parameters(model, is_human_readable=True)

        if self.model_save_path is not None:
            with timeit_context(f"Saving the model to {self.model_save_path}"):
                separate_save(model, self.model_save_path)

    return get_model_summary(model)
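
For reference, the trainable-parameter figure reported in the summary corresponds conceptually to the standard PyTorch count below; this is a standalone sketch, not the library's get_model_summary implementation.

```python
import torch.nn as nn

def count_trainable_params(model: nn.Module) -> int:
    # Sum the element counts of all parameters that require gradients.
    return sum(p.numel() for p in model.parameters() if p.requires_grad)

model = nn.Linear(10, 5)
print(count_trainable_params(model))  # 10 * 5 weights + 5 biases = 55
```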