Skip to content

Commit 18c2e3b

Browse files
authored
Merge pull request #444 from Lyt99/feature/batch-describe-network-interfaces
reduce call for DescribeNetworkInstaces and DescribeVPCAttribute
2 parents 790d4b9 + 713e553 commit 18c2e3b

File tree

3 files changed

+140
-117
lines changed

3 files changed

+140
-117
lines changed

pkg/controller/service/clbv1/vgroups.go

Lines changed: 86 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"k8s.io/apimachinery/pkg/types"
99
utilerrors "k8s.io/apimachinery/pkg/util/errors"
1010
"k8s.io/apimachinery/pkg/util/intstr"
11+
"k8s.io/apimachinery/pkg/util/sets"
1112
utilfeature "k8s.io/apiserver/pkg/util/feature"
1213
ctrlCfg "k8s.io/cloud-provider-alibaba-cloud/pkg/config"
1314
"k8s.io/cloud-provider-alibaba-cloud/pkg/controller/service/reconcile/annotation"
@@ -35,6 +36,7 @@ func NewVGroupManager(kubeClient client.Client, cloud prvd.Provider) (*VGroupMan
3536
if err != nil {
3637
return nil, err
3738
}
39+
3840
return &VGroupManager{
3941
kubeClient: kubeClient,
4042
cloud: cloud,
@@ -69,36 +71,84 @@ func (mgr *VGroupManager) BuildLocalModel(reqCtx *svcCtx.RequestContext, m *mode
6971
return err
7072
}
7173

72-
vpcCIDRs, err := mgr.cloud.DescribeVpcCIDRBlock(reqCtx.Ctx, mgr.vpcId, candidates.AddressIPVersion)
74+
var vgs []model.VServerGroup
75+
containsPotentialReadyEndpoints := false
76+
for _, port := range reqCtx.Service.Spec.Ports {
77+
vg, cpr, err := mgr.buildVGroupForServicePort(reqCtx, port, candidates, m.LoadBalancerAttribute.IsUserManaged)
78+
if err != nil {
79+
return fmt.Errorf("build vgroup for port %d error: %s", port.Port, err.Error())
80+
}
81+
vgs = append(vgs, vg)
82+
containsPotentialReadyEndpoints = containsPotentialReadyEndpoints || cpr
83+
}
84+
85+
err = mgr.updateVServerGroupENIBackendID(reqCtx, vgs, candidates.AddressIPVersion)
7386
if err != nil {
74-
return fmt.Errorf("get vpc cidr error: %s", err.Error())
87+
return err
7588
}
7689

77-
vgs := make([]model.VServerGroup, len(reqCtx.Service.Spec.Ports))
78-
errs := make([]error, len(reqCtx.Service.Spec.Ports))
79-
containsPotentialReadyEndpoints := make([]bool, len(reqCtx.Service.Spec.Ports))
80-
parallel.Parallelize(reqCtx.Ctx, ctrlCfg.ControllerCFG.MaxConcurrentActions, len(vgs), func(i int) {
81-
port := reqCtx.Service.Spec.Ports[i]
82-
vg, cpr, err := mgr.buildVGroupForServicePort(reqCtx, vpcCIDRs, port, candidates, m.LoadBalancerAttribute.IsUserManaged)
83-
if err != nil {
84-
errs[i] = fmt.Errorf("build vgroup for port %d error: %s", port.Port, err.Error())
85-
return
90+
m.VServerGroups = vgs
91+
m.ContainsPotentialReadyEndpoints = containsPotentialReadyEndpoints
92+
return nil
93+
}
94+
95+
func (mgr *VGroupManager) updateVServerGroupENIBackendID(reqCtx *svcCtx.RequestContext, vgs []model.VServerGroup, ipVersion model.AddressIPVersionType) error {
96+
eniIPs := sets.Set[string]{}
97+
for _, vg := range vgs {
98+
for _, b := range vg.Backends {
99+
if b.Type == model.ENIBackendType {
100+
eniIPs.Insert(b.ServerIp)
101+
}
86102
}
87-
vgs[i] = vg
88-
containsPotentialReadyEndpoints[i] = cpr
89-
})
103+
}
90104

91-
cpr := false
92-
for _, c := range containsPotentialReadyEndpoints {
93-
if c {
94-
cpr = true
95-
break
105+
ips := eniIPs.UnsortedList()
106+
if len(ips) == 0 {
107+
return nil
108+
}
109+
result, err := mgr.cloud.DescribeNetworkInterfaces(mgr.vpcId, ips, ipVersion)
110+
if err != nil {
111+
return fmt.Errorf("call DescribeNetworkInterfaces: %s", err.Error())
112+
}
113+
114+
var vpcCIDRs []*net.IPNet
115+
for i := range vgs {
116+
var filteredBackends []model.BackendAttribute
117+
var skipIPs []string
118+
for _, b := range vgs[i].Backends {
119+
if b.Type == model.ENIBackendType {
120+
eniid, ok := result[b.ServerIp]
121+
if !ok {
122+
if vpcCIDRs == nil {
123+
vpcCIDRs, err = mgr.cloud.DescribeVpcCIDRBlock(reqCtx.Ctx, mgr.vpcId, ipVersion)
124+
if err != nil {
125+
return fmt.Errorf("call DescribeVpcCIDRBlock: %s", err.Error())
126+
}
127+
}
128+
if containsIP(vpcCIDRs, b.ServerIp) {
129+
return fmt.Errorf("can not find eniid for ip %s in vpc %s", b.ServerIp, mgr.vpcId)
130+
} else {
131+
skipIPs = append(skipIPs, b.ServerIp)
132+
continue
133+
}
134+
}
135+
b.ServerId = eniid
136+
}
137+
filteredBackends = append(filteredBackends, b)
138+
}
139+
vgs[i].Backends = filteredBackends
140+
if len(skipIPs) > 0 {
141+
reqCtx.Log.Info(fmt.Sprintf("warning: filter pods by vpc cidr %+v, podIPs=%+v", vpcCIDRs, skipIPs))
142+
reqCtx.Recorder.Event(
143+
reqCtx.Service,
144+
v1.EventTypeNormal,
145+
helper.SkipSyncBackends,
146+
fmt.Sprintf("Not sync pods [%s] whose ip is not in vpc cidrs", strings.Join(skipIPs, ",")),
147+
)
96148
}
97149
}
98150

99-
m.VServerGroups = vgs
100-
m.ContainsPotentialReadyEndpoints = cpr
101-
return utilerrors.NewAggregate(errs)
151+
return nil
102152
}
103153

104154
func (mgr *VGroupManager) BuildRemoteModel(reqCtx *svcCtx.RequestContext, m *model.LoadBalancer) error {
@@ -386,7 +436,7 @@ func isReusedVGroup(reusedVgIDs []string, vGroupId string) bool {
386436
return false
387437
}
388438

389-
func (mgr *VGroupManager) buildVGroupForServicePort(reqCtx *svcCtx.RequestContext, vpcCIDRs []*net.IPNet,
439+
func (mgr *VGroupManager) buildVGroupForServicePort(reqCtx *svcCtx.RequestContext,
390440
port v1.ServicePort, candidates *backend.EndpointWithENI, isUserManagedLB bool) (model.VServerGroup, bool, error) {
391441

392442
vg := model.VServerGroup{
@@ -446,19 +496,19 @@ func (mgr *VGroupManager) buildVGroupForServicePort(reqCtx *svcCtx.RequestContex
446496
switch candidates.TrafficPolicy {
447497
case helper.ENITrafficPolicy:
448498
reqCtx.Log.Info(fmt.Sprintf("eni mode, build backends for %s", vg.NamedKey))
449-
backends, err = mgr.buildENIBackends(reqCtx, vpcCIDRs, candidates, initialBackends, vg)
499+
backends, err = mgr.buildENIBackends(reqCtx, candidates, initialBackends, vg)
450500
if err != nil {
451501
return vg, false, fmt.Errorf("build eni backends error: %s", err.Error())
452502
}
453503
case helper.LocalTrafficPolicy:
454504
reqCtx.Log.Info(fmt.Sprintf("local mode, build backends for %s", vg.NamedKey))
455-
backends, err = mgr.buildLocalBackends(reqCtx, vpcCIDRs, candidates, initialBackends, vg)
505+
backends, err = mgr.buildLocalBackends(reqCtx, candidates, initialBackends, vg)
456506
if err != nil {
457507
return vg, false, fmt.Errorf("build local backends error: %s", err.Error())
458508
}
459509
case helper.ClusterTrafficPolicy:
460510
reqCtx.Log.Info(fmt.Sprintf("cluster mode, build backends for %s", vg.NamedKey))
461-
backends, err = mgr.buildClusterBackends(reqCtx, vpcCIDRs, candidates, initialBackends, vg)
511+
backends, err = mgr.buildClusterBackends(reqCtx, candidates, initialBackends, vg)
462512
if err != nil {
463513
return vg, false, fmt.Errorf("build cluster backends error: %s", err.Error())
464514
}
@@ -681,22 +731,22 @@ func (mgr *VGroupManager) setBackendsFromEndpointSlices(reqCtx *svcCtx.RequestCo
681731
return backends, containsPotentialReadyEndpoints, nil
682732
}
683733

684-
func (mgr *VGroupManager) buildENIBackends(reqCtx *svcCtx.RequestContext, vpcCIDRs []*net.IPNet,
685-
candidates *backend.EndpointWithENI, backends []model.BackendAttribute, vgroup model.VServerGroup) ([]model.BackendAttribute, error) {
734+
func (mgr *VGroupManager) buildENIBackends(reqCtx *svcCtx.RequestContext, candidates *backend.EndpointWithENI,
735+
backends []model.BackendAttribute, vgroup model.VServerGroup) ([]model.BackendAttribute, error) {
686736
if len(backends) == 0 {
687737
return nil, nil
688738
}
689739

690-
backends, err := updateENIBackends(reqCtx, mgr, vpcCIDRs, backends, candidates.AddressIPVersion)
740+
backends, err := updateENIBackends(reqCtx, mgr, backends, candidates.AddressIPVersion)
691741
if err != nil {
692742
return backends, err
693743
}
694744

695745
return setWeightBackends(helper.ENITrafficPolicy, backends, vgroup.VGroupWeight), nil
696746
}
697747

698-
func (mgr *VGroupManager) buildLocalBackends(reqCtx *svcCtx.RequestContext, vpcCIDRs []*net.IPNet,
699-
candidates *backend.EndpointWithENI, initBackends []model.BackendAttribute, vgroup model.VServerGroup) ([]model.BackendAttribute, error) {
748+
func (mgr *VGroupManager) buildLocalBackends(reqCtx *svcCtx.RequestContext, candidates *backend.EndpointWithENI,
749+
initBackends []model.BackendAttribute, vgroup model.VServerGroup) ([]model.BackendAttribute, error) {
700750
var (
701751
ecsBackends, eciBackends []model.BackendAttribute
702752
err error
@@ -743,7 +793,7 @@ func (mgr *VGroupManager) buildLocalBackends(reqCtx *svcCtx.RequestContext, vpcC
743793
// 2. add eci backends
744794
if len(eciBackends) != 0 {
745795
reqCtx.Log.Info("add eciBackends")
746-
eciBackends, err = updateENIBackends(reqCtx, mgr, vpcCIDRs, eciBackends, candidates.AddressIPVersion)
796+
eciBackends, err = updateENIBackends(reqCtx, mgr, eciBackends, candidates.AddressIPVersion)
747797
if err != nil {
748798
return nil, fmt.Errorf("update eci backends error: %s", err.Error())
749799
}
@@ -772,8 +822,8 @@ func removeDuplicatedECS(backends []model.BackendAttribute) []model.BackendAttri
772822

773823
}
774824

775-
func (mgr *VGroupManager) buildClusterBackends(reqCtx *svcCtx.RequestContext, vpcCIDRs []*net.IPNet,
776-
candidates *backend.EndpointWithENI, initBackends []model.BackendAttribute, vgroup model.VServerGroup) ([]model.BackendAttribute, error) {
825+
func (mgr *VGroupManager) buildClusterBackends(reqCtx *svcCtx.RequestContext, candidates *backend.EndpointWithENI,
826+
initBackends []model.BackendAttribute, vgroup model.VServerGroup) ([]model.BackendAttribute, error) {
777827
var (
778828
ecsBackends, eciBackends []model.BackendAttribute
779829
err error
@@ -823,7 +873,7 @@ func (mgr *VGroupManager) buildClusterBackends(reqCtx *svcCtx.RequestContext, vp
823873
}
824874

825875
if len(eciBackends) != 0 {
826-
eciBackends, err = updateENIBackends(reqCtx, mgr, vpcCIDRs, eciBackends, candidates.AddressIPVersion)
876+
eciBackends, err = updateENIBackends(reqCtx, mgr, eciBackends, candidates.AddressIPVersion)
827877
if err != nil {
828878
return nil, fmt.Errorf("update eci backends error: %s", err.Error())
829879
}
@@ -834,44 +884,12 @@ func (mgr *VGroupManager) buildClusterBackends(reqCtx *svcCtx.RequestContext, vp
834884
return setWeightBackends(helper.ClusterTrafficPolicy, backends, vgroup.VGroupWeight), nil
835885
}
836886

837-
func updateENIBackends(reqCtx *svcCtx.RequestContext, mgr *VGroupManager, vpcCIDRs []*net.IPNet,
838-
backends []model.BackendAttribute, ipVersion model.AddressIPVersionType) (
839-
[]model.BackendAttribute, error) {
840-
var ips []string
841-
for _, b := range backends {
842-
ips = append(ips, b.ServerIp)
843-
}
844-
result, err := mgr.cloud.DescribeNetworkInterfaces(mgr.vpcId, ips, ipVersion)
845-
if err != nil {
846-
return nil, fmt.Errorf("call DescribeNetworkInterfaces: %s", err.Error())
847-
}
848-
849-
var skipIPs []string
887+
func updateENIBackends(reqCtx *svcCtx.RequestContext, mgr *VGroupManager, backends []model.BackendAttribute, ipVersion model.AddressIPVersionType) ([]model.BackendAttribute, error) {
850888
for i := range backends {
851-
eniid, ok := result[backends[i].ServerIp]
852-
if !ok {
853-
// if ip in vpcCIDRs, it should have a eni id
854-
if containsIP(vpcCIDRs, backends[i].ServerIp) {
855-
return nil, fmt.Errorf("can not find eniid for ip %s in vpc %s", backends[i].ServerIp, mgr.vpcId)
856-
} else {
857-
skipIPs = append(skipIPs, backends[i].ServerIp)
858-
}
859-
}
860889
// for ENI backend type, port should be set to targetPort (default value), no need to update
861-
backends[i].ServerId = eniid
862890
backends[i].Type = model.ENIBackendType
863891
}
864892

865-
if len(skipIPs) > 0 {
866-
reqCtx.Log.Info(fmt.Sprintf("warning: filter pods by vpc cidr %+v, podIPs=%+v", vpcCIDRs, skipIPs))
867-
reqCtx.Recorder.Event(
868-
reqCtx.Service,
869-
v1.EventTypeNormal,
870-
helper.SkipSyncBackends,
871-
fmt.Sprintf("Not sync pods [%s] whose ip is not in vpc cidrs", strings.Join(skipIPs, ",")),
872-
)
873-
}
874-
875893
return backends, nil
876894
}
877895

pkg/controller/service/nlbv2/server_groups.go

Lines changed: 49 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"fmt"
55
"k8s.io/apimachinery/pkg/api/errors"
66
"k8s.io/apimachinery/pkg/types"
7+
"k8s.io/apimachinery/pkg/util/sets"
78
"k8s.io/cloud-provider-alibaba-cloud/pkg/controller/service/reconcile/parallel"
89
"strconv"
910
"strings"
@@ -67,17 +68,15 @@ type serverGroupAction struct {
6768
}
6869

6970
func (mgr *ServerGroupManager) BuildLocalModel(reqCtx *svcCtx.RequestContext, mdl *nlbmodel.NetworkLoadBalancer) error {
71+
var sgs []*nlbmodel.ServerGroup
7072

7173
candidates, err := reconbackend.NewEndpointWithENI(reqCtx, mgr.kubeClient)
7274
if err != nil {
7375
return err
7476
}
7577

76-
sgs := make([]*nlbmodel.ServerGroup, len(mdl.Listeners))
77-
errs := make([]error, len(mdl.Listeners))
78-
containsPotentialReadyBackends := make([]bool, len(mdl.Listeners))
79-
parallel.Parallelize(reqCtx.Ctx, ctrlCfg.ControllerCFG.MaxConcurrentActions, len(mdl.Listeners), func(i int) {
80-
lis := mdl.Listeners[i]
78+
containsPotentialReadyBackends := false
79+
for _, lis := range mdl.Listeners {
8180
sg := &nlbmodel.ServerGroup{
8281
VPCId: mgr.vpcId,
8382
ServicePort: lis.ServicePort,
@@ -87,41 +86,61 @@ func (mgr *ServerGroupManager) BuildLocalModel(reqCtx *svcCtx.RequestContext, md
8786
if candidates.AddressIPVersion == model.IPv6 {
8887
sg.AddressIPVersion = string(model.DualStack)
8988
}
90-
if lis.ListenerPort != 0 {
91-
sg.NamedKey = getServerGroupNamedKey(reqCtx.Service, sg.Protocol, lis.ServicePort)
92-
} else {
93-
sg.AnyPortEnabled = true
94-
sg.HealthCheckConfig = &nlbmodel.HealthCheckConfig{
95-
HealthCheckEnabled: tea.Bool(true),
96-
HealthCheckConnectPort: sg.ServicePort.TargetPort.IntVal,
97-
}
98-
sg.NamedKey = getAnyPortServerGroupNamedKey(reqCtx.Service, sg.Protocol, lis.StartPort, lis.EndPort)
99-
}
89+
sg.NamedKey = getServerGroupNamedKey(reqCtx.Service, sg.Protocol, lis.ServicePort)
10090
sg.ServerGroupName = sg.NamedKey.Key()
10191
if err := setServerGroupAttributeFromAnno(sg, reqCtx.Anno); err != nil {
102-
errs[i] = err
103-
return
92+
return err
10493
}
10594
cpr, err := mgr.setServerGroupServers(reqCtx, sg, candidates, mdl.LoadBalancerAttribute.IsUserManaged)
10695
if err != nil {
107-
errs[i] = fmt.Errorf("set ServerGroup for port %d error: %s", lis.ServicePort.Port, err.Error())
108-
return
96+
return fmt.Errorf("set ServerGroup for port %d error: %s", lis.ServicePort.Port, err.Error())
10997
}
110-
sgs[i] = sg
111-
containsPotentialReadyBackends[i] = cpr
112-
})
98+
sgs = append(sgs, sg)
99+
containsPotentialReadyBackends = containsPotentialReadyBackends || cpr
100+
}
113101

114-
cpr := false
115-
for _, c := range containsPotentialReadyBackends {
116-
if c {
117-
cpr = true
118-
break
119-
}
102+
err = mgr.updateServerGroupENIBackendID(reqCtx, sgs, candidates.AddressIPVersion)
103+
if err != nil {
104+
return err
120105
}
121106

122107
mdl.ServerGroups = sgs
123-
mdl.ContainsPotentialReadyEndpoints = cpr
124-
return utilerrors.NewAggregate(errs)
108+
mdl.ContainsPotentialReadyEndpoints = containsPotentialReadyBackends
109+
return nil
110+
}
111+
112+
func (mgr *ServerGroupManager) updateServerGroupENIBackendID(reqCtx *svcCtx.RequestContext, sgs []*nlbmodel.ServerGroup, ipVersion model.AddressIPVersionType) error {
113+
eniIPs := sets.Set[string]{}
114+
for _, sg := range sgs {
115+
for _, s := range sg.Servers {
116+
if s.ServerType == nlbmodel.EniServerType {
117+
eniIPs.Insert(s.ServerIp)
118+
}
119+
}
120+
}
121+
122+
ips := eniIPs.UnsortedList()
123+
if len(ips) == 0 {
124+
return nil
125+
}
126+
result, err := mgr.cloud.DescribeNetworkInterfaces(mgr.vpcId, ips, ipVersion)
127+
if err != nil {
128+
return fmt.Errorf("call DescribeNetworkInterfaces: %s", err.Error())
129+
}
130+
131+
for _, sg := range sgs {
132+
for i := range sg.Servers {
133+
if sg.Servers[i].ServerType == nlbmodel.EniServerType {
134+
eniid, ok := result[sg.Servers[i].ServerIp]
135+
if !ok {
136+
return fmt.Errorf("can not find eniid for ip %s in vpc %s", sg.Servers[i].ServerIp, mgr.vpcId)
137+
}
138+
sg.Servers[i].ServerId = eniid
139+
}
140+
}
141+
}
142+
143+
return nil
125144
}
126145

127146
func (mgr *ServerGroupManager) ListNLBServerGroups(reqCtx *svcCtx.RequestContext) ([]*nlbmodel.ServerGroup, error) {
@@ -937,23 +956,8 @@ func updateENIBackends(mgr *ServerGroupManager, backends []nlbmodel.ServerGroupS
937956
return backends, nil
938957
}
939958

940-
var ips []string
941-
for _, b := range backends {
942-
ips = append(ips, b.ServerIp)
943-
}
944-
945-
result, err := mgr.cloud.DescribeNetworkInterfaces(mgr.vpcId, ips, ipVersion)
946-
if err != nil {
947-
return nil, fmt.Errorf("call DescribeNetworkInterfaces: %s", err.Error())
948-
}
949-
950959
for i := range backends {
951-
eniid, ok := result[backends[i].ServerIp]
952-
if !ok {
953-
return nil, fmt.Errorf("can not find eniid for ip %s in vpc %s", backends[i].ServerIp, mgr.vpcId)
954-
}
955960
// for ENI backend type, port should be set to targetPort (default value), no need to update
956-
backends[i].ServerId = eniid
957961
backends[i].ServerType = nlbmodel.EniServerType
958962
}
959963
return backends, nil

0 commit comments

Comments
 (0)