import ipywidgets as widgets from IPython.display import display, clear_output import numpy as np import pandas as pd import Datenbank import Datumsfestlegung from Netzqualitaet_Genauigkeit import Genauigkeitsmaße class VKS: """Varianzkomponentenschätzung (VKS) mit Anpassung durch Benutzereingaben. Die Klasse stellt Methoden zur Verfügung für: - Berechnung der a-posteriori Standardabweichungen je Beobachtungsgruppe anhand von v, P und R, - Anzeige und Anpassung der Varianzkomponenten (Multiplikationsfaktoren), - Rückschreiben geänderter Faktoren in die Datenbank und Sperren weiterer Eingaben nach dem Speichern. Die interaktive Tabelle stellt die in der Datenbank gespeicherten Varianzkomponenten dar und ergänzt je Zeile einen frei wählbaren Varianzmultiplikator (Standard: 1.0). """ def __init__(self,pfad_datenbank: str) -> None: """Initialisiert die VKS-Klasse. Legt den Datenbankpfad fest, initialisiert den Datenbankzugriff und bereitet Instanzvariablen für DataFrame, Widgets, Button und Log-Ausgabe vor. :param pfad_datenbank: Pfad zur SQLite-Datenbank. :type pfad_datenbank: str :return: None :rtype: None """ self.pfad_datenbank = pfad_datenbank self.df = None self.neue_werte_widgets = [] self.btn_save = None self.log_output = None self.columns = ['ID', 'InstrumentenID', 'Beobachtungsgruppe', 'Varianzfaktor_aktuelle_iteration'] self.liste_ergebnisse = [] self.db_zugriff = Datenbank.Datenbankzugriff(self.pfad_datenbank) def varianzkomponten_berechnen(self, Jacobimatrix_symbolisch_liste_beobachtungsvektor: list, res: dict, R: np.asarray) -> pd.DataFrame: """Berechnet a-posteriori Varianzen und a-posteriori Standardabweichungen je Beobachtungsgruppe. Teilt die Residuen v, die Gewichtsmatrix P sowie die Redundanzmatrix R gemäß der in Jacobimatrix_symbolisch_liste_beobachtungsvektor enthaltenen Gruppenindizes auf. Für jede Beobachtungsgruppe wird die Redundanz ri = Spur(R_i) gebildet und daraus s0_aposteriori berechnet. Unterstützte Gruppenkennungen: "SD", "R", "ZW", "gnss", "niv", "lA". :param Jacobimatrix_symbolisch_liste_beobachtungsvektor: Liste der Zeilenbeschriftungen/Beobachtungskennungen (Strings). :type Jacobimatrix_symbolisch_liste_beobachtungsvektor: list :param res: Ergebnis-Dictionary der Ausgleichung, erwartet mindestens res["v"] (Residuen) und res["P"] (Gewichte). :type res: dict :param R: Redundanzmatrix (z. B. aus R = Q_vv @ P oder äquivalent), passend zu v/P dimensioniert. :type R: np.asarray :return: DataFrame mit den Spalten 'Beobachtungsgruppe', 'Standardabweichung', 'Varianz' :rtype: pd.DataFrame """ self.liste_ergebnisse = [] liste_ergebnisse = [] # Zeilen- und Spaltennummern der jeweiligen Beobachtungsgruppe, z.B. SD für Schrägdistanz ermitteln dict_indizes_beobachtungsgruppen = Datumsfestlegung.Datumsfestlegung.indizes_beobachtungsvektor_nach_beobachtungsgruppe( Jacobimatrix_symbolisch_liste_beobachtungsvektor) # Matrizen, bzw. Vektoren V, P und R aufteilen und die Redundanz und Standardabweichung aposteriori der jeweiligen Beobachtungsgruppe berechnen for beobachtungsgruppe, indizes in dict_indizes_beobachtungsgruppen.items(): z_start, z_ende = indizes[0], indizes[1] s_start, s_ende = indizes[0], indizes[1] # SD = Tachymeter Schrägstrecke if beobachtungsgruppe == "SD": beobachtungsgruppe_lang = "Tachymeter_Streckenbeobachtungen" aufgeteilt_v_SD = res["v"][z_start: z_ende + 1, :] aufgeteilt_P_SD = res["P"][z_start: z_ende + 1, s_start: s_ende + 1] aufgeteilt_R_SD = R[z_start: z_ende + 1, s_start: s_ende + 1] ri_SD = sum(np.diag(aufgeteilt_R_SD)) s0_aposteriori_SD = Genauigkeitsmaße.berechne_s0apost(aufgeteilt_v_SD, aufgeteilt_P_SD, ri_SD, False) liste_ergebnisse.append( {"Beobachtungsgruppe": beobachtungsgruppe_lang, "Standardabweichung a posteriori": s0_aposteriori_SD, "Varianz a posteriori": s0_aposteriori_SD ** 2}) # R = Tachymeter Richtungsbeobachtungen if beobachtungsgruppe == "R": beobachtungsgruppe_lang = "Tachymeter_Richtungsbeobachtungen" aufgeteilt_v_R = res["v"][z_start: z_ende + 1, :] aufgeteilt_P_R = res["P"][z_start: z_ende + 1, s_start: s_ende + 1] aufgeteilt_R_R = R[z_start: z_ende + 1, s_start: s_ende + 1] ri_R = sum(np.diag(aufgeteilt_R_R)) s0_aposteriori_R = Genauigkeitsmaße.berechne_s0apost(aufgeteilt_v_R, aufgeteilt_P_R, ri_R, False) liste_ergebnisse.append( {"Beobachtungsgruppe": beobachtungsgruppe_lang, "Standardabweichung a posteriori": s0_aposteriori_R, "Varianz a posteriori": s0_aposteriori_R ** 2}) # ZW = Tachymeter Zenitwinkelbeobachtung if beobachtungsgruppe == "ZW": beobachtungsgruppe_lang = "Tachymeter_Zenitwinkelbeobachtungen" aufgeteilt_v_ZW = res["v"][z_start: z_ende + 1, :] aufgeteilt_P_ZW = res["P"][z_start: z_ende + 1, s_start: s_ende + 1] aufgeteilt_R_ZW = R[z_start: z_ende + 1, s_start: s_ende + 1] ri_ZW = sum(np.diag(aufgeteilt_R_ZW)) s0_aposteriori_ZW = Genauigkeitsmaße.berechne_s0apost(aufgeteilt_v_ZW, aufgeteilt_P_ZW, ri_ZW, False) liste_ergebnisse.append( {"Beobachtungsgruppe": beobachtungsgruppe_lang, "Standardabweichung a posteriori": s0_aposteriori_ZW, "Varianz a posteriori": s0_aposteriori_ZW ** 2}) # GNSS = GNSS-Basisilinien if beobachtungsgruppe == "gnss": beobachtungsgruppe_lang = "GNSS-Rover_Basislinienbeobachtungen" aufgeteilt_v_gnss = res["v"][z_start: z_ende + 1, :] aufgeteilt_P_gnss = res["P"][z_start: z_ende + 1, s_start: s_ende + 1] aufgeteilt_R_gnss = R[z_start: z_ende + 1, s_start: s_ende + 1] ri_gnss = sum(np.diag(aufgeteilt_R_gnss)) s0_aposteriori_gnss = Genauigkeitsmaße.berechne_s0apost(aufgeteilt_v_gnss, aufgeteilt_P_gnss, ri_gnss, False) liste_ergebnisse.append( {"Beobachtungsgruppe": beobachtungsgruppe_lang, "Standardabweichung a posteriori": s0_aposteriori_gnss, "Varianz a posteriori": s0_aposteriori_gnss ** 2}) # niv = geometrisches Nivellement if beobachtungsgruppe == "niv": beobachtungsgruppe_lang = "Nivellier_Hoehendifferenzbeobachtungen" aufgeteilt_v_niv = res["v"][z_start: z_ende + 1, :] aufgeteilt_P_niv = res["P"][z_start: z_ende + 1, s_start: s_ende + 1] aufgeteilt_R_niv = R[z_start: z_ende + 1, s_start: s_ende + 1] ri_niv = sum(np.diag(aufgeteilt_R_niv)) s0_aposteriori_niv = Genauigkeitsmaße.berechne_s0apost(aufgeteilt_v_niv, aufgeteilt_P_niv, ri_niv, False) liste_ergebnisse.append( {"Beobachtungsgruppe": beobachtungsgruppe_lang, "Standardabweichung a posteriori": s0_aposteriori_niv, "Varianz a posteriori": s0_aposteriori_niv ** 2}) # lA = Anschlusspunkte für die weiche Lagerung if beobachtungsgruppe == "lA": beobachtungsgruppe_lang = "Anschlusspunkte" aufgeteilt_v_lA = res["v"][z_start: z_ende + 1, :] aufgeteilt_P_lA = res["P"][z_start: z_ende + 1, s_start: s_ende + 1] aufgeteilt_R_lA = R[z_start: z_ende + 1, s_start: s_ende + 1] ri_lA = sum(np.diag(aufgeteilt_R_lA)) s0_aposteriori_lA = Genauigkeitsmaße.berechne_s0apost(aufgeteilt_v_lA, aufgeteilt_P_lA, ri_lA, False) # Speichern in Instanzvariable liste_ergebnisse.append( {"Beobachtungsgruppe": beobachtungsgruppe_lang, "Standardabweichung a posteriori": s0_aposteriori_lA, "Varianz a posteriori": s0_aposteriori_lA ** 2}) self.liste_ergebnisse = liste_ergebnisse df_varianzkomponenten = pd.DataFrame(self.liste_ergebnisse) return df_varianzkomponenten def vks_ausfuehren(self) -> None: """Initialisiert die interaktive VKS-Eingabetabelle. Liest die aktuell gespeicherten Varianzkomponenten aus der Datenbank, erstellt daraus einen pandas DataFrame und erzeugt je Zeile ein FloatText-Tupel (Standardwert 1.0) zur Eingabe eines Multiplikationsfaktors. Zusätzlich wird ein Button zum Speichern der Änderungen erstellt. :return: None :rtype: None """ # In der aktuellen Iteration verwendete Varianzkomponenten aus der Datenbank abfragen und zur Tabelle hinzufügen rohdaten = self.db_zugriff.get_varianzkomponentenschaetzung() df_temporaer = pd.DataFrame(rohdaten, columns=self.columns) if self.liste_ergebnisse: sortierreihenfolge = [item['Beobachtungsgruppe'] for item in self.liste_ergebnisse] self.df = (df_temporaer.set_index('Beobachtungsgruppe') .reindex(sortierreihenfolge) .reset_index()) self.df = self.df[self.columns].dropna(subset=['ID']) else: self.df = df_temporaer # Festlegen der Tupel für die Eintragung des Multiplikationsfaktors auf die Varianzkomponente der aktuellen Iteration self.neue_werte_widgets = [ widgets.FloatText(value=1.0, step=0.1, layout=widgets.Layout(width='150px')) for _ in range(len(self.df)) ] # Output speichern self.log_output = widgets.Output() # Button zum speichern der Benutzereingaben definieren self.btn_save = widgets.Button( description="Varianzkomponten anpassen", icon="check", layout=widgets.Layout(width='300px', height='40px') ) self.btn_save.style.button_color = '#3c3f41' self.btn_save.style.text_color = '#afb1b3' # Button verbinden self.btn_save.on_click(self.buttondefinition) def buttondefinition(self, b: any) -> None: """Definieren der durch den Button auszuführenden Aktionen. Liest alle Varianzmultiplikatoren aus den Eingabewidgets aus und schreibt in die Datenbank. Nach erfolgreichem Speichern wird der Button deaktiviert und Benutzereingaben werden gesperrt. :param b: Button-Event-Objekt von ipywidgets (wird beim ausführen übergeben). :type b: Any :return: None :rtype: None """ with self.log_output: clear_output() # Benutzereingaben abfragen liste_varianzkomponten_anpassen = [ (int(self.df.iat[i, 1]), str(self.df.iat[i, 2]), w.value) for i, w in enumerate(self.neue_werte_widgets) if w.value != 1.0 ] # Benutzereingabe in Datenbank übernehmen if liste_varianzkomponten_anpassen: self.db_zugriff.set_varianzkomponente(liste_varianzkomponten_anpassen) # Button zum speichern deaktivieren self.btn_save.disabled = True for w in self.neue_werte_widgets: # Eingabe neuer Faktoren für die Varianzkomponenten verhindern w.disabled = True def zeige_vks_tabelle(self) -> None: """Erzeugt und zeigt die interaktive VKS-Tabelle im Notebook an. Baut eine tabellarische Darstellung auf. Ergänzt die Ausgabe um einen Speichern-Button. :return: None :rtype: None """ # ID nicht mehr anzeigen columns_anzeige = [c for c in self.columns if c != "ID"] # Header erstellen header = widgets.HBox([ widgets.Label(col, layout=widgets.Layout(width='250px', font_weight='bold', align='center')) for col in columns_anzeige ] + [widgets.Label('Varianzmultiplikator', layout=widgets.Layout(width='150px', font_weight='bold', align='center'))]) zeilen = [] # Iterieren über df und widgets gleichzeitig for i, row in enumerate(self.df[columns_anzeige].itertuples(index=False)): zellen = [widgets.Label(str(val), layout=widgets.Layout(width='250px', align='center')) for val in row] zellen.append(self.neue_werte_widgets[i]) zeilen.append(widgets.HBox(zellen)) display(widgets.VBox([header] + zeilen + [widgets.HTML("
"), self.btn_save, self.log_output]))