Primes

An old interview challenge is to generate prime numbers or check if a number is prime. No advanced mathematics is needed, just variants on the Sieve of Eratosthenes. Start with a basic prime checker.

In [1]:
def isprime(n):
    divs = range(2, int(n ** 0.5) + 1)
    return all(n % d for d in divs)

%time isprime(1_000_003)
CPU times: user 83 µs, sys: 1e+03 ns, total: 84 µs
Wall time: 85.1 µs
Out[1]:
True

A common optimization is to skip even numbers.

In [2]:
def isprime(n):
    divs = range(3, int(n ** 0.5) + 1, 2)
    return n == 2 or all(n % d for d in divs)

%time isprime(1_000_003)
CPU times: user 43 µs, sys: 0 ns, total: 43 µs
Wall time: 46.3 µs
Out[2]:
True

Brief digression on that optimization. There's nothing special about removing multiples of 2; removing multiples is the whole point. The step scalar could instead be thought of as a cycle: itertools.accumulate(itertools.repeat(2)). So removing multiples of 3 would remove every third step: itertools.accumulate(itertools.cycle([2, 4])).
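
For illustration, a sketch of those candidate streams (here a starting value is prepended so the accumulation yields the divisors directly; the candidates helper is just for this digression):

import itertools

def candidates(start, steps):
    """Generate divisor candidates from a start value and a repeating cycle of steps."""
    return itertools.accumulate(itertools.chain([start], itertools.cycle(steps)))

# skipping multiples of 2: 3, 5, 7, 9, ...
odds = candidates(3, [2])
# skipping multiples of 2 and 3: 5, 7, 11, 13, 17, 19, 23, 25, ...
wheel = candidates(5, [2, 4])
assert list(itertools.islice(wheel, 6)) == [5, 7, 11, 13, 17, 19]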

Or the equivalent could be done with slicing.

In [3]:
import itertools

def isprime(n):
    divs = range(5, int(n ** 0.5) + 1, 2)
    return n in (2, 3) or all(n % d for d in itertools.chain(divs[::3], divs[1::3]))

%time isprime(1_000_003)
CPU times: user 42 µs, sys: 1 µs, total: 43 µs
Wall time: 44.1 µs
Out[3]:
True

The catch is that the cycles grow exponentially, with diminishing returns for each successive prime removed.

Onto prime generation, while keeping the odds-only optimization. Typically it's requested to generate the first N primes, or up to some value. But that's easily generalized with itertools.islice and itertools.takewhile. A more Pythonic approach is an unbounded generator.
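
For reference, a sketch of those generalizations (assuming the unbounded primes generator developed below):

import itertools

first_ten = list(itertools.islice(primes(), 10))                    # the first N primes
up_to_100 = list(itertools.takewhile(lambda p: p < 100, primes()))  # primes below a bound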

In [4]:
def primes():
    yield 2
    ints = itertools.count(3, 2)
    while True:
        prime = next(ints)
        yield prime
        ints = (n for n in ints if n % prime)

list(itertools.islice(primes(), 10))
Out[4]:
[2, 3, 5, 7, 9, 11, 13, 15, 17, 19]

Elegant, but doesn't work. The problem is the scoping of prime, which is used in the generator expression but also rebound in the loop, so every filter ends up seeing the latest value. Instead it can be replaced with a filter on a partially bound function, but unfortunately functools.partial only binds leftmost arguments and __rmod__ is needed here. One alternative is to use bound methods as first-class functions, even dunder methods.
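
A quick check of that trick: the bound __rmod__ of the prime computes the modulo with the prime on the right-hand side.

prime = 7
assert prime.__rmod__(10) == 10 % prime == 3
# filter keeps the numbers that are *not* multiples of the prime
assert list(filter(prime.__rmod__, range(1, 15))) == [1, 2, 3, 4, 5, 6, 8, 9, 10, 11, 12, 13]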

In [5]:
def primes():
    yield 2
    ints = itertools.count(3, 2)
    while True:
        prime = next(ints)
        yield prime
        ints = filter(prime.__rmod__, ints)

%time next(itertools.islice(primes(), 1000, None))
CPU times: user 30.7 ms, sys: 1.82 ms, total: 32.5 ms
Wall time: 32 ms
Out[5]:
7927

Elegant, but slow and could overflow the stack. A more traditional approach would use the same checking logic as isprime, but also cache the primes so as to not duplicate divisors.

In [6]:
def primes():
    yield 2
    primes = []
    for n in itertools.count(3, 2):
        if all(n % p for p in itertools.takewhile(int(n ** 0.5).__ge__, primes)):
            primes.append(n)
            yield n

%time next(itertools.islice(primes(), 1000, None))
CPU times: user 5.49 ms, sys: 423 µs, total: 5.92 ms
Wall time: 5.8 ms
Out[6]:
7927

Onto interface design. The primes are being stored anyway, so it would be nice if they were re-iterable. A generator can be written as a class with __iter__ and __next__, but an under-appreciated feature is that __iter__ itself can be a generator. And now that it's a class, isprime can be expressed as the in operator, while also benefiting from the cache.

In [7]:
class Primes:
    def __init__(self):
        self.ints = itertools.count(3, 2)
        self.cache = [2]
    
    def __iter__(self):
        yield from self.cache
        for n in self.ints:
            if n in self:
                self.cache.append(n)
                yield n

    def __contains__(self, n):
        return all(n % p for p in itertools.takewhile(int(n ** 0.5).__ge__, self))

primes = Primes()
%time next(itertools.islice(primes, 1000, None))
CPU times: user 7.89 ms, sys: 483 µs, total: 8.37 ms
Wall time: 8 ms
Out[7]:
7927
In [8]:
%time 1_000_003 in primes
CPU times: user 34 µs, sys: 0 ns, total: 34 µs
Wall time: 37 µs
Out[8]:
True

There's a hybrid approach though, that's faster and nearly as simple as the above sieves. Instead of doing repeated divisions, keep track of each found prime along with the next multiple that it would eliminate. The inner loop is then optimized because it only needs to account for collisions.

In [9]:
def primes():
    multiples = {}
    for n in itertools.count(2):
        prime = multiples.pop(n, 0)
        if not prime:
            prime = n
            yield n
        key = n + prime
        while key in multiples:
            key += prime
        multiples[key] = prime

%time next(itertools.islice(primes(), 1000, None))
CPU times: user 2.59 ms, sys: 103 µs, total: 2.69 ms
Wall time: 2.7 ms
Out[9]:
7927

Now to add back the odds-only optimization, the step scalar needs to be double the prime number. Another way to reduce collisions is to recognize that each new prime is irrelevant until its square value is reached.

In [10]:
def primes():
    yield 2
    multiples = {}
    for n in itertools.count(3, 2):
        step = multiples.pop(n, 0)
        if step:  # composite
            key = n + step
            while key in multiples:
                key += step
            multiples[key] = step
        else:  # prime
            multiples[n ** 2] = n * 2
            yield n

%time next(itertools.islice(primes(), 1000, None))
CPU times: user 1.37 ms, sys: 5 µs, total: 1.38 ms
Wall time: 1.38 ms
Out[10]:
7927

And finally, let's add back the caching, yielding a clean interface, an efficient implementation for all use cases, and code that is still relatively simple.

In [11]:
class Primes:
    def __init__(self):
        self.ints = itertools.count(3, 2)
        self.cache = [2]
        self.multiples = {}
    
    def __iter__(self):
        yield from self.cache
        for n in self.ints:
            step = self.multiples.pop(n, 0)
            if step:  # composite
                key = n + step
                while key in self.multiples:
                    key += step
                self.multiples[key] = step
            else:  # prime
                self.multiples[n ** 2] = n * 2
                self.cache.append(n)
                yield n

    def __contains__(self, n):
        return all(n % p for p in itertools.takewhile(int(n ** 0.5).__ge__, self))

primes = Primes()
%time 1_000_003 in primes
CPU times: user 242 µs, sys: 0 ns, total: 242 µs
Wall time: 245 µs
Out[11]:
True
In [12]:
%time 1_000_003 in primes
CPU times: user 40 µs, sys: 0 ns, total: 40 µs
Wall time: 43.2 µs
Out[12]:
True

Hardest Logic Puzzle Ever

How to solve the Hardest Logic Puzzle Ever programmatically.

Three gods A, B, and C are called, in no particular order, True, False, and Random. True always speaks truly, False always speaks falsely, but whether Random speaks truly or falsely is a completely random matter. Your task is to determine the identities of A, B, and C by asking three yes-no questions; each question must be put to exactly one god. The gods understand English, but will answer all questions in their own language, in which the words for yes and no are da and ja, in some order. You do not know which word means which.

With 3 binary questions, it's possible to distinguish $2^3 = 8$ scenarios. And there are $3! = 6$ possibilities. Note that this means it's impossible to additionally identify what da and ja mean, as that would be $3! \cdot 2 = 12$ possibilities.

As always, start with modeling the data. We need a ternary enumeration for the god identifiers. It's almost a boolean anyway, so let's use None to indicate neither, i.e., Random. To represent the identities, a mapping from names to ids would be natural. But the mapping has to be one-to-one and onto, and using an immutable key is convenient, so an implicitly ordered tuple of names is also a valid choice. Here a named tuple represents the "state" of the world.

In [1]:
import itertools
from typing import NamedTuple

IDS = (False, True, None)
NAMES = 'ABC'
WORDS = ('da', 'ja')

class State(NamedTuple):
    names: str  # order corresponds to IDS
    yes: str

STATES = list(itertools.starmap(State, itertools.product(map(''.join, itertools.permutations(NAMES)), WORDS)))
STATES
Out[1]:
[State(names='ABC', yes='da'),
 State(names='ABC', yes='ja'),
 State(names='ACB', yes='da'),
 State(names='ACB', yes='ja'),
 State(names='BAC', yes='da'),
 State(names='BAC', yes='ja'),
 State(names='BCA', yes='da'),
 State(names='BCA', yes='ja'),
 State(names='CAB', yes='da'),
 State(names='CAB', yes='ja'),
 State(names='CBA', yes='da'),
 State(names='CBA', yes='ja')]

Now to model asking a question. A typical approach would take input parameters relevant to the question, such as asking a god which one they are. And the current reality would need to be encapsulated to answer the question.

In [2]:
def ask(self: State, name: str, id) -> str:
    """Ask `name`: are you `id`?"""

The problem with that approach is Random's answer would have to be modeled as actually random, which would require running many simulations. Since this is a logic puzzle, it's easier to model Random's answer as non-deterministic, i.e., could answer either way. The question will take as input the current set of possible states, splitting the states into two groups corresponding to answers da or ja. This function will be used in a search algorithm anyway, so it's effectively participating in the search.

In [3]:
def ask(name: str, id, states: set) -> dict:
    """Ask `name`: are you `id`? and return mapping of answers to possible states."""
    groups = {word: set() for word in WORDS}
    for state in states:
        identity = IDS[state.names.index(name)]
        truth = identity == id
        words = sorted(WORDS, key=state.yes.__eq__)
        if identity in (True, None):
            groups[words[truth]].add(state)
        if identity in (False, None):
            groups[words[not truth]].add(state)
    return groups

ask('A', None, STATES)
Out[3]:
{'da': {State(names='ABC', yes='da'),
  State(names='ACB', yes='da'),
  State(names='BAC', yes='ja'),
  State(names='BCA', yes='da'),
  State(names='BCA', yes='ja'),
  State(names='CAB', yes='ja'),
  State(names='CBA', yes='da'),
  State(names='CBA', yes='ja')},
 'ja': {State(names='ABC', yes='ja'),
  State(names='ACB', yes='ja'),
  State(names='BAC', yes='da'),
  State(names='BCA', yes='da'),
  State(names='BCA', yes='ja'),
  State(names='CAB', yes='da'),
  State(names='CBA', yes='da'),
  State(names='CBA', yes='ja')}}

To determine if a question is making progress, we need to look at the set of names returned for each answer. A valid solution would need to output sets of at most size $2^2 = 4$ on the first question in order to proceed.

In [4]:
for id in IDS:
    groups = ask('A', id, STATES)
    identities = [{names for names, _ in group} for group in groups.values()]
    print(id, *identities)
False {'BAC', 'CBA', 'CAB', 'ABC', 'ACB', 'BCA'} {'CAB', 'CBA', 'BAC', 'ABC', 'ACB', 'BCA'}
True {'CAB', 'CBA', 'BAC', 'ABC', 'ACB', 'BCA'} {'BAC', 'CBA', 'CAB', 'ABC', 'ACB', 'BCA'}
None {'BAC', 'CBA', 'CAB', 'ABC', 'ACB', 'BCA'} {'CAB', 'CBA', 'BAC', 'ABC', 'ACB', 'BCA'}

So that question made no progress. Unsurprising given that we don't know which god is "Random". Here one could resort to heuristics and increasingly convoluted questions.

Let's do the opposite: write the most general question possible and automate the search. Any question can be modeled by asking whether any of a given set of states is accurate.

In [5]:
def ask(name: str, include: set, exclude: set) -> dict:
    """Ask `name`: are we in any of `include` states? and return mapping of answers to possible states."""
    assert include.isdisjoint(exclude)
    groups = {word: set() for word in WORDS}
    for state in include | exclude:
        identity = IDS[state.names.index(name)]
        truth = state in include
        words = sorted(WORDS, key=state.yes.__eq__)
        if identity in (True, None):
            groups[words[truth]].add(state)
        if identity in (False, None):
            groups[words[not truth]].add(state)
    return groups

include = set(STATES[:len(STATES) // 2])
ask('A', include, set(STATES) - include)        
Out[5]:
{'da': {State(names='ABC', yes='ja'),
  State(names='ACB', yes='ja'),
  State(names='BAC', yes='da'),
  State(names='BCA', yes='da'),
  State(names='BCA', yes='ja'),
  State(names='CAB', yes='ja'),
  State(names='CBA', yes='da'),
  State(names='CBA', yes='ja')},
 'ja': {State(names='ABC', yes='da'),
  State(names='ACB', yes='da'),
  State(names='BAC', yes='ja'),
  State(names='BCA', yes='da'),
  State(names='BCA', yes='ja'),
  State(names='CAB', yes='da'),
  State(names='CBA', yes='da'),
  State(names='CBA', yes='ja')}}

With that, the power set of all possible combinations can be searched.

In [6]:
def powerset(states: set):
    """Generate all possible subsets."""
    for r in range(len(states) + 1):
        yield from map(set, itertools.combinations(states, r))

count = 0
for states in powerset(STATES):
    groups = ask('A', states, set(STATES) - states)
    identities = [{names for names, _ in group} for group in groups.values()]
    count += max(map(len, identities)) <= 4
count
Out[6]:
96

So there are many potential solutions. Onto automating the search.

In [7]:
def search(states: set, count: int = 3):
    """Recursively ask all possible questions until a solution is found."""
    identities = {names for names, _ in states}
    if len(identities) == 1:  # solved
        return identities.pop()
    if not count or len(identities) > (2 ** count):  # impossible
        return None
    for name, subset in itertools.product(NAMES, powerset(states)):
        groups = ask(name, subset, states - subset)
        solutions = [search(group, count - 1) for group in groups.values()]
        if all(solutions):
            return name, subset, solutions

search(set(STATES[:2]), 0)
Out[7]:
'ABC'
In [8]:
search(set(STATES[:4]), 1)
Out[8]:
('A',
 {State(names='ABC', yes='da'), State(names='ACB', yes='ja')},
 ['ACB', 'ABC'])
In [9]:
search(set(STATES[:6]), 2)
Out[9]:
('A',
 {State(names='ABC', yes='da'), State(names='ACB', yes='ja')},
 [('A', {State(names='ACB', yes='da')}, ['BAC', 'ACB']),
  ('A', {State(names='ABC', yes='ja')}, ['ABC', 'BAC'])])

So far, so good. The output is a binary tree specifying the addressed god and the states asked about at each node.

The sub-problem searches each handle a sufficient number of cases. It's no surprise that there should be solutions asking "A" first, since it can't matter who gets the first question. Now for the real solution.

In [10]:
%time search(set(STATES))
CPU times: user 1.29 s, sys: 10.8 ms, total: 1.3 s
Wall time: 1.3 s
Out[10]:
('A',
 {State(names='ABC', yes='da'),
  State(names='ACB', yes='ja'),
  State(names='BAC', yes='ja'),
  State(names='CAB', yes='da')},
 [('C',
   {State(names='ACB', yes='ja'),
    State(names='BCA', yes='da'),
    State(names='CAB', yes='da'),
    State(names='CBA', yes='ja')},
   [('B',
     {State(names='BCA', yes='ja'), State(names='CBA', yes='ja')},
     ['BCA', 'CBA']),
    ('A',
     {State(names='ACB', yes='da'), State(names='CAB', yes='da')},
     ['CAB', 'ACB'])]),
  ('B',
   {State(names='ABC', yes='da'),
    State(names='BAC', yes='da'),
    State(names='BCA', yes='ja'),
    State(names='CBA', yes='ja')},
   [('B',
     {State(names='ABC', yes='da'), State(names='BCA', yes='da')},
     ['ABC', 'BCA']),
    ('B',
     {State(names='BAC', yes='ja'), State(names='CBA', yes='ja')},
     ['BAC', 'CBA'])])])

The puzzle is solved. Can it be simplified? Depends on your point of view.

Canonical solutions introduce biconditionals or counterfactuals in an attempt to collapse the output possibilities. This is ultimately hopeless though, as the solution is a binary search tree regardless. Is asking a question of the form "If I asked you ..., would you say ja" actually clearer than asking "Are we in any of these possibilities: ..."?

Nonetheless patterns do emerge:

  • The first question revolves around ruling out "Random".
  • Subsequent questions address non-"Random" gods.

The random behavior adds true complexity to the algorithm, whereas the boolean encoding adds arbitrary complications.

GraphQL - ORM

GraphQL is the new ORM.

REST and ORMs are both infamous for:

  • over-fetching: fetching more data than is needed per request
  • under-fetching: fetching less data than is needed, requiring multiple requests
  • select N+1 problem: under-fetching applied to multiple associated objects

GraphQL aims to overcome REST's shortcomings through a flexible query language, and succeeds in doing so on the client side. But on the server side, GraphQL resolvers have effectively recreated the same over- and under- fetching problems that have long plagued ORMs. The fact that ORMs remain popular despite their inefficiency is a testament to the benefits of having in-memory objects behave consistently. There is no such trade-off for server-side GraphQL, where the only point of the objects is to be immediately serialized.

The so-called N+1 problem is generally acknowledged in the GraphQL community, but this article will argue only the symptoms are being addressed with workarounds like dataloader.

Example

The problems can be seen immediately in GraphQL's own introductory resolver example.

Query: {
  human(obj, args, context, info) {
    return context.db.loadHumanByID(args.id).then(
      userData => new Human(userData)
    )
  }
}

Human: {
  name(obj, args, context, info) {
    return obj.name
  }
}

What makes the name resolver trivial? It's pre-fetched by loadHumanByID, whose only parameter is id, so it's clearly unaware of whether name has been requested. What if the requested field was a nested object, or a json field, or just a large text blob? Then one would clearly be directed towards using a non-trivial resolver which fetches the field on demand. Of course, but then whatever work is common in the human id lookup is duplicated.

This is by no means specific to SQL or relational databases, but SQL is a convenient lingua franca of databases to demonstrate the inefficiency. The choices are:

  • SELECT * FROM human WHERE id = ?
  • SELECT field FROM human WHERE id = ? repeated for each "expensive" field

Even in the simplest possible example, over-fetching has already occurred, and the only proposed workaround is under-fetching. The single query we want is:

  • SELECT f1, f2, ... FROM human WHERE id = ? for requested fields

In other words, exactly what happens with ORMs, except even ORMs typically offer an option of requesting a subset of fields. Naturally the problem gets worse with list fields.

Human: {
  appearsIn(obj) {
    return obj.appearsIn // returns [ 4, 5, 6 ]
  }
}

Human: {
  starships(obj, args, context, info) {
    return obj.starshipIDs.map(
      id => context.db.loadStarshipByID(id).then(
        shipData => new Starship(shipData)
      )
    )
  }
}

Now there are two sets of associative keys (.appearsIn and .starshipIDs) that have been over-fetched. Nonetheless starships is under-fetched as well. The starships resolver is neither the most efficient nor the simplest way of resolving this field:

  • fetch all the data by human id in the starships resolver
  • fetch all the data in bulk by starshipIDs
  • push the resolution down to the Starship layer if forced to fetch one at a time

The example implementation seems to be going out of its way to showcase JavaScript promises. And the assumptions being made about the underlying data store are unusual:

  1. The data is relational in nature.
  2. Associative keys have negligible cost to pre-fetch.
  3. But joins are not available.
  4. And neither are primary key IN queries.

Workaround

From GraphQL's best practices

GraphQL is designed in a way that allows you to write clean code on the server, where every field on every type has a focused single-purpose function for resolving that value. However without additional consideration, a naive GraphQL service could be very "chatty" or repeatedly load data from your databases.

This is commonly solved by a batching technique, where multiple requests for data from a backend are collected over a short period of time and then dispatched in a single request to an underlying database or microservice by using a tool like Facebook's DataLoader.

That's an understatement. It's not clear how "naive" differs from best practice.

Clearly there is value in transforming multiple "primary key =" queries into a single "primary key IN" query. As in the starships example however, that can be done more simply in the parent resolver. There is more value in not needing a "primary key IN" query at all. Furthermore, adding caching to a dataloader avoids the central issue.

Again reminiscent of ORMs, any data layer can add caching. The point is that efficiently resolving a query requires context, which strict adherence to single-purpose resolvers explicitly disregards.

Aggregation

The inefficiencies become even more glaring when moving beyond associative keys. Nearly any aggregation requires knowing which summaries are requested, such as if the appearsIn field optionally included counts or times. Using SQL as an example again, the query would resemble one of:

  • SELECT distinct field FROM ...
  • SELECT field, count(*) FROM ... GROUP BY field

The conditional logic if "count" in requested_fields must exist in some form in the code, because the alternatives are over-fetching or under-fetching. Both of which are far more inefficient in this scenario than in the "select N+1" problem. A dataloader-esque approach is not going to be applicable to "group by" operations.
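
A minimal sketch of that conditional, assuming the set of requested field names has already been extracted from the query (a utility for doing so appears in the Solution section below); the helper name and the elided SQL are illustrative:

def appears_in_query(requested_fields: set) -> str:
    """Hypothetical helper: choose the aggregation query based on requested sub-fields."""
    if "count" in requested_fields:
        return "SELECT field, count(*) FROM ... GROUP BY field"
    return "SELECT DISTINCT field FROM ..."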

Computation

One last generalized example: computed fields. What are typically query flags in REST (and RPC) APIs become field selections in GraphQL.

For example, computing scores in a search engine interface. Instead of a scores: Boolean! = false input option, the more obvious approach would be to skip score calculation when the score field isn't requested.

As with aggregation, the same pattern recurs. It's unacceptable to over-fetch, i.e., compute the scores when not needed. It's worse still to under-fetch, i.e., run some sort of lean search that will find matches and then go back and compute scores later.

Solution

The inescapable conclusion is that in some cases parent resolvers need to know which of their children are being requested. There's no need to throw away GraphQL's server-side resolver hierarchy. No one is advocating a thousand line root resolver named RunIt that processes the entire query.

All that's needed is a conceptual shift which encourages introspecting the GraphQLResolveInfo object. The requested fields are right there waiting to be useful, but good luck finding documentation and examples to that effect. In every non-trivial GraphQL project, this author has used a utility like:

In [ ]:
def selections(node):
    """Return tree of field name selections."""
    nodes = getattr(node.selection_set, 'selections', [])
    return {node.name.value: selections(node) for node in nodes}

Those fields would be checked or passed to a data layer as needed. For example, in Django's ORM, it could be as simple as appending a query set with .values(*selections(*info.nodes)).
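
A hypothetical sketch of such a resolver (the resolver name, the Human model, and the use of graphql-core 3's info.field_nodes attribute are all assumptions here):

def resolve_humans(root, info):
    """Hypothetical graphene-style resolver: fetch only the requested columns."""
    fields = selections(info.field_nodes[0])  # e.g. {'name': {}, 'homePlanet': {}}
    return Human.objects.values(*fields)      # assumes GraphQL field names match model fields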

Well, almost. The next issue is that typical GraphQL model validators raise an error if required fields are missing. Thanks validator; the field is missing because the client didn't request it.

This is actually a different age-old problem: conflating "optional" and "nullable". GraphQL requires populating all requested fields, and specifying whether they may be null. Server-side implementations understandably, but still incorrectly, interpret that by making nullables optional and non-nullables required at the model layer. So typically it's necessary to pad resolved objects with empty (but not null) data.

Although a minor problem, it reveals the bias related to single-purpose resolvers. The point of GraphQL is to efficiently return only the requested fields, yet standard practice in GraphQL models is to require populating fields that haven't been requested.

Over- and under- fetching can be addressed directly in resolvers, with the data layer's own interface, instead of hidden behind another abstraction layer.

Closing files

Contrarian view on closing files.

It has become conventional wisdom to always explicitly close file-like objects, via context managers. The Google style guide is representative:

Explicitly close files and sockets when done with them. Leaving files, sockets or other file-like objects open unnecessarily has many downsides, including:

  • They may consume limited system resources, such as file descriptors. Code that deals with many such objects may exhaust those resources unnecessarily if they're not returned to the system promptly after use.
  • Holding files open may prevent other actions being performed on them, such as moves or deletion.
  • Files and sockets that are shared throughout a program may inadvertently be read from or written to after logically being closed. If they are actually closed, attempts to read or write from them will throw exceptions, making the problem known sooner.

Furthermore, while files and sockets are automatically closed when the file object is destructed, tying the life-time of the file object to the state of the file is poor practice, for several reasons:

  • There are no guarantees as to when the runtime will actually run the file's destructor. Different Python implementations use different memory management techniques, such as delayed Garbage Collection, which may increase the object's lifetime arbitrarily and indefinitely.
  • Unexpected references to the file may keep it around longer than intended (e.g. in tracebacks of exceptions, inside globals, etc).

The preferred way to manage files is using the "with" statement:

with open("hello.txt") as hello_file:
    for line in hello_file:
        print line

In theory

Good points, and why limit this advice to file descriptors? Any resource may be limited or require exclusivity; that's why they're called resources. Similarly, one should always explicitly call dict.clear when finished with a dict. After all, "there are no guarantees as to when the runtime will actually run the <object's> destructor." And "code that deals with many such objects may exhaust those resources unnecessarily", such as memory, or whatever else is in the dict.

But in all seriousness, this advice is applying a notably higher standard of premature optimization to file descriptors than to any other kind of resource. There are plenty of Python projects that are guaranteed to run on CPython for a variety of reasons, where destructors are immediately called. And there are plenty of Python projects where file descriptor usage is just a non-issue. It's now depressingly commonplace to see this in setup.py files:

In [ ]:
with open("README.md") as readme:
    long_description = readme.read()

Let's consider a practical example: a load function which is supposed to read and parse data given a file path.

In [ ]:
import csv
import json

def load(filepath):
    """the supposedly bad way"""
    return json.load(open(filepath))

def load(filepath):
    """the supposedly good way"""
    with open(filepath) as file:
        return json.load(file)

def load(filepath):
    """with a different file format"""
    with open(filepath) as file:
        return csv.reader(file)

Which versions work correctly? Are you sure? If it's not immediately obvious why one of these is broken, that's the point. In fact, it's worth trying out before reading on.

...

The csv version returns an iterator over a closed file. It's a violation of procedural abstraction to know whether the result of load is lazily evaluated or not; it's just supposed to implement an interface. Moreover, according to this best practice, it's impossible to write the csv version correctly. As absurd as it sounds, it's just an abstraction that can't exist.
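
A quick check of that failure (data.csv stands in for any existing csv file path):

reader = load("data.csv")  # the csv version above
next(reader)               # raises ValueError: I/O operation on closed file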

Defiantly clever readers are probably already trying to fix it. Maybe like this:

In [ ]:
def load(filepath):
    with open(filepath) as file:
        yield from csv.reader(file)

No, it will not be fixed. This version only appears to work by not closing the file until the generator is exhausted or collected.

This trivial example has deeper implications. If one accepts this practice, then one must also accept that storing a file handle anywhere, such as on an instance, is also disallowed. Unless of course that object then virally implements its own context manager, ad infinitum.

Furthermore it demonstrates that often the context is not being managed locally. If a file object is passed to another function, then it's being used outside of the context. Let's revisit the json version, which works because the file is fully read. Doesn't a json parser have some expensive parsing to do after it's read the file? It might even throw an error. And isn't it desirable, trivial, and likely that the implementation releases interest in the file as soon as possible?

So in reality there are scenarios where the supposedly good way could keep the file open longer than the supposedly bad way. The original inline version does exactly what it's supposed to do: close the file when all interested parties are done with it. Python uses garbage collection to manage shared resources. Any attempt to pretend otherwise will result in code that is broken, inefficient, or reinventing reference counting.

A true believer now has to accept that json.load is a useless and dangerous wrapper, and that the only correct implementation is:

In [ ]:
def load(filepath):
    with open(filepath) as file:
        contents = file.read()
    return json.loads(contents)

This line of reasoning reduces to the absurd: a file should never be passed or stored anywhere. Next an example where the practice has caused real-world damage.

In practice

Requests is one of the most popular Python packages, and officially recommended. It includes a Session object which supports closing via a context manager. The vast majority of real-world code uses the top-level functions or single-use sessions.

In [ ]:
response = requests.get(...)

with requests.Session() as session:
    response = session.get(...)

Sessions manage the connection pool, so this pattern of usage is establishing a new connection every time. There are popular standard API clients which seriously do this, for every single request to the same endpoint.
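
For contrast, a sketch of the session reuse that connection pooling actually assumes (the module-level session and fetch helper are illustrative):

import requests

session = requests.Session()  # created once, so connections are pooled and reused

def fetch(url, **params):
    """Reuse the pooled connections instead of opening a new one per request."""
    return session.get(url, params=params)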

Requests' documentation prominently states that "Keep-alive and HTTP connection pooling are 100% automatic". So part of the blame may lie with that phrasing, since it's only "automatic" if sessions are reused. But surely a large part of the blame is the dogma of closing sockets, and therefore sessions, explicitly. The whole point of a connection pool is that it may leave connections open, so users who genuinely need this granularity are working at the wrong abstraction layer. http.client is already builtin for that level of control.

Tellingly, requests' own top-level functions didn't always close sessions. There's a long history to that code, including a version that only closed sessions on success. An older version caused warnings when run with warnings enabled, and was blamed for appearing to leak memory. Those threads are essentially debating whether a resource pool is "leaking" resources.

Truce

Prior to with being introduced in Python 2.5, it was not recommended that inline reading of a file be wrapped in a try... finally block. Far from it: idioms like open(...).read() and for line in open(...) were lauded for being succinct and expressive. But if all this orphaned file descriptor paranoia were well-founded, it would have been a problem back then too.

Finally, let's address readability. It could be argued (though it rarely is) that showing the reader when the file is closed has inherent value. Conveniently, that tends to align with having opened the file for writing anyway, thereby needing a reference to it. In which case, the readability is approximately equal, and the potential pitfalls are more realistic. But readability is genuinely lost when the file would have been opened in an inline expression.

The best practice is unjustifiably special-casing file descriptors, and not seeing its own reasoning through to its logical conclusion. This author proposes advocating for anonymous read-only open expressions. Your setup script is not going to run out of file descriptors because you wrote open("README.md").read().

Mutable defaults

Contrarian view on mutable default arguments.

The use of mutable defaults is probably the most infamous Python gotcha. Default values are evaluated at definition time, which means mutating them will be persistent across multiple calls. Many articles on this topic even use the same append example.

In [1]:
def append_to(element, to=[]):
    to.append(element)
    return to

append_to(0)
Out[1]:
[0]
In [2]:
append_to(1)
Out[2]:
[0, 1]

And the solution is invariably to use None instead, and convert as needed.

In [3]:
def append_to(element, to=None):
    if to is None:
        to = []
    to.append(element)
    return to

There is another solution to the problem of mutating a default value: don't do that. More specifically, the problem isn't using mutables as defaults; the problem is actually mutating them.

If the input from the caller is being mutated, then the caller doesn't need it returned because the caller already has a reference. This distinction is explicitly encouraged in Python, e.g., list.sort vs. sorted. But it follows that if the input doesn't need to be returned, then there's no point in the input being optional. How would the caller know the difference?
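
A reminder of that convention:

values = [3, 1, 2]
assert sorted(values) == [1, 2, 3]  # returns a new list; the input is untouched
assert values.sort() is None        # mutates in place and returns None
assert values == [1, 2, 3]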

The reason why examples like the fluent append seem so contrived is because they are. No one actually wants a function named append to take one argument. The realistic fix would be:

In [4]:
def append_to(element, to):
    to.append(element)
    return to

Sure, there are rare occasions where a parameter is mutable but optional, such as a recursive algorithm that's implicitly passing around its own cache. But this author would wager that given any real-world code that's been bitten by this gotcha there is:

  • a ~90% chance the function would have a related bug if defaults were evaluated at runtime
  • a ~95% chance the function has a poor interface

What harm does this advice do? Well, it's caused an over-reaction resulting in using None as the only default, even for immutables. It's so prevalent that it appears many beginners believe using None is the one and only way of making an argument optional.

Besides immutable types, there are also cases where mutation is irrelevant. Consider the following example adapted from a popular project.

In [5]:
from typing import List

def __init__(self, alist: List = None):
    self.alist = [] if alist is None else list(alist)

Notice that the correctness of this code relies on the member list being newly created in either case. What could possibly go wrong with:

In [6]:
def __init__(self, alist: List = []):
    self.alist = list(alist)

Or better yet, why not support the more liberal interface?

In [7]:
from typing import Iterable

def __init__(self, alist: Iterable = ()):
    self.alist = list(alist)

The point is that there are at least 4 solutions to this problem:

  1. use mutable defaults, but don't mutate them
  2. use immutable substitute defaults with a compatible interface
  3. use None for mutables, and matching types for immutables
  4. use None for all defaults

Only #1 is even remotely controversial, yet somehow the status quo has landed on #4. Besides being needlessly verbose, it has another pitfall. Python doesn't natively support detecting whether the argument was actually passed; a sentinel default is required for that. The implementation detail is leaking through the interface, indicating to the caller that None is an acceptable argument to pass explicitly. As if the type hint was Optional[List], even though that's not the intention. Factor in using **kwargs - which clearly doesn't want data padded with nulls - and actual code breakage can result.
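
A sketch of the sentinel workaround that option #4 ends up requiring (the names here are illustrative):

_MISSING = object()  # unique sentinel: distinguishes "not passed" from an explicit None

def report(value=_MISSING):
    if value is _MISSING:
        return "no argument passed"
    return f"explicitly passed {value!r}"

assert report() == "no argument passed"
assert report(None) == "explicitly passed None"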

Presumably the disdain for option #1 is because it might encourage the gotcha. But it's disingenuous to just let that go unsaid. The implementer is responsible for writing correct code, and the caller sees the right interface. The speculation is that beginners will read code which uses mutable defaults but doesn't mutate them, and follow the former pattern but not the latter.

As a community, let's at least push towards option #3. Using empty strings and zeros as defaults is all upside.

Water pouring

How to solve the water pouring puzzle programmatically.

Given two jugs with capacities of 3 and 5 liters, acquire exactly 4 liters in a jug. Assume an unlimited water supply, and that jugs can only be filled or emptied, i.e., no estimations.

First to model the data: a mapping of jug sizes to their current quantity. There are 3 primitive operations:

  • filling a jug to capacity
  • emptying a jug entirely
  • pouring from a source jug to a destination jug, until either the source is emptied or the destination is full
In [1]:
class Jugs(dict):
    def fill(self, size):
        self[size] = size

    def empty(self, size):
        self[size] = 0
    
    def pour(self, src, dest):
        total = self[src] + self[dest]
        self[src] = max(total - dest, 0)
        self[dest] = min(total, dest)

That's sufficient to solve the puzzle through sheer brute force: scanning every possible combination breadth-first. Note the below implementations don't terminate unless a solution is found.

In [2]:
import itertools
from functools import partial

sizes = 3, 5
operations = list(itertools.chain(
    (partial(Jugs.fill, size=size) for size in sizes),
    (partial(Jugs.empty, size=size) for size in sizes),
    (partial(Jugs.pour, src=src, dest=dest)
         for src, dest in itertools.permutations(sizes, 2)),
))

def search(target):
    for n in itertools.count(1):
        for ops in itertools.product(operations, repeat=n):
            jugs = Jugs.fromkeys(sizes, 0)
            states = [op(jugs) or tuple(jugs.values()) for op in ops]
            if any(target in state for state in states):
                return states

%time search(4)
CPU times: user 138 ms, sys: 2.28 ms, total: 140 ms
Wall time: 144 ms
Out[2]:
[(0, 5), (3, 2), (0, 2), (2, 0), (2, 5), (3, 4)]

Now to relentlessly simplify the code. The first observation is that an empty source is useless, as is a full destination. So the fill and empty primitives are actually unneeded, and can be integrated into the pour method.

In [3]:
def pour(jugs, src, dest):
    if not jugs[src]:
        jugs[src] = src
    if jugs[dest] >= dest:
        jugs[dest] = 0
    total = jugs[src] + jugs[dest]
    jugs[src] = max(total - dest, 0)
    jugs[dest] = min(total, dest)

operations = [partial(pour, src=src, dest=dest) 
               for src, dest in itertools.permutations(sizes, 2)]

%time search(4)
CPU times: user 87 µs, sys: 0 ns, total: 87 µs
Wall time: 88.7 µs
Out[3]:
[(3, 2), (2, 0), (3, 4)]

That reduced the search space considerably, but moreover has revealed another simplification: it's pointless to "undo" a pour. Whether the source was emptied or the destination was filled, whatever reversing the previous pour direction would accomplish could have been done in the first place.

If there were more than 2 jugs, then there could be complex workflows. But with only 2, the first choice in pour directions determines the rest. There are only 2 potential solutions to the puzzle.

In [4]:
def search(target, src, dest):
    jugs = dict.fromkeys((src, dest), 0)
    while True:
        pour(jugs, src, dest)
        yield tuple(jugs.values())
        if target in jugs.values():
            return

list(search(4, 5, 3))
Out[4]:
[(2, 3), (0, 2), (4, 3)]
In [5]:
list(search(4, 3, 5))
Out[5]:
[(0, 3), (1, 5), (0, 1), (0, 4)]

And both of them are valid solutions. The solution to the puzzle is quite simply: keep pouring. It doesn't even matter which jug to start with.

But it can be further simplified. Now it's clear that the jug data structure is only providing modular arithmetic.

In [6]:
def search(target, src, dest):
    for n in itertools.count(1):
        quot, rem = divmod(n * src - target, dest)
        if not rem:
            return n, -quot

search(4, 5, 3)
Out[6]:
(2, -2)
In [7]:
search(4, 3, 5)
Out[7]:
(3, -1)
In [8]:
assert (5 * 2) - (3 * 2) == (3 * 3) - (5 * 1) == 4

The puzzle is looking for integer solutions to $$3x + 5y = 4$$ which is known as a linear Diophantine equation, and must have a solution because $4$ is a multiple of $\gcd(3, 5)$.
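
As a footnote, a sketch of that solvability check in general (assuming the target must also fit in the larger jug; the solvable helper is illustrative):

import math

def solvable(target, a, b):
    """A target is measurable iff gcd(a, b) divides it and it fits in the larger jug."""
    return target % math.gcd(a, b) == 0 and target <= max(a, b)

assert solvable(4, 3, 5)
assert not solvable(3, 2, 6)  # a gcd of 2 can only measure even amounts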

Coin balance

How to solve the coin balance puzzle programmatically.

Given a balance and a set of coins, which are all equal in weight except for one, determine which coin is of different weight in as few weighings as possible.

Twelve-coin problem

A more complex version has twelve coins, eleven or twelve of which are identical. If one is different, we don't know whether it is heavier or lighter than the others. This time the balance may be used three times to determine if there is a unique coin—and if there is, to isolate it and determine its weight relative to the others. (This puzzle and its solution first appeared in an article in 1945.[2]) The problem has a simpler variant with three coins in two weighings, and a more complex variant with 39 coins in four weighings.

First to model the data:

  • An enum to represent different weights. Following the ternary comparison convention, such as Python 2's cmp, is convenient.
  • An object to represent the balance. For testing, it will need to be configurable with the target coin and relative weight.
  • An object to represent a coin and its state. A class is tempting, but the most useful data structure would keep the coins grouped by their known (or unknown) state anyway. So any hashable unique identifier is sufficient.
In [1]:
import enum

class Weight(enum.IntEnum):
    LIGHT = -1
    EVEN = 0
    HEAVY = 1

class Balance:
    def __init__(self, coin, weight: Weight):
        self.coin = coin
        self.weight = weight

    def weigh(self, left: set, right: set):
        """Return relative Weight of left side to right side."""
        assert len(left) == len(right)
        if self.coin in left:
            return self.weight
        if self.coin in right:
            return Weight(-self.weight)
        return Weight.EVEN

coins = 'abcdefghijkl'
assert len(coins) == 12
balance = Balance('a', Weight.LIGHT)
assert balance.weigh('a', 'b') == Weight.LIGHT
assert balance.weigh('b', 'c') == Weight.EVEN
assert balance.weigh('b', 'a') == Weight.HEAVY

As is typical with induction puzzles, the constants chosen are just large enough to thwart an iterative approach. The 2-weighing variation would be trivial enough for most people to brute force the solution, whereas 4 weighings would already be such a large decision tree that it would be tedious to even output. The easier approach is to solve the puzzle recursively and more generally, for any number of coins and weighings.

So what can be done in a single weighing? Clearly all weighings must have an equal number of coins on each side, else nothing is learned. If it balances, then the different coin is in the unweighed group. If it doesn't balance, then the different coin is in the weighed group, but additionally it is known whether each coin would be heavy or light based on which side it was on. This is the crucial insight: there's a variant recursive puzzle embedded inside this puzzle.

The Towers of Hanoi is a classic puzzle often used in computer science curricula to teach recursion. This one would be suitable as a subsequent, more advanced problem.

So what can be done with known coins in a single weighing? If it balances, then as before the different coin is in the unweighed group. But if it doesn't balance, then the direction of the tilt can be used to further narrow the coins. Consider the heavier side; the different coin must be one of the heavy ones on that side, or one of the light ones on the other side. Therefore the coins can be split into 2 equal sized groups by putting equal numbers of heavy coins on each side, and equal numbers of light coins on each side. One obstacle is that if there isn't an even number, there will need to be filler coins just to balance. But that won't be a problem after the first weighing.

Now we can implement a solution to the sub-problem, and build the need for filler coins into the balance implementation. A generator is used so that the output of each weighing can be displayed.

In [2]:
import itertools

class Balance:
    def __init__(self, coin, weight: Weight):
        self.coin = coin
        self.weight = weight
        self.filler = set()

    def weigh(self, left: set, right: set):
        """Return relative Weight of left side to right side."""
        assert abs(len(left) - len(right)) <= len(self.filler)
        if self.coin in left:
            return self.weight
        if self.coin in right:
            return Weight(-self.weight)
        return Weight.EVEN

    def find(self, light: set, heavy: set):
        """Recursively find target coin from sets of potentially light and heavy coins."""
        yield light, heavy
        union = light | heavy
        if len(union) <= 1:
            return
        left, right = set(), set()
        # split into 3 groups
        for start, third in enumerate([left, right]):
            for group in (light, heavy):
                third.update(itertools.islice(group, start, None, 3))
        weight = self.weigh(left, right)
        if weight < 0:
            light, heavy = (light & left), (heavy & right)
        elif weight > 0:
            light, heavy = (light & right), (heavy & left)
        else:
            light, heavy = (light - left - right), (heavy - left - right)
        self.filler.update(union - light - heavy)
        yield from self.find(light, heavy)

balance = Balance('a', Weight.LIGHT)
for light, heavy in balance.find(set('abc'), set('def')):
    print(''.join(light), ''.join(heavy))
cba dfe
a e
a 

Now with the sub-problem solved, there's just one thing missing for the main puzzle. In the known case, splitting into 3 equal sized groups is clearly optimal. But in the unknown case, we need to know how many coins to exclude from the weighing. This requires computing how many coins can be handled in the subsolution. Luckily it's a trivial recurrence relation: n weighings can solve 3 times the number of n - 1 weighings. $$\prod_{k=1}^{n} 3 = 3^n$$

In [3]:
class Balance(Balance):
    def solve(self, count: int, coins):
        """Recursively find target coin."""
        if count <= 0:
            return
        weigh = set(itertools.islice(coins, 3 ** (count - 1) - (not self.filler)))
        exclude = set(coins) - weigh
        left, right = (set(itertools.islice(weigh, start, None, 2)) for start in range(2))
        weight = self.weigh(left, right)
        self.filler.update(exclude if weight else weigh)
        if weight < 0:
            yield from self.find(left, right)
        elif weight > 0:
            yield from self.find(right, left)
        else:
            yield from self.solve(count - 1, exclude)

balance = Balance('a', Weight.LIGHT)
for light, heavy in balance.solve(3, coins):
    print(''.join(light), ''.join(heavy))

for coin in coins:
    light, heavy =  list(Balance(coin, Weight.LIGHT).solve(3, coins))[-1]
    assert light == {coin} and not heavy
    light, heavy =  list(Balance(coin, Weight.HEAVY).solve(3, coins))[-1]
    assert not light and heavy == {coin}
dbah fceg
a e
a 

The puzzle is solved. There's one last simplification that can be made, but it requires a bit more math background. Ideally we wouldn't need to know the objective number of weighings; the algorithm would just solve any set of coins as efficiently as possible. To do that, the number of coins that can be solved has to be computed. As was done above, but this recurrence relation is more advanced: each additional weighing can solve $3^k$ more coins. $$\sum_{k=0}^{n-1} 3^k = \frac{3^n - 1}{2}$$

With that calculation inverted, the count can be removed from the interface.

In [4]:
import math

class Balance(Balance):
    def solve(self, coins):
        if not coins:
            return
        count = math.ceil(math.log(len(coins) * 2 + 1, 3))
        weigh = set(itertools.islice(coins, 3 ** (count - 1) - (not self.filler)))
        exclude = set(coins) - weigh
        left, right = (set(itertools.islice(weigh, start, None, 2)) for start in range(2))
        weight = self.weigh(left, right)
        self.filler.update(exclude if weight else weigh)
        if weight < 0:
            yield from self.find(left, right)
        elif weight > 0:
            yield from self.find(right, left)
        else:
            yield from self.solve(exclude)

balance = Balance('a', Weight.LIGHT)
for light, heavy in balance.solve(coins):
    print(''.join(light), ''.join(heavy))
dbah fceg
a e
a 

Notice the formula indicates it's possible to do 13 coins in 3 weighings, and it would be possible with a filler coin to balance out the 9 that need weighing.

Hat puzzle

How to solve the Hat puzzle programmatically.

Ten-Hat Variant

In this variant there are 10 prisoners and 10 hats. Each prisoner is assigned a random hat, either red or blue, but the number of each color hat is not known to the prisoners. The prisoners will be lined up single file where each can see the hats in front of him but not behind. Starting with the prisoner in the back of the line and moving forward, they must each, in turn, say only one word which must be "red" or "blue". If the word matches their hat color they are released, if not, they are killed on the spot. A friendly guard warns them of this test one hour beforehand and tells them that they can formulate a plan where by following the stated rules, 9 of the 10 prisoners will definitely survive, and 1 has a 50/50 chance of survival. What is the plan to achieve the goal?

This puzzle involves three concepts common to classic logic puzzles:

Theory of mind comes into play because each prisoner has differing knowledge, but assumes everyone else will think similarly. Functional fixedness occurs more subtly; each prisoner may state a color only to convey information. But because the information is encoded as a color, it tends to focus thinking on the colors themselves. So to combat that cognitive bias, first create a different enumeration to represent statements. Any binary enum can be mapped back to colors, so why not bool.

In [1]:
colors = 'red', 'blue'
colors[False], colors[True]
Out[1]:
('red', 'blue')

Which leaves induction: solve the puzzle for the base case (smallest size) first, and then methodically build on that solution. In the case of 1 prisoner, they have no information a priori, and therefore have a 50/50 chance of survival regardless of strategy. This variant of the puzzle already gives the optimal goal, so we know that everyone but the 1st can say their color and be saved, while the 1st can devote their answer to the common cause.

In the case of 2 prisoners, obviously the 1st can say the color of the 2nd. That approach does not scale; it is the path to functional fixedness. Instead, methodically enumerate all possible statements and colors to determine if there is an unambiguous solution.

In [2]:
table = list(zip([False, True], colors))
table
Out[2]:
[(False, 'red'), (True, 'blue')]

The above table is a general solution with no assumptions other than the arbitrary ordering of enums. While it may appear absurdly pedantic, it represents a rule set which is key to building a recursive solution.

In the case of a 3rd prisoner, clearly the 1st can not just repeat the above rule set, because the 3rd would receive no information. But there are only 2 choices, so the only option is to follow either the original or the opposite rule set, depending on the 3rd prisoner's color.

The crucial step is to build off of the existing table.

In [3]:
table = [row + colors[:1] for row in table] + [(not row[0],) + row[1:] + colors[1:] for row in table]
table
Out[3]:
[(False, 'red', 'red'),
 (True, 'blue', 'red'),
 (True, 'red', 'blue'),
 (False, 'blue', 'blue')]

The solution is valid if each prisoner is able to narrow the possibilities to a unique row based on the colors they hear and see.

In [4]:
import collections

def test(table):
    """Assert that the input table is a valid solution."""
    (size,) = set(map(len, table))
    for index in range(size):
        counts = collections.Counter(row[:index] + row[index + 1:] for row in table)
        assert set(counts.values()) == {1}

test(table)

The general solution is simply the above logic in recursive form, with a parametrized size.

In [5]:
def solve(count: int):
    """Generate a flat table of all spoken possibilities."""
    if count <= 1:
        yield False,
        return
    for row in solve(count - 1):
        yield row + colors[:1]
        yield (not row[0],) + row[1:] + colors[1:]

list(solve(3))
Out[5]:
[(False, 'red', 'red'),
 (True, 'red', 'blue'),
 (True, 'blue', 'red'),
 (False, 'blue', 'blue')]

The complicated puzzle is actually a trivial recurrence relation: $$2^n = 2^{n-1} \cdot 2$$ There are $2^n$ states of the prisoners, and each prisoner has $n-1$ bits of data. So an additional bit of data from the first is sufficient to solve the puzzle.

In [6]:
table = list(solve(10))
test(table)
len(table)
Out[6]:
512
In [7]:
table[:3]
Out[7]:
[(False, 'red', 'red', 'red', 'red', 'red', 'red', 'red', 'red', 'red'),
 (True, 'red', 'red', 'red', 'red', 'red', 'red', 'red', 'red', 'blue'),
 (True, 'red', 'red', 'red', 'red', 'red', 'red', 'red', 'blue', 'red')]

The puzzle is solved, but the output is of exponential size, certainly not the succinct solution which makes the puzzle famous. But instead of relying on a flash of insight, this approach produces not just a solution, but the solution. The only arbitrary decision made was the enumeration. Therefore it must be the case that the solution can be summarized.

First, it would be helpful to group the solution by the 1st statement. Any summary function would have to ensure that there is no collision in the grouped possibilities.

In [8]:
groups = collections.defaultdict(set)
for row in table:
    groups[row[0]].add(row[1:])
groups = groups[False], groups[True]

def summarize(func, groups):
    """Apply summary function to groups and assert uniqueness."""
    groups = tuple(set(map(func, group)) for group in groups)
    assert set.isdisjoint(*groups)
    return groups

assert summarize(lambda g: g, groups) == groups
tuple(map(len, groups))
Out[8]:
(256, 256)

Now what summaries to attempt? Well, there are a few properties of sequences to work with: size and order. They are all the same size, so that won't help. That leaves ordering, which can be easily tested by sorting.

In [9]:
summarize(lambda g: tuple(sorted(g)), groups)
Out[9]:
({('blue', 'blue', 'blue', 'blue', 'blue', 'blue', 'blue', 'blue', 'red'),
  ('blue', 'blue', 'blue', 'blue', 'blue', 'blue', 'red', 'red', 'red'),
  ('blue', 'blue', 'blue', 'blue', 'red', 'red', 'red', 'red', 'red'),
  ('blue', 'blue', 'red', 'red', 'red', 'red', 'red', 'red', 'red'),
  ('red', 'red', 'red', 'red', 'red', 'red', 'red', 'red', 'red')},
 {('blue', 'blue', 'blue', 'blue', 'blue', 'blue', 'blue', 'blue', 'blue'),
  ('blue', 'blue', 'blue', 'blue', 'blue', 'blue', 'blue', 'red', 'red'),
  ('blue', 'blue', 'blue', 'blue', 'blue', 'red', 'red', 'red', 'red'),
  ('blue', 'blue', 'blue', 'red', 'red', 'red', 'red', 'red', 'red'),
  ('blue', 'red', 'red', 'red', 'red', 'red', 'red', 'red', 'red')})

Success. Now that order does not matter, the appropriate data structure is a multiset (a.k.a. bag). Each prisoner can keep track of only how many of each color they hear and see.

In [10]:
summarize(lambda g: frozenset(collections.Counter(g).items()), groups)
Out[10]:
({frozenset({('blue', 8), ('red', 1)}),
  frozenset({('blue', 2), ('red', 7)}),
  frozenset({('blue', 6), ('red', 3)}),
  frozenset({('blue', 4), ('red', 5)}),
  frozenset({('red', 9)})},
 {frozenset({('blue', 1), ('red', 8)}),
  frozenset({('blue', 7), ('red', 2)}),
  frozenset({('blue', 5), ('red', 4)}),
  frozenset({('blue', 3), ('red', 6)}),
  frozenset({('blue', 9)})})

Since there are only 2 colors which sum to a constant, keeping track of just one is sufficient.

In [11]:
summarize(lambda g: g.count(colors[0]), groups)
Out[11]:
({1, 3, 5, 7, 9}, {0, 2, 4, 6, 8})

There's one last pattern to the numbers, which can be used to achieve parity with the canonical solution.
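
To spell out that pattern (a sketch, not part of the original run): the counts in the two groups differ in parity, so reducing each count modulo 2 collapses every group to a single bit.

summarize(lambda g: g.count(colors[0]) % 2, groups)  # assuming the colors and groups defined above
# ({1}, {0})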

Split an Iterable

Split an iterable into equal-sized chunks.

A common task and interview question, with many variants. It's frequently asked and answered in a way that's suboptimal and only handles one specific case. The goal here is to present definitive, general, and efficient solutions.

The first variant is whether or not the chunks will overlap. Although this could be generalized into a step parameter, it's nearly always the case that step in (1, size).

The second variant is whether the tail of the data should be returned, if it's not of equal size. Clearly when step == 1 that's unlikely to be desirable. However, when step == size, that would seem to be the natural choice. And again when 1 < step < size, the desired behavior isn't clear at all.

The third variant is whether to slice sequences, or support any iterable. Obviously working for any iterable would be ideal, but it's also likely a user would expect slices given a sequence, particularly in the case of strings.

So this author feels it's best to split the problem into two distinct cases: a sliding window for overlapping sequences, and chunks for discrete ones. In each case, the goal is to support arbitrary iterables, using advanced iterator algebra for a minimal and efficient solution.

Window

In [1]:
import itertools

def window(iterable, size=2):
    """Generate a sliding window of values."""
    its = itertools.tee(iterable, size)
    return zip(*(itertools.islice(it, index, None) for index, it in enumerate(its)))

list(window('abcde'))
Out[1]:
[('a', 'b'), ('b', 'c'), ('c', 'd'), ('d', 'e')]

That's simple, and close to optimal. There is slight overhead in iterating an islice object, so a minor variant would be to force the step-wise iteration in advance.

In [2]:
import collections

def window(iterable, size=2):
    """Generate a sliding window of values."""
    its = itertools.tee(iterable, size)
    for index, it in enumerate(its):
        collections.deque(itertools.islice(it, index), 0)  # exhaust iterator
    return zip(*its)

list(window('abcde'))
Out[2]:
[('a', 'b'), ('b', 'c'), ('c', 'd'), ('d', 'e')]
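
For the step generalization mentioned earlier, one option is to slice the stream of windows (a sketch built on the window above; strided is just an illustrative name):

def strided(iterable, size=2, step=1):
    """Sliding window advanced by `step`; windows at the skipped offsets are dropped."""
    return itertools.islice(window(iterable, size), 0, None, step)

list(strided('abcdef', size=3, step=2))  # [('a', 'b', 'c'), ('c', 'd', 'e')]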

Chunks

A lesser-known and under-utilized feature of iter is that it can take a callable (of no arguments) and a sentinel to create an iterator. A perfect use case for the "loop and a half" idiom.

In [3]:
def chunks(iterable, size):
    """Generate adjacent chunks of data"""
    it = iter(iterable)
    return iter(lambda: tuple(itertools.islice(it, size)), ())

list(chunks('abcde', 3))
Out[3]:
[('a', 'b', 'c'), ('d', 'e')]

This should also be optimal, for reasonable sizes.
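
Worth noting: Python 3.12 later added itertools.batched, which covers this non-overlapping case for arbitrary iterables directly (tuples, with a shorter final batch).

list(itertools.batched('abcde', 3))  # requires Python 3.12+
# [('a', 'b', 'c'), ('d', 'e')]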

Sequences with dispatch

Rather than explicitly check isinstance, this is a perfect use case for functools.singledispatch.

In [4]:
import functools
from collections.abc import Sequence

window = functools.singledispatch(window)

@window.register
def _(seq: Sequence, size=2):
    for index in range(len(seq) - size + 1):
        yield seq[index:index + size]

list(window('abcde'))
Out[4]:
['ab', 'bc', 'cd', 'de']
In [5]:
list(window(iter('abcde')))
Out[5]:
[('a', 'b'), ('b', 'c'), ('c', 'd'), ('d', 'e')]
In [6]:
chunks = functools.singledispatch(chunks)

@chunks.register
def _(seq: Sequence, size):
    for index in range(0, len(seq), size):
        yield seq[index:index + size]

list(chunks('abcde', 3))
Out[6]:
['abc', 'de']
In [7]:
list(chunks(iter('abcde'), 3))
Out[7]:
[('a', 'b', 'c'), ('d', 'e')]

Accumulator

A Paul Graham classic, the accumulator function.

As an illustration of what I mean about the relative power of programming languages, consider the following problem. We want to write a function that generates accumulators-- a function that takes a number n, and returns a function that takes another number i and returns n incremented by i.

(That's incremented by, not plus. An accumulator has to accumulate.)

In Common Lisp this would be

(defun foo (n) (lambda (i) (incf n i)))

...

If you try to translate the Lisp/Perl/Smalltalk/Javascript code into Python you run into some limitations. Because Python doesn't fully support lexical variables, you have to create a data structure to hold the value of n. And although Python does have a function data type, there is no literal representation for one (unless the body is only a single expression) so you need to create a named function to return. This is what you end up with:

def foo(n):
    s = [n]
    def bar(i):
        s[0] += i
        return s[0]
    return bar

Python users might legitimately ask why they can't just write

def foo(n): return lambda i: return n += i

or even

def foo(n): lambda i: n += i

and my guess is that they probably will, one day. (But if they don't want to wait for Python to evolve the rest of the way into Lisp, they could always just...)

There are other solutions, using function attributes or instances with a __call__ method, but none are substantially more elegant. The challenge predates Python 3, which introduced the nonlocal keyword and with it the presumably preferred solution:

In [1]:
def foo(n):
    def inc(x):
        nonlocal n
        n += x
        return n
    return inc

acc = foo(0)
acc(1)
acc(2)
Out[1]:
3
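
For comparison, a sketch of the instance-with-__call__ alternative mentioned above (the class name is illustrative):

class Accumulator:
    """Accumulator as a callable instance holding mutable state."""
    def __init__(self, n):
        self.n = n

    def __call__(self, i):
        self.n += i
        return self.n

acc = Accumulator(0)
acc(1)
acc(2)  # 3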

Yet another alternative has existed since Python 2.5, when PEP 342 allowed generators to be used as coroutines.

In [2]:
def foo(n):
    while True:
        n += yield n

acc = foo(0)
next(acc)
acc.send(1)
acc.send(2)
Out[2]:
3

To satisfy the challenge, that would need to be wrapped with a decorator. The triple partial expression below may seem a little obtuse, but it's not as bad as it looks. Just unwind it one step at a time.

In [3]:
from functools import partial

@partial(partial, partial)
def coroutine(func, *args, **kwargs):
    gen = func(*args, **kwargs)
    next(gen)
    return gen.send

coroutine
Out[3]:
functools.partial(<class 'functools.partial'>, <function coroutine at 0x10bf970e0>)
In [4]:
@coroutine
def foo(n):
    while True:
        n += yield n

foo
Out[4]:
functools.partial(<function coroutine at 0x10bf970e0>, <function foo at 0x10bf7eb90>)
In [5]:
acc = foo(0)
acc
Out[5]:
<function generator.send>
In [6]:
acc(1)
acc(2)
Out[6]:
3
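
Unwound one layer at a time (equivalences only, not re-executed, with names referring to the undecorated functions):

partial(partial, partial)(coroutine)  # the decoration itself: partial(partial, coroutine)
partial(partial, coroutine)(foo)      # decorating foo: partial(coroutine, foo)
partial(coroutine, foo)(0)            # foo(0): coroutine(foo, 0), which primes the generator and returns its bound send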

But what's the most Pythonic solution? This author would argue... don't. In my experience, I have never really needed global or nonlocal in production code. Typically it's because the objects in question are mutable, so it's not necessary to rebind a name in a different scope to a new object.

A typical tell of these kinds of code challenges is that they focus on the interface or the implementation exclusively, never both in context. Python numbers are immutable and have syntactic support for incrementing, so there's nothing more readable about acc(...) than n += ....

Furthermore, the accumulator object is intended to be used repeatedly, such as in a loop. In a language with iteration support as strong as Python's, accumulation will almost certainly happen in an iterative loop anyway. Indeed, the real accumulator has since been added to the standard library.

In [7]:
import itertools

list(itertools.accumulate(range(10)))
Out[7]:
[0, 1, 3, 6, 10, 15, 21, 28, 36, 45]
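
And for a starting value like foo(n), accumulate has accepted an initial keyword since Python 3.8 (an aside beyond the original).

list(itertools.accumulate(range(1, 5), initial=10))
# [10, 11, 13, 16, 20]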