Push 03.02.2026

This commit is contained in:
2026-02-03 18:31:48 +01:00
parent 4e7c55500b
commit 5842255a6a
4 changed files with 17997 additions and 76246 deletions

View File

@@ -3,6 +3,20 @@ from Netzqualität_Genauigkeit import Genauigkeitsmaße
from Datumsfestlegung import Datumsfestlegung
import numpy as np
import Export
import importlib
import Datenbank
import importlib
import Export
from Export import Export as Exporter
import Stochastisches_Modell
import Funktionales_Modell
import Koordinatentransformationen
import Parameterschaetzung
import Netzqualität_Genauigkeit
import Datumsfestlegung
import numpy as np
import sympy as sp
import pandas as pd
def ausgleichung_global(A, dl, Q_ext, P):
@@ -265,3 +279,286 @@ def ausgleichung_spurminimierung(A, dl, Q_ll, *, datumfestlegung, x0, unbekannte
}
return dict_ausgleichung, dx
class Iterationen:
"""Iterative Ausgleichung auf Basis des funktionalen und stochastischen Modells.
Die Klasse führt eine iterative Parameterschätzung nach dem Gauss-Markov-Modell durch. Zudem stellt Sie eine Methode zur Verfügung für:
- einmaligen Aufbau der symbolischen Matrizen/Vektoren (Jacobi-Matrix, Näherungs-Beobachtungsvektor, Qll, QAA, Unbekanntenvektor),
- numerische Auswertung je Iteration (A(x_k), l0(x_k), dl_k) und Berechnung des Vektors dx,
- Fortschreibung des Unbekanntenvektors x_{k+1} = x_k + dx,
- Abbruchprüfung über Konvergenzkriterium (max|dx_xyz|) und Stagnation,
- optionaler Export von Zwischenergebnisse als CSV in den Ordner Zwischenergebnisse.
Die Iteration verwendet bei weicher Lagerung eine erweiterte Kovarianzmatrix (Q_ext) inklusive Anschlusspunkten (lA_*),
aus der die Gewichtsmatrix P abgeleitet wird.
"""
def __init__(self, pfad_datenbank: str, pfad_tif_quasigeoidundolation: str, a: float, b: float) -> None:
"""Initialisiert die Iterationssteuerung.
Speichert Datenbankpfade und Ellipsoidparameter, initialisiert das funktionale Modell sowie das stochastische Modell
und legt den Datenbankzugriff an.
:param pfad_datenbank: Pfad zur SQLite-Datenbank.
:type pfad_datenbank: str
:param pfad_tif_quasigeoidundolation: Pfad zum GeoTIFF der Quasigeoidundulation (BKG), benötigt für UTM/Normalhöhen in Transformationen.
:type pfad_tif_quasigeoidundolation: str
:param a: Große Halbachse a des Referenzellipsoids in Meter.
:type a: float
:param b: Kleine Halbachse b des Referenzellipsoids in Meter.
:type b: float
:return: None
:rtype: None
"""
self.pfad_datenbank = pfad_datenbank
self.pfad_tif_quasigeoidundolation = pfad_tif_quasigeoidundolation
self.a = a
self.b = b
self.stoch_modell = Stochastisches_Modell.StochastischesModell(pfad_datenbank)
self.db_zugriff = Datenbank.Datenbankzugriff(pfad_datenbank)
self.fm = Funktionales_Modell.FunktionalesModell(self.pfad_datenbank, self.a, self.b, self.pfad_tif_quasigeoidundolation)
def iterationen(self, datumfestlegung: str, speichern_in_csv: bool) -> tuple[np.ndarray, list, list, np.ndarray, np.ndarray, np.ndarray, dict]:
"""Führt die iterative Ausgleichung (Parameterschätzung) aus und gibt die Ergebnisse zurück.
Ablauf (vereinfacht):
1) Symbolische Initialisierung (einmalig):
- Jacobi-Matrix_symbolisch inkl. Unbekanntenliste und Beobachtungskennungen,
- l0_symbolisch (Näherungs-Beobachtungsvektor),
- Qll_symbolisch (Beobachtungen) und ggf. QAA_symbolisch (Anschlusspunkte lA_*),
- Unbekanntenvektor_symbolisch.
2) Numerische Initialisierung (einmalig):
- numerischer Beobachtungsvektor l aus Datenbank/Substitutionen,
- Qll_numerisch (und ggf. QAA_numerisch) sowie Aufbau von Q_ext und P bei weicher Lagerung.
3) Iterationsschleife (k = 0..max_iter-1):
- Jacobi-Matrix(x_k) numerisch aus Jacobi-Matrix_symbolisch,
- l0(x_k) numerisch aus l0_symbolisch,
- dl_k = l l0(x_k) (inkl. Normierung der Richtungen),
- Parameterschätzung -> dx_k,
- Fortschreibung x_{k+1} = x_k + dx_k,
- Abbruch bei Konvergenz (max|dx_xyz| < tol) oder Stagnation.
Optional werden ausgewählte Matrizen/Vektoren als CSV-Datei exportiert.
:param datumfestlegung: Art der Datumsfestlegung. In der aktuellen Implementierung wird "weiche Lagerung" erwartet,
da Q_ext und P sonst nicht aufgebaut werden.
:type datumfestlegung: str
:param speichern_in_csv: Steuert den Export von Zwischenergebnissen als CSV-Datei in den Ordner Zwischenergebnisse.
:type speichern_in_csv: bool
:return: Tupel mit (Jacobi-Matrix_matrix_numerisch, liste_unbekannte, liste_beobachtungen, x, dx, dl, ausgabe_parameterschaetzung).
Dabei ist x der numerische Unbekanntenvektor der letzten Iteration, dx die letzte Korrektur, dl der letzte
Verbesserungsvektor und ausgabe_parameterschaetzung ein Dictionary der globalen Ausgleichungs-Ausgabe.
:rtype: tuple[np.ndarray, list, list, np.ndarray, np.ndarray, np.ndarray, dict]
"""
# Symbolische Matrizen vor Ausführung der ersten Iteration aufstellen
# Symbolische Jacobimatrix aufstellen
Jacobimatrix_symbolisch, Jacobimatrix_symbolisch_liste_unbekannte, Jacobimatrix_symbolisch_liste_beobachtungsvektor = self.fm.jacobi_matrix_symbolisch(
datumfestlegung, self.db_zugriff.get_datumskoordinate())
# Symbolischen Beobachtungsvektor aus Näherungskoordinaten (l0) aufstellen
beobachtungsvektor_naeherung_symbolisch = self.fm.beobachtungsvektor_naeherung_symbolisch(
Jacobimatrix_symbolisch_liste_beobachtungsvektor)
# Symbolische Qll-Matrix aufstellen
Qll_matrix_symbolisch = self.stoch_modell.Qll_symbolisch(Jacobimatrix_symbolisch_liste_beobachtungsvektor)
# Symbolische QAA-Matrix mit den Anschlusspunkten für die weiche Lagerung aufstellen, wenn datumsfestlegung = weiche Lagerung ist
if datumfestlegung == "weiche Lagerung":
QAA_matrix_symbolisch = self.stoch_modell.QAA_symbolisch(Jacobimatrix_symbolisch_liste_beobachtungsvektor)
# Symbolische Unbekanntenvektor aufstellen
unbekanntenvektor_symbolisch = (self.fm.unbekanntenvektor_symbolisch(Jacobimatrix_symbolisch_liste_unbekannte))
# Überprüfung, ob bereits eine Varianzkomponentenschätzung erfolgt ist
liste_varianzkomponentenschaetzung = self.db_zugriff.get_varianzkomponentenschaetzung()
for varianzkomponente in liste_varianzkomponentenschaetzung:
if varianzkomponente[3] != 1:
varianzkomponentenschaetzung_erfolgt = True
else:
varianzkomponentenschaetzung_erfolgt = False
dict_indizes_beobachtungsgruppen = Datumsfestlegung.Datumsfestlegung.indizes_beobachtungsvektor_nach_beobachtungsgruppe(
Jacobimatrix_symbolisch_liste_beobachtungsvektor)
# Klasse FunktionalesModell zurücksetzen
self.fm.substitutionen_dict = self.fm.dict_substitutionen_uebergeordnetes_system()
self.fm.liste_symbole_lambdify = sorted(self.fm.substitutionen_dict.keys(), key=lambda s: str(s))
self.fm.func_A0 = None
self.fm.func_beob0 = None
self.fm.func_u0 = None
# Numerischen Unbekanntenvektor aufstellen
x = self.fm.unbekanntenvektor_numerisch(
Jacobimatrix_symbolisch_liste_unbekannte,
unbekanntenvektor_symbolisch
)
# sp.Symbol in Strings konvertieren
labels_str = [str(s).strip() for s in Jacobimatrix_symbolisch_liste_beobachtungsvektor]
unk_str = [str(s).strip() for s in Jacobimatrix_symbolisch_liste_unbekannte]
# Beschriftungen für CSV-Dateien vorbereiten
mask_lA = np.array([l.startswith("lA_") for l in labels_str])
idx_ll = np.where(~mask_lA)[0]
idx_AA = np.where(mask_lA)[0]
perm = np.concatenate([idx_ll, idx_AA])
labels_perm = [labels_str[i] for i in perm]
idx_xyz = [i for i, s in enumerate(unk_str) if s.startswith(("X", "Y", "Z"))]
# Numerischen Beobachtungsvektor einmalig aufstellen
beobachtungsvektor_numerisch = np.asarray(
self.fm.beobachtungsvektor_numerisch(Jacobimatrix_symbolisch_liste_beobachtungsvektor),
float
).reshape(-1, 1)
# Beobachtungsvektor mit den Anschlusspunkten für die weiche Lagerung aufstellen
lA_prior = beobachtungsvektor_numerisch[mask_lA].copy() if datumfestlegung == "weiche Lagerung" else None
if datumfestlegung == "weiche Lagerung":
beobachtungsvektor_numerisch[mask_lA] = lA_prior
# Numerische QLL-MAtrix einmalig aufstellen
Qll_matrix_numerisch = self.stoch_modell.Qll_numerisch(
Qll_matrix_symbolisch,
Jacobimatrix_symbolisch_liste_beobachtungsvektor
)
# Einmalig die numerischen Qll- und QAA-Matrizen zu einer gemeinsamen Matrix verbinden und P-Matrix aufbauen
if datumfestlegung == "weiche Lagerung":
QAA_matrix_numerisch = self.stoch_modell.QAA_numerisch(
QAA_matrix_symbolisch,
Jacobimatrix_symbolisch_liste_beobachtungsvektor
)
Q_ext, P = Datumsfestlegung.Datumsfestlegung.weiches_datum(Qll_matrix_numerisch, QAA_matrix_numerisch,
varianzkomponentenschaetzung_erfolgt,
dict_indizes_beobachtungsgruppen)
# Abbruchbedingungen für die Iteration definieren
tol_dx_abs = 1e-3 # 1.0 mm
tol_stag = 1e-10 # Änderung in max|dx_xyz|
stag_max = 3
prev_max_dx_xyz = None
stag_count = 0
max_iter = 20 # Maximale Anzahl Iterationen
# Iterationen ausführen
for iteration in range(max_iter):
print("\n" + "=" * 60)
print(f"ITERATION {iteration}")
print(f" -> berechne A(x_{iteration})")
# Numerische Jacobi-Matrix bei Iteration 0 mit den Datensätzen aus der Datenbank aufbauen. In jeder weiteren Iteration aus den Ergebnissen der vorherigen Iteration.
A_matrix_numerisch = self.fm.jacobi_matrix_numerisch(
Jacobimatrix_symbolisch,
"naeherung_us",
Jacobimatrix_symbolisch_liste_unbekannte,
Jacobimatrix_symbolisch_liste_beobachtungsvektor,
iteration
)
A_matrix_numerisch = np.asarray(A_matrix_numerisch, float)
if datumfestlegung == "weiche Lagerung":
beobachtungsvektor_numerisch[mask_lA] = lA_prior
# Numerischen Beobachtungsvektor aus Näherungswerten (l0) aufbauen.
print(f" -> berechne l0(x_{iteration})")
beobachtungsvektor_naeherung_numerisch_iteration0 = self.fm.beobachtungsvektor_naeherung_numerisch(
Jacobimatrix_symbolisch_liste_beobachtungsvektor,
beobachtungsvektor_naeherung_symbolisch,
iteration
)
# Exportieren von Matrizen und Vektoren in CSV-Dateien
if iteration == 0:
if speichern_in_csv == True:
Exporter.matrix_to_csv(
fr"Zwischenergebnisse\_l.csv",
[""],
labels_str,
beobachtungsvektor_numerisch,
"l"
)
if datumfestlegung == "weiche Lagerung":
Exporter.matrix_to_csv(
fr"Zwischenergebnisse\{iteration}_Q_ext.csv",
labels_perm,
labels_perm,
Q_ext,
"Q_ext"
)
Exporter.matrix_to_csv(
fr"Zwischenergebnisse\{iteration}_P.csv",
labels_perm,
labels_perm,
P,
"P"
)
# Q-Matrix für weitere Berechnung auswählen
if datumfestlegung == "weiche Lagerung":
Q_k = Q_ext
else:
Q_k = Qll_matrix_numerisch
# dl = l - l0 Vektor berechnen
dl_k = self.fm.berechnung_dl(
beobachtungsvektor_numerisch,
beobachtungsvektor_naeherung_numerisch_iteration0,
Jacobimatrix_symbolisch_liste_beobachtungsvektor,
iteration
)
dl_k = np.asarray(dl_k, float)
# Parameterschätzung für die aktuelle Iteration berechnen
ausgabe_parameterschaetzung, dx = Parameterschaetzung.ausgleichung_global(A_matrix_numerisch, dl_k, Q_k, P)
dx = np.asarray(dx, float).reshape(-1, 1)
# Matrizen je Iteration als CSV-Datei speichern
if speichern_in_csv == True:
Exporter.matrix_to_csv(fr"Zwischenergebnisse\{iteration}_Qxx.csv",
[""], Jacobimatrix_symbolisch_liste_unbekannte, ausgabe_parameterschaetzung["Q_xx"], "Qxx")
Exporter.matrix_to_csv(fr"Zwischenergebnisse\{iteration}_dl.csv",
[""], labels_str, dl_k, "dl")
Exporter.matrix_to_csv(fr"Zwischenergebnisse\{iteration}_dx.csv",
[""], Jacobimatrix_symbolisch_liste_unbekannte, dx, "dx")
# Unbekanntenvektor als Ergebnis der aktuellen Iteration berechnen
x = self.fm.unbekanntenvektor_numerisch(
Jacobimatrix_symbolisch_liste_unbekannte,
unbekanntenvektor_symbolisch,
dX_Vektor=dx,
unbekanntenvektor_numerisch_vorherige_Iteration=x,
iterationsnummer=iteration
)
# Abbruchbedingungen überprüfen und ausgeben
max_dx_xyz = float(np.max(np.abs(dx[idx_xyz])))
print(" -> max|dx_xyz| [m] =", max_dx_xyz)
tol_dx = tol_dx_abs
if prev_max_dx_xyz is not None and abs(prev_max_dx_xyz - max_dx_xyz) < tol_stag:
stag_count += 1
else:
stag_count = 0
prev_max_dx_xyz = max_dx_xyz
if max_dx_xyz < tol_dx:
print("\nKONVERGENZ ERREICHT (XYZ)")
break
if stag_count >= stag_max:
print("\nABBRUCH: STAGNATION (XYZ)")
break
return A_matrix_numerisch, Jacobimatrix_symbolisch_liste_unbekannte, Jacobimatrix_symbolisch_liste_beobachtungsvektor, x, dx, dl_k, ausgabe_parameterschaetzung