226 lines
11 KiB
Python
226 lines
11 KiB
Python
import csv
|
|
import sqlite3
|
|
from decimal import Decimal
|
|
|
|
import Berechnungen
|
|
|
|
|
|
class Import:
|
|
def __init__(self, pfad_datenbank):
|
|
self.pfad_datenbank = pfad_datenbank
|
|
pass
|
|
|
|
def string_to_float(self, zahl):
|
|
zahl = zahl.replace(',', '.')
|
|
return float(zahl)
|
|
|
|
def string_to_decimal(self, zahl):
|
|
zahl = zahl.replace(',', '.')
|
|
return Decimal(zahl)
|
|
|
|
def import_koordinaten_lh_tachymeter(self, pfad_datei):
|
|
liste_punktnummern = []
|
|
liste_punktnummern_vorher = []
|
|
liste_punktnummern_vorher_db = []
|
|
Import_abbrechen = False
|
|
|
|
|
|
with open (pfad_datei, newline='', encoding='utf-8') as csvfile:
|
|
con = sqlite3.connect(self.pfad_datenbank)
|
|
cursor = con.cursor()
|
|
liste_punktnummern_db = [r[0] for r in cursor.execute("SELECT DISTINCT punktnummer FROM Netzpunkte").fetchall()]
|
|
cursor.close()
|
|
con.close()
|
|
|
|
r = csv.reader(csvfile, delimiter=';')
|
|
for row in r:
|
|
liste_punktnummern.append(row[0])
|
|
if row[0] in liste_punktnummern_vorher:
|
|
Import_abbrechen = True
|
|
print(f"Der Import wurde abgebrochen, weil in der Datei {pfad_datei} Punktnummern doppelt vorhanden sind. Bitte in der Datei ändern und Import wiederholen.")
|
|
break
|
|
|
|
liste_punktnummern_vorher.append(row[0])
|
|
|
|
if row[0] in liste_punktnummern_db:
|
|
Import_abbrechen = True
|
|
print(f"Der Import wurde abgebrochen, weil mindestens ein Teil der Punktnummern aus der Datei {pfad_datei} bereits in der Datenbank vorhanden ist. Bitte in der Datei ändern und Import wiederholen.")
|
|
break
|
|
liste_punktnummern_vorher_db.append(row[0])
|
|
|
|
if Import_abbrechen == False:
|
|
con = sqlite3.connect(self.pfad_datenbank)
|
|
cursor = con.cursor()
|
|
|
|
with open(pfad_datei, newline='', encoding='utf-8') as csvfile:
|
|
r = csv.reader(csvfile, delimiter=';')
|
|
for row in r:
|
|
cursor.execute(
|
|
"INSERT INTO Netzpunkte (punktnummer, naeherungx_lh, naeherungy_lh, naeherungz_lh) VALUES (?, ?, ?, ?)",
|
|
(row[0], self.string_to_float(row[1]), self.string_to_float(row[2]), self.string_to_float(row[3])))
|
|
|
|
con.commit()
|
|
cursor.close()
|
|
con.close()
|
|
print("Der Import der Näherungskoordinaten wurde erfolgreich abgeschlossen")
|
|
|
|
def import_beobachtungen_tachymeter(self, pfad_datei, instrumentenID):
|
|
# Prüfen, ob Bereits Daten aus der Datei in der Datenbank vorhanden sind
|
|
con = sqlite3.connect(self.pfad_datenbank)
|
|
cursor = con.cursor()
|
|
liste_dateinamen_in_db = [r[0] for r in cursor.execute(
|
|
"SELECT DISTINCT dateiname FROM Beobachtungen"
|
|
).fetchall()]
|
|
liste_beobachtungsgruppeID = [r[0] for r in cursor.execute("""SELECT DISTINCT beobachtungsgruppeID
|
|
FROM Beobachtungen""").fetchall()]
|
|
liste_instrumentenid = [r[0] for r in cursor.execute("SELECT instrumenteID FROM Instrumente").fetchall()]
|
|
|
|
con.close()
|
|
cursor.close
|
|
|
|
Import_fortsetzen = True
|
|
|
|
if pfad_datei in liste_dateinamen_in_db:
|
|
Import_fortsetzen = False
|
|
|
|
if Import_fortsetzen:
|
|
nummer_zielpunkt = 0
|
|
try:
|
|
nummer_beobachtungsgruppeID = max(liste_beobachtungsgruppeID)
|
|
except:
|
|
nummer_beobachtungsgruppeID = 0
|
|
|
|
with (open(pfad_datei, "r", encoding="utf-8") as f):
|
|
liste_fehlerhafte_zeile = []
|
|
liste_beobachtungen_vorbereitung = []
|
|
|
|
for i, zeile in enumerate(f):
|
|
if i < 3:
|
|
continue
|
|
zeile = zeile.strip().split(";")
|
|
if zeile[1] == "" and zeile[2] == "" and zeile[3] == "":
|
|
nummer_beobachtungsgruppeID += 1
|
|
# print("Standpunkt: ",nummer_beobachtungsgruppeID ,zeile[0])
|
|
standpunkt = zeile[0]
|
|
|
|
if nummer_zielpunkt % 6 != 0:
|
|
liste_fehlerhafte_zeile.append(i)
|
|
|
|
nummer_zielpunkt = 0
|
|
liste_zielpunkte_hs = []
|
|
liste_zielpunkte_vs2 = []
|
|
liste_zielpunkte_vs3 = []
|
|
else:
|
|
nummer_zielpunkt += 1
|
|
if zeile[0] not in liste_zielpunkte_hs:
|
|
liste_zielpunkte_hs.append(zeile[0])
|
|
if zeile[0] in liste_zielpunkte_vs3:
|
|
# print(f"{nummer_zielpunkt} VS3 HS1 {zeile}")
|
|
liste_beobachtungen_vorbereitung.append(
|
|
[nummer_beobachtungsgruppeID, "VS3", "HS1", standpunkt, zeile[0], zeile[1],
|
|
zeile[2], zeile[3]])
|
|
elif zeile[0] in liste_zielpunkte_vs2:
|
|
# print(f"{nummer_zielpunkt} VS2 HS1 {zeile}")
|
|
liste_beobachtungen_vorbereitung.append(
|
|
[nummer_beobachtungsgruppeID, "VS2", "HS1", standpunkt, zeile[0], zeile[1],
|
|
zeile[2], zeile[3]])
|
|
else:
|
|
# print(f"{nummer_zielpunkt} VS1 HS1 {zeile}")
|
|
liste_beobachtungen_vorbereitung.append(
|
|
[nummer_beobachtungsgruppeID, "VS1", "HS1", standpunkt, zeile[0], zeile[1],
|
|
zeile[2],
|
|
zeile[3]])
|
|
|
|
else:
|
|
liste_zielpunkte_hs.remove(zeile[0])
|
|
if zeile[0] in liste_zielpunkte_vs3:
|
|
# print(f"{nummer_zielpunkt} VS3 HS2 {zeile}")
|
|
liste_beobachtungen_vorbereitung.append(
|
|
[nummer_beobachtungsgruppeID, "VS3", "HS2", standpunkt, zeile[0], zeile[1],
|
|
zeile[2],
|
|
zeile[3]])
|
|
|
|
elif zeile[0] in liste_zielpunkte_vs2:
|
|
if zeile[0] not in liste_zielpunkte_vs3:
|
|
liste_zielpunkte_vs3.append(zeile[0])
|
|
# print(f"{nummer_zielpunkt} VS2 HS2 {zeile}")
|
|
liste_beobachtungen_vorbereitung.append(
|
|
[nummer_beobachtungsgruppeID, "VS2", "HS2", standpunkt, zeile[0], zeile[1],
|
|
zeile[2],
|
|
zeile[3]])
|
|
else:
|
|
if zeile[0] not in liste_zielpunkte_vs2:
|
|
liste_zielpunkte_vs2.append(zeile[0])
|
|
# print(f"{nummer_zielpunkt} VS1 HS2 {zeile}")
|
|
liste_beobachtungen_vorbereitung.append(
|
|
[nummer_beobachtungsgruppeID, "VS1", "HS2", standpunkt, zeile[0], zeile[1],
|
|
zeile[2],
|
|
zeile[3]])
|
|
|
|
if liste_fehlerhafte_zeile == []:
|
|
# print(f"Einlesen der Datei {pfad_datei} erfolgreich beendet.")
|
|
pass
|
|
else:
|
|
print(
|
|
f"Das Einlesen der Datei {pfad_datei} wurde abgebrochen.\nBitte bearbeiten Sie die Zeilen rund um: {", ".join(map(str, liste_fehlerhafte_zeile))} in der csv-Datei und wiederholen Sie den Import.")
|
|
Import_fortsetzen = False
|
|
|
|
else:
|
|
print(
|
|
f"Der Import wurde abgebrochen, weil die Beobachtungen aus der Datei {pfad_datei} bereits in der Datenbank vorhanden sind.")
|
|
|
|
if Import_fortsetzen:
|
|
liste_beobachtungen_import = []
|
|
|
|
while len(liste_beobachtungen_vorbereitung) > 0:
|
|
liste_aktueller_zielpunkt = liste_beobachtungen_vorbereitung[0]
|
|
aktueller_zielpunkt = liste_aktueller_zielpunkt[4]
|
|
# print(liste_beobachtungen_vorbereitung[0])
|
|
|
|
for index in range(1, len(liste_beobachtungen_vorbereitung)):
|
|
liste = liste_beobachtungen_vorbereitung[index]
|
|
|
|
if liste[4] == aktueller_zielpunkt:
|
|
# print(liste)
|
|
richtung1 = self.string_to_decimal(liste_aktueller_zielpunkt[5])
|
|
richtung2 = self.string_to_decimal(liste[5]) - Decimal(200)
|
|
zenitwinkel_vollsatz_gon = (self.string_to_decimal(liste_aktueller_zielpunkt[6]) - self.string_to_decimal(
|
|
liste[6]) + 400) / 2
|
|
zenitwinkel_vollsatz_rad = Berechnungen.Einheitenumrechnung.gon_to_rad_Decimal(zenitwinkel_vollsatz_gon)
|
|
distanz_vollsatz = (self.string_to_decimal(liste_aktueller_zielpunkt[7]) + self.string_to_decimal(
|
|
liste[7])) / 2
|
|
if richtung2 < 0:
|
|
richtung2 += Decimal(400)
|
|
elif richtung2 > 400:
|
|
richtung2 -= Decimal(400)
|
|
richtung_vollsatz_gon = (richtung1 + richtung2) / 2
|
|
richtung_vollsatz_rad = Berechnungen.Einheitenumrechnung.gon_to_rad_Decimal(richtung_vollsatz_gon)
|
|
|
|
# print(richtung_vollsatz)
|
|
# print(zenitwinkel_vollsatz)
|
|
# print(distanz_vollsatz)
|
|
liste_beobachtungen_import.append(
|
|
[liste[0], liste[3], liste[4], richtung_vollsatz_rad, zenitwinkel_vollsatz_rad, distanz_vollsatz])
|
|
|
|
del liste_beobachtungen_vorbereitung[index]
|
|
del liste_beobachtungen_vorbereitung[0]
|
|
break
|
|
|
|
if instrumentenID not in liste_instrumentenid:
|
|
Import_fortsetzen = False
|
|
print(
|
|
"Der Import wurde abgebrochen. Bitte eine gültige InstrumentenID eingeben. Bei Bedarf ist das Instrument neu anzulegen.")
|
|
|
|
if Import_fortsetzen:
|
|
con = sqlite3.connect(self.pfad_datenbank)
|
|
cursor = con.cursor()
|
|
for beobachtung_import in liste_beobachtungen_import:
|
|
cursor.execute(
|
|
"INSERT INTO Beobachtungen (punktnummer_sp, punktnummer_zp, instrumenteID, beobachtungsgruppeID, tachymeter_richtung, tachymeter_zenitwinkel, tachymeter_distanz, dateiname) VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
|
|
(beobachtung_import[1], beobachtung_import[2], instrumentenID, beobachtung_import[0],
|
|
float(beobachtung_import[3]), float(beobachtung_import[4]), float(beobachtung_import[5]),
|
|
pfad_datei))
|
|
con.commit()
|
|
cursor.close()
|
|
con.close()
|
|
print(f"Der Import der Datei {pfad_datei} wurde erfolgreich abgeschlossen.") |