import org.apache.spark.streaming._
val ssc = new StreamingContext(sc, Seconds(1))
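In the spark-shell a SparkContext named sc already exists, so the StreamingContext above can be built from it directly; a standalone application instead builds the context from a SparkConf, as Listing 7.2 shows.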
Listing 7.2
import org.apache.spark._
import org.apache.spark.streaming._
val conf = new SparkConf().setAppName("DStream").setMaster("local[2]")
val ssc = new StreamingContext(conf, Seconds(1))
Listing 7.3
// Create a local StreamingContext with three threads and a 2-second batch interval
val conf = new SparkConf().setMaster("local[3]").setAppName("WordCount")
val ssc = new StreamingContext(conf, Seconds(2))
// Create a DStream connected to hostname:port, e.g. localhost:9999, and listen on that port
val lines = ssc.socketTextStream("localhost", 9999)
Listing 7.4
import org.apache.spark.streaming.kafka._
val kafkaStream = KafkaUtils.createStream(streamingContext, [ZK quorum], [consumer group id], [per-topic number of Kafka partitions to consume])
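The bracketed arguments above are placeholders from the API template. A minimal concrete instantiation might look like the following sketch; the ZooKeeper address localhost:2181, the group id my-group, and the topic map are hypothetical values for illustration only.
import org.apache.spark.streaming.kafka._
// Sketch only: ZooKeeper quorum, consumer group id and topic map are hypothetical
val kafkaStream = KafkaUtils.createStream(
  streamingContext,
  "localhost:2181",   // ZooKeeper quorum
  "my-group",         // consumer group id
  Map("sender" -> 1)) // topic -> number of threads used to consume it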
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
object Windows_op {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("Windowtest")
    // Initialize the StreamingContext with a 10-second batch interval
    val ssc = new StreamingContext(conf, Seconds(10))
    // Connect via a socket stream: the first argument is the host IP, the second the port
    val lines = ssc.socketTextStream("your IP", 9999)
    val words = lines.flatMap(_.split(" "))
    // Window length 30s, sliding interval 10s
    val windowWords = words.window(Seconds(30), Seconds(10))
    windowWords.print()
    ssc.start()            // start the computation
    ssc.awaitTermination() // wait for the computation to terminate
  }
}
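To drive this and the other socket examples in this chapter, publish text on the chosen port from a terminal on the host, for example with Netcat (nc -lk 9999); every line typed there becomes one record of the DStream.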
Listing 7.7
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
object countByWindow_op {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("CountByWindowtest")
    val ssc = new StreamingContext(conf, Seconds(10))
    // countByWindow requires a checkpoint directory
    ssc.checkpoint("your checkpoint path")
    // The first argument is the host IP, the second the port
    val lines = ssc.socketTextStream("your IP", 9999)
    val words = lines.flatMap(_.split(" "))
    // Count the DStream elements falling inside each 30-second window, sliding every 10s
    val windowWords = words.countByWindow(Seconds(30), Seconds(10))
    windowWords.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
Listing 7.8
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
object reduceByKeyAndWindow_op {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("reduceByKeyAndWindowtest")
    val ssc = new StreamingContext(conf, Seconds(10))
    // The first argument is the host IP, the second the port
    val lines = ssc.socketTextStream("your IP", 9999)
    val words = lines.flatMap(_.split(" "))
    // Method 1: recompute the sum over the whole window
    val wordCounts = words.map(x => (x, 1)).reduceByKeyAndWindow((a: Int, b: Int) => a + b, Seconds(30), Seconds(10), 2)
    // Method 2 (incremental, with an inverse function; see the sketch after this listing):
    // val wordCounts = words.map(x => (x, 1)).reduceByKeyAndWindow(_ + _, _ - _, Seconds(30), Seconds(10), 2)
    wordCounts.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
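The commented-out Method 2 computes each window incrementally: the new batch is added and the batch that slid out of the window is subtracted via the inverse function. This variant requires checkpointing to be enabled; a minimal sketch under that assumption:
// Sketch: incremental window reduce; the inverse function requires a checkpoint directory
ssc.checkpoint("your checkpoint path")
val incrementalCounts = words.map(x => (x, 1))
  .reduceByKeyAndWindow((a: Int, b: Int) => a + b, (a: Int, b: Int) => a - b, Seconds(30), Seconds(10), 2)
incrementalCounts.print()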
Listing 7.9
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
object updateStateByKey_op {
  def main(args: Array[String]) {
    val conf = new SparkConf().setMaster("local[2]")
      .setAppName("UpdateStateByKeyDemo")
    val ssc = new StreamingContext(conf, Seconds(20))
    // updateStateByKey requires a checkpoint directory
    ssc.checkpoint("your checkpoint path")
    // The first argument is the host IP, the second the port
    val socketLines = ssc.socketTextStream("your IP", 9999)
    socketLines.flatMap(_.split(" ")).map(word => (word, 1))
      .updateStateByKey(
        (currValues: Seq[Int], preValue: Option[Int]) => {
          val currValue = currValues.sum // sum this key's values in the current batch
          // add the current sum to the key's accumulated historical state
          Some(currValue + preValue.getOrElse(0))
        }).print()
    ssc.start()
    ssc.awaitTermination()
  }
}
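The update function passed to updateStateByKey can also be factored out as a named method, which makes the state transition easier to read; this sketch is equivalent to the lambda above:
// Equivalent sketch: the state update logic as a named function
def updateFunc(currValues: Seq[Int], preValue: Option[Int]): Option[Int] =
  Some(currValues.sum + preValue.getOrElse(0))
// used as: .updateStateByKey(updateFunc _)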
Listing 7.10
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
object saveAsTextFiles_op {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("saveAsTextFilesTest")
    val ssc = new StreamingContext(conf, Seconds(10))
    // The first argument is the host IP, the second the port
    val lines = ssc.socketTextStream("your IP", 9999)
    // Save each batch under the given prefix; output is named "<prefix>-<batch time in ms>.txt"
    lines.saveAsTextFiles("your save path", "txt")
    ssc.start()
    ssc.awaitTermination()
  }
}
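saveAsTextFiles produces one output directory per batch interval, and inside each directory the usual part files generated by the underlying RDD save; the prefix is therefore normally a path in a filesystem such as HDFS rather than a single file name.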
Listing 7.11
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
object Streaming_one {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("testone")
    val ssc = new StreamingContext(conf, Seconds(30)) // 30-second batch interval
    val lines = ssc.textFileStream("your path") // directory to monitor
    // Run a word count over newly arrived files
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
    wordCounts.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
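textFileStream only picks up files that appear in the monitored directory after the application starts, and a file must be placed there atomically (for example, written elsewhere and then moved in); data appended to an existing file is not re-read.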
Listing 7.12
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds,StreamingContext}
object Streaming_two {
  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setAppName("RDDQueue").setMaster("local[2]")
    val ssc = new StreamingContext(sparkConf, Seconds(4))
    // Create the RDD queue
    val rddQueue = new scala.collection.mutable.SynchronizedQueue[RDD[Int]]()
    val queue = ssc.queueStream(rddQueue) // create the input stream over the queue
    // Map each number in the queued RDDs to (number mod 10, 1)
    val map = queue.map(r => (r % 10, 1))
    val reduce = map.reduceByKey(_ + _)
    reduce.print()
    ssc.start()
    // Create RDDs and push them into the queue
    for (i <- 1 to 10) {
      // Each RDD holds the numbers 1 to 100; sleeping one second per iteration
      // spreads the pushes over about ten seconds, so the data is not all
      // consumed within a single batch
      rddQueue += ssc.sparkContext.makeRDD(1 to 100)
      Thread.sleep(1000)
    }
    ssc.stop() // stop the streaming application
  }
}
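SynchronizedQueue is deprecated in newer Scala releases. Since queueStream accepts any mutable.Queue of RDDs, an alternative sketch is a plain queue with the pushes synchronized by hand:
// Sketch: plain mutable.Queue instead of the deprecated SynchronizedQueue
val rddQueue = new scala.collection.mutable.Queue[RDD[Int]]()
val queue = ssc.queueStream(rddQueue)
// ...
rddQueue.synchronized {
  rddQueue += ssc.sparkContext.makeRDD(1 to 100)
}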
Listing 7.13
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
object Streaming_three {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
    val ssc = new StreamingContext(conf, Seconds(10))
    // The first argument is the host IP, the second the port
    val lines = ssc.socketTextStream("your IP", 9999)
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
    wordCounts.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
Listing 7.14
import java.util.HashMap
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
object KafkaWordCountProducer {
  def main(args: Array[String]) {
    if (args.length < 4) {
      System.err.println("Usage: KafkaWordCountProducer <metadataBrokerList> <topic> " +
        "<messages> <words>")
      System.exit(1)
    }
    val Array(brokers, topic, messages, words) = args
    // Kafka producer connection properties (the broker list, not ZooKeeper)
    val props = new HashMap[String, Object]()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")
    val producer = new KafkaProducer[String, String](props)
    // Send one batch per second: each batch contains `messages` messages, and each
    // message holds `words` random integers in the range [0, 10)
    while (true) {
      (1 to messages.toInt).foreach { messageNum =>
        val str = (1 to words.toInt).map(x => scala.util.Random.nextInt(10).toString)
          .mkString(" ")
        println(str)
        val message = new ProducerRecord[String, String](topic, null, str)
        producer.send(message)
      }
      Thread.sleep(1000)
    }
  }
}
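A hypothetical invocation, assuming a broker at localhost:9092 and the topic sender consumed by Listing 7.16, is: KafkaWordCountProducer localhost:9092 sender 3 5, which publishes three messages of five random digits each, once per second.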
Listing 7.15
import org.apache.log4j.{Level, Logger}
import org.apache.spark.internal.Logging
object StreamingExamples extends Logging {
  // Set a reasonable logging level for the streaming examples if the user has not configured log4j
  def setStreamingLogLevels() {
    val log4jInitialized = Logger.getRootLogger.getAllAppenders.hasMoreElements
    if (!log4jInitialized) {
      logInfo("Setting log level to [WARN] for streaming example." +
        " To override add a custom log4j.properties to the classpath.")
      Logger.getRootLogger.setLevel(Level.WARN)
    }
  }
}
Listing 7.16
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka.KafkaUtils
object KafkaWordCountConsumer {
  def main(args: Array[String]) {
    StreamingExamples.setStreamingLogLevels()
    val sc = new SparkConf().setAppName("KafkaWordCountConsumer").setMaster("local[2]")
    val ssc = new StreamingContext(sc, Seconds(10))
    // reduceByKeyAndWindow requires a checkpoint directory
    ssc.checkpoint("your checkpoint path")
    val zkQuorum = "localhost:2181" // ZooKeeper server address
    val group = "kafka_test" // consumer group the topic belongs to
    val topics = "sender" // topic name(s), comma-separated
    val numThreads = 1 // number of consumer threads per topic
    val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
    // Configure the Kafka DStream
    val lineMap = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap)
    val lines = lineMap.map(_._2)
    val words = lines.flatMap(_.split(" "))
    val pair = words.map(x => (x, 1))
    // Apply reduceByKeyAndWindow over the windowed RDDs
    val wordCounts = pair.reduceByKeyAndWindow((a: Int, b: Int) => a + b, Minutes(1), Seconds(10), 2)
    wordCounts.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
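Listing 7.16 uses the receiver-based createStream API, which tracks consumer offsets in ZooKeeper. The same spark-streaming-kafka (0.8) package also provides a receiver-less direct stream; a minimal sketch, assuming a broker at the hypothetical address localhost:9092:
import kafka.serializer.StringDecoder
// Sketch: the direct API reads offsets from Kafka itself instead of ZooKeeper
val kafkaParams = Map("metadata.broker.list" -> "localhost:9092")
val directLines = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, Set("sender")).map(_._2)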