https://t.me/AnonymousX5
Server : Apache
System : Linux cvar2.toservers.com 3.10.0-962.3.2.lve1.5.73.el7.x86_64 #1 SMP Wed Aug 24 21:31:23 UTC 2022 x86_64
User : njnconst ( 1116)
PHP Version : 8.4.18
Disable Function : NONE
Directory :  /proc/self/root/opt/alt-old/python37/lib/python3.7/site-packages/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Current File : //proc/self/root/opt/alt-old/python37/lib/python3.7/site-packages/cpanel.py
# -*- coding: utf-8 -*-

# Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2021 All Rights Reserved
#
# Licensed under CLOUD LINUX LICENSE AGREEMENT
# http://cloudlinux.com/docs/LICENSE.TXT

from __future__ import absolute_import

import json
import os
import pwd
import re
import shutil
import subprocess
from dataclasses import dataclass
from functools import lru_cache
from pathlib import Path
from typing import Optional, Dict, Tuple, Union, List
from pkg_resources import parse_version

from clcommon.cpapi import userdomains
from clcommon.clwpos_lib import find_wp_paths, get_wp_cache_plugin
from clcommon.lib.cledition import is_cl_solo_edition
from secureio import write_file_via_tempfile

from clwpos.constants import WP_CLI_EXTENSIONS, RedisRequiredConstants, EA_PHP_PREFIX, CAGEFSCTL
from clwpos.cl_wpos_exceptions import WposError, PhpBrokenException

from clwpos.logsetup import setup_logging
from clwpos import gettext as _
from clwpos.utils import check_domain, home_dir, clear_redis_cache_config, create_redis_cache_config, \
    daemon_communicate, PHP, wp_cli_compatibility_check, run_in_cagefs_if_needed, create_pid_file


# Module-level logger obtained from the project's logging setup helper
_logger = setup_logging(__name__)

# Directory that get_supported_ea_php() lists to find installed "ea-php*" versions
BASE_CPANEL_EA_PHP_DIR = '/opt/cpanel'


def _get_php_handler(vhost: str) -> str:
    """
    Returns "php_handler" field of the LangPHP "php_get_domain_handler"
    uapi call made for the given vhost
    """
    handler_query = {"type": "vhost", "vhost": vhost}
    return uapi("php_get_domain_handler", handler_query)["php_handler"]


def _get_doc_roots_info() -> dict:
    """
    Groups domains of the current (effective uid) user by document root.
    @return {doc_root: [domain, ...]}
    """
    current_user = pwd.getpwuid(os.geteuid()).pw_name
    domains_by_doc_root = {}
    for domain, doc_root in userdomains(current_user):
        if doc_root not in domains_by_doc_root:
            domains_by_doc_root[doc_root] = []
        domains_by_doc_root[doc_root].append(domain)
    return domains_by_doc_root


def _add_wp_path_info(user_info: dict) -> dict:
    wp_paths = {}
    for doc_root, domains in user_info.items():
        # excludes only affects subpaths of doc_root
        excludes = list(user_info)
        item = {
            "domains": domains,
            "wp_paths": list(find_wp_paths(doc_root, excludes=excludes))
        }
        wp_paths[doc_root] = item
    return wp_paths


def _wp_info(doc_root: str, wp_path: str) -> dict:
    """Convert WP path to {"path": str, "version": str}"""
    absolute_wp_path = Path(doc_root, wp_path)
    version_file = list(absolute_wp_path.glob("wp-includes/version.php"))[0]
    result = subprocess.run(["/bin/grep", "-Po", "(?<=wp_version = ')[^']+", version_file], capture_output=True)
    wp_version = result.stdout.strip().decode()
    return {
        "path": wp_path,
        "version": wp_version,
    }


def _add_wp_info(user_info: dict) -> dict:
    for doc_root, doc_root_info in user_info.items():
        wp_paths = doc_root_info.pop("wp_paths")
        doc_root_info["wps"] = [_wp_info(doc_root, wp_path) for wp_path in wp_paths]
    return user_info


def php_info():
    """
    Returns data of LangPHP "php_get_vhost_versions" uapi call
    where "version" of each item is replaced with normalized PHP version.
    Example of returned data:
    [{'vhost': 'sub.wposuser.com', 'account': 'stackoverflow',
    'phpversion_source': {'domain': 'sub.wposuser.com'},
    'version': 'ea-php80', 'account_owner': 'root', 'php_fpm': 1,
    'php_fpm_pool_parms': {'pm_process_idle_timeout': 10,
    'pm_max_requests': 20, 'pm_max_children': 5},
    'main_domain': 0, 'documentroot': '/home/stackoverflow/public_html',
    'homedir': '/home/stackoverflow'},
    ...................................................................]
    """
    vhosts = uapi("php_get_vhost_versions")
    for vhost_data in vhosts:
        reported_version = PHP(vhost_data["version"])
        vhost_data["version"] = _normalized_php_version(reported_version)
    return vhosts


def ea_php_ini_file_path(ini_name: str, php_version: str):
    """
    Returns path <php dir>/root/etc/php.d/<ini_name> for the given php version
    """
    php_dir = Path(PHP(php_version).dir())
    return php_dir / f'root/etc/php.d/{ini_name}'


def get_supported_ea_php():
    """
    Looks through /opt/cpanel and gets installed phps

    Only entries named "ea-php<digits>" with the numeric part >= 74 are returned.
    The numeric part is compared as an integer: strings like "ea-php74" are not
    valid PEP 440 versions, so they cannot be reliably passed to
    pkg_resources.parse_version.
    :return: list of directory names, e.g. ['ea-php74', 'ea-php80']
    """
    minimal_supported = 74
    version_dir_pattern = re.compile(r'ea-php(\d+)')
    supported = []
    for item in os.listdir(BASE_CPANEL_EA_PHP_DIR):
        matched = version_dir_pattern.fullmatch(item)
        if matched and int(matched.group(1)) >= minimal_supported:
            supported.append(item)
    return supported


def configure_redis_extension_for_ea():
    """
    Makes redis extension usable for every supported ea-php version:
     - installs <php>-php-redis package when redis.so is not present
     - enables extension in .ini file when it is present but not loaded
    Processing stops at the first package which failed to install.
    CageFS force update is started afterwards only if something was changed
    and CL_WPOS_WAIT_CHILD_PROCESS environment variable is set.
    """
    cl_jenkins_tests = bool(os.environ.get('CL_WPOS_WAIT_CHILD_PROCESS'))
    redis_state = {}
    for php in get_supported_ea_php():
        redis_state[php] = _redis_extension_info(PHP(php))

    pending = [
        php for php, state in redis_state.items()
        if not (state.get('is_present') and state.get('is_loaded'))
    ]
    if not pending:
        return

    changed = False
    with create_pid_file(EA_PHP_PREFIX):
        for php in pending:
            state = redis_state.get(php)
            if not state.get('is_present'):
                redis_package = f'{php}-php-redis'
                install = subprocess.run(['yum', '-y', 'install', redis_package],
                                         capture_output=True,
                                         text=True)
                if install.returncode != 0 and 'Nothing to do' not in install.stdout:
                    _logger.error('Failed to install package %s, due to reason: %s', redis_package,
                                  f'{install.stdout}\n{install.stderr}')
                    return
            elif state.get('is_loaded'):
                # present and loaded: nothing to enable
                continue
            enable_redis_extension_for_ea(php)
            changed = True

        if changed and cl_jenkins_tests:
            subprocess.run([CAGEFSCTL, '--wait-lock', '--force-update'],
                           stdout=subprocess.DEVNULL,
                           stderr=subprocess.DEVNULL)


def enable_redis_extension_for_ea(php_version):
    """
    Enables (if needed) redis extension in .ini config

    Nothing is written when 50-redis.ini already contains an active
    "extension = redis.so" line. Otherwise the first commented out
    "extension = redis.so" line is replaced with an active one, or an active
    line is appended when there is no commented out line.
    Logs an error and returns when the .ini file does not exist.
    :param php_version: php version the .ini path is built for
    """
    path = ea_php_ini_file_path('50-redis.ini', php_version)
    keyword = 'redis.so'
    if not os.path.exists(path):
        _logger.error('Redis extension config: %s is not found, ensure corresponding rpm package installed: %s',
                      str(path), f'{php_version}-php-redis')
        return
    with open(path) as f:
        extension_data = f.readlines()

    # escape the keyword: unescaped "." would match any character
    escaped_keyword = re.escape(keyword)
    uncommented_pattern = re.compile(fr'^\s*extension\s*=\s*{escaped_keyword}')
    commented_pattern = re.compile(fr'^\s*;\s*extension\s*=\s*{escaped_keyword}')
    enabled_line = f'extension = {keyword}\n'
    was_enabled = False
    lines = []

    for line in extension_data:
        if uncommented_pattern.match(line):
            # already enabled, keep the file untouched
            return
        if not was_enabled and commented_pattern.match(line):
            lines.append(enabled_line)
            was_enabled = True
        else:
            lines.append(line)
    if not was_enabled:
        # do not glue the new directive to the last line
        # when the file does not end with a newline
        if lines and not lines[-1].endswith('\n'):
            lines[-1] += '\n'
        lines.append(enabled_line)
    write_file_via_tempfile(''.join(lines), path, 0o644)


@lru_cache()
def _redis_extension_info(version: PHP) -> dict:
    """
    Checks redis extension state for the given php version (result is cached).
    @return {
        "is_present": True if redis.so is found somewhere under php dir,
        "is_loaded": True if redis.so is present and "php -m" output contains "redis"
    }
    """
    redis_libraries = list(version.dir().glob("**/redis.so"))
    is_present = bool(redis_libraries)
    php_bin_path = version.bin()
    # root runs the check directly, other users go through the cagefs-aware helper
    runner = subprocess.run if os.geteuid() == 0 else run_in_cagefs_if_needed

    is_loaded = False
    if is_present:
        modules_check = runner(f'{php_bin_path} -m | /bin/grep redis', shell=True, env={})
        is_loaded = modules_check.returncode == 0

    return {"is_present": is_present, "is_loaded": is_loaded}


def _add_php(user_info: dict) -> dict:
    """
    Adds "php" item ({"version", "fpm", "handler"}) to the doc_root entry
    of every vhost returned by php_info(). Updates user_info in place.
    """
    for vhost_data in php_info():
        php_details = {
            "version": vhost_data["version"],
            "fpm": bool(vhost_data["php_fpm"]),
            "handler": _get_php_handler(vhost_data["vhost"]),
        }
        user_info[vhost_data["documentroot"]]["php"] = php_details

    return user_info


def _add_object_cache_info(user_info: dict) -> dict:
    """
    Search for 'object-cache.php' files in 'wp-content/plugins' directory
    in order to find what plugin is being used for object caching.
    """
    for doc_root, doc_root_info in user_info.items():
        for wp in doc_root_info["wps"]:
            plugin = get_wp_cache_plugin(Path(doc_root).joinpath(wp["path"]), "object-cache")
            wp["object_cache"] = plugin

    return user_info


def get_user_info() -> dict:
    """
    Collect info about user: domains, WordPress sites and php
    of every document root.
    @return {
        '/home/user/public_html': {
            'domains': ['domain.com'],
            'wps': [
                {
                    'path': 'wp_path_1',
                    'version': '5.7.2',
                    'object_cache': 'redis-cache'
                }
            ],
            'php': {
                'version': 'ea-php74',
                'handler': 'cgi',
                'fpm': True
            }
        }
    }
    """
    user_info = _add_wp_path_info(_get_doc_roots_info())
    user_info = _add_wp_info(user_info)
    user_info = _add_php(user_info)
    return _add_object_cache_info(user_info)


def _get_php_version(abs_wp_path: str) -> Optional[PHP]:
    """
    Return PHP version of the vhost which serves the given WordPress path.

    All vhosts whose document root is a prefix of abs_wp_path are candidates;
    the greatest (documentroot, version) pair wins, so the longest matching
    document root is preferred.
    :param abs_wp_path: absolute path to wp site
    :raises WposError: if no document root matches the path
    """
    candidates = [
        (item["documentroot"], item["version"])
        for item in php_info()
        if abs_wp_path.startswith(item["documentroot"])
    ]
    if not candidates:
        # explicit error instead of a bare IndexError on an empty list
        raise WposError(message='Unable to detect php version: '
                                'no document root found for path "%(path)s"',
                        context={'path': abs_wp_path})
    candidates.sort(reverse=True)
    return candidates[0][1]


def _normalized_php_version(version: PHP) -> PHP:
    """
    PHP selector can replace path with symlink. It's a reason why we need normalization.
    On CL Solo edition the version is returned as is; otherwise it is rebuilt
    from the "Loaded Configuration File" line of "php -i" output.
    Raises PhpBrokenException when the command printed only to stderr.
    """
    if is_cl_solo_edition(skip_jwt_check=True):
        return version

    php_bin = version.bin()
    command = " | ".join((
        f"{php_bin} -i ",
        "/bin/grep 'Loaded Configuration File'",
        "/bin/grep -oE \"(alt|ea).*php[^/]*/\"",
    ))
    result = run_in_cagefs_if_needed(command, shell=True, env={})

    if not result.stdout and result.stderr:
        raise PhpBrokenException(str(php_bin), result.stderr)

    normalized = result.stdout.strip().strip("/").replace("/", "-")
    return PHP(normalized)


def filter_php_versions_with_not_loaded_redis(php_versions: List[PHP]) -> List[PHP]:
    """
    Returns those of the given php versions
    which have redis extension present but not loaded.
    """
    checked = ((version, _redis_extension_info(version)) for version in php_versions)
    return [
        version for version, redis_info in checked
        if not redis_info['is_loaded'] and redis_info['is_present']
    ]


@lru_cache(maxsize=None)
def get_cached_php_installed_versions() -> List[PHP]:
    """
    Returns php versions reported by LangPHP "php_get_installed_versions"
    uapi call (the result is cached)
    :return: installed php versions
    """
    installed = uapi("php_get_installed_versions")["versions"]
    return list(map(PHP, installed))


@lru_cache(maxsize=None)
def get_cached_php_versions_with_redis_loaded() -> set:
    """
    Returns installed php versions which have redis extension loaded
    (the result is cached)
    :return: set of php versions with loaded redis extension
    """
    loaded = set()
    for version in get_cached_php_installed_versions():
        if _redis_extension_info(version).get("is_loaded", False):
            loaded.add(version)
    return loaded


@lru_cache(maxsize=None)
def get_cached_php_versions_with_redis_present() -> set:
    """
    Returns installed php versions which have redis extension
    present but not loaded (the result is cached)
    :return: set of php versions with present but not loaded redis extension
    """
    present_only = set()
    for version in get_cached_php_installed_versions():
        redis_info = _redis_extension_info(version)
        if redis_info.get("is_present", False) and not redis_info.get("is_loaded", False):
            present_only.add(version)
    return present_only


def uapi(function: str, input_parameters: Optional[Dict[str, str]] = None):
    """
    Runs /usr/bin/uapi for the given LangPHP function with JSON output
    :param function: name of LangPHP module function
    :param input_parameters: optional parameters, passed as key=value arguments
    :return: content of ["result"]["data"] of the parsed output
    """
    uapi_command = ['/usr/bin/uapi', "--output=json", "LangPHP", function]
    for key, value in (input_parameters or {}).items():
        uapi_command.append(f"{key}={value}")
    response = run_in_cagefs_if_needed(uapi_command, env={})

    return json.loads(response.stdout)["result"]["data"]


def is_multisite(path: str) -> bool:
    """
    Evaluates is_multisite() via wp-cli for WordPress located in path
    :raises WposError: if wp-cli call failed
    """
    marker = 'cl_multisite_detected'
    output = wordpress(path, 'eval', 'if ( is_multisite() ) { echo "%s"; }' % marker)
    if not isinstance(output, WordpressError):
        return marker in output
    raise WposError(message=output.message, context=output.context)


def wp_get_constant(wp_path: str, constant: str) -> Optional[str]:
    """
    Returns output of wp-cli eval which echoes the constant if it is defined.
    None is returned (and the error is logged) when wp-cli call failed.
    """
    php_code = "if (defined('%(const)s')) { echo %(const)s; }" % {'const': constant}
    output = wordpress(wp_path, 'eval', php_code)
    if not isinstance(output, WordpressError):
        return output
    _logger.error('Error during get WP constant: %s', output)
    return None


def diagnose_redis_connection_constants(docroot: str, wordpress_path: str):
    """
    Check required constants for redis connection establishment
    :param docroot: document root of the site
    :param wordpress_path: path to WordPress, joined to docroot
    :raises WposError: if WP_REDIS_SCHEME constant is not defined or differs
        from the required value, if WP_REDIS_PATH constant is not defined,
        or if the socket it points to does not exist
    """
    abs_wp_path = os.path.join(docroot, wordpress_path)
    scheme_constant = RedisRequiredConstants.WP_REDIS_SCHEME
    redis_schema = wp_get_constant(abs_wp_path, scheme_constant.name)
    # "or", not "and": a defined constant with a wrong value must be rejected too;
    # surrounding whitespace of wp-cli output is tolerated
    if not redis_schema or redis_schema.strip() != scheme_constant.value:
        raise WposError('WordPress constant "%(constant)s" is not defined or defined with wrong value %(value)s',
                        context={'constant': scheme_constant.name, 'value': redis_schema})
    socket = wp_get_constant(abs_wp_path, RedisRequiredConstants.WP_REDIS_PATH.name)
    if not socket:
        raise WposError('WordPress constant "%(constant)s" is not defined',
                        context={'constant': RedisRequiredConstants.WP_REDIS_PATH.name})
    if not os.path.exists(socket):
        raise WposError('Redis socket %(socket)s does not exist in the system',
                        context={'socket': socket})


@dataclass
class WordpressError:
    """Description of a failed wp-cli call; wordpress() returns it instead of raising."""
    # text of the error, may contain %(name)s placeholders
    message: str
    # values for the placeholders of the message
    context: dict


def wordpress(path: str, command: str, subcommand: str, *args) -> Union[str, WordpressError]:
    """
    Helper to execute wp commands, for example
        wp --path=<path> plugin install redis-cache
        wp --path=<path> plugin activate redis-cache
        wp --path=<path> redis enable
        wp --path=<path> plugin deactivate redis-cache
        wp --path=<path> plugin uninstall redis-cache
    :param path: path to wp site, passed to wp-cli as --path
    :param command: wp-cli command
    :param subcommand: wp-cli subcommand (or first argument of the command)
    :param args: additional wp-cli arguments
    @return: stdout of the command on success;
             WordpressError if php binary does not exist
             or the command failed with CalledProcessError.
    """
    php_version = _get_php_version(path)
    php_bin_path = str(php_version.bin())
    if not os.path.exists(php_bin_path):
        # no exception is being handled here, so error() is used:
        # exception() would attach a meaningless "NoneType: None" traceback
        _logger.error("Error during wp-cli command execution \"%s\": "
                      "invalid path to binary file \"%s\"",
                      command, php_bin_path)
        return WordpressError(
            message=_("Error during resolving path to php binary file:\n"
                      "got non-existent path \"%(path)s\"."),
            context={"path": php_bin_path}
        )
    # [attention] compatibility check may raise WpCliUnsupportedException exception
    wp_cli_compatibility_check(php_bin_path)
    command_part = ["--path={}".format(path), command, subcommand, *args]
    full_command = [php_bin_path, *WP_CLI_EXTENSIONS, "/opt/clwpos/wp-cli", *command_part]
    try:
        output = run_in_cagefs_if_needed(full_command, check=True, env={})
    except subprocess.CalledProcessError as error:
        # quoted command line, used both in the log and in the returned error
        command_line = ' '.join(['"%s"' % token for token in full_command])
        _logger.exception("Error during command execution: \n%s\n"
                          "stdout=%s\n"
                          "stderr=%s",
                          command_line, error.stdout, error.stderr)
        return WordpressError(
            message=_("Unexpected error happened during command execution: '%(command)s'.\n"
                      "Event is logged to file with stdout and stderr recorded."),
            context={
                "command": command_line
            }
        )

    return output.stdout


class DocRootPath(str):
    """String subclass which marks a value as a path to doc_root."""


class DomainName(str):
    """String subclass which marks a value as a domain name."""


def disable_without_config_affecting(
        arg: Union[DocRootPath, DomainName], wp_path: str, *, module: str = "object_cache",
) -> Optional[WordpressError]:
    """
    Deactivate and delete plugin redis-cache.
    All steps (clearing redis cache config, plugin deactivation, plugin deletion)
    are always attempted; the error of the last failed step is returned.
    :param arg: user's docroot or domain
    :param wp_path: path to user's wordpress directory
    :param module: module on which to perform disable operations,
        only "object_cache" is supported
    :return: error if error was happened else None
    :raises ValueError: if arg is neither DomainName nor DocRootPath
    """
    if isinstance(arg, DomainName):
        doc_root = check_domain(arg)[-1]
    elif isinstance(arg, DocRootPath):
        doc_root = Path(home_dir(), arg)
    else:
        raise ValueError("Invalid argument format")

    if module != "object_cache":
        return WordpressError(
            message=_("Unsupported module: %(module)s"),
            context=dict(module=module)
        )

    # same normalization as in enable_without_config_affecting:
    # a leading "/" would make joinpath() discard doc_root
    wp_path = wp_path.lstrip("/")
    abs_wp_path = str(Path(doc_root).joinpath(wp_path).absolute())
    last_error = None

    try:
        clear_redis_cache_config(abs_wp_path)
    except WposError as err:
        _logger.exception(err)
        last_error = WordpressError(err.message, err.context)
    except Exception as e:
        # best effort: plugin must be removed even if config cleanup failed
        _logger.exception(e)
        last_error = WordpressError(
            message=_('Unexpected error happened while clearing cache: %(error)s'),
            context=dict(error=str(e)))

    result = wordpress(abs_wp_path, "plugin", "deactivate", "redis-cache")
    if isinstance(result, WordpressError):
        last_error = result

    result = wordpress(abs_wp_path, "plugin", "delete", "redis-cache")
    if isinstance(result, WordpressError):
        last_error = result

    return last_error


def enable_without_config_affecting(
        arg: Union[DocRootPath, DomainName], wp_path: str, *, module: str = "object_cache",
) -> Tuple[bool, Dict[str, str]]:
    """
    Install and activate plugin redis-cache.
    If activation fails, changes are rolled back via rollback_object_cache.
    :param arg: user's docroot or domain
    :param wp_path: path to user's wordpress directory
    :param module: module on which to perform enable operations
        (the value does not affect the performed steps)
    :return: tuple that consists of enabling status and details
    """
    if not isinstance(arg, (DomainName, DocRootPath)):
        raise ValueError("Invalid argument format")
    if isinstance(arg, DomainName):
        __, doc_root = check_domain(arg)
    else:
        doc_root = Path(home_dir(), arg)

    site_path = str(Path(doc_root).joinpath(wp_path.lstrip("/")).absolute())

    # step 1: plugin installation
    try:
        install_redis_cache(site_path)
    except WposError as install_error:
        failure = {
            "message": _("WordPress plugin installation failed. "
                         "Try again and contact your system administrator if issue persists."),
            "details": install_error.message,
            "context": install_error.context,
        }
        return False, failure

    # step 2: plugin activation and enabling of object caching
    try:
        enable_redis_object_cache(site_path)
    except WposError as activation_error:
        rollback_object_cache(site_path)
        failure = {
            "message": _("WordPress plugin activation failed. Changes were reverted and caching module is now disabled. "
                         "Try again and contact your system administrator if issue persists."),
            "details": activation_error.message,
            "context": activation_error.context,
        }
        return False, failure

    return True, {}


def reload_redis(uid: int = None):
    """
    Sends "reload" command to CLWPOS daemon
    :param uid: User uid (optional), added to the command only if it is truthy
    """
    request = {"command": "reload", "uid": uid} if uid else {"command": "reload"}
    daemon_communicate(request)


def enable_redis_object_cache(abs_wp_path: str):
    """
    Activate redis-cache plugin, enable redis object cache
    and create redis cache config for the site.
    :param abs_wp_path: absolute path to wp site
    :raises WposError: if any of wp-cli commands failed
    :return:
    """
    wp_cli_steps = (
        ("plugin", "activate", "redis-cache"),
        ("redis", "enable"),
    )
    for step in wp_cli_steps:
        outcome = wordpress(abs_wp_path, *step)
        if isinstance(outcome, WordpressError):
            raise WposError(message=outcome.message, context=outcome.context)
    create_redis_cache_config(abs_wp_path)


def install_redis_cache(abs_wp_path: str):
    """
    Install redis-cache plugin into the site via wp-cli.
    :param abs_wp_path: absolute path to wp site
    :raises WposError: if wp-cli command failed
    :return:
    """
    outcome = wordpress(abs_wp_path, "plugin", "install", "redis-cache")
    if not isinstance(outcome, WordpressError):
        return
    raise WposError(message=outcome.message, context=outcome.context)


def rollback_object_cache(abs_wp_path: str):
    """
    Clear redis cache config of the site, then deactivate
    and delete redis-cache plugin. Results of wp-cli calls are ignored.
    :param abs_wp_path: absolute path to wp site
    :return:
    """
    clear_redis_cache_config(abs_wp_path)
    for plugin_action in ("deactivate", "delete"):
        wordpress(abs_wp_path, "plugin", plugin_action, "redis-cache")


def requirements_check(compatibilities: dict, wp_path: str, domain: str, module: str):
    """
    Looks up the WordPress site (domain + wp_path) in compatibilities["docroots"]
    and checks that the module has no issues of type "incompatibility".
    :return: (True, {}) if module can be enabled,
             (False, details) if the site is not found or has incompatibilities
    """
    wp_req = None
    for docroot_info in compatibilities.get("docroots", []):
        if domain in docroot_info["domains"]:
            # first site with the requested path within this docroot
            found = next((wp for wp in docroot_info["wps"] if wp["path"] == wp_path), None)
            if found is not None:
                wp_req = found

    if wp_req is None:
        return False, {'result': _('Invalid path for domain "%(domain)s" with path "%(wp_path)s"'),
                       'context': {'domain': domain, 'wp_path': wp_path}}

    module_data = wp_req.get("modules", {}).get(module, {})
    blocking_issues = [
        issue for issue in module_data.get('issues', [])
        if issue['type'] == 'incompatibility'
    ]
    if blocking_issues:
        return False, {'module': module_data,
                       'result': _('Website "%(domain)s/ %(wp_path)s" has compatibility '
                                   'issues and optimization module cannot be enabled. '
                                   'Update page to view them all.'),
                       'context': {'domain': domain, 'wp_path': wp_path}}
    return True, {}

https://t.me/AnonymousX5 - 2025