【Flink】需求实现之独立访客数量的计算 和 布隆过滤器的原理及使用
创始人
2024-03-21 07:45:11
0

文章目录

  • 一 独立访客数量计算
  • 二 布隆过滤器
    • 1 什么是布隆过滤器
    • 2 实现原理
      • (1)HashMap 的问题
      • (2)布隆过滤器数据结构
    • 3 使用布隆过滤器去重

一 独立访客数量计算

public static void main(String[] args) throws Exception{StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);env.readTextFile("E:\\develop\\MyWork\\flink2022tutorial\\src\\main\\resources\\UserBehavior.csv").map(new MapFunction() {@Overridepublic Example7.UserBehavior map(String value) throws Exception {String[] arr = value.split(",");return new Example7.UserBehavior(arr[0],arr[1],arr[2],arr[3],Long.parseLong(arr[4]) * 1000L);}}).keyBy(r -> r.behavior.equals("pv")).assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps().withTimestampAssigner(new SerializableTimestampAssigner() {@Overridepublic long extractTimestamp(Example7.UserBehavior element, long recordTimestamp) {return element.timestamp;}})).keyBy(r -> true).window(TumblingEventTimeWindows.of(Time.hours(1))).aggregate(new CountAgg(),new WindowResult()).print();env.execute();
}public static class WindowResult extends ProcessWindowFunction{@Overridepublic void process(Boolean s, Context context, Iterable elements, Collector out) throws Exception {String windowStart = new Timestamp(context.window().getStart()).toString();String windowEnd = new Timestamp(context.window().getEnd()).toString();Long count = elements.iterator().next();out.collect("窗口" + windowStart + "~" + windowEnd + "的统计值是:" + count);}
}// 使用hashset对用户id进行去重,输出访客数量
public static class CountAgg implements AggregateFunction,Long>{@Overridepublic HashSet createAccumulator() {return new HashSet<>();}@Overridepublic HashSet add(Example7.UserBehavior value, HashSet accumulator) {accumulator.add(value.userId);return accumulator;}@Overridepublic Long getResult(HashSet accumulator) {return (long)accumulator.size();}@Overridepublic HashSet merge(HashSet a, HashSet b) {return null;}
}

此程序有一个隐患:现将所有数据keyBy到了同一条流,每一个小时取一次uv,添加到hashSet去重,如果程序的用户向很大,如1亿个独立访客,一个用户的用户id为100个字符,那么一个窗口中的独立访客就要占用10G的内存。

想要优化这种使用了增量聚合与全窗口聚合的程序,就需要使用一种新的数据结构 – 布隆过滤器。

二 布隆过滤器

1 什么是布隆过滤器

本质上布隆过滤器是一种数据结构,是一种比较巧妙的概率型数据结构(probabilistic datastructure),特点是高效地插入和查询,可以用来告诉你“某样东西一定不存在或者可能存在”。

相比于传统的 List、Set、Map 等数据结构,它更高效、占用空间更少,但是缺点是其返回的结果是概率性的,而不是确切的。

2 实现原理

(1)HashMap 的问题

通常我们判断某个元素是否存在用的是什么?

HashMap 可以将值映射到 HashMap 的 Key,然后可以在 O(1) 的时间复杂度内返回结果,效率奇高。但是 HashMap 的实现也有缺点,例如存储容量占比高,考虑到负载因子的存在,通常空间是不能被用满的,而一旦值很多,例如上亿的时候,那 HashMap 占据的内存大小就变得很可观了。

再加入我们的数据集存储在远程服务器上,本地服务接受输入,而数据集非常大不可能一次性读进内存构建 HashMap 的时候,也会存在问题。

(2)布隆过滤器数据结构

布隆过滤器是一个 bit 向量或者说 bit 数组,如下图:

在这里插入图片描述

如果要映射一个值到布隆过滤器中,需要使用多个不同的哈希函数生成多个哈希值,并对每个生成的哈希值指向的 bit 位重置为 1,例如针对值“baidu”和三个不同的哈希函数分别生成了哈希值 1、4、7,之后“baidu”字符串就被丢弃,则上图转变为:

在这里插入图片描述

那么此时现在再存一个值“tencent”,如果哈希函数返回 3、4、8 的话,图继续变为:
在这里插入图片描述

值得注意的是,4 这个 bit 位由于两个值的哈希函数都返回了这个 bit 位,因此它被覆盖了。

现在如果想查询“dianping”这个值是否存在,哈希函数返回了 1、5、8 三个值,结果发现 5 这个 bit 位上的值为 0,说明没有任何一个值映射到这个 bit 位上,因此可以很确定地说“dianping”这个值不存在。

而当需要查询“baidu”这个值是否存在的话,那么哈希函数必然会返回 1、4、7,然后检查发现这三个 bit 位上的
值均为 1,那么不可以说“baidu”存在,只能是“baidu”这个值可能存在,不确定1、4、7这三位没有被其他数据覆盖过。

3 使用布隆过滤器去重

使用org.apache.flink.shaded.guava18.com.google.common.hash.BloomFilter包下的布隆过滤器。

// 使用布隆过滤器对用户id进行去重,输出访客数量
public static class CountAgg implements AggregateFunction>,Long>{@Overridepublic Tuple2> createAccumulator() {// 参数依次为要去重的元素类型,预估有多少人,误判率return Tuple2.of(0L,BloomFilter.create(Funnels.stringFunnel(Charsets.UTF_8),100000,0.01));}@Overridepublic Tuple2> add(Example7.UserBehavior value, Tuple2> accumulator) {if(!accumulator.f1.mightContain(value.userId)){accumulator.f1.put(value.userId);accumulator.f0 += 1L;}return accumulator;}@Overridepublic Long getResult(Tuple2> accumulator) {return accumulator.f0;}@Overridepublic Tuple2> merge(Tuple2> a, Tuple2> b) {return null;}

相关内容

热门资讯

汽车油箱结构是什么(汽车油箱结... 本篇文章极速百科给大家谈谈汽车油箱结构是什么,以及汽车油箱结构原理图解对应的知识点,希望对各位有所帮...
美国2年期国债收益率上涨15个... 原标题:美国2年期国债收益率上涨15个基点 美国2年期国债收益率上涨15个基...
嵌入式 ADC使用手册完整版 ... 嵌入式 ADC使用手册完整版 (188977万字)💜&#...
重大消息战皇大厅开挂是真的吗... 您好:战皇大厅这款游戏可以开挂,确实是有挂的,需要了解加客服微信【8435338】很多玩家在这款游戏...
盘点十款牵手跑胡子为什么一直... 您好:牵手跑胡子这款游戏可以开挂,确实是有挂的,需要了解加客服微信【8435338】很多玩家在这款游...
senator香烟多少一盒(s... 今天给各位分享senator香烟多少一盒的知识,其中也会对sevebstars香烟进行解释,如果能碰...
终于懂了新荣耀斗牛真的有挂吗... 您好:新荣耀斗牛这款游戏可以开挂,确实是有挂的,需要了解加客服微信8435338】很多玩家在这款游戏...
盘点十款明星麻将到底有没有挂... 您好:明星麻将这款游戏可以开挂,确实是有挂的,需要了解加客服微信【5848499】很多玩家在这款游戏...
总结文章“新道游棋牌有透视挂吗... 您好:新道游棋牌这款游戏可以开挂,确实是有挂的,需要了解加客服微信【7682267】很多玩家在这款游...
终于懂了手机麻将到底有没有挂... 您好:手机麻将这款游戏可以开挂,确实是有挂的,需要了解加客服微信【8435338】很多玩家在这款游戏...