import sympy as sp import numpy as np from typing import Iterable, List, Sequence, Tuple, Optional import Stochastisches_Modell import Export class Datumsfestlegung: @staticmethod def weiches_datum(Q_ll: np.ndarray, Q_AA: np.ndarray, varianzkompontenschaetzung_erfolgt: bool, dict_indizes_beobachtungsgruppen: dict) -> np.ndarray: """ Erstellt die erweiterte Kofaktor- und Gewichtsmatrix für eine weiche Datumsfestlegung. Aus den Kofaktormatrizen der Beobachtungen Q_ll und der Kofaktormatrix der Anschlusspunkte Q_AA wird eine erweiterte Kofaktormatrix Q_ext aufgebaut. Zusätzlich wird die zugehörige Gewichtsmatrix P erzeugt. Falls keine Varianzkomponentenschätzung durchgeführt wurde, wird P als Blockmatrix aus den Inversen (Gewichten) von Q_ll und Q_AA aufgebaut. Falls eine Varianzkomponentenschätzung durchgeführt wurde, wird Q_ext entsprechend den in definierten Beobachtungsgruppen in Teilblöcke zerlegt (z.B. SD, R, ZW, gnss, niv, lA). Für jeden Block wird die Gewichtsmatrix berechnet und anschließend zu einer Gesamtgewichtsmatrix zusammengesetzt. :param Q_ll: a-priori Kofaktormatrix der Beobachtungen. :type Q_ll: numpy.ndarray :param Q_AA: a-priori Kofaktormatrix der Anschlusspunkte. :type Q_AA: numpy.ndarray :param varianzkompontenschaetzung_erfolgt: Kennzeichen, ob eine Varianzkomponentenschätzung berücksichtigt werden soll. :type varianzkompontenschaetzung_erfolgt: bool :param dict_indizes_beobachtungsgruppen: Dictionary mit Indexbereichen je Beobachtungsgruppe zur Blockzerlegung. :type dict_indizes_beobachtungsgruppen: dict :return: Tuple aus erweiterter Kofaktormatrix Q_ext und zugehöriger Gewichtsmatrix P. :rtype: tuple[numpy.ndarray, numpy.ndarray] :raises ValueError: Wenn Q_ll oder Q_AA keine quadratische Matrix ist. """ if Stochastisches_Modell.StochastischesModell.berechne_P(Q_ll).ndim != 2 or \ Stochastisches_Modell.StochastischesModell.berechne_P(Q_ll).shape[0] != \ Stochastisches_Modell.StochastischesModell.berechne_P(Q_ll).shape[1]: raise ValueError("Q_ll muss quadratisch sein.") if Stochastisches_Modell.StochastischesModell.berechne_P(Q_AA).ndim != 2 or \ Stochastisches_Modell.StochastischesModell.berechne_P(Q_AA).shape[0] != \ Stochastisches_Modell.StochastischesModell.berechne_P(Q_AA).shape[1]: raise ValueError("Q_AA muss quadratisch sein.") Q_ext = np.block( [[Q_ll, np.zeros((Q_ll.shape[0], Q_AA.shape[0]))], [np.zeros((Q_AA.shape[0], Q_ll.shape[0])), Q_AA]]) if varianzkompontenschaetzung_erfolgt == False: P = np.block([[Stochastisches_Modell.StochastischesModell.berechne_P(Q_ll), np.zeros( (Stochastisches_Modell.StochastischesModell.berechne_P(Q_ll).shape[0], Stochastisches_Modell.StochastischesModell.berechne_P(Q_AA).shape[0]))], [np.zeros( (Stochastisches_Modell.StochastischesModell.berechne_P(Q_AA).shape[0], Stochastisches_Modell.StochastischesModell.berechne_P(Q_ll).shape[0])), Stochastisches_Modell.StochastischesModell.berechne_P( Q_AA)]]) else: print("Varianzkomponentenschätzung wurde durchgeführt") for beobachtungsgruppe, indizes in dict_indizes_beobachtungsgruppen.items(): z_start, z_ende = indizes[0], indizes[1] # Zeile 2 bis inklusive 5 s_start, s_ende = indizes[0], indizes[1] # Spalte 1 bis inklusive 4 if beobachtungsgruppe == "SD": aufgeteilt_SD = Q_ext[z_start: z_ende + 1, s_start: s_ende + 1] if beobachtungsgruppe == "R": aufgeteilt_R = Q_ext[z_start: z_ende + 1, s_start: s_ende + 1] if beobachtungsgruppe == "ZW": aufgeteilt_ZW = Q_ext[z_start: z_ende + 1, s_start: s_ende + 1] if beobachtungsgruppe == "gnss": aufgeteilt_gnss = Q_ext[z_start: z_ende + 1, s_start: s_ende + 1] if beobachtungsgruppe == "niv": aufgeteilt_niv = Q_ext[z_start: z_ende + 1, s_start: s_ende + 1] if beobachtungsgruppe == "lA": aufgeteilt_lA = Q_ext[z_start: z_ende + 1, s_start: s_ende + 1] aufgeteilt_SD_invertiert = Stochastisches_Modell.StochastischesModell.berechne_P(aufgeteilt_SD) aufgeteilt_R_invertiert = Stochastisches_Modell.StochastischesModell.berechne_P(aufgeteilt_R) aufgeteilt_ZW_invertiert = Stochastisches_Modell.StochastischesModell.berechne_P(aufgeteilt_ZW) aufgeteilt_gnss_invertiert = Stochastisches_Modell.StochastischesModell.berechne_P(aufgeteilt_gnss) aufgeteilt_niv_invertiert = Stochastisches_Modell.StochastischesModell.berechne_P(aufgeteilt_niv) aufgeteilt_lA_invertiert = Stochastisches_Modell.StochastischesModell.berechne_P(aufgeteilt_lA) def Z(A, B): return np.zeros((A.shape[0], B.shape[0])) P = np.block([ [aufgeteilt_SD_invertiert, Z(aufgeteilt_SD_invertiert, aufgeteilt_R_invertiert), Z(aufgeteilt_SD_invertiert, aufgeteilt_ZW_invertiert), Z(aufgeteilt_SD_invertiert, aufgeteilt_gnss_invertiert), Z(aufgeteilt_SD_invertiert, aufgeteilt_niv_invertiert), Z(aufgeteilt_SD_invertiert, aufgeteilt_lA_invertiert)], [Z(aufgeteilt_R_invertiert, aufgeteilt_SD_invertiert), aufgeteilt_R_invertiert, Z(aufgeteilt_R_invertiert, aufgeteilt_ZW_invertiert), Z(aufgeteilt_R_invertiert, aufgeteilt_gnss_invertiert), Z(aufgeteilt_R_invertiert, aufgeteilt_niv_invertiert), Z(aufgeteilt_R_invertiert, aufgeteilt_lA_invertiert)], [Z(aufgeteilt_ZW_invertiert, aufgeteilt_SD_invertiert), Z(aufgeteilt_ZW_invertiert, aufgeteilt_R_invertiert), aufgeteilt_ZW_invertiert, Z(aufgeteilt_ZW_invertiert, aufgeteilt_gnss_invertiert), Z(aufgeteilt_ZW_invertiert, aufgeteilt_niv_invertiert), Z(aufgeteilt_ZW_invertiert, aufgeteilt_lA_invertiert)], [Z(aufgeteilt_gnss_invertiert, aufgeteilt_SD_invertiert), Z(aufgeteilt_gnss_invertiert, aufgeteilt_R_invertiert), Z(aufgeteilt_gnss_invertiert, aufgeteilt_ZW_invertiert), aufgeteilt_gnss_invertiert, Z(aufgeteilt_gnss_invertiert, aufgeteilt_niv_invertiert), Z(aufgeteilt_gnss_invertiert, aufgeteilt_lA_invertiert)], [Z(aufgeteilt_niv_invertiert, aufgeteilt_SD_invertiert), Z(aufgeteilt_niv_invertiert, aufgeteilt_R_invertiert), Z(aufgeteilt_niv_invertiert, aufgeteilt_ZW_invertiert), Z(aufgeteilt_niv_invertiert, aufgeteilt_gnss_invertiert), aufgeteilt_niv_invertiert, Z(aufgeteilt_niv_invertiert, aufgeteilt_lA_invertiert)], [Z(aufgeteilt_lA_invertiert, aufgeteilt_SD_invertiert), Z(aufgeteilt_lA_invertiert, aufgeteilt_R_invertiert), Z(aufgeteilt_lA_invertiert, aufgeteilt_ZW_invertiert), Z(aufgeteilt_lA_invertiert, aufgeteilt_gnss_invertiert), Z(aufgeteilt_lA_invertiert, aufgeteilt_niv_invertiert), aufgeteilt_lA_invertiert] ]) return Q_ext, P @staticmethod def indizes_beobachtungsvektor_nach_beobachtungsgruppe(Jacobimatrix_symbolisch_liste_beobachtungsvektor): """ Ermittelt Indexbereiche des Beobachtungsvektors getrennt nach Beobachtungsgruppen. Die Funktion analysiert die Bezeichner des symbolischen Beobachtungsvektors (z.B. aus der Jacobi-Matrix) und ordnet jede Beobachtung anhand ihres Kennzeichens (Beobachtungsart) einer Gruppe zu. Für jede Beobachtungsgruppe wird anschließend der minimale und maximale Index im Beobachtungsvektor bestimmt. Unterstützte Beobachtungsgruppen sind u.a.: - SD : Tachymeter-Strecken, - R : Tachymeter-Richtungen, - ZW : Tachymeter-Zenitwinkel, - gnss: GNSS-Basislinienkomponenten (bx/by/bz), - niv : Geometrisches Nivellement, - lA : Pseudobeobachtungen. Die zurückgegebenen Indexbereiche dienen insbesondere zur Blockzerlegung von Kofaktor- oder Gewichtsmatrizen (z.B. bei Varianzkomponentenschätzung oder weicher Datumsfestlegung). :param Jacobimatrix_symbolisch_liste_beobachtungsvektor: Liste der Beobachtungen. :type Jacobimatrix_symbolisch_liste_beobachtungsvektor: list :return: Dictionary mit Indexbereichen je Beobachtungsgruppe. :rtype: dict :raises ValueError: Wenn für eine Beobachtungsgruppe keine Indizes ermittelt werden können. """ liste_strecken_indizes = [] liste_richtungen_indizes = [] liste_zenitwinkel_indizes = [] liste_gnss_indizes = [] liste_nivellement_indizes = [] liste_anschlusspunkte_indizes = [] dict_beobachtungsgruppen_indizes = {} for beobachtung in Jacobimatrix_symbolisch_liste_beobachtungsvektor: if beobachtung.split("_")[1] == "SD": index = Jacobimatrix_symbolisch_liste_beobachtungsvektor.index(beobachtung) liste_strecken_indizes.append(index) if beobachtung.split("_")[1] == "R": index = Jacobimatrix_symbolisch_liste_beobachtungsvektor.index(beobachtung) liste_richtungen_indizes.append(index) if beobachtung.split("_")[1] == "ZW": index = Jacobimatrix_symbolisch_liste_beobachtungsvektor.index(beobachtung) liste_zenitwinkel_indizes.append(index) if beobachtung.split("_")[1] == "gnssbx" or beobachtung.split("_")[1] == "gnssby" or beobachtung.split("_")[ 1] == "gnssbz": index = Jacobimatrix_symbolisch_liste_beobachtungsvektor.index(beobachtung) liste_gnss_indizes.append(index) if beobachtung.split("_")[1] == "niv": index = Jacobimatrix_symbolisch_liste_beobachtungsvektor.index(beobachtung) liste_nivellement_indizes.append(index) if beobachtung.split("_")[0] == "lA": index = Jacobimatrix_symbolisch_liste_beobachtungsvektor.index(beobachtung) liste_anschlusspunkte_indizes.append(index) dict_beobachtungsgruppen_indizes["SD"] = min(liste_strecken_indizes), max(liste_strecken_indizes) dict_beobachtungsgruppen_indizes["R"] = min(liste_richtungen_indizes), max(liste_richtungen_indizes) dict_beobachtungsgruppen_indizes["ZW"] = min(liste_zenitwinkel_indizes), max(liste_zenitwinkel_indizes) dict_beobachtungsgruppen_indizes["gnss"] = min(liste_gnss_indizes), max(liste_gnss_indizes) dict_beobachtungsgruppen_indizes["niv"] = min(liste_nivellement_indizes), max(liste_nivellement_indizes) dict_beobachtungsgruppen_indizes["lA"] = min(liste_anschlusspunkte_indizes), max(liste_anschlusspunkte_indizes) return dict_beobachtungsgruppen_indizes @staticmethod def make_index(unbekannten_liste): names = [str(s).strip() for s in unbekannten_liste] return names, {n: i for i, n in enumerate(names)} @staticmethod def erstelle_G(x0, unbekannten_liste, liste_punktnummern=None, mit_massstab=True): """ Baut G (u x d) in den vollen Unbekanntenraum. Wenn liste_punktnummern=None, werden alle Punkt-IDs aus unbekannten_liste über X*/Y*/Z* automatisch bestimmt. """ x0 = np.asarray(x0, float).reshape(-1) names, idx = Datumsfestlegung.make_index(unbekannten_liste) u = len(names) d = 7 if mit_massstab else 6 G = np.zeros((u, d), dtype=float) # --- Punktliste automatisch, falls nicht gegeben --- if liste_punktnummern is None: pids = set() for n in names: if len(n) >= 2 and n[0].upper() in ("X", "Y", "Z"): pids.add(n[1:]) # alles nach X/Y/Z liste_punktnummern = sorted(pids) for pid in liste_punktnummern: sx, sy, sz = f"X{pid}", f"Y{pid}", f"Z{pid}" if sx not in idx or sy not in idx or sz not in idx: continue # Punkt nicht vollständig als XYZ-Unbekannte vorhanden ix, iy, iz = idx[sx], idx[sy], idx[sz] xi, yi, zi = x0[ix], x0[iy], x0[iz] # Translationen G[ix, 0] = 1.0 G[iy, 1] = 1.0 G[iz, 2] = 1.0 # Rotationen (δr = ω × r) # Rx: δY=-Z, δZ=+Y G[iy, 3] = -zi G[iz, 3] = yi # Ry: δX=+Z, δZ=-X G[ix, 4] = zi G[iz, 4] = -xi # Rz: δX=-Y, δY=+X G[ix, 5] = -yi G[iy, 5] = xi # Maßstab if mit_massstab: G[ix, 6] = xi G[iy, 6] = yi G[iz, 6] = zi return G def aktive_indices_from_selection(auswahl, unbekannten_liste): names, idx = Datumsfestlegung.make_index(unbekannten_liste) aktive = [] for pid, comp in auswahl: key = f"{comp.upper()}{str(pid)}" if key not in idx: raise KeyError(f"{key} nicht im Unbekanntenvektor.") aktive.append(idx[key]) # unique out = [] seen = set() for i in aktive: if i not in seen: seen.add(i) out.append(i) return out def auswahlmatrix_E(u, aktive_indices): E = np.zeros((u, u), dtype=float) for i in aktive_indices: E[int(i), int(i)] = 1.0 return E def berechne_dx(N, n, G): N = np.asarray(N, float) n = np.asarray(n, float).reshape(-1, 1) G = np.asarray(G, float) u = N.shape[0] d = G.shape[1] K = np.block([ [N, G], [G.T, np.zeros((d, d))] ]) rhs = np.vstack([n, np.zeros((d, 1))]) K_inv = np.linalg.inv(K) sol = K_inv @ rhs dx = sol[:u] k = sol[u:] Q_xx = K_inv[:u, :u] return dx, k, Q_xx