This commit is contained in:
2026-01-23 21:19:23 +01:00
parent 7c1b335b1c
commit 1561eb242e
8 changed files with 36417 additions and 35865 deletions

440
Import.py
View File

@@ -2,6 +2,11 @@ import csv
import sqlite3
from decimal import Decimal
from typing import Any
import re
import xml.etree.ElementTree as ET
from decimal import Decimal, getcontext, ROUND_HALF_UP
from Berechnungen import Berechnungen
import Berechnungen
@@ -65,7 +70,390 @@ class Import:
con.close()
print("Der Import der Näherungskoordinaten wurde erfolgreich abgeschlossen")
def import_beobachtungen_tachymeter(self, pfad_datei: str, instrumentenID: int) -> None:
def ist_rundung_von_jxl(self, wert_csv: str, wert_jxl_voll: str) -> bool:
wert_csv = str(wert_csv).strip()
wert_jxl_voll = str(wert_jxl_voll).strip()
if ":ZH:" in wert_csv:
wert_csv = wert_csv.split(":ZH:", 1)[0].strip()
if ":ZH:" in wert_jxl_voll:
wert_jxl_voll = wert_jxl_voll.split(":ZH:", 1)[0].strip()
def ist_zahl(text: str) -> bool:
text = str(text).strip()
if text == "":
return False
if text[0] == "+" or text[0] == "-":
text = text[1:]
if text == "":
return False
if text.count(",") + text.count(".") > 1:
return False
if "," in text:
teile = text.split(",", 1)
if len(teile) != 2:
return False
if teile[0] == "" or teile[1] == "":
return False
return teile[0].isdigit() and teile[1].isdigit()
if "." in text:
teile = text.split(".", 1)
if len(teile) != 2:
return False
if teile[0] == "" or teile[1] == "":
return False
return teile[0].isdigit() and teile[1].isdigit()
return text.isdigit()
if ist_zahl(wert_csv) == False:
return False
if ist_zahl(wert_jxl_voll) == False:
return False
anzahl_nachkommastellen_csv = 0
if "," in wert_csv:
anzahl_nachkommastellen_csv = len(wert_csv.split(",", 1)[1])
elif "." in wert_csv:
anzahl_nachkommastellen_csv = len(wert_csv.split(".", 1)[1])
anzahl_nachkommastellen_jxl = 0
if "," in wert_jxl_voll:
anzahl_nachkommastellen_jxl = len(wert_jxl_voll.split(",", 1)[1])
elif "." in wert_jxl_voll:
anzahl_nachkommastellen_jxl = len(wert_jxl_voll.split(".", 1)[1])
if anzahl_nachkommastellen_csv >= anzahl_nachkommastellen_jxl:
return False
wert_csv_decimal = self.string_to_decimal(wert_csv.replace(".", ","))
wert_jxl_decimal = self.string_to_decimal(wert_jxl_voll.replace(".", ","))
q = Decimal("1") if anzahl_nachkommastellen_csv == 0 else Decimal("1." + ("0" * anzahl_nachkommastellen_csv))
wert_jxl_gerundet = wert_jxl_decimal.quantize(q, rounding=ROUND_HALF_UP)
wert_csv_gerundet = wert_csv_decimal.quantize(q, rounding=ROUND_HALF_UP)
return wert_jxl_gerundet == wert_csv_gerundet
def ist_zahl_csv(self, text: str) -> bool:
text = str(text).strip()
if text == "":
return False
if text[0] == "+" or text[0] == "-":
text = text[1:]
if text == "":
return False
if text.count(",") + text.count(".") > 1:
return False
if "," in text:
teile = text.split(",", 1)
if len(teile) != 2:
return False
if teile[0] == "" or teile[1] == "":
return False
return teile[0].isdigit() and teile[1].isdigit()
if "." in text:
teile = text.split(".", 1)
if len(teile) != 2:
return False
if teile[0] == "" or teile[1] == "":
return False
return teile[0].isdigit() and teile[1].isdigit()
return text.isdigit()
def korrigiere_beobachtungen_tachymeter_csv_mit_jxl(self,
pfad_datei_csv: str,
pfad_datei_jxl: str,
pfad_datei_csv_out: str) -> dict:
Import_fortsetzen = True
getcontext().prec = 70
dict_ersetzungen = {"Hz": 0, "Z": 0, "SD": 0}
liste_zeilen_ohne_IH = []
liste_zeilen_ohne_ZH = []
liste_zeilen_standpunkt_nicht_in_jxl = []
liste_stationrecords = []
dict_stationname_stationrecords = {}
dict_stationname_zaehler = {}
dict_targetID_zu_ZH = {}
dict_stationID_zu_seq = {}
dict_stationnamen = {}
tree = ET.parse(pfad_datei_jxl)
root = tree.getroot()
if Import_fortsetzen:
# StationRecords einlesen (Standpunkt, StationID, Instrumentenhöhe)
for sr in root.iter("StationRecord"):
stationname = (sr.findtext("StationName") or "").strip()
station_id = (sr.attrib.get("ID") or "").strip()
ih = (sr.findtext("TheodoliteHeight") or "").strip()
if stationname != "" and station_id != "":
liste_stationrecords.append((stationname, station_id, ih))
dict_stationnamen[stationname] = 1
if stationname not in dict_stationname_stationrecords:
dict_stationname_stationrecords[stationname] = []
dict_stationname_stationrecords[stationname].append((station_id, ih))
for stationname in dict_stationname_stationrecords.keys():
dict_stationname_zaehler[stationname] = 0
for tr in root.iter("TargetRecord"):
target_id = (tr.attrib.get("ID") or "").strip()
zh = (tr.findtext("TargetHeight") or "").strip()
if target_id != "":
dict_targetID_zu_ZH[target_id] = zh
for tupel in liste_stationrecords:
station_id = tupel[1]
if station_id not in dict_stationID_zu_seq:
dict_stationID_zu_seq[station_id] = []
for pr in root.iter("PointRecord"):
station_id = (pr.findtext("StationID") or "").strip()
if station_id == "" or station_id not in dict_stationID_zu_seq:
continue
circle = pr.find("Circle")
if circle is None:
continue
zielpunkt_name = (pr.findtext("Name") or "").strip()
target_id = (pr.findtext("TargetID") or "").strip()
hz_deg = (circle.findtext("HorizontalCircle") or "").strip()
z_deg = (circle.findtext("VerticalCircle") or "").strip()
sd_m = (circle.findtext("EDMDistance") or "").strip()
if zielpunkt_name == "" or hz_deg == "" or z_deg == "" or sd_m == "":
continue
stellen_hz = 0
if "." in hz_deg:
stellen_hz = len(hz_deg.split(".", 1)[1])
stellen_z = 0
if "." in z_deg:
stellen_z = len(z_deg.split(".", 1)[1])
stellen_sd = 0
if "." in sd_m:
stellen_sd = len(sd_m.split(".", 1)[1])
# Umrechnung Grad -> gon
hz_gon_decimal = Decimal(hz_deg) * (Decimal(10) / Decimal(9))
z_gon_decimal = Decimal(z_deg) * (Decimal(10) / Decimal(9))
q_hz = Decimal("1") if stellen_hz == 0 else Decimal("1." + ("0" * stellen_hz))
q_z = Decimal("1") if stellen_z == 0 else Decimal("1." + ("0" * stellen_z))
q_sd = Decimal("1") if stellen_sd == 0 else Decimal("1." + ("0" * stellen_sd))
hz_gon_decimal = hz_gon_decimal.quantize(q_hz, rounding=ROUND_HALF_UP)
z_gon_decimal = z_gon_decimal.quantize(q_z, rounding=ROUND_HALF_UP)
sd_decimal = Decimal(sd_m).quantize(q_sd, rounding=ROUND_HALF_UP)
# Ausgabe mit Komma
hz_gon_text = format(hz_gon_decimal, "f").replace(".", ",")
z_gon_text = format(z_gon_decimal, "f").replace(".", ",")
sd_text = format(sd_decimal, "f").replace(".", ",")
zh = dict_targetID_zu_ZH.get(target_id, "")
dict_stationID_zu_seq[station_id].append({
"target": zielpunkt_name,
"hz_gon": hz_gon_text,
"z_gon": z_gon_text,
"sd_m": sd_text,
"zh": zh
})
station_id_aktuell = None
index_seq_aktuell = 0
standpunkt_aktuell = None
# CSV-Datei zeilenweise durchgehen und ggf. Werte ersetzen
if Import_fortsetzen:
with (open(pfad_datei_csv, newline="", encoding="utf-8") as fin,
open(pfad_datei_csv_out, "w", newline="", encoding="utf-8") as fout):
reader = csv.reader(fin, delimiter=";")
writer = csv.writer(fout, delimiter=";", lineterminator="\n")
for i, row in enumerate(reader):
nummer_zeile = i + 1
if len(row) < 4:
row = row + [""] * (4 - len(row))
if row[0].strip() != "" and row[1].strip() == "" and row[2].strip() == "" and row[3].strip() == "":
standpunkt = row[0].strip()
if standpunkt in dict_stationnamen:
zaehler = dict_stationname_zaehler.get(standpunkt, 0)
liste_records = dict_stationname_stationrecords[standpunkt]
if zaehler >= len(liste_records):
Import_fortsetzen = False
print(
f"Der Vorgang wurde abgebrochen: Standpunkt {standpunkt} kommt in der CSV öfter vor als in der JXL.")
break
station_id, ih = liste_records[zaehler]
dict_stationname_zaehler[standpunkt] = zaehler + 1
station_id_aktuell = station_id
index_seq_aktuell = 0
standpunkt_aktuell = standpunkt
if ih is None or str(ih).strip() == "":
liste_zeilen_ohne_IH.append((nummer_zeile, standpunkt))
writer.writerow([standpunkt, f"IH:{ih}", "", "", ""])
continue
if standpunkt.isdigit():
liste_zeilen_standpunkt_nicht_in_jxl.append((nummer_zeile, standpunkt))
writer.writerow(row)
continue
ist_beobachtung = False
if row[0].strip() != "" and row[1].strip() != "" and row[2].strip() != "" and row[3].strip() != "":
wert_hz = row[1].split(":ZH:", 1)[0].strip()
wert_z = row[2].split(":ZH:", 1)[0].strip()
wert_sd = row[3].split(":ZH:", 1)[0].strip()
if self.ist_zahl_csv(wert_hz) and self.ist_zahl_csv(wert_z) and self.ist_zahl_csv(wert_sd):
ist_beobachtung = True
if ist_beobachtung and station_id_aktuell is not None:
zielpunkt = row[0].strip()
hz_csv = row[1].strip()
z_csv = row[2].strip()
sd_csv = row[3].strip()
liste_seq = dict_stationID_zu_seq.get(station_id_aktuell, [])
if liste_seq == []:
writer.writerow(row)
continue
if index_seq_aktuell >= len(liste_seq):
writer.writerow(row)
continue
jxl_eintrag = liste_seq[index_seq_aktuell]
index_neu = index_seq_aktuell + 1
if jxl_eintrag["target"] != zielpunkt:
index_ende = min(len(liste_seq), index_seq_aktuell + 200)
liste_kandidaten = []
for index_kandidat in range(index_seq_aktuell, index_ende):
if liste_seq[index_kandidat]["target"] == zielpunkt:
liste_kandidaten.append((index_kandidat, liste_seq[index_kandidat]))
if liste_kandidaten != []:
if len(liste_kandidaten) == 1:
index_kandidat, kandidat = liste_kandidaten[0]
jxl_eintrag = kandidat
index_neu = index_kandidat + 1
else:
liste_bewertung = []
for index_kandidat, kandidat in liste_kandidaten:
score = 0
if self.ist_rundung_von_jxl(hz_csv, kandidat["hz_gon"]):
score += 1
if self.ist_rundung_von_jxl(z_csv, kandidat["z_gon"]):
score += 1
if self.ist_rundung_von_jxl(sd_csv, kandidat["sd_m"]):
score += 1
liste_bewertung.append((score, index_kandidat, kandidat))
liste_bewertung.sort(key=lambda t: (-t[0], t[1]))
_, index_best, kandidat_best = liste_bewertung[0]
jxl_eintrag = kandidat_best
index_neu = index_best + 1
index_seq_aktuell = index_neu
# Nur ersetzen, wenn die CSV-Werte tatsächlich eine Rundung der JXL-Werte sind
hz_out = hz_csv
z_out = z_csv
sd_out = sd_csv
if self.ist_rundung_von_jxl(hz_csv, jxl_eintrag["hz_gon"]):
hz_out = jxl_eintrag["hz_gon"]
dict_ersetzungen["Hz"] += 1
if self.ist_rundung_von_jxl(z_csv, jxl_eintrag["z_gon"]):
z_out = jxl_eintrag["z_gon"]
dict_ersetzungen["Z"] += 1
if self.ist_rundung_von_jxl(sd_csv, jxl_eintrag["sd_m"]):
sd_out = jxl_eintrag["sd_m"]
dict_ersetzungen["SD"] += 1
zh = jxl_eintrag.get("zh", "")
if zh is None or str(zh).strip() == "":
liste_zeilen_ohne_ZH.append((nummer_zeile, standpunkt_aktuell, zielpunkt))
spalte_letzte = f"{sd_out}:ZH:{zh}" if str(zh).strip() != "" else sd_out
writer.writerow([zielpunkt, hz_out, z_out, spalte_letzte])
continue
writer.writerow(row)
if Import_fortsetzen:
print(f"Korrektur erfolgreich abgeschlossen. Ausgabe: {pfad_datei_csv_out}")
print(f"Ersetzungen (Rundung -> JXL volle Nachkommastellen): {dict_ersetzungen}")
print("\n--- Fehlende IH ---")
print(f"Anzahl: {len(liste_zeilen_ohne_IH)}")
if len(liste_zeilen_ohne_IH) > 0:
print(liste_zeilen_ohne_IH)
print("\n--- Fehlende ZH ---")
print(f"Anzahl: {len(liste_zeilen_ohne_ZH)}")
if len(liste_zeilen_ohne_ZH) > 0:
print(liste_zeilen_ohne_ZH)
print("\n--- Standpunkt in CSV, aber kein StationRecord in JXL ---")
print(f"Anzahl: {len(liste_zeilen_standpunkt_nicht_in_jxl)}")
if len(liste_zeilen_standpunkt_nicht_in_jxl) > 0:
print(liste_zeilen_standpunkt_nicht_in_jxl)
else:
print("Die Korrektur wurde abgebrochen.")
return {
"Import_fortsetzen": Import_fortsetzen,
"dict_ersetzungen": dict_ersetzungen,
"liste_zeilen_ohne_IH": liste_zeilen_ohne_IH,
"liste_zeilen_ohne_ZH": liste_zeilen_ohne_ZH,
"liste_zeilen_standpunkt_nicht_in_jxl": liste_zeilen_standpunkt_nicht_in_jxl,
"pfad_datei_csv_out": pfad_datei_csv_out
}
def import_beobachtungen_tachymeter(self, pfad_datei: str, instrumentenID: int, a: float, b: float) -> None:
berechnungen = Berechnungen.Berechnungen(a, b)
# Prüfen, ob Bereits Daten aus der Datei in der Datenbank vorhanden sind
con = sqlite3.connect(self.pfad_datenbank)
cursor = con.cursor()
@@ -102,10 +490,16 @@ class Import:
if i < 3:
continue
zeile = zeile.strip().split(";")
if zeile[1] == "" and zeile[2] == "" and zeile[3] == "":
if len(zeile) < 5:
zeile = zeile + [""] * (5 - len(zeile))
if zeile[2] == "" and zeile[3] == "" and zeile[4] == "":
nummer_beobachtungsgruppeID += 1
# print("Standpunkt: ",nummer_beobachtungsgruppeID ,zeile[0])
standpunkt = zeile[0]
instrumentenhoehe = zeile[1]
if instrumentenhoehe.startswith("IH:"):
instrumentenhoehe = instrumentenhoehe.split("IH:", 1)[1].strip()
if nummer_zielpunkt % 6 != 0:
liste_fehlerhafte_zeile.append(i)
@@ -116,24 +510,30 @@ class Import:
liste_zielpunkte_vs3 = []
else:
nummer_zielpunkt += 1
if ":ZH:" in zeile[3]:
teil = zeile[3].split(":ZH:", 1)
zeile[3] = teil[0].strip()
zeile[4] = teil[1].strip()
if zeile[0] not in liste_zielpunkte_hs:
liste_zielpunkte_hs.append(zeile[0])
if zeile[0] in liste_zielpunkte_vs3:
# print(f"{nummer_zielpunkt} VS3 HS1 {zeile}")
liste_beobachtungen_vorbereitung.append(
[nummer_beobachtungsgruppeID, "VS3", "HS1", standpunkt, zeile[0], zeile[1],
zeile[2], zeile[3]])
zeile[2], zeile[3], zeile[4], instrumentenhoehe])
elif zeile[0] in liste_zielpunkte_vs2:
# print(f"{nummer_zielpunkt} VS2 HS1 {zeile}")
liste_beobachtungen_vorbereitung.append(
[nummer_beobachtungsgruppeID, "VS2", "HS1", standpunkt, zeile[0], zeile[1],
zeile[2], zeile[3]])
zeile[2], zeile[3], zeile[4], instrumentenhoehe])
else:
# print(f"{nummer_zielpunkt} VS1 HS1 {zeile}")
liste_beobachtungen_vorbereitung.append(
[nummer_beobachtungsgruppeID, "VS1", "HS1", standpunkt, zeile[0], zeile[1],
zeile[2],
zeile[3]])
zeile[3], zeile[4], instrumentenhoehe])
else:
liste_zielpunkte_hs.remove(zeile[0])
@@ -142,7 +542,7 @@ class Import:
liste_beobachtungen_vorbereitung.append(
[nummer_beobachtungsgruppeID, "VS3", "HS2", standpunkt, zeile[0], zeile[1],
zeile[2],
zeile[3]])
zeile[3], zeile[4], instrumentenhoehe])
elif zeile[0] in liste_zielpunkte_vs2:
if zeile[0] not in liste_zielpunkte_vs3:
@@ -151,7 +551,7 @@ class Import:
liste_beobachtungen_vorbereitung.append(
[nummer_beobachtungsgruppeID, "VS2", "HS2", standpunkt, zeile[0], zeile[1],
zeile[2],
zeile[3]])
zeile[3], zeile[4], instrumentenhoehe])
else:
if zeile[0] not in liste_zielpunkte_vs2:
liste_zielpunkte_vs2.append(zeile[0])
@@ -159,7 +559,7 @@ class Import:
liste_beobachtungen_vorbereitung.append(
[nummer_beobachtungsgruppeID, "VS1", "HS2", standpunkt, zeile[0], zeile[1],
zeile[2],
zeile[3]])
zeile[3], zeile[4], instrumentenhoehe])
if liste_fehlerhafte_zeile == []:
# print(f"Einlesen der Datei {pfad_datei} erfolgreich beendet.")
@@ -200,11 +600,28 @@ class Import:
richtung_vollsatz_gon = (richtung1 + richtung2) / 2
richtung_vollsatz_rad = Berechnungen.Einheitenumrechnung.gon_to_rad_Decimal(richtung_vollsatz_gon)
if liste_aktueller_zielpunkt[8] == liste[8]:
prismenhoehe = liste_aktueller_zielpunkt[8]
else:
Import_fortsetzen = False
print(f"Der Import wurde abgebrochen, weil für zwei Halbsätze vom Standpunkt {liste_aktueller_zielpunkt[3]} zum Zielpunkt {aktueller_zielpunkt} unterschiedliche Prismenhöhen vorliegen. Bitte in der Datei {pfad_datei} korrigieren und Import neustarten.")
if liste_aktueller_zielpunkt[9] == liste[9]:
instrumentenhoehe_import = liste_aktueller_zielpunkt[9]
else:
Import_fortsetzen = False
print(f"Der Import wurde abgebrochen, weil für zwei Halbsätze vom Standpunkt {liste_aktueller_zielpunkt[3]} zum Zielpunkt {aktueller_zielpunkt} unterschiedliche Instrumentenhöhen vorliegen. Bitte in der Datei {pfad_datei} korrigieren und Import neustarten.")
# print(richtung_vollsatz)
# print(zenitwinkel_vollsatz)
# print(distanz_vollsatz)
schraegdistanz_bodenbezogen, zenitwinkel_bodenbezogen = berechnungen.berechne_zenitwinkel_distanz_bodenbezogen(
float(zenitwinkel_vollsatz_rad), float(distanz_vollsatz), float(instrumentenhoehe_import), float(prismenhoehe))
liste_beobachtungen_import.append(
[liste[0], liste[3], liste[4], richtung_vollsatz_rad, zenitwinkel_vollsatz_rad, distanz_vollsatz])
[liste[0], liste[3], liste[4], richtung_vollsatz_rad, zenitwinkel_vollsatz_rad, distanz_vollsatz, zenitwinkel_bodenbezogen, schraegdistanz_bodenbezogen, instrumentenhoehe_import, prismenhoehe])
del liste_beobachtungen_vorbereitung[index]
del liste_beobachtungen_vorbereitung[0]
@@ -215,15 +632,16 @@ class Import:
print(
"Der Import wurde abgebrochen. Bitte eine gültige InstrumentenID eingeben. Bei Bedarf ist das Instrument neu anzulegen.")
if Import_fortsetzen:
con = sqlite3.connect(self.pfad_datenbank)
cursor = con.cursor()
for beobachtung_import in liste_beobachtungen_import:
cursor.execute(
"INSERT INTO Beobachtungen (punktnummer_sp, punktnummer_zp, instrumenteID, beobachtungsgruppeID, tachymeter_richtung, tachymeter_zenitwinkel, tachymeter_distanz, dateiname) VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
"INSERT INTO Beobachtungen (punktnummer_sp, punktnummer_zp, instrumenteID, beobachtungsgruppeID, tachymeter_richtung, tachymeter_zenitwinkel_roh, tachymeter_distanz_roh, dateiname, tachymeter_instrumentenhoehe, tachymeter_prismenhoehe, tachymeter_zenitwinkel, tachymeter_distanz) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
(beobachtung_import[1], beobachtung_import[2], instrumentenID, beobachtung_import[0],
float(beobachtung_import[3]), float(beobachtung_import[4]), float(beobachtung_import[5]),
pfad_datei))
pfad_datei, float(beobachtung_import[8]), float(beobachtung_import[9]), float(beobachtung_import[6]), float(beobachtung_import[7])))
con.commit()
cursor.close()
con.close()