Master部分

1、master初始化

  • 以node name创建一个distributed logical router
  • 创建两个load balancer用于处理east-west traffic,一个处理TCP,另一个处理UDP
  • 创建一个名为"join"的logical switch用于连接gateway router和distributed router。"join"的IP地址范围为100.64.1.0/24
  • 将distributed router和"join"相连接

2、创建management port

这部分的主要内容为master node创建一个logical switch用于连接distributed router。这个switch仅有一个logical port(A OVS internal interface),它有如下用途:

  • 通过该logical port可以用私有IP访问其他node上运行的容器
  • 当在master node上创建该port时,集群中的容器就不用通过NAT也能访问k8s daemon了
  • node可以用pod的IP地址对pod进行health-check

具体操作步骤如下:

  • 创建一个router port,并且给它分配该local_subnet(该node分配到的IP地址区间)的第一个地址。
  • 创建一个logical switch并且设置它的子网范围,名字就为node_name
  • 将该logical switch连接到router上
  • 如果br-int不存在,则创建之,并且在它上面创建一个OVS internal interface,名字为"k8s-%s" % (node_name[:11])
  • 创建一个OVN logical port,名字为"k8s-" + node_name,之后再对上文的internal interface进行配置
  • 首先启动该interface,如果该interface之前就已经存在了,就删除其上的路由和IP,再给该interface配置IP,并添加到达整个集群的路由
  • 最后,将load balancer添加到logical switch上

Minion部分

对于minion部分,如果所在的平台不是windos,则先调用_linux_init()进行初始化配置,之后再创建management port,做法和master完全一样

1、_linux_init()初始化

这部分主要是对于CNI的配置,具体操作如下:

  • 首先找到ovn-k8s-cni-overlay所在的位置cni_plugin_path,如果cni插件的可执行文件CNI_LINK_PATH/ovn_cni不存在,则构造一个软链接,因此最终k8s调用的cni插件就是ovn-k8s-cni-overlay
  • 接着写入cni network的配置文件,直接创建CNI_CONF_PATH/10-net.conf文件,具体内容如下:
        data = {
"name": "net",
"type": "ovn_cni",
"bridge": "br-int",
"isGateway": "true",
"ipMasq": "false",
"ipam": {
"type": "host-local",
"subnet": args.minion_switch_subnet
}
}

  

 Gateway部分

  • 不能同时指定参数args.physical_interface和args.bridge_interface,两个参数代表了两种方式
  • 首先判断gateway router之前是否已经有被创建,如果没有的话,创建之,且名字为"GR_%s" % (node_name)
  • 接着再将gateway router和上文中的"join"相连接
  • 在gateway router中添加静态路由,将distributed router作为nexthop,而gateway和各个distributed router构成的子网为100.64.1.0/24,连接端口的ip地址通过generate_gateway_ip()生成
  • 在distributed router中添加静态路由,并且将第一个gateway router作为默认网关
  • 为每个gateway router创建两个north-south load-balancer
  • 创建一个external switch用于连接physical interface,名字为"ext_%s" % (node_name)
  • 当我们使用网桥方案,例如将eth0绑定到网桥breth0上,首先进行操作,保证网桥mac地址不变(因为通常有新端口加入时,网桥的端口会发生变化)
  • 设置iface_id为"%s_%s" % (args.bridge_interface, node_name)
  • 创建patch port将br-int和breth0相连
  • 在external_switch上创建一个external interface,名字就为上文中的iface_id,并且地址为"unknown",外部世界通过该端口相连
  • 将gateway router上添加端口,地址是breth0的mac地址,ip是breth0的ip,名字为"rtoe-" + gateway_router
  • 在gateway router上添加默认路由,网关为参数中指定的默认网关
  • 将external_switch和gateway router相连,且端口为"etor-" + gateway_router
  • 在gateway router上设置默认的SNAT

ovn-k8s-gateway-helper部分

  • 首先获取参数,例如--physical-bridge和--physical-interface,接着确认ovs.dirs.RUNDIR和ovs.dirs.LOGDIR的路径(一般为"/var/run/openvswitch"和"/var/log/openvswitch"),最后确定k8s api server
  • 确认physical-bridge是否存在,存在则调用["ovs-vsctl", "get", "interface", $physical_interface, "ofport"]获取physical interface,且命名为physical_interface_ofport
  • 创建一对patch port将physical bridge和br-int相连,再调用命令获取physical bridge上所在的patch port的ofport,且命名为br_int_ofport
  • 调用pool.spawn(_unixctl_run)创建一个"线程",其中_unixctl_run调用ovs.unixctl.server.UnixctlServer.Create(None)创建unixctls,并在一个while死循环中持续运行unixctl_server.run()
  • 调用函数add_conntrack_rules(),在physical bridge上创建如下的flow:
    • 在table 0中,对于来自pod并且发往外部世界的包,提交给connection,从而相反的流量能够返回pods
    • 在table 0中,对于来自外部世界的包,将它通过conntrack提交到table 1,从而了解连接的状态
    • 在table 1中,将established and related connections发往pod
    • 最后,其余的连接的action都为NORMAL
  • 接着调用syn_services():
    • 从api server获取所有的services
    • 遍历所有的services,对于类型不为"NodePort"的service直接跳过,如果service中没有“ports”也直接跳过
    • 如果"ports"中不存在"nodePort"也直接跳过,如果获取的"protocol"不为"udp"和"tcp"则直接跳过,否则将"%s_%s" % (protocol, port)添加到node_ports缓存中
  • 利用'ovs-ofctl dump-flows'命令获取缓存的flow,再将过时的与node ports相关的flow删除
  • 最后一个死循环,遍历service stream,并调用service_events(event):
    • 首先从中获取event type,service name,namespace以及service type等参数,如果service type不为"NodePort"则直接跳过
    • 以"%s_%s" % (namespace, service_name)为ket获取缓存,如果event_type不为"DELETE"且缓存存在,则直接返回
    • 获取service ports,遍历之,如果port中没有"nodePort"则直接跳过,再从中获取protocol,设置protocol_dst = "%s_dst" % (protocol),node_port_key = "%s_%s" % (protocol, port)
    • 若event type为"DELETE",则先将node_port_key从nodes_port_cache中删除,再删除相应的flow
    • 否则,对于其他类型,则先将node_port_key加入node_ports_cache,再添加flow,flow的内容为["add-flow", physical_bridge, "priority=100", "in_port=physical_interface_ofport", protocol, protocol_dst=port, "action=br_int_ofport"]

Watcher部分

1、首先确定ovs的rundir和logdir以及OVN ND unix socket的路径

2、接着,确定k8s api server是否运行, cluster router,两个load balancer是否已经创建

3、首先调用pool.spawn(conn_processor.run_processor)启动一个processor用于处理之后的各种event

4、调用_create_k8s_pod_watcher(),_create_k8s_service_watcher()和_create_k8s_endpoint_watcher()创建三个watcher,pod_watcher_inst的创建过程如下所示:

pod watcher:

  • 首先调用kubernetes.watch_pods(variable.K8S_API_SERVER)创建pod_stream
  • 调用_sync_k8s_pods()进行k8s pod和ovn配置的同步
    • 先调用mode = ovn_k8s.modes.overlay.OvnNB()创建对象用以操控OVN
    • 接着调用pods = kubernetes.get_all_pods(variable.K8S_API_SEVER)获取所有的pod信息
    • 若pods不为空,则调用mode.sync_pods()进行具体的操作,总的来说,syn_pods()就是将已被删除的pod的logical_port删除

      • 创建expected_logical_ports = set()
      • 遍历pods,创建logical_port = "%s_%s" % (namespace, pod_name)获取对应pod的logical_port,并将其加入expected_logical_ports。接着从annotations中获取ip_address,接着再调用self._add_k8s_l4_port_name_cache()将其加入cache中
      • 调用existing_logical_ports = ovn_nbctl("--data=bare", "--no-heading", "--columns=name", "find", "logical_switch_port", "external_ids:pod=true").split()获取之前保存的logical_port
      • 最后,遍历existing_logical_ports - expected_logical_ports将pod已被删除的logical_ports,调用ovn_nbclt()删除
  • 调用pool.spawn(_process_func, pod_watch_inst, _create_k8s_pod_watcher)创建一个"线程"启动监听,对于service和endpoint的watcher同理。第一个参数_process_func是一个函数,而后两个参数是该函数的参数
  • _process_func(watcher, watcher_recycle_func)非常简单,就是无限循环地调用调用watcher.process(),如果发生异常,则再次调用watcher = watcher_recycle_func()重启一个watcher
  • PodWatcher的process()方法非常简单,就是调用util.process_stream(self._pod_stream, self._process_pod_event)
  • process_stream(data_stream, event_callback)首先调用line = next(data_stream)获取一个pod信息,如果没有发生异常的话,就直接调用event_callback(json.load(line))
  • _process_pod_event(self, event)
    • 首先从event中获取pod_data = event['object'],即pod信息和event_type = event['type'],即事件类型
    • 再创建cache_key = "%s_%s" % (namespace, pod_name),其实就是logical_port,再调用self._update_pod_cache(event_type, cache_key, pod_data)更新pod cache
    • 如果之前没有该pod的缓存,或者event_type为"DELETE",则调用self._send_connectivity_event(event_type, pod_name, pod_data)
    • _send_connectivity_event()首先调用ev  = ovn_k8s.processor.Event(event_type, source = pod_name, metadata = pod_data)创建一个Event类,接着调用conn_processor.get_event_queue().put(ev)将该事件加入队列中

service watcher

  • 整个过程和pod watcher是类似的,首先调用kubernetes.watch_services(variable.K8S_API_SERVER)获取service_stream
  • 调用_sync_k8s_services()进行k8s service和ovn配置的同步:
    • 先调用mode = ovn_k8s.modes.overlay.OvnNB()创建对象以操控OVN
    • 接着调用services = kubernetes.get_all_services(variables.K8S_API_SERVER)获取所有的service信息
    • 若services不为空,则调用mode.sync_services(services)进行具体的操作:
      • 对于所有的"clusterIP" services创建cluster_services = {'TCP': [], 'UDP': []},内部填充'IP:port',对于所有的NodePort service,创建nodeport_services = {'TCP': [], 'UDP': []},内部填充nodeport或者'external_ip:port'
      • 遍历从api server中获取 的service,如果service type 既不为"ClusterIP"又不为"NodePort",或者获取的"clusterIP"(service_ip)和"ports"(service_ports)为空,则跳过该service
      • 遍历service_ports,如果service_type为"NodePort"则获取port为"nodePort",否则获取port为"port",若port为空,则跳过
      • 调用service_port.get('protocol', 'TCP')获取protocol,再根据service_type和protocol分别向nodeport_services和cluster_service添加记录
      • 遍历获取的external_ips,根据protocol向nodeport_services中添加记录,记录的内容为"%s:%s" % (external_ip, port)
    • 对于OVN cluster,如果有vip在OVN load-balancer中存在,但是在当前的k8s service中不存在,则将其删除
    • 对于每个gateway,删除所有不存在于"nodeport_services"上的vip
  • 调用pool.spawn(_process_func, service_watcher_inst, _create_k8s_service_watcher)创建一个"线程"监听
  • _process_func(watcher, watcher_recycle_func)非常简单,就是无限循环地调用调用watcher.process(),如果发生异常,则再次调用watcher = watcher_recycle_func()重启一个watcher
  • ServiceWatcher的process()方法非常简单,就是调用util.process_stream(self._service_stream, self._process_service_event)
  • process_stream(data_stream, event_callback)首先调用line = next(data_stream)获取一个service信息,如果没有发生异常的话,就直接调用event_callback(json.load(line))
  • _process_service_event(self, event):
    • 创建service_data = event['object']
    • 调用cluster_ip = service_data['spec'].get('clusterIP')获取cluster_ip(vip),不过有可能在service创建的时候,还并没有分配一个cluster_ip,这时候就直接返回
    • 获取service_name,namespace和event_type,创建cache_key = "%s_%s" % (namespace, service_name),并且由此获取cache_service,并调用self._update_service_cache(event_type, cache_key, service_data)进行更新
    • 当cache_service为空,或者event_type为"DELETE",则调用self._send_connectivity_event(event_type, service_name, service_data),将其放入消息队列中
  • 最终,service的创建工作由update_vip(self, event)完成:

    • 从event中获取service_type和service_name,如果service_type不为”clusterIP“或者"nodePort"就忽略
    • 从event中获取event_type和namespace,创建cache_key = "%s_%s" % (namespace, service_name)
    • 调用self._update_service_cache(event_type, cache_key, service_data),如果event.event_type为"DELETED",则调用self._update_vip(service_data, None)
    • _update_vip(self, service_data, ips):
      • 从service_data中获取service_type, namespace, service_ip,service_ports,external_ips
      • 遍历service_ports,获取port, protocol, target_port,如果service_type为"NodePort"则调用self._create_gateways_vip(namespace, ips, port, target_port, protocol),如果service_type为"ClusterIP"则调用self._create_cluster_vip(namespace, service_ip, ips, port, target_port, protocol)
      • 遍历external_ips,将"external_ip:port"作为VIP加入gateway load-balancer,调用self._create_external_vip(namespace, external_ip, ips, port, target_port, protocol)

最新文章

  1. Linux进程间通信(九):数据报套接字 socket()、bind()、sendto()、recvfrom()、close()
  2. Asp.net Session 与Cookie的应用
  3. Nginx 笔记与总结(3)配置虚拟主机
  4. eclipse不正常编译导致错误:Access denied for user 'root'@'localhost' (using password: YES)
  5. layer.js定制弹窗
  6. 【BZOJ】【2132】圈地计划
  7. PDA库位商品出库适配算法
  8. Delphi- 内置数据库的使用例子BDE
  9. QT下int与QByteArray的转换
  10. MySQL常用命令及操作
  11. 按自己的想法去理解事件和泛型(C#)
  12. Apache+mod_encoding解决URL中文编码问题
  13. html js文字左右滚动插件
  14. Java面试题集合(比较实用)
  15. [面试]future模式
  16. php 微信自定义分享接口
  17. HashMap在JDK1.7中可能出现的并发问题
  18. Selenium Python FirefoxWebDriver处理打开保存对话框
  19. Logback 日志持久化
  20. 浅谈padding

热门文章

  1. mybatis、ibatis 和spring集成
  2. CGPathCreateMutable
  3. 【Hadoop】HA 场景下访问 HDFS JAVA API Client
  4. PHP——连接数据库初
  5. jquery checkbox勾选/取消
  6. Unity3D项目之 Survival Shooter 记录
  7. Linux性能调优、Linux集群与存储等
  8. WinForm------如何打开子窗体的同时关闭父窗体
  9. 修改tomcat配置通过域名直接访问项目首页
  10. 第四篇:使用 CUBLAS 库给矩阵运算提速