about summary refs log tree commit diff stats
path: root/archive/2025/summer/bsc_gerg/src/knowledge/llm
diff options
context:
space:
mode:
Diffstat (limited to 'archive/2025/summer/bsc_gerg/src/knowledge/llm')
-rw-r--r--archive/2025/summer/bsc_gerg/src/knowledge/llm/__init__.py0
-rw-r--r--archive/2025/summer/bsc_gerg/src/knowledge/llm/definition/__init__.py0
-rw-r--r--archive/2025/summer/bsc_gerg/src/knowledge/llm/definition/combiner.py79
-rw-r--r--archive/2025/summer/bsc_gerg/src/knowledge/llm/definition/generator.py97
-rw-r--r--archive/2025/summer/bsc_gerg/src/knowledge/llm/definition/unified_generator.py129
-rw-r--r--archive/2025/summer/bsc_gerg/src/knowledge/llm/extract.py71
-rw-r--r--archive/2025/summer/bsc_gerg/src/knowledge/llm/lemmatize.py46
7 files changed, 422 insertions, 0 deletions
diff --git a/archive/2025/summer/bsc_gerg/src/knowledge/llm/__init__.py b/archive/2025/summer/bsc_gerg/src/knowledge/llm/__init__.py
new file mode 100644
index 000000000..e69de29bb
--- /dev/null
+++ b/archive/2025/summer/bsc_gerg/src/knowledge/llm/__init__.py
diff --git a/archive/2025/summer/bsc_gerg/src/knowledge/llm/definition/__init__.py b/archive/2025/summer/bsc_gerg/src/knowledge/llm/definition/__init__.py
new file mode 100644
index 000000000..e69de29bb
--- /dev/null
+++ b/archive/2025/summer/bsc_gerg/src/knowledge/llm/definition/__init__.py
diff --git a/archive/2025/summer/bsc_gerg/src/knowledge/llm/definition/combiner.py b/archive/2025/summer/bsc_gerg/src/knowledge/llm/definition/combiner.py
new file mode 100644
index 000000000..efe81b5be
--- /dev/null
+++ b/archive/2025/summer/bsc_gerg/src/knowledge/llm/definition/combiner.py
@@ -0,0 +1,79 @@
+import asyncio
+from typing import Annotated, AsyncIterable
+from uuid import UUID
+
+from pydantic import Field
+
+from src.logger import simple_custom_logger
+from src.terminology.event import Event, CombinedDefinitionGenerated, PartialDefinitionGenerated
+from src.terminology.models import Definition
+from src.terminology.terminology import DefinitionCombiner
+
logger = simple_custom_logger("COMBINER")

# Prompt asking the LLM whether a text is a (specific enough) definition of
# %term%; the model is instructed to end its answer with TRUE or FALSE,
# which the combiner later checks with `.endswith("TRUE")`.
RELEVANCE_USER_PROMPT = """Ist der folgende Text eine Definition für den Begriff \"%term%\"? Wenn die Definition spezifisch genug ist, beende deine Folgerung mit TRUE, ansonsten mit FALSE.

%definition%"""

# Prompts instructing the LLM to merge the definitions given in %definitions%
# into one combined definition of %term%, general-to-specific, using only the
# provided texts.
COMBINE_SYSTEM_PROMPT = """Nutze nur das gegebene Wissen aus den Anfragen."""
COMBINE_USER_PROMPT="""Erstelle eine kombinierte Definition für \"%term%\" anhand der folgenden Definitionen. Starte mit allgemeinen Informationen und werde dann spezifischer. Verwende nur die Informationen aus den unten stehenden Texten.

%definitions%"""
+
class LLMDefinitionCombiner(DefinitionCombiner):
    """Combines a term's partial definitions into one definition via an LLM.

    Reacts to PartialDefinitionGenerated events. Once at least
    MIN_PARTIAL_DEFINITIONS partial definitions exist (and no verified
    definition does), each definition is checked for relevance and the
    relevant ones are merged into a combined Definition.
    """

    # Minimum number of partial definitions required before combining.
    MIN_PARTIAL_DEFINITIONS: int = 3

    # Per-term locks so only one combination runs per term at a time.
    locks: Annotated[dict[UUID, asyncio.Lock], Field(default_factory=lambda: {})]
    lock: asyncio.Lock = asyncio.Lock()

    async def get_llm_response_relevance(self, term: str, definition: str) -> str:
        """LLM hook: judge whether `definition` defines `term`.

        The response is expected to end with TRUE or FALSE.
        """
        pass

    async def get_llm_response_combine(self, term: str, definitions: str) -> str:
        """LLM hook: merge the given definition texts into one definition."""
        pass

    async def activate(self, event: PartialDefinitionGenerated) -> AsyncIterable[Event]:
        """Combine the term's partial definitions once enough accumulated.

        Since definitions for a term can be generated concurrently, definitions
        might get combined multiple times. For now, only one definition can be
        combined at once per term.
        FIXME: Improve locking (lock per term?)
        """
        # FIX: setdefault instead of check-then-set on the shared locks dict.
        term_lock = self.locks.setdefault(event.term.id, asyncio.Lock())
        async with term_lock:
            logger.info(f"Locking {event.term.normalized_or_text()} definition combiner {id(event)}")
            # FIX: any(...) instead of next((...), None) is not None.
            has_verified_definition = any(definition.verified for definition in event.term.definitions)

            partial_definitions = [definition for definition in event.term.definitions if definition.is_partial()]

            if not has_verified_definition and len(partial_definitions) >= self.MIN_PARTIAL_DEFINITIONS:
                # Drop any previously combined definition before re-combining.
                event.term.definitions = [definition for definition in event.term.definitions if not definition.is_combined()]

                tasks = []
                async with asyncio.TaskGroup() as tg:
                    for definition in event.term.definitions:
                        task = tg.create_task(self.get_llm_response_relevance(event.term.normalized_or_text(), definition.text))
                        tasks.append((definition, task))

                # Keep only definitions the LLM judged relevant (answer ends with TRUE).
                relevant_definitions = [definition for definition, task in tasks if task.result().endswith("TRUE")]
                logger.debug(f"Relevant definitions: {relevant_definitions}")
                relevant_definitions_text = "\n\n".join([definition.text for definition in relevant_definitions])
                response = await self.get_llm_response_combine(event.term.normalized_or_text(), relevant_definitions_text)

                combined_definition = Definition(
                    text=response,
                    verified=False,
                    partial=False,
                    source=relevant_definitions
                )

                event.term.definitions.append(combined_definition)

                yield CombinedDefinitionGenerated(
                    term=event.term,
                    combined_definition=combined_definition,
                    relevant_definitions=relevant_definitions
                )
            logger.info(f"Lock released for {event.term.normalized_or_text()}")
+
+
diff --git a/archive/2025/summer/bsc_gerg/src/knowledge/llm/definition/generator.py b/archive/2025/summer/bsc_gerg/src/knowledge/llm/definition/generator.py
new file mode 100644
index 000000000..1927a3bef
--- /dev/null
+++ b/archive/2025/summer/bsc_gerg/src/knowledge/llm/definition/generator.py
@@ -0,0 +1,97 @@
+import asyncio
+import re
+from typing import AsyncIterable, Annotated
+
+from pydantic import Field
+
+from src.logger import simple_custom_logger
+from src.terminology.event import Event
+from src.terminology.models import Term
+from src.terminology.terminology import Definition, DefinitionGenerator, PartialDefinitionGenerated, OccurrenceResolved
+
# System/developer prompt: create a short, precise definition of a term from
# the supplied text excerpts only; the model must answer "ERROR" when the
# context is insufficient, too general, vague, or not domain-specific.
DEVELOPER_PROMPT = """
Erstelle eine Definition für einen Begriff anhand von gegebenen Textausschnitten.
Bleibe präzise und kurz. Nutze nur die Informationen aus dem gegebenen Text. Nutze kein gelerntes Wissen aus deinen Trainingsdaten!
Wenn nicht genug Information vorhanden ist oder die Definition zu generell, vage oder nicht fachspezifisch ist, gebe "ERROR" aus.
"""

logger = simple_custom_logger("DEFGEN")
+
class LLMDefinitionGenerator(DefinitionGenerator):
    """Generates partial definitions for a term from excerpts of a source
    text around the term's occurrences."""

    # Characters of context kept before/after each match.
    WINDOW_START: int = 200
    WINDOW_END: int = 300
    # Excerpts overlapping by more than this are merged, up to MAX_LENGTH chars.
    MIN_OVERLAP: int = 100
    MAX_LENGTH: int = 1000

    # FIX: was annotated `int` but holds a float value.
    CERTAINTY_THRESHOLD: float = 0.05

    # Tracks which terms were already processed per source id so a
    # (source, term) pair is handled at most once.
    known_sources: Annotated[dict[str, list[Term]], Field(default_factory=dict[str, list[Term]])]

    async def generate_definition_from_source(self, term: str, context: str) -> str | None:
        """LLM hook: produce a definition of `term` from `context`, or None
        when the context is insufficient."""
        pass

    def get_matches(self, term: str, text: str):
        """Return context excerpts around each occurrence of `term` in `text`.

        Overlapping windows are merged while the merged excerpt stays within
        MAX_LENGTH. Falls back to the whole text when the term does not occur.
        """
        # FIX: escape the term so regex metacharacters in it cannot break
        # (or silently change) the search.
        pattern = re.escape(term)
        matches = list(re.finditer(pattern, text, re.IGNORECASE))
        if not matches:
            return [text]

        excerpts = []
        last_start = 0
        last_end = 0
        for match in matches:
            start = max(0, match.start() - self.WINDOW_START)
            end = min(len(text), match.end() + self.WINDOW_END)

            overlap = last_end - start
            length = end - last_start
            if overlap > self.MIN_OVERLAP and length <= self.MAX_LENGTH:
                # Merge this window into the previous excerpt.
                if not excerpts:
                    excerpts.append(text[start:end])
                else:
                    excerpts[-1] = text[last_start:end]
                last_end = end
            else:
                last_start = start
                last_end = end
                excerpts.append(text[start:end])
        return excerpts

    async def activate(self, event: OccurrenceResolved) -> AsyncIterable[Event]:
        """Generate partial definitions for the event's term from its source
        and emit a PartialDefinitionGenerated event per usable result."""
        source_key = str(event.source.id)
        if source_key not in self.known_sources:
            self.known_sources[source_key] = list()

        if event.term in self.known_sources[source_key]:
            return

        self.known_sources[source_key].append(event.term)

        tasks = []
        async with asyncio.TaskGroup() as tg:
            # get_matches always returns at least one excerpt (the whole text
            # as fallback), so no empty-list branch is needed here.
            # FIX: the old `else` branch was unreachable and, had it run,
            # would have discarded its task's result.
            matches = self.get_matches(term=event.term.normalized_or_text(), text=event.source.text)
            for match in matches:
                task = tg.create_task(self.generate_definition_from_source(event.term.normalized_or_text(), match))
                tasks.append(task)

        for task in asyncio.as_completed(tasks):
            result = await task
            if result is not None:
                definition = Definition(
                    text=result,
                    verified=False,
                    partial=True,
                    source=event.source
                )
                event.term.definitions.append(definition)
                yield PartialDefinitionGenerated(
                    term=event.term,
                    definition=definition
                )
diff --git a/archive/2025/summer/bsc_gerg/src/knowledge/llm/definition/unified_generator.py b/archive/2025/summer/bsc_gerg/src/knowledge/llm/definition/unified_generator.py
new file mode 100644
index 000000000..4f8c8dd3c
--- /dev/null
+++ b/archive/2025/summer/bsc_gerg/src/knowledge/llm/definition/unified_generator.py
@@ -0,0 +1,129 @@
+import asyncio
+import re
+from typing import Annotated, AsyncIterable
+from uuid import UUID
+
+from pydantic import Field
+
+from src.llm import create_completion_openai
+from src.logger import simple_custom_logger
+from src.terminology.event import OccurrenceResolved, Event
+from src.terminology.terminology import DefinitionGenerator, Blackboard
+
logger = simple_custom_logger("UNIGEN")

# System prompt for the unified generator: define %term% from numbered text
# excerpts only, answer "ERROR" on insufficient context, and cite excerpts
# inline as [<number>].
prompt_introduction = """
Erstelle eine Definition für den Begriff "%term%" anhand von gegebenen Textausschnitten.
Bleibe präzise und kurz. Nutze nur die Informationen aus dem gegebenen Kontext. 
Wenn nicht genug Information vorhanden ist oder zu generell, vage oder nicht fachspezifisch ist, gebe "ERROR" aus.
Füge in die Definition die jeweiligen Referenzen hinzu, indem du die Nummer des Abschnitts verwendest im Format [<nummer>].
""".strip()
+
class OpenAIUnifiedDefinitionGenerator(DefinitionGenerator):
    """Generates one definition per term in a single LLM call, using snippets
    collected from every source the term occurs in."""

    MIN_OCCURRENCES: int = 3

    # Characters of context kept before/after each occurrence of the term.
    WINDOW_START: int = 100
    WINDOW_END: int = 200

    # Per-term locks so only one generation runs per term at a time.
    locks: Annotated[dict[UUID, asyncio.Lock], Field(default_factory=lambda: {})]

    # TODO: See Prompt Engineering for LLMs -> Elastic Snippets
    async def activate(self, event: OccurrenceResolved) -> AsyncIterable[Event]:
        """Collect snippets around all occurrences of the event's term and ask
        the LLM for a unified, referenced definition."""
        # FIX: setdefault instead of check-then-set on the shared locks dict.
        term_lock = self.locks.setdefault(event.term.id, asyncio.Lock())

        async with term_lock:
            logger.info(f"Locking {event.term.normalized_or_text()} for unified definition generator")

            term = event.term.normalized_or_text()
            # FIX: escape the term so regex metacharacters cannot break the search.
            pattern = re.escape(term)
            context = []
            for source_id in event.term.occurrences:
                source = self.blackboard.get_text_source(id=source_id)
                # FIXME: Create elastic snippet? -> dynamic window length? -> differences in quality?

                # Find all occurrences of the term and merge overlapping windows
                # into contiguous snippets.
                snippets = []
                current_start = 0
                current_end = 0
                snippet = ""

                for match in re.finditer(pattern, source.text, re.IGNORECASE):
                    start = max(0, match.start() - self.WINDOW_START)
                    end = match.end() + self.WINDOW_END
                    if start < current_end:
                        # Window overlaps the current snippet -> extend it.
                        current_end = end
                    else:
                        # Window is disjoint -> finish current snippet, start a new one.
                        if snippet != "":
                            snippets.append(snippet)
                        current_start = start
                        current_end = end
                    snippet = source.text[current_start:current_end]
                if snippet != "":
                    snippets.append(snippet)

                context += snippets

            # FIX: report the total count, not just the last source's snippets.
            logger.debug(f"Found {len(context)} snippets for {term}")

            # TODO: think about: how can cost be reduced? How can I decide if a text is relevant to a term?

            # FIX: loop variable renamed so it no longer shadows the `context` list.
            messages = [
                ("system", f"{prompt_introduction.replace('%term%', term)}"),
                ("user", f"Hier sind einige Textausschnitte, die du verwenden kannst. Beziehe dich bei der Generation nur auf Wissen aus den Textstellen!"),
                ("user", "\n\n".join([f"[{index}] {snippet}" for index, snippet in enumerate(context)])),
                ("user", f"Definiere den Begriff \"{term}\". Beziehe dich nur auf die Textabschnitte!"),
            ]

            result = await create_completion_openai(
                messages=messages,
                model="o4-mini"
            )

            # FIX: leftover debug print replaced with the module logger.
            logger.info(result)

        # Placeholder: the result is not yet wrapped into an event, so this
        # yields a single None like the original implementation.
        yield
+
+
if __name__ == "__main__":
    # Manual smoke test: load three handbook texts, register one term that
    # occurs in all of them, and run the unified generator once.
    blackboard = Blackboard(
        terms=[],
        sources=[]
    )

    term = blackboard.add_term("Fahrdienstleiter")

    # FIX: one loop with an explicit encoding instead of three copy-pasted
    # open/read blocks (the data files are German text; assumed UTF-8 —
    # previously the platform default encoding was used implicitly).
    data_paths = [
        "./../../../data/Handbuch-40820-data_11-15-1.txt",
        "./../../../data/Handbuch-40820-data_11-15-5.txt",
        "./../../../data/Handbuch-40820-data.txt",
    ]
    sources = []
    for path in data_paths:
        with open(path, "r", encoding="utf-8") as f:
            sources.append(blackboard.add_text_source(text=f.read()))

    term.occurrences = [source.id for source in sources]

    generator = OpenAIUnifiedDefinitionGenerator(blackboard=blackboard)

    async def test():
        # sources[-1] corresponds to the former `source3`.
        async for event in generator.activate(OccurrenceResolved(term=term, source=sources[-1])):
            pass

    asyncio.run(test())
diff --git a/archive/2025/summer/bsc_gerg/src/knowledge/llm/extract.py b/archive/2025/summer/bsc_gerg/src/knowledge/llm/extract.py
new file mode 100644
index 000000000..682b100df
--- /dev/null
+++ b/archive/2025/summer/bsc_gerg/src/knowledge/llm/extract.py
@@ -0,0 +1,71 @@
+import re
+from typing import AsyncIterable
+
+from src.terminology.event import TextExtracted, Event, TermExtracted, OccurrenceResolved
+from src.terminology.terminology import TermExtractor
+
# System/developer prompt: extract railway-specific terms, abbreviations and
# phrases from a text, keep abbreviations verbatim (long form appended in
# parentheses when present), and return them as a bullet list ordered by
# likely relevance.
DEVELOPER_PROMPT: str = """
Du bist Experte für Terminologie und Fachbegriffe. 
Deine Aufgabe besteht darin, aus einem Text Begriffe, Abkürzungen und Phrasen zu extrahieren. 
Du extrahierst nur Terminologie, die wahrscheinlich in der Eisenbahn verwendet wird.
Du erkennst Abkürzungen und behällst sie unverändert bei. Nur wenn die vollständige Form vorhanden ist, fügst du sie in Klammern am Ende des Begriffs an.
Du extrahierst Phrasen und Wörter sowie verschachtelte Begriffe und deren Einzelteile.
Achte bei längeren Phrasen darauf, ob aus dem Text klar wird, dass es sich um einen besonderen Begriff handelt, der Wahrscheinlich verwendet wird.
Beginne mit den Begriffen, die am wahrscheinlichsten relevant sind.
Gib nur eine Liste von Begriffen zurück. Extrahiere nur Begriffe, die besonders für den Kontext "Eisenbahn" sind!
"""

# One-shot example input for the extractor.
EXAMPLE_USER: str = """
Input:
Du musst das Hauptsignal auf Fahrt stellen.
"""

# Matching example output: the expected bullet list of extracted terms,
# including nested phrases and their parts.
OUTPUT_ASSISTANT: str = """
Output:
- Hauptsignal auf Fahrt stellen
- Hauptsignal
- auf Fahrt stellen
- Fahrtstellung eines Hauptsignals
"""
+
class LLMTermExtractor(TermExtractor):
    """Extracts terminology from a text via an LLM and registers the terms,
    their abbreviations and their occurrences on the blackboard."""

    async def get_llm_response(self, text: str) -> str:
        """LLM hook: return a bullet list ("- term" / "* term") of terms."""
        pass

    async def activate(self, event: TextExtracted) -> AsyncIterable[Event]:
        """Extract terms from the event's text; emit TermExtracted and
        OccurrenceResolved events for each term."""
        source = self.blackboard.add_text_source(event.text)
        response = await self.get_llm_response(event.text)

        # Keep only bullet lines; strip the marker and surrounding whitespace.
        # FIX: the old `candidate[2:]` slicing dropped the term's first
        # character whenever the bullet was not followed by a space.
        terms = []
        for candidate in response.split("\n"):
            stripped = candidate.strip()
            if stripped.startswith(("-", "*")):
                terms.append(stripped[1:].strip())

        for term in terms:
            # A trailing "(...)" holds either the abbreviation or the long form.
            variation_match = re.search(r"\(.+\)$", term)
            abbreviation = None

            if variation_match:
                variation = (variation_match.group(0)
                                .replace("(", "")
                                .replace(")", "")
                                .strip())
                term = term.replace(variation_match.group(0), "").strip()
                # The longer of the two is treated as the term, the shorter as
                # its abbreviation.
                if len(variation) > len(term):
                    abbreviation = term
                    term = variation
                else:
                    abbreviation = variation

            t = self.blackboard.find_term(term_str=term)

            if t is None:
                t = self.blackboard.add_term(term=term)

            # FIX: `abbreviation not in` instead of `not abbreviation in`.
            if abbreviation and abbreviation not in t.variations:
                t.variations.append(abbreviation)

            t.occurrences.append(source.id)
            yield TermExtracted(term=t)
            yield OccurrenceResolved(term=t, source=source)
diff --git a/archive/2025/summer/bsc_gerg/src/knowledge/llm/lemmatize.py b/archive/2025/summer/bsc_gerg/src/knowledge/llm/lemmatize.py
new file mode 100644
index 000000000..0a7bfb7b9
--- /dev/null
+++ b/archive/2025/summer/bsc_gerg/src/knowledge/llm/lemmatize.py
@@ -0,0 +1,46 @@
+from typing import AsyncIterable
+
+from src.terminology.event import TermExtracted, Event, TermNormalized
+from src.terminology.terminology import TermNormalizer
+
# System prompt: normalize words/phrases (e.g. to singular) without
# substituting different words.
DEVELOPER_PROMPT = """
You are an expert in linguistics and languages.
Your job is to transform words and phrases into a normalized and generalized form.
You transform words and phrases into singular form.
You do not replace words with other similar words.
"""

# Shorter German variant of the normalization instruction (keep part of speech).
DEVELOPER_PROMPT_SHORT: str = """
Bringen den folgenden Begriff in eine Basisform. Behalte die Wortart.
"""

# Few-shot example inputs; paired index-wise with OUTPUT_ASSISTANT below.
EXAMPLE_USER: list[str] = [
    "örtlicher Zusatz",
    "örtliche Zusätze",
    "Betra",
    "Aufgabe der Triebfahrzeugführerin",
    "Triebfahrzeugführerin",
    "Rangierbegleitender",
]

# Expected normalized forms for the example inputs above.
OUTPUT_ASSISTANT = [
    "örtlicher Zusatz",
    "örtlicher Zusatz",
    "Betra",
    "Aufgabe der Triebfahrzeugführer",
    "Triebfahrzeugführer",
    "Rangierbegleiter",
]

# Interleaved ("user", input) / ("assistant", output) few-shot message pairs.
EXAMPLES = [message for input_term, output_term in zip(EXAMPLE_USER, OUTPUT_ASSISTANT) for message in
                [("user", input_term), ("assistant", output_term)]]
+
class LLMTermLemmatizer(TermNormalizer):
    """Normalizes an extracted term to its base (lemma) form via an LLM."""

    async def get_llm_response(self, term: str) -> str:
        """LLM hook: return the normalized base form of `term`."""
        pass

    async def activate(self, event: TermExtracted) -> AsyncIterable[Event]:
        """Store the lemmatized form on the term and emit TermNormalized."""
        lemma = await self.get_llm_response(event.term.text)
        event.term.normalization = lemma
        yield TermNormalized(term=event.term)
\ No newline at end of file