zusammenfügen 30.1.

This commit is contained in:
2026-01-30 18:41:46 +01:00
parent 1561eb242e
commit d069b7ca81
10 changed files with 65708 additions and 35899 deletions

File diff suppressed because one or more lines are too long

View File

@@ -85,7 +85,7 @@ class Datenbank_anlegen:
name TEXT(200),
CONSTRAINT pk_Instrumente PRIMARY KEY (instrumenteID)
);
""")
""");
cursor.executescript("""CREATE TABLE Genauigkeiten(
genauigkeitenID INTEGER,
instrumenteID INTEGER,
@@ -95,7 +95,18 @@ class Datenbank_anlegen:
CONSTRAINT pk_Genauigkeiten PRIMARY KEY (genauigkeitenID),
CONSTRAINT fk_Genauigkeiten_Instrumente FOREIGN KEY (instrumenteID) REFERENCES Instrumente(instrumenteID)
);
""");
cursor.executescript("""CREATE TABLE Varianzkomponentenschaetzung(
varianzkomponenteID INTEGER,
instrumenteID INTEGER,
beobachtungsgruppe TEXT(50),
varianz_varianzkomponentenschaetzung NUMERIC(3, 16) DEFAULT 1,
s0_apriori_vorherige_iteration NUMERIC(3, 16) DEFAULT 1,
CONSTRAINT pk_Varianzkomponentenschaetzung PRIMARY KEY (varianzkomponenteID),
CONSTRAINT fk_Varianzkomponentenschaetzung_Instrumente FOREIGN KEY (instrumenteID) REFERENCES Instrumente(instrumenteID)
);
""")
con.commit()
cursor.close()
con.close()
@@ -128,7 +139,7 @@ class Datenbankzugriff:
cursor.close()
con.close()
def set_instrument(self, typ: str, name: str) -> None:
def set_instrument(self, typ: str, name: str, liste_beobachtungsarten: list) -> None:
con = sqlite3.connect(self.pfad_datenbank)
cursor = con.cursor()
liste_instrumente = cursor.execute("SELECT * FROM Instrumente WHERE typ = ? AND name =?", (typ, name)).fetchall()
@@ -136,7 +147,25 @@ class Datenbankzugriff:
cursor.execute(
"INSERT INTO Instrumente (typ, name) VALUES (?, ?)", (typ, name)
)
id_instrument = cursor.lastrowid
print(f"Das Instrument {name} wurde erfolgreich hinzugefügt.")
if self.get_varianzkomponentenschaetzung() == []:
cursor.execute(
"INSERT INTO Instrumente (typ, name) VALUES (?, ?)", ("Anschlusspunkte", "lA")
)
id_instrument_anschlusspunkte = cursor.lastrowid
cursor.execute(
"INSERT INTO Varianzkomponentenschaetzung (instrumenteID, beobachtungsgruppe) VALUES (?, ?)",
(id_instrument_anschlusspunkte, "Anschlusspunkte")
)
for beobachtungsart in liste_beobachtungsarten:
cursor.execute(
"INSERT INTO Varianzkomponentenschaetzung (instrumenteID, beobachtungsgruppe) VALUES (?, ?)",
(id_instrument, beobachtungsart)
)
else:
id_instrument = cursor.execute(
"SELECT instrumenteID FROM Instrumente WHERE typ = ? AND name =?", (typ, name))
@@ -414,6 +443,33 @@ class Datenbankzugriff:
cursor.close()
con.close()
def set_varianzkomponente(self, liste_varianzkomponten_anpassen: list) -> None:
con = sqlite3.connect(self.pfad_datenbank)
cursor = con.cursor()
if liste_varianzkomponten_anpassen != []:
for varianzkomponente in liste_varianzkomponten_anpassen:
cursor.execute(
f"UPDATE Varianzkomponentenschaetzung SET varianz_varianzkomponentenschaetzung = ? WHERE instrumenteID = ? AND beobachtungsgruppe = ?",
(varianzkomponente[2], varianzkomponente[0], varianzkomponente[1]))
print(f"Folgende Varianzkomponente wurde für die nächste Iteration gespeichert: {liste_varianzkomponten_anpassen}.")
con.commit()
cursor.close()
con.close()
def set_s0_apriori(self, liste_s0_apriori_anpassen: list) -> None:
con = sqlite3.connect(self.pfad_datenbank)
cursor = con.cursor()
if liste_s0_apriori_anpassen != []:
for s0_apriori in liste_s0_apriori_anpassen:
cursor.execute(
f"UPDATE Varianzkomponentenschaetzung SET s0_apriori_vorherige_iteration = ? WHERE instrumenteID = ? AND beobachtungsgruppe = ?",
(s0_apriori[2], s0_apriori[0], s0_apriori[1]))
print(f"Folgende S0_apriori wurde für die nächste Iteration gespeichert: {liste_s0_apriori_anpassen}.")
con.commit()
cursor.close()
con.close()
def get_koordinaten(self, koordinatenart: str, ausgabeart: str = "Dict") -> dict[Any, MutableDenseMatrix] | None:
con = sqlite3.connect(self.pfad_datenbank)
cursor = con.cursor()
@@ -612,3 +668,20 @@ class Datenbankzugriff:
con.close()
return liste_nivellement_beobachtungen
def get_varianzkomponentenschaetzung(self) -> list[Any]:
con = sqlite3.connect(self.pfad_datenbank)
cursor = con.cursor()
liste_varianzkomponenten = cursor.execute(f"SELECT varianzkomponenteID, instrumenteID, beobachtungsgruppe, varianz_varianzkomponentenschaetzung FROM Varianzkomponentenschaetzung").fetchall()
cursor.close()
con.close()
return liste_varianzkomponenten
def get_s0_apriori(self) -> list[Any]:
con = sqlite3.connect(self.pfad_datenbank)
cursor = con.cursor()
liste_s0_apriori = cursor.execute(f"SELECT varianzkomponenteID, instrumenteID, beobachtungsgruppe, varianz_varianzkomponentenschaetzung, s0_apriori_vorherige_iteration FROM Varianzkomponentenschaetzung").fetchall()
cursor.close()
con.close()
return liste_s0_apriori

View File

@@ -1,18 +1,153 @@
import sympy as sp
import numpy as np
from typing import Iterable, List, Sequence, Tuple, Optional
import Stochastisches_Modell
import Export
class Datumsfestlegung:
@staticmethod
def weiches_datum(Q_ll: np.ndarray, Q_AA: np.ndarray) -> np.ndarray:
if Q_ll.ndim != 2 or Q_ll.shape[0] != Q_ll.shape[1]:
def weiches_datum(Q_ll: np.ndarray, Q_AA: np.ndarray, varianzkompontenschaetzung_erfolgt: bool,
dict_indizes_beobachtungsgruppen: dict) -> np.ndarray:
if Stochastisches_Modell.StochastischesModell.berechne_P(Q_ll).ndim != 2 or \
Stochastisches_Modell.StochastischesModell.berechne_P(Q_ll).shape[0] != \
Stochastisches_Modell.StochastischesModell.berechne_P(Q_ll).shape[1]:
raise ValueError("Q_ll muss quadratisch sein.")
if Q_AA.ndim != 2 or Q_AA.shape[0] != Q_AA.shape[1]:
if Stochastisches_Modell.StochastischesModell.berechne_P(Q_AA).ndim != 2 or \
Stochastisches_Modell.StochastischesModell.berechne_P(Q_AA).shape[0] != \
Stochastisches_Modell.StochastischesModell.berechne_P(Q_AA).shape[1]:
raise ValueError("Q_AA muss quadratisch sein.")
Q_ext = np.block([[Q_ll, np.zeros((Q_ll.shape[0], Q_AA.shape[0]))],[np.zeros((Q_AA.shape[0], Q_ll.shape[0])), Q_AA]])
return Q_ext
Q_ext = np.block(
[[Q_ll, np.zeros((Q_ll.shape[0], Q_AA.shape[0]))], [np.zeros((Q_AA.shape[0], Q_ll.shape[0])), Q_AA]])
if varianzkompontenschaetzung_erfolgt == False:
P = np.block([[Stochastisches_Modell.StochastischesModell.berechne_P(Q_ll), np.zeros(
(Stochastisches_Modell.StochastischesModell.berechne_P(Q_ll).shape[0],
Stochastisches_Modell.StochastischesModell.berechne_P(Q_AA).shape[0]))], [np.zeros(
(Stochastisches_Modell.StochastischesModell.berechne_P(Q_AA).shape[0],
Stochastisches_Modell.StochastischesModell.berechne_P(Q_ll).shape[0])),
Stochastisches_Modell.StochastischesModell.berechne_P(
Q_AA)]])
else:
print("Varianzkomponentenschätzung wurde durchgeführt")
for beobachtungsgruppe, indizes in dict_indizes_beobachtungsgruppen.items():
z_start, z_ende = indizes[0], indizes[1] # Zeile 2 bis inklusive 5
s_start, s_ende = indizes[0], indizes[1] # Spalte 1 bis inklusive 4
if beobachtungsgruppe == "SD":
aufgeteilt_SD = Q_ext[z_start: z_ende + 1, s_start: s_ende + 1]
if beobachtungsgruppe == "R":
aufgeteilt_R = Q_ext[z_start: z_ende + 1, s_start: s_ende + 1]
if beobachtungsgruppe == "ZW":
aufgeteilt_ZW = Q_ext[z_start: z_ende + 1, s_start: s_ende + 1]
if beobachtungsgruppe == "gnss":
aufgeteilt_gnss = Q_ext[z_start: z_ende + 1, s_start: s_ende + 1]
if beobachtungsgruppe == "niv":
aufgeteilt_niv = Q_ext[z_start: z_ende + 1, s_start: s_ende + 1]
if beobachtungsgruppe == "lA":
aufgeteilt_lA = Q_ext[z_start: z_ende + 1, s_start: s_ende + 1]
aufgeteilt_SD_invertiert = Stochastisches_Modell.StochastischesModell.berechne_P(aufgeteilt_SD)
aufgeteilt_R_invertiert = Stochastisches_Modell.StochastischesModell.berechne_P(aufgeteilt_R)
aufgeteilt_ZW_invertiert = Stochastisches_Modell.StochastischesModell.berechne_P(aufgeteilt_ZW)
aufgeteilt_gnss_invertiert = Stochastisches_Modell.StochastischesModell.berechne_P(aufgeteilt_gnss)
aufgeteilt_niv_invertiert = Stochastisches_Modell.StochastischesModell.berechne_P(aufgeteilt_niv)
aufgeteilt_lA_invertiert = Stochastisches_Modell.StochastischesModell.berechne_P(aufgeteilt_lA)
def Z(A, B):
return np.zeros((A.shape[0], B.shape[0]))
P = np.block([
[aufgeteilt_SD_invertiert, Z(aufgeteilt_SD_invertiert, aufgeteilt_R_invertiert),
Z(aufgeteilt_SD_invertiert, aufgeteilt_ZW_invertiert),
Z(aufgeteilt_SD_invertiert, aufgeteilt_gnss_invertiert),
Z(aufgeteilt_SD_invertiert, aufgeteilt_niv_invertiert),
Z(aufgeteilt_SD_invertiert, aufgeteilt_lA_invertiert)],
[Z(aufgeteilt_R_invertiert, aufgeteilt_SD_invertiert), aufgeteilt_R_invertiert,
Z(aufgeteilt_R_invertiert, aufgeteilt_ZW_invertiert),
Z(aufgeteilt_R_invertiert, aufgeteilt_gnss_invertiert),
Z(aufgeteilt_R_invertiert, aufgeteilt_niv_invertiert),
Z(aufgeteilt_R_invertiert, aufgeteilt_lA_invertiert)],
[Z(aufgeteilt_ZW_invertiert, aufgeteilt_SD_invertiert),
Z(aufgeteilt_ZW_invertiert, aufgeteilt_R_invertiert), aufgeteilt_ZW_invertiert,
Z(aufgeteilt_ZW_invertiert, aufgeteilt_gnss_invertiert),
Z(aufgeteilt_ZW_invertiert, aufgeteilt_niv_invertiert),
Z(aufgeteilt_ZW_invertiert, aufgeteilt_lA_invertiert)],
[Z(aufgeteilt_gnss_invertiert, aufgeteilt_SD_invertiert),
Z(aufgeteilt_gnss_invertiert, aufgeteilt_R_invertiert),
Z(aufgeteilt_gnss_invertiert, aufgeteilt_ZW_invertiert), aufgeteilt_gnss_invertiert,
Z(aufgeteilt_gnss_invertiert, aufgeteilt_niv_invertiert),
Z(aufgeteilt_gnss_invertiert, aufgeteilt_lA_invertiert)],
[Z(aufgeteilt_niv_invertiert, aufgeteilt_SD_invertiert),
Z(aufgeteilt_niv_invertiert, aufgeteilt_R_invertiert),
Z(aufgeteilt_niv_invertiert, aufgeteilt_ZW_invertiert),
Z(aufgeteilt_niv_invertiert, aufgeteilt_gnss_invertiert), aufgeteilt_niv_invertiert,
Z(aufgeteilt_niv_invertiert, aufgeteilt_lA_invertiert)],
[Z(aufgeteilt_lA_invertiert, aufgeteilt_SD_invertiert),
Z(aufgeteilt_lA_invertiert, aufgeteilt_R_invertiert),
Z(aufgeteilt_lA_invertiert, aufgeteilt_ZW_invertiert),
Z(aufgeteilt_lA_invertiert, aufgeteilt_gnss_invertiert),
Z(aufgeteilt_lA_invertiert, aufgeteilt_niv_invertiert), aufgeteilt_lA_invertiert]
])
# print(aufgeteilt)
# print(beobachtungsgruppe, indizes)
# Export.matrix_to_csv(
# fr"Zwischenergebnisse\_{beobachtungsgruppe}.csv",
# [""],
# labels,
# aufgeteilt,
# f"{beobachtungsgruppe}"
# )
return Q_ext, P
@staticmethod
def indizes_beobachtungsvektor_nach_beobachtungsgruppe(Jacobimatrix_symbolisch_liste_beobachtungsvektor):
liste_strecken_indizes = []
liste_richtungen_indizes = []
liste_zenitwinkel_indizes = []
liste_gnss_indizes = []
liste_nivellement_indizes = []
liste_anschlusspunkte_indizes = []
dict_beobachtungsgruppen_indizes = {}
for beobachtung in Jacobimatrix_symbolisch_liste_beobachtungsvektor:
if beobachtung.split("_")[1] == "SD":
index = Jacobimatrix_symbolisch_liste_beobachtungsvektor.index(beobachtung)
liste_strecken_indizes.append(index)
if beobachtung.split("_")[1] == "R":
index = Jacobimatrix_symbolisch_liste_beobachtungsvektor.index(beobachtung)
liste_richtungen_indizes.append(index)
if beobachtung.split("_")[1] == "ZW":
index = Jacobimatrix_symbolisch_liste_beobachtungsvektor.index(beobachtung)
liste_zenitwinkel_indizes.append(index)
if beobachtung.split("_")[1] == "gnssbx" or beobachtung.split("_")[1] == "gnssby" or beobachtung.split("_")[
1] == "gnssbz":
index = Jacobimatrix_symbolisch_liste_beobachtungsvektor.index(beobachtung)
liste_gnss_indizes.append(index)
if beobachtung.split("_")[1] == "niv":
index = Jacobimatrix_symbolisch_liste_beobachtungsvektor.index(beobachtung)
liste_nivellement_indizes.append(index)
if beobachtung.split("_")[0] == "lA":
index = Jacobimatrix_symbolisch_liste_beobachtungsvektor.index(beobachtung)
liste_anschlusspunkte_indizes.append(index)
dict_beobachtungsgruppen_indizes["SD"] = min(liste_strecken_indizes), max(liste_strecken_indizes)
dict_beobachtungsgruppen_indizes["R"] = min(liste_richtungen_indizes), max(liste_richtungen_indizes)
dict_beobachtungsgruppen_indizes["ZW"] = min(liste_zenitwinkel_indizes), max(liste_zenitwinkel_indizes)
dict_beobachtungsgruppen_indizes["gnss"] = min(liste_gnss_indizes), max(liste_gnss_indizes)
dict_beobachtungsgruppen_indizes["niv"] = min(liste_nivellement_indizes), max(liste_nivellement_indizes)
dict_beobachtungsgruppen_indizes["lA"] = min(liste_anschlusspunkte_indizes), max(liste_anschlusspunkte_indizes)
return dict_beobachtungsgruppen_indizes
@staticmethod
@@ -20,8 +155,13 @@ class Datumsfestlegung:
names = [str(s).strip() for s in unbekannten_liste]
return names, {n: i for i, n in enumerate(names)}
def build_G_from_names(x0, unbekannten_liste, liste_punktnummern, mit_massstab=True):
@staticmethod
def build_G_from_names(x0, unbekannten_liste, liste_punktnummern=None, mit_massstab=True):
"""
Baut G (u x d) in den vollen Unbekanntenraum.
Wenn liste_punktnummern=None, werden alle Punkt-IDs aus unbekannten_liste
über X*/Y*/Z* automatisch bestimmt.
"""
x0 = np.asarray(x0, float).reshape(-1)
names, idx = Datumsfestlegung.make_index(unbekannten_liste)
@@ -29,11 +169,18 @@ class Datumsfestlegung:
d = 7 if mit_massstab else 6
G = np.zeros((u, d), dtype=float)
# --- Punktliste automatisch, falls nicht gegeben ---
if liste_punktnummern is None:
pids = set()
for n in names:
if len(n) >= 2 and n[0].upper() in ("X", "Y", "Z"):
pids.add(n[1:]) # alles nach X/Y/Z
liste_punktnummern = sorted(pids)
for pid in liste_punktnummern:
sx, sy, sz = f"X{pid}", f"Y{pid}", f"Z{pid}"
if sx not in idx or sy not in idx or sz not in idx:
# Punkt nicht als voller XYZ-Unbekannter vorhanden -> skip
continue
continue # Punkt nicht vollständig als XYZ-Unbekannte vorhanden
ix, iy, iz = idx[sx], idx[sy], idx[sz]
xi, yi, zi = x0[ix], x0[iy], x0[iz]
@@ -45,17 +192,21 @@ class Datumsfestlegung:
# Rotationen (δr = ω × r)
# Rx: δY=-Z, δZ=+Y
G[iy, 3] = -zi; G[iz, 3] = yi
G[iy, 3] = -zi
G[iz, 3] = yi
# Ry: δX=+Z, δZ=-X
G[ix, 4] = zi; G[iz, 4] = -xi
G[ix, 4] = zi
G[iz, 4] = -xi
# Rz: δX=-Y, δY=+X
G[ix, 5] = -yi; G[iy, 5] = xi
G[ix, 5] = -yi
G[iy, 5] = xi
# Maßstab
if mit_massstab:
G[ix, 6] = xi
G[iy, 6] = yi
G[iz, 6] = zi
return G
@@ -84,7 +235,11 @@ class Datumsfestlegung:
return E
def berechne_dx_geraendert(N, n, G):
def loese_geraendert_mit_Qxx(N, n, G):
"""
löst [N G; G^T 0] [dx;k] = [n;0]
und liefert zusätzlich Q_xx als oberen linken Block von inv(K).
"""
N = np.asarray(N, float)
n = np.asarray(n, float).reshape(-1, 1)
G = np.asarray(G, float)
@@ -98,7 +253,10 @@ class Datumsfestlegung:
])
rhs = np.vstack([n, np.zeros((d, 1))])
sol = np.linalg.solve(K, rhs)
K_inv = np.linalg.inv(K)
sol = K_inv @ rhs
dx = sol[:u]
k = sol[u:]
return dx, k
Q_xx = K_inv[:u, :u]
return dx, k, Q_xx

View File

@@ -124,6 +124,12 @@ class Export:
<h3>3. Konfidenzellipsen</h3>
{ergebnisse['df_konfidenzellipsen'].to_html(index=False)}
<h3>3. Koordinaten Geozentrisch Kartesisch</h3>
{ergebnisse['df_koordinaten_geozentrisch_kartesisch'].to_html(index=True)}
<h3>3. Koordinaten UTM</h3>
{ergebnisse['df_koordinaten_utm'].to_html(index=True)}
<div class="footer">
Erstellt automatisch am {datetime.now().strftime('%d.%m.%Y um %H:%M:%S')}
</div>

View File

@@ -912,3 +912,4 @@ class FunktionalesModell:
dict_O[orientierungs_id] = float(unbekanntenvektor_numerisch[i, 0])
return dict_O

View File

@@ -1019,7 +1019,7 @@ class Import:
con.close()
return "Import der Koordinaten aus stationärem GNSS abgeschlossen."
def import_basislinien_gnss(self, pfad_datei: str) -> None:
def import_basislinien_gnss(self, pfad_datei: str, instrumentenID: int) -> None:
Import_fortsetzen = True
# Prüfen, ob Bereits Daten aus der Datei in der Datenbank vorhanden sind
@@ -1028,13 +1028,23 @@ class Import:
liste_dateinamen_in_db = [r[0] for r in cursor.execute(
"SELECT DISTINCT dateiname FROM Beobachtungen"
).fetchall()]
con.close()
cursor.close
if pfad_datei in liste_dateinamen_in_db:
Import_fortsetzen = False
liste_instrumentenid = [r[0] for r in cursor.execute("SELECT instrumenteID FROM Instrumente").fetchall()]
con.close()
cursor.close
if instrumentenID not in liste_instrumentenid:
Import_fortsetzen = False
print(
"Der Import wurde abgebrochen. Bitte eine gültige InstrumentenID eingeben. Bei Bedarf ist das Instrument neu anzulegen.")
if Import_fortsetzen:
liste_basilinien = []
tupel_basislinie = ()
with (open(pfad_datei, "r", encoding="utf-8") as txt):
@@ -1063,8 +1073,8 @@ class Import:
cursor = con.cursor()
for basislinie in liste_basilinien:
cursor.execute(
"INSERT INTO Beobachtungen (punktnummer_sp, punktnummer_zp, gnss_bx, gnss_by, gnss_bz, gnss_s0, gnss_cxx, gnss_cxy, gnss_cxz, gnss_cyy, gnss_cyz, gnss_czz, dateiname) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
(basislinie[0], basislinie[1], float(basislinie[2]), float(basislinie[3]), float(basislinie[4]), float(basislinie[5]), float(basislinie[6]), float(basislinie[7]), float(basislinie[8]), float(basislinie[9]), float(basislinie[10]), float(basislinie[11]), pfad_datei))
"INSERT INTO Beobachtungen (punktnummer_sp, punktnummer_zp, gnss_bx, gnss_by, gnss_bz, gnss_s0, gnss_cxx, gnss_cxy, gnss_cxz, gnss_cyy, gnss_cyz, gnss_czz, dateiname, instrumenteID) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
(basislinie[0], basislinie[1], float(basislinie[2]), float(basislinie[3]), float(basislinie[4]), float(basislinie[5]), float(basislinie[6]), float(basislinie[7]), float(basislinie[8]), float(basislinie[9]), float(basislinie[10]), float(basislinie[11]), pfad_datei, instrumentenID))
con.commit()
cursor.close()
con.close()

View File

@@ -2,6 +2,7 @@ import numpy as np
import plotly.graph_objects as go
from scipy.stats import f
import pandas as pd
import Berechnungen
class Genauigkeitsmaße:
@@ -196,6 +197,159 @@ class Genauigkeitsmaße:
@staticmethod
def konfidenzellipsoid(Qxx, s0_apost, unbekannten_liste, R, alpha, skala="f", return_2d_schnitte=True):
Qxx = np.asarray(Qxx, float)
namen_str = [str(sym) for sym in unbekannten_liste]
punkt_ids = sorted({n[1:] for n in namen_str if n and n[0].upper() in ("X", "Y", "Z")})
# Skalierungsfaktor für Konfidenzbereich
if skala.lower() == "f":
k2_3d = f.ppf(1.0 - alpha, df=3)
elif skala.lower() == "f":
k2_3d = 3.0 * f.ppf(1.0 - alpha, dfn=3, dfd=R)
else:
raise ValueError("skala muss 'chi2' oder 'f' sein.")
daten = []
for pid in punkt_ids:
try:
idx_x = next(i for i, n in enumerate(namen_str) if n.upper() == f"X{pid}".upper())
idx_y = next(i for i, n in enumerate(namen_str) if n.upper() == f"Y{pid}".upper())
idx_z = next(i for i, n in enumerate(namen_str) if n.upper() == f"Z{pid}".upper())
except StopIteration:
continue
# 3x3-Block aus Qxx ziehen
I = [idx_x, idx_y, idx_z]
Qp = Qxx[np.ix_(I, I)]
# Kovarianzmatrix (Sigma) des Punkts
Sigma = (s0_apost ** 2) * Qp
# Standardabweichungen
sx = float(np.sqrt(Sigma[0, 0]))
sy = float(np.sqrt(Sigma[1, 1]))
sz = float(np.sqrt(Sigma[2, 2]))
# Kovarianzen
sxy = float(Sigma[0, 1])
sxz = float(Sigma[0, 2])
syz = float(Sigma[1, 2])
# Eigenzerlegung (symmetrisch -> eigh)
evals, evecs = np.linalg.eigh(Sigma)
order = np.argsort(evals)[::-1]
evals = evals[order]
evecs = evecs[:, order]
# Numerische Sicherheit: negative Mini-Eigenwerte durch Rundung abklemmen
evals = np.clip(evals, 0.0, None)
# Halbachsen des Konfidenzellipsoids:
A, B, C = (np.sqrt(evals * k2_3d)).tolist()
row = {
"Punkt": pid,
"σx": sx, "σy": sy, "σz": sz,
"σxy": sxy, "σxz": sxz, "σyz": syz,
"A_K": float(A), "B_K": float(B), "C_K": float(C),
# Orientierung als Spaltenvektoren (Eigenvektoren)
"evec_1": evecs[:, 0].tolist(),
"evec_2": evecs[:, 1].tolist(),
"evec_3": evecs[:, 2].tolist(),
"skala_k2": float(k2_3d),
"skala_typ": skala.lower()
}
# Optional: 2D-Schnitte (XY, XZ, YZ) als Ellipsenparameter
if return_2d_schnitte:
row.update(Genauigkeitsmaße.ellipsen_schnitt_2d(Sigma, alpha, R, skala))
daten.append(row)
return pd.DataFrame(daten)
@staticmethod
def ellipsen_schnitt_2d(Sigma3, alpha, R, skala):
def ellipse_from_2x2(S2):
# Skalierung für 2D
if skala.lower() == "f":
k2 = f.ppf(1.0 - alpha, df=2)
else:
k2 = 2.0 * f.ppf(1.0 - alpha, dfn=2, dfd=R)
evals, evecs = np.linalg.eigh(S2)
order = np.argsort(evals)[::-1]
evals = np.clip(evals[order], 0.0, None)
evecs = evecs[:, order]
a, b = np.sqrt(evals * k2)
# Winkel der Hauptachse (zu a) in der Ebene: atan2(vy, vx)
vx, vy = evecs[0, 0], evecs[1, 0]
theta_rad = np.arctan2(vy, vx)
theta_gon = float(theta_rad * (200.0 / np.pi)) % 200.0
return float(a), float(b), theta_gon
# Submatrizen
S_xy = Sigma3[np.ix_([0, 1], [0, 1])]
S_xz = Sigma3[np.ix_([0, 2], [0, 2])]
S_yz = Sigma3[np.ix_([1, 2], [1, 2])]
axy, bxy, txy = ellipse_from_2x2(S_xy)
axz, bxz, txz = ellipse_from_2x2(S_xz)
ayz, byz, tyz = ellipse_from_2x2(S_yz)
return {
"aXY": axy, "bXY": bxy, "θXY [gon]": txy,
"aXZ": axz, "bXZ": bxz, "θXZ [gon]": txz,
"aYZ": ayz, "bYZ": byz, "θYZ [gon]": tyz,
}
@staticmethod
def transform_q_with_your_functions(q_xyz, B, L):
# East
r11 = Berechnungen.E(L, 1, 0)
r12 = Berechnungen.E(L, 0, 1)
r13 = 0
# North
r21 = Berechnungen.N(B, L, 1, 0, 0)
r22 = Berechnungen.N(B, L, 0, 1, 0)
r23 = Berechnungen.N(B, L, 0, 0, 1)
# Up
r31 = Berechnungen.U(B, L, 1, 0, 0)
r32 = Berechnungen.U(B, L, 0, 1, 0)
r33 = Berechnungen.U(B, L, 0, 0, 1)
R = np.array([
[r11, r12, r13],
[r21, r22, r23],
[r31, r32, r33]
])
q_enu = R @ q_xyz @ R.T
return q_enu
def plot_netz_komplett_final(x_vektor, unbekannten_labels, beobachtungs_labels, Qxx, sigma0_apost,
k_faktor=2.447, v_faktor=1000):
"""

View File

@@ -116,7 +116,103 @@ class Zuverlaessigkeit:
return Lokaltest_innere_Zuv
def aeussere_zuverlaessigkeit_EF_EP1(Lokaltest, labels, Qxx, A, P, s0_apost, unbekannten_liste, x):
def aeussere_zuverlaessigkeit_EF_EP_stabil(Lokaltest, labels, Qxx, A, P, s0_apost, unbekannten_liste, x):
df = Lokaltest.copy()
labels = list(labels)
Qxx = np.asarray(Qxx, float)
A = np.asarray(A, float)
P = np.asarray(P, float)
x = np.asarray(x, float).reshape(-1)
ri = df["r_i"].astype(float).to_numpy()
GF = df["GF_i"].astype(float).to_numpy()
GRZW = df["GRZW_i"].astype(float).to_numpy()
n = A.shape[0]
# Namen als Strings für die Suche
namen_str = [str(sym) for sym in unbekannten_liste]
# 1) Einflussfaktor EF berechnen
EF = np.zeros(n, dtype=float)
for i in range(n):
nabla_l = np.zeros((n, 1))
nabla_l[i, 0] = GRZW[i]
nabla_x = Qxx @ (A.T @ (P @ nabla_l))
Qxx_inv_nabla_x = np.linalg.solve(Qxx, nabla_x)
EF2 = ((nabla_x.T @ Qxx_inv_nabla_x) / (float(s0_apost) ** 2)).item()
EF[i] = np.sqrt(max(0, EF2))
# 2) Koordinaten-Dict
coords = {}
punkt_ids = [n[1:] for n in namen_str if n.upper().startswith("X")]
for pid in punkt_ids:
try:
ix = namen_str.index(f"X{pid}")
iy = namen_str.index(f"Y{pid}")
iz = namen_str.index(f"Z{pid}")
coords[pid] = (x[ix], x[iy], x[iz] if iz is not None else 0.0)
except:
continue
# 3) EP + Standpunkte
EP_m = np.full(len(labels), np.nan, dtype=float)
standpunkte = [""] * len(labels)
for i, lbl in enumerate(labels):
parts = lbl.split("_")
sp, zp = None, None
if any(k in lbl for k in ["_SD_", "_R_", "_ZW_"]):
if len(parts) >= 5: sp, zp = parts[3].strip(), parts[4].strip()
elif "gnss" in lbl.lower():
sp, zp = parts[-2].strip(), parts[-1].strip()
elif "niv" in lbl.lower():
if len(parts) >= 4:
sp = parts[3].strip()
else:
sp = parts[-1].strip()
standpunkte[i] = sp if sp is not None else ""
# SD, GNSS, Niv: direkt Wegfehler
if "_SD_" in lbl or "gnss" in lbl.lower() or "niv" in lbl.lower():
EP_m[i] = (1.0 - ri[i]) * GF[i]
# Winkel: Streckenäquivalent
elif "_R_" in lbl or "_ZW_" in lbl:
if sp in coords and zp in coords:
X1, Y1, _ = coords[sp]
X2, Y2, _ = coords[zp]
s = np.sqrt((X2 - X1) ** 2 + (Y2 - Y1) ** 2)
EP_m[i] = (1.0 - ri[i]) * (GF[i] * s)
# 4) SP am Standpunkt (2D oder 1D)
diagQ = np.diag(Qxx)
SP_cache_mm = {}
for sp in set([s for s in standpunkte if s]):
try:
ix = namen_str.index(f"X{sp}")
iy = namen_str.index(f"Y{sp}")
SP_cache_mm[sp] = float(s0_apost) * np.sqrt(diagQ[ix] + diagQ[iy]) * 1000.0
except ValueError:
# Falls keine Lage, prüfe Höhe (Nivellement)
try:
iz = namen_str.index(f"Z{sp}")
SP_cache_mm[sp] = float(s0_apost) * np.sqrt(diagQ[iz]) * 1000.0
except ValueError:
SP_cache_mm[sp] = 0.0
SP_mm = np.array([SP_cache_mm.get(sp, np.nan) for sp in standpunkte], dtype=float)
return pd.DataFrame({
"Beobachtung": labels, "Stand-Pkt": standpunkte, "EF": EF,
"EP [mm]": EP_m * 1000.0, "SP [mm]": SP_mm, "EF*SP [mm]": EF * SP_mm
})
def aeussere_zuverlaessigkeit_EF_EP(Lokaltest, labels, Qxx, A, P, s0_apost, unbekannten_liste, x):
df = Lokaltest.copy()
labels = list(labels)
Qxx = np.asarray(Qxx, float)
@@ -217,98 +313,3 @@ class Zuverlaessigkeit:
"EF*SP [mm]": EF * SP_mm,
})
return out
def aeussere_zuverlaessigkeit_EF_EP_stabil(Lokaltest, labels, Qxx, A, P, s0_apost, unbekannten_liste, x):
df = Lokaltest.copy()
labels = list(labels)
Qxx = np.asarray(Qxx, float)
A = np.asarray(A, float)
P = np.asarray(P, float)
x = np.asarray(x, float).reshape(-1)
ri = df["r_i"].astype(float).to_numpy()
GF = df["GF_i"].astype(float).to_numpy()
GRZW = df["GRZW_i"].astype(float).to_numpy()
n = A.shape[0]
# Namen als Strings für die Suche
namen_str = [str(sym) for sym in unbekannten_liste]
# 1) Einflussfaktor EF berechnen
EF = np.zeros(n, dtype=float)
for i in range(n):
nabla_l = np.zeros((n, 1))
nabla_l[i, 0] = GRZW[i]
nabla_x = Qxx @ (A.T @ (P @ nabla_l))
Qxx_inv_nabla_x = np.linalg.solve(Qxx, nabla_x)
EF2 = ((nabla_x.T @ Qxx_inv_nabla_x) / (float(s0_apost) ** 2)).item()
EF[i] = np.sqrt(max(0, EF2))
# 2) Koordinaten-Dict
coords = {}
punkt_ids = [n[1:] for n in namen_str if n.upper().startswith("X")]
for pid in punkt_ids:
try:
ix = namen_str.index(f"X{pid}")
iy = namen_str.index(f"Y{pid}")
iz = namen_str.index(f"Z{pid}")
coords[pid] = (x[ix], x[iy], x[iz] if iz is not None else 0.0)
except:
continue
# 3) EP + Standpunkte
EP_m = np.full(len(labels), np.nan, dtype=float)
standpunkte = [""] * len(labels)
for i, lbl in enumerate(labels):
parts = lbl.split("_")
sp, zp = None, None
if any(k in lbl for k in ["_SD_", "_R_", "_ZW_"]):
if len(parts) >= 5: sp, zp = parts[3].strip(), parts[4].strip()
elif "gnss" in lbl.lower():
sp, zp = parts[-2].strip(), parts[-1].strip()
elif "niv" in lbl.lower():
if len(parts) >= 4:
sp = parts[3].strip()
else:
sp = parts[-1].strip()
standpunkte[i] = sp if sp is not None else ""
one_minus_r = (1.0 - ri[i])
# SD, GNSS, Niv: direkt Wegfehler
if "_SD_" in lbl or "gnss" in lbl.lower() or "niv" in lbl.lower():
EP_m[i] = one_minus_r * GF[i]
# Winkel: Streckenäquivalent
elif "_R_" in lbl or "_ZW_" in lbl:
if sp in coords and zp in coords:
X1, Y1, _ = coords[sp]
X2, Y2, _ = coords[zp]
s = np.sqrt((X2 - X1) ** 2 + (Y2 - Y1) ** 2)
EP_m[i] = one_minus_r * (GF[i] * s)
# 4) SP am Standpunkt (2D oder 1D)
diagQ = np.diag(Qxx)
SP_cache_mm = {}
for sp in set([s for s in standpunkte if s]):
try:
ix = namen_str.index(f"X{sp}")
iy = namen_str.index(f"Y{sp}")
SP_cache_mm[sp] = float(s0_apost) * np.sqrt(diagQ[ix] + diagQ[iy]) * 1000.0
except ValueError:
# Falls keine Lage, prüfe Höhe (Nivellement)
try:
iz = namen_str.index(f"Z{sp}")
SP_cache_mm[sp] = float(s0_apost) * np.sqrt(diagQ[iz]) * 1000.0
except ValueError:
SP_cache_mm[sp] = 0.0
SP_mm = np.array([SP_cache_mm.get(sp, np.nan) for sp in standpunkte], dtype=float)
return pd.DataFrame({
"Beobachtung": labels, "Stand-Pkt": standpunkte, "EF": EF,
"EP [mm]": EP_m * 1000.0, "SP [mm]": SP_mm, "EF*SP [mm]": EF * SP_mm
})

View File

@@ -1,18 +1,18 @@
from Stochastisches_Modell import StochastischesModell
from Netzqualität_Genauigkeit import Genauigkeitsmaße
from Datumsfestlegung import Datumsfestlegung
import sympy as sp
import numpy as np
import Export
def ausgleichung_global(A, dl, Q_ext):
def ausgleichung_global(A, dl, Q_ext, P):
A=np.asarray(A, float)
dl = np.asarray(dl, float).reshape(-1, 1)
Q_ext = np.asarray(Q_ext, float)
P = np.asarray(P, float)
# 1) Gewichtsmatrix P
P = StochastischesModell.berechne_P(Q_ext)
#P = Q_ext
# 2) Normalgleichungsmatrix N und Absolutgliedvektor n
N = A.T @ P @ A
@@ -48,7 +48,7 @@ def ausgleichung_global(A, dl, Q_ext):
def ausgleichung_lokal(A, dl, Q_ll):
def ausgleichung_lokal1(A, dl, Q_ll):
A = np.asarray(A, dtype=float)
dl = np.asarray(dl, dtype=float).reshape(-1, 1)
Q_ll = np.asarray(Q_ll, dtype=float)
@@ -94,3 +94,174 @@ def ausgleichung_lokal(A, dl, Q_ll):
"Q_ll": Q_ll,
}
return dict_ausgleichung, dx
def ausgleichung_lokal(
A, dl, Q_ll,
*,
datumfestlegung="hart",
x0=None,
unbekannten_liste=None,
liste_punktnummern=None,
datenbank=None,
mit_massstab=True
):
A = np.asarray(A, float)
dl = np.asarray(dl, float).reshape(-1, 1)
Q_ll = np.asarray(Q_ll, float)
# 1) Gewichtsmatrix
P = np.linalg.inv(Q_ll)
# 2) Normalgleichungen
N = A.T @ P @ A
n = A.T @ P @ dl
u = N.shape[0]
# 3) Datumsfestlegung
if datumfestlegung == "weiche Lagerung":
# hier wurde Q_ll bereits extern erweitert → ganz normal lösen
dx = np.linalg.solve(N, n)
elif datumfestlegung == "gesamtspur":
if x0 is None or unbekannten_liste is None or liste_punktnummern is None:
raise ValueError("gesamtspur benötigt x0, unbekannten_liste, liste_punktnummern")
G = Datumsfestlegung.build_G_from_names(
x0,
unbekannten_liste,
liste_punktnummern,
mit_massstab=mit_massstab
)
dx, k = Datumsfestlegung.berechne_dx_geraendert(N, n, G)
elif datumfestlegung == "teilspur":
if x0 is None or unbekannten_liste is None or liste_punktnummern is None or datenbank is None:
raise ValueError("teilspur benötigt x0, unbekannten_liste, liste_punktnummern, datenbank")
# G über alle Punkte
G = Datumsfestlegung.build_G_from_names(
x0,
unbekannten_liste,
liste_punktnummern,
mit_massstab=mit_massstab
)
# Auswahl aus DB
liste_datumskoordinaten = datenbank.get_datumskoordinate()
if not liste_datumskoordinaten:
raise ValueError("Teilspur gewählt, aber keine Datumskoordinaten in der DB gesetzt.")
aktive = Datumsfestlegung.aktive_indices_from_selection(
[(s[1:], s[0]) for s in liste_datumskoordinaten],
unbekannten_liste
)
E = Datumsfestlegung.auswahlmatrix_E(u, aktive)
Gi = E @ G
dx, k = Datumsfestlegung.berechne_dx_geraendert(N, n, Gi)
else:
raise ValueError(f"Unbekannte Datumsfestlegung: {datumfestlegung}")
# 4) Residuen
v = dl - A @ dx
# 5) Kofaktormatrix der Unbekannten
Q_xx = StochastischesModell.berechne_Q_xx(N)
# 6) Kofaktormatrix der Beobachtungen
Q_ll_dach = StochastischesModell.berechne_Q_ll_dach(A, Q_xx)
# 7) Kofaktormatrix der Verbesserungen
Q_vv = StochastischesModell.berechne_Qvv(Q_ll, Q_ll_dach)
dict_ausgleichung = {
"dx": dx,
"v": v,
"P": P,
"N": N,
"Q_xx": Q_xx,
"Q_ll_dach": Q_ll_dach,
"Q_vv": Q_vv,
"Q_ll": Q_ll,
}
return dict_ausgleichung, dx
def ausgleichung_spurminimierung(A, dl, Q_ll, *, datumfestlegung, x0, unbekannten_liste, datenbank=None, mit_massstab=True):
"""
Freie Ausgleichung mit Gesamtspur- oder Teilspurminimierung.
Rückgabe-Layout wie ausgleichung_global.
"""
A = np.asarray(A, float)
dl = np.asarray(dl, float).reshape(-1, 1)
Q_ll = np.asarray(Q_ll, float)
# 1) Gewichtsmatrix P
P = StochastischesModell.berechne_P(Q_ll)
# 2) Normalgleichungen
N = A.T @ P @ A
n = A.T @ P @ dl
# 3) G über ALLE Punkte aus unbekannten_liste (automatisch)
G = Datumsfestlegung.build_G_from_names(
x0=x0,
unbekannten_liste=unbekannten_liste,
liste_punktnummern=None, # <- wichtig: auto
mit_massstab=mit_massstab
)
# 4) Gesamtspur / Teilspur
if datumfestlegung == "gesamtspur":
Gi = G
k = None # wird unten überschrieben
elif datumfestlegung == "teilspur":
if datenbank is None:
raise ValueError("teilspur benötigt datenbank mit get_datumskoordinate()")
liste_datumskoordinaten = datenbank.get_datumskoordinate()
if not liste_datumskoordinaten:
raise ValueError("Teilspur gewählt, aber keine Datumskoordinaten in der DB gesetzt.")
# ["X10034","Y10034"] -> [("10034","X"),("10034","Y")]
auswahl = [(s[1:], s[0]) for s in liste_datumskoordinaten]
aktive = Datumsfestlegung.aktive_indices_from_selection(auswahl, unbekannten_liste)
E = Datumsfestlegung.auswahlmatrix_E(N.shape[0], aktive)
Gi = E @ G
else:
raise ValueError("datumfestlegung muss 'gesamtspur' oder 'teilspur' sein")
# 5) Lösung per Ränderung + korrektes Q_xx
dx, k, Q_xx = Datumsfestlegung.loese_geraendert_mit_Qxx(N, n, Gi)
# 6) Residuen (wie bei dir)
v = A @ dx - dl
# 7) Kofaktormatrix der Beobachtungen
Q_ll_dach = StochastischesModell.berechne_Q_ll_dach(A, Q_xx)
# 8) Kofaktormatrix der Verbesserungen
Q_vv = StochastischesModell.berechne_Qvv(Q_ll, Q_ll_dach)
dict_ausgleichung = {
"dx": dx,
"v": v,
"P": P,
"N": N,
"n": n,
"k": k,
"Q_xx": Q_xx,
"Q_ll_dach": Q_ll_dach,
"Q_vv": Q_vv,
"Q_ll": Q_ll,
"datumfestlegung": datumfestlegung,
}
return dict_ausgleichung, dx

View File

@@ -83,9 +83,11 @@ class StochastischesModell:
for i, beobachtung_symbolisch_i in enumerate(liste_beobachtungen_symbolisch):
aufgeteilt_i = beobachtung_symbolisch_i.split("_")
if aufgeteilt_i[1] == "SD" or aufgeteilt_i[1] == "R" or aufgeteilt_i[1] == "ZW":
beobachtungenID_i = int(aufgeteilt_i[0])
instrumenteID_i = dict_beobachtungenID_instrumenteID[beobachtungenID_i]
if aufgeteilt_i[1] == "SD" or aufgeteilt_i[1] == "R" or aufgeteilt_i[1] == "ZW":
beobachtungsart_i = str(aufgeteilt_i[1])
if beobachtungsart_i == "SD":
@@ -96,7 +98,9 @@ class StochastischesModell:
sigma = sp.sqrt(stabw_apriori_konstant ** 2 + (stabw_apriori_streckenprop * tachymeter_distanz / 1000000) ** 2)
liste_standardabweichungen_symbole.append(sigma)
Qll[i, i] = sigma ** 2
varianzkompontenschaetzung = sp.Symbol(f"varkomp_{instrumenteID_i}_Tachymeter_Streckenbeobachtungen")
Qll[i, i] = (1 / varianzkompontenschaetzung) * (sigma ** 2)
elif beobachtungsart_i == "R" or beobachtungsart_i == "ZW":
stabw_apriori_konstant = sp.Symbol(f"stabw_apriori_konstant_{beobachtungsart_i}_{instrumenteID_i}")
@@ -107,8 +111,14 @@ class StochastischesModell:
sigma = sp.sqrt(
stabw_apriori_konstant ** 2 + (stabw_apriori_konstant_distanz / tachymeter_distanz) ** 2)
liste_standardabweichungen_symbole.append(sigma)
if beobachtungsart_i == "R":
varianzkompontenschaetzung = sp.Symbol(
f"varkomp_{instrumenteID_i}_Tachymeter_Richtungsbeobachtungen")
if beobachtungsart_i == "ZW":
varianzkompontenschaetzung = sp.Symbol(
f"varkomp_{instrumenteID_i}_Tachymeter_Zenitwinkelbeobachtungen")
Qll[i, i] = sigma ** 2
Qll[i, i] = (1 / varianzkompontenschaetzung) * sigma ** 2
for j in range(i + 1, len(liste_beobachtungen_symbolisch)):
beobachtung_symbolisch_j = liste_beobachtungen_symbolisch[j]
@@ -119,14 +129,16 @@ class StochastischesModell:
Qll[i, j] = 0
Qll[j, i] = 0
if aufgeteilt_i [1] == "gnssbx" or aufgeteilt_i[1] == "gnssby" or aufgeteilt_i[1] == "gnssbz":
beobachtungenID_i = int(aufgeteilt_i[0])
#beobachtungenID_i = int(aufgeteilt_i[0])
beobachtungsart_i = str(aufgeteilt_i[1])
varianzkompontenschaetzung = sp.Symbol(
f"varkomp_{instrumenteID_i}_GNSS-Rover_Basislinienbeobachtungen")
if beobachtungsart_i == "gnssbx":
cxx = sp.symbols(f"cxx_{beobachtungenID_i}")
s0 = sp.symbols(f"s0_{beobachtungenID_i}")
liste_standardabweichungen_symbole.append(cxx)
Qll[i, i] = cxx * (s0 ** 2)
Qll[i, i] = (1 / varianzkompontenschaetzung) * (cxx * (s0 ** 2))
cxy = sp.Symbol(f"cxy_{beobachtungenID_i}")
s0 = sp.symbols(f"s0_{beobachtungenID_i}")
@@ -135,8 +147,8 @@ class StochastischesModell:
aufgeteilt_j = beobachtung_symbolisch_j.split("_")
if int(aufgeteilt_j[0]) == beobachtungenID_i and aufgeteilt_j[1] == "gnssby":
Qll[i, j] = cxy * (s0 ** 2)
Qll[j, i] = cxy * (s0 ** 2)
Qll[i, j] = (1 / varianzkompontenschaetzung) * (cxy * (s0 ** 2))
Qll[j, i] = (1 / varianzkompontenschaetzung) * (cxy * (s0 ** 2))
break
cxz = sp.Symbol(f"cxz_{beobachtungenID_i}")
@@ -146,15 +158,15 @@ class StochastischesModell:
aufgeteilt_j = beobachtung_symbolisch_j.split("_")
if int(aufgeteilt_j[0]) == beobachtungenID_i and aufgeteilt_j[1] == "gnssbz":
Qll[i, j] = cxz * (s0 ** 2)
Qll[j, i] = cxz * (s0 ** 2)
Qll[i, j] = (1 / varianzkompontenschaetzung) * (cxz * (s0 ** 2))
Qll[j, i] = (1 / varianzkompontenschaetzung) * (cxz * (s0 ** 2))
break
if beobachtungsart_i == "gnssby":
cyy = sp.symbols(f"cyy_{beobachtungenID_i}")
s0 = sp.symbols(f"s0_{beobachtungenID_i}")
liste_standardabweichungen_symbole.append(cyy)
Qll[i, i] = cyy * (s0 ** 2)
Qll[i, i] = (1 / varianzkompontenschaetzung) * (cyy * (s0 ** 2))
cyz = sp.Symbol(f"cyz_{beobachtungenID_i}")
s0 = sp.symbols(f"s0_{beobachtungenID_i}")
@@ -163,21 +175,24 @@ class StochastischesModell:
aufgeteilt_j = beobachtung_symbolisch_j.split("_")
if int(aufgeteilt_j[0]) == beobachtungenID_i and aufgeteilt_j[1] == "gnssbz":
Qll[i, j] = cyz * (s0 ** 2)
Qll[j, i] = cyz * (s0 ** 2)
Qll[i, j] = (1 / varianzkompontenschaetzung) * (cyz * (s0 ** 2))
Qll[j, i] = (1 / varianzkompontenschaetzung) * (cyz * (s0 ** 2))
break
if beobachtungsart_i == "gnssbz":
czz = sp.symbols(f"czz_{beobachtungenID_i}")
s0 = sp.symbols(f"s0_{beobachtungenID_i}")
liste_standardabweichungen_symbole.append(czz)
Qll[i, i] = czz * (s0 ** 2)
Qll[i, i] = (1 / varianzkompontenschaetzung) * (czz * (s0 ** 2))
if aufgeteilt_i[1] == "niv":
beobachtungenID_i = int(aufgeteilt_i[0])
instrumenteID_i = dict_beobachtungenID_instrumenteID[beobachtungenID_i]
#beobachtungenID_i = int(aufgeteilt_i[0])
#instrumenteID_i = dict_beobachtungenID_instrumenteID[beobachtungenID_i]
beobachtungsart_i = str(aufgeteilt_i[1])
varianzkompontenschaetzung = sp.Symbol(
f"varkomp_{instrumenteID_i}_Nivellier_Hoehendifferenzbeobachtungen")
stabw_apriori_konstant = sp.Symbol(f"stabw_apriori_konstant_{beobachtungsart_i}_{instrumenteID_i}")
stabw_apriori_streckenprop = sp.Symbol(f"stabw_apriori_streckenprop_{beobachtungsart_i}_{instrumenteID_i}")
nivellement_distanz = sp.Symbol(f"niv_distanz_{beobachtungenID_i}")
@@ -186,7 +201,7 @@ class StochastischesModell:
sigma = sp.sqrt(nivellement_anz_wechselpunkte * stabw_apriori_konstant ** 2 + stabw_apriori_streckenprop ** 2 * nivellement_distanz / 1000)
liste_standardabweichungen_symbole.append(sigma)
Qll[i, i] = sigma ** 2
Qll[i, i] = (1 / varianzkompontenschaetzung) * (sigma ** 2)
Export.matrix_to_csv(r"Zwischenergebnisse\Qll_Symbolisch.csv", liste_beobachtungen_symbolisch, liste_beobachtungen_symbolisch, Qll, "Qll")
return Qll
@@ -203,6 +218,8 @@ class StochastischesModell:
liste_beobachtungen_gnss = db_zugriff.get_beobachtungen_gnssbasislinien()
liste_beobachtungen_nivellement = db_zugriff.get_beobachtungen_nivellement()
liste_varianzkomponenten = db_zugriff.get_varianzkomponentenschaetzung()
dict_beobachtungenID_distanz = {}
for standpunkt, zielpunkt, beobachtungenID, beobachtungsgruppeID, tachymeter_richtung, tachymeter_zenitwinkel, tachymeter_distanz in liste_beobachtungen_tachymeter:
dict_beobachtungenID_distanz[int(beobachtungenID)] = tachymeter_distanz
@@ -225,6 +242,9 @@ class StochastischesModell:
if stabw_apriori_konstant is not None:
dict_konstante_sd[instrumenteID] = float(stabw_apriori_konstant)
for (varianzkomponenteID, instrumenteID, beobachtungsgruppe, varianz_varianzkomponentenschaetzung) in liste_varianzkomponenten:
substitutionen[sp.Symbol(f"varkomp_{instrumenteID}_{beobachtungsgruppe.strip()}")] = float(varianz_varianzkomponentenschaetzung)
for (instrumenteID, beobachtungsart), (stabw_apriori_konstant,
stabw_apriori_streckenprop) in dict_genauigkeiten_neu.items():
@@ -347,12 +367,15 @@ class StochastischesModell:
return Qll_numerisch
def QAA_symbolisch(self, liste_beobachtungen_symbolisch: list) -> np.Matrix:
def QAA_symbolisch(self, liste_beobachtungen_symbolisch: list, pfad_datenbank: str) -> np.Matrix:
liste_standardabweichungen_symbole = []
liste_beobachtungen_symbolisch = [str(b) for b in liste_beobachtungen_symbolisch]
liste_beobachtungen_symbolisch = [b for b in liste_beobachtungen_symbolisch if b.startswith("lA_")]
Qll = sp.zeros(len(liste_beobachtungen_symbolisch), len(liste_beobachtungen_symbolisch))
db_zugriff = Datenbankzugriff(pfad_datenbank)
instrumente_id_anschlusspunkte = db_zugriff.get_instrument_liste("Anschlusspunkte")[0][0]
for i, beobachtung_symbolisch_i in enumerate(liste_beobachtungen_symbolisch):
aufgeteilt_i = beobachtung_symbolisch_i.split("_")
datumskoordinate = str(aufgeteilt_i[1])
@@ -362,7 +385,11 @@ class StochastischesModell:
sigma = sp.Symbol(f"StabwAA_{datumskoordinate}")
liste_standardabweichungen_symbole.append(sigma)
Qll[i, i] = sigma ** 2
varianzkompontenschaetzung = sp.Symbol(
f"varkomp_{instrumente_id_anschlusspunkte}_Anschlusspunkte")
liste_standardabweichungen_symbole.append(varianzkompontenschaetzung)
Qll[i, i] = (1 / varianzkompontenschaetzung) * sigma ** 2
Export.matrix_to_csv(r"Zwischenergebnisse\QAA_Symbolisch.csv", liste_beobachtungen_symbolisch, liste_beobachtungen_symbolisch, Qll, "Qll")
return Qll
@@ -374,11 +401,17 @@ class StochastischesModell:
db_zugriff = Datenbankzugriff(pfad_datenbank)
dict_stabwAA_vorinfo = db_zugriff.get_stabw_AA_Netzpunkte()
liste_varianzkomponenten = db_zugriff.get_varianzkomponentenschaetzung()
substitutionen = {}
for koordinate, stabwAA in dict_stabwAA_vorinfo.items():
substitutionen[sp.Symbol(str(koordinate).strip())] = float(stabwAA)
for (varianzkomponenteID, instrumenteID, beobachtungsgruppe,
varianz_varianzkomponentenschaetzung) in liste_varianzkomponenten:
substitutionen[sp.Symbol(f"varkomp_{instrumenteID}_Anschlusspunkte")] = float(varianz_varianzkomponentenschaetzung)
if not hasattr(self, "func_QAA_numerisch"):
self.func_QAA_numerisch = None
if not hasattr(self, "liste_symbole_lambdify_QAA"):