"""Freezing String Method driver for reaction pathway construction."""
import logging
from pathlib import Path
from typing import TYPE_CHECKING, Any, Optional
import numpy as np
from ase import Atoms
from scipy.interpolate import CubicSpline
if TYPE_CHECKING:
from numpy.typing import NDArray
from mlfsm.output import FSMOutput
from mlfsm.coords import Cartesian, Redundant
from mlfsm.geom import (
calculate_arc_length,
distance,
normalize,
project_trans_rot,
project_trans_rot_fixed,
)
from mlfsm.interp import LST, RIC, Linear
from mlfsm.utils import float_check
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
[docs]
class FreezingString:
"""Implements the Freezing String Method (FSM) for transition state guess generation.
The FSM grows two strings from the reactant and product endpoints toward each
other. At each iteration a new frontier node is placed by taking a step along
an interpolated path, optimized perpendicular to the local tangent direction,
and then frozen. The highest-energy node at convergence serves as the TS guess.
Parameters
----------
reactant : ase.Atoms
Reactant geometry with an attached ASE calculator (optional at
construction; required before calling :meth:`optimize`).
product : ase.Atoms
Product geometry. Must have the same atom ordering as ``reactant``.
nnodes_min : int, optional
Nominal number of string nodes used to determine the interpolation
step size. The actual node count may differ slightly depending on
the path arc length. Default is 10.
interp_method : {"ric", "lst", "cart"}
Interpolation scheme used to generate new frontier nodes:
* ``"ric"`` — redundant internal coordinates (recommended).
* ``"lst"`` — linear synchronous transit.
* ``"cart"`` — plain Cartesian linear interpolation.
ninterp : int, optional
Number of points used to discretize the dense interpolated path from
which frontier nodes are selected. Default is 100.
stepsize : float, optional
If > 0, sets the Cartesian step size (Å) explicitly and ignores
``nnodes_min``. The step size is measured along the Cartesian
arc length of the initial linear interpolation. Default is 0.0
(derive step size from ``nnodes_min``).
raise_on_backtransf_fail : bool, optional
If ``True``, raise a RuntimeError if the RIC back transformation fails
to converge during interpolation or node growth. Default is ``True``.
Attributes
----------
r_string, p_string : list[ase.Atoms]
Growing strings from the reactant and product ends.
r_energy, p_energy : list[float or None]
Energies at each node (``None`` until evaluated).
growing : bool
``True`` while the two string ends have not yet converged.
ngrad : int
Total number of gradient evaluations performed so far.
"""
[docs]
def __init__(
self,
reactant: Atoms,
product: Atoms,
nnodes_min: int = 10,
interp_method: str = "ric",
ninterp: int = 100,
stepsize: float = 0.0,
raise_on_backtransf_fail: bool = True,
output: Optional["FSMOutput"] = None,
) -> None:
self.output = output
self.interp: Any
self.interp_method = interp_method
self.nnodes_min = int(nnodes_min)
self.ninterp = int(ninterp)
self.use_cartesian_distance = True if stepsize > 0 else False
self.raise_on_backtransf_fail = raise_on_backtransf_fail
if interp_method == "cart":
self.interp = Linear
elif interp_method == "lst":
self.interp = LST
elif interp_method == "ric":
self.interp = RIC
else:
raise ValueError("Check interpolation method")
self.atoms = reactant.copy()
self.natoms = len(self.atoms.numbers)
if not self.use_cartesian_distance:
_interp_init = self.interp(reactant, product, ninterp=self.ninterp)
s = calculate_arc_length(_interp_init())
self.dist = s[-1]
self.stepsize = self.dist / self.nnodes_min
else:
_interp_init = Linear(reactant, product, ninterp=self.ninterp)
s = calculate_arc_length(_interp_init())
self.dist = s[-1]
self.stepsize = float(stepsize)
self.nnodes_min = int(self.dist / self.stepsize)
self.init_coordsobj: Optional[Redundant] = None
if interp_method == "ric":
self.init_coordsobj = (
_interp_init.coords if isinstance(_interp_init, RIC) else RIC(reactant, product, ninterp=2).coords
)
else:
self.init_coordsobj = None
logger.info(f"NNODES_MIN: {self.nnodes_min}")
logger.info(f"DIST: {self.dist:.3f} STEPSIZE: {self.stepsize:.3f}")
self.r_string: list[Atoms] = [reactant.copy()]
self.r_fix: list[bool] = [True]
self.r_energy: list[Optional[float]] = [None]
self.r_tangent: list[Optional[NDArray[Any]]] = [None]
self.r_nnodes = len(self.r_string)
self.p_string: list[Atoms] = [product.copy()]
self.p_fix: list[bool] = [True]
self.p_energy: list[Optional[float]] = [None]
self.p_tangent: list[Optional[NDArray[Any]]] = [None]
self.p_nnodes = len(self.p_string)
self.growing = True
self.iteration = 0
self.ngrad = 0
self.coordsobj: Any = None
[docs]
def interpolate(self, outdir: Path | str) -> None:
"""Generate and write the dense interpolated path between current frontier nodes.
Writes a single XYZ file named ``interp_<iteration>.xyz`` to ``outdir``.
Each frame comment line contains the cumulative arc-length value.
Parameters
----------
outdir : path-like
Directory in which to write the interpolated path XYZ file.
"""
outfile = Path(outdir) / f"interp_{self.iteration:02d}.xyz"
r_atoms = self.r_string[-1]
p_atoms = self.p_string[-1]
r_xyz, p_xyz = project_trans_rot(r_atoms.get_positions(), p_atoms.get_positions())
r_xyz, p_xyz = r_xyz.flatten(), p_xyz.flatten()
interp = self.interp(r_atoms, p_atoms, ninterp=self.ninterp)
string = interp()
s = calculate_arc_length(string)
path = []
for i in range(self.ninterp):
atoms = self.atoms.copy()
atoms.set_positions(string[i].reshape(-1, 3))
path.append(atoms)
with outfile.open("w") as f:
for i, atoms in enumerate(path):
f.write(f"{self.natoms}\n")
f.write(f"{s[i]:.5f}\n")
for atom, xyz in zip(
atoms.get_chemical_symbols(),
atoms.get_positions(),
strict=True,
):
x, y, z = map(float, xyz)
f.write(f"{atom} {x:.8f} {y:.8f} {z:.8f}\n")
[docs]
def grow(self) -> None:
"""Grow the string by adding one new frontier node to each end.
Interpolates between the current frontier nodes, selects positions
one step-size inward from each end, and appends those nodes to
``r_string`` and ``p_string`` with ``fix=False``. Sets
``self.growing = False`` when the two frontiers are within one
step-size of each other.
"""
if self.output is not None:
self.output._ensure_iteration_header(self.iteration + 1, self.dist)
r_atoms = self.r_string[-1]
p_atoms = self.p_string[-1]
r_xyz, p_xyz = project_trans_rot(r_atoms.get_positions(), p_atoms.get_positions())
r_xyz, p_xyz = r_xyz.flatten(), p_xyz.flatten()
return_q = self.use_cartesian_distance
interp = self.interp(r_atoms, p_atoms, ninterp=self.ninterp, return_q=return_q)
try:
self.coordsobj = interp.coords
except Exception:
self.coordsobj = Cartesian(
r_atoms,
p_atoms,
raise_on_backtransf_fail=self.raise_on_backtransf_fail,
)
if self.use_cartesian_distance and self.interp_method == "ric":
string = interp()
s = calculate_arc_length(string)
cs = CubicSpline(s, string, axis=0)
self.dist = distance(r_xyz, p_xyz)
if self.dist < self.stepsize:
self.growing = False
return
if self.output is not None:
self.output.write_current_frontier_node("r", r_atoms)
r_prev = r_xyz.copy().reshape(-1, 3)
r_idx = 1
r_s = 0.0
for qtarget in string[1:-1]:
r_next = interp.coords.x(r_prev, qtarget)
_, r_next = project_trans_rot(r_xyz.reshape(-1, 3), r_next)
r_next = r_next.reshape(-1, 3)
r_s = distance(r_xyz, r_next)
if r_s > self.stepsize:
break
r_prev = r_next.copy()
r_idx += 1
r_frontier = self.atoms.copy()
r_frontier.set_positions(r_next.reshape(-1, 3))
dqds = cs(s[r_idx], 1)
Bprim = interp.coords.b_matrix(r_next)
U = interp.coords.u_matrix(Bprim)
B = U.T @ Bprim
BT_inv = np.linalg.pinv(B @ B.T) @ B
dqds = U.T @ dqds
dxds = BT_inv.T @ dqds
self.r_string += [r_frontier]
self.r_fix += [False]
self.r_energy += [None]
self.r_tangent += [normalize(dxds)]
self.r_nnodes = len(self.r_string)
if self.output is not None:
self.output.write_frontier_node("r", r_frontier, r_s)
if self.dist <= 2 * self.stepsize:
self.growing = False
return
if self.output is not None:
self.output.write_current_frontier_node("p", p_atoms)
p_prev = p_xyz.copy().reshape(-1, 3)
p_idx = 1
p_s = 0.0
for qtarget in string[1:-1][::-1]:
p_next = interp.coords.x(p_prev, qtarget)
_, p_next = project_trans_rot(p_xyz.reshape(-1, 3), p_next)
p_next = p_next.reshape(-1, 3)
p_s = distance(p_xyz, p_next)
if p_s > self.stepsize:
break
p_prev = p_next.copy()
p_idx += 1
p_frontier = self.atoms.copy()
p_frontier.set_positions(p_next.reshape(-1, 3))
dqds = cs(s[p_idx], 1)
Bprim = interp.coords.b_matrix(p_next)
U = interp.coords.u_matrix(Bprim)
B = U.T @ Bprim
BT_inv = np.linalg.pinv(B @ B.T) @ B
dqds = U.T @ dqds
dxds = BT_inv.T @ dqds
self.p_string += [p_frontier]
self.p_fix += [False]
self.p_energy += [None]
self.p_tangent += [normalize(dxds)]
self.p_nnodes = len(self.p_string)
if self.output is not None:
self.output.write_frontier_node("p", p_frontier, p_s)
else:
string = interp()
s = calculate_arc_length(string)
cs = CubicSpline(s, string.reshape(self.ninterp, 3 * self.natoms), axis=0)
self.dist = s[-1]
if self.dist < self.stepsize:
self.growing = False
return
r_idx = np.abs(s - self.stepsize).argmin()
p_idx = np.abs(s - (s[-1] - self.stepsize)).argmin()
if self.output is not None:
self.output.write_current_frontier_node("r", r_atoms)
r_frontier = self.atoms.copy()
r_frontier.set_positions(string[r_idx].reshape(-1, 3))
self.r_string += [r_frontier]
self.r_fix += [False]
self.r_energy += [None]
self.r_tangent += [normalize(cs(s[r_idx], 1))]
self.r_nnodes = len(self.r_string)
if self.output is not None:
self.output.write_frontier_node("r", r_frontier, float(s[r_idx]))
if self.dist <= 2 * self.stepsize:
self.growing = False
return
if self.output is not None:
self.output.write_current_frontier_node("p", p_atoms)
p_frontier = self.atoms.copy()
p_frontier.set_positions(string[p_idx].reshape(-1, 3))
self.p_string += [p_frontier]
self.p_fix += [False]
self.p_energy += [None]
self.p_tangent += [normalize(cs(s[p_idx], 1))]
self.p_nnodes = len(self.p_string)
if self.output is not None:
self.output.write_frontier_node("p", p_frontier, float(s[-1] - s[p_idx]))
[docs]
def optimize(self, optimizer: Any) -> None:
"""Relax all unfixed frontier nodes perpendicular to the local tangent.
Evaluates energies for endpoint nodes that have not yet been computed,
then runs the optimizer on every unfixed node. After optimization each
node is frozen (``fix=True``) so it will not move in subsequent
iterations.
Parameters
----------
optimizer : CartesianOptimizer or InternalsOptimizer
Optimizer instance whose ``calc`` attribute provides energies and
gradients via the ASE calculator interface.
"""
self.iteration += 1
optimizer.coordsobj = self.coordsobj
for i in range(self.r_nnodes):
if self.r_energy[i] is None and self.r_fix[i]:
energy = optimizer.calc.get_potential_energy(self.r_string[i])
self.r_energy[i] = float_check(energy)
if self.output is not None:
self.output.write_optimized_node("r", i, self.r_string[i], self.r_energy[i], 0, 0)
elif not self.r_fix[i]:
assert self.r_tangent[i] is not None
atoms = self.r_string[i]
try:
atoms, energy, nfev, nit = optimizer.optimize(atoms, self.r_tangent[i])
self.r_string[i] = atoms
self.r_energy[i] = float_check(energy)
except Exception:
energy = optimizer.calc.get_potential_energy(atoms)
self.r_energy[i] = float_check(energy)
nfev, nit = 0, 0
self.r_fix[i] = True
self.ngrad += nfev
if self.output is not None:
self.output.write_optimized_node("r", i, self.r_string[i], self.r_energy[i], nfev, nit)
for i in range(self.p_nnodes):
if self.p_energy[i] is None and self.p_fix[i]:
energy = optimizer.calc.get_potential_energy(self.p_string[i])
self.p_energy[i] = float_check(energy)
if self.output is not None:
self.output.write_optimized_node("p", i, self.p_string[i], self.p_energy[i], 0, 0)
elif not self.p_fix[i]:
assert self.p_tangent[i] is not None
atoms = self.p_string[i]
try:
atoms, energy, nfev, nit = optimizer.optimize(atoms, self.p_tangent[i])
self.p_string[i] = atoms
self.p_energy[i] = float_check(energy)
except Exception:
energy = optimizer.calc.get_potential_energy(atoms)
self.p_energy[i] = float_check(energy)
nfev, nit = 0, 0
self.p_fix[i] = True
self.ngrad += nfev
if self.output is not None:
self.output.write_optimized_node("p", i, self.p_string[i], self.p_energy[i], nfev, nit)
self.dist = distance(self.r_string[-1].get_positions().flatten(), self.p_string[-1].get_positions().flatten())
if self.dist < self.stepsize:
self.growing = False
[docs]
def write(self, outdir: Path | str) -> None:
"""Write the current string geometries and relative energies to an XYZ file.
Writes ``vfile_<iteration>.xyz`` to ``outdir``. Frame comment lines
contain the cumulative arc-length and relative energy (eV). On the
final iteration (``growing=False``) also writes the total gradient
count to ``ngrad.txt``.
Parameters
----------
outdir : path-like
Directory in which to write output files.
"""
outdir = Path(outdir)
outfile = outdir / f"vfile_{self.iteration:02d}.xyz"
gradfile = outdir / "ngrad.txt"
path = self.r_string + self.p_string[::-1]
string = np.stack([atoms.get_positions() for atoms in path], axis=0)
s = calculate_arc_length(string)
energy = np.array(self.r_energy + self.p_energy[::-1])
energy = energy - energy.min() # now will be in just eV
# check for fixed atoms
c = path[0].constraints
if len(c) > 0:
fixed = True
fixed_atoms = c[0].get_indices()
else:
fixed = False
with outfile.open("w") as f:
for i, atoms in enumerate(path):
if fixed:
_, xyz = project_trans_rot_fixed(string[0], string[i], fixed=fixed_atoms)
else:
_, xyz = project_trans_rot(string[0], string[i])
xyz = xyz.reshape(-1, 3)
f.write(f"{self.natoms}\n")
f.write(f"{s[i]:.5f} {energy[i]:.3f}\n")
for atom, coord in zip(atoms.get_chemical_symbols(), xyz, strict=False):
f.write(f"{atom} {float(coord[0]):.8f} {float(coord[1]):.8f} {float(coord[2]):.8f}\n")
energy_str = np.array2string(energy, precision=1, floatmode="fixed")
logging.info(f"ITERATION: {self.iteration} DIST: {self.dist:.2f} ENERGY: {energy_str}")
if self.output is not None:
self.output.write_iteration_summary(self.iteration, self.r_energy, self.p_energy, self.dist)
if not self.growing:
with gradfile.open("w") as f:
f.write(f"{self.ngrad}\n")