学习参考文档:博客园-不吃紫菜
正向索引、倒排索引
Kibana:数据可视化
Elasticsearch:存储、计算、搜索数据。
Elasticsearch介绍
Elasticsearch、Kibana、Logstash、Beats合称Elastic Stack(ELK)。应用于日志数据分析、实时监控。
Elasticsearch基于Lucene,Lucene是基于Java语言的搜索引擎类库。
Elasticsearch是面向文档存储的。
在ES中,索引(index)是基本的数据存储单元。
倒排索引
原文本表(Stored Fields):
文本ID | 文本 |
---|---|
1 | I like piano and plane |
2 | I play piano |
3 | I am your best friend |
倒排索引表(Inverted Index):
Term Dictionary | Posting list |
---|---|
am | 3 |
and | 1 |
best | 3 |
friend | 3 |
I | 1,2,3 |
like | 1 |
piano | 1,2 |
plane | 1 |
play | 2 |
your | 3 |
有了倒排索引表后,在所有文本中查找到包含目标词的文本的时间复杂度是\(O(\log N)\)。倒排索引表也不一定要存储在磁盘中,一些Term具有相同的前缀,因此可以用树结构轻量化表示所有的Term Dictionary然后存储在内存中,这就是Term Index。
Term Index:
p | i | ano |
la | ne | |
y |
一个Segment=Inverted Index+Term Index+Stored Fields+Doc Values。Segment的更新是通过覆盖的方式而不是修改的方式。
多个基础Segment就可以组成一个基础搜索库Lucene,ElasticSearch就是基于Lucene。
在ES中,一个Shard就是一个Lucene库,多个Shard可以分布式运行在不同Node上,因此也诞生了Shard副本概念(和Kafka的分区相似)。大规模分布式ES中,有三类Node分工:1. Master Node(集群管理);2. Data Node(存储数据);3. Coordinate Node(处理请求)。
ES的搜索流程:
Query Phase:客户端请求 -> Coordinate Node -> 多个Data Node -> 并发搜索每个Segment -> 返回文档ID和排序信息(?)
Fetch Phase:Coordinate Node 根据文档ID -> 多个Data Node -> 检索Stored Fields -> 返回目标文档
安装配置Elasticsearch
Elasticsearch安装
- 下载安装包:
wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-8.14.3-linux-x86_64.tar.gz
- 在新建用户zt的.profile文件中追加下述内容以配置环境变量。
vim /home/zt/.profile
export ES_JAVA_HOME=/home/zt/Elasticsearch814/elasticsearch-8.14.3/jdk/
export ES_HOME=/home/zt/Elasticsearch814/elasticsearch-8.14.3
执行source .profile
- 修改配置
vim $ES_HOME/config/elasticsearch.yml
修改:
# 使本地电脑能够访问到远程服务器
network.host: 0.0.0.0
# 指定为单节点,绕过引导检查,设置为开发者模式
discovery.type: single-node
# 开启security安全认证
xpack.security.enabled: true
vim $ES_HOME/config/jvm.options
修改下述内容:我设成了1G
-Xms1g
-Xmx1g
为Elasticsearch配置用户密码
在保持ES启动的情况下使用下述所有命令。
# 查询ES中现有的用户
$ES_HOME/bin/elasticsearch-users list
# 新建ES用户zt并赋予超级用户权限,然后设置密码
$ES_HOME/bin/elasticsearch-users useradd zt -r superuser
但是通过上述命令自建的用户zt会导致连接Kibana失败。所以还是通过下述命令为系统用户apm_system
、kibana_system
、kibana
、logstash_system
、beats_system
、remote_monitoring_user
、elastic
生成密码。但是这些密码似乎只会在console中被打印出一次,所以应该妥善保存。
$ES_HOME/bin/elasticsearch-setup-passwords auto
Elasticsearch启动和停止
后台启动:
$ES_HOME/bin/elasticsearch -d
停止:
ps -ef | grep elasticsearch | grep -v grep | awk '{print $2}' | xargs kill
之后浏览器访问:http://公网IP:9200
即可看见结果。Chrome可以安装multi
elasticsearch heads插件美化可视化结果。
分词器安装
这里下载ICU分词器。
$ES_HOME/bin/elasticsearch-plugin install analysis-icu
再下载一个分词器:
wget https://release.infinilabs.com/analysis-ik/stable/elasticsearch-analysis-ik-8.14.3.zip
unzip elasticsearch-analysis-ik-8.14.3.zip -d $ES_HOME/plugins/elasticsearch-analysis-ik-8.14.3
安装完毕后重启ES。
在浏览器打开的网页的的符合查询栏目,对http://公网IP:9200/_analyze发送POST请求,请求体如下。点击“提交请求”可以看到分词结果。
{
"analyzer": "icu_analyzer",
"text": "你好,我的Elasticsearch"
}
Kibana安装和配置
wget https://artifacts.elastic.co/downloads/kibana/kibana-8.14.3-linux-x86_64.tar.gz
修改kibana.yml
vim /home/zt/Kibana814/kibana-8.14.3/config/kibana.yml
修改下述内容:
server.host: "0.0.0.0"
i18n.locale: "zh-CN"
elasticsearch.hosts: ["http://192.168.0.45:9200"]
elasticsearch.username: "kibana_system"
elasticsearch.password: "kibana_system的密码"
配置后上述并启动kibana后,直接使用自建用户zt及其密码就可以登录Kibana了。
启动停止
nohup /home/zt/Kibana814/kibana-8.14.3/bin/kibana &
启动过程比较慢
之后直接在浏览器访问http://公网IP:5601
,不必访问http://公网IP:9200
了。
关闭kibana:
ps -ef | grep kibana | grep -v grep | awk '{print $2}' | xargs kill
Kibana用法
打开“开发工具”
# 创建索引myindex
PUT /myindex
# 获取索引myindex
GET /myindex
# 在索引myindex中创建映射
POST /myindex/_mapping
{
"properties": {
"content": {
"type": "text",
"analyzer": "ik_max_word",
"search_analyzer": "ik_smart"
}
}
}
# 往索引myindex添加数据
POST /myindex/_create/0
{
"content": "你好,Elasticsearch。你好,西伯利亚蝴蝶"
}
POST /myindex/_create/1
{
"content": "你好,Elasticsearch。你好,仙女座星系"
}
# 在myindex索引中进行搜索
GET /myindex/_search
{
"query": {"match": {"content": "你好"}}
}
# 删除索引myindex
DELETE /myindex
# 修改副本数量
PUT /myindex/_settings
{
"index": {
"number_of_replicas: 2"
}
}
# 新增字段
PUT /myindex/_mapping
{
"properties": {
"grade": {
"type": "integer"
}
}
}
ES常用命令
创建索引
创建一个名为employees的索引,shard数为1,shard副本数为0。
HTTP请求
PUT: http://公网IP:9200/employees
Headers: Content-Type:application/json
Authorization -> Basic Auth -> 输入用户名和密码
Body:
{
"settings": {
"number_of_shards": 1,
"number_of_replicas": 0
},
"mappings": {
"properties": {
"name": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword" // 指定关键字类型为keyword,表示不对之进行分词
}
}
},
"gender": {
"type": "keyword" // 精确匹配,不参与分词
},
"birth_date": {
"type": "date",
"format": "yyyy-MM-dd"
},
"CV": {
"type": "text",
"analyzer": "ik_max_word", // 设置分词器
"search_analyzer": "ik_smart"
}
}
}
}
Kibana
DELETE /employees
PUT /employees
{
"settings": {
"number_of_shards": 1,
"number_of_replicas": 0
},
"mappings": {
"properties": {
"name": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword" // 指定关键字类型为keyword,表示不对之进行分词
}
}
},
"gender": {
"type": "keyword" // 精确匹配,不参与分词
},
"birth_date": {
"type": "date",
"format": "yyyy-MM-dd"
},
"CV": {
"type": "text",
"analyzer": "ik_max_word", // 设置分词器
"search_analyzer": "ik_smart"
}
}
}
}
Python
elasticsearch==8.17.0
from elasticsearch import Elasticsearch
# 连接配置
es = Elasticsearch(
hosts=["http://公网IP:9200"],
basic_auth=("用户名", "密码"),
request_timeout=30
)
print(es.info()) # 测试连接
index_name = "employees"
index_body = {
"settings": {
"number_of_shards": 1,
"number_of_replicas": 0
},
"mappings": {
"properties": {
"name": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword"
}
}
},
"gender": {
"type": "keyword"
},
"birth_date": {
"type": "date",
"format": "yyyy-MM-dd"
},
"CV": {
"type": "text",
"analyzer": "ik_max_word",
"search_analyzer": "ik_smart"
}
}
}
}
try:
if es.indices.exists(index=index_name):
es.indices.delete(index=index_name)
print(f"索引 {index_name} 删除成功")
# 创建新索引(携带完整配置)
response = es.indices.create(index=index_name, body=index_body)
print(f"索引 {index_name} 创建成功")
print("响应详情:", response)
except Exception as e:
print("操作失败:", str(e))
插入数据
HTTP
单条数据插入
POST: http://公网IP:9200/employees/_doc
Headers: Content-Type:application/json
Authorization -> Basic Auth -> 输入用户名和密码
Body:
{
"name": "张三",
"gender": "M",
"birth_date": "1995-08-20",
"CV": "资深软件工程师,擅长Java和Python开发,熟悉Elasticsearch。"
}
批量数据插入
POST: http://公网IP:9200/employees/_doc
Headers: Content-Type:application/x-ndjson
Authorization -> Basic Auth -> 输入用户名和密码
Body:
{ "index": { "_index": "employees" } }
{ "name": "张三", "gender": "M", "birth_date": "1990-01-01", "CV": "资深Java工程师..." }
{ "index": { "_index": "employees", "_id": "1001" } }
{ "name": "李四", "gender": "F", "birth_date": "1995-05-20", "CV": "前端React专家..." }
这里一次性批量插入了两条数据,而且在第二条数据中手动制定了_id,这样ES就不会自动创建第二条数据的_id编码。另外需要注意上述ndjson格式的请求体必须要以一个新的换行符结尾。
Kibana
Python
使用Python实现批量查询数据。
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk
es = Elasticsearch(
hosts=["http://113.45.129.200:9200"],
basic_auth=("用户名", "密码"),
request_timeout=30
)
actions = [
{
"_index": "employees",
"_source": {
"name": "王五",
"gender": "M",
"birth_date": "1988-11-15",
"CV": "运维工程师,熟悉Linux和Kubernetes。"
}
},
{
"_index": "employees",
"_id": "1002", # 指定文档ID
"_source": {
"name": "赵六",
"gender": "F",
"birth_date": "2000-03-22",
"CV": "数据分析师,擅长Python和SQL。"
}
}
]
success_count, errors = bulk(es, actions)
print(f"成功插入 {success_count} 条文档")
if errors:
print("错误详情:", errors)
修改索引
Kibana
新增索引
向employees索引添加一个字段salary。
PUT /employees/_mapping
{
"properties": {
"salary": {
"type": "integer"
}
}
}
在索引中已经存储了数据时也是可以向索引增加字段的。在查询数据时,在salary字段中没有值的数据是不会显示salary这个字段的。
另外,删除已有的字段映射或者数据是很难实现的,我想这应该跟Segment的更新方式只能是覆盖有关。
Python
新增索引
向employees索引添加一个字段salary。
from elasticsearch import Elasticsearch
from elasticsearch.exceptions import RequestError
es = Elasticsearch(
hosts=["http://113.45.129.200:9200"],
basic_auth=("用户名", "密码"),
request_timeout=30
)
body = {
"properties": {
"salary": {
"type": "integer"
}
}
}
try:
response = es.indices.put_mapping(
index="employees",
body=body
)
print("字段添加成功!响应结果:", response)
except RequestError as e:
print("更新映射失败:", e.info)
except Exception as e:
print("发生其他错误:", str(e))
查询索引
(更高级的查询需要开启一个新的#)
Kibana
查询索引元数据
GET /employees
查询索引中存储的数据
GET employees/_search
{
"query": {
"match_all": {}
}
}
Python
查询索引中存储的数据
from elasticsearch import Elasticsearch
es = Elasticsearch(
hosts=["http://113.45.129.200:9200"],
basic_auth=("用户名", "密码"),
request_timeout=30
)
body = {
"query": {
"match_all": {}
}
}
result = es.search(index="employees", body=body)
print("文档总数:", result["hits"]["total"]["value"])
print(result)
索引别名
# 创建一个索引并指定一个别名
PUT /myindex
{
"aliases": {
"myalias": {}
},
"settings": {
"refresh_interval": "30s",
"number_of_shards":1,
"number_of_replicas": 0
}
}
# 或,为现有索引添加新的别名(原有别名依然可以使用)
POST /_aliases
{
"actions": [
{
"add": {
"index": "myindex",
"alias": "newalias"
}
}
]
}
# 通过别名获取该索引
GET /myalias
# 不使用别名的话可以使用通配符对相关的所有索引进行搜索
POST *index/_search
# 使用别名的情况下可以多个索引施加同一个别名,然后直接查这一个别名实现对所有索引的搜索。
Elasticsearch文档操作
# 通过POST命令,"_d"为UID
POST /myindex/_doc
{
"name": "Lucas",
"sex": "m",
"age": 24,
"address": "America",
"remark": "C++"
}
# 通过PUT命令新增数据,"_id"为给定的值,已有的久文档会被替换
PUT /myindex/_doc/0
{
"name": "Jack",
"sex": "m",
"age": 24,
"address": "America",
"remark": "Java"
}
PUT /myindex/_doc/1
{
"name": "Lucy",
"sex": "f",
"age": 23,
"address": "America",
"remark": "Python"
}
# 查看数据
POST /myindex/_search
# 批量新增
POST _bulk
{ "create": { "_index": "myindex", "_id": 2 } }
{ "id": 3, "title": "Tom", "content": "Tom is a teacher", "tags": ["Java", "Python", "C++"], "create_time": 2024 }
{ "create": { "_index": "myindex", "_id": 3 } }
{ "id": 4, "title": "Malina", "content": "Malina is a student", "tags": ["Java", "Python"], "create_time": 2024 }
# 批量替换
POST _bulk
{ "index": { "_index": "myindex", "_id": 2 } }
{ "id": 3, "title": "Tom", "content": "Tom is a teacher", "tags": ["Java", "Python", "C++"], "create_time": 2024 }
{ "index": { "_index": "myindex", "_id": 3 } }
{ "id": 4, "title": "Malina", "content": "Malina is a student", "tags": ["Java", "Python"], "create_time": 2024 }
Python操作ES
环境准备
在Kibana中运行GET /
来查看ES版本号(位于”version”->“number”)中。例如我的当前ES版本为8.14.3。
pip install elasticsearch==8.17.0
from elasticsearch import Elasticsearch
es = Elasticsearch("http://公网IP:9200")
print(es.info()) # 能够打印出信息说明本地连接Elasticsearch成功
Python ES 查询语句
query = {
"query": {
"bool": {
"must": [
{"match": {"mapping1": "value0"}},
{"match": {"mapping2": "value1"}},
{"match": {"mapping3": "value2"}}
]
}
}
}
query_result = es.count(index="my_index_name", body=search_query)
print(query_result) # 打印出信息{'count': 261285, '_shards': {'total': 3, 'successful': 3, 'skipped': 0, 'failed': 0}}
query_result = es.search(index="my_index_name", body=search_query)
print(query_result) # 打印出查询到的所有信息
Elasticsearch原理
倒排索引
文档预处理:比如删除语气词
构建词典
创建倒排列表
存储索引文件
查询处理
Elasticsearch中的Index相当于MySQL中的一个table
Elasticsearch中的Mapping相当于MySQL中的一个schema
Elasticsearch中的Document相当于MySQL中的一个row
RAG Demo
# Requirements.txt
# Elasticsearch: 8.14.3
# Kibana: 8.14.3
elasticsearch==8.17.0
openai==1.59.3
文本向量化
借助阿里云百炼平台。
from openai import OpenAI
llm_client = OpenAI(api_key=<API_KEY>, base_url='https://dashscope.aliyuncs.com/compatible-mode/v1')
text = "Hello, ElasticSearch"
tmp = llm_client.embeddings.create(input=text, model='text-embedding-v3')
# 打印出文本向量化后的结果以及向量维度。这里应该是(1, 1024)维度
print(tmp.data[0].embedding, len(tmp.data[0].embedding))
文本向量入库
from elasticsearch import Elasticsearch
es = Elasticsearch("http://公网IP:9200", request_timeout=30)
print(es.info()) # 能打印出结果则连接成功
- 创建索引用于存储文本向量。
mappings = {
"mappings": {
"properties": {
"semantic": {
"type": "dense_vector",
"dims": 1024
},
"content": {
"type": "text"
}
}
}
}
es.indices.create(index='my_rag_vector_index', body=mappings)
senmantic的类型dense_vector在ElasticSearch 7 版本中不受支持。
- 向量文本存入新构建的索引
# documnets格式:[{'text': ..., 'text_vector': ...}, ...]
def generate_docs(documents):
index_name = 'my_rag_vector_index'
result = []
for row in documents:
text = row['text']
text_vector = row['text_vector']
doc = {'content': text, 'semantic': text_vector}
result.append({'index': {'_index': index_name}})
result.append(doc)
return result
es.bulk(body=generate_docs(documents))
检索
user_text = 'ElasticSearch'
user_text_vector = llm_client.embeddings.create(input=text, model='text-embedding-v3').data[0].embedding
query = {
'knn': {
'field': 'semantic',
'query_vector': user_text_vector,
'num_candidates': 1
}
}
result = es.search(index='rag_vector_index_zt', body={'query': query})
print(result)