name: mystyle
layout: true

---
class: center, middle

# The JobCenter
## An Orchestration Workflow Engine

Wieger Opmeer (opmeer@strato.de)

.footnote[TPC Glasgow 2018]

---

# A bit about me

- Been programming since the '80s
- Perl since the late '90s
- Also worked with C, PHP, Python, Lua..
- Did a lot of system administration / devops (before it was called that)
  - Mainly Linux
  - Also worked with Solaris, SCO Unix, AIX, Ultrix..
- Worked with MySQL, PostgreSQL, Oracle, ..

---

# A bit about Strato

- Web-hosting company
  - Shared, Virtual, Dedicated, etc.
- Been around for 20+ years
- Lots of legacy code and systems
  - Oracle, Sybase, Solaris..
- Lots of Perl
- Lots of self-built solutions
  - for example our own KV store (dbmd)
- Part of the United Internet Group since 2017
  - 1&1
  - home.pl, arsys.es, fasthosts.co.uk..
  - "Looking for synergy"

---

# Orchestration Workflow Example

__Ordering a WordPress Package on Shared Webhosting__

- Allocate storage (filer)
  - May determine the IP number
- Configure DNS (nameserver)
  - With the IP number from the previous step
- Register Domain (EPP client)
- Configure Vhost (apache)
  - May be dependent on the storage location
- Allocate MySQL Database
  - May also be dependent on the storage location
- Install WordPress
- Configure WordPress

---

# How it used to be..

*(and mostly still is)*

- lots of ad-hoc state machines
- state transition logic buried deep in the scripts
  - often spread over multiple scripts
- most state is stored in *the* database
  - but some might be somewhere else
- mostly cronjobs
  - depending on the cron schedule some things can take hours
  - horrible hacks to speed things up (procmail..)

---

# Wouldn't it be nice if..

- we had an easy way to model state machines
- we could store the workflow logic in one central place
- we could introspect running jobs in a standard way
- etc.

---

# JobCenter Design Criteria

- Database as central storage
  - The state machine transition table is stored there
  - Current state is stored there
  - Every state transition is logged there
- "Crash-Proof"
- Job state should be easily inspectable
  - Everything is JSON
- It should be relatively easy to 'meddle' with a Job
  - `update jobs set ... where job_id=...`
- Relatively "Lightweight" and "Simple"
- Written in our favourite language
- Open Source

---

# Some JobCenter terminology

- A Workflow is a state machine transition table
  - or a program for the JobCenter virtual machine
- Actions are the possible instructions
  - external (done by a worker)
  - internal (if, while, etc.)
  - workflow ('function')
  - procedure
- Actions have declared 'inputs' and 'outputs'
  - using JSON Schemas
- A Workflow is (also) an Action
  - workflows as subroutines

---

# Some more terminology

- A Task is an Action in a Workflow
  - an instruction in a program
- A Job is an instance of a Workflow
  - executing the program
- A Child Job is a workflow that was called from another workflow
  - the next 'stack frame' for a subroutine
- A Worker is an external entity capable of performing one or more Actions

---

# A bit about Job state

.nomargin[
A running job has:
]

- a "current task" ("current instruction")
- arguments (ro)
- variables (rw)
- task state

All job state is persistent.

___

Per-task state sequence:
???

- timestamps
- stepcounter
- ...

---

# The JobCenter Language (JCL)

.nomargin[
A custom workflow modelling language:
]

- intended to look and feel like a somewhat 'normal' programming language
  - easy to version control
  - easy to diff
- compiled using the `jcc` compiler
  - the compiled form is stored in the database

---

# Basic JCL example

```
workflow calltest:
    in:
        input number
    out:
        output number
    wfomap:
        output = v.thing
    do:
        call add:
            counter = a.input
            step = 3
        into:
            thing = o.counter
```

- the "input mapping" calculates the subroutine arguments
- the "output mapping" stores the subroutine results
- the "workflow output map" calculates the workflow results

---

# JCL: Loops

```
workflow whiletest:
    in:
        input number
    out:
        output number
    do:
        while !v.thing or v.thing < 10:
            call add:
                counter = v.thing // a.input
            into:
                thing = o.counter
    wfomap:
        output = v.thing
```

Like all job state, the while loop state is persistent.

---

# JCL: Ifs and such

```
workflow casetest:
    in:
        input string
    out:
        counter number
        thing string
    wfomap:
        counter = v.counter
        thing = v.thing
    do:
        case a.input:
            when "foo":
                call add:
                    counter = 10
                into:
                    counter = o.counter
                    thing = 'got foo'
            when bar, baz:
                eval:
                    counter = -1
                    thing = 'got bar or baz'
            else:
                eval:
                    counter = -1
                    thing = 'dunno what i got: ' . a.input
```

---

# Implementation - High level view
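
*(a rough textual sketch of the diagram on the original slide, using the component names that appear later in this deck: clients/workers, the JSON-RPC API (jcapi), PostgreSQL and the Maestro)*

```
clients / workers
      |
      |  JSON-RPC 2.0 over tcp / tls / websocket
      v
    jcapi
      |
      |  stored procedures + listen/notify
      v
 PostgreSQL  <---- stored procedures + listen/notify ---->  Maestro
```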
---

# Low level API

- the basis is a PostgreSQL stored procedure API:
  - create_job
  - get_job_status
  - announce
  - get_task
  - task_done
  - withdraw
- PostgreSQL listen/notify (pubsub) is used for IPC:
  - on creation of a new job
  - when a job finishes
  - when a task is ready for a worker
  - when a worker is done
  - etc.

---

# The Maestro

- keeps the time
  - timeouts
  - retries
  - scheduling of periodic jobs (dead worker detection, archiving)
- tells the database what to do:
  - calls stored procedures on the first available connection:
    - in response to notifications
      - task ready / done / error / ping
    - based on the results of a query

---

# The JSON-RPC 2.0 API

- mostly a 1:1 translation of the low level API
- uses communication over established connections
  - tcp / tls / websocket
- multiple requests can be "in flight" at the same time
  - asynchronous
  - notifications
- basically the same API for clients and workers
  - authentication handshake
  - a worker is a client that announces one or more actions
- the API calls the low level stored procedures "on behalf of" or "impersonating" the clients and workers
- the API adds the concept of 'slots'
  - a worker can limit the amount of work it is being sent

---

# Implementation - Low level view
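
*(a rough sketch of the diagram on the original slide; it shows the flow that the following Low Level Examples walk through, with stored procedure calls on the left and the resulting notify messages on the right)*

```
client   --create_job()------------->  PostgreSQL  --notify jobtaskdone-------->  Maestro
Maestro  --do_jobtaskdone(),
           do_jobtask()------------->  PostgreSQL  --notify action:N:ready----->  worker
worker   --get_task(), task_done()-->  PostgreSQL  --notify jobtaskdone-------->  Maestro
              ... repeat until the end task is reached ...
Maestro  --do_jobtask()------------->  PostgreSQL  --notify job:N:finished----->  client
client   --get_job_status()--------->  PostgreSQL
```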
---

# JobCenter::Pg

- a subclass of Mojo::Pg
- adds a `queue_query($cb)` method
  - calls that callback as soon as a database connection is available
  - the callback 'owns' that connection until it goes out of scope
  - allows an "at most _n_ connections" model
  - connections get closed after _max_use_ uses
- this allows the Maestro to "play" on _n_ connections at the same time
- this allows the jcapi to work "on behalf of" _n_ clients/workers at the same time

---

# Low Level Example 1

```
# select * from create_job('calltest','{"input":123}'::jsonb);
 o_job_id | o_listenstring
----------+----------------
        9 | job:9:finished
```

Query done by the client.

`create_job`:

- verifies that the caller is allowed to call `calltest`
- verifies the input arguments against the specification
- inserts a record into the jobs table
- sends a `notify "jobtaskdone", '(5,3,9)'` to the Maestro
  - meaning: the 'start' task of this job has been done

---

# Low Level Example 2

```
# select * from do_jobtaskdone('(5,3,9)');
 error | jobtask
-------+---------
 false | (5,2,9)
```

Query done by the Maestro on receipt of the notification.

`do_jobtaskdone`:

- finds the next task in the workflow
  - the next instruction in the program
- returns the "next thing to do":
  - (workflow_id, task_id, job_id)

???

Ignore the `error = false` for now

---

# Low Level Example 3

```
# select * from do_jobtask('(5,2,9)');
 error | jobtask
-------+---------
 false | null
```

Query done by the Maestro.

`do_jobtask`:

- would return the "next thing to do"
  - keeping the workflow moving (the program running)
- here there is no next thing to do
  - because a worker needs to do the 'add' action first
- a `notify "action:1:ready", '{"job_id": 9}'` has been sent
  - alerting the worker

---

# Low Level Example 4

```
# select * from get_task('thisworker','add',9);
 o_job_id | o_job_cookie | o_in_args       | o_env
----------+--------------+-----------------+-------
        9 | da3c5c5d     | {"counter":123} | null
```

Query done by the worker on receipt of the notification.

`get_task`:

- returns the "work to do" if there is any
  - maybe another worker was faster
- the cookie is unique for "this worker doing this task now"

---

# Low Level Example 5

```
# select task_done('da3c5c5d...', '{"counter":126}');
 void
```

Query done by the worker when it has finished its work.

`task_done`:

- looks up the 'jobtask' using the cookie
- verifies the "output arguments" against the specification
- performs the output mapping
  - stores the new values of the variables in the job record
- does a `NOTIFY "jobtaskdone", '(5,2,9)'`

---

# Low Level Example 6

```
# select * from do_jobtaskdone('(5,2,9)');
 error | jobtask
-------+---------
 false | (5,4,9)
```

Query done by the Maestro on receipt of the notification.

`do_jobtaskdone`:

- finds the next task in the workflow

---

# Low Level Example 7

```
# select * from do_jobtask('(5,4,9)');
 error | jobtask
-------+---------
 false | null
```

Query done by the Maestro.

Task_id 4 happens to be the "end task" for workflow_id 5:

- the workflow output map (wfomap) is executed
- the 'outargs' are checked against the workflow specification
- the 'outargs' are stored in the job record
- a `notify "job:9:finished", ''` is sent
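
That last notification is what the client has been waiting for: the `o_listenstring` returned by `create_job` in Example 1 names the notification channel. A minimal sketch of the client side in plain SQL (any libpq-based client works the same way; the psql output line is only indicative):

```
-- subscribe to the per-job channel returned by create_job
LISTEN "job:9:finished";

-- once the end task has run, psql reports something like:
--   Asynchronous notification "job:9:finished" received from server process ...
```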
---

# Low Level Example 8

```
# select get_job_status(9);
 get_job_status
-----------------
 {"output": 126}
```

Called by the client on receipt of the notification.

`get_job_status`:

- returns the job results
- or *null* if the job hasn't finished yet

---

# Secret Perl

- all expressions get compiled to Perl
  - imap
  - omap
  - wfomap
  - if / while / repeat (bool)
  - case (string)
- executed in PostgreSQL using PL/Perl

---

# So this:

```
workflow calltest:
    in:
        input number
    out:
        output number
    wfomap:
        output = v.thing
    do:
        call add:
            counter = a.input
            step = 3
        into:
            thing = o.counter
```

---

# Becomes this:

```
workflow calltest:
    in:
        input number
    out:
        output number
    wfomap:
*       [[ $o{output} = $v{thing}; ]]
    do:
        call add:
*       [[
*           $i{counter} = $a{input};
*           $i{step} = 3;
*       ]]
        into:
*       [[ $v{thing} = $o{counter}; ]]
```

---

# And is executed like this:

```perl
CREATE OR REPLACE FUNCTION jobcenter.do_imap(code text, args jsonb, env jsonb, vars jsonb)
 RETURNS jsonb
 LANGUAGE plperl
AS $function$
use strict;
use warnings;
use JSON::MaybeXS qw(from_json to_json JSON);
use JobCenter::Safe;

my $safe = new JobCenter::Safe;

my ($code, $jargs, $jenv, $jvars) = @_;

our %a = %{from_json($jargs // '{}')};
our %e = %{from_json($jenv // '{}')};
our %v = %{from_json($jvars // '{}')};
our %i = ();
our $TRUE = JSON->true;
our $FALSE = JSON->false;
our $JSON = JSON::MaybeXS->new(utf8 => 0);

$safe->share(qw(%a %e %v %i $TRUE $FALSE $JSON));

$safe->reval($code, 1);

die "$@" if $@;

return to_json(\%i);
$function$
```

---

# JobCenter::Client::Mojo Worker

```perl
use JobCenter::Client::Mojo;
use Mojo::IOLoop;

my $client = JobCenter::Client::Mojo->new(
    who   => 'theEmployee',
    token => 'doesThings',
);

$client->announce(
    actionname => 'add',
    cb         => \&do_add,
    mode       => 'async',
);

$client->work();

sub do_add {
    my ($job_id, $vars, $cb) = @_;
    Mojo::IOLoop->timer(.5 => sub {
        $cb->({ counter => $vars->{counter} + $vars->{step} });
    });
}
```

---

# JobCenter::Client::Mojo Client

```perl
my $client = JobCenter::Client::Mojo->new(
    who   => 'theCustomer',
    token => 'wantsThings',
    json  => 1,
);

my ($job_id, $outargs) = $client->call(
    wfname => 'calltest',
    inargs => '{"input":456}',
);

print "$job_id: $outargs\n" if $job_id;
```

> 2824: {"output":459}

---

# Actual workflow example:

```
workflow "prov.vhost_event":
    in:
        zone string
        zoneid integer
        vhost string
    out:
        result boolean
        log string optional
*   role: jobs_spooler
    wfomap:
        <result>
        log = ifdef(v.log)
*   locks:
        zoneid _ manual
        workflowlock 'prov.vhost_event'
    do:
        `lock zoneid a.zoneid # lock zoneid`
        call provdb_get_rid_by_vhost:
            zoneid = a.zoneid
            vhost = a.vhost
        into:
            <rid> <orderno> <clientid>
```

---

## Actual workflow example 2:

```
        call dwh_vhost_event:
            <rid> <orderno> <clientid>
            zone = a.zone
            zoneid = a.zoneid
            vhost = a.vhost
        into:
            <result>
            v.postconfigure = ifdef(o.postconfigure)
            v.log = ifdef(o.comment)
        `if [[ $v{postconfigure} and $v{postconfigure} eq 'set_default_ip' ]]:`
            call provdb_update_zone_template:
                zoneid = a.zoneid
                template = 'strato-dwh-std'
            into:
                <result>
            call 'prov.get_ip_list':
                <rid>
                usage = 'DEFAULT_IP'
            into:
                v.ips = o.array
```

---

## Actual workflow example 3:

```
            if [[ @{$v{ips}} ]]:
                call provdb_set_initial_ip:
                    zoneid = a.zoneid
                    <ips>
                into:
                    <result>
                    v.log .= ifdef(o.log)

        # unlock
        unlock zoneid a.zoneid

        # done
```

---

# More features

- limits (steps / depth)
- error handling (try/catch)
- split/join multiple child jobs
- events

---

# Closely related: RPC-Switch

### JSON-RPC 2.0 service composition

- workers connect to the RPC-Switch offering methods
- a layer of method name mapping allows service composition
- clients 'consume' a service

### JobCenter RPC-Switch bridge

- 'jcswitch'
  - uses the low level PG API
- can offer JC workflows as methods on the RPC-Switch
- can offer RPC-Switch methods as actions to the JC

---

# The JobCenter is small

- compared to other workflow engines
- about 6 kloc of Perl and 3.5 kloc of SQL
  - jcc is a big part of that
- runs quite well on a Raspberry Pi 3
  - the bottleneck is IO (tps)

---

# Todo

- documentation
  - better & more
- a better compiler (jcc):
  - not everything that should compile actually compiles
  - sensible error messages
  - change the syntax?
  - rewrite in Perl 6?
- type checking for job-state variables

---

# Questions?

Find it at:

- https://github.com/a6502/jobcenter
- https://gitlab.com/a6502/jobcenter
- JobCenter::Client::Mojo on CPAN
  - including:
    - jcclient, a command line client
    - jcworker, an example worker