366 lines
16 KiB
Python
366 lines
16 KiB
Python
from Stochastisches_Modell import StochastischesModell
|
||
from Datumsfestlegung import Datumsfestlegung
|
||
import Datenbank
|
||
from Export import Export as Exporter
|
||
import Stochastisches_Modell
|
||
import Funktionales_Modell
|
||
import Datumsfestlegung
|
||
import numpy as np
|
||
|
||
|
||
|
||
|
||
|
||
|
||
class Iterationen:
|
||
"""Iterative Ausgleichung auf Basis des funktionalen und stochastischen Modells.
|
||
|
||
Die Klasse führt eine iterative Parameterschätzung nach dem Gauss-Markov-Modell durch. Zudem stellt Sie eine Methode zur Verfügung für:
|
||
|
||
- einmaligen Aufbau der symbolischen Matrizen/Vektoren (Jacobi-Matrix, Näherungs-Beobachtungsvektor, Qll, QAA, Unbekanntenvektor),
|
||
- numerische Auswertung je Iteration (A(x_k), l0(x_k), dl_k) und Berechnung des Vektors dx,
|
||
- Fortschreibung des Unbekanntenvektors x_{k+1} = x_k + dx,
|
||
- Abbruchprüfung über Konvergenzkriterium (max|dx_xyz|) und Stagnation,
|
||
- optionaler Export von Zwischenergebnisse als CSV in den Ordner Zwischenergebnisse.
|
||
|
||
Die Iteration verwendet bei weicher Lagerung eine erweiterte Kovarianzmatrix (Q_ext) inklusive Anschlusspunkten (lA_*),
|
||
aus der die Gewichtsmatrix P abgeleitet wird.
|
||
"""
|
||
def __init__(self, pfad_datenbank: str, pfad_tif_quasigeoidundulation: str, a: float, b: float) -> None:
|
||
"""Initialisiert die Iterationssteuerung.
|
||
|
||
Speichert Datenbankpfade und Ellipsoidparameter, initialisiert das funktionale Modell sowie das stochastische Modell
|
||
und legt den Datenbankzugriff an.
|
||
|
||
:param pfad_datenbank: Pfad zur SQLite-Datenbank.
|
||
:type pfad_datenbank: str
|
||
:param pfad_tif_quasigeoidundulation: Pfad zum GeoTIFF der Quasigeoidundulation (BKG), benötigt für UTM/Normalhöhen in Transformationen.
|
||
:type pfad_tif_quasigeoidundulation: str
|
||
:param a: Große Halbachse a des Referenzellipsoids in Meter.
|
||
:type a: float
|
||
:param b: Kleine Halbachse b des Referenzellipsoids in Meter.
|
||
:type b: float
|
||
:return: None
|
||
:rtype: None
|
||
"""
|
||
self.pfad_datenbank = pfad_datenbank
|
||
self.pfad_tif_quasigeoidundulation = pfad_tif_quasigeoidundulation
|
||
self.a = a
|
||
self.b = b
|
||
|
||
self.stoch_modell = Stochastisches_Modell.StochastischesModell(pfad_datenbank)
|
||
self.db_zugriff = Datenbank.Datenbankzugriff(pfad_datenbank)
|
||
self.fm = Funktionales_Modell.FunktionalesModell(self.pfad_datenbank, self.a, self.b, self.pfad_tif_quasigeoidundulation)
|
||
|
||
|
||
def ausgleichung_global(self, A, dl, Q_ext, P):
|
||
"""
|
||
Führt eine Ausgleichung nach kleinsten Quadraten durch.
|
||
|
||
Aus der Designmatrix A, dem Verbesserungsvektor dl und der Gewichtsmatrix P wird das Normalgleichungssystem
|
||
aufgestellt und gelöst. Anschließend werden Residuen sowie Kofaktor-Matrizen der Unbekannten und Beobachtungen berechnet.
|
||
|
||
Es werden folgende Berechnungsschitte durchgeführt:
|
||
|
||
1) Normalgleichungsmatrix N = Aᵀ · P · A und Absolutglied n = Aᵀ · P · dl
|
||
2) Lösung dx = N⁻¹ · n
|
||
3) Residuen v = A · dx − dl
|
||
4) Kofaktormatrix der Unbekannten Q_xx
|
||
5) Kofaktormatrix der ausgeglichenen Beobachtungen Q_ll_dach
|
||
6) Kofaktormatrix der Verbesserungen Q_vv
|
||
|
||
:param A: Jacobi-Matrix (A-Matrix).
|
||
:type A: array_like
|
||
:param dl: Verbesserungsvektor bzw. Beobachtungsabweichungen.
|
||
:type dl: array_like
|
||
:param Q_ext: a-priori Kofaktormatrix der Beobachtungen.
|
||
:type Q_ext: array_like
|
||
:param P: Gewichtsmatrix der Beobachtungen.
|
||
:type P: array_like
|
||
:return: Dictionary mit Ausgleichungsergebnissen, Zuschlagsvektor dx.
|
||
:rtype: tuple[dict[str, Any], numpy.ndarray]
|
||
:raises numpy.linalg.LinAlgError: Wenn das Normalgleichungssystem singulär ist und nicht gelöst werden kann.
|
||
"""
|
||
A = np.asarray(A, float)
|
||
dl = np.asarray(dl, float).reshape(-1, 1)
|
||
Q_ext = np.asarray(Q_ext, float)
|
||
P = np.asarray(P, float)
|
||
|
||
# 1) Gewichtsmatrix P
|
||
# P = Q_ext
|
||
|
||
# 2) Normalgleichungsmatrix N und Absolutgliedvektor n
|
||
N = A.T @ P @ A
|
||
n = A.T @ P @ dl
|
||
|
||
# 3) Zuschlagsvektor dx
|
||
dx = np.linalg.solve(N, n)
|
||
|
||
# 4) Residuenvektor v
|
||
v = A @ dx - dl
|
||
|
||
# 5) Kofaktormatrix der Unbekannten Q_xx
|
||
Q_xx = StochastischesModell.berechne_Q_xx(N)
|
||
|
||
# 6) Kofaktormatrix der Beobachtungen Q_ll_dach
|
||
Q_ll_dach = StochastischesModell.berechne_Q_ll_dach(A, Q_xx)
|
||
|
||
# 7) Kofaktormatrix der Verbesserungen Q_vv
|
||
Q_vv = StochastischesModell.berechne_Qvv(Q_ext, Q_ll_dach)
|
||
|
||
# 8) Ausgabe
|
||
dict_ausgleichung = {
|
||
"dx": dx,
|
||
"v": v,
|
||
"P": P,
|
||
"N": N,
|
||
"Q_xx": Q_xx,
|
||
"Q_ll_dach": Q_ll_dach,
|
||
"Q_vv": Q_vv,
|
||
"Q_ext": Q_ext,
|
||
}
|
||
return dict_ausgleichung, dx
|
||
|
||
|
||
def iterationen(self, datumfestlegung: str, speichern_in_csv: bool) -> tuple[np.ndarray, list, list, np.ndarray, np.ndarray, np.ndarray, dict]:
|
||
"""Führt die iterative Ausgleichung (Parameterschätzung) aus und gibt die Ergebnisse zurück.
|
||
|
||
Ablauf (vereinfacht):
|
||
|
||
1) Symbolische Initialisierung (einmalig):
|
||
- Jacobi-Matrix_symbolisch inkl. Unbekanntenliste und Beobachtungskennungen,
|
||
- l0_symbolisch (Näherungs-Beobachtungsvektor),
|
||
- Qll_symbolisch (Beobachtungen) und ggf. QAA_symbolisch (Anschlusspunkte lA_*),
|
||
- Unbekanntenvektor_symbolisch.
|
||
|
||
2) Numerische Initialisierung (einmalig):
|
||
- numerischer Beobachtungsvektor l aus Datenbank/Substitutionen,
|
||
- Qll_numerisch (und ggf. QAA_numerisch) sowie Aufbau von Q_ext und P bei weicher Lagerung.
|
||
|
||
3) Iterationsschleife (k = 0..max_iter-1):
|
||
- Jacobi-Matrix(x_k) numerisch aus Jacobi-Matrix_symbolisch,
|
||
- l0(x_k) numerisch aus l0_symbolisch,
|
||
- dl_k = l − l0(x_k) (inkl. Normierung der Richtungen),
|
||
- Parameterschätzung -> dx_k,
|
||
- Fortschreibung x_{k+1} = x_k + dx_k,
|
||
- Abbruch bei Konvergenz (max|dx_xyz| < tol) oder Stagnation.
|
||
|
||
Optional werden ausgewählte Matrizen/Vektoren als CSV-Datei exportiert.
|
||
|
||
:param datumfestlegung: Art der Datumsfestlegung. In der aktuellen Implementierung wird "weiche Lagerung" erwartet,
|
||
da Q_ext und P sonst nicht aufgebaut werden.
|
||
:type datumfestlegung: str
|
||
:param speichern_in_csv: Steuert den Export von Zwischenergebnissen als CSV-Datei in den Ordner Zwischenergebnisse.
|
||
:type speichern_in_csv: bool
|
||
:return: Tupel mit (Jacobi-Matrix_matrix_numerisch, liste_unbekannte, liste_beobachtungen, x, dx, dl, ausgabe_parameterschaetzung).
|
||
Dabei ist x der numerische Unbekanntenvektor der letzten Iteration, dx die letzte Korrektur, dl der letzte
|
||
Verbesserungsvektor und ausgabe_parameterschaetzung ein Dictionary der globalen Ausgleichungs-Ausgabe.
|
||
:rtype: tuple[np.ndarray, list, list, np.ndarray, np.ndarray, np.ndarray, dict]
|
||
"""
|
||
# Symbolische Matrizen vor Ausführung der ersten Iteration aufstellen
|
||
# Symbolische Jacobimatrix aufstellen
|
||
Jacobimatrix_symbolisch, Jacobimatrix_symbolisch_liste_unbekannte, Jacobimatrix_symbolisch_liste_beobachtungsvektor = self.fm.jacobi_matrix_symbolisch(
|
||
datumfestlegung, self.db_zugriff.get_datumskoordinate())
|
||
|
||
# Symbolischen Beobachtungsvektor aus Näherungskoordinaten (l0) aufstellen
|
||
beobachtungsvektor_naeherung_symbolisch = self.fm.beobachtungsvektor_naeherung_symbolisch(
|
||
Jacobimatrix_symbolisch_liste_beobachtungsvektor)
|
||
|
||
# Symbolische Qll-Matrix aufstellen
|
||
Qll_matrix_symbolisch = self.stoch_modell.Qll_symbolisch(Jacobimatrix_symbolisch_liste_beobachtungsvektor)
|
||
|
||
# Symbolische QAA-Matrix mit den Anschlusspunkten für die weiche Lagerung aufstellen, wenn datumsfestlegung = weiche Lagerung ist
|
||
if datumfestlegung == "weiche Lagerung":
|
||
QAA_matrix_symbolisch = self.stoch_modell.QAA_symbolisch(Jacobimatrix_symbolisch_liste_beobachtungsvektor)
|
||
|
||
# Symbolische Unbekanntenvektor aufstellen
|
||
unbekanntenvektor_symbolisch = (self.fm.unbekanntenvektor_symbolisch(Jacobimatrix_symbolisch_liste_unbekannte))
|
||
|
||
|
||
# Überprüfung, ob bereits eine Varianzkomponentenschätzung erfolgt ist
|
||
liste_varianzkomponentenschaetzung = self.db_zugriff.get_varianzkomponentenschaetzung()
|
||
for varianzkomponente in liste_varianzkomponentenschaetzung:
|
||
if varianzkomponente[3] != 1:
|
||
varianzkomponentenschaetzung_erfolgt = True
|
||
else:
|
||
varianzkomponentenschaetzung_erfolgt = False
|
||
|
||
dict_indizes_beobachtungsgruppen = Datumsfestlegung.Datumsfestlegung.indizes_beobachtungsvektor_nach_beobachtungsgruppe(
|
||
Jacobimatrix_symbolisch_liste_beobachtungsvektor)
|
||
|
||
# Klasse FunktionalesModell zurücksetzen
|
||
self.fm.substitutionen_dict = self.fm.dict_substitutionen_uebergeordnetes_system()
|
||
self.fm.liste_symbole_lambdify = sorted(self.fm.substitutionen_dict.keys(), key=lambda s: str(s))
|
||
self.fm.func_A0 = None
|
||
self.fm.func_beob0 = None
|
||
self.fm.func_u0 = None
|
||
|
||
# Numerischen Unbekanntenvektor aufstellen
|
||
x = self.fm.unbekanntenvektor_numerisch(
|
||
Jacobimatrix_symbolisch_liste_unbekannte,
|
||
unbekanntenvektor_symbolisch
|
||
)
|
||
# sp.Symbol in Strings konvertieren
|
||
labels_str = [str(s).strip() for s in Jacobimatrix_symbolisch_liste_beobachtungsvektor]
|
||
unk_str = [str(s).strip() for s in Jacobimatrix_symbolisch_liste_unbekannte]
|
||
|
||
# Beschriftungen für CSV-Dateien vorbereiten
|
||
mask_lA = np.array([l.startswith("lA_") for l in labels_str])
|
||
idx_ll = np.where(~mask_lA)[0]
|
||
idx_AA = np.where(mask_lA)[0]
|
||
perm = np.concatenate([idx_ll, idx_AA])
|
||
labels_perm = [labels_str[i] for i in perm]
|
||
idx_xyz = [i for i, s in enumerate(unk_str) if s.startswith(("X", "Y", "Z"))]
|
||
|
||
# Numerischen Beobachtungsvektor einmalig aufstellen
|
||
beobachtungsvektor_numerisch = np.asarray(
|
||
self.fm.beobachtungsvektor_numerisch(Jacobimatrix_symbolisch_liste_beobachtungsvektor),
|
||
float
|
||
).reshape(-1, 1)
|
||
|
||
# Beobachtungsvektor mit den Anschlusspunkten für die weiche Lagerung aufstellen
|
||
lA_prior = beobachtungsvektor_numerisch[mask_lA].copy() if datumfestlegung == "weiche Lagerung" else None
|
||
if datumfestlegung == "weiche Lagerung":
|
||
beobachtungsvektor_numerisch[mask_lA] = lA_prior
|
||
|
||
# Numerische QLL-MAtrix einmalig aufstellen
|
||
Qll_matrix_numerisch = self.stoch_modell.Qll_numerisch(
|
||
Qll_matrix_symbolisch,
|
||
Jacobimatrix_symbolisch_liste_beobachtungsvektor
|
||
)
|
||
|
||
# Einmalig die numerischen Qll- und QAA-Matrizen zu einer gemeinsamen Matrix verbinden und P-Matrix aufbauen
|
||
if datumfestlegung == "weiche Lagerung":
|
||
QAA_matrix_numerisch = self.stoch_modell.QAA_numerisch(
|
||
QAA_matrix_symbolisch,
|
||
Jacobimatrix_symbolisch_liste_beobachtungsvektor
|
||
)
|
||
Q_ext, P = Datumsfestlegung.Datumsfestlegung.weiches_datum(Qll_matrix_numerisch, QAA_matrix_numerisch,
|
||
varianzkomponentenschaetzung_erfolgt,
|
||
dict_indizes_beobachtungsgruppen)
|
||
|
||
# Abbruchbedingungen für die Iteration definieren
|
||
tol_dx_abs = 1e-3 # 1.0 mm
|
||
tol_stag = 1e-10 # Änderung in max|dx_xyz|
|
||
stag_max = 3
|
||
|
||
prev_max_dx_xyz = None
|
||
stag_count = 0
|
||
|
||
max_iter = 20 # Maximale Anzahl Iterationen
|
||
|
||
# Iterationen ausführen
|
||
for iteration in range(max_iter):
|
||
print("\n" + "=" * 60)
|
||
print(f"ITERATION {iteration}")
|
||
|
||
print(f" -> berechne A(x_{iteration})")
|
||
|
||
# Numerische Jacobi-Matrix bei Iteration 0 mit den Datensätzen aus der Datenbank aufbauen. In jeder weiteren Iteration aus den Ergebnissen der vorherigen Iteration.
|
||
A_matrix_numerisch = self.fm.jacobi_matrix_numerisch(
|
||
Jacobimatrix_symbolisch,
|
||
"naeherung_us",
|
||
Jacobimatrix_symbolisch_liste_unbekannte,
|
||
Jacobimatrix_symbolisch_liste_beobachtungsvektor,
|
||
iteration
|
||
)
|
||
A_matrix_numerisch = np.asarray(A_matrix_numerisch, float)
|
||
|
||
if datumfestlegung == "weiche Lagerung":
|
||
beobachtungsvektor_numerisch[mask_lA] = lA_prior
|
||
|
||
# Numerischen Beobachtungsvektor aus Näherungswerten (l0) aufbauen.
|
||
print(f" -> berechne l0(x_{iteration})")
|
||
beobachtungsvektor_naeherung_numerisch_iteration0 = self.fm.beobachtungsvektor_naeherung_numerisch(
|
||
Jacobimatrix_symbolisch_liste_beobachtungsvektor,
|
||
beobachtungsvektor_naeherung_symbolisch,
|
||
iteration
|
||
)
|
||
|
||
# Exportieren von Matrizen und Vektoren in CSV-Dateien
|
||
if iteration == 0:
|
||
if speichern_in_csv == True:
|
||
Exporter.matrix_to_csv(
|
||
fr"Zwischenergebnisse\_l.csv",
|
||
[""],
|
||
labels_str,
|
||
beobachtungsvektor_numerisch,
|
||
"l"
|
||
)
|
||
|
||
if datumfestlegung == "weiche Lagerung":
|
||
Exporter.matrix_to_csv(
|
||
fr"Zwischenergebnisse\{iteration}_Q_ext.csv",
|
||
labels_perm,
|
||
labels_perm,
|
||
Q_ext,
|
||
"Q_ext"
|
||
)
|
||
Exporter.matrix_to_csv(
|
||
fr"Zwischenergebnisse\{iteration}_P.csv",
|
||
labels_perm,
|
||
labels_perm,
|
||
P,
|
||
"P"
|
||
)
|
||
|
||
# Q-Matrix für weitere Berechnung auswählen
|
||
if datumfestlegung == "weiche Lagerung":
|
||
Q_k = Q_ext
|
||
else:
|
||
Q_k = Qll_matrix_numerisch
|
||
|
||
# dl = l - l0 Vektor berechnen
|
||
dl_k = self.fm.berechnung_dl(
|
||
beobachtungsvektor_numerisch,
|
||
beobachtungsvektor_naeherung_numerisch_iteration0,
|
||
Jacobimatrix_symbolisch_liste_beobachtungsvektor,
|
||
iteration
|
||
)
|
||
dl_k = np.asarray(dl_k, float)
|
||
|
||
# Parameterschätzung für die aktuelle Iteration berechnen
|
||
ausgabe_parameterschaetzung, dx = self.ausgleichung_global(A_matrix_numerisch, dl_k, Q_k, P)
|
||
dx = np.asarray(dx, float).reshape(-1, 1)
|
||
|
||
# Matrizen je Iteration als CSV-Datei speichern
|
||
if speichern_in_csv == True:
|
||
Exporter.matrix_to_csv(fr"Zwischenergebnisse\{iteration}_Qxx.csv",
|
||
[""], Jacobimatrix_symbolisch_liste_unbekannte, ausgabe_parameterschaetzung["Q_xx"], "Qxx")
|
||
|
||
Exporter.matrix_to_csv(fr"Zwischenergebnisse\{iteration}_dl.csv",
|
||
[""], labels_str, dl_k, "dl")
|
||
|
||
Exporter.matrix_to_csv(fr"Zwischenergebnisse\{iteration}_dx.csv",
|
||
[""], Jacobimatrix_symbolisch_liste_unbekannte, dx, "dx")
|
||
|
||
# Unbekanntenvektor als Ergebnis der aktuellen Iteration berechnen
|
||
x = self.fm.unbekanntenvektor_numerisch(
|
||
Jacobimatrix_symbolisch_liste_unbekannte,
|
||
unbekanntenvektor_symbolisch,
|
||
dX_Vektor=dx,
|
||
unbekanntenvektor_numerisch_vorherige_Iteration=x,
|
||
iterationsnummer=iteration
|
||
)
|
||
|
||
# Abbruchbedingungen überprüfen und ausgeben
|
||
max_dx_xyz = float(np.max(np.abs(dx[idx_xyz])))
|
||
print(" -> max|dx_xyz| [m] =", max_dx_xyz)
|
||
|
||
tol_dx = tol_dx_abs
|
||
|
||
if prev_max_dx_xyz is not None and abs(prev_max_dx_xyz - max_dx_xyz) < tol_stag:
|
||
stag_count += 1
|
||
else:
|
||
stag_count = 0
|
||
prev_max_dx_xyz = max_dx_xyz
|
||
|
||
if max_dx_xyz < tol_dx:
|
||
print("\nKONVERGENZ ERREICHT (XYZ)")
|
||
break
|
||
|
||
if stag_count >= stag_max:
|
||
print("\nABBRUCH: STAGNATION (XYZ)")
|
||
break
|
||
|
||
return A_matrix_numerisch, Jacobimatrix_symbolisch_liste_unbekannte, Jacobimatrix_symbolisch_liste_beobachtungsvektor, x, dx, dl_k, ausgabe_parameterschaetzung |