ALTER TABLE(Add Column)
声明:本文的部分内容参考了他人的文章。在编写过程中,我们尊重他人的知识产权和学术成果,力求遵循合理使用原则,并在适用的情况下注明引用来源。
本文主要参考了 OpenGauss5.1.0 的开源代码和《OpenGauss数据库源码解析》一书
概述
ALTER TABLE 命令中的 Add Column 子命令用于向已存在的数据库表中添加新的列。这个过程涉及到更新表的元数据,以及可能对表的物理结构进行调整,特别是在需要初始化新列的默认值时。在数据库系统中,执行 Add Column 操作通常需要考虑锁定机制以避免与其他数据库操作冲突,并且必须确保数据的完整性和一致性不被破坏。此外,添加列的操作还可能触发相关的事件触发器,这些触发器可以用于执行一些定制的逻辑,如自动更新相关视图或维护日志等。整个过程需要高效地管理,以最小化对数据库性能的影响,特别是在涉及大量数据或高并发环境下。
ATController 函数
ATController 函数属于 openGauss 数据库系统中处理 ALTER TABLE 语句的核心控制逻辑。此函数主要负责协调 ALTER TABLE 操作的各个阶段,包括检查命令、准备命令、更新系统目录、分布式系统的特定处理以及实际的表重写。
这个函数主要负责处理 ALTER TABLE 命令的执行,涵盖了从命令检查、预处理,到系统目录更新和表的实际修改。在分布式环境中(如 PGXC,PostgreSQL-XC),还处理数据的重新分配,确保在分布式节点间数据一致性和表结构的同步。通过多阶段的设计,确保了操作的原子性和一致性,同时通过精细的错误检查和条件处理,保持了系统的健壮性。函数源码如下所示(路径:src\gausskernel\optimizer\commands\tablecmds.cpp
)
// 定义函数ATController,参数包括解析树、关系、命令列表、是否递归和锁模式
static void ATController(AlterTableStmt *parsetree, Relation rel, List* cmds, bool recurse, LOCKMODE lockmode)
{
List* wqueue = NIL; // 工作队列,用于存储处理过程中的临时信息
ListCell* lcmd = NULL; // 用于遍历命令列表的循环变量
#ifdef PGXC
RedistribState* redistribState = NULL; // 分布式状态,用于处理数据重新分配
bool doRedistribute = false; // 标记是否需要进行数据重新分配
#endif
// 第一阶段:对命令进行初步检查,创建工作队列
foreach (lcmd, cmds) {
AlterTableCmd* cmd = (AlterTableCmd*)lfirst(lcmd); // 从命令列表中取出命令
#ifdef PGXC
// 在集群中检查ALTER TABLE的限制
ATCheckCmd(rel, cmd);
#endif
// 检查账本表命令的兼容性
ATCheckLedgerTableCmd(rel, cmd);
// 准备命令,包括验证和设置工作队列
ATPrepCmd(&wqueue, rel, cmd, recurse, false, lockmode);
}
#ifdef PGXC
// 仅在本地协调器上检查
if (IS_PGXC_COORDINATOR) {
ListCell* ltab = NULL;
// 对于父表进行数据重新分配处理,不适用于子表或递归情况
foreach (ltab, wqueue) {
AlteredTableInfo* tab = (AlteredTableInfo*)lfirst(ltab);
// 如果当前关系是处理队列中的关系且有重新分配命令
if (RelationGetRelid(rel) == tab->relid && list_length(tab->subcmds[AT_PASS_DISTRIB]) > 0) {
// 检查是否有与重新分配不兼容的命令
doRedistribute = true;
if (!IsConnFromCoord()) {
if (list_length(tab->subcmds[AT_PASS_ADD_COL]) > 0 || list_length(tab->subcmds[AT_PASS_DROP]) > 0 ||
list_length(tab->subcmds[AT_PASS_ALTER_TYPE]) > 0 ||
list_length(tab->subcmds[AT_PASS_OLD_CONSTR]) > 0 ||
list_length(tab->subcmds[AT_PASS_COL_ATTRS]) > 0 ||
list_length(tab->subcmds[AT_PASS_ADD_INDEX]) > 0 ||
list_length(tab->subcmds[AT_PASS_ADD_CONSTR]) > 0 ||
list_length(tab->subcmds[AT_PASS_MISC]) > 0)
ereport(ERROR, // 报错,操作不兼容
(errcode(ERRCODE_STATEMENT_TOO_COMPLEX),
errmsg("Incompatible operation with data redistribution")));
// 扫描重新分配命令并优化操作
redistribState = BuildRedistribCommands(RelationGetRelid(rel), tab->subcmds[AT_PASS_DISTRIB]);
}
break;
}
}
}
#endif
// 关闭关系,但直到提交前保持锁
relation_close(rel, NoLock);
// 第二阶段:更新系统目录
ATRewriteCatalogs(&wqueue, lockmode);
#ifdef PGXC
// 无效缓存以重新分配关系
if (doRedistribute) {
Relation rel2 = relation_open(RelationGetRelid(rel), NoLock);
// 使与此关系相关的所有条目无效
CacheInvalidateRelcache(rel2);
// 确保重新构建定位信息
RelationCacheInvalidateEntry(RelationGetRelid(rel));
relation_close(rel2, NoLock);
}
// 如果有重新分配状态,则释放它
if (redistribState != NULL)
FreeRedistribState(redistribState);
#endif
// 第三阶段:扫描/重写表
ATRewriteTables(parsetree, &wqueue, lockmode);
}
ATPrepCmd 函数
函数 ATPrepCmd 是用于处理 ALTER TABLE 命令在执行前的预处理的关键函数。这个函数作为“交通警察”(Traffic cop)角色,负责第一阶段的操作,包括简单的递归处理、权限检查、以及准备命令的进一步执行。它通过分析传入的 AlterTableCmd 命令来决定如何处理各种 ALTER TABLE 子命令,如添加列、添加分区、修改列类型等,并将这些命令根据类型分配到不同的处理队列中。
具体来说,这个函数首先为每个表创建或查找一个工作队列条目,复制原始子命令以避免在处理不同子表时出现解析冲突。然后,根据子命令的类型,执行相应的权限检查,并根据需要对子表进行递归处理。对于每种类型的命令,函数会调用相应的预处理函数(如 ATPrepAddColumn、ATPrepDropColumn 等),并根据命令的性质决定是否在执行阶段进行递归。最后,根据命令的具体类型,将命令添加到工作队列的相应部分,以便在第二阶段进行系统目录的更新和实际的表更改。函数源码如下所示(路径:src\gausskernel\optimizer\commands\tablecmds.cpp
)
/*
* ATPrepCmd
*
* Traffic cop for ALTER TABLE Phase 1 operations, including simple
* recursion and permission checks.
*
* Caller must have acquired appropriate lock type on relation already.
* This lock should be held until commit.
*/
static void ATPrepCmd(List** wqueue, Relation rel, AlterTableCmd* cmd, bool recurse, bool recursing, LOCKMODE lockmode,
bool isDeltaTable)
{
AlteredTableInfo* tab = NULL;
int pass;
/* Find or create work queue entry for this table */
tab = ATGetQueueEntry(wqueue, rel, isDeltaTable);
/*
* Copy the original subcommand for each table. This avoids conflicts
* when different child tables need to make different parse
* transformations (for example, the same column may have different column
* numbers in different children).
*/
cmd = (AlterTableCmd*)copyObject(cmd);
/*
* Do permissions checking, recursion to child tables if needed, and any
* additional phase-1 processing needed.
*/
switch (cmd->subtype) {
case AT_AddColumn: /* ADD COLUMN */
ATSimplePermissions(rel, ATT_TABLE | ATT_COMPOSITE_TYPE | ATT_FOREIGN_TABLE | ATT_SEQUENCE);
ATPrepAddColumn(wqueue, tab, rel, recurse, recursing, cmd, lockmode);
/* Recursion occurs during execution phase */
pass = AT_PASS_ADD_COL;
break;
case AT_AddPartition: /* ADD PARTITION */
ATSimplePermissions(rel, ATT_TABLE);
ATPrepAddPartition(rel);
/* Recursion occurs during execution phase */
pass = AT_PASS_ADD_PARTITION;
break;
case AT_AddSubPartition: /* ADD SUBPARTITION */
ATSimplePermissions(rel, ATT_TABLE);
ATPrepAddSubPartition(rel);
/* ADD SUBPARTITION obeys the same recursion order with ADD PARTITION */
pass = AT_PASS_ADD_PARTITION;
break;
case AT_AddColumnToView: /* add column via CREATE OR REPLACE
* VIEW */
ATSimplePermissions(rel, ATT_VIEW);
ATPrepAddColumn(wqueue, NULL, rel, recurse, recursing, cmd, lockmode);
/* Recursion occurs during execution phase */
pass = AT_PASS_ADD_COL;
break;
case AT_ColumnDefault: /* ALTER COLUMN DEFAULT */
/*
* We allow defaults on views so that INSERT into a view can have
* default-ish behavior. This works because the rewriter
* substitutes default values into INSERTs before it expands
* rules.
*/
ATSimplePermissions(rel, ATT_TABLE | ATT_VIEW);
ATSimpleRecursion(wqueue, rel, cmd, recurse, lockmode);
ATPrepCheckDefault(cmd->def);
/* No command-specific prep needed */
pass = cmd->def ? AT_PASS_ADD_CONSTR : AT_PASS_DROP;
break;
case AT_DropNotNull: /* ALTER COLUMN DROP NOT NULL */
ATSimplePermissions(rel, ATT_TABLE | ATT_FOREIGN_TABLE);
ATSimpleRecursion(wqueue, rel, cmd, recurse, lockmode);
/* No command-specific prep needed */
pass = AT_PASS_DROP;
ATCheckNotNullConstr(cmd, tab);
break;
case AT_SetNotNull: /* ALTER COLUMN SET NOT NULL */
ATSimplePermissions(rel, ATT_TABLE | ATT_FOREIGN_TABLE);
ATSimpleRecursion(wqueue, rel, cmd, recurse, lockmode);
/* No command-specific prep needed */
pass = AT_PASS_ADD_CONSTR;
ATCheckNotNullConstr(cmd, tab);
break;
case AT_SetStatistics: /* ALTER COLUMN SET STATISTICS */
ATSimpleRecursion(wqueue, rel, cmd, recurse, lockmode);
/* Performs own permission checks */
ATPrepSetStatistics(rel);
pass = AT_PASS_MISC;
break;
case AT_AddStatistics: /* ADD STATISTICS */
case AT_DeleteStatistics: /* DELETE STATISTICS */
ATSimplePermissions(rel, ATT_TABLE | ATT_FOREIGN_TABLE);
ATSimpleRecursion(wqueue, rel, cmd, recurse, lockmode);
/* Performs own permission checks */
ATPrepSetStatistics(rel);
es_check_alter_table_statistics(rel, cmd);
pass = AT_PASS_MISC;
break;
case AT_SetOptions: /* ALTER COLUMN SET ( options ) */
case AT_ResetOptions: /* ALTER COLUMN RESET ( options ) */
ATSimplePermissions(rel, ATT_TABLE | ATT_MATVIEW | ATT_INDEX | ATT_FOREIGN_TABLE);
/* This command never recurses */
pass = AT_PASS_MISC;
break;
case AT_SetStorage: /* ALTER COLUMN SET STORAGE */
ATSimplePermissions(rel, ATT_TABLE | ATT_MATVIEW);
ATSimpleRecursion(wqueue, rel, cmd, recurse, lockmode);
/* No command-specific prep needed */
pass = AT_PASS_MISC;
break;
case AT_DropColumn: /* DROP COLUMN */
ATSimplePermissions(rel,
ATT_TABLE | ATT_COMPOSITE_TYPE | ATT_FOREIGN_TABLE |
(u_sess->attr.attr_common.IsInplaceUpgrade ? ATT_VIEW : ATT_NULL));
ATPrepDropColumn(wqueue, rel, recurse, recursing, cmd, lockmode);
/* Recursion occurs during execution phase */
pass = AT_PASS_DROP;
break;
case AT_DropPartition: /* DROP PARTITION */
ATSimplePermissions(rel, ATT_TABLE);
ATPrepDropPartition(rel);
/* Recursion occurs during execution phase */
pass = AT_PASS_DROP;
break;
case AT_DropSubPartition: /* DROP SUBPARTITION */
ATSimplePermissions(rel, ATT_TABLE);
ATPrepDropSubPartition(rel);
/* Recursion occurs during execution phase */
pass = AT_PASS_DROP;
break;
case AT_UnusableIndexPartition: /* UNUSEABLE INDEX PARTITION */
ATSimplePermissions(rel, ATT_INDEX);
ATPrepUnusableIndexPartition(rel);
/* Recursion occurs during execution phase */
pass = AT_PASS_MISC;
break;
case AT_UnusableAllIndexOnPartition: /* UNUSEABLE ALL INDEX ON PARTITION */
ATSimplePermissions(rel, ATT_TABLE | ATT_INDEX);
ATPrepUnusableAllIndexOnPartition(rel);
/* Recursion occurs during execution phase */
pass = AT_PASS_MISC;
break;
case AT_AddIndex: /* ADD INDEX */
ATSimplePermissions(rel, ATT_TABLE | ATT_MATVIEW | ATT_FOREIGN_TABLE);
/* This command never recurses */
/* No command-specific prep needed */
pass = AT_PASS_ADD_INDEX;
break;
case AT_AddConstraint: /* ADD CONSTRAINT */
ATSimplePermissions(rel, ATT_TABLE);
/* Recursion occurs during execution phase */
/* No command-specific prep needed except saving recurse flag */
if (recurse)
cmd->subtype = AT_AddConstraintRecurse;
pass = AT_PASS_ADD_CONSTR;
break;
case AT_AddIndexConstraint: /* ADD CONSTRAINT USING INDEX */
ATSimplePermissions(rel, ATT_TABLE);
/* This command never recurses */
/* No command-specific prep needed */
pass = AT_PASS_ADD_CONSTR;
break;
case AT_DropConstraint: /* DROP CONSTRAINT */
/* @hdfs
* ATSimplePermissions's second parameter is change from ATT_TABLE to
* ATT_TABLE|ATT_FOREIGN_TABLE to suppert droping HDFS foreign table.
*/
ATSimplePermissions(rel, ATT_TABLE | ATT_FOREIGN_TABLE);
/* Recursion occurs during execution phase */
/* No command-specific prep needed except saving recurse flag */
if (recurse)
cmd->subtype = AT_DropConstraintRecurse;
pass = AT_PASS_DROP;
break;
case AT_AlterColumnType: /* ALTER COLUMN TYPE */
ATSimplePermissions(rel, ATT_TABLE | ATT_COMPOSITE_TYPE | ATT_FOREIGN_TABLE);
/* Performs own recursion */
ATPrepAlterColumnType(wqueue, tab, rel, recurse, recursing, cmd, lockmode);
pass = AT_PASS_ALTER_TYPE;
ATCheckDuplicateColumn(cmd, tab->subcmds[pass]);
break;
case AT_AlterColumnGenericOptions:
ATSimplePermissions(rel, ATT_FOREIGN_TABLE);
/* This command never recurses */
/* No command-specific prep needed */
pass = AT_PASS_MISC;
break;
case AT_ChangeOwner: /* ALTER OWNER */
/* This command never recurses */
/* No command-specific prep needed */
pass = AT_PASS_MISC;
break;
case AT_ClusterOn: /* CLUSTER ON */
case AT_DropCluster: /* SET WITHOUT CLUSTER */
ATSimplePermissions(rel, ATT_TABLE | ATT_MATVIEW);
/* These commands never recurse */
/* No command-specific prep needed */
pass = AT_PASS_MISC;
break;
case AT_AddOids: /* SET WITH OIDS */
/*
* partitioned table can not be setted with or without oids
*/
if (RELATION_IS_PARTITIONED(rel)) {
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("cannot set with oids on partitioned table")));
}
ATSimplePermissions(rel, ATT_TABLE);
if (!rel->rd_rel->relhasoids || recursing)
ATPrepAddOids(wqueue, rel, recurse, cmd, lockmode);
/* Recursion occurs during execution phase */
pass = AT_PASS_ADD_COL;
break;
case AT_DropOids: /* SET WITHOUT OIDS */
/*
* partitioned table can not be setted with or without oids
*/
if (RELATION_IS_PARTITIONED(rel)) {
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("cannot set without oids on partitioned table")));
}
ATSimplePermissions(rel, ATT_TABLE);
/* Performs own recursion */
if (rel->rd_rel->relhasoids) {
AlterTableCmd* dropCmd = makeNode(AlterTableCmd);
dropCmd->subtype = AT_DropColumn;
dropCmd->name = pstrdup("oid");
dropCmd->behavior = cmd->behavior;
ATPrepCmd(wqueue, rel, dropCmd, recurse, false, lockmode);
}
pass = AT_PASS_DROP;
break;
case AT_SetTableSpace: /* SET TABLESPACE */
case AT_SetPartitionTableSpace:
ATSimplePermissions(rel, ATT_TABLE | ATT_INDEX | ATT_MATVIEW);
/* This command never recurses */
ATPrepSetTableSpace(tab, rel, cmd->name, lockmode);
pass = AT_PASS_MISC; /* doesn't actually matter */
break;
case AT_UnusableIndex:
case AT_SetRelOptions: /* SET (...) */
case AT_ResetRelOptions: /* RESET (...) */
case AT_ReplaceRelOptions: /* reset them all, then set just these */
case AT_InvisibleIndex:
case AT_VisibleIndex:
ATSimplePermissions(rel, ATT_TABLE | ATT_INDEX | ATT_VIEW);
/* This command never recurses */
/* No command-specific prep needed */
pass = AT_PASS_MISC;
break;
case AT_AddInherit: /* INHERIT */
ATSimplePermissions(rel, ATT_TABLE);
/* This command never recurses */
ATPrepAddInherit(rel);
pass = AT_PASS_MISC;
break;
case AT_ValidateConstraint: /* VALIDATE CONSTRAINT */
ATSimplePermissions(rel, ATT_TABLE);
/* Recursion occurs during execution phase */
/* No command-specific prep needed except saving recurse flag */
if (recurse)
cmd->subtype = AT_ValidateConstraintRecurse;
pass = AT_PASS_MISC;
break;
case AT_ReplicaIdentity: /* REPLICA IDENTITY ... */
ATSimplePermissions(rel, ATT_TABLE);
pass = AT_PASS_MISC;
/* This command never recurses */
/* No command-specific prep needed */
break;
case AT_EnableTrig: /* ENABLE TRIGGER variants */
case AT_EnableAlwaysTrig:
case AT_EnableReplicaTrig:
case AT_EnableTrigAll:
case AT_EnableTrigUser:
case AT_DisableTrig: /* DISABLE TRIGGER variants */
case AT_DisableTrigAll:
case AT_DisableTrigUser:
case AT_EnableRule: /* ENABLE/DISABLE RULE variants */
case AT_EnableAlwaysRule:
case AT_EnableReplicaRule:
case AT_DisableRule:
case AT_EnableRls: /* ENABLE/DISABLE ROW LEVEL SECURITY */
case AT_DisableRls:
case AT_ForceRls: /* FORCE/NO-FORCE ROW LEVEL SECURITY */
case AT_NoForceRls:
case AT_EncryptionKeyRotation:
case AT_DropInherit: /* NO INHERIT */
case AT_AddOf: /* OF */
case AT_DropOf: /* NOT OF */
case AT_SetAutoIncrement:
case AT_SetCharsetCollate:
ATSimplePermissions(rel, ATT_TABLE);
/* These commands never recurse */
/* No command-specific prep needed */
pass = AT_PASS_MISC;
break;
case AT_ConvertCharset:
ATSimplePermissions(rel, ATT_TABLE);
sqlcmd_alter_prep_convert_charset(tab, rel, cmd, lockmode);
pass = AT_PASS_MISC;
break;
case AT_GenericOptions:
ATSimplePermissions(rel, ATT_FOREIGN_TABLE);
/* No command-specific prep needed */
pass = AT_PASS_MISC;
break;
case AT_SET_COMPRESS:
ATSimplePermissions(rel, ATT_TABLE);
pass = AT_PASS_MISC;
break;
case AT_EnableRowMoveMent:
ATSimplePermissions(rel, ATT_TABLE);
ATPrepEnableRowMovement(rel);
pass = AT_PASS_MISC;
break;
case AT_DisableRowMoveMent:
ATSimplePermissions(rel, ATT_TABLE);
ATPrepDisableRowMovement(rel);
pass = AT_PASS_MISC;
break;
case AT_TruncatePartition:
ATPrepTruncatePartition(rel);
pass = AT_PASS_MISC;
break;
case AT_TruncateSubPartition:
ATPrepTruncateSubPartition(rel);
pass = AT_PASS_MISC;
break;
case AT_ExchangePartition:
ATSimplePermissions(rel, ATT_TABLE);
ATPrepExchangePartition(rel);
pass = AT_PASS_MISC;
break;
case AT_MergePartition:
ATSimplePermissions(rel, ATT_TABLE);
ATPrepMergePartition(rel);
pass = AT_PASS_MISC;
break;
case AT_SplitPartition:
ATSimplePermissions(rel, ATT_TABLE);
ATPrepSplitPartition(rel);
pass = AT_PASS_MISC;
break;
case AT_AddIntoCBI:
ATSimplePermissions(rel, ATT_INDEX);
pass = AT_PASS_MISC;
break;
case AT_SplitSubPartition:
ATSimplePermissions(rel, ATT_TABLE);
ATPrepSplitSubPartition(rel);
pass = AT_PASS_MISC;
break;
case AT_ResetPartitionno:
ATSimplePermissions(rel, ATT_TABLE);
ATPrepResetPartitionno(rel);
pass = AT_PASS_MISC;
break;
case AT_ModifyColumn:
ATSimplePermissions(rel, ATT_TABLE);
ATPrepAlterModifyColumn(wqueue, tab, rel, recurse, recursing, cmd, lockmode);
pass = AT_PASS_ALTER_TYPE;
ATAlterCheckModifiyColumnRepeatedly(cmd, tab->subcmds[pass]);
break;
#ifdef PGXC
case AT_DistributeBy:
case AT_SubCluster:
ATSimplePermissions(rel, ATT_TABLE | ATT_FOREIGN_TABLE | ATT_MATVIEW);
/* No command-specific prep needed */
pass = AT_PASS_DISTRIB;
break;
/* @hdfs
* The HDFS foreign table support 'ALTER FOREIGN TABLE ADD NODE/DELETE NODE' cmd.
*/
case AT_AddNodeList:
case AT_DeleteNodeList:
case AT_UpdateSliceLike:
ATSimplePermissions(rel, ATT_TABLE | ATT_FOREIGN_TABLE);
/* No command-specific prep needed */
pass = AT_PASS_DISTRIB;
break;
case AT_COMMENTS:
pass = AT_COMMENT;
break;
#endif
default: /* oops */
ereport(ERROR,
(errcode(ERRCODE_UNRECOGNIZED_NODE_TYPE),
errmsg("unrecognized alter table type: %d", (int)cmd->subtype)));
pass = 0; /* keep compiler quiet */
break;
}
/* Add the subcommand to the appropriate list for phase 2 */
tab->subcmds[pass] = lappend(tab->subcmds[pass], cmd);
}
ATRewriteCatalogs
emspATRewriteCatalogs 函数用于处理 ALTER TABLE 命令的第二阶段操作,即实际执行修改数据库目录的操作。该函数负责将 ALTER TABLE 命令中的各种子命令按照安全的顺序执行,如添加、修改列类型、删除等。通过并行处理每个表的不同修改步骤,确保了修改过程中数据一致性和执行效率。此外,还处理了特殊情况如自增索引的检查和 TOAST 表的创建,确保表的结构和数据完整性。函数源码如下所示(路径:src\gausskernel\optimizer\commands\tablecmds.cpp
)
static void ATRewriteCatalogs(List** wqueue, LOCKMODE lockmode)
{
int pass;
ListCell* ltab = NULL;
// 以多个阶段进行处理,确保可以正确处理依赖和并发问题
for (pass = 0; pass < AT_NUM_PASSES; pass++) {
// 遍历待处理的表列表
foreach (ltab, *wqueue) {
AlteredTableInfo* tab = (AlteredTableInfo*)lfirst(ltab); // 从列表中取出一个表的修改信息
List* subcmds = tab->subcmds[pass]; // 获取当前阶段对该表的子命令列表
Relation rel;
ListCell* lcmd = NULL;
if (subcmds == NIL) // 如果没有子命令,则跳过当前表
continue;
// 打开表,但不加锁,因为锁已在之前的阶段中获取
rel = relation_open(tab->relid, NoLock);
// 遍历并执行所有子命令
foreach (lcmd, subcmds)
ATExecCmd(wqueue, tab, rel, (AlterTableCmd*)lfirst(lcmd), lockmode);
// 如果当前是处理类型更改的阶段,执行清理工作
if (pass == AT_PASS_ALTER_TYPE && !tab->isDeltaTable)
ATPostAlterTypeCleanup(wqueue, tab, lockmode);
// 关闭对表的引用
relation_close(rel, NoLock);
}
}
// 再次遍历处理队列,检查是否需要添加 TOAST 表或其他后续处理
foreach (ltab, *wqueue) {
AlteredTableInfo* tab = (AlteredTableInfo*)lfirst(ltab);
// 更新生成的表达式
if (tab->is_first_after) {
UpdateGeneratedExpr(tab);
}
// 如果是全局临时表,创建存储文件
if (get_rel_persistence(tab->relid) == RELPERSISTENCE_GLOBAL_TEMP) {
gtt_create_storage_files(tab->relid);
}
// 检查是否需要为常规表或物化视图创建 TOAST 表
if ((tab->relkind == RELKIND_RELATION || tab->relkind == RELKIND_MATVIEW) &&
!u_sess->attr.attr_sql.enable_cluster_resize) {
Relation rel = relation_open(tab->relid, NoLock);
Datum toast_reloptions = (Datum)0;
if (rel->rd_options != NULL && RelationIsTableAccessMethodUStoreType(rel->rd_options)) {
List* optsList = NIL;
DefElem* def = makeDefElem(pstrdup("storage_type"),
(Node*)makeString((char*)(TABLE_ACCESS_METHOD_USTORE)));
optsList = lappend(optsList, def);
toast_reloptions = transformRelOptions((Datum)0, optsList, NULL, NULL, false, false);
}
AlterTableCreateToastTable(tab->relid, toast_reloptions);
relation_close(rel, NoLock);
}
// 检查自增索引
if (tab->relkind == RELKIND_RELATION) {
CheckRelAutoIncrementIndex(tab->relid, NoLock);
}
// 重新创建所有表触发器
foreach_cell(def_item, tab->changedTriggerDefs) {
char* cmd_str = (char*)lfirst(def_item);
List* raw_parsetree_list = raw_parser(cmd_str);
Node* stmt = (Node*)linitial(raw_parsetree_list);
Assert(IsA(stmt, CreateTrigStmt)); // 确保解析结果为创建触发器语句
(void)CreateTrigger(
(CreateTrigStmt*)stmt, cmd_str, InvalidOid, InvalidOid, InvalidOid, InvalidOid, false);
}
}
}
ATExecCmd 函数
ATExecCmd 函数负责根据子命令的类型将 ALTER TABLE 命令分派到适当的执行例程。这个函数是 ALTER TABLE 操作中的关键组成部分,用于确保各种表结构变更操作能够正确执行。
它首先会根据命令类型检查是否涉及到分区操作,并进行相应的处理。接着,它根据命令的子类型调用相应的处理函数。此外,函数还负责记录对象的修改时间、向事件触发器报告操作,以及管理命令执行过程中的锁和命令计数器,确保操作的原子性和可见性。这些都是确保数据库结构变更正确、高效执行的重要步骤。函数源码如下所示(路径:src\gausskernel\optimizer\commands\tablecmds.cpp
)
// 函数定义,包括工作队列、被修改的表信息、操作的表关系、具体的修改命令和锁模式
static void ATExecCmd(List** wqueue, AlteredTableInfo* tab, Relation rel, AlterTableCmd* cmd, LOCKMODE lockmode)
{
ObjectAddress address = InvalidObjectAddress; // 初始化对象地址,用于记录本次操作影响的数据库对象
elog(ES_LOGLEVEL, "[ATExecCmd] cmd subtype: %d", cmd->subtype); // 记录日志,显示命令的子类型
// 如果命令类型属于分区DDL命令,并且关系对象是一个分区表
if (PARTITION_DDL_CMD(cmd->subtype) && RELATION_IS_PARTITIONED(rel)) {
int partitionno = -GetCurrentPartitionNo(RelOidGetPartitionTupleid(rel->rd_id)); // 获取当前分区号
if (!PARTITIONNO_IS_VALID(partitionno)) { // 如果分区号无效
RelationResetPartitionno(rel->rd_id, ShareUpdateExclusiveLock); // 重置分区号
}
}
// 根据命令的子类型,分派到相应的处理例程
switch (cmd->subtype) {
case AT_AddColumn: /* 添加列 */
case AT_AddColumnToView: /* 通过 CREATE OR REPLACE VIEW 添加列 */
(后续代码省略)
default: /* 异常情况处理 */
ereport(ERROR,
(errcode(ERRCODE_UNRECOGNIZED_NODE_TYPE),
errmsg("unrecognized alter table type: %d", (int)cmd->subtype))); // 报告无法识别的表更改类型错误
break;
}
// 记录修改时间,这是为了保持对象的元数据是最新的
PgObjectType objectType = GetPgObjectTypePgClass(tab->relkind);
if (objectType != OBJECT_TYPE_INVALID) {
UpdatePgObjectMtime(tab->relid, objectType);
}
/*
* 报告子命令给感兴趣的事件触发器。
*/
EventTriggerCollectAlterTableSubcmd((Node *) cmd, address);
// 处理分区DDL命令中涉及的独占锁逻辑
#ifndef ENABLE_MULTIPLE_NODES
if (PARTITION_DDL_CMD(cmd->subtype)) {
AddPartitionDDLInfo(RelationGetRelid(rel));
}
#endif
/*
* 为了保证序列中的下一个子命令能看到到目前为止的变更,
* 增加命令计数器。
*/
CommandCounterIncrement();
}
ATExecAddColumn 函数
ATExecAddColumn 是 OpenGauss 数据库系统中用于处理 ALTER TABLE ADD COLUMN 操作的函数。这个函数的主要作用是将新列添加到现有的表中,同时确保这一变更符合数据库的完整性和约束要求。ATExecAddColumn 的实现考虑到了多种数据库表类型和存储格式,如行存储表、列存储表、外部表等,它涵盖了从权限检查、数据类型验证到索引和默认值处理的多个方面。
功能概述
以下是 ATExecAddColumn 函数的主要功能和处理步骤:
- 权限检查:
- 在添加列操作开始前,需要对操作的表进行权限检查,确认执行操作的用户具有足够的权限。这通常在函数的上层调用中完成(例如 ATPrepAddColumn)。
- 打开和锁定表:
- 函数首先需要打开目标表,并对其加锁以防止其他操作干扰。这个锁通常是排他锁,确保在添加列的过程中表结构不被其他操作修改。
- 处理列存储表的特殊情况:
- 对于列存储表,特别是 PAX 格式的表,添加列的操作与行存储表有所不同,可能需要特殊处理,如只更新目录而不实际重写数据。
- 数据类型和默认值处理:
- 确定新列的数据类型,包括从类型名称解析 OID、修饰符(typmod)和排序规则(collation)。
- 检查并处理列的默认值,如果新列定义中包含默认值,需要将这些默认值存储在系统目录中,同时考虑是否触发表的重写以包含新的默认值。
- 更新系统目录:
- 更新 pg_attribute 系统目录表以包含新列的元数据。
- 如果新列是 OID 类型,则还需更新 pg_class 来标记表现在包含 OID。
- 错误处理和特殊类型表的适配:
- 对于外部表或 MOT 类型的表,添加列的处理可能需要调用特定的外部表处理函数或 MOT 相关的函数。
- 对于分布式环境或时间序列数据库特有的表,如 Delta 表,需要进行额外的处理以确保表的结构一致性。
- 递归处理和继承:
- 如果表是继承结构的父表,需要递归地将新列添加到所有子表中。这一步骤需要处理继承计数和本地标志,以确保继承逻辑的正确性。
- 处理表的注释和额外属性:
- 为新列添加注释和其他相关属性。
- 完成操作和解锁:
- 在所有必要的更改完成后,需要更新内部缓存和索引,以确保数据库的内部状态与实际数据保持一致。
- 最后,解锁表并关闭与之相关的系统表。
函数源码如下所示(路径:src\gausskernel\optimizer\commands\tablecmds.cpp
)
// 定义一个函数,用于处理添加列操作,接收多个参数,包括工作队列、表信息、表关系对象、列定义、是否为OID列等
static ObjectAddress ATExecAddColumn(List** wqueue, AlteredTableInfo* tab, Relation rel, ColumnDef* colDef, bool isOid,
bool recurse, bool recursing, bool is_first, char *after_name, LOCKMODE lockmode) {
// 获取表的 OID
Oid myrelid = RelationGetRelid(rel);
// 定义关系变量
Relation pgclass = NULL;
Relation attrdesc = NULL;
Relation cedesc = NULL;
// 堆元组变量初始化
HeapTuple reltup = NULL;
// 定义一个新的 pg_attribute 表单数据结构
FormData_pg_attribute attribute;
// 新列号和当前列号变量初始化
int newattnum = 0;
int currattnum = 0;
// 表类型
char relkind;
// 类型元组和类型 OID
HeapTuple typeTuple;
Oid typeOid = InvalidOid;
// 类型修饰符和排序规则 OID
int32 typmod = -1;
Oid collOid = InvalidOid;
// 类型表单和默认值表达式
Form_pg_type tform = NULL;
Expr* defval = NULL;
// 子表列表和遍历用的 ListCell
List* children = NIL;
ListCell* child = NULL;
// 访问控制结果
AclResult aclresult;
// 检测是否为列存储表
bool isDfsTable = RelationIsPAXFormat(rel);
// 对象地址初始化
ObjectAddress address;
// 判断是否为首列添加或指定位置添加
bool is_addloc = is_first || after_name != NULL;
// 查询字符串列表初始化,用于执行动态 SQL
List *query_str = NIL;
// 若为递归调用,则进行简单的权限检查
if (recursing)
ATSimplePermissions(rel, ATT_TABLE);
// 打开 pg_attribute 表
attrdesc = heap_open(AttributeRelationId, RowExclusiveLock);
// 对于首列添加或指定位置添加,进行特定的错误处理
if (is_addloc) {
if (rel->rd_rel->relkind == RELKIND_FOREIGN_TABLE) {
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("Unsupported feature"),
errdetail("foreign table is not supported for add column first|after columnName")));
}
if (RelationIsColumnFormat(rel)) {
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("Unsupported feature"),
errdetail("column orientated table is not supported for add column first|after columnName")));
}
}
// 如果添加的是加密列
CeHeapInfo* ceHeapInfo = NULL;
if (colDef->clientLogicColumnRef != NULL) {
if (is_addloc) {
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("Unsupported feature"),
errdetail("encryption column is not supported for add column first|after columnName")));
}
if (colDef->clientLogicColumnRef != NULL) {
ceHeapInfo = (CeHeapInfo *)palloc(sizeof(CeHeapInfo));
process_encrypted_columns(colDef, ceHeapInfo);
}
cedesc = heap_open(ClientLogicCachedColumnsId, RowExclusiveLock);
}
// 检查是否在递归子表中添加列,若是,则检查是否需要与现有定义合并
if (colDef->inhcount > 0) {
HeapTuple tuple;
// 在子表中查找是否已存在同名列
tuple = SearchSysCacheCopyAttName(myrelid, colDef->colname);
if (HeapTupleIsValid(tuple)) {
Form_pg_attribute childatt = (Form_pg_attribute)GETSTRUCT(tuple);
// 子列必须在类型、类型修饰符和排序规则上与新列匹配
Oid ctypeId = InvalidOid;
int32 ctypmod = -1;
Oid ccollid = InvalidOid;
typenameTypeIdAndMod(NULL, colDef->typname, &ctypeId, &ctypmod);
if (ctypeId != childatt->atttypid || ctypmod != childatt->atttypmod)
ereport(ERROR,
(errcode(ERRCODE_DATATYPE_MISMATCH),
errmsg("child table \"%s\" has different type for column \"%s\"",
RelationGetRelationName(rel),
colDef->colname)));
ccollid = GetColumnDefCollation(NULL, colDef, ctypeId);
if (ccollid != childatt->attcollation)
ereport(ERROR,
(errcode(ERRCODE_COLLATION_MISMATCH),
errmsg("child table \"%s\" has different collation for column \"%s\"",
RelationGetRelationName(rel),
colDef->colname),
errdetail("\"%s\" versus \"%s\"",
get_collation_name(ccollid),
get_collation_name(childatt->attcollation))));
// 如果是 OID 列,子列也必须是 OID
if (isOid && childatt->attnum != ObjectIdAttributeNumber)
ereport(ERROR,
(errcode(ERRCODE_DATATYPE_MISMATCH),
errmsg("child table \"%s\" has a conflicting \"%s\" column",
RelationGetRelationName(rel),
colDef->colname)));
// 增加现有子列的 inhcount
childatt->attinhcount++;
simple_heap_update(attrdesc, &tuple->t_self, tuple);
CatalogUpdateIndexes(attrdesc, tuple);
tableam_tops_free_tuple(tuple);
// 通知用户关于合并的信息
ereport(NOTICE,
(errmsg("merging definition of column \"%s\" for child \"%s\"",
colDef->colname,
RelationGetRelationName(rel))));
// 关闭 pg_attribute 表并返回
heap_close(attrdesc, RowExclusiveLock);
return InvalidObjectAddress;
}
}
// 打开 pg_class 表
pgclass = heap_open(RelationRelationId, RowExclusiveLock);
// 从系统缓存中复制 pg_class 元组
reltup = SearchSysCacheCopy1(RELOID, ObjectIdGetDatum(myrelid));
if (!HeapTupleIsValid(reltup)) {
ereport(ERROR, (errcode(ERRCODE_CACHE_LOOKUP_FAILED), errmsg("cache lookup failed for relation %u", myrelid)));
}
relkind = ((Form_pg_class)GETSTRUCT(reltup))->relkind;
// 检查新名称是否已存在
check_for_column_name_collision(rel, colDef->colname);
// 确定新属性的编号
if (isOid) {
newattnum = ObjectIdAttributeNumber;
} else {
currattnum = ((Form_pg_class)GETSTRUCT(reltup))->relnatts;
if (currattnum + 1 > MaxHeapAttributeNumber) {
ereport(ERROR,
(errcode(ERRCODE_TOO_MANY_COLUMNS),
errmsg("tables can have at most %d columns", MaxHeapAttributeNumber)));
}
if (is_first) {
newattnum = 1;
} else if (after_name != NULL) {
newattnum = GetAfterColumnAttnum(myrelid, after_name);
} else {
newattnum = currattnum + 1;
}
}
// 获取类型元组和设置类型 OID
typeTuple = typenameType(NULL, colDef->typname, &typmod);
tform = (Form_pg_type)GETSTRUCT(typeTuple);
typeOid = HeapTupleGetOid(typeTuple);
// 获取列的排序规则
Oid rel_coll_oid = rel->rd_options == NULL ? InvalidOid : ((StdRdOptions*)(rel)->rd_options)->collate;
collOid = GetColumnDefCollation(NULL, colDef, typeOid, rel_coll_oid);
if (DB_IS_CMPT(B_FORMAT)) {
typeOid = binary_need_transform_typeid(typeOid, &collOid);
if (RelationIsColStore(rel) || RelationIsTsStore(rel)) {
check_unsupported_charset_for_column(collOid, colDef->colname);
}
}
// 检查类型使用权限
aclresult = pg_type_aclcheck(typeOid, GetUserId(), ACL_USAGE);
if (aclresult != ACLCHECK_OK)
aclcheck_error_type(aclresult, typeOid);
// 确保数据类型适用于列
CheckAttributeType(colDef->colname, typeOid, collOid, list_make1_oid(rel->rd_rel->reltype), false);
#ifdef ENABLE_MOT
if (relkind == RELKIND_FOREIGN_TABLE && isMOTFromTblOid(RelationGetRelid(rel))) {
DetermineColumnCollationForMOTTable(&collOid);
}
#endif
// 检查是否为 Delta 表
bool isDelta = RELATION_IS_DELTA(rel);
// 构造新属性的 pg_attribute 条目
attribute.attrelid = myrelid;
(void)namestrcpy(&(attribute.attname), colDef->colname);
attribute.atttypid = typeOid;
attribute.attstattarget = (newattnum > 0) ? -1 : 0;
attribute.attlen = tform->typlen;
attribute.attcacheoff = -1;
attribute.atttypmod = typmod;
attribute.attnum = newattnum;
attribute.attbyval = tform->typbyval;
attribute.attndims = list_length(colDef->typname->arrayBounds);
attribute.attstorage = tform->typstorage;
attribute.attalign = tform->typalign;
attribute.attnotnull = colDef->is_not_null;
attribute.atthasdef = false;
attribute.attisdropped = false;
attribute.attislocal = colDef->is_local;
attribute.attkvtype = colDef->kvtype;
if (!isDelta) {
VerifyAttrCompressMode(colDef->cmprs_mode, attribute.attlen, colDef->colname);
attribute.attcmprmode = colDef->cmprs_mode;
} else {
attribute.attcmprmode = ATT_CMPR_NOCOMPRESS;
}
attribute.attinhcount = colDef->inhcount;
attribute.attcollation = collOid;
// attribute.attacl 是通过 InsertPgAttributeTuple 处理的
ReleaseSysCache(typeTuple);
// 如果是行存储表且不支持压缩
if (!isDelta && RelationIsRowFormat(rel) && ATT_CMPR_NOCOMPRESS < colDef->cmprs_mode
&& colDef->cmprs_mode <= ATT_CMPR_NUMSTR) {
ereport(ERROR,
(errcode(ERRCODE_INVALID_TABLE_DEFINITION),
errmsg("row-oriented table does not support compression")));
}
// 如果是首列添加或指定位置添加,更新 pg_attribute 和其他相关表
if (is_addloc) {
UpdatePgAttributeFirstAfter(attrdesc, myrelid, newattnum, currattnum, true);
UpdatePgDescriptionFirstAfter(rel, newattnum, currattnum, true);
UpdatePgIndexFirstAfter(rel, newattnum, currattnum, true);
UpdatePgConstraintFirstAfter(rel, newattnum, currattnum, true);
UpdatePgConstraintConfkeyFirstAfter(rel, newattnum, currattnum, true);
UpdatePgAttrdefFirstAfter(rel, newattnum, currattnum, true);
UpdatePgPartitionFirstAfter(rel, newattnum, currattnum, true, false, NULL);
UpdatePgTriggerFirstAfter(rel, newattnum, currattnum, true);
UpdatePgRlspolicyFirstAfter(rel, newattnum, currattnum, true);
query_str = CheckPgRewriteFirstAfter(rel);
tab->rewrite |= AT_REWRITE_ALTER_PERSISTENCE;
tab->is_first_after = true;
}
// 插入新的 pg_attribute 元组
InsertPgAttributeTuple(attrdesc, &attribute, NULL);
// 关闭 pg_attribute 表
heap_close(attrdesc, RowExclusiveLock);
// 如果添加的是加密列,处理加密列相关的元组插入
if (colDef->clientLogicColumnRef != NULL) {
ceHeapInfo->attnum = newattnum;
insert_gs_sec_encrypted_column_tuple(ceHeapInfo, cedesc, myrelid, NULL);
heap_close(cedesc, RowExclusiveLock);
}
// 更新 pg_class 元组
if (isOid)
((Form_pg_class)GETSTRUCT(reltup))->relhasoids = true;
else
((Form_pg_class)GETSTRUCT(reltup))->relnatts = currattnum + 1;
simple_heap_update(pgclass, &reltup->t_self, reltup);
// 保持目录索引更新
CatalogUpdateIndexes(pgclass, reltup);
// 释放元组
tableam_tops_free_tuple(reltup);
// 执行新属性创建后的钩子
InvokeObjectAccessHook(OAT_POST_CREATE, RelationRelationId, myrelid, newattnum, NULL);
// 关闭 pg_class 表
heap_close(pgclass, RowExclusiveLock);
// 使属性的目录条目可见
CommandCounterIncrement();
// 如果是首列添加或指定位置添加,更新依赖关系和生成列等
if (is_addloc) {
UpdatePgDependFirstAfter(rel, newattnum, currattnum, true);
UpdateGenerateColFirstAfter(rel, newattnum, currattnum, true);
UpdateIndexFirstAfter(rel);
// 创建或替换视图
ReplaceViewQueryFirstAfter(query_str);
}
// 如果存在默认值,将其存储在目录中
if (colDef->raw_default) {
RawColumnDefault* rawEnt = NULL;
// 对于外部表的特殊处理
if (relkind == RELKIND_FOREIGN_TABLE) {
#ifdef ENABLE_MOT
if (!isMOTFromTblOid(RelationGetRelid(rel)) && !isPostgresFDWFromTblOid(RelationGetRelid(rel))) {
#else
if (!isPostgresFDWFromTblOid(RelationGetRelid(rel))) {
#endif
ereport(ERROR, (errcode(ERRCODE_WRONG_OBJECT_TYPE), errmsg("%s on foreign tables are not supported",
colDef->generatedCol ? "generated column" : "default values")));
}
} else if (relkind == RELKIND_STREAM) {
ereport(ERROR, (errcode(ERRCODE_WRONG_OBJECT_TYPE), errmsg("%s on streams are not supported",
colDef->generatedCol ? "generated column" : "default values")));
} else if (RelationIsTsStore(rel)) {
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("It's not supported to add column with default value for timeseries tables.")));
}
// 初始化 RawColumnDefault 结构体并处理默认值和生成列
rawEnt = (RawColumnDefault*)palloc(sizeof(RawColumnDefault));
rawEnt->attnum = attribute.attnum;
rawEnt->raw_default = (Node*)copyObject(colDef->raw_default);
rawEnt->update_expr = (Node*)copyObject(colDef->update_default);
rawEnt->generatedCol = colDef->generatedCol;
// 这个函数原本用于 CREATE TABLE,因此它处理的是默认值列表,但这里只处理一个。
(void)AddRelationNewConstraints(rel, list_make1(rawEnt), NIL, false, true);
// 使额外的目录更改可见
CommandCounterIncrement();
}
// 如果有默认值,告知第三阶段填充默认表达式
#ifdef ENABLE_MOT
if ((relkind == RELKIND_FOREIGN_TABLE && isMOTFromTblOid(RelationGetRelid(rel)) && attribute.attnum > 0) ||
(relkind != RELKIND_VIEW && relkind != RELKIND_COMPOSITE_TYPE && relkind != RELKIND_FOREIGN_TABLE &&
relkind != RELKIND_STREAM && relkind != RELKIND_CONTQUERY && attribute.attnum > 0)) {
#else
if (relkind != RELKIND_VIEW && relkind != RELKIND_COMPOSITE_TYPE && relkind != RELKIND_FOREIGN_TABLE &&
relkind != RELKIND_STREAM && relkind != RELKIND_CONTQUERY && attribute.attnum > 0) {
#endif
// 测试新列是否非空
bool testNotNull = colDef->is_not_null;
// 在原地或在线升级期间,不应清除 nailed-in 系统目录的 relcache,这是考虑到可靠性
// 此外,目前不支持在原地或在线升级期间为 pg_class、pg_attribute、pg_proc 中的新列添加非 NULL 默认值。
if (u_sess->attr.attr_common.IsInplaceUpgrade &&
(rel->rd_id == RelationRelationId || rel->rd_id == AttributeRelationId))
defval = NULL;
else
defval = (Expr*)build_column_default(rel, attribute.attnum);
// 如果默认值为 NULL 并且类型是域类型,可能需要显式处理 NULL 默认值
if (defval == NULL && (GetDomainConstraints(typeOid) != NIL || is_addloc)) {
Oid baseTypeId;
int32 baseTypeMod;
Oid baseTypeColl;
baseTypeMod = typmod;
baseTypeId = getBaseTypeAndTypmod(typeOid, &baseTypeMod);
baseTypeColl = get_typcollation(baseTypeId);
defval = (Expr*)makeNullConst(baseTypeId, baseTypeMod, baseTypeColl);
if (GetDomainConstraints(typeOid) != NIL) {
defval = (Expr*)coerce_to_target_type(
NULL, (Node*)defval, baseTypeId, typeOid, typmod, COERCION_ASSIGNMENT, COERCE_IMPLICIT_CAST, -1);
if (defval == NULL) /* 应该不会发生 */
ereport(ERROR,
(errcode(ERRCODE_UNEXPECTED_NULL_VALUE), errmsg("failed to coerce base type to domain")));
}
}
// 如果 defval 不为 NULL,则根据具体情况处理
if (defval != NULL) {
// 如果当前是行存储表且必须重写,
// 则不适用 alter-table-instantly 特性,同样排除临时表和列表。
if (attribute.attnum == RelAutoIncAttrNum(rel)) {
if (colDef->is_not_null) {
ATExecAppendDefValExpr(attribute.attnum, defval, tab, colDef, true, is_addloc);
}
} else if (contain_specified_function((Node*)defval, NEXTVALFUNCOID)) {
// 不支持默认值为 nextval 表达式的 alter table add column
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("It's not supported to alter table add column default with nextval expression.")));
} else if (RelationIsCUFormat(rel)) {
ATExecAppendDefValExpr(attribute.attnum, defval, tab, colDef, false, false);
} else if (tab->rewrite>0 || colDef->generatedCol ||
RelationUsesSpaceType(rel->rd_rel->relpersistence) == SP_TEMP) {
ATExecAppendDefValExpr(attribute.attnum, defval, tab, colDef, false, true);
} else {
bytea* value = NULL;
AT_INSTANT_DEFAULT_VALUE ret =
shouldUpdateAllTuples(defval, attribute.atttypid, attribute.attlen, attribute.attbyval, &value);
// 如果默认值为常量且不为空
if (ret == DEFAULT_NOT_NULL_CONST) {
Assert(value != NULL);
updateInitDefVal(value, rel, attribute.attnum);
pfree_ext(value);
// 新列有常数默认值,
// 因此不需要进行非空检测。
testNotNull = false;
} else if (ret == DEFAULT_OTHER) {
if (isDfsTable) {
ereport(ERROR,
(errcode(ERRCODE_INVALID_TABLE_DEFINITION),
(errmsg("It is not supported on DFS table. The detailed reasons are the"
" followings:"),
errdetail("1. the default value may be a volatile function.\n"
"2. the storage length of default value may be greater than 127.\n"
"3. the data type of new column is not supported."))));
}
ATExecAppendDefValExpr(attribute.attnum, defval, tab, colDef, false, false);
}
// 如果默认值为 NULL,无需进一步操作
/* nothing to do if ret is DEFAULT_NULL */
}
}
// 如果新列为 NOT NULL,告诉第三阶段需要检查这一点。
// 注意:我们不对 OID 列这样做。OID 会被标记为 not null,但由于它特别填充,无需检查。
if (testNotNull) {
tab->new_notnull = true;
}
}
// 如果我们正在添加一个 OID 列,我们必须告诉第三阶段重写表以修复这一点。
if (isOid) {
tab->rewrite |= AT_REWRITE_ALTER_OID;
}
// 如果表为列存储,处理相关逻辑
if (RelationIsColStore(rel)) {
// DFS表不重写数据,只更新目录中的默认值
if (isDfsTable) {
tab->rewrite = AT_REWRITE_ALTER_OID;
} else {
tab->rewrite |= AT_REWRITE_ALTER_OID;
}
}
// 为新列添加数据类型和排序规则的依赖关系
add_column_datatype_dependency(myrelid, newattnum, attribute.atttypid);
add_column_collation_dependency(myrelid, newattnum, attribute.attcollation);
#ifdef ENABLE_MOT
// 如果是外部表并且表是MOT类型,执行MOT特有的ALTER TABLE处理
if (relkind == RELKIND_FOREIGN_TABLE && isMOTFromTblOid(RelationGetRelid(rel))) {
AlterForeingTableCmd fcmd = {
T_AlterForeingTableCmd,
AT_AddColumn,
rel,
nullptr,
(Node*)colDef,
typeOid,
defval
};
ATExecMOTAlterTable(&fcmd);
}
#endif
#ifdef ENABLE_MULTIPLE_NODES
// 如果表是时间序列存储表
if (unlikely(RelationIsTsStore(rel))) {
// 如果新增的列是标签类型
if (colDef->kvtype == ATT_KV_TAG) {
// 获取标签表的 OID
Oid tag_relid = get_tag_relid(RelationGetRelationName(rel), rel->rd_rel->relnamespace);
Relation tagrel;
AlteredTableInfo* tagtab = NULL;
List* index_col_name = NIL;
// 打开标签表
tagrel = heap_open(tag_relid, lockmode);
// 检查标签表是否正在被使用
CheckTableNotInUse(tagrel, "ALTER TABLE");
// 获取或创建一个修改信息结构用于记录对标签表的修改
tagtab = ATGetQueueEntry(wqueue, tagrel);
// 递归调用 ATExecAddColumn 来处理标签表,添加新列
ATExecAddColumn(wqueue, tagtab, tagrel, colDef, isOid, false, false, false, NULL, lockmode);
char tag_relname[NAMEDATALEN] = {0};
// 生成标签表的元数据表名称
Tsdb::GenMetaRelname(rel->rd_rel->relnamespace, Tsdb::MetaTableType::META_TABLE_TAGS,
tag_relname, TsConf::MAX_TS_NAME_LEN, RelationGetRelationName(rel));
// 为新标签列创建索引
index_col_name = lappend(index_col_name, colDef->colname);
create_tag_index(tag_relid, tag_relname, index_col_name);
// 释放列表资源
list_free_ext(index_col_name);
// 关闭标签表
heap_close(tagrel, NoLock);
} else if (colDef->kvtype == ATT_KV_FIELD && Tsdb::RelationEnablesTsdbDelta(rel)) {
// 如果新增的列是字段类型并且表启用了时间序列Delta存储
// 获取Delta表关系
Relation delta_rel = Tsdb::RelationGetDeltaRelation(rel, lockmode);
// 检查Delta表是否正在被使用
CheckTableNotInUse(delta_rel, "ALTER TABLE");
// 获取或创建一个修改信息结构用于记录对Delta表的修改
AlteredTableInfo* delta_tab = ATGetQueueEntry(wqueue, delta_rel);
// 在Delta表上执行添加列操作
ATExecAddColumn(wqueue, delta_tab, delta_rel, colDef, isOid, false, false, false, NULL, lockmode);
// 关闭Delta表
heap_close(delta_rel, NoLock);
}
}
#endif /* ENABLE_MULTIPLE_NODES */
// 如果表是PAX格式,即列存储表
if (RelationIsPAXFormat(rel)) {
// 为delta表添加列
children = lappend_oid(children, RelationGetDeltaRelId(rel));
// 获取锁以同步防止并发删除操作
LockRelationOid(RelationGetDeltaRelId(rel), lockmode);
elog(DEBUG1,
"[GET LOCK] Successfully acquired the lock %d on the delta table of %s for altering operation.",
lockmode,
RelationGetRelationName(rel));
#ifdef ENABLE_MULTIPLE_NODES
} else if (g_instance.attr.attr_storage.enable_delta_store && RelationIsCUFormat(rel)) {
#else
// 在集中模式下,delta表可能有唯一索引。当检查唯一约束时,将使用delta表上的唯一索引。因此,这里忽略enable_delta_store标志并同时修改delta表。
} else if (RelationIsCUFormat(rel)) {
#endif
// 添加C-Store关系的delta表到递归处理中,如果列支持继承特性
// 我们还需要调用find_inheritance_children
children = find_cstore_delta(rel, lockmode);
} else {
// 适当地将操作传播到子表。与大多数其他ALTER操作不同,我们必须逐级递归处理,不能一次性使用find_all_inheritors完成。
children = find_inheritance_children(RelationGetRelid(rel), lockmode);
}
// 如果我们被告知不要递归,而且确实有子表存在,那么添加操作会使它们不同步。
if (children && !recurse)
ereport(ERROR, (errcode(ERRCODE_INVALID_TABLE_DEFINITION), errmsg("column must be added to child tables too")));
// 子表应视列为单一继承,Cstore表和delta表不是继承表
if (!recursing && !RelationIsCUFormat(rel)) {
colDef = (ColumnDef*)copyObject(colDef);
colDef->inhcount = 1;
colDef->is_local = false;
}
// 添加列的注释
ATCreateColumComments(myrelid, colDef);
// 遍历所有子表
foreach (child, children) {
Oid childrelid = lfirst_oid(child);
Relation childrel;
AlteredTableInfo* childtab = NULL;
// find_inheritance_children已经获取了锁
childrel = heap_open(childrelid, NoLock);
CheckTableNotInUse(childrel, "ALTER TABLE");
// 查找或创建子表的工作队列条目
childtab = ATGetQueueEntry(wqueue, childrel);
// 递归到子表
ATExecAddColumn(wqueue, childtab, childrel, colDef, isOid, recurse, true, is_first, after_name, lockmode);
heap_close(childrel, NoLock);
}
ObjectAddressSubSet(address, RelationRelationId, myrelid, newattnum);
return address;
}
ATRewriteTables 函数
ATRewriteTables 是 ALTER TABLE 操作的第三阶段,处理表结构重写,如改变列的数据类型或添加/删除 OID 等。它会根据表的种类(如普通表、分区表、索引等)和存储格式(如行存、列存)来选择适当的处理函数。针对系统表、临时表等有特殊要求的表进行了错误处理。同时,它还处理触发器事件和数据依赖关系,确保数据库的一致性和完整性。函数源码如下所示(路径:src\gausskernel\optimizer\commands\tablecmds.cpp
)
/*
* ATRewriteTables: ALTER TABLE phase 3
*/
static void ATRewriteTables(AlterTableStmt *parsetree, List** wqueue, LOCKMODE lockmode)
{
ListCell* ltab = NULL;
/* 遍历每个需要检查或重写的表 */
foreach (ltab, *wqueue) {
AlteredTableInfo* tab = (AlteredTableInfo*)lfirst(ltab); // 获取表的修改信息对象
int rel_format_idx = IDX_ROW_TBL; // 默认索引为行存储表
int idxPartitionedOrNot = IDX_ORDINARY_TBL; // 默认索引为非分区表
#ifdef PGXC
/* 检查是否存在与数据重新分配不兼容的表重写操作 */
if (tab->rewrite > 0 && list_length(tab->subcmds[AT_PASS_DISTRIB]) > 0 && IS_PGXC_COORDINATOR && !IsConnFromCoord())
ereport(ERROR,
(errcode(ERRCODE_STATEMENT_TOO_COMPLEX), errmsg("Incompatible operation with data redistribution")));
#endif
/* 如果是外部表或流表,则跳过,因为它们没有存储 */
if (tab->relkind == RELKIND_FOREIGN_TABLE || tab->relkind == RELKIND_STREAM)
continue;
/* 根据表的类型打开表并设置格式和分区类型的索引 */
if (tab->relkind == RELKIND_RELATION) {
Relation temprel = heap_open(tab->relid, NoLock);
rel_format_idx =
RelationIsCUFormat(temprel) ? IDX_COL_TBL : IDX_ROW_TBL;
idxPartitionedOrNot = RELATION_IS_PARTITIONED(temprel) ? IDX_PARTITIONED_TBL : IDX_ORDINARY_TBL;
heap_close(temprel, NoLock);
} else if (tab->relkind == RELKIND_INDEX || tab->relkind == RELKIND_GLOBAL_INDEX) {
Relation temprel = index_open(tab->relid, NoLock);
rel_format_idx = IDX_ROW_TBL; // 索引默认为行关系
idxPartitionedOrNot = RelationIsPartitioned(temprel) ? IDX_PARTITIONED_TBL : IDX_ORDINARY_TBL;
index_close(temprel, NoLock);
}
/* 检查是否需要因类型变更或默认值添加而传播更改 */
if (tab->newvals != NIL || tab->rewrite > 0) {
Relation rel;
rel = heap_open(tab->relid, NoLock);
find_composite_type_dependencies(rel->rd_rel->reltype, rel, NULL);
heap_close(rel, NoLock);
}
/* 如果需要重写表,则构建临时表并复制数据 */
if (tab->rewrite > 0) {
Relation OldHeap;
Oid NewTableSpace;
OldHeap = heap_open(tab->relid, NoLock);
if (parsetree)
EventTriggerTableRewrite((Node *)parsetree, tab->relid, tab->rewrite);
if (IsSystemRelation(OldHeap))
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot rewrite system relation \"%s\"", RelationGetRelationName(OldHeap))));
if (RelationIsUsedAsCatalogTable(OldHeap))
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot rewrite table \"%s\" used as a catalog table", RelationGetRelationName(OldHeap))));
if (RELATION_IS_OTHER_TEMP(OldHeap))
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot rewrite temporary tables of other sessions")));
if (tab->newTableSpace)
NewTableSpace = tab->newTableSpace;
else
NewTableSpace = OldHeap->rd_rel->reltablespace;
heap_close(OldHeap, NoLock);
ExecRewriteFuncPtrArray[rel_format_idx][idxPartitionedOrNot](tab, NewTableSpace, lockmode);
} else {
/* 如果没有重写需求,仅测试新约束或不变更表空间时的操作 */
if (tab->constraints != NIL || tab->new_notnull) {
ExecOnlyTestFuncPtrArray[rel_format_idx][idxPartitionedOrNot](tab);
}
if (tab->newTableSpace) {
CheckTopRelationIsInMyTempSession(tab->relid);
if (IS_PGXC_COORDINATOR && !IsConnFromCoord() && IsTransactionBlock() && rel_format_idx == 0 &&
tab->relkind == RELKIND_RELATION) {
ereport(ERROR,
(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
errmsg("alter row table tablespace cannot run inside a transaction block")));
}
ExecChangeTabspcFuncPtrArray[rel_format_idx][idxPartitionedOrNot](tab, lockmode);
}
}
}
}