Perform handle spill IO outside of locked section in SpillFramework #11880
base: branch-25.02
Conversation
Signed-off-by: Zach Puller <[email protected]>
Signed-off-by: Zach Puller <[email protected]>
Signed-off-by: Zach Puller <[email protected]>
Could you please update the docs for all of the public APIs in SpillableHandle, and for SpillableHandle itself, to make it clear which APIs need to be thread safe and how they are supposed to be protected. For example, spill has no indication as to how it should behave when multiple people call it; we can infer that one wins and the others fail. I want this mostly so it is clear what the contract is for each of these APIs. spillable being a best-effort API intended only for quick filtering is fine, but it needs to be documented, so that if someone tries to use it in a way that requires it to be exact we know that is wrong and that it violates the contract. This will also let me reason about the code and who is calling it, so I can better judge its correctness.
Also could you please explain how error handling/error recovery is intended to happen when spilling? Like if an exception is thrown while we are in the middle of trying to spill something what should happen. Are we supposed to just keep it in the spilling state?
Absolutely, yeah. I'm just planning to resolve the other outstanding issues in the PR and make sure I have a clear understanding of, and answers to, the other questions, and then I will document accordingly.
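For illustration only, not code from this PR: a minimal sketch of the kind of contract documentation being asked for above, using a hypothetical stub trait in place of the real SpillableHandle.

```scala
// Hypothetical stand-in for the real SpillableHandle; only the doc contract matters here.
trait SpillableHandleContractSketch {
  /**
   * Best-effort, non-blocking check intended only for quickly filtering spill
   * candidates. The answer may be stale by the time the caller acts on it, so
   * callers must not rely on it being exact.
   */
  def spillable: Boolean

  /**
   * Moves this handle's buffer to the next storage tier. If several threads
   * race to call this, one wins and performs the IO; the others return without
   * spilling and without blocking on that IO.
   */
  def spill(): Long
}
```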
Signed-off-by: Zach Puller <[email protected]>
I discussed offline with @abellina and he gave me some ideas on how to rework this to not require a separate lock, but to just move the IO component out of the protected section of spill.
I'm triggering a build job to see if some test consistently fails for all PRs.
build
Signed-off-by: Zach Puller <[email protected]>
…into spill_lock
Signed-off-by: Zach Puller <[email protected]>
I've pushed several updates to the branch. Per my previous comment, the changes are now structured in such a way that the interface does not need to change. I was able to test this and see evidence in traces showing that it allows multiple threads to spill concurrently. I considered refactoring some duplicate code fragments between the handle types around how we do the buffer swaps, etc., but I'm not sure it would actually improve the overall readability of the code/PR, so I held off for now.
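For illustration only, not the actual SpillFramework code: a minimal, self-contained sketch of the pattern described above, where only the state transitions happen under the handle lock and the copy itself runs outside it.

```scala
class LockedSpillSketch(initial: Array[Byte]) {
  private var dev: Option[Array[Byte]] = Some(initial)   // stand-in for the device buffer
  private var host: Option[Array[Byte]] = None           // stand-in for the spilled copy
  private var spilling = false

  def spill(): Long = {
    // Claim the spill and grab the reference under the lock; keep the lock short.
    val toCopy = synchronized {
      if (dev.isEmpty || spilling) None
      else { spilling = true; dev }
    }
    toCopy match {
      case None => 0L                                     // nothing to do, or someone else is spilling
      case Some(buf) =>
        val copy = buf.clone()                            // the "IO", done outside the lock
        synchronized {                                    // publish the result under the lock
          host = Some(copy)
          dev = None
          spilling = false
        }
        copy.length.toLong
    }
  }

  // Cheap state check that never has to wait on another thread's spill IO.
  def spillable: Boolean = synchronized { dev.nonEmpty && !spilling }
}
```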
Signed-off-by: Zach Puller <[email protected]>
  }
}

override def close(): Unit = {
  // do we need to Cuda.deviceSynchronize here?
  // what if we don't spill
I don't see a need here. close is called from the user code, and that's who owns the handle, so the caller needs to be careful about calling synchronize before closing memory; but at the same time we use stream-aware allocators, so I don't see cases where we need to add extra synchronization.
Oh sorry, I know we just discussed that offline, I forgot to delete this comment
@@ -527,33 +601,64 @@ class SpillableColumnarBatchHandle private (
    materialized
  }

  private var toSpill: Option[ColumnarBatch] = None
  private var spilled: Option[ColumnarBatch] = None
why do we need both toSpill and spilled?
toSpill is used both to indicate which thread is currently spilling and to hold a reference to the underlying buffer during the spill. spilled is used to make sure the resource gets cleaned up properly after spilling. It's possible that there's some clever way to consolidate them that I couldn't think of.
But, for instance, if you are spilling and only have toSpill set, and you do toSpill = None, dev = None when closing the handle, then when the spill IO finishes and it tries to swap the buffers you'll end up with all references set to None.
So what do you expect the final state of a closed handle to look like? I would expect them all to be set to None after we closed anything that could not be closed while the spill was happening.
To me it just feels simpler that if we see a close while a spill is happening, we mark the handle as closed but don't try to close anything. Then when the spill is finished, but still in the synchronized step, we call close again (or perhaps an underlying close method) to finish the job.
I would also really like some tests to verify that this is working, because I keep seeing race conditions and I don't trust myself to have caught all of them.
Yep, they should all be set to None. I've tried to rework it to put as much as possible into a separate close impl method that we can invoke from close or spill as you described. It just doesn't handle local variables like staging, but I still set those to None and close them separately within the spill call.
I also added some Monte Carlo style tests that overlay async closing and spilling at different delays. In my local testing I was able to see that I'm producing all possible branches of the race, so to speak, but I removed the print statements for the PR. I'm not sure whether it's sufficient to simply run the tests and see that no buffers leak, or whether we want to add some test helper logic into SpillFramework to actually assert that we triggered the race; for now I left that out as it may be overkill.
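For illustration only, not the PR's actual test: a self-contained toy version of the kind of race test described above, assuming the "mark closed and let the spiller finish the cleanup" approach.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.util.Random

object SpillCloseRaceSketch {
  // Toy handle: a close that arrives mid-spill only marks the handle as closed;
  // the spilling thread finishes the release once its (simulated) IO is done.
  class ToyHandle(openBuffers: AtomicInteger) {
    openBuffers.incrementAndGet()
    private var buf: Option[AnyRef] = Some(new AnyRef)
    private var closed = false
    private var spilling = false

    def spill(delayMs: Long): Unit = {
      val claimed = synchronized {
        if (buf.isEmpty || spilling) false
        else { spilling = true; true }
      }
      if (claimed) {
        Thread.sleep(delayMs)              // simulated spill IO, outside the lock
        synchronized {
          spilling = false
          if (closed) doClose()            // finish a close that raced with us
        }
      }
    }

    def close(): Unit = synchronized {
      closed = true
      if (!spilling) doClose()             // otherwise the spiller will clean up
    }

    private def doClose(): Unit = {
      if (buf.nonEmpty) { buf = None; openBuffers.decrementAndGet() }
    }
  }

  def main(args: Array[String]): Unit = {
    val openBuffers = new AtomicInteger(0)
    (1 to 1000).foreach { _ =>
      val h = new ToyHandle(openBuffers)
      val spiller = new Thread(() => h.spill(Random.nextInt(3).toLong))
      val closer = new Thread(() => { Thread.sleep(Random.nextInt(3).toLong); h.close() })
      spiller.start(); closer.start()
      spiller.join(); closer.join()
    }
    assert(openBuffers.get() == 0, "a buffer leaked under the close/spill race")
    println("no leaks")
  }
}
```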
Signed-off-by: Zach Puller <[email protected]>
  host = stagingHost
}
spilled = dev
dev = None
I think we need code to handle the case where close was called in the middle of a spill.
It looks like, with the current code, we are going to leak stagingHost, and dev, which is then stored into spilled.
Signed-off-by: Zach Puller <[email protected]>
…into spill_lock
Signed-off-by: Zach Puller <[email protected]>
@@ -160,6 +155,11 @@ trait StoreHandle extends AutoCloseable {
   * removed on shutdown, or by handle.close, but 0-byte handles are not spillable.
   */
  val approxSizeInBytes: Long

  /**
   * This is used to resolve races between closing a handle while spilling.
Suggested change:
- * This is used to resolve races between closing a handle while spilling.
+ * This is used to resolve races between closing a handle and spilling.
  }
  sizeInBytes
} else {
  0
spacing is odd here.
} else {
  host = staging
}
// set spilled to dev instead of toSpill so that if dev was already closed during spill,
I am a little confused by this comment. toSpill is dev; it's just a reference to it. So when you increased the refcount at line 649, you increased it for both dev and toSpill. I don't believe it makes much of a difference what we set spilled to (dev or toSpill), but I think we should just remove the comment.
The logic here is that it's an issue of timing: yes, we set toSpill to equal dev, but later, if we call close, we intentionally leave toSpill as is in case we are mid-spill, while setting dev to None, so the two variables are no longer equal at that point.
Having said that, I think I may be able to simplify this now that we explicitly keep track of closed, but let me check.
  }
}

private def withChunkedPacker[T](body: ChunkedPacker => T): T = {
  val tbl = synchronized {
    if (dev.isEmpty) {
    if (toSpill.isEmpty) {
This is a bad design for this method, and it's my fault. Could we pass withChunkedPacker a batch instead? That way we don't have this exception and don't rely on state. So the signature would become:
private def withChunkedPacker[T](batchToPack: ColumnarBatch)(body: ChunkedPacker => T): T
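For illustration only: a sketch of the suggested shape, with ColumnarBatchStub and ChunkedPackerStub as hypothetical stand-ins, since the real ChunkedPacker constructor isn't shown in this thread.

```scala
class ColumnarBatchStub
class ChunkedPackerStub(val batch: ColumnarBatchStub) extends AutoCloseable {
  override def close(): Unit = ()
}

object WithChunkedPackerSketch {
  // The batch to pack is an explicit argument, so the method no longer needs to
  // inspect dev/toSpill state or throw when both happen to be empty.
  private def withChunkedPacker[T](batchToPack: ColumnarBatchStub)(body: ChunkedPackerStub => T): T = {
    val packer = new ChunkedPackerStub(batchToPack)
    try body(packer) finally packer.close()
  }

  def main(args: Array[String]): Unit = {
    val result = withChunkedPacker(new ColumnarBatchStub) { _ => "packed" }
    println(result)
  }
}
```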
} else {
  host = staging
}
// set spilled to dev instead of toSpill so that if dev was already closed during spill,
same here, the comment is confusing to me.
  }
}

override def close(): Unit = {
private def doClose(): Unit = synchronized {
  releaseDeviceResource()
So now releaseDeviceResource is inside of the handle lock. This method ends up calling the spill store and takes its lock. I was trying to avoid this to prevent lock-inversion deadlocks. Do you have a reason to move this inside the handle lock?
I think it shouldn't be needed, let me try to undo that.
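For illustration only: a generic sketch of the lock-inversion hazard referred to above; the names are stand-ins, not the real SpillFramework types.

```scala
object LockInversionSketch {
  private val handleLock = new Object
  private val storeLock = new Object

  // e.g. handle.close() -> store.remove(handle): handle lock first, store lock second
  def closeHandle(): Unit = handleLock.synchronized {
    storeLock.synchronized { /* remove the handle from the store */ }
  }

  // e.g. store-driven spill -> handle.spill(): store lock first, handle lock second
  def storeSpill(): Unit = storeLock.synchronized {
    handleLock.synchronized { /* ask the handle to spill */ }
  }

  // If closeHandle() and storeSpill() run concurrently, each thread can end up
  // holding its first lock while waiting forever for the other's, which is why
  // releasing the device resource is kept outside of the handle lock.
}
```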
Addresses #11830
Moves the IO outside of the critical section in the spillable buffer handle spill functions, to allow threads interacting with the SpillFramework to manage the spill state of a handle consistently without being blocked on IO. E.g., if thread t1 is in the middle of spilling, and thread t2 wants to check whether this handle is currently spilling, it doesn't need to wait for the spill IO operation to complete in order to check whether the handle is spillable.