Skip to content
This repository was archived by the owner on Oct 21, 2024. It is now read-only.

Commit e7da2a3

Browse files
committed
Create sqlite3worker.py
1 parent d62bd40 commit e7da2a3

File tree

1 file changed

+216
-0
lines changed

1 file changed

+216
-0
lines changed

sqlite3worker.py

Lines changed: 216 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,216 @@
1+
import os
2+
import sqlite3
3+
from typing import Any, Union, Iterable, Literal, Optional
4+
5+
class __info__:
6+
name = "SQLite3Worker"
7+
version = ("0.9-beta", 0.9)
8+
authors = ("Роман Слабицкий",)
9+
10+
class __types__:
11+
INTEGER = ["INTEGER", "INT", "TINYINT", "SMALLINT", "MEDIUMINT", "BIGINT", "UNSIGNED BIG INT", "INT2", "INT8"]
12+
FLOAT = ["REAL", "DOUBLE", "DOUBLE PRECISION", "FLOATNONE"]
13+
STRING = ["TEXT", "CHARACTER(20)", "VARCHAR(255)", "VARYING CHARACTER(255)", "NCHAR(55)", "NATIVE CHARACTER(70)", "NVARCHAR(100)", "CLOB"]
14+
BOOLEAN = ["BOOLEAN"]
15+
BYTES = ["BLOB"]
16+
17+
class __func__:
18+
def to_sqltype(tp: Any, primary: bool=False) -> str:
19+
if tp == int:
20+
return "INTEGER" + (" PRIMARY KEY" if (primary) else "")
21+
elif tp == float:
22+
return "REAL" + (" PRIMARY KEY" if (primary) else "")
23+
elif tp == str:
24+
return "TEXT" + (" PRIMARY KEY" if (primary) else "")
25+
elif tp == bool:
26+
return "BOOLEAN"
27+
elif tp == bytes:
28+
return "BLOB"
29+
else:
30+
raise TypeError("This type of data is not supported")
31+
32+
def to_pythontype(tp: str) -> Any:
33+
if tp.upper() in __types__.INTEGER:
34+
return int
35+
elif tp.upper() in __types__.FLOAT:
36+
return float
37+
elif tp.upper() in __types__.STRING:
38+
return str
39+
elif tp.upper() in __types__.BOOLEAN:
40+
return bool
41+
elif tp.upper() in __types__.BYTES:
42+
return bytes
43+
else:
44+
return None
45+
46+
class SQLite3Worker():
47+
def __init__(
48+
self,
49+
path: str,
50+
check_same_thread: Optional[bool]=None
51+
) -> None:
52+
"""Создание файла SQLite3 или открытие существуещего для работы"""
53+
cst = check_same_thread or False
54+
self.path = os.path.abspath(path)
55+
self.db_connect = sqlite3.connect(self.path, check_same_thread=cst)
56+
self.db_cursor = self.db_connect.cursor()
57+
58+
def request(
59+
self,
60+
request_text: str,
61+
mode: Optional[Literal["a", "r", "w"]]=None,
62+
params: Optional[list[Any]]=None
63+
) -> Union[None, list, list[list]]:
64+
"""`Функция для запросов` к базе данных SQLite"""
65+
mode = mode or "a"
66+
if params is None:
67+
self.db_cursor.execute(request_text)
68+
else:
69+
self.db_cursor.execute(request_text, params)
70+
if mode == "w":
71+
self.db_connect.commit()
72+
out = None
73+
elif mode == "r":
74+
out = self.db_cursor.fetchall()
75+
elif mode == "a":
76+
self.db_cursor.execute(request_text)
77+
self.db_connect.commit()
78+
out = self.db_cursor.fetchall()
79+
else:
80+
out = None
81+
return out
82+
83+
def create_table(
84+
self,
85+
table_name: str,
86+
colons: dict[str, tuple[Any, bool]]
87+
) -> None:
88+
"""`Создание таблицы`, ЕСЛИ ЕЁ НЕТУ в базе данных SQLite"""
89+
colons_list = []
90+
for i in list(colons.items()):
91+
colons_list.append(
92+
"\"{0}\" {1}".format(i[0], __func__.to_sqltype(i[1][0], i[1][1]))
93+
)
94+
self.request("CREATE TABLE IF NOT EXISTS \"{0}\" ({1});".format(table_name, ", ".join(colons_list)), "w")
95+
96+
def get_tables_list(self) -> Union[list[str], list]:
97+
"""`Список таблиц` в базе данных SQLite"""
98+
tables_info, tables_list = self.request("SELECT * FROM sqlite_master WHERE type = 'table';", "r"), []
99+
for i in tables_info:
100+
tables_list.append(str(i[1]))
101+
return tables_list
102+
103+
def delete_table(self, table_name: str) -> None:
104+
"""`Удаление таблицы` из базы данных SQLite"""
105+
self.request(f"DROP TABLE IF EXISTS \"{table_name}\";", "w")
106+
107+
def get_colons_list(
108+
self,
109+
table_name: str
110+
) -> Union[list[dict[str, Any]], list]:
111+
"""`Возвращает список с информацией о колонках` в таблице базе данных SQLite"""
112+
colons, colons_list = self.request(f"PRAGMA table_info(\"{table_name}\");", "r"), []
113+
for i in colons:
114+
colons_list.append(
115+
{
116+
"id": int(i[0]),
117+
"name": str(i[1]),
118+
"type": __func__.to_pythontype(str(i[2])),
119+
"primary": True if (int(i[5]) == 1) else False
120+
}
121+
)
122+
colons_list.sort(key=lambda x: x["id"])
123+
return colons_list
124+
125+
def get_colons_names(
126+
self,
127+
table_name: str
128+
) -> Union[list[str], list]:
129+
"""`Возвращает список с именами колонок` в таблице базе данных SQLite"""
130+
return [i["name"] for i in self.get_colons_list(table_name)]
131+
132+
def add_data(self, table_name: str, data: Iterable[Any]) -> None:
133+
"""`Добавление данных в таблицу` базы данных SQLite"""
134+
if not isinstance(data, tuple):
135+
data = tuple(data)
136+
colons_names = [i["name"] for i in self.get_colons_list(table_name)]
137+
self.request(
138+
"INSERT INTO {0} ({1}) VALUES({2});".format(
139+
table_name,
140+
",".join(colons_names),
141+
",".join(["?" for i in range(len(data))])
142+
),
143+
"w",
144+
data
145+
)
146+
147+
def add_datas(
148+
self,
149+
table_name: str,
150+
data: Iterable[Iterable[Any]]
151+
) -> None:
152+
"""`Многократное добавление данных в таблицу` базы данных SQLite"""
153+
for i in data:
154+
self.add_data(table_name, i)
155+
156+
def get_data(
157+
self,
158+
table_name: str,
159+
colon_name: str,
160+
value: Any
161+
) -> Union[list[tuple[Any]], list]:
162+
"""`Возращает данные из таблицы` базы данных SQLite"""
163+
return self.request("SELECT * FROM {0} WHERE {1} = ?;".format(table_name, colon_name), "r", [value])
164+
165+
def get_data_all(self, table_name: str) -> Union[list[tuple[Any]], list]:
166+
"""`Возращает все данные из таблицы` базы данных SQLite"""
167+
return self.request(f"SELECT * FROM \"{table_name}\";", "r")
168+
169+
def get_count_data(self, table_name: str) -> int:
170+
"""`Возращает количество строк в таблице` базы данных SQLite"""
171+
return self.request(f"SELECT COUNT(*) FROM \"{table_name}\";", "r")[0][0]
172+
173+
def exists_data(
174+
self,
175+
table_name: str,
176+
colon_name: str,
177+
value: Any
178+
) -> Union[tuple[Literal[False], None], tuple[Literal[True], list]]:
179+
"""`Проверяет наличия данных в таблице` базе данных SQLite"""
180+
data = self.request(f"SELECT * FROM \"{table_name}\" WHERE \"{colon_name}\"= ?;", "r", [value])
181+
anwer = (len(data) != 0)
182+
return anwer, (data[0] if (anwer) else None)
183+
184+
def update_data(
185+
self,
186+
table_name: str,
187+
colon_name: str,
188+
value: Any,
189+
new_value: dict[str, Any]
190+
) -> None:
191+
"""`Обновляет данные в таблице` базе данных SQLite"""
192+
invalue = list(new_value.items())
193+
for i in invalue:
194+
self.request(
195+
"UPDATE \"{0}\" SET \"{1}\" = ? where \"{2}\" = ?;".format(
196+
table_name, i[0], colon_name
197+
),
198+
"w",
199+
[i[1], value]
200+
)
201+
202+
def delete_data(self, table_name: str, colon_name: str, value: Any) -> None:
203+
"""`Удаление данных из таблицы` базы данных SQLite"""
204+
self.request(f"DELETE FROM {table_name} WHERE {colon_name}= ?;", "w", [value])
205+
206+
def exists_colon(self, table_name: str, colon_name: str) -> bool:
207+
"""Проверка наличия `колонки` в таблице базе данных SQLite"""
208+
cl = self.get_colons_list(table_name)
209+
for i in cl:
210+
if i["name"] == colon_name:
211+
return True
212+
return False
213+
214+
def exists_table(self, table_name: str) -> bool:
215+
"""Проверка наличия `таблицы` базы данных SQLite"""
216+
return (table_name in self.get_tables_list())

0 commit comments

Comments
 (0)