feat: 增加 UNIQUE KEY 支持

This commit is contained in:
dhb52 2025-05-22 22:27:26 +08:00
parent 3140ec8960
commit 4401c703ef
1 changed files with 142 additions and 44 deletions

View File

@ -4,6 +4,14 @@
Author: dhb52 (https://gitee.com/dhb52)
pip install simple-ddl-parser
or with uv
uv run --with simple-ddl-parser convertor.py postgres > ../postgresql/ruoyi-vue-pro.sql 239ms 5/22 21:03:16 2025
uv run --with simple-ddl-parser convertor.py sqlserver > ../sqlserver/ruoyi-vue-pro.sql
uv run --with simple-ddl-parser convertor.py kingbase > ../kingbase/ruoyi-vue-pro.sql
uv run --with simple-ddl-parser convertor.py opengauss > ../opengauss/ruoyi-vue-pro.sql
uv run --with simple-ddl-parser convertor.py oracle > ../oracle/ruoyi-vue-pro.sql
uv run --with simple-ddl-parser convertor.py dm8 > ../dm/ruoyi-vue-pro-dm8.sql
"""
import argparse
@ -38,6 +46,7 @@ def load_and_clean(sql_file: str) -> str:
str: 清理后的sql文件内容
"""
REPLACE_PAIR_LIST = (
(")\nVALUES ", ") VALUES "),
(" CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci ", " "),
(" KEY `", " INDEX `"),
("UNIQUE INDEX", "UNIQUE KEY"),
@ -45,7 +54,7 @@ def load_and_clean(sql_file: str) -> str:
("b'1'", "'1'"),
)
content = open(sql_file).read()
content = open(sql_file, encoding="utf-8").read()
for replace_pair in REPLACE_PAIR_LIST:
content = content.replace(*replace_pair)
content = re.sub(r"ENGINE.*COMMENT", "COMMENT", content)
@ -110,18 +119,28 @@ class Convertor(ABC):
pass
@abstractmethod
def gen_comment(self, table_sql: str, table_name: str) -> str:
def gen_comment(self, table_ddl: Dict) -> str:
"""生成字段/表注释
Args:
table_sql (str): 原始表SQL
table_name (str): 表名
table_ddl (Dict): 表DDL
Returns:
str: 生成脚本
"""
pass
@abstractmethod
def gen_uk(self, table_ddl: Dict) -> str:
"""生成
Args:
table_ddl (Dict): 表DDL
Returns:
str: 生成脚本
"""
@abstractmethod
def gen_insert(self, table_name: str) -> str:
"""生成 insert 语句块
@ -178,6 +197,16 @@ class Convertor(ABC):
table_name = ddl["table_name"].lower()
yield f"CREATE INDEX idx_{table_name}_{no:02d} ON {table_name} ({columns})"
@staticmethod
def unique_index(ddl: Dict) -> Generator:
if "constraints" in ddl and "uniques" in ddl["constraints"]:
uk_list = ddl["constraints"]["uniques"]
for uk in uk_list:
table_name = ddl["table_name"]
uk_name = uk["constraint_name"]
uk_columns = uk["columns"]
yield table_name, uk_name, uk_columns
@staticmethod
def filed_comments(table_sql: str) -> Generator:
for line in table_sql.split("\n"):
@ -188,7 +217,7 @@ class Convertor(ABC):
yield field, comment_string
def table_comment(self, table_sql: str) -> str:
match = re.search(r"COMMENT \= '([^']+)';", table_sql)
match = re.search(r"COMMENT \='([^']+)';", table_sql)
return match.group(1) if match else None
def print(self):
@ -226,11 +255,21 @@ class Convertor(ABC):
if table_name.lower().startswith("qrtz"):
continue
# 为每个表生成个5个基本部分
# 解析注释
for column in table_ddl["columns"]:
column["comment"] = bytes(column["comment"], "utf-8").decode(
"unicode_escape"
)[1:-1]
table_ddl["comment"] = bytes(table_ddl["comment"], "utf-8").decode(
"unicode_escape"
)[1:-1]
# 为每个表生成个6个基本部分
create = self.gen_create(table_ddl)
pk = self.gen_pk(table_name)
uk = self.gen_uk(table_ddl)
index = self.gen_index(table_ddl)
comment = self.gen_comment(table_sql, table_name)
comment = self.gen_comment(table_ddl)
inserts = self.gen_insert(table_name)
# 组合当前表的DDL脚本
@ -238,6 +277,8 @@ class Convertor(ABC):
{pk}
{uk}
{index}
{comment}
@ -267,17 +308,19 @@ class PostgreSQLConvertor(Convertor):
if type == "varchar":
return f"varchar({size})"
if type == "int":
if type in ("int", "int unsigned"):
return "int4"
if type == "bigint" or type == "bigint unsigned":
if type in ("bigint", "bigint unsigned"):
return "int8"
if type == "datetime":
return "timestamp"
if type == "timestamp":
return f"timestamp({size})"
if type == "bit":
return "bool"
if type in ("tinyint", "smallint"):
return "int2"
if type == "text":
if type in ("text", "longtext"):
return "text"
if type in ("blob", "mediumblob"):
return "bytea"
@ -316,18 +359,22 @@ CREATE TABLE {table_name} (
def gen_index(self, ddl: Dict) -> str:
return "\n".join(f"{script};" for script in self.index(ddl))
def gen_comment(self, table_sql: str, table_name: str) -> str:
def gen_comment(self, table_ddl: Dict) -> str:
"""生成字段及表的注释"""
script = ""
for field, comment_string in self.filed_comments(table_sql):
for column in table_ddl["columns"]:
table_comment = column["comment"]
script += (
f"COMMENT ON COLUMN {table_name}.{field} IS '{comment_string}';" + "\n"
f"COMMENT ON COLUMN {table_ddl['table_name']}.{column['name']} IS '{table_comment}';"
+ "\n"
)
table_comment = self.table_comment(table_sql)
table_comment = table_ddl["comment"]
if table_comment:
script += f"COMMENT ON TABLE {table_name} IS '{table_comment}';\n"
script += (
f"COMMENT ON TABLE {table_ddl['table_name']} IS '{table_comment}';\n"
)
return script
@ -335,6 +382,15 @@ CREATE TABLE {table_name} (
"""生成主键定义"""
return f"ALTER TABLE {table_name} ADD CONSTRAINT pk_{table_name} PRIMARY KEY (id);\n"
def gen_uk(self, table_ddl: Dict) -> str:
script = ""
uk_list = list(Convertor.unique_index(table_ddl))
for idx, (table_name, _, uk_columns) in enumerate(uk_list, 1):
uk_name = f"uk_{table_name}_{idx:02d}"
script += f"CREATE UNIQUE INDEX {uk_name} ON {table_name} ({', '.join(uk_columns)});\n"
return script
def gen_insert(self, table_name: str) -> str:
"""生成 insert 语句,以及根据最后的 insert id+1 生成 Sequence"""
@ -393,17 +449,19 @@ class OracleConvertor(Convertor):
if type == "varchar":
return f"varchar2({size if size < 4000 else 4000})"
if type == "int":
if type in ("int", "int unsigned"):
return "number"
if type == "bigint" or type == "bigint unsigned":
return "number"
if type == "datetime":
return "date"
if type == "timestamp":
return f"timestamp({size})"
if type == "bit":
return "number(1,0)"
if type in ("tinyint", "smallint"):
return "smallint"
if type == "text":
if type in ("text", "longtext"):
return "clob"
if type in ("blob", "mediumblob"):
return "blob"
@ -423,6 +481,8 @@ class OracleConvertor(Convertor):
type = col["type"].lower()
full_type = self.translate_type(type, col["size"])
nullable = "NULL" if col["nullable"] else "NOT NULL"
# Oracle的 INSERT '' 不能通过NOT NULL校验因此对文字类型字段覆写为 NULL
nullable = "NULL" if type in ("varchar", "text", "longtext") else nullable
default = f"DEFAULT {col['default']}" if col["default"] is not None else ""
# Oracle 中 size 不能作为字段名
field_name = '"size"' if name == "size" else name
@ -447,16 +507,20 @@ CREATE TABLE {table_name} (
def gen_index(self, ddl: Dict) -> str:
return "\n".join(f"{script};" for script in self.index(ddl))
def gen_comment(self, table_sql: str, table_name: str) -> str:
def gen_comment(self, table_ddl: Dict) -> str:
script = ""
for field, comment_string in self.filed_comments(table_sql):
for column in table_ddl["columns"]:
table_comment = column["comment"]
script += (
f"COMMENT ON COLUMN {table_name}.{field} IS '{comment_string}';" + "\n"
f"COMMENT ON COLUMN {table_ddl['table_name']}.{column['name']} IS '{table_comment}';"
+ "\n"
)
table_comment = self.table_comment(table_sql)
table_comment = table_ddl["comment"]
if table_comment:
script += f"COMMENT ON TABLE {table_name} IS '{table_comment}';\n"
script += (
f"COMMENT ON TABLE {table_ddl['table_name']} IS '{table_comment}';\n"
)
return script
@ -464,6 +528,15 @@ CREATE TABLE {table_name} (
"""生成主键定义"""
return f"ALTER TABLE {table_name} ADD CONSTRAINT pk_{table_name} PRIMARY KEY (id);\n"
def gen_uk(self, table_ddl: Dict) -> str:
script = ""
uk_list = list(Convertor.unique_index(table_ddl))
for idx, (table_name, _, uk_columns) in enumerate(uk_list, 1):
uk_name = f"uk_{table_name}_{idx:02d}"
script += f"CREATE UNIQUE INDEX {uk_name} ON {table_name} ({', '.join(uk_columns)});\n"
return script
def gen_index(self, ddl: Dict) -> str:
return "\n".join(f"{script};" for script in self.index(ddl))
@ -521,17 +594,17 @@ class SQLServerConvertor(Convertor):
if type == "varchar":
return f"nvarchar({size if size < 4000 else 4000})"
if type == "int":
if type in ("int", "int unsigned"):
return "int"
if type == "bigint" or type == "bigint unsigned":
if type in ("bigint", "bigint unsigned"):
return "bigint"
if type == "datetime":
if type in ("datetime", "timestamp"):
return "datetime2"
if type == "bit":
return "varchar(1)"
if type in ("tinyint", "smallint"):
return "tinyint"
if type == "text":
if type in ("text", "longtext"):
return "nvarchar(max)"
if type in ("blob", "mediumblob"):
return "varbinary(max)"
@ -571,14 +644,18 @@ GO"""
return script
def gen_comment(self, table_sql: str, table_name: str) -> str:
def gen_comment(self, table_ddl: Dict) -> str:
"""生成字段及表的注释"""
script = ""
table_name = table_ddl["table_name"]
for column in table_ddl["columns"]:
column_comment = column["comment"]
field = column["name"]
for field, comment_string in self.filed_comments(table_sql):
script += f"""EXEC sp_addextendedproperty
'MS_Description', N'{comment_string}',
'MS_Description', N'{column_comment}',
'SCHEMA', N'dbo',
'TABLE', N'{table_name}',
'COLUMN', N'{field}'
@ -586,7 +663,7 @@ GO
"""
table_comment = self.table_comment(table_sql)
table_comment = table_ddl["comment"]
if table_comment:
script += f"""EXEC sp_addextendedproperty
'MS_Description', N'{table_comment}',
@ -601,6 +678,15 @@ GO
"""生成主键定义"""
return ""
def gen_uk(self, table_ddl: Dict) -> str:
script = ""
uk_list = list(Convertor.unique_index(table_ddl))
for idx, (table_name, _, uk_columns) in enumerate(uk_list, 1):
uk_name = f"uk_{table_name}_{idx:02d}"
script += f"CREATE UNIQUE INDEX {uk_name} ON {table_name} ({', '.join(uk_columns)})\nGO"
return script
def gen_index(self, ddl: Dict) -> str:
"""生成 index"""
return "\n".join(f"{script}\nGO" for script in self.index(ddl))
@ -674,22 +760,22 @@ class DM8Convertor(Convertor):
if type == "varchar":
return f"varchar({size})"
if type == "int":
if type in ("int", "int unsigned"):
return "int"
if type == "bigint" or type == "bigint unsigned":
if type in ("bigint", "bigint unsigned"):
return "bigint"
if type == "datetime":
return "datetime"
if type == "timestamp":
return f"timestamp({size})"
if type == "bit":
return "bit"
if type in ("tinyint", "smallint"):
return "smallint"
if type == "text":
if type in ("text", "longtext"):
return "text"
if type == "blob":
if type in ("blob", "mediumblob"):
return "blob"
if type == "mediumblob":
return "varchar(10240)"
if type == "decimal":
return (
f"decimal({','.join(str(s) for s in size)})" if len(size) else "decimal"
@ -724,19 +810,20 @@ CREATE TABLE {table_name} (
return script
def gen_index(self, ddl: Dict) -> str:
return "\n".join(f"{script};" for script in self.index(ddl))
def gen_comment(self, table_sql: str, table_name: str) -> str:
def gen_comment(self, table_ddl: Dict) -> str:
script = ""
for field, comment_string in self.filed_comments(table_sql):
for column in table_ddl["columns"]:
table_comment = column["comment"]
script += (
f"COMMENT ON COLUMN {table_name}.{field} IS '{comment_string}';" + "\n"
f"COMMENT ON COLUMN {table_ddl['table_name']}.{column['name']} IS '{table_comment}';"
+ "\n"
)
table_comment = self.table_comment(table_sql)
table_comment = table_ddl["comment"]
if table_comment:
script += f"COMMENT ON TABLE {table_name} IS '{table_comment}';\n"
script += (
f"COMMENT ON TABLE {table_ddl['table_name']} IS '{table_comment}';\n"
)
return script
@ -744,6 +831,15 @@ CREATE TABLE {table_name} (
"""生成主键定义"""
return ""
def gen_uk(self, table_ddl: Dict) -> str:
script = ""
uk_list = list(Convertor.unique_index(table_ddl))
for idx, (table_name, _, uk_columns) in enumerate(uk_list, 1):
uk_name = f"uk_{table_name}_{idx:02d}"
script += f"CREATE UNIQUE INDEX {uk_name} ON {table_name} ({', '.join(uk_columns)});\n"
return script
def gen_index(self, ddl: Dict) -> str:
return "\n".join(f"{script};" for script in self.index(ddl))
@ -784,6 +880,8 @@ class KingbaseConvertor(PostgreSQLConvertor):
type = col["type"].lower()
full_type = self.translate_type(type, col["size"])
nullable = "NULL" if col["nullable"] else "NOT NULL"
if full_type == "text":
nullable = "NULL"
default = f"DEFAULT {col['default']}" if col["default"] is not None else ""
return f"{name} {full_type} {nullable} {default}"