about summary refs log tree commit diff stats
path: root/archive/2025/summer/bsc_gerg/src/terminology/terminology.py
blob: 30353ed128c1bfaa5521cefe59ca78959edd57a8 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
import asyncio
import uuid
from typing import Optional, Annotated, List, AsyncIterable, Type, Any
from uuid import UUID

from pydantic import BaseModel, Field

from src.terminology.event import Handler, Event, EventDispatcher, DocumentAdded, TextExtracted, TermExtracted, \
    OccurrenceResolved, PartialDefinitionGenerated, TermNormalized
from src.terminology.models import Term, TextSource, Definition


class Blackboard(BaseModel):
    """Container for the terms and text sources collected so far."""

    # Both collections default to a fresh empty list per instance.
    terms: Annotated[List[Term], Field(default_factory=list)]
    sources: Annotated[List[TextSource], Field(default_factory=list)]

    def add_term(self, term: str):
        """Wrap ``term`` in a new ``Term``, append it to ``terms`` and return it."""
        created = Term(text=term)
        self.terms.append(created)
        return created

    def find_term(self, term_str: str):
        """Return the first stored term whose ``text`` equals ``term_str``.

        Returns ``None`` when no stored term matches.
        """
        matches = (candidate for candidate in self.terms if candidate.text == term_str)
        return next(matches, None)

    def add_text_source(self, text: str):
        """Store ``text`` as a new ``TextSource`` with a random UUID and return it."""
        created = TextSource(id=uuid.uuid4(), text=text)
        self.sources.append(created)
        return created

    def get_text_source(self, id: UUID) -> Optional[TextSource]:
        """Return the stored source whose ``id`` equals ``id``, or ``None``."""
        matches = (candidate for candidate in self.sources if candidate.id == id)
        return next(matches, None)


class KnowledgeSource(Handler):
    """Base class for handlers that carry a reference to a ``Blackboard``.

    ``Controller.register_knowledge_source`` constructs subclasses with
    ``blackboard=<the controller's blackboard>``.
    """

    # The blackboard this knowledge source works with.
    blackboard: Blackboard

    # NOTE(review): presumably a pydantic v1-style config block (assumes
    # ``Handler`` is a pydantic model) -- confirm against the pydantic version
    # in use before migrating it to ``model_config``.
    class Config:
        arbitrary_types_allowed = True



class TextExtractor(KnowledgeSource):
    """Placeholder knowledge source whose ``handles`` defaults to ``[DocumentAdded]``."""

    # Event types listed for this handler; a new list is built per instance.
    handles: Annotated[List[Type[Event]], Field(default_factory=lambda: [DocumentAdded])]

    async def activate(self, event: DocumentAdded) -> AsyncIterable[Event]:
        """Not implemented yet: ignores ``event`` and yields a single ``None``."""
        # NOTE(review): this yields ``None`` although the annotation promises
        # ``Event`` items -- confirm the dispatcher tolerates that.
        yield None


class TermExtractor(KnowledgeSource):
    """Placeholder knowledge source whose ``handles`` defaults to ``[TextExtracted]``."""

    # Event types listed for this handler; a new list is built per instance.
    handles: Annotated[List[Type[Event]], Field(default_factory=lambda: [TextExtracted])]

    async def activate(self, event: TextExtracted) -> AsyncIterable[Event]:
        """Not implemented yet: ignores ``event`` and yields a single ``None``."""
        # NOTE(review): this yields ``None`` although the annotation promises
        # ``Event`` items -- confirm the dispatcher tolerates that.
        yield None


class TermNormalizer(KnowledgeSource):
    """Placeholder knowledge source whose ``handles`` defaults to ``[TermExtracted]``."""

    # Event types listed for this handler; a new list is built per instance.
    handles: Annotated[List[Type[Event]], Field(default_factory=lambda: [TermExtracted])]

    async def activate(self, event: TermExtracted) -> AsyncIterable[Event]:
        """Not implemented yet: ignores ``event`` and yields a single ``None``."""
        # NOTE(review): this yields ``None`` although the annotation promises
        # ``Event`` items -- confirm the dispatcher tolerates that.
        yield None


class OccurrenceResolver(KnowledgeSource):
    """Placeholder knowledge source; ``handles`` defaults to ``[TermExtracted, TermNormalized]``."""

    # Event types listed for this handler; a new list is built per instance.
    handles: Annotated[List[Type[Event]], Field(default_factory=lambda: [TermExtracted, TermNormalized])]

    async def activate(self, event: TermExtracted | TermNormalized) -> AsyncIterable[Event]:
        """Not implemented yet: ignores ``event`` and yields a single ``None``."""
        # NOTE(review): this yields ``None`` although the annotation promises
        # ``Event`` items -- confirm the dispatcher tolerates that.
        yield None


class DefinitionResolver(KnowledgeSource):
    """Placeholder knowledge source; ``handles`` defaults to ``[TermExtracted, TermNormalized]``."""

    # Event types listed for this handler; a new list is built per instance.
    handles: Annotated[List[Type[Event]], Field(default_factory=lambda: [TermExtracted, TermNormalized])]

    async def activate(self, event: Event) -> AsyncIterable[Event]:
        """Not implemented yet: ignores ``event`` and yields a single ``None``."""
        # NOTE(review): this yields ``None`` although the annotation promises
        # ``Event`` items -- confirm the dispatcher tolerates that.
        yield None


class DefinitionGenerator(KnowledgeSource):
    """Placeholder knowledge source whose ``handles`` defaults to ``[OccurrenceResolved]``."""

    # Event types listed for this handler; a new list is built per instance.
    handles: Annotated[List[Type[Event]], Field(default_factory=lambda: [OccurrenceResolved])]

    async def activate(self, event: OccurrenceResolved) -> AsyncIterable[Event]:
        """Not implemented yet: ignores ``event`` and yields a single ``None``."""
        # NOTE(review): this yields ``None`` although the annotation promises
        # ``Event`` items -- confirm the dispatcher tolerates that.
        yield None


class DefinitionCombiner(KnowledgeSource):
    """Placeholder knowledge source whose ``handles`` defaults to ``[PartialDefinitionGenerated]``."""

    # Event types listed for this handler; a new list is built per instance.
    handles: Annotated[List[Type[Event]], Field(default_factory=lambda: [PartialDefinitionGenerated])]

    async def activate(self, event: PartialDefinitionGenerated) -> AsyncIterable[Event]:
        """Not implemented yet: ignores ``event`` and yields a single ``None``."""
        # NOTE(review): this yields ``None`` although the annotation promises
        # ``Event`` items -- confirm the dispatcher tolerates that.
        yield None


class Controller:

    def __init__(self):
        self.blackboard = Blackboard()
        self.knowledge_sources = []
        self.broker = EventDispatcher()

    def register_knowledge_source(self, knowledge_source: Type[KnowledgeSource]):
        knowledge_source.blackboard = self.blackboard
        instance = knowledge_source(blackboard=self.blackboard)
        self.knowledge_sources.append(instance)
        self.broker.register_handler(instance)

    async def emit(self, event: Event):
        async with self.broker.task_group:
            self.broker.emit(event)

    async def analyse_document(self, path: str):
        async with self.broker.task_group:
            self.broker.emit(
                DocumentAdded(path=path)
            )

    async def start(self):
        async with self.broker.task_group:
            self.broker.emit(
                TextExtracted(text="Der Schrankenwärter muss das Gleis sichern.")
            )