[enhancement] logtail: make table subscription async #16879

Merged
merged 3 commits into from
Jun 18, 2024


volgariver6
Contributor

@volgariver6 volgariver6 commented Jun 13, 2024

User description

What type of PR is this?

  • API-change
  • BUG
  • Improvement
  • Documentation
  • Feature
  • Test and CI
  • Code Refactoring

Which issue(s) this PR fixes:

issue #13724

What this PR does / why we need it:

Subscribing to a table takes a long time if the table has a lot of
data that needs to be flushed, and it blocks the logtail updates of
other tables because both run in the same goroutine.

This PR moves the first phase of table subscription into an independent
goroutine, so the logtails of other tables can be pushed to CN servers
without being blocked. When the first phase of pulling table data
finishes, the result is handed to the goroutine that runs the pushing
job, which pulls the data again for the second phase. Finally, the
data from both phases is merged and sent to the CN servers.

After this PR, table subscription no longer blocks pushing jobs.
By default, at most 50 pulling jobs run in parallel, to avoid OOM
and other risks.
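
The flow described above can be sketched roughly as follows. This is a minimal illustration under assumptions, not the code in this PR: `phase1Result`, `pullPhase1`, `runSubscriptions`, and `maxPullWorkers` are hypothetical names, and the "pull" is simulated with strings. It shows a buffered channel used as a semaphore to cap concurrent phase-1 pulls, while a single goroutine does the phase-2 pull and the merge.

```go
package main

import (
	"fmt"
	"sync"
)

// phase1Result is a hypothetical stand-in for the data pulled in phase 1.
type phase1Result struct {
	tableID uint64
	entries []string
}

// pullPhase1 simulates the slow first pull for one table.
func pullPhase1(tableID uint64) phase1Result {
	return phase1Result{
		tableID: tableID,
		entries: []string{fmt.Sprintf("t%d-phase1", tableID)},
	}
}

// runSubscriptions pulls phase 1 for every table in its own goroutine, with
// at most maxPullWorkers pulls in flight (maxPullWorkers must be >= 1).
// Phase 2 and the merge run in the calling goroutine, which plays the role
// of the single sender.
func runSubscriptions(tableIDs []uint64, maxPullWorkers int) map[uint64][]string {
	sem := make(chan struct{}, maxPullWorkers) // counting semaphore
	results := make(chan phase1Result)
	var wg sync.WaitGroup

	for _, id := range tableIDs {
		wg.Add(1)
		go func(id uint64) {
			defer wg.Done()
			sem <- struct{}{}        // take a worker slot
			defer func() { <-sem }() // give it back when done
			results <- pullPhase1(id)
		}(id)
	}
	// Close results once every phase-1 pull has been delivered.
	go func() {
		wg.Wait()
		close(results)
	}()

	merged := make(map[uint64][]string)
	for r := range results {
		// Phase 2: pull whatever arrived since phase 1, then merge.
		phase2 := fmt.Sprintf("t%d-phase2", r.tableID)
		merged[r.tableID] = append(r.entries, phase2)
	}
	return merged
}

func main() {
	ids := []uint64{1, 2, 3}
	merged := runSubscriptions(ids, 2)
	for _, id := range ids {
		fmt.Println(id, merged[id])
	}
}
```

Because only the phase-1 pull holds a semaphore slot, a slow table delays at most its own subscription response; the sender loop stays free to handle other work between results.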


PR Type

Enhancement, Tests


Description

  • Introduced logtailMerger to merge multiple LogtailPhase instances into a single TableLogtail.
  • Added unit tests for the logtailMerger to ensure correct merging and callback execution.
  • Introduced LogtailPhase struct to represent logtail information for different phases of subscription.
  • Refactored LogtailServer to handle asynchronous table subscription, including new channels and worker functions.
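
As a rough illustration of the merging idea in the list above, here is a minimal sketch. The types `tableLogtail` and `logtailPhase` and the function `mergePhases` are simplified, hypothetical stand-ins, not the definitions added in `merger.go`. The sketch also returns an error on mismatched table IDs, whereas the reviewer notes on this PR say the real merger panics in that case.

```go
package main

import "fmt"

// tableLogtail is a simplified, hypothetical stand-in for the real TableLogtail.
type tableLogtail struct {
	tableID  uint64
	ts       int64    // newest timestamp covered by this logtail
	commands []string // placeholder for the real command entries
}

// logtailPhase pairs one phase's logtail with an optional cleanup callback.
type logtailPhase struct {
	tail    tableLogtail
	closeCB func()
}

// mergePhases combines the phases of one table into a single logtail and
// returns a callback that runs every per-phase callback in order.
// On a table ID mismatch it returns an error and a nil callback.
func mergePhases(phases ...*logtailPhase) (tableLogtail, func(), error) {
	var merged tableLogtail
	var callbacks []func()
	first := true
	for _, p := range phases {
		if p == nil {
			continue
		}
		if first {
			merged.tableID = p.tail.tableID
			first = false
		} else if merged.tableID != p.tail.tableID {
			return tableLogtail{}, nil, fmt.Errorf(
				"cannot merge logtails with different table: %d, %d",
				merged.tableID, p.tail.tableID)
		}
		if p.tail.ts > merged.ts {
			merged.ts = p.tail.ts // keep the newest timestamp
		}
		merged.commands = append(merged.commands, p.tail.commands...)
		if p.closeCB != nil {
			callbacks = append(callbacks, p.closeCB)
		}
	}
	runAll := func() {
		for _, cb := range callbacks {
			cb()
		}
	}
	return merged, runAll, nil
}

func main() {
	closed := 0
	p1 := &logtailPhase{
		tail:    tableLogtail{tableID: 7, ts: 10, commands: []string{"a"}},
		closeCB: func() { closed++ },
	}
	p2 := &logtailPhase{
		tail:    tableLogtail{tableID: 7, ts: 20, commands: []string{"b"}},
		closeCB: func() { closed++ },
	}
	merged, cb, err := mergePhases(p1, p2)
	if err != nil {
		fmt.Println("merge failed:", err)
		return
	}
	cb() // release resources held by both phases
	fmt.Println(merged.tableID, merged.ts, merged.commands, closed)
}
```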

Changes walkthrough 📝

Relevant files

Enhancement

merger.go (+89/-0)
Introduce `logtailMerger` for merging multiple logtail phases

pkg/vm/engine/tae/logtail/service/merger.go

  • Added a new logtailMerger struct to merge multiple LogtailPhase
    instances.
  • Implemented a Merge function to merge logtails and return a combined
    TableLogtail and a callback function.

response.go (+10/-0)
Add `LogtailPhase` struct for subscription phases

pkg/vm/engine/tae/logtail/service/response.go

  • Introduced LogtailPhase struct to represent logtail information for
    subscription phases.

server.go (+133/-59)
Refactor `LogtailServer` for asynchronous table subscription

pkg/vm/engine/tae/logtail/service/server.go

  • Refactored LogtailServer to handle asynchronous table subscription.
  • Added channels subReqChan and subTailChan for managing subscription
    requests and logtail phases.
  • Implemented logtailPullWorker and pullLogtailsPhase1 for asynchronous
    logtail pulling.
  • Updated logtailSender to handle the second phase of logtail
    collection.

Tests

merger_test.go (+68/-0)
Add unit tests for `logtailMerger` functionality

pkg/vm/engine/tae/logtail/service/merger_test.go

  • Added unit tests for the logtailMerger functionality.
  • Verified merging of logtails and execution of callbacks.


    PR Reviewer Guide 🔍

    ⏱️ Estimated effort to review [1-5]

    4

    🧪 Relevant tests

    Yes

    🔒 Security concerns

    No

    ⚡ Key issues to review

    Possible Bug:
    The logtailMerger implementation in merger.go uses a panic for handling mismatched table IDs during the merge process. This could lead to abrupt termination of the service if not handled properly. Consider replacing the panic with a more graceful error handling mechanism.

    Performance Concern:
    The merging logic in logtailMerger iterates over all logtails and performs multiple checks and operations within a loop. This could become a performance bottleneck with a large number of logtails. It might be beneficial to optimize this merging process or consider parallel processing techniques if applicable.


    PR Code Suggestions ✨

    Category    Suggestion    Score
    Possible bug
    Add a nil check for t.tail.Table to prevent potential nil pointer dereference

    To avoid potential panic due to a nil pointer dereference, add a nil check for
    t.tail.Table before accessing its TbId field.

    pkg/vm/engine/tae/logtail/service/merger.go [52-55]

    -} else if tableID.TbId != t.tail.Table.TbId {
    +} else if t.tail.Table != nil && tableID.TbId != t.tail.Table.TbId {
       panic(fmt.Sprintf("cannot merge logtails with different table: %d, %d",
         tableID.TbId, t.tail.Table.TbId))
     
    Suggestion importance[1-10]: 8

    Why: This suggestion correctly identifies a potential nil pointer dereference, which is a critical issue that could cause the program to crash.
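
One thing to keep in mind with a guard of the form `x != nil && a != b` is that a nil value skips the comparison silently. A minimal sketch of an alternative, assuming a hypothetical `tableID` type and a hypothetical helper `checkSameTable` (neither is taken from the PR), treats a missing table ID as an explicit error:

```go
package main

import (
	"errors"
	"fmt"
)

// tableID is a hypothetical stand-in for the table identifier on a logtail.
type tableID struct {
	TbId uint64
}

// errNilTable is returned when either logtail carries no table ID.
var errNilTable = errors.New("logtail has no table ID")

// checkSameTable reports an error if either ID is missing or the IDs differ,
// so the caller never dereferences a nil pointer.
func checkSameTable(want, got *tableID) error {
	if want == nil || got == nil {
		return errNilTable
	}
	if want.TbId != got.TbId {
		return fmt.Errorf("cannot merge logtails with different table: %d, %d",
			want.TbId, got.TbId)
	}
	return nil
}

func main() {
	fmt.Println(checkSameTable(&tableID{TbId: 1}, &tableID{TbId: 1}))
	fmt.Println(checkSameTable(&tableID{TbId: 1}, nil))
	fmt.Println(checkSameTable(&tableID{TbId: 1}, &tableID{TbId: 2}))
}
```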

    Possible issue
    Add a timeout mechanism when sending to pullWorkerPool to prevent potential deadlocks

    To prevent potential deadlocks, ensure that the pullWorkerPool channel is not blocked
    indefinitely by adding a timeout mechanism when sending to the channel.

    pkg/vm/engine/tae/logtail/service/server.go [394-395]

    -s.pullWorkerPool <- struct{}{}
    -defer func() { <-s.pullWorkerPool }()
    +select {
    +case s.pullWorkerPool <- struct{}{}:
    +  defer func() { <-s.pullWorkerPool }()
    +case <-time.After(time.Second * 10):
    +  s.logger.Error("timeout while waiting to send to pullWorkerPool")
    +  return
    +}
     
    Suggestion importance[1-10]: 7

    Why: The suggestion addresses a possible deadlock scenario by adding a timeout mechanism, which is important for maintaining system responsiveness and stability.
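
The timeout idea can be tried in isolation with a self-contained sketch. The helper `acquire`, the error `errAcquireTimeout`, and `runDemo` are hypothetical names used only for illustration. The sketch uses `time.NewTimer` so the timer can be stopped on the fast path, and it also watches `ctx.Done()`, which the suggestion above does not do.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

var errAcquireTimeout = errors.New("timed out waiting for a pull worker slot")

// acquire takes one slot from sem, giving up after timeout or when ctx is
// cancelled. On success it returns a release function that frees the slot.
func acquire(ctx context.Context, sem chan struct{}, timeout time.Duration) (release func(), err error) {
	timer := time.NewTimer(timeout)
	defer timer.Stop()
	select {
	case sem <- struct{}{}:
		return func() { <-sem }, nil
	case <-timer.C:
		return nil, errAcquireTimeout
	case <-ctx.Done():
		return nil, ctx.Err()
	}
}

// runDemo exercises acquire against a pool with a single slot and returns
// the error from each of three attempts.
func runDemo() []error {
	ctx := context.Background()
	sem := make(chan struct{}, 1)
	var errs []error

	release, err := acquire(ctx, sem, 20*time.Millisecond)
	errs = append(errs, err) // slot was free

	_, err = acquire(ctx, sem, 20*time.Millisecond)
	errs = append(errs, err) // pool is full, so this attempt times out

	release()
	_, err = acquire(ctx, sem, 20*time.Millisecond)
	errs = append(errs, err) // slot is free again
	return errs
}

func main() {
	for i, err := range runDemo() {
		fmt.Printf("attempt %d: %v\n", i+1, err)
	}
}
```

Whether a timeout is the right policy here is a design choice: giving up means the subscription request fails or must be retried, while blocking simply queues it behind the running pulls.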

    Maintainability
    Extract the logic for handling the subscription request into a separate function for better readability

    To improve readability and maintainability, consider extracting the logic for handling the
    subscription request into a separate function.

    pkg/vm/engine/tae/logtail/service/server.go [380-388]

     case sub, ok := <-s.subReqChan:
       if !ok {
         s.logger.Info("subscription channel closed")
         return
       }
    +  s.handleSubscriptionRequest(ctx, sub)
    +
    +func (s *LogtailServer) handleSubscriptionRequest(ctx context.Context, sub subscription) {
       go s.pullLogtailsPhase1(ctx, sub)
    +}
     
    Suggestion importance[1-10]: 6

    Why: This suggestion improves code maintainability and readability by separating concerns, which is a good practice, though not as critical as runtime errors or deadlocks.

    Best practice
    Reset cbValue to 0 at the beginning of the test to ensure test isolation

    To ensure the test is isolated and does not depend on external state, reset cbValue to 0
    at the beginning of the test.

    pkg/vm/engine/tae/logtail/service/merger_test.go [26-27]

     var cbValue int
    +cbValue = 0
     tail1 := &LogtailPhase{
     
    Suggestion importance[1-10]: 5

    Why: Ensuring that tests are isolated and do not depend on external state is a best practice in testing, making this a valid suggestion for improving test reliability.
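
For context, Go initializes a variable declared as `var cbValue int` to its zero value, so a counter declared inside the test function already starts at 0 on every run. A small sketch with a hypothetical helper `countCallbacks` shows that each call gets a fresh counter:

```go
package main

import "fmt"

// countCallbacks declares a local counter, invokes a callback n times,
// and returns the final count.
func countCallbacks(n int) int {
	var cbValue int // zero-initialized by the language, no explicit reset needed
	cb := func() { cbValue++ }
	for i := 0; i < n; i++ {
		cb()
	}
	return cbValue
}

func main() {
	// Each call starts from 0, so earlier calls do not leak into later ones.
	fmt.Println(countCallbacks(2), countCallbacks(3))
}
```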


    @matrix-meow matrix-meow added the size/M Denotes a PR that changes [100,499] lines label Jun 13, 2024
    @volgariver6 volgariver6 force-pushed the issue13724 branch 2 times, most recently from c829349 to c89549c Compare June 13, 2024 14:38
    @volgariver6 volgariver6 requested a review from triump2020 as a code owner June 14, 2024 02:28
    @mergify mergify bot merged commit 9504578 into matrixorigin:main Jun 18, 2024
    17 of 18 checks passed
    6 participants