mongodb - Spark 流 : foreachRDD update my mongo RDD

我想在每次进入 foreachRDD 时创建一个new mongodb RDD。但是我有序列化问题:

 mydstream  
   .foreachRDD(rdd => {
      val mongoClient = MongoClient("localhost", 27017)
      val db = mongoClient(mongoDatabase)
      val coll = db(mongoCollection)
      // ssc is my StreamingContext
      val modelsRDDRaw = ssc.sparkContext.parallelize(coll.find().toList) })

这会给我一个错误:

object not serializable (class: org.apache.spark.streaming.StreamingContext, value: org.apache.spark.streaming.StreamingContext@31133b6e)

有什么想法吗?

最佳答案

您可以尝试使用 rdd.context返回 SparkContext 或 SparkStreamingContext(如果 rdd 是 DStream)。

mydstream foreachRDD { rdd => {
      val mongoClient = MongoClient("localhost", 27017)
      val db = mongoClient(mongoDatabase)
      val coll = db(mongoCollection)
      val modelsRDDRaw = rdd.context.parallelize(coll.find().toList) })

其实RDD好像也有一个.sparkContext方法。老实说,我不知道有什么区别,也许它们是别名(?)。

关于mongodb - Spark 流 : foreachRDD update my mongo RDD,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/34814615/

相关文章:

mongodb - 如何在 $lookup Mongodb 的 LocalField 中将字符串转换

ruby-on-rails - MongoDB 的最佳分析/数据可视化库

javascript - 在 MongoDB 中按字母顺序对文档进行排序(也称为自然排序顺序,人类排

javascript - 使用 Nodejs 在 mongodb 中存储文件

regex - Mongo 正则表达式搜索的索引范围

mongodb - 在mongo聚合中将ObjectID转换为字符串

mongodb - Azure:不支持 DocumentDB Mongo $group

spring - 使用 Spring 数据 MongoDB @Document 创建一个上限集合

mongodb - 在集合之间移动文档是表示 MongoDB 中状态变化的好方法吗?

mongodb - 可以在 MongoDB 查询中使用严格的 JSON $dates 吗?