Skip to content

Commit 71eb3ae

Browse files
Implement FileClient2 progress callbacks (#354)
Supersedes #353 --------- Co-authored-by: chemicstry <[email protected]>
1 parent eda263e commit 71eb3ae

File tree

4 files changed

+73
-21
lines changed

4 files changed

+73
-21
lines changed

CHANGELOG.rst

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,12 @@
33
Changelog
44
=========
55

6+
v1.23
7+
-----
8+
9+
- Add progress reporting to `pycyphal.application.file.FileClient2`
10+
(`#353 <https://github.com/OpenCyphal/pycyphal/pull/353>`_).
11+
612
v1.22
713
-----
814

pycyphal/_version.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
__version__ = "1.22.1"
1+
__version__ = "1.23.0"

pycyphal/application/file.py

Lines changed: 37 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -43,9 +43,7 @@ class FileServer:
4343
The lifetime of this instance matches the lifetime of its node.
4444
"""
4545

46-
def __init__(
47-
self, node: pycyphal.application.Node, roots: typing.Iterable[typing.Union[str, pathlib.Path]]
48-
) -> None:
46+
def __init__(self, node: pycyphal.application.Node, roots: typing.Iterable[str | pathlib.Path]) -> None:
4947
"""
5048
:param node:
5149
The node instance to initialize the file server on.
@@ -85,7 +83,7 @@ def close() -> None:
8583
node.add_lifetime_hooks(start, close)
8684

8785
@property
88-
def roots(self) -> typing.List[pathlib.Path]:
86+
def roots(self) -> list[pathlib.Path]:
8987
"""
9088
File operations will be performed within these root directories.
9189
The first directory to match takes precedence.
@@ -94,7 +92,7 @@ def roots(self) -> typing.List[pathlib.Path]:
9492
"""
9593
return self._roots
9694

97-
def locate(self, p: typing.Union[pathlib.Path, str, Path]) -> typing.Tuple[pathlib.Path, pathlib.Path]:
95+
def locate(self, p: pathlib.Path | str | Path) -> tuple[pathlib.Path, pathlib.Path]:
9896
"""
9997
Iterate through :attr:`roots` until a root r is found such that ``r/p`` exists and return ``(r, p)``.
10098
Otherwise, return nonexistent ``(roots[0], p)``.
@@ -413,7 +411,7 @@ async def move(self, src: str, dst: str, overwrite: bool = False) -> int:
413411
assert isinstance(res, Modify.Response)
414412
return int(res.error.value)
415413

416-
async def read(self, path: str, offset: int = 0, size: typing.Optional[int] = None) -> typing.Union[int, bytes]:
414+
async def read(self, path: str, offset: int = 0, size: int | None = None) -> int | bytes:
417415
"""
418416
Proxy for ``uavcan.file.Read``.
419417
@@ -434,7 +432,7 @@ async def read(self, path: str, offset: int = 0, size: typing.Optional[int] = No
434432
data on success (empty if the offset is out of bounds or the file is empty).
435433
"""
436434

437-
async def once() -> typing.Union[int, bytes]:
435+
async def once() -> int | bytes:
438436
res = await self._call(Read, Read.Request(offset=offset, path=Path(path)))
439437
assert isinstance(res, Read.Response)
440438
if res.error.value != 0:
@@ -455,9 +453,7 @@ async def once() -> typing.Union[int, bytes]:
455453
offset += len(out)
456454
return data
457455

458-
async def write(
459-
self, path: str, data: typing.Union[memoryview, bytes], offset: int = 0, *, truncate: bool = True
460-
) -> int:
456+
async def write(self, path: str, data: memoryview | bytes, offset: int = 0, *, truncate: bool = True) -> int:
461457
"""
462458
Proxy for ``uavcan.file.Write``.
463459
@@ -479,7 +475,7 @@ async def write(
479475
:returns: See ``uavcan.file.Error``
480476
"""
481477

482-
async def once(d: typing.Union[memoryview, bytes]) -> int:
478+
async def once(d: memoryview | bytes) -> int:
483479
res = await self._call(
484480
Write,
485481
Write.Request(offset, path=Path(path), data=Unstructured(np.frombuffer(d, np.uint8))),
@@ -658,7 +654,13 @@ async def move(self, src: str, dst: str, overwrite: bool = False) -> None:
658654
assert isinstance(res, Modify.Response)
659655
_raise_on_error(res.error, f"{src}->{dst}")
660656

661-
async def read(self, path: str, offset: int = 0, size: typing.Optional[int] = None) -> bytes:
657+
async def read(
658+
self,
659+
path: str,
660+
offset: int = 0,
661+
size: int | None = None,
662+
progress: typing.Callable[[int, int | None], None] | None = None,
663+
) -> bytes:
662664
"""
663665
Proxy for ``uavcan.file.Read``.
664666
@@ -674,6 +676,10 @@ async def read(self, path: str, offset: int = 0, size: typing.Optional[int] = No
674676
If None (default), the entire file will be read (this may exhaust local memory).
675677
If zero, this call is a no-op.
676678
679+
:param progress:
680+
Optional callback function that receives (bytes_read, total_size)
681+
total_size will be None if size parameter is None
682+
677683
:raises OSError: If the read operation failed; see ``uavcan.file.Error``
678684
679685
:returns:
@@ -686,20 +692,26 @@ async def once() -> bytes:
686692
_raise_on_error(res.error, path)
687693
return bytes(res.data.value.tobytes())
688694

689-
if size is None:
690-
size = 2**64
691695
data = b""
692-
while len(data) < size:
696+
while len(data) < (size or 2**64):
693697
out = await once()
694698
assert isinstance(out, bytes)
695699
if not out:
696700
break
697701
data += out
698702
offset += len(out)
703+
if progress:
704+
progress(len(data), size)
699705
return data
700706

701707
async def write(
702-
self, path: str, data: typing.Union[memoryview, bytes], offset: int = 0, *, truncate: bool = True
708+
self,
709+
path: str,
710+
data: memoryview | bytes,
711+
offset: int = 0,
712+
*,
713+
truncate: bool = True,
714+
progress: typing.Callable[[int, int], None] | None = None,
703715
) -> None:
704716
"""
705717
Proxy for ``uavcan.file.Write``.
@@ -719,22 +731,30 @@ async def write(
719731
If True, the rest of the file after ``offset + len(data)`` will be truncated.
720732
This is done by sending an empty write request, as prescribed by the Specification.
721733
734+
:param progress:
735+
Optional callback function that receives (bytes_written, total_size)
736+
722737
:raises OSError: If the write operation failed; see ``uavcan.file.Error``
723738
"""
724739

725-
async def once(d: typing.Union[memoryview, bytes]) -> None:
740+
async def once(d: memoryview | bytes) -> None:
726741
res = await self._call(
727742
Write,
728743
Write.Request(offset, path=Path(path), data=Unstructured(np.frombuffer(d, np.uint8))),
729744
)
730745
assert isinstance(res, Write.Response)
731746
_raise_on_error(res.error, path)
732747

748+
total_size = len(data)
749+
bytes_written = 0
733750
limit = self.data_transfer_capacity
734751
while len(data) > 0:
735752
frag, data = data[:limit], data[limit:]
736753
await once(frag)
737754
offset += len(frag)
755+
bytes_written += len(frag)
756+
if progress:
757+
progress(bytes_written, total_size)
738758
if truncate:
739759
await once(b"")
740760

tests/application/file.py

Lines changed: 29 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
# This software is distributed under the terms of the MIT License.
33
# Author: Pavel Kirienko <[email protected]>
44

5+
import math
56
import sys
67
import shutil
78
import typing
@@ -13,8 +14,13 @@
1314
import pycyphal
1415

1516

17+
class ProgressTracker:
18+
def __init__(self) -> None:
19+
self.counter = 0
20+
21+
1622
@pytest.mark.asyncio
17-
async def _unittest_file(compiled: typing.List[pycyphal.dsdl.GeneratedPackageInfo]) -> None:
23+
async def _unittest_file(compiled: list[pycyphal.dsdl.GeneratedPackageInfo]) -> None:
1824
from pycyphal.application import make_node, NodeInfo
1925
from pycyphal.transport.udp import UDPTransport
2026
from pycyphal.application.file import FileClient, FileServer, Error
@@ -254,8 +260,28 @@ async def ls(path: str) -> typing.List[str]:
254260
assert e.value.errno == errno.ENOENT
255261

256262
# Write into empty file
257-
await cln.write("a/foo/x", bytes(range(200)) * 3)
258-
assert await cln.read("a/foo/x") == bytes(range(200)) * 3
263+
data = bytes(range(200)) * 3
264+
data_chunks = math.ceil(len(data) / cln.data_transfer_capacity)
265+
write_tracker = ProgressTracker()
266+
267+
def write_progress_cb(bytes_written: int, bytes_total: int) -> None:
268+
write_tracker.counter += 1
269+
assert bytes_total == len(data)
270+
assert bytes_written == min(write_tracker.counter * cln.data_transfer_capacity, len(data))
271+
272+
await cln.write("a/foo/x", data, progress=write_progress_cb)
273+
assert write_tracker.counter == data_chunks
274+
275+
read_tracker = ProgressTracker()
276+
277+
def read_progress_cb(bytes_read: int, bytes_total: int | None) -> None:
278+
read_tracker.counter += 1
279+
assert bytes_total is None
280+
assert bytes_read == min(read_tracker.counter * cln.data_transfer_capacity, len(data))
281+
282+
assert await cln.read("a/foo/x", progress=read_progress_cb) == data
283+
assert read_tracker.counter == data_chunks
284+
259285
assert (await cln.get_info("a/foo/x")).size == 600
260286

261287
# Truncation -- this write is shorter

0 commit comments

Comments
 (0)