Code 7.1


import org.apache.spark.streaming._
val ssc = new StreamingContext(sc,Seconds(1))
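This form is typically used in spark-shell, where a SparkContext named sc already exists; the batch interval here is 1 second.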


Code 7.2

import org.apache.spark._
import org.apache.spark.streaming._
val conf = new SparkConf().setAppName("DStream").setMaster("local[2]")
val ssc = new StreamingContext(conf,Seconds(1))

Code 7.3

//Create a local StreamingContext with three worker threads and a batch interval of 2 seconds
val conf = new SparkConf().setMaster("local[3]").setAppName("WordCount")
val ssc = new StreamingContext(conf, Seconds(2))
//Create a DStream that connects to hostname:port, e.g. localhost:9999, and listens on that port
val lines = ssc.socketTextStream("localhost", 9999)
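To try this listing, a simple line-based text server can be started on the chosen port, for example with netcat (nc -lk 9999) on Linux; every line typed into it becomes a record of the DStream.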

Code 7.4

import org.apache.spark.streaming.kafka._
val kafkaStream = KafkaUtils.createStream(streamingContext, [ZK quorum], [consumer group id], [per-topic number of Kafka partitions to consume])
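
To make the bracketed placeholders concrete, the sketch below assumes a local ZooKeeper at localhost:2181, a hypothetical consumer group named test-group, and a single topic sender consumed with one thread:

val zkQuorum = "localhost:2181"    //assumed ZooKeeper address
val group = "test-group"           //hypothetical consumer group id
val topicMap = Map("sender" -> 1)  //topic name -> number of consumer threads
val kafkaStream = KafkaUtils.createStream(streamingContext, zkQuorum, group, topicMap)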

Code 7.5

import org.apache.spark.streaming.flume._
val flumeStream = FlumeUtils.createStream(streamingContext, [chosen machine's hostname], [chosen port])
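
As a minimal sketch, assuming the Flume agent is configured with an Avro sink pointing at the receiver's host and an arbitrarily chosen port 33333 (both values are hypothetical):

val flumeStream = FlumeUtils.createStream(streamingContext, "localhost", 33333)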

Code 7.6

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
object Windows_op {
  	def main(arg: Array[String]): Unit ={
		val conf = new SparkConf().setMaster("local[2]").setAppName("Windowtest")
		//Initialize the StreamingContext with a batch interval of 10 seconds
		val ssc = new StreamingContext(conf,Seconds(10))
		//Create a socket stream for the given connection
		//The first argument is the host IP, the second the port number
		val lines = ssc.socketTextStream("your IP",9999)
		val words = lines.flatMap(_.split(" "))
		//Window length of 30 seconds, sliding interval of 10 seconds
		val windowwords = words.window(Seconds(30),Seconds(10))
		windowwords.print()
		ssc.start()  //Start the computation
		ssc.awaitTermination()  //Wait for the computation to terminate
  	}
}

Code 7.7


import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
object countByWindow_op {
  	def main(arg: Array[String]): Unit = {
		val conf = new SparkConf().setMaster("local[2]").setAppName("CountByWindowtest")
		val ssc = new StreamingContext(conf, Seconds(10))
		//Set the checkpoint directory
		ssc.checkpoint("your checkpoint path")
		val lines = ssc.socketTextStream("your IP", 9999)//The first argument is the host IP, the second the port number
		val words = lines.flatMap(_.split(" "))
		//Count the elements of the DStream over a 30-second window, sliding every 10 seconds
		val windowwords = words.countByWindow(Seconds(30), Seconds(10))
		windowwords.print()
		ssc.start()
		ssc.awaitTermination()
  	}
}


Code 7.8

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
object reduceByKeyAndWindow_op {
  	def main(arg: Array[String]): Unit = {
		val conf = new SparkConf().setMaster("local[2]").setAppName("reduceByKeyAndWindowtest")
		val ssc = new StreamingContext(conf, Seconds(10))
		val lines = ssc.socketTextStream("your IP", 9999)//The first argument is the host IP, the second the port number
		val words = lines.flatMap(_.split(" "))
		//First form: reduce over the full window contents
		val wordCounts = words.map(x => (x, 1)).reduceByKeyAndWindow((a:Int,b:Int) => (a + b),Seconds(30),Seconds(10),2)
		//Second form (incremental, requires checkpointing; see the sketch below): words.map(x => (x, 1)).reduceByKeyAndWindow(_+_,_-_,Seconds(30),Seconds(10),2)
		wordCounts.print()
		ssc.start()
		ssc.awaitTermination()
  	}
}
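
The second form mentioned in the comment reduces incrementally, using an inverse function to subtract the data that leaves the window, and it requires a checkpoint directory. A minimal sketch, reusing words and ssc from Code 7.8 and a placeholder checkpoint path:

ssc.checkpoint("your checkpoint path")  //required by the inverse-function variant
val wordCountsInc = words.map(x => (x, 1))
  .reduceByKeyAndWindow(_ + _, _ - _, Seconds(30), Seconds(10), 2)
wordCountsInc.print()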


Code 7.9

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
object updateStateByKey_op {
  	def main(args: Array[String]) {
		val conf = new SparkConf().setMaster("local[2]")
		  .setAppName("UpdateStateByKeyDemo")
		val ssc = new StreamingContext(conf,Seconds(20))
		//updateStateByKey requires a checkpoint directory to be set
		ssc.checkpoint("your checkpoint path")
		//The first argument is the host IP, the second the port number
		val socketLines = ssc.socketTextStream("your IP",9999)
		socketLines.flatMap(_.split(" ")).map(word=>(word,1))
		  .updateStateByKey(
			(currValues:Seq[Int],preValue:Option[Int]) =>{
			  val currValue = currValues.sum     //Sum the values for this key in the current batch
				//Add the current sum to the previously accumulated state for this key
			  Some(currValue + preValue.getOrElse(0))
			}).print()
		ssc.start()
		ssc.awaitTermination()
		ssc.stop()
  	}
}
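
Because updateStateByKey accumulates state across batches, preValue is None the first time a key appears, which is why getOrElse(0) supplies an initial count of zero.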

Code 7.10

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
object saveAsTextFiles_op {
  	def main(arg: Array[String]): Unit = {
		val conf = new SparkConf().setMaster("local[2]").setAppName("Windowtest")
		val ssc = new StreamingContext(conf, Seconds(10))
		val lines = ssc.socketTextStream("your IP", 9999)//The first argument is the host IP, the second the port number
		//Save under the given path; a "<prefix>-<batch time>.txt" output is generated automatically for each batch
		lines.saveAsTextFiles("your save path","txt")
		ssc.start()
		ssc.awaitTermination()
  	}
}

Code 7.11

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
object Streaming_one {
  	def main(arg: Array[String]): Unit = {
		val conf = new SparkConf().setMaster("local[2]").setAppName("testone")
		val ssc = new StreamingContext(conf, Seconds(30))//Set the batch interval to 30 seconds
		val lines = ssc.textFileStream("your path")//Directory to monitor for new files
		//Run word count on the monitored files
		val words = lines.flatMap(_.split(" "))
		val wordCounts = words.map(x=>(x,1)).reduceByKey(_+_)
		wordCounts.print()
		ssc.start()
		ssc.awaitTermination()
  	}
}


Code 7.12

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds,StreamingContext}
object Streaming_two {
  	def main(args:Array[String]){
		val sparkConf = new SparkConf().setAppName("RDDQueue").setMaster("local[2]")
		val ssc = new StreamingContext(sparkConf,Seconds(4)) 
		//Create a queue of RDDs
		val rddQueue = new scala.collection.mutable.SynchronizedQueue[RDD[Int]]()
		val queueStream = ssc.queueStream(rddQueue) //Create the input DStream from the queue
		//Map each number in the queued RDDs to (number mod 10, 1)
		val map = queueStream.map(r=>(r%10,1))
		val reduce = map.reduceByKey(_+_)
		reduce.print()
		ssc.start()
		//Create RDDs and push them into the queue
		for(i<-1 to 10){
			//Create an RDD containing the numbers 1 to 100 and append it to the queue
			//Sleep for one second per iteration so the loop does not finish within a single
			//batch; the ten iterations take about ten seconds in total
			rddQueue+=ssc.sparkContext.makeRDD(1 to 100)
			Thread.sleep(1000)
		}
		ssc.stop() //Stop the streaming application
  	}
}
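
Note that scala.collection.mutable.SynchronizedQueue is deprecated since Scala 2.11; ssc.queueStream also accepts a plain scala.collection.mutable.Queue[RDD[Int]], provided the caller synchronizes access to it.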


Code 7.13

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
object Streaming_three {
  	def main(arg: Array[String]): Unit = {
		val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
		val ssc = new StreamingContext(conf, Seconds(10))
		val lines = ssc.socketTextStream("your IP", 9999)//The first argument is the host IP, the second the port number
		val words = lines.flatMap(_.split(" "))
		val wordcounts = words.map(x => (x, 1)).reduceByKey(_ + _)
		wordcounts.print()
		ssc.start()
		ssc.awaitTermination()
  	}
}


Code 7.14

import java.util.HashMap
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
object KafkaWordCountProducer {
  	def main(args: Array[String]) {
		if (args.length < 4) {
		  System.err.println("Usage: KafkaWordCountProducer <metadataBrokerList> <topic> " + "<messages> <words>")
		  System.exit(1)
    	}
		val Array(brokers, topic, messages, words) = args
		//Kafka producer connection properties (broker list and serializers)
		val props = new HashMap[String, Object]()
		props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
		props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
		  "org.apache.kafka.common.serialization.StringSerializer")
		props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
		  "org.apache.kafka.common.serialization.StringSerializer")
		val producer = new KafkaProducer[String, String](props)
		//Send messages once per second: each round sends `messages` messages, and each
		//message consists of `words` random integers in the range [0, 10).
		while(true) {
			(1 to messages.toInt).foreach { 
				messageNum =>
				val str = (1 to words.toInt).map(x => scala.util.Random.nextInt(10).toString)
				  .mkString(" ")
				print(str)
				println()
				val message = new ProducerRecord[String, String](topic, null, str)
				producer.send(message)
			  }
			Thread.sleep(1000)
    	}
  	}
}
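
For example, assuming a broker at localhost:9092 and the topic sender consumed in Code 7.16, the producer could be launched with the four arguments localhost:9092 sender 3 5, which sends 3 messages of 5 random digits every second.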


Code 7.15

import org.apache.log4j.{Level, Logger}

//org.apache.spark.internal.Logging is private[spark], so user code outside Spark's own
//packages cannot extend it; print the notice directly instead of calling logInfo
object StreamingExamples {
  	//Set a reasonable logging level for the streaming examples if the user has not configured log4j
  	def setStreamingLogLevels() {
    	val log4jInitialized = Logger.getRootLogger.getAllAppenders.hasMoreElements
    	if (!log4jInitialized) {
      		println("Setting log level to [WARN] for streaming example." +
        	" To override add a custom log4j.properties to the classpath.")
      		Logger.getRootLogger.setLevel(Level.WARN)
    	}
  	}
}


Code 7.16

import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka.KafkaUtils

object KafkaWordCountConsumer {
  	def main(args:Array[String]){
		StreamingExamples.setStreamingLogLevels()
		val conf = new SparkConf().setAppName("KafkaWordCountConsumer").setMaster("local[2]")
		val ssc = new StreamingContext(conf,Seconds(10))
		//Set the checkpoint directory
		ssc.checkpoint("your checkpoint path")
		val zkQuorum = "localhost:2181" //ZooKeeper server address
		val group = "kafka_test"  //Consumer group the topic belongs to
		val topics = "sender"  //Topic name(s), comma-separated
		val numThreads = 1  //Number of consumer threads per topic
		val topicMap =topics.split(",").map((_,numThreads.toInt)).toMap
		//Create the Kafka input DStream
		val lineMap = KafkaUtils.createStream(ssc,zkQuorum,group,topicMap)
		val lines = lineMap.map(_._2)
		val words = lines.flatMap(_.split(" "))
		val pair = words.map(x => (x,1))
		//Apply reduceByKeyAndWindow over a 1-minute window, sliding every 10 seconds
		val wordCounts = pair.reduceByKeyAndWindow((a:Int,b:Int) => (a + b),Minutes(1),Seconds(10),2)
		wordCounts.print()
		ssc.start()
		ssc.awaitTermination()
  	}
}