Chunking

ChunkingExtractor

Extractor that chunks the input text and aggregates results per key. This extractor calls the base extraction function once per chunk of the document, passing some previous context to each subsequent call.

Pass llm=None with verbose=True to log the number of chunks per document without running inference.

WARNING: If a token longer than max_char_buffer is encountered, it becomes its own chunk. This edge case can produce chunks larger than max_char_buffer would otherwise allow.
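For illustration, a minimal dry-run sketch. The trivial aggregator is a hypothetical stand-in (any callable over the per-chunk outputs works), and it assumes the base extraction function accepts an llm keyword argument, as implied above:

from kibad_llm.extractors.chunking import ChunkingExtractor

extractor = ChunkingExtractor(
    aggregator=lambda outputs: {},  # trivial stand-in; no aggregation needed for a dry run
    max_char_buffer=20000,
    verbose=True,  # logs the chunk count per document
    llm=None,      # forwarded to the base extraction function: no inference
)
# extractor(text=some_long_document, text_id="doc-001")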

Parameters:

aggregator (Aggregator, required)
    Method to aggregate the LLM output for the individual chunks before returning.

return_as_list (list[str] | None, default: None)
    List of field names to return as lists of all extracted values.

tokenizer (Tokenizer | None, default: None)
    Tokenizer to use for chunking.

max_char_buffer (int, default: 20000)
    Max chunk size in characters.

verbose (bool, default: False)
    Adds verbose logging.

**kwargs (default: {})
    Additional keyword arguments passed to the base extraction function.
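For example (field names here are illustrative), with return_as_list=["title"] the returned dictionary holds the aggregated "structured" output plus one list per requested field, with one entry per chunk:

{
    "structured": {...},                             # aggregated over all chunks
    "title_list": ["Chapter 1", None, "Chapter 2"],  # one value per chunk
}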
Source code in src/kibad_llm/extractors/chunking.py
class ChunkingExtractor:
    """Extractor that chunks extraction and aggregates results per key.
    This extractor calls the base extraction function multiple times
    (for each chunk in the document) on the same input text,
    passing some previous context to each subsequent call.

    Pass llm=None with verbose=True to get the number of chunks per document without inference.

    WARNING:
    If a Token that is greater than max_char_buffer is encountered, it becomes its own chunk.
    This edge case can produce chunks that are larger than max_char_buffer would allow.

    Args:
        aggregator: Method to aggregate the llm output for the individual chunks before returning
        return_as_list: List of field names to return as lists of all extracted values
        tokenizer: tokenizer to use for chunking
        max_char_buffer: Max chunk size in characters
        verbose: Adds verbose logging
        **kwargs: Additional keyword arguments passed to the base extraction function.
    """

    def __init__(
        self,
        aggregator: Aggregator,
        return_as_list: list[str] | None = None,
        tokenizer: tokenizer_lib.Tokenizer | None = None,
        max_char_buffer: int = 20000,
        verbose: bool = False,
        **kwargs,
    ):
        self.aggregator = aggregator
        self.return_as_list = return_as_list or []
        self.default_kwargs = kwargs
        self.tokenizer = tokenizer
        self.max_char_buffer = max_char_buffer
        self.verbose = verbose

    def __call__(self, *args, **kwargs) -> dict[str, Any]:
        text = kwargs.pop("text", None)
        if text is None:
            text = args[0]

        text_id = kwargs.pop("text_id", None)
        if text_id is None:
            text_id = args[-1]

        combined_kwargs = {**self.default_kwargs, **kwargs}

        # Materialize the chunks so that len() works in verbose mode below.
        chunks = list(
            _document_chunk_iterator(
                document=text,
                max_char_buffer=self.max_char_buffer,
                tokenizer=self.tokenizer,
            )
        )

        results = []
        if self.verbose:
            logger.info(f"starting processing for text {text_id}")
            logger.info(f"{text[:100]}[...]{text[100:]}" if len(text) > 200 else text)
            logging.info(f"{str(len(chunks)).rjust(4, ' ')} chunks in document {args[-1]}")
            # wrapping in tqdm doesn't change the functionality but upsets mypy.
            # hence we need the '# type: ignore' comment
            chunks = tqdm(chunks, desc=text_id)  # type: ignore
        for i, chunk in enumerate(chunks):
            current_result = extract_from_text_lenient(
                text=text,
                text_id=f"{text_id}_chunk_{i}",
                **combined_kwargs,
                # This may raise an error if character_start or character_end is already provided via kwargs,
                # but we want to be strict about not allowing that since it would interfere with the chunking logic.
                character_start=chunk.char_interval.start_pos or 0,
                character_end=chunk.char_interval.end_pos,
            )
            results.append(current_result)

        structured_outputs = [v.get("structured", None) for v in results]
        aggregated_structured = self.aggregator(structured_outputs)

        result: dict[str, Any] = {
            "structured": aggregated_structured,
        }
        for field in self.return_as_list:
            result[f"{field}_list"] = [v.get(field, None) for v in results]
        return result
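
The Aggregator is applied once per document to the list of per-chunk "structured" outputs. A sketch of one possible aggregator, assuming each structured output is a dict of field names to extracted values (the actual Aggregator protocol is defined elsewhere in kibad_llm):

from typing import Any

def first_non_null_aggregator(outputs: list[dict[str, Any] | None]) -> dict[str, Any]:
    """Hypothetical aggregator: per key, keep the first non-None value
    found across the per-chunk structured outputs."""
    aggregated: dict[str, Any] = {}
    for output in outputs:
        if not output:
            continue  # a chunk may yield no structured output
        for key, value in output.items():
            if key not in aggregated and value is not None:
                aggregated[key] = value
    return aggregated

Because ChunkingExtractor only calls self.aggregator(structured_outputs), any callable with this shape can serve.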