Skip to content

Db converter

format_result(result, column_names)

Formats a result tuple into a dictionary. If the results contain multiple values, the results are paired with their corresponding column names. Otherwise, the result is returned as is.

Parameters:

Name Type Description Default
result tuple

the result to be formatted

required
column_names list[str]

the column names to be used as keys

required

Returns:

Type Description
dict[str, Any]

dict[str, Any]: the formatted result

Source code in src/kibad_llm/data_integration/db_converter.py
45
46
47
48
49
50
51
52
53
54
55
56
57
def format_result(result: tuple, column_names: list[str]) -> dict[str, Any]:
    """
    Formats a result tuple into a dictionary. If the results contain multiple values, the results
    are paired with their corresponding column names. Otherwise, the result is returned as is.

    Args:
        result (tuple): the result to be formatted
        column_names (list[str]): the column names to be used as keys

    Returns:
        dict[str, Any]: the formatted result
    """
    return dict(zip(column_names, result)) if len(result) > 1 else result[0]

main(filepath, queries_path, host, port, database, user, password)

Main function to execute the query and convert the results to JSONL format.

Parameters:

Name Type Description Default
filepath Path

path to the output file

required
queries_path Path

path to yaml file including SQL queries

required
host str

database host

required
port str

database port

required
database str

database name

required
user str

database user

required
password str

database password

required
Source code in src/kibad_llm/data_integration/db_converter.py
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
def main(
    filepath: Path,
    queries_path: Path,
    host: str,
    port: str,
    database: str,
    user: str,
    password: str,
) -> None:
    """
    Main function to execute the query and convert the results to JSONL format.

    Args:
        filepath (Path): path to the output file
        queries_path (Path): path to yaml file including SQL queries
        host (str): database host
        port (str): database port
        database (str): database name
        user (str): database user
        password (str): database password
    """

    # load CORE_QUERY and VOCAB_QUERIES queries from queries.yaml
    with open(queries_path, encoding="utf-8") as f:
        queries_yaml = yaml.safe_load(f)

    core_query = queries_yaml["CORE_QUERY"]
    vocab_queries = {k: queries_yaml[v] for k, v in queries_yaml["VOCAB_QUERY_NAMES"].items()}

    with psycopg2.connect(
        host=host,
        port=port,
        database=database,
        user=user,
        password=password,
    ) as conn:

        with conn.cursor() as cursor:

            results: list[tuple] = query_core(cursor, core_query)
            if cursor.description is None:
                raise ValueError("Cursor description is None. Query might have failed.")
            column_names: list[str] = [desc[0] for desc in cursor.description]
            with open(filepath, "w", encoding="utf-8") as f:
                for result in tqdm(results, "Exporting DB entries to JSONL"):
                    row_dict: dict[str, Any] = dict(zip(column_names, result))
                    for query_name, query in vocab_queries.items():
                        vocab_results: list[tuple] = query_core(cursor, query, (result[0],))
                        if cursor.description is None:
                            raise ValueError(
                                "Cursor description is None. Query might have failed."
                            )
                        vocab_column_names: list[str] = [desc[0] for desc in cursor.description]

                        if query_name in SINGLE_ENTITIES:
                            if vocab_results:
                                if len(vocab_results) > 1:
                                    raise ValueError(
                                        f"Expected a single result only for {query_name}, got {len(vocab_results)}."
                                    )
                                row_dict[query_name] = format_result(
                                    vocab_results[0], vocab_column_names
                                )
                            else:
                                row_dict[query_name] = None
                        else:
                            if vocab_results:
                                row_dict[query_name] = [
                                    format_result(vocab_result, vocab_column_names)
                                    for vocab_result in vocab_results
                                ]
                            else:
                                row_dict[query_name] = None

                    f.write(json.dumps(row_dict, ensure_ascii=False, sort_keys=True) + "\n")

query_core(cursor, query, query_vars=None)

Queries the core data from the database.

Parameters:

Name Type Description Default
cursor cursor

cursor to execute the query

required
query str

query to execute

required
query_vars tuple | None

arguments to pass to the query

None

Returns:

Type Description
list[tuple]

list[tuple]: the results of the query

Source code in src/kibad_llm/data_integration/db_converter.py
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
def query_core(
    cursor: psycopg2.extensions.cursor, query: str, query_vars: tuple | None = None
) -> list[tuple]:
    """
    Queries the core data from the database.

    Args:
        cursor (psycopg2.extensions.cursor): cursor to execute the query
        query (str): query to execute
        query_vars (tuple | None): arguments to pass to the query

    Returns:
        list[tuple]: the results of the query
    """
    cursor.execute(query, query_vars)

    return cursor.fetchall()