Get Started with DataChain¶
🔨Wrangle unstructured AI data at scale
Datachain enables multimodal API calls and local AI inference to run in parallel over many samples as chained operations. The resulting datasets can be saved, versioned, and sent directly to PyTorch and TensorFlow for training. Datachain can persist the features of Python objects returned by AI models, and enables vectorized analytical operations over them.
The typical use cases are data curation, LLM analytics and validation, image segmentation, pose detection, and GenAI alignment. Datachain is especially helpful when batch operations can be optimized, for instance when synchronous API calls can be parallelized or when an LLM API offers batch processing.
pip install datachain
Operation basics¶
Datachain is built by composing wrangling operations.
For example, consider the New Yorker Cartoon caption contest dataset, where cartoons are matched with candidate captions. Suppose we want to augment this dataset with synthetic scene descriptions coming from an AI model. The code below reads images from cloud storage, applies the PaliGemma model to caption the first five of them, and puts the results in the “scene” column:
#
# pip install transformers
#

from datachain.lib.dc import DataChain, File
from transformers import AutoProcessor, PaliGemmaForConditionalGeneration

images = DataChain.from_storage("gs://datachain-demo/newyorker_caption_contest/images", type="image")

model = PaliGemmaForConditionalGeneration.from_pretrained("google/paligemma-3b-mix-224")
processor = AutoProcessor.from_pretrained("google/paligemma-3b-mix-224")

def process(file: File) -> str:
    # read the image and ask PaliGemma for a caption
    image = file.read().convert("RGB")
    inputs = processor(text="caption", images=image, return_tensors="pt")
    generate_ids = model.generate(**inputs, max_new_tokens=100)
    return processor.batch_decode(generate_ids, skip_special_tokens=True, clean_up_tokenization_spaces=False)[0]

chain = (
    images.limit(5)
    .settings(cache=True)
    .map(scene=process, output=str)
    .save()
)
Here is how we can view the results in a plot:
import re
from textwrap import wrap

import matplotlib.pyplot as plt

def trim_text(text):
    # keep only the first complete sentence of the caption
    match = re.search(r'[A-Z][^.]*\.', text)
    return match.group(0) if match else ''

# materialize the lazy iterators so we can count and zip them
images = list(chain.collect("file"))
captions = list(chain.collect("scene"))

_, axes = plt.subplots(1, len(captions), figsize=(15, 5))
for ax, img, caption in zip(axes, images, captions):
    ax.imshow(img.read(), cmap='gray')
    ax.axis('off')
    wrapped_caption = "\n".join(wrap(trim_text(caption), 30))
    ax.set_title(wrapped_caption, fontsize=6)
plt.show()
If you are interested in more multimodal examples for DataChain, please follow this tutorial:

https://github.com/iterative/datachain-examples/blob/main/multimodal/clip_fine_tuning.ipynb
Handling Python objects¶
In addition to storing primitive Python data types like strings, DataChain is also capable of using data models.
For example, most LLMs return objects that carry additional fields. If a provider offers a Pydantic model for their LLM responses, Datachain can use it as a schema.
In the below example, we are calling a Mixtral 8x22b model to judge the “service chatbot” dataset from Karlsruhe Institute of Technology and saving the results as Mistral’s ChatCompletionResponse objects:
# pip install mistralai
# this example requires a free Mistral API key, get yours at https://console.mistral.ai
# $ export MISTRAL_API_KEY='your key'

import os

from mistralai.client import MistralClient
from mistralai.models.chat_completion import ChatMessage
from mistralai.models.chat_completion import ChatCompletionResponse as MistralModel

from datachain.lib.dc import Column, DataChain
from datachain.lib.data_model import DataModel

prompt = "Was this dialog successful? Describe the 'result' as 'Yes' or 'No' in a short JSON"
api_key = os.environ["MISTRAL_API_KEY"]

# register the data model so DataChain can persist it as a schema
DataModel.register(MistralModel)

chain = (
    DataChain
    .from_storage("gs://datachain-demo/chatbot-KiT/", type="text")
    .filter(Column("file.name").glob("*.txt"))
    .limit(5)
    .settings(parallel=4, cache=True)
    .map(
        mistral=lambda file: MistralClient(api_key=api_key).chat(
            model="open-mixtral-8x22b",
            response_format={"type": "json_object"},
            messages=[
                ChatMessage(role="system", content=prompt),
                ChatMessage(role="user", content=file.read()),
            ],
        ),
        output=MistralModel,
    )
    .save("dialog-rating")
)

responses = chain.collect("mistral")
print(*map(lambda chat_response: chat_response.choices[0].message.content, responses))
If you are interested in more LLM evaluation examples for DataChain, please follow this tutorial:
https://github.com/iterative/datachain-examples/blob/main/llm/llm_chatbot_evaluation.ipynb
Vectorized analytics¶
Datachain internally represents datasets as tables, so analytical queries on the chain are automatically vectorized:
# continued from the previous example
mistral_cost = chain.sum("mistral.usage.prompt_tokens") * 0.000002 \
             + chain.sum("mistral.usage.completion_tokens") * 0.000006

print(f"The cost of {chain.count()} calls to Mixtral 8x22b : ${mistral_cost:.4f}")
Dataset persistence¶
数据集持久化¶
The “save” operation makes the chain's dataset persistent in the current (working) directory of the query. A hidden folder .datachain/ holds the records. A persistent dataset can be accessed later to start a derivative chain:
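Here is a minimal sketch of such a derivative chain, reusing the “dialog-rating” dataset saved above; it assumes the DataChain.from_dataset constructor, so check the API of your DataChain version:

# a minimal sketch: load the saved dataset and derive a new chain from it
from datachain.lib.dc import DataChain

ratings = DataChain.from_dataset("dialog-rating")       # dataset persisted earlier
sample = ratings.limit(2).save("dialog-rating-sample")  # "dialog-rating-sample" is an illustrative name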
Persistent datasets are immutable and automatically versioned. Here is how to access the dataset registry:
mydatasets = DataChain.datasets()
for ds in mydatasets.collect("dataset"):
    print(f"{ds.name}@v{ds.version}")
Processed: 1 rows [00:00, 777.59 rows/s]
Generated: 14 rows [00:00, 11279.34 rows/s]
dialog-rating@v1
dialog-rating@v2
By default, when a saved dataset is loaded, the latest version is fetched, but another version can be requested:
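For instance, a quick sketch, assuming from_dataset accepts a version argument (the parameter name may differ across DataChain versions):

# a minimal sketch: request a specific version of a saved dataset
rating_v1 = DataChain.from_dataset("dialog-rating", version=1)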
Chain execution, optimization and parallelism¶
Datachain avoids redundant operations. Execution is triggered only when a downstream operation requests the processed results. However, it would be inefficient to run, say, LLM queries again every time you just want to collect several objects from the chain.
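A minimal sketch of this laziness, reusing the images chain and the process function from the captioning example above:

# nothing is computed when the chain is defined...
lazy = images.limit(5).map(scene=process, output=str)
# ...execution is triggered only by a downstream request for data
first_scene = next(lazy.collect("scene"))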
The “save” operation pins execution results and automatically refers to them every time downstream functions ask for data. Saving without an explicit name generates an auto-named dataset, which serves the same purpose.
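As a small sketch of the two flavors (both forms appear in the examples above):

chain.save("dialog-rating")  # an explicit name registers a dataset under that name
chain.save()                 # an auto-named dataset that still pins the results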
Datachain natively supports parallelism in execution. If an API or a local model supports parallel requests, the settings operator can split the load across multiple workers (see the code example above).
Reading external metadata¶
It is common for AI data to come with pre-computed metadata (annotations, classes, etc.).
The DataChain library understands common annotation formats (JSON, CSV, WebDataset, and Parquet), and can unite data samples from storage with side-loaded metadata. The schema for the metadata can be set explicitly or be inferred.
Here is an example of reading a simple CSV file where the schema is heuristically derived from the header:
from datachain.lib.dc import DataChain

uri = "gs://datachain-demo/chatbot-csv/"
csv_dataset = DataChain.from_csv(uri)

print(csv_dataset.to_pandas())
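If guessing is undesirable, the schema can also be stated explicitly. Below is a sketch under the assumption that from_csv takes an output mapping of column names to types; the parameter name and the column names here are illustrative, so check the signature in your DataChain version:

# a hypothetical explicit schema for the same CSV ("user" and "bot" are made-up column names)
csv_dataset = DataChain.from_csv(uri, output={"user": str, "bot": str})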
Reading metadata in the JSON format is a more complicated scenario, because a JSON-annotated dataset typically references data samples in blocks within the JSON files.
Here is an example from the MS COCO “captions” JSON, which employs separate sections for image metadata and captions:
{
"images": [
{
"license": 4,
"file_name": "000000397133.jpg",
"coco_url": "http://images.cocodataset.org/val2017/000000397133.jpg",
"height": 427,
"width": 640,
"date_captured": "2013-11-14 17:02:52",
"flickr_url": "http://farm7.staticflickr.com/6116/6255196340_da26cf2c9e_z.jpg",
"id": 397133
},
...
],
"annotations": [
{
"image_id" : "179765",
"id" : 38,
"caption" : "A black Honda motorcycle parked in front of a garage."
},
...
],
...
}
Note how complicated the setup is. Every image is referenced by name, and the metadata for this file is keyed by the “id” field. This same field is referenced later in the “annotations” array, which is present in the JSON files describing captions and detected instances. The categories for the instances are stored in the “categories” array.
However, Datachain can easily parse the entire COCO structure via several reading and merging operators:
from datachain.lib.dc import Column, DataChain

images_uri = "gs://datachain-demo/coco2017/images/val/"
captions_uri = "gs://datachain-demo/coco2017/annotations/captions_val2017.json"

# read the files and the two JSON sections as separate chains
images = DataChain.from_storage(images_uri)
meta = DataChain.from_json(captions_uri, jmespath="images")
captions = DataChain.from_json(captions_uri, jmespath="annotations")

# join files to image metadata, then metadata to captions
images_meta = images.merge(meta, on="file.name", right_on="images.file_name")
captioned_images = images_meta.merge(captions, on="images.id", right_on="annotations.image_id")
The resulting dataset has image entries as files decorated with all the metadata and captions:
images_with_dogs = captioned_images.filter(Column("annotations.caption").glob("*dog*"))
images_with_dogs.select("annotations", "file.name").show()
  annotations  annotations                                        annotations              file
     image_id           id                                            caption              name
0       17029       778902         a dog jumping to catch a frisbee in a yard  000000017029.jpg
1       17029       779838   A dog jumping to catch a red frisbee in a garden  000000017029.jpg
2       17029       781941  The dog is catching the Frisbee in mid air in ...  000000017029.jpg
3       17029       782283      A dog catches a frisbee outside in the grass.  000000017029.jpg
4       17029       783543              A dog leaping to catch a red frisbee.  000000017029.jpg
5       18193       278544  A woman in green sweater eating a hotdog by ca...  000000018193.jpg
...

[Limited by 20 rows]
For a deeper dive into working with JSON metadata, please follow this tutorial:

https://github.com/iterative/datachain-examples/blob/main/formats/json-metadata-tutorial.ipynb
Passing data to training¶
Chain results can be exported or passed directly to a PyTorch data loader. For example, if we are interested in passing three columns to training, the following PyTorch code will do it:
import torch
from torch.utils.data import DataLoader

# "train" is the chain prepared for training; "preprocess", "clip" and "model"
# are defined in the CLIP fine-tuning tutorial linked below
ds = train.select("file", "caption_choices", "label_ind").to_pytorch(
    transform=preprocess,
    tokenizer=clip.tokenize,
)

loader = DataLoader(ds, batch_size=2)
optimizer = torch.optim.Adam(model.parameters(), lr=1e-4)
train_loop(loader, model, optimizer)  # the tutorial's training function, renamed here
                                      # to avoid clashing with the "train" chain
See a larger example for CLIP fine-tuning here:
https://github.com/iterative/datachain-examples/blob/main/multimodal/clip_fine_tuning.ipynb