22
22
get_options ,
23
23
set_clipboard_data_types ,
24
24
)
25
+ from .typing_compat import WindowType
25
26
from .utils import log_error
26
27
27
28
READ_RESPONSE_CHUNK_SIZE = 4096
@@ -200,7 +201,7 @@ def encode_mime(x: str) -> str:
200
201
201
202
202
203
def decode_metadata_value (k : str , x : str ) -> str :
203
- if k == 'mime' :
204
+ if k in ( 'mime' , 'name' , 'pw' ) :
204
205
import base64
205
206
x = base64 .standard_b64decode (x ).decode ('utf-8' )
206
207
return x
@@ -211,6 +212,8 @@ class ReadRequest(NamedTuple):
211
212
mime_types : tuple [str , ...] = ('text/plain' ,)
212
213
id : str = ''
213
214
protocol_type : ProtocolType = ProtocolType .osc_52
215
+ human_name : str = ''
216
+ password : str = ''
214
217
215
218
def encode_response (self , status : str = 'DATA' , mime : str = '' , payload : bytes | memoryview = b'' ) -> bytes :
216
219
ans = f'{ self .protocol_type .value } ;type=read:status={ status } '
@@ -242,9 +245,11 @@ class WriteRequest:
242
245
243
246
def __init__ (
244
247
self , is_primary_selection : bool = False , protocol_type : ProtocolType = ProtocolType .osc_52 , id : str = '' ,
245
- rollover_size : int = 16 * 1024 * 1024 , max_size : int = - 1 ,
248
+ rollover_size : int = 16 * 1024 * 1024 , max_size : int = - 1 , human_name : str = '' , password : str = '' ,
246
249
) -> None :
247
250
self .decoder = StreamingBase64Decoder ()
251
+ self .human_name = human_name
252
+ self .password = password
248
253
self .id = id
249
254
self .is_primary_selection = is_primary_selection
250
255
self .protocol_type = protocol_type
@@ -255,6 +260,8 @@ def __init__(
255
260
self .max_size = (get_options ().clipboard_max_size * 1024 * 1024 ) if max_size < 0 else max_size
256
261
self .aliases : dict [str , str ] = {}
257
262
self .committed = False
263
+ self .permission_pending = True
264
+ self .commit_pending = False
258
265
259
266
def encode_response (self , status : str = 'OK' ) -> bytes :
260
267
ans = f'{ self .protocol_type .value } ;type=write:status={ status } '
@@ -266,7 +273,11 @@ def encode_response(self, status: str = 'OK') -> bytes:
266
273
def commit (self ) -> None :
267
274
if self .committed :
268
275
return
276
+ if self .permission_pending :
277
+ self .commit_pending = True
278
+ return
269
279
self .committed = True
280
+ self .commit_pending = False
270
281
cp = get_boss ().primary_selection if self .is_primary_selection else get_boss ().clipboard
271
282
if cp .enabled :
272
283
for alias , src in self .aliases .items ():
@@ -316,13 +327,21 @@ def data_for(self, mime: str = 'text/plain', offset: int = 0, size: int = -1) ->
316
327
return self .tempfile .read (start + offset , size )
317
328
318
329
330
+ class GrantedPermission :
331
+
332
+ def __init__ (self , read : bool = False , write : bool = False ):
333
+ self .read , self .write = read , write
334
+ self .write_ban = self .read_ban = False
335
+
336
+
319
337
class ClipboardRequestManager :
320
338
321
339
def __init__ (self , window_id : int ) -> None :
322
340
self .window_id = window_id
323
341
self .currently_asking_permission_for : ReadRequest | None = None
324
342
self .in_flight_write_request : WriteRequest | None = None
325
343
self .osc52_in_flight_write_requests : dict [ClipboardType , WriteRequest ] = {}
344
+ self .granted_passwords : dict [str , GrantedPermission ] = {}
326
345
327
346
def parse_osc_5522 (self , data : memoryview ) -> None :
328
347
import base64
@@ -349,13 +368,15 @@ def parse_osc_5522(self, data: memoryview) -> None:
349
368
rr = ReadRequest (
350
369
is_primary_selection = m .get ('loc' , '' ) == 'primary' ,
351
370
mime_types = tuple (payload .decode ('utf-8' ).split ()),
352
- protocol_type = ProtocolType .osc_5522 , id = sanitize_id (m .get ('id' , '' ))
371
+ protocol_type = ProtocolType .osc_5522 , id = sanitize_id (m .get ('id' , '' )),
372
+ human_name = m .get ('name' , '' ), password = m .get ('pw' , '' ),
353
373
)
354
374
self .handle_read_request (rr )
355
375
elif typ == 'write' :
356
376
self .in_flight_write_request = WriteRequest (
357
377
is_primary_selection = m .get ('loc' , '' ) == 'primary' ,
358
- protocol_type = ProtocolType .osc_5522 , id = sanitize_id (m .get ('id' , '' ))
378
+ protocol_type = ProtocolType .osc_5522 , id = sanitize_id (m .get ('id' , '' )),
379
+ human_name = m .get ('name' , '' ), password = m .get ('pw' , '' ),
359
380
)
360
381
self .handle_write_request (self .in_flight_write_request )
361
382
elif typ == 'walias' :
@@ -385,11 +406,17 @@ def parse_osc_5522(self, data: memoryview) -> None:
385
406
self .in_flight_write_request = None
386
407
raise
387
408
else :
388
- wr .flush_base64_data ()
389
- wr .commit ()
390
- self .in_flight_write_request = None
391
- if w is not None :
392
- w .screen .send_escape_code_to_child (ESC_OSC , wr .encode_response (status = 'DONE' ))
409
+ self .commit_write_request (wr )
410
+
411
+ def commit_write_request (self , wr : WriteRequest , needs_flush : bool = True ) -> None :
412
+ if needs_flush :
413
+ wr .flush_base64_data ()
414
+ wr .commit ()
415
+ if wr .committed :
416
+ self .in_flight_write_request = None
417
+ w = get_boss ().window_id_map .get (self .window_id )
418
+ if w is not None :
419
+ w .screen .send_escape_code_to_child (ESC_OSC , wr .encode_response (status = 'DONE' ))
393
420
394
421
def parse_osc_52 (self , data : memoryview , is_partial : bool = False ) -> None :
395
422
idx = find_in_memoryview (data , ord (b';' ))
@@ -423,15 +450,78 @@ def handle_write_request(self, wr: WriteRequest) -> None:
423
450
self .fulfill_write_request (wr , allowed )
424
451
425
452
def fulfill_write_request (self , wr : WriteRequest , allowed : bool = True ) -> None :
453
+ wr .permission_pending = not allowed
426
454
if wr .protocol_type is ProtocolType .osc_52 :
427
455
self .fulfill_legacy_write_request (wr , allowed )
428
456
return
429
- w = get_boss ().window_id_map .get (self .window_id )
430
457
cp = get_boss ().primary_selection if wr .is_primary_selection else get_boss ().clipboard
431
- if not allowed or not cp .enabled :
458
+ w = get_boss ().window_id_map .get (self .window_id )
459
+ if w is None :
432
460
self .in_flight_write_request = None
433
- if w is not None :
434
- w .screen .send_escape_code_to_child (ESC_OSC , wr .encode_response (status = 'EPERM' if not allowed else 'ENOSYS' ))
461
+ return
462
+ if not cp .enabled :
463
+ self .in_flight_write_request = None
464
+ w .screen .send_escape_code_to_child (ESC_OSC , wr .encode_response (status = 'ENOSYS' ))
465
+ return
466
+ if not allowed :
467
+ if wr .password and wr .human_name :
468
+ if self .password_is_allowed_already (wr .password , for_write = True ):
469
+ wr .permission_pending = False
470
+ else :
471
+ wid = w .id
472
+ def callback (granted : bool ) -> None :
473
+ if wr is not self .in_flight_write_request :
474
+ return
475
+ if granted :
476
+ wr .permission_pending = False
477
+ if wr .commit_pending :
478
+ self .commit_write_request (wr , needs_flush = False )
479
+ else :
480
+ w = get_boss ().window_id_map .get (wid )
481
+ if w is not None :
482
+ w .screen .send_escape_code_to_child (ESC_OSC , wr .encode_response (status = 'EPERM' ))
483
+ self .in_flight_write_request = None
484
+
485
+ self .request_permission (w , wr .human_name , wr .password , callback , for_write = True )
486
+ else :
487
+ self .in_flight_write_request = None
488
+ w .screen .send_escape_code_to_child (ESC_OSC , wr .encode_response (status = 'EPERM' ))
489
+
490
+ def request_permission (self , window : WindowType , human_name : str , password : str , callback : Callable [[bool ], None ], for_write : bool = False ) -> None :
491
+ if (gp := self .granted_passwords .get (password )) and (gp .write_ban if for_write else gp .read_ban ):
492
+ callback (False )
493
+ return
494
+
495
+ def cb (q : str ) -> None :
496
+ p = self .granted_passwords .get (password )
497
+ if p is None :
498
+ p = self .granted_passwords [password ] = GrantedPermission ()
499
+ callback (q in ('a' , 'w' ))
500
+ match q :
501
+ case 'w' :
502
+ if for_write :
503
+ p .write = True
504
+ else :
505
+ p .read = True
506
+ case 'b' :
507
+ if for_write :
508
+ p .write = False
509
+ p .write_ban = True
510
+ else :
511
+ p .read = False
512
+ p .read_ban = True
513
+ if for_write :
514
+ msg = _ ('The program {0} running in this window wants to write to the system clipboard.' )
515
+ else :
516
+ msg = _ ('The program {0} running in this window wants to read from the system clipboard.' )
517
+ msg += '\n \n ' + ('If you choose "Always" similar requests from this program will be automatically allowed for the rest of this session.' )
518
+ msg += '\n \n ' + ('If you choose "Ban" similar requests from this program will be automatically dis-allowed for the rest of this session.' )
519
+ from kittens .tui .operations import styled
520
+ get_boss ().choose (msg .format (styled (human_name , fg = 'yellow' )), cb , 'a;green:Allow' , 'w;yellow:Always' , 'd;red:Deny' , 'b;red:Ban' ,
521
+ default = 'd' , window = window , title = _ ('A program wants to access the clipboard' ))
522
+
523
+ def password_is_allowed_already (self , password : str , for_write : bool = False ) -> bool :
524
+ return (q := self .granted_passwords .get (password )) is not None and (q .write if for_write else q .read )
435
525
436
526
def fulfill_legacy_write_request (self , wr : WriteRequest , allowed : bool = True ) -> None :
437
527
cp = get_boss ().primary_selection if wr .is_primary_selection else get_boss ().clipboard
@@ -517,11 +607,24 @@ def ask_to_read_clipboard(self, rr: ReadRequest) -> None:
517
607
w = get_boss ().window_id_map .get (self .window_id )
518
608
if w is not None :
519
609
self .currently_asking_permission_for = rr
520
- get_boss ().confirm (_ (
521
- 'A program running in this window wants to read from the system clipboard.'
522
- ' Allow it to do so, once?' ),
523
- self .handle_clipboard_confirmation , window = w ,
524
- )
610
+ if rr .password and rr .human_name :
611
+ if self .password_is_allowed_already (rr .password ):
612
+ self .handle_clipboard_confirmation (True )
613
+ return
614
+ if (p := self .granted_passwords .get (rr .password )) and p .read_ban :
615
+ self .handle_clipboard_confirmation (False )
616
+ return
617
+ self .request_permission (w , rr .human_name , rr .password , self .handle_clipboard_confirmation )
618
+ else :
619
+ if rr .human_name :
620
+ msg = _ (
621
+ 'The program {} running in this window wants to read from the system clipboard.'
622
+ ' Allow it to do so, once?' ).format (rr .human_name )
623
+ else :
624
+ msg = _ (
625
+ 'A program running in this window wants to read from the system clipboard.'
626
+ ' Allow it to do so, once?' )
627
+ get_boss ().confirm (msg , self .handle_clipboard_confirmation , window = w )
525
628
526
629
def handle_clipboard_confirmation (self , confirmed : bool ) -> None :
527
630
rr = self .currently_asking_permission_for
0 commit comments