使用Spark Streaming转换不同的JSON有效负载

2025-05-29 0 52

Spark Streaming 是底层基于 Spark Core 的对大数据进行实时计算的框架,可以流方式从源读取数据。只需要从数据源创建一个读取流,然后我们可以创建写入流以将数据加载到目标数据源中。

使用Spark Streaming转换不同的JSON有效负载

接下来的演示,将假设我们有不同的 JSON 有效负载进入一个 kafka 主题,我们需要将其转换并写入另一个 kafka 主题。

创建一个ReadStream

为了能连续接收JSON有效负载作为消息。我们需要首先读取消息并使用spark的readstream创建数据帧。Spark 中提供了 readStream 函数,我们可以使用这个函数基本上创建一个 readStream。这将从 kafka 主题中读取流负载。

  1. valdf=spark
  2. .readStream
  3. .format("kafka")
  4. .option("kafka.bootstrap.servers","host1:port1,host2:port2")
  5. .option("subscribe","topic1")
  6. .load()

我们可以创建一个 case-class(例如CustomerUnion),它将包含JSON有效负载的所有可能字段。这样,我们就能在数据帧上运行select查询而不会失败。

  1. valrawDfValue=rawData.selectExpr("CAST(valueASSTRING)").as[String]
  2. valschema=ScalaReflection.schemaFor[CustomerUnion].dataType.asInstanceOf[StructType]
  3. valextractedDFWithSchema=rawDfValue.select(from_json(col("value"),schema).as("data")).select("data.*")
  4. extractedDFWithSchema.createOrReplaceTempView(“tempView”)

这将为我们提供一个数据帧提取的 DFWithSchema,其中包含作为有效负载字段的列。

示例输入负载

这是两个样本输入有效负载,但也可以有更多的有效负载,有些字段不存在(变量)。

  1. {
  2. “id”:1234,
  3. “firstName”:”Jon”,
  4. “lastName”:”Butler”,
  5. “City”:”Newyork”,
  6. “Email”:abc@gmail.com,
  7. “Phone”:”2323123”
  8. }
  1. {
  2. “firstName”:”Jon”,
  3. “lastName”:”Butler”,
  4. “City”:”Newyork”,
  5. “Email”:abc@gmail.com,
  6. “Phone”:”2323123”
  7. }

样例输出负载

根据id字段,我们将决定输出有效负载。如果存在一个 id 字段,我们将把它视为一个用户更新案例,并且在输出有效负载中只发送“Email”和“Phone”。我们可以根据某些条件配置任何字段。这只是一个例子。

如果 id 不存在,我们将发送所有字段。下面是两个输出载荷的示例:

  1. {
  2. “userid”:1234,
  3. “Email”:abc@gmail.com,
  4. “Phone”:”2323123”
  5. }
  1. {
  2. “fullname”:”JonButler”,
  3. “City”:”Newyork”,
  4. “Email”:abc@gmail.com,
  5. “Phone”:”2323123”
  6. }

开始WriteStreams

一旦我们有了数据帧,我们就可以运行尽可能多的sql查询,并根据所需的有效负载写入 kafka 主题。因此,我们可以创建一个包含所有sql查询的列表,并通过该列表进行循环,并调用writeStream函数。让我们假设,我们有一个名为 queryList 的列表,它只包含字符串(即sql查询)。

下面为写入流定义的一个函数:

  1. defstartWriteStream(query:String):Unit={
  2. valtransformedDf=spark.sql(query)
  3. transformedDf
  4. .selectExpr("CAST(keyASSTRING)","CAST(valueASSTRING)")
  5. .writeStream
  6. .format("kafka")
  7. .option("kafka.bootstrap.servers","host1:port1,host2:port2")
  8. .option("topic","topic1")
  9. .start()
  10. }

这将启动列表中每个查询的写入流。

  1. queryList.foreach(startWriteStream)
  2. spark.streams.awaitAnyTermination()

如果我们知道输入有效负载的所有可能字段,那么即使有一些字段不存在,我们的sql查询也不会失败。我们已经将有效负载的模式指定为case-class,它将为缺席字段创建指定 NULL 的数据帧。

通过这种方式,我们可以使用 spark-streaming 在所需的转换/过滤器之后将多个有效负载从同一主题写入不同的主题。

【51CTO译稿,合作站点转载请注明原文译者和出处为51CTO.com】

文章来源:https://developer.51cto.com/art/202108/678717.htm

收藏 (0) 打赏

感谢您的支持,我会继续努力的!

打开微信/支付宝扫一扫,即可进行扫码打赏哦,分享从这里开始,精彩与您同在
点赞 (0)

声明:本站所有文章,如无特殊说明或标注,均为本站原创发布。任何个人或组织,在未征得本站同意时,禁止复制、盗用、采集、发布本站内容到任何网站、书籍等各类媒体平台。如若本站内容侵犯了原著者的合法权益,可联系我们进行处理。

快网idc优惠网 建站教程 使用Spark Streaming转换不同的JSON有效负载 https://www.kuaiidc.com/93822.html

相关文章

发表评论
暂无评论