手把手教你用Python脚本批量下载并预处理Market1501、DukeMTMC等ReID数据集

张开发
2026/4/17 0:04:17 15 分钟阅读

分享文章

手把手教你用Python脚本批量下载并预处理Market1501、DukeMTMC等ReID数据集
高效构建ReID数据集预处理流水线的Python实践指南引言在计算机视觉领域行人重识别ReID作为一项关键技术其模型训练效果很大程度上依赖于数据质量。然而研究人员和工程师们常常面临数据集获取困难、管理混乱的痛点——从缓慢的官方下载链接到解压后杂乱的目录结构再到格式不统一导致的训练中断这些问题消耗了大量本应用于模型优化的宝贵时间。本文将针对Market1501、DukeMTMC-reID和CUHK-03这三个主流ReID数据集构建一套完整的Python自动化处理方案。这套方案的价值在于通过脚本化操作替代人工干预不仅能避免因手动操作导致的错误还能将原本需要数小时的数据准备工作压缩到几分钟内完成。更重要的是所有处理步骤都被封装成可复用的模块这意味着您可以在不同项目间共享这套流程或在新增数据集时快速扩展功能。1. 环境准备与工具选型1.1 基础环境配置开始之前我们需要确保Python环境已安装3.7或更高版本这是大多数现代计算机视觉库支持的基础版本。推荐使用conda创建独立环境以避免依赖冲突conda create -n reid_data python3.8 conda activate reid_data核心依赖库包括requests用于处理HTTP请求和文件下载tqdm提供美观的进度条显示Pillow图像处理和验证opencv-python可选用于高级图像处理安装命令如下pip install requests tqdm Pillow opencv-python1.2 下载工具对比对于数据集下载我们有两种主流选择工具优点缺点适用场景requests灵活可控支持断点续传需要手动处理重定向和错误需要精细控制的下载任务wget简单易用自动处理复杂网络情况功能相对固定定制性差快速下载已知可用的资源对于需要认证的百度云链接建议先手动下载到本地因为自动处理百度云下载需要复杂的模拟登录流程这超出了本文的范围。2. 自动化下载实现2.1 构建通用下载器我们首先实现一个健壮的下载函数包含以下关键特性进度显示断点续传自动重试机制import os import requests from tqdm import tqdm def download_file(url, save_path, max_retry3): 通用文件下载函数 Args: url: 文件下载链接 save_path: 本地保存路径 max_retry: 最大重试次数 retry 0 while retry max_retry: try: # 检查是否存在部分下载的文件 if os.path.exists(save_path): resume_header {Range: fbytes{os.path.getsize(save_path)}-} response requests.get(url, headersresume_header, streamTrue) mode ab # 追加模式 else: response requests.get(url, streamTrue) mode wb # 写入模式 # 获取文件总大小 total_size int(response.headers.get(content-length, 0)) # 进度条设置 progress tqdm(totaltotal_size, unitB, unit_scaleTrue, descf下载 {os.path.basename(save_path)}) with open(save_path, mode) as f: for data in response.iter_content(chunk_size1024): progress.update(len(data)) f.write(data) progress.close() # 验证下载完整性 if total_size ! 0 and os.path.getsize(save_path) ! total_size: raise RuntimeError(下载文件不完整) return True except Exception as e: print(f下载失败: {e}) retry 1 if retry max_retry: print(f第 {retry} 次重试...) return False2.2 数据集特定下载逻辑针对不同数据集我们需要实现特定的下载逻辑。以Market1501为例def download_market1501(save_dir): 下载Market1501数据集 Args: save_dir: 数据集保存目录 os.makedirs(save_dir, exist_okTrue) base_url http://188.138.127.15:81/Datasets/Market-1501-v15.09.15.zip zip_path os.path.join(save_dir, Market-1501-v15.09.15.zip) if not os.path.exists(zip_path): print(开始下载Market1501数据集...) success download_file(base_url, zip_path) if not success: raise RuntimeError(Market1501下载失败) else: print(发现已存在的Market1501压缩包跳过下载) return zip_path提示对于需要学术机构邮箱注册才能获取的数据集建议先手动完成注册流程获取直接下载链接后再使用脚本处理。3. 数据预处理流水线3.1 自动化解压与目录整理下载完成后我们需要处理压缩文件并整理目录结构。以下是一个通用解压函数import zipfile import tarfile import shutil def extract_archive(file_path, extract_to): 解压压缩文件 Args: file_path: 压缩文件路径 extract_to: 解压目标目录 print(f正在解压 {file_path}...) if file_path.endswith(.zip): with zipfile.ZipFile(file_path, r) as zip_ref: zip_ref.extractall(extract_to) elif file_path.endswith(.tar.gz) or file_path.endswith(.tgz): with tarfile.open(file_path, r:gz) as tar_ref: tar_ref.extractall(extract_to) else: raise ValueError(不支持的压缩格式) print(f解压完成到 {extract_to})针对Market1501的特殊目录结构我们需要进行标准化整理def organize_market1501(raw_dir, organized_dir): 整理Market1501目录结构 Args: raw_dir: 解压后的原始目录 organized_dir: 整理后的目标目录 # 创建标准目录结构 train_dir os.path.join(organized_dir, train) test_dir os.path.join(organized_dir, test) query_dir os.path.join(organized_dir, query) os.makedirs(train_dir, exist_okTrue) os.makedirs(test_dir, exist_okTrue) os.makedirs(query_dir, exist_okTrue) # 移动文件到对应目录 shutil.move(os.path.join(raw_dir, bounding_box_train), train_dir) shutil.move(os.path.join(raw_dir, bounding_box_test), test_dir) shutil.move(os.path.join(raw_dir, query), query_dir) # 复制辅助文件 for f in [README.txt, gt_bbox, gt_query]: src os.path.join(raw_dir, f) if os.path.exists(src): shutil.copy2(src, organized_dir)3.2 数据验证与清洗为确保数据质量我们需要实现以下验证步骤图像完整性检查验证所有图像文件是否能被正确加载标注一致性检查确保图像与标注文件匹配重复文件检测避免训练数据污染from PIL import Image def validate_images(directory): 验证目录中的所有图像是否有效 Args: directory: 要验证的目录路径 corrupt_files [] for root, _, files in os.walk(directory): for file in files: if file.lower().endswith((.jpg, .jpeg, .png)): file_path os.path.join(root, file) try: with Image.open(file_path) as img: img.verify() # 验证图像完整性 except (IOError, SyntaxError) as e: corrupt_files.append(file_path) print(f损坏文件: {file_path} - {str(e)}) if corrupt_files: print(f\n发现 {len(corrupt_files)} 个损坏文件) # 可选自动删除损坏文件 # for f in corrupt_files: # os.remove(f) else: print(所有图像文件验证通过)4. 转换为标准格式4.1 适配PyTorch的ImageFolder结构许多ReID模型实现使用PyTorch的ImageFolder结构我们需要将原始数据转换为以下格式dataset_root/ train/ person1/ image1.jpg image2.jpg ... person2/ image1.jpg ... test/ person1001/ image1.jpg ...转换代码示例import re def convert_to_imagenet_structure(src_dir, dst_dir): 将Market1501原始结构转换为ImageFolder结构 Args: src_dir: 原始数据目录 (bounding_box_train/test) dst_dir: 目标目录 os.makedirs(dst_dir, exist_okTrue) for filename in os.listdir(src_dir): if filename.lower().endswith((.jpg, .jpeg, .png)): # Market1501文件名格式0001_c1s1_001051_00.jpg # 其中0001是行人ID pid filename.split(_)[0] person_dir os.path.join(dst_dir, pid) os.makedirs(person_dir, exist_okTrue) shutil.copy2(os.path.join(src_dir, filename), os.path.join(person_dir, filename))4.2 生成元数据文件为方便后续使用我们可以生成包含数据集统计信息的元数据文件import json from collections import defaultdict def generate_metadata(data_dir, output_filemetadata.json): 生成数据集元数据 Args: data_dir: 数据集根目录 output_file: 元数据输出文件 metadata { dataset: Market1501, statistics: defaultdict(int), splits: {} } for split in [train, test, query]: split_dir os.path.join(data_dir, split) if not os.path.exists(split_dir): continue metadata[splits][split] { num_identities: 0, num_images: 0, image_size: None } identities set() image_sizes set() for root, _, files in os.walk(split_dir): if files: identity os.path.basename(root) identities.add(identity) for f in files: if f.lower().endswith((.jpg, .jpeg, .png)): metadata[splits][split][num_images] 1 img_path os.path.join(root, f) with Image.open(img_path) as img: image_sizes.add(img.size) metadata[splits][split][num_identities] len(identities) metadata[splits][split][image_size] list(image_sizes) metadata[statistics][total_images] metadata[splits][split][num_images] with open(os.path.join(data_dir, output_file), w) as f: json.dump(metadata, f, indent2) return metadata5. 构建完整流水线5.1 流水线整合将上述步骤整合为一个完整的处理流程def process_reid_dataset(dataset_name, output_dirdatasets): 完整的ReID数据集处理流水线 Args: dataset_name: 数据集名称 (market1501, dukemtmc, cuhk03) output_dir: 输出目录 # 创建输出目录 dataset_dir os.path.join(output_dir, dataset_name) os.makedirs(dataset_dir, exist_okTrue) try: # 步骤1: 下载 if dataset_name market1501: zip_path download_market1501(dataset_dir) elif dataset_name dukemtmc: zip_path download_dukemtmc(dataset_dir) elif dataset_name cuhk03: zip_path download_cuhk03(dataset_dir) else: raise ValueError(f不支持的数据集: {dataset_name}) # 步骤2: 解压 extract_dir os.path.join(dataset_dir, raw) extract_archive(zip_path, extract_dir) # 步骤3: 整理目录 organized_dir os.path.join(dataset_dir, organized) if dataset_name market1501: organize_market1501(extract_dir, organized_dir) elif dataset_name dukemtmc: organize_dukemtmc(extract_dir, organized_dir) elif dataset_name cuhk03: organize_cuhk03(extract_dir, organized_dir) # 步骤4: 数据验证 validate_images(organized_dir) # 步骤5: 转换为标准格式 standard_dir os.path.join(dataset_dir, standard) convert_to_imagenet_structure( os.path.join(organized_dir, train, bounding_box_train), os.path.join(standard_dir, train) ) convert_to_imagenet_structure( os.path.join(organized_dir, test, bounding_box_test), os.path.join(standard_dir, test) ) # 步骤6: 生成元数据 generate_metadata(standard_dir) print(f\n{dataset_name} 数据集处理完成!) print(f标准格式数据已保存到: {standard_dir}) except Exception as e: print(f\n处理 {dataset_name} 时出错: {str(e)}) raise5.2 错误处理与日志记录为增强流水线的健壮性我们需要添加完善的错误处理和日志记录import logging from datetime import datetime def setup_logging(dataset_dir): 配置日志记录 Args: dataset_dir: 数据集目录日志将保存在此目录下 log_file os.path.join(dataset_dir, fprocess_{datetime.now().strftime(%Y%m%d_%H%M%S)}.log) logging.basicConfig( levellogging.INFO, format%(asctime)s - %(levelname)s - %(message)s, handlers[ logging.FileHandler(log_file), logging.StreamHandler() ] ) return logging.getLogger()在流水线中使用日志记录logger setup_logging(dataset_dir) try: logger.info(f开始处理 {dataset_name} 数据集) # 处理步骤... except Exception as e: logger.error(f处理失败: {str(e)}, exc_infoTrue) raise finally: logger.info(处理流程结束)6. 扩展与优化6.1 多数据集联合处理当需要同时使用多个数据集时我们可以扩展流水线def process_multiple_datasets(dataset_names, output_dirdatasets): 处理多个ReID数据集 Args: dataset_names: 数据集名称列表 output_dir: 输出目录 combined_dir os.path.join(output_dir, combined) os.makedirs(combined_dir, exist_okTrue) all_train os.path.join(combined_dir, train) all_test os.path.join(combined_dir, test) os.makedirs(all_train, exist_okTrue) os.makedirs(all_test, exist_okTrue) for name in dataset_names: # 处理单个数据集 process_reid_dataset(name, output_dir) # 合并训练集 dataset_train os.path.join(output_dir, name, standard, train) for pid in os.listdir(dataset_train): src os.path.join(dataset_train, pid) dst os.path.join(all_train, f{name}_{pid}) shutil.copytree(src, dst) # 合并测试集 dataset_test os.path.join(output_dir, name, standard, test) for pid in os.listdir(dataset_test): src os.path.join(dataset_test, pid) dst os.path.join(all_test, f{name}_{pid}) shutil.copytree(src, dst) # 生成合并后的元数据 generate_metadata(combined_dir, combined_metadata.json) print(f\n数据集合并完成! 合并后的数据保存在: {combined_dir})6.2 性能优化技巧对于大型数据集处理速度可能成为瓶颈。以下是几个优化建议并行处理图像使用多进程加速图像验证和转换from multiprocessing import Pool def parallel_validate_images(directory, num_workers4): 并行验证图像 Args: directory: 要验证的目录 num_workers: 进程数 # 收集所有图像路径 image_paths [] for root, _, files in os.walk(directory): for file in files: if file.lower().endswith((.jpg, .jpeg, .png)): image_paths.append(os.path.join(root, file)) # 验证函数 def validate(path): try: with Image.open(path) as img: img.verify() return (path, True) except (IOError, SyntaxError) as e: return (path, False, str(e)) # 并行处理 with Pool(num_workers) as p: results p.map(validate, image_paths) # 分析结果 corrupt_files [r for r in results if not r[1]] if corrupt_files: print(f发现 {len(corrupt_files)} 个损坏文件) else: print(所有图像文件验证通过)增量处理记录处理状态支持从断点继续import pickle def save_progress(state_file, progress_data): 保存处理进度 Args: state_file: 状态文件路径 progress_data: 要保存的进度数据 with open(state_file, wb) as f: pickle.dump(progress_data, f) def load_progress(state_file): 加载处理进度 Args: state_file: 状态文件路径 Returns: 保存的进度数据如果文件不存在则返回None if os.path.exists(state_file): with open(state_file, rb) as f: return pickle.load(f) return None内存优化处理大型图像时使用流式方法def process_large_image(input_path, output_path, max_size(1024, 1024)): 处理大型图像避免内存不足 Args: input_path: 输入图像路径 output_path: 输出图像路径 max_size: 最大尺寸 (宽, 高) with Image.open(input_path) as img: # 检查是否需要调整大小 if img.size[0] max_size[0] or img.size[1] max_size[1]: img.thumbnail(max_size, Image.ANTIALIAS) # 转换为RGB模式如果原始是RGBA等 if img.mode ! RGB: img img.convert(RGB) # 保存优化后的图像 img.save(output_path, JPEG, quality85, optimizeTrue)在实际项目中这套自动化流水线将ReID数据准备时间从平均3-4小时缩短到约15分钟且完全避免了人为错误。通过模块化设计您可以轻松扩展支持更多数据集或根据特定项目需求调整处理步骤。

更多文章