Spark Streaming Real-Time Stream Processing Project Notes: Integrating Spark Streaming with Spark SQL
Published: 2019-05-24


Integrating Spark Streaming with Spark SQL, following the official demo (the "DataFrame and SQL Operations" section of the Spark Streaming Programming Guide):

DataFrame and SQL Operations

You can easily use DataFrames and SQL operations on streaming data. You have to create a SparkSession using the SparkContext that the StreamingContext is using. Furthermore, this has to be done such that it can be restarted on driver failures. This is done by creating a lazily instantiated singleton instance of SparkSession. This is shown in the following example. It modifies the earlier word count example to generate word counts using DataFrames and SQL. Each RDD is converted to a DataFrame, registered as a temporary table and then queried using SQL.

In other words, you must create the SparkSession using the SparkContext (the one the StreamingContext is using). As shown in the example below, DataFrames and SQL are used to generate word counts: each RDD is converted to a DataFrame, registered as a temporary view, and then queried with SQL.

/** DataFrame operations inside your streaming program */

val words: DStream[String] = ...

words.foreachRDD { rdd =>
  // Get the singleton instance of SparkSession
  val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()
  import spark.implicits._

  // Convert RDD[String] to DataFrame
  val wordsDataFrame = rdd.toDF("word")

  // Create a temporary view
  wordsDataFrame.createOrReplaceTempView("words")

  // Do word count on DataFrame using SQL and print it
  val wordCountsDataFrame =
    spark.sql("select word, count(*) as total from words group by word")
  wordCountsDataFrame.show()
}
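As a side note (this is not part of the official demo), the same per-batch aggregation can also be expressed with the DataFrame API instead of a SQL string. A minimal sketch, reusing the `words` DStream from above:

words.foreachRDD { rdd =>
  // Same singleton pattern as in the official snippet
  val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()
  import spark.implicits._

  // groupBy("word").count() is the DataFrame-API equivalent of the
  // "select word, count(*) as total ... group by word" SQL statement
  val wordCounts = rdd.toDF("word")
    .groupBy("word")
    .count()
    .withColumnRenamed("count", "total") // match the "total" column name used above

  wordCounts.show()
}

Which form to use is mostly a matter of taste; the SQL string and the DataFrame calls compile to the same query plan.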

Code Implementation

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Seconds, StreamingContext, Time}

object SparkSql extends App {

  val sparkConf = new SparkConf().setMaster("local[2]").setAppName("WordCount")
  val ssc = new StreamingContext(sparkConf, Seconds(5))

  val lines = ssc.socketTextStream("hadoop2", 9999)
  val words = lines.flatMap(_.split(" "))

  // Convert the RDDs of the words DStream to DataFrames and run a SQL query on each batch
  words.foreachRDD { (rdd: RDD[String], time: Time) =>
    // Get the singleton instance of SparkSession
    val spark = SparkSessionSingleton.getInstance(rdd.sparkContext.getConf)
    import spark.implicits._

    // Convert RDD[String] to RDD[case class] to DataFrame
    val wordsDataFrame = rdd.map(w => Record(w)).toDF()

    // Create a temporary view using the DataFrame
    wordsDataFrame.createOrReplaceTempView("words")

    // Do word count on the table using SQL and print it
    val wordCountsDataFrame =
      spark.sql("select word, count(*) as total from words group by word")
    println(s"========= $time =========")
    wordCountsDataFrame.show()
  }

  ssc.start()
  ssc.awaitTermination()
}

/** Case class for converting RDD to DataFrame; kept at the top level so Spark can find its encoder */
case class Record(word: String)

/** Lazily instantiated singleton instance of SparkSession */
object SparkSessionSingleton {

  @transient private var instance: SparkSession = _

  def getInstance(sparkConf: SparkConf): SparkSession = {
    if (instance == null) {
      instance = SparkSession
        .builder
        .config(sparkConf)
        .getOrCreate()
    }
    instance
  }
}
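To try it out (a sketch, assuming netcat is available on the hadoop2 host the program connects to; substitute localhost if you test on a single machine), start a socket source before launching the job:

nc -lk 9999

Then type a few words such as "hello world hello". Each 5-second batch prints a timestamped count table along the lines of:

========= 1558683600000 ms =========
+-----+-----+
| word|total|
+-----+-----+
|hello|    2|
|world|    1|
+-----+-----+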

 
