Files
Masterprojekt_V3/Import.py
2026-02-06 18:18:37 +01:00

1299 lines
67 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import csv
from decimal import Decimal, getcontext, ROUND_HALF_UP
import sqlite3
from typing import Any
import xml.etree.ElementTree as ET
from Berechnungen import Berechnungen
import Berechnungen
from Einheitenumrechnung import Einheitenumrechnung
class Import:
"""Importfunktionen für Messdaten und Näherungswerte in die SQLite-Datenbank.
Die Klasse stellt Methoden zur Verfügung für:
- Import von Näherungskoordinaten (LH-Tachymeter, GNSS/ECEF) in die Tabelle Netzpunkte,
- Import und Vorverarbeitung von Tachymeterbeobachtungen (CSV-Datei) inkl. optionaler Korrektur durch nicht gerundete Leica-JXL,
- Import von GNSS-Basislinien inkl. Kovarianzanteilen in die Tabelle Beobachtungen,
- Import von Nivellementdaten (Normalhöhen als Näherungen sowie RVVR-Züge als dh-Beobachtungen),
- einfache Plausibilitätsprüfungen (Duplikate, InstrumentenID, Dateiname bereits importiert).
"""
def __init__(self, pfad_datenbank: str, a : float, b : float) -> None:
"""Initialisiert die Importklasse.
Speichert den Pfad zur SQLite-Datenbank und initialisiert die Hilfsklasse Berechnungen.
:param a: Große Halbachse a des Referenzellipsoids in Meter.
:type a: float
:param b: Kleine Halbachse b des Referenzellipsoids in Meter.
:type b: float
:param pfad_datenbank: Pfad zur SQLite-Datenbank.
:type pfad_datenbank: str
:return: None
:rtype: None
"""
self.a = a
self.b = b
self.pfad_datenbank = pfad_datenbank
self.berechnungen = Berechnungen.Berechnungen(self.a, self.b)
def string_to_float(self, zahl: str) -> float:
"""Konvertiert einen Zahlenstring in float.
Ersetzt das Dezimaltrennzeichen "," durch "." und führt anschließend float() aus.
:param zahl: Zahlenstring (z. B. "12,345" oder "12.345").
:type zahl: str
:return: Zahlenwert als float.
:rtype: float
"""
zahl = zahl.replace(',', '.')
return float(zahl)
def string_to_decimal(self, zahl: str) -> Decimal:
"""Konvertiert einen Zahlenstring in Decimal.
Ersetzt das Dezimaltrennzeichen "," durch "." und erzeugt anschließend ein Decimal.
:param zahl: Zahlenstring (z. B. "12,345" oder "12.345").
:type zahl: str
:return: Konvertierter Decimal-Wert.
:rtype: Decimal
"""
zahl = zahl.replace(',', '.')
return Decimal(zahl)
def import_koordinaten_lh_tachymeter(self, pfad_datei: str) -> None:
"""Importiert Näherungskoordinaten im lokalen Horizontsystem des Tachymeters in die Tabelle Netzpunkte.
Die Datei wird semikolon-separiert erwartet. Pro Zeile werden Punktnummer sowie X/Y/Z-Näherung
ausgelesen und in Netzpunkte (naeherungx_lh, naeherungy_lh, naeherungz_lh) geschrieben.
Vor dem Import werden zwei zentrale Prüfungen durchgeführt:
- Punktnummern dürfen in der Importdatei nicht doppelt vorkommen,
- Punktnummern aus der Importdatei dürfen noch nicht in der Tabelle Netzpunkte vorhanden sein.
:param pfad_datei: Pfad zur CSV-Datei mit Punktnummer und Koordinaten.
:type pfad_datei: str
:return: None
:rtype: None
"""
liste_punktnummern = []
liste_punktnummern_vorher = []
liste_punktnummern_vorher_db = []
# Import_abbrechen wird True, sobald eine Fehler festgestellt wird. Als Folge wird der Import abgebrochen und eine Fehlermeldung mit Handlungshinweisen für den Benutzer ausgegeben.
Import_abbrechen = False
with open (pfad_datei, newline='', encoding='utf-8') as csvfile:
# Abfragen aller in der Tabelle Netzpunkte enthaltenen Punktnummern
con = sqlite3.connect(self.pfad_datenbank)
cursor = con.cursor()
liste_punktnummern_db = [r[0] for r in cursor.execute("SELECT DISTINCT punktnummer FROM Netzpunkte").fetchall()]
cursor.close()
con.close()
r = csv.reader(csvfile, delimiter=';')
for row in r:
liste_punktnummern.append(row[0])
# Abbruch des Imports und Ausgabe einer Fehlermeldung, wenn ein Punkt doppelt in der Importdatei vorhanden ist.
if row[0] in liste_punktnummern_vorher:
Import_abbrechen = True
print(f"❌ Der Import wurde abgebrochen, weil in der Datei {pfad_datei} Punktnummern doppelt vorhanden sind. Bitte in der Datei ändern und Import wiederholen.")
break
liste_punktnummern_vorher.append(row[0])
# Abbruch des Imports und Ausgabe einer Fehlermeldung, wenn mindestens eine Punktnummer aus der Importdatei bereits in der Tabelle Netzpunkte vorhanden ist.
if row[0] in liste_punktnummern_db:
Import_abbrechen = True
print(f"❌ Der Import wurde abgebrochen, weil mindestens ein Teil der Punktnummern aus der Datei {pfad_datei} bereits in der Datenbank vorhanden ist. Bitte in der Datei ändern und Import wiederholen.")
break
liste_punktnummern_vorher_db.append(row[0])
# Import durchführen, wenn keine Fehler festgestellt wurden
if Import_abbrechen == False:
con = sqlite3.connect(self.pfad_datenbank)
cursor = con.cursor()
with open(pfad_datei, newline='', encoding='utf-8') as csvfile:
r = csv.reader(csvfile, delimiter=';')
for row in r:
cursor.execute(
"INSERT INTO Netzpunkte (punktnummer, naeherungx_lh, naeherungy_lh, naeherungz_lh) VALUES (?, ?, ?, ?)",
(row[0], self.string_to_float(row[1]), self.string_to_float(row[2]), self.string_to_float(row[3])))
con.commit()
cursor.close()
con.close()
print("✅ Der Import der Näherungskoordinaten wurde erfolgreich abgeschlossen")
def ist_rundung_von_jxl(self, wert_csv: str, wert_jxl: str) -> bool:
"""Prüft, ob ein CSV-Wert eine Rundung eines JXL-Wertes (mit mehr Nachkommastellen) ist.
Es wird geprüft, ob:
- beide Werte echte Zahlen sind,
- der CSV-Wert weniger Nachkommastellen als der JXL-Wert besitzt,
- der JXL-Wert auf die Nachkommastellenanzahl des CSV-Wertes gerundet exakt dem CSV-Wert entspricht.
:param wert_csv: Wert aus der CSV-Datei.
:type wert_csv: str
:param wert_jxl: Wert aus der JXL-Datei.
:type wert_jxl: str
:return: True, wenn wert_csv eine Rundung von wert_jxl auf weniger Nachkommastellen darstellt, sonst False.
:rtype: bool
"""
wert_csv = str(wert_csv).strip()
wert_jxl = str(wert_jxl).strip()
if ":ZH:" in wert_csv:
wert_csv = wert_csv.split(":ZH:", 1)[0].strip()
if ":ZH:" in wert_jxl:
wert_jxl = wert_jxl.split(":ZH:", 1)[0].strip()
def ist_zahl(text: str) -> bool:
text = str(text).strip()
if text == "":
return False
if text[0] == "+" or text[0] == "-":
text = text[1:]
if text == "":
return False
if text.count(",") + text.count(".") > 1:
return False
if "," in text:
teile = text.split(",", 1)
if len(teile) != 2:
return False
if teile[0] == "" or teile[1] == "":
return False
return teile[0].isdigit() and teile[1].isdigit()
if "." in text:
teile = text.split(".", 1)
if len(teile) != 2:
return False
if teile[0] == "" or teile[1] == "":
return False
return teile[0].isdigit() and teile[1].isdigit()
return text.isdigit()
if ist_zahl(wert_csv) == False:
return False
if ist_zahl(wert_jxl) == False:
return False
# Ermittlung der Anzahl Nachkommastellen in CSV-Datei und JXL-Datei
anzahl_nachkommastellen_csv = 0
if "," in wert_csv:
anzahl_nachkommastellen_csv = len(wert_csv.split(",", 1)[1])
elif "." in wert_csv:
anzahl_nachkommastellen_csv = len(wert_csv.split(".", 1)[1])
anzahl_nachkommastellen_jxl = 0
if "," in wert_jxl:
anzahl_nachkommastellen_jxl = len(wert_jxl.split(",", 1)[1])
elif "." in wert_jxl:
anzahl_nachkommastellen_jxl = len(wert_jxl.split(".", 1)[1])
if anzahl_nachkommastellen_csv >= anzahl_nachkommastellen_jxl:
return False
wert_csv_decimal = self.string_to_decimal(wert_csv.replace(".", ","))
wert_jxl_decimal = self.string_to_decimal(wert_jxl.replace(".", ","))
q = Decimal("1") if anzahl_nachkommastellen_csv == 0 else Decimal("1." + ("0" * anzahl_nachkommastellen_csv))
wert_jxl_gerundet = wert_jxl_decimal.quantize(q, rounding=ROUND_HALF_UP)
wert_csv_gerundet = wert_csv_decimal.quantize(q, rounding=ROUND_HALF_UP)
return wert_jxl_gerundet == wert_csv_gerundet
def ist_zahl_csv(self, text: str) -> bool:
"""Prüft, ob ein String in der CSV als Zahl interpretiert werden kann.
:param text: Zu prüfender Text.
:type text: str
:return: True, wenn der Text eine Zahl ist, sonst False.
:rtype: bool
"""
text = str(text).strip()
if text == "":
return False
if text[0] == "+" or text[0] == "-":
text = text[1:]
if text == "":
return False
if text.count(",") + text.count(".") > 1:
return False
if "," in text:
teile = text.split(",", 1)
if len(teile) != 2:
return False
if teile[0] == "" or teile[1] == "":
return False
return teile[0].isdigit() and teile[1].isdigit()
if "." in text:
teile = text.split(".", 1)
if len(teile) != 2:
return False
if teile[0] == "" or teile[1] == "":
return False
return teile[0].isdigit() and teile[1].isdigit()
return text.isdigit()
def korrigiere_beobachtungen_tachymeter_csv_mit_jxl(self,
pfad_datei_csv: str,
pfad_datei_jxl: str,
pfad_datei_csv_out: str) -> dict:
"""Korrigiert Tachymeterbeobachtungen in einer CSV-Datei über eine JXL-Datei (Leica XML).
Die JXL-Datei wird eingelesen und je StationID eine Sequenz an Beobachtungen (Zielpunktname,
Horizontal-/Vertikalkreisablesung, Schrägdistanz sowie Prismenhöhe) aufgebaut. Die Winkel werden
von Grad nach gon umgerechnet und in der Nachkommastellenauflösung der JXL gerundet.
Anschließend wird die CSV zeilenweise verarbeitet:
- Standpunktzeilen werden der zugehörigen StationID/IH aus der JXL zugeordnet,
- Beobachtungszeilen werden falls der CSV-Wert eine Rundung des JXL-Werts ist durch den JXL-Wert mit voller Nachkommastellenzahl ersetzt,
- Prismenhöhen (ZH) werden aus der JXL übernommen und in der letzten Spalte als ":ZH:" angehängt.
Zusätzlich werden Fehler-Listen erstellt (fehlende IH, fehlende ZH, Standpunkte in CSV aber nicht in JXL). Diese werden dem Benutzer übergeben.
:param pfad_datei_csv: Pfad zur Eingabe-CSV (Tachymeterbeobachtungen).
:type pfad_datei_csv: str
:param pfad_datei_jxl: Pfad zur JXL-Datei.
:type pfad_datei_jxl: str
:param pfad_datei_csv_out: Pfad zur Ausgabe-CSV (korrigierte Datei).
:type pfad_datei_csv_out: str
:return: Ergebnisdictionary mit Status und Diagnosen, u. a.:
- "Import_fortsetzen" (bool),
- "dict_ersetzungen" (dict mit Zählungen für "Hz", "Z", "SD"),
- "liste_zeilen_ohne_IH" (list),
- "liste_zeilen_ohne_ZH" (list),
- "liste_zeilen_standpunkt_nicht_in_jxl" (list),
- "pfad_datei_csv_out" (str).
:rtype: dict
"""
# Import_fortsetzen wird False, sobald eine Fehler festgestellt wird. Als Folge wird der Import abgebrochen und eine Fehlermeldung mit Handlungshinweisen für den Benutzer ausgegeben.
Import_fortsetzen = True
getcontext().prec = 70
dict_ersetzungen = {"Hz": 0, "Z": 0, "SD": 0}
# IH = Instrumentenhöhe | ZH = Zielhöhe, bzw. Prismenhöhe
liste_zeilen_ohne_IH = []
liste_zeilen_ohne_ZH = []
liste_zeilen_standpunkt_nicht_in_jxl = []
liste_tachymeterstandpunkte = []
dict_stationname_tachymeterstandpunkte = {}
dict_standpunkte_anzahl = {}
dict_targetID_zu_ZH = {}
dict_stationID_zu_seq = {}
dict_stationnamen = {}
# Vorbereitung jxl-Datei lesen
tree = ET.parse(pfad_datei_jxl)
root = tree.getroot()
# JXL-Datei auslesen
if Import_fortsetzen:
# Standpunkt, StationID, Instrumentenhöhe aus der JXL-Datei auslesen
for sr in root.iter("StationRecord"):
stationname = (sr.findtext("StationName") or "").strip()
station_id = (sr.attrib.get("ID") or "").strip()
ih = (sr.findtext("TheodoliteHeight") or "").strip()
if stationname != "" and station_id != "":
liste_tachymeterstandpunkte.append((stationname, station_id, ih))
dict_stationnamen[stationname] = 1
if stationname not in dict_stationname_tachymeterstandpunkte:
dict_stationname_tachymeterstandpunkte[stationname] = []
dict_stationname_tachymeterstandpunkte[stationname].append((station_id, ih))
for stationname in dict_stationname_tachymeterstandpunkte.keys():
dict_standpunkte_anzahl[stationname] = 0
# Prismenhöhe auslesen und in Dict speichern
for tr in root.iter("TargetRecord"):
target_id = (tr.attrib.get("ID") or "").strip()
zh = (tr.findtext("TargetHeight") or "").strip()
if target_id != "":
dict_targetID_zu_ZH[target_id] = zh
for tupel in liste_tachymeterstandpunkte:
station_id = tupel[1]
if station_id not in dict_stationID_zu_seq:
dict_stationID_zu_seq[station_id] = []
# Horizontal- und Vertikalkreisablesungen, sowie Schrägdistanzablesungen aus JXL-Datei auslesen
for pr in root.iter("PointRecord"):
station_id = (pr.findtext("StationID") or "").strip()
if station_id == "" or station_id not in dict_stationID_zu_seq:
continue
circle = pr.find("Circle")
if circle is None:
continue
zielpunkt_name = (pr.findtext("Name") or "").strip()
target_id = (pr.findtext("TargetID") or "").strip()
hz_deg = (circle.findtext("HorizontalCircle") or "").strip()
z_deg = (circle.findtext("VerticalCircle") or "").strip()
sd_m = (circle.findtext("EDMDistance") or "").strip()
if zielpunkt_name == "" or hz_deg == "" or z_deg == "" or sd_m == "":
continue
stellen_hz = 0
if "." in hz_deg:
stellen_hz = len(hz_deg.split(".", 1)[1])
stellen_z = 0
if "." in z_deg:
stellen_z = len(z_deg.split(".", 1)[1])
stellen_sd = 0
if "." in sd_m:
stellen_sd = len(sd_m.split(".", 1)[1])
# Umrechnung Grad -> gon
hz_gon_decimal = Decimal(hz_deg) * (Decimal(10) / Decimal(9))
z_gon_decimal = Decimal(z_deg) * (Decimal(10) / Decimal(9))
q_hz = Decimal("1") if stellen_hz == 0 else Decimal("1." + ("0" * stellen_hz))
q_z = Decimal("1") if stellen_z == 0 else Decimal("1." + ("0" * stellen_z))
q_sd = Decimal("1") if stellen_sd == 0 else Decimal("1." + ("0" * stellen_sd))
hz_gon_decimal = hz_gon_decimal.quantize(q_hz, rounding=ROUND_HALF_UP)
z_gon_decimal = z_gon_decimal.quantize(q_z, rounding=ROUND_HALF_UP)
sd_decimal = Decimal(sd_m).quantize(q_sd, rounding=ROUND_HALF_UP)
# Ausgabe mit Kommatrennung
hz_gon_text = format(hz_gon_decimal, "f").replace(".", ",")
z_gon_text = format(z_gon_decimal, "f").replace(".", ",")
sd_text = format(sd_decimal, "f").replace(".", ",")
zh = dict_targetID_zu_ZH.get(target_id, "")
# Dictionary mit den Abfragen für die Weiterverarbeitung füllen.
dict_stationID_zu_seq[station_id].append({
"target": zielpunkt_name,
"hz_gon": hz_gon_text,
"z_gon": z_gon_text,
"sd_m": sd_text,
"zh": zh
})
station_id_aktuell = None
index_csv_jxl_aktuell = 0
standpunkt_aktuell = None
# CSV-Datei zeilenweise durchgehen und ggf. Werte ersetzen
if Import_fortsetzen:
with (open(pfad_datei_csv, newline="", encoding="utf-8") as fin,
open(pfad_datei_csv_out, "w", newline="", encoding="utf-8") as fout):
reader = csv.reader(fin, delimiter=";")
writer = csv.writer(fout, delimiter=";", lineterminator="\n")
for i, row in enumerate(reader):
nummer_zeile = i + 1
if len(row) < 4:
row = row + [""] * (4 - len(row))
if row[0].strip() != "" and row[1].strip() == "" and row[2].strip() == "" and row[3].strip() == "":
standpunkt = row[0].strip()
if standpunkt in dict_stationnamen:
zaehler = dict_standpunkte_anzahl.get(standpunkt, 0)
liste_records = dict_stationname_tachymeterstandpunkte[standpunkt]
# Überprüfung, ob in beiden Dateien die selben Tachymeterstandpunkte vorhanden sind
if zaehler >= len(liste_records):
Import_fortsetzen = False
print(
f"❌ Der Vorgang wurde abgebrochen: Standpunkt {standpunkt} kommt in der CSV öfter vor als in der JXL.")
break
station_id, ih = liste_records[zaehler]
dict_standpunkte_anzahl[standpunkt] = zaehler + 1
station_id_aktuell = station_id
index_csv_jxl_aktuell = 0
standpunkt_aktuell = standpunkt
# Erstellen einer Liste mit allen Zeilennummern aus der JXL, für die keine Instrumentenhöhe vorliegt, damit dies als Fehlermeldung mit Bearbeitungshinweisen ausgegeben werden kann.
if ih is None or str(ih).strip() == "":
liste_zeilen_ohne_IH.append((nummer_zeile, standpunkt))
writer.writerow([standpunkt, f"IH:{ih}", "", "", ""])
continue
# Erstellen einer Liste mit allen Zeilennummern aus der JXL, für die keine Prismenhöhe vorliegt, damit dies als Fehlermeldung mit Bearbeitungshinweisen ausgegeben werden kann.
if standpunkt.isdigit():
liste_zeilen_standpunkt_nicht_in_jxl.append((nummer_zeile, standpunkt))
writer.writerow(row)
continue
ist_beobachtung = False
if row[0].strip() != "" and row[1].strip() != "" and row[2].strip() != "" and row[3].strip() != "":
wert_hz = row[1].split(":ZH:", 1)[0].strip()
wert_z = row[2].split(":ZH:", 1)[0].strip()
wert_sd = row[3].split(":ZH:", 1)[0].strip()
if self.ist_zahl_csv(wert_hz) and self.ist_zahl_csv(wert_z) and self.ist_zahl_csv(wert_sd):
ist_beobachtung = True
if ist_beobachtung and station_id_aktuell is not None:
zielpunkt = row[0].strip()
hz_csv = row[1].strip()
z_csv = row[2].strip()
sd_csv = row[3].strip()
liste_seq = dict_stationID_zu_seq.get(station_id_aktuell, [])
if liste_seq == []:
writer.writerow(row)
continue
if index_csv_jxl_aktuell >= len(liste_seq):
writer.writerow(row)
continue
# Abfragen der selben Daten aus der JXL-Datei über den index
jxl_eintrag = liste_seq[index_csv_jxl_aktuell]
index_neu = index_csv_jxl_aktuell + 1
if jxl_eintrag["target"] != zielpunkt:
index_ende = min(len(liste_seq), index_csv_jxl_aktuell + 200)
liste_kandidaten = []
for index_kandidat in range(index_csv_jxl_aktuell, index_ende):
if liste_seq[index_kandidat]["target"] == zielpunkt:
liste_kandidaten.append((index_kandidat, liste_seq[index_kandidat]))
# Überprüfung, ob die Ablesungen in der CSV-Datei wirklich Rundungen der selben Daten mit mehr Nachkommastellen aus der JXL-Datei sind und abspeichern der Prüfergebnisse.
if liste_kandidaten != []:
if len(liste_kandidaten) == 1:
index_kandidat, kandidat = liste_kandidaten[0]
jxl_eintrag = kandidat
index_neu = index_kandidat + 1
else:
liste_bewertung = []
for index_kandidat, kandidat in liste_kandidaten:
score = 0
if self.ist_rundung_von_jxl(hz_csv, kandidat["hz_gon"]):
score += 1
if self.ist_rundung_von_jxl(z_csv, kandidat["z_gon"]):
score += 1
if self.ist_rundung_von_jxl(sd_csv, kandidat["sd_m"]):
score += 1
liste_bewertung.append((score, index_kandidat, kandidat))
liste_bewertung.sort(key=lambda t: (-t[0], t[1]))
_, index_best, kandidat_best = liste_bewertung[0]
jxl_eintrag = kandidat_best
index_neu = index_best + 1
index_csv_jxl_aktuell = index_neu
# Nur in der CSV-Datei ersetzen, wenn die CSV-Werte tatsächlich eine Rundung der JXL-Werte sind
hz_out = hz_csv
z_out = z_csv
sd_out = sd_csv
if self.ist_rundung_von_jxl(hz_csv, jxl_eintrag["hz_gon"]):
hz_out = jxl_eintrag["hz_gon"]
dict_ersetzungen["Hz"] += 1
if self.ist_rundung_von_jxl(z_csv, jxl_eintrag["z_gon"]):
z_out = jxl_eintrag["z_gon"]
dict_ersetzungen["Z"] += 1
if self.ist_rundung_von_jxl(sd_csv, jxl_eintrag["sd_m"]):
sd_out = jxl_eintrag["sd_m"]
dict_ersetzungen["SD"] += 1
zh = jxl_eintrag.get("zh", "")
if zh is None or str(zh).strip() == "":
liste_zeilen_ohne_ZH.append((nummer_zeile, standpunkt_aktuell, zielpunkt))
spalte_letzte = f"{sd_out}:ZH:{zh}" if str(zh).strip() != "" else sd_out
writer.writerow([zielpunkt, hz_out, z_out, spalte_letzte])
continue
writer.writerow(row)
if Import_fortsetzen:
print(f"Ersetzungen in der CSV-Datei (Rundung -> JXL volle Nachkommastellen): {dict_ersetzungen}")
# Ausgabe der Zeilennummern in der JXL-Datei ohne Instrumentenhöhe
if len(liste_zeilen_ohne_IH) > 0:
print("\n--- Fehlende IH in JXL-Datei ---")
print(f"Anzahl: {len(liste_zeilen_ohne_IH)}")
print(liste_zeilen_ohne_IH)
# Ausgabe der Zeilennummern in der JXL-Datei ohne Prismenhöhe
if len(liste_zeilen_ohne_ZH) > 0:
print("\n--- Fehlende ZH ---")
print(f"Anzahl: {len(liste_zeilen_ohne_ZH)}")
print(liste_zeilen_ohne_ZH)
# Ausgabe der Zeilennummern in der JXL-Datei mit unterschiedlichen Tachymeterstandpunkten im Vergleich zur CSV-Datei
if len(liste_zeilen_standpunkt_nicht_in_jxl) > 0:
print("\n--- Standpunkt in CSV-Datei, aber nicht in JXL-Datei---")
print(f"Anzahl: {len(liste_zeilen_standpunkt_nicht_in_jxl)}")
print(liste_zeilen_standpunkt_nicht_in_jxl)
else:
print("❌ Die Korrektur der CSV-Datei wurde abgebrochen.")
return {
"Import_fortsetzen": Import_fortsetzen,
"dict_ersetzungen": dict_ersetzungen,
"liste_zeilen_ohne_IH": liste_zeilen_ohne_IH,
"liste_zeilen_ohne_ZH": liste_zeilen_ohne_ZH,
"liste_zeilen_standpunkt_nicht_in_jxl": liste_zeilen_standpunkt_nicht_in_jxl,
"pfad_datei_csv_out": pfad_datei_csv_out
}
def import_beobachtungen_tachymeter(self, pfad_datei: str, instrumentenID: int) -> None:
"""Importiert Tachymeterbeobachtungen aus einer CSV-Datei in die Tabelle Beobachtungen.
Die Datei wird blockweise verarbeitet (Standpunktzeile + Beobachtungszeilen). Aus je zwei Halbsätzen
wird ein Vollsatz abgeleitet:
- Richtung: Mittel aus Hz(HS1) und Hz(HS2-200 gon), normiert in [0, 400),
- Zenitwinkel: (ZW1 - ZW2 + 400) / 2,
- Schrägdistanz: Mittel der beiden SD.
Zusätzlich werden aus Instrumenten- und Prismenhöhe bodenbezogene Werte berechnet
(schraegdistanz_bodenbezogen, zenitwinkel_bodenbezogen) über berechne_zenitwinkel_distanz_bodenbezogen().
Vorabprüfungen:
- Abbruch, wenn der Dateiname bereits in Beobachtungen vorkommt,
- Abbruch, wenn instrumentenID nicht in Tabelle Instrumente existiert,
- Plausibilitätsprüfung auf Vollsatz-Struktur (6 Zeilen pro Zielpunkt: 3 Vollsätze * 2 Halbsätze).
:param pfad_datei: Pfad zur CSV-Datei.
:type pfad_datei: str
:param instrumentenID: ID des verwendeten Instruments aus Tabelle Instrumente.
:type instrumentenID: int
:return: None
:rtype: None
"""
# Prüfen, ob Bereits Daten aus der Datei in der Datenbank vorhanden sind
con = sqlite3.connect(self.pfad_datenbank)
cursor = con.cursor()
liste_dateinamen_in_db = [r[0] for r in cursor.execute(
"SELECT DISTINCT dateiname FROM Beobachtungen"
).fetchall()]
liste_beobachtungsgruppeID = [r[0] for r in cursor.execute("""SELECT DISTINCT beobachtungsgruppeID
FROM Beobachtungen""").fetchall()]
liste_instrumentenid = [r[0] for r in cursor.execute("SELECT instrumenteID FROM Instrumente").fetchall()]
cursor.close()
con.close()
# Import_fortsetzen wird False, sobald eine Fehler festgestellt wird. Als Folge wird der Import abgebrochen und eine Fehlermeldung mit Handlungshinweisen für den Benutzer ausgegeben.
Import_fortsetzen = True
# Abbrechen des Imports, wenn bereits Daten aus der selben Datei importiert wurden.
if pfad_datei in liste_dateinamen_in_db:
Import_fortsetzen = False
if Import_fortsetzen:
nummer_zielpunkt = 0
# Abfragen der aktuell höchsten Nummer im Attribut beobachtungsgruppeID der Tabelle Beobachtungen
try:
nummer_beobachtungsgruppeID = max(liste_beobachtungsgruppeID)
except:
nummer_beobachtungsgruppeID = 0
if nummer_beobachtungsgruppeID is None:
nummer_beobachtungsgruppeID = 0
with (open(pfad_datei, "r", encoding="utf-8") as f):
liste_fehlerhafte_zeile = []
liste_beobachtungen_vorbereitung = []
for i, zeile in enumerate(f):
# Die ersten drei Zeilen der Importdatei müssen überprungen werden
if i < 3:
continue
zeile = zeile.strip().split(";")
if len(zeile) < 5:
zeile = zeile + [""] * (5 - len(zeile))
# Tachymeterstandpunkt speichern und beobachtungsgruppeID als ID für das einmalige Aufbauen eines Tachymeters auf einem Punkt festlegen
if zeile[2] == "" and zeile[3] == "" and zeile[4] == "":
nummer_beobachtungsgruppeID += 1
standpunkt = zeile[0]
instrumentenhoehe = zeile[1]
if instrumentenhoehe.startswith("IH:"):
instrumentenhoehe = instrumentenhoehe.split("IH:", 1)[1].strip()
# Überprüfung, ob für jeden Beobachteten Zielpunkt 6 Zeilen vorhanden sind (3 Vollszätze mit je 2 Halbsätzen)
if nummer_zielpunkt % 6 != 0:
liste_fehlerhafte_zeile.append(i)
nummer_zielpunkt = 0
liste_zielpunkte_hs = []
liste_zielpunkte_vs2 = []
liste_zielpunkte_vs3 = []
else:
# ZH = Prismenhöhe | VS = Vollsatz | HS = Halbsatz
# Erstellen einer Liste mit Standpunkt, Vollsatz und Halbsatzzuordnung für die Beobachtungen für die Weiterverarbeitung
nummer_zielpunkt += 1
if ":ZH:" in zeile[3]:
teil = zeile[3].split(":ZH:", 1)
zeile[3] = teil[0].strip()
zeile[4] = teil[1].strip()
if zeile[0] not in liste_zielpunkte_hs:
liste_zielpunkte_hs.append(zeile[0])
if zeile[0] in liste_zielpunkte_vs3:
liste_beobachtungen_vorbereitung.append(
[nummer_beobachtungsgruppeID, "VS3", "HS1", standpunkt, zeile[0], zeile[1],
zeile[2], zeile[3], zeile[4], instrumentenhoehe])
elif zeile[0] in liste_zielpunkte_vs2:
liste_beobachtungen_vorbereitung.append(
[nummer_beobachtungsgruppeID, "VS2", "HS1", standpunkt, zeile[0], zeile[1],
zeile[2], zeile[3], zeile[4], instrumentenhoehe])
else:
liste_beobachtungen_vorbereitung.append(
[nummer_beobachtungsgruppeID, "VS1", "HS1", standpunkt, zeile[0], zeile[1],
zeile[2],
zeile[3], zeile[4], instrumentenhoehe])
else:
liste_zielpunkte_hs.remove(zeile[0])
if zeile[0] in liste_zielpunkte_vs3:
liste_beobachtungen_vorbereitung.append(
[nummer_beobachtungsgruppeID, "VS3", "HS2", standpunkt, zeile[0], zeile[1],
zeile[2],
zeile[3], zeile[4], instrumentenhoehe])
elif zeile[0] in liste_zielpunkte_vs2:
if zeile[0] not in liste_zielpunkte_vs3:
liste_zielpunkte_vs3.append(zeile[0])
liste_beobachtungen_vorbereitung.append(
[nummer_beobachtungsgruppeID, "VS2", "HS2", standpunkt, zeile[0], zeile[1],
zeile[2],
zeile[3], zeile[4], instrumentenhoehe])
else:
if zeile[0] not in liste_zielpunkte_vs2:
liste_zielpunkte_vs2.append(zeile[0])
liste_beobachtungen_vorbereitung.append(
[nummer_beobachtungsgruppeID, "VS1", "HS2", standpunkt, zeile[0], zeile[1],
zeile[2],
zeile[3], zeile[4], instrumentenhoehe])
# Ausgabe, welche Zeilen in der Importdatei bearbeitet werden müssen.
if liste_fehlerhafte_zeile != []:
fehler_zeilen = ", ".join(map(str, liste_fehlerhafte_zeile))
print(
f"❌ Das Einlesen der Datei {pfad_datei} wurde abgebrochen.\n"
f"Bitte bearbeiten Sie die Zeilen rund um: {fehler_zeilen} in der csv-Datei "
f"und wiederholen Sie den Import."
)
Import_fortsetzen = False
else:
print(
f"❌ Der Import wurde abgebrochen, weil die Beobachtungen aus der Datei {pfad_datei} bereits in der Datenbank vorhanden sind.")
if Import_fortsetzen:
liste_beobachtungen_import = []
while len(liste_beobachtungen_vorbereitung) > 0:
liste_aktueller_zielpunkt = liste_beobachtungen_vorbereitung[0]
aktueller_zielpunkt = liste_aktueller_zielpunkt[4]
for index in range(1, len(liste_beobachtungen_vorbereitung)):
liste = liste_beobachtungen_vorbereitung[index]
# Berechnen der zu importierenden Beobachtungen. (Jeweils reduziert auf den Vollsatz)
if liste[4] == aktueller_zielpunkt:
richtung1 = self.string_to_decimal(liste_aktueller_zielpunkt[5])
richtung2 = self.string_to_decimal(liste[5]) - Decimal(200)
zenitwinkel_vollsatz_gon = (self.string_to_decimal(liste_aktueller_zielpunkt[6]) - self.string_to_decimal(
liste[6]) + 400) / 2
zenitwinkel_vollsatz_rad = Einheitenumrechnung.gon_to_rad_Decimal(zenitwinkel_vollsatz_gon)
distanz_vollsatz = (self.string_to_decimal(liste_aktueller_zielpunkt[7]) + self.string_to_decimal(
liste[7])) / 2
if richtung2 < 0 and richtung1 != Decimal(0):
richtung2 += Decimal(400)
elif richtung2 > 400:
richtung2 -= Decimal(400)
richtung_vollsatz_gon = (richtung1 + richtung2) / 2
richtung_vollsatz_rad = Einheitenumrechnung.gon_to_rad_Decimal(richtung_vollsatz_gon)
if liste_aktueller_zielpunkt[8] == liste[8]:
prismenhoehe = liste_aktueller_zielpunkt[8]
else:
Import_fortsetzen = False
print(f"❌ Der Import wurde abgebrochen, weil für zwei Halbsätze vom Standpunkt {liste_aktueller_zielpunkt[3]} zum Zielpunkt {aktueller_zielpunkt} unterschiedliche Prismenhöhen vorliegen. Bitte in der Datei {pfad_datei} korrigieren und Import neustarten.")
if liste_aktueller_zielpunkt[9] == liste[9]:
instrumentenhoehe_import = liste_aktueller_zielpunkt[9]
else:
Import_fortsetzen = False
print(f"❌ Der Import wurde abgebrochen, weil für zwei Halbsätze vom Standpunkt {liste_aktueller_zielpunkt[3]} zum Zielpunkt {aktueller_zielpunkt} unterschiedliche Instrumentenhöhen vorliegen. Bitte in der Datei {pfad_datei} korrigieren und Import neustarten.")
# Umrechnen der Zenitwinkel und Schrägdistanzen auf den Boden unter Verwendung der Instrumenten- und Prismenhöhen
schraegdistanz_bodenbezogen, zenitwinkel_bodenbezogen = self.berechnungen.berechne_zenitwinkel_distanz_bodenbezogen(
float(zenitwinkel_vollsatz_rad), float(distanz_vollsatz), float(instrumentenhoehe_import), float(prismenhoehe))
liste_beobachtungen_import.append(
[liste[0], liste[3], liste[4], richtung_vollsatz_rad, zenitwinkel_vollsatz_rad, distanz_vollsatz, zenitwinkel_bodenbezogen, schraegdistanz_bodenbezogen, instrumentenhoehe_import, prismenhoehe])
del liste_beobachtungen_vorbereitung[index]
del liste_beobachtungen_vorbereitung[0]
break
# Überprüfung, ob das Instrument bereits vom Benutzer angelegt wurde.
if instrumentenID not in liste_instrumentenid:
Import_fortsetzen = False
print(
"❌ Der Import wurde abgebrochen. Bitte eine gültige InstrumentenID eingeben. Bei Bedarf ist das Instrument neu anzulegen.")
# Berechnete bodenbezogene Beobachtungen, welche jeweils auf den Vollsatz reduziert sind, in die Tabelle Beobachtungen importieren.
if Import_fortsetzen:
con = sqlite3.connect(self.pfad_datenbank)
cursor = con.cursor()
for beobachtung_import in liste_beobachtungen_import:
cursor.execute(
"INSERT INTO Beobachtungen (punktnummer_sp, punktnummer_zp, instrumenteID, beobachtungsgruppeID, tachymeter_richtung, tachymeter_zenitwinkel_roh, tachymeter_distanz_roh, dateiname, tachymeter_instrumentenhoehe, tachymeter_prismenhoehe, tachymeter_zenitwinkel, tachymeter_distanz) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
(beobachtung_import[1], beobachtung_import[2], instrumentenID, beobachtung_import[0],
float(beobachtung_import[3]), float(beobachtung_import[4]), float(beobachtung_import[5]),
pfad_datei, float(beobachtung_import[8]), float(beobachtung_import[9]), float(beobachtung_import[6]), float(beobachtung_import[7])))
con.commit()
cursor.close()
con.close()
print(f"✅ Der Import der Datei {pfad_datei} wurde erfolgreich abgeschlossen.")
def vorbereitung_import_beobachtungen_nivellement_naeherung_punkthoehen(self, pfad_datei: str,
instrumentenID: int) -> None | tuple[None, None] | tuple[dict[Any, Any], list[Any]]:
"""Bereitet den Import von Nivellementdaten zur Ableitung von Näherungs-Normalhöhen vor.
Aus einer semikolon-separierten Datei werden Zeilen erkannt, die berechnete Zielweiten/Punkthöhen enthalten,
und daraus pro Punktnummer alle gefundenen Z-Werte gesammelt. Anschließend wird je Punktnummer
der Mittelwert berechnet (Rundung auf 6 Nachkommastellen).
Danach wird geprüft, welche dieser Punktnummern bereits in der Tabelle Netzpunkte existieren.
Abbruchbedingungen:
- Dateiname bereits in Beobachtungen vorhanden,
- instrumentenID nicht in Instrumente vorhanden.
:param pfad_datei: Pfad zur Importdatei.
:type pfad_datei: str
:param instrumentenID: ID des verwendeten Niv-Instruments aus der Tabelle Instrumente.
:type instrumentenID: int
:return: (dict_punkt_mittelwert_punkthoehen, liste_punktnummern_in_db) oder (None, None) bei Abbruch.
:rtype: None | tuple[None, None] | tuple[dict[Any, Any], list[Any]]
"""
# Prüfen, ob bereits Daten aus der Datei in der Datenbank vorhanden sind
con = sqlite3.connect(self.pfad_datenbank)
cursor = con.cursor()
liste_dateinamen_in_db = [r[0] for r in cursor.execute(
"SELECT DISTINCT dateiname FROM Beobachtungen"
).fetchall()]
liste_instrumentenid = [r[0] for r in cursor.execute("SELECT instrumenteID FROM Instrumente").fetchall()]
liste_netzpunkte = [r[0] for r in cursor.execute("SELECT punktnummer FROM Netzpunkte").fetchall()]
cursor.close()
con.close()
# Import_fortsetzen wird False, sobald eine Fehler festgestellt wird. Als Folge wird der Import abgebrochen und eine Fehlermeldung mit Handlungshinweisen für den Benutzer ausgegeben.
Import_fortsetzen = True
# Abbruch des Imports, wenn bereits Daten aus der Datei importiert wurden.
if pfad_datei in liste_dateinamen_in_db:
Import_fortsetzen = False
print(f"❌ Der Import wurde abgebrochen, weil die Beobachtungen aus der Datei {pfad_datei} bereits in der Datenbank vorhanden sind.")
return None, None
# Abbruch, wenn das Instrument noch nicht vom Benutzer angelegt wurde.
if instrumentenID not in liste_instrumentenid:
Import_fortsetzen = False
print(
"❌ Der Import wurde abgebrochen. Bitte eine gültige InstrumentenID eingeben. Bei Bedarf ist das Instrument neu anzulegen.")
return None, None
if Import_fortsetzen:
# Messwerte aus der Importdatei abfragen
muster_berechnete_zielweiten = "| | |Z "
dict_punkt_alle_punkthoehen = {}
dict_punkt_mittelwert_punkthoehen = {}
with open(pfad_datei, newline="", encoding="utf-8") as csvfile:
r = csv.reader(csvfile, delimiter=";")
for i, row in enumerate(r):
if any(muster_berechnete_zielweiten in feld for feld in row):
zeile = " ".join([str(feld) for feld in row])
punktnummer = None
if "KD1" in zeile:
teil = zeile.split("KD1", 1)[1].strip()
if teil != "":
punktnummer = teil.split()[0]
wert_z = None
if "|Z" in zeile:
teil = zeile.split("|Z", 1)[1].strip()
if teil != "":
wert_z = self.string_to_float(teil.split()[0])
if punktnummer is not None and wert_z is not None:
if punktnummer not in dict_punkt_alle_punkthoehen:
dict_punkt_alle_punkthoehen[punktnummer] = []
dict_punkt_alle_punkthoehen[punktnummer].append(wert_z)
for punktnummer, liste_z in dict_punkt_alle_punkthoehen.items():
# Hier wird auf 6 Nachkommastellen gerundet!
dict_punkt_mittelwert_punkthoehen[punktnummer] = round(sum(liste_z) / len(liste_z),6)
# Erstellen einer Liste mit allen Punktnummern, die in der Datenbank vorliegen und für die Nivellementbeobachtungen vorliegen.
if Import_fortsetzen:
# Ausgabe, welche Niv-Punkte bereits in der Tabelle Netzpunkte enthalten sind
liste_punktnummern_nivellement = dict_punkt_mittelwert_punkthoehen.keys()
liste_punktnummern_in_db = []
liste_punktnummern_nicht_in_db = []
for punktnummer in liste_punktnummern_nivellement:
if punktnummer in liste_netzpunkte:
liste_punktnummern_in_db.append(punktnummer)
else:
liste_punktnummern_nicht_in_db.append(punktnummer)
# Es werden nur Höhendifferenzen für Punkte berechnet, für die Näherungskoordinaten in der Datenbank vorliegen.
if Import_fortsetzen:
print(f"Folgende Stand- und Zielpunkte des geometrischen Nivellements werden für die Beobachtungsgruppe ausgeglichen: {liste_punktnummern_in_db}\nFür folgende Punkte wird aktuell keine Höhe in der Ausgleichung berechnet: {liste_punktnummern_nicht_in_db}. Bei Bedarf im folgenden Schritt ändern!")
return dict_punkt_mittelwert_punkthoehen, liste_punktnummern_in_db
def import_beobachtungen_nivellement_naeherung_punkthoehen(self, dict_punkt_mittelwert_punkthoehen: dict,
liste_punktnummern_in_db: list,
liste_punktnummern_hinzufuegen: list) -> str | None:
"""Importiert Näherungsnormalhöhen in die Tabelle Netzpunkte.
Es werden Listen geführt für neu hinzugefügte, bereits vorhandene und geänderte Punkte und
abschließend als Konsolen-Ausgabe ausgegeben.
:param dict_punkt_mittelwert_punkthoehen: Dictionary {punktnummer: normalhoehe} als Mittelwerte.
:type dict_punkt_mittelwert_punkthoehen: dict
:param liste_punktnummern_in_db: Punktnummern, die bereits in Netzpunkte existieren.
:type liste_punktnummern_in_db: list
:param liste_punktnummern_hinzufuegen: Zusätzliche Punktnummern, die ebenfalls übernommen werden sollen.
:type liste_punktnummern_hinzufuegen: list
:return: Status-String mit der Liste der Punkte, für die Normalhöhen in der Ausgleichung verfügbar sind, oder None bei Abbruch.
:rtype: str | None
"""
# Import_fortsetzen wird False, sobald eine Fehler festgestellt wird. Als Folge wird der Import abgebrochen und eine Fehlermeldung mit Handlungshinweisen für den Benutzer ausgegeben.
Import_fortsetzen = True
if dict_punkt_mittelwert_punkthoehen == None or liste_punktnummern_in_db == None or liste_punktnummern_hinzufuegen == None:
Import_fortsetzen = False
print("❌ Der Import der Nivellementbeobachtungen wurde abgebrochen.")
return None
con = sqlite3.connect(self.pfad_datenbank)
cursor = con.cursor()
liste_punkte_neu_hinzugefuegt = []
liste_punkte_bereits_vorhanden = []
liste_punkte_geaendert = []
for punktnummer in liste_punktnummern_hinzufuegen:
# Neu anlegen oder aktualisieren der Normalhöhen in der Tabelle Netzpunkte
try:
cursor.execute(f"INSERT INTO Netzpunkte (punktnummer, normalhoehe_hfp) VALUES (?, ?)", (punktnummer, dict_punkt_mittelwert_punkthoehen[punktnummer]))
liste_punkte_neu_hinzugefuegt.append(punktnummer)
except sqlite3.IntegrityError:
liste_punkte_bereits_vorhanden.append(punktnummer)
cursor.execute(
"UPDATE Netzpunkte SET normalhoehe_hfp = ? WHERE punktnummer = ?",
(dict_punkt_mittelwert_punkthoehen[punktnummer], punktnummer)
)
liste_punkte_geaendert.append(punktnummer)
for punktnummer in liste_punktnummern_in_db:
cursor.execute(f"UPDATE Netzpunkte SET normalhoehe_hfp = ? WHERE punktnummer = ?", (dict_punkt_mittelwert_punkthoehen[punktnummer], punktnummer))
liste_punkte_geaendert.append(punktnummer)
con.commit()
cursor.close()
con.close()
print(f"Neu hinzugefügt ({len(liste_punkte_neu_hinzugefuegt)}): {liste_punkte_neu_hinzugefuegt}")
print(f"Bereits vorhanden ({len(liste_punkte_bereits_vorhanden)}): {liste_punkte_bereits_vorhanden}")
print(f"Geändert ({len(liste_punkte_geaendert)}): {liste_punkte_geaendert}\n")
return f"Für folgende Punkte werden die Höhen Ausgeglichen: {liste_punktnummern_in_db}"
def import_beobachtungen_nivellement_RVVR(self, pfad_datei: str, instrumentenID: int) -> str | None:
"""Importiert geometrische Nivellementbeobachtungen nach dem RVVR Prinzip die Tabelle Beobachtungen.
Es werden Zeilen mit Rück-/Vormessungen erkannt (rvvr: Rück, Vor, Vor, Rück). Pro Block werden
dh und Entfernung berechnet (Rundung jeweils auf 8 Nachkommastellen). Anschließend werden Züge reduziert, indem
Wechselpunkte, die nicht in Netzpunkte vorhanden sind, durch Summation herausgekürzt werden.
Damit werden nur Beobachtungen zwischen Punkten in Netzpunkte (bzw. reduzierten Start-/Zielpunkten)
in Beobachtungen gespeichert.
Abbruchbedingungen:
- Dateiname bereits in Beobachtungen vorhanden,
- instrumentenID nicht in Instrumente vorhanden,
- Anzahl RVVR-Zeilen nicht durch 4 teilbar.
:param pfad_datei: Pfad zur Textdatei.
:type pfad_datei: str
:param instrumentenID: ID des verwendeten Niv-Instruments aus der Tabelle Instrumente.
:type instrumentenID: int
:return: Status-String bei Erfolg oder None bei Abbruch.
:rtype: str | None
"""
# Prüfen, ob Bereits Daten aus der Datei in der Datenbank vorhanden sind
con = sqlite3.connect(self.pfad_datenbank)
cursor = con.cursor()
liste_dateinamen_in_db = [r[0] for r in cursor.execute(
"SELECT DISTINCT dateiname FROM Beobachtungen"
).fetchall()]
liste_instrumentenid = [r[0] for r in cursor.execute("SELECT instrumenteID FROM Instrumente").fetchall()]
liste_netzpunkte = [r[0] for r in cursor.execute("SELECT punktnummer FROM Netzpunkte").fetchall()]
cursor.close()
con.close()
# Import_fortsetzen wird False, sobald eine Fehler festgestellt wird. Als Folge wird der Import abgebrochen und eine Fehlermeldung mit Handlungshinweisen für den Benutzer ausgegeben.
Import_fortsetzen = True
# Import abbrechen, wenn bereits Daten aus der selben Datei importiert wurden
if pfad_datei in liste_dateinamen_in_db:
Import_fortsetzen = False
print(f"❌ Der Import wurde abgebrochen, weil die Beobachtungen aus der Datei {pfad_datei} bereits in der Datenbank vorhanden sind.")
# Import abbrechen, wenn das Instrument noch nicht vom Benutzer angelegt wurde.
if instrumentenID not in liste_instrumentenid:
Import_fortsetzen = False
print(
"❌ Der Import wurde abgebrochen. Bitte eine gültige InstrumentenID eingeben. Bei Bedarf ist das Instrument neu anzulegen.")
if Import_fortsetzen:
# rvvr = Rück, Vor, Vor, Rück
# Berechnen der Höhenunterschiede delta H und Schrägstrecken zwischen Zwei Punkten gemäß der Importdatei aus rvvr.
anzahl_zeilen_rvvr = 0
liste_zeilen_rvvr = []
liste_punktpaare = []
with open(pfad_datei, "r", encoding="utf-8") as f:
for i, zeile in enumerate(f):
if ("Lr" in zeile) or ("Lv" in zeile):
liste_zeilen_rvvr.append(zeile)
anzahl_zeilen_rvvr += 1
if anzahl_zeilen_rvvr % 4 == 0:
index = 0
while index < len(liste_zeilen_rvvr):
block_4 = liste_zeilen_rvvr[index:index + 4]
liste_punktnummern_block = []
for zeile_block in block_4:
punktnummer = None
if "|KD1" in zeile_block:
teil = zeile_block.split("|KD1", 1)[1].strip()
if teil != "":
punktnummer = teil.split()[0]
if punktnummer is not None:
if punktnummer not in liste_punktnummern_block:
liste_punktnummern_block.append(punktnummer)
r1 = None
v1 = None
v2 = None
r2 = None
e_1 = None
e_2 = None
e_3 = None
e_4 = None
zugnummer = None
if "|Lr" in block_4[0]:
teil = block_4[0].split("|Lr", 1)[0].strip()
if teil != "":
zugnummer = int(teil.split()[-1])
teil = block_4[0].split("Lr", 1)[1].strip() if "Lr" in block_4[0] else block_4[0].split("Lv", 1)[
1].strip()
r1 = self.string_to_float(teil.split()[0])
teil = block_4[1].split("Lr", 1)[1].strip() if "Lr" in block_4[1] else block_4[1].split("Lv", 1)[
1].strip()
v1 = self.string_to_float(teil.split()[0])
teil = block_4[2].split("Lr", 1)[1].strip() if "Lr" in block_4[2] else block_4[2].split("Lv", 1)[
1].strip()
v2 = self.string_to_float(teil.split()[0])
teil = block_4[3].split("Lr", 1)[1].strip() if "Lr" in block_4[3] else block_4[3].split("Lv", 1)[
1].strip()
r2 = self.string_to_float(teil.split()[0])
if "|E" in block_4[0]:
teil = block_4[0].split("|E", 1)[1].strip()
if teil != "":
e_1 = self.string_to_float(teil.split()[0])
if "|E" in block_4[1]:
teil = block_4[1].split("|E", 1)[1].strip()
if teil != "":
e_2 = self.string_to_float(teil.split()[0])
if "|E" in block_4[2]:
teil = block_4[2].split("|E", 1)[1].strip()
if teil != "":
e_3 = self.string_to_float(teil.split()[0])
if "|E" in block_4[3]:
teil = block_4[3].split("|E", 1)[1].strip()
if teil != "":
e_4 = self.string_to_float(teil.split()[0])
# Achtung: Hier Rundung auf 8 Nachkommastellen
dh = round((r1 - v1 - v2 + r2) / 2, 8)
entfernung = round((e_1 + e_2 + e_3 + e_4) / 2, 8)
liste_punktpaare.append((zugnummer, liste_punktnummern_block[0], liste_punktnummern_block[1], dh, entfernung))
index += 4
# Berechnen der Höhendifferenzen und schrägstrecken zwischen Punkten in der Tabelle Netzpunkte.
# Somit werden alle Wechselpunkte nicht in der Ausgleichung berücksichtigt.
# Die Berechnung erfolgt durch addition.
liste_beobachtungen_reduziert = []
liste_beobachtungen_bearbeitung = []
zugnummer_vorher = liste_punktpaare[0][0]
for einzelbeobachtung in liste_punktpaare:
zugnummer = einzelbeobachtung[0]
if zugnummer == zugnummer_vorher:
if einzelbeobachtung[1] in liste_netzpunkte and einzelbeobachtung[2] in liste_netzpunkte:
liste_beobachtungen_reduziert.append(einzelbeobachtung + (1,))
elif einzelbeobachtung[1] in liste_netzpunkte and einzelbeobachtung[2] not in liste_netzpunkte:
liste_beobachtungen_bearbeitung.append(einzelbeobachtung)
elif einzelbeobachtung[1] not in liste_netzpunkte and einzelbeobachtung[2] in liste_netzpunkte:
liste_beobachtungen_bearbeitung.append(einzelbeobachtung)
startpunkt = None
zielpunkt = None
summe_dh = None
summe_entfernung = None
anzahl_standpunkte = 1
for i, beobachtung_bearbeiten in enumerate(liste_beobachtungen_bearbeitung):
if i == 0:
startpunkt = beobachtung_bearbeiten[1]
summe_dh = beobachtung_bearbeiten[3]
summe_entfernung = beobachtung_bearbeiten[4]
anzahl_standpunkte = 1
else:
zielpunkt = beobachtung_bearbeiten[2]
summe_dh += beobachtung_bearbeiten[3]
summe_entfernung += beobachtung_bearbeiten[4]
anzahl_standpunkte += 1
# Achtung:Hier Rundung auf 8 Nachkommastellen!
liste_beobachtungen_reduziert.append(
(zugnummer, startpunkt, zielpunkt, round(summe_dh, 8),
round(summe_entfernung, 8), anzahl_standpunkte))
liste_beobachtungen_bearbeitung = []
else:
liste_beobachtungen_bearbeitung.append(einzelbeobachtung)
else:
if einzelbeobachtung[1] in liste_netzpunkte and einzelbeobachtung[2] in liste_netzpunkte:
liste_beobachtungen_reduziert.append(einzelbeobachtung + (1,))
elif einzelbeobachtung[1] in liste_netzpunkte and einzelbeobachtung[2] not in liste_netzpunkte:
liste_beobachtungen_bearbeitung.append(einzelbeobachtung)
elif einzelbeobachtung[1] not in liste_netzpunkte and einzelbeobachtung[2] in liste_netzpunkte:
liste_beobachtungen_bearbeitung.append(einzelbeobachtung)
startpunkt = None
zielpunkt = None
summe_dh = None
summe_entfernung = None
anzahl_standpunkte = 1
for i, beobachtung_bearbeiten in enumerate(liste_beobachtungen_bearbeitung):
if i == 0:
startpunkt = beobachtung_bearbeiten[1]
summe_dh = beobachtung_bearbeiten[3]
summe_entfernung = beobachtung_bearbeiten[4]
anzahl_standpunkte = 1
else:
zielpunkt = beobachtung_bearbeiten[2]
summe_dh += beobachtung_bearbeiten[3]
summe_entfernung += beobachtung_bearbeiten[4]
anzahl_standpunkte += 1
#Achtung:Hier Rundung auf 8 Nachkommastellen!
liste_beobachtungen_reduziert.append(
(zugnummer, startpunkt, zielpunkt, round(summe_dh,8), round(summe_entfernung,8), anzahl_standpunkte))
liste_beobachtungen_bearbeitung = []
else:
liste_beobachtungen_bearbeitung.append(einzelbeobachtung)
zugnummer_vorher = zugnummer
con = sqlite3.connect(self.pfad_datenbank)
cursor = con.cursor()
for beobachtung_reduziert in liste_beobachtungen_reduziert:
cursor.execute(f"INSERT INTO Beobachtungen (punktnummer_sp, punktnummer_zp, instrumenteID, niv_dh, niv_strecke, niv_anz_standpkte, dateiname) VALUES (?, ?, ?, ?, ?, ?, ?)", (beobachtung_reduziert[1], beobachtung_reduziert[2], instrumentenID, beobachtung_reduziert[3], beobachtung_reduziert[4], beobachtung_reduziert[5], pfad_datei))
con.commit()
cursor.close()
con.close()
return f"✅ Die Beobachtungen aus der Datei {pfad_datei} wurden erfolgreich importiert."
else:
print(f"❌ Anzahl RVVR nicht durch 4 teilbar. Bitte die Datei {pfad_datei} überprüfen! Der Import wurde abgebrochen.")
Import_fortsetzen = False
def import_koordinaten_gnss(self, pfad_datei: str, liste_sapos_stationen_genauigkeiten: list) -> str:
"""Importiert GNSS-Koordinaten (ECEF) in die Tabelle Netzpunkte.
Die Datei wird semikolon-separiert gelesen und zusätzlich aufgesplittet (Leerzeichen und Kommas).
Für Referenzstationen (Kennzeichnung "Referenz" und Standardabweichungen 0.0000/0.0000/0.0000)
werden die übergebenen Genauigkeiten (X/Y/Z) eingesetzt.
Die Koordinaten und Standardabweichungen werden in Netzpunkte geschrieben.
:param pfad_datei: Pfad zur Koordinatendatei.
:type pfad_datei: str
:param liste_sapos_stationen_genauigkeiten: Liste mit drei Werten [σX, σY, σZ] für Referenzstationen.
:type liste_sapos_stationen_genauigkeiten: list
:return: Statusmeldung zum erfolgreichen Import.
:rtype: str
"""
con = sqlite3.connect(self.pfad_datenbank)
cursor = con.cursor()
with (open(pfad_datei, newline="", encoding="utf-8") as csvfile):
r = csv.reader(csvfile, delimiter = ";")
for i, row in enumerate(r):
row_neu = []
for eintrag in row:
eintrag = str(eintrag).strip()
eintrag = eintrag.replace("'", "")
aufgeteilt = eintrag.split()
for teil in aufgeteilt:
teil = teil.split(",")
row_neu.extend(teil)
if row_neu[1] == 'Referenz' and row_neu[7] == '0.0000' and row_neu[8] == '0.0000' and row_neu[9] == '0.0000':
row_neu[7] = liste_sapos_stationen_genauigkeiten[0]
row_neu[8] = liste_sapos_stationen_genauigkeiten[1]
row_neu[9] = liste_sapos_stationen_genauigkeiten[2]
cursor.execute(f"""INSERT INTO Netzpunkte (punktnummer, naeherungx_us, naeherungy_us, naeherungz_us, stabw_vorinfo_x, stabw_vorinfo_y, stabw_vorinfo_z) VALUES (?, ?, ?, ?, ?, ?, ?) ON CONFLICT (punktnummer) DO UPDATE SET naeherungx_us = excluded.naeherungx_us,
naeherungy_us = excluded.naeherungy_us,
naeherungz_us = excluded.naeherungz_us,
stabw_vorinfo_x = excluded.stabw_vorinfo_x,
stabw_vorinfo_y = excluded.stabw_vorinfo_y,
stabw_vorinfo_z = excluded.stabw_vorinfo_z""", (row_neu[0], row_neu[4], row_neu[5], row_neu[6], row_neu[7], row_neu[8], row_neu[9])
)
con.commit()
con.close()
return "Import der Koordinaten aus stationärem GNSS abgeschlossen."
def import_basislinien_gnss(self, pfad_datei: str, instrumentenID: int) -> None:
"""Importiert GNSS-Basislinien inkl. Kovarianzen in die Tabelle Beobachtungen.
Die Datei wird zeilenweise gelesen und Basislinien werden über Präfixe erkannt:
- @+ : Ziel-/Punktkennung (ID),
- @- : Gegenpunktkennung und Komponenten (bx/by/bz),
- @= : s0 und Kovarianzterme (cxx, cxy, cxz, cyy, cyz, czz).
Vorabprüfungen:
- Abbruch, wenn der Dateiname bereits in Beobachtungen vorkommt,
- Abbruch, wenn instrumentenID nicht in Instrumente existiert.
:param pfad_datei: Pfad zur Basisliniendatei (Text).
:type pfad_datei: str
:param instrumentenID: ID des verwendeten GNSS-Instruments (FK auf Instrumente).
:type instrumentenID: int
:return: None
:rtype: None
"""
# Import_fortsetzen wird False, sobald eine Fehler festgestellt wird. Als Folge wird der Import abgebrochen und eine Fehlermeldung mit Handlungshinweisen für den Benutzer ausgegeben.
Import_fortsetzen = True
# Prüfen, ob Bereits Daten aus der Datei in der Datenbank vorhanden sind
con = sqlite3.connect(self.pfad_datenbank)
cursor = con.cursor()
liste_dateinamen_in_db = [r[0] for r in cursor.execute(
"SELECT DISTINCT dateiname FROM Beobachtungen"
).fetchall()]
if pfad_datei in liste_dateinamen_in_db:
Import_fortsetzen = False
# Import abbrechen, wenn das Instrument noch nicht vom Benutzer angelegt wurde.
liste_instrumentenid = [r[0] for r in cursor.execute("SELECT instrumenteID FROM Instrumente").fetchall()]
cursor.close()
con.close()
if instrumentenID not in liste_instrumentenid:
Import_fortsetzen = False
print(
"❌ Der Import wurde abgebrochen. Bitte eine gültige InstrumentenID eingeben. Bei Bedarf ist das Instrument neu anzulegen.")
if Import_fortsetzen:
liste_basilinien = []
tupel_basislinie = ()
with (open(pfad_datei, "r", encoding="utf-8") as txt):
for i, zeile in enumerate(txt):
zeile = str(zeile).rstrip("\n").rstrip("\r")
aufgeteilt = zeile.split()
if aufgeteilt[0][:2] == "@+":
tupel_basislinie += (aufgeteilt[0][2:],)
if aufgeteilt[0][:2] == "@-":
tupel_basislinie += (aufgeteilt[0][2:], aufgeteilt[1], aufgeteilt[2], aufgeteilt[3],)
if aufgeteilt[0][:2] == "@=":
tupel_basislinie += (aufgeteilt[1], aufgeteilt[2], aufgeteilt[3], aufgeteilt[4], aufgeteilt[5], aufgeteilt[6], aufgeteilt[7], )
liste_basilinien.append(tupel_basislinie)
tupel_basislinie = ()
else:
print(
f"❌ Der Import wurde abgebrochen, weil die Beobachtungen aus der Datei {pfad_datei} bereits in der Datenbank vorhanden sind.")
if Import_fortsetzen:
con = sqlite3.connect(self.pfad_datenbank)
cursor = con.cursor()
for basislinie in liste_basilinien:
cursor.execute(
"INSERT INTO Beobachtungen (punktnummer_sp, punktnummer_zp, gnss_bx, gnss_by, gnss_bz, gnss_s0, gnss_cxx, gnss_cxy, gnss_cxz, gnss_cyy, gnss_cyz, gnss_czz, dateiname, instrumenteID) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
(basislinie[0], basislinie[1], float(basislinie[2]), float(basislinie[3]), float(basislinie[4]), float(basislinie[5]), float(basislinie[6]), float(basislinie[7]), float(basislinie[8]), float(basislinie[9]), float(basislinie[10]), float(basislinie[11]), pfad_datei, instrumentenID))
con.commit()
cursor.close()
con.close()
print(f"✅ Der Import der Datei {pfad_datei} wurde erfolgreich abgeschlossen.")