Source code for hippylib.modeling.misfit

# Copyright (c) 2016-2018, The University of Texas at Austin 
# & University of California--Merced.
# Copyright (c) 2019-2020, The University of Texas at Austin 
# University of California--Merced, Washington University in St. Louis.
#
# All Rights reserved.
# See file COPYRIGHT for details.
#
# This file is part of the hIPPYlib library. For more information and source code
# availability see https://hippylib.github.io.
#
# hIPPYlib is free software; you can redistribute it and/or modify it under the
# terms of the GNU General Public License (as published by the Free
# Software Foundation) version 2.0 dated June 1991.

import dolfin as dl
import ufl
from .pointwiseObservation import assemblePointwiseObservation
from .variables import STATE, PARAMETER
from ..algorithms.linalg import Transpose
from ..utils.deprecate import deprecated
import numpy as np

[docs]class Misfit(object): """ Abstract class to model the misfit component of the cost functional. In the following :code:`x` will denote the variable :code:`[u, m, p]`, denoting respectively the state :code:`u`, the parameter :code:`m`, and the adjoint variable :code:`p`. The methods in the class misfit will usually access the state u and possibly the parameter :code:`m`. The adjoint variables will never be accessed. """
[docs] def cost(self,x): """ Given x evaluate the cost functional. Only the state u and (possibly) the parameter m are accessed. """ raise NotImplementedError("Child class should implement method cost") return 0
[docs] def grad(self, i, x, out): """ Given the state and the paramter in :code:`x`, compute the partial gradient of the misfit functional in with respect to the state (:code:`i == STATE`) or with respect to the parameter (:code:`i == PARAMETER`). """ raise NotImplementedError("Child class should implement method grad")
[docs] def setLinearizationPoint(self,x, gauss_newton_approx=False): """ Set the point for linearization. Inputs: :code:`x=[u, m, p]` - linearization point :code:`gauss_newton_approx (bool)` - whether to use Gauss Newton approximation """ raise NotImplementedError("Child class should implement method setLinearizationPoint")
[docs] def apply_ij(self,i,j, dir, out): """ Apply the second variation :math:`\delta_{ij}` (:code:`i,j = STATE,PARAMETER`) of the cost in direction :code:`dir`. """ raise NotImplementedError("Child class should implement method apply_ij")
[docs]@deprecated(version="3.1", msg="Use B = assemblePointwiseObservation(Vh, obs_points) followed by DiscreteStateObservation(B, Mpar)") def PointwiseStateObservation(Vh, obs_points): """ This function returns an instance of :code:`DiscreteStateObservation` for pointwise state observations at given locations. Inputs: :code:`Vh` is the finite element space for the state variable :code:`obs_points` is a 2D array number of points by geometric dimensions that stores \ the location of the observations. """ B = assemblePointwiseObservation(Vh, obs_points) return DiscreteStateObservation(B)
[docs]class DiscreteStateObservation(Misfit): """ This class define a misfit function for a discrete linear observation operator B """ def __init__(self, B, data=None, noise_variance=None): """ Constructor: :code:`B` is the observation operator :code:`data` is the data :code:`noise_variance` is the variance of the noise """ self.B = B if data is None: self.d = dl.Vector(self.B.mpi_comm()) self.B.init_vector(self.d, 0) else: self.d = data self.Bu = dl.Vector(self.B.mpi_comm()) self.B.init_vector(self.Bu, 0) self.noise_variance = noise_variance
[docs] def cost(self,x): if self.noise_variance is None: raise ValueError("Noise Variance must be specified") elif self.noise_variance == 0: raise ZeroDivisionError("Noise Variance must not be 0.0 Set to 1.0 for deterministic inverse problems") self.B.mult(x[STATE], self.Bu) self.Bu.axpy(-1., self.d) return (.5/self.noise_variance)*self.Bu.inner(self.Bu)
[docs] def grad(self, i, x, out): if self.noise_variance is None: raise ValueError("Noise Variance must be specified") elif self.noise_variance == 0: raise ZeroDivisionError("Noise Variance must not be 0.0 Set to 1.0 for deterministic inverse problems") if i == STATE: self.B.mult(x[STATE], self.Bu) self.Bu.axpy(-1., self.d) self.B.transpmult(self.Bu, out) out *= (1./self.noise_variance) elif i == PARAMETER: out.zero() else: raise IndexError()
[docs] def setLinearizationPoint(self,x, gauss_newton_approx=False): # The cost functional is already quadratic. Nothing to be done here return
[docs] def apply_ij(self,i,j,dir,out): if self.noise_variance is None: raise ValueError("Noise Variance must be specified") elif self.noise_variance == 0: raise ZeroDivisionError("Noise Variance must not be 0.0 Set to 1.0 for deterministic inverse problems") if i == STATE and j == STATE: self.B.mult(dir, self.Bu) self.B.transpmult(self.Bu, out) out *= (1./self.noise_variance) else: out.zero()
[docs]@deprecated(version="3.1", msg="Use B = assemblePointwiseObservation(Vh, obs_points) followed by MultDiscreteStateObservation(B, None, Mpar)") def MultPointwiseStateObservation(Vh, obs_points, Mpar): """ This function returns an instance of :code:`DiscreteStateObservation` for pointwise state observations at given locations. A multiplicative Gamma(M,M) noise model is assumed Inputs: :code:`Vh` is the finite element space for the state variable :code:`data` is the data :code:`obs_points` is a 2D array number of points by geometric dimensions that stores \ the location of the observations. :code:`Mpar` Gamma distribution parameter """ B = assemblePointwiseObservation(Vh, obs_points) return MultDiscreteStateObservation(B, None, Mpar)
[docs]class MultDiscreteStateObservation(Misfit): """ This class implements discrete state observations defined by the linear operator B. A multiplicative Gamma(M,M) noise model is assumed """ def __init__(self, B, data=None, Mpar=1.): """ Constructor: :code:`B` is the observation operator :code:`Mpar` Gamma distribution parameter """ self.B = B if data is None: self.d = dl.Vector(self.B.mpi_comm()) self.B.init_vector(self.d, 0) else: self.d = data self.Bu = dl.Vector(self.B.mpi_comm()) self.B.init_vector(self.Bu, 0) self.Bu_lin = dl.Vector(self.B.mpi_comm()) self.B.init_vector(self.Bu_lin, 0) self.help = dl.Vector(self.B.mpi_comm()) self.B.init_vector(self.help, 0) self.Mpar = Mpar self.ones = dl.Vector(self.B.mpi_comm()) self.B.init_vector(self.ones, 0) self.ones.set_local(np.ones_like(self.ones.get_local())) self.ones.apply("")
[docs] def cost(self,x): self.B.mult(x[STATE], self.Bu) Bu_local = self.Bu.get_local() self.help.set_local( np.log( Bu_local ) + self.d.get_local()/Bu_local ) self.help.apply("") return self.Mpar* self.help.inner(self.ones)
[docs] def grad(self, i, x, out): out.zero() if i == STATE: self.B.mult(x[STATE], self.Bu) Bu_local = self.Bu.get_local() self.help.zero() self.help.set_local( np.ones_like(Bu_local)/Bu_local - self.d.get_local()/(Bu_local*Bu_local) ) self.help.apply("") self.B.transpmult(self.help, out) out *= self.Mpar elif i == PARAMETER: out.zero() else: raise IndexError()
[docs] def setLinearizationPoint(self,x, gauss_newton_approx=False): self.B.mult(x[STATE], self.Bu_lin) # The cost functional is already quadratic. Nothing to be done here return
[docs] def apply_ij(self,i,j,dir,out): out.zero() if i == STATE and j == STATE: self.B.mult(dir, self.Bu) Bdir_local = self.Bu.get_local() Bu_lin_local = self.Bu_lin.get_local() self.help.zero() self.help.set_local( - Bdir_local*np.power(Bu_lin_local, -2) + 2.*self.d.get_local()*Bdir_local*np.power(Bu_lin_local, -3)) self.help.apply("") self.B.transpmult(self.help, out) out *= self.Mpar else: out.zero()
[docs]class ContinuousStateObservation(Misfit): """ This class implements continuous state observations in a subdomain :math:`X \subset \Omega` or :math:`X \subset \partial \Omega`. """ def __init__(self, Vh, dX, bcs, form = None): """ Constructor: :code:`Vh`: the finite element space for the state variable. :code:`dX`: the integrator on subdomain `X` where observation are presents. \ E.g. :code:`dX = ufl.dx` means observation on all :math:`\Omega` and :code:`dX = ufl.ds` means observations on all :math:`\partial \Omega`. :code:`bcs`: If the forward problem imposes Dirichlet boundary conditions :math:`u = u_D \mbox{ on } \Gamma_D`; \ :code:`bcs` is a list of :code:`dolfin.DirichletBC` object that prescribes homogeneuos Dirichlet conditions :math:`u = 0 \mbox{ on } \Gamma_D`. :code:`form`: if :code:`form = None` we compute the :math:`L^2(X)` misfit: :math:`\int_X (u - u_d)^2 dX,` \ otherwise the integrand specified in the given form will be used. """ if form is None: u, v = dl.TrialFunction(Vh), dl.TestFunction(Vh) self.W = dl.assemble(ufl.inner(u,v)*dX) else: self.W = dl.assemble( form ) if bcs is None: bcs = [] if isinstance(bcs, dl.DirichletBC): bcs = [bcs] if len(bcs): Wt = Transpose(self.W) [bc.zero(Wt) for bc in bcs] self.W = Transpose(Wt) [bc.zero(self.W) for bc in bcs] self.d = dl.Vector(self.W.mpi_comm()) self.W.init_vector(self.d,1) self.noise_variance = None
[docs] def cost(self,x): if self.noise_variance is None: raise ValueError("Noise Variance must be specified") elif self.noise_variance == 0: raise ZeroDivisionError("Noise Variance must not be 0.0 Set to 1.0 for deterministic inverse problems") r = self.d.copy() r.axpy(-1., x[STATE]) Wr = dl.Vector(self.W.mpi_comm()) self.W.init_vector(Wr,0) self.W.mult(r,Wr) return r.inner(Wr)/(2.*self.noise_variance)
[docs] def grad(self, i, x, out): if self.noise_variance is None: raise ValueError("Noise Variance must be specified") elif self.noise_variance == 0: raise ZeroDivisionError("Noise Variance must not be 0.0 Set to 1.0 for deterministic inverse problems") if i == STATE: self.W.mult(x[STATE]-self.d, out) out *= (1./self.noise_variance) elif i == PARAMETER: out.zero() else: raise IndexError()
[docs] def setLinearizationPoint(self,x, gauss_newton_approx=False): # The cost functional is already quadratic. Nothing to be done here return
[docs] def apply_ij(self,i,j,dir,out): if self.noise_variance is None: raise ValueError("Noise Variance must be specified") elif self.noise_variance == 0: raise ZeroDivisionError("Noise Variance must not be 0.0 Set to 1.0 for deterministic inverse problems") if i == STATE and j == STATE: self.W.mult(dir, out) out *= (1./self.noise_variance) else: out.zero()
[docs]class MultiStateMisfit(Misfit): def __init__(self, misfits): self.nmisfits = len(misfits) self.misfits = misfits
[docs] def append(self, misfit): self.nmisfits += 1 self.misfits.append(misfit)
[docs] def cost(self,x): u, m, p = x c = 0. for i in range(self.nmisfits): c += self.misfits[i].cost([ u.data[i], m, None ] ) return c
[docs] def grad(self, i, x, out): out.zero() u, m, p = x if i == STATE: for ii in range(self.nmisfits): self.misfits[ii].grad(i, [ u.data[ii], m, None ], out.data[ii] ) else: tmp = out.copy() for ii in range(self.nmisfits): self.misfits[ii].grad(i, [ u.data[ii], m, None ], tmp ) out.axpy(1., tmp)
[docs] def setLinearizationPoint(self,x, gauss_newton_approx=False): u, m, p = x for ii in range(self.nmisfits): self.misfits[ii].setLinearizationPoint([ u.data[ii], m, None ], gauss_newton_approx)
[docs] def apply_ij(self,i,j,dir,out): out.zero() if i == STATE and j == STATE: for s in range(self.nmisfits): self.misfits[s].apply_ij(i,j,dir.data[s],out.data[s]) elif i == STATE and j == PARAMETER: for s in range(self.nmisfits): self.misfits[s].apply_ij(i,j,dir,out.data[s]) elif i == PARAMETER and j == STATE: tmp = out.copy() for s in range(self.nmisfits): self.misfits[s].apply_ij(i,j,dir.data[s],tmp) out.axpy(1., tmp) elif i == PARAMETER and j == PARAMETER: tmp = out.copy() for s in range(self.nmisfits): self.misfits[s].apply_ij(i,j,dir,tmp) out.axpy(1., tmp) else: raise IndexError