Job

The basic unit of execution, wrapping a Python method or any other callable.

For methods without arguments:

from pypette import Job

def method():
    print("Hello!")

job = Job(method)

For methods with arguments:

from pypette import Job

def method(msg):
    print("Hello " + msg + "!")

# As argument list
job = Job(method, args=("World",))

# As keyword arguments
job = Job(method, kwargs={"msg": "World"})

For lambda methods:

from pypette import Job

job = Job(lambda: print("Hello World!"))

BashJob

The basic unit of execution, which runs a bash command.

from pypette import BashJob

job = BashJob(['ls', '-l'])
job = BashJob(['pwd'])
job = BashJob(['cat', 'file.txt', '|', 'grep', 'colours'])
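
# Note: the '|' in the last example assumes the command list is handed to
# a shell for execution; plain argument lists like ['ls', '-l'] make no
# such assumption.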

Pipe

Structure to specify the flow in which the jobs need to be executed.

Creation Modes

FAIL_FAST (Default)

Pipe objects created in this mode do not execute any further stages once an exception is thrown.

from pypette import Pipe, Gate

p = Pipe('Test')
p = Pipe('Test', gate=Gate.FAIL_FAST)

EXECUTE_ALL

Pipe objects created in this mode continue to execute all remaining stages even after an exception is thrown, i.e. these pipelines are resilient to exceptions.

from pypette import Pipe, Gate

p = Pipe('Test', gate=Gate.EXECUTE_ALL)

Adding jobs to the pipeline

Any object of type BashJob, Job or Pipe can be added as a job to the pipeline, either in series or in parallel (a mixed sketch follows the two modes below).

In series:

from pypette import Pipe

p = Pipe('Test')

# A list of jobs can be added in series as
p.add_stage(job1)
p.add_stage(job2)
p.add_stage(job3)
p.add_stage(job4)

# Or as a job list
p.add_jobs([job1, job2, job3, job4])

In parallel:

from pypette import Pipe

p = Pipe('Test')

# A list of jobs can be added in parallel as
p.add_stage(job1, job2, job3, job4)

# Or as a job list
p.add_jobs([job1, job2, job3, job4], run_in_parallel=True)
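
Since Job, BashJob and Pipe objects are all accepted as jobs, a single pipeline can mix them freely. A minimal sketch, with illustrative job names:

from pypette import BashJob, Job, Pipe

def greet():
    print("Hello!")

p = Pipe('Mixed')
p.add_stage(Job(greet))                    # a Job on its own
p.add_stage(BashJob(['pwd']), Job(greet))  # a BashJob and a Job in parallel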

Adding dependencies to pipelines

Irrespective of the mode the Pipe object is created in, we often come across scenarios where we want to create dependencies, i.e. we do not want a pipeline to run unless some other pipeline has succeeded. We can add one or more Pipe objects as dependencies of another Pipe object as follows:

from pypette import Pipe

build = Pipe('Build')
test = Pipe('Test')

# Run test pipeline only if build has run and it was successful.
test.add_dependency(build)

cleanup = Pipe('Cleanup')
# Run cleanup pipeline only if build and test have run and completed
# successfully
cleanup.add_dependency(build, test)
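
# Note that dependencies cascade: if build fails, test will not start, and
# cleanup (which depends on both) will not start either.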

Visualizing the pipeline structure

We can visualize the entire pipeline within the comfort of the terminal itself using the graph() method:

from pypette import Pipe

test = Pipe('Test')
...
test.graph()

Running the pipeline

We can start executing the stages of the pipeline by using the run() method:

from pypette import Pipe

test = Pipe('Test')
...
test.run()

Generating report for the pipeline run

Once the pipeline has been run, we can generate a report of the run using the report() method:

from pypette import Pipe

test = Pipe('Test')
...
test.report()

Building complex pipelines

Jobs submitted to a pipeline should be callables, i.e. structures which can be run. This means Python methods, lambdas etc. qualify.

What about Pipe itself?

Of course, it is a callable and you can submit a pipe object to be run along with regular jobs. This way you can build small pipelines which achieve a specific task and then combine them to create more complex pipelines.
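
For instance, a minimal sketch of one pipe nested inside another (pipe names here are illustrative):

from pypette import Job, Pipe

inner = Pipe('Inner')
inner.add_stage(Job(lambda: print("inner stage")))

outer = Pipe('Outer')
outer.add_stage(inner)  # a Pipe submitted like any regular job
outer.run()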

Let us understand building pipelines using the following jobs:

from pypette import Job

def _good():
    print("I am good.")

def _bad():
    raise Exception("I am bad.")

# Intentionally ugly formatting, true to its name.
def _ugly(): print("I "
    + "am "
        + "ugly.")

def _dummy():
    pass

good = Job(_good)
bad = Job(_bad)
ugly = Job(_ugly)
dummy = Job(_dummy)

Exception scenarios

Let us understand how a pipeline behaves in case of exception scenarios

Exception thrown in series

from pypette import Pipe

p = Pipe("Test") # Note is similar to Pipe("Test", gate=Gate.FAIL_FAST)

p.add_stage(good)
p.add_stage(bad)
p.add_stage(ugly)

p.run()

# Executes good, then bad; as bad throws an exception, the pipeline exits
# without executing ugly.

Exception thrown in parallel

from pypette import Pipe

p = Pipe("Test") # Note is similar to Pipe("Test", gate=Gate.FAIL_FAST)

p.add_stage(good, bad)
p.add_stage(ugly)

p.run()

# Executes both good and bad, as they run in parallel and we have no
# control over which goes first. As bad throws an exception, the next
# stage containing ugly is not executed.

Resilient mode

from pypette import Gate, Pipe

p = Pipe("Test", gate=Gate.EXECUTE_ALL)

# For a pipe created this way, ugly would have been executed in both the
# series and parallel exception scenarios listed above. What this means is
# that bad would have thrown an exception, but that wouldn't have stopped
# the pipeline from executing the stages after it.
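
# For example, rerunning the parallel scenario with this gate:
p.add_stage(good, bad)
p.add_stage(ugly)

p.run()

# Executes good and bad, and, despite the exception from bad, ugly as well.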

Combining everything

Combining all the scenarios listed above, we can create a complex pipeline with jobs and sub-pipes as follows.

from pypette import Gate, Pipe

jenkins = Pipe("Jenkins", gate=Gate.EXECUTE_ALL)

build = Pipe("Build", gate=Gate.EXECUTE_ALL)
build.add_stage(good)

# Test pipeline will throw an exception.
test = Pipe("Test", gate=Gate.EXECUTE_ALL)
test.add_stage(bad)

# Cleanup pipeline is dependent on both build and test pipeline success.
cleanup = Pipe("Cleanup", gate=Gate.EXECUTE_ALL)
cleanup.add_stage(ugly)
cleanup.add_dependency(build, test)

jenkins.add_stage(dummy, build)
jenkins.add_stage(test, dummy)
jenkins.add_stage(cleanup, dummy, dummy)

jenkins.graph()

# Would output a graph along the lines of
#
# Pipe(Jenkins)
# |
# |----------------------------
# |            |              |
# |          dummy        Pipe(Build)
# |----------------------------
# |            |              |
# |          Pipe(Test)     dummy
# |------------------------------------------
#             |               |             |
#           Pipe(Cleanup)   dummy         dummy
#
# Pipe(Build)
# |
# |----------- good
#
# Pipe(Test)
# |
# |------------ bad
#
# Pipe(Cleanup)
# |
# |------------ ugly

jenkins.run()

# Take a minute before reading the answer and make a mental note of what
# gets executed and why.
#
# Answer:
# dummy job gets executed 4 times as EXECUTE_ALL is exception resilient.
#
# build pipeline gets successfully executed (implying all internal jobs get
# executed successfully)
#
# test pipeline fails (implying one or more jobs threw an exception). Note
# that even though the other jobs within this pipeline do get executed, as
# it is resilient, it is still marked as an overall failure because one or
# more jobs threw an exception.
#
# cleanup pipeline won't start executing as it is dependent on build and
# test being successful, and the test pipe has failed.
#
# jenkins overall gets marked as failed as one or more jobs/sub-pipes have
# failed.

jenkins.report()

# Would provide a report along the lines of
#
# Pipe(Jenkins)
# |
# |----------------------------
# |            |              |
# |          SUCCESS        SUCCESS
# |----------------------------
# |            |              |
# |          FAILED         SUCCESS
# |------------------------------------------
#             |               |             |
#            FAILED         SUCCESS       SUCCESS
#
# Pipe(Build)
# |
# |----------- SUCCESS
#
# Pipe(Test)
# |
# |------------ FAILED
#
# Pipe(Cleanup)
# |
# |------------ FAILED