淘先锋技术网

首页 1 2 3 4 5 6 7

13f7373284246d5d37932ebc81134000.png

Spark SQL支持通过JDBC直接读取数据库中的数据,这个特性是基于JdbcRDD实现。返回值作为DataFrame返回,这样可以直接使用Spark SQL并跟其他的数据源进行join操作。JDBC数据源可以很简单的通过Java或者Python,而不需要提供ClassTag。注意这与Spark SQL JDBC server不同,后者是基于Spark SQL执行查询。

要保证能使用需要把对应的jdbc驱动放到spark的classpath中。比如,想要连接postgres可以在启动命令中添加jars:

bin/spark-shell 
--driver-class-path postgresql-9.4.1207.jar 
--jars postgresql-9.4.1207.jar

远程数据库的表可以加载成DataFrame或者注册成Spark SQL的临时表,用户可以在数据源选项中配置JDBC相关的连接参数。user和password一般是必须提供的参数,另外一些参数可以参考下面的列表:

url

JDBC连接url,比如jdbc:postgresql://localhost/test?user=fred&password=secret

dbtable

需要读取或者写入的JDBC表,注意里面的内容将会作为from后面的部分,比如 select * from 。注意不能同时配置dbtable和query。

query

query用于指定from后面的子查询,拼接成的sql如下:SELECT FROM () spark_gen_alias 。注意dbtable和query不能同时使用;不允许同时使用partitionColumn和query。

注意:这里的dbtable和query其实没有太大的区别,只是query会默认套一层别名而已。

driver

jdbc驱动driver

partitionColumn, lowerBound, upperBound

指定时这三项需要同时存在,描述了worker如何并行读取数据库。其中partitionColumn必须是数字、date、timestamp,lowerBound和upperBound只是决定了分区的步长,而不会过滤数据,因此表中所有的数据都会被分区返回。该参数仅用于读。

numPartitions

读写时的最大分区数。这也决定了连接JDBC的最大连接数,如果并行度超过该配置,将会使用coalesce(partition)来降低并行度。

queryTimeout

driver执行statement的等待时间,0意味着没有限制。写入的时候这个选项依赖于底层是如何实现setQueryTimeout的,比如h2 driver会检查每个query。默认是0

fetchSize

fetch的大小,决定了每一个fetch,拉取多少数据量。这个参数帮助针对默认比较小的驱动进行调优,比如oracle默认是10行。仅用于读操作。

batchSize

batch大小,决定插入时的并发大小,默认1000。

isolationLvel

事务隔离的等级,作用于当前连接。可以配置成NONE, READ_COMMITTED, READ_UNCOMMITTED, REPEATABLE_READ, SERIALIZABLE, 依赖于底层jdbc提供的事务隔离,默认是READ_UNCOMMITTED。这个选项仅用于写操作。

sessionInitStatment

每个数据库session创建前执行的操作,用于初始化。如定义一些触发器操作。如 BEGIN execute immediate 'alter session set "_serial_direct_read"=true'; END;

truncate

写操作选项,当使用SaveMode.Overwrite时,该选项用于是否直接删除并重建表。当表结构发现变化的时候会失效。默认是false。

cascadeTruncate

写操作选项,是否开启级联删除。

createTableOptions

写操作选项,一般用于配置特殊的分区或者数据库配置,比如 CREATE TABLE t (name string) ENGINE=InnoDB

createTableColumnTypes

配置数据库字段的类型,比如 name CHAR(64), comments VARCHAR(1024),仅支持spark sql中支持的数据类型。

customSchema

自定义读取的schema信息,比如 id DECIMAL(38, 0), name STRING 。可以配置部分字段,其他的使用默认的类型映射,比如 id DECIMAL(38, 0)。仅用于读操作。

pushDownPredicate

该选项用于开启或禁用jdbc数据源的谓词下推。默认是true。如果配置为false,那么所有的filter操作都会由spark来完成。当过滤操作用spark更快时,一般才会关闭下推功能。

// 加载jdbc
val jdbcDF = spark.read
  .format("jdbc")
  .option("url", "jdbc:postgresql:dbserver")
  .option("dbtable", "schema.tablename")
  .option("user", "username")
  .option("password", "password")
  .load()

// 使用propeties
val connectionProperties = new Properties()
connectionProperties.put("user", "username")
connectionProperties.put("password", "password")

val jdbcDF2 = spark.read
  .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)

// 指定自定义的schema信息
connectionProperties.put("customSchema", "id DECIMAL(38, 0), name STRING")
val jdbcDF3 = spark.read
  .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)

// 保存jdbc
jdbcDF.write
  .format("jdbc")
  .option("url", "jdbc:postgresql:dbserver")
  .option("dbtable", "schema.tablename")
  .option("user", "username")
  .option("password", "password")
  .save()

jdbcDF2.write
  .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)

// 指定自定义schema映射
jdbcDF.write
  .option("createTableColumnTypes", "name CHAR(64), comments VARCHAR(1024)")
  .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)