Pipelining in Python

Posted by mtomassoli on March 29, 2012

Pipelining in F#

F# is a multi-paradigm language which targets the .NET platform and is based on OCaml, which is a variant of ML. Even though F# supports imperative and object-oriented programming, it’s mainly a functional programming language.

Pipelining isn’t a new idea, no doubt about it. Unix and Linux folks have been using it for a very long time. It’s also known as forward-chaining and is basically a function application in reverse. Transformations can be chained with the operator ‘|>’ in F#.

Let’s start with a simple example:

Seq.filter (fun i -> i%2 <> 0) (Seq.map (fun i -> i*i) (seq {0..40}))

Here’s one way to express that in Python:

filter(lambda i : i%2, map(lambda i : i*i, range(41)))

We simply start with the integers from 0 to 40, square them and filter out the even squares. Therefore, we get an iterable which contains the following integers:

1, 9, 25, 49, 81, 121, 169, 225, 289, 361, 441, 529, 625, 729, 841, 961, 1089, 1225, 1369, 1521
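
If you want to check that list yourself, here is a minimal sketch that uses only the plain built-ins. In Python 3, filter and map return lazy iterators, so the expression is wrapped in list(). The name odd_squares is just mine, for illustration.

```python
# Plain built-ins, Python 3: filter() and map() return lazy iterators,
# so we wrap the expression in list() to see the values.
odd_squares = list(filter(lambda i: i % 2, map(lambda i: i * i, range(41))))

print(odd_squares[:5])    # [1, 9, 25, 49, 81]
print(len(odd_squares))   # 20
```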

Now let’s rewrite the F# version using pipelining:

seq {0..40} |> Seq.map (fun i -> i*i) |> Seq.filter (fun i -> i%2 <> 0)

The first thing we notice is that there are fewer parentheses, which is always a good thing (I probably shouldn’t have said that…). The next thing is that we can understand that line by reading it from left to right. I don’t know about you, but I had to reread the first version from right to left to fully understand it. Ok, that was simple and I wrote it so I already knew what it does, but consider the situation where we have some highly “overloaded” functions, i.e. functions which do different things depending on the type of the arguments (actually, we shouldn’t have such functions). In a nested expression, how can you tell what such a function does before you’ve seen its arguments, which sit to its right? That suggests that reading a nested expression from left to right is less efficient.

As a side note, F# is a typed functional language which uses type inference so the programmer needs to provide types only when they can’t be derived by analyzing the expression. Well, F# likes pipelining because it lets type information flow from input objects to the functions which transform those objects into output objects.

Let’s get back to our expression: did you notice the currying?

Here’s how F# defines the operator ‘|>’:

let (|>) x f = f x

That’s all! It says that

arg |> func

is equivalent to

func arg

or, if you prefer,

func(arg)

Let’s analyze the first part of the code above:

seq {0..40} |> Seq.map (fun i -> i*i)

That must be equivalent to

Seq.map (fun i -> i*i) (seq {0..40})

It should be obvious that

Seq.map (fun i -> i*i)

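is a partial application of Seq.map: since Seq.map is curried, giving it only its first argument yields a function which is still waiting for the sequence. We might say that the operator ‘|>’ lets us pass a function its last argument. By the way, now you know why map takes a function and a sequence in that particular order 🙂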
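
For readers who feel more at home in Python, the same idea can be approximated with functools.partial: fixing the first argument of map leaves a function which is only waiting for its last argument, the sequence. This is a rough analogy, not a description of how F# works internally, and the names square_all and pipe are hypothetical.

```python
from functools import partial

# Fix the first argument of map (the function); the result still
# expects the last argument (the iterable), like Seq.map (fun i -> i*i).
square_all = partial(map, lambda i: i * i)

def pipe(arg, func):
    """Mimics F#'s `arg |> func`: simply applies func to arg."""
    return func(arg)

result = list(pipe(range(5), square_all))
print(result)  # [0, 1, 4, 9, 16]
```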

Pipelining in Python

What should pipelining look like in Python? I don’t know, but here’s what it looks like in my code:

range(41) >> filter(lambda i : i%2) >> map(lambda i : i*i)

Not too shabby, eh? If you haven’t done that already, please read my previous post “Currying in Python” because filter and map in the code above are the currying-able versions of the built-in functions you usually use in your code.

Composing functions in F#

You can also compose functions in F#. The usual composition, i.e. the one your math teacher taught you, is defined (in math-like language) as follows:

(g o f)(x) := g(f(x))

where that ‘o’ should really be a small circle.

Since pipelining is a sort of function application in reverse, it’s only natural for us to reverse the composition of functions as well:

(f >> g)(x) := g(f(x))

Again, that’s not an F# definition: it’s just my way to explain it to you.

Yes, F# uses ‘>>’ for composition, but we’ve already used that for pipelining in Python. We’re going to use some other operator in Python. Here’s how function composition is used in F#:

let comp : int seq -> int seq = Seq.map (fun i -> i*i) >> Seq.filter (fun i -> i%2 <> 0)
seq {0..40} |> comp

Please ignore the type annotations (i.e. int seq -> int seq) or, better, let’s write it another way:

seq {0..40} |> (Seq.map (fun i -> i*i) >> Seq.filter (fun i -> i%2 <> 0))

So what’s the difference?

Basically, we aren’t passing a sequence “through” two individual functions anymore, but through their composition, i.e. through a single function. That’s useful when we want to reuse a “chain” of functions. For instance (in pseudo-code) we can rewrite

out1 = input1 |> func1 |> func2 |> func3 |> func4
out2 = input2 |> func1 |> func2 |> func3 |> func4
out3 = input3 |> func1 |> func2 |> func3 |> func4
out4 = input4 |> func1 |> func2 |> func3 |> func4

as

compFunc = func1 >> func2 >> func3 >> func4
out1 = input1 |> compFunc
out2 = input2 |> compFunc
out3 = input3 |> compFunc
out4 = input4 |> compFunc

which is much better, I think. The beauty of ‘>>’ is that it lets us compose functions in the same order in which they appear in our pipelines.
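
To make the pseudo-code a bit more concrete, here is a small sketch in plain Python of a reusable chain built by forward composition. The helper compose_forward and the three toy functions are assumptions of mine, standing in for func1, func2 and so on.

```python
from functools import reduce

def compose_forward(*funcs):
    """Returns a function that applies funcs from left to right."""
    def composed(x):
        return reduce(lambda acc, f: f(acc), funcs, x)
    return composed

# Hypothetical stand-ins for func1, func2 and func3.
add_one = lambda x: x + 1
double = lambda x: x * 2
negate = lambda x: -x

comp_func = compose_forward(add_one, double, negate)

# The same chain is reused for every input.
outputs = [comp_func(x) for x in (1, 2, 3)]
print(outputs)  # [-4, -6, -8]
```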

Composing functions in Python

You won’t like this, I’m afraid:

compFunc = filter(lambda i : i%2) - map(lambda i : i*i)
range(0,41) >> compFunc

Yep: a minus. Why? Because I couldn’t think of anything better, but I’m open to suggestions. My first idea was to use a plus, then I used an asterisk and, finally, a minus. That minus is not really a minus but a sort of link which should remind us that functions are not composed the usual way.

Let’s create a few tests that our implementation should be able to handle.

Some tests

To make our tests a little more readable, we’ll import some definitions and redefine a few symbols:

import sys
from urllib.request import urlopen
from re import findall
from math import sqrt, floor
from functools import reduce

map = cur(map, 2)
filter = cur(filter, 2)
urlopen = cur(urlopen)
findall = cur(findall)
my_print = cur(lambda list : print(*list))
reduce = cur(reduce, 2)

As I said in my previous article, cur is a function which takes another function and returns a curried version of it. The function cur also accepts some optional arguments such as minArgs which tells cur to force the evaluation of the curried function after at least minArgs arguments have been provided.

Test 1 (Example 5, in the code)

range(0,50) >> filter(lambda i : i%2) >> map(lambda i : i*i) >> my_print

It prints

1 9 25 49 81 121 169 225 289 361 441 529 625 729 841 961 1089 1225 1369 1521 1681
1849 2025 2209 2401

Test 2 (Example 6, in the code)

Now we check that function composition works as expected:

compFunc = filter(lambda i : i%2) - map(lambda i : i*i)
range(0,50) >> compFunc >> my_print

It prints the same list of numbers of Test 1.

Test 3 (Example 7, in the code)

This test is more complex than the first two:

# Tells whether x is not a proper multiple of n.
notPropMult = cur(lambda n, x : x <= n or x % n, 2)

def findPrimes(upTo):
    if (upTo <= 5): return [2, 3, 5]
    filterAll = (findPrimes(floor(sqrt(upTo)))
                 >> map(lambda x : filter(notPropMult(x)))
                 >> reduce(lambda f, g : f - g))
    return list(range(2, upTo + 1)) >> filterAll

findPrimes(1000) >> my_print

This prints:

2 3 5 7 11 13 17 19 23 29 31 37 41 43 47 53 59 61 67 71 73 79 83 89 97 101 103
107 109 113 127 131 137 139 149 151 157 163 167 173 179 181 191 193 197 199 211
223 227 229 233 239 241 251 257 263 269 271 277 281 283 293 307 311 313 317 331
337 347 349 353 359 367 373 379 383 389 397 401 409 419 421 431 433 439 443 449
457 461 463 467 479 487 491 499 503 509 521 523 541 547 557 563 569 571 577 587
593 599 601 607 613 617 619 631 641 643 647 653 659 661 673 677 683 691 701 709
719 727 733 739 743 751 757 761 769 773 787 797 809 811 821 823 827 829 839 853
857 859 863 877 881 883 887 907 911 919 929 937 941 947 953 967 971 977 983 991
997

A little bit of math

As all of you already know, to test whether n is a prime we can try to divide it by the integers 2,3, …, n-1.
As some of you already know, we should really limit ourselves to the integers 2,3, …, floor(sqrt(n)). Why is that? Because if some integer x with sqrt(n) < x < n divides n, then y = n/x is an integer with 1 < y < sqrt(n) which also divides n. So, if we didn’t find any y <= sqrt(n) which divides n, then we can’t possibly find any x > sqrt(n) which divides n, other than n itself.
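
As a quick illustration, here is a sketch of that primality test in plain Python. It assumes Python 3.8 or later for math.isqrt (the integer square root), and the name is_prime is mine.

```python
from math import isqrt

def is_prime(n):
    """Trial division by 2, 3, ..., floor(sqrt(n))."""
    if n < 2:
        return False
    for d in range(2, isqrt(n) + 1):
        if n % d == 0:
            return False        # found a divisor, so n is not a prime
    return True

small_primes = [n for n in range(2, 30) if is_prime(n)]
print(small_primes)  # [2, 3, 5, 7, 11, 13, 17, 19, 23, 29]
```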

The Sieve of Eratosthenes consists in taking the numbers 2, 3, …, upTo and filtering out all the numbers which aren’t prime numbers. Starting with 2, we cross out all multiples of 2 greater than 2. That leaves us with 2, 3, 5, 7, 9, 11, …, upTo (assuming that upTo is odd). We know for sure that 3 is a prime so we keep it and start crossing out all multiples of 3 greater than 3, and so on…

When do we stop? We stop when we reach floor(sqrt(upTo)).
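
For comparison, here is a sketch of the usual iterative sieve described above, written in plain Python with no currying or pipelining involved. It assumes Python 3.8 or later for math.isqrt, and the names sieve and is_candidate are mine.

```python
from math import isqrt

def sieve(up_to):
    """Returns the primes in 2..up_to using the classic iterative sieve."""
    is_candidate = [True] * (up_to + 1)
    is_candidate[0:2] = [False, False]      # 0 and 1 are not primes
    for p in range(2, isqrt(up_to) + 1):
        if is_candidate[p]:
            # Cross out the proper multiples of p.
            for m in range(2 * p, up_to + 1, p):
                is_candidate[m] = False
    return [n for n in range(2, up_to + 1) if is_candidate[n]]

print(sieve(20))  # [2, 3, 5, 7, 11, 13, 17, 19]
```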

If you’ve already looked at the code above, you should have noticed that it’s recursive. Why? Because I like recursion and I wanted to do something more interesting than the usual sieve. Here’s what I came up with:

the primes up to upTo are

  • (if upTo <= 5)      [2, 3, 5]
  • (otherwise)           [2, …, upTo] without the proper multiples of the primes up to floor(sqrt(upTo))

where a proper multiple of x is a multiple of x greater than x (we don’t want to cross out the prime numbers themselves!). Indeed, to tell if a number n is a prime, we don’t need to consider all the divisors up to floor(sqrt(n)), but just the prime numbers up to floor(sqrt(n)). If you think about it, that’s exactly what the iterative Sieve of Eratosthenes already does. With recursion, for once, we need to be a little more explicit.

So, if upTo = 20, we have:

    primes up to 20 =
    [2, …, 20] without the proper multiples of the primes up to 4 =
    [2, …, 20] without the proper multiples of [2, 3, 5] =
    [2, 3, 5, 7, 11, 13, 17, 19]

That 5 wasn’t needed, but I decided to pick that as the base case.
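
The recursive definition can also be sketched in plain Python, without any of the currying machinery, which may help to see what the pipelined version has to compute. Like the definition above, it assumes upTo >= 5 (for smaller values the base case returns too much). The name find_primes is mine, and math.isqrt needs Python 3.8 or later.

```python
from math import isqrt

def find_primes(up_to):
    """Recursive sieve: assumes up_to >= 5, like the base case in the text."""
    if up_to <= 5:
        return [2, 3, 5]
    small_primes = find_primes(isqrt(up_to))
    return [n for n in range(2, up_to + 1)
            # Keep n unless it is a proper multiple of some small prime.
            if all(n <= p or n % p for p in small_primes)]

print(find_primes(20))  # [2, 3, 5, 7, 11, 13, 17, 19]
```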

The actual implementation

This is the interesting part:

    filterAll = (findPrimes(floor(sqrt(upTo)))
                 >> map(lambda x : filter(notPropMult(x)))
                 >> reduce(lambda f, g : f - g))

Let’s see what it does by looking at an example. Let’s assume that findPrimes(floor(sqrt(upTo))) is the list [2, 3, 5, 7]. Here, ‘->’ means “is transformed into” and corresponds to the occurrences of the ‘>>’ operator in the code above. I will ignore the differences between lists and iterators for the sake of clarity:

    [2, 3, 5, 7] ->
    [filter(notPropMult(2)), filter(notPropMult(3)), filter(notPropMult(5)), filter(notPropMult(7))] ->
    filter(notPropMult(2)) - filter(notPropMult(3)) - filter(notPropMult(5)) - filter(notPropMult(7))

So, filterAll is really the filter we need to select all the prime numbers from the integers 2, …, upTo.
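
The same two transformations can be mimicked with the standard library alone. This is only a sketch: functools.partial plays the role of currying, and the hypothetical helper then plays the role of the ‘-’ operator. The names not_proper_multiple, then, filters and filter_all are all mine.

```python
from functools import partial, reduce

def not_proper_multiple(n, x):
    """True if x is not a proper multiple of n."""
    return x <= n or x % n != 0

def then(f, g):
    """Forward composition: first f, then g (the role '-' plays above)."""
    return lambda xs: g(f(xs))

primes = [2, 3, 5, 7]
# One filter per prime: each expects only the iterable to be filtered.
filters = [partial(filter, partial(not_proper_multiple, p)) for p in primes]
# Fold the list of filters into a single composed filter.
filter_all = reduce(then, filters)

survivors = list(filter_all(range(2, 50)))
print(survivors)
```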

Test 4 (Example 8, in the code)

This test is easy but interesting nonetheless:

def do(proc, arg):
    proc()
    return arg
do = cur(do)

cprint = cur(print)

("http://python.org"
 >> do(cprint("The page http://python.org has about... ", end = ''))
 >> do(sys.stdout.flush)
 >> urlopen
 >> cur(lambda x : x.read())
 >> findall(b"href=\"")
 >> cur(len)
 >> cur("{} hrefs.".format)
 >> cprint)

It prints

The page http://python.org has about... 121 hrefs.

with a small pause after “about…” (that delay is due to urlopen).

The function do is basically an identity transformation with a side effect. It lets us do something (like printing to the screen) without breaking or altering the flow. I won’t point out that that code is full of currying… oops. On second thought, I think that code doesn’t even need an explanation: every Python programmer should be able to understand what the code does and fill in the details on their own.
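
If the role of do is still unclear, here is the same idea stripped of currying and pipelining: a sketch of an identity step with a side effect, in plain Python. The names tap, step, log and announce are hypothetical, and a list stands in for the screen so the effect is easy to inspect.

```python
def tap(proc):
    """Wraps proc into an identity function with a side effect."""
    def step(arg):
        proc()          # the side effect
        return arg      # the value flows through unchanged
    return step

log = []
announce = tap(lambda: log.append("about to count"))

# The string passes through announce untouched and reaches len.
value = len(announce("some text"))
print(value, log)
```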

How do we implement Pipelining?

That’s very easy. We need to write a class to overload the operators ‘>>’ and ‘-’ so it makes sense to rewrite the code in the article “Currying in Python” like this:

class CurriedFunc:
    def __init__(self, func, args = (), kwArgs = {}, unique = True, minArgs = None):
        self.__func = func
        self.__myArgs = args
        self.__myKwArgs = kwArgs
        self.__unique = unique
        self.__minArgs = minArgs

    def __call__(self, *args, **kwArgs):
        if args or kwArgs:                  # some more args!
            # Allocates data to assign to the next CurriedFunc.
            newArgs = self.__myArgs + args
            newKwArgs = dict.copy(self.__myKwArgs)

            # If unique is True, we don't want repeated keyword arguments.
            if self.__unique and not kwArgs.keys().isdisjoint(newKwArgs):
                raise ValueError("Repeated kw arg while unique = True")

            # Adds/updates keyword arguments.
            newKwArgs.update(kwArgs)

            # Checks whether it's time to evaluate func.
            if self.__minArgs is not None and self.__minArgs <= len(newArgs) + len(newKwArgs):
                return self.__func(*newArgs, **newKwArgs)       # time to evaluate func
            else:
                return CurriedFunc(self.__func, newArgs, newKwArgs, self.__unique, self.__minArgs)
        else:                               # the evaluation was forced
            return self.__func(*self.__myArgs, **self.__myKwArgs)

def cur(f, minArgs = None):
    return CurriedFunc(f, (), {}, True, minArgs)

def curr(f, minArgs = None):
    return CurriedFunc(f, (), {}, False, minArgs)

By the way, while I was recoding genCur into CurriedFunc, I noticed that a “return” was missing! I’ve already updated my previous article.

Now, we just need to add a couple of methods to CurriedFunc:

    def __rrshift__(self, arg):
        return self.__func(*(self.__myArgs + (arg,)), **self.__myKwArgs)      # forces evaluation

    def __sub__(self, other):
        if not isinstance(other, CurriedFunc):
            raise TypeError("Cannot compose a CurriedFunc with another type")

        def compFunc(*args, **kwArgs):
            return other.__func(*(other.__myArgs + (self.__func(*args, **kwArgs),)),
                                **other.__myKwArgs)

        return CurriedFunc(compFunc, self.__myArgs, self.__myKwArgs,
                           self.__unique, self.__minArgs)

As you can see, we added __rrshift__ and not __rshift__. The reason is that instances of CurriedFunc will always appear to the right of the operator ‘>>’. The method __rrshift__ calls function __func with the new positional argument arg without forgetting (stress on the word “forgetting”) the previously added arguments __myArgs and __myKwArgs.
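
Here is a stripped-down sketch of that mechanism, with no currying at all. When Python evaluates left >> right and the type of left can't handle the operation, it tries right.__rrshift__(left). The class Pipe is a hypothetical toy of mine, not part of the code in this article.

```python
class Pipe:
    """Hypothetical minimal wrapper: value >> Pipe(f) evaluates f(value)."""
    def __init__(self, func):
        self.func = func

    def __rrshift__(self, left):
        # Called for `left >> self` when left's type does not
        # implement >> for this right-hand operand.
        return self.func(left)

total = range(5) >> Pipe(list) >> Pipe(sum)
print(total)  # 10
```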

The method __sub__ is not so straightforward. After a simple sanity check, we define a new function called compFunc and return a curried version of that function. Notice that the functions self.__func and other.__func are composed in the reverse order. To simplify things, let’s say we want to compose two CurriedFunc objects called self and other. We write “self - other” and we expect a CurriedFunc object representing a function which first does what self does and then, on the result, does what other does. Basically, we want “other(self(.))”. From the expression

            return other.__func(*(other.__myArgs + (self.__func(*args, **kwArgs),)),
                                **other.__myKwArgs)

it should be clear that the result of self.__func is passed to other.__func as its last positional argument. Also notice that the arguments of other.__func are frozen (except for the last positional argument coming from self.__func, as we’ve just said).
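
The composition alone, without frozen arguments, can be sketched with a toy class. Step is a hypothetical one-argument wrapper of mine, much simpler than CurriedFunc, but its __sub__ composes in the same “first self, then other” order.

```python
class Step:
    """Hypothetical one-argument function wrapper supporting `a - b`."""
    def __init__(self, func):
        self.func = func

    def __call__(self, x):
        return self.func(x)

    def __sub__(self, other):
        if not isinstance(other, Step):
            return NotImplemented
        # First apply self, then feed the result to other.
        return Step(lambda x: other.func(self.func(x)))

inc_then_double = Step(lambda x: x + 1) - Step(lambda x: x * 2)
print(inc_then_double(3))  # 8
```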

The full source code

Here’s the entire source code. The new examples start from number 5.

# Coded by Massimiliano Tomassoli, 2012.

class CurriedFunc:
    def __init__(self, func, args = (), kwArgs = {}, unique = True, minArgs = None):
        self.__func = func
        self.__myArgs = args
        self.__myKwArgs = kwArgs
        self.__unique = unique
        self.__minArgs = minArgs

    def __call__(self, *args, **kwArgs):
        if args or kwArgs:                  # some more args!
            # Allocates data to assign to the next CurriedFunc.
            newArgs = self.__myArgs + args
            newKwArgs = dict.copy(self.__myKwArgs)

            # If unique is True, we don't want repeated keyword arguments.
            if self.__unique and not kwArgs.keys().isdisjoint(newKwArgs):
                raise ValueError("Repeated kw arg while unique = True")

            # Adds/updates keyword arguments.
            newKwArgs.update(kwArgs)

            # Checks whether it's time to evaluate func.
            if self.__minArgs is not None and self.__minArgs <= len(newArgs) + len(newKwArgs):
                return self.__func(*newArgs, **newKwArgs)       # time to evaluate func
            else:
                return CurriedFunc(self.__func, newArgs, newKwArgs, self.__unique, self.__minArgs)
        else:                               # the evaluation was forced
            return self.__func(*self.__myArgs, **self.__myKwArgs)

    def __rrshift__(self, arg):
        return self.__func(*(self.__myArgs + (arg,)), **self.__myKwArgs)      # forces evaluation

    def __sub__(self, other):
        if not isinstance(other, CurriedFunc):
            raise TypeError("Cannot compose a CurriedFunc with another type")

        def compFunc(*args, **kwArgs):
            return other.__func(*(other.__myArgs + (self.__func(*args, **kwArgs),)),
                                **other.__myKwArgs)

        return CurriedFunc(compFunc, self.__myArgs, self.__myKwArgs,
                           self.__unique, self.__minArgs)

def cur(f, minArgs = None):
    return CurriedFunc(f, (), {}, True, minArgs)

def curr(f, minArgs = None):
    return CurriedFunc(f, (), {}, False, minArgs)

# Simple Function.
def func(a, b, c, d, e, f, g = 100):
    print(a, b, c, d, e, f, g)

# NOTE: '<====' means "this line prints to the screen".

# Example 1.
f = cur(func)                   # f is a "curried" version of func
c1 = f(1)
c2 = c1(2, d = 4)               # Note that c is still unbound
c3 = c2(3)(f = 6)(e = 5)        # now c = 3
c3()                            # () forces the evaluation              <====
                                #   it prints "1 2 3 4 5 6 100"
c4 = c2(30)(f = 60)(e = 50)     # now c = 30
c4()                            # () forces the evaluation              <====
                                #   it prints "1 2 30 4 50 60 100"

print("\n------\n")

# Example 2.
f = curr(func)                  # f is a "curried" version of func
                                # curr = cur with possibly repeated
                                #   keyword args
c1 = f(1, 2)(3, 4)
c2 = c1(e = 5)(f = 6)(e = 10)() # oops... we repeated 'e' because we    <====
                                #   changed our mind about it!
                                #   again, () forces the evaluation
                                #   it prints "1 2 3 4 10 6 100"

print("\n------\n")

# Example 3.
f = cur(func, 6)        # forces the evaluation after 6 arguments
c1 = f(1, 2, 3)         # num args = 3
c2 = c1(4, f = 6)       # num args = 5
c3 = c2(5)              # num args = 6 ==> evaluation                   <====
                        #   it prints "1 2 3 4 5 6 100"
c4 = c2(5, g = -1)      # num args = 7 ==> evaluation                   <====
                        #   we can specify more than 6 arguments, but
                        #   6 are enough to force the evaluation
                        #   it prints "1 2 3 4 5 6 -1"

print("\n------\n")

# Example 4.
def printTree(func, level = None):
    if level is None:
        printTree(cur(func), 0)
    elif level == 6:
        func(g = '')()      # or just func('')()
    else:
        printTree(func(0), level + 1)
        printTree(func(1), level + 1)

printTree(func)

print("\n------\n")

def f2(*args):
    print(", ".join(["%3d"%(x) for x in args]))

def stress(f, n):
    if n: stress(f(n), n - 1)
    else: f()               # enough is enough

stress(cur(f2), 100)

# Pipelining and Function Composition
print("\n--- Pipelining & Composition ---\n")

import sys
from urllib.request import urlopen
from re import findall
from math import sqrt, floor
from functools import reduce

map = cur(map, 2)
filter = cur(filter, 2)
urlopen = cur(urlopen)
findall = cur(findall)
my_print = cur(lambda list : print(*list))
reduce = cur(reduce, 2)

# Example 5

range(0,50) >> filter(lambda i : i%2) >> map(lambda i : i*i) >> my_print

print("---")

# Example 6

compFunc = filter(lambda i : i%2) - map(lambda i : i*i)
range(0,50) >> compFunc >> my_print

print("---")

# Example 7

# Tells whether x is not a proper multiple of n.
notPropMult = cur(lambda n, x : x <= n or x % n, 2)

def findPrimes(upTo):
    if (upTo <= 5): return [2, 3, 5]
    filterAll = (findPrimes(floor(sqrt(upTo)))
                 >> map(lambda x : filter(notPropMult(x)))
                 >> reduce(lambda f, g : f - g))
    return list(range(2, upTo + 1)) >> filterAll

findPrimes(1000) >> my_print

print("---")

# Example 8
# Finds the approximate number of hrefs in a web page.

def do(proc, arg):
    proc()
    return arg
do = cur(do)

cprint = cur(print)

("http://python.org"
 >> do(cprint("The page http://python.org has about... ", end = ''))
 >> do(sys.stdout.flush)
 >> urlopen
 >> cur(lambda x : x.read())
 >> findall(b"href=\"")
 >> cur(len)
 >> cur("{} hrefs.".format)
 >> cprint)

That’s all!

Please feel free to let me know what you think by leaving a comment!
