
Utils

cheminformatics

smiles2fingerprints(smiles, bond_radius=5, n_bits=2048)

Transforms a list of SMILES strings to an array of Morgan fingerprints.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| smiles | List[str] | List of SMILES strings | required |
| bond_radius | int | Bond radius to use. Defaults to 5. | 5 |
| n_bits | int | Number of bits. Defaults to 2048. | 2048 |

Returns:

| Type | Description |
| --- | --- |
| np.ndarray | Numpy array holding the fingerprints |

Source code in bofire/utils/cheminformatics.py
def smiles2fingerprints(
    smiles: List[str],
    bond_radius: int = 5,
    n_bits: int = 2048,
) -> np.ndarray:
    """Transforms a list of smiles to an array of morgan fingerprints.

    Args:
        smiles (List[str]): List of smiles
        bond_radius (int, optional): Bond radius to use. Defaults to 5.
        n_bits (int, optional): Number of bits. Defaults to 2048.

    Returns:
        np.ndarray: Numpy array holding the fingerprints

    """
    rdkit_mols = [smiles2mol(m) for m in smiles]
    fps = [
        AllChem.GetMorganFingerprintAsBitVect(mol, radius=bond_radius, nBits=n_bits)  # type: ignore
        for mol in rdkit_mols
    ]

    return np.asarray(fps)
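
Example (a minimal usage sketch, assuming RDKit is installed):

from bofire.utils.cheminformatics import smiles2fingerprints

# ethanol and benzene; radius 2 is the classic ECFP4-style setting
fps = smiles2fingerprints(["CCO", "c1ccccc1"], bond_radius=2, n_bits=1024)
print(fps.shape)  # (2, 1024), one Morgan bit vector per molecule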

smiles2fragments(smiles, fragments_list=None)

Transforms SMILES strings to an array of fragments.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| smiles | list[str] | List of SMILES strings | required |
| fragments_list | list[str] | List of desired fragments. Defaults to None. | None |

Returns:

| Type | Description |
| --- | --- |
| np.ndarray | Array holding the fragment information. |

Source code in bofire/utils/cheminformatics.py
def smiles2fragments(
    smiles: List[str],
    fragments_list: Optional[List[str]] = None,
) -> np.ndarray:
    """Transforms smiles to an array of fragments.

    Args:
        smiles (list[str]): List of smiles
        fragments_list (list[str], optional): List of desired fragments. Defaults to None.

    Returns:
        np.ndarray: Array holding the fragment information.

    """
    rdkit_fragment_list = [
        item
        for item in Descriptors.descList
        if item[0].startswith("fr_")  # type: ignore
    ]
    if fragments_list is None:
        fragments = {d[0]: d[1] for d in rdkit_fragment_list}
    else:
        fragments = {d[0]: d[1] for d in rdkit_fragment_list if d[0] in fragments_list}

    frags = np.zeros((len(smiles), len(fragments)))
    for i, smi in enumerate(smiles):
        mol = smiles2mol(smi)
        features = [fragments[d](mol) for d in fragments]
        frags[i, :] = features

    return frags
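
Example (a minimal sketch; the fragment names are taken from RDKit's fr_* descriptor set):

from bofire.utils.cheminformatics import smiles2fragments

# ethanol and acetic acid, restricted to two RDKit fragment counts
frags = smiles2fragments(["CCO", "CC(=O)O"], fragments_list=["fr_Al_OH", "fr_COO"])
print(frags.shape)  # (2, 2): one row per molecule, one column per fragment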

smiles2mol(smiles)

Transforms a SMILES string to an RDKit mol object.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| smiles | str | SMILES string. | required |

Exceptions:

| Type | Description |
| --- | --- |
| ValueError | If the string is not a valid SMILES. |

Returns:

| Type | Description |
| --- | --- |
| rdkit.Mol | rdkit mol object |

Source code in bofire/utils/cheminformatics.py
def smiles2mol(smiles: str):
    """Transforms a smiles string to an rdkit mol object.

    Args:
        smiles (str): Smiles string.

    Raises:
        ValueError: If string is not a valid smiles.

    Returns:
        rdkit.Mol: rdkit.mol object

    """
    mol = MolFromSmiles(smiles)  # type: ignore
    if mol is None:
        raise ValueError(f"{smiles} is not a valid smiles string.")
    return mol
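
Example (a minimal sketch, assuming RDKit is installed):

from bofire.utils.cheminformatics import smiles2mol

mol = smiles2mol("CCO")  # valid SMILES: returns an rdkit mol object
try:
    smiles2mol("this-is-no-smiles")
except ValueError as err:
    print(err)  # "this-is-no-smiles is not a valid smiles string."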

smiles2mordred(smiles, descriptors_list)

Transforms a list of SMILES strings to Mordred molecular descriptors.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| smiles | List[str] | List of SMILES strings | required |
| descriptors_list | List[str] | List of desired Mordred descriptors | required |

Returns:

| Type | Description |
| --- | --- |
| np.ndarray | Array holding the Mordred molecular descriptors. |

Source code in bofire/utils/cheminformatics.py
def smiles2mordred(smiles: List[str], descriptors_list: List[str]) -> np.ndarray:
    """Transforms list of smiles to mordred moelcular descriptors.

    Args:
        smiles (List[str]): List of smiles
        descriptors_list (List[str]): List of desired mordred descriptors

    Returns:
        np.ndarray: Array holding the Mordred molecular descriptors.

    """
    mols = [smiles2mol(smi) for smi in smiles]

    calc = Calculator(descriptors, ignore_3D=True)  # type: ignore
    calc.descriptors = [d for d in calc.descriptors if str(d) in descriptors_list]

    descriptors_df = calc.pandas(mols)
    nan_list = [
        pd.to_numeric(descriptors_df[col], errors="coerce").isnull().values.any()  # type: ignore
        for col in descriptors_df.columns
    ]
    if any(nan_list):
        raise ValueError(
            f"Found NaN values in descriptors {list(descriptors_df.columns[nan_list])}",  # type: ignore
        )

    return descriptors_df.astype(float).values
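
Example (a minimal sketch, assuming the mordred package is installed; the descriptor names must match the string representation of Mordred descriptors, e.g. "nAcid" and "nBase" from the acid/base module):

from bofire.utils.cheminformatics import smiles2mordred

descs = smiles2mordred(["CCO", "CC(=O)O"], descriptors_list=["nAcid", "nBase"])
print(descs.shape)  # (2, 2): one row per molecule, one column per descriptor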

doe

compute_generator(n_factors, n_generators)

Computes a generator for a given number of factors and generators.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| n_factors | int | The number of factors. | required |
| n_generators | int | The number of generators. | required |

Returns:

| Type | Description |
| --- | --- |
| str | The generator. |

Source code in bofire/utils/doe.py
def compute_generator(n_factors: int, n_generators: int) -> str:
    """Computes a generator for a given number of factors and generators.

    Args:
        n_factors: The number of factors.
        n_generators: The number of generators.

    Returns:
        The generator.

    """
    if n_generators == 0:
        return " ".join(list(string.ascii_lowercase[:n_factors]))
    n_base_factors = n_factors - n_generators
    if n_generators == 1:
        if n_base_factors == 1:
            raise ValueError(
                "Design not possible, as main factors are confounded with each other.",
            )
        return " ".join(
            list(string.ascii_lowercase[:n_base_factors])
            + [string.ascii_lowercase[:n_base_factors]],
        )
    n_base_factors = n_factors - n_generators
    if n_base_factors - 1 < 2:
        raise ValueError(
            "Design not possible, as main factors are confounded with each other.",
        )
    generators = [
        "".join(i)
        for i in (
            itertools.combinations(
                string.ascii_lowercase[:n_base_factors],
                n_base_factors - 1,
            )
        )
    ]
    if len(generators) > n_generators:
        generators = generators[:n_generators]
    elif (n_generators - len(generators) == 1) and (n_base_factors > 1):
        generators += [string.ascii_lowercase[:n_base_factors]]
    elif n_generators - len(generators) >= 1:
        raise ValueError(
            "Design not possible, as main factors are confounded with each other.",
        )
    return " ".join(list(string.ascii_lowercase[:n_base_factors]) + generators)

ff2n(n_factors)

Computes the full factorial design for a given number of factors.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| n_factors | int | The number of factors. | required |

Returns:

| Type | Description |
| --- | --- |
| ndarray | The full factorial design. |

Source code in bofire/utils/doe.py
def ff2n(n_factors: int) -> np.ndarray:
    """Computes the full factorial design for a given number of factors.

    Args:
        n_factors: The number of factors.

    Returns:
        The full factorial design.

    """
    return np.array(list(itertools.product([-1, 1], repeat=n_factors)))
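
Example (a minimal sketch):

from bofire.utils.doe import ff2n

print(ff2n(2))
# [[-1 -1]
#  [-1  1]
#  [ 1 -1]
#  [ 1  1]]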

fracfact(gen)

Computes the fractional factorial design for a given generator.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| gen | | The generator. | required |

Returns:

| Type | Description |
| --- | --- |
| ndarray | The fractional factorial design. |

Source code in bofire/utils/doe.py
def fracfact(gen) -> np.ndarray:
    """Computes the fractional factorial design for a given generator.

    Args:
        gen: The generator.

    Returns:
        The fractional factorial design.

    """
    gen = validate_generator(n_factors=gen.count(" ") + 1, generator=gen)

    generators = [item for item in re.split(r"\-|\s|\+", gen) if item]
    lengths = [len(i) for i in generators]

    # Indices of single letters (main factors)
    idx_main = [i for i, item in enumerate(lengths) if item == 1]

    # Indices of letter combinations.
    idx_combi = [i for i, item in enumerate(lengths) if item != 1]

    # Check if there are "-" operators in gen
    idx_negative = [
        i for i, item in enumerate(gen.split(" ")) if item[0] == "-"
    ]  # remove empty strings

    # Fill in design with two level factorial design
    H1 = ff2n(len(idx_main))
    H = np.zeros((H1.shape[0], len(lengths)))
    H[:, idx_main] = H1

    # Recognize combinations and fill in the rest of matrix H2 with the proper
    # products
    for k in idx_combi:
        # For lowercase letters
        xx = np.array([ord(c) for c in generators[k]]) - 97

        H[:, k] = np.prod(H1[:, xx], axis=1)

    # Update design if gen includes "-" operator
    if len(idx_negative) > 0:
        H[:, idx_negative] *= -1

    # Return the fractional factorial design
    return H
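
Example (a minimal sketch): a 2^(4-1) fractional factorial design in which the fourth factor d is generated as the product of the three main factors.

from bofire.utils.doe import fracfact

design = fracfact("a b c abc")
print(design.shape)  # (8, 4): 8 runs instead of the 16 of a full factorial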

get_alias_structure(gen, order=4)

Computes the alias structure of the design matrix. Works only for generators with positive signs.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| gen | str | The generator. | required |
| order | int | The order up to which the alias structure should be calculated. Defaults to 4. | 4 |

Returns:

| Type | Description |
| --- | --- |
| List[str] | The alias structure of the design matrix. |

Source code in bofire/utils/doe.py
def get_alias_structure(gen: str, order: int = 4) -> List[str]:
    """Computes the alias structure of the design matrix. Works only for generators
    with positive signs.

    Args:
        gen: The generator.
        order: The order up to which the alias structure should be calculated. Defaults to 4.

    Returns:
        The alias structure of the design matrix.

    """
    design = fracfact(gen)

    n_experiments, n_factors = design.shape

    all_names = string.ascii_lowercase + "I"
    factors = range(n_factors)
    all_combinations = itertools.chain.from_iterable(
        itertools.combinations(factors, n) for n in range(1, min(n_factors, order) + 1)
    )
    aliases = {n_experiments * "+": [(26,)]}  # 26 is mapped to I

    for combination in all_combinations:
        # positive sign
        contrast = np.prod(
            design[:, combination],
            axis=1,
        )  # this is the product of the combination
        scontrast = "".join(np.where(contrast == 1, "+", "-").tolist())
        aliases[scontrast] = aliases.get(scontrast, [])
        aliases[scontrast].append(combination)  # type: ignore

    aliases_list = []
    for alias in aliases.values():
        aliases_list.append(
            sorted(alias, key=lambda a: (len(a), a)),
        )  # sort by length and then by the combination
    aliases_list = sorted(
        aliases_list,
        key=lambda list: ([len(a) for a in list], list),
    )  # sort by the length of the alias

    aliases_readable = []

    for alias in aliases_list:
        aliases_readable.append(
            " = ".join(["".join([all_names[f] for f in a]) for a in alias]),
        )

    return aliases_readable
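
Example (a minimal sketch): for the 2^(4-1) design above, every main effect is aliased with a three-factor interaction.

from bofire.utils.doe import get_alias_structure

aliases = get_alias_structure("a b c abc")
print(aliases)  # contains e.g. "a = bcd", "b = acd", ... and "I = abcd"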

get_confounding_matrix(inputs, design, powers=None, interactions=None)

Analyzes the confounding of a design and returns the confounding matrix.

Only takes continuous features into account.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| inputs | Inputs | Input features. | required |
| design | pd.DataFrame | Design matrix. | required |
| powers | List[int] | List of powers of the individual factors/features that should be considered. Integers have to be larger than 1. Defaults to []. | None |
| interactions | List[int] | List with interaction levels to be considered. Integers have to be larger than 1. Defaults to [2]. | None |

Returns:

| Type | Description |
| --- | --- |
| pd.DataFrame | The confounding matrix, i.e. the correlation matrix of the scaled design columns including the requested powers and interactions. |

Source code in bofire/utils/doe.py
def get_confounding_matrix(
    inputs: Inputs,
    design: pd.DataFrame,
    powers: Optional[List[int]] = None,
    interactions: Optional[List[int]] = None,
):
    """Analyzes the confounding of a design and returns the confounding matrix.

    Only takes continuous features into account.

    Args:
        inputs (Inputs): Input features.
        design (pd.DataFrame): Design matrix.
        powers (List[int], optional): List of powers of the individual factors/features that should be considered.
            Integers have to be larger than 1. Defaults to [].
        interactions (List[int], optional): List with interaction levels to be considered.
            Integers have to be larger than 1. Defaults to [2].

    Returns:
        pd.DataFrame: The confounding matrix (correlation matrix of the scaled design columns).

    """
    from sklearn.preprocessing import MinMaxScaler

    if len(inputs.get(CategoricalInput)) > 0:
        warnings.warn("Categorical input features will be ignored.")

    keys = inputs.get_keys(ContinuousInput)
    scaler = MinMaxScaler(feature_range=(-1, 1))
    scaled_design = pd.DataFrame(
        data=scaler.fit_transform(design[keys]),
        columns=keys,
    )

    # add powers
    if powers is not None:
        for p in powers:
            assert p > 1, "Power has to be at least of degree two."
            for key in keys:
                scaled_design[f"{key}**{p}"] = scaled_design[key] ** p

    # add interactions
    if interactions is None:
        interactions = [2]

    for i in interactions:
        assert i > 1, "Interaction has to be at least of degree two."
        assert i < len(keys) + 1, f"Interaction has to be smaller than {len(keys)+1}."
        for combi in itertools.combinations(keys, i):
            scaled_design[":".join(combi)] = scaled_design[list(combi)].prod(axis=1)

    return scaled_design.corr()
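
Example (a minimal sketch; the import paths for Inputs and ContinuousInput are assumed to be the usual BoFire data-model locations):

import pandas as pd

from bofire.data_models.domain.api import Inputs
from bofire.data_models.features.api import ContinuousInput
from bofire.utils.doe import fracfact, get_confounding_matrix

inputs = Inputs(
    features=[ContinuousInput(key=k, bounds=(-1, 1)) for k in ["x1", "x2", "x3"]]
)
# 2^(3-1) design: x3 is generated as the product x1 * x2
design = pd.DataFrame(fracfact("a b ab"), columns=["x1", "x2", "x3"])
corr = get_confounding_matrix(inputs, design=design, interactions=[2])
# the interaction column "x1:x2" is fully correlated with x3 (confounding)
print(corr.loc["x3", "x1:x2"])  # 1.0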

get_default_generator(n_factors, n_generators)

Returns the default generator for a given number of factors and generators.

In case the combination is not available, the function will raise an error.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| n_factors | int | The number of factors. | required |
| n_generators | int | The number of generators. | required |

Returns:

| Type | Description |
| --- | --- |
| str | The generator. |

Source code in bofire/utils/doe.py
def get_default_generator(n_factors: int, n_generators: int) -> str:
    """Returns the default generator for a given number of factors and generators.

    In case the combination is not available, the function will raise an error.

    Args:
        n_factors: The number of factors.
        n_generators: The number of generators.

    Returns:
        The generator.

    """
    if n_generators == 0:
        return " ".join(list(string.ascii_lowercase[:n_factors]))
    df_generators = default_fracfac_generators
    n_base_factors = n_factors - n_generators
    if df_generators.loc[
        (df_generators.n_factors == n_factors)
        & (df_generators.n_generators == n_generators)
    ].empty:
        raise ValueError("No generator available for the requested combination.")
    generators = (
        df_generators.loc[
            (df_generators.n_factors == n_factors)
            & (df_generators.n_generators == n_generators),
            "generator",
        ]
        .to_list()[0]
        .split(";")
    )
    assert len(generators) == n_generators, "Number of generators does not match."
    generators = [generator.split("=")[1].strip().lower() for generator in generators]
    return " ".join(list(string.ascii_lowercase[:n_base_factors]) + generators)

get_generator(n_factors, n_generators)

Returns a generator for a given number of factors and generators.

If the requested combination is available in the default generators, it will return this one. Otherwise, it will compute a new one using get_bofire_generator.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| n_factors | int | The number of factors. | required |
| n_generators | int | The number of generators. | required |

Returns:

| Type | Description |
| --- | --- |
| str | The generator. |

Source code in bofire/utils/doe.py
def get_generator(n_factors: int, n_generators: int) -> str:
    """Returns a generator for a given number of factors and generators.

    If the requested combination is available in the default generators, it will return
    this one. Otherwise, it will compute a new one using `get_bofire_generator`.

    Args:
        n_factors: The number of factors.
        n_generators: The number of generators.

    Returns:
        The generator.

    """
    try:
        return get_default_generator(n_factors, n_generators)
    except ValueError:
        return compute_generator(n_factors, n_generators)
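
Example (a minimal sketch):

from bofire.utils.doe import get_generator

# uses the tabulated default generator if one exists for the combination,
# otherwise falls back to compute_generator
print(get_generator(n_factors=4, n_generators=1))  # e.g. "a b c abc"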

validate_generator(n_factors, generator)

Validates the generator and throws an error if it is not valid.

Source code in bofire/utils/doe.py
def validate_generator(n_factors: int, generator: str) -> str:
    """Validates the generator and thows an error if it is not valid."""
    if len(generator.split(" ")) != n_factors:
        raise ValueError("Generator does not match the number of factors.")
    # clean it and transform it into a list
    generators = [item for item in re.split(r"\-|\s|\+", generator) if item]
    lengths = [len(i) for i in generators]

    # Indices of single letters (main factors)
    idx_main = [i for i, item in enumerate(lengths) if item == 1]

    if len(idx_main) == 0:
        raise ValueError("At least one unconfounded main factor is needed.")

    # Check that single letters (main factors) are unique
    if len(idx_main) != len({generators[i] for i in idx_main}):
        raise ValueError("Main factors are confounded with each other.")

    # Check that single letters (main factors) follow the alphabet
    if (
        "".join(sorted([generators[i] for i in idx_main]))
        != string.ascii_lowercase[: len(idx_main)]
    ):
        raise ValueError(
            f'Use the letters `{" ".join(string.ascii_lowercase[: len(idx_main)])}` for the main factors.',
        )

    # Indices of letter combinations.
    idx_combi = [i for i, item in enumerate(lengths) if item != 1]

    # check that main factors come before combinations
    if idx_combi and min(idx_combi) < max(idx_main):
        raise ValueError("Main factors have to come before combinations.")

    # Check that letter combinations are unique
    if len(idx_combi) != len({generators[i] for i in idx_combi}):
        raise ValueError("Generators are not unique.")

    # Check that only letters are used in the combinations that are also single letters (main factors)
    if not all(
        set(item).issubset({generators[i] for i in idx_main})
        for item in [generators[i] for i in idx_combi]
    ):
        raise ValueError("Generators are not valid.")

    return generator

multiobjective

get_ref_point_mask(domain, output_feature_keys=None)

Computes a mask for the reference points that accounts for whether each objective should be maximized or minimized: the mask entry is 1 for an objective to be maximized and -1 for an objective to be minimized.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| domain | Domain | Domain for which the mask should be generated. | required |
| output_feature_keys | Optional[list] | Name of output feature keys that should be considered in the mask. Defaults to None. | None |

Returns:

| Type | Description |
| --- | --- |
| np.ndarray | Mask with entry 1 for objectives to be maximized and -1 for objectives to be minimized. |

Source code in bofire/utils/multiobjective.py
def get_ref_point_mask(
    domain: Domain,
    output_feature_keys: Optional[list] = None,
) -> np.ndarray:
    """Method to get a mask for the reference points taking into account if we
    want to maximize or minimize an objective. In case it is maximize the value
    in the mask is 1, in case we want to minimize it is -1.

    Args:
        domain (Domain): Domain for which the mask should be generated.
        output_feature_keys (Optional[list], optional): Name of output feature keys
            that should be considered in the mask. Defaults to None.

    Returns:
        np.ndarray: Mask with entry 1 for maximization and -1 for minimization objectives.

    """
    if output_feature_keys is None:
        output_feature_keys = domain.outputs.get_keys_by_objective(
            includes=[MaximizeObjective, MinimizeObjective, CloseToTargetObjective],
        )
    if len(output_feature_keys) < 2:
        raise ValueError("At least two output features have to be provided.")
    mask = []
    for key in output_feature_keys:
        feat = domain.outputs.get_by_key(key)
        if isinstance(feat.objective, MaximizeObjective):
            mask.append(1.0)
        elif isinstance(feat.objective, MinimizeObjective) or isinstance(
            feat.objective,
            CloseToTargetObjective,
        ):
            mask.append(-1.0)
        else:
            raise ValueError(
                "Only `MaximizeObjective` and `MinimizeObjective` supported",
            )
    return np.array(mask)
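
Example (a minimal sketch; Domain.from_lists and the data-model import paths are the usual BoFire ones):

from bofire.data_models.domain.api import Domain
from bofire.data_models.features.api import ContinuousInput, ContinuousOutput
from bofire.data_models.objectives.api import MaximizeObjective, MinimizeObjective
from bofire.utils.multiobjective import get_ref_point_mask

domain = Domain.from_lists(
    inputs=[ContinuousInput(key="x", bounds=(0, 1))],
    outputs=[
        ContinuousOutput(key="y1", objective=MaximizeObjective(w=1.0)),
        ContinuousOutput(key="y2", objective=MinimizeObjective(w=1.0)),
    ],
)
print(get_ref_point_mask(domain))  # [ 1. -1.]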

naming_conventions

get_column_names(outputs)

Specifies column names for given Outputs type.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| outputs | Outputs | The Outputs object containing the individual outputs. | required |

Returns:

| Type | Description |
| --- | --- |
| Tuple[List[str], List[str]] | A tuple containing the prediction column names and the standard deviation column names |

Source code in bofire/utils/naming_conventions.py
def get_column_names(outputs: Outputs) -> Tuple[List[str], List[str]]:
    """Specifies column names for given Outputs type.

    Args:
        outputs (Outputs): The Outputs object containing the individual outputs.

    Returns:
        Tuple[List[str], List[str]]: A tuple containing the prediction column names and the standard deviation column names

    """
    pred_cols, sd_cols = [], []
    for featkey in outputs.get_keys(CategoricalOutput):
        pred_cols = pred_cols + [
            f"{featkey}_{cat}_prob"
            for cat in outputs.get_by_key(featkey).categories  # type: ignore
        ]
        sd_cols = sd_cols + [
            f"{featkey}_{cat}_sd"
            for cat in outputs.get_by_key(featkey).categories  # type: ignore
        ]
    for featkey in outputs.get_keys(ContinuousOutput):
        pred_cols = pred_cols + [f"{featkey}_pred"]
        sd_cols = sd_cols + [f"{featkey}_sd"]

    return pred_cols, sd_cols
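
Example (a minimal sketch; the data-model import paths are the usual BoFire ones):

from bofire.data_models.domain.api import Outputs
from bofire.data_models.features.api import ContinuousOutput
from bofire.utils.naming_conventions import get_column_names

outputs = Outputs(features=[ContinuousOutput(key="yield")])
pred_cols, sd_cols = get_column_names(outputs)
print(pred_cols, sd_cols)  # ['yield_pred'] ['yield_sd']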

postprocess_categorical_predictions(predictions, outputs)

Postprocesses categorical predictions by finding the location of the maximum probability.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| predictions | pd.DataFrame | The dataframe containing the predictions. | required |
| outputs | Outputs | The Outputs object containing the individual outputs. | required |

Returns:

| Type | Description |
| --- | --- |
| predictions (pd.DataFrame) | The (potentially modified) original dataframe with categorical predictions added |

Source code in bofire/utils/naming_conventions.py
def postprocess_categorical_predictions(
    predictions: pd.DataFrame,
    outputs: Outputs,
) -> pd.DataFrame:
    """Postprocess categorical predictions by finding the maximum probability location

    Args:
        predictions (pd.DataFrame): The dataframe containing the predictions.
        outputs (Outputs): The Outputs object containing the individual outputs.

    Returns:
        predictions (pd.DataFrame): The (potentially modified) original dataframe with categorical predictions added

    """
    for feat in outputs.get():
        if isinstance(feat, CategoricalOutput):
            predictions.insert(
                loc=0,
                column=f"{feat.key}_pred",
                value=predictions.filter(regex=f"{feat.key}(.*)_prob")
                .idxmax(1)
                .str.replace(f"{feat.key}_", "")
                .str.replace("_prob", "")
                .values,  # type: ignore
            )
            predictions.insert(
                loc=1,
                column=f"{feat.key}_sd",
                value=0.0,
            )
    return predictions

reduce

AffineTransform

Class to switch back and forth from the reduced to the original domain.

Source code in bofire/utils/reduce.py
class AffineTransform:
    """Class to switch back and forth from the reduced to the original domain."""

    def __init__(self, equalities: List[Tuple[str, List[str], List[float]]]):
        """Initializes a `AffineTransformation` object.

        Args:
            equalities (List[Tuple[str,List[str],List[float]]]): List of equalities. Every equality
                is defined as a tuple, in which the first entry is the key of the reduced feature, the second
                one is a list of feature keys that can be used to compute the feature and the third list of floats
                are the corresponding coefficients.

        """
        self.equalities = equalities

    def augment_data(self, data: pd.DataFrame) -> pd.DataFrame:
        """Restore the eliminated features in a dataframe

        Args:
            data (pd.DataFrame): Dataframe that should be restored.

        Returns:
            pd.DataFrame: Restored dataframe

        """
        if len(self.equalities) == 0:
            return data
        data = data.copy()
        for name_lhs, names_rhs, coeffs in self.equalities:
            data[name_lhs] = coeffs[-1]
            for i, name in enumerate(names_rhs):
                data[name_lhs] += coeffs[i] * data[name]
        return data

    def drop_data(self, data: pd.DataFrame) -> pd.DataFrame:
        """Drop eliminated features from a dataframe.

        Args:
            data (pd.DataFrame): Dataframe with features to be dropped.

        Returns:
            pd.DataFrame: Reduced dataframe.

        """
        if len(self.equalities) == 0:
            return data
        drop = []
        for name_lhs, _, _ in self.equalities:
            if name_lhs in data.columns:
                drop.append(name_lhs)
        return data.drop(columns=drop)

__init__(self, equalities) special

Initializes an AffineTransform object.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| equalities | List[Tuple[str,List[str],List[float]]] | List of equalities. Every equality is defined as a tuple, in which the first entry is the key of the reduced feature, the second one is a list of feature keys that can be used to compute the feature and the third list of floats are the corresponding coefficients. | required |

Source code in bofire/utils/reduce.py
def __init__(self, equalities: List[Tuple[str, List[str], List[float]]]):
    """Initializes a `AffineTransformation` object.

    Args:
        equalities (List[Tuple[str,List[str],List[float]]]): List of equalities. Every equality
            is defined as a tuple, in which the first entry is the key of the reduced feature, the second
            one is a list of feature keys that can be used to compute the feature and the third list of floats
            are the corresponding coefficients.

    """
    self.equalities = equalities

augment_data(self, data)

Restore the eliminated features in a dataframe

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| data | pd.DataFrame | Dataframe that should be restored. | required |

Returns:

| Type | Description |
| --- | --- |
| pd.DataFrame | Restored dataframe |

Source code in bofire/utils/reduce.py
def augment_data(self, data: pd.DataFrame) -> pd.DataFrame:
    """Restore the eliminated features in a dataframe

    Args:
        data (pd.DataFrame): Dataframe that should be restored.

    Returns:
        pd.DataFrame: Restored dataframe

    """
    if len(self.equalities) == 0:
        return data
    data = data.copy()
    for name_lhs, names_rhs, coeffs in self.equalities:
        data[name_lhs] = coeffs[-1]
        for i, name in enumerate(names_rhs):
            data[name_lhs] += coeffs[i] * data[name]
    return data

drop_data(self, data)

Drop eliminated features from a dataframe.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| data | pd.DataFrame | Dataframe with features to be dropped. | required |

Returns:

| Type | Description |
| --- | --- |
| pd.DataFrame | Reduced dataframe. |

Source code in bofire/utils/reduce.py
def drop_data(self, data: pd.DataFrame) -> pd.DataFrame:
    """Drop eliminated features from a dataframe.

    Args:
        data (pd.DataFrame): Dataframe with features to be dropped.

    Returns:
        pd.DataFrame: Reduced dataframe.

    """
    if len(self.equalities) == 0:
        return data
    drop = []
    for name_lhs, _, _ in self.equalities:
        if name_lhs in data.columns:
            drop.append(name_lhs)
    return data.drop(columns=drop)
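
Example (a minimal sketch): an AffineTransform for a domain in which x3 was eliminated via the equality x3 = 1.0 - x1 - x2 (the last coefficient is the constant offset).

import pandas as pd

from bofire.utils.reduce import AffineTransform

trafo = AffineTransform(equalities=[("x3", ["x1", "x2"], [-1.0, -1.0, 1.0])])
reduced = pd.DataFrame({"x1": [0.2], "x2": [0.3]})
full = trafo.augment_data(reduced)  # adds x3 = 1.0 - 0.2 - 0.3 = 0.5
back = trafo.drop_data(full)        # removes the x3 column again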

adjust_boundary(feature, coef, rhs)

Adjusts the boundaries of a feature.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| feature | ContinuousInput | Feature to be adjusted. | required |
| coef | float | Coefficient. | required |
| rhs | float | Right-hand-side of the constraint. | required |

Source code in bofire/utils/reduce.py
def adjust_boundary(feature: ContinuousInput, coef: float, rhs: float):
    """Adjusts the boundaries of a feature.

    Args:
        feature (ContinuousInput): Feature to be adjusted.
        coef (float): Coefficient.
        rhs (float): Right-hand-side of the constraint.

    """
    boundary = rhs / coef
    if coef > 0:
        if boundary > feature.lower_bound:
            feature.bounds = (boundary, feature.upper_bound)
    elif boundary < feature.upper_bound:
        feature.bounds = (feature.lower_bound, boundary)

check_domain_for_reduction(domain)

Check if the reduction can be applied or if a trivial case is present.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| domain | Domain | Domain to be checked. | required |

Returns:

| Type | Description |
| --- | --- |
| bool | True if reducible, else False. |

Source code in bofire/utils/reduce.py
def check_domain_for_reduction(domain: Domain) -> bool:
    """Check if the reduction can be applied or if a trivial case is present.

    Args:
        domain (Domain): Domain to be checked.

    Returns:
        bool: True if reducible, else False.

    """
    # are there any constraints?
    if len(domain.constraints) == 0:
        return False

    # are there any linear equality constraints?
    linear_equalities = domain.constraints.get(LinearEqualityConstraint)
    if len(linear_equalities) == 0:
        return False

    # are there no NChooseKConstraint constraints?
    if len(domain.constraints.get([NChooseKConstraint])) > 0:
        return False

    # are there continuous inputs
    continuous_inputs = domain.inputs.get(ContinuousInput)
    if len(continuous_inputs) == 0:
        return False

    # check that equality constraints only contain continuous inputs
    for c in linear_equalities:
        assert isinstance(c, LinearConstraint)
        for feat in c.features:
            if feat not in domain.inputs.get_keys(ContinuousInput):
                return False
    return True

check_existence_of_solution(A_aug)

Given an augmented coefficient matrix, this function determines the existence (and uniqueness) of a solution using the rank theorem.

Source code in bofire/utils/reduce.py
def check_existence_of_solution(A_aug):
    """Given an augmented coefficient matrix this function determines the existence (and uniqueness) of solution using the rank theorem."""
    A = A_aug[:, :-1]
    b = A_aug[:, -1]
    len_inputs = np.shape(A)[1]

    # catch special cases
    rk_A_aug = np.linalg.matrix_rank(A_aug)
    rk_A = np.linalg.matrix_rank(A)

    if rk_A == rk_A_aug:
        if rk_A < len_inputs:
            return  # all good
        x = np.linalg.solve(A, b)
        raise Exception(
            f"There is a unique solution x for the linear equality constraints: x={x}",
        )
    if rk_A < rk_A_aug:
        raise Exception(
            "There is no solution fulfilling the linear equality constraints.",
        )
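
Example (a minimal sketch): the augmented matrix has one column per input plus the right-hand side as last column.

import numpy as np

from bofire.utils.reduce import check_existence_of_solution

# x1 + x2 = 1 with two inputs: underdetermined, reduction is possible
check_existence_of_solution(np.array([[1.0, 1.0, 1.0]]))  # passes silently

# x1 + x2 = 1 and x1 + x2 = 2: contradictory, raises an Exception
try:
    check_existence_of_solution(np.array([[1.0, 1.0, 1.0], [1.0, 1.0, 2.0]]))
except Exception as err:
    print(err)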

reduce_domain(domain)

Reduce a domain with linear equality constraints to a subdomain where linear equality constraints are eliminated.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| domain | Domain | Domain to be reduced. | required |

Returns:

| Type | Description |
| --- | --- |
| Tuple[Domain, AffineTransform] | The reduced domain and the corresponding transformation to switch between the reduced and original domain. |

Source code in bofire/utils/reduce.py
def reduce_domain(domain: Domain) -> Tuple[Domain, AffineTransform]:
    """Reduce a domain with linear equality constraints to a subdomain where linear equality constraints are eliminated.

    Args:
        domain (Domain): Domain to be reduced.

    Returns:
        Tuple[Domain, AffineTransform]: The reduced domain and the corresponding transformation to switch between the
            reduced and original domain.

    """
    # check if the domain can be reduced
    if not check_domain_for_reduction(domain):
        return domain, AffineTransform([])

    # find linear equality constraints
    linear_equalities = domain.constraints.get(LinearEqualityConstraint)
    other_constraints = domain.constraints.get(
        Constraint,
        excludes=[LinearEqualityConstraint],
    )

    # only consider continuous inputs
    continuous_inputs = [
        cast(ContinuousInput, f) for f in domain.inputs.get(ContinuousInput)
    ]
    other_inputs = domain.inputs.get(Input, excludes=[ContinuousInput])

    # assemble Matrix A from equality constraints
    N = len(linear_equalities)
    M = len(continuous_inputs) + 1
    names = np.concatenate(([feat.key for feat in continuous_inputs], ["rhs"]))

    A_aug = pd.DataFrame(data=np.zeros(shape=(N, M)), columns=names)

    for i in range(len(linear_equalities)):
        c = linear_equalities[i]
        assert isinstance(c, LinearEqualityConstraint)
        A_aug.loc[i, c.features] = c.coefficients
        A_aug.loc[i, "rhs"] = c.rhs
    A_aug = A_aug.values

    # catch special cases
    check_existence_of_solution(A_aug)

    # bring A_aug to reduced row-echelon form
    A_aug_rref, pivots = rref(A_aug)
    pivots = np.array(pivots)
    A_aug_rref = np.array(A_aug_rref).astype(np.float64)

    # formulate box bounds as linear inequality constraints in matrix form
    B = np.zeros(shape=(2 * (M - 1), M))
    B[: M - 1, : M - 1] = np.eye(M - 1)
    B[M - 1 :, : M - 1] = -np.eye(M - 1)

    B[: M - 1, -1] = np.array([feat.upper_bound for feat in continuous_inputs])
    B[M - 1 :, -1] = -1.0 * np.array([feat.lower_bound for feat in continuous_inputs])

    # eliminate columns with pivot element
    for i in range(len(pivots)):
        p = pivots[i]
        B[p, :] -= A_aug_rref[i, :]
        B[p + M - 1, :] += A_aug_rref[i, :]

    # build up reduced domain
    _domain = Domain.model_construct(
        # _fields_set = {"inputs", "outputs", "constraints"}
        inputs=deepcopy(other_inputs),
        outputs=deepcopy(domain.outputs),
        constraints=deepcopy(other_constraints),
    )
    new_inputs = [
        deepcopy(feat) for i, feat in enumerate(continuous_inputs) if i not in pivots
    ]
    all_inputs = _domain.inputs + new_inputs
    assert isinstance(all_inputs, Inputs)
    _domain.inputs.features = all_inputs.features

    constraints: List[AnyConstraint] = []
    for i in pivots:
        # reduce equation system of upper bounds
        ind = np.where(B[i, :-1] != 0)[0]
        if len(ind) > 0 and B[i, -1] < np.inf:
            if len(list(names[ind])) > 1:
                c = LinearInequalityConstraint.from_greater_equal(
                    features=list(names[ind]),
                    coefficients=(-1.0 * B[i, ind]).tolist(),
                    rhs=B[i, -1] * -1.0,
                )
                constraints.append(c)
            else:
                key = names[ind][0]
                feat = cast(ContinuousInput, _domain.inputs.get_by_key(key))
                adjust_boundary(feat, (-1.0 * B[i, ind])[0], B[i, -1] * -1.0)
        elif B[i, -1] < -1e-16:
            raise Exception("There is no solution that fulfills the constraints.")

        # reduce equation system of lower bounds
        ind = np.where(B[i + M - 1, :-1] != 0)[0]
        if len(ind) > 0 and B[i + M - 1, -1] < np.inf:
            if len(list(names[ind])) > 1:
                c = LinearInequalityConstraint.from_greater_equal(
                    features=list(names[ind]),
                    coefficients=(-1.0 * B[i + M - 1, ind]).tolist(),
                    rhs=B[i + M - 1, -1] * -1.0,
                )
                constraints.append(c)
            else:
                key = names[ind][0]
                feat = cast(ContinuousInput, _domain.inputs.get_by_key(key))
                adjust_boundary(
                    feat,
                    (-1.0 * B[i + M - 1, ind])[0],
                    B[i + M - 1, -1] * -1.0,
                )
        elif B[i + M - 1, -1] < -1e-16:
            raise Exception("There is no solution that fulfills the constraints.")

    if len(constraints) > 0:
        _domain.constraints.constraints = _domain.constraints.constraints + constraints  # type: ignore

    # assemble equalities
    _equalities = []
    for i in range(len(pivots)):
        name_lhs = names[pivots[i]]
        names_rhs = []
        coeffs = []

        for j in range(len(names) - 1):
            if A_aug_rref[i, j] != 0 and j != pivots[i]:
                coeffs.append(-A_aug_rref[i, j])
                names_rhs.append(names[j])

        coeffs.append(A_aug_rref[i, -1])

        _equalities.append((name_lhs, names_rhs, coeffs))

    trafo = AffineTransform(_equalities)
    # remove remaining dependencies of eliminated inputs from the problem
    _domain = remove_eliminated_inputs(_domain, trafo)
    return _domain, trafo
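
Example (a minimal sketch; Domain.from_lists and the data-model import paths are the usual BoFire ones):

from bofire.data_models.constraints.api import LinearEqualityConstraint
from bofire.data_models.domain.api import Domain
from bofire.data_models.features.api import ContinuousInput, ContinuousOutput
from bofire.utils.reduce import reduce_domain

domain = Domain.from_lists(
    inputs=[ContinuousInput(key=k, bounds=(0, 1)) for k in ["x1", "x2", "x3"]],
    outputs=[ContinuousOutput(key="y")],
    constraints=[
        LinearEqualityConstraint(
            features=["x1", "x2", "x3"], coefficients=[1.0, 1.0, 1.0], rhs=1.0
        )
    ],
)
reduced, trafo = reduce_domain(domain)
print(reduced.inputs.get_keys())  # one input was eliminated, e.g. ['x2', 'x3']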

remove_eliminated_inputs(domain, transform)

Eliminates remaining occurrences of eliminated inputs in linear constraints.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| domain | Domain | Domain in which the linear constraints should be purged. | required |
| transform | AffineTransform | Affine transformation object that defines the obsolete features. | required |

Exceptions:

| Type | Description |
| --- | --- |
| ValueError | If a feature occurs in a constraint other than a linear one. |

Returns:

| Type | Description |
| --- | --- |
| Domain | Purged domain. |

Source code in bofire/utils/reduce.py
def remove_eliminated_inputs(domain: Domain, transform: AffineTransform) -> Domain:
    """Eliminates remaining occurrences of eliminated inputs in linear constraints.

    Args:
        domain (Domain): Domain in which the linear constraints should be purged.
        transform (AffineTransform): Affine transformation object that defines the obsolete features.

    Raises:
        ValueError: If feature occurs in a constraint different from a linear one.

    Returns:
        Domain: Purged domain.

    """
    inputs_names = domain.inputs.get_keys()
    M = len(inputs_names)

    # write the equalities for the backtransformation into one matrix
    inputs_dict = {inputs_names[i]: i for i in range(M)}

    # build up dict from domain.equalities e.g. {"xi1": [coeff(xj1), ..., coeff(xjn)], ... "xik":...}
    coeffs_dict = {}
    for e in transform.equalities:
        coeffs = np.zeros(M + 1)
        for j, name in enumerate(e[1]):
            coeffs[inputs_dict[name]] = e[2][j]
        coeffs[-1] = e[2][-1]
        coeffs_dict[e[0]] = coeffs

    constraints = []
    for c in domain.constraints.get():
        # Nonlinear constraints not supported
        if not isinstance(c, LinearConstraint):
            raise ValueError(
                "Elimination of variables is only supported for LinearEquality and LinearInequality constraints.",
            )

        # no changes, if the constraint does not contain eliminated inputs
        if all(name in inputs_names for name in c.features):
            constraints.append(c)

        # remove inputs from the constraint that were eliminated from the inputs before
        else:
            totally_removed = False
            _features = np.array(inputs_names)
            _rhs = c.rhs

            # create new lhs and rhs from the old one and knowledge from problem._equalities
            _coefficients = np.zeros(M)
            for j, name in enumerate(c.features):
                if name in inputs_names:
                    _coefficients[inputs_dict[name]] += c.coefficients[j]
                else:
                    _coefficients += c.coefficients[j] * coeffs_dict[name][:-1]
                    _rhs -= c.coefficients[j] * coeffs_dict[name][-1]

            _features = _features[np.abs(_coefficients) > 1e-16]
            _coefficients = _coefficients[np.abs(_coefficients) > 1e-16]
            _c = None
            if isinstance(c, LinearEqualityConstraint):
                if len(_features) > 1:
                    _c = LinearEqualityConstraint(
                        features=_features.tolist(),
                        coefficients=_coefficients.tolist(),
                        rhs=_rhs,
                    )
                elif len(_features) == 0:
                    totally_removed = True
                else:
                    feat: ContinuousInput = ContinuousInput(
                        **domain.inputs.get_by_key(_features[0]).model_dump(),
                    )
                    feat.bounds = (_coefficients[0], _coefficients[0])
                    totally_removed = True
            elif len(_features) > 1:
                _c = LinearInequalityConstraint(
                    features=_features.tolist(),
                    coefficients=_coefficients.tolist(),
                    rhs=_rhs,
                )
            elif len(_features) == 0:
                totally_removed = True
            else:
                feat = cast(ContinuousInput, domain.inputs.get_by_key(_features[0]))
                adjust_boundary(feat, _coefficients[0], _rhs)
                totally_removed = True

            # check if constraint is always fulfilled/not fulfilled
            if not totally_removed:
                assert _c is not None
                if len(_c.features) == 0 and _c.rhs >= 0:
                    pass
                elif len(_c.features) == 0 and _c.rhs < 0:
                    raise Exception("Linear constraints cannot be fulfilled.")
                elif np.isinf(_c.rhs):
                    pass
                else:
                    constraints.append(_c)
    domain.constraints = Constraints(constraints=constraints)
    return domain

rref(A, tol=1e-08)

Computes the reduced row echelon form of a matrix.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| A | ndarray | 2d array representing a matrix. | required |
| tol | float | Tolerance for rounding to 0. Defaults to 1e-8. | 1e-08 |

Returns:

| Type | Description |
| --- | --- |
| Tuple[numpy.ndarray, List[int]] | (A_rref, pivots), where A_rref is the reduced row echelon form of A and pivots is a list containing the pivot columns of A_rref |

Source code in bofire/utils/reduce.py
def rref(A: np.ndarray, tol: float = 1e-8) -> Tuple[np.ndarray, List[int]]:
    """Computes the reduced row echelon form of a Matrix

    Args:
        A (ndarray): 2d array representing a matrix.
        tol (float, optional): tolerance for rounding to 0. Defaults to 1e-8.

    Returns:
        (A_rref, pivots), where A_rref is the reduced row echelon form of A and pivots
        is a list containing the pivot columns of A_rref

    """
    A = np.array(A, dtype=np.float64)
    n, m = np.shape(A)

    col = 0
    row = 0
    pivots = []

    for col in range(m):
        # does a pivot element exist?
        if all(np.abs(A[row:, col]) < tol):
            pass
        # if yes: start elimination
        else:
            pivots.append(col)
            max_row = np.argmax(np.abs(A[row:, col])) + row
            # switch to most stable row
            A[[row, max_row], :] = A[[max_row, row], :]
            # normalize row
            A[row, :] /= A[row, col]
            # eliminate other elements from column
            for r in range(n):
                if r != row:
                    A[r, :] -= A[r, col] / A[row, col] * A[row, :]
            row += 1

    prec = int(-np.log10(tol))
    return np.round(A, prec), pivots
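
Example (a minimal sketch):

import numpy as np

from bofire.utils.reduce import rref

A_rref, pivots = rref(np.array([[1.0, 2.0, 3.0], [2.0, 4.0, 7.0]]))
print(pivots)     # [0, 2]: the pivot columns
print(A_rref[0])  # first row of the reduced form: [1. 2. 0.]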

subdomain

get_subdomain(domain, feature_keys)

Removes all features not listed in feature_keys, creating a subdomain of the provided domain.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| domain | Domain | the original domain from which a subdomain should be created | required |
| feature_keys | List | List of features that shall be included in the subdomain | required |

Exceptions:

| Type | Description |
| --- | --- |
| AssertionError | when in total fewer than 2 features are provided |
| ValueError | when a provided feature key is not present in the provided domain |
| AssertionError | when no output feature is provided |
| AssertionError | when no input feature is provided |
| ValueError | when a removed input feature is used in a constraint |

Returns:

| Type | Description |
| --- | --- |
| Domain | A new domain containing only parts of the original domain |

Source code in bofire/utils/subdomain.py
def get_subdomain(
    domain: Domain,
    feature_keys: List,
) -> Domain:
    """Removes all features not defined as argument creating a subdomain of the provided domain

    Args:
        domain (Domain): the original domain wherefrom a subdomain should be created
        feature_keys (List): List of features that shall be included in the subdomain

    Raises:
        Assert: when in total less than 2 features are provided
        ValueError: when a provided feature key is not present in the provided domain
        Assert: when no output feature is provided
        Assert: when no input feature is provided
        ValueError: _description_

    Returns:
        Domain: A new domain containing only parts of the original domain

    """
    assert len(feature_keys) >= 2, "At least two features have to be provided."
    outputs = []
    inputs = []
    for key in feature_keys:
        try:
            feat = (domain.inputs + domain.outputs).get_by_key(key)
        except KeyError:
            raise ValueError(f"Feature {key} not present in domain.")
        if isinstance(feat, Input):
            inputs.append(feat)
        else:
            outputs.append(feat)
    assert len(outputs) > 0, "At least one output feature has to be provided."
    assert len(inputs) > 0, "At least one input feature has to be provided."
    inputs = Inputs(features=inputs)
    outputs = Outputs(features=outputs)
    # loop over constraints and make sure that all features used in constraints are in the input_feature_keys
    for c in domain.constraints:
        for key in c.features:
            if key not in inputs.get_keys():
                raise ValueError(
                    f"Removed input feature {key} is used in a constraint.",
                )
    subdomain = deepcopy(domain)
    subdomain.inputs = inputs
    subdomain.outputs = outputs
    return subdomain
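
Example (a minimal sketch; Domain.from_lists and the data-model import paths are the usual BoFire ones):

from bofire.data_models.domain.api import Domain
from bofire.data_models.features.api import ContinuousInput, ContinuousOutput
from bofire.utils.subdomain import get_subdomain

domain = Domain.from_lists(
    inputs=[ContinuousInput(key=k, bounds=(0, 1)) for k in ["x1", "x2"]],
    outputs=[ContinuousOutput(key="y")],
)
sub = get_subdomain(domain, feature_keys=["x1", "y"])
print(sub.inputs.get_keys())  # ['x1']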

torch_tools

InterpolateTransform (InputTransform, Module)

Botorch input transform that interpolates values between given x and y values.

Source code in bofire/utils/torch_tools.py
class InterpolateTransform(InputTransform, Module):
    """Botorch input transform that interpolates values between given x and y values."""

    def __init__(
        self,
        new_x: Tensor,
        idx_x: List[int],
        idx_y: List[int],
        prepend_x: Tensor,
        prepend_y: Tensor,
        append_x: Tensor,
        append_y: Tensor,
        keep_original: bool = False,
        transform_on_train: bool = True,
        transform_on_eval: bool = True,
        transform_on_fantasize: bool = True,
    ):
        super().__init__()
        if len(set(idx_x + idx_y)) != len(idx_x) + len(idx_y):
            raise ValueError("Indices are not unique.")

        self.idx_x = torch.as_tensor(idx_x, dtype=torch.long)
        self.idx_y = torch.as_tensor(idx_y, dtype=torch.long)

        self.transform_on_train = transform_on_train
        self.transform_on_eval = transform_on_eval
        self.transform_on_fantasize = transform_on_fantasize
        self.new_x = new_x

        self.prepend_x = prepend_x
        self.prepend_y = prepend_y
        self.append_x = append_x
        self.append_y = append_y

        self.keep_original = keep_original

        if len(self.idx_x) + len(self.prepend_x) + len(self.append_x) != len(
            self.idx_y,
        ) + len(self.prepend_y) + len(self.append_y):
            raise ValueError("The number of x and y indices must be equal.")

    def _to(self, X: Tensor) -> None:
        self.new_x = self.new_x.to(X)

    def append(self, X: Tensor, values: Tensor) -> Tensor:
        shape = X.shape
        values_reshaped = values.view(*([1] * (len(shape) - 1)), -1)
        values_expanded = values_reshaped.expand(*shape[:-1], -1).to(X)
        return torch.cat([X, values_expanded], dim=-1)

    def prepend(self, X: Tensor, values: Tensor) -> Tensor:
        shape = X.shape
        values_reshaped = values.view(*([1] * (len(shape) - 1)), -1)
        values_expanded = values_reshaped.expand(*shape[:-1], -1).to(X)
        return torch.cat([values_expanded, X], dim=-1)

    def transform(self, X: Tensor):
        shapeX = X.shape

        x = X[..., self.idx_x]
        x = self.prepend(x, self.prepend_x)
        x = self.append(x, self.append_x)

        y = X[..., self.idx_y]
        y = self.prepend(y, self.prepend_y)
        y = self.append(y, self.append_y)

        if X.dim() == 3:
            x = x.reshape((shapeX[0] * shapeX[1], x.shape[-1]))
            y = y.reshape((shapeX[0] * shapeX[1], y.shape[-1]))

        new_x = self.new_x.expand(x.shape[0], -1)
        new_y = torch.vmap(interp1d)(x, y, new_x)

        if X.dim() == 3:
            new_y = new_y.reshape((shapeX[0], shapeX[1], new_y.shape[-1]))

        if self.keep_original:
            return torch.cat([new_y, X], dim=-1)

        return new_y

transform(self, X)

Transform the inputs to a model.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| X | Tensor | A batch_shape x n x d-dim tensor of inputs. | required |

Returns:

| Type | Description |
| --- | --- |
| | A batch_shape x n x d-dim tensor of transformed inputs. |

Source code in bofire/utils/torch_tools.py
def transform(self, X: Tensor):
    shapeX = X.shape

    x = X[..., self.idx_x]
    x = self.prepend(x, self.prepend_x)
    x = self.append(x, self.append_x)

    y = X[..., self.idx_y]
    y = self.prepend(y, self.prepend_y)
    y = self.append(y, self.append_y)

    if X.dim() == 3:
        x = x.reshape((shapeX[0] * shapeX[1], x.shape[-1]))
        y = y.reshape((shapeX[0] * shapeX[1], y.shape[-1]))

    new_x = self.new_x.expand(x.shape[0], -1)
    new_y = torch.vmap(interp1d)(x, y, new_x)

    if X.dim() == 3:
        new_y = new_y.reshape((shapeX[0], shapeX[1], new_y.shape[-1]))

    if self.keep_original:
        return torch.cat([new_y, X], dim=-1)

    return new_y

constrained_objective2botorch(idx, objective, x_adapt, eps=1e-08)

Create a callable that can be used by botorch.utils.objective.apply_constraints to set up output-constrained optimizations.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| idx | int | Index of the constraint objective in the list of outputs. | required |
| objective | ConstrainedObjective | The objective that should be transformed. | required |
| x_adapt | Optional[Tensor] | The tensor that should be used to adapt the objective, for example in case of a moving turning point in the MovingMaximizeSigmoidObjective. | required |
| eps | float | Small value to avoid numerical instabilities in case of the ConstrainedCategoricalObjective. Defaults to 1e-8. | 1e-08 |

Returns:

| Type | Description |
| --- | --- |
| Tuple[List[Callable[[Tensor], Tensor]], List[float], int] | List of callables that can be used by botorch for setting up the constrained objective, list of the corresponding botorch eta values, and the final index used by the method (to track categorical variables) |

Source code in bofire/utils/torch_tools.py
def constrained_objective2botorch(
    idx: int,
    objective: ConstrainedObjective,
    x_adapt: Optional[Tensor],
    eps: float = 1e-8,
) -> Tuple[List[Callable[[Tensor], Tensor]], List[float], int]:
    """Create a callable that can be used by `botorch.utils.objective.apply_constraints`
    to set up output-constrained optimizations.

    Args:
        idx (int): Index of the constraint objective in the list of outputs.
        objective (ConstrainedObjective): The objective that should be transformed.
        x_adapt (Optional[Tensor]): The tensor that should be used to adapt the objective,
            for example in case of a moving turning point in the `MovingMaximizeSigmoidObjective`.
        eps (float, optional): Small value to avoid numerical instabilities in case of the `ConstrainedCategoricalObjective`.
            Defaults to 1e-8.

    Returns:
        Tuple[List[Callable[[Tensor], Tensor]], List[float], int]: List of callables that can be used by botorch for setting up the constrained objective,
            list of the corresponding botorch eta values, final index used by the method (to track for categorical variables)

    """
    assert isinstance(
        objective,
        ConstrainedObjective,
    ), "Objective is not a `ConstrainedObjective`."
    if isinstance(objective, MaximizeSigmoidObjective):
        return (
            [lambda Z: (Z[..., idx] - objective.tp) * -1.0],
            [1.0 / objective.steepness],
            idx + 1,
        )
    if isinstance(objective, MovingMaximizeSigmoidObjective):
        assert x_adapt is not None
        tp = x_adapt.max().item() + objective.tp
        return (
            [lambda Z: (Z[..., idx] - tp) * -1.0],
            [1.0 / objective.steepness],
            idx + 1,
        )
    if isinstance(objective, MinimizeSigmoidObjective):
        return (
            [lambda Z: (Z[..., idx] - objective.tp)],
            [1.0 / objective.steepness],
            idx + 1,
        )
    if isinstance(objective, TargetObjective):
        return (
            [
                lambda Z: (Z[..., idx] - (objective.target_value - objective.tolerance))
                * -1.0,
                lambda Z: (
                    Z[..., idx] - (objective.target_value + objective.tolerance)
                ),
            ],
            [1.0 / objective.steepness, 1.0 / objective.steepness],
            idx + 1,
        )
    if isinstance(objective, ConstrainedCategoricalObjective):
        # The output of a categorical objective has final dim `c` where `c` is number of classes
        # Pass in the expected acceptance probability and perform an inverse sigmoid to attain the original probabilities
        return (
            [
                lambda Z: torch.log(
                    1
                    / torch.clamp(
                        (
                            Z[..., idx : idx + len(objective.desirability)]
                            * torch.tensor(objective.desirability).to(**tkwargs)
                        ).sum(-1),
                        min=eps,
                        max=1 - eps,
                    )
                    - 1,
                ),
            ],
            [1.0],
            idx + len(objective.desirability),
        )
    raise ValueError(f"Objective {objective.__class__.__name__} not known.")
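
Example (a minimal sketch): a MinimizeSigmoidObjective with turning point 5 maps to a single callable whose output is negative for feasible points.

import torch

from bofire.data_models.objectives.api import MinimizeSigmoidObjective
from bofire.utils.torch_tools import constrained_objective2botorch

objective = MinimizeSigmoidObjective(w=1.0, tp=5.0, steepness=10.0)
callables, etas, next_idx = constrained_objective2botorch(0, objective, x_adapt=None)
Z = torch.tensor([[4.0], [6.0]])
print(callables[0](Z))  # tensor([-1.,  1.]): 4.0 satisfies the constraint
print(etas, next_idx)   # [0.1] 1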

get_initial_conditions_generator(strategy, transform_specs, ask_options=None, sequential=True)

Takes a strategy object and returns a generator callable which uses this strategy to generate samples and which can be passed to botorch's gen_batch_initial_conditions.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| strategy | Strategy | Strategy that should be used to generate samples. | required |
| transform_specs | Dict | Dictionary indicating how the samples should be transformed. | required |
| ask_options | Dict | Dictionary of keyword arguments that are passed to the ask method of the strategy. Defaults to {}. | None |
| sequential | bool | If True, samples for every q-batch are generated independently of each other. If False, the n x q samples are generated at once. | True |

Returns:

| Type | Description |
| --- | --- |
| Callable[[int, int, int], Tensor] | Callable that can be passed to batch_initial_conditions. |

Source code in bofire/utils/torch_tools.py
def get_initial_conditions_generator(
    strategy: Strategy,
    transform_specs: Dict,
    ask_options: Optional[Dict] = None,
    sequential: bool = True,
) -> Callable[[int, int, int], Tensor]:
    """Takes a strategy object and returns a callable which uses this
    strategy to return a generator callable which can be used in botorch`s
    `gen_batch_initial_conditions` to generate samples.

    Args:
        strategy (Strategy): Strategy that should be used to generate samples.
        transform_specs (Dict): Dictionary indicating how the samples should be
            transformed.
        ask_options (Dict, optional): Dictionary of keyword arguments that are
            passed to the `ask` method of the strategy. Defaults to {}.
        sequential (bool, optional): If True, samples for every q-batch are
            generated independently of each other. If False, the `n x q` samples
            are generated at once.

    Returns:
        Callable[[int, int, int], Tensor]: Callable that can be passed to
            `batch_initial_conditions`.

    """
    if ask_options is None:
        ask_options = {}

    def generator(n: int, q: int, seed: int) -> Tensor:
        if sequential:
            initial_conditions = []
            for _ in range(n):
                candidates = strategy.ask(q, **ask_options)
                # transform it
                transformed_candidates = strategy.domain.inputs.transform(
                    candidates,
                    transform_specs,
                )
                # transform to tensor
                initial_conditions.append(
                    torch.from_numpy(transformed_candidates.values).to(**tkwargs),
                )
            return torch.stack(initial_conditions, dim=0)
        candidates = strategy.ask(n * q, **ask_options)
        # transform it
        transformed_candidates = strategy.domain.inputs.transform(
            candidates,
            transform_specs,
        )
        return (
            torch.from_numpy(transformed_candidates.values)
            .to(**tkwargs)
            .reshape(n, q, transformed_candidates.shape[1])
        )

    return generator
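
A minimal usage sketch, assuming a purely continuous domain and bofire's RandomStrategy; the import paths follow the usual bofire data-model/strategy split and may differ between versions:

import bofire.strategies.api as strategies
from bofire.data_models.domain.api import Domain
from bofire.data_models.features.api import ContinuousInput, ContinuousOutput
from bofire.data_models.strategies.api import RandomStrategy
from bofire.utils.torch_tools import get_initial_conditions_generator

domain = Domain.from_lists(
    inputs=[
        ContinuousInput(key="x1", bounds=(0, 1)),
        ContinuousInput(key="x2", bounds=(0, 1)),
    ],
    outputs=[ContinuousOutput(key="y")],
)
# map the data model to a functional strategy that implements `ask`
strategy = strategies.map(RandomStrategy(domain=domain))
generator = get_initial_conditions_generator(strategy=strategy, transform_specs={})
# n=4 restarts with q=2 candidates each over 2 features -> shape (4, 2, 2);
# note that the `seed` argument is accepted but not used by the generator
initial_conditions = generator(4, 2, 42)
print(initial_conditions.shape)  # torch.Size([4, 2, 2])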

get_interpoint_constraints(domain, n_candidates)

Converts interpoint equality constraints to linear equality constraints that can be processed by botorch. For more information, see the docstring of optimize_acqf in botorch (https://github.com/pytorch/botorch/blob/main/botorch/optim/optimize.py).

Parameters:

Name Type Description Default
domain Domain

Optimization problem definition.

required
n_candidates int

Number of candidates that should be requested.

required

Returns:

Type Description
List[Tuple[Tensor, Tensor, float]]

List of tuples; each tuple consists of a tensor with the feature indices, a tensor with the coefficients, and a float for the rhs.

Source code in bofire/utils/torch_tools.py
def get_interpoint_constraints(
    domain: Domain,
    n_candidates: int,
) -> List[Tuple[Tensor, Tensor, float]]:
    """Converts interpoint equality constraints to linear equality constraints,
        that can be processed by botorch. For more information, see the docstring
        of `optimize_acqf` in botorch
        (https://github.com/pytorch/botorch/blob/main/botorch/optim/optimize.py).

    Args:
        domain (Domain): Optimization problem definition.
        n_candidates (int): Number of candidates that should be requested.

    Returns:
        List[Tuple[Tensor, Tensor, float]]: List of tuples; each tuple consists
            of a tensor with the feature indices, a tensor with the coefficients,
            and a float for the rhs.

    """
    constraints = []
    if n_candidates == 1:
        return constraints
    for constraint in domain.constraints.get(InterpointEqualityConstraint):
        assert isinstance(constraint, InterpointEqualityConstraint)
        coefficients = torch.tensor([1.0, -1.0]).to(**tkwargs)
        feat_idx = domain.inputs.get_keys(Input).index(constraint.feature)
        feat = domain.inputs.get_by_key(constraint.feature)
        assert isinstance(feat, ContinuousInput)
        if feat.is_fixed():
            continue
        multiplicity = constraint.multiplicity or n_candidates
        for i in range(math.ceil(n_candidates / multiplicity)):
            all_indices = torch.arange(
                i * multiplicity,
                min((i + 1) * multiplicity, n_candidates),
            )
            for k in range(len(all_indices) - 1):
                indices = torch.tensor(
                    [[all_indices[0], feat_idx], [all_indices[k + 1], feat_idx]],
                    dtype=torch.int64,
                )
                constraints.append((indices, coefficients, 0.0))
    return constraints
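
A minimal sketch, assuming the InterpointEqualityConstraint data model takes a single feature key and an optional multiplicity, as the source above suggests:

from bofire.data_models.constraints.api import InterpointEqualityConstraint
from bofire.data_models.domain.api import Domain
from bofire.data_models.features.api import ContinuousInput, ContinuousOutput
from bofire.utils.torch_tools import get_interpoint_constraints

domain = Domain.from_lists(
    inputs=[
        ContinuousInput(key="x1", bounds=(0, 10)),
        ContinuousInput(key="x2", bounds=(0, 10)),
    ],
    outputs=[ContinuousOutput(key="y")],
    constraints=[InterpointEqualityConstraint(feature="x1", multiplicity=3)],
)
# with 6 candidates and multiplicity 3, x1 must be equal within
# candidates {0, 1, 2} and within candidates {3, 4, 5}
equalities = get_interpoint_constraints(domain=domain, n_candidates=6)
for indices, coefficients, rhs in equalities:
    # each tuple encodes x1[i] - x1[j] = 0 in the format expected by
    # optimize_acqf(..., equality_constraints=equalities)
    print(indices.tolist(), coefficients.tolist(), rhs)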

get_linear_constraints(domain, constraint, unit_scaled=False)

Converts linear constraints to the form required by BoTorch.

Parameters:

Name Type Description Default
domain Domain

Optimization problem definition.

required
constraint Union[Type[bofire.data_models.constraints.linear.LinearEqualityConstraint], Type[bofire.data_models.constraints.linear.LinearInequalityConstraint]]

Type of constraint that should be converted.

required
unit_scaled bool

If True, transforms constraints by assuming that the bounds for the continuous features are [0,1]. Defaults to False.

False

Returns:

Type Description
List[Tuple[Tensor, Tensor, float]]

List of tuples; each tuple consists of a tensor with the feature indices, a tensor with the coefficients, and a float for the rhs.

Source code in bofire/utils/torch_tools.py
def get_linear_constraints(
    domain: Domain,
    constraint: Union[Type[LinearEqualityConstraint], Type[LinearInequalityConstraint]],
    unit_scaled: bool = False,
) -> List[Tuple[Tensor, Tensor, float]]:
    """Converts linear constraints to the form required by BoTorch.

    Args:
        domain: Optimization problem definition.
        constraint: Type of constraint that should be converted.
        unit_scaled: If True, transforms constraints by assuming that the bounds for the continuous features are [0,1]. Defaults to False.

    Returns:
        List[Tuple[Tensor, Tensor, float]]: List of tuples; each tuple consists of a tensor with the feature indices, a tensor with the coefficients, and a float for the rhs.

    """
    constraints = []
    for c in domain.constraints.get(constraint):
        indices = []
        coefficients = []
        lower = []
        upper = []
        rhs = 0.0
        for i, featkey in enumerate(c.features):
            idx = domain.inputs.get_keys(Input).index(featkey)
            feat = domain.inputs.get_by_key(featkey)
            if feat.is_fixed():
                rhs -= feat.fixed_value()[0] * c.coefficients[i]  # type: ignore
            else:
                lower.append(feat.lower_bound)  # type: ignore
                upper.append(feat.upper_bound)  # type: ignore
                indices.append(idx)
                coefficients.append(
                    c.coefficients[i],
                )  # if unit_scaled == False else c_scaled.coefficients[i])
        if unit_scaled:
            lower = np.array(lower)
            upper = np.array(upper)
            s = upper - lower
            scaled_coefficients = s * np.array(coefficients)
            constraints.append(
                (
                    torch.tensor(indices),
                    -torch.tensor(scaled_coefficients).to(**tkwargs),
                    -(rhs + c.rhs - np.sum(np.array(coefficients) * lower)),
                ),
            )
        else:
            constraints.append(
                (
                    torch.tensor(indices),
                    -torch.tensor(coefficients).to(**tkwargs),
                    -(rhs + c.rhs),
                ),
            )
    return constraints
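
A minimal sketch for a mixture-type equality constraint. Note the sign flip in the output: the helper negates coefficients and rhs on conversion, which is equivalent for equalities and maps bofire's <= inequalities onto botorch's >= convention:

from bofire.data_models.constraints.api import LinearEqualityConstraint
from bofire.data_models.domain.api import Domain
from bofire.data_models.features.api import ContinuousInput, ContinuousOutput
from bofire.utils.torch_tools import get_linear_constraints

domain = Domain.from_lists(
    inputs=[ContinuousInput(key=f"x{i}", bounds=(0, 1)) for i in range(3)],
    outputs=[ContinuousOutput(key="y")],
    constraints=[
        # mixture constraint: x0 + x1 + x2 == 1
        LinearEqualityConstraint(
            features=["x0", "x1", "x2"], coefficients=[1.0, 1.0, 1.0], rhs=1.0
        ),
    ],
)
indices, coefficients, rhs = get_linear_constraints(
    domain, LinearEqualityConstraint
)[0]
# coefficients and rhs come back negated
print(indices.tolist(), coefficients.tolist(), rhs)  # [0, 1, 2] [-1.0, -1.0, -1.0] -1.0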

get_multiobjective_objective(outputs, experiments)

Returns a callable that can be used by botorch for multiobjective optimization.

Parameters:

Name Type Description Default
outputs Outputs

Outputs object for which the callable should be generated.

required
experiments pd.DataFrame

DataFrame containing the experiments that are used for adapting the objectives on the fly, for example in the case of the MovingMaximizeSigmoidObjective.

required

Returns:

Type Description
Callable[[Tensor, Optional[Tensor]], Tensor]

Callable that takes a tensor of output samples (and an optional input tensor X) and returns the stacked objective values.

Source code in bofire/utils/torch_tools.py
def get_multiobjective_objective(
    outputs: Outputs,
    experiments: pd.DataFrame,
) -> Callable[[Tensor, Optional[Tensor]], Tensor]:
    """Returns a callable that can be used by botorch for multiobjective optimization.

    Args:
        outputs (Outputs): Outputs object for which the callable should be generated.
        experiments (pd.DataFrame): DataFrame containing the experiments that are used for
            adapting the objectives on the fly, for example in the case of the
            `MovingMaximizeSigmoidObjective`.

    Returns:
        Callable[[Tensor, Optional[Tensor]], Tensor]: Callable that takes a tensor
            of output samples (and an optional input tensor `X`) and returns the
            stacked objective values.

    """
    callables = [
        get_objective_callable(
            idx=i,
            objective=feat.objective,
            x_adapt=torch.from_numpy(
                outputs.preprocess_experiments_one_valid_output(feat.key, experiments)[
                    feat.key
                ].values,
            ).to(**tkwargs),
        )
        for i, feat in enumerate(outputs.get())
        if feat.objective is not None
        and isinstance(
            feat.objective,
            (MaximizeObjective, MinimizeObjective, CloseToTargetObjective),
        )
    ]

    def objective(samples: Tensor, X: Optional[Tensor] = None) -> Tensor:
        return torch.stack([c(samples, None) for c in callables], dim=-1)

    return objective
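
A minimal sketch, assuming two continuous outputs with default objective bounds; the experiments frame carries the usual valid_<key> columns:

import pandas as pd
import torch

from bofire.data_models.domain.api import Outputs
from bofire.data_models.features.api import ContinuousOutput
from bofire.data_models.objectives.api import MaximizeObjective, MinimizeObjective
from bofire.utils.torch_tools import get_multiobjective_objective

outputs = Outputs(
    features=[
        ContinuousOutput(key="y1", objective=MaximizeObjective(w=1.0)),
        ContinuousOutput(key="y2", objective=MinimizeObjective(w=1.0)),
    ],
)
experiments = pd.DataFrame(
    {"y1": [1.0, 2.0], "y2": [3.0, 4.0], "valid_y1": [1, 1], "valid_y2": [1, 1]}
)
objective = get_multiobjective_objective(outputs=outputs, experiments=experiments)
# samples carry the model outputs in the last dimension (here: y1, y2); the
# callable returns one column per maximize/minimize/close-to-target
# objective, oriented so that larger is always better
samples = torch.tensor([[[2.0, 5.0]]])
print(objective(samples).shape)  # torch.Size([1, 1, 2])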

get_nchoosek_constraints(domain)

Transforms NChooseK constraints into a list of non-linear inequality constraint callables that can be processed by botorch. For this purpose the NChooseK constraint is continuously relaxed by counting the number of zeros in a candidate via a sum of narrow Gaussians centered at zero.

Parameters:

Name Type Description Default
domain Domain

Optimization problem definition.

required

Returns:

Type Description
List[Callable[[Tensor], float]]

List of callables that can be used as nonlinear inequality constraints in botorch.

Source code in bofire/utils/torch_tools.py
def get_nchoosek_constraints(domain: Domain) -> List[Callable[[Tensor], float]]:
    """Transforms NChooseK constraints into a list of non-linear inequality constraint callables
    that can be parsed by pydantic. For this purpose the NChooseK constraint is continuously
    relaxed by countig the number of zeros in a candidate by a sum of narrow gaussians centered
    at zero.

    Args:
        domain (Domain): Optimization problem definition.

    Returns:
        List[Callable[[Tensor], float]]: List of callables that can be used
            as nonlinear inequality constraints in botorch.

    """

    def narrow_gaussian(x, ell=1e-3):
        return torch.exp(-0.5 * (x / ell) ** 2)

    def max_constraint(indices: Tensor, num_features: int, max_count: int):
        return lambda x: narrow_gaussian(x=x[..., indices]).sum(dim=-1) - (
            num_features - max_count
        )

    def min_constraint(indices: Tensor, num_features: int, min_count: int):
        return lambda x: -narrow_gaussian(x=x[..., indices]).sum(dim=-1) + (
            num_features - min_count
        )

    constraints = []
    # note: the `none_also_valid` flag is ignored here for now
    for c in domain.constraints.get(NChooseKConstraint):
        assert isinstance(c, NChooseKConstraint)
        indices = torch.tensor(
            [domain.inputs.get_keys(ContinuousInput).index(key) for key in c.features],
            dtype=torch.int64,
        )
        if c.max_count != len(c.features):
            constraints.append(
                max_constraint(
                    indices=indices,
                    num_features=len(c.features),
                    max_count=c.max_count,
                ),
            )
        if c.min_count > 0:
            constraints.append(
                min_constraint(
                    indices=indices,
                    num_features=len(c.features),
                    min_count=c.min_count,
                ),
            )
    return constraints
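
A minimal sketch; the narrow-Gaussian relaxation returns values >= 0 for feasible candidates, matching botorch's nonlinear inequality convention:

import torch

from bofire.data_models.constraints.api import NChooseKConstraint
from bofire.data_models.domain.api import Domain
from bofire.data_models.features.api import ContinuousInput, ContinuousOutput
from bofire.utils.torch_tools import get_nchoosek_constraints

domain = Domain.from_lists(
    inputs=[ContinuousInput(key=f"x{i}", bounds=(0, 1)) for i in range(4)],
    outputs=[ContinuousOutput(key="y")],
    constraints=[
        # at most two of the four features may be non-zero
        NChooseKConstraint(
            features=["x0", "x1", "x2", "x3"],
            min_count=0,
            max_count=2,
            none_also_valid=True,
        ),
    ],
)
constraints = get_nchoosek_constraints(domain)
# two non-zero features: the two zeros each contribute ~1 to the Gaussian
# sum, so the relaxed max-count constraint evaluates to ~0, i.e. feasible
x = torch.tensor([[0.5, 0.5, 0.0, 0.0]])
print(constraints[0](x))  # approx. 0.0 (>= 0 means feasible)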

get_nonlinear_constraints(domain)

Returns a list of callables representing the nonlinear constraints of the given domain in a form that can be processed by botorch.

Parameters:

Name Type Description Default
domain Domain

The domain for which to generate the nonlinear constraints.

required

Returns:

Type Description
List[Callable[[Tensor], float]]

A list of callable functions that take a tensor as input and return a float value representing the constraint evaluation.

Source code in bofire/utils/torch_tools.py
def get_nonlinear_constraints(domain: Domain) -> List[Callable[[Tensor], float]]:
    """Returns a list of callable functions that represent the nonlinear constraints
    for the given domain that can be processed by botorch.

    Args:
        domain (Domain): The domain for which to generate the nonlinear constraints.

    Returns:
        List[Callable[[Tensor], float]]: A list of callable functions that take a tensor
        as input and return a float value representing the constraint evaluation.

    """
    return get_nchoosek_constraints(domain) + get_product_constraints(domain)
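
Since this helper simply concatenates the NChooseK and product constraint callables, a minimal sketch:

from bofire.data_models.constraints.api import NChooseKConstraint
from bofire.data_models.domain.api import Domain
from bofire.data_models.features.api import ContinuousInput, ContinuousOutput
from bofire.utils.torch_tools import get_nonlinear_constraints

domain = Domain.from_lists(
    inputs=[ContinuousInput(key=f"x{i}", bounds=(0, 1)) for i in range(3)],
    outputs=[ContinuousOutput(key="y")],
    constraints=[
        NChooseKConstraint(
            features=["x0", "x1", "x2"],
            min_count=1,
            max_count=2,
            none_also_valid=False,
        ),
    ],
)
# one callable for the max_count relaxation, one for min_count, and no
# product constraints in this domain
print(len(get_nonlinear_constraints(domain)))  # 2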

get_output_constraints(outputs, experiments)

Method to translate output constraint objectives into a list of callables and a list of etas for use in botorch.

Parameters:

Name Type Description Default
outputs Outputs

Output feature object that should be processed.

required
experiments pd.DataFrame

DataFrame containing the experiments that are used for adapting the objectives on the fly, for example in the case of the MovingMaximizeSigmoidObjective.

required

Returns:

Type Description
Tuple[List[Callable[[Tensor], Tensor]], List[float]]

List of constraint callables, list of associated etas.

Source code in bofire/utils/torch_tools.py
def get_output_constraints(
    outputs: Outputs,
    experiments: pd.DataFrame,
) -> Tuple[List[Callable[[Tensor], Tensor]], List[float]]:
    """Method to translate output constraint objectives into a list of
    callables and a list of etas for use in botorch.

    Args:
        outputs (Outputs): Output feature object that should
            be processed.
        experiments (pd.DataFrame): DataFrame containing the experiments that are used for
            adapting the objectives on the fly, for example in the case of the
            `MovingMaximizeSigmoidObjective`.

    Returns:
        Tuple[List[Callable[[Tensor], Tensor]], List[float]]: List of constraint callables,
            list of associated etas.

    """
    constraints = []
    etas = []
    idx = 0
    for feat in outputs.get():
        if isinstance(feat.objective, ConstrainedObjective):
            cleaned_experiments = outputs.preprocess_experiments_one_valid_output(
                feat.key,
                experiments,
            )
            iconstraints, ietas, idx = constrained_objective2botorch(
                idx,
                objective=feat.objective,
                x_adapt=torch.from_numpy(cleaned_experiments[feat.key].values).to(
                    **tkwargs,
                )
                if not isinstance(feat.objective, ConstrainedCategoricalObjective)
                else None,
            )
            constraints += iconstraints
            etas += ietas
        else:
            idx += 1
    return constraints, etas
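
A minimal sketch, assuming a MaximizeSigmoidObjective as the constrained objective; in the implementation shown above the eta for sigmoid-type objectives is derived from the objective's steepness:

import pandas as pd

from bofire.data_models.domain.api import Outputs
from bofire.data_models.features.api import ContinuousOutput
from bofire.data_models.objectives.api import (
    MaximizeObjective,
    MaximizeSigmoidObjective,
)
from bofire.utils.torch_tools import get_output_constraints

outputs = Outputs(
    features=[
        ContinuousOutput(key="y1", objective=MaximizeObjective(w=1.0)),
        # sigmoid objectives are ConstrainedObjectives and are translated
        # into output constraints with an associated eta
        ContinuousOutput(
            key="y2",
            objective=MaximizeSigmoidObjective(w=1.0, tp=5.0, steepness=10.0),
        ),
    ],
)
experiments = pd.DataFrame(
    {"y1": [1.0], "y2": [6.0], "valid_y1": [1], "valid_y2": [1]}
)
constraints, etas = get_output_constraints(outputs=outputs, experiments=experiments)
# only y2 contributes a constraint callable; its eta scales the sigmoid
# smoothing applied by botorch's constrained objectives
print(len(constraints), etas)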

get_product_constraints(domain)

Returns a list of nonlinear constraint functions that can be processed by botorch based on the given domain.

Parameters:

Name Type Description Default
domain Domain

The domain object containing the constraints.

required

Returns:

Type Description
List[Callable[[Tensor], float]]

A list of product constraint functions.

Source code in bofire/utils/torch_tools.py
def get_product_constraints(domain: Domain) -> List[Callable[[Tensor], float]]:
    """Returns a list of nonlinear constraint functions that can be processed by botorch
    based on the given domain.

    Args:
        domain (Domain): The domain object containing the constraints.

    Returns:
        List[Callable[[Tensor], float]]: A list of product constraint functions.

    """

    def product_constraint(indices: Tensor, exponents: Tensor, rhs: float, sign: int):
        return lambda x: -1.0 * sign * (x[..., indices] ** exponents).prod(dim=-1) + rhs

    constraints = []
    for c in domain.constraints.get(ProductInequalityConstraint):
        assert isinstance(c, ProductInequalityConstraint)
        indices = torch.tensor(
            [domain.inputs.get_keys(ContinuousInput).index(key) for key in c.features],
            dtype=torch.int64,
        )
        constraints.append(
            product_constraint(indices, torch.tensor(c.exponents), c.rhs, c.sign),
        )
    return constraints
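
A minimal sketch, with the constructor fields (features, exponents, rhs, sign) inferred from the source above; the returned callable follows botorch's >= 0 feasibility convention:

import torch

from bofire.data_models.constraints.api import ProductInequalityConstraint
from bofire.data_models.domain.api import Domain
from bofire.data_models.features.api import ContinuousInput, ContinuousOutput
from bofire.utils.torch_tools import get_product_constraints

domain = Domain.from_lists(
    inputs=[
        ContinuousInput(key="x1", bounds=(1, 10)),
        ContinuousInput(key="x2", bounds=(1, 10)),
    ],
    outputs=[ContinuousOutput(key="y")],
    constraints=[
        ProductInequalityConstraint(
            features=["x1", "x2"], exponents=[1, 1], rhs=10, sign=1
        ),
    ],
)
constraints = get_product_constraints(domain)
x = torch.tensor([[2.0, 3.0]])
# -1 * sign * (x1**1 * x2**1) + rhs = -6 + 10 = 4 (>= 0 means feasible)
print(constraints[0](x))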