[Spawner / ProgressAPI] only flushing events once the server is ready

Hi all !

I have been deploying hubs with custom spawners for years (3 at least) and I never got the event log / progress bar to work properly : events are flushed to the browser in one go when the single user server is ready.
Our custom spawner does not implement progress() method yet, so only the 2 default event messages from the main Spawner class are sent.

Versions used:
jupyterhub==2.3.1 (same behavior with previous versions and also with 3.0.0)
tornado==6.2 (same behavior with previous versions)

I found other mentions of similar behavior… but none is providing anything that looks like a working solution.

I opened an issue on github (Spawner / ProgressAPI only flushing events once the server is ready · Issue #4043 · jupyterhub/jupyterhub · GitHub) ; but it looks like I am the only one with this issue - so probably something I’m doing wrong.

Can anyone point me to the right documentation on implementing progress() in our custom spawner ?
I checked Starting servers with the JupyterHub API — JupyterHub 3.0.0 documentation but I did not find what I was looking for.

Thanks !

Is your custom Spawner available somewhere to look at? The issue may be in start (or any other call) blocking somewhere instead of letting async things proceed. The progress events will only be delivered if all pending operations are in an await step, because asyncio is ‘cooperative’.

Hi @minrk, thank you for this info, it looks promising.

If I understand correctly : our custom start() (or some other method) is not being cooperative and does not let other asynchronous jobs (including progress events) the chance to run.

I’m going to dig into that. Our spawner is not (yet) available in a public repo - still not sure if it would make sense since it is very specific to our deployment. Thanks again for pointing me into that direction.

Hi again!

I checked and our start() method is basically defined like this:

    async def start(self):
        ...
        # build ssh command and send it to the remote host
        ...
        for i in range(self.start_timeout):
            is_up = await self.poll()
            if is_up is None:
                return (self.ip, self.port)
            await asyncio.sleep(1)
        return None

… so it definitively makes use of await.

I also checked the poll() method, and it’s a non blocking method returning almost immediately.
Still, our poll() method does not follow the rules from (jupyterhub/spawner.py at 38afbcc0d067863d9690328dc41a24cd7487e62b ¡ jupyterhub/jupyterhub ¡ GitHub) - it never returns 0.
I’ll make it comply and see it that changes something.

@nibheis have you managed to solve this problem?

Hi!

Nope, still not solved. We’ve migrated to jupyterhub 3.1.1, and I can figure out a way to send progress during the spawn of the container.

Here is the current code of our systemdremotespawner. It’s based on systemdspawner (… a version from 2019) ; I added a “balancer” class (that selects the node where the user’s systemd service will be launched) and a few other specific classes to interact with our school’s LDAP, user names, etc (those are not included below).

systemdremotespawner.py

import os
import pwd
import subprocess
from traitlets import Bool, Unicode, List, Dict, Int
import asyncio

from jupyterhub.spawner import Spawner
from jupyterhub.utils import random_port

from systemdremotespawner.ssp import SSP

from cedelogger import cedeLogger
from notouser import notoDiskInfo 
import hashlib, logging

# To populate self.state during start up (email adress)
from epflldap import EPFL_LDAP

# To read Quotas
import json


class QuotaReader:
    quotafile = '/etc/jupyterhub/quota.rules'

    def _quota_file_exists(self):
        return os.path.exists(self.quotafile)

    def _get_user_data(self, username = None):
        default_quotas = {}
        user_quotas = {}
        if not self._quota_file_exists() or username is None:
            return user_quotas
        pattern = username+':'
        fquota = open(self.quotafile, 'r')
        for line in fquota:
            if line.startswith('#'):
                continue
            if line.startswith(pattern):
                try:
                    user_quotas = json.loads(line.split(pattern)[1])
                except:
                    continue
            elif line.startswith('*:'):
                try:
                    default_quotas = json.loads(line.split('*:')[1])
                except:
                    continue
        if bool(user_quotas):
            return user_quotas
        return default_quotas

    def get_value(self, username, key, default = None):
        value = default
        user_quotas = self._get_user_data(username)
        if key in user_quotas: value = user_quotas[key]
        return value


class SystemdRemoteSpawner(Spawner):
    # Below this point, configuration items for systemd

    user_workingdir = Unicode(
        None,
        allow_none=True,
        help="""
        Path to start each notebook user on.

        {USERNAME} and {USERID} are expanded.

        Defaults to the home directory of the user.

        Not respected if dynamic_users is set to True.
        """
    ).tag(config=True)

    username_template = Unicode(
        '{USERNAME}',
        help="""
        Template for unix username each user should be spawned as.

        {USERNAME} and {USERID} are expanded.

        This user should already exist in the system.

        Not respected if dynamic_users is set to True
        """
    ).tag(config=True)

    default_shell = Unicode(
        os.environ.get('SHELL', '/bin/bash'),
        help='Default shell for users on the notebook terminal'
    ).tag(config=True)

    extra_paths = List(
        [],
        help="""
        Extra paths to prepend to the $PATH environment variable.

        {USERNAME} and {USERID} are expanded
        """,
    ).tag(config=True)

    unit_name_template = Unicode(
        '{USERNAME}-j',
        help="""
        Template to use to make the systemd service names.

        {USERNAME} and {USERID} are expanded}
        """
    ).tag(config=True)

    isolate_tmp = Bool(
        False,
        help="""
        Give each notebook user their own /tmp, isolated from the system & each other
        """
    ).tag(config=True)

    isolate_devices = Bool(
        False,
        help="""
        Give each notebook user their own /dev, with a very limited set of devices mounted
        """
    ).tag(config=True)

    disable_user_sudo = Bool(
        False,
        help="""
        Set to true to disallow becoming root (or any other user) via sudo or other means from inside the notebook
        """,
    ).tag(config=True)

    readonly_paths = List(
        None,
        allow_none=True,
        help="""
        List of paths that should be marked readonly from the user notebook.

        Subpaths maybe be made writeable by setting readwrite_paths
        """,
    ).tag(config=True)

    readwrite_paths = List(
        None,
        allow_none=True,
        help="""
        List of paths that should be marked read-write from the user notebook.

        Used to make a subpath of a readonly path writeable
        """,
    ).tag(config=True)

    unit_extra_properties = Dict(
        {},
        help="""
        Dict of extra properties for systemd-run --property=[...].

        Keys are property names, and values are either strings or
        list of strings (for multiple entries). When values are
        lists, ordering is guaranteed. Ordering across keys of the
        dictionary are *not* guaranteed.

        Used to add arbitrary properties for spawned Jupyter units.
        Read `man systemd-run` for details on per-unit properties
        available in transient units.
        """
    ).tag(config=True)

    dynamic_users = Bool(
        False,
        help="""
        Allocate system users dynamically for each user.

        Uses the DynamicUser= feature of Systemd to make a new system user
        for each hub user dynamically. Their home directories are set up
        under /var/lib/{USERNAME}, and persist over time. The system user
        is deallocated whenever the user's server is not running.

        See http://0pointer.net/blog/dynamic-users-with-systemd.html for more
        information.

        Requires systemd 235.
        """
    ).tag(config=True)

    # End of configuration items

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # All traitlets configurables are configured by now

        self.log.debug('start of constructor')
        # Instantiate Balancer
        self.balancer = SSP(logger = self.log, debug = True)
        self.unit_name = self._expand_user_vars(self.unit_name_template)
        self.log.debug('user:%s Initialized spawner with unit %s', self.user.name, self.unit_name)
        self.cl_uid = self._get_cedelogger_uid()
        self.cedeLogger = cedeLogger(tag='noto.systemdremotespawner')
        self.ndi = notoDiskInfo(record = True)
        self.QR = QuotaReader()
        self.diskdata = self.ndi.get_diskdata(self._expand_user_vars('{USERNAME}'))
        self.current_state = None
        self.log.debug('end of constructor')

    def _get_cedelogger_uid(self):
        return hashlib.sha1(('Salt_'+self.user.name+'_Pepper').encode('utf-8')).hexdigest()

    def _expand_user_vars(self, string):
        """
        Expand user related variables in a given string

        Currently expands:
          {USERNAME} -> Name of the user
          {USERID} -> UserID
        """
        return string.format(
            USERNAME=self.user.name,
            USERID=self.user.id
        )

    def get_state(self):
        """
        Save state required to reconstruct spawner from scratch

        We save the unit name, just in case the unit template was changed
        between a restart. We do not want to lost the previously launched
        events.

        JupyterHub before 0.7 also assumed your notebook was dead if it
        saved no state, so this helps with that too!
        """
        if self.current_state is not None: return self.current_state
        # Re/construct state:
        self.current_state = super().get_state()
        if 'complete' in self.current_state: return self.current_state
        self.current_state['unit_name'] = self.unit_name
        username = self.unit_name
        specific_usernames = { 'povalles-j': 'ch-epfl-253705-j', 'pjermann-j': 'ch-epfl-157873-j' }
        if username in specific_usernames:
            username = specific_usernames[username]
        EL = EPFL_LDAP(
                user        = username,
                logger      = self.log,
                use_cache   = True,
                cache_dir   = '/etc/jupyterhub/ldap.d',
                touch_cache = True
                )
        self.current_state['email'] = EL.get_email()
        self.current_state['sciper'] = EL.get_sciper()
        self.current_state['ip'] = self.ip
        self.current_state['port'] = self.port
        self.current_state['disk'] = self.diskdata
        self.current_state['complete'] = True
        return self.current_state

    def load_state(self, state):
        """
        Load state from storage required to reinstate this user's server

        This runs after __init__, so we can override it with saved unit name
        if needed. This is useful primarily when you change the unit name template
        between restarts.

        JupyterHub before 0.7 also assumed your notebook was dead if it
        saved no state, so this helps with that too!
        """
        if 'unit_name' in state:
            self.unit_name = state['unit_name']

    def get_env(self):
        env = super().get_env()
        # Clean-up - remove some environment variables not needed for systemd/bwrap containers
        for e in ['MEM_LIMIT', 'CPU_LIMIT']:
            if e in env:
                del env[e]
        return env

    async def start(self):
        self.port = random_port()
        self.log.debug('user:%s Using port %s to start spawning user server', self.user.name, self.port)

        # If there's a unit with this name running already. This means a bug in
        # JupyterHub, a remanant from a previous install or a failed service start
        # from earlier. Regardless, we kill it and start ours in its place.
        already_running_on_host = self.balancer.service_running(self.unit_name)
        if already_running_on_host != False:
            self.log.info('user:%s Unit %s already exists on host %s, but is not known to JupyterHub. Killing it.', self.user.name, self.unit_name, already_running_on_host)
            self.balancer.stop_service(self.unit_name, already_running_on_host)
            # Still running ?
            if self.balancer.service_running(self.unit_name):
                self.log.error('user:%s Could not stop already existing unit %s on host %s', self.user.name, self.unit_name, already_running_on_host)
                raise Exception('Could not stop already existing unit {}'.format(self.unit_name))

        # Good, now let's start the server on the remote host.
        # We allow an override list (/etc/jupyterhub/spawn.rules).
        # Format: username:server_ip (one user per line)
        rules = {}
        try:
            with open('/etc/jupyterhub/spawn.rules', 'r') as f: content = f.readlines()
            for line in content:
                rule_user, rule_server = line.rstrip().split(':')
                rules[rule_user] = rule_server
        except:
            rules = {}
        if self.user.name in rules:
            self.ip = rules[self.user.name]
            self.cedeLogger.log(record={"event": "spawning", "destination": self.ip, "reason": "/etc/jupyterhub/spawn.rules", "uid": self.cl_uid}, level=logging.INFO)
            self.log.info('user:%s Will be spawned on remote host: %s [HARD RULE from /etc/jupyterhub/spawn.rules]', self.user.name, self.ip)
        else:
            self.ip = self.balancer.get_balanced_host(self.user.name)
            self.cedeLogger.log(record={"event": "spawning", "destination": self.ip, "reason": "load-balancing", "uid": self.cl_uid}, level=logging.INFO)
            self.log.info('user:%s Will be spawned on remote host: %s', self.user.name, self.ip)

        env = self.get_env()
        properties = {}

        # Dynamic user
        ndi = notoDiskInfo(record = True)
        self.diskdata = ndi.get_diskdata(self._expand_user_vars('{USERNAME}'))
        state_directory = ndi.get_sysd_statedir(self._expand_user_vars('{USERNAME}'))
        properties['DynamicUser'] = 'yes'
        properties['StateDirectory'] = state_directory['relative']
        env['HOME'] = state_directory['absolute']
        working_dir = state_directory['absolute']
        uid = gid = None

        if self.isolate_tmp: properties['PrivateTmp'] = 'yes'

        if self.isolate_devices: properties['PrivateDevices'] = 'yes'

        if self.extra_paths:
            env['PATH'] = '{extrapath}:{curpath}'.format(
                curpath=env['PATH'],
                extrapath=':'.join(
                    [self._expand_user_vars(p) for p in self.extra_paths]
                )
            )

        env['SHELL'] = self.default_shell

        if self.mem_limit is not None:
            properties['MemoryAccounting'] = 'yes'
            quota_value = self.QR.get_value(self.user.name, 'mem_limit', self.mem_limit)
            properties['MemoryLimit'] = quota_value

        if self.cpu_limit is not None:
            properties['CPUAccounting'] = 'yes'
            quota_value = self.QR.get_value(self.user.name, 'cpu_limit', self.cpu_limit)
            properties['CPUQuota'] = '{}%'.format(int(quota_value * 100))

        if self.disable_user_sudo: properties['NoNewPrivileges'] = 'yes'

        if self.readonly_paths is not None:
            properties['ReadOnlyDirectories'] = [
                self._expand_user_vars(path)
                for path in self.readonly_paths
            ]

        if self.readwrite_paths is not None:
            properties['ReadWriteDirectories'] = [
                self._expand_user_vars(path)
                for path in self.readwrite_paths
            ]

        # We want to split the log messages into dedicated log files (in /var/log/jupyter/%unit%.log)
        properties['StandardOutput'] = 'journal'
        properties['StandardError'] = 'journal'
        # The important part here is this:
        # rsyslog is configured to detect this name pattern and send logs to the corresponding log file.
        properties['SyslogIdentifier'] = self.unit_name+'split'

        # Setup group (100 == 'users')
        properties['Group'] = '100'

        properties.update(self.unit_extra_properties)

        command = self.balancer.build_transient_service_command(
            self.unit_name,
            cmd                     = [self._expand_user_vars(c) for c in self.cmd],
            args                    = [self._expand_user_vars(a) for a in self.get_args()],
            working_dir             = working_dir,
            environment_variables   = env,
            properties              = properties,
            uid                     = uid,
            gid                     = gid
        )

        self.cedeLogger.log(record={'event': 'spawn', 'target': self.ip, "uid": self.cl_uid}, level=logging.INFO)
        started = self.balancer.start_transient_service(command, self.unit_name, self.ip, state_directory['absolute'])
        if not started:
            self.log.debug('Error while starting remote transient service.')
            return None

        for i in range(self.start_timeout):
            is_up = await self.poll()
            if is_up is None:
                return (self.ip, self.port)
            await asyncio.sleep(1)
        return None

    async def stop(self, now=False):
        self.log.debug('Stopping unit {} on host {}.'.format(self.unit_name, self.ip))
        self.cedeLogger.log(record={'event': 'stop', 'target': self.ip, 'uid': self.cl_uid}, level=logging.INFO)
        self.balancer.stop_service(self.unit_name, self.ip)

    async def poll(self):
        host = self.balancer.service_running(self.unit_name, self.ip)
        if host:
            self.ip = host
            return None
        return 1

# EOF

If you see anything that can explain why progress messages are all sent at once at the end of the spawning process, please let me know :slight_smile:

Are you sure the methods on balancer like stop_service, service_running or not blocking? Could you share the code snippets of those methods?

I observe similar problem where events are flushed once the server is ready.
My setup: jupyterhub 4.0.2 + batchspawner (slurm)+ wrapspawner + nginx reverse proxy.
I can confirm that progress generator from BatchSpawner is getting called properly

While waiting in start() no progress is updated in the browser, shortly server is ready (start returns)
progress is flushed and visible for blink of the eye till next page with lab gets loaded.

From the browser point of view i see that GET request is waiting to be served but times out after 20 seconds with 503.

We have a very similar setup (batchspawner+wrapspawner) and it works pretty neat. This is our custom progress method. Hope that can help you!

Thank you for the tip and your code. Your custom progress seems very similar to default one and to ours.
Are you using nginx as reverse proxy or apache?
My debugging by printf methodology makes me suspect that socket flushing doesn’t work or we reverse proxy is misbehaving

Yes, we use nginx as reverse proxy. Are you overriding any method in batchspawner?

My little attempt to figure out how progress api works in JupyterHub led me to the following conclusions:

  • GET call to progress API will not return until finish_user_spawn returns (what means that start method in Spawner must finish providing port and ip for proxy configuration)
  • progress method in batchspawner returns only when job is in running state and GET handler for progress API call waits for it to return

So the simplest case: to inform user that job is scheduled and it’s waiting for start within X minutes
won’t be possible w/o modifying SpawnProgressAPIHandler.get() and reworking progress() generator on BatchSpawner side

We use Python 3.11 + jupytherhub 3.1.1 + nginx (reverse proxy) + ConfigurableHTTPProxy.
This combo’s been working flawlessly for years - minus the progress() stuff.

Here they are. We have the tinyest timeouts on ssh commands and state proxy request (1s) - those methods should not be blocking, as far as I understand.

    def start_transient_service(self, command, unit, host, home_directory):
        """
        Start a systemd transient service with given paramters
        """
        self._query_proxy('ASSIGN', unit, host)
        rc, out, err = self._run_command("/usr/bin/sudo /sbin/unfreeze_user {}; /usr/bin/sudo {}".format(home_directory, command), host, sudo = False)
        if rc == 0: return True
        self._query_proxy('REMOVE', unit, host)
        return False
        
        
    def service_running(self, unit, host = None):
        """ 
        Return True if service with given unit name is running (active).
        """ 
        return self._query_proxy('RUNNING', unit, host) or False
    
        
    def stop_service(self, unit, host = None):
        """
        Stop service with given unit name, on the provided host.
        """
        if host is None:
            host = self._query_proxy('RUNNING', unit)
        if host is None:
            return True 
        command = 'systemctl stop '+ unit
        rc, out, err = self._run_command(command, host, sudo = True)
        self._query_proxy('REMOVE', unit, host)
        return True

I assume that _run_command method uses subprocess under the hood and you can try swapping it to asyncio flavoured subprocess. You can look into batchspawner for an example on how to run subprocess calls asynchronously. And then await co-routines self.balancer.start_transient_service, self.balancer.stop_service and self.balancer.service_running. Maybe it is start_transient_service that is starting systemd container is blocking and not letting interpreter to emit progress events.

@nibheis If you want near real-time progress updates you need to override the progress() method to return events asynchronously. For example, you could read from an async queue, e.g.

or, as in BatchSpawner check for the spawners’s state and return an appropriate message

Hi @mahendrapaipuri,

I just checked : _run_command is not using subprocess. It just execute the ssh command (using paramiko) on the remote host and then returns (takes half a second in the debug logs).

Thanks, I tested that : it does not change the behavior, everything is still displayed at the end, right before redirecting the user to its server.

I managed to get some kind of progress (“Server requested” message repeating every one second) only when my progress() method is throwing an exception.

If you are using ssh to execute command, use asyncssh. The thing is not about how long it takes to execute the command. With asyncio, while awaiting for command to return (in your case start_transient_service), you are letting the interpreter to execute other co-routines (like progress).

Also like @manics mentioned, you must emit events for them to appear in the frontend. In the code you shared, there is no progress implementation. So, what exactly are you expecting to see in the frontend? Check batchspawner’s progress implementation which is a very good example.

1 Like

in class SpawnProgressAPIHandler(APIHandler) method get:

async def get(self, user_name, server_name=''):
add

       # progress finished, wait for spawn to actually resolve,
        # in case progress finished early
        # (ignore errors, which will be logged elsewhere)
+        return
         await asyncio.wait([spawn_future])

otherwise messages won’t be flushed to http response until start() method in spawner is completed