大数据编程实验四:SparkStreaming编程
创始人
2024-03-13 00:33:05
0

大数据编程实验四:SparkStreaming编程

文章目录

  • 大数据编程实验四:SparkStreaming编程
    • 一、实验目的与要求
    • 二、实验内容
    • 三、实验步骤
      • 1、利用Spark Streaming对不同类型数据源的数据进行处理
      • 2、完成DStream的两种有状态转换操作
      • 3、完成把DStream的数据输出保存到MySQL数据库中

一、实验目的与要求

  1. 通过实验掌握Spark Streaming的基本编程方法
  2. 熟悉利用Spark Streaming处理来自不同数据源的数据
  3. 熟悉DStream的各种转换操作
  4. 熟悉把DStream的数据输出保存到文本文件或MySQL数据库中

二、实验内容

  1. 参照教材示例,利用Spark Streaming对不同类型数据源的数据进行处理
  2. 参照教材示例,完成DStream的两种有状态转换操作
  3. 参照教材示例,完成把DStream的数据输出保存到文本文件或MySQL数据库中

三、实验步骤

1、利用Spark Streaming对不同类型数据源的数据进行处理

  • 文件流

    首先在虚拟机中打开第一个终端作为数据流终端,创建一个logfile目录:

    cd /usr/local/spark/mycode
    mkdir streaming
    cd streaming
    mkdir logfile
    

    然后我们打开第二个终端作为流计算终端,在我们创建的目录下面新建一个py程序:

    vim FileStreaming.py
    

    然后输入如下代码:

    from pyspark import SparkContext, SparkConf
    from pyspark.streaming import StreamingContext
    conf = SparkConf()
    conf.setAppName('TestDStream')
    conf.setMaster('local[2]')
    sc = SparkContext(conf = conf)
    ssc = StreamingContext(sc, 10)
    lines = ssc.textFileStream('file:///usr/local/spark/mycode/streaming/logfile')
    words = lines.flatMap(lambda line: line.split(' '))
    wordCounts = words.map(lambda x : (x,1)).reduceByKey(lambda a,b:a+b)
    wordCounts.pprint()
    ssc.start()
    ssc.awaitTermination()
    

    在这里插入图片描述

    保存该文件并执行如下命令:

    /usr/local/spark/bin/spark-submit FileStreaming.py
    

    然后我们进入数据流终端,在logfile目录下新建一个log2.txt文件,然后往里面输入一些英文语句后保存退出,再次切换到流计算终端,就可以看见打印出单词统计信息了。

    在这里插入图片描述

  • 套接字流

    我们继续在流计算端的streaming目录下创建一个socket目录,然后在该目录下创建一个DataSourceSocket.py程序:

    mkdir socket
    cd socket
    vim NetworkWordCount.py
    

    并在py程序中输入如下代码:

    from __future__ import print_function
    import sys
    from pyspark import SparkContext
    from pyspark.streaming import StreamingContextif __name__ == "__main__":if len(sys.argv) != 3:print("Usage: NetworkWordCount.py  ", file=sys.stderr)exit(-1)sc = SparkContext(appName="PythonStreamingNetworkWordCount")ssc = StreamingContext(sc, 1)lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))counts = lines.flatMap(lambda line: line.split(" "))\.map(lambda word: (word, 1))\.reduceByKey(lambda a, b: a+b)counts.pprint()ssc.start()ssc.awaitTermination()
    

    在这里插入图片描述

    我们再在数据流终端启动Socket服务器端:

    nc -lk 8888
    

    然后我们再进入流计算终端,执行如下代码启动流计算:

    /usr/local/spark/bin/spark-submit NetworkWordCount.py localhost 8888
    

    然后我们在数据流终端内手动输入一行英文句子后回车,多输入几次,流计算终端就会不断执行词频统计并打印出信息。

    在这里插入图片描述

  • RDD队列流

    我们继续在streaming目录下新建rddqueue目录并在该目录下创建py程序:

    mkdir rddqueue
    cd rddqueue/
    vim RDDQueueStreaming.py
    

    然后在py文件中输入如下代码:

    import time
    from pyspark import SparkContext
    from pyspark.streaming import StreamingContextif __name__ == "__main__":sc = SparkContext(appName="PythonStreamingQueueStream")ssc = StreamingContext(sc, 2)#创建一个队列,通过该队列可以把RDD推给一个RDD队列流rddQueue = []for i in range(5):rddQueue += [ssc.sparkContext.parallelize([j for j in range(1, 1001)], 10)]time.sleep(1)#创建一个RDD队列流inputStream = ssc.queueStream(rddQueue)mappedStream = inputStream.map(lambda x: (x % 10, 1))reducedStream = mappedStream.reduceByKey(lambda a, b: a + b)reducedStream.pprint()ssc.start()ssc.stop(stopSparkContext=True, stopGraceFully=True)
    

    在这里插入图片描述

    保存退出后再执行如下命令:

    /usr/local/spark/bin/spark-submit RDDQueueStreaming.py
    

    在这里插入图片描述

2、完成DStream的两种有状态转换操作

  • DStream无状态转换操作

    上面的词频统计程序NetworkWordCount就采取了无状态转换操作。

  • DStream有状态转换操作

    我们在socket目录下创建WindowedNetworkWordCount.py程序并输入如下代码:

    from __future__ import print_function
    import sys
    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext
    if __name__ == "__main__":if len(sys.argv) != 3:print("Usage: WindowedNetworkWordCount.py  ", file=sys.stderr)exit(-1)sc = SparkContext(appName="PythonStreamingWindowedNetworkWordCount")ssc = StreamingContext(sc, 10)ssc.checkpoint("file:///usr/local/spark/mycode/streaming/socket/checkpoint")lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))counts = lines.flatMap(lambda line: line.split(" "))\.map(lambda word: (word, 1))\. reduceByKeyAndWindow(lambda x, y: x + y, lambda x, y: x - y, 30, 10)counts.pprint()ssc.start()ssc.awaitTermination()
    

    在这里插入图片描述

    然后我们在数据流终端执行如下命令启动服务器:

    cd /usr/local/spark/mycode/streaming/socket/
    nc -lk 6666
    

    然后再在流计算终端运行我们刚写的代码:

    /usr/local/spark/bin/spark-submit WindowedNetworkWordCount.py localhost 6666
    

    在数据流终端输入英文就可以看见统计结果了。

    在这里插入图片描述

3、完成把DStream的数据输出保存到MySQL数据库中

我们首先启动MySQL数据库:

systemctl start mysqld.service
mysql -u root -p

然后创建spark数据库和wordcount表:

mysql> create database spark;
mysql> use spark;
mysql> create table wordcount (word char(20), count int(4));

然后再在终端安装python连接MySQL的模块:

pip3 install PyMySQL

然后我们在streaming目录下新建stateful目录并在该目录下创建py文件:

mkdir stateful
cd stateful/
vim NetworkWordCountStatefulDB.py

并在py文件中输入如下代码:

from __future__ import print_function 
import sys 
import pymysql 
from pyspark import SparkContext
from pyspark.streaming import StreamingContext 
if __name__ == "__main__":if len(sys.argv) != 3:print("Usage: NetworkWordCountStateful  ", file=sys.stderr)exit(-1)sc = SparkContext(appName="PythonStreamingStatefulNetworkWordCount")ssc = StreamingContext(sc, 1)ssc.checkpoint("file:///usr/local/spark/mycode/streaming/stateful") # RDD with initial state (key, value) pairsinitialStateRDD = sc.parallelize([(u'hello', 1), (u'world', 1)]) def updateFunc(new_values, last_sum):return sum(new_values) + (last_sum or 0) lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))running_counts = lines.flatMap(lambda line: line.split(" "))\.map(lambda word: (word, 1))\.updateStateByKey(updateFunc, initialRDD=initialStateRDD) running_counts.pprint() def dbfunc(records):db = pymysql.connect("localhost","root","123456","spark")cursor = db.cursor() def doinsert(p):sql = "insert into wordcount(word,count) values ('%s', '%s')" % (str(p[0]), str(p[1]))try:cursor.execute(sql)db.commit()except:db.rollback()for item in records:doinsert(item) def func(rdd):repartitionedRDD = rdd.repartition(3)repartitionedRDD.foreachPartition(dbfunc)running_counts.foreachRDD(func)ssc.start()ssc.awaitTermination()

在这里插入图片描述

然后我们新建一个数据源终端并执行如下命令:

cd /usr/local/spark/mycode/streaming/stateful/
nc -lk 5555

然后再在我们的流计算终端运行我们该编写的代码:

/usr/local/spark/bin/spark-submit NetworkWordCountStatefulDB.py localhost 5555

然后就可以把词频统计的结果写入MySQL中了。

相关内容

热门资讯

美国2年期国债收益率上涨15个... 原标题:美国2年期国债收益率上涨15个基点 美国2年期国债收益率上涨15个基...
汽车油箱结构是什么(汽车油箱结... 本篇文章极速百科给大家谈谈汽车油箱结构是什么,以及汽车油箱结构原理图解对应的知识点,希望对各位有所帮...
嵌入式 ADC使用手册完整版 ... 嵌入式 ADC使用手册完整版 (188977万字)💜&#...
重大消息战皇大厅开挂是真的吗... 您好:战皇大厅这款游戏可以开挂,确实是有挂的,需要了解加客服微信【8435338】很多玩家在这款游戏...
盘点十款牵手跑胡子为什么一直... 您好:牵手跑胡子这款游戏可以开挂,确实是有挂的,需要了解加客服微信【8435338】很多玩家在这款游...
senator香烟多少一盒(s... 今天给各位分享senator香烟多少一盒的知识,其中也会对sevebstars香烟进行解释,如果能碰...
终于懂了新荣耀斗牛真的有挂吗... 您好:新荣耀斗牛这款游戏可以开挂,确实是有挂的,需要了解加客服微信8435338】很多玩家在这款游戏...
盘点十款明星麻将到底有没有挂... 您好:明星麻将这款游戏可以开挂,确实是有挂的,需要了解加客服微信【5848499】很多玩家在这款游戏...
总结文章“新道游棋牌有透视挂吗... 您好:新道游棋牌这款游戏可以开挂,确实是有挂的,需要了解加客服微信【7682267】很多玩家在这款游...
终于懂了手机麻将到底有没有挂... 您好:手机麻将这款游戏可以开挂,确实是有挂的,需要了解加客服微信【8435338】很多玩家在这款游戏...