在实时计算Flink作业中,数据倾斜是一个常见的问题。它会导致某些Task实例处理的数据量远大于其他Task实例,从而影响作业的整体性能。通常表现为部分task cpu 持续100%,而其他task 运行正常。
数据倾斜通常是由于以下原因导致的:
Flink提供了多种并行度调整策略,可以根据实际情况选择合适的策略。
在作业提交之前,根据数据的总体情况,手动设置合适的并行度。 这种方法简单直接,适用于数据量相对稳定,倾斜程度不高的场景。
如何操作:
SET 'parallelism.default' = 128; 来设置默认并行度。env.setParallelism(128); 来设置默认并行度。.setParallelism(256) 来单独设置并行度。Flink可以根据作业的运行状态,自动调整并行度。 这种方法可以更好地应对数据量的变化和倾斜程度的变化。
如何操作:
execution.adaptive-scheduler.enabled: true
如果Key的分布不均匀,可以尝试重新分配Key,使数据更加均匀地分布到不同的Task实例上。
keyBy或rebalance进行手动分区:keyBy: 根据Key进行分区,将相同Key的数据发送到同一个Task实例。 适用于需要根据Key进行聚合、Join等操作的场景。rebalance: 将数据均匀地分配到不同的Task实例,可以缓解数据倾斜。适用于不需要根据Key进行聚合、Join等操作的场景。示例:
DataStream<Tuple2<String, Integer>> stream = ...;
// 使用keyBy进行分区
DataStream<Tuple2<String, Integer>> keyedStream = stream.keyBy(value -> value.f0);
// 使用rebalance进行重新分配
DataStream<Tuple2<String, Integer>> rebalancedStream = stream.rebalance();
可以编写自定义的Partitioner,根据Key的值,将数据分配到不同的Task实例。 这种方法可以根据实际情况,实现更灵活的分区策略。
示例:
public class MyPartitioner implements Partitioner<String> {
private int numPartitions;
@Override
public void setup(int numPartitions) {
this.numPartitions = numPartitions;
}
@Override
public int partition(String key, int numPartitions) {
// 根据Key的值,计算分区ID
return Math.abs(key.hashCode()) % numPartitions;
}
}
DataStream<Tuple2<String, Integer>> stream = ...;
// 使用自定义Partitioner进行分区
DataStream<Tuple2<String, Integer>> partitionedStream = stream.partitionCustom(new MyPartitioner(), value -> value.f0);
在进行聚合操作之前,可以先在每个Task实例上进行本地聚合,减少数据量,从而缓解数据倾斜。
如何操作:
reduce或aggregate进行本地聚合: 在keyBy之后,先使用reduce或aggregate进行本地聚合,然后再进行全局聚合。示例:
DataStream<Tuple2<String, Integer>> stream = ...;
// 使用keyBy进行分区
KeyedStream<Tuple2<String, Integer>, String> keyedStream = stream.keyBy(value -> value.f0);
// 使用reduce进行本地聚合
DataStream<Tuple2<String, Integer>> reducedStream = keyedStream.reduce((value1, value2) -> new Tuple2<>(value1.f0, value1.f1 + value2.f1));
// 使用aggregate进行本地聚合
DataStream<Tuple2<String, Integer>> aggregatedStream = keyedStream.aggregate(new AggregateFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> createAccumulator() {
return new Tuple2<>("", 0);
}
@Override
public Tuple2<String, Integer> add(Tuple2<String, Integer> value, Tuple2<String, Integer> accumulator) {
return new Tuple2<>(value.f0, accumulator.f1 + value.f1);
}
@Override
public Tuple2<String, Integer> getResult(Tuple2<String, Integer> accumulator) {
return accumulator;
}
@Override
public Tuple2<String, Integer> merge(Tuple2<String, Integer> a, Tuple2<String, Integer> b) {
return new Tuple2<>(a.f0, a.f1 + b.f1);
}
});
将聚合操作分为两个阶段:第一阶段进行本地聚合,第二阶段进行全局聚合。 这种方法可以有效地减少数据量,缓解数据倾斜。
如何操作:
map操作,将Key进行随机化处理,然后进行本地聚合。示例:
DataStream<Tuple2<String, Integer>> stream = ...;
// 第一阶段:本地聚合
DataStream<Tuple2<String, Integer>> localAggregatedStream = stream
.map(value -> new Tuple2<>(value.f0 + "_" + RandomUtils.nextInt(10), value.f1))
.keyBy(value -> value.f0)
.sum(1);
// 第二阶段:全局聚合
DataStream<Tuple2<String, Integer>> globalAggregatedStream = localAggregatedStream
.map(value -> new Tuple2<>(value.f0.substring(0, value.f0.lastIndexOf("_")), value.f1))
.keyBy(value -> value.f0)
.sum(1);
在使用Join操作时,可以使用Bloom Filter来过滤掉一些不可能Join上的数据,从而减少数据量,缓解数据倾斜。
如何操作:
需要注意的是,Bloom Filter存在一定的误判率,可能会导致一些可以Join上的数据被过滤掉。
将热点Key拆分成多个Key,然后将数据分配到不同的Task实例上。 这种方法可以有效地缓解数据倾斜。
如何操作:
key_1, key_2, key_3 等。示例:
DataStream<Tuple2<String, Integer>> stream = ...;
// 拆分热点Key
DataStream<Tuple2<String, Integer>> splittedStream = stream
.map(value -> {
if (value.f0.equals("hot_key")) {
return new Tuple2<>("hot_key_" + RandomUtils.nextInt(10), value.f1);
} else {
return value;
}
});
以上是一些常见的Flink作业数据倾斜并行度调整方案。 在实际应用中,需要根据具体情况选择合适的方案,并进行调整和优化。 监控作业的性能指标,例如CPU利用率、反压等,可以帮助你更好地了解作业的运行状态,并及时发现和解决数据倾斜问题。
希望这些方案能够帮助你解决Flink作业中的数据倾斜问题! Good luck! 👍