Spark SQL支持通过JDBC直接读取数据库中的数据,这个特性是基于JdbcRDD实现。返回值作为DataFrame返回,这样可以直接使用Spark SQL并跟其他的数据源进行join操作。JDBC数据源可以很简单的通过Java或者Python,而不需要提供ClassTag。注意这与Spark SQL JDBC server不同,后者是基于Spark SQL执行查询。
要保证能使用需要把对应的jdbc驱动放到spark的classpath中。比如,想要连接postgres可以在启动命令中添加jars:
bin/spark-shell
--driver-class-path postgresql-9.4.1207.jar
--jars postgresql-9.4.1207.jar
远程数据库的表可以加载成DataFrame或者注册成Spark SQL的临时表,用户可以在数据源选项中配置JDBC相关的连接参数。user和password一般是必须提供的参数,另外一些参数可以参考下面的列表:
url
JDBC连接url,比如jdbc:postgresql://localhost/test?user=fred&password=secret
dbtable
需要读取或者写入的JDBC表,注意里面的内容将会作为from后面的部分,比如 select * from 。注意不能同时配置dbtable和query。
query
query用于指定from后面的子查询,拼接成的sql如下:SELECT FROM () spark_gen_alias 。注意dbtable和query不能同时使用;不允许同时使用partitionColumn和query。
注意:这里的dbtable和query其实没有太大的区别,只是query会默认套一层别名而已。
driver
jdbc驱动driver
partitionColumn, lowerBound, upperBound
指定时这三项需要同时存在,描述了worker如何并行读取数据库。其中partitionColumn必须是数字、date、timestamp,lowerBound和upperBound只是决定了分区的步长,而不会过滤数据,因此表中所有的数据都会被分区返回。该参数仅用于读。
numPartitions
读写时的最大分区数。这也决定了连接JDBC的最大连接数,如果并行度超过该配置,将会使用coalesce(partition)来降低并行度。
queryTimeout
driver执行statement的等待时间,0意味着没有限制。写入的时候这个选项依赖于底层是如何实现setQueryTimeout的,比如h2 driver会检查每个query。默认是0
fetchSize
fetch的大小,决定了每一个fetch,拉取多少数据量。这个参数帮助针对默认比较小的驱动进行调优,比如oracle默认是10行。仅用于读操作。
batchSize
batch大小,决定插入时的并发大小,默认1000。
isolationLvel
事务隔离的等级,作用于当前连接。可以配置成NONE, READ_COMMITTED, READ_UNCOMMITTED, REPEATABLE_READ, SERIALIZABLE, 依赖于底层jdbc提供的事务隔离,默认是READ_UNCOMMITTED。这个选项仅用于写操作。
sessionInitStatment
每个数据库session创建前执行的操作,用于初始化。如定义一些触发器操作。如 BEGIN execute immediate 'alter session set "_serial_direct_read"=true'; END;
truncate
写操作选项,当使用SaveMode.Overwrite时,该选项用于是否直接删除并重建表。当表结构发现变化的时候会失效。默认是false。
cascadeTruncate
写操作选项,是否开启级联删除。
createTableOptions
写操作选项,一般用于配置特殊的分区或者数据库配置,比如 CREATE TABLE t (name string) ENGINE=InnoDB
createTableColumnTypes
配置数据库字段的类型,比如 name CHAR(64), comments VARCHAR(1024),仅支持spark sql中支持的数据类型。
customSchema
自定义读取的schema信息,比如 id DECIMAL(38, 0), name STRING 。可以配置部分字段,其他的使用默认的类型映射,比如 id DECIMAL(38, 0)。仅用于读操作。
pushDownPredicate
该选项用于开启或禁用jdbc数据源的谓词下推。默认是true。如果配置为false,那么所有的filter操作都会由spark来完成。当过滤操作用spark更快时,一般才会关闭下推功能。
// 加载jdbc
val jdbcDF = spark.read
.format("jdbc")
.option("url", "jdbc:postgresql:dbserver")
.option("dbtable", "schema.tablename")
.option("user", "username")
.option("password", "password")
.load()
// 使用propeties
val connectionProperties = new Properties()
connectionProperties.put("user", "username")
connectionProperties.put("password", "password")
val jdbcDF2 = spark.read
.jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)
// 指定自定义的schema信息
connectionProperties.put("customSchema", "id DECIMAL(38, 0), name STRING")
val jdbcDF3 = spark.read
.jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)
// 保存jdbc
jdbcDF.write
.format("jdbc")
.option("url", "jdbc:postgresql:dbserver")
.option("dbtable", "schema.tablename")
.option("user", "username")
.option("password", "password")
.save()
jdbcDF2.write
.jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)
// 指定自定义schema映射
jdbcDF.write
.option("createTableColumnTypes", "name CHAR(64), comments VARCHAR(1024)")
.jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)