from Stochastisches_Modell import StochastischesModell from Datumsfestlegung import Datumsfestlegung import Datenbank from Export import Export as Exporter import Stochastisches_Modell import Funktionales_Modell import Datumsfestlegung import numpy as np def ausgleichung_global(A, dl, Q_ext, P): """ Führt eine Ausgleichung nach kleinsten Quadraten durch. Aus der Designmatrix A, dem Verbesserungsvektor dl und der Gewichtsmatrix P wird das Normalgleichungssystem aufgestellt und gelöst. Anschließend werden Residuen sowie Kofaktor-Matrizen der Unbekannten und Beobachtungen berechnet. Es werden folgende Berechnungsschitte durchgeführt: 1) Normalgleichungsmatrix N = Aᵀ · P · A und Absolutglied n = Aᵀ · P · dl 2) Lösung dx = N⁻¹ · n 3) Residuen v = A · dx − dl 4) Kofaktormatrix der Unbekannten Q_xx 5) Kofaktormatrix der ausgeglichenen Beobachtungen Q_ll_dach 6) Kofaktormatrix der Verbesserungen Q_vv :param A: Jacobi-Matrix (A-Matrix). :type A: array_like :param dl: Verbesserungsvektor bzw. Beobachtungsabweichungen. :type dl: array_like :param Q_ext: a-priori Kofaktormatrix der Beobachtungen. :type Q_ext: array_like :param P: Gewichtsmatrix der Beobachtungen. :type P: array_like :return: Dictionary mit Ausgleichungsergebnissen, Zuschlagsvektor dx. :rtype: tuple[dict[str, Any], numpy.ndarray] :raises numpy.linalg.LinAlgError: Wenn das Normalgleichungssystem singulär ist und nicht gelöst werden kann. """ A=np.asarray(A, float) dl = np.asarray(dl, float).reshape(-1, 1) Q_ext = np.asarray(Q_ext, float) P = np.asarray(P, float) # 1) Gewichtsmatrix P #P = Q_ext # 2) Normalgleichungsmatrix N und Absolutgliedvektor n N = A.T @ P @ A n = A.T @ P @ dl # 3) Zuschlagsvektor dx dx = np.linalg.solve(N, n) # 4) Residuenvektor v v = A @ dx - dl # 5) Kofaktormatrix der Unbekannten Q_xx Q_xx = StochastischesModell.berechne_Q_xx(N) # 6) Kofaktormatrix der Beobachtungen Q_ll_dach Q_ll_dach = StochastischesModell.berechne_Q_ll_dach(A, Q_xx) # 7) Kofaktormatrix der Verbesserungen Q_vv Q_vv = StochastischesModell.berechne_Qvv(Q_ext, Q_ll_dach) # 8) Ausgabe dict_ausgleichung = { "dx": dx, "v": v, "P": P, "N": N, "Q_xx": Q_xx, "Q_ll_dach": Q_ll_dach, "Q_vv": Q_vv, "Q_ext": Q_ext, } return dict_ausgleichung, dx def ausgleichung_lokal(A, dl, Q_ll): A = np.asarray(A, dtype=float) dl = np.asarray(dl, dtype=float).reshape(-1, 1) Q_ll = np.asarray(Q_ll, dtype=float) # 1) Gewichtsmatrix P = np.linalg.inv(Q_ll) # 2) Normalgleichungen N = A.T @ P @ A n = A.T @ P @ dl # 3) Datumsfestlegung G = Datumsfestlegung.build_G_from_names(x0, Jacobimatrix_symbolisch_liste_unbekannte, liste_punktnummern, mit_massstab=True) u = A.shape[1] aktive = Datumsfestlegung.aktive_indices_from_selection(auswahl, Jacobimatrix_symbolisch_liste_unbekannte) E = Datumsfestlegung.auswahlmatrix_E(u, aktive) Gi = E @ G # 3) Zuschlagsvektor dx dx, k = Datumsfestlegung.berechne_dx_geraendert(N, n, Gi) # 5) Residuen v = dl - A @ dx # 5) Kofaktormatrix der Unbekannten Q_xx Q_xx = StochastischesModell.berechne_Q_xx(N) # 6) Kofaktormatrix der Beobachtungen Q_ll_dach Q_ll_dach = StochastischesModell.berechne_Q_ll_dach(A, Q_xx) # 7) Kofaktormatrix der Verbesserungen Q_vv Q_vv = StochastischesModell.berechne_Qvv(Q_ll, Q_ll_dach) # 8) Ausgabe dict_ausgleichung = { "dx": dx, "v": v, "P": P, "N": N, "Q_xx": Q_xx, "Q_ll_dach": Q_ll_dach, "Q_vv": Q_vv, "Q_ll": Q_ll, } return dict_ausgleichung, dx class Iterationen: """Iterative Ausgleichung auf Basis des funktionalen und stochastischen Modells. Die Klasse führt eine iterative Parameterschätzung nach dem Gauss-Markov-Modell durch. Zudem stellt Sie eine Methode zur Verfügung für: - einmaligen Aufbau der symbolischen Matrizen/Vektoren (Jacobi-Matrix, Näherungs-Beobachtungsvektor, Qll, QAA, Unbekanntenvektor), - numerische Auswertung je Iteration (A(x_k), l0(x_k), dl_k) und Berechnung des Vektors dx, - Fortschreibung des Unbekanntenvektors x_{k+1} = x_k + dx, - Abbruchprüfung über Konvergenzkriterium (max|dx_xyz|) und Stagnation, - optionaler Export von Zwischenergebnisse als CSV in den Ordner Zwischenergebnisse. Die Iteration verwendet bei weicher Lagerung eine erweiterte Kovarianzmatrix (Q_ext) inklusive Anschlusspunkten (lA_*), aus der die Gewichtsmatrix P abgeleitet wird. """ def __init__(self, pfad_datenbank: str, pfad_tif_quasigeoidundolation: str, a: float, b: float) -> None: """Initialisiert die Iterationssteuerung. Speichert Datenbankpfade und Ellipsoidparameter, initialisiert das funktionale Modell sowie das stochastische Modell und legt den Datenbankzugriff an. :param pfad_datenbank: Pfad zur SQLite-Datenbank. :type pfad_datenbank: str :param pfad_tif_quasigeoidundolation: Pfad zum GeoTIFF der Quasigeoidundulation (BKG), benötigt für UTM/Normalhöhen in Transformationen. :type pfad_tif_quasigeoidundolation: str :param a: Große Halbachse a des Referenzellipsoids in Meter. :type a: float :param b: Kleine Halbachse b des Referenzellipsoids in Meter. :type b: float :return: None :rtype: None """ self.pfad_datenbank = pfad_datenbank self.pfad_tif_quasigeoidundolation = pfad_tif_quasigeoidundolation self.a = a self.b = b self.stoch_modell = Stochastisches_Modell.StochastischesModell(pfad_datenbank) self.db_zugriff = Datenbank.Datenbankzugriff(pfad_datenbank) self.fm = Funktionales_Modell.FunktionalesModell(self.pfad_datenbank, self.a, self.b, self.pfad_tif_quasigeoidundolation) def iterationen(self, datumfestlegung: str, speichern_in_csv: bool) -> tuple[np.ndarray, list, list, np.ndarray, np.ndarray, np.ndarray, dict]: """Führt die iterative Ausgleichung (Parameterschätzung) aus und gibt die Ergebnisse zurück. Ablauf (vereinfacht): 1) Symbolische Initialisierung (einmalig): - Jacobi-Matrix_symbolisch inkl. Unbekanntenliste und Beobachtungskennungen, - l0_symbolisch (Näherungs-Beobachtungsvektor), - Qll_symbolisch (Beobachtungen) und ggf. QAA_symbolisch (Anschlusspunkte lA_*), - Unbekanntenvektor_symbolisch. 2) Numerische Initialisierung (einmalig): - numerischer Beobachtungsvektor l aus Datenbank/Substitutionen, - Qll_numerisch (und ggf. QAA_numerisch) sowie Aufbau von Q_ext und P bei weicher Lagerung. 3) Iterationsschleife (k = 0..max_iter-1): - Jacobi-Matrix(x_k) numerisch aus Jacobi-Matrix_symbolisch, - l0(x_k) numerisch aus l0_symbolisch, - dl_k = l − l0(x_k) (inkl. Normierung der Richtungen), - Parameterschätzung -> dx_k, - Fortschreibung x_{k+1} = x_k + dx_k, - Abbruch bei Konvergenz (max|dx_xyz| < tol) oder Stagnation. Optional werden ausgewählte Matrizen/Vektoren als CSV-Datei exportiert. :param datumfestlegung: Art der Datumsfestlegung. In der aktuellen Implementierung wird "weiche Lagerung" erwartet, da Q_ext und P sonst nicht aufgebaut werden. :type datumfestlegung: str :param speichern_in_csv: Steuert den Export von Zwischenergebnissen als CSV-Datei in den Ordner Zwischenergebnisse. :type speichern_in_csv: bool :return: Tupel mit (Jacobi-Matrix_matrix_numerisch, liste_unbekannte, liste_beobachtungen, x, dx, dl, ausgabe_parameterschaetzung). Dabei ist x der numerische Unbekanntenvektor der letzten Iteration, dx die letzte Korrektur, dl der letzte Verbesserungsvektor und ausgabe_parameterschaetzung ein Dictionary der globalen Ausgleichungs-Ausgabe. :rtype: tuple[np.ndarray, list, list, np.ndarray, np.ndarray, np.ndarray, dict] """ # Symbolische Matrizen vor Ausführung der ersten Iteration aufstellen # Symbolische Jacobimatrix aufstellen Jacobimatrix_symbolisch, Jacobimatrix_symbolisch_liste_unbekannte, Jacobimatrix_symbolisch_liste_beobachtungsvektor = self.fm.jacobi_matrix_symbolisch( datumfestlegung, self.db_zugriff.get_datumskoordinate()) # Symbolischen Beobachtungsvektor aus Näherungskoordinaten (l0) aufstellen beobachtungsvektor_naeherung_symbolisch = self.fm.beobachtungsvektor_naeherung_symbolisch( Jacobimatrix_symbolisch_liste_beobachtungsvektor) # Symbolische Qll-Matrix aufstellen Qll_matrix_symbolisch = self.stoch_modell.Qll_symbolisch(Jacobimatrix_symbolisch_liste_beobachtungsvektor) # Symbolische QAA-Matrix mit den Anschlusspunkten für die weiche Lagerung aufstellen, wenn datumsfestlegung = weiche Lagerung ist if datumfestlegung == "weiche Lagerung": QAA_matrix_symbolisch = self.stoch_modell.QAA_symbolisch(Jacobimatrix_symbolisch_liste_beobachtungsvektor) # Symbolische Unbekanntenvektor aufstellen unbekanntenvektor_symbolisch = (self.fm.unbekanntenvektor_symbolisch(Jacobimatrix_symbolisch_liste_unbekannte)) # Überprüfung, ob bereits eine Varianzkomponentenschätzung erfolgt ist liste_varianzkomponentenschaetzung = self.db_zugriff.get_varianzkomponentenschaetzung() for varianzkomponente in liste_varianzkomponentenschaetzung: if varianzkomponente[3] != 1: varianzkomponentenschaetzung_erfolgt = True else: varianzkomponentenschaetzung_erfolgt = False dict_indizes_beobachtungsgruppen = Datumsfestlegung.Datumsfestlegung.indizes_beobachtungsvektor_nach_beobachtungsgruppe( Jacobimatrix_symbolisch_liste_beobachtungsvektor) # Klasse FunktionalesModell zurücksetzen self.fm.substitutionen_dict = self.fm.dict_substitutionen_uebergeordnetes_system() self.fm.liste_symbole_lambdify = sorted(self.fm.substitutionen_dict.keys(), key=lambda s: str(s)) self.fm.func_A0 = None self.fm.func_beob0 = None self.fm.func_u0 = None # Numerischen Unbekanntenvektor aufstellen x = self.fm.unbekanntenvektor_numerisch( Jacobimatrix_symbolisch_liste_unbekannte, unbekanntenvektor_symbolisch ) # sp.Symbol in Strings konvertieren labels_str = [str(s).strip() for s in Jacobimatrix_symbolisch_liste_beobachtungsvektor] unk_str = [str(s).strip() for s in Jacobimatrix_symbolisch_liste_unbekannte] # Beschriftungen für CSV-Dateien vorbereiten mask_lA = np.array([l.startswith("lA_") for l in labels_str]) idx_ll = np.where(~mask_lA)[0] idx_AA = np.where(mask_lA)[0] perm = np.concatenate([idx_ll, idx_AA]) labels_perm = [labels_str[i] for i in perm] idx_xyz = [i for i, s in enumerate(unk_str) if s.startswith(("X", "Y", "Z"))] # Numerischen Beobachtungsvektor einmalig aufstellen beobachtungsvektor_numerisch = np.asarray( self.fm.beobachtungsvektor_numerisch(Jacobimatrix_symbolisch_liste_beobachtungsvektor), float ).reshape(-1, 1) # Beobachtungsvektor mit den Anschlusspunkten für die weiche Lagerung aufstellen lA_prior = beobachtungsvektor_numerisch[mask_lA].copy() if datumfestlegung == "weiche Lagerung" else None if datumfestlegung == "weiche Lagerung": beobachtungsvektor_numerisch[mask_lA] = lA_prior # Numerische QLL-MAtrix einmalig aufstellen Qll_matrix_numerisch = self.stoch_modell.Qll_numerisch( Qll_matrix_symbolisch, Jacobimatrix_symbolisch_liste_beobachtungsvektor ) # Einmalig die numerischen Qll- und QAA-Matrizen zu einer gemeinsamen Matrix verbinden und P-Matrix aufbauen if datumfestlegung == "weiche Lagerung": QAA_matrix_numerisch = self.stoch_modell.QAA_numerisch( QAA_matrix_symbolisch, Jacobimatrix_symbolisch_liste_beobachtungsvektor ) Q_ext, P = Datumsfestlegung.Datumsfestlegung.weiches_datum(Qll_matrix_numerisch, QAA_matrix_numerisch, varianzkomponentenschaetzung_erfolgt, dict_indizes_beobachtungsgruppen) # Abbruchbedingungen für die Iteration definieren tol_dx_abs = 1e-3 # 1.0 mm tol_stag = 1e-10 # Änderung in max|dx_xyz| stag_max = 3 prev_max_dx_xyz = None stag_count = 0 max_iter = 20 # Maximale Anzahl Iterationen # Iterationen ausführen for iteration in range(max_iter): print("\n" + "=" * 60) print(f"ITERATION {iteration}") print(f" -> berechne A(x_{iteration})") # Numerische Jacobi-Matrix bei Iteration 0 mit den Datensätzen aus der Datenbank aufbauen. In jeder weiteren Iteration aus den Ergebnissen der vorherigen Iteration. A_matrix_numerisch = self.fm.jacobi_matrix_numerisch( Jacobimatrix_symbolisch, "naeherung_us", Jacobimatrix_symbolisch_liste_unbekannte, Jacobimatrix_symbolisch_liste_beobachtungsvektor, iteration ) A_matrix_numerisch = np.asarray(A_matrix_numerisch, float) if datumfestlegung == "weiche Lagerung": beobachtungsvektor_numerisch[mask_lA] = lA_prior # Numerischen Beobachtungsvektor aus Näherungswerten (l0) aufbauen. print(f" -> berechne l0(x_{iteration})") beobachtungsvektor_naeherung_numerisch_iteration0 = self.fm.beobachtungsvektor_naeherung_numerisch( Jacobimatrix_symbolisch_liste_beobachtungsvektor, beobachtungsvektor_naeherung_symbolisch, iteration ) # Exportieren von Matrizen und Vektoren in CSV-Dateien if iteration == 0: if speichern_in_csv == True: Exporter.matrix_to_csv( fr"Zwischenergebnisse\_l.csv", [""], labels_str, beobachtungsvektor_numerisch, "l" ) if datumfestlegung == "weiche Lagerung": Exporter.matrix_to_csv( fr"Zwischenergebnisse\{iteration}_Q_ext.csv", labels_perm, labels_perm, Q_ext, "Q_ext" ) Exporter.matrix_to_csv( fr"Zwischenergebnisse\{iteration}_P.csv", labels_perm, labels_perm, P, "P" ) # Q-Matrix für weitere Berechnung auswählen if datumfestlegung == "weiche Lagerung": Q_k = Q_ext else: Q_k = Qll_matrix_numerisch # dl = l - l0 Vektor berechnen dl_k = self.fm.berechnung_dl( beobachtungsvektor_numerisch, beobachtungsvektor_naeherung_numerisch_iteration0, Jacobimatrix_symbolisch_liste_beobachtungsvektor, iteration ) dl_k = np.asarray(dl_k, float) # Parameterschätzung für die aktuelle Iteration berechnen ausgabe_parameterschaetzung, dx = Parameterschaetzung.ausgleichung_global(A_matrix_numerisch, dl_k, Q_k, P) dx = np.asarray(dx, float).reshape(-1, 1) # Matrizen je Iteration als CSV-Datei speichern if speichern_in_csv == True: Exporter.matrix_to_csv(fr"Zwischenergebnisse\{iteration}_Qxx.csv", [""], Jacobimatrix_symbolisch_liste_unbekannte, ausgabe_parameterschaetzung["Q_xx"], "Qxx") Exporter.matrix_to_csv(fr"Zwischenergebnisse\{iteration}_dl.csv", [""], labels_str, dl_k, "dl") Exporter.matrix_to_csv(fr"Zwischenergebnisse\{iteration}_dx.csv", [""], Jacobimatrix_symbolisch_liste_unbekannte, dx, "dx") # Unbekanntenvektor als Ergebnis der aktuellen Iteration berechnen x = self.fm.unbekanntenvektor_numerisch( Jacobimatrix_symbolisch_liste_unbekannte, unbekanntenvektor_symbolisch, dX_Vektor=dx, unbekanntenvektor_numerisch_vorherige_Iteration=x, iterationsnummer=iteration ) # Abbruchbedingungen überprüfen und ausgeben max_dx_xyz = float(np.max(np.abs(dx[idx_xyz]))) print(" -> max|dx_xyz| [m] =", max_dx_xyz) tol_dx = tol_dx_abs if prev_max_dx_xyz is not None and abs(prev_max_dx_xyz - max_dx_xyz) < tol_stag: stag_count += 1 else: stag_count = 0 prev_max_dx_xyz = max_dx_xyz if max_dx_xyz < tol_dx: print("\nKONVERGENZ ERREICHT (XYZ)") break if stag_count >= stag_max: print("\nABBRUCH: STAGNATION (XYZ)") break return A_matrix_numerisch, Jacobimatrix_symbolisch_liste_unbekannte, Jacobimatrix_symbolisch_liste_beobachtungsvektor, x, dx, dl_k, ausgabe_parameterschaetzung