|
Server : Apache System : Linux cvar2.toservers.com 3.10.0-962.3.2.lve1.5.73.el7.x86_64 #1 SMP Wed Aug 24 21:31:23 UTC 2022 x86_64 User : njnconst ( 1116) PHP Version : 8.4.18 Disable Function : NONE Directory : /proc/self/root/opt/alt-old/python37/lib/python3.7/site-packages/clwpos/ |
Upload File : |
# -*- coding: utf-8 -*-
# Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2021 All Rights Reserved
#
# Licensed under CLOUD LINUX LICENSE AGREEMENT
# http://cloudlinux.com/docs/LICENSE.TXT
# wposctl.py - work code for clwposctl utility
from __future__ import absolute_import
import argparse
import json
import os
import pwd
import subprocess
from copy import deepcopy
from typing import Dict, Iterator, Set, Tuple, List
import platform
import shutil
import logging
import cpanel
from clcommon.utils import run_command
from clcommon import cpapi
from clwpos.optimization_modules import (
ALL_OPTIMIZATION_MODULES,
get_modules_allowed_path,
get_admin_modules_config,
write_modules_allowed,
is_module_allowed_for_user,
)
from clcommon.clpwd import drop_privileges
from clwpos.cl_wpos_exceptions import WposError
from clwpos.user.config import UserConfig
from clwpos.constants import PUBLIC_OPTIONS, ALT_PHP_REDIS_ENABLE_UTILITY,\
CLWPOS_UIDS_PATH, EA_PHP_REDIS_ENABLE_UTILITY, INSTALL_WPOS_HOOKS_UTILITY
from clwpos import gettext as _
from cpanel import enable_without_config_affecting, disable_without_config_affecting, DocRootPath
from clwpos.parse import ArgumentParser, CustomFormatter
from clwpos.logsetup import setup_logging, init_wpos_sentry_safely
from clcommon.lib.cledition import is_cl_solo_edition
from clcommon.cpapi.cpapiexceptions import NoPackage
from clwpos.report_generator import ReportGenerator, ReportGeneratorError
from clwpos.utils import (
catch_error,
error_and_exit,
print_data,
check_license_decorator,
set_wpos_icon_visibility,
acquire_lock,
get_default_public_options,
get_pw,
is_redis_configuration_running,
)
from clcommon.clcagefs import setup_mount_dir_cagefs, _remount_cagefs
DISABLED_OMS_MESSAGE = _("All optimization modules are currently disabled. "
"End-user WPOS interface blocked.")
WPOS_SERVICE_ENABLE_ERR_MSG = _("Unable to run WPOS daemon. Caching databases won't start and work. "
"You can find detailed information in log file")
REDIS_CONFIGURATION_WARNING_MSG = _("Configuration of PHP redis extension is running in background process. "
"This may take up to several minutes. Until the end of this process "
"functionality of WP Optimization Suite is limited.")
parser = ArgumentParser(
"/usr/bin/clwpos-admin",
"Utility for control CL WPOS admin interface",
formatter_class=CustomFormatter
)
_logger = setup_logging(__name__)
class CloudlinuxWposAdmin(object):
"""
Class for run cloudlinux-wpos-admin commands
"""
DAEMON_INSTALL_MARKER = '/var/lve/clwpos.installed'
def __init__(self):
self._is_json = False
self._opts: argparse.Namespace
self._logger = setup_logging(__name__)
init_wpos_sentry_safely(self._logger)
self.clwpos_path = "/var/clwpos"
self.modules_allowed_name = "modules_allowed.json"
self.is_solo = is_cl_solo_edition(skip_jwt_check=True)
self.wait_child_process = bool(os.environ.get('CL_WPOS_WAIT_CHILD_PROCESS'))
if self.wait_child_process:
self.exec_func = subprocess.run
else:
self.exec_func = subprocess.Popen
@catch_error
def run(self, argv):
"""
Run command action
:param argv: sys.argv[1:]
:return: clwpos-user utility retcode
"""
self._parse_args(argv)
result = getattr(self, self._opts.command.replace("-", "_"))()
print_data(self._is_json, result)
def _parse_args(self, argv):
"""
Parse command line arguments
:param argv: sys.argv[1:]
"""
self._opts = parser.parse_args(argv)
self._is_json = True
@parser.command(help="Uninstall cache for all domain during downgrade")
def uninstall_cache_for_all_domains(self) -> dict:
"""
This command used during downgrade to lve-utils, which version does not support clwpos
:return:
"""
try:
users = cpapi.cpusers()
except (OSError, IOError, IndexError, NoPackage) as e:
self._logger.warning("Can't get user list from panel: %s", str(e))
return {}
for username in users:
with drop_privileges(username):
for doc_root, wp_path, module in _enabled_modules(username):
disable_without_config_affecting(DocRootPath(doc_root), wp_path, module=module)
return {}
@parser.argument(
"--modules",
help="Argument for module of list of comma separated modules",
type=str,
choices=ALL_OPTIMIZATION_MODULES,
required=True
)
@parser.mutual_exclusive_group(
[
(["--allowed"], {"help": "Allow modules for users", "action": "store_true"}),
(["--disallowed"], {"help": "Disallow modules for users", "action": "store_true"}),
],
required=True,
)
@parser.argument("--users", help="User or list of comma separated users", type=str,
required=not is_cl_solo_edition(skip_jwt_check=True))
@parser.command(help="Managing list of allowed modules for users")
@check_license_decorator
def set_module(self) -> dict:
"""
Write info related to module allowance into user file
"""
if self.is_solo:
# For Solo we use first user in list
users = cpapi.cpusers()
if not users:
error_and_exit(
self._is_json,
{"result": _("There are no users in the control panel.")},
)
username = users[0]
else:
# CL Shared (Pro)
user_arg_list = self._opts.users.split(",")
username = user_arg_list[0].strip() # in v1 only single user processing is supported
first_user_enabled = self._opts.allowed and not is_module_allowed_for_user()
warning_dict = {}
if first_user_enabled:
if not os.path.exists(self.DAEMON_INSTALL_MARKER):
retcode, stderr, stdout = self._install_monitoring_daemon()
if retcode:
self._logger.error(
"Starting service ended with error: %s, %s", stdout, stderr)
warning_dict.update({"warning": WPOS_SERVICE_ENABLE_ERR_MSG})
else:
# touch file to avoid multiple attemps to enable daemon
open(self.DAEMON_INSTALL_MARKER, 'w').close()
# update modules only after daemon startup
modules_list = [module.strip() for module in self._opts.modules.split(",")]
warning_dict.update(self.process_user_modules(username, modules_list, self._opts.allowed))
if first_user_enabled:
# If WPOS is allowed for first user on server, show warning message
# and run configuration of redis and cagefs in background processes
if not self.is_solo:
warning_dict.update({"warning": REDIS_CONFIGURATION_WARNING_MSG})
setup_mount_dir_cagefs(
CLWPOS_UIDS_PATH, prefix='*', remount_cagefs=True, remount_in_background=not self.wait_child_process
)
self.exec_func([ALT_PHP_REDIS_ENABLE_UTILITY], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
self.exec_func([EA_PHP_REDIS_ENABLE_UTILITY], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
self.exec_func(
["sh", "/usr/share/cloudlinux/add_clwpos_crons.sh"], # TODO remove this duplicate
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
env={
"CLSHARE": "/usr/share/cloudlinux",
"WPOS_REQ_CRON_FILE": "/etc/cron.d/clwpos_req_cron",
"CLWPOS_REDIS_EXTENSION_INSTALLER": "/etc/cron.d/clwpos_redis_extension_installer",
"CLWPOS_CLEANER_CRON": "/etc/cron.d/clwpos_cleaner_cron"
}
)
self.exec_func(
[INSTALL_WPOS_HOOKS_UTILITY, '--install'], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL
)
elif self._opts.allowed and is_redis_configuration_running():
warning_dict.update({"warning": REDIS_CONFIGURATION_WARNING_MSG})
return warning_dict
def _install_monitoring_daemon(self):
"""
We are not installing daemon on CloudLinux Shared (NOT pro),
so we must enable it manually during first module installation.
"""
if 'el6' in platform.release():
retcode, stdout, stderr = run_command(['/sbin/chkconfig', '--add', 'clwpos_monitoring'],
return_full_output=True)
else:
# doing the same things that we do in spec for CloudLinux Solo
# the difference is that we don't want to start daemon automatically
# after package installation because of CL Shared (NOT pro) servers
shutil.copy('/usr/share/cloudlinux/clwpos_monitoring.service',
'/etc/systemd/system/clwpos_monitoring.service')
retcode, stdout, stderr = run_command(['systemctl', 'enable', 'clwpos_monitoring.service'],
return_full_output=True)
if not retcode:
retcode, stdout, stderr = run_command(['systemctl', 'daemon-reload'],
return_full_output=True)
if not retcode:
retcode, stdout, stderr = run_command(['service', 'clwpos_monitoring', 'start'],
return_full_output=True)
return retcode, stderr, stdout
@parser.mutual_exclusive_group(
[
(["--hide-icon"], {"help": "Hide WPOS icon", "action": "store_true"}),
(["--show-icon"], {"help": "Show WPOS icon", "action": "store_true"}),
],
required=True,
)
@parser.command(help="Manage global options")
@check_license_decorator
def set_options(self) -> dict:
"""
Set global options that affect all users.
For v1 it is only allowed to control WPOS icon visibility.
"""
if self.is_solo:
error_and_exit(
self._is_json,
{"result": _("This command is not supported on Solo edition.")},
)
retcode, stdout = set_wpos_icon_visibility(hide=self._opts.hide_icon)
if retcode:
error_and_exit(
self._is_json,
{
"result": _("Error during changing of WPOS icon visibility: \n%(error)s"),
"context": {"error": stdout}
},
)
with acquire_lock(PUBLIC_OPTIONS):
if not os.path.isfile(PUBLIC_OPTIONS):
public_config_data = get_default_public_options()
else:
try:
with open(PUBLIC_OPTIONS) as f:
public_config_data = json.load(f)
except json.decoder.JSONDecodeError as err:
raise WposError(
message=_("File is corrupted: Please, delete file %(config_file)s"
" or fix the line provided in details"),
details=str(err),
context={'config_file': PUBLIC_OPTIONS})
public_config_data["show_icon"] = self._opts.show_icon
with open(PUBLIC_OPTIONS, "w") as f:
json.dump(public_config_data, f)
return {}
@catch_error
@parser.command(help="Return public options", only_for_shared_pro=True)
@check_license_decorator
def get_options(self):
if not os.path.isfile(PUBLIC_OPTIONS):
return get_default_public_options()
else:
with acquire_lock(PUBLIC_OPTIONS):
with open(PUBLIC_OPTIONS, 'r') as f:
content = f.read()
try:
return json.loads(content)
except json.decoder.JSONDecodeError as err:
raise WposError(
message=_("File is corrupted: Please, delete file %(config_file)s or fix the line provided in details"),
details=str(err),
context={'config_file': PUBLIC_OPTIONS})
@catch_error
@parser.mutual_exclusive_group(
[
(["--all"], {"help": "Argument for all users in the panel", "action": "store_true"}),
(["--users"], {"help": "Argument for user or list of comma separated users", "type": str}),
],
required=True,
)
@parser.command(help="Return the report about allowed and restricted user's modules")
def get_report(self) -> dict:
"""
Print report in stdout.
[!ATTENTION!] response jsons are different for Solo and Shared!
"""
report = {}
if self.is_solo:
try:
modules = get_admin_modules_config()['modules']
except (KeyError, json.JSONDecodeError):
raise WposError(
message=_("Configuration file '%(config_path)s' is corrupted. "
"Check it and make sure it has valid json format.\n"
"Contact CloudLinux support in case you need any assistance."),
context=dict(config_path=get_modules_allowed_path())
)
report = {'modules': modules}
else:
try:
users = self._opts.users.split(',') if self._opts.users else None
report = ReportGenerator().get(target_users=users)
except ReportGeneratorError as e:
error_and_exit(
self._is_json,
{
'result': e.message,
'context': e.context
}
)
except Exception as e:
error_and_exit(
self._is_json,
{
'result': _('Error during getting report: %(error)s'),
'context': {'error': e},
}
)
return report
@catch_error
@parser.mutual_exclusive_group(
[
(["--all"], {"help": "Argument for all users in the panel", "action": "store_true"}),
(["--status"], {"help": "Show scan status", "action": "store_true"}),
],
required=True,
)
@parser.command(help="Create the report about allowed and restricted user's modules")
def generate_report(self) -> dict:
if self.is_solo:
error_and_exit(
self._is_json, {"result": _("Solo edition is not supported.")}
)
rg = ReportGenerator()
try:
if self._opts.status:
scan_status = rg.get_status()
else:
# TODO: implement --users support: send List[str] argument
scan_status = rg.scan() # initial status dict, like 0/10
return {
'result': 'success',
**scan_status,
}
except ReportGeneratorError as e:
error_and_exit(
self._is_json,
{
'result': e.message,
'context': e.context
}
)
except Exception as e:
error_and_exit(
self._is_json,
{
'result': _('Error during generating report: %(error)s'),
'context': {'error': str(e)},
}
)
@staticmethod
def all_modules_disabled(modules: Dict[str, bool]) -> bool:
"""
Check if all optimization modules are disabled.
"""
if any(status is True for status in modules.values()):
return False
return True
def process_user_modules(self, user_name: str, modules: List[str], allowed_state: bool):
"""
Enable/disable modules for user.
- write admin config for user with new state
- install/uninstall WP plugin
- reload deamon to start/stop redis
:param user_name: username
:param modules: Module list to process
:param allowed_state: True - enable module, False - disale
"""
# Get modules_allowed.json for user
try:
pw_info = get_pw(username=user_name)
uid, gid = pw_info.pw_uid, pw_info.pw_gid
except KeyError:
error_and_exit(
self._is_json,
{
"result": _("User %(username)s does not exist."),
"context": {"username": user_name},
},
)
modules_allowed_path = get_modules_allowed_path(uid)
warning_dict = {}
try:
os.makedirs(os.path.dirname(modules_allowed_path), 0o755, exist_ok=False)
except OSError:
pass
else:
if not self.is_solo:
_remount_cagefs(user_name)
with acquire_lock(modules_allowed_path):
config_contents = get_admin_modules_config(uid)
old_state = deepcopy(config_contents["modules"])
config_contents["modules"].update(dict.fromkeys(modules, allowed_state))
new_state = deepcopy(config_contents["modules"])
try:
write_modules_allowed(uid, gid, config_contents)
except (IOError, OSError) as err:
raise WposError(
message=_("Configuration file '%(path)s' update failed."),
details=str(err),
context=dict(path=modules_allowed_path)
)
synchronize_plugins_status_for_user(user_name, uid, old_state, new_state)
if self.is_solo and self.all_modules_disabled(config_contents["modules"]):
warning_dict.update({"warning": DISABLED_OMS_MESSAGE})
return warning_dict
def synchronize_plugins_status_for_user(username: str, uid: int, old_state: dict, new_state: dict):
"""
Compare old and new states of modules in admin's wpos config,
determine what modules should be enabled and disabled
and synchronize new state for each panel's user.
"""
old_state = {key for key, value in old_state.items() if value}
new_state = {key for key, value in new_state.items() if value}
enabled_modules = new_state - old_state
disabled_modules = old_state - new_state
synchronize_plugins_for_user(username, uid, enabled_modules, disabled_modules)
def synchronize_plugins_for_user(username: str, uid: int, enabled_modules: Set[str], disabled_modules: Set[str]):
"""
Iterate through user's docroots and wp_paths
and enable/disable modules with wp-cli
not modifying user's wpos config.
"""
with drop_privileges(username):
for doc_root, wp_path, _ in _enabled_modules(username):
for module in enabled_modules:
enable_without_config_affecting(
DocRootPath(doc_root),
wp_path,
module=module,
)
for module in disabled_modules:
disable_without_config_affecting(
DocRootPath(doc_root),
wp_path,
module=module,
)
try:
# Reload redis for user
cpanel.reload_redis(uid)
except WposError as e:
_logger.exception("CLWPOS daemon error: '%s'; details: '%s'; context: '%s'", e.message, e.details, e.context)
except Exception as e:
_logger.exception(e)
def _enabled_modules(username: str) -> Iterator[Tuple[str, str, str]]:
return UserConfig(username=username).enabled_modules()