python 查询es报错
ApiError(406, 'Content-Type header [application/vnd.elasticsearch+json; compatible-with=8] is not supported
原因
服务端用的是AWS OpenSearch 1.3,我本地下的elasticsearch版本8.*太高了不兼容
解决办法
根据官网提示,“Amazon OpenSearch Service是一项托管服务,可在AWS云中轻松部署、操作和扩展OpenSearch群集。Amazon OpenSearch Service支持OpenSearch和旧版Elasticsearch OSS(最高可达7.10,该软件的最终开源版本)。当你创建一个集群时,你可以选择使用哪个搜索引擎。”下载了elasticsearch 7.10后,可以正常查询es索引
附代码
下面展示一些 内联代码片
。
import os
from sshtunnel import SSHTunnelForwarder
from elasticsearch import Elasticsearch
import logging
import json
import traceback
import time
from Config.Config import Config
from Utils.ReadYaml import ReadYaml
from Utils.logger_handler import LoggerHandler
# 设置日志记录
logging.basicConfig(level=logging.INFO)
logger = LoggerHandler()
class SshEsClient:
def __init__(self, SSH_HOST, SSH_PORT, SSH_USERNAME, SSH_PRIVATE_KEY_PATH, ES_HOST, ES_PORT,
ES_UserName, ES_PassWord: str):
"""
:param SSH_HOST:SSH 跳板机IP
:param SSH_PORT:SSH 跳板机端口
:param SSH_USERNAME:跳板机用户名
:param SSH_PRIVATE_KEY_PATH:私钥文件地址
:param ES_HOST:ES 服务器地址
:param ES_PORT:ES 服务器端口
:param ES_UserName:ES 服务器用户名
:param ES_PassWord:ES 服务器密码
"""
# SSH 跳板机参数
self.ES_PassWord = ES_PassWord
self.ES_UserName = ES_UserName
self.SSH_PRIVATE_KEY_PATH = SSH_PRIVATE_KEY_PATH
self.SSH_USERNAME = SSH_USERNAME
self.SSH_PORT = SSH_PORT
self.SSH_HOST = SSH_HOST
# Redis 服务器参数
self.ES_PORT = ES_PORT
self.ES_HOST = ES_HOST
# 尝试使用不同的本地端口
self.LOCAL_PORT = LOCAL_PORT # 更改为不同的端口
self.tunnel = None
self.es = None
def __enter__(self):
try:
# 建立 SSH 隧道
self.tunnel = SSHTunnelForwarder(
(self.SSH_HOST, self.SSH_PORT),
ssh_username=self.SSH_USERNAME,
ssh_pkey=self.SSH_PRIVATE_KEY_PATH,
remote_bind_address=(self.ES_HOST, self.ES_PORT),
local_bind_address=('0.0.0.0', 0) # 绑定到所有本地地址,并让系统自动选择一个本地端口
)
# 开启隧道
self.tunnel.start()
# print(self.tunnel)
if self.tunnel.is_active:
logger.info('SSH 连接已成功建立')
# logger.info(f'本地端口 {self.LOCAL_PORT} 已映射到 ES 服务器 {self.ES_HOST}:{self.ES_PORT}')
# 连接到本地映射的 ES 服务器
es_config = {"hosts": [{'host': 'localhost',
'port': 0,
'scheme': 'https'}],
"http_auth": (self.ES_UserName, self.ES_PassWord),
"verify_certs": False}
# 获取SSH隧道映射到本地的端口
local_port = self.tunnel.local_bind_port
es_config['hosts'][0]['port'] = local_port # 更新Elasticsearch主机配置中的端口号
# print(es_config)
self.es = Elasticsearch(**es_config)
else:
logger.error('SSH 连接建立失败')
return self
except Exception as e:
logger.error(f"发生错误: {e}")
def __exit__(self, exc_type, exc_val, exc_tb):
if self.es:
self.es.close()
self.es = None
if self.tunnel:
self.tunnel.stop()
self.tunnel = None
def get_value(self, index: str, body, verbose=2):
start_time = time.time()
try:
value = self.es.search(index=index, body=body)
if verbose == 2:
logger.info(
f'>> [ES] 获取body的value成功, body: {body}, value: {value}, 耗时: {time.time() - start_time:.2}s')
elif verbose == 1:
logger.info(f'>> [ES] 获取body的value成功, key: {body}, 耗时: {time.time() - start_time:.2}s')
return value
except Exception as e:
err = traceback.format_exc()
logger.error(f'>> [ES] 获取value异常, key: {body}, error: {e}, traceback: {err} 耗时: {time.time() - start_time:.2}s')
return None
if __name__ == '__main__':
with SshEsClient(
**ReadYaml(os.path.join(Config.test_datas_dir, "es连接账号.yaml"), part="sit_es").read()[0]) as es_client:
print('es_client',es_client)
query = {"query": {
"match_all": {}
}
}
result = es_client.get_value(index="hsrh_es_platform_product_0", body=query)