Build GraphQL API in Python Using PostgreSQL

In today’s world of modern web development, building efficient and flexible APIs is more important than ever. GraphQL has quickly become a popular alternative to REST APIs due to its powerful querying capabilities and developer-friendly syntax. In this blog post, we’ll explore how to build a GraphQL API using Python and connect it to a PostgreSQL database.

To understand basics of building GraphQL API in python, visit this link: GraphQL in Python using Graphene

We will have project structure with three files inside our graphql_pg_python directory as given below:

graphql_pg_python/
|- database.py
|- schema.py
|- query.py


In this guide, we will go step by step from scratch.

Open terminal or command prompt based on your OS and create a working directory and go inside the directory

mkdir graphql_pg_python
cd graphql_pg_python



Create a virtualenv and activate the virtualenv. If you want a guide about this, visit venv — Creation of virtual environments

python -m venv .venv
source .venv/bin/activate # for linux/macos
.venv\Scripts\activate # for windows



To work with GraphQL in Python, we have to install graphene library. And to work with PostgreSQL in python, we need pyscopg2. Open your terminal/command prompt and install these libraries.

pip install graphene
pip instal psycopg2

Note: pip install pyscopg2 can give error. In case of error, you can try these methods:

  • For Windows users: If you are using python version 3.12.x and getting such error, you can downgrade to python 3.11.x
  • For Ubuntu and Debian users:
sudo apt install python3-dev libpq-dev


Now, open schema.py file and create a type, StudentType with three fields, id, name and address:

import graphene

class StudentType(graphene.ObjectType):
    id = graphene.ID() # unique identifier field
    name = graphene.String() 
    address = graphene.String()


Make sure that you have created database, user and granted database privileges to that user. You can either use PostgreSQL CLI or pgAdmin to create database and users. Also, create student table with some data. I am using PostgreSQL CLI in Linux based OS for demonstration purpose:

sudo su postgres
psql # Opens up PostgreSQL CLI 


Command to create database using PostgreSQL CLI:

create database school_db;


Command to create database user using PostgreSQL CLI:

create user dev with password 'admin';


To make sure, we don't face permission issue later, we make this user superuser. Warning: It's not a good practice to make all users superuser

ALTER USER dev with SUPERUSER;


Grant database privileges to user dev in PostgreSQL CLI:

grant all privileges on database school_db to dev;


Select our database school_db in PostgreSQL CLI:

\c school_db;


Now lets create a database table for student with three attributes: id, name and address in PostgreSQL CLI:

CREATE TABLE student(
    id SERIAL PRIMARY KEY,
    name VARCHAR(100) NOT NULL,
    address VARCHAR(100)
);


Now, we will add three students data to student table using POSTGRESQL CLI:

INSERT INTO student (name, address) VALUES
('Prashish Phuyal', 'Kirne, Dolakha'),
('Dixit Dahal', 'Khimti, Ramechhap'),
('Rajan Khadka', 'Khimti, Ramechhap');


Now, we write python code to make connection to the database in database.py file.

import psycopg2

conn = psycopg2.connect(
            database="school_db", 
            user="dev", 
            password="admin", 
            host="localhost", 
            port=5432
        )

cursor = conn.cursor()



Getting students list


Now, we will add getallstudents() functions in database.py file. In this function, we write logic to read all students data from student table.

def get_all_students():
    cursor.execute("SELECT * from student;")  # sql query
    records = cursor.fetchall()  # fetch all row data query returned

    students = []  # list to store student data

    for record in records:
        sd = {
            "id": record[0], 
            "name": record[1], 
            "address": record[2]
        }

        students.append(sd)
    return students
  • In this function, we execute a SQL query and stores all rows returned by this query on records variable

  • And we loop through each record and convert each record from records into sd dictionary and store these dictionaries on students list and return this list.

Now, we will add Query class and add a field named all_students along with resolver function to handle query request to get students list. For this, let's add code to schema.py file. Also, we will create schema in this file.

class Query(graphene.ObjectType):
    all_students = graphene.List(StudentType)

    def resolve_all_students(self, info):
        return get_all_students()


schema = graphene.Schema(query=Query)


Now, lets add query to get all students in query.py file.

from schema import schema

query_students = """
    query{
        allStudents{
            id
            name
            address
        }
    }
"""

students = schema.execute(query_students)
print(students.data)

If you are confused with contents of query_students variable, look at this blog: GraphQL in Python using Graphene

Now, lets run our code on terminal/command prompt:

python query.py

We will see result like this

{'allStudents': [{'id': '1', 'name': 'Prashish Phuyal', 'address': 'Kirne, Dolakha'}, {'id': '2', 'name': 'Dixit Dahal', 'address': 'Khimti, Ramechhap'}, {'id': '3', 'name': 'Rajan Khadka', 'address': 'Khimti, Ramechhap'}]}



Create student data


Now, we will add two functions on database.py file. Function getlaststudent() is needed to fetch data of student that is just created. And, create_student() function handles logic to create new student on student table.

def get_last_student():
    try:
        cursor.execute("select * from student order by id desc limit 1")
        record = cursor.fetchone() # fetch one data

        if record:
            return {
                "id": record[0], 
                "name": record[1], 
                "address": record[2]
            }
        else:
            return {"message": "No student found"}
    except Exception as e:
        return {"error": str(e)}


def create_student(name, address):
    try:
        instruction = (
            f"insert into student (name, address) values('{name}', '{address}');"
        )
        cursor.execute(instruction)
        conn.commit() # is like pressing save. 
                      # It saves newly added data to database 
        student = get_last_student()
        return student
    except Exception as e:
        # conn.rollback() is the opposite of conn.commit()
        # It undoes all changes made in the current database transaction
        # since the last commit.
        conn.rollback()
        return {"error": str(e)}


Now, lets add two classes CreateStudent and Mutation class to query.py file.

import graphene

# don't forget to import create_student
from database import create_student, get_all_students


class CreateStudent(graphene.Mutation):
    class Arguments:
        name = graphene.String()
        address = graphene.String()

    ok = graphene.Boolean()
    student = graphene.Field(StudentType)

    def mutate(self, info, name, address):
        try:
            new_student = create_student(name, address)
            return CreateStudent(student=new_student, ok=True)
        except Exception:
            return CreateStudent(student=None, ok=False)


class Mutation(graphene.ObjectType):
    create_student = CreateStudent.Field()

#  Add Mutation to schema 
schema = graphene.Schema(query=Query, mutation=Mutation)

In Mutation class we have create_student field. The createstudent field of the Mutation class registers the CreateStudent mutation, and when a mutation request is made to *createstudent*, it is handled by the mutate() method of the CreateStudent class.

Add mutation operation to query.py file and run this file to see the result.

mutation_student = """
    mutation {
        createStudent(name: "Kashyap Neu", address: "Khimti, Ramechhap"){
            student {
                id
                name
                address
            }
            ok
        }
    }
"""
student = schema.execute(mutation_student)
print(student.data)



Fetching student based on id


we will add function, getstudentby_id() on database.py file.

def get_student_by_id(id):
    try:
        instruction = f"select * from student where id={id};"
        cursor.execute(instruction)
        record = cursor.fetchone()
        if record:
            return {"id": record[0], "name": record[1], "address": record[2]}
        else:
            return {"message": f"Student with id: {id} not found"}
    except Exception:
        return {"error: str(e)"}


Next, we will add field and resolver on Query class to handle query request.

# Don't forget to import get_student_by_id
from database import create_student, get_all_students, get_student_by_id


class Query(graphene.ObjectType):
    all_students = graphene.List(StudentType)

    # We added id arugment to student_by_id field
    student_by_id = graphene.Field(
                        StudentType, 
                        id=graphene.ID(required=True)
                    )

    def resolve_all_students(self, info):
        return get_all_students()

    def resolve_student_by_id(self, info, id): # takes id as arugment
        return get_student_by_id(id=id)


Finally lets add query to query.py and run our code.

student_by_id = """
    query{
        studentById(id: 1){
            id
            name
            address
        }
    }
"""
student_by_id = schema.execute(student_by_id)
print(student_by_id.data)



Final codebase


In case of confusion, our final database.py looks like this:

import psycopg2

conn = psycopg2.connect(
    database="school_db", user="dev", password="admin", host="localhost", port=5432
)

cursor = conn.cursor()


def get_all_students():
    cursor.execute("SELECT * from student;")  # sql query
    records = cursor.fetchall()  # fetch all row data query returned

    students = []  # list to store student data

    for record in records:
        sd = {"id": record[0], "name": record[1], "address": record[2]}

        students.append(sd)
    return students


def get_last_student():
    try:
        cursor.execute("select * from student order by id desc limit 1")
        record = cursor.fetchone()  # fetch one data

        if record:
            return {"id": record[0], "name": record[1], "address": record[2]}
        else:
            return {"message": "No student found"}
    except Exception as e:
        return {"error": str(e)}


def create_student(name, address):
    try:
        instruction = (
            f"insert into student (name, address) values('{name}', '{address}');"
        )
        cursor.execute(instruction)
        conn.commit()  # is like pressing save.
        # It saves newly added data to database
        student = get_last_student()
        return student
    except Exception as e:
        # conn.rollback() is the opposite of conn.commit()
        # It undoes all changes made in the current database transaction
        # since the last commit.
        conn.rollback()
        return {"error": str(e)}


def get_student_by_id(id):
    try:
        instruction = f"select * from student where id={id};"
        cursor.execute(instruction)
        record = cursor.fetchone()
        if record:
            return {"id": record[0], "name": record[1], "address": record[2]}
        else:
            return {"message": f"Student with id: {id} not found"}
    except Exception:
        return {"error: str(e)"}

And, our final schema.py looks like this:

import graphene

from database import create_student, get_all_students, get_student_by_id


class StudentType(graphene.ObjectType):
    id = graphene.ID()  # unique identifier field
    name = graphene.String()
    address = graphene.String()


class Query(graphene.ObjectType):
    all_students = graphene.List(StudentType)

    # We added id arugment to student_by_id field
    student_by_id = graphene.Field(StudentType, id=graphene.ID(required=True))

    def resolve_all_students(self, info):
        return get_all_students()

    def resolve_student_by_id(self, info, id):  # takes id as arugment
        return get_student_by_id(id=id)


class CreateStudent(graphene.Mutation):
    class Arguments:
        name = graphene.String()
        address = graphene.String()

    ok = graphene.Boolean()
    student = graphene.Field(StudentType)

    def mutate(self, info, name, address):
        try:
            new_student = create_student(name, address)
            return CreateStudent(student=new_student, ok=True)
        except Exception:
            return CreateStudent(student=None, ok=False)


class Mutation(graphene.ObjectType):
    create_student = CreateStudent.Field()


schema = graphene.Schema(query=Query, mutation=Mutation)

And, our final query.py looks like this:

from schema import schema

query_students = """
    query{
        allStudents{
            id
            name
            address
        }
    }
"""

# students = schema.execute(query_students)
# print(students.data)

mutation_student = """
    mutation {
        createStudent(name: "Kashyap Neu", address: "Khimti, Ramechhap"){
            student {
                id
                name
                address
            }
            ok
        }
    }
"""

# student = schema.execute(mutation_student)
# print(student.data)

student_by_id = """
    query{
        studentById(id: 1){
            id
            name
            address
        }
    }
"""
student_by_id = schema.execute(student_by_id)
print(student_by_id.data)