• 1
  • 2
  • 3
  • 4
  • 5
mysql数据库问题 首 页  »  帮助中心  »  数据库  »  mysql数据库问题
Spark SQL与PostgreSQL整合
发布日期:2016-4-19 16:4:57

  Spark SQL与PostgreSQL整合  

  本博客的《Spark与Mysql(JdbcRDD)整合开发》与《Spark RDD写入RMDB(Mysql)方法二》两篇文中介绍了如何通过Spark读写Mysql中的数据。

  在生产环境下,很多公司都会选择使用PostgreSQL数据库,本文将介绍如何通过Spark来获取PostgreSQL中的数据。我将使用Spark 1.3中的DataFrame,我们可以通过SQLContext加载数据库中的数据,并转成DataFrame,我们可以使用SQLContext的load方法,代码如下所示:

  1 def load(source: String, options: Map[String, String]): DataFrame = {

  2 read.options(options).format(source).load()

  3 }

  上面的代码中:options可以传入的参数包括:

  • url
  • dbtable
  • driver
  • partitionColumn
  • lowerBound
  • upperBound
  • numPartitions

  不过在Spark 1.4的版本中,这个方法已经被标记为deprecated,所以我们得调用read.format(source).options(options).load()来替代。

  那么我们的代码可以这么写,代码如下所示,如图1:

 

   在上面的代码使用中,采用的方法是直接将SQL语句直接传入到dbtable中,但很多情况下这还不符合我们的需求,可是,我们还可以通过调用registerTempTable()方法来注册临时表,并调用sql()方法执行查询,代码如下所示:

  1 val testDataFrame= sqlContext.load("jdbc", Map(

  2 "url" -> url,

  3 "driver" -> "org.postgresql.Driver",

  4 "dbtable" -> "iteblog"

  5 ))

  6

  7 testDataFrame.registerTempTable("iteblog")

  8 sqlContext.sql("select * from iteblog").foreach(println)

  最后,若你使用的是SBT来管理项目,你需要在你的build.sbt文件中添加相关的依赖,代码如下所示:

  1 libraryDependencies ++= {

  2 "org.apache.spark" % "spark-core_2.10" % "1.3.1",

  3 "org.apache.spark" % "spark-sql_2.10" % "1.3.1",

  4 "org.postgresql" % "postgresql" % "9.4-1201-jdbc41"

  5 }

  若你使用的是Maven,请在你的pom.xml文件里面加入以下依赖,代码如下所示:


图2

   本博客文章除特别声明,全部都是原创!

  尊重原创,转载请注明: 转载自过往记忆(http://www.iteblog.com/)

  本文链接: 【Spark SQL整合PostgreSQL】(http://www.iteblog.com/archives/1369)