
Envoy source code analysis


Overall architecture

At a high level, Envoy's architecture works as follows.

An Envoy process can host multiple Listeners, and each Listener can carry multiple filters that together form a filter chain.

When Envoy receives a request, it first runs it through the FilterChain, where the various L3/L4/L7 filters process it; it then routes the request to the target cluster, obtains an endpoint through load balancing, and finally forwards the request upstream.

Envoy is built around four core resources: Listener, Router, Cluster, and Filter.

Listener: the foundation of Envoy's work

Simply put, a Listener is a listening port that Envoy opens to accept connections from downstream clients. Envoy can run any number of Listeners, and almost all configuration is isolated between them. The core of a Listener configuration includes the listening address, the filter chains, and so on.
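
For illustration, a minimal listener sketch might look like the following (listener_0, the port, and the echo filter chain are placeholders, not taken from the article):

static_resources:
  listeners:
  - name: listener_0
    address:
      socket_address: { address: 0.0.0.0, port_value: 15001 }   # listening address
    filter_chains:                         # one or more filter chains
    - filters:
      - name: envoy.filters.network.echo   # a trivial built-in network filter
        typed_config:
          "@type": type.googleapis.com/envoy.extensions.filters.network.echo.v3.Echo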

Filter: extensibility is the source of its power

A filter is, loosely speaking, a plugin. The filter mechanism is what gives Envoy its extraordinary extensibility.

With filters, Envoy can in principle support any protocol and translate between protocols, and it can modify and customize request traffic in every respect. The filter mechanism brings not only extensibility but also maintainability: users can extend Envoy in many ways without touching the upstream source code.

  • Network filters: operate at L3/L4 and are the core of Envoy's connection handling. They work on raw bytes and come in three flavors: Read, Write, and Read/Write.

  • HTTP filters: operate at L7 and are managed by a special network filter, the HTTP connection manager, which handles HTTP/1, HTTP/2 and gRPC requests. It turns the raw bytes into HTTP, so the HTTP protocol can be controlled precisely.

Cluster: the abstraction of upstream services

In Envoy every upstream service is abstracted as a Cluster. A Cluster holds the service's connection pool, timeouts, endpoint addresses and ports, its type (which determines how Envoy discovers the endpoints it can reach), and so on.

The configuration/resource discovery service for Clusters is called CDS. In the usual setup, the CDS server pushes the full set of reachable services to Envoy. Closely related to CDS is EDS: CDS pushes the Cluster resources themselves, and when a Cluster's type is EDS, its endpoints are delivered by the xDS server rather than resolved through DNS or other means. The service that delivers those endpoints is called EDS.
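
As a rough sketch (assuming the v3 API; the cluster name is a placeholder, not taken from the article), a Cluster whose endpoints are delivered by EDS over ADS could look like this:

clusters:
- name: some_eds_service            # cluster pushed by CDS (placeholder name)
  type: EDS                         # endpoints come from EDS, not DNS
  connect_timeout: 1s
  lb_policy: ROUND_ROBIN
  eds_cluster_config:
    eds_config:
      ads: {}                       # fetch the ClusterLoadAssignment over the ADS stream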

The configuration/resource discovery service for Listeners is called LDS. LDS is the foundation of a working Envoy: without it, Envoy cannot listen on any port (unless the bootstrap configuration provides static Listeners), and all the other xDS services become useless.

Router: the bridge between downstream and upstream

A Listener accepts connections from downstream, and a Cluster sends traffic to a concrete upstream service; the Router decides which Cluster should handle the data once a Listener has accepted a downstream connection. It defines the rules for distributing traffic. Although "Router" can usually be read as the HTTP router, Envoy supports many protocols (Dubbo, Redis, and so on), so here Router refers broadly to all rules and resources that bridge Listeners and backend services, not just HTTP.

The configuration/resource discovery service for routes is called RDS. The core of a Router configuration is the match rules and the target Cluster; it may also carry retries, traffic splitting, rate limiting, and more.
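
For example (a minimal sketch, assuming the v3 HTTP connection manager API; local_route is a placeholder), a listener can pull its route table via RDS instead of embedding route_config:

# inside the http_connection_manager filter configuration
rds:
  route_config_name: local_route     # route table delivered by the RDS server
  config_source:
    ads: {}                          # subscribe over the ADS stream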

Service management

Static configuration

The configuration below proxies all traffic to baidu.com. Once it is in place, a request to Envoy's listening port should return Baidu's home page directly, without any change to the URL.

static_resources:
  # 1. Listeners
  listeners:
  - name: listener_0
    address:
      socket_address: { address: 0.0.0.0, port_value: 10000 }

    # 2. Filter chain
    filter_chains:
    - filters:
      - name: envoy.filters.network.http_connection_manager
        typed_config:
          "@type": type.googleapis.com/envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager
          stat_prefix: ingress_http
          route_config:
            name: local_route
            virtual_hosts:
            - name: local_service
              domains: ["*"]
              routes:
              - match: { prefix: "/" }
                route: { host_rewrite_literal: www.baidu.com, cluster: service_baidu }
          http_filters:
          - name: envoy.filters.http.router
            typed_config:
              "@type": type.googleapis.com/envoy.extensions.filters.http.router.v3.Router

  # 3. Clusters
  clusters:
  - name: service_baidu
    connect_timeout: 0.25s
    type: LOGICAL_DNS
    dns_lookup_family: V4_ONLY
    lb_policy: ROUND_ROBIN
    load_assignment:
      cluster_name: service_baidu
      endpoints:
      - lb_endpoints:
        - endpoint:
            address:
              socket_address: { address: www.baidu.com, port_value: 443 }
    transport_socket:
      name: envoy.transport_sockets.tls
      typed_config:
        "@type": type.googleapis.com/envoy.extensions.transport_sockets.tls.v3.UpstreamTlsContext
        sni: baidu.com

# 4. Admin interface
admin:
  access_log_path: /tmp/admin_access.log
  address:
    socket_address: { address: 0.0.0.0, port_value: 9901 }
 

Dynamic configuration: the xDS protocol (dynamic service discovery)

Envoy discovers resources dynamically by querying files or a management server. These discovery services and their corresponding APIs are collectively called xDS.

Envoy obtains resources through subscriptions: watching files at a configured path, opening a gRPC stream, or polling a REST-JSON URL. The latter two send a DiscoveryRequest message, and the discovered resources come back in a DiscoveryResponse.
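
As a rough sketch of the last two subscription styles in bootstrap form (field names follow the v3 API; xds_cluster and the file path are placeholders, not taken from the article):

dynamic_resources:
  # gRPC streaming subscription for listeners (LDS)
  lds_config:
    api_config_source:
      api_type: GRPC
      transport_api_version: V3
      grpc_services:
      - envoy_grpc: { cluster_name: xds_cluster }
  # filesystem subscription for clusters (CDS): Envoy watches this file for changes
  cds_config:
    path_config_source:
      path: /etc/envoy/cds.yaml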

xDS is short for "X Discovery Service", where "X" does not stand for one particular protocol but for a whole family of discovery protocols over different data sources:

CDS:Cluster Discovery Service
EDS:Endpoint Discovery Service
SDS:Secret Discovery Service
RDS:Route Discovery Service
LDS:Listener Discovery Service

The xDS protocol was proposed by Envoy and has become one of the de facto protocol standards for service meshes.

Envoy is the default sidecar proxy in Istio, but in principle anything that implements the xDS protocol can serve as Istio's sidecar, for example MOSN, open-sourced by Ant Group.

For example, each Envoy stream begins by sending a DiscoveryRequest that lists the subscribed resources, the type URL of those resources, the node identifier, and an empty version_info. An EDS request looks like this:

version_info:
node: { id: envoy }
resource_names:
- foo
- bar
type_url: type.googleapis.com/envoy.config.endpoint.v3.ClusterLoadAssignment
response_nonce

The management server responds with a DiscoveryResponse, either immediately or once the resources are ready, for example:

version_info: X
resources:
- foo ClusterLoadAssignment proto encoding
- bar ClusterLoadAssignment proto encoding
type_url: type.googleapis.com/envoy.config.endpoint.v3.ClusterLoadAssignment
nonce: A

Source code analysis

Source version

release/v1.31

Startup

The Bootstrap file in detail

Envoy's internal request handling follows roughly the flow sketched above: the overall pipeline is fixed, and everything that varies, i.e. the fine-grained handling of request data, is abstracted as a Filter. Reading and writing a connection are ReadFilter and WriteFilter; encoding and decoding HTTP request data are StreamEncoderFilter and StreamDecoderFilter; TCP handling is TcpProxyFilter, which derives from ReadFilter; HTTP handling is the ConnectionManager, which also derives from ReadFilter; and so on. The filters are assembled into a FilterChain. When a request arrives it first traverses the FilterChain, is then routed to a cluster, gets a target address from the load balancer, and is finally forwarded.

Bootstrap startup process

  1. Load the bootstrap YAML file.

  2. Set the header prefix.

  3. Initialize stats; set up the TagProducer, StatsMatcher, HistogramSettings, etc.

  4. Create the server stats.

  5. Register the assert action and the bug action.

  6. Set the healthy-check flag to false.

  7. The cluster manager goes through a multi-phase initialization: first the static/DNS clusters, then the predefined static EDS clusters; if CDS is configured, it waits for a CDS response (or a failure), then initializes CDS, and finally the clusters delivered by CDS.

  8. If a cluster has health checking configured, Envoy also runs a round of health checks.

  9. Once the cluster manager has finished initializing, RDS and LDS initialize, until a response (or failure) arrives; before that, Envoy does not accept connections or serve traffic.

  10. If the LDS response embeds RDS configuration, Envoy additionally waits for the RDS response; this stage is called listener warming.

  11. Only after all of the above do the listeners start accepting traffic (see the bootstrap sketch below).
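
A minimal bootstrap sketch of this ordering (assuming the v3 API and ADS; xds_cluster and the management-server address are placeholders, not taken from the article). The static xds_cluster is initialized in the first phase, clusters and endpoints then arrive via CDS/EDS, and listeners and routes via LDS/RDS:

node: { id: demo-node, cluster: demo-cluster }

static_resources:
  clusters:
  - name: xds_cluster                  # static cluster, initialized in the first phase
    type: STRICT_DNS
    connect_timeout: 1s
    typed_extension_protocol_options:
      envoy.extensions.upstreams.http.v3.HttpProtocolOptions:
        "@type": type.googleapis.com/envoy.extensions.upstreams.http.v3.HttpProtocolOptions
        explicit_http_config:
          http2_protocol_options: {}   # the xDS gRPC stream requires HTTP/2
    load_assignment:
      cluster_name: xds_cluster
      endpoints:
      - lb_endpoints:
        - endpoint:
            address:
              socket_address: { address: xds.example.com, port_value: 18000 }

dynamic_resources:
  ads_config:
    api_type: GRPC
    transport_api_version: V3
    grpc_services:
    - envoy_grpc: { cluster_name: xds_cluster }
  cds_config: { ads: {} }              # clusters (and their EDS endpoints) come over ADS
  lds_config: { ads: {} }              # listeners (and any embedded RDS) come over ADS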

Data flow

Overall, the filter flow in Envoy can be summarized as: a request arrives at a Listener -> it passes through a chain of filters -> it is forwarded to the upstream service -> the upstream response comes back -> it passes through the same filter chain -> the response is sent back to the client.

Forwarding the request to the upstream service is handled by the proxy module, which manages communication with the upstream: establishing connections, sending requests, receiving responses. Once a request has been through all of the filters, the proxy forwards it to the appropriate upstream service and relays the response back to the client. The proxy module thus plays the key role of making sure requests reach the target service and responses make it back.

Listener analysis

Listener flow

  1. Listener configuration: the Envoy configuration can declare one or more listeners, each specifying the address and port Envoy should listen on and the filter chains that apply to that listener.

  2. Accepting connections: when Envoy starts, each listener begins listening on its address and port; when a new connection is established, the listener accepts it.

  3. Filter chain processing: once a connection is accepted, it passes through the filters in the order configured on the listener; each filter can act on the connection, for example decryption, authentication, or traffic control.

To capture the traffic of interest on the listener's port, iptables rules (or similar mechanisms) are often configured to redirect that traffic to the corresponding listener for uniform handling.

Filter analysis

Listener filters operate on the listening socket. When a connection arrives, libevent fires a read event; accept() on the listening socket returns the connection socket, which is wrapped in a ConnectionSocket and finally passed to ActiveListener::onAccept as its argument.

  1. Create the filter chain.

  2. continueFilterChain runs the filter chain.

  3. If a filter returns StopIteration, a timer is started; if continueFilterChain is not called again before it fires, the socket is closed.

  4. After a filter has returned StopIteration, it resumes the remaining filters by calling continueFilterChain.

For example, the proxy_protocol listener filter registers a read event when it accepts a connection and returns StopIteration while it reads the PROXY protocol header from the socket; only when data becomes readable and the header has been parsed does it call continueFilterChain to run the remaining filters.

void ConnectionHandlerImpl::ActiveListener::onAccept(
    Network::ConnectionSocketPtr&& socket, bool hand_off_restored_destination_connections) {
  auto active_socket = std::make_unique<ActiveSocket>(*this, std::move(socket),
                                                      hand_off_restored_destination_connections);

  // Create and run the filters
  config_.filterChainFactory().createListenerFilterChain(*active_socket);
  active_socket->continueFilterChain(true);

  // Move active_socket to the sockets_ list if filter iteration needs to continue later.
  // Otherwise we let active_socket be destructed when it goes out of scope.
  if (active_socket->iter_ != active_socket->accept_filters_.end()) {
    // Start a timer so that a filter cannot stall the chain indefinitely
    active_socket->startTimer();
    active_socket->moveIntoListBack(std::move(active_socket), sockets_);
  }
}

// If the timer fires, remove the current socket from the socket list
void ConnectionHandlerImpl::ActiveSocket::onTimeout() {
  listener_.stats_.downstream_pre_cx_timeout_.inc();
  ASSERT(inserted());
  unlink();
}

void ConnectionHandlerImpl::ActiveSocket::startTimer() {
  if (listener_.listener_filters_timeout_.count() > 0) {
    timer_ = listener_.parent_.dispatcher_.createTimer([this]() -> void { onTimeout(); });
    timer_->enableTimer(listener_.listener_filters_timeout_);
  }
}

L4 filter execution chain

  1. After the downstream connection is established, the filters are created and then initialized.

  2. onNewConnection is called.

  3. onData is called.

bool FilterManagerImpl::initializeReadFilters() {
  if (upstream_filters_.empty()) {
    return false;
  }
  // Once initialization is done, run the filters from the beginning
  onContinueReading(nullptr);
  return true;
}

// When called with nullptr, start executing the filters from the beginning.
// Mark initialized_ as true.
// Call onNewConnection; if it returns Stop, execution halts here
// until the filter resumes it via ReadFilterCallbacks::onContinueReading.
void FilterManagerImpl::onContinueReading(ActiveReadFilter* filter) {
  std::list<ActiveReadFilterPtr>::iterator entry;
  if (!filter) {
    entry = upstream_filters_.begin();
  } else {
    entry = std::next(filter->entry());
  }

  for (; entry != upstream_filters_.end(); entry++) {
    if (!(*entry)->initialized_) {
      (*entry)->initialized_ = true;
      FilterStatus status = (*entry)->filter_->onNewConnection();
      if (status == FilterStatus::StopIteration) {
        return;
      }
    }

    BufferSource::StreamBuffer read_buffer = buffer_source_.getReadBuffer();
    if (read_buffer.buffer.length() > 0 || read_buffer.end_stream) {
      FilterStatus status = (*entry)->filter_->onData(read_buffer.buffer, read_buffer.end_stream);
      if (status == FilterStatus::StopIteration) {
        return;
      }
    }
  }
}

Example: with three filters, if any callback of any filter returns StopIteration, the whole chain stops and waits for onContinueReading before the next callback can run.

FilterA::onNewConnection FilterA::onData

FilterB::onNewConnection FilterB::onData

FilterC::onNewConnection FilterC::onData

The execution order is FilterA::onNewConnection -> FilterA::onData -> FilterB::onNewConnection -> FilterB::onData -> FilterC::onNewConnection -> FilterC::onData. If any callback returns StopIteration, the chain does not advance until the corresponding filter calls onContinueReading. This has a side effect: while the chain is stopped, the network layer keeps receiving data into an internal buffer, so memory grows. For this reason the TCP proxy disables reading in onNewConnection, establishes the upstream connection first, and re-enables reading only after that connection is up, preventing memory from being exhausted.

The listener filters implemented so far include original_dst, original_src, proxy_protocol, tls_inspector, etc.; user-defined filters are also supported.

original_dst filter

This filter is typically used when traffic is redirected to Envoy via iptables or TPROXY, which loses the original destination address; that information can still be recovered from the socket so that Envoy can forward the connection accordingly.

  1. Read the original destination address from the socket (getsockopt(fd, SOL_IP, SO_ORIGINAL_DST, &orig_addr, &addr_len)).

  2. Set the socket's restored local address (restore_local_address) to the original destination.

Network::FilterStatus OriginalDstFilter::onAccept(Network::ListenerFilterCallbacks& cb) {
  ENVOY_LOG(debug, "original_dst: New connection accepted");
  Network::ConnectionSocket& socket = cb.socket();
  const Network::Address::Instance& local_address = *socket.localAddress();

  if (local_address.type() == Network::Address::Type::Ip) {
    Network::Address::InstanceConstSharedPtr original_local_address =
        getOriginalDst(socket.ioHandle().fd());

    // A listener that has the use_original_dst flag set to true can still receive
    // connections that are NOT redirected using iptables. If a connection was not redirected,
    // the address returned by getOriginalDst() matches the local address of the new socket.
    // In this case the listener handles the connection directly and does not hand it off.
    if (original_local_address) {
      // Restore the local address to the original one.
      socket.restoreLocalAddress(original_local_address);
    }
  }

  return Network::FilterStatus::Continue;
}
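
To enable it on a listener, a configuration sketch (v3 API; the surrounding listener definition is omitted) could look like this:

listener_filters:
- name: envoy.filters.listener.original_dst
  typed_config:
    "@type": type.googleapis.com/envoy.extensions.filters.listener.original_dst.v3.OriginalDst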

original_src filter

L3/L4 transparency means that the source IP (L3) and the source port (L4) remain visible. The purpose of this filter is to carry the original source address upstream, so that the upstream can see the real client IP and port.
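
A configuration sketch (v3 API; the mark value is an arbitrary example used so the marked upstream packets can be routed back through Envoy):

listener_filters:
- name: envoy.filters.listener.original_src
  typed_config:
    "@type": type.googleapis.com/envoy.extensions.filters.listener.original_src.v3.OriginalSrc
    mark: 123   # optional SO_MARK applied to upstream packets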

proxy_protocol filter

After the connection is established, the client sends a small preamble carrying the original source address and port.

// After the connection is established, register a read event and read the data that was sent.
Network::FilterStatus Filter::onAccept(Network::ListenerFilterCallbacks& cb) {
  ENVOY_LOG(debug, "proxy_protocol: New connection accepted");
  Network::ConnectionSocket& socket = cb.socket();
  ASSERT(file_event_.get() == nullptr);
  file_event_ =
      cb.dispatcher().createFileEvent(socket.ioHandle().fd(),
                                      [this](uint32_t events) {
                                        ASSERT(events == Event::FileReadyType::Read);
                                        onRead();
                                      },
                                      Event::FileTriggerType::Edge, Event::FileReadyType::Read);
  cb_ = &cb;
  return Network::FilterStatus::StopIteration;
}

void Filter::onRead() {
  try {
    onReadWorker();
  } catch (const EnvoyException& ee) {
    config_->stats_.downstream_cx_proxy_proto_error_.inc();
    cb_->continueFilterChain(false);
  }
}

// Read the proxy header. The number of buffered bytes is obtained with ioctl(fd, FIONREAD, &bytes_avail),
// and the data is inspected with MSG_PEEK rather than a plain read: if this turns out not to be a
// PROXY protocol header, the data must not have been consumed already.

void Filter::onReadWorker() {
  Network::ConnectionSocket& socket = cb_->socket();

  if ((!proxy_protocol_header_.has_value() && !readProxyHeader(socket.ioHandle().fd())) ||
      (proxy_protocol_header_.has_value() && !parseExtensions(socket.ioHandle().fd()))) {
    // We return if a) we do not yet have the header, or b) we have the header but not yet all
    // the extension data. In both cases we'll be called again when the socket is ready to read
    // and pick up where we left off.
    return;
  }
  ....
  // After reading completes, act on the recovered source address information.

  // Only set the local address if it really changed, and mark it as address being restored.
  if (*proxy_protocol_header_.value().local_address_ != *socket.localAddress()) {
    socket.restoreLocalAddress(proxy_protocol_header_.value().local_address_);
  }
  socket.setRemoteAddress(proxy_protocol_header_.value().remote_address_);
  ....
}

TLS Inspector filter

TLS Inspector listener filter allows detecting whether the transport appears to be TLS or plaintext, and if it is TLS, it detects the Server Name Indication and/or Application-Layer Protocol Negotiation from the client. This can be used to select a FilterChain via the server_names and/or application_protocols of a FilterChainMatch.

  1. Register for read events and wait for data to arrive.

  2. Parse the TLS ClientHello.

  3. If TLS information is found, set the TransportSocket to TLS (a configuration sketch follows below).
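
As a hedged sketch (v3 API; example.com and the echo chains are placeholders, not taken from the article) of using tls_inspector to select a filter chain by SNI:

listener_filters:
- name: envoy.filters.listener.tls_inspector
  typed_config:
    "@type": type.googleapis.com/envoy.extensions.filters.listener.tls_inspector.v3.TlsInspector
filter_chains:
- filter_chain_match:
    server_names: ["example.com"]        # matched against the SNI detected by tls_inspector
  filters:
  - name: envoy.filters.network.echo     # placeholder chain for this SNI
    typed_config:
      "@type": type.googleapis.com/envoy.extensions.filters.network.echo.v3.Echo
- filters:                               # fallback chain (no filter_chain_match)
  - name: envoy.filters.network.echo
    typed_config:
      "@type": type.googleapis.com/envoy.extensions.filters.network.echo.v3.Echo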

Cluster analysis

Definition of a cluster

In Envoy a Cluster represents a group of upstream hosts. It carries two kinds of information: the cluster-level configuration, such as the cluster's name, the connection timeout for its hosts, the load-balancing policy, the protocol used to connect, and so on; and the list of hosts that belong to the cluster.

In Istio terms, a service corresponds to a cluster in Envoy, and the upstream Pods selected by the service's label selector correspond to that cluster's host list.

For example:

 clusters:
  - name: statsd
    type: STATIC
    connect_timeout: 0.25s
    lb_policy: ROUND_ROBIN
    load_assignment:
      cluster_name: statsd
      endpoints:
      - lb_endpoints:
        - endpoint:
            address:
              socket_address:
                address: 127.0.0.1
                port_value: 8125
                protocol: TCP

The YAML above defines a cluster named statsd with ROUND_ROBIN load balancing, a connect timeout of 0.25s, a single host, and type STATIC. From this YAML Envoy creates a Cluster object. That object is not one generic class; the type field selects the factory function that constructs the corresponding Cluster subtype.

A STRICT_DNS cluster obtains its host list by resolving the configured domain name via DNS (see the sketch below), while an EDS cluster obtains it by sending an EDS request to the control plane. However the hosts are obtained, they all end up in an envoy::config::endpoint::v3::ClusterLoadAssignment protobuf message, shown next.
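
As a sketch of the DNS variant (service.example.com and the cluster name are placeholders):

clusters:
- name: dns_service
  type: STRICT_DNS                 # periodically re-resolve the name and refresh the host list
  connect_timeout: 1s
  lb_policy: ROUND_ROBIN
  load_assignment:
    cluster_name: dns_service
    endpoints:
    - lb_endpoints:
      - endpoint:
          address:
            socket_address: { address: service.example.com, port_value: 80 }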

message ClusterLoadAssignment {
  // Load balancing policy settings.
  // [#next-free-field: 6]
  message Policy {
    // [#not-implemented-hide:]
    message DropOverload {
      // Identifier for the policy specifying the drop.
      string category = 1 [(validate.rules).string = {min_bytes: 1}];

      // Percentage of traffic that should be dropped for the category.
      type.FractionalPercent drop_percentage = 2;
    }

    reserved 1;

    // Action to trim the overall incoming traffic to protect the upstream
    // hosts. This action allows protection in case the hosts are unable to
    // recover from an outage, or unable to autoscale or unable to handle
    // incoming traffic volume for any reason.
    //
    // At the client each category is applied one after the other to generate
    // the 'actual' drop percentage on all outgoing traffic. For example:
    //
    // .. code-block:: json
    //
    //  { "drop_overloads": [
    //      { "category": "throttle", "drop_percentage": 60 }
    //      { "category": "lb", "drop_percentage": 50 }
    //  ]}
    //
    // The actual drop percentages applied to the traffic at the clients will be
    //    "throttle"_drop = 60%
    //    "lb"_drop = 20%  // 50% of the remaining 'actual' load, which is 40%.
    //    actual_outgoing_load = 20% // remaining after applying all categories.
    // [#not-implemented-hide:]
    repeated DropOverload drop_overloads = 2;

    // Priority levels and localities are considered overprovisioned with this
    // factor (in percentage). This means that we don't consider a priority
    // level or locality unhealthy until the percentage of healthy hosts
    // multiplied by the overprovisioning factor drops below 100.
    // With the default value 140(1.4), Envoy doesn't consider a priority level
    // or a locality unhealthy until their percentage of healthy hosts drops
    // below 72%. For example:
    //
    // .. code-block:: json
    //
    //  { "overprovisioning_factor": 100 }
    //
    // Read more at :ref:`priority levels <arch_overview_load_balancing_priority_levels>` and
    // :ref:`localities <arch_overview_load_balancing_locality_weighted_lb>`.
    google.protobuf.UInt32Value overprovisioning_factor = 3 [(validate.rules).uint32 = {gt: 0}];

    // The max time until which the endpoints from this assignment can be used.
    // If no new assignments are received before this time expires the endpoints
    // are considered stale and should be marked unhealthy.
    // Defaults to 0 which means endpoints never go stale.
    google.protobuf.Duration endpoint_stale_after = 4 [(validate.rules).duration = {gt {}}];

    // The flag to disable overprovisioning. If it is set to true,
    // :ref:`overprovisioning factor
    // <arch_overview_load_balancing_overprovisioning_factor>` will be ignored
    // and Envoy will not perform graceful failover between priority levels or
    // localities as endpoints become unhealthy. Otherwise Envoy will perform
    // graceful failover as :ref:`overprovisioning factor
    // <arch_overview_load_balancing_overprovisioning_factor>` suggests.
    // [#not-implemented-hide:]
    bool disable_overprovisioning = 5 [deprecated = true];
  }

  // Name of the cluster. This will be the :ref:`service_name
  // <envoy_api_field_Cluster.EdsClusterConfig.service_name>` value if specified
  // in the cluster :ref:`EdsClusterConfig
  // <envoy_api_msg_Cluster.EdsClusterConfig>`.
  string cluster_name = 1 [(validate.rules).string = {min_bytes: 1}];

  // List of endpoints to load balance to.
  repeated endpoint.LocalityLbEndpoints endpoints = 2;

  // Map of named endpoints that can be referenced in LocalityLbEndpoints.
  // [#not-implemented-hide:]
  map<string, endpoint.Endpoint> named_endpoints = 5;

  // Load balancing policy settings.
  Policy policy = 4;
}

A cluster contains multiple LocalityLbEndpoints; each LocalityLbEndpoints carries a Locality, a priority, a locality weight, and a set of LbEndpoint entries; each LbEndpoint is one host together with its metadata and weight.

message LocalityLbEndpoints {
  // Identifies location of where the upstream hosts run.
  core.Locality locality = 1;

  // The group of endpoints belonging to the locality specified.
  repeated LbEndpoint lb_endpoints = 2;

  // Optional: Per priority/region/zone/sub_zone weight; at least 1. The load
  // balancing weight for a locality is divided by the sum of the weights of all
  // localities  at the same priority level to produce the effective percentage
  // of traffic for the locality. The sum of the weights of all localities at
  // the same priority level must not exceed uint32_t maximal value (4294967295).
  //
  // Locality weights are only considered when :ref:`locality weighted load
  // balancing <arch_overview_load_balancing_locality_weighted_lb>` is
  // configured. These weights are ignored otherwise. If no weights are
  // specified when locality weighted load balancing is enabled, the locality is
  // assigned no load.
  google.protobuf.UInt32Value load_balancing_weight = 3 [(validate.rules).uint32 = {gte: 1}];

  // Optional: the priority for this LocalityLbEndpoints. If unspecified this will
  // default to the highest priority (0).
  //
  // Under usual circumstances, Envoy will only select endpoints for the highest
  // priority (0). In the event all endpoints for a particular priority are
  // unavailable/unhealthy, Envoy will fail over to selecting endpoints for the
  // next highest priority group.
  //
  // Priorities should range from 0 (highest) to N (lowest) without skipping.
  uint32 priority = 5 [(validate.rules).uint32 = {lte: 128}];

  // Optional: Per locality proximity value which indicates how close this
  // locality is from the source locality. This value only provides ordering
  // information (lower the value, closer it is to the source locality).
  // This will be consumed by load balancing schemes that need proximity order
  // to determine where to route the requests.
  // [#not-implemented-hide:]
  google.protobuf.UInt32Value proximity = 6;
}

Primary clusters and secondary clusters

Envoy builds two kinds of clusters, primary clusters and secondary clusters. The distinction mainly matters for dynamic load balancing and service discovery; the two differ in how they are managed and in how dynamic their membership is.

Primary clusters

Concept

  • A primary cluster is parsed and loaded when Envoy starts; it is usually statically defined (though its endpoints may still be discovered dynamically).

  • These clusters normally do not update themselves; they change when Envoy reloads its configuration.

Features

  • Can be static (STATIC) or dynamic (EDS, DNS).

  • Higher priority: they are the first clusters Envoy parses and initializes.

  • Changes require a configuration reload, or rely on dynamic endpoint discovery (EDS) for updates.

  • Suited to long-lived, stable backends such as databases or monitoring systems.

Example

clusters:
  - name: primary_cluster
    type: STATIC
    load_assignment:
      cluster_name: primary_cluster
      endpoints:
        - lb_endpoints:
            - endpoint:
                address:
                  socket_address:
                    address: 10.0.0.1
                    port_value: 80

Secondary clusters

Concept

  • Secondary clusters are added and managed at runtime through xDS (e.g. CDS, EDS).

  • They can be added, removed, or modified dynamically at runtime, without reloading Envoy.

Features

  • Fully driven by xDS discovery (CDS, EDS); they are not loaded statically at startup.

  • Much more dynamic, suited to services that change frequently (such as microservices).

  • Suited to large-scale, mutable service discovery, for example Pod churn in Kubernetes.

Example (CDS dynamic discovery)

dynamic_resources:
  cds_config:
    ads: {}

Cluster construction

Whether primary or secondary, every cluster ultimately goes through loadCluster, which turns the Cluster protobuf into a Cluster object. The difference between the two is the added_via_api flag: false for primary clusters, true for secondary ones. The flag records whether the cluster was obtained via the API, and primary clusters clearly are not.

absl::StatusOr<ClusterManagerImpl::ClusterDataPtr>
ClusterManagerImpl::loadCluster(const envoy::config::cluster::v3::Cluster& cluster,
                                const uint64_t cluster_hash, const std::string& version_info,
                                bool added_via_api, const bool required_for_ads,
                                ClusterMap& cluster_map)

This method mainly does the following:

1. Create the Cluster and the ThreadAwareLoadBalancer from the ClusterManagerFactory and the Cluster protobuf

source/common/upstream/cluster_manager_impl.cc

absl::StatusOr<std::pair<ClusterSharedPtr, ThreadAwareLoadBalancerPtr>>
      new_cluster_pair_or_error =
          factory_.clusterFromProto(cluster, *this, outlier_event_logger_, added_via_api);

Cluster is the abstraction of a cluster, and ThreadAwareLoadBalancer is the abstraction of that cluster's load balancer; this load balancer is thread-aware. Custom cluster implementations have to provide it themselves. Currently only three cluster types in Envoy, the dynamic forward proxy, aggregate, and redis clusters, implement the ThreadAwareLoadBalancer interface with their own dedicated load balancer; all other clusters use one of Envoy's built-in standard load balancer implementations. For example, the aggregate cluster's factory below creates an AggregateThreadAwareLoadBalancer, a load balancer specific to that cluster type.

source/extensions/clusters/aggregate/cluster.cc

absl::StatusOr<std::pair<Upstream::ClusterImplBaseSharedPtr, Upstream::ThreadAwareLoadBalancerPtr>>
ClusterFactory::createClusterWithConfig(
    const envoy::config::cluster::v3::Cluster& cluster,
    const envoy::extensions::clusters::aggregate::v3::ClusterConfig& proto_config,
    Upstream::ClusterFactoryContext& context) {
  absl::Status creation_status = absl::OkStatus();
  auto new_cluster =
      std::shared_ptr<Cluster>(new Cluster(cluster, proto_config, context, creation_status));
  RETURN_IF_NOT_OK(creation_status);
  auto lb = std::make_unique<AggregateThreadAwareLoadBalancer>(*new_cluster);
  return std::make_pair(new_cluster, std::move(lb));
}

2. Set up callbacks such as the healthChecker and the outlierDetector

 ASSERT(state_ == State::WaitingToStartSecondaryInitialization ||
         state_ == State::CdsInitialized ||
         state_ == State::WaitingForPrimaryInitializationToComplete);
  ENVOY_LOG(debug, "maybe finish initialize primary init clusters empty: {}",
            primary_init_clusters_.empty());
  // If we are still waiting for primary clusters to initialize, do nothing.
  if (!primary_init_clusters_.empty()) {
    return;
  } else if (state_ == State::WaitingForPrimaryInitializationToComplete) {
    state_ = State::WaitingToStartSecondaryInitialization;
    if (primary_clusters_initialized_callback_) {
      primary_clusters_initialized_callback_();
    }
    return;
  }

First it checks whether the primary clusters have finished initializing. The marker for that is primary_init_clusters_ being empty: all primary clusters are put into this list at load time, the list is then walked to initialize each one, and a cluster is removed from the list once its initialization completes, so an empty list means primary initialization is done.

void ClusterManagerInitHelper::addCluster(ClusterManagerCluster& cm_cluster) {
  // See comments in ClusterManagerImpl::addOrUpdateCluster() for why this is only called during
  // server initialization.
  ASSERT(state_ != State::AllClustersInitialized);

  const auto initialize_cb = [&cm_cluster, this] {
    onClusterInit(cm_cluster);
    cm_cluster.cluster().info()->configUpdateStats().warming_state_.set(0);
  };
  Cluster& cluster = cm_cluster.cluster();

  cluster.info()->configUpdateStats().warming_state_.set(1);
  if (cluster.initializePhase() == Cluster::InitializePhase::Primary) {
    // Remove the previous cluster before the cluster object is destroyed.
    primary_init_clusters_.insert_or_assign(cm_cluster.cluster().info()->name(), &cm_cluster);
    cluster.initialize(initialize_cb);
  } else {
    ASSERT(cluster.initializePhase() == Cluster::InitializePhase::Secondary);
    // Remove the previous cluster before the cluster object is destroyed.
    secondary_init_clusters_.insert_or_assign(cm_cluster.cluster().info()->name(), &cm_cluster);
    if (started_secondary_initialize_) {
      // This can happen if we get a second CDS update that adds new clusters after we have
      // already started secondary init. In this case, just immediately initialize.
      cluster.initialize(initialize_cb);
    }
  }

  ENVOY_LOG(debug, "cm init: adding: cluster={} primary={} secondary={}", cluster.info()->name(),
            primary_init_clusters_.size(), secondary_init_clusters_.size());
}

void ClusterManagerInitHelper::onClusterInit(ClusterManagerCluster& cluster) {
  ASSERT(state_ != State::AllClustersInitialized);
  per_cluster_init_callback_(cluster);
  removeCluster(cluster);
}

void ClusterManagerInitHelper::removeCluster(ClusterManagerCluster& cluster) {
  if (state_ == State::AllClustersInitialized) {
    return;
  }

  // There is a remote edge case where we can remove a cluster via CDS that has not yet been
  // initialized. When called via the remove cluster API this code catches that case.
  absl::flat_hash_map<std::string, ClusterManagerCluster*>* cluster_map;
  if (cluster.cluster().initializePhase() == Cluster::InitializePhase::Primary) {
    cluster_map = &primary_init_clusters_;
  } else {
    ASSERT(cluster.cluster().initializePhase() == Cluster::InitializePhase::Secondary);
    cluster_map = &secondary_init_clusters_;
  }

  // It is possible that the cluster we are removing has already been initialized, and is not
  // present in the initializer map. If so, this is fine as a CDS update may happen for a
  // cluster with the same name. See the case "UpdateAlreadyInitialized" of the
  // target //test/common/upstream:cluster_manager_impl_test.
  auto iter = cluster_map->find(cluster.cluster().info()->name());
  if (iter != cluster_map->end() && iter->second == &cluster) {
    cluster_map->erase(iter);
  }
  ENVOY_LOG(debug, "cm init: init complete: cluster={} primary={} secondary={}",
            cluster.cluster().info()->name(), primary_init_clusters_.size(),
            secondary_init_clusters_.size());
  maybeFinishInitialize();
}

3. If all primary clusters are initialized, check whether secondary cluster initialization is in progress

A non-empty secondary_init_clusters_ means the secondary clusters have not yet started or finished initializing. If started_secondary_initialize_ is false at this point, initialization has not started, and initializeSecondaryClusters is called to actually begin it.

// If we are still waiting for secondary clusters to initialize, see if we need to first call
  // initialize on them. This is only done once.
  ENVOY_LOG(debug, "maybe finish initialize secondary init clusters empty: {}",
            secondary_init_clusters_.empty());
  if (!secondary_init_clusters_.empty()) {
    if (!started_secondary_initialize_) {
      ENVOY_LOG(info, "cm init: initializing secondary clusters");
      // If the first CDS response doesn't have any primary cluster, ClusterLoadAssignment
      // should be already paused by CdsApiImpl::onConfigUpdate(). Need to check that to
      // avoid double pause ClusterLoadAssignment.
      Config::ScopedResume maybe_resume_eds_leds_sds;
      if (cm_.adsMux()) {
        const std::vector<std::string> paused_xds_types{
            Config::getTypeUrl<envoy::config::endpoint::v3::ClusterLoadAssignment>(),
            Config::getTypeUrl<envoy::config::endpoint::v3::LbEndpoint>(),
            Config::getTypeUrl<envoy::extensions::transport_sockets::tls::v3::Secret>()};
        maybe_resume_eds_leds_sds = cm_.adsMux()->pause(paused_xds_types);
      }
      initializeSecondaryClusters();
    }
    return;

4. Initialize CDS; without it not all clusters are known, so secondary cluster initialization cannot proceed

  // At this point, if we are doing static init, and we have CDS, start CDS init. Otherwise, move
  // directly to initialized.
  started_secondary_initialize_ = false;
  ENVOY_LOG(debug, "maybe finish initialize cds api ready: {}", cds_ != nullptr);
  if (state_ == State::WaitingToStartSecondaryInitialization && cds_) {
    ENVOY_LOG(info, "cm init: initializing cds");
    state_ = State::WaitingToStartCdsInitialization;
    cds_->initialize();
  } else {
    ENVOY_LOG(info, "cm init: all clusters initialized");
    state_ = State::AllClustersInitialized;
    if (initialized_callback_) {
      initialized_callback_();
    }
  }

Once CDS is initialized it sends an xDS request to the control plane for all clusters. When they have all been received, the callback registered on CDS fires and triggers maybeFinishInitialize again, which then takes the path described in step 3.

void ClusterManagerInitHelper::setCds(CdsApi* cds) {
  ASSERT(state_ == State::Loading);
  cds_ = cds;
  if (cds_) {
    cds_->setInitializedCb([this]() -> void {
      ASSERT(state_ == State::WaitingToStartCdsInitialization);
      state_ = State::CdsInitialized;
      maybeFinishInitialize();
    });
  }
}

Cluster synchronization across threads

When a cluster finishes initializing, this method is invoked from its callback to perform additional per-cluster setup. It registers some callbacks and finally triggers the thread-local cluster update, so that every worker thread ends up with the latest cluster contents.

void ClusterManagerImpl::onClusterInit(ClusterManagerCluster& cm_cluster)

When a cluster finishes dynamic initialization or is updated at runtime, all thread-local state must be refreshed so that every thread sees the newest cluster:

void ClusterManagerImpl::postThreadLocalClusterUpdate(ClusterManagerCluster& cm_cluster,
                                                      ThreadLocalClusterUpdateParams&& params);

Set the cluster's added-or-updated bit to record that it has been updated:

bool add_or_update_cluster = false;
  if (!cm_cluster.addedOrUpdated()) {
    add_or_update_cluster = true;
    cm_cluster.setAddedOrUpdated();
  }

Next, build the parameters needed for the update: the update-hosts params, locality weights, overprovisioning factor, and so on.

The cluster priority set in each thread is updated from these parameters:

for (auto& per_priority : params.per_priority_update_params_) {
    const auto& host_set =
        cm_cluster.cluster().prioritySet().hostSetsPerPriority()[per_priority.priority_];
    per_priority.update_hosts_params_ = HostSetImpl::updateHostsParams(*host_set);
    per_priority.locality_weights_ = host_set->localityWeights();
    per_priority.weighted_priority_health_ = host_set->weightedPriorityHealth();
    per_priority.overprovisioning_factor_ = host_set->overprovisioningFactor();
  }

Each thread then obtains its ThreadLocalClusterManagerImpl and applies the update:

  tls_.runOnAllThreads([info = cm_cluster.cluster().info(), params = std::move(params),
                        add_or_update_cluster, load_balancer_factory, map = std::move(host_map),
                        cluster_initialization_object = std::move(cluster_initialization_object),
                        drop_overload](OptRef<ThreadLocalClusterManagerImpl> cluster_manager) {
    ASSERT(cluster_manager.has_value(),
           "Expected the ThreadLocalClusterManager to be set during ClusterManagerImpl creation.");

    // Cluster Manager here provided by the particular thread, it will provide
    // this allowing to make the relevant change.
    if (const bool defer_unused_clusters =
            cluster_initialization_object != nullptr &&
            !cluster_manager->thread_local_clusters_.contains(info->name()) &&
            !Envoy::Thread::MainThread::isMainThread();
        defer_unused_clusters) {
      // Save the cluster initialization object.
      ENVOY_LOG(debug, "Deferring add or update for TLS cluster {}", info->name());
      cluster_manager->thread_local_deferred_clusters_[info->name()] =
          cluster_initialization_object;

      // Invoke similar logic of onClusterAddOrUpdate.
      ThreadLocalClusterCommand command = [&cluster_manager,
                                           cluster_name = info->name()]() -> ThreadLocalCluster& {
        // If we have multiple callbacks only the first one needs to use the
        // command to initialize the cluster.
        auto existing_cluster_entry = cluster_manager->thread_local_clusters_.find(cluster_name);
        if (existing_cluster_entry != cluster_manager->thread_local_clusters_.end()) {
          return *existing_cluster_entry->second;
        }

        auto* cluster_entry = cluster_manager->initializeClusterInlineIfExists(cluster_name);
        ASSERT(cluster_entry != nullptr, "Deferred clusters initiailization should not fail.");
        return *cluster_entry;
      };
      for (auto cb_it = cluster_manager->update_callbacks_.begin();
           cb_it != cluster_manager->update_callbacks_.end();) {
        // The current callback may remove itself from the list, so a handle for
        // the next item is fetched before calling the callback.
        auto curr_cb_it = cb_it;
        ++cb_it;
        (*curr_cb_it)->onClusterAddOrUpdate(info->name(), command);
      }

    } else {
      // Broadcast
      ThreadLocalClusterManagerImpl::ClusterEntry* new_cluster = nullptr;
      if (add_or_update_cluster) {
        if (cluster_manager->thread_local_clusters_.contains(info->name())) {
          ENVOY_LOG(debug, "updating TLS cluster {}", info->name());
        } else {
          ENVOY_LOG(debug, "adding TLS cluster {}", info->name());
        }

        new_cluster = new ThreadLocalClusterManagerImpl::ClusterEntry(*cluster_manager, info,
                                                                      load_balancer_factory);
        cluster_manager->thread_local_clusters_[info->name()].reset(new_cluster);
        cluster_manager->local_stats_.clusters_inflated_.set(
            cluster_manager->thread_local_clusters_.size());
      }

      if (cluster_manager->thread_local_clusters_[info->name()]) {
        cluster_manager->thread_local_clusters_[info->name()]->setDropOverload(drop_overload);
      }
      for (const auto& per_priority : params.per_priority_update_params_) {
        cluster_manager->updateClusterMembership(
            info->name(), per_priority.priority_, per_priority.update_hosts_params_,
            per_priority.locality_weights_, per_priority.hosts_added_, per_priority.hosts_removed_,
            per_priority.weighted_priority_health_, per_priority.overprovisioning_factor_, map);
      }

      if (new_cluster != nullptr) {
        ThreadLocalClusterCommand command = [&new_cluster]() -> ThreadLocalCluster& {
          return *new_cluster;
        };
        for (auto cb_it = cluster_manager->update_callbacks_.begin();
             cb_it != cluster_manager->update_callbacks_.end();) {
          // The current callback may remove itself from the list, so a handle for
          // the next item is fetched before calling the callback.
          auto curr_cb_it = cb_it;
          ++cb_it;
          (*curr_cb_it)->onClusterAddOrUpdate(info->name(), command);
        }
      }
    }
  });

Router analysis

Envoy matches requests against the route_config rules:

  • Matching is based on host, path, and method.
  • The request is then routed to the specified cluster.
routes:
  - match:
      prefix: "/api"
    route:
      cluster: backend_service

After traffic has been directed to a cluster, Envoy needs that cluster's host list in order to load-balance across it.

  • Envoy supports multiple load-balancing policies:
    • Round Robin
    • Random
    • Least Request
    • Ring Hash (consistent hashing)
  • The load-balancing policy is defined in the cluster configuration:
clusters:
  - name: backend_service
    type: EDS
    lb_policy: ROUND_ROBIN
    load_assignment:
      cluster_name: backend_service
      endpoints:
        - lb_endpoints:
            - endpoint:
                address:
                  socket_address:
                    address: 10.0.0.1
                    port_value: 8080
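
As a sketch of the Ring Hash variant (the x-user-id header and the names are illustrative placeholders, not taken from the article), the route supplies a hash policy and the cluster switches its balancer to RING_HASH:

# route entry (inside a virtual_host): choose the hash key for consistent hashing
- match: { prefix: "/api" }
  route:
    cluster: backend_service
    hash_policy:
    - header: { header_name: x-user-id }

# cluster: use the ring-hash load balancer
- name: backend_service
  type: EDS
  lb_policy: RING_HASH
  eds_cluster_config:
    eds_config: { ads: {} }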
