Hallo,

ist ein wenig außerhalb von Tomedo, betrifft aber dann doch den Praxisablauf:

Einkommende Papierrechnung werden bei uns gescannt, umbenannt (Schema 'Rechungsaussteller' Eingangsrechnung 'Rechnungsnummer' 'Rechnungsdatum' 'Summe'). Danach kommt die Rechnung in ein DMS (devonthink) und geht wird dann über ein separates Prgramm mit der entsprechenden Buchung verknüpft. Die Originalrechnung wird archiviert, falls eine Prüfung kommt.

Dasselbe Vorgehen passiert auch mit .PDF Rechnungen, die noch ohne XML Datei kommen und nicht ZUGFERD kompatibel sind.

Am meisten nervt das Umbennen der Rechung nach dem Scannen.

Ich habe in Python mit Hilfe von ChatGPT ein Watchdog erstellt, ein Verzeichnis überwacht und die dort reingelegten PDFs an openai schickt. Openai analysiert den aus dem PDF extrahierten Text und schickt die Information nach dem Schema zurück. Der Watchdog benennt die gescannte Datei nach dem oben genannten Schema um, kopiert ihn in einen anderen Ordner und löscht das Original.

Diesen Watchdog kann man natürlich für viele andere Funktionen nutzen: zB Arztbriefe an eine lokal laufende LLM schicken und eine Kurzzusammenfassung erstellen lassen. Oder gescannte Medikamentpläne per LLM erkennen lassen und als Textdatei für Tomedo in einen Lauschordner packen.

Das Programm nutzt diverse Bibilotheken: openai, pypdf, PyPDF2. Diese müssen alle entsprechend installiert sein.

 

Man braucht einen openai-Zugang und muss da auch etwas Geld für die API-Nutzung draufpacken. Dann muss man einen openai-Key generieren, den man beim Start des Programms eingeben muss. Die Verzeichnisse können per GUI gewählt werden.

Das ganze ist natürlich ohne Gewähr für irgendwas und defintiv nicht für Patientendaten oder medizinische Ratschläge.

Den Code habe ich überwiegend mi ChatGPT generiert. Das ganze ist ein Bastelprojekt. Ich freue mich über andere, die mitbasteln wollen :-)

@Zollsoft: wenn das zu weit weg von Tomedo und damit OFFTOPICist, dann Bescheid geben. Ich lösche den Beitrag dann :-)

-js
Gefragt vor in Anderes von (6.1k Punkte)
+3 Punkte

1 Antwort

import os
import time
import logging
import json
import datetime
import shutil
import subprocess
import openai


# Minimales Intervall zwischen OpenAI-API-Requests (Sekunden)
MIN_API_CALL_INTERVAL = 2.0
_last_api_call = 0.0

# Slugify: versuche externes, sonst Fallback
try:
    from slugify import slugify as _slugify
    def slugify(value: str, lowercase: bool = False) -> str:
        return _slugify(value, lowercase=lowercase)
except ImportError:
    import re
    def slugify(value: str, lowercase: bool = False) -> str:
        slug = re.sub(r"[^\w\s-]", "", value).strip()
        slug = re.sub(r"[-\s]+", "-", slug)
        return slug.lower() if lowercase else slug

# Datum parsen: dateutil oder Fallback
try:
    from dateutil import parser
    def parse_iso_date(s: str) -> datetime.datetime:
        return parser.parse(s)
except ImportError:
    def parse_iso_date(s: str) -> datetime.datetime:
        for fmt in ("%d.%m.%Y", "%Y-%m-%d"):
            try:
                return datetime.datetime.strptime(s, fmt)
            except Exception:
                pass
        return datetime.datetime.fromisoformat(s)

# PDF Reader: pypdf oder PyPDF2
try:
    from pypdf import PdfReader
except ImportError:
    try:
        from PyPDF2 import PdfReader
    except ImportError:
        PdfReader = None
PDF_AVAILABLE = PdfReader is not None

import tkinter as tk
from tkinter import filedialog, messagebox

# Konfiguration
CONFIG_PATH = os.path.expanduser("~/.invoice_watchdog_config.json")

def load_config() -> dict:
    try:
        with open(CONFIG_PATH, 'r', encoding='utf-8') as f:
            return json.load(f)
    except Exception:
        return {}

def save_config(cfg: dict) -> None:
    try:
        with open(CONFIG_PATH, 'w', encoding='utf-8') as f:
            json.dump(cfg, f, indent=2)
    except Exception as e:
        logging.error(f"Config-Fehler: {e}")

# Globale Variablen
tcfg = load_config()
WATCH_DIRECTORY = tcfg.get('watch_directory')
OUTPUT_DIRECTORY = tcfg.get('output_directory')
OPENAI_API_KEY = tcfg.get('api_key')

# Schon verarbeitete Dateien merken (damit sie nicht mehrfach geschickt werden)
bereits_verarbeitet = set()

def move_to_failed(file_path: str) -> None:
    failed_dir = os.path.join(WATCH_DIRECTORY, "FAILED")
    os.makedirs(failed_dir, exist_ok=True)
    base_name = os.path.basename(file_path)
    target_path = os.path.join(failed_dir, base_name)
    try:
        shutil.move(file_path, target_path)
        logging.info(f"{base_name} wurde in den FAILED-Ordner verschoben.")
    except Exception as e:
        logging.error(f"Fehler beim Verschieben nach FAILED: {e}")


# Prozesse neue Dateien

def process_new_files(seen: set) -> set:
    try:
        entries = os.listdir(WATCH_DIRECTORY)
    except Exception as e:
        logging.error(f"Verzeichnisfehler: {e}")
        return seen

    for fname in set(entries) - seen:
        # Versteckte Dateien ignorieren
        if fname.startswith('.') or fname == '.DS_Store':
            continue

        path = os.path.join(WATCH_DIRECTORY, fname)
        ext = os.path.splitext(fname)[1].lower()
        text = None

        if fname in bereits_verarbeitet:
            continue  # Schon bearbeitet

        if ext == '.txt':
            logging.info(f"TXT erkannt: {fname}")
            try:
                with open(path, 'r', encoding='utf-8') as f:
                    text = f.read()
            except Exception as e:
                logging.error(f"TXT-Lese-Fehler {fname}: {e}")
                move_to_failed(path)
                bereits_verarbeitet.add(fname)
                continue

        elif ext == '.pdf':
            logging.info(f"PDF erkannt: {fname}")
            if PDF_AVAILABLE:
                try:
                    reader = PdfReader(path)
                    pages = [p.extract_text() for p in reader.pages]
                    text = '\n'.join(filter(None, pages))
                except Exception as e:
                    logging.error(f"PDF-Lese-Fehler {fname}: {e}")
                    move_to_failed(path)
                    bereits_verarbeitet.add(fname)
                    continue
            elif shutil.which('pdftotext'):
                try:
                    raw = subprocess.check_output(['pdftotext', path, '-'], stderr=subprocess.DEVNULL)
                    text = raw.decode('utf-8', errors='ignore')
                except Exception as e:
                    logging.error(f"pdftotext-Fehler {fname}: {e}")
                    move_to_failed(path)
                    bereits_verarbeitet.add(fname)
                    continue
            else:
                logging.error(f"Keine PDF-Unterstützung für {fname}")
                move_to_failed(path)
                bereits_verarbeitet.add(fname)
                continue

        else:
            logging.info(f"Überspringe unbekanntes Format: {fname}")
            bereits_verarbeitet.add(fname)
            continue

        # Wenn kein Text gefunden wurde
        if not text or len(text.strip()) == 0:
            logging.error(f"Kein Text in {fname}")
            move_to_failed(path)
            bereits_verarbeitet.add(fname)
            continue

        # Kleine Pause vor OpenAI-Request
        time.sleep(1)

        data = extract_invoice_data(text)
        if data:
            rename_file(path, data)
            bereits_verarbeitet.add(fname)
        else:
            logging.error(f"Extraktion fehlgeschlagen: {fname}")
            move_to_failed(path)
            bereits_verarbeitet.add(fname)

    return seen.union(entries)


# Extrahiert per OpenAI-API mit Rate-Limit
def extract_invoice_data(text: str) -> dict | None:
    global _last_api_call
    if not OPENAI_API_KEY:
        logging.error("Kein OpenAI API-Key gesetzt.")
        return None

    # Rate-Limit: Warte, wenn letzter Call zu kurz her
    now = time.time()
    interval = now - _last_api_call
    if interval < MIN_API_CALL_INTERVAL:
        time.sleep(MIN_API_CALL_INTERVAL - interval)

    openai.api_key = OPENAI_API_KEY
    try:
        response = openai.chat.completions.create(
            model="gpt-3.5-turbo",
            messages=[
                {"role": "system", "content": (
                    "Extrahiere aus folgendem Text eine Rechnung mit Feldern als JSON:\n"
                    "- rechnungsaussteller (string)\n"
                    "- rechnungsnummer (string)\n"
                    "- rechnungsdatum (YYYY-MM-DD)\n"
                    "- betrag (number)"
                )},
                {"role": "user", "content": text}
            ],
            functions=[{
                "name": "extract_invoice_data",
                "description": "Extrahiere Rechnungsdaten als JSON",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "rechnungsaussteller": {"type": "string"},
                        "rechnungsnummer": {"type": "string"},
                        "rechnungsdatum": {"type": "string", "format": "date"},
                        "betrag": {"type": "number"}
                    },
                    "required": ["rechnungsaussteller", "rechnungsnummer", "rechnungsdatum", "betrag"]
                }
            }],
            function_call={"name": "extract_invoice_data"}
        )
        _last_api_call = time.time()

        # Debug-Ausgabe
        print("--- OpenAI Raw Response Start ---")
        print(response)
        print("--- OpenAI Raw Response End ---")

        func_call = response.choices[0].message.function_call

        # HIER WAR DEIN SYNTAXFEHLER:
        args = func_call.arguments if hasattr(func_call, 'arguments') else func_call['arguments']

        return json.loads(args)

    except Exception as e:
        logging.error(f"OpenAI-Fehler: {e}")
        return None

    except Exception as e:
        logging.error(f"OpenAI-Fehler: {e}")
        return None

# Benennt Datei um
def rename_file(old: str, data: dict) -> None:
    dirpath, filename = os.path.split(old)
    ext = os.path.splitext(filename)[1]
    issuer = slugify(data.get("rechnungsaussteller", ""))
    dt_str = data.get("rechnungsdatum", "")
    dt = parse_iso_date(dt_str).strftime("%d.%m.%Y") if dt_str else ""
    inv_num = slugify(data.get("rechnungsnummer", ""))
    amount = data.get("betrag", "")
    new_name = f"{issuer}_ER_{dt}_{inv_num}_{amount}{ext}"
    target = OUTPUT_DIRECTORY or dirpath
    os.makedirs(target, exist_ok=True)
    try:
        os.rename(old, os.path.join(target, new_name))
        logging.info(f"{filename} → {new_name}")
    except Exception as e:
        logging.error(f"Rename-Fehler: {e}")

# GUI-Konfiguration
def create_gui():
    def pick_folder(var, label):
        p = filedialog.askdirectory()
        if p:
            var.set(p)
            label.config(text=p)

    root = tk.Tk()
    root.title("Invoice Watchdog Konfiguration")

    tk.Label(root, text="OpenAI API-Key:").grid(row=0, column=0, sticky="w")
    api_var = tk.StringVar(value=OPENAI_API_KEY or "")
    tk.Entry(root, show="*", textvariable=api_var).grid(row=0, column=1, columnspan=2, sticky="ew")

    tk.Label(root, text="Eingangsordner:").grid(row=1, column=0, sticky="w")
    in_var = tk.StringVar(value=WATCH_DIRECTORY or "")
    in_label = tk.Label(root, textvariable=in_var)
    in_label.grid(row=1, column=1, sticky="ew")
    tk.Button(root, text="Auswählen", command=lambda: pick_folder(in_var, in_label)).grid(row=1, column=2)

    tk.Label(root, text="Ausgabeordner:").grid(row=2, column=0, sticky="w")
    out_var = tk.StringVar(value=OUTPUT_DIRECTORY or "")
    out_label = tk.Label(root, textvariable=out_var)
    out_label.grid(row=2, column=1, sticky="ew")
    tk.Button(root, text="Auswählen", command=lambda: pick_folder(out_var, out_label)).grid(row=2, column=2)

    def start():
        key = api_var.get().strip()
        watch = in_var.get().strip()
        outp = out_var.get().strip()
        if not key or not watch or not outp:
            messagebox.showwarning("Fehler", "Alle Felder müssen ausgefüllt sein!")
            return
        global OPENAI_API_KEY, WATCH_DIRECTORY, OUTPUT_DIRECTORY
        OPENAI_API_KEY = key
        WATCH_DIRECTORY = watch
        OUTPUT_DIRECTORY = outp
        openai.api_key = OPENAI_API_KEY
        save_config({'api_key': key, 'watch_directory': watch, 'output_directory': outp})
        root.quit()

    tk.Button(root, text="Starten", command=start).grid(row=3, column=0, columnspan=3)
    root.columnconfigure(1, weight=1)
    root.mainloop()
    root.destroy()

if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s: %(message)s")
    create_gui()
    if OPENAI_API_KEY and WATCH_DIRECTORY and OUTPUT_DIRECTORY:
        seen = set()
        logging.info(f"Starte Watchdog für: {WATCH_DIRECTORY}")
        try:
            while True:
                seen = process_new_files(seen)
                time.sleep(2)
        except KeyboardInterrupt:
            logging.info("Watchdog gestoppt.")
    else:
        logging.error("Konfiguration unvollständig. Bitte starten Sie das Programm neu.")

 

Beantwortet vor von (6.1k Punkte)
+1 Punkt
Das sieht sehr komplex aus. Würde mich im Realbetrieb interessieren.
Sehr beeindruckend!

Lieben Dank fürs teilen
mega!
19,700 Beiträge
28,146 Antworten
50,835 Kommentare
33,389 Nutzer