zusammenfügen 13.1.
This commit is contained in:
@@ -1,53 +1,29 @@
|
||||
from dataclasses import dataclass
|
||||
from typing import Sequence, List, Dict
|
||||
import sympy as sp
|
||||
|
||||
import numpy as np
|
||||
from scipy import stats
|
||||
from scipy.stats import norm
|
||||
import pandas as pd
|
||||
|
||||
@dataclass
|
||||
class Zuverlaessigkeit:
|
||||
import numpy as np
|
||||
import pandas as pd
|
||||
|
||||
def berechne_redundanzanteile(res_dict: dict, beobachtungen_labels: list) -> pd.DataFrame:
    """Extract the redundancy numbers r_i from the adjustment results.

    Parameters
    ----------
    res_dict : dict
        Adjustment results; must contain the redundancy matrix under key
        "R" (R = Qvv * P).
    beobachtungen_labels : list
        One label per observation, in the order of the rows/columns of R.

    Returns
    -------
    pd.DataFrame
        Columns "Beobachtung", "r_i" (0..1, rounded to 4 decimals) and
        "r_i_prozent" (0..100 %, rounded to 2 decimals).

    Raises
    ------
    ValueError
        If key "R" is missing from ``res_dict``.
    """
    # 1. Fetch the redundancy matrix R (R = Qvv * P)
    R = res_dict.get("R")
    if R is None:
        raise ValueError("Die Redundanzmatrix R wurde im res_dict nicht gefunden.")

    # 2. The diagonal elements are the redundancy numbers r_i
    r_anteile = np.diag(R)

    # 3. Collect the results in a pandas DataFrame
    df_redundanz = pd.DataFrame({
        "Beobachtung": beobachtungen_labels,
        "r_i": np.round(r_anteile, 4),               # decimal (0 to 1)
        "r_i_prozent": np.round(r_anteile * 100, 2)  # percent (0 to 100 %)
    })

    return df_redundanz
|
||||
def gesamtredundanz(n, u):
    """Return the total redundancy r = n - u.

    Parameters
    ----------
    n : int
        Number of observations.
    u : int
        Number of unknowns.

    Returns
    -------
    int
        Degrees of freedom of the adjustment.
    """
    r = n - u
    return r
|
||||
|
||||
|
||||
|
||||
def redundanzanalyse(self, r_vec: Sequence[float]) -> Dict[str, object]:
    """Analyse a vector of redundancy numbers r_i.

    Parameters
    ----------
    r_vec : Sequence[float]
        Redundancy numbers (each in 0..1); values are passed through
        ``sp.sympify`` so numeric strings are accepted as well.

    Returns
    -------
    Dict[str, object]
        "r_i" (floats), "EVi" (r_i in percent), "klassen"
        (classification via ``self.klassifiziere_ri``), plus sum,
        minimum and maximum of the r_i.

    Raises
    ------
    ValueError
        From ``min``/``max`` when ``r_vec`` is empty.
    """
    r_s = [sp.sympify(r) for r in r_vec]
    # EV_i: redundancy share expressed in percent
    EVi = [float(r * 100) for r in r_s]
    klassen = [self.klassifiziere_ri(float(r)) for r in r_s]

    return {
        "r_i": [float(r) for r in r_s],
        "EVi": EVi,
        "klassen": klassen,
        "r_sum": float(sum(r_s)),
        "min_r": float(min(r_s)),
        "max_r": float(max(r_s)),
    }
|
||||
def berechne_R(Q_vv, P):
    """Return the redundancy matrix R = Q_vv @ P.

    Parameters
    ----------
    Q_vv : array_like
        Cofactor matrix of the residuals.
    P : array_like
        Weight matrix of the observations.
    """
    R = Q_vv @ P
    return R  # redundancy matrix
|
||||
|
||||
|
||||
def berechne_ri(R):
    """Return the redundancy numbers r_i and their percentage values.

    Parameters
    ----------
    R : array_like
        Redundancy matrix (R = Q_vv @ P); its diagonal holds the r_i.

    Returns
    -------
    tuple
        ``(ri, EVi)`` where ``ri = diag(R)`` and ``EVi = 100 * ri``.
    """
    ri = np.diag(R)
    EVi = 100.0 * ri
    return ri, EVi  # redundancy numbers
|
||||
|
||||
def klassifiziere_ri(self, ri: float) -> str:
|
||||
|
||||
def klassifiziere_ri(ri): #Klassifizierung der Redundanzanteile
|
||||
if ri < 0.01:
|
||||
return "nicht kontrollierbar"
|
||||
elif ri < 0.10:
|
||||
@@ -60,75 +36,131 @@ class Zuverlaessigkeit:
|
||||
return "nahezu vollständig redundant"
|
||||
|
||||
|
||||
def globaltest(r_gesamt, sigma0_apost, sigma0_apriori, alpha):
    """Global test of the adjustment (variance-ratio / F-test).

    Compares the a-posteriori variance factor against the a-priori one,
    T_G = sigma0_apost**2 / sigma0_apriori**2, and accepts the null
    hypothesis H0 (both variance factors agree) when T_G <= F_krit.

    Parameters
    ----------
    r_gesamt : int
        Total redundancy (degrees of freedom of the numerator).
    sigma0_apost : float
        A-posteriori standard deviation of unit weight.
    sigma0_apriori : float
        A-priori standard deviation of unit weight.
    alpha : float
        Significance level of the test.

    Returns
    -------
    dict
        Inputs, test statistic "T_G", critical value "F_krit", decision
        "H0_angenommen" and a German interpretation text.
    """
    T_G = (sigma0_apost ** 2) / (sigma0_apriori ** 2)
    # Denominator dof -> infinity, approximated by 10**9
    F_krit = stats.f.ppf(1 - alpha, r_gesamt, 10 ** 9)
    H0 = T_G <= F_krit

    if H0:
        interpretation = (
            "Nullhypothese H₀ angenommen.\n"
        )
    else:
        interpretation = (
            "Nullhypothese H₀ verworfen!\n"
            "Dies kann folgende Gründe haben:\n"
            "→ Es befinden sich grobe Fehler im Datenmaterial.\n"
            "→ Das funktionale Modell ist fehlerhaft.\n"
            "→ Das stochastische Modell ist zu optimistisch."
        )

    return {
        "r_gesamt": r_gesamt,
        "sigma0_apost": sigma0_apost,
        "sigma0_apriori": sigma0_apriori,
        "alpha": alpha,
        "T_G": T_G,
        "F_krit": F_krit,
        "H0_angenommen": H0,
        "Interpretation": interpretation,
    }
|
||||
|
||||
|
||||
def lokaltest_innere_Zuverlaessigkeit(v, Q_vv, ri, labels, s0_apost, alpha, beta):
    """Local test (data snooping) and inner reliability per observation.

    Parameters
    ----------
    v : array_like
        Residuals, one per observation.
    Q_vv : array_like
        Cofactor matrix of the residuals; only its diagonal is used.
    ri : array_like
        Redundancy numbers r_i per observation.
    labels : iterable
        Observation labels.
    s0_apost : float
        A-posteriori standard deviation of unit weight.
    alpha : float
        Significance level (two-sided) of the local test.
    beta : float
        Type-II error probability (test power is 1 - beta).

    Returns
    -------
    pd.DataFrame
        Per observation: normalised residual NV_i, acceptance bounds,
        gross-error estimate GF_i, detectability bound GRZW_i, quantiles
        k/kA and non-centrality parameter δ0.
    """
    v = np.asarray(v, float).reshape(-1)
    Q_vv = np.asarray(Q_vv, float)
    ri = np.asarray(ri, float).reshape(-1)
    labels = list(labels)

    # Standard deviations of the residuals
    qv = np.diag(Q_vv).astype(float)
    s_vi = float(s0_apost) * np.sqrt(qv)

    # Quantiles k and kA (two-sided test, power 1-β)
    k = float(norm.ppf(1 - alpha / 2))
    kA = float(norm.ppf(1 - beta))

    # Non-centrality parameter δ0
    nzp = k + kA

    # Normalised residual NV
    NV = np.abs(v) / s_vi

    # Acceptance bounds for v_i
    v_grenz = k * s_vi
    v_min = -v_grenz
    v_max = v_grenz

    # Gross-error estimate; guard against r_i == 0 (uncontrollable)
    ri_safe = np.where(ri == 0, np.nan, ri)
    GF = -v / ri_safe

    auffaellig = NV > k

    # Bound for the detectability of a gross error (GRZW)
    GRZW_i = (s_vi / ri_safe) * k

    Lokaltest_innere_Zuv = pd.DataFrame({
        "Beobachtung": labels,
        "v_i": v,
        "r_i": ri,
        "s_vi": s_vi,
        "k": k,
        "NV_i": NV,
        "auffaellig": auffaellig,
        "v_min": v_min,
        "v_max": v_max,
        "GF_i": GF,
        "GRZW_v": v_grenz,  # = k*s_vi
        "GRZW_i": GRZW_i,   # = (s_vi/r_i)*k
        "alpha": alpha,
        "beta": beta,
        "kA": kA,
        "δ0": nzp,
    })
    return Lokaltest_innere_Zuv
|
||||
|
||||
|
||||
def EinflussPunktlage(df_lokaltest):
    """Outer reliability: influence of undetected gross errors on the result.

    Computes per observation
        EF_i = sqrt((1 - r_i) / r_i) * δ0   (influence factor)
        EP_i = (1 - r_i) * GF_i             (influence on point position)
    from the columns of the local-test DataFrame.

    Parameters
    ----------
    df_lokaltest : pd.DataFrame
        Output of ``lokaltest_innere_Zuverlaessigkeit``; must contain the
        columns "Beobachtung", "r_i", "GF_i", "δ0", "alpha" and "beta".

    Returns
    -------
    pd.DataFrame
        The columns above plus the derived "EF_i" and "EP_i".
    """
    df = df_lokaltest.copy()

    r = df["r_i"].astype(float).to_numpy()
    GF = df["GF_i"].astype(float).to_numpy()
    nzp = df["δ0"].astype(float).to_numpy()

    # NOTE(review): EF is infinite for r_i == 0 (division by zero) —
    # such observations are not controllable.
    EF = np.sqrt((1 - r) / r) * nzp
    EP = (1 - r) * GF

    df["δ0"] = nzp
    df["EF_i"] = EF
    df["EP_i"] = EP

    ergebnis = df[["Beobachtung", "r_i", "GF_i", "EF_i", "EP_i", "δ0", "alpha", "beta"]]
    return ergebnis
|
||||
|
||||
|
||||
def aeussere_zuverlaessigkeit_EF(Qxx, A, P, s0_apost, GRZW, labels):
    """Outer reliability EF_i of each observation on the unknowns.

    For observation i, the effect of a just-detectable gross error GRZW_i
    on the unknowns is ∇x_i = B[:, i] * GRZW_i with B = Qxx A^T P, and
        EF_i^2 = GRZW_i^2 * (B_i^T Qxx^{-1} B_i) / s0_apost^2.

    Parameters
    ----------
    Qxx : array_like
        Cofactor matrix of the unknowns (u x u).
    A : array_like
        Design matrix (n x u).
    P : array_like
        Weight matrix of the observations (n x n).
    s0_apost : float
        A-posteriori standard deviation of unit weight.
    GRZW : array_like
        Detectability bounds GRZW_i, one per observation.
    labels : iterable
        Observation labels.

    Returns
    -------
    pd.DataFrame
        Columns "Beobachtung", "GRZW_i" and "EF_i".
    """
    Qxx = np.asarray(Qxx, float)
    A = np.asarray(A, float)
    P = np.asarray(P, float)
    GRZW = np.asarray(GRZW, float).reshape(-1)
    labels = list(labels)

    B = Qxx @ (A.T @ P)

    EF = np.empty_like(GRZW, dtype=float)

    # For each observation i: ∇x_i = B[:, i] * GRZW_i and
    # EF_i^2 = (GRZW_i^2 * B_i^T Qxx^{-1} B_i) / s0^2
    for i in range(len(GRZW)):
        bi = B[:, i]                  # (u,)
        y = np.linalg.solve(Qxx, bi)  # = Qxx^{-1} bi
        EF2 = (GRZW[i] ** 2) * float(bi @ y) / (float(s0_apost) ** 2)
        EF[i] = np.sqrt(EF2)

    df = pd.DataFrame({
        "Beobachtung": labels,
        "GRZW_i": GRZW,
        "EF_i": EF
    })
    return df
|
||||
Reference in New Issue
Block a user