Module PDESolver.Solver
import os
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'  # must be set before TensorFlow is imported to take effect
import tensorflow as tf
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.transforms import blended_transform_factory
from scipy.stats import gaussian_kde
from tqdm import tqdm
import warnings
import logging

__all__ = ['Solver', 'Optimizer']

warnings.filterwarnings("ignore", category=UserWarning)
logging.getLogger('tensorflow').setLevel(logging.ERROR)
class Solver:
    def __init__(self, bvp, optimizer, num_hidden_layers=4, num_neurons_per_layer=50):
        """
        Constructor for a boundary value problem solver.

        Parameters
        -----------
        bvp: BoundaryValueProblem
            Boundary value problem to be solved
        optimizer: Optimizer
            Optimizer to be used for training the neural network
        num_hidden_layers: int
            Number of hidden layers in the neural network.
            Defaults to 4
        num_neurons_per_layer: int
            Number of neurons in each hidden layer.
            Defaults to 50

        Examples
        -----------
        >>> from PDESolver import *
        >>> optimizer = Optimizer(initial_learning_rate=1e-3, decay_steps=1000, decay_rate=0.9)
        >>> bvp = Laplace()
        >>> solver = Solver(bvp, optimizer, num_hidden_layers=4, num_neurons_per_layer=50)
        """
        num_inputs = len(bvp.get_specification()['variables'])
        num_outputs = len(bvp.get_specification()['components'])
        inner_constraint = [condition for condition in bvp.get_conditions() if condition.name == 'inner'][0]
        mean, variance = inner_constraint.get_normalization_constants()
        self.model = init_model(num_inputs, num_outputs, num_hidden_layers, num_neurons_per_layer, mean, variance)
        self.bvp = bvp
        self.optimizer = optimizer
        self.loss_history = []
        self.weight_history = [{cond.name: 1. for cond in bvp.get_conditions()}]
        self.weights = tf.Variable(np.ones(len(bvp.get_conditions())), dtype=tf.float32)
        self.step = tf.Variable(0)

    def compute_differentials(self, samplePoints):
        """
        Calculates the differentials of the model at the given points.

        Parameters
        -----------
        samplePoints: tensor
            Tensor of points to calculate the differentials at

        Returns
        -----------
        dict: Dictionary containing the differentials of the model needed for the bvp at the given points

        Examples
        -----------
        >>> from PDESolver import *
        >>> solver = Solver(Laplace(), Optimizer())
        >>> samples = tf.constant([[0, 0], [0, 0.5], [0, 1]])
        >>> solver.compute_differentials(samples)
        {'x': <tf.Tensor: shape=(3, 1), dtype=float32, numpy=
        array([[0.],
               [0.],
               [0.]], dtype=float32)>, 'y': <tf.Tensor: shape=(3, 1), dtype=float32, numpy=
        array([[0. ],
               [0.5],
               [1. ]], dtype=float32)>, 'u': <tf.Tensor: shape=(3, 1), dtype=float32, numpy=
        array([[-1.4298753 ],
               [-1.0829226 ],
               [-0.86016774]], dtype=float32)>, 'u_x': <tf.Tensor: shape=(3, 1), dtype=float32, numpy=
        array([[0.7101533 ],
               [0.7047943 ],
               [0.45764494]], dtype=float32)>, 'u_xx': <tf.Tensor: shape=(3, 1), dtype=float32, numpy=
        array([[-1.6525286],
               [-1.6283079],
               [-1.2708694]], dtype=float32)>, 'u_y': <tf.Tensor: shape=(3, 1), dtype=float32, numpy=
        array([[0.71874154],
               [0.6189531 ],
               [0.2856549 ]], dtype=float32)>, 'u_yy': <tf.Tensor: shape=(3, 1), dtype=float32, numpy=
        array([[-0.01344297],
               [-0.5869958 ],
               [-0.47277603]], dtype=float32)>}
        """
        specs = self.bvp.get_specification()
        stack = lambda *tensors: tf.concat(tensors, axis=1)
        component_funs = {component: lambda x, index=i: self.model(x)[:, index]
                          for i, component in enumerate(specs["components"])}
        gradient_dict = {}
        with tf.GradientTape(persistent=True) as tape:
            for i, variable in enumerate(specs["variables"]):
                gradient_dict[variable] = samplePoints[:, i:i + 1]
                tape.watch(gradient_dict[variable])
            watched = [gradient_dict[variable] for variable in specs["variables"]]
            for component in specs["components"]:
                output = component_funs[component](tf.concat(watched, axis=1))
                gradient_dict[component] = tf.reshape(output, (-1, 1))
            for differential in specs["differentials"]:
                component = differential.split("_")[0]
                variables = differential.split("_")[1]
                differential_head = component
                for i, variable in enumerate(variables):
                    if i == 0:
                        differential_head_new = differential_head + "_" + variable
                    else:
                        differential_head_new = differential_head + variable
                    if differential_head_new not in gradient_dict:
                        gradient_dict[differential_head_new] = tape.gradient(gradient_dict[differential_head], gradient_dict[variable])
                    differential_head = differential_head_new
        for (stacked_component_name, components) in specs["stacked_components"].items():
            stacked = stack(*[gradient_dict[component] for component in components])
            gradient_dict[stacked_component_name] = stacked
        return gradient_dict

    def compute_residuals(self):
        """
        Gets the residuals for the boundary value problem.

        Returns
        -----------
        dict: Dictionary of residuals of form {condition_name: residual}

        Examples
        -----------
        >>> from PDESolver import *
        >>> solver = Solver(Laplace(minibatch_size=3), Optimizer())
        >>> solver.compute_residuals()
        {'zero_boundary': <tf.Tensor: shape=(3, 1), dtype=float32, numpy=
        array([[1.7787822],
               [2.6922252],
               [1.3558891]], dtype=float32)>, 'f_boundary': <tf.Tensor: shape=(3, 1), dtype=float32, numpy=
        array([[2.4186134],
               [2.4582624],
               [2.4142814]], dtype=float32)>, 'inner': <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
        array([[ 0.53346014],
               [-4.9377484 ],
               [ 0.21452093],
               [-1.6314204 ]], dtype=float32)>}
        """
        conditions = self.bvp.get_conditions()
        residuals = {}
        for condition in conditions:
            samples = condition.sample_points()
            Du = self.compute_differentials(samples)
            residuals[condition.name] = condition.residue_fn(Du)
        return residuals

    def compute_losses(self):
        """
        Computes the losses for the neural network.

        Returns
        -----------
        dict: Dictionary of losses of form {condition_name: loss}

        Examples
        -----------
        >>> from PDESolver import *
        >>> solver = Solver(Laplace(), Optimizer())
        >>> solver.compute_losses()
        {'zero_boundary': <tf.Tensor: shape=(), dtype=float32, numpy=0.29857153>, 'f_boundary': <tf.Tensor: shape=(), dtype=float32, numpy=0.07588613>, 'inner': <tf.Tensor: shape=(), dtype=float32, numpy=6.3834033>, 'L2': <tf.Tensor: shape=(), dtype=float32, numpy=6.757861>, 'total': <tf.Tensor: shape=(), dtype=float32, numpy=13.515722>}
        """
        criterion = tf.keras.losses.MeanSquaredError()
        residuals = self.compute_residuals()
        losses = {name: 0.0 for name in residuals.keys()}
        l2loss = 0.0
        for i, (name, residual) in enumerate(residuals.items()):
            losses[name] += self.weights[i] * criterion(residual, 0.0)
            l2loss += criterion(residual, 0.0)
        losses['total'] = sum(losses.values())
        losses['L2'] = l2loss
        return losses

    def compute_gradients(self):
        """
        Gets the gradients of the neural network.

        Returns
        -----------
        tensor: L2 loss
        dict: Dictionary of gradients of form {condition_name: gradient}

        Examples
        -----------
        >>> from PDESolver import *
        >>> solver = Solver(Laplace(), Optimizer())
        >>> solver.compute_gradients()
        (<tf.Tensor: shape=(), dtype=float32, numpy=4.756295>, LARGE DICTIONARY)
        """
        with tf.GradientTape(persistent=True) as tape:
            tape.watch(self.model.trainable_variables)
            losses = self.compute_losses()
        kwargs = {'sources': self.model.trainable_variables, 'unconnected_gradients': tf.UnconnectedGradients.ZERO}
        grads = {name: tape.gradient(loss, **kwargs) for name, loss in losses.items() if name != 'L2'}
        return losses['L2'], grads

    def adjust_weights(self, gradients):
        """
        Adjusts the weights of the PDE and data losses.

        Parameters
        -----------
        gradients: dict
            Dictionary of gradients as returned by compute_gradients()

        Returns
        -----------
        dict: Dictionary of adjusted condition weights of form {condition_name: weight}

        Examples
        -----------
        >>> from PDESolver import *
        >>> solver = Solver(Laplace(), Optimizer())
        >>> l2, gradients = solver.compute_gradients()
        >>> solver.adjust_weights(gradients)
        {'zero_boundary': <tf.Tensor: shape=(), dtype=float32, numpy=38.457893>, 'f_boundary': <tf.Tensor: shape=(), dtype=float32, numpy=22.964037>, 'inner': <tf.Tensor: shape=(), dtype=float32, numpy=1.0>}
        """
        gradient_vectors = [tf.concat([tf.reshape(gradient, [-1]) for gradient in gradients_list], axis=0)
                            for gradients_list in gradients.values()][:-1]  # excluding the 'total' gradients
        variances = [tf.math.reduce_variance(gradient) for gradient in gradient_vectors]
        most_varying = tf.math.argmax(variances)
        most_varying_absmax = tf.math.reduce_max(tf.abs(tf.gather(gradient_vectors, most_varying)))
        new_weights = {}
        minimal = tf.float32.max
        for i, cond in enumerate(self.bvp.get_conditions()):
            name = cond.name
            if i != most_varying:
                new_weight = most_varying_absmax / tf.reduce_mean(tf.abs(gradient_vectors[i]))
                new_weight = 0.9 * self.weights[i] + 0.1 * new_weight
            else:
                new_weight = self.weights[i]
            new_weights[name] = new_weight
            minimal = tf.math.minimum(minimal, new_weights[name])
        new_weights = {name: new_weights[name] / minimal for name in new_weights.keys()}
        self.weights.assign([new_weight for new_weight in new_weights.values()])
        return new_weights

    @tf.function
    def train_step(self):
        """
        Performs a single training step.

        Returns
        -----------
        tensor: L2 loss
        dict: Dictionary of gradients of form {condition_name: gradient}
        dict: Dictionary of adjusted condition weights of form {condition_name: weight}

        Examples
        -----------
        >>> from PDESolver import *
        >>> solver = Solver(Laplace(), Optimizer())
        >>> l2loss, gradients, new_weights = solver.train_step()
        >>> l2loss
        <tf.Tensor: shape=(), dtype=float32, numpy=4.371607>
        >>> l2loss, gradients, new_weights = solver.train_step()
        >>> l2loss
        <tf.Tensor: shape=(), dtype=float32, numpy=3.0963678>
        >>> l2loss, gradients, new_weights = solver.train_step()
        >>> l2loss
        <tf.Tensor: shape=(), dtype=float32, numpy=3.029224>
        """
        l2loss, gradients = self.compute_gradients()
        if self.step % 10 == 0:
            new_weights = self.adjust_weights(gradients)
        else:
            new_weights = {cond.name: -1. for cond in self.bvp.get_conditions()}
        self.optimizer.apply_gradients(zip(gradients['total'], self.model.trainable_variables))
        self.step.assign(self.step + 1)
        return l2loss, gradients, new_weights

    def train(self, iterations=10000, debug_frequency=2500):
        """
        Trains the neural network to solve the boundary value problem.

        Parameters
        -----------
        iterations: int (default=10000)
            Number of iterations to train for
        debug_frequency: int (default=2500)
            Frequency (every X iterations) at which to show the debug panel.
            If zero or negative, no debug panel is shown

        Examples
        -----------
        >>> from PDESolver import *
        >>> solver = Solver(Laplace(), Optimizer())
        >>> solver.train(iterations=10000, debug_frequency=2500)
        øL²-loss = 6.044e+01 (best: 2.305e+01, 000064it ago) lr = 0.00090: 3%|█ | 1271/40000 [00:33<07:53, 81.76it/s]
        """
        best_loss = np.inf
        iterations_since_last_improvement = 0
        k_max = int(np.ceil(np.log10(iterations))) + 1
        pbar = tqdm(range(iterations), desc='Pending...')
        for i in pbar:
            l2loss, gradients, new_weights = self.train_step()
            self.loss_history += [l2loss.numpy()]
            if l2loss.numpy() < best_loss:
                best_loss = l2loss.numpy()
                iterations_since_last_improvement = 0
            else:
                iterations_since_last_improvement += 1
            if list(new_weights.values())[0] != -1:
                self.weight_history += [{name: new_weights[name].numpy() for name in new_weights.keys()}]
            avg_loss = np.mean(self.loss_history[-100:])
            pbar.desc = f'øL²-loss = {avg_loss:.3e} (best: {best_loss:.3e}, {iterations_since_last_improvement:0{k_max}d}it ago) lr = {self.optimizer.lr.numpy():.5f}'
            if debug_frequency > 0 and (i % debug_frequency == 0 or i == iterations - 1):
                self.show_debugplot(gradients)

    def show_debugplot(self, gradients):
        """
        Shows a debug plot of the neural network.

        Parameters
        -----------
        gradients: dict
            Dictionary of per-condition gradients of the PDE and data losses obtained from compute_gradients()

        Examples
        -----------
        >>> from PDESolver import *
        >>> solver = Solver(Laplace(), Optimizer())
        >>> l2loss, gradients = solver.compute_gradients()
        >>> solver.loss_history += [l2loss.numpy()]
        >>> solver.show_debugplot(gradients)
        """
        fig = plt.figure(figsize=(16, 8), layout='compressed')
        subfigs = fig.subfigures(2, 1, hspace=0)
        axs = subfigs[0].subplots(1, len(self.model.layers) - 4)
        subfigs[0].suptitle('Gradient Distributions')
        ax_count = -1
        for j in range(len(self.model.layers)):
            if j in [0, 1, 3, 4]:
                continue
            else:
                ax_count += 1
            layer_gradient = lambda name: tf.concat([tf.reshape(gradient, [-1]) for gradient in gradients[name][2*(j-2):2*(j-1)]], axis=0)
            density = lambda i, x: gaussian_kde(layer_gradient(i).numpy())(x)
            xs = np.linspace(-2.5, 2.5, 1000)
            for cond in self.bvp.get_conditions():
                axs[ax_count].plot(xs, density(cond.name, xs), lw=0.5, label=cond.name)
            axs[ax_count].set_xlim(min(xs), max(xs))
            axs[ax_count].set_ylim(0, 100)
            axs[ax_count].set_yscale('symlog')
            axs[ax_count].set_title(self.model.layers[j].name)
        axs[-1].legend()
        n = len(self.loss_history)
        averaged_loss = [np.mean(self.loss_history[:k+1] if k < 99 else self.loss_history[k-99:k+1]) for k in range(n)]
        best_loss = np.min(self.loss_history)
        axs = subfigs[1].subplots(1, 2)
        trans = blended_transform_factory(axs[0].transAxes, axs[0].transData)
        loss_handle, = axs[0].semilogy(range(n), self.loss_history, 'k-', lw=0.5, alpha=0.5, label='Loss')
        avg_loss_handle, = axs[0].semilogy(range(n), averaged_loss, 'r--', lw=1, label=f'$ø_{{{min(100, n)}}}$ Loss')
        axs[0].axhline(best_loss, color='g', lw=0.5)
        axs[0].text(0.995, 0.97 * best_loss, f'Best: {best_loss:.3e}', ha='right', va='top', color='g', transform=trans)
        ax = axs[0].twinx()
        ax.set_yscale('log')
        lr_handle, = ax.plot(self.optimizer.lr_history_upto(n), 'b-', lw=0.5, label='Learning rate')
        axs[0].set_title('Loss (left) and learning rate (right) history')
        axs[0].set_xlabel('Iteration')
        axs[0].set_xlim(0, n - (1 if n > 1 else 0))
        if n > 1:
            handles = [loss_handle, avg_loss_handle, lr_handle]
            axs[0].legend(handles=handles, loc='lower left')
        for cond in self.bvp.get_conditions():
            cond_weight_history = [self.weight_history[k][cond.name] for k in range(len(self.weight_history))]
            axs[1].plot(cond_weight_history, lw=0.5)
        axs[1].set_title('Weight History')
        axs[1].set_xlabel('Update #')
        axs[1].set_xlim(0, len(self.weight_history) - 1)
        axs[1].set_yscale('log')
        if os.name == 'nt':
            figManager = plt.get_current_fig_manager()
            figManager.window.state("zoomed")
        plt.show()

class Optimizer(tf.keras.optimizers.Adam):
    """Adam optimizer with an exponential learning-rate decay schedule."""

    def __init__(self, initial_learning_rate=1e-3, decay_steps=1000, decay_rate=0.9):
        """
        Optimizer for the neural network used in a solver object.
        Acts as a wrapper for the Adam optimizer with an exponential learning rate decay.

        Parameters
        -----------
        initial_learning_rate: float
            Initial learning rate of the optimizer.
            Defaults to 1e-3.
        decay_steps: int
            Number of iterations after which the learning rate has decayed by the specified factor.
            Defaults to 1000.
        decay_rate: float
            Factor by which the learning rate is decayed.
            Defaults to 0.9.

        Examples
        -----------
        >>> from PDESolver import *
        >>> optimizer = Optimizer(initial_learning_rate=1, decay_steps=500, decay_rate=0.5)
        >>> solver = Solver(Laplace(), optimizer)
        """
        self.lr_scheduler = tf.keras.optimizers.schedules.ExponentialDecay(initial_learning_rate, decay_steps, decay_rate)
        super().__init__(learning_rate=self.lr_scheduler)

    def lr_history_upto(self, iteration):
        """
        Returns the learning rate history up to the specified iteration.

        Parameters
        -----------
        iteration: int
            Iteration up to which the learning rate history is returned.

        Returns
        -----------
        numpy.ndarray: Learning rate history up to the specified iteration.

        Examples
        -----------
        >>> optimizer = Optimizer()
        >>> optimizer.lr_history_upto(10)
        array([0.001     , 0.00099989, 0.00099979, 0.00099968, 0.00099958,
               0.00099947, 0.00099937, 0.00099926, 0.00099916, 0.00099905,
               0.00099895], dtype=float32)
        """
        iters = np.arange(0, iteration + 1)
        return self.lr_scheduler(iters)
###################################################################################
###################################################################################
###################################################################################

def xavier_init(size):
    in_dim = size[0]
    out_dim = size[1]
    xavier_stddev = 1. / np.sqrt((in_dim + out_dim) / 2.)
    return tf.Variable(tf.random.normal([in_dim, out_dim], dtype=tf.float32) * xavier_stddev,
                       dtype=tf.float32, trainable=True)

class Encoder(tf.keras.layers.Layer):
    def __init__(self, inputs, outputs, activation=tf.tanh):
        super(Encoder, self).__init__()
        self.inputs = inputs
        self.outputs = outputs
        self.activation = activation

    def build(self, input_shape):
        self.W = xavier_init([self.inputs, self.outputs])
        self.b = xavier_init([1, self.outputs])

    def call(self, inputs):
        return self.activation(tf.add(tf.matmul(inputs, self.W), self.b))

class ImprovedLinear(tf.keras.layers.Layer):
    def __init__(self, inputs, outputs, activation=tf.tanh):
        super(ImprovedLinear, self).__init__()
        self.inputs = inputs
        self.outputs = outputs
        self.activation = activation

    def build(self, input_shape):
        self.W = xavier_init([self.inputs, self.outputs])
        self.b = xavier_init([1, self.outputs])

    def call(self, inputs, encoder_1, encoder_2):
        z = self.activation(tf.add(tf.matmul(inputs, self.W), self.b))
        return tf.math.multiply(z, encoder_1) + tf.math.multiply(1 - z, encoder_2)

class Linear(tf.keras.layers.Layer):
    def __init__(self, inputs, outputs, activation=tf.tanh):
        super(Linear, self).__init__()
        self.inputs = inputs
        self.outputs = outputs
        self.activation = activation

    def build(self, input_shape):
        self.W = xavier_init([self.inputs, self.outputs])
        self.b = xavier_init([1, self.outputs])

    def call(self, inputs):
        return self.activation(tf.add(tf.matmul(inputs, self.W), self.b))

def init_model(num_inputs, num_outputs, num_hidden_layers, num_neurons_per_layer, mean, variance):
    layer_sizes = [num_inputs] + [num_neurons_per_layer] * num_hidden_layers + [num_outputs]
    layers = [Linear(layer_sizes[0], layer_sizes[1])] + \
             [ImprovedLinear(layer_sizes[i], layer_sizes[i + 1]) for i in range(1, len(layer_sizes) - 2)] + \
             [Linear(layer_sizes[-2], layer_sizes[-1], activation=tf.identity)]
    encoder_1 = Encoder(num_inputs, num_neurons_per_layer)
    encoder_2 = Encoder(num_inputs, num_neurons_per_layer)
    inputs = tf.keras.Input(num_inputs)
    outputs = tf.keras.layers.Normalization(axis=-1, mean=mean, variance=variance)(inputs)
    E1 = encoder_1(outputs)
    E2 = encoder_2(outputs)
    for i in range(len(layers) - 1):
        if i == 0:
            outputs = layers[i](outputs)
        else:
            outputs = layers[i](outputs, E1, E2)
    outputs = layers[-1](outputs)
    model = tf.keras.Model(inputs=inputs, outputs=outputs)
    return model

if __name__ == '__main__':
    from Sampling import Cuboid, Equidistant

    tf.random.set_seed(1)
    txs = Cuboid([0, 0], [1, 1]).pick(9, Equidistant())
    model = init_model(2, 1, 4, 50, mean=-1, variance=1)
    model.summary()
    print(model(txs))
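
For orientation, a typical end-to-end session looks as follows. This is a sketch only: it assumes the Laplace boundary value problem shipped with the package, as in the doctests above.

>>> from PDESolver import *
>>> solver = Solver(Laplace(), Optimizer(initial_learning_rate=1e-3))
>>> solver.train(iterations=5000, debug_frequency=-1)  # zero or negative frequency disables the debug panel
>>> min(solver.loss_history)  # best L2 loss observed during training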
Classes
class Optimizer (initial_learning_rate=0.001, decay_steps=1000, decay_rate=0.9)
Optimizer for the neural network used in a solver object. Acts as a wrapper for the Adam optimizer with an exponential learning rate decay.

Parameters
initial_learning_rate : float
    Initial learning rate of the optimizer. Defaults to 1e-3.
decay_steps : int
    Number of iterations after which the learning rate has decayed by the specified factor. Defaults to 1000.
decay_rate : float
    Factor by which the learning rate is decayed. Defaults to 0.9.

Examples
>>> from PDESolver import *
>>> optimizer = Optimizer(initial_learning_rate=1, decay_steps=500, decay_rate=0.5)
>>> solver = Solver(Laplace(), optimizer)
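
The schedule is plain exponential decay, lr(step) = initial_learning_rate * decay_rate ** (step / decay_steps), assuming ExponentialDecay's default staircase=False. A quick check of the closed form, independent of TensorFlow:

>>> initial_lr, decay_steps, decay_rate = 1e-3, 1000, 0.9
>>> round(initial_lr * decay_rate ** (1000 / decay_steps), 6)  # after one full decay period
0.0009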
Ancestors
- keras.optimizers.optimizer_experimental.adam.Adam
- keras.optimizers.optimizer_experimental.optimizer.Optimizer
- keras.optimizers.optimizer_experimental.optimizer._BaseOptimizer
- tensorflow.python.trackable.autotrackable.AutoTrackable
- tensorflow.python.trackable.base.Trackable
Methods
def lr_history_upto(self, iteration)
Returns the learning rate history up to the specified iteration.

Parameters
iteration : int
    Iteration up to which the learning rate history is returned.

Returns
numpy.ndarray: Learning rate history up to the specified iteration.

Examples
>>> optimizer = Optimizer()
>>> optimizer.lr_history_upto(10)
array([0.001     , 0.00099989, 0.00099979, 0.00099968, 0.00099958,
       0.00099947, 0.00099937, 0.00099926, 0.00099916, 0.00099905,
       0.00099895], dtype=float32)
class Solver (bvp, optimizer, num_hidden_layers=4, num_neurons_per_layer=50)
Constructor for a boundary value problem solver.

Parameters
bvp : BoundaryValueProblem
    Boundary value problem to be solved
optimizer : Optimizer
    Optimizer to be used for training the neural network
num_hidden_layers : int
    Number of hidden layers in the neural network. Defaults to 4.
num_neurons_per_layer : int
    Number of neurons in each hidden layer. Defaults to 50.

Examples
>>> from PDESolver import *
>>> optimizer = Optimizer(initial_learning_rate=1e-3, decay_steps=1000, decay_rate=0.9)
>>> bvp = Laplace()
>>> solver = Solver(bvp, optimizer, num_hidden_layers=4, num_neurons_per_layer=50)
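
For orientation, this is roughly what the constructor extracts from the BVP before building the network. A sketch based on the source above; it assumes the BVP exposes get_specification(), get_conditions(), and an 'inner' condition with get_normalization_constants():

>>> bvp = Laplace()
>>> spec = bvp.get_specification()
>>> num_inputs = len(spec['variables'])    # e.g. ['x', 'y'] -> 2 network inputs
>>> num_outputs = len(spec['components'])  # e.g. ['u'] -> 1 network output
>>> inner = next(c for c in bvp.get_conditions() if c.name == 'inner')
>>> mean, variance = inner.get_normalization_constants()  # feeds the input Normalization layer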
Methods
def adjust_weights(self, gradients)
Adjusts the weights of the PDE and data losses.

Parameters
gradients : dict
    Dictionary of gradients as returned by compute_gradients()

Returns
dict: Dictionary of adjusted condition weights of form {condition_name: weight}

Examples
>>> from PDESolver import *
>>> solver = Solver(Laplace(), Optimizer())
>>> l2, gradients = solver.compute_gradients()
>>> solver.adjust_weights(gradients)
{'zero_boundary': <tf.Tensor: shape=(), dtype=float32, numpy=38.457893>, 'f_boundary': <tf.Tensor: shape=(), dtype=float32, numpy=22.964037>, 'inner': <tf.Tensor: shape=(), dtype=float32, numpy=1.0>}
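
The update is a gradient-magnitude balancing scheme in the spirit of learning-rate annealing for physics-informed networks: the condition whose flattened gradient varies the most serves as the reference, every other weight is pulled toward max|g_ref| / mean|g_i| through a 0.9/0.1 moving average, and finally all weights are rescaled so the smallest equals 1. A minimal NumPy sketch with made-up gradient vectors:

import numpy as np

grads = {'boundary': np.array([0.2, -0.1, 0.05]),  # hypothetical flattened gradients
         'inner': np.array([3.0, -2.5, 1.5])}
weights = {'boundary': 1.0, 'inner': 1.0}

ref = max(grads, key=lambda name: grads[name].var())  # most-varying condition
ref_absmax = np.abs(grads[ref]).max()
new_weights = {}
for name, g in grads.items():
    if name == ref:
        new_weights[name] = weights[name]                # reference weight stays put
    else:
        target = ref_absmax / np.abs(g).mean()           # balance gradient magnitudes
        new_weights[name] = 0.9 * weights[name] + 0.1 * target
smallest = min(new_weights.values())
new_weights = {name: w / smallest for name, w in new_weights.items()}
print(new_weights)  # boundary is pulled up, the reference 'inner' stays at 1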
def compute_differentials(self, samplePoints)
Calculates the differentials of the model at the given points.

Parameters
samplePoints : tensor
    Tensor of points to calculate the differentials at

Returns
dict: Dictionary containing the differentials of the model needed for the bvp at the given points

Examples
>>> from PDESolver import *
>>> solver = Solver(Laplace(), Optimizer())
>>> samples = tf.constant([[0, 0], [0, 0.5], [0, 1]])
>>> solver.compute_differentials(samples)
{'x': <tf.Tensor: shape=(3, 1), dtype=float32, numpy=
array([[0.],
       [0.],
       [0.]], dtype=float32)>, 'y': <tf.Tensor: shape=(3, 1), dtype=float32, numpy=
array([[0. ],
       [0.5],
       [1. ]], dtype=float32)>, 'u': <tf.Tensor: shape=(3, 1), dtype=float32, numpy=
array([[-1.4298753 ],
       [-1.0829226 ],
       [-0.86016774]], dtype=float32)>, 'u_x': <tf.Tensor: shape=(3, 1), dtype=float32, numpy=
array([[0.7101533 ],
       [0.7047943 ],
       [0.45764494]], dtype=float32)>, 'u_xx': <tf.Tensor: shape=(3, 1), dtype=float32, numpy=
array([[-1.6525286],
       [-1.6283079],
       [-1.2708694]], dtype=float32)>, 'u_y': <tf.Tensor: shape=(3, 1), dtype=float32, numpy=
array([[0.71874154],
       [0.6189531 ],
       [0.2856549 ]], dtype=float32)>, 'u_yy': <tf.Tensor: shape=(3, 1), dtype=float32, numpy=
array([[-0.01344297],
       [-0.5869958 ],
       [-0.47277603]], dtype=float32)>}
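
Derivative keys are built one variable at a time ('u' -> 'u_x' -> 'u_xx'), each step differentiating the previously taped result on the same persistent tape. The same pattern in isolation, with tf.sin standing in for the network (a sketch, not the package's model):

import tensorflow as tf

x = tf.constant([[0.0], [0.5], [1.0]])
with tf.GradientTape(persistent=True) as tape:
    tape.watch(x)
    u = tf.sin(x)                 # stand-in for the network component 'u'
    u_x = tape.gradient(u, x)     # 'u_x': first derivative, recorded for reuse
    u_xx = tape.gradient(u_x, x)  # 'u_xx': differentiates the previous result
del tape
print(u_x.numpy().ravel())   # cos(x)
print(u_xx.numpy().ravel())  # -sin(x)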
def compute_gradients(self)
Gets the gradients of the neural network.

Returns
tensor: L2 loss
dict: Dictionary of gradients of form {condition_name: gradient}

Examples
>>> from PDESolver import *
>>> solver = Solver(Laplace(), Optimizer())
>>> solver.compute_gradients()
(<tf.Tensor: shape=(), dtype=float32, numpy=4.756295>, LARGE DICTIONARY)
def compute_losses(self)
Computes the losses for the neural network.

Returns
dict: Dictionary of losses of form {condition_name: loss}

Examples
>>> from PDESolver import *
>>> solver = Solver(Laplace(), Optimizer())
>>> solver.compute_losses()
{'zero_boundary': <tf.Tensor: shape=(), dtype=float32, numpy=0.29857153>, 'f_boundary': <tf.Tensor: shape=(), dtype=float32, numpy=0.07588613>, 'inner': <tf.Tensor: shape=(), dtype=float32, numpy=6.3834033>, 'L2': <tf.Tensor: shape=(), dtype=float32, numpy=6.757861>, 'total': <tf.Tensor: shape=(), dtype=float32, numpy=13.515722>}
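
Concretely, each condition contributes its weighted mean-squared residual; 'total' sums the weighted terms (the training objective), while 'L2' sums the unweighted ones (the reported metric). A small self-contained check with made-up residuals:

import tensorflow as tf

mse = tf.keras.losses.MeanSquaredError()
residuals = {'boundary': tf.constant([[1.0], [2.0]]), 'inner': tf.constant([[0.5]])}
weights = tf.constant([2.0, 1.0])  # hypothetical condition weights

losses, l2 = {}, 0.0
for i, (name, residual) in enumerate(residuals.items()):
    losses[name] = weights[i] * mse(residual, tf.zeros_like(residual))
    l2 += mse(residual, tf.zeros_like(residual))
losses['total'] = sum(losses.values())
losses['L2'] = l2
print({name: float(value) for name, value in losses.items()})
# {'boundary': 5.0, 'inner': 0.25, 'total': 5.25, 'L2': 2.75}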
def compute_residuals(self)
Gets the residuals for the boundary value problem.

Returns
dict: Dictionary of residuals of form {condition_name: residual}

Examples
>>> from PDESolver import *
>>> solver = Solver(Laplace(minibatch_size=3), Optimizer())
>>> solver.compute_residuals()
{'zero_boundary': <tf.Tensor: shape=(3, 1), dtype=float32, numpy=
array([[1.7787822],
       [2.6922252],
       [1.3558891]], dtype=float32)>, 'f_boundary': <tf.Tensor: shape=(3, 1), dtype=float32, numpy=
array([[2.4186134],
       [2.4582624],
       [2.4142814]], dtype=float32)>, 'inner': <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[ 0.53346014],
       [-4.9377484 ],
       [ 0.21452093],
       [-1.6314204 ]], dtype=float32)>}
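
The loop relies only on each condition exposing a name, a sample_points() method, and a residue_fn(Du) that maps the differentials dictionary to a residual tensor. A hypothetical minimal condition for illustration (the real Condition classes live elsewhere in the package):

import tensorflow as tf

class ZeroBoundaryCondition:
    """Hypothetical stand-in for the package's condition objects."""
    name = 'zero_boundary'

    def sample_points(self):
        # points on the x = 0 edge of the unit square
        return tf.constant([[0.0, 0.0], [0.0, 0.5], [0.0, 1.0]])

    def residue_fn(self, Du):
        # enforce u = 0 on the boundary: the residual is u itself
        return Du['u']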
def show_debugplot(self, gradients)
Shows a debug plot of the neural network.

Parameters
gradients : dict
    Dictionary of per-condition gradients of the PDE and data losses obtained from compute_gradients()

Examples
>>> from PDESolver import *
>>> solver = Solver(Laplace(), Optimizer())
>>> l2loss, gradients = solver.compute_gradients()
>>> solver.loss_history += [l2loss.numpy()]
>>> solver.show_debugplot(gradients)
def train(self, iterations=10000, debug_frequency=2500)
Trains the neural network to solve the boundary value problem.

Parameters
iterations : int (default=10000)
    Number of iterations to train for
debug_frequency : int (default=2500)
    Frequency (every X iterations) at which to show the debug panel. If zero or negative, no debug panel is shown.

Examples
>>> from PDESolver import *
>>> solver = Solver(Laplace(), Optimizer())
>>> solver.train(iterations=10000, debug_frequency=2500)
øL²-loss = 6.044e+01 (best: 2.305e+01, 000064it ago) lr = 0.00090: 3%|█ | 1271/40000 [00:33<07:53, 81.76it/s]
def train_step(self)
Performs a single training step.

Returns
tensor: L2 loss
dict: Dictionary of gradients of form {condition_name: gradient}
dict: Dictionary of adjusted condition weights of form {condition_name: weight}

Examples
>>> from PDESolver import *
>>> solver = Solver(Laplace(), Optimizer())
>>> l2loss, gradients, new_weights = solver.train_step()
>>> l2loss
<tf.Tensor: shape=(), dtype=float32, numpy=4.371607>
>>> l2loss, gradients, new_weights = solver.train_step()
>>> l2loss
<tf.Tensor: shape=(), dtype=float32, numpy=3.0963678>
>>> l2loss, gradients, new_weights = solver.train_step()
>>> l2loss
<tf.Tensor: shape=(), dtype=float32, numpy=3.029224>