Operation System : Windows 10
IDE : Visual Studio Code
Java Implementation : Java and a MySQL Database
Problem
Goal
Generate a mysql database that holds a table of restaurant‘s menu which includes information about each meal.
Pseudo-code
Solution
In order to connect to the MySQL database, we will need to import the mysql.connector which provides the needed driver. Furthermore, to enforce correct typing, the typing module will be loaded.
import mysql.connector
from mysql.connector import MySQLConnection
from typing import List, Dict, Tuple
from mysql.connector.cursor import MySQLCursor
Connect to database…
The function below simply establishes connection with the local MySQL and throws an exception when something wrong happens. The mysql.connector.connect call returns a MySQLConnection object which has a cursor function. This cursor function, when called, returns a MySQLCursor object by default, however depending on the arguments passed to the call another type of cursor will be returned. The different types of cursor allows the program to determine the format, by which the table rows are returned.
In case of an error, the general or base error class object is thrown. This will be caught by the except scope.
def switchDBON(self):
try:
self.connection = mysql.connector.connect(
**self.config)
self.cursor = self.connection.cursor()
print("Database connection established...")
except mysql.connector.Error as err:
print(err)
Create a database if it does not exist…
After a successful connection to the DB, the execute function of the MySQLCursor object is used to create our restaurant database. If the db already exists, an error will be thrown.
def setupDB(self):
print("Creating database...")
try:
self.cursor.execute(
"CREATE DATABASE IF NOT EXISTS {}".format(self.dbname))
except mysql.connector.Error as err:
print(err)
Create a table…
def createColumnStatementString(self, columnName: str, type: str, otherProps: str) -> str:
statement: str = "{} {} {}".format(columnName, type, otherProps)
return statement
The following code will generate the following statement: <br>
CREATE TABLE IF NOT EXISTS nigerian ( dish_price float NOT NULL,number_sold
int DEFAULT 0,id int NOT NULL AUTO_INCREMENT ,dish_name varchar(30) NOT NULL ,PRIMARY KEY ( id ))
This will create a table with the 4 columns and throw and error if the table already exists.
def createTable(self, tablename: str, createColumnStatements: List[str], primaryKey: str) -> None:
print("Creating table...")
createTableStatement: str = "\n\tCREATE TABLE IF NOT EXISTS {} (".format(
tablename)
columnStatements: str = ",".join(createColumnStatements)
primaryKeyDefinition: str = ",PRIMARY KEY ( {} ))".format(primaryKey)
executionString: str = "{} {} {}".format(
createTableStatement , columnStatements ,primaryKeyDefinition)
try:
self.cursor.execute(executionString)
except mysql.connector.Error as err:
print(err)
def setupNigerianRestaurantTable(self) -> None:
createColumnStatements: List[str] = {
self.createColumnStatementString(
"id", "int", "NOT NULL AUTO_INCREMENT "),
self.createColumnStatementString(
"dish_name", "varchar(30)", "NOT NULL"),
self.createColumnStatementString(
"dish_price", "float", "NOT NULL"),
self.createColumnStatementString("number_sold", "int", "DEFAULT 0")
}
self.createTable(self.tablename, createColumnStatements, "id")
Fill the table…
After the table has been created, it has to be filled. Each sub-list in the values 2D array represents a row in the table. The dish name and the price are the only values to be added to the table, while the others are either automatically generated (id ) or their default values used. Onet the “Insert” statements for each array is created, the script is run.
def populateTable(self) -> None:
print("Populating table...")
values: List[List[str]] = [["Egusi Soup and Eba", "24.5"],
["Porridge Yam", "12"], ["Porridge Beans", "13"]]
columns: List[str] = ["dish_name", "dish_price"]
columnsString: str = ",".join(columns)
preInsertExecString: str = "INSERT INTO {}.{}({})".format(self.dbname,
self.tablename, columnsString)
for row in values:
executionString: str = "{} VALUES (".format(preInsertExecString)
for index, col in enumerate(row):
executionString += "'{}'".format(col)
if index < len(row) - 1:
executionString += ","
executionString += ")"
print(executionString)
self.cursor.execute(executionString)
Display the table…
This method pulls all the rows from the table created in the previous steps. However, only the id, dish name and dish price columns are to be extracted. Luckily, the fetchall function of the MySQLCursor object allows us to collect all the rows at once.
def getTable(self) -> Tuple[List[str]]:
columns: List[str] = ["id", "dish_name", "dish_price", "number_sold"]
self.cursor.execute(
"SELECT * FROM {}.{}".format(self.dbname, self.tablename))
data: Tuple[List[str]] = self.cursor.fetchall()
return data
Once the data is collected, the table is printed as shown below.
def printOutTable(self) -> None:
'''
-------------------------------------------------------
* | id | dish_name | dish_price | number_sold |
* --------------------------+------------+--------------
* | 1 | Egusi Soup and Eba | 24.5 | 0 |
* | 2 | Egusi Soup and Eba | 24.5 | 0 |
* | 3 | Porridge Yam | 12 | 0 |
* | 4 | Porridge Beans | 13 | 0 |
* --------------------------+------------+-------------+
'''
data: Tuple[List[str]] = self.getTable()
header: str = ""
values: str = ""
line: str = "--------------------------------------------\n"
columns: List[str] = ["id", "dish_name", "dish_price", "number_sold"]
for column in columns:
print("| {} ".format(column), end=" ")
print("")
print(line)
for row in data:
for col in row:
print("| {} ".format(col), end=" ")
print("")
def __del__(self):
self.connection.close()
The blueprint
The functions shown above are all methods on the class provided in the following code snippet.
class ConnectDB:
tablename: str = "nigerian"
dbname = "restaurants"
connection: MySQLConnection
cursor: MySQLCursor
config = {
'user': 'root',
'password': '',
'host': '127.0.0.1',
'database': 'restaurants',
'raise_on_warnings': True
}
populated: bool = True
def __init__(self):
super().__init__()
self.switchDBON()
self.setupDB()
self.setupNigerianRestaurantTable()
if(self.populated == False):
self.populateTable()
self.printOutTable()
...
main: ConnectDB = ConnectDB()
Author Notes
This is most likely not the cleanest implementation, just a heads up.
Links
- Classes
- Type hinting
- Errors and Exceptions
- Destructors in Python
- Accessing the index in ‘for’ loops?
- cursor.MySQLCursor Class
Full code
import mysql.connector
from mysql.connector import MySQLConnection
from typing import List, Dict, Tuple
from mysql.connector.cursor import MySQLCursor
class ConnectDB:
tablename: str = "nigerian"
dbname = "restaurants"
connection: MySQLConnection
cursor: MySQLCursor
config = {
'user': 'root',
'password': '',
'host': '127.0.0.1',
'database': 'restaurants',
'raise_on_warnings': True
}
populated: bool = True
def __init__(self):
super().__init__()
self.switchDBON()
self.setupDB()
self.setupNigerianRestaurantTable()
if(self.populated == False):
self.populateTable()
self.printOutTable()
def switchDBON(self):
try:
self.connection = mysql.connector.connect(
**self.config)
self.cursor = self.connection.cursor()
print(type(self.cursor))
print("Database connection established...")
except mysql.connector.Error as err:
print(err)
def setupDB(self):
print("Creating database...")
try:
self.cursor.execute(
"CREATE DATABASE IF NOT EXISTS {}".format(self.dbname))
except mysql.connector.Error as err:
print(err)
def createColumnStatementString(self, columnName: str, type: str, otherProps: str) -> str:
statement: str = "{} {} {}".format(columnName, type, otherProps)
return statement
def setupNigerianRestaurantTable(self) -> None:
createColumnStatements: List[str] = {
self.createColumnStatementString(
"id", "int", "NOT NULL AUTO_INCREMENT "),
self.createColumnStatementString(
"dish_name", "varchar(30)", "NOT NULL"),
self.createColumnStatementString(
"dish_price", "float", "NOT NULL"),
self.createColumnStatementString("number_sold", "int", "DEFAULT 0")
}
self.createTable(self.tablename, createColumnStatements, "id")
def createTable(self, tablename: str, createColumnStatements: List[str], primaryKey: str) -> None:
print("Creating table...")
createTableStatement: str = "\n\tCREATE TABLE IF NOT EXISTS {} (".format(
tablename)
columnStatements: str = ",".join(createColumnStatements)
primaryKeyDefinition: str = ",PRIMARY KEY ( {} ))".format(primaryKey)
executionString: str = "{} {} {}".format(
createTableStatement, columnStatements, primaryKeyDefinition)
try:
# self.cursor.execute(executionString)
print(executionString)
except mysql.connector.Error as err:
print(err)
def populateTable(self) -> None:
print("Populating table...")
values: List[List[str]] = [["Egusi Soup and Eba", "24.5"],
["Porridge Yam", "12"], ["Porridge Beans", "13"]]
columns: List[str] = ["dish_name", "dish_price"]
columnsString: str = ",".join(columns)
preInsertExecString: str = "INSERT INTO {}.{}({})".format(self.dbname,
self.tablename, columnsString)
for row in values:
executionString: str = "{} VALUES (".format(preInsertExecString)
for index, col in enumerate(row):
executionString += "'{}'".format(col)
if index < len(row) - 1:
executionString += ","
executionString += ")"
print(executionString)
self.cursor.execute(executionString)
def getTable(self) -> Tuple[List[str]]:
columns: List[str] = ["id", "dish_name", "dish_price", "number_sold"]
self.cursor.execute(
"SELECT * FROM {}.{}".format(self.dbname, self.tablename))
data: Tuple[List[str]] = self.cursor.fetchall()
return data
def printOutTable(self) -> None:
'''
-------------------------------------------------------
* | id | dish_name | dish_price | number_sold |
* --------------------------+------------+--------------
* | 1 | Egusi Soup and Eba | 24.5 | 0 |
* | 2 | Egusi Soup and Eba | 24.5 | 0 |
* | 3 | Porridge Yam | 12 | 0 |
* | 4 | Porridge Beans | 13 | 0 |
* --------------------------+------------+-------------+
'''
data: Tuple[List[str]] = self.getTable()
header: str = ""
values: str = ""
line: str = "--------------------------------------------\n"
columns: List[str] = ["id", "dish_name", "dish_price", "number_sold"]
for column in columns:
print("| {} ".format(column), end=" ")
print("")
print(line)
for row in data:
for col in row:
print("| {} ".format(col), end=" ")
print("")
def __del__(self):
self.connection.close()
main: ConnectDB = ConnectDB()