Skip to content

Switched to rent an array in Giraffe handler and use PooledList in AsyncVal #465

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 5 commits into
base: dev
Choose a base branch
from
Open
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Refactored AsyncVal to use PooledResizeArray under the hood
  • Loading branch information
xperiandri committed Mar 24, 2024
commit 255db1f8b9a3131fae412d6cd867e32bd3af6c6d
Original file line number Diff line number Diff line change
@@ -82,7 +82,7 @@ type GraphQLWebSocketMiddleware<'Root>
let receiveMessageViaSocket (cancellationToken : CancellationToken) (serializerOptions : JsonSerializerOptions) (socket : WebSocket) = taskResult {
let buffer = ArrayPool.Shared.Rent options.ReadBufferSize
try
let completeMessage = new PooledList<byte> ()
use completeMessage = new PooledResizeArray<byte> ()
let mutable segmentResponse : WebSocketReceiveResult = null
while (not cancellationToken.IsCancellationRequested)
&& socket |> isSocketOpen
6 changes: 2 additions & 4 deletions src/FSharp.Data.GraphQL.Server/Execution.fs
Original file line number Diff line number Diff line change
@@ -300,7 +300,7 @@ let deferResults path (res : ResolverResult<obj>) : IObservable<GQLDeferredRespo
| Error errs -> Observable.singleton <| DeferredErrors (null, errs, formattedPath)

/// Collect together an array of results using the appropriate execution strategy.
let collectFields (strategy : ExecutionStrategy) (rs : AsyncVal<ResolverResult<KeyValuePair<string, obj>>> []) : AsyncVal<ResolverResult<KeyValuePair<string, obj> []>> = asyncVal {
let collectFields (strategy : ExecutionStrategy) (rs : AsyncVal<ResolverResult<KeyValuePair<string, obj>>> seq) : AsyncVal<ResolverResult<KeyValuePair<string, obj> []>> = asyncVal {
let! collected =
match strategy with
| Parallel -> AsyncVal.collectParallel rs
@@ -354,8 +354,7 @@ let rec private direct (returnDef : OutputDef) (ctx : ResolveFieldContext) (path
| :? System.Collections.IEnumerable as enumerable ->
enumerable
|> Seq.cast<obj>
|> Seq.toArray
|> Array.mapi resolveItem
|> Seq.mapi resolveItem
|> collectFields Parallel
|> AsyncVal.map(ResolverResult.mapValue(fun items -> KeyValuePair(name, items |> Array.map(fun d -> d.Value) |> box)))
| _ -> raise <| GQLMessageException (ErrorMessages.expectedEnumerableValue ctx.ExecutionInfo.Identifier (value.GetType()))
@@ -543,7 +542,6 @@ and executeObjectFields (fields : ExecutionInfo list) (objName : string) (objDef
let! res =
fields
|> Seq.map executeField
|> Seq.toArray
|> collectFields Parallel
match res with
| Error errs -> return Error errs
Original file line number Diff line number Diff line change
@@ -23,6 +23,7 @@
</ItemGroup>

<ItemGroup>
<PackageReference Include="Collections.Pooled" />
<PackageReference Include="FSharp.Control.Reactive" />
<PackageReference Include="System.Reactive" />
</ItemGroup>
52 changes: 30 additions & 22 deletions src/FSharp.Data.GraphQL.Shared/AsyncVal.fs
Original file line number Diff line number Diff line change
@@ -130,49 +130,55 @@ module AsyncVal =
/// executed asynchronously, one by one with regard to their order in array.
/// Returned array maintain order of values.
/// If the array contains a Failure, then the entire array will not resolve
let collectSequential (values : AsyncVal<'T>[]) : AsyncVal<'T[]> =
if values.Length = 0 then Value [||]
elif values |> Array.exists isAsync then
let collectSequential (values : AsyncVal<'T> seq) : AsyncVal<'T[]> =
let values = new PooledResizeArray<_> (values)
let length = values.Count
if length = 0 then Value [||]
elif values.Exists isAsync then
Async (async {
let results = Array.zeroCreate values.Length
let exceptions = ResizeArray values.Length
for i = 0 to values.Length - 1 do
let results = Array.zeroCreate length
use exceptions = new PooledResizeArray<_> (length)
for i = 0 to length - 1 do
let v = values.[i]
match v with
| Value v -> results.[i] <- v
| Async a ->
let! r = a
results.[i] <- r
| Failure f -> exceptions.Add f
values.Dispose()
match exceptions.Count with
| 0 -> return results
| 1 -> return exceptions.First().Reraise ()
| _ -> return AggregateException exceptions |> raise
| _ -> return AggregateException (exceptions.AsReadOnly()) |> raise
})
else
let exceptions =
use values = values
use exceptions =
values
|> Array.choose (function
| Failure f -> Some f
| _ -> None)
match exceptions.Length with
| 0 -> Value (values |> Array.map (fun (Value v) -> v))
|> PooledResizeArray.vChoose (function
| Failure f -> ValueSome f
| _ -> ValueNone)
match exceptions.Count with
| 0 -> Value (values |> Seq.map (fun (Value v) -> v) |> Seq.toArray)
| 1 -> Failure (exceptions.First ())
| _ -> Failure (AggregateException exceptions)
| _ -> Failure (AggregateException (exceptions.AsReadOnly()))

/// Converts array of AsyncVals into AsyncVal with array results.
/// In case when are non-immediate values in provided array, they are
/// executed all in parallel, in unordered fashion. Order of values
/// inside returned array is maintained.
/// If the array contains a Failure, then the entire array will not resolve
let collectParallel (values : AsyncVal<'T>[]) : AsyncVal<'T[]> =
if values.Length = 0 then Value [||]
let collectParallel (values : AsyncVal<'T> seq) : AsyncVal<'T[]> =
use values = new PooledResizeArray<_> (values)
let length = values.Count
if length = 0 then Value [||]
else
let indexes = List<_> (0)
let continuations = List<_> (0)
let results = Array.zeroCreate values.Length
let exceptions = ResizeArray values.Length
for i = 0 to values.Length - 1 do
let indexes = new PooledResizeArray<_> (length)
let continuations = new PooledResizeArray<_> (length)
let results = Array.zeroCreate length
use exceptions = new PooledResizeArray<_> (length)
for i = 0 to length - 1 do
let value = values.[i]
match value with
| Value v -> results.[i] <- v
@@ -182,13 +188,15 @@ module AsyncVal =
| Failure f -> exceptions.Add f
match exceptions.Count with
| 1 -> AsyncVal.Failure (exceptions.First ())
| count when count > 1 -> AsyncVal.Failure (AggregateException exceptions)
| count when count > 1 -> AsyncVal.Failure (AggregateException (exceptions.AsReadOnly()))
| _ ->
if indexes.Count = 0 then Value (results)
else Async (async {
let! vals = continuations |> Async.Parallel
for i = 0 to indexes.Count - 1 do
results.[indexes.[i]] <- vals.[i]
indexes.Dispose()
continuations.Dispose()
return results
})

Original file line number Diff line number Diff line change
@@ -17,6 +17,9 @@
<AssemblyAttribute Include="System.Runtime.CompilerServices.InternalsVisibleToAttribute">
<_Parameter1>FSharp.Data.GraphQL.Server</_Parameter1>
</AssemblyAttribute>
<AssemblyAttribute Include="System.Runtime.CompilerServices.InternalsVisibleToAttribute">
<_Parameter1>FSharp.Data.GraphQL.Server.AspNetCore</_Parameter1>
</AssemblyAttribute>
<AssemblyAttribute Include="System.Runtime.CompilerServices.InternalsVisibleToAttribute">
<_Parameter1>FSharp.Data.GraphQL.Client</_Parameter1>
</AssemblyAttribute>
@@ -29,6 +32,7 @@
</ItemGroup>

<ItemGroup>
<PackageReference Include="Collections.Pooled" />
<PackageReference Include="FParsec" />
<PackageReference Include="FSharp.SystemTextJson" />
<PackageReference Include="FsToolkit.ErrorHandling" />
@@ -44,6 +48,7 @@
<Compile Include="Helpers\Extensions.fs" />
<Compile Include="Helpers\Reflection.fs" />
<Compile Include="Helpers\MemoryCache.fs" />
<Compile Include="Helpers\PooledResizeArray.fs" />
<Compile Include="Errors.fs" />
<Compile Include="Exception.fs" />
<Compile Include="ValidationTypes.fs" />
36 changes: 36 additions & 0 deletions src/FSharp.Data.GraphQL.Shared/Helpers/PooledResizeArray.fs
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
namespace FSharp.Data.GraphQL

open Collections.Pooled

type internal PooledResizeArray<'T> = PooledList<'T>

module internal PooledResizeArray =

let ofSeqWithCapacity capacity items =
let list = new PooledResizeArray<'T> (capacity = capacity)
list.AddRange (collection = items)
list

let exists f (list : PooledResizeArray<'T>) = list.Exists (f)

let length (list : PooledResizeArray<'T>) = list.Count

let vChoose f (list : PooledResizeArray<'T>) =
let res = new PooledResizeArray<_> (list.Count)

for i = 0 to length list - 1 do
match f list[i] with
| ValueNone -> ()
| ValueSome b -> res.Add (b)

res

let choose f (list : PooledResizeArray<'T>) =
let res = new PooledResizeArray<_> (list.Count)

for i = 0 to length list - 1 do
match f list[i] with
| None -> ()
| Some b -> res.Add (b)

res