1、Elasticsearch 使用的版本是 8.5.0
索引和mapping构建:
PUT image-index
{
"mappings": {
"properties": {
"mydatavector": {
"type": "dense_vector",
"dims": 3,
"index": true,
"similarity": "dot_product"
},
"title": {
"type": "text"
}
}
}
}
2、数据入库:使用 Python 的 elasticsearch 包。如果该包的版本为 7.11,可以同时兼容 6.x、7.x、8.x 版本的 Elasticsearch。
from sentence_transformers import SentenceTransformer
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk
# Sentence-embedding model used to vectorize titles; the path string is a
# placeholder ("your local model path") that must be replaced.
encoder = SentenceTransformer('你的模型本地路径')
# Elasticsearch client; the URL is a placeholder
# (username:password@IP:PORT) that must be replaced.
client2 = Elasticsearch(['http://用户名:密码@IP:PORT'])
def batch_write(data, index_name="index_name"):
    """Bulk-index a batch of documents into Elasticsearch.

    Args:
        data: Iterable of dicts. Each dict must contain an "id" key, which
            is used as the document ``_id``; the whole dict is stored as
            the document ``_source``.
        index_name: Target index. The default is a placeholder; replace it
            with (or pass) your real index name.

    Exceptions raised by the bulk call are printed and not re-raised, so a
    failed batch does not stop the caller.
    """
    actions = [
        {
            "_index": index_name,
            "_source": d,
            "_id": d["id"],
        }
        for d in data
    ]
    try:
        bulk(client2, actions)
    except Exception as e:
        # Best-effort write: report the cause instead of discarding it.
        print(f"batch write error: {e}")


def es_data_mapping(json_data):
    """Attach the embedding of the record's title under "mydatavector".

    The title is stripped and has its spaces removed before being encoded.

    Args:
        json_data: Dict with a "title" string. It is modified in place.

    Returns:
        The same dict, with the "mydatavector" key added.
    """
    question = json_data["title"].strip().replace(" ", "")
    # normalize_embeddings=True pairs with "similarity": "dot_product" in the
    # index mapping, which requires unit-length vectors.
    doc_vector = encoder.encode(
        [question],
        convert_to_tensor=False,
        normalize_embeddings=True,
    ).tolist()[0]  # one input sentence -> take the single resulting vector
    json_data["mydatavector"] = doc_vector
    return json_data
# Maximum number of documents sent to Elasticsearch per bulk request.
batch_write_num = 200


def write_data_2_es():
    """Embed the sample records and write them to Elasticsearch in batches.

    Each record is passed through es_data_mapping() and buffered; the buffer
    is flushed with batch_write() once it holds batch_write_num documents,
    and any remainder is flushed after the loop.
    """
    # Sample input; replace with the real records to index.
    data_list = [{"id": "1", "title": "大家好"}]
    write_list = []
    for each in data_list:
        write_list.append(es_data_mapping(each))
        # Use >= so a batch holds exactly batch_write_num documents
        # (a strict > would only flush at batch_write_num + 1).
        if len(write_list) >= batch_write_num:
            batch_write(write_list)
            write_list.clear()
    # Flush whatever is left over from the last partial batch.
    if write_list:
        batch_write(write_list)


if __name__ == '__main__':
    write_data_2_es()
3、查询检索:
POST http://IP:PORT/INDEX_NAME/_search
{
  "_source": ["mydatavector", "title"],
  "min_score": 0.5,
  "knn": {
    "field": "mydatavector",
    "query_vector": [0, 0, 0],
    "k": 3,
    "num_candidates": 100,
    "filter": {
      "bool": {
        "must": [
          {
            "terms": {
              "title": ["狗头"]
            }
          }
        ]
      }
    }
  }
}