Skip to content

Test cases for reverse proxy asserting all buffers are flushed before teardown #1495

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: develop
Choose a base branch
from
Open
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Ensure all internal and external work buffers are flushed
  • Loading branch information
abhinavsingh committed Oct 15, 2024
commit 6ab88c3db519bb18b2209a1a6844935c742b529b
28 changes: 24 additions & 4 deletions proxy/core/base/tcp_server.py
Original file line number Diff line number Diff line change
@@ -111,6 +111,9 @@ class BaseTcpServerHandler(Work[T]):
a. handle_data(data: memoryview) implementation
b. Optionally, also implement other Work method
e.g. initialize, is_inactive, shutdown
c. Optionally, override has_buffer method to avoid
shutting down the connection unless additional
buffers are also cleared up.
"""

def __init__(self, *args: Any, **kwargs: Any) -> None:
@@ -135,6 +138,24 @@ def handle_data(self, data: memoryview) -> Optional[bool]:
"""Optionally return True to close client connection."""
pass # pragma: no cover

def has_external_buffer(self) -> bool:
"""BaseTcpServerHandler makes sure that Work.buffers are flushed before shutting down.
Example, HttpProtocolHandler uses BaseTcpServerHandler with TcpClientConnection as the
Work class. So, automagically, BaseTcpServerHandler implementation makes sure that
pending TcpClientConnection buffers are flushed to the clients before tearing down
the connection.
But, imagine reverse proxy scenario where ReverseProxy has also opened an upstream
TcpServerConnection object. ReverseProxy would also want any pending buffer for
upstream to flush out before tearing down the connection. For such scenarios,
you must override the has_buffer to incorporate upstream buffers in the logic.
"""
return False

def has_buffer(self) -> bool:
return self.work.has_buffer() or self.has_external_buffer()

async def get_events(self) -> SelectableEvents:
events = {}
# We always want to read from client
@@ -143,7 +164,7 @@ async def get_events(self) -> SelectableEvents:
events[self.work.connection.fileno()] = selectors.EVENT_READ
# If there is pending buffer for client
# also register for EVENT_WRITE events
if self.work.has_buffer():
if self.has_buffer():
if self.work.connection.fileno() in events:
events[self.work.connection.fileno()] |= selectors.EVENT_WRITE
else:
@@ -174,8 +195,7 @@ async def handle_writables(self, writables: Writables) -> bool:
'Flushing buffer to client {0}'.format(self.work.address),
)
self.work.flush(self.flags.max_sendbuf_size)
if self.must_flush_before_shutdown is True and \
not self.work.has_buffer():
if self.must_flush_before_shutdown is True and not self.has_buffer():
teardown = True
self.must_flush_before_shutdown = False
return teardown
@@ -214,7 +234,7 @@ async def handle_readables(self, readables: Readables) -> bool:
self.work.address,
),
)
if self.work.has_buffer():
if self.has_buffer():
logger.debug(
'Client {0} has pending buffer, will be flushed before shutting down'.format(
self.work.address,
10 changes: 6 additions & 4 deletions proxy/http/handler.py
Original file line number Diff line number Diff line change
@@ -70,8 +70,10 @@ def initialize(self) -> None:
)

def is_inactive(self) -> bool:
if not self.work.has_buffer() and \
self._connection_inactive_for() > self.flags.timeout:
if (
not self.work.has_buffer()
and self._connection_inactive_for() > self.flags.timeout
):
return True
return False

@@ -87,8 +89,8 @@ def shutdown(self) -> None:
if self.plugin:
self.plugin.on_client_connection_close()
logger.debug(
'Closing client connection %s has buffer %s' %
(self.work.address, self.work.has_buffer()),
"Closing client connection %s has buffer %s"
% (self.work.address, self.work.has_buffer()),
)
conn = self.work.connection
# Unwrap if wrapped before shutdown.
3 changes: 2 additions & 1 deletion proxy/plugin/cache/cache_responses.py
Original file line number Diff line number Diff line change
@@ -23,7 +23,8 @@

br_installed = False
try:
import brotli
import brotli # type: ignore[import-untyped]

br_installed = True
except ModuleNotFoundError:
pass
2 changes: 1 addition & 1 deletion proxy/socks/handler.py
Original file line number Diff line number Diff line change
@@ -25,4 +25,4 @@ def create(*args: Any) -> SocksClientConnection:
return SocksClientConnection(*args) # pragma: no cover

def handle_data(self, data: memoryview) -> Optional[bool]:
return super().handle_data(data) # pragma: no cover
pass # pragma: no cover