From e359a09bb8e8393918a777bb9c6298dc0089c9c2 Mon Sep 17 00:00:00 2001 From: Xavier Montillet Date: Fri, 13 Dec 2024 16:24:53 +0100 Subject: [PATCH] Add `add_nonblocking`, `capacity` and `is_full` for streams --- lib_eio/stream.ml | 35 +++++++++++++++++++++++++++++++++++ lib_eio/stream.mli | 23 +++++++++++++++++++++++ tests/stream.md | 27 +++++++++++++++++++++++++++ 3 files changed, 85 insertions(+) diff --git a/lib_eio/stream.ml b/lib_eio/stream.ml index 83d6687a3..2f166a477 100644 --- a/lib_eio/stream.ml +++ b/lib_eio/stream.ml @@ -1,3 +1,5 @@ +type drop_priority = Newest | Oldest + module Locking = struct type 'a t = { mutex : Mutex.t; @@ -64,6 +66,26 @@ module Locking = struct ) ) + let add_nonblocking ~drop_priority t item = + Mutex.lock t.mutex; + match Waiters.wake_one t.readers item with + | `Ok -> Mutex.unlock t.mutex; None + | `Queue_empty -> + (* No-one is waiting for an item. Queue it. *) + if Queue.length t.items < t.capacity then ( + Queue.add item t.items; + Mutex.unlock t.mutex; + None + ) else ( + match drop_priority with + | Newest -> Mutex.unlock t.mutex; Some item + | Oldest -> + let dropped_item = Queue.take t.items in + Queue.add item t.items; + Mutex.unlock t.mutex; + Some dropped_item + ) + let take t = Mutex.lock t.mutex; match Queue.take_opt t.items with @@ -101,6 +123,8 @@ module Locking = struct let len = Queue.length t.items in Mutex.unlock t.mutex; len + + let capacity t = t.capacity let dump f t = Fmt.pf f "" (length t) t.capacity @@ -123,6 +147,11 @@ let take = function | Sync x -> Sync.take x |> Result.get_ok (* todo: allow closing streams *) | Locking x -> Locking.take x +let add_nonblocking ~drop_priority t v = + match t with + | Sync _ -> Some v + | Locking x -> Locking.add_nonblocking ~drop_priority x v + let take_nonblocking = function | Locking x -> Locking.take_nonblocking x | Sync x -> @@ -134,8 +163,14 @@ let length = function | Sync _ -> 0 | Locking x -> Locking.length x +let capacity = function + | Sync _ -> 0 + | Locking x -> Locking.capacity x + let is_empty t = (length t = 0) +let is_full t = (length t = capacity t) + let dump f = function | Sync x -> Sync.dump f x | Locking x -> Locking.dump f x diff --git a/lib_eio/stream.mli b/lib_eio/stream.mli index 6554cac1a..85b0d1e8f 100644 --- a/lib_eio/stream.mli +++ b/lib_eio/stream.mli @@ -33,6 +33,23 @@ val take : 'a t -> 'a If no items are available, it waits until one becomes available. *) +type drop_priority = Newest | Oldest + +val add_nonblocking : drop_priority: drop_priority -> 'a t -> 'a -> 'a option +(** [add_nonblocking ~drop_priority t item] is like [(add t item); None] except that + it returns [Some dropped_item] if the stream is full rather than waiting, where + [dropped_item] is [item] if [drop_priority = Newest], and the first element of the + stream if [drop_priority = Oldest]. + + In other words, if the stream is full then: + - [add_nonblocking ~drop_priority:Newest t item] is like [Some item]; and + - [add_nonblocking ~drop_priority:Oldest t item] is like + [let dropped_item = take t in add t item; Some dropped_item] + except that no other stream operation can happen (even in other threads) + between the [take] and the [add]. + + On streams of capacity [0], this always returns [Some item], even if a reader is waiting. *) + val take_nonblocking : 'a t -> 'a option (** [take_nonblocking t] is like [Some (take t)] except that it returns [None] if the stream is empty rather than waiting. @@ -43,8 +60,14 @@ val take_nonblocking : 'a t -> 'a option val length : 'a t -> int (** [length t] returns the number of items currently in [t]. *) +val capacity : 'a t -> int +(** [capacity t] returns the number of items [t] can hold without blocking writers. *) + val is_empty : 'a t -> bool (** [is_empty t] is [length t = 0]. *) +val is_full : 'a t -> bool +(** [is_full t] is [length t = capacity t]. *) + val dump : 'a t Fmt.t (** For debugging. *) diff --git a/tests/stream.md b/tests/stream.md index c5a035e3b..7459fb32b 100644 --- a/tests/stream.md +++ b/tests/stream.md @@ -20,6 +20,16 @@ let add t v = S.add t v; traceln "Added %d to stream" v +let add_nonblocking ~drop_priority t v = + traceln "Adding %d to stream" v; + match S.add_nonblocking ~drop_priority t v with + | None -> traceln "Added %d to stream" v + | Some d -> + match drop_priority, S.capacity t with + | Newest, _ | _, 0 -> assert (d = v); traceln "Dropped %i instead of adding it to stream" v + | Oldest, _ -> traceln "Dropped %i from stream and added %i to stream" d v + + let take t = traceln "Reading from stream"; traceln "Got %d from stream" (S.take t) @@ -320,6 +330,23 @@ Cancelling writing to a stream: - : unit = () ``` +Non-blocking add: + +```ocaml +# run @@ fun () -> + let t = S.create 1 in + add t 0; + add_nonblocking ~drop_priority:Newest t 1; + add_nonblocking ~drop_priority:Oldest t 2;; ++Adding 0 to stream ++Added 0 to stream ++Adding 1 to stream ++Dropped 1 instead of adding it to stream ++Adding 2 to stream ++Dropped 0 from stream and added 2 to stream +- : unit = () +``` + Non-blocking take: ```ocaml