Skip to content

Commit

Permalink
Merge pull request #935 from loxilb-io/egr-fix2
Browse files Browse the repository at this point in the history
PR - #877 egress support - small fixes
  • Loading branch information
UltraInstinct14 authored Jan 16, 2025
2 parents ae51e55 + ac0390c commit ad1b883
Show file tree
Hide file tree
Showing 10 changed files with 79 additions and 66 deletions.
2 changes: 1 addition & 1 deletion api/restapi/server.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 11 additions & 0 deletions common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,20 @@ const (

const BFDPort = 3784
const BFDDefRetryCount = 3

const (
// CIDefault - Default CI Instance name
CIDefault = "llb-inst0"
// CIMasterStateString - Master state string for a cluster instance
CIMasterStateString = "MASTER"
// CIBackupStateString - Backup state string for a cluster instance
CIBackupStateString = "BACKUP"
// CIFaultStateString - Fault state string for a cluster instance
CIFaultStateString = "FAULT"
// CIStopStateString - Stop state string for a cluster instance
CIStopStateString = "STOP"
// CIUnDefStateString - Undefined state string for a cluster instance
CIUnDefStateString = "NOT_DEFINED"
)

const (
Expand Down
18 changes: 9 additions & 9 deletions pkg/loxinet/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,18 +174,18 @@ func (h *CIStateH) CIStateGetInst(inst string) (string, error) {
return ci.StateStr, nil
}

return "NOT_DEFINED", errors.New("not found")
return cmn.CIUnDefStateString, errors.New("not found")
}

// CIInit - routine to initialize Cluster context
func CIInit(args CIKAArgs) *CIStateH {
var nCIh = new(CIStateH)
nCIh.StateMap = make(map[string]int)
nCIh.StateMap["MASTER"] = cmn.CIStateMaster
nCIh.StateMap["BACKUP"] = cmn.CIStateBackup
nCIh.StateMap["FAULT"] = cmn.CIStateConflict
nCIh.StateMap["STOP"] = cmn.CIStateNotDefined
nCIh.StateMap["NOT_DEFINED"] = cmn.CIStateNotDefined
nCIh.StateMap[cmn.CIMasterStateString] = cmn.CIStateMaster
nCIh.StateMap[cmn.CIBackupStateString] = cmn.CIStateBackup
nCIh.StateMap[cmn.CIFaultStateString] = cmn.CIStateConflict
nCIh.StateMap[cmn.CIStopStateString] = cmn.CIStateNotDefined
nCIh.StateMap[cmn.CIUnDefStateString] = cmn.CIStateNotDefined
nCIh.SpawnKa = args.SpawnKa
nCIh.RemoteIP = args.RemoteIP
nCIh.SourceIP = args.SourceIP
Expand All @@ -194,7 +194,7 @@ func CIInit(args CIKAArgs) *CIStateH {

if _, ok := nCIh.ClusterMap[cmn.CIDefault]; !ok {
ci := &ClusterInstance{State: cmn.CIStateNotDefined,
StateStr: "NOT_DEFINED",
StateStr: cmn.CIUnDefStateString,
Vip: net.IPv4zero,
}
nCIh.ClusterMap[cmn.CIDefault] = ci
Expand Down Expand Up @@ -534,7 +534,7 @@ func (h *CIStateH) CIStateUpdate(cm cmn.HASMod) (int, error) {

if _, ok := h.ClusterMap[cm.Instance]; !ok {
h.ClusterMap[cm.Instance] = &ClusterInstance{State: cmn.CIStateNotDefined,
StateStr: "NOT_DEFINED",
StateStr: cmn.CIUnDefStateString,
Vip: net.IPv4zero}
tk.LogIt(tk.LogDebug, "[CLUSTER] New Instance %s created\n", cm.Instance)
}
Expand All @@ -559,7 +559,7 @@ func (h *CIStateH) CIStateUpdate(cm cmn.HASMod) (int, error) {
if mh.bgp != nil {
mh.bgp.UpdateCIState(cm.Instance, ci.State, ci.Vip)
}
go mh.zr.Rules.RulesSyncToClusterState()
go mh.zr.Rules.RulesSyncToClusterState(cm.Instance, cm.State)
return ci.State, nil
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/loxinet/dpbroker.go
Original file line number Diff line number Diff line change
Expand Up @@ -577,7 +577,7 @@ func (dp *DpH) DpXsyncRPC(op DpSyncOpT, arg interface{}) int {
}
// FIXME - There is a race condition here
cIState, _ := mh.has.CIStateGetInst(tmpCti.CI)
if cIState != "MASTER" {
if cIState != cmn.CIMasterStateString {
return 0
}
}
Expand Down
10 changes: 5 additions & 5 deletions pkg/loxinet/dpebpf_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -857,9 +857,9 @@ func DpRouteMod(w *RouteDpWorkQ) int {
if tk.IsNetIPv4(w.Dst.IP.String()) {
key4 := new(rt4Key)

len, _ := w.Dst.Mask.Size()
len += 16 /* 16-bit ZoneNum + prefix-len */
key4.l.prefixlen = C.uint(len)
mlen, _ := w.Dst.Mask.Size()
mlen += 16 /* 16-bit ZoneNum + prefix-len */
key4.l.prefixlen = C.uint(mlen)
kPtr = (*[6]uint8)(getPtrOffset(unsafe.Pointer(key4),
C.sizeof_struct_bpf_lpm_trie_key))

Expand All @@ -875,8 +875,8 @@ func DpRouteMod(w *RouteDpWorkQ) int {
} else {
key6 := new(rt6Key)

len, _ := w.Dst.Mask.Size()
key6.l.prefixlen = C.uint(len)
mlen, _ := w.Dst.Mask.Size()
key6.l.prefixlen = C.uint(mlen)

k6Ptr := (*C.uchar)(getPtrOffset(unsafe.Pointer(key6),
C.sizeof_struct_bpf_lpm_trie_key))
Expand Down
8 changes: 4 additions & 4 deletions pkg/loxinet/gobgpclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -778,9 +778,9 @@ func (gbh *GoBgpH) initBgpClient() {

ciState, err := mh.has.CIStateGetInst(cmn.CIDefault)
if err == nil {
if ciState == "BACKUP" {
if ciState == cmn.CIBackupStateString {
gbh.resetBGPPolicy(true)
} else if ciState == "MASTER" {
} else if ciState == cmn.CIMasterStateString {
gbh.resetBGPPolicy(false)
}
}
Expand Down Expand Up @@ -853,9 +853,9 @@ func (gbh *GoBgpH) UpdateCIState(instance string, state int, vip net.IP) {
if update {
ciState, err := mh.has.CIStateGetInst(cmn.CIDefault)
if err == nil {
if ciState == "BACKUP" {
if ciState == cmn.CIBackupStateString {
gbh.resetBGPPolicy(true)
} else if ciState == "MASTER" {
} else if ciState == cmn.CIMasterStateString {
gbh.resetBGPPolicy(false)
}
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/loxinet/loxinet.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ func loxiNetTicker(bgpPeerMode bool) {
if mh.cloudHook != nil {
// Cleanup any cloud resources
ciState, _ := mh.has.CIStateGetInst(cmn.CIDefault)
if ciState == "MASTER" {
if ciState == cmn.CIMasterStateString {
bfdSessions, err := mh.has.CIBFDSessionGet()
if err == nil {
cleanCloudResources := true
Expand Down
35 changes: 19 additions & 16 deletions pkg/loxinet/rules.go
Original file line number Diff line number Diff line change
Expand Up @@ -911,7 +911,6 @@ func (R *RuleH) modNatEpHost(r *ruleEnt, endpoints []ruleLBEp, doAddOp bool, liv
pPort = nep.xPort
} else if r.tuples.l4Prot.val == 17 {
pType = HostProbeConnectUDP
pType = HostProbeConnectTCP // FIXME
pPort = nep.xPort
} else if r.tuples.l4Prot.val == 1 {
pType = HostProbePing
Expand Down Expand Up @@ -2177,7 +2176,7 @@ func (R *RuleH) AddFwRule(fwRule cmn.FwRuleArg, fwOptArgs cmn.FwOptArg) (int, er
if fwOptArgs.OnDefault {
state, err := mh.has.CIStateGetInst(cmn.CIDefault)
if err == nil {
if state == "BACKUP" {
if state == cmn.CIBackupStateString {
return 0, nil
}
}
Expand Down Expand Up @@ -3213,7 +3212,7 @@ func (R *RuleH) AdvRuleVIP(IP net.IP, eIP net.IP, inst string, egress bool) erro
}

ciState, _ := mh.has.CIStateGetInst(inst)
if ciState == "MASTER" {
if ciState == cmn.CIMasterStateString {
dev := fmt.Sprintf("llb-rule-%s", IP.String())
ret, _ := R.zone.L3.IfaFindAddr(dev, IP)
if ret == 0 {
Expand Down Expand Up @@ -3257,7 +3256,7 @@ func (R *RuleH) AdvRuleVIP(IP net.IP, eIP net.IP, inst string, egress bool) erro
mh.has.CIAddClusterRoute(IP.String(), false)
}

} else if ciState != "NOT_DEFINED" {
} else if ciState != cmn.CIUnDefStateString {
if utils.IsIPHostAddr(IP.String()) {
ifname := "lo"
ev, _, iface := R.zone.L3.IfaSelectAny(IP, false)
Expand Down Expand Up @@ -3297,29 +3296,33 @@ func (R *RuleH) AdvRuleVIP(IP net.IP, eIP net.IP, inst string, egress bool) erro
return nil
}

func (R *RuleH) RulesSyncToClusterState() {
func (R *RuleH) RulesSyncToClusterState(inst, ciStateStr string) {

// For Cloud integrations, there is only default instance
ciState, _ := mh.has.CIStateGetInst(cmn.CIDefault)
if mh.cloudHook != nil {
if ciState == "MASTER" {
// For Cloud integrations, certain operations are performed only on default instance state changes
if mh.cloudHook != nil && inst == cmn.CIDefault {
if ciStateStr == cmn.CIMasterStateString {
mh.cloudHook.CloudPrepareVIPNetWork()
} else if ciState == "BACKUP" {
} else if ciStateStr == cmn.CIBackupStateString {
mh.cloudHook.CloudUnPrepareVIPNetWork()
}
}

for _, eFw := range R.tables[RtFw].eMap {
if eFw.act.action.(*ruleFwOpts).opt.onDflt {
if ciState == "MASTER" || ciState != "BACKUP" {
eFw.Fw2DP(DpCreate)
} else if ciState == "BACKUP" {
eFw.Fw2DP(DpRemove)
if inst == cmn.CIDefault {
for _, eFw := range R.tables[RtFw].eMap {
if eFw.act.action.(*ruleFwOpts).opt.onDflt {
if ciStateStr == cmn.CIMasterStateString || ciStateStr != cmn.CIBackupStateString {
eFw.Fw2DP(DpCreate)
} else if ciStateStr == cmn.CIBackupStateString {
eFw.Fw2DP(DpRemove)
}
}
}
}

for vip, vipElem := range R.vipMap {
if vipElem.inst != inst {
continue
}
ip := vipElem.pVIP
if ip == nil {
ip = net.ParseIP(vip)
Expand Down
47 changes: 23 additions & 24 deletions pkg/loxinet/utils_aws.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,9 +274,8 @@ func (aws *AWSAPIStruct) CloudPrepareVIPNetWork() error {
if err != nil {
tk.LogIt(tk.LogError, "cidrBlock (%s) dissassociate failed in VPC %s:%s\n", cidrBlock, *vpcOut.Vpcs[0].VpcId, err)
return err
} else {
tk.LogIt(tk.LogInfo, "cidrBlock (%s) dissassociated from VPC %s\n", cidrBlock, *vpcOut.Vpcs[0].VpcId)
}
tk.LogIt(tk.LogInfo, "cidrBlock (%s) dissassociated from VPC %s\n", cidrBlock, *vpcOut.Vpcs[0].VpcId)
break
}
}
Expand All @@ -291,9 +290,8 @@ func (aws *AWSAPIStruct) CloudPrepareVIPNetWork() error {
if err != nil {
tk.LogIt(tk.LogError, "cidrBlock (%s) associate failed in VPC %s:%s\n", cidrBlock, vpcID, err)
return err
} else {
tk.LogIt(tk.LogError, "cidrBlock (%s) associated to VPC %s\n", cidrBlock, vpcID)
}
tk.LogIt(tk.LogError, "cidrBlock (%s) associated to VPC %s\n", cidrBlock, vpcID)
}

ointfs, err := net.Interfaces()
Expand Down Expand Up @@ -615,7 +613,7 @@ func awsGetNetworkInterface(ctx context.Context, instanceID string, vIP net.IP)
return "", errors.New("not found interface")
}

func awsCreatePrivateIp(ctx context.Context, ni string, vIP net.IP) error {
func awsCreatePrivateIP(ctx context.Context, ni string, vIP net.IP) error {
allowReassign := true
input := &ec2.AssignPrivateIpAddressesInput{
NetworkInterfaceId: &ni,
Expand Down Expand Up @@ -663,10 +661,10 @@ func awsUpdatePrivateIP(vIP net.IP, add bool) error {
return awsDeletePrivateIP(ctx, niID, vIP)
}

return awsCreatePrivateIp(ctx, niID, vIP)
return awsCreatePrivateIP(ctx, niID, vIP)
}

func awsAssociateElasticIp(vIP, eIP net.IP, add bool) error {
func awsAssociateElasticIP(vIP, eIP net.IP, add bool) error {

if intfENIName == "" {
tk.LogIt(tk.LogError, "associate elasticIP: failed to get ENI intf name\n")
Expand All @@ -688,28 +686,28 @@ func awsAssociateElasticIp(vIP, eIP net.IP, add bool) error {
niID = loxiEniID
}

eipID, eipAssociateID, err := awsGetElasticIpId(ctx, eIP)
eipID, eipAssociateID, err := awsGetElasticIPId(ctx, eIP)
if err != nil {
tk.LogIt(tk.LogError, "AWS get elastic IP failed: %v\n", err)
return err
}

tk.LogIt(tk.LogInfo, "AWS adding elastic IP : %s\n", eIP.String())
if !add {
return awsDisassociateElasticIpWithInterface(ctx, eipAssociateID)
return awsDisassociateElasticIPWithInterface(ctx, eipAssociateID)
}
return awsAssociateElasticIpWithInterface(ctx, eipID, niID, vIP)
return awsAssociateElasticIPWithInterface(ctx, eipID, niID, vIP)
}

func awsAssociateElasticIpWithInterface(ctx context.Context, eipID, niID string, privateIP net.IP) error {
func awsAssociateElasticIPWithInterface(ctx context.Context, eipID, niID string, privateIP net.IP) error {
allowReassign := true
input := &ec2.AssociateAddressInput{
AllocationId: &eipID,
NetworkInterfaceId: &niID,
AllowReassociation: &allowReassign,
}
if privateIP != nil {
if err := awsCreatePrivateIp(ctx, niID, privateIP); err != nil {
if err := awsCreatePrivateIP(ctx, niID, privateIP); err != nil {
tk.LogIt(tk.LogError, "AWS create priv IP failed: %s\n", err)
return err
}
Expand All @@ -723,14 +721,14 @@ func awsAssociateElasticIpWithInterface(ctx context.Context, eipID, niID string,
return err
}

func awsDisassociateElasticIpWithInterface(ctx context.Context, eipAssociateID string) error {
func awsDisassociateElasticIPWithInterface(ctx context.Context, eipAssociateID string) error {
_, err := ec2Client.DisassociateAddress(ctx, &ec2.DisassociateAddressInput{
AssociationId: &eipAssociateID,
})
return err
}

func awsGetElasticIpId(ctx context.Context, eIP net.IP) (string, string, error) {
func awsGetElasticIPId(ctx context.Context, eIP net.IP) (string, string, error) {
filterStr := "public-ip"
output, err := ec2Client.DescribeAddresses(ctx, &ec2.DescribeAddressesInput{
Filters: []types.Filter{
Expand All @@ -743,14 +741,14 @@ func awsGetElasticIpId(ctx context.Context, eIP net.IP) (string, string, error)
if len(output.Addresses) <= 0 {
return "", "", fmt.Errorf("not found Elastic IP %s", eIP.String())
}
var allocateId, associateId string
var allocateID, associateID string
if output.Addresses[0].AllocationId != nil {
allocateId = *output.Addresses[0].AllocationId
allocateID = *output.Addresses[0].AllocationId
}
if output.Addresses[0].AssociationId != nil {
associateId = *output.Addresses[0].AssociationId
associateID = *output.Addresses[0].AssociationId
}
return allocateId, associateId, nil
return allocateID, associateID, nil
}

// CloudAPIInit - Initialize the AWS cloud API
Expand Down Expand Up @@ -835,14 +833,15 @@ func awsImdsGetSecurityGroups(ctx context.Context) ([]string, error) {

// CloudUpdatePrivateIP - Update private IP related to an elastic IP
func (aws *AWSAPIStruct) CloudUpdatePrivateIP(vIP net.IP, eIP net.IP, add bool) error {
if vIP.Equal(eIP) { // no use EIP
if vIP.Equal(eIP) {
// Do not use EIP
return awsUpdatePrivateIP(vIP, add)
} else { // use EIP
if err := awsAssociateElasticIp(vIP, eIP, add); err != nil {
return err
}
return awsPrepDFLRoute()
}
// use EIP
if err := awsAssociateElasticIP(vIP, eIP, add); err != nil {
return err
}
return awsPrepDFLRoute()
}

// AWSCloudHookNew - Create AWS specific API hooks
Expand Down
Loading

0 comments on commit ad1b883

Please sign in to comment.