Python Markov Chains and how to use them | Evan Fosmark

Python Markov Chains and how to use them

After my last little script on creating fake words, several people recommended Markov chains for text generation, so I have finally gotten around to learning the algorithm and implementing it in Python. There were a few implementations already available, but they all seemed a bit clunky. The one I wrote aims to support high volumes of text. Take a look at what I came up with and tell me what you think:

import collections
import random
 
 
class DynamicDie(collections.defaultdict):
 
    def __init__(self):
        collections.defaultdict.__init__(self, int)
 
    def add_side(self, side):
        self[side] += 1
 
    def total_sides(self):
        return sum(self.values())
 
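    def roll(self):
        # randint is inclusive at both ends, so draw from 1..total; starting
        # at 0 would give the first side one extra chance of being picked.
        random_num = random.randint(1, self.total_sides())
        total_pos = 0
        for side, qty in self.items():
            total_pos += qty
            if random_num <= total_pos:
                return side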
 
 
class MarkovChain(collections.defaultdict):
    """ Yet another markov chain implementation.
        This one differs in the sense that it is able to better support
        huge amounts of data since the weighted randomization doesn't rely
        on duplicates.
    """
 
    # Sentinels
    # Discussion here: http://stackoverflow.com/questions/1677726
    class START(object):pass
    class END(object):pass
 
    def __init__(self):
        collections.defaultdict.__init__(self, DynamicDie)
 
    def add(self, iterable):
        """ Insert an iterable (pattern) item into the markov chain.
            The order of the pattern will define more of the chain.
        """
        item1 = item2 = MarkovChain.START
        for item3 in iterable:
            self[(item1, item2)].add_side(item3)
            item1 = item2
            item2 = item3
        self[(item1, item2)].add_side(MarkovChain.END)
 
    def random_output(self, max=100):
        """ Generate a list of elements from the markov chain.
            The `max` value is in place in order to prevent excessive iteration.
        """
        output = []
        item1 = item2 = MarkovChain.START
        for i in range(max-2):
            item3 = self[(item1, item2)].roll()
            if item3 is MarkovChain.END:
                break
            output.append(item3)
            item1 = item2
            item2 = item3
        return output
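
To make the data structure concrete, here is a minimal standalone sketch of what `add` records for a couple of short words. It uses a plain `collections.Counter` in place of `DynamicDie`, and the strings `"^"` and `"$"` as stand-ins for the `START` and `END` sentinels; both substitutions are assumptions made only to keep the example short.

```python
import collections

START, END = "^", "$"  # hypothetical stand-ins for MarkovChain.START / END


def build_table(words):
    """Map each pair of consecutive items to a Counter of what follows it."""
    table = collections.defaultdict(collections.Counter)
    for word in words:
        item1 = item2 = START
        for item3 in word:
            table[(item1, item2)][item3] += 1
            item1, item2 = item2, item3
        # Record that the word ended after the last two items.
        table[(item1, item2)][END] += 1
    return table


table = build_table(["cat", "car"])
for state, followers in sorted(table.items()):
    print(state, dict(followers))
```

Both words pass through the same first three states, so the state `('c', 'a')` ends up with two possible followers, `t` and `r`, each with a count of 1. Generating output is then just a matter of repeatedly rolling a weighted die for the current state.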

It’s not really complicated as long as you understand Markov chains. Let’s use this in an example to generate some fake words:

import markov
chain = markov.MarkovChain()
 
# Load the dictionary into the markov chain
dictionary = "/usr/share/dict/words"
with open(dictionary) as words:
    for word in words:
        word = word.strip()
        # Skip blank lines and possessive forms such as "cat's"
        if word != "" and not word.endswith("'s"):
            chain.add(word)
 
# Let's generate 10 random pseudowords
for i in range(10):
    print("".join(chain.random_output()))

The way I did it above isn’t actually that good, since it loads every word in the system dictionary into the Markov chain. You don’t need all of the words to get good-looking output, though. This is just an example; I’ll leave it up to you to come up with a better solution.
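
One way to train on only part of the word list, sketched here under the assumption that a uniform random sample is good enough, is reservoir sampling: it keeps `k` lines from an input of unknown length in a single pass. The function name `sample_words` and its parameters are hypothetical and not part of the `markov` module.

```python
import random


def sample_words(lines, k, seed=None):
    """Return up to k cleaned words chosen uniformly at random from lines."""
    rng = random.Random(seed)
    reservoir = []
    seen = 0
    for line in lines:
        word = line.strip()
        if word == "" or word.endswith("'s"):
            continue  # same filter as the dictionary-loading loop
        seen += 1
        if len(reservoir) < k:
            # Fill the reservoir with the first k usable words.
            reservoir.append(word)
        else:
            # Keep the new word with probability k / seen.
            j = rng.randrange(seen)
            if j < k:
                reservoir[j] = word
    return reservoir


words = sample_words(["apple\n", "bob's\n", "", "cherry\n", "date\n"], k=2, seed=1)
print(words)
```

With the real dictionary, the open file object could be passed as `lines` and each returned word handed to `chain.add`. How large `k` needs to be for good output is something to find by experiment.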

As always, if you see a way that the code in this post could be improved, please leave a comment!

Update: I updated it to use the dynamic die object. This solves the linear search problem that Marius mentions below.
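
The reason counts scale better than duplicates can be seen in a small standalone sketch. Storing each follower once with a tally means that adding an observation is a single dict update, and memory grows with the number of distinct followers rather than the number of observations. The example below uses `collections.Counter` rather than `DynamicDie`, and it draws with the standard library's `random.choices` (Python 3.6+). That is a swapped-in alternative to the hand-written `roll`, not what the code above does.

```python
import collections
import random

# Ten made-up observations of what followed some state.
observations = ["e"] * 6 + ["a"] * 3 + ["o"]

# Duplicate-based storage: one list entry per observation.
as_list = list(observations)

# Count-based storage: one dict entry per distinct follower.
as_counts = collections.Counter(observations)

# Weighted draws straight from the counts, with a fixed seed.
rng = random.Random(0)
sides = list(as_counts)
weights = [as_counts[side] for side in sides]
draws = rng.choices(sides, weights=weights, k=10000)
frequencies = collections.Counter(draws)

print(len(as_list), len(as_counts))
print({side: frequencies[side] / 10000 for side in sides})
```

The observed frequencies should land close to the 6:3:1 ratio of the counts, even though only three entries are stored.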

 

 

6 Comments

  1. Marius Gedminas wrote,

    The linear search in _ChainLink.add() rubs me wrong. If you used a dictionary and then iterated over items() in get_random(), would that change any properties of the output? I don’t think so.

  2. Guillaume wrote,

    First, it’s easy to replace the linear search with a list and a dict of weights:

    class _ChainLink(object):
        def __init__(self):
            self._weight = collections.defaultdict(int)
            self._list = []
     
        def add(self, item):
            if item not in self._weight:
                self._list.append(item)
     
            self._weight[item] += 1
     
        def get_random(self):
            total = sum(self._weight.itervalues())
            # randint is inclusive at both ends, so start at 1 rather than 0
            random_num = random.randint(1, total)
            total_pos = 0
            for value in self._list:
                total_pos += self._weight[value]
                if random_num <= total_pos:
                    return value

    This drops the running time from 26 seconds to 12 seconds for me and gives the same results (I hacked the random module a little to use the same seed).

    If I totally remove the list that keeps the order, I get 10s:

    def __init__(self):
        self._weight = collections.defaultdict(int)
     
    def add(self, item):
        self._weight[item] += 1
     
    def get_random(self):
        total = sum(self._weight.itervalues())
        # randint is inclusive at both ends, so start at 1 rather than 0
        random_num = random.randint(1, total)
        total_pos = 0
        for value, weight in self._weight.iteritems():
            total_pos += weight
            if random_num <= total_pos:
                return value

    Profiling shows that most of the time is spent in MarkovChain.add, which can be difficult to optimize.

     2486824    3.721    0.000    3.721    0.000 markov.py:12(add)
      234937    9.784    0.000   13.522    0.000 markov.py:48(add)
    

    Note: if we use python3 and replace all the iteritems and itervalues calls with items() and values(), the time falls to 10s ;)

  3. Evan wrote,

    Nice catch, Marius. I had planned to update it before I posted, but I forgot.

  4. Guillaume wrote,

    I’m wondering why my post about some optimisation and profiling does not appear?

    Basically, it’s interesting to see that the solution runs quicker (10%) on python3 than on python2. I’m wondering why.

  5. Eric wrote,

    I find better results with a third element in the Markov state. (A fourth generates a few too many real words for my taste.) A quick generalization of your code allows that:

        class MarkovChain(collections.defaultdict):
            """ Yet another markov chain implementation.
                This one differs in the sense that it is able to better support
                huge amounts of data since the weighted randomization doesn't rely
                on duplicates.
            """
     
            # Sentinels 
            # Discussion here: http://stackoverflow.com/questions/1677726
            class START(object):pass
            class END(object):pass
     
            def __init__(self, level=2):
                self.level = level
                collections.defaultdict.__init__(self, DynamicDie)
     
            def add(self, iterable):
                """ Insert an iterable (pattern) item into the markov chain.
                    The order of the pattern will define more of the chain.
                """
                prefix = [MarkovChain.START] * self.level
                for item in iterable:
                    self[tuple(prefix)].add_side(item)
                    prefix.append(item)
                    prefix.pop(0)
                self[tuple(prefix)].add_side(MarkovChain.END)
     
            def random_output(self, max=100):
                """ Generate a list of elements from the markov chain.
                    The `max` value is in place in order to prevent excessive iteration.
                """
                output = []
                prefix = [MarkovChain.START] * self.level
                for i in range(max - self.level):
                    item = self[tuple(prefix)].roll()
                    if item is MarkovChain.END:
                        break
                    output.append(item)
                    prefix.append(item)
                    prefix.pop(0)
                return output

  6. Evan wrote,

    @Guillaume,

    For some reason, Akismet flagged your comments. I’ve reinstated them now.
