Integrating Spark Streaming with Spark SQL, following the demo in the official Spark documentation.
You can easily use DataFrames and SQL operations on streaming data. You have to create a SparkSession using the SparkContext that the StreamingContext is using. Furthermore, this has to be done such that it can be restarted on driver failures. This is done by creating a lazily instantiated singleton instance of SparkSession. This is shown in the following example. It modifies the earlier word count example to generate word counts using DataFrames and SQL. Each RDD is converted to a DataFrame, registered as a temporary table and then queried using SQL.
In other words: create the SparkSession from the SparkContext that the StreamingContext is already using, and make it a lazily instantiated singleton so it can be recreated if the driver restarts. In the example below, each RDD is converted to a DataFrame, registered as a temporary view, and then queried with SQL to produce word counts.
```scala
/** DataFrame operations inside your streaming program */

val words: DStream[String] = ...

words.foreachRDD { rdd =>
  // Get the singleton instance of SparkSession
  val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()
  import spark.implicits._

  // Convert RDD[String] to DataFrame
  val wordsDataFrame = rdd.toDF("word")

  // Create a temporary view
  wordsDataFrame.createOrReplaceTempView("words")

  // Do word count on DataFrame using SQL and print it
  val wordCountsDataFrame =
    spark.sql("select word, count(*) as total from words group by word")
  wordCountsDataFrame.show()
}
```
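For comparison, the same aggregation can also be expressed with the DataFrame API instead of a SQL string. This is a minimal sketch under the same assumptions as the snippet above (`words: DStream[String]` already exists, and the singleton `SparkSession` is obtained the same way); `groupBy`/`count` are standard DataFrame operations:

```scala
import org.apache.spark.sql.SparkSession

words.foreachRDD { rdd =>
  val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()
  import spark.implicits._

  // Equivalent of "select word, count(*) as total from words group by word",
  // written with DataFrame operations instead of a SQL string.
  val wordCounts = rdd.toDF("word")
    .groupBy("word")
    .count()                              // yields columns: word, count
    .withColumnRenamed("count", "total")  // match the SQL alias
  wordCounts.show()
}
```

No temporary view is needed in this variant, since the query is built directly on the DataFrame.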
Code implementation:
```scala
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Seconds, StreamingContext, Time}

object SparkSql {

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("WordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(5))

    val lines = ssc.socketTextStream("hadoop2", 9999)
    val words = lines.flatMap(_.split(" "))

    // Convert RDDs of the words DStream to DataFrame and run SQL query
    words.foreachRDD { (rdd: RDD[String], time: Time) =>
      // Get the singleton instance of SparkSession
      val spark = SparkSessionSingleton.getInstance(rdd.sparkContext.getConf)
      import spark.implicits._

      // Convert RDD[String] to RDD[case class] to DataFrame
      val wordsDataFrame = rdd.map(w => Record(w)).toDF()

      // Creates a temporary view using the DataFrame
      wordsDataFrame.createOrReplaceTempView("words")

      // Do word count on table using SQL and print it
      val wordCountsDataFrame =
        spark.sql("select word, count(*) as total from words group by word")
      println(s"========= $time =========")
      wordCountsDataFrame.show()
    }

    ssc.start()
    ssc.awaitTermination()
  }
}

/** Case class for converting RDD to DataFrame */
case class Record(word: String)

/** Lazily instantiated singleton instance of SparkSession */
object SparkSessionSingleton {

  @transient private var instance: SparkSession = _

  def getInstance(sparkConf: SparkConf): SparkSession = {
    if (instance == null) {
      instance = SparkSession
        .builder
        .config(sparkConf)
        .getOrCreate()
    }
    instance
  }
}
```
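To try this out, start a socket source on the host passed to `socketTextStream` before launching the job, for example with `nc -lk 9999` (replace `hadoop2` with `localhost` when everything runs on one machine), and type whitespace-separated words into it. Every 5 seconds, the batch interval configured above, the job prints the batch time followed by the `word`/`total` table emitted by `show()`.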
Reposted from: http://wxazi.baihongyu.com/