summaryrefslogtreecommitdiff
path: root/searx/network/client.py
diff options
context:
space:
mode:
authorAlexandre Flament <alex@al-f.net>2021-04-05 10:43:33 +0200
committerAlexandre Flament <alex@al-f.net>2021-04-12 17:25:56 +0200
commitd14994dc73ba5c95382812581dac146d9eceaafa (patch)
tree2f7720dbae8f1064fe479f986f0b198aff2beb99 /searx/network/client.py
parenteaa694fb7d0e47b943bc6d6edb6cb6a40ab2d85e (diff)
[httpx] replace searx.poolrequests by searx.network
settings.yml: * outgoing.networks: * can contains network definition * propertiers: enable_http, verify, http2, max_connections, max_keepalive_connections, keepalive_expiry, local_addresses, support_ipv4, support_ipv6, proxies, max_redirects, retries * retries: 0 by default, number of times searx retries to send the HTTP request (using different IP & proxy each time) * local_addresses can be "192.168.0.1/24" (it supports IPv6) * support_ipv4 & support_ipv6: both True by default see https://github.com/searx/searx/pull/1034 * each engine can define a "network" section: * either a full network description * either reference an existing network * all HTTP requests of engine use the same HTTP configuration (it was not the case before, see proxy configuration in master)
Diffstat (limited to 'searx/network/client.py')
-rw-r--r--searx/network/client.py214
1 files changed, 214 insertions, 0 deletions
diff --git a/searx/network/client.py b/searx/network/client.py
new file mode 100644
index 00000000..631e36f8
--- /dev/null
+++ b/searx/network/client.py
@@ -0,0 +1,214 @@
+# SPDX-License-Identifier: AGPL-3.0-or-later
+
+import asyncio
+import logging
+import threading
+
+import httpcore
+import httpx
+from httpx_socks import AsyncProxyTransport
+from python_socks import parse_proxy_url
+import python_socks._errors
+
+from searx import logger
+
+# Optional uvloop (support Python 3.6)
+try:
+ import uvloop
+except ImportError:
+ pass
+else:
+ uvloop.install()
+
+
+logger = logger.getChild('searx.http.client')
+LOOP = None
+TRANSPORT_KWARGS = {
+ 'backend': 'asyncio',
+ 'trust_env': False,
+}
+
+
+async def close_connections_for_url(connection_pool: httpcore.AsyncConnectionPool, url: httpcore._utils.URL):
+ origin = httpcore._utils.url_to_origin(url)
+ logger.debug('Drop connections for %r', origin)
+ connections_to_close = connection_pool._connections_for_origin(origin)
+ for connection in connections_to_close:
+ await connection_pool._remove_from_pool(connection)
+ try:
+ await connection.aclose()
+ except httpcore.NetworkError as e:
+ logger.warning('Error closing an existing connection', exc_info=e)
+
+
+class AsyncHTTPTransportNoHttp(httpcore.AsyncHTTPTransport):
+ """Block HTTP request"""
+
+ async def arequest(self, method, url, headers=None, stream=None, ext=None):
+ raise httpcore.UnsupportedProtocol("HTTP protocol is disabled")
+
+
+class AsyncProxyTransportFixed(AsyncProxyTransport):
+ """Fix httpx_socks.AsyncProxyTransport
+
+ Map python_socks exceptions to httpcore.ProxyError
+
+ Map socket.gaierror to httpcore.ConnectError
+
+ Note: keepalive_expiry is ignored, AsyncProxyTransport should call:
+ * self._keepalive_sweep()
+ * self._response_closed(self, connection)
+
+ Note: AsyncProxyTransport inherit from AsyncConnectionPool
+
+ Note: the API is going to change on httpx 0.18.0
+ see https://github.com/encode/httpx/pull/1522
+ """
+
+ async def arequest(self, method, url, headers=None, stream=None, ext=None):
+ retry = 2
+ while retry > 0:
+ retry -= 1
+ try:
+ return await super().arequest(method, url, headers, stream, ext)
+ except (python_socks._errors.ProxyConnectionError,
+ python_socks._errors.ProxyTimeoutError,
+ python_socks._errors.ProxyError) as e:
+ raise httpcore.ProxyError(e)
+ except OSError as e:
+ # socket.gaierror when DNS resolution fails
+ raise httpcore.NetworkError(e)
+ except httpcore.RemoteProtocolError as e:
+ # in case of httpcore.RemoteProtocolError: Server disconnected
+ await close_connections_for_url(self, url)
+ logger.warning('httpcore.RemoteProtocolError: retry', exc_info=e)
+ # retry
+ except (httpcore.NetworkError, httpcore.ProtocolError) as e:
+ # httpcore.WriteError on HTTP/2 connection leaves a new opened stream
+ # then each new request creates a new stream and raise the same WriteError
+ await close_connections_for_url(self, url)
+ raise e
+
+
+class AsyncHTTPTransportFixed(httpx.AsyncHTTPTransport):
+ """Fix httpx.AsyncHTTPTransport"""
+
+ async def arequest(self, method, url, headers=None, stream=None, ext=None):
+ retry = 2
+ while retry > 0:
+ retry -= 1
+ try:
+ return await super().arequest(method, url, headers, stream, ext)
+ except OSError as e:
+ # socket.gaierror when DNS resolution fails
+ raise httpcore.ConnectError(e)
+ except httpcore.CloseError as e:
+ # httpcore.CloseError: [Errno 104] Connection reset by peer
+ # raised by _keepalive_sweep()
+ # from https://github.com/encode/httpcore/blob/4b662b5c42378a61e54d673b4c949420102379f5/httpcore/_backends/asyncio.py#L198 # noqa
+ await close_connections_for_url(self._pool, url)
+ logger.warning('httpcore.CloseError: retry', exc_info=e)
+ # retry
+ except httpcore.RemoteProtocolError as e:
+ # in case of httpcore.RemoteProtocolError: Server disconnected
+ await close_connections_for_url(self._pool, url)
+ logger.warning('httpcore.RemoteProtocolError: retry', exc_info=e)
+ # retry
+ except (httpcore.ProtocolError, httpcore.NetworkError) as e:
+ await close_connections_for_url(self._pool, url)
+ raise e
+
+
+def get_transport_for_socks_proxy(verify, http2, local_address, proxy_url, limit, retries):
+ global LOOP, TRANSPORT_KWARGS
+ # support socks5h (requests compatibility):
+ # https://requests.readthedocs.io/en/master/user/advanced/#socks
+ # socks5:// hostname is resolved on client side
+ # socks5h:// hostname is resolved on proxy side
+ rdns = False
+ socks5h = 'socks5h://'
+ if proxy_url.startswith(socks5h):
+ proxy_url = 'socks5://' + proxy_url[len(socks5h):]
+ rdns = True
+
+ proxy_type, proxy_host, proxy_port, proxy_username, proxy_password = parse_proxy_url(proxy_url)
+
+ return AsyncProxyTransportFixed(proxy_type=proxy_type, proxy_host=proxy_host, proxy_port=proxy_port,
+ username=proxy_username, password=proxy_password,
+ rdns=rdns,
+ loop=LOOP,
+ verify=verify,
+ http2=http2,
+ local_address=local_address,
+ max_connections=limit.max_connections,
+ max_keepalive_connections=limit.max_keepalive_connections,
+ keepalive_expiry=limit.keepalive_expiry,
+ retries=retries,
+ **TRANSPORT_KWARGS)
+
+
+def get_transport(verify, http2, local_address, proxy_url, limit, retries):
+ return AsyncHTTPTransportFixed(verify=verify,
+ http2=http2,
+ local_address=local_address,
+ proxy=httpx._config.Proxy(proxy_url) if proxy_url else None,
+ limits=limit,
+ retries=retries,
+ **TRANSPORT_KWARGS)
+
+
+def iter_proxies(proxies):
+ # https://www.python-httpx.org/compatibility/#proxy-keys
+ if isinstance(proxies, str):
+ yield 'all://', proxies
+ elif isinstance(proxies, dict):
+ for pattern, proxy_url in proxies.items():
+ yield pattern, proxy_url
+
+
+def new_client(enable_http, verify, enable_http2,
+ max_connections, max_keepalive_connections, keepalive_expiry,
+ proxies, local_address, retries, max_redirects):
+ limit = httpx.Limits(max_connections=max_connections,
+ max_keepalive_connections=max_keepalive_connections,
+ keepalive_expiry=keepalive_expiry)
+ # See https://www.python-httpx.org/advanced/#routing
+ mounts = {}
+ for pattern, proxy_url in iter_proxies(proxies):
+ if not enable_http and (pattern == 'http' or pattern.startswith('http://')):
+ continue
+ if proxy_url.startswith('socks4://') \
+ or proxy_url.startswith('socks5://') \
+ or proxy_url.startswith('socks5h://'):
+ mounts[pattern] = get_transport_for_socks_proxy(verify, enable_http2, local_address, proxy_url, limit,
+ retries)
+ else:
+ mounts[pattern] = get_transport(verify, enable_http2, local_address, proxy_url, limit, retries)
+
+ if not enable_http:
+ mounts['http://'] = AsyncHTTPTransportNoHttp()
+
+ transport = get_transport(verify, enable_http2, local_address, None, limit, retries)
+ return httpx.AsyncClient(transport=transport, mounts=mounts, max_redirects=max_redirects)
+
+
+def init():
+ # log
+ for logger_name in ('hpack.hpack', 'hpack.table'):
+ logging.getLogger(logger_name).setLevel(logging.WARNING)
+
+ # loop
+ def loop_thread():
+ global LOOP
+ LOOP = asyncio.new_event_loop()
+ LOOP.run_forever()
+
+ th = threading.Thread(
+ target=loop_thread,
+ name='asyncio_loop',
+ daemon=True,
+ )
+ th.start()
+
+
+init()