
The JobCenter

An Orchestration Workflow Engine

Wieger Opmeer

(opmeer@strato.de) TPC Glasgow 2018

1 / 42

A bit about me

  • Been programming since the '80s
  • Perl since the late '90s
  • Also worked with C, PHP, Python, Lua..
  • Did a lot of system administration / devops (before it was called that)
    • Mainly Linux
    • Also worked with Solaris, SCO Unix, AIX, Ultrix..
  • Worked with MySQL, PostgreSQL, Oracle, ..
2 / 42

A bit about Strato

  • Web-hosting company
    • Shared, Virtual, Dedicated, etc.
  • Been around for 20+ years
    • Lots of legacy code and systems
    • Oracle, Sybase, Solaris..
    • Lots of Perl
    • Lots of self-built solutions
    • for example own KV store (dbmd)
  • Part of the United Internet Group since 2017
    • 1&1
    • home.pl, arsys.es, fasthost.co.uk..
    • "Looking for synergy"
3 / 42

Orchestration Workflow Example

Ordering a Wordpress Package on Shared Webhosting

  • Allocate storage (filer)
    • May determine IP number
  • Configure DNS (nameserver)
    • With the IP number from the previous step
  • Register Domain (EPP client)
  • Configure Vhost (apache)
    • May be dependent on storage location
  • Allocate Mysql Database
    • May also be dependent on storage location
  • Install Wordpress
  • Configure Wordpress
4 / 42

How it used to be..

(and mostly still is)

  • lots of ad-hoc state machines
    • state transition logic buried deep in the scripts
    • often spread over multiple scripts
    • most state is stored in the database
    • but some might be somewhere else
  • mostly cronjobs
    • depending on cron-schedule some things can take hours
    • horrible hacks to speed things up (procmail..)
5 / 42

Wouldn't it be nice if..

  • we had some easy way to model state machines
  • we could store the workflow logic in some central place
  • we could introspect running jobs in some standard way
  • etc.
6 / 42

JobCenter Design Criteria

  • Database as central storage
    • The state machine transition table is stored there
    • Current state is stored there
    • Every state transition is logged there
    • "Crash-Proof"
  • Job state should be easily inspectable
    • Everything is JSON
  • It should be relatively easy to 'meddle' with a Job
    • update jobs set ... where job_id=...
  • Relatively "Lightweight" and "Simple"
  • Written in our favourite language
  • Open Source
7 / 42

Some JobCenter terminology

  • A Workflow is a state machine transition table
    • or a program for the JobCenter virtual machine
  • Actions are the possible instructions
    • external (done by a worker)
    • internal (if, while, etc)
    • workflow ('function')
    • procedure
  • Actions have declared 'inputs' and 'outputs'
    • using JSON Schemas
  • A Workflow is (also) an Action
    • workflows as subroutines
8 / 42

Some more terminology

  • A Task is an Action in a Workflow
    • an instruction in a program
  • A Job is an instance of a Workflow
    • executing the program
  • A Child Job is a workflow that was called from another workflow
    • next 'stack frame' for a subroutine
  • A Worker is an external entity capable of performing one or more Actions
9 / 42

A bit about Job state

A running job has:

  • a "current task" ("current instruction")
  • arguments (ro)
  • variables (rw)
  • task state

All job state is persistent.


Per task state sequence:

  • timestamps
  • stepcounter
  • ...

10 / 42
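A job's persistent state can be pictured as one JSON document that is rewritten after every step. The Perl sketch below uses a hypothetical layout (the field names are assumptions, not the real columns of the jobs table) to show why that makes a job both inspectable and resumable:

```perl
use strict;
use warnings;
use JSON::PP;

# Hypothetical job record; the field names are illustrative, not the real schema.
my $job = {
    job_id      => 9,
    workflow_id => 5,
    task_id     => 2,                 # the "current task" (instruction pointer)
    arguments   => { input => 123 },  # read-only for the lifetime of the job
    variables   => {},                # read-write job state
    task_state  => { stepcounter => 1 },
};

my $json = JSON::PP->new->canonical;  # stable key order keeps diffs readable

# Persist: after every state transition the whole state is written out.
my $stored = $json->encode($job);

# Later (even after a crash) the job is reloaded and continues from there.
my $reloaded = $json->decode($stored);
$reloaded->{variables}{thing} = 126;  # a finished task updates a variable
$reloaded->{task_state}{stepcounter}++;
$stored = $json->encode($reloaded);

print "$stored\n";
```

Because the stored form is plain JSON, "meddling" with a job is just editing that document.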

The JobCenter Language (JCL)

a custom workflow modelling language:

  • intended to look and feel like a somewhat 'normal' programming language
  • easy to version control
  • easy to diff
  • compiled using the jcc compiler
    • compiled form is stored in the database
11 / 42

Basic JCL example

workflow calltest:
in:
    input number
out:
    output number
wfomap:
    output = v.thing
do:
    call add:
        counter = a.input
        step = 3
    into:
        thing = o.counter

  • the "input mapping" calculates the subroutine arguments
  • the "output mapping" stores the subroutine results
  • the "workflow output map" calculates the workflow results
12 / 42
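The three mappings can be mimicked with plain Perl hashes named after the JCL prefixes (a., v., i., o.). This is a minimal sketch, assuming that the add action returns counter + step; it is not how the JobCenter evaluates them internally:

```perl
use strict;
use warnings;

# Hypothetical stand-in for the 'add' worker: returns counter + step.
sub add_action {
    my ($in) = @_;
    return { counter => $in->{counter} + $in->{step} };
}

my %a = (input => 123);   # a.: workflow arguments (read-only)
my %v;                    # v.: workflow variables

# input mapping: compute the subroutine arguments (i.) from a. and v.
my %i = (counter => $a{input}, step => 3);

# the call itself; in the JobCenter a worker does this part
my %o = %{ add_action(\%i) };   # o.: subroutine results

# output mapping ("into:"): store the results in the variables
$v{thing} = $o{counter};

# workflow output map ("wfomap:"): compute the workflow results
my %result = (output => $v{thing});
print "output = $result{output}\n";   # prints "output = 126"
```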

JCL: Loops

workflow whiletest:
in:
    input number
out:
    output number
do:
    while !v.thing or v.thing < 10:
        call add:
            counter = v.thing // a.input
        into:
            thing = o.counter
wfomap:
    output = v.thing

Like all job state the while loop state is persistent.

13 / 42
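To illustrate what persistent loop state buys you, this sketch re-reads the variables from a JSON string before every iteration and writes them back afterwards, so the loop could be resumed at any point. The add stand-in is hypothetical: since whiletest passes no step, it simply assumes the action adds 3.

```perl
use strict;
use warnings;
use JSON::PP;

my $json = JSON::PP->new->canonical;

# Hypothetical stand-in for the 'add' worker. whiletest passes no step,
# so this sketch simply assumes the action adds 3.
sub add_action { my ($in) = @_; return { counter => $in->{counter} + 3 } }

my %a = (input => 2);
# Between steps the variables live only in this JSON string, as in the database.
my $persisted = $json->encode({ v => {} });

while (1) {
    my $state = $json->decode($persisted);   # reload the state for every step
    my $v = $state->{v};
    # while !v.thing or v.thing < 10:
    last unless !$v->{thing} || $v->{thing} < 10;
    # call add: counter = v.thing // a.input
    my $o = add_action({ counter => $v->{thing} // $a{input} });
    $v->{thing} = $o->{counter};              # into: thing = o.counter
    $persisted = $json->encode($state);       # persist after each iteration
}

# wfomap: output = v.thing
my $output = $json->decode($persisted)->{v}{thing};
print "output = $output\n";   # 2 -> 5 -> 8 -> 11, prints "output = 11"
```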

JCL: Ifs and such

workflow casetest:
in:
    input string
out:
    counter number
    thing string
wfomap:
    counter = v.counter
    thing = v.thing
do:
    case a.input:
    when "foo":
        call add:
            counter = 10
        into:
            counter = o.counter
            thing = 'got foo'
    when bar, baz:
        eval:
            counter = -1
            thing = 'got bar or baz'
    else:
        eval:
            counter = -1
            thing = 'dunno what i got: ' . a.input
14 / 42

Implementation - High level view

15 / 42

Low level API

  • the basis is a PostgreSQL stored procedure API:
    • create_job
    • get_job_status
    • announce
    • get_task
    • task_done
    • withdraw
  • PostgreSQL listen/notify (pubsub) is used for ipc:
    • on creation of a new job
    • when a job finishes
    • when a task is ready for a worker
    • when a worker is done
    • etc.
16 / 42
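PostgreSQL LISTEN/NOTIFY delivers a payload to every session listening on a channel. The in-process stand-in below only models the routing, using the channel names from the low level examples; the real notifications are sent by the database and delivered asynchronously, after the sending transaction commits:

```perl
use strict;
use warnings;

# In-process stand-in for LISTEN/NOTIFY: channel name => list of callbacks.
my %listeners;
my @log;

sub listen_on { my ($channel, $cb) = @_; push @{ $listeners{$channel} }, $cb; return }
sub notify {
    my ($channel, $payload) = @_;
    $_->($payload) for @{ $listeners{$channel} // [] };   # no listener: dropped
    return;
}

# The Maestro, a worker and a client each listen on their own channel.
listen_on('jobtaskdone',    sub { push @log, "maestro: do_jobtaskdone('$_[0]')" });
listen_on('action:1:ready', sub { push @log, "worker: get_task for $_[0]" });
listen_on('job:9:finished', sub { push @log, 'client: get_job_status(9)' });

notify('jobtaskdone',    '(5,3,9)');
notify('action:1:ready', '{"job_id": 9}');
notify('job:9:finished', '');
notify('unused:channel', 'nobody listens');

print "$_\n" for @log;
```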

The Maestro

  • keeps the time
    • timeouts
    • retries
  • scheduling of periodic jobs (dead worker detection, archiving)
  • tells the database what to do:
    • calls stored procedures on the first available connection:
      • in response to notifications
        • task ready / done / error / ping
      • based on the results of a query
17 / 42

The JSON-RPC 2.0 API

  • mostly a 1:1 translation of the low level api
  • communicates over established connections
    • tcp / tls / websocket
    • multiple requests can be "in flight" at the same time
    • asynchronous
    • notifications
  • basically the same api for clients and workers
    • authentication handshake
    • a worker is a client that announces one or more actions
  • the API calls the low level stored procedures "on behalf of" or "impersonating" the clients and workers
  • the API adds the concept of 'slots'
    • a worker can limit the amount of work it is being sent
18 / 42
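The 'slots' idea can be sketched as simple per-worker accounting: a task is only pushed to a worker that has a free slot. The JSON-RPC 2.0 message is a notification (no "id" member); its method name 'task_ready' and parameter layout are assumptions, not the actual jcapi protocol:

```perl
use strict;
use warnings;
use JSON::PP;

my $json = JSON::PP->new->canonical;
# Hypothetical worker record: announced with 2 slots.
my %worker = (name => 'theEmployee', slots => 2, in_flight => 0);
my @sent;   # messages "sent" to the worker

sub offer_task {
    my ($w, $job_id) = @_;
    return 0 if $w->{in_flight} >= $w->{slots};   # no free slot: task stays queued
    $w->{in_flight}++;
    # JSON-RPC 2.0 notification (no "id"); the method name is an assumption.
    push @sent, $json->encode({
        jsonrpc => '2.0',
        method  => 'task_ready',
        params  => { job_id => $job_id, action => 'add' },
    });
    return 1;
}

sub task_finished { my ($w) = @_; $w->{in_flight}-- if $w->{in_flight} > 0; return }

my @accepted = map { offer_task(\%worker, $_) } 9, 10, 11;   # the third is refused
task_finished(\%worker);                                     # one task completes
push @accepted, offer_task(\%worker, 11);                    # now job 11 fits
print "$_\n" for @sent;
```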

Implementation - Low level view

19 / 42

JobCenter::Pg

  • a subclass of Mojo::Pg
    • adds a 'queue_query($cb)' method
    • calls that callback as soon as a database connection is available
    • the cb 'owns' that connection until it goes out of scope
    • allows an "at most n connections" model
    • connections get closed after being used max_use times
  • this allows the Maestro to "play" on n connections at the same time
  • this allows the jcapi to work "on behalf" of n clients/workers at the same time
20 / 42
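The "at most n connections" model can be illustrated without a database. This sketch is not JobCenter::Pg or Mojo::Pg: connections are plain hashes, and a callback signals it is finished by calling $done explicitly instead of letting a handle go out of scope. It shows queued callbacks waiting for a free connection and connections being retired after max_use queries:

```perl
use strict;
use warnings;

# Toy pool illustrating "at most n connections" (NOT Mojo::Pg/JobCenter::Pg).
package TinyPool;

sub new {
    my ($class, %args) = @_;
    return bless { max => $args{max}, max_use => $args{max_use},
                   idle => [], open => 0, opened => 0, waiting => [] }, $class;
}

# Queue $cb; it is called with ($conn, $done) once a connection is free and
# owns $conn until it calls $done->().
sub queue_query {
    my ($self, $cb) = @_;
    push @{ $self->{waiting} }, $cb;
    $self->_dispatch;
    return;
}

sub _dispatch {
    my ($self) = @_;
    while (@{ $self->{waiting} }) {
        my $conn = shift @{ $self->{idle} };
        if (!$conn && $self->{open} < $self->{max}) {
            $conn = { id => ++$self->{opened}, uses => 0 };   # "connect"
            $self->{open}++;
        }
        return unless $conn;                 # all n connections are busy
        my $cb = shift @{ $self->{waiting} };
        $conn->{uses}++;
        my $released = 0;
        $cb->($conn, sub { $self->_release($conn) unless $released++ });
    }
    return;
}

sub _release {
    my ($self, $conn) = @_;
    if ($conn->{uses} >= $self->{max_use}) { $self->{open}-- }   # retire it
    else { push @{ $self->{idle} }, $conn }                       # reuse it
    $self->_dispatch;
    return;
}

package main;

my $pool = TinyPool->new(max => 2, max_use => 2);
my (@pending, @served);
# Five queries arrive at once; each one "finishes" later when its $done is called.
for my $q (1 .. 5) {
    $pool->queue_query(sub {
        my ($conn, $done) = @_;
        push @served, "q$q:c$conn->{id}";
        push @pending, $done;
    });
}
my $busy_at_start = scalar @served;          # only 2 queries could start
while (my $done = shift @pending) { $done->() }
print "@served\n";   # q1:c1 q2:c2 q3:c1 q4:c2 q5:c3
```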

Low Level Example 1

# select * from create_job('calltest','{"input":123}'::jsonb);
o_job_id | o_listenstring
----------+----------------
9 | job:9:finished

Query done by the client.

create_job:

  • verifies that the caller is allowed to call calltest
  • verifies the input arguments against the specification
  • inserts a record into the jobs table
  • sends a notify "jobtaskdone", '(5,3,9)' to the Maestro
    • meaning: the 'start' task of this job has been done
21 / 42
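The bookkeeping done by create_job can be sketched in Perl. The input check below is a deliberately simplified stand-in for the JSON Schema validation, the permission check is left out, and the workflow registry is hypothetical; only the return values and the notification payload mirror the example above:

```perl
use strict;
use warnings;
use JSON::PP;

# Hypothetical workflow registry (ids taken from the example above).
my %workflows = (
    calltest => { workflow_id => 5, start_task => 3, in => { input => 'number' } },
);
my @notifications;
my $next_job_id = 9;

# Simplified type check standing in for JSON Schema validation.
sub is_type {
    my ($type, $val) = @_;
    return defined $val && !ref $val && $val =~ /\A-?\d+(?:\.\d+)?\z/ if $type eq 'number';
    return defined $val && !ref $val if $type eq 'string';
    return 0;
}

sub create_job {
    my ($wfname, $inargs_json) = @_;
    my $wf = $workflows{$wfname} or die "unknown workflow $wfname\n";
    my $in = decode_json($inargs_json);
    for my $name (sort keys %{ $wf->{in} }) {
        die "bad or missing input '$name'\n" unless is_type($wf->{in}{$name}, $in->{$name});
    }
    my $job_id = $next_job_id++;   # stands in for "insert into jobs ..."
    # tell the Maestro that the 'start' task of this job is done
    push @notifications, [ jobtaskdone => "($wf->{workflow_id},$wf->{start_task},$job_id)" ];
    return ($job_id, "job:$job_id:finished");
}

my ($job_id, $listen) = create_job('calltest', '{"input":123}');
my $rejected = eval { create_job('calltest', '{"input":"abc"}'); 1 } ? 0 : 1;
print "job $job_id, listen on $listen\n";
```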

Low Level Example 2

# select * from do_jobtaskdone('(5,3,9)');
error | jobtask
-------+---------
false | (5,2,9)

Query done by the Maestro on receipt of the notification.

do_jobtaskdone:

  • finds the next task in the workflow
    • the next instruction in the program
  • returns the "next thing to do":
    • (workflow_id, task_id, job_id)

Ignore the error = false for now.
22 / 42
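Finding the next task amounts to a lookup in the workflow's transition table. This sketch hard-codes a hypothetical table for calltest (start task 3, the add call as task 2, end task 4, matching the ids in these examples) and ignores internal instructions such as if and while, which would choose between several successors:

```perl
use strict;
use warnings;

# Hypothetical transition table: workflow_id => { task_id => next task_id }.
my %next_task = (
    5 => { 3 => 2, 2 => 4 },
);

# Sketch of what do_jobtaskdone computes: given '(workflow_id,task_id,job_id)'
# of a finished task, return the jobtask of the next instruction (or undef).
sub do_jobtaskdone {
    my ($jobtask) = @_;
    my ($wf, $task, $job) = $jobtask =~ /\A\((\d+),(\d+),(\d+)\)\z/
        or die "malformed jobtask '$jobtask'\n";
    my $next = $next_task{$wf}{$task};
    return defined $next ? "($wf,$next,$job)" : undef;
}

print do_jobtaskdone('(5,3,9)'), "\n";   # (5,2,9)
print do_jobtaskdone('(5,2,9)'), "\n";   # (5,4,9)
```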

Low Level Example 3

# select * from do_jobtask('(5,2,9)');
error | jobtask
-------+---------
false | null

Query done by the Maestro.

do_jobtask:

  • would return the "next thing to do"
    • keeping the workflow moving (program running)
  • there is no next thing to do
    • because a worker needs to do the 'add' action first
  • a notify "action:1:ready", '{"job_id": 9}' has been sent
    • alerting the worker
23 / 42
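Put together, the Maestro's reaction to a "jobtaskdone" notification can be sketched as a loop that keeps calling stored procedures while they return a next jobtask. The procedure results are faked here, the error column is ignored, and feeding every returned jobtask back into do_jobtask is an assumption:

```perl
use strict;
use warnings;

# Faked stored procedure results, using the jobtasks from the examples.
my %fake_db = (
    do_jobtaskdone => { '(5,3,9)' => '(5,2,9)', '(5,2,9)' => '(5,4,9)' },
    do_jobtask     => { '(5,2,9)' => undef,     '(5,4,9)' => undef },
);
my @calls;

# Hypothetical helper standing in for "call a stored procedure on a free connection".
sub call_proc {
    my ($proc, $jobtask) = @_;
    push @calls, "$proc$jobtask";
    return $fake_db{$proc}{$jobtask};
}

# On a "jobtaskdone" notification: keep the job moving until nothing is left
# to do right now (a worker has to act, or the job has finished).
sub on_jobtaskdone {
    my ($jobtask) = @_;
    my $next = call_proc(do_jobtaskdone => $jobtask);
    $next = call_proc(do_jobtask => $next) while defined $next;
    return;
}

on_jobtaskdone('(5,3,9)');   # start task done: stops when 'add' needs a worker
on_jobtaskdone('(5,2,9)');   # worker finished 'add': runs the end task
print "$_\n" for @calls;
```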

Low Level Example 4

# select * from get_task('thisworker','add',9);
o_job_id | o_job_cookie | o_in_args | o_env
----------+--------------+-----------------+-------
9 | da3c5c5d | {"counter":123} | null

Query done by the worker on receipt of the notification.

get_task:

  • returns the "work to do" if there is any
    • maybe another worker was faster
  • the cookie is unique for "this worker doing this task now"
24 / 42
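The "maybe another worker was faster" case means claiming a task has to be atomic: the first caller gets the work and a cookie, later callers get nothing. A minimal in-memory sketch (cookie format and data layout are hypothetical; real cookies should be unpredictable):

```perl
use strict;
use warnings;

# Hypothetical in-memory state: job 9 has an 'add' task waiting for a worker.
my %ready = (9 => { action => 'add', in_args => { counter => 123 } });
my %claimed;   # cookie => who is doing which job
my $seq = 0;

# Sketch of get_task: claim the task if it is still there, else return nothing.
sub get_task {
    my ($worker, $action, $job_id) = @_;
    my $task = $ready{$job_id};
    return unless $task && $task->{action} eq $action;
    delete $ready{$job_id};   # claim; in SQL this must be a single atomic statement
    my $cookie = sprintf '%08x', ++$seq;   # hypothetical, predictable format
    $claimed{$cookie} = { job_id => $job_id, worker => $worker };
    return ($job_id, $cookie, $task->{in_args});
}

my @first  = get_task('thisworker',  'add', 9);   # gets the work
my @second = get_task('otherworker', 'add', 9);   # too late: empty list
print "first: job $first[0], cookie $first[1]\n";
print "second: ", (@second ? "got work" : "nothing"), "\n";
```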

Low Level Example 5

# select task_done('da3c5c5d...', '{"counter":126}');
void

Query done by the worker when it has done its work.

task_done:

  • looks up the 'jobtask' using the cookie
  • verifies the "output arguments" against the specification
  • performs the output mapping
  • stores the new value for the variables in the job record
  • does a NOTIFY "jobtaskdone", '(5,2,9)'
25 / 42
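The steps of task_done can be sketched with in-memory stand-ins. The cookie, the output check (presence only, not a full JSON Schema) and the table layout are all hypothetical; the point is that a cookie can be used once and that the output mapping updates the job's variables before the Maestro is notified:

```perl
use strict;
use warnings;

# Hypothetical in-memory stand-ins for the real tables.
my %claimed  = ('cookie-1' => { jobtask => '(5,2,9)', job_id => 9 });
my %jobs     = (9 => { variables => {} });
my %out_spec = (counter => 'number');   # declared outputs of 'add'
# output mapping of calltest: into: thing = o.counter
my $omap = sub { my ($v, $o) = @_; $v->{thing} = $o->{counter} };
my @notifications;

sub task_done {
    my ($cookie, $out) = @_;
    # look up (and consume) the jobtask belonging to this cookie
    my $jt = delete $claimed{$cookie} or die "unknown or already used cookie\n";
    # simplified output check: every declared output must be present
    for my $name (sort keys %out_spec) {
        die "output '$name' missing\n" unless defined $out->{$name};
    }
    # perform the output mapping and store the new variables in the job
    $omap->($jobs{ $jt->{job_id} }{variables}, $out);
    # NOTIFY "jobtaskdone" so the Maestro can continue the job
    push @notifications, [ jobtaskdone => $jt->{jobtask} ];
    return;
}

task_done('cookie-1', { counter => 126 });
my $reused = eval { task_done('cookie-1', { counter => 1 }); 1 } ? 0 : 1;
print "thing = $jobs{9}{variables}{thing}, cookie reuse rejected: $reused\n";
```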

Low Level Example 6

# select * from do_jobtaskdone('(5,2,9)');
error | jobtask
-------+---------
false | (5,4,9)

Query done by the Maestro on receipt of the notification.

do_jobtaskdone:

  • finds the next task in the workflow
26 / 42

Low Level Example 7

# select * from do_jobtask('(5,4,9)');
error | jobtask
-------+---------
false | null

Query done by the Maestro.

Task_id 4 happens to be the "end task" for workflow_id 5:

  • the workflow output map (wfomap) is executed
  • the 'outargs' are checked against the workflow specification
  • the 'outargs' are stored in the job record
  • a notify "job:9:finished", '' is sent.
27 / 42

Low Level Example 8

# select get_job_status(9);
get_job_status
-----------------
{"output": 126}

Called by the client on receipt of the notification

get_job_status:

  • returns the job results
    • or null if the job hasn't finished yet
28 / 42

Secret Perl

  • all expressions get compiled to perl
    • imap
    • omap
    • wfomap
    • if / while / repeat (bool)
    • case (string)
  • executed in PostgreSQL using pl/perl
29 / 42
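For the simplest assignments the translation is almost mechanical: the target hash depends on the block the line appears in (%i for the input mapping, %v for the output mapping, %o for the workflow output map), and every x.name becomes $x{name}. The toy translator below is not jcc; it handles only name = x.name and name = literal, which covers the calltest example:

```perl
use strict;
use warnings;

# Toy translator for the simplest JCL assignments (NOT the real jcc).
# The target hash depends on the block the assignment appears in.
my %target = (imap => 'i', omap => 'v', wfomap => 'o');

sub jcl_to_perl {
    my ($block, $line) = @_;
    my $t = $target{$block} or die "unknown block '$block'\n";
    my ($lhs, $rhs) = $line =~ /\A\s*(\w+)\s*=\s*(.+?)\s*\z/
        or die "cannot translate '$line'\n";
    $rhs =~ s/\b([aeiov])\.(\w+)/\$$1\{$2\}/g;   # a.input -> $a{input}
    return '$' . $t . '{' . $lhs . '} = ' . $rhs . ';';
}

print jcl_to_perl(wfomap => 'output = v.thing'), "\n";    # $o{output} = $v{thing};
print jcl_to_perl(imap   => 'counter = a.input'), "\n";   # $i{counter} = $a{input};
print jcl_to_perl(imap   => 'step = 3'), "\n";            # $i{step} = 3;
print jcl_to_perl(omap   => 'thing = o.counter'), "\n";   # $v{thing} = $o{counter};
```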

So this:

workflow calltest:
in:
    input number
out:
    output number
wfomap:
    output = v.thing
do:
    call add:
        counter = a.input
        step = 3
    into:
        thing = o.counter
30 / 42

Becomes this:

workflow calltest:
in:
    input number
out:
    output number
wfomap:
    [[ $o{output} = $v{thing}; ]]
do:
    call add:
        [[
            $i{counter} = $a{input};
            $i{step} = 3;
        ]]
    into:
        [[ $v{thing} = $o{counter}; ]]
31 / 42

And is executed like this:

CREATE OR REPLACE FUNCTION jobcenter.do_imap(code text, args jsonb, env jsonb, vars jsonb)
RETURNS jsonb LANGUAGE plperl
AS $function$
    use strict;
    use warnings;
    use JSON::MaybeXS qw(from_json to_json JSON);
    use JobCenter::Safe;
    my $safe = new JobCenter::Safe;
    my ($code, $jargs, $jenv, $jvars) = @_;
    # decode the job state: arguments, environment and variables
    our %a = %{from_json($jargs // '{}')};
    our %e = %{from_json($jenv // '{}')};
    our %v = %{from_json($jvars // '{}')};
    # %i collects the input arguments for the called action
    our %i = ();
    our $TRUE = JSON->true;
    our $FALSE = JSON->false;
    our $JSON = JSON::MaybeXS->new(utf8 => 0);
    # expose only these variables to the compiled expression
    $safe->share(qw(%a %e %v %i $TRUE $FALSE $JSON));
    # evaluate the compiled imap code with strict enabled
    $safe->reval($code, 1);
    die "$@" if $@;
    return to_json(\%i);
$function$
32 / 42
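The same evaluation can be tried outside PostgreSQL with Perl's core Safe module. In this sketch JobCenter::Safe is replaced by a plain Safe compartment (an assumption; the real class presumably customises it) and JSON::MaybeXS by the core JSON::PP:

```perl
use strict;
use warnings;
use Safe;
use JSON::PP;

# Package variables, so that Safe::share can expose them to the compartment.
our (%a, %e, %v, %i);

# Sketch of do_imap: decode the job state, run the compiled imap code in a
# Safe compartment that only sees these hashes, return %i as JSON.
sub do_imap {
    my ($code, $jargs, $jenv, $jvars) = @_;
    %a = %{ decode_json($jargs // '{}') };   # arguments
    %e = %{ decode_json($jenv  // '{}') };   # environment
    %v = %{ decode_json($jvars // '{}') };   # variables
    %i = ();                                 # inputs for the called action
    my $safe = Safe->new;                    # default opmask: e.g. no system()
    $safe->share(qw(%a %e %v %i));
    $safe->reval($code, 1);                  # 1: compile the code under 'use strict'
    die $@ if $@;
    return JSON::PP->new->canonical->encode(\%i);
}

my $result = do_imap('$i{counter} = $a{input}; $i{step} = 3;', '{"input":123}', undef, undef);
print "$result\n";   # {"counter":123,"step":3}
```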

JobCenter::Client::Mojo Worker

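use strict;
use warnings;
use JobCenter::Client::Mojo;
use Mojo::IOLoop;

my $client = JobCenter::Client::Mojo->new(
    who   => 'theEmployee',
    token => 'doesThings',
);

# offer the 'add' action; in async mode the callback gets a result callback
$client->announce(
    actionname => 'add',
    cb         => \&do_add,
    mode       => 'async',
);

# start handling tasks for the announced actions
$client->work();

sub do_add {
    my ($job_id, $vars, $cb) = @_;
    # simulate slow work, then report the output arguments
    Mojo::IOLoop->timer(.5 => sub {
        $cb->({ counter => $vars->{counter} + $vars->{step} });
    });
}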
33 / 42

JobCenter::Client::Mojo Client

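use JobCenter::Client::Mojo;

my $client = JobCenter::Client::Mojo->new(
    who   => 'theCustomer',
    token => 'wantsThings',
    json  => 1,        # in this example inargs and outargs are JSON strings
);

# call the calltest workflow and wait for its result
my ($job_id, $outargs) = $client->call(
    wfname => 'calltest',
    inargs => '{"input":456}',
);
print "$job_id: $outargs\n" if $job_id;

Output:

2824: {"output":459}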

34 / 42

Actual workflow example:

workflow "prov.vhost_event":
in:
    zone string
    zoneid integer
    vhost string
out:
    result boolean
    log string optional
role:
    jobs_spooler
wfomap:
    <result>
    log = ifdef(v.log)
locks:
    zoneid _ manual
    workflowlock 'prov.vhost_event'
do:
    lock zoneid a.zoneid # lock zoneid
    call provdb_get_rid_by_vhost:
        zoneid = a.zoneid
        vhost = a.vhost
    into:
        <rid>
        <orderno>
        <clientid>
35 / 42

Actual workflow example 2:

    call dwh_vhost_event:
        <rid>
        <orderno>
        <clientid>
        zone = a.zone
        zoneid = a.zoneid
        vhost = a.vhost
    into:
        <result>
        v.postconfigure = ifdef(o.postconfigure)
        v.log = ifdef(o.comment)
    if [[ $v{postconfigure} and $v{postconfigure} eq 'set_default_ip' ]]:
        call provdb_update_zone_template:
            zoneid = a.zoneid
            template = 'strato-dwh-std'
        into:
            <result>
        call 'prov.get_ip_list':
            <rid>
            usage = 'DEFAULT_IP'
        into:
            v.ips = o.array
36 / 42

Actual workflow example 3:

        if [[ @{$v{ips}} ]]:
            call provdb_set_initial_ip:
                zoneid = a.zoneid
                <ips>
            into:
                <result>
                v.log .= ifdef(o.log)
    # unlock
    unlock zoneid a.zoneid
    # done
37 / 42

More features

  • limits (steps/depth)
  • error handling (try/catch)
  • split/join multiple child jobs
  • events
38 / 42

Closely related: RPC-Switch

JSON-RPC 2.0 service composition

  • workers connect to the RPC-Switch offering methods
  • a layer of method name mapping allows service composition
  • clients 'consume' a service

JobCenter RPC-Switch bridge

  • 'jcswitch'
    • uses the low level PG api
  • can offer JC workflows as methods on the RPC-Switch
  • can offer RPC-Switch methods as actions to the JC
39 / 42

The JobCenter is small

  • compared to other workflow engines
  • about 6 kloc perl and 3.5 kloc sql
    • jcc is a big part of that
  • runs quite well on a Raspberry Pi 3
    • the bottleneck is IO (tps)
40 / 42

Todo

  • documentation
    • better & more
  • better compiler (jcc):
    • not everything compiles that should compile
    • sensible error messages
    • change syntax?
    • rewrite in perl6?
  • type checking for job-state variables
41 / 42

Questions?

Find it at:

42 / 42
