Operation System : Windows 10

IDE : Visual Studio Code

Java Implementation : Java and a MySQL Database

Problem

Goal

Generate a mysql database that holds a table of restaurant‘s menu which includes information about each meal.

Pseudo-code

Solution

In order to connect to the MySQL database, we will need to import the mysql.connector which provides the needed driver. Furthermore, to enforce correct typing, the typing module will be loaded.

import mysql.connector
from mysql.connector import MySQLConnection
from typing import List, Dict, Tuple
from mysql.connector.cursor import MySQLCursor

Connect to database…

The function below simply establishes connection with the local MySQL and throws an exception when something wrong happens. The mysql.connector.connect call returns a MySQLConnection object which has a cursor function. This cursor function, when called, returns a MySQLCursor object by default, however depending on the arguments passed to the call another type of cursor will be returned. The different types of cursor allows the program to determine the format, by which the table rows are returned.

In case of an error, the general or base error class object is thrown. This will be caught by the except scope.

def switchDBON(self):
        try:
            self.connection = mysql.connector.connect(
                **self.config)
            self.cursor = self.connection.cursor()
            print("Database connection established...")
        except mysql.connector.Error as err:
            print(err)

Create a database if it does not exist…

After a successful connection to the DB, the execute function of the MySQLCursor object is used to create our restaurant database. If the db already exists, an error will be thrown.

def setupDB(self):
        print("Creating database...")

        try:
            self.cursor.execute(
                "CREATE DATABASE IF NOT EXISTS {}".format(self.dbname))
        except mysql.connector.Error as err:
            print(err)

Create a table…

def createColumnStatementString(self, columnName: str, type: str, otherProps: str) -> str:
        statement: str = "{} {} {}".format(columnName, type, otherProps)
        return statement

The following code will generate the following statement: <br>

 CREATE TABLE IF NOT EXISTS nigerian ( dish_price float NOT NULL,number_sold 
int DEFAULT 0,id int NOT NULL AUTO_INCREMENT ,dish_name varchar(30) NOT NULL ,PRIMARY KEY ( id ))

This will create a table with the 4 columns and throw and error if the table already exists.

def createTable(self, tablename: str, createColumnStatements: List[str], primaryKey: str) -> None:
        print("Creating table...")
        createTableStatement: str = "\n\tCREATE TABLE IF NOT EXISTS {} (".format(
            tablename)
        columnStatements: str = ",".join(createColumnStatements)
        primaryKeyDefinition: str = ",PRIMARY KEY ( {} ))".format(primaryKey)
        executionString: str = "{} {} {}".format(
            createTableStatement , columnStatements ,primaryKeyDefinition)
        try:
            self.cursor.execute(executionString)
        except mysql.connector.Error as err:
            print(err)
def setupNigerianRestaurantTable(self) -> None:
        createColumnStatements: List[str] = {
            self.createColumnStatementString(
                "id", "int", "NOT NULL AUTO_INCREMENT "),
            self.createColumnStatementString(
                "dish_name", "varchar(30)", "NOT NULL"),
            self.createColumnStatementString(
                "dish_price", "float", "NOT NULL"),
            self.createColumnStatementString("number_sold", "int", "DEFAULT 0")
        }
        self.createTable(self.tablename, createColumnStatements, "id")

Fill the table…

After the table has been created, it has to be filled. Each sub-list in the values 2D array represents a row in the table. The dish name and the price are the only values to be added to the table, while the others are either automatically generated (id ) or their default values used. Onet the “Insert” statements for each array is created, the script is run.

def populateTable(self) -> None:
        print("Populating table...")
        values: List[List[str]] = [["Egusi Soup and Eba", "24.5"],
                                   ["Porridge Yam", "12"], ["Porridge Beans", "13"]]
        columns: List[str] = ["dish_name", "dish_price"]
        columnsString: str = ",".join(columns)
        preInsertExecString: str = "INSERT INTO {}.{}({})".format(self.dbname,
                                                                  self.tablename, columnsString)
        for row in values:
            executionString: str = "{} VALUES (".format(preInsertExecString)
            for index, col in enumerate(row):
                executionString += "'{}'".format(col)
                if index < len(row) - 1:
                    executionString += ","
            executionString += ")"
            print(executionString)
            self.cursor.execute(executionString)

Display the table…

This method pulls all the rows from the table created in the previous steps. However, only the id, dish name and dish price columns are to be extracted. Luckily, the fetchall function of the MySQLCursor object allows us to collect all the rows at once.

def getTable(self) -> Tuple[List[str]]:
        columns: List[str] = ["id", "dish_name", "dish_price", "number_sold"]
        self.cursor.execute(
            "SELECT * FROM {}.{}".format(self.dbname, self.tablename))
        data: Tuple[List[str]] = self.cursor.fetchall()
        return data

Once the data is collected, the table is printed as shown below.

def printOutTable(self) -> None:
        '''
           -------------------------------------------------------
         * | id | dish_name          | dish_price | number_sold |
         * --------------------------+------------+--------------
         * |  1 | Egusi Soup and Eba |       24.5 |           0 |
         * |  2 | Egusi Soup and Eba |       24.5 |           0 |
         * |  3 | Porridge Yam       |         12 |           0 |
         * |  4 | Porridge Beans     |         13 |           0 |
         * --------------------------+------------+-------------+

        '''
        data: Tuple[List[str]] = self.getTable()
        header: str = ""
        values: str = ""
        line: str = "--------------------------------------------\n"
        columns: List[str] = ["id", "dish_name", "dish_price", "number_sold"]
        for column in columns:
            print("| {}  ".format(column), end=" ")
        print("")
        print(line)
        for row in data:
            for col in row:
                print("| {} ".format(col), end=" ")
            print("")
def __del__(self):
        self.connection.close()

The blueprint

The functions shown above are all methods on the class provided in the following code snippet.

class ConnectDB:
    tablename: str = "nigerian"
    dbname = "restaurants"
    connection: MySQLConnection
    cursor: MySQLCursor
    config = {
        'user': 'root',
        'password': '',
        'host': '127.0.0.1',
        'database': 'restaurants',
        'raise_on_warnings': True
    }
    populated: bool = True
 def __init__(self):
        super().__init__()
        self.switchDBON()
        self.setupDB()
        self.setupNigerianRestaurantTable()
        if(self.populated == False):
            self.populateTable()
        self.printOutTable()
...
main: ConnectDB = ConnectDB()

Author Notes

This is most likely not the cleanest implementation, just a heads up.

Links

  1. Classes
  2. Type hinting
  3. Errors and Exceptions
  4. Destructors in Python
  5. Accessing the index in ‘for’ loops?
  6. cursor.MySQLCursor Class

Full code

import mysql.connector
from mysql.connector import MySQLConnection
from typing import List, Dict, Tuple
from mysql.connector.cursor import MySQLCursor


class ConnectDB:
    tablename: str = "nigerian"
    dbname = "restaurants"
    connection: MySQLConnection
    cursor: MySQLCursor
    config = {
        'user': 'root',
        'password': '',
        'host': '127.0.0.1',
        'database': 'restaurants',
        'raise_on_warnings': True
    }
    populated: bool = True

    def __init__(self):
        super().__init__()
        self.switchDBON()
        self.setupDB()
        self.setupNigerianRestaurantTable()
        if(self.populated == False):
            self.populateTable()
        self.printOutTable()

    def switchDBON(self):
        try:
            self.connection = mysql.connector.connect(
                **self.config)
            self.cursor = self.connection.cursor()
            print(type(self.cursor))
            print("Database connection established...")
        except mysql.connector.Error as err:
            print(err)

    def setupDB(self):
        print("Creating database...")

        try:
            self.cursor.execute(
                "CREATE DATABASE IF NOT EXISTS {}".format(self.dbname))
        except mysql.connector.Error as err:
            print(err)

    def createColumnStatementString(self, columnName: str, type: str, otherProps: str) -> str:
        statement: str = "{} {} {}".format(columnName, type, otherProps)
        return statement

    def setupNigerianRestaurantTable(self) -> None:
        createColumnStatements: List[str] = {
            self.createColumnStatementString(
                "id", "int", "NOT NULL AUTO_INCREMENT "),
            self.createColumnStatementString(
                "dish_name", "varchar(30)", "NOT NULL"),
            self.createColumnStatementString(
                "dish_price", "float", "NOT NULL"),
            self.createColumnStatementString("number_sold", "int", "DEFAULT 0")
        }
        self.createTable(self.tablename, createColumnStatements, "id")

    def createTable(self, tablename: str, createColumnStatements: List[str], primaryKey: str) -> None:
        print("Creating table...")
        createTableStatement: str = "\n\tCREATE TABLE IF NOT EXISTS {} (".format(
            tablename)
        columnStatements: str = ",".join(createColumnStatements)
        primaryKeyDefinition: str = ",PRIMARY KEY ( {} ))".format(primaryKey)
        executionString: str = "{} {} {}".format(
            createTableStatement, columnStatements, primaryKeyDefinition)
        try:

            # self.cursor.execute(executionString)
            print(executionString)
        except mysql.connector.Error as err:
            print(err)

    def populateTable(self) -> None:
        print("Populating table...")
        values: List[List[str]] = [["Egusi Soup and Eba", "24.5"],
                                   ["Porridge Yam", "12"], ["Porridge Beans", "13"]]
        columns: List[str] = ["dish_name", "dish_price"]
        columnsString: str = ",".join(columns)
        preInsertExecString: str = "INSERT INTO {}.{}({})".format(self.dbname,
                                                                  self.tablename, columnsString)
        for row in values:
            executionString: str = "{} VALUES (".format(preInsertExecString)
            for index, col in enumerate(row):
                executionString += "'{}'".format(col)
                if index < len(row) - 1:
                    executionString += ","
            executionString += ")"
            print(executionString)
            self.cursor.execute(executionString)

    def getTable(self) -> Tuple[List[str]]:
        columns: List[str] = ["id", "dish_name", "dish_price", "number_sold"]
        self.cursor.execute(
            "SELECT * FROM {}.{}".format(self.dbname, self.tablename))
        data: Tuple[List[str]] = self.cursor.fetchall()
        return data

    def printOutTable(self) -> None:
        '''
           -------------------------------------------------------
         * | id | dish_name          | dish_price | number_sold |
         * --------------------------+------------+--------------
         * |  1 | Egusi Soup and Eba |       24.5 |           0 |
         * |  2 | Egusi Soup and Eba |       24.5 |           0 |
         * |  3 | Porridge Yam       |         12 |           0 |
         * |  4 | Porridge Beans     |         13 |           0 |
         * --------------------------+------------+-------------+

        '''
        data: Tuple[List[str]] = self.getTable()
        header: str = ""
        values: str = ""
        line: str = "--------------------------------------------\n"
        columns: List[str] = ["id", "dish_name", "dish_price", "number_sold"]
        for column in columns:
            print("| {}  ".format(column), end=" ")
        print("")
        print(line)
        for row in data:
            for col in row:
                print("| {} ".format(col), end=" ")
            print("")

    def __del__(self):
        self.connection.close()


main: ConnectDB = ConnectDB()
%d bloggers like this: