|
Server : Apache System : Linux cvar2.toservers.com 3.10.0-962.3.2.lve1.5.73.el7.x86_64 #1 SMP Wed Aug 24 21:31:23 UTC 2022 x86_64 User : njnconst ( 1116) PHP Version : 8.4.18 Disable Function : NONE Directory : /proc/self/root/opt/alt/python37/lib/python3.7/site-packages/clwpos/user/ |
Upload File : |
# -*- coding: utf-8 -*-
# Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2021 All Rights Reserved
#
# Licensed under CLOUD LINUX LICENSE AGREEMENT
# http://cloudlinux.com/docs/LICENSE.TXT
from __future__ import absolute_import
import json
import os
import pwd
import traceback
from typing import Iterable, Optional
from clwpos.optimization_modules import ALL_OPTIMIZATION_MODULES
from clwpos.logsetup import setup_logging
from clwpos.utils import (
get_relative_docroot,
create_clwpos_dir_if_not_exists,
is_run_under_user
)
from clcommon.clwpos_lib import is_wp_path
from clwpos import constants
from clwpos.cl_wpos_exceptions import WposError
from clwpos import gettext as _
class ConfigError(WposError):
"""
Used for all exceptions during handling clwpos user config
in UserConfig methods
"""
pass
class UserConfig(object):
"""
Class to manage clwpos user config - read, write, set params in config.
"""
CONFIG_PATH = os.path.join("{homedir}", constants.USER_WPOS_DIR, constants.USER_CLWPOS_CONFIG)
DEFAULT_MAX_CACHE_MEMORY = f"{constants.DEFAULT_MAX_CACHE_MEMORY}mb"
DEFAULT_CONFIG = {"docroots": {}, "max_cache_memory": DEFAULT_MAX_CACHE_MEMORY}
def __init__(self, username):
if not is_run_under_user():
raise ConfigError(_("Trying to use UserConfig class as root"))
self.username = username
self.homedir = pwd.getpwnam(username).pw_dir
self.config_path = self.CONFIG_PATH.format(homedir=self.homedir)
self._logger = setup_logging(__name__)
create_clwpos_dir_if_not_exists(username)
def read_config(self):
"""
Reads config from self.config_path
"""
try:
with open(self.config_path, "r") as f:
return json.loads(f.read())
except Exception:
exc_string = traceback.format_exc()
raise ConfigError(
message=_("Error while reading config %(config_path)s: %(exception_string)s"),
context={"config_path": self.config_path, "exception_string": exc_string}
)
def write_config(self, config: dict):
"""
Writes config (as json) to self.config_path
"""
try:
config_json = json.dumps(config, indent=4, sort_keys=True)
with open(self.config_path, "w") as f:
f.write(config_json)
except Exception as e:
raise ConfigError(
message=_("Attempt of writing to config file failed due to error:\n%(exception)s"),
context={"exception": e}
)
def is_default_config(self):
"""
Checks if user customized his config already.
"""
return not os.path.exists(self.config_path)
def get_config(self):
"""
Returns default config or config content from self.config_path
"""
# if config file is not exists, returns DEFAULT CONFIG
if self.is_default_config():
return self.DEFAULT_CONFIG
# Otherwise, reads config from file
# and returns it if it's not broken
try:
config = self.read_config()
except ConfigError:
return self.DEFAULT_CONFIG
return config if isinstance(config, dict) else self.DEFAULT_CONFIG
def set_params(self, params: dict):
"""
Set outer (not "docroots") params in config.
Example:
Old config:
{
"docroots": ...,
"max_cache_memory": "123mb",
}
Input params:
{
"max_cache_memory": "1024mb",
"param": "value"
}
New config:
{
"docroots": ...,
"max_cache_memory": "1024mb",
"param": "value"
}
"""
config = self.get_config()
for key, value in params.items():
config[key] = value
self.write_config(config)
def is_module_enabled(
self, domain: str, wp_path: str, module: str, config: Optional[dict] = None) -> bool:
config = config or self.get_config()
try:
docroot = get_relative_docroot(domain, self.homedir)
except Exception as e:
self._logger.warning(e, exc_info=True)
raise ConfigError(
message=_("Can't find docroot for domain '%(domain)s' and homedir '%(homedir)s'"),
context={"domain": domain, "homedir": self.homedir}
)
if not is_wp_path(os.path.join(self.homedir, docroot, wp_path)):
raise ConfigError(
message=_("Wrong wordpress path '%(wp_path)s' passed"),
context={"wp_path": wp_path}
)
if module not in ALL_OPTIMIZATION_MODULES:
raise ConfigError(
message=_("Invalid module %(module)s, available choices: %(choices)s"),
context={"module": module, "choices": ALL_OPTIMIZATION_MODULES}
)
try:
docroots = config["docroots"]
module_info = docroots.get(docroot, {}).get(wp_path, [])
return module in module_info
except (KeyError, AttributeError, TypeError) as e:
self._logger.warning(f"config {self.config_path} is broken: {e}", exc_info=True)
raise ConfigError(
message=_("Config is broken.\nRepair %(config_path)s or restore from backup."),
context={"config_path": self.config_path}
)
def disable_module(self, domain: str, wp_path: str, module: str) -> None:
try:
docroot = get_relative_docroot(domain, self.homedir)
except Exception as e:
self._logger.exception(e)
raise ConfigError(
message=_("Docroot for domain '%(domain)s' is not found"),
context={"domain": domain}
)
if not is_wp_path(os.path.join(self.homedir, docroot, wp_path)):
raise ConfigError(
message=_("Wrong wordpress path '%(wp_path)s' passed"),
context={"wp_path": wp_path}
)
if module not in ALL_OPTIMIZATION_MODULES:
raise ConfigError(
message=_("Invalid module %(module)s, available choices: %(choices)s"),
context={"module": module, "choices": ALL_OPTIMIZATION_MODULES}
)
config = self.get_config()
# check here as well that config has expected structure
if not self.is_module_enabled(domain, wp_path, module, config):
return
# remove module from the list
config["docroots"][docroot][wp_path].remove(module)
# delete wp_path if all modules are disabled
if not config["docroots"][docroot][wp_path]:
del config["docroots"][docroot][wp_path]
# delete docroot in it doesn't have wordpresses
if not config["docroots"][docroot]:
del config["docroots"][docroot]
self.write_config(config)
def enable_module(self, domain: str, wp_path: str, module: str) -> None:
try:
docroot = get_relative_docroot(domain, self.homedir)
except Exception as e:
self._logger.exception(e)
raise ConfigError(
message=_("Docroot for domain '%(domain)s' is not found"),
context={"domain": domain}
)
if not is_wp_path(os.path.join(self.homedir, docroot, wp_path)):
raise ConfigError(
message=_("Wrong wordpress path '%(wp_path)s' passed"),
context={"wp_path": wp_path}
)
if module not in ALL_OPTIMIZATION_MODULES:
raise ConfigError(
message=_("Invalid module %(module)s, available choices: %(choices)s"),
context={"module": module, "choices": ALL_OPTIMIZATION_MODULES}
)
config = self.get_config()
# check here as well that config has expected structure
if self.is_module_enabled(domain, wp_path, module, config):
return
if "docroots" not in config:
config["docroots"] = {}
if docroot not in config["docroots"]:
config["docroots"][docroot] = {}
if wp_path not in config["docroots"][docroot]:
config["docroots"][docroot][wp_path] = []
config["docroots"][docroot][wp_path].append(module)
self.write_config(config)
def enabled_modules(self):
for doc_root, doc_root_info in self.get_config()["docroots"].items():
for wp_path, module_names in doc_root_info.items():
for name in module_names:
yield doc_root, wp_path, name
def wp_paths_with_enabled_module(self, module_name: str) -> Iterable[str]:
"""
Return absolute WP paths with specified module enabled.
"""
for doc_root, wp_path, name in self.enabled_modules():
if name == module_name:
yield os.path.join(self.homedir, doc_root, wp_path)
def get_enabled_sites_count_by_modules(self, checked_module_names):
"""
Returns count of sites with enabled module
"""
sites_count = 0
for _, doc_root_info in self.get_config().get('docroots', {}).items():
for _, module_names in doc_root_info.items():
sites_count += any(checked_module_name in module_names for checked_module_name in checked_module_names)
return sites_count