229 lines
12 KiB
Python
229 lines
12 KiB
Python
import ipywidgets as widgets
|
|
from IPython.display import display, clear_output
|
|
import numpy as np
|
|
import pandas as pd
|
|
|
|
import Datenbank
|
|
import Datumsfestlegung
|
|
from Netzqualitaet_Genauigkeit import Genauigkeitsmaße
|
|
|
|
class VKS:
|
|
"""Varianzkomponentenschätzung (VKS) mit Anpassung durch Benutzereingaben.
|
|
|
|
Die Klasse stellt Methoden zur Verfügung für:
|
|
|
|
- Berechnung der a-posteriori Standardabweichungen je Beobachtungsgruppe anhand von v, P und R,
|
|
- Anzeige und Anpassung der Varianzkomponenten (Multiplikationsfaktoren),
|
|
- Rückschreiben geänderter Faktoren in die Datenbank und Sperren weiterer Eingaben nach dem Speichern.
|
|
|
|
Die interaktive Tabelle stellt die in der Datenbank gespeicherten Varianzkomponenten dar und ergänzt
|
|
je Zeile einen frei wählbaren Varianzmultiplikator (Standard: 1.0).
|
|
"""
|
|
|
|
def __init__(self,pfad_datenbank: str) -> None:
|
|
"""Initialisiert die VKS-Klasse.
|
|
|
|
Legt den Datenbankpfad fest, initialisiert den Datenbankzugriff und bereitet Instanzvariablen
|
|
für DataFrame, Widgets, Button und Log-Ausgabe vor.
|
|
|
|
:param pfad_datenbank: Pfad zur SQLite-Datenbank.
|
|
:type pfad_datenbank: str
|
|
:return: None
|
|
:rtype: None
|
|
"""
|
|
|
|
self.pfad_datenbank = pfad_datenbank
|
|
self.df = None
|
|
self.neue_werte_widgets = []
|
|
self.btn_save = None
|
|
self.log_output = None
|
|
self.columns = ['ID', 'InstrumentenID', 'Beobachtungsgruppe', 'Varianzfaktor_aktuelle_iteration']
|
|
|
|
self.db_zugriff = Datenbank.Datenbankzugriff(self.pfad_datenbank)
|
|
|
|
def varianzkomponten_berechnen(self, Jacobimatrix_symbolisch_liste_beobachtungsvektor: list, res: dict, R: np.ndarray) -> None:
|
|
"""Berechnet a-posteriori Varianzen und a-posteriori Standardabweichungen je Beobachtungsgruppe.
|
|
|
|
Teilt die Residuen v, die Gewichtsmatrix P sowie die Redundanzmatrix R gemäß der in
|
|
Jacobimatrix_symbolisch_liste_beobachtungsvektor enthaltenen Gruppenindizes auf. Für jede
|
|
Beobachtungsgruppe wird die Redundanz ri = Spur(R_i) gebildet und daraus s0_aposteriori
|
|
berechnet.
|
|
|
|
Unterstützte Gruppenkennungen: "SD", "R", "ZW", "gnss", "niv", "lA".
|
|
|
|
:param Jacobimatrix_symbolisch_liste_beobachtungsvektor: Liste der Zeilenbeschriftungen/Beobachtungskennungen (Strings).
|
|
:type Jacobimatrix_symbolisch_liste_beobachtungsvektor: list
|
|
:param res: Ergebnis-Dictionary der Ausgleichung, erwartet mindestens res["v"] (Residuen) und res["P"] (Gewichte).
|
|
:type res: dict
|
|
:param R: Redundanzmatrix (z. B. aus R = Q_vv @ P oder äquivalent), passend zu v/P dimensioniert.
|
|
:type R: np.ndarray
|
|
:return: None
|
|
:rtype: None
|
|
"""
|
|
|
|
# Zeilen- und Spaltennummern der jeweiligen Beobachtungsgruppe, z.B. SD für Schrägdistanz ermitteln
|
|
dict_indizes_beobachtungsgruppen = Datumsfestlegung.Datumsfestlegung.indizes_beobachtungsvektor_nach_beobachtungsgruppe(
|
|
Jacobimatrix_symbolisch_liste_beobachtungsvektor)
|
|
|
|
# Matrizen, bzw. Vektoren V, P und R aufteilen und die Redundanz und Standardabweichung aposteriori der jeweiligen Beobachtungsgruppe berechnen
|
|
for beobachtungsgruppe, indizes in dict_indizes_beobachtungsgruppen.items():
|
|
z_start, z_ende = indizes[0], indizes[1]
|
|
s_start, s_ende = indizes[0], indizes[1]
|
|
|
|
# SD = Tachymeter Schrägstrecke
|
|
if beobachtungsgruppe == "SD":
|
|
aufgeteilt_v_SD = res["v"][z_start: z_ende + 1, :]
|
|
aufgeteilt_P_SD = res["P"][z_start: z_ende + 1, s_start: s_ende + 1]
|
|
aufgeteilt_R_SD = R[z_start: z_ende + 1, s_start: s_ende + 1]
|
|
ri_SD = sum(np.diag(aufgeteilt_R_SD))
|
|
s0_aposteriori_SD = Genauigkeitsmaße.berechne_s0apost(aufgeteilt_v_SD, aufgeteilt_P_SD, ri_SD)
|
|
print(f"s0 aposteriori der Beobachtungsgruppe {beobachtungsgruppe} beträgt: {s0_aposteriori_SD:.4f}")
|
|
print(
|
|
f"Varianz aposteriori der Beobachtungsgruppe {beobachtungsgruppe} beträgt: {s0_aposteriori_SD ** 2:.4f}")
|
|
|
|
# R = Tachymeter Richtungsbeobachtungen
|
|
if beobachtungsgruppe == "R":
|
|
aufgeteilt_v_R = res["v"][z_start: z_ende + 1, :]
|
|
aufgeteilt_P_R = res["P"][z_start: z_ende + 1, s_start: s_ende + 1]
|
|
aufgeteilt_R_R = R[z_start: z_ende + 1, s_start: s_ende + 1]
|
|
ri_R = sum(np.diag(aufgeteilt_R_R))
|
|
s0_aposteriori_R = Genauigkeitsmaße.berechne_s0apost(aufgeteilt_v_R, aufgeteilt_P_R, ri_R)
|
|
print(f"s0 aposteriori der Beobachtungsgruppe {beobachtungsgruppe} beträgt: {s0_aposteriori_R:.4f}")
|
|
print(
|
|
f"Varianz aposteriori der Beobachtungsgruppe {beobachtungsgruppe} beträgt: {s0_aposteriori_R ** 2:.4f}")
|
|
|
|
# ZW = Tachymeter Zenitwinkelbeobachtung
|
|
if beobachtungsgruppe == "ZW":
|
|
aufgeteilt_v_ZW = res["v"][z_start: z_ende + 1, :]
|
|
aufgeteilt_P_ZW = res["P"][z_start: z_ende + 1, s_start: s_ende + 1]
|
|
aufgeteilt_R_ZW = R[z_start: z_ende + 1, s_start: s_ende + 1]
|
|
ri_ZW = sum(np.diag(aufgeteilt_R_ZW))
|
|
s0_aposteriori_ZW = Genauigkeitsmaße.berechne_s0apost(aufgeteilt_v_ZW, aufgeteilt_P_ZW, ri_ZW)
|
|
print(f"s0 aposteriori der Beobachtungsgruppe {beobachtungsgruppe} beträgt: {s0_aposteriori_ZW:.4f}")
|
|
print(
|
|
f"Varianz aposteriori der Beobachtungsgruppe {beobachtungsgruppe} beträgt: {s0_aposteriori_ZW ** 2:.4f}")
|
|
|
|
# GNSS = GNSS-Basisilinien
|
|
if beobachtungsgruppe == "gnss":
|
|
aufgeteilt_v_gnss = res["v"][z_start: z_ende + 1, :]
|
|
aufgeteilt_P_gnss = res["P"][z_start: z_ende + 1, s_start: s_ende + 1]
|
|
aufgeteilt_R_gnss = R[z_start: z_ende + 1, s_start: s_ende + 1]
|
|
ri_gnss = sum(np.diag(aufgeteilt_R_gnss))
|
|
s0_aposteriori_gnss = Genauigkeitsmaße.berechne_s0apost(aufgeteilt_v_gnss, aufgeteilt_P_gnss, ri_gnss)
|
|
print(f"s0 aposteriori der Beobachtungsgruppe {beobachtungsgruppe} beträgt: {s0_aposteriori_gnss:.4f}")
|
|
print(
|
|
f"Varianz aposteriori der Beobachtungsgruppe {beobachtungsgruppe} beträgt: {s0_aposteriori_gnss ** 2:.4f}")
|
|
|
|
# niv = geometrisches Nivellement
|
|
if beobachtungsgruppe == "niv":
|
|
aufgeteilt_v_niv = res["v"][z_start: z_ende + 1, :]
|
|
aufgeteilt_P_niv = res["P"][z_start: z_ende + 1, s_start: s_ende + 1]
|
|
aufgeteilt_R_niv = R[z_start: z_ende + 1, s_start: s_ende + 1]
|
|
ri_niv = sum(np.diag(aufgeteilt_R_niv))
|
|
s0_aposteriori_niv = Genauigkeitsmaße.berechne_s0apost(aufgeteilt_v_niv, aufgeteilt_P_niv, ri_niv)
|
|
print(f"s0 aposteriori der Beobachtungsgruppe {beobachtungsgruppe} beträgt: {s0_aposteriori_niv:.4f}")
|
|
print(
|
|
f"Varianz aposteriori der Beobachtungsgruppe {beobachtungsgruppe} beträgt: {s0_aposteriori_niv ** 2:.4f}")
|
|
|
|
# lA = Anschlusspunkte für die weiche Lagerung
|
|
if beobachtungsgruppe == "lA":
|
|
aufgeteilt_v_lA = res["v"][z_start: z_ende + 1, :]
|
|
aufgeteilt_P_lA = res["P"][z_start: z_ende + 1, s_start: s_ende + 1]
|
|
aufgeteilt_R_lA = R[z_start: z_ende + 1, s_start: s_ende + 1]
|
|
ri_lA = sum(np.diag(aufgeteilt_R_lA))
|
|
s0_aposteriori_lA = Genauigkeitsmaße.berechne_s0apost(aufgeteilt_v_lA, aufgeteilt_P_lA, ri_lA)
|
|
print(f"s0 aposteriori der Beobachtungsgruppe {beobachtungsgruppe} beträgt: {s0_aposteriori_lA:.4f}")
|
|
print(
|
|
f"Varianz aposteriori der Beobachtungsgruppe {beobachtungsgruppe} beträgt: {s0_aposteriori_lA ** 2:.4f}")
|
|
|
|
def vks_ausfuehren(self) -> None:
|
|
"""Initialisiert die interaktive VKS-Eingabetabelle.
|
|
|
|
Liest die aktuell gespeicherten Varianzkomponenten aus der Datenbank, erstellt daraus einen
|
|
pandas DataFrame und erzeugt je Zeile ein FloatText-Tupel (Standardwert 1.0) zur Eingabe eines
|
|
Multiplikationsfaktors. Zusätzlich wird ein Button zum Speichern der Änderungen erstellt.
|
|
|
|
:return: None
|
|
:rtype: None
|
|
"""
|
|
|
|
# In der aktuellen Iteration verwendete Varianzkomponenten aus der Datenbank abfragen und zur Tabelle hinzufügen
|
|
rohdaten = self.db_zugriff.get_varianzkomponentenschaetzung()
|
|
self.df = pd.DataFrame(rohdaten, columns=self.columns)
|
|
|
|
# Festlegen der Tupel für die Eintragung des Multiplikationsfaktors auf die Varianzkomponente der aktuellen Iteration
|
|
self.neue_werte_widgets = [
|
|
widgets.FloatText(value=1.0, step=0.1, layout=widgets.Layout(width='150px'))
|
|
for _ in range(len(self.df))
|
|
]
|
|
|
|
# Output speichern
|
|
self.log_output = widgets.Output()
|
|
|
|
# Button zum speichern der Benutzereingaben definieren
|
|
self.btn_save = widgets.Button(
|
|
description="Varianzkomponten anpassen",
|
|
icon="check",
|
|
layout=widgets.Layout(width='300px', height='40px')
|
|
)
|
|
self.btn_save.style.button_color = '#3c3f41'
|
|
self.btn_save.style.text_color = '#afb1b3'
|
|
|
|
# Button verbinden
|
|
self.btn_save.on_click(self.buttondefinition)
|
|
|
|
def buttondefinition(self, b: any) -> None:
|
|
"""Definieren der durch den Button auszuführenden Aktionen.
|
|
|
|
Liest alle Varianzmultiplikatoren aus den Eingabewidgets aus und schreibt in die Datenbank. Nach erfolgreichem
|
|
Speichern wird der Button deaktiviert und Benutzereingaben werden gesperrt.
|
|
|
|
:param b: Button-Event-Objekt von ipywidgets (wird beim ausführen übergeben).
|
|
:type b: Any
|
|
:return: None
|
|
:rtype: None
|
|
"""
|
|
with self.log_output:
|
|
clear_output()
|
|
# Benutzereingaben abfragen
|
|
liste_varianzkomponten_anpassen = [
|
|
(int(self.df.iat[i, 1]), str(self.df.iat[i, 2]), w.value)
|
|
for i, w in enumerate(self.neue_werte_widgets)
|
|
if w.value != 1.0
|
|
]
|
|
|
|
# Benutzereingabe in Datenbank übernehmen
|
|
if liste_varianzkomponten_anpassen:
|
|
self.db_zugriff.set_varianzkomponente(liste_varianzkomponten_anpassen)
|
|
|
|
# Button zum speichern deaktivieren
|
|
self.btn_save.disabled = True
|
|
for w in self.neue_werte_widgets:
|
|
# Eingabe neuer Faktoren für die Varianzkomponenten verhindern
|
|
w.disabled = True
|
|
|
|
def zeige_vks_tabelle(self) -> None:
|
|
"""Erzeugt und zeigt die interaktive VKS-Tabelle im Notebook an.
|
|
|
|
Baut eine tabellarische Darstellung auf.
|
|
Ergänzt die Ausgabe um einen Speichern-Button.
|
|
|
|
:return: None
|
|
:rtype: None
|
|
"""
|
|
# Header erstellen
|
|
header = widgets.HBox([
|
|
widgets.Label(col, layout=widgets.Layout(width='250px', font_weight='bold', align='center'))
|
|
for col in self.columns
|
|
] + [widgets.Label('Varianzmultiplikator',
|
|
layout=widgets.Layout(width='150px', font_weight='bold', align='center'))])
|
|
|
|
zeilen = []
|
|
# Iterieren über df und widgets gleichzeitig
|
|
for i, row in enumerate(self.df.itertuples(index=False)):
|
|
zellen = [widgets.Label(str(val), layout=widgets.Layout(width='250px', align='center')) for val in row]
|
|
zellen.append(self.neue_werte_widgets[i])
|
|
zeilen.append(widgets.HBox(zellen))
|
|
|
|
# Darstellen der Tabelle
|
|
display(widgets.VBox([header] + zeilen + [widgets.HTML("<br>"), self.btn_save, self.log_output])) |