|
Server : Apache System : Linux cvar2.toservers.com 3.10.0-962.3.2.lve1.5.73.el7.x86_64 #1 SMP Wed Aug 24 21:31:23 UTC 2022 x86_64 User : njnconst ( 1116) PHP Version : 8.4.18 Disable Function : NONE Directory : /opt/alt/python37/lib/python3.7/site-packages/clwpos/ |
Upload File : |
# -*- coding: utf-8 -*-
# Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2020 All Rights Reserved
#
# Licensed under CLOUD LINUX LICENSE AGREEMENT
# http://cloudlinux.com/docs/LICENSE.TXT
# TODO: convert this file into python package
# and move logic of modules manipulations here
import json
import os
import logging
from dataclasses import dataclass, field, asdict
from pathlib import Path
from typing import Optional, Dict, Tuple
import argparse
from clcommon.clwpos_lib import get_wp_cache_plugin
import cpanel
from clwpos.cl_wpos_exceptions import WposError, WpCliCommandError
from clwpos.utils import uid_by_name, is_conflict_modules_installed, PHP
from clcommon.lib.cledition import is_cl_solo_edition
from clcommon.cpapi import cpusers
from clwpos.constants import CLWPOS_VAR_DIR, ALLOWED_MODULES_JSON, CLWPOS_UIDS_PATH, MINIMUM_SUPPORTED_PHP_OBJECT_CACHE, \
CL_DOC_USER_PLUGIN
from clwpos import gettext as _, constants
from cpanel import wordpress, WordpressError
ALLOWED_MODULES_CONFIG_VERSION = 1
@dataclass
class Issue:
"""
Generic class for keeping compatibility/misconfiguration issues
"""
header: str
description: str
fix_tip: str
context: Dict[str, str] = field(default_factory=dict)
@property
def dict_repr(self):
return asdict(self)
class UniqueId:
PHP_NOT_SUPPORTED = 'PHP_NOT_SUPPORTED'
PLUGIN_CONFLICT = 'PLUGIN_CONFLICT'
WORDPRESS_MULTISITE_ENABLED = 'WORDPRESS_MULTISITE_ENABLED'
MISCONFIGURED_WORDPRESS = 'MISCONFIGURED_WORDPRESS'
WEBSERVER_NOT_SUPPORTED = 'WEBSERVER_NOT_SUPPORTED'
PHP_MISCONFIGURATION = 'PHP_MISCONFIGURATION'
UNCOMPATIBLE_WORDPRESS_VERSION = 'UNCOMPATIBLE_WORDPRESS_VERSION'
CLOUDLINUX_MODULE_ALREADY_ENABLED = 'CLOUDLINUX_MODULE_ALREADY_ENABLED'
@dataclass
class CompatibilityIssue(Issue):
"""
For compatibility issues
"""
unique_id: str = None
telemetry: Dict[str, str] = field(default_factory=dict)
type: str = 'incompatibility'
@property
def dict_repr(self):
representation = asdict(self)
representation.pop('unique_id')
representation.pop('telemetry')
return representation
@dataclass
class MisconfigurationIssue(Issue):
"""
For misconfiguration issues
"""
type: str = 'misconfiguration'
class Module(str):
"""
Helper class which hides differences of optimization modules behind abstract methods.
"""
def __new__(cls, *args, **kwargs):
if cls != Module:
return str.__new__(cls, *args)
classes = {
"object_cache": _ObjectCache,
"site_optimization": _SiteOptimization
}
try:
return classes[args[0]](*args)
except KeyError:
raise argparse.ArgumentTypeError(f"No such module: {args[0]}.")
@classmethod
def redis_daemon_required(cls):
raise NotImplementedError
@classmethod
def collect_docroot_issues(cls, wpos_user_obj, doc_root_info, allowed_modules=None):
raise NotImplementedError
@classmethod
def is_php_supported(cls, php_version: PHP):
raise NotImplementedError
@classmethod
def minimum_supported_wp_version(cls):
raise NotImplementedError
@staticmethod
def collect_wordpress_issues(self, wordpress: Dict, docroot: str, module_is_enabled: bool):
raise NotImplementedError
class _ObjectCache(Module):
"""Implementation for object cache module"""
NAME = 'OBJECT_CACHE'
@classmethod
def redis_daemon_required(cls):
return True
@classmethod
def collect_docroot_issues(cls, wpos_user_obj, doc_root_info, allowed_modules=None):
"""
Collects incompatibilities related to docroot (non-supported handler, etc)
for object cache module.
"""
issues = []
php_version = doc_root_info['php_version']
is_modules_allowed = None
supported_php_versions = wpos_user_obj.supported_php_versions[OBJECT_CACHE_MODULE]
header__, fix_tip__, description__, uniq_id__, telemetry__ = None, None, None, None, None
if allowed_modules is not None:
is_modules_allowed = 'object_cache' in allowed_modules
if not cls.is_php_supported(php_version):
header__ = _('PHP version is not supported')
fix_tip__ = _('Please, set or ask your system administrator to set one of the '
'supported PHP versions: %(compatible_versions)s')
description__ = _('Non supported PHP version %(php_version)s currently is used.')
uniq_id__ = UniqueId.PHP_NOT_SUPPORTED
telemetry__ = dict(
reason='PHP_VERSION_TOO_LOW',
php_version=php_version,
supported_php_versions=supported_php_versions
)
elif php_version not in cpanel.get_cached_php_versions_with_redis_present():
header = _('Redis extension is not installed for selected php version')
fix_tip = _('Please, install or ask your system administrator to install redis extension '
'for current %(php_version)s version, or use one of the compatible php versions: '
'%(compatible_versions)s for the domain.')
description = _('Redis PHP extension is required for optimization module, but not installed for '
'selected PHP version: %(php_version)s.')
if not is_modules_allowed:
issues.append(MisconfigurationIssue(
header=header,
fix_tip=fix_tip,
description=description,
context=dict(php_version=php_version, compatible_versions=supported_php_versions)))
else:
header__ = header
fix_tip__ = fix_tip
description__ = description
uniq_id__ = UniqueId.PHP_NOT_SUPPORTED
telemetry__ = dict(
php_version=php_version,
reason='PHP_REDIS_NOT_INSTALLED',
supported_php_versions=supported_php_versions
)
elif php_version not in cpanel.get_cached_php_versions_with_redis_loaded():
header = _('Redis extension is not loaded for selected php version')
fix_tip = _('Please, load or ask your system administrator to load redis extension '
'for current %(php_version)s version, or use one of the compatible php versions: '
'%(compatible_versions)s for the domain.')
description = _('Redis PHP extension is required for optimization module, but not loaded for '
'selected PHP version: %(php_version)s.')
if not is_modules_allowed:
issues.append(MisconfigurationIssue(
header=header,
fix_tip=fix_tip,
description=description,
context=dict(php_version=php_version, compatible_versions=supported_php_versions)))
else:
header__ = header
fix_tip__ = fix_tip
description__ = description
uniq_id__ = UniqueId.PHP_NOT_SUPPORTED
telemetry__ = dict(
php_version=php_version,
reason='PHP_REDIS_NOT_LOADED',
supported_php_versions=supported_php_versions
)
if not supported_php_versions:
fix_tip__ = _('Please, ask your system administrator to setup at least '
'one of the recommended PHP version in accordance with docs (%(docs_url)s).')
if header__ is not None:
issues.append(
CompatibilityIssue(
header=header__,
description=description__,
fix_tip=fix_tip__,
context=dict(php_version=php_version,
compatible_versions=', '.join(supported_php_versions),
docs_url=constants.CL_DOC_USER_PLUGIN),
unique_id=uniq_id__,
telemetry=telemetry__
)
)
if doc_root_info["php_handler"] not in wpos_user_obj.supported_handlers:
issues.append(
CompatibilityIssue(
header=_('Unsupported PHP handler'),
description=_('Website uses unsupported PHP handler. Currently supported '
'handler(s): %(supported_handlers)s.'),
fix_tip=_('Please, set or ask your system administrator to set one of the '
'supported PHP handlers for the domain: %(supported_handlers)s. '
'Or keep watching our blog: %(blog_url)s for supported handlers list updates.'),
context={
'supported_handlers': ", ".join(wpos_user_obj.supported_handlers),
'blog_url': 'https://blog.cloudlinux.com/'
},
unique_id=UniqueId.PHP_NOT_SUPPORTED,
telemetry=dict(
reason='PHP_UNSUPPORTED_HANDLER',
handler=doc_root_info["php_handler"],
supported_handlers=wpos_user_obj.supported_handlers,
php_version=php_version
)
)
)
incompatible_php_modules = {}
incompatible_module = 'snuffleupagus'
if incompatible_php_modules.get(php_version) == incompatible_module or \
is_conflict_modules_installed(php_version, incompatible_module):
incompatible_php_modules[php_version] = incompatible_module
issues.append(
CompatibilityIssue(
header=_('Unsupported PHP module is loaded'),
description=_('Incompatible PHP module "%(incompatible_module)s" is currently used.'),
fix_tip=_('Please, disable or remove "%(incompatible_module)s" PHP extension.'),
context=dict(incompatible_module=incompatible_module),
unique_id=UniqueId.PHP_NOT_SUPPORTED,
telemetry=dict(
handler=doc_root_info["php_handler"],
supported_handlers=wpos_user_obj.supported_handlers,
php_version=php_version
)
))
return issues
@classmethod
def is_php_supported(cls, php_version: PHP):
"""
Check if passed php version >= minimum PHP version
supported by object cache module.
"""
return php_version.digits >= MINIMUM_SUPPORTED_PHP_OBJECT_CACHE
@classmethod
def minimum_supported_wp_version(cls):
return constants.MINIMUM_SUPPORTED_WP_OBJECT_CACHE
@classmethod
def collect_wordpress_issues(cls, self, wordpress: Dict, docroot: str, module_is_enabled: bool):
issues = []
wp_dir = Path(docroot).joinpath(wordpress["path"])
wp_content_dir = wp_dir.joinpath("wp-content")
plugin_type = "object-cache"
detected_object_cache_plugin = get_wp_cache_plugin(wp_dir, plugin_type)
if module_is_enabled:
if detected_object_cache_plugin != "redis-cache":
issue = self._get_wp_plugin_compatibility_issues(docroot, wordpress)
if issue:
issues.append(issue)
if not self.is_redis_running:
issues.append(
MisconfigurationIssue(
header=_('Redis is not running'),
description=_('Object cache module is enabled, but redis process is not running.'),
fix_tip=_('Redis will start automatically in 5 minutes. '
'If the issue persists - contact your system administrator and report this issue')
)
)
try:
cpanel.diagnose_redis_connection_constants(docroot, wordpress['path'])
except WpCliCommandError as e:
issues.append(
MisconfigurationIssue(
header=_('Unable to identify redis constants in wordpress config'),
description=_('wp-cli utility returns malformed response, reason: "%(reason)s"'),
fix_tip=_('Please, try to check executed command and fix possible issues with it. '
'If issue persists - please, contact CloudLinux support.'),
context=dict(
reason=e.message % e.context
)
)
)
except WposError as e:
issues.append(
MisconfigurationIssue(
header=_('Missed redis constants in site config'),
description=_('WordPress config does not have needed constants '
'for redis connection establishment.\n'
'Details: %(reason)s'),
fix_tip=_('Please, try to disable and enable plugin again. '
'If issue persists - please, contact CloudLinux support.'),
context=dict(
reason=e.message % e.context
)
)
)
if detected_object_cache_plugin == "Unknown":
drop_in_file = wp_content_dir.joinpath(f'{plugin_type}.php')
issues.append(
CompatibilityIssue(
header=_('Conflicting object cache plugin enabled'),
description=_('Unknown custom object cache plugin is already enabled'),
fix_tip=_(f'Remove the drop-in ({drop_in_file}) file from the WordPress '
f'instance because it conflicts with AccelerateWP object caching.'),
unique_id=UniqueId.PLUGIN_CONFLICT,
telemetry=dict(
reason='OBJECT_CACHE_ALREADY_ENABLED',
plugin=detected_object_cache_plugin
)
))
elif detected_object_cache_plugin == "w3-total-cache":
issues.append(
CompatibilityIssue(
header=_('Object Cache module of W3 Total Cache plugin is incompatible'),
description=_('WordPress website already has Object Cache feature enabled '
'with caching backend configured by the the W3 Total Cache plugin.'),
fix_tip=_('Deactivate Object Cache in W3 Total Cache plugin settings.'),
context=dict(),
unique_id=UniqueId.PLUGIN_CONFLICT,
telemetry=dict(
reason='OBJECT_CACHE_ALREADY_ENABLED',
plugin=detected_object_cache_plugin
)
))
elif detected_object_cache_plugin not in (None, "redis-cache"):
issues.append(
CompatibilityIssue(
header=_('Conflicting object cache plugin enabled'),
description=_('The "%(detected_wp_plugin)s" plugin conflicts with AccelerateWP object caching.'),
fix_tip=_('Deactivate object caching in the plugin settings or completely uninstall'
'the conflicting plugin using the WordPress administration interface.'),
context=dict(detected_wp_plugin=detected_object_cache_plugin),
unique_id=UniqueId.PLUGIN_CONFLICT,
telemetry=dict(
reason='OBJECT_CACHE_ALREADY_ENABLED',
plugin=detected_object_cache_plugin
)
))
try:
if not self.check_installed_roc_plugin(os.path.join(docroot, wordpress['path'])):
issues.append(
CompatibilityIssue(
header=_('Another Redis Object Cache plugin is installed'),
description=_('Non CloudLinux Redis Object Cache is installed for the website'),
fix_tip=_('Uninstall Redis Object Cache plugin using WordPress administration page'),
unique_id=UniqueId.PLUGIN_CONFLICT,
telemetry=dict(
reason='OBJECT_CACHE_ALREADY_ENABLED',
plugin=detected_object_cache_plugin
)
))
except WpCliCommandError as e:
issues.append(
MisconfigurationIssue(
header=_('Unable to identify installed object cache plugin in WordPress'),
description=_('wp-cli utility returns malformed response, reason: "%(reason)s"'),
fix_tip=_('Please, try to check executed command and fix possible issues with it. '
'If issue persists - please, contact CloudLinux support.'),
context=dict(
reason=e.message % e.context
)
)
)
try:
multisite = cpanel.is_multisite(os.path.join(docroot, wordpress["path"]))
if multisite:
issues.append(
CompatibilityIssue(
header=_('WordPress Multisite mode is enabled'),
description=_('WordPress uses the Multisite mode which is currently not supported.'),
fix_tip=_('Install or configure WordPress in the single-site mode.'),
unique_id=UniqueId.WORDPRESS_MULTISITE_ENABLED,
telemetry=dict()
))
except WposError as e:
issues.append(
CompatibilityIssue(
header=_('Unexpected WordPress error'),
description=_('Unable to detect if the WordPress installation has the Multisite mode enabled '
'mode due to unexpected error. '
'\n\n'
'Technical details:\n%(error_message)s.\n'
'\nMost likely WordPress installation is not working properly.'),
fix_tip=_('If this is only one issue, please check that your website is working properly – '
'try to run the specified command to find any obvious '
'errors in the WordPress configuration. '
'Otherwise, try to fix other issues first - it may help to resolve this issue as well.'),
context=dict(
error_message=e.message % e.context
),
unique_id=UniqueId.MISCONFIGURED_WORDPRESS,
telemetry=dict(
error_message=e.message % e.context
)
))
return issues
class _SiteOptimization(Module):
"""Implementation for site optimization module"""
NAME = 'SITE_OPTIMIZATION'
@classmethod
def redis_daemon_required(cls):
return False
@classmethod
def collect_docroot_issues(cls, wpos_user_obj, doc_root_info, allowed_modules=None):
"""
Collects incompatibilities related to docroot (non-supported handler, etc)
for site optimizatin module.
"""
issues = []
php_version = doc_root_info['php_version']
if not cls.is_php_supported(php_version):
supported_php_versions = wpos_user_obj.supported_php_versions[SITE_OPTIMIZATION_MODULE]
issues.append(
CompatibilityIssue(
header=_('PHP version is not supported'),
fix_tip=_('Please, set or ask your system administrator to set one of the '
'supported PHP version: %(compatible_versions)s for the domain.'),
description=_('Non supported PHP version %(php_version)s currently is used.'),
context=dict(php_version=php_version,
compatible_versions=', '.join(supported_php_versions),
docs_url=CL_DOC_USER_PLUGIN),
unique_id=UniqueId.PHP_NOT_SUPPORTED,
telemetry=dict(reason='PHP_VERSION_TOO_LOW')
)
)
return issues
@staticmethod
def _requirements():
with open("/opt/cloudlinux-site-optimization-module/requirements.json", "r") as f:
# {
# "required_php_version": "7.0",
# "required_wp_version": "5.4",
# "incompatible_plugins": {
# "w3-total-cache": "w3-total-cache/w3-total-cache.php",
# "wp-super-cache": "wp-super-cache/wp-cache.php"
# }
# }
return json.load(f)
@classmethod
def is_php_supported(cls, php_version: PHP):
"""
Check if passed php version >= minimum PHP version
supported by site optimization module.
"""
return php_version.digits >= int(cls._requirements()["required_php_version"].replace(".", ""))
@classmethod
def minimum_supported_wp_version(cls):
return cls._requirements()["required_wp_version"]
@classmethod
def collect_wordpress_issues(cls, self, wordpress_info: Dict, docroot: str, module_is_enabled: bool):
issues = []
abs_wp_path = Path(docroot).joinpath(wordpress_info["path"])
wp_content_dir = abs_wp_path.joinpath("wp-content")
plugin_type = "advanced-cache"
detected_advanced_cache_plugin = get_wp_cache_plugin(abs_wp_path, plugin_type)
plugins_data = wordpress(str(abs_wp_path), "plugin", "list", "--status=active", "--format=json")
if isinstance(plugins_data, WordpressError):
found_plugins = set()
else:
try:
found_plugins = {item["name"] for item in json.loads(plugins_data)}
except (ValueError, TypeError, json.JSONDecodeError):
issues.append(
MisconfigurationIssue(
header=_('Unable to identify module compatibility'),
description=_('Malformed output received from the following command: <br> $/opt/clwpos/wp-cli plugin list --status=active --format=json'
'<br><br>The raw command output is:<br> \"%(wp_cli_response)s\"'),
fix_tip=_('Please, check the received command output and ensure it returns a valid JSON.'),
context=dict(
wp_cli_response=plugins_data
)
)
)
found_plugins = set()
incompatible_plugins = set(cls._requirements()["incompatible_plugins"].keys())
result = found_plugins & incompatible_plugins
if detected_advanced_cache_plugin:
result.add(detected_advanced_cache_plugin)
# if our WP Rocket module is enabled it's not conflicting plugin
if module_is_enabled:
result.discard("WP Rocket")
# for more beautiful output
if len(result) > 1:
result.discard("Unknown")
result = list(result)
if len(result) == 1 and result[0] == 'Unknown':
drop_in_file = wp_content_dir.joinpath(f'{plugin_type}.php')
issues.append(
CompatibilityIssue(
header=_("Conflicting advanced cache plugin enabled"),
description=_("Unknown advanced cache plugin is already enabled."),
fix_tip=_(f'Remove the drop-in ({drop_in_file}) file from the WordPress '
f'instance because it conflicts with AccelerateWP site optimization.'),
context=dict(plugins=", ".join(result)),
unique_id=UniqueId.PLUGIN_CONFLICT,
telemetry=dict(
reason='SOM_ALREADY_ENABLED',
plugin=list(result)
)
)
)
elif result:
issues.append(
CompatibilityIssue(
header=_("Conflicting plugins are enabled"),
description=_("Found conflicting plugins: %(plugins)s."),
fix_tip=_("Deactivate and uninstall the conflicting plugin "
"using the WordPress administration interface."),
context=dict(plugins=", ".join(result)),
unique_id=UniqueId.PLUGIN_CONFLICT,
telemetry=dict(
reason='SOM_ALREADY_ENABLED',
plugin=list(result)
)
)
)
return issues
OBJECT_CACHE_MODULE = Module("object_cache")
SITE_OPTIMIZATION_MODULE = Module("site_optimization")
ALL_OPTIMIZATION_MODULES = [
OBJECT_CACHE_MODULE,
SITE_OPTIMIZATION_MODULE
]
# on CloudLinux Solo we enable modules by default while on other
# editions we want it to be disabled by default
IS_MODULE_ALLOWED_BY_DEFAULT = bool(is_cl_solo_edition())
def get_admin_config_directory(uid: Optional[int]) -> str:
"""
Get directory path in which admin's config files are stored.
Hides logic of detecting current OS edition environment.
:param uid: uid
:return: admin's config directory path
"""
is_solo = is_cl_solo_edition()
if is_solo:
admin_config_dir = os.path.join(CLWPOS_VAR_DIR, 'solo')
else:
if uid is None:
raise WposError(
message=_('Internal error: obtaining config path without uid is only '
'available for CloudLinux OS Solo. '
'Please contact support for help: '
'https://cloudlinux.zendesk.com'))
admin_config_dir = os.path.join(CLWPOS_UIDS_PATH, str(uid))
return admin_config_dir
def get_modules_allowed_path(uid: Optional[int]) -> str:
"""
Get modules_allowed file path for user.
:param uid: uid
:return: modules_allowed file path
"""
admin_config_dir = get_admin_config_directory(uid)
modules_allowed_path = os.path.join(admin_config_dir, ALLOWED_MODULES_JSON)
return modules_allowed_path
def get_allowed_modules(uid: int) -> list:
"""
Reads configuration file (which is manipulated by admin)
and returns only that modules which are allowed
to be enabled by endusers.
:param uid: uid (used only for CL Shared, not used on solo)
@return: list of module unique ids
"""
modules_admin_config = get_admin_modules_config(uid)
return [
module
for module, is_allowed in modules_admin_config['modules'].items()
if is_allowed
]
def is_module_allowed_for_user(module: str) -> bool:
"""
Checks whether <module> enabled for at least one user
"""
if is_cl_solo_edition(skip_jwt_check=True):
data = get_admin_modules_config(uid=None)['modules']
return data.get(module)
else:
users = list(cpusers())
for username in users:
uid = uid_by_name(username)
if not uid:
continue
if get_admin_modules_config(uid)['modules'].get(module):
return True
return False
def any_module_allowed_on_server() -> bool:
"""
Check if there are any optimization module allowed on server
"""
return any(is_module_allowed_for_user(module) for module in ALL_OPTIMIZATION_MODULES)
def get_admin_modules_config(uid=None):
"""
Reads modules statuses from .json.
In case if config does not exist returns defaults.
"""
defaults = {
"version": str(ALLOWED_MODULES_CONFIG_VERSION),
"modules": dict.fromkeys(ALL_OPTIMIZATION_MODULES, IS_MODULE_ALLOWED_BY_DEFAULT),
}
modules_json_path = get_modules_allowed_path(uid)
if not os.path.exists(modules_json_path):
return defaults
# TODO: locking and tempfiles
# https://cloudlinux.atlassian.net/browse/LU-2073
try:
# modules_allowed.json contents:
# {
# "version": "1",
# "modules": {
# "object_cache": true,
# "site_optimization": true
# }
# }
with open(modules_json_path, "r") as f:
modules_from_file = json.load(f)
# update admin's config with modules that are not in it (values are taken from defaults)
# case: new module was added in the lve-utils update and it is not in the config yet
for module, status in defaults['modules'].items():
modules_from_file['modules'].setdefault(module, status)
except (json.JSONDecodeError, KeyError) as e:
logging.warning('Config %s is malformed, using defaults instead, error: %s', modules_json_path, e)
return defaults
return modules_from_file
def write_modules_allowed(uid: int, gid: int, data_dict_to_write: dict):
"""
Writes modules_allowed file for user
:param uid: User uid
:param gid: User gid
:param data_dict_to_write: Data to write
"""
modules_allowed_path = get_modules_allowed_path(uid)
json_data = json.dumps(data_dict_to_write, indent=4)
with open(modules_allowed_path, "w") as f:
f.write(json_data)
owner, group, mode = get_admin_config_permissions(gid)
os.chown(modules_allowed_path, owner, group)
os.chmod(modules_allowed_path, mode)
def get_admin_config_permissions(gid: int) -> Tuple[int, int, int]:
"""
Return owner, group and permission which files inside
admin's config directory should have.
User should have rights to read (not write) config,
so we set owner root, group depends on CL edition (see comment above)
"""
if is_cl_solo_edition(skip_jwt_check=True):
# root:root 644 - CL Solo
owner, group, mode = 0, 0, 0o644
else:
# root:username 640 - CL Shared Pro
owner, group, mode = 0, gid, 0o640
return owner, group, mode