Files
Masterprojekt_V3/Stochastisches_Modell.py
2026-01-07 13:51:17 +01:00

364 lines
17 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import sympy as sp
import numpy as np
from dataclasses import dataclass, field
from typing import Dict, Tuple, Iterable
from Export import Export
from Datenbank import Datenbankzugriff
@dataclass
class StochastischesModell:
n_beob: int
sigma_beob: Iterable[float] =None #σ a priori der einzelnen Beobachtung
gruppe_beob: Iterable[int] =None #Gruppenzugehörigkeit jeder Beobachtung (Distanz, Richtung, GNSS, Nivellement,...,)
sigma0_gruppe: Dict[int, float] = field(default_factory=dict) #σ0² für jede Gruppe
def __post_init__(self):
# Defaults setzen
if self.sigma_beob is None:
self.sigma_beob = [1.0] * int(self.n_beob)
if self.gruppe_beob is None:
self.gruppe_beob = [1] * int(self.n_beob)
# In SymPy-Spaltenvektoren umwandeln
#self.sigma_beob = sp.Matrix(list(self.sigma_beob))
#self.gruppe_beob = sp.Matrix(list(self.gruppe_beob))
# Dimension prüfen
#if self.sigma_beob.rows != self.gruppe_beob.rows:
# raise ValueError("sigma_beob und gruppe_beob müssen gleich viele Einträge haben.")
#if self.sigma_beob.rows != int(self.n_beob):
# raise ValueError("n_beob passt nicht zur Länge von sigma_beob / gruppe_beob.")
# Fehlende Gruppen mit sigma0_sq = 1.0 ergänzen
#unique_groups = sorted({int(g) for g in self.gruppe_beob})
#for g in unique_groups:
# if g not in self.sigma0_gruppe:
# self.sigma0_gruppe[g] = 1.0
# In NumPy-Spaltenvektoren umwandeln
self.sigma_beob = np.asarray(list(self.sigma_beob), dtype=float).reshape(-1, 1)
self.gruppe_beob = np.asarray(list(self.gruppe_beob), dtype=int).reshape(-1, 1)
# Dimension prüfen
if self.sigma_beob.shape[0] != self.gruppe_beob.shape[0]:
raise ValueError("sigma_beob und gruppe_beob müssen gleich viele Einträge haben.")
if self.sigma_beob.shape[0] != int(self.n_beob):
raise ValueError("n_beob passt nicht zur Länge von sigma_beob / gruppe_beob.")
# Fehlende Gruppen mit sigma0_sq = 1.0 ergänzen
unique_groups = sorted({int(g) for g in self.gruppe_beob.flatten()})
for g in unique_groups:
if g not in self.sigma0_gruppe:
self.sigma0_gruppe[g] = 1.0
self.func_Qll_numerisch = None
self.liste_symbole_lambdify = None
#def berechne_Qll(self) -> Tuple[sp.Matrix, sp.Matrix]:
# n = self.n_beob
# Q_ll = sp.zeros(n, n)
# P = sp.zeros(n, n)
# for i in range(self.n_beob):
# sigma_i = self.sigma_beob[i, 0] #σ-Wert der i-ten Beobachtung holen
# g = int(self.gruppe_beob[i, 0]) #Gruppenzugehörigkeit der Beobachtung bestimmen
# sigma0_sq = self.sigma0_gruppe[g] #Den Varianzfaktor der Gruppe holen
# q_ii = sigma_i**2 #σ² berechnen
# Q_ll[i, i] = q_ii #Diagonale
# return Q_ll
def Qll_symbolisch(self, pfad_datenbank, liste_beobachtungen_symbolisch):
liste_standardabweichungen_symbole = []
liste_beobachtungen_symbolisch = [str(b) for b in liste_beobachtungen_symbolisch]
liste_beobachtungen_symbolisch = [b for b in liste_beobachtungen_symbolisch if not b.startswith("lA_")]
Qll = sp.zeros(len(liste_beobachtungen_symbolisch), len(liste_beobachtungen_symbolisch))
db_zugriff = Datenbankzugriff(pfad_datenbank)
dict_beobachtungenID_instrumenteID = db_zugriff.get_instrumenteID_beobachtungenID_dict()
for i, beobachtung_symbolisch_i in enumerate(liste_beobachtungen_symbolisch):
aufgeteilt_i = beobachtung_symbolisch_i.split("_")
if aufgeteilt_i[1] == "SD" or aufgeteilt_i[1] == "R" or aufgeteilt_i[1] == "ZW":
beobachtungenID_i = int(aufgeteilt_i[0])
instrumenteID_i = dict_beobachtungenID_instrumenteID[beobachtungenID_i]
beobachtungsart_i = str(aufgeteilt_i[1])
if beobachtungsart_i == "SD":
stabw_apriori_konstant = sp.Symbol(f"stabw_apriori_konstant_{beobachtungsart_i}_{instrumenteID_i}")
stabw_apriori_streckenprop = sp.Symbol(f"stabw_apriori_streckenprop_{beobachtungsart_i}_{instrumenteID_i}")
tachymeter_distanz = sp.Symbol(f"SD_{beobachtungenID_i}")
sigma = sp.sqrt(stabw_apriori_konstant ** 2 + (stabw_apriori_streckenprop * tachymeter_distanz / 1000000) ** 2)
liste_standardabweichungen_symbole.append(sigma)
Qll[i, i] = sigma ** 2
elif beobachtungsart_i == "R" or beobachtungsart_i == "ZW":
stabw_apriori_konstant = sp.Symbol(f"stabw_apriori_konstant_{beobachtungsart_i}_{instrumenteID_i}")
stabw_apriori_konstant_distanz = sp.Symbol(f"stabw_apriori_konstant_SD_{instrumenteID_i}")
tachymeter_distanz = sp.Symbol(f"SD_{beobachtungenID_i}")
sigma = sp.sqrt(
stabw_apriori_konstant ** 2 + (stabw_apriori_konstant_distanz / tachymeter_distanz) ** 2)
liste_standardabweichungen_symbole.append(sigma)
Qll[i, i] = sigma ** 2
for j in range(i + 1, len(liste_beobachtungen_symbolisch)):
beobachtung_symbolisch_j = liste_beobachtungen_symbolisch[j]
aufgeteilt_j = beobachtung_symbolisch_j.split("_")
beobachtungsart_j = str(aufgeteilt_j[1])
if beobachtungsart_i == "SD" and beobachtungsart_j == "SD":
Qll[i, j] = 0
Qll[j, i] = 0
if aufgeteilt_i [1] == "gnssbx" or aufgeteilt_i[1] == "gnssby" or aufgeteilt_i[1] == "gnssbz":
beobachtungenID_i = int(aufgeteilt_i[0])
beobachtungsart_i = str(aufgeteilt_i[1])
if beobachtungsart_i == "gnssbx":
cxx = sp.symbols(f"cxx_{beobachtungenID_i}")
s0 = sp.symbols(f"s0_{beobachtungenID_i}**2")
liste_standardabweichungen_symbole.append(cxx)
Qll[i, i] = cxx * s0
cxy = sp.Symbol(f"cxy_{beobachtungenID_i}")
s0 = sp.symbols(f"s0_{beobachtungenID_i}**2")
for j in range(i + 1, len(liste_beobachtungen_symbolisch)):
beobachtung_symbolisch_j = liste_beobachtungen_symbolisch[j]
aufgeteilt_j = beobachtung_symbolisch_j.split("_")
if int(aufgeteilt_j[0]) == beobachtungenID_i and aufgeteilt_j[1] == "gnssby":
Qll[i, j] = cxy * s0
Qll[j, i] = cxy * s0
break
cxz = sp.Symbol(f"cxz_{beobachtungenID_i}")
s0 = sp.symbols(f"s0_{beobachtungenID_i}**2")
for j in range(i + 1, len(liste_beobachtungen_symbolisch)):
beobachtung_symbolisch_j = liste_beobachtungen_symbolisch[j]
aufgeteilt_j = beobachtung_symbolisch_j.split("_")
if int(aufgeteilt_j[0]) == beobachtungenID_i and aufgeteilt_j[1] == "gnssbz":
Qll[i, j] = cxz * s0
Qll[j, i] = cxz * s0
break
if beobachtungsart_i == "gnssby":
cyy = sp.symbols(f"cyy_{beobachtungenID_i}")
s0 = sp.symbols(f"s0_{beobachtungenID_i}**2")
liste_standardabweichungen_symbole.append(cyy)
Qll[i, i] = cyy * s0
cyz = sp.Symbol(f"cyz_{beobachtungenID_i}")
s0 = sp.symbols(f"s0_{beobachtungenID_i}**2")
for j in range(i + 1, len(liste_beobachtungen_symbolisch)):
beobachtung_symbolisch_j = liste_beobachtungen_symbolisch[j]
aufgeteilt_j = beobachtung_symbolisch_j.split("_")
if int(aufgeteilt_j[0]) == beobachtungenID_i and aufgeteilt_j[1] == "gnssbz":
Qll[i, j] = cyz * s0
Qll[j, i] = cyz * s0
break
if beobachtungsart_i == "gnssbz":
czz = sp.symbols(f"czz_{beobachtungenID_i}")
s0 = sp.symbols(f"s0_{beobachtungenID_i}**2")
liste_standardabweichungen_symbole.append(czz)
Qll[i, i] = czz * s0
Export.matrix_to_csv(r"Zwischenergebnisse\Qll_Symbolisch.csv", liste_beobachtungen_symbolisch, liste_beobachtungen_symbolisch, Qll, "Qll")
return Qll
def Qll_numerisch(self, pfad_datenbank, Qll_Matrix_Symbolisch, liste_beobachtungen_symbolisch):
liste_beobachtungen_symbolisch = [str(b).strip() for b in liste_beobachtungen_symbolisch]
liste_beobachtungen_symbolisch = [b for b in liste_beobachtungen_symbolisch if not b.startswith("lA_")]
db_zugriff = Datenbankzugriff(pfad_datenbank)
dict_genauigkeiten = db_zugriff.get_genauigkeiten_dict()
dict_beobachtungenID_instrumenteID = db_zugriff.get_instrumenteID_beobachtungenID_dict()
liste_beobachtungen_tachymeter = db_zugriff.get_beobachtungen_from_beobachtungenid()
liste_beobachtungen_gnss = db_zugriff.get_beobachtungen_gnssbasislinien()
dict_beobachtungenID_distanz = {}
for standpunkt, zielpunkt, beobachtungenID, beobachtungsgruppeID, tachymeter_richtung, tachymeter_zenitwinkel, tachymeter_distanz in liste_beobachtungen_tachymeter:
dict_beobachtungenID_distanz[int(beobachtungenID)] = tachymeter_distanz
dict_genauigkeiten_neu = {}
for genauigkeitenID, eintrag in dict_genauigkeiten.items():
instrumenteID = int(eintrag[0])
beobachtungsart = str(eintrag[1])
stabw_apriori_konstant = eintrag[2]
stabw_apriori_streckenprop = eintrag[3]
dict_genauigkeiten_neu[(instrumenteID, beobachtungsart)] = (stabw_apriori_konstant,
stabw_apriori_streckenprop)
substitutionen = {}
dict_konstante_sd = {}
for (instrumenteID, beobachtungsart), (stabw_apriori_konstant,
stabw_apriori_streckenprop) in dict_genauigkeiten_neu.items():
if beobachtungsart == "Tachymeter_Strecke":
if stabw_apriori_konstant is not None:
dict_konstante_sd[instrumenteID] = float(stabw_apriori_konstant)
for (instrumenteID, beobachtungsart), (stabw_apriori_konstant,
stabw_apriori_streckenprop) in dict_genauigkeiten_neu.items():
if beobachtungsart == "Tachymeter_Strecke":
beobachtungsart_kurz = "SD"
elif beobachtungsart == "Tachymeter_Richtung":
beobachtungsart_kurz = "R"
elif beobachtungsart == "Tachymeter_Zenitwinkel":
beobachtungsart_kurz = "ZW"
if stabw_apriori_konstant is not None:
substitutionen[sp.Symbol(f"stabw_apriori_konstant_{beobachtungsart_kurz}_{instrumenteID}")] = float(stabw_apriori_konstant)
if stabw_apriori_streckenprop is not None:
substitutionen[sp.Symbol(f"stabw_apriori_streckenprop_{beobachtungsart_kurz}_{instrumenteID}")] = float(stabw_apriori_streckenprop)
for instrumenteID, wert in dict_konstante_sd.items():
substitutionen[sp.Symbol(f"stabw_apriori_konstant_SD_{instrumenteID}")] = float(wert)
liste_beobachtungen_symbolisch = [str(b) for b in liste_beobachtungen_symbolisch]
for beobachtung_symbolisch in liste_beobachtungen_symbolisch:
aufgeteilt = beobachtung_symbolisch.split("_")
beobachtungenID = int(aufgeteilt[0])
distanz = dict_beobachtungenID_distanz.get(beobachtungenID, None)
if distanz is not None:
substitutionen[sp.Symbol(f"SD_{beobachtungenID}")] = float(distanz)
#GNSS Basislinien
for gnss_beobachtungen in liste_beobachtungen_gnss:
beobachtungenID = gnss_beobachtungen[0]
gnss_s0 = gnss_beobachtungen[6]
gnss_cxx = gnss_beobachtungen[7]
gnss_cxy = gnss_beobachtungen[8]
gnss_cxz = gnss_beobachtungen[9]
gnss_cyy = gnss_beobachtungen[10]
gnss_cyz = gnss_beobachtungen[11]
gnss_czz = gnss_beobachtungen[12]
substitutionen[sp.Symbol(f"cxx_{beobachtungenID}")] = float(gnss_cxx)
substitutionen[sp.Symbol(f"cxy_{beobachtungenID}")] = float(gnss_cxy)
substitutionen[sp.Symbol(f"cxz_{beobachtungenID}")] = float(gnss_cxz)
substitutionen[sp.Symbol(f"cyy_{beobachtungenID}")] = float(gnss_cyy)
substitutionen[sp.Symbol(f"cyz_{beobachtungenID}")] = float(gnss_cyz)
substitutionen[sp.Symbol(f"czz_{beobachtungenID}")] = float(gnss_czz)
substitutionen[sp.Symbol(f"s0_{beobachtungenID}")] = float(gnss_s0)
#Qll_numerisch = Qll_Matrix_Symbolisch.xreplace(substitutionen)
if (self.func_Qll_numerisch is None) or (set(self.liste_symbole_lambdify) != set(substitutionen.keys())):
self.liste_symbole_lambdify = sorted(substitutionen.keys(), key=lambda s: str(s))
self.func_Qll_numerisch = sp.lambdify(
self.liste_symbole_lambdify,
Qll_Matrix_Symbolisch,
modules="numpy",
cse=True
)
liste_werte = [substitutionen[s] for s in self.liste_symbole_lambdify]
Qll_numerisch = np.asarray(self.func_Qll_numerisch(*liste_werte), dtype=float)
Export.matrix_to_csv(
r"Zwischenergebnisse\Qll_Numerisch.csv",
liste_beobachtungen_symbolisch,
liste_beobachtungen_symbolisch,
Qll_numerisch,
"Qll"
)
return Qll_numerisch
def QAA_symbolisch(self, liste_beobachtungen_symbolisch):
liste_standardabweichungen_symbole = []
liste_beobachtungen_symbolisch = [str(b) for b in liste_beobachtungen_symbolisch]
liste_beobachtungen_symbolisch = [b for b in liste_beobachtungen_symbolisch if b.startswith("lA_")]
Qll = sp.zeros(len(liste_beobachtungen_symbolisch), len(liste_beobachtungen_symbolisch))
for i, beobachtung_symbolisch_i in enumerate(liste_beobachtungen_symbolisch):
aufgeteilt_i = beobachtung_symbolisch_i.split("_")
datumskoordinate = str(aufgeteilt_i[1])
beobachtungsart_i = str(aufgeteilt_i[0])
if beobachtungsart_i == "lA":
sigma = sp.Symbol(f"StabwAA_{datumskoordinate}")
liste_standardabweichungen_symbole.append(sigma)
Qll[i, i] = sigma ** 2
Export.matrix_to_csv(r"Zwischenergebnisse\QAA_Symbolisch.csv", liste_beobachtungen_symbolisch, liste_beobachtungen_symbolisch, Qll, "Qll")
return Qll
def QAA_numerisch(self, pfad_datenbank, QAA_Matrix_Symbolisch, liste_beobachtungen_symbolisch):
liste_beobachtungen_symbolisch = [str(b).strip() for b in liste_beobachtungen_symbolisch]
liste_beobachtungen_symbolisch = [b for b in liste_beobachtungen_symbolisch if b.startswith("lA_")]
db_zugriff = Datenbankzugriff(pfad_datenbank)
dict_stabwAA_vorinfo = db_zugriff.get_stabw_AA_Netzpunkte()
substitutionen = {}
for koordinate, stabwAA in dict_stabwAA_vorinfo.items():
substitutionen[sp.Symbol(str(koordinate).strip())] = float(stabwAA)
if not hasattr(self, "func_QAA_numerisch"):
self.func_QAA_numerisch = None
if not hasattr(self, "liste_symbole_lambdify_QAA"):
self.liste_symbole_lambdify_QAA = []
if (self.func_QAA_numerisch is None) or (set(self.liste_symbole_lambdify_QAA) != set(substitutionen.keys())):
self.liste_symbole_lambdify_QAA = sorted(substitutionen.keys(), key=lambda s: str(s))
self.func_QAA_numerisch = sp.lambdify(
self.liste_symbole_lambdify_QAA,
QAA_Matrix_Symbolisch,
modules="numpy",
cse=True
)
liste_werte = [substitutionen[s] for s in self.liste_symbole_lambdify_QAA]
QAA_numerisch = np.asarray(self.func_QAA_numerisch(*liste_werte), dtype=float)
Export.matrix_to_csv(
r"Zwischenergebnisse\QAA_Numerisch.csv",
liste_beobachtungen_symbolisch,
liste_beobachtungen_symbolisch,
QAA_numerisch,
"QAA"
)
return QAA_numerisch
@staticmethod
def berechne_P(Q_ll):
return np.linalg.inv(Q_ll)
@staticmethod
def berechne_Q_xx(N):
if N.shape[0] != N.shape[1]:
raise ValueError("N muss eine quadratische Matrix sein")
return np.linalg.inv(N)
def berechne_Qvv(self, A, P, Q_xx):
Q_vv = np.linalg.inv(P) - A @ Q_xx @ A.T
return Q_vv
def berechne_R(self, Q_vv, P):
return Q_vv @ P #Redundanzmatrix
def berechne_r(self, R):
return np.diag(R).reshape(-1, 1) #Redundanzanteile