Skip to content

Aggregation utils

AggregationError

Bases: ValueError

Raised when aggregation of structured outputs fails due to conflicts or inconsistencies.

Source code in src/kibad_llm/extractors/aggregation_utils.py
12
13
14
15
class AggregationError(ValueError):
    """Signal that structured outputs could not be aggregated.

    Used when the outputs being combined conflict with each other or are
    otherwise inconsistent. Subclasses ``ValueError`` so that callers
    catching ``ValueError`` also catch aggregation failures.
    """

aggregate_majority_vote(structured_outputs, skip_type_mismatches=False)

Aggregate structured outputs from multiple extractions.

Entries with the same key are aggregated based on their value types:

- Primitive types (str, int, float, bool): majority vote
- Dict types: majority vote
- List types: majority vote per item

This is meant to aggregate outputs from repeated queries with the same schema, where each extraction may produce slightly different results due to LLM variability. The majority vote ensures that only values consistently appearing across extractions are kept, reducing noise and improving reliability.

Parameters:

Name Type Description Default
structured_outputs InputType

list of structured outputs from multiple extractions

required
skip_type_mismatches bool

If True, skips keys with inconsistent types across extractions instead of raising an error (default: False)

False

Returns: aggregated structured output or None if all entries are None

Source code in src/kibad_llm/extractors/aggregation_utils.py
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
def aggregate_majority_vote(
    structured_outputs: InputType, skip_type_mismatches: bool = False
) -> OutputType:
    """Combine repeated extractions of the same schema by majority vote.

    The aggregation strategy per key depends on the type of its values:
    - Primitive types (str, int, float, bool): majority vote
    - Dict types: majority vote (over the whole dict)
    - List types: majority vote per item

    Intended for repeated queries with the same schema, where LLM variability
    makes individual extractions differ slightly. Keeping only what a majority
    of extractions agree on reduces noise and improves reliability.

    Args:
        structured_outputs: list of structured outputs from multiple extractions
        skip_type_mismatches: If True, keys whose values have inconsistent types
            across extractions are set to None instead of raising an error
            (default: False)
    Returns:
        aggregated structured output or None if all entries are None
    Raises:
        NotImplementedError: if a key's value type is none of the supported
            primitive, dict or list types
    """
    # Nothing to aggregate if no extraction produced an output at all.
    if not any(res is not None for res in structured_outputs):
        return None

    values_per_key, type_per_key = collect_values_and_type_per_key(
        structured_outputs, skip_type_mismatches=skip_type_mismatches
    )

    result: dict[str, Any] = {}
    for key, candidates in values_per_key.items():
        kind = type_per_key.get(key)
        # Note for the branches below: None values take part in the vote on
        # purpose. For repeated queries on the same input, frequent None results
        # indicate genuine extraction difficulty, and the aggregate should
        # reflect that uncertainty.
        if kind is None:
            # No usable type: every value was None, or the key was skipped
            # because of a type mismatch.
            result[key] = None
        elif issubclass(kind, (str, int, float, bool)):
            # single-valued primitive: plain majority vote
            result[key] = _majority_vote(candidates, exclude_none=False)
        elif issubclass(kind, dict):
            # single-valued dict: vote on hashable stand-ins, then map the
            # winner back to one of the original dict objects
            hashable = [None if c is None else make_hashable_simple(c) for c in candidates]
            winner = _majority_vote(hashable, exclude_none=False)
            originals = dict(zip(hashable, candidates))
            result[key] = None if winner is None else originals[winner]
        elif issubclass(kind, list):
            # multi-valued: majority vote per list item.
            # NOTE(review): only the per-key values are handed over; outputs that
            # are None as a whole are not represented in `candidates` - confirm
            # that the helper uses the intended number of voters.
            result[key] = _multi_entry_majority_vote(candidates)
        else:
            raise NotImplementedError(f"Unsupported value type for aggregation: {kind}")

    return result

aggregate_single_majority_vote_multi_union(structured_outputs, skip_type_mismatches=False)

Aggregate structured outputs from multiple extractions.

Entries with the same key are aggregated based on their value types:

- Primitive types (str, int, float, bool): majority vote
- Dict types: majority vote
- List types: union of all items across extractions

This is meant to aggregate outputs from queries over different document chunks, where single-valued fields (primitives, dicts) should converge to a consistent value via majority vote, while multi-valued fields (lists) may contain different valid items from each chunk that should all be collected.

Parameters:

Name Type Description Default
structured_outputs InputType

list of structured outputs from multiple extractions

required
skip_type_mismatches bool

If True, skips keys with inconsistent types across extractions instead of raising an error (default: False)

False

Returns:

Type Description
OutputType

aggregated structured output or None if all entries are None

Source code in src/kibad_llm/extractors/aggregation_utils.py
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
def aggregate_single_majority_vote_multi_union(
    structured_outputs: InputType, skip_type_mismatches: bool = False
) -> OutputType:
    """Combine extractions: majority vote for single values, union for lists.

    The aggregation strategy per key depends on the type of its values:
    - Primitive types (str, int, float, bool): majority vote
    - Dict types: majority vote (over the whole dict)
    - List types: union of all items across extractions

    Intended for queries over different chunks of one document: single-valued
    fields (primitives, dicts) should converge to one consistent value, while
    multi-valued fields (lists) may legitimately hold different valid items per
    chunk, all of which should be collected.

    Args:
        structured_outputs: list of structured outputs from multiple extractions
        skip_type_mismatches: If True, keys whose values have inconsistent types
            across extractions are set to None instead of raising an error
            (default: False)

    Returns:
        aggregated structured output or None if all entries are None

    Raises:
        NotImplementedError: if a key's value type is none of the supported
            primitive, dict or list types
    """
    # Nothing to aggregate if no extraction produced an output at all.
    if not any(res is not None for res in structured_outputs):
        return None

    values_per_key, type_per_key = collect_values_and_type_per_key(
        structured_outputs, skip_type_mismatches=skip_type_mismatches
    )

    merged: dict[str, Any] = {}
    for key, candidates in values_per_key.items():
        kind = type_per_key.get(key)
        if kind is None:
            # No usable type: every value was None, or the key was skipped
            # because of a type mismatch.
            merged[key] = None
        elif issubclass(kind, (str, int, float, bool)):
            # single-valued primitive: majority vote, requested with
            # exclude_none=True
            merged[key] = _majority_vote(candidates, exclude_none=True)
        elif issubclass(kind, dict):
            # single-valued dict: vote on hashable stand-ins (exclude_none=True),
            # then map the winner back to one of the original dict objects
            hashable = [None if c is None else make_hashable_simple(c) for c in candidates]
            winner = _majority_vote(hashable, exclude_none=True)
            originals = dict(zip(hashable, candidates))
            merged[key] = None if winner is None else originals[winner]
        elif issubclass(kind, list):
            # multi-valued: collect the union of the items of all lists
            merged[key] = _multi_entry_union(candidates)
        else:
            raise NotImplementedError(f"Unsupported value type for aggregation: {kind}")

    return merged

aggregate_unanimous(structured_outputs, skip_type_mismatches=False)

Aggregate structured outputs with non-overlapping keys.

Combines results from multiple extractions where each extraction is expected to populate different keys (e.g., when a complex schema is split into multiple simpler queries). Each key should appear with a non-None value in at most one extraction.

Parameters:

Name Type Description Default
structured_outputs InputType

list of structured outputs from multiple extractions

required
skip_type_mismatches bool

If True, skips keys with inconsistent types across extractions instead of raising an error (default: False)

False

Returns:

Type Description
OutputType

aggregated structured output or None if all entries are None

Raises:

Type Description
AggregationError

If the same key has non-None values in multiple extractions

Source code in src/kibad_llm/extractors/aggregation_utils.py
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
def aggregate_unanimous(
    structured_outputs: InputType, skip_type_mismatches: bool = False
) -> OutputType:
    """Merge structured outputs that are expected to have non-overlapping keys.

    Meant for the case where a complex schema was split into several simpler
    queries, so that each extraction populates different keys. Each key should
    carry a non-None value in at most one extraction.

    Args:
        structured_outputs: list of structured outputs from multiple extractions
        skip_type_mismatches: If True, keys whose values have inconsistent types
            across extractions are set to None instead of raising an error
            (default: False)

    Returns:
        aggregated structured output or None if all entries are None

    Raises:
        AggregationError: If the same key has non-None values in multiple
            extractions (presumably raised by `_aggregate_unanimous` - verify
            the exact condition there), or if value types are inconsistent and
            skip_type_mismatches is False
        NotImplementedError: if a key's value type is none of the supported
            primitive, dict or list types
    """
    # Nothing to aggregate if no extraction produced an output at all.
    if not any(res is not None for res in structured_outputs):
        return None

    values_per_key, type_per_key = collect_values_and_type_per_key(
        structured_outputs, skip_type_mismatches=skip_type_mismatches
    )

    combined: dict[str, Any] = {}
    for key, candidates in values_per_key.items():
        kind = type_per_key.get(key)
        if kind is None:
            # No usable type: every value was None, or the key was skipped
            # because of a type mismatch.
            combined[key] = None
        elif issubclass(kind, (str, int, float, bool)):
            # primitives are hashable already and can be compared directly
            combined[key] = _aggregate_unanimous(candidates)
        elif issubclass(kind, (dict, list)):
            # dicts and lists are handled alike: compare hashable stand-ins,
            # then map the agreed value back to one of the original objects
            hashable = [None if c is None else make_hashable_simple(c) for c in candidates]
            agreed = _aggregate_unanimous(hashable)
            originals = dict(zip(hashable, candidates))
            combined[key] = None if agreed is None else originals[agreed]
        else:
            raise NotImplementedError(f"Unsupported value type for aggregation: {kind}")

    return combined

collect_values_and_type_per_key(structured_outputs, skip_type_mismatches=False)

Collect values and types per key from structured outputs.

Parameters:

Name Type Description Default
structured_outputs list[dict | None]

list of structured outputs from multiple extractions

required
skip_type_mismatches bool

If True, skips keys with inconsistent types across extractions instead of raising an error (default: False)

False

Returns: a tuple of:

- a dict mapping keys to the list of their values
- a dict mapping keys to their consistent type (or None if all values are None)

Source code in src/kibad_llm/extractors/aggregation_utils.py
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
def collect_values_and_type_per_key(
    structured_outputs: list[dict | None], skip_type_mismatches: bool = False
) -> tuple[dict[str, list], dict[str, type | None]]:
    """Collect values and types per key from structured outputs.

    Args:
        structured_outputs: list of structured outputs from multiple extractions
        skip_type_mismatches: If True, skips keys with inconsistent types across extractions
            instead of raising an error (default: False)
    Returns:
        tuple of:
            - dict mapping keys to list of values
            - dict mapping keys to their consistent type (or None if all values are None)
    """
    # collect all keys to correctly handle missing entries
    all_keys: set[str] = set()
    for res in structured_outputs:
        if res is not None:
            all_keys.update(res.keys())
    values_per_key = defaultdict(list)
    type_per_key: dict[str, type | None] = dict()
    # get values and type per key
    for res in structured_outputs:
        # skip if complete structured_output is None (LLM query failed)
        if res is not None:
            for key in all_keys:
                value = res.get(key, None)
                values_per_key[key].append(value)
                if value is not None:
                    if key not in type_per_key:
                        type_per_key[key] = type(value)
                    else:
                        if type_per_key[key] != type(value):
                            if not skip_type_mismatches:
                                raise AggregationError(
                                    f"Inconsistent types for key '{key}': "
                                    f"{type_per_key[key]} vs {type(value)}"
                                )
                            else:
                                type_per_key[key] = None
    return values_per_key, type_per_key