import datetime
import os
import smtplib
import socket
from dataclasses import dataclass
from typing import Annotated
from typing import Any
from typing import Literal
from pydantic import AmqpDsn
from pydantic import BaseModel as PydanticBaseModel
from pydantic import BeforeValidator
from pydantic import RedisDsn
from pydantic import TypeAdapter
from pydantic import ValidationError
from pydantic import create_model
from pydantic import field_validator
from pydantic_settings import BaseSettings
from pydantic_settings import NoDecode
from pydantic_settings import SettingsConfigDict
from pydantic_settings.sources import TomlConfigSettingsSource
from canaille.app.toml import example_settings
# Default TOML configuration file name. RootSettings registers it as its
# ``toml_file``; the CANAILLE_CONFIG environment variable takes precedence
# over it when the settings sources are built.
DEFAULT_CONFIG_FILE = "canaille.toml"
def parse_comma_separated(value: Any) -> list[str]:
    """Split a comma-separated string into its stripped, non-empty items.

    Anything that is not a string (an actual list, ``None``, ...) is handed
    back untouched so that regular validation can deal with it.
    """
    if not isinstance(value, str):
        return value

    chunks = (chunk.strip() for chunk in value.split(","))
    return [chunk for chunk in chunks if chunk]
# Field type for lists of strings that may also be given as one
# comma-separated string: ``parse_comma_separated`` runs before validation to
# split such a string, and ``NoDecode`` keeps pydantic-settings from trying to
# JSON-decode the raw value first.
CommaSeparatedList = Annotated[
    list[str], NoDecode, BeforeValidator(parse_comma_separated)
]
class BaseModel(PydanticBaseModel):
    # Pydantic base model carrying the shared model configuration:
    # case-insensitive keys, and attribute docstrings used as field
    # descriptions.
    model_config = SettingsConfigDict(
        case_sensitive=False,
        use_attribute_docstrings=True,
    )
# "[docs]" -- link label left over from the rendered documentation page, not code.
class RootSettings(BaseSettings):
    """The top-level namespace contains the configuration settings unrelated to Canaille.

    The configuration parameters from the following libraries can be used:

    - :doc:`Flask <flask:config>`
    - :doc:`Flask-WTF <flask-wtf:config>`
    - :doc:`Flask-Babel <flask-babel:index>`
    - :doc:`Authlib <authlib:oauth2/authorization-server/flask/authorization-server>`

    .. code-block:: toml
        :caption: config.toml

        SECRET_KEY = "very-secret"
        SERVER_NAME = "auth.mydomain.example"
        PREFERRED_URL_SCHEME = "https"
        DEBUG = false

        [CANAILLE]
        NAME = "My organization"
        ...
    """

    model_config = SettingsConfigDict(
        extra="allow",
        env_nested_delimiter="__",
        case_sensitive=True,
        use_attribute_docstrings=True,
        toml_file=DEFAULT_CONFIG_FILE,
    )

    @classmethod
    def settings_customise_sources(
        cls,
        settings_cls,
        init_settings,
        env_settings,
        dotenv_settings,
        file_secret_settings,
    ):
        """Add the TOML configuration file to the settings sources.

        The file path is read from the ``CANAILLE_CONFIG`` environment
        variable, falling back to the ``toml_file`` entry of the model
        configuration. The TOML source is placed right after the init
        arguments and before the environment, dotenv and secret-file sources.
        """
        toml_file = os.getenv("CANAILLE_CONFIG") or settings_cls.model_config.get(
            "toml_file"
        )
        return (
            init_settings,
            TomlConfigSettingsSource(settings_cls, toml_file),
            env_settings,
            dotenv_settings,
            file_secret_settings,
        )

    SECRET_KEY: str | None = None
    """The Flask :external:py:data:`SECRET_KEY` configuration setting.

    You MUST set a value before deploying in production.
    """

    SERVER_NAME: str | None = None
    """The Flask :external:py:data:`SERVER_NAME` configuration setting.

    This sets the domain name on which canaille will be served.
    """

    TRUSTED_HOSTS: list[str] | None = None
    """The Flask :external:py:data:`TRUSTED_HOSTS` configuration setting.

    This sets trusted values for hosts and validates hosts during requests.
    """

    # NOTE: BROKER_URL must stay declared before BROKER, because the BROKER
    # validator reads it from ``info.data``, which only holds the fields that
    # have already been validated.
    BROKER_URL: str | None = None
    """The URL of the running task worker.

    It is passed as ``url`` keyword argument to broker class. For example:

    - ``redis://localhost:6379``
    - ``amqp://localhost``
    - ``redis://username:password@redis.example:6379/0``
    - ``amqp://guest:guest@localhost:5672/?heartbeat=30&connection_timeout=10``

    If none, all the tasks are executed synchronously without requiring to run a task worker.
    This has poor performance but can be useful in test environments.
    """

    BROKER: str | None = None
    """Points to the broker class.

    If none, this will be guessed from the value of :attr:`~canaille.app.configuration.RootSettings.BROKER_URL`:

    - ``dramatiq_eager_broker:EagerBroker`` is used if the broker URL is unset.
      This broker executes tasks synchronously, meaning there is no need to run a task worker.
    - ``dramatiq.brokers.rabbitmq:RabbitmqBroker`` is used if the URL is an AMQP URL.
    - ``dramatiq.brokers.redis:RedisBroker`` is used if the URL is a redis URL.
    """

    # TODO: Use the upstream feature when it is implemented
    # https://gitlab.com/bersace/flask-dramatiq/-/issues/13
    @field_validator("BROKER", mode="after")
    @classmethod
    def guess_broker(cls, v, info):
        """Guess BROKER from BROKER_URL if not set.

        An explicit BROKER value is returned unchanged. Otherwise the broker
        class path is picked from the scheme of BROKER_URL, and the eager
        broker is used when no URL is configured.

        :raises ValueError: if BROKER_URL matches none of the supported schemes.
        """
        if v is not None:
            return v

        broker_url = info.data.get("BROKER_URL")
        if not broker_url:
            return "dramatiq_eager_broker:EagerBroker"

        # The first DSN type that accepts the URL decides the broker class.
        dsn_brokers = (
            (AmqpDsn, "dramatiq.brokers.rabbitmq:RabbitmqBroker"),
            (RedisDsn, "dramatiq.brokers.redis:RedisBroker"),
        )
        for dsn_type, broker_path in dsn_brokers:
            try:
                TypeAdapter(dsn_type).validate_python(broker_url)
            except ValidationError:
                continue
            return broker_path

        # Fallback for unix:// scheme (supported by RedisBroker but not by RedisDsn)
        if broker_url.startswith("unix://"):
            return "dramatiq.brokers.redis:RedisBroker"

        raise ValueError(
            f"Unable to guess BROKER from BROKER_URL='{broker_url}'. "
            "Supported schemes: amqp://, amqps://, redis://, rediss://, unix://"
        )

    PREFERRED_URL_SCHEME: str = "https"
    """The Flask :external:py:data:`PREFERRED_URL_SCHEME` configuration
    setting.

    This sets the url scheme by which canaille will be served.
    """

    DEBUG: bool = False
    """The Flask :external:py:data:`DEBUG` configuration setting.

    This enables debug options.

    .. danger::

        This is useful for development but should be absolutely
        avoided in production environments.
    """

    CACHE_TYPE: str = "SimpleCache"
    """The cache type.

    The default ``SimpleCache`` is a lightweight in-memory cache.
    See the :doc:`Flask-Caching documentation <flask-caching:index>` for further details.
    """

    PERMANENT_SESSION_LIFETIME: datetime.timedelta = datetime.timedelta(days=30)
    """The Flask :external:py:data:`PERMANENT_SESSION_LIFETIME` configuration setting.

    This sets the lifetime of a permanent session. Users sessions are permanent when
    they check the "Remember me" checkbox during login.
    The value is expressed in `ISO8601 duration format <https://en.wikipedia.org/wiki/ISO_8601#Durations>`_.
    For example:

    - ``P365D`` for 365 days
    - ``P30D`` for 30 days
    - ``P1W`` for 1 week
    - ``PT12H`` for 12 hours
    - ``P1DT12H`` for 1 day and 12 hours
    """

    SESSION_TYPE: (
        Literal[
            "redis",
            "memcached",
            "filesystem",
            "sqlalchemy",
            "mongodb",
            "cachelib",
            "dynamodb",
        ]
        | None
    ) = None
    """The Flask-Session backend type.

    If ``None`` (default), Flask's default session implementation is used (client-side signed cookies).
    When set, sessions are stored server-side using the specified backend.

    Available backends:

    - ``redis``: Store sessions in Redis (connects to localhost:6379 by default)
    - ``memcached``: Store sessions in Memcached (connects to localhost:11211 by default)
    - ``filesystem``: Store sessions in local files (uses /tmp/flask-session by default)
    - ``sqlalchemy``: Store sessions in a SQL database
    - ``mongodb``: Store sessions in MongoDB
    - ``cachelib``: Store sessions using a cachelib backend
    - ``dynamodb``: Store sessions in DynamoDB

    See the :doc:`Flask-Session documentation <flask-session:index>` for backend-specific details.
    """
def settings_factory(
    config=None,
    env_file=None,
    env_prefix="",
    init_with_examples=False,
):
    """Build the complete settings model and return an instance of it.

    The backend and feature settings classes are attached dynamically to a
    model derived from ``RootSettings``. They are imported inside this function
    in order to break the dependency against backend libraries like
    python-ldap or sqlalchemy.

    :param config: Optional mapping of initial values, passed as keyword
        arguments to the settings model. It is copied, never modified.
    :param env_file: Forwarded to the settings model as ``_env_file``.
    :param env_prefix: Forwarded to the settings model as ``_env_prefix``.
    :param init_with_examples: When true, every section defaults to the value
        produced by ``example_settings`` instead of its regular default.
    :return: An instance of the dynamically created ``Settings`` model.
    """
    from canaille.backends.ldap.configuration import LDAPSettings
    from canaille.backends.sql.configuration import SQLSettings
    from canaille.core.configuration import CoreSettings
    from canaille.hypercorn.configuration import HypercornSettings
    from canaille.oidc.configuration import OIDCSettings
    from canaille.scim.configuration import SCIMSettings

    # Work on a shallow copy: entries may be deleted below and the caller's
    # mapping must not be altered as a side effect.
    config = dict(config) if config else {}

    default = example_settings(CoreSettings) if init_with_examples else CoreSettings()
    attributes = {"CANAILLE": (CoreSettings, default)}

    # Section name -> (settings class, whether it is enabled by default).
    additional_settings = {
        "CANAILLE_SQL": (SQLSettings, True),
        "CANAILLE_LDAP": (LDAPSettings, False),
        "CANAILLE_OIDC": (OIDCSettings, True),
        "CANAILLE_SCIM": (SCIMSettings, True),
        "CANAILLE_HYPERCORN": (HypercornSettings, True),
    }

    for prefix, (setting, enabled_by_default) in additional_settings.items():
        if init_with_examples:
            default_value = example_settings(setting)
            # An explicit ``None`` would override the example default, so the
            # entry is dropped to let the default apply.
            if prefix in config and config[prefix] is None:
                del config[prefix]
        elif enabled_by_default and not config.get("TESTING"):
            default_value = setting.model_construct()
        else:
            default_value = None

        attributes[prefix] = ((setting | None), default_value)

    Settings = create_model(
        "Settings",
        __base__=RootSettings,
        **attributes,
    )

    return Settings(
        **config,
        _secrets_dir=os.environ.get("SECRETS_DIR"),
        _env_file=env_file,
        _env_prefix=env_prefix,
    )
class ConfigurationException(Exception):
    """Dedicated exception type for configuration problems."""
@dataclass
class CheckResult:
message: str
success: bool | None = None
def setup_config(app, config=None, env_file=None, env_prefix=""):
    """Load the settings and copy them into ``app.config``.

    The session cookie name is set first. The settings are then built with
    ``settings_factory``, using the ``CANAILLE_ENV`` environment variable as
    env file when none is given. ``app.no_secret_key`` records whether
    ``SECRET_KEY`` is unset.

    :return: ``True`` when the configuration was loaded, ``False`` when its
        validation failed (the error is logged as critical).
    """
    # https://flask.palletsprojects.com/en/stable/config/#SESSION_COOKIE_NAME
    session_defaults = {"SESSION_COOKIE_NAME": "canaille"}
    app.config.from_mapping(session_defaults)

    if not env_file:
        env_file = os.getenv("CANAILLE_ENV")

    try:
        settings = settings_factory(
            config or {}, env_file=env_file, env_prefix=env_prefix
        )
    except ValidationError as exc:  # pragma: no cover
        app.logger.critical(str(exc))
        return False

    dumped = settings.model_dump()
    app.no_secret_key = dumped["SECRET_KEY"] is None
    app.config.from_mapping(dumped)
    return True
def check_network_config(config):
    """Try to reach the services described in the configuration.

    The backend check comes first, followed by the SMTP and SMPP checks. A
    service without configuration yields a result that has no success status.

    :return: The list of ``CheckResult``, one per service.
    """
    from canaille.backends import Backend

    results = [Backend.instance.check_network_config(config)]

    smtp_config = config["CANAILLE"]["SMTP"]
    if not smtp_config:
        results.append(CheckResult(message="No SMTP server configured"))
    else:
        results.append(check_smtp_connection(smtp_config))

    smpp_config = config["CANAILLE"]["SMPP"]
    if not smpp_config:
        results.append(CheckResult(message="No SMPP server configured"))
    else:
        results.append(check_smpp_connection(smpp_config))

    return results
def check_smtp_connection(config) -> CheckResult:
    """Check that the configured SMTP server can be reached and used.

    A connection is opened on ``HOST``/``PORT``, upgraded with STARTTLS when
    ``TLS`` is set, and authenticated when ``LOGIN`` is set.

    :param config: SMTP settings mapping with the ``HOST``, ``PORT``, ``TLS``,
        ``LOGIN`` and ``PASSWORD`` keys.
    :return: A successful ``CheckResult``, or a failed one describing the
        connection, authentication or feature-support problem.
    """
    host = config["HOST"]
    port = config["PORT"]
    try:
        with smtplib.SMTP(host=host, port=port) as smtp:
            if config["TLS"]:
                smtp.starttls()

            if config["LOGIN"]:
                smtp.login(
                    user=config["LOGIN"],
                    password=config["PASSWORD"],
                )
    except (
        # Every connection-level failure is reported as a failed check instead
        # of escaping as a traceback: name resolution errors, refused or reset
        # connections, timeouts, and servers that refuse or drop the session.
        socket.gaierror,
        ConnectionError,
        TimeoutError,
        smtplib.SMTPConnectError,
        smtplib.SMTPServerDisconnected,
    ):
        return CheckResult(
            message=f"Could not connect to the SMTP server '{host}' on port '{port}'",
            success=False,
        )
    except smtplib.SMTPAuthenticationError:
        return CheckResult(
            message=f"SMTP authentication failed with user '{config['LOGIN']}'",
            success=False,
        )
    except smtplib.SMTPNotSupportedError as exc:
        # Raised when the server lacks a requested feature (STARTTLS, AUTH).
        return CheckResult(
            message=str(exc),
            success=False,
        )

    return CheckResult(
        message="Successful SMTP connection",
        success=True,
    )
def check_smpp_connection(config) -> CheckResult:
    """Check that the configured SMPP server can be reached and used.

    A client connects to ``HOST``/``PORT`` and binds as a transmitter when
    ``LOGIN`` is set.

    :param config: SMPP settings mapping with the ``HOST``, ``PORT``,
        ``LOGIN`` and ``PASSWORD`` keys.
    :return: A successful ``CheckResult``, or a failed one describing the
        problem.
    """
    # Import the submodules explicitly: importing the bare package does not
    # guarantee that ``smpplib.client`` and ``smpplib.exceptions`` are loaded.
    import smpplib.client
    import smpplib.exceptions

    host = config["HOST"]
    port = config["PORT"]
    try:
        with smpplib.client.Client(host, port, allow_unknown_opt_params=True) as client:
            client.connect()
            if config["LOGIN"]:
                client.bind_transmitter(
                    system_id=config["LOGIN"], password=config["PASSWORD"]
                )
    except smpplib.exceptions.ConnectionError:
        return CheckResult(
            success=False,
            message=f"Could not connect to the SMPP server '{host}' on port '{port}'",
        )
    except smpplib.exceptions.UnknownCommandError as exc:  # pragma: no cover
        return CheckResult(
            message=str(exc),
            success=False,
        )

    return CheckResult(
        message="Successful SMPP connection",
        success=True,
    )