Skip to content

Commit b448277

Browse files
committed
Ok all working again
1 parent 5505668 commit b448277

File tree

8 files changed

+104
-83
lines changed

8 files changed

+104
-83
lines changed

crates/neon/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ features = ["sync"]
4444
optional = true
4545

4646
[features]
47-
default = ["napi-8", "futures"]
47+
default = ["napi-8"]
4848

4949
# Enable extracting values by serializing to JSON
5050
serde = ["dep:serde", "dep:serde_json"]

crates/neon/src/async_local/mod.rs

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -17,20 +17,31 @@ use crate::sys;
1717
pub fn spawn_async_local<'a>(
1818
cx: &mut impl Context<'a>,
1919
future: impl Future<Output = ()> + 'static,
20-
) -> Result<(), ()> {
20+
) {
2121
// Add a future to the future pool to be executed
2222
// whenever the Nodejs event loop is free to do so
23-
LocalRuntime::queue_future(future).unwrap();
23+
LocalRuntime::queue_future(future);
2424

2525
// If there are tasks in flight then the executor
2626
// is already running and should be reused
2727
if LocalRuntime::futures_count() > 1 {
28-
return Ok(());
28+
return;
2929
}
3030

31-
// The futures executor runs on another thread and will
32-
// use a threadsafe function to call schedule work
33-
// on the JavaScript thread
31+
// The futures executor runs on the main thread thread but
32+
// the waker runs on another thread.
33+
//
34+
// The main thread executor will run the contained futures
35+
// and as soon as they stall (e.g. waiting for a channel, timer, etc),
36+
// the executor will immediately yield back to the JavaScript event loop.
37+
//
38+
// This "parks" the executer, which normally means the thread
39+
// is block - however we cannot do that here so instead, there
40+
// is a sacrificial "waker" thread who's only job is to sleep/wake and
41+
// signal to Nodejs that futures need to be run.
42+
//
43+
// The waker thread notifies the main thread of pending work by
44+
// running the futures executor within a threadsafe function
3445
let env_raw = cx.env().to_raw();
3546

3647
LocalWaker::send(WakerEvent::Init(unsafe {
@@ -44,6 +55,4 @@ pub fn spawn_async_local<'a>(
4455
}
4556
})
4657
}));
47-
48-
Ok(())
4958
}

crates/neon/src/async_local/runtime.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ use super::executor::LocalPool;
55
use super::executor::LocalSpawner;
66
use super::executor::ThreadNotifyRef;
77
use futures::task::LocalSpawnExt;
8-
use futures::task::SpawnError;
98
use once_cell::unsync::Lazy;
109

1110
thread_local! {
@@ -21,12 +20,12 @@ impl LocalRuntime {
2120
Self::count()
2221
}
2322

24-
pub fn queue_future(future: impl Future<Output = ()> + 'static) -> Result<(), SpawnError> {
23+
pub fn queue_future(future: impl Future<Output = ()> + 'static) {
2524
Self::increment();
2625
SPAWNER.with(move |ls| ls.spawn_local(async move {
2726
future.await;
2827
Self::decrement();
29-
}))
28+
})).expect("Unable to spawn future on local pool");
3029
}
3130

3231
pub fn run_until_stalled(thread_notify: ThreadNotifyRef) -> bool {

crates/neon/src/async_local/waker.rs

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -22,18 +22,20 @@ pub enum WakerEvent {
2222
}
2323

2424
/// The futures waker that coordinates with the futures executor to notify
25-
/// the main thread to pause and resume execution of futures.
25+
/// the main thread to resume execution of futures.
2626
///
2727
/// The waker is implemented as a dedicated system thread which is parked
28-
/// by the local futures executor while waiting for futures to resume work.
28+
/// by the local futures executor. Futures (like channel, timers) will
29+
/// call the wake() method Futures Waker trait.
2930
///
30-
/// Once woken up, the waker resumes execution of futures on the JavaScript
31-
/// thread by triggering a napi threadsafe function to poll the futures in
32-
/// the local pool until no more progress can be made before yielding back
33-
/// to the Nodejs event loop.
31+
/// This gives it some level of portability - for instance any utilities
32+
/// from the "async_std" crate will work however most things from Tokio
33+
/// won't work.
3434
///
35-
/// This allows for the execution of Rust futures to integrate with the
36-
/// Nodejs event loop without blocking either
35+
/// Once woken up, the waker resumes execution of futures on the JavaScript
36+
/// thread by triggering a napi threadsafe function which executes a callback
37+
/// that runs on the main JavaScript thread. This callback is used to poll
38+
/// the futures in the local pool.
3739
pub struct LocalWaker;
3840

3941
impl LocalWaker {
@@ -46,7 +48,6 @@ impl LocalWaker {
4648
fn start_waker_thread() -> Sender<WakerEvent> {
4749
let (tx, rx) = channel();
4850

49-
// Dedicated waker thread to use for waiting on pending futures
5051
thread::spawn(move || {
5152
let thread_notify = ThreadNotify::new();
5253
let mut handle = None::<WakerInit>;

crates/neon/src/context/mod.rs

Lines changed: 43 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -145,10 +145,10 @@ use std::{convert::Into, marker::PhantomData, panic::UnwindSafe};
145145

146146
pub use crate::types::buffer::lock::Lock;
147147

148-
// #[cfg(feature = "async_local")]
149-
// use crate::async_local::{root::RootGlobal, spawn_async_local};
150148
#[cfg(feature = "async_local")]
151149
use futures::Future;
150+
#[cfg(feature = "async_local")]
151+
use crate::handle::StaticHandle;
152152

153153
use crate::{
154154
event::TaskBuilder,
@@ -290,22 +290,22 @@ pub trait Context<'a>: ContextInternal<'a> {
290290
result
291291
}
292292

293+
/// Execute a future on the local JavaScript thread. This does not block JavaScript execution.
294+
///
295+
/// Note: Avoid doing heavy computation on the main thread. The intended use case for this is
296+
/// waiting on channel receivers for data coming from other threads, waiting on timers and
297+
/// handling async behaviors from JavaScript.
293298
#[cfg(feature = "async_local")]
294-
fn execute_async_local<F, Fut>(&mut self, f: F)
299+
fn spawn_local<F, Fut>(&mut self, f: F)
295300
where
296301
Fut: Future<Output = ()>,
297302
F: FnOnce(AsyncContext) -> Fut + 'static,
298303
{
299-
use futures::Future;
300-
301304
let env = self.env();
302-
303305
crate::async_local::spawn_async_local(self, async move {
304-
// let scope = unsafe { HandleScope::new(env.to_raw()) };
305306
let future = f(AsyncContext { env });
306307
future.await;
307-
// drop(scope);
308-
}).unwrap();
308+
});
309309
}
310310

311311
/// Executes a computation in a new memory management scope and computes a single result value that outlives the computation.
@@ -623,29 +623,31 @@ impl<'a> ModuleContext<'a> {
623623
F: Fn(AsyncFunctionContext) -> Fut + 'static + Copy,
624624
V: Value,
625625
{
626-
// let wrapper = JsFunction::new(self, move |mut cx| {
627-
// let mut args = vec![];
628-
629-
// while let Some(arg) = cx.argument_opt(args.len()) {
630-
// let arg = arg.as_value(&mut cx);
631-
// let arg = RootGlobal::new(&mut cx, arg);
632-
// args.push(arg);
633-
// }
634-
635-
// let (deferred, promise) = cx.promise();
636-
// cx.execute_async_local(move |mut cx| async move {
637-
// let acx = AsyncFunctionContext {
638-
// env: cx.env(),
639-
// arguments: args,
640-
// };
641-
// deferred.resolve(&mut cx, f(acx).await.unwrap());
642-
// ()
643-
// });
644-
645-
// Ok(promise)
646-
// })?;
647-
648-
// self.exports.clone().set(self, key, wrapper)?;
626+
use crate::handle::StaticHandle;
627+
628+
let wrapper = JsFunction::new(self, move |mut cx| {
629+
let mut args = vec![];
630+
631+
while let Some(arg) = cx.argument_opt(args.len()) {
632+
let arg = arg.as_value(&mut cx);
633+
let arg = StaticHandle::new(&mut cx, arg)?;
634+
args.push(arg);
635+
}
636+
637+
let (deferred, promise) = cx.promise();
638+
cx.spawn_local(move |mut cx| async move {
639+
let acx = AsyncFunctionContext {
640+
env: cx.env(),
641+
arguments: args,
642+
};
643+
deferred.resolve(&mut cx, f(acx).await.unwrap());
644+
()
645+
});
646+
647+
Ok(promise)
648+
})?;
649+
650+
self.exports.clone().set(self, key, wrapper)?;
649651
Ok(())
650652
}
651653

@@ -851,16 +853,17 @@ impl<'a> Context<'a> for FunctionContext<'a> {}
851853
#[cfg(feature = "async_local")]
852854
pub struct AsyncFunctionContext {
853855
env: Env,
854-
// arguments: Vec<RootGlobal>,
856+
arguments: Vec<StaticHandle<JsValue>>,
855857
}
856858

857859
#[cfg(feature = "async_local")]
858860
impl<'a> AsyncFunctionContext {
859861
pub fn argument<V: Value>(&mut self, i: usize) -> JsResult<'a, V> {
860-
// let arg = self.arguments.get(i).unwrap().clone();
861-
// let handle = arg.into_inner(self);
862-
// Ok(handle)
863-
todo!()
862+
let arg = self.arguments.get(i).unwrap().clone();
863+
let arg = arg.from_static(self)?;
864+
let value = unsafe { V::from_local(self.env(), arg.to_local()) };
865+
let handle = Handle::new_internal(value);
866+
Ok(handle)
864867
}
865868
}
866869

@@ -877,9 +880,9 @@ impl<'a> Context<'a> for AsyncFunctionContext {}
877880
#[cfg(feature = "async_local")]
878881
impl Drop for AsyncFunctionContext {
879882
fn drop(&mut self) {
880-
// while let Some(arg) = self.arguments.pop() {
881-
// arg.remove(self);
882-
// }
883+
while let Some(arg) = self.arguments.pop() {
884+
arg.drop(self).unwrap();
885+
}
883886
}
884887
}
885888

crates/neon/src/handle/mod.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -96,8 +96,11 @@ impl<'a, V: Value + 'a> Handle<'a, V> {
9696
}
9797
}
9898

99-
pub fn root_global(self, cx: &mut impl Context<'a>) -> NeonResult<RootGlobal<V>> {
100-
RootGlobal::new(cx, self)
99+
/// Detaches the value from the Nodejs garbage collector
100+
/// and manages the variable lifetime via reference counting.
101+
/// Useful when interacting with a value within async closures
102+
pub fn to_static(self, cx: &mut impl Context<'a>) -> NeonResult<StaticHandle<V>> {
103+
StaticHandle::new(cx, self)
101104
}
102105
}
103106

crates/neon/src/handle/root_value.rs

Lines changed: 24 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -12,32 +12,36 @@ use crate::result::JsResult;
1212
use crate::result::NeonResult;
1313
use crate::types::JsFunction;
1414
use crate::types::JsObject;
15+
use crate::types::JsSymbol;
16+
17+
// This creates a rooted object and stores javascript
18+
// values on it as a way to grant any JavaScript value
19+
// a static lifetime
1520

1621
thread_local! {
17-
// Symbol("__neon_cache")
1822
static NEON_CACHE: OnceCell<Root<JsObject>> = OnceCell::default();
1923
}
2024

2125
/// Reference counted JavaScript value with a static lifetime for use in async closures
22-
pub struct RootGlobal<T> {
26+
pub struct StaticHandle<T> {
2327
pub(crate) count: Rc<RefCell<u32>>,
24-
pub(crate) inner: Rc<String>,
28+
pub(crate) inner: Rc<Root<JsSymbol>>,
2529
_p: PhantomData<T>,
2630
}
2731

28-
impl<T: Value> RootGlobal<T> {
32+
impl<T: Value> StaticHandle<T> {
2933
pub(crate) fn new<'a>(
3034
cx: &mut impl Context<'a>,
3135
value: Handle<'a, T>,
32-
) -> NeonResult<RootGlobal<T>> {
36+
) -> NeonResult<StaticHandle<T>> {
3337
Ok(Self {
3438
count: Rc::new(RefCell::new(1)),
3539
inner: Rc::new(set_ref(cx, value)?),
3640
_p: Default::default(),
3741
})
3842
}
3943

40-
pub fn clone<'a>(&self) -> RootGlobal<T> {
44+
pub fn clone(&self) -> StaticHandle<T> {
4145
let mut count = self.count.borrow_mut();
4246
*count += 1;
4347
drop(count);
@@ -49,16 +53,16 @@ impl<T: Value> RootGlobal<T> {
4953
}
5054
}
5155

52-
pub fn into_inner<'a>(&self, cx: &mut impl Context<'a>) -> JsResult<'a, T> {
53-
get_ref(cx, &*self.inner)
56+
pub fn from_static<'a>(&self, cx: &mut impl Context<'a>) -> JsResult<'a, T> {
57+
get_ref(cx, &self.inner)
5458
}
5559

5660
pub fn drop<'a>(&self, cx: &mut impl Context<'a>) -> NeonResult<()> {
5761
let mut count = self.count.borrow_mut();
5862
*count -= 1;
5963

6064
if *count == 0 {
61-
delete_ref(cx, &*self.inner)?
65+
delete_ref(cx, &self.inner)?
6266
}
6367

6468
Ok(())
@@ -81,41 +85,43 @@ fn get_cache<'a>(cx: &mut impl Context<'a>) -> JsResult<'a, JsObject> {
8185
Ok(neon_cache.into_inner(cx))
8286
}
8387

84-
fn set_ref<'a, V: Value>(cx: &mut impl Context<'a>, value: Handle<'a, V>) -> NeonResult<String> {
88+
fn set_ref<'a, V: Value>(
89+
cx: &mut impl Context<'a>,
90+
value: Handle<'a, V>,
91+
) -> NeonResult<Root<JsSymbol>> {
8592
let neon_cache = get_cache(cx)?;
86-
// Is this safe?
87-
let key = format!("{:?}", value.to_local());
93+
let symbol = cx.symbol(format!("{:?}", value.to_local())).root(cx);
8894

8995
get_cache(cx)?
9096
.get::<JsFunction, _, _>(cx, "set")?
9197
.call_with(cx)
9298
.this(neon_cache)
93-
.arg(cx.string(&key))
99+
.arg(symbol.clone(cx).into_inner(cx))
94100
.arg(value)
95101
.exec(cx)?;
96102

97-
Ok(key)
103+
Ok(symbol)
98104
}
99105

100-
fn get_ref<'a, V: Value>(cx: &mut impl Context<'a>, key: &str) -> JsResult<'a, V> {
106+
fn get_ref<'a, V: Value>(cx: &mut impl Context<'a>, key: &Root<JsSymbol>) -> JsResult<'a, V> {
101107
let neon_cache = get_cache(cx)?;
102108

103109
get_cache(cx)?
104110
.get::<JsFunction, _, _>(cx, "get")?
105111
.call_with(cx)
106112
.this(neon_cache)
107-
.arg(cx.string(&key))
113+
.arg(key.clone(cx).into_inner(cx))
108114
.apply(cx)
109115
}
110116

111-
fn delete_ref<'a>(cx: &mut impl Context<'a>, key: &str) -> NeonResult<()> {
117+
fn delete_ref<'a>(cx: &mut impl Context<'a>, key: &Root<JsSymbol>) -> NeonResult<()> {
112118
let neon_cache = get_cache(cx)?;
113119

114120
get_cache(cx)?
115121
.get::<JsFunction, _, _>(cx, "delete")?
116122
.call_with(cx)
117123
.this(neon_cache)
118-
.arg(cx.string(&key))
124+
.arg(key.clone(cx).into_inner(cx))
119125
.exec(cx)?;
120126

121127
Ok(())

crates/neon/src/prelude.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,8 @@
33
#[doc(no_inline)]
44
pub use crate::{
55
context::{
6-
CallKind, ComputeContext, Context, ExecuteContext, FunctionContext, ModuleContext,
7-
TaskContext,
6+
AsyncContext, AsyncFunctionContext, CallKind, ComputeContext, Context, ExecuteContext,
7+
FunctionContext, ModuleContext, TaskContext,
88
},
99
handle::{Handle, Root},
1010
object::Object,

0 commit comments

Comments
 (0)