"""Mini RDBMS (with persistent storage) using only the Python Standard Library."""
import json
import os
import re
from typing import Any, Dict, List, Optional
class Table:
    """A single table whose rows are persisted as one JSON file.

    The file lives at ``<storage_dir>/<name>.json`` and holds an object with
    two keys: ``"columns"`` (the list of column names) and ``"rows"`` (a list
    of column-name -> value mappings).
    """

    def __init__(self, name: str, columns: List[str], storage_dir="data"):
        """Open the table, loading it from disk when its file already exists.

        Args:
            name: Table name; also the stem of the JSON file.
            columns: Column names for a brand-new table. When a persisted
                file exists, the columns stored in it replace this value.
            storage_dir: Directory holding the JSON files; created if missing.
        """
        self.name = name
        self.columns = columns
        self.rows: List[Dict[str, Any]] = []
        self.storage_dir = storage_dir
        os.makedirs(storage_dir, exist_ok=True)
        self.filepath = os.path.join(storage_dir, f"{name}.json")
        self._load()

    def _load(self):
        """Replace columns and rows with the persisted ones, if a file exists."""
        if os.path.exists(self.filepath):
            with open(self.filepath, "r", encoding="utf-8") as f:
                data = json.load(f)
            self.columns = data["columns"]
            self.rows = data["rows"]

    def _save(self):
        """Write columns and rows to the table's JSON file.

        Raises:
            TypeError: If a stored value cannot be encoded as JSON. The
                existing file is left untouched in that case.
            OSError: If the file cannot be written.
        """
        # Serialise completely before touching the file: opening the real
        # file with "w" truncates it immediately, so an encoding error in the
        # middle of a streaming dump would destroy the previously saved data.
        payload = json.dumps({"columns": self.columns, "rows": self.rows}, indent=2)
        tmp_path = f"{self.filepath}.tmp"
        try:
            with open(tmp_path, "w", encoding="utf-8") as f:
                f.write(payload)
            # Swap the finished file into place so readers never observe a
            # half-written table.
            os.replace(tmp_path, self.filepath)
        except OSError:
            # Best-effort cleanup of the partial temp file; the original
            # error is what the caller needs to see.
            try:
                os.remove(tmp_path)
            except OSError:
                pass
            raise

    def insert(self, values: List[Any]):
        """Append one row and persist the table.

        Args:
            values: One value per column, in column order.

        Raises:
            ValueError: If the number of values differs from the number of
                columns.
            TypeError: If a value is not JSON-serialisable; the row is not
                kept in memory and the file on disk is unchanged.
        """
        if len(values) != len(self.columns):
            raise ValueError("Column count mismatch")
        self.rows.append(dict(zip(self.columns, values)))
        try:
            self._save()
        except Exception:
            # Keep memory and disk in agreement: drop the row that could not
            # be persisted before reporting the failure.
            self.rows.pop()
            raise

    def select(self, columns: Optional[List[str]] = None):
        """Return the table's rows as a list of dicts.

        Args:
            columns: Column names to project. ``None`` or ``["*"]`` selects
                every column.

        Returns:
            A new list of new dicts, so callers may modify the result without
            affecting the table's stored rows.

        Raises:
            KeyError: If a requested column is missing from a row.
        """
        if columns is None or columns == ["*"]:
            return [dict(row) for row in self.rows]
        return [{col: row[col] for col in columns} for row in self.rows]
class RDBMS:
    """Tiny SQL-like front end over JSON-backed ``Table`` objects.

    Three statement forms are recognised (keywords are case-insensitive):
    ``CREATE TABLE name (col, ...)``, ``INSERT INTO name VALUES (v, ...)``
    and ``SELECT cols FROM name``.
    """

    def __init__(self, storage_dir="data"):
        """Create the engine.

        Args:
            storage_dir: Directory where table files are kept; created if
                missing.
        """
        self.tables: Dict[str, Table] = {}
        self.storage_dir = storage_dir
        os.makedirs(storage_dir, exist_ok=True)

    def execute(self, query: str):
        """Run one statement.

        Returns:
            A status string for CREATE TABLE and INSERT INTO, or a list of
            row dicts for SELECT.

        Raises:
            ValueError: If the statement is unsupported, malformed, or names
                a table that does not exist.
        """
        query = query.strip()
        keyword = query.upper()
        if keyword.startswith("CREATE TABLE"):
            return self._create_table(query)
        if keyword.startswith("INSERT INTO"):
            return self._insert_into(query)
        if keyword.startswith("SELECT"):
            return self._select(query)
        raise ValueError("Unsupported query")

    def _get_table(self, table_name: str) -> "Table":
        """Return a known table, loading it from disk on first use.

        Raises:
            ValueError: If the table is neither in memory nor persisted.
        """
        table = self.tables.get(table_name)
        if table is None:
            path = os.path.join(self.storage_dir, f"{table_name}.json")
            # Only attach to tables that really exist on disk; otherwise a
            # typo would register an empty, column-less phantom table.
            if not os.path.exists(path):
                raise ValueError(f"Table {table_name} does not exist")
            table = Table(table_name, [], self.storage_dir)
            self.tables[table_name] = table
        return table

    @staticmethod
    def _split_values(raw: str) -> List[str]:
        """Split a VALUES list on commas that are outside single quotes.

        Each item is stripped of surrounding whitespace and single quotes;
        every value is returned as a string.
        """
        parts: List[str] = []
        current: List[str] = []
        in_quotes = False
        for ch in raw:
            if ch == "'":
                in_quotes = not in_quotes
            if ch == "," and not in_quotes:
                parts.append("".join(current))
                current = []
            else:
                current.append(ch)
        parts.append("".join(current))
        if in_quotes:
            # Unbalanced quote (e.g. a bare apostrophe): quoting cannot be
            # trusted, so fall back to splitting on every comma.
            parts = raw.split(",")
        return [p.strip().strip("'") for p in parts]

    def _create_table(self, query: str):
        """Handle ``CREATE TABLE name (col, ...)``.

        If a file for the table already exists, the persisted table is
        loaded and its stored columns are the ones reported.

        Raises:
            ValueError: On bad syntax, or empty or duplicate column names.
        """
        match = re.match(r"CREATE TABLE (\w+)\s*\((.+)\)", query, re.IGNORECASE)
        if not match:
            raise ValueError("Invalid CREATE TABLE syntax")
        table_name, cols = match.groups()
        columns = [c.strip() for c in cols.split(",")]
        if not all(columns):
            raise ValueError("Column names must not be empty")
        # Rows are dicts keyed by column name, so a repeated name would
        # silently overwrite an earlier value on every insert.
        if len(set(columns)) != len(columns):
            raise ValueError("Duplicate column names")
        table = Table(table_name, columns, self.storage_dir)
        self.tables[table_name] = table
        return f"Table {table_name} created with columns {table.columns}"

    def _insert_into(self, query: str):
        """Handle ``INSERT INTO name VALUES (v, ...)``.

        Raises:
            ValueError: On bad syntax, an unknown table, or a value count
                that does not match the table's columns.
        """
        match = re.match(r"INSERT INTO (\w+)\s*VALUES\s*\((.+)\)", query, re.IGNORECASE)
        if not match:
            raise ValueError("Invalid INSERT syntax")
        table_name, raw_values = match.groups()
        table = self._get_table(table_name)
        values = self._split_values(raw_values)
        table.insert(values)
        return f"Inserted into {table_name}: {values}"

    def _select(self, query: str):
        """Handle ``SELECT cols FROM name`` and return the matching rows.

        NOTE: the pattern is only anchored at the start, so any text after
        the table name (such as a WHERE clause) is ignored.

        Raises:
            ValueError: On bad syntax or an unknown table.
        """
        match = re.match(r"SELECT (.+) FROM (\w+)", query, re.IGNORECASE)
        if not match:
            raise ValueError("Invalid SELECT syntax")
        cols, table_name = match.groups()
        table = self._get_table(table_name)
        wanted = [c.strip() for c in cols.split(",")]
        return table.select(wanted)
# Example usage: create a table, add two rows, then read them back, printing
# the result of every statement in order.
db = RDBMS()
for statement in (
    "CREATE TABLE students (id, name, age)",
    "INSERT INTO students VALUES (1, 'Alice', 21)",
    "INSERT INTO students VALUES (2, 'Bob', 22)",
    "SELECT * FROM students",
    "SELECT name, age FROM students",
):
    print(db.execute(statement))
# (Blog page footer, not part of the program:)
# No comments:
# Post a Comment