Files
Masterprojekt-Campusnetz/Stochastisches_Modell.py
2025-12-07 16:07:51 +01:00

112 lines
4.1 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import sympy as sp
from dataclasses import dataclass, field
from typing import Dict, Tuple
@dataclass
class StochastischesModellApriori:
sigma_beob: Iterable[float] #σ der einzelnen Beobachtung
group_beob: Iterable[int] #Gruppenzugehörigkeit jeder Beobachtung (Distanz, Richtung, GNSS, Nivellement,...,)
sigma0_groups: Dict[int, float] = field(default_factory=dict) #σ0² für jede Gruppe
def __post_init__(self):
self.sigma_beob = sp.Matrix(list(self.sigma_beob)) #Spaltenvektor
self.group_beob = sp.Matrix(list(self.group_beob)) #Spaltenvektor
if self.sigma_beob.rows != self.group_beob.rows:
raise ValueError("sigma_obs und group_ids müssen gleich viele Einträge haben.")
unique_groups = sorted({int(g) for g in self.group_beob}) #jede Beobachtungsgruppe wird genau einmal berücksichtigt
for g in unique_groups:
if g not in self.sigma0_groups: #Fehlende Gruppen mit σ_0j^2 = 1.0
self.sigma0_groups[g] = 1.0
@property
def n_beob(self) -> int:
return int(self.sigma_beob.rows)
def aufstellen_Qll_P(self) -> Tuple[sp.Matrix, sp.Matrix]:
n = self.n_beob
Q_ll = sp.zeros(n, n)
P = sp.zeros(n, n)
for i in range(n):
sigma_i = self.sigma_beob[i, 0] #σ-Wert der i-ten Beobachtung holen
g = int(self.group_beob[i, 0]) #Gruppenzugehörigkeit der Beobachtung bestimmen
sigma0_sq = self.sigma0_groups[g] #Den Varianzfaktor der Gruppe holen
q_ii = sigma_i**2 #σ² berechnen
Q_ll[i, i] = q_ii #Diagonale
P[i, i] = 1 / (sigma0_sq * q_ii) #durch VKS nicht mehr P=Qll^-1
return Q_ll, P
@staticmethod
def redundanz_pro_beobachtung(A: sp.Matrix, P: sp.Matrix) -> sp.Matrix:
n_beob = P.rows #Anzahl der Beobachtungen (Zeilen in P)
n_param = A.cols #Anzahl der Unbekannten (Spalten in A)
sqrtP = sp.zeros(n_beob, n_beob) #Wurzel von P (der Diagonale)
for i in range(n_beob):
sqrtP[i, i] = sp.sqrt(P[i, i])
A_tilde = sqrtP * A
M = (A_tilde.T * A_tilde).inv()
r_vec = sp.zeros(n_beob, 1)
for i in range(n_beob):
a_i = A_tilde.row(i) # 1 × n_param
a_i_row = sp.Matrix([a_i]) # explizit 1×n-Matrix
r_i = 1 - (a_i_row * M * a_i_row.T)[0, 0]
r_vec[i, 0] = r_i
return r_vec
def varianzkomponentenschaetzung(
self,
v: sp.Matrix, # Residuenvektor (n × 1)
A: sp.Matrix, # Designmatrix
) -> Dict[int, float]:
if v.rows != self.n_beob:
raise ValueError("Länge von v passt nicht zur Anzahl Beobachtungen im Modell.")
# Aktuelle Gewichte
Q_ll, P = self.aufstellen_Qll_P()
# Redundanzzahlen pro Beobachtung
r_vec = self.redundanz_pro_beobachtung(A, P)
new_sigma0_sq: Dict[int, float] = {}
# Für jede Gruppe j:
unique_groups = sorted({int(g) for g in self.group_beob})
for g in unique_groups:
# Indizes der Beobachtungen in dieser Gruppe
idx = [i for i in range(self.n_beob) if int(self.group_beob[i, 0]) == g]
if not idx:
continue
# v_j, P_j, r_j extrahieren
v_j = sp.Matrix([v[i, 0] for i in idx]) # (m_j × 1)
P_j = sp.zeros(len(idx), len(idx))
r_j = 0
for ii, i in enumerate(idx):
P_j[ii, ii] = P[i, i]
r_j += r_vec[i, 0]
# σ̂_j^2 = (v_jᵀ P_j v_j) / r_j
sigma_hat_j_sq = (v_j.T * P_j * v_j)[0, 0] / r_j
# als float rausgeben, kann man aber auch symbolisch lassen
new_sigma0_sq[g] = float(sigma_hat_j_sq)
return new_sigma0_sq
def update_sigma0(self, new_sigma0_sq: Dict[int, float]) -> None:
for g, val in new_sigma0_sq.items():
self.sigma0_groups[int(g)] = float(val)