FTP超大文件下载入库——断点续传法

我的需求是将ftp上面的文件进行下载入库,文件大概有两万多个文本,大概是36G,六张表总共2亿数据量。

过程中遇到的错误

Connection closed without indication.   错误1 
421 Could not create socket . 错误2
原因:应该是下载文件数量比较多,下载一定数量以后网络不稳定,这个时候就需要重新连一下ftp
ftpClient.listFiles反应很慢

     原因是默认缓冲区大小只有1024字节(1K),所以很慢。在调用下载(或上传、列表)API之前修改这个默认设置即可,例如将缓冲区改为10M:

ftpClient.setBufferSize(1024 * 1024 * 10)

FTP断点续传代码(我们公司的ftp不支持多线程下载文件,所以这里是单线程下载;如果你的ftp服务器支持,可以改成多线程下载)

package sample.util.xxzf.sftp;

import lombok.extern.slf4j.Slf4j;
import org.apache.commons.net.ftp.FTP;
import org.apache.commons.net.ftp.FTPClient;
import org.apache.commons.net.ftp.FTPFile;
import org.apache.commons.net.ftp.FTPReply;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.SocketTimeoutException;
import java.util.Properties;


@Slf4j
public class FTPUtil {

    /**
     * FTP地址
     **/
    private static String ftpHost;

    /**
     * FTP端口
     **/
    private static int ftpPort;

    /**
     * FTP用户名
     **/
    private static String ftpUsername;

    /**
     * FTP密码
     **/
    private static String ftpPassword;

    /**
     * FTP协议里面,规定文件名编码为iso-8859-1
     **/
    private static String serverCharset = "ISO-8859-1";

    /**
     * UTF-8字符编码
     **/
    private static final String CHARSET_UTF8 = "UTF-8";

    /**
     * 设置缓冲区大小4M
     **/
    private static final int BUFFER_SIZE = 1024 * 1024 * 4;

    /** 五分钟的毫秒数 */
    private static final long TEN_MINUTE = 5 * 600 * 1000L;


    public FTPClient ftpClient = null;
    /**
     * 加载ftp配置信息
     */
    static {
        try {
            Properties properties = new Properties();
            InputStream in = FTPUtil.class.getClassLoader().getResourceAsStream( "db.properties" );
            properties.load( in );
            ftpHost = properties.getProperty( "ftp_hostname" );
            String port = properties.getProperty( "ftp_port" );
            if (port != null && !"".equals( port )) {
                ftpPort = Integer.valueOf( port );
            }
            ftpUsername = properties.getProperty( "ftp_username" );
            ftpPassword = properties.getProperty( "ftp_password" );
            in.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }





    /**
     * 下载整个文件夹
     *
     * @param ftpPath   /usr/local/tmp
     * @param localPath F:/bak/v1
     * @return
     */
    public static boolean downloadFolder(String ftpPath, String localPath) {
        FTPClient ftpClient = null;
        try {
            // build client
           ftpClient = buildClient();

            File savePathFile = new File( localPath );
            if (!savePathFile.exists()) {
                savePathFile.mkdirs();
            }
            downloadDir(ftpClient, ftpPath, localPath);

            return true;
        } catch (Exception e) {
            e.printStackTrace();
            log.error( "ftp download fail:" + e.getMessage() );
        } finally {

           close( ftpClient );
        }
        return false;
    }



    /**
     * 构造FTPClient
     *
     * @return
     */
    private static FTPClient buildClient() throws Exception {
        FTPClient ftpClient = new FTPClient();

        if (null != ftpClient && ftpClient.isConnected() && ftpClient.isAvailable()) {
            // 防止假卡死
            ftpClient.enterLocalPassiveMode();
        }

        ftpClient = new FTPClient();
        ftpClient.setControlEncoding( "GBK" );

        try {
            // 连接FTP服务器
            ftpClient.connect( ftpHost, ftpPort );
            // 登录FTP服务器
            ftpClient.login( ftpUsername, ftpPassword );
            // 是否登录成功
            if (!FTPReply.isPositiveCompletion( ftpClient.getReplyCode() )) {
                // 登录失败
                log.info( "用户名或密码错误,登录FTP服务器失败:" + ftpHost + ":" + ftpPort );
                // 关闭连接
                close( ftpClient );

            } else {
                // 登录成功
                log.info( "登录FTP服务器成功:" + ftpHost + ":" + ftpPort );
                //设置被动模式
                //ftpClient.enterLocalPassiveMode();
                // 设置文件类型
                ftpClient.setFileType( FTP.BINARY_FILE_TYPE );
                // 防止假卡死
                ftpClient.enterLocalPassiveMode();
                // 中文支持
                ftpClient.setControlEncoding( "GBK" );
                // 服务器获取自身Ip地址和提交的host进行匹配
                ftpClient.setRemoteVerificationEnabled( false );
                // 设置连接超时时间
                ftpClient.setConnectTimeout( 60 * 10000 );
                // 设置数据传输超时时间
                ftpClient.setDataTimeout( 2 * 60 * 10000 );
                // 设置缓冲大小
                ftpClient.setReceiveBufferSize( 10240 * 10240 );
                ftpClient.setBufferSize( 10240 * 10240 );
            }

        } catch (Exception e) {
            log.info( "无法登录FTP服务器:" + ftpHost + ":" + ftpPort );
            e.printStackTrace();
        }


        return ftpClient;
    }

    /**
     * 关闭FTP客户端
     *
     * @param ftpClient
     */
    private static void close(FTPClient ftpClient) {
        if (ftpClient != null) {
            try {
                ftpClient.logout();
                ftpClient.disconnect();
            } catch (IOException e) {
                e.printStackTrace();
                log.error( "ftp logout fail:" + e.getMessage() );
            }
        }
    }

    /**
     * 功能描述: 下载目录下所有文件 
     *
     * @param pathname
     *            FTP服务器文件目录 *
     * @param localpath
     *            下载后的文件路径 * void
     * @param
     */
    public static boolean downloadDir(FTPClient ftpClient, String pathname, String localpath) {
        boolean flag = false;
        int count=0;
        try {
            // 切换FTP目录
            ftpClient.changeWorkingDirectory(pathname);
            FTPFile[] ftpFiles = ftpClient.listFiles();
            for (FTPFile file : ftpFiles) {
                if (".".equals(file.getName()) || "..".equals(file.getName())) {
                    continue;
                }
                if (count==1000){
                    //当文件下载到一定数量以后就会报错,reponse 421  received closed connection 或者 出现 connection closed without indication
                    ftpClient = buildClient();
                    ftpClient.changeWorkingDirectory(pathname);
                    count=0;
                }
                count++;
                FileOutputStream os = null;
                long size = file.getSize();
                File localFile = new File(localpath + "/" + file.getName());
                if (localFile.length() == size) {
                    log.info("文件{}已下载完成", localFile.getName());
                    continue;
                }

                if (localFile != null && localFile.exists() && localFile.isFile() && localFile.length() > 0 && localFile.length() < size) {
                    // 需要断点续传
                    os = new FileOutputStream(localFile, true);
                } else {
                    if (!checkFile(localFile)) {
                        return false;
                    }
                    os = new FileOutputStream(localFile);
                }
                log.info("正在下载文件:{},总大小:{}", file.getName(), size);
                long start = System.currentTimeMillis();
                try {
                    InputStream in = null;
                    try {
                        byte[] bytes = new byte[1024 * 32];
                        long step = size /100;
                        long process = 0L;
                        long localSize = localFile.length();
                        if (localSize > size) {
                            log.error("本地文件大于服务器文件,终止下载");
                            checkFile(localFile);
                            return false;
                        }
                        if (localSize > 0) {
                            ftpClient.setRestartOffset(localSize);
                        }
                        int c;
                        in = ftpClient.retrieveFileStream(file.getName());
                        if (in == null) {
                            log.info("连接异常,将退出登录");
                            try {
                                ftpClient.logout();
                            } catch (Exception e) {
                                log.debug("关闭连接错误", e); // 可以忽略的错误
                            }
                            try {
                                ftpClient.disconnect();
                            } catch (Exception e) {
                                log.debug("关闭连接错误", e); // 可以忽略的错误
                            }
                            ftpClient = null;
                            log.info("退出登录成功");
                            return false;
                        }

                        while((c = in.read(bytes))!= -1){
                            os.write(bytes, 0, c);
                            localSize += c;
                            long nowProcess=0;
                            if (step!=0){
                                nowProcess = localSize /step;
                            }

                            if(size > 50000000 && nowProcess > process){ // 大于50兆才显示进度
                                process = nowProcess;
                                log.info("{}%", process);
                                if (System.currentTimeMillis() - start > TEN_MINUTE) { // 大于指定时间
                                    log.info("时间已到,未下载部分将在下次任务中下载");
                                    try {
                                        ftpClient.logout();
                                    } catch (Exception e) {
                                        log.debug("关闭连接错误", e); // 可以忽略的错误
                                    }
                                    try {
                                        ftpClient.disconnect();
                                    } catch (Exception e) {
                                        log.debug("关闭连接错误", e); // 可以忽略的错误
                                    }
                                    return false;
                                }
                            }
                        }
                        log.info("文件下载完成到:{}", localpath + "/" + file.getName());
                    } catch (SocketTimeoutException e) {
                        log.info("下载出错:", e);
                        return false;
                    } catch (Exception e) {
                        log.error("下载出错:", e);
                        return false;
                    }
                    finally {
                        try {
                            in.close();
                            ftpClient.completePendingCommand();  // 如果没有这一句 的流是空的 ftpClient.retrieveFileStream(file.getName());  只有这个文件读取结束以后才能调,不能重复调或者乱掉
                        } catch (Exception e) {
                            log.debug("关闭流错误", e); // 可以忽略的错误
                        }
                    }
                } catch (Exception e) {
                    log.error("下载文件错误:{}", file.getName(), e);
                    return false;
                } finally {
                    try {
                        os.close();
                    } catch (Exception e) {
                        log.debug("关闭连接异常"); // 可以忽略的错误类型
                    }
                }
            }
            flag = true;
        } catch (Exception e) {
            log.error("下载文件错误:{}", pathname, e);
            return false;
        }
        return flag;
    }

    /**
     * 功能描述:  初始化文件
     * @param localFile
     * void
     */
    private static boolean checkFile(File localFile) {
        if (localFile.exists()) { // 已经存在则删除
            localFile.delete();
        }
        // 然后再创建
        localFile.mkdirs();
        if (localFile.exists()) {
            localFile.delete();
        }
        try {
            localFile.createNewFile();
            return true;
        } catch (IOException e) {
            log.error("创建文件失败", e);
            return false;
        }
    }

}

对下载好的数据进行入库操作(service层)

package sample.service.yssjcj.ky;

import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Service;
import sample.service.yssjcj.ky.kp.ImportFileThread;
import sample.util.file.FilesUtil;
import sample.util.xxzf.sftp.FTPUtil;

import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;


@Slf4j
@Service
public class Kpbcs4Service {

    @Value("${threads.size}")
    private int threadsSie;


    @Value("${threads.batch-size}")
    private int batchSize;


    public void ftpFile(String ftpPath, String localPath) {

        //FTP下载,ftp目前应该设置的是不支持多线程下载,需要开启设置,
        boolean b = FTPUtil.downloadFolder( ftpPath, localPath );
        if (!b) {
            return;
        }
        List<File> fileList = FilesUtil.getFileList( localPath );
        if (fileList.size() > 0) {
            //创建多线程
            List<ListenableFuture<Integer>> futures = new ArrayList();
            ExecutorService pool = Executors.newFixedThreadPool( threadsSie );
            ListeningExecutorService executorService = MoreExecutors.listeningDecorator( pool );
            for (File file : fileList) {
                //判断是否是文件
                if (!file.isFile()) {
                    continue;
                }
                // 创建线程
                ImportFileThread importFileThread = new ImportFileThread( file.getPath(), batchSize );
                // 添加到队列
                futures.add( executorService.submit( importFileThread ) );

            }
            final ListenableFuture<List<Integer>> resultFuture = Futures.successfulAsList( futures );
            try {
                // 获取所有线程的执行结果,如果还未执行完毕,则处于等待状态,直至所有线程执行完毕
                resultFuture.get();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                if (pool != null && !pool.isShutdown()) {
                    pool.shutdown();
                }
            }
        }
    }
}

线程方法层

package sample.service.yssjcj.ky.kp;

import lombok.extern.slf4j.Slf4j;
import sample.util.file.FilesUtil;

import java.util.concurrent.Callable;


/**
 * Import task for a single file: when executed it hands the file to
 * {@link FilesUtil#readTxtFile(String, int)} and reports {@code 1} on completion.
 */
@Slf4j
public class ImportFileThread implements Callable<Integer> {

    /** Full path of the file this task imports. */
    private final String filePath;

    /** Batch size passed through to the file reader. */
    private final int batchSize;

    /**
     * @param filePath  full path of the file to import
     * @param batchSize batch size passed through to the file reader
     */
    public ImportFileThread(String filePath, int batchSize) {
        this.batchSize = batchSize;
        this.filePath = filePath;
    }

    /**
     * Runs the import for the configured file.
     *
     * @return always {@code 1} when the import call returns normally
     */
    @Override
    public Integer call() {
        this.importfile(this.filePath, this.batchSize);
        final Integer done = 1;
        return done;
    }

    /**
     * Logs the file name and delegates the actual reading to {@link FilesUtil}.
     *
     * @param filePath  full path of the file to import
     * @param batchSize batch size passed through to the file reader
     */
    public void importfile(String filePath, int batchSize) {
        // Announce which file is being processed before reading it.
        log.info( "入库的文件名称{}", filePath );
        FilesUtil.readTxtFile( filePath, batchSize );
    }

}

dao层:多个线程各自读取一个文件,每读一行就调用一次addBatch(),累计到一定条数后批量提交。不要把一个文件的所有数据一次性加载到内存中,否则会出现内存溢出问题。

package sample.util.file;


import lombok.extern.slf4j.Slf4j;
import org.apache.poi.ss.formula.functions.Count;
import sample.mapper.yssjcj.ky.KpYrDao;
import sample.mapper.yssjcj.ky.KpYsDao;
import sample.mapper.yssjcj.ky.dao.KpKcs4Dao;
import sample.mapper.yssjcj.ky.dao.KpLrDao;
import sample.mapper.yssjcj.ky.dao.KpLsDao;
import sample.util.StringUtils;
import sample.util.db.DBConnect;

import java.io.*;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;


@Slf4j
public class FilesUtil {

    /**
     * 递归读取文件夹内文件
     * <p>
     * fileList
     * new一个list
     * strPath
     * 文件夹路径
     *
     * @return 文件夹内文件list
     */
    public static List<File> getFileList(String strPath) {
        List<File> fileList = new ArrayList<>();
        File dir = new File( strPath );
        File[] files = dir.listFiles(); // 该文件目录下文件全部放入数组
        if (files != null) {
            for (int i = 0; i < files.length; i++) {
                fileList.add( files[i] );
            }
        }
        return fileList;
    }


    /**
     * 存在的问题就是对于超大型文本,会内存溢出
     * 读取文本文件,每行按","分隔为数组,去掉空行。
     * fileFullName 文件名称
     * charsetName  编码方式
     */
    public static void readTxtFile(String filePath, int batchSize) {
        BufferedReader reader = null;
        int count = 0;
        File file = new File( filePath );
        String fileName = file.getName();
        PreparedStatement ps = null;
        DBConnect dbc = new DBConnect( "daas" );
        Connection conn = dbc.getConnection();
        try {
            log.info( "插入数据的表名【" + fileName + "】" );
            // 数据库操作,TODO 多线程跑会不会关闭其他的数据库连接

            reader = new BufferedReader( new InputStreamReader( new FileInputStream( filePath ), "GBK" ) );
            if (fileName.contains( "LS" )) {

                String inSql = "INSERT INTO KP_LS  ";
                inSql += "(CZID, SPCDM, CKH, JZRQ, PZTBZ, PJPH, SPRQ, FZID, FZHZM, FZSSJM, DZID, DZHZM, DZSSJM, JYZID, LC, SY1, SY2, SY3, LCTZ, KTTZ, ZZZID, ZZZHZM, XB, PB, CC, CCZZCC, DDQYDM, CCRQ, TSJJD1, TSJJDLC1, TSJJDPJ1, TSJJD2, TSJJDLC2, TSJJDPJ2, TSJJD3, TSJJDLC3, TSJJDPJ3, TSJJD4, TSJJDLC4, TSJJDPJ4, TSJJD5, TSJJDLC5, TSJJDPJ5, JCPJ, YZYJK, JJFDF, WPDPF, CZKTF, RPF, YHJ, PJHJ, ZZBCCCZCPJ, ZZYTPCC, ZZYTPCCRQ, ZZYTPCCZ, DZZFJYLSH, DZZFFPTKJYLSH, DPBZ, KPXSFWF, YDSPSXF, GSXPBS, GSYPDSPRQ, GSYPDSPCDM, GSYPDSPCKH, GSYPPH, KDWLDH, KDF, KDQYDM, GQCJ, GQTPF, GQJTK, BDH, SXJE, JLJSBZ, WJM, YF) ";
                inSql += "  VALUES(?,?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?,?)";
                ps = conn.prepareStatement( inSql );
                String lineTempValue = "";
                //读取第一行文本
                while ((lineTempValue = reader.readLine()) != null) {
                    if (lineTempValue.split( "," ).length > 0 && !(lineTempValue.equals( "#" ))) {
                        String[] strAarray = StringUtils.substringBefore( lineTempValue, ";" ).split( "," );
                        KpLsDao lsDao = new KpLsDao();
                        lsDao.saveKpLs( strAarray, fileName, ps );
                        ps.addBatch();
                        /*
                         * 批量提交
                         * */
                        count++;
                        if (count % batchSize == 0) {
                            ps.executeBatch();
                            conn.commit();
                            ps.clearBatch();
                        }
                    }
                }
                //文件读取完也关闭
                ps.executeBatch();
                conn.commit();
                ps.clearBatch();
                log.info( "【{}】文件插入数据库的条数是【{}】", fileName, count );
            } else if (fileName.contains( "LR" )) {
                String inSql = "INSERT INTO KP_LR  ";
                inSql += "(YPDSPRQ, YPDSPCDM, YPDSPCKH, TPZID, TPCDM, TPCKH, TPRQ, TPSJ, JZRQ, TPPH, SPRQ, FZID, FZHZM, FZSSJM, DZID, DZHZM, DZSSJM, JYZID, LC, SY1, SY2, SY3, LCTZ, ZXTZ, ZZZID, ZZZHZM, XB, PB, CC, ZCC, DDQYDM, CCRQ, TPYY, TSJJD1, TSJJDLC1, TSJJDPJ1, TSJJD2, TSJJDLC2, TSJJDPJ2, TSJJD3, TSJJDLC3, TSJJDPJ3, TSJJD4, TSJJDLC4, TSJJDPJ4, TSJJD5, TSJJDLC5, TSJJDPJ5, JCPJ, YZYJK, JJFDF, WPDPF, CZKTF, RPF, YHJ, YSPJ, YTPJ, TPSXF, JTK, TPRS, SGTPBZ, BZCCSJ, DZZFJYLSH, DPBZ, KDWLDH, KDFTK, KDQYDM, DBH, TBJE, JLJSBZ,WJM,YF)   ";
                inSql += "  VALUES(?,?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?,?,?)";
                ps = conn.prepareStatement( inSql );
                String lineTempValue = "";
                //读取第一行文本
                while ((lineTempValue = reader.readLine()) != null) {
                    if (lineTempValue.split( "," ).length > 0 && !(lineTempValue.equals( "#" ))) {
                        String[] strAarray = StringUtils.substringBefore( lineTempValue, ";" ).split( "," );
                        KpLrDao kpLrDao = new KpLrDao();
                        kpLrDao.saveKpLr( strAarray, fileName, ps );
                        ps.addBatch();
                        /*
                         * 批量提交
                         * */
                        count++;
                        if (count % batchSize == 0) {
                            ps.executeBatch();
                            conn.commit();
                            ps.clearBatch();
                        }
                    }
                }
                if (count > 0) {
                    ps.executeBatch();
                    conn.commit();
                    ps.clearBatch();
                }
                log.info( "【{}】文件插入数据库的条数是【{}】", fileName, count );
            } else if (fileName.contains( "kcs4" )) {
                log.info( "线程名称" + Thread.currentThread().getName() + "文件名" + reader );
                String inSql = "INSERT INTO KP_KCS4  ";
                inSql += "(JYBZ, JZRQ, ZCHZM, SPCDM, ZS, ZFZS, FSRS, GMRS, ZZRS, ZJE, KDJS, KDF, SBZFS, FBFS, SSFS, XSBF,WJM,YF)  ";
                inSql += "  VALUES(?,?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?,?)";
                ps = conn.prepareStatement( inSql );

                String lineTempValue = "";
                //读取第一行文本
                while ((lineTempValue = reader.readLine()) != null) {
                    if (lineTempValue.split( "," ).length > 0 && !(lineTempValue.equals( "#" ))) {
                        String[] strAarray = StringUtils.substringBefore( lineTempValue, ";" ).split( "," );
                        KpKcs4Dao kpKcs4Dao = new KpKcs4Dao();
                        kpKcs4Dao.saveKpKcs4( strAarray, fileName, ps );
                        ps.addBatch();
                        /*
                         * 批量提交
                         * */
                        count++;
                        if (count % batchSize == 0) {
                            ps.executeBatch();
                            conn.commit();
                            ps.clearBatch();
                        }
                    }
                }
                if (count > 0) {
                    ps.executeBatch();
                    conn.commit();
                    ps.clearBatch();

                }
                log.info( "【{}】文件插入数据库的条数是【{}】", fileName, count );
            } else if (fileName.contains( "YS" )) {

                String inSql = "INSERT INTO KYPG_LS  ";
                inSql += "(YF, WJM, SPBZ, WJRQ, SFCC, CZID, SPCDM, CKH, JZRQ, PZTBZ, PH, SPRQ, FZID, FZ, FZJ, DZID, DZ, DZJ, JYZID, LC, SY1, SY2, SY3, LCTZ, KTTZ, ZZZID, ZZZ, XB, PB, CC, CCZCC, DDQYDM, CCRQ, TSJJ1, TSJJLC1, TSJJPJ1, TSJJ2, TSJJLC2, TSJJPJ2, TSJJ3, TSJJLC3, TSJJPJ3, TSJJ4, TSJJLC4, TSJJPJ4, TSJJ5, TSJJLC5, TSJJPJ5, JCPJ, YZYJK, JJFDF, WPDPF, CZKTF, RPF, YHJ, PJHJ, ZZBCPJ, YTPCC, YTPCCRQ, YTPCCZ, DZLSH, FPTKLSH, DPBZ, XSFWF, YDPSXF, GSXPBS, YPSPRQ, YPSPCDM, YPSPCKH, YPPH, KDWLDH, KDF, KDQYDM, GQCJ, GQTPF, GQTPK, BDH, SBJ, ZXBS, JCPJ_S, WPDPF7_S, WPDPF3_S, CZKTF_S, RPF_S, JCPJ_J, WPDPF7_J, WPDPF3_J, CZKTF_J, RPF_J, PJHJ_J, GQTPF_J, YL1, ZXPJ, KJPJPJ,YF)   ";
                inSql += "  VALUES(?,?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?,?)";
                ps = conn.prepareStatement( inSql );

                String lineTempValue = "";
                //读取第一行文本
                while ((lineTempValue = reader.readLine()) != null) {
                    if (lineTempValue.split( "," ).length > 0 && !(lineTempValue.equals( "#" ))) {
                        String[] strAarray = StringUtils.substringBefore( lineTempValue, ";" ).split( "," );
                        KpYsDao kpYsDao = new KpYsDao();
                        kpYsDao.saveKpYs( strAarray, fileName, ps );
                        ps.addBatch();
                        /*
                         * 批量提交
                         * */
                        count++;
                        if (count % batchSize == 0) {
                            ps.executeBatch();
                            conn.commit();
                            ps.clearBatch();
                        }
                    }
                }
                if (count > 0) {
                    ps.executeBatch();
                    conn.commit();
                    ps.clearBatch();
                }
                log.info( "【{}】文件插入数据库的条数是【{}】", fileName, count );
            } else if (fileName.contains( "YR" )) {

                String inSql = "INSERT INTO KP_LR  ";
                inSql += "(YPDSPRQ, YPDSPCDM, YPDSPCKH, TPZID, TPCDM, TPCKH, TPRQ, TPSJ, JZRQ, TPPH, SPRQ, FZID, FZHZM, FZSSJM, DZID, DZHZM, DZSSJM, JYZID, LC, SY1, SY2, SY3, LCTZ, ZXTZ, ZZZID, ZZZHZM, XB, PB, CC, ZCC, DDQYDM, CCRQ, TPYY, TSJJD1, TSJJDLC1, TSJJDPJ1, TSJJD2, TSJJDLC2, TSJJDPJ2, TSJJD3, TSJJDLC3, TSJJDPJ3, TSJJD4, TSJJDLC4, TSJJDPJ4, TSJJD5, TSJJDLC5, TSJJDPJ5, JCPJ, YZYJK, JJFDF, WPDPF, CZKTF, RPF, YHJ, YSPJ, YTPJ, TPSXF, JTK, TPRS, SGTPBZ, BZCCSJ, DZZFJYLSH, DPBZ, KDWLDH, KDFTK, KDQYDM, DBH, TBJE, JLJSBZ,WJM,YF)   ";
                inSql += "  VALUES(?,?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?,?,?)";
                ps = conn.prepareStatement( inSql );

                String lineTempValue = "";
                //读取第一行文本
                while ((lineTempValue = reader.readLine()) != null) {
                    if (lineTempValue.split( "," ).length > 0 && !(lineTempValue.equals( "#" ))) {
                        String[] strAarray = StringUtils.substringBefore( lineTempValue, ";" ).split( "," );
                        KpYrDao kpYrDao = new KpYrDao();
                        kpYrDao.saveKpYr( strAarray, fileName, ps );
                        ps.addBatch();
                        /*
                         * 批量提交
                         * */
                        count++;
                        if (count % batchSize == 0) {
                            ps.executeBatch();
                            conn.commit();
                            ps.clearBatch();
                        }
                    }
                }
                if (count > 0) {
                    ps.executeBatch();
                    conn.commit();
                    ps.clearBatch();
                }
                log.info( "【{}】文件插入数据库的条数是【{}】", fileName, count );
            }
            // 关闭
            reader.close();//TODO 是否关闭文件,这里关闭跟下面关闭的区别,这里关闭文件会不会关闭错误;
        } catch (IOException e) {
            e.printStackTrace();
        } catch (SQLException e) {
            e.printStackTrace();
        } finally {
            try {
                if (null != ps) {
                    ps.close();
                }
                if (null != conn) {
                    conn.close();
                }
            } catch (Exception e) {
                log.error( e.getMessage(), e );
            }
        }
    }


    public static int lineNumber(String filePath) {

        int lineNumber = 0;
        try {
            FileReader fileReader = new FileReader( new File( filePath ) );
            LineNumberReader lineNumberReader = new LineNumberReader( fileReader );
            lineNumberReader.skip( Long.MAX_VALUE );
            lineNumber = lineNumberReader.getLineNumber();
        } catch (IOException e) {
            e.printStackTrace();
        }
        return lineNumber;
    }

    public static void main(String[] args) {
        String fileName = "kcs40201.00P";
        System.out.println( fileName.contains( "kcs4" ) );
    }
}

 数据库连接方法

package sample.util.db;
import com.alibaba.druid.pool.DruidDataSourceFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.sql.DataSource;
import java.io.InputStream;
import java.sql.*;
import java.util.Hashtable;
import java.util.Properties;

/**
 * @Description
 * @Author chenfei
 * @Date 2024/3/20 17:47
 */
public class DBConnect { private Connection conn = null;
    /** Class-wide logger. */
    static Logger logger = LoggerFactory.getLogger(DBConnect.class.getName());
    /**
     * Data source selected by the most recent constructor call. It is static, so every new
     * DBConnect instance overwrites it for all other instances.
     */
    private static DataSource dataSource;
    /** Contents of db.properties, loaded by the static initializer. */
    public static Properties dbProps = null;
    /** Value of the {@code defaultDb} property; used by the no-argument constructor. */
    public static String dbName;
    /** Data sources looked up by database name when a DBConnect is constructed. */
    public static Hashtable<String, DataSource> hmDataSource = new Hashtable<String, DataSource>();

    static {
        // 项目执行使用
        InputStream is = DBConnect.class.getClassLoader().getResourceAsStream("db.properties");
        //本地执行使用
//		File file = new File("src/main/resources/db.properties");
//		InputStream is = new FileInputStream(file);
        dbProps = new Properties();
        try {
            dbProps.load(is);
            dbName = dbProps.getProperty("defaultDb");
        } catch (Exception e) {
            logger.error("不能读取属性文件. " + "请确保db.properties在CLASSPATH指定的路径中");
        }
    }

    public static DataSource getdataSource() {
        return dataSource;
    }

    /**
     * 构造数据库的连接和访问类
     */
    public DBConnect(String dbName) {
        initializeDataSource(dbName);
    }

    /**
     * 无参数构造方法,默认找ldtj数据源
     */
    public DBConnect() {
        initializeDataSource(dbName);
    }

    private void initializeDataSource(String dbName) {
        dataSource = hmDataSource.get(dbName);
        if (dataSource == null)// 建立数据库连接
        {
            String dbPool = dbProps.getProperty(dbName + ".dbPool");
            if (dbPool.equalsIgnoreCase("weblogic")) {
                String weblogic_pool = dbProps.getProperty(dbName + ".weblogic.poolName");
                Context ctx = null;
                Hashtable<Object, Object> ht = new Hashtable<Object, Object>();
                ht.put(Context.INITIAL_CONTEXT_FACTORY, "weblogic.jndi.WLInitialContextFactory");
                // ht.put(Context.PROVIDER_URL, "t3://10.1.3.49:7003");
                try {
                    ctx = new InitialContext(ht);
                    dataSource = (DataSource) ctx.lookup(weblogic_pool);
                } catch (Exception e) {
                    logger.error("数据库连接失败");
                    e.printStackTrace();
                }
            } else if (dbPool.equalsIgnoreCase("druid")) {
                try {
                    Hashtable<String, String> map = new Hashtable<String, String>();
                    map.put("driverClassName", dbProps.getProperty(dbName + ".driverClassName"));
                    map.put("url", dbProps.getProperty(dbName + ".url"));
                    map.put("username", dbProps.getProperty(dbName + ".username"));
                    map.put("password", dbProps.getProperty(dbName + ".password"));
                    map.put("initialSize", dbProps.getProperty(dbName + ".initialSize"));
                    map.put("maxActive", dbProps.getProperty(dbName + ".maxActive"));

                    map.put("timeBetweenEvictionRunsMillis", "60000");
                    map.put("minEvictableIdleTimeMillis", "300000");
                    if (map.get("driverClassName").indexOf("oracle") != -1) {
                        map.put("validationQuery", "SELECT 1 FROM DUAL");
                        map.put("testWhileIdle", "true");
                        map.put("testOnBorrow", "false");
                        map.put("testOnReturn", "false");
                    }
                    // map.put("poolPreparedStatements", "true");
                    // map.put("maxPoolPreparedStatementPerConnectionSize",
                    // "50");
                    map.put("removeAbandoned", "true");
                    map.put("removeAbandonedTimeout", "1800");
                    map.put("logAbandoned", "true");
                    // DruidDataSourceFactory.createDataSource(map).getConnection();
                    dataSource = DruidDataSourceFactory.createDataSource(map);
                } catch (Exception e) {
                    dataSource = null;
                    logger.error("数据库连接失败");
                    e.printStackTrace();
                }
            }
            hmDataSource.put(dbName, dataSource);
        }
        try {
            conn = hmDataSource.get(dbName).getConnection();
        } catch (SQLException e) {
            logger.error("数据库连接失败:" + e.getMessage());
            e.printStackTrace();
        }
    }

    public Connection getConnection() {
        return conn;
    }

    public void close() {
        try {
            conn.close();
            conn = null;
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }

    public static void closeConn(Connection conn, PreparedStatement ps, Statement st, ResultSet rs) {
        try {
            if (null != rs) {
                rs.close();
            }
            if (null != st) {
                st.close();
            }
            if (null != ps) {
                ps.close();
            }
            if (null != conn) {
                conn.close();
            }
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }

    public static void closeConn(Connection conn, PreparedStatement pst) {
        DBConnect.closeConn(conn, pst, null, null);
    }

    public static void closePst(PreparedStatement pst) {
        DBConnect.closeConn(null, pst, null, null);
    }

    public static void main(String[] args) {
        DBConnect dbConnect = new DBConnect("jcpt");
        System.out.println(dbConnect.getConnection());
    }

    /**
     * @param dbUrl:    数据库 IP:port/dbname
     * @param username: 用户名
     * @param password: 密码
     * @description 通过JDBC获取数据库连接
     * @return: java.sql.Connection
     * @author Xin.Lu
     * @date 2020/8/11 16:18
     */
    public static Connection getConneciton(String dbUrl, String username, String password) {
        Connection conn = null;
        try {
            Class.forName("oracle.jdbc.driver.OracleDriver");
            conn = DriverManager.getConnection("jdbc:oracle:thin:@" + dbUrl, username, password);
        } catch (ClassNotFoundException | SQLException e) {
            e.printStackTrace();
        }
        return conn;
    }

    /**
     * @param i:
     * @param size:
     * @description 判断是否需要提交
     * @return: boolean
     * @author Xin.Lu
     * @date 2020/8/12 16:40
     */
    public static boolean isNeedCommit(int i, int size) {
        if ((i != 0 && i % 1000 == 0) || (i == size - 1)) {
            return true;
        } else {
            return false;
        }
    }
}

相关推荐

  1. FTP超大文件下载入库--断点续传

    2024-04-13 05:26:02       30 阅读
  2. 前端实现断点续传文件

    2024-04-13 05:26:02       50 阅读
  3. 断点续传功能

    2024-04-13 05:26:02       63 阅读
  4. 浏览器Content-Range断点续传MP4文件

    2024-04-13 05:26:02       26 阅读

最近更新

  1. docker php8.1+nginx base 镜像 dockerfile 配置

    2024-04-13 05:26:02       94 阅读
  2. Could not load dynamic library ‘cudart64_100.dll‘

    2024-04-13 05:26:02       101 阅读
  3. 在Django里面运行非项目文件

    2024-04-13 05:26:02       82 阅读
  4. Python语言-面向对象

    2024-04-13 05:26:02       91 阅读

热门阅读

  1. 常见分类算法及其应用

    2024-04-13 05:26:02       31 阅读
  2. kylin使用心得

    2024-04-13 05:26:02       35 阅读
  3. vue不正经指南

    2024-04-13 05:26:02       30 阅读
  4. 双指针问题的常见剪枝

    2024-04-13 05:26:02       38 阅读
  5. 程序的预处理

    2024-04-13 05:26:02       125 阅读
  6. Spring MVC 视图解析器

    2024-04-13 05:26:02       39 阅读
  7. 自动化运维(二十二)Ansible实战 之Jenkins模块

    2024-04-13 05:26:02       43 阅读
  8. Vue3 Scss的使用(一)

    2024-04-13 05:26:02       39 阅读