diff --git a/sql/tools/convertor.py b/sql/tools/convertor.py index f672cd73c1..7ab8ad1ef3 100644 --- a/sql/tools/convertor.py +++ b/sql/tools/convertor.py @@ -4,6 +4,14 @@ Author: dhb52 (https://gitee.com/dhb52) pip install simple-ddl-parser + +or with uv +uv run --with simple-ddl-parser convertor.py postgres > ../postgresql/ruoyi-vue-pro.sql 239ms  四 5/22 21:03:16 2025 +uv run --with simple-ddl-parser convertor.py sqlserver > ../sqlserver/ruoyi-vue-pro.sql +uv run --with simple-ddl-parser convertor.py kingbase > ../kingbase/ruoyi-vue-pro.sql +uv run --with simple-ddl-parser convertor.py opengauss > ../opengauss/ruoyi-vue-pro.sql +uv run --with simple-ddl-parser convertor.py oracle > ../oracle/ruoyi-vue-pro.sql +uv run --with simple-ddl-parser convertor.py dm8 > ../dm/ruoyi-vue-pro-dm8.sql """ import argparse @@ -38,6 +46,7 @@ def load_and_clean(sql_file: str) -> str: str: 清理后的sql文件内容 """ REPLACE_PAIR_LIST = ( + (")\nVALUES ", ") VALUES "), (" CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci ", " "), (" KEY `", " INDEX `"), ("UNIQUE INDEX", "UNIQUE KEY"), @@ -45,7 +54,7 @@ def load_and_clean(sql_file: str) -> str: ("b'1'", "'1'"), ) - content = open(sql_file).read() + content = open(sql_file, encoding="utf-8").read() for replace_pair in REPLACE_PAIR_LIST: content = content.replace(*replace_pair) content = re.sub(r"ENGINE.*COMMENT", "COMMENT", content) @@ -110,18 +119,28 @@ class Convertor(ABC): pass @abstractmethod - def gen_comment(self, table_sql: str, table_name: str) -> str: + def gen_comment(self, table_ddl: Dict) -> str: """生成字段/表注释 Args: - table_sql (str): 原始表SQL - table_name (str): 表名 + table_ddl (Dict): 表DDL Returns: str: 生成脚本 """ pass + @abstractmethod + def gen_uk(self, table_ddl: Dict) -> str: + """生成 + + Args: + table_ddl (Dict): 表DDL + + Returns: + str: 生成脚本 + """ + @abstractmethod def gen_insert(self, table_name: str) -> str: """生成 insert 语句块 @@ -178,6 +197,16 @@ class Convertor(ABC): table_name = ddl["table_name"].lower() yield f"CREATE INDEX idx_{table_name}_{no:02d} ON {table_name} ({columns})" + @staticmethod + def unique_index(ddl: Dict) -> Generator: + if "constraints" in ddl and "uniques" in ddl["constraints"]: + uk_list = ddl["constraints"]["uniques"] + for uk in uk_list: + table_name = ddl["table_name"] + uk_name = uk["constraint_name"] + uk_columns = uk["columns"] + yield table_name, uk_name, uk_columns + @staticmethod def filed_comments(table_sql: str) -> Generator: for line in table_sql.split("\n"): @@ -188,7 +217,7 @@ class Convertor(ABC): yield field, comment_string def table_comment(self, table_sql: str) -> str: - match = re.search(r"COMMENT \= '([^']+)';", table_sql) + match = re.search(r"COMMENT \='([^']+)';", table_sql) return match.group(1) if match else None def print(self): @@ -226,11 +255,21 @@ class Convertor(ABC): if table_name.lower().startswith("qrtz"): continue - # 为每个表生成个5个基本部分 + # 解析注释 + for column in table_ddl["columns"]: + column["comment"] = bytes(column["comment"], "utf-8").decode( + "unicode_escape" + )[1:-1] + table_ddl["comment"] = bytes(table_ddl["comment"], "utf-8").decode( + "unicode_escape" + )[1:-1] + + # 为每个表生成个6个基本部分 create = self.gen_create(table_ddl) pk = self.gen_pk(table_name) + uk = self.gen_uk(table_ddl) index = self.gen_index(table_ddl) - comment = self.gen_comment(table_sql, table_name) + comment = self.gen_comment(table_ddl) inserts = self.gen_insert(table_name) # 组合当前表的DDL脚本 @@ -238,6 +277,8 @@ class Convertor(ABC): {pk} +{uk} + {index} {comment} @@ -267,17 +308,19 @@ class PostgreSQLConvertor(Convertor): if type == "varchar": return f"varchar({size})" - if type == "int": + if type in ("int", "int unsigned"): return "int4" - if type == "bigint" or type == "bigint unsigned": + if type in ("bigint", "bigint unsigned"): return "int8" if type == "datetime": return "timestamp" + if type == "timestamp": + return f"timestamp({size})" if type == "bit": return "bool" if type in ("tinyint", "smallint"): return "int2" - if type == "text": + if type in ("text", "longtext"): return "text" if type in ("blob", "mediumblob"): return "bytea" @@ -316,18 +359,22 @@ CREATE TABLE {table_name} ( def gen_index(self, ddl: Dict) -> str: return "\n".join(f"{script};" for script in self.index(ddl)) - def gen_comment(self, table_sql: str, table_name: str) -> str: + def gen_comment(self, table_ddl: Dict) -> str: """生成字段及表的注释""" script = "" - for field, comment_string in self.filed_comments(table_sql): + for column in table_ddl["columns"]: + table_comment = column["comment"] script += ( - f"COMMENT ON COLUMN {table_name}.{field} IS '{comment_string}';" + "\n" + f"COMMENT ON COLUMN {table_ddl['table_name']}.{column['name']} IS '{table_comment}';" + + "\n" ) - table_comment = self.table_comment(table_sql) + table_comment = table_ddl["comment"] if table_comment: - script += f"COMMENT ON TABLE {table_name} IS '{table_comment}';\n" + script += ( + f"COMMENT ON TABLE {table_ddl['table_name']} IS '{table_comment}';\n" + ) return script @@ -335,6 +382,15 @@ CREATE TABLE {table_name} ( """生成主键定义""" return f"ALTER TABLE {table_name} ADD CONSTRAINT pk_{table_name} PRIMARY KEY (id);\n" + def gen_uk(self, table_ddl: Dict) -> str: + script = "" + uk_list = list(Convertor.unique_index(table_ddl)) + for idx, (table_name, _, uk_columns) in enumerate(uk_list, 1): + uk_name = f"uk_{table_name}_{idx:02d}" + script += f"CREATE UNIQUE INDEX {uk_name} ON {table_name} ({', '.join(uk_columns)});\n" + + return script + def gen_insert(self, table_name: str) -> str: """生成 insert 语句,以及根据最后的 insert id+1 生成 Sequence""" @@ -393,17 +449,19 @@ class OracleConvertor(Convertor): if type == "varchar": return f"varchar2({size if size < 4000 else 4000})" - if type == "int": + if type in ("int", "int unsigned"): return "number" if type == "bigint" or type == "bigint unsigned": return "number" if type == "datetime": return "date" + if type == "timestamp": + return f"timestamp({size})" if type == "bit": return "number(1,0)" if type in ("tinyint", "smallint"): return "smallint" - if type == "text": + if type in ("text", "longtext"): return "clob" if type in ("blob", "mediumblob"): return "blob" @@ -423,6 +481,8 @@ class OracleConvertor(Convertor): type = col["type"].lower() full_type = self.translate_type(type, col["size"]) nullable = "NULL" if col["nullable"] else "NOT NULL" + # Oracle的 INSERT '' 不能通过NOT NULL校验,因此对文字类型字段覆写为 NULL + nullable = "NULL" if type in ("varchar", "text", "longtext") else nullable default = f"DEFAULT {col['default']}" if col["default"] is not None else "" # Oracle 中 size 不能作为字段名 field_name = '"size"' if name == "size" else name @@ -447,16 +507,20 @@ CREATE TABLE {table_name} ( def gen_index(self, ddl: Dict) -> str: return "\n".join(f"{script};" for script in self.index(ddl)) - def gen_comment(self, table_sql: str, table_name: str) -> str: + def gen_comment(self, table_ddl: Dict) -> str: script = "" - for field, comment_string in self.filed_comments(table_sql): + for column in table_ddl["columns"]: + table_comment = column["comment"] script += ( - f"COMMENT ON COLUMN {table_name}.{field} IS '{comment_string}';" + "\n" + f"COMMENT ON COLUMN {table_ddl['table_name']}.{column['name']} IS '{table_comment}';" + + "\n" ) - table_comment = self.table_comment(table_sql) + table_comment = table_ddl["comment"] if table_comment: - script += f"COMMENT ON TABLE {table_name} IS '{table_comment}';\n" + script += ( + f"COMMENT ON TABLE {table_ddl['table_name']} IS '{table_comment}';\n" + ) return script @@ -464,6 +528,15 @@ CREATE TABLE {table_name} ( """生成主键定义""" return f"ALTER TABLE {table_name} ADD CONSTRAINT pk_{table_name} PRIMARY KEY (id);\n" + def gen_uk(self, table_ddl: Dict) -> str: + script = "" + uk_list = list(Convertor.unique_index(table_ddl)) + for idx, (table_name, _, uk_columns) in enumerate(uk_list, 1): + uk_name = f"uk_{table_name}_{idx:02d}" + script += f"CREATE UNIQUE INDEX {uk_name} ON {table_name} ({', '.join(uk_columns)});\n" + + return script + def gen_index(self, ddl: Dict) -> str: return "\n".join(f"{script};" for script in self.index(ddl)) @@ -521,17 +594,17 @@ class SQLServerConvertor(Convertor): if type == "varchar": return f"nvarchar({size if size < 4000 else 4000})" - if type == "int": + if type in ("int", "int unsigned"): return "int" - if type == "bigint" or type == "bigint unsigned": + if type in ("bigint", "bigint unsigned"): return "bigint" - if type == "datetime": + if type in ("datetime", "timestamp"): return "datetime2" if type == "bit": return "varchar(1)" if type in ("tinyint", "smallint"): return "tinyint" - if type == "text": + if type in ("text", "longtext"): return "nvarchar(max)" if type in ("blob", "mediumblob"): return "varbinary(max)" @@ -571,14 +644,18 @@ GO""" return script - def gen_comment(self, table_sql: str, table_name: str) -> str: + def gen_comment(self, table_ddl: Dict) -> str: """生成字段及表的注释""" script = "" + table_name = table_ddl["table_name"] + + for column in table_ddl["columns"]: + column_comment = column["comment"] + field = column["name"] - for field, comment_string in self.filed_comments(table_sql): script += f"""EXEC sp_addextendedproperty - 'MS_Description', N'{comment_string}', + 'MS_Description', N'{column_comment}', 'SCHEMA', N'dbo', 'TABLE', N'{table_name}', 'COLUMN', N'{field}' @@ -586,7 +663,7 @@ GO """ - table_comment = self.table_comment(table_sql) + table_comment = table_ddl["comment"] if table_comment: script += f"""EXEC sp_addextendedproperty 'MS_Description', N'{table_comment}', @@ -601,6 +678,15 @@ GO """生成主键定义""" return "" + def gen_uk(self, table_ddl: Dict) -> str: + script = "" + uk_list = list(Convertor.unique_index(table_ddl)) + for idx, (table_name, _, uk_columns) in enumerate(uk_list, 1): + uk_name = f"uk_{table_name}_{idx:02d}" + script += f"CREATE UNIQUE INDEX {uk_name} ON {table_name} ({', '.join(uk_columns)})\nGO" + + return script + def gen_index(self, ddl: Dict) -> str: """生成 index""" return "\n".join(f"{script}\nGO" for script in self.index(ddl)) @@ -674,22 +760,22 @@ class DM8Convertor(Convertor): if type == "varchar": return f"varchar({size})" - if type == "int": + if type in ("int", "int unsigned"): return "int" - if type == "bigint" or type == "bigint unsigned": + if type in ("bigint", "bigint unsigned"): return "bigint" if type == "datetime": return "datetime" + if type == "timestamp": + return f"timestamp({size})" if type == "bit": return "bit" if type in ("tinyint", "smallint"): return "smallint" - if type == "text": + if type in ("text", "longtext"): return "text" - if type == "blob": + if type in ("blob", "mediumblob"): return "blob" - if type == "mediumblob": - return "varchar(10240)" if type == "decimal": return ( f"decimal({','.join(str(s) for s in size)})" if len(size) else "decimal" @@ -724,19 +810,20 @@ CREATE TABLE {table_name} ( return script - def gen_index(self, ddl: Dict) -> str: - return "\n".join(f"{script};" for script in self.index(ddl)) - - def gen_comment(self, table_sql: str, table_name: str) -> str: + def gen_comment(self, table_ddl: Dict) -> str: script = "" - for field, comment_string in self.filed_comments(table_sql): + for column in table_ddl["columns"]: + table_comment = column["comment"] script += ( - f"COMMENT ON COLUMN {table_name}.{field} IS '{comment_string}';" + "\n" + f"COMMENT ON COLUMN {table_ddl['table_name']}.{column['name']} IS '{table_comment}';" + + "\n" ) - table_comment = self.table_comment(table_sql) + table_comment = table_ddl["comment"] if table_comment: - script += f"COMMENT ON TABLE {table_name} IS '{table_comment}';\n" + script += ( + f"COMMENT ON TABLE {table_ddl['table_name']} IS '{table_comment}';\n" + ) return script @@ -744,6 +831,15 @@ CREATE TABLE {table_name} ( """生成主键定义""" return "" + def gen_uk(self, table_ddl: Dict) -> str: + script = "" + uk_list = list(Convertor.unique_index(table_ddl)) + for idx, (table_name, _, uk_columns) in enumerate(uk_list, 1): + uk_name = f"uk_{table_name}_{idx:02d}" + script += f"CREATE UNIQUE INDEX {uk_name} ON {table_name} ({', '.join(uk_columns)});\n" + + return script + def gen_index(self, ddl: Dict) -> str: return "\n".join(f"{script};" for script in self.index(ddl)) @@ -784,6 +880,8 @@ class KingbaseConvertor(PostgreSQLConvertor): type = col["type"].lower() full_type = self.translate_type(type, col["size"]) nullable = "NULL" if col["nullable"] else "NOT NULL" + if full_type == "text": + nullable = "NULL" default = f"DEFAULT {col['default']}" if col["default"] is not None else "" return f"{name} {full_type} {nullable} {default}"