设为首页 加入收藏

TOP

spark用户使用指南
2019-01-19 13:27:38 】 浏览:99
Tags:spark 用户 使用指南

快速启动spark

关于这一部分

Apache Spark 是专为大规模数据处理而设计的快速通用的计算引擎。Spark是UC Berkeley AMP lab (加州大学伯克利分校的AMP实验室)所开源的类Hadoop MapReduce的通用并行框架,Spark,拥有Hadoop MapReduce所具有的优点;但不同于MapReduce的是——Job中间输出结果可以保存在内存中,从而不再需要读写HDFS,因此Spark能更好地适用于数据挖掘与机器学习等需要迭代的MapReduce的算法。
在Spark 2.0之前,Spark的主要编程接口是具有弹性的分布式数据集(RDD)。在Spark 2.0之后,RDDs被数据集取代,数据集就像RDD一样强类型,但在底层有更丰富的优化。

Spark Shell的交互分析

Spark的shell提供了一种学习API的简单方法,同时还提供了一种强大的工具,可以交互式地分析数据。它可以在Scala中(在JavaVM上运行,因此是使用现有Java库的好方法)或Python。启动它,在Spark目录中运行以下内容

./bin/spark-shell

Spark的主要抽象是一个称为数据集的项目的分布式集合。数据集可以从Hadoop inputformat(比如HDFS文件)中创建,也可以通过转换其他数据集来创建。让我们从Spark源目录中的README文件的文本中创建一个新的数据集:

scala> val textFile = spark.read.textFile("README.md")

您可以通过调用一些操作来直接从数据集获得值,或者转换数据集以获得一个新的值:

scala> textFile.count() //这个数据集中的项目数量
scala> textFile.first()//这个数据集的第一项

现在让我们将这个数据集转换为一个新的数据集。我们调用filter来返回一个新的数据集,其中有文件中条目的子集。

scala> val linesWithSpark = textFile.filter(line => line.contains("Spark"))

我们可以将转换和动作链接在一起:

scala> textFile.filter(line => line.contains("Spark")).count()

spark其他的数据操作

数据集动作和转换可以用于更复杂的计算。假设我们想要找到最符合要求的一行:

scala> textFile.map(line => line.split(" ").size).reduce((a,b) => if (a > b) a else b)

这首先将一条线映射到一个整型值,创建一个新的数据集。在该数据集上调用reduce,以找到最大的单词计数。映射和减少的参数是Scala函数字面量(闭包),可以使用任何语言特性或Scala/java库。例如,我们可以很容易地调用其他地方声明的函数。我们将使用math.max()函数使这段代码更容易理解:

scala> import java.lang.Math
import java.lang.Math
scala> textFile.map(line => line.split(" ").size).reduce((a, b) => Math.max(a, b))

一个常见的数据流模式是MapReduce,它是由Hadoop普及的。Spark可以很容易地实现MapReduce流:

scala> val wordCounts = textFile.flatMap(line => line.split(" ")).groupByKey(identity).count()

在这里,我们调用flatMap,将行数据集转换为单词的数据集,然后将groupByKey和count组合起来,计算文件中的每个单词计数,作为一个(字符串,长)对的数据集。在我们的shell中收集单词计数,我们可以调用收集:

scala> wordCounts.collect()

spark缓存

Spark还支持将数据集拉到集群范围内的内存缓存中。当数据被多次访问时,这是非常有用的,例如在查询一个小的“hot”数据集或者运行像PageRank这样的迭代算法时。作为一个简单的例子,让我们标记一下我们的linesWithSpark数据集来缓存:

scala> linesWithSpark.cache()
scala> linesWithSpark.count()
scala> linesWithSpark.count()

spark RDD编程

概述:

在高层次上,每个Spark应用程序都包含一个驱动程序,该程序运行用户的主要功能,并在集群上执行各种并行操作。主要的抽象火花提供了一个有弹性的分布式数据集(RDD),它是一个可以并行操作的集群节点上划分的元素集合。RDDs是通过在Hadoop文件系统(或任何其他Hadoop支持的文件系统)中的文件开始创建的,或者在驱动程序中使用现有的Scala集合,并对其进行转换。用户还可以要求Spark在内存中持久化一个RDD,这样就可以在并行操作中有效地重用它。最后,RDDs会自动从节点故障中恢复。
Spark的第二个抽象是可以在并行操作中使用的共享变量。默认情况下,当Spark在不同的节点上并行地运行一个函数时,它会将函数中使用的每个变量的副本都复制到每个任务中。有时,需要在任务之间共享一个变量,或者在任务和驱动程序之间共享一个变量。Spark支持两种类型的共享变量:广播变量,它可以用于在所有节点上缓存一个值,以及累计变量,这些变量只被“添加”到计数器和求和等变量中。

spark shell使用

在Spark shell中,在名为sc的变量中已经为您创建了一个特殊的解释器感知的SparkContext,使您自己的SparkContext无法工作。您可以设置上下文连接使用-主参数的主上下文,您可以通过将一个逗号分隔的列表传递给—jars参数,将jar添加到类路径中。您还可以通过向—packages参数提供一个逗号分隔的Maven坐标列表,任何依赖项可能存在的附加存储库(例如Sonatype)都可以传递给—repositores参数。例如,要在运行/spark-shell,请使用:

$ ./bin/spark-shell --master local[4]

或者, 添加 code.jar 包在使用路径, 请使用:

$ ./bin/spark-shell --master local[4] --jars code.jar

使用Maven坐标包含依赖项:

$ ./bin/spark-shell --master local[4] --packages "org.example:example:0.1"

注意:对于一个完整的选项列表,运行spark-shell —help。

spark 外部数据集

Spark可以从Hadoop支持的任何存储源中创建分布式数据集,包括本地文件系统、HDFS、Cassandra、HBase、Amazon S3等等。Spark支持文本文件、序列文件和任何其他Hadoop InputFormat。
可以使用SparkContext的textFile方法创建文本文件RDDs。该方法获取文件的URI(或者是机器上的本地路径,或者是hdfs://、s3n://等等URI),并将其作为行的集合读取。下面是一个示例调用:

scala> val distFile = sc.textFile("data.txt")

一旦创建,distFile就可以由数据集操作来执行。例如,我们可以使用映射来添加所有行的大小,并减少操作,如下:

distFile.map(s => s.length).reduce((a, b) => a + b).

spark RDD操作

RDDs支持两种类型的操作:转换,它从现有的一个中创建一个新的数据集,以及操作,它在数据集上运行一个计算之后,将一个值返回给驱动程序。例如,map是一个转换,它通过一个函数传递每个数据集元素,并返回一个表示结果的新RDD。另一方面,reduce是一个使用某个函数聚合所有RDD元素的动作,并将最终结果返回给驱动程序(尽管也有一个并行的还原ebykey返回一个分布式数据集)。
Spark中的所有转换都是惰性的,因为它们不会立即计算结果。相反,它们只记住应用于一些基本数据集(例如一个文件)的转换。只有当一个动作需要返回到驱动程序的结果时,才会计算转换。这种设计使Spark能够更高效地运行。例如,我们可以认识到,通过map创建的数据集将被用于减少,只返回到驱动程序的结果,而不是更大的映射数据集。
Spark的API严重依赖于驱动程序中的传递函数来在集群上运行。有两种推荐的方法:

  1. 匿名函数语法,它可以用于简短的代码。
  2. 全局单例对象中的静态方法。例如,您可以定义对象my函数,然后传递my函数。func1,如下所示:
object MyFunctions {
  def func1(s: String): String = { ... }
}
myRdd.map(MyFunctions.func1)

3.请注意,虽然可以在类实例中传递对方法的引用(而不是单例对象),但这需要发送包含该类的对象和方法。例如,考虑:

class MyClass {
  def func1(s: String): String = { ... }
  def doStuff(rdd: RDD[String]): RDD[String] = { rdd.map(func1) }
}

RDD的基础数据类型

并行集合

并行集合(Parallelized collections)的创建是通过一个已有的集合(Scala Seq)上调用SparkContext的parallelize方法实现的。集合中的元素被复制到一个可并行操作的分布式数据集中,例如:这里演示了如何一个包含1到5数组种创建并行集合:

Val data=Array(1,2,3,4,5)
Val distData=sc.parallelize(data)

一旦创建完成,这个分布式数据集(distData)就可以被并行操作,例如:我们可以调用distData.reduce(a,b)=>a+b,将这个数组中的元素相加,我们以后在描述在分布式上的一些操作。
并行集合一个很重要的参数是切片书(slices),表示一个数据集切分的分数,Spark会在集群上运行一个任务。你可以在集群上为每个CPU设置2-4个切片(slices),正常情况下,Spark会试着基于你的集群状况自动地设置切片的数目,然而,你也可以通过parallelize的第二个参数手动地设置(例如:sc.parallelize(data,10))。

外部数据集

Spark可以从任何一个Hadoop支持的存储源创建分布式数据集,包括你的本地文件系统,HDFS,Cassandra,HBase,Amazon S3等,Spark支持文本文件(text files),SequenceFiles和其他Hadoop InputFormat。
文本文件RDDs可以使用SparkContext的textFile方法创建,在这个方法里传入文件的URI(机器上的本地路径或hdfs://,s3n://等),然后他会将文件读取成一个行集合,这里是一个调用例子:

Scala> val distFile=sc.textFile(“data.txt”)

RDD使用

RDD actions和tranformations能被用在更多的复杂计算中,比方说,我们想要找到一行中最多的单词数量:

scala> textFile.map(line => line.split(" ").size).reduce((a, b) => if (a > b) a else b)

首先履行映射成一个整形数值产生一个新RDD上调用reduce找到行中最大的个数,map和reduce的参数是Scala的函数串(闭包),并且可以使用任何语言特性或者Scala/Java类库,例如,我们可以很方便的调用其他的函数声明,我们使用Math.max()函数让代码更容易理解:

scala> import java.lang.Math
import java.lang.Math
scala> textFile.map(line => line.split(" ").size).reduce((a, b) => Math.max(a, b))
res5:Int=15

Hadoop流行的一个通用的数据流模式是MapReduce。Spark能够很容易的实现MapReduce

scala> val wordCounts = textFile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey((a, b) => a + b)

这里,我们结合flatMap,map和reduceByKey来计算文件里每个单词出现的数量,他的结果是包含一组(String,int)键值对的RDD,我们可以使用[collect]操作在我们的shell中手机单词的数量:

scala> wordCounts.collect()

spark SQL使用

概述

Spark SQL是结构化数据处理的Spark模块。与基本的Spark RDD API不同,Spark SQL提供的接口提供了关于数据的结构和正在执行的计算的更多信息。在内部,Spark SQL使用这些额外的信息执行额外的优化。有几种方法可以与Spark SQL进行交互,其中包括SQL和Dataset API。当计算结果时,使用相同的执行引擎,独立于您使用的api/语言来表示计算。这种统一意味着开发人员可以很容易地在不同的api之间来回切换,这提供了最自然的方式来表达给定的转换。

Starting Point: SparkSession

Spark中所有功能的入口点是SparkSession类。要创建一个基本的SparkSession,只需使用SparkSession.builder():

import org.apache.spark.sql.SparkSession
val spark = SparkSession
  .builder()
  .appName("Spark SQL basic example")
  .config("spark.some.config.option", "some-value")
  .getOrCreate()
//对于隐式转换,比如将rdd转换为DataFrames
import spark.implicits._

创建DataFrames

通过一个SparkSession,应用程序可以从一个现有的RDD中创建DataFrames,从一个Hive表,或者从Spark数据源。
作为一个示例,下面创建一个基于JSON文件内容的DataFrame:

val df = spark.read.json("examples/src/main/resources/people.json")
// Displays the content of the DataFrame to stdout
df.show()

DataFrames为在Scala、Java、Python和r中的结构化数据操作提供了一种领域特有的语言。
如前所述,在Spark 2.0中,DataFrames只是Scala和Java API中的行数据集。这些操作也被称为“非类型转换”,与“类型转换”形成鲜明对比的是强类型的scala/java数据集。

这里我们包括一些使用数据集的结构化数据处理的基本示例:

import spark.implicits._
df.printSchema()
df.select("name").show()
df.select($"name", $"age" + 1).show()
df.filter($"age" > 21).show()
df.groupBy("age").count().show()

除了简单的列引用和表达式之外,Datasets还有一个丰富的函数库,包括字符串操作、日期算术、常见的数学运算等等。完整的列表在DataFrame函数引用中可用。

以编程方式运行SQL查询

SparkSession中的sql函数使应用程序能够以编程的方式运行sql查询,并将结果作为DataFrame返回。

//将DataFrame注册为一个SQL临时视图
df.createOrReplaceTempView("people")
val sqlDF = spark.sql("SELECT * FROM people")
sqlDF.show()

全局临时视图

Spark SQL中的临时视图是会话范围的,如果创建它的会话终止,则会消失。如果您想要在所有会话中共享一个临时视图,并在Spark应用程序终止之前保持活力,您可以创建一个全局临时视图。全局临时视图绑定到一个系统保存的数据库globaltemp,我们必须使用限定名来引用它,e.g. SELECT * FROM global_temp.view1.

//将DataFrame注册为全局临时视图
df.createGlobalTempView("people")
//全局临时视图绑定到一个系统保存的数据库globaltemp
spark.sql("SELECT * FROM global_temp.people").show()
//全局临时视图是交叉会话
spark.newSession().sql("SELECT * FROM global_temp.people").show()

创建数据集

然而,数据集与RDDs类似,而不是使用Java序列化或Kryo,它们使用专门的编码器来序列化对象,以便在网络上进行处理或传输。虽然编码器和标准序列化都负责将对象转换为字节,但编码器是动态生成的代码,并且使用一种格式,允许Spark执行许多操作,如过滤、排序和散列,而不将字节反序列化为对象。

//您可以使用实现产品接口的自定义类
case class Person(name: String, age: Long)
//为案例类创建编码器
val caseClassDS = Seq(Person("Andy", 32)).toDS()
caseClassDS.show()
//大多数常见类型的编码器是通过引入spark.implicits自动提供的._
val primitiveDS = Seq(1, 2, 3).toDS()
primitiveDS.map(_ + 1).collect() // Returns: Array(2, 3, 4)
//通过提供一个类,可以将DataFrames转换为数据集。映射将通过名称进行
val path = "examples/src/main/resources/people.json"
val peopleDS = spark.read.json(path).as[Person]
peopleDS.show()

使用反射推断模式

Spark SQL的Scala接口支持自动将包含case类的RDD转换为DataFrame。case类定义了表的模式。对case类的参数的名称是使用反射读取的,并成为列的名称。Case类也可以嵌套或包含复杂的类型,例如Seqs或数组。这个RDD可以隐式地转换为一个DataFrame,然后将其注册为一个表。表可以在后续的SQL语句中使用。

    import spark.implicits._
val peopleDF = spark.sparkContext
  .textFile("examples/src/main/resources/people.txt")
  .map(_.split(","))
  .map(attributes => Person(attributes(0), attributes(1).trim.toInt))
  .toDF()
//将DataFrame注册为一个临时视图
peopleDF.createOrReplaceTempView("people")
//可以使用Spark提供的SQL方法来运行SQL语句
val teenagersDF = spark.sql("SELECT name, age FROM people WHERE age BETWEEN 13 AND 19")
//结果中的列的列可以通过字段索引来访问
teenagersDF.map(teenager => "Name: " + teenager(0)).show()
teenagersDF.map(teenager => "Name: " + teenager.getAs[String]("name")).show()
implicit val mapEncoder = org.apache.spark.sql.Encoders.kryo[Map[String, Any]]
//基本类型和case类也可以定义为
// implicit val stringIntMapEncoder: Encoder[Map[String, Any]] = ExpressionEncoder()
// row.getValuesMap[T] retrieves multiple columns at once into a Map[String, T]
teenagersDF.map(teenager => teenager.getValuesMap[Any](List("name", "age"))).collect()

以编程的方式指定模式

当case类不能提前定义时(例如,记录的结构是在一个字符串中编码的,或者文本数据集将被解析,不同的用户将对字段进行不同的预测),一个DataFrame可以用3个步骤来创建。

  1. 从原始的RDD中创建一个行;
  2. 创建在第1步中创建的RDD中所表示的结构类型的结构类型。
  3. 通过SparkSession提供的createDataFrame方法将模式应用到行的RDD中。
import org.apache.spark.sql.types._
// 创建一个RDD
val peopleRDD = spark.sparkContext.textFile("examples/src/main/resources/people.txt")
// 模式是用字符串编码的
val schemaString = "name age"
// 根据模式的字符串生成模式
val fields = schemaString.split(" ")
 .map(fieldName => StructField(fieldName, StringType, nullable = true))
val schema = StructType(fields)
// 将RDD(人员)的记录转换为行
val rowRDD = peopleRDD
  .map(_.split(","))
  .map(attributes => Row(attributes(0), attributes(1).trim))
// 将该模式应用于RDD
val peopleDF = spark.createDataFrame(rowRDD, schema)
// 使用DataFrame创建一个临时视图
peopleDF.createOrReplaceTempView("people")
// 可以通过使用DataFrames创建的临时视图来运行SQL
val results = spark.sql("SELECT name FROM people")
// SQL查询的结果是DataFrames并支持所有常规的RDD操作
// 结果中的列的列可以通过字段索引或字段名来访问。
results.map(attributes => "Name: " + attributes(0)).show()

聚合函数

内置的DataFrames函数提供了诸如count()、countDistinct ()、avg()、max()、min()等公共聚合,而这些函数都是为DataFrames设计的,Spark SQL也为Scala和Java中的一些提供了类型安全的版本,用于与强类型数据集一起工作。此外,用户不仅限于预定义的聚合函数,而且可以创建自己的聚合函数。

无类型定义的聚合函数

用户必须扩展UserDefinedAggregateFunction抽象类实现一个自定义的无类型的聚合函数。例如,用户定义的平均值可以是这样的:

import org.apache.spark.sql.expressions.MutableAggregationBuffer
import org.apache.spark.sql.expressions.UserDefinedAggregateFunction
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
import org.apache.spark.sql.SparkSession
object MyAverage extends UserDefinedAggregateFunction {
  // 这个聚合函数的输入参数的数据类型
  def inputSchema: StructType = StructType(StructField("inputColumn", LongType) :: Nil)
  // 聚合缓冲区中的值的数据类型
  def bufferSchema: StructType = {
    StructType(StructField("sum", LongType) :: StructField("count", LongType) :: Nil)
  }
  // 返回值的数据类型  
def dataType: DataType = DoubleType
  // 这个函数是否总是在相同的输入上返回相同的输出
  def deterministic: Boolean = true
  // 初始化给定的聚合缓冲区。缓冲区本身是一个行,除了
  // 标准方法,例如检索索引中的值(例如get()、getBoolean())
  // 更新它的价值的机会。注意,缓冲区内的数组和映射仍然是  
// 不变的.
  def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0) = 0L
    buffer(1) = 0L
  }
  // 使用来自输入的新输入数据更新给定的聚合缓冲区缓冲区  
def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    if (!input.isNullAt(0)) {
      buffer(0) = buffer.getLong(0) + input.getLong(0)
      buffer(1) = buffer.getLong(1) + 1
    }
  }
  // 合并两个聚合缓冲区,并将更新后的缓冲区值存储到buffer1
  def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    buffer1(0) = buffer1.getLong(0) + buffer2.getLong(0)
    buffer1(1) = buffer1.getLong(1) + buffer2.getLong(1)
  }
  // 计算出最终结果  
def eva luate(buffer: Row): Double = buffer.getLong(0).toDouble / buffer.getLong(1)
}
// 注册该函数以访问它
spark.udf.register("myAverage", MyAverage)
val df = spark.read.json("examples/src/main/resources/employees.json")
df.createOrReplaceTempView("employees")
df.show()
val result = spark.sql("SELECT myAverage(salary) as average_salary FROM employees")
result.show()

类型安全的用户定义的聚合函数

对于强类型数据集的用户定义聚合是围绕聚集器抽象类进行的。例如,类型安全的用户定义的平均值可以是这样的:

import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.Encoder
import org.apache.spark.sql.Encoders
import org.apache.spark.sql.SparkSession
case class Employee(name: String, salary: Long)
case class Average(var sum: Long, var count: Long)
object MyAverage extends Aggregator[Employee, Average, Double] {
  // 这个聚合的零值。应该满足任意b+0=b的性质
  def zero: Average = Average(0L, 0L)
  // 结合两个值来生成一个新值。对于性能,函数可以修改缓冲区 
 // 然后返回,而不是新建一个对象
  def reduce(buffer: Average, employee: Employee): Average = {
    buffer.sum += employee.salary
    buffer.count += 1
    buffer
  }
  // 合并两个中间值
  def merge(b1: Average, b2: Average): Average = {
    b1.sum += b2.sum
    b1.count += b2.count
    b1
  }
  // 转换输出的输出
  def finish(reduction: Average): Double = reduction.sum.toDouble / reduction.count
  // 指定中间值类型的编码器
  def bufferEncoder: Encoder[Average] = Encoders.product
  // 指定最终输出值类型的编码器  
def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}
val ds = spark.read.json("examples/src/main/resources/employees.json").as[Employee]
ds.show()
// 将函数转换为TypedColumn并给它起一个名字
val averageSalary = MyAverage.toColumn.name("average_salary")
val result = ds.select(averageSalary)
result.show()

数据源

Spark SQL支持通过DataFrame接口对各种数据源进行操作。DataFrame可以使用关系转换操作,也可以用来创建临时视图。将一个DataFrame注册为一个临时视图允许您在其数据上运行SQL查询。本节描述使用Spark数据源加载和保存数据的一般方法,然后进入用于内置数据源的特定选项。

通用的加载/保存功能

在最简单的形式中,默认数据源(由spark.sql.sources.default拼花,除非另有配置)将用于所有操作。

val usersDF = spark.read.load("examples/src/main/resources/users.parquet")
usersDF.select("name", "favorite_color").write.save("namesAndFavColors.parquet")

手动指定选项

您还可以手动指定将使用的数据源,以及您想要传递给数据源的任何额外选项。数据源由其完全限定的名称(即:org.apache.spark.sql.parquet),但对于内置的来源还可以使用短名称(json、jdbc、csv、文本)。从任何数据源类型加载的DataFrames都可以使用这种语法转换为其他类型。

val peopleDF = spark.read.format("json").load("examples/src/main/resources/people.json")
peopleDF.select("name", "age").write.format("parquet").save("namesAndAges.parquet")

直接在文件上运行SQL

您可以使用SQL来直接查询该文件,而不是使用read API将文件加载到DataFrame并查询它。

val sqlDF = spark.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`")

Bucketing, Sorting and Partitioning

对于基于文件的数据源,还可以对输出进行存储、排序或分区。嵌接和排序仅适用于持久表:

peopleDF.write.bucketBy(42, "name").sortBy("age").saveAsTable("people_bucketed")

在使用数据集api时,可以同时使用分区和save。

usersDF.write.partitionBy("favorite_color").format("parquet").save("namesPartByColor.parquet")

对于单个表,可以使用分区和嵌接:

peopleDF
  .write
  .partitionBy("favorite_color")
  .bucketBy(42, "name")
  .saveAsTable("people_partitioned_bucketed")

分区创建一个目录结构,正如在分区发现节中所描述的那样。因此,它对具有高基数的列的适用性有限。相反,bucketa将数据分布在固定数量的桶中,当许多惟一值是无界的时,可以使用它。

结构化流编程

结构化流是一个可伸缩的、容错的流处理引擎,构建在Spark SQL引擎上。您可以用相同的方式表示流计算,就像在静态数据上表示批处理一样。Spark SQL引擎将会逐渐地、持续地运行它,并在流数据继续到达时更新最终结果。您可以使用Scala、Java、Python或R中的dataset/dataframe API来表示流媒体聚合、事件时间窗口、流到批连接等等,这些计算都是在同一个优化的Spark SQL引擎上执行的。最后,系统确保了端到端的端对端容错保证,通过检查点和写前面的日志。简而言之,结构化流提供了快速、可伸缩的、容错的、端到端的即时处理,而无需用户去考虑流媒体。

简单的例子

假设您想要维护从一个数据服务器接收到的一个正在运行的文本数据,而这个数据服务器是通过TCP套接字监听的。让我们看看如何使用结构化的流媒体来表达这个问题。如果你下载Spark,你可以直接运行这个例子。在任何情况下,让我们一步一步地了解这个示例,并了解它是如何工作的。首先,我们必须导入必要的类并创建一个局部SparkSession,这是与Spark相关的所有功能的起点。

import org.apache.spark.sql.functions._
import org.apache.spark.sql.SparkSession
val spark = SparkSession
  .builder
  .appName("StructuredNetworkWordCount")
  .getOrCreate()
import spark.implicits._

接下来,让我们创建一个流数据的DataFrame,它表示从服务器接收到的服务器上接收的文本数据,并将DataFrame转换为计算单词计数。

//创建来自连接到localhost:9999的输入行流的DataFrame
val lines = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load()
val words = lines.as[String].flatMap(_.split(" "))
val wordCounts = words.groupBy("value").count()

现在,我们已经对流数据进行了查询。剩下的就是开始接收数据并计算计数。要做到这一点,我们设置它来打印完整的计数集(outputMode(“complete”)指定,每次更新时都要输出到控制台。然后使用start()启动流计算。

// 开始运行将运行计数打印到控制台的查询
val query = wordCounts.writeStream
  .outputMode("complete")
  .format("console")
  .start()
query.awaitTermination()

在执行该代码之后,流计算将在后台启动。查询对象是一个对该活动流查询的句柄,我们已经决定使用瓦伊终止()等待查询的终止,以防止在查询处于活动状态时退出进程。
要实际执行这个示例代码,您可以在自己的Spark应用程序中编译代码,也可以在下载Spark后运行这个示例。我们展示的是后者。您首先需要运行Netcat(在大多数类unix系统中发现的一个小型实用程序)作为数据服务器

$ nc -lk 9999

然后,在一个不同的终端中,您可以通过使用

$ ./bin/run-example org.apache.spark.examples.sql.streaming.StructuredNetworkWordCount localhost 9999

然后,在运行netcat服务器的终端中输入的任何行都将被计数并在屏幕上打印。

spark streaming编程

Spark流是核心Spark API的一个扩展,它支持可伸缩、高吞吐量、容错的实时数据流处理。数据可以从许多来源中摄取,比如卡夫卡、Flume、kin遥感或TCP套接字,并且可以使用复杂的算法来处理,这些算法使用高级功能,如map、reduce、join和window。最后,处理的数据可以被推送到文件系统、数据库和实时指示板。实际上,您可以在数据流上应用Spark的机器学习和图形处理算法。

image

在内部,它是这样工作的。Spark流接收实时的输入数据流,并将数据分成几批,然后由Spark引擎处理,以批量生成最终的结果流。

image

Spark流提供了一个称为离散流或DStream的高级抽象,它代表了连续的数据流。DStreams可以从诸如卡夫卡、Flume和kinor等来源的输入数据流中创建,也可以通过在其他DStreams上应用高级操作来创建。在内部,DStream被表示为一个RDDs的序列。

一个简单的例子

首先,我们导入了Spark流类的名称,以及从StreamingContext到我们的环境中的一些隐式转换,以便向我们需要的其他类添加有用的方法(如DStream)。StreamingContext是所有流媒体功能的主要入口点。我们使用两个执行线程创建一个本地流上下文,一个批处理间隔为1秒。

import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3
// 创建一个本地流媒体上下文,使用两个工作线程和一个1秒的批处理间隔。.
val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
val ssc = new StreamingContext(conf, Seconds(1))

使用这个上下文,我们可以创建一个DStream,它表示来自TCP源的流数据,指定为主机名(例如localhost)和端口(例如9999)。

val lines = ssc.socketTextStream("localhost", 9999)

这行DStream表示将从数据服务器接收到的数据流。这个DStream中的每个记录都是一行文本。接下来,我们要将空格字符分割成单词。

val words = lines.flatMap(_.split(" "))

flatMap是一对多的DStream操作,它通过在源DStream中的每个记录生成多个新记录来创建一个新的DStream。在这种情况下,每一行将被分割成多个单词,而单词流则表示为DStream。接下来,我们要计算这些单词。

import org.apache.spark.streaming.StreamingContext._ 
// 在每批中计算每个单词
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)
// 将这个DStream中生成的每个RDD的前10个元素打印到控制台
wordCounts.print()

当这些行被执行时,Spark流只会设置它在启动时将执行的计算,并且还没有开始真正的处理。在所有转换设置之后开始处理,我们最后调用

ssc.start()             // 开始计算
ssc.awaitTermination()  // 等待计算终止

完整的代码可以在Spark流示例网络wordcount中找到。
如果您已经下载并构建了Spark,那么您可以像下面这样运行这个示例。您首先需要运行Netcat(在大多数类unix系统中发现的一个小型实用程序)作为数据服务器

$ nc -lk 9999

然后,在一个不同的终端中,您可以通过使用

$ ./bin/run-example streaming.NetworkWordCount localhost 9999

然后,在运行netcat服务器的终端中输入的任何行都将被计数并在屏幕上打印。

更多精彩文章,详见红象云腾社区

】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇Spark在文本统计中的简单应用 下一篇Spark_Spark调优-设置executor 数..

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目