first commit

This commit is contained in:
git
2025-07-20 13:25:51 +10:00
commit a2971879f0
294 changed files with 42788 additions and 0 deletions
+27
View File
@@ -0,0 +1,27 @@
# SPDX-License-Identifier: AGPL-3.0-or-later
# lint: pylint
""".. _botdetection src:
X-Forwarded-For
===============
.. attention::
A correct setup of the HTTP request headers ``X-Forwarded-For`` and
``X-Real-IP`` is essential to be able to assign a request to an IP correctly:
- `NGINX RequestHeader`_
- `Apache RequestHeader`_
.. _NGINX RequestHeader:
https://docs.searxng.org/admin/installation-nginx.html#nginx-s-searxng-site
.. _Apache RequestHeader:
https://docs.searxng.org/admin/installation-apache.html#apache-s-searxng-site
.. autofunction:: searx.botdetection.get_real_ip
"""
from ._helpers import dump_request
from ._helpers import get_real_ip
from ._helpers import too_many_requests
+120
View File
@@ -0,0 +1,120 @@
# SPDX-License-Identifier: AGPL-3.0-or-later
# lint: pylint
# pylint: disable=missing-module-docstring, invalid-name
from __future__ import annotations
from ipaddress import (
IPv4Network,
IPv6Network,
IPv4Address,
IPv6Address,
ip_network,
)
import flask
import werkzeug
from searx.tools import config
from searx import logger
logger = logger.getChild('botdetection')
def dump_request(request: flask.Request):
return (
request.path
+ " || X-Forwarded-For: %s" % request.headers.get('X-Forwarded-For')
+ " || X-Real-IP: %s" % request.headers.get('X-Real-IP')
+ " || form: %s" % request.form
+ " || Accept: %s" % request.headers.get('Accept')
+ " || Accept-Language: %s" % request.headers.get('Accept-Language')
+ " || Accept-Encoding: %s" % request.headers.get('Accept-Encoding')
+ " || Content-Type: %s" % request.headers.get('Content-Type')
+ " || Content-Length: %s" % request.headers.get('Content-Length')
+ " || Connection: %s" % request.headers.get('Connection')
+ " || User-Agent: %s" % request.headers.get('User-Agent')
)
def too_many_requests(network: IPv4Network | IPv6Network, log_msg: str) -> werkzeug.Response | None:
"""Returns a HTTP 429 response object and writes a ERROR message to the
'botdetection' logger. This function is used in part by the filter methods
to return the default ``Too Many Requests`` response.
"""
logger.debug("BLOCK %s: %s", network.compressed, log_msg)
return flask.make_response(('Too Many Requests', 429))
def get_network(real_ip: IPv4Address | IPv6Address, cfg: config.Config) -> IPv4Network | IPv6Network:
"""Returns the (client) network of whether the real_ip is part of."""
if real_ip.version == 6:
prefix = cfg['real_ip.ipv6_prefix']
else:
prefix = cfg['real_ip.ipv4_prefix']
network = ip_network(f"{real_ip}/{prefix}", strict=False)
# logger.debug("get_network(): %s", network.compressed)
return network
def get_real_ip(request: flask.Request) -> str:
"""Returns real IP of the request. Since not all proxies set all the HTTP
headers and incoming headers can be faked it may happen that the IP cannot
be determined correctly.
.. sidebar:: :py:obj:`flask.Request.remote_addr`
SearXNG uses Werkzeug's ProxyFix_ (with it default ``x_for=1``).
This function tries to get the remote IP in the order listed below,
additional some tests are done and if inconsistencies or errors are
detected, they are logged.
The remote IP of the request is taken from (first match):
- X-Forwarded-For_ header
- `X-real-IP header <https://github.com/searxng/searxng/issues/1237#issuecomment-1147564516>`__
- :py:obj:`flask.Request.remote_addr`
.. _ProxyFix:
https://werkzeug.palletsprojects.com/middleware/proxy_fix/
.. _X-Forwarded-For:
https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/X-Forwarded-For
"""
forwarded_for = request.headers.get("X-Forwarded-For")
real_ip = request.headers.get('X-Real-IP')
remote_addr = request.remote_addr
# logger.debug(
# "X-Forwarded-For: %s || X-Real-IP: %s || request.remote_addr: %s", forwarded_for, real_ip, remote_addr
# )
if not forwarded_for:
logger.error("X-Forwarded-For header is not set!")
else:
from .limiter import get_cfg # pylint: disable=import-outside-toplevel, cyclic-import
forwarded_for = [x.strip() for x in forwarded_for.split(',')]
x_for: int = get_cfg()['real_ip.x_for'] # type: ignore
forwarded_for = forwarded_for[-min(len(forwarded_for), x_for)]
if not real_ip:
logger.error("X-Real-IP header is not set!")
if forwarded_for and real_ip and forwarded_for != real_ip:
logger.warning("IP from X-Real-IP (%s) is not equal to IP from X-Forwarded-For (%s)", real_ip, forwarded_for)
if forwarded_for and remote_addr and forwarded_for != remote_addr:
logger.warning(
"IP from WSGI environment (%s) is not equal to IP from X-Forwarded-For (%s)", remote_addr, forwarded_for
)
if real_ip and remote_addr and real_ip != remote_addr:
logger.warning("IP from WSGI environment (%s) is not equal to IP from X-Real-IP (%s)", remote_addr, real_ip)
request_ip = forwarded_for or real_ip or remote_addr or '0.0.0.0'
# logger.debug("get_real_ip() -> %s", request_ip)
return request_ip
+39
View File
@@ -0,0 +1,39 @@
# SPDX-License-Identifier: AGPL-3.0-or-later
# lint: pylint
"""
Method ``http_accept``
----------------------
The ``http_accept`` method evaluates a request as the request of a bot if the
Accept_ header ..
- did not contain ``text/html``
.. _Accept:
https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Accept
"""
# pylint: disable=unused-argument
from __future__ import annotations
from ipaddress import (
IPv4Network,
IPv6Network,
)
import flask
import werkzeug
from searx.tools import config
from ._helpers import too_many_requests
def filter_request(
network: IPv4Network | IPv6Network,
request: flask.Request,
cfg: config.Config,
) -> werkzeug.Response | None:
if 'text/html' not in request.accept_mimetypes:
return too_many_requests(network, "HTTP header Accept did not contain text/html")
return None
+41
View File
@@ -0,0 +1,41 @@
# SPDX-License-Identifier: AGPL-3.0-or-later
# lint: pylint
"""
Method ``http_accept_encoding``
-------------------------------
The ``http_accept_encoding`` method evaluates a request as the request of a
bot if the Accept-Encoding_ header ..
- did not contain ``gzip`` AND ``deflate`` (if both values are missed)
- did not contain ``text/html``
.. _Accept-Encoding:
https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Accept-Encoding
"""
# pylint: disable=unused-argument
from __future__ import annotations
from ipaddress import (
IPv4Network,
IPv6Network,
)
import flask
import werkzeug
from searx.tools import config
from ._helpers import too_many_requests
def filter_request(
network: IPv4Network | IPv6Network,
request: flask.Request,
cfg: config.Config,
) -> werkzeug.Response | None:
accept_list = [l.strip() for l in request.headers.get('Accept-Encoding', '').split(',')]
if not ('gzip' in accept_list or 'deflate' in accept_list):
return too_many_requests(network, "HTTP header Accept-Encoding did not contain gzip nor deflate")
return None
+35
View File
@@ -0,0 +1,35 @@
# SPDX-License-Identifier: AGPL-3.0-or-later
# lint: pylint
"""
Method ``http_accept_language``
-------------------------------
The ``http_accept_language`` method evaluates a request as the request of a bot
if the Accept-Language_ header is unset.
.. _Accept-Language:
https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/User-Agent
"""
# pylint: disable=unused-argument
from __future__ import annotations
from ipaddress import (
IPv4Network,
IPv6Network,
)
import flask
import werkzeug
from searx.tools import config
from ._helpers import too_many_requests
def filter_request(
network: IPv4Network | IPv6Network,
request: flask.Request,
cfg: config.Config,
) -> werkzeug.Response | None:
if request.headers.get('Accept-Language', '').strip() == '':
return too_many_requests(network, "missing HTTP header Accept-Language")
return None
+37
View File
@@ -0,0 +1,37 @@
# SPDX-License-Identifier: AGPL-3.0-or-later
# lint: pylint
"""
Method ``http_connection``
--------------------------
The ``http_connection`` method evaluates a request as the request of a bot if
the Connection_ header is set to ``close``.
.. _Connection:
https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Connection
"""
# pylint: disable=unused-argument
from __future__ import annotations
from ipaddress import (
IPv4Network,
IPv6Network,
)
import flask
import werkzeug
from searx.tools import config
from ._helpers import too_many_requests
def filter_request(
network: IPv4Network | IPv6Network,
request: flask.Request,
cfg: config.Config,
) -> werkzeug.Response | None:
if request.headers.get('Connection', '').strip() == 'close':
return too_many_requests(network, "HTTP header 'Connection=close")
return None
+67
View File
@@ -0,0 +1,67 @@
# SPDX-License-Identifier: AGPL-3.0-or-later
# lint: pylint
"""
Method ``http_user_agent``
--------------------------
The ``http_user_agent`` method evaluates a request as the request of a bot if
the User-Agent_ header is unset or matches the regular expression
:py:obj:`USER_AGENT`.
.. _User-Agent:
https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/User-Agent
"""
# pylint: disable=unused-argument
from __future__ import annotations
import re
from ipaddress import (
IPv4Network,
IPv6Network,
)
import flask
import werkzeug
from searx.tools import config
from ._helpers import too_many_requests
USER_AGENT = (
r'('
+ r'unknown'
+ r'|[Cc][Uu][Rr][Ll]|[wW]get|Scrapy|splash|JavaFX|FeedFetcher|python-requests|Go-http-client|Java|Jakarta|okhttp'
+ r'|HttpClient|Jersey|Python|libwww-perl|Ruby|SynHttpClient|UniversalFeedParser|Googlebot|GoogleImageProxy'
+ r'|bingbot|Baiduspider|yacybot|YandexMobileBot|YandexBot|Yahoo! Slurp|MJ12bot|AhrefsBot|archive.org_bot|msnbot'
+ r'|MJ12bot|SeznamBot|linkdexbot|Netvibes|SMTBot|zgrab|James BOT|Sogou|Abonti|Pixray|Spinn3r|SemrushBot|Exabot'
+ r'|ZmEu|BLEXBot|bitlybot'
# unmaintained Farside instances
+ r'|'
+ re.escape(r'Mozilla/5.0 (compatible; Farside/0.1.0; +https://farside.link)')
# other bots and client to block
+ '|.*PetalBot.*'
+ r')'
)
"""Regular expression that matches to User-Agent_ from known *bots*"""
_regexp = None
def regexp_user_agent():
global _regexp # pylint: disable=global-statement
if not _regexp:
_regexp = re.compile(USER_AGENT)
return _regexp
def filter_request(
network: IPv4Network | IPv6Network,
request: flask.Request,
cfg: config.Config,
) -> werkzeug.Response | None:
user_agent = request.headers.get('User-Agent', 'unknown')
if regexp_user_agent().match(user_agent):
return too_many_requests(network, f"bot detected, HTTP header User-Agent: {user_agent}")
return None
+148
View File
@@ -0,0 +1,148 @@
# SPDX-License-Identifier: AGPL-3.0-or-later
# lint: pylint
""".. _botdetection.ip_limit:
Method ``ip_limit``
-------------------
The ``ip_limit`` method counts request from an IP in *sliding windows*. If
there are to many requests in a sliding window, the request is evaluated as a
bot request. This method requires a redis DB and needs a HTTP X-Forwarded-For_
header. To take privacy only the hash value of an IP is stored in the redis DB
and at least for a maximum of 10 minutes.
The :py:obj:`.link_token` method can be used to investigate whether a request is
*suspicious*. To activate the :py:obj:`.link_token` method in the
:py:obj:`.ip_limit` method add the following to your
``/etc/searxng/limiter.toml``:
.. code:: toml
[botdetection.ip_limit]
link_token = true
If the :py:obj:`.link_token` method is activated and a request is *suspicious*
the request rates are reduced:
- :py:obj:`BURST_MAX` -> :py:obj:`BURST_MAX_SUSPICIOUS`
- :py:obj:`LONG_MAX` -> :py:obj:`LONG_MAX_SUSPICIOUS`
To intercept bots that get their IPs from a range of IPs, there is a
:py:obj:`SUSPICIOUS_IP_WINDOW`. In this window the suspicious IPs are stored
for a longer time. IPs stored in this sliding window have a maximum of
:py:obj:`SUSPICIOUS_IP_MAX` accesses before they are blocked. As soon as the IP
makes a request that is not suspicious, the sliding window for this IP is
droped.
.. _X-Forwarded-For:
https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/X-Forwarded-For
"""
from __future__ import annotations
from ipaddress import (
IPv4Network,
IPv6Network,
)
import flask
import werkzeug
from searx.tools import config
from searx import redisdb
from searx.redislib import incr_sliding_window, drop_counter
from . import link_token
from ._helpers import (
too_many_requests,
logger,
)
logger = logger.getChild('ip_limit')
BURST_WINDOW = 20
"""Time (sec) before sliding window for *burst* requests expires."""
BURST_MAX = 15
"""Maximum requests from one IP in the :py:obj:`BURST_WINDOW`"""
BURST_MAX_SUSPICIOUS = 2
"""Maximum of suspicious requests from one IP in the :py:obj:`BURST_WINDOW`"""
LONG_WINDOW = 600
"""Time (sec) before the longer sliding window expires."""
LONG_MAX = 150
"""Maximum requests from one IP in the :py:obj:`LONG_WINDOW`"""
LONG_MAX_SUSPICIOUS = 10
"""Maximum suspicious requests from one IP in the :py:obj:`LONG_WINDOW`"""
API_WONDOW = 3600
"""Time (sec) before sliding window for API requests (format != html) expires."""
API_MAX = 4
"""Maximum requests from one IP in the :py:obj:`API_WONDOW`"""
SUSPICIOUS_IP_WINDOW = 3600 * 24 * 30
"""Time (sec) before sliding window for one suspicious IP expires."""
SUSPICIOUS_IP_MAX = 3
"""Maximum requests from one suspicious IP in the :py:obj:`SUSPICIOUS_IP_WINDOW`."""
def filter_request(
network: IPv4Network | IPv6Network,
request: flask.Request,
cfg: config.Config,
) -> werkzeug.Response | None:
# pylint: disable=too-many-return-statements
redis_client = redisdb.client()
if network.is_link_local and not cfg['botdetection.ip_limit.filter_link_local']:
logger.debug("network %s is link-local -> not monitored by ip_limit method", network.compressed)
return None
if request.args.get('format', 'html') != 'html':
c = incr_sliding_window(redis_client, 'ip_limit.API_WONDOW:' + network.compressed, API_WONDOW)
if c > API_MAX:
return too_many_requests(network, "too many request in API_WINDOW")
if cfg['botdetection.ip_limit.link_token']:
suspicious = link_token.is_suspicious(network, request, True)
if not suspicious:
# this IP is no longer suspicious: release ip again / delete the counter of this IP
drop_counter(redis_client, 'ip_limit.SUSPICIOUS_IP_WINDOW' + network.compressed)
return None
# this IP is suspicious: count requests from this IP
c = incr_sliding_window(
redis_client, 'ip_limit.SUSPICIOUS_IP_WINDOW' + network.compressed, SUSPICIOUS_IP_WINDOW
)
if c > SUSPICIOUS_IP_MAX:
logger.error("BLOCK: too many request from %s in SUSPICIOUS_IP_WINDOW (redirect to /)", network)
return flask.redirect(flask.url_for('index'), code=302)
c = incr_sliding_window(redis_client, 'ip_limit.BURST_WINDOW' + network.compressed, BURST_WINDOW)
if c > BURST_MAX_SUSPICIOUS:
return too_many_requests(network, "too many request in BURST_WINDOW (BURST_MAX_SUSPICIOUS)")
c = incr_sliding_window(redis_client, 'ip_limit.LONG_WINDOW' + network.compressed, LONG_WINDOW)
if c > LONG_MAX_SUSPICIOUS:
return too_many_requests(network, "too many request in LONG_WINDOW (LONG_MAX_SUSPICIOUS)")
return None
# vanilla limiter without extensions counts BURST_MAX and LONG_MAX
c = incr_sliding_window(redis_client, 'ip_limit.BURST_WINDOW' + network.compressed, BURST_WINDOW)
if c > BURST_MAX:
return too_many_requests(network, "too many request in BURST_WINDOW (BURST_MAX)")
c = incr_sliding_window(redis_client, 'ip_limit.LONG_WINDOW' + network.compressed, LONG_WINDOW)
if c > LONG_MAX:
return too_many_requests(network, "too many request in LONG_WINDOW (LONG_MAX)")
return None
+85
View File
@@ -0,0 +1,85 @@
# SPDX-License-Identifier: AGPL-3.0-or-later
# lint: pylint
""".. _botdetection.ip_lists:
Method ``ip_lists``
-------------------
The ``ip_lists`` method implements IP :py:obj:`block- <block_ip>` and
:py:obj:`pass-lists <pass_ip>`.
.. code:: toml
[botdetection.ip_lists]
pass_ip = [
'140.238.172.132', # IPv4 of check.searx.space
'192.168.0.0/16', # IPv4 private network
'fe80::/10' # IPv6 linklocal
]
block_ip = [
'93.184.216.34', # IPv4 of example.org
'257.1.1.1', # invalid IP --> will be ignored, logged in ERROR class
]
"""
# pylint: disable=unused-argument
from __future__ import annotations
from typing import Tuple
from ipaddress import (
ip_network,
IPv4Address,
IPv6Address,
)
from searx.tools import config
from ._helpers import logger
logger = logger.getChild('ip_limit')
SEARXNG_ORG = [
# https://github.com/searxng/searxng/pull/2484#issuecomment-1576639195
'140.238.172.132', # IPv4 check.searx.space
'2603:c022:0:4900::/56', # IPv6 check.searx.space
]
"""Passlist of IPs from the SearXNG organization, e.g. `check.searx.space`."""
def pass_ip(real_ip: IPv4Address | IPv6Address, cfg: config.Config) -> Tuple[bool, str]:
"""Checks if the IP on the subnet is in one of the members of the
``botdetection.ip_lists.pass_ip`` list.
"""
if cfg.get('botdetection.ip_lists.pass_searxng_org', default=True):
for net in SEARXNG_ORG:
net = ip_network(net, strict=False)
if real_ip.version == net.version and real_ip in net:
return True, f"IP matches {net.compressed} in SEARXNG_ORG list."
return ip_is_subnet_of_member_in_list(real_ip, 'botdetection.ip_lists.pass_ip', cfg)
def block_ip(real_ip: IPv4Address | IPv6Address, cfg: config.Config) -> Tuple[bool, str]:
"""Checks if the IP on the subnet is in one of the members of the
``botdetection.ip_lists.block_ip`` list.
"""
block, msg = ip_is_subnet_of_member_in_list(real_ip, 'botdetection.ip_lists.block_ip', cfg)
if block:
msg += " To remove IP from list, please contact the maintainer of the service."
return block, msg
def ip_is_subnet_of_member_in_list(
real_ip: IPv4Address | IPv6Address, list_name: str, cfg: config.Config
) -> Tuple[bool, str]:
for net in cfg.get(list_name, default=[]):
try:
net = ip_network(net, strict=False)
except ValueError:
logger.error("invalid IP %s in %s", net, list_name)
continue
if real_ip.version == net.version and real_ip in net:
return True, f"IP matches {net.compressed} in {list_name}."
return False, f"IP is not a member of an item in the f{list_name} list"
+147
View File
@@ -0,0 +1,147 @@
# SPDX-License-Identifier: AGPL-3.0-or-later
# lint: pylint
""".. _limiter src:
Limiter
=======
.. sidebar:: info
The limiter requires a :ref:`Redis <settings redis>` database.
Bot protection / IP rate limitation. The intention of rate limitation is to
limit suspicious requests from an IP. The motivation behind this is the fact
that SearXNG passes through requests from bots and is thus classified as a bot
itself. As a result, the SearXNG engine then receives a CAPTCHA or is blocked
by the search engine (the origin) in some other way.
To avoid blocking, the requests from bots to SearXNG must also be blocked, this
is the task of the limiter. To perform this task, the limiter uses the methods
from the :py:obj:`searx.botdetection`.
To enable the limiter activate:
.. code:: yaml
server:
...
limiter: true # rate limit the number of request on the instance, block some bots
and set the redis-url connection. Check the value, it depends on your redis DB
(see :ref:`settings redis`), by example:
.. code:: yaml
redis:
url: unix:///usr/local/searxng-redis/run/redis.sock?db=0
"""
from __future__ import annotations
from pathlib import Path
from ipaddress import ip_address
import flask
import werkzeug
from searx.tools import config
from searx import logger
from . import (
http_accept,
http_accept_encoding,
http_accept_language,
http_connection,
http_user_agent,
ip_limit,
ip_lists,
)
from ._helpers import (
get_network,
get_real_ip,
dump_request,
)
logger = logger.getChild('botdetection.limiter')
CFG: config.Config = None # type: ignore
LIMITER_CFG_SCHEMA = Path(__file__).parent / "limiter.toml"
"""Base configuration (schema) of the botdetection."""
LIMITER_CFG = Path('/etc/searxng/limiter.toml')
"""Lokal Limiter configuration."""
CFG_DEPRECATED = {
# "dummy.old.foo": "config 'dummy.old.foo' exists only for tests. Don't use it in your real project config."
}
def get_cfg() -> config.Config:
global CFG # pylint: disable=global-statement
if CFG is None:
CFG = config.Config.from_toml(LIMITER_CFG_SCHEMA, LIMITER_CFG, CFG_DEPRECATED)
return CFG
def filter_request(request: flask.Request) -> werkzeug.Response | None:
# pylint: disable=too-many-return-statements
cfg = get_cfg()
real_ip = ip_address(get_real_ip(request))
network = get_network(real_ip, cfg)
if request.path == '/healthz':
return None
# link-local
if network.is_link_local:
return None
# block- & pass- lists
#
# 1. The IP of the request is first checked against the pass-list; if the IP
# matches an entry in the list, the request is not blocked.
# 2. If no matching entry is found in the pass-list, then a check is made against
# the block list; if the IP matches an entry in the list, the request is
# blocked.
# 3. If the IP is not in either list, the request is not blocked.
match, msg = ip_lists.pass_ip(real_ip, cfg)
if match:
logger.warning("PASS %s: matched PASSLIST - %s", network.compressed, msg)
return None
match, msg = ip_lists.block_ip(real_ip, cfg)
if match:
logger.error("BLOCK %s: matched BLOCKLIST - %s", network.compressed, msg)
return flask.make_response(('IP is on BLOCKLIST - %s' % msg, 429))
# methods applied on /
for func in [
http_user_agent,
]:
val = func.filter_request(network, request, cfg)
if val is not None:
return val
# methods applied on /search
if request.path == '/search':
for func in [
http_accept,
http_accept_encoding,
http_accept_language,
http_connection,
http_user_agent,
ip_limit,
]:
val = func.filter_request(network, request, cfg)
if val is not None:
return val
logger.debug(f"OK {network}: %s", dump_request(flask.request))
return None
+157
View File
@@ -0,0 +1,157 @@
# SPDX-License-Identifier: AGPL-3.0-or-later
# lint: pylint
"""
Method ``link_token``
---------------------
The ``link_token`` method evaluates a request as :py:obj:`suspicious
<is_suspicious>` if the URL ``/client<token>.css`` is not requested by the
client. By adding a random component (the token) in the URL, a bot can not send
a ping by request a static URL.
.. note::
This method requires a redis DB and needs a HTTP X-Forwarded-For_ header.
To get in use of this method a flask URL route needs to be added:
.. code:: python
@app.route('/client<token>.css', methods=['GET', 'POST'])
def client_token(token=None):
link_token.ping(request, token)
return Response('', mimetype='text/css')
And in the HTML template from flask a stylesheet link is needed (the value of
``link_token`` comes from :py:obj:`get_token`):
.. code:: html
<link rel="stylesheet"
href="{{ url_for('client_token', token=link_token) }}"
type="text/css" />
.. _X-Forwarded-For:
https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/X-Forwarded-For
"""
from __future__ import annotations
from ipaddress import (
IPv4Network,
IPv6Network,
ip_address,
)
import string
import random
import flask
from searx import logger
from searx import redisdb
from searx.redislib import secret_hash
from ._helpers import (
get_network,
get_real_ip,
)
TOKEN_LIVE_TIME = 600
"""Livetime (sec) of limiter's CSS token."""
PING_LIVE_TIME = 3600
"""Livetime (sec) of the ping-key from a client (request)"""
PING_KEY = 'SearXNG_limiter.ping'
"""Prefix of all ping-keys generated by :py:obj:`get_ping_key`"""
TOKEN_KEY = 'SearXNG_limiter.token'
"""Key for which the current token is stored in the DB"""
logger = logger.getChild('botdetection.link_token')
def is_suspicious(network: IPv4Network | IPv6Network, request: flask.Request, renew: bool = False):
"""Checks whether a valid ping is exists for this (client) network, if not
this request is rated as *suspicious*. If a valid ping exists and argument
``renew`` is ``True`` the expire time of this ping is reset to
:py:obj:`PING_LIVE_TIME`.
"""
redis_client = redisdb.client()
if not redis_client:
return False
ping_key = get_ping_key(network, request)
if not redis_client.get(ping_key):
logger.warning("missing ping (IP: %s) / request: %s", network.compressed, ping_key)
return True
if renew:
redis_client.set(ping_key, 1, ex=PING_LIVE_TIME)
logger.debug("found ping for (client) network %s -> %s", network.compressed, ping_key)
return False
def ping(request: flask.Request, token: str):
"""This function is called by a request to URL ``/client<token>.css``. If
``token`` is valid a :py:obj:`PING_KEY` for the client is stored in the DB.
The expire time of this ping-key is :py:obj:`PING_LIVE_TIME`.
"""
from . import limiter # pylint: disable=import-outside-toplevel, cyclic-import
redis_client = redisdb.client()
if not redis_client:
return
if not token_is_valid(token):
return
cfg = limiter.get_cfg()
real_ip = ip_address(get_real_ip(request))
network = get_network(real_ip, cfg)
ping_key = get_ping_key(network, request)
logger.debug("store ping_key for (client) network %s (IP %s) -> %s", network.compressed, real_ip, ping_key)
redis_client.set(ping_key, 1, ex=PING_LIVE_TIME)
def get_ping_key(network: IPv4Network | IPv6Network, request: flask.Request) -> str:
"""Generates a hashed key that fits (more or less) to a *WEB-browser
session* in a network."""
return (
PING_KEY
+ "["
+ secret_hash(
network.compressed + request.headers.get('Accept-Language', '') + request.headers.get('User-Agent', '')
)
+ "]"
)
def token_is_valid(token) -> bool:
valid = token == get_token()
logger.debug("token is valid --> %s", valid)
return valid
def get_token() -> str:
"""Returns current token. If there is no currently active token a new token
is generated randomly and stored in the redis DB.
- :py:obj:`TOKEN_LIVE_TIME`
- :py:obj:`TOKEN_KEY`
"""
redis_client = redisdb.client()
if not redis_client:
# This function is also called when limiter is inactive / no redis DB
# (see render function in webapp.py)
return '12345678'
token = redis_client.get(TOKEN_KEY)
if token:
token = token.decode('UTF-8')
else:
token = ''.join(random.choice(string.ascii_lowercase + string.digits) for _ in range(16))
redis_client.set(TOKEN_KEY, token, ex=TOKEN_LIVE_TIME)
return token