淘先锋技术网

首页 1 2 3 4 5 6 7

使用Java JDBC连接Hive和HBase数据库,并使用HBase Java API进行数据写入。下面是一个简单的Java代码示例,用于将Hive表中的数据同步到HBase:

import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class Hive2HBase {
    public static void main(String[] args) throws SQLException, ClassNotFoundException, IOException {
        // Hive连接信息
        String hiveUrl = "jdbc:hive2://<hive_host>:<hive_port>/<hive_db>";
        String hiveUser = "<hive_user>";
        String hivePassword = "<hive_password>";
        String hiveTable = "<hive_table>";

        // HBase连接信息
        Configuration hbaseConfig = HBaseConfiguration.create();
        hbaseConfig.set("hbase.zookeeper.quorum", "<zookeeper_host>");
        hbaseConfig.set("hbase.zookeeper.property.clientPort", "<zookeeper_port>");
        hbaseConfig.set("zookeeper.znode.parent", "/hbase");
        String hbaseTable = "<hbase_table>";
        String hbaseCF = "<hbase_cf>";

        // 加载Hive JDBC驱动
        Class.forName("org.apache.hive.jdbc.HiveDriver");

        // 连接Hive数据库
        Connection hiveConn = DriverManager.getConnection(hiveUrl, hiveUser, hivePassword);

        // 创建Hive表查询语句
        String hiveQuery = "SELECT * FROM " + hiveTable;

        // 执行Hive表查询
        Statement hiveStmt = hiveConn.createStatement();
        ResultSet hiveRs = hiveStmt.executeQuery(hiveQuery);

        // 连接HBase数据库
        Table hbaseTable = ConnectionFactory.createConnection(hbaseConfig).getTable(TableName.valueOf(hbaseTable));

        // 执行HBase表插入
        while (hiveRs.next()) {
            // 从Hive结果集中读取数据
            String rowKey = hiveRs.getString(1);
            String col1 = hiveRs.getString(2);
            String col2 = hiveRs.getString(3);
            String col3 = hiveRs.getString(4);
            String col4 = hiveRs.getString(5);

            // 创建HBase Put对象并设置行键和列族
            Put hbasePut = new Put(Bytes.toBytes(rowKey));
            hbasePut.addColumn(Bytes.toBytes(hbaseCF), Bytes.toBytes("<hbase_col1>"), Bytes.toBytes(col1));
            hbasePut.addColumn(Bytes.toBytes(hbaseCF), Bytes.toBytes("<hbase_col2>"), Bytes.toBytes(col2));
            hbasePut.addColumn(Bytes.toBytes(hbaseCF), Bytes.toBytes("<hbase_col3>"), Bytes.toBytes(col3));
            hbasePut.addColumn(Bytes.toBytes(hbaseCF), Bytes.toBytes("<hbase_col4>"), Bytes.toBytes(col4));

            // 执行HBase表插入
            hbaseTable.put(hbasePut);
        }

        // 关闭连接
        hiveRs.close();
        hiveStmt.close();
        hiveConn.close();
        hbaseTable.close();
    }
}
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.PageFilter;
import org.apache.hadoop.hbase.util.Bytes;

public class HBase2Hive {
    public static void main(String[] args) throws SQLException, ClassNotFoundException {
        // HBase连接信息
        Configuration hbaseConfig = HBaseConfiguration.create();
        hbaseConfig.set("hbase.zookeeper.quorum", "<zookeeper_host>");
        hbaseConfig.set("hbase.zookeeper.property.clientPort", "<zookeeper_port>");
        hbaseConfig.set("zookeeper.znode.parent", "/hbase");
        String hbaseTable = "<hbase_table>";

        // Hive连接信息
        String hiveUrl = "jdbc:hive2://<hive_host>:<hive_port>/<hive_db>";
        String hiveUser = "<hive_user>";
        String hivePassword = "<hive_password>";
        String hiveTable = "<hive_table>";

        // 连接HBase数据库
        Table hbaseTable = ConnectionFactory.createConnection(hbaseConfig).getTable(TableName.valueOf(hbaseTable));

        // 创建HBase表扫描器
        Scan hbaseScan = new Scan();
        Filter filter = new PageFilter(100);
        hbaseScan.setFilter(filter);

        // 执行HBase表扫描
        ResultScanner hbaseScanner = hbaseTable.getScanner(hbaseScan);

        // 加载Hive JDBC驱动
        Class.forName("org.apache.hive.jdbc.HiveDriver");

        // 连接Hive数据库
        Connection hiveConn = DriverManager.getConnection(hiveUrl, hiveUser, hivePassword);

        // 创建Hive表插入语句
        String hiveInsert = "INSERT INTO " + hiveTable + " VALUES (?,?,?,?)";

        // 执行Hive表插入
        for (Result hbaseResult : hbaseScanner) {
            // 从HBase结果集中读取数据
            String col1 = Bytes.toString(hbaseResult.getValue(Bytes.toBytes("<hbase_cf>"), Bytes.toBytes("<hbase_col1>")));
            String col2 = Bytes.toString(hbaseResult.getValue(Bytes.toBytes("<hbase_cf>"), Bytes.toBytes("<hbase_col2>")));
            String col3 = Bytes.toString(hbaseResult.getValue(Bytes.toBytes("<hbase_cf>"), Bytes.toBytes("<hbase_col3>")));
            String col4 = Bytes.toString(hbaseResult.getValue(Bytes.toBytes("<hbase_cf>"), Bytes.toBytes("<hbase_col4>")));

            // 创建Hive表插入语句并设置参数
            Statement hiveStmt = hiveConn.createStatement();
            hiveStmt.executeUpdate(hiveInsert);
            hiveStmt.setString(1, col1);
            hiveStmt.setString(2, col2);
            hiveStmt.setString(3, col3);
            hiveStmt.setString(4, col4);
            hiveStmt.close();
        }

        // 关闭连接
        hbaseScanner.close();
        hbaseTable.close();
        hiveConn.close();
    }
}