Hi!
Nope, still not solved. We've migrated to JupyterHub 3.1.1, and I can't figure out a way to send progress during the spawn of the container.
Here is the current code of our systemdremotespawner. It's based on systemdspawner (… a version from 2019); I added a "balancer" class (which selects the node where the user's systemd service will be launched) and a few other classes specific to our school's LDAP, user names, etc. (those are not included below).
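Before the file itself, for reference, this is the kind of progress() override we have been experimenting with. It is only a minimal sketch based on the JupyterHub Spawner API: self._spawn_progress is a hypothetical buffer that our start() would have to fill, while _spawn_future is the future JupyterHub sets while a spawn is pending.

async def progress(self):
    # Hypothetical buffer: start() would append dicts shaped like
    # {'progress': <0-100>, 'message': <str>} while it works.
    sent = 0
    while True:
        events = getattr(self, '_spawn_progress', [])
        for event in events[sent:]:
            yield event
        sent = len(events)
        # Stop once the spawn future has resolved (or was never set).
        if self._spawn_future is None or self._spawn_future.done():
            break
        await asyncio.sleep(1)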
systemdremotespawner.py
import os
import pwd
import subprocess
from traitlets import Bool, Unicode, List, Dict, Int
import asyncio
from jupyterhub.spawner import Spawner
from jupyterhub.utils import random_port
from systemdremotespawner.ssp import SSP
from cedelogger import cedeLogger
from notouser import notoDiskInfo
import hashlib
import logging
# To populate self.state during start-up (email address)
from epflldap import EPFL_LDAP
# To read Quotas
import json
class QuotaReader:
    """Read per-user quota overrides from /etc/jupyterhub/quota.rules."""

    quotafile = '/etc/jupyterhub/quota.rules'

    def _quota_file_exists(self):
        return os.path.exists(self.quotafile)

    def _get_user_data(self, username=None):
        default_quotas = {}
        user_quotas = {}
        if not self._quota_file_exists() or username is None:
            return user_quotas
        pattern = username + ':'
        with open(self.quotafile, 'r') as fquota:
            for line in fquota:
                if line.startswith('#'):
                    continue
                if line.startswith(pattern):
                    try:
                        user_quotas = json.loads(line.split(pattern, 1)[1])
                    except json.JSONDecodeError:
                        continue
                elif line.startswith('*:'):
                    try:
                        default_quotas = json.loads(line.split('*:', 1)[1])
                    except json.JSONDecodeError:
                        continue
        if user_quotas:
            return user_quotas
        return default_quotas

    def get_value(self, username, key, default=None):
        user_quotas = self._get_user_data(username)
        return user_quotas.get(key, default)
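# For illustration, quota.rules maps a username (or '*' for the default)
# to a JSON object of quota overrides; the values below are hypothetical:
#
#   # comments start with '#'
#   jdoe:{"mem_limit": "2G", "cpu_limit": 2.0}
#   *:{"mem_limit": "1G", "cpu_limit": 1.0}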
class SystemdRemoteSpawner(Spawner):
    # Below this point, configuration items for systemd
    user_workingdir = Unicode(
        None,
        allow_none=True,
        help="""
        Path to start each notebook user on.
        {USERNAME} and {USERID} are expanded.
        Defaults to the home directory of the user.
        Not respected if dynamic_users is set to True.
        """
    ).tag(config=True)

    username_template = Unicode(
        '{USERNAME}',
        help="""
        Template for the unix username each user should be spawned as.
        {USERNAME} and {USERID} are expanded.
        This user should already exist in the system.
        Not respected if dynamic_users is set to True.
        """
    ).tag(config=True)

    default_shell = Unicode(
        os.environ.get('SHELL', '/bin/bash'),
        help='Default shell for users on the notebook terminal'
    ).tag(config=True)

    extra_paths = List(
        [],
        help="""
        Extra paths to prepend to the $PATH environment variable.
        {USERNAME} and {USERID} are expanded.
        """,
    ).tag(config=True)

    unit_name_template = Unicode(
        '{USERNAME}-j',
        help="""
        Template to use to make the systemd service names.
        {USERNAME} and {USERID} are expanded.
        """
    ).tag(config=True)

    isolate_tmp = Bool(
        False,
        help="""
        Give each notebook user their own /tmp, isolated from the system & each other.
        """
    ).tag(config=True)

    isolate_devices = Bool(
        False,
        help="""
        Give each notebook user their own /dev, with a very limited set of devices mounted.
        """
    ).tag(config=True)

    disable_user_sudo = Bool(
        False,
        help="""
        Set to true to disallow becoming root (or any other user) via sudo or other means from inside the notebook.
        """,
    ).tag(config=True)

    readonly_paths = List(
        None,
        allow_none=True,
        help="""
        List of paths that should be marked read-only from the user notebook.
        Subpaths may be made writable by setting readwrite_paths.
        """,
    ).tag(config=True)

    readwrite_paths = List(
        None,
        allow_none=True,
        help="""
        List of paths that should be marked read-write from the user notebook.
        Used to make a subpath of a read-only path writable.
        """,
    ).tag(config=True)

    unit_extra_properties = Dict(
        {},
        help="""
        Dict of extra properties for systemd-run --property=[...].
        Keys are property names, and values are either strings or
        lists of strings (for multiple entries). When values are
        lists, ordering is guaranteed. Ordering across keys of the
        dictionary is *not* guaranteed.
        Used to add arbitrary properties for spawned Jupyter units.
        Read `man systemd-run` for details on per-unit properties
        available in transient units.
        """
    ).tag(config=True)

    dynamic_users = Bool(
        False,
        help="""
        Allocate system users dynamically for each user.
        Uses the DynamicUser= feature of systemd to make a new system user
        for each hub user dynamically. Their home directories are set up
        under /var/lib/{USERNAME}, and persist over time. The system user
        is deallocated whenever the user's server is not running.
        See http://0pointer.net/blog/dynamic-users-with-systemd.html for more
        information.
        Requires systemd 235.
        """
    ).tag(config=True)
    # End of configuration items
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # All traitlets configurables are configured by now
        self.log.debug('start of constructor')
        # Instantiate the balancer (selects the node to spawn on)
        self.balancer = SSP(logger=self.log, debug=True)
        self.unit_name = self._expand_user_vars(self.unit_name_template)
        self.log.debug('user:%s Initialized spawner with unit %s', self.user.name, self.unit_name)
        self.cl_uid = self._get_cedelogger_uid()
        self.cedeLogger = cedeLogger(tag='noto.systemdremotespawner')
        self.ndi = notoDiskInfo(record=True)
        self.QR = QuotaReader()
        self.diskdata = self.ndi.get_diskdata(self._expand_user_vars('{USERNAME}'))
        self.current_state = None
        self.log.debug('end of constructor')

    def _get_cedelogger_uid(self):
        return hashlib.sha1(('Salt_' + self.user.name + '_Pepper').encode('utf-8')).hexdigest()

    def _expand_user_vars(self, string):
        """
        Expand user related variables in a given string
        Currently expands:
          {USERNAME} -> Name of the user
          {USERID} -> UserID
        """
        return string.format(
            USERNAME=self.user.name,
            USERID=self.user.id
        )
    def get_state(self):
        """
        Save state required to reconstruct the spawner from scratch.
        We save the unit name, just in case the unit template was changed
        between restarts. We do not want to lose the previously launched
        units.
        JupyterHub before 0.7 also assumed your notebook was dead if it
        saved no state, so this helps with that too!
        """
        if self.current_state is not None:
            return self.current_state
        # Re/construct state:
        self.current_state = super().get_state()
        if 'complete' in self.current_state:
            return self.current_state
        self.current_state['unit_name'] = self.unit_name
        username = self.unit_name
        specific_usernames = {'povalles-j': 'ch-epfl-253705-j', 'pjermann-j': 'ch-epfl-157873-j'}
        if username in specific_usernames:
            username = specific_usernames[username]
        EL = EPFL_LDAP(
            user=username,
            logger=self.log,
            use_cache=True,
            cache_dir='/etc/jupyterhub/ldap.d',
            touch_cache=True
        )
        self.current_state['email'] = EL.get_email()
        self.current_state['sciper'] = EL.get_sciper()
        self.current_state['ip'] = self.ip
        self.current_state['port'] = self.port
        self.current_state['disk'] = self.diskdata
        self.current_state['complete'] = True
        return self.current_state
    def load_state(self, state):
        """
        Load state from storage required to reinstate this user's server.
        This runs after __init__, so we can override it with the saved unit name
        if needed. This is useful primarily when you change the unit name template
        between restarts.
        JupyterHub before 0.7 also assumed your notebook was dead if it
        saved no state, so this helps with that too!
        """
        if 'unit_name' in state:
            self.unit_name = state['unit_name']

    def get_env(self):
        env = super().get_env()
        # Clean-up - remove some environment variables not needed for systemd/bwrap containers
        for e in ['MEM_LIMIT', 'CPU_LIMIT']:
            if e in env:
                del env[e]
        return env
    async def start(self):
        self.port = random_port()
        self.log.debug('user:%s Using port %s to start spawning user server', self.user.name, self.port)

        # If a unit with this name is already running, it means a bug in
        # JupyterHub, a remnant from a previous install, or a failed service
        # start from earlier. Regardless, we kill it and start ours in its place.
        already_running_on_host = self.balancer.service_running(self.unit_name)
        if already_running_on_host:
            self.log.info('user:%s Unit %s already exists on host %s, but is not known to JupyterHub. Killing it.', self.user.name, self.unit_name, already_running_on_host)
            self.balancer.stop_service(self.unit_name, already_running_on_host)
            # Still running?
            if self.balancer.service_running(self.unit_name):
                self.log.error('user:%s Could not stop already existing unit %s on host %s', self.user.name, self.unit_name, already_running_on_host)
                raise Exception('Could not stop already existing unit {}'.format(self.unit_name))

        # Good, now let's start the server on the remote host.
        # We allow an override list (/etc/jupyterhub/spawn.rules).
        # Format: username:server_ip (one user per line)
        rules = {}
        try:
            with open('/etc/jupyterhub/spawn.rules', 'r') as f:
                for line in f:
                    rule_user, rule_server = line.rstrip().split(':')
                    rules[rule_user] = rule_server
        except (OSError, ValueError):
            rules = {}
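        # A hypothetical /etc/jupyterhub/spawn.rules, pinning two users to
        # fixed servers (names and IPs are made up):
        #   jdoe:10.0.0.12
        #   asmith:10.0.0.13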
        if self.user.name in rules:
            self.ip = rules[self.user.name]
            self.cedeLogger.log(record={"event": "spawning", "destination": self.ip, "reason": "/etc/jupyterhub/spawn.rules", "uid": self.cl_uid}, level=logging.INFO)
            self.log.info('user:%s Will be spawned on remote host: %s [HARD RULE from /etc/jupyterhub/spawn.rules]', self.user.name, self.ip)
        else:
            self.ip = self.balancer.get_balanced_host(self.user.name)
            self.cedeLogger.log(record={"event": "spawning", "destination": self.ip, "reason": "load-balancing", "uid": self.cl_uid}, level=logging.INFO)
            self.log.info('user:%s Will be spawned on remote host: %s', self.user.name, self.ip)

        env = self.get_env()
        properties = {}

        # Dynamic user
        ndi = notoDiskInfo(record=True)
        self.diskdata = ndi.get_diskdata(self._expand_user_vars('{USERNAME}'))
        state_directory = ndi.get_sysd_statedir(self._expand_user_vars('{USERNAME}'))
        properties['DynamicUser'] = 'yes'
        properties['StateDirectory'] = state_directory['relative']
        env['HOME'] = state_directory['absolute']
        working_dir = state_directory['absolute']
        uid = gid = None

        if self.isolate_tmp:
            properties['PrivateTmp'] = 'yes'
        if self.isolate_devices:
            properties['PrivateDevices'] = 'yes'

        if self.extra_paths:
            env['PATH'] = '{extrapath}:{curpath}'.format(
                curpath=env['PATH'],
                extrapath=':'.join(
                    [self._expand_user_vars(p) for p in self.extra_paths]
                )
            )

        env['SHELL'] = self.default_shell

        if self.mem_limit is not None:
            properties['MemoryAccounting'] = 'yes'
            quota_value = self.QR.get_value(self.user.name, 'mem_limit', self.mem_limit)
            properties['MemoryLimit'] = quota_value
        if self.cpu_limit is not None:
            properties['CPUAccounting'] = 'yes'
            quota_value = self.QR.get_value(self.user.name, 'cpu_limit', self.cpu_limit)
            properties['CPUQuota'] = '{}%'.format(int(quota_value * 100))
        if self.disable_user_sudo:
            properties['NoNewPrivileges'] = 'yes'

        if self.readonly_paths is not None:
            properties['ReadOnlyDirectories'] = [
                self._expand_user_vars(path)
                for path in self.readonly_paths
            ]
        if self.readwrite_paths is not None:
            properties['ReadWriteDirectories'] = [
                self._expand_user_vars(path)
                for path in self.readwrite_paths
            ]

        # We want to split the log messages into dedicated log files (in /var/log/jupyter/%unit%.log).
        properties['StandardOutput'] = 'journal'
        properties['StandardError'] = 'journal'
        # The important part here is this:
        # rsyslog is configured to detect this name pattern and send logs to the corresponding log file.
        properties['SyslogIdentifier'] = self.unit_name + 'split'

        # Set up the group (100 == 'users')
        properties['Group'] = '100'

        properties.update(self.unit_extra_properties)
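        # Illustratively, these properties end up as systemd-run flags in the
        # command assembled by SSP.build_transient_service_command (not shown
        # here), along the lines of (unit name and values are hypothetical):
        #   systemd-run --unit=jdoe-j --property=DynamicUser=yes \
        #       --property=MemoryLimit=2G --property=CPUQuota=150% ...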
        command = self.balancer.build_transient_service_command(
            self.unit_name,
            cmd=[self._expand_user_vars(c) for c in self.cmd],
            args=[self._expand_user_vars(a) for a in self.get_args()],
            working_dir=working_dir,
            environment_variables=env,
            properties=properties,
            uid=uid,
            gid=gid
        )
        self.cedeLogger.log(record={'event': 'spawn', 'target': self.ip, "uid": self.cl_uid}, level=logging.INFO)
        started = self.balancer.start_transient_service(command, self.unit_name, self.ip, state_directory['absolute'])
        if not started:
            self.log.error('Error while starting remote transient service.')
            return None

        # Wait (up to start_timeout seconds) for the server to come up.
        for _ in range(self.start_timeout):
            is_up = await self.poll()
            if is_up is None:
                return (self.ip, self.port)
            await asyncio.sleep(1)
        return None
    async def stop(self, now=False):
        self.log.debug('Stopping unit {} on host {}.'.format(self.unit_name, self.ip))
        self.cedeLogger.log(record={'event': 'stop', 'target': self.ip, 'uid': self.cl_uid}, level=logging.INFO)
        self.balancer.stop_service(self.unit_name, self.ip)

    async def poll(self):
        # Spawner API: return None if the server is running, an exit status otherwise.
        host = self.balancer.service_running(self.unit_name, self.ip)
        if host:
            self.ip = host
            return None
        return 1
# EOF
If you see anything that could explain why the progress messages are all sent at once at the end of the spawning process, please let me know!