Files
Masterprojekt_V3/Netzqualitaet_Zuverlaessigkeit.py
2026-02-08 19:25:35 +01:00

831 lines
31 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from dataclasses import dataclass
import numpy as np
from scipy import stats
from scipy.stats import norm
import pandas as pd
from IPython.display import HTML
from IPython.display import display, clear_output
import ipywidgets as widgets
import itables
from itables.widget import ITable
@dataclass
class Zuverlaessigkeit:
@staticmethod
def gesamtredundanz(n, u):
"""
Berechnet die Gesamtredundanz des Netzes.
Die Gesamtredundanz ergibt sich aus der Differenz zwischen der Anzahl der
Beobachtungen n und der Anzahl der Unbekannten u. Sie entspricht der Anzahl
der Freiheitsgrade.
:param n: Anzahl der Beobachtungen.
:type n: int
:param u: Anzahl der Unbekannten.
:type u: int
:return: Gesamtredundanz des Netzes.
:rtype: int
"""
r_gesamt = n - u
print(f"Die Gesamtredundanz des Netzes beträgt: {r_gesamt}")
return r_gesamt
@staticmethod
def berechne_R(Q_vv, P):
"""
Berechnet die Redundanzmatrix R aus Qvv und der Gewichtsmatrix P.
Die Redundanzmatrix wird definiert als:
R = Qvv · P
:param Q_vv: Kofaktor-Matrix der Residuen.
:type Q_vv: numpy.ndarray
:param P: Gewichtsmatrix der Beobachtungen.
:type P: numpy.ndarray
:return: Redundanzmatrix R.
:rtype: numpy.ndarray
"""
R = Q_vv @ P
return R
@staticmethod
def berechne_ri(R):
"""
Berechnet die Redundanzanteile einzelner Beobachtungen.
Die Redundanzanteile rᵢ ergeben sich aus den Diagonalelementen der Redundanzmatrix R.
Zusätzlich werden die effektiven Redundanzanteile EVi in Prozent berechnet:
EVi = 100 · rᵢ
:param R: Redundanzmatrix R.
:type R: numpy.ndarray
:return: Tuple aus Redundanzanteilen rᵢ und effektiven Redundanzanteilen EVi in Prozent.
:rtype: tuple[numpy.ndarray, numpy.ndarray]
"""
ri = np.diag(R)
EVi = 100.0 * ri
return ri, EVi
@staticmethod
def klassifiziere_ri(ri):
"""
Klassifiziert einen Redundanzanteil rᵢ nach seiner Kontrollierbarkeit.
Der Redundanzanteil wird anhand üblicher geodätischer Schwellenwerte
qualitativ bewertet.
:param ri: Redundanzanteil einer einzelnen Beobachtung.
:type ri: float
:return: Qualitative Bewertung der Kontrollierbarkeit.
:rtype: str
"""
if ri < 0.01:
return "nicht kontrollierbar"
elif ri < 0.10:
return "schlecht kontrollierbar"
elif ri < 0.30:
return "ausreichend kontrollierbar"
elif ri < 0.70:
return "gut kontrollierbar"
else:
return "nahezu vollständig redundant"
@staticmethod
def redundanzanteile_ri(Qvv, P, liste_beob):
"""
Berechnet und dokumentiert Redundanzanteile rᵢ und EVᵢ für alle Beobachtungen.
Die Ergebnisse werden als DataFrame ausgegeben, als HTML-Tabelle angezeigt und
als Excel-Datei exportiert.
:param Qvv: Kofaktor-Matrix der Residuen.
:type Qvv: numpy.ndarray
:param P: Gewichtsmatrix der Beobachtungen.
:type P: numpy.ndarray
:param liste_beob: Liste der Beobachtungslabels (Zeilenbeschriftungen) zur Zuordnung der Ergebnisse.
:type liste_beob: list
:return: Redundanzmatrix R, Redundanzanteile rᵢ, effektive Redundanzanteile EVᵢ, Ergebnistabelle als DataFrame.
:rtype: tuple[numpy.ndarray, numpy.ndarray, numpy.ndarray, pandas.DataFrame]
"""
R = Zuverlaessigkeit.berechne_R(Qvv, P)
ri, EVi = Zuverlaessigkeit.berechne_ri(R)
ri = np.asarray(ri).reshape(-1)
EVi = np.asarray(EVi).reshape(-1)
labels = [str(s) for s in liste_beob]
klassen = [Zuverlaessigkeit.klassifiziere_ri(r) for r in ri]
Redundanzanteile = pd.DataFrame({"Beobachtung": labels, "r_i": ri, "EV_i [%]": EVi, "Klassifikation": klassen, })
display(HTML(Redundanzanteile.to_html(index=False)))
Redundanzanteile.to_excel(r"Netzqualitaet\Redundanzanteile.xlsx", index=False)
return R, ri, EVi, Redundanzanteile
@staticmethod
def globaltest(r_gesamt, sigma0_apost, sigma0_apriori=1):
"""
Führt den Globaltest zur Prüfung des Ausgleichungsmodells durch.
Der Globaltest überprüft, ob die a-posteriori Standardabweichung der Gewichtseinheit σ̂₀
mit der a-priori Annahme σ₀ vereinbar ist. Als Testgröße wird verwendet:
T_G = (σ̂₀²) / (σ₀²)
Die Entscheidung erfolgt über die F-Verteilung. Das Signifikanzniveau alpha wird interaktiv abgefragt
(Standard: 0.001). Zusätzlich wird eine Ergebnis-Tabelle und eine Interpretation ausgegeben.
:param r_gesamt: Gesamtredundanz bzw. Freiheitsgrade.
:type r_gesamt: int
:param sigma0_apost: a-posteriori Standardabweichung der Gewichtseinheit σ̂₀.
:type sigma0_apost: float
:param sigma0_apriori: a-priori Standardabweichung der Gewichtseinheit σ₀ (Standard=1).
:type sigma0_apriori: float
:return: Dictionary mit Testparametern, Testergebnis (H₀ angenommen/verworfen) und Interpretation.
:rtype: dict[str, Any]
:raises ValueError: Wenn alpha nicht in (0, 1) liegt oder nicht in float umgewandelt werden kann.
"""
alpha_input = input("Irrtumswahrscheinlichkeit α wählen (z.B. 0.05, 0.01) [Standard=0.001]: ").strip()
alpha = 0.001 if alpha_input == "" else float(alpha_input)
T_G = (sigma0_apost ** 2) / (sigma0_apriori ** 2)
F_krit = stats.f.ppf(1 - alpha, r_gesamt, 10 ** 9)
H0 = T_G < F_krit
if H0:
interpretation = (
"Nullhypothese H₀ angenommen."
)
else:
interpretation = (
"Nullhypothese H₀ verworfen!"
"Dies kann folgende Gründe haben:"
"→ Es befinden sich grobe Fehler im Datenmaterial. Bitte Lokaltest durchführen und ggf. grobe Fehler im Datenmaterial entfernen."
"→ Das stochastische Modell ist zu optimistisch. Bitte Gewichte überprüfen und ggf. anpassen."
)
globaltest = pd.DataFrame([
["Freiheitsgrad", r_gesamt],
["σ̂₀ a posteriori", sigma0_apost],
["σ₀ a priori", sigma0_apriori],
["Signifikanzniveau α", alpha, ],
["Testgröße T_G", T_G, ],
["Kritischer Wert Fₖ", F_krit],
["Nullhypothese H₀", "angenommen" if H0 else "verworfen"], ], columns=["Größe", "Wert"])
display(HTML(globaltest.to_html(index=False)))
print(interpretation)
return {
"r_gesamt": r_gesamt,
"sigma0_apost": sigma0_apost,
"sigma0_apriori": sigma0_apriori,
"alpha": alpha,
"T_G": T_G,
"F_krit": F_krit,
"H0_angenommen": H0,
"Interpretation": interpretation,
}
def lokaltest_innere_Zuverlaessigkeit(v, Q_vv, ri, labels, s0_apost, alpha, beta):
"""
Führt den Lokaltest zur Grobfehlerdetektion je Beobachtung durch.
Auf Basis der Residuen v, der Kofaktor-Matrix der Residuen Qvv und der Redundanzanteile rᵢ
werden für jede Beobachtung statistische Kennwerte zur Detektion grober Fehler berechnet. Dazu zählen:
- Grobfehlerabschätzung: GFᵢ = vᵢ / rᵢ
- Standardabweichung der Residuen: s_vᵢ = s₀ · √q_vᵢ (mit q_vᵢ = diag(Qvv))
- Normierte Verbesserung: NVᵢ = |vᵢ| / s_vᵢ
- Nichtzentralitätsparameter: δ₀ = k + k_A
mit k aus dem zweiseitigen Normalquantil (α) und k_A aus der Testmacht (1β)
- Grenzwert der Aufdeckbarkeit (Minimal detektierbarer Grobfehler): GRZWᵢ = (s_vᵢ / rᵢ) · δ₀
Beobachtungen werden als auffällig markiert, wenn NVᵢ > δ₀. Für rᵢ = 0 wird die Grobfehlerabschätzung
und der Grenzwert als NaN gesetzt.
:param v: Residuenvektor der Beobachtungen.
:type v: np.asarray
:param Q_vv: Kofaktor-Matrix der Residuen.
:type Q_vv: np.asarray
:param ri: Redundanzanteile der Beobachtungen.
:type ri: np.asarray
:param labels: Liste der Beobachtungen zur Zuordnung in der Ergebnistabelle.
:type labels: list
:param s0_apost: a-posteriori Standardabweichung der Gewichtseinheit s₀.
:type s0_apost: float
:param alpha: Irrtumswahrscheinlichkeit α (Signifikanzniveau, zweiseitiger Test).
:type alpha: float
:param beta: Wahrscheinlichkeit β für einen Fehler 2. Art (Testmacht = 1β).
:type beta: float
:return: DataFrame mit NVᵢ, Auffälligkeit, Grobfehlerabschätzung GFᵢ und Grenzwert GRZWᵢ je Beobachtung.
:rtype: pandas.DataFrame
:raises ValueError: Wenn alpha oder beta nicht im Intervall (0, 1) liegen.
"""
v = np.asarray(v, float).reshape(-1)
Q_vv = np.asarray(Q_vv, float)
ri = np.asarray(ri, float).reshape(-1)
labels = list(labels)
# Grobfehlerabschätzung:
ri_ = np.where(ri == 0, np.nan, ri)
GF = -v / ri_
# Standardabweichungen der Residuen
qv = np.diag(Q_vv).astype(float)
s_vi = float(s0_apost) * np.sqrt(qv)
# Normierte Verbesserung NV
NV = np.abs(v) / s_vi
# Quantile k und kA (zweiseitig),
k = float(norm.ppf(1 - alpha / 2))
kA = float(norm.ppf(1 - beta)) # (Testmacht 1-β)
# Nichtzentralitätsparameter δ0
nzp = k + kA
# Grenzwert für die Aufdeckbarkeit eines GF (GRZW)
GRZW_i = (s_vi / ri_) * nzp
auffaellig = NV > nzp
Lokaltest_innere_Zuv = pd.DataFrame({
"Beobachtung": labels,
"v_i": v,
"r_i": ri,
"s_vi": s_vi,
"k": k,
"NV_i": NV,
"auffaellig": auffaellig,
"GF_i": GF,
"GRZW_i": GRZW_i,
"alpha": alpha,
"beta": beta,
"kA": kA,
"δ0": nzp,
})
return Lokaltest_innere_Zuv
def aufruf_lokaltest(liste_beob, alpha, ausgabe_parameterschaetzung, ri, s0_aposteriori):
"""Startet den Lokaltest und erzeugt die interaktive Tabelle.
:param liste_beob: Liste der Beobachtungslabels.
:type liste_beob: list
:param alpha: Signifikanzniveau.
:type alpha: float
:param ausgabe_parameterschaetzung: Dictionary mit den Ergebnissen der letzten Iteration der Parameterschätzung.
:type ausgabe_parameterschaetzung: dict
:param ri: Redundanz.
:type ri: Any
:param s0_aposteriori: a-posteriori Standardabweichung.
:type s0_aposteriori: float
:return: ausschalten_dict
:rtype: dict
"""
# Initialisieren einer interaktiven Tabelle für die Benutzereingaben
itables.init_notebook_mode()
labels = [str(s) for s in liste_beob]
# Benutzereingabe von β
beta_input = input("β für Macht des Tests (1-β) wählen [Standard: 0.20]: ").strip()
beta = 0.20 if beta_input == "" else float(beta_input)
# Berechnungen für den Lokaltest
Lokaltest = Zuverlaessigkeit.lokaltest_innere_Zuverlaessigkeit(
v=ausgabe_parameterschaetzung["v"],
Q_vv=ausgabe_parameterschaetzung["Q_vv"],
ri=ri,
labels=labels,
s0_apost=s0_aposteriori,
alpha=alpha,
beta=beta
)
if "v_i" in Lokaltest.columns:
Lokaltest["v_i"] = Lokaltest["v_i"].round(6)
if "r_i" in Lokaltest.columns:
Lokaltest["r_i"] = Lokaltest["r_i"].round(4)
if "s_vi" in Lokaltest.columns:
Lokaltest["s_vi"] = Lokaltest["s_vi"].round(6)
if "GF_i" in Lokaltest.columns:
Lokaltest["GF_i"] = Lokaltest["GF_i"].round(6)
if "GRZW_i" in Lokaltest.columns:
Lokaltest["GRZW_i"] = Lokaltest["GRZW_i"].round(6)
# Anlegen des Dataframes
df = Lokaltest.copy()
if "Beobachtung" not in df.columns:
if df.index.name == "Beobachtung":
df = df.reset_index()
else:
df = df.reset_index().rename(columns={"index": "Beobachtung"})
if "Beobachtung_ausschalten" not in df.columns:
df.insert(0, "Beobachtung_ausschalten", "")
else:
zeile = df.pop("Beobachtung_ausschalten")
df.insert(0, "Beobachtung_ausschalten", zeile)
gui = LokaltestInnereZuverlaessigkeitGUI(df)
gui.ausgabe_erstellen()
gui.zeige_tabelle()
Lokaltest.to_excel(r"Netzqualitaet\Lokaltest_innere_Zuverlaessugkeit.xlsx", index=False)
return gui.ausschalten_dict, beta
def aeussere_zuverlaessigkeit(
Lokaltest, bezeichnung, Qxx, A, P, s0_apost, unbekannten_liste, x,
ausschliessen=("lA_",),
):
"""
Berechnet Parameter der äußeren Zuverlässigkeit (EP/EF) je Beobachtung.
Auf Basis der Ergebnisse des Lokaltests werden für jede Beobachtung Maße der äußeren
Zuverlässigkeit bestimmt. Dazu zählen:
- Einfluss auf die (relative) Punktlage EP:
- aus geschätzter Modellstörung: EP_GF,i = |(1 - r_i) · GF_i|
- aus Grenzwert der nicht mehr aufdeckbaren Modellstörung: EP_GRZW,i = |(1 - r_i) · GRZW_i|
Für Winkelbeobachtungen (R/ZW) wird EP in eine äquivalente Querabweichung (in m) umgerechnet: q = EP · s
wobei EP als Winkelstörung im Bogenmaß (rad) und s als räumliche Strecke zwischen Stand- und
Zielpunkt verwendet wird.
- Einflussfaktor / Netzverzerrung EF (Worst-Case-Einfluss einer nicht detektierten Störung):
Es wird eine Einzelstörung Δl_i = GRZW_i angesetzt (alle anderen Δl_j = 0) und in den
Unbekanntenraum übertragen: Δx = Q_xx · A^T · P · Δl
Der Einflussfaktor wird lokal (nur für die von der Beobachtung berührten Punktkoordinaten,
i.d.R. Stand- und Zielpunkt) über die gewichtete Norm berechnet: EF_i^2 = (Δx_loc^T · Q_loc^{-1} · Δx_loc) / s0^2
mit s0 = a posteriori Standardabweichung der Gewichtseinheit.
- Punktstreuungsmaß SP_3D und maximale Verfälschung EF·SP:
Für die berührten Punkte wird je Punkt der 3×3-Block aus Q_xx betrachtet: als Maß wird die maximale Spur
verwendet: SP_3D,loc = s0 · sqrt( max( tr(Q_P) ) )
und daraus: (EF·SP)_i = EF_i · SP_3D,loc
Pseudobeobachtungen (z.B. Lagerungs-/Anschlussgleichungen) können über Präfixe in
"ausschliessen" aus der Auswertung entfernt werden. Es wird geprüft, ob die Anzahl
der Bezeichnungen und die Zeilenanzahl des Lokaltests zur Beobachtungsanzahl von A passen.
:param Lokaltest: DataFrame des Lokaltests`.
:type Lokaltest: pandas.DataFrame
:param bezeichnung: Bezeichnungen der Beobachtungen.
:type bezeichnung: list
:param Qxx: Kofaktor-Matrix der Unbekannten.
:type Qxx: numpy.ndarray
:param A: Jacobi-Matrix (A-Matrix).
:type A: numpy.ndarray
:param P: Gewichtsmatrix der Beobachtungen.
:type P: numpy.ndarray
:param s0_apost: a-posteriori Standardabweichung der Gewichtseinheit s₀.
:type s0_apost: float
:param unbekannten_liste: Liste der Unbekannten.
:type unbekannten_liste: list
:param x: Unbekanntenvektor.
:type x: array_like
:param ausschliessen: Präfixe von Beobachtungsbezeichnungen, die aus der Auswertung entfernt werden sollen
(Standard: ("lA_",) für Lagerungs-/Pseudobeobachtungen).
:type ausschliessen: tuple
:return: DataFrame mit Stand/Zielpunkt, Redundanzanteil rᵢ, EP (aus GF und GRZW), EF sowie SP_3D und EF·SP_3D.
:rtype: pandas.DataFrame
:raises ValueError: Wenn die Anzahl der Bezeichnungen oder die Zeilenanzahl des Lokaltests nicht zu A passt.
"""
lokaltest_daten = Lokaltest.copy()
bezeichnung = [str(l) for l in list(bezeichnung)]
Qxx = np.asarray(Qxx, float)
A = np.asarray(A, float)
P = np.asarray(P, float)
x = np.asarray(x, float).reshape(-1)
namen_str = [str(sym) for sym in unbekannten_liste]
# Konsistenzprüfung
n = A.shape[0]
if len(bezeichnung) != n:
raise ValueError(f"len(labels)={len(bezeichnung)} passt nicht zu A.shape[0]={n}.")
if len(lokaltest_daten) != n:
raise ValueError(f"Lokaltest hat {len(lokaltest_daten)} Zeilen, A hat {n} Beobachtungen.")
# Pseudobeobachtungen lA rausfiltern
beobachtungen = np.ones(n, dtype=bool)
if ausschliessen:
for i, bez in enumerate(bezeichnung):
if any(bez.startswith(pref) for pref in ausschliessen):
beobachtungen[i] = False
lokaltest_daten = lokaltest_daten.loc[beobachtungen].reset_index(drop=True)
bezeichnung = [bez for (bez, k) in zip(bezeichnung, beobachtungen) if k]
A = A[beobachtungen, :]
P = P[np.ix_(beobachtungen, beobachtungen)]
n = A.shape[0]
# Daten aus dem Lokaltest
ri = lokaltest_daten["r_i"].astype(float).to_numpy()
GF = lokaltest_daten["GF_i"].astype(float).to_numpy()
GRZW = lokaltest_daten["GRZW_i"].astype(float).to_numpy()
s0 = float(s0_apost)
# Punktkoordinaten
koordinaten = {}
punkt_ids = sorted({name[1:] for name in namen_str
if name[:1].upper() in ("X", "Y", "Z") and len(name) > 1})
for pid in punkt_ids:
try:
ix = namen_str.index(f"X{pid}")
iy = namen_str.index(f"Y{pid}")
iz = namen_str.index(f"Z{pid}")
koordinaten[pid] = (x[ix], x[iy], x[iz])
except ValueError:
continue
# Standpunkt/Zielpunkt
standpunkte = [""] * n
zielpunkte = [""] * n
for i, bez in enumerate(bezeichnung):
parts = bez.split("_")
sp, zp = None, None
if any(k in bez for k in ["_SD_", "_R_", "_ZW_"]):
if len(parts) >= 5:
sp, zp = parts[3].strip(), parts[4].strip()
elif "gnss" in bez.lower():
if len(parts) >= 2:
sp, zp = parts[-2].strip(), parts[-1].strip()
elif "niv" in bez.lower():
if len(parts) >= 4:
sp = parts[3].strip()
if len(parts) >= 5:
zp = parts[4].strip()
else:
sp = parts[-1].strip()
standpunkte[i] = sp or ""
zielpunkte[i] = zp or ""
# Berechnung des EP
EP_GF = np.abs((1.0 - ri) * GF)
EP_grzw = np.abs((1.0 - ri) * GRZW)
EP_hat_m = np.full(n, np.nan, float)
EP_grzw_m = np.full(n, np.nan, float)
for i, bez in enumerate(bezeichnung):
sp = standpunkte[i]
zp = zielpunkte[i]
wenn_winkel = ("_R_" in bez) or ("_ZW_" in bez)
if not wenn_winkel:
EP_hat_m[i] = EP_GF[i]
EP_grzw_m[i] = EP_grzw[i]
continue
# Wenn Winkel: Querabweichung = Winkel * Strecke (3D)
if sp in koordinaten and zp in koordinaten:
X1, Y1, Z1 = koordinaten[sp]
X2, Y2, Z2 = koordinaten[zp]
s = np.sqrt((X2 - X1) ** 2 + (Y2 - Y1) ** 2 + (Z2 - Z1) ** 2)
EP_hat_m[i] = EP_GF[i] * s
EP_grzw_m[i] = EP_grzw[i] * s
# Berechnung von EF
EF = np.full(n, np.nan, float)
SP_m = np.full(n, np.nan, float)
EF_SP_m = np.full(n, np.nan, float)
for i in range(n):
sp = standpunkte[i]
zp = zielpunkte[i]
bloecke = []
idx = []
try:
if sp:
b = [
namen_str.index(f"X{sp}"),
namen_str.index(f"Y{sp}"),
namen_str.index(f"Z{sp}")
]
bloecke.append(b)
idx += b
if zp:
b = [
namen_str.index(f"X{zp}"),
namen_str.index(f"Y{zp}"),
namen_str.index(f"Z{zp}")
]
bloecke.append(b)
idx += b
except ValueError:
continue
if not bloecke:
continue
idx = list(dict.fromkeys(idx))
dl = np.zeros((n, 1)) # dl ungestört
dl[i, 0] = GRZW[i] # dl gestört durch GRZW
dx = Qxx @ (A.T @ (P @ dl)) # dx mit Störung
dx_pkt = dx[idx, :]
Q_pkt = Qxx[np.ix_(idx, idx)]
# EF
EF2 = (dx_pkt.T @ np.linalg.solve(Q_pkt, dx_pkt)).item() / (s0 ** 2)
EF[i] = np.sqrt(max(0.0, EF2))
# SP 3D: Spur der 3x3 Matrizen in einer Liste
spur_matrix_liste = [np.trace(Qxx[np.ix_(b, b)]) for b in bloecke]
if not spur_matrix_liste:
continue
# SP des schlechtesten Punktes bestimmen
sigma_max = s0 * np.sqrt(max(spur_matrix_liste))
SP_m[i] = sigma_max
EF_SP_m[i] = EF[i] * sigma_max
aeussere_zuverlaessigkeit = pd.DataFrame({
"Beobachtung": bezeichnung,
"Stand-Pkt": standpunkte,
"Ziel-Pkt": zielpunkte,
"r_i": ri,
"EP_GF [mm]": EP_hat_m * 1000.0,
"EP_grzw [mm]": EP_grzw_m * 1000.0,
"EF": EF,
"SP_3D [mm]": SP_m * 1000.0,
"EF*SP_3D [mm]": EF_SP_m * 1000.0,
})
display(HTML(aeussere_zuverlaessigkeit.to_html(index=False)))
aeussere_zuverlaessigkeit.to_excel(r"Netzqualitaet\Aeussere_Zuverlaessigkeit.xlsx", index=False)
return aeussere_zuverlaessigkeit
class LokaltestInnereZuverlaessigkeitGUI:
"""Interaktive Auswahloberfläche für den Lokaltest (innere Zuverlässigkeit).
Die Klasse erzeugt eine ITable-Tabelle auf Basis des Lokaltest-DataFrames und stellt
eine Mehrfachauswahl bereit. Für GNSS-Basislinien wird sichergestellt, dass bei Auswahl
einer Komponente (bx/by/bz) automatisch das gesamte Trio gewählt bzw. abgewählt wird.
"""
def __init__(self, df):
"""Initialisiert die GUI-Objekte.
:param df: DataFrame des Lokaltests (inkl. Spalte "Beobachtung").
:type df: pandas.DataFrame
:return: None
:rtype: None
"""
self.df = df
try:
if not (self.df.index.equals(pd.RangeIndex(start=0, stop=len(self.df), step=1))):
self.df = self.df.reset_index(drop=True)
except:
self.df = self.df.reset_index(drop=True)
self.tabelle = None
self.dict_gnss = {}
self.dict_gnss_erweitert = {}
self.auswahl_zeilen_vorher = set()
self.update_durch_code = False
self.ausschalten_dict = {}
self.output = widgets.Output()
self.btn_auswahl_speichern = widgets.Button(description="Auswahl speichern", icon="download")
self.btn_auswahl_zuruecksetzen = widgets.Button(description="Rückgängig", icon="refresh")
@staticmethod
def gnss_komponenten_extrahieren(beobachtung: str):
"""Extrahiert GNSS-Komponente und einen eindeutigen Key für bx/by/bz-Trio.
:param beobachtung: Text aus Spalte "Beobachtung".
:type beobachtung: str
:return: (komponente, key) oder (None, None)
:rtype: tuple[str | None, str | None]
"""
beobachtung = str(beobachtung).strip()
for gnss_komponente in ["bx", "by", "bz"]:
bezeichnung = f"_gnss{gnss_komponente}_"
if bezeichnung in beobachtung:
key = beobachtung.replace(bezeichnung, "_gnss_")
return gnss_komponente, key
return None, None
def gnss_dictionary_erstellen(self) -> None:
"""Exportiert die Tabelleneinträge in ein Dictionary auf Basis der Tabellenzeilen.
:return: None
:rtype: None
"""
liste_beobachtungen = self.tabelle.df["Beobachtung"].astype(str).tolist()
# Als Instanzvariable speichern
self.dict_gnss = {}
for i, beobachtung in enumerate(liste_beobachtungen):
beobachtung = str(beobachtung).strip()
if "_gnssbx_" in beobachtung:
key = beobachtung.split("_gnssbx_", 1)[1].strip()
if key not in self.dict_gnss:
self.dict_gnss[key] = {}
self.dict_gnss[key]["bx"] = i
if "_gnssby_" in beobachtung:
key = beobachtung.split("_gnssby_", 1)[1].strip()
if key not in self.dict_gnss:
self.dict_gnss[key] = {}
self.dict_gnss[key]["by"] = i
if "_gnssbz_" in beobachtung:
key = beobachtung.split("_gnssbz_", 1)[1].strip()
if key not in self.dict_gnss:
self.dict_gnss[key] = {}
self.dict_gnss[key]["bz"] = i
def gnss_dictionary_erweitert_erstellen(self) -> None:
"""Baut ein Dictionary mit alles GNSS-Komponten auf.
:return: None
:rtype: None
"""
self.dict_gnss_erweitert = {}
for idx, row in self.df.iterrows():
value, key = self.gnss_komponenten_extrahieren(row["Beobachtung"])
if key:
if key not in self.dict_gnss_erweitert:
self.dict_gnss_erweitert[key] = []
self.dict_gnss_erweitert[key].append(idx)
def export_ausschalten_dict(self, Eintrag_Auswahl: str = "beobachtung_ausschalten", Wert_nicht_ausgewaehlt: str = "") -> dict:
"""Exportiert die aktuelle Auswahl in ein Dictionary.
:param Eintrag_Auswahl: Wert für ausgewählte Beobachtungen (beobachtung_ausschalten).
:type Eintrag_Auswahl: str
:param Wert_nicht_ausgewaehlt: Wert für nicht ausgewählte Beobachtungen ("").
:type Wert_nicht_ausgewaehlt: str
:return: Dict {Beobachtung: "beobachtung_ausschalten" oder ""}
:rtype: dict
"""
auswahl = set(self.tabelle.selected_rows or [])
liste_beobachtungen = self.tabelle.df["Beobachtung"].astype(str).tolist()
# Zurückgabe des Ergebnisdictionarys für die Weiterverarbeitung
return {
liste_beobachtungen[i]: (Eintrag_Auswahl if i in auswahl else Wert_nicht_ausgewaehlt)
for i in range(len(liste_beobachtungen))
}
def aktualisiere_ausschalten_dict(self) -> None:
"""Aktualisiert das ausschalten_dict in der Instanzvariablen.
:return: None
:rtype: None
"""
neu = self.export_ausschalten_dict()
self.ausschalten_dict.clear()
self.ausschalten_dict.update(neu)
def ausgabe_aktualisieren(self) -> None:
"""Aktualisiert Ausgabe und schreibt self.ausschalten_dict neu.
:return: None
:rtype: None
"""
self.aktualisiere_ausschalten_dict()
with self.output:
clear_output(wait=True)
auswahl = self.tabelle.selected_rows or []
print(f"AUSGESCHALTET: {len(auswahl)}")
if len(auswahl) > 0:
zeilen = [c for c in ["Beobachtung", "v_i", "r_i", "auffaellig", "GF_i", "GRZW_i"] if c in self.tabelle.df.columns]
display(self.tabelle.df.iloc[auswahl][zeilen].head(30))
def auswahl_exportieren(self, _=None) -> None:
"""Button-Callback: Ausgabe der ausgeschalteten Beobachtungen.
:param _: Button-Event
:type _: Any
:return: None
:rtype: None
"""
self.aktualisiere_ausschalten_dict()
with self.output:
clear_output(wait=True)
print("ausschalten_dict ist aktualisiert.")
ausgeschaltet = [k for k, v in self.ausschalten_dict.items() if v == "X"]
print(f"Nur ausgeschaltete Beobachtungen ({len(ausgeschaltet)}):")
display(ausgeschaltet[:300])
def auswahl_zuruecksetzen(self, _=None) -> None:
"""Button-aktion: setzt Auswahl zurück.
:param _: Button-Event
:type _: Any
:return: None
:rtype: None
"""
self.tabelle.selected_rows = []
self.ausgabe_aktualisieren()
def gnss_auswahl_synchronisieren(self, aenderungen: dict) -> None:
"""Synchronisiert die GNSS-bx/by/bz Auswahl.
:param aenderungen: Dictionary mit Änderungen.
:type aenderungen: dict
:return: None
:rtype: None
"""
if self.update_durch_code:
return
auswahl_aktuell = set(aenderungen["new"] or [])
hinzufuegen = auswahl_aktuell - self.auswahl_zeilen_vorher
entfernt = self.auswahl_zeilen_vorher - auswahl_aktuell
auswahl_final = set(auswahl_aktuell)
# Hinzufügen -> Alle Komponten auswählen
for index in hinzufuegen:
if index not in self.df.index:
continue
beob_name = str(self.df.loc[index, "Beobachtung"])
value, key = self.gnss_komponenten_extrahieren(beob_name)
if key in self.dict_gnss_erweitert:
for p_idx in self.dict_gnss_erweitert[key]:
auswahl_final.add(p_idx)
# Entfernen -> alle Komponenten abwählen
for index in entfernt:
if index not in self.df.index:
continue
beob_name = str(self.df.loc[index, "Beobachtung"])
value, key = self.gnss_komponenten_extrahieren(beob_name)
if key in self.dict_gnss_erweitert:
for p_idx in self.dict_gnss_erweitert[key]:
if p_idx in auswahl_final:
auswahl_final.remove(p_idx)
# Nur bei Änderungen zurück in die Ausgabe schreiben
if auswahl_final != auswahl_aktuell:
self.update_durch_code = True
self.tabelle.selected_rows = sorted(list(auswahl_final))
self.update_durch_code = False
self.auswahl_zeilen_vorher = set(self.tabelle.selected_rows or [])
self.ausgabe_aktualisieren()
def ausgabe_erstellen(self) -> None:
"""Erstellt die Tabelle und verbindet die Buttons.
:return: None
:rtype: None
"""
self.tabelle = ITable(
self.df,
maxBytes=5 * 1024 * 1024, # 5 MB
columnDefs=[
{"targets": 0, "orderable": False, "className": "select-checkbox", "width": "26px"},
],
select={"style": "multi", "selector": "td:first-child"},
order=[[1, "asc"]],
)
self.gnss_dictionary_erstellen()
self.gnss_dictionary_erweitert_erstellen()
self.auswahl_zeilen_vorher = set(self.tabelle.selected_rows or [])
self.aktualisiere_ausschalten_dict()
self.btn_auswahl_speichern.on_click(self.auswahl_exportieren)
self.btn_auswahl_zuruecksetzen.on_click(self.auswahl_zuruecksetzen)
self.tabelle.observe(self.gnss_auswahl_synchronisieren, names="selected_rows")
def zeige_tabelle(self) -> None:
"""Zeigt die Tabelle an.
:return: None
:rtype: None
"""
display(widgets.VBox([self.tabelle, widgets.HBox([self.btn_auswahl_speichern, self.btn_auswahl_zuruecksetzen]), self.output]))
self.ausgabe_aktualisieren()