Skip to content

Commit

Permalink
Add streaming API
Browse files Browse the repository at this point in the history
Co-authored-by: JamesWrigley <[email protected]>
Co-authored-by: davidizzle <[email protected]>
  • Loading branch information
3 people committed Dec 4, 2024
1 parent c718390 commit 59a0371
Show file tree
Hide file tree
Showing 15 changed files with 1,361 additions and 8 deletions.
1 change: 0 additions & 1 deletion Project.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ Graphs = "86223c79-3864-5bf0-83f7-82e725a168b6"
LinearAlgebra = "37e2e46d-f89d-539d-b4ee-838fcccc9c8e"
MacroTools = "1914dd2f-81c6-5fcd-8719-6d5c9610ff09"
MemPool = "f9f48841-c794-520a-933b-121f7ba6ed94"
Mmap = "a63ad114-7e13-5084-954f-fe012c677804"
OnlineStats = "a15396b6-48d5-5d58-9928-6d29437db91e"
PrecompileTools = "aea7be01-6a6a-4083-8856-8a6e6704d82a"
Profile = "9abbd945-dff8-562f-b5e8-e1ebf5ef1b79"
Expand Down
1 change: 1 addition & 0 deletions docs/make.jl
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ makedocs(;
"Task Spawning" => "task-spawning.md",
"Data Management" => "data-management.md",
"Distributed Arrays" => "darray.md",
"Streaming Tasks" => "streaming.md",
"Scopes" => "scopes.md",
"Processors" => "processors.md",
"Task Queues" => "task-queues.md",
Expand Down
34 changes: 34 additions & 0 deletions docs/src/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -394,3 +394,37 @@ Dagger.@spawn copyto!(C, X)

In contrast to the previous example, here, the tasks are executed without argument annotations. As a result, there is a possibility of the `copyto!` task being executed before the `sort!` task, leading to unexpected results in the output array `C`.

## Quickstart: Streaming

Dagger.jl provides a streaming API that allows you to process data in a streaming fashion, where data is processed as it becomes available, rather than waiting for the entire dataset to be loaded into memory.

For more details: [Streaming](@ref)

### Syntax

The `Dagger.spawn_streaming()` function is used to create a streaming region,
where tasks are executed continuously, processing data as it becomes available:

```julia
# Open a file to write to on this worker
f = Dagger.@mutable open("output.txt", "w")
t = Dagger.spawn_streaming() do
# Generate random numbers continuously
val = Dagger.@spawn rand()
# Write each random number to a file
Dagger.@spawn (f, val) -> begin
if val < 0.01
# Finish streaming when the random number is less than 0.01
Dagger.finish_stream()
end
println(f, val)
end
end
# Wait for all values to be generated and written
wait(t)
```

The above example demonstrates a streaming region that generates random numbers
continuously and writes each random number to a file. The streaming region is
terminated when a random number less than 0.01 is generated, which is done by
calling `Dagger.finish_stream()` (this exits the current streaming task).
106 changes: 106 additions & 0 deletions docs/src/streaming.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
# Streaming

Dagger tasks have a limited lifetime - they are created, execute, finish, and
are eventually destroyed when they're no longer needed. Thus, if one wants
to run the same kind of computations over and over, one might re-create a
similar set of tasks for each unit of data that needs processing.

This might be fine for computations which take a long time to run (thus
dwarfing the cost of task creation, which is quite small), or when working with
a limited set of data, but this approach is not great for doing lots of small
computations on a large (or endless) amount of data. For example, processing
image frames from a webcam, reacting to messages from a message bus, reading
samples from a software radio, etc. All of these tasks are better suited to a
"streaming" model of data processing, where data is simply piped into a
continuously-running task (or DAG of tasks) forever, or until the data runs
out.

Thankfully, if you have a problem which is best modeled as a streaming system
of tasks, Dagger has you covered! Building on its support for
[Task Queues](@ref), Dagger provides a means to convert an entire DAG of
tasks into a streaming DAG, where data flows into and out of each task
asynchronously, using the `spawn_streaming` function:

```julia
Dagger.spawn_streaming() do # enters a streaming region
vals = Dagger.@spawn rand()
print_vals = Dagger.@spawn println(vals)
end # exits the streaming region, and starts the DAG running
```

In the above example, `vals` is a Dagger task which has been transformed to run
in a streaming manner - instead of just calling `rand()` once and returning its
result, it will re-run `rand()` endlessly, continuously producing new random
values. In typical Dagger style, `print_vals` is a Dagger task which depends on
`vals`, but in streaming form - it will continuously `println` the random
values produced from `vals`. Both tasks will run forever, and will run
efficiently, only doing the work necessary to generate, transfer, and consume
values.

As the comments point out, `spawn_streaming` creates a streaming region, during
which `vals` and `print_vals` are created and configured. Both tasks are halted
until `spawn_streaming` returns, allowing large DAGs to be built all at once,
without any task losing a single value. If desired, streaming regions can be
connected, although some values might be lost while tasks are being connected:

```julia
vals = Dagger.spawn_streaming() do
Dagger.@spawn rand()
end

# Some values might be generated by `vals` but thrown away
# before `print_vals` is fully setup and connected to it

print_vals = Dagger.spawn_streaming() do
Dagger.@spawn println(vals)
end
```

More complicated streaming DAGs can be easily constructed, without doing
anything different. For example, we can generate multiple streams of random
numbers, write them all to their own files, and print the combined results:

```julia
Dagger.spawn_streaming() do
all_vals = [Dagger.spawn(rand) for i in 1:4]
all_vals_written = map(1:4) do i
Dagger.spawn(all_vals[i]) do val
open("results_$i.txt"; write=true, create=true, append=true) do io
println(io, repr(val))
end
return val
end
end
Dagger.spawn(all_vals_written...) do all_vals_written...
vals_sum = sum(all_vals_written)
println(vals_sum)
end
end
```

If you want to stop the streaming DAG and tear it all down, you can call
`Dagger.cancel!.(all_vals)` and `Dagger.cancel!.(all_vals_written)` to
terminate each streaming task. In the future, a more convenient way to tear
down a full DAG will be added; for now, each task must be cancelled individually.

Alternatively, tasks can stop themselves from the inside with
`finish_streaming`, optionally returning a value that can be `fetch`'d. Let's
do this when our randomly-drawn number falls within some arbitrary range:

```julia
vals = Dagger.spawn_streaming() do
Dagger.spawn() do
x = rand()
if x < 0.001
# That's good enough, let's be done
return Dagger.finish_streaming("Finished!")
end
return x
end
end
fetch(vals)
```

In this example, the call to `fetch` will hang (while random numbers continue
to be drawn), until a drawn number is less than 0.001; at that point, `fetch`
will return with `"Finished!"`, and the task `vals` will have terminated.
6 changes: 5 additions & 1 deletion src/Dagger.jl
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ if !isdefined(Base, :ScopedValues)
else
import Base.ScopedValues: ScopedValue, with
end

import TaskLocalValues: TaskLocalValue

if !isdefined(Base, :get_extension)
Expand Down Expand Up @@ -69,6 +68,11 @@ include("sch/Sch.jl"); using .Sch
# Data dependency task queue
include("datadeps.jl")

# Streaming
include("stream.jl")
include("stream-buffers.jl")
include("stream-transfer.jl")

# Array computations
include("array/darray.jl")
include("array/alloc.jl")
Expand Down
8 changes: 5 additions & 3 deletions src/sch/Sch.jl
Original file line number Diff line number Diff line change
Expand Up @@ -253,9 +253,11 @@ end
Combine `SchedulerOptions` and `ThunkOptions` into a new `ThunkOptions`.
"""
function Base.merge(sopts::SchedulerOptions, topts::ThunkOptions)
single = topts.single !== nothing ? topts.single : sopts.single
allow_errors = topts.allow_errors !== nothing ? topts.allow_errors : sopts.allow_errors
proclist = topts.proclist !== nothing ? topts.proclist : sopts.proclist
select_option = (sopt, topt) -> isnothing(topt) ? sopt : topt

single = select_option(sopts.single, topts.single)
allow_errors = select_option(sopts.allow_errors, topts.allow_errors)
proclist = select_option(sopts.proclist, topts.proclist)
ThunkOptions(single,
proclist,
topts.time_util,
Expand Down
7 changes: 7 additions & 0 deletions src/sch/eager.jl
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,13 @@ function eager_cleanup(state, uid)
# N.B. cache and errored expire automatically
delete!(state.thunk_dict, tid)
end
remotecall_wait(1, uid) do uid
lock(Dagger.EAGER_THUNK_STREAMS) do global_streams
if haskey(global_streams, uid)
delete!(global_streams, uid)
end
end
end
end

function _find_thunk(e::Dagger.DTask)
Expand Down
2 changes: 2 additions & 0 deletions src/sch/util.jl
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ unwrap_nested_exception(err::RemoteException) =
unwrap_nested_exception(err.captured)
unwrap_nested_exception(err::DTaskFailedException) =
unwrap_nested_exception(err.ex)
unwrap_nested_exception(err::TaskFailedException) =
unwrap_nested_exception(err.t.exception)
unwrap_nested_exception(err) = err

"Gets a `NamedTuple` of options propagated by `thunk`."
Expand Down
64 changes: 64 additions & 0 deletions src/stream-buffers.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
"A process-local ring buffer."
mutable struct ProcessRingBuffer{T}
read_idx::Int
write_idx::Int
@atomic count::Int
buffer::Vector{T}
@atomic open::Bool
function ProcessRingBuffer{T}(len::Int=1024) where T
buffer = Vector{T}(undef, len)
return new{T}(1, 1, 0, buffer, true)
end
end
Base.isempty(rb::ProcessRingBuffer) = (@atomic rb.count) == 0
isfull(rb::ProcessRingBuffer) = (@atomic rb.count) == length(rb.buffer)
capacity(rb::ProcessRingBuffer) = length(rb.buffer)
Base.length(rb::ProcessRingBuffer) = @atomic rb.count
Base.isopen(rb::ProcessRingBuffer) = @atomic rb.open
function Base.close(rb::ProcessRingBuffer)
@atomic rb.open = false
end
function Base.put!(rb::ProcessRingBuffer{T}, x) where T
while isfull(rb)
yield()
if !isopen(rb)
throw(InvalidStateException("ProcessRingBuffer is closed", :closed))
end
task_may_cancel!(; must_force=true)
end
to_write_idx = mod1(rb.write_idx, length(rb.buffer))
rb.buffer[to_write_idx] = convert(T, x)
rb.write_idx += 1
@atomic rb.count += 1
end
function Base.take!(rb::ProcessRingBuffer)
while isempty(rb)
yield()
if !isopen(rb) && isempty(rb)
throw(InvalidStateException("ProcessRingBuffer is closed", :closed))
end
if task_cancelled() && isempty(rb)
# We respect a graceful cancellation only if the buffer is empty.
# Otherwise, we may have values to continue communicating.
task_may_cancel!()
end
task_may_cancel!(; must_force=true)
end
to_read_idx = rb.read_idx
rb.read_idx += 1
@atomic rb.count -= 1
to_read_idx = mod1(to_read_idx, length(rb.buffer))
return rb.buffer[to_read_idx]
end

"""
`take!()` all the elements from a buffer and put them in a `Vector`.
"""
function collect!(rb::ProcessRingBuffer{T}) where T
output = Vector{T}(undef, rb.count)
for i in 1:rb.count
output[i] = take!(rb)
end

return output
end
71 changes: 71 additions & 0 deletions src/stream-transfer.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
struct RemoteChannelFetcher
chan::RemoteChannel
RemoteChannelFetcher() = new(RemoteChannel())
end
const _THEIR_TID = TaskLocalValue{Int}(()->0)
function stream_push_values!(fetcher::RemoteChannelFetcher, T, our_store::StreamStore, their_stream::Stream, buffer)
our_tid = STREAM_THUNK_ID[]
our_uid = our_store.uid
their_uid = their_stream.uid
if _THEIR_TID[] == 0
_THEIR_TID[] = remotecall_fetch(1) do
lock(Sch.EAGER_ID_MAP) do id_map
id_map[their_uid]
end
end
end
their_tid = _THEIR_TID[]
@dagdebug our_tid :stream_push "taking output value: $our_tid -> $their_tid"
value = try
take!(buffer)
catch
close(fetcher.chan)
rethrow()
end
@lock our_store.lock notify(our_store.lock)
@dagdebug our_tid :stream_push "pushing output value: $our_tid -> $their_tid"
try
put!(fetcher.chan, value)
catch err
if err isa InvalidStateException && !isopen(fetcher.chan)
@dagdebug our_tid :stream_push "channel closed: $our_tid -> $their_tid"
throw(InterruptException())
end
# N.B. We don't close the buffer to allow for eventual reconnection
rethrow(err)
end
@dagdebug our_tid :stream_push "finished pushing output value: $our_tid -> $their_tid"
end
function stream_pull_values!(fetcher::RemoteChannelFetcher, T, our_store::StreamStore, their_stream::Stream, buffer)
our_tid = STREAM_THUNK_ID[]
our_uid = our_store.uid
their_uid = their_stream.uid
if _THEIR_TID[] == 0
_THEIR_TID[] = remotecall_fetch(1) do
lock(Sch.EAGER_ID_MAP) do id_map
id_map[their_uid]
end
end
end
their_tid = _THEIR_TID[]
@dagdebug our_tid :stream_pull "pulling input value: $their_tid -> $our_tid"
value = try
take!(fetcher.chan)
catch err
if err isa InvalidStateException && !isopen(fetcher.chan)
@dagdebug our_tid :stream_pull "channel closed: $their_tid -> $our_tid"
throw(InterruptException())
end
# N.B. We don't close the buffer to allow for eventual reconnection
rethrow(err)
end
@dagdebug our_tid :stream_pull "putting input value: $their_tid -> $our_tid"
try
put!(buffer, value)
catch
close(fetcher.chan)
rethrow()
end
@lock our_store.lock notify(our_store.lock)
@dagdebug our_tid :stream_pull "finished putting input value: $their_tid -> $our_tid"
end
Loading

0 comments on commit 59a0371

Please sign in to comment.