我的需求是把FTP上的文件下载下来并入库:大约有两万多个文本文件,共约36G,涉及六张表,总计约2亿条数据。
过程中遇到的错误
Connection closed without indication. 错误1
421 Could not create socket . 错误2
原因:应该是下载的文件数量较多,下载到一定数量以后连接变得不稳定,这时需要重新连接一次FTP。
ftpClient.listFiles反应很慢
原因是默认缓冲区大小只有1024字节(也就是1K),所以很慢。在调用上传/下载API之前修改一下默认设置即可,例如将缓冲区改为10M:
ftpClient.setBufferSize(1024 * 1024 * 10)
FTP断点续传代码(公司的FTP目前不支持多线程下载文件;如果服务器开启了相应设置,可以改为多线程下载)
package sample.util.xxzf.sftp;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.net.ftp.FTP;
import org.apache.commons.net.ftp.FTPClient;
import org.apache.commons.net.ftp.FTPFile;
import org.apache.commons.net.ftp.FTPReply;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.SocketTimeoutException;
import java.util.Properties;
@Slf4j
public class FTPUtil {
/**
 * FTP server host, read from the "ftp_hostname" property by the static initializer.
 **/
private static String ftpHost;
/**
 * FTP server port, read from the "ftp_port" property; stays 0 when the property is missing or empty.
 **/
private static int ftpPort;
/**
 * FTP user name, read from the "ftp_username" property.
 **/
private static String ftpUsername;
/**
 * FTP password, read from the "ftp_password" property.
 **/
private static String ftpPassword;
/**
 * Charset name ISO-8859-1; the original note says the FTP protocol prescribes it for file names.
 * Not referenced by any method of this class.
 **/
private static String serverCharset = "ISO-8859-1";
/**
 * Name of the UTF-8 charset. Not referenced by any method of this class.
 **/
private static final String CHARSET_UTF8 = "UTF-8";
/**
 * Buffer size of 4 MiB. Not referenced by any method of this class.
 **/
private static final int BUFFER_SIZE = 1024 * 1024 * 4;
/**
 * Time budget in milliseconds that downloadDir compares against the elapsed download time of a file.
 * NOTE(review): 5 * 600 * 1000 is 3,000,000 ms, i.e. 50 minutes. That matches neither the constant's
 * name (ten minutes) nor the original comment (five minutes) - confirm the intended limit.
 */
private static final long TEN_MINUTE = 5 * 600 * 1000L;
// Instance-level client handle; the static methods of this class work on their own local clients
// and never read or write this field.
public FTPClient ftpClient = null;
/**
* 加载ftp配置信息
*/
static {
try {
Properties properties = new Properties();
InputStream in = FTPUtil.class.getClassLoader().getResourceAsStream( "db.properties" );
properties.load( in );
ftpHost = properties.getProperty( "ftp_hostname" );
String port = properties.getProperty( "ftp_port" );
if (port != null && !"".equals( port )) {
ftpPort = Integer.valueOf( port );
}
ftpUsername = properties.getProperty( "ftp_username" );
ftpPassword = properties.getProperty( "ftp_password" );
in.close();
} catch (Exception e) {
e.printStackTrace();
}
}
/**
 * Downloads the files of one FTP directory into a local directory.
 *
 * @param ftpPath   remote directory, e.g. /usr/local/tmp
 * @param localPath local target directory, e.g. F:/bak/v1; created when it does not exist
 * @return true only when the directory was downloaded completely; false when the local
 *         directory cannot be created, the connection fails or any file fails to download
 */
public static boolean downloadFolder(String ftpPath, String localPath) {
    File savePathFile = new File( localPath );
    if (!savePathFile.isDirectory() && !savePathFile.mkdirs()) {
        log.error( "ftp download fail: cannot create local directory " + localPath );
        return false;
    }
    FTPClient ftpClient = null;
    try {
        ftpClient = buildClient();
        // Propagate the real outcome: the result of downloadDir used to be ignored, so callers
        // were told the download succeeded even when it had been aborted half way.
        return downloadDir( ftpClient, ftpPath, localPath );
    } catch (Exception e) {
        log.error( "ftp download fail:" + e.getMessage(), e );
        return false;
    } finally {
        close( ftpClient );
    }
}
/**
 * Creates an FTP client that is connected, logged in and configured for binary,
 * passive-mode transfers.
 *
 * @return a ready-to-use client
 * @throws Exception when the server cannot be reached, refuses the connection or rejects
 *                   the login; the half-open connection is closed before the exception
 *                   is propagated, so callers never receive an unusable client
 */
private static FTPClient buildClient() throws Exception {
    FTPClient ftpClient = new FTPClient();
    // Control encoding and connect timeout only take effect when set before connect()
    ftpClient.setControlEncoding( "GBK" );
    // 60 * 10000 ms = 10 minutes
    ftpClient.setConnectTimeout( 60 * 10000 );
    try {
        if (ftpPort > 0) {
            ftpClient.connect( ftpHost, ftpPort );
        } else {
            // No port configured: let the client use the default FTP port
            ftpClient.connect( ftpHost );
        }
        if (!FTPReply.isPositiveCompletion( ftpClient.getReplyCode() )) {
            throw new IOException( "FTP server refused connection: " + ftpClient.getReplyString() );
        }
        if (!ftpClient.login( ftpUsername, ftpPassword )) {
            log.info( "用户名或密码错误,登录FTP服务器失败:" + ftpHost + ":" + ftpPort );
            throw new IOException( "FTP login rejected: " + ftpClient.getReplyString() );
        }
        log.info( "登录FTP服务器成功:" + ftpHost + ":" + ftpPort );
        ftpClient.setFileType( FTP.BINARY_FILE_TYPE );
        // Passive mode: the client opens the data connection, which avoids transfers that hang
        ftpClient.enterLocalPassiveMode();
        // Do not require the data connection to come from the same host as the control connection
        ftpClient.setRemoteVerificationEnabled( false );
        // 2 * 60 * 10000 ms = 20 minutes
        ftpClient.setDataTimeout( 2 * 60 * 10000 );
        // 10240 * 10240 bytes = 100 MiB
        ftpClient.setReceiveBufferSize( 10240 * 10240 );
        ftpClient.setBufferSize( 10240 * 10240 );
        return ftpClient;
    } catch (Exception e) {
        log.error( "无法登录FTP服务器:" + ftpHost + ":" + ftpPort, e );
        if (ftpClient.isConnected()) {
            try {
                ftpClient.disconnect();
            } catch (IOException ignored) {
                // Best effort: the original failure below is the one worth reporting
                log.debug( "关闭连接错误", ignored );
            }
        }
        throw e;
    }
}
/**
 * Logs out of and disconnects an FTP client. Does nothing for null or already
 * disconnected clients.
 *
 * @param ftpClient the client to close, may be null
 */
private static void close(FTPClient ftpClient) {
    if (ftpClient == null || !ftpClient.isConnected()) {
        return;
    }
    try {
        ftpClient.logout();
    } catch (IOException e) {
        log.error( "ftp logout fail:" + e.getMessage(), e );
    } finally {
        // Always release the socket, even when the logout command failed
        try {
            ftpClient.disconnect();
        } catch (IOException e) {
            log.error( "ftp disconnect fail:" + e.getMessage(), e );
        }
    }
}
/**
 * Downloads every entry of one FTP directory into a local directory, resuming files
 * that were only partially downloaded by an earlier run.
 *
 * After every 1000 listed entries a fresh connection is opened, because the server
 * starts failing ("421 ..." / "Connection closed without indication") once many files
 * have been transferred over one connection.
 *
 * @param ftpClient connected client; it stays owned by the caller and is not closed here
 *                  unless a transfer failure forces a logout
 * @param pathname  remote directory
 * @param localpath local target directory
 * @return true when all entries are present locally, false as soon as one of them fails
 */
public static boolean downloadDir(FTPClient ftpClient, String pathname, String localpath) {
    // Client opened by this method on reconnect. The caller only knows about (and closes) the
    // client it passed in, so replacements have to be released here.
    FTPClient reconnected = null;
    int count = 0;
    try {
        // Without this check a missing directory would silently download the login directory instead
        if (!ftpClient.changeWorkingDirectory( pathname )) {
            log.error( "cannot change to FTP directory {}: {}", pathname, ftpClient.getReplyString() );
            return false;
        }
        FTPFile[] ftpFiles = ftpClient.listFiles();
        for (FTPFile file : ftpFiles) {
            if (".".equals( file.getName() ) || "..".equals( file.getName() )) {
                continue;
            }
            if (count == 1000) {
                FTPClient fresh = buildClient();
                if (reconnected != null) {
                    disconnectQuietly( reconnected );
                }
                reconnected = fresh;
                ftpClient = fresh;
                if (!ftpClient.changeWorkingDirectory( pathname )) {
                    log.error( "cannot change to FTP directory {} after reconnect: {}", pathname, ftpClient.getReplyString() );
                    return false;
                }
                count = 0;
            }
            count++;
            if (!downloadFile( ftpClient, file, localpath )) {
                return false;
            }
        }
        return true;
    } catch (Exception e) {
        log.error( "下载文件错误:{}", pathname, e );
        return false;
    } finally {
        if (reconnected != null) {
            disconnectQuietly( reconnected );
        }
    }
}

/**
 * Downloads (or resumes) a single remote file.
 *
 * @return true when the file is complete locally and the next entry can be processed,
 *         false when the whole directory download has to stop
 */
private static boolean downloadFile(FTPClient ftpClient, FTPFile file, String localpath) {
    long size = file.getSize();
    File localFile = new File( localpath + "/" + file.getName() );
    long existing = localFile.length();
    if (existing == size) {
        log.info( "文件{}已下载完成", localFile.getName() );
        return true;
    }
    // Resume only a non-empty partial file; anything else is recreated from scratch
    boolean resume = localFile.isFile() && existing > 0 && existing < size;
    if (!resume && !checkFile( localFile )) {
        return false;
    }
    long localSize = resume ? existing : 0L;
    log.info( "正在下载文件:{},总大小:{}", file.getName(), size );
    long start = System.currentTimeMillis();
    if (localSize > size) {
        // Only reachable when the server reports a negative (unknown) size
        log.error( "本地文件大于服务器文件,终止下载" );
        checkFile( localFile );
        return false;
    }
    try (FileOutputStream os = new FileOutputStream( localFile, resume )) {
        if (localSize > 0) {
            ftpClient.setRestartOffset( localSize );
        }
        InputStream in = ftpClient.retrieveFileStream( file.getName() );
        if (in == null) {
            log.info( "连接异常,将退出登录" );
            disconnectQuietly( ftpClient );
            log.info( "退出登录成功" );
            return false;
        }
        boolean withinBudget;
        try {
            withinBudget = copyWithProgress( in, os, size, localSize, start );
        } finally {
            try {
                in.close();
            } catch (IOException e) {
                log.debug( "关闭流错误", e );
            }
        }
        if (!withinBudget) {
            log.info( "时间已到,未下载部分将在下次任务中下载" );
            disconnectQuietly( ftpClient );
            return false;
        }
        // Must be called exactly once after the data stream was read and closed; a negative
        // answer means the server did not confirm the transfer.
        if (!ftpClient.completePendingCommand()) {
            log.error( "server did not confirm the transfer of {}: {}", file.getName(), ftpClient.getReplyString() );
            return false;
        }
        log.info( "文件下载完成到:{}", localpath + "/" + file.getName() );
        return true;
    } catch (SocketTimeoutException e) {
        log.info( "下载出错:", e );
        return false;
    } catch (Exception e) {
        log.error( "下载出错:", e );
        return false;
    }
}

/**
 * Copies the data stream to the local file and logs the progress of large files.
 *
 * @return false when the time budget ran out before the stream was fully read
 */
private static boolean copyWithProgress(InputStream in, FileOutputStream os, long size, long localSize, long start)
        throws IOException {
    byte[] buffer = new byte[1024 * 32];
    long step = size / 100;
    long process = 0L;
    int read;
    while ((read = in.read( buffer )) != -1) {
        os.write( buffer, 0, read );
        localSize += read;
        long nowProcess = step != 0 ? localSize / step : 0;
        // Progress is only reported for files larger than 50,000,000 bytes; the time budget
        // is checked at the same moments, i.e. whenever the percentage advances
        if (size > 50000000 && nowProcess > process) {
            process = nowProcess;
            log.info( "{}%", process );
            if (System.currentTimeMillis() - start > TEN_MINUTE) {
                return false;
            }
        }
    }
    return true;
}

/** Logs out and disconnects, ignoring (debug-logging) any failure of either step. */
private static void disconnectQuietly(FTPClient client) {
    try {
        client.logout();
    } catch (Exception e) {
        log.debug( "关闭连接错误", e );
    }
    try {
        client.disconnect();
    } catch (Exception e) {
        log.debug( "关闭连接错误", e );
    }
}
/**
 * Resets a local download target: removes whatever exists at the path, makes sure the
 * parent directory exists and creates an empty file.
 *
 * @param localFile the file to (re)create
 * @return true when a regular file exists at the path afterwards, false otherwise
 */
private static boolean checkFile(File localFile) {
    if (localFile.exists() && !localFile.delete()) {
        log.error( "cannot delete existing file {}", localFile );
    }
    // Create the parent directories only. The previous code called mkdirs() on the file path
    // itself, which created a directory named like the file that then had to be deleted again.
    File parent = localFile.getParentFile();
    if (parent != null && !parent.isDirectory() && !parent.mkdirs()) {
        log.error( "cannot create directory {}", parent );
        return false;
    }
    try {
        localFile.createNewFile();
        return localFile.isFile();
    } catch (IOException e) {
        log.error( "创建文件失败", e );
        return false;
    }
}
}
对下载好的数据进行入库操作(service层)
package sample.service.yssjcj.ky;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Service;
import sample.service.yssjcj.ky.kp.ImportFileThread;
import sample.util.file.FilesUtil;
import sample.util.xxzf.sftp.FTPUtil;
import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@Slf4j
@Service
public class Kpbcs4Service {
@Value("${threads.size}")
private int threadsSie;
@Value("${threads.batch-size}")
private int batchSize;
public void ftpFile(String ftpPath, String localPath) {
//FTP下载,ftp目前应该设置的是不支持多线程下载,需要开启设置,
boolean b = FTPUtil.downloadFolder( ftpPath, localPath );
if (!b) {
return;
}
List<File> fileList = FilesUtil.getFileList( localPath );
if (fileList.size() > 0) {
//创建多线程
List<ListenableFuture<Integer>> futures = new ArrayList();
ExecutorService pool = Executors.newFixedThreadPool( threadsSie );
ListeningExecutorService executorService = MoreExecutors.listeningDecorator( pool );
for (File file : fileList) {
//判断是否是文件
if (!file.isFile()) {
continue;
}
// 创建线程
ImportFileThread importFileThread = new ImportFileThread( file.getPath(), batchSize );
// 添加到队列
futures.add( executorService.submit( importFileThread ) );
}
final ListenableFuture<List<Integer>> resultFuture = Futures.successfulAsList( futures );
try {
// 获取所有线程的执行结果,如果还未执行完毕,则处于等待状态,直至所有线程执行完毕
resultFuture.get();
} catch (Exception e) {
e.printStackTrace();
} finally {
if (pool != null && !pool.isShutdown()) {
pool.shutdown();
}
}
}
}
}
线程方法层
package sample.service.yssjcj.ky.kp;
import lombok.extern.slf4j.Slf4j;
import sample.util.file.FilesUtil;
import java.util.concurrent.Callable;
/**
 * Import task for a single file; meant to be submitted to an executor.
 */
@Slf4j
public class ImportFileThread implements Callable<Integer> {
    /** Full path of the file this task imports. */
    private String filePath;
    /** Batch size handed on to the file reader. */
    private int batchSize;

    /**
     * @param filePath  full path of the file to import
     * @param batchSize batch size handed on to the file reader
     */
    public ImportFileThread(String filePath, int batchSize) {
        this.batchSize = batchSize;
        this.filePath = filePath;
    }

    /**
     * Runs the import for the configured file.
     *
     * @return always 1
     */
    @Override
    public Integer call() {
        this.importfile( this.filePath, this.batchSize );
        return Integer.valueOf( 1 );
    }

    /**
     * Logs the file name and hands the file to {@code FilesUtil.readTxtFile}.
     *
     * @param filePath  full path of the file to import
     * @param batchSize batch size handed on to the file reader
     */
    public void importfile(String filePath, int batchSize) {
        log.info( "入库的文件名称{}", filePath );
        FilesUtil.readTxtFile( filePath, batchSize );
    }
}
dao层:使用多线程读取文件,每读一行就addBatch()一条,并按批量提交。如果把一个文件的所有数据一次性加载到内存中,会出现内存溢出问题。
package sample.util.file;
import lombok.extern.slf4j.Slf4j;
import org.apache.poi.ss.formula.functions.Count;
import sample.mapper.yssjcj.ky.KpYrDao;
import sample.mapper.yssjcj.ky.KpYsDao;
import sample.mapper.yssjcj.ky.dao.KpKcs4Dao;
import sample.mapper.yssjcj.ky.dao.KpLrDao;
import sample.mapper.yssjcj.ky.dao.KpLsDao;
import sample.util.StringUtils;
import sample.util.db.DBConnect;
import java.io.*;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;
@Slf4j
public class FilesUtil {
/**
 * Lists the direct children (files and directories alike) of a directory.
 * The listing is not recursive.
 *
 * @param strPath path of the directory to list
 * @return a new list with the children; empty when the path cannot be listed
 */
public static List<File> getFileList(String strPath) {
    List<File> children = new ArrayList<>();
    File[] listed = new File( strPath ).listFiles();
    if (listed == null) {
        return children;
    }
    for (File child : listed) {
        children.add( child );
    }
    return children;
}
/**
 * Imports one GBK-encoded text file into the table selected by its file name.
 *
 * The file is streamed line by line, so it is never held in memory as a whole. Of each
 * line only the part before the first ";" is used; it is split on "," and bound to the
 * insert statement by the DAO of the matching table. Rows are sent to the database in
 * batches of {@code batchSize}. Lines equal to "#" are skipped.
 *
 * The table is chosen by the first matching name fragment, in this order:
 * "LS", "LR", "kcs4", "YS", "YR". Files matching none of them are ignored.
 *
 * I/O and SQL errors are logged and end the import of this file; batches committed
 * before the error stay in the database.
 *
 * @param filePath  full path of the file to import
 * @param batchSize number of rows per executed batch
 */
public static void readTxtFile(String filePath, int batchSize) {
    String fileName = new File( filePath ).getName();
    DBConnect dbc = new DBConnect( "daas" );
    log.info( "插入数据的表名【" + fileName + "】" );
    // try-with-resources releases the connection and the reader on every path, including errors
    try (Connection conn = dbc.getConnection();
         BufferedReader reader = new BufferedReader( new InputStreamReader( new FileInputStream( filePath ), "GBK" ) )) {
        if (conn == null) {
            log.error( "no database connection available, file {} was not imported", fileName );
            return;
        }
        // The "LR" and the "YR" files are written with the same KP_LR column list
        final String kpLrColumns = "YPDSPRQ, YPDSPCDM, YPDSPCKH, TPZID, TPCDM, TPCKH, TPRQ, TPSJ, JZRQ, TPPH, SPRQ, FZID, FZHZM, FZSSJM, DZID, DZHZM, DZSSJM, JYZID, LC, SY1, SY2, SY3, LCTZ, ZXTZ, ZZZID, ZZZHZM, XB, PB, CC, ZCC, DDQYDM, CCRQ, TPYY, TSJJD1, TSJJDLC1, TSJJDPJ1, TSJJD2, TSJJDLC2, TSJJDPJ2, TSJJD3, TSJJDLC3, TSJJDPJ3, TSJJD4, TSJJDLC4, TSJJDPJ4, TSJJD5, TSJJDLC5, TSJJDPJ5, JCPJ, YZYJK, JJFDF, WPDPF, CZKTF, RPF, YHJ, YSPJ, YTPJ, TPSXF, JTK, TPRS, SGTPBZ, BZCCSJ, DZZFJYLSH, DPBZ, KDWLDH, KDFTK, KDQYDM, DBH, TBJE, JLJSBZ,WJM,YF";
        final String table;
        final String columns;
        final RowBinder binder;
        if (fileName.contains( "LS" )) {
            table = "KP_LS";
            columns = "CZID, SPCDM, CKH, JZRQ, PZTBZ, PJPH, SPRQ, FZID, FZHZM, FZSSJM, DZID, DZHZM, DZSSJM, JYZID, LC, SY1, SY2, SY3, LCTZ, KTTZ, ZZZID, ZZZHZM, XB, PB, CC, CCZZCC, DDQYDM, CCRQ, TSJJD1, TSJJDLC1, TSJJDPJ1, TSJJD2, TSJJDLC2, TSJJDPJ2, TSJJD3, TSJJDLC3, TSJJDPJ3, TSJJD4, TSJJDLC4, TSJJDPJ4, TSJJD5, TSJJDLC5, TSJJDPJ5, JCPJ, YZYJK, JJFDF, WPDPF, CZKTF, RPF, YHJ, PJHJ, ZZBCCCZCPJ, ZZYTPCC, ZZYTPCCRQ, ZZYTPCCZ, DZZFJYLSH, DZZFFPTKJYLSH, DPBZ, KPXSFWF, YDSPSXF, GSXPBS, GSYPDSPRQ, GSYPDSPCDM, GSYPDSPCKH, GSYPPH, KDWLDH, KDF, KDQYDM, GQCJ, GQTPF, GQJTK, BDH, SXJE, JLJSBZ, WJM, YF";
            binder = (fields, name, st) -> new KpLsDao().saveKpLs( fields, name, st );
        } else if (fileName.contains( "LR" )) {
            table = "KP_LR";
            columns = kpLrColumns;
            binder = (fields, name, st) -> new KpLrDao().saveKpLr( fields, name, st );
        } else if (fileName.contains( "kcs4" )) {
            // The file name is logged here; the previous code printed the reader object instead
            log.info( "线程名称" + Thread.currentThread().getName() + "文件名" + fileName );
            table = "KP_KCS4";
            columns = "JYBZ, JZRQ, ZCHZM, SPCDM, ZS, ZFZS, FSRS, GMRS, ZZRS, ZJE, KDJS, KDF, SBZFS, FBFS, SSFS, XSBF,WJM,YF";
            binder = (fields, name, st) -> new KpKcs4Dao().saveKpKcs4( fields, name, st );
        } else if (fileName.contains( "YS" )) {
            // NOTE(review): YF is listed both first and last in this column list - confirm against the table
            table = "KYPG_LS";
            columns = "YF, WJM, SPBZ, WJRQ, SFCC, CZID, SPCDM, CKH, JZRQ, PZTBZ, PH, SPRQ, FZID, FZ, FZJ, DZID, DZ, DZJ, JYZID, LC, SY1, SY2, SY3, LCTZ, KTTZ, ZZZID, ZZZ, XB, PB, CC, CCZCC, DDQYDM, CCRQ, TSJJ1, TSJJLC1, TSJJPJ1, TSJJ2, TSJJLC2, TSJJPJ2, TSJJ3, TSJJLC3, TSJJPJ3, TSJJ4, TSJJLC4, TSJJPJ4, TSJJ5, TSJJLC5, TSJJPJ5, JCPJ, YZYJK, JJFDF, WPDPF, CZKTF, RPF, YHJ, PJHJ, ZZBCPJ, YTPCC, YTPCCRQ, YTPCCZ, DZLSH, FPTKLSH, DPBZ, XSFWF, YDPSXF, GSXPBS, YPSPRQ, YPSPCDM, YPSPCKH, YPPH, KDWLDH, KDF, KDQYDM, GQCJ, GQTPF, GQTPK, BDH, SBJ, ZXBS, JCPJ_S, WPDPF7_S, WPDPF3_S, CZKTF_S, RPF_S, JCPJ_J, WPDPF7_J, WPDPF3_J, CZKTF_J, RPF_J, PJHJ_J, GQTPF_J, YL1, ZXPJ, KJPJPJ,YF";
            binder = (fields, name, st) -> new KpYsDao().saveKpYs( fields, name, st );
        } else if (fileName.contains( "YR" )) {
            // NOTE(review): "YR" files go into KP_LR, the same table as "LR" files - confirm this is intended
            table = "KP_LR";
            columns = kpLrColumns;
            binder = (fields, name, st) -> new KpYrDao().saveKpYr( fields, name, st );
        } else {
            // Not one of the known file types: nothing to import
            return;
        }
        int count;
        try (PreparedStatement ps = conn.prepareStatement( insertSql( table, columns ) )) {
            count = importRows( reader, conn, ps, binder, fileName, batchSize );
        }
        log.info( "【{}】文件插入数据库的条数是【{}】", fileName, count );
    } catch (IOException | SQLException e) {
        log.error( "failed to import file {}", fileName, e );
    }
}

/** Binds the fields of one parsed line to the parameters of the insert statement. */
private interface RowBinder {
    void bind(String[] fields, String fileName, PreparedStatement ps) throws SQLException, IOException;
}

/** Builds "INSERT INTO table (columns) VALUES(?,...)" with exactly one placeholder per listed column. */
private static String insertSql(String table, String columns) {
    int columnCount = columns.split( "," ).length;
    StringBuilder sql = new StringBuilder( "INSERT INTO " ).append( table )
            .append( " (" ).append( columns ).append( ") VALUES(" );
    for (int i = 0; i < columnCount; i++) {
        sql.append( i == 0 ? "?" : ",?" );
    }
    return sql.append( ")" ).toString();
}

/** Streams the reader into the statement, flushing every batchSize rows; returns the number of rows added. */
private static int importRows(BufferedReader reader, Connection conn, PreparedStatement ps,
                              RowBinder binder, String fileName, int batchSize)
        throws IOException, SQLException {
    int count = 0;
    String line;
    while ((line = reader.readLine()) != null) {
        // Same filter as before: skip "#" marker lines and lines that split into nothing
        // (e.g. a line of only commas). A completely empty line still passes, as a single
        // empty field.
        if (line.split( "," ).length == 0 || line.equals( "#" )) {
            continue;
        }
        String[] fields = StringUtils.substringBefore( line, ";" ).split( "," );
        binder.bind( fields, fileName, ps );
        ps.addBatch();
        count++;
        if (count % batchSize == 0) {
            flushBatch( ps, conn );
        }
    }
    if (count > 0) {
        flushBatch( ps, conn );
    }
    return count;
}

/**
 * Executes and commits the pending batch.
 * NOTE(review): auto-commit is never switched off on this connection, so whether the explicit
 * commit is needed depends on the pool's default - confirm.
 */
private static void flushBatch(PreparedStatement ps, Connection conn) throws SQLException {
    ps.executeBatch();
    conn.commit();
    ps.clearBatch();
}
/**
 * Counts the lines of a text file by skipping through it with a LineNumberReader.
 * The value is the reader's line number at end of file, i.e. the number of line
 * terminators read.
 *
 * @param filePath path of the file to count
 * @return the line count, or 0 when the file cannot be read
 */
public static int lineNumber(String filePath) {
    int lineNumber = 0;
    // try-with-resources closes both readers; they used to be left open
    try (LineNumberReader lineNumberReader = new LineNumberReader( new FileReader( new File( filePath ) ) )) {
        lineNumberReader.skip( Long.MAX_VALUE );
        lineNumber = lineNumberReader.getLineNumber();
    } catch (IOException e) {
        e.printStackTrace();
    }
    return lineNumber;
}
/**
 * Manual check: prints whether a sample file name contains the "kcs4" marker.
 */
public static void main(String[] args) {
    final String marker = "kcs4";
    final String sampleName = "kcs40201.00P";
    boolean matches = sampleName.contains( marker );
    System.out.println( matches );
}
}
数据库连接方法
package sample.util.db;
import com.alibaba.druid.pool.DruidDataSourceFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.sql.DataSource;
import java.io.InputStream;
import java.sql.*;
import java.util.Hashtable;
import java.util.Properties;
/**
* @Description
* @Author chenfei
* @Date 2024/3/20 17:47
*/
public class DBConnect { private Connection conn = null;
// Logger of this class.
static Logger logger = LoggerFactory.getLogger(DBConnect.class.getName());
// Data source resolved by the most recent initializeDataSource call; static, so shared by all instances.
private static DataSource dataSource;
// Settings loaded from db.properties by the static initializer.
public static Properties dbProps = null;
// Name of the default data source, read from the "defaultDb" property.
public static String dbName;
// Cache of data sources, keyed by data source name.
public static Hashtable<String, DataSource> hmDataSource = new Hashtable<String, DataSource>();
static {
// 项目执行使用
InputStream is = DBConnect.class.getClassLoader().getResourceAsStream("db.properties");
//本地执行使用
// File file = new File("src/main/resources/db.properties");
// InputStream is = new FileInputStream(file);
dbProps = new Properties();
try {
dbProps.load(is);
dbName = dbProps.getProperty("defaultDb");
} catch (Exception e) {
logger.error("不能读取属性文件. " + "请确保db.properties在CLASSPATH指定的路径中");
}
}
/**
 * Returns the data source resolved by the most recent DBConnect construction.
 *
 * @return the shared data source; null until an instance has resolved one
 */
public static DataSource getdataSource() {
    return DBConnect.dataSource;
}
/**
 * Creates an accessor for the named data source and obtains a connection from it.
 *
 * @param dbName name of the data source, used as key prefix in db.properties
 */
public DBConnect(String dbName) {
    this.initializeDataSource(dbName);
}
/**
 * Creates an accessor for the default data source, i.e. the one named by the
 * "defaultDb" property, and obtains a connection from it.
 */
public DBConnect() {
    this.initializeDataSource(DBConnect.dbName);
}
/**
 * Resolves the data source for {@code dbName}, creating and caching it on first use,
 * and obtains this instance's connection from it.
 *
 * A failure to obtain the connection is logged and leaves the connection null.
 *
 * @param dbName name of the data source, used as key prefix in db.properties
 * @throws IllegalStateException when no data source can be created for the name
 */
private void initializeDataSource(String dbName) {
    DataSource resolved;
    // Lookup and creation must be one atomic step: instances are created from several worker
    // threads at once, and without the lock each of them would build its own connection pool.
    synchronized (DBConnect.class) {
        resolved = dbName == null ? null : hmDataSource.get(dbName);
        if (resolved == null && dbName != null) {
            resolved = createDataSource(dbName);
            if (resolved != null) {
                // Hashtable rejects null values, so only successfully created data sources are cached
                hmDataSource.put(dbName, resolved);
            }
        }
        dataSource = resolved;
    }
    if (resolved == null) {
        throw new IllegalStateException("no data source available for '" + dbName + "'");
    }
    try {
        conn = resolved.getConnection();
    } catch (SQLException e) {
        logger.error("数据库连接失败:" + e.getMessage(), e);
    }
}

/** Creates the data source described by the "dbName.*" properties; returns null when that fails. */
private static DataSource createDataSource(String dbName) {
    String dbPool = dbProps.getProperty(dbName + ".dbPool");
    if ("weblogic".equalsIgnoreCase(dbPool)) {
        String weblogic_pool = dbProps.getProperty(dbName + ".weblogic.poolName");
        Hashtable<Object, Object> ht = new Hashtable<Object, Object>();
        ht.put(Context.INITIAL_CONTEXT_FACTORY, "weblogic.jndi.WLInitialContextFactory");
        try {
            Context ctx = new InitialContext(ht);
            return (DataSource) ctx.lookup(weblogic_pool);
        } catch (Exception e) {
            logger.error("数据库连接失败", e);
            return null;
        }
    }
    if ("druid".equalsIgnoreCase(dbPool)) {
        try {
            Hashtable<String, String> map = new Hashtable<String, String>();
            map.put("driverClassName", dbProps.getProperty(dbName + ".driverClassName"));
            map.put("url", dbProps.getProperty(dbName + ".url"));
            map.put("username", dbProps.getProperty(dbName + ".username"));
            map.put("password", dbProps.getProperty(dbName + ".password"));
            map.put("initialSize", dbProps.getProperty(dbName + ".initialSize"));
            map.put("maxActive", dbProps.getProperty(dbName + ".maxActive"));
            map.put("timeBetweenEvictionRunsMillis", "60000");
            map.put("minEvictableIdleTimeMillis", "300000");
            if (map.get("driverClassName").contains("oracle")) {
                map.put("validationQuery", "SELECT 1 FROM DUAL");
                map.put("testWhileIdle", "true");
                map.put("testOnBorrow", "false");
                map.put("testOnReturn", "false");
            }
            map.put("removeAbandoned", "true");
            map.put("removeAbandonedTimeout", "1800");
            map.put("logAbandoned", "true");
            return DruidDataSourceFactory.createDataSource(map);
        } catch (Exception e) {
            // Also reached when one of the properties above is missing (Hashtable rejects null values)
            logger.error("数据库连接失败", e);
            return null;
        }
    }
    logger.error("unsupported or missing pool type '" + dbPool + "' for data source '" + dbName + "'");
    return null;
}
/**
 * Returns the connection obtained when this instance was created.
 *
 * @return the connection; null when it could not be obtained or has been closed via close()
 */
public Connection getConnection() {
    return this.conn;
}
/**
 * Closes this instance's connection. Safe to call when no connection was obtained
 * and safe to call more than once.
 */
public void close() {
    if (conn == null) {
        return;
    }
    try {
        conn.close();
    } catch (SQLException e) {
        logger.error("failed to close database connection", e);
    } finally {
        // Drop the reference in every case so a second call is a no-op
        conn = null;
    }
}
/**
 * Closes the given JDBC resources in the order result set, statement, prepared
 * statement, connection. Null arguments are skipped. Every resource is closed
 * independently, so a failure to close one of them no longer leaves the remaining
 * ones (in particular the connection) open.
 *
 * @param conn connection to close, may be null
 * @param ps   prepared statement to close, may be null
 * @param st   statement to close, may be null
 * @param rs   result set to close, may be null
 */
public static void closeConn(Connection conn, PreparedStatement ps, Statement st, ResultSet rs) {
    closeQuietly(rs);
    closeQuietly(st);
    closeQuietly(ps);
    closeQuietly(conn);
}

/** Closes one resource, reporting but not propagating a failure. */
private static void closeQuietly(AutoCloseable resource) {
    if (resource == null) {
        return;
    }
    try {
        resource.close();
    } catch (Exception e) {
        e.printStackTrace();
    }
}
/**
 * Closes a prepared statement and its connection; delegates to the four-argument
 * closeConn with no plain statement and no result set.
 *
 * @param conn connection to close, may be null
 * @param pst  prepared statement to close, may be null
 */
public static void closeConn(Connection conn, PreparedStatement pst) {
    final Statement noStatement = null;
    final ResultSet noResultSet = null;
    closeConn(conn, pst, noStatement, noResultSet);
}
/**
 * Closes only a prepared statement; delegates to the four-argument closeConn with
 * no connection, no plain statement and no result set.
 *
 * @param pst prepared statement to close, may be null
 */
public static void closePst(PreparedStatement pst) {
    final Connection noConnection = null;
    final Statement noStatement = null;
    final ResultSet noResultSet = null;
    closeConn(noConnection, pst, noStatement, noResultSet);
}
/**
 * Manual check: opens the "jcpt" data source and prints the connection obtained from it.
 */
public static void main(String[] args) {
    Connection connection = new DBConnect("jcpt").getConnection();
    System.out.println(connection);
}
/**
 * Opens a plain JDBC connection through the Oracle thin driver, bypassing the pools.
 *
 * @param dbUrl    database address in the form IP:port/dbname, appended to "jdbc:oracle:thin:@"
 * @param username database user
 * @param password database password
 * @return the opened connection, or null when the driver class is missing or the
 *         connection attempt fails (the stack trace is printed in both cases)
 * @author Xin.Lu
 */
public static Connection getConneciton(String dbUrl, String username, String password) {
    final String jdbcUrl = "jdbc:oracle:thin:@" + dbUrl;
    try {
        Class.forName("oracle.jdbc.driver.OracleDriver");
        return DriverManager.getConnection(jdbcUrl, username, password);
    } catch (ClassNotFoundException | SQLException e) {
        e.printStackTrace();
        return null;
    }
}
/**
 * Tells whether a commit is due after processing the row with index {@code i}.
 *
 * @param i    zero-based index of the row just processed
 * @param size total number of rows
 * @return true when {@code i} is a non-zero multiple of 1000 or is the last index
 *         ({@code size - 1}); false otherwise
 * @author Xin.Lu
 */
public static boolean isNeedCommit(int i, int size) {
    boolean fullBatch = i != 0 && i % 1000 == 0;
    boolean lastRow = i == size - 1;
    return fullBatch || lastRow;
}
}