Sunday, May 30, 2021
VSCode Jupyter Notebook is not opening in Anaconda/conda environment
Wednesday, March 31, 2021
How to check which Python installation/environment is being used within Python
import sys print(sys.executable)
Sunday, October 25, 2020
Python generic methods/functions
class C {
T getValue() {
}
}
Now Python has type hints which are used by the program mypy, but I was having trouble finding how to do generic methods with it. The problem is that generic method examples shown require you to pass in a parameter with the generic type. But if the type is not used in the parameters but only in the return, as shown in the C# example above, then you'd be in trouble.
The solution is to pass the type itself as a parameter, in the same way that the C# example above does but as a parameter instead of outside the parameter list. It is shown here but not as a generic function.
from typing import TypeVar, Type
T = TypeVar('T')
class C:
def get_value(return_type: Type[T]) -> T:
pass
Friday, July 31, 2020
How to capture/redirect output in Python's print
with open('x.txt', 'w', encoding='utf-8') as f:
print('hello', file=f)
The 'file' parameter accepts a stream object and the print function basically just passes the string you give it to this stream object's 'write' method. In fact in the above example you could have written to the file without using print as follows:
with open('x.txt', 'w', encoding='utf-8') as f:
f.write('hello')
f.write('\n')
f.flush()
So question is, what is the stream used by print when it prints to the screen? The print method's signature is defined as follows:
print(value, ..., sep=' ', end='\n', file=sys.stdout, flush=False)
So the answer is sys.stdout, which is a variable that points to the object sys.__stdout__. In fact you can make your own print function by using the sys.__stdout__ object as follows:
import sys
def my_print(line):
sys.__stdout__.write(line)
sys.__stdout__.write('\n')
sys.__stdout__.flush()
my_print('hello')
Note that the output will appear in the command line terminal. Now we saw how the default file parameter of the print function is whatever sys.stdout is pointing to. This means that we can highjack this variable with our own stream so that default prints get redirected to our code instead of the screen. Here's how to make your own stream:
class MyStream(object):
def write(self, text):
pass
def flush(self):
pass
Just replace 'pass' in the write method with whatever you want and that is what will happen with the print string. Now normally you would do something like this:
print('hello', file=MyStream())
but if you're trying to capture the outputs of some library, like sklearn, then you don't have control over the library's print functions. Instead you can do this:
import sys
tmp = sys.stdout
sys.stdout = MyStream()
print('hello')
sys.stdout = tmp
First you hijack the stdout variable with your own stream and then after the prints you restore the stdout variable with the original stream. Now, you need to be careful with what happens inside your stream's write method because if there is another print in there then it will also call your stream's write method which will result in an infinite recursion loop. What you can do is temporarily restore the original stream whilst inside your stream, like this:
import sys
class MyStream(object):
def write(self, text):
tmp = sys.stdout
sys.stdout = sys.__stdout__
print('my', text)
sys.stdout = tmp
def flush(self):
pass
tmp = sys.stdout
sys.stdout = MyStream()
print('hello')
sys.stdout = tmp
Now every time you print something, you'll get 'my' added to its front. Unfortunately the print function sends the '\n' at the end of the line in a separate call to the stream's write method, which means that you actually get two calls per print:
my hello my
A simple solution is to just put an 'if' statement that checks if the text received is just a '\n' and ignore it if so:
import sys
class MyStream(object):
def write(self, text):
if text == '\n':
return
tmp = sys.stdout
sys.stdout = sys.__stdout__
print('my', text)
sys.stdout = tmp
def flush(self):
pass
tmp = sys.stdout
sys.stdout = MyStream()
print('hello')
sys.stdout = tmp
Monday, December 30, 2019
Implementing a Gaussian blur filter together with convolution operation from scratch
Let's use the following grey-scale image as an example:

The image consists of many numbers that are arranged in a grid, each number called a pixel. The pixel values range from 0 to 255 where 0 is represented with a black colour and 255 with a white colour and all the other numbers are shades of grey in between. Here are the values of the 5x5 square of pixels in the top-left corner of the image:
[[156, 157, 160, 159, 158], [156, 157, 159, 158, 158], [158, 157, 156, 156, 157], [160, 157, 154, 154, 156], [158, 157, 156, 156, 157]]
In the case of a colour image, each pixel contains 3 values: the intensity of red, the intensity of blue, and the intensity of green. When these three primary colours are combined in different intensities they can produce any colour.
Let's use the following colour image as an example:

The values of the 5x5 square of pixels in the top-left corner of the image are:
[[[21, 13, 8], [21, 13, 9], [20, 11, 8], [21, 13, 11], [21, 14, 8]], [[21, 13, 7], [21, 13, 9], [20, 14, 7], [22, 14, 8], [20, 13, 7]], [[21, 14, 7], [23, 13, 10], [20, 14, 9], [21, 13, 9], [21, 15, 9]], [[21, 13, 6], [21, 13, 9], [21, 13, 8], [21, 13, 10], [20, 12, 5]], [[21, 13, 5], [21, 13, 7], [21, 13, 4], [21, 13, 7], [21, 13, 8]]]
One of the basic operations in image processing is the convolution. This is when you replace every pixel value $p$ in an image with a weighted sum of the pixel values near $p$, that is, multiply each nearby pixel value with a different number before adding them all up and replacing $p$ with the result. The simplest example of a convolution is when 'nearby' means the pixel itself only. For example, if we convolve a 5x5 pixel greyscale image with a 1x1 weighting, this is what we get:
\left[\begin{array}{ccccc}
1 & 2 & 3 & 4 & 5\\
6 & 7 & 8 & 9 & 10\\
11 & 12 & 13 & 14 & 15\\
16 & 17 & 18 & 19 & 20\\
21 & 22 & 23 & 24 & 25\\
\end{array}\right]
\circledast
\left[\begin{array}{c}
100
\end{array}\right]
=
\left[\begin{array}{ccccc}
1 \times 100 & 2 \times 100 & 3 \times 100 & 4 \times 100 & 5 \times 100\\
6 \times 100 & 7 \times 100 & 8 \times 100 & 9 \times 100 & 10 \times 100\\
11 \times 100 & 12 \times 100 & 13 \times 100 & 14 \times 100 & 15 \times 100\\
16 \times 100 & 17 \times 100 & 18 \times 100 & 19 \times 100 & 20 \times 100\\
21 \times 100 & 22 \times 100 & 23 \times 100 & 24 \times 100 & 25 \times 100\\
\end{array}\right]
=
\left[\begin{array}{ccccc}
100 & 200 & 300 & 400 & 500\\
600 & 700 & 800 & 900 & 1000\\
1100 & 1200 & 1300 & 1400 & 1500\\
1600 & 1700 & 1800 & 1900 & 2000\\
2100 & 2200 & 2300 & 2400 & 2500\\
\end{array}\right]
$$
Here we have replaced the pixel value 1 with 100, 2 with 200, etc. Note that in practice the numbers would be clipped to not exceed 255.
Only the pixel being replaced contributed to its new value. We can extend the range of the weighting by using a 3x3 weight instead of a single number which will combine all the pixels adjacent to the pixel being replaced.
\left[\begin{array}{ccccc}
1 & 2 & 3 & 4 & 5\\
6 & 7 & 8 & 9 & 10\\
11 & 12 & 13 & 14 & 15\\
16 & 17 & 18 & 19 & 20\\
21 & 22 & 23 & 24 & 25\\
\end{array}\right]
\circledast
\left[\begin{array}{ccc}
100 & 200 & 100\\
300 & 400 & 300\\
100 & 200 & 100\\
\end{array}\right]
=
\left[\begin{array}{ccccc}
\left(\begin{array}{cccccc}
& 1 \times 100 & + & 2 \times 200 & + & 3 \times 100\\
+ & 6 \times 300 & + & 7 \times 400 & + & 8 \times 300\\
+ & 11 \times 100 & + & 12 \times 200 & + & 13 \times 100\\
\end{array}\right)
&
\left(\begin{array}{cccccc}
& 2 \times 100 & + & 3 \times 200 & + & 4 \times 100\\
+ & 7 \times 300 & + & 8 \times 400 & + & 9 \times 300\\
+ & 12 \times 100 & + & 13 \times 200 & + & 14 \times 100\\
\end{array}\right)
&
\left(\begin{array}{cccccc}
& 3 \times 100 & + & 4 \times 200 & + & 5 \times 100\\
+ & 8 \times 300 & + & 9 \times 400 & + & 10 \times 300\\
+ & 13 \times 100 & + & 14 \times 200 & + & 15 \times 100\\
\end{array}\right)
\\
\left(\begin{array}{cccccc}
& 6 \times 100 & + & 7 \times 200 & + & 8 \times 100\\
+ & 11 \times 300 & + & 12 \times 400 & + & 13 \times 300\\
+ & 16 \times 100 & + & 17 \times 200 & + & 18 \times 100\\
\end{array}\right)
&
\left(\begin{array}{cccccc}
& 7 \times 100 & + & 8 \times 200 & + & 9 \times 100\\
+ & 12 \times 300 & + & 13 \times 400 & + & 14 \times 300\\
+ & 17 \times 100 & + & 18 \times 200 & + & 19 \times 100\\
\end{array}\right)
&
\left(\begin{array}{cccccc}
& 8 \times 100 & + & 9 \times 200 & + & 10 \times 100\\
+ & 13 \times 300 & + & 14 \times 400 & + & 15 \times 300\\
+ & 18 \times 100 & + & 19 \times 200 & + & 20 \times 100\\
\end{array}\right)
\\
\left(\begin{array}{cccccc}
& 11 \times 100 & + & 12 \times 200 & + & 13 \times 100\\
+ & 16 \times 300 & + & 17 \times 400 & + & 18 \times 300\\
+ & 21 \times 100 & + & 22 \times 200 & + & 23 \times 100\\
\end{array}\right)
&
\left(\begin{array}{cccccc}
& 12 \times 100 & + & 13 \times 200 & + & 14 \times 100\\
+ & 17 \times 300 & + & 18 \times 400 & + & 19 \times 300\\
+ & 22 \times 100 & + & 23 \times 200 & + & 24 \times 100\\
\end{array}\right)
&
\left(\begin{array}{cccccc}
& 13 \times 100 & + & 14 \times 200 & + & 15 \times 100\\
+ & 18 \times 300 & + & 19 \times 400 & + & 20 \times 300\\
+ & 23 \times 100 & + & 24 \times 200 & + & 25 \times 100\\
\end{array}\right)
\\
\end{array}\right]
=
\left[\begin{array}{ccc}
12600 & 14400 & 16200\\
21600 & 23400 & 25200\\
30600 & 32400 & 34200\\
\end{array}\right]
$$
Note how the new image becomes smaller than the original image. This is in order to avoid using pixels values that lie outside of the image edges when computing new pixel values. One way to use pixel values outside of the image is to treat any pixel outside of the image as having a value of zero (zero padding). We can also reflect the image outside of its edges or wrap around to the other side of the image.
We can implement the above convolution operation in Python naively using for loops as follows:
def conv(img, wgt):
img_h = len(img)
img_w = len(img[0])
wgt_h = len(wgt)
wgt_w = len(wgt[0])
res_h = img_h - (wgt_h//2)*2
res_w = img_w - (wgt_w//2)*2
res = [ [ 0 for _ in range(res_w) ] for _ in range(res_h) ]
for img_row in range(wgt_h//2, img_h-wgt_h//2):
for img_col in range(wgt_w//2, img_w-wgt_w//2):
new_pix = 0
for wgt_row in range(wgt_h):
for wgt_col in range(wgt_w-1, -1, -1): #Technically, a convolution operation needs to flip the weights left to right.
new_pix += wgt[wgt_row][wgt_col]*img[img_row+wgt_row-wgt_h//2][img_col+wgt_col-wgt_w//2]
res[img_row - wgt_h//2][img_col - wgt_w//2] = new_pix
return res
print(conv([
[ 1, 2, 3, 4, 5],
[ 6, 7, 8, 9,10],
[11,12,13,14,15],
[16,17,18,19,20],
[21,22,23,24,25]
], [
[100,200,100],
[300,400,300],
[100,200,100]
]))
[[12600, 14400, 16200], [21600, 23400, 25200], [30600, 32400, 34200]]
I say naively because it is possible to perform a convolution using fewer loops and with parallel processing but that would be beyond the scope of this blog post. Note that in the code above we are flipping the weights before multiplying them because that is what a convolution involves, but since we'll be working with symmetric weights then it doesn't really matter.
If we want to use zero padding in our convolution then we would do it as follows:
def conv_pad(img, wgt):
img_h = len(img)
img_w = len(img[0])
wgt_h = len(wgt)
wgt_w = len(wgt[0])
res = [ [ 0 for _ in range(img_w) ] for _ in range(img_h) ]
for img_row in range(img_h):
for img_col in range(img_w):
new_pix = 0
for wgt_row in range(wgt_h):
for wgt_col in range(wgt_w-1, -1, -1):
y = img_row+wgt_row-wgt_h//2
x = img_col+wgt_col-wgt_w//2
if 0 >= y > img_h and 0 >= x > img_w:
pix = img[y][x]
else:
pix = 0
new_pix += wgt[wgt_row][wgt_col]*pix
res[img_row][img_col] = new_pix
return res
print(conv_pad([
[ 1, 2, 3, 4, 5],
[ 6, 7, 8, 9,10],
[11,12,13,14,15],
[16,17,18,19,20],
[21,22,23,24,25]
], [
[100,200,100],
[300,400,300],
[100,200,100]
]))
[[2900, 4800, 6200, 7600, 6100], [8300, 12600, 14400, 16200, 12500], [14800, 21600, 23400, 25200, 19000], [21300, 30600, 32400, 34200, 25500], [19900, 28800, 30200, 31600, 23100]]
We can adapt this function to be able to operate on colour images by simply applying the filter on each of the three intensity channels separately as follows:
def conv_pad_colour(img, wgt):
img_h = len(img)
img_w = len(img[0])
wgt_h = len(wgt)
wgt_w = len(wgt[0])
res = [ [ [ 0, 0, 0 ] for _ in range(img_w) ] for _ in range(img_h) ]
for img_row in range(img_h):
for img_col in range(img_w):
for channel in range(3):
new_pix = 0
for wgt_row in range(wgt_h):
for wgt_col in range(wgt_w-1, -1, -1):
y = img_row+wgt_row-wgt_h//2
x = img_col+wgt_col-wgt_w//2
if 0 <= y < img_h and 0 <= x < img_w:
pix = img[y][x][channel]
else:
pix = 0
new_pix += wgt[wgt_row][wgt_col]*pix
res[img_row][img_col][channel] = new_pix
return res
print(conv_pad_colour([
[[ 1, 2, 3], [ 4, 5, 6], [ 7, 8, 9], [10, 11, 12], [13, 14, 15]],
[[16, 17, 18], [19, 20, 21], [22, 23, 24], [25, 26, 27], [28, 29, 30]],
[[31, 32, 33], [34, 35, 36], [37, 38, 39], [40, 41, 42], [43, 44, 45]],
[[46, 47, 48], [49, 50, 51], [52, 53, 54], [55, 56, 57], [58, 59, 60]],
[[61, 62, 63], [64, 65, 66], [67, 68, 69], [70, 71, 72], [73, 74, 75]]
], [
[100,200,100],
[300,400,300],
[100,200,100]
]))
[[[6700, 7700, 8700], [11600, 13000, 14400], [15800, 17200, 18600], [20000, 21400, 22800], [16300, 17300, 18300]], [[22300, 23600, 24900], [34200, 36000, 37800], [39600, 41400, 43200], [45000, 46800, 48600], [34900, 36200, 37500]], [[41800, 43100, 44400], [61200, 63000, 64800], [66600, 68400, 70200], [72000, 73800, 75600], [54400, 55700, 57000]], [[61300, 62600, 63900], [88200, 90000, 91800], [93600, 95400, 97200], [99000, 100800, 102600], [73900, 75200, 76500]], [[57700, 58700, 59700], [83600, 85000, 86400], [87800, 89200, 90600], [92000, 93400, 94800], [67300, 68300, 69300]]]In practice we can use scipy's convolve function in order to apply a convolution:
import scipy.ndimage
import numpy as np
img = np.array([
[[ 1, 2, 3], [ 4, 5, 6], [ 7, 8, 9], [10, 11, 12], [13, 14, 15]],
[[16, 17, 18], [19, 20, 21], [22, 23, 24], [25, 26, 27], [28, 29, 30]],
[[31, 32, 33], [34, 35, 36], [37, 38, 39], [40, 41, 42], [43, 44, 45]],
[[46, 47, 48], [49, 50, 51], [52, 53, 54], [55, 56, 57], [58, 59, 60]],
[[61, 62, 63], [64, 65, 66], [67, 68, 69], [70, 71, 72], [73, 74, 75]]
])
weights = np.array([
[100,200,100],
[300,400,300],
[100,200,100]
])
res = np.stack([ #Convolve each channel separately as a greyscale image and then combine them back into a single image.
scipy.ndimage.convolve(img[:,:,channel], weights, mode='constant')
for channel in range(3)
], axis=2)
print(res)
In signal processing, the weights are actually called a filter.
import scipy.ndimage
import skimage
import numpy as np
img = skimage.io.imread('image.png')
filter = np.ones([11, 11])
filter = filter/filter.size
res = np.stack([ #Convolve each channel separately as a greyscale image and then combine them back into a single image.
scipy.ndimage.convolve(img[:,:,channel], filter, mode='constant')
for channel in range(3)
], axis=2)
skimage.io.imshow(res)
skimage.io.show()
This is actually called a uniform blur, which is not considered an ideal blur as it gives equal importance to pixels that are far from the pixel being replaced as to pixels that are close. A better blur is a gaussian blur.
A gaussian blur is when the filter values follow a gaussian bell curve with a maximum in the center and a steep decline just around the center. The gaussian function is defined as follows:
$$G(x) = \frac{1}{\sqrt{2\pi\sigma^2}}e^{-\frac{x^2}{2\sigma^2}}$$
where $\sigma$ is a parameter for controlling how wide the bell curve is and therefore how much influence the nearby pixels have on the value of the pixel being replaced (the greater the $\sigma$, the more blurry). In two dimensions all you do is take the gaussian of each variable and multiply them together:
$$G(x,y) = G(x) \times G(y)$$
Now the way the gaussian blur works is to use an infinite 2D gaussian curve as a filter for a convolution. The center of the filter is $G(0,0)$ and each cell in the filter being a distance of one in the gaussian function's domain as follows:
$$
\newcommand\iddots{\mathinner{
\kern1mu\raise1pt{.}
\kern2mu\raise4pt{.}
\kern2mu\raise7pt{\Rule{0pt}{7pt}{0pt}.}
\kern1mu
}}
\begin{array}{ccccc}
\ddots & & \vdots & & \iddots\\
& G(-1, 1) & G( 0, 1) & G( 1, 1) & \\
\cdots & G(-1, 0) & G( 0, 0) & G( 1, 0) & \cdots\\
& G(-1,-1) & G( 0,-1) & G( 1,-1) & \\
\iddots & & \vdots & & \ddots\\
\end{array}
$$
In order to be a perfect gaussian blur, the filter needs to be infinitely large, which is not possible. Fortunately, $G(3\sigma)$ is small enough to be rounded down to zero. This means that our kernel needs to only have a side of $\lceil 6\sigma + 1 \rceil$ at most as anything larger than that will not make a difference ($\lceil x \rceil$ means $x$ rounded up). The problem is that even though the missing values are negligible, the values in the filter will not equal 1 (especially for a small $\sigma$) which makes the resulting image will be either brighter or darker. To get around this we cheat a bit and divide each value in the filter by the filter's total, which results in a guassian blur with a neutral effect on the brightness of the image.
$$
\begin{array}{ccccccc}
\frac{G(-\lceil 3\sigma \rceil, \lceil 3\sigma \rceil)}{T} & & & \frac{G(0, \lceil 3\sigma \rceil)}{T} & & & \frac{G(\lceil 3\sigma \rceil, \lceil 3\sigma \rceil)}{T}\\
& \ddots & & \vdots & & \iddots & \\
& & \frac{G(-1, 1)}{T} & \frac{G( 0, 1)}{T} & \frac{G( 1, 1)}{T} & & \\
\frac{G(-\lceil 3\sigma \rceil, 0)}{T} & \cdots & \frac{G(-1, 0)}{T} & \frac{G( 0, 0)}{T} & \frac{G( 1, 0)}{T} & \cdots & \frac{G(\lceil 3\sigma \rceil, 1)}{T}\\
& & \frac{G(-1,-1)}{T} & \frac{G( 0,-1)}{T} & \frac{G( 1,-1)}{T} & & \\
& \iddots & & \vdots & & \ddots & \\
\frac{G(-\lceil 3\sigma \rceil, -\lceil 3\sigma \rceil)}{T} & & & \frac{G(0, -\lceil 3\sigma \rceil)}{T} & & & \frac{G(\lceil 3\sigma \rceil, -\lceil 3\sigma \rceil)}{T}\\
\end{array}
$$
where $T$ is the total of all the $G(x,y)$ in the filter.
import scipy.ndimage
import skimage
import numpy as np
img = skimage.io.imread('image.png')
sigma = 4
G = lambda x: 1/np.sqrt(2*np.pi*sigma**2)*np.exp(-x**2/(2*sigma**2))
G2 = lambda x,y: G(x)*G(y)
limit = np.ceil(3*sigma).astype(np.int32)
filter = G2(*np.meshgrid(np.arange(-limit, limit+1), np.arange(-limit, limit+1)))
filter = weights/np.sum(filter)
res = np.stack([
scipy.ndimage.convolve(img[:,:,channel], filter, mode='constant')
for channel in range(3)
], axis=2)
skimage.io.imshow(res)
skimage.io.show()
Of course skimage has a gaussian blur filter out of the box for us to use:
import skimage
img = skimage.io.imread('image.png')
sigma = 4
res = skimage.filters.gaussian(img, sigma, mode='constant', truncate=3) #Truncate to 3 sigmas.
skimage.io.imshow(res)
skimage.io.show()
Thursday, September 26, 2019
Python functions for generating all substrings/sublists, combinations, permutations, and sequences
def subs(xs, sub_len):
'''Get all sublists/substrings from xs of the given sub-length.'''
for i in range(len(xs) - sub_len + 1): #Get all indexes of the substrings that are at least sub_len long.
yield xs[i:i+sub_len]
for x in subs('abcd', 2):
print(x)
ab bc cd
def combinations(xs, comb_size):
'''Get all combinations of the given combination size.'''
if comb_size == 0:
yield xs[:0] #Empty list/string
else:
for i in range(len(xs) - comb_size + 1):
for ys in combinations(xs[i+1:], comb_size-1): #For every item, combine it with every item that comes after it.
yield xs[i] + ys
for x in combinations('abcd', 2):
print(x)
ab ac ad bc bd cd
def perms(xs):
'''Get all permutations.'''
if len(xs) == 1:
yield xs
else:
for i in range(len(xs)):
for ys in perms(xs[:i] + xs[i+1:]): #For every item, combine it with every item except itself.
yield xs[i] + ys
for x in perms('abc'):
print(x)
abc acb bac bca cab cba
def permutations(xs, perm_size):
'''Get all permutations of the given permutation size.'''
for cs in combinations(xs, perm_size):
for p in perms(cs): #Get all the permutations of all the combinations.
yield p
for x in permutations('abcd', 2):
print(x)
ab ba ac ca ad da bc cb bd db cd dc
def seqs(xs, seq_len):
'''Get all sequences of a given length.'''
if seq_len == 0:
yield xs[:0] #Empty list/string
else:
for i in range(len(xs)):
for ys in seqs(xs, seq_len-1): #For every item, combine it with every item including itself.
yield xs[i] + ys
for x in seqs('abc', 2):
print(x)
aa ab ac ba bb bc ca cb cc
Wednesday, July 31, 2019
Recognising simple shapes with OpenCV in Python
Basically the trick to recognise polygons is to convert your image into an approximate polygon representation using something like edge detection and then count the number of sides in the polygon. OpenCV handles a lot of this stuff for you so its quite easy.
Here's the image we will be working on:

The first step is to binarise the pixels of the image, that is they are made either black or white. First the image must be turned into greyscale so that there is just one number per pixel and then anything that is not white (greyscale value 255) is replaced with 255 (white) whilst pure white is replaced with 0 (black).
import cv2
import numpy as np
img = cv2.imread('polygons.png', cv2.IMREAD_COLOR)
#Make image greyscale
grey = cv2.cvtColor(img, cv2.COLOR_RGB2GRAY)
cv2.imshow('greyscale', grey)
#Binarise greyscale pixels
(_, thresh) = cv2.threshold(grey, 254, 255, 1)
cv2.imshow('thresholded', thresh)


Next we're going to find contours around these white shapes, that is, reduce the image into a set of points with each point being a corner in the shape. This is done using the findContours function which returns a list consisting of contour points for every shape and their hierarchy. The hierarchy is the way each shape is nested within other shapes. We won't need this here since we don't have any shapes within shapes but it will be useful in other situations.
#Get shape contours
(contours, hierarchy) = cv2.findContours(thresh, cv2.RETR_TREE, cv2.CHAIN_APPROX_SIMPLE)
thresh_copy = cv2.cvtColor(thresh, cv2.COLOR_GRAY2RGB)
cv2.drawContours(thresh_copy, contours, contourIdx=-1, color=(0, 255, 0), thickness=2)
print('number of sides per shape:')
for contour in contours:
print('', contour.shape[0])
print()
cv2.imshow('contours', thresh_copy)

number of sides per shape: 119 122 4 124
Unfortunately, the number of sides extracted from each shape is nowhere near what it should be. This is because the corners in the shapes are rounded which results in multiple sides around the corner. We therefore need to simplify these contours so that insignificant sides can be removed. This is done using the Ramer–Douglas–Peucker algorithm which is implemented as the approxPolyDP function. The algorithm requires a parameter called epsilon which controls how roughly to chop up the polygon into a smaller number of sides. This number is dependent on the size of the polygon so we make it a fraction of the shape's perimeter.
#Simplify contours
thresh_copy = cv2.cvtColor(thresh, cv2.COLOR_GRAY2RGB)
print('number of sides per shape:')
for contour in contours:
perimeter = cv2.arcLength(contour, True)
e = 0.05*perimeter #The bigger the fraction, the more sides are chopped off the original polygon
contour = cv2.approxPolyDP(contour, epsilon=e, closed=True)
cv2.drawContours(thresh_copy, [ contour ], contourIdx=-1, color=(0, 255, 0), thickness=2)
print('', contour.shape[0])
cv2.imshow('simplified contours', thresh_copy)

number of sides per shape: 6 5 4 3
And with this we have the number of sides in each polygon.
If you want to check for circles as well as polygons then you will not be able to do so by counting sides. Instead you can get the minimum enclosing circle around a contour and check if its area is close to the area of the contour (before it is simplified):
((centre_x, centre_y), radius) = cv2.minEnclosingCircle(contour)
if cv2.contourArea(contour)/(np.pi*radius**2) > 0.95:
print('circle')
The minimum enclosing circle is the smallest circle that completely contains the contour. Therefore it's area must necessarily be larger or equal to the shape of the contour, which can only be equal in the case that the contour is a circle.
This is also one way how you can get the position of each shape, by getting the centre point of the enclosing circle. The proper way to get a single coordinate representing the position of the contour is to get the centre of gravity using the contour's moments:
m = cv2.moments(contour) x = m['m10']/m['m00'] y = m['m01']/m['m00']
Here's the full code:
import cv2
import numpy as np
img = cv2.imread('polygons.png', cv2.IMREAD_COLOR)
#Binarise pixels
grey = cv2.cvtColor(img, cv2.COLOR_RGB2GRAY)
(_, thresh) = cv2.threshold(grey, 254, 255, 1)
#Get shape contours
(contours, hierarchy) = cv2.findContours(thresh, cv2.RETR_TREE, cv2.CHAIN_APPROX_SIMPLE)
#Recognise shapes
for contour in contours:
m = cv2.moments(contour)
x = round(m['m10']/m['m00'])
y = round(m['m01']/m['m00'])
shape_name = None
(_, radius) = cv2.minEnclosingCircle(contour)
if cv2.contourArea(contour)/(np.pi*radius**2) > 0.95:
shape_name = 'circle'
else:
e = 0.05*cv2.arcLength(contour, True)
simple_contour = cv2.approxPolyDP(contour, epsilon=e, closed=True)
num_sides = simple_contour.shape[0]
shape_name = { 3: 'triangle', 4: 'quad', 5: 'pentagon', 6: 'hexagon' }.get(num_sides, 'unknown')
cv2.putText(img, shape_name, (x, y), fontFace=cv2.FONT_HERSHEY_SIMPLEX, fontScale=0.6, color=(0, 255, 0), thickness=2)
cv2.imshow('named shapes', img)
Tuesday, January 29, 2019
Python string format quick guide
Curly brackets are used to refer to parameter values in the 'format' method. On their own, the first pair of curly brackets will refer to the first parameter, second to the second, etc.
'{} {} {}'.format('a', 1, True)
a 1 True
Alternatively you can specify which parameter you want to use where using indexes.
'{0} {0} {2}'.format('a', 1, True)
a a True
You can even use names instead of indexes.
'{a} {b} {c}'.format(a='a', b=1, c=True)
a 1 True
And even refer to indexes in lists and to instance variables in objects.
'{a[0]} {b.numerator}'.format(a=[3,1,2], b=fractions.Fraction(1,2))
3 1
Adding a colon inside the curly brackets allows you to format the values before they are added to the string. You can combine these with any of the above methods of referencing. The following formatting meta characters are to be used in the following order:
{:[alignment] [grouping] [precision] [data type]}'|{:>3}|{:<3}|{:^3}|'.format('a', 'b', 'c')
| a|b | c |
Any character before the alignment meta character is used instead of spaces.
'{:_>5} {:0>5}'.format('a', 12)
____a 00012
'{:,}'.format(12345678)12,345,678
'{:.3}'.format(12345678.0)1.23e+07
You can use it to change the numeric base of a whole number.
'dec-{:d} hex-{:X} oct-{:o} bin-{:b}'.format(16, 16, 16, 16)dec-16 hex-10 oct-20 bin-10000
You can use it to convert unicode values to characters.
'{:c}'.format(97)a
You can use it to convert numbers into scientific notations,
'{:e}'.format(12345678)1.234568e+07
or fixed point notations,
'{:f}'.format(12345678)12345678.000000
or percentages.
'{:%}'.format(0.1234)12.340000%
'{:0>2X}'.format(10)0A
'{:,d}'.format(1234567)1,234,567
'{:,.2f}'.format(12345.6)12,345.60
'{:.2%}'.format(0.1234)12.34%
Saturday, September 29, 2018
Comparing numpy scalars directly is time consuming, use .tolist() before a comparison
for x in nparr:
if x == 0:
something something
as that uses a lot more time than doing this
for x in nparr.tolist():
if x == 0:
something something
This is because a for loop iterating over a numpy array does not result in a sequence of Python constants but in a sequence of numpy scalars which would result in comparing a numpy array to a constant. Converting the array into a list first before the for loop will then result in a sequence of constants.
Here is some profiling I've done using cProfile to check different ways to do an 'if' on a numpy array element:
import cProfile
import numpy as np
runs = 1000000
print('Comparing numpy to numpy')
x = np.array(1.0, np.float32)
y = np.array(1.0, np.float32)
cProfile.run('''
for _ in range(runs):
if x == y:
pass
''')
print()
print('Comparing numpy to constant')
x = np.array(1.0, np.float32)
cProfile.run('''
for _ in range(runs):
if x == 1.0:
pass
''')
print()
print('Comparing constant to constant')
x = 1.0
cProfile.run('''
for _ in range(runs):
if x == 1.0:
pass
''')
print()
print('Comparing numpy.tolist() to constant')
x = np.array(1.0, np.float32)
cProfile.run('''
for _ in range(runs):
if x.tolist() == 1.0:
pass
''')
print()
print('Comparing numpy to numpy.array(constant)')
x = np.array(1.0, np.float32)
cProfile.run('''
for _ in range(runs):
if x == np.array(1.0, np.float32):
pass
''')
print()
print('Comparing numpy.tolist() to numpy.tolist()')
x = np.array(1.0, np.float32)
y = np.array(1.0, np.float32)
cProfile.run('''
for _ in range(runs):
if x.tolist() == y.tolist():
pass
''')
print()
Here are the results in order of speed:
| Comparing constant to constant: | 0.088 seconds |
|---|---|
| Comparing numpy.tolist() to constant: | 0.288 seconds |
| Comparing numpy.tolist() to numpy.tolist(): | 0.508 seconds |
| Comparing numpy to numpy: | 0.684 seconds |
| Comparing numpy to constant: | 1.192 seconds |
| Comparing numpy to numpy.array(constant): | 1.203 seconds |
It turns out that it is always faster to first convert your numpy scalars into constants via .tolist() than to do anything with them as numpy scalars.
Monday, February 26, 2018
Finding the Greatest Common Divisor (GCD) / Highest Common Factor (HCF) of two numbers
The greatest common divisor or highest common factor of two numbers can be found using a very simple algorithm described since the time of Euclid, known as the Euclidean algorithm. The nice thing about it is that it can be computed visually by drawing squares in a rectangle as follows.
Let's say that you have a line segment of length 6 units and a smaller line segment of length 2 units.
We can check if the smaller line's length is a divisor of the larger line's length if we can place several of the smaller lines next to each other and reach the exact length of the larger line.
Likewise we can check if a number is a common divisor of two other numbers by checking if a square can fill exactly a rectangle.
In the above diagram, the big blue rectangle has sides of 6 units by 4 units and the small red squares have sides 2 units by 2 units. Since the red square can exactly fill the blue rectangle by placing copies of it next to each other, 2 is a common divisor of 4 and 6.
The greatest common divisor is the largest square that can exactly fill a rectangle. The largest square that can do this has its side equal to the smallest side of the rectangle (otherwise not even one square will fit in the rectangle) whilst the smallest square has its side equal to 1 unit (assuming that sides are always whole number lengths). Let's take the above rectangle as an example and use the Euclidean algorithm to find the largest square that fits a 6 by 4 rectangle.
The Euclidean algorithm takes advantage of the fact that the greatest common divisor of two numbers is also a divisor of the difference of the two numbers. If two numbers "a" and "b" have "c" as a common divisor, then they can be rewritten as "c×p" and "c×q". The difference "a-b" is then equal to "c×p - c×q" which is equal to "c×(p - q)" which is therefore a divisor of "c" since one of its factors is "c". Notice that "c" can be any divisor of "a" and "b", which means that "a-b" should also have the greatest common divisor of "a" and "b" as a divisor.
This means that the largest square to exactly fill the original 4 by 6 square will also be the largest square to exactly fill a 2 by 4 rectangle, since 2 is the difference between 4 and 6.
This piece of information allows us to substitute the difficult problem of finding the largest square that exactly fills a rectangle with an easier problem of finding the same square but on a smaller rectangle. The smaller the rectangle, the easier it is to find the largest square to exactly fill it and the Euclidean algorithm allows us to progressively chop off parts of the rectangle until all that remains is a small square which would be the square that exactly fills the original rectangle.
So the algorithm goes as follows:
- If the rectangle has both sides equal then it itself is the square that is the largest square that exactly fills the rectangle and we are done.
- Otherwise put a square inside the rectangle of length equal to the shortest side of the rectangle.
- Chop off the part of the rectangle that is covered by the square and repeat.
The 6 by 4 rectangle is trivial so let's try this algorithm on a slightly harder problem of 217 by 124. This time we'll be using exact pixels to represent the rectangle lengths.
This is a 217 by 124 rectangle.
This is the overlap it has with a square of length equal to its shortest side.
We now chop off that part of the rectangle and find the largest square which fits the remaining 93 by 124 rectangle.
This is the new square's overlap.
This is the remaining 93 by 31 rectangle.
This is the new square's overlap.
This is the remaining 62 by 31 rectangle.
This is the new square's overlap.
This is the remaining 31 by 31 rectangle.
We have finally reached a square of side 31, which means that 31 is the greatest common divisor of 217 and 124.
Of course we don't really need to visualise anything. We can do this all numerically by just repeatedly subtracting the smallest number from the biggest and replacing the biggest with the difference. Here's a Python function that does that:
def gcd(a, b):
while a != b:
if a < b:
b = b - a
else:
a = a - b
return a
We can make this shorter by using pattern matching:
def gcd(a, b):
while a != b:
(a, b) = (min(a,b), max(a,b)-min(a,b))
return a
If one of the numbers is much bigger than the other, this program will waste a lot of time repeatedly subtracting the same number until the bigger number becomes smaller than the other. Fortunately this process can be done in a single operation by dividing and getting the remainder. The remainder is what's left after you can't subtract the second number from the first any more. This speeds the algorithm up considerably. The operation is called the modulo operation. With modulo the subtractions will not stop when both numbers are equal as modulo assumes that you can do one more subtraction and make the first number zero. Instead we will now keep on looping until the remainder is zero, in which case the last number you used to get the remainder was the greatest common divisor:
def gcd(a, b):
while b > 0:
(a, b) = (min(a,b), max(a,b)%min(a,b))
return a
Saturday, October 21, 2017
Hyperparameter tuning using Hyperopt
The problem with doing this is that unless you have several supercomputers at your disposal, testing the learning algorithm is a slow process and there are so many different ways to configure your hyperparameters. In the above example you'd have to try six different configurations: (1.0,(-10.0,10.0)), (1.0,(-1.0,1.0)), (0.1,(-10.0,10.0)), etc.. An exhaustive search (grid search) would take too long if each test takes very long and in practice you'd have a search space which is much bigger than six. We can test a random sample of the configurations but ideally the sample would be chosen intelligently rather than randomly. We could start out by trying some randomly chosen configurations and then start homing in on some of the more promising hyperparameter choices.
This is where the Python library Hyperopt comes in. It's a simple library that searches a space of hyperparameters in a rational way so that you'll end up with a better result after trying 100 configurations than if you just randomly sampled 100 different configurations and picked the best performing one. It does this by using a tree of Parzen Estimators.
Let's start with a simple gradient descent example that finds the minimum of "y = x^2". We'll write a function that takes in a learning rate, and a range within which to initialize "x" (we'll only ask for the maximum of this range and assume that the negative of this number is the minimum). We'll then apply gradient descent on the initial "x" value for 10 epochs. The function will finally return the value of "x" that was found near the minimum.
import random
def loss(x):
return x**2
def loss_grad(x):
return 2*x
def graddesc(learning_rate, max_init_x):
x = random.uniform(-max_init_x, max_init_x)
for _ in range(10):
x = x - learning_rate*loss_grad(x)
return x
Great, now we want to find the best hyperparameters to use. In practice we'd have a validation set for our machine learning model to learn to perform well on. Once the hyperparameters that result in the best performance on the validation set are found, we'd apply them to learn on a separate test set and it is this performance that is used to judge the quality of the learning algorithm. However, for this blog post we'll instead focus on the simpler mathematical optimization problem of finding the minimum of "y = x^2".
This is how you use Hyperopt to find the best hyperparameter combination. First you create a function called an objective function that takes in a tuple with the chosen hyperparameters and returns a dictionary that is read by hyperopt to assess the fitness of the chosen hyperparameters. Then you take this function and the possible hyperparameter choices you want to allow and pass them to the hyperopt function called "fmin" which finds the hyperparameters that give the smallest loss.
import hyperopt
def hyperopt_objective(hyperparams):
(learning_rate, max_init_x) = hyperparams
l = loss(graddesc(learning_rate, max_init_x))
return {
'loss': l,
'status': hyperopt.STATUS_OK,
}
best = hyperopt.fmin(
hyperopt_objective,
space = [
hyperopt.hp.choice('learning_rate', [ 1.0, 0.1, 0.01 ]),
hyperopt.hp.choice('max_init_x', [ 10.0, 1.0 ]),
],
algo = hyperopt.tpe.suggest,
max_evals = 10
)
print(best)
The output of this program is this:
{'learning_rate': 1, 'max_init_x': 1}
This is saying that the best loss is obtained when the learning rate is the item at index 1 in the given list (0.1) and the maximum initial value is the item at index 1 in the given list (1.0). The "space" parameter in fmin is there to say how to construct a combination of hyperparameters. We specified that we want a list of two things: a learning rate that can be either 1.0, or 0.1, or 0.01, and a maximum initial value that can be either 10.0 or 1.0. We use "hp.choice" to let fmin choose among a list. We could instead use "hp.uniform" in order to allow any number within a range. I prefer to use a list of human friendly numbers instead of allowing any number so I use the choice function instead. We have also said that we want to allow exactly 10 evaluations of the objective function in order to find the best hyperparameter combination.Although this is how we are expected to use this library, it is not very user friendly in general. For example there is no feedback given throughout the search process so if each evaluation of a hyperparameter combination takes hours to complete then we could end up waiting for several days without seeing anything, just waiting for the function to return a value. The return value also requires further processing as it's just indexes. We can make this better by adding some extra stuff to the objective function:
eval_num = 0
best_loss = None
best_hyperparams = None
def hyperopt_objective(hyperparams):
global eval_num
global best_loss
global best_hyperparams
eval_num += 1
(learning_rate, max_init_x) = hyperparams
l = loss(graddesc(learning_rate, max_init_x))
print(eval_num, l, hyperparams)
if best_loss is None or l < best_loss:
best_loss = l
best_hyperparams = hyperparams
return {
'loss': l,
'status': hyperopt.STATUS_OK,
}
We can now see each hyperparameter combination being evaluated. This is the output we'll see:
1 1.6868761146697238 (1.0, 10.0) 2 0.34976768426779775 (0.01, 1.0) 3 0.006508209785146999 (0.1, 1.0) 4 1.5999357079405185 (0.01, 10.0) 5 0.2646974732349057 (0.01, 1.0) 6 0.5182259594937579 (1.0, 10.0) 7 53.61565213613977 (1.0, 10.0) 8 1.8239879002601682 (1.0, 10.0) 9 0.15820396975495435 (0.01, 1.0) 10 0.784445725853568 (1.0, 1.0)
Also, the variable best_hyperparams will contain the tuple with the best hyperparameters found. Printing best_hyperparams will show "(0.1, 1.0)". We can even save the best hyperparameters found till now in a file so that we can stop the search early if we run out of patience.
Here is the full code in one place:
import random
import hyperopt
def loss(x):
return x**2
def loss_grad(x):
return 2*x
def graddesc(learning_rate, max_init_x):
x = random.uniform(-max_init_x, max_init_x)
for _ in range(10):
x = x - learning_rate*loss_grad(x)
return x
eval_num = 0
best_loss = None
best_hyperparams = None
def hyperopt_objective(hyperparams):
global eval_num
global best_loss
global best_hyperparams
eval_num += 1
(learning_rate, max_init_x) = hyperparams
l = loss(graddesc(learning_rate, max_init_x))
print(eval_num, l, hyperparams)
if best_loss is None or l < best_loss:
best_loss = l
best_hyperparams = hyperparams
return {
'loss': l,
'status': hyperopt.STATUS_OK,
}
hyperopt.fmin(
hyperopt_objective,
space = [
hyperopt.hp.choice('learning_rate', [ 1.0, 0.1, 0.01 ]),
hyperopt.hp.choice('max_init_x', [ 10.0, 1.0 ]),
],
algo = hyperopt.tpe.suggest,
max_evals = 10
)
print(best_hyperparams)
To find out more about Hyperopt see this documentation page and the Github repository.
Sunday, June 25, 2017
Display all outputs in Python immediately when in cmd/terminal
If it's just one print statement you're concerned with then you only need to import the "sys" module and call "sys.stdout.flush()" in order to force the program to show anything that has been printed but is still hidden in the buffer.
import sys
print('bla bla bla')
sys.stdout.flush()
But most of the time you want this to always happen and not after some particular point in the program. To force Python to always show whatever is printed just add the command line option "-u" when running python.
> python -u myscript.py
You can read more about it here:
https://docs.python.org/3.6/using/cmdline.html#cmdoption-u
Tuesday, May 30, 2017
Neural language models and how to make them in Tensorflow 1.0
A neural language model is a neural network that, given a prefix of a sentence, will output the probability that a word will be the next word in the sentence. For example, given the prefix "the dog", the neural network will tell you that "barked" has a high probability of being the next word. This is done by using a recurrent neural network (RNN), such as a long short term memory (LSTM), to turn the prefix into a single vector (that represents the prefix) which is then passed to a softmax layer that gives a probability distribution over every word in the known vocabulary.
In order to be able to still have prefix before the first word (to be able to predict a first word in the sentence) we will make use of an artificial "<BEG>" word that represents the beginning of a sentence and is placed before every sentence. Likewise we will have an artificial "<END>" word that represents the end of sentence in order to be able to predict when the sentence ends.

It is important to note that the words are presented to the neural network as vectors and not as actual strings. The vectors are called word "embeddings" and are derived from a matrix where each row stands for a different word in the vocabulary. This matrix is trained just like the neural network's weights in order to provide the best vector representation for each word.
The probabilities are not given explicitly during training. Instead we just expose the neural network to complete sentences taken from a corpus (we will use the Brown corpus from NLTK) and let it learn the probabilities indirectly. Before training, each word will have an approximately equal probability given any prefix. The training procedure works by inputting a prefix of a sentence at one end of the neural network and then forcing it to increase the probability of the given next word in the sentence by a little bit. Since the probabilities of all the words in the output must add up to 1, increasing the probability of the given next word will also decrease the probability of all the other words by a little bit. By repeatedly increasing the probability of the correct word, the distribution will accumulate at the correct word.

Of course given a prefix there will be several words that can follow and not just one. If each one is increased just a bit in turn repeatedly, all the known correct next words will increase together and all the other words will decrease, forcing the probability distribution to share the peak among all the correct words. If the correct words occur at different frequencies given the prefix, the most frequent word will get the most probability (due to increasing its probability more often) whilst the rest of the correct words will get a fraction of that probability in proportion to their relative frequencies.
The most common way to train neural language models is not actually to use a training set of prefixes and next words, but to use full sentences. Upon being inputted with a sequence of vectors, an RNN will output another sequence of vectors. The nth output vector represents the prefix of the input sequence up to the nth input.

This means that we can present the neural network with a sentence, which gets turned into a sequence of vectors by the RNN, each of which gets turned into a probability distribution by the softmax. The training objective is to make the neural network predict which is the next word in the sentence after every prefix (including the end of sentence token at the end). This is done with all the sentences in the corpus so that the neural network is forced to extract some kind of pattern in the language of the sentences and be able to predict the next word in any sentence prefix.
If we always provide a correct prefix during training, then we are using a training method called "teacher forcing", where the neural network is only trained to deal with correct prefixes. This is the simplest method (and the method we will be using in this blog post) but it also introduces a bias to the neural network as it might not always be exposed to correct prefixes. Let's say that the neural network is going to be used to generate sentences by, for example, picking the most probable word given the prefix and then adding the chosen word to the end of the prefix. We can repeatedly do this until we choose the end of sentence token, at which point we should have a complete sentence. The problem with teacher forcing is that if the neural network makes one mistake during the generation process and picks a non-sense word as a most probable next word, then the rest of the sentence will probably also be garbage as it was never trained on sentences with mistakes.
One way to deal with this is to include not only prefixes in the training sentences by also prefixes with some of the words replaced by words chosen by the still-in-training neural network and force it to still give a higher probability to the correct next word. This is called scheduled sampling. Another way to deal with this is to take the training prefixes and some generated prefixes (from the still-in-training neural net) and take their vector representation from the RNN. Generative adversarial training will then be used to make the RNN represent both groups of vectors similarly. This forces the RNN to be fault tolerant to prefixes with errors and to be able to represent them in a way that can lead to correct next words. This is called professor forcing.
This is the full code that you can execute to get a Tensorflow neural language model:
from __future__ import absolute_import, division, print_function, unicode_literals
from builtins import ascii, bytes, chr, dict, filter, hex, input, int, map, next, oct, open, pow, range, round, str, super, zip
import tensorflow as tf
import numpy as np
import random
import timeit
import collections
import nltk
TRAINING_SESSION = True
rand = random.Random(0)
embed_size = 256
state_size = 256
max_epochs = 100
minibatch_size = 20
min_token_freq = 3
run_start = timeit.default_timer()
print('Loading raw data...')
all_seqs = [ [ token.lower() for token in seq ] for seq in nltk.corpus.brown.sents() if 5 <= len(seq) <= 20 ]
rand.shuffle(all_seqs)
all_seqs = all_seqs[:20000]
trainingset_size = round(0.9*len(all_seqs))
validationset_size = len(all_seqs) - trainingset_size
train_seqs = all_seqs[:trainingset_size]
val_seqs = all_seqs[-validationset_size:]
print('Training set size:', trainingset_size)
print('Validation set size:', validationset_size)
all_tokens = (token for seq in train_seqs for token in seq)
token_freqs = collections.Counter(all_tokens)
vocab = sorted(token_freqs.keys(), key=lambda token:(-token_freqs[token], token))
while token_freqs[vocab[-1]] < min_token_freq:
vocab.pop()
vocab_size = len(vocab) + 2 # + edge and unknown tokens
token_to_index = { token: i+2 for (i, token) in enumerate(vocab) }
index_to_token = { i+2: token for (i, token) in enumerate(vocab) }
edge_index = 0
unknown_index = 1
print('Vocabulary size:', vocab_size)
def parse(seqs):
indexes = list()
lens = list()
for seq in seqs:
indexes_ = [ token_to_index.get(token, unknown_index) for token in seq ]
indexes.append(indexes_)
lens.append(len(indexes_)+1) #add 1 due to edge token
maxlen = max(lens)
in_mat = np.zeros((len(indexes), maxlen))
out_mat = np.zeros((len(indexes), maxlen))
for (row, indexes_) in enumerate(indexes):
in_mat [row,:len(indexes_)+1] = [edge_index]+indexes_
out_mat[row,:len(indexes_)+1] = indexes_+[edge_index]
return (in_mat, out_mat, np.array(lens))
(train_seqs_in, train_seqs_out, train_seqs_len) = parse(train_seqs)
(val_seqs_in, val_seqs_out, val_seqs_len) = parse(val_seqs)
print('Training set max length:', train_seqs_in.shape[1]-1)
print('Validation set max length:', val_seqs_in.shape[1]-1)
################################################################
print()
print('Training...')
#Full correct sequence of token indexes with start token but without end token.
seq_in = tf.placeholder(tf.int32, shape=[None, None], name='seq_in') #[seq, token]
#Length of sequences in seq_in.
seq_len = tf.placeholder(tf.int32, shape=[None], name='seq_len') #[seq]
tf.assert_equal(tf.shape(seq_in)[0], tf.shape(seq_len)[0])
#Full correct sequence of token indexes without start token but with end token.
seq_target = tf.placeholder(tf.int32, shape=[None, None], name='seq_target') #[seq, token]
tf.assert_equal(tf.shape(seq_in), tf.shape(seq_target))
batch_size = tf.shape(seq_in)[0] #Number of sequences to process at once.
num_steps = tf.shape(seq_in)[1] #Number of tokens in generated sequence.
#Mask of which positions in the matrix of sequences are actual labels as opposed to padding.
token_mask = tf.cast(tf.sequence_mask(seq_len, num_steps), tf.float32) #[seq, token]
with tf.variable_scope('prefix_encoder'):
#Encode each sequence prefix into a vector.
#Embedding matrix for token vocabulary.
embeddings = tf.get_variable('embeddings', [ vocab_size, embed_size ], tf.float32, tf.contrib.layers.xavier_initializer()) #[vocabulary token, token feature]
#3D tensor of tokens in sequences replaced with their corresponding embedding vector.
embedded = tf.nn.embedding_lookup(embeddings, seq_in) #[seq, token, token feature]
#Use an LSTM to encode the generated prefix.
init_state = tf.contrib.rnn.LSTMStateTuple(c=tf.zeros([ batch_size, state_size ]), h=tf.zeros([ batch_size, state_size ]))
cell = tf.contrib.rnn.BasicLSTMCell(state_size)
prefix_vectors = tf.nn.dynamic_rnn(cell, embedded, sequence_length=seq_len, initial_state=init_state, scope='rnn')[0] #[seq, prefix, prefix feature]
with tf.variable_scope('softmax'):
#Output a probability distribution over the token vocabulary (including the end token).
W = tf.get_variable('W', [ state_size, vocab_size ], tf.float32, tf.contrib.layers.xavier_initializer())
b = tf.get_variable('b', [ vocab_size ], tf.float32, tf.zeros_initializer())
logits = tf.reshape(tf.matmul(tf.reshape(prefix_vectors, [ -1, state_size ]), W) + b, [ batch_size, num_steps, vocab_size ])
predictions = tf.nn.softmax(logits) #[seq, prefix, token]
losses = tf.nn.sparse_softmax_cross_entropy_with_logits(labels=seq_target, logits=logits) * token_mask
total_loss = tf.reduce_sum(losses)
train_step = tf.train.AdamOptimizer().minimize(total_loss)
saver = tf.train.Saver()
sess = tf.Session()
if TRAINING_SESSION:
sess.run(tf.global_variables_initializer())
print('epoch', 'val loss', 'duration', sep='\t')
epoch_start = timeit.default_timer()
validation_loss = 0
for i in range(len(val_seqs)//minibatch_size):
minibatch_validation_loss = sess.run(total_loss, feed_dict={
seq_in: val_seqs_in [i*minibatch_size:(i+1)*minibatch_size],
seq_len: val_seqs_len[i*minibatch_size:(i+1)*minibatch_size],
seq_target: val_seqs_out[i*minibatch_size:(i+1)*minibatch_size],
})
validation_loss += minibatch_validation_loss
print(0, round(validation_loss, 3), round(timeit.default_timer() - epoch_start), sep='\t')
last_validation_loss = validation_loss
saver.save(sess, './model')
trainingset_indexes = list(range(len(train_seqs)))
for epoch in range(1, max_epochs+1):
epoch_start = timeit.default_timer()
rand.shuffle(trainingset_indexes)
for i in range(len(trainingset_indexes)//minibatch_size):
minibatch_indexes = trainingset_indexes[i*minibatch_size:(i+1)*minibatch_size]
sess.run(train_step, feed_dict={
seq_in: train_seqs_in [minibatch_indexes],
seq_len: train_seqs_len[minibatch_indexes],
seq_target: train_seqs_out[minibatch_indexes],
})
validation_loss = 0
for i in range(len(val_seqs)//minibatch_size):
minibatch_validation_loss = sess.run(total_loss, feed_dict={
seq_in: val_seqs_in [i*minibatch_size:(i+1)*minibatch_size],
seq_len: val_seqs_len[i*minibatch_size:(i+1)*minibatch_size],
seq_target: val_seqs_out[i*minibatch_size:(i+1)*minibatch_size],
})
validation_loss += minibatch_validation_loss
if validation_loss > last_validation_loss:
break
last_validation_loss = validation_loss
saver.save(sess, './model')
print(epoch, round(validation_loss, 3), round(timeit.default_timer() - epoch_start), sep='\t')
print(epoch, round(validation_loss, 3), round(timeit.default_timer() - epoch_start), sep='\t')
################################################################
print()
print('Evaluating...')
saver.restore(sess, tf.train.latest_checkpoint('.'))
def seq_prob(seq):
seq_indexes = [ token_to_index.get(token, unknown_index) for token in seq ]
outputs = sess.run(predictions, feed_dict={
seq_in: [ [ edge_index ] + seq_indexes ],
seq_len: [ 1+len(seq) ],
})[0]
probs = outputs[np.arange(len(outputs)), seq_indexes+[ edge_index ]]
return np.prod(probs)
print('P(the dog barked.) =', seq_prob(['the', 'dog', 'barked', '.']))
print('P(the cat barked.) =', seq_prob(['the', 'cat', 'barked', '.']))
print()
def next_tokens(prefix):
prefix_indexes = [ token_to_index.get(token, unknown_index) for token in prefix ]
probs = sess.run(predictions, feed_dict={
seq_in: [ [ edge_index ] + prefix_indexes ],
seq_len: [ 1+len(prefix) ],
})[0][-1]
token_probs = list(zip(probs, ['<end>', '<unk>']+vocab))
return token_probs
print('the dog ...', sorted(next_tokens(['the', 'dog']), reverse=True)[:5])
print()
def greedy_gen():
prefix = []
for _ in range(100):
probs = sorted(next_tokens(prefix), reverse=True)
(_, next_token) = probs[0]
if next_token == '<unk>':
(_, next_token) = probs[1]
elif next_token == '<end>':
break
else:
prefix.append(next_token)
return prefix
print('Greedy generation:', ' '.join(greedy_gen()))
print()
def random_gen():
prefix = []
for _ in range(100):
probs = next_tokens(prefix)
(unk_prob, _) = probs[unknown_index]
r = rand.random() * (1.0 - unk_prob)
total = 0.0
for (prob, token) in probs:
if token != '<unk>':
total += prob
if total >= r:
break
if token == '<end>':
break
else:
prefix.append(token)
return prefix
print('Random generation:', ' '.join(random_gen()))
print()
This section explains snippets of the code.
The first thing we need to do is create the training and validation sets. We will use the Brown corpus from NLTK as data. Since the purpose of this tutorial is to quickly train a neural language model on a normal computer, we will work with a subset of the corpus so that training will be manageable. We will only take sentences that are between 5 and 20 tokens long and only use a random sample of 20000 sentences from this pool. From this we will take a random 10% to be used for the validation set and the rest will be used for the training set.
all_seqs = [ [ token.lower() for token in seq ] for seq in nltk.corpus.brown.sents() if 5 <= len(seq) <= 20 ] rand.shuffle(all_seqs) all_seqs = all_seqs[:20000] trainingset_size = round(0.9*len(all_seqs)) validationset_size = len(all_seqs) - trainingset_size train_seqs = all_seqs[:trainingset_size] val_seqs = all_seqs[-validationset_size:]
Next we need to get the vocabulary from the training set. This will consist of all the words that occur frequently in the training set sentences, with the rare words being replaced by an "unknown" token. This will allow the neural network to be able to work with out-of-vocabulary words as they will be represented as "<unk>" and the network will ave seen this token in the training sentences. Each vocabulary word will be represented by an index, with "0" representing the beginning and end token, "1" representing the unknown token, "2" representing the most frequent vocabulary word, "3" the second most frequent word, and so on.
all_tokens = (token for seq in train_seqs for token in seq)
token_freqs = collections.Counter(all_tokens)
vocab = sorted(token_freqs.keys(), key=lambda token:(-token_freqs[token], token))
while token_freqs[vocab[-1]] < min_token_freq:
vocab.pop()
vocab_size = len(vocab) + 2 # + edge and unknown tokens
token_to_index = { token: i+2 for (i, token) in enumerate(vocab) }
index_to_token = { i+2: token for (i, token) in enumerate(vocab) }
edge_index = 0
unknown_index = 1
Finally we need to turn all the sentences in the training and validation set into a matrix of indexes where each row represents a sentence. Since different sentences have different lengths, we will make the matrix as wide as the longest sentence and use 0 to pad each sentence into uniform length (pads are added to the end of the sentences). To know where a sentence ends we will also keep track of the sentence lengths. For training we will need to use an input matrix and a target matrix. The input matrix contains sentences that start with the start-of-sentence token in every row whilst the target matrix contains sentences that end with the end-of-sentence token in every row. The former is used to pass as input to the neural net during training whilst the latter is used to tell which is the correct output of each sentence prefix.
def parse(seqs):
indexes = list()
lens = list()
for seq in seqs:
indexes_ = [ token_to_index.get(token, unknown_index) for token in seq ]
indexes.append(indexes_)
lens.append(len(indexes_)+1) #add 1 due to edge token
maxlen = max(lens)
in_mat = np.zeros((len(indexes), maxlen))
out_mat = np.zeros((len(indexes), maxlen))
for (row, indexes_) in enumerate(indexes):
in_mat [row,:len(indexes_)+1] = [edge_index]+indexes_
out_mat[row,:len(indexes_)+1] = indexes_+[edge_index]
return (in_mat, out_mat, np.array(lens))
(train_seqs_in, train_seqs_out, train_seqs_len) = parse(train_seqs)
(val_seqs_in, val_seqs_out, val_seqs_len) = parse(val_seqs)
The Tensorflow neural network model we shall implement will accept a batch of sentences at once. This means that the input will be a matrix of integers where each row is a sentence and each integer is a word index. Since both the number of sentences and the sentence length are variable we will use "None" as a dimension size. We will then get the size using the "tf.shape" function. The sentences on their own are not enough as an input as we also need to provide the sentence lengths as a vector. The length of this vector needs to be as long as the number of rows in the sequences (one for each sentence). These lengths will be used to generate a mask matrix of 0s and 1s where a "1" indicates the presence of a token in the sequence matrix whilst a "0" indicates the presence of a pad token. This is generated by the "tf.sequence_mask" function.
#Full correct sequence of token indexes with start token but without end token. seq_in = tf.placeholder(tf.int32, shape=[None, None], name='seq_in') #[seq, token] #Length of sequences in seq_in. seq_len = tf.placeholder(tf.int32, shape=[None], name='seq_len') #[seq] tf.assert_equal(tf.shape(seq_in)[0], tf.shape(seq_len)[0]) #Full correct sequence of token indexes without start token but with end token. seq_target = tf.placeholder(tf.int32, shape=[None, None], name='seq_target') #[seq, token] tf.assert_equal(tf.shape(seq_in), tf.shape(seq_target)) batch_size = tf.shape(seq_in)[0] #Number of sequences to process at once. num_steps = tf.shape(seq_in)[1] #Number of tokens in generated sequence. #Mask of which positions in the matrix of sequences are actual labels as opposed to padding. token_mask = tf.cast(tf.sequence_mask(seq_len, num_steps), tf.float32) #[seq, token]
Now that the inputs are handled we need to process the prefixes of the sequences into prefix vectors using an LSTM. We first convert the token indexes into token vectors. This is done using an embedding matrix where the first row of the matrix is the word vector of the first word in the vocabulary, the second for the second word, and so on. This is then passed to an LSTM that is initialized with zero-vectors for both the cell state and the hidden state. We pass in the sequence lengths to keep the RNN from interpreting pad values.
with tf.variable_scope('prefix_encoder'):
#Encode each sequence prefix into a vector.
#Embedding matrix for token vocabulary.
embeddings = tf.get_variable('embeddings', [ vocab_size, embed_size ], tf.float32, tf.contrib.layers.xavier_initializer()) #[vocabulary token, token feature]
#3D tensor of tokens in sequences replaced with their corresponding embedding vector.
embedded = tf.nn.embedding_lookup(embeddings, seq_in) #[seq, token, token feature]
#Use an LSTM to encode the generated prefix.
init_state = tf.contrib.rnn.LSTMStateTuple(c=tf.zeros([ batch_size, state_size ]), h=tf.zeros([ batch_size, state_size ]))
cell = tf.contrib.rnn.BasicLSTMCell(state_size)
prefix_vectors = tf.nn.dynamic_rnn(cell, embedded, sequence_length=seq_len, initial_state=init_state, scope='rnn')[0] #[seq, prefix, prefix feature]
Next we need to take the prefix vectors and derive probabilities for the next word in every prefix. Since the prefix vectors are a 3D tensor and the weights matrix is a 2D tensor we have to first reshape the prefix vectors into a 2D tensor before multiplying them together. Following this we will then reshape the resulting 2D tensor back into a 3D tensor and apply softmax to it.
with tf.variable_scope('softmax'):
#Output a probability distribution over the token vocabulary (including the end token).
W = tf.get_variable('W', [ state_size, vocab_size ], tf.float32, tf.contrib.layers.xavier_initializer())
b = tf.get_variable('b', [ vocab_size ], tf.float32, tf.zeros_initializer())
logits = tf.reshape(tf.matmul(tf.reshape(prefix_vectors, [ -1, state_size ]), W) + b, [ batch_size, num_steps, vocab_size ])
predictions = tf.nn.softmax(logits) #[seq, prefix, token]
Now comes the part that has to do with training the model. We need to add the loss function which is masked to ignore padding tokens. We will take the sum crossentropy and apply the Adam optimizer to it. Crossentropy measures how close to 1.0 the probability of the correct next word in the softmax is. This allows us to maximize the correct probability which is done by using Adam, an optimization technique that works using gradient descent but with the learning rate being adapted for every weight. We would also like to be able to save our model weights during training in order to reuse them later. Finally we create a Tensorflow session variable and use it to initialize all the model weights.
losses = tf.nn.sparse_softmax_cross_entropy_with_logits(labels=seq_target, logits=logits) * token_mask
total_loss = tf.reduce_sum(losses)
train_step = tf.train.AdamOptimizer().minimize(total_loss)
saver = tf.train.Saver()
sess = tf.Session()
sess.run(tf.global_variables_initializer())
The second thing we should do is measure how well the randomly initialised neural net performs in terms of the sum crossentropy. In order to avoid running out of memory, instead of putting all the validation set data as input to the neural net we will break it up into minibatches of a fixed size and find the total loss of all the minibatches together. This final loss value will be placed in a variable called "last_validation_loss" in order to be able to track how the loss is progressing as training goes on. The last line is to save the weights of the neural network in the same folder as the code and give the files (there will be several files with different extensions) the file name "model".
validation_loss = 0
for i in range(len(val_seqs)//minibatch_size):
minibatch_validation_loss = sess.run(total_loss, feed_dict={
seq_in: val_seqs_in [i*minibatch_size:(i+1)*minibatch_size],
seq_len: val_seqs_len[i*minibatch_size:(i+1)*minibatch_size],
seq_target: val_seqs_out[i*minibatch_size:(i+1)*minibatch_size],
})
validation_loss += minibatch_validation_loss
last_validation_loss = validation_loss
saver.save(sess, './model')
Next we'll do the same thing but on the training set and we'll run the "train_step" operation instead of the "total_loss" operation in order to actually optimize the weights into a smaller "total_loss" value. It is more beneficial to take randomly sampled minibatches instead of just breaking the training set into deterministically chosen groups, so we use an array of indexes called "trainingset_indexes" to determine which training pairs will make it to the next minibatch. We randomly shuffle these indexes and then break it into fixed size groups. The indexes in the next group are used to choose the training pairs are used for the next minibatch. Following this we will again calculate the new loss value on the validation set to see how we're progressing. If the new loss is worse than the previous loss then we stop the training. Otherwise we save the model weights and continue training. This is called early stopping. There is a hard limit set to the number of epochs to run in order to avoid training for too long.
trainingset_indexes = list(range(len(train_seqs)))
for epoch in range(1, max_epochs+1):
rand.shuffle(trainingset_indexes)
for i in range(len(trainingset_indexes)//minibatch_size):
minibatch_indexes = trainingset_indexes[i*minibatch_size:(i+1)*minibatch_size]
sess.run(train_step, feed_dict={
seq_in: train_seqs_in [minibatch_indexes],
seq_len: train_seqs_len[minibatch_indexes],
seq_target: train_seqs_out[minibatch_indexes],
})
validation_loss = 0
for i in range(len(val_seqs)//minibatch_size):
minibatch_validation_loss = sess.run(total_loss, feed_dict={
seq_in: val_seqs_in [i*minibatch_size:(i+1)*minibatch_size],
seq_len: val_seqs_len[i*minibatch_size:(i+1)*minibatch_size],
seq_target: val_seqs_out[i*minibatch_size:(i+1)*minibatch_size],
})
validation_loss += minibatch_validation_loss
if validation_loss > last_validation_loss:
break
last_validation_loss = validation_loss
saver.save(sess, './model')
We can now use the trained neural network. We first restore the last saved model weights which are the ones that gave the best validation loss and then we will define two functions: one for getting the probability of a whole sequence and the other for getting the next token after a prefix. "seq_prob" works by getting every prefix's softmax output, getting the probability of each token in the sequence after each prefix and then multiplying them together. "next_tokens" works by passing a prefix to the neural network and only getting the softmax output of the last (longest) prefix. The probabilities and corresponding tokens are returned.
saver.restore(sess, tf.train.latest_checkpoint('.'))
def seq_prob(seq):
seq_indexes = [ token_to_index.get(token, unknown_index) for token in seq ]
outputs = sess.run(predictions, feed_dict={
seq_in: [ [ edge_index ] + seq_indexes ],
seq_len: [ 1+len(seq) ],
})[0]
probs = outputs[np.arange(len(outputs)), seq_indexes+[ edge_index ]]
return np.prod(probs)
print('P(the dog barked.) =', seq_prob(['the', 'dog', 'barked', '.']))
print('P(the cat barked.) =', seq_prob(['the', 'cat', 'barked', '.']))
print()
def next_tokens(prefix):
prefix_indexes = [ token_to_index.get(token, unknown_index) for token in prefix ]
probs = sess.run(predictions, feed_dict={
seq_in: [ [ edge_index ] + prefix_indexes ],
seq_len: [ 1+len(prefix) ],
})[0][-1]
token_probs = list(zip(probs, ['<end>', '<unk>']+vocab))
return token_probs
print('the dog ...', sorted(next_tokens(['the', 'dog']), reverse=True)[:5])
print()
These are the outputs I got:
P(the dog barked.) = 1.71368e-08 P(the cat barked.) = 6.16375e-10 the dog ... [(0.097657956, 'was'), (0.089791521, '<unk>'), (0.058101207, 'is'), (0.055007596, 'had'), (0.02786131, 'could')]
We can extend "next_tokens" to generate whole sentences using one of two ways: generating the most probable sentence or generating a randomly sampled sentence. For the first we are going to use greedy search which chooses the most probable word given a prefix and adds it to the prefix. This will not give the most probable sentence but it should be close (use beam search for a better selection method). For the second function we want to choose words at random but based on their probability such that rare words are rarely chosen. This is called sampling sentences (the probability of sampling a particular sentence is equal to the probability of the sentence). We did this using roulette selection. For both functions we left out the unknown token during generation and gave a hard maximum word limit of 100 words.
def greedy_gen():
prefix = []
for _ in range(100):
probs = sorted(next_tokens(prefix), reverse=True)
(_, next_token) = probs[0]
if next_token == '<unk>':
(_, next_token) = probs[1]
elif next_token == '<end>':
break
else:
prefix.append(next_token)
return prefix
print('Greedy generation:', ' '.join(greedy_gen()))
print()
def random_gen():
prefix = []
for _ in range(100):
probs = next_tokens(prefix)
(unk_prob, _) = probs[unknown_index]
r = rand.random() * (1.0 - unk_prob)
total = 0.0
for (prob, token) in probs:
if token != '<unk>':
total += prob
if total >= r:
break
if token == '<end>':
break
else:
prefix.append(token)
return prefix
print('Random generation:', ' '.join(random_gen()))
print()
These are the outputs I got:
Greedy generation: `` i don't know '' . Random generation: miss the road place , title comes to the seeds of others to many and details under the dominant home



