python 查询es报错
ApiError(406, 'Content-Type header [application/vnd.elasticsearch+json; compatible-with=8] is not supported
原因
服务端用的是AWS OpenSearch 1.3,我本地下的elasticsearch版本8.*太高了不兼容
解决办法
根据官网提示,“Amazon OpenSearch Service是一项托管服务,可在AWS云中轻松部署、操作和扩展OpenSearch群集。Amazon OpenSearch Service支持OpenSearch和旧版Elasticsearch OSS(最高可达7.10,该软件的最终开源版本)。当你创建一个集群时,你可以选择使用哪个搜索引擎。”下载了elasticsearch 7.10后,可以正常查询es索引
附代码
下面展示一些 内联代码片
。
import os from sshtunnel import SSHTunnelForwarder from elasticsearch import Elasticsearch import logging import json import traceback import time from Config.Config import Config from Utils.ReadYaml import ReadYaml from Utils.logger_handler import LoggerHandler # 设置日志记录 logging.basicConfig(level=logging.INFO) logger = LoggerHandler() class SshEsClient: def __init__(self, SSH_HOST, SSH_PORT, SSH_USERNAME, SSH_PRIVATE_KEY_PATH, ES_HOST, ES_PORT, ES_UserName, ES_PassWord: str): """ :param SSH_HOST:SSH 跳板机IP :param SSH_PORT:SSH 跳板机端口 :param SSH_USERNAME:跳板机用户名 :param SSH_PRIVATE_KEY_PATH:私钥文件地址 :param ES_HOST:ES 服务器地址 :param ES_PORT:ES 服务器端口 :param ES_UserName:ES 服务器用户名 :param ES_PassWord:ES 服务器密码 """ # SSH 跳板机参数 self.ES_PassWord = ES_PassWord self.ES_UserName = ES_UserName self.SSH_PRIVATE_KEY_PATH = SSH_PRIVATE_KEY_PATH self.SSH_USERNAME = SSH_USERNAME self.SSH_PORT = SSH_PORT self.SSH_HOST = SSH_HOST # Redis 服务器参数 self.ES_PORT = ES_PORT self.ES_HOST = ES_HOST # 尝试使用不同的本地端口 self.LOCAL_PORT = LOCAL_PORT # 更改为不同的端口 self.tunnel = None self.es = None def __enter__(self): try: # 建立 SSH 隧道 self.tunnel = SSHTunnelForwarder( (self.SSH_HOST, self.SSH_PORT), ssh_username=self.SSH_USERNAME, ssh_pkey=self.SSH_PRIVATE_KEY_PATH, remote_bind_address=(self.ES_HOST, self.ES_PORT), local_bind_address=('0.0.0.0', 0) # 绑定到所有本地地址,并让系统自动选择一个本地端口 ) # 开启隧道 self.tunnel.start() # print(self.tunnel) if self.tunnel.is_active: logger.info('SSH 连接已成功建立') # logger.info(f'本地端口 {self.LOCAL_PORT} 已映射到 ES 服务器 {self.ES_HOST}:{self.ES_PORT}') # 连接到本地映射的 ES 服务器 es_config = {"hosts": [{'host': 'localhost', 'port': 0, 'scheme': 'https'}], "http_auth": (self.ES_UserName, self.ES_PassWord), "verify_certs": False} # 获取SSH隧道映射到本地的端口 local_port = self.tunnel.local_bind_port es_config['hosts'][0]['port'] = local_port # 更新Elasticsearch主机配置中的端口号 # print(es_config) self.es = Elasticsearch(**es_config) else: logger.error('SSH 连接建立失败') return self except Exception as e: logger.error(f"发生错误: {e}") def __exit__(self, exc_type, exc_val, exc_tb): if self.es: self.es.close() self.es = None if self.tunnel: self.tunnel.stop() self.tunnel = None def get_value(self, index: str, body, verbose=2): start_time = time.time() try: value = self.es.search(index=index, body=body) if verbose == 2: logger.info( f'>> [ES] 获取body的value成功, body: {body}, value: {value}, 耗时: {time.time() - start_time:.2}s') elif verbose == 1: logger.info(f'>> [ES] 获取body的value成功, key: {body}, 耗时: {time.time() - start_time:.2}s') return value except Exception as e: err = traceback.format_exc() logger.error(f'>> [ES] 获取value异常, key: {body}, error: {e}, traceback: {err} 耗时: {time.time() - start_time:.2}s') return None if __name__ == '__main__': with SshEsClient( **ReadYaml(os.path.join(Config.test_datas_dir, "es连接账号.yaml"), part="sit_es").read()[0]) as es_client: print('es_client',es_client) query = {"query": { "match_all": {} } } result = es_client.get_value(index="hsrh_es_platform_product_0", body=query)
复制