import csv import sqlite3 from decimal import Decimal import Berechnungen class Import: def __init__(self, pfad_datenbank): self.pfad_datenbank = pfad_datenbank pass def string_to_float(self, zahl): zahl = zahl.replace(',', '.') return float(zahl) def string_to_decimal(self, zahl): zahl = zahl.replace(',', '.') return Decimal(zahl) def import_koordinaten_lh_tachymeter(self, pfad_datei): liste_punktnummern = [] liste_punktnummern_vorher = [] liste_punktnummern_vorher_db = [] Import_abbrechen = False with open (pfad_datei, newline='', encoding='utf-8') as csvfile: con = sqlite3.connect(self.pfad_datenbank) cursor = con.cursor() liste_punktnummern_db = [r[0] for r in cursor.execute("SELECT DISTINCT punktnummer FROM Netzpunkte").fetchall()] cursor.close() con.close() r = csv.reader(csvfile, delimiter=';') for row in r: liste_punktnummern.append(row[0]) if row[0] in liste_punktnummern_vorher: Import_abbrechen = True print(f"Der Import wurde abgebrochen, weil in der Datei {pfad_datei} Punktnummern doppelt vorhanden sind. Bitte in der Datei ändern und Import wiederholen.") break liste_punktnummern_vorher.append(row[0]) if row[0] in liste_punktnummern_db: Import_abbrechen = True print(f"Der Import wurde abgebrochen, weil mindestens ein Teil der Punktnummern aus der Datei {pfad_datei} bereits in der Datenbank vorhanden ist. Bitte in der Datei ändern und Import wiederholen.") break liste_punktnummern_vorher_db.append(row[0]) if Import_abbrechen == False: con = sqlite3.connect(self.pfad_datenbank) cursor = con.cursor() with open(pfad_datei, newline='', encoding='utf-8') as csvfile: r = csv.reader(csvfile, delimiter=';') for row in r: cursor.execute( "INSERT INTO Netzpunkte (punktnummer, naeherungx_lh, naeherungy_lh, naeherungz_lh) VALUES (?, ?, ?, ?)", (row[0], self.string_to_float(row[1]), self.string_to_float(row[2]), self.string_to_float(row[3]))) con.commit() cursor.close() con.close() print("Der Import der Näherungskoordinaten wurde erfolgreich abgeschlossen") def import_beobachtungen_tachymeter(self, pfad_datei, instrumentenID): # Prüfen, ob Bereits Daten aus der Datei in der Datenbank vorhanden sind con = sqlite3.connect(self.pfad_datenbank) cursor = con.cursor() liste_dateinamen_in_db = [r[0] for r in cursor.execute( "SELECT DISTINCT dateiname FROM Beobachtungen" ).fetchall()] liste_beobachtungsgruppeID = [r[0] for r in cursor.execute("""SELECT DISTINCT beobachtungsgruppeID FROM Beobachtungen""").fetchall()] liste_instrumentenid = [r[0] for r in cursor.execute("SELECT instrumenteID FROM Instrumente").fetchall()] con.close() cursor.close Import_fortsetzen = True if pfad_datei in liste_dateinamen_in_db: Import_fortsetzen = False if Import_fortsetzen: nummer_zielpunkt = 0 try: nummer_beobachtungsgruppeID = max(liste_beobachtungsgruppeID) except: nummer_beobachtungsgruppeID = 0 with (open(pfad_datei, "r", encoding="utf-8") as f): liste_fehlerhafte_zeile = [] liste_beobachtungen_vorbereitung = [] for i, zeile in enumerate(f): if i < 3: continue zeile = zeile.strip().split(";") if zeile[1] == "" and zeile[2] == "" and zeile[3] == "": nummer_beobachtungsgruppeID += 1 # print("Standpunkt: ",nummer_beobachtungsgruppeID ,zeile[0]) standpunkt = zeile[0] if nummer_zielpunkt % 6 != 0: liste_fehlerhafte_zeile.append(i) nummer_zielpunkt = 0 liste_zielpunkte_hs = [] liste_zielpunkte_vs2 = [] liste_zielpunkte_vs3 = [] else: nummer_zielpunkt += 1 if zeile[0] not in liste_zielpunkte_hs: liste_zielpunkte_hs.append(zeile[0]) if zeile[0] in liste_zielpunkte_vs3: # print(f"{nummer_zielpunkt} VS3 HS1 {zeile}") liste_beobachtungen_vorbereitung.append( [nummer_beobachtungsgruppeID, "VS3", "HS1", standpunkt, zeile[0], zeile[1], zeile[2], zeile[3]]) elif zeile[0] in liste_zielpunkte_vs2: # print(f"{nummer_zielpunkt} VS2 HS1 {zeile}") liste_beobachtungen_vorbereitung.append( [nummer_beobachtungsgruppeID, "VS2", "HS1", standpunkt, zeile[0], zeile[1], zeile[2], zeile[3]]) else: # print(f"{nummer_zielpunkt} VS1 HS1 {zeile}") liste_beobachtungen_vorbereitung.append( [nummer_beobachtungsgruppeID, "VS1", "HS1", standpunkt, zeile[0], zeile[1], zeile[2], zeile[3]]) else: liste_zielpunkte_hs.remove(zeile[0]) if zeile[0] in liste_zielpunkte_vs3: # print(f"{nummer_zielpunkt} VS3 HS2 {zeile}") liste_beobachtungen_vorbereitung.append( [nummer_beobachtungsgruppeID, "VS3", "HS2", standpunkt, zeile[0], zeile[1], zeile[2], zeile[3]]) elif zeile[0] in liste_zielpunkte_vs2: if zeile[0] not in liste_zielpunkte_vs3: liste_zielpunkte_vs3.append(zeile[0]) # print(f"{nummer_zielpunkt} VS2 HS2 {zeile}") liste_beobachtungen_vorbereitung.append( [nummer_beobachtungsgruppeID, "VS2", "HS2", standpunkt, zeile[0], zeile[1], zeile[2], zeile[3]]) else: if zeile[0] not in liste_zielpunkte_vs2: liste_zielpunkte_vs2.append(zeile[0]) # print(f"{nummer_zielpunkt} VS1 HS2 {zeile}") liste_beobachtungen_vorbereitung.append( [nummer_beobachtungsgruppeID, "VS1", "HS2", standpunkt, zeile[0], zeile[1], zeile[2], zeile[3]]) if liste_fehlerhafte_zeile == []: # print(f"Einlesen der Datei {pfad_datei} erfolgreich beendet.") pass else: print( f"Das Einlesen der Datei {pfad_datei} wurde abgebrochen.\nBitte bearbeiten Sie die Zeilen rund um: {", ".join(map(str, liste_fehlerhafte_zeile))} in der csv-Datei und wiederholen Sie den Import.") Import_fortsetzen = False else: print( f"Der Import wurde abgebrochen, weil die Beobachtungen aus der Datei {pfad_datei} bereits in der Datenbank vorhanden sind.") if Import_fortsetzen: liste_beobachtungen_import = [] while len(liste_beobachtungen_vorbereitung) > 0: liste_aktueller_zielpunkt = liste_beobachtungen_vorbereitung[0] aktueller_zielpunkt = liste_aktueller_zielpunkt[4] # print(liste_beobachtungen_vorbereitung[0]) for index in range(1, len(liste_beobachtungen_vorbereitung)): liste = liste_beobachtungen_vorbereitung[index] if liste[4] == aktueller_zielpunkt: # print(liste) richtung1 = self.string_to_decimal(liste_aktueller_zielpunkt[5]) richtung2 = self.string_to_decimal(liste[5]) - Decimal(200) zenitwinkel_vollsatz_gon = (self.string_to_decimal(liste_aktueller_zielpunkt[6]) - self.string_to_decimal( liste[6]) + 400) / 2 zenitwinkel_vollsatz_rad = Berechnungen.Einheitenumrechnung.gon_to_rad_Decimal(zenitwinkel_vollsatz_gon) distanz_vollsatz = (self.string_to_decimal(liste_aktueller_zielpunkt[7]) + self.string_to_decimal( liste[7])) / 2 if richtung2 < 0: richtung2 += Decimal(400) elif richtung2 > 400: richtung2 -= Decimal(400) richtung_vollsatz_gon = (richtung1 + richtung2) / 2 richtung_vollsatz_rad = Berechnungen.Einheitenumrechnung.gon_to_rad_Decimal(richtung_vollsatz_gon) # print(richtung_vollsatz) # print(zenitwinkel_vollsatz) # print(distanz_vollsatz) liste_beobachtungen_import.append( [liste[0], liste[3], liste[4], richtung_vollsatz_rad, zenitwinkel_vollsatz_rad, distanz_vollsatz]) del liste_beobachtungen_vorbereitung[index] del liste_beobachtungen_vorbereitung[0] break if instrumentenID not in liste_instrumentenid: Import_fortsetzen = False print( "Der Import wurde abgebrochen. Bitte eine gültige InstrumentenID eingeben. Bei Bedarf ist das Instrument neu anzulegen.") if Import_fortsetzen: con = sqlite3.connect(self.pfad_datenbank) cursor = con.cursor() for beobachtung_import in liste_beobachtungen_import: cursor.execute( "INSERT INTO Beobachtungen (punktnummer_sp, punktnummer_zp, instrumenteID, beobachtungsgruppeID, tachymeter_richtung, tachymeter_zenitwinkel, tachymeter_distanz, dateiname) VALUES (?, ?, ?, ?, ?, ?, ?, ?)", (beobachtung_import[1], beobachtung_import[2], instrumentenID, beobachtung_import[0], float(beobachtung_import[3]), float(beobachtung_import[4]), float(beobachtung_import[5]), pfad_datei)) con.commit() cursor.close() con.close() print(f"Der Import der Datei {pfad_datei} wurde erfolgreich abgeschlossen.")