文件到 DataFrame 加载器
DataLoader 模块用于使用自定义加载器函数将数据文件内容加载到 DataFrame 中。此加载器函数可以配置为使用不同的处理方法,例如 "single_thread"
、"dask"
或 "dask_thread"
,这由 MORPHEUS_FILE_DOWNLOAD_TYPE
环境变量决定。当 download_method
为 "dask"
或 "dask_thread"
时,将创建一个 Dask 客户端来处理文件;否则,将使用单线程。
处理后,生成的 DataFrame 会使用文件路径的哈希值进行缓存。除了从磁盘加载数据外,此加载器还能够从 S3 存储桶加载文件内容。
在加载 DataLoader 模块时使用以下配置,指定 DataLoader 模块在将文件加载到 DataFrame 中时应使用 file_to_df
加载器。
{
"loaders": [{
"id": "file_to_df"
}]
}
注意:加载器可以在运行时通过控制消息从 load
任务接收配置。
可以在加载任务级别为此特定加载器配置的参数
参数 |
类型 |
描述 |
示例值 |
默认值 |
---|---|---|---|---|
batcher_config |
字典 | 批处理选项 | 请参考下方 | [必需] |
files |
数组 | 要加载的文件列表 | ["/path/to/input/files"] |
[] |
loader_id |
字符串 | 加载器的唯一标识符 | "file_to_df" |
[必需] |
键 |
类型 |
描述 |
示例值 |
默认值 |
---|---|---|---|---|
cache_dir |
字符串 | 用于缓存滚动窗口数据的目录 | "/path/to/cache" |
- |
file_type |
字符串 | 输入文件的类型 | "csv" |
"JSON" |
filter_null |
布尔值 | 是否滤除空值 | true |
false |
parser_kwargs |
字典 | 传递给解析器的关键字参数 | {"delimiter": ","} |
- |
schema |
字典 | 输入数据的模式 | 请参考下方 | - |
timestamp_column_name |
字符串 | 时间戳列的名称 | "timestamp" |
- |
以下 JSON 配置指定了如何在运行时通过控制消息任务将其他配置传递给加载器。
{
"type": "load",
"properties": {
"loader_id": "file_to_df",
"files": ["/path/to/input/files"],
"batcher_config": {
"timestamp_column_name": "timestamp_column_name",
"schema": "string",
"file_type": "JSON",
"filter_null": false,
"parser_kwargs": {
"delimiter": ","
},
"cache_dir": "/path/to/cache"
}
}
}
注意:file_batcher
模块当前在内部生成任务并将其分配给控制消息,然后将其发送到 DataLoader 模块,该模块使用 file_to_df_loader
。 声明了这一点后,此加载器配置是从 File Batcher 模块配置中获取的。