有时候需要连接第三方的各种数据源,总是要去写不同的代码,于是将MaxCompute, Hive, Oracle, MySQL等JDBC连接封装起来,只需要传入不同的参数即可创建一个不同类型的连接池。
连接参数基础类封装
封装了JDBC基础的连接参数;如果这些属性不够用,可以继承该类并增加新的属性。
/**
 * Base JDBC connection parameters shared by every supported data source.
 *
 * <p>Data sources that need additional settings extend this class and add their own
 * fields. Accessors, {@code equals}/{@code hashCode} and {@code toString} are generated
 * by Lombok's {@code @Data}.
 *
 * <p>NOTE(review): the generated {@code toString()} includes {@code password}; avoid
 * logging instances of this class.
 */
@Data
public class BaseJdbcConnParam implements Serializable {

    /** Fully-qualified JDBC driver class name. */
    private String driverName;

    /** Host name or IP address of the database server. */
    private String ip;

    /** Port the database server listens on. */
    private Integer port;

    /** Database (or service/instance) name to connect to. */
    private String dbName;

    /** User name used to open the connection. */
    private String username;

    /** Password used to open the connection. */
    private String password;
}
抽象连接工具类封装
功能如下:
/**
 * Base class of the per-database connection helpers.
 *
 * <p>Every instance owns exactly one JDBC connection, which is opened eagerly by the
 * constructor through {@link #buildConnection()}.
 *
 * @param <P> concrete connection-parameter type
 * @Author itdl
 * @Date 2022/08/15 09:54
 */
public abstract class AbstractConnUtil<P extends BaseJdbcConnParam> {

    /** Connection parameters; already assigned when {@link #buildConnection()} runs. */
    protected final P connParam;

    /** The JDBC connection returned by {@link #buildConnection()}. */
    protected final Connection connection;

    /**
     * Stores the parameters and immediately opens the connection.
     *
     * <p>{@link #buildConnection()} is invoked from this constructor, i.e. before the
     * subclass constructor body and subclass instance-field initializers have run.
     * Implementations must therefore rely only on {@link #connParam} and on static state.
     *
     * @param connParam connection parameters, must not be {@code null}
     * @throws NullPointerException if {@code connParam} is {@code null}
     */
    public AbstractConnUtil(P connParam) {
        // Fail fast with a clear message instead of an NPE deep inside a subclass.
        this.connParam = java.util.Objects.requireNonNull(connParam, "connParam must not be null");
        this.connection = buildConnection();
    }

    /**
     * Opens the JDBC connection described by {@link #connParam}.
     *
     * @return the opened connection
     */
    protected abstract Connection buildConnection();

    /**
     * Returns the connection opened at construction time.
     *
     * @return the JDBC connection owned by this helper
     */
    public Connection getConnection() {
        return connection;
    }
}
连接池管理
功能如下:
/** * @Description 连接池管理 * @Author itdl * @Date 2022/08/16 09:42 */@Slf4jpublic class DbConnPool<T extends BaseJdbcConnParam> { /** * 用于存放连接 */ private final LinkedList<Connection> connPool = new LinkedList<Connection>(); /** * 最大连接池数量 */ private final Integer maxPoolSize; private final T connParam; /** * 构造函数 * @param connParam 连接参数 * @param maxPoolSize 连接池大小 */ public DbConnPool(T connParam, Integer maxPoolSize) { this.maxPoolSize = maxPoolSize; this.connParam = connParam; // 初始化连接池 for (int i = 0; i < maxPoolSize; i++) { connPool.addLast(this.createConnection()); } } /** * 创建数据库连接 * @return 连接 */ private Connection createConnection() { if (connParam instanceof OracleJdbcConnParam){ final OracleConnUtil util = new OracleConnUtil((OracleJdbcConnParam) connParam); return util.getConnection(); } if (connParam instanceof HiveJdbcConnParam){ final HiveConnUtil util = new HiveConnUtil((HiveJdbcConnParam) connParam); return util.getConnection(); } if (connParam instanceof MysqlJdbcConnParam){ final MysqlConnUtil util = new MysqlConnUtil((MysqlJdbcConnParam) connParam); return util.getConnection(); } if (connParam instanceof MaxComputeJdbcConnParam){ final MaxComputeJdbcUtil util = new MaxComputeJdbcUtil((MaxComputeJdbcConnParam) connParam); return util.getConnection(); } throw new BizException(ResultCode.CONN_TYPE_NOT_SUPPORT); } /** * 获取连接 * @return 连接 */ public synchronized Connection getConnection(){ if (connPool.size() == 0){// throw new BizException(ResultCode.CONN_POOL_EMPTY_ERR); // 最长等待十分钟 try { log.info("==========连接池已经空了, 请等待其他线程释放=========="); wait(10 * 60 * 1000); } catch (InterruptedException e) { log.info("==========连接池已经空了, 等待了10分钟还没有释放,抛出异常=========="); e.printStackTrace(); throw new BizException(ResultCode.CONN_POOL_EMPTY_ERR); } } // 去除最上面一个连接 如果没有连接了,将会抛出异常 return connPool.removeFirst(); } /** * 用完后释放连接 * @param conn 要释放的连接 */ public synchronized void freeConnection(Connection conn){ // 通知连接已经释放 notifyAll(); this.connPool.addLast(conn); } /** * 关闭连接池 
*/ public synchronized void close(){ for (Connection connection : connPool) { SqlUtil.close(connection); } }}
SQL操作工具类
根据连接对象Connection和数据库方言,封装不同的SQL执行逻辑,即执行SQL的核心功能封装。
/** * @Description SQL操作工具类 * @Author itdl * @Date 2022/08/10 17:13 */@Slf4jpublic class SqlUtil { /**查询mysql表注释sql*/ public static final String SELECT_TABLES_MYSQL = "select table_name, table_comment from information_schema.tables where TABLE_SCHEMA = '%s'"; /**查询MaxCompute表注释sql*/ public static final String SELECT_TABLES_MAX_COMPUTE = "select table_name, table_comment from information_schema.tables where TABLE_SCHEMA = '%s'"; /**查询oracle表注释sql*/ public static final String SELECT_TABLES_ORACLE = "SELECT t2.TABLE_NAME as table_name, t2.COMMENTS as table_comment FROM user_tables t1 inner join user_tab_comments t2 on t1.TABLE_NAME = t2.TABLE_NAME"; /**查询hive表注释sql, 先查询表名,根据表名获取建表语句,正则提取表注释*/ public static final String SELECT_TABLES_HIVE = "show tables"; public static final String SELECT_TABLES_2_HIVE = "describe extended %s"; /**分页数量统计Mysql*/ private static final String SELECT_COUNT_MYSQL = "select count(1) from (%s) z"; /**分页数量统计MaxCompute*/ private static final String SELECT_COUNT_MAX_COMPUTE = "select count(1) from (%s) z;"; /**分页数量统计Hive*/ private static final String SELECT_COUNT_ORACLE = "select count(1) from (%s) z"; /**分页数量统计Oracle*/ private static final String SELECT_COUNT_HIVE = "select count(1) from (%s) z"; /**maxCompute开启全表扫描sql*/ private static final String FULL_SCAN_MAX_COMPUTE = "set odps.sql.allow.fullscan=true;"; /**分页查询sql-Mysql*/ private static final String SELECT_PAGE_MYSQL = "select z.* from (%s) z limit %s, %s"; /**分页查询sql-MaxCompute*/ private static final String SELECT_PAGE_MAX_COMPUTE = "select z.* from (%s) z limit %s, %s;"; /**分页查询sql-Hive*/ private static final String SELECT_PAGE_HIVE = "select * from (select row_number() over () as row_num_01,u.* from (%s) u) mm where mm.row_num_01 between %s and %s"; /**分页查询sql-Oracle*/ private static final String SELECT_PAGE_ORACLE = "select * from (SELECT ROWNUM as row_num_01,z.* from (%s) z) h where h.row_num_01 > %s and h.row_num_01 <= %s"; /**数据库连接*/ private final Connection connection; /**数据库方言*/ 
private final Integer dbDialect; /**支持的方言列表*/ private static final List<Integer> supportDbTypes = Arrays.asList(DbDialectEnum.ORACLE.getCode(), DbDialectEnum.HIVE.getCode(), DbDialectEnum.MYSQL.getCode(), DbDialectEnum.MAX_COMPUTE.getCode()); public SqlUtil(Connection connection, Integer dbDialect) { if (!supportDbTypes.contains(dbDialect)){ throw new BizException(ResultCode.CONN_TYPE_NOT_SUPPORT); } this.connection = connection; this.dbDialect = dbDialect; } /** * 根据connection获取所有的表和对应的注释 */ public List<TableMetaInfo> getTables(String schemaName){ List<TableMetaInfo> result = new ArrayList<>(); String sql = ""; switch (this.dbDialect){ case 1: sql = SELECT_TABLES_ORACLE; break; case 2: sql = SELECT_TABLES_HIVE; break; case 3: if (StringUtils.isBlank(schemaName)){ throw new BizException(ResultCode.SELECT_TABLES_SCHEMA_NOT_NULL_ERR); } sql = String.format(SELECT_TABLES_MYSQL, schemaName); break; case 4: if (StringUtils.isBlank(schemaName)){ throw new BizException(ResultCode.SELECT_TABLES_SCHEMA_NOT_NULL_ERR); } sql = String.format(SELECT_TABLES_MAX_COMPUTE, schemaName); default: break; } if (StringUtils.isBlank(sql)){ throw new BizException(ResultCode.CONN_TYPE_NOT_SUPPORT); } // 执行SQL语句 final List<LinkedHashMap<String, Object>> resultMaps = querySql(sql); if (ObjectUtils.isEmpty(resultMaps)){ return Lists.newArrayList(); } // hive单独处理 List<TableMetaInfo> result1 = getHiveTableMetaInfos(result, resultMaps); if (result1 != null) return result1; // 转换结果 return resultMaps.stream().map( m->{ final TableMetaInfo info = new TableMetaInfo(); Object tableNameObj = m.get("table_name"); String tableName = tableNameObj == null ? m.get("TABLE_NAME") == null ? "" : String.valueOf(m.get("TABLE_NAME")) : String.valueOf(tableNameObj); Object tableCommentObj = m.get("table_comment"); String tableComment = tableCommentObj == null ? m.get("TABLE_COMMENT") == null ? 
"" : String.valueOf(m.get("TABLE_COMMENT")) : String.valueOf(tableCommentObj); info.setTableName(tableName); info.setComment(tableComment); return info; } ).collect(Collectors.toList()); } /** * 根据schemeName,表名获取字段列表 * @param tableName 一般是数据库 oracle是用户名 */ public List<TableColumnMetaInfo> getColumnsByTableName(String tableName){ try { List<TableColumnMetaInfo> list = new ArrayList<>(); final DatabaseMetaData metaData = connection.getMetaData(); final ResultSet columns = metaData.getColumns(null, null, tableName, null); while (columns.next()){ String columnName = columns.getString("COLUMN_NAME"); String remarks = columns.getString("REMARKS"); remarks = StringUtils.isBlank(remarks) ? "" : remarks; final TableColumnMetaInfo metaInfo = new TableColumnMetaInfo(tableName, columnName, remarks); list.add(metaInfo); } return list; } catch (SQLException e) { e.printStackTrace(); return Lists.newArrayList(); } } /** * 执行sql查询 * @param querySql 查询sql * @return List<Map<String, Object>> 通过LinkedHashMap接受,序列化时可保证顺序一致 */ public List<LinkedHashMap<String, Object>> queryData(String querySql, boolean... 
fullScan){ Statement statement = null; ResultSet resultSet = null; try { // 创建statement statement = this.connection.createStatement(); // 执行全表扫描sql for (boolean b : fullScan) { if (b){ statement.execute(FULL_SCAN_MAX_COMPUTE); break; } } // 执行查询语句 resultSet = statement.executeQuery(querySql); // 构建结果返回 return buildListMap(resultSet); } catch (SQLException e) { e.printStackTrace(); throw new BizException(ResultCode.SQL_EXEC_ERR); } finally { // 关闭resultSet, statement close(resultSet, statement); } } /** * 执行sql查询 * @param querySql 查询sql * @return List<Map<String, Object>> */ public List<LinkedHashMap<String, Object>> queryData(String querySql, Integer page, Integer size){ Statement statement = null; ResultSet resultSet = null; try { // 1、替换分号 querySql = querySql.replaceAll(";", ""); // 创建statement statement = this.connection.createStatement(); // 2、格式化SQL int offset = (page - 1 ) * size; String execSql = ""; switch (this.dbDialect){ case 1: // oracle execSql = String.format(SELECT_PAGE_ORACLE, querySql, offset, size); break; case 2: // hive execSql = String.format(SELECT_PAGE_HIVE, querySql, offset, size); break; case 3: // mysql execSql = String.format(SELECT_PAGE_MYSQL, querySql, offset, size); break; case 4: // maxCompute execSql = String.format(SELECT_PAGE_MAX_COMPUTE, querySql, offset, size); break; default: break; } // maxCompute开启全表扫描 if (DbDialectEnum.MAX_COMPUTE.getCode().equals(this.dbDialect)){ statement.execute(FULL_SCAN_MAX_COMPUTE); } log.info("=======>>>执行分页sql为:{}", execSql); // 执行查询语句 resultSet = statement.executeQuery(execSql); // 构建结果返回 return buildListMap(resultSet); } catch (SQLException e) { e.printStackTrace(); throw new BizException(ResultCode.SQL_EXEC_ERR); } finally { // 关闭resultSet, statement close(resultSet, statement); } } /** * 执行分页查询 * @param querySql 分页查询sql * @param page 页码 从1开始 第n页传n * @param size 每页记录数 * @return 分页查询结果 */ public PageResult<LinkedHashMap<String, Object>> pageQueryMap(String querySql, Integer page, Integer size){ // 
1、替换分号 querySql = querySql.replaceAll(";", ""); String countSql = ""; switch (this.dbDialect){ case 1: // oracle countSql = String.format(SELECT_COUNT_ORACLE, querySql); break; case 2: // hive countSql = String.format(SELECT_COUNT_HIVE, querySql); break; case 3: // mysql countSql = String.format(SELECT_COUNT_MYSQL, querySql); break; case 4: // maxCompute countSql = String.format(SELECT_COUNT_MAX_COMPUTE, querySql); break; default: break; } log.info("=======>>>执行分页统计总数sql为:{}", countSql); // 查询总数 final List<LinkedHashMap<String, Object>> countMap = queryData(countSql, DbDialectEnum.MAX_COMPUTE.getCode().equals(this.dbDialect)); if (CollectionUtils.isEmpty(countMap)){ return new PageResult<>(0L, new ArrayList<>()); } long count = 0L; for (Object value : countMap.get(0).values()) { count = Long.parseLong(String.valueOf(value)); } if (count == 0){ return new PageResult<>(0L, new ArrayList<>()); } // 执行分页查询 开启全表扫描 final List<LinkedHashMap<String, Object>> resultList = queryData(querySql, page, size); return new PageResult<>(count, resultList); } /** * 执行分页查询 * @param querySql 分页查询sql * @param page 页码 从1开始 第n页传n * @param size 每页记录数 * @return 分页查询结果 */ public <T>PageResult<T> pageQuery(String querySql, Integer page, Integer size, Class<T> clazz){ final PageResult<LinkedHashMap<String, Object>> result = pageQueryMap(querySql, page, size); List<T> rows = new ArrayList<>(); for (LinkedHashMap<String, Object> row : result.getRows()) { final T t = JSONObject.parseobject(JSONObject.toJSONString(row), clazz); rows.add(t); } return new PageResult<>(result.getTotal(), rows); } /** * 获取hive的表注释 * @param result 结果 * @param resultMaps show tables结果 * @return List<TableMetaInfo> */ private List<TableMetaInfo> getHiveTableMetaInfos(List<TableMetaInfo> result, List<LinkedHashMap<String, Object>> resultMaps) { if (dbDialect.equals(DbDialectEnum.HIVE.getCode())){ for (LinkedHashMap<String, Object> resultMap : resultMaps) { final String tabName = String.valueOf(resultMap.get("tab_name")); 
final String descTableCommentSql = String.format(SELECT_TABLES_2_HIVE, tabName); List<LinkedHashMap<String, Object>> resultMapsComments = querySql(descTableCommentSql);// col_name -> Detailed Table Information String comments = resultMapsComments.stream() .filter(m -> "Detailed Table Information".equals(m.get("col_name"))) .map(m -> String.valueOf(m.get("data_type"))).findFirst() .orElse(""); comments = ReUtil.get("parameters:\{(?!.*?\().*transient_lastDdlTime.*?comment=(.*?)\}", comments,1); if (StringUtils.isBlank(comments)) { comments = ""; } if (comments.contains(",")){ comments = comments.substring(0, comments.lastIndexOf(",")); } result.add(new TableMetaInfo(tabName, comments)); log.info("===========>>>获取表{}的注释成功:{}", tabName, comments); resultMapsComments.clear(); } return result; } return null; } /** * 执行SQL查询 * @param sql sql语句 * @return 数据列表,使用LinkedHashMap是为了防止HashMap序列化后导致顺序乱序 */ public List<LinkedHashMap<String, Object>> querySql(String sql){ // 执行sql Statement statement = null; ResultSet resultSet = null; try { statement = connection.createStatement(); resultSet = statement.executeQuery(sql); return buildListMap(resultSet); } catch (SQLException e) { e.printStackTrace(); throw new BizException(ResultCode.SQL_EXEC_ERR); }finally { // 关闭 close(resultSet, statement); } } /** * 关闭对象 传入多个时注意顺序, 需要先关闭哪个就传在参数前面 * @param objs 对象动态数组 */ public static void close(Object ...objs){ if (objs == null || objs.length == 0){ return; } for (Object obj : objs) { if (obj instanceof Statement){ try { ((Statement) obj).close(); }catch (Exception e){ e.printStackTrace(); } } if (obj instanceof ResultSet){ try { ((ResultSet) obj).close(); }catch (Exception e){ e.printStackTrace(); } } if (obj instanceof Connection){ try { ((Connection) obj).close(); }catch (Exception e){ e.printStackTrace(); } } } } /** * @Description 功能描述:将resultSet构造为List<Map> * @Author itdl * @Date 2022/4/18 21:13 * @Param {@link ResultSet} resultSet * @Return {@link List < Map <String,Object>>} **/ 
private List<LinkedHashMap<String, Object>> buildListMap(ResultSet resultSet) throws SQLException { if (resultSet == null) { return Lists.newArrayList(); } List<LinkedHashMap<String, Object>> resultList = new ArrayList<>(); // 获取元数据 ResultSetMetaData metaData = resultSet.getMetaData(); while (resultSet.next()) { // 获取列数 int columnCount = metaData.getColumnCount(); LinkedHashMap<String, Object> map = new LinkedHashMap<>(); for (int i = 0; i < columnCount; i++) { String columnName = metaData.getColumnName(i + 1); // 过滤掉查询的结果包含序号的 if("mm.row_num_01".equalsIgnoreCase(columnName) || "row_num_01".equalsIgnoreCase(columnName)){ continue; } // 去除hive查询结果的mm.别名前缀 if (columnName.startsWith("mm.")){ columnName = columnName.substring(columnName.indexOf(".") + 1); } Object object = resultSet.getObject(columnName); // maxCompute里面的空返回的是使用n if ("\N".equalsIgnoreCase(String.valueOf(object))) { map.put(columnName, ""); } else { map.put(columnName, object); } } resultList.add(map); } return resultList; }}
MaxCompute JDBC连接池封装
MaxCompute 已经提供了JDBC连接方式,也就是 odps-jdbc,最终能够获取一个Connection。官方文档:
https://help.aliyun.com/document_detail/161246.html
封装MaxCompute JDBC连接参数
/**
 * Connection parameters for MaxCompute over JDBC.
 *
 * @author itdl
 * @date 2022/08/08 10:07
 */
@Data
public class MaxComputeJdbcConnParam extends BaseJdbcConnParam {

    /** Alibaba Cloud access id; plays the role of the user name. */
    private String aliyunAccessId;

    /** Alibaba Cloud access key; plays the role of the password. */
    private String aliyunAccessKey;

    /** MaxCompute service endpoint. */
    private String endpoint;

    /** MaxCompute project name. */
    private String projectName;
}
封装MaxCompute JDBC连接实现类
就是实现父类AbstractConnUtil,实现抽象方法buildConnection
/** * @Description maxCompute JDBC连接实现 * @Author itdl * @Date 2022/08/08 14:26 */@Slf4jpublic class MaxComputeJdbcUtil extends AbstractConnUtil<MaxComputeJdbcConnParam>{ /**JDBC 驱动名称*/ private static final String DRIVER_NAME = "com.aliyun.odps.jdbc.OdpsDriver"; /** * 构造函数, 构造工具类对象 * * @param connParam 连接参数 */ public MaxComputeJdbcUtil(MaxComputeJdbcConnParam connParam) { super(connParam); } @Override protected Connection buildConnection() { return buildConn(); } /** * 创建连接 * @return 数据库连接 */ private Connection buildConn() { try { Class.forName(DRIVER_NAME); } catch (ClassNotFoundException e) { e.printStackTrace(); throw new BizException(ResultCode.MAX_COMPUTE_DRIVE_LOAD_ERR); } try { Properties dbProperties = new Properties(); dbProperties.put("user", connParam.getAliyunAccessId()); dbProperties.put("password", connParam.getAliyunAccessKey()); dbProperties.put("remarks", "true"); // JDBCURL连接模板 String jdbcUrlTemplate = "jdbc:odps:%s?project=%s&useProjectTimeZone=true"; // 使用驱动管理器连接获取连接 return DriverManager.getConnection( String.format(jdbcUrlTemplate, connParam.getEndpoint(), connParam.getProjectName()), dbProperties); } catch (SQLException e) { e.printStackTrace(); throw new BizException(ResultCode.CONN_USER_PWD_ERR); } }}
连接测试代码一起放在结尾,将会开启多个线程获取连接,然后去获取表名,表注释,字段名,字段注释,传入page, size和普通sql就可以实现分页查询的封装方法
Hive JDBC连接池封装
Hive JDBC连接参数
Hive连接参数封装,除了基础的JDBC所需字段,还需要kerberos相关字段,因为hive开启kerberos认证后,需要使用keytab密钥文件和krb5.conf配置文件去认证。这一点将会在参数和测试代码中得到充分的体现。
/**
 * Connection parameters for Hive over JDBC, including the optional Kerberos settings.
 *
 * @Author itdl
 * @Date 2022/08/10 16:40
 */
@Data
@EqualsAndHashCode(callSuper = false)
public class HiveJdbcConnParam extends BaseJdbcConnParam {

    /** Whether Kerberos authentication is enabled. */
    private boolean enableKerberos;

    /** Kerberos principal of the Hive service. */
    private String principal;

    /** Path of the krb5 configuration file on disk. */
    private String kbr5FilePath;

    /** Path of the keytab file on disk. */
    private String keytabFilePath;
}
Hive JDBC获取连接实现
Hive获取JDBC连接之后,本来可以从Connection的元数据中获取表的注释,但是获取的中文注释居然是乱码,但是我们Hue上查看表注释又是正常,暂时没找到这种方式如何解决,从而退而求其次,通过表名去获取建表语句,从建表语句中通过正则表达式提取表的注释。
/** * @Description hive connection util * @Author itdl * @Date 2022/08/10 16:52 */@Slf4jpublic class HiveConnUtil extends AbstractConnUtil<HiveJdbcConnParam>{ public HiveConnUtil(HiveJdbcConnParam connParam) { super(connParam); } /** * 获取连接 * @return 连接 */ public Connection getConnection() { return connection; } @Override protected Connection buildConnection(){ try {// Class.forName("org.Apache.hive.jdbc.HiveDriver"); Class.forName(connParam.getDriverName()); } catch (ClassNotFoundException e) { e.printStackTrace(); throw new BizException(ResultCode.HIVE_DRIVE_LOAD_ERR); } // 开启kerberos后需要私钥 // 拼接jdbcUrl String jdbcUrl = "jdbc:hive2://%s:%s/%s"; String ip = connParam.getIp(); String port = connParam.getPort() + ""; String dbName = connParam.getDbName(); final String username = connParam.getUsername(); final String password = connParam.getPassword(); // is enable kerberos authentication final boolean enableKerberos = connParam.isEnableKerberos(); // 格式化 Connection connection; // 获取连接 try { Properties dbProperties = new Properties(); dbProperties.put("user", username); dbProperties.put("password", password); // 加上remark后, 能够获取到标注释 但是会出现中文乱码 dbProperties.put("remarks", "true"); if (!enableKerberos) { jdbcUrl = String.format(jdbcUrl, ip, port, dbName); connection = DriverManager.getConnection(jdbcUrl, dbProperties); } else { final String principal = connParam.getPrincipal(); final String kbr5FilePath = connParam.getKbr5FilePath(); final String secretFilePath = connParam.getKeytabFilePath(); String format = "jdbc:hive2://%s:%s/%s;principal=%s"; jdbcUrl = String.format(format, ip, port, dbName, principal); // 使用hadoop安全认证 System.setProperty("JAVA.security.krb5.conf", kbr5FilePath); System.setProperty("javax.security.auth.useSubjectCredsOnly", "false"); // 解决windows中执行可能出现找不到HADOOP_HOME或hadoop.home.dir问题 // Kerberos认证 org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration(); conf.set("hadoop.security.authentication", "Kerberos"); 
conf.set("keytab.file", secretFilePath); conf.set("kerberos.principal", principal); UserGroupInformation.setConfiguration(conf); try { UserGroupInformation.loginUserFromKeytab(username, secretFilePath); } catch (IOException e) { e.printStackTrace(); throw new BizException(ResultCode.KERBEROS_AUTH_FAIL_ERR); } try { connection = DriverManager.getConnection(jdbcUrl, dbProperties); } catch (SQLException e) { e.printStackTrace(); throw new BizException(ResultCode.KERBEROS_AUTH_SUCCESS_GET_CONN_FAIL_ERR); } } log.info("=====>>>获取hive连接成功:username:{},jdbcUrl: {}", username, jdbcUrl); return connection; } catch (SQLException e) { e.printStackTrace(); throw new BizException(ResultCode.HIVE_CONN_USER_PWD_ERR); } catch (BizException e){ throw e; } catch (Exception e) { e.printStackTrace(); throw new BizException(ResultCode.HIVE_CONN_ERR); } }}
Oracle JDBC连接参数封装
只需要继承父类即可
/**
 * @Description JDBC connection parameters for Oracle. Declares no fields of its own;
 *              everything is inherited from {@link BaseJdbcConnParam}.
 * @Author itdl
 * @Date 2022/08/15 09:50
 */
public class OracleJdbcConnParam extends BaseJdbcConnParam{ }
Oracle JDBC连接实现类
包括了普通用户的认证和dba用户的认证
/** * @Description Oracle获取jdbc连接工具类 * @Author itdl * @Date 2022/08/15 09:52 */@Slf4jpublic class OracleConnUtil extends AbstractConnUtil<OracleJdbcConnParam> { /** * 构造函数, 构造工具类对象 * * @param connParam 连接参数 */ public OracleConnUtil(OracleJdbcConnParam connParam) { super(connParam); } @Override protected Connection buildConnection() { try { Class.forName("oracle.jdbc.driver.OracleDriver"); } catch (ClassNotFoundException e) { e.printStackTrace(); throw new BizException(ResultCode.ORACLE_DRIVE_LOAD_ERR); } // 拼接jdbcUrl String jdbcUrl = "jdbc:oracle:thin:@//%s:%s/%s"; final String ip = connParam.getIp(); final String port = connParam.getPort() + ""; final String dbName = connParam.getDbName(); final String username = connParam.getUsername(); final String password = connParam.getPassword(); // 格式化 jdbcUrl = String.format(jdbcUrl, ip, port, dbName); // 获取连接 Connection connection; try { Properties dbProperties = new Properties(); // 用户名 如果是dba,则后面跟了as sysdba String dba = "as sysdba"; dbProperties.put("password", password); dbProperties.put("remarks", "true"); if (username.trim().endsWith(dba)) { dbProperties.put("user", username.trim().substring(0, username.trim().indexOf(dba) - 1)); dbProperties.put("defaultRowPrefetch", "15"); dbProperties.put("internal_logon", "sysdba"); connection = DriverManager.getConnection(jdbcUrl, dbProperties); } else { dbProperties.put("user", username); connection = DriverManager.getConnection(jdbcUrl, dbProperties); } log.info("=====>>>获取oracle连接成功:username:{}, jdbcUrl: {}", username, jdbcUrl); return connection; } catch (SQLException e) { e.printStackTrace(); if (e.getMessage().contains("TNS:listener")) { throw new BizException(ResultCode.CONN_LISTENER_UNKNOWN_ERR); } if (e.getMessage().contains("ORA-01017")) { throw new BizException(ResultCode.CONN_USER_PWD_ERR); } if (e.getMessage().contains("IO 错误: Got minus one from a read call")) { throw new BizException(ResultCode.CONN_CONN_TOO_MANY_ERR); } throw new 
BizException(ResultCode.CONN_UNKNOWN_ERR); } catch (Exception e) { throw new BizException(ResultCode.CONN_UNKNOWN_ERR); } }}
Mysql JDBC连接池封装
Mysql JDBC连接参数封装
只需要继承父类即可
/**
 * @Description JDBC connection parameters for MySQL. Declares no fields of its own;
 *              everything is inherited from {@link BaseJdbcConnParam}.
 * @Author itdl
 * @Date 2022/08/15 09:50
 */
public class MysqlJdbcConnParam extends BaseJdbcConnParam{ }
Mysql JDBC连接实现
需要注意的是连接的属性里面配置useInformationSchema=true,表示可以直接从Connection中获取表和字段的注释。
/** * @Description Mysql获取jdbc连接工具类 * @Author itdl * @Date 2022/08/15 09:52 */@Slf4jpublic class MysqlConnUtil extends AbstractConnUtil<MysqlJdbcConnParam> { /** * 构造函数, 构造工具类对象 * * @param connParam 连接参数 */ public MysqlConnUtil(MysqlJdbcConnParam connParam) { super(connParam); } @Override protected Connection buildConnection() { try { Class.forName("com.mysql.cj.jdbc.Driver"); } catch (ClassNotFoundException e) { e.printStackTrace(); throw new BizException(ResultCode.MYSQL_DRIVE_LOAD_ERR); } // 拼接jdbcUrl String jdbcUrl = "jdbc:mysql://%s:%s/%s?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=UTF-8"; final String ip = connParam.getIp(); final String port = connParam.getPort() + ""; final String dbName = connParam.getDbName(); final String username = connParam.getUsername(); final String password = connParam.getPassword(); // 格式化 jdbcUrl = String.format(jdbcUrl, ip, port, dbName); // 获取连接 try { Properties dbProperties = new Properties(); dbProperties.put("user", username); dbProperties.put("password", password); dbProperties.put("remarks", "true"); // 设置可以获取tables remarks信息 dbProperties.setProperty("useInformationSchema", "true"); Connection connection = DriverManager.getConnection(jdbcUrl,dbProperties); log.info("=====>>>获取mysql连接成功:username:{}, jdbcUrl: {}", username, jdbcUrl); return connection; } catch (SQLException e) { e.printStackTrace(); if (e.getMessage().contains("Unknown database")){ throw new BizException(ResultCode.CONN_UNKNOWN_DB_ERR); } throw new BizException(ResultCode.CONN_USER_PWD_ERR); } catch (Exception e) { throw new BizException(ResultCode.CONN_UNKNOWN_ERR); } }}
测试代码连接各自数据库
@SpringBootTest(classes = DbConnectionDemoApplication.class)@RunWith(value = SpringRunner.class)@Slf4jclass DbConnectionDemoApplicationTests { private DbConnPool<?> connPool = null; @Test public void testMysqlConn() throws InterruptedException { // 创建连接参数 final MysqlJdbcConnParam connParam = new MysqlJdbcConnParam(); final String ip = "localhost"; final Integer port = 3306; final String username = "root"; final String password = "root"; final String dbname = "test_db"; // 设置参数 connParam.setDriverName(Driver.class.getName()); connParam.setIp(ip); connParam.setPort(port); connParam.setUsername(username); connParam.setPassword(password); connParam.setDbName(dbname); // 创建连接池 connPool = new DbConnPool<>(connParam, 2); handler01(dbname, DbDialectEnum.MYSQL); new Thread(() -> handler01(dbname, DbDialectEnum.MYSQL)).start(); new Thread(() -> handler01(dbname, DbDialectEnum.MYSQL)).start(); Thread.sleep(60 * 1000); } @Test public void testOracleConn() throws InterruptedException { // 创建连接参数 final OracleJdbcConnParam connParam = new OracleJdbcConnParam(); final String ip = "你的Oracle的IP地址"; final Integer port = 1521; // 如果是admin账号 用户后面+ as sysdba final String username = "用户名"; final String password = "密码"; final String dbname = "实例/服务名"; // 设置参数 connParam.setDriverName(Driver.class.getName()); connParam.setIp(ip); connParam.setPort(port); connParam.setUsername(username); connParam.setPassword(password); connParam.setDbName(dbname); // 创建连接池 connPool = new DbConnPool<>(connParam, 2); final DbDialectEnum dbDialectEnum = DbDialectEnum.ORACLE; // 处理操作(oracle的schemaName就是用户名) handler01(username, dbDialectEnum); // 新建两个线程获取连接 new Thread(() -> handler01(username, dbDialectEnum)).start(); new Thread(() -> handler01(username, dbDialectEnum)).start(); Thread.sleep(60 * 1000); } @Test public void testHiveConn() throws InterruptedException { // 创建连接参数 final HiveJdbcConnParam connParam = new HiveJdbcConnParam(); final String ip = "连接的域名"; final Integer port = 10000; // 如果是admin账号 用户后面+ 
as sysdba final String username = "账号@域名"; final String password = ""; final String dbname = "数据库名"; final String principal = "hive/_HOST@域名"; final String kbr5FilePath = "C:\workspace\krb5.conf"; final String keytabFilePath = "C:\workspace\zhouyu.keytab"; // 设置参数 connParam.setDriverName(Driver.class.getName()); connParam.setIp(ip); connParam.setPort(port); connParam.setUsername(username); connParam.setPassword(password); connParam.setDbName(dbname); connParam.setEnableKerberos(true); connParam.setPrincipal(principal); connParam.setKbr5FilePath(kbr5FilePath); connParam.setKeytabFilePath(keytabFilePath); // 创建连接池 connPool = new DbConnPool<>(connParam, 2); final DbDialectEnum dbDialectEnum = DbDialectEnum.HIVE; // 处理操作(oracle的schemaName就是用户名) handler01(username, dbDialectEnum); // 新建两个线程获取连接 new Thread(() -> handler01(username, dbDialectEnum)).start(); new Thread(() -> handler01(username, dbDialectEnum)).start(); Thread.sleep(10 * 60 * 1000); } @Test public void testMaxComputeConn() throws InterruptedException { // 创建连接参数 final MaxComputeJdbcConnParam connParam = new MaxComputeJdbcConnParam(); String accessId = "你的阿里云accessId"; String accessKey = "你的阿里云accessKey"; String endpoint = "http://service.cn-chengdu.maxcompute.aliyun.com/api"; String projectName = "项目名=数据库名"; // 设置参数 connParam.setDriverName(Driver.class.getName()); connParam.setAliyunAccessId(accessId); connParam.setAliyunAccessKey(accessKey); connParam.setEndpoint(endpoint); connParam.setProjectName(projectName); // 创建连接池 connPool = new DbConnPool<>(connParam, 2); final DbDialectEnum dbDialectEnum = DbDialectEnum.MAX_COMPUTE; // 处理操作(oracle的schemaName就是用户名) handler01(projectName, dbDialectEnum); // 新建两个线程获取连接 new Thread(() -> handler01(projectName, dbDialectEnum)).start(); new Thread(() -> handler01(projectName, dbDialectEnum)).start(); Thread.sleep(60 * 1000); } private void handler01(String schemaName, DbDialectEnum dbDialectEnum) { final Connection connection = connPool.getConnection(); // 构建工具类 final 
SqlUtil sqlUtil = new SqlUtil(connection, dbDialectEnum.getCode()); // 获取表和注释 final List<TableMetaInfo> tables = sqlUtil.getTables(schemaName); log.info("===============获取所有表和注释开始==================="); log.info(tables.toString()); log.info("===============获取所有表和注释结束==================="); // 获取字段和注释 final String tableName = tables.get(0).getTableName(); final List<TableColumnMetaInfo> columns = sqlUtil.getColumnsByTableName(tableName); log.info("===============获取第一个表的字段和注释开始==================="); log.info(columns.toString()); log.info("===============获取第一个表的字段和注释结束==================="); final PageResult<LinkedHashMap<String, Object>> pageResult = sqlUtil.pageQueryMap("select * from " + tableName, 1, 10); log.info("===============SQL分页查询开始==================="); log.info("总数:{}", pageResult.getTotal()); log.info("记录数:{}", JSONObject.toJSONString(pageResult.getRows())); log.info("===============SQL分页查询结束==================="); connPool.freeConnection(connection); } @After public void close(){ if (connPool != null){ connPool.close(); log.info("==================连接池成功关闭================"); } }}
以上封装主要是为了方便整合第三方数据源,在做数据源管理时比较重要(若有所需,私信“封装数据源源码”获取源码)。