[case52] A look at Flink KeyedStream's aggregation operations
Published: 2019-06-21


This article takes a look at the aggregation operations of Flink's KeyedStream.

Example

```java
@Test
public void testMax() throws Exception {
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // WordCount (a POJO) and LOGGER are defined elsewhere in the test class
    WordCount[] data = new WordCount[]{
            new WordCount(1, "Hello", 1),
            new WordCount(1, "World", 3),
            new WordCount(2, "Hello", 1)};
    env.fromElements(data)
            // partition the stream by the "word" field
            .keyBy("word")
            // rolling maximum of the "frequency" field for each key
            .max("frequency")
            .addSink(new SinkFunction<WordCount>() {
                @Override
                public void invoke(WordCount value, Context context) throws Exception {
                    LOGGER.info("value:{}", value);
                }
            });
    env.execute("testMax");
}
```
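  • Here the stream is first keyed by the word field, and then KeyedStream's max method keeps a rolling maximum of the frequency field for each word. Note that max only updates the frequency field: the other fields of the emitted WordCount come from the first element seen for that key (maxBy returns the whole element with the largest frequency instead).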
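To make the logged values concrete, here is a plain-Java model of what the pipeline computes, with no Flink dependency. It is a sketch under assumptions: the `WordCount` class below is a hypothetical stand-in for the test's POJO, a `HashMap` stands in for Flink's keyed state, and the update rule follows the MAX branch of `ComparableAggregator.reduce`, where only the aggregated field of the stored element is overwritten.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RollingMaxModel {

    // Hypothetical stand-in for the WordCount POJO used in testMax
    static class WordCount {
        int id;
        String word;
        int frequency;

        WordCount(int id, String word, int frequency) {
            this.id = id;
            this.word = word;
            this.frequency = frequency;
        }

        @Override
        public String toString() {
            return "WordCount(" + id + "," + word + "," + frequency + ")";
        }
    }

    // Emits one result per input element, like a keyed rolling aggregation
    static List<String> rollingMax(WordCount[] input) {
        Map<String, WordCount> state = new HashMap<>(); // per-key "state"
        List<String> emitted = new ArrayList<>();
        for (WordCount in : input) {
            WordCount current = state.get(in.word);
            if (current == null) {
                // first element for this key becomes the stored state
                // (copied here so the input objects are not mutated)
                current = new WordCount(in.id, in.word, in.frequency);
            } else if (!(current.frequency > in.frequency)) {
                // MAX semantics: overwrite only the aggregated field when the stored
                // value is not strictly larger; id and word keep their first values
                current.frequency = in.frequency;
            }
            state.put(in.word, current);
            emitted.add(current.toString());
        }
        return emitted;
    }

    public static void main(String[] args) {
        WordCount[] data = {
            new WordCount(1, "Hello", 1),
            new WordCount(1, "World", 3),
            new WordCount(2, "Hello", 1)
        };
        rollingMax(data).forEach(System.out::println);
    }
}
```

For the sample data this prints `WordCount(1,Hello,1)`, `WordCount(1,World,3)` and `WordCount(1,Hello,1)`: the third output keeps id 1 from the first "Hello" element even though the incoming element had id 2.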

KeyedStream.aggregate

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/KeyedStream.java

```java
public SingleOutputStreamOperator<T> sum(int positionToSum) {
    return aggregate(new SumAggregator<>(positionToSum, getType(), getExecutionConfig()));
}

public SingleOutputStreamOperator<T> sum(String field) {
    return aggregate(new SumAggregator<>(field, getType(), getExecutionConfig()));
}

public SingleOutputStreamOperator<T> max(int positionToMax) {
    return aggregate(new ComparableAggregator<>(positionToMax, getType(), AggregationFunction.AggregationType.MAX,
            getExecutionConfig()));
}

public SingleOutputStreamOperator<T> max(String field) {
    return aggregate(new ComparableAggregator<>(field, getType(), AggregationFunction.AggregationType.MAX,
            false, getExecutionConfig()));
}

public SingleOutputStreamOperator<T> min(int positionToMin) {
    return aggregate(new ComparableAggregator<>(positionToMin, getType(), AggregationFunction.AggregationType.MIN,
            getExecutionConfig()));
}

public SingleOutputStreamOperator<T> min(String field) {
    return aggregate(new ComparableAggregator<>(field, getType(), AggregationFunction.AggregationType.MIN,
            false, getExecutionConfig()));
}

public SingleOutputStreamOperator<T> maxBy(int positionToMaxBy) {
    return this.maxBy(positionToMaxBy, true);
}

public SingleOutputStreamOperator<T> maxBy(String positionToMaxBy) {
    return this.maxBy(positionToMaxBy, true);
}

public SingleOutputStreamOperator<T> maxBy(int positionToMaxBy, boolean first) {
    return aggregate(new ComparableAggregator<>(positionToMaxBy, getType(), AggregationFunction.AggregationType.MAXBY, first,
            getExecutionConfig()));
}

public SingleOutputStreamOperator<T> maxBy(String field, boolean first) {
    return aggregate(new ComparableAggregator<>(field, getType(), AggregationFunction.AggregationType.MAXBY,
            first, getExecutionConfig()));
}

public SingleOutputStreamOperator<T> minBy(int positionToMinBy) {
    return this.minBy(positionToMinBy, true);
}

public SingleOutputStreamOperator<T> minBy(String positionToMinBy) {
    return this.minBy(positionToMinBy, true);
}

public SingleOutputStreamOperator<T> minBy(int positionToMinBy, boolean first) {
    return aggregate(new ComparableAggregator<T>(positionToMinBy, getType(), AggregationFunction.AggregationType.MINBY, first,
            getExecutionConfig()));
}

public SingleOutputStreamOperator<T> minBy(String field, boolean first) {
    return aggregate(new ComparableAggregator(field, getType(), AggregationFunction.AggregationType.MINBY,
            first, getExecutionConfig()));
}

protected SingleOutputStreamOperator<T> aggregate(AggregationFunction<T> aggregate) {
    StreamGroupedReduce<T> operator = new StreamGroupedReduce<T>(
            clean(aggregate), getType().createSerializer(getExecutionConfig()));
    return transform("Keyed Aggregation", getType(), operator);
}
```
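  • KeyedStream's aggregate method is protected. The public sum, max, min, maxBy and minBy methods all delegate to it and differ only in the AggregationFunction they pass in: sum creates a SumAggregator, while max, min, maxBy and minBy create a ComparableAggregator with AggregationType MAX, MIN, MAXBY and MINBY respectively. aggregate then wraps that function in a StreamGroupedReduce operator named "Keyed Aggregation".
  • Each of sum, max, min, maxBy and minBy has an overload taking an int (the field position) and one taking a String (the field name).
  • maxBy and minBy additionally have overloads with a boolean first parameter, which decides whether the first element is returned when several elements have equal compare values; the overloads without it pass true. For max and min the flag is always false and has no effect.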
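Because aggregate only wraps a ReduceFunction in a StreamGroupedReduce operator, the runtime behaviour can be approximated in plain Java as a per-key rolling reduce. This is a simplified model, not Flink code: the class name `KeyedRollingReduce` is hypothetical, a `HashMap` stands in for keyed `ValueState`, and `BinaryOperator` stands in for `ReduceFunction`. It follows the same rule as the operator: the first element for a key is stored unchanged, each later element is reduced as `reduce(current, incoming)`, and every result is emitted.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;
import java.util.function.Function;

public class KeyedRollingReduce<K, T> {

    private final Function<T, K> keySelector;
    private final BinaryOperator<T> reducer;         // stands in for ReduceFunction<T>
    private final Map<K, T> state = new HashMap<>(); // stands in for keyed ValueState<T>

    public KeyedRollingReduce(Function<T, K> keySelector, BinaryOperator<T> reducer) {
        this.keySelector = keySelector;
        this.reducer = reducer;
    }

    // Processes one element and returns the value that would be emitted downstream
    public T process(T value) {
        K key = keySelector.apply(value);
        T current = state.get(key);
        // first element for a key is stored as-is; later ones are reduced with the stored value
        T result = (current == null) ? value : reducer.apply(current, value);
        state.put(key, result);
        return result;
    }

    public static void main(String[] args) {
        // Rolling sum per key over (key, amount) pairs encoded as int[]{key, amount}
        KeyedRollingReduce<Integer, int[]> op = new KeyedRollingReduce<>(
                v -> v[0],
                (a, b) -> new int[]{a[0], a[1] + b[1]});
        List<Integer> sums = new ArrayList<>();
        for (int[] v : new int[][]{{1, 10}, {2, 5}, {1, 7}}) {
            sums.add(op.process(v)[1]);
        }
        System.out.println(sums); // [10, 5, 17]
    }
}
```

The output shows that a keyed aggregation is a rolling one: every input produces an output, and only elements with the same key are combined.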

ComparableAggregator

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/functions/aggregation/ComparableAggregator.java

```java
@Internal
public class ComparableAggregator<T> extends AggregationFunction<T> {

    private static final long serialVersionUID = 1L;

    private Comparator comparator;
    private boolean byAggregate;
    private boolean first;
    private final FieldAccessor<T, Object> fieldAccessor;

    private ComparableAggregator(AggregationType aggregationType, FieldAccessor<T, Object> fieldAccessor, boolean first) {
        this.comparator = Comparator.getForAggregation(aggregationType);
        this.byAggregate = (aggregationType == AggregationType.MAXBY) || (aggregationType == AggregationType.MINBY);
        this.first = first;
        this.fieldAccessor = fieldAccessor;
    }

    public ComparableAggregator(int positionToAggregate,
            TypeInformation<T> typeInfo,
            AggregationType aggregationType,
            ExecutionConfig config) {
        this(positionToAggregate, typeInfo, aggregationType, false, config);
    }

    public ComparableAggregator(int positionToAggregate,
            TypeInformation<T> typeInfo,
            AggregationType aggregationType,
            boolean first,
            ExecutionConfig config) {
        this(aggregationType, FieldAccessorFactory.getAccessor(typeInfo, positionToAggregate, config), first);
    }

    public ComparableAggregator(String field,
            TypeInformation<T> typeInfo,
            AggregationType aggregationType,
            boolean first,
            ExecutionConfig config) {
        this(aggregationType, FieldAccessorFactory.getAccessor(typeInfo, field, config), first);
    }

    @SuppressWarnings("unchecked")
    @Override
    public T reduce(T value1, T value2) throws Exception {
        Comparable<Object> o1 = (Comparable<Object>) fieldAccessor.get(value1);
        Object o2 = fieldAccessor.get(value2);

        int c = comparator.isExtremal(o1, o2);

        if (byAggregate) {
            // if they are the same we choose based on whether we want to first or last
            // element with the min/max.
            if (c == 0) {
                return first ? value1 : value2;
            }

            return c == 1 ? value1 : value2;

        } else {
            if (c == 0) {
                value1 = fieldAccessor.set(value1, o2);
            }
            return value1;
        }
    }
}
```
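  • ComparableAggregator extends AggregationFunction, which in turn implements the ReduceFunction interface. In its reduce method, value1 is the value accumulated so far for the key and value2 is the incoming element. It first uses the Comparator to compare the aggregated field of the two, then branches on byAggregate (true for MAXBY and MINBY). If byAggregate is true and the compare result is 0 (a tie), it returns value1 when first is true and value2 otherwise; for a non-zero result it returns the whole element holding the extremal value (value1 for 1, value2 for -1). If byAggregate is false and the compare result is 0 (value1's field is not strictly more extremal, e.g. value1 is less than or equal to value2 for MAX), it writes value2's field value into value1 via FieldAccessor.set (which uses reflection for POJO fields); in every case it returns value1.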
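The branching in reduce is easiest to see by folding the same input with max and maxBy. Below is a minimal plain-Java model, assuming a hypothetical `Item` record whose `score` field plays the role of the aggregated field; `isExtremal` re-implements the four comparator rules inline, and `fold` applies `reduce` left to right the way a rolling aggregation does. It is an illustration of the logic, not the Flink classes themselves.

```java
import java.util.Arrays;
import java.util.List;

public class AggregatorModel {

    // Hypothetical record type: 'score' is the field being aggregated
    static final class Item {
        final String name;
        final int score;

        Item(String name, int score) {
            this.name = name;
            this.score = score;
        }

        @Override
        public String toString() {
            return name + ":" + score;
        }
    }

    enum Type { MAX, MIN, MAXBY, MINBY }

    // Same rules as Comparator.isExtremal: MAX/MIN return 1 or 0, MAXBY/MINBY return 1, 0 or -1
    static int isExtremal(Type type, int o1, int o2) {
        int c = Integer.compare(o1, o2);
        switch (type) {
            case MAX:   return c > 0 ? 1 : 0;
            case MIN:   return c < 0 ? 1 : 0;
            case MAXBY: return c > 0 ? 1 : (c == 0 ? 0 : -1);
            case MINBY: return c < 0 ? 1 : (c == 0 ? 0 : -1);
            default:    throw new IllegalArgumentException("Unsupported aggregation type.");
        }
    }

    // Mirrors the branching of ComparableAggregator.reduce(value1, value2)
    static Item reduce(Type type, boolean first, Item value1, Item value2) {
        int c = isExtremal(type, value1.score, value2.score);
        boolean byAggregate = type == Type.MAXBY || type == Type.MINBY;
        if (byAggregate) {
            if (c == 0) {
                return first ? value1 : value2; // tie: keep the earlier or the later element
            }
            return c == 1 ? value1 : value2;    // the whole extremal element wins
        }
        if (c == 0) {
            // max/min: only the field moves into value1 (Flink uses FieldAccessor.set;
            // a new Item is built here so the inputs stay unchanged)
            value1 = new Item(value1.name, value2.score);
        }
        return value1;
    }

    // Folds a sequence the way a rolling aggregation would and returns the last result
    static Item fold(Type type, boolean first, List<Item> items) {
        Item acc = items.get(0);
        for (int i = 1; i < items.size(); i++) {
            acc = reduce(type, first, acc, items.get(i));
        }
        return acc;
    }

    public static void main(String[] args) {
        List<Item> items = Arrays.asList(new Item("a", 1), new Item("b", 3), new Item("c", 3));
        System.out.println(fold(Type.MAX, false, items));   // a:3
        System.out.println(fold(Type.MAXBY, true, items));  // b:3
        System.out.println(fold(Type.MAXBY, false, items)); // c:3
    }
}
```

max reports the right value but keeps the name of the first element, while maxBy returns a real input element, and first decides which of the tied elements b and c wins.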

AggregationFunction

```java
@Internal
public abstract class AggregationFunction<T> implements ReduceFunction<T> {
    private static final long serialVersionUID = 1L;

    /**
     * Aggregation types that can be used on a windowed stream or keyed stream.
     */
    public enum AggregationType {
        SUM, MIN, MAX, MINBY, MAXBY,
    }
}
```
  • AggregationFunction declares that it implements ReduceFunction and defines the five AggregationType values: SUM, MIN, MAX, MINBY and MAXBY.

Comparator

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/functions/aggregation/Comparator.java

```java
@Internal
public abstract class Comparator implements Serializable {

    private static final long serialVersionUID = 1L;

    public abstract <R> int isExtremal(Comparable<R> o1, R o2);

    public static Comparator getForAggregation(AggregationType type) {
        switch (type) {
            case MAX:
                return new MaxComparator();
            case MIN:
                return new MinComparator();
            case MINBY:
                return new MinByComparator();
            case MAXBY:
                return new MaxByComparator();
            default:
                throw new IllegalArgumentException("Unsupported aggregation type.");
        }
    }

    private static class MaxComparator extends Comparator {

        private static final long serialVersionUID = 1L;

        @Override
        public <R> int isExtremal(Comparable<R> o1, R o2) {
            return o1.compareTo(o2) > 0 ? 1 : 0;
        }
    }

    private static class MaxByComparator extends Comparator {

        private static final long serialVersionUID = 1L;

        @Override
        public <R> int isExtremal(Comparable<R> o1, R o2) {
            int c = o1.compareTo(o2);
            if (c > 0) {
                return 1;
            }
            if (c == 0) {
                return 0;
            } else {
                return -1;
            }
        }
    }

    private static class MinByComparator extends Comparator {

        private static final long serialVersionUID = 1L;

        @Override
        public <R> int isExtremal(Comparable<R> o1, R o2) {
            int c = o1.compareTo(o2);
            if (c < 0) {
                return 1;
            }
            if (c == 0) {
                return 0;
            } else {
                return -1;
            }
        }
    }

    private static class MinComparator extends Comparator {

        private static final long serialVersionUID = 1L;

        @Override
        public <R> int isExtremal(Comparable<R> o1, R o2) {
            return o1.compareTo(o2) < 0 ? 1 : 0;
        }
    }
}
```
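  • Comparator implements the Serializable interface, declares the abstract isExtremal method, and provides the getForAggregation factory method, which creates a different Comparator for each AggregationType (MAX, MIN, MAXBY, MINBY; any other type, including SUM, throws an IllegalArgumentException).
  • Comparator defines four subclasses, MaxComparator, MinComparator, MinByComparator and MaxByComparator, each of which implements isExtremal.
  • MaxComparator relies directly on the compareTo method of the Comparable interface, but it only ever returns 0 or 1: it returns 1 when compareTo is greater than 0 and 0 otherwise, i.e. 1 only in the "greater than" case. MaxByComparator also starts from compareTo but has three return values: 1 when greater, 0 when equal and -1 when less. MinComparator and MinByComparator mirror these with the comparison reversed.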
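The return codes of all four comparators can be tabulated with a small stand-alone model. This sketch assumes a local `AggregationType` enum that omits SUM (getForAggregation rejects it anyway) and implements the four rules in one static method instead of Flink's subclasses; because it takes any `Comparable<R>`, it also shows that the rules apply to non-numeric fields such as Strings.

```java
public class ComparatorModel {

    // Local enum mirroring the types getForAggregation accepts (SUM omitted)
    enum AggregationType { MAX, MIN, MAXBY, MINBY }

    // One method standing in for the four Comparator subclasses
    static <R> int isExtremal(AggregationType type, Comparable<R> o1, R o2) {
        int c = o1.compareTo(o2);
        switch (type) {
            case MAX:   return c > 0 ? 1 : 0;                 // MaxComparator
            case MIN:   return c < 0 ? 1 : 0;                 // MinComparator
            case MAXBY: return c > 0 ? 1 : (c == 0 ? 0 : -1); // MaxByComparator
            case MINBY: return c < 0 ? 1 : (c == 0 ? 0 : -1); // MinByComparator
            default:    throw new IllegalArgumentException("Unsupported aggregation type.");
        }
    }

    public static void main(String[] args) {
        // Columns: o1 < o2, o1 == o2, o1 > o2 (lexicographic String comparison)
        String[][] pairs = {{"a", "b"}, {"b", "b"}, {"c", "b"}};
        for (AggregationType type : AggregationType.values()) {
            StringBuilder row = new StringBuilder(type + ":");
            for (String[] p : pairs) {
                row.append(' ').append(isExtremal(type, p[0], p[1]));
            }
            System.out.println(row);
        }
    }
}
```

Running it prints `MAX: 0 0 1`, `MIN: 1 0 0`, `MAXBY: -1 0 1` and `MINBY: 1 0 -1`. The two-valued MAX/MIN comparators fold "equal" and "worse" into 0, which is exactly the case where reduce copies the field into value1.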

Summary

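  • KeyedStream's aggregation operations are the sum, max, min, maxBy and minBy methods. All of them call the protected aggregate method; sum passes a SumAggregator, while max, min, maxBy and minBy pass a ComparableAggregator with AggregationType MAX, MIN, MAXBY and MINBY respectively.
  • ComparableAggregator extends AggregationFunction, which implements the ReduceFunction interface. Its reduce method first compares the two values with a Comparator and then branches on byAggregate. For byAggregate (maxBy/minBy), a compare result of 0 returns the first-encountered element if first is true and the last-encountered one otherwise, and a non-zero result returns the element holding the extremal value. Otherwise (max/min), a compare result of 0 copies the later element's field value into value1 via FieldAccessor.set (reflection for POJO fields), and value1 is always returned.
  • Comparator defines four subclasses, MaxComparator, MinComparator, MinByComparator and MaxByComparator, all implementing isExtremal. MaxComparator returns 1 for greater and 0 for less-or-equal, while MaxByComparator is finer-grained, returning 1 for greater, 0 for equal and -1 for less. This difference carries over into ComparableAggregator's reduce method: maxBy and minBy take an extra first(boolean) parameter used specifically to choose which element to return when the compare result is 0, whereas for non-byAggregate operations reduce always returns value1, updating its field when value1's field is not strictly more extremal (less than or equal for max).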


Reposted from: http://lblja.baihongyu.com/
