# Copyright (c) 2016-2018, The University of Texas at Austin
# & University of California--Merced.
# Copyright (c) 2019-2020, The University of Texas at Austin
# University of California--Merced, Washington University in St. Louis.
#
# All Rights reserved.
# See file COPYRIGHT for details.
#
# This file is part of the hIPPYlib library. For more information and source code
# availability see https://hippylib.github.io.
#
# hIPPYlib is free software; you can redistribute it and/or modify it under the
# terms of the GNU General Public License (as published by the Free
# Software Foundation) version 2.0 dated June 1991.
from ..modeling.variables import STATE, PARAMETER
from .tracers import NullTracer
[docs]class NullQoi(object):
def __init__(self):
pass
[docs] def eval(self,x):
return 0.
[docs]class SampleStruct:
def __init__(self, kernel):
self.derivative_info = kernel.derivativeInfo()
self.u = kernel.model.generate_vector(STATE)
self.m = kernel.model.generate_vector(PARAMETER)
self.cost = 0
if self.derivative_info >= 1:
self.p = kernel.model.generate_vector(STATE)
self.g = kernel.model.generate_vector(PARAMETER)
self.Cg = kernel.model.generate_vector(PARAMETER)
else:
self.p = None
self.g = None
[docs] def assign(self, other):
assert self.derivative_info == other.derivative_info
self.cost = other.cost
self.m = other.m.copy()
self.u = other.u.copy()
if self.derivative_info >= 1:
self.g = other.g.copy()
self.p = other.p.copy()
self.Cg = other.Cg.copy()
[docs]class MCMC(object):
def __init__(self, kernel):
self.kernel = kernel
self.parameters = {}
self.parameters["number_of_samples"] = 2000
self.parameters["burn_in"] = 1000
self.parameters["print_progress"] = 20
self.parameters["print_level"] = 1
self.sum_q = 0.
self.sum_q2 = 0.
[docs] def run(self, m0, qoi=None, tracer = None):
if qoi is None:
qoi = NullQoi()
if tracer is None:
tracer = NullTracer()
number_of_samples = self.parameters["number_of_samples"]
burn_in = self.parameters["burn_in"]
current = SampleStruct(self.kernel)
proposed = SampleStruct(self.kernel)
current.m.zero()
current.m.axpy(1., m0)
self.kernel.init_sample(current)
if self.parameters["print_level"] > 0:
print( "Burn {0} samples".format(burn_in) )
sample_count = 0
naccept = 0
n_check = burn_in // self.parameters["print_progress"]
while (sample_count < burn_in):
naccept +=self.kernel.sample(current, proposed)
sample_count += 1
if sample_count % n_check == 0 and self.parameters["print_level"] > 0:
print( "{0:2.1f} % completed, Acceptance ratio {1:2.1f} %".format(float(sample_count)/float(burn_in)*100,
float(naccept)/float(sample_count)*100 ) )
if self.parameters["print_level"] > 0:
print( "Generate {0} samples".format(number_of_samples) )
sample_count = 0
naccept = 0
n_check = number_of_samples // self.parameters["print_progress"]
while (sample_count < number_of_samples):
naccept +=self.kernel.sample(current, proposed)
q = qoi.eval([current.u, current.m])
self.sum_q += q
self.sum_q2 += q*q
tracer.append(current, q)
sample_count += 1
if sample_count % n_check == 0 and self.parameters["print_level"] > 0:
print( "{0:2.1f} % completed, Acceptance ratio {1:2.1f} %".format(float(sample_count)/float(number_of_samples)*100,
float(naccept)/float(sample_count)*100 ) )
return naccept
[docs] def consume_random(self):
number_of_samples = self.parameters["number_of_samples"]
burn_in = self.parameters["burn_in"]
for ii in range(number_of_samples+burn_in):
self.kernel.consume_random()