417 lines
16 KiB
Python
417 lines
16 KiB
Python
import sympy as sp
|
|
from sympy.algebras.quaternion import Quaternion
|
|
import Datenbank
|
|
from itertools import combinations
|
|
from pathlib import Path
|
|
import shutil
|
|
from pyproj import CRS, Transformer, datadir
|
|
import numpy as np
|
|
from pyproj.exceptions import ProjError
|
|
from pyproj import CRS, Transformer
|
|
|
|
|
|
class Transformationen:
|
|
def __init__(self, pfad_datenbank):
|
|
self.pfad_datenbank = pfad_datenbank
|
|
|
|
@staticmethod
|
|
def R_matrix_aus_euler(e1, e2, e3):
|
|
return sp.Matrix([
|
|
[
|
|
sp.cos(e2) * sp.cos(e3),
|
|
sp.cos(e1) * sp.sin(e3) + sp.sin(e1) * sp.sin(e2) * sp.cos(e3),
|
|
sp.sin(e1) * sp.sin(e3) - sp.cos(e1) * sp.sin(e2) * sp.cos(e3)
|
|
],
|
|
[
|
|
-sp.cos(e2) * sp.sin(e3),
|
|
sp.cos(e1) * sp.cos(e3) - sp.sin(e1) * sp.sin(e2) * sp.sin(e3),
|
|
sp.sin(e1) * sp.cos(e3) + sp.cos(e1) * sp.sin(e2) * sp.sin(e3)
|
|
],
|
|
[
|
|
sp.sin(e2),
|
|
-sp.sin(e1) * sp.cos(e2),
|
|
sp.cos(e1) * sp.cos(e2)
|
|
]
|
|
])
|
|
|
|
def Helmerttransformation_Euler_Transformationsparameter_berechne(self):
|
|
db = Datenbank.Datenbankzugriff(self.pfad_datenbank)
|
|
dict_ausgangssystem = db.get_koordinaten("naeherung_lh", "Dict")
|
|
dict_zielsystem = db.get_koordinaten("naeherung_us", "Dict")
|
|
|
|
gemeinsame_punktnummern = sorted(set(dict_ausgangssystem.keys()) & set(dict_zielsystem.keys()))
|
|
anzahl_gemeinsame_punkte = len(gemeinsame_punktnummern)
|
|
|
|
liste_punkte_ausgangssystem = [dict_ausgangssystem[i] for i in gemeinsame_punktnummern]
|
|
liste_punkte_zielsystem = [dict_zielsystem[i] for i in gemeinsame_punktnummern]
|
|
|
|
print("Anzahl gemeinsame Punkte:", anzahl_gemeinsame_punkte)
|
|
|
|
print("\nErste Zielpunkte:")
|
|
for pn, P in list(zip(gemeinsame_punktnummern, liste_punkte_zielsystem))[:5]:
|
|
print(pn, [float(P[0]), float(P[1]), float(P[2])])
|
|
|
|
print("\nErste Ausgangspunkte:")
|
|
for pn, p in list(zip(gemeinsame_punktnummern, liste_punkte_ausgangssystem))[:5]:
|
|
print(pn, [float(p[0]), float(p[1]), float(p[2])])
|
|
|
|
# --- Näherungswerte (minimal erweitert) ---
|
|
p1, p2, p3 = liste_punkte_ausgangssystem[0], liste_punkte_ausgangssystem[1], liste_punkte_ausgangssystem[2]
|
|
P1, P2, P3 = liste_punkte_zielsystem[0], liste_punkte_zielsystem[1], liste_punkte_zielsystem[2]
|
|
|
|
# 1) Näherungswert Maßstab: Mittelwert aus allen Punktpaaren
|
|
ratios = []
|
|
for i, j in combinations(range(anzahl_gemeinsame_punkte), 2):
|
|
dp = (liste_punkte_ausgangssystem[j] - liste_punkte_ausgangssystem[i]).norm()
|
|
dP = (liste_punkte_zielsystem[j] - liste_punkte_zielsystem[i]).norm()
|
|
dp_f = float(dp)
|
|
if dp_f > 0:
|
|
ratios.append(float(dP) / dp_f)
|
|
|
|
m0 = sum(ratios) / len(ratios)
|
|
|
|
if ratios:
|
|
print("min/mean/max:",
|
|
min(ratios),
|
|
sum(ratios) / len(ratios),
|
|
max(ratios))
|
|
|
|
U = (P2 - P1) / (P2 - P1).norm()
|
|
W = (U.cross(P3 - P1)) / (U.cross(P3 - P1)).norm()
|
|
V = W.cross(U)
|
|
|
|
u = (p2 - p1) / (p2 - p1).norm()
|
|
w = (u.cross(p3 - p1)) / (u.cross(p3 - p1)).norm()
|
|
v = w.cross(u)
|
|
|
|
R0 = sp.Matrix.hstack(U, V, W) * sp.Matrix.hstack(u, v, w).T
|
|
|
|
XS = sum(liste_punkte_zielsystem, sp.Matrix([0, 0, 0])) / anzahl_gemeinsame_punkte
|
|
xS = sum(liste_punkte_ausgangssystem, sp.Matrix([0, 0, 0])) / anzahl_gemeinsame_punkte
|
|
|
|
Translation0 = XS - m0 * R0 * xS
|
|
|
|
# 2) Test auf orthonormale Drehmatrix bei 3 Nachkommastellen!
|
|
if R0.T.applyfunc(lambda x: round(float(x), 3)) == R0.inv().applyfunc(lambda x: round(float(x), 3)) \
|
|
and (R0.T * R0).applyfunc(lambda x: round(float(x), 3)) == sp.eye(3).applyfunc(
|
|
lambda x: round(float(x), 3)) \
|
|
and ((round(R0.det(), 3) == 1.000 or round(R0.det(), 3) == -1.000)):
|
|
print("R ist Orthonormal!")
|
|
else:
|
|
print("R ist nicht Orthonormal!")
|
|
|
|
# 3) Euler-Näherungswerte aus R0
|
|
e2_0 = sp.asin(R0[2, 0])
|
|
# Schutz gegen Division durch 0 wenn cos(e2) ~ 0:
|
|
cos_e2_0 = sp.cos(e2_0)
|
|
|
|
e1_0 = sp.acos(R0[2, 2] / cos_e2_0)
|
|
e3_0 = sp.acos(R0[0, 0] / cos_e2_0)
|
|
|
|
# --- Symbolische Unbekannte (klassische 7 Parameter) ---
|
|
dX, dY, dZ, m, e1, e2, e3 = sp.symbols('dX dY dZ m e1 e2 e3')
|
|
R_symbolisch = self.R_matrix_aus_euler(e1, e2, e3)
|
|
|
|
# 4) Funktionales Modell
|
|
f_zeilen = []
|
|
for punkt in liste_punkte_ausgangssystem:
|
|
punkt_vektor = sp.Matrix([punkt[0], punkt[1], punkt[2]])
|
|
f_zeile_i = sp.Matrix([dX, dY, dZ]) + m * R_symbolisch * punkt_vektor
|
|
f_zeilen.extend(list(f_zeile_i))
|
|
|
|
f_matrix = sp.Matrix(f_zeilen)
|
|
f = f_matrix
|
|
|
|
A_ohne_zahlen = f_matrix.jacobian([dX, dY, dZ, m, e1, e2, e3])
|
|
|
|
# Parameterschätzung
|
|
schwellenwert = 1e-4
|
|
anzahl_iterationen = 0
|
|
alle_kleiner_vorherige_iteration = False
|
|
|
|
l_vektor = sp.Matrix([koord for P in liste_punkte_zielsystem for koord in P])
|
|
l = l_vektor
|
|
|
|
P_mat = sp.eye(3 * anzahl_gemeinsame_punkte)
|
|
l_berechnet_0 = None
|
|
|
|
while True:
|
|
if anzahl_iterationen == 0:
|
|
zahlen_0 = {
|
|
dX: float(Translation0[0]),
|
|
dY: float(Translation0[1]),
|
|
dZ: float(Translation0[2]),
|
|
m: float(m0),
|
|
e1: float(e1_0),
|
|
e2: float(e2_0),
|
|
e3: float(e3_0)
|
|
}
|
|
|
|
x0 = sp.Matrix([zahlen_0[dX], zahlen_0[dY], zahlen_0[dZ],
|
|
zahlen_0[m], zahlen_0[e1], zahlen_0[e2], zahlen_0[e3]])
|
|
|
|
l_berechnet_0 = f.subs(zahlen_0).evalf(n=30)
|
|
dl_0 = l_vektor - l_berechnet_0
|
|
|
|
A_0 = A_ohne_zahlen.subs(zahlen_0).evalf(n=30)
|
|
N = A_0.T * P_mat * A_0
|
|
n_0 = A_0.T * P_mat * dl_0
|
|
Qxx_0 = N.inv()
|
|
dx = Qxx_0 * n_0
|
|
x = x0 + dx
|
|
x = sp.N(x, 30)
|
|
|
|
anzahl_iterationen += 1
|
|
print(f"Iteration Nr.{anzahl_iterationen} abgeschlossen")
|
|
print(dx.evalf(n=3))
|
|
|
|
else:
|
|
zahlen_i = {
|
|
dX: float(x[0]),
|
|
dY: float(x[1]),
|
|
dZ: float(x[2]),
|
|
m: float(x[3]),
|
|
e1: float(x[4]),
|
|
e2: float(x[5]),
|
|
e3: float(x[6])
|
|
}
|
|
|
|
l_berechnet_i = f.subs(zahlen_i).evalf(n=30)
|
|
dl_i = l_vektor - l_berechnet_i
|
|
|
|
A_i = A_ohne_zahlen.subs(zahlen_i).evalf(n=30)
|
|
N_i = A_i.T * P_mat * A_i
|
|
Qxx_i = N_i.inv()
|
|
n_i = A_i.T * P_mat * dl_i
|
|
|
|
dx = Qxx_i * n_i
|
|
x = sp.Matrix(x + dx)
|
|
|
|
anzahl_iterationen += 1
|
|
print(f"Iteration Nr.{anzahl_iterationen} abgeschlossen")
|
|
print(dx.evalf(n=3))
|
|
|
|
alle_kleiner = True
|
|
for i in range(dx.rows):
|
|
wert = float(dx[i])
|
|
if abs(wert) > schwellenwert:
|
|
alle_kleiner = False
|
|
|
|
if (alle_kleiner and alle_kleiner_vorherige_iteration) or anzahl_iterationen == 100:
|
|
break
|
|
|
|
alle_kleiner_vorherige_iteration = alle_kleiner
|
|
|
|
print(l.evalf(n=3))
|
|
print(l_berechnet_0.evalf(n=3))
|
|
print(f"x = {x.evalf(n=3)}")
|
|
|
|
# --- Neuberechnung Zielsystem ---
|
|
zahlen_final = {
|
|
dX: float(x[0]),
|
|
dY: float(x[1]),
|
|
dZ: float(x[2]),
|
|
m: float(x[3]),
|
|
e1: float(x[4]),
|
|
e2: float(x[5]),
|
|
e3: float(x[6])
|
|
}
|
|
|
|
l_berechnet_final = f.subs(zahlen_final).evalf(n=30)
|
|
|
|
liste_l_berechnet_final = []
|
|
for i in range(anzahl_gemeinsame_punkte):
|
|
Xi = l_berechnet_final[3 * i + 0]
|
|
Yi = l_berechnet_final[3 * i + 1]
|
|
Zi = l_berechnet_final[3 * i + 2]
|
|
liste_l_berechnet_final.append(sp.Matrix([Xi, Yi, Zi]))
|
|
|
|
print("")
|
|
print("l_berechnet_final:")
|
|
for punktnummer, l_fin in zip(gemeinsame_punktnummern, liste_l_berechnet_final):
|
|
print(f"{punktnummer}: {float(l_fin[0]):.3f}, {float(l_fin[1]):.3f}, {float(l_fin[2]):.3f}")
|
|
|
|
print("Streckendifferenzen:")
|
|
streckendifferenzen = [
|
|
(punkt_zielsys - l_final).norm()
|
|
for punkt_zielsys, l_final in zip(liste_punkte_zielsystem, liste_l_berechnet_final)
|
|
]
|
|
print([round(float(s), 6) for s in streckendifferenzen])
|
|
|
|
Schwerpunkt_Zielsystem = sum(liste_punkte_zielsystem, sp.Matrix([0, 0, 0])) / anzahl_gemeinsame_punkte
|
|
Schwerpunkt_berechnet = sum(liste_l_berechnet_final, sp.Matrix([0, 0, 0])) / anzahl_gemeinsame_punkte
|
|
|
|
Schwerpunktsdifferenz = Schwerpunkt_Zielsystem - Schwerpunkt_berechnet
|
|
|
|
print("\nDifferenz Schwerpunkt (Vektor):")
|
|
print(Schwerpunktsdifferenz.evalf(3))
|
|
|
|
print("Betrag der Schwerpunkt-Differenz:")
|
|
print(f"{float(Schwerpunktsdifferenz.norm()):.3f}m")
|
|
|
|
return zahlen_final
|
|
|
|
def Helmerttransformation(self, transformationsparameter):
|
|
db = Datenbank.Datenbankzugriff(self.pfad_datenbank)
|
|
dict_ausgangssystem = db.get_koordinaten("naeherung_lh", "Dict")
|
|
dict_zielsystem = db.get_koordinaten("naeherung_us", "Dict")
|
|
|
|
dX, dY, dZ, m, e1, e2, e3 = sp.symbols('dX dY dZ m e1 e2 e3')
|
|
|
|
unterschiedliche_punktnummern = sorted(set(dict_ausgangssystem.keys()) ^ set(dict_zielsystem.keys()))
|
|
punktnummern_transformieren = [
|
|
punktnummer for punktnummer in unterschiedliche_punktnummern if punktnummer in dict_ausgangssystem
|
|
]
|
|
liste_punkte_ausgangssystem = [dict_ausgangssystem[punktnummer] for punktnummer in punktnummern_transformieren]
|
|
|
|
R = self.R_matrix_aus_euler(transformationsparameter[e1], transformationsparameter[e2], transformationsparameter[e3])
|
|
|
|
f_zeilen = []
|
|
for punkt in liste_punkte_ausgangssystem:
|
|
punkt_vektor = sp.Matrix([punkt[0], punkt[1], punkt[2]])
|
|
f_zeile_i = sp.Matrix([transformationsparameter[dX], transformationsparameter[dY], transformationsparameter[dZ]]) + transformationsparameter[m] * R * punkt_vektor
|
|
f_zeilen.extend(list(f_zeile_i))
|
|
|
|
f_matrix = sp.Matrix(f_zeilen)
|
|
dict_transformiert = {}
|
|
for i, pn in enumerate(punktnummern_transformieren):
|
|
Xi = f_matrix[3 * i + 0]
|
|
Yi = f_matrix[3 * i + 1]
|
|
Zi = f_matrix[3 * i + 2]
|
|
|
|
dict_transformiert[str(pn)] = sp.Matrix([
|
|
[float(Xi)],
|
|
[float(Yi)],
|
|
[float(Zi)]
|
|
])
|
|
return dict_transformiert
|
|
|
|
def utm_to_XYZ(self, pfad_tif_quasigeoidundolation, liste_utm):
|
|
pfad_gcg_tif = Path(pfad_tif_quasigeoidundolation)
|
|
pfad_gcg_tif_proj = pfad_gcg_tif.with_name("de_bkg_gcg2016.tif")
|
|
|
|
if (not pfad_gcg_tif_proj.exists()) or (pfad_gcg_tif_proj.stat().st_size != pfad_gcg_tif.stat().st_size):
|
|
shutil.copy2(pfad_gcg_tif, pfad_gcg_tif_proj)
|
|
|
|
datadir.append_data_dir(str(pfad_gcg_tif.parent))
|
|
|
|
utm_epsg = 25832
|
|
crs_src = CRS.from_user_input(f"EPSG:{utm_epsg}+EPSG:7837") # ETRS89/DREF91 + DHHN2016
|
|
crs_dst = CRS.from_epsg(4936) # ETRS89 geozentrisch (ECEF)
|
|
|
|
tr_best = Transformer.from_crs(
|
|
crs_src,
|
|
crs_dst,
|
|
always_xy=True,
|
|
allow_ballpark=False,
|
|
)
|
|
|
|
dict_geozentrisch_kartesisch = {}
|
|
for Punktnummer, E, N, Normalhoehe in liste_utm:
|
|
X, Y, Z = tr_best.transform(E, N, Normalhoehe)
|
|
dict_geozentrisch_kartesisch[Punktnummer] = sp.Matrix([X, Y, Z])
|
|
|
|
# geographisch 3D + zeta
|
|
#crs_geog3d = CRS.from_epsg(4937) # ETRS89 (lon, lat, h)
|
|
#tr_h = Transformer.from_crs(
|
|
# crs_src,
|
|
# crs_geog3d,
|
|
# always_xy=True,
|
|
# allow_ballpark=False,
|
|
#)
|
|
|
|
#lon, lat, h = tr_h.transform(E, N, H)
|
|
#print("lon/lat/h:", lon, lat, h)
|
|
#print("zeta (h-H):", h - H)
|
|
|
|
return dict_geozentrisch_kartesisch
|
|
|
|
def ecef_to_utm(
|
|
self,
|
|
dict_koordinaten: dict,
|
|
pfad_gcg_tif: str | Path | None = None,
|
|
zone: int = 32,
|
|
):
|
|
|
|
if pfad_gcg_tif is not None:
|
|
pfad_gcg_tif = Path(pfad_gcg_tif).resolve()
|
|
if not pfad_gcg_tif.exists():
|
|
raise FileNotFoundError(f"Quasigeoid-Datei nicht gefunden: {pfad_gcg_tif}")
|
|
|
|
pfad_proj_grid = pfad_gcg_tif.with_name("de_bkg_gcg2016.tif")
|
|
if (
|
|
not pfad_proj_grid.exists()
|
|
or pfad_proj_grid.stat().st_size != pfad_gcg_tif.stat().st_size
|
|
):
|
|
shutil.copy2(pfad_gcg_tif, pfad_proj_grid)
|
|
|
|
datadir.append_data_dir(str(pfad_proj_grid.parent))
|
|
|
|
crs_src = CRS.from_epsg(4936) # ETRS89 geocentric (ECEF)
|
|
|
|
# Ziel-CRS: ETRS89 / UTM Zone 32/33 + DHHN2016 Normalhöhe
|
|
# EPSG:25832/25833 = ETRS89 / UTM; EPSG:7837 = DHHN2016 height
|
|
utm_epsg = 25800 + zone # 25832 oder 25833
|
|
crs_dst = CRS.from_user_input(f"EPSG:{utm_epsg}+EPSG:7837")
|
|
|
|
tr = Transformer.from_crs(
|
|
crs_src,
|
|
crs_dst,
|
|
always_xy=True,
|
|
allow_ballpark=False,
|
|
)
|
|
|
|
tr_geo = Transformer.from_crs(CRS.from_epsg(4936), CRS.from_epsg(4979), always_xy=True)
|
|
|
|
dict_koordinaten_utm = {}
|
|
for punktnummer, koordinate in dict_koordinaten.items():
|
|
werte = []
|
|
queue = [koordinate]
|
|
while queue and len(werte) < 3:
|
|
v = queue.pop(0)
|
|
|
|
# Sympy Matrix
|
|
if isinstance(v, sp.Matrix):
|
|
if v.rows * v.cols == 1:
|
|
queue.insert(0, v[0])
|
|
else:
|
|
queue = list(np.array(v.tolist(), dtype=object).reshape(-1)) + queue
|
|
continue
|
|
|
|
# numpy array
|
|
if isinstance(v, np.ndarray):
|
|
if v.size == 1:
|
|
queue.insert(0, v.reshape(-1)[0])
|
|
else:
|
|
queue = list(v.reshape(-1)) + queue
|
|
continue
|
|
|
|
# Liste / Tuple
|
|
if isinstance(v, (list, tuple)):
|
|
if len(v) == 1:
|
|
queue.insert(0, v[0])
|
|
else:
|
|
queue = list(v) + queue
|
|
continue
|
|
|
|
# Skalar
|
|
werte.append(float(v))
|
|
|
|
if len(werte) < 3:
|
|
raise ValueError(f"Zu wenig skalare Werte gefunden: {werte}")
|
|
|
|
X, Y, Z = werte[0], werte[1], werte[2]
|
|
|
|
try:
|
|
E, N, H = tr.transform(X, Y, Z, errcheck=True)
|
|
except ProjError as e:
|
|
lon, lat, h_ell = tr_geo.transform(X, Y, Z, errcheck=True)
|
|
raise ProjError(
|
|
f"transform error (outside grid) | pn={punktnummer} | "
|
|
f"X,Y,Z={X},{Y},{Z} | lon/lat={lon},{lat} | h_ell={h_ell} | {e}"
|
|
)
|
|
|
|
E, N, H = tr.transform(X, Y, Z, errcheck=True)
|
|
# Runden, weil ansonsten aufgrund begrenzter Rechenkapazität falsche Werte Resultieren
|
|
dict_koordinaten_utm[punktnummer] = (round(E, 8), round(N, 8), round(H, 8))
|
|
return dict_koordinaten_utm |