Functions and Macros · Dagger.jl

Dagger Functions

Task Functions/Macros

Dagger.@spawnMacro
@spawn [opts] f(args...) -> Thunk

Convenience macro like Dagger.@par, but eagerly executed from the moment it's called (equivalent to spawn).

See the docs for @par for more information and usage examples.

source
Dagger.spawnFunction
spawn(f, args...; kwargs...) -> EagerThunk

Spawns a task with f as the function, args as the arguments, and kwargs as the keyword arguments, returning an EagerThunk. Uses a scheduler running in the background to execute code.
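A minimal usage sketch (assuming Dagger is loaded; the results shown in comments are what one would expect):

using Dagger

t = Dagger.@spawn 1 + 2        # returns an EagerThunk immediately
fetch(t)                       # waits for the task to finish and returns 3

s = Dagger.spawn(+, 1, 2)      # function-call form of the same operation
fetch(s)                       # 3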

source
Dagger.delayedFunction
delayed(f, options=Options())(args...; kwargs...) -> Thunk
delayed(f; options...)(args...; kwargs...) -> Thunk

Creates a Thunk object which can be executed later, which will call f with args and kwargs. options controls various properties of the resulting Thunk.

source
Dagger.@parMacro
@par [opts] f(args...; kwargs...) -> Thunk

Convenience macro to call Dagger.delayed on f with arguments args and keyword arguments kwargs. May also be called with a series of assignments like so:

x = @par begin
     a = f(1,2)
     b = g(a,3)
     h(a,b)
end

x will hold the Thunk representing h(a,b); additionally, a and b will be defined in the same local scope and will be equally accessible for later calls.

Options to the Thunk can be set as opts with namedtuple syntax, e.g. single=1. Multiple options may be provided, and will be applied to all generated thunks.

source

Task Options Functions/Macros

Dagger.with_optionsFunction
with_options(f, options::NamedTuple) -> Any
with_options(f; options...) -> Any

Sets one or more options to the given values, executes f(), resets the options to their previous values, and returns the result of f(). This is the recommended way to set options, as it only affects tasks spawned within its scope. Note that setting an option here will propagate its value across Julia or Dagger tasks spawned by f() or its callees (i.e. the options propagate).
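A brief sketch of scoped option setting (myfunc is a hypothetical user function, and worker 1 is assumed to exist):

Dagger.with_options(;scope=Dagger.scope(worker=1)) do
    Dagger.@spawn myfunc()     # this task, and any tasks it spawns, inherit the scope option
end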

source
Dagger.get_optionsFunction
get_options(key::Symbol, default) -> Any
get_options(key::Symbol) -> Any

Returns the value of the option named key. If the option does not have a value set, an error will be thrown, unless default is provided, in which case it is returned instead of erroring.

get_options() -> NamedTuple

Returns a NamedTuple of all option key-value pairs.
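For illustration, a sketch using an arbitrary option name (myopt and otheropt are hypothetical):

Dagger.with_options(;myopt=42) do
    Dagger.get_options(:myopt)        # returns 42
    Dagger.get_options(:otheropt, 0)  # not set, so the default 0 is returned
    Dagger.get_options()              # NamedTuple including myopt = 42
end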

source
Dagger.@optionMacro
@option name myfunc(A, B, C) = value

A convenience macro for defining default_option. For example:

Dagger.@option single mylocalfunc(Int) = 1

The above call will set the single option to 1 for any Dagger task calling mylocalfunc(Int) with an Int argument.

source
Dagger.default_optionFunction
default_option(::Val{name}, Tf, Targs...) where name = value

Defines the default value for option name to value when Dagger is preparing to execute a function of type Tf with argument types Targs. Users and libraries may override this to set default values for tasks.

An easier way to define these defaults is with @option.

Note that the actual task's argument values are not passed, as it may not always be possible or efficient to gather all Dagger task arguments on one worker.

This function may be executed within the scheduler, so it should generally be made very cheap to execute. If the function throws an error, the scheduler will use whatever the global default value is for that option instead.

source

Data Management Functions

Dagger.tochunkFunction
tochunk(x, proc::Processor, scope::AbstractScope; device=nothing, kwargs...) -> Chunk

Create a chunk from data x which resides on proc and which has scope scope.

device specifies a MemPool.StorageDevice (which is itself wrapped in a Chunk) which will be used to manage the reference contained in the Chunk generated by this function. If device is nothing (the default), the data will be inspected to determine if it's safe to serialize; if so, the default MemPool storage device will be used; if not, then a MemPool.CPURAMDevice will be used.

All other kwargs are passed directly to MemPool.poolset.
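A minimal sketch using the default processor and scope:

x = rand(100)
c = Dagger.tochunk(x)    # Chunk owned by the current processor
collect(c)               # retrieves a copy of the data on the calling worker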

source
Dagger.mutableFunction
mutable(f::Base.Callable; worker, processor, scope) -> Chunk

Calls f() on the specified worker or processor, returning a Chunk referencing the result with the specified scope scope.
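A brief sketch (assumes worker 1 exists; the anonymous functions are illustrative):

ref = Dagger.mutable(()->Ref(0); worker=1)       # the Ref lives on worker 1
fetch(Dagger.@spawn (r->(r[] += 1))(ref))        # mutate it from a task running on that worker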

source
Dagger.shardFunction
shard(f; kwargs...) -> Chunk{Shard}

Executes f on all workers in workers, wrapping the result in a process-scoped Chunk, and constructs a Chunk{Shard} containing all of these Chunks on the current worker.

Keyword arguments:

  • procs – The list of processors to create pieces on. May be any iterable container of Processors.
  • workers – The list of workers to create pieces on. May be any iterable container of Integers.
  • per_thread::Bool=false – If true, creates one piece per thread, rather than one piece per worker.
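A minimal sketch of a per-thread shard of counters (mirroring the Data Management example later in these docs):

cs = Dagger.shard(;per_thread=true) do
    Threads.Atomic{Int}(0)    # one independent counter per thread
end
wait.([Dagger.@spawn Threads.atomic_add!(cs, 1) for i in 1:100])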
source

Scope Functions

Dagger.scopeFunction
scope(scs...) -> AbstractScope
 scope(;scs...) -> AbstractScope

Constructs an AbstractScope from a set of scope specifiers. Each element in scs is a separate specifier; if scs is empty, an empty UnionScope() is produced; if scs has one element, the scope specified by that element is constructed; if scs has more than one element, a UnionScope of the scopes specified by scs is constructed. A variety of specifiers can be passed to construct a scope:

  • :any - Constructs an AnyScope()
  • :default - Constructs a DefaultScope()
  • (scs...,) - Constructs a UnionScope of scopes, each specified by scs
  • thread=tid or threads=[tids...] - Constructs an ExactScope or UnionScope containing all Dagger.ThreadProcs with thread ID tid/tids across all workers.
  • worker=wid or workers=[wids...] - Constructs a ProcessScope or UnionScope containing all Dagger.ThreadProcs with worker ID wid/wids across all threads.
  • thread=tid/threads=tids and worker=wid/workers=wids - Constructs an ExactScope, ProcessScope, or UnionScope containing all Dagger.ThreadProcs with worker ID wid/wids and threads tid/tids.
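A few illustrative calls (assuming the referenced workers and threads exist):

Dagger.scope(worker=2)              # ProcessScope: any processor on worker 2
Dagger.scope(worker=1, thread=3)    # ExactScope: thread 3 on worker 1
Dagger.scope(workers=[1, 2])        # UnionScope over workers 1 and 2
Dagger.scope(:default)              # DefaultScope()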

Aside from the worker and thread specifiers, it's possible to add custom specifiers for scoping to other kinds of processors (like GPUs) or providing different ways to specify a scope. Specifier selection is determined by a precedence ordering: by default, all specifiers have precedence 0, which can be changed by defining scope_key_precedence(::Val{spec}) = precedence (where spec is the specifier as a Symbol). The specifier with the highest precedence in a set of specifiers is used to determine the scope by calling to_scope(::Val{spec}, sc::NamedTuple) (where sc is the full set of specifiers), which should be overridden for each custom specifier, and which returns an AbstractScope. For example:

# Setup a GPU specifier
 Dagger.scope_key_precedence(::Val{:gpu}) = 1
 Dagger.to_scope(::Val{:gpu}, sc::NamedTuple) = ExactScope(MyGPUDevice(sc.worker, sc.gpu))
 
 # Generate an `ExactScope` for `MyGPUDevice` on worker 2, device 3
Dagger.scope(gpu=3, worker=2)
source
Dagger.constrainFunction
constrain(x::AbstractScope, y::AbstractScope) -> AbstractScope

Constructs a scope that is the intersection of scopes x and y.
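A sketch of the expected behavior (assuming worker 2 exists):

a = Dagger.scope(worker=2)             # any processor on worker 2
b = Dagger.scope(worker=2, thread=1)   # exactly thread 1 on worker 2
Dagger.constrain(a, b)                 # the intersection: thread 1 on worker 2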

source

Lazy Task Functions

Dagger.domainFunction
domain(x::T)

Returns metadata about x. This metadata will be in the domain field of a Chunk object when an object of type T is created as the result of evaluating a Thunk.

source
Dagger.computeFunction
compute(ctx::Context, d::Thunk; options=nothing) -> Chunk

Compute a Thunk - creates the DAG, assigns ranks to nodes for tie breaking and runs the scheduler with the specified options. Returns a Chunk which references the result.
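A minimal lazy-execution sketch using the default context:

t = Dagger.delayed(+)(1, 2)    # build a Thunk without executing it
c = Dagger.compute(t)          # run the scheduler; returns a Chunk
collect(c)                     # 3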

source
Dagger.dependentsFunction
dependents(node::Thunk) -> Dict{Union{Thunk,Chunk}, Set{Thunk}}

Find the set of direct dependents for each task.

source
Dagger.noffspringFunction
noffspring(dpents::Dict{Union{Thunk,Chunk}, Set{Thunk}}) -> Dict{Thunk, Int}

Recursively find the number of tasks dependent on each task in the DAG. Takes a Dict as returned by dependents.

source
Dagger.orderFunction
order(node::Thunk, ndeps) -> Dict{Thunk,Int}

Given a root node of the DAG, calculates a total order for tie-breaking.

  • Root node gets score 1,
  • rest of the nodes are explored in DFS fashion, but the chunks of each node are explored in order of noffspring, i.e. the total number of tasks depending on the result of that node.

Args:

  • node – the root node of the DAG
  • ndeps – the per-node dependent counts, as returned by noffspring

source

Processor Functions

Dagger.execute!Function
execute!(proc::Processor, f, args...; kwargs...) -> Any

Executes the function f with arguments args and keyword arguments kwargs on processor proc. This function can be overloaded by Processor subtypes to allow executing function calls differently than normal Julia.

source
Dagger.iscompatibleFunction
iscompatible(proc::Processor, opts, f, Targs...) -> Bool

Indicates whether proc can execute f over Targs given opts. Processor subtypes should overload this function to return true if and only if it is essentially guaranteed that f(::Targs...) is supported. Additionally, iscompatible_func and iscompatible_arg can be overridden to determine compatibility of f and Targs individually. The default implementation returns false.

source
Dagger.default_enabledFunction
default_enabled(proc::Processor) -> Bool

Returns whether processor proc is enabled by default. The default value is false, meaning the processor is opted out of execution unless specifically requested by the user; returning true opts the processor in, causing it to always participate in execution when possible.

source
Dagger.get_processorsFunction
get_processors(proc::Processor) -> Set{<:Processor}

Returns the set of processors contained in proc, if any. Processor subtypes should overload this function if they can contain sub-processors. The default method will return a Set containing proc itself.

source
Dagger.get_parentFunction
get_parent(proc::Processor) -> Processor

Returns the parent processor for proc. The ultimate parent processor is an OSProc. Processor subtypes should overload this to return their most direct parent.

source
Dagger.moveFunction
move(from_proc::Processor, to_proc::Processor, x)

Moves and/or converts x such that it's available and suitable for usage on the to_proc processor. This function can be overloaded by Processor subtypes to transport arguments and convert them to an appropriate form before being used for execution. Subtypes of Processor wishing to implement efficient data movement should provide implementations where x::Chunk.

source
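To show how the processor functions above fit together, here is a hypothetical, deliberately trivial Processor subtype; a real implementation would perform actual device-specific data conversion and execution:

struct MyAccelProc <: Dagger.Processor
    owner::Int                          # owning worker ID
end
Dagger.get_parent(proc::MyAccelProc) = Dagger.OSProc(proc.owner)
Dagger.iscompatible(proc::MyAccelProc, opts, f, Targs...) = true
Dagger.default_enabled(proc::MyAccelProc) = false       # only used when explicitly requested
Dagger.move(from_proc::Dagger.Processor, to_proc::MyAccelProc, x) = x   # no conversion in this sketch
Dagger.execute!(proc::MyAccelProc, f, args...; kwargs...) = f(args...; kwargs...)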

Context Functions

Dagger.addprocs!Function
addprocs!(ctx::Context, xs)

Add new workers xs to ctx.

Workers will typically be assigned new tasks in the next scheduling iteration if scheduling is ongoing.

Workers can be either Processors or the underlying process IDs as Integers.

source
Dagger.rmprocs!Function
rmprocs!(ctx::Context, xs)

Remove the specified workers xs from ctx.

Workers will typically finish all their assigned tasks if scheduling is ongoing but will not be assigned new tasks after removal.

Workers can be either Processors or the underlying process IDs as Integers.
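A sketch of growing and shrinking a context at runtime (assumes the Distributed stdlib is available):

using Distributed
ws = addprocs(2)              # start two new Julia workers
ctx = Dagger.Context()        # a Context over the current workers
Dagger.addprocs!(ctx, ws)     # make the new workers available to this context
Dagger.rmprocs!(ctx, ws)      # and later remove them again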

source

Thunk Execution Environment Functions

These functions are used within the function called by a Thunk.

Dynamic Scheduler Control Functions

These functions query and control the scheduler remotely.

Base.fetchFunction
Base.fetch(c::DArray)

If a DArray tree has a Thunk in it, make the whole thing a big thunk.

source

Waits on a thunk to complete, and fetches its result.

source
Types · Dagger.jl

Thunk(sin, (π,))

julia> collect(t) # computes the result and returns it to the current process
1.2246467991473532e-16

Arguments

for each property are described in the next section.

Public Properties

Chunks and passing the raw arguments to f, instead pass the Chunk. Useful for doing manual fetching or manipulation of Chunk references. Non-Chunk arguments are still passed as-is.

f is a callable struct that exists on a given processor and should be transferred appropriately.

Useful if f is a function or callable struct that may only be transferred to, and executed within, the specified scope.

Options

If omitted, options can also be specified by passing key-value pairs as kwargs.

source
Dagger.EagerThunkType
EagerThunk

Returned from spawn/@spawn calls. Represents a task that is in the scheduler, potentially ready to execute, executing, or finished executing. May be fetch'd or wait'd on at any time.

source

Task Options Types

Dagger.OptionsType
Options(::NamedTuple)
Options(; kwargs...)

Options for thunks and the scheduler. See Task Spawning for more information.
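A brief sketch of attaching options to a lazy task (assumes worker 1 exists):

opts = Dagger.Options(;scope=Dagger.scope(worker=1))
t = Dagger.delayed(sum, opts)(1:100)    # the options apply to this Thunk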

source
Dagger.Sch.ThunkOptionsType
ThunkOptions

Stores Thunk-local options to be passed to the Dagger.Sch scheduler.

Arguments

  • single::Int=0: (Deprecated) Force thunk onto worker with specified id. 0 disables this option.
  • proclist=nothing: (Deprecated) Force thunk to use one or more processors that are instances/subtypes of a contained type. Alternatively, a function can be supplied, and the function will be called with a processor as the sole argument and should return a Bool result to indicate whether or not to use the given processor. nothing enables all default processors.
  • time_util::Dict{Type,Any}: Indicates the maximum expected time utilization for this thunk. Each keypair maps a processor type to the utilization, where the value can be a real (approximately the number of nanoseconds taken), or MaxUtilization() (utilizes all processors of this type). By default, the scheduler assumes that this thunk only uses one processor.
  • alloc_util::Dict{Type,UInt64}: Indicates the maximum expected memory utilization for this thunk. Each keypair maps a processor type to the utilization, where the value is an integer representing approximately the maximum number of bytes allocated at any one time.
  • occupancy::Dict{Type,Real}: Indicates the maximum expected processor occupancy for this thunk. Each keypair maps a processor type to the utilization, where the value can be a real between 0 and 1 (the occupancy ratio, where 1 is full occupancy). By default, the scheduler assumes that this thunk has full occupancy.
  • allow_errors::Bool=true: Allow this thunk to error without affecting non-dependent thunks.
  • checkpoint=nothing: If not nothing, uses the provided function to save the result of the thunk to persistent storage, for later retrieval by restore.
  • restore=nothing: If not nothing, uses the provided function to return the (cached) result of this thunk, were it to execute. If this returns a Chunk, this thunk will be skipped, and its result will be set to the Chunk. If nothing is returned, restoring is skipped, and the thunk will execute as usual. If this function throws an error, restoring will be skipped, and the error will be displayed.
  • storage::Union{Chunk,Nothing}=nothing: If not nothing, references a MemPool.StorageDevice which will be passed to MemPool.poolset internally when constructing Chunks (such as when constructing the return value). The device must support MemPool.CPURAMResource. When nothing, uses MemPool.GLOBAL_DEVICE[].
  • storage_root_tag::Any=nothing: If not nothing, specifies the MemPool storage root tag to associate with the thunk's result. This tag can be used by MemPool's storage devices to manipulate their behavior, such as the file name used to store data on disk.
  • storage_leaf_tag::Union{MemPool.Tag,Nothing}=nothing: If not nothing, specifies the MemPool storage leaf tag to associate with the thunk's result. This tag can be used by MemPool's storage devices to manipulate their behavior, such as the file name used to store data on disk.
  • storage_retain::Bool=false: The value of retain to pass to MemPool.poolset when constructing the result Chunk.
source
Dagger.Sch.SchedulerOptionsType
SchedulerOptions

Stores DAG-global options to be passed to the Dagger.Sch scheduler.

Arguments

  • single::Int=0: (Deprecated) Force all work onto worker with specified id. 0 disables this option.
  • proclist=nothing: (Deprecated) Force scheduler to use one or more processors that are instances/subtypes of a contained type. Alternatively, a function can be supplied, and the function will be called with a processor as the sole argument and should return a Bool result to indicate whether or not to use the given processor. nothing enables all default processors.
  • allow_errors::Bool=true: Allow thunks to error without affecting non-dependent thunks.
  • checkpoint=nothing: If not nothing, uses the provided function to save the final result of the current scheduler invocation to persistent storage, for later retrieval by restore.
  • restore=nothing: If not nothing, uses the provided function to return the (cached) final result of the current scheduler invocation, were it to execute. If this returns a Chunk, all thunks will be skipped, and the Chunk will be returned. If nothing is returned, restoring is skipped, and the scheduler will execute as usual. If this function throws an error, restoring will be skipped, and the error will be displayed.
source

Data Management Types

Dagger.ChunkType
Chunk

A reference to a piece of data located on a remote worker. Chunks are typically created with Dagger.tochunk(data), and the data can then be accessed from any worker with collect(::Chunk). Chunks are serialization-safe, and use distributed refcounting (provided by MemPool.DRef) to ensure that the data referenced by a Chunk won't be GC'd, as long as a reference exists on some worker.

Each Chunk is associated with a given Dagger.Processor, which is (in a sense) the processor that "owns" or contains the data. Calling collect(::Chunk) will perform data movement and conversions defined by that processor to safely serialize the data to the calling worker.

Constructors

See tochunk.

source
Dagger.ShardType

Maps a value to one of multiple distributed "mirror" values automatically when used as a thunk argument. Construct using @shard or shard.

source

Processor Types

Dagger.ProcessorType
Processor

An abstract type representing a processing device and associated memory, where data can be stored and operated on. Subtypes should be immutable, and instances should compare equal if they represent the same logical processing device/memory. Subtype instances should be serializable between different nodes. Subtype instances may contain a "parent" Processor to make it easy to transfer data to/from other types of Processor at runtime.

source
Dagger.OSProcType
OSProc <: Processor

Julia CPU (OS) process, identified by Distributed pid. The logical parent of all processors on a given node, but otherwise does not participate in computations.

source
Dagger.ThreadProcType
ThreadProc <: Processor

Julia CPU (OS) thread, identified by Julia thread ID.

source

Scope Types

Dagger.AnyScopeType

Widest scope that contains all processors.

source
Dagger.NodeScopeType

Scoped to the same physical node.

source
Dagger.ProcessScopeType

Scoped to the same OS process.

source
Dagger.ProcessorTypeScopeFunction

Scoped to any processor with a given supertype.

source
Dagger.TaintScopeType

Taints a scope for later evaluation.

source
Dagger.UnionScopeType

Union of two or more scopes.

source
Dagger.ExactScopeType

Scoped to a specific processor.

source

Context Types

Dagger.ContextType
Context(xs::Vector{OSProc}) -> Context
Context(xs::Vector{Int}) -> Context

Create a Context, by default adding each available worker.

It is also possible to create a Context from a vector of OSProc or, equivalently, by passing the underlying process IDs directly as a Vector{Int}.

Special fields include:

  • log_sink: A log sink object to use, if any.
  • profile::Bool: Whether or not to perform profiling with Profile stdlib.
source

Array Types

Dagger.DArrayType
DArray{T,N,F}(domain, subdomains, chunks, concat)
DArray(T, domain, subdomains, chunks, [concat=cat])

An N-dimensional distributed array of element type T, with a concatenation function of type F.

Arguments

  • T: element type
  • domain::ArrayDomain{N}: the whole ArrayDomain of the array
  • subdomains::AbstractArray{ArrayDomain{N}, N}: a DomainBlocks of the same dimensions as the array
  • chunks::AbstractArray{Union{Chunk,Thunk}, N}: an array of chunks of dimension N
  • concat::F: a function of type F. concat(x, y; dims=d) takes two chunks x and y and concatenates them along dimension d. cat is used by default.
source
Dagger.BlocksType
Blocks(xs...)

Indicates the size of an array operation, specified as xs, whose length indicates the number of dimensions in the resulting array.
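A minimal sketch of constructing and collecting a block-partitioned array:

A = rand(Dagger.Blocks(2, 2), 4, 4)   # 4×4 DArray partitioned into 2×2 blocks
B = A .+ A                            # operations remain distributed
collect(B)                            # gather the result into a regular Matrix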

source
Dagger.ArrayDomainType
ArrayDomain{N}

An N-dimensional domain over an array.

source
Dagger.UnitDomainType
UnitDomain

Default domain; it carries no information about the value.

source

Logging Event Types

Dagger.Events.BytesAllocdType
BytesAllocd

Tracks memory allocated for Chunks.

source
Dagger.Events.ProcessorSaturationType
ProcessorSaturation

Tracks the compute saturation (running tasks) per-processor.

source
Dagger.Events.WorkerSaturationType
WorkerSaturation

Tracks the compute saturation (running tasks).

source
Functions and Macros · Dagger.jl
Types · Dagger.jl

DaggerWebDash Types

Logging Event Types

DaggerWebDash.D3RendererType
D3Renderer(port::Int, port_range::UnitRange; seek_store=nothing) -> D3Renderer

Constructs a D3Renderer, which is a TimespanLogging aggregator which renders the logs over HTTP using the d3.js library. port is the port that will be serving the HTTP website. port_range specifies a range of ports that will be used to listen for connections from other Dagger workers. seek_store, if specified, is a Tables.jl-compatible object that logs will be written to and read from. This table can be written to disk and then re-read later for offline log analysis.
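A rough setup sketch based on the signature above (the port numbers are arbitrary, and the log-sink wiring mirrors the TableStorage example below):

ml = TimespanLogging.MultiEventLog()
# ... register event consumers in `ml` ...
ml.aggregators[:d3r] = DaggerWebDash.D3Renderer(8080, 50000:59999)
ctx = Dagger.Context()
ctx.log_sink = ml    # attach the log sink so Dagger events reach the renderer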

source
DaggerWebDash.TableStorageType
TableStorage

LogWindow-compatible aggregator which stores logs in a Tables.jl-compatible sink.

Using a TableStorage is reasonably simple:

ml = TimespanLogging.MultiEventLog()
 
 ... # Add some events
 
 # ... (setup of the df sink, TableStorage, and LogWindow elided) ...
 ml.aggregators[:lw] = lw
 
 # Logs will now be saved into `df` automatically, and packages like
# DaggerWebDash.jl will automatically use it to retrieve subsets of the logs.
source
Functions and Macros · Dagger.jl

TimespanLogging Functions

Basic Functions

TimespanLogging.timespan_startFunction
timespan_start(ctx, category::Symbol, id, tl)

Generates an Event{:start} which denotes the start of an event. The event is categorized by category, and uniquely identified by id; these two must be the same as those later passed to timespan_finish to close the event. tl is the "timeline" of the event, which is just an arbitrary payload attached to the event.

source
TimespanLogging.timespan_finishFunction
timespan_finish(ctx, category::Symbol, id, tl)

Generates an Event{:finish} which denotes the end of an event. The event is categorized by category, and uniquely identified by id; these two must be the same as previously passed to timespan_start. tl is the "timeline" of the event, which is just an arbitrary payload attached to the event.

source
TimespanLogging.get_logs!Function
get_logs!(::LocalEventLog, raw=false; only_local=false) -> Union{Vector{Timespan},Vector{Event}}

Get the logs from each process' local event log, clearing it in the process. Set raw to true to get potentially unmatched Events; the default is to return only matched events as Timespans. If only_local is set true, only process-local logs will be fetched; the default is to fetch logs from all processes.
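A usage sketch with a LocalEventLog as the context's log sink:

ctx = Dagger.Context()
ctx.log_sink = TimespanLogging.LocalEventLog()
# ... run some Dagger work ...
logs = TimespanLogging.get_logs!(ctx.log_sink)   # matched Timespans gathered from all workers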

source

Logging Metric Functions

Types · Dagger.jl

TimespanLogging Types

Log Sink Types

TimespanLogging.MultiEventLogType
MultiEventLog

Processes events immediately, generating multiple log streams. Multiple consumers may register themselves in the MultiEventLog, and when accessed, log events will be provided to all consumers. A consumer is simply a function or callable struct which will be called with an event when it's generated. The return value of the consumer will be pushed into a log stream dedicated to that consumer. Errors thrown by consumers will be caught and rendered, but will not otherwise interrupt consumption by other consumers, or future consumption cycles. An error will result in nothing being appended to that consumer's log.
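A minimal consumer-registration sketch (the consumer names are arbitrary, and the custom consumer simply records each event's category):

ml = TimespanLogging.MultiEventLog()
ml[:core] = TimespanLogging.Events.CoreMetrics()   # a built-in consumer
ml[:category] = ev -> ev.category                  # a custom consumer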

source
TimespanLogging.LocalEventLogType
LocalEventLog

Stores events in a process-local array. Accessing the logs is all-or-nothing; if multiple consumers call get_logs!, they will get different sets of logs.

source

Event Types

Built-in Event Types


Benchmarking · Dagger.jl

Benchmarking Dagger

For ease of benchmarking changes to Dagger's scheduler and the DArray, a benchmarking script exists at benchmarks/benchmark.jl. This script currently allows benchmarking a non-negative matrix factorization (NNMF) algorithm, which we've found to be a good evaluator of scheduling performance. The benchmark script can test with and without Dagger, and also has support for using CUDA or AMD GPUs to accelerate the NNMF via DaggerGPU.jl.

The script checks for a number of environment variables, which are used to control the benchmarks that are performed (all of which are optional):

  • BENCHMARK_PROCS: Selects the number of Julia processes and threads to start up. Specified as 8:4, this option would start 8 extra Julia processes, with 4 threads each. Defaults to 2 processes with 1 thread each.
  • BENCHMARK_REMOTES: Specifies a colon-separated list of remote servers to connect to and start Julia processes on, using BENCHMARK_PROCS to indicate the processor/thread configuration of those remotes. Disabled by default (uses the local machine).
  • BENCHMARK_OUTPUT_FORMAT: Selects the output format for benchmark results. Defaults to jls, which uses Julia's Serialization stdlib, and can also be jld to use JLD.jl.
  • BENCHMARK_RENDER: Configures rendering, which is disabled by default. Can be "live" or "offline", which are explained below.
  • BENCHMARK: Specifies the set of benchmarks to run as a comma-separated list, where each entry can be one of cpu, cuda, or amdgpu, and may optionally append +dagger (like cuda+dagger) to indicate whether or not to use Dagger. Defaults to cpu,cpu+dagger, which runs CPU benchmarks with and without Dagger.
  • BENCHMARK_SCALE: Determines how much to scale the benchmark sizing by, typically specified as a UnitRange{Int}. Defaults to 1:5:50, which runs each scale from 1 to 50, in steps of 5.
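One hypothetical way to drive the script from Julia, run from the repository root (the environment variables can equally be set in the shell before launching Julia):

withenv("BENCHMARK_PROCS" => "8:4",
        "BENCHMARK" => "cpu,cpu+dagger",
        "BENCHMARK_SCALE" => "1:5:10") do
    include("benchmarks/benchmark.jl")
end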

Rendering with BENCHMARK_RENDER

Dagger contains visualization code for the scheduler (as a Gantt chart) and thunk execution profiling (flamechart), which can be enabled with BENCHMARK_RENDER. Additionally, rendering can be done "live", served via a Mux.jl webserver run locally, or "offline", where the visualization will be embedded into the results output file. By default, rendering is disabled. If BENCHMARK_RENDER is set to live, a Mux webserver is started at localhost:8000 (the address is not yet configurable), and the Gantt chart and profiling flamechart will be rendered once the benchmarks start. If set to offline, data visualization will happen in the background, and will be saved into the results file.

Note that Gantt chart and flamechart output is only generated and relevant during Dagger execution.

TODO: Plotting

Checkpointing · Dagger.jl

open("checkpoint-final.bin", "r") do io
    Dagger.tochunk(deserialize(io))
end
end))

In this case, the entire computation will be skipped if checkpoint-final.bin exists!


A variety of other operations exist on the DArray, and it should otherwise generally behave like any other AbstractArray type. If you find that it's missing an operation that you need, please file an issue!

Known Supported Operations

This list is not exhaustive, but documents operations which are known to work well with the DArray:

From Base:

From Statistics:

From LinearAlgebra:

wait.([Dagger.@spawn Threads.atomic_add!(cs, 1) for i in 1:1000])

# And let's fetch the total sum of all counters:
@assert sum(map(ctr->fetch(ctr)[], cs)) == 1000

Note that map, when used on a shard, will execute the provided function once per shard "piece", and each result is considered immutable. map is an easy way to make a copy of each piece of the shard, to be later reduced, scanned, etc.

Further details about what arguments can be passed to @shard/shard can be found in Data Management Functions.

diff --git a/dev/datadeps/index.html b/dev/datadeps/index.html index 3e4e8f7cb..3d8f88df9 100644 --- a/dev/datadeps/index.html +++ b/dev/datadeps/index.html @@ -29,4 +29,4 @@ As = [rand(1000) for _ in 1:1000] Bs = copy.(As) tree_reduce!(+, As) -@assert isapprox(As[1], reduce((x,y)->x .+ y, Bs))

+@assert isapprox(As[1], reduce((x,y)->x .+ y, Bs))

In the above implementation of tree_reduce! (which is designed to perform an elementwise reduction across a vector of arrays), we have a tree reduction operation where pairs of arrays are reduced, starting with neighboring pairs, and then reducing pairs of reduction results, etc. until the final result is in As[1]. We can see that the application of Dagger to this algorithm is simple - only the single Base.mapreducedim! call is passed to Dagger - yet due to the data dependencies and the algorithm's structure, there should be plenty of parallelism to be exploited across each of the parallel reductions at each "level" of the reduction tree. Specifically, any two Dagger.@spawn calls which access completely different pairs of arrays can execute in parallel, while any call which has an In on an array will wait for any previous call which has an InOut on that same array.

Additionally, we can notice a powerful feature of this model - if the Dagger.@spawn macro is removed, the code still remains correct, but simply runs sequentially. This means that the structure of the program doesn't have to change in order to use Dagger for parallelization, which can make applying Dagger to existing algorithms quite effortless.
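
To make the dependency structure concrete, here is a minimal sketch of the same pairwise pattern, hand-unrolled for just four arrays (a hypothetical simplification of tree_reduce!); each Base.mapreducedim! call accumulates its In argument into its InOut argument elementwise:

```julia
using Dagger

As = [rand(1000) for _ in 1:4]
Dagger.spawn_datadeps() do
    # Level 1: these two tasks touch disjoint pairs of arrays, so they may run in parallel.
    Dagger.@spawn Base.mapreducedim!(identity, +, Dagger.InOut(As[1]), Dagger.In(As[2]))
    Dagger.@spawn Base.mapreducedim!(identity, +, Dagger.InOut(As[3]), Dagger.In(As[4]))
    # Level 2: reads As[3] and writes As[1], so it waits on both tasks above.
    Dagger.@spawn Base.mapreducedim!(identity, +, Dagger.InOut(As[1]), Dagger.In(As[3]))
end
# As[1] now holds the elementwise sum of all four original arrays.
```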

diff --git a/dev/dynamic/index.html b/dev/dynamic/index.html index 45516bd41..92234e7d1 100644 --- a/dev/dynamic/index.html +++ b/dev/dynamic/index.html @@ -9,4 +9,4 @@ y + 1 end return fetch(h, id) -end

+end

Alternatively, Base.wait can be used when one does not wish to retrieve the returned value of the thunk.
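
For example, a small sketch of waiting on a child thunk from inside a running task; the add_thunk! and handle-based wait signatures below are assumed to match the example above and the corresponding docstrings:

```julia
using Dagger

t = Dagger.spawn() do
    h = Dagger.sch_handle()
    # Assumption: add_thunk! accepts a do-block and returns the new thunk's ID.
    id = Dagger.Sch.add_thunk!(h) do
        1 + 1
    end
    wait(h, id)   # block until the child thunk finishes, without retrieving its value
    return :done
end
fetch(t)
```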

Users with needs not covered by the built-in functions should use the Dagger.exec! function to pass a user-defined function, closure, or callable struct to the scheduler, along with a payload which will be provided to that function:

Dagger.exec!

Note that all functions called by Dagger.exec! take the scheduler's internal lock, so it's safe to manipulate the internal ComputeState object within the user-provided function.

diff --git a/dev/index.html b/dev/index.html index c5dd6e0e5..e5a5d6191 100644 --- a/dev/index.html +++ b/dev/index.html @@ -83,4 +83,4 @@ Dagger.@spawn copyto!(C, X) -# C = [4,5,6,7,1,2,3,9,8]

+# C = [4,5,6,7,1,2,3,9,8]

In contrast to the previous example, the tasks here are executed without argument annotations. As a result, the copyto! task may execute before the sort! task, leading to unexpected results in the output array C.
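
As a hypothetical fix, the same two tasks can be given argument annotations inside spawn_datadeps (variable names follow the snippet above), which forces copyto! to wait for sort!:

```julia
Dagger.spawn_datadeps() do
    Dagger.@spawn sort!(Dagger.InOut(X))                  # mutates X
    Dagger.@spawn copyto!(Dagger.InOut(C), Dagger.In(X))  # reads X, so it runs after sort!
end
```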

diff --git a/dev/logging-advanced/index.html b/dev/logging-advanced/index.html index af5090e4d..b2b4b3037 100644 --- a/dev/logging-advanced/index.html +++ b/dev/logging-advanced/index.html @@ -9,4 +9,4 @@ t1 = Dagger.@spawn 3*4 fetch(Dagger.@spawn 1+t1) log = Dagger.fetch_logs!(ctx)[1] # Get the logs for worker 1 -@show log[:bytes]
+@show log[:bytes]
Note

TimespanLogging.get_logs! clears out the event logs, so that old events don't mix with new ones from future DAGs.

You'll then see that some number of bytes are allocated and then freed during the process of executing and completing those tasks.

There are a variety of other consumers built into TimespanLogging and Dagger, under the TimespanLogging.Events and Dagger.Events modules, respectively; see Dagger Types and TimespanLogging Types for details.

The MultiEventLog also has a mechanism to call a set of functions, called "aggregators", after all consumers have been executed; these aggregators are passed the full set of log streams as a Dict{Symbol,Vector{Any}}. The only aggregator currently shipped with TimespanLogging directly is the LogWindow, and DaggerWebDash.jl has the TableStorage which integrates with it; see DaggerWebDash Types for details.
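
As a rough sketch of wiring up a MultiEventLog by hand (the specific consumer constructors and the eager_context helper below are assumptions based on the streams mentioned in this section; check the TimespanLogging.Events and Dagger.Events modules for the full set):

```julia
using Dagger, TimespanLogging

ml = TimespanLogging.MultiEventLog()
ml[:core]  = TimespanLogging.Events.CoreMetrics()   # basic per-event metadata (assumed name)
ml[:bytes] = Dagger.Events.BytesAllocd()            # allocation tracking, as in log[:bytes] above (assumed name)
ctx = Dagger.Sch.eager_context()                    # assumption: the Context used by Dagger.@spawn
ctx.log_sink = ml
```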

diff --git a/dev/logging-visualization/index.html b/dev/logging-visualization/index.html index de43e0d77..1c3ca1a58 100644 --- a/dev/logging-visualization/index.html +++ b/dev/logging-visualization/index.html @@ -48,4 +48,4 @@ ml.aggregators[:d3r] = d3r ctx.log_sink = ml -# ... use `ctx`

+# ... use `ctx`

Once the server has started, you can browse to http://localhost:8080/ (if running on your local machine) to view the plots in real time. The dashboard also provides controls at the top of the page to adjust the drawing speed, to enable or disable reading updates from the server (disabling freezes the display at the current instant), and to select which worker to look at. If the connection to the server is lost for any reason, the dashboard will attempt to reconnect at 5-second intervals. The dashboard can usually survive restarts of the server perfectly well, although refreshing the page is usually a good idea. Informational messages are also logged to the browser console for debugging.

diff --git a/dev/logging/index.html b/dev/logging/index.html index 5fcd3c2ac..4a5af365b 100644 --- a/dev/logging/index.html +++ b/dev/logging/index.html @@ -1,2 +1,2 @@ -Logging: Basics · Dagger.jl

+Logging: Basics · Dagger.jl

Logging and Graphing

Dagger's scheduler keeps track of the important and potentially expensive actions it performs, such as moving data between workers or executing thunks, and records how much time and how many memory allocations these operations consume, among other things. It does this through the TimespanLogging.jl package (which used to be directly integrated into Dagger). Saving this information somewhere accessible is disabled by default, but it's quite easy to turn on, through one of two mechanisms.

The first is Dagger.enable_logging!, which provides an easy-to-use interface to both enable and configure logging. The defaults are usually sufficient for most users, but can be tweaked with keyword arguments.

The second is to set a "log sink" on the Dagger Context being used, via ctx.log_sink. These log sinks drive how Dagger's logging behaves, and are configurable by the user without the need to modify any of Dagger's internal code.

A variety of log sinks are built into TimespanLogging; the NoOpLog is the default log sink when one isn't explicitly specified, and disables logging entirely (to minimize overhead). There are currently two other log sinks of interest; the first and newer of the two is the MultiEventLog, which generates multiple independent log streams, one per "consumer" (details in the next section). This is the log sink that enable_logging! uses, as it's easily the most flexible. The second and older sink is the LocalEventLog, which is explained later in this document. Most users should use the MultiEventLog (ideally via enable_logging!) since it's far more flexible and extensible, and is generally more performant.

Log sinks are explained in detail in Logging: Advanced; however, if using enable_logging!, everything is already configured for you. Then, all you need to do is call Dagger.fetch_logs!() to get the logs for all workers as a Dict. A variety of tools can operate on these logs, including visualization through show_logs and render_logs.
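
A minimal end-to-end sketch of that workflow, using the default enable_logging! configuration:

```julia
using Dagger

Dagger.enable_logging!()              # enable logging with the default configuration
fetch(Dagger.@spawn sum(rand(100)))   # run something worth logging
logs = Dagger.fetch_logs!()           # Dict mapping each worker to its captured log streams
# `logs` can then be handed to visualization tools such as show_logs and render_logs.
```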

diff --git a/dev/processors/index.html b/dev/processors/index.html index 2a783223d..9f74fa7a6 100644 --- a/dev/processors/index.html +++ b/dev/processors/index.html @@ -33,4 +33,4 @@ @show fetch(job) |> unique # and cleanup after ourselves... -workers() |> rmprocs +workers() |> rmprocs diff --git a/dev/propagation/index.html b/dev/propagation/index.html index 9673cd76d..c5d1330d9 100644 --- a/dev/propagation/index.html +++ b/dev/propagation/index.html @@ -15,4 +15,4 @@ # Or, if `scope` might not have been propagated as an option, we can give # it a default value: fetch(@async @assert Dagger.get_options(:scope, AnyScope()) == ProcessScope(2)) -end

+end

This is a very powerful concept: with a single call to with_options, we can apply any set of options to any nested set of operations. This is great for isolating large workloads to different workers or processors, defining global checkpoint/restore behavior, and more.
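
For instance, a small sketch of pinning a whole group of tasks to worker 2 (assuming a worker process with ID 2 exists):

```julia
using Distributed, Dagger

# Every task spawned inside the block inherits the scope option and runs on worker 2.
Dagger.with_options(;scope=Dagger.ProcessScope(2)) do
    a = Dagger.@spawn myid()
    b = Dagger.@spawn sum(rand(10))
    @show fetch(a)   # should print 2
    fetch(b)
end
```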

diff --git a/dev/scheduler-internals/index.html b/dev/scheduler-internals/index.html index e886bc377..e90892831 100644 --- a/dev/scheduler-internals/index.html +++ b/dev/scheduler-internals/index.html @@ -1,2 +1,2 @@ -Scheduler Internals · Dagger.jl

+Scheduler Internals · Dagger.jl

Scheduler Internals

Dagger's scheduler can be found primarily in the Dagger.Sch module. It performs a variety of functions to support tasks and data, and as such is a complex system. This documentation attempts to shed light on how the scheduler works internally (from a somewhat high level), with the hope that it will help users and contributors understand how to improve the scheduler or fix any bugs that may arise from it.

Warn

Dagger's scheduler is evolving at a rapid pace, and is a complex mix of interacting parts. As such, this documentation may become out of date very quickly, and may not reflect the current state of the scheduler. Please feel free to file PRs to correct or improve this document, but also beware that the true functionality is defined in Dagger's source!

Core vs. Worker Schedulers

Dagger's scheduler is really two kinds of entities: the "core" scheduler, and "worker" schedulers:

The core scheduler runs on worker 1, thread 1, and is the entrypoint to tasks which have been submitted. The core scheduler manages all task dependencies, notifies calls to wait and fetch of task completion, and generally performs initial task placement. The core scheduler has cached information about each worker and their processors, and uses that information (together with metrics about previous tasks and other aspects of the Dagger runtime) to generate a near-optimal just-in-time task schedule.

The worker schedulers each run as a set of tasks across all workers and all processors, and handle data movement and task execution. Once the core scheduler has scheduled and launched a task, it arrives at the worker scheduler for handling. The worker scheduler will pass the task to a queue for the assigned processor, where it will wait until the processor has a sufficient amount of "occupancy" for the task. Once the processor is ready for the task, it will first fetch all of the task's arguments from other workers, and then it will execute the task, package the task's result into a Chunk, and pass that back to the core scheduler.

Core: Basics

The core scheduler contains a single internal instance of type ComputeState, which maintains (among many other things) all necessary state to represent the set of waiting, ready, and running tasks, cached task results, and maps of interdependencies between tasks. It uses Julia's task infrastructure to asynchronously send work requests to remote Julia processes, and uses a RemoteChannel as an inbound queue for completed work.

An outer loop drives the scheduler; it continues executing either indefinitely (barring any internal scheduler errors or Julia exiting), or until all tasks in the graph have completed executing and the final task in the graph is ready to be returned to the user. This outer loop continuously performs two main operations: the first is to launch the execution of nodes which have become "ready" to execute; the second is to "finish" nodes which have been completed.

Core: Initialization

At the very beginning of a scheduler's lifecycle, a ComputeState object is allocated, workers are asynchronously initialized, and the outer loop is started. Additionally, the scheduler is passed one or more tasks to start scheduling, and so it will also fill out the ComputeState with the computed sets of dependencies between tasks, initially placing all tasks in the "waiting" state. If any of the tasks are found to only have non-task input arguments, then they are considered ready to execute and moved from the "waiting" state to "ready".

Core: Outer Loop

At each outer loop iteration, all tasks in the "ready" state will be scheduled, moved into the "running" state, and asynchronously sent to the workers for execution (called "firing"). Once all tasks are either waiting or running, the scheduler may sleep until further actions need to be performed.

When fired tasks have completed executing, an entry will exist in the inbound queue signalling the task's result and other metadata. At this point, the most recently-queued task is removed from the queue, "finished", and placed in the "finished" state. Finishing usually unlocks downstream tasks from the waiting state and allows them to transition to the ready state.

Core: Task Scheduling

Once one or more tasks are ready to be scheduled, the scheduler will begin assigning them to the processors within each available worker. This is a sequential operation consisting of:

  • Selecting candidate processors based on the task's combined scope
  • Calculating the cost to move needed data to each candidate processor
  • Adding a "wait time" cost proportional to the estimated run time for all the tasks currently executing on each candidate processor
  • Selecting the least costly candidate processor as the executor for this task

After these operations have been performed for each task, the tasks will be fired off to their appropriate worker for handling.

Worker: Task Execution

Once a worker receives one or more tasks to be executed, the tasks are immediately enqueued into the appropriate processor's queue, and the processors are notified that work is available to be executed. The processors will asynchronously look at their queues and pick the task with the lowest occupancy first; a task with zero occupancy will always be executed immediately, but most tasks have non-zero occupancy, and so will be executed in order of increasing occupancy (effectively prioritizing asynchronous tasks like I/O).

Before a task begins execution, the processor will collect the task's arguments from other workers as needed, and convert them as necessary so that the task executes correctly according to the processor's semantics. This operation is called a "move".

Once a task's arguments have been moved, the task's function will be called with the arguments, and assuming the task doesn't throw an error, the result will be wrapped in a Chunk object. This Chunk will then be sent back to the core scheduler along with information about which task generated it. If the task does throw an error, then the error is instead propagated to the core scheduler, along with a flag indicating that the task failed.

Worker: Workload Balancing

In general, Dagger's core scheduler tries to balance workloads as much as possible across all the available processors, but it can fail to do so effectively when either its cached knowledge of each worker's status is outdated, or when its estimates about the task's behavior are inaccurate. To minimize the possibility of workload imbalance, the worker schedulers' processors will attempt to steal tasks from each other when they are under-occupied. Tasks will only be stolen if the task's scope is compatible with the processor attempting the steal, so tasks with wider scopes have better balancing potential.

Core: Finishing

Finishing a task which has completed executing is generally a simple set of operations:

  • The task's result is registered in the ComputeState for any tasks or user code which will need it
  • Any unneeded data is cleared from the scheduler (such as preserved Chunk arguments)
  • Downstream dependencies will be moved from "waiting" to "ready" if this task was the last upstream dependency to them

Core: Shutdown

If the core scheduler needs to shut down, due to an error or Julia exiting, then all workers will be shut down, and the scheduler will close any open channels. If the shutdown was due to an error, then the error will be printed or thrown back to the caller.

diff --git a/dev/scopes/index.html b/dev/scopes/index.html index 3b74aa394..2ee2c3cfe 100644 --- a/dev/scopes/index.html +++ b/dev/scopes/index.html @@ -54,4 +54,4 @@ d2 = Dagger.@spawn generate(ps2) # Run on process 2 d3 = Dagger.@spawn generate(ps3) # Run on process 3 -res = Dagger.@spawn d2 * d3 # An error!

+res = Dagger.@spawn d2 * d3 # An error!

Moral of the story: only use scopes when you know you really need them, and if you aren't careful to arrange everything just right, be prepared for Dagger to refuse to schedule your tasks! Scopes should only be used to ensure correctness of your programs, and are not intended to be used to optimize the schedule that Dagger uses for your tasks, since restricting the scope of execution for tasks will necessarily reduce the optimizations that Dagger's scheduler can perform.
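
When a scope really is needed, it can be attached at spawn time; a minimal sketch, assuming a second worker process has been added:

```julia
using Distributed
addprocs(1)
@everywhere using Dagger

# Pin this one task to worker 2; Dagger will refuse to schedule it anywhere else.
t = Dagger.@spawn scope=Dagger.ProcessScope(2) myid()
@show fetch(t)   # 2
```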

diff --git a/dev/task-queues/index.html b/dev/task-queues/index.html index 16bb771e1..e9c67081a 100644 --- a/dev/task-queues/index.html +++ b/dev/task-queues/index.html @@ -23,4 +23,4 @@ Dagger.@spawn vcopy!(B2, A) end Dagger.@spawn vadd!(C, B1, B2) -end)

+end)

Conveniently, Dagger's task queues can be nested to get the expected behavior; the above example will submit the two vcopy! tasks as a group (and they can execute concurrently), while still ensuring that those two tasks finish before the vadd! task executes.
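
A minimal sketch of that nesting; vcopy! and vadd! are stand-ins for the in-place kernels from the example above:

```julia
using Dagger

vcopy!(dst, src) = (dst .= src; nothing)
vadd!(dst, a, b) = (dst .= a .+ b; nothing)

A, B1, B2, C = rand(100), zeros(100), zeros(100), zeros(100)

Dagger.spawn_sequential() do
    Dagger.spawn_bulk() do
        # Submitted as one group; these two tasks may run concurrently.
        Dagger.@spawn vcopy!(B1, A)
        Dagger.@spawn vcopy!(B2, A)
    end
    # Runs only after the whole group above has finished.
    Dagger.@spawn vadd!(C, B1, B2)
end
```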

Warn

Task queues do not propagate to nested tasks; if a Dagger task launches another task internally, the child task doesn't inherit the task queue that the parent task was enqueued in.

diff --git a/dev/task-spawning/index.html b/dev/task-spawning/index.html index 6f9d9d0df..b415e3772 100644 --- a/dev/task-spawning/index.html +++ b/dev/task-spawning/index.html @@ -51,4 +51,4 @@ Dagger.@spawn single=1 1+2 Dagger.spawn(+, Dagger.Options(;single=1), 1, 2) -delayed(+; single=1)(1, 2) +delayed(+; single=1)(1, 2) diff --git a/dev/use-cases/parallel-nested-loops/index.html b/dev/use-cases/parallel-nested-loops/index.html index c61add6b0..c7625575e 100644 --- a/dev/use-cases/parallel-nested-loops/index.html +++ b/dev/use-cases/parallel-nested-loops/index.html @@ -32,4 +32,4 @@ res.z = fetch.(res.z) res.σ = fetch.(res.σ) res -end

+end

In this code we have job interdependence. Firstly, we calculate the standard deviation σ, and then we use that value in the function f. Since Dagger.@spawn yields an EagerThunk rather than actual values, we need to use the fetch function to obtain those values. In this example, the value fetching is performed once all computations are completed (note that @sync preceding the loop forces the loop to wait for all jobs to complete). Also, note that unlike in the previous example, we do not need to implement locking, as we are just pushing the EagerThunk results of Dagger.@spawn serially into the DataFrame (which is fast, since Dagger.@spawn doesn't block).
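
A stripped-down sketch of that pattern, with a toy computation standing in for f and the DataFrame reduced to two columns:

```julia
using Dagger, DataFrames

res = DataFrame(i=Int[], z=Any[])
@sync for i in 1:10
    z = Dagger.@spawn sum(rand(i))   # returns an EagerThunk immediately; no blocking
    push!(res, (i=i, z=z))           # no locking needed: push! happens serially in this loop
end
res.z = fetch.(res.z)                # replace the thunks with their computed values
res
```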

The above use case scenario has been tested by running julia -t 8 (or with JULIA_NUM_THREADS=8 as an environment variable). The Threads.@threads code takes 1.8 seconds to run, while the Dagger code, which is also one line shorter, runs in around 0.9 seconds, resulting in a 2x speedup.