【架构实战】数据湖架构设计与实践

张开发
2026/4/5 17:18:42 15 分钟阅读

分享文章

【架构实战】数据湖架构设计与实践
一、为什么需要数据湖传统数据仓库的局限性- 结构化数据为主难以处理非结构化数据- Schema固定难以适应业务变化- 数据预处理后丢失细节- 存储成本高数据湖的特点- 存储各种类型数据结构化、半结构化、非结构化- Schema-on-Read读时模式- 保留原始数据可重复计算- 降低数据存储成本## 二、数据湖架构### 1. 核心组件数据源├── 日志系统Kafka├── 业务数据库MySQL/PostgreSQL├── 埋点数据App/Web├── 外部API└── 文件上传S3/HDFS ↓数据采集层├── Flume日志├── DebeziumCDC├── Sqoop批量└── Kafka Connect ↓数据存储层├── 对象存储MinIO/S3/HDFS├── Delta Lake/Iceberg/Hudi└── 数据目录Apache Atlas ↓数据处理层├── Spark├── Flink└── Presto/Trino ↓数据应用层├── BI报表├── 数据科学└── ML平台### 2. 技术选型对比| 组件 | 选项 | 推荐 ||------|------|------|| 存储 | HDFS/S3/MinIO | S3云/MinIO私有 || 格式 | Parquet/ORC/Avro | Parquet || 表格式 | Delta/Iceberg/Hudi | Delta Lake || 查询引擎 | Presto/Trino/Spark | Trino || 元数据 | Hive Metastore/Glue | Hive Metastore |## 三、Delta Lake实战### 1. Delta Lake简介Delta Lake是Databricks开源的存储层提供- ACID事务- 可扩展元数据处理- 时间旅行Time Travel- 模式强制和演化- 流批一体### 2. Spark Delta Lake依赖配置xmldependency groupIdio.delta/groupId artifactIddelta-core_2.12/artifactId version2.4.0/version/dependency写入数据pythonfrom delta.tables import DeltaTablefrom pyspark.sql import SparkSessionspark SparkSession.builder \ .appName(DataLakeDemo) \ .config(spark.sql.extensions, io.delta.sql.DeltaSparkSessionExtension) \ .config(spark.sql.catalog.spark_catalog, org.apache.spark.sql.delta.catalog.DeltaCatalog) \ .getOrCreate()# 批量写入df spark.read.format(json).load(/data/events/*.json)df.write \ .format(delta) \ .mode(overwrite) \ .partitionBy(date, event_type) \ .option(mergeSchema, true) \ .save(/delta/events)流式写入python# Kafka - Delta Lakestreaming_df spark.readStream \ .format(kafka) \ .option(kafka.bootstrap.servers, localhost:9092) \ .option(subscribe, events) \ .load()query streaming_df.selectExpr(CAST(key AS STRING), CAST(value AS STRING)) \ .writeStream \ .format(delta) \ .option(checkpointLocation, /delta/events/_checkpoints) \ .outputMode(append) \ .start(/delta/events)读取数据python# 读取最新数据df spark.read.format(delta).load(/delta/events)# 时间旅行 - 读取历史版本df_v1 spark.read \ .format(delta) \ .option(versionAsOf, 1) \ .load(/delta/events)# 时间旅行 - 读取指定时间点df_before spark.read \ .format(delta) \ .option(timestampAsOf, 2024-01-01T00:00:00) \ .load(/delta/events)增量读取python# 获取增量数据deltaTable DeltaTable.forPath(spark, /delta/events)# 只读取新数据newDF deltaTable.toDF().filter(col(date) 2024-01-15)### 3. UPSERT操作pythonfrom delta.tables import DeltaTable# Merge操作UPSERTdeltaTable DeltaTable.forPath(spark, /delta/users)deltaTable.alias(old).merge( updatesDF.alias(new), old.user_id new.user_id).whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()### 4. 数据优化python# VACUUM - 删除不需要的文件保留7天deltaTable.vacuum(retentionHours 168)# OPTIMIZE - 优化小文件deltaTable.optimize().where(date 2024-01-15).zOrderBy(user_id).execute()## 四、Hudi实战### 1. Hudi简介HudiHadoop Upsert Delete and Incremental特点- 支持UPSERT/DELETE- 增量拉取- 多种表类型Copy on Write / Merge on Read### 2. Spark Hudipythonfrom pyspark.sql import SparkSessionspark SparkSession.builder \ .appName(HudiDemo) \ .config(spark.serializer, org.apache.spark.serializer.KryoSerializer) \ .config(spark.sql.extensions, org.apache.spark.sql.hudi.HoodieSparkSessionExtension) \ .getOrCreate()# 写入数据hoodie_options { hoodie.table.name: events, hoodie.datasource.write.recordkey.field: id, hoodie.datasource.write.partitionpath.field: date, hoodie.datasource.write.table.type: COPY_ON_WRITE, hoodie.datasource.write.operation: bulk_insert, hoodie.datasource.write.precombine.field: ts, hoodie.upsert.shuffle.parallelism: 200, hoodie.insert.shuffle.parallelism: 200}df.write \ .format(hudi) \ .options(**hoodie_options) \ .mode(append) \ .save(hdfs://namenode:8020/hudi/events)### 3. 增量拉取python# 增量拉取spark.read \ .format(hudi) \ .load(hdfs://namenode:8020/hudi/events) \ .createOrReplaceTempView(hudi_events_snapshot)# 获取指定时间点后的数据incremental_df spark.sql( SELECT * FROM hudi_events_snapshot WHERE hoodie_commit_time 20240115120000)## 五、数据湖最佳实践### 1. 表设计sql-- 使用分区表CREATE TABLE events ( id BIGINT, user_id BIGINT, event_type STRING, properties STRING, event_time TIMESTAMP)USING deltaPARTITIONED BY (date STRING, event_type STRING)LOCATION /delta/events-- 配置Z-Order优化OPTIMIZE eventsWHERE date 2024-01-15ZORDER BY (user_id, event_time)### 2. 数据治理python# 数据质量检查from great_expectations import GreatExpectationscontext GreatExpectations()checkpoint context.get_checkpoint(events_quality)results checkpoint.run( batch_request{ datasource_name: my_datasource, data_asset_name: events, })if not results[success]: # 发送告警 send_alert(results[failed_expectations])### 3. 权限控制sql-- 基于列的权限控制GRANT SELECT(event_time, event_type) ON events TO analyst_role;GRANT SELECT ON events TO data_scientist_role;-- 基于行的权限控制CREATE TABLE events_filtered ASSELECT * FROM eventsWHERE CASE WHEN current_user() admin THEN true ELSE date 2024-01-01END## 六、总结数据湖是现代数据平台的核心-Lakehouse结合数据湖和数据仓库优点-Delta Lake成熟的表格式支持ACID-Hudi适合CDC场景支持增量处理-最佳实践分区、Z-Order、数据质量**实施建议**1. 评估数据量和业务需求2. 选择合适的表格式3. 设计合理的分区策略4. 建立数据质量监控—个人观点仅供参考

更多文章