JRT实现缓存协议

上一篇介绍的借助ORM的增、删、改和DolerGet方法,ORM可以很精准的知道热点数据做内存缓存。那么就有一个问题存在,即部署了多个站点时候,如果用户在一个Web里修改数据了,那么其他Web的ORM是不知道这个变化的,其他Web还是缓存的老数据的话就会造成其他Web命中的缓存数据是老的,造成不可靠问题。

那么就需要一种机制来解决多Web缓存不一致问题,参照ECP实现,把Web分为主服务器和从服务器。主服务器启动TCP服务端,从服务器启动TCP客户端连接主服务器,从服务器和主服务器之间一直保留TCP长连接用来通知缓存变化数据。这样在一个服务器增、删、改数据后就能及时通知其他服务器更新缓存,这样所有服务器的缓存数据都是可信赖的。

首先提取发送数据的对象类型
在这里插入图片描述

然后实现ECP管理类ECPManager来管理启动服务端和客户端

package JRT.DAL.ORM.Global;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.ArrayList;
import java.util.Hashtable;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;

import JRT.Core.Util.LogUtils;
import JRT.DAL.ORM.Global.ECPDto;

/**
 * 企业缓存协议,基于TCP在多台服务器直接共享缓存的更新,从而确保每个Web下的缓存数据的可靠性,这里实现TCP服务端当做小机,其他服务当做客户端和小机连接。
 * 小机会分发增加数据(增加数据分发与否倒是不影响缓存可靠性,主要是修改和删除)、修改数据、删除数据的执行记录给每个客户端,客户端按收到的记录把数据推入缓存
 */
public class ECPManager {
   
    /**
     * 要推入TCP的缓存队列
     */
    public static ConcurrentLinkedDeque ECPQuen = new ConcurrentLinkedDeque();

    /**
     * 主处理进程
     */
    private static Thread MainThread = null;

    /**
     * 发送数据的操作对象
     */
    public static Socket Sender = null;

    /**
     * 写操作对象
     */
    public static PrintWriter Writer = null;

    /**
     * 数据编码
     */
    private static String Encode="GBK";

    /**
     * 缓存所有客户端
     */
    private static ConcurrentHashMap<String,Socket> AllClient= new ConcurrentHashMap<>();

    /**
     * IP地址
     */
    private static String IP="";

    /**
     * 端口
     */
    private static  int Port=1991;

    /**
     * 是否启用了ECP
     */
    private static boolean UseEcp=false;

    /**
     * 加入缓存,直接缓存,具体的后续有缓存管理器线程维护缓存,这里只管加入队列即可
     *
     * @param obj
     * @throws Exception
     */
    public static void InECPToQuen(ECPDto obj) throws Exception{
   
        ECPQuen.add(obj);
    }

    /**
     * push目前的ECP缓存数据到远程服务器,供GlobalManager定时器定时推送
     */
    public static void TryPushEcp() throws Exception
    {
   
        try {
   
            //如果是客户端,先检查连接
            if (!IP.isEmpty()) {
   
                //重连
                if (Sender == null || Sender.isClosed()) {
   
                    LogUtils.WriteSecurityLog("ECP尝试重连:" + IP + ":" + Port);
                    StartEcpManager(IP, Port);
                    Thread.sleep(1000);
                }
                if (Sender == null || Sender.isClosed()) {
   
                    return;
                } else {
   
                    LogUtils.WriteSecurityLog("ECP尝试重连成功:" + IP + ":" + Port);
                }
            }
            List<ECPDto> pushList=null;
            //从缓存队列里面弹出数据push
            while (ECPQuen.size() > 0) {
   
                ECPDto obj = (ECPDto)ECPQuen.pop();
                if (obj != null) {
   
                    //启用了ECP就push到服务端,否则就只更新自己缓存
                    if(UseEcp==true)
                    {
   
                        //初始化push列表
                        if(pushList==null)
                        {
   
                            pushList=new ArrayList<>();
                        }
                        pushList.add(obj);
                    }
                    else
                    {
   
                        //转换成数据实体推入缓存
                        JRT.DAL.ORM.Global.GlobalManager.InCache(obj);
                    }
                }
            }
            //直接列表一次推送,没有启用ECP的话这个列表一直是空
            if(pushList!=null)
            {
   
                //客户端推送
                if (!IP.isEmpty()) {
   
                    Writer.print(JRT.Core.Util.JsonUtil.Object2Json(pushList));
                    Writer.flush();
                }
                //服务端推送
                else {
   
                    //给每个连接的客户端推送信息
                    for (String ip : AllClient.keySet()) {
   
                        Socket oneClient = AllClient.get(ip);
                        //移除关闭的客户端
                        if (oneClient.isClosed()) {
   
                            AllClient.remove(ip);
                        }
                        PrintWriter oneWriter = new PrintWriter(new OutputStreamWriter(oneClient.getOutputStream(), Encode), false);
                        oneWriter.print(JRT.Core.Util.JsonUtil.Object2Json(pushList));
                        oneWriter.flush();
                    }
                }
            }
        }
        catch (Exception ex)
        {
   
            LogUtils.WriteExceptionLog("推送数据到ECP异常", ex);
        }
    }

    /**
     * 启动ECP管理
     * @param ip
     * @param port
     */
    public static void StartEcpManager(String ip, int port)
    {
   
        IP=ip;
        Port=port;
        //当客户端
        if (!ip.isEmpty()) {
   
            MainThread = new Thread(new Runnable() {
   
                @Override
                public void run() {
   
                    //得到输入流
                    InputStream inputStream = null;
                    //创建Socket对象并连接到服务端
                    Socket socket = null;
                    try {
   
                        //创建Socket对象并连接到服务端
                        socket = new Socket(ip, port);
                        Sender = socket;
                        Writer = new PrintWriter(new OutputStreamWriter(Sender.getOutputStream(), Encode), false);
                        //得到输入流
                        inputStream = socket.getInputStream();
                        //IO读取
                        byte[] buf = new byte[10240];
                        int readlen = 0;
                        //阻塞读取数据
                        while ((readlen = inputStream.read(buf)) != -1) {
   
                            try {
   
                                String ecpJson = new String(buf, 0, readlen, Encode);
                                //得到ECP实体
                                List<ECPDto> dtoList = (List<ECPDto>)JRT.Core.Util.JsonUtil.Json2Object(ecpJson, ECPDto.class);
                                if(dtoList!=null&&dtoList.size()>0)
                                {
   
                                    for(ECPDto dto:dtoList) {
   
                                        //转换成数据实体推入缓存
                                        JRT.DAL.ORM.Global.GlobalManager.InCache(dto);
                                    }
                                }
                            }
                            catch (Exception ee)
                            {
   
                                LogUtils.WriteExceptionLog("ECP处理主服务器数据异常", ee);
                            }
                        }
                    }
                    catch (IOException ex) {
   
                        LogUtils.WriteExceptionLog("ECP侦听TCP服务异常", ex);
                    }
                    finally {
   
                        Sender=null;
                        try {
   
                            if (inputStream != null) {
   
                                //关闭输入
                                inputStream.close();
                            }
                            if (Writer != null) {
   
                                Writer.flush();
                                //关闭输出
                                Writer.close();
                                Writer=null;
                            }
                            if (socket != null) {
   
                                // 关闭连接
                                socket.close();
                            }
                        }
                        catch (Exception ex) {
   
                            LogUtils.WriteExceptionLog("释放TCP资源异常", ex);
                        }

                    }
                }
            });

        }
        //当服务端
        else {
   
            MainThread = new Thread(new Runnable() {
   
                @Override
                public void run() {
   
                    //得到输入流
                    InputStream inputStream = null;
                    //创建Socket对象并连接到服务端
                    Socket socket = null;
                    try {
   
                        ServerSocket serverSocket = new ServerSocket(port);
                        //增加一个无限循环
                        while (true) {
   
                            //等待客户端连接,阻塞
                            Socket clientSocket = serverSocket.accept();
                            //按IP存客户端连接
                            String clientIP=clientSocket.getInetAddress().getHostAddress();
                            AllClient.put(clientIP,clientSocket);
                            //接收客户端消息
                            Thread ClientThread = new Thread(new Runnable() {
   
                                @Override
                                public void run() {
   
                                    try {
   
                                        //得到输出流
                                        Writer = new PrintWriter(new OutputStreamWriter(clientSocket.getOutputStream(), Encode), false);
                                        //得到输入流
                                        InputStream inputStream = clientSocket.getInputStream();
                                        //IO读取
                                        byte[] buf = new byte[10240];
                                        int readlen = 0;
                                        //阻塞读取数据
                                        while ((readlen = inputStream.read(buf)) != -1) {
   
                                            String ecpJson = new String(buf, 0, readlen, Encode);
                                            //得到ECP实体
                                            List<ECPDto> dtoList = (List<ECPDto>)JRT.Core.Util.JsonUtil.Json2Object(ecpJson, ECPDto.class);
                                            if(dtoList!=null&&dtoList.size()>0)
                                            {
   
                                                for(ECPDto dto:dtoList) {
   
                                                    //转换成数据实体推入缓存
                                                    JRT.DAL.ORM.Global.GlobalManager.InCache(dto);
                                                }
                                                //给每个连接的客户端推送信息
                                                for (String ip : AllClient.keySet()) {
   
                                                    Socket oneClient = AllClient.get(ip);
                                                    //移除关闭的客户端
                                                    if (oneClient.isClosed()) {
   
                                                        AllClient.remove(ip);
                                                    }
                                                    PrintWriter oneWriter = new PrintWriter(new OutputStreamWriter(oneClient.getOutputStream(), Encode), false);
                                                    oneWriter.print(ecpJson);
                                                    oneWriter.flush();
                                                }
                                            }
                                        }
                                    }
                                    catch (Exception ee)
                                    {
   
                                        LogUtils.WriteExceptionLog("ECP处理客户端数据异常", ee);
                                    }
                                }
                            });
                            ClientThread.start();
                        }
                    }
                    catch (IOException ex) {
   
                        LogUtils.WriteExceptionLog("侦听仪器TCP异常", ex);
                    }
                    finally {
   
                        try {
   
                            if (inputStream != null) {
   
                                //关闭输入
                                inputStream.close();
                            }
                            if (Writer != null) {
   
                                Writer.flush();
                                //关闭输出
                                Writer.close();
                            }
                            if (socket != null) {
   
                                // 关闭连接
                                socket.close();
                            }
                        } catch (Exception ex) {
   
                            LogUtils.WriteExceptionLog("释放TCP资源异常", ex);
                        }

                    }
                }
            });
        }
        //启动主进程
        MainThread.start();
        UseEcp=true;
    }

    /**
     * 返回ECP待处理队列的json数据供调试看是否符合预期
     * @return
     */
    public static String ViewECPQuenDate() throws Exception
    {
   
        return JRT.Core.Util.JsonUtil.Object2Json(ECPQuen);
    }

}

ECP管理类再对接上GlobalManager

package JRT.DAL.ORM.Global;

import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.util.ArrayList;
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.atomic.AtomicInteger;

import JRT.Core.MultiPlatform.JRTConfigurtaion;
import JRT.Core.Util.LogUtils;
import JRT.DAL.ORM.Global.OneGlobalNode;
import JRT.DAL.ORM.Global.ECPDto;

/**
 * 实现内存模拟global的效果
 */
public class GlobalManager {
   
    /**
     * 在内存里缓存热点数据
     */
    private static ConcurrentHashMap<String, ConcurrentHashMap<String, OneGlobalNode>> AllHotData = new ConcurrentHashMap<>();

    /**
     * 要缓存数据的队列
     */
    private static ConcurrentLinkedDeque TaskQuen = new ConcurrentLinkedDeque();

    /**
     * 管理缓存的定时器
     */
    private static Timer ManageTimer = new Timer();

    /**
     * 缓存的最大对象数量
     */
    public static Integer GlobalCacheNum = 100000;

    /**
     * 当前的缓存数量
     */
    private static AtomicInteger CurCacheNum=new AtomicInteger(0);

    /**
     * 最后删除数据的时间
     */
    private static Long LastDeleteTime=null;

    /**
     * 加入缓存,直接缓存,具体的后续有缓存管理器线程维护缓存,这里只管加入队列即可
     *
     * @param obj
     * @throws Exception
     */
    public static void InCache(ECPDto obj) throws Exception{
   
        TaskQuen.add(obj);
    }

    /**
     * 通过主键查询数据
     * @param model
     * @param id
     * @param <T>
     * @return
     * @throws Exception
     */
    public static <T> T DolerGet(T model,Object id) throws Exception
    {
   
        //实体的名称
        String modelName = model.getClass().getName();
        if(AllHotData.containsKey(modelName))
        {
   
            //命中数据,克隆返回
            if(AllHotData.get(modelName).containsKey(id))
            {
   
                OneGlobalNode node=AllHotData.get(modelName).get(id);
                //更新时间
                node.Time=JRT.Core.Util.TimeParser.GetTimeInMillis();
                Object retObj=JRT.Core.Util.JsonUtil.CloneObject(node.Data);
                return (T)retObj;
            }
        }
        return null;
    }

    /**
     * 启动缓存数据管理的线程
     */
    public static void StartGlobalManagerTask() throws Exception{
   
        //最大缓存数量
        String GlobalCacheNumConf = JRTConfigurtaion.Configuration("GlobalCacheNum");
        if (GlobalCacheNumConf != null && !GlobalCacheNumConf.isEmpty()) {
   
            GlobalCacheNum = JRT.Core.Util.Convert.ToInt32(GlobalCacheNumConf);
        }
        //定时任务
        TimerTask timerTask = new TimerTask() {
   
            @Override
            public void run() {
   
                try {
   
                    //缓存队列的数据并入缓存
                    while (TaskQuen.size() > 0) {
   
                        //处理要加入缓存的队列
                        DealOneDataQuen();
                    }
                    //尝试推送ECP数据到主服务
                    JRT.DAL.ORM.Global.ECPManager.TryPushEcp();
                    //清理多余的缓存数据,这里需要讲究算法,要求在上百万的缓存数据里快速找到时间最久远的数据
                    if(CurCacheNum.get()>GlobalCacheNum)
                    {
   
                        //每轮清理时间处于上次清理时间和当前时间前百分之5的老数据
                        long Diff=(JRT.Core.Util.TimeParser.GetTimeInMillis()-LastDeleteTime)/20;
                        //留下数据的最大时间
                        long LeftMaxTime=LastDeleteTime+Diff;
                        //遍历所有的热点数据
                        for (String model : AllHotData.keySet()) {
   
                            ConcurrentHashMap<String, OneGlobalNode> oneTableHot=AllHotData.get(model);
                            //记录要删除的数据
                            List<String> delList=new ArrayList<>();
                            for (String key : oneTableHot.keySet()) {
   
                                OneGlobalNode one=oneTableHot.get(key);
                                //需要删除的数据
                                if(one.Time<LeftMaxTime)
                                {
   
                                    delList.add(key);
                                }
                            }
                            //移除时间久的数据
                            for(String del:delList)
                            {
   
                                oneTableHot.remove(del);
                            }
                        }
                    }
                    //清理时间久远的缓存数据
                } catch (Exception ex) {
   
                    LogUtils.WriteExceptionLog("处理Global缓存异常", ex);
                }
            }
        };
        //尝试启动ECP管理器
        String IP = JRTConfigurtaion.Configuration("ECPIP");
        String Port = JRTConfigurtaion.Configuration("ECPPort");
        if(Port!=null&&!Port.isEmpty())
        {
   
            if(IP==null)
            {
   
                IP="";
            }
            //启动ECP管理器
            JRT.DAL.ORM.Global.ECPManager.StartEcpManager(IP,JRT.Core.Util.Convert.ToInt32(Port));
        }
        //启动缓存管理定时器
        ManageTimer.schedule(timerTask, 0, 500);

    }

    /**
     * 通过实体名称获得实体类型信息
     *
     * @param modelName 实体名称
     * @return
     */
    private static Class GetTypeByName(String modelName) throws Exception {
   
        return JRT.Core.Util.ReflectUtil.GetType(JRT.Core.MultiPlatform.JRTConfigurtaion.Configuration("ModelName") + ".Entity." + modelName, JRT.Core.MultiPlatform.JRTConfigurtaion.Configuration("ModelName"));
    }


    /**
     * 处理队列里的一条数据并入缓存
     */
    private static void DealOneDataQuen() {
   
        try {
   
            Object obj = TaskQuen.pop();
            if (obj != null) {
   
                ECPDto dto=(ECPDto)obj;
                //添加或者更新缓存
                if(dto.Cmd.equals("A")||dto.Cmd.equals("U")) {
   
                    Class type = GetTypeByName(dto.Model);
                    Object entity = JRT.Core.Util.JsonUtil.Json2Object(dto.Data, type);
                    JRT.DAL.ORM.Common.TableInfo tableInfo = JRT.DAL.ORM.Common.ModelToSqlUtil.GetTypeInfo(entity);
                    //实体的名称
                    String modelName = dto.Model;
                    //得到数据的主键
                    String id = tableInfo.ID.Value.toString();
                    if (!AllHotData.containsKey(modelName)) {
   
                        ConcurrentHashMap<String, OneGlobalNode> map = new ConcurrentHashMap<>();
                        AllHotData.put(modelName, map);
                    }
                    //更新数据
                    if (AllHotData.get(modelName).containsKey(id)) {
   
                        AllHotData.get(modelName).get(id).Data = entity;
                        AllHotData.get(modelName).get(id).Time = JRT.Core.Util.TimeParser.GetTimeInMillis();
                    }
                    //加入到缓存
                    else {
   
                        OneGlobalNode node = new OneGlobalNode();
                        node.Data = entity;
                        node.Time = JRT.Core.Util.TimeParser.GetTimeInMillis();
                        AllHotData.get(modelName).put(id, node);
                        //缓存数量加1
                        CurCacheNum.addAndGet(1);
                        //记录时间
                        if (LastDeleteTime == null) {
   
                            LastDeleteTime = JRT.Core.Util.TimeParser.GetTimeInMillis();
                        }
                    }
                }
                //删除缓存
                else if(dto.Cmd.equals("D"))
                {
   
                    Class type = GetTypeByName(dto.Model);
                    Object entity = JRT.Core.Util.JsonUtil.Json2Object(dto.Data, type);
                    JRT.DAL.ORM.Common.TableInfo tableInfo = JRT.DAL.ORM.Common.ModelToSqlUtil.GetTypeInfo(entity);
                    //实体的名称
                    String modelName = dto.Model;
                    //得到数据的主键
                    String id = tableInfo.ID.Value.toString();
                    if (AllHotData.containsKey(modelName)&&AllHotData.get(modelName).containsKey(id)) {
   
                        AllHotData.get(modelName).remove(id);
                    }
                }
                //清空表缓存
                else if(dto.Cmd.equals("CLEAR"))
                {
   
                    //实体的名称
                    String modelName = dto.Model;
                    if (AllHotData.containsKey(modelName)) {
   
                        AllHotData.get(modelName).clear();
                    }
                }
            }
        }
        catch (Exception ex) {
   
            LogUtils.WriteExceptionLog("处理Global缓存添加异常", ex);
        }
    }

    /**
     * 返回Global的json数据供调试看是否符合预期
     * @return
     */
    public static String ViewGlobalJson() throws Exception
    {
   
        return JRT.Core.Util.JsonUtil.Object2Json(AllHotData);
    }

    /**
     * 返回Global待处理队列的json数据供调试看是否符合预期
     * @return
     */
    public static String ViewGlobalTaskQuenDate() throws Exception
    {
   
        return JRT.Core.Util.JsonUtil.Object2Json(TaskQuen);
    }

}

增删改和DolerGet调整

/**
     * 保存对象,不抛异常,执行信息通过参数输出
     *
     * @param entity   实体对象
     * @param outParam 输出执行成功或失败信息,执行成功时输出执行记录主键
     * @param <T>      实体类型约束
     * @return影响行数
     */
    @Override
    public <T> int Save(T entity, OutValue outValue, OutParam outParam) throws Exception {
   
        int row = 0;
        PreparedStatement stmt = null;
        boolean innerT = false;     //标识是否内部开启事务
        String sql = "";
        try {
   
            //根据实体对象获取表信息
            JRT.DAL.ORM.Common.TableInfo tableInfo = JRT.DAL.ORM.Common.ModelToSqlUtil.GetTypeInfo(entity);
            HashParam hash = new HashParam();
            //获取插入SQL语句
            sql = JRT.DAL.ORM.Common.ModelToSqlUtil.GetInsertSqlByTableInfo(Manager().GetIDbFactory(factoryName), tableInfo, hash, false);
            //写SQL日志
            JRT.Core.Util.LogUtils.WriteSqlLog("执行插入SQL:" + sql + ";SQL参数:" + JRT.Core.Util.JsonUtil.Object2Json(hash.GetParam()));
            //获取ID列
            String idKey = tableInfo.ID.Key;
            //声明式SQL,并设置参数
            if (!idKey.isEmpty()) {
   
                stmt = Manager().Connection().prepareStatement(sql, new String[]{
   idKey});
            } else {
   
                stmt = Manager().Connection().prepareStatement(sql);
            }
            String paraSql = DBParaUtil.SetDBPara(stmt, hash.GetParam());
            row = stmt.executeUpdate();
            ResultSet rowID = stmt.getGeneratedKeys();
            JRT.Core.Util.LogUtils.WriteSqlLog("参数:" + paraSql);
            //保存成功返回记录主键,返回影响记录数 1
            if (row == 1) {
   
                if (rowID.next() && (!idKey.isEmpty())) {
   
                    outValue.Value = rowID.getInt(idKey);
                    //设置RowID到实体
                    JRT.Core.Util.ReflectUtil.SetObjValue(entity, tableInfo.ID.Key, rowID.getInt(idKey));
                    //尝试把数据推入缓存队列
                    Manager().TryPushToCache(entity, "A");
                }
            } else {
   
                outParam.Code = OutStatus.ERROR;
                outParam.Message = "保存数据失败,执行保存返回:" + row;
            }
            return row;
        } catch (Exception ex) {
   
            outParam.Code = OutStatus.ERROR;
            //操作异常,判断如果开启事务,则回滚事务
            if (Manager().Hastransaction) {
   
                if (!Manager().RollTransaction()) {
   
                    outParam.Message = "保存数据失败!" + ex.getCause().getMessage() + ";回滚事务失败。";
                }
            }
            outParam.Message = "保存数据失败!" + ex.getCause().getMessage() + "执行SQL:" + sql;
        }
        //操作结束释放资源
        finally {
   
            if (stmt != null) {
   
                stmt.close();
            }
            //如果上层调用未开启事务,则调用结束释放数据库连接
            if (!Manager().Hastransaction) {
   
                Manager().Close();
            }
        }
        return row;
    }


/**
     * 更新实体对象
     *
     * @param entity        实体对象
     * @param param         更新条件,有条件就按条件更新,没有条件就按主键更新
     * @param outParam      输出执行成功或失败的信息
     * @param updateColName 更新属性名集合,无属性则更新实体的所有属性
     * @param joiner        连接符,为空或不给则按则按且连接,给的话长度应该比参数长度少1,如: and
     * @param operators     操作符,为空或不给的话各条件按等来比较,给的话长度应该跟参数长度一样,如: !=
     * @param <T>           类型限定符
     * @return 影响行数
     */
    @Override
    public <T> int Update(T entity, HashParam param, OutParam outParam, List<String> updateColName, List<String> joiner, List<String> operators) throws Exception {
   
        PreparedStatement stmt = null;
        if (outParam == null) outParam = new OutParam();
        int row = 0;
        boolean innerT = false;     //标识是否内部开启事务
        try {
   
            //根据实体获取表信息
            JRT.DAL.ORM.Common.TableInfo tableInfo = JRT.DAL.ORM.Common.ModelToSqlUtil.GetTypeInfo(entity);
            HashParam hash = new HashParam();
            //获取更新的SQL语句
            String sql = JRT.DAL.ORM.Common.ModelToSqlUtil.GetUpdateSqlByTableInfo(Manager().GetIDbFactory(factoryName), tableInfo, param, updateColName, joiner, operators, hash);
            //写SQL日志
            JRT.Core.Util.LogUtils.WriteSqlLog("执行更新SQL:" + sql);
            //声明式SQL,并且设置参数
            stmt = Manager().Connection().prepareStatement(sql);
            String paraSql = DBParaUtil.SetDBPara(stmt, hash.GetParam());
            row = stmt.executeUpdate();
            JRT.Core.Util.LogUtils.WriteSqlLog("参数:" + paraSql);
            if (row == 1) {
   
                //尝试把数据推入缓存队列
                Manager().TryPushToCache(entity, "U");
            }
            outParam.Code = OutStatus.SUCCESS;
            outParam.Message = "更新数据成功。";
            return row;
        } catch (Exception ex) {
   
            //操作异常,判断如果开启了事务,就回滚事务
            outParam.Code = OutStatus.ERROR;
            if (Manager().Hastransaction) {
   
                if (!Manager().RollTransaction()) {
   
                    outParam.Message = "更新数据失败!" + ex.getCause().getMessage() + ";回滚事务失败。";
                }
            }
            outParam.Message = "更新数据失败!" + ex.getCause().getMessage();
        }
        //操作结束释放资源
        finally {
   
            if (stmt != null) {
   
                stmt.close();
            }
            //如果上层调用未开启事务,则调用结束释放数据库连接
            if (!Manager().Hastransaction) {
   
                Manager().Close();
            }
        }
        return row;
    }


/**
     * 根据条件删除记录
     *
     * @param entity    实体对象
     * @param param     删除条件,有条件按条件删除,没有条件按主键删除
     * @param outParam  输出执行成功或失败的信息
     * @param joiner    多条件逻辑连接符,为空或不给则按则按且连接,给的话长度应该比参数长度少1,如: and
     * @param operators 操作符,为空或不给的话各条件按等来比较,给的话长度应该跟参数长度一样,如: !=
     * @param <T>       类型限定符
     * @return 影响行数
     */
    @Override
    public <T> int Remove(T entity, HashParam param, OutParam outParam, List<String> joiner, List<String> operators) throws Exception {
   
        PreparedStatement stmt = null;
        if (outParam == null) outParam = new OutParam();
        int row = 0;
        try {
   
            //根据实体对象获取表信息
            JRT.DAL.ORM.Common.TableInfo tableInfo = JRT.DAL.ORM.Common.ModelToSqlUtil.GetTypeInfo(entity);
            HashParam hash = new HashParam();
            //获取删除SQL语句
            String sql = JRT.DAL.ORM.Common.ModelToSqlUtil.GetDeleteSqlByTableInfo(Manager().GetIDbFactory(factoryName), tableInfo, param, joiner, operators, hash);
            //写SQL日志
            JRT.Core.Util.LogUtils.WriteSqlLog("执行删除SQL:" + sql);
            //声明式SQL,并设置参数
            stmt = Manager().Connection().prepareStatement(sql);
            String paraSql = DBParaUtil.SetDBPara(stmt, hash.GetParam());
            row = stmt.executeUpdate();
            if (row == 1) {
   
                //尝试把数据推入缓存队列
                Manager().TryPushToCache(entity, "D");
            }
            JRT.Core.Util.LogUtils.WriteSqlLog("参数:" + paraSql);
            outParam.Code = OutStatus.SUCCESS;
            outParam.Message = "删除数据成功。";
            return row;
        } catch (Exception ex) {
   
            //操作异常,判断如果开启了事务,就回滚事务
            outParam.Code = OutStatus.ERROR;
            if (Manager().Hastransaction) {
   
                if (!Manager().RollTransaction()) {
   
                    outParam.Message = "更新数据失败!" + ex.getCause().getMessage() + ";回滚事务失败。";
                }
            }
            outParam.Message = "更新数据失败!" + ex.getCause().getMessage();
        }
        //操作结束释放资源
        finally {
   
            if (stmt != null) {
   
                stmt.close();
            }
            //如果上层调用未开启事务,则调用结束释放数据库连接
            if (!Manager().Hastransaction) {
   
                Manager().Close();
            }
        }
        return row;
    }

/**
     * 通过主键查询数据,带缓存的查询,用来解决关系库的复杂关系数据获取,顶替Cache的$g
     *
     * @param model 实体
     * @param id    主键
     * @param <T>
     * @return
     * @throws Exception
     */
    public <T> T DolerGet(T model, Object id) throws Exception {
   
        T ret = GlobalManager.DolerGet(model, id);
        //命中缓存直接返回
        if (ret != null) {
   
            return ret;
        }
        else {
   
            //调用数据库查询
            ret = GetById(model, id);
            //找到数据,推入缓存
            if(ret!=null) {
   
                ECPDto dto = new ECPDto();
                dto.Cmd = "A";
                dto.Model = ret.getClass().getSimpleName();
                dto.Data = JRT.Core.Util.JsonUtil.Object2Json(ret);
                //通知存入缓存
                GlobalManager.InCache(dto);
            }
        }
        return ret;
    }

/**
     * 通过主键查询数据,主业务数据没找到会按历史切割数量找历史表
     *
     * @param model
     * @param id
     * @param <T>
     * @return
     * @throws Exception
     */
    public <T> T GetById(T model, Object id) throws Exception {
   
        JRT.DAL.ORM.Common.TableInfo tableInfo = JRT.DAL.ORM.Common.ModelToSqlUtil.GetTypeInfo(model);
        List<ParamDto> param = new ArrayList<>();
        ParamDto p = new ParamDto();
        p.Key = tableInfo.ID.Key;
        p.Value = id;
        param.add(p);
        //创建实体集合
        List<T> lstObj = FindAll(model, param, "", -1, -1, "", null, null);
        //结果为空,返回一个新建的对象
        if (lstObj.size() == 0) {
   
            //从历史表取数据
            String HisTableName = tableInfo.TableInfo.HisTableName();
            if (!HisTableName.isEmpty()) {
   
                int cutNum=0;
                //指定了切割列按切割列切割
                if(!tableInfo.TableInfo.CutHisColName().isEmpty())
                {
   
                    cutNum=JRT.Core.Util.Convert.ToInt32(JRT.Core.Util.ReflectUtil.GetObjValue(model,tableInfo.TableInfo.CutHisColName()).toString());
                }
                else
                {
   
                    cutNum=JRT.Core.Util.Convert.ToInt32(tableInfo.ID.Value.toString());
                }
                //除以历史页大小算到数据该放入哪个历史表
                int hisNum = cutNum/HistoryPage;
                //分割所有历史实体
                String[] HisTableNameArr = HisTableName.split("^");
                //存放页小于所有历史表数据就做移动
                if (hisNum < HisTableNameArr.length) {
   
                    String HisModelName = HisTableNameArr[hisNum];
                    //得到历史表的实体
                    Class cHis = GetTypeByName(HisModelName);
                    //克隆得到历史表的对象
                    Object hisData = JRT.Core.Util.JsonUtil.CloneObject(model, cHis);
                    //创建实体集合
                    List<T> lstHisObj = DolerFindAll(0,hisData, param, "", -1, -1, "", null, null);
                    //结果为空,返回一个新建的对象
                    if (lstHisObj.size() > 0) {
   
                        return lstHisObj.get(0);
                    }
                }
            }
            return null;
        }
        //否则返回第一个实体
        else {
   
            return lstObj.get(0);
        }
    }

    /**
     * 把数据安装维护的历史表大小移入历史表
     *
     * @param model 实体数据
     * @param <T>   泛型
     * @return 是否成功
     * @throws Exception
     */
    public <T> boolean MoveToHistory(T model) throws Exception {
   
        JRT.DAL.ORM.Common.TableInfo tableInfo = JRT.DAL.ORM.Common.ModelToSqlUtil.GetTypeInfo(model);
        String HisTableName = tableInfo.TableInfo.HisTableName();
        if (!HisTableName.isEmpty()) {
   
            //分割所有历史实体
            String[] HisTableNameArr = HisTableName.split("^");
            int cutNum=0;
            //指定了切割列按切割列切割
            if(!tableInfo.TableInfo.CutHisColName().isEmpty())
            {
   
                cutNum=JRT.Core.Util.Convert.ToInt32(JRT.Core.Util.ReflectUtil.GetObjValue(model,tableInfo.TableInfo.CutHisColName()).toString());
            }
            else
            {
   
                cutNum=JRT.Core.Util.Convert.ToInt32(tableInfo.ID.Value.toString());
            }
            //除以历史页大小算到数据该放入哪个历史表
            int hisNum = cutNum/HistoryPage;
            //存放页小于所有历史表数据就做移动
            if (hisNum < HisTableNameArr.length) {
   
                String HisModelName = HisTableNameArr[hisNum];
                //得到历史表的实体
                Class cHis = GetTypeByName(HisModelName);
                //克隆得到历史表的对象
                Object newData = JRT.Core.Util.JsonUtil.CloneObject(model, cHis);
                OutParam out = new OutParam();
                //保存历史数据
                int saveRet = Save(newData, out);
                if (saveRet == 1) {
   
                    saveRet = Remove(model, out);
                }
                if (saveRet == 1) {
   
                    return true;
                }
            }
        }
        return false;
    }

连接管理类调整,根据是否带事务在操作执行成功后把数据推入ECP队列供Global管理器往主服务推送分发

package JRT.DAL.ORM.DBUtility;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Savepoint;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;

import JRT.DAL.ORM.DBUtility.C3P0Util;
import com.mchange.v2.c3p0.ComboPooledDataSource;
import JRT.DAL.ORM.DBUtility.IDbFactory;
import JRT.DAL.ORM.Global.ECPDto;

/**
 * 连接和事务管理
 */
public class DBManager {
   
    /**
     * 驱动名称
     */
    private String factoryName="";

    /**
     * 当前对象的驱动
     */
    private IDbFactory factory=null;


    /**
     * 存数据库连接对象
     */
    private Connection connection=null;

    /**
     * 要转入缓存的临时数据
     */
    private List<ECPDto> ToEcpTmp=null;

    /**
     * 尝试把数据推入缓存队列
     * @param obj 对象
     * @param Oper 操作 A:添加  U:更新  D:删除
     * @throws Exception
     */
    public void TryPushToCache(Object obj,String Oper) throws Exception
    {
   
        ECPDto dto=new ECPDto();
        dto.Cmd=Oper;
        dto.Model=obj.getClass().getSimpleName();
        dto.Data=JRT.Core.Util.JsonUtil.Object2Json(obj);
        //有事务就推缓存
        if(Hastransaction=true)
        {
   
            if(ToEcpTmp==null)
            {
   
                ToEcpTmp=new ArrayList<>();
            }
            ToEcpTmp.add(dto);
        }
        else
        {
   
            //没事务的直接推入缓存队列
            JRT.DAL.ORM.Global.ECPManager.InECPToQuen(dto);
        }
    }

    /**
     * 为每个数据库驱动存储工厂
     */
    private static ConcurrentHashMap<String, IDbFactory> hsFact = new ConcurrentHashMap<>();

    /**
     * 为每个数据库驱动存储连接池
     */
    private static ConcurrentHashMap<String, ComboPooledDataSource> hsPoll = new ConcurrentHashMap<>();

    /**
     * 得到驱动对象
     * @param factoryName
     * @return
     */
    public IDbFactory GetIDbFactory(String factoryName)
    {
   
        if(factory==null)
        {
   
            factory=hsFact.get(factoryName);
        }
        return factory;
    }


    /**
     * 尝试初始化连接池
     * @param factoryName
     */
    public static void TryInitConnPool(String factoryName) throws Exception
    {
   
        if(factoryName=="")
        {
   
            factoryName="LisMianDbFactory";
        }
        if(!hsPoll.containsKey(factoryName))
        {
   
            IDbFactory factory=JRT.Core.Context.ObjectContainer.GetTypeObject(factoryName);
            hsPoll.put(factoryName,C3P0Util.GetConnPool(factory));
            if(!hsFact.containsKey(factoryName))
            {
   
                hsFact.put(factoryName,factory);
            }
        }
    }

    /**
     * 构造函数
     * @param factName 驱动配置名称
     * @throws Exception
     */
    public DBManager(String factName) throws Exception
    {
   
        factoryName=factName;
        TryInitConnPool(factoryName);
    }

    /**
     * 存数据库连接对象
     */
    public Connection Connection() throws Exception
    {
   
        if(connection==null)
        {
   
            connection=hsPoll.get(factoryName).getConnection();
        }
        return connection;
    }

    /**
     * 标识是否开启事务
     */
    public boolean Hastransaction = false;

    /**
     * 存储开启多次事务的保存点,每次调用BeginTransaction开启事务是自动创建保存点
     */
    public LinkedList<Savepoint> Transpoints = new LinkedList<Savepoint>();

    /**
     * 获取开启的事务层级
     * @return
     */
    public int GetTransactionLevel()
    {
   
        return this.Transpoints.size();
    }

    /**
     * 释放数据库连接
     * @return true成功释放,false释放失败
     */
    public boolean Close() throws Exception
    {
   
        if(connection!=null)
        {
   
            connection.setAutoCommit(true);
            connection.close();
        }
        connection=null;
        return true;
    }


    /**
     * 此方法开启事务
     * @return  true开启事务成功,false开始事务失败
     */
    public boolean BeginTransaction() throws Exception
    {
   
        try
        {
   
            this.Connection().setAutoCommit(false);
            this.Hastransaction = true;
            Savepoint savepoint = this.Connection().setSavepoint();
            Transpoints.addLast(savepoint);
            return true;
        }

        catch (SQLException sqle)
        {
   
            JRT.Core.Util.LogUtils.WriteExceptionLog("开启事务失败!" + sqle.getMessage(), sqle);
        }
        return false;
    }

    /**
     * 回滚上一层事务
     * @return true回滚事务成功,false回滚事务失败
     */
    public boolean RollTransaction() throws Exception
    {
   
        //删除临时数据
        if(ToEcpTmp!=null) {
   
            ToEcpTmp.clear();
            ToEcpTmp=null;
        }
        //未开启事务时,算回滚事务成功
        if (!this.Hastransaction)
        {
   
            return true;
        }
        try
        {
   
            if (this.Transpoints.size() == 0)
            {
   
                this.Connection().rollback();
                this.Hastransaction = false;
            }
            else
            {
   
                Savepoint point = this.Transpoints.poll();
                this.Connection().rollback(point);
            }
            return true;
        }
        catch (SQLException sqle)
        {
   
            JRT.Core.Util.LogUtils.WriteExceptionLog("事务回滚失败!" + sqle.getMessage(),sqle);
            throw sqle;
        }
        finally
        {
   
            if (!this.Hastransaction)
            {
   
                Close();
            }
        }
    }

    /**
     * 回滚开启的全部事务
     * @return true回滚事务成功,false回滚事务失败
     */
    public boolean RollTransactionAll() throws Exception
    {
   
        //删除临时数据
        if(ToEcpTmp!=null) {
   
            ToEcpTmp.clear();
            ToEcpTmp=null;
        }
        //未开启事务时,算回滚事务成功
        if (!this.Hastransaction)
        {
   
            return true;
        }
        try
        {
   
            this.Connection().rollback();
            this.Hastransaction = false;
            return true;
        }
        catch (SQLException sqle)
        {
   
            JRT.Core.Util.LogUtils.WriteExceptionLog("回滚所有事务层级失败!" + sqle.getMessage(),sqle);
            throw sqle;
        }
        finally
        {
   
            Close();
        }
    }

    /**
     * 提交事务
     * @return true提交事务成功,false提交事务失败
     */
    public boolean CommitTransaction() throws Exception
    {
   
        try
        {
   
            //临时数据推入缓存
            if(ToEcpTmp!=null)
            {
   
                for(ECPDto obj:ToEcpTmp)
                {
   
                    //没事务的直接推入缓存队列
                    JRT.DAL.ORM.Global.ECPManager.InECPToQuen(obj);
                }
            }
            this.Connection().commit();
            this.Hastransaction = false;
            return true;
        }
        catch (SQLException sqle)
        {
   
            JRT.Core.Util.LogUtils.WriteExceptionLog("提交事务失败!" + sqle.getMessage(),sqle);
        }
        finally
        {
   
            //提交事务,不论成功与否,释放数据库连接
            try
            {
   
                Close();
            }
            catch (Exception ex)
            {
   

            }
        }
        return false;
    }

}





增加的配置
在这里插入图片描述

这样就有一个理论上比较靠谱的缓存机制了,业务用SQL查到主列表数据后,调用DolerGet的获得各种周边相关数据来组装给前台返回,就不用每个写业务的人自己考虑写复杂的级联查询数据库受不了,自己缓存数据时候缓存是否可靠,自己不缓存数据时候调用太多数据库交互又慢的问题。DolerGet基本可以满足比较无脑的多维取数据组装的要求。

相关推荐

  1. Http协议:Http缓存

    2023-12-06 20:48:05       33 阅读
  2. JRT实现Cache的驱动

    2023-12-06 20:48:05       70 阅读
  3. http缓存协议详细介绍

    2023-12-06 20:48:05       59 阅读

最近更新

  1. docker php8.1+nginx base 镜像 dockerfile 配置

    2023-12-06 20:48:05       98 阅读
  2. Could not load dynamic library ‘cudart64_100.dll‘

    2023-12-06 20:48:05       106 阅读
  3. 在Django里面运行非项目文件

    2023-12-06 20:48:05       87 阅读
  4. Python语言-面向对象

    2023-12-06 20:48:05       96 阅读

热门阅读

  1. PHP常见错误

    2023-12-06 20:48:05       51 阅读
  2. 微信小程序开发步骤及简单开发案例

    2023-12-06 20:48:05       48 阅读
  3. 每日一题(LeetCode)----字符串--重复的子字符串

    2023-12-06 20:48:05       74 阅读
  4. Pytorch:torch.optim详解

    2023-12-06 20:48:05       51 阅读
  5. Hive_last_value()

    2023-12-06 20:48:05       47 阅读
  6. python协程

    2023-12-06 20:48:05       59 阅读