一、需求背景
在实际项目上,我们需要查询多张表,然后组装数据返回,如果串行查询的话,接口响应时间会很长,根据需要,可以考虑并行查询,提高接口响应速度。
二、实现方案
使用CompletableFuture多线程任务编排工具,并发查询数据,最后去汇总查询数据结果返回。
三、编码实现
3.1、定义接口
package com.saferycom.completableFutrue;
/**
* @author LH
*/
public interface UserServiceA {
/**
* 统计粉丝数
* @param userId
* @return
*/
Long countFansByUserId(Long userId);
/**
* 统计消息数
* @param userId
* @return
*/
Long countMsgByUserId(Long userId);
/**
* 统计收藏数
* @param userId
* @return
*/
Long countCollectByUserId(Long userId);
/**
* 统计关注数
* @param userId
* @return
*/
Long countFollowByUserId(Long userId);
/**
* 统计红包数量
* @param userId
* @return
*/
Long countRedBagByUserId(Long userId);
/**
* 统计卡签数
* @param userId
* @return
*/
Long countCouponByUserId(Long userId);
/**
* 统计发布文章数
* @param userId
* @return
*/
Long countArticleByUserId(Long userId);
/**
* 统计点赞数
* @param userId
* @return
*/
Long countLikeByUserId(Long userId);
}
3.2、定义接口实现类
每个方法sleep大约4到10秒,用来模拟查询数据库耗时操作
package com.saferycom.completableFutrue;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import java.util.concurrent.TimeUnit;
/**
* @author LH
*/
@Service
@Slf4j
public class UserServiceAImpl implements UserServiceA {
@Override
public Long countFansByUserId(Long userId) {
log.info("UserService获取FansCount的线程:" + Thread.currentThread().getName());
try {
TimeUnit.SECONDS.sleep(10);
log.info("获取FansCount===睡眠:" + 10 + "s");
} catch (InterruptedException e) {
e.printStackTrace();
}
return 5201L;
}
@Override
public Long countMsgByUserId(Long userId) {
log.info("UserService获取MsgCount的线程:" + Thread.currentThread().getName());
try {
TimeUnit.SECONDS.sleep(10);
log.info("获取MsgCount===睡眠:" + 10 + "s");
} catch (InterruptedException e) {
e.printStackTrace();
}
return 6182L;
}
@Override
public Long countCollectByUserId(Long userId) {
log.info("UserService获取CollectCount的线程:" + Thread.currentThread().getName());
try {
TimeUnit.SECONDS.sleep(10);
log.info("获取CollectCount==睡眠:" + 10 + "s");
} catch (InterruptedException e) {
e.printStackTrace();
}
return 1264L;
}
@Override
public Long countFollowByUserId(Long userId) {
log.info("UserService获取FollowCount的线程:" + Thread.currentThread().getName());
try {
TimeUnit.SECONDS.sleep(10);
log.info("获取FollowCount===睡眠:" + 10 + "s");
} catch (InterruptedException e) {
e.printStackTrace();
}
return 142L;
}
@Override
public Long countRedBagByUserId(Long userId) {
log.info("UserService获取RedBagCount的线程:" + Thread.currentThread().getName());
try {
TimeUnit.SECONDS.sleep(4);
log.info("获取RedBagCount===睡眠:" + 4 + "s");
} catch (InterruptedException e) {
e.printStackTrace();
}
return 563L;
}
@Override
public Long countCouponByUserId(Long userId) {
log.info("UserService获取CouponCount的线程:" + Thread.currentThread().getName());
try {
TimeUnit.SECONDS.sleep(8);
log.info("获取CouponCount===睡眠:" + 8 + "s");
} catch (InterruptedException e) {
e.printStackTrace();
}
return 9887L;
}
@Override
public Long countArticleByUserId(Long userId) {
log.info("UserService获取ArticleCount的线程:" + Thread.currentThread().getName());
try {
TimeUnit.SECONDS.sleep(8);
log.info("获取ArticleCount===睡眠:" + 8 + "s");
} catch (InterruptedException e) {
e.printStackTrace();
}
return 135L;
}
@Override
public Long countLikeByUserId(Long userId) {
log.info("UserService获取likeCount的线程:" + Thread.currentThread().getName());
try {
TimeUnit.SECONDS.sleep(8);
log.info("获取likeCount===睡眠:" + 8 + "s");
} catch (InterruptedException e) {
e.printStackTrace();
}
return 46L;
}
}
3.3、定义返回类
package com.saferycom.completableFutrue;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
* @author LH
*/
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class UserInfoDto {
/**
* 用户ID
*/
private Long userId ;
/**
* 发布文章数
*/
private Long articleCount ;
/**
* 点赞数
*/
private Long likeCount ;
/**
* 粉丝数
*/
private Long fansCount ;
/**
* 消息数
*/
private Long msgCount ;
/**
* 收藏数
*/
private Long collectCount ;
/**
* 关注数
*/
private Long followCount ;
/**
* 红包数
*/
private Long redBagCount ;
/**
* 卡券数
*/
private Long couponCount ;
}
3.4、整合CompletableFuture,聚合查询,汇总
package com.saferycom.completableFutrue;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.concurrent.*;
/**
* 一个页面有多达10个左右的一个用户行为数据,
* 比如:点赞数,发布文章数,点赞数,消息数,关注数,收藏数,粉丝数,卡券数,红包数等等!
*
* @author LH
*/
@Slf4j
@Component
public class MyFutureTask {
@Resource
private UserServiceA userService;
/**
* 核心线程 8 最大线程 20 保活时间30s 存储队列 10 有守护线程 拒绝策略:将超负荷任务回退到调用者
*/
private static ExecutorService executor = new ThreadPoolExecutor(20, 50, 30L, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(100),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.CallerRunsPolicy());
public UserInfoDto getUserAllInfo(final Long userId) {
System.out.println("MyFutureTask的线程:" + Thread.currentThread());
try {
// 发布文章数
CompletableFuture<Long> articleCountFT = CompletableFuture.supplyAsync(() -> userService.countArticleByUserId(userId), executor);
// 点赞数
CompletableFuture<Long> LikeCountFT = CompletableFuture.supplyAsync(() -> userService.countLikeByUserId(userId), executor);
// 粉丝数
CompletableFuture<Long> fansCountFT = CompletableFuture.supplyAsync(() -> userService.countFansByUserId(userId), executor);
//消息数
CompletableFuture<Long> msgCountFT = CompletableFuture.supplyAsync(() -> userService.countMsgByUserId(userId), executor);
//收藏数
CompletableFuture<Long> collectCountFT = CompletableFuture.supplyAsync(() -> userService.countCollectByUserId(userId), executor);
//关注数
CompletableFuture<Long> followCountFT = CompletableFuture.supplyAsync(() -> userService.countFollowByUserId(userId), executor);
//红包数
CompletableFuture<Long> redBagCountFT = CompletableFuture.supplyAsync(() -> userService.countRedBagByUserId(userId), executor);
//卡券数
CompletableFuture<Long> couponCountFT = CompletableFuture.supplyAsync(() -> userService.countCouponByUserId(userId), executor);
// 聚合所有的查询 等所有的查询都查询完
CompletableFuture.allOf(articleCountFT, LikeCountFT, fansCountFT, msgCountFT, collectCountFT, followCountFT, redBagCountFT, couponCountFT)
.get(20, TimeUnit.SECONDS);
UserInfoDto userInfoDto = UserInfoDto.builder()
.articleCount(articleCountFT.get())
.likeCount(LikeCountFT.get())
.fansCount(fansCountFT.get())
.msgCount(msgCountFT.get())
.collectCount(collectCountFT.get())
.followCount(followCountFT.get())
.redBagCount(redBagCountFT.get())
.couponCount(couponCountFT.get())
.build();
return userInfoDto;
} catch (Exception e) {
log.error("get user behavior data error", e);
return new UserInfoDto();
}
}
}
3.5、controller类测试
package com.saferycom.completableFutrue;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
/**
* @author LH
* @ClassName: UserControllerA
* @Description: 用户控制器
*/
@RestController
@RequestMapping("/api")
@Slf4j
public class UserControllerA {
@Resource
private MyFutureTask myFutureTask;
/**
* java并发查询数据实战 提高接口响应速度
*
* @param userId
* @return
*/
@GetMapping("/getData")
public UserInfoDto getUserData(Long userId) {
log.info("UserController的线程:{}", Thread.currentThread());
long begin = System.currentTimeMillis();
UserInfoDto userInfoDto = myFutureTask.getUserAllInfo(userId);
long end = System.currentTimeMillis();
log.info("===============总耗时:{}", (end - begin) / 1000.0000 + "秒");
return userInfoDto;
}
}
3.6、启动服务,使用ApiPost请求接口测试
如控制台日志所示,接口总耗时10秒左右,实现类中每个方法sleep大约4到10秒,是并行执行效果 ,这样可以打打提升接口响应速度。
四、总结
如上所示,CompletabableFuture可以实现多线程任务编排,最后通过阻塞汇总各线程结果,当然以上只是其用法之一;它还支持更加丰富的异步任务编排操作,如A、B、C三个线程,A、B并行执行,A、B执行完再执行C等等。
如有需要自行研究了解。