Utils
cheminformatics
smiles2fingerprints(smiles, bond_radius=5, n_bits=2048)
Transforms a list of smiles to an array of morgan fingerprints.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
smiles |
List[str] |
List of smiles |
required |
bond_radius |
int |
Bond radius to use. Defaults to 5. |
5 |
n_bits |
int |
Number of bits. Defaults to 2048. |
2048 |
Returns:
Type | Description |
---|---|
np.ndarray |
Numpy array holding the fingerprints |
Source code in bofire/utils/cheminformatics.py
def smiles2fingerprints(
    smiles: List[str],
    bond_radius: int = 5,
    n_bits: int = 2048,
) -> np.ndarray:
    """Convert SMILES strings into Morgan fingerprint bit vectors.

    Args:
        smiles (List[str]): SMILES strings, one per molecule.
        bond_radius (int, optional): Radius handed to the Morgan fingerprint.
            Defaults to 5.
        n_bits (int, optional): Number of bits per fingerprint. Defaults to 2048.

    Raises:
        ValueError: Propagated from `smiles2mol` when a string cannot be parsed.

    Returns:
        np.ndarray: Array built from the fingerprints, in the order of `smiles`.
    """
    fingerprints = []
    for smi in smiles:
        molecule = smiles2mol(smi)
        fingerprints.append(
            AllChem.GetMorganFingerprintAsBitVect(  # type: ignore
                molecule,
                radius=bond_radius,
                nBits=n_bits,
            ),
        )
    return np.asarray(fingerprints)
smiles2fragments(smiles, fragments_list=None)
Transforms smiles to an array of fragments.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
smiles |
list[str] |
List of smiles |
required |
fragments_list |
list[str] |
List of desired fragments. Defaults to None. |
None |
Returns:
Type | Description |
---|---|
np.ndarray |
Array holding the fragment information. |
Source code in bofire/utils/cheminformatics.py
def smiles2fragments(
    smiles: List[str],
    fragments_list: Optional[List[str]] = None,
) -> np.ndarray:
    """Count rdkit fragment descriptors for every SMILES string.

    Only rdkit descriptors whose name starts with ``fr_`` are considered.
    Columns follow the order of ``Descriptors.descList``, not the order of
    `fragments_list`; requested names that rdkit does not provide are ignored.

    Args:
        smiles (list[str]): List of smiles.
        fragments_list (list[str], optional): Names of the fragment descriptors
            to keep. Defaults to None, which keeps all of them.

    Returns:
        np.ndarray: Array of shape ``(len(smiles), n_selected_fragments)``.
    """
    # name -> descriptor function, restricted to the requested fragments
    fragments = {}
    for entry in Descriptors.descList:
        name = entry[0]
        if not name.startswith("fr_"):  # type: ignore
            continue
        if fragments_list is None or name in fragments_list:
            fragments[name] = entry[1]

    frags = np.zeros((len(smiles), len(fragments)))
    for row, smi in enumerate(smiles):
        mol = smiles2mol(smi)
        frags[row, :] = [func(mol) for func in fragments.values()]
    return frags
smiles2mol(smiles)
Transforms a smiles string to an rdkit mol object.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
smiles |
str |
Smiles string. |
required |
Exceptions:
Type | Description |
---|---|
ValueError |
If string is not a valid smiles. |
Returns:
Type | Description |
---|---|
rdkit.Mol |
rdkit.mol object |
Source code in bofire/utils/cheminformatics.py
def smiles2mol(smiles: str):
    """Parse a SMILES string into an rdkit molecule.

    Args:
        smiles (str): Smiles string.

    Raises:
        ValueError: If `MolFromSmiles` returns None for the string.

    Returns:
        rdkit.Mol: The parsed molecule object.
    """
    mol = MolFromSmiles(smiles)  # type: ignore
    if mol is not None:
        return mol
    # The parser signals an unparsable string by returning None.
    raise ValueError(f"{smiles} is not a valid smiles string.")
smiles2mordred(smiles, descriptors_list)
Transforms a list of smiles to mordred molecular descriptors.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
smiles |
List[str] |
List of smiles |
required |
descriptors_list |
List[str] |
List of desired mordred descriptors |
required |
Returns:
Type | Description |
---|---|
np.ndarray |
Array holding the mordred molecular descriptors. |
Source code in bofire/utils/cheminformatics.py
def smiles2mordred(smiles: List[str], descriptors_list: List[str]) -> np.ndarray:
    """Compute selected mordred molecular descriptors for a list of smiles.

    Args:
        smiles (List[str]): List of smiles.
        descriptors_list (List[str]): Names of the mordred descriptors to compute;
            a descriptor is kept when its string form appears in this list.

    Raises:
        ValueError: If any computed descriptor column contains a value that
            cannot be converted to a number.

    Returns:
        np.ndarray: Float array holding the descriptor values, one row per smiles.
    """
    mols = []
    for smi in smiles:
        mols.append(smiles2mol(smi))

    calc = Calculator(descriptors, ignore_3D=True)  # type: ignore
    wanted = [d for d in calc.descriptors if str(d) in descriptors_list]
    calc.descriptors = wanted
    descriptors_df = calc.pandas(mols)

    # One flag per column: True when coercing to numeric leaves a NaN behind.
    nan_list = []
    for col in descriptors_df.columns:
        as_numeric = pd.to_numeric(descriptors_df[col], errors="coerce")
        nan_list.append(as_numeric.isnull().values.any())  # type: ignore

    if any(nan_list):
        raise ValueError(
            f"Found NaN values in descriptors {list(descriptors_df.columns[nan_list])}",  # type: ignore
        )
    return descriptors_df.astype(float).values
doe
compute_generator(n_factors, n_generators)
Computes a generator for a given number of factors and generators.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
n_factors |
int |
The number of factors. |
required |
n_generators |
int |
The number of generators. |
required |
Returns:
Type | Description |
---|---|
str |
The generator. |
Source code in bofire/utils/doe.py
def compute_generator(n_factors: int, n_generators: int) -> str:
    """Computes a generator for a given number of factors and generators.

    The base factors are the first ``n_factors - n_generators`` lowercase
    letters. Generated factors are words built from those letters: with a
    single generator the word containing all base letters is used; otherwise
    the words of length ``n_base_factors - 1`` are used in lexicographic order,
    followed (if exactly one more is needed) by the word of all base letters.

    Args:
        n_factors: The number of factors.
        n_generators: The number of generators.

    Raises:
        ValueError: If there are too few base factors to build the requested
            number of distinct generators.

    Returns:
        The generator, as space separated words.
    """
    letters = string.ascii_lowercase
    if n_generators == 0:
        return " ".join(letters[:n_factors])

    n_base_factors = n_factors - n_generators
    base = letters[:n_base_factors]

    if n_generators == 1:
        # With fewer than two base factors the single generated word would be
        # empty or identical to a main factor.
        if n_base_factors <= 1:
            raise ValueError(
                "Design not possible, as main factors are confounded with each other.",
            )
        return " ".join([*base, base])

    if n_base_factors - 1 < 2:
        raise ValueError(
            "Design not possible, as main factors are confounded with each other.",
        )

    generators = [
        "".join(word) for word in itertools.combinations(base, n_base_factors - 1)
    ]
    missing = n_generators - len(generators)
    if missing < 0:
        generators = generators[:n_generators]
    elif missing == 1:
        # one extra generator is available: the word of all base letters
        generators.append(base)
    elif missing > 1:
        raise ValueError(
            "Design not possible, as main factors are confounded with each other.",
        )
    return " ".join([*base, *generators])
ff2n(n_factors)
Computes the full factorial design for a given number of factors.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
n_factors |
int |
The number of factors. |
required |
Returns:
Type | Description |
---|---|
ndarray |
The full factorial design. |
Source code in bofire/utils/doe.py
def ff2n(n_factors: int) -> np.ndarray:
    """Build the two-level full factorial design.

    Args:
        n_factors: The number of factors.

    Returns:
        Array with one row per run and one column per factor, with entries
        -1 and 1; the last factor varies fastest.
    """
    levels = (-1, 1)
    runs = itertools.product(levels, repeat=n_factors)
    return np.array(list(runs))
fracfact(gen)
Computes the fractional factorial design for a given generator.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
gen |
The generator. |
required |
Returns:
Type | Description |
---|---|
ndarray |
The fractional factorial design. |
Source code in bofire/utils/doe.py
def fracfact(gen) -> np.ndarray:
    """Computes the fractional factorial design for a given generator.

    Args:
        gen: The generator, as space separated words that may carry a leading
            ``-`` or ``+`` sign.

    Raises:
        ValueError: Propagated from `validate_generator` for an invalid generator.

    Returns:
        The fractional factorial design, one column per word of the generator.
    """
    gen = validate_generator(n_factors=gen.count(" ") + 1, generator=gen)
    words = [word for word in re.split(r"\-|\s|\+", gen) if word]

    # Positions of the words that carry a leading "-" sign.
    negated = [pos for pos, token in enumerate(gen.split(" ")) if token[0] == "-"]

    # Full factorial in the main factors (the single-letter words).
    n_main = sum(1 for word in words if len(word) == 1)
    base_design = ff2n(n_main)

    # Every column, single letters included, is the product of the base columns
    # named by its lowercase letters ("a" -> column 0, "b" -> column 1, ...).
    design = np.zeros((base_design.shape[0], len(words)))
    for col, word in enumerate(words):
        letter_idx = np.array([ord(ch) for ch in word]) - 97
        design[:, col] = np.prod(base_design[:, letter_idx], axis=1)

    # Flip the sign of the negated columns.
    if len(negated) > 0:
        design[:, negated] *= -1
    return design
get_alias_structure(gen, order=4)
Computes the alias structure of the design matrix. Works only for generators with positive signs.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
gen |
str |
The generator. |
required |
order |
int |
The order up to which the alias structure should be calculated. Defaults to 4. |
4 |
Returns:
Type | Description |
---|---|
List[str] |
The alias structure of the design matrix. |
Source code in bofire/utils/doe.py
def get_alias_structure(gen: str, order: int = 4) -> List[str]:
    """Computes the alias structure of the design matrix. Works only for generators
    with positive signs.

    Effects (products of factor columns) with an identical sign pattern over
    all experiments are aliased; the all-plus pattern is aliased with ``I``.

    Args:
        gen: The generator.
        order: The order up to which the alias structure should be calculated.
            Defaults to 4.

    Returns:
        One string per alias group, with the aliased effects joined by `` = ``.
    """
    design = fracfact(gen)
    n_experiments, n_factors = design.shape
    all_names = string.ascii_lowercase + "I"
    max_order = min(n_factors, order)

    # sign pattern -> effects (tuples of column indices) showing that pattern
    aliases = {n_experiments * "+": [(26,)]}  # 26 is mapped to I
    for size in range(1, max_order + 1):
        for combination in itertools.combinations(range(n_factors), size):
            contrast = np.prod(design[:, combination], axis=1)
            scontrast = "".join(np.where(contrast == 1, "+", "-").tolist())
            aliases.setdefault(scontrast, []).append(combination)  # type: ignore

    # Within a group: shorter effects first, then by index tuple.
    # Across groups: by the sequence of effect lengths, then by the effects.
    aliases_list = sorted(
        (sorted(group, key=lambda a: (len(a), a)) for group in aliases.values()),
        key=lambda group: ([len(a) for a in group], group),
    )

    return [
        " = ".join("".join(all_names[f] for f in effect) for effect in group)
        for group in aliases_list
    ]
get_confounding_matrix(inputs, design, powers=None, interactions=None)
Analyzes the confounding of a design and returns the confounding matrix.
Only takes continuous features into account.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
inputs |
Inputs |
Input features. |
required |
design |
pd.DataFrame |
Design matrix. |
required |
powers |
List[int] |
List of powers of the individual factors/features that should be considered. Integers has to be larger than 1. Defaults to []. |
None |
interactions |
List[int] |
List with interaction levels to be considered. Integers has to be larger than 1. Defaults to [2]. |
None |
Returns:
Type | Description |
---|---|
pd.DataFrame |
Correlation matrix of the scaled design columns, including the added power and interaction columns. |
Source code in bofire/utils/doe.py
def get_confounding_matrix(
    inputs: Inputs,
    design: pd.DataFrame,
    powers: Optional[List[int]] = None,
    interactions: Optional[List[int]] = None,
):
    """Analyzes the confounding of a design and returns the confounding matrix.

    Only continuous features are taken into account. Their columns are scaled
    to the range [-1, 1], extended by the requested power and interaction
    columns, and the correlation matrix of all columns is returned.

    Args:
        inputs (Inputs): Input features.
        design (pd.DataFrame): Design matrix.
        powers (List[int], optional): Powers of the individual features that
            should be considered. Each has to be larger than 1. Defaults to
            None, which adds no power columns.
        interactions (List[int], optional): Interaction levels to be considered.
            Each has to be larger than 1 and at most the number of continuous
            features. Defaults to None, which is treated as [2].

    Returns:
        The result of ``DataFrame.corr()`` on the extended, scaled design.
    """
    from sklearn.preprocessing import MinMaxScaler

    if len(inputs.get(CategoricalInput)) > 0:
        warnings.warn("Categorical input features will be ignored.")

    keys = inputs.get_keys(ContinuousInput)
    scaled_values = MinMaxScaler(feature_range=(-1, 1)).fit_transform(design[keys])
    scaled_design = pd.DataFrame(data=scaled_values, columns=keys)

    # power columns, named "<key>**<p>"
    for p in [] if powers is None else powers:
        assert p > 1, "Power has to be at least of degree two."
        for key in keys:
            scaled_design[f"{key}**{p}"] = scaled_design[key] ** p

    # interaction columns, named by the ":"-joined keys
    for i in [2] if interactions is None else interactions:
        assert i > 1, "Interaction has to be at least of degree two."
        assert i < len(keys) + 1, f"Interaction has to be smaller than {len(keys)+1}."
        for combi in itertools.combinations(keys, i):
            members = list(combi)
            scaled_design[":".join(combi)] = scaled_design[members].prod(axis=1)

    return scaled_design.corr()
get_default_generator(n_factors, n_generators)
Returns the default generator for a given number of factors and generators.
In case the combination is not available, the function will raise an error.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
n_factors |
int |
The number of factors. |
required |
n_generators |
int |
The number of generators. |
required |
Returns:
Type | Description |
---|---|
str |
The generator. |
Source code in bofire/utils/doe.py
def get_default_generator(n_factors: int, n_generators: int) -> str:
    """Look up the default generator for a number of factors and generators.

    Args:
        n_factors: The number of factors.
        n_generators: The number of generators.

    Raises:
        ValueError: If the table of default generators has no entry for the
            requested combination.

    Returns:
        The generator, as space separated words: the base-factor letters
        followed by the lowercased right-hand sides of the stored definitions.
    """
    letters = string.ascii_lowercase
    if n_generators == 0:
        return " ".join(list(letters[:n_factors]))

    df_generators = default_fracfac_generators
    matches = (df_generators.n_factors == n_factors) & (
        df_generators.n_generators == n_generators
    )
    if df_generators.loc[matches].empty:
        raise ValueError("No generator available for the requested combination.")

    # The stored entry is a ";"-separated list of "<lhs>=<rhs>" definitions.
    definitions = df_generators.loc[matches, "generator"].to_list()[0].split(";")
    assert len(definitions) == n_generators, "Number of generators does not match."

    words = [definition.split("=")[1].strip().lower() for definition in definitions]
    n_base_factors = n_factors - n_generators
    return " ".join(list(letters[:n_base_factors]) + words)
get_generator(n_factors, n_generators)
Returns a generator for a given number of factors and generators.
If the requested combination is available in the default generators, it will return
this one. Otherwise, it will compute a new one using compute_generator
.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
n_factors |
int |
The number of factors. |
required |
n_generators |
int |
The number of generators. |
required |
Returns:
Type | Description |
---|---|
str |
The generator. |
Source code in bofire/utils/doe.py
def get_generator(n_factors: int, n_generators: int) -> str:
    """Returns a generator for a given number of factors and generators.

    The default generator from `get_default_generator` is preferred. If that
    lookup raises a ValueError, a generator is computed with
    `compute_generator` instead.

    Args:
        n_factors: The number of factors.
        n_generators: The number of generators.

    Returns:
        The generator.
    """
    try:
        generator = get_default_generator(n_factors, n_generators)
    except ValueError:
        # no default generator available -> fall back to the computed one
        generator = compute_generator(n_factors, n_generators)
    return generator
validate_generator(n_factors, generator)
Validates the generator and throws an error if it is not valid.
Source code in bofire/utils/doe.py
def validate_generator(n_factors: int, generator: str) -> str:
    """Validates the generator and throws an error if it is not valid.

    Args:
        n_factors: Expected number of space separated words in `generator`.
        generator: Space separated words; single letters are main factors,
            longer words are combinations. Words may carry a leading ``-``
            or ``+`` sign.

    Raises:
        ValueError: If the number of words does not match `n_factors`, there is
            no main factor, main factors are repeated or are not the first
            letters of the alphabet, a combination appears before a main
            factor, combinations are repeated, or a combination uses a letter
            that is not a main factor.

    Returns:
        The unchanged generator.
    """
    if len(generator.split(" ")) != n_factors:
        raise ValueError("Generator does not match the number of factors.")
    # clean it and transform it into a list
    generators = [item for item in re.split(r"\-|\s|\+", generator) if item]

    # Positions of single letters (main factors) and of letter combinations.
    # Both are derived from the word length; comparing the word itself to 1
    # would classify every word as a combination.
    idx_main = [i for i, item in enumerate(generators) if len(item) == 1]
    idx_combi = [i for i, item in enumerate(generators) if len(item) != 1]

    if len(idx_main) == 0:
        raise ValueError("At least one unconfounded main factor is needed.")

    main_factors = [generators[i] for i in idx_main]
    # Check that single letters (main factors) are unique
    if len(main_factors) != len(set(main_factors)):
        raise ValueError("Main factors are confounded with each other.")

    # Check that single letters (main factors) follow the alphabet
    if "".join(sorted(main_factors)) != string.ascii_lowercase[: len(idx_main)]:
        raise ValueError(
            f'Use the letters `{" ".join(string.ascii_lowercase[: len(idx_main)])}` for the main factors.',
        )

    # Check that main factors come before combinations; a generator without
    # any combination trivially satisfies this.
    if idx_combi and min(idx_combi) < max(idx_main):
        raise ValueError("Main factors have to come before combinations.")

    combinations = [generators[i] for i in idx_combi]
    # Check that letter combinations are unique
    if len(combinations) != len(set(combinations)):
        raise ValueError("Generators are not unique.")

    # Check that combinations only use letters that are main factors
    allowed = set(main_factors)
    if not all(set(item).issubset(allowed) for item in combinations):
        raise ValueError("Generators are not valid.")
    return generator
multiobjective
get_ref_point_mask(domain, output_feature_keys=None)
Method to get a mask for the reference points taking into account if we want to maximize or minimize an objective. In case it is maximize the value in the mask is 1, in case we want to minimize it is -1.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
domain |
Domain |
Domain for which the mask should be generated. |
required |
output_feature_keys |
Optional[list] |
Name of output feature keys that should be considered in the mask. Defaults to None. |
None |
Returns:
Type | Description |
---|---|
np.ndarray |
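Array holding 1.0 for outputs to be maximized and -1.0 for outputs to be minimized or brought close to a target. |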
Source code in bofire/utils/multiobjective.py
def get_ref_point_mask(
    domain: Domain,
    output_feature_keys: Optional[list] = None,
) -> np.ndarray:
    """Build the sign mask applied to reference points.

    Args:
        domain (Domain): Domain for which the mask should be generated.
        output_feature_keys (Optional[list], optional): Keys of the output
            features that should be considered in the mask. Defaults to None,
            in which case all outputs with a maximize, minimize or
            close-to-target objective are used.

    Raises:
        ValueError: If fewer than two output features are considered, or if
            one of them has an objective other than the three named above.

    Returns:
        np.ndarray: One entry per key: 1.0 for a maximize objective, -1.0 for
            a minimize or close-to-target objective.
    """
    if output_feature_keys is None:
        output_feature_keys = domain.outputs.get_keys_by_objective(
            includes=[MaximizeObjective, MinimizeObjective, CloseToTargetObjective],
        )
    if len(output_feature_keys) < 2:
        raise ValueError("At least two output features have to be provided.")

    mask = []
    for key in output_feature_keys:
        objective = domain.outputs.get_by_key(key).objective
        if isinstance(objective, MaximizeObjective):
            sign = 1.0
        elif isinstance(objective, (MinimizeObjective, CloseToTargetObjective)):
            sign = -1.0
        else:
            raise ValueError(
                "Only `MaximizeObjective` and `MinimizeObjective` supported",
            )
        mask.append(sign)
    return np.array(mask)
naming_conventions
get_column_names(outputs)
Specifies column names for given Outputs type.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
outputs |
Outputs |
The Outputs object containing the individual outputs. |
required |
Returns:
Type | Description |
---|---|
Tuple[List[str], List[str]] |
A tuple containing the prediction column names and the standard deviation column names |
Source code in bofire/utils/naming_conventions.py
def get_column_names(outputs: Outputs) -> Tuple[List[str], List[str]]:
    """Derive prediction and standard deviation column names for the outputs.

    Categorical outputs come first and contribute one ``<key>_<category>_prob``
    and one ``<key>_<category>_sd`` column per category; continuous outputs
    follow with ``<key>_pred`` and ``<key>_sd``.

    Args:
        outputs (Outputs): The Outputs object containing the individual outputs.

    Returns:
        Tuple[List[str], List[str]]: The prediction column names and the
            standard deviation column names.
    """
    pred_cols: List[str] = []
    sd_cols: List[str] = []

    for featkey in outputs.get_keys(CategoricalOutput):
        categories = outputs.get_by_key(featkey).categories  # type: ignore
        pred_cols.extend(f"{featkey}_{cat}_prob" for cat in categories)
        sd_cols.extend(f"{featkey}_{cat}_sd" for cat in categories)

    for featkey in outputs.get_keys(ContinuousOutput):
        pred_cols.append(f"{featkey}_pred")
        sd_cols.append(f"{featkey}_sd")

    return pred_cols, sd_cols
postprocess_categorical_predictions(predictions, outputs)
Postprocess categorical predictions by finding the maximum probability location
Parameters:
Name | Type | Description | Default |
---|---|---|---|
predictions |
pd.DataFrame |
The dataframe containing the predictions. |
required |
outputs |
Outputs |
The Outputs object containing the individual outputs. |
required |
Returns:
Type | Description |
---|---|
predictions (pd.DataFrame) |
The (potentially modified) original dataframe with categorical predictions added |
Source code in bofire/utils/naming_conventions.py
def postprocess_categorical_predictions(
    predictions: pd.DataFrame,
    outputs: Outputs,
) -> pd.DataFrame:
    """Add the most probable category for every categorical output.

    For each categorical output the ``*_prob`` column with the highest value
    is determined per row, and its category label is inserted as
    ``<key>_pred`` at position 0. A constant ``<key>_sd`` column of 0.0 is
    inserted at position 1. The dataframe is modified in place.

    Args:
        predictions (pd.DataFrame): The dataframe containing the predictions.
        outputs (Outputs): The Outputs object containing the individual outputs.

    Returns:
        predictions (pd.DataFrame): The (potentially modified) original
            dataframe with categorical predictions added.
    """
    for feat in outputs.get():
        if not isinstance(feat, CategoricalOutput):
            continue
        prob_columns = predictions.filter(regex=f"{feat.key}(.*)_prob")
        # name of the probability column with the largest value in each row
        winners = prob_columns.idxmax(1)
        # strip the key prefix and the "_prob" suffix to recover the label
        labels = winners.str.replace(f"{feat.key}_", "").str.replace("_prob", "")
        predictions.insert(
            loc=0,
            column=f"{feat.key}_pred",
            value=labels.values,  # type: ignore
        )
        predictions.insert(loc=1, column=f"{feat.key}_sd", value=0.0)
    return predictions
reduce
AffineTransform
Class to switch back and forth from the reduced to the original domain.
Source code in bofire/utils/reduce.py
class AffineTransform:
    """Maps dataframes between the reduced and the original domain.

    Every stored equality expresses one eliminated feature as a constant plus
    a weighted sum of other features.
    """

    def __init__(self, equalities: List[Tuple[str, List[str], List[float]]]):
        """Initializes an `AffineTransform` object.

        Args:
            equalities (List[Tuple[str,List[str],List[float]]]): List of
                equalities. Each one is a tuple of the key of the eliminated
                feature, the keys of the features it is computed from, and
                the matching coefficients followed by a constant term as the
                last entry.
        """
        self.equalities = equalities

    def augment_data(self, data: pd.DataFrame) -> pd.DataFrame:
        """Restore the eliminated features in a dataframe.

        Args:
            data (pd.DataFrame): Dataframe that should be restored.

        Returns:
            pd.DataFrame: A copy with the eliminated features added; the input
                itself when there are no equalities.
        """
        if not len(self.equalities):
            return data
        restored = data.copy()
        for name_lhs, names_rhs, coeffs in self.equalities:
            # start from the constant term, then add the weighted features
            restored[name_lhs] = coeffs[-1]
            for position, name in enumerate(names_rhs):
                restored[name_lhs] = restored[name_lhs] + coeffs[position] * restored[name]
        return restored

    def drop_data(self, data: pd.DataFrame) -> pd.DataFrame:
        """Drop eliminated features from a dataframe.

        Args:
            data (pd.DataFrame): Dataframe with features to be dropped.

        Returns:
            pd.DataFrame: Dataframe without the eliminated features that were
                present; the input itself when there are no equalities.
        """
        if not len(self.equalities):
            return data
        present = [
            name_lhs for name_lhs, _, _ in self.equalities if name_lhs in data.columns
        ]
        return data.drop(columns=present)
__init__(self, equalities)
special
Initializes an AffineTransform
object.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
equalities |
List[Tuple[str,List[str],List[float]]] |
List of equalities. Every equality is defined as a tuple, in which the first entry is the key of the reduced feature, the second one is a list of feature keys that can be used to compute the feature and the third list of floats are the corresponding coefficients. |
required |
Source code in bofire/utils/reduce.py
def __init__(self, equalities: List[Tuple[str, List[str], List[float]]]):
    """Initializes an `AffineTransform` object.

    Args:
        equalities (List[Tuple[str,List[str],List[float]]]): List of equalities. Every equality
            is defined as a tuple, in which the first entry is the key of the reduced feature, the second
            one is a list of feature keys that can be used to compute the feature and the third list of floats
            are the corresponding coefficients.
    """
    # Stored as given; no validation or copy is made here.
    self.equalities = equalities
augment_data(self, data)
Restore the eliminated features in a dataframe
Parameters:
Name | Type | Description | Default |
---|---|---|---|
data |
pd.DataFrame |
Dataframe that should be restored. |
required |
Returns:
Type | Description |
---|---|
pd.DataFrame |
Restored dataframe |
Source code in bofire/utils/reduce.py
def augment_data(self, data: pd.DataFrame) -> pd.DataFrame:
    """Restore the eliminated features in a dataframe.

    Args:
        data (pd.DataFrame): Dataframe that should be restored.

    Returns:
        pd.DataFrame: A copy with the eliminated features added; the input
            itself when there are no equalities.
    """
    if not len(self.equalities):
        return data
    restored = data.copy()
    for name_lhs, names_rhs, coeffs in self.equalities:
        # start from the constant term, then add the weighted features
        restored[name_lhs] = coeffs[-1]
        for position, name in enumerate(names_rhs):
            restored[name_lhs] = restored[name_lhs] + coeffs[position] * restored[name]
    return restored
drop_data(self, data)
Drop eliminated features from a dataframe.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
data |
pd.DataFrame |
Dataframe with features to be dropped. |
required |
Returns:
Type | Description |
---|---|
pd.DataFrame |
Reduced dataframe. |
Source code in bofire/utils/reduce.py
def drop_data(self, data: pd.DataFrame) -> pd.DataFrame:
    """Drop eliminated features from a dataframe.

    Args:
        data (pd.DataFrame): Dataframe with features to be dropped.

    Returns:
        pd.DataFrame: Dataframe without the eliminated features that were
            present; the input itself when there are no equalities.
    """
    if not len(self.equalities):
        return data
    present = [
        name_lhs for name_lhs, _, _ in self.equalities if name_lhs in data.columns
    ]
    return data.drop(columns=present)
adjust_boundary(feature, coef, rhs)
Adjusts the boundaries of a feature.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
feature |
ContinuousInput |
Feature to be adjusted. |
required |
coef |
float |
Coefficient. |
required |
rhs |
float |
Right-hand-side of the constraint. |
required |
Source code in bofire/utils/reduce.py
def adjust_boundary(feature: ContinuousInput, coef: float, rhs: float):
    """Tighten one bound of a feature to ``rhs / coef`` if that is stricter.

    For a positive coefficient the lower bound is raised when ``rhs / coef``
    exceeds it; otherwise the upper bound is lowered when ``rhs / coef`` is
    below it. The feature is modified in place via its `bounds` attribute.

    Args:
        feature (ContinuousInput): Feature to be adjusted.
        coef (float): Coefficient.
        rhs (float): Right-hand-side of the constraint.
    """
    limit = rhs / coef
    raises_lower = coef > 0
    if raises_lower and limit > feature.lower_bound:
        feature.bounds = (limit, feature.upper_bound)
    elif not raises_lower and limit < feature.upper_bound:
        feature.bounds = (feature.lower_bound, limit)
check_domain_for_reduction(domain)
Check if the reduction can be applied or if a trivial case is present.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
domain |
Domain |
Domain to be checked. |
required |
Returns:
Type | Description |
---|---|
bool |
True if reducible, else False. |
Source code in bofire/utils/reduce.py
def check_domain_for_reduction(domain: Domain) -> bool:
    """Check if the reduction can be applied or if a trivial case is present.

    The domain is reducible only if it has at least one linear equality
    constraint, no NChooseK constraint, at least one continuous input, and
    every feature of every linear equality is a continuous input.

    Args:
        domain (Domain): Domain to be checked.

    Returns:
        bool: True if reducible, else False.
    """
    # no constraints at all -> nothing to eliminate
    if len(domain.constraints) == 0:
        return False

    linear_equalities = domain.constraints.get(LinearEqualityConstraint)
    if len(linear_equalities) == 0:
        return False

    # NChooseK constraints rule the reduction out
    if len(domain.constraints.get([NChooseKConstraint])) > 0:
        return False

    if len(domain.inputs.get(ContinuousInput)) == 0:
        return False

    # every equality may only reference continuous inputs
    continuous_keys = domain.inputs.get_keys(ContinuousInput)
    for c in linear_equalities:
        assert isinstance(c, LinearConstraint)
        if any(feat not in continuous_keys for feat in c.features):
            return False
    return True
check_existence_of_solution(A_aug)
Given an augmented coefficient matrix this function determines the existence (and uniqueness) of solution using the rank theorem.
Source code in bofire/utils/reduce.py
def check_existence_of_solution(A_aug):
    """Check solvability of a linear system given its augmented matrix.

    Uses the rank theorem on ``A_aug = [A | b]``: the system has no solution
    when ``rank(A) < rank(A_aug)``, exactly one when both ranks equal the
    number of unknowns, and infinitely many otherwise. Only the last case is
    accepted.

    Args:
        A_aug: Two-dimensional array whose last column is the right-hand side
            and whose other columns are the coefficients.

    Raises:
        Exception: If the system has no solution or exactly one solution.

    Returns:
        None, if the system has infinitely many solutions.
    """
    A = A_aug[:, :-1]
    b = A_aug[:, -1]
    n_equations = np.shape(A)[0]
    len_inputs = np.shape(A)[1]

    rk_A_aug = np.linalg.matrix_rank(A_aug)
    rk_A = np.linalg.matrix_rank(A)

    if rk_A < rk_A_aug:
        raise Exception(
            "There is no solution fulfilling the linear equality constraints.",
        )
    if rk_A == rk_A_aug and rk_A >= len_inputs:
        # np.linalg.solve only accepts square systems; with redundant
        # equations (more rows than unknowns) the consistent system is solved
        # by least squares instead, which then yields the exact solution.
        if n_equations == len_inputs:
            x = np.linalg.solve(A, b)
        else:
            x = np.linalg.lstsq(A, b, rcond=None)[0]
        raise Exception(
            f"There is a unique solution x for the linear equality constraints: x={x}",
        )
reduce_domain(domain)
Reduce a domain with linear equality constraints to a subdomain where linear equality constraints are eliminated.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
domain |
Domain |
Domain to be reduced. |
required |
Returns:
Type | Description |
---|---|
Tuple[Domain, AffineTransform] |
reduced domain and the according transformation to switch between the reduced and original domain. |
Source code in bofire/utils/reduce.py
def reduce_domain(domain: Domain) -> Tuple[Domain, AffineTransform]:
    """Reduce a domain with linear equality constraints to a subdomain where linear equality constraints are eliminated.

    If `check_domain_for_reduction` rejects the domain, it is returned
    unchanged together with an empty `AffineTransform`. Otherwise the linear
    equality constraints over the continuous inputs are assembled into an
    augmented matrix and passed to `rref`; the continuous inputs at the
    returned pivot positions are left out of the reduced domain. The box
    bounds of those inputs are rewritten in terms of the remaining inputs and
    added as linear inequality constraints, or applied as tightened bounds
    when only a single remaining input is involved.

    Args:
        domain (Domain): Domain to be reduced.

    Raises:
        Exception: Raised by `check_existence_of_solution` for the assembled
            system, or here when a rewritten bound involves no remaining input
            and its right-hand side is below -1e-16.

    Returns:
        Tuple[Domain, AffineTransform]: reduced domain and the according transformation to switch between the
            reduced and original domain.
    """
    # check if the domain can be reduced
    if not check_domain_for_reduction(domain):
        return domain, AffineTransform([])

    # find linear equality constraints
    linear_equalities = domain.constraints.get(LinearEqualityConstraint)
    other_constraints = domain.constraints.get(
        Constraint,
        excludes=[LinearEqualityConstraint],
    )

    # only consider continuous inputs
    continuous_inputs = [
        cast(ContinuousInput, f) for f in domain.inputs.get(ContinuousInput)
    ]
    other_inputs = domain.inputs.get(Input, excludes=[ContinuousInput])

    # assemble Matrix A from equality constraints
    # N rows (one per equality), M columns (one per continuous input plus "rhs")
    N = len(linear_equalities)
    M = len(continuous_inputs) + 1
    names = np.concatenate(([feat.key for feat in continuous_inputs], ["rhs"]))

    A_aug = pd.DataFrame(data=np.zeros(shape=(N, M)), columns=names)

    for i in range(len(linear_equalities)):
        c = linear_equalities[i]
        assert isinstance(c, LinearEqualityConstraint)
        A_aug.loc[i, c.features] = c.coefficients
        A_aug.loc[i, "rhs"] = c.rhs
    A_aug = A_aug.values

    # catch special cases
    check_existence_of_solution(A_aug)

    # bring A_aug to reduced row-echelon form
    # NOTE(review): `rref` is defined outside this block; it is used here as
    # returning the reduced matrix and the pivot column indices — confirm.
    A_aug_rref, pivots = rref(A_aug)
    pivots = np.array(pivots)
    A_aug_rref = np.array(A_aug_rref).astype(np.float64)

    # formulate box bounds as linear inequality constraints in matrix form
    # rows 0..M-2:     x_j  with the upper bound in the last column
    # rows M-1..2M-3: -x_j  with the negated lower bound in the last column
    B = np.zeros(shape=(2 * (M - 1), M))
    B[: M - 1, : M - 1] = np.eye(M - 1)
    B[M - 1 :, : M - 1] = -np.eye(M - 1)

    B[: M - 1, -1] = np.array([feat.upper_bound for feat in continuous_inputs])
    B[M - 1 :, -1] = -1.0 * np.array([feat.lower_bound for feat in continuous_inputs])

    # eliminate columns with pivot element
    # row i of the reduced matrix is subtracted from the upper-bound row and
    # added to the lower-bound row of pivot p
    for i in range(len(pivots)):
        p = pivots[i]
        B[p, :] -= A_aug_rref[i, :]
        B[p + M - 1, :] += A_aug_rref[i, :]

    # build up reduced domain
    _domain = Domain.model_construct(
        # _fields_set = {"inputs", "outputs", "constraints"}
        inputs=deepcopy(other_inputs),
        outputs=deepcopy(domain.outputs),
        constraints=deepcopy(other_constraints),
    )
    # keep only the continuous inputs whose position is not a pivot
    new_inputs = [
        deepcopy(feat) for i, feat in enumerate(continuous_inputs) if i not in pivots
    ]
    all_inputs = _domain.inputs + new_inputs
    assert isinstance(all_inputs, Inputs)
    _domain.inputs.features = all_inputs.features

    constraints: List[AnyConstraint] = []
    for i in pivots:
        # reduce equation system of upper bounds
        ind = np.where(B[i, :-1] != 0)[0]
        if len(ind) > 0 and B[i, -1] < np.inf:
            if len(list(names[ind])) > 1:
                # several inputs involved -> keep as an inequality constraint
                c = LinearInequalityConstraint.from_greater_equal(
                    features=list(names[ind]),
                    coefficients=(-1.0 * B[i, ind]).tolist(),
                    rhs=B[i, -1] * -1.0,
                )
                constraints.append(c)
            else:
                # a single input involved -> tighten that input's bounds
                key = names[ind][0]
                feat = cast(ContinuousInput, _domain.inputs.get_by_key(key))
                adjust_boundary(feat, (-1.0 * B[i, ind])[0], B[i, -1] * -1.0)
        elif B[i, -1] < -1e-16:
            raise Exception("There is no solution that fulfills the constraints.")

        # reduce equation system of lower bounds
        ind = np.where(B[i + M - 1, :-1] != 0)[0]
        if len(ind) > 0 and B[i + M - 1, -1] < np.inf:
            if len(list(names[ind])) > 1:
                c = LinearInequalityConstraint.from_greater_equal(
                    features=list(names[ind]),
                    coefficients=(-1.0 * B[i + M - 1, ind]).tolist(),
                    rhs=B[i + M - 1, -1] * -1.0,
                )
                constraints.append(c)
            else:
                key = names[ind][0]
                feat = cast(ContinuousInput, _domain.inputs.get_by_key(key))
                adjust_boundary(
                    feat,
                    (-1.0 * B[i + M - 1, ind])[0],
                    B[i + M - 1, -1] * -1.0,
                )
        elif B[i + M - 1, -1] < -1e-16:
            raise Exception("There is no solution that fulfills the constraints.")

    if len(constraints) > 0:
        _domain.constraints.constraints = _domain.constraints.constraints + constraints  # type: ignore

    # assemble equalities
    # each entry: (pivot input key, keys of the other inputs in its row,
    # their negated coefficients followed by the row's rhs)
    _equalities = []
    for i in range(len(pivots)):
        name_lhs = names[pivots[i]]
        names_rhs = []
        coeffs = []

        for j in range(len(names) - 1):
            if A_aug_rref[i, j] != 0 and j != pivots[i]:
                coeffs.append(-A_aug_rref[i, j])
                names_rhs.append(names[j])

        coeffs.append(A_aug_rref[i, -1])

        _equalities.append((name_lhs, names_rhs, coeffs))

    trafo = AffineTransform(_equalities)
    # remove remaining dependencies of eliminated inputs from the problem
    _domain = remove_eliminated_inputs(_domain, trafo)
    return _domain, trafo
remove_eliminated_inputs(domain, transform)
Eliminates remaining occurrences of eliminated inputs in linear constraints.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
domain |
Domain |
Domain in which the linear constraints should be purged. |
required |
transform |
AffineTransform |
Affine transformation object that defines the obsolete features. |
required |
Exceptions:
Type | Description |
---|---|
ValueError |
If feature occurs in a constraint different from a linear one. |
Returns:
Type | Description |
---|---|
Domain |
Purged domain. |
Source code in bofire/utils/reduce.py
def remove_eliminated_inputs(domain: Domain, transform: AffineTransform) -> Domain:
    """Eliminates remaining occurrences of eliminated inputs in linear constraints.

    Every constraint that references a feature which is not part of
    `domain.inputs` is rewritten in terms of the remaining inputs by
    substituting the equalities stored in `transform`. A rewritten constraint
    with exactly one remaining feature is turned into bounds of that feature,
    a rewritten constraint without any remaining feature is dropped.

    The passed domain is modified in place and returned.

    Args:
        domain (Domain): Domain in which the linear constraints should be purged.
        transform (AffineTransform): Affine transformation object that defines the obsolete features.

    Raises:
        ValueError: If the domain contains a constraint that is not a linear one.

    Returns:
        Domain: Purged domain.
    """
    inputs_names = domain.inputs.get_keys()
    M = len(inputs_names)
    inputs_dict = {name: i for i, name in enumerate(inputs_names)}

    # One row per eliminated input, laid out as
    # [coeff(x_1), ..., coeff(x_M), constant] w.r.t. the remaining inputs.
    # Each equality is a tuple (name_lhs, names_rhs, coeffs) where the last
    # entry of coeffs is the constant term.
    coeffs_dict = {}
    for e in transform.equalities:
        coeffs = np.zeros(M + 1)
        for j, name in enumerate(e[1]):
            coeffs[inputs_dict[name]] = e[2][j]
        coeffs[-1] = e[2][-1]
        coeffs_dict[e[0]] = coeffs

    constraints = []
    for c in domain.constraints.get():
        # Nonlinear constraints not supported
        if not isinstance(c, LinearConstraint):
            raise ValueError(
                "Elimination of variables is only supported for LinearEquality and LinearInequality constraints.",
            )
        # no changes, if the constraint does not contain eliminated inputs
        if all(name in inputs_names for name in c.features):
            constraints.append(c)
            continue

        # substitute the eliminated inputs to obtain a new lhs and rhs
        _rhs = c.rhs
        _coefficients = np.zeros(M)
        for j, name in enumerate(c.features):
            if name in inputs_names:
                _coefficients[inputs_dict[name]] += c.coefficients[j]
            else:
                _coefficients += c.coefficients[j] * coeffs_dict[name][:-1]
                _rhs -= c.coefficients[j] * coeffs_dict[name][-1]
        keep = np.abs(_coefficients) > 1e-16
        _features = np.array(inputs_names)[keep]
        _coefficients = _coefficients[keep]

        is_equality = isinstance(c, LinearEqualityConstraint)

        if len(_features) == 0:
            # nothing left on the lhs, the constraint is dropped
            continue

        if len(_features) == 1:
            # a constraint on a single input is expressed via its bounds
            feat = cast(ContinuousInput, domain.inputs.get_by_key(_features[0]))
            if is_equality:
                # coef * x == rhs  ->  x is pinned to rhs / coef.
                # The previous implementation assigned (coef, coef) to a
                # detached copy of the input, which left the domain untouched
                # and silently discarded the constraint.
                fixed_value = float(_rhs / _coefficients[0])
                feat.bounds = (fixed_value, fixed_value)
            else:
                adjust_boundary(feat, _coefficients[0], _rhs)
            continue

        constraint_type = (
            LinearEqualityConstraint if is_equality else LinearInequalityConstraint
        )
        _c = constraint_type(
            features=_features.tolist(),
            coefficients=_coefficients.tolist(),
            rhs=_rhs,
        )
        # a constraint with an infinite rhs is not added to the purged domain
        if not np.isinf(_c.rhs):
            constraints.append(_c)

    domain.constraints = Constraints(constraints=constraints)
    return domain
rref(A, tol=1e-08)
Computes the reduced row echelon form of a Matrix
Parameters:
Name | Type | Description | Default |
---|---|---|---|
A |
ndarray |
2d array representing a matrix. |
required |
tol |
float |
tolerance for rounding to 0. Defaults to 1e-8. |
1e-08 |
Returns:
Type | Description |
---|---|
Tuple[numpy.ndarray, List[int]] |
(A_rref, pivots), where A_rref is the reduced row echelon form of A and pivots is a list containing the indices of the pivot columns of A_rref |
Source code in bofire/utils/reduce.py
def rref(A: np.ndarray, tol: float = 1e-8) -> Tuple[np.ndarray, List[int]]:
    """Computes the reduced row echelon form of a matrix.

    The input is copied into a float64 array, so the caller's data is not
    modified. In each column the entry with the largest absolute value at or
    below the current pivot row is used as pivot.

    Args:
        A (ndarray): 2d array representing a matrix.
        tol (float, optional): Entries with an absolute value below `tol` are
            not accepted as pivots; the result is rounded to `-log10(tol)`
            decimals. Defaults to 1e-8.

    Returns:
        (A_rref, pivots), where A_rref is the reduced row echelon form of A and
        pivots is a list containing the indices of the pivot columns of A_rref.
    """
    mat = np.array(A, dtype=np.float64)
    n_rows, n_cols = np.shape(mat)
    pivots = []
    pivot_row = 0
    for col in range(n_cols):
        # candidates for the pivot: entries at or below the current pivot row
        candidates = np.abs(mat[pivot_row:, col])
        if all(candidates < tol):
            # no usable pivot in this column
            continue
        pivots.append(col)
        # bring the row with the largest entry to the pivot position
        best = np.argmax(candidates) + pivot_row
        mat[[pivot_row, best], :] = mat[[best, pivot_row], :]
        # scale the pivot row so that the pivot becomes 1
        mat[pivot_row, :] /= mat[pivot_row, col]
        # clear the column in every other row
        for r in range(n_rows):
            if r == pivot_row:
                continue
            mat[r, :] -= mat[r, col] / mat[pivot_row, col] * mat[pivot_row, :]
        pivot_row += 1
    prec = int(-np.log10(tol))
    return np.round(mat, prec), pivots
subdomain
get_subdomain(domain, feature_keys)
Removes all features not defined as argument creating a subdomain of the provided domain
Parameters:
Name | Type | Description | Default |
---|---|---|---|
domain |
Domain |
the original domain wherefrom a subdomain should be created |
required |
feature_keys |
List |
List of features that shall be included in the subdomain |
required |
Exceptions:
Type | Description |
---|---|
Assert |
when in total less than 2 features are provided |
ValueError |
when a provided feature key is not present in the provided domain |
Assert |
when no output feature is provided |
Assert |
when no input feature is provided |
ValueError |
when a constraint uses a feature that is not among the inputs of the subdomain |
Returns:
Type | Description |
---|---|
Domain |
A new domain containing only parts of the original domain |
Source code in bofire/utils/subdomain.py
def get_subdomain(
    domain: Domain,
    feature_keys: List,
) -> Domain:
    """Removes all features not defined as argument creating a subdomain of the provided domain

    The returned domain is a deep copy of `domain` whose inputs and outputs
    are replaced by the selected features; the constraints are taken over
    unchanged.

    Args:
        domain (Domain): the original domain wherefrom a subdomain should be created
        feature_keys (List): List of features that shall be included in the subdomain

    Raises:
        AssertionError: when in total less than 2 features are provided
        ValueError: when a provided feature key is not present in the provided domain
        AssertionError: when no output feature is provided
        AssertionError: when no input feature is provided
        ValueError: when a constraint uses a feature that is not among the
            inputs of the subdomain

    Returns:
        Domain: A new domain containing only parts of the original domain
    """
    assert len(feature_keys) >= 2, "At least two features have to be provided."
    outputs = []
    inputs = []
    for key in feature_keys:
        try:
            feat = (domain.inputs + domain.outputs).get_by_key(key)
        except KeyError as err:
            # keep the original lookup error attached for easier debugging
            raise ValueError(f"Feature {key} not present in domain.") from err
        if isinstance(feat, Input):
            inputs.append(feat)
        else:
            outputs.append(feat)
    assert len(outputs) > 0, "At least one output feature has to be provided."
    assert len(inputs) > 0, "At least one input feature has to be provided."
    inputs = Inputs(features=inputs)
    outputs = Outputs(features=outputs)
    # make sure that all features used in constraints are kept as inputs;
    # the key collection is built once instead of once per constraint feature
    kept_input_keys = set(inputs.get_keys())
    for c in domain.constraints:
        for key in c.features:
            if key not in kept_input_keys:
                raise ValueError(
                    f"Removed input feature {key} is used in a constraint.",
                )
    subdomain = deepcopy(domain)
    subdomain.inputs = inputs
    subdomain.outputs = outputs
    return subdomain
torch_tools
InterpolateTransform (InputTransform, Module)
Botorch input transform that interpolates values between given x and y values.
Source code in bofire/utils/torch_tools.py
class InterpolateTransform(InputTransform, Module):
    """Botorch input transform that interpolates values between given x and y values.

    For every input row the x and y values are gathered from the columns
    `idx_x` and `idx_y` of the last dimension, extended by constant values in
    front and at the end, and passed to `interp1d` together with `new_x`.
    """

    def __init__(
        self,
        new_x: Tensor,
        idx_x: List[int],
        idx_y: List[int],
        prepend_x: Tensor,
        prepend_y: Tensor,
        append_x: Tensor,
        append_y: Tensor,
        keep_original: bool = False,
        transform_on_train: bool = True,
        transform_on_eval: bool = True,
        transform_on_fantasize: bool = True,
    ):
        """Set up the transform.

        Args:
            new_x: Tensor handed to `interp1d` as third argument for every row
                (presumably the x positions at which to interpolate - confirm
                against `interp1d`).
            idx_x: Indices of the columns that hold the x values.
            idx_y: Indices of the columns that hold the y values.
            prepend_x: Values put in front of the gathered x values.
            prepend_y: Values put in front of the gathered y values.
            append_x: Values put behind the gathered x values.
            append_y: Values put behind the gathered y values.
            keep_original: If True, the untransformed input is concatenated
                behind the interpolated values.
            transform_on_train: Stored on the instance as `transform_on_train`.
            transform_on_eval: Stored on the instance as `transform_on_eval`.
            transform_on_fantasize: Stored on the instance as
                `transform_on_fantasize`.

        Raises:
            ValueError: If an index occurs more than once in `idx_x` and `idx_y`,
                or if the total number of x values differs from the total
                number of y values.
        """
        super().__init__()
        if len(set(idx_x + idx_y)) != len(idx_x) + len(idx_y):
            raise ValueError("Indices are not unique.")
        self.idx_x = torch.as_tensor(idx_x, dtype=torch.long)
        self.idx_y = torch.as_tensor(idx_y, dtype=torch.long)
        self.transform_on_train = transform_on_train
        self.transform_on_eval = transform_on_eval
        self.transform_on_fantasize = transform_on_fantasize
        self.new_x = new_x
        self.prepend_x = prepend_x
        self.prepend_y = prepend_y
        self.append_x = append_x
        self.append_y = append_y
        self.keep_original = keep_original
        # gathered + prepended + appended values must pair up one to one
        if len(self.idx_x) + len(self.prepend_x) + len(self.append_x) != len(
            self.idx_y,
        ) + len(self.prepend_y) + len(self.append_y):
            raise ValueError("The number of x and y indices must be equal.")

    def _to(self, X: Tensor) -> None:
        """Move `new_x` to the device and dtype of `X`."""
        # The transform has no `coefficient` attribute; the tensor that has
        # to follow the inputs is `new_x`.
        self.new_x = self.new_x.to(X)

    def append(self, X: Tensor, values: Tensor) -> Tensor:
        """Concatenate `values` behind every row of `X` along the last dimension."""
        shape = X.shape
        # broadcast the 1d values over all leading dimensions of X
        values_reshaped = values.view(*([1] * (len(shape) - 1)), -1)
        values_expanded = values_reshaped.expand(*shape[:-1], -1).to(X)
        return torch.cat([X, values_expanded], dim=-1)

    def prepend(self, X: Tensor, values: Tensor) -> Tensor:
        """Concatenate `values` in front of every row of `X` along the last dimension."""
        shape = X.shape
        # broadcast the 1d values over all leading dimensions of X
        values_reshaped = values.view(*([1] * (len(shape) - 1)), -1)
        values_expanded = values_reshaped.expand(*shape[:-1], -1).to(X)
        return torch.cat([values_expanded, X], dim=-1)

    def transform(self, X: Tensor):
        """Interpolate the x/y values contained in `X`.

        Args:
            X: Tensor of inputs; the last dimension is indexed by `idx_x` and
                `idx_y`. Three-dimensional inputs are flattened to two
                dimensions for the interpolation and reshaped afterwards.

        Returns:
            The interpolated values, followed by `X` along the last dimension
            if `keep_original` is set.
        """
        shapeX = X.shape

        x = X[..., self.idx_x]
        x = self.prepend(x, self.prepend_x)
        x = self.append(x, self.append_x)

        y = X[..., self.idx_y]
        y = self.prepend(y, self.prepend_y)
        y = self.append(y, self.append_y)

        if X.dim() == 3:
            x = x.reshape((shapeX[0] * shapeX[1], x.shape[-1]))
            y = y.reshape((shapeX[0] * shapeX[1], y.shape[-1]))

        # one copy of new_x per row so that interp1d can be mapped over rows
        new_x = self.new_x.expand(x.shape[0], -1)
        new_y = torch.vmap(interp1d)(x, y, new_x)

        if X.dim() == 3:
            new_y = new_y.reshape((shapeX[0], shapeX[1], new_y.shape[-1]))

        if self.keep_original:
            return torch.cat([new_y, X], dim=-1)

        return new_y
transform(self, X)
Transform the inputs to a model.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
X |
Tensor |
A tensor of inputs whose last dimension holds the features. |
required |
Returns:
Type | Description |
---|---|
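Tensor |
A tensor holding the interpolated values, followed by the original inputs if `keep_original` is set. |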
Source code in bofire/utils/torch_tools.py
def transform(self, X: Tensor):
    """Interpolate the x/y values contained in `X`.

    Args:
        X: Tensor of inputs; the last dimension is indexed by `idx_x` and
            `idx_y`. Three-dimensional inputs are flattened to two
            dimensions for the interpolation and reshaped afterwards.

    Returns:
        The interpolated values, followed by `X` along the last dimension
        if `keep_original` is set.
    """
    shapeX = X.shape
    is_3d = X.dim() == 3

    # gather the coordinates and extend them at both ends
    x = self.append(self.prepend(X[..., self.idx_x], self.prepend_x), self.append_x)
    y = self.append(self.prepend(X[..., self.idx_y], self.prepend_y), self.append_y)

    if is_3d:
        n_rows = shapeX[0] * shapeX[1]
        x = x.reshape((n_rows, x.shape[-1]))
        y = y.reshape((n_rows, y.shape[-1]))

    # one copy of new_x per row so that interp1d can be mapped over rows
    new_x = self.new_x.expand(x.shape[0], -1)
    new_y = torch.vmap(interp1d)(x, y, new_x)

    if is_3d:
        new_y = new_y.reshape((shapeX[0], shapeX[1], new_y.shape[-1]))

    if not self.keep_original:
        return new_y
    return torch.cat([new_y, X], dim=-1)
constrained_objective2botorch(idx, objective, x_adapt, eps=1e-08)
Create a callable that can be used by botorch.utils.objective.apply_constraints
to setup output constrained optimizations.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
idx |
int |
Index of the constraint objective in the list of outputs. |
required |
objective |
ConstrainedObjective |
The objective that should be transformed. |
required |
x_adapt |
Optional[Tensor] |
The tensor that should be used to adapt the objective,
for example in case of a moving turning point in the `MovingMaximizeSigmoidObjective`. |
required |
eps |
float |
Small value to avoid numerical instabilities in case of the `ConstrainedCategoricalObjective`. Defaults to 1e-8. |
1e-08 |
Returns:
Type | Description |
---|---|
Tuple[List[Callable[[Tensor], Tensor]], List[float], int] |
List of callables that can be used by botorch for setting up the constrained objective, list of the corresponding botorch eta values, final index used by the method (to track for categorical variables) |
Source code in bofire/utils/torch_tools.py
def constrained_objective2botorch(
    idx: int,
    objective: ConstrainedObjective,
    x_adapt: Optional[Tensor],
    eps: float = 1e-8,
) -> Tuple[List[Callable[[Tensor], Tensor]], List[float], int]:
    """Create a callable that can be used by `botorch.utils.objective.apply_constraints`
    to setup output constrained optimizations.

    Args:
        idx (int): Index of the constraint objective in the list of outputs.
        objective (ConstrainedObjective): The objective that should be transformed.
        x_adapt (Optional[Tensor]): The tensor that should be used to adapt the objective,
            for example in case of a moving turning point in the `MovingMaximizeSigmoidObjective`.
            Must not be None for that objective.
        eps (float, optional): Small value to avoid numerical instabilities in case of the
            `ConstrainedCategoricalObjective`. Defaults to 1e-8.

    Raises:
        ValueError: If the type of the objective is not handled.

    Returns:
        Tuple[List[Callable[[Tensor], Tensor]], List[float], int]: List of callables that can be
            used by botorch for setting up the constrained objective, list of the corresponding
            botorch eta values, final index used by the method (to track for categorical variables)
    """
    assert isinstance(
        objective,
        ConstrainedObjective,
    ), "Objective is not a `ConstrainedObjective`."

    if isinstance(objective, MaximizeSigmoidObjective):

        def maximize_sigmoid(Z: Tensor) -> Tensor:
            return (Z[..., idx] - objective.tp) * -1.0

        return [maximize_sigmoid], [1.0 / objective.steepness], idx + 1

    if isinstance(objective, MovingMaximizeSigmoidObjective):
        assert x_adapt is not None
        # turning point relative to the largest value in x_adapt
        tp = x_adapt.max().item() + objective.tp

        def moving_maximize_sigmoid(Z: Tensor) -> Tensor:
            return (Z[..., idx] - tp) * -1.0

        return [moving_maximize_sigmoid], [1.0 / objective.steepness], idx + 1

    if isinstance(objective, MinimizeSigmoidObjective):

        def minimize_sigmoid(Z: Tensor) -> Tensor:
            return Z[..., idx] - objective.tp

        return [minimize_sigmoid], [1.0 / objective.steepness], idx + 1

    if isinstance(objective, TargetObjective):
        # a target window is expressed by one callable per side

        def lower_side(Z: Tensor) -> Tensor:
            return (
                Z[..., idx] - (objective.target_value - objective.tolerance)
            ) * -1.0

        def upper_side(Z: Tensor) -> Tensor:
            return Z[..., idx] - (objective.target_value + objective.tolerance)

        return (
            [lower_side, upper_side],
            [1.0 / objective.steepness, 1.0 / objective.steepness],
            idx + 1,
        )

    if isinstance(objective, ConstrainedCategoricalObjective):
        # The output of a categorical objective has final dim `c` where `c` is number of classes
        # Pass in the expected acceptance probability and perform an inverse sigmoid to attain the original probabilities

        def categorical(Z: Tensor) -> Tensor:
            weights = torch.tensor(objective.desirability).to(**tkwargs)
            class_values = Z[..., idx : idx + len(objective.desirability)]
            acceptance = torch.clamp(
                (class_values * weights).sum(-1),
                min=eps,
                max=1 - eps,
            )
            return torch.log(1 / acceptance - 1)

        return [categorical], [1.0], idx + len(objective.desirability)

    raise ValueError(f"Objective {objective.__class__.__name__} not known.")
get_initial_conditions_generator(strategy, transform_specs, ask_options=None, sequential=True)
Takes a strategy object and returns a generator callable that uses this
strategy to draw samples and that can be used in botorch's
`gen_batch_initial_conditions`.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
strategy |
Strategy |
Strategy that should be used to generate samples. |
required |
transform_specs |
Dict |
Dictionary indicating how the samples should be transformed. |
required |
ask_options |
Dict |
Dictionary of keyword arguments that are
passed to the `ask` method of the strategy. Defaults to None, which is treated as an empty dictionary. |
None |
sequential |
bool |
If True, samples for every q-batch are
generated independently from each other. If False, the `n x q` samples are generated at once. |
True |
Returns:
Type | Description |
---|---|
Callable[[int, int, int], Tensor] |
Callable that can be passed to
`batch_initial_conditions`. |
Source code in bofire/utils/torch_tools.py
def get_initial_conditions_generator(
    strategy: Strategy,
    transform_specs: Dict,
    ask_options: Optional[Dict] = None,
    sequential: bool = True,
) -> Callable[[int, int, int], Tensor]:
    """Takes a strategy object and returns a generator callable that uses this
    strategy to draw samples and that can be used in botorch's
    `gen_batch_initial_conditions`.

    Args:
        strategy (Strategy): Strategy that should be used to generate samples.
        transform_specs (Dict): Dictionary indicating how the samples should be
            transformed.
        ask_options (Dict, optional): Dictionary of keyword arguments that are
            passed to the `ask` method of the strategy. Defaults to None, which
            is treated as an empty dictionary.
        sequential (bool, optional): If True, samples for every q-batch are
            generated independently from each other. If False, the `n x q` samples
            are generated at once.

    Returns:
        Callable[[int, int, int], Tensor]: Callable that can be passed to
            `batch_initial_conditions`.
    """
    if ask_options is None:
        ask_options = {}

    def draw(count: int) -> Tensor:
        # ask the strategy, apply the input transform and convert to a tensor
        candidates = strategy.ask(count, **ask_options)
        transformed_candidates = strategy.domain.inputs.transform(
            candidates,
            transform_specs,
        )
        return torch.from_numpy(transformed_candidates.values).to(**tkwargs)

    def generator(n: int, q: int, seed: int) -> Tensor:
        # `seed` is part of the expected signature but is not used here
        if not sequential:
            samples = draw(n * q)
            return samples.reshape(n, q, samples.shape[1])
        return torch.stack([draw(q) for _ in range(n)], dim=0)

    return generator
get_interpoint_constraints(domain, n_candidates)
Converts interpoint equality constraints to linear equality constraints,
that can be processed by botorch. For more information, see the docstring
of optimize_acqf
in botorch
(https://github.com/pytorch/botorch/blob/main/botorch/optim/optimize.py).
Parameters:
Name | Type | Description | Default |
---|---|---|---|
domain |
Domain |
Optimization problem definition. |
required |
n_candidates |
int |
Number of candidates that should be requested. |
required |
Returns:
Type | Description |
---|---|
List[Tuple[Tensor, Tensor, float]] |
List of tuples, each tuple consists of a tensor with the feature indices, coefficients and a float for the rhs. |
Source code in bofire/utils/torch_tools.py
def get_interpoint_constraints(
    domain: Domain,
    n_candidates: int,
) -> List[Tuple[Tensor, Tensor, float]]:
    """Converts interpoint equality constraints to linear equality constraints,
    that can be processed by botorch. For more information, see the docstring
    of `optimize_acqf` in botorch
    (https://github.com/pytorch/botorch/blob/main/botorch/optim/optimize.py).

    The candidates are split into consecutive groups of `multiplicity`
    candidates (one group containing all candidates if no multiplicity is
    set). Within a group, the first candidate is tied to every other
    candidate by one constraint with coefficients `[1, -1]` and rhs `0`.
    Constraints on fixed features are skipped.

    Args:
        domain (Domain): Optimization problem definition.
        n_candidates (int): Number of candidates that should be requested.

    Returns:
        List[Tuple[Tensor, Tensor, float]]: List of tuples, each tuple consists
            of a tensor with the feature indices, coefficients and a float for the rhs.
    """
    constraints = []
    # with a single candidate there is nothing to tie together
    if n_candidates == 1:
        return constraints

    for constraint in domain.constraints.get(InterpointEqualityConstraint):
        assert isinstance(constraint, InterpointEqualityConstraint)
        coefficients = torch.tensor([1.0, -1.0]).to(**tkwargs)
        feat_idx = domain.inputs.get_keys(Input).index(constraint.feature)
        feat = domain.inputs.get_by_key(constraint.feature)
        assert isinstance(feat, ContinuousInput)
        if feat.is_fixed():
            continue
        multiplicity = constraint.multiplicity or n_candidates
        n_groups = math.ceil(n_candidates / multiplicity)
        for group in range(n_groups):
            first = group * multiplicity
            stop = min((group + 1) * multiplicity, n_candidates)
            for other in range(first + 1, stop):
                # rows are (candidate index, feature index) pairs
                indices = torch.tensor(
                    [[first, feat_idx], [other, feat_idx]],
                    dtype=torch.int64,
                )
                constraints.append((indices, coefficients, 0.0))
    return constraints
get_linear_constraints(domain, constraint, unit_scaled=False)
Converts linear constraints to the form required by BoTorch.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
domain |
Domain |
Optimization problem definition. |
required |
constraint |
Union[Type[bofire.data_models.constraints.linear.LinearEqualityConstraint], Type[bofire.data_models.constraints.linear.LinearInequalityConstraint]] |
Type of constraint that should be converted. |
required |
unit_scaled |
bool |
If True, transforms constraints by assuming that the bound for the continuous features are [0,1]. Defaults to False. |
False |
Returns:
Type | Description |
---|---|
List[Tuple[Tensor, Tensor, float]] |
List of tuples, each tuple consists of a tensor with the feature indices, coefficients and a float for the rhs. |
Source code in bofire/utils/torch_tools.py
def get_linear_constraints(
    domain: Domain,
    constraint: Union[Type[LinearEqualityConstraint], Type[LinearInequalityConstraint]],
    unit_scaled: bool = False,
) -> List[Tuple[Tensor, Tensor, float]]:
    """Converts linear constraints to the form required by BoTorch.

    Fixed features are removed from the left-hand side; their contribution is
    moved to the right-hand side. Coefficients and right-hand side are
    returned with flipped sign.

    Args:
        domain: Optimization problem definition.
        constraint: Type of constraint that should be converted.
        unit_scaled: If True, transforms constraints by assuming that the bound for the continuous features are [0,1]. Defaults to False.

    Returns:
        List[Tuple[Tensor, Tensor, float]]: List of tuples, each tuple consists of a tensor with the feature indices, coefficients and a float for the rhs.
    """
    input_keys = domain.inputs.get_keys(Input)
    constraints = []
    for c in domain.constraints.get(constraint):
        indices = []
        coefficients = []
        lower = []
        upper = []
        rhs = 0.0
        for pos, featkey in enumerate(c.features):
            coefficient = c.coefficients[pos]
            idx = input_keys.index(featkey)
            feat = domain.inputs.get_by_key(featkey)
            if feat.is_fixed():
                # fixed features only shift the rhs
                rhs -= feat.fixed_value()[0] * coefficient  # type: ignore
                continue
            lower.append(feat.lower_bound)  # type: ignore
            upper.append(feat.upper_bound)  # type: ignore
            indices.append(idx)
            coefficients.append(coefficient)

        total_rhs = rhs + c.rhs
        if unit_scaled:
            lower = np.array(lower)
            upper = np.array(upper)
            # x = lower + (upper - lower) * x_scaled: the range scales the
            # coefficients and the lower bounds shift the rhs
            weights = torch.tensor((upper - lower) * np.array(coefficients))
            total_rhs = total_rhs - np.sum(np.array(coefficients) * lower)
        else:
            weights = torch.tensor(coefficients)

        constraints.append(
            (
                torch.tensor(indices),
                -weights.to(**tkwargs),
                -total_rhs,
            ),
        )
    return constraints
get_multiobjective_objective(outputs, experiments)
Returns a callable that can be used by botorch for multiobjective optimization.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
outputs |
Outputs |
Outputs object for which the callable should be generated. |
required |
experiments |
pd.DataFrame |
DataFrame containing the experiments that are used for
adapting the objectives on the fly, for example in the case of the
`MovingMaximizeSigmoidObjective`. |
required |
Returns:
Type | Description |
---|---|
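Callable[[Tensor, Optional[Tensor]], Tensor] |
Callable that evaluates every supported objective on the given samples and stacks the results along a new last dimension. |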
Source code in bofire/utils/torch_tools.py
def get_multiobjective_objective(
    outputs: Outputs,
    experiments: pd.DataFrame,
) -> Callable[[Tensor, Optional[Tensor]], Tensor]:
    """Returns a callable that can be used by botorch for multiobjective optimization.

    Only outputs with a `MaximizeObjective`, `MinimizeObjective` or
    `CloseToTargetObjective` contribute to the result.

    Args:
        outputs (Outputs): Outputs object for which the callable should be generated.
        experiments (pd.DataFrame): DataFrame containing the experiments that are used for
            adapting the objectives on the fly, for example in the case of the
            `MovingMaximizeSigmoidObjective`.

    Returns:
        Callable[[Tensor, Optional[Tensor]], Tensor]: Callable that evaluates every
            supported objective on the samples and stacks the results along a new
            last dimension.
    """
    supported = (MaximizeObjective, MinimizeObjective, CloseToTargetObjective)

    callables = []
    for i, feat in enumerate(outputs.get()):
        if feat.objective is None or not isinstance(feat.objective, supported):
            continue
        valid_experiments = outputs.preprocess_experiments_one_valid_output(
            feat.key,
            experiments,
        )
        x_adapt = torch.from_numpy(valid_experiments[feat.key].values).to(**tkwargs)
        callables.append(
            get_objective_callable(idx=i, objective=feat.objective, x_adapt=x_adapt),
        )

    def objective(samples: Tensor, X: Optional[Tensor] = None) -> Tensor:
        # X is accepted for interface compatibility; the callables receive None
        values = [c(samples, None) for c in callables]
        return torch.stack(values, dim=-1)

    return objective
get_nchoosek_constraints(domain)
Transforms NChooseK constraints into a list of non-linear inequality constraint callables that can be parsed by botorch. For this purpose the NChooseK constraint is continuously relaxed by counting the number of zeros in a candidate by a sum of narrow gaussians centered at zero.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
domain |
Domain |
Optimization problem definition. |
required |
Returns:
Type | Description |
---|---|
List[Callable[[Tensor], float]] |
List of callables that can be used as nonlinear inequality constraints in botorch. |
Source code in bofire/utils/torch_tools.py
def get_nchoosek_constraints(domain: Domain) -> List[Callable[[Tensor], float]]:
    """Transforms NChooseK constraints into a list of non-linear inequality constraint callables
    that can be parsed by botorch. For this purpose the NChooseK constraint is continuously
    relaxed by counting the number of zeros in a candidate by a sum of narrow gaussians centered
    at zero.

    A callable for the upper limit is only created if `max_count` differs
    from the number of features, one for the lower limit only if `min_count`
    is larger than zero.

    Args:
        domain (Domain): Optimization problem definition.

    Returns:
        List[Callable[[Tensor], float]]: List of callables that can be used
            as nonlinear inequality constraints in botorch.
    """

    def narrow_gaussian(x, ell=1e-3):
        # close to 1 for entries near zero and close to 0 otherwise
        return torch.exp(-0.5 * (x / ell) ** 2)

    def max_constraint(indices: Tensor, num_features: int, max_count: int):
        def constraint(x):
            n_zeros = narrow_gaussian(x=x[..., indices]).sum(dim=-1)
            return n_zeros - (num_features - max_count)

        return constraint

    def min_constraint(indices: Tensor, num_features: int, min_count: int):
        def constraint(x):
            n_zeros = narrow_gaussian(x=x[..., indices]).sum(dim=-1)
            return -n_zeros + (num_features - min_count)

        return constraint

    constraints = []
    # ignore none also valid for the start
    for c in domain.constraints.get(NChooseKConstraint):
        assert isinstance(c, NChooseKConstraint)
        continuous_keys = domain.inputs.get_keys(ContinuousInput)
        indices = torch.tensor(
            [continuous_keys.index(key) for key in c.features],
            dtype=torch.int64,
        )
        n_features = len(c.features)
        if c.max_count != n_features:
            constraints.append(
                max_constraint(
                    indices=indices,
                    num_features=n_features,
                    max_count=c.max_count,
                ),
            )
        if c.min_count > 0:
            constraints.append(
                min_constraint(
                    indices=indices,
                    num_features=n_features,
                    min_count=c.min_count,
                ),
            )
    return constraints
get_nonlinear_constraints(domain)
Returns a list of callable functions that represent the nonlinear constraints for the given domain that can be processed by botorch.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
domain |
Domain |
The domain for which to generate the nonlinear constraints. |
required |
Returns:
Type | Description |
---|---|
List[Callable[[Tensor], float]] |
A list of callable functions that take a tensor as input and return a float value representing the constraint evaluation. |
Source code in bofire/utils/torch_tools.py
def get_nonlinear_constraints(domain: Domain) -> List[Callable[[Tensor], float]]:
    """Returns a list of callable functions that represent the nonlinear constraints
    for the given domain that can be processed by botorch.

    The list consists of the callables from `get_nchoosek_constraints`
    followed by the callables from `get_product_constraints`.

    Args:
        domain (Domain): The domain for which to generate the nonlinear constraints.

    Returns:
        List[Callable[[Tensor], float]]: A list of callable functions that take a tensor
            as input and return a float value representing the constraint evaluation.
    """
    nchoosek_constraints = get_nchoosek_constraints(domain)
    product_constraints = get_product_constraints(domain)
    return [*nchoosek_constraints, *product_constraints]
get_output_constraints(outputs, experiments)
Method to translate output constraint objectives into a list of callables and list of etas for use in botorch.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
outputs |
Outputs |
Output feature object that should be processed. |
required |
experiments |
pd.DataFrame |
DataFrame containing the experiments that are used for
adapting the objectives on the fly, for example in the case of the
`MovingMaximizeSigmoidObjective`. |
required |
Returns:
Type | Description |
---|---|
Tuple[List[Callable[[Tensor], Tensor]], List[float]] |
List of constraint callables, list of associated etas. |
Source code in bofire/utils/torch_tools.py
def get_output_constraints(
    outputs: Outputs,
    experiments: pd.DataFrame,
) -> Tuple[List[Callable[[Tensor], Tensor]], List[float]]:
    """Method to translate output constraint objectives into a list of
    callables and list of etas for use in botorch.

    Args:
        outputs (Outputs): Output feature object that should
            be processed.
        experiments (pd.DataFrame): DataFrame containing the experiments that are used for
            adapting the objectives on the fly, for example in the case of the
            `MovingMaximizeSigmoidObjective`.

    Returns:
        Tuple[List[Callable[[Tensor], Tensor]], List[float]]: List of constraint callables,
            list of associated etas.
    """
    constraints = []
    etas = []
    # running position in the model output; advanced by the value returned
    # from `constrained_objective2botorch` for constrained objectives and by
    # one for all other outputs
    idx = 0
    for feat in outputs.get():
        if not isinstance(feat.objective, ConstrainedObjective):
            idx += 1
            continue

        cleaned_experiments = outputs.preprocess_experiments_one_valid_output(
            feat.key,
            experiments,
        )
        if isinstance(feat.objective, ConstrainedCategoricalObjective):
            # no adaptation data is passed for categorical objectives
            x_adapt = None
        else:
            x_adapt = torch.from_numpy(cleaned_experiments[feat.key].values).to(
                **tkwargs,
            )
        iconstraints, ietas, idx = constrained_objective2botorch(
            idx,
            objective=feat.objective,
            x_adapt=x_adapt,
        )
        constraints.extend(iconstraints)
        etas.extend(ietas)
    return constraints, etas
get_product_constraints(domain)
Returns a list of nonlinear constraint functions that can be processed by botorch based on the given domain.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
domain |
Domain |
The domain object containing the constraints. |
required |
Returns:
Type | Description |
---|---|
List[Callable[[Tensor], float]] |
A list of product constraint functions. |
Source code in bofire/utils/torch_tools.py
def get_product_constraints(domain: Domain) -> List[Callable[[Tensor], float]]:
    """Returns a list of nonlinear constraint functions that can be processed by botorch
    based on the given domain.

    One callable is created per `ProductInequalityConstraint` of the domain.
    It evaluates `-sign * prod(x_i ** exponent_i) + rhs` over the features of
    the constraint.

    Args:
        domain (Domain): The domain object containing the constraints.

    Returns:
        List[Callable[[Tensor], float]]: A list of product constraint functions.
    """

    def product_constraint(indices: Tensor, exponents: Tensor, rhs: float, sign: int):
        def constraint(x):
            product = (x[..., indices] ** exponents).prod(dim=-1)
            return -1.0 * sign * product + rhs

        return constraint

    constraints = []
    for c in domain.constraints.get(ProductInequalityConstraint):
        assert isinstance(c, ProductInequalityConstraint)
        continuous_keys = domain.inputs.get_keys(ContinuousInput)
        indices = torch.tensor(
            [continuous_keys.index(key) for key in c.features],
            dtype=torch.int64,
        )
        exponents = torch.tensor(c.exponents)
        constraints.append(product_constraint(indices, exponents, c.rhs, c.sign))
    return constraints