Skip to content

Commit

Permalink
Add context when programming routes and other fixes
Browse files Browse the repository at this point in the history
* Add 1 packet toleration in BGP-triggered GUE test.
* Use batch on a list node for static route monitoring.

DO NOT SUBMIT until openconfig/ygnmi#132 is
merged.
  • Loading branch information
wenovus committed Oct 16, 2023
1 parent 7a556e5 commit c67aa6c
Show file tree
Hide file tree
Showing 9 changed files with 52 additions and 49 deletions.
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -140,3 +140,5 @@ require (
sigs.k8s.io/structured-merge-diff/v4 v4.2.3 // indirect
sigs.k8s.io/yaml v1.3.0 // indirect
)

replace github.com/openconfig/ygnmi => github.com/openconfig/ygnmi v0.8.11-0.20231014030955-f6f81681135b
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -965,8 +965,8 @@ github.com/openconfig/lemming/operator v0.2.0/go.mod h1:LKgEXSR5VK2CAeh2uKijKAXF
github.com/openconfig/ondatra v0.2.7 h1:abEI0qO4Q/BrXV/qi2unXn8dpkZFp9DF8rw6kZod5/E=
github.com/openconfig/ondatra v0.2.7/go.mod h1:Vfwg/PvsupSJOxTxpQvF078MyHxJGvff3jIr4H0vgzs=
github.com/openconfig/testt v0.0.0-20220311054427-efbb1a32ec07 h1:X631iD/B0ximGFb5P9LY5wHju4SiedxUhc5UZEo7VSw=
github.com/openconfig/ygnmi v0.8.7 h1:8K87+VztXhHqsU6/OYRnY/l/bGqFk+qU61mhhdxMCYo=
github.com/openconfig/ygnmi v0.8.7/go.mod h1:7up6qc9l9G4+Cfo37gzO0D7+2g4yqyW+xzh4vYsYTEE=
github.com/openconfig/ygnmi v0.8.11-0.20231014030955-f6f81681135b h1:pd2Av3BgtNdG4bUcNiSZFpFEnBZtWDou2xoGv5czeyA=
github.com/openconfig/ygnmi v0.8.11-0.20231014030955-f6f81681135b/go.mod h1:WXnzNls/5ea6P9wx883ShLi5T/xa7sQjxn6SOH4kVwA=
github.com/openconfig/ygot v0.6.0/go.mod h1:o30svNf7O0xK+R35tlx95odkDmZWS9JyWWQSmIhqwAs=
github.com/openconfig/ygot v0.10.4/go.mod h1:oCQNdXnv7dWc8scTDgoFkauv1wwplJn5HspHcjlxSAQ=
github.com/openconfig/ygot v0.13.2/go.mod h1:kJN0yCXIH07dOXvNBEFm3XxXdnDD5NI6K99tnD5x49c=
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -519,8 +519,8 @@ func testTrafficAndEncap(t *testing.T, otg *otg.OTG, startingIP string, v6Traffi
}

var expectedPacketCounter int
const wantPacketN = 20
for i := 0; i != wantPacketN; i++ {
const packetN = 20
for i := 0; i != packetN; i++ {
pkt, err := ps.NextPacket()
if err != nil {
t.Fatalf("error reading next packet: %v", err)
Expand Down Expand Up @@ -587,7 +587,7 @@ func testTrafficAndEncap(t *testing.T, otg *otg.OTG, startingIP string, v6Traffi
expectedPacketCounter++
}

if expectedPacketCounter < wantPacketN {
if wantPacketN := packetN - 1; expectedPacketCounter < wantPacketN {
t.Errorf("Got less than %d expected packets: %v", wantPacketN, expectedPacketCounter)
} else {
t.Logf("Got %d expected packets.", expectedPacketCounter)
Expand Down
5 changes: 3 additions & 2 deletions repositories.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -1611,8 +1611,9 @@ def go_repositories():
go_repository(
name = "com_github_openconfig_ygnmi",
importpath = "github.com/openconfig/ygnmi",
sum = "h1:8K87+VztXhHqsU6/OYRnY/l/bGqFk+qU61mhhdxMCYo=",
version = "v0.8.7",
replace = "github.com/openconfig/ygnmi",
sum = "h1:pd2Av3BgtNdG4bUcNiSZFpFEnBZtWDou2xoGv5czeyA=",
version = "v0.8.11-0.20231014030955-f6f81681135b",
)
go_repository(
name = "com_github_openconfig_ygot",
Expand Down
54 changes: 27 additions & 27 deletions sysrib/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,24 +112,24 @@ type dplane struct {
}

// programRoute programs the route in the dataplane, returning an error on failure.
func (d *dplane) programRoute(r *ResolvedRoute) error {
func (d *dplane) programRoute(ctx context.Context, r *ResolvedRoute) error {
log.V(1).Infof("sysrib: programming resolved route: %+v", r)
rr, err := resolvedRouteToRouteRequest(r)
if err != nil {
return err
}
_, err = ygnmi.Replace(context.TODO(), d.Client, handlers.RouteQuery(rr.GetPrefix().GetVrfId(), rr.GetPrefix().GetCidr()), rr, ygnmi.WithSetFallbackEncoding())
_, err = ygnmi.Replace(ctx, d.Client, handlers.RouteQuery(rr.GetPrefix().GetVrfId(), rr.GetPrefix().GetCidr()), rr, ygnmi.WithSetFallbackEncoding())
return err
}

// deprogramRoute de-programs the route in the dataplane, returning an error on failure.
func (d *dplane) deprogramRoute(r *ResolvedRoute) error {
func (d *dplane) deprogramRoute(ctx context.Context, r *ResolvedRoute) error {
log.V(1).Infof("sysrib: deprogramming newly unresolved route: %+v", r)
rr, err := resolvedRouteToRouteRequest(r)
if err != nil {
return err
}
_, err = ygnmi.Delete(context.TODO(), d.Client, handlers.RouteQuery(rr.GetPrefix().GetVrfId(), rr.GetPrefix().GetCidr()))
_, err = ygnmi.Delete(ctx, d.Client, handlers.RouteQuery(rr.GetPrefix().GetVrfId(), rr.GetPrefix().GetCidr()))
return err
}

Expand Down Expand Up @@ -239,19 +239,19 @@ func (s *Server) monitorConnectedIntfs(yclient *ygnmi.Client) error {
if intf.Enabled != nil {
if intf.Ifindex != nil {
ifindex := intf.GetIfindex()
s.setInterface(name, int32(ifindex), intf.GetEnabled())
s.setInterface(context.Background(), name, int32(ifindex), intf.GetEnabled())
// TODO(wenbli): Support other VRFs.
if subintf := intf.GetSubinterface(0); subintf != nil {
for _, addr := range subintf.GetOrCreateIpv4().Address {
if addr.Ip != nil && addr.PrefixLength != nil {
if err := s.addInterfacePrefix(name, int32(ifindex), fmt.Sprintf("%s/%d", addr.GetIp(), addr.GetPrefixLength()), fakedevice.DefaultNetworkInstance); err != nil {
if err := s.addInterfacePrefix(context.Background(), name, int32(ifindex), fmt.Sprintf("%s/%d", addr.GetIp(), addr.GetPrefixLength()), fakedevice.DefaultNetworkInstance); err != nil {
log.Warningf("adding interface prefix failed: %v", err)
}
}
}
for _, addr := range subintf.GetOrCreateIpv6().Address {
if addr.Ip != nil && addr.PrefixLength != nil {
if err := s.addInterfacePrefix(name, int32(ifindex), fmt.Sprintf("%s/%d", addr.GetIp(), addr.GetPrefixLength()), fakedevice.DefaultNetworkInstance); err != nil {
if err := s.addInterfacePrefix(context.Background(), name, int32(ifindex), fmt.Sprintf("%s/%d", addr.GetIp(), addr.GetPrefixLength()), fakedevice.DefaultNetworkInstance); err != nil {
log.Warningf("adding interface prefix failed: %v", err)
}
}
Expand Down Expand Up @@ -303,7 +303,7 @@ func (s *Server) monitorBGPGUEPolicies(yclient *ygnmi.Client) error {
policiesFound[prefix] = true
if existingPolicy := s.bgpGUEPolicies[prefix]; policy != existingPolicy {
log.V(1).Infof("Adding new/updated BGP GUE policy: %s: %v", prefix, policy)
if err := s.setGUEPolicy(prefix, policy); err != nil {
if err := s.setGUEPolicy(context.Background(), prefix, policy); err != nil {
log.Errorf("Failed while setting BGP GUE Policy: %v", err)
} else {
s.bgpGUEPolicies[prefix] = policy
Expand Down Expand Up @@ -360,7 +360,7 @@ func (s *Server) monitorBGPGUEPolicies(yclient *ygnmi.Client) error {
for prefix := range s.bgpGUEPolicies {
if _, ok := policiesFound[prefix]; !ok {
log.Infof("Deleting incomplete/non-existent policy: %s", prefix)
if err := s.deleteGUEPolicy(prefix); err != nil {
if err := s.deleteGUEPolicy(context.Background(), prefix); err != nil {
log.Errorf("Failed while deleting BGP GUE Policy: %v", err)
} else {
delete(s.bgpGUEPolicies, prefix)
Expand Down Expand Up @@ -545,7 +545,7 @@ func resolvedRouteToRouteRequest(r *ResolvedRoute) (*dpb.Route, error) {

// ResolveAndProgramDiff walks through each prefix in the RIB, resolving it and
// programs the forwarding plane.
func (s *Server) ResolveAndProgramDiff() error {
func (s *Server) ResolveAndProgramDiff(ctx context.Context) error {
log.Info("Recalculating resolved RIB")
if debugRIB {
defer s.rib.PrintRIB()
Expand All @@ -557,18 +557,18 @@ func (s *Server) ResolveAndProgramDiff() error {
newResolvedRoutes := map[RouteKey]*Route{}
for niName, ni := range s.rib.NI {
for it := ni.IPV4.Iterate(); it.Next(); {
s.resolveAndProgramDiffAux(niName, ni, it.Address().String(), newResolvedRoutes)
s.resolveAndProgramDiffAux(ctx, niName, ni, it.Address().String(), newResolvedRoutes)
}
for it := ni.IPV6.Iterate(); it.Next(); {
s.resolveAndProgramDiffAux(niName, ni, it.Address().String(), newResolvedRoutes)
s.resolveAndProgramDiffAux(ctx, niName, ni, it.Address().String(), newResolvedRoutes)
}
}

// Deprogram newly unresolved routes.
for routeKey, rr := range s.ProgrammedRoutes() {
if _, ok := newResolvedRoutes[routeKey]; !ok {
log.V(1).Infof("Deleting route %s", rr.RouteKey)
if err := s.dataplane.deprogramRoute(rr); err != nil {
if err := s.dataplane.deprogramRoute(ctx, rr); err != nil {
log.Warningf("failed to deprogram route %+v: %v", rr, err)
continue
}
Expand All @@ -593,7 +593,7 @@ func (s *Server) ResolveAndProgramDiff() error {
// It carries out the following functionalities:
// - Resolve a single route specified by prefix and program if it's different.
// - Populate the resolved route into newResolvedRoutes.
func (s *Server) resolveAndProgramDiffAux(niName string, ni *NIRIB, prefix string, newResolvedRoutes map[RouteKey]*Route) {
func (s *Server) resolveAndProgramDiffAux(ctx context.Context, niName string, ni *NIRIB, prefix string, newResolvedRoutes map[RouteKey]*Route) {
log.V(1).Infof("Iterating at prefix %v (v4 has %d tags) (v6 has %d tags)", prefix, ni.IPV4.CountTags(), ni.IPV6.CountTags())
_, pfx, err := net.ParseCIDR(prefix)
if err != nil {
Expand Down Expand Up @@ -631,7 +631,7 @@ func (s *Server) resolveAndProgramDiffAux(niName string, ni *NIRIB, prefix strin
switch {
case routeIsResolved && !reflect.DeepEqual(currentRoute, rr):
log.V(1).Infof("(-currentRoute, +resolvedRoute):\n%s", cmp.Diff(currentRoute, rr))
if err := s.dataplane.programRoute(rr); err != nil {
if err := s.dataplane.programRoute(ctx, rr); err != nil {
log.Warningf("failed to program route %+v: %v", rr, err)
return
}
Expand Down Expand Up @@ -663,7 +663,7 @@ func (s *Server) ProgrammedRoutes() map[RouteKey]*ResolvedRoute {
}

// SetRoute implements ROUTE_ADD and ROUTE_DELETE
func (s *Server) SetRoute(_ context.Context, req *sysribpb.SetRouteRequest) (*sysribpb.SetRouteResponse, error) {
func (s *Server) SetRoute(ctx context.Context, req *sysribpb.SetRouteRequest) (*sysribpb.SetRouteResponse, error) {
pfx, err := prefixString(req.Prefix)
if err != nil {
return nil, err
Expand All @@ -684,7 +684,7 @@ func (s *Server) SetRoute(_ context.Context, req *sysribpb.SetRouteRequest) (*sy
// TODO(wenbli): Check if recursive resolution is an infinite recursion. This happens if there is a cycle.

niName := vrfIDToNiName(req.GetVrfId())
if err := s.setRoute(niName, &Route{
if err := s.setRoute(ctx, niName, &Route{
// TODO(wenbli): check if pfx has to be canonical or does it tolerate it: i.e. 1.1.1.0/24 instead of 1.1.1.1/24
Prefix: pfx,
NextHops: nexthops,
Expand All @@ -709,21 +709,21 @@ func (s *Server) SetRoute(_ context.Context, req *sysribpb.SetRouteRequest) (*sy
}

// setRoute adds/deletes a route from the RIB manager.
func (s *Server) setRoute(niName string, route *Route, isDelete bool) error {
func (s *Server) setRoute(ctx context.Context, niName string, route *Route, isDelete bool) error {
if err := s.rib.setRoute(niName, route, isDelete); err != nil {
return fmt.Errorf("error while adding route to sysrib: %v", err)
}

if err := s.ResolveAndProgramDiff(); err != nil {
if err := s.ResolveAndProgramDiff(ctx); err != nil {
return fmt.Errorf("error while resolving sysrib: %v", err)
}
return nil
}

// addInterfacePrefix adds a prefix to the sysrib as a connected route.
func (s *Server) addInterfacePrefix(name string, ifindex int32, prefix string, niName string) error {
func (s *Server) addInterfacePrefix(ctx context.Context, name string, ifindex int32, prefix string, niName string) error {
log.V(1).Infof("Adding interface prefix: intf %s, idx %d, prefix %s, ni %s", name, ifindex, prefix, niName)
return s.setRoute(niName, &Route{
return s.setRoute(ctx, niName, &Route{
Prefix: prefix,
Connected: &Interface{
Name: name,
Expand All @@ -737,7 +737,7 @@ func (s *Server) addInterfacePrefix(name string, ifindex int32, prefix string, n
}

// setInterface responds to INTERFACE_UP/INTERFACE_DOWN messages from the dataplane.
func (s *Server) setInterface(name string, ifindex int32, enabled bool) error {
func (s *Server) setInterface(ctx context.Context, name string, ifindex int32, enabled bool) error {
log.V(1).Infof("Setting interface %q(%d) to enabled=%v", name, ifindex, enabled)
s.interfacesMu.Lock()
s.interfaces[Interface{
Expand All @@ -746,33 +746,33 @@ func (s *Server) setInterface(name string, ifindex int32, enabled bool) error {
}] = enabled
s.interfacesMu.Unlock()

return s.ResolveAndProgramDiff()
return s.ResolveAndProgramDiff(ctx)
}

// TODO(wenbli): Do we need to handle interface deletion?
// This is not required in the MVP since basic tests will just need to enable/disable interfaces.

// setGUEPolicy adds a new GUE policy and triggers resolved route
// computation and programming.
func (s *Server) setGUEPolicy(prefix string, policy GUEPolicy) error {
func (s *Server) setGUEPolicy(ctx context.Context, prefix string, policy GUEPolicy) error {
if err := s.rib.SetGUEPolicy(prefix, policy); err != nil {
return fmt.Errorf("error while adding route to sysrib: %v", err)
}

if err := s.ResolveAndProgramDiff(); err != nil {
if err := s.ResolveAndProgramDiff(ctx); err != nil {
return fmt.Errorf("error while resolving sysrib: %v", err)
}
return nil
}

// deleteGUEPolicy adds a new GUE policy and triggers resolved route
// computation and programming.
func (s *Server) deleteGUEPolicy(prefix string) error {
func (s *Server) deleteGUEPolicy(ctx context.Context, prefix string) error {
if _, err := s.rib.DeleteGUEPolicy(prefix); err != nil {
return fmt.Errorf("error while adding route to sysrib: %v", err)
}

if err := s.ResolveAndProgramDiff(); err != nil {
if err := s.ResolveAndProgramDiff(ctx); err != nil {
return fmt.Errorf("error while resolving sysrib: %v", err)
}
return nil
Expand Down
4 changes: 2 additions & 2 deletions sysrib/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2529,10 +2529,10 @@ func TestBGPGUEPolicy(t *testing.T) {
}
}
for prefix, policy := range tt.inAddPolicies {
s.setGUEPolicy(prefix, policy)
s.setGUEPolicy(context.Background(), prefix, policy)
}
for _, prefix := range tt.inDeletePolicies {
s.deleteGUEPolicy(prefix)
s.deleteGUEPolicy(context.Background(), prefix)
}

routes, err := ygnmi.GetAll(context.Background(), c, routesQuery)
Expand Down
5 changes: 3 additions & 2 deletions sysrib/server_zapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package sysrib

import (
"context"
"fmt"
"net"

Expand Down Expand Up @@ -83,13 +84,13 @@ func convertToZAPIRoute(routeKey RouteKey, route *Route, rr *ResolvedRoute) (*ze
}

// setZebraRoute calls setRoute after reformatting a zebra-formatted input route.
func (s *Server) setZebraRoute(niName string, zroute *zebra.IPRouteBody) error {
func (s *Server) setZebraRoute(ctx context.Context, niName string, zroute *zebra.IPRouteBody) error {
if s == nil {
return fmt.Errorf("cannot add route to nil sysrib server")
}
log.V(1).Infof("setZebraRoute: %+v", *zroute)
route := convertZebraRoute(niName, zroute)
return s.setRoute(niName, route, false)
return s.setRoute(ctx, niName, route, false)
}

// convertZebraRoute converts a zebra route to a Sysrib route.
Expand Down
18 changes: 8 additions & 10 deletions sysrib/static.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,6 @@ func convertStaticRoute(sroute *oc.NetworkInstance_Protocol_Static) *Route {
log.Warningf("sysrib: Unhandled static route nexthop type (%T): %v", nh, nh)
}
}
if len(nexthops) == 0 {
return nil
}
return &Route{
Prefix: *sroute.Prefix,
NextHops: nexthops,
Expand All @@ -60,9 +57,11 @@ func convertStaticRoute(sroute *oc.NetworkInstance_Protocol_Static) *Route {
// configuration changes.
// It returns an error if there is an error before monitoring can begin.
func (s *Server) monitorStaticRoutes(yclient *ygnmi.Client) error {
b := &ocpath.Batch{}
staticroot := ocpath.Root().NetworkInstance(fakedevice.DefaultNetworkInstance).Protocol(oc.PolicyTypes_INSTALL_PROTOCOL_TYPE_STATIC, fakedevice.StaticRoutingProtocol)
staticpath := staticroot.StaticAny()
staticpathMap := staticroot.StaticMap()

b := ygnmi.NewBatch[map[string]*oc.NetworkInstance_Protocol_Static](staticpathMap.Config())
b.AddPaths(
staticpath.NextHopAny().NextHop().Config().PathStruct(),
// TODO(wenbli): Handle these paths.
Expand All @@ -75,19 +74,18 @@ func (s *Server) monitorStaticRoutes(yclient *ygnmi.Client) error {
staticRouteWatcher := ygnmi.Watch(
context.Background(),
yclient,
b.Config(),
func(root *ygnmi.Value[*oc.Root]) error {
rootVal, ok := root.Val()
b.Query(),
func(static *ygnmi.Value[map[string]*oc.NetworkInstance_Protocol_Static]) error {
staticMap, ok := static.Val()
if !ok {
return ygnmi.Continue
}
staticp := rootVal.GetOrCreateNetworkInstance(fakedevice.DefaultNetworkInstance).GetOrCreateProtocol(oc.PolicyTypes_INSTALL_PROTOCOL_TYPE_STATIC, fakedevice.StaticRoutingProtocol)
for _, sroute := range staticp.Static {
for _, sroute := range staticMap {
if sroute == nil || sroute.Prefix == nil {
continue
}
if route := convertStaticRoute(sroute); route != nil {
if err := s.setRoute(fakedevice.DefaultNetworkInstance, route, false); err != nil {
if err := s.setRoute(context.Background(), fakedevice.DefaultNetworkInstance, route, false); err != nil {
log.Warningf("Failed to add static route: %v", err)
} else {
gnmiclient.Replace(context.Background(), yclient, staticroot.Static(sroute.GetPrefix()).State(), sroute)
Expand Down
3 changes: 2 additions & 1 deletion sysrib/zapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
package sysrib

import (
"context"
"fmt"
"net"
"os"
Expand Down Expand Up @@ -260,7 +261,7 @@ func (c *Client) HandleRequest(conn net.Conn, vrfID uint32) {
"Topic": "Sysrib",
"Message": m,
})
if err := c.zServer.sysrib.setZebraRoute(vrfIDToNiName(vrfID), m.Body.(*zebra.IPRouteBody)); err != nil {
if err := c.zServer.sysrib.setZebraRoute(context.Background(), vrfIDToNiName(vrfID), m.Body.(*zebra.IPRouteBody)); err != nil {
topicLogger.Warn(fmt.Sprintf("Could not add route to sysrib: %v", err),
bgplog.Fields{
"Topic": "Sysrib",
Expand Down

0 comments on commit c67aa6c

Please sign in to comment.