# -*- coding: utf-8 -*-

# Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2021 All Rights Reserved
#
# Licensed under CLOUD LINUX LICENSE AGREEMENT
# http://cloudlinux.com/docs/LICENSE.TXT

"""
This module contains classes implementing the SSA Agent behaviour
"""
import atexit
import errno
import json
import logging
import pwd
import re
import socket as socket_module
import struct
import time
from concurrent.futures import ThreadPoolExecutor
from threading import current_thread
from urllib.parse import urlparse

from clcommon.cpapi import cpusers, domain_owner, get_main_username_by_uid

from .internal.constants import agent_sock
from .internal.exceptions import SSAError
from .internal.utils import create_socket
from .modules.processor import RequestProcessor

# Size of the thread pool that runs SimpleAgent.handle(): it caps how many
# connections are processed at once, and with it the agent's memory use on
# high-traffic servers.
MAX_WORKERS = 50

# Largest payload read from a single connection. The PHP extension's JSON
# payload is always under 1 KB; 8 KiB leaves ample headroom while keeping
# every read bounded.
MAX_MSG_SIZE = 8 * 1024

# Timeout in seconds, installed on each accepted connection with
# socket.settimeout(). It applies to every blocking socket operation, so a
# peer that stalls mid-send releases its worker thread instead of holding
# it open.
SOCKET_READ_TIMEOUT = 10


class SimpleAgent:
    """
    SSA Simple Agent.

    Serves the agent socket: each connection carries one JSON metrics
    payload, which is validated, authorized against the sender's peer UID
    and, if accepted, handed to RequestProcessor. Instantiating the class
    enters listen() and does not return normally.
    """

    # errno values from accept() treated as transient: the peer gave up
    # before we accepted, or fds/buffers/memory ran out during a burst.
    # They are logged and retried instead of escaping listen() and killing
    # the agent (which would only cause a restart loop).
    _TRANSIENT_ACCEPT_ERRNOS = frozenset({
        errno.ECONNABORTED, errno.EMFILE, errno.ENFILE,
        errno.ENOBUFS, errno.ENOMEM,
    })
    # Pause (seconds) after a transient accept() failure so a persistent
    # condition such as EMFILE does not turn the loop into a hot spin.
    _ACCEPT_RETRY_DELAY = 0.1

    def __init__(self):
        self.logger = logging.getLogger('agent')
        self.request_processor = RequestProcessor()
        self.executor = ThreadPoolExecutor(max_workers=MAX_WORKERS)
        atexit.register(self._shutdown)
        # start serving incoming connections (blocks in the accept loop)
        self.listen()

    def _shutdown(self):
        """Gracefully shutdown the thread pool executor."""
        self.executor.shutdown(wait=False)

    def listen(self) -> None:
        """
        Create the agent socket and accept connections forever.

        Every accepted connection is submitted to the thread pool, which runs
        self.handle on it. accept() failures whose errno is listed in
        _TRANSIENT_ACCEPT_ERRNOS are logged and retried after
        _ACCEPT_RETRY_DELAY seconds; any other OSError propagates.
        """
        _socket = create_socket(agent_sock)
        while True:
            try:
                connection, _address = _socket.accept()
            except OSError as e:
                if e.errno not in self._TRANSIENT_ACCEPT_ERRNOS:
                    raise
                self.logger.error(
                    '[Listener] accept() failed, retrying: %s', str(e))
                time.sleep(self._ACCEPT_RETRY_DELAY)
                continue
            try:
                self.executor.submit(self.handle, connection)
            except RuntimeError as e:
                # CPython's ThreadPoolExecutor.submit() enqueues the work
                # item BEFORE calling _adjust_thread_count(); a RuntimeError
                # from Thread.start() (e.g. the cgroup pids controller
                # rejecting a new thread because TasksMax was hit,
                # CLPRO-3118) therefore leaves the work item in the queue,
                # and existing workers will drain it. Do NOT close the
                # connection here — that would race with a worker that may
                # yet process it and turn a real burst-induced failure into
                # spurious "Bad file descriptor" errors. Just log and keep
                # accepting; killing the agent would only trigger a restart
                # loop.
                self.logger.error(
                    '[ThreadPool] submit raised (work item queued for '
                    'existing workers): %s', str(e))
                continue
            self.logger.debug('[ThreadPool] Submitted task')

    # Fields that the PHP extension sends (see dump.c: ssa_agent_dump)
    _REQUIRED_FIELDS = frozenset({
        'timestamp', 'url', 'duration',
        'hitting_limits', 'throttled_time', 'io_throttled_time', 'wordpress'
    })

    # 8190 matches Apache's default LimitRequestLine, which is the effective
    # upper bound on URLs reaching the PHP extension in practice.
    _MAX_URL_LENGTH = 8190
    # NOTE(review): '\??' also admits a literal '?' right after the scheme
    # (e.g. 'http?://host/'). urlparse() does not accept 'http?' as a
    # scheme, so for such URLs _authorize_sender gets no hostname and looks
    # up the owner of '' (presumably failing open). Confirm this form is
    # really emitted by dump.c and how url_split() attributes it.
    _URL_RE = re.compile(r'^https?\??://[^\x00-\x1f\s<>"{}|\\^`\[\]]+\Z')

    @staticmethod
    def _get_peer_uid(connection: 'socket object') -> int:
        """
        Get the UID of the peer process using SO_PEERCRED.

        The option value is struct ucred {pid_t pid; uid_t uid; gid_t gid;}.
        uid_t and gid_t are unsigned, so they are unpacked with 'I': a signed
        format would turn a UID >= 2**31 into a negative number that never
        equals the pw_uid reported by the pwd module.

        :param connection: connected AF_UNIX socket object
        :return: UID of the connecting process
        :raises OSError: if getsockopt() fails
        :raises struct.error: if the returned buffer has an unexpected size
        """
        ucred_format = 'iII'
        cred = connection.getsockopt(
            socket_module.SOL_SOCKET,
            socket_module.SO_PEERCRED,
            struct.calcsize(ucred_format)
        )
        _pid, uid, _gid = struct.unpack(ucred_format, cred)
        return uid

    @classmethod
    def _validate_input(cls, data: dict) -> bool:
        """
        Validate that input data contains exactly the expected metric fields
        with the correct value types. The PHP extension always sends all 7
        fields (see dump.c), so we require an exact key match and enforce
        the types produced by the C formatter to reject both malformed and
        spoofed payloads.

        :param data: decoded JSON value (may be any JSON type)
        :return: True only for a dict passing every check below
        """
        if not isinstance(data, dict) or not data:
            return False
        if set(data.keys()) != cls._REQUIRED_FIELDS:
            return False
        timestamp = data['timestamp']
        if not isinstance(timestamp, str) or not timestamp.isascii() \
                or not timestamp.isdigit():
            return False
        if not isinstance(data['url'], str) or not data['url']:
            return False
        if len(data['url']) > cls._MAX_URL_LENGTH:
            return False
        if not cls._URL_RE.match(data['url']):
            return False
        # bool is a subclass of int, so it is excluded explicitly.
        if isinstance(data['duration'], bool) or \
                not isinstance(data['duration'], int) or data['duration'] < 0:
            return False
        if not isinstance(data['hitting_limits'], bool):
            return False
        if isinstance(data['throttled_time'], bool) or \
                not isinstance(data['throttled_time'], int) or data['throttled_time'] < 0:
            return False
        if isinstance(data['io_throttled_time'], bool) or \
                not isinstance(data['io_throttled_time'], int) or data['io_throttled_time'] < 0:
            return False
        if not isinstance(data['wordpress'], bool):
            return False
        return True

    def _authorize_sender(self, peer_uid: int, url: str) -> bool:
        """
        Authenticate the sender against the domain it reports for.

        The socket is world-writable by design (PHP workers run under
        arbitrary tenant UIDs), so the peer UID is the only sender identity
        available. The sender is classified by panel hosting-account
        membership, NOT by a numeric UID threshold: a UID_MIN cutoff is wrong
        panel/OS-dependently — DirectAdmin's `webapps` (uid 1001) and `admin`
        (uid 1000) sit at/above UID_MIN yet own no per-tenant identity, and
        cPanel/LiteSpeed on EL8/EL9 runs DSO/non-suEXEC PHP as `nobody`
        (uid 65534, not 99), also ≥ UID_MIN. A threshold would false-reject
        all of their legitimate metrics.

        Instead the peer's UID is resolved to a username via the panel
        (get_main_username_by_uid) and checked against the panel's
        authoritative tenant set (cpusers(), via _panel_tenant_users).
        A peer that is NOT a hosting account — root, apache, nobody=65534,
        webapps, the LiteSpeed master, … — has no per-tenant identity to
        enforce and is trusted for any domain. A peer that IS a hosting
        account is rejected only on a POSITIVE cross-tenant mismatch: the
        reported domain's owner resolves to a system UID different from the
        peer UID. When ownership cannot be established (domain_owner raises,
        returns nothing, or names a user absent from the passwd database —
        panel DB/file unavailable, domain mid-provisioning, parked/alias
        domains) we fail OPEN (accept + warn); we likewise fail OPEN when
        cpusers() itself fails (the tenant set degrades to empty, so every
        peer is treated as a non-tenant). This fail-open-on-unresolvable is a
        deliberate product choice: dropping such payloads buys no security
        over the confirmed spoof (which needs a resolvable owner that differs)
        while silently losing legitimate metrics.

        Ownership is compared by UID, NOT by username string (CLPRO-3231).
        The peer identity is the kernel-trusted SO_PEERCRED UID; with per-user
        PHP (lsphp / suEXEC / PHP-FPM-per-user) that UID is the domain owner's
        UID by construction, so mapping the reported domain's owner to a UID
        (pwd.getpwnam) and comparing UIDs can never false-reject a site's own
        traffic. Comparing two independently-resolved name strings —
        getpwuid(peer_uid).pw_name versus the owner recorded in the panel's
        domain DB — is NOT safe: the live passwd DB and the panel DB can spell
        the same account differently (case, account rename, duplicate/alias
        passwd entry, reseller naming), which false-rejected entire legitimate
        sites and flooded Sentry once 0.4-28 shipped the string comparison.

        Inherent limit: on pure-DSO / shared-`nobody` configurations every PHP
        worker runs under the same non-tenant UID, so there is no per-tenant
        kernel identity to bind a payload to and authorization degrades to
        trust. That is acceptable — such configs provide no tenant isolation
        in the first place, and the threat model here is per-user PHP.

        Lookups (cpusers() and domain_owner) are performed FRESH on every
        payload — there is no cache. The previous owner/tenant-set memo was
        removed: it guarded a security decision and produced repeated
        staleness/race findings. Fresh lookups are obviously correct and
        acceptable here because the panel layer (clcommon.cpapi) memoizes
        internally (the DirectAdmin backend caches its domain DB), and this
        runs on a background daemon (50-worker pool), not the PHP request hot
        path.

        The URL host is normalized to the panel's canonical form before the
        lookup: urlparse().hostname strips any userinfo (user:pass@) prefix
        and :port suffix and lowercases the host; trailing root-label dots
        are removed (the panel keys ownership on the bare lowercase
        hostname, and the cPanel and DirectAdmin backends match it
        case-sensitively). Without this, an uppercase or trailing-dot
        variant of a victim domain passes _URL_RE but misses the owner
        lookup and fails open, re-opening cross-tenant spoofing
        (!45 note 557376).

        After that, every 'www.' substring is removed — NOT just a leading
        label — because RequestProcessor attributes the stored metric to
        url_split()'s netloc, which does exactly netloc.replace('www.', '').
        Authorization MUST canonicalize to the same domain string the
        processor will store: with a weaker strip, a crafted host like
        www.www.victim.com authorizes as the unresolvable www.victim.com
        (fail-open) yet is stored as victim.com (bugbot 95cac397). Any
        netloc whose stored form equals a victim domain is an all-lowercase,
        port-free 'www.'-insertion variant, and this same replace-all maps
        every such variant back to the victim domain, forcing the owner
        check. (url_split's substring replace can also rewrite domains that
        merely contain 'www.' mid-name; that attribution quirk is
        pre-existing and shared deliberately — parity is the security
        property.)
        """
        peer_user = self._peer_panel_user(peer_uid)
        if peer_user is None or peer_user not in self._panel_tenant_users():
            return True
        host = (urlparse(url).hostname or '').rstrip('.')
        domain = host.replace('www.', '')
        owner = self._resolve_domain_owner(domain, peer_user)
        if not owner:
            return True
        try:
            owner_uid = pwd.getpwnam(owner).pw_uid
        except KeyError:
            self.logger.warning(
                '[%s] Owner %r of %r not in passwd (peer UID=%d), accepting',
                current_thread().name, owner, domain, peer_uid)
            return True
        return owner_uid == peer_uid

    @staticmethod
    def _peer_panel_user(peer_uid: int):
        """
        Resolve the peer UID to its panel main username, or None when it
        cannot be mapped (no such panel user). An unmappable UID is not a
        tenant and is trusted (fail open).
        """
        try:
            return get_main_username_by_uid(peer_uid)
        except Exception:
            return None

    def _panel_tenant_users(self) -> frozenset:
        """Fresh frozenset(cpusers()). On failure -> empty frozenset, so every
        peer is treated as a non-tenant and trusted (fail open), consistent with
        the unresolvable-owner / nopanel behaviour."""
        try:
            return frozenset(cpusers())
        except Exception as e:
            self.logger.warning(
                '[%s] Could not resolve panel hosting accounts, treating every '
                'peer as non-tenant (fail open): %s',
                current_thread().name, str(e))
            return frozenset()

    def _resolve_domain_owner(self, domain: str, peer_user: str):
        """
        Resolve the owner username of a bare hostname via the panel API.
        Returns the owner username, or None for every case where ownership
        cannot be established (the fail-open cases documented in
        _authorize_sender).
        """
        try:
            owner = domain_owner(domain)
        except Exception as e:
            self.logger.warning(
                '[%s] Could not resolve owner of %r for user=%r, accepting: %s',
                current_thread().name, domain, peer_user, str(e))
            return None
        if not owner:
            self.logger.warning(
                '[%s] No owner for %r (user=%r), accepting',
                current_thread().name, domain, peer_user)
            return None
        return owner

    def handle(self, connection: 'socket object') -> None:
        """
        Handle one accepted connection (runs on a thread-pool worker).

        Reads the JSON payload, validates it, authorizes the sender by peer
        UID and passes accepted payloads to the request processor. Failures
        are logged rather than raised, and the connection is always closed
        before returning.

        :param connection: socket object usable to
        send and receive data on the connection
        """
        try:
            peer_uid = self._get_peer_uid(connection)
        except (OSError, struct.error) as e:
            self.logger.error(
                '[%s] Failed to get peer credentials: %s',
                current_thread().name, str(e))
            connection.close()
            return

        fileobj = None
        try:
            # Inside the try so the finally below closes the connection even
            # if configuring it fails.
            connection.settimeout(SOCKET_READ_TIMEOUT)
            # Binary mode: read_input() then caps the raw bytes consumed at
            # MAX_MSG_SIZE. A text-mode wrapper counts decoded characters,
            # and with errors='ignore' undecodable bytes count as nothing, so
            # it could keep reading far past the limit.
            fileobj = connection.makefile('rb')
            input_data = self.read_input(fileobj)
            if not self._validate_input(input_data):
                self.logger.warning(
                    '[%s] Rejected invalid payload from UID=%d: keys=%s',
                    current_thread().name, peer_uid,
                    sorted(input_data.keys()) if isinstance(input_data, dict) else type(input_data).__name__)
                return
            if not self._authorize_sender(peer_uid, input_data['url']):
                self.logger.warning(
                    '[%s] Rejected cross-tenant payload from UID=%d for url=%r',
                    current_thread().name, peer_uid, input_data['url'])
                return
            self.request_processor.handle(input_data)
            self.logger.info('[%s] Accepted payload from UID %d',
                             current_thread().name, peer_uid)
        except socket_module.timeout as e:
            self.logger.warning('[%s] Connection timed out (peer UID=%d): %s',
                                current_thread().name, peer_uid, str(e))
        except (SSAError, json.JSONDecodeError, ValueError) as e:
            self.logger.error('Handled exception in [%s]: %s',
                              current_thread().name, str(e))
        except Exception as e:
            self.logger.exception('Unexpected exception in [%s]: %s',
                                  current_thread().name, str(e))
        finally:
            if fileobj is not None:
                fileobj.close()
            connection.close()

    def read_input(self, fileio: 'file object', max_size=None) -> dict:
        """
        Read input data and return decoded json.

        For a binary file object (what handle() passes) at most max_size raw
        bytes are read and decoded as UTF-8, dropping undecodable bytes.
        Text file objects are still accepted; for them max_size counts
        characters.

        :param fileio: a file-like object providing read method
        :param max_size: read limit; None means MAX_MSG_SIZE
        :return: the decoded JSON value, or {} when nothing was read. Any
                 JSON type is returned as-is; callers validate it.
        :raises json.JSONDecodeError: if the data read is not valid JSON
        """
        limit = MAX_MSG_SIZE if max_size is None else max_size
        raw = fileio.read(limit)
        if isinstance(raw, (bytes, bytearray)):
            nbytes = len(raw)
            data = bytes(raw).decode('utf-8', errors='ignore')
        else:
            data = raw
            nbytes = len(data.encode())
        self.logger.info('[%s] I received %i bytes',
                         current_thread().name, nbytes)
        self.logger.debug('[%s] payload: %s',
                          current_thread().name, data)
        if data:
            return json.loads(data.strip())
        else:
            return {}
