我想在每次进入 foreachRDD
时创建一个new mongodb RDD。但是我有序列化问题:
mydstream
.foreachRDD(rdd => {
val mongoClient = MongoClient("localhost", 27017)
val db = mongoClient(mongoDatabase)
val coll = db(mongoCollection)
// ssc is my StreamingContext
val modelsRDDRaw = ssc.sparkContext.parallelize(coll.find().toList) })
这会给我一个错误:
object not serializable (class: org.apache.spark.streaming.StreamingContext, value: org.apache.spark.streaming.StreamingContext@31133b6e)
有什么想法吗?
最佳答案
您可以尝试使用 rdd.context返回 SparkContext 或 SparkStreamingContext(如果 rdd 是 DStream)。
mydstream foreachRDD { rdd => {
val mongoClient = MongoClient("localhost", 27017)
val db = mongoClient(mongoDatabase)
val coll = db(mongoCollection)
val modelsRDDRaw = rdd.context.parallelize(coll.find().toList) })
其实RDD好像也有一个.sparkContext
方法。老实说,我不知道有什么区别,也许它们是别名(?)。
关于mongodb - Spark 流 : foreachRDD update my mongo RDD,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/34814615/
相关文章:
mongodb - 如何在 $lookup Mongodb 的 LocalField 中将字符串转换
ruby-on-rails - MongoDB 的最佳分析/数据可视化库
javascript - 在 MongoDB 中按字母顺序对文档进行排序(也称为自然排序顺序,人类排
javascript - 使用 Nodejs 在 mongodb 中存储文件
mongodb - 在mongo聚合中将ObjectID转换为字符串
mongodb - Azure:不支持 DocumentDB Mongo $group
spring - 使用 Spring 数据 MongoDB @Document 创建一个上限集合