ElasticSeach(存储日志信息)
Logstash(搬运工)
Kibana 连接ElasticSeach图形化界面查询日志
ELK采集日志的原理:
- 在每个服务器上安装Logstash
- Logstash需要配置固定读取某个日志文件
- Logstash将日志文件格式化为json的格式输出到es中
- 开发者使用Kibana连接到ElasticSeach 查询存储日志内容
为什么将日志存储在ElasticSeach
其底层使用到倒排索引 搜索效率高
为什么需要使用elk+kafka
如果单纯的使用elk的话,服务器节点扩容时需要在每个服务器上安装 Logstash 步骤十分冗余。
Logstash读取本地日志文件,可能会对本地的磁盘io性能会有一定影响。
elk+kafka采集日志的原理:
- springboot项目基于aop的方式拦截系统中日志
- 将该日志投递到 kafka 中,该过程一定要采用异步的形式
- Logstash 订阅 kafka 的主题获取日志消息内容
- 在将日志消息内容输出到es中存放
- 开发者使用Kibana连接到ElasticSeach 查询存储日志内容
logstash
Logstash是一个开源数据收集引擎,具有实时管道功能。
Logstash可以动态地将来自不同数据源的数据统一起来,并将数据标准化到你所选择的目的地
进入 logstash 目录,执行命令安装输入输出插件
bin/logstash-plugin install logstash-input-kafka
bin/logstash-plugin install logstash-output-elasticsearch
添加配置文件:logstash/config/kafka.conf
# 输入
input {
kafka {
bootstrap_servers => "192.168.10.110:9091"
topics => "主题名称"
}
}
# 过滤排除一些不需要写入的日志
filter {
#Only matched data are send to output.
}
# 输出
output {
elasticsearch {
action => "index" #The operation on ES
hosts => "192.168.10.110:9200" #ElasticSearch host, can be array.
index => "索引名称" #The index to write data to.
}
}
启动logstash:./logstash -f …/config/kafka.conf
Aop拦截日志
@Aspect
@Component
public class AopLogAspect {
@Value("${server.port}")
private String serverPort;
@Autowired
private KafkaTemplate<String, Object> kafkaTemplate;
@Pointcut("execution(* com.example.service.*.*(..))")
private void serviceAspect() {}
@Autowired
private LogContainer logContainer;
// 异常通知
@AfterThrowing(pointcut = "serviceAspect()", throwing = "e")
public void serviceAspect(JoinPoint point, Exception e) {
ServletRequestAttributes requestAttributes = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes();
HttpServletRequest request = requestAttributes.getRequest();
JSONObject jsonObject = new JSONObject();
SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");// 设置日期格式
jsonObject.put("request_time", df.format(new Date()));
jsonObject.put("request_url", request.getRequestURL().toString());
jsonObject.put("request_method", request.getMethod());
jsonObject.put("signature", point.getSignature());
jsonObject.put("request_args", Arrays.toString(point.getArgs()));
jsonObject.put("error", e.toString());
// IP地址信息
jsonObject.put("ip_addres", getIpAddr(request) + ":" + serverPort);
JSONObject requestJsonObject = new JSONObject();
requestJsonObject.put("request", jsonObject);
// 将日志信息投递到kafka中
String log = requestJsonObject.toJSONString();
logContainer.put(log);
}
}
使用队列+线程实现异步
@Component
public class LogContainer {
private static BlockingDeque<String> logDeque = new LinkedBlockingDeque<>();
@Autowired
private KafkaTemplate<String, Object> kafkaTemplate;
public LogContainer() {
new LogThreadKafka().start();
}
// 存入日志
public void put(String log) {
logDeque.offer(log);
}
// 只需要创建一次线程
class LogThreadKafka extends Thread {
@Override
public void run() {
while (true) {
String log = logDeque.poll();
if (!StringUtils.isEmpty(log)) {
// 将消息投递kafka中
kafkaTemplate.send("xxx-log", log);
}
}
}
}
}