Elasticsearch Learning

Tao Zou

2025-03-16

学习参考文档:博客园-不吃紫菜

正向索引、倒排索引

Kibana:数据可视化

Elasticsearch:存储、计算、搜索数据。

Elasticsearch介绍

  • Elasticsearch、Kibana、Logstash、Beats合称Elastic Stack(ELK)。应用于日志数据分析、实时监控。

  • Elasticsearch基于Lucene,Lucene是基于Java语言的搜索引擎类库。

  • Elasticsearch是面向文档存储的。

在ES中,索引(index)是基本的数据存储单元。

倒排索引

原文本表(Stored Fields):

文本ID 文本
1 I like piano and plane
2 I play piano
3 I am your best friend

倒排索引表(Inverted Index):

Term Dictionary Posting list
am 3
and 1
best 3
friend 3
I 1,2,3
like 1
piano 1,2
plane 1
play 2
your 3

有了倒排索引表后,在所有文本中查找到包含目标词的文本的时间复杂度是\(O(\log N)\)。倒排索引表也不一定要存储在磁盘中,一些Term具有相同的前缀,因此可以用树结构轻量化表示所有的Term Dictionary然后存储在内存中,这就是Term Index。

Term Index:

p i ano
la ne
y

一个Segment=Inverted Index+Term Index+Stored Fields+Doc Values。Segment的更新是通过覆盖的方式而不是修改的方式。

多个基础Segment就可以组成一个基础搜索库LuceneElasticSearch就是基于Lucene

在ES中,一个Shard就是一个Lucene库,多个Shard可以分布式运行在不同Node上,因此也诞生了Shard副本概念(和Kafka的分区相似)。大规模分布式ES中,有三类Node分工:1. Master Node(集群管理);2. Data Node(存储数据);3. Coordinate Node(处理请求)。

ES的搜索流程:

  1. Query Phase:客户端请求 -> Coordinate Node -> 多个Data Node -> 并发搜索每个Segment -> 返回文档ID和排序信息(?)

  2. Fetch Phase:Coordinate Node 根据文档ID -> 多个Data Node -> 检索Stored Fields -> 返回目标文档

安装配置Elasticsearch

Elasticsearch安装

  1. 下载安装包:
wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-8.14.3-linux-x86_64.tar.gz
  1. 在新建用户zt的.profile文件中追加下述内容以配置环境变量。
vim /home/zt/.profile
export ES_JAVA_HOME=/home/zt/Elasticsearch814/elasticsearch-8.14.3/jdk/
export ES_HOME=/home/zt/Elasticsearch814/elasticsearch-8.14.3

执行source .profile

  1. 修改配置
vim $ES_HOME/config/elasticsearch.yml

修改:

# 使本地电脑能够访问到远程服务器
network.host: 0.0.0.0
# 指定为单节点,绕过引导检查,设置为开发者模式
discovery.type: single-node
# 开启security安全认证
xpack.security.enabled: true
vim $ES_HOME/config/jvm.options

修改下述内容:我设成了1G

-Xms1g
-Xmx1g

为Elasticsearch配置用户密码

参考连接

在保持ES启动的情况下使用下述所有命令。

# 查询ES中现有的用户
$ES_HOME/bin/elasticsearch-users list
# 新建ES用户zt并赋予超级用户权限,然后设置密码
$ES_HOME/bin/elasticsearch-users useradd zt -r superuser

但是通过上述命令自建的用户zt会导致连接Kibana失败。所以还是通过下述命令为系统用户apm_systemkibana_systemkibanalogstash_systembeats_systemremote_monitoring_userelastic生成密码。但是这些密码似乎只会在console中被打印出一次,所以应该妥善保存。

$ES_HOME/bin/elasticsearch-setup-passwords auto

Elasticsearch启动和停止

后台启动:

$ES_HOME/bin/elasticsearch -d

停止:

ps -ef | grep elasticsearch | grep -v grep | awk '{print $2}' | xargs kill

之后浏览器访问:http://公网IP:9200即可看见结果。Chrome可以安装multi elasticsearch heads插件美化可视化结果。

分词器安装

这里下载ICU分词器。

$ES_HOME/bin/elasticsearch-plugin install analysis-icu

再下载一个分词器:

wget https://release.infinilabs.com/analysis-ik/stable/elasticsearch-analysis-ik-8.14.3.zip
unzip elasticsearch-analysis-ik-8.14.3.zip -d $ES_HOME/plugins/elasticsearch-analysis-ik-8.14.3

安装完毕后重启ES。

在浏览器打开的网页的的符合查询栏目,对http://公网IP:9200/_analyze发送POST请求,请求体如下。点击“提交请求”可以看到分词结果。

{
  "analyzer": "icu_analyzer",
  "text": "你好,我的Elasticsearch"
}

Kibana安装和配置

wget https://artifacts.elastic.co/downloads/kibana/kibana-8.14.3-linux-x86_64.tar.gz

修改kibana.yml

vim /home/zt/Kibana814/kibana-8.14.3/config/kibana.yml

修改下述内容:

server.host: "0.0.0.0"
i18n.locale: "zh-CN"
elasticsearch.hosts: ["http://192.168.0.45:9200"]
elasticsearch.username: "kibana_system"
elasticsearch.password: "kibana_system的密码"

配置后上述并启动kibana后,直接使用自建用户zt及其密码就可以登录Kibana了。

启动停止

nohup /home/zt/Kibana814/kibana-8.14.3/bin/kibana &

启动过程比较慢

之后直接在浏览器访问http://公网IP:5601,不必访问http://公网IP:9200了。

关闭kibana:

ps -ef | grep kibana | grep -v grep | awk '{print $2}' | xargs kill

Kibana用法

打开“开发工具”

# 创建索引myindex
PUT /myindex
# 获取索引myindex
GET /myindex
# 在索引myindex中创建映射
POST /myindex/_mapping
{
    "properties": {
        "content": {
            "type": "text",
            "analyzer": "ik_max_word",
            "search_analyzer": "ik_smart"
        }
    }
}
# 往索引myindex添加数据
POST /myindex/_create/0
{
    "content": "你好,Elasticsearch。你好,西伯利亚蝴蝶"
}
POST /myindex/_create/1
{
    "content": "你好,Elasticsearch。你好,仙女座星系"
}
# 在myindex索引中进行搜索
GET /myindex/_search
{
    "query": {"match": {"content": "你好"}}
}
# 删除索引myindex
DELETE /myindex
# 修改副本数量
PUT /myindex/_settings
{
    "index": {
        "number_of_replicas: 2"
    }
}
# 新增字段
PUT /myindex/_mapping
{
  "properties": {
    "grade": {
      "type": "integer"
    }
  }
}

ES常用命令

创建索引

创建一个名为employees的索引,shard数为1,shard副本数为0。

HTTP请求

PUT: http://公网IP:9200/employees

Headers: Content-Type:application/json

Authorization -> Basic Auth -> 输入用户名和密码

Body:

{
  "settings": {
    "number_of_shards": 1,
    "number_of_replicas": 0
  },
  "mappings": {
    "properties": {
      "name": {
        "type": "text",
        "fields": {
          "keyword": {
            "type": "keyword"  // 指定关键字类型为keyword,表示不对之进行分词
          }
        }
      },
      "gender": {
        "type": "keyword"  // 精确匹配,不参与分词
      },
      "birth_date": {
        "type": "date",
        "format": "yyyy-MM-dd"
      },
      "CV": {
        "type": "text",
        "analyzer": "ik_max_word",  // 设置分词器
        "search_analyzer": "ik_smart"
      }
    }
  }
}

Kibana

DELETE /employees
PUT /employees
{
  "settings": {
    "number_of_shards": 1,
    "number_of_replicas": 0
  },
  "mappings": {
    "properties": {
      "name": {
        "type": "text",
        "fields": {
          "keyword": {
            "type": "keyword"  // 指定关键字类型为keyword,表示不对之进行分词
          }
        }
      },
      "gender": {
        "type": "keyword"  // 精确匹配,不参与分词
      },
      "birth_date": {
        "type": "date",
        "format": "yyyy-MM-dd"
      },
      "CV": {
        "type": "text",
        "analyzer": "ik_max_word",  // 设置分词器
        "search_analyzer": "ik_smart"
      }
    }
  }
}

Python

elasticsearch==8.17.0

from elasticsearch import Elasticsearch

# 连接配置
es = Elasticsearch(
    hosts=["http://公网IP:9200"],
    basic_auth=("用户名", "密码"),
    request_timeout=30
)
print(es.info())  # 测试连接

index_name = "employees"
index_body = {
    "settings": {
        "number_of_shards": 1,
        "number_of_replicas": 0
    },
    "mappings": {
        "properties": {
            "name": {
                "type": "text",
                "fields": {
                    "keyword": {
                        "type": "keyword"
                    }
                }
            },
            "gender": {
                "type": "keyword"
            },
            "birth_date": {
                "type": "date",
                "format": "yyyy-MM-dd"
            },
            "CV": {
                "type": "text",
                "analyzer": "ik_max_word",
                "search_analyzer": "ik_smart"
            }
        }
    }
}

try:
    if es.indices.exists(index=index_name):
        es.indices.delete(index=index_name)
        print(f"索引 {index_name} 删除成功")

    # 创建新索引(携带完整配置)
    response = es.indices.create(index=index_name, body=index_body)
    print(f"索引 {index_name} 创建成功")
    print("响应详情:", response)

except Exception as e:
    print("操作失败:", str(e))

插入数据

HTTP

单条数据插入

POST: http://公网IP:9200/employees/_doc

Headers: Content-Type:application/json

Authorization -> Basic Auth -> 输入用户名和密码

Body:

{
  "name": "张三",
  "gender": "M",
  "birth_date": "1995-08-20",
  "CV": "资深软件工程师,擅长Java和Python开发,熟悉Elasticsearch。"
}

批量数据插入

POST: http://公网IP:9200/employees/_doc

Headers: Content-Type:application/x-ndjson

Authorization -> Basic Auth -> 输入用户名和密码

Body:

{ "index": { "_index": "employees" } }
{ "name": "张三", "gender": "M", "birth_date": "1990-01-01", "CV": "资深Java工程师..." }
{ "index": { "_index": "employees", "_id": "1001" } }
{ "name": "李四", "gender": "F", "birth_date": "1995-05-20", "CV": "前端React专家..." }

这里一次性批量插入了两条数据,而且在第二条数据中手动制定了_id,这样ES就不会自动创建第二条数据的_id编码。另外需要注意上述ndjson格式的请求体必须要以一个新的换行符结尾。

Kibana

Python

使用Python实现批量查询数据。

from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk

es = Elasticsearch(
    hosts=["http://113.45.129.200:9200"],
    basic_auth=("用户名", "密码"),
    request_timeout=30
)

actions = [
    {
        "_index": "employees",
        "_source": {
            "name": "王五",
            "gender": "M",
            "birth_date": "1988-11-15",
            "CV": "运维工程师,熟悉Linux和Kubernetes。"
        }
    },
    {
        "_index": "employees",
        "_id": "1002",  # 指定文档ID
        "_source": {
            "name": "赵六",
            "gender": "F",
            "birth_date": "2000-03-22",
            "CV": "数据分析师,擅长Python和SQL。"
        }
    }
]

success_count, errors = bulk(es, actions)
print(f"成功插入 {success_count} 条文档")
if errors:
    print("错误详情:", errors)

修改索引

Kibana

新增索引

向employees索引添加一个字段salary。

PUT /employees/_mapping
{
  "properties": {
    "salary": {
      "type": "integer"
    }
  }
}

在索引中已经存储了数据时也是可以向索引增加字段的。在查询数据时,在salary字段中没有值的数据是不会显示salary这个字段的。

另外,删除已有的字段映射或者数据是很难实现的,我想这应该跟Segment的更新方式只能是覆盖有关。

Python

新增索引

向employees索引添加一个字段salary。

from elasticsearch import Elasticsearch
from elasticsearch.exceptions import RequestError

es = Elasticsearch(
    hosts=["http://113.45.129.200:9200"],
    basic_auth=("用户名", "密码"),
    request_timeout=30
)
body = {
    "properties": {
        "salary": {
        "type": "integer"
        }
    }
}

try:
    response = es.indices.put_mapping(
        index="employees",
        body=body
    )
    print("字段添加成功!响应结果:", response)
except RequestError as e:
    print("更新映射失败:", e.info)
except Exception as e:
    print("发生其他错误:", str(e))

查询索引

(更高级的查询需要开启一个新的#)

Kibana

查询索引元数据

GET /employees

查询索引中存储的数据

GET employees/_search
{
  "query": {
    "match_all": {}
  }
}

Python

查询索引中存储的数据

from elasticsearch import Elasticsearch

es = Elasticsearch(
    hosts=["http://113.45.129.200:9200"],
    basic_auth=("用户名", "密码"),
    request_timeout=30
)
body = {
    "query": {
        "match_all": {}
    }
}

result = es.search(index="employees", body=body)
print("文档总数:", result["hits"]["total"]["value"])
print(result)

索引别名

# 创建一个索引并指定一个别名
PUT /myindex
{
  "aliases": {
    "myalias": {}
  },
  "settings": {
    "refresh_interval": "30s",
    "number_of_shards":1,
    "number_of_replicas": 0
  }
}
# 或,为现有索引添加新的别名(原有别名依然可以使用)
POST /_aliases
{
  "actions": [
    {
      "add": {
        "index": "myindex",
        "alias": "newalias"
      }
    }
  ]
}
# 通过别名获取该索引
GET /myalias
# 不使用别名的话可以使用通配符对相关的所有索引进行搜索
POST *index/_search
# 使用别名的情况下可以多个索引施加同一个别名,然后直接查这一个别名实现对所有索引的搜索。

Elasticsearch文档操作

# 通过POST命令,"_d"为UID
POST /myindex/_doc
{
  "name": "Lucas",
  "sex": "m",
  "age": 24,
  "address": "America",
  "remark": "C++"
}

# 通过PUT命令新增数据,"_id"为给定的值,已有的久文档会被替换
PUT /myindex/_doc/0
{
  "name": "Jack",
  "sex": "m",
  "age": 24,
  "address": "America",
  "remark": "Java"
}
PUT /myindex/_doc/1
{
  "name": "Lucy",
  "sex": "f",
  "age": 23,
  "address": "America",
  "remark": "Python"
}

# 查看数据
POST /myindex/_search
# 批量新增
POST _bulk
{ "create": { "_index": "myindex", "_id": 2 } }
{ "id": 3, "title": "Tom", "content": "Tom is a teacher", "tags": ["Java", "Python", "C++"], "create_time": 2024 }
{ "create": { "_index": "myindex", "_id": 3 } }
{ "id": 4, "title": "Malina", "content": "Malina is a student", "tags": ["Java", "Python"], "create_time": 2024 }
# 批量替换
POST _bulk
{ "index": { "_index": "myindex", "_id": 2 } }
{ "id": 3, "title": "Tom", "content": "Tom is a teacher", "tags": ["Java", "Python", "C++"], "create_time": 2024 }
{ "index": { "_index": "myindex", "_id": 3 } }
{ "id": 4, "title": "Malina", "content": "Malina is a student", "tags": ["Java", "Python"], "create_time": 2024 }

Python操作ES

环境准备

在Kibana中运行GET /来查看ES版本号(位于”version”->“number”)中。例如我的当前ES版本为8.14.3

pip install elasticsearch==8.17.0 
from elasticsearch import Elasticsearch
es = Elasticsearch("http://公网IP:9200")
print(es.info())  # 能够打印出信息说明本地连接Elasticsearch成功

Python ES 查询语句

query = {
  "query": {
    "bool": {
      "must": [
        {"match": {"mapping1": "value0"}},
        {"match": {"mapping2": "value1"}},
        {"match": {"mapping3": "value2"}}
        ]
    }
  }
}
query_result = es.count(index="my_index_name", body=search_query)
print(query_result)  # 打印出信息{'count': 261285, '_shards': {'total': 3, 'successful': 3, 'skipped': 0, 'failed': 0}}
query_result = es.search(index="my_index_name", body=search_query)
print(query_result)  # 打印出查询到的所有信息

Elasticsearch原理

倒排索引

  1. 文档预处理:比如删除语气词

  2. 构建词典

  3. 创建倒排列表

  4. 存储索引文件

  5. 查询处理

Elasticsearch中的Index相当于MySQL中的一个table

Elasticsearch中的Mapping相当于MySQL中的一个schema

Elasticsearch中的Document相当于MySQL中的一个row

RAG Demo

# Requirements.txt

# Elasticsearch: 8.14.3
# Kibana: 8.14.3

elasticsearch==8.17.0
openai==1.59.3

文本向量化

借助阿里云百炼平台。

from openai import OpenAI

llm_client = OpenAI(api_key=<API_KEY>, base_url='https://dashscope.aliyuncs.com/compatible-mode/v1')

text = "Hello, ElasticSearch"
tmp = llm_client.embeddings.create(input=text,  model='text-embedding-v3')

# 打印出文本向量化后的结果以及向量维度。这里应该是(1, 1024)维度
print(tmp.data[0].embedding, len(tmp.data[0].embedding))

文本向量入库

from elasticsearch import Elasticsearch

es = Elasticsearch("http://公网IP:9200", request_timeout=30)
print(es.info())  # 能打印出结果则连接成功
  1. 创建索引用于存储文本向量。
mappings = {
    "mappings": {
        "properties": {
            "semantic": {
                "type": "dense_vector",
                "dims": 1024
            },
            "content": {
                "type": "text"
            }
        }
    }
}
es.indices.create(index='my_rag_vector_index', body=mappings)

senmantic的类型dense_vector在ElasticSearch 7 版本中不受支持。

  1. 向量文本存入新构建的索引
# documnets格式:[{'text': ..., 'text_vector': ...}, ...]
def generate_docs(documents):
    index_name = 'my_rag_vector_index'
    result = []
    for row in documents:
        text = row['text']
        text_vector = row['text_vector']
        doc = {'content': text, 'semantic': text_vector}
        result.append({'index': {'_index': index_name}})
        result.append(doc)
    return result
    
es.bulk(body=generate_docs(documents)) 

检索

user_text = 'ElasticSearch'
user_text_vector = llm_client.embeddings.create(input=text, model='text-embedding-v3').data[0].embedding
query = {
    'knn': {
        'field': 'semantic',
        'query_vector': user_text_vector,
        'num_candidates': 1
    }
}
result = es.search(index='rag_vector_index_zt', body={'query': query})
print(result)