Skip to content

Extract vocabulary enums

extract_vocabulary_enums.py

This script extracts all vocabulary enum values from the Faktencheck database. It looks for: 1. Vocabulary tables (those starting with "vocabulary_") referenced in queries.yaml 2. Direct columns in core_zotaddon and related tables that contain choice values

Inspired by: db_converter.py and database_unique_summary.py

Usage example

python extract_vocabulary_enums.py --top-n 100

extract_all_vocabulary_enums(queries_yaml_path, host, port, database, user, password, top_n)

Extract all vocabulary enum values from the database. Inspired by get_unique_single_and_multi_values() and show_unique_values_summary()

Source code in src/kibad_llm/data_integration/extract_vocabulary_enums.py
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
def extract_all_vocabulary_enums(
    queries_yaml_path: str,
    host: str,
    port: int,
    database: str,
    user: str,
    password: str,
    top_n: int,
) -> dict[str, list[Any]]:
    """
    Extract all vocabulary enum values from the database.
    Inspired by get_unique_single_and_multi_values() and show_unique_values_summary()
    """
    with open(queries_yaml_path, encoding="utf-8") as f:
        queries_yaml = yaml.safe_load(f)

    vocab_query_names = queries_yaml.get("VOCAB_QUERY_NAMES", {})

    if not vocab_query_names:
        logger.warning("No VOCAB_QUERY_NAMES found in queries.yaml")
        return {}

    logger.info(f"Found {len(vocab_query_names)} vocabulary queries to process")

    # Map field names to their vocabulary table suffixes
    vocab_tables: dict[str, str] = {}
    for field_name, query_key in vocab_query_names.items():
        query = queries_yaml.get(query_key)
        if query:
            table_suffix = extract_vocabulary_table_name(query)
            if table_suffix:
                vocab_tables[field_name] = table_suffix
                logger.info(f"Field '{field_name}' → vocabulary_{table_suffix}")
            else:
                logger.warning(f"Could not extract vocabulary table from query: {query_key}")
        else:
            logger.warning(f"Query key '{query_key}' not found in queries.yaml")

    result: dict[str, list[Any]] = {}

    with psycopg2.connect(
        host=host,
        port=port,
        database=database,
        user=user,
        password=password,
    ) as conn:
        with conn.cursor() as cursor:
            # Extract from vocabulary tables
            for field_name, table_suffix in vocab_tables.items():
                logger.info(f"Querying vocabulary_{table_suffix}...")
                values = query_vocabulary_values(cursor, table_suffix)

                if len(values) <= top_n:
                    result[field_name] = values
                    logger.info(f"  Found {len(values)} values (included)")
                else:
                    logger.info(f"  Found {len(values)} values (excluded, exceeds top_n={top_n})")

            # Extract from direct columns
            for field_name, (table, column) in DIRECT_COLUMNS.items():
                logger.info(f"Querying {table}.{column}...")
                values = query_direct_column_values(cursor, table, column)

                if len(values) <= top_n:
                    result[field_name] = values
                    logger.info(f"  Found {len(values)} values (included)")
                else:
                    logger.info(f"  Found {len(values)} values (excluded, exceeds top_n={top_n})")

    return result

extract_vocabulary_table_name(query)

Extract the vocabulary table name (without prefix) from a SQL query.

Source code in src/kibad_llm/data_integration/extract_vocabulary_enums.py
43
44
45
46
47
48
49
50
51
def extract_vocabulary_table_name(query: str) -> str | None:
    """
    Extract the vocabulary table name (without prefix) from a SQL query.
    """
    pattern = r"vocabulary_(\w+)"
    match = re.search(pattern, query, re.IGNORECASE)
    if match:
        return normalize_table_suffix(match.group(1))
    return None

main(queries_path, host, port, database, user, password, top_n)

Main entry point for running the vocabulary extraction process.

Source code in src/kibad_llm/data_integration/extract_vocabulary_enums.py
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
def main(
    queries_path: Path,
    host: str,
    port: str,
    database: str,
    user: str,
    password: str,
    top_n: int,
):
    """
    Main entry point for running the vocabulary extraction process.
    """
    logging.basicConfig(level=logging.INFO, format="%(levelname)s: %(message)s")

    logger.info(f"Reading queries from: {queries_path}")
    logger.info(f"Connecting to database: {host}:{port}/{database}")
    logger.info(f"Top-N threshold: {top_n}")

    vocab_enums = extract_all_vocabulary_enums(
        queries_yaml_path=str(queries_path),
        host=host,
        port=int(port),
        database=database,
        user=user,
        password=password,
        top_n=top_n,
    )

    logger.info("\n" + "=" * 80)
    logger.info("VOCABULARY ENUM VALUES")
    logger.info("=" * 80 + "\n")

    print(json.dumps(vocab_enums, indent=2, ensure_ascii=False))

    logger.info("\n" + "=" * 80)
    logger.info(f"Total fields included: {len(vocab_enums)}")
    logger.info(f"Total unique values: {sum(len(v) for v in vocab_enums.values())}")
    logger.info("=" * 80)

normalize_table_suffix(s)

Normalize the table suffix by removing underscores.

Example

"direct_driver" -> "directdriver"

Source code in src/kibad_llm/data_integration/extract_vocabulary_enums.py
33
34
35
36
37
38
39
40
def normalize_table_suffix(s: str) -> str:
    """
    Normalize the table suffix by removing underscores.

    Example:
        "direct_driver" -> "directdriver"
    """
    return re.sub(r"_+", "", s or "")

query_direct_column_values(cursor, table, column)

Query all distinct values from a specific column in a table.

Source code in src/kibad_llm/data_integration/extract_vocabulary_enums.py
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
def query_direct_column_values(cursor: Any, table: str, column: str) -> list[Any]:
    """
    Query all distinct values from a specific column in a table.
    """
    table_ident = sql.Identifier(table)
    column_ident = sql.Identifier(column)
    query = sql.SQL("SELECT DISTINCT {} FROM {} WHERE {} IS NOT NULL ORDER BY {} ASC").format(
        column_ident, table_ident, column_ident, column_ident
    )

    try:
        cursor.execute(query)
        rows = cursor.fetchall()
        return [row[0] for row in rows]
    except psycopg2.Error as e:
        logger.error(f"Error querying {table}.{column}: {e}")
        cursor.connection.rollback()
        return []

query_nested_column_values(cursor, query_template)

Execute a query that returns nested data (e.g., with multiple columns).

Returns a list of dictionaries where keys are column names.

Source code in src/kibad_llm/data_integration/extract_vocabulary_enums.py
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
def query_nested_column_values(cursor: Any, query_template: str) -> list[dict[str, Any]]:
    """
    Execute a query that returns nested data (e.g., with multiple columns).

    Returns a list of dictionaries where keys are column names.
    """
    try:
        cursor.execute(query_template.replace("%s", "NULL"))  # Get structure without specific ID
        return []  # Nested values need to be queried per record
    except psycopg2.Error as e:
        logger.error(f"Error with nested query: {e}")
        cursor.connection.rollback()
        return []

query_vocabulary_values(cursor, table_suffix)

Query all possible values from a specific vocabulary table.

Source code in src/kibad_llm/data_integration/extract_vocabulary_enums.py
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
def query_vocabulary_values(cursor: Any, table_suffix: str) -> list[str]:
    """
    Query all possible values from a specific vocabulary table.
    """
    normalized = normalize_table_suffix(table_suffix)
    table_ident = sql.Identifier(f"vocabulary_{normalized}")
    query = sql.SQL("SELECT name FROM {} ORDER BY name ASC").format(table_ident)

    try:
        cursor.execute(query)
        rows = cursor.fetchall()
        return [row[0] for row in rows]
    except psycopg2.Error as e:
        logger.error(f"Error querying vocabulary_{normalized}: {e}")
        cursor.connection.rollback()
        return []