from typing import Any import sympy as sp from sympy.algebras.quaternion import Quaternion import Datenbank from itertools import combinations from pathlib import Path import shutil from pyproj import CRS, Transformer, datadir import numpy as np from pyproj.exceptions import ProjError from pyproj import CRS, Transformer class Transformationen: """Koordinatentransformationen und Helmert-Transformation (Euler-Winkel) zwischen Referenzsystemen. Die Klasse stellt Methoden zur Verfügung für: - Aufbau einer Rotationsmatrix aus Eulerwinkeln, - Schätzung von 7-Parameter-Transformationsparametern (dX, dY, dZ, Maßstab m, Eulerwinkel e1/e2/e3) aus identischen Punkten zwischen lokalem Horizontsystem (LH) und geozentrisch-kartesischem System (ECEF), - Anwendung der geschätzten Helmerttransformation auf Punkte, die nur im Ausgangssystem vorliegen, - Transformation zwischen ETRS89 / UTM (+ DHHN2016 Normalhöhe) und ETRS89 geozentrisch-kartesisch (ECEF), inkl. Nutzung einer BKG-Quasigeoidundulations-Datei (GeoTIFF) für PROJ. Die grundlegende Funktionsweise der Transformationsschätzung lautet: 1) Identische Punkte aus Ausgangs- und Zielsystem ermitteln. 2) Näherung für Maßstab m0 aus mittleren Streckenverhältnissen bilden. 3) Näherungs-Rotation R0 aus lokalen Basen (u/v/w und U/V/W) bestimmen und daraus Euler-Näherungen ableiten. 4) Iterative Parameterschätzung (Gauss-Newton) auf Basis der Beobachtungsgleichung: P = T + m * R(e1,e2,e3) * p """ def __init__(self, pfad_datenbank: str) -> None: """Initialisiert die Transformationsklasse. Speichert den Pfad zur SQLite-Datenbank und initialisiert den Datenbankzugriff. :param pfad_datenbank: Pfad zur SQLite-Datenbank. :type pfad_datenbank: str :return: None :rtype: None """ self.pfad_datenbank = pfad_datenbank self.db_zugriff = Datenbank.Datenbankzugriff(self.pfad_datenbank) @staticmethod def R_matrix_aus_eulerwinkeln(e1: float, e2: float, e3: float) -> sp.Matrix: """Erstellt eine 3x3-Rotationsmatrix aus Eulerwinkeln. Die Rotationsmatrix wird symbolisch (SymPy) aufgebaut. Die Eulerwinkel e1, e2, e3 werden direkt in trigonometrische Ausdrücke eingesetzt und eine orthogonale Rotationsmatrix R(e1,e2,e3) zur Verwendung in Helmert-Transformationen zurückgegeben. :param e1: Eulerwinkel 1 (Radiant). :type e1: float :param e2: Eulerwinkel 2 (Radiant). :type e2: float :param e3: Eulerwinkel 3 (Radiant). :type e3: float :return: Rotationsmatrix R als SymPy-Matrix (3x3). :rtype: sp.Matrix """ return sp.Matrix([ [ sp.cos(e2) * sp.cos(e3), sp.cos(e1) * sp.sin(e3) + sp.sin(e1) * sp.sin(e2) * sp.cos(e3), sp.sin(e1) * sp.sin(e3) - sp.cos(e1) * sp.sin(e2) * sp.cos(e3) ], [ -sp.cos(e2) * sp.sin(e3), sp.cos(e1) * sp.cos(e3) - sp.sin(e1) * sp.sin(e2) * sp.sin(e3), sp.sin(e1) * sp.cos(e3) + sp.cos(e1) * sp.sin(e2) * sp.sin(e3) ], [ sp.sin(e2), -sp.sin(e1) * sp.cos(e2), sp.cos(e1) * sp.cos(e2) ] ]) def Helmerttransformation_Euler_Transformationsparameter_berechnen(self) -> dict[Any, float]: """Schätzt die Helmert-Transformationsparameter aus identischen Punkten. Aus der Datenbank werden Näherungskoordinaten des lokalen Horizontsystems (naeherung_lh) und des geozentrisch-kartesischen Systems (naeherung_us) geladen. Für die Schnittmenge der Punkte werden die 7 Helmertparameter geschätzt: - Translation (dX, dY, dZ), - Maßstab m, - Eulerwinkel (e1, e2, e3). Näherungen: - m0: Mittelwert der Streckenverhältnisse aus allen Punktpaaren, - R0: Anfangsrotationsmatrix aus lokalen Basisvektoren (u/v/w und U/V/W), - Translation0: aus Schwerpunkten und m0/R0. Die Parameterschätzung erfolgt iterativ mit P = I. Abbruchkriterium: |dx_i| < schwellenwert in zwei aufeinanderfolgenden Iterationen oder max. 100 Iterationen. :param: None :return: Dictionary der finalen Parameter mit SymPy-Symbolen als Keys und float-Werten als Values (Keys: dX, dY, dZ, m, e1, e2, e3). :rtype: dict[Any, float] """ # Koordinaten des lokalen Horizontsystems des Tachymeters und der geozentrisch Kartesischen Näherungskoordinaten aus den statischen GNSS-Messungen aus der Tabelle Netzpunkte abfragen dict_ausgangssystem = self.db_zugriff.get_koordinaten("naeherung_lh", "Dict") dict_zielsystem = self.db_zugriff.get_koordinaten("naeherung_us", "Dict") # Identische Punkte ermitteln gemeinsame_punktnummern = sorted(set(dict_ausgangssystem.keys()) & set(dict_zielsystem.keys())) anzahl_gemeinsame_punkte = len(gemeinsame_punktnummern) liste_punkte_ausgangssystem = [dict_ausgangssystem[i] for i in gemeinsame_punktnummern] liste_punkte_zielsystem = [dict_zielsystem[i] for i in gemeinsame_punktnummern] print("Anzahl verwendete Punkte für die Helmerttransformation:", anzahl_gemeinsame_punkte) p1, p2, p3 = liste_punkte_ausgangssystem[0], liste_punkte_ausgangssystem[1], liste_punkte_ausgangssystem[2] P1, P2, P3 = liste_punkte_zielsystem[0], liste_punkte_zielsystem[1], liste_punkte_zielsystem[2] # Näherungswert für dem Maßstab berechnen aus dem Mittelwert aller Punktpaare ratios = [] for i, j in combinations(range(anzahl_gemeinsame_punkte), 2): dp = (liste_punkte_ausgangssystem[j] - liste_punkte_ausgangssystem[i]).norm() dP = (liste_punkte_zielsystem[j] - liste_punkte_zielsystem[i]).norm() dp_f = float(dp) if dp_f > 0: ratios.append(float(dP) / dp_f) m0 = sum(ratios) / len(ratios) # Näherungswert für die Translation berechnen U = (P2 - P1) / (P2 - P1).norm() W = (U.cross(P3 - P1)) / (U.cross(P3 - P1)).norm() V = W.cross(U) u = (p2 - p1) / (p2 - p1).norm() w = (u.cross(p3 - p1)) / (u.cross(p3 - p1)).norm() v = w.cross(u) R0 = sp.Matrix.hstack(U, V, W) * sp.Matrix.hstack(u, v, w).T XS = sum(liste_punkte_zielsystem, sp.Matrix([0, 0, 0])) / anzahl_gemeinsame_punkte xS = sum(liste_punkte_ausgangssystem, sp.Matrix([0, 0, 0])) / anzahl_gemeinsame_punkte Translation0 = XS - m0 * R0 * xS # Euler-Näherungswerte aus der Anfangsrotationsmatrix e2_0 = sp.asin(R0[2, 0]) cos_e2_0 = sp.cos(e2_0) e1_0 = sp.acos(R0[2, 2] / cos_e2_0) e3_0 = sp.acos(R0[0, 0] / cos_e2_0) # Symbolische Unbekannte dX, dY, dZ, m, e1, e2, e3 = sp.symbols('dX dY dZ m e1 e2 e3') R_symbolisch = self.R_matrix_aus_eulerwinkeln(e1, e2, e3) # Funktionales Modell f_zeilen = [] for punkt in liste_punkte_ausgangssystem: punkt_vektor = sp.Matrix([punkt[0], punkt[1], punkt[2]]) f_zeile_i = sp.Matrix([dX, dY, dZ]) + m * R_symbolisch * punkt_vektor f_zeilen.extend(list(f_zeile_i)) f_matrix = sp.Matrix(f_zeilen) f = f_matrix A_ohne_zahlen = f_matrix.jacobian([dX, dY, dZ, m, e1, e2, e3]) # Parameterschätzung schwellenwert = 1e-4 anzahl_iterationen = 0 alle_kleiner_vorherige_iteration = False l_vektor = sp.Matrix([koord for P in liste_punkte_zielsystem for koord in P]) l = l_vektor P_matrix = sp.eye(3 * anzahl_gemeinsame_punkte) l_berechnet_0 = None while True: if anzahl_iterationen == 0: zahlen_0 = { dX: float(Translation0[0]), dY: float(Translation0[1]), dZ: float(Translation0[2]), m: float(m0), e1: float(e1_0), e2: float(e2_0), e3: float(e3_0) } x0 = sp.Matrix([zahlen_0[dX], zahlen_0[dY], zahlen_0[dZ], zahlen_0[m], zahlen_0[e1], zahlen_0[e2], zahlen_0[e3]]) l_berechnet_0 = f.subs(zahlen_0).evalf(n=30) dl_0 = l_vektor - l_berechnet_0 A_0 = A_ohne_zahlen.subs(zahlen_0).evalf(n=30) N = A_0.T * P_matrix * A_0 n_0 = A_0.T * P_matrix * dl_0 Qxx_0 = N.inv() dx = Qxx_0 * n_0 x = x0 + dx x = sp.N(x, 30) anzahl_iterationen += 1 print(f"Iteration Nr.{anzahl_iterationen} abgeschlossen") else: zahlen_i = { dX: float(x[0]), dY: float(x[1]), dZ: float(x[2]), m: float(x[3]), e1: float(x[4]), e2: float(x[5]), e3: float(x[6]) } l_berechnet_i = f.subs(zahlen_i).evalf(n=30) dl_i = l_vektor - l_berechnet_i A_i = A_ohne_zahlen.subs(zahlen_i).evalf(n=30) N_i = A_i.T * P_matrix * A_i Qxx_i = N_i.inv() n_i = A_i.T * P_matrix * dl_i dx = Qxx_i * n_i x = sp.Matrix(x + dx) anzahl_iterationen += 1 print(f"Iteration Nr.{anzahl_iterationen} abgeschlossen") alle_kleiner = True for i in range(dx.rows): wert = float(dx[i]) if abs(wert) > schwellenwert: alle_kleiner = False if (alle_kleiner and alle_kleiner_vorherige_iteration) or anzahl_iterationen == 100: break alle_kleiner_vorherige_iteration = alle_kleiner # Neuberechnung Zielsystem zahlen_final = { dX: float(x[0]), dY: float(x[1]), dZ: float(x[2]), m: float(x[3]), e1: float(x[4]), e2: float(x[5]), e3: float(x[6]) } l_berechnet_final = f.subs(zahlen_final).evalf(n=30) liste_l_berechnet_final = [] for i in range(anzahl_gemeinsame_punkte): Xi = l_berechnet_final[3 * i + 0] Yi = l_berechnet_final[3 * i + 1] Zi = l_berechnet_final[3 * i + 2] liste_l_berechnet_final.append(sp.Matrix([Xi, Yi, Zi])) print("Koordinaten berechnet aus Helmerttransformation:") for punktnummer, l_fin in zip(gemeinsame_punktnummern, liste_l_berechnet_final): print(f"{punktnummer}: {float(l_fin[0]):.3f}, {float(l_fin[1]):.3f}, {float(l_fin[2]):.3f}") print("Streckendifferenzen zwischen Näherungskoordinate aus statischer GNSS-Messung und ergebnis der Helmerttransformation:") streckendifferenzen = [ (punkt_zielsys - l_final).norm() for punkt_zielsys, l_final in zip(liste_punkte_zielsystem, liste_l_berechnet_final) ] print([round(float(s), 6) for s in streckendifferenzen]) Schwerpunkt_Zielsystem = sum(liste_punkte_zielsystem, sp.Matrix([0, 0, 0])) / anzahl_gemeinsame_punkte Schwerpunkt_berechnet = sum(liste_l_berechnet_final, sp.Matrix([0, 0, 0])) / anzahl_gemeinsame_punkte Schwerpunktsdifferenz = Schwerpunkt_Zielsystem - Schwerpunkt_berechnet print("\nDifferenz Schwerpunkt zwischen Näherungskoordinate aus statischer GNSS-Messung und ergebnis der Helmerttransformation::") print(Schwerpunktsdifferenz.evalf(3)) print("Betrag der Schwerpunkt-Differenz zwischen Näherungskoordinate aus statischer GNSS-Messung und ergebnis der Helmerttransformation::") print(f"{float(Schwerpunktsdifferenz.norm()):.3f}m") return zahlen_final def Helmerttransformation(self, transformationsparameter: dict) -> dict[Any, Any]: """Wendet eine Helmerttransformation auf Punkte des Ausgangssystems an. Aus der Datenbank werden Koordinaten des Ausgangssystems (naeherung_lh) und Zielsystems (naeherung_us) geladen. Transformiert werden genau die Punkte, die: - im Ausgangssystem vorhanden sind, - im Zielsystem fehlen (symmetrische Differenz der Punktmengen, anschließend Filter auf Ausgangssystem). Die Transformation erfolgt gemäß: P = [dX, dY, dZ]^T + m * R(e1,e2,e3) * p :param transformationsparameter: Transformationsparameter als Dictionary mit SymPy-Symbolen als Keys (dX, dY, dZ, m, e1, e2, e3) und numerischen Werten als Values. :type transformationsparameter: dict :return: Dictionary {punktnummer: sp.Matrix([X, Y, Z])} der transformierten geozentrisch-kartesischen Koordinaten. :rtype: dict[Any, Any] """ # Koordinaten des lokalen Horizontsystems des Tachymeters und der geozentrisch Kartesischen Näherungskoordinaten aus den statischen GNSS-Messungen aus der Tabelle Netzpunkte abfragen dict_ausgangssystem = self.db_zugriff.get_koordinaten("naeherung_lh", "Dict") dict_zielsystem = self.db_zugriff.get_koordinaten("naeherung_us", "Dict") # Symbole definieren dX, dY, dZ, m, e1, e2, e3 = sp.symbols('dX dY dZ m e1 e2 e3') # Unterschiedliche Punkte zwischen Ausgangs- und Zielsystem ermitteln unterschiedliche_punktnummern = sorted(set(dict_ausgangssystem.keys()) ^ set(dict_zielsystem.keys())) punktnummern_transformieren = [ punktnummer for punktnummer in unterschiedliche_punktnummern if punktnummer in dict_ausgangssystem ] liste_punkte_ausgangssystem = [dict_ausgangssystem[punktnummer] for punktnummer in punktnummern_transformieren] # Rotationsmatrix aufstellen R = self.R_matrix_aus_eulerwinkeln(transformationsparameter[e1], transformationsparameter[e2], transformationsparameter[e3]) f_zeilen = [] # Helmertransformation durchführen und Koordinaten speichern for punkt in liste_punkte_ausgangssystem: punkt_vektor = sp.Matrix([punkt[0], punkt[1], punkt[2]]) f_zeile_i = sp.Matrix([transformationsparameter[dX], transformationsparameter[dY], transformationsparameter[dZ]]) + transformationsparameter[m] * R * punkt_vektor f_zeilen.extend(list(f_zeile_i)) f_matrix = sp.Matrix(f_zeilen) dict_transformiert = {} for i, pn in enumerate(punktnummern_transformieren): Xi = f_matrix[3 * i + 0] Yi = f_matrix[3 * i + 1] Zi = f_matrix[3 * i + 2] dict_transformiert[str(pn)] = sp.Matrix([ [float(Xi)], [float(Yi)], [float(Zi)] ]) return dict_transformiert def utm_to_XYZ(self, pfad_tif_quasigeoidundulation: str, liste_utm: list) -> dict[Any, Any]: """Rechnet UTM-Koordinaten (ETRS89 / UTM + DHHN2016) in ECEF-Koordinaten (ETRS89 geozentrisch-kartesisch) um. Es wird ein PROJ-Transformer von: - Quelle: EPSG:25832 + EPSG:7837 (ETRS89 / UTM Zone 32N + DHHN2016 Normalhöhe), - Ziel: EPSG:4936 (ETRS89 geozentrisch-kartesisch) initialisiert. Zusätzlich wird ein BKG-GeoTIFF (Quasigeoidunndulation) in den PROJ-Datenpfad eingebunden, indem eine Kopie mit dem erwarteten Dateinamen "de_bkg_gcg2016.tif" im selben Ordner erzeugt wird. :param pfad_tif_quasigeoidundulation: Pfad zur BKG-GeoTIFF-Datei (Quasigeoidundulation). :type pfad_tif_quasigeoidundulation: str :param liste_utm: Liste von UTM-Koordinaten in der Form [(punktnummer, E, N, Normalhoehe), ...]. :type liste_utm: list :return: Dictionary {punktnummer: sp.Matrix([X, Y, Z])} mit ECEF-Koordinaten (Meter). :rtype: dict[Any, Any] """ # tif vom BKG zur Quasigeoidundulation übergeben pfad_gcg_tif = Path(pfad_tif_quasigeoidundulation) pfad_gcg_tif_proj = pfad_gcg_tif.with_name("de_bkg_gcg2016.tif") # Kopie des TIF anlegen (Dies ist voraussetzung für die Transformer-Bibliothek if (not pfad_gcg_tif_proj.exists()) or (pfad_gcg_tif_proj.stat().st_size != pfad_gcg_tif.stat().st_size): shutil.copy2(pfad_gcg_tif, pfad_gcg_tif_proj) datadir.append_data_dir(str(pfad_gcg_tif.parent)) utm_epsg = 25832 crs_src = CRS.from_user_input(f"EPSG:{utm_epsg}+EPSG:7837") # ETRS89/DREF91 + DHHN2016 crs_dst = CRS.from_epsg(4936) # ETRS89 geozentrisch kartesisch # Umrechnungsvorgaben übergeben tr_best = Transformer.from_crs( crs_src, crs_dst, always_xy=True, allow_ballpark=False, ) # Koordinaten rechnen und in Dictionary speichern dict_geozentrisch_kartesisch = {} for Punktnummer, E, N, Normalhoehe in liste_utm: X, Y, Z = tr_best.transform(E, N, Normalhoehe) dict_geozentrisch_kartesisch[Punktnummer] = sp.Matrix([X, Y, Z]) return dict_geozentrisch_kartesisch def ecef_to_utm( self, dict_koordinaten: dict, pfad_gcg_tif: str | Path | None = None): """Rechnet ECEF-Koordinaten (ETRS89 geozentrisch-kartesisch) nach nach UTM (+ DHHN2016 Normalhöhe). Es wird ein PROJ-Transformer von: - Quelle: EPSG:4936 (ETRS89 geozentrisch-kartesisch), - Ziel: EPSG:25832 + EPSG:7837 (ETRS89 / UTM Zone 32N + DHHN2016 Normalhöhe) initialisiert. Zusätzlich wird die BKG-GeoTIFF-Datei (Quasigeoidundulation) als PROJ-Grid eingebunden, indem eine Kopie mit dem erwarteten Namen "de_bkg_gcg2016.tif" im selben Ordner erzeugt wird. Die Methode akzeptiert Koordinatenwerte in verschiedenen Formen (SymPy-Matrix, numpy.ndarray, Liste/Tuple, Skalar) und extrahiert daraus drei Werte (X, Y, Z). Die Ergebnisse (E, N, H) werden auf 8 Nachkommastellen gerundet. :param dict_koordinaten: Dictionary {punktnummer: koordinate}, wobei koordinate X/Y/Z enthält. :type dict_koordinaten: dict :param pfad_gcg_tif: Pfad zur BKG-GeoTIFF-Datei (Quasigeoidundulation) als str. :type pfad_gcg_tif: str | Path | None :return: Dictionary {punktnummer: (E, N, H)} mit UTM-Koordinaten (Meter) und Normalhöhe. :rtype: dict """ # Kopie des TIF vom BKG mit der Quasigeoidundulation erstellen pfad_gcg_tif = Path(pfad_gcg_tif).resolve() pfad_proj_grid = pfad_gcg_tif.with_name("de_bkg_gcg2016.tif") if ( not pfad_proj_grid.exists() or pfad_proj_grid.stat().st_size != pfad_gcg_tif.stat().st_size ): shutil.copy2(pfad_gcg_tif, pfad_proj_grid) datadir.append_data_dir(str(pfad_proj_grid.parent)) # EPSG-Codes feslegen crs_src = CRS.from_epsg(4936) # ETRS89 geozentrisch-kartesisch # Ziel-CRS: ETRS89 / UTM Zone 32/33 + DHHN2016 Normalhöhe utm_epsg = 25832 crs_dst = CRS.from_user_input(f"EPSG:{utm_epsg}+EPSG:7837") tr = Transformer.from_crs( crs_src, crs_dst, always_xy=True, allow_ballpark=False, ) tr_geo = Transformer.from_crs(CRS.from_epsg(4936), CRS.from_epsg(4979), always_xy=True) # Koordinaten an Dictionary übergeben dict_koordinaten_utm = {} for punktnummer, koordinate in dict_koordinaten.items(): werte = [] queue = [koordinate] while queue and len(werte) < 3: v = queue.pop(0) # Sympy Matrix if isinstance(v, sp.Matrix): if v.rows * v.cols == 1: queue.insert(0, v[0]) else: queue = list(np.array(v.tolist(), dtype=object).reshape(-1)) + queue continue # numpy array if isinstance(v, np.ndarray): if v.size == 1: queue.insert(0, v.reshape(-1)[0]) else: queue = list(v.reshape(-1)) + queue continue # Liste / Tuple if isinstance(v, (list, tuple)): if len(v) == 1: queue.insert(0, v[0]) else: queue = list(v) + queue continue # Skalar werte.append(float(v)) X, Y, Z = werte[0], werte[1], werte[2] E, N, H = tr.transform(X, Y, Z, errcheck=True) # Runden, weil ansonsten aufgrund begrenzter Rechenkapazität falsche Werte Resultieren dict_koordinaten_utm[punktnummer] = (round(E, 8), round(N, 8), round(H, 8)) return dict_koordinaten_utm