1.RocketMqConfig
@Configuration
public class RocketMqConfig {
@Value("${expenseCenter.nameServer:10.172.32.21:1092;10.172.32.18:1075;10.172.32.23:1092}")
private String nameServer;
@Value("${spring.application.name:themis-sharing-platform-schedule}")
private String applicationName;
@Value("${rocketmq.producer.access-key:rocketmq2}")
private String accessKey;
@Value("${rocketmq.producer.secret-key:12345678}")
private String secretKey;
@Bean("fineRocketMqTemplate")
public RocketMQTemplate getFineRocktMqTemplate() {
DefaultMQProducer defaultMQProducer = new DefaultMQProducer(applicationName, getAclRPCHook());
RocketMQTemplate rocketMQTemplate = new RocketMQTemplate();
defaultMQProducer.setNamesrvAddr(nameServer);
rocketMQTemplate.setProducer(defaultMQProducer);
return rocketMQTemplate;
}
public RPCHook getAclRPCHook() {
return new AclClientRPCHook(new SessionCredentials(accessKey, secretKey));
}
}
2.发送消息
@Service
@Slf4j
public class DelayIntelligentCallServiceImpl implements DelayIntelligentCallService {
@Autowired
private RocketMQTemplate fineRocketMqTemplate;
@Value("${delay.intelligent.call.topic:delay_intelligent_call_topic}")
private String delayCallTopic;
@Override
public void delayIntelligentCall(String reminderId) {
Message<String> message = MessageBuilder.withPayload(reminderId).build();
try {
fineRocketMqTemplate.syncSend(delayCallTopic, message,2000,17);
log.info("delay call topic message :{}",message);
}catch (Exception e){
log.info("delay call failed{}", ExceptionUtils.getStackTrace(e));
}
}
}
3.接收消息
@Slf4j
@Component
@RocketMQMessageListener(consumerGroup = "${delay.intelligent.call.group:themis_delay_intelligent_call_group}", topic = "${delay.intelligent.call.topic:themis_delay_intelligent_call_topic}",
accessKey = "${rocketmq.consumer.access-key}", secretKey = "${rocketmq.consumer.secret-key}")
public class DelayIntelligentCallConsumer implements RocketMQListener<String> {
@Resource
ThemisReminderDetailService themisReminderDetailService;
@Resource
private ThemisReminderService themisReminderService;
@Override
public void onMessage(String msg) {
if (ObjectUtils.isEmpty(msg)) {
return;
}
try {
ThemisReminder themisReminder = themisReminderService.queryReminderByReminderId(msg);
if (ObjectUtils.isEmpty(themisReminder) || ObjectUtils.isEmpty(themisReminder.getStatus())) {
return;
}
if (Objects.equals(ReminderStateEnum.TIME_OUT_TO_BE_REPLY.getCode(), themisReminder.getStatus())) {
themisReminderDetailService.firstCall(themisReminder.getComplainId());
log.info("调用智能外呼,成功complainId:{}", themisReminder.getComplainId());
}
} catch (Exception ex) {
log.info("调用智能外呼,reminderId:{} 发生异常 :{}", msg, ExceptionUtils.getStackTrace(ex));
}
}
}