Utils
cheminformatics
smiles2fingerprints(smiles, bond_radius=5, n_bits=2048)
Transforms a list of smiles to an array of morgan fingerprints.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
smiles |
List[str] |
List of smiles |
required |
bond_radius |
int |
Bond radius to use. Defaults to 5. |
5 |
n_bits |
int |
Number of bits. Defaults to 2048. |
2048 |
Returns:
Type | Description |
---|---|
np.ndarray |
Numpy array holding the fingerprints |
Source code in bofire/utils/cheminformatics.py
def smiles2fingerprints(
    smiles: List[str], bond_radius: int = 5, n_bits: int = 2048
) -> np.ndarray:
    """Compute Morgan fingerprints for a collection of smiles strings.

    All smiles are parsed with `smiles2mol` first; the bit-vector fingerprints
    are only computed after every string has been parsed successfully.

    Args:
        smiles (List[str]): Smiles strings to be featurized.
        bond_radius (int, optional): Radius handed to the Morgan fingerprint.
            Defaults to 5.
        n_bits (int, optional): Number of bits of each fingerprint.
            Defaults to 2048.

    Raises:
        ValueError: Propagated from `smiles2mol` if a string is not a valid smiles.

    Returns:
        np.ndarray: The fingerprints converted with `np.asarray`, one entry per
            smiles string.
    """
    molecules = []
    for smi in smiles:
        molecules.append(smiles2mol(smi))

    bit_vectors = []
    for molecule in molecules:
        bit_vector = AllChem.GetMorganFingerprintAsBitVect(  # type: ignore
            molecule, radius=bond_radius, nBits=n_bits
        )
        bit_vectors.append(bit_vector)
    return np.asarray(bit_vectors)
smiles2fragments(smiles, fragments_list=None)
Transforms smiles to an array of fragments.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
smiles |
List[str] |
List of smiles |
required |
Returns:
Type | Description |
---|---|
np.ndarray |
Array holding the fragment information. |
Source code in bofire/utils/cheminformatics.py
def smiles2fragments(
    smiles: List[str], fragments_list: Optional[List[str]] = None
) -> np.ndarray:
    """Evaluate rdkit fragment descriptors for a collection of smiles strings.

    Candidate descriptors are all entries of `Descriptors.descList` whose name
    starts with "fr_".

    Args:
        smiles (List[str]): Smiles strings to be featurized.
        fragments_list (Optional[List[str]], optional): Names of the fragment
            descriptors to evaluate. If None, every "fr_" descriptor is used.
            Names that are not among the "fr_" descriptors are ignored, and the
            column order follows `Descriptors.descList`, not this list.
            Defaults to None.

    Raises:
        ValueError: Propagated from `smiles2mol` if a string is not a valid smiles.

    Returns:
        np.ndarray: Array of shape (len(smiles), number of selected fragments)
            holding the fragment information.
    """
    candidates = [
        entry for entry in Descriptors.descList if entry[0].startswith("fr_")
    ]
    if fragments_list is not None:
        candidates = [entry for entry in candidates if entry[0] in fragments_list]
    evaluators = {entry[0]: entry[1] for entry in candidates}

    frags = np.zeros((len(smiles), len(evaluators)))
    for row, smi in enumerate(smiles):
        molecule = smiles2mol(smi)
        frags[row, :] = [evaluate(molecule) for evaluate in evaluators.values()]
    return frags
smiles2mol(smiles)
Transforms a smiles string to an rdkit mol object.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
smiles |
str |
Smiles string. |
required |
Exceptions:
Type | Description |
---|---|
ValueError |
If string is not a valid smiles. |
Returns:
Type | Description |
---|---|
rdkit.Mol |
rdkit.mol object |
Source code in bofire/utils/cheminformatics.py
def smiles2mol(smiles: str):
    """Parse a smiles string into an rdkit mol object.

    Args:
        smiles (str): Smiles string.

    Raises:
        ValueError: If `MolFromSmiles` returns None for the string, i.e. it is
            not a valid smiles.

    Returns:
        rdkit.Mol: The object returned by `MolFromSmiles`.
    """
    parsed = MolFromSmiles(smiles)
    if parsed is not None:
        return parsed
    raise ValueError(f"{smiles} is not a valid smiles string.")
smiles2mordred(smiles, descriptors_list)
Transforms a list of smiles to mordred molecular descriptors.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
smiles |
List[str] |
List of smiles |
required |
descriptors_list |
List[str] |
List of desired mordred descriptors |
required |
Returns:
Type | Description |
---|---|
np.ndarray |
Array holding the mordred molecular descriptors. |
Source code in bofire/utils/cheminformatics.py
def smiles2mordred(smiles: List[str], descriptors_list: List[str]) -> np.ndarray:
    """Compute the requested mordred molecular descriptors for a list of smiles.

    Args:
        smiles (List[str]): Smiles strings to be featurized.
        descriptors_list (List[str]): Names of the desired mordred descriptors.
            A calculator descriptor is kept if its string representation is
            contained in this list.

    Raises:
        ValueError: If a descriptor column contains values that cannot be
            converted to a number, or propagated from `smiles2mol` for an
            invalid smiles string.

    Returns:
        np.ndarray: Array holding the mordred molecular descriptors as floats.
    """
    molecules = [smiles2mol(smi) for smi in smiles]

    calculator = Calculator(descriptors, ignore_3D=True)
    selected = []
    for descriptor in calculator.descriptors:
        if str(descriptor) in descriptors_list:
            selected.append(descriptor)
    calculator.descriptors = selected

    frame = calculator.pandas(molecules)

    # a column is flagged if at least one of its entries is not numeric
    has_nan = []
    for column in frame.columns:
        numeric = pd.to_numeric(frame[column], errors="coerce")
        has_nan.append(numeric.isnull().values.any())

    if any(has_nan):
        raise ValueError(
            f"Found NaN values in descriptors {list(frame.columns[has_nan])}"
        )
    return frame.astype(float).values
doe
get_confounding_matrix(inputs, design, powers=None, interactions=None)
Analyzes the confounding of a design and returns the confounding matrix.
Only takes continuous features into account.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
inputs |
Inputs |
Input features. |
required |
design |
pd.DataFrame |
Design matrix. |
required |
powers |
List[int] |
List of powers of the individual factors/features that should be considered. Each integer has to be larger than 1. If None, no power terms are added. |
None |
interactions |
List[int] |
List with interaction levels to be considered. Each integer has to be larger than 1. If None, [2] is used. |
None |
Returns:
Type | Description |
---|---|
pd.DataFrame |
Correlation matrix of the scaled design columns, including the added power and interaction terms. |
Source code in bofire/utils/doe.py
def get_confounding_matrix(
    inputs: Inputs,
    design: pd.DataFrame,
    powers: Optional[List[int]] = None,
    interactions: Optional[List[int]] = None,
):
    """Compute the confounding (correlation) matrix of a design.

    Only continuous features are considered. Their columns are min-max scaled
    to [-1, 1], optionally extended by power and interaction terms, and the
    correlation matrix of the resulting columns is returned.

    Args:
        inputs (Inputs): Input features.
        design (pd.DataFrame): Design matrix.
        powers (List[int], optional): Powers of the individual features that
            should be added as columns named "<key>**<power>". Every power has
            to be larger than 1. Defaults to None (no power terms).
        interactions (List[int], optional): Interaction levels to be considered;
            every level has to be larger than 1 and at most the number of
            continuous features. Interaction columns are named by joining the
            feature keys with ":". Defaults to None, which is treated as [2].

    Returns:
        pd.DataFrame: Result of `DataFrame.corr()` on the scaled design
            including the added terms.
    """
    if len(inputs.get(CategoricalInput)) > 0:
        warnings.warn("Categorical input features will be ignored.")

    keys = inputs.get_keys(ContinuousInput)
    scaled_values = MinMaxScaler(feature_range=(-1, 1)).fit_transform(design[keys])
    scaled_design = pd.DataFrame(data=scaled_values, columns=keys)

    # power terms of the individual features
    for p in powers if powers is not None else []:
        assert p > 1, "Power has to be at least of degree two."
        for key in keys:
            scaled_design[f"{key}**{p}"] = scaled_design[key] ** p

    # interaction terms: products over all combinations of the requested size
    levels = [2] if interactions is None else interactions
    for level in levels:
        assert level > 1, "Interaction has to be at least of degree two."
        assert (
            level < len(keys) + 1
        ), f"Interaction has to be smaller than {len(keys)+1}."
        for combi in itertools.combinations(keys, level):
            members = list(combi)
            scaled_design[":".join(combi)] = scaled_design[members].prod(axis=1)

    return scaled_design.corr()
multiobjective
get_ref_point_mask(domain, output_feature_keys=None)
Method to get a mask for the reference points taking into account if we want to maximize or minimize an objective. In case it is maximize the value in the mask is 1, in case we want to minimize it is -1.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
domain |
Domain |
Domain for which the mask should be generated. |
required |
output_feature_keys |
Optional[list] |
Name of output feature keys that should be considered in the mask. Defaults to None. |
None |
Returns:
Type | Description |
---|---|
np.ndarray |
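Array with one entry per considered output feature: 1.0 for a maximize objective, -1.0 for a minimize or close-to-target objective. |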
Source code in bofire/utils/multiobjective.py
def get_ref_point_mask(
    domain: Domain, output_feature_keys: Optional[list] = None
) -> np.ndarray:
    """Method to get a mask for the reference points taking into account if we
    want to maximize or minimize an objective. In case it is maximize the value
    in the mask is 1, in case we want to minimize (or get close to a target) it is -1.

    Args:
        domain (Domain): Domain for which the mask should be generated.
        output_feature_keys (Optional[list], optional): Name of output feature keys
            that should be considered in the mask. If None, the keys of all outputs
            with a maximize, minimize or close-to-target objective are used.
            Defaults to None.

    Raises:
        ValueError: If fewer than two output feature keys are considered.
        ValueError: If an output has an objective other than `MaximizeObjective`,
            `MinimizeObjective` or `CloseToTargetObjective`.

    Returns:
        np.ndarray: Array with one entry per considered output feature key:
            1.0 for `MaximizeObjective`, -1.0 for `MinimizeObjective` and
            `CloseToTargetObjective`.
    """
    if output_feature_keys is None:
        output_feature_keys = domain.outputs.get_keys_by_objective(
            includes=[MaximizeObjective, MinimizeObjective, CloseToTargetObjective]
        )
    if len(output_feature_keys) < 2:
        raise ValueError("At least two output features have to be provided.")
    mask = []
    for key in output_feature_keys:
        objective = domain.outputs.get_by_key(key).objective  # type: ignore
        if isinstance(objective, MaximizeObjective):
            mask.append(1.0)
        elif isinstance(objective, (MinimizeObjective, CloseToTargetObjective)):
            # both objective types are marked with -1.0
            mask.append(-1.0)
        else:
            raise ValueError(
                "Only `MaximizeObjective`, `MinimizeObjective` and "
                "`CloseToTargetObjective` supported"
            )
    return np.array(mask)
naming_conventions
get_column_names(outputs)
Specifies column names for given Outputs type.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
outputs |
Outputs |
The Outputs object containing the individual outputs. |
required |
Returns:
Type | Description |
---|---|
Tuple[List[str], List[str]] |
A tuple containing the prediction column names and the standard deviation column names |
Source code in bofire/utils/naming_conventions.py
def get_column_names(outputs: Outputs) -> Tuple[List[str], List[str]]:
    """
    Build the prediction and standard deviation column names for given Outputs.

    Categorical outputs come first and contribute one "<key>_<category>_prob"
    and one "<key>_<category>_sd" column per category; continuous outputs
    follow with "<key>_pred" and "<key>_sd".

    Args:
        outputs (Outputs): The Outputs object containing the individual outputs.

    Returns:
        Tuple[List[str], List[str]]: A tuple containing the prediction column names and the standard deviation column names
    """
    pred_cols: List[str] = []
    sd_cols: List[str] = []

    for featkey in outputs.get_keys(CategoricalOutput):  # type: ignore
        categories = outputs.get_by_key(featkey).categories  # type: ignore
        pred_cols.extend(f"{featkey}_{cat}_prob" for cat in categories)
        sd_cols.extend(f"{featkey}_{cat}_sd" for cat in categories)

    for featkey in outputs.get_keys(ContinuousOutput):  # type: ignore
        pred_cols.append(f"{featkey}_pred")
        sd_cols.append(f"{featkey}_sd")

    return pred_cols, sd_cols
postprocess_categorical_predictions(predictions, outputs)
Postprocess categorical predictions by finding the maximum probability location
Parameters:
Name | Type | Description | Default |
---|---|---|---|
predictions |
pd.DataFrame |
The dataframe containing the predictions. |
required |
outputs |
Outputs |
The Outputs object containing the individual outputs. |
required |
Returns:
Type | Description |
---|---|
predictions (pd.DataFrame) |
The (potentially modified) original dataframe with categorical predictions added |
Source code in bofire/utils/naming_conventions.py
def postprocess_categorical_predictions(predictions: pd.DataFrame, outputs: Outputs) -> pd.DataFrame:  # type: ignore
    """
    Postprocess categorical predictions by finding the maximum probability location

    For every categorical output the probability columns are looked up by their
    exact names "<key>_<category>_prob" (the naming used by `get_column_names`)
    instead of a regular expression, so columns of other features whose names
    merely contain the key are not picked up and category names containing
    "<key>_" or "_prob" are returned unmodified.

    Args:
        predictions (pd.DataFrame): The dataframe containing the predictions.
            It is modified in place.
        outputs (Outputs): The Outputs object containing the individual outputs.

    Returns:
        predictions (pd.DataFrame): The (potentially modified) original dataframe with categorical predictions added
            as "<key>_pred" (inserted at position 0) and "<key>_sd" filled with 0.0
            (inserted at position 1).
    """
    for feat in outputs.get():
        if not isinstance(feat, CategoricalOutput):  # type: ignore
            continue
        # map every available probability column to the category it represents
        category_by_column = {}
        for cat in feat.categories:
            column = f"{feat.key}_{cat}_prob"
            if column in predictions.columns:
                category_by_column[column] = cat
        # column with the highest probability per row, translated to its category
        winners = predictions[list(category_by_column)].idxmax(axis=1)
        predictions.insert(
            loc=0,
            column=f"{feat.key}_pred",
            value=winners.map(category_by_column).values,
        )
        predictions.insert(
            loc=1,
            column=f"{feat.key}_sd",
            value=0.0,
        )
    return predictions
reduce
AffineTransform
Class to switch back and forth from the reduced to the original domain.
Source code in bofire/utils/reduce.py
class AffineTransform:
    """Maps data between the reduced and the original domain."""

    def __init__(self, equalities: List[Tuple[str, List[str], List[float]]]):
        """Store the equalities that define the transformation.

        Args:
            equalities (List[Tuple[str,List[str],List[float]]]): One tuple per
                eliminated feature: the key of the eliminated feature, the keys
                of the features it is computed from and the coefficients. The
                coefficients hold one weight per listed feature followed by a
                constant offset as last entry.
        """
        self.equalities = equalities

    def augment_data(self, data: pd.DataFrame) -> pd.DataFrame:
        """Add the eliminated features back to a dataframe.

        Args:
            data (pd.DataFrame): Dataframe of the reduced domain.

        Returns:
            pd.DataFrame: Copy of `data` with one additional (or overwritten)
                column per equality. If there are no equalities, `data` itself
                is returned.
        """
        if len(self.equalities) == 0:
            return data
        restored = data.copy()
        for lhs_key, rhs_keys, weights in self.equalities:
            # start from the constant offset, then add the weighted features
            restored[lhs_key] = weights[-1]
            for position, rhs_key in enumerate(rhs_keys):
                restored[lhs_key] += weights[position] * restored[rhs_key]
        return restored

    def drop_data(self, data: pd.DataFrame) -> pd.DataFrame:
        """Remove the eliminated features from a dataframe.

        Args:
            data (pd.DataFrame): Dataframe of the original domain.

        Returns:
            pd.DataFrame: Dataframe without the columns of the eliminated
                features that are present. If there are no equalities, `data`
                itself is returned.
        """
        if len(self.equalities) == 0:
            return data
        eliminated = [lhs for lhs, _, _ in self.equalities if lhs in data.columns]
        return data.drop(columns=eliminated)
__init__(self, equalities)
special
Initializes an AffineTransform
object.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
equalities |
List[Tuple[str,List[str],List[float]]] |
List of equalities. Every equality is defined as a tuple, in which the first entry is the key of the reduced feature, the second one is a list of feature keys that can be used to compute the feature and the third list of floats are the corresponding coefficients. |
required |
Source code in bofire/utils/reduce.py
def __init__(self, equalities: List[Tuple[str, List[str], List[float]]]):
    """Initializes an `AffineTransform` object.

    Args:
        equalities (List[Tuple[str,List[str],List[float]]]): List of equalities. Every equality
            is defined as a tuple, in which the first entry is the key of the reduced feature, the second
            one is a list of feature keys that can be used to compute the feature and the third list of floats
            are the corresponding coefficients. `augment_data` reads the last coefficient as the
            constant offset and the preceding ones as the weights of the listed feature keys.
    """
    # stored as given (no copy, no validation)
    self.equalities = equalities
augment_data(self, data)
Restore the eliminated features in a dataframe
Parameters:
Name | Type | Description | Default |
---|---|---|---|
data |
pd.DataFrame |
Dataframe that should be restored. |
required |
Returns:
Type | Description |
---|---|
pd.DataFrame |
Restored dataframe |
Source code in bofire/utils/reduce.py
def augment_data(self, data: pd.DataFrame) -> pd.DataFrame:
    """Add the eliminated features back to a dataframe.

    Args:
        data (pd.DataFrame): Dataframe of the reduced domain.

    Returns:
        pd.DataFrame: Copy of `data` with one additional (or overwritten)
            column per equality. If there are no equalities, `data` itself
            is returned.
    """
    if len(self.equalities) == 0:
        return data
    restored = data.copy()
    for lhs_key, rhs_keys, weights in self.equalities:
        # start from the constant offset, then add the weighted features
        restored[lhs_key] = weights[-1]
        for position, rhs_key in enumerate(rhs_keys):
            restored[lhs_key] += weights[position] * restored[rhs_key]
    return restored
drop_data(self, data)
Drop eliminated features from a dataframe.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
data |
pd.DataFrame |
Dataframe with features to be dropped. |
required |
Returns:
Type | Description |
---|---|
pd.DataFrame |
Reduced dataframe. |
Source code in bofire/utils/reduce.py
def drop_data(self, data: pd.DataFrame) -> pd.DataFrame:
    """Remove the eliminated features from a dataframe.

    Args:
        data (pd.DataFrame): Dataframe of the original domain.

    Returns:
        pd.DataFrame: Dataframe without the columns of the eliminated
            features that are present. If there are no equalities, `data`
            itself is returned.
    """
    if len(self.equalities) == 0:
        return data
    eliminated = [lhs for lhs, _, _ in self.equalities if lhs in data.columns]
    return data.drop(columns=eliminated)
adjust_boundary(feature, coef, rhs)
Adjusts the boundaries of a feature.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
feature |
ContinuousInput |
Feature to be adjusted. |
required |
coef |
float |
Coefficient. |
required |
rhs |
float |
Right-hand-side of the constraint. |
required |
Source code in bofire/utils/reduce.py
def adjust_boundary(feature: ContinuousInput, coef: float, rhs: float):
    """Tighten one bound of a feature to `rhs / coef`.

    For a positive coefficient the lower bound is raised if `rhs / coef`
    exceeds it; otherwise the upper bound is lowered if `rhs / coef` is below
    it. The feature is modified in place by assigning `feature.bounds`.

    Args:
        feature (ContinuousInput): Feature to be adjusted.
        coef (float): Coefficient.
        rhs (float): Right-hand-side of the constraint.
    """
    limit = rhs / coef
    if coef > 0:
        if limit > feature.lower_bound:
            feature.bounds = (limit, feature.upper_bound)
        return
    if limit < feature.upper_bound:
        feature.bounds = (feature.lower_bound, limit)
check_domain_for_reduction(domain)
Check if the reduction can be applied or if a trivial case is present.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
domain |
Domain |
Domain to be checked. |
required |
Returns:
Type | Description |
---|---|
bool |
True if reducible, else False. |
Source code in bofire/utils/reduce.py
def check_domain_for_reduction(domain: Domain) -> bool:
    """Decide whether the domain reduction can be applied.

    The reduction is not applicable if the domain has no constraints, no
    linear equality constraints, any NChooseK constraint, no continuous
    inputs, or a linear equality that involves a non-continuous input.

    Args:
        domain (Domain): Domain to be checked.

    Returns:
        bool: True if reducible, else False.
    """
    has_constraints = len(domain.constraints) > 0
    if not has_constraints:
        return False

    linear_equalities = domain.constraints.get(LinearEqualityConstraint)
    if not len(linear_equalities) > 0:
        return False

    n_choose_k = domain.constraints.get([NChooseKConstraint])
    if len(n_choose_k) > 0:
        return False

    continuous_inputs = domain.inputs.get(ContinuousInput)
    if not len(continuous_inputs) > 0:
        return False

    # every feature of every linear equality has to be a continuous input
    for constraint in linear_equalities:
        assert isinstance(constraint, LinearConstraint)
        only_continuous = all(
            key in domain.inputs.get_keys(ContinuousInput)
            for key in constraint.features
        )
        if not only_continuous:
            return False
    return True
check_existence_of_solution(A_aug)
Given an augmented coefficient matrix this function determines the existence (and uniqueness) of solution using the rank theorem.
Source code in bofire/utils/reduce.py
def check_existence_of_solution(A_aug):
    """Given an augmented coefficient matrix this function determines the existence (and uniqueness) of solution using the rank theorem.

    Args:
        A_aug: 2d array whose last column is the right-hand side `b` and whose
            remaining columns form the coefficient matrix `A`.

    Raises:
        Exception: If the system has no solution (rank(A) < rank(A_aug)).
        Exception: If the system has exactly one solution (rank(A) equals the
            number of unknowns); the message contains that solution.

    Returns:
        None: If the system has infinitely many solutions.
    """
    A = A_aug[:, :-1]
    b = A_aug[:, -1]
    n_equations, len_inputs = np.shape(A)

    rk_A_aug = np.linalg.matrix_rank(A_aug)
    rk_A = np.linalg.matrix_rank(A)

    if rk_A < rk_A_aug:
        raise Exception(
            "There is no solution fulfilling the linear equality constraints."
        )
    if rk_A == rk_A_aug:
        if rk_A < len_inputs:
            return  # all good
        # Unique solution. `np.linalg.solve` only accepts square matrices, so a
        # system with redundant equations is solved in the least-squares sense,
        # which is exact here because the system is consistent.
        if n_equations == len_inputs:
            x = np.linalg.solve(A, b)
        else:
            x = np.linalg.lstsq(A, b, rcond=None)[0]
        raise Exception(
            f"There is a unique solution x for the linear equality constraints: x={x}"
        )
reduce_domain(domain)
Reduce a domain with linear equality constraints to a subdomain where linear equality constraints are eliminated.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
domain |
Domain |
Domain to be reduced. |
required |
Returns:
Type | Description |
---|---|
Tuple[Domain, AffineTransform] |
reduced domain and the corresponding transformation to switch between the reduced and original domain. |
Source code in bofire/utils/reduce.py
def reduce_domain(domain: Domain) -> Tuple[Domain, AffineTransform]:
    """Reduce a domain with linear equality constraints to a subdomain where linear equality constraints are eliminated.

    The linear equalities are written as an augmented matrix, brought to reduced
    row echelon form, and every pivot column (one continuous input per pivot) is
    eliminated. The box bounds of the eliminated inputs are carried over either
    as linear inequality constraints on the remaining inputs or, if only one
    remaining input is involved, as adjusted bounds of that input.

    Args:
        domain (Domain): Domain to be reduced.

    Raises:
        Exception: From `check_existence_of_solution` if the equalities have no
            or exactly one solution, or if a bound of an eliminated input cannot
            be fulfilled.

    Returns:
        Tuple[Domain, AffineTransform]: reduced domain and the according transformation to switch between the
            reduced and original domain. If `check_domain_for_reduction` fails, the
            unchanged domain and an empty transformation are returned.
    """
    # check if the domain can be reduced
    if not check_domain_for_reduction(domain):
        return domain, AffineTransform([])

    # find linear equality constraints
    linear_equalities = domain.constraints.get(LinearEqualityConstraint)
    other_constraints = domain.constraints.get(
        Constraint, excludes=[LinearEqualityConstraint]
    )

    # only consider continuous inputs
    continuous_inputs = [
        cast(ContinuousInput, f) for f in domain.inputs.get(ContinuousInput)
    ]
    other_inputs = domain.inputs.get(Input, excludes=[ContinuousInput])

    # assemble Matrix A from equality constraints
    # N equations; M columns = one per continuous input plus the right-hand side
    N = len(linear_equalities)
    M = len(continuous_inputs) + 1
    names = np.concatenate(([feat.key for feat in continuous_inputs], ["rhs"]))

    A_aug = pd.DataFrame(data=np.zeros(shape=(N, M)), columns=names)

    for i in range(len(linear_equalities)):
        c = linear_equalities[i]
        assert isinstance(c, LinearEqualityConstraint)
        A_aug.loc[i, c.features] = c.coefficients  # type: ignore
        A_aug.loc[i, "rhs"] = c.rhs
    A_aug = A_aug.values

    # catch special cases
    check_existence_of_solution(A_aug)

    # bring A_aug to reduced row-echelon form
    A_aug_rref, pivots = rref(A_aug)
    pivots = np.array(pivots)
    A_aug_rref = np.array(A_aug_rref).astype(np.float64)

    # formulate box bounds as linear inequality constraints in matrix form
    # rows 0..M-2:     x_j <= upper_bound_j
    # rows M-1..2M-3: -x_j <= -lower_bound_j
    # the last column of B holds the right-hand sides
    B = np.zeros(shape=(2 * (M - 1), M))
    B[: M - 1, : M - 1] = np.eye(M - 1)
    B[M - 1 :, : M - 1] = -np.eye(M - 1)

    B[: M - 1, -1] = np.array([feat.upper_bound for feat in continuous_inputs])
    B[M - 1 :, -1] = -1.0 * np.array([feat.lower_bound for feat in continuous_inputs])

    # eliminate columns with pivot element
    # the pivot entry of an rref row is 1, so subtracting/adding the row cancels
    # the coefficient of the pivot variable in its upper/lower bound row
    for i in range(len(pivots)):
        p = pivots[i]
        B[p, :] -= A_aug_rref[i, :]
        B[p + M - 1, :] += A_aug_rref[i, :]

    # build up reduced domain
    _domain = Domain.model_construct(
        # _fields_set = {"inputs", "outputs", "constraints"}
        inputs=deepcopy(other_inputs),
        outputs=deepcopy(domain.outputs),
        constraints=deepcopy(other_constraints),
    )
    # continuous inputs whose column is not a pivot column are kept
    new_inputs = [
        deepcopy(feat) for i, feat in enumerate(continuous_inputs) if i not in pivots
    ]
    all_inputs = _domain.inputs + new_inputs
    assert isinstance(all_inputs, Inputs)
    _domain.inputs.features = all_inputs.features

    constraints: List[AnyConstraint] = []
    for i in pivots:
        # reduce equation system of upper bounds
        ind = np.where(B[i, :-1] != 0)[0]
        if len(ind) > 0 and B[i, -1] < np.inf:
            if len(list(names[ind])) > 1:
                # several remaining inputs involved: keep as inequality constraint
                c = LinearInequalityConstraint.from_greater_equal(
                    features=list(names[ind]),
                    coefficients=(-1.0 * B[i, ind]).tolist(),
                    rhs=B[i, -1] * -1.0,
                )
                constraints.append(c)
            else:
                # a single remaining input involved: tighten its bounds instead
                key = names[ind][0]
                feat = cast(ContinuousInput, _domain.inputs.get_by_key(key))
                adjust_boundary(feat, (-1.0 * B[i, ind])[0], B[i, -1] * -1.0)
        else:
            # either no remaining input is involved or the right-hand side is
            # infinite; a negative right-hand side is reported as infeasible
            if B[i, -1] < -1e-16:
                raise Exception("There is no solution that fulfills the constraints.")

        # reduce equation system of lower bounds
        ind = np.where(B[i + M - 1, :-1] != 0)[0]
        if len(ind) > 0 and B[i + M - 1, -1] < np.inf:
            if len(list(names[ind])) > 1:
                c = LinearInequalityConstraint.from_greater_equal(
                    features=list(names[ind]),
                    coefficients=(-1.0 * B[i + M - 1, ind]).tolist(),
                    rhs=B[i + M - 1, -1] * -1.0,
                )
                constraints.append(c)
            else:
                key = names[ind][0]
                feat = cast(ContinuousInput, _domain.inputs.get_by_key(key))
                adjust_boundary(
                    feat,
                    (-1.0 * B[i + M - 1, ind])[0],
                    B[i + M - 1, -1] * -1.0,
                )
        else:
            if B[i + M - 1, -1] < -1e-16:
                raise Exception("There is no solution that fulfills the constraints.")

    if len(constraints) > 0:
        _domain.constraints.constraints = _domain.constraints.constraints + constraints  # type: ignore

    # assemble equalities
    # per pivot: (eliminated key, keys of the other inputs in the rref row,
    # negated coefficients followed by the row's right-hand side)
    _equalities = []
    for i in range(len(pivots)):
        name_lhs = names[pivots[i]]
        names_rhs = []
        coeffs = []

        for j in range(len(names) - 1):
            if A_aug_rref[i, j] != 0 and j != pivots[i]:
                coeffs.append(-A_aug_rref[i, j])
                names_rhs.append(names[j])

        coeffs.append(A_aug_rref[i, -1])

        _equalities.append((name_lhs, names_rhs, coeffs))

    trafo = AffineTransform(_equalities)
    # remove remaining dependencies of eliminated inputs from the problem
    _domain = remove_eliminated_inputs(_domain, trafo)
    return _domain, trafo
remove_eliminated_inputs(domain, transform)
Eliminates remaining occurrences of eliminated inputs in linear constraints.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
domain |
Domain |
Domain in which the linear constraints should be purged. |
required |
transform |
AffineTransform |
Affine transformation object that defines the obsolete features. |
required |
Exceptions:
Type | Description |
---|---|
ValueError |
If feature occurs in a constraint different from a linear one. |
Returns:
Type | Description |
---|---|
Domain |
Purged domain. |
Source code in bofire/utils/reduce.py
def remove_eliminated_inputs(domain: Domain, transform: AffineTransform) -> Domain:
    """Eliminates remaining occurrences of eliminated inputs in linear constraints.

    Every eliminated input that still appears in a linear constraint is replaced
    by its affine expression from `transform.equalities`. The constraints of
    `domain` are replaced in place and the same domain object is returned.

    Args:
        domain (Domain): Domain in which the linear constraints should be purged.
        transform (AffineTransform): Affine transformation object that defines the obsolete features.

    Raises:
        ValueError: If the domain contains a constraint that is not a linear one.
        Exception: If a rewritten constraint is found to be unfulfillable.

    Returns:
        Domain: Purged domain.
    """
    inputs_names = domain.inputs.get_keys()
    M = len(inputs_names)

    # write the equalities for the backtransformation into one matrix
    # position of every remaining input key in the coefficient vectors
    inputs_dict = {inputs_names[i]: i for i in range(M)}

    # build up dict from domain.equalities e.g. {"xi1": [coeff(xj1), ..., coeff(xjn)], ... "xik":...}
    # each vector has M coefficients plus the constant offset as last entry
    coeffs_dict = {}
    for e in transform.equalities:
        coeffs = np.zeros(M + 1)
        for j, name in enumerate(e[1]):
            coeffs[inputs_dict[name]] = e[2][j]
        coeffs[-1] = e[2][-1]
        coeffs_dict[e[0]] = coeffs

    constraints = []
    for c in domain.constraints.get():
        # Nonlinear constraints not supported
        if not isinstance(c, LinearConstraint):
            raise ValueError(
                "Elimination of variables is only supported for LinearEquality and LinearInequality constraints."
            )

        # no changes, if the constraint does not contain eliminated inputs
        elif all(name in inputs_names for name in c.features):
            constraints.append(c)

        # remove inputs from the constraint that were eliminated from the inputs before
        else:
            totally_removed = False
            _features = np.array(inputs_names)
            _rhs = c.rhs

            # create new lhs and rhs from the old one and knowledge from problem._equalities
            _coefficients = np.zeros(M)
            for j, name in enumerate(c.features):
                if name in inputs_names:
                    _coefficients[inputs_dict[name]] += c.coefficients[j]
                else:
                    # substitute the eliminated input: its weights go to the
                    # left-hand side, its constant offset to the right-hand side
                    _coefficients += c.coefficients[j] * coeffs_dict[name][:-1]
                    _rhs -= c.coefficients[j] * coeffs_dict[name][-1]

            # keep only inputs with a non-negligible coefficient
            _features = _features[np.abs(_coefficients) > 1e-16]
            _coefficients = _coefficients[np.abs(_coefficients) > 1e-16]
            _c = None
            if isinstance(c, LinearEqualityConstraint):
                if len(_features) > 1:
                    _c = LinearEqualityConstraint(
                        features=_features.tolist(),
                        coefficients=_coefficients.tolist(),
                        rhs=_rhs,
                    )
                elif len(_features) == 0:
                    totally_removed = True
                else:
                    # NOTE(review): `feat` is a newly constructed ContinuousInput that is
                    # not written back to `domain` in this block, and its bounds are set
                    # to the coefficient rather than to `_rhs / coefficient` - confirm
                    # that this is intended.
                    feat: ContinuousInput = ContinuousInput(
                        **domain.inputs.get_by_key(_features[0]).model_dump()
                    )
                    feat.bounds = (_coefficients[0], _coefficients[0])
                    totally_removed = True
            else:
                if len(_features) > 1:
                    _c = LinearInequalityConstraint(
                        features=_features.tolist(),
                        coefficients=_coefficients.tolist(),
                        rhs=_rhs,
                    )
                elif len(_features) == 0:
                    totally_removed = True
                else:
                    # a single remaining input: express the constraint as a bound
                    feat = cast(ContinuousInput, domain.inputs.get_by_key(_features[0]))
                    adjust_boundary(feat, _coefficients[0], _rhs)
                    totally_removed = True

            # check if constraint is always fulfilled/not fulfilled
            # NOTE(review): `_c` is only created above with more than one feature,
            # so the two `len(_c.features) == 0` branches look unreachable from
            # this block - confirm.
            if not totally_removed:
                assert _c is not None
                if len(_c.features) == 0 and _c.rhs >= 0:
                    pass
                elif len(_c.features) == 0 and _c.rhs < 0:
                    raise Exception("Linear constraints cannot be fulfilled.")
                elif np.isinf(_c.rhs):
                    # constraints with an infinite right-hand side are dropped
                    pass
                else:
                    constraints.append(_c)
    domain.constraints = Constraints(constraints=constraints)
    return domain
rref(A, tol=1e-08)
Computes the reduced row echelon form of a Matrix
Parameters:
Name | Type | Description | Default |
---|---|---|---|
A |
ndarray |
2d array representing a matrix. |
required |
tol |
float |
tolerance for rounding to 0. Defaults to 1e-8. |
1e-08 |
Returns:
Type | Description |
---|---|
Tuple[numpy.ndarray, List[int]] |
(A_rref, pivots), where A_rref is the reduced row echelon form of A and pivots is a numpy array containing the pivot columns of A_rref |
Source code in bofire/utils/reduce.py
def rref(A: np.ndarray, tol: float = 1e-8) -> Tuple[np.ndarray, List[int]]:
    """Bring a matrix to reduced row echelon form.

    Works on a float64 copy of the input. Within each column the remaining row
    with the largest absolute entry is used as pivot row.

    Args:
        A (ndarray): 2d array representing a matrix.
        tol (float, optional): Entries with an absolute value below this
            tolerance are not accepted as pivot; the result is rounded to
            `int(-log10(tol))` decimals. Defaults to 1e-8.

    Returns:
        (A_rref, pivots), where A_rref is the reduced row echelon form of A and pivots
        is a list containing the pivot columns of A_rref
    """
    matrix = np.array(A, dtype=np.float64)
    n_rows, n_cols = np.shape(matrix)

    pivots = []
    pivot_row = 0
    for col in range(n_cols):
        magnitudes = np.abs(matrix[pivot_row:, col])
        # no usable pivot in this column: move on to the next one
        if np.all(magnitudes < tol):
            continue

        pivots.append(col)
        # swap the row with the largest entry into the pivot position
        best_row = np.argmax(magnitudes) + pivot_row
        matrix[[pivot_row, best_row], :] = matrix[[best_row, pivot_row], :]  # type: ignore
        # scale the pivot row so that the pivot entry becomes one
        matrix[pivot_row, :] /= matrix[pivot_row, col]
        # clear the column in all other rows at once
        others = np.arange(n_rows) != pivot_row
        factors = matrix[others, col] / matrix[pivot_row, col]
        matrix[others, :] -= np.outer(factors, matrix[pivot_row, :])
        pivot_row += 1

    precision = int(-np.log10(tol))
    return np.round(matrix, precision), pivots
subdomain
get_subdomain(domain, feature_keys)
removes all features not defined as argument creating a subdomain of the provided domain
Parameters:
Name | Type | Description | Default |
---|---|---|---|
domain |
Domain |
the original domain wherefrom a subdomain should be created |
required |
feature_keys |
List |
List of features that shall be included in the subdomain |
required |
Exceptions:
Type | Description |
---|---|
Assert |
when in total less than 2 features are provided |
ValueError |
when a provided feature key is not present in the provided domain |
Assert |
when no output feature is provided |
Assert |
when no input feature is provided |
ValueError |
when a constraint of the domain uses an input feature that is not part of the subdomain |
Returns:
Type | Description |
---|---|
Domain |
A new domain containing only parts of the original domain |
Source code in bofire/utils/subdomain.py
def get_subdomain(
    domain: Domain,
    feature_keys: List,
) -> Domain:
    """removes all features not defined as argument creating a subdomain of the provided domain

    Args:
        domain (Domain): the original domain wherefrom a subdomain should be created
        feature_keys (List): List of features that shall be included in the subdomain

    Raises:
        AssertionError: when in total less than 2 features are provided
        ValueError: when a provided feature key is not present in the provided domain
        AssertionError: when no output feature is provided
        AssertionError: when no input feature is provided
        ValueError: when a constraint of the domain uses an input feature that
            is not part of the subdomain

    Returns:
        Domain: A new domain (deep copy) containing only parts of the original domain
    """
    assert len(feature_keys) >= 2, "At least two features have to be provided."
    # combine inputs and outputs once instead of once per requested key
    all_features = domain.inputs + domain.outputs
    outputs = []
    inputs = []
    for key in feature_keys:
        try:
            feat = all_features.get_by_key(key)
        except KeyError as err:
            # keep the original lookup error as the explicit cause
            raise ValueError(f"Feature {key} not present in domain.") from err
        if isinstance(feat, Input):
            inputs.append(feat)
        else:
            outputs.append(feat)
    assert len(outputs) > 0, "At least one output feature has to be provided."
    assert len(inputs) > 0, "At least one input feature has to be provided."
    inputs = Inputs(features=inputs)
    outputs = Outputs(features=outputs)
    # loop over constraints and make sure that all features used in constraints are in the input_feature_keys
    kept_input_keys = set(inputs.get_keys())
    for c in domain.constraints:
        for key in c.features:  # type: ignore
            if key not in kept_input_keys:
                raise ValueError(
                    f"Removed input feature {key} is used in a constraint."
                )
    subdomain = deepcopy(domain)
    subdomain.inputs = inputs
    subdomain.outputs = outputs
    return subdomain
torch_tools
constrained_objective2botorch(idx, objective, eps=1e-08)
Create a callable that can be used by botorch.utils.objective.apply_constraints
to set up output-constrained optimizations.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
idx |
int |
Index of the constraint objective in the list of outputs. |
required |
objective |
BotorchConstrainedObjective |
The objective that should be transformed. |
required |
Returns:
Type | Description |
---|---|
Tuple[List[Callable[[Tensor], Tensor]], List[float], int] |
List of callables that can be used by botorch for setting up the constrained objective, list of the corresponding botorch eta values, final index used by the method (to track for categorical variables) |
Source code in bofire/utils/torch_tools.py
def constrained_objective2botorch(
    idx: int, objective: ConstrainedObjective, eps: float = 1e-8
) -> Tuple[List[Callable[[Tensor], Tensor]], List[float], int]:
    """Translate a constrained objective into botorch constraint callables.

    The returned callables are meant to be used by
    `botorch.utils.objective.apply_constraints` to set up output constrained
    optimizations.

    Args:
        idx (int): Index of the constrained objective in the list of outputs.
        objective (ConstrainedObjective): The objective that should be transformed.
        eps (float, optional): Margin used for categorical objectives only: the
            desirability-weighted sum is clamped to `[eps, 1 - eps]` before the
            logarithm is taken. Defaults to 1e-8.

    Returns:
        Tuple[List[Callable[[Tensor], Tensor]], List[float], int]: List of
            callables that can be used by botorch for setting up the constrained
            objective, list of the corresponding botorch eta values
            (`1 / steepness`, or `1.0` for categorical objectives), and the
            index following the columns consumed by this objective (`idx + 1`,
            or `idx + len(objective.desirability)` for categorical objectives).

    Raises:
        AssertionError: If `objective` is not a `ConstrainedObjective`.
        ValueError: If the class of `objective` is not handled by this function.
    """
    assert isinstance(
        objective, ConstrainedObjective
    ), "Objective is not a `ConstrainedObjective`."

    # The order of the isinstance checks is kept as is on purpose, in case the
    # objective classes are related by inheritance.
    if isinstance(objective, MaximizeSigmoidObjective):

        def maximize_sigmoid_constraint(Z: Tensor) -> Tensor:
            return -1.0 * (Z[..., idx] - objective.tp)

        return [maximize_sigmoid_constraint], [1.0 / objective.steepness], idx + 1

    if isinstance(objective, MinimizeSigmoidObjective):

        def minimize_sigmoid_constraint(Z: Tensor) -> Tensor:
            return Z[..., idx] - objective.tp

        return [minimize_sigmoid_constraint], [1.0 / objective.steepness], idx + 1

    if isinstance(objective, TargetObjective):

        def lower_edge_constraint(Z: Tensor) -> Tensor:
            lower_edge = objective.target_value - objective.tolerance
            return -1.0 * (Z[..., idx] - lower_edge)

        def upper_edge_constraint(Z: Tensor) -> Tensor:
            upper_edge = objective.target_value + objective.tolerance
            return Z[..., idx] - upper_edge

        return (
            [lower_edge_constraint, upper_edge_constraint],
            [1.0 / objective.steepness, 1.0 / objective.steepness],
            idx + 1,
        )

    if isinstance(objective, ConstrainedCategoricalObjective):
        # The output of a categorical objective has final dim `c` where `c` is
        # the number of classes. The desirability-weighted sum is treated as the
        # expected acceptance probability p and mapped through log(1 / p - 1),
        # i.e. the negated logit (inverse sigmoid) of p.
        def categorical_constraint(Z: Tensor) -> Tensor:
            weights = torch.tensor(objective.desirability).to(**tkwargs)
            columns = Z[..., idx : idx + len(objective.desirability)]
            acceptance = (columns * weights).sum(-1)
            clamped = torch.clamp(acceptance, min=eps, max=1 - eps)
            return torch.log(1 / clamped - 1)

        return [categorical_constraint], [1.0], idx + len(objective.desirability)

    raise ValueError(f"Objective {objective.__class__.__name__} not known.")
get_initial_conditions_generator(strategy, transform_specs, ask_options=None, sequential=True)
Takes a strategy object and returns a callable which uses this
strategy to return a generator callable which can be used in botorch's
`gen_batch_initial_conditions` to generate samples.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
strategy |
Strategy |
Strategy that should be used to generate samples. |
required |
transform_specs |
Dict |
Dictionary indicating how the samples should be transformed. |
required |
ask_options |
Dict |
Dictionary of keyword arguments that are
passed to the `ask` method of the strategy. If None, no extra arguments are passed. |
None |
sequential |
bool |
If True, samples for every q-batch are
generated independently of each other. If False, the `n x q` samples are generated at once. |
True |
Returns:
Type | Description |
---|---|
Callable[[int, int, int], Tensor] |
Callable that can be passed to `batch_initial_conditions`. |
Source code in bofire/utils/torch_tools.py
def get_initial_conditions_generator(
    strategy: Strategy,
    transform_specs: Dict,
    ask_options: Optional[Dict] = None,
    sequential: bool = True,
) -> Callable[[int, int, int], Tensor]:
    """Build a sample generator for botorch's `gen_batch_initial_conditions`.

    The returned callable asks the given strategy for candidates, transforms
    them with the strategy's input features and returns them as a tensor.

    Args:
        strategy (Strategy): Strategy that should be used to generate samples.
        transform_specs (Dict): Dictionary indicating how the samples should be
            transformed.
        ask_options (Dict, optional): Dictionary of keyword arguments that are
            passed to the `ask` method of the strategy. `None` is treated as an
            empty dictionary. Defaults to None.
        sequential (bool, optional): If True, samples for every q-batch are
            generated independently of each other (one `ask` call per batch).
            If False, the `n x q` samples are generated at once. Defaults to True.

    Returns:
        Callable[[int, int, int], Tensor]: Callable `generator(n, q, seed)` that
            can be passed to `batch_initial_conditions`. It returns a tensor
            with `n` batches of `q` transformed candidates each. The `seed`
            argument is accepted but not used by the generator.
    """
    options = {} if ask_options is None else ask_options

    def _ask_transformed(count: int):
        # Draw `count` candidates and apply the input transformation.
        candidates = strategy.ask(count, **options)
        return strategy.domain.inputs.transform(candidates, transform_specs)

    def _to_tensor(frame) -> Tensor:
        return torch.from_numpy(frame.values).to(**tkwargs)

    def generator(n: int, q: int, seed: int) -> Tensor:
        if not sequential:
            # One call for all n * q samples, split into n batches afterwards.
            transformed = _ask_transformed(n * q)
            return _to_tensor(transformed).reshape(n, q, transformed.shape[1])
        # One independent call to the strategy per q-batch.
        batches = [_to_tensor(_ask_transformed(q)) for _ in range(n)]
        return torch.stack(batches, dim=0)

    return generator
get_interpoint_constraints(domain, n_candidates)
Converts interpoint equality constraints to linear equality constraints,
that can be processed by botorch. For more information, see the docstring
of optimize_acqf
in botorch
(https://github.com/pytorch/botorch/blob/main/botorch/optim/optimize.py).
Parameters:
Name | Type | Description | Default |
---|---|---|---|
domain |
Domain |
Optimization problem definition. |
required |
n_candidates |
int |
Number of candidates that should be requested. |
required |
Returns:
Type | Description |
---|---|
List[Tuple[Tensor, Tensor, float]] |
List of tuples, each tuple consists of a tensor with the feature indices, coefficients and a float for the rhs. |
Source code in bofire/utils/torch_tools.py
def get_interpoint_constraints(
    domain: Domain, n_candidates: int
) -> List[Tuple[Tensor, Tensor, float]]:
    """Express interpoint equality constraints as linear equality constraints.

    The result has the form that can be processed by botorch. For more
    information, see the docstring of `optimize_acqf` in botorch
    (https://github.com/pytorch/botorch/blob/main/botorch/optim/optimize.py).

    The candidates are split into consecutive groups of `multiplicity`
    candidates (all candidates form one group if the constraint has no
    multiplicity). Within each group, the constrained feature of the first
    candidate is tied to the same feature of every other candidate via
    `x_first - x_other = 0`. Constraints on fixed features are skipped.

    Args:
        domain (Domain): Optimization problem definition.
        n_candidates (int): Number of candidates that should be requested.

    Returns:
        List[Tuple[Tensor, Tensor, float]]: List of tuples, each tuple consists
            of a tensor with the (candidate index, feature index) pairs, the
            coefficients `[1.0, -1.0]` and a float for the rhs (always `0.0`).
    """
    linear_constraints = []
    for constraint in domain.constraints.get(InterpointEqualityConstraint):
        assert isinstance(constraint, InterpointEqualityConstraint)
        coefficients = torch.tensor([1.0, -1.0]).to(**tkwargs)
        feat_idx = domain.inputs.get_keys(Input).index(constraint.feature)
        feat = domain.inputs.get_by_key(constraint.feature)
        assert isinstance(feat, ContinuousInput)
        if feat.is_fixed():
            continue
        multiplicity = constraint.multiplicity or n_candidates
        n_groups = math.ceil(n_candidates / multiplicity)
        for group in range(n_groups):
            first = group * multiplicity
            # The last group may be shorter than `multiplicity`.
            stop = min((group + 1) * multiplicity, n_candidates)
            for other in range(first + 1, stop):
                pair = torch.tensor(
                    [[first, feat_idx], [other, feat_idx]],
                    dtype=torch.int64,
                )
                linear_constraints.append((pair, coefficients, 0.0))
    return linear_constraints
get_linear_constraints(domain, constraint, unit_scaled=False)
Converts linear constraints to the form required by BoTorch.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
domain |
Domain |
Optimization problem definition. |
required |
constraint |
Union[Type[bofire.data_models.constraints.linear.LinearEqualityConstraint], Type[bofire.data_models.constraints.linear.LinearInequalityConstraint]] |
Type of constraint that should be converted. |
required |
unit_scaled |
bool |
If True, transforms constraints by assuming that the bound for the continuous features are [0,1]. Defaults to False. |
False |
Returns:
Type | Description |
---|---|
List[Tuple[Tensor, Tensor, float]] |
List of tuples, each tuple consists of a tensor with the feature indices, coefficients and a float for the rhs. |
Source code in bofire/utils/torch_tools.py
def get_linear_constraints(
    domain: Domain,
    constraint: Union[Type[LinearEqualityConstraint], Type[LinearInequalityConstraint]],
    unit_scaled: bool = False,
) -> List[Tuple[Tensor, Tensor, float]]:
    """Converts linear constraints to the form required by BoTorch.

    Fixed features are dropped from a constraint and their contribution
    (fixed value times coefficient) is moved to the right-hand side. The
    coefficients and the right-hand side are returned negated.
    NOTE(review): presumably the negation maps a `<=` convention onto the
    `>=` form expected by botorch - confirm against the constraint models.

    Args:
        domain: Optimization problem definition.
        constraint: Type of constraint that should be converted.
        unit_scaled: If True, transforms constraints by assuming that the bound
            for the continuous features are [0,1]: coefficients are multiplied
            by `upper - lower` of the feature and `sum(coefficient * lower)` is
            moved to the right-hand side. Defaults to False.

    Returns:
        List[Tuple[Tensor, Tensor, float]]: List of tuples, each tuple consists
            of a tensor with the feature indices, coefficients and a float for
            the rhs.
    """
    # The key order does not depend on the constraint, so look it up only once
    # instead of once per constrained feature.
    input_keys = domain.inputs.get_keys(Input)

    constraints = []
    for c in domain.constraints.get(constraint):
        indices = []
        coefficients = []
        lower = []
        upper = []
        rhs = 0.0
        for i, featkey in enumerate(c.features):  # type: ignore
            idx = input_keys.index(featkey)
            feat = domain.inputs.get_by_key(featkey)
            if feat.is_fixed():  # type: ignore
                rhs -= feat.fixed_value()[0] * c.coefficients[i]  # type: ignore
            else:
                lower.append(feat.lower_bound)  # type: ignore
                upper.append(feat.upper_bound)  # type: ignore
                indices.append(idx)
                coefficients.append(c.coefficients[i])  # type: ignore

        if unit_scaled:
            lower_bounds = np.array(lower)
            span = np.array(upper) - lower_bounds
            raw_coefficients = np.array(coefficients)
            coefficient_tensor = torch.tensor(span * raw_coefficients)
            total_rhs = rhs + c.rhs - np.sum(raw_coefficients * lower_bounds)  # type: ignore
        else:
            # Build the tensor in float64 right away. With torch's default
            # dtype (float32 unless changed), `torch.tensor` on Python floats
            # would round the coefficients before the conversion to the
            # dtype in `tkwargs`, unlike the numpy-based branch above.
            coefficient_tensor = torch.tensor(coefficients, dtype=torch.float64)
            total_rhs = rhs + c.rhs  # type: ignore

        constraints.append(
            (
                torch.tensor(indices),
                -coefficient_tensor.to(**tkwargs),
                -total_rhs,
            )
        )
    return constraints
get_multiobjective_objective(outputs)
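Returns a callable that applies the objective callables of all outputs with a maximize, minimize or close-to-target objective and stacks the results along the last dimension.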
Parameters:
Name | Type | Description | Default |
---|---|---|---|
outputs |
Outputs |
Output features whose objectives should be converted. |
required |
Returns:
Type | Description |
---|---|
Callable[[Tensor], Tensor] |
Callable that maps samples to a tensor holding one objective value per selected output in its last dimension. |
Source code in bofire/utils/torch_tools.py
def get_multiobjective_objective(
    outputs: Outputs,
) -> Callable[[Tensor, Optional[Tensor]], Tensor]:
    """Build a callable that stacks the objectives of the given outputs.

    Only outputs whose objective is a `MaximizeObjective`, `MinimizeObjective`
    or `CloseToTargetObjective` are considered. The index handed to
    `get_objective_callable` is the position of the output in `outputs.get()`,
    counting skipped outputs as well.

    Args:
        outputs (Outputs): Output features whose objectives should be converted.

    Returns:
        Callable[[Tensor, Optional[Tensor]], Tensor]: Callable taking the
            samples and an optional `X`. It calls every objective callable with
            `(samples, None)` - `X` is not forwarded - and stacks the results
            along a new last dimension.
    """
    supported_objectives = (MaximizeObjective, MinimizeObjective, CloseToTargetObjective)

    callables = []
    for position, feat in enumerate(outputs.get()):
        # isinstance is False for None, so no separate None check is needed.
        if isinstance(feat.objective, supported_objectives):  # type: ignore
            callables.append(
                get_objective_callable(idx=position, objective=feat.objective)  # type: ignore
            )

    def objective(samples: Tensor, X: Optional[Tensor] = None) -> Tensor:
        per_output = [c(samples, None) for c in callables]
        return torch.stack(per_output, dim=-1)

    return objective
get_nchoosek_constraints(domain)
Transforms NChooseK constraints into a list of non-linear inequality constraint callables that can be processed by botorch. For this purpose the NChooseK constraint is continuously relaxed by counting the number of zeros in a candidate by a sum of narrow gaussians centered at zero.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
domain |
Domain |
Optimization problem definition. |
required |
Returns:
Type | Description |
---|---|
List[Callable[[Tensor], float]] |
List of callables that can be used as nonlinear equality constraints in botorch. |
Source code in bofire/utils/torch_tools.py
def get_nchoosek_constraints(domain: Domain) -> List[Callable[[Tensor], float]]:
    """Turn NChooseK constraints into non-linear constraint callables.

    The NChooseK constraint is continuously relaxed: the number of zeros among
    the constrained features of a candidate is approximated by a sum of narrow
    gaussians centered at zero (each term is 1 at exactly zero and decays
    quickly away from it).

    For a constraint on `n` features, up to two callables are created:
      * if `max_count != n`: `zeros(x) - (n - max_count)`
      * if `min_count > 0`: `-zeros(x) + (n - min_count)`

    Args:
        domain (Domain): Optimization problem definition.

    Returns:
        List[Callable[[Tensor], float]]: List of callables that can be used
            as nonlinear constraints in botorch.
    """

    def narrow_gaussian(x, ell=1e-3):
        return torch.exp(-0.5 * (x / ell) ** 2)

    def max_constraint(indices: Tensor, num_features: int, max_count: int):
        def constraint(x):
            approx_zeros = narrow_gaussian(x=x[..., indices]).sum(dim=-1)
            return approx_zeros - (num_features - max_count)

        return constraint

    def min_constraint(indices: Tensor, num_features: int, min_count: int):
        def constraint(x):
            approx_zeros = narrow_gaussian(x=x[..., indices]).sum(dim=-1)
            return -approx_zeros + (num_features - min_count)

        return constraint

    nchoosek_callables = []
    # ignore none also valid for the start
    for c in domain.constraints.get(NChooseKConstraint):
        assert isinstance(c, NChooseKConstraint)
        n_features = len(c.features)
        # Positions of the constrained features among the continuous inputs.
        positions = [
            domain.inputs.get_keys(ContinuousInput).index(key) for key in c.features
        ]
        indices = torch.tensor(positions, dtype=torch.int64)
        if c.max_count != n_features:
            nchoosek_callables.append(
                max_constraint(
                    indices=indices, num_features=n_features, max_count=c.max_count
                )
            )
        if c.min_count > 0:
            nchoosek_callables.append(
                min_constraint(
                    indices=indices, num_features=n_features, min_count=c.min_count
                )
            )
    return nchoosek_callables
get_nonlinear_constraints(domain)
Returns a list of callable functions that represent the nonlinear constraints for the given domain that can be processed by botorch.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
domain |
Domain |
The domain for which to generate the nonlinear constraints. |
required |
Returns:
Type | Description |
---|---|
List[Callable[[Tensor], float]] |
A list of callable functions that take a tensor as input and return a float value representing the constraint evaluation. |
Source code in bofire/utils/torch_tools.py
def get_nonlinear_constraints(domain: Domain) -> List[Callable[[Tensor], float]]:
    """Collect all nonlinear constraint callables of a domain for botorch.

    The result is the list returned by `get_nchoosek_constraints` followed by
    the list returned by `get_product_constraints`.

    Args:
        domain (Domain): The domain for which to generate the nonlinear constraints.

    Returns:
        List[Callable[[Tensor], float]]: A list of callable functions that take a
            tensor as input and return the constraint evaluation.
    """
    nchoosek_callables = get_nchoosek_constraints(domain)
    product_callables = get_product_constraints(domain)
    return [*nchoosek_callables, *product_callables]
get_output_constraints(outputs)
Method to translate output constraint objectives into a list of callables and list of etas for use in botorch.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
outputs |
Outputs |
Output feature object that should be processed. |
required |
Returns:
Type | Description |
---|---|
Tuple[List[Callable[[Tensor], Tensor]], List[float]] |
List of constraint callables, list of associated etas. |
Source code in bofire/utils/torch_tools.py
def get_output_constraints(
    outputs: Outputs,
) -> Tuple[List[Callable[[Tensor], Tensor]], List[float]]:
    """Translate output constraint objectives into callables and etas for botorch.

    A running index is kept while iterating over the outputs: outputs without a
    `ConstrainedObjective` advance it by one, all others advance it to the
    index returned by `constrained_objective2botorch`.

    Args:
        outputs (Outputs): Output feature object that should
            be processed.

    Returns:
        Tuple[List[Callable[[Tensor], Tensor]], List[float]]: List of constraint
            callables, list of associated etas.
    """
    constraints = []
    etas = []
    idx = 0
    for feat in outputs.get():
        if not isinstance(feat.objective, ConstrainedObjective):  # type: ignore
            idx += 1
            continue
        feat_constraints, feat_etas, idx = constrained_objective2botorch(
            idx,
            objective=feat.objective,  # type: ignore
        )
        constraints.extend(feat_constraints)
        etas.extend(feat_etas)
    return constraints, etas
get_product_constraints(domain)
Returns a list of nonlinear constraint functions that can be processed by botorch based on the given domain.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
domain |
Domain |
The domain object containing the constraints. |
required |
Returns:
Type | Description |
---|---|
List[Callable[[Tensor], float]] |
A list of product constraint functions. |
Source code in bofire/utils/torch_tools.py
def get_product_constraints(domain: Domain) -> List[Callable[[Tensor], float]]:
    """Build callables for the product inequality constraints of a domain.

    Each callable evaluates `-1.0 * sign * prod(x_i ** exponent_i) + rhs` over
    the constrained features, where the feature positions refer to the keys of
    the continuous inputs of the domain. The callables are meant to be
    processed by botorch.

    Args:
        domain (Domain): The domain object containing the constraints.

    Returns:
        List[Callable[[Tensor], float]]: A list of product constraint functions.
    """

    def product_constraint(indices: Tensor, exponents: Tensor, rhs: float, sign: int):
        def constraint(x):
            product = (x[..., indices] ** exponents).prod(dim=-1)
            return -1.0 * sign * product + rhs

        return constraint

    product_callables = []
    for c in domain.constraints.get(ProductInequalityConstraint):
        assert isinstance(c, ProductInequalityConstraint)
        positions = [
            domain.inputs.get_keys(ContinuousInput).index(key) for key in c.features
        ]
        indices = torch.tensor(positions, dtype=torch.int64)
        exponents = torch.tensor(c.exponents)
        product_callables.append(product_constraint(indices, exponents, c.rhs, c.sign))
    return product_callables