最近项目需要连接NebulaGraph图数据库获取部分数据,于是查看了一些相关资料,发现可以通过类似数据库连接池NebulaPool方式连接。主要也是以下几个部分:创建连接池,、创建会话、执行查询、解析结果。下面是一个简单的DEMO记录。
组件项目
<!-- SpringBoot依赖包 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-autoconfigure</artifactId>
</dependency>
<!-- Client依赖包 -->
<dependency>
<groupId>com.vesoft</groupId>
<artifactId>client</artifactId>
<version>3.6.1</version>
</dependency>
@Data
@ConfigurationProperties(prefix = "nebula-graph")
public class NebulaGraphProperties {
private Boolean enable = false;
private String[] clusterNodes = null;
private int maxConnectSize = 10;
private String username;
private String password;
}
public class NebulaGraphFactoryBean implements FactoryBean, DisposableBean {
private NebulaGraphProperties nebulaGraphProperties;
private NebulaPool nebulaPool;
public NebulaGraphFactoryBean(NebulaGraphProperties nebulaGraphProperties) {
this.nebulaGraphProperties = nebulaGraphProperties;
String[] clusterNodes = nebulaGraphProperties.getClusterNodes();
if (null == clusterNodes || clusterNodes.length == 0) {
return;
}
List<HostAddress> hostAddresses = new ArrayList<>();
for (int i = 0, len = clusterNodes.length; i < len; i++) {
String clusterNode = clusterNodes[i];
if (!clusterNode.contains(":")) {
continue;
}
String[] ipAndPort = clusterNode.split(":");
if (ipAndPort.length != 2 || !ipAndPort[1].matches("\\d+")) {
throw new RuntimeException("Invalid Nebula Graph Node " + clusterNode);
}
hostAddresses.add(new HostAddress(ipAndPort[0], Integer.parseInt(ipAndPort[1])));
}
NebulaPoolConfig nebulaPoolConfig = new NebulaPoolConfig();
nebulaPoolConfig.setMaxConnSize(nebulaGraphProperties.getMaxConnectSize());
nebulaPool = new NebulaPool();
try {
nebulaPool.init(hostAddresses, nebulaPoolConfig);
} catch (UnknownHostException e) {
throw new RuntimeException("Unknown Nebula Graph Host");
}
}
@Override
public Object getObject() {
try {
return nebulaPool.getSession(nebulaGraphProperties.getUsername(), nebulaGraphProperties.getPassword(), false);
} catch (NotValidConnectionException | IOErrorException | AuthFailedException | ClientServerIncompatibleException e) {
throw new RuntimeException("Nebula graph session exception", e);
}
}
@Override
public Class<?> getObjectType() {
return Session.class;
}
public Session getSession() {
return (Session) getObject();
}
@Override
public void destroy() throws Exception {
nebulaPool.close();
}
}
@EnableConfigurationProperties({
NebulaGraphProperties.class })
@Configuration
public class NebulaGraphAutoConfiguration {
@ConditionalOnProperty(name = "nebula-graph.enable", havingValue = "true", matchIfMissing = false)
@Bean
public NebulaGraphFactoryBean nebulaGraphFactoryBean(NebulaGraphProperties nebulaGraphProperties) {
return new NebulaGraphFactoryBean(nebulaGraphProperties);
}
}
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
com.component.nebula.graph.config.NebulaGraphAutoConfiguration
业务项目
<!--ComponentNebulaGraph依赖包-->
<dependency>
<groupId>com.component</groupId>
<artifactId>component-nebula-graph</artifactId>
<version>1.0.0-SNAPSHOT</version>
</dependency>
nebula-graph:
enable: false
cluster-nodes:
- 192.168.0.1:9559
- 192.168.0.1:9669
max-connect-size: 10
username: root
password: 123456
@Slf4j
@Service("nebulaGraphService")
public class NebulaGraphServiceImpl implements NebulaGraphService {
private static final String SPACE_QL = "USE %s";
@Autowired
private NebulaGraphFactoryBean nebulaGraphFactoryBean;
public NGResultV1DTO execute(String space, String ngql, Map<String, Object> parameterMap) throws IOErrorException {
Session session = nebulaGraphFactoryBean.getSession();
NGResultV1DTO ngResultV1DTO = JsonUtils.json(session.executeJson(String.format(SPACE_QL, space)), NGResultV1DTO.class);
if (!ngResultV1DTO.isSuccess()) {
return ngResultV1DTO;
}
String result = null == parameterMap ? session.executeJson(ngql) : session.executeJsonWithParameter(ngql, parameterMap);
log.info("execute result {}", result);
ngResultV1DTO = JsonUtils.json(result, NGResultV1DTO.class);
return ngResultV1DTO;
}
@Override
public <T> ResultDTO<T> executeOne(String space, String ngql, Map<String, Object> parameterMap, Class<T> clazz) throws IOErrorException {
return buildResultDTO(execute(space, ngql, parameterMap), clazz, true);
}
@Override
public <T> ResultDTO<List<T>> execute(String space, String ngql, Map<String, Object> parameterMap, Class<T> clazz) throws IOErrorException {
return buildResultDTO(execute(space, ngql, parameterMap), clazz, false);
}
private <T> ResultDTO buildResultDTO(NGResultV1DTO ngResultV1DTO, Class<T> clazz, boolean isSingleResult) throws IOErrorException {
if (!ngResultV1DTO.isSuccess()) {
NGResultV1DTO.Error error = ngResultV1DTO.getErrors().get(0);
return ResultDTO.fail(error.getCode(), error.getMessage());
}
List<T> resultList = parse(ngResultV1DTO, clazz);
return ResultDTO.success(!ObjectUtil.isEmpty(resultList) && isSingleResult ? resultList.get(0) : resultList);
}
private <T> List<T> parse(NGResultV1DTO ngResultV1DTO, Class<T> clazz) {
List<NGResultV1DTO.Result> results = ngResultV1DTO.getResults();
if (null == results || results.isEmpty()) {
return null;
}
NGResultV1DTO.Result result = results.get(0);
List<NGResultV1DTO.Data> datas = result.getDatas();
if (null == datas || datas.isEmpty()) {
return null;
}
boolean needColumns = false;
List<String> columns = result.getColumns();
List<T> targetList = new ArrayList<>();
for (int i = 0, len = datas.size(); i < len; i++) {
NGResultV1DTO.Data data = datas.get(i);
List<?> rows = data.getRows();
if (null == rows || rows.isEmpty()) {
continue;
}
if (i == 0) {
List<?> metas = data.getMetas();
if (null == metas || null == metas.get(0)) {
needColumns = true;
}
}
Object row = rows.get(0);
Map<String, Object> dataMap = new HashMap<>();
if (needColumns) {
Object[] rowArray = (Object[]) row;
for (int j = 0, jLen = rowArray.length; j < jLen; j++) {
dataMap.put(columns.get(j), rowArray[j]);
}
} else {
((Map<String, Object>) row).forEach((key, value) -> {
if (key.contains(".")) {
String[] keyArray = key.split(".");
dataMap.put(keyArray[keyArray.length - 1], value);
} else {
dataMap.put(key, value);
}
});
}
targetList.add(ReflectUtils.convertMapToObject(dataMap, clazz));
}
return targetList;
}
}
总体来说,跟普通的数据库连接还是很相似的,上手也是比较容易的。