indexserver: add debug endpoint for deleting repository shards #485

Open

wants to merge 11 commits into main
99 changes: 99 additions & 0 deletions cmd/zoekt-sourcegraph-indexserver/cleanup.go
@@ -1,7 +1,9 @@
package main

import (
"errors"
"fmt"
"io/fs"
"log"
"os"
"os/exec"
@@ -172,6 +174,38 @@ func cleanup(indexDir string, repos []uint32, now time.Time, shardMerging bool)
}
}

// remove any Zoekt metadata files in the given dir that don't have an
// associated shard file
Comment on lines +161 to +162

Member:
Is this unrelated cleanup? I.e., should it maybe be a separate PR?

Contributor Author:
No, I added this functionality because we switched the implementation. Before, the implementation made sure to delete the metadata file before its associated shard file. Since we switched to using zoekt.IndexFilePaths, that order isn't guaranteed anymore (leaving open the possibility of a metadata file not having an associated shard if we delete the shard and then crash). I added this logic to cleanup so that a background process reaps those "stranded" files.

I am happy to pull this bit into a separate PR, though.

	metaFiles, err := filepath.Glob(filepath.Join(indexDir, "*.meta"))
	if err != nil {
		log.Printf("failed to glob %q for stranded metadata files: %s", indexDir, err)
	} else {
		for _, metaFile := range metaFiles {
			shard := strings.TrimSuffix(metaFile, ".meta")
			_, err := os.Stat(shard)
			if err == nil {
				// metadata file has associated shard
				continue
			}

			if !errors.Is(err, fs.ErrNotExist) {
				log.Printf("failed to stat metadata file %q: %s", metaFile, err)
				continue
			}

			// metadata doesn't have an associated shard file, remove the metadata file
			err = os.Remove(metaFile)
			if err != nil {
				log.Printf("failed to remove stranded metadata file %q: %s", metaFile, err)
				continue
			} else {
				log.Printf("removed stranded metadata file: %s", metaFile)
			}
		}
	}

metricCleanupDuration.Observe(time.Since(start).Seconds())
}

@@ -515,3 +549,68 @@ func removeTombstones(fn string) ([]*zoekt.Repository, error) {
}
return tombstones, nil
}

// deleteShards deletes all the shards in indexDir that are associated with
// the given repoID. If the repository specified by repoID happens to be in a
// compound shard, the repository is tombstoned instead.
//
// deleteShards returns errRepositoryNotFound if the repository specified by repoID
// isn't present in indexDir.
//
// Users must hold the global indexDir lock before calling deleteShards.
func deleteShards(indexDir string, repoID uint32) error {
stefanhengl (Member), Nov 30, 2022:
Quoting from cleanup:

	simple := shards[:0]
	for _, s := range shards {
		if shardMerging && maybeSetTombstone([]shard{s}, repo) {
			shardsLog(indexDir, "tombname", []shard{s})
		} else {
			simple = append(simple, s)
		}
	}

	if len(simple) == 0 {
		continue
	}

	removeAll(simple...)

Can't we just call that instead? I guess that is what @keegancsmith was referring to(?). We could factor it out into its own function and call it from your handler and from cleanup. WDYT?
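For illustration only, the factored-out helper suggested above might look roughly like the following sketch. It is not code from this PR: it reuses the package-internal helpers maybeSetTombstone, shardsLog, and removeAll quoted from cleanup, and the name tombstoneOrRemove is made up.

	// tombstoneOrRemove is a hypothetical shared helper: tombstone the repository's
	// entry in any compound shard, and delete the remaining simple shards outright.
	// It mirrors the snippet quoted from cleanup above.
	func tombstoneOrRemove(indexDir string, repoID uint32, shards []shard, shardMerging bool) {
		simple := shards[:0]
		for _, s := range shards {
			if shardMerging && maybeSetTombstone([]shard{s}, repoID) {
				shardsLog(indexDir, "tombname", []shard{s})
			} else {
				simple = append(simple, s)
			}
		}

		if len(simple) == 0 {
			return
		}

		removeAll(simple...)
	}

Both cleanup and the new debug handler could then call this helper, keeping the tombstone-vs-delete decision in one place.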

	shardMap := getShards(indexDir)

	shards, ok := shardMap[repoID]
	if !ok {
		return errRepositoryNotFound
	}

	// Ensure that the paths are in reverse sorted order so that Zoekt's repository <-> shard matching logic
	// works correctly.
	//
	// Example: - repoA_v16.00002.zoekt
	//          - repoA_v16.00001.zoekt
	//          - repoA_v16.00000.zoekt
	//
	// zoekt-indexserver checks whether it has indexed "repoA" by first checking to see if the 0th shard
	// is present (repoA_v16.00000.zoekt). If it's present, it then gathers the rest of the shard names in ascending order
	// (...00001.zoekt, ...00002.zoekt). If it's missing, zoekt assumes that it never indexed "repoA".
	//
	// If this function were to crash while deleting repoA and we had only deleted the 0th shard, then shards 1 & 2 would never
	// be cleaned up by zoekt-indexserver (since the 0th shard is the only one that's checked).
	//
	// Deleting shards in reverse sorted order (2 -> 1 -> 0) ensures that we don't leave an inconsistent
	// state behind even if we crash.

	sort.Slice(shards, func(i, j int) bool {
		return shards[i].Path > shards[j].Path
	})

	for _, s := range shards {
		// Is this repository inside a compound shard? If so, set a tombstone
		// instead of deleting the shard outright.
		if zoekt.ShardMergingEnabled() && maybeSetTombstone([]shard{s}, repoID) {
			shardsLog(indexDir, "tomb", []shard{s})
ggilmore (Contributor Author), Nov 29, 2022:
Note: looking at the implementation of shardsLog, I'm unsure whether it's thread-safe (what happens if there are two writers to the same file?).

deleteShards' requirement that we hold the global index lock protects us against this, but I wonder if we should make this more explicit (e.g. have a dedicated mutex just for shardsLog).

Member:
Quoting https://github.com/natefinch/lumberjack:

	Lumberjack assumes that only one process is writing to the output files. Using the same lumberjack configuration from multiple processes on the same machine will result in improper behavior.
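For illustration, a dedicated mutex around shardsLog could be as small as the following sketch. This is hypothetical and not part of this PR; it assumes a "sync" import, and it only serializes goroutines within a single indexserver process (the multi-process caveat quoted from lumberjack above is a separate concern).

	// shardsLogMu makes the single-writer assumption of shardsLog explicit for
	// concurrent goroutines within this process.
	var shardsLogMu sync.Mutex

	func shardsLogLocked(indexDir, action string, shards []shard) {
		shardsLogMu.Lock()
		defer shardsLogMu.Unlock()
		shardsLog(indexDir, action, shards)
	}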

			continue
		}

		paths, err := zoekt.IndexFilePaths(s.Path)
		if err != nil {
			return fmt.Errorf("listing files for shard %q: %w", s.Path, err)
		}

		for _, p := range paths {
			err := os.Remove(p)
			if err != nil {
				return fmt.Errorf("deleting %q: %w", p, err)
			}
		}

		shardsLog(indexDir, "delete", []shard{s})
	}

	return nil
}

var errRepositoryNotFound = errors.New("repository not found")
195 changes: 185 additions & 10 deletions cmd/zoekt-sourcegraph-indexserver/cleanup_test.go
@@ -1,6 +1,7 @@
package main

import (
"errors"
"fmt"
"net/url"
"os"
@@ -19,6 +20,7 @@ import (
)

func TestCleanup(t *testing.T) {

mk := func(name string, n int, mtime time.Time) shard {
return shard{
RepoID: fakeID(name),
@@ -28,6 +30,11 @@ func TestCleanup(t *testing.T) {
RepoTombstone: false,
}
}

mkMeta := func(name string, n int) string {
return fmt.Sprintf("%s_v%d.%05d.zoekt.meta", url.QueryEscape(name), 15, n)
}

// We don't use getShards so that we have two implementations of the same
// thing (ie pick up bugs in one)
glob := func(pattern string) []shard {
@@ -56,14 +63,16 @@
recent := now.Add(-time.Hour)
old := now.Add(-25 * time.Hour)
cases := []struct {
name string
repos []string
index []shard
trash []shard
tmps []string

wantIndex []shard
wantTrash []shard
name string
repos []string
indexMetaFiles []string
index []shard
trash []shard
tmps []string

wantIndex []shard
wantIndexMetaFiles []string
wantTrash []shard
}{{
name: "noop",
}, {
@@ -96,6 +105,13 @@
index: []shard{mk("foo", 0, recent), mk("bar", 0, recent)},
wantIndex: []shard{mk("foo", 0, recent)},
wantTrash: []shard{mk("bar", 0, now)},
}, {
name: "remove metafiles with no associated shards",
repos: []string{"foo", "bar"},
index: []shard{mk("foo", 0, recent), mk("bar", 0, recent)},
indexMetaFiles: []string{mkMeta("foo", 0), mkMeta("foo", 1), mkMeta("bar", 0)},
wantIndex: []shard{mk("foo", 0, recent), mk("bar", 0, recent)},
wantIndexMetaFiles: []string{mkMeta("foo", 0), mkMeta("bar", 0)},
}, {
name: "clean old .tmp files",
tmps: []string{"recent.tmp", "old.tmp"},
@@ -134,19 +150,54 @@
t.Fatal(err)
}
}
for _, f := range tt.indexMetaFiles {
path := filepath.Join(dir, f)
if _, err := os.Create(path); err != nil {
t.Fatal(err)
}
}

var repoIDs []uint32
for _, name := range tt.repos {
repoIDs = append(repoIDs, fakeID(name))
}
cleanup(dir, repoIDs, now, false)

if d := cmp.Diff(tt.wantIndex, glob(filepath.Join(dir, "*.zoekt"))); d != "" {
actualIndexShards := glob(filepath.Join(dir, "*.zoekt"))

sort.Slice(actualIndexShards, func(i, j int) bool {
return actualIndexShards[i].Path < actualIndexShards[j].Path
})
sort.Slice(tt.wantIndex, func(i, j int) bool {
return tt.wantIndex[i].Path < tt.wantIndex[j].Path
})

if d := cmp.Diff(tt.wantIndex, actualIndexShards); d != "" {
t.Errorf("unexpected index (-want, +got):\n%s", d)
}
if d := cmp.Diff(tt.wantTrash, glob(filepath.Join(dir, ".trash", "*.zoekt"))); d != "" {

actualTrashShards := glob(filepath.Join(dir, ".trash", "*.zoekt"))

sort.Slice(actualTrashShards, func(i, j int) bool {
return actualTrashShards[i].Path < actualTrashShards[j].Path
})

sort.Slice(tt.wantTrash, func(i, j int) bool {
return tt.wantTrash[i].Path < tt.wantTrash[j].Path
})
if d := cmp.Diff(tt.wantTrash, actualTrashShards); d != "" {
t.Errorf("unexpected trash (-want, +got):\n%s", d)
}

actualIndexMetaFiles := globBase(filepath.Join(dir, "*.meta"))

sort.Strings(actualIndexMetaFiles)
sort.Strings(tt.wantIndexMetaFiles)

if d := cmp.Diff(tt.wantIndexMetaFiles, actualIndexMetaFiles, cmpopts.EquateEmpty()); d != "" {
t.Errorf("unexpected metadata files (-want, +got):\n%s", d)
}

if tmps := globBase(filepath.Join(dir, "*.tmp")); len(tmps) > 0 {
t.Errorf("unexpected tmps: %v", tmps)
}
@@ -455,6 +506,130 @@ func TestCleanupCompoundShards(t *testing.T) {
}
}

func TestDeleteShards(t *testing.T) {
	remainingRepoA := zoekt.Repository{ID: 1, Name: "A"}
	remainingRepoB := zoekt.Repository{ID: 2, Name: "B"}
	repositoryToDelete := zoekt.Repository{ID: 99, Name: "DELETE_ME"}

	t.Run("delete repository from set of normal shards", func(t *testing.T) {
		indexDir := t.TempDir()

		// map of repoID -> list of associated shard paths + metadata paths
		shardMap := make(map[uint32][]string)

		// setup: create shards for each repository, and populate the shard map
		for _, r := range []zoekt.Repository{
			remainingRepoA,
			remainingRepoB,
			repositoryToDelete,
		} {
			shards := createTestNormalShard(t, indexDir, r, 3)

			for _, shard := range shards {
				// create stub meta file
				metaFile := shard + ".meta"
				f, err := os.Create(metaFile)
				if err != nil {
					t.Fatalf("creating metadata file %q: %s", metaFile, err)
				}

				f.Close()

				shardMap[r.ID] = append(shardMap[r.ID], shard, metaFile)
			}
		}

		// run test: delete repository
		err := deleteShards(indexDir, repositoryToDelete.ID)
		if err != nil {
			t.Errorf("unexpected error when deleting shards: %s", err)
		}

		// run assertions: gather all the shards + meta files that remain and
		// check to see that only the files associated with the "remaining" repositories
		// are present
		var actualShardFiles []string

		for _, pattern := range []string{"*.zoekt", "*.meta"} {
			files, err := filepath.Glob(filepath.Join(indexDir, pattern))
			if err != nil {
				t.Fatalf("globbing indexDir: %s", err)
			}

			actualShardFiles = append(actualShardFiles, files...)
		}

		var expectedShardFiles []string
		expectedShardFiles = append(expectedShardFiles, shardMap[remainingRepoA.ID]...)
		expectedShardFiles = append(expectedShardFiles, shardMap[remainingRepoB.ID]...)

		sort.Strings(actualShardFiles)
		sort.Strings(expectedShardFiles)

		if diff := cmp.Diff(expectedShardFiles, actualShardFiles); diff != "" {
			t.Errorf("unexpected diff in list of shard files (-want +got):\n%s", diff)
		}
	})

	t.Run("delete repository from compound shard", func(t *testing.T) {
		indexDir := t.TempDir()

		// setup: enable shard merging for compound shards
		t.Setenv("SRC_ENABLE_SHARD_MERGING", "1")

		// setup: create compound shard with all repositories
		repositories := []zoekt.Repository{remainingRepoA, remainingRepoB, repositoryToDelete}
		shard := createTestCompoundShard(t, indexDir, repositories)

		err := deleteShards(indexDir, repositoryToDelete.ID)
		if err != nil {
			t.Errorf("unexpected error when deleting shards: %s", err)
		}

		// verify: read the compound shard, and ensure that only
		// the repositories that we expect are in the shard (and the deleted one has been tombstoned)
		actualRepositories, _, err := zoekt.ReadMetadataPathAlive(shard)
		if err != nil {
			t.Fatalf("reading repository metadata from shard: %s", err)
		}

		expectedRepositories := []*zoekt.Repository{&remainingRepoA, &remainingRepoB}

		sort.Slice(actualRepositories, func(i, j int) bool {
			return actualRepositories[i].ID < actualRepositories[j].ID
		})

		sort.Slice(expectedRepositories, func(i, j int) bool {
			return expectedRepositories[i].ID < expectedRepositories[j].ID
		})

		opts := []cmp.Option{
			cmpopts.IgnoreUnexported(zoekt.Repository{}),
			cmpopts.IgnoreFields(zoekt.Repository{}, "IndexOptions", "HasSymbols"),
			cmpopts.EquateEmpty(),
		}
		if diff := cmp.Diff(expectedRepositories, actualRepositories, opts...); diff != "" {
			t.Errorf("unexpected diff in list of repositories (-want +got):\n%s", diff)
		}
	})

	t.Run("returns errRepositoryNotFound if the repoID isn't in indexDir", func(t *testing.T) {
		indexDir := t.TempDir()

		// setup: create normal shards for all repositories
		repositories := []zoekt.Repository{remainingRepoA, remainingRepoB, repositoryToDelete}
		for _, r := range repositories {
			createTestNormalShard(t, indexDir, r, 3)
		}

		// test: delete some random repository and check to see if we get the expected error
		err := deleteShards(indexDir, 7777777)
		if !errors.Is(err, errRepositoryNotFound) {
			t.Errorf("expected errRepositoryNotFound when deleting shards, got: %s", err)
		}
	})
}

// createCompoundShard returns a path to a compound shard containing repos with
// ids. Use optFns to overwrite fields of zoekt.Repository for all repos.
func createCompoundShard(t *testing.T, dir string, ids []uint32, optFns ...func(in *zoekt.Repository)) string {
9 changes: 9 additions & 0 deletions cmd/zoekt-sourcegraph-indexserver/debug.go
@@ -95,6 +95,15 @@ func debugCmd() *ffcli.Command {
"wget -q -O - http://localhost:6072/metrics -sS | grep index_shard_merging_running". It is only possible
to trigger one merge operation at a time.

wget -q -O - http://localhost:6072/debug/delete?id=[REPOSITORY_ID]
delete all of the shards associated with the given repository id.

You can find the id associated with a repository via the "/debug/indexed" route.
If you need to delete multiple repositories at once, you can create a small shell pipeline. See the following
example (which removes the first repository listed by the "/debug/indexed" route) for inspiration:

> wget -q -O - http://localhost:6072/debug/indexed | awk '{print $1}' | tail -n +2 | head -n 1 | xargs -I {} -- wget -q -O - "http://localhost:6072/debug/delete?id={}"

wget -q -O - http://localhost:6072/debug/queue
list the repositories in the indexing queue, sorted by descending priority.
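For context, a minimal sketch of how the /debug/delete endpoint described above could be wired to deleteShards is shown below. This is hypothetical: the actual handler added by this PR is not shown in this excerpt, and the function name, the indexMu parameter standing in for the global indexDir lock, and the assumed imports ("errors", "fmt", "net/http", "strconv", "sync") are illustration-only assumptions.

	// handleDebugDelete returns an http.HandlerFunc that deletes (or tombstones)
	// all shards for the repository id given in the "id" query parameter.
	func handleDebugDelete(indexDir string, indexMu *sync.Mutex) http.HandlerFunc {
		return func(w http.ResponseWriter, r *http.Request) {
			id, err := strconv.ParseUint(r.URL.Query().Get("id"), 10, 32)
			if err != nil {
				http.Error(w, fmt.Sprintf("invalid id: %s", err), http.StatusBadRequest)
				return
			}

			// deleteShards requires the caller to hold the global indexDir lock.
			indexMu.Lock()
			defer indexMu.Unlock()

			if err := deleteShards(indexDir, uint32(id)); err != nil {
				if errors.Is(err, errRepositoryNotFound) {
					http.Error(w, err.Error(), http.StatusNotFound)
					return
				}
				http.Error(w, err.Error(), http.StatusInternalServerError)
				return
			}

			fmt.Fprintf(w, "deleted shards for repository %d\n", id)
		}
	}

Such a handler would then be registered on the indexserver's debug mux, e.g. mux.HandleFunc("/debug/delete", handleDebugDelete(indexDir, &indexMu)).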
