ELK+kafka日志采集

ElasticSeach(存储日志信息)
Logstash(搬运工)
Kibana 连接ElasticSeach图形化界面查询日志

ELK采集日志的原理:

  1. 在每个服务器上安装Logstash
  2. Logstash需要配置固定读取某个日志文件
  3. Logstash将日志文件格式化为json的格式输出到es中
  4. 开发者使用Kibana连接到ElasticSeach 查询存储日志内容

为什么将日志存储在ElasticSeach
其底层使用到倒排索引 搜索效率高

为什么需要使用elk+kafka
如果单纯的使用elk的话,服务器节点扩容时需要在每个服务器上安装 Logstash 步骤十分冗余。
Logstash读取本地日志文件,可能会对本地的磁盘io性能会有一定影响。

elk+kafka采集日志的原理:

  1. springboot项目基于aop的方式拦截系统中日志
  2. 将该日志投递到 kafka 中,该过程一定要采用异步的形式
  3. Logstash 订阅 kafka 的主题获取日志消息内容
  4. 在将日志消息内容输出到es中存放
  5. 开发者使用Kibana连接到ElasticSeach 查询存储日志内容

logstash

Logstash是一个开源数据收集引擎,具有实时管道功能。
Logstash可以动态地将来自不同数据源的数据统一起来,并将数据标准化到你所选择的目的地

进入 logstash 目录,执行命令安装输入输出插件

bin/logstash-plugin install logstash-input-kafka
bin/logstash-plugin install logstash-output-elasticsearch

添加配置文件:logstash/config/kafka.conf

# 输入
input {
  kafka {
    bootstrap_servers => "192.168.10.110:9091"
    topics => "主题名称"
 }
}
# 过滤排除一些不需要写入的日志
filter {
  #Only matched data are send to output.
}
# 输出
output {
   elasticsearch {
    action => "index"          #The operation on ES
    hosts  => "192.168.10.110:9200"   #ElasticSearch host, can be array.
    index  => "索引名称"         #The index to write data to.
  }
}

启动logstash:./logstash -f …/config/kafka.conf

Aop拦截日志

@Aspect
@Component
public class AopLogAspect {
    @Value("${server.port}")
    private String serverPort;
    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;
    @Pointcut("execution(* com.example.service.*.*(..))")
    private void serviceAspect() {}
    @Autowired
    private LogContainer logContainer;
    // 异常通知
    @AfterThrowing(pointcut = "serviceAspect()", throwing = "e")
    public void serviceAspect(JoinPoint point, Exception e) {
        ServletRequestAttributes requestAttributes = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes();
        HttpServletRequest request = requestAttributes.getRequest();
        JSONObject jsonObject = new JSONObject();
        SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");// 设置日期格式
        jsonObject.put("request_time", df.format(new Date()));
        jsonObject.put("request_url", request.getRequestURL().toString());
        jsonObject.put("request_method", request.getMethod());
        jsonObject.put("signature", point.getSignature());
        jsonObject.put("request_args", Arrays.toString(point.getArgs()));
        jsonObject.put("error", e.toString());
        // IP地址信息
        jsonObject.put("ip_addres", getIpAddr(request) + ":" + serverPort);
        JSONObject requestJsonObject = new JSONObject();
        requestJsonObject.put("request", jsonObject);
        // 将日志信息投递到kafka中
        String log = requestJsonObject.toJSONString();
        logContainer.put(log);
    }
}
使用队列+线程实现异步
@Component
public class LogContainer {
    private static BlockingDeque<String> logDeque = new LinkedBlockingDeque<>();
    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;
    public LogContainer() {
        new LogThreadKafka().start();
    }
    // 存入日志
    public void put(String log) {
        logDeque.offer(log);
    }
    // 只需要创建一次线程
    class LogThreadKafka extends Thread {
        @Override
        public void run() {
            while (true) {
                String log = logDeque.poll();
                if (!StringUtils.isEmpty(log)) {
                    // 将消息投递kafka中
                    kafkaTemplate.send("xxx-log", log);
                }
            }
        }
    }
}

相关推荐

  1. 日志数据采集存储

    2024-05-10 06:48:03       15 阅读
  2. ELK+kafka日志采集

    2024-05-10 06:48:03       8 阅读
  3. Flume采集日志存储到HDFS

    2024-05-10 06:48:03       41 阅读

最近更新

  1. TCP协议是安全的吗?

    2024-05-10 06:48:03       17 阅读
  2. 阿里云服务器执行yum,一直下载docker-ce-stable失败

    2024-05-10 06:48:03       16 阅读
  3. 【Python教程】压缩PDF文件大小

    2024-05-10 06:48:03       15 阅读
  4. 通过文章id递归查询所有评论(xml)

    2024-05-10 06:48:03       18 阅读

热门阅读

  1. MySQL变量的定义与应用

    2024-05-10 06:48:03       8 阅读
  2. Node.js身份证实名认证接口、身份证识别API

    2024-05-10 06:48:03       9 阅读
  3. 第七十五章 锁定 Apache (UNIX® Linux macOS)

    2024-05-10 06:48:03       9 阅读
  4. Node.js发票ocr识别、发票查验接口

    2024-05-10 06:48:03       7 阅读
  5. MySQL中的字符集陷阱:为何避免使用UTF-8

    2024-05-10 06:48:03       10 阅读
  6. 【Web漏洞指南】服务器端 XSS(动态 PDF)

    2024-05-10 06:48:03       8 阅读
  7. CNOCR和PaddleOCR提取pdf中文字-个人记录

    2024-05-10 06:48:03       13 阅读