Dynamic Source for Image Puller

Hey there,

Is it possible to provide a dynamic source of images to the image puller besides images specified in the Helm values config?
In addition, is it possible to run it every X minutes since the dynamic source of images (e.g., database) grows over time?

Otherwise, I assume that a custom solution based on the image-awaiter image is the most suitable solution?

Best regards
Paul

The Z2JH image puller only supports static declarative configuration. For your use case I think writing a custom puller is the best solution.

The image-awaiter is only required to delay the creation of pods such as the hub and proxy until the images have already been pulled, so I don’t think you need it since you’re only using it to update images on a regular schedule.

In Z2JH the images are pulled by creating a daemonset that runs each image with a no-op command:

So, since the image-awaiter code creates a daemonset, you could copy and modify it to do the equivalent of the Helm chart, but dynamically.

1 Like

Thank you for providing further insights.

I ended up writing a custom script that queries the Docker images and dynamically creates a DaemonSet to pull the images on all nodes frequently.

1 Like

Could you share what that script looks like?

Sure! In my case, I have a PostgreSQL database with Docker images.
Every 15 minutes, I fetch the images and check if they have been pulled since the pod started.
If not, they are added to the list of init containers.
However, the script can be improved with proper error handling. For instance, if an image cannot be pulled (e.g., credentials are missing), the script breaks and does not skip the image.

package main

import (
	"context"
	"database/sql"
	"fmt"
	"os"
	"time"

	"slices"

	appsv1 "k8s.io/api/apps/v1"
	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/fields"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"

	log "github.com/sirupsen/logrus"

	_ "github.com/lib/pq"
)

// doesDaemonSetExists reports whether the "profile-puller-node" DaemonSet
// currently exists in Namespace. It lists DaemonSets filtered by name rather
// than using Get, so a missing object yields (false, nil) instead of an error.
func doesDaemonSetExists(clientset *kubernetes.Clientset) (bool, error) {
	selector := fields.OneTermEqualSelector("metadata.name", "profile-puller-node").String()

	list, err := clientset.AppsV1().DaemonSets(Namespace).List(context.Background(), metav1.ListOptions{
		FieldSelector: selector,
	})
	if err != nil {
		return false, err
	}

	return len(list.Items) == 1, nil
}

// areAllPodsRunning reports whether every pod labelled app=profile-puller-node
// in Namespace is in the Running phase. When no matching pods exist yet it
// returns (false, nil), since the DaemonSet's pods have not been scheduled.
func areAllPodsRunning(clientset *kubernetes.Clientset) (bool, error) {
	podList, err := clientset.CoreV1().Pods(Namespace).List(context.Background(), metav1.ListOptions{
		LabelSelector: "app=profile-puller-node",
	})
	if err != nil {
		return false, err
	}

	// No pods at all means the DaemonSet has not started its work yet.
	if len(podList.Items) == 0 {
		return false, nil
	}

	// Index instead of copying each Pod struct per iteration.
	for i := range podList.Items {
		if podList.Items[i].Status.Phase != v1.PodRunning {
			return false, nil
		}
	}

	return true, nil
}

// openDatabaseConnection opens and verifies a PostgreSQL connection pool using
// the JH_DB_USER, JH_DB_NAME, JH_DB_PASSWORD and JH_DB_HOST environment
// variables. TLS is disabled (sslmode=disable).
//
// On success the returned *sql.DB is ready to use. If the initial ping fails,
// the pool is closed before returning the error so no resources leak.
func openDatabaseConnection() (*sql.DB, error) {
	connStr := fmt.Sprintf("user=%s dbname=%s password=%s host=%s sslmode=disable",
		os.Getenv("JH_DB_USER"),
		os.Getenv("JH_DB_NAME"),
		os.Getenv("JH_DB_PASSWORD"),
		os.Getenv("JH_DB_HOST"))

	db, err := sql.Open("postgres", connStr)
	if err != nil {
		return nil, err
	}

	// sql.Open does not establish a connection; Ping validates reachability
	// and credentials up front.
	if err := db.Ping(); err != nil {
		// Bug fix: the original returned without closing, leaking the pool.
		db.Close()
		return nil, err
	}

	return db, nil
}

// queryProfiles returns the distinct docker_image values stored in the
// profiles table. The result is a non-nil (possibly empty) slice.
func queryProfiles(db *sql.DB) ([]string, error) {
	rows, err := db.Query(`SELECT DISTINCT docker_image FROM profiles`)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	images := make([]string, 0)
	for rows.Next() {
		var image string
		if err := rows.Scan(&image); err != nil {
			return nil, err
		}
		images = append(images, image)
	}

	// rows.Err surfaces any error that terminated the iteration early.
	if err := rows.Err(); err != nil {
		return nil, err
	}

	return images, nil
}

const Namespace = "<Kubernetes namespace>"

// main runs the profile-puller loop: every 15 minutes it queries the profiles
// database for Docker images and, for any image not seen before in this
// process, creates a DaemonSet whose init containers pull the images on every
// node. Once all DaemonSet pods are Running (i.e. all init containers have
// finished, so all images are pulled), the DaemonSet is deleted again.
func main() {
	log.Infoln("Starting profile puller")

	// Connect to cluster using the pod's in-cluster service account.
	config, err := rest.InClusterConfig()
	if err != nil {
		log.Fatalf("Could not get in-cluster config: %v", err)
	}
	clientset, err := kubernetes.NewForConfig(config)
	if err != nil {
		log.Fatalf("Could not connect to Kubernetes API: %v", err)
	}

	// "Cache" already pulled images so each image is only pulled once per
	// process lifetime.
	imageCache := []string{}

	// Query images
	db, err := openDatabaseConnection()
	if err != nil {
		log.Fatalf("Failed to connect to database: %v", err)
	}
	// Bug fix: the original never closed the database pool.
	defer db.Close()

	for {
		time.Sleep(15 * time.Minute)

		// Check if image puller is still running from the previous cycle.
		running, err := doesDaemonSetExists(clientset)
		if err != nil {
			// Bug fix: the original used log.Fatalf here, which exits the
			// process and made the following continue unreachable — a
			// transient API error must not kill the puller.
			log.Errorf("Failed to check list of DaemonSets: %v", err)
			continue
		}
		if running {
			log.Warnln("Image puller is still pulling images from previous job")
			continue
		}

		queriedDockerImages, err := queryProfiles(db)
		if err != nil {
			log.Errorf("Failed to query docker images: %v", err)
			continue
		}

		// Filter out images already pulled in an earlier cycle.
		dockerImages := []string{}
		for _, image := range queriedDockerImages {
			if !slices.Contains(imageCache, image) {
				dockerImages = append(dockerImages, image)
				imageCache = append(imageCache, image)
			}
		}

		// Remove duplicates (defensive; the query already uses DISTINCT).
		slices.Sort(dockerImages)
		dockerImages = slices.Compact(dockerImages)

		if len(dockerImages) == 0 {
			log.Infoln("Profiles contain only already pulled Docker images.")
			continue
		}

		log.Infof("Found a total of %d uncached Docker images to pull:", len(dockerImages))
		for _, image := range dockerImages {
			log.Infof("  - %s", image)
		}

		// Build one init container per image: each merely pulls its image
		// and runs a no-op command.
		initContainers := make([]v1.Container, 0, len(dockerImages))
		for i, image := range dockerImages {
			initContainers = append(initContainers, v1.Container{
				Name:            fmt.Sprintf("profile-puller-%d", i),
				Image:           image,
				ImagePullPolicy: v1.PullIfNotPresent,
				Command:         []string{"echo", "Hello World"},
			})
		}

		daemonSet := appsv1.DaemonSet{
			ObjectMeta: metav1.ObjectMeta{Name: "profile-puller-node"},
			Spec: appsv1.DaemonSetSpec{
				Selector: &metav1.LabelSelector{MatchLabels: map[string]string{
					"name": "profile-puller-node",
					"app":  "profile-puller-node",
				}},
				Template: v1.PodTemplateSpec{
					ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{
						"name": "profile-puller-node",
						"app":  "profile-puller-node",
					}},
					Spec: v1.PodSpec{
						InitContainers: initContainers,
						Containers: []v1.Container{
							// The pause container keeps the pod Running after
							// the init containers (the pulls) complete.
							{Name: "pause", Image: "gcr.io/google_containers/pause:3.2"},
						},
					},
				},
			},
		}
		_, err = clientset.AppsV1().DaemonSets(Namespace).Create(context.Background(), &daemonSet, metav1.CreateOptions{})
		if err != nil {
			log.Errorf("Could not create DaemonSet: %v", err)
			continue
		}

		// Poll until every DaemonSet pod is Running, then delete the
		// DaemonSet.
		// NOTE(review): if an image can never be pulled (e.g. missing
		// credentials) this loop never terminates — consider a deadline.
		interval := 30 * time.Second

		for {
			allRunning, err := areAllPodsRunning(clientset)
			if err != nil {
				log.Errorf("Could not get pod status: %v", err)
				// Bug fix: the original continued immediately, busy-looping
				// against the API server on persistent errors.
				time.Sleep(interval)
				continue
			}

			if !allRunning {
				log.Infof("Not all pods are running for DaemonSet %s. Checking again in %v...", "profile-puller-node", interval)
				time.Sleep(interval)
				continue
			}

			log.Infof("All pods for DaemonSet %s are running. Proceeding to delete.", "profile-puller-node")
			if err := clientset.AppsV1().DaemonSets(Namespace).Delete(context.Background(), "profile-puller-node", metav1.DeleteOptions{}); err != nil {
				log.Errorf("Failed to delete DaemonSet: %v", err)
				// Bug fix: pause before retrying the delete instead of
				// busy-looping.
				time.Sleep(interval)
				continue
			}

			log.Infof("DaemonSet %s deleted successfully.", "profile-puller-node")
			break
		}
	}
}

Note that the deployment of the script as a pod requires a dedicated service account with the following role:

apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: profile-puller
rules:
  - apiGroups: [""]
    resources: ["pods"]
    verbs: ["get", "list"]
  - apiGroups: ["apps"]
    resources: ["daemonsets"]
    verbs: ["get", "watch", "list", "create", "delete"]

Please let me know if you have further questions!

2 Likes

I ended up using keel (GitHub - keel-hq/keel: Kubernetes Operator to automate Helm, DaemonSet, StatefulSet & Deployment updates) to update the images of the image puller.