The Jobs API
We have been introduced to the concept of concurrency: a method for managing resources such that multiple agents or components of the system can be in progress at the same time without impacting the correctness of the system. We have also discussed the utility of asynchronicity: an approach in concurrency wherein we can schedule a task, receive an immediate response, and continue on to other tasks while the previous task works in the background. The tools we will use to achieve this in the software systems we are building include worker containers, a messaging system, a task queue, and now the Jobs API.
The basic idea is that we will have a new endpoint in our API at a path /jobs (or something similar). A user wanting
to have our system perform a long-running task will create a new job by making an HTTP POST request to /jobs,
describing the job in the POST message body (in JSON). Instead of performing the actual computation, the request will
simply be recorded in Redis and a response will be immediately provided to the user. The response will not include the
result of the job itself, but instead it will indicate that the request has been received and it will be worked on once
it gets to the top of the queue. Also, critically, the response will include an id for the job so that the user can
check the status and, eventually, get the actual result. The Jobs API is a Python module that we will write which includes
methods and tools for managing jobs in our software system.
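The request/response contract above can be sketched in a few lines of plain Python. This is not the Jobs API itself. It is a minimal in-memory sketch that assumes a dict in place of Redis and a list in place of the queue, and the names submit_job and get_job_status are hypothetical. Note that the "response" carries the job id and a status, never the result.

```python
import json
import uuid

# Hypothetical in-memory stand-ins for the Redis job store and the task queue.
job_store: dict[str, dict] = {}
job_queue: list[str] = []


def submit_job(body: str) -> dict:
    """Record a job described by a JSON body and respond immediately.

    No computation happens here: the job is only saved and queued.
    """
    description = json.loads(body)
    jid = str(uuid.uuid4())
    # Spread the description first so it cannot overwrite jid or status.
    job_store[jid] = {**description, "jid": jid, "status": "QUEUED"}
    job_queue.append(jid)
    # The immediate response includes the id, not the result.
    return {"jid": jid, "status": "QUEUED"}


def get_job_status(jid: str) -> str:
    """Let the user check on a job later, using the id from the response."""
    return job_store[jid]["status"]


response = submit_job('{"start": 1, "end": 2}')
print(response["status"])               # QUEUED
print(get_job_status(response["jid"]))  # QUEUED
```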
By the end of this module, students should be able to:
Explain the purpose and reasoning behind all variables and methods in the Jobs API
Decide which variables and methods should be private and which should be public
Organize code for a software system into API, worker, and jobs modules
Import the Jobs API into other modules to use for jobs functionality
Perform appropriate curl requests to POST jobs and GET the result of jobs
Design Principles
The implementation of our Jobs API, composed of multiple FastAPI routes, a task queue persisted in Redis, and a worker program, will demonstrate the use of modularity and encapsulation in software design.
Concurrency in the Jobs API
Recall that our big-picture goal is to add a Jobs endpoint to our FastAPI system that can process long-running tasks. We will implement our Jobs API with concurrency in mind.
The overall architecture will thus be:
1. Save the request in a database and respond to the user that the analysis will eventually be run.
2. Give the user a unique identifier with which they can check the status of their job and fetch the results when they are ready.
3. Queue the job to run so that a worker can pick it up and run it.
4. Build the worker to actually work the job.
Parts 1-3 are the tasks of the FastAPI server, while part 4 will be a worker, running as a separate container, that is waiting for new items in the Redis queue.
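The split between the API server (parts 1-3) and the worker (part 4) is a producer/consumer pattern. The sketch below reproduces it inside a single Python process, assuming queue.Queue as a stand-in for the Redis queue, a dict as a stand-in for the Redis job database, and a background thread as a stand-in for the worker container. In the real system these are separate containers that communicate only through Redis. The function names and the "sum from start to end" work are illustrative assumptions.

```python
import queue
import threading
import time
import uuid

# Stand-ins (assumptions): a dict for the Redis job database,
# queue.Queue for the Redis-backed task queue.
jobs: dict[str, dict] = {}
task_queue: queue.Queue = queue.Queue()
lock = threading.Lock()


def api_create_job(start: int, end: int) -> str:
    """Parts 1-3: save the job, queue it, and hand back an id right away."""
    jid = str(uuid.uuid4())
    with lock:
        jobs[jid] = {"status": "QUEUED", "start": start, "end": end, "result": None}
    task_queue.put(jid)
    return jid  # returned immediately; no work has been done yet


def worker_loop() -> None:
    """Part 4: block on the queue and work each job as it arrives."""
    while True:
        jid = task_queue.get()
        if jid is None:  # sentinel used only to stop this demo
            break
        with lock:
            jobs[jid]["status"] = "RUNNING"
            start, end = jobs[jid]["start"], jobs[jid]["end"]
        time.sleep(0.1)  # simulate a long-running task
        result = sum(range(start, end + 1))  # end treated as inclusive (assumption)
        with lock:
            jobs[jid].update(status="SUCCESS", result=result)
        task_queue.task_done()


worker = threading.Thread(target=worker_loop, daemon=True)
worker.start()

jid = api_create_job(1, 10)
print(jobs[jid]["status"])  # most likely QUEUED or RUNNING at this point
task_queue.join()           # wait until the worker has processed every queued job
print(jobs[jid]["status"], jobs[jid]["result"])  # SUCCESS 55
task_queue.put(None)        # stop the worker thread
worker.join()
```

The API side never waits for the computation; only the explicit task_queue.join() call in this demo does, so that the final status can be printed.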
Strong Types Using Pydantic
Our first order of business is to define the data model(s) (i.e., types) that we will be working with in our application. The primary object here is a job, and we want to model it with a Pydantic model so that we have control over the kinds of job objects that we work with.
Our job is going to have an id field and then one or more fields describing the work to be done. In this simplified example, we will assume just two description fields, a start and an end, which will both be integers. The work will simply print out the numbers between start and end, but a real job would have more parameters.
What else do we need for our job data model? It will be useful to have some additional “bookkeeping”
fields, such as the job’s current status, the start time and the end time of the job. We will
use a UUID for the job’s id, stored as a str, and int for the start and end parameters.
Here is an initial version:
from pydantic import BaseModel


class Job(BaseModel):
    jid: str
    start: int
    end: int
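To see what the model buys us, here is a short sketch, assuming Pydantic v2 (the version whose model_dump method is used in jobs.py later in this module). Pydantic coerces compatible input, such as the numeric string "5", into the declared type and rejects input that cannot be interpreted that way.

```python
from pydantic import BaseModel, ValidationError


class Job(BaseModel):
    jid: str
    start: int
    end: int


# Compatible input is accepted: the numeric string "5" is coerced to the int 5
job = Job(jid="abc123", start=1, end="5")
print(job.end + 1)  # 6

# Input that cannot be interpreted as an int is rejected with a ValidationError
try:
    Job(jid="abc123", start=1, end="five")
except ValidationError as err:
    print(err.error_count())  # 1
```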
Optional Fields
Let’s add the start time and end time for the job. We can use the datetime type as the
primary object type for each, but we need to think about whether we will always know those
values for a given job. When a user submits a job and we add it to the queue, will we know
when it will actually start running? It depends on how many other jobs are ahead of it in the
queue, how long those jobs will take, and how many compute resources we have. Similarly, we
won’t know when the job will end until it actually ends.
The job’s start_time and end_time are examples of fields that will not always be available
on the job data model. We denote such fields as “optional” by annotating them with typing.Optional,
which allows the value None, and providing a default value. In this case, the default will just be None.
Adding these fields yields the following model:
import typing
from datetime import datetime

from pydantic import BaseModel


class Job(BaseModel):
    jid: str
    start: int
    end: int
    start_time: typing.Optional[datetime] = None
    end_time: typing.Optional[datetime] = None
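A quick check of the optional fields, again assuming Pydantic v2: a job can be created without either timestamp, a timestamp can be filled in later, and model_dump(mode="json"), the call jobs.py uses before writing to Redis, turns the datetime into an ISO 8601 string. The fixed datetime below is only example data.

```python
import typing
from datetime import datetime

from pydantic import BaseModel


class Job(BaseModel):
    jid: str
    start: int
    end: int
    start_time: typing.Optional[datetime] = None
    end_time: typing.Optional[datetime] = None


# When the job is first submitted, neither time is known yet
job = Job(jid="abc123", start=1, end=5)
print(job.start_time)  # None

# Later, a worker can fill in the start time (example value)
job.start_time = datetime(2024, 1, 1, 12, 0, 0)

# mode="json" converts the datetime to a string, so the result can go through json.dumps
print(job.model_dump(mode="json"))
```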
Enumerations for Types with a Fixed Set of Values
The bool type is a special data type with two values: True and False. You could
use a string to represent such a value, i.e., have "True" and "False", but it is not
as convenient and could lead to errors: what happens if someone uses "true" or "TRUE"?
By specifying that the only allowable values are True and False we are able to simplify
our code and more easily ensure the proper values are used.
Conceptually, the bool type is a special case of an enumeration, that is, a type with a fixed set of
values. The job’s status can be modeled with such a type: no matter what kind of job the user submits or
what happens to it during processing, we can specify (or enumerate) all of the possible statuses
that it might encounter. For example, the job will start in “queued” status when it is initially
added to the queue. Then it will go to “running” status once a worker picks it up. And eventually,
it will be finished, either as an “error” or a “success”. So we could say those are the four
possible statuses for our jobs:
queued
running
error (terminal state)
success (terminal state)
In practice, we may want to add additional statuses to track, but for demonstration purposes
we will keep it simple with the above four. To define an enumeration in Python, we use the
Enum class from the enum module and specify the members it should have. For each member, we
give both a human-readable name and a unique value, such as an integer, to bind it to.
It is customary to use upper-case letters for the member names, as below.
from enum import Enum


class JobStatus(int, Enum):
    QUEUED = 1
    RUNNING = 2
    ERROR = 3
    SUCCESS = 4
Instead of specifying the integer values (i.e., 1, 2, 3, 4) we could use the auto() function
from the enum module.
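Here is the same enumeration written with auto(), which numbers the members 1, 2, 3, 4 in definition order. The sketch also shows looking a member up from a stored value, which rejects unknown values, and a hypothetical TERMINAL_STATES set that captures the "terminal state" idea from the list of statuses.

```python
from enum import Enum, auto


class JobStatus(int, Enum):
    QUEUED = auto()
    RUNNING = auto()
    ERROR = auto()
    SUCCESS = auto()


# auto() produces the same values as the explicit version
print([status.value for status in JobStatus])  # [1, 2, 3, 4]

# Look up a member from its stored value; unknown values raise ValueError
print(JobStatus(2).name)  # RUNNING
try:
    JobStatus(7)
except ValueError:
    print("7 is not a valid JobStatus")

# Hypothetical helper: a job in one of these states will not change again
TERMINAL_STATES = {JobStatus.ERROR, JobStatus.SUCCESS}
print(JobStatus.SUCCESS in TERMINAL_STATES)  # True
```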
Code Organization
As software systems get larger, it is very important to keep code organized so that finding the functions, classes, etc. responsible for different behaviors is as easy as possible. To some extent, this is technology-specific, as different languages, frameworks, etc., have different rules and conventions about code organization. We’ll focus on Python, since that is what we are using.
The basic unit of code organization in Python is called a “module”. This is just a Python source file (ends in a .py
extension) with variables, functions, classes, etc., defined in it. We’ve already used a number of modules, including
modules that are part of the Python standard library (e.g. json) and modules that are part of third-party libraries
(e.g., redis).
The following should be kept in mind when designing the modules of a larger system:
Modules should be focused, with specific tasks or functionality in mind, and their names (preferably, short) should match their focus.
Modules are also the most typical entry point for the Python interpreter itself (e.g.,
python some_module.py).
Accessing code from external modules is accomplished through the import statement.
Circular imports can cause errors: if module A imports an object from module B, module B cannot also import an object from module A, because module A has not finished loading when module B runs.
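The circular-import rule is easy to trip over. The sketch below writes two throwaway modules (the names circ_mod_a and circ_mod_b are hypothetical) to a temporary directory, where each uses from ... import to pull a function out of the other, and captures the ImportError Python raises when the first one is imported.

```python
import importlib
import pathlib
import sys
import tempfile

# Two hypothetical modules that import objects from each other
MOD_A = "from circ_mod_b import helper_b\n\ndef helper_a():\n    return 'a'\n"
MOD_B = "from circ_mod_a import helper_a\n\ndef helper_b():\n    return 'b'\n"


def try_circular_import() -> str:
    """Import circ_mod_a and return the error message Python raises."""
    with tempfile.TemporaryDirectory() as tmp:
        pathlib.Path(tmp, "circ_mod_a.py").write_text(MOD_A)
        pathlib.Path(tmp, "circ_mod_b.py").write_text(MOD_B)
        sys.path.insert(0, tmp)
        importlib.invalidate_caches()  # make sure the new files are found
        try:
            importlib.import_module("circ_mod_a")
        except ImportError as err:
            return str(err)
        finally:
            # Clean up so the demo leaves no trace in this interpreter
            sys.path.remove(tmp)
            sys.modules.pop("circ_mod_a", None)
            sys.modules.pop("circ_mod_b", None)
    return "no error"


print(try_circular_import())
```

circ_mod_b asks for helper_a while circ_mod_a is still only partially initialized, so the name does not exist yet. Moving shared code into a third module that both import breaks such a cycle, which is the role jobs.py plays in the module design for this system.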
Module Design
The Python standard library is a good source of examples of module design. You can browse the standard library for Python 3.14 in the official Python documentation.
We see the Python standard library has modules focused on a variety of computing tasks, for example:
For working with different data types: e.g., the datetime module and the array module. Their descriptions are succinct:
The datetime module: “The datetime module supplies classes for manipulating dates and times.”
The array module: “This module defines an object type which can compactly represent an array of basic values: characters, integers, floating point numbers.”
For working with various file formats: e.g., csv, configparser
For working with concurrency: e.g., threading, multiprocessing
With this in mind, a first approach might be to break up our system into two modules:
api.py - this module contains the FastAPI web server.
worker.py - this module contains the code to execute jobs.
However, both the API server and the workers will need to interact with the database and the queue:
The API will create new jobs in the database, put new jobs onto the queue, and retrieve the status of jobs (and probably the output products of the job).
The worker will pull jobs off the queue, retrieve jobs from the database, and update them.
This suggests a different structure:
api.py - this module contains the FastAPI web server.
worker.py - this module contains the code to execute jobs.
jobs.py - this module contains core functionality for working with jobs in Redis (and on the queue).
Common code for working with Redis/HotQueue can go in the jobs.py module and be imported in both api.py and worker.py.
Note
High-quality modular design is a crucial aspect of building good software. It requires significant thought and experience to do correctly, and when done poorly it can have dire consequences. In the best case, poor module design can make the software difficult to maintain/upgrade; in the worst case, it can prevent it from running correctly at all.
We can sketch out our module design by making a list of the functionality that will be available in each module. This is only an initial pass at listing the functionality needed – we will refine it over time – but making an initial list is important for thinking through the problem.
api.py: This file will contain all the functionality related to the FastAPI web server, and will
include functions related to each of the API endpoints in our application.
POST /data – Load the data into the application. Will write to Redis.
GET /data?search=… – List all of the data in the system, optionally filtering with a search query parameter. Will read from Redis.
GET /data/<id> – Get a specific object from the dataset using its id. Will read from Redis.
POST /jobs – Create a new job. This function will save the job description to Redis and add a new task on the queue for the job. Will write to Redis and the queue.
GET /jobs – List all the jobs. Will read from Redis.
GET /jobs/<id> – Get the status of a specific job by id. Will read from Redis.
GET /jobs/<id>/results – Return the outputs (results) of a completed job. Will read from Redis.
worker.py: This file will contain all of the functionality needed to get jobs from the task
queue and execute the jobs.
Get a new job – HotQueue consumer to get an item off the queue. Will get from the queue and write to Redis to update the status of the job.
Perform analysis – Run the computation described by the job's parameters.
Finalize job – Saves the results of the analysis and updates the job status to a finished state (success or error). Will write to Redis.
jobs.py: This file will contain all functionality needed for working with jobs in the Redis
database and the Hotqueue queue.
Save a new job – Will need to write to Redis.
Retrieve an existing job - Will need to read from Redis.
Update an existing job – Will need to read and write to Redis.
Private vs Public Objects
As software projects grow, the notion of public and private access points (functions, variables, etc.) becomes an increasingly important part of code organization.
Private objects should only be used within the module they are defined. If a developer needs to change the implementation of a private object, she only needs to make sure the changes work within the existing module.
Public objects can be used by external modules. Changes to public objects need more careful analysis to understand the impact across the system.
Like the layout of code itself, this topic is technology-specific. In this class, we will take a simplified approach based on our use of Python. Remember, this is a simplification to illustrate the basic concepts - in practice, more advanced/robust approaches are used.
We will name private objects starting with a single underscore (_) character.
If an object does not start with an underscore, it should be considered public.
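Python itself only partly enforces this convention. The sketch below builds a small in-memory module (the name demo_jobs and its contents are hypothetical) and shows that a wildcard import, from demo_jobs import *, skips names that start with an underscore, while an explicit reference can still reach them. The underscore is a signal to other developers, not an access-control mechanism.

```python
import sys
import types

# A hypothetical module with one private and one public function
SOURCE = '''
import uuid

def _generate_jid():
    return str(uuid.uuid4())

def add_job(start, end):
    return {"jid": _generate_jid(), "start": start, "end": end}
'''

demo_jobs = types.ModuleType("demo_jobs")
exec(SOURCE, demo_jobs.__dict__)
sys.modules["demo_jobs"] = demo_jobs  # register it so it can be imported

# A wildcard import copies only the public (non-underscore) names
namespace: dict = {}
exec("from demo_jobs import *", namespace)
print("add_job" in namespace)        # True
print("_generate_jid" in namespace)  # False

# The private function is still reachable if referenced explicitly
print(len(demo_jobs._generate_jid()))  # 36
```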
We can see public and private objects in use within the standard library as well. If we open up the source code for the
datetime module, which can be found on GitHub, we see a mix
of public and private objects and methods.
Private objects are listed first.
Public objects start on line 442 with the timedelta class.
EXERCISE 1
Create three files, api.py, worker.py, and jobs.py in your local directory. You may wish to start from the
files you prepared for Homework 06. You should also have a Dockerfile and docker-compose.yml
in this directory to help with containerization and orchestration.
[coe332-vm] $ ls
Dockerfile api.py docker-compose.yaml jobs.py worker.py
Add the following function and variable definitions to jobs.py. Closely examine each line to
make sure you understand its purpose. Carefully consider which are public and private, and why.
import json
import typing
import uuid
from datetime import datetime
from enum import Enum

import redis
from hotqueue import HotQueue
from pydantic import BaseModel

_redis_ip = "172.19.0.1"
_redis_port = 6379

rd = redis.Redis(host=_redis_ip, port=_redis_port, db=0)
q = HotQueue("queue", host=_redis_ip, port=_redis_port, db=1)
jdb = redis.Redis(host=_redis_ip, port=_redis_port, db=2)


class JobStatus(str, Enum):
    QUEUED = "QUEUED"
    RUNNING = "RUNNING"
    ERROR = "FINISHED -- ERROR"
    SUCCESS = "FINISHED -- SUCCESS"


class Job(BaseModel):
    jid: str
    status: JobStatus
    start: int
    end: int
    start_time: typing.Optional[datetime] = None
    end_time: typing.Optional[datetime] = None


def _generate_jid() -> str:
    """
    Generate a pseudo-random identifier for a job.
    """
    return str(uuid.uuid4())


def _instantiate_job(jid: str, status: JobStatus, start: int, end: int) -> Job:
    """
    Create the Job object. Requires the job id, status, start and end parameters.
    """
    return Job(jid=jid, status=status, start=start, end=end)


def _save_job(jid: str, job: Job) -> bool:
    """Save a job object in the Redis database as a JSON string."""
    jdb.set(jid, json.dumps(job.model_dump(mode="json")))
    return True


def _queue_job(jid: str) -> bool:
    """Add a job to the redis queue."""
    q.put(jid)
    return True


def get_job_by_id(jid: str) -> typing.Optional[Job]:
    """Return the Job with id `jid`, or None if no such job exists."""
    raw_data = jdb.get(jid)
    if raw_data is None:
        return None
    return Job(**json.loads(raw_data))


def add_job(start: int, end: int) -> Job:
    """Add a job to the redis database and queue."""
    jid = _generate_jid()
    job = _instantiate_job(jid, JobStatus.QUEUED, start, end)
    _save_job(jid, job)
    _queue_job(jid)
    return job


def start_job(jid: str) -> bool:
    """Called by worker when starting a new job. Updates the job's status and start time."""
    job = get_job_by_id(jid)
    if job is None:
        raise ValueError(f"No job found with id {jid}")
    job.status = JobStatus.RUNNING
    job.start_time = datetime.now()
    return _save_job(jid=jid, job=job)


def update_job_status(jid: str, status: JobStatus) -> bool:
    """Update the status of job with job id `jid` to status `status`."""
    job = get_job_by_id(jid)
    if job is None:
        raise ValueError(f"No job found with id {jid}")
    job.status = status
    # Record the end time once the job reaches a terminal state
    if status in (JobStatus.ERROR, JobStatus.SUCCESS):
        job.end_time = datetime.now()
    return _save_job(jid, job)
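Note that this JobStatus inherits from str rather than int, so each member's value is readable text. The standard-library-only sketch below repeats the enum so it can run on its own and shows why that is convenient for a JSON document stored in Redis: a member serializes to its plain string value, and the stored string converts back to the same member.

```python
import json
from enum import Enum


class JobStatus(str, Enum):
    QUEUED = "QUEUED"
    RUNNING = "RUNNING"
    ERROR = "FINISHED -- ERROR"
    SUCCESS = "FINISHED -- SUCCESS"


# Because JobStatus subclasses str, json.dumps writes the plain string value
stored = json.dumps({"jid": "abc123", "status": JobStatus.SUCCESS})
print(stored)  # {"jid": "abc123", "status": "FINISHED -- SUCCESS"}

# Reading it back: look up the member by its value
raw = json.loads(stored)
status = JobStatus(raw["status"])
print(status is JobStatus.SUCCESS)  # True
```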
EXERCISE 2
Write a skeleton for a FastAPI app in the file api.py. The FastAPI app should:
Import necessary modules, including some from jobs.py
Declare an instance of the FastAPI class
Support a route for POSTing a new job – what new data models might you need?
Support a route for GETting job status – do you need any new data models here?
Tip
A job POST request might look like:
curl localhost:5000/jobs -X POST -d '{"start":1, "end":2}' -H "Content-Type: application/json"
In this example, we are sending ‘start’ and ‘end’ indices, which the worker needs in order to do the “work”. For example, perhaps the worker is designed to add up or plot all the values between ‘start’ and ‘end’. In practice, the app that you develop may require different parameters.
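If you would rather exercise the endpoint from Python than from curl, the standard library's urllib.request can send the same request. The sketch below assumes the server is reachable at http://localhost:5000, as in the curl example. build_job_request and submit_job are hypothetical helper names, and building the request object does not contact the server.

```python
import json
import urllib.request


def build_job_request(base_url: str, start: int, end: int) -> urllib.request.Request:
    """Build the same POST /jobs request as the curl example."""
    body = json.dumps({"start": start, "end": end}).encode("utf-8")
    return urllib.request.Request(
        f"{base_url}/jobs",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def submit_job(base_url: str, start: int, end: int) -> dict:
    """Send the request and decode the JSON response (requires a running server)."""
    with urllib.request.urlopen(build_job_request(base_url, start, end)) as resp:
        return json.loads(resp.read())


req = build_job_request("http://localhost:5000", 1, 2)
print(req.get_method(), req.full_url)  # POST http://localhost:5000/jobs
```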
EXERCISE 3
Write a skeleton for a worker in the file worker.py: The worker should:
Import necessary modules, including some from jobs.py
Pull items (job IDs) off the queue
When it starts working on a new job, update the job status to ‘RUNNING’
Do work by adding up the values between start and end. You can also simulate a “longer running” process by issuing a sleep in the worker (e.g., sleep for 5 or 10 seconds or so)
When it finishes working on a job, update the job status to ‘FINISHED -- SUCCESS’ (or ‘FINISHED -- ERROR’ if the work fails)
What will you need to make sure the worker program can be run from the command line and run forever once started? Make sure you can start up a worker in a new terminal and test that it can work jobs.
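One piece of the worker that can be written and tested without Redis is the work itself and the status transitions around it. The sketch below is hedged: do_work and run_job are hypothetical names, end is treated as inclusive (an assumption, since the exercise only says "between"), and set_status is a stand-in callback for whatever updates the job in Redis (for example, update_job_status from jobs.py). The status strings match the JobStatus values in jobs.py.

```python
import time
import typing


def do_work(start: int, end: int, delay_seconds: float = 0.0) -> int:
    """Sum the integers from start to end (inclusive, an assumption).

    delay_seconds simulates a long-running task, e.g. 5 or 10 seconds.
    """
    if start > end:
        raise ValueError(f"start ({start}) must not be greater than end ({end})")
    time.sleep(delay_seconds)
    return sum(range(start, end + 1))


def run_job(start: int, end: int,
            set_status: typing.Callable[[str], None]) -> typing.Optional[int]:
    """Wrap the work with status updates; a failure becomes an ERROR status."""
    set_status("RUNNING")
    try:
        result = do_work(start, end)
    except Exception:
        set_status("FINISHED -- ERROR")
        return None
    set_status("FINISHED -- SUCCESS")
    return result


statuses: list[str] = []
print(run_job(1, 10, statuses.append))  # 55
print(statuses)  # ['RUNNING', 'FINISHED -- SUCCESS']
```

Catching the exception inside the worker keeps one bad job from crashing a worker that is meant to run forever.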
EXERCISE 4
Fill out the contents of the Dockerfile and docker-compose.yml in order to help with
containerization and orchestration. Pay careful attention to how you set up and build the containers. Should we be
using one Docker image or two? What should the entrypoint be?
EXERCISE 5
Modify the definition of the rd, q, and jdb objects to not use a hard-coded IP address,
but to instead read the IP address from an environment variable, REDIS_IP. Determine how to set the value of
REDIS_IP in the Dockerfile and / or docker-compose.yml file.
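One way to approach the Python side of this, sketched with only the standard library: read REDIS_IP from the environment and fall back to a default when it is unset. The fallback "127.0.0.1", the optional REDIS_PORT variable, and the helper name get_redis_config are assumptions. The returned values would be passed as host= and port= when creating rd, q, and jdb.

```python
import os


def get_redis_config() -> tuple[str, int]:
    """Read Redis connection settings from the environment.

    REDIS_IP is the variable named in the exercise; REDIS_PORT and both
    fallback values are assumptions for this sketch.
    """
    host = os.environ.get("REDIS_IP", "127.0.0.1")
    port = int(os.environ.get("REDIS_PORT", "6379"))
    return host, port


# In docker-compose, containers on the same network can reach each other by
# service name; "redis-db" here is a hypothetical service name.
os.environ["REDIS_IP"] = "redis-db"
print(get_redis_config())
```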