spark2.x由浅入深深到底系列六之RDDjavaapi用JdbcRDD读取关系型数据库
学习任何的spark技术之前,请先正确理解spark,可以参考:正确理解spark

建瓯网站制作公司哪家好,找成都创新互联!从网页设计、网站建设、微信开发、APP开发、成都响应式网站建设等网站项目制作,到程序开发,运营维护。成都创新互联从2013年成立到现在10年的时间,我们拥有了丰富的建站经验和运维经验,来保证我们的工作的顺利进行。专注于网站建设就选成都创新互联。
以下是用spark RDD java api实现从关系型数据库中读取数据,这里使用的是derby本地数据库,当然可以是MySQL或者oracle等关系型数据库:
package com.twq.javaapi.java7;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.rdd.JdbcRDD;
import java.io.Serializable;
import java.sql.*;
public class JavaJdbcRDDSuite implements Serializable {
public static void prepareData() throws ClassNotFoundException, SQLException {
//使用本地数据库derby,当然可以使用mysql等关系型数据库
Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
Connection connection =
DriverManager.getConnection("jdbc:derby:target/JavaJdbcRDDSuiteDb;create=true");
try {
//创建一张表FOO,ID是一个自增的主键,DATA是一个INTEGER列
Statement create = connection.createStatement();
create.execute(
"CREATE TABLE FOO(" +
"ID INTEGER NOT NULL GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1)," +
"DATA INTEGER)");
create.close();
//插入数据
PreparedStatement insert = connection.prepareStatement("INSERT INTO FOO(DATA) VALUES(?)");
for (int i = 1; i <= 5; i++) {
insert.setInt(1, i * 2);
insert.executeUpdate();
}
insert.close();
} catch (SQLException e) {
// If table doesn't exist...
if (e.getSQLState().compareTo("X0Y32") != 0) {
throw e;
}
} finally {
connection.close();
}
}
public static void shutdownDB() throws SQLException {
try {
DriverManager.getConnection("jdbc:derby:target/JavaJdbcRDDSuiteDb;shutdown=true");
} catch (SQLException e) {
// Throw if not normal single database shutdown
// https://db.apache.org/derby/docs/10.2/ref/rrefexcept71493.html
if (e.getSQLState().compareTo("08006") != 0) {
throw e;
}
}
}
public static void main(String[] args) throws Exception {
JavaSparkContext sc = new JavaSparkContext("local", "JavaAPISuite");
//准备数据
prepareData();
//构建JdbcRDD
JavaRDD rdd = JdbcRDD.create(
sc,
new JdbcRDD.ConnectionFactory() {
@Override
public Connection getConnection() throws SQLException {
return DriverManager.getConnection("jdbc:derby:target/JavaJdbcRDDSuiteDb");
}
},
"SELECT DATA FROM FOO WHERE ? <= ID AND ID <= ?",
1, 5, 1,
new Function() {
@Override
public Integer call(ResultSet r) throws Exception {
return r.getInt(1);
}
}
);
//结果: [2, 4, 6, 8, 10]
System.out.println(rdd.collect());
shutdownDB();
sc.stop();
}
} 详细了解RDD的api的话,可以参考: spark core RDD api原理详解
分享名称:spark2.x由浅入深深到底系列六之RDDjavaapi用JdbcRDD读取关系型数据库
标题来源:http://www.cqwzjz.cn/article/jgjceo.html


咨询
建站咨询
