In [1]:
# Cell A
# load data
import os
github_path = "https://raw.githubusercontent.com/najoungkim/COGS/refs/heads/main/data"
data = {}
for name in ['train', 'dev', 'test', 'gen']:
    url = os.path.join(github_path, '{}.tsv'.format(name))
    filepath = '{}.tsv'.format(name)
    if not os.path.isfile(filepath):
        os.system('wget {}'.format(url))
    assert os.path.isfile(filepath)
    lines = open(filepath, 'r')
    lines = map(lambda x: x.strip(), lines)
    lines = map(lambda x: x.split('\t')[0], lines)
    data[name] = list(lines)

# look at the data
for name in ['train', 'dev', 'test', 'gen']:
    print('=== {} ({} sentences)==='.format(name.upper(), len(data[name])))
    print('\n'.join(data[name][:10]))

--2025-09-16 11:16:54--  https://raw.githubusercontent.com/najoungkim/COGS/refs/heads/main/data/train.tsv
Résolution de raw.githubusercontent.com (raw.githubusercontent.com)… 2606:50c0:8001::154, 2606:50c0:8003::154, 2606:50c0:8000::154, ...
Connexion à raw.githubusercontent.com (raw.githubusercontent.com)|2606:50c0:8001::154|:443… connecté.
requête HTTP transmise, en attente de la réponse… 200 OK
Taille : 4146574 (4,0M) [text/plain]
Enregistre : ‘train.tsv’

     0K .......... .......... .......... .......... ..........  1% 2,59M 2s
    50K .......... .......... .......... .......... ..........  2% 4,19M 1s
   100K .......... .......... .......... .......... ..........  3% 6,73M 1s
   150K .......... .......... .......... .......... ..........  4% 14,7M 1s
   200K .......... .......... .......... .......... ..........  6% 6,73M 1s
   250K .......... .......... .......... .......... ..........  7% 8,86M 1s
   300K .......... .......... .......... .......... ..........  8% 11,1M 1s
   3

=== TRAIN (24155 sentences)===
A rose was helped by a dog .
The sailor dusted a boy .
Emma rolled a teacher .
Evelyn rolled the girl .
A cake was forwarded to Levi by Charlotte .
The captain ate .
The girl needed to cook .
A cake rolled .
The cookie was passed to Emma .
Emma ate the ring beside a bed .
=== DEV (3000 sentences)===
Liam hoped that a box was burned by a girl .
The donkey lended the cookie to a mother .
A melon was given to a girl by the guard .
A donut was given to a butterfly .
A rose was mailed to Isabella .
The girl offered the weapon beside a machine to a chicken .
A donut was touched by Emma .
Liam painted a box on a table beside the chair .
Emma ate a hammer .
A drink was juggled .
=== TEST (3000 sentences)===
Mila liked that the cake was offered to Emma .
A coach supported that the cake was snapped .
The moose wanted to read .
A box was given to the cat by Aiden .
Emma cleaned the boy .
The dog painted the boy .
The sailor cleaned .
The customer held the priest in 

....... .......... .......... 96% 16,0M 0s
  5200K .......... .......... .......... .......... .......... 97% 6,82M 0s
  5250K .......... .......... .......... .......... .......... 98%  118M 0s
  5300K .......... .......... .......... .......... .......... 99% 17,5M 0s
  5350K .......... .......... .......... .......... .......... 99% 30,4M 0s
  5400K ..                                                    100% 20,1M=0,4s

2025-09-16 11:17:01 (12,7 MB/s) - ‘gen.tsv’ enregistré [5532102/5532102]



In [None]:
# Cell B
# Tokenization
# The final period is separated with a white space from the final word. This is a basic form of tokenization.
# The same is done as a pre-processing step with all punctuation (comma, question mark...).
# Turn sentences into lists of tokens: (here, token = word or punctuation mark; more generally, any sequence of ascii considered a unit)
for name in data:
    data[name] = list(map(lambda x: x.split(), data[name]))
# Normalization
# Here, uppercase vs lowercase is not so useful, and it just increases the size of the vocabulary (see below).
# So, let's turn all words into lowercase. Other possible normalization includes stemming.
# After this last stage of pre-processing, we have tokens.
tokens = {}
for name in data:
    tokens[name] = list(map(lambda x: [y.lower() for y in x], data[name]))
# Finally, add beginning-of-sentence and end-of-sentence tokens
for name in tokens:
    tokens[name] = list(map(lambda x: ['<bos>'] + x + ['<eos>'], tokens[name]))

In [None]:
# Cell C
# Vocabulary
# Let's set up the alphabet (noted V during the lecture), usually called vocabulary to avoid confusion
class Vocabulary:
    def __init__(self):
        self.vocab = set()
    def add(self, word):
        self.vocab.add(word)
    def process(self, sentence):
        assert isinstance(sentence, list) or isinstance(sentence, tuple)
        assert all(isinstance(x, str) for x in sentence)
        self.vocab.update(set(sentence))
    def size(self):
        return len(self.vocab)
# Create the vocabulary from the training data
vocabulary = Vocabulary()
for sentence in tokens['train']:
    vocabulary.process(sentence)
# Q1: What is the size of the vocabulary?

In [None]:
# Cell D
# n-gram model
import math

class Probability:
    # special class to store small probabilities
    def __init__(self, p, log_scale = False):
        if log_scale:
            assert p >= 0, p
            self.value = p
        else:
            assert p <= 1, p
            assert p >= 0, p
            if p == 0:
                self.value = float('inf')
            else:
                self.value = -math.log(p)

    def get_p(self):
        p = math.exp(-self.value)
        assert p >= 0, p
        assert p <= 1, p
        return p

    def multiply(self, p):
        assert isinstance(p, Probability)
        self.value += p.value
        
class NgramCounter:
    def __init__(self, n, smoothing=0):
        assert isinstance(n, int)
        assert n > 0
        assert isinstance(smoothing, int)
        assert smoothing >= 0
        self.n = n # the length of ngrams that we store
        self.smoothing = smoothing
        self.ngrams = {}
        self.vocab = Vocabulary()

    def count(self, ngram):
        assert isinstance(ngram, tuple)
        assert len(ngram) <= self.n
        self.ngrams.setdefault(ngram, 0)
        self.ngrams[ngram] += 1
        self.vocab.process(ngram)

    def shorten_history(self, history, max_length=None):
        if max_length is None:
            max_length = self.n-1
        if max_length == 0:
            return []
        if len(history) > max_length:
            history = history[-max_length:]
        return history
        
    def process(self, tokens):
        assert isinstance(tokens, list)
        assert all(isinstance(x, str) for x in tokens)
        for tt in range(len(tokens)):
            for ll in range(1,self.n+1): # we need to count all lengths of ngrams
                if tt+ll > len(tokens):
                    break
                ngram = tokens[tt:tt+ll]
                self.count(tuple(ngram))

    def get_count(self, ngram, history=False):
        if isinstance(ngram, str):
            count = self.ngrams.get((ngram,), 0)
        else:
            count = self.ngrams.get(tuple(ngram), 0)
        if history:
            count += self.smoothing*self.vocab.size()
        else:
            count += self.smoothing
        return count

    def get_unigram_count(self):
        output = 0
        for ngram, count in self.ngrams.items():
            if len(ngram) == 1:
                output += count
        output += self.smoothing*self.vocab.size()
        return output
    
    def conditional_probability(self, token, history):
        # return p(token | history)
        assert isinstance(token, str)
        assert isinstance(history, list)
        assert all(isinstance(x, str) for x in history)
        history = self.shorten_history(history)
        ngram = tuple(history) + (token,)
        count = self.get_count(ngram)
        assert count is not None, ngram
        if len(history) == 0:
            Z = self.get_unigram_count()
            assert Z > 0, Z
            return Probability(count/Z)
        else:
            Z = self.get_count(history, history=True)
            assert Z > 0, '{}: {}'.format(history, Z)
            return Probability(count/Z)
    
    def probability(self, tokens):
        logprob = self.compute_logprob(tokens) 
        return logprob.get_p()

    def compute_logprob(self, tokens):
        probs = [None for _ in tokens]
        for tt in range(len(tokens)):
            history = tokens[:tt]
            cp = self.conditional_probability(tokens[tt], history)
            probs[tt] = cp
        assert all(x is not None for x in probs)
        logprob = sum(x.value for x in probs)
        logprob = Probability(logprob, log_scale = True)
        return logprob

    def perplexity(self, tokens):
        logprob = self.compute_logprob(tokens)
        perplexity = Probability(logprob.value/len(tokens), log_scale = True)
        return perplexity.value

    def get_top_ngrams(self, length=None, n=10):
        ngrams = sorted(self.ngrams.items(), key=lambda x: (-x[1], x[0]))
        if length is not None:
            ngrams = list(filter(lambda x: len(x[0]) == length, ngrams))
        return ngrams[:n] 

In [None]:
# Cell E
# build ngram model
def fit(model, data):
    for sentence in data:
        model.process(sentence)
ngram_model = NgramCounter(25)
fit(ngram_model, tokens['train'])

In [None]:
# Cell F
# Q2: What do you expect the following probabilities to be? Why?
print(ngram_model.conditional_probability('<eos>', ['.']).get_p())
print(ngram_model.conditional_probability('<eos>', ['<bos>']).get_p())

In [None]:
# Cell G
# Look at the top ngrams
for n in range(1,ngram_model.n+1):
    print('=== {} ==='.format(n))
    top_ngrams = ngram_model.get_top_ngrams(length=n)
    for x in top_ngrams:
        print('{}: {}'.format(*x))

In [None]:
# Cell H
# Zipf's law https://en.wikipedia.org/wiki/Zipf's_law
# Q3: Run this code to produce a plot. What does this plot show? What is on the x-axis, what is on the y-axis?
# Q4: According to this plot, is the ngram assumption justified? If you don't remember what the ngram assumption is, ask Nils.
%matplotlib ipympl
import matplotlib.pyplot as plt
import numpy as np

def get_ngram_frequencies(n):
    frequencies = filter(lambda x: len(x[0]) == n, ngram_model.ngrams.items())
    frequencies = map(lambda x: x[1], frequencies)
    y = sorted(frequencies, reverse = True)
    x = np.arange(len(y))
    if len(x) == 0:
        return None, None
    else:
        return x, y

fig, ax = plt.subplots()
ax.set_xscale('log')
ax.set_yscale('log')
for n in range(1,11):
    x, y = get_ngram_frequencies(n)
    if x is None:
        continue
    ax.plot(x, y, label=n)
ax.legend()

In [None]:
# Cell I
# Perplexity
from tqdm import tqdm
def dataset_perplexity(model, data, progress=False, details=False):
    output = 0
    samples = [] if details else None
    it = tqdm(data) if progress else data
    for sentence in it:
        perplexity_ = model.perplexity(sentence)
        output += perplexity_
        if samples is not None:
            samples.append((sentence, perplexity_))
    output /= len(data)
    if details:
        return output, sorted(samples, key=lambda x: x[1])
    else:
        return output
# Q5: Why does this throw an error?
perplexity = dataset_perplexity(ngram_model, tokens['dev'])

In [None]:
# Cell J
# Try again, but with "smoothing = 1"
# "smoothing" adds counts to unknown ngrams. No probability is equal to 0 anymore,
# but we give probability mass away that could be used for known ngrams.
ngram_model = NgramCounter(25, smoothing=1)
fit(ngram_model, tokens['train'])
perplexity = dataset_perplexity(ngram_model, tokens['dev'], progress=True)
print(perplexity)

In [None]:
# Cell K
# small grid search: find the best possible length for ngrams, and the best possible smoothing value
# while this runs, get up and stretch your legs
import itertools
n_values = [1,2,3,4]
smoothing_values = [1,2,4]
params = list(itertools.product(*[n_values, smoothing_values]))
for param in itertools.product(*[n_values, smoothing_values]):
    n, smoothing = param
    ngram_model = NgramCounter(n, smoothing=smoothing)
    fit(ngram_model, tokens['train'])
    perplexity = dataset_perplexity(ngram_model, tokens['dev'])
    print('n={}, smoothing={}, perplexity={}'.format(n, smoothing, perplexity))
# Q6: Why doesn't the dev perplexity keep decreasing as n increases?

In [None]:
# Cell L
# Q7: What are the best values for n and smoothing?
best_n = # TODO
best_smoothing = # TODO
# Q8: How many parameters does the best ngram model have? Explain how you compute this quantity.
best_ngram_model = NgramCounter(best_n, smoothing=best_smoothing)
fit(best_ngram_model, tokens['train'])

In [None]:
# Cell M
# Build the best model on train and dev using the parameters for n and smoothing found in cell L.
# Then compute its perplexity on test and gen.
# Q9: What do you notice?
best_n = # TODO
best_smoothing = # TODO
ngram_model = NgramCounter(best_n, smoothing=best_smoothing)
fit(ngram_model, tokens['train']+tokens['dev'])
for split in ['train', 'dev', 'test', 'gen']:
    perplexity = dataset_perplexity(ngram_model, tokens[split], progress=True)
    print('{}: {}'.format(split, perplexity))

In [None]:
# Cell N
# RNN LM
# Q10: How does this RNN LM deal with words outside of its vocabulary?
import torch
from torch import nn
import itertools

# redefine this helper class
class Vocabulary:
    def __init__(self, words=set()):
        self.vocab = set(words)
        self.unk = '<unk>'
        self.bos = '<bos>'
        self.eos = '<eos>'
        self.vocab.add(self.unk)
        self.vocab.add(self.bos)
        self.vocab.add(self.eos)
        self.vocab = sorted(self.vocab)
        
    def add(self, word):
        if word not in self.vocab:
            self.vocab.append(word)
        
    def process(self, sentence):
        assert isinstance(sentence, list) or isinstance(sentence, tuple)
        assert all(isinstance(x, str) for x in sentence)
        for word in sentence:
            self.add(word)
        
    def size(self):
        return len(self.vocab)

    def word_to_int(self, word):
        assert isinstance(word, str)
        if word in self.vocab:
            key = word
        else:
            key = self.unk
        return self.vocab.index(key)

    def sentence_to_int(self, sentence):
        output = [None for _ in sentence]
        for ii, token in enumerate(sentence):
            output[ii] = self.word_to_int(token)
        return output
        
class RNNLM(nn.Module):
    def __init__(self, vocab=None, n_layers=2, n_units=512):
        super(RNNLM, self).__init__()
        assert isinstance(n_layers, int)
        assert n_layers > 0
        self.n_layers = n_layers
        assert isinstance(n_units, int)
        assert n_units > 0
        self.n_units = n_units
        assert isinstance(vocab, Vocabulary)
        self.vocab = vocab
        self.padding = self.vocab.size()
        # model parameters
        # dense word embeddings
        self.embeddings = nn.Embedding(self.vocab.size()+1, self.n_units, padding_idx=self.padding)
        # recurrent neural network
        self.rnn = nn.LSTM(self.n_units, self.n_units, num_layers=self.n_layers, batch_first=True)
        # final projection layer
        self.projection = nn.Sequential(
            nn.Linear(self.n_units, self.vocab.size()),
            nn.LogSoftmax(2)
        )

    def forward(self, batch):
        assert isinstance(batch, list), batch
        assert all(isinstance(x, list) for x in batch), batch
        assert all(all(isinstance(x, str) for x in sentence) for sentence in batch)
        assert all(x[0] == self.vocab.bos for x in batch)
        assert all(x[-1] == self.vocab.eos for x in batch)        
        # turn into indices
        indices = [self.vocab.sentence_to_int(x) for x in batch]
        # pad
        max_len = max(len(x) for x in indices)
        indices = [x if len(x) == max_len else x+[self.padding]*(max_len-len(x)) for x in indices]
        # turn into tensor
        indices = torch.tensor(indices)
        # feed through model
        word_embeddings = self.embeddings(indices)
        rnn_outputs, _ = self.rnn(word_embeddings)
        logits = self.projection(rnn_outputs)
        return logits, indices

    def perplexity(self, sentence):
        assert isinstance(sentence, list), sentence
        assert all(isinstance(x, str) for x in sentence)
        assert sentence[0] == self.vocab.bos, sentence
        assert sentence[-1] == self.vocab.eos, sentence
        logits, indices = self.forward([sentence])
        logits = logits[0,:-1,:]
        indices = indices[0,1:]
        # there is probably a better way for the following
        perplexity = 0
        for tt, index in enumerate(indices.numpy().tolist()):
            perplexity += logits[tt,index].item()
        perplexity = -perplexity/len(sentence)
        return perplexity

    def batch_perplexity(self, batch):
        assert isinstance(batch, list), batch
        assert all(isinstance(x, list) for x in batch), batch
        assert len(batch) > 0
        perplexities = [ self.perplexity(sentence) for sentence in batch ]
        return sum(perplexities)/len(perplexities)
            
    def number_parameters(self):
        return sum(p.numel() for p in self.parameters() if p.requires_grad)

class Trainer:
    def __init__(self, model, data, batch_size=128):
        self.model = model
        self.data = data
        self.optimizer = torch.optim.Adam(self.model.parameters())
        assert isinstance(batch_size, int)
        assert batch_size>0
        self.batch_size = batch_size
        self.loss_function = nn.CrossEntropyLoss(ignore_index=self.model.padding, size_average=True)

    def step(self, split, position, grad=False):
        start = position
        end = min(start + self.batch_size, len(self.data[split]))
        batch = self.data[split][start:end]
        logits, indices = self.model.forward(batch)
        targets = indices[:,1:]
        logits = logits[:,:-1,:]
        assert logits.size(0) == targets.size(0)
        assert logits.size(1) == targets.size(1)
        logits = torch.reshape(logits, (-1, self.model.vocab.size()))
        targets = torch.reshape(targets, (-1,))
        loss = self.loss_function(logits, targets)
        if grad:
            self.optimizer.zero_grad()
            loss.backward()
            nn.utils.clip_grad_norm_(self.model.parameters(), 5)
            self.optimizer.step()
        return loss

    def train(self, num_epochs, verbose=False):
        for epoch in range(num_epochs):
            train_loss = 0
            num_steps = 0
            it = tqdm(range(0, len(self.data['train']), self.batch_size)) if verbose \
                else range(0, len(self.data['train']), self.batch_size)
            for position in it:
                loss_ = self.step('train', position, grad=True)
                train_loss += loss_.item()
                num_steps += 1
            train_loss /= num_steps
            dev_loss = 0
            num_steps = 0
            it = tqdm(range(0, len(self.data['dev']), self.batch_size)) if verbose \
                else range(0, len(self.data['dev']), self.batch_size)
            for position in it:
                loss_ = self.step('dev', position, grad=False)
                dev_loss += loss_
                num_steps += 1
            dev_loss /= num_steps
            print('epoch {}/{}; train loss {}; dev loss {}'.format(epoch+1, num_epochs, train_loss, dev_loss))

In [None]:
# Cell O
# Train RNN LM
vocabulary = Vocabulary()
for sentence in tokens['train']:
    vocabulary.process(sentence)
rnn_lm = RNNLM(vocab=vocabulary, n_layers=1, n_units=7)
print(rnn_lm)
print('number of parameters: {}'.format(rnn_lm.number_parameters()))
trainer = Trainer(rnn_lm, tokens)
print('get up and stretch your legs while this runs')
trainer.train(100)
# Q11: How do the number of parameters of this model compare to the number of parameters of the best ngram model?

In [None]:
# Cell P
# Q12: How does the following compare to the best ngram model?
for split in ['train', 'dev', 'test', 'gen']:
    perplexity = dataset_perplexity(rnn_lm, tokens[split], progress=True)
    print('{}: {}'.format(split, perplexity))

In [None]:
# Cell Q
# Note that we underexploited at least 3 things:
# 1. The training time. The dev loss keeps decreasing, even after 200 epochs of training.
# See this by running trainer.train(100)
# 2. The RNN size. This RNN is *tiny* compared to what you'd use in practice. Here we just made it comparable to the best ngram model.
# 3. The data. Once we know at what epoch our RNN converges, we can train on train+dev, using the optimal epoch number as our stopping criterion.
# Q13: What else could we have done to further push the dev loss down?

In [None]:
# Cell R
# Train larger RNN LM
vocabulary = Vocabulary()
for sentence in tokens['train']:
    vocabulary.process(sentence)
rnn_lm_large = RNNLM(vocab=vocabulary, n_layers=2, n_units=512)
print(rnn_lm_large)
print('number of parameters: {}'.format(rnn_lm_large.number_parameters()))
trainer_large = Trainer(rnn_lm_large, tokens)
print('get up and stretch your legs while this runs')
trainer_large.train(5)

In [None]:
# Cell S
# Q14: How does the following compare to the other models?
for split in ['train', 'dev', 'test', 'gen']:
    perplexity = dataset_perplexity(rnn_lm_large, tokens[split], progress=True)
    print('{}: {}'.format(split, perplexity))

In [None]:
# Cell T
# Look at some high and low perplexity samples
# Look at overall distribution of perplexity
# Q15: What do you notice? Is there a phrase that describes this mismatch between distributions?
fig, ax = plt.subplots(1,3)
for ii, (name, model) in enumerate([('ngram', ngram_model), ('rnn', rnn_lm), ('rnn large', rnn_lm_large)]):
    for split in ['gen', 'test']:
        perplexity, samples = dataset_perplexity(model, tokens[split], progress=True, details=True)
        print('{} on {}:'.format(name, split))
        print('  perplexity: {}'.format(perplexity))
        print('  low perplexity:')
        low_p = ('    '+' '.join(x[0]) + ' -- {}'.format(x[1]) for x in samples[:5])
        print('\n'.join(low_p))
        print('  high perplexity:')
        high_p = ('    '+' '.join(x[0]) + ' -- {}'.format(x[1]) for x in samples[-5:])
        print('\n'.join(high_p))
        y = sorted((x[1] for x in samples), reverse=True)
        assert all(z>0 for z in y)
        x = np.arange(len(y))
        #ax.plot(x, y, label='{}-{}'.format(name,split))
        ax[ii].hist(y, bins=100, density=True, label='{}-{}'.format(name,split))
for ii,_ in enumerate(ax):
    ax[ii].legend()