Skip to content

Compression

open_text(path, *, encoding='utf-8', compression='infer', archive_member=None, errors='strict', newline=None)

Open a file in text mode with optional decompression.

Supported suffixes when compression="infer" (inferred from the file suffix, mirroring pandas): .gz, .bz2, .xz, .zip, .tar / .tar.gz / .tar.bz2 / .tar.xz, and .zst.

Notes / limitations:

- For .zip and .tar* archives: if archive_member is None, the archive must contain exactly ONE regular file; otherwise ValueError is raised.
- For .zst: tries the stdlib compression.zstd module (Python 3.14+, optional module) first; otherwise falls back to the third-party 'zstandard' package if it is installed.
- Only path-like inputs are supported (not already-open file objects).

Source code in src/kibad_llm/dataset/compression.py
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
@contextmanager
def open_text(
    path: str | Path,
    *,
    encoding: str = "utf-8",
    compression: Compression = "infer",
    archive_member: str | None = None,
    errors: str = "strict",
    newline: str | None = None,
) -> Iterator[io.TextIOBase]:
    """
    Open a file in text mode with optional decompression.

    Supported when compression="infer" (by suffix, mirroring pandas):
      - .gz, .bz2, .xz, .zip, .tar/.tar.gz/.tar.bz2/.tar.xz, .zst

    Notes / limitations:
      - For .zip and .tar* archives: if archive_member is None, the archive must contain
        exactly ONE regular file, otherwise ValueError is raised.
      - For .zst: tries stdlib compression.zstd (Python 3.14+, optional module) first;
        otherwise falls back to third-party 'zstandard' if installed.
      - Only path-like inputs are supported (not already-open file objects).
    """
    method: Compression = _infer_compression(path) if compression == "infer" else compression

    with ExitStack() as stack:
        if method is None:
            yield stack.enter_context(
                open(path, encoding=encoding, errors=errors, newline=newline)
            )
            return

        if method == "gzip":
            yield stack.enter_context(
                gzip.open(path, "rt", encoding=encoding, errors=errors, newline=newline)
            )
            return

        if method == "bz2":
            yield stack.enter_context(
                bz2.open(path, "rt", encoding=encoding, errors=errors, newline=newline)
            )
            return

        if method == "xz":
            yield stack.enter_context(
                lzma.open(path, "rt", encoding=encoding, errors=errors, newline=newline)
            )
            return

        if method == "zip":
            zf = stack.enter_context(zipfile.ZipFile(path))
            names = [n for n in zf.namelist() if not n.endswith("/")]
            if archive_member is None:
                if len(names) != 1:
                    raise ValueError(
                        f"ZIP must contain exactly one file (found {len(names)}): {names}. "
                        "Pass archive_member=... to select one."
                    )
                archive_member = names[0]
            raw = stack.enter_context(zf.open(archive_member))
            yield stack.enter_context(
                io.TextIOWrapper(raw, encoding=encoding, errors=errors, newline=newline)
            )
            return

        if method == "tar":
            tf = stack.enter_context(tarfile.open(path, mode="r:*"))
            members = [m for m in tf.getmembers() if m.isfile()]
            if archive_member is None:
                if len(members) != 1:
                    names = [m.name for m in members]
                    raise ValueError(
                        f"TAR must contain exactly one file (found {len(names)}): {names}. "
                        "Pass archive_member=... to select one."
                    )
                member = members[0]
            else:
                member = tf.getmember(archive_member)

            extracted = tf.extractfile(member)
            if extracted is None:
                raise ValueError(f"Could not extract {member.name!r} from TAR.")
            raw = stack.enter_context(extracted)
            yield stack.enter_context(
                io.TextIOWrapper(raw, encoding=encoding, errors=errors, newline=newline)
            )
            return

        if method == "zst":
            # 1) Prefer stdlib in Python 3.14+: compression.zstd.open(..., 'rt', ...)
            try:
                zstd = importlib.import_module("compression.zstd")
                yield stack.enter_context(
                    zstd.open(path, "rt", encoding=encoding, errors=errors, newline=newline)
                )
                return
            except ModuleNotFoundError:
                pass

            # 2) Fallback: third-party 'zstandard'
            try:
                zstandard = importlib.import_module("zstandard")
            except ModuleNotFoundError as e:
                raise ModuleNotFoundError(
                    "Reading .zst requires either Python 3.14+ with the optional stdlib "
                    "'compression.zstd' module available, or installing 'zstandard'."
                ) from e

            raw_file = stack.enter_context(open(path, "rb"))
            dctx = zstandard.ZstdDecompressor()
            reader = dctx.stream_reader(raw_file)
            stack.callback(reader.close)
            yield stack.enter_context(
                io.TextIOWrapper(reader, encoding=encoding, errors=errors, newline=newline)
            )
            return

        raise ValueError(f"Unsupported compression={method!r}")