Diffstat (limited to 'archive/2025/summer/bsc_gerg/src/knowledge/llm')
 archive/2025/summer/bsc_gerg/src/knowledge/llm/__init__.py                     |   0
 archive/2025/summer/bsc_gerg/src/knowledge/llm/definition/__init__.py          |   0
 archive/2025/summer/bsc_gerg/src/knowledge/llm/definition/combiner.py          |  79
 archive/2025/summer/bsc_gerg/src/knowledge/llm/definition/generator.py         |  97
 archive/2025/summer/bsc_gerg/src/knowledge/llm/definition/unified_generator.py | 129
 archive/2025/summer/bsc_gerg/src/knowledge/llm/extract.py                      |  71
 archive/2025/summer/bsc_gerg/src/knowledge/llm/lemmatize.py                    |  46
7 files changed, 422 insertions, 0 deletions
diff --git a/archive/2025/summer/bsc_gerg/src/knowledge/llm/__init__.py b/archive/2025/summer/bsc_gerg/src/knowledge/llm/__init__.py
new file mode 100644
index 000000000..e69de29bb
--- /dev/null
+++ b/archive/2025/summer/bsc_gerg/src/knowledge/llm/__init__.py
diff --git a/archive/2025/summer/bsc_gerg/src/knowledge/llm/definition/__init__.py b/archive/2025/summer/bsc_gerg/src/knowledge/llm/definition/__init__.py
new file mode 100644
index 000000000..e69de29bb
--- /dev/null
+++ b/archive/2025/summer/bsc_gerg/src/knowledge/llm/definition/__init__.py
diff --git a/archive/2025/summer/bsc_gerg/src/knowledge/llm/definition/combiner.py b/archive/2025/summer/bsc_gerg/src/knowledge/llm/definition/combiner.py
new file mode 100644
index 000000000..efe81b5be
--- /dev/null
+++ b/archive/2025/summer/bsc_gerg/src/knowledge/llm/definition/combiner.py
@@ -0,0 +1,79 @@
+import asyncio
+from typing import Annotated, AsyncIterable
+from uuid import UUID
+
+from pydantic import Field
+
+from src.logger import simple_custom_logger
+from src.terminology.event import Event, CombinedDefinitionGenerated, PartialDefinitionGenerated
+from src.terminology.models import Definition
+from src.terminology.terminology import DefinitionCombiner
+
+logger = simple_custom_logger("COMBINER")
+
+RELEVANCE_USER_PROMPT = """Ist der folgende Text eine Definition für den Begriff \"%term%\"? Wenn die Definition spezifisch genug ist, beende deine Folgerung mit TRUE, ansonsten mit FALSE.
+
+%definition%"""
+
+COMBINE_SYSTEM_PROMPT = """Nutze nur das gegebene Wissen aus den Anfragen."""
+COMBINE_USER_PROMPT = """Erstelle eine kombinierte Definition für \"%term%\" anhand der folgenden Definitionen. Starte mit allgemeinen Informationen und werde dann spezifischer. Verwende nur die Informationen aus den unten stehenden Texten.
+
+%definitions%"""
+
+class LLMDefinitionCombiner(DefinitionCombiner):
+
+    MIN_PARTIAL_DEFINITIONS: int = 3
+
+    locks: Annotated[dict[UUID, asyncio.Lock], Field(default_factory=dict)]
+    lock: asyncio.Lock = asyncio.Lock()
+
+    # LLM hooks, provided by a concrete backend subclass
+    async def get_llm_response_relevance(self, term: str, definition: str) -> str:
+        raise NotImplementedError
+
+    async def get_llm_response_combine(self, term: str, definitions: str) -> str:
+        raise NotImplementedError
+
+    async def activate(self, event: PartialDefinitionGenerated) -> AsyncIterable[Event]:
+        # Since definitions for a term can be generated concurrently, definitions might get combined multiple times.
+        # For now, only one definition can be combined at once.
+        # FIXME: Improve locking (lock per term?)
+        if event.term.id not in self.locks:
+            self.locks[event.term.id] = asyncio.Lock()
+        async with self.locks[event.term.id]:
+            logger.info(f"Locking {event.term.normalized_or_text()} definition combiner {id(event)}")
+            has_verified_definition = next((definition for definition in event.term.definitions if definition.verified), None) is not None
+
+            partial_definitions = [definition for definition in event.term.definitions if definition.is_partial()]
+
+            if not has_verified_definition and len(partial_definitions) >= self.MIN_PARTIAL_DEFINITIONS:
+
+                # Drop stale combined definitions before building a new one
+                event.term.definitions = [definition for definition in event.term.definitions if not definition.is_combined()]
+
+                async with asyncio.TaskGroup() as tg:
+                    tasks = []
+                    for definition in event.term.definitions:
+                        task = tg.create_task(self.get_llm_response_relevance(event.term.normalized_or_text(), definition.text))
+                        tasks.append((definition, task))
+
+                relevant_definitions = [definition for definition, task in tasks if task.result().endswith("TRUE")]
+                logger.debug(f"Relevant definitions: {relevant_definitions}")
+                relevant_definitions_text = "\n\n".join([definition.text for definition in relevant_definitions])
+                response = await self.get_llm_response_combine(event.term.normalized_or_text(), relevant_definitions_text)
+
+                combined_definition = Definition(
+                    text=response,
+                    verified=False,
+                    partial=False,
+                    source=relevant_definitions
+                )
+
+                event.term.definitions.append(combined_definition)
+
+                yield CombinedDefinitionGenerated(
+                    term=event.term,
+                    combined_definition=combined_definition,
+                    relevant_definitions=relevant_definitions
+                )
+            logger.info(f"Lock released for {event.term.normalized_or_text()}")
diff --git a/archive/2025/summer/bsc_gerg/src/knowledge/llm/definition/generator.py b/archive/2025/summer/bsc_gerg/src/knowledge/llm/definition/generator.py
new file mode 100644
index 000000000..1927a3bef
--- /dev/null
+++ b/archive/2025/summer/bsc_gerg/src/knowledge/llm/definition/generator.py
@@ -0,0 +1,97 @@
+import asyncio
+import re
+from typing import AsyncIterable, Annotated
+
+from pydantic import Field
+
+from src.logger import simple_custom_logger
+from src.terminology.event import Event
+from src.terminology.models import Term
+from src.terminology.terminology import Definition, DefinitionGenerator, PartialDefinitionGenerated, OccurrenceResolved
+
+DEVELOPER_PROMPT = """
+Erstelle eine Definition für einen Begriff anhand von gegebenen Textausschnitten.
+Bleibe präzise und kurz. Nutze nur die Informationen aus dem gegebenen Text. Nutze kein gelerntes Wissen aus deinen Trainingsdaten!
+Wenn nicht genug Information vorhanden ist oder die Definition zu generell, vage oder nicht fachspezifisch ist, gib "ERROR" aus.
+""" + +logger = simple_custom_logger("DEFGEN") + +class LLMDefinitionGenerator(DefinitionGenerator): + + WINDOW_START: int = 200 + WINDOW_END: int = 300 + MIN_OVERLAP: int = 100 + MAX_LENGTH: int = 1000 + + CERTAINTY_THRESHOLD: int = 0.05 + + known_sources: Annotated[dict[str, list[Term]], Field(default_factory=dict[str, list[Term]])] + + async def generate_definition_from_source(self, term: str, context: str) -> str | None: + pass + + def get_matches(self, term: str, text: str): + pattern = rf"{term}" + matches = list(re.finditer(pattern, text, re.IGNORECASE)) + if len(matches) == 0: + return [text] + + excerpts = [] + last_start = 0 + last_end = 0 + for match in matches: + start = max(0, match.start() - self.WINDOW_START) + end = min(len(text), match.end() + self.WINDOW_END) + + overlap = last_end - start + length = end - last_start + if overlap > self.MIN_OVERLAP and length <= self.MAX_LENGTH: + if len(excerpts) == 0: + excerpts.append(text[start:end]) + else: + excerpts[-1] = text[last_start:end] + last_end = end + else: + last_start = start + last_end = end + excerpts.append(text[start:end]) + return excerpts + + + + async def activate(self, event: OccurrenceResolved) -> AsyncIterable[Event]: + if str(event.source.id) not in self.known_sources: + self.known_sources[str(event.source.id)] = list() + + if event.term in self.known_sources[str(event.source.id)]: + return + + self.known_sources[str(event.source.id)].append(event.term) + + tasks = [] + async with asyncio.TaskGroup() as tg: + matches = self.get_matches(term=event.term.normalized_or_text(), text=event.source.text) + if len(matches) > 0: + for match in matches: + task = tg.create_task(self.generate_definition_from_source(event.term.normalized_or_text(), match)) + tasks.append(task) + else: + tg.create_task(self.generate_definition_from_source(event.term.normalized_or_text(), event.source.text)) + + + for task in asyncio.as_completed(tasks): + result = await task + # print(f"Resolved for {event.term.normalized_or_text()}: {result}") + if result is not None: + definition = Definition( + text=result, + verified=False, + partial=True, + source=event.source + ) + event.term.definitions.append(definition) + yield PartialDefinitionGenerated( + term=event.term, + definition=definition + ) diff --git a/archive/2025/summer/bsc_gerg/src/knowledge/llm/definition/unified_generator.py b/archive/2025/summer/bsc_gerg/src/knowledge/llm/definition/unified_generator.py new file mode 100644 index 000000000..4f8c8dd3c --- /dev/null +++ b/archive/2025/summer/bsc_gerg/src/knowledge/llm/definition/unified_generator.py @@ -0,0 +1,129 @@ +import asyncio +import re +from typing import Annotated, AsyncIterable +from uuid import UUID + +from pydantic import Field + +from src.llm import create_completion_openai +from src.logger import simple_custom_logger +from src.terminology.event import OccurrenceResolved, Event +from src.terminology.terminology import DefinitionGenerator, Blackboard + +logger = simple_custom_logger("UNIGEN") + +prompt_introduction = """ +Erstelle eine Definition für den Begriff "%term%" anhand von gegebenen Textausschnitten. +Bleibe präzise und kurz. Nutze nur die Informationen aus dem gegebenen Kontext. +Wenn nicht genug Information vorhanden ist oder zu generell, vage oder nicht fachspezifisch ist, gebe "ERROR" aus. +Füge in die Definition die jeweiligen Referenzen hinzu, indem du die Nummer des Abschnitts verwendest im Format [<nummer>]. 
+""".strip() + +class OpenAIUnifiedDefinitionGenerator(DefinitionGenerator): + + MIN_OCCURRENCES: int = 3 + + WINDOW_START: int = 100 + WINDOW_END: int = 200 + + locks: Annotated[dict[UUID, asyncio.Lock], Field(default_factory=lambda: {})] + + # TODO: See Prompt Engineering for LLMs -> Elastic Snippets + async def activate(self, event: OccurrenceResolved) -> AsyncIterable[Event]: + if event.term.id not in self.locks: + self.locks[event.term.id] = asyncio.Lock() + + async with self.locks[event.term.id]: + logger.info(f"Locking {event.term.normalized_or_text()} for unified definition generator") + + term = event.term.normalized_or_text() + pattern = rf"{term}" + context = [] + for source_id in event.term.occurrences: + source = self.blackboard.get_text_source(id=source_id) + # FIXME: Create elastic snippet? -> dynamic window length? -> differences in quality? + + # Find all occurrences of the term + snippets = [] + + current_start = 0 + current_end = 0 + snippet = "" + + matches = list(re.finditer(pattern, source.text, re.IGNORECASE)) + + for match in matches: + start = max(0, match.start() - self.WINDOW_START) + end = match.end() + self.WINDOW_END + if start < current_end: + # snippet overlaps with current + current_end = end + pass + else: + # snippet is further away -> new one + if snippet != "": + snippets.append(snippet) + current_start = start + current_end = end + snippet = source.text[current_start:current_end] + if snippet != "": + snippets.append(snippet) + + context += snippets + + # for snippet in context: + # logger.debug(f"Snippet: {snippet}") + + logger.debug(f"Found {len(snippets)} snippets for {term}") + + # TODO: think about: how can cost be reduced? How can I decide if a text is relevant to a term? + + messages = [ + ("system", f"{prompt_introduction.replace('%term%', term)}"), + ("user", f"Hier sind einige Textausschnitte, die du verwenden kannst. Beziehe dich bei der Generation nur auf Wissen aus den Textstellen!"), + ("user", "\n\n".join([f"[{index}] {context}" for index, context in enumerate(context)])), + ("user", f"Definiere den Begriff \"{term}\". 
Beziehe dich nur auf die Textabschnitte!"), + ] + + # logger.debug("\n----\n".join([text for _, text in messages])) + + result = await create_completion_openai( + messages=messages, + model="o4-mini" + ) + + print(result) + + + + + + yield + + +if __name__ == "__main__": + blackboard = Blackboard( + terms=[], + sources=[] + ) + + term = blackboard.add_term("Fahrdienstleiter") + + with open("./../../../data/Handbuch-40820-data_11-15-1.txt", "r") as f: + source1 = blackboard.add_text_source(text=f.read()) + + with open("./../../../data/Handbuch-40820-data_11-15-5.txt", "r") as f: + source2 = blackboard.add_text_source(text=f.read()) + + with open("./../../../data/Handbuch-40820-data.txt", "r") as f: + source3 = blackboard.add_text_source(text=f.read()) + + term.occurrences = [source.id for source in [source1, source2, source3]] + + generator = OpenAIUnifiedDefinitionGenerator(blackboard=blackboard) + + async def test(): + async for event in generator.activate(OccurrenceResolved(term=term, source=source3)): + pass + + asyncio.run(test()) diff --git a/archive/2025/summer/bsc_gerg/src/knowledge/llm/extract.py b/archive/2025/summer/bsc_gerg/src/knowledge/llm/extract.py new file mode 100644 index 000000000..682b100df --- /dev/null +++ b/archive/2025/summer/bsc_gerg/src/knowledge/llm/extract.py @@ -0,0 +1,71 @@ +import re +from typing import AsyncIterable + +from src.terminology.event import TextExtracted, Event, TermExtracted, OccurrenceResolved +from src.terminology.terminology import TermExtractor + +DEVELOPER_PROMPT: str = """ +Du bist Experte für Terminologie und Fachbegriffe. +Deine Aufgabe besteht darin, aus einem Text Begriffe, Abkürzungen und Phrasen zu extrahieren. +Du extrahierst nur Terminologie, die wahrscheinlich in der Eisenbahn verwendet wird. +Du erkennst Abkürzungen und behällst sie unverändert bei. Nur wenn die vollständige Form vorhanden ist, fügst du sie in Klammern am Ende des Begriffs an. +Du extrahierst Phrasen und Wörter sowie verschachtelte Begriffe und deren Einzelteile. +Achte bei längeren Phrasen darauf, ob aus dem Text klar wird, dass es sich um einen besonderen Begriff handelt, der Wahrscheinlich verwendet wird. +Beginne mit den Begriffen, die am wahrscheinlichsten relevant sind. +Gib nur eine Liste von Begriffen zurück. Extrahiere nur Begriffe, die besonders für den Kontext "Eisenbahn" sind! +""" + +EXAMPLE_USER: str = """ +Input: +Du musst das Hauptsignal auf Fahrt stellen. 
+""" + +OUTPUT_ASSISTANT: str = """ +Output: +- Hauptsignal auf Fahrt stellen +- Hauptsignal +- auf Fahrt stellen +- Fahrtstellung eines Hauptsignals +""" + +class LLMTermExtractor(TermExtractor): + + + async def get_llm_response(self, text: str) -> str: + pass + + async def activate(self, event: TextExtracted) -> AsyncIterable[Event]: + source = self.blackboard.add_text_source(event.text) + response = await self.get_llm_response(event.text) + response = response.split("\n") + terms = [candidate[2:] for candidate in response if candidate.startswith("-") or candidate.startswith("*")] + + for term in terms: + + variation_match = re.search(r"\(.+\)$", term) + abbreviation = None + + if variation_match: + variation = (variation_match.group(0) + .replace("(", "") + .replace(")", "") + .strip()) + term = term.replace(variation_match.group(0), "").strip() + if len(variation) > len(term): + abbreviation = term + term = variation + else: + abbreviation = variation + term = term + + t = self.blackboard.find_term(term_str=term) + + if t is None: + t = self.blackboard.add_term(term=term) + + if abbreviation and not abbreviation in t.variations: + t.variations.append(abbreviation) + + t.occurrences.append(source.id) + yield TermExtracted(term=t) + yield OccurrenceResolved(term=t, source=source) diff --git a/archive/2025/summer/bsc_gerg/src/knowledge/llm/lemmatize.py b/archive/2025/summer/bsc_gerg/src/knowledge/llm/lemmatize.py new file mode 100644 index 000000000..0a7bfb7b9 --- /dev/null +++ b/archive/2025/summer/bsc_gerg/src/knowledge/llm/lemmatize.py @@ -0,0 +1,46 @@ +from typing import AsyncIterable + +from src.terminology.event import TermExtracted, Event, TermNormalized +from src.terminology.terminology import TermNormalizer + +DEVELOPER_PROMPT = """ +You are an expert in linguistics and languages. +Your job is to transform words and phrases into a normalized and generalized form. +You transform words and phrases into singular form. +You do not replace words with other similar words. +""" + +DEVELOPER_PROMPT_SHORT: str = """ +Bringen den folgenden Begriff in eine Basisform. Behalte die Wortart. +""" + +EXAMPLE_USER: list[str] = [ + "örtlicher Zusatz", + "örtliche Zusätze", + "Betra", + "Aufgabe der Triebfahrzeugführerin", + "Triebfahrzeugführerin", + "Rangierbegleitender", +] + +OUTPUT_ASSISTANT = [ + "örtlicher Zusatz", + "örtlicher Zusatz", + "Betra", + "Aufgabe der Triebfahrzeugführer", + "Triebfahrzeugführer", + "Rangierbegleiter", +] + +EXAMPLES = [message for input_term, output_term in zip(EXAMPLE_USER, OUTPUT_ASSISTANT) for message in + [("user", input_term), ("assistant", output_term)]] + +class LLMTermLemmatizer(TermNormalizer): + + async def get_llm_response(self, term: str) -> str: + pass + + async def activate(self, event: TermExtracted) -> AsyncIterable[Event]: + response = await self.get_llm_response(event.term.text) + event.term.normalization = response + yield TermNormalized(term=event.term) \ No newline at end of file |