-
Notifications
You must be signed in to change notification settings - Fork 5
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Implement list schemas by falling back to other APIs if GET /schemas unavailable #864
base: main
Are you sure you want to change the base?
Conversation
src/storage/resourceLoader.ts
Outdated
// Get latest schema for each subject | ||
for (const subject of subjects) { | ||
const versions: number[] = await subjectsClient.listVersions({ subject }); | ||
const latestVersion: number = Math.max(...versions); | ||
const schema: ResponseSchema = await subjectsClient.getSchemaByVersion({ | ||
subject, | ||
version: latestVersion.toString(), | ||
}); | ||
schemas.push(schema); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We probably want to queue up these as promises and await them all at once, something like:
// Get latest schema for each subject | |
for (const subject of subjects) { | |
const versions: number[] = await subjectsClient.listVersions({ subject }); | |
const latestVersion: number = Math.max(...versions); | |
const schema: ResponseSchema = await subjectsClient.getSchemaByVersion({ | |
subject, | |
version: latestVersion.toString(), | |
}); | |
schemas.push(schema); | |
} | |
// Get latest schema for each subject | |
const getSchemaPromises: Promise<ResponseSchema>[] = []; | |
for (const subject of subjects) { | |
const versions: number[] = await subjectsClient.listVersions({ subject }); | |
const latestVersion: number = Math.max(...versions); | |
getSchemaPromises.push( | |
subjectsClient.getSchemaByVersion({ | |
subject, | |
version: latestVersion.toString(), | |
}) | |
); | |
} | |
const schemas: ResponseSchema[] = await Promise.all(getSchemaPromises); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah yeah that's much better, thanks.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done, take a look again pls?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@shouples @rohitsanj if there are hundreds, thousands of schemas, is it really a good idea to do an unbounded concurrently-fetch-them-all ? I ... doubt it, and would prefer to "chunkify batch" through something like groups of up to ~10-20 or so at a time.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm, I could add that in.
But I think in general we need to guard against trying to fetch 1000s of schemas by adding some sort of pagination or "lazy" fetching (even with the listSchemas
call which would be an expensive query on server side).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Alas, the vscode view controller / treeview constructs expects to be able to get all children of a node at once, see TreeDataProvider.getChildren()
, either directly or as result of a thenable (a single promise). Alas cannot provide any sort of generator / async generator based result as far as I can see.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added the chunking
Tested again with the |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is now only fetching the latest version for each subject group in the fallback code, whereas if client.getSchemas()
is successful, will give us all bound versions for each subject.
Will need to discuss with @shouples tomorrow.
Summary of Changes
Resolves #860
All SRs are not built equal, the WarpStream SR API does not support the List Schemas endpoint (for good reason).
We need to handle cases where the list schemas API is not available, and instead construct it using 1 + 2*N API calls to GET /subjects and on each subject: list versions to determine latest version, then get schema by version.
Any additional details or context that should be provided?
Pull request checklist
Please check if your PR fulfills the following (if applicable):
Tests
Other
.vsix
file?