Push
This commit is contained in:
@@ -1,6 +1,361 @@
|
||||
i = 5
|
||||
import csv
|
||||
import xml.etree.ElementTree as ET
|
||||
from decimal import Decimal, getcontext, ROUND_HALF_UP
|
||||
|
||||
if i % 6 != 0:
|
||||
print("nein")
|
||||
else:
|
||||
print("ja")
|
||||
# ========= Einstellungen =========
|
||||
JXL_IN = r"C:\Users\fabia\Desktop\Masterprojekt_V3\Daten\campusnetz_bereinigt_plus_nachmessung.jxl"
|
||||
CSV_IN = r"C:\Users\fabia\Desktop\Masterprojekt_V3\Daten\campsnetz_beobachtungen_plus_nachmessungen.csv"
|
||||
CSV_OUT = r"C:\Users\fabia\Desktop\Masterprojekt_V3\Daten\campsnetz_beobachtungen_plus_nachmessungen_korrigiert.csv"
|
||||
|
||||
getcontext().prec = 70
|
||||
|
||||
|
||||
# ========= Hilfsfunktionen =========
|
||||
def count_decimals(s: str, sep: str) -> int:
|
||||
if s is None:
|
||||
return 0
|
||||
s = s.strip()
|
||||
if s == "":
|
||||
return 0
|
||||
if ":ZH:" in s:
|
||||
s = s.split(":ZH:", 1)[0].strip()
|
||||
if sep in s:
|
||||
return len(s.split(sep, 1)[1])
|
||||
return 0
|
||||
|
||||
|
||||
def fmt_decimal_fixed(x: Decimal, decimals: int, sep: str) -> str:
|
||||
q = Decimal("1") if decimals == 0 else Decimal("1." + ("0" * decimals))
|
||||
y = x.quantize(q, rounding=ROUND_HALF_UP)
|
||||
txt = format(y, "f")
|
||||
if sep != ".":
|
||||
txt = txt.replace(".", sep)
|
||||
if decimals == 0:
|
||||
txt = txt.split(sep)[0]
|
||||
return txt
|
||||
|
||||
|
||||
def parse_decimal_csv(s: str) -> Decimal:
|
||||
"""
|
||||
CSV-Zahlen mit Komma, evtl. mit ":ZH:..." im letzten Feld.
|
||||
"""
|
||||
s = (s or "").strip()
|
||||
if ":ZH:" in s:
|
||||
s = s.split(":ZH:", 1)[0].strip()
|
||||
s = s.replace(",", ".")
|
||||
return Decimal(s)
|
||||
|
||||
|
||||
def parse_decimal_comma(s: str) -> Decimal:
|
||||
"""
|
||||
Komma-String nach Decimal.
|
||||
"""
|
||||
return Decimal((s or "").strip().replace(",", "."))
|
||||
|
||||
|
||||
def deg_to_gon_str(deg_str: str) -> str:
|
||||
"""
|
||||
JXL: Winkel in Grad (Dezimalpunkt).
|
||||
gon = deg * (10/9)
|
||||
Ausgabe mit exakt so vielen Nachkommastellen wie im JXL-Gradwert enthalten.
|
||||
Dezimaltrennzeichen: Komma.
|
||||
"""
|
||||
deg_str = (deg_str or "").strip()
|
||||
d = count_decimals(deg_str, ".")
|
||||
deg = Decimal(deg_str)
|
||||
gon = deg * (Decimal(10) / Decimal(9))
|
||||
return fmt_decimal_fixed(gon, d, ",")
|
||||
|
||||
|
||||
def meter_str_from_jxl(m_str: str) -> str:
|
||||
"""
|
||||
JXL: Distanz in Meter (Dezimalpunkt).
|
||||
Ausgabe mit exakt so vielen Nachkommastellen wie in der JXL enthalten.
|
||||
Dezimaltrennzeichen: Komma.
|
||||
"""
|
||||
m_str = (m_str or "").strip()
|
||||
d = count_decimals(m_str, ".")
|
||||
return fmt_decimal_fixed(Decimal(m_str), d, ",")
|
||||
|
||||
|
||||
def is_obs_line(row: list[str]) -> bool:
|
||||
"""
|
||||
Beobachtungszeile: Zielpunkt nicht leer, Hz/Z/SD numerisch parsebar.
|
||||
Zielpunkt darf alphanumerisch sein (FH3 etc.).
|
||||
"""
|
||||
if len(row) < 4:
|
||||
return False
|
||||
if row[0].strip() == "" or row[1].strip() == "" or row[2].strip() == "" or row[3].strip() == "":
|
||||
return False
|
||||
try:
|
||||
_ = parse_decimal_csv(row[1])
|
||||
_ = parse_decimal_csv(row[2])
|
||||
_ = parse_decimal_csv(row[3])
|
||||
return True
|
||||
except Exception:
|
||||
return False
|
||||
|
||||
|
||||
def is_station_candidate(row: list[str]) -> bool:
|
||||
"""
|
||||
Kandidat für Standpunkt: erste Spalte nicht leer, Messspalten leer.
|
||||
Ob es wirklich ein Standpunkt ist, entscheiden wir später über StationName-Menge.
|
||||
"""
|
||||
if len(row) < 4:
|
||||
return False
|
||||
return (
|
||||
row[0].strip() != ""
|
||||
and row[1].strip() == ""
|
||||
and row[2].strip() == ""
|
||||
and row[3].strip() == ""
|
||||
)
|
||||
|
||||
|
||||
def csv_is_rounding_of_jxl(csv_str: str, jxl_full_str: str) -> bool:
|
||||
"""
|
||||
Prüft: CSV ist gerundete Darstellung des JXL-Wertes.
|
||||
Kriterium:
|
||||
- CSV hat weniger Nachkommastellen als JXL
|
||||
- und: JXL auf CSV-Dezimalstellen gerundet == CSV-Wert (numerisch)
|
||||
"""
|
||||
dc = count_decimals(csv_str, ",")
|
||||
dj = count_decimals(jxl_full_str, ",")
|
||||
if dc >= dj:
|
||||
return False
|
||||
|
||||
try:
|
||||
csv_val = parse_decimal_csv(csv_str)
|
||||
jxl_val = parse_decimal_comma(jxl_full_str)
|
||||
|
||||
q = Decimal("1") if dc == 0 else Decimal("1." + ("0" * dc))
|
||||
jxl_rounded = jxl_val.quantize(q, rounding=ROUND_HALF_UP)
|
||||
csv_q = csv_val.quantize(q, rounding=ROUND_HALF_UP)
|
||||
|
||||
return jxl_rounded == csv_q
|
||||
except Exception:
|
||||
return False
|
||||
|
||||
|
||||
# ========= JXL einlesen =========
|
||||
tree = ET.parse(JXL_IN)
|
||||
root = tree.getroot()
|
||||
|
||||
# StationRecords: (StationName, StationID, IH)
|
||||
station_records = []
|
||||
for sr in root.iter("StationRecord"):
|
||||
sname = (sr.findtext("StationName") or "").strip()
|
||||
sid = (sr.attrib.get("ID") or "").strip()
|
||||
ih = (sr.findtext("TheodoliteHeight") or "").strip()
|
||||
if sname != "" and sid != "":
|
||||
station_records.append((sname, sid, ih))
|
||||
|
||||
station_names_set = {sname for sname, _, _ in station_records}
|
||||
|
||||
# pro StationName ggf. mehrere Aufbauten -> "nächsten unbenutzten" nehmen
|
||||
stationname_to_records = {}
|
||||
for sname, sid, ih in station_records:
|
||||
stationname_to_records.setdefault(sname, []).append((sid, ih))
|
||||
stationname_usecount = {k: 0 for k in stationname_to_records.keys()}
|
||||
|
||||
# TargetHeight je TargetRecord-ID
|
||||
target_height_by_id = {}
|
||||
for tr in root.iter("TargetRecord"):
|
||||
tid = (tr.attrib.get("ID") or "").strip()
|
||||
zh = (tr.findtext("TargetHeight") or "").strip()
|
||||
if tid != "":
|
||||
target_height_by_id[tid] = zh
|
||||
|
||||
# Pro StationID: Sequenz der PointRecords
|
||||
station_seq = {sid: [] for _, sid, _ in station_records}
|
||||
|
||||
for pr in root.iter("PointRecord"):
|
||||
stid = (pr.findtext("StationID") or "").strip()
|
||||
if stid == "" or stid not in station_seq:
|
||||
continue
|
||||
|
||||
circle = pr.find("Circle")
|
||||
if circle is None:
|
||||
continue
|
||||
|
||||
target_name = (pr.findtext("Name") or "").strip()
|
||||
target_id = (pr.findtext("TargetID") or "").strip()
|
||||
|
||||
hz_deg = (circle.findtext("HorizontalCircle") or "").strip()
|
||||
z_deg = (circle.findtext("VerticalCircle") or "").strip()
|
||||
sd_m = (circle.findtext("EDMDistance") or "").strip()
|
||||
|
||||
if target_name == "" or hz_deg == "" or z_deg == "" or sd_m == "":
|
||||
continue
|
||||
|
||||
station_seq[stid].append({
|
||||
"target": target_name,
|
||||
"hz_gon": deg_to_gon_str(hz_deg),
|
||||
"z_gon": deg_to_gon_str(z_deg),
|
||||
"sd_m": meter_str_from_jxl(sd_m),
|
||||
"zh": target_height_by_id.get(target_id, ""),
|
||||
})
|
||||
|
||||
|
||||
# ========= Matching-Funktion =========
|
||||
def pick_jxl_entry_for_obs(seq, start_ptr, zp, hz_csv, z_csv, sd_csv, search_window=200):
|
||||
"""
|
||||
Standard: nimmt seq[start_ptr]
|
||||
Wenn target nicht passt: sucht im Fenster nach passendem zp.
|
||||
Bei Mehrfachtreffern wird bevorzugt, wo gerundete Werte passen.
|
||||
"""
|
||||
if start_ptr >= len(seq):
|
||||
return None, start_ptr
|
||||
|
||||
first = seq[start_ptr]
|
||||
if first["target"] == zp:
|
||||
return first, start_ptr + 1
|
||||
|
||||
end = min(len(seq), start_ptr + search_window)
|
||||
candidates = []
|
||||
for i in range(start_ptr, end):
|
||||
if seq[i]["target"] == zp:
|
||||
candidates.append((i, seq[i]))
|
||||
|
||||
if not candidates:
|
||||
return first, start_ptr + 1
|
||||
|
||||
if len(candidates) == 1:
|
||||
i, entry = candidates[0]
|
||||
return entry, i + 1
|
||||
|
||||
good = []
|
||||
for i, entry in candidates:
|
||||
ok_hz = csv_is_rounding_of_jxl(hz_csv, entry["hz_gon"])
|
||||
ok_z = csv_is_rounding_of_jxl(z_csv, entry["z_gon"])
|
||||
ok_sd = csv_is_rounding_of_jxl(sd_csv, entry["sd_m"])
|
||||
score = int(ok_hz) + int(ok_z) + int(ok_sd)
|
||||
good.append((score, i, entry))
|
||||
|
||||
good.sort(key=lambda t: (-t[0], t[1]))
|
||||
_, i_best, entry_best = good[0]
|
||||
return entry_best, i_best + 1
|
||||
|
||||
|
||||
# ========= CSV verarbeiten =========
|
||||
repl_counts = {"Hz": 0, "Z": 0, "SD": 0}
|
||||
current_station_id = None
|
||||
current_station_ptr = 0
|
||||
|
||||
line_no = 0
|
||||
|
||||
fehlende_IH = [] # (zeilennummer, standpunkt)
|
||||
fehlende_ZH = [] # (zeilennummer, standpunkt, zielpunkt)
|
||||
fehlender_StationRecord = [] # (zeilennummer, standpunkt_text)
|
||||
|
||||
current_station_name = None
|
||||
|
||||
with open(CSV_IN, newline="", encoding="utf-8") as fin, open(CSV_OUT, "w", newline="", encoding="utf-8") as fout:
|
||||
reader = csv.reader(fin, delimiter=";")
|
||||
writer = csv.writer(fout, delimiter=";", lineterminator="\n")
|
||||
|
||||
for row in reader:
|
||||
line_no += 1
|
||||
|
||||
if len(row) < 4:
|
||||
row = row + [""] * (4 - len(row))
|
||||
|
||||
# ---- Standpunkt-Kandidat? ----
|
||||
if is_station_candidate(row):
|
||||
sp = row[0].strip()
|
||||
|
||||
# Nur als Standpunkt behandeln, wenn er wirklich in der JXL als StationName existiert:
|
||||
if sp in station_names_set:
|
||||
use = stationname_usecount.get(sp, 0)
|
||||
recs = stationname_to_records[sp]
|
||||
if use >= len(recs):
|
||||
raise RuntimeError(f"Standpunkt {sp} kommt in CSV öfter vor als in der JXL (StationRecords).")
|
||||
|
||||
sid, ih = recs[use]
|
||||
stationname_usecount[sp] = use + 1
|
||||
|
||||
current_station_name = sp
|
||||
current_station_id = sid
|
||||
current_station_ptr = 0
|
||||
|
||||
# fehlende IH loggen
|
||||
if ih is None or str(ih).strip() == "":
|
||||
fehlende_IH.append((line_no, sp))
|
||||
|
||||
writer.writerow([sp, f"IH:{ih}", "", "", ""])
|
||||
continue
|
||||
|
||||
# NICHT in JXL: wenn es wie ein Standpunkt aussieht -> loggen
|
||||
if sp.isdigit():
|
||||
fehlender_StationRecord.append((line_no, sp))
|
||||
|
||||
writer.writerow(row)
|
||||
continue
|
||||
|
||||
# ---- Beobachtung? ----
|
||||
if is_obs_line(row) and current_station_id is not None:
|
||||
zp = row[0].strip()
|
||||
hz_csv = row[1].strip()
|
||||
z_csv = row[2].strip()
|
||||
sd_csv = row[3].strip()
|
||||
|
||||
seq = station_seq.get(current_station_id, [])
|
||||
jxl_entry, new_ptr = pick_jxl_entry_for_obs(seq, current_station_ptr, zp, hz_csv, z_csv, sd_csv)
|
||||
|
||||
if jxl_entry is None:
|
||||
writer.writerow(row)
|
||||
continue
|
||||
|
||||
current_station_ptr = new_ptr
|
||||
|
||||
hz_out = hz_csv
|
||||
z_out = z_csv
|
||||
sd_out = sd_csv
|
||||
|
||||
if csv_is_rounding_of_jxl(hz_csv, jxl_entry["hz_gon"]):
|
||||
hz_out = jxl_entry["hz_gon"]
|
||||
repl_counts["Hz"] += 1
|
||||
|
||||
if csv_is_rounding_of_jxl(z_csv, jxl_entry["z_gon"]):
|
||||
z_out = jxl_entry["z_gon"]
|
||||
repl_counts["Z"] += 1
|
||||
|
||||
if csv_is_rounding_of_jxl(sd_csv, jxl_entry["sd_m"]):
|
||||
sd_out = jxl_entry["sd_m"]
|
||||
repl_counts["SD"] += 1
|
||||
|
||||
# fehlende ZH loggen
|
||||
zh_val = jxl_entry.get("zh", "")
|
||||
if zh_val is None or str(zh_val).strip() == "":
|
||||
fehlende_ZH.append((line_no, current_station_name, zp))
|
||||
|
||||
last_col = f"{sd_out}:ZH:{zh_val}" if str(zh_val).strip() != "" else sd_out
|
||||
writer.writerow([zp, hz_out, z_out, last_col])
|
||||
continue
|
||||
|
||||
# ---- alles andere unverändert ----
|
||||
writer.writerow(row)
|
||||
|
||||
print("Fertig.")
|
||||
print("Ausgabe:", CSV_OUT)
|
||||
print("Ersetzungen (Rundung -> JXL volle Nachkommastellen):", repl_counts)
|
||||
|
||||
print("\n--- Fehlende IH ---")
|
||||
print("Anzahl:", len(fehlende_IH))
|
||||
for z, sp in fehlende_IH[:50]:
|
||||
print(f"Zeile {z}: Standpunkt {sp} (IH leer in JXL)")
|
||||
if len(fehlende_IH) > 50:
|
||||
print("... (weitere gekürzt)")
|
||||
|
||||
print("\n--- Fehlende ZH ---")
|
||||
print("Anzahl:", len(fehlende_ZH))
|
||||
for z, sp, zp in fehlende_ZH[:50]:
|
||||
print(f"Zeile {z}: Standpunkt {sp}, Ziel {zp} (ZH nicht ermittelt)")
|
||||
if len(fehlende_ZH) > 50:
|
||||
print("... (weitere gekürzt)")
|
||||
|
||||
print("\n--- Standpunkt in CSV, aber kein StationRecord in JXL ---")
|
||||
print("Anzahl:", len(fehlender_StationRecord))
|
||||
for z, sp in fehlender_StationRecord[:50]:
|
||||
print(f"Zeile {z}: Standpunkt {sp} (nicht in JXL als StationName gefunden)")
|
||||
if len(fehlender_StationRecord) > 50:
|
||||
print("... (weitere gekürzt)")
|
||||
|
||||
Reference in New Issue
Block a user