Skip to content

Commit

Permalink
Update README. Add sanitycheck option in dsapp client (#93)
Browse files Browse the repository at this point in the history
  • Loading branch information
dpunish3r authored Dec 11, 2023
1 parent 2c3e610 commit 4710276
Show file tree
Hide file tree
Showing 2 changed files with 123 additions and 43 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,7 @@ List of events (entry types):
- Entry data:
>u8 gasPricePercentage
>u8 isValid // Intrinsic
>u256 stateRoot
>u32 encodedTXLength
>u8[] encodedTX
Expand All @@ -314,8 +315,7 @@ List of events (entry types):
- Entry data:
>u64 blockL2Num
>u256 l2BlockHash
>u256 stateRoot
>u256 imStateRoot
>u256 stateRoot
### Update GER
- Entry type = 4
Expand Down
162 changes: 121 additions & 41 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,12 @@ const (
StSequencer = 1 // StSequencer sequencer stream type
)

var (
sanityEntry uint64 = 0
sanityBlock uint64 = 0
sanityBookmark uint64 = 0
)

// main runs a datastream server or client
func main() {
// Set log level
Expand Down Expand Up @@ -111,6 +117,11 @@ func main() {
Usage: "entry bookmark to query entry data pointed by it (0..N)",
Value: "none",
},
&cli.BoolFlag{
Name: "sanitycheck",
Usage: "when receiving streaming check entry, bookmark, and block sequence consistency",
Value: false,
},
&cli.StringFlag{
Name: "log",
Usage: "log level (debug|info|warn|error)",
Expand Down Expand Up @@ -198,30 +209,6 @@ func runServer(ctx *cli.Context) error {
// Pause for testing purpose
time.Sleep(time.Duration(sleep) * time.Millisecond)

// ------------------------------------------------------------
// Fake Sequencer data
dataBlockStart := make([]byte, 0)
dataBlockStart = binary.LittleEndian.AppendUint64(dataBlockStart, 101) // nolint:gomnd
dataBlockStart = binary.LittleEndian.AppendUint64(dataBlockStart, 1337) // nolint:gomnd
dataBlockStart = binary.LittleEndian.AppendUint64(dataBlockStart, uint64(time.Now().Unix()))
dataBlockStart = append(dataBlockStart, []byte{10, 11, 12, 13, 14, 15, 16, 17, 10, 11, 12, 13, 14, 15, 16, 17, 10, 11, 12, 13, 14, 15, 16, 17, 10, 11, 12, 13, 14, 15, 16, 17}...)
dataBlockStart = append(dataBlockStart, []byte{20, 21, 22, 23, 24, 20, 21, 22, 23, 24, 20, 21, 22, 23, 24, 20, 21, 22, 23, 24}...)
dataBlockStart = binary.LittleEndian.AppendUint16(dataBlockStart, 5) // nolint:gomnd

dataTx := make([]byte, 0) // nolint:gomnd
dataTx = append(dataTx, 128) // nolint:gomnd
dataTx = append(dataTx, 1) // nolint:gomnd
dataTx = binary.LittleEndian.AppendUint32(dataTx, 5) // nolint:gomnd
dataTx = append(dataTx, []byte{1, 2, 3, 4, 5}...) // nolint:gomnd

dataBlockEnd := make([]byte, 0)
dataBlockEnd = binary.LittleEndian.AppendUint64(dataBlockEnd, 1337) // nolint:gomnd
dataBlockEnd = append(dataBlockEnd, []byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}...)
dataBlockEnd = append(dataBlockEnd, []byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}...)
// ------------------------------------------------------------

imark := s.GetHeader().TotalEntries

end := make(chan uint8)

go func(chan uint8) {
Expand All @@ -230,49 +217,45 @@ func runServer(ctx *cli.Context) error {

rand.Seed(time.Now().UnixNano())

init := s.GetHeader().TotalEntries / 4 // nolint:gomnd

// Atomic Operations loop
for n := 1; n <= int(numOpersLoop); n++ {
for n := uint64(0); n < numOpersLoop; n++ {
// Start atomic operation
err = s.StartAtomicOp()
if err != nil {
log.Error(">> App error! StartAtomicOp")
return
}

// Add stream entries:
// Bookmark
bookmark := []byte{0} // nolint:gomnd
bookmark = binary.LittleEndian.AppendUint64(bookmark, imark)

_, err := s.AddStreamBookmark(bookmark)
// Add stream entries (sample):
// 1.Bookmark
_, err := s.AddStreamBookmark(fakeBookmark(init + n))
if err != nil {
log.Errorf(">> App error! AddStreamBookmark: %v", err)
}

// Block Start
entryBlockStart, err := s.AddStreamEntry(EtL2BlockStart, dataBlockStart)
// 2.Block Start
entryBlockStart, err := s.AddStreamEntry(EtL2BlockStart, fakeDataBlockStart(init+n))
if err != nil {
log.Errorf(">> App error! AddStreamEntry type %v: %v", EtL2BlockStart, err)
return
}
// Tx
// 3.Tx
numTx := 1 //rand.Intn(20) + 1
for i := 1; i <= numTx; i++ {
_, err = s.AddStreamEntry(EtL2Tx, dataTx)
_, err = s.AddStreamEntry(EtL2Tx, fakeDataTx())
if err != nil {
log.Errorf(">> App error! AddStreamEntry type %v: %v", EtL2Tx, err)
return
}
}
// Block Start
entryBlockEnd, err := s.AddStreamEntry(EtL2BlockEnd, dataBlockEnd)
// 4.Block End
_, err = s.AddStreamEntry(EtL2BlockEnd, fakeDataBlockEnd(init+n))
if err != nil {
log.Errorf(">> App error! AddStreamEntry type %v: %v", EtL2BlockEnd, err)
return
}

imark = entryBlockEnd + 1

if !testRollback || entryBlockStart%10 != 0 || latestRollback == entryBlockStart {
// Commit atomic operation
err = s.CommitAtomicOp()
Expand Down Expand Up @@ -303,6 +286,40 @@ func runServer(ctx *cli.Context) error {
return nil
}

func fakeBookmark(blockNum uint64) []byte {
bookmark := []byte{0} // nolint:gomnd
bookmark = binary.LittleEndian.AppendUint64(bookmark, blockNum)
return bookmark
}

func fakeDataBlockStart(blockNum uint64) []byte {
dataBlockStart := make([]byte, 0)
dataBlockStart = binary.LittleEndian.AppendUint64(dataBlockStart, 101) // nolint:gomnd
dataBlockStart = binary.LittleEndian.AppendUint64(dataBlockStart, blockNum)
dataBlockStart = binary.LittleEndian.AppendUint64(dataBlockStart, uint64(time.Now().Unix()))
dataBlockStart = append(dataBlockStart, []byte{10, 11, 12, 13, 14, 15, 16, 17, 10, 11, 12, 13, 14, 15, 16, 17, 10, 11, 12, 13, 14, 15, 16, 17, 10, 11, 12, 13, 14, 15, 16, 17}...)
dataBlockStart = append(dataBlockStart, []byte{20, 21, 22, 23, 24, 20, 21, 22, 23, 24, 20, 21, 22, 23, 24, 20, 21, 22, 23, 24}...)
dataBlockStart = binary.LittleEndian.AppendUint16(dataBlockStart, 5) // nolint:gomnd
return dataBlockStart
}

func fakeDataTx() []byte {
dataTx := make([]byte, 0) // nolint:gomnd
dataTx = append(dataTx, 128) // nolint:gomnd
dataTx = append(dataTx, 1) // nolint:gomnd
dataTx = binary.LittleEndian.AppendUint32(dataTx, 5) // nolint:gomnd
dataTx = append(dataTx, []byte{1, 2, 3, 4, 5}...) // nolint:gomnd
return dataTx
}

func fakeDataBlockEnd(blockNum uint64) []byte {
dataBlockEnd := make([]byte, 0)
dataBlockEnd = binary.LittleEndian.AppendUint64(dataBlockEnd, blockNum)
dataBlockEnd = append(dataBlockEnd, []byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}...)
dataBlockEnd = append(dataBlockEnd, []byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}...)
return dataBlockEnd
}

// runClient runs a local datastream client and tests its features
func runClient(ctx *cli.Context) error {
// Set log level
Expand All @@ -323,6 +340,7 @@ func runClient(ctx *cli.Context) error {
queryHeader := ctx.Bool("header")
queryEntry := ctx.String("entry")
queryBookmark := ctx.String("bookmark")
sanityCheck := ctx.Bool("sanitycheck")

// Create client
c, err := datastreamer.NewClient(server, StSequencer)
Expand All @@ -331,7 +349,11 @@ func runClient(ctx *cli.Context) error {
}

// Set process entry callback function
c.SetProcessEntryFunc(printEntryNum)
if !sanityCheck {
c.SetProcessEntryFunc(printEntryNum)
} else {
c.SetProcessEntryFunc(checkEntryBlockSanity)
}

// Start client (connect to the server)
err = c.Start()
Expand Down Expand Up @@ -441,6 +463,64 @@ func printEntryNum(e *datastreamer.FileEntry, c *datastreamer.StreamClient, s *d
return nil
}

// checkEntryBlockSanity checks entry, bookmark, and block sequence consistency
func checkEntryBlockSanity(e *datastreamer.FileEntry, c *datastreamer.StreamClient, s *datastreamer.StreamServer) error {
// Log work in progress
if e.Number%100000 == 0 {
log.Infof("Sanity check entry #%d...", e.Number)
}

// Sanity check for entry sequence
if sanityEntry > 0 {
if e.Number != sanityEntry {
log.Infof("SANITY CHECK failed: Entry received[%d] | Entry expected[%d]", e.Number, sanityEntry)
return errors.New("sanity check failed for entry sequence")
}
} else {
if e.Number != 0 {
log.Infof("SANITY CHECK failed: Entry received[%d] | Entry expected[0]", e.Number)
return errors.New("sanity check failed for entry sequence")
}
}
sanityEntry++

// Sanity check for block sequence
if e.Type == EtL2BlockStart {
blockNum := binary.LittleEndian.Uint64(e.Data[8:16])
if sanityBlock > 0 {
if blockNum != sanityBlock {
log.Infof("SANITY CHECK failed: Block received[%d] | Block expected[%d]", blockNum, sanityBlock)
return errors.New("sanity check failed for block sequence")
}
} else {
if blockNum != 0 {
log.Infof("SANITY CHECK failed: Block received[%d] | Block expected[0]", blockNum)
return errors.New("sanity check failed for block sequence")
}
}
sanityBlock++
}

// Sanity check for bookmarks
if e.Type == datastreamer.EtBookmark {
bookmarkNum := binary.LittleEndian.Uint64(e.Data[1:9])
if sanityBookmark > 0 {
if bookmarkNum != sanityBookmark {
log.Infof("SANITY CHECK failed: Bookmark received[%d] | Bookmark expected[%d]", bookmarkNum, sanityBookmark)
return errors.New("sanity check failed for bookmark sequence")
}
} else {
if bookmarkNum != 0 {
log.Infof("SANITY CHECK failed: Bookmark received[%d] | Bookmark expected[0]", bookmarkNum)
return errors.New("sanity check failed for bookmark sequence")
}
}
sanityBookmark++
}

return nil
}

// runRelay runs a local datastream relay
func runRelay(ctx *cli.Context) error {
// Set log level
Expand Down

0 comments on commit 4710276

Please sign in to comment.