refactor: improve handling of message read for partially loaded files (#63)

* refactor: improve handling of message read for partially loaded files
* chore: update release notes
medcl authored Jan 15, 2025
1 parent 68ff853 commit 2786156
Showing 2 changed files with 21 additions and 10 deletions.
docs/content.en/docs/release-notes/_index.md (3 changes: 2 additions & 1 deletion)
@@ -14,7 +14,8 @@ Information about release notes of INFINI Framework is provided here.
 ### Bug fix
 ### Improvements
 - Add util to http handler, support to parse bool parameter
-- Handle simplified bulk metdata, parse index from url path #59
+- Handle simplified bulk metdata, parse index from url path (#59)
+- Improve handling of message read for partially loaded files (#63)


 ## v1.1.0 (2025-01-11)
modules/queue/disk_queue/consumer.go (28 changes: 19 additions & 9 deletions)
@@ -310,17 +310,26 @@ READ_MSG:

 	//validate read position
 	if nextReadPos > d.maxBytesPerFileRead || (d.diskQueue.writeSegmentNum == d.segment && nextReadPos > d.diskQueue.writePos) {
-		err = errors.Errorf("dirty_read, the read position(%v,%v) exceed max_bytes_to_read: %v, current_write:(%v,%v)", d.segment, nextReadPos, d.maxBytesPerFileRead, d.diskQueue.writeSegmentNum, d.diskQueue.writePos)
-		time.Sleep(time.Millisecond * 100) //don't catch up too fast
-		stats.Increment("consumer", d.qCfg.ID, d.cCfg.ID, "dirty_read")
+		stats.Increment("consumer", d.qCfg.ID, d.cCfg.ID, "invalid_message_read")

-		//retry when file is in stale
-		if d.diskQueue.writeSegmentNum > d.segment && nextReadPos > d.maxBytesPerFileRead {
-			//re-check file size
-			goto RELOAD_FILE
+		//error only when complete loaded file
+		if d.fileLoadCompleted && nextReadPos > d.maxBytesPerFileRead {
+			err = errors.Errorf("the read position(%v,%v) exceed max_bytes_to_read: %v, current_write:(%v,%v)", d.segment, nextReadPos, d.maxBytesPerFileRead, d.diskQueue.writeSegmentNum, d.diskQueue.writePos)
+			return messages, true, err
 		}

+		//file was known to not loaded completed
+
+		//still working on the same file
+		if d.diskQueue.writeSegmentNum == d.segment {
+			time.Sleep(100 * time.Millisecond) // Prevent catching up too quickly.
+			log.Debugf("invalid message size detected. this might be due to a dirty read as the file was being written while open. reloading segment: %d", d.segment)
+		} else {
+			log.Debugf("invalid message size detected. this might be due to a partial file load. reloading segment: %d", d.segment)
+		}
+
-		return messages, true, err
+		stats.Increment("consumer", d.qCfg.ID, d.cCfg.ID, "reload_partial_file")
+		goto RELOAD_FILE
 	}

 	if d.mCfg.Compress.Message.Enabled {
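For readers skimming the hunk above, here is a minimal, self-contained sketch of the decision flow the new validation implements. The type readState and the helper validateReadPosition are illustrative names, not part of the framework's API:

```go
package main

import (
	"fmt"
	"time"
)

// readState bundles the fields the validation above consults (illustrative only).
type readState struct {
	segment             int64
	nextReadPos         int64
	maxBytesPerFileRead int64
	writeSegmentNum     int64
	writePos            int64
	fileLoadCompleted   bool
}

// validateReadPosition mirrors the shape of the new logic: reading past the
// loaded bytes is a hard error only when the segment file is known to be
// fully loaded; otherwise the consumer backs off briefly (if it is chasing
// the writer on the same segment) and asks the caller to reload the file so
// newly written bytes become readable.
func validateReadPosition(s readState) (reload bool, err error) {
	invalid := s.nextReadPos > s.maxBytesPerFileRead ||
		(s.writeSegmentNum == s.segment && s.nextReadPos > s.writePos)
	if !invalid {
		return false, nil
	}
	if s.fileLoadCompleted && s.nextReadPos > s.maxBytesPerFileRead {
		return false, fmt.Errorf("read position (%v,%v) exceeds max_bytes_to_read %v",
			s.segment, s.nextReadPos, s.maxBytesPerFileRead)
	}
	if s.writeSegmentNum == s.segment {
		time.Sleep(100 * time.Millisecond) // likely a dirty read; do not chase the writer too fast
	}
	return true, nil // partially loaded file: reload the segment and retry
}

func main() {
	// The writer is still on this segment and ahead of what we loaded, so the
	// consumer should reload rather than fail.
	reload, err := validateReadPosition(readState{
		segment: 3, nextReadPos: 2048, maxBytesPerFileRead: 1024,
		writeSegmentNum: 3, writePos: 4096, fileLoadCompleted: false,
	})
	fmt.Println(reload, err) // true <nil>
}
```

The key behavioural change is that exceeding maxBytesPerFileRead is fatal only once the file is known to be fully loaded; for a partially loaded file the consumer reloads and retries instead of returning a dirty_read error.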
@@ -365,6 +374,7 @@ READ_MSG:
 	}

 RELOAD_FILE:
+	log.Debugf("load queue file: %v/%v, read at: %v", d.queue, d.segment, d.readPos)
 	if nextReadPos >= d.maxBytesPerFileRead {

 		if !d.fileLoadCompleted {
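The body of the reload branch is collapsed in this view. Purely as a hypothetical sketch of what re-checking a partially loaded segment file could look like under this design (re-statting the file so the readable byte limit grows as the writer appends), and not code taken from consumer.go:

```go
package diskqueue // hypothetical package name, illustration only

import "os"

// refreshMaxBytes is a hypothetical helper (not taken from consumer.go): it
// refreshes the readable byte limit of a partially loaded segment file by
// re-checking its size on disk.
func refreshMaxBytes(path string, current int64) (int64, error) {
	info, err := os.Stat(path)
	if err != nil {
		return current, err
	}
	if info.Size() > current {
		// The writer has appended since the file was first sized; reads may
		// now proceed up to the new end of file.
		return info.Size(), nil
	}
	return current, nil
}
```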
@@ -516,7 +526,7 @@ func (d *Consumer) ResetOffset(segment, readPos int64) error {

 FIND_NEXT_FILE:
 	//if next file exists, and current file is not the last file, the file should be completed loaded
-	if next_file_exists {
+	if next_file_exists || d.diskQueue.writeSegmentNum > segment {
 		d.fileLoadCompleted = true
 	} else {
 		d.fileLoadCompleted = false
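As a reading aid, the new completeness condition restated as a tiny helper; the function name fileFullyLoaded and the package name are hypothetical, not part of consumer.go:

```go
package diskqueue // hypothetical package name, illustration only

// fileFullyLoaded restates the condition added in FIND_NEXT_FILE above: a
// segment file counts as fully loaded when its successor file already exists,
// or when the writer has already moved on to a later segment.
func fileFullyLoaded(nextFileExists bool, writeSegmentNum, segment int64) bool {
	return nextFileExists || writeSegmentNum > segment
}
```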
