diff --git a/pkg/loxinet/dpbroker.go b/pkg/loxinet/dpbroker.go index 3865c58d..3faed762 100644 --- a/pkg/loxinet/dpbroker.go +++ b/pkg/loxinet/dpbroker.go @@ -18,13 +18,14 @@ package loxinet import ( "fmt" - cmn "github.com/loxilb-io/loxilb/common" - tk "github.com/loxilb-io/loxilib" "net" "os" "runtime/debug" "sync" "time" + + cmn "github.com/loxilb-io/loxilb/common" + tk "github.com/loxilb-io/loxilib" ) // man names constants @@ -41,10 +42,6 @@ const ( MapNameFw4 = "FW4" ) -const ( - UseRPCPeer = false -) - // error codes const ( DpErrBase = iota - 103000 @@ -451,8 +448,6 @@ type DpHookInterface interface { DpCtDel(w *DpCtInfo) int DpSockVIPAdd(w *SockVIPDpWorkQ) int DpSockVIPDel(w *SockVIPDpWorkQ) int - DpCnodeAdd(w *PeerDpWorkQ) int - DpCnodeDel(w *PeerDpWorkQ) int DpTableGC() DpCtGetAsync() DpGetLock() @@ -518,8 +513,6 @@ func (dp *DpH) DpXsyncRPC(op DpSyncOpT, arg interface{}) int { var ret int var err error - return 0 - dp.SyncMtx.Lock() defer dp.SyncMtx.Unlock() @@ -775,35 +768,27 @@ func (dp *DpH) DpWorkOnSockVIP(vsWq *SockVIPDpWorkQ) DpRetT { // DpWorkOnPeerOp - routine to work on a peer request for clustering func (dp *DpH) DpWorkOnPeerOp(pWq *PeerDpWorkQ) DpRetT { if pWq.Work == DpCreate { - if UseRPCPeer { - var newPeer DpPeer - for _, pe := range dp.Peers { - if pe.Peer.Equal(pWq.PeerIP) { - return DpCreateErr - } + var newPeer DpPeer + for _, pe := range dp.Peers { + if pe.Peer.Equal(pWq.PeerIP) { + return DpCreateErr } - newPeer.Peer = pWq.PeerIP - dp.Peers = append(dp.Peers, newPeer) - tk.LogIt(tk.LogInfo, "Added cluster-peer %s\n", newPeer.Peer.String()) - return 0 - } else { - return dp.DpHooks.DpCnodeAdd(pWq) } + newPeer.Peer = pWq.PeerIP + dp.Peers = append(dp.Peers, newPeer) + tk.LogIt(tk.LogInfo, "Added cluster-peer %s\n", newPeer.Peer.String()) + return 0 } else if pWq.Work == DpRemove { - if UseRPCPeer { - for idx := range dp.Peers { - pe := &dp.Peers[idx] - if pe.Peer.Equal(pWq.PeerIP) { - if pe.Client != nil { - dp.RPC.RPCHooks.RPCClose(pe) - } - dp.Peers = append(dp.Peers[:idx], dp.Peers[idx+1:]...) - tk.LogIt(tk.LogInfo, "Deleted cluster-peer %s\n", pWq.PeerIP.String()) - return 0 + for idx := range dp.Peers { + pe := &dp.Peers[idx] + if pe.Peer.Equal(pWq.PeerIP) { + if pe.Client != nil { + dp.RPC.RPCHooks.RPCClose(pe) } + dp.Peers = append(dp.Peers[:idx], dp.Peers[idx+1:]...) + tk.LogIt(tk.LogInfo, "Deleted cluster-peer %s\n", pWq.PeerIP.String()) + return 0 } - } else { - return dp.DpHooks.DpCnodeDel(pWq) } } diff --git a/pkg/loxinet/dpebpf_linux.go b/pkg/loxinet/dpebpf_linux.go index 3ac62691..40e82308 100644 --- a/pkg/loxinet/dpebpf_linux.go +++ b/pkg/loxinet/dpebpf_linux.go @@ -268,30 +268,14 @@ func DpEbpfSetLogLevel(logLevel tk.LogLevelT) { } // DpEbpfInit - initialize the ebpf dp subsystem -func DpEbpfInit(clusterNodes string, rssEn, egrHooks, localSockPolicy, sockMapEn bool, nodeNum int, disBPF bool, logLevel tk.LogLevelT) *DpEbpfH { +func DpEbpfInit(clusterEn, rssEn, egrHooks, localSockPolicy, sockMapEn bool, nodeNum int, disBPF bool, logLevel tk.LogLevelT) *DpEbpfH { var cfg C.struct_ebpfcfg - //cNodes := strings.Split(clusterNodes, ",") - //for i, cNode := range cNodes { - // addr := net.ParseIP(cNode) - // if addr == nil { - // continue - // } - // if utils.IsIPHostAddr(cNode) { - // continue - // } - // if i == 0 { - // cfg.cluster1 = C.CString(cNode) - // } else if i == 1 { - // cfg.cluster2 = C.CString(cNode) - // } - //} - - //if len(clusterEn) > 0 { - // cfg.have_mtrace = 1 - //} else { - // cfg.have_mtrace = 0 - //} + if clusterEn { + cfg.have_mtrace = 1 + } else { + cfg.have_mtrace = 0 + } if egrHooks { cfg.egr_hooks = 1 } else { @@ -1080,7 +1064,7 @@ func DpLBRuleMod(w *LBDpWorkQ) int { nxfa.inactive = 1 } - dat.nxfrm = C.ushort(len(w.endPoints)) + dat.nxfrm = C.uchar(len(w.endPoints)) if w.CsumDis { dat.cdis = 1 } else { @@ -1943,38 +1927,6 @@ func (e *DpEbpfH) DpSockVIPDel(w *SockVIPDpWorkQ) int { return ec } -// DpCnodeAdd - routine to work on adding a cnode -func (e *DpEbpfH) DpCnodeAdd(w *PeerDpWorkQ) int { - cnode := w.PeerIP.String() - - cnodeStr := C.CString(cnode) - defer C.free(unsafe.Pointer(cnodeStr)) - - ec := int(C.llb_add_cnode(cnodeStr)) - if ec != 0 { - *w.Status = DpCreateErr - } else { - *w.Status = 0 - } - return ec -} - -// DpCnodeDel - routine to work on deleting a cnode -func (e *DpEbpfH) DpCnodeDel(w *PeerDpWorkQ) int { - cnode := w.PeerIP.String() - - cnodeStr := C.CString(cnode) - defer C.free(unsafe.Pointer(cnodeStr)) - - ec := int(C.llb_delete_cnode(cnodeStr)) - if ec != 0 { - *w.Status = DpRemoveErr - } else { - *w.Status = 0 - } - return ec -} - //export goMapNotiHandler func goMapNotiHandler(m *mapNoti) { diff --git a/pkg/loxinet/loxinet.go b/pkg/loxinet/loxinet.go index 167d6781..5421c30c 100644 --- a/pkg/loxinet/loxinet.go +++ b/pkg/loxinet/loxinet.go @@ -292,7 +292,7 @@ func loxiNetInit() { RunCommand(MkMountCG2, false) } // Initialize the ebpf datapath subsystem - mh.dpEbpf = DpEbpfInit(opts.Opts.ClusterNodes, mh.rssEn, mh.eHooks, mh.lSockPolicy, mh.sockMapEn, mh.self, mh.disBPF, logLevel) + mh.dpEbpf = DpEbpfInit(clusterMode, mh.rssEn, mh.eHooks, mh.lSockPolicy, mh.sockMapEn, mh.self, mh.disBPF, logLevel) mh.dp = DpBrokerInit(mh.dpEbpf, rpcMode) // Initialize the security zone subsystem diff --git a/pkg/loxinet/rules.go b/pkg/loxinet/rules.go index 52e6bea2..b4905817 100644 --- a/pkg/loxinet/rules.go +++ b/pkg/loxinet/rules.go @@ -78,8 +78,7 @@ const ( // constants const ( - MaxLBEndPoints = 1500 - MaxLBEndPointsRR = 32 + MaxLBEndPoints = 24 DflLbaInactiveTries = 2 // Default number of inactive tries before LB arm is turned off MaxDflLbaInactiveTries = 100 // Max number of inactive tries before LB arm is turned off DflLbaCheckTimeout = 10 // Default timeout for checking LB arms @@ -91,7 +90,7 @@ const ( LbMaxInactiveTimeout = 24 * 3600 // Maximum inactive timeout for established sessions MaxEndPointCheckers = 4 // Maximum helpers to check endpoint health EndPointCheckerDuration = 2 // Duration at which ep-helpers will run - MaxEndPointSweeps = 40 // Maximum end-point sweeps per round + MaxEndPointSweeps = 20 // Maximum end-point sweeps per round VIPSweepDuration = 30 // Duration of periodic VIP maintenance DefaultPersistTimeOut = 10800 // Default persistent LB session timeout SnatFwMark = 0x80000000 // Snat Marker @@ -911,7 +910,7 @@ func (R *RuleH) modNatEpHost(r *ruleEnt, endpoints []ruleLBEp, doAddOp bool, liv pType = HostProbeConnectTCP pPort = nep.xPort } else if r.tuples.l4Prot.val == 17 { - //pType = HostProbeConnectUDP + pType = HostProbeConnectUDP pType = HostProbeConnectTCP // FIXME pPort = nep.xPort } else if r.tuples.l4Prot.val == 1 { @@ -1261,8 +1260,7 @@ func (R *RuleH) syncEPHostState2Rule(rule *ruleEnt, checkNow bool) bool { if rule.tuples.l4Prot.val == 6 { sType = HostProbeConnectTCP } else if rule.tuples.l4Prot.val == 17 { - //sType = HostProbeConnectUDP - sType = HostProbeConnectTCP // FIXME + sType = HostProbeConnectUDP } else if rule.tuples.l4Prot.val == 1 { sType = HostProbePing } else if rule.tuples.l4Prot.val == 132 { @@ -1574,12 +1572,6 @@ func (R *RuleH) AddLbRule(serv cmn.LbServiceArg, servSecIPs []cmn.LbSecIPArg, al return RuleEpCountErr, errors.New("endpoints-range error") } - if (serv.Sel == cmn.LbSelRr || serv.Sel == cmn.LbSelLeastConnections || - serv.Sel == cmn.LbSelPrio || serv.Sel == cmn.LbSelN2 || serv.Sel == cmn.LbSelN3) && - len(servEndPoints) > MaxLBEndPointsRR { - return RuleEpCountErr, errors.New("endpoints-range1 error") - } - // Validate persist timeout if serv.Sel == cmn.LbSelRrPersist { if serv.PersistTimeout == 0 || serv.PersistTimeout > 24*60*60 { @@ -2930,19 +2922,19 @@ func (r *ruleEnt) LB2DP(work DpWorkT) int { if at.sel == cmn.LbSelPrio { j := 0 k := 0 - var small [MaxLBEndPointsRR]int - var neps [MaxLBEndPointsRR]ruleLBEp + var small [MaxLBEndPoints]int + var neps [MaxLBEndPoints]ruleLBEp for i, ep := range at.endPoints { if ep.inActiveEP { continue } oEp := &at.endPoints[i] - sw := (int(ep.weight) * MaxLBEndPointsRR) / 100 + sw := (int(ep.weight) * MaxLBEndPoints) / 100 if sw == 0 { small[k] = i k++ } - for x := 0; x < sw && j < MaxLBEndPointsRR; x++ { + for x := 0; x < sw && j < MaxLBEndPoints; x++ { neps[j].xIP = oEp.xIP neps[j].rIP = oEp.rIP neps[j].xPort = oEp.xPort @@ -2955,12 +2947,12 @@ func (r *ruleEnt) LB2DP(work DpWorkT) int { j++ } } - if j < MaxLBEndPointsRR { + if j < MaxLBEndPoints { v := 0 if k == 0 { k = len(at.endPoints) } - for j < MaxLBEndPointsRR { + for j < MaxLBEndPoints { idx := small[v%k] oEp := &at.endPoints[idx] neps[j].xIP = oEp.xIP diff --git a/pkg/loxinet/xsync_server.go b/pkg/loxinet/xsync_server.go index d4e9ddee..98d070e6 100644 --- a/pkg/loxinet/xsync_server.go +++ b/pkg/loxinet/xsync_server.go @@ -26,8 +26,6 @@ import ( "net/rpc" "os" "runtime/debug" - "time" - opts "github.com/loxilb-io/loxilb/options" tk "github.com/loxilb-io/loxilib" "google.golang.org/grpc" @@ -214,10 +212,6 @@ func LoxiXsyncMain(mode string) { return } - for { - time.Sleep(1 * time.Second) - } - // Stack trace logger defer func() { if e := recover(); e != nil {