python - 让 Spark、Python 和 MongoDB 协同工作

我很难将这些组件正确地结合在一起。我已成功安装 Spark 并运行,我可以在本地、独立以及通过 YARN 运行作业。我已按照建议的步骤进行操作(据我所知)here和 here

我正在开发 Ubuntu,我拥有的各种组件版本是

  • Spark spark-1.5.1-bin-hadoop2.6
  • Hadoop hadoop-2.6.1
  • 蒙古 2.6.10
  • Mongo-Hadoop 连接器 克隆自 https://github.com/mongodb/mongo-hadoop.git
  • Python 2.7.10

我在执行各个步骤时遇到了一些困难,例如将哪些 jar 添加到哪个路径,所以我添加的是

  • /usr/local/share/hadoop-2.6.1/share/hadoop/mapreduce 我已经添加 mongo-hadoop-core-1.5。 0-SNAPSHOT.jar
  • 以下环境变量
    • export HADOOP_HOME="/usr/local/share/hadoop-2.6.1"
    • 导出 PATH=$PATH:$HADOOP_HOME/bin
    • export SPARK_HOME="/usr/local/share/spark-1.5.1-bin-hadoop2.6"
    • export PYTHONPATH="/usr/local/share/mongo-hadoop/spark/src/main/python"
    • 导出 PATH=$PATH:$SPARK_HOME/bin

我的 Python 程序是基本的

from pyspark import SparkContext, SparkConf
import pymongo_spark
pymongo_spark.activate()

def main():
    conf = SparkConf().setAppName("pyspark test")
    sc = SparkContext(conf=conf)
    rdd = sc.mongoRDD(
        'mongodb://username:password@localhost:27017/mydb.mycollection')

if __name__ == '__main__':
    main()

我正在使用命令运行它

$SPARK_HOME/bin/spark-submit --driver-class-path /usr/local/share/mongo-hadoop/spark/build/libs/ --master local[4] ~/sparkPythonExample/SparkPythonExample.py

结果我得到以下输出

Traceback (most recent call last):
  File "/home/me/sparkPythonExample/SparkPythonExample.py", line 24, in <module>
    main()
  File "/home/me/sparkPythonExample/SparkPythonExample.py", line 17, in main
    rdd = sc.mongoRDD('mongodb://username:password@localhost:27017/mydb.mycollection')
  File "/usr/local/share/mongo-hadoop/spark/src/main/python/pymongo_spark.py", line 161, in mongoRDD
    return self.mongoPairRDD(connection_string, config).values()
  File "/usr/local/share/mongo-hadoop/spark/src/main/python/pymongo_spark.py", line 143, in mongoPairRDD
    _ensure_pickles(self)
  File "/usr/local/share/mongo-hadoop/spark/src/main/python/pymongo_spark.py", line 80, in _ensure_pickles
    orig_tb)
py4j.protocol.Py4JError

根据here

This exception is raised when an exception occurs in the Java client code. For example, if you try to pop an element from an empty stack. The instance of the Java exception thrown is stored in the java_exception member.

查看 pymongo_spark.py 的源代码和抛出错误的行,它说

"Error while communicating with the JVM. Is the MongoDB Spark jar on Spark's CLASSPATH? : "

因此,作为回应,我试图确保传递了正确的 jar ,但我可能做错了,见下文

$SPARK_HOME/bin/spark-submit --jars /usr/local/share/spark-1.5.1-bin-hadoop2.6/lib/mongo-hadoop-spark-1.5.0-SNAPSHOT.jar,/usr/local/share/spark-1.5.1-bin-hadoop2.6/lib/mongo-java-driver-3.0.4.jar --driver-class-path /usr/local/share/spark-1.5.1-bin-hadoop2.6/lib/mongo-java-driver-3.0.4.jar,/usr/local/share/spark-1.5.1-bin-hadoop2.6/lib/mongo-hadoop-spark-1.5.0-SNAPSHOT.jar --master local[4] ~/sparkPythonExample/SparkPythonExample.py

我已经将 pymongo 导入到同一个 python 程序中,以验证我至少可以使用它访问 MongoDB,而且我可以。

我知道这里有很多事件部件,所以如果我能提供更多有用的信息,请告诉我。

最佳答案

更新:

2016-07-04

自上次更新 MongoDB Spark Connector成熟了很多。它提供up-to-date binaries和基于数据源的 API,但它使用 SparkConf 配置,因此主观上不如 Stratio/Spark-MongoDB 灵活。

2016-03-30

从最初的答案开始,我发现了两种从 Spark 连接到 MongoDB 的不同方法:

  • mongodb/mongo-spark
  • Stratio/Spark-MongoDB

虽然前者似乎相对不成熟,但后者看起来比 Mongo-Hadoop 连接器更好,并提供 Spark SQL API。

# Adjust Scala and package version according to your setup
# although officially 0.11 supports only Spark 1.5
# I haven't encountered any issues on 1.6.1
bin/pyspark --packages com.stratio.datasource:spark-mongodb_2.11:0.11.0
df = (sqlContext.read
  .format("com.stratio.datasource.mongodb")
  .options(host="mongo:27017", database="foo", collection="bar")
  .load())

df.show()

## +---+----+--------------------+
## |  x|   y|                 _id|
## +---+----+--------------------+
## |1.0|-1.0|56fbe6f6e4120712c...|
## |0.0| 4.0|56fbe701e4120712c...|
## +---+----+--------------------+

似乎比mongo-hadoop-spark稳定很多,支持predicate pushdown,无需静态配置,简单易用。

原答案:

确实,这里有很多事件部件。我试图通过构建一个与描述的配置大致匹配的简单 Docker 镜像来使其更易于管理(但为了简洁起见,我省略了 Hadoop 库)。您可以找到complete source on GitHub ( DOI 10.5281/zenodo.47882 ) 并从头开始构建它:

git clone https://github.com/zero323/docker-mongo-spark.git
cd docker-mongo-spark
docker build -t zero323/mongo-spark .

或下载我拥有的图片pushed to Docker Hub所以你可以简单地 docker pull zero323/mongo-spark):

开始图片:

docker run -d --name mongo mongo:2.6
docker run -i -t --link mongo:mongo zero323/mongo-spark /bin/bash

通过 --jars--driver-class-path 启动 PySpark shell:

pyspark --jars ${JARS} --driver-class-path ${SPARK_DRIVER_EXTRA_CLASSPATH}

最后看看它是如何工作的:

import pymongo
import pymongo_spark

mongo_url = 'mongodb://mongo:27017/'

client = pymongo.MongoClient(mongo_url)
client.foo.bar.insert_many([
    {"x": 1.0, "y": -1.0}, {"x": 0.0, "y": 4.0}])
client.close()

pymongo_spark.activate()
rdd = (sc.mongoRDD('{0}foo.bar'.format(mongo_url))
    .map(lambda doc: (doc.get('x'), doc.get('y'))))
rdd.collect()

## [(1.0, -1.0), (0.0, 4.0)]

请注意,mongo-hadoop 似乎在第一次操作后关闭了连接。所以在collect之后调用例如rdd.count()会抛出异常。

基于我在创建此图像时遇到的不同问题,我倾向于相信 passing mongo-hadoop-1.5.0-SNAPSHOT.jarmongo -hadoop-spark-1.5.0-SNAPSHOT.jar 对两个 --jars--driver-class-path 是唯一的硬性要求

注意事项:

  • 此图片大致基于 jaceklaskowski/docker-spark 所以请一定要给@jacek-laskowski发送一些好的业力|如果有帮助的话。
  • 如果不需要包含 new API 的开发版本那么使用 --packages 很可能是一个更好的选择。

https://stackoverflow.com/questions/33391840/

相关文章:

mongodb - 如何使用 Robomongo 连接到 MongoDB Atlas?

mongodb - Mongoose 是否支持 Mongodb `findAndModify` 方法

node.js - 使用 mongoose 在 mongodb 中设置集合的到期时间

java - 如何通过Java在MongoDB中一次插入多个文档

python - pymongo 中的快速或批量更新

mongodb - 如何在 MongoDB 中创建嵌套索引?

node.js - 使用 2 个字段的 Mongoose 自定义验证

java - 如何使用 Java mongodb 驱动程序中的 "_id"字段查询文档?

linux - Errr 'mongo.js:L112 Error: couldn' t 连接到服务

mongodb - 我控制修补程序上的 Mongo 错误