547 lines
28 KiB
Python
547 lines
28 KiB
Python
from dataclasses import dataclass, field
|
||
import numpy as np
|
||
import sympy as sp
|
||
from typing import Dict, Iterable
|
||
|
||
from Datenbank import Datenbankzugriff
|
||
from Export import Export
|
||
|
||
@dataclass
|
||
class StochastischesModell:
|
||
"""Stochastisches Modell zur Aufstellung und Auswertung von Varianz-Kovarianz-Matrizen.
|
||
|
||
Die Klasse stellt Methoden zur Verfügung für:
|
||
|
||
- symbolischen Aufbau der Varianz-Kovarianz-Matrix der Beobachtungen Qll,
|
||
- numerische Substitution von Qll aus der Datenbank,
|
||
- symbolischen und numerischen Aufbau der Zusatz-Varianz-Kovarianz-Matrix QAA für Anschlusspunkte (weiche Lagerung, lA_*),
|
||
- Ableitung zentraler Matrizen der Ausgleichung (P, Qxx, Qll_dach, Qvv) als Hilfsfunktionen.
|
||
|
||
Die Grundidee ist, die stochastischen Zusammenhänge zunächst symbolisch abzubilden (SymPy) und anschließend
|
||
mit Datenbankwerten zu substituieren.
|
||
"""
|
||
|
||
n_beob: int
|
||
sigma_beob: Iterable[float] =None #σ a priori der einzelnen Beobachtung
|
||
gruppe_beob: Iterable[int] =None #Gruppenzugehörigkeit jeder Beobachtung (Distanz, Richtung, GNSS, Nivellement,...,)
|
||
sigma0_gruppe: Dict[int, float] = field(default_factory=dict) #σ0² für jede Gruppe
|
||
|
||
|
||
def __init__(self, pfad_datenbank: str) -> None:
|
||
"""Initialisiert das stochastische Modell.
|
||
|
||
Speichert den Pfad zur SQLite-Datenbank, initialisiert den Datenbankzugriff und legt Cache-Variablen an,
|
||
die in den numerischen Auswertungen zur Rechenzeitersparnis wiederverwendet werden.
|
||
|
||
:param pfad_datenbank: Pfad zur SQLite-Datenbank.
|
||
:type pfad_datenbank: str
|
||
:return: None
|
||
:rtype: None
|
||
"""
|
||
self.pfad_datenbank = pfad_datenbank
|
||
self.func_Qll_numerisch = None
|
||
self.liste_symbole_lambdify = None
|
||
self.db_zugriff = Datenbankzugriff(self.pfad_datenbank)
|
||
|
||
def Qll_symbolisch(self, liste_beobachtungen_symbolisch: list) -> sp.Matrix:
|
||
"""Erstellt die symbolische Kofaktormatrix der Beobachtungen.
|
||
|
||
Aus den symbolischen Beobachtungskennungen wird die Beobachtungsart abgeleitet (Tachymeter: SD/R/ZW,
|
||
GNSS: gnssbx/gnssby/gnssbz, Geometrisches Nivellement: niv). Für jede Beobachtung wird eine symbolische Varianzgleichung
|
||
aufgestellt und in Qll eingetragen.
|
||
|
||
Berücksichtigte Gleichungen:
|
||
|
||
- Tachymeter SD: σ = sqrt(σ_konstant² + (σ_streckenprop * SD / 1 000 000)²), Varianz = varkomp * σ²
|
||
- Tachymeter R/ZW: σ = sqrt(σ_konstant² + (σ_konstant_SD / SD)²), Varianz = varkomp * σ²
|
||
- GNSS: Diagonale und Korrelationen je Basislinie aus cxx/cyy/czz und cxy/cxz/cyz, jeweils skaliert mit s0²
|
||
- Nivellement: σ = sqrt(n_wechselpunkte * σ_konstant² + σ_streckenprop² * distanz / 1000), Varianz = varkomp * σ²
|
||
|
||
Die symbolische Matrix wird als CSV-Datei in Zwischenergebnisse\\Qll_Symbolisch.csv exportiert.
|
||
|
||
:param liste_beobachtungen_symbolisch: Liste der symbolischen Beobachtungskennungen.
|
||
:type liste_beobachtungen_symbolisch: list
|
||
:return: Symbolische Kofaktormatrix Qll.
|
||
:rtype: sp.Matrix
|
||
"""
|
||
liste_standardabweichungen_symbole = []
|
||
|
||
# Vorbereitung und Abfrage der notwendigen Listen und Dictionaries
|
||
liste_beobachtungen_symbolisch = [str(b) for b in liste_beobachtungen_symbolisch]
|
||
liste_beobachtungen_symbolisch = [b for b in liste_beobachtungen_symbolisch if not b.startswith("lA_")]
|
||
Qll = sp.zeros(len(liste_beobachtungen_symbolisch), len(liste_beobachtungen_symbolisch))
|
||
dict_beobachtungenID_instrumenteID = self.db_zugriff.get_instrumenteID_beobachtungenID_dict()
|
||
|
||
# Aufstellen der Symbolischen Gleichungen für die Einträge in der Qll-Matrix jeder Beobachtungsgruppe
|
||
for i, beobachtung_symbolisch_i in enumerate(liste_beobachtungen_symbolisch):
|
||
aufgeteilt_i = beobachtung_symbolisch_i.split("_")
|
||
beobachtungenID_i = int(aufgeteilt_i[0])
|
||
instrumenteID_i = dict_beobachtungenID_instrumenteID[beobachtungenID_i]
|
||
|
||
# SD = Schrägdistanzen | R = Richtung | ZW = Zenitwinkel
|
||
# varkomp = Varianzkomponentenschätzung (Ist noch keine Erfolgt, ist dieser Faktor = 1
|
||
if aufgeteilt_i[1] == "SD" or aufgeteilt_i[1] == "R" or aufgeteilt_i[1] == "ZW":
|
||
|
||
beobachtungsart_i = str(aufgeteilt_i[1])
|
||
|
||
if beobachtungsart_i == "SD":
|
||
stabw_apriori_konstant = sp.Symbol(f"stabw_apriori_konstant_{beobachtungsart_i}_{instrumenteID_i}")
|
||
stabw_apriori_streckenprop = sp.Symbol(f"stabw_apriori_streckenprop_{beobachtungsart_i}_{instrumenteID_i}")
|
||
tachymeter_distanz = sp.Symbol(f"SD_{beobachtungenID_i}")
|
||
|
||
sigma = sp.sqrt(stabw_apriori_konstant ** 2 + (stabw_apriori_streckenprop * tachymeter_distanz / 1000000) ** 2)
|
||
liste_standardabweichungen_symbole.append(sigma)
|
||
|
||
varianzkompontenschaetzung = sp.Symbol(f"varkomp_{instrumenteID_i}_Tachymeter_Streckenbeobachtungen")
|
||
|
||
Qll[i, i] = (varianzkompontenschaetzung) * (sigma ** 2)
|
||
|
||
elif beobachtungsart_i == "R" or beobachtungsart_i == "ZW":
|
||
stabw_apriori_konstant = sp.Symbol(f"stabw_apriori_konstant_{beobachtungsart_i}_{instrumenteID_i}")
|
||
|
||
stabw_apriori_konstant_distanz = sp.Symbol(f"stabw_apriori_konstant_SD_{instrumenteID_i}")
|
||
tachymeter_distanz = sp.Symbol(f"SD_{beobachtungenID_i}")
|
||
|
||
sigma = sp.sqrt(
|
||
stabw_apriori_konstant ** 2 + (stabw_apriori_konstant_distanz / tachymeter_distanz) ** 2)
|
||
liste_standardabweichungen_symbole.append(sigma)
|
||
if beobachtungsart_i == "R":
|
||
varianzkompontenschaetzung = sp.Symbol(
|
||
f"varkomp_{instrumenteID_i}_Tachymeter_Richtungsbeobachtungen")
|
||
if beobachtungsart_i == "ZW":
|
||
varianzkompontenschaetzung = sp.Symbol(
|
||
f"varkomp_{instrumenteID_i}_Tachymeter_Zenitwinkelbeobachtungen")
|
||
|
||
Qll[i, i] = (varianzkompontenschaetzung) * sigma ** 2
|
||
|
||
# Setzen der 0-Einträge in der Qll-Matrix
|
||
for j in range(i + 1, len(liste_beobachtungen_symbolisch)):
|
||
beobachtung_symbolisch_j = liste_beobachtungen_symbolisch[j]
|
||
aufgeteilt_j = beobachtung_symbolisch_j.split("_")
|
||
beobachtungsart_j = str(aufgeteilt_j[1])
|
||
|
||
if beobachtungsart_i == "SD" and beobachtungsart_j == "SD":
|
||
Qll[i, j] = 0
|
||
Qll[j, i] = 0
|
||
|
||
# GNSS
|
||
# s0, sowie die kovarianzen cxx, ... entstammen direkt LeicaGeooffice
|
||
if aufgeteilt_i [1] == "gnssbx" or aufgeteilt_i[1] == "gnssby" or aufgeteilt_i[1] == "gnssbz":
|
||
beobachtungsart_i = str(aufgeteilt_i[1])
|
||
varianzkompontenschaetzung = sp.Symbol(
|
||
f"varkomp_{instrumenteID_i}_GNSS-Rover_Basislinienbeobachtungen")
|
||
|
||
if beobachtungsart_i == "gnssbx":
|
||
cxx = sp.symbols(f"cxx_{beobachtungenID_i}")
|
||
s0 = sp.symbols(f"s0_{beobachtungenID_i}")
|
||
liste_standardabweichungen_symbole.append(cxx)
|
||
Qll[i, i] = (varianzkompontenschaetzung) * (cxx * (s0 ** 2))
|
||
|
||
cxy = sp.Symbol(f"cxy_{beobachtungenID_i}")
|
||
s0 = sp.symbols(f"s0_{beobachtungenID_i}")
|
||
for j in range(i + 1, len(liste_beobachtungen_symbolisch)):
|
||
beobachtung_symbolisch_j = liste_beobachtungen_symbolisch[j]
|
||
aufgeteilt_j = beobachtung_symbolisch_j.split("_")
|
||
|
||
if int(aufgeteilt_j[0]) == beobachtungenID_i and aufgeteilt_j[1] == "gnssby":
|
||
Qll[i, j] = (varianzkompontenschaetzung) * (cxy * (s0 ** 2))
|
||
Qll[j, i] = (varianzkompontenschaetzung) * (cxy * (s0 ** 2))
|
||
break
|
||
|
||
cxz = sp.Symbol(f"cxz_{beobachtungenID_i}")
|
||
s0 = sp.symbols(f"s0_{beobachtungenID_i}")
|
||
for j in range(i + 1, len(liste_beobachtungen_symbolisch)):
|
||
beobachtung_symbolisch_j = liste_beobachtungen_symbolisch[j]
|
||
aufgeteilt_j = beobachtung_symbolisch_j.split("_")
|
||
|
||
if int(aufgeteilt_j[0]) == beobachtungenID_i and aufgeteilt_j[1] == "gnssbz":
|
||
Qll[i, j] = (varianzkompontenschaetzung) * (cxz * (s0 ** 2))
|
||
Qll[j, i] = (varianzkompontenschaetzung) * (cxz * (s0 ** 2))
|
||
break
|
||
|
||
if beobachtungsart_i == "gnssby":
|
||
cyy = sp.symbols(f"cyy_{beobachtungenID_i}")
|
||
s0 = sp.symbols(f"s0_{beobachtungenID_i}")
|
||
liste_standardabweichungen_symbole.append(cyy)
|
||
Qll[i, i] = (varianzkompontenschaetzung) * (cyy * (s0 ** 2))
|
||
|
||
cyz = sp.Symbol(f"cyz_{beobachtungenID_i}")
|
||
s0 = sp.symbols(f"s0_{beobachtungenID_i}")
|
||
for j in range(i + 1, len(liste_beobachtungen_symbolisch)):
|
||
beobachtung_symbolisch_j = liste_beobachtungen_symbolisch[j]
|
||
aufgeteilt_j = beobachtung_symbolisch_j.split("_")
|
||
|
||
if int(aufgeteilt_j[0]) == beobachtungenID_i and aufgeteilt_j[1] == "gnssbz":
|
||
Qll[i, j] = (varianzkompontenschaetzung) * (cyz * (s0 ** 2))
|
||
Qll[j, i] = (varianzkompontenschaetzung) * (cyz * (s0 ** 2))
|
||
break
|
||
|
||
if beobachtungsart_i == "gnssbz":
|
||
czz = sp.symbols(f"czz_{beobachtungenID_i}")
|
||
s0 = sp.symbols(f"s0_{beobachtungenID_i}")
|
||
liste_standardabweichungen_symbole.append(czz)
|
||
Qll[i, i] = (varianzkompontenschaetzung) * (czz * (s0 ** 2))
|
||
|
||
# Geometrisches Nivellement
|
||
if aufgeteilt_i[1] == "niv":
|
||
beobachtungsart_i = str(aufgeteilt_i[1])
|
||
|
||
varianzkompontenschaetzung = sp.Symbol(
|
||
f"varkomp_{instrumenteID_i}_Nivellier_Hoehendifferenzbeobachtungen")
|
||
|
||
stabw_apriori_konstant = sp.Symbol(f"stabw_apriori_konstant_{beobachtungsart_i}_{instrumenteID_i}")
|
||
stabw_apriori_streckenprop = sp.Symbol(f"stabw_apriori_streckenprop_{beobachtungsart_i}_{instrumenteID_i}")
|
||
nivellement_distanz = sp.Symbol(f"niv_distanz_{beobachtungenID_i}")
|
||
nivellement_anz_wechselpunkte = sp.Symbol(f"niv_anz_wechselpunkte_{beobachtungenID_i}")
|
||
|
||
# Berechnen der Standardabweichung unter Einbeziehung der Anzahl Wechselpunkte
|
||
sigma = sp.sqrt(nivellement_anz_wechselpunkte * stabw_apriori_konstant ** 2 + stabw_apriori_streckenprop ** 2 * nivellement_distanz / 1000)
|
||
liste_standardabweichungen_symbole.append(sigma)
|
||
|
||
Qll[i, i] = (varianzkompontenschaetzung) * (sigma ** 2)
|
||
|
||
Export.matrix_to_csv(r"Zwischenergebnisse\Qll_Symbolisch.csv", liste_beobachtungen_symbolisch, liste_beobachtungen_symbolisch, Qll, "Qll")
|
||
return Qll
|
||
|
||
def Qll_numerisch(self, Qll_Matrix_Symbolisch: sp.Matrix, liste_beobachtungen_symbolisch: list) -> np.Matrix:
|
||
"""Erstellt eine numerische Kofaktormatrix der Beobachtungen aus einer symbolischen Qll-Matrix.
|
||
|
||
Es werden die zur Substitution benötigten Werte aus der Datenbank abgefragt und den in Qll vorkommenden Symbolen zugeordnet,
|
||
u. a.:
|
||
|
||
- Genauigkeiten (stabw_apriori_konstant / stabw_apriori_streckenprop) je Instrument und Beobachtungsart,
|
||
- Varianzkomponenten (varkomp_*) je Instrument und Beobachtungsgruppe,
|
||
- gemessene Tachymeter-Distanzen SD_* zur Bildung der Winkel-/Zenitwinkel-Ansätze,
|
||
- GNSS-Kovarianzen cxx, cxy, cxz, cyy, cyz, czz sowie s0 je Basislinie,
|
||
- Geomtrisches Nivellement-Strecken und Anzahl Wechsel-/Standpunkte.
|
||
|
||
Die Methode prüft, ob alle freien Symbole der Qll-Matrix substituierbar sind.
|
||
Zur Rechenzeitersparnis wird ein Lambdify-Cache geführt. Die Matrix wird anschließend gezielt
|
||
über Nicht-Null-Einträge befüllt, um unnötige Auswertung von Nullzellen zu vermeiden.
|
||
|
||
Die numerische Matrix wird als CSV-Datei in Zwischenergebnisse\\Qll_Numerisch.csv exportiert.
|
||
|
||
:param Qll_Matrix_Symbolisch: Symbolische Kofaktormatrix der Beobachtungen Qll.
|
||
:type Qll_Matrix_Symbolisch: sp.Matrix
|
||
:param liste_beobachtungen_symbolisch: Liste der symbolischen Beobachtungskennungen.
|
||
:type liste_beobachtungen_symbolisch: list
|
||
:return: Numerische Kofaktormatrix Qll als Numpy-Array.
|
||
:rtype: np.Matrix
|
||
:raises ValueError: Falls Symbole in Qll_Matrix_Symbolisch enthalten sind, für die keine Substitutionen vorhanden sind.
|
||
"""
|
||
liste_beobachtungen_symbolisch = [str(b).strip() for b in liste_beobachtungen_symbolisch]
|
||
liste_beobachtungen_symbolisch = [b for b in liste_beobachtungen_symbolisch if not b.startswith("lA_")]
|
||
|
||
# Abfragen der Zahlen zu den Symbolen aus der Datenbank
|
||
dict_genauigkeiten = self.db_zugriff.get_genauigkeiten_dict()
|
||
|
||
liste_beobachtungen_tachymeter = self.db_zugriff.get_beobachtungen_from_beobachtungenid()
|
||
liste_beobachtungen_gnss = self.db_zugriff.get_beobachtungen_gnssbasislinien()
|
||
liste_beobachtungen_nivellement = self.db_zugriff.get_beobachtungen_nivellement()
|
||
liste_varianzkomponenten = self.db_zugriff.get_varianzkomponentenschaetzung()
|
||
|
||
# Erstellen eines Dicts mit der zuordnung beobachtungenID : Distanz
|
||
dict_beobachtungenID_distanz = {}
|
||
for standpunkt, zielpunkt, beobachtungenID, beobachtungsgruppeID, tachymeter_richtung, tachymeter_zenitwinkel, tachymeter_distanz in liste_beobachtungen_tachymeter:
|
||
dict_beobachtungenID_distanz[int(beobachtungenID)] = tachymeter_distanz
|
||
|
||
# Erstellen eines Dicts mit den Genauigkeiten aus der Tabelle Genauigkeiten
|
||
dict_genauigkeiten_neu = {}
|
||
for genauigkeitenID, eintrag in dict_genauigkeiten.items():
|
||
instrumenteID = int(eintrag[0])
|
||
beobachtungsart = str(eintrag[1])
|
||
stabw_apriori_konstant = eintrag[2]
|
||
stabw_apriori_streckenprop = eintrag[3]
|
||
dict_genauigkeiten_neu[(instrumenteID, beobachtungsart)] = (stabw_apriori_konstant,
|
||
stabw_apriori_streckenprop)
|
||
|
||
substitutionen = {}
|
||
|
||
# Erstellen eines Dicts mit den konstanten Anteilen der Standardabweichungen aus der Datenbank
|
||
dict_konstante_sd = {}
|
||
for (instrumenteID, beobachtungsart), (stabw_apriori_konstant,
|
||
stabw_apriori_streckenprop) in dict_genauigkeiten_neu.items():
|
||
if beobachtungsart == "Tachymeter_Strecke":
|
||
if stabw_apriori_konstant is not None:
|
||
dict_konstante_sd[instrumenteID] = float(stabw_apriori_konstant)
|
||
|
||
# Zuordnen des numerischen Varianzfaktors zum entsprechenden Symbol für die Substitution
|
||
for (varianzkomponenteID, instrumenteID, beobachtungsgruppe, varianz_varianzkomponentenschaetzung) in liste_varianzkomponenten:
|
||
substitutionen[sp.Symbol(f"varkomp_{instrumenteID}_{beobachtungsgruppe.strip()}")] = float(varianz_varianzkomponentenschaetzung)
|
||
|
||
# Zuordnen der numerischen Genauigkeitsangaben aus der Datenbank zum entsprechenden Symbol für die Substitution
|
||
for (instrumenteID, beobachtungsart), (stabw_apriori_konstant,
|
||
stabw_apriori_streckenprop) in dict_genauigkeiten_neu.items():
|
||
|
||
if beobachtungsart == "Tachymeter_Strecke":
|
||
beobachtungsart_kurz = "SD"
|
||
elif beobachtungsart == "Tachymeter_Richtung":
|
||
beobachtungsart_kurz = "R"
|
||
elif beobachtungsart == "Tachymeter_Zenitwinkel":
|
||
beobachtungsart_kurz = "ZW"
|
||
elif beobachtungsart == "Geometrisches_Nivellement":
|
||
beobachtungsart_kurz = "niv"
|
||
|
||
if stabw_apriori_konstant is not None:
|
||
substitutionen[sp.Symbol(f"stabw_apriori_konstant_{beobachtungsart_kurz}_{instrumenteID}")] = float(stabw_apriori_konstant)
|
||
if stabw_apriori_streckenprop is not None:
|
||
wert = float(stabw_apriori_streckenprop)
|
||
if beobachtungsart_kurz == "niv":
|
||
wert = wert / 1000.0
|
||
substitutionen[sp.Symbol(f"stabw_apriori_streckenprop_{beobachtungsart_kurz}_{instrumenteID}")] = wert
|
||
|
||
for instrumenteID, wert in dict_konstante_sd.items():
|
||
substitutionen[sp.Symbol(f"stabw_apriori_konstant_SD_{instrumenteID}")] = float(wert)
|
||
|
||
liste_beobachtungen_symbolisch = [str(b) for b in liste_beobachtungen_symbolisch]
|
||
|
||
# Zuordnen der numerischen gemessenen Distanz aus der Datenbank zum entsprechenden Symbol für die Substitution
|
||
for beobachtung_symbolisch in liste_beobachtungen_symbolisch:
|
||
aufgeteilt = beobachtung_symbolisch.split("_")
|
||
beobachtungenID = int(aufgeteilt[0])
|
||
|
||
distanz = dict_beobachtungenID_distanz.get(beobachtungenID, None)
|
||
if distanz is not None:
|
||
substitutionen[sp.Symbol(f"SD_{beobachtungenID}")] = float(distanz)
|
||
|
||
# Zuordnen der numerischen GNSS-Bestandteile aus der Tabelle Beobachtungen zum entsprechenden Symbol für die Substitution
|
||
for gnss_beobachtungen in liste_beobachtungen_gnss:
|
||
beobachtungenID = gnss_beobachtungen[0]
|
||
gnss_s0 = gnss_beobachtungen[6]
|
||
gnss_cxx = gnss_beobachtungen[7]
|
||
gnss_cxy = gnss_beobachtungen[8]
|
||
gnss_cxz = gnss_beobachtungen[9]
|
||
gnss_cyy = gnss_beobachtungen[10]
|
||
gnss_cyz = gnss_beobachtungen[11]
|
||
gnss_czz = gnss_beobachtungen[12]
|
||
|
||
substitutionen[sp.Symbol(f"cxx_{beobachtungenID}")] = float(gnss_cxx)
|
||
substitutionen[sp.Symbol(f"cxy_{beobachtungenID}")] = float(gnss_cxy)
|
||
substitutionen[sp.Symbol(f"cxz_{beobachtungenID}")] = float(gnss_cxz)
|
||
substitutionen[sp.Symbol(f"cyy_{beobachtungenID}")] = float(gnss_cyy)
|
||
substitutionen[sp.Symbol(f"cyz_{beobachtungenID}")] = float(gnss_cyz)
|
||
substitutionen[sp.Symbol(f"czz_{beobachtungenID}")] = float(gnss_czz)
|
||
substitutionen[sp.Symbol(f"s0_{beobachtungenID}")] = float(gnss_s0)
|
||
|
||
# Geometrisches Nivellement
|
||
for nivellement in liste_beobachtungen_nivellement:
|
||
beobachtungenID = nivellement[0]
|
||
niv_strecke = nivellement[4]
|
||
niv_anz_standpkte = nivellement[5]
|
||
|
||
substitutionen[sp.Symbol(f"niv_anz_wechselpunkte_{beobachtungenID}")] = float(niv_anz_standpkte)
|
||
substitutionen[sp.Symbol(f"niv_distanz_{beobachtungenID}")] = float(niv_strecke)
|
||
|
||
# Substituieren mit der Methode lambdify
|
||
if (self.func_Qll_numerisch is None) or (set(self.liste_symbole_lambdify) != set(substitutionen.keys())):
|
||
self.liste_symbole_lambdify = sorted(substitutionen.keys(), key=lambda s: str(s))
|
||
self.func_Qll_numerisch = sp.lambdify(
|
||
self.liste_symbole_lambdify,
|
||
Qll_Matrix_Symbolisch,
|
||
modules="numpy",
|
||
cse=True
|
||
)
|
||
|
||
# Ausgeben der fehlenden Substitutionen
|
||
fehlend = sorted(list(Qll_Matrix_Symbolisch.free_symbols - set(substitutionen.keys())), key=lambda s: str(s))
|
||
if fehlend:
|
||
raise ValueError(
|
||
f"Qll_numerisch: Fehlende Substitutionen ({len(fehlend)}): {[str(s) for s in fehlend[:80]]}")
|
||
|
||
# Eingrenzen der Substitution auf nicht 0 Zellen
|
||
rows = int(Qll_Matrix_Symbolisch.rows)
|
||
cols = int(Qll_Matrix_Symbolisch.cols)
|
||
|
||
Qll_numerisch = np.zeros((rows, cols), dtype=float)
|
||
|
||
for i in range(rows):
|
||
for j in range(cols):
|
||
eintrag = Qll_Matrix_Symbolisch[i, j]
|
||
if eintrag == 0:
|
||
continue
|
||
try:
|
||
if hasattr(eintrag, "is_zero") and (eintrag.is_zero is True):
|
||
continue
|
||
except Exception:
|
||
pass
|
||
|
||
eintrag_num = eintrag.xreplace(substitutionen)
|
||
|
||
Qll_numerisch[i, j] = float(eintrag_num)
|
||
|
||
Export.matrix_to_csv(
|
||
r"Zwischenergebnisse\Qll_Numerisch.csv",
|
||
liste_beobachtungen_symbolisch,
|
||
liste_beobachtungen_symbolisch,
|
||
Qll_numerisch,
|
||
"Qll"
|
||
)
|
||
|
||
return Qll_numerisch
|
||
|
||
def QAA_symbolisch(self, liste_beobachtungen_symbolisch: list) -> np.Matrix:
|
||
"""Erstellt die symbolische Kofaktormatrix der Anschlusspunkte (weiche Lagerung).
|
||
|
||
Es werden ausschließlich Beobachtungen berücksichtigt, deren Kennung mit "lA_" beginnt. Für jede Anschlussbedingung
|
||
wird eine (symbolische) Standardabweichung StabwAA_* angesetzt und mit der Varianzkomponente der Beobachtungsgruppe
|
||
"Anschlusspunkte" multipliziert.
|
||
|
||
Die symbolische Matrix wird als CSV-Datei in Zwischenergebnisse\\QAA_Symbolisch.csv exportiert.
|
||
|
||
:param liste_beobachtungen_symbolisch: Liste der symbolischen Beobachtungskennungen.
|
||
:type liste_beobachtungen_symbolisch: list
|
||
:return: Symbolische Kofaktormatrix QAA.
|
||
:rtype: sp.Matrix
|
||
"""
|
||
liste_standardabweichungen_symbole = []
|
||
liste_beobachtungen_symbolisch = [str(b) for b in liste_beobachtungen_symbolisch]
|
||
liste_beobachtungen_symbolisch = [b for b in liste_beobachtungen_symbolisch if b.startswith("lA_")]
|
||
Qll = sp.zeros(len(liste_beobachtungen_symbolisch), len(liste_beobachtungen_symbolisch))
|
||
|
||
instrumente_id_anschlusspunkte = self.db_zugriff.get_instrument_liste("Anschlusspunkte")[0][0]
|
||
|
||
# Erstellen der Symbolischen Gleichungen für die QAA-Matrix
|
||
for i, beobachtung_symbolisch_i in enumerate(liste_beobachtungen_symbolisch):
|
||
aufgeteilt_i = beobachtung_symbolisch_i.split("_")
|
||
datumskoordinate = str(aufgeteilt_i[1])
|
||
beobachtungsart_i = str(aufgeteilt_i[0])
|
||
|
||
if beobachtungsart_i == "lA":
|
||
sigma = sp.Symbol(f"StabwAA_{datumskoordinate}")
|
||
liste_standardabweichungen_symbole.append(sigma)
|
||
|
||
varianzkompontenschaetzung = sp.Symbol(
|
||
f"varkomp_{instrumente_id_anschlusspunkte}_Anschlusspunkte")
|
||
liste_standardabweichungen_symbole.append(varianzkompontenschaetzung)
|
||
|
||
Qll[i, i] = (varianzkompontenschaetzung) * sigma ** 2
|
||
|
||
Export.matrix_to_csv(r"Zwischenergebnisse\QAA_Symbolisch.csv", liste_beobachtungen_symbolisch, liste_beobachtungen_symbolisch, Qll, "Qll")
|
||
return Qll
|
||
|
||
def QAA_numerisch(self, QAA_Matrix_Symbolisch: sp.Matrix, liste_beobachtungen_symbolisch: list) -> np.Matrix:
|
||
"""Erstellt eine numerische Matrix aus einer symbolischen QAA-Matrix.
|
||
|
||
Es werden die numerischen Standardabweichungen der Anschlussbedingungen sowie die Varianzkomponente der Gruppe
|
||
"Anschlusspunkte" aus der Datenbank abgefragt und substituiert. Zur Rechenzeitersparnis wird ein Lambdify-Cache geführt,
|
||
der bei Änderung der Symbolmenge neu aufgebaut.
|
||
|
||
Die numerische Matrix wird als CSV-Datei in Zwischenergebnisse\\QAA_Numerisch.csv exportiert.
|
||
|
||
:param QAA_Matrix_Symbolisch: Symbolische Kofaktormatrix QAA.
|
||
:type QAA_Matrix_Symbolisch: sp.Matrix
|
||
:param liste_beobachtungen_symbolisch: Liste der symbolischen Beobachtungskennungen.
|
||
:type liste_beobachtungen_symbolisch: list
|
||
:return: Numerische VKofaktormatrix QAA als Numpy-Array.
|
||
:rtype: np.Matrix
|
||
"""
|
||
# Symbolische Listen
|
||
liste_beobachtungen_symbolisch = [str(b).strip() for b in liste_beobachtungen_symbolisch]
|
||
liste_beobachtungen_symbolisch = [b for b in liste_beobachtungen_symbolisch if b.startswith("lA_")]
|
||
|
||
# Abfrage der numerischen Werte aus der Datenbank
|
||
dict_stabwAA_vorinfo = self.db_zugriff.get_stabw_AA_Netzpunkte()
|
||
liste_varianzkomponenten = self.db_zugriff.get_varianzkomponentenschaetzung()
|
||
|
||
# Zuordnen des Symbols zum numerischen Wert
|
||
substitutionen = {}
|
||
for koordinate, stabwAA in dict_stabwAA_vorinfo.items():
|
||
substitutionen[sp.Symbol(str(koordinate).strip())] = float(stabwAA)
|
||
|
||
for (varianzkomponenteID, instrumenteID, beobachtungsgruppe,
|
||
varianz_varianzkomponentenschaetzung) in liste_varianzkomponenten:
|
||
substitutionen[sp.Symbol(f"varkomp_{instrumenteID}_Anschlusspunkte")] = float(varianz_varianzkomponentenschaetzung)
|
||
|
||
# Speichern der MAtrix in einer Instanzvariablen
|
||
if not hasattr(self, "func_QAA_numerisch"):
|
||
self.func_QAA_numerisch = None
|
||
if not hasattr(self, "liste_symbole_lambdify_QAA"):
|
||
self.liste_symbole_lambdify_QAA = []
|
||
|
||
# Substituieren
|
||
if (self.func_QAA_numerisch is None) or (set(self.liste_symbole_lambdify_QAA) != set(substitutionen.keys())):
|
||
self.liste_symbole_lambdify_QAA = sorted(substitutionen.keys(), key=lambda s: str(s))
|
||
self.func_QAA_numerisch = sp.lambdify(
|
||
self.liste_symbole_lambdify_QAA,
|
||
QAA_Matrix_Symbolisch,
|
||
modules="numpy",
|
||
cse=True
|
||
)
|
||
|
||
liste_werte = [substitutionen[s] for s in self.liste_symbole_lambdify_QAA]
|
||
QAA_numerisch = np.asarray(self.func_QAA_numerisch(*liste_werte), dtype=float)
|
||
|
||
Export.matrix_to_csv(
|
||
r"Zwischenergebnisse\QAA_Numerisch.csv",
|
||
liste_beobachtungen_symbolisch,
|
||
liste_beobachtungen_symbolisch,
|
||
QAA_numerisch,
|
||
"QAA"
|
||
)
|
||
|
||
return QAA_numerisch
|
||
|
||
@staticmethod
|
||
def berechne_P(Q_ll: np.ndarray) -> np.ndarray:
|
||
"""Berechnet die Gewichtsmatrix P aus der Kofaktormatrix.
|
||
|
||
Die Gewichtsmatrix wird als Inverse von Qll gebildet: P = inv(Qll).
|
||
|
||
:param Q_ll: Kofaktormatrix der Beobachtungen.
|
||
:type Q_ll: np.ndarray
|
||
:return: Gewichtsmatrix P.
|
||
:rtype: np.ndarray
|
||
"""
|
||
P = np.linalg.inv(Q_ll)
|
||
return P
|
||
|
||
@staticmethod
|
||
def berechne_Q_xx(N: np.ndarray) -> np.ndarray:
|
||
"""Berechnet die Kofaktormatrix der Unbekannten Qxx aus der Normalgleichungsmatrix N.
|
||
|
||
Die Kofaktormatrix wird als Inverse der Normalgleichungsmatrix gebildet: Qxx = inv(N).
|
||
Vor der Inversion wird geprüft, ob N quadratisch ist.
|
||
|
||
:param N: Normalgleichungsmatrix.
|
||
:type N: np.ndarray
|
||
:return: Kofaktormatrix der Unbekannten Qxx.
|
||
:rtype: np.ndarray
|
||
:raises ValueError: Falls N nicht quadratisch ist.
|
||
"""
|
||
if N.shape[0] != N.shape[1]:
|
||
raise ValueError("N muss eine quadratische Matrix sein")
|
||
Qxx = np.linalg.inv(N)
|
||
return Qxx
|
||
|
||
@staticmethod
|
||
def berechne_Q_ll_dach(A: np.ndarray, Q_xx: np.ndarray) -> np.ndarray:
|
||
"""Berechnet die (geschätzte) Kofaktormatrix der Beobachtungen Qll_dach.
|
||
|
||
Die Matrix wird gemäß Qll_dach = A @ Qxx @ A.T gebildet.
|
||
|
||
:param A: Jacobi-Matrix.
|
||
:type A: np.ndarray
|
||
:param Q_xx: Kofaktormatrix der Unbekannten.
|
||
:type Q_xx: np.ndarray
|
||
:return: Geschätzte VKofaktormatrix der Beobachtungen Qll_dach.
|
||
:rtype: np.ndarray
|
||
"""
|
||
Q_ll_dach = A @ Q_xx @ A.T
|
||
return Q_ll_dach
|
||
|
||
@staticmethod
|
||
def berechne_Qvv(Q_ll: np.ndarray, Q_ll_dach: np.ndarray) -> np.ndarray:
|
||
"""Berechnet die Kofaktormatrix der Residuen Qvv.
|
||
|
||
Die Residuenkovarianz wird als Differenz aus Beobachtungs-Kovarianz und dem durch das Modell erklärten Anteil gebildet:
|
||
Qvv = Qll - Qll_dach.
|
||
|
||
:param Q_ll: Kofaktormatrix der Beobachtungen.
|
||
:type Q_ll: np.ndarray
|
||
:param Q_ll_dach: Geschätzte Kofaktormatrix der Beobachtungen.
|
||
:type Q_ll_dach: np.ndarray
|
||
:return: Kofaktormatrix der Residuen Qvv.
|
||
:rtype: np.ndarray
|
||
"""
|
||
Q_vv = Q_ll - Q_ll_dach
|
||
return Q_vv |