This commit is contained in:
2026-01-19 14:48:24 +01:00
parent d2b6f604f0
commit 7a170f5ead
11 changed files with 56477 additions and 43084 deletions

View File

@@ -1,63 +1,67 @@
from typing import Any
import sympy as sp import sympy as sp
from decimal import Decimal from decimal import Decimal
import math import math
import numpy as np import numpy as np
from numpy import ndarray, dtype
import Datenbank import Datenbank
class Berechnungen: class Berechnungen:
def __init__(self, a, b): def __init__(self, a: float, b: float) -> None:
self.a_wert = a self.a_wert = a
self.b_wert = b self.b_wert = b
self.e_quadrat_wert = self.e_quadrat() self.e_quadrat_wert = self.e_quadrat()
self.e_strich_quadrat_wert = self.e_strich_quadrat() self.e_strich_quadrat_wert = self.e_strich_quadrat()
def e_quadrat(self): def e_quadrat(self) -> float:
return (self.a_wert**2 - self.b_wert**2) / self.a_wert **2 return (self.a_wert**2 - self.b_wert**2) / self.a_wert **2
def e_strich_quadrat(self): def e_strich_quadrat(self) -> float:
return (self.a_wert**2 - self.b_wert**2) / self.b_wert **2 return (self.a_wert**2 - self.b_wert**2) / self.b_wert **2
def P(self, x, y): def P(self, x: float, y: float) -> float:
return np.sqrt(x**2 + y**2) return np.sqrt(x**2 + y**2)
def hilfswinkel(self, x, y, z): def hilfswinkel(self, x: float, y: float, z: float) -> float:
hw = np.atan2(z * self.a_wert, self.P(x, y) * self.b_wert) hw = np.atan2(z * self.a_wert, self.P(x, y) * self.b_wert)
return hw return hw
def B(self, x, y, z): def B(self, x: float, y: float, z: float) -> float:
hilfswinkel = self.hilfswinkel(x, y, z) hilfswinkel = self.hilfswinkel(x, y, z)
B = np.atan2((z + self.e_strich_quadrat_wert * self.b_wert * np.sin(hilfswinkel) ** 3), (self.P(x, y) - self.e_quadrat_wert * self.a_wert * np.cos(hilfswinkel) ** 3)) B = np.atan2((z + self.e_strich_quadrat_wert * self.b_wert * np.sin(hilfswinkel) ** 3), (self.P(x, y) - self.e_quadrat_wert * self.a_wert * np.cos(hilfswinkel) ** 3))
return B return B
def L(self, x, y): def L(self, x: float, y: float) -> float:
return np.atan2(y, x) return np.atan2(y, x)
def H(self, x, y, z): def H(self, x: float, y: float, z: float) -> float:
B = self.B(x, y, z) B = self.B(x, y, z)
H = (self.P(x, y) / np.cos(B)) - self.a_wert / (np.sqrt(1 - self.e_quadrat_wert * np.sin(B) ** 2)) H = (self.P(x, y) / np.cos(B)) - self.a_wert / (np.sqrt(1 - self.e_quadrat_wert * np.sin(B) ** 2))
return H return H
def E(self, L, dX, dY): def E(self, L: float, dX: float, dY: float) -> float:
E = -np.sin(L) * dX + np.cos(L) * dY E = -np.sin(L) * dX + np.cos(L) * dY
return E return E
def N(self, B, L, dX, dY, dZ): def N(self, B: float, L: float, dX: float, dY: float, dZ: float) -> float:
N = -np.sin(B) * np.cos(L) * dX - np.sin(B) * np.sin(L) * dY + np.cos(B) * dZ N = -np.sin(B) * np.cos(L) * dX - np.sin(B) * np.sin(L) * dY + np.cos(B) * dZ
return N return N
def U(self, B, L, dX, dY, dZ): def U(self, B: float, L: float, dX: float, dY: float, dZ: float) -> float:
U = np.cos(B) * np.cos(L) * dX + np.cos(B) * np.sin(L) *dY + np.sin(B) * dZ U = np.cos(B) * np.cos(L) * dX + np.cos(B) * np.sin(L) *dY + np.sin(B) * dZ
return U return U
def horizontalstrecke_ENU(self, E, N): def horizontalstrecke_ENU(self, E: float, N: float) -> float:
s = np.sqrt(E**2 + N**2) s = np.sqrt(E**2 + N**2)
return s return s
def Zenitwinkel(self, horizontalstrecke_ENU, U): def Zenitwinkel(self, horizontalstrecke_ENU: float, U: float) -> float:
zw = np.atan2(horizontalstrecke_ENU, U) zw = np.atan2(horizontalstrecke_ENU, U)
return zw return zw
def Azimut(self, E, N): def Azimut(self, E: float, N: float) -> float:
Azimut = np.atan2(E, N) Azimut = np.atan2(E, N)
if Azimut < 0: if Azimut < 0:
Azimut += 2 * np.pi Azimut += 2 * np.pi
@@ -65,7 +69,7 @@ class Berechnungen:
Azimut -= 2 * np.pi Azimut -= 2 * np.pi
return Azimut return Azimut
def Richtung(self, Azimut, Orientierung): def Richtung(self, Azimut: float, Orientierung: float) -> float:
Richtung = Azimut - Orientierung Richtung = Azimut - Orientierung
if Richtung < 0: if Richtung < 0:
Richtung += 2 * np.pi Richtung += 2 * np.pi
@@ -73,13 +77,13 @@ class Berechnungen:
Richtung -= 2 * np.pi Richtung -= 2 * np.pi
return Richtung return Richtung
def geometrische_breite_laenge(self, dict_koordinaten): def geometrische_breite_laenge(self, dict_koordinaten: dict) -> dict:
for punktnummer, matrix in dict_koordinaten.items(): for punktnummer, matrix in dict_koordinaten.items():
dict_koordinaten[punktnummer] = [matrix, self.B(matrix[0], matrix[1], matrix[2]), self.L(matrix[0], matrix[1])] dict_koordinaten[punktnummer] = [matrix, self.B(matrix[0], matrix[1], matrix[2]), self.L(matrix[0], matrix[1])]
return dict_koordinaten return dict_koordinaten
def berechnung_richtung_azimut_zenitwinkel(self, pfad_datenbank, dict_koordinaten): def berechnung_richtung_azimut_zenitwinkel(self, pfad_datenbank: str, dict_koordinaten: dict) -> tuple[list[Any], dict[Any, Any]]:
dict_koordinaten_erweitert = {} dict_koordinaten_erweitert = {}
dict_orientierungen = {} dict_orientierungen = {}
liste_azimut_richtungen = [] liste_azimut_richtungen = []
@@ -145,29 +149,29 @@ class Berechnungen:
class Einheitenumrechnung: class Einheitenumrechnung:
def __init__(self): def __init__(self) -> None:
pass pass
def mas_to_rad(mas): def mas_to_rad(mas: float) -> float:
umrechnungsfaktor = 1 / 1000 * 1 / 3600 * sp.pi / 180 umrechnungsfaktor = 1 / 1000 * 1 / 3600 * sp.pi / 180
grad = mas * umrechnungsfaktor grad = mas * umrechnungsfaktor
return grad return grad
def mm_to_m(mm): def mm_to_m(mm: float) -> float:
m = mm / 1000 m = mm / 1000
return m return m
def ppb(ppb): def ppb(ppb: float) -> float:
ppb *= 10 ** (-9) ppb *= 10 ** (-9)
return ppb return ppb
def gon_to_rad_Decimal(gon): def gon_to_rad_Decimal(gon: float) -> float:
gon = Decimal(gon) gon = Decimal(gon)
pi = Decimal(str(math.pi)) pi = Decimal(str(math.pi))
rad = (gon / Decimal(200)) * pi rad = (gon / Decimal(200)) * pi
return rad return rad
def mgon_to_rad_Decimal(gon): def mgon_to_rad_Decimal(gon: float) -> float:
gon = Decimal(gon) gon = Decimal(gon)
pi = Decimal(str(math.pi)) pi = Decimal(str(math.pi))
rad = (gon / Decimal(200000)) * pi rad = (gon / Decimal(200000)) * pi

File diff suppressed because one or more lines are too long

View File

@@ -1,17 +1,21 @@
import os import os
import sqlite3 import sqlite3
from typing import Any
import sympy as sp import sympy as sp
from sympy import MutableDenseMatrix
from Berechnungen import Einheitenumrechnung from Berechnungen import Einheitenumrechnung
from decimal import Decimal from decimal import Decimal
class Datenbank_anlegen: class Datenbank_anlegen:
def __init__(self, pfad_datenbank): def __init__(self, pfad_datenbank: str) -> None:
self.pfad_datenbank = pfad_datenbank self.pfad_datenbank = pfad_datenbank
self.db_anlegen() self.db_anlegen()
def db_anlegen(self): def db_anlegen(self) -> None:
# pfad = r"C:\Users\fabia\OneDrive\Jade HS\Master\MGW2\Masterprojekt_allgemein\Masterprojekt\Programmierung\Campusnetz\Campusnetz.db" # pfad = r"C:\Users\fabia\OneDrive\Jade HS\Master\MGW2\Masterprojekt_allgemein\Masterprojekt\Programmierung\Campusnetz\Campusnetz.db"
if not os.path.exists(self.pfad_datenbank): if not os.path.exists(self.pfad_datenbank):
con = sqlite3.connect(self.pfad_datenbank) con = sqlite3.connect(self.pfad_datenbank)
@@ -42,11 +46,17 @@ class Datenbank_anlegen:
punktnummer_zp TEXT(10), punktnummer_zp TEXT(10),
instrumenteID INTEGER, instrumenteID INTEGER,
tachymeter_richtung NUMERIC(8, 6), tachymeter_richtung NUMERIC(8, 6),
tachymeter_richtung_ausschalten INTEGER DEFAULT 0,
tachymeter_zenitwinkel NUMERIC(8, 6), tachymeter_zenitwinkel NUMERIC(8, 6),
tachymeter_zenitwinkel_ausschalten INTEGER DEFAULT 0,
tachymeter_distanz NUMERIC(8, 4), tachymeter_distanz NUMERIC(8, 4),
tachymeter_distanz_auschalten INTEGER DEFAULT 0,
gnss_bx NUMERIC(7, 4), gnss_bx NUMERIC(7, 4),
gnss_bx_ausschalten INTEGER DEFAULT 0,
gnss_by NUMERIC(7, 4), gnss_by NUMERIC(7, 4),
gnss_by_ausschalten INTEGER DEFAULT 0,
gnss_bz NUMERIC(7, 4), gnss_bz NUMERIC(7, 4),
gnss_bz_ausschalten INTEGER DEFAULT 0,
gnss_s0 NUMERIC(1, 8), gnss_s0 NUMERIC(1, 8),
gnss_cxx NUMERIC(1, 8), gnss_cxx NUMERIC(1, 8),
gnss_cxy NUMERIC(1, 8), gnss_cxy NUMERIC(1, 8),
@@ -57,6 +67,7 @@ class Datenbank_anlegen:
niv_dh NUMERIC(8, 6), niv_dh NUMERIC(8, 6),
niv_strecke NUMERIC(8, 6), niv_strecke NUMERIC(8, 6),
niv_anz_standpkte INTEGER, niv_anz_standpkte INTEGER,
niv_ausschalten INTEGER DEFAULT 0,
dateiname TEXT(200), dateiname TEXT(200),
CONSTRAINT pk_Beobachtungen PRIMARY KEY (beobachtungenID), CONSTRAINT pk_Beobachtungen PRIMARY KEY (beobachtungenID),
CONSTRAINT fk_Beobachtungen_Netzpunktesp FOREIGN KEY (punktnummer_sp) REFERENCES Netzpunkte(punktnummer), CONSTRAINT fk_Beobachtungen_Netzpunktesp FOREIGN KEY (punktnummer_sp) REFERENCES Netzpunkte(punktnummer),
@@ -86,10 +97,10 @@ class Datenbank_anlegen:
con.close() con.close()
class Datenbankzugriff: class Datenbankzugriff:
def __init__(self, pfad_datenbank): def __init__(self, pfad_datenbank: str) -> None:
self.pfad_datenbank = pfad_datenbank self.pfad_datenbank = pfad_datenbank
def set_koordinaten(self, dict_koordinaten, koordinatenart): def set_koordinaten(self, dict_koordinaten: dict, koordinatenart: str) -> None:
con = sqlite3.connect(self.pfad_datenbank) con = sqlite3.connect(self.pfad_datenbank)
cursor = con.cursor() cursor = con.cursor()
@@ -113,7 +124,7 @@ class Datenbankzugriff:
cursor.close() cursor.close()
con.close() con.close()
def set_instrument(self, typ, name): def set_instrument(self, typ: str, name: str) -> None:
con = sqlite3.connect(self.pfad_datenbank) con = sqlite3.connect(self.pfad_datenbank)
cursor = con.cursor() cursor = con.cursor()
liste_instrumente = cursor.execute("SELECT * FROM Instrumente WHERE typ = ? AND name =?", (typ, name)).fetchall() liste_instrumente = cursor.execute("SELECT * FROM Instrumente WHERE typ = ? AND name =?", (typ, name)).fetchall()
@@ -130,7 +141,8 @@ class Datenbankzugriff:
cursor.close() cursor.close()
con.close() con.close()
def set_genauigkeiten(self, instrumenteID, beobachtungsart, stabw_apriori_konstant =None, stabw_apriori_streckenprop =None): def set_genauigkeiten(self, instrumenteID: int, beobachtungsart: str, stabw_apriori_konstant: float = None,
stabw_apriori_streckenprop: float = None) -> None:
con = sqlite3.connect(self.pfad_datenbank) con = sqlite3.connect(self.pfad_datenbank)
cursor = con.cursor() cursor = con.cursor()
@@ -211,7 +223,9 @@ class Datenbankzugriff:
cursor.close() cursor.close()
con.close() con.close()
def set_datumskoordinaten(self, liste_datumskoordinaten_x, liste_datumskoordinaten_y, liste_datumskoordinaten_z, liste_datumskoordinaten_x_y_z): def set_datumskoordinaten(self, liste_datumskoordinaten_x: list, liste_datumskoordinaten_y: list,
liste_datumskoordinaten_z: list,
liste_datumskoordinaten_x_y_z: list) -> None:
con = sqlite3.connect(self.pfad_datenbank) con = sqlite3.connect(self.pfad_datenbank)
cursor = con.cursor() cursor = con.cursor()
@@ -259,7 +273,9 @@ class Datenbankzugriff:
cursor.close() cursor.close()
con.close() con.close()
def set_datumskoordinaten_to_neupunkte(self, liste_datumskoordinaten_x, liste_datumskoordinaten_y, liste_datumskoordinaten_z, liste_datumskoordinaten_x_y_z): def set_datumskoordinaten_to_neupunkte(self, liste_datumskoordinaten_x: list, liste_datumskoordinaten_y: list,
liste_datumskoordinaten_z: list,
liste_datumskoordinaten_x_y_z: list) -> None:
con = sqlite3.connect(self.pfad_datenbank) con = sqlite3.connect(self.pfad_datenbank)
cursor = con.cursor() cursor = con.cursor()
if liste_datumskoordinaten_x != []: if liste_datumskoordinaten_x != []:
@@ -291,7 +307,7 @@ class Datenbankzugriff:
cursor.close() cursor.close()
con.close() con.close()
def set_normalhoehe_hfp(self, liste_normalhoehe_hfp): def set_normalhoehe_hfp(self, liste_normalhoehe_hfp: list) -> LiteralString | str:
liste_hfp_in_db = self.get_normalhoehe_hfp() liste_hfp_in_db = self.get_normalhoehe_hfp()
if liste_normalhoehe_hfp != []: if liste_normalhoehe_hfp != []:
con = sqlite3.connect(self.pfad_datenbank) con = sqlite3.connect(self.pfad_datenbank)
@@ -320,7 +336,81 @@ class Datenbankzugriff:
else: else:
return f"Es wurden keine neuen Normalhöhen übergeben. Folgende Normalhöhen sind in der Datenbank enthalten: {liste_hfp_in_db}" return f"Es wurden keine neuen Normalhöhen übergeben. Folgende Normalhöhen sind in der Datenbank enthalten: {liste_hfp_in_db}"
def get_koordinaten(self, koordinatenart, ausgabeart = "Dict"): def set_beobachtung_ausschalten(self, dict_beobachtung_ausschalten: dict) -> None:
con = sqlite3.connect(self.pfad_datenbank)
cursor = con.cursor()
for beobachtung, ausschalten in dict_beobachtung_ausschalten.items():
if ausschalten == "beobachtung_ausschalten":
beobachtung_gesplittet = beobachtung.split("_")
if beobachtung_gesplittet[1] == "SD":
#print(f"SD: {beobachtung_gesplittet}")
try:
cursor.execute(f"""UPDATE Beobachtungen SET tachymeter_distanz_auschalten = 1
WHERE beobachtungenID = ? AND beobachtungsgruppeID = ? AND punktnummer_sp = ? AND punktnummer_zp = ?""",
(beobachtung_gesplittet[0], beobachtung_gesplittet[2], beobachtung_gesplittet[3], beobachtung_gesplittet[4]))
except:
print(f"Die Beobachtung {beobachtung} konnte aufgrund eines Fehlers nicht ausgeschaltet werden.")
if beobachtung_gesplittet[1] == "R":
#print(f"R: {beobachtung_gesplittet}")
try:
cursor.execute(f"""UPDATE Beobachtungen SET tachymeter_richtung_ausschalten = 1
WHERE beobachtungenID = ? AND beobachtungsgruppeID = ? AND punktnummer_sp = ? AND punktnummer_zp = ?""",
(beobachtung_gesplittet[0], beobachtung_gesplittet[2], beobachtung_gesplittet[3], beobachtung_gesplittet[4]))
except:
print(f"Die Beobachtung {beobachtung} konnte aufgrund eines Fehlers nicht ausgeschaltet werden.")
if beobachtung_gesplittet[1] == "ZW":
#print(f"ZW: {beobachtung_gesplittet}")
try:
cursor.execute(f"""UPDATE Beobachtungen SET tachymeter_zenitwinkel_ausschalten = 1
WHERE beobachtungenID = ? AND beobachtungsgruppeID = ? AND punktnummer_sp = ? AND punktnummer_zp = ?""",
(beobachtung_gesplittet[0], beobachtung_gesplittet[2], beobachtung_gesplittet[3], beobachtung_gesplittet[4]))
except:
print(f"Die Beobachtung {beobachtung} konnte aufgrund eines Fehlers nicht ausgeschaltet werden.")
if beobachtung_gesplittet[1] == "gnssbx":
#print(f"gnssbx: {beobachtung_gesplittet}")
try:
cursor.execute(f"""UPDATE Beobachtungen SET gnss_bx_ausschalten = 1
WHERE beobachtungenID = ? AND punktnummer_sp = ? AND punktnummer_zp = ?""",
(beobachtung_gesplittet[0], beobachtung_gesplittet[2], beobachtung_gesplittet[3]))
except:
print(f"Die Beobachtung {beobachtung} konnte aufgrund eines Fehlers nicht ausgeschaltet werden.")
if beobachtung_gesplittet[1] == "gnssby":
#print(f"gnssby: {beobachtung_gesplittet}")
try:
cursor.execute(f"""UPDATE Beobachtungen SET gnss_by_ausschalten = 1
WHERE beobachtungenID = ? AND punktnummer_sp = ? AND punktnummer_zp = ?""",
(beobachtung_gesplittet[0], beobachtung_gesplittet[2], beobachtung_gesplittet[3]))
except:
print(f"Die Beobachtung {beobachtung} konnte aufgrund eines Fehlers nicht ausgeschaltet werden.")
if beobachtung_gesplittet[1] == "gnssbz":
#print(f"gnssbz: {beobachtung_gesplittet}")
try:
cursor.execute(f"""UPDATE Beobachtungen SET gnss_bz_ausschalten = 1
WHERE beobachtungenID = ? AND punktnummer_sp = ? AND punktnummer_zp = ?""",
(beobachtung_gesplittet[0], beobachtung_gesplittet[2], beobachtung_gesplittet[3]))
except:
print(f"Die Beobachtung {beobachtung} konnte aufgrund eines Fehlers nicht ausgeschaltet werden.")
if beobachtung_gesplittet[1] == "niv":
#print(f"niv: {beobachtung_gesplittet}")
try:
cursor.execute(f"""UPDATE Beobachtungen SET niv_ausschalten = 1
WHERE beobachtungenID = ? AND punktnummer_sp = ? AND punktnummer_zp = ?""",
(beobachtung_gesplittet[0], beobachtung_gesplittet[2], beobachtung_gesplittet[3]))
except:
print(f"Die Beobachtung {beobachtung} konnte aufgrund eines Fehlers nicht ausgeschaltet werden.")
print("Die ausgewählten Beobachtungen werden für die folgenden Iterationen ausgeschaltet.")
con.commit()
cursor.close()
con.close()
def get_koordinaten(self, koordinatenart: str, ausgabeart: str = "Dict") -> dict[Any, MutableDenseMatrix] | None:
con = sqlite3.connect(self.pfad_datenbank) con = sqlite3.connect(self.pfad_datenbank)
cursor = con.cursor() cursor = con.cursor()
if koordinatenart == "naeherung_lh": if koordinatenart == "naeherung_lh":
@@ -348,7 +438,7 @@ class Datenbankzugriff:
for koordinate in liste_koordinaten for koordinate in liste_koordinaten
} }
def get_normalhoehe_hfp(self): def get_normalhoehe_hfp(self) -> list[Any]:
con = sqlite3.connect(self.pfad_datenbank) con = sqlite3.connect(self.pfad_datenbank)
cursor = con.cursor() cursor = con.cursor()
liste_hfp = cursor.execute("SELECT punktnummer, normalhoehe_hfp FROM Netzpunkte WHERE normalhoehe_hfp IS NOT NULL").fetchall() liste_hfp = cursor.execute("SELECT punktnummer, normalhoehe_hfp FROM Netzpunkte WHERE normalhoehe_hfp IS NOT NULL").fetchall()
@@ -357,7 +447,7 @@ class Datenbankzugriff:
return liste_hfp return liste_hfp
def get_instrument_liste(self, typ): def get_instrument_liste(self, typ: str) -> list:
con = sqlite3.connect(self.pfad_datenbank) con = sqlite3.connect(self.pfad_datenbank)
cursor = con.cursor() cursor = con.cursor()
liste_instrumente = cursor.execute("SELECT * FROM Instrumente WHERE typ = ?", (typ,)).fetchall() liste_instrumente = cursor.execute("SELECT * FROM Instrumente WHERE typ = ?", (typ,)).fetchall()
@@ -368,7 +458,7 @@ class Datenbankzugriff:
liste_instrumente = f"Kein Instrument vom Typ {typ} gefunden. Folgende Typen stehen aktuell zur Auswahl: {liste_typen}" liste_instrumente = f"Kein Instrument vom Typ {typ} gefunden. Folgende Typen stehen aktuell zur Auswahl: {liste_typen}"
return liste_instrumente return liste_instrumente
def get_genauigkeiten_dict(self): def get_genauigkeiten_dict(self) -> dict[Any, Any]:
dict = {} dict = {}
con = sqlite3.connect(self.pfad_datenbank) con = sqlite3.connect(self.pfad_datenbank)
cursor = con.cursor() cursor = con.cursor()
@@ -379,7 +469,7 @@ class Datenbankzugriff:
con.close() con.close()
return dict return dict
def get_instrumenteID_beobachtungenID_dict(self): def get_instrumenteID_beobachtungenID_dict(self) -> dict[Any, Any]:
dict = {} dict = {}
con = sqlite3.connect(self.pfad_datenbank) con = sqlite3.connect(self.pfad_datenbank)
cursor = con.cursor() cursor = con.cursor()
@@ -390,15 +480,22 @@ class Datenbankzugriff:
con.close() con.close()
return dict return dict
def get_beobachtungen_id_beobachtungsgruppe_standpunkt_zielpunkt(self, beobachtungsart): def get_beobachtungen_id_beobachtungsgruppe_standpunkt_zielpunkt(self, beobachtungsart: str) -> list[Any]:
con = sqlite3.connect(self.pfad_datenbank) con = sqlite3.connect(self.pfad_datenbank)
cursor = con.cursor() cursor = con.cursor()
liste_beobachtungen = cursor.execute(f"SELECT beobachtungenID, beobachtungsgruppeID, punktnummer_sp, punktnummer_zp FROM Beobachtungen WHERE {beobachtungsart} IS NOT NULL").fetchall() sql_ausdruck = f"SELECT beobachtungenID, beobachtungsgruppeID, punktnummer_sp, punktnummer_zp FROM Beobachtungen WHERE {beobachtungsart} IS NOT NULL"
if beobachtungsart == "tachymeter_distanz":
sql_ausdruck += " AND tachymeter_distanz_auschalten = 0"
elif beobachtungsart == "tachymeter_richtung":
sql_ausdruck += " AND tachymeter_richtung_ausschalten = 0"
elif beobachtungsart == "tachymeter_zenitwinkel":
sql_ausdruck += " AND tachymeter_zenitwinkel_ausschalten = 0"
liste_beobachtungen = cursor.execute(sql_ausdruck).fetchall()
cursor.close() cursor.close()
con.close() con.close()
return liste_beobachtungen return liste_beobachtungen
def get_beobachtungen_from_beobachtungenid(self): def get_beobachtungen_from_beobachtungenid(self) -> list[Any]:
con = sqlite3.connect(self.pfad_datenbank) con = sqlite3.connect(self.pfad_datenbank)
cursor = con.cursor() cursor = con.cursor()
liste_beobachtungen = cursor.execute(f"SELECT punktnummer_sp, punktnummer_zp, beobachtungenID, beobachtungsgruppeID, tachymeter_richtung, tachymeter_zenitwinkel, tachymeter_distanz FROM Beobachtungen WHERE tachymeter_richtung IS NOT NULL").fetchall() liste_beobachtungen = cursor.execute(f"SELECT punktnummer_sp, punktnummer_zp, beobachtungenID, beobachtungsgruppeID, tachymeter_richtung, tachymeter_zenitwinkel, tachymeter_distanz FROM Beobachtungen WHERE tachymeter_richtung IS NOT NULL").fetchall()
@@ -406,7 +503,7 @@ class Datenbankzugriff:
con.close() con.close()
return liste_beobachtungen return liste_beobachtungen
def get_beobachtungen_gnssbasislinien(self): def get_beobachtungen_gnssbasislinien(self) -> list[Any]:
con = sqlite3.connect(self.pfad_datenbank) con = sqlite3.connect(self.pfad_datenbank)
cursor = con.cursor() cursor = con.cursor()
liste_beobachtungen = cursor.execute(f"SELECT beobachtungenID, punktnummer_sp, punktnummer_zp, gnss_bx, gnss_by, gnss_bz, gnss_s0, gnss_cxx, gnss_cxy, gnss_cxz, gnss_cyy, gnss_cyz, gnss_czz FROM Beobachtungen WHERE gnss_bx IS NOT NULL AND gnss_by IS NOT NULL AND gnss_bz IS NOT NULL AND gnss_s0 IS NOT NULL AND gnss_cxx IS NOT NULL AND gnss_cxy IS NOT NULL AND gnss_cxz IS NOT NULL AND gnss_cyy IS NOT NULL AND gnss_cyz IS NOT NULL AND gnss_czz IS NOT NULL").fetchall() liste_beobachtungen = cursor.execute(f"SELECT beobachtungenID, punktnummer_sp, punktnummer_zp, gnss_bx, gnss_by, gnss_bz, gnss_s0, gnss_cxx, gnss_cxy, gnss_cxz, gnss_cyy, gnss_cyz, gnss_czz FROM Beobachtungen WHERE gnss_bx IS NOT NULL AND gnss_by IS NOT NULL AND gnss_bz IS NOT NULL AND gnss_s0 IS NOT NULL AND gnss_cxx IS NOT NULL AND gnss_cxy IS NOT NULL AND gnss_cxz IS NOT NULL AND gnss_cyy IS NOT NULL AND gnss_cyz IS NOT NULL AND gnss_czz IS NOT NULL").fetchall()
@@ -414,15 +511,15 @@ class Datenbankzugriff:
con.close() con.close()
return liste_beobachtungen return liste_beobachtungen
def get_beobachtungen_nivellement(self): def get_beobachtungen_nivellement(self) -> list[Any]:
con = sqlite3.connect(self.pfad_datenbank) con = sqlite3.connect(self.pfad_datenbank)
cursor = con.cursor() cursor = con.cursor()
liste_beobachtungen = cursor.execute(f"SELECT beobachtungenID, punktnummer_sp, punktnummer_zp, niv_dh, niv_strecke, niv_anz_standpkte FROM Beobachtungen WHERE niv_dh IS NOT NULL AND niv_strecke IS NOT NULL AND niv_anz_standpkte IS NOT NULL").fetchall() liste_beobachtungen = cursor.execute(f"SELECT beobachtungenID, punktnummer_sp, punktnummer_zp, niv_dh, niv_strecke, niv_anz_standpkte FROM Beobachtungen WHERE niv_dh IS NOT NULL AND niv_strecke IS NOT NULL AND niv_anz_standpkte IS NOT NULL AND niv_ausschalten = 0").fetchall()
cursor.close() cursor.close()
con.close() con.close()
return liste_beobachtungen return liste_beobachtungen
def get_datumskoordinate(self): def get_datumskoordinate(self) -> list[Any]:
con = sqlite3.connect(self.pfad_datenbank) con = sqlite3.connect(self.pfad_datenbank)
cursor = con.cursor() cursor = con.cursor()
liste_datumskoordinate_x = cursor.execute( liste_datumskoordinate_x = cursor.execute(
@@ -451,7 +548,7 @@ class Datenbankzugriff:
return liste_datumskoordinaten return liste_datumskoordinaten
def get_stabw_AA_Netzpunkte(self): def get_stabw_AA_Netzpunkte(self) -> dict[Any, Any]:
con = sqlite3.connect(self.pfad_datenbank) con = sqlite3.connect(self.pfad_datenbank)
cursor = con.cursor() cursor = con.cursor()
liste_stabwAA_x = cursor.execute(f"SELECT punktnummer, stabw_vorinfo_x FROM Netzpunkte WHERE stabw_vorinfo_x IS NOT NULL").fetchall() liste_stabwAA_x = cursor.execute(f"SELECT punktnummer, stabw_vorinfo_x FROM Netzpunkte WHERE stabw_vorinfo_x IS NOT NULL").fetchall()
@@ -481,18 +578,32 @@ class Datenbankzugriff:
return dict_stabwAA return dict_stabwAA
def get_gnss_beobachtungen_punktnummern(self): def get_gnss_beobachtungen_punktnummern(self, gnss_komponente: str) -> list[Any]:
con = sqlite3.connect(self.pfad_datenbank) con = sqlite3.connect(self.pfad_datenbank)
cursor = con.cursor() cursor = con.cursor()
liste_gnss_beobachtungen = cursor.execute(f"SELECT beobachtungenID, punktnummer_sp, punktnummer_zp FROM Beobachtungen WHERE gnss_bx IS NOT NULL AND gnss_by IS NOT NULL AND gnss_bz IS NOT NULL AND gnss_s0 IS NOT NULL AND gnss_cxx IS NOT NULL AND gnss_cxy IS NOT NULL AND gnss_cxz IS NOT NULL AND gnss_cyy IS NOT NULL AND gnss_cyz IS NOT NULL AND gnss_czz IS NOT NULL").fetchall() sql = f"""SELECT beobachtungenID, punktnummer_sp, punktnummer_zp
FROM Beobachtungen
WHERE {gnss_komponente} IS NOT NULL
AND gnss_s0 IS NOT NULL
AND gnss_cxx IS NOT NULL AND gnss_cxy IS NOT NULL AND gnss_cxz IS NOT NULL
AND gnss_cyy IS NOT NULL AND gnss_cyz IS NOT NULL AND gnss_czz IS NOT NULL"""
if gnss_komponente == "gnss_bx":
sql += " AND gnss_bx_ausschalten = 0"
elif gnss_komponente == "gnss_by":
sql += " AND gnss_by_ausschalten = 0"
elif gnss_komponente == "gnss_bz":
sql += " AND gnss_bz_ausschalten = 0"
liste_gnss_beobachtungen = cursor.execute(sql).fetchall()
cursor.close() cursor.close()
con.close() con.close()
return liste_gnss_beobachtungen return liste_gnss_beobachtungen
def get_nivellement_beobachtungen_punktnummern(self): def get_nivellement_beobachtungen_punktnummern(self) -> list[Any]:
con = sqlite3.connect(self.pfad_datenbank) con = sqlite3.connect(self.pfad_datenbank)
cursor = con.cursor() cursor = con.cursor()
liste_nivellement_beobachtungen = cursor.execute(f"SELECT beobachtungenID, punktnummer_sp, punktnummer_zp FROM Beobachtungen WHERE niv_dh IS NOT NULL AND niv_strecke IS NOT NULL AND niv_anz_standpkte IS NOT NULL").fetchall() liste_nivellement_beobachtungen = cursor.execute(f"SELECT beobachtungenID, punktnummer_sp, punktnummer_zp FROM Beobachtungen WHERE niv_dh IS NOT NULL AND niv_strecke IS NOT NULL AND niv_anz_standpkte IS NOT NULL AND niv_ausschalten = 0").fetchall()
cursor.close() cursor.close()
con.close() con.close()
return liste_nivellement_beobachtungen return liste_nivellement_beobachtungen

View File

@@ -1,11 +1,19 @@
import csv import csv
import numpy as np
import os
import webbrowser
from datetime import datetime
class Export: class Export:
def __init__(self): def __init__(self) -> None:
pass pass
@staticmethod @staticmethod
def matrix_to_csv(dateiname, liste_spaltenbeschriftung, liste_zeilenbeschriftung, Matrix, beschriftung_kopfzeile = ""): def matrix_to_csv(dateiname: str, liste_spaltenbeschriftung: list, liste_zeilenbeschriftung: list, Matrix: np.matrix | sp.Matrix,
beschriftung_kopfzeile: object = "") -> None:
with open(dateiname, "w", newline="", encoding="utf-8") as csvfile: with open(dateiname, "w", newline="", encoding="utf-8") as csvfile:
writer = csv.writer(csvfile, delimiter=";") writer = csv.writer(csvfile, delimiter=";")
@@ -29,7 +37,7 @@ class Export:
writer.writerow(zeile_als_text) writer.writerow(zeile_als_text)
@staticmethod @staticmethod
def ausgleichung_to_datei(dateiname, dict_ausgleichung): def ausgleichung_to_datei(dateiname: str, dict_ausgleichung: dict) -> None:
with open(dateiname, "w", newline="", encoding="utf-8") as csvfile: with open(dateiname, "w", newline="", encoding="utf-8") as csvfile:
writer = csv.writer(csvfile, delimiter=";") writer = csv.writer(csvfile, delimiter=";")
@@ -66,3 +74,68 @@ class Export:
value_text = str(value) value_text = str(value)
writer.writerow([key, value_text]) writer.writerow([key, value_text])
def speichere_html_protokoll(metadaten, ergebnisse):
# Pfad für den Ordner erstellen
ordner = "Protokolle"
if not os.path.exists(ordner):
os.makedirs(ordner)
dateiname = f"{ordner}/Protokoll_{metadaten['projekt']}.html"
abs_path = os.path.abspath(dateiname)
# HTML Inhalt zusammenbauen
html_content = f"""
<!DOCTYPE html>
<html lang="de">
<head>
<meta charset="UTF-8">
<style>
body {{ font-family: 'Segoe UI', Tahoma, Geneva, Verdana, sans-serif; margin: 40px; line-height: 1.6; color: #333; }}
h1 {{ color: #2c3e50; border-bottom: 2px solid #2c3e50; }}
h3 {{ color: #2980b9; margin-top: 30px; }}
.metadata {{ background: #f9f9f9; padding: 15px; border-radius: 5px; border-left: 5px solid #2980b9; }}
table {{ border-collapse: collapse; width: 100%; margin-top: 10px; }}
th, td {{ text-align: left; padding: 8px; border-bottom: 1px solid #ddd; }}
tr:nth-child(even) {{ background-color: #f2f2f2; }}
th {{ background-color: #2980b9; color: white; }}
.footer {{ margin-top: 50px; font-size: 0.8em; color: #7f8c8d; text-align: center; }}
</style>
<title>Netzprotokoll - {metadaten['projekt']}</title>
</head>
<body>
<h1>Netzprotokoll: {metadaten['projekt']}</h1>
<div class="metadata">
<p><strong>Bearbeiter:</strong> {metadaten['bearbeiter']}</p>
<p><strong>Datum:</strong> {metadaten['datum']}</p>
<p><strong>Ausgleichungsmodus:</strong> Methode der kleinsten Quadrate (L2-Norm)</p>
</div>
<h3>1. Globaltest</h3>
{ergebnisse['df_globaltest'].to_html(index=False)}
<h3>2. Redundanzanteile</h3>
{ergebnisse['df_redundanz'].to_html(index=False)}
<h3>3. Standardellipsen</h3>
{ergebnisse['df_ellipsen'].to_html(index=False)}
<h3>3. Konfidenzellipsen</h3>
{ergebnisse['df_konfidenzellipsen'].to_html(index=False)}
<div class="footer">
Erstellt automatisch am {datetime.now().strftime('%d.%m.%Y um %H:%M:%S')}
</div>
</body>
</html>
"""
# Datei schreiben
with open(dateiname, "w", encoding="utf-8") as f:
f.write(html_content)
print(f"✅ Protokoll wurde gespeichert unter: {abs_path}")
# Datei automatisch im Standard-Browser öffnen
webbrowser.open(f"file://{abs_path}")

View File

@@ -1,3 +1,9 @@
from typing import Any
from numpy import ndarray, dtype
from sympy import MutableDenseMatrix
from sympy.matrices.expressions.matexpr import MatrixElement
from Datenbank import Datenbankzugriff from Datenbank import Datenbankzugriff
import sympy as sp import sympy as sp
from Export import Export from Export import Export
@@ -14,7 +20,7 @@ import sympy as sp
class FunktionalesModell: class FunktionalesModell:
def __init__(self, pfad_datenbank, a, b, pfad_tif_quasigeoidundolation = None): def __init__(self, pfad_datenbank: str, a: float, b: float, pfad_tif_quasigeoidundolation: str = None) -> None:
self.pfad_datenbank = pfad_datenbank self.pfad_datenbank = pfad_datenbank
self.a = a self.a = a
self.b = b self.b = b
@@ -30,11 +36,11 @@ class FunktionalesModell:
self.liste_beobachtungsvektor_symbolisch = None self.liste_beobachtungsvektor_symbolisch = None
def jacobi_matrix_symbolisch(self, datumsfestlegung = None, liste_unbekannte_datumsfestlegung = None): def jacobi_matrix_symbolisch(self, datumsfestlegung: str = None, liste_unbekannte_datumsfestlegung: list = None) -> tuple[MutableDenseMatrix | MatrixElement | list[Any] | Any, list[Any], list[Any]] | None:
#liste_beobachtungsarten = ["tachymeter_distanz", "tachymeter_richtung", "tachymeter_zenitwinkel"] #liste_beobachtungsarten = ["tachymeter_distanz", "tachymeter_richtung", "tachymeter_zenitwinkel"]
#liste_beobachtungsarten = ["tachymeter_distanz", "tachymeter_richtung", "tachymeter_zenitwinkel", "gnss_basislinien", "geometrisches_nivellement"] liste_beobachtungsarten = ["tachymeter_distanz", "tachymeter_richtung", "tachymeter_zenitwinkel", "gnss_basislinien", "geometrisches_nivellement"]
liste_beobachtungsarten = ["tachymeter_distanz", "tachymeter_richtung", "tachymeter_zenitwinkel", "gnss_basislinien"] #liste_beobachtungsarten = ["tachymeter_distanz", "tachymeter_richtung", "tachymeter_zenitwinkel", "gnss_basislinien"]
db_zugriff = Datenbankzugriff(self.pfad_datenbank) db_zugriff = Datenbankzugriff(self.pfad_datenbank)
liste_beobachtungen_rohdaten_gnssbasislinien = [] liste_beobachtungen_rohdaten_gnssbasislinien = []
@@ -63,19 +69,52 @@ class FunktionalesModell:
if beobachtungsgruppeID not in liste_orientierungsunbekannte: if beobachtungsgruppeID not in liste_orientierungsunbekannte:
liste_orientierungsunbekannte.append(beobachtungsgruppeID) liste_orientierungsunbekannte.append(beobachtungsgruppeID)
#GNSS Block #GNSS Block
if beobachtungsart == "gnss_basislinien": #if beobachtungsart == "gnss_basislinien":
liste_id_standpunkt_zielpunkt = db_zugriff.get_gnss_beobachtungen_punktnummern() # liste_id_standpunkt_zielpunkt = db_zugriff.get_gnss_beobachtungen_punktnummern()
# for beobachtungenID, standpunkt, zielpunkt in liste_id_standpunkt_zielpunkt:
# standpunkt = str(standpunkt).strip()
# zielpunkt = str(zielpunkt).strip()
# liste_beobachtungen_rohdaten_gnssbasislinien.append((beobachtungsart, beobachtungenID, standpunkt, zielpunkt))
# if standpunkt not in liste_punktnummern:
# liste_punktnummern.append(standpunkt)
# if zielpunkt not in liste_punktnummern:
# liste_punktnummern.append(zielpunkt)
#GNSS Block
if beobachtungsart == "gnss_basislinien":
liste_id_standpunkt_zielpunkt = db_zugriff.get_gnss_beobachtungen_punktnummern("gnss_bx")
for beobachtungenID, standpunkt, zielpunkt in liste_id_standpunkt_zielpunkt: for beobachtungenID, standpunkt, zielpunkt in liste_id_standpunkt_zielpunkt:
standpunkt = str(standpunkt).strip() standpunkt = str(standpunkt).strip()
zielpunkt = str(zielpunkt).strip() zielpunkt = str(zielpunkt).strip()
liste_beobachtungen_rohdaten_gnssbasislinien.append((beobachtungsart, beobachtungenID, standpunkt, zielpunkt)) liste_beobachtungen_rohdaten_gnssbasislinien.append(("gnssbx", beobachtungenID, standpunkt, zielpunkt))
if standpunkt not in liste_punktnummern: if standpunkt not in liste_punktnummern:
liste_punktnummern.append(standpunkt) liste_punktnummern.append(standpunkt)
if zielpunkt not in liste_punktnummern: if zielpunkt not in liste_punktnummern:
liste_punktnummern.append(zielpunkt) liste_punktnummern.append(zielpunkt)
liste_id_standpunkt_zielpunkt = db_zugriff.get_gnss_beobachtungen_punktnummern("gnss_by")
for beobachtungenID, standpunkt, zielpunkt in liste_id_standpunkt_zielpunkt:
standpunkt = str(standpunkt).strip()
zielpunkt = str(zielpunkt).strip()
liste_beobachtungen_rohdaten_gnssbasislinien.append(("gnssby", beobachtungenID, standpunkt, zielpunkt))
if standpunkt not in liste_punktnummern:
liste_punktnummern.append(standpunkt)
if zielpunkt not in liste_punktnummern:
liste_punktnummern.append(zielpunkt)
liste_id_standpunkt_zielpunkt = db_zugriff.get_gnss_beobachtungen_punktnummern("gnss_bz")
for beobachtungenID, standpunkt, zielpunkt in liste_id_standpunkt_zielpunkt:
standpunkt = str(standpunkt).strip()
zielpunkt = str(zielpunkt).strip()
liste_beobachtungen_rohdaten_gnssbasislinien.append(("gnssbz", beobachtungenID, standpunkt, zielpunkt))
if standpunkt not in liste_punktnummern:
liste_punktnummern.append(standpunkt)
if zielpunkt not in liste_punktnummern:
liste_punktnummern.append(zielpunkt)
if beobachtungsart == "geometrisches_nivellement": if beobachtungsart == "geometrisches_nivellement":
liste_id_standpunkt_zielpunkt = db_zugriff.get_nivellement_beobachtungen_punktnummern() liste_id_standpunkt_zielpunkt = db_zugriff.get_nivellement_beobachtungen_punktnummern()
@@ -214,25 +253,49 @@ class FunktionalesModell:
f"{beobachtungenID}_ZW_{beobachtungsgruppeID}_{standpunkt}_{zielpunkt}" f"{beobachtungenID}_ZW_{beobachtungsgruppeID}_{standpunkt}_{zielpunkt}"
) )
#if liste_beobachtungen_rohdaten_gnssbasislinien != []:
# for beobachtungsart, beobachtungenID, standpunkt, zielpunkt in liste_beobachtungen_rohdaten_gnssbasislinien:
# X_sp, Y_sp, Z_sp = self.dict_punkt_symbole[standpunkt]
# X_zp, Y_zp, Z_zp = self.dict_punkt_symbole[zielpunkt]
# if beobachtungsart == "gnss_basislinien":
# beobachtungsgleichung_bx = X_zp - X_sp
# beobachtungsgleichung_by = Y_zp - Y_sp
# beobachtungsgleichung_bz = Z_zp - Z_sp
# liste_beobachtungsgleichungen_gnssbasislinien.append(beobachtungsgleichung_bx)
# liste_beobachtungsgleichungen_gnssbasislinien.append(beobachtungsgleichung_by)
# liste_beobachtungsgleichungen_gnssbasislinien.append(beobachtungsgleichung_bz)
# liste_zeilenbeschriftungen_gnssbasislinien.append(
# f"{beobachtungenID}_gnssbx_{standpunkt}_{zielpunkt}")
# liste_zeilenbeschriftungen_gnssbasislinien.append(
# f"{beobachtungenID}_gnssby_{standpunkt}_{zielpunkt}")
# liste_zeilenbeschriftungen_gnssbasislinien.append(
# f"{beobachtungenID}_gnssbz_{standpunkt}_{zielpunkt}")
if liste_beobachtungen_rohdaten_gnssbasislinien != []: if liste_beobachtungen_rohdaten_gnssbasislinien != []:
for beobachtungsart, beobachtungenID, standpunkt, zielpunkt in liste_beobachtungen_rohdaten_gnssbasislinien: for beobachtungsart, beobachtungenID, standpunkt, zielpunkt in liste_beobachtungen_rohdaten_gnssbasislinien:
X_sp, Y_sp, Z_sp = self.dict_punkt_symbole[standpunkt] X_sp, Y_sp, Z_sp = self.dict_punkt_symbole[standpunkt]
X_zp, Y_zp, Z_zp = self.dict_punkt_symbole[zielpunkt] X_zp, Y_zp, Z_zp = self.dict_punkt_symbole[zielpunkt]
if beobachtungsart == "gnss_basislinien": if beobachtungsart == "gnssbx":
beobachtungsgleichung_bx = X_zp - X_sp beobachtungsgleichung_bx = X_zp - X_sp
beobachtungsgleichung_by = Y_zp - Y_sp
beobachtungsgleichung_bz = Z_zp - Z_sp
liste_beobachtungsgleichungen_gnssbasislinien.append(beobachtungsgleichung_bx) liste_beobachtungsgleichungen_gnssbasislinien.append(beobachtungsgleichung_bx)
liste_beobachtungsgleichungen_gnssbasislinien.append(beobachtungsgleichung_by)
liste_beobachtungsgleichungen_gnssbasislinien.append(beobachtungsgleichung_bz)
liste_zeilenbeschriftungen_gnssbasislinien.append( liste_zeilenbeschriftungen_gnssbasislinien.append(
f"{beobachtungenID}_gnssbx_{standpunkt}_{zielpunkt}") f"{beobachtungenID}_gnssbx_{standpunkt}_{zielpunkt}")
if beobachtungsart == "gnssby":
beobachtungsgleichung_by = Y_zp - Y_sp
liste_beobachtungsgleichungen_gnssbasislinien.append(beobachtungsgleichung_by)
liste_zeilenbeschriftungen_gnssbasislinien.append( liste_zeilenbeschriftungen_gnssbasislinien.append(
f"{beobachtungenID}_gnssby_{standpunkt}_{zielpunkt}") f"{beobachtungenID}_gnssby_{standpunkt}_{zielpunkt}")
if beobachtungsart == "gnssbz":
beobachtungsgleichung_bz = Z_zp - Z_sp
liste_beobachtungsgleichungen_gnssbasislinien.append(beobachtungsgleichung_bz)
liste_zeilenbeschriftungen_gnssbasislinien.append( liste_zeilenbeschriftungen_gnssbasislinien.append(
f"{beobachtungenID}_gnssbz_{standpunkt}_{zielpunkt}") f"{beobachtungenID}_gnssbz_{standpunkt}_{zielpunkt}")
if liste_beobachtungen_rohdaten_nivellement != []: if liste_beobachtungen_rohdaten_nivellement != []:
for beobachtungsart, beobachtungenID, standpunkt, zielpunkt in liste_beobachtungen_rohdaten_nivellement: for beobachtungsart, beobachtungenID, standpunkt, zielpunkt in liste_beobachtungen_rohdaten_nivellement:
X_sp, Y_sp, Z_sp = self.dict_punkt_symbole[standpunkt] X_sp, Y_sp, Z_sp = self.dict_punkt_symbole[standpunkt]
@@ -375,7 +438,10 @@ class FunktionalesModell:
return A_gesamt, liste_unbekannte, liste_zeilenbeschriftungen_gesamt return A_gesamt, liste_unbekannte, liste_zeilenbeschriftungen_gesamt
def jacobi_matrix_zahlen_iteration_0(self, A_symbolisch, koordinatenart, liste_unbekannte = None, liste_zeilenbeschriftungen_gesamt = None, iterationsnummer = 0): def jacobi_matrix_zahlen_iteration_0(self, A_symbolisch: sp.Matrix, koordinatenart: str,
liste_unbekannte: list = None,
liste_zeilenbeschriftungen_gesamt: list = None,
iterationsnummer: int = 0) -> ndarray[tuple[Any, ...], dtype[Any]] | None:
self.liste_beobachtungsvektor_symbolisch = [str(x) for x in liste_zeilenbeschriftungen_gesamt] self.liste_beobachtungsvektor_symbolisch = [str(x) for x in liste_zeilenbeschriftungen_gesamt]
if koordinatenart == "naeherung_us": if koordinatenart == "naeherung_us":
@@ -421,7 +487,7 @@ class FunktionalesModell:
else: else:
print("Koordinaten noch nicht implementiert!") print("Koordinaten noch nicht implementiert!")
def beobachtungsvektor_numerisch(self, liste_beobachtungsvektor_symbolisch): def beobachtungsvektor_numerisch(self, liste_beobachtungsvektor_symbolisch: list) -> MutableDenseMatrix:
liste_beobachtungsvektor_numerisch = [] liste_beobachtungsvektor_numerisch = []
for beobachtung_symbolisch in liste_beobachtungsvektor_symbolisch: for beobachtung_symbolisch in liste_beobachtungsvektor_symbolisch:
@@ -434,7 +500,7 @@ class FunktionalesModell:
Export.matrix_to_csv(r"Zwischenergebnisse\Beobachtungsvektor_Numerisch.csv", [""], liste_beobachtungsvektor_symbolisch, beobachtungsvektor_numerisch, "Beobachtungsvektor") Export.matrix_to_csv(r"Zwischenergebnisse\Beobachtungsvektor_Numerisch.csv", [""], liste_beobachtungsvektor_symbolisch, beobachtungsvektor_numerisch, "Beobachtungsvektor")
return beobachtungsvektor_numerisch return beobachtungsvektor_numerisch
def beobachtungsvektor_naeherung_symbolisch(self, liste_beobachtungsvektor_symbolisch): def beobachtungsvektor_naeherung_symbolisch(self, liste_beobachtungsvektor_symbolisch: list) -> sp.Matrix:
liste_beobachtungsgleichungen = [] liste_beobachtungsgleichungen = []
self.dict_punkt_symbole = {} self.dict_punkt_symbole = {}
liste_punktnummern = [] liste_punktnummern = []
@@ -568,7 +634,9 @@ class FunktionalesModell:
return beobachtungsvektor_naeherung_symbolisch return beobachtungsvektor_naeherung_symbolisch
def beobachtungsvektor_naeherung_numerisch_iteration0(self, liste_beobachtungsvektor_symbolisch, beobachtungsvektor_naeherung_symbolisch, iterationsnummer=0): def beobachtungsvektor_naeherung_numerisch_iteration0(self, liste_beobachtungsvektor_symbolisch: list,
beobachtungsvektor_naeherung_symbolisch: sp.Matrix,
iterationsnummer: int = 0) -> ndarray[tuple[int, int], Any]:
#beobachtungsvektor_naeherung_numerisch_iteration0 = beobachtungsvektor_naeherung_symbolisch.xreplace(self.substitutionen_dict) #beobachtungsvektor_naeherung_numerisch_iteration0 = beobachtungsvektor_naeherung_symbolisch.xreplace(self.substitutionen_dict)
if self.func_beob0 is None: if self.func_beob0 is None:
#self.liste_symbole_lambdify = sorted(self.substitutionen_dict.keys(), key=lambda s: str(s)) #self.liste_symbole_lambdify = sorted(self.substitutionen_dict.keys(), key=lambda s: str(s))
@@ -590,13 +658,17 @@ class FunktionalesModell:
return beobachtungsvektor_naeherung_numerisch_iteration0 return beobachtungsvektor_naeherung_numerisch_iteration0
def unbekanntenvektor_symbolisch(self, liste_unbekannte): def unbekanntenvektor_symbolisch(self, liste_unbekannte: list) -> sp.Matrix:
unbekanntenvektor_symbolisch = sp.Matrix(liste_unbekannte) unbekanntenvektor_symbolisch = sp.Matrix(liste_unbekannte)
Export.matrix_to_csv(r"Zwischenergebnisse\Unbekanntenvektor_Symbolisch.csv", [""], liste_unbekannte, unbekanntenvektor_symbolisch, Export.matrix_to_csv(r"Zwischenergebnisse\Unbekanntenvektor_Symbolisch.csv", [""], liste_unbekannte, unbekanntenvektor_symbolisch,
"Unbekanntenvektor") "Unbekanntenvektor")
return(unbekanntenvektor_symbolisch) return(unbekanntenvektor_symbolisch)
def unbekanntenvektor_numerisch(self, liste_unbekanntenvektor_symbolisch, unbekanntenvektor_symbolisch, dX_Vektor = None, unbekanntenvektor_neumerisch_vorherige_Iteration = None, iterationsnummer=0): def unbekanntenvektor_numerisch(self, liste_unbekanntenvektor_symbolisch: list,
unbekanntenvektor_symbolisch: sp.Matrix,
dX_Vektor: np.Matrix = None,
unbekanntenvektor_neumerisch_vorherige_Iteration: np.Matrix = None,
iterationsnummer: int = 0) -> ndarray[tuple[int, int], Any] | ndarray[tuple[Any, ...], dtype[Any]]:
self.liste_unbekanntenvektor_symbolisch = liste_unbekanntenvektor_symbolisch self.liste_unbekanntenvektor_symbolisch = liste_unbekanntenvektor_symbolisch
#if not hasattr(self, "liste_unbekanntenvektor_symbolisch"): #if not hasattr(self, "liste_unbekanntenvektor_symbolisch"):
@@ -636,7 +708,7 @@ class FunktionalesModell:
"Unbekanntenvektor") "Unbekanntenvektor")
return unbekanntenvektor_numerisch return unbekanntenvektor_numerisch
def unbekanntenvektor_numerisch_to_dict_unbekanntenvektor(self, liste_unbekanntenvektor_symbolisch, unbekanntenvektor_numerisch): def unbekanntenvektor_numerisch_to_dict_unbekanntenvektor(self, liste_unbekanntenvektor_symbolisch: list, unbekanntenvektor_numerisch: np.Matrix) -> dict:
dict_unbekanntenvektor_numerisch = {} dict_unbekanntenvektor_numerisch = {}
#index = 0 #index = 0
@@ -675,8 +747,8 @@ class FunktionalesModell:
#index += 3 #index += 3
return dict_koordinaten return dict_koordinaten
def berechnung_dl(self, beobachtungsvektor_numerisch, beobachtungsvektor_naeherung_numerisch, def berechnung_dl(self, beobachtungsvektor_numerisch: np.Matrix, beobachtungsvektor_naeherung_numerisch: sp.Matrix,
liste_beobachtungsvektor_symbolisch=None, iterationsnummer=0): liste_beobachtungsvektor_symbolisch: list = None, iterationsnummer: int = 0) -> np.Matrix:
dl = beobachtungsvektor_numerisch - beobachtungsvektor_naeherung_numerisch dl = beobachtungsvektor_numerisch - beobachtungsvektor_naeherung_numerisch
dl = np.asarray(dl, dtype=float) dl = np.asarray(dl, dtype=float)
@@ -697,7 +769,8 @@ class FunktionalesModell:
return dl return dl
def dict_substitutionen_uebergeordnetes_system(self, unbekanntenvektor_aus_iteration = None): def dict_substitutionen_uebergeordnetes_system(self,
unbekanntenvektor_aus_iteration: np.Matrix = None) -> dict[Any, Any]:
db_zugriff = Datenbankzugriff(self.pfad_datenbank) db_zugriff = Datenbankzugriff(self.pfad_datenbank)
berechnungen = Berechnungen(self.a, self.b) berechnungen = Berechnungen(self.a, self.b)
if unbekanntenvektor_aus_iteration is None: if unbekanntenvektor_aus_iteration is None:
@@ -828,8 +901,8 @@ class FunktionalesModell:
return substitutionen return substitutionen
def unbekanntenvektor_numerisch_to_dict_orientierungen(self, liste_unbekanntenvektor_symbolisch, def unbekanntenvektor_numerisch_to_dict_orientierungen(self, liste_unbekanntenvektor_symbolisch: list,
unbekanntenvektor_numerisch): unbekanntenvektor_numerisch: np.Matrix) -> dict[Any, Any]:
dict_O = {} dict_O = {}
unbekanntenvektor_numerisch = np.asarray(unbekanntenvektor_numerisch, dtype=float).reshape(-1, 1) unbekanntenvektor_numerisch = np.asarray(unbekanntenvektor_numerisch, dtype=float).reshape(-1, 1)
for i, symbol in enumerate(liste_unbekanntenvektor_symbolisch): for i, symbol in enumerate(liste_unbekanntenvektor_symbolisch):

View File

@@ -1,24 +1,25 @@
import csv import csv
import sqlite3 import sqlite3
from decimal import Decimal from decimal import Decimal
from typing import Any
import Berechnungen import Berechnungen
class Import: class Import:
def __init__(self, pfad_datenbank): def __init__(self, pfad_datenbank: str) -> None:
self.pfad_datenbank = pfad_datenbank self.pfad_datenbank = pfad_datenbank
pass pass
def string_to_float(self, zahl): def string_to_float(self, zahl: str) -> float:
zahl = zahl.replace(',', '.') zahl = zahl.replace(',', '.')
return float(zahl) return float(zahl)
def string_to_decimal(self, zahl): def string_to_decimal(self, zahl: str) -> Decimal:
zahl = zahl.replace(',', '.') zahl = zahl.replace(',', '.')
return Decimal(zahl) return Decimal(zahl)
def import_koordinaten_lh_tachymeter(self, pfad_datei): def import_koordinaten_lh_tachymeter(self, pfad_datei: str) -> None:
liste_punktnummern = [] liste_punktnummern = []
liste_punktnummern_vorher = [] liste_punktnummern_vorher = []
liste_punktnummern_vorher_db = [] liste_punktnummern_vorher_db = []
@@ -64,7 +65,7 @@ class Import:
con.close() con.close()
print("Der Import der Näherungskoordinaten wurde erfolgreich abgeschlossen") print("Der Import der Näherungskoordinaten wurde erfolgreich abgeschlossen")
def import_beobachtungen_tachymeter(self, pfad_datei, instrumentenID): def import_beobachtungen_tachymeter(self, pfad_datei: str, instrumentenID: int) -> None:
# Prüfen, ob Bereits Daten aus der Datei in der Datenbank vorhanden sind # Prüfen, ob Bereits Daten aus der Datei in der Datenbank vorhanden sind
con = sqlite3.connect(self.pfad_datenbank) con = sqlite3.connect(self.pfad_datenbank)
cursor = con.cursor() cursor = con.cursor()
@@ -228,7 +229,8 @@ class Import:
con.close() con.close()
print(f"Der Import der Datei {pfad_datei} wurde erfolgreich abgeschlossen.") print(f"Der Import der Datei {pfad_datei} wurde erfolgreich abgeschlossen.")
def vorbereitung_import_beobachtungen_nivellement_naeherung_punkthoehen(self, pfad_datei, instrumentenID): def vorbereitung_import_beobachtungen_nivellement_naeherung_punkthoehen(self, pfad_datei: str,
instrumentenID: int) -> None | tuple[None, None] | tuple[dict[Any, Any], list[Any]]:
# Prüfen, ob Bereits Daten aus der Datei in der Datenbank vorhanden sind # Prüfen, ob Bereits Daten aus der Datei in der Datenbank vorhanden sind
con = sqlite3.connect(self.pfad_datenbank) con = sqlite3.connect(self.pfad_datenbank)
cursor = con.cursor() cursor = con.cursor()
@@ -306,7 +308,9 @@ class Import:
print(f"Für folgende Nivellementpunkte werden die Höhen in der Ausgleichung berechnet: {liste_punktnummern_in_db}\nFür folgende Punkte wird aktuell keine Höhe in der Ausgleichung berechnet: {liste_punktnummern_nicht_in_db}. Bei Bedarf im folgenden Schritt ändern!") print(f"Für folgende Nivellementpunkte werden die Höhen in der Ausgleichung berechnet: {liste_punktnummern_in_db}\nFür folgende Punkte wird aktuell keine Höhe in der Ausgleichung berechnet: {liste_punktnummern_nicht_in_db}. Bei Bedarf im folgenden Schritt ändern!")
return dict_punkt_mittelwert_punkthoehen, liste_punktnummern_in_db return dict_punkt_mittelwert_punkthoehen, liste_punktnummern_in_db
def import_beobachtungen_nivellement_naeherung_punkthoehen(self, dict_punkt_mittelwert_punkthoehen, liste_punktnummern_in_db, liste_punktnummern_hinzufuegen): def import_beobachtungen_nivellement_naeherung_punkthoehen(self, dict_punkt_mittelwert_punkthoehen: dict,
liste_punktnummern_in_db: list,
liste_punktnummern_hinzufuegen: list) -> str | None:
Import_fortsetzen = True Import_fortsetzen = True
if dict_punkt_mittelwert_punkthoehen == None or liste_punktnummern_in_db == None or liste_punktnummern_hinzufuegen == None: if dict_punkt_mittelwert_punkthoehen == None or liste_punktnummern_in_db == None or liste_punktnummern_hinzufuegen == None:
@@ -346,7 +350,7 @@ class Import:
return f"Für folgende Punkte werden die Höhen Ausgeglichen: {liste_punktnummern_hinzufuegen + liste_punktnummern_in_db}" return f"Für folgende Punkte werden die Höhen Ausgeglichen: {liste_punktnummern_hinzufuegen + liste_punktnummern_in_db}"
def import_beobachtungen_nivellement_RVVR(self, pfad_datei, instrumentenID): def import_beobachtungen_nivellement_RVVR(self, pfad_datei: str, instrumentenID: int) -> str | None:
# Prüfen, ob Bereits Daten aus der Datei in der Datenbank vorhanden sind # Prüfen, ob Bereits Daten aus der Datei in der Datenbank vorhanden sind
con = sqlite3.connect(self.pfad_datenbank) con = sqlite3.connect(self.pfad_datenbank)
cursor = con.cursor() cursor = con.cursor()
@@ -560,7 +564,7 @@ class Import:
def import_koordinaten_gnss(self, pfad_datei, liste_sapos_stationen_genauigkeiten): def import_koordinaten_gnss(self, pfad_datei: str, liste_sapos_stationen_genauigkeiten: list) -> str:
liste_zeilen = [] liste_zeilen = []
dict_koordinaten = {} dict_koordinaten = {}
@@ -597,7 +601,7 @@ class Import:
con.close() con.close()
return "Import der Koordinaten aus stationärem GNSS abgeschlossen." return "Import der Koordinaten aus stationärem GNSS abgeschlossen."
def import_basislinien_gnss(self, pfad_datei): def import_basislinien_gnss(self, pfad_datei: str) -> None:
Import_fortsetzen = True Import_fortsetzen = True
# Prüfen, ob Bereits Daten aus der Datei in der Datenbank vorhanden sind # Prüfen, ob Bereits Daten aus der Datei in der Datenbank vorhanden sind

View File

@@ -1,3 +1,5 @@
from typing import Any
import sympy as sp import sympy as sp
from sympy.algebras.quaternion import Quaternion from sympy.algebras.quaternion import Quaternion
import Datenbank import Datenbank
@@ -11,11 +13,11 @@ from pyproj import CRS, Transformer
class Transformationen: class Transformationen:
def __init__(self, pfad_datenbank): def __init__(self, pfad_datenbank: str) -> None:
self.pfad_datenbank = pfad_datenbank self.pfad_datenbank = pfad_datenbank
@staticmethod @staticmethod
def R_matrix_aus_euler(e1, e2, e3): def R_matrix_aus_euler(e1: float, e2: float, e3: float) -> sp.Matrix:
return sp.Matrix([ return sp.Matrix([
[ [
sp.cos(e2) * sp.cos(e3), sp.cos(e2) * sp.cos(e3),
@@ -34,7 +36,7 @@ class Transformationen:
] ]
]) ])
def Helmerttransformation_Euler_Transformationsparameter_berechne(self): def Helmerttransformation_Euler_Transformationsparameter_berechne(self) -> dict[Any, float]:
db = Datenbank.Datenbankzugriff(self.pfad_datenbank) db = Datenbank.Datenbankzugriff(self.pfad_datenbank)
dict_ausgangssystem = db.get_koordinaten("naeherung_lh", "Dict") dict_ausgangssystem = db.get_koordinaten("naeherung_lh", "Dict")
dict_zielsystem = db.get_koordinaten("naeherung_us", "Dict") dict_zielsystem = db.get_koordinaten("naeherung_us", "Dict")
@@ -251,7 +253,7 @@ class Transformationen:
return zahlen_final return zahlen_final
def Helmerttransformation(self, transformationsparameter): def Helmerttransformation(self, transformationsparameter: dict) -> dict[Any, Any]:
db = Datenbank.Datenbankzugriff(self.pfad_datenbank) db = Datenbank.Datenbankzugriff(self.pfad_datenbank)
dict_ausgangssystem = db.get_koordinaten("naeherung_lh", "Dict") dict_ausgangssystem = db.get_koordinaten("naeherung_lh", "Dict")
dict_zielsystem = db.get_koordinaten("naeherung_us", "Dict") dict_zielsystem = db.get_koordinaten("naeherung_us", "Dict")
@@ -286,7 +288,7 @@ class Transformationen:
]) ])
return dict_transformiert return dict_transformiert
def utm_to_XYZ(self, pfad_tif_quasigeoidundolation, liste_utm): def utm_to_XYZ(self, pfad_tif_quasigeoidundolation: str, liste_utm: list) -> dict[Any, Any]:
pfad_gcg_tif = Path(pfad_tif_quasigeoidundolation) pfad_gcg_tif = Path(pfad_tif_quasigeoidundolation)
pfad_gcg_tif_proj = pfad_gcg_tif.with_name("de_bkg_gcg2016.tif") pfad_gcg_tif_proj = pfad_gcg_tif.with_name("de_bkg_gcg2016.tif")

View File

@@ -146,50 +146,114 @@ class Genauigkeitsmaße:
return konfidenzellipsen return konfidenzellipsen
def plot_netz_komplett_final(x_vektor, unbekannten_labels, beobachtungs_labels, Qxx, sigma0_apost,
k_faktor=2.447, v_faktor=1000):
"""
Optimierter Plot für Jupyter Notebook:
- k_faktor: Statistischer Sicherheitsfaktor (2.447 entspricht 95% für 2D)
- v_faktor: Optische Überhöhung der Ellipsen (z.B. 1000 = mm werden als m dargestellt)
"""
@staticmethod x_vektor = np.asarray(x_vektor, float).reshape(-1)
def plot_ellipsen(punkt_coords: dict, ellipsen_parameter: list, scale: float = 1000): Qxx = np.asarray(Qxx, float)
fig = go.Figure()
# Titel dynamisch anpassen # 1. Datenaufbereitung
prob = ellipsen_parameter[0].get('prob', 0) coords = {}
titel = "Standard-Fehlerellipsen" if prob < 0.4 else f"{prob * 100:.0f}% Konfidenzellipsen" punkt_ids = sorted(set(str(l)[1:] for l in unbekannten_labels if str(l).startswith(('X', 'Y', 'Z'))))
pts_data = []
for p in ellipsen_parameter: for pid in punkt_ids:
name = p['name'] try:
x0, y0 = punkt_coords[name][0], punkt_coords[name][1] ix = next(i for i, s in enumerate(unbekannten_labels) if str(s) == f"X{pid}")
a, b, theta = p['a'], p['b'], p['theta'] iy = next(i for i, s in enumerate(unbekannten_labels) if str(s) == f"Y{pid}")
x, y = float(x_vektor[ix]), float(x_vektor[iy])
coords[pid] = (x, y)
t = np.linspace(0, 2 * np.pi, 100) # Kovarianzmatrix extrahieren und mit s0^2 skalieren
xs, ys = a * scale * np.cos(t), b * scale * np.sin(t) q_idx = [ix, iy]
Q_sub = Qxx[np.ix_(q_idx, q_idx)] * (sigma0_apost ** 2)
pts_data.append({'id': pid, 'x': x, 'y': y, 'Q': Q_sub})
except:
continue
x_plot = x0 + xs * np.cos(theta) - ys * np.sin(theta) if len(pts_data) == 0:
y_plot = y0 + xs * np.sin(theta) + ys * np.cos(theta) raise ValueError(
"Keine Netzpunkte extrahiert. Prüfe: x_vektor Form (u,) und Qxx Form (u,u) sowie Labels 'X<id>'/'Y<id>'.")
# Punkt fig = go.Figure()
fig.add_trace(go.Scatter(
x=[x0], y=[y0], mode='markers+text',
name=f"Punkt {name}", text=[name], textposition="top right",
marker=dict(size=8, color='black')
))
# Ellipse # 2. Beobachtungen (Gruppiert)
fig.add_trace(go.Scatter( beob_typen = {
x=x_plot, y=y_plot, mode='lines', 'GNSS-Basislinien': {'pattern': 'gnss', 'color': 'rgba(255, 100, 0, 0.4)'},
name=f"Ellipse {name}", 'Nivellement': {'pattern': 'niv', 'color': 'rgba(0, 200, 100, 0.4)'},
line=dict(color='blue' if prob > 0.4 else 'red', width=2, 'Tachymeter': {'pattern': '', 'color': 'rgba(100, 100, 100, 0.3)'}
dash='dash' if prob > 0.4 else 'solid'), }
fill='toself',
fillcolor='rgba(0, 0, 255, 0.1)' if prob > 0.4 else 'rgba(255, 0, 0, 0.1)',
hoverinfo='text',
text=(f"Punkt: {name}<br>a: {a * 1000:.2f} mm<br>"
f"b: {b * 1000:.2f} mm<br>Theta: {np.degrees(theta):.2f}°")
))
fig.update_layout( for typ, info in beob_typen.items():
title=titel, x_l, y_l = [], []
xaxis_title="Rechtswert (E) [m]", yaxis_title="Hochwert (N) [m]", for bl in beobachtungs_labels:
yaxis=dict(scaleanchor="x", scaleratio=1), bl_str = str(bl).lower()
template="plotly_white", showlegend=True if (info['pattern'] in bl_str and info['pattern'] != '') or (
) info['pattern'] == '' and 'gnss' not in bl_str and 'niv' not in bl_str):
return fig pts = [pid for pid in coords if f"_{pid}" in str(bl) or str(bl).startswith(f"{pid}_")]
if len(pts) >= 2:
x_l.extend([coords[pts[0]][0], coords[pts[1]][0], None])
y_l.extend([coords[pts[0]][1], coords[pts[1]][1], None])
if x_l:
fig.add_trace(go.Scatter(x=x_l, y=y_l, mode='lines', name=typ, line=dict(color=info['color'], width=1)))
# 3. Konfidenzellipsen mit v_faktor
for pt in pts_data:
vals, vecs = np.linalg.eigh(pt['Q'])
order = vals.argsort()[::-1]
vals, vecs = vals[order], vecs[:, order]
theta = np.degrees(np.arctan2(vecs[1, 0], vecs[0, 0]))
# Skalierung: k_faktor (Statistik) * v_faktor (Optik)
a = k_faktor * np.sqrt(vals[0]) * v_faktor
b = k_faktor * np.sqrt(vals[1]) * v_faktor
t = np.linspace(0, 2 * np.pi, 40)
e_x = a * np.cos(t)
e_y = b * np.sin(t)
R = np.array([[np.cos(np.radians(theta)), -np.sin(np.radians(theta))],
[np.sin(np.radians(theta)), np.cos(np.radians(theta))]])
rot = np.dot(R, np.array([e_x, e_y]))
fig.add_trace(go.Scatter(
x=rot[0, :] + pt['x'], y=rot[1, :] + pt['y'],
mode='lines', line=dict(color='red', width=1.5),
name=f"Ellipsen (Vergrößert {v_faktor}x)",
legendgroup="Ellipsen",
showlegend=(pt == pts_data[0]), # Nur einmal in der Legende zeigen
hoverinfo='skip'
))
# 4. Punkte
df_pts = pd.DataFrame(pts_data)
fig.add_trace(go.Scatter(
x=df_pts['x'], y=df_pts['y'], mode='markers+text',
text=df_pts['id'], textposition="top center",
marker=dict(size=8, color='black'), name="Netzpunkte"
))
# 5. Layout & Notebook-Größe
fig.update_layout(
title=f"Netzausgleichung: Ellipsen {v_faktor}-fach vergrößert (k={k_faktor})",
xaxis=dict(title="X [m]", tickformat="f", separatethousands=True, scaleanchor="y", scaleratio=1, showgrid=True,
gridcolor='lightgrey'),
yaxis=dict(title="Y [m]", tickformat="f", separatethousands=True, showgrid=True, gridcolor='lightgrey'),
width=1100, # Breite angepasst
height=900, # Höhe deutlich vergrößert für Jupiter Notebook
plot_bgcolor='white',
legend=dict(yanchor="top", y=0.99, xanchor="left", x=0.01, bgcolor="rgba(255,255,255,0.8)")
)
# Info-Annotation als Ersatz für einen physischen Maßstabstab
fig.add_annotation(
text=f"<b>Maßstab Ellipsen:</b><br>Dargestellte Größe = Wahre Ellipse × {v_faktor}",
align='left', showarrow=False, xref='paper', yref='paper', x=0.02, y=0.05,
bgcolor="white", bordercolor="black", borderwidth=1)
fig.show(config={'scrollZoom': True})

View File

@@ -49,9 +49,8 @@ class Zuverlaessigkeit:
interpretation = ( interpretation = (
"Nullhypothese H₀ verworfen!\n" "Nullhypothese H₀ verworfen!\n"
"Dies kann folgende Gründe haben:\n" "Dies kann folgende Gründe haben:\n"
"→ Es befinden sich grobe Fehler im Datenmaterial.\n" "→ Es befinden sich grobe Fehler im Datenmaterial. Bitte Lokaltest durchführen und ggf. grobe Fehler im Datenmaterial entfernen.\n"
"→ Das funktionale Modell ist fehlerhaft.\n" "→ Das stochastische Modell ist zu optimistisch. Bitte Gewichte überprüfen und ggf. anpassen."
"→ Das stochastische Modell ist zu optimistisch."
) )
return { return {
@@ -111,3 +110,107 @@ class Zuverlaessigkeit:
"δ0": nzp, "δ0": nzp,
}) })
return Lokaltest_innere_Zuv return Lokaltest_innere_Zuv
def aeussere_zuverlaessigkeit_EF_EP(Lokaltest, labels, Qxx, A, P, s0_apost, unbekannten_liste, x):
df = Lokaltest.copy()
labels = list(labels)
Qxx = np.asarray(Qxx, float)
A = np.asarray(A, float)
P = np.asarray(P, float)
x = np.asarray(x, float).reshape(-1)
ri = df["r_i"].astype(float).to_numpy()
GF = df["GF_i"].astype(float).to_numpy()
s_vi = df["s_vi"].astype(float).to_numpy()
GRZW = df["GRZW_i"].astype(float).to_numpy()
nzp = df["δ0"].astype(float).to_numpy()
n = A.shape[0] # Anzahl Beobachtungen
u = A.shape[1] # Anzahl Unbekannte
# Einflussfaktor EF berechnen
EF = np.zeros(n, dtype=float)
for i in range(n):
# 1) ∇l_i aufstellen
nabla_l = np.zeros((n, 1))
nabla_l[i, 0] = GRZW[i]
# 2) ∇x_i = Qxx * A^T * P * ∇l_i
nabla_x = Qxx @ (A.T @ (P @ nabla_l))
# 3) EF_i^2 = (∇x_i^T * Qxx^{-1} * ∇x_i) / s0^2
Qxx_inv_nabla_x = np.linalg.solve(Qxx, nabla_x) # = Qxx^{-1} ∇x_i
#EF2 = float((nabla_x.T @ Qxx_inv_nabla_x) / (float(s0_apost) ** 2)).item()
EF2 = ((nabla_x.T @ Qxx_inv_nabla_x) / (float(s0_apost) ** 2)).item()
EF[i] = np.sqrt(EF2)
# Koordinaten-Dict aus x
coords = {}
j = 0
while j < len(unbekannten_liste):
name = str(unbekannten_liste[j])
if name.startswith("X"):
pn = name[1:]
coords[pn] = (x[j], x[j + 1], x[j + 2])
j += 3
else:
j += 1
# EP + Standpunkte
EP_m = np.full(len(labels), np.nan, dtype=float)
standpunkte = [""] * len(labels)
for i, lbl in enumerate(labels):
parts = lbl.split("_")
sp = None
zp = None
# Tachymeter: ID_SD_GRP_SP_ZP / ID_R_GRP_SP_ZP / ID_ZW_GRP_SP_ZP
if ("_SD_" in lbl) or ("_R_" in lbl) or ("_ZW_" in lbl):
if len(parts) >= 5:
sp = parts[3].strip()
zp = parts[4].strip()
# GNSS: *_gnssbx_SP_ZP etc.
if ("gnss" in lbl) and (len(parts) >= 4):
sp = parts[-2].strip()
zp = parts[-1].strip()
standpunkte[i] = sp if sp is not None else ""
one_minus_r = (1.0 - ri[i])
# SD + GNSS: direkt in m
if ("_SD_" in lbl) or ("gnss" in lbl):
EP_m[i] = one_minus_r * GF[i]
# R / ZW: Winkel -> Streckenäquivalent über s
elif ("_R_" in lbl) or ("_ZW_" in lbl):
if sp and zp and (sp in coords) and (zp in coords):
X1, Y1, Z1 = coords[sp]
X2, Y2, Z2 = coords[zp]
s = float(np.sqrt((X2 - X1) ** 2 + (Y2 - Y1) ** 2 + (Z2 - Z1) ** 2))
EP_m[i] = one_minus_r * ((GF[i]) * s)
# SP am Standpunkt (2D)
diagQ = np.diag(Qxx)
SP_cache_mm = {}
for sp in set([s for s in standpunkte if s]):
idx_x = [k for k, sym in enumerate(unbekannten_liste) if str(sym) == f"X{sp}"][0]
qx = diagQ[idx_x]
qy = diagQ[idx_x + 1]
SP_cache_mm[sp] = float(s0_apost) * np.sqrt(qx + qy) * 1000.0
SP_mm = np.array([SP_cache_mm.get(sp, np.nan) for sp in standpunkte], dtype=float)
out = pd.DataFrame({
"Beobachtung": labels,
"Stand-Pkt": standpunkte,
"EF": EF,
"EP [mm]": EP_m * 1000.0,
"SP [mm]": SP_mm,
"EF*SP [mm]": EF * SP_mm,
})
return out

View File

@@ -42,6 +42,7 @@ def ausgleichung_global(A, dl, Q_ext):
"Q_xx": Q_xx, "Q_xx": Q_xx,
"Q_ll_dach": Q_ll_dach, "Q_ll_dach": Q_ll_dach,
"Q_vv": Q_vv, "Q_vv": Q_vv,
"Q_ext": Q_ext,
} }
return dict_ausgleichung, dx return dict_ausgleichung, dx

View File

@@ -1,7 +1,7 @@
import sympy as sp import sympy as sp
import numpy as np import numpy as np
from dataclasses import dataclass, field from dataclasses import dataclass, field
from typing import Dict, Tuple, Iterable from typing import Dict, Tuple, Iterable, Any
from Export import Export from Export import Export
from Datenbank import Datenbankzugriff from Datenbank import Datenbankzugriff
@@ -71,7 +71,7 @@ class StochastischesModell:
# Q_ll[i, i] = q_ii #Diagonale # Q_ll[i, i] = q_ii #Diagonale
# return Q_ll # return Q_ll
def Qll_symbolisch(self, pfad_datenbank, liste_beobachtungen_symbolisch): def Qll_symbolisch(self, pfad_datenbank: str, liste_beobachtungen_symbolisch: list) -> sp.Matrix:
liste_standardabweichungen_symbole = [] liste_standardabweichungen_symbole = []
liste_beobachtungen_symbolisch = [str(b) for b in liste_beobachtungen_symbolisch] liste_beobachtungen_symbolisch = [str(b) for b in liste_beobachtungen_symbolisch]
liste_beobachtungen_symbolisch = [b for b in liste_beobachtungen_symbolisch if not b.startswith("lA_")] liste_beobachtungen_symbolisch = [b for b in liste_beobachtungen_symbolisch if not b.startswith("lA_")]
@@ -191,7 +191,7 @@ class StochastischesModell:
Export.matrix_to_csv(r"Zwischenergebnisse\Qll_Symbolisch.csv", liste_beobachtungen_symbolisch, liste_beobachtungen_symbolisch, Qll, "Qll") Export.matrix_to_csv(r"Zwischenergebnisse\Qll_Symbolisch.csv", liste_beobachtungen_symbolisch, liste_beobachtungen_symbolisch, Qll, "Qll")
return Qll return Qll
def Qll_numerisch(self, pfad_datenbank, Qll_Matrix_Symbolisch, liste_beobachtungen_symbolisch): def Qll_numerisch(self, pfad_datenbank: str, Qll_Matrix_Symbolisch: sp.Matrix, liste_beobachtungen_symbolisch: list) -> np.Matrix:
liste_beobachtungen_symbolisch = [str(b).strip() for b in liste_beobachtungen_symbolisch] liste_beobachtungen_symbolisch = [str(b).strip() for b in liste_beobachtungen_symbolisch]
liste_beobachtungen_symbolisch = [b for b in liste_beobachtungen_symbolisch if not b.startswith("lA_")] liste_beobachtungen_symbolisch = [b for b in liste_beobachtungen_symbolisch if not b.startswith("lA_")]
@@ -308,8 +308,34 @@ class StochastischesModell:
raise ValueError( raise ValueError(
f"Qll_numerisch: Fehlende Substitutionen ({len(fehlend)}): {[str(s) for s in fehlend[:80]]}") f"Qll_numerisch: Fehlende Substitutionen ({len(fehlend)}): {[str(s) for s in fehlend[:80]]}")
liste_werte = [substitutionen[s] for s in self.liste_symbole_lambdify] #liste_werte = [substitutionen[s] for s in self.liste_symbole_lambdify]
Qll_numerisch = np.asarray(self.func_Qll_numerisch(*liste_werte), dtype=float) #Qll_numerisch = np.asarray(self.func_Qll_numerisch(*liste_werte), dtype=float)
rows = int(Qll_Matrix_Symbolisch.rows)
cols = int(Qll_Matrix_Symbolisch.cols)
Qll_numerisch = np.zeros((rows, cols), dtype=float)
for i in range(rows):
for j in range(cols):
eintrag = Qll_Matrix_Symbolisch[i, j]
if eintrag == 0:
continue
try:
if hasattr(eintrag, "is_zero") and (eintrag.is_zero is True):
continue
except Exception:
pass
eintrag_num = eintrag.xreplace(substitutionen)
if hasattr(eintrag_num, "free_symbols") and len(eintrag_num.free_symbols) > 0:
rest = sorted(list(eintrag_num.free_symbols), key=lambda s: str(s))
raise ValueError(
f"Qll_numerisch: Eintrag [{i},{j}] bleibt symbolisch. Rest-Symbole: {[str(s) for s in rest[:20]]}"
)
Qll_numerisch[i, j] = float(eintrag_num)
Export.matrix_to_csv( Export.matrix_to_csv(
r"Zwischenergebnisse\Qll_Numerisch.csv", r"Zwischenergebnisse\Qll_Numerisch.csv",
@@ -321,7 +347,7 @@ class StochastischesModell:
return Qll_numerisch return Qll_numerisch
def QAA_symbolisch(self, liste_beobachtungen_symbolisch): def QAA_symbolisch(self, liste_beobachtungen_symbolisch: list) -> np.Matrix:
liste_standardabweichungen_symbole = [] liste_standardabweichungen_symbole = []
liste_beobachtungen_symbolisch = [str(b) for b in liste_beobachtungen_symbolisch] liste_beobachtungen_symbolisch = [str(b) for b in liste_beobachtungen_symbolisch]
liste_beobachtungen_symbolisch = [b for b in liste_beobachtungen_symbolisch if b.startswith("lA_")] liste_beobachtungen_symbolisch = [b for b in liste_beobachtungen_symbolisch if b.startswith("lA_")]
@@ -341,7 +367,7 @@ class StochastischesModell:
Export.matrix_to_csv(r"Zwischenergebnisse\QAA_Symbolisch.csv", liste_beobachtungen_symbolisch, liste_beobachtungen_symbolisch, Qll, "Qll") Export.matrix_to_csv(r"Zwischenergebnisse\QAA_Symbolisch.csv", liste_beobachtungen_symbolisch, liste_beobachtungen_symbolisch, Qll, "Qll")
return Qll return Qll
def QAA_numerisch(self, pfad_datenbank, QAA_Matrix_Symbolisch, liste_beobachtungen_symbolisch): def QAA_numerisch(self, pfad_datenbank: str, QAA_Matrix_Symbolisch: sp.Matrix, liste_beobachtungen_symbolisch: list) -> np.Matrix:
liste_beobachtungen_symbolisch = [str(b).strip() for b in liste_beobachtungen_symbolisch] liste_beobachtungen_symbolisch = [str(b).strip() for b in liste_beobachtungen_symbolisch]
liste_beobachtungen_symbolisch = [b for b in liste_beobachtungen_symbolisch if b.startswith("lA_")] liste_beobachtungen_symbolisch = [b for b in liste_beobachtungen_symbolisch if b.startswith("lA_")]