025、分布式计算实战:Spark Core与Spark SQL

张开发
2026/4/9 1:39:40 15 分钟阅读

分享文章

025、分布式计算实战:Spark Core与Spark SQL
从一次深夜调试说起上周三凌晨两点集群告警突然响了。一个跑了六小时的Spark作业卡在99%最后一个stage的200个task里总有那么三五个一直在挣扎。日志里满是FetchFailedException和ExecutorLost的报错数据倾斜那熟悉的味道隔着屏幕都能闻到。这种场景你肯定也遇到过——数据分布不均匀少数几个key扛了几千万条记录几个倒霉的executor内存直接撑爆。今天我们就聊聊怎么用Spark Core和Spark SQL解决这类实战问题。Spark Core理解你的并行引擎先看段真实的生产代码这是出问题的那个stage的简化版valrawRDDsc.textFile(hdfs://data/logs/*.gz).map(lineparseLog(line))// 解析日志返回(key, value).filter(_!null)// 过滤脏数据// 问题就出在这个groupByKey上valgroupedRDDrawRDD.groupByKey()// 这里踩过坑groupByKey默认不进行map端合并.mapValues(valuesprocessBatch(values))groupedRDD.saveAsTextFile(hdfs://output/result)看起来挺干净是吧问题在于groupByKey()会把某个key对应的所有values都拉到同一个节点上做聚合。如果某个key特别热比如user_id0默认用户或者cityunknown那个节点就惨了。改进方案一用reduceByKey替代// 先做map端局部聚合大幅减少shuffle数据量valreducedRDDrawRDD.map(kv(kv._1,1))// 假设我们只是计数.reduceByKey(__,200)// 第二个参数是分区数根据数据量调整改进方案二加盐打散倾斜key// 对热点key添加随机前缀valsaltedRDDrawRDD.map{case(key,value)if(isHotKey(key)){valsalt(math.random*100).toInt(s${key}_$salt,value)}else{(key,value)}}// 第一次聚合valfirstAggsaltedRDD.reduceByKey(mergeFunc)// 去掉盐值二次聚合valfinalResultfirstAgg.map{case(key,value)valoriginalKeykey.split(_)(0)(originalKey,value)}.reduceByKey(mergeFunc)分区数设置是个经验活。我的一般原则是每个分区的数据量控制在128MB以内但分区总数不要超过executor_cores * executor_instances * 3。别设太大调度开销会让你哭的。Spark SQL声明式编程的甜与苦切换到Spark SQL后代码清爽多了但坑一点没少。有次我写了这么个查询SELECTuser_id,COUNT(*)ascntFROMlogsLEFTJOINuser_infoONlogs.user_iduser_info.idWHEREuser_info.register_date2023-01-01GROUPBYuser_idORDERBYcntDESCLIMIT100看起来标准吧执行计划一出来就傻眼了——先做了全表JOIN然后才过滤register_date几百G的数据在集群里来回搬。Spark SQL的优化器没那么智能你得手动引导。优化后的写法WITHactive_usersAS(SELECTidFROMuser_infoWHEREregister_date2023-01-01-- 先过滤数据量降两个数量级)SELECTlogs.user_id,COUNT(*)ascntFROMlogsINNERJOINactive_usersONlogs.user_idactive_users.id-- 用INNER替代LEFTGROUPBYlogs.user_idORDERBYcntDESCLIMIT100这里还有个细节默认的spark.sql.shuffle.partitions是200对于大表JOIN来说太小了。我通常在作业开头加上spark.conf.set(spark.sql.shuffle.partitions,1000)spark.conf.set(spark.sql.autoBroadcastJoinThreshold,104857600)// 100MB适当调大广播JOIN用好了是神器用不好就是OOM炸弹。小表确实可以广播但别忘了算上序列化后的内存开销实际占用可能比文件大小多30%。调试技巧看透Spark UI遇到慢任务别急着改代码先看Spark UI。几个关键指标Stage页面的Shuffle Read/Write如果某个stage的write特别大考虑加map端聚合Executor页面的GC时间如果超过10%调大executor内存或者换G1GCSQL页面的执行计划看到BroadcastHashJoin就安心看到SortMergeJoin就要警惕数据倾斜我习惯在关键转换后加个cache()但一定记得后面要unpersist()。见过太多作业因为缓存了中间RDD内存占满后频繁spill到磁盘反而更慢了。配置经验谈这些参数是我压测出来的黄金组合适合大多数场景spark.executor.memory8g # 别设太大YARN的NM会不高兴 spark.executor.memoryOverhead2g # 堆外内存Parquet操作很需要 spark.serializerorg.apache.spark.serializer.KryoSerializer spark.sql.adaptive.enabledtrue # 自适应查询执行Spark 3.0后必开 spark.sql.files.maxPartitionBytes134217728 # 128MB和HDFS块对齐最后说几句Spark用起来像开车——自动挡简单但想开得快还得懂手动模式。别迷信DataFrame API就一定比RDD快复杂的多阶段处理里RDD的精细控制反而更有效。生产环境永远先跑小样本数据看看执行计划再全量跑。遇到数据倾斜别怕sample()取样分析key分布针对性打散。记住集群资源是有限的最优雅的实现往往不是最高效的在简洁性和性能之间找到平衡点这才是工程师的价值。下次我们聊聊Spark Streaming的容错机制那又是另一个深夜故事了。

更多文章