Makina Blog

Le blog Makina-corpus

Débo­guer des trig­gers SQL en cascade : une approche visuelle avec Matplot­lib


Dans cet article, je vais parta­ger mon expé­rience de débo­gage à l’aide de Matplot­lib, un outil de visua­li­sa­tion Python puis­sant et flexible.
Sommaire

La main­te­nance des trig­gers SQL devient un véri­table défi tech­nique lorsque la base de données évolue et que les inter­ac­tions entre tables se multi­plient.

Le défi : des trig­gers complexes en cascade

Les trig­gers SQL (ou déclen­cheurs) sont des procé­dures stockées qui s’exé­cutent auto­ma­tique­ment en réponse à certains événe­ments (comme l’in­ser­tion, la modi­fi­ca­tion ou la suppres­sion de données). Ils permettent d’im­plé­men­ter des règles métier et d’as­su­rer l’in­té­grité des données direc­te­ment au niveau de la base de données. Lorsque vous travaillez avec une base rela­tion­nelle compor­tant une grande quan­tité de trig­gers SQL, leur déclen­che­ment en cascade peut engen­drer des cycles de mises à jour qui, bien que logiques indi­vi­duel­le­ment, deviennent rapi­de­ment diffi­ciles à retra­cer dans leur ensemble.

Par exemple, une mise à jour sur une table peut provoquer des modi­fi­ca­tions sur plusieurs autres tables, enclen­chant elles-mêmes des modi­fi­ca­tions sur la première table, rendant le suivi du flot d’exé­cu­tion très complexe. Dans mon cas, j’ai besoin d’iden­ti­fier la source d’une erreur appa­rais­sant au sein d’un test unitaire en échec envi­ron une exécu­tion sur cinq, dans une appli­ca­tion Django connec­tée à une base de données Post­greSQL.

L’Ap­proche graphique : visua­li­ser pour Comprendre

Collecte des données

La première étape consiste à captu­rer les infor­ma­tions sur l’exé­cu­tion des diffé­rents trig­gers. J’ai créé une table dédiée à la collecte des dates d’exé­cu­tion de chaque trig­ger, en rensei­gnant égale­ment leur niveau de profon­deur (la méthode pg_trigger_depth permet­tant de déter­mi­ner combien de trig­gers succes­sifs ont déclen­ché l’exé­cu­tion du trig­ger courant).

Créa­tion de la table :

CREATE TABLE IF NOT EXISTS trigger_timeline(
    id SERIAL PRIMARY KEY,
    trigger_name VARCHAR(100),
    trigger_depth INTEGER,
    created_at TIMESTAMP
);

Inser­tion du logging, à ajou­ter au début de chacun des trig­gers (en chan­geant la valeur 'trigger_name') sur toutes les tables impliquées dans le trai­te­ment :

INSERT INTO trigger_timeline (trigger_name, trigger_depth, created_at) VALUES ('trigger_name', pg_trigger_depth(), clock_timestamp());

Du côté du test unitaire à débo­guer, il est désor­mais possible de récu­pé­rer les infor­ma­tions ainsi logguées juste après la ligne de code qui déclenche toute la cascade :

# Connect to database
from django.db import connection
cursor = connection.cursor()
# Remove database logging entries from last execution
cursor.execute("TRUNCATE trigger_timeline")


# This is the line I need to debug : this insertion launches all the triggers
PathFactory.create(geom=LineString(Point(700070, 6600000),
                                   Point(700020, 6600050),
                                   Point(700060, 6600090),
                                   Point(700100, 6600050),
                                   srid=settings.SRID))


# Extract logging information to a Python list, ensure it is sorted by timestamp
cursor.execute("SELECT * FROM trigger_timeline")
data = cursor.fetchall()
trigger_timeline = []
for _, trigger_name, trigger_depth, trigger_timestamp in data:
    trigger_timeline.append(trigger_name, trigger_depth, trigger_timestamp)
sorted_trigger_timeline = sorted(trigger_timeline, key=lambda x: x[2])
print(f"{sorted_trigger_timeline=}")

J’ob­tiens en sortie une liste de la forme suivante, compre­nant non moins de 160 valeurs :

sorted_trigger_timeline = [
    ('paths_snap_extremities', 1, datetime.datetime(2025, 1, 15, 9, 27, 0, 609516)),
    ('elevation_path_iu', 1, datetime.datetime(2025, 1, 15, 9, 27, 0, 611045)),
    ('paths_topology_intersect_split', 1, datetime.datetime(2025, 1, 15, 9, 27, 0, 611428)),
    ('paths_snap_extremities', 2, datetime.datetime(2025, 1, 15, 9, 27, 0, 612682)),
    ('elevation_path_iu', 2, datetime.datetime(2025, 1, 15, 9, 27, 0, 613426)),
    ('paths_topology_intersect_split', 2, datetime.datetime(2025, 1, 15, 9, 27, 0, 613676)),
    ('paths_snap_extremities', 3, datetime.datetime(2025, 1, 15, 9, 27, 0, 614584)),
    ('elevation_path_iu', 3, datetime.datetime(2025, 1, 15, 9, 27, 0, 615308)),
    ('paths_topology_intersect_split', 3, datetime.datetime(2025, 1, 15, 9, 27, 0, 615541)),
    ('update_topology_geom_when_path_changes', 3, datetime.datetime(2025, 1, 15, 9, 27, 0, 616272)),
     ...
]

Visua­li­sa­tion du flot d’exé­cu­tion

Avec Matplot­lib, j’ai trans­forme ces données en un graphique qui repré­sente visuel­le­ment la cascade des trig­gers :

import matplotlib.pyplot as plt
import matplotlib.dates as mdates
import datetime

def plot_trigger_timeline(data):
    trigger_names = list(set(name for name, _, _ in data))
    trigger_names.sort()

    # Create color map for triggers
    colors = [plt.cm.tab20(i / (len(trigger_names) - 1)) for i in range(len(trigger_names))]
    color_map = dict(zip(trigger_names, colors))
    # Create figure and axis with larger size
    plt.figure(figsize=(15, 10))

    # Plot points for each trigger type
    for trigger in trigger_names:
        # Filter data for this trigger
        trigger_data = [(name, depth, time) for name, depth, time in data if name == trigger]

        # Extract x and y coordinates
        times = [t for _, _, t in trigger_data]
        depths = [depth for _, depth, _ in trigger_data]

        # Plot scatter points
        plt.scatter(times, depths, label=trigger, alpha=0.6, s=100, color=color_map[trigger])

    # Customize the plot
    plt.gca().xaxis.set_major_formatter(mdates.DateFormatter('%H:%M:%S.%f'))
    plt.gcf().autofmt_xdate()  # Rotate and align the tick labels

    # Add labels and title
    plt.xlabel('Timestamp')
    plt.ylabel('PG Depth')
    plt.title('Trigger Events Timeline')

    # Add legend to the right side of the plot
    plt.legend(bbox_to_anchor=(1.05, 1), loc='upper left', borderaxespad=0.)

    # Adjust layout to prevent label cutoff
    plt.tight_layout()

    return plt

# Create and display the plot
plt = plot_trigger_timeline(sorted_trigger_timeline)
plt.show()

Je relance mon tests unitaire, qui est en échec une fois sur 5, et trace le résul­tat dans le cas qui fonc­tionne, et dans le cas qui échoue, afin de les compa­rer :

Cas sans échec : exécu­tion souhai­tée

graphe d'éxecution des triggers sql

Cas en échec

graphe d'éxecution des triggers sql

À partir de ces repré­sen­ta­tions, je peux iden­ti­fier deux choses :

  • Cette visua­li­sa­tion met en évidence la nature récur­sive des trig­gers que je suis en train d’ex­plo­rer (forme pyra­mi­dale, avec des motifs qui se répètent, augmen­tant en profon­deur jusqu’à atteindre la condi­tion de sortie au sommet de la pyra­mide)

  • Je commence à iden­ti­fier le point de départ de mon erreur finale, à l’en­droit où la cascade commence à diver­ger de son exécu­tion souhai­tée.

Visua­li­sa­tion des inser­tions et mises à jour

Afin de corré­ler les exécu­tions de trig­gers avec les objets modi­fiés, je repro­duis le même méca­nisme pour logger les INSERT et UPDATE d’un objet dans la table concer­née.

Je décide d’uti­li­ser une seconde table de logging :

CREATE TABLE IF NOT EXISTS trigger_events(
    id SERIAL PRIMARY KEY,
    trigger_type VARCHAR(100),
    path_id VARCHAR(100),
    created_at TIMESTAMP,
    tgr_depth INTEGER
);

Je crée 4 trig­gers desti­nés à logger unique­ment le début « BEFORE » et la fin « AFTER » des évène­ments « INSERT » et « UPDATE » sur ma table de core_path :

CREATE FUNCTION public.log_after_trigger_update() RETURNS trigger SECURITY
DEFINER AS $$
DECLARE
BEGIN
    insert into trigger_events (trigger_type, path_id, created_at, tgr_depth) VALUES('AFTER_UPDATE', NEW.id, clock_timestamp(), pg_trigger_depth());
    RETURN NULL;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER log_after_trigger_tgr_update
AFTER UPDATE OF geom ON core_path
FOR EACH ROW EXECUTE PROCEDURE log_after_trigger_update();

CREATE FUNCTION public.log_before_trigger_update() RETURNS trigger SECURITY
DEFINER AS $$
DECLARE
BEGIN
    insert into trigger_events (trigger_type, path_id, created_at, tgr_depth) VALUES('BEFORE_UPDATE', NEW.id, clock_timestamp(), pg_trigger_depth());
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER log_before_trigger_tgr_update
BEFORE UPDATE OF geom ON core_path
FOR EACH ROW EXECUTE PROCEDURE log_before_trigger_update();

CREATE FUNCTION public.log_after_trigger_insert() RETURNS trigger SECURITY
DEFINER AS $$
DECLARE
BEGIN
    insert into trigger_events (trigger_type, path_id, created_at, tgr_depth) VALUES('AFTER_INSERT', NEW.id, clock_timestamp(), pg_trigger_depth());
    RETURN NULL;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER log_after_trigger_tgr_insert
AFTER INSERT ON core_path
FOR EACH ROW EXECUTE PROCEDURE log_after_trigger_insert();

CREATE FUNCTION public.log_before_trigger_insert() RETURNS trigger SECURITY
DEFINER AS $$
DECLARE
BEGIN
    insert into trigger_events (trigger_type, path_id, created_at, tgr_depth) VALUES('BEFORE_INSERT', NEW.id, clock_timestamp(), pg_trigger_depth());
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER log_before_trigger_tgr_insert
BEFORE INSERT ON core_path
FOR EACH ROW EXECUTE PROCEDURE log_before_trigger_insert();

Je repro­duis le même prin­cipe du côté de mon test unitaire :

# Connect to database
from django.db import connection
cursor = connection.cursor()
# Remove database logging entries from last execution
cursor.execute("TRUNCATE trigger_events")


# This is the line I need to debug : this insertion launches all the triggers
PathFactory.create(geom=LineString(Point(700070, 6600000),
                                   Point(700020, 6600050),
                                   Point(700060, 6600090),
                                   Point(700100, 6600050),
                                   srid=settings.SRID))


# Extract logging information to 4 Python lists, ensure they are sorted by timestamp
cursor.execute("SELECT * FROM trigger_events")
data = cursor.fetchall()
before_update = []
after_update = []
before_insert = []
after_insert = []
for _, trg_type, pk, date, tgr_depth in data:
    if trg_type == "BEFORE_INSERT":
        before_insert.append((pk, tgr_depth, date))
    elif trg_type == "AFTER_INSERT":
        after_insert.append((pk, tgr_depth, date))
    elif trg_type == "BEFORE_UPDATE":
        before_update.append((pk, tgr_depth, date))
    elif trg_type == "AFTER_UPDATE":
        after_update.append((pk, tgr_depth, date))
sorted_before_update = sorted(before_update, key=lambda x: x[1])
sorted_after_update = sorted(after_update, key=lambda x: x[1])
sorted_before_insert = sorted(before_insert, key=lambda x: x[1])
sorted_after_insert = sorted(after_insert, key=lambda x: x[1])

J’ob­tiens en sortie des listes de la forme suivante, me permet­tant de retra­cer les objets impac­tés par la cascade de trig­gers :

before_update = [
    ('207', 2, datetime.datetime(2025, 1, 21, 13, 23, 42, 254063)),
    ('211', 3, datetime.datetime(2025, 1, 21, 13, 23, 42, 256176)),
    ...
]
after_update = [
    ('207', 2, datetime.datetime(2025, 1, 21, 13, 23, 42, 255288)),
    ('211', 3, datetime.datetime(2025, 1, 21, 13, 23, 42, 257131)),
    ...
]
before_insert = [
    ('211', 1, datetime.datetime(2025, 1, 21, 13, 23, 42, 250029)),
    ('217', 2, datetime.datetime(2025, 1, 21, 13, 23, 42, 284971)),
    ...
]
after_insert = [
    ('211', 1, datetime.datetime(2025, 1, 21, 13, 23, 42, 252373)),
    ('217', 2, datetime.datetime(2025, 1, 21, 13, 23, 42, 285437)),
    ...
]

Voici un second script Matplot­lib, pour les repré­sen­ter sur le même graphe :


import matplotlib.pyplot as plt import datetime def plot_trigger_timeline_paths(data, legend, ytext, color, create_new_figure=True): # Extract data for plotting identifiers = [item[0] for item in data] depths = [item[1] for item in data] timestamps = [item[2] for item in data] # Create the plot only if it's the first dataset if create_new_figure: plt.figure(figsize=(10, 6)) # Plot points plt.scatter(timestamps, depths, color=color, label=legend) # Add labels for each point for i, txt in enumerate(identifiers): plt.annotate(txt, (timestamps[i], depths[i]), xytext=(5, ytext), textcoords='offset points') # Customize the plot (only needed once) if create_new_figure: plt.xlabel('Timestamp') plt.ylabel('PG Depth') plt.title('PG Depth vs Timestamp') # Format x-axis to show time nicely plt.gcf().autofmt_xdate() # Add grid for better readability plt.grid(True, linestyle='--', alpha=0.7) return plt # Plot the first dataset (creates the figure) plt = plot_trigger_timeline_paths(before_update, 'before_update', 5, 'pink', create_new_figure=True) # Plot the second dataset (adds to existing figure) plt = plot_trigger_timeline_paths(before_insert, 'before_insert', 5, 'green', create_new_figure=False) plt = plot_trigger_timeline_paths(after_insert, 'after_insert', -15, 'blue', create_new_figure=False) plt = plot_trigger_timeline_paths(after_update, 'after_update', -15, 'red', create_new_figure=False) # Add legend plt.legend() # Adjust layout to prevent label cutoff (do this only once at the end) plt.tight_layout() # Show the plot plt.show()

Cas sans échec : exécu­tion souhai­tée

graphe d'éxecution des update/insert

Cas en échec

graphe d'éxecution des update/insert

Ces repré­sen­ta­tions :

  • Soulignent davan­tage le motif pyra­mi­dal de la récur­sion, ainsi que la diver­gence du flot d’exé­cu­tion dans le cas qui échoue (pyra­mide brisée en deux)
  • Me permettent d’iden­ti­fier le point de départ de la diver­gence des exécu­tions (l’objet numéro 208 a été modi­fié avant l’objet numéro 207).

À partir de ces constats, et en réuti­li­sant le même concept pour logger l’exé­cu­tion de quelques lignes précises de SQL au sein des trig­gers, je réus­sis rapi­de­ment à retrou­ver la ligne respon­sable de l’er­reur : le trig­ger paths_topology_intersect_split itère via une boucle FOR sur le résul­tat d’un SELECT dont la clause manquait d’un ORDER BY, ne garan­tis­sant donc pas l’ordre dans lequel on itère sur les objets. Ceci explique que le test unitaire était en erreur une fois sur cinq seule­ment.

Conclu­sion

En créant une repré­sen­ta­tion visuelle de l’exé­cu­tion des trig­gers SQL, j’ai pu gagner en clarté et en compré­hen­sion. La visua­li­sa­tion m’a permis de :

  • Prendre connais­sance de l’ordre d’exé­cu­tion des trig­gers
  • M’orien­ter vers la source de l’er­reur sans connaître l’in­té­gra­lité du code SQL
  • Affi­ner progres­si­ve­ment ma recherche en croi­sant diffé­rentes infor­ma­tions

J’ai pu produire un outil réuti­li­sable par mon équipe pour explo­rer des problèmes simi­laires à l’ave­nir.

Cette expé­rience souligne un aspect fonda­men­tal des trig­gers SQL : leur nature événe­men­tielle et leur capa­cité à s’en­chaî­ner peuvent faire de leur main­te­nance un casse-tête. Faire l’in­ven­taire des opéra­tions déclen­chant les trig­gers est déjà un exer­cice chro­no­phage, et garder la visi­bi­lité sur les opéra­tions causées par ceux-ci est un véri­table défi. Souli­gnons l’im­por­tance d’uti­li­ser les trig­gers avec parci­mo­nie, en privi­lé­giant des opéra­tions simples via des procé­dures courtes. Je recom­mande d’évi­ter la mise en place de trig­gers récur­sifs impac­tant eux-mêmes la colonne qui les déclenche en premier lieu. Atten­tion égale­ment à la nature non-déter­mi­niste de la clause SELECT.

Formations associées

Formations Outils et bases de données

Formation PostgreSQL

À distance (FOAD) Du 12 au 16 mai 2025

Voir la Formation PostgreSQL

Formations Python

Formation Python

Toulouse Du 5 au 7 mars 2025

Voir la Formation Python

Formations Python

Formation Python avancé

Nantes Du 7 au 11 avril 2025

Voir la Formation Python avancé

Actualités en lien

Utiliser des fonctions PostgreSQL dans des contraintes Django

07/11/2023

Cet article vous présente comment utiliser les fonctions et les check constraints PostgreSQL en tant que contrainte sur vos modèles Django.

Voir l'article
Image
Django PostgreSQL

Créer des vues SQL dans Django et les afficher dans un SIG

06/09/2022

Nous allons décrire un processus via la mise en place de vues SQL qui permettent à l'utilisateur de lire de la donnée formatée, sans possibilité d'influer sur le contenu d'une base et tout en se connectant directement à celle-ci.

Voir l'article
Image
Randonnée

Améliorez votre SQL : utilisez des index filtrés

29/08/2019

L'indexation d'une base de données est un vaste sujet, dans cet article nous examinerons une possibilité offerte par PostgreSQL dans les index et le filtrage de l'index pour qu'il ne s'applique pas à toute la table.

Voir l'article
Image
photographie code par Kevin Ku

Inscription à la newsletter

Nous vous avons convaincus