import csv from decimal import Decimal, getcontext, ROUND_HALF_UP import sqlite3 from typing import Any import xml.etree.ElementTree as ET from Berechnungen import Berechnungen import Berechnungen class Import: """Importfunktionen für Messdaten und Näherungswerte in die SQLite-Datenbank. Die Klasse stellt Methoden zur Verfügung für: - Import von Näherungskoordinaten (LH-Tachymeter, GNSS/ECEF) in die Tabelle Netzpunkte, - Import und Vorverarbeitung von Tachymeterbeobachtungen (CSV-Datei) inkl. optionaler Korrektur durch nicht gerundete Leica-JXL, - Import von GNSS-Basislinien inkl. Kovarianzanteilen in die Tabelle Beobachtungen, - Import von Nivellementdaten (Normalhöhen als Näherungen sowie RVVR-Züge als dh-Beobachtungen), - einfache Plausibilitätsprüfungen (Duplikate, InstrumentenID, Dateiname bereits importiert). """ def __init__(self, pfad_datenbank: str, a : float, b : float) -> None: """Initialisiert die Importklasse. Speichert den Pfad zur SQLite-Datenbank und initialisiert die Hilfsklasse Berechnungen. :param a: Große Halbachse a des Referenzellipsoids in Meter. :type a: float :param b: Kleine Halbachse b des Referenzellipsoids in Meter. :type b: float :param pfad_datenbank: Pfad zur SQLite-Datenbank. :type pfad_datenbank: str :return: None :rtype: None """ self.a = a self.b = b self.pfad_datenbank = pfad_datenbank self.berechnungen = Berechnungen.Berechnungen(self.a, self.b) def string_to_float(self, zahl: str) -> float: """Konvertiert einen Zahlenstring in float. Ersetzt das Dezimaltrennzeichen "," durch "." und führt anschließend float() aus. :param zahl: Zahlenstring (z. B. "12,345" oder "12.345"). :type zahl: str :return: Zahlenwert als float. :rtype: float """ zahl = zahl.replace(',', '.') return float(zahl) def string_to_decimal(self, zahl: str) -> Decimal: """Konvertiert einen Zahlenstring in Decimal. Ersetzt das Dezimaltrennzeichen "," durch "." und erzeugt anschließend ein Decimal. :param zahl: Zahlenstring (z. B. "12,345" oder "12.345"). :type zahl: str :return: Konvertierter Decimal-Wert. :rtype: Decimal """ zahl = zahl.replace(',', '.') return Decimal(zahl) def import_koordinaten_lh_tachymeter(self, pfad_datei: str) -> None: """Importiert Näherungskoordinaten im lokalen Horizontsystem des Tachymeters in die Tabelle Netzpunkte. Die Datei wird semikolon-separiert erwartet. Pro Zeile werden Punktnummer sowie X/Y/Z-Näherung ausgelesen und in Netzpunkte (naeherungx_lh, naeherungy_lh, naeherungz_lh) geschrieben. Vor dem Import werden zwei zentrale Prüfungen durchgeführt: - Punktnummern dürfen in der Importdatei nicht doppelt vorkommen, - Punktnummern aus der Importdatei dürfen noch nicht in der Tabelle Netzpunkte vorhanden sein. :param pfad_datei: Pfad zur CSV-Datei mit Punktnummer und Koordinaten. :type pfad_datei: str :return: None :rtype: None """ liste_punktnummern = [] liste_punktnummern_vorher = [] liste_punktnummern_vorher_db = [] # Import_abbrechen wird True, sobald eine Fehler festgestellt wird. Als Folge wird der Import abgebrochen und eine Fehlermeldung mit Handlungshinweisen für den Benutzer ausgegeben. Import_abbrechen = False with open (pfad_datei, newline='', encoding='utf-8') as csvfile: # Abfragen aller in der Tabelle Netzpunkte enthaltenen Punktnummern con = sqlite3.connect(self.pfad_datenbank) cursor = con.cursor() liste_punktnummern_db = [r[0] for r in cursor.execute("SELECT DISTINCT punktnummer FROM Netzpunkte").fetchall()] cursor.close() con.close() r = csv.reader(csvfile, delimiter=';') for row in r: liste_punktnummern.append(row[0]) # Abbruch des Imports und Ausgabe einer Fehlermeldung, wenn ein Punkt doppelt in der Importdatei vorhanden ist. if row[0] in liste_punktnummern_vorher: Import_abbrechen = True print(f"Der Import wurde abgebrochen, weil in der Datei {pfad_datei} Punktnummern doppelt vorhanden sind. Bitte in der Datei ändern und Import wiederholen.") break liste_punktnummern_vorher.append(row[0]) # Abbruch des Imports und Ausgabe einer Fehlermeldung, wenn mindestens eine Punktnummer aus der Importdatei bereits in der Tabelle Netzpunkte vorhanden ist. if row[0] in liste_punktnummern_db: Import_abbrechen = True print(f"Der Import wurde abgebrochen, weil mindestens ein Teil der Punktnummern aus der Datei {pfad_datei} bereits in der Datenbank vorhanden ist. Bitte in der Datei ändern und Import wiederholen.") break liste_punktnummern_vorher_db.append(row[0]) # Import durchführen, wenn keine Fehler festgestellt wurden if Import_abbrechen == False: con = sqlite3.connect(self.pfad_datenbank) cursor = con.cursor() with open(pfad_datei, newline='', encoding='utf-8') as csvfile: r = csv.reader(csvfile, delimiter=';') for row in r: cursor.execute( "INSERT INTO Netzpunkte (punktnummer, naeherungx_lh, naeherungy_lh, naeherungz_lh) VALUES (?, ?, ?, ?)", (row[0], self.string_to_float(row[1]), self.string_to_float(row[2]), self.string_to_float(row[3]))) con.commit() cursor.close() con.close() print("Der Import der Näherungskoordinaten wurde erfolgreich abgeschlossen") def ist_rundung_von_jxl(self, wert_csv: str, wert_jxl: str) -> bool: """Prüft, ob ein CSV-Wert eine Rundung eines JXL-Wertes (mit mehr Nachkommastellen) ist. Es wird geprüft, ob: - beide Werte echte Zahlen sind, - der CSV-Wert weniger Nachkommastellen als der JXL-Wert besitzt, - der JXL-Wert auf die Nachkommastellenanzahl des CSV-Wertes gerundet exakt dem CSV-Wert entspricht. :param wert_csv: Wert aus der CSV-Datei. :type wert_csv: str :param wert_jxl: Wert aus der JXL-Datei. :type wert_jxl: str :return: True, wenn wert_csv eine Rundung von wert_jxl auf weniger Nachkommastellen darstellt, sonst False. :rtype: bool """ wert_csv = str(wert_csv).strip() wert_jxl = str(wert_jxl).strip() if ":ZH:" in wert_csv: wert_csv = wert_csv.split(":ZH:", 1)[0].strip() if ":ZH:" in wert_jxl: wert_jxl = wert_jxl.split(":ZH:", 1)[0].strip() def ist_zahl(text: str) -> bool: text = str(text).strip() if text == "": return False if text[0] == "+" or text[0] == "-": text = text[1:] if text == "": return False if text.count(",") + text.count(".") > 1: return False if "," in text: teile = text.split(",", 1) if len(teile) != 2: return False if teile[0] == "" or teile[1] == "": return False return teile[0].isdigit() and teile[1].isdigit() if "." in text: teile = text.split(".", 1) if len(teile) != 2: return False if teile[0] == "" or teile[1] == "": return False return teile[0].isdigit() and teile[1].isdigit() return text.isdigit() if ist_zahl(wert_csv) == False: return False if ist_zahl(wert_jxl) == False: return False # Ermittlung der Anzahl Nachkommastellen in CSV-Datei und JXL-Datei anzahl_nachkommastellen_csv = 0 if "," in wert_csv: anzahl_nachkommastellen_csv = len(wert_csv.split(",", 1)[1]) elif "." in wert_csv: anzahl_nachkommastellen_csv = len(wert_csv.split(".", 1)[1]) anzahl_nachkommastellen_jxl = 0 if "," in wert_jxl: anzahl_nachkommastellen_jxl = len(wert_jxl.split(",", 1)[1]) elif "." in wert_jxl: anzahl_nachkommastellen_jxl = len(wert_jxl.split(".", 1)[1]) if anzahl_nachkommastellen_csv >= anzahl_nachkommastellen_jxl: return False wert_csv_decimal = self.string_to_decimal(wert_csv.replace(".", ",")) wert_jxl_decimal = self.string_to_decimal(wert_jxl.replace(".", ",")) q = Decimal("1") if anzahl_nachkommastellen_csv == 0 else Decimal("1." + ("0" * anzahl_nachkommastellen_csv)) wert_jxl_gerundet = wert_jxl_decimal.quantize(q, rounding=ROUND_HALF_UP) wert_csv_gerundet = wert_csv_decimal.quantize(q, rounding=ROUND_HALF_UP) return wert_jxl_gerundet == wert_csv_gerundet def ist_zahl_csv(self, text: str) -> bool: """Prüft, ob ein String in der CSV als Zahl interpretiert werden kann. :param text: Zu prüfender Text. :type text: str :return: True, wenn der Text eine Zahl ist, sonst False. :rtype: bool """ text = str(text).strip() if text == "": return False if text[0] == "+" or text[0] == "-": text = text[1:] if text == "": return False if text.count(",") + text.count(".") > 1: return False if "," in text: teile = text.split(",", 1) if len(teile) != 2: return False if teile[0] == "" or teile[1] == "": return False return teile[0].isdigit() and teile[1].isdigit() if "." in text: teile = text.split(".", 1) if len(teile) != 2: return False if teile[0] == "" or teile[1] == "": return False return teile[0].isdigit() and teile[1].isdigit() return text.isdigit() def korrigiere_beobachtungen_tachymeter_csv_mit_jxl(self, pfad_datei_csv: str, pfad_datei_jxl: str, pfad_datei_csv_out: str) -> dict: """Korrigiert Tachymeterbeobachtungen in einer CSV-Datei über eine JXL-Datei (Leica XML). Die JXL-Datei wird eingelesen und je StationID eine Sequenz an Beobachtungen (Zielpunktname, Horizontal-/Vertikalkreisablesung, Schrägdistanz sowie Prismenhöhe) aufgebaut. Die Winkel werden von Grad nach gon umgerechnet und in der Nachkommastellenauflösung der JXL gerundet. Anschließend wird die CSV zeilenweise verarbeitet: - Standpunktzeilen werden der zugehörigen StationID/IH aus der JXL zugeordnet, - Beobachtungszeilen werden falls der CSV-Wert eine Rundung des JXL-Werts ist durch den JXL-Wert mit voller Nachkommastellenzahl ersetzt, - Prismenhöhen (ZH) werden aus der JXL übernommen und in der letzten Spalte als ":ZH:" angehängt. Zusätzlich werden Fehler-Listen erstellt (fehlende IH, fehlende ZH, Standpunkte in CSV aber nicht in JXL). Diese werden dem Benutzer übergeben. :param pfad_datei_csv: Pfad zur Eingabe-CSV (Tachymeterbeobachtungen). :type pfad_datei_csv: str :param pfad_datei_jxl: Pfad zur JXL-Datei. :type pfad_datei_jxl: str :param pfad_datei_csv_out: Pfad zur Ausgabe-CSV (korrigierte Datei). :type pfad_datei_csv_out: str :return: Ergebnisdictionary mit Status und Diagnosen, u. a.: - "Import_fortsetzen" (bool), - "dict_ersetzungen" (dict mit Zählungen für "Hz", "Z", "SD"), - "liste_zeilen_ohne_IH" (list), - "liste_zeilen_ohne_ZH" (list), - "liste_zeilen_standpunkt_nicht_in_jxl" (list), - "pfad_datei_csv_out" (str). :rtype: dict """ # Import_fortsetzen wird False, sobald eine Fehler festgestellt wird. Als Folge wird der Import abgebrochen und eine Fehlermeldung mit Handlungshinweisen für den Benutzer ausgegeben. Import_fortsetzen = True getcontext().prec = 70 dict_ersetzungen = {"Hz": 0, "Z": 0, "SD": 0} # IH = Instrumentenhöhe | ZH = Zielhöhe, bzw. Prismenhöhe liste_zeilen_ohne_IH = [] liste_zeilen_ohne_ZH = [] liste_zeilen_standpunkt_nicht_in_jxl = [] liste_tachymeterstandpunkte = [] dict_stationname_tachymeterstandpunkte = {} dict_standpunkte_anzahl = {} dict_targetID_zu_ZH = {} dict_stationID_zu_seq = {} dict_stationnamen = {} # Vorbereitung jxl-Datei lesen tree = ET.parse(pfad_datei_jxl) root = tree.getroot() # JXL-Datei auslesen if Import_fortsetzen: # Standpunkt, StationID, Instrumentenhöhe aus der JXL-Datei auslesen for sr in root.iter("StationRecord"): stationname = (sr.findtext("StationName") or "").strip() station_id = (sr.attrib.get("ID") or "").strip() ih = (sr.findtext("TheodoliteHeight") or "").strip() if stationname != "" and station_id != "": liste_tachymeterstandpunkte.append((stationname, station_id, ih)) dict_stationnamen[stationname] = 1 if stationname not in dict_stationname_tachymeterstandpunkte: dict_stationname_tachymeterstandpunkte[stationname] = [] dict_stationname_tachymeterstandpunkte[stationname].append((station_id, ih)) for stationname in dict_stationname_tachymeterstandpunkte.keys(): dict_standpunkte_anzahl[stationname] = 0 # Prismenhöhe auslesen und in Dict speichern for tr in root.iter("TargetRecord"): target_id = (tr.attrib.get("ID") or "").strip() zh = (tr.findtext("TargetHeight") or "").strip() if target_id != "": dict_targetID_zu_ZH[target_id] = zh for tupel in liste_tachymeterstandpunkte: station_id = tupel[1] if station_id not in dict_stationID_zu_seq: dict_stationID_zu_seq[station_id] = [] # Horizontal- und Vertikalkreisablesungen, sowie Schrägdistanzablesungen aus JXL-Datei auslesen for pr in root.iter("PointRecord"): station_id = (pr.findtext("StationID") or "").strip() if station_id == "" or station_id not in dict_stationID_zu_seq: continue circle = pr.find("Circle") if circle is None: continue zielpunkt_name = (pr.findtext("Name") or "").strip() target_id = (pr.findtext("TargetID") or "").strip() hz_deg = (circle.findtext("HorizontalCircle") or "").strip() z_deg = (circle.findtext("VerticalCircle") or "").strip() sd_m = (circle.findtext("EDMDistance") or "").strip() if zielpunkt_name == "" or hz_deg == "" or z_deg == "" or sd_m == "": continue stellen_hz = 0 if "." in hz_deg: stellen_hz = len(hz_deg.split(".", 1)[1]) stellen_z = 0 if "." in z_deg: stellen_z = len(z_deg.split(".", 1)[1]) stellen_sd = 0 if "." in sd_m: stellen_sd = len(sd_m.split(".", 1)[1]) # Umrechnung Grad -> gon hz_gon_decimal = Decimal(hz_deg) * (Decimal(10) / Decimal(9)) z_gon_decimal = Decimal(z_deg) * (Decimal(10) / Decimal(9)) q_hz = Decimal("1") if stellen_hz == 0 else Decimal("1." + ("0" * stellen_hz)) q_z = Decimal("1") if stellen_z == 0 else Decimal("1." + ("0" * stellen_z)) q_sd = Decimal("1") if stellen_sd == 0 else Decimal("1." + ("0" * stellen_sd)) hz_gon_decimal = hz_gon_decimal.quantize(q_hz, rounding=ROUND_HALF_UP) z_gon_decimal = z_gon_decimal.quantize(q_z, rounding=ROUND_HALF_UP) sd_decimal = Decimal(sd_m).quantize(q_sd, rounding=ROUND_HALF_UP) # Ausgabe mit Kommatrennung hz_gon_text = format(hz_gon_decimal, "f").replace(".", ",") z_gon_text = format(z_gon_decimal, "f").replace(".", ",") sd_text = format(sd_decimal, "f").replace(".", ",") zh = dict_targetID_zu_ZH.get(target_id, "") # Dictionary mit den Abfragen für die Weiterverarbeitung füllen. dict_stationID_zu_seq[station_id].append({ "target": zielpunkt_name, "hz_gon": hz_gon_text, "z_gon": z_gon_text, "sd_m": sd_text, "zh": zh }) station_id_aktuell = None index_csv_jxl_aktuell = 0 standpunkt_aktuell = None # CSV-Datei zeilenweise durchgehen und ggf. Werte ersetzen if Import_fortsetzen: with (open(pfad_datei_csv, newline="", encoding="utf-8") as fin, open(pfad_datei_csv_out, "w", newline="", encoding="utf-8") as fout): reader = csv.reader(fin, delimiter=";") writer = csv.writer(fout, delimiter=";", lineterminator="\n") for i, row in enumerate(reader): nummer_zeile = i + 1 if len(row) < 4: row = row + [""] * (4 - len(row)) if row[0].strip() != "" and row[1].strip() == "" and row[2].strip() == "" and row[3].strip() == "": standpunkt = row[0].strip() if standpunkt in dict_stationnamen: zaehler = dict_standpunkte_anzahl.get(standpunkt, 0) liste_records = dict_stationname_tachymeterstandpunkte[standpunkt] # Überprüfung, ob in beiden Dateien die selben Tachymeterstandpunkte vorhanden sind if zaehler >= len(liste_records): Import_fortsetzen = False print( f"Der Vorgang wurde abgebrochen: Standpunkt {standpunkt} kommt in der CSV öfter vor als in der JXL.") break station_id, ih = liste_records[zaehler] dict_standpunkte_anzahl[standpunkt] = zaehler + 1 station_id_aktuell = station_id index_csv_jxl_aktuell = 0 standpunkt_aktuell = standpunkt # Erstellen einer Liste mit allen Zeilennummern aus der JXL, für die keine Instrumentenhöhe vorliegt, damit dies als Fehlermeldung mit Bearbeitungshinweisen ausgegeben werden kann. if ih is None or str(ih).strip() == "": liste_zeilen_ohne_IH.append((nummer_zeile, standpunkt)) writer.writerow([standpunkt, f"IH:{ih}", "", "", ""]) continue # Erstellen einer Liste mit allen Zeilennummern aus der JXL, für die keine Prismenhöhe vorliegt, damit dies als Fehlermeldung mit Bearbeitungshinweisen ausgegeben werden kann. if standpunkt.isdigit(): liste_zeilen_standpunkt_nicht_in_jxl.append((nummer_zeile, standpunkt)) writer.writerow(row) continue ist_beobachtung = False if row[0].strip() != "" and row[1].strip() != "" and row[2].strip() != "" and row[3].strip() != "": wert_hz = row[1].split(":ZH:", 1)[0].strip() wert_z = row[2].split(":ZH:", 1)[0].strip() wert_sd = row[3].split(":ZH:", 1)[0].strip() if self.ist_zahl_csv(wert_hz) and self.ist_zahl_csv(wert_z) and self.ist_zahl_csv(wert_sd): ist_beobachtung = True if ist_beobachtung and station_id_aktuell is not None: zielpunkt = row[0].strip() hz_csv = row[1].strip() z_csv = row[2].strip() sd_csv = row[3].strip() liste_seq = dict_stationID_zu_seq.get(station_id_aktuell, []) if liste_seq == []: writer.writerow(row) continue if index_csv_jxl_aktuell >= len(liste_seq): writer.writerow(row) continue # Abfragen der selben Daten aus der JXL-Datei über den index jxl_eintrag = liste_seq[index_csv_jxl_aktuell] index_neu = index_csv_jxl_aktuell + 1 if jxl_eintrag["target"] != zielpunkt: index_ende = min(len(liste_seq), index_csv_jxl_aktuell + 200) liste_kandidaten = [] for index_kandidat in range(index_csv_jxl_aktuell, index_ende): if liste_seq[index_kandidat]["target"] == zielpunkt: liste_kandidaten.append((index_kandidat, liste_seq[index_kandidat])) # Überprüfung, ob die Ablesungen in der CSV-Datei wirklich Rundungen der selben Daten mit mehr Nachkommastellen aus der JXL-Datei sind und abspeichern der Prüfergebnisse. if liste_kandidaten != []: if len(liste_kandidaten) == 1: index_kandidat, kandidat = liste_kandidaten[0] jxl_eintrag = kandidat index_neu = index_kandidat + 1 else: liste_bewertung = [] for index_kandidat, kandidat in liste_kandidaten: score = 0 if self.ist_rundung_von_jxl(hz_csv, kandidat["hz_gon"]): score += 1 if self.ist_rundung_von_jxl(z_csv, kandidat["z_gon"]): score += 1 if self.ist_rundung_von_jxl(sd_csv, kandidat["sd_m"]): score += 1 liste_bewertung.append((score, index_kandidat, kandidat)) liste_bewertung.sort(key=lambda t: (-t[0], t[1])) _, index_best, kandidat_best = liste_bewertung[0] jxl_eintrag = kandidat_best index_neu = index_best + 1 index_csv_jxl_aktuell = index_neu # Nur in der CSV-Datei ersetzen, wenn die CSV-Werte tatsächlich eine Rundung der JXL-Werte sind hz_out = hz_csv z_out = z_csv sd_out = sd_csv if self.ist_rundung_von_jxl(hz_csv, jxl_eintrag["hz_gon"]): hz_out = jxl_eintrag["hz_gon"] dict_ersetzungen["Hz"] += 1 if self.ist_rundung_von_jxl(z_csv, jxl_eintrag["z_gon"]): z_out = jxl_eintrag["z_gon"] dict_ersetzungen["Z"] += 1 if self.ist_rundung_von_jxl(sd_csv, jxl_eintrag["sd_m"]): sd_out = jxl_eintrag["sd_m"] dict_ersetzungen["SD"] += 1 zh = jxl_eintrag.get("zh", "") if zh is None or str(zh).strip() == "": liste_zeilen_ohne_ZH.append((nummer_zeile, standpunkt_aktuell, zielpunkt)) spalte_letzte = f"{sd_out}:ZH:{zh}" if str(zh).strip() != "" else sd_out writer.writerow([zielpunkt, hz_out, z_out, spalte_letzte]) continue writer.writerow(row) if Import_fortsetzen: print(f"Korrektur erfolgreich abgeschlossen. Ausgabe: {pfad_datei_csv_out}") print(f"Ersetzungen in der CSV-Datei (Rundung -> JXL volle Nachkommastellen): {dict_ersetzungen}") # Ausgabe der Zeilennummern in der JXL-Datei ohne Instrumentenhöhe if len(liste_zeilen_ohne_IH) > 0: print("\n--- Fehlende IH in JXL-Datei ---") print(f"Anzahl: {len(liste_zeilen_ohne_IH)}") print(liste_zeilen_ohne_IH) # Ausgabe der Zeilennummern in der JXL-Datei ohne Prismenhöhe if len(liste_zeilen_ohne_ZH) > 0: print("\n--- Fehlende ZH ---") print(f"Anzahl: {len(liste_zeilen_ohne_ZH)}") print(liste_zeilen_ohne_ZH) # Ausgabe der Zeilennummern in der JXL-Datei mit unterschiedlichen Tachymeterstandpunkten im Vergleich zur CSV-Datei if len(liste_zeilen_standpunkt_nicht_in_jxl) > 0: print("\n--- Standpunkt in CSV-Datei, aber nicht in JXL-Datei---") print(f"Anzahl: {len(liste_zeilen_standpunkt_nicht_in_jxl)}") print(liste_zeilen_standpunkt_nicht_in_jxl) else: print("Die Korrektur der CSV-Datei wurde abgebrochen.") return { "Import_fortsetzen": Import_fortsetzen, "dict_ersetzungen": dict_ersetzungen, "liste_zeilen_ohne_IH": liste_zeilen_ohne_IH, "liste_zeilen_ohne_ZH": liste_zeilen_ohne_ZH, "liste_zeilen_standpunkt_nicht_in_jxl": liste_zeilen_standpunkt_nicht_in_jxl, "pfad_datei_csv_out": pfad_datei_csv_out } def import_beobachtungen_tachymeter(self, pfad_datei: str, instrumentenID: int) -> None: """Importiert Tachymeterbeobachtungen aus einer CSV-Datei in die Tabelle Beobachtungen. Die Datei wird blockweise verarbeitet (Standpunktzeile + Beobachtungszeilen). Aus je zwei Halbsätzen wird ein Vollsatz abgeleitet: - Richtung: Mittel aus Hz(HS1) und Hz(HS2-200 gon), normiert in [0, 400), - Zenitwinkel: (ZW1 - ZW2 + 400) / 2, - Schrägdistanz: Mittel der beiden SD. Zusätzlich werden aus Instrumenten- und Prismenhöhe bodenbezogene Werte berechnet (schraegdistanz_bodenbezogen, zenitwinkel_bodenbezogen) über berechne_zenitwinkel_distanz_bodenbezogen(). Vorabprüfungen: - Abbruch, wenn der Dateiname bereits in Beobachtungen vorkommt, - Abbruch, wenn instrumentenID nicht in Tabelle Instrumente existiert, - Plausibilitätsprüfung auf Vollsatz-Struktur (6 Zeilen pro Zielpunkt: 3 Vollsätze * 2 Halbsätze). :param pfad_datei: Pfad zur CSV-Datei. :type pfad_datei: str :param instrumentenID: ID des verwendeten Instruments aus Tabelle Instrumente. :type instrumentenID: int :return: None :rtype: None """ # Prüfen, ob Bereits Daten aus der Datei in der Datenbank vorhanden sind con = sqlite3.connect(self.pfad_datenbank) cursor = con.cursor() liste_dateinamen_in_db = [r[0] for r in cursor.execute( "SELECT DISTINCT dateiname FROM Beobachtungen" ).fetchall()] liste_beobachtungsgruppeID = [r[0] for r in cursor.execute("""SELECT DISTINCT beobachtungsgruppeID FROM Beobachtungen""").fetchall()] liste_instrumentenid = [r[0] for r in cursor.execute("SELECT instrumenteID FROM Instrumente").fetchall()] cursor.close() con.close() # Import_fortsetzen wird False, sobald eine Fehler festgestellt wird. Als Folge wird der Import abgebrochen und eine Fehlermeldung mit Handlungshinweisen für den Benutzer ausgegeben. Import_fortsetzen = True # Abbrechen des Imports, wenn bereits Daten aus der selben Datei importiert wurden. if pfad_datei in liste_dateinamen_in_db: Import_fortsetzen = False if Import_fortsetzen: nummer_zielpunkt = 0 # Abfragen der aktuell höschten Nummer im Attribut beobachtungsgruppeID der Tabelle Beobachtungen try: nummer_beobachtungsgruppeID = max(liste_beobachtungsgruppeID) except: nummer_beobachtungsgruppeID = 0 if nummer_beobachtungsgruppeID is None: nummer_beobachtungsgruppeID = 0 with (open(pfad_datei, "r", encoding="utf-8") as f): liste_fehlerhafte_zeile = [] liste_beobachtungen_vorbereitung = [] for i, zeile in enumerate(f): # Die ersten drei Zeilen der Importdatei müssen überprungen werden if i < 3: continue zeile = zeile.strip().split(";") if len(zeile) < 5: zeile = zeile + [""] * (5 - len(zeile)) # Tachymeterstandpunkt speichern und beobachtungsgruppeID als ID für das einmalige Aufbauen eines Tachymeters auf einem Punkt festlegen if zeile[2] == "" and zeile[3] == "" and zeile[4] == "": nummer_beobachtungsgruppeID += 1 standpunkt = zeile[0] instrumentenhoehe = zeile[1] if instrumentenhoehe.startswith("IH:"): instrumentenhoehe = instrumentenhoehe.split("IH:", 1)[1].strip() # Überprüfung, ob für jeden Beobachteten Zielpunkt 6 Zeilen vorhanden sind (3 Vollszätze mit je 2 Halbsätzen) if nummer_zielpunkt % 6 != 0: liste_fehlerhafte_zeile.append(i) nummer_zielpunkt = 0 liste_zielpunkte_hs = [] liste_zielpunkte_vs2 = [] liste_zielpunkte_vs3 = [] else: # ZH = Prismenhöhe | VS = Vollsatz | HS = Halbsatz # Erstellen einer Liste mit Standpunkt, Vollsatz und Halbsatzzuordnung für die Beobachtungen für die Weiterverarbeitung nummer_zielpunkt += 1 if ":ZH:" in zeile[3]: teil = zeile[3].split(":ZH:", 1) zeile[3] = teil[0].strip() zeile[4] = teil[1].strip() if zeile[0] not in liste_zielpunkte_hs: liste_zielpunkte_hs.append(zeile[0]) if zeile[0] in liste_zielpunkte_vs3: liste_beobachtungen_vorbereitung.append( [nummer_beobachtungsgruppeID, "VS3", "HS1", standpunkt, zeile[0], zeile[1], zeile[2], zeile[3], zeile[4], instrumentenhoehe]) elif zeile[0] in liste_zielpunkte_vs2: liste_beobachtungen_vorbereitung.append( [nummer_beobachtungsgruppeID, "VS2", "HS1", standpunkt, zeile[0], zeile[1], zeile[2], zeile[3], zeile[4], instrumentenhoehe]) else: liste_beobachtungen_vorbereitung.append( [nummer_beobachtungsgruppeID, "VS1", "HS1", standpunkt, zeile[0], zeile[1], zeile[2], zeile[3], zeile[4], instrumentenhoehe]) else: liste_zielpunkte_hs.remove(zeile[0]) if zeile[0] in liste_zielpunkte_vs3: liste_beobachtungen_vorbereitung.append( [nummer_beobachtungsgruppeID, "VS3", "HS2", standpunkt, zeile[0], zeile[1], zeile[2], zeile[3], zeile[4], instrumentenhoehe]) elif zeile[0] in liste_zielpunkte_vs2: if zeile[0] not in liste_zielpunkte_vs3: liste_zielpunkte_vs3.append(zeile[0]) liste_beobachtungen_vorbereitung.append( [nummer_beobachtungsgruppeID, "VS2", "HS2", standpunkt, zeile[0], zeile[1], zeile[2], zeile[3], zeile[4], instrumentenhoehe]) else: if zeile[0] not in liste_zielpunkte_vs2: liste_zielpunkte_vs2.append(zeile[0]) liste_beobachtungen_vorbereitung.append( [nummer_beobachtungsgruppeID, "VS1", "HS2", standpunkt, zeile[0], zeile[1], zeile[2], zeile[3], zeile[4], instrumentenhoehe]) # Ausgabe, welche Zeilen in der Importdatei bearbeitet werden müssen. if liste_fehlerhafte_zeile != []: fehler_zeilen = ", ".join(map(str, liste_fehlerhafte_zeile)) print( f"Das Einlesen der Datei {pfad_datei} wurde abgebrochen.\n" f"Bitte bearbeiten Sie die Zeilen rund um: {fehler_zeilen} in der csv-Datei " f"und wiederholen Sie den Import." ) Import_fortsetzen = False else: print( f"Der Import wurde abgebrochen, weil die Beobachtungen aus der Datei {pfad_datei} bereits in der Datenbank vorhanden sind.") if Import_fortsetzen: liste_beobachtungen_import = [] while len(liste_beobachtungen_vorbereitung) > 0: liste_aktueller_zielpunkt = liste_beobachtungen_vorbereitung[0] aktueller_zielpunkt = liste_aktueller_zielpunkt[4] for index in range(1, len(liste_beobachtungen_vorbereitung)): liste = liste_beobachtungen_vorbereitung[index] # Berechnen der zu importierenden Beobachtungen. (Jeweils reduziert auf den Vollsatz) if liste[4] == aktueller_zielpunkt: richtung1 = self.string_to_decimal(liste_aktueller_zielpunkt[5]) richtung2 = self.string_to_decimal(liste[5]) - Decimal(200) zenitwinkel_vollsatz_gon = (self.string_to_decimal(liste_aktueller_zielpunkt[6]) - self.string_to_decimal( liste[6]) + 400) / 2 zenitwinkel_vollsatz_rad = Berechnungen.Einheitenumrechnung.gon_to_rad_Decimal(zenitwinkel_vollsatz_gon) distanz_vollsatz = (self.string_to_decimal(liste_aktueller_zielpunkt[7]) + self.string_to_decimal( liste[7])) / 2 if richtung2 < 0 and richtung1 != Decimal(0): richtung2 += Decimal(400) elif richtung2 > 400: richtung2 -= Decimal(400) richtung_vollsatz_gon = (richtung1 + richtung2) / 2 richtung_vollsatz_rad = Berechnungen.Einheitenumrechnung.gon_to_rad_Decimal(richtung_vollsatz_gon) if liste_aktueller_zielpunkt[8] == liste[8]: prismenhoehe = liste_aktueller_zielpunkt[8] else: Import_fortsetzen = False print(f"Der Import wurde abgebrochen, weil für zwei Halbsätze vom Standpunkt {liste_aktueller_zielpunkt[3]} zum Zielpunkt {aktueller_zielpunkt} unterschiedliche Prismenhöhen vorliegen. Bitte in der Datei {pfad_datei} korrigieren und Import neustarten.") if liste_aktueller_zielpunkt[9] == liste[9]: instrumentenhoehe_import = liste_aktueller_zielpunkt[9] else: Import_fortsetzen = False print(f"Der Import wurde abgebrochen, weil für zwei Halbsätze vom Standpunkt {liste_aktueller_zielpunkt[3]} zum Zielpunkt {aktueller_zielpunkt} unterschiedliche Instrumentenhöhen vorliegen. Bitte in der Datei {pfad_datei} korrigieren und Import neustarten.") # Umrechnen der Zenitwinkel und Schrägdistanzen auf den Boden unter Verwendung der Instrumenten- und Prismenhöhen schraegdistanz_bodenbezogen, zenitwinkel_bodenbezogen = self.berechnungen.berechne_zenitwinkel_distanz_bodenbezogen( float(zenitwinkel_vollsatz_rad), float(distanz_vollsatz), float(instrumentenhoehe_import), float(prismenhoehe)) liste_beobachtungen_import.append( [liste[0], liste[3], liste[4], richtung_vollsatz_rad, zenitwinkel_vollsatz_rad, distanz_vollsatz, zenitwinkel_bodenbezogen, schraegdistanz_bodenbezogen, instrumentenhoehe_import, prismenhoehe]) del liste_beobachtungen_vorbereitung[index] del liste_beobachtungen_vorbereitung[0] break # Überprüfung, ob das Instrument bereits vom Benutzer angelegt wurde. if instrumentenID not in liste_instrumentenid: Import_fortsetzen = False print( "Der Import wurde abgebrochen. Bitte eine gültige InstrumentenID eingeben. Bei Bedarf ist das Instrument neu anzulegen.") # Berechnete bodenbezogene Beobachtungen, welche jeweils auf den Vollsatz reduziert sind, in die Tabelle Beobachtungen importieren. if Import_fortsetzen: con = sqlite3.connect(self.pfad_datenbank) cursor = con.cursor() for beobachtung_import in liste_beobachtungen_import: cursor.execute( "INSERT INTO Beobachtungen (punktnummer_sp, punktnummer_zp, instrumenteID, beobachtungsgruppeID, tachymeter_richtung, tachymeter_zenitwinkel_roh, tachymeter_distanz_roh, dateiname, tachymeter_instrumentenhoehe, tachymeter_prismenhoehe, tachymeter_zenitwinkel, tachymeter_distanz) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", (beobachtung_import[1], beobachtung_import[2], instrumentenID, beobachtung_import[0], float(beobachtung_import[3]), float(beobachtung_import[4]), float(beobachtung_import[5]), pfad_datei, float(beobachtung_import[8]), float(beobachtung_import[9]), float(beobachtung_import[6]), float(beobachtung_import[7]))) con.commit() cursor.close() con.close() print(f"Der Import der Datei {pfad_datei} wurde erfolgreich abgeschlossen.") def vorbereitung_import_beobachtungen_nivellement_naeherung_punkthoehen(self, pfad_datei: str, instrumentenID: int) -> None | tuple[None, None] | tuple[dict[Any, Any], list[Any]]: """Bereitet den Import von Nivellementdaten zur Ableitung von Näherungs-Normalhöhen vor. Aus einer semikolon-separierten Datei werden Zeilen erkannt, die berechnete Zielweiten/Punkthöhen enthalten, und daraus pro Punktnummer alle gefundenen Z-Werte gesammelt. Anschließend wird je Punktnummer der Mittelwert berechnet (Rundung auf 6 Nachkommastellen). Danach wird geprüft, welche dieser Punktnummern bereits in der Tabelle Netzpunkte existieren. Abbruchbedingungen: - Dateiname bereits in Beobachtungen vorhanden, - instrumentenID nicht in Instrumente vorhanden. :param pfad_datei: Pfad zur Importdatei. :type pfad_datei: str :param instrumentenID: ID des verwendeten Niv-Instruments aus der Tabelle Instrumente. :type instrumentenID: int :return: (dict_punkt_mittelwert_punkthoehen, liste_punktnummern_in_db) oder (None, None) bei Abbruch. :rtype: None | tuple[None, None] | tuple[dict[Any, Any], list[Any]] """ # Prüfen, ob bereits Daten aus der Datei in der Datenbank vorhanden sind con = sqlite3.connect(self.pfad_datenbank) cursor = con.cursor() liste_dateinamen_in_db = [r[0] for r in cursor.execute( "SELECT DISTINCT dateiname FROM Beobachtungen" ).fetchall()] liste_instrumentenid = [r[0] for r in cursor.execute("SELECT instrumenteID FROM Instrumente").fetchall()] liste_netzpunkte = [r[0] for r in cursor.execute("SELECT punktnummer FROM Netzpunkte").fetchall()] cursor.close() con.close() # Import_fortsetzen wird False, sobald eine Fehler festgestellt wird. Als Folge wird der Import abgebrochen und eine Fehlermeldung mit Handlungshinweisen für den Benutzer ausgegeben. Import_fortsetzen = True # Abbruch des Imports, wenn bereits Daten aus der Datei importiert wurden. if pfad_datei in liste_dateinamen_in_db: Import_fortsetzen = False print(f"Der Import wurde abgebrochen, weil die Beobachtungen aus der Datei {pfad_datei} bereits in der Datenbank vorhanden sind.") return None, None # Abbruch, wenn das Instrument noch nicht vom Benutzer angelegt wurde. if instrumentenID not in liste_instrumentenid: Import_fortsetzen = False print( "Der Import wurde abgebrochen. Bitte eine gültige InstrumentenID eingeben. Bei Bedarf ist das Instrument neu anzulegen.") return None, None if Import_fortsetzen: # Messwerte aus der Importdatei abfragen muster_berechnete_zielweiten = "| | |Z " dict_punkt_alle_punkthoehen = {} dict_punkt_mittelwert_punkthoehen = {} with open(pfad_datei, newline="", encoding="utf-8") as csvfile: r = csv.reader(csvfile, delimiter=";") for i, row in enumerate(r): if any(muster_berechnete_zielweiten in feld for feld in row): zeile = " ".join([str(feld) for feld in row]) punktnummer = None if "KD1" in zeile: teil = zeile.split("KD1", 1)[1].strip() if teil != "": punktnummer = teil.split()[0] wert_z = None if "|Z" in zeile: teil = zeile.split("|Z", 1)[1].strip() if teil != "": wert_z = self.string_to_float(teil.split()[0]) if punktnummer is not None and wert_z is not None: if punktnummer not in dict_punkt_alle_punkthoehen: dict_punkt_alle_punkthoehen[punktnummer] = [] dict_punkt_alle_punkthoehen[punktnummer].append(wert_z) for punktnummer, liste_z in dict_punkt_alle_punkthoehen.items(): # Hier wird auf 6 Nachkommastellen gerundet! dict_punkt_mittelwert_punkthoehen[punktnummer] = round(sum(liste_z) / len(liste_z),6) # Erstellen einer Liste mit allen Punktnummern, die in der Datenbank vorliegen und für die Nivellementbeobachtungen vorliegen. if Import_fortsetzen: # Ausgabe, welche Niv-Punkte bereits in der Tabelle Netzpunkte enthalten sind liste_punktnummern_nivellement = dict_punkt_mittelwert_punkthoehen.keys() liste_punktnummern_in_db = [] liste_punktnummern_nicht_in_db = [] for punktnummer in liste_punktnummern_nivellement: if punktnummer in liste_netzpunkte: liste_punktnummern_in_db.append(punktnummer) else: liste_punktnummern_nicht_in_db.append(punktnummer) # Es werden nur Höhendifferenzen für Punkte berechnet, für die Näherungskoordinaten in der Datenbank vorliegen. if Import_fortsetzen: print(f"Für folgende Nivellementpunkte werden die Höhen in der Ausgleichung berechnet: {liste_punktnummern_in_db}\nFür folgende Punkte wird aktuell keine Höhe in der Ausgleichung berechnet: {liste_punktnummern_nicht_in_db}. Bei Bedarf im folgenden Schritt ändern!") return dict_punkt_mittelwert_punkthoehen, liste_punktnummern_in_db def import_beobachtungen_nivellement_naeherung_punkthoehen(self, dict_punkt_mittelwert_punkthoehen: dict, liste_punktnummern_in_db: list, liste_punktnummern_hinzufuegen: list) -> str | None: """Importiert Näherungsnormalhöhen in die Tabelle Netzpunkte. Es werden Listen geführt für neu hinzugefügte, bereits vorhandene und geänderte Punkte und abschließend als Konsolen-Ausgabe ausgegeben. :param dict_punkt_mittelwert_punkthoehen: Dictionary {punktnummer: normalhoehe} als Mittelwerte. :type dict_punkt_mittelwert_punkthoehen: dict :param liste_punktnummern_in_db: Punktnummern, die bereits in Netzpunkte existieren. :type liste_punktnummern_in_db: list :param liste_punktnummern_hinzufuegen: Zusätzliche Punktnummern, die ebenfalls übernommen werden sollen. :type liste_punktnummern_hinzufuegen: list :return: Status-String mit der Liste der Punkte, für die Normalhöhen in der Ausgleichung verfügbar sind, oder None bei Abbruch. :rtype: str | None """ # Import_fortsetzen wird False, sobald eine Fehler festgestellt wird. Als Folge wird der Import abgebrochen und eine Fehlermeldung mit Handlungshinweisen für den Benutzer ausgegeben. Import_fortsetzen = True if dict_punkt_mittelwert_punkthoehen == None or liste_punktnummern_in_db == None or liste_punktnummern_hinzufuegen == None: Import_fortsetzen = False print("Der Import der Nivellementbeobachtungen wurde abgebrochen.") return None con = sqlite3.connect(self.pfad_datenbank) cursor = con.cursor() liste_punkte_neu_hinzugefuegt = [] liste_punkte_bereits_vorhanden = [] liste_punkte_geaendert = [] for punktnummer in liste_punktnummern_hinzufuegen: # Neu anlegen oder aktualisieren der Normalhöhen in der Tabelle Netzpunkte try: cursor.execute(f"INSERT INTO Netzpunkte (punktnummer, normalhoehe_hfp) VALUES (?, ?)", (punktnummer, dict_punkt_mittelwert_punkthoehen[punktnummer])) liste_punkte_neu_hinzugefuegt.append(punktnummer) except sqlite3.IntegrityError: liste_punkte_bereits_vorhanden.append(punktnummer) cursor.execute( "UPDATE Netzpunkte SET normalhoehe_hfp = ? WHERE punktnummer = ?", (dict_punkt_mittelwert_punkthoehen[punktnummer], punktnummer) ) liste_punkte_geaendert.append(punktnummer) for punktnummer in liste_punktnummern_in_db: cursor.execute(f"UPDATE Netzpunkte SET normalhoehe_hfp = ? WHERE punktnummer = ?", (dict_punkt_mittelwert_punkthoehen[punktnummer], punktnummer)) liste_punkte_geaendert.append(punktnummer) con.commit() cursor.close() con.close() print(f"Neu hinzugefügt ({len(liste_punkte_neu_hinzugefuegt)}): {liste_punkte_neu_hinzugefuegt}") print(f"Bereits vorhanden ({len(liste_punkte_bereits_vorhanden)}): {liste_punkte_bereits_vorhanden}") print(f"Geändert ({len(liste_punkte_geaendert)}): {liste_punkte_geaendert}\n") return f"Für folgende Punkte werden die Höhen Ausgeglichen: {liste_punktnummern_in_db}" def import_beobachtungen_nivellement_RVVR(self, pfad_datei: str, instrumentenID: int) -> str | None: """Importiert geometrische Nivellementbeobachtungen nach dem RVVR Prinzip die Tabelle Beobachtungen. Es werden Zeilen mit Rück-/Vormessungen erkannt (rvvr: Rück, Vor, Vor, Rück). Pro Block werden dh und Entfernung berechnet (Rundung jeweils auf 8 Nachkommastellen). Anschließend werden Züge reduziert, indem Wechselpunkte, die nicht in Netzpunkte vorhanden sind, durch Summation herausgekürzt werden. Damit werden nur Beobachtungen zwischen Punkten in Netzpunkte (bzw. reduzierten Start-/Zielpunkten) in Beobachtungen gespeichert. Abbruchbedingungen: - Dateiname bereits in Beobachtungen vorhanden, - instrumentenID nicht in Instrumente vorhanden, - Anzahl RVVR-Zeilen nicht durch 4 teilbar. :param pfad_datei: Pfad zur Textdatei. :type pfad_datei: str :param instrumentenID: ID des verwendeten Niv-Instruments aus der Tabelle Instrumente. :type instrumentenID: int :return: Status-String bei Erfolg oder None bei Abbruch. :rtype: str | None """ # Prüfen, ob Bereits Daten aus der Datei in der Datenbank vorhanden sind con = sqlite3.connect(self.pfad_datenbank) cursor = con.cursor() liste_dateinamen_in_db = [r[0] for r in cursor.execute( "SELECT DISTINCT dateiname FROM Beobachtungen" ).fetchall()] liste_instrumentenid = [r[0] for r in cursor.execute("SELECT instrumenteID FROM Instrumente").fetchall()] liste_netzpunkte = [r[0] for r in cursor.execute("SELECT punktnummer FROM Netzpunkte").fetchall()] cursor.close() con.close() # Import_fortsetzen wird False, sobald eine Fehler festgestellt wird. Als Folge wird der Import abgebrochen und eine Fehlermeldung mit Handlungshinweisen für den Benutzer ausgegeben. Import_fortsetzen = True # Import abbrechen, wenn bereits Daten aus der selben Datei importiert wurden if pfad_datei in liste_dateinamen_in_db: Import_fortsetzen = False print(f"Der Import wurde abgebrochen, weil die Beobachtungen aus der Datei {pfad_datei} bereits in der Datenbank vorhanden sind.") # Import abbrechen, wenn das Instrument noch nicht vom Benutzer angelegt wurde. if instrumentenID not in liste_instrumentenid: Import_fortsetzen = False print( "Der Import wurde abgebrochen. Bitte eine gültige InstrumentenID eingeben. Bei Bedarf ist das Instrument neu anzulegen.") if Import_fortsetzen: # rvvr = Rück, Vor, Vor, Rück # Berechnen der Höhenunterschiede delta H und Schrägstrecken zwischen Zwei Punkten gemäß der Importdatei aus rvvr. anzahl_zeilen_rvvr = 0 liste_zeilen_rvvr = [] liste_punktpaare = [] with open(pfad_datei, "r", encoding="utf-8") as f: for i, zeile in enumerate(f): if ("Lr" in zeile) or ("Lv" in zeile): liste_zeilen_rvvr.append(zeile) anzahl_zeilen_rvvr += 1 if anzahl_zeilen_rvvr % 4 == 0: index = 0 while index < len(liste_zeilen_rvvr): block_4 = liste_zeilen_rvvr[index:index + 4] liste_punktnummern_block = [] for zeile_block in block_4: punktnummer = None if "|KD1" in zeile_block: teil = zeile_block.split("|KD1", 1)[1].strip() if teil != "": punktnummer = teil.split()[0] if punktnummer is not None: if punktnummer not in liste_punktnummern_block: liste_punktnummern_block.append(punktnummer) r1 = None v1 = None v2 = None r2 = None e_1 = None e_2 = None e_3 = None e_4 = None zugnummer = None if "|Lr" in block_4[0]: teil = block_4[0].split("|Lr", 1)[0].strip() if teil != "": zugnummer = int(teil.split()[-1]) teil = block_4[0].split("Lr", 1)[1].strip() if "Lr" in block_4[0] else block_4[0].split("Lv", 1)[ 1].strip() r1 = self.string_to_float(teil.split()[0]) teil = block_4[1].split("Lr", 1)[1].strip() if "Lr" in block_4[1] else block_4[1].split("Lv", 1)[ 1].strip() v1 = self.string_to_float(teil.split()[0]) teil = block_4[2].split("Lr", 1)[1].strip() if "Lr" in block_4[2] else block_4[2].split("Lv", 1)[ 1].strip() v2 = self.string_to_float(teil.split()[0]) teil = block_4[3].split("Lr", 1)[1].strip() if "Lr" in block_4[3] else block_4[3].split("Lv", 1)[ 1].strip() r2 = self.string_to_float(teil.split()[0]) if "|E" in block_4[0]: teil = block_4[0].split("|E", 1)[1].strip() if teil != "": e_1 = self.string_to_float(teil.split()[0]) if "|E" in block_4[1]: teil = block_4[1].split("|E", 1)[1].strip() if teil != "": e_2 = self.string_to_float(teil.split()[0]) if "|E" in block_4[2]: teil = block_4[2].split("|E", 1)[1].strip() if teil != "": e_3 = self.string_to_float(teil.split()[0]) if "|E" in block_4[3]: teil = block_4[3].split("|E", 1)[1].strip() if teil != "": e_4 = self.string_to_float(teil.split()[0]) # Achtung: Hier Rundung auf 8 Nachkommastellen dh = round((r1 - v1 - v2 + r2) / 2, 8) entfernung = round((e_1 + e_2 + e_3 + e_4) / 2, 8) liste_punktpaare.append((zugnummer, liste_punktnummern_block[0], liste_punktnummern_block[1], dh, entfernung)) index += 4 # Berechnen der Höhendifferenzen und schrägstrecken zwischen Punkten in der Tabelle Netzpunkte. # Somit werden alle Wechselpunkte nicht in der Ausgleichung berücksichtigt. # Die Berechnung erfolgt durch addition. liste_beobachtungen_reduziert = [] liste_beobachtungen_bearbeitung = [] zugnummer_vorher = liste_punktpaare[0][0] for einzelbeobachtung in liste_punktpaare: zugnummer = einzelbeobachtung[0] if zugnummer == zugnummer_vorher: if einzelbeobachtung[1] in liste_netzpunkte and einzelbeobachtung[2] in liste_netzpunkte: liste_beobachtungen_reduziert.append(einzelbeobachtung + (1,)) elif einzelbeobachtung[1] in liste_netzpunkte and einzelbeobachtung[2] not in liste_netzpunkte: liste_beobachtungen_bearbeitung.append(einzelbeobachtung) elif einzelbeobachtung[1] not in liste_netzpunkte and einzelbeobachtung[2] in liste_netzpunkte: liste_beobachtungen_bearbeitung.append(einzelbeobachtung) startpunkt = None zielpunkt = None summe_dh = None summe_entfernung = None anzahl_standpunkte = 1 for i, beobachtung_bearbeiten in enumerate(liste_beobachtungen_bearbeitung): if i == 0: startpunkt = beobachtung_bearbeiten[1] summe_dh = beobachtung_bearbeiten[3] summe_entfernung = beobachtung_bearbeiten[4] anzahl_standpunkte = 1 else: zielpunkt = beobachtung_bearbeiten[2] summe_dh += beobachtung_bearbeiten[3] summe_entfernung += beobachtung_bearbeiten[4] anzahl_standpunkte += 1 # Achtung:Hier Rundung auf 8 Nachkommastellen! liste_beobachtungen_reduziert.append( (zugnummer, startpunkt, zielpunkt, round(summe_dh, 8), round(summe_entfernung, 8), anzahl_standpunkte)) liste_beobachtungen_bearbeitung = [] else: liste_beobachtungen_bearbeitung.append(einzelbeobachtung) else: if einzelbeobachtung[1] in liste_netzpunkte and einzelbeobachtung[2] in liste_netzpunkte: liste_beobachtungen_reduziert.append(einzelbeobachtung + (1,)) elif einzelbeobachtung[1] in liste_netzpunkte and einzelbeobachtung[2] not in liste_netzpunkte: liste_beobachtungen_bearbeitung.append(einzelbeobachtung) elif einzelbeobachtung[1] not in liste_netzpunkte and einzelbeobachtung[2] in liste_netzpunkte: liste_beobachtungen_bearbeitung.append(einzelbeobachtung) startpunkt = None zielpunkt = None summe_dh = None summe_entfernung = None anzahl_standpunkte = 1 for i, beobachtung_bearbeiten in enumerate(liste_beobachtungen_bearbeitung): if i == 0: startpunkt = beobachtung_bearbeiten[1] summe_dh = beobachtung_bearbeiten[3] summe_entfernung = beobachtung_bearbeiten[4] anzahl_standpunkte = 1 else: zielpunkt = beobachtung_bearbeiten[2] summe_dh += beobachtung_bearbeiten[3] summe_entfernung += beobachtung_bearbeiten[4] anzahl_standpunkte += 1 #Achtung:Hier Rundung auf 8 Nachkommastellen! liste_beobachtungen_reduziert.append( (zugnummer, startpunkt, zielpunkt, round(summe_dh,8), round(summe_entfernung,8), anzahl_standpunkte)) liste_beobachtungen_bearbeitung = [] else: liste_beobachtungen_bearbeitung.append(einzelbeobachtung) zugnummer_vorher = zugnummer con = sqlite3.connect(self.pfad_datenbank) cursor = con.cursor() for beobachtung_reduziert in liste_beobachtungen_reduziert: cursor.execute(f"INSERT INTO Beobachtungen (punktnummer_sp, punktnummer_zp, instrumenteID, niv_dh, niv_strecke, niv_anz_standpkte, dateiname) VALUES (?, ?, ?, ?, ?, ?, ?)", (beobachtung_reduziert[1], beobachtung_reduziert[2], instrumentenID, beobachtung_reduziert[3], beobachtung_reduziert[4], beobachtung_reduziert[5], pfad_datei)) con.commit() cursor.close() con.close() return f"Die Beobachtungen aus der Datei {pfad_datei} wurden erfolgreich importiert." else: print(f"Anzahl nicht RVVR durch 4 teilbar. Bitte die Datei {pfad_datei} überprüfen! Der Import wurde abgebrochen.") Import_fortsetzen = False def import_koordinaten_gnss(self, pfad_datei: str, liste_sapos_stationen_genauigkeiten: list) -> str: """Importiert GNSS-Koordinaten (ECEF) in die Tabelle Netzpunkte. Die Datei wird semikolon-separiert gelesen und zusätzlich aufgesplittet (Leerzeichen und Kommas). Für Referenzstationen (Kennzeichnung "Referenz" und Standardabweichungen 0.0000/0.0000/0.0000) werden die übergebenen Genauigkeiten (X/Y/Z) eingesetzt. Die Koordinaten und Standardabweichungen werden in Netzpunkte geschrieben. :param pfad_datei: Pfad zur Koordinatendatei. :type pfad_datei: str :param liste_sapos_stationen_genauigkeiten: Liste mit drei Werten [σX, σY, σZ] für Referenzstationen. :type liste_sapos_stationen_genauigkeiten: list :return: Statusmeldung zum erfolgreichen Import. :rtype: str """ con = sqlite3.connect(self.pfad_datenbank) cursor = con.cursor() with (open(pfad_datei, newline="", encoding="utf-8") as csvfile): r = csv.reader(csvfile, delimiter = ";") for i, row in enumerate(r): row_neu = [] for eintrag in row: eintrag = str(eintrag).strip() eintrag = eintrag.replace("'", "") aufgeteilt = eintrag.split() for teil in aufgeteilt: teil = teil.split(",") row_neu.extend(teil) if row_neu[1] == 'Referenz' and row_neu[7] == '0.0000' and row_neu[8] == '0.0000' and row_neu[9] == '0.0000': row_neu[7] = liste_sapos_stationen_genauigkeiten[0] row_neu[8] = liste_sapos_stationen_genauigkeiten[1] row_neu[9] = liste_sapos_stationen_genauigkeiten[2] cursor.execute(f"""INSERT INTO Netzpunkte (punktnummer, naeherungx_us, naeherungy_us, naeherungz_us, stabw_vorinfo_x, stabw_vorinfo_y, stabw_vorinfo_z) VALUES (?, ?, ?, ?, ?, ?, ?) ON CONFLICT (punktnummer) DO UPDATE SET naeherungx_us = excluded.naeherungx_us, naeherungy_us = excluded.naeherungy_us, naeherungz_us = excluded.naeherungz_us, stabw_vorinfo_x = excluded.stabw_vorinfo_x, stabw_vorinfo_y = excluded.stabw_vorinfo_y, stabw_vorinfo_z = excluded.stabw_vorinfo_z""", (row_neu[0], row_neu[4], row_neu[5], row_neu[6], row_neu[7], row_neu[8], row_neu[9]) ) con.commit() con.close() return "Import der Koordinaten aus stationärem GNSS abgeschlossen." def import_basislinien_gnss(self, pfad_datei: str, instrumentenID: int) -> None: """Importiert GNSS-Basislinien inkl. Kovarianzen in die Tabelle Beobachtungen. Die Datei wird zeilenweise gelesen und Basislinien werden über Präfixe erkannt: - @+ : Ziel-/Punktkennung (ID), - @- : Gegenpunktkennung und Komponenten (bx/by/bz), - @= : s0 und Kovarianzterme (cxx, cxy, cxz, cyy, cyz, czz). Vorabprüfungen: - Abbruch, wenn der Dateiname bereits in Beobachtungen vorkommt, - Abbruch, wenn instrumentenID nicht in Instrumente existiert. :param pfad_datei: Pfad zur Basisliniendatei (Text). :type pfad_datei: str :param instrumentenID: ID des verwendeten GNSS-Instruments (FK auf Instrumente). :type instrumentenID: int :return: None :rtype: None """ # Import_fortsetzen wird False, sobald eine Fehler festgestellt wird. Als Folge wird der Import abgebrochen und eine Fehlermeldung mit Handlungshinweisen für den Benutzer ausgegeben. Import_fortsetzen = True # Prüfen, ob Bereits Daten aus der Datei in der Datenbank vorhanden sind con = sqlite3.connect(self.pfad_datenbank) cursor = con.cursor() liste_dateinamen_in_db = [r[0] for r in cursor.execute( "SELECT DISTINCT dateiname FROM Beobachtungen" ).fetchall()] if pfad_datei in liste_dateinamen_in_db: Import_fortsetzen = False # Import abbrechen, wenn das Instrument noch nicht vom Benutzer angelegt wurde. liste_instrumentenid = [r[0] for r in cursor.execute("SELECT instrumenteID FROM Instrumente").fetchall()] cursor.close() con.close() if instrumentenID not in liste_instrumentenid: Import_fortsetzen = False print( "Der Import wurde abgebrochen. Bitte eine gültige InstrumentenID eingeben. Bei Bedarf ist das Instrument neu anzulegen.") if Import_fortsetzen: liste_basilinien = [] tupel_basislinie = () with (open(pfad_datei, "r", encoding="utf-8") as txt): for i, zeile in enumerate(txt): zeile = str(zeile).rstrip("\n").rstrip("\r") aufgeteilt = zeile.split() if aufgeteilt[0][:2] == "@+": tupel_basislinie += (aufgeteilt[0][2:],) if aufgeteilt[0][:2] == "@-": tupel_basislinie += (aufgeteilt[0][2:], aufgeteilt[1], aufgeteilt[2], aufgeteilt[3],) if aufgeteilt[0][:2] == "@=": tupel_basislinie += (aufgeteilt[1], aufgeteilt[2], aufgeteilt[3], aufgeteilt[4], aufgeteilt[5], aufgeteilt[6], aufgeteilt[7], ) liste_basilinien.append(tupel_basislinie) tupel_basislinie = () else: print( f"Der Import wurde abgebrochen, weil die Beobachtungen aus der Datei {pfad_datei} bereits in der Datenbank vorhanden sind.") if Import_fortsetzen: con = sqlite3.connect(self.pfad_datenbank) cursor = con.cursor() for basislinie in liste_basilinien: cursor.execute( "INSERT INTO Beobachtungen (punktnummer_sp, punktnummer_zp, gnss_bx, gnss_by, gnss_bz, gnss_s0, gnss_cxx, gnss_cxy, gnss_cxz, gnss_cyy, gnss_cyz, gnss_czz, dateiname, instrumenteID) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", (basislinie[0], basislinie[1], float(basislinie[2]), float(basislinie[3]), float(basislinie[4]), float(basislinie[5]), float(basislinie[6]), float(basislinie[7]), float(basislinie[8]), float(basislinie[9]), float(basislinie[10]), float(basislinie[11]), pfad_datei, instrumentenID)) con.commit() cursor.close() con.close() print(f"Der Import der Datei {pfad_datei} wurde erfolgreich abgeschlossen.")