## Pipelining in Python

Posted by mtomassoli on March 29, 2012

# Pipelining in F#

F# is a multi-paradigm language which targets the .NET platform and is based on OCaml, which is a variant of ML. Even though F# supports imperative and object-oriented programming, it’s mainly a functional programming language.

*Pipelining* isn’t a new idea, no doubt about it. Unix and Linux folks have been using it for a very long time. It’s also known as *forward-chaining* and is basically a *function application in reverse*. Transformations can be chained with the operator ‘|>’ in F#.

Let’s start with a simple example:

Seq.filter (fun i -> i%2 <> 0) (Seq.map (fun i -> i*i) (seq {0..40}))

Here’s one way to express that in Python:

filter(lambda i : i%2, map(lambda i : i*i, range(41)))

We simply start with the list of the first 40 non-negative integers, square them and filter out the even ones. Therefore, we get an *iterable* which contains the following integers:

1, 9, 25, 49, 81, 121, 169, 225, 289, 361, 441, 529, 625, 729, 841, 961, 1089, 1225, 1369, 1521

Now let’s rewrite the F# version using pipelining:

seq {0..40} |> Seq.map (fun i -> i*i) |> Seq.filter (fun i -> i%2 <> 0)

The first thing we notice is that there are fewer parentheses, which is always a good thing (I probably shouldn’t have said that…). The next thing is that we can understand that line by reading it from left to right. I don’t know about you, but I had to reread the first version from right to left to fully understand it. Ok, that was simple and I wrote it so I already knew what it does, but consider the situation where we have some *highly “overloaded”* functions, i.e. functions which do different things depending on the type of the arguments (actually, we shouldn’t have such functions). Now how can you tell what a function does without knowing the right-side of the expression? That suggests that reading such an expression from left to right is less efficient.

As a side note, F# is a *typed* functional language which uses *type inference* so the programmer needs to provide types only when they can’t be derived by analyzing the expression. Well, F# likes pipelining because it lets type information *flow* from input objects to the functions which transform those objects into output objects.

Let’s get back to our expression: did you notice the *currying*?

Here’s how F# defines the operator ‘|>’:

let (|>) x f = f x

That’s all! It says that

arg |> func

is equivalent to

func arg

or, if you prefer,

func(arg)

Let’s analyze the first part of the code above:

seq {0..40} |> Seq.map (fun i -> i*i)

That must be equivalent to

Seq.map (fun i -> i*i) (seq {0..40})

It should be obvious that

Seq.map (fun i -> i*i)

is a curried version of *Seq.map*. We might say that the operator ‘|>’ lets us pass a function its last argument. By the way, now you know why *map* takes a function and a sequence in that particular order 🙂

# Pipelining in Python

What should pipelining look like in Python? I don’t know, but here’s what it looks like in my code:

range(41) >> filter(lambda i : i%2) >> map(lambda i : i*i)

Not too shabby, eh? If you haven’t done that already, please read my previous post “*Currying in Python*” because *filter* and *map* in the code above are the *currying-able* versions of the built-in functions you usually use in your code.

# Composing functions in F#

You can also compose functions in F#. The usual composition, i.e. the one your math teacher taught you, is defined (in *math*-like language) as follows:

(g o f)(x) := g(f(x))

where that ‘o’ should really be a small circle.

Since pipelining is a sort of *function application in reverse*, it’s only natural for us to *reverse* the composition of functions as well:

(f >> g)(x) := g(f(x))

Again, that’s not an F# definition: it’s just my way to explain it to you.

Yes, F# uses ‘>>’ for composition, but we’ve already used that for pipelining in Python. We’re going to use some other operator in Python. Here’s how function composition is used in F#:

let comp : int seq -> int seq = Seq.map (fun i -> i*i) >> Seq.filter (fun i -> i%2 <> 0) seq {0..40} |> comp

Please ignore the type annotations (i.e. *int seq –> int seq*) or, better, let’s write it another way:

seq {0..40} |> (Seq.map (fun i -> i*i) >> Seq.filter (fun i -> i%2 <> 0))

So what’s the difference?

Basically, we aren’t passing a sequence “through” two individual functions anymore, but through their composition, i.e. through a *single* function. That’s useful when we want to reuse a “chain” of functions. For instance (in pseudo-code) we can rewrite

out1 = input1 |> func1 |> func2 |> func3 |> func4 out2 = input2 |> func1 |> func2 |> func3 |> func4 out3 = input3 |> func1 |> func2 |> func3 |> func4 out4 = input4 |> func1 |> func2 |> func3 |> func4

as

compFunc = func1 >> func2 >> func3 >> func4 out1 = input1 |> compFunc out2 = input2 |> compFunc out3 = input3 |> compFunc out4 = input4 |> compFunc

which is much better, I think. The beauty of ‘>>’ is that it lets us compose functions in the same reversed order they appear in our pipelines.

# Composing functions in Python

You won’t like this, I’m afraid:

compFunc = filter(lambda i : i%2) - map(lambda i : i*i) range(0,41) >> compFunc

Yep: a *minus*. Why? Because I couldn’t think of anything better, but I’m open to suggestions. My first idea was to use a *plus*, then I used an *asterisk* and, finally, a *minus*. That minus is not really a minus but a sort of *link* which should remind us that functions are not composed the usual way.

Let’s create a few tests that our implementation should be able to handle.

# Some tests

To make our tests a little more readable, we’ll import some definitions and redefine a few symbols:

import sys from urllib.request import urlopen from re import findall from math import sqrt, floor from functools import reduce map = cur(map, 2) filter = cur(filter, 2) urlopen = cur(urlopen) findall = cur(findall) my_print = cur(lambda list : print(*list)) reduce = cur(reduce, 2)

As I said in my previous article, *cur* is a function which takes another function and returns a *curried* version of it. The function cur also accepts some optional arguments such as *minArgs* which tells cur to force the evaluation of the curried function after at least minArgs arguments have been provided.

## Test 1 (*Example 5, in the code)*

range(0,50) >> filter(lambda i : i%2) >> map(lambda i : i*i) >> my_print

It prints

1 9 25 49 81 121 169 225 289 361 441 529 625 729 841 961 1089 1225 1369 1521 168 1 1849 2025 2209 2401

## Test 2 (*Example 6, in the code)*

Now we check that function composition works as expected:

compFunc = filter(lambda i : i%2) - map(lambda i : i*i) range(0,50) >> compFunc >> my_print

It prints the same list of numbers of *Test 1*.

## Test 3 (*Example 7, in the code)*

This test is more complex than the first two:

# Tells whether x is not a proper multiple of n. notPropMult = cur(lambda n, x : x <= n or x % n, 2) def findPrimes(upTo): if (upTo <= 5): return [2, 3, 5] filterAll = (findPrimes(floor(sqrt(upTo))) >> map(lambda x : filter(notPropMult(x))) >> reduce(lambda f, g : f - g)) return list(range(2, upTo + 1)) >> filterAll findPrimes(1000) >> my_print

This prints:



*A little bit of math*

As *all* of you already know, to test whether *n* is a prime we can try to divide it by the integers 2,3, …, *n*-1.

As *some* of you already know, we should really limit ourselves to the integers 2,3, …, floor(sqrt(*n*)). Why is that? Because if some integer *x *divides *n* and *x* > sqrt(*n*), then there must be some positive integer *y* < sqrt(*n*) which divides *n*. So, if we didn’t find any *y *< sqrt(*n*) which divides *n*, then we can’t possibly find any *x* > sqrt(*n*).

The *Sieve of Eratosthenes *consists in taking the numbers 2, 3, …, *upTo* and filtering out all the numbers which aren’t prime numbers. Starting by 2, we cross out all multiples of 2 greater than 2. That gives us 3, 5, 7, 9, 11, …, *upTo* (assuming that *upTo* is odd). We know for sure that 3 is a prime so we keep it and start crossing out all multiples of 3 greater than 3, and so on…

When do we stop? We stop when we reach floor(sqrt(*upTo*)).

If you’ve already looked at the code above, you should have noticed that it’s recursive. Why? Because I like recursion and I wanted to do something more interesting than the usual sieve. Here’s what I came up with:

the primes up to *upTo* are

- (if
*upTo*<= 5) [2, 3, 5] - (otherwise) [2, …,
*upTo*] without the proper multiples of the primes up to floor(sqrt(*upTo*))

where a *proper multiple* of *x* is a multiple of *x* greater than *x* (we don’t want to cross out the prime numbers themselves!). Indeed, to tell if a number *n* is a prime, we don’t need to consider all the divisors up to floor(sqrt(*n*)), but just the *prime numbers* up to floor(sqrt(*n*)). If you think about it, that’s exactly what the *iterative* *Sieve of Eratosthenes* already does. With recursion, for once, we need to be a little more explicit.

So, if *upTo* = 20, we have:

primes up to 20 =

[2, …, 20] without the proper multiples of the primes up to 4 =

[2, …, 20] without the proper multiples of [2, 3, 5] =

[2, 3, 5, 7, 11, 13, 17, 19]

That 5 wasn’t needed, but I decided to pick that as the base case.

*The actual implementation*

This is the interesting part:

filterAll = (findPrimes(floor(sqrt(upTo))) >> map(lambda x : filter(notPropMult(x))) >> reduce(lambda f, g : f - g))

Let’s see what it does by looking at an example. Let assume that findPrimes(floor(sqrt(*upTo*))) is the list [2, 3, 5, 7]. Here, ‘->’ means “*is transformed into*” and corresponds to the occurrences of the ‘>>’ operator in the code above. I will ignore the differences between *lists* and *iterators* for the sake of clarity:

[2, 3, 5, 7] –>

[filter(notPropMult(2)), filter(notPropMult(3)), filter(notPropMult(5)), filter(notPropMult(7))] –>

filter(notPropMult(2)) – filter(notPropMult(3)) – filter(notPropMult(5)) – filter(notPropMult(7))

So, filterAll is really the filter we need to select all the prime numbers from the first *upTo* integers (starting from 2).

## Test 4 (*Example 8, in the code)*

This test is easy but interesting nonetheless:

def do(proc, arg): proc() return arg do = cur(do) cprint = cur(print) ("http://python.org" >> do(cprint("The page http://python.org has about... ", end = '')) >> do(sys.stdout.flush) >> urlopen >> cur(lambda x : x.read()) >> findall(b"href=\"") >> cur(len) >> cur("{} hrefs.".format) >> cprint)

It prints

The page http://python.org has about... 121 hrefs.

with a small pause after “about…” (that delay is due to urlopen).

The function *do* is basically an *identity* transformation with a side-effect. It lets us do *something* (like printing to the screen) without breaking or altering the flow. I won’t point out that that code is full of *currying*… ops. On second thought, I think that code doesn’t even need an explanation: every Python programmer should be able to *understand* what the code does and fill in the details by himself or herself.

# How do we implement Pipelining?

That’s very easy. We need to write a class to overload the operators ‘>>’ and ‘-’ so it makes sense to rewrite the code in the article “Currying in Python” like this:

class CurriedFunc: def __init__(self, func, args = (), kwArgs = {}, unique = True, minArgs = None): self.__func = func self.__myArgs = args self.__myKwArgs = kwArgs self.__unique = unique self.__minArgs = minArgs def __call__(self, *args, **kwArgs): if args or kwArgs: # some more args! # Allocates data to assign to the next CurriedFunc. newArgs = self.__myArgs + args newKwArgs = dict.copy(self.__myKwArgs) # If unique is True, we don't want repeated keyword arguments. if self.__unique and not kwArgs.keys().isdisjoint(newKwArgs): raise ValueError("Repeated kw arg while unique = True") # Adds/updates keyword arguments. newKwArgs.update(kwArgs) # Checks whether it's time to evaluate func. if self.__minArgs is not None and self.__minArgs <= len(newArgs) + len(newKwArgs): return self.__func(*newArgs, **newKwArgs) # time to evaluate func else: return CurriedFunc(self.__func, newArgs, newKwArgs, self.__unique, self.__minArgs) else: # the evaluation was forced return self.__func(*self.__myArgs, **self.__myKwArgs) def cur(f, minArgs = None): return CurriedFunc(f, (), {}, True, minArgs) def curr(f, minArgs = None): return CurriedFunc(f, (), {}, False, minArgs)

By the way, while I was recoding genCur into CurriedFunc, I noticed that a “*return*” was missing! I’ve already updated my previous article.

Now, we just need to add a couple of methods to CurriedFunc:

def __rrshift__(self, arg): return self.__func(*(self.__myArgs + (arg,)), **self.__myKwArgs) # forces evaluation def __sub__(self, other): if not isinstance(other, CurriedFunc): raise TypeError("Cannot compose a CurriedFunc with another type") def compFunc(*args, **kwArgs): return other.__func(*(other.__myArgs + (self.__func(*args, **kwArgs),)), **other.__myKwArgs) return CurriedFunc(compFunc, self.__myArgs, self.__myKwArgs, self.__unique, self.__minArgs)

As you can see, we added *__rrshift__* and not *__rshift__*. The reason is that instances of CurriedFunc will always appear to the *right* of the operator ‘>>’. The method *__rrshift__* calls function *__func* with the new positional argument *arg* without forgetting (stress on the word “forgetting”) the previously added arguments *__myArgs* and *__myKwArgs*.

The method *__sub__* is not so straight-forward. After a simple sanity check, we define a new function called *compFunc* and return a curried version of that function. Notice that the functions self.*__func* and *other.__func* are composed in the reverse order. To simplify things, let’s say we want to compose two CurriedFunc objects called *self* and *other*. We write “self – other” and we expect a CurriedFunc object representing a function which first does what *self* does and then, on the result, does what *other* does. Basically, we want “other(self(.))”. From the expression

return other.__func(*(other.__myArgs + (self.__func(*args, **kwArgs),)), **other.__myKwArgs)

it should be clear that the result of *self.__func* is passed to *other.__func* as its last positional argument. Also notice that the arguments of *other.__func* are *frozen* (except for the last positional argument coming from *self.__func*, as we’ve just said).

# The full source code

Here’s the entire source code. The new examples start from number 5.

# Coded by Massimiliano Tomassoli, 2012. class CurriedFunc: def __init__(self, func, args = (), kwArgs = {}, unique = True, minArgs = None): self.__func = func self.__myArgs = args self.__myKwArgs = kwArgs self.__unique = unique self.__minArgs = minArgs def __call__(self, *args, **kwArgs): if args or kwArgs: # some more args! # Allocates data to assign to the next CurriedFunc. newArgs = self.__myArgs + args newKwArgs = dict.copy(self.__myKwArgs) # If unique is True, we don't want repeated keyword arguments. if self.__unique and not kwArgs.keys().isdisjoint(newKwArgs): raise ValueError("Repeated kw arg while unique = True") # Adds/updates keyword arguments. newKwArgs.update(kwArgs) # Checks whether it's time to evaluate func. if self.__minArgs is not None and self.__minArgs <= len(newArgs) + len(newKwArgs): return self.__func(*newArgs, **newKwArgs) # time to evaluate func else: return CurriedFunc(self.__func, newArgs, newKwArgs, self.__unique, self.__minArgs) else: # the evaluation was forced return self.__func(*self.__myArgs, **self.__myKwArgs) def __rrshift__(self, arg): return self.__func(*(self.__myArgs + (arg,)), **self.__myKwArgs) # forces evaluation def __sub__(self, other): if not isinstance(other, CurriedFunc): raise TypeError("Cannot compose a CurriedFunc with another type") def compFunc(*args, **kwArgs): return other.__func(*(other.__myArgs + (self.__func(*args, **kwArgs),)), **other.__myKwArgs) return CurriedFunc(compFunc, self.__myArgs, self.__myKwArgs, self.__unique, self.__minArgs) def cur(f, minArgs = None): return CurriedFunc(f, (), {}, True, minArgs) def curr(f, minArgs = None): return CurriedFunc(f, (), {}, False, minArgs) # Simple Function. def func(a, b, c, d, e, f, g = 100): print(a, b, c, d, e, f, g) # NOTE: '<====' means "this line prints to the screen". # Example 1. f = cur(func) # f is a "curried" version of func c1 = f(1) c2 = c1(2, d = 4) # Note that c is still unbound c3 = c2(3)(f = 6)(e = 5) # now c = 3 c3() # () forces the evaluation <==== # it prints "1 2 3 4 5 6 100" c4 = c2(30)(f = 60)(e = 50) # now c = 30 c4() # () forces the evaluation <==== # it prints "1 2 30 4 50 60 100" print("\n------\n") # Example 2. f = curr(func) # f is a "curried" version of func # curr = cur with possibly repeated # keyword args c1 = f(1, 2)(3, 4) c2 = c1(e = 5)(f = 6)(e = 10)() # ops... we repeated 'e' because we <==== # changed our mind about it! # again, () forces the evaluation # it prints "1 2 3 4 10 6 100" print("\n------\n") # Example 3. f = cur(func, 6) # forces the evaluation after 6 arguments c1 = f(1, 2, 3) # num args = 3 c2 = c1(4, f = 6) # num args = 5 c3 = c2(5) # num args = 6 ==> evalution <==== # it prints "1 2 3 4 5 6 100" c4 = c2(5, g = -1) # num args = 7 ==> evaluation <==== # we can specify more than 6 arguments, but # 6 are enough to force the evaluation # it prints "1 2 3 4 5 6 -1" print("\n------\n") # Example 4. def printTree(func, level = None): if level is None: printTree(cur(func), 0) elif level == 6: func(g = '')() # or just func('')() else: printTree(func(0), level + 1) printTree(func(1), level + 1) printTree(func) print("\n------\n") def f2(*args): print(", ".join(["%3d"%(x) for x in args])) def stress(f, n): if n: stress(f(n), n - 1) else: f() # enough is enough stress(cur(f2), 100) # Pipelining and Function Composition print("\n--- Pipelining & Composition ---\n") import sys from urllib.request import urlopen from re import findall from math import sqrt, floor from functools import reduce map = cur(map, 2) filter = cur(filter, 2) urlopen = cur(urlopen) findall = cur(findall) my_print = cur(lambda list : print(*list)) reduce = cur(reduce, 2) # Example 5 range(0,50) >> filter(lambda i : i%2) >> map(lambda i : i*i) >> my_print print("---") # Example 6 compFunc = filter(lambda i : i%2) - map(lambda i : i*i) range(0,50) >> compFunc >> my_print print("---") # Example 7 # Tells whether x is not a proper multiple of n. notPropMult = cur(lambda n, x : x <= n or x % n, 2) def findPrimes(upTo): if (upTo <= 5): return [2, 3, 5] filterAll = (findPrimes(floor(sqrt(upTo))) >> map(lambda x : filter(notPropMult(x))) >> reduce(lambda f, g : f - g)) return list(range(2, upTo + 1)) >> filterAll findPrimes(1000) >> my_print print("---") # Example 8 # Finds the approximate number of hrefs in a web page. def do(proc, arg): proc() return arg do = cur(do) cprint = cur(print) ("http://python.org" >> do(cprint("The page http://python.org has about... ", end = '')) >> do(sys.stdout.flush) >> urlopen >> cur(lambda x : x.read()) >> findall(b"href=\"") >> cur(len) >> cur("{} hrefs.".format) >> cprint)

# That’s all!

Please feel free to let me know what you think by leaving a comment!

This entry was posted on March 29, 2012 at 8:52 pm and is filed under Python. Tagged: currying, F#, functional programming, functional programming language, language python, pipelining, python. You can follow any responses to this entry through the RSS 2.0 feed. You can leave a response, or trackback from your own site.

## buy blog articles said

Simply desire to say your article is as amazing.

The clarity in your post is simply spectacular and i can assume you are an expert on this

subject. Well with your permission let me to grab your RSS feed to keep up to date with forthcoming post.

Thanks a million and please carry on the gratifying work.

## Hernán Pentimalli said

Pal, Did you uploaded this to github or something like that? I think it should be a package ready to install with any of the package managers. Superb work

## Mihai said

super post! thank you! pipelining should be in standard python