Utils

cheminformatics

smiles2fingerprints(smiles, bond_radius=5, n_bits=2048)

Transforms a list of SMILES strings into an array of Morgan fingerprints.

Parameters:

- smiles (List[str]): List of SMILES strings. Required.
- bond_radius (int): Bond radius to use. Defaults to 5.
- n_bits (int): Number of bits. Defaults to 2048.

Returns:

- np.ndarray: Numpy array holding the fingerprints.

Source code in bofire/utils/cheminformatics.py
def smiles2fingerprints(
    smiles: List[str],
    bond_radius: int = 5,
    n_bits: int = 2048,
) -> np.ndarray:
    """Transforms a list of smiles to an array of morgan fingerprints.

    Args:
        smiles (List[str]): List of smiles
        bond_radius (int, optional): Bond radius to use. Defaults to 5.
        n_bits (int, optional): Number of bits. Defaults to 2048.

    Returns:
        np.ndarray: Numpy array holding the fingerprints

    """
    rdkit_mols = [smiles2mol(m) for m in smiles]
    fps = [
        AllChem.GetMorganFingerprintAsBitVect(mol, radius=bond_radius, nBits=n_bits)  # type: ignore
        for mol in rdkit_mols
    ]

    return np.asarray(fps)
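
A minimal usage sketch (assuming RDKit is installed; the SMILES are arbitrary examples):

from bofire.utils.cheminformatics import smiles2fingerprints

# two example molecules: ethanol and benzene
fps = smiles2fingerprints(["CCO", "c1ccccc1"], bond_radius=2, n_bits=512)
print(fps.shape)  # (2, 512): one 512-bit fingerprint per molecule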

smiles2fragments(smiles, fragments_list=None)

Transforms a list of SMILES strings into an array of fragment descriptors.

Parameters:

- smiles (list[str]): List of SMILES strings. Required.
- fragments_list (list[str], optional): List of desired fragments. Defaults to None, in which case all RDKit fr_* fragment descriptors are used.

Returns:

- np.ndarray: Array holding the fragment information.

Source code in bofire/utils/cheminformatics.py
def smiles2fragments(
    smiles: List[str],
    fragments_list: Optional[List[str]] = None,
) -> np.ndarray:
    """Transforms smiles to an array of fragments.

    Args:
        smiles (list[str]): List of smiles
        fragments_list (list[str], optional): List of desired fragments. Defaults to None.

    Returns:
        np.ndarray: Array holding the fragment information.

    """
    rdkit_fragment_list = [
        item
        for item in Descriptors.descList
        if item[0].startswith("fr_")  # type: ignore
    ]
    if fragments_list is None:
        fragments = {d[0]: d[1] for d in rdkit_fragment_list}
    else:
        fragments = {d[0]: d[1] for d in rdkit_fragment_list if d[0] in fragments_list}

    frags = np.zeros((len(smiles), len(fragments)))
    for i, smi in enumerate(smiles):
        mol = smiles2mol(smi)
        features = [fragments[d](mol) for d in fragments]
        frags[i, :] = features

    return frags
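
A usage sketch restricted to two RDKit fragment descriptors (fr_benzene and fr_halogen are names taken from rdkit.Chem.Descriptors.descList; any fr_* name works):

from bofire.utils.cheminformatics import smiles2fragments

frags = smiles2fragments(["CCO", "c1ccccc1"], fragments_list=["fr_benzene", "fr_halogen"])
print(frags.shape)  # (2, 2): one column per requested fragment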

smiles2mol(smiles)

Transforms a SMILES string into an RDKit Mol object.

Parameters:

- smiles (str): SMILES string. Required.

Raises:

- ValueError: If the string is not a valid SMILES.

Returns:

- rdkit.Mol: The RDKit Mol object.

Source code in bofire/utils/cheminformatics.py
def smiles2mol(smiles: str):
    """Transforms a smiles string to an rdkit mol object.

    Args:
        smiles (str): Smiles string.

    Raises:
        ValueError: If string is not a valid smiles.

    Returns:
        rdkit.Mol: rdkit.mol object

    """
    mol = MolFromSmiles(smiles)  # type: ignore
    if mol is None:
        raise ValueError(f"{smiles} is not a valid smiles string.")
    return mol
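
A short usage sketch showing both the success and the error path:

from bofire.utils.cheminformatics import smiles2mol

mol = smiles2mol("CCO")   # valid SMILES, returns an rdkit Mol
print(mol.GetNumAtoms())  # 3 heavy atoms for ethanol

try:
    smiles2mol("not-a-smiles")
except ValueError as err:
    print(err)  # "not-a-smiles is not a valid smiles string."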

smiles2mordred(smiles, descriptors_list)

Transforms a list of SMILES strings into Mordred molecular descriptors.

Parameters:

- smiles (List[str]): List of SMILES strings. Required.
- descriptors_list (List[str]): List of desired Mordred descriptors. Required.

Returns:

- np.ndarray: Array holding the Mordred molecular descriptors.

Source code in bofire/utils/cheminformatics.py
def smiles2mordred(smiles: List[str], descriptors_list: List[str]) -> np.ndarray:
    """Transforms list of smiles to mordred moelcular descriptors.

    Args:
        smiles (List[str]): List of smiles
        descriptors_list (List[str]): List of desired mordred descriptors

    Returns:
        np.ndarray: Array holding the mordred molecular descriptors.

    """
    mols = [smiles2mol(smi) for smi in smiles]

    calc = Calculator(descriptors, ignore_3D=True)  # type: ignore
    calc.descriptors = [d for d in calc.descriptors if str(d) in descriptors_list]

    descriptors_df = calc.pandas(mols)
    nan_list = [
        pd.to_numeric(descriptors_df[col], errors="coerce").isnull().values.any()  # type: ignore
        for col in descriptors_df.columns
    ]
    if any(nan_list):
        raise ValueError(
            f"Found NaN values in descriptors {list(descriptors_df.columns[nan_list])}",  # type: ignore
        )

    return descriptors_df.astype(float).values
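
A minimal sketch (assumes the mordred package is installed; "nAcid" and "SLogP" are assumed examples of valid Mordred descriptor names, i.e. they must match str(d) of the Mordred descriptors):

from bofire.utils.cheminformatics import smiles2mordred

desc = smiles2mordred(["CCO", "c1ccccc1"], descriptors_list=["nAcid", "SLogP"])
print(desc.shape)  # (2, 2)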

doe

apply_block_generator(design, gen)

Applies blocking to a design matrix.

Parameters:

- design (np.ndarray): The design matrix. Required.
- gen (str): The generator. Required.

Returns:

- List[int]: List of integers assigning each experiment in the design matrix to a block.

Source code in bofire/utils/doe.py
def apply_block_generator(design: np.ndarray, gen: str) -> List[int]:
    """Applies blocking to a design matrix.

    Args:
        design: The design matrix.
        gen: The generator.

    Returns:
        List of integers which assigns an experiment in the design matrix to a block.

    """
    generators = [i.strip().lower() for i in gen.split(";")]

    # Fill in design with two level factorial design
    blocking_design = np.zeros((design.shape[0], len(generators)))

    # Recognize combinations and fill in the rest of matrix H2 with the proper
    # products
    for i, g in enumerate(generators):
        # For lowercase letters
        xx = np.array([ord(c) for c in g]) - 97
        blocking_design[:, i] = np.prod(design[:, xx], axis=1)

    # Create a list to store the block assignments
    blocks = []

    # Iterate over each row in the design matrix
    for row in blocking_design:
        # Find the index of the unique row in the blocking design
        block = np.where((blocking_design == row).all(axis=1))[0][0]
        blocks.append(block)

    return blocks
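
For illustration, blocking an eight-run full factorial on the three-factor interaction (the sign of a*b*c splits the runs into two blocks):

from bofire.utils.doe import apply_block_generator, ff2n

design = ff2n(3)  # 8-run full factorial in factors a, b, c
blocks = apply_block_generator(design, gen="abc")
print(blocks)  # [0, 1, 1, 0, 1, 0, 0, 1]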

compute_generator(n_factors, n_generators)

Computes a generator for a given number of factors and generators.

Parameters:

- n_factors (int): The number of factors. Required.
- n_generators (int): The number of generators. Required.

Returns:

- str: The generator.

Source code in bofire/utils/doe.py
def compute_generator(n_factors: int, n_generators: int) -> str:
    """Computes a generator for a given number of factors and generators.

    Args:
        n_factors: The number of factors.
        n_generators: The number of generators.

    Returns:
        The generator.

    """
    if n_generators == 0:
        return " ".join(list(string.ascii_lowercase[:n_factors]))
    n_base_factors = n_factors - n_generators
    if n_generators == 1:
        if n_base_factors == 1:
            raise ValueError(
                "Design not possible, as main factors are confounded with each other.",
            )
        return " ".join(
            list(string.ascii_lowercase[:n_base_factors])
            + [string.ascii_lowercase[:n_base_factors]],
        )
    n_base_factors = n_factors - n_generators
    if n_base_factors - 1 < 2:
        raise ValueError(
            "Design not possible, as main factors are confounded with each other.",
        )
    generators = [
        "".join(i)
        for i in (
            itertools.combinations(
                string.ascii_lowercase[:n_base_factors],
                n_base_factors - 1,
            )
        )
    ]
    if len(generators) > n_generators:
        generators = generators[:n_generators]
    elif (n_generators - len(generators) == 1) and (n_base_factors > 1):
        generators += [string.ascii_lowercase[:n_base_factors]]
    elif n_generators - len(generators) >= 1:
        raise ValueError(
            "Design not possible, as main factors are confounded with each other.",
        )
    return " ".join(list(string.ascii_lowercase[:n_base_factors]) + generators)

ff2n(n_factors)

Computes the full factorial design for a given number of factors.

Parameters:

- n_factors (int): The number of factors. Required.

Returns:

- np.ndarray: The full factorial design.

Source code in bofire/utils/doe.py
def ff2n(n_factors: int) -> np.ndarray:
    """Computes the full factorial design for a given number of factors.

    Args:
        n_factors: The number of factors.

    Returns:
        The full factorial design.

    """
    return np.array(list(itertools.product([-1, 1], repeat=n_factors)))
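
For example, two factors give the four sign combinations:

from bofire.utils.doe import ff2n

print(ff2n(2))
# [[-1 -1]
#  [-1  1]
#  [ 1 -1]
#  [ 1  1]]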

fracfact(gen)

Computes the fractional factorial design for a given generator.

Parameters:

- gen (str): The generator. Required.

Returns:

- np.ndarray: The fractional factorial design.

Source code in bofire/utils/doe.py
def fracfact(gen: str) -> np.ndarray:
    """Computes the fractional factorial design for a given generator.

    Args:
        gen: The generator.

    Returns:
        The fractional factorial design.

    """
    gen = validate_generator(n_factors=gen.count(" ") + 1, generator=gen)

    generators = [item for item in re.split(r"\-|\s|\+", gen) if item]
    lengths = [len(i) for i in generators]

    # Indices of single letters (main factors)
    idx_main = [i for i, item in enumerate(lengths) if item == 1]

    # Indices of letter combinations (generators).
    idx_combi = [i for i, item in enumerate(lengths) if item > 1]

    # Check if there are "-" operators in gen
    idx_negative = [
        i for i, item in enumerate(gen.split(" ")) if item and item[0] == "-"
    ]

    # Fill in design with two level factorial design
    H1 = ff2n(len(idx_main))
    H = np.zeros((H1.shape[0], len(lengths)))
    H[:, idx_main] = H1

    # Recognize combinations and fill in the rest of matrix H2 with the proper
    # products
    for k in idx_combi:
        # For lowercase letters
        xx = np.array([ord(c) for c in generators[k]]) - 97

        H[:, k] = np.prod(H1[:, xx], axis=1)

    # Update design if gen includes "-" operator
    if len(idx_negative) > 0:
        H[:, idx_negative] *= -1

    # Return the fractional factorial design
    return H
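
For example, a 2^(3-1) design in which the third factor is generated from the product of the first two:

from bofire.utils.doe import fracfact

print(fracfact("a b ab"))
# [[-1. -1.  1.]
#  [-1.  1. -1.]
#  [ 1. -1. -1.]
#  [ 1.  1.  1.]]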

get_alias_structure(gen, order=4)

Computes the alias structure of the design matrix. Works only for generators with positive signs.

Parameters:

- gen (str): The generator. Required.
- order (int): The order up to which the alias structure should be calculated. Defaults to 4.

Returns:

- List[str]: The alias structure of the design matrix.

Source code in bofire/utils/doe.py
def get_alias_structure(gen: str, order: int = 4) -> List[str]:
    """Computes the alias structure of the design matrix. Works only for generators
    with positive signs.

    Args:
        gen: The generator.
        order: The order up to which the alias structure should be calculated. Defaults to 4.

    Returns:
        The alias structure of the design matrix.

    """
    design = fracfact(gen)

    n_experiments, n_factors = design.shape

    all_names = string.ascii_lowercase + "I"
    factors = range(n_factors)
    all_combinations = itertools.chain.from_iterable(
        itertools.combinations(factors, n) for n in range(1, min(n_factors, order) + 1)
    )
    aliases = {n_experiments * "+": [(26,)]}  # 26 is mapped to I

    for combination in all_combinations:
        # positive sign
        contrast = np.prod(
            design[:, combination],
            axis=1,
        )  # this is the product of the combination
        scontrast = "".join(np.where(contrast == 1, "+", "-").tolist())
        aliases[scontrast] = aliases.get(scontrast, [])
        aliases[scontrast].append(combination)  # type: ignore

    aliases_list = []
    for alias in aliases.values():
        aliases_list.append(
            sorted(alias, key=lambda a: (len(a), a)),
        )  # sort by length and then by the combination
    aliases_list = sorted(
        aliases_list,
        key=lambda list: ([len(a) for a in list], list),
    )  # sort by the length of the alias

    aliases_readable = []

    for alias in aliases_list:
        aliases_readable.append(
            " = ".join(["".join([all_names[f] for f in a]) for a in alias]),
        )

    return aliases_readable
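
For example, for the 2^(4-1) design with d = abc the defining relation is I = abcd:

from bofire.utils.doe import get_alias_structure

aliases = get_alias_structure("a b c abc")
print(aliases)  # entries like 'a = bcd', 'ab = cd' and 'I = abcd'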

get_block_generator(n_factors, n_generators, n_repetitions, n_blocks)

Gets the block generator for a given number of factors, generators, repetitions, and blocks.

Should only be used if blocking cannot be achieved by repetitions alone.

Parameters:

- n_factors (int): Number of factors. Required.
- n_generators (int): Number of generators/reducing factors. Required.
- n_repetitions (int): Number of repetitions. Required.
- n_blocks (int): Number of blocks that should be realized. Required.

Raises:

- ValueError: If blocking can be reached by repetitions only.

Returns:

- str: The blocking generator.

Source code in bofire/utils/doe.py
def get_block_generator(
    n_factors: int, n_generators: int, n_repetitions: int, n_blocks: int
) -> str:
    """Gets the block generator for a given number of factors, generators, repetitions, and blocks.

    Should only be used if blocking cannot be achieved by repetitions alone.

    Args:
        n_factors: number of factors
        n_generators: number of generators/reducing factors
        n_repetitions: number of repetitions
        n_blocks: number of blocks that should be realized

    Raises:
        ValueError: If blocking can be reached by repetitions only.

    Returns:
        The blocking generator.
    """
    if n_repetitions % n_blocks == 0:
        raise ValueError("Blocking can be reached by repetitions only.")

    possible_blocks = sorted(
        set(
            default_blocking_generators.loc[
                default_blocking_generators.n_factors == n_factors
            ].n_blocks.to_list()
        )
    )

    if n_blocks in possible_blocks:
        return default_blocking_generators.loc[
            (default_blocking_generators.n_factors == n_factors)
            & (default_blocking_generators.n_blocks == n_blocks)
        ].block_generator.to_list()[0]

    for b in possible_blocks:
        if b * n_repetitions % n_blocks == 0:
            return default_blocking_generators.loc[
                (default_blocking_generators.n_factors == n_factors)
                & (default_blocking_generators.n_blocks == b)
            ].block_generator.to_list()[0]

    raise ValueError("No block generator available for the requested combination.")

get_confounding_matrix(inputs, design, powers=None, interactions=None)

Analyzes the confounding of a design and returns the confounding matrix.

Only takes continuous features into account.

Parameters:

- inputs (Inputs): Input features. Required.
- design (pd.DataFrame): Design matrix. Required.
- powers (List[int], optional): List of powers of the individual factors/features that should be considered. Integers have to be larger than 1. Defaults to None.
- interactions (List[int], optional): List of interaction levels to be considered. Integers have to be larger than 1. Defaults to [2].

Returns:

- pd.DataFrame: The confounding matrix, i.e. the pairwise correlations between the (scaled) model terms.

Source code in bofire/utils/doe.py
def get_confounding_matrix(
    inputs: Inputs,
    design: pd.DataFrame,
    powers: Optional[List[int]] = None,
    interactions: Optional[List[int]] = None,
):
    """Analyzes the confounding of a design and returns the confounding matrix.

    Only takes continuous features into account.

    Args:
        inputs (Inputs): Input features.
        design (pd.DataFrame): Design matrix.
        powers (List[int], optional): List of powers of the individual factors/features that should be considered.
            Integers have to be larger than 1. Defaults to None.
        interactions (List[int], optional): List with interaction levels to be considered.
            Integers have to be larger than 1. Defaults to [2].

    Returns:
        pd.DataFrame: The confounding matrix.

    """
    from sklearn.preprocessing import MinMaxScaler

    if len(inputs.get(CategoricalInput)) > 0:
        warnings.warn("Categorical input features will be ignored.")

    keys = inputs.get_keys(ContinuousInput)
    scaler = MinMaxScaler(feature_range=(-1, 1))
    scaled_design = pd.DataFrame(
        data=scaler.fit_transform(design[keys]),
        columns=keys,
    )

    # add powers
    if powers is not None:
        for p in powers:
            assert p > 1, "Power has to be at least of degree two."
            for key in keys:
                scaled_design[f"{key}**{p}"] = scaled_design[key] ** p

    # add interactions
    if interactions is None:
        interactions = [2]

    for i in interactions:
        assert i > 1, "Interaction has to be at least of degree two."
        assert i < len(keys) + 1, f"Interaction has to be smaller than {len(keys) + 1}."
        for combi in itertools.combinations(keys, i):
            scaled_design[":".join(combi)] = scaled_design[list(combi)].prod(axis=1)

    return scaled_design.corr()
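
A minimal sketch (assumes scikit-learn is installed) using a 2^(3-1) design, in which x3 is fully confounded with the x1:x2 interaction:

import pandas as pd

from bofire.data_models.domain.api import Inputs
from bofire.data_models.features.api import ContinuousInput
from bofire.utils.doe import fracfact, get_confounding_matrix

inputs = Inputs(
    features=[ContinuousInput(key=f"x{i + 1}", bounds=(-1, 1)) for i in range(3)]
)
design = pd.DataFrame(fracfact("a b ab"), columns=["x1", "x2", "x3"])
corr = get_confounding_matrix(inputs, design=design, interactions=[2])
print(corr.loc["x3", "x1:x2"])  # 1.0, i.e. fully confounded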

get_default_generator(n_factors, n_generators)

Returns the default generator for a given number of factors and generators.

In case the combination is not available, the function will raise an error.

Parameters:

- n_factors (int): The number of factors. Required.
- n_generators (int): The number of generators. Required.

Returns:

- str: The generator.

Source code in bofire/utils/doe.py
def get_default_generator(n_factors: int, n_generators: int) -> str:
    """Returns the default generator for a given number of factors and generators.

    In case the combination is not available, the function will raise an error.

    Args:
        n_factors: The number of factors.
        n_generators: The number of generators.

    Returns:
        The generator.

    """
    if n_generators == 0:
        return " ".join(list(string.ascii_lowercase[:n_factors]))
    df_generators = default_fracfac_generators
    n_base_factors = n_factors - n_generators
    if df_generators.loc[
        (df_generators.n_factors == n_factors)
        & (df_generators.n_generators == n_generators)
    ].empty:
        raise ValueError("No generator available for the requested combination.")
    generators = (
        df_generators.loc[
            (df_generators.n_factors == n_factors)
            & (df_generators.n_generators == n_generators),
            "generator",
        ]
        .to_list()[0]
        .split(";")
    )
    assert len(generators) == n_generators, "Number of generators does not match."
    generators = [generator.split("=")[1].strip().lower() for generator in generators]
    return " ".join(list(string.ascii_lowercase[:n_base_factors]) + generators)

get_generator(n_factors, n_generators)

Returns a generator for a given number of factors and generators.

If the requested combination is available in the default generators, it will return this one. Otherwise, it will compute a new one using compute_generator.

Parameters:

- n_factors (int): The number of factors. Required.
- n_generators (int): The number of generators. Required.

Returns:

- str: The generator.

Source code in bofire/utils/doe.py
def get_generator(n_factors: int, n_generators: int) -> str:
    """Returns a generator for a given number of factors and generators.

    If the requested combination is available in the default generators, it will return
    this one. Otherwise, it will compute a new one using `compute_generator`.

    Args:
        n_factors: The number of factors.
        n_generators: The number of generators.

    Returns:
        The generator.

    """
    try:
        return get_default_generator(n_factors, n_generators)
    except ValueError:
        return compute_generator(n_factors, n_generators)
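
For example:

from bofire.utils.doe import get_generator

# 2^(6-2) design: taken from the default generator table if available,
# otherwise computed via compute_generator
print(get_generator(n_factors=6, n_generators=2))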

get_n_blocks(n_factors, n_generators, n_repetitions)

Computes the number of possible blocks for a given number of factors, generators, and repetitions.

Parameters:

- n_factors (int): Number of factors. Required.
- n_generators (int): Number of generators/reducing factors. Required.
- n_repetitions (int): Number of repetitions. Required.

Returns:

- List[int]: List of possible numbers of blocks.

Source code in bofire/utils/doe.py
def get_n_blocks(n_factors: int, n_generators: int, n_repetitions: int) -> List[int]:
    """Computes the number of possible blocks for a given number of factors, generators, and repetitions.

    Args:
        n_factors: number of factors
        n_generators: number of generators/reducing factors
        n_repetitions: number of repetitions

    Returns:
        List[int]: List of possible number of blocks.
    """
    n_blocks = []
    # check if no repetitions are planned
    if n_repetitions == 1:
        return sorted(
            set(
                default_blocking_generators.loc[
                    default_blocking_generators.n_factors == n_factors
                ].n_blocks.to_list()
            )
        )
    else:
        # check if blocking can be reached just by repetitions
        for i in range(2, n_repetitions + 1):
            if n_repetitions % i == 0:
                n_blocks.append(i)

        # check if blocking can be reached by a combination of explicit blocks and repetitions
        n_blocks += (
            default_blocking_generators.loc[
                default_blocking_generators.n_factors == n_factors, "n_blocks"
            ].to_numpy()
            * (n_repetitions)
        ).tolist() + default_blocking_generators.loc[
            default_blocking_generators.n_factors == n_factors, "n_blocks"
        ].to_list()

        return sorted(set(n_blocks))

validate_generator(n_factors, generator)

Validates the generator and throws an error if it is not valid.

Source code in bofire/utils/doe.py
def validate_generator(n_factors: int, generator: str) -> str:
    """Validates the generator and thows an error if it is not valid."""
    if len(generator.split(" ")) != n_factors:
        raise ValueError("Generator does not match the number of factors.")
    # clean it and transform it into a list
    generators = [item for item in re.split(r"\-|\s|\+", generator) if item]
    lengths = [len(i) for i in generators]

    # Indices of single letters (main factors)
    idx_main = [i for i, item in enumerate(lengths) if item == 1]

    if len(idx_main) == 0:
        raise ValueError("At least one unconfounded main factor is needed.")

    # Check that single letters (main factors) are unique
    if len(idx_main) != len({generators[i] for i in idx_main}):
        raise ValueError("Main factors are confounded with each other.")

    # Check that single letters (main factors) follow the alphabet
    if (
        "".join(sorted([generators[i] for i in idx_main]))
        != string.ascii_lowercase[: len(idx_main)]
    ):
        raise ValueError(
            f"Use the letters `{' '.join(string.ascii_lowercase[: len(idx_main)])}` for the main factors.",
        )

    # Indices of letter combinations (generators).
    idx_combi = [i for i, item in enumerate(lengths) if item > 1]

    # check that main factors come before combinations
    if idx_combi and min(idx_combi) < max(idx_main):
        raise ValueError("Main factors have to come before combinations.")

    # Check that letter combinations are unique
    if len(idx_combi) != len({generators[i] for i in idx_combi}):
        raise ValueError("Generators are not unique.")

    # Check that only letters are used in the combinations that are also single letters (main factors)
    if not all(
        set(item).issubset({generators[i] for i in idx_main})
        for item in [generators[i] for i in idx_combi]
    ):
        raise ValueError("Generators are not valid.")

    return generator
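
For example:

from bofire.utils.doe import validate_generator

validate_generator(3, "a b ab")     # valid, returns "a b ab"

try:
    validate_generator(3, "a a b")  # main factors confounded
except ValueError as err:
    print(err)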

multiobjective

compute_hypervolume(domain, optimal_experiments, ref_point)

Method to compute the hypervolume for a given domain and Pareto optimal experiments.

Parameters:

- domain (Domain): Domain for which the hypervolume should be computed. Required.
- optimal_experiments (pd.DataFrame): Pareto optimal experiments for which the hypervolume should be computed. Required.
- ref_point (dict): Unmasked reference point for the hypervolume computation. Masking happens inside the method. Required.

Returns:

- float: Hypervolume for the given domain and Pareto optimal experiments.

Source code in bofire/utils/multiobjective.py
def compute_hypervolume(
    domain: Domain,
    optimal_experiments: pd.DataFrame,
    ref_point: dict,
) -> float:
    """Method to compute the hypervolume for a given domain and pareto optimal experiments.

    Args:
        domain: Domain for which the hypervolume should be computed.
        optimal_experiments: Pareto optimal experiments for which the hypervolume
            should be computed.
        ref_point: Unmasked reference point for the hypervolume computation.
            Masking is happening inside the method.

    Returns:
        Hypervolume for the given domain and pareto optimal experiments.
    """
    outputs = domain.outputs.get_by_objective(
        includes=[MaximizeObjective, MinimizeObjective, CloseToTargetObjective],
    )
    objective = get_multiobjective_objective(
        outputs=outputs,
        experiments=optimal_experiments,
    )
    ref_point_mask = torch.from_numpy(get_ref_point_mask(domain)).to(**tkwargs)
    hv = Hypervolume(
        ref_point=torch.tensor(
            [
                ref_point[feat]
                for feat in domain.outputs.get_keys_by_objective(
                    includes=[
                        MaximizeObjective,
                        MinimizeObjective,
                        CloseToTargetObjective,
                    ],
                )
            ],
        ).to(**tkwargs)
        * ref_point_mask,
    )

    return hv.compute(
        objective(
            torch.from_numpy(
                optimal_experiments[
                    domain.outputs.get_keys_by_objective(
                        includes=[
                            MaximizeObjective,
                            MinimizeObjective,
                            CloseToTargetObjective,
                        ],
                    )
                ].values,  # type: ignore
            ).to(**tkwargs),
        ),
    )

get_pareto_front(domain, experiments, output_feature_keys=None)

Method to compute the Pareto optimal experiments for a given domain and experiments.

Parameters:

- domain (Domain): Domain for which the Pareto front should be computed. Required.
- experiments (pd.DataFrame): Experiments for which the Pareto front should be computed. Required.
- output_feature_keys (Optional[list]): Keys of the output features that should be considered in the Pareto front. If None is provided, all keys belonging to output features with one of the following objectives will be considered: MaximizeObjective, MinimizeObjective, CloseToTargetObjective. Defaults to None.

Returns:

- pd.DataFrame: DataFrame with the Pareto optimal experiments.

Source code in bofire/utils/multiobjective.py
def get_pareto_front(
    domain: Domain,
    experiments: pd.DataFrame,
    output_feature_keys: Optional[list] = None,
) -> pd.DataFrame:
    """Method to compute the pareto optimal experiments for a given domain and
    experiments.

    Args:
        domain: Domain for which the pareto front should be computed.
        experiments: Experiments for which the pareto front should be computed.
        output_feature_keys: Key of output feature that should be considered
            in the pareto front. If `None` is provided, all keys
            belonging to output features with one of the following objectives will
            be considered: `MaximizeObjective`, `MinimizeObjective`,
            `CloseToTargetObjective`. Defaults to None.

    Returns:
        DataFrame with pareto optimal experiments.

    """
    if output_feature_keys is None:
        outputs = domain.outputs.get_by_objective(
            includes=[MaximizeObjective, MinimizeObjective, CloseToTargetObjective],
        )
    else:
        outputs = domain.outputs.get_by_keys(output_feature_keys)
    assert len(outputs) >= 2, "At least two output features have to be provided."
    output_feature_keys = [f.key for f in outputs]
    df = domain.outputs.preprocess_experiments_all_valid_outputs(
        experiments,
        output_feature_keys,
    )
    objective = get_multiobjective_objective(outputs=outputs, experiments=experiments)
    pareto_mask = np.array(
        is_non_dominated(
            objective(
                torch.from_numpy(df[output_feature_keys].values).to(**tkwargs),
                None,
            ),
        ),
    )
    return df.loc[pareto_mask]

get_ref_point_mask(domain, output_feature_keys=None)

Method to get a mask for the reference points that accounts for whether an objective is maximized or minimized: the mask value is 1 for maximization and -1 for minimization.

Parameters:

- domain (Domain): Domain for which the mask should be generated. Required.
- output_feature_keys (Optional[list]): Names of the output feature keys that should be considered in the mask. If None is provided, all keys belonging to output features with one of the following objectives will be considered: MaximizeObjective, MinimizeObjective, CloseToTargetObjective. Defaults to None.

Returns:

- np.ndarray: Array with entries of 1 for maximization and -1 for minimization.

Source code in bofire/utils/multiobjective.py
def get_ref_point_mask(
    domain: Domain,
    output_feature_keys: Optional[list] = None,
) -> np.ndarray:
    """Method to get a mask for the reference points taking into account if we
    want to maximize or minimize an objective. In case it is maximize the value
    in the mask is 1, in case we want to minimize it is -1.

    Args:
        domain: Domain for which the mask should be generated.
        output_feature_keys: Name of output feature keys
            that should be considered in the mask. If `None` is provided, all keys
            belonging to output features with one of the following objectives will
            be considered: `MaximizeObjective`, `MinimizeObjective`,
            `CloseToTargetObjective`. Defaults to None.

    Returns:
        Array of ones for maximization and array of negative ones for
            minimization.

    """
    if output_feature_keys is None:
        output_feature_keys = domain.outputs.get_keys_by_objective(
            includes=[MaximizeObjective, MinimizeObjective, CloseToTargetObjective],
        )
    if len(output_feature_keys) < 2:
        raise ValueError("At least two output features have to be provided.")
    mask = []
    for key in output_feature_keys:
        feat = domain.outputs.get_by_key(key)
        if isinstance(feat.objective, MaximizeObjective):
            mask.append(1.0)
        elif isinstance(feat.objective, MinimizeObjective) or isinstance(
            feat.objective,
            CloseToTargetObjective,
        ):
            mask.append(-1.0)
        else:
            raise ValueError(
                "Only `MaximizeObjective` and `MinimizeObjective` supported",
            )
    return np.array(mask)
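
A minimal sketch with one maximization and one minimization objective:

from bofire.data_models.domain.api import Domain, Outputs
from bofire.data_models.features.api import ContinuousOutput
from bofire.data_models.objectives.api import MaximizeObjective, MinimizeObjective
from bofire.utils.multiobjective import get_ref_point_mask

domain = Domain(
    outputs=Outputs(
        features=[
            ContinuousOutput(key="y1", objective=MaximizeObjective(w=1.0)),
            ContinuousOutput(key="y2", objective=MinimizeObjective(w=1.0)),
        ]
    )
)
print(get_ref_point_mask(domain))  # [ 1. -1.]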

infer_ref_point(domain, experiments, return_masked=False, reference_point=None)

Method to infer the reference point for a given domain and experiments.

Parameters:

- domain (Domain): Domain for which the reference point should be inferred. Required.
- experiments (pd.DataFrame): Experiments for which the reference point should be inferred. Required.
- return_masked (bool): If True, the masked reference point is returned. If False, the unmasked reference point is returned. Defaults to False.
- reference_point (Optional[ExplicitReferencePoint]): Reference point to be used. If None is provided, the reference value is inferred from the worst values seen so far for every objective. Defaults to None.

Returns:

- Dict[str, float]: The inferred reference point, keyed by output feature.
Source code in bofire/utils/multiobjective.py
def infer_ref_point(
    domain: Domain,
    experiments: pd.DataFrame,
    return_masked: bool = False,
    reference_point: Optional[ExplicitReferencePoint] = None,
) -> Dict[str, float]:
    """Method to infer the reference point for a given domain and experiments.

    Args:
        domain: Domain for which the reference point should be inferred.
        experiments: Experiments for which the reference point should be inferred.
        return_masked: If True, the masked reference point is returned. If False,
            the unmasked reference point is returned. Defaults to False.
        reference_point: Reference point to be used. If None is provided, the
            reference value is inferred by the worst values seen so far for
            every objective. Defaults to None.
    """
    outputs = domain.outputs.get_by_objective(
        includes=[MaximizeObjective, MinimizeObjective, CloseToTargetObjective],
    )
    keys = outputs.get_keys()

    if reference_point is None:
        reference_point = ExplicitReferencePoint(
            values={
                key: AbsoluteMovingReferenceValue(orient_at_best=False, offset=0.0)
                for key in keys
            }
        )

    objective = get_multiobjective_objective(outputs=outputs, experiments=experiments)

    df = domain.outputs.preprocess_experiments_all_valid_outputs(
        experiments,
        output_feature_keys=keys,
    )

    worst_values_array = (
        objective(torch.from_numpy(df[keys].values).to(**tkwargs), None)
        .numpy()
        .min(axis=0)
    )

    best_values_array = (
        objective(torch.from_numpy(df[keys].values).to(**tkwargs), None)
        .numpy()
        .max(axis=0)
    )
    # The ref_point_array holds masked values, i.e. maximization is assumed
    # for every objective. This is because we use a botorch objective to get
    # the best and worst values that are passed to the reference values, and
    # botorch always assumes maximization.
    # In case of FixedReferenceValue, the unmasked values are stored in the
    # data model, so we need to mask them to account for the botorch
    # convention: we multiply with -1 for `MinimizeObjective` and
    # `CloseToTargetObjective`.
    ref_point_array = np.array(
        [
            -reference_point.values[key].get_reference_value(
                best=best_values_array[i], worst=worst_values_array[i]
            )
            if isinstance(reference_point.values[key], FixedReferenceValue)
            and isinstance(
                outputs[i].objective, (MinimizeObjective, CloseToTargetObjective)
            )
            else reference_point.values[key].get_reference_value(
                best=best_values_array[i], worst=worst_values_array[i]
            )
            for i, key in enumerate(keys)
        ]
    )

    mask = get_ref_point_mask(domain)

    # here we unmask again by dividing by the mask
    if return_masked is False:
        ref_point_array /= mask
    return {feat: ref_point_array[i] for i, feat in enumerate(keys)}

naming_conventions

get_column_names(outputs)

Specifies column names for a given Outputs object.

Parameters:

- outputs (Outputs): The Outputs object containing the individual outputs. Required.

Returns:

- Tuple[List[str], List[str]]: A tuple containing the prediction column names and the standard deviation column names.

Source code in bofire/utils/naming_conventions.py
def get_column_names(outputs: Outputs) -> Tuple[List[str], List[str]]:
    """Specifies column names for given Outputs type.

    Args:
        outputs (Outputs): The Outputs object containing the individual outputs.

    Returns:
        Tuple[List[str], List[str]]: A tuple containing the prediction column names and the standard deviation column names

    """
    pred_cols, sd_cols = [], []
    for featkey in outputs.get_keys(CategoricalOutput):
        pred_cols = pred_cols + [
            f"{featkey}_{cat}_prob"
            for cat in outputs.get_by_key(featkey).categories  # type: ignore
        ]
        sd_cols = sd_cols + [
            f"{featkey}_{cat}_sd"
            for cat in outputs.get_by_key(featkey).categories  # type: ignore
        ]
    for featkey in outputs.get_keys(ContinuousOutput):
        pred_cols = pred_cols + [f"{featkey}_pred"]
        sd_cols = sd_cols + [f"{featkey}_sd"]

    return pred_cols, sd_cols
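
For example, a single continuous output yields one prediction and one standard deviation column:

from bofire.data_models.domain.api import Outputs
from bofire.data_models.features.api import ContinuousOutput
from bofire.utils.naming_conventions import get_column_names

outputs = Outputs(features=[ContinuousOutput(key="yield")])
pred_cols, sd_cols = get_column_names(outputs)
print(pred_cols, sd_cols)  # ['yield_pred'] ['yield_sd']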

postprocess_categorical_predictions(predictions, outputs)

Postprocess categorical predictions by finding the location of the maximum probability.

Parameters:

- predictions (pd.DataFrame): The dataframe containing the predictions. Required.
- outputs (Outputs): The Outputs object containing the individual outputs. Required.

Returns:

- predictions (pd.DataFrame): The (potentially modified) original dataframe with categorical predictions added.

Source code in bofire/utils/naming_conventions.py
def postprocess_categorical_predictions(
    predictions: pd.DataFrame,
    outputs: Outputs,
) -> pd.DataFrame:
    """Postprocess categorical predictions by finding the maximum probability location

    Args:
        predictions (pd.DataFrame): The dataframe containing the predictions.
        outputs (Outputs): The Outputs object containing the individual outputs.

    Returns:
        predictions (pd.DataFrame): The (potentially modified) original dataframe with categorical predictions added

    """
    for feat in outputs.get():
        if isinstance(feat, CategoricalOutput):
            predictions.insert(
                loc=0,
                column=f"{feat.key}_pred",
                value=predictions.filter(regex=f"{feat.key}(.*)_prob")
                .idxmax(1)
                .str.replace(f"{feat.key}_", "")
                .str.replace("_prob", "")
                .values,  # type: ignore
            )
            predictions.insert(
                loc=1,
                column=f"{feat.key}_sd",
                value=0.0,
            )
    return predictions

reduce

AffineTransform

Class to switch back and forth from the reduced to the original domain.

Source code in bofire/utils/reduce.py
class AffineTransform:
    """Class to switch back and forth from the reduced to the original domain."""

    def __init__(self, equalities: List[Tuple[str, List[str], List[float]]]):
        """Initializes a `AffineTransformation` object.

        Args:
            equalities (List[Tuple[str,List[str],List[float]]]): List of equalities. Every equality
                is defined as a tuple, in which the first entry is the key of the reduced feature, the second
                one is a list of feature keys that can be used to compute the feature and the third list of floats
                are the corresponding coefficients.

        """
        self.equalities = equalities

    def augment_data(self, data: pd.DataFrame) -> pd.DataFrame:
        """Restore the eliminated features in a dataframe

        Args:
            data (pd.DataFrame): Dataframe that should be restored.

        Returns:
            pd.DataFrame: Restored dataframe

        """
        if len(self.equalities) == 0:
            return data
        data = data.copy()
        for name_lhs, names_rhs, coeffs in self.equalities:
            data[name_lhs] = coeffs[-1]
            for i, name in enumerate(names_rhs):
                data[name_lhs] += coeffs[i] * data[name]
        return data

    def drop_data(self, data: pd.DataFrame) -> pd.DataFrame:
        """Drop eliminated features from a dataframe.

        Args:
            data (pd.DataFrame): Dataframe with features to be dropped.

        Returns:
            pd.DataFrame: Reduced dataframe.

        """
        if len(self.equalities) == 0:
            return data
        drop = []
        for name_lhs, _, _ in self.equalities:
            if name_lhs in data.columns:
                drop.append(name_lhs)
        return data.drop(columns=drop)
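
A small sketch, assuming x3 was eliminated via the mixture constraint x1 + x2 + x3 = 1 (i.e. x3 = 1 - x1 - x2):

import pandas as pd

from bofire.utils.reduce import AffineTransform

trafo = AffineTransform([("x3", ["x1", "x2"], [-1.0, -1.0, 1.0])])

reduced = pd.DataFrame({"x1": [0.1, 0.5], "x2": [0.2, 0.3]})
full = trafo.augment_data(reduced)  # adds x3 = [0.7, 0.2]
print(trafo.drop_data(full))        # drops x3 again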

__init__(equalities)

Initializes an AffineTransform object.

Parameters:

- equalities (List[Tuple[str, List[str], List[float]]]): List of equalities. Every equality is defined as a tuple, in which the first entry is the key of the reduced feature, the second one is a list of feature keys that can be used to compute the feature, and the third one is the list of corresponding coefficients. Required.
Source code in bofire/utils/reduce.py
def __init__(self, equalities: List[Tuple[str, List[str], List[float]]]):
    """Initializes a `AffineTransformation` object.

    Args:
        equalities (List[Tuple[str,List[str],List[float]]]): List of equalities. Every equality
            is defined as a tuple, in which the first entry is the key of the reduced feature, the second
            one is a list of feature keys that can be used to compute the feature and the third list of floats
            are the corresponding coefficients.

    """
    self.equalities = equalities

augment_data(data)

Restore the eliminated features in a dataframe

Parameters:

- data (pd.DataFrame): Dataframe that should be restored. Required.

Returns:

- pd.DataFrame: Restored dataframe.

Source code in bofire/utils/reduce.py
def augment_data(self, data: pd.DataFrame) -> pd.DataFrame:
    """Restore the eliminated features in a dataframe

    Args:
        data (pd.DataFrame): Dataframe that should be restored.

    Returns:
        pd.DataFrame: Restored dataframe

    """
    if len(self.equalities) == 0:
        return data
    data = data.copy()
    for name_lhs, names_rhs, coeffs in self.equalities:
        data[name_lhs] = coeffs[-1]
        for i, name in enumerate(names_rhs):
            data[name_lhs] += coeffs[i] * data[name]
    return data

drop_data(data)

Drop eliminated features from a dataframe.

Parameters:

- data (pd.DataFrame): Dataframe with features to be dropped. Required.

Returns:

- pd.DataFrame: Reduced dataframe.

Source code in bofire/utils/reduce.py
def drop_data(self, data: pd.DataFrame) -> pd.DataFrame:
    """Drop eliminated features from a dataframe.

    Args:
        data (pd.DataFrame): Dataframe with features to be dropped.

    Returns:
        pd.DataFrame: Reduced dataframe.

    """
    if len(self.equalities) == 0:
        return data
    drop = []
    for name_lhs, _, _ in self.equalities:
        if name_lhs in data.columns:
            drop.append(name_lhs)
    return data.drop(columns=drop)

adjust_boundary(feature, coef, rhs)

Adjusts the boundaries of a feature.

Parameters:

- feature (ContinuousInput): Feature to be adjusted. Required.
- coef (float): Coefficient. Required.
- rhs (float): Right-hand side of the constraint. Required.
Source code in bofire/utils/reduce.py
def adjust_boundary(feature: ContinuousInput, coef: float, rhs: float):
    """Adjusts the boundaries of a feature.

    Args:
        feature (ContinuousInput): Feature to be adjusted.
        coef (float): Coefficient.
        rhs (float): Right-hand-side of the constraint.

    """
    boundary = rhs / coef
    if coef > 0:
        if boundary > feature.lower_bound:
            feature.bounds = [boundary, feature.upper_bound]
    elif boundary < feature.upper_bound:
        feature.bounds = [feature.lower_bound, boundary]
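
For example, enforcing the single-feature constraint 2 * x1 >= 0.5 (i.e. x1 >= 0.25) tightens the lower bound:

from bofire.data_models.features.api import ContinuousInput
from bofire.utils.reduce import adjust_boundary

feat = ContinuousInput(key="x1", bounds=(0, 1))
adjust_boundary(feat, coef=2.0, rhs=0.5)
print(feat.lower_bound, feat.upper_bound)  # 0.25 1.0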

check_domain_for_reduction(domain)

Check if the reduction can be applied or if a trivial case is present.

Parameters:

- domain (Domain): Domain to be checked. Required.

Returns:

- bool: True if reducible, else False.

Source code in bofire/utils/reduce.py
def check_domain_for_reduction(domain: Domain) -> bool:
    """Check if the reduction can be applied or if a trivial case is present.

    Args:
        domain (Domain): Domain to be checked.

    Returns:
        bool: True if reducible, else False.

    """
    # are there any constraints?
    if len(domain.constraints) == 0:
        return False

    # are there any linear equality constraints?
    linear_equalities = domain.constraints.get(LinearEqualityConstraint)
    if len(linear_equalities) == 0:
        return False

    # are there no NChooseKConstraint constraints?
    if len(domain.constraints.get([NChooseKConstraint])) > 0:
        return False

    # are there continuous inputs
    continuous_inputs = domain.inputs.get(ContinuousInput)
    if len(continuous_inputs) == 0:
        return False

    # check that equality constraints only contain continuous inputs
    for c in linear_equalities:
        assert isinstance(c, LinearConstraint)
        for feat in c.features:
            if feat not in domain.inputs.get_keys(ContinuousInput):
                return False
    return True

check_existence_of_solution(A_aug)

Given an augmented coefficient matrix, this function determines the existence (and uniqueness) of a solution using the rank theorem.

Source code in bofire/utils/reduce.py
def check_existence_of_solution(A_aug):
    """Given an augmented coefficient matrix this function determines the existence (and uniqueness) of solution using the rank theorem."""
    A = A_aug[:, :-1]
    b = A_aug[:, -1]
    len_inputs = np.shape(A)[1]

    # catch special cases
    rk_A_aug = np.linalg.matrix_rank(A_aug)
    rk_A = np.linalg.matrix_rank(A)

    if rk_A == rk_A_aug:
        if rk_A < len_inputs:
            return  # all good
        x = np.linalg.lstsq(A, b, rcond=None)[0]  # least squares also handles non-square A
        raise Exception(
            f"There is a unique solution x for the linear equality constraints: x={x}",
        )
    if rk_A < rk_A_aug:
        raise Exception(
            "There is no solution fulfilling the linear equality constraints.",
        )
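
For example:

import numpy as np

from bofire.utils.reduce import check_existence_of_solution

# x1 + x2 = 1: consistent and underdetermined, returns without error
check_existence_of_solution(np.array([[1.0, 1.0, 1.0]]))

# x1 + x2 = 1 and x1 + x2 = 2: inconsistent, raises an Exception
try:
    check_existence_of_solution(np.array([[1.0, 1.0, 1.0], [1.0, 1.0, 2.0]]))
except Exception as err:
    print(err)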

reduce_domain(domain)

Reduce a domain with linear equality constraints to a subdomain where linear equality constraints are eliminated.

Parameters:

- domain (Domain): Domain to be reduced. Required.

Returns:

- Tuple[Domain, AffineTransform]: The reduced domain and the corresponding transformation to switch between the reduced and the original domain.

Source code in bofire/utils/reduce.py
def reduce_domain(domain: Domain) -> Tuple[Domain, AffineTransform]:
    """Reduce a domain with linear equality constraints to a subdomain where linear equality constraints are eliminated.

    Args:
        domain (Domain): Domain to be reduced.

    Returns:
        Tuple[Domain, AffineTransform]: reduced domain and the according transformation to switch between the
            reduced and original domain.

    """
    # check if the domain can be reduced
    if not check_domain_for_reduction(domain):
        return domain, AffineTransform([])

    # find linear equality constraints
    linear_equalities = domain.constraints.get(LinearEqualityConstraint)
    other_constraints = domain.constraints.get(
        Constraint,
        excludes=[LinearEqualityConstraint],
    )

    # only consider continuous inputs
    continuous_inputs = [
        cast(ContinuousInput, f) for f in domain.inputs.get(ContinuousInput)
    ]
    other_inputs = domain.inputs.get(Input, excludes=[ContinuousInput])

    # assemble Matrix A from equality constraints
    N = len(linear_equalities)
    M = len(continuous_inputs) + 1
    names = np.concatenate(([feat.key for feat in continuous_inputs], ["rhs"]))

    A_aug = pd.DataFrame(data=np.zeros(shape=(N, M)), columns=names)

    for i in range(len(linear_equalities)):
        c = linear_equalities[i]
        assert isinstance(c, LinearEqualityConstraint)
        A_aug.loc[i, c.features] = c.coefficients
        A_aug.loc[i, "rhs"] = c.rhs
    A_aug = A_aug.values

    # catch special cases
    check_existence_of_solution(A_aug)

    # bring A_aug to reduced row-echelon form
    A_aug_rref, pivots = rref(A_aug)
    pivots = np.array(pivots)
    A_aug_rref = np.array(A_aug_rref).astype(np.float64)

    # formulate box bounds as linear inequality constraints in matrix form
    B = np.zeros(shape=(2 * (M - 1), M))
    B[: M - 1, : M - 1] = np.eye(M - 1)
    B[M - 1 :, : M - 1] = -np.eye(M - 1)

    B[: M - 1, -1] = np.array([feat.upper_bound for feat in continuous_inputs])
    B[M - 1 :, -1] = -1.0 * np.array([feat.lower_bound for feat in continuous_inputs])

    # eliminate columns with pivot element
    for i in range(len(pivots)):
        p = pivots[i]
        B[p, :] -= A_aug_rref[i, :]
        B[p + M - 1, :] += A_aug_rref[i, :]

    # build up reduced domain
    _domain = Domain.model_construct(
        # _fields_set = {"inputs", "outputs", "constraints"}
        inputs=deepcopy(other_inputs),
        outputs=deepcopy(domain.outputs),
        constraints=deepcopy(other_constraints),
    )
    new_inputs = [
        deepcopy(feat) for i, feat in enumerate(continuous_inputs) if i not in pivots
    ]
    all_inputs = _domain.inputs + new_inputs
    assert isinstance(all_inputs, Inputs)
    _domain.inputs.features = all_inputs.features

    constraints: List[AnyConstraint] = []
    for i in pivots:
        # reduce equation system of upper bounds
        ind = np.where(B[i, :-1] != 0)[0]
        if len(ind) > 0 and B[i, -1] < np.inf:
            if len(list(names[ind])) > 1:
                c = LinearInequalityConstraint.from_greater_equal(
                    features=list(names[ind]),
                    coefficients=(-1.0 * B[i, ind]).tolist(),
                    rhs=B[i, -1] * -1.0,
                )
                constraints.append(c)
            else:
                key = names[ind][0]
                feat = cast(ContinuousInput, _domain.inputs.get_by_key(key))
                adjust_boundary(feat, (-1.0 * B[i, ind])[0], B[i, -1] * -1.0)
        elif B[i, -1] < -1e-16:
            raise Exception("There is no solution that fulfills the constraints.")

        # reduce equation system of lower bounds
        ind = np.where(B[i + M - 1, :-1] != 0)[0]
        if len(ind) > 0 and B[i + M - 1, -1] < np.inf:
            if len(list(names[ind])) > 1:
                c = LinearInequalityConstraint.from_greater_equal(
                    features=list(names[ind]),
                    coefficients=(-1.0 * B[i + M - 1, ind]).tolist(),
                    rhs=B[i + M - 1, -1] * -1.0,
                )
                constraints.append(c)
            else:
                key = names[ind][0]
                feat = cast(ContinuousInput, _domain.inputs.get_by_key(key))
                adjust_boundary(
                    feat,
                    (-1.0 * B[i + M - 1, ind])[0],
                    B[i + M - 1, -1] * -1.0,
                )
        elif B[i + M - 1, -1] < -1e-16:
            raise Exception("There is no solution that fulfills the constraints.")

    if len(constraints) > 0:
        _domain.constraints.constraints = _domain.constraints.constraints + constraints  # type: ignore

    # assemble equalities
    _equalities = []
    for i in range(len(pivots)):
        name_lhs = names[pivots[i]]
        names_rhs = []
        coeffs = []

        for j in range(len(names) - 1):
            if A_aug_rref[i, j] != 0 and j != pivots[i]:
                coeffs.append(-A_aug_rref[i, j])
                names_rhs.append(names[j])

        coeffs.append(A_aug_rref[i, -1])

        _equalities.append((name_lhs, names_rhs, coeffs))

    trafo = AffineTransform(_equalities)
    # remove remaining dependencies of eliminated inputs from the problem
    _domain = remove_eliminated_inputs(_domain, trafo)
    return _domain, trafo
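
A sketch for a mixture-type domain, where the equality x1 + x2 + x3 = 1 allows one input to be eliminated:

from bofire.data_models.constraints.api import LinearEqualityConstraint
from bofire.data_models.domain.api import Constraints, Domain, Inputs
from bofire.data_models.features.api import ContinuousInput
from bofire.utils.reduce import reduce_domain

domain = Domain(
    inputs=Inputs(
        features=[ContinuousInput(key=f"x{i + 1}", bounds=(0, 1)) for i in range(3)]
    ),
    constraints=Constraints(
        constraints=[
            LinearEqualityConstraint(
                features=["x1", "x2", "x3"], coefficients=[1.0, 1.0, 1.0], rhs=1.0
            )
        ]
    ),
)
reduced_domain, trafo = reduce_domain(domain)
print(reduced_domain.inputs.get_keys())  # one input eliminated, e.g. ['x2', 'x3']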

remove_eliminated_inputs(domain, transform)

Eliminates remaining occurrences of eliminated inputs in linear constraints.

Parameters:

- domain (Domain): Domain in which the linear constraints should be purged. Required.
- transform (AffineTransform): Affine transformation object that defines the obsolete features. Required.

Raises:

- ValueError: If a feature occurs in a constraint different from a linear one.

Returns:

- Domain: Purged domain.

Source code in bofire/utils/reduce.py
def remove_eliminated_inputs(domain: Domain, transform: AffineTransform) -> Domain:
    """Eliminates remaining occurrences of eliminated inputs in linear constraints.

    Args:
        domain (Domain): Domain in which the linear constraints should be purged.
        transform (AffineTransform): Affine transformation object that defines the obsolete features.

    Raises:
        ValueError: If feature occurs in a constraint different from a linear one.

    Returns:
        Domain: Purged domain.

    """
    inputs_names = domain.inputs.get_keys()
    M = len(inputs_names)

    # write the equalities for the backtransformation into one matrix
    inputs_dict = {inputs_names[i]: i for i in range(M)}

    # build up dict from domain.equalities e.g. {"xi1": [coeff(xj1), ..., coeff(xjn)], ... "xik":...}
    coeffs_dict = {}
    for e in transform.equalities:
        coeffs = np.zeros(M + 1)
        for j, name in enumerate(e[1]):
            coeffs[inputs_dict[name]] = e[2][j]
        coeffs[-1] = e[2][-1]
        coeffs_dict[e[0]] = coeffs

    constraints = []
    for c in domain.constraints.get():
        # Nonlinear constraints not supported
        if not isinstance(c, LinearConstraint):
            raise ValueError(
                "Elimination of variables is only supported for LinearEquality and LinearInequality constraints.",
            )

        # no changes, if the constraint does not contain eliminated inputs
        if all(name in inputs_names for name in c.features):
            constraints.append(c)

        # remove inputs from the constraint that were eliminated from the inputs before
        else:
            totally_removed = False
            _features = np.array(inputs_names)
            _rhs = c.rhs

            # create new lhs and rhs from the old one and knowledge from problem._equalities
            _coefficients = np.zeros(M)
            for j, name in enumerate(c.features):
                if name in inputs_names:
                    _coefficients[inputs_dict[name]] += c.coefficients[j]
                else:
                    _coefficients += c.coefficients[j] * coeffs_dict[name][:-1]
                    _rhs -= c.coefficients[j] * coeffs_dict[name][-1]

            _features = _features[np.abs(_coefficients) > 1e-16]
            _coefficients = _coefficients[np.abs(_coefficients) > 1e-16]
            _c = None
            if isinstance(c, LinearEqualityConstraint):
                if len(_features) > 1:
                    _c = LinearEqualityConstraint(
                        features=_features.tolist(),
                        coefficients=_coefficients.tolist(),
                        rhs=_rhs,
                    )
                elif len(_features) == 0:
                    totally_removed = True
                else:
                    feat: ContinuousInput = ContinuousInput(
                        **domain.inputs.get_by_key(_features[0]).model_dump(),
                    )
                    feat.bounds = [_coefficients[0], _coefficients[0]]
                    totally_removed = True
            elif len(_features) > 1:
                _c = LinearInequalityConstraint(
                    features=_features.tolist(),
                    coefficients=_coefficients.tolist(),
                    rhs=_rhs,
                )
            elif len(_features) == 0:
                totally_removed = True
            else:
                feat = cast(ContinuousInput, domain.inputs.get_by_key(_features[0]))
                adjust_boundary(feat, _coefficients[0], _rhs)
                totally_removed = True

            # check if constraint is always fulfilled/not fulfilled
            if not totally_removed:
                assert _c is not None
                if len(_c.features) == 0 and _c.rhs >= 0:
                    pass
                elif len(_c.features) == 0 and _c.rhs < 0:
                    raise Exception("Linear constraints cannot be fulfilled.")
                elif np.isinf(_c.rhs):
                    pass
                else:
                    constraints.append(_c)
    domain.constraints = Constraints(constraints=constraints)
    return domain

rref(A, tol=1e-08)

Computes the reduced row echelon form of a matrix.

Parameters:

    A (ndarray, required): 2d array representing a matrix.
    tol (float, optional): Tolerance for rounding to 0. Defaults to 1e-08.

Returns:

    Tuple[ndarray, List[int]]: (A_rref, pivots), where A_rref is the reduced row echelon form of A and pivots is a list containing the indices of the pivot columns of A_rref.

Source code in bofire/utils/reduce.py
def rref(A: np.ndarray, tol: float = 1e-8) -> Tuple[np.ndarray, List[int]]:
    """Computes the reduced row echelon form of a Matrix

    Args:
        A (ndarray): 2d array representing a matrix.
        tol (float, optional): tolerance for rounding to 0. Defaults to 1e-8.

    Returns:
        (A_rref, pivots), where A_rref is the reduced row echelon form of A and pivots
        is a list containing the indices of the pivot columns of A_rref

    """
    A = np.array(A, dtype=np.float64)
    n, m = np.shape(A)

    col = 0
    row = 0
    pivots = []

    for col in range(m):
        # does a pivot element exist?
        if all(np.abs(A[row:, col]) < tol):
            pass
        # if yes: start elimination
        else:
            pivots.append(col)
            max_row = np.argmax(np.abs(A[row:, col])) + row
            # switch to most stable row
            A[[row, max_row], :] = A[[max_row, row], :]
            # normalize row
            A[row, :] /= A[row, col]
            # eliminate other elements from column
            for r in range(n):
                if r != row:
                    A[r, :] -= A[r, col] / A[row, col] * A[row, :]
            row += 1

    prec = int(-np.log10(tol))
    return np.round(A, prec), pivots
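
A small, hand-checkable example (illustrative values): the second column is a multiple of the first and therefore yields no pivot.

import numpy as np
from bofire.utils.reduce import rref

A = np.array([[1.0, 2.0, 3.0], [2.0, 4.0, 7.0]])
A_rref, pivots = rref(A)
# A_rref == [[1., 2., 0.], [0., 0., 1.]]; pivots == [0, 2]
# column 1 is a multiple of column 0, so it is not a pivot column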

subdomain

get_subdomain(domain, feature_keys)

Removes all features not listed in feature_keys, creating a subdomain of the provided domain.

Parameters:

    domain (Domain, required): The original domain from which a subdomain should be created.
    feature_keys (List, required): List of features that shall be included in the subdomain.

Raises:

    AssertionError: When fewer than two features are provided in total.
    ValueError: When a provided feature key is not present in the provided domain.
    AssertionError: When no output feature is provided.
    AssertionError: When no input feature is provided.
    ValueError: When a removed input feature is still used in a constraint.

Returns:

    Domain: A new domain containing only parts of the original domain.

Source code in bofire/utils/subdomain.py
def get_subdomain(
    domain: Domain,
    feature_keys: List,
) -> Domain:
    """Removes all features not defined as argument creating a subdomain of the provided domain

    Args:
        domain (Domain): the original domain wherefrom a subdomain should be created
        feature_keys (List): List of features that shall be included in the subdomain

    Raises:
        AssertionError: when in total less than two features are provided
        ValueError: when a provided feature key is not present in the provided domain
        AssertionError: when no output feature is provided
        AssertionError: when no input feature is provided
        ValueError: when a removed input feature is still used in a constraint

    Returns:
        Domain: A new domain containing only parts of the original domain

    """
    assert len(feature_keys) >= 2, "At least two features have to be provided."
    outputs = []
    inputs = []
    for key in feature_keys:
        try:
            feat = (domain.inputs + domain.outputs).get_by_key(key)
        except KeyError:
            raise ValueError(f"Feature {key} not present in domain.")
        if isinstance(feat, Input):
            inputs.append(feat)
        else:
            outputs.append(feat)
    assert len(outputs) > 0, "At least one output feature has to be provided."
    assert len(inputs) > 0, "At least one input feature has to be provided."
    inputs = Inputs(features=inputs)
    outputs = Outputs(features=outputs)
    # loop over constraints and make sure that all features used in constraints are in the input_feature_keys
    for c in domain.constraints:
        for key in c.features:
            if key not in inputs.get_keys():
                raise ValueError(
                    f"Removed input feature {key} is used in a constraint.",
                )
    subdomain = deepcopy(domain)
    subdomain.inputs = inputs
    subdomain.outputs = outputs
    return subdomain
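
A short usage sketch (the feature keys are made up for illustration):

from bofire.data_models.domain.api import Domain
from bofire.data_models.features.api import ContinuousInput, ContinuousOutput
from bofire.utils.subdomain import get_subdomain

domain = Domain.from_lists(
    inputs=[ContinuousInput(key=f"x{i}", bounds=(0, 1)) for i in range(3)],
    outputs=[ContinuousOutput(key="y"), ContinuousOutput(key="cost")],
)
# keep only x0, x1 and the output y; x2 and cost are dropped
subdomain = get_subdomain(domain, ["x0", "x1", "y"])
assert subdomain.inputs.get_keys() == ["x0", "x1"]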

torch_tools

InterpolateTransform

Bases: InputTransform, Module

Botorch input transform that interpolates values between given x and y values.

Source code in bofire/utils/torch_tools.py
class InterpolateTransform(InputTransform, Module):
    """Botorch input transform that interpolates values between given x and y values."""

    def __init__(
        self,
        new_x: Tensor,
        idx_x: List[int],
        idx_y: List[int],
        prepend_x: Tensor,
        prepend_y: Tensor,
        append_x: Tensor,
        append_y: Tensor,
        keep_original: bool = False,
        transform_on_train: bool = True,
        transform_on_eval: bool = True,
        transform_on_fantasize: bool = True,
    ):
        super().__init__()
        if len(set(idx_x + idx_y)) != len(idx_x) + len(idx_y):
            raise ValueError("Indices are not unique.")

        self.idx_x = torch.as_tensor(idx_x, dtype=torch.long)
        self.idx_y = torch.as_tensor(idx_y, dtype=torch.long)

        self.transform_on_train = transform_on_train
        self.transform_on_eval = transform_on_eval
        self.transform_on_fantasize = transform_on_fantasize
        self.new_x = new_x

        self.prepend_x = prepend_x
        self.prepend_y = prepend_y
        self.append_x = append_x
        self.append_y = append_y

        self.keep_original = keep_original

        if len(self.idx_x) + len(self.prepend_x) + len(self.append_x) != len(
            self.idx_y,
        ) + len(self.prepend_y) + len(self.append_y):
            raise ValueError("The number of x and y indices must be equal.")

    def _to(self, X: Tensor) -> None:
        # move/cast the interpolation grid to the device/dtype of X
        self.new_x = self.new_x.to(X)

    def append(self, X: Tensor, values: Tensor) -> Tensor:
        shape = X.shape
        values_reshaped = values.view(*([1] * (len(shape) - 1)), -1)
        values_expanded = values_reshaped.expand(*shape[:-1], -1).to(X)
        return torch.cat([X, values_expanded], dim=-1)

    def prepend(self, X: Tensor, values: Tensor) -> Tensor:
        shape = X.shape
        values_reshaped = values.view(*([1] * (len(shape) - 1)), -1)
        values_expanded = values_reshaped.expand(*shape[:-1], -1).to(X)
        return torch.cat([values_expanded, X], dim=-1)

    def transform(self, X: Tensor):
        shapeX = X.shape

        x = X[..., self.idx_x]
        x = self.prepend(x, self.prepend_x)
        x = self.append(x, self.append_x)

        y = X[..., self.idx_y]
        y = self.prepend(y, self.prepend_y)
        y = self.append(y, self.append_y)

        if X.dim() == 3:
            x = x.reshape((shapeX[0] * shapeX[1], x.shape[-1]))
            y = y.reshape((shapeX[0] * shapeX[1], y.shape[-1]))

        new_x = self.new_x.expand(x.shape[0], -1)
        new_y = torch.vmap(interp1d)(x, y, new_x)

        if X.dim() == 3:
            new_y = new_y.reshape((shapeX[0], shapeX[1], new_y.shape[-1]))

        if self.keep_original:
            return torch.cat([new_y, X], dim=-1)

        return new_y
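
A minimal instantiation sketch; the column layout (two x-columns, two y-columns per row) and the anchor points at (0, 0) and (1, 1) are assumptions for illustration:

import torch
from bofire.utils.torch_tools import InterpolateTransform

tf = InterpolateTransform(
    new_x=torch.linspace(0, 1, 5),  # grid on which to evaluate the curve
    idx_x=[0, 1],                   # columns holding the x support points
    idx_y=[2, 3],                   # columns holding the y support points
    prepend_x=torch.tensor([0.0]),  # anchor the curve at (0, 0) ...
    prepend_y=torch.tensor([0.0]),
    append_x=torch.tensor([1.0]),   # ... and at (1, 1)
    append_y=torch.tensor([1.0]),
)
X = torch.tensor([[0.3, 0.6, 0.2, 0.8]])
new_y = tf.transform(X)  # shape (1, 5): the curve evaluated on new_x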

constrained_objective2botorch(idx, objective, x_adapt, eps=1e-08)

Create a callable that can be used by botorch.utils.objective.apply_constraints to set up output-constrained optimizations.

Parameters:

    idx (int, required): Index of the constraint objective in the list of outputs.
    objective (BotorchConstrainedObjective, required): The objective that should be transformed.
    x_adapt (Optional[Tensor], required): The tensor that should be used to adapt the objective, for example in case of a moving turning point in the MovingMaximizeSigmoidObjective.
    eps (float, optional): Small value to avoid numerical instabilities in case of the ConstrainedCategoricalObjective. Defaults to 1e-08.

Returns:

    Tuple[List[Callable[[Tensor], Tensor]], List[float], int]: List of callables that can be used by botorch for setting up the constrained objective, list of the corresponding botorch eta values, and the final index used by the method (needed to track categorical variables).

Source code in bofire/utils/torch_tools.py
def constrained_objective2botorch(
    idx: int,
    objective: ConstrainedObjective,
    x_adapt: Optional[Tensor],
    eps: float = 1e-8,
) -> Tuple[List[Callable[[Tensor], Tensor]], List[float], int]:
    """Create a callable that can be used by `botorch.utils.objective.apply_constraints`
    to set up output-constrained optimizations.

    Args:
        idx (int): Index of the constraint objective in the list of outputs.
        objective (BotorchConstrainedObjective): The objective that should be transformed.
        x_adapt (Optional[Tensor]): The tensor that should be used to adapt the objective,
            for example in case of a moving turning point in the `MovingMaximizeSigmoidObjective`.
        eps (float, optional): Small value to avoid numerical instabilities in case of the `ConstrainedCategoricalObjective`.
            Defaults to 1e-8.

    Returns:
        Tuple[List[Callable[[Tensor], Tensor]], List[float], int]: List of callables that can be used by botorch for setting up the constrained objective,
            list of the corresponding botorch eta values, final index used by the method (to track for categorical variables)

    """
    assert isinstance(
        objective,
        ConstrainedObjective,
    ), "Objective is not a `ConstrainedObjective`."
    if isinstance(objective, MaximizeSigmoidObjective):
        return (
            [lambda Z: (Z[..., idx] - objective.tp) * -1.0],
            [1.0 / objective.steepness],
            idx + 1,
        )
    if isinstance(objective, MovingMaximizeSigmoidObjective):
        assert x_adapt is not None
        tp = x_adapt.max().item() + objective.tp
        return (
            [lambda Z: (Z[..., idx] - tp) * -1.0],
            [1.0 / objective.steepness],
            idx + 1,
        )
    if isinstance(objective, MinimizeSigmoidObjective):
        return (
            [lambda Z: (Z[..., idx] - objective.tp)],
            [1.0 / objective.steepness],
            idx + 1,
        )
    if isinstance(objective, TargetObjective):
        return (
            [
                lambda Z: (Z[..., idx] - (objective.target_value - objective.tolerance))
                * -1.0,
                lambda Z: (
                    Z[..., idx] - (objective.target_value + objective.tolerance)
                ),
            ],
            [1.0 / objective.steepness, 1.0 / objective.steepness],
            idx + 1,
        )
    if isinstance(objective, ConstrainedCategoricalObjective):
        # The output of a categorical objective has final dim `c` where `c` is number of classes
        # Pass in the expected acceptance probability and perform an inverse sigmoid to attain the original probabilities
        # as Botorch does sigmoid transformation for the constraint by default, therefore we need to unsigmoid the probability
        # (0-1) to (-inf,inf) also we need to invert the probability, where -inf means the constraint is satisfied.
        # Finally, we add some slack (`eps`) 1e-8 to avoid log(0). Have also a look at this botorch community notebook
        # https://github.com/pytorch/botorch/blob/main/notebooks_community/clf_constrained_bo.ipynb and this botorch issue
        # https://github.com/pytorch/botorch/issues/725 which is inspired by this implementation.
        return (
            [
                lambda Z: torch.log(
                    1
                    / torch.clamp(
                        (
                            Z[..., idx : idx + len(objective.desirability)]
                            * torch.tensor(objective.desirability).to(**tkwargs)
                        ).sum(-1),
                        min=eps,
                        max=1 - eps,
                    )
                    - 1,
                ),
            ],
            [1.0],
            idx + len(objective.desirability),
        )
    raise ValueError(f"Objective {objective.__class__.__name__} not known.")

get_initial_conditions_generator(strategy, transform_specs, ask_options=None, sequential=True)

Takes a strategy object and returns a callable which uses this strategy to return a generator callable which can be used in botorch's gen_batch_initial_conditions to generate samples.

Parameters:

    strategy (Strategy, required): Strategy that should be used to generate samples.
    transform_specs (Dict, required): Dictionary indicating how the samples should be transformed.
    ask_options (Dict, optional): Dictionary of keyword arguments that are passed to the ask method of the strategy. Defaults to None, which is treated as {}.
    sequential (bool, optional): If True, samples for every q-batch are generated independently of each other. If False, the n x q samples are generated at once. Defaults to True.

Returns:

    Callable[[int, int, int], Tensor]: Callable that can be passed to batch_initial_conditions.

Source code in bofire/utils/torch_tools.py
def get_initial_conditions_generator(
    strategy: Strategy,
    transform_specs: Dict,
    ask_options: Optional[Dict] = None,
    sequential: bool = True,
) -> Callable[[int, int, int], Tensor]:
    """Takes a strategy object and returns a callable which uses this
    strategy to return a generator callable which can be used in botorch's
    `gen_batch_initial_conditions` to generate samples.

    Args:
        strategy (Strategy): Strategy that should be used to generate samples.
        transform_specs (Dict): Dictionary indicating how the samples should be
            transformed.
        ask_options (Dict, optional): Dictionary of keyword arguments that are
            passed to the `ask` method of the strategy. Defaults to {}.
        sequential (bool, optional): If True, samples for every q-batch are
            generated independently of each other. If False, the `n x q` samples
            are generated at once.

    Returns:
        Callable[[int, int, int], Tensor]: Callable that can be passed to
            `batch_initial_conditions`.

    """
    if ask_options is None:
        ask_options = {}

    def generator(n: int, q: int, seed: int) -> Tensor:
        if sequential:
            initial_conditions = []
            for _ in range(n):
                candidates = strategy.ask(q, **ask_options)
                # transform it
                transformed_candidates = strategy.domain.inputs.transform(
                    candidates,
                    transform_specs,
                )
                # transform to tensor
                initial_conditions.append(
                    torch.from_numpy(transformed_candidates.values).to(**tkwargs),
                )
            return torch.stack(initial_conditions, dim=0)
        candidates = strategy.ask(n * q, **ask_options)
        # transform it
        transformed_candidates = strategy.domain.inputs.transform(
            candidates,
            transform_specs,
        )
        return (
            torch.from_numpy(transformed_candidates.values)
            .to(**tkwargs)
            .reshape(n, q, transformed_candidates.shape[1])
        )

    return generator
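
A hedged sketch of how the generator can be wired up, using a RandomStrategy on a purely continuous toy domain (domain and arguments are illustrative):

import bofire.strategies.api as strategies
from bofire.data_models.domain.api import Domain
from bofire.data_models.features.api import ContinuousInput, ContinuousOutput
from bofire.data_models.strategies.api import RandomStrategy
from bofire.utils.torch_tools import get_initial_conditions_generator

domain = Domain.from_lists(
    inputs=[ContinuousInput(key=f"x{i}", bounds=(0, 1)) for i in range(2)],
    outputs=[ContinuousOutput(key="y")],
)
strategy = strategies.map(RandomStrategy(domain=domain))
generator = get_initial_conditions_generator(strategy, transform_specs={})
batch = generator(4, 2, 42)  # tensor of shape (4, 2, 2): n=4 q-batches of size 2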

get_interpoint_constraints(domain, n_candidates)

Converts interpoint equality constraints to linear equality constraints that can be processed by botorch. For more information, see the docstring of optimize_acqf in botorch (https://github.com/pytorch/botorch/blob/main/botorch/optim/optimize.py).

Parameters:

    domain (Domain, required): Optimization problem definition.
    n_candidates (int, required): Number of candidates that should be requested.

Returns:

    List[Tuple[Tensor, Tensor, float]]: List of tuples, each tuple consists of a tensor with the feature indices, coefficients and a float for the rhs.

Source code in bofire/utils/torch_tools.py
def get_interpoint_constraints(
    domain: Domain,
    n_candidates: int,
) -> List[Tuple[Tensor, Tensor, float]]:
    """Converts interpoint equality constraints to linear equality constraints,
        that can be processed by botorch. For more information, see the docstring
        of `optimize_acqf` in botorch
        (https://github.com/pytorch/botorch/blob/main/botorch/optim/optimize.py).

    Args:
        domain (Domain): Optimization problem definition.
        n_candidates (int): Number of candidates that should be requested.

    Returns:
        List[Tuple[Tensor, Tensor, float]]: List of tuples, each tuple consists
            of a tensor with the feature indices, coefficients and a float for the rhs.

    """
    constraints = []
    if n_candidates == 1:
        return constraints
    for constraint in domain.constraints.get(InterpointEqualityConstraint):
        assert isinstance(constraint, InterpointEqualityConstraint)
        coefficients = torch.tensor([1.0, -1.0]).to(**tkwargs)
        feat_idx = domain.inputs.get_keys(Input).index(constraint.feature)
        feat = domain.inputs.get_by_key(constraint.feature)
        assert isinstance(feat, ContinuousInput)
        if feat.is_fixed():
            continue
        multiplicity = constraint.multiplicity or n_candidates
        for i in range(math.ceil(n_candidates / multiplicity)):
            all_indices = torch.arange(
                i * multiplicity,
                min((i + 1) * multiplicity, n_candidates),
            )
            for k in range(len(all_indices) - 1):
                indices = torch.tensor(
                    [[all_indices[0], feat_idx], [all_indices[k + 1], feat_idx]],
                    dtype=torch.int64,
                )
                constraints.append((indices, coefficients, 0.0))
    return constraints
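
For example, an interpoint equality constraint on x0 with multiplicity 2 and three requested candidates yields a single tuple tying candidate 0 to candidate 1 (candidate 2 opens a new block). A sketch with illustrative values:

from bofire.data_models.constraints.api import InterpointEqualityConstraint
from bofire.data_models.domain.api import Domain
from bofire.data_models.features.api import ContinuousInput, ContinuousOutput
from bofire.utils.torch_tools import get_interpoint_constraints

domain = Domain.from_lists(
    inputs=[ContinuousInput(key=f"x{i}", bounds=(0, 1)) for i in range(2)],
    outputs=[ContinuousOutput(key="y")],
    constraints=[InterpointEqualityConstraint(feature="x0", multiplicity=2)],
)
constraints = get_interpoint_constraints(domain, n_candidates=3)
# one tuple: indices [[0, 0], [1, 0]], coefficients [1., -1.], rhs 0.0,
# i.e. x0 must be equal for candidates 0 and 1; candidate 2 starts a new block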

get_linear_constraints(domain, constraint, unit_scaled=False)

Converts linear constraints to the form required by BoTorch. For inequality constraints, this is A * x >= b (!).

Parameters:

    domain (Domain, required): Optimization problem definition.
    constraint (Union[Type[LinearEqualityConstraint], Type[LinearInequalityConstraint]], required): Type of constraint that should be converted.
    unit_scaled (bool, optional): If True, transforms constraints by assuming that the bounds for the continuous features are [0,1]. Defaults to False.

Returns:

    List[Tuple[Tensor, Tensor, float]]: List of tuples, each tuple consists of a tensor with the feature indices, coefficients and a float for the rhs.

Source code in bofire/utils/torch_tools.py
def get_linear_constraints(
    domain: Domain,
    constraint: Union[Type[LinearEqualityConstraint], Type[LinearInequalityConstraint]],
    unit_scaled: bool = False,
) -> List[Tuple[Tensor, Tensor, float]]:
    """Converts linear constraints to the form required by BoTorch. For inequality constraints, this is A * x >= b (!).

    Args:
        domain: Optimization problem definition.
        constraint: Type of constraint that should be converted.
        unit_scaled: If True, transforms constraints by assuming that the bounds for the continuous features are [0,1]. Defaults to False.

    Returns:
        List[Tuple[Tensor, Tensor, float]]: List of tuples, each tuple consists of a tensor with the feature indices, coefficients and a float for the rhs.

    """
    constraints = []
    for c in domain.constraints.get(constraint):
        indices = []
        coefficients = []
        lower = []
        upper = []
        rhs = 0.0
        for i, featkey in enumerate(c.features):
            idx = domain.inputs.get_keys(Input).index(featkey)
            feat = domain.inputs.get_by_key(featkey)
            if feat.is_fixed():
                rhs -= feat.fixed_value()[0] * c.coefficients[i]  # type: ignore
            else:
                lower.append(feat.lower_bound)  # type: ignore
                upper.append(feat.upper_bound)  # type: ignore
                indices.append(idx)
                coefficients.append(
                    c.coefficients[i],
                )  # if unit_scaled == False else c_scaled.coefficients[i])
        if unit_scaled:
            lower = np.array(lower)
            upper = np.array(upper)
            s = upper - lower
            scaled_coefficients = s * np.array(coefficients)
            constraints.append(
                (
                    torch.tensor(indices),
                    -torch.tensor(scaled_coefficients).to(**tkwargs),
                    -(rhs + c.rhs - np.sum(np.array(coefficients) * lower)),
                ),
            )
        else:
            constraints.append(
                (
                    torch.tensor(indices),
                    -torch.tensor(coefficients).to(**tkwargs),
                    -(rhs + c.rhs),
                ),
            )
    return constraints
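
Note the sign flip: a BoFire constraint of the form x0 + x1 <= 0.8 is handed to botorch as -x0 - x1 >= -0.8. A sketch:

from bofire.data_models.constraints.api import LinearInequalityConstraint
from bofire.data_models.domain.api import Domain
from bofire.data_models.features.api import ContinuousInput, ContinuousOutput
from bofire.utils.torch_tools import get_linear_constraints

domain = Domain.from_lists(
    inputs=[ContinuousInput(key=f"x{i}", bounds=(0, 1)) for i in range(2)],
    outputs=[ContinuousOutput(key="y")],
    constraints=[
        LinearInequalityConstraint(
            features=["x0", "x1"], coefficients=[1.0, 1.0], rhs=0.8
        )  # x0 + x1 <= 0.8
    ],
)
indices, coefficients, rhs = get_linear_constraints(
    domain, LinearInequalityConstraint
)[0]
# coefficients == [-1., -1.], rhs == -0.8, i.e. -x0 - x1 >= -0.8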

get_multiobjective_objective(outputs, experiments)

Returns a callable that can be used by botorch for multiobjective optimization.

Parameters:

    outputs (Outputs, required): Outputs object for which the callable should be generated.
    experiments (DataFrame, required): DataFrame containing the experiments that are used for adapting the objectives on the fly, for example in the case of the MovingMaximizeSigmoidObjective.

Returns:

    Callable[[Tensor, Optional[Tensor]], Tensor]: Callable that maps posterior samples to a tensor of stacked objective values.

Source code in bofire/utils/torch_tools.py
def get_multiobjective_objective(
    outputs: Outputs,
    experiments: pd.DataFrame,
) -> Callable[[Tensor, Optional[Tensor]], Tensor]:
    """Returns a callable that can be used by botorch for multiobjective optimization.

    Args:
        outputs (Outputs): Outputs object for which the callable should be generated.
        experiments (pd.DataFrame): DataFrame containing the experiments that are used for
            adapting the objectives on the fly, for example in the case of the
            `MovingMaximizeSigmoidObjective`.

    Returns:
        Callable[[Tensor, Optional[Tensor]], Tensor]: Callable that maps posterior samples to a tensor of stacked objective values.

    """
    allowed_objectives = [MaximizeObjective, MinimizeObjective, CloseToTargetObjective]

    callables_outputs, _, _ = _callables_and_weights(
        outputs,
        experiments,
        allowed_objectives=allowed_objectives,
        adapt_weights_to_1_inf=False,
    )

    def objective(samples: Tensor, X: Optional[Tensor] = None) -> Tensor:
        return torch.stack([c(samples, None) for c in callables_outputs], dim=-1)

    return objective
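
A sketch for two plain objectives; the experiments frame, including its valid_* columns, is made up for illustration:

import pandas as pd
import torch
from bofire.data_models.domain.api import Outputs
from bofire.data_models.features.api import ContinuousOutput
from bofire.data_models.objectives.api import MaximizeObjective, MinimizeObjective
from bofire.utils.torch_tools import get_multiobjective_objective

outputs = Outputs(
    features=[
        ContinuousOutput(key="y1", objective=MaximizeObjective(w=1.0)),
        ContinuousOutput(key="y2", objective=MinimizeObjective(w=1.0)),
    ]
)
experiments = pd.DataFrame(
    {"y1": [1.0, 2.0], "y2": [3.0, 4.0], "valid_y1": [1, 1], "valid_y2": [1, 1]}
)
objective = get_multiobjective_objective(outputs, experiments)
samples = torch.rand(8, 2)   # posterior samples, one column per output
values = objective(samples)  # shape (8, 2)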

get_multiplicative_additive_objective(outputs, experiments, exclude_constraints=True, additive_features=None, adapt_weights_to_1_inf=True)

Computes the objective as a mix of multiplicative and additive objectives. By default, all objectives are multiplicative. Additive features (inputs or outputs) can be specified in the additive_features list.

The formula for a mixed objective with two multiplicative features (f1 and f2 with weights w1 and w2) and two additive features (f3 and f4 with weights w3 and w4) is:

additive_objective = 1 + f3*w3 + f4*w4

objective = f1^w1 * f2^w2 * additive_objective

Parameters:

    outputs (Outputs, required): Outputs object for which the callable should be generated.
    experiments (DataFrame, required): DataFrame containing the experiments that are used for adapting the objectives on the fly.
    exclude_constraints (bool, optional): If True, outputs with constrained objectives are excluded from the objective. Defaults to True.
    additive_features (List[str], optional): List of features that should be treated as additive. Defaults to None.
    adapt_weights_to_1_inf (bool, optional): Will transform weights from [0,1] to [1,inf) space. Defaults to True.

Returns:

    objective (callable): Callable that can be used by botorch for optimization.

Source code in bofire/utils/torch_tools.py
def get_multiplicative_additive_objective(
    outputs: Outputs,
    experiments: pd.DataFrame,
    exclude_constraints: bool = True,
    additive_features: Optional[List[str]] = None,
    adapt_weights_to_1_inf: bool = True,
) -> Callable[[Tensor, Tensor], Tensor]:
    """Computes the objective as a mix of multiplicative and additive objectives. By default, all objectives are multiplicative.
    Additive features (inputs or outputs) can be specified in the `additive_features` list.

    The formula for a mixed objective with two multiplicative features (f1 and f2 with weights w1 and w2) and two
    additive features (f3 and f4 with weights w3 and w4) is:

        additive_objective = 1 + f3*w3 + f4*w4

        objective = f1^w1 * f2^w2 * additive_objective


    Args:
        outputs (Outputs): Outputs object for which the callable should be generated.
        experiments (pd.DataFrame): DataFrame containing the experiments that are used for
            adapting the objectives on the fly.
        exclude_constraints (bool): if True, outputs with constrained objectives are excluded.
            Defaults to True.
        additive_features (List[str]): list of features that should be treated as additive
        adapt_weights_to_1_inf (bool): will transform weights from [0,1] to [1,inf) space

    Returns:
        objective (callable): callable that can be used by botorch for optimization

    """

    callables, weights, keys = _callables_and_weights(
        outputs,
        experiments,
        exclude_constraints=exclude_constraints,
        adapt_weights_to_1_inf=adapt_weights_to_1_inf,
    )

    if additive_features is None:
        additive_features = []

    def _differ_additive_and_multiplicative_features(callables, weights, feature_names):
        callables_additive, weights_additive = [], []
        callables_multiplicative, weights_multiplicative = [], []
        for c, w, key in zip(callables, weights, feature_names):
            if key in additive_features:
                callables_additive.append(c)
                weights_additive.append(w)
            else:
                callables_multiplicative.append(c)
                weights_multiplicative.append(w)
        return (
            callables_additive,
            weights_additive,
            callables_multiplicative,
            weights_multiplicative,
        )

    (
        callables_additive,
        weights_additive,
        callables_multiplicative,
        weights_multiplicative,
    ) = _differ_additive_and_multiplicative_features(callables, weights, keys)

    def objective(samples: Tensor, X: Optional[Tensor] = None) -> Tensor:
        additive_objective = torch.tensor(1.0).to(**tkwargs)
        for c, w in zip(callables_additive, weights_additive):
            additive_objective = additive_objective + c(samples, None) * w

        multiplicative_objective = torch.tensor(1.0).to(**tkwargs)
        for c, w in zip(callables_multiplicative, weights_multiplicative):
            multiplicative_objective = multiplicative_objective * c(samples, None) ** w

        y: Tensor = multiplicative_objective * additive_objective
        return y

    return objective
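
A sketch with one multiplicative and one additive output, matching the formula above (the experiments frame is illustrative):

import pandas as pd
import torch
from bofire.data_models.domain.api import Outputs
from bofire.data_models.features.api import ContinuousOutput
from bofire.data_models.objectives.api import MaximizeObjective
from bofire.utils.torch_tools import get_multiplicative_additive_objective

outputs = Outputs(
    features=[
        ContinuousOutput(key="y1", objective=MaximizeObjective(w=1.0)),
        ContinuousOutput(key="y2", objective=MaximizeObjective(w=0.5)),
    ]
)
experiments = pd.DataFrame(
    {"y1": [1.0, 2.0], "y2": [3.0, 4.0], "valid_y1": [1, 1], "valid_y2": [1, 1]}
)
objective = get_multiplicative_additive_objective(
    outputs, experiments, additive_features=["y2"]
)
samples = torch.rand(8, 2)
values = objective(samples)  # f1^w1 * (1 + f2*w2), with weights mapped to [1, inf)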

get_nchoosek_constraints(domain)

Transforms NChooseK constraints into a list of non-linear inequality constraint callables that can be processed by botorch. For this purpose the NChooseK constraint is continuously relaxed by counting the number of zeros in a candidate with a sum of narrow Gaussians centered at zero.

Parameters:

    domain (Domain, required): Optimization problem definition.

Returns:

    List[Tuple[Callable[[Tensor], float], bool]]: List of tuples, each consisting of a constraint callable and a boolean flag, that can be used as nonlinear inequality constraints in botorch.

Source code in bofire/utils/torch_tools.py
def get_nchoosek_constraints(
    domain: Domain,
) -> List[Tuple[Callable[[Tensor], float], bool]]:
    """Transforms NChooseK constraints into a list of non-linear inequality constraint callables
    that can be parsed by pydantic. For this purpose the NChooseK constraint is continuously
    relaxed by countig the number of zeros in a candidate by a sum of narrow gaussians centered
    at zero.

    Args:
        domain (Domain): Optimization problem definition.

    Returns:
        List[Tuple[Callable[[Tensor], float], bool]]: List of tuples of callables and booleans
            that can be used as nonlinear inequality constraints in botorch.

    """

    def narrow_gaussian(x, ell=1e-3):
        return torch.exp(-0.5 * (x / ell) ** 2)

    def max_constraint(indices: Tensor, num_features: int, max_count: int):
        return lambda x: narrow_gaussian(x=x[..., indices]).sum(dim=-1) - (
            num_features - max_count
        )

    def min_constraint(indices: Tensor, num_features: int, min_count: int):
        return lambda x: -narrow_gaussian(x=x[..., indices]).sum(dim=-1) + (
            num_features - min_count
        )

    constraints = []
    # ignore none also valid for the start
    for c in domain.constraints.get(NChooseKConstraint):
        assert isinstance(c, NChooseKConstraint)
        indices = torch.tensor(
            [domain.inputs.get_keys(ContinuousInput).index(key) for key in c.features],
            dtype=torch.int64,
        )
        if c.max_count != len(c.features):
            constraints.append(
                (
                    max_constraint(
                        indices=indices,
                        num_features=len(c.features),
                        max_count=c.max_count,
                    ),
                    True,
                )
            )
        if c.min_count > 0:
            constraints.append(
                (
                    min_constraint(
                        indices=indices,
                        num_features=len(c.features),
                        min_count=c.min_count,
                    ),
                    True,
                )
            )
    return constraints
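
To see the relaxation in action: with max_count=2 on three features, the callable is non-negative (feasible) as soon as at least one feature is numerically zero. A sketch:

import torch
from bofire.data_models.constraints.api import NChooseKConstraint
from bofire.data_models.domain.api import Domain
from bofire.data_models.features.api import ContinuousInput, ContinuousOutput
from bofire.utils.torch_tools import get_nchoosek_constraints

domain = Domain.from_lists(
    inputs=[ContinuousInput(key=f"x{i}", bounds=(0, 1)) for i in range(3)],
    outputs=[ContinuousOutput(key="y")],
    constraints=[
        NChooseKConstraint(
            features=["x0", "x1", "x2"], min_count=0, max_count=2, none_also_valid=True
        )
    ],
)
(callable_, _), = get_nchoosek_constraints(domain)
print(callable_(torch.tensor([0.5, 0.0, 0.0])))  # ~1.0: two zeros -> feasible
print(callable_(torch.tensor([0.5, 0.7, 0.9])))  # ~-1.0: all three active -> infeasible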

get_nonlinear_constraints(domain, includes=None)

Returns a list of callable functions that represent the nonlinear constraints for the given domain that can be processed by botorch.

Parameters:

    domain (Domain, required): The domain for which to generate the nonlinear constraints.
    includes (Optional[List[Type[Constraint]]], optional): Constraint types to include. Defaults to None, which is treated as [NChooseKConstraint, ProductInequalityConstraint].

Returns:

    List[Tuple[Callable[[Tensor], float], bool]]: A list of callable functions that take a tensor as input and return a float value representing the constraint evaluation, each paired with a boolean flag.

Source code in bofire/utils/torch_tools.py
def get_nonlinear_constraints(
    domain: Domain,
    includes: Optional[List[Type[Constraint]]] = None,
) -> List[Tuple[Callable[[Tensor], float], bool]]:
    """Returns a list of callable functions that represent the nonlinear constraints
    for the given domain that can be processed by botorch.

    Args:
        domain (Domain): The domain for which to generate the nonlinear constraints.
        includes (Optional[List[Type[Constraint]]], optional): Constraint types to include.
            Defaults to None, which is treated as [NChooseKConstraint, ProductInequalityConstraint].

    Returns:
        List[Tuple[Callable[[Tensor], float], bool]]: A list of callable functions that take a tensor
        as input and return a float value representing the constraint evaluation, each paired with a boolean flag.

    """
    includes = includes or [NChooseKConstraint, ProductInequalityConstraint]
    assert all(
        (c in (NChooseKConstraint, ProductInequalityConstraint) for c in includes)
    ), "Only NChooseK and ProductInequality constraints are supported."

    callables = []
    if NChooseKConstraint in includes:
        callables += get_nchoosek_constraints(domain)
    if ProductInequalityConstraint in includes:
        callables += get_product_constraints(domain)

    return callables

get_output_constraints(outputs, experiments)

Method to translate output constraint objectives into a list of callables and list of etas for use in botorch.

Parameters:

    outputs (Outputs, required): Output feature object that should be processed.
    experiments (DataFrame, required): DataFrame containing the experiments that are used for adapting the objectives on the fly, for example in the case of the MovingMaximizeSigmoidObjective.

Returns:

    Tuple[List[Callable[[Tensor], Tensor]], List[float]]: List of constraint callables, list of associated etas.

Source code in bofire/utils/torch_tools.py
def get_output_constraints(
    outputs: Outputs,
    experiments: pd.DataFrame,
) -> Tuple[List[Callable[[Tensor], Tensor]], List[float]]:
    """Method to translate output constraint objectives into a list of
    callables and list of etas for use in botorch.

    Args:
        outputs (Outputs): Output feature object that should
            be processed.
        experiments (pd.DataFrame): DataFrame containing the experiments that are used for
            adapting the objectives on the fly, for example in the case of the
            `MovingMaximizeSigmoidObjective`.

    Returns:
        Tuple[List[Callable[[Tensor], Tensor]], List[float]]: List of constraint callables,
            list of associated etas.

    """
    constraints = []
    etas = []
    idx = 0
    for feat in outputs.get():
        if isinstance(feat.objective, ConstrainedObjective):
            cleaned_experiments = outputs.preprocess_experiments_one_valid_output(
                feat.key,
                experiments,
            )
            iconstraints, ietas, idx = constrained_objective2botorch(
                idx,
                objective=feat.objective,
                x_adapt=torch.from_numpy(cleaned_experiments[feat.key].values).to(
                    **tkwargs,
                )
                if not isinstance(feat.objective, ConstrainedCategoricalObjective)
                else None,
            )
            constraints += iconstraints
            etas += ietas
        else:
            idx += 1
    return constraints, etas
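
A sketch combining this with a sigmoid output constraint (the experiments frame, including its valid_y column, is illustrative):

import pandas as pd
from bofire.data_models.domain.api import Outputs
from bofire.data_models.features.api import ContinuousOutput
from bofire.data_models.objectives.api import MaximizeSigmoidObjective
from bofire.utils.torch_tools import get_output_constraints

outputs = Outputs(
    features=[
        ContinuousOutput(
            key="y", objective=MaximizeSigmoidObjective(w=1.0, tp=5.0, steepness=2.0)
        )
    ]
)
experiments = pd.DataFrame({"y": [4.0, 6.0], "valid_y": [1, 1]})
constraints, etas = get_output_constraints(outputs, experiments)
# one callable (y should exceed tp=5) with eta = 1/steepness = 0.5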

get_product_constraints(domain)

Returns a list of nonlinear constraint functions that can be processed by botorch based on the given domain.

Parameters:

    domain (Domain, required): The domain object containing the constraints.

Returns:

    List[Tuple[Callable[[Tensor], float], bool]]: A list of product constraint callables, each paired with a boolean flag.

Source code in bofire/utils/torch_tools.py
def get_product_constraints(
    domain: Domain,
) -> List[Tuple[Callable[[Tensor], float], bool]]:
    """Returns a list of nonlinear constraint functions that can be processed by botorch
    based on the given domain.

    Args:
        domain (Domain): The domain object containing the constraints.

    Returns:
        List[Tuple[Callable[[Tensor], float], bool]]: A list of product constraint functions,
            each paired with a boolean flag.

    """

    def product_constraint(indices: Tensor, exponents: Tensor, rhs: float, sign: int):
        return lambda x: -1.0 * sign * (x[..., indices] ** exponents).prod(dim=-1) + rhs

    constraints = []
    for c in domain.constraints.get(ProductInequalityConstraint):
        assert isinstance(c, ProductInequalityConstraint)
        indices = torch.tensor(
            [domain.inputs.get_keys(ContinuousInput).index(key) for key in c.features],
            dtype=torch.int64,
        )
        constraints.append(
            (
                product_constraint(indices, torch.tensor(c.exponents), c.rhs, c.sign),
                True,
            )
        )
    return constraints
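
A sketch, assuming the BoFire convention sign * prod(x_i^e_i) <= rhs, so that sign=-1 encodes x0 * x1 >= 0.04:

import torch
from bofire.data_models.constraints.api import ProductInequalityConstraint
from bofire.data_models.domain.api import Domain
from bofire.data_models.features.api import ContinuousInput, ContinuousOutput
from bofire.utils.torch_tools import get_product_constraints

domain = Domain.from_lists(
    inputs=[ContinuousInput(key=f"x{i}", bounds=(0, 1)) for i in range(2)],
    outputs=[ContinuousOutput(key="y")],
    constraints=[
        # sign=-1 encodes x0 * x1 >= 0.04 as -x0*x1 <= -0.04
        ProductInequalityConstraint(
            features=["x0", "x1"], exponents=[1, 1], rhs=-0.04, sign=-1
        )
    ],
)
(callable_, _), = get_product_constraints(domain)
print(callable_(torch.tensor([0.5, 0.5])))  # 0.25 - 0.04 = 0.21 >= 0: feasible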

interp1d(x, y, x_new)

Interpolates values in the y tensor based on the x tensor using linear interpolation.

Parameters:

    x (Tensor, required): The x-coordinates of the data points.
    y (Tensor, required): The y-coordinates of the data points.
    x_new (Tensor, required): The x-coordinates at which to interpolate the values.

Returns:

    Tensor: The interpolated values at the x_new x-coordinates.

Source code in bofire/utils/torch_tools.py
@torch.jit.script  # type: ignore
def interp1d(
    x: torch.Tensor,
    y: torch.Tensor,
    x_new: torch.Tensor,
) -> torch.Tensor:
    """Interpolates values in the y tensor based on the x tensor using linear interpolation.

    Args:
        x (torch.Tensor): The x-coordinates of the data points.
        y (torch.Tensor): The y-coordinates of the data points.
        x_new (torch.Tensor): The x-coordinates at which to interpolate the values.

    Returns:
        torch.Tensor: The interpolated values at the x_new x-coordinates.

    """
    m = (y[1:] - y[:-1]) / (x[1:] - x[:-1])
    b = y[:-1] - (m * x[:-1])

    idx = torch.sum(torch.ge(x_new[:, None], x[None, :]), 1) - 1
    idx = torch.clamp(idx, 0, len(m) - 1)

    itp = m[idx] * x_new + b[idx]

    return itp
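
A hand-checkable call on the tent function through the points (0, 0), (1, 1), (2, 0):

import torch
from bofire.utils.torch_tools import interp1d

x = torch.tensor([0.0, 1.0, 2.0])
y = torch.tensor([0.0, 1.0, 0.0])
x_new = torch.tensor([0.5, 1.5])
print(interp1d(x, y, x_new))  # tensor([0.5000, 0.5000])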