3 Ways to Time Kubernetes Job Duration for Better DevOps¶
Knowing how long the exucution of jobs take is a crucial part of monitoring and proactive system administration.
Being able to measure, store and query this value over the course of your application lifecycle can help you identify bottlenecks, optimize your infrastructure and improve the overall performance of your application.
In this blog post, you are presented with three methods to achive this, starting from one where you have the access and ability to modify the source code, to the one where you have control over its runtime execution, and finally without control on either & using only the Kube State Metrics.
Read more to find out how.
Introduction¶
Being able to view the historic values of the execution time of your one-off jobs is crucial if you want to know when, what and how did things change.
It'll help you identify and nail down the moment where things got off-track.
This will enable you to troubleshoot and debug faster and in proactive manner before things get ugly.
There are three approaches that come to mind to measure and store this value, all of which will require the VictoriaMetrics1 or the Prometheus2 ecosystem.
Method 1: Modify the Source Code¶
Let's start with the one where we have access to modify the source code of the application we want monitored.
A simple example will look like the following (provided in Golang & Python)
package main
import (
"fmt"
"io"
"net/http"
"os"
"strings"
"time"
)
var (
PUSHGATEWAY_URL = os.Getenv("PUSHGATEWAY_URL")
JOB_NAME = os.Getenv("JOB_NAME")
INSTANCE_NAME = os.Getenv("INSTANCE_NAME")
)
// This is the main one-off task we want to execute.
//
// The other helper functions in this file are just measuring the duration
// of this task.
func theMainJob() {
time.Sleep(1 * time.Second)
}
func getHostname() string {
hostname, err := os.Hostname()
if err != nil {
fmt.Println("Error getting hostname:", err)
return "unknown"
}
return hostname
}
func initDefaults() {
if PUSHGATEWAY_URL == "" {
PUSHGATEWAY_URL = "http://localhost:9091"
}
if JOB_NAME == "" {
JOB_NAME = "my_job"
}
if INSTANCE_NAME == "" {
INSTANCE_NAME = getHostname()
}
}
func main() {
initDefaults()
fmt.Println("Starting the job...")
start := time.Now()
theMainJob()
duration := time.Since(start).Seconds()
fmt.Println("Job execution finished, push the metrics...")
pushURL := fmt.Sprintf("%s/metrics/job/%s/instance/%s/lang/go", PUSHGATEWAY_URL, JOB_NAME, INSTANCE_NAME)
fmt.Println("Pushing to Pushgateway:", pushURL)
req, err := http.NewRequest("POST", pushURL, nil)
if err != nil {
fmt.Println("Error creating request:", err)
return
}
payload := fmt.Sprintf(`
# TYPE job_duration_seconds gauge
# HELP job_duration_seconds Duration of the job in seconds
job_duration_seconds %s
`, fmt.Sprintf("%f", duration))
httpPayload := strings.NewReader(payload)
client := &http.Client{}
req.Body = io.NopCloser(httpPayload)
resp, err := client.Do(req)
if err != nil {
fmt.Println("Error pushing to Pushgateway:", err)
return
}
defer resp.Body.Close()
fmt.Println("Execution time pushed successfully:", duration)
}
import http.client
import os
import socket
import time
import urllib.parse
def get_hostname():
try:
return socket.gethostname()
except:
return "unknown"
def init_defaults():
global PUSHGATEWAY_URL, JOB_NAME, INSTANCE_NAME
PUSHGATEWAY_URL = os.getenv("PUSHGATEWAY_URL", "http://localhost:9091")
JOB_NAME = os.getenv("JOB_NAME", "my_job")
INSTANCE_NAME = os.getenv("INSTANCE_NAME", get_hostname())
def the_main_job():
time.sleep(1)
def main():
init_defaults()
print("Starting the job...")
start_time = time.time()
the_main_job()
duration = time.time() - start_time
print("Job execution finished, push the metrics...")
url = urllib.parse.urlparse(PUSHGATEWAY_URL)
push_path = f"/metrics/job/{JOB_NAME}/instance/{INSTANCE_NAME}/lang/python"
payload = f"""
# TYPE job_duration_seconds gauge
# HELP job_duration_seconds Duration of the job in seconds
job_duration_seconds {duration}
"""
try:
if url.scheme == "https":
conn = http.client.HTTPSConnection(url.netloc)
else:
conn = http.client.HTTPConnection(url.netloc)
headers = {"Content-Type": "text/plain"}
conn.request("POST", push_path, payload, headers)
response = conn.getresponse()
if response.status < 400:
print(f"Execution time pushed successfully: {duration}")
else:
print(f"Error pushing metrics: {response.status} {response.reason}")
except Exception as e:
print(f"Error pushing to Pushgateway: {e}")
finally:
conn.close()
if __name__ == "__main__":
main()
Now, I'm sure both of these codes could be simplified by orders of magnitude, using decorator/middleware. But the point is only to provide an example.
We want to be able to measure the time of our main task, and report its result to Prometheus Pushgateway.
How to run any of these in a Kubernetes cluster?
Simply provide the container running these apps with the right URL to pushgateway:
# ... truncated ...
- name: my-container
image: my-repo/my-image
env:
- name: PUSHGATEWAY_URL
value: http://pushgateway.monitoring.svc.cluster.local:9091
# ... truncated ...
And the result will look like the following.

Method 2: Modify the Runtime Execution¶
Now, we've seen how to modify the code and how to report the duration when having access to modify the code. But, what if we were not able to do that?
What if we're on a different team or a different department and only have access to how the container is running in the production environment?
That's where Prometheus Command Timer3 comes in.
It aims to be a single binary with all the batteries included to be able to measure the execution time of a command and report its duration to Prometheus Pushgateway.
The way it works and the way it was designed was pretty simple and straightforward actually:
We want to be able to run any command within a container of a pod, measure its duration without touching the code of the upstream, and report the duration to Prometheus Pushgateway.
The idea is to run the command timer as a CLI wrapper before the main one-off task.
The same way you'd run, say, dumb-init
4, gosu
5 or tini
6.
Let's see it in example first before showing you the code.
Imagine running the following Kubernetes Job:
---
apiVersion: batch/v1
kind: Job
metadata:
name: sleep
spec:
template:
spec:
containers:
- args:
- sh
- -c
- |
sleep 10
image: busybox:1
name: sleep
restartPolicy: OnFailure
Now, if I cannot modify the upstream code in the sh
or sleep
command above, the best I can do is to run those commands within another command.
This will only be a tiny wrapper that does nothing else but to track the execution time of the child command.
Let's see how it works:
---
apiVersion: batch/v1
kind: Job
metadata:
name: sleep
spec:
template:
spec:
containers:
- command:
- prometheus-command-timer
- --pushgateway-url=http://pushgateway.monitoring.svc.cluster.local:9091
- --job-name=sleep
- --
args:
- sh
- -c
- |
sleep 10
image: busybox:1
name: sleep
volumeMounts:
- mountPath: /usr/local/bin/prometheus-command-timer
name: shared
subPath: prometheus-command-timer
initContainers:
- args:
- --directory
- /shared
image: ghcr.io/meysam81/prometheus-command-timer:v1
name: install-prometheus-command-timer
volumeMounts:
- mountPath: /shared
name: shared
restartPolicy: OnFailure
volumes:
- emptyDir: {}
name: shared
This is beautiful. This time around, unlike the last one, I didn't have to change the code in the upstream application.
All I had to do was to install the required binary into my container, run the timer command as a wrapper before the job command, and once again, I will have those numbers in my Prometheus Pushgateway.
Now, for those of you who are interested in the underlying code , here's the minimal Golang application that powers this using only the standard library.
Prometheus Command Timer
package main
import (
"flag"
"fmt"
"net/http"
"os"
"os/exec"
"runtime/debug"
"strings"
"syscall"
"time"
)
type Config struct {
PushgatewayURL string
JobName string
InstanceName string
Labels string
Debug bool
Version bool
Info bool
}
func main() {
config := Config{
InstanceName: getHostname(),
Info: true,
}
flag.StringVar(&config.PushgatewayURL, "pushgateway-url", "", "Pushgateway URL (required)")
flag.StringVar(&config.JobName, "job-name", "", "Job name for metrics (required)")
flag.StringVar(&config.InstanceName, "instance-name", config.InstanceName, "Instance name for metrics")
flag.StringVar(&config.Labels, "labels", "", "Additional labels in key=value format, comma-separated (e.g., env=prod,team=infra)")
flag.BoolVar(&config.Version, "version", false, "Output version")
showHelp := flag.Bool("help", false, "Show help message")
flag.BoolVar(showHelp, "h", false, "Show help message (shorthand)")
flag.Usage = func() {
fmt.Fprintf(os.Stderr, "Usage: %s [OPTIONS] -- COMMAND [ARGS...]\n\n", os.Args[0])
fmt.Fprintf(os.Stderr, "Executes a command and reports its duration to a Prometheus Pushgateway.\n\n")
fmt.Fprintf(os.Stderr, "Options:\n")
flag.PrintDefaults()
fmt.Fprintf(os.Stderr, "\nExample:\n")
fmt.Fprintf(os.Stderr, " %s \\\n", os.Args[0])
fmt.Fprintf(os.Stderr, " --pushgateway-url http://pushgateway:9091 \\\n")
fmt.Fprintf(os.Stderr, " --job-name backup --instance-name db01 \\\n")
fmt.Fprintf(os.Stderr, " --labels env=prod,team=infra,type=full \\\n")
fmt.Fprintf(os.Stderr, " -- \\\n")
fmt.Fprintf(os.Stderr, " pg_dump database\n\n")
fmt.Fprintf(os.Stderr, "Note: Use -- to separate the wrapper options from the command to be executed.\n")
}
argsIndex := -1
for i, arg := range os.Args {
if arg == "--" {
argsIndex = i
break
}
}
var cmdArgs []string
if argsIndex != -1 {
flag.CommandLine.Parse(os.Args[1:argsIndex])
if argsIndex+1 < len(os.Args) {
cmdArgs = os.Args[argsIndex+1:]
}
} else {
flag.Parse()
cmdArgs = flag.Args()
}
if *showHelp {
flag.Usage()
os.Exit(0)
}
if config.Version {
buildInfo, _ := debug.ReadBuildInfo()
fmt.Println("Version:", buildInfo.Main.Version)
fmt.Println("Go Version:", buildInfo.GoVersion)
os.Exit(0)
}
if config.PushgatewayURL == "" || config.JobName == "" {
fmt.Fprintln(os.Stderr, "Error: Missing required parameters")
flag.Usage()
os.Exit(1)
}
if len(cmdArgs) == 0 {
fmt.Fprintln(os.Stderr, "Error: No command specified")
flag.Usage()
os.Exit(1)
}
logStdout(&config, "Pushgateway URL: %s", config.PushgatewayURL)
logStdout(&config, "Job name: %s", config.JobName)
logStdout(&config, "Instance name: %s", config.InstanceName)
logStdout(&config, "Labels: %s", config.Labels)
exitCode := executeCommand(&config, cmdArgs)
os.Exit(exitCode)
}
func executeCommand(config *Config, cmdArgs []string) int {
startTime := time.Now().Unix()
cmd := exec.Command(cmdArgs[0], cmdArgs[1:]...)
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
cmd.Stdin = os.Stdin
err := cmd.Run()
exitStatus := 0
if err != nil {
if exitErr, ok := err.(*exec.ExitError); ok {
if status, ok := exitErr.Sys().(syscall.WaitStatus); ok {
exitStatus = status.ExitStatus()
}
} else {
exitStatus = 1
}
}
endTime := time.Now().Unix()
duration := endTime - startTime
sendMetric(config, "job_duration_seconds", fmt.Sprintf("%d", duration), "gauge", "Total time taken for job execution in seconds")
sendMetric(config, "job_exit_status", fmt.Sprintf("%d", exitStatus), "gauge", "Exit status code of the last job execution (0=success)")
sendMetric(config, "job_last_execution_timestamp", fmt.Sprintf("%d", endTime), "gauge", "Timestamp of the last job execution")
sendMetric(config, "job_executions_total", "1", "counter", "Total number of job executions")
return exitStatus
}
func buildPushgatewayURL(config *Config) (string, error) {
url := fmt.Sprintf("%s/metrics/job/%s/instance/%s",
strings.TrimSuffix(config.PushgatewayURL, "/"),
config.JobName,
config.InstanceName)
if config.Labels != "" {
labels := strings.Split(config.Labels, ",")
for _, label := range labels {
parts := strings.SplitN(label, "=", 2)
if len(parts) != 2 {
return "", fmt.Errorf("invalid label format: %s (must be key=value)", label)
}
key := parts[0]
value := parts[1]
if !isValidLabelKey(key) {
return "", fmt.Errorf("invalid label key: %s (must start with a letter or underscore)", key)
}
url = fmt.Sprintf("%s/%s/%s", url, key, value)
}
}
return url, nil
}
func isValidLabelKey(key string) bool {
if len(key) == 0 {
return false
}
first := key[0]
if !((first >= 'a' && first <= 'z') || (first >= 'A' && first <= 'Z') || first == '_') {
return false
}
return true
}
func sendMetric(config *Config, metricName, value, metricType, helpText string) {
url, err := buildPushgatewayURL(config)
if err != nil {
fmt.Fprintf(os.Stderr, "Error: %v\n", err)
return
}
logStdout(config, "Sending metric: %s=%s (%s)", metricName, value, metricType)
payload := fmt.Sprintf("# TYPE %s %s\n# HELP %s %s\n%s %s\n",
metricName, metricType, metricName, helpText, metricName, value)
req, err := http.NewRequest("POST", url, strings.NewReader(payload))
if err != nil {
fmt.Fprintf(os.Stderr, "Error: Failed to create HTTP request: %v\n", err)
return
}
client := &http.Client{}
resp, err := client.Do(req)
if err != nil {
fmt.Fprintf(os.Stderr, "Error: Failed to send metrics to Pushgateway: %v\n", err)
return
}
defer resp.Body.Close()
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
fmt.Fprintf(os.Stderr, "Error: Failed to send metrics to Pushgateway (HTTP status %d)\n", resp.StatusCode)
}
}
func getHostname() string {
hostname, err := os.Hostname()
if err != nil {
return "unknown"
}
return hostname
}
func logStdout(config *Config, format string, args ...interface{}) {
if config.Info {
timestamp := time.Now().Format("2006-01-02T15:04:05-0700")
fmt.Printf("[%s]: %s\n", timestamp, fmt.Sprintf(format, args...))
}
}
Method 3: Using Kube State Metrics¶
This approach feels more intuitive for observability people who are dealing with Prometheus and PromQL7 in their day to day job8.
It eases the pain of having to change anything at all other than having either the VictoriaMetrics or the Prometheus Kubernetes stack installed in your cluster.
We can measure the execution time of a Kubernetes Job in the following PromQL query:
Measured as the total minutes of the execution time of a job.
We can further enhance this by having it as a recording rule9 to ensure we don't overload the query engine.
---
groups:
- name: recording-rules
rules:
- record: job_execution_minutes_total
expr: >-
(
kube_job_status_completion_time -
kube_job_status_start_time
) / 60
If you want an online playground, you can try the VictoriaMetrics playground10.
For example, here's the URL to the query above:
Now, we can improve this query further by looking for execution times of CronJobs when there is a parent resource.
To do that, we'll leverage the kube_job_owner
metric.
(
kube_job_status_completion_time -
kube_job_status_start_time
) * on(job_name, namespace)
group_left(owner_name)
kube_job_owner{owner_kind="CronJob"}
And to add it to our recording rules:
---
groups:
- name: recording-rules
rules:
- record: job_execution_minutes_total
expr: >-
(
kube_job_status_completion_time -
kube_job_status_start_time
) / 60
- record: cronjob_execution_minutes_total
expr: >-
(
kube_job_status_completion_time -
kube_job_status_start_time
) * on(job_name, namespace)
group_left(owner_name)
kube_job_owner{owner_kind="CronJob"}
Conclusion¶
This blog post came to being as a result of me trying to find a way to measure the execution time of Kubernetes cronjobs and being able to query their historic values.
I was looking for a way to be able to understand how the execution times have changed and be able to identify the bottlenecks and the slowest jobs.
The three methods provided here have all been sparsely used here and there dependending on how much control I've had over the application.
If you happen to have a better way to persist those metrics, please do let me know using the comments below.
Until next time , ciao
& happy coding!
Subscribe to Newsletter Subscribe to RSS Feed
Share on Share on Share on Share on
-
https://artifacthub.io/packages/helm/victoriametrics/victoria-metrics-k8s-stack/0.38.0 ↩
-
https://artifacthub.io/packages/helm/prometheus-community/kube-prometheus-stack/69.6.0 ↩
-
https://prometheus.io/docs/prometheus/latest/querying/basics/ ↩
-
https://docs.victoriametrics.com/operator/resources/vmrule/#recording-rule ↩