|
Server : Apache System : Linux cvar2.toservers.com 3.10.0-962.3.2.lve1.5.73.el7.x86_64 #1 SMP Wed Aug 24 21:31:23 UTC 2022 x86_64 User : njnconst ( 1116) PHP Version : 8.4.18 Disable Function : NONE Directory : /opt/alt-old/python37/lib/python3.7/site-packages/clwpos/ |
Upload File : |
# -*- coding: utf-8 -*-
# Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2020 All Rights Reserved
#
# Licensed under CLOUD LINUX LICENSE AGREEMENT
# http://cloudlinux.com/docs/LICENSE.TXT
# TODO: convert this file into python package
# and move logic of modules manipulations here
import json
import os
import logging
from typing import Optional
from clwpos.cl_wpos_exceptions import WposError
from clwpos.utils import uid_by_name
from clcommon.lib.cledition import is_cl_solo_edition
from clcommon.cpapi import cpusers
from clwpos import gettext
from clwpos.constants import CLWPOS_VAR_DIR, ALLOWED_MODULES_JSON, CLWPOS_UIDS_PATH
# Version stamp that is put (as a string) into the "version" field
# of the default allowed-modules config.
ALLOWED_MODULES_CONFIG_VERSION = 1

# Unique id of the object cache module.
OBJECT_CACHE_MODULE = "object_cache"

# Ids of all optimization modules listed in the default config.
ALL_OPTIMIZATION_MODULES = [OBJECT_CACHE_MODULE]

# Modules are allowed out of the box only on CloudLinux Solo;
# on the other editions they are disallowed by default.
IS_MODULE_ALLOWED_BY_DEFAULT = bool(is_cl_solo_edition())
def get_modules_allowed_path(uid: Optional[int]) -> str:
    """
    Build path to the modules_allowed file of the user.
    Caller does not have to care which OS edition it runs on:
    on CloudLinux OS Solo a single shared 'solo' directory is used,
    on the other editions the directory is named after the uid.
    :param uid: uid, may be None only on CloudLinux OS Solo
    :return: modules_allowed file path
    :raises WposError: when uid is None on non-Solo edition
    """
    if is_cl_solo_edition():
        config_dir = os.path.join(CLWPOS_VAR_DIR, 'solo')
    elif uid is not None:
        config_dir = os.path.join(CLWPOS_UIDS_PATH, str(uid))
    else:
        # per-user directory cannot be resolved without uid
        raise WposError(
            message=gettext('Internal error: obtaining config path without uid is only '
                            'available for CloudLinux OS Solo. '
                            'Please contact support for help: '
                            'https://cloudlinux.zendesk.com'))
    return os.path.join(config_dir, ALLOWED_MODULES_JSON)
def get_allowed_modules(uid: int) -> list:
    """
    Loads admin-managed configuration and picks from it
    the modules which endusers are allowed to enable.
    :param uid: uid (used only for CL Shared, not used on solo)
    :return: list of module unique ids
    """
    module_flags = get_admin_modules_config(uid)['modules']
    # keep ids of the modules whose flag is truthy, config order is preserved
    return [module_id for module_id, allowed in module_flags.items() if allowed]
def is_module_allowed_for_user(module=OBJECT_CACHE_MODULE):
    """
    Checks whether <module> is allowed for at least one user.
    On CloudLinux Solo the single admin config is checked (no uid needed),
    on the other editions configs of the users returned by cpusers()
    are checked one by one until the first match.
    :param module: module unique id, object cache module by default
    :return: True if module is allowed for at least one user, False otherwise
    """
    if is_cl_solo_edition(skip_jwt_check=True):
        solo_modules = get_admin_modules_config(uid=None)['modules']
        # Always answer with a real bool: for a module missing from the config
        # .get() gives None, while the non-Solo branch below returns False.
        return bool(solo_modules.get(module))

    for username in cpusers():
        uid = uid_by_name(username)
        if not uid:
            # uid_by_name gave no usable (falsy) uid for this user - skip it
            continue
        if get_admin_modules_config(uid)['modules'].get(module):
            return True
    return False
def get_admin_modules_config(uid=None):
    """
    Reads modules statuses from .json.
    Defaults are returned when config does not exist, is not valid JSON
    or does not have the expected structure (a dict with "modules" dict inside).
    :param uid: uid whose config is read; passing None is accepted
                by get_modules_allowed_path only on CloudLinux OS Solo
    :return: config dict, its "modules" item maps module unique id
             to the flag telling whether the module is allowed
    :raises WposError: from get_modules_allowed_path, when uid is None on non-Solo edition
    """
    defaults = {
        "version": str(ALLOWED_MODULES_CONFIG_VERSION),
        "modules": dict.fromkeys(ALL_OPTIMIZATION_MODULES, IS_MODULE_ALLOWED_BY_DEFAULT),
    }
    modules_json_path = get_modules_allowed_path(uid)
    # The explicit check is kept on purpose: os.path.exists() is also False
    # for paths that cannot be stat()-ed, and those must keep giving defaults.
    if not os.path.exists(modules_json_path):
        return defaults

    # TODO: locking and tempfiles
    # https://cloudlinux.atlassian.net/browse/LU-2073
    try:
        with open(modules_json_path, "r") as f:
            config = json.load(f)
    except FileNotFoundError:
        # config was removed between the existence check and open()
        return defaults
    except json.JSONDecodeError:
        logging.warning('Config %s is malformed, using defaults instead', modules_json_path)
        return defaults

    # Valid JSON is not enough: callers index the result with ['modules']
    # and use the value as a mapping, so anything else counts as malformed too.
    if not isinstance(config, dict) or not isinstance(config.get('modules'), dict):
        logging.warning('Config %s is malformed, using defaults instead', modules_json_path)
        return defaults
    return config
def write_modules_allowed(uid: int, gid: int, data_dict_to_write: dict):
    """
    Dumps given data as JSON (4 spaces indent) into modules_allowed file
    of the user and makes root the owner of the file.
    Process umask is changed for the time of writing and restored afterwards.
    :param uid: User uid
    :param gid: User gid, ignored on CL Solo where group is always 0
    :param data_dict_to_write: Data to write
    """
    target_path = get_modules_allowed_path(uid)
    serialized = json.dumps(data_dict_to_write, indent=4)

    # modules_allowed.json perm: root:root 644 - CL Solo, root:username 640 - CL Shared Pro
    if is_cl_solo_edition(skip_jwt_check=True):
        creation_umask, owner_group = 0o22, 0
    else:
        creation_umask, owner_group = 0o27, gid

    previous_umask = os.umask(creation_umask)
    try:
        with open(target_path, "w") as config_file:
            config_file.write(serialized)
            # Config must be readable (not writable) by the user:
            # owner is root, group depends on CL edition (see comment above)
            os.chown(target_path, 0, owner_group)
    finally:
        os.umask(previous_umask)