Merge remote-tracking branch 'origin/main'
This commit is contained in:
109
Netzqualität_Zuverlässigkeit.py
Normal file
109
Netzqualität_Zuverlässigkeit.py
Normal file
@@ -0,0 +1,109 @@
|
|||||||
|
from dataclasses import dataclass
|
||||||
|
from typing import Sequence, List, Dict
|
||||||
|
import sympy as sp
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class Zuverlaessigkeit:
|
||||||
|
|
||||||
|
def redundanzanalyse(self, r_vec: Sequence[float]) -> Dict[str, object]:
|
||||||
|
r_s = [sp.sympify(r) for r in r_vec]
|
||||||
|
EVi = [float(r * 100) for r in r_s]
|
||||||
|
klassen = [self.klassifiziere_ri(float(r)) for r in r_s]
|
||||||
|
|
||||||
|
return {
|
||||||
|
"r_i": [float(r) for r in r_s],
|
||||||
|
"EVi": EVi,
|
||||||
|
"klassen": klassen,
|
||||||
|
"r_sum": float(sum(r_s)),
|
||||||
|
"min_r": float(min(r_s)),
|
||||||
|
"max_r": float(max(r_s)),
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
def klassifiziere_ri(self, ri: float) -> str:
|
||||||
|
if ri < 0.01:
|
||||||
|
return "nicht kontrollierbar"
|
||||||
|
elif ri < 0.10:
|
||||||
|
return "schlecht kontrollierbar"
|
||||||
|
elif ri < 0.30:
|
||||||
|
return "ausreichend kontrollierbar"
|
||||||
|
elif ri < 0.70:
|
||||||
|
return "gut kontrollierbar"
|
||||||
|
else:
|
||||||
|
return "nahezu vollständig redundant"
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
def globaltest(self, sigma0_hat: float, sigma0_apriori: float, F_krit: float):
|
||||||
|
s_hat = sp.sympify(sigma0_hat)
|
||||||
|
s0 = sp.sympify(sigma0_apriori)
|
||||||
|
Fk = sp.sympify(F_krit)
|
||||||
|
|
||||||
|
T_G = (s_hat**2) / (s0**2)
|
||||||
|
H0 = bool(T_G <= Fk)
|
||||||
|
|
||||||
|
return {
|
||||||
|
"T_G": float(T_G),
|
||||||
|
"F_krit": float(Fk),
|
||||||
|
"H0_angenommen": H0,
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
def data_snooping(
|
||||||
|
self,
|
||||||
|
v: Sequence[float],
|
||||||
|
Qv_diag: Sequence[float],
|
||||||
|
r_vec: Sequence[float],
|
||||||
|
sigma0_hat: float,
|
||||||
|
k: float,
|
||||||
|
) -> List[Dict[str, float | bool]]:
|
||||||
|
|
||||||
|
v_s = [sp.sympify(x) for x in v]
|
||||||
|
Qv_s = [sp.sympify(q) for q in Qv_diag]
|
||||||
|
r_s = [sp.sympify(r) for r in r_vec]
|
||||||
|
s0 = sp.sympify(sigma0_hat)
|
||||||
|
k_s = sp.sympify(k)
|
||||||
|
|
||||||
|
results = []
|
||||||
|
|
||||||
|
for vi, Qvi, ri in zip(v_s, Qv_s, r_s):
|
||||||
|
|
||||||
|
s_vi = s0 * sp.sqrt(Qvi)
|
||||||
|
NV_i = sp.Abs(vi) / s_vi
|
||||||
|
|
||||||
|
if ri == 0:
|
||||||
|
GRZW_i = sp.oo
|
||||||
|
else:
|
||||||
|
GRZW_i = (s_vi / ri) * k_s
|
||||||
|
|
||||||
|
auff = bool(NV_i > k_s)
|
||||||
|
|
||||||
|
results.append({
|
||||||
|
"v_i": float(vi),
|
||||||
|
"Qv_i": float(Qvi),
|
||||||
|
"r_i": float(ri),
|
||||||
|
"s_vi": float(s_vi),
|
||||||
|
"NV_i": float(NV_i),
|
||||||
|
"GRZW_i": float(GRZW_i if GRZW_i != sp.oo else float("inf")),
|
||||||
|
"auffällig": auff,
|
||||||
|
})
|
||||||
|
|
||||||
|
return results
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
def aeussere_zuverlaessigkeit_EF(self, r_vec: Sequence[float], delta0: float):
|
||||||
|
delta = sp.sympify(delta0)
|
||||||
|
EF_list = []
|
||||||
|
for ri in r_vec:
|
||||||
|
ri_s = sp.sympify(ri)
|
||||||
|
if ri_s == 0:
|
||||||
|
EF = sp.oo
|
||||||
|
else:
|
||||||
|
EF = sp.sqrt((1 - ri_s) / ri_s) * delta
|
||||||
|
EF_list.append(float(EF if EF != sp.oo else float("inf")))
|
||||||
|
|
||||||
|
return EF_list
|
||||||
@@ -23,7 +23,7 @@ def iterative_ausgleichung(
|
|||||||
|
|
||||||
v = l - A * dx #Residuenvektor v
|
v = l - A * dx #Residuenvektor v
|
||||||
|
|
||||||
Q_vv = modell.berechne_Qvv(A, Q_ll, Q_xx) #Kofaktormatrix der Verbesserungen Qvv
|
Q_vv = modell.berechne_Qvv(A, P, Q_xx) #Kofaktormatrix der Verbesserungen Qvv
|
||||||
R = modell.berechne_R(Q_vv, P) #Redundanzmatrix R
|
R = modell.berechne_R(Q_vv, P) #Redundanzmatrix R
|
||||||
r = modell.berechne_r(R) #Redundanzanteile als Vektor r
|
r = modell.berechne_r(R) #Redundanzanteile als Vektor r
|
||||||
|
|
||||||
@@ -44,10 +44,21 @@ def iterative_ausgleichung(
|
|||||||
"sigma0_groups": dict(modell.sigma0_groups),
|
"sigma0_groups": dict(modell.sigma0_groups),
|
||||||
})
|
})
|
||||||
|
|
||||||
if all(abs(val - 1.0) < tol for val in sigma_hat.values()): #Abbruchkriterium
|
# --- Abbruchkriterium ---
|
||||||
print(f"Konvergenz nach {it + 1} Iterationen erreicht.")
|
if sigma_hat:
|
||||||
break
|
max_rel_change = 0.0
|
||||||
|
for g, new_val in sigma_hat.items():
|
||||||
|
old_val = modell.sigma0_groups.get(g, 1.0)
|
||||||
|
if old_val != 0:
|
||||||
|
rel = abs(new_val - old_val) / abs(old_val)
|
||||||
|
max_rel_change = max(max_rel_change, rel)
|
||||||
|
|
||||||
|
if max_rel_change < tol:
|
||||||
|
print(f"Konvergenz nach {it + 1} Iterationen erreicht (max. rel. Änderung = {max_rel_change:.2e}).")
|
||||||
|
modell.update_sigma0_von_vks(sigma_hat)
|
||||||
|
break
|
||||||
|
|
||||||
|
# Varianzfaktoren für nächste Iteration übernehmen
|
||||||
modell.update_sigma0_von_vks(sigma_hat)
|
modell.update_sigma0_von_vks(sigma_hat)
|
||||||
|
|
||||||
return {
|
return {
|
||||||
|
|||||||
@@ -42,8 +42,8 @@ class StochastischesModell:
|
|||||||
return Q_ll, P
|
return Q_ll, P
|
||||||
|
|
||||||
|
|
||||||
def berechne_Qvv(self, A: sp.Matrix, Q_ll: sp.Matrix, Q_xx: sp.Matrix) -> sp.Matrix:
|
def berechne_Qvv(self, A: sp.Matrix, P: sp.Matrix, Q_xx: sp.Matrix) -> sp.Matrix:
|
||||||
Q_vv = Q_ll - A * Q_xx * A.T
|
Q_vv = P.inv() - A * Q_xx * A.T
|
||||||
return Q_vv #Kofaktormatrix der Beobachtungsresiduen
|
return Q_vv #Kofaktormatrix der Beobachtungsresiduen
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user