import os
import time
import logging
import json
import datetime
import shutil
import subprocess
import openai
# Minimum interval between OpenAI API requests (seconds)
MIN_API_CALL_INTERVAL = 2.0
# time.time() value recorded by extract_invoice_data for its last request;
# 0.0 until the first request has been made
_last_api_call = 0.0
# Slugify: prefer the external `slugify` package, otherwise use a regex fallback
try:
    from slugify import slugify as _slugify
except ImportError:
    import re

    def slugify(value: str, lowercase: bool = False) -> str:
        """Return *value* reduced to word characters joined by single hyphens.

        Everything except word characters, whitespace and hyphens is removed,
        then each run of whitespace/hyphens becomes one hyphen. The result is
        lower-cased only when *lowercase* is true.
        """
        cleaned = re.sub(r"[^\w\s-]", "", value).strip()
        hyphenated = re.sub(r"[-\s]+", "-", cleaned)
        if lowercase:
            return hyphenated.lower()
        return hyphenated
else:
    def slugify(value: str, lowercase: bool = False) -> str:
        """Delegate to the external slugify, forwarding the *lowercase* flag."""
        return _slugify(value, lowercase=lowercase)
# Date parsing: explicit formats first, then dateutil (if installed) or fromisoformat
try:
    from dateutil import parser
except ImportError:
    parser = None

# Formats that must parse identically with and without dateutil.
_KNOWN_DATE_FORMATS = ("%d.%m.%Y", "%Y-%m-%d")


def parse_iso_date(s: str) -> datetime.datetime:
    """Parse a date string into a ``datetime``.

    The German ``DD.MM.YYYY`` and the ISO ``YYYY-MM-DD`` forms are tried
    first with ``strptime``. Doing this before consulting dateutil keeps the
    result independent of whether dateutil is installed: dateutil's default
    reads ``01.02.2024`` month-first, whereas the strptime format reads it
    day-first.

    Args:
        s: The date string to parse.

    Returns:
        The parsed ``datetime`` (midnight for date-only input).

    Raises:
        ValueError: If no parser accepts the string.
    """
    for fmt in _KNOWN_DATE_FORMATS:
        try:
            return datetime.datetime.strptime(s, fmt)
        except ValueError:
            continue
    # Anything else (e.g. timestamps with a time part) goes to the lenient parser.
    if parser is not None:
        return parser.parse(s)
    return datetime.datetime.fromisoformat(s)
# PDF reader: pypdf if importable, else PyPDF2, else no reader at all
PdfReader = None
try:
    from pypdf import PdfReader
except ImportError:
    try:
        from PyPDF2 import PdfReader
    except ImportError:
        # Neither library is installed; PdfReader keeps its None default.
        pass
# True when one of the two PDF libraries could be imported
PDF_AVAILABLE = PdfReader is not None
import tkinter as tk
from tkinter import filedialog, messagebox
# Configuration: path of the JSON settings file in the user's home directory
CONFIG_PATH = os.path.expanduser("~/.invoice_watchdog_config.json")
def load_config() -> dict:
try:
with open(CONFIG_PATH, 'r', encoding='utf-8') as f:
return json.load(f)
except Exception:
return {}
def save_config(cfg: dict) -> None:
try:
with open(CONFIG_PATH, 'w', encoding='utf-8') as f:
json.dump(cfg, f, indent=2)
except Exception as e:
logging.error(f"Config-Fehler: {e}")
# Global settings, read once at import time from the saved configuration;
# each value is None when its key is absent from the config
tcfg = load_config()
WATCH_DIRECTORY = tcfg.get('watch_directory')
OUTPUT_DIRECTORY = tcfg.get('output_directory')
OPENAI_API_KEY = tcfg.get('api_key')
# Names of files that were already handled (so they are not submitted again)
bereits_verarbeitet: set[str] = set()
def move_to_failed(file_path: str) -> None:
    """Move a file into the ``FAILED`` sub-folder of ``WATCH_DIRECTORY``.

    The folder is created on demand. If a file with the same name is already
    there, a numeric suffix (``name_1.ext``, ``name_2.ext``, ...) is added so
    the earlier file is not overwritten. Filesystem errors are logged, not
    raised.

    Args:
        file_path: Path of the file to move.
    """
    failed_dir = os.path.join(WATCH_DIRECTORY, "FAILED")
    base_name = os.path.basename(file_path)
    stem, ext = os.path.splitext(base_name)
    try:
        os.makedirs(failed_dir, exist_ok=True)
        target_path = os.path.join(failed_dir, base_name)
        counter = 1
        while os.path.exists(target_path):
            target_path = os.path.join(failed_dir, f"{stem}_{counter}{ext}")
            counter += 1
        shutil.move(file_path, target_path)
        logging.info(f"{base_name} wurde in den FAILED-Ordner verschoben.")
    except OSError as e:
        # shutil.Error is a subclass of OSError, so it is covered as well.
        logging.error(f"Fehler beim Verschieben nach FAILED: {e}")
# Process new files
def _mark_failed(path: str, fname: str, message: str) -> None:
    """Log *message* as an error, move the file to FAILED and remember its name."""
    logging.error(message)
    move_to_failed(path)
    bereits_verarbeitet.add(fname)


def _read_text(path: str, fname: str) -> str | None:
    """Return the text of a .txt or .pdf file.

    Returns None when the file has already been dealt with here: either it
    was marked as failed (read error, no PDF backend) or it has an unknown
    extension and was only remembered as handled.
    """
    ext = os.path.splitext(fname)[1].lower()
    if ext == '.txt':
        logging.info(f"TXT erkannt: {fname}")
        try:
            with open(path, 'r', encoding='utf-8') as f:
                return f.read()
        except (OSError, UnicodeDecodeError) as e:
            _mark_failed(path, fname, f"TXT-Lese-Fehler {fname}: {e}")
            return None
    if ext == '.pdf':
        logging.info(f"PDF erkannt: {fname}")
        if PDF_AVAILABLE:
            try:
                reader = PdfReader(path)
                pages = [p.extract_text() for p in reader.pages]
                # Pages without extractable text yield None/'' and are dropped.
                return '\n'.join(filter(None, pages))
            except Exception as e:
                # Kept broad on purpose: the exception types raised by the
                # PDF library for damaged files are not known here.
                _mark_failed(path, fname, f"PDF-Lese-Fehler {fname}: {e}")
                return None
        if shutil.which('pdftotext'):
            try:
                raw = subprocess.check_output(['pdftotext', path, '-'], stderr=subprocess.DEVNULL)
                return raw.decode('utf-8', errors='ignore')
            except (OSError, subprocess.SubprocessError) as e:
                _mark_failed(path, fname, f"pdftotext-Fehler {fname}: {e}")
                return None
        _mark_failed(path, fname, f"Keine PDF-Unterstützung für {fname}")
        return None
    logging.info(f"Überspringe unbekanntes Format: {fname}")
    bereits_verarbeitet.add(fname)
    return None


def process_new_files(seen: set) -> set:
    """Process every file in ``WATCH_DIRECTORY`` that is not in *seen*.

    Text is read from .txt and .pdf files, sent to ``extract_invoice_data``
    and, on success, the file is handed to ``rename_file``. Files that cannot
    be read, contain no text, or cannot be extracted are moved to FAILED.
    Hidden entries, directories and unknown extensions are skipped.

    Args:
        seen: Entry names returned by the previous call (empty on first call).

    Returns:
        The names currently present in the directory, to be passed in as
        *seen* next time. Names that have left the directory are forgotten, so
        a later file that reuses the name of a processed one is picked up
        again. If the directory cannot be listed, *seen* is returned unchanged.
    """
    try:
        entries = os.listdir(WATCH_DIRECTORY)
    except OSError as e:
        logging.error(f"Verzeichnisfehler: {e}")
        return seen
    present = set(entries)
    # Forget handled names whose files are gone; otherwise a new file with the
    # same name would be ignored forever.
    bereits_verarbeitet.intersection_update(present)
    # sorted() only makes the processing order deterministic.
    for fname in sorted(present - seen):
        # Ignore hidden files (this also covers .DS_Store) and handled names.
        if fname.startswith('.') or fname in bereits_verarbeitet:
            continue
        path = os.path.join(WATCH_DIRECTORY, fname)
        # Sub-folders such as FAILED are not invoices.
        if not os.path.isfile(path):
            continue
        text = _read_text(path, fname)
        if text is None:
            continue
        if not text.strip():
            _mark_failed(path, fname, f"Kein Text in {fname}")
            continue
        # Short pause before the OpenAI request
        time.sleep(1)
        data = extract_invoice_data(text)
        if data:
            rename_file(path, data)
            bereits_verarbeitet.add(fname)
        else:
            _mark_failed(path, fname, f"Extraktion fehlgeschlagen: {fname}")
    return present
# Extract invoice fields via the OpenAI API, honouring the rate limit
def extract_invoice_data(text: str) -> dict | None:
    """Ask the OpenAI chat API to extract invoice fields from *text*.

    The call is delayed so that at least ``MIN_API_CALL_INTERVAL`` seconds lie
    between the end of the previous request and this one. The timestamp is
    updated after failed requests too, so errors do not bypass the limit.

    Args:
        text: Plain text of the invoice.

    Returns:
        The decoded arguments of the forced ``extract_invoice_data`` function
        call as a dict, or None when no API key is set, the request fails, or
        the response carries no usable JSON object. Errors are logged.
    """
    global _last_api_call
    if not OPENAI_API_KEY:
        logging.error("Kein OpenAI API-Key gesetzt.")
        return None
    # Rate limit: wait if the previous call was too recent.
    elapsed = time.time() - _last_api_call
    if elapsed < MIN_API_CALL_INTERVAL:
        time.sleep(MIN_API_CALL_INTERVAL - elapsed)
    openai.api_key = OPENAI_API_KEY
    try:
        try:
            response = openai.chat.completions.create(
                model="gpt-3.5-turbo",
                messages=[
                    {"role": "system", "content": (
                        "Extrahiere aus folgendem Text eine Rechnung mit Feldern als JSON:\n"
                        "- rechnungsaussteller (string)\n"
                        "- rechnungsnummer (string)\n"
                        "- rechnungsdatum (YYYY-MM-DD)\n"
                        "- betrag (number)"
                    )},
                    {"role": "user", "content": text}
                ],
                functions=[{
                    "name": "extract_invoice_data",
                    "description": "Extrahiere Rechnungsdaten als JSON",
                    "parameters": {
                        "type": "object",
                        "properties": {
                            "rechnungsaussteller": {"type": "string"},
                            "rechnungsnummer": {"type": "string"},
                            "rechnungsdatum": {"type": "string", "format": "date"},
                            "betrag": {"type": "number"}
                        },
                        "required": ["rechnungsaussteller", "rechnungsnummer", "rechnungsdatum", "betrag"]
                    }
                }],
                function_call={"name": "extract_invoice_data"}
            )
        finally:
            # Record the attempt even when it raised.
            _last_api_call = time.time()
        # The raw response contains invoice content: debug log, not stdout.
        logging.debug("OpenAI raw response: %s", response)
        func_call = response.choices[0].message.function_call
        if func_call is None:
            logging.error("OpenAI-Fehler: Antwort enthält keinen function_call.")
            return None
        # Accept both attribute-style and mapping-style function_call objects.
        args = func_call.arguments if hasattr(func_call, 'arguments') else func_call['arguments']
        data = json.loads(args)
    except Exception as e:
        # Boundary to a third-party client: log any failure, keep the loop alive.
        logging.error(f"OpenAI-Fehler: {e}")
        return None
    if not isinstance(data, dict):
        logging.error("OpenAI-Fehler: Antwort ist kein JSON-Objekt.")
        return None
    return data
# Rename (and move) a file according to the extracted invoice data
def rename_file(old: str, data: dict) -> None:
    """Move *old* to ``<issuer>_ER_<DD.MM.YYYY>_<number>_<amount><ext>``.

    The file goes to ``OUTPUT_DIRECTORY`` if set, otherwise it stays in its
    own directory. An existing file at the destination is not overwritten; a
    numeric suffix is appended instead. Any failure, including an unparsable
    date, is logged and the file is left where it is.

    Args:
        old: Current path of the file.
        data: Extracted fields; reads the keys ``rechnungsaussteller``,
            ``rechnungsnummer``, ``rechnungsdatum`` and ``betrag``. Missing or
            None values become empty name parts.
    """
    dirpath, filename = os.path.split(old)
    ext = os.path.splitext(filename)[1]

    def field(key: str) -> str:
        # Missing or None values must not reach slugify()/the date parser.
        value = data.get(key)
        return "" if value is None else str(value)

    try:
        issuer = slugify(field("rechnungsaussteller"))
        dt_str = field("rechnungsdatum")
        dt = parse_iso_date(dt_str).strftime("%d.%m.%Y") if dt_str else ""
        inv_num = slugify(field("rechnungsnummer"))
        # The amount is not slugified (that would drop the decimal point), so
        # strip path separators explicitly.
        amount = field("betrag").replace("/", "-").replace("\\", "-")
        stem = f"{issuer}_ER_{dt}_{inv_num}_{amount}"
        target = OUTPUT_DIRECTORY or dirpath or "."
        os.makedirs(target, exist_ok=True)
        destination = os.path.join(target, stem + ext)
        if os.path.abspath(destination) != os.path.abspath(old):
            counter = 1
            while os.path.exists(destination):
                destination = os.path.join(target, f"{stem}_{counter}{ext}")
                counter += 1
            # shutil.move also works when the target is on another filesystem,
            # where os.rename fails.
            shutil.move(old, destination)
        logging.info(f"{filename} → {os.path.basename(destination)}")
    except (OSError, ValueError, OverflowError) as e:
        logging.error(f"Rename-Fehler: {e}")
# GUI configuration
def create_gui():
    """Show the configuration dialog and block until it is closed.

    The dialog asks for the OpenAI API key, the input folder and the output
    folder, pre-filled from the current globals. "Starten" validates that all
    three are set, stores them in the module globals, saves them via
    ``save_config`` and closes the dialog. Closing the window through the
    window manager leaves the globals unchanged.
    """
    def pick_folder(var, label):
        # Let the user choose a directory; keep the old value on cancel.
        chosen = filedialog.askdirectory()
        if chosen:
            var.set(chosen)
            label.config(text=chosen)

    root = tk.Tk()
    root.title("Invoice Watchdog Konfiguration")
    # Route the window-manager close button through quit(), so the root still
    # exists for the single destroy() below. With the default handler the
    # window is destroyed first and the later destroy() raises TclError.
    root.protocol("WM_DELETE_WINDOW", root.quit)

    tk.Label(root, text="OpenAI API-Key:").grid(row=0, column=0, sticky="w")
    api_var = tk.StringVar(value=OPENAI_API_KEY or "")
    tk.Entry(root, show="*", textvariable=api_var).grid(row=0, column=1, columnspan=2, sticky="ew")

    tk.Label(root, text="Eingangsordner:").grid(row=1, column=0, sticky="w")
    in_var = tk.StringVar(value=WATCH_DIRECTORY or "")
    in_label = tk.Label(root, textvariable=in_var)
    in_label.grid(row=1, column=1, sticky="ew")
    tk.Button(root, text="Auswählen", command=lambda: pick_folder(in_var, in_label)).grid(row=1, column=2)

    tk.Label(root, text="Ausgabeordner:").grid(row=2, column=0, sticky="w")
    out_var = tk.StringVar(value=OUTPUT_DIRECTORY or "")
    out_label = tk.Label(root, textvariable=out_var)
    out_label.grid(row=2, column=1, sticky="ew")
    tk.Button(root, text="Auswählen", command=lambda: pick_folder(out_var, out_label)).grid(row=2, column=2)

    def start():
        # Validate the form, publish the settings and leave the main loop.
        global OPENAI_API_KEY, WATCH_DIRECTORY, OUTPUT_DIRECTORY
        key = api_var.get().strip()
        watch = in_var.get().strip()
        outp = out_var.get().strip()
        if not key or not watch or not outp:
            messagebox.showwarning("Fehler", "Alle Felder müssen ausgefüllt sein!")
            return
        OPENAI_API_KEY = key
        WATCH_DIRECTORY = watch
        OUTPUT_DIRECTORY = outp
        openai.api_key = OPENAI_API_KEY
        save_config({'api_key': key, 'watch_directory': watch, 'output_directory': outp})
        root.quit()

    tk.Button(root, text="Starten", command=start).grid(row=3, column=0, columnspan=3)
    root.columnconfigure(1, weight=1)
    root.mainloop()
    root.destroy()
if __name__ == "__main__":
    # Script entry point: configure logging, show the dialog, then poll.
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s %(levelname)s: %(message)s",
    )
    create_gui()
    config_complete = bool(OPENAI_API_KEY and WATCH_DIRECTORY and OUTPUT_DIRECTORY)
    if not config_complete:
        logging.error("Konfiguration unvollständig. Bitte starten Sie das Programm neu.")
    else:
        logging.info(f"Starte Watchdog für: {WATCH_DIRECTORY}")
        seen = set()
        try:
            # Poll the watch directory every two seconds until Ctrl+C.
            while True:
                seen = process_new_files(seen)
                time.sleep(2)
        except KeyboardInterrupt:
            logging.info("Watchdog gestoppt.")