Files
Masterprojekt_V3/Parameterschaetzung.py
2026-02-05 15:02:28 +01:00

366 lines
16 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from Stochastisches_Modell import StochastischesModell
from Datumsfestlegung import Datumsfestlegung
import Datenbank
from Export import Export as Exporter
import Stochastisches_Modell
import Funktionales_Modell
import Datumsfestlegung
import numpy as np
class Iterationen:
"""Iterative Ausgleichung auf Basis des funktionalen und stochastischen Modells.
Die Klasse führt eine iterative Parameterschätzung nach dem Gauss-Markov-Modell durch. Zudem stellt Sie eine Methode zur Verfügung für:
- einmaligen Aufbau der symbolischen Matrizen/Vektoren (Jacobi-Matrix, Näherungs-Beobachtungsvektor, Qll, QAA, Unbekanntenvektor),
- numerische Auswertung je Iteration (A(x_k), l0(x_k), dl_k) und Berechnung des Vektors dx,
- Fortschreibung des Unbekanntenvektors x_{k+1} = x_k + dx,
- Abbruchprüfung über Konvergenzkriterium (max|dx_xyz|) und Stagnation,
- optionaler Export von Zwischenergebnisse als CSV in den Ordner Zwischenergebnisse.
Die Iteration verwendet bei weicher Lagerung eine erweiterte Kovarianzmatrix (Q_ext) inklusive Anschlusspunkten (lA_*),
aus der die Gewichtsmatrix P abgeleitet wird.
"""
def __init__(self, pfad_datenbank: str, pfad_tif_quasigeoidundolation: str, a: float, b: float) -> None:
"""Initialisiert die Iterationssteuerung.
Speichert Datenbankpfade und Ellipsoidparameter, initialisiert das funktionale Modell sowie das stochastische Modell
und legt den Datenbankzugriff an.
:param pfad_datenbank: Pfad zur SQLite-Datenbank.
:type pfad_datenbank: str
:param pfad_tif_quasigeoidundolation: Pfad zum GeoTIFF der Quasigeoidundulation (BKG), benötigt für UTM/Normalhöhen in Transformationen.
:type pfad_tif_quasigeoidundolation: str
:param a: Große Halbachse a des Referenzellipsoids in Meter.
:type a: float
:param b: Kleine Halbachse b des Referenzellipsoids in Meter.
:type b: float
:return: None
:rtype: None
"""
self.pfad_datenbank = pfad_datenbank
self.pfad_tif_quasigeoidundolation = pfad_tif_quasigeoidundolation
self.a = a
self.b = b
self.stoch_modell = Stochastisches_Modell.StochastischesModell(pfad_datenbank)
self.db_zugriff = Datenbank.Datenbankzugriff(pfad_datenbank)
self.fm = Funktionales_Modell.FunktionalesModell(self.pfad_datenbank, self.a, self.b, self.pfad_tif_quasigeoidundolation)
def ausgleichung_global(self, A, dl, Q_ext, P):
"""
Führt eine Ausgleichung nach kleinsten Quadraten durch.
Aus der Designmatrix A, dem Verbesserungsvektor dl und der Gewichtsmatrix P wird das Normalgleichungssystem
aufgestellt und gelöst. Anschließend werden Residuen sowie Kofaktor-Matrizen der Unbekannten und Beobachtungen berechnet.
Es werden folgende Berechnungsschitte durchgeführt:
1) Normalgleichungsmatrix N = Aᵀ · P · A und Absolutglied n = Aᵀ · P · dl
2) Lösung dx = N⁻¹ · n
3) Residuen v = A · dx dl
4) Kofaktormatrix der Unbekannten Q_xx
5) Kofaktormatrix der ausgeglichenen Beobachtungen Q_ll_dach
6) Kofaktormatrix der Verbesserungen Q_vv
:param A: Jacobi-Matrix (A-Matrix).
:type A: array_like
:param dl: Verbesserungsvektor bzw. Beobachtungsabweichungen.
:type dl: array_like
:param Q_ext: a-priori Kofaktormatrix der Beobachtungen.
:type Q_ext: array_like
:param P: Gewichtsmatrix der Beobachtungen.
:type P: array_like
:return: Dictionary mit Ausgleichungsergebnissen, Zuschlagsvektor dx.
:rtype: tuple[dict[str, Any], numpy.ndarray]
:raises numpy.linalg.LinAlgError: Wenn das Normalgleichungssystem singulär ist und nicht gelöst werden kann.
"""
A = np.asarray(A, float)
dl = np.asarray(dl, float).reshape(-1, 1)
Q_ext = np.asarray(Q_ext, float)
P = np.asarray(P, float)
# 1) Gewichtsmatrix P
# P = Q_ext
# 2) Normalgleichungsmatrix N und Absolutgliedvektor n
N = A.T @ P @ A
n = A.T @ P @ dl
# 3) Zuschlagsvektor dx
dx = np.linalg.solve(N, n)
# 4) Residuenvektor v
v = A @ dx - dl
# 5) Kofaktormatrix der Unbekannten Q_xx
Q_xx = StochastischesModell.berechne_Q_xx(N)
# 6) Kofaktormatrix der Beobachtungen Q_ll_dach
Q_ll_dach = StochastischesModell.berechne_Q_ll_dach(A, Q_xx)
# 7) Kofaktormatrix der Verbesserungen Q_vv
Q_vv = StochastischesModell.berechne_Qvv(Q_ext, Q_ll_dach)
# 8) Ausgabe
dict_ausgleichung = {
"dx": dx,
"v": v,
"P": P,
"N": N,
"Q_xx": Q_xx,
"Q_ll_dach": Q_ll_dach,
"Q_vv": Q_vv,
"Q_ext": Q_ext,
}
return dict_ausgleichung, dx
def iterationen(self, datumfestlegung: str, speichern_in_csv: bool) -> tuple[np.ndarray, list, list, np.ndarray, np.ndarray, np.ndarray, dict]:
"""Führt die iterative Ausgleichung (Parameterschätzung) aus und gibt die Ergebnisse zurück.
Ablauf (vereinfacht):
1) Symbolische Initialisierung (einmalig):
- Jacobi-Matrix_symbolisch inkl. Unbekanntenliste und Beobachtungskennungen,
- l0_symbolisch (Näherungs-Beobachtungsvektor),
- Qll_symbolisch (Beobachtungen) und ggf. QAA_symbolisch (Anschlusspunkte lA_*),
- Unbekanntenvektor_symbolisch.
2) Numerische Initialisierung (einmalig):
- numerischer Beobachtungsvektor l aus Datenbank/Substitutionen,
- Qll_numerisch (und ggf. QAA_numerisch) sowie Aufbau von Q_ext und P bei weicher Lagerung.
3) Iterationsschleife (k = 0..max_iter-1):
- Jacobi-Matrix(x_k) numerisch aus Jacobi-Matrix_symbolisch,
- l0(x_k) numerisch aus l0_symbolisch,
- dl_k = l l0(x_k) (inkl. Normierung der Richtungen),
- Parameterschätzung -> dx_k,
- Fortschreibung x_{k+1} = x_k + dx_k,
- Abbruch bei Konvergenz (max|dx_xyz| < tol) oder Stagnation.
Optional werden ausgewählte Matrizen/Vektoren als CSV-Datei exportiert.
:param datumfestlegung: Art der Datumsfestlegung. In der aktuellen Implementierung wird "weiche Lagerung" erwartet,
da Q_ext und P sonst nicht aufgebaut werden.
:type datumfestlegung: str
:param speichern_in_csv: Steuert den Export von Zwischenergebnissen als CSV-Datei in den Ordner Zwischenergebnisse.
:type speichern_in_csv: bool
:return: Tupel mit (Jacobi-Matrix_matrix_numerisch, liste_unbekannte, liste_beobachtungen, x, dx, dl, ausgabe_parameterschaetzung).
Dabei ist x der numerische Unbekanntenvektor der letzten Iteration, dx die letzte Korrektur, dl der letzte
Verbesserungsvektor und ausgabe_parameterschaetzung ein Dictionary der globalen Ausgleichungs-Ausgabe.
:rtype: tuple[np.ndarray, list, list, np.ndarray, np.ndarray, np.ndarray, dict]
"""
# Symbolische Matrizen vor Ausführung der ersten Iteration aufstellen
# Symbolische Jacobimatrix aufstellen
Jacobimatrix_symbolisch, Jacobimatrix_symbolisch_liste_unbekannte, Jacobimatrix_symbolisch_liste_beobachtungsvektor = self.fm.jacobi_matrix_symbolisch(
datumfestlegung, self.db_zugriff.get_datumskoordinate())
# Symbolischen Beobachtungsvektor aus Näherungskoordinaten (l0) aufstellen
beobachtungsvektor_naeherung_symbolisch = self.fm.beobachtungsvektor_naeherung_symbolisch(
Jacobimatrix_symbolisch_liste_beobachtungsvektor)
# Symbolische Qll-Matrix aufstellen
Qll_matrix_symbolisch = self.stoch_modell.Qll_symbolisch(Jacobimatrix_symbolisch_liste_beobachtungsvektor)
# Symbolische QAA-Matrix mit den Anschlusspunkten für die weiche Lagerung aufstellen, wenn datumsfestlegung = weiche Lagerung ist
if datumfestlegung == "weiche Lagerung":
QAA_matrix_symbolisch = self.stoch_modell.QAA_symbolisch(Jacobimatrix_symbolisch_liste_beobachtungsvektor)
# Symbolische Unbekanntenvektor aufstellen
unbekanntenvektor_symbolisch = (self.fm.unbekanntenvektor_symbolisch(Jacobimatrix_symbolisch_liste_unbekannte))
# Überprüfung, ob bereits eine Varianzkomponentenschätzung erfolgt ist
liste_varianzkomponentenschaetzung = self.db_zugriff.get_varianzkomponentenschaetzung()
for varianzkomponente in liste_varianzkomponentenschaetzung:
if varianzkomponente[3] != 1:
varianzkomponentenschaetzung_erfolgt = True
else:
varianzkomponentenschaetzung_erfolgt = False
dict_indizes_beobachtungsgruppen = Datumsfestlegung.Datumsfestlegung.indizes_beobachtungsvektor_nach_beobachtungsgruppe(
Jacobimatrix_symbolisch_liste_beobachtungsvektor)
# Klasse FunktionalesModell zurücksetzen
self.fm.substitutionen_dict = self.fm.dict_substitutionen_uebergeordnetes_system()
self.fm.liste_symbole_lambdify = sorted(self.fm.substitutionen_dict.keys(), key=lambda s: str(s))
self.fm.func_A0 = None
self.fm.func_beob0 = None
self.fm.func_u0 = None
# Numerischen Unbekanntenvektor aufstellen
x = self.fm.unbekanntenvektor_numerisch(
Jacobimatrix_symbolisch_liste_unbekannte,
unbekanntenvektor_symbolisch
)
# sp.Symbol in Strings konvertieren
labels_str = [str(s).strip() for s in Jacobimatrix_symbolisch_liste_beobachtungsvektor]
unk_str = [str(s).strip() for s in Jacobimatrix_symbolisch_liste_unbekannte]
# Beschriftungen für CSV-Dateien vorbereiten
mask_lA = np.array([l.startswith("lA_") for l in labels_str])
idx_ll = np.where(~mask_lA)[0]
idx_AA = np.where(mask_lA)[0]
perm = np.concatenate([idx_ll, idx_AA])
labels_perm = [labels_str[i] for i in perm]
idx_xyz = [i for i, s in enumerate(unk_str) if s.startswith(("X", "Y", "Z"))]
# Numerischen Beobachtungsvektor einmalig aufstellen
beobachtungsvektor_numerisch = np.asarray(
self.fm.beobachtungsvektor_numerisch(Jacobimatrix_symbolisch_liste_beobachtungsvektor),
float
).reshape(-1, 1)
# Beobachtungsvektor mit den Anschlusspunkten für die weiche Lagerung aufstellen
lA_prior = beobachtungsvektor_numerisch[mask_lA].copy() if datumfestlegung == "weiche Lagerung" else None
if datumfestlegung == "weiche Lagerung":
beobachtungsvektor_numerisch[mask_lA] = lA_prior
# Numerische QLL-MAtrix einmalig aufstellen
Qll_matrix_numerisch = self.stoch_modell.Qll_numerisch(
Qll_matrix_symbolisch,
Jacobimatrix_symbolisch_liste_beobachtungsvektor
)
# Einmalig die numerischen Qll- und QAA-Matrizen zu einer gemeinsamen Matrix verbinden und P-Matrix aufbauen
if datumfestlegung == "weiche Lagerung":
QAA_matrix_numerisch = self.stoch_modell.QAA_numerisch(
QAA_matrix_symbolisch,
Jacobimatrix_symbolisch_liste_beobachtungsvektor
)
Q_ext, P = Datumsfestlegung.Datumsfestlegung.weiches_datum(Qll_matrix_numerisch, QAA_matrix_numerisch,
varianzkomponentenschaetzung_erfolgt,
dict_indizes_beobachtungsgruppen)
# Abbruchbedingungen für die Iteration definieren
tol_dx_abs = 1e-3 # 1.0 mm
tol_stag = 1e-10 # Änderung in max|dx_xyz|
stag_max = 3
prev_max_dx_xyz = None
stag_count = 0
max_iter = 20 # Maximale Anzahl Iterationen
# Iterationen ausführen
for iteration in range(max_iter):
print("\n" + "=" * 60)
print(f"ITERATION {iteration}")
print(f" -> berechne A(x_{iteration})")
# Numerische Jacobi-Matrix bei Iteration 0 mit den Datensätzen aus der Datenbank aufbauen. In jeder weiteren Iteration aus den Ergebnissen der vorherigen Iteration.
A_matrix_numerisch = self.fm.jacobi_matrix_numerisch(
Jacobimatrix_symbolisch,
"naeherung_us",
Jacobimatrix_symbolisch_liste_unbekannte,
Jacobimatrix_symbolisch_liste_beobachtungsvektor,
iteration
)
A_matrix_numerisch = np.asarray(A_matrix_numerisch, float)
if datumfestlegung == "weiche Lagerung":
beobachtungsvektor_numerisch[mask_lA] = lA_prior
# Numerischen Beobachtungsvektor aus Näherungswerten (l0) aufbauen.
print(f" -> berechne l0(x_{iteration})")
beobachtungsvektor_naeherung_numerisch_iteration0 = self.fm.beobachtungsvektor_naeherung_numerisch(
Jacobimatrix_symbolisch_liste_beobachtungsvektor,
beobachtungsvektor_naeherung_symbolisch,
iteration
)
# Exportieren von Matrizen und Vektoren in CSV-Dateien
if iteration == 0:
if speichern_in_csv == True:
Exporter.matrix_to_csv(
fr"Zwischenergebnisse\_l.csv",
[""],
labels_str,
beobachtungsvektor_numerisch,
"l"
)
if datumfestlegung == "weiche Lagerung":
Exporter.matrix_to_csv(
fr"Zwischenergebnisse\{iteration}_Q_ext.csv",
labels_perm,
labels_perm,
Q_ext,
"Q_ext"
)
Exporter.matrix_to_csv(
fr"Zwischenergebnisse\{iteration}_P.csv",
labels_perm,
labels_perm,
P,
"P"
)
# Q-Matrix für weitere Berechnung auswählen
if datumfestlegung == "weiche Lagerung":
Q_k = Q_ext
else:
Q_k = Qll_matrix_numerisch
# dl = l - l0 Vektor berechnen
dl_k = self.fm.berechnung_dl(
beobachtungsvektor_numerisch,
beobachtungsvektor_naeherung_numerisch_iteration0,
Jacobimatrix_symbolisch_liste_beobachtungsvektor,
iteration
)
dl_k = np.asarray(dl_k, float)
# Parameterschätzung für die aktuelle Iteration berechnen
ausgabe_parameterschaetzung, dx = self.ausgleichung_global(A_matrix_numerisch, dl_k, Q_k, P)
dx = np.asarray(dx, float).reshape(-1, 1)
# Matrizen je Iteration als CSV-Datei speichern
if speichern_in_csv == True:
Exporter.matrix_to_csv(fr"Zwischenergebnisse\{iteration}_Qxx.csv",
[""], Jacobimatrix_symbolisch_liste_unbekannte, ausgabe_parameterschaetzung["Q_xx"], "Qxx")
Exporter.matrix_to_csv(fr"Zwischenergebnisse\{iteration}_dl.csv",
[""], labels_str, dl_k, "dl")
Exporter.matrix_to_csv(fr"Zwischenergebnisse\{iteration}_dx.csv",
[""], Jacobimatrix_symbolisch_liste_unbekannte, dx, "dx")
# Unbekanntenvektor als Ergebnis der aktuellen Iteration berechnen
x = self.fm.unbekanntenvektor_numerisch(
Jacobimatrix_symbolisch_liste_unbekannte,
unbekanntenvektor_symbolisch,
dX_Vektor=dx,
unbekanntenvektor_numerisch_vorherige_Iteration=x,
iterationsnummer=iteration
)
# Abbruchbedingungen überprüfen und ausgeben
max_dx_xyz = float(np.max(np.abs(dx[idx_xyz])))
print(" -> max|dx_xyz| [m] =", max_dx_xyz)
tol_dx = tol_dx_abs
if prev_max_dx_xyz is not None and abs(prev_max_dx_xyz - max_dx_xyz) < tol_stag:
stag_count += 1
else:
stag_count = 0
prev_max_dx_xyz = max_dx_xyz
if max_dx_xyz < tol_dx:
print("\nKONVERGENZ ERREICHT (XYZ)")
break
if stag_count >= stag_max:
print("\nABBRUCH: STAGNATION (XYZ)")
break
return A_matrix_numerisch, Jacobimatrix_symbolisch_liste_unbekannte, Jacobimatrix_symbolisch_liste_beobachtungsvektor, x, dx, dl_k, ausgabe_parameterschaetzung