diff --git a/.idea/Masterprojekt-Campusnetz.iml b/.idea/Masterprojekt-Campusnetz.iml index 1d2fcdf..89b2bd1 100644 --- a/.idea/Masterprojekt-Campusnetz.iml +++ b/.idea/Masterprojekt-Campusnetz.iml @@ -4,7 +4,7 @@ - + \ No newline at end of file diff --git a/.idea/misc.xml b/.idea/misc.xml index ba45cb8..590a59e 100644 --- a/.idea/misc.xml +++ b/.idea/misc.xml @@ -3,5 +3,5 @@ - + \ No newline at end of file diff --git a/Parameterschaetzung.py b/Parameterschaetzung.py index 7b394a7..a23acce 100644 --- a/Parameterschaetzung.py +++ b/Parameterschaetzung.py @@ -1,26 +1,41 @@ +from typing import Dict, Any import sympy as sp -#für Varianzkomponentenschätzung -MAX_ITER = 10 -TOL = 1e-3 # 0.1%. +def ausgleichung_mit_vks_iterativ( + A: sp.Matrix, + l: sp.Matrix, + modell: StochastischesModell, + max_iter: int = 10, + tol: float = 1e-3, +) -> Dict[str, Any]: + """ + Führt eine iterative Ausgleichung mit Varianzkomponentenschätzung durch. -for loop in range(MAX_ITER): + Ablauf: + - starte mit σ0,g² aus modell.sigma0_groups (meist alle = 1.0) + - wiederhole: + * Ausgleichung + * VKS + * Aktualisierung σ0,g² + bis sich alle σ̂0,g² ~ 1.0 (oder max_iter erreicht). + """ - Q_ll, P = modell.aufstellen_Qll_P() + history = [] # optional: Zwischenergebnisse speichern - N = A.T * P * A - n_vec = A.T * P * l - dx = N.LUsolve(n_vec) + for it in range(max_iter): + result = ausgleichung_einmal(A, l, modell) + history.append(result) - v = l - A * dx + sigma_hat = result["sigma_hat"] - sigma_hat = modell.varianzkomponenten(v, A) + # Prüfkriterium: alle σ̂ nahe bei 1.0? + if all(abs(val - 1.0) < tol for val in sigma_hat.values()): + print(f"Konvergenz nach {it+1} Iterationen.") + break - print(f"Iteration {loop+1}, σ̂² Gruppen:", sigma_hat) + # sonst: Modell-σ0,g² mit VKS-Ergebnis updaten + modell.update_sigma0_von_vks(sigma_hat) - # Prüfen: ist jede Komponente ≈ 1? - if all(abs(val - 1) < TOL for val in sigma_hat.values()): - print("Konvergenz erreicht ✔") - break - - modell.update_sigma(sigma_hat) \ No newline at end of file + # letztes Ergebnis + History zurückgeben + result["history"] = history + return result \ No newline at end of file diff --git a/Stochastisches_Modell.py b/Stochastisches_Modell.py index 4e85267..144d497 100644 --- a/Stochastisches_Modell.py +++ b/Stochastisches_Modell.py @@ -1,6 +1,6 @@ import sympy as sp from dataclasses import dataclass, field -from typing import Dict, Tuple +from typing import Dict, Tuple, Iterable @dataclass class StochastischesModell: @@ -27,12 +27,12 @@ class StochastischesModell: return int(self.sigma_beob.rows) - def aufstellen_Qll_P(self) -> Tuple[sp.Matrix, sp.Matrix]: + def berechne_Qll_P(self) -> Tuple[sp.Matrix, sp.Matrix]: n = self.n_beob Q_ll = sp.zeros(n, n) P = sp.zeros(n, n) - for i in range(self.n): + for i in range(self.n_beob): sigma_i = self.sigma_beob[i, 0] #σ-Wert der i-ten Beobachtung holen g = int(self.group_beob[i, 0]) #Gruppenzugehörigkeit der Beobachtung bestimmen sigma0_sq = self.sigma0_groups[g] #Den Varianzfaktor der Gruppe holen @@ -42,45 +42,45 @@ class StochastischesModell: return Q_ll, P - @staticmethod - def redundanz_pro_beobachtung(A, P): - n = P.rows - sqrtP = sp.zeros(n, n) - for i in range(n): - sqrtP[i, i] = sp.sqrt(P[i, i]) + def berechne_Qvv(self, A: sp.Matrix, Q_ll: sp.Matrix, Q_xx: sp.Matrix) -> sp.Matrix: + Q_vv = Q_ll - A * Q_xx * A.T + return Q_vv #Kofaktormatrix der Beobachtungsresiduen - A_tilde = sqrtP * A - M = (A_tilde.T * A_tilde).inv() + def berechne_R(self, Q_vv: sp.Matrix, P: sp.Matrix) -> sp.Matrix: + R = Q_vv * P + return R #Redundanzmatrix + + + def berechne_r(self, R: sp.Matrix) -> sp.Matrix: + n = R.rows r = sp.zeros(n, 1) for i in range(n): - a_i = sp.Matrix([A_tilde.row(i)]) - r[i] = 1 - (a_i * M * a_i.T)[0] - return r + r[i, 0] = R[i, i] + return r #Redundanzanteile - def varianzkomponenten(self, v, A) -> Dict[int, float]: - _, P = self.aufstellen_Qll_P() - r_obs = self.redundanz_pro_beobachtung(A, P) - gruppen = sorted(set(int(g) for g in self.group_beob)) - - sigma_hat = {} - + def berechne_vks(self,v: sp.Matrix, P: sp.Matrix, r: sp.Matrix) -> Dict[int, float]: + if v.rows != self.n_beob: + raise ValueError("v passt nicht zur Anzahl der Beobachtungen.") + gruppen = sorted({int(g) for g in self.group_beob}) + sigma_gruppen: Dict[int, float] = {} for g in gruppen: - idx = [i for i in range(self.n) if int(self.group_beob[i]) == g] + idx = [i for i in range(self.n_beob) + if int(self.group_beob[i, 0]) == g] + if not idx: + continue - v_i = sp.Matrix([v[i] for i in idx]) - - P_i = sp.zeros(len(idx)) - for k, j in enumerate(idx): - P_i[k, k] = P[j, j] - - r_g = sum(r_obs[j] for j in idx) - - sigma_hat[g] = float((v_i.T * P_i * v_i)[0] / r_g) - return sigma_hat + v_g = sp.Matrix([v[i, 0] for i in idx]) + P_g = sp.zeros(len(idx), len(idx)) + for k, i_beob in enumerate(idx): + P_g[k, k] = P[i_beob, i_beob] + r_g = sum(r[i_beob, 0] for i_beob in idx) + sigma_gruppe_g = (v_g.T * P_g * v_g)[0, 0] / r_g + sigma_gruppen[g] = float(sigma_gruppe_g) + return sigma_gruppen - def update_sigma(self, sigma_hat_dict): - for g, val in sigma_hat_dict.items(): - self.sigma0_groups[g] = float(val) \ No newline at end of file + def update_sigma0_von_vks(self, sigma_hat: Dict[int, float]) -> None: + for g, val in sigma_hat.items(): + self.sigma0_groups[int(g)] = float(val) \ No newline at end of file