Overview

Sentry is a security component for big data platforms, used to control access to components such as Hive and HBase. From Hadoop 3.0 onward the security component of choice is Ranger and Sentry has been retired, but many big data platforms still run 2.x versions, and the principles behind these security components are similar, so Sentry is still worth studying (and I was recently assigned to this component 😄). Since in our engineering practice we mostly use Sentry to control Hive permissions, it is worth describing Hive's architecture before introducing Sentry.

Hive Overview

Hive Architecture

The figure below shows Hive's architecture:

  • UI: Hive's clients, including hiveCli and beeLine. Users perform their operations through the UI. CliDriver compiles SQL locally, accesses the metastore directly, and submits jobs itself, making it a heavy client; BeeLine submits SQL to HiveServer2, which compiles it, accesses the metastore, and submits the job, making it a light client. (This is why permissions cannot be fully enforced through hiveCli when using Sentry; operations should go through beeLine instead. The reason lies in Hive's extension mechanisms described below.)
  • Driver: receives query requests and manages sessions
  • Compiler: parses the query, performs semantic analysis, and generates the execution plan with the help of the metastore
  • Execution Engine: executes the generated plan; it is the bridge between Hive and Hadoop
  • Metastore: provides services around Hive's metadata

Hive's Extension Mechanisms

Let's look at the extension mechanisms in Hive, because Sentry's permission control is built on top of them. In production, Hive usually corresponds to three different processes: Hive Server, the Metastore, and the RDBMS. The extension points are provided by:

  • the Metastore's Listener
  • Hive Server's Hook

For testing I set up a local Hive environment; below are tests of the Listener and the Hook:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.metastore.MetaStorePreEventListener;
import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
import org.apache.hadoop.hive.metastore.events.PreEventContext;

public class CustomListener1 extends MetaStorePreEventListener {
  public CustomListener1(Configuration config) {
    super(config);
    System.out.println("initializing CustomListener1");
  }

  @Override
  public void onEvent(PreEventContext preEventContext) throws MetaException, NoSuchObjectException, InvalidOperationException {
    System.out.println("MetaStorePreEventListener " + preEventContext + " " + preEventContext.getEventType());
  }
}

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.metastore.MetaStoreEventListener;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.events.CreateTableEvent;
import org.apache.hadoop.hive.metastore.events.DropTableEvent;

public class CustomListener extends MetaStoreEventListener {
  public CustomListener(Configuration config) {
    super(config);
  }

  @Override
  public void onCreateTable(CreateTableEvent tableEvent) throws MetaException {
    System.out.println("wes create table : " + tableEvent.getTable());
  }

  @Override
  public void onDropTable(DropTableEvent tableEvent) throws MetaException {
    System.out.println("wes drop table : " + tableEvent.getTable());
  }
}

import org.apache.hadoop.hive.ql.hooks.ExecuteWithHookContext;
import org.apache.hadoop.hive.ql.hooks.HookContext;

public class HiveExampleHook implements ExecuteWithHookContext {
  @Override
  public void run(HookContext hookContext) throws Exception {
    System.out.println("hello wes, this is hive hook !");
  }
}
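For reference, extension classes like these are wired up through Hive configuration. Below is a hedged sketch of the relevant hive-site.xml entries: the property names are Hive's own, but the values here are the short class names of the test classes above (in practice you would use fully-qualified class names, and the exact hook property depends on which hook interface is implemented):

```xml
<!-- hive-site.xml (sketch): registering the metastore listeners and the execution hook -->
<property>
  <name>hive.metastore.pre.event.listeners</name>
  <value>CustomListener1</value> <!-- MetaStorePreEventListener implementations -->
</property>
<property>
  <name>hive.metastore.event.listeners</name>
  <value>CustomListener</value> <!-- MetaStoreEventListener implementations -->
</property>
<property>
  <name>hive.exec.post.hooks</name>
  <value>HiveExampleHook</value> <!-- ExecuteWithHookContext implementations -->
</property>
```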

The following shows the events triggered by DDL and DML operations after these listeners and hooks are registered with Hive. The Listener and Hook tested above are the same extension points Sentry uses: Sentry uses the Hook for authorization checks and the Listener to observe changes to privileges and related metadata and notify the server. Sentry's listener extracts the event id from the generated event and records it in Hive's metastore; the HiveMetaStoreClient then periodically pulls the events and applies them to the Sentry server's database. During testing, however, the event id that should exist was sometimes null. The test environment has related metrics; this may be related to the Hive version in use, or not every event may carry an event id. Inspecting the database behind Hive's metastore shows a table dedicated to recording notifications; the screenshot below shows the case where the eventid is null. That is all for Hive; next let's look at Sentry's overall architecture.

Sentry

ER Diagram

The figure below shows Sentry's entity-relationship diagram (the purpose of some tables is still unclear). From these tables we can see that permissions are controlled via RBAC, but roles are bound to groups rather than users; users gain permissions by joining different groups. There is no table for the user-to-group mapping: Sentry reuses Hadoop's user/group relationship, which both the tables and the code confirm.
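The group-based RBAC resolution just described can be sketched as a toy example. This is illustrative only (the class, maps, and data are hypothetical, not Sentry's schema): the user-to-group step mirrors Hadoop's group mapping, while group-to-role and role-to-privilege correspond to Sentry's tables.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Toy sketch of group-based RBAC resolution: user -> groups -> roles -> privileges.
public class RbacSketch {
    // user -> groups: in real deployments this comes from Hadoop's group mapping
    static Map<String, Set<String>> userGroups = Map.of(
        "alice", Set.of("analysts"));
    // group -> roles and role -> privileges: these live in Sentry's database tables
    static Map<String, Set<String>> groupRoles = Map.of(
        "analysts", Set.of("read_role"));
    static Map<String, Set<String>> rolePrivileges = Map.of(
        "read_role", Set.of("server=server1->db=sales->table=orders->action=select"));

    static Set<String> privilegesFor(String user) {
        Set<String> result = new HashSet<>();
        for (String group : userGroups.getOrDefault(user, Set.of())) {
            for (String role : groupRoles.getOrDefault(group, Set.of())) {
                result.addAll(rolePrivileges.getOrDefault(role, Set.of()));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(privilegesFor("alice"));
    }
}
```

Because roles attach to groups, moving a user between Hadoop groups changes their effective privileges without touching Sentry's tables.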

Overall Sentry Architecture

The figure above shows Sentry's overall architecture. sentry-binding builds on Hive's extension mechanisms described above, using the Listener and the Hook to control permissions for DDL and DML operations. The main roles of the Listener and Hook that Sentry implements are:

  • The Listener publishes the event ids of DDL operations, so that the Sentry server's HMSFollower can synchronize the corresponding data from Hive's metastore.
  • The Hook intercepts the information in DML statements to perform authorization checks, and intercepts the information in DDL statements to perform grant operations; see the code below.

Grant, revoke, and privilege validation

In the hive-binding module, HiveAuthzBindingHook's postAnalyze converts the task into a SentryGrantRevokeTask, whose overridden execute method performs the grant/revoke via an RPC call. HiveAuthzBindingHook extends AbstractSemanticAnalyzerHook and overrides two important methods, preAnalyze and postAnalyze; let's look at what each of them does:

Client-side authorization (the sentry-binding module)

preAnalyze

Below is the preAnalyze method. As we can see, all preAnalyze does is extract the partition, database, table, and similar information from the HiveQL statement:

public ASTNode preAnalyze(HiveSemanticAnalyzerHookContext context, ASTNode ast)
throws SemanticException {
switch (ast.getToken().getType()) {
// Hive parser doesn't capture the database name in output entity, so we store it here for now
case HiveParser.TOK_CREATEDATABASE:
case HiveParser.TOK_ALTERDATABASE_PROPERTIES:
case HiveParser.TOK_DROPDATABASE:
case HiveParser.TOK_SWITCHDATABASE:
case HiveParser.TOK_DESCDATABASE:
// For database-level operations, extracting the database name is enough
currDB = new Database(BaseSemanticAnalyzer.unescapeIdentifier(ast.getChild(0).getText()));
break;
case HiveParser.TOK_CREATETABLE:
case HiveParser.TOK_CREATEVIEW:
/*
* Compiler doesn't create read/write entities for create table.
* Hence we need extract dbname from db.tab format, if applicable
*/
currDB = extractDatabase((ASTNode)ast.getChild(0));
break;
case HiveParser.TOK_DROPTABLE:
case HiveParser.TOK_DROPVIEW:
case HiveParser.TOK_SHOW_CREATETABLE:
case HiveParser.TOK_ALTERTABLE_SERIALIZER:
case HiveParser.TOK_ALTERVIEW_ADDPARTS:
case HiveParser.TOK_ALTERVIEW_DROPPARTS:
case HiveParser.TOK_ALTERVIEW_PROPERTIES:
case HiveParser.TOK_ALTERVIEW_RENAME:
case HiveParser.TOK_ALTERVIEW:
case HiveParser.TOK_DROPINDEX:
case HiveParser.TOK_LOCKTABLE:
case HiveParser.TOK_UNLOCKTABLE:
currTab = extractTable((ASTNode)ast.getFirstChildWithType(HiveParser.TOK_TABNAME));
currDB = extractDatabase((ASTNode) ast.getChild(0));
break;
case HiveParser.TOK_CREATEINDEX:
currTab = extractTable((ASTNode)ast.getFirstChildWithType(HiveParser.TOK_TABNAME));
currDB = extractDatabase((ASTNode) ast.getChild(0));
indexURI = extractTableLocation(ast);//As index location is captured using token HiveParser.TOK_TABLELOCATION
break;
case HiveParser.TOK_ALTERINDEX_REBUILD:
currTab = extractTable((ASTNode)ast.getChild(0)); //type is not TOK_TABNAME
currDB = extractDatabase((ASTNode) ast.getChild(0));
break;
case HiveParser.TOK_SHOW_TABLESTATUS:
currDB = extractDatabase((ASTNode)ast.getChild(0));
int children = ast.getChildCount();
for (int i = 1; i < children; i++) {
ASTNode child = (ASTNode) ast.getChild(i);
if (child.getToken().getType() == HiveParser.Identifier) {
currDB = new Database(child.getText());
break;
}
}
//loosing the requested privileges for possible wildcard tables, since
//further authorization will be done at the filter step and those unwanted will
//eventually be filtered out from the output
currTab = Table.ALL;
break;
case HiveParser.TOK_ALTERTABLE_RENAME:
case HiveParser.TOK_ALTERTABLE_PROPERTIES:
case HiveParser.TOK_ALTERTABLE_DROPPARTS:
case HiveParser.TOK_ALTERTABLE_RENAMECOL:
case HiveParser.TOK_ALTERTABLE_ADDCOLS:
case HiveParser.TOK_ALTERTABLE_REPLACECOLS:
case HiveParser.TOK_SHOW_TBLPROPERTIES:
case HiveParser.TOK_SHOWINDEXES:
case HiveParser.TOK_SHOWPARTITIONS:
//token name TOK_TABNAME is not properly set in this case
currTab = extractTable((ASTNode)ast.getChild(0));
currDB = extractDatabase((ASTNode)ast.getChild(0));
break;
case HiveParser.TOK_MSCK:
extractDbTableNameFromTOKTABLE((ASTNode) ast.getChild(1));
break;
case HiveParser.TOK_ALTERTABLE_ADDPARTS:
/*
* Compiler doesn't create read/write entities for create table.
* Hence we need extract dbname from db.tab format, if applicable
*/
currTab = extractTable((ASTNode)ast.getChild(0));
currDB = extractDatabase((ASTNode)ast.getChild(0));
// Extract the partition spec from the SQL
partitionURI = extractPartition(ast);
break;
case HiveParser.TOK_CREATEFUNCTION:
String udfClassName = BaseSemanticAnalyzer.unescapeSQLString(ast.getChild(1).getText());
try {
CodeSource udfSrc =
Class.forName(udfClassName, true, Utilities.getSessionSpecifiedClassLoader())
.getProtectionDomain().getCodeSource();
if (udfSrc == null) {
throw new SemanticException("Could not resolve the jar for UDF class " + udfClassName);
}
String udfJar = udfSrc.getLocation().getPath();
if (udfJar == null || udfJar.isEmpty()) {
throw new SemanticException("Could not find the jar for UDF class " + udfClassName +
"to validate privileges");
}
udfURIs.add(parseURI(udfSrc.getLocation().toString(), true));
} catch (ClassNotFoundException e) {
List<String> functionJars = getFunctionJars(ast);
if (functionJars.isEmpty()) {
throw new SemanticException("Error retrieving udf class:" + e.getMessage(), e);
} else {
// Add the jars from the command "Create function using jar" to the access list
// Defer to hive to check if the class is in the jars
for(String jar : functionJars) {
udfURIs.add(parseURI(jar, false));
}
}
}
// create/drop function is allowed with any database
currDB = Database.ALL;
break;
case HiveParser.TOK_DROPFUNCTION:
// create/drop function is allowed with any database
currDB = Database.ALL;
break;
case HiveParser.TOK_LOAD:
String dbName = BaseSemanticAnalyzer.unescapeIdentifier(ast.getChild(1).getChild(0).getChild(0).getText());
currDB = new Database(dbName);
break;
case HiveParser.TOK_DESCTABLE:
currDB = getCanonicalDb();
// For DESCRIBE FORMATTED/EXTENDED ast will have an additional child node with value
// "FORMATTED/EXTENDED".
isDescTableBasic = (ast.getChildCount() == 1);
break;
case HiveParser.TOK_TRUNCATETABLE:
// SENTRY-826:
// Truncate empty partitioned table should throw SemanticException only if the
// user does not have permission.
// In postAnalyze, currOutDB and currOutTbl will be added into outputHierarchy
// which will be validated in the hiveAuthzBinding.authorize method.
Preconditions.checkArgument(ast.getChildCount() == 1);
// childcount is 1 for table without partition, 2 for table with partitions
Preconditions.checkArgument(ast.getChild(0).getChildCount() >= 1);
ASTNode tableTok = (ASTNode) ast.getChild(0).getChild(0);
Preconditions.checkArgument(tableTok.getChildCount() >= 1);
if (tableTok.getChildCount() == 1) {
// If tableTok's child count is 1, it carries no database information; use the current working DB
currOutDB = extractDatabase((ASTNode) ast.getChild(0));
currOutTab = extractTable((ASTNode) tableTok.getChild(0));
} else {
// If tableTok has fully-qualified name(childcount is 2),
// get the db and table information from tableTok.
extractDbTableNameFromTOKTABLE(tableTok);
}
break;
case HiveParser.TOK_ALTERTABLE:
currDB = getCanonicalDb();
for (Node childNode : ast.getChildren()) {
ASTNode childASTNode = (ASTNode) childNode;
if ("TOK_ALTERTABLE_SERIALIZER".equals(childASTNode.getText())) {
ASTNode serdeNode = (ASTNode) childASTNode.getChild(0);
String serdeClassName = BaseSemanticAnalyzer.unescapeSQLString(serdeNode.getText());
setSerdeURI(serdeClassName);
}
if ("TOK_ALTERTABLE_RENAME".equals(childASTNode.getText())) {
currDB = extractDatabase((ASTNode)ast.getChild(0));
ASTNode newTableNode = (ASTNode)childASTNode.getChild(0);
currOutDB = extractDatabase(newTableNode);
}
}
break;
default:
currDB = getCanonicalDb();
break;
}
return ast;
}
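As an aside, the `db.tab` handling that cases like TOK_CREATETABLE rely on can be sketched with a toy helper (the class and method names here are made up, not the actual extractDatabase/extractTable implementations): a qualified name is split at the dot, and a bare table name falls back to the session's current database.

```java
import java.util.Arrays;

// Toy sketch of db/table name extraction: "db.tab" yields both parts,
// a bare "tab" falls back to the session's current database.
public class NameExtractor {
    static String[] extract(String qualifiedName, String currentDb) {
        int dot = qualifiedName.indexOf('.');
        if (dot < 0) {
            // unqualified name: use the current working database
            return new String[] { currentDb, qualifiedName };
        }
        return new String[] { qualifiedName.substring(0, dot), qualifiedName.substring(dot + 1) };
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(extract("sales.orders", "default")));
        System.out.println(Arrays.toString(extract("orders", "default")));
    }
}
```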

postAnalyze

Below is the postAnalyze method:

public void postAnalyze(HiveSemanticAnalyzerHookContext context,
    List<Task<? extends Serializable>> rootTasks) throws SemanticException {
  // Get the operation type of the statement
  HiveOperation stmtOperation = getCurrentHiveStmtOp();
  // Bundles inputPrivileges (privileges required on inputs), outputPrivileges (privileges
  // required on outputs), operationType (DDL, DML, QUERY, ...) and operationScope
  // (database, table, column, function, ...)
  HiveAuthzPrivileges stmtAuthObject;
  // Look up the privileges this kind of operation requires. No RPC is needed here:
  // this describes the operation itself, not the user's privileges.
  stmtAuthObject = HiveAuthzPrivilegesMap.getHiveAuthzPrivileges(stmtOperation);
  // must occur above the null check on stmtAuthObject,
  // since GRANT/REVOKE/etc are not authorized by the binding layer at present
  // subject wraps the user information
  Subject subject = getCurrentSubject(context);
  // The user's groups come straight from configuration, suggesting that the user/group
  // mapping is not stored in Sentry's tables (inspecting the tables confirms this)
  Set<String> subjectGroups = hiveAuthzBinding.getGroups(subject);
  for (Task<? extends Serializable> task : rootTasks) {
    if (task instanceof SentryGrantRevokeTask) {
      // Wrap the original task into one whose execute method performs the grant/revoke
      SentryGrantRevokeTask sentryTask = (SentryGrantRevokeTask) task;
      sentryTask.setHiveAuthzBinding(hiveAuthzBinding);
      sentryTask.setAuthzConf(authzConf);
      sentryTask.setSubject(subject);
      sentryTask.setSubjectGroups(subjectGroups);
      sentryTask.setIpAddress(context.getIpAddress());
      sentryTask.setOperation(stmtOperation);
    }
  }
  try {
    if (stmtAuthObject == null) {
      // We don't handle authorizing this statement
      return;
    }
    /**
     * Replace DDLTask using the SentryFilterDDLTask for protection,
     * such as "show column" only allow show some column that user can access to.
     * SENTRY-847
     */
    for (int i = 0; i < rootTasks.size(); i++) {
      Task<? extends Serializable> task = rootTasks.get(i);
      if (task instanceof DDLTask) {
        ShowColumnsDesc showCols = ((DDLTask) task).getWork().getShowColumnsDesc();
        if (showCols != null) {
          // For DDL, only show the columns the user has access to
          SentryFilterDDLTask filterTask =
              new SentryFilterDDLTask(hiveAuthzBinding, subject, stmtOperation);
          filterTask.copyDDLTask((DDLTask) task);
          rootTasks.set(i, filterTask);
        }
      }
    }
    // Finally authorize; this fetches the user's privileges via a remote call
    authorizeWithHiveBindings(context, stmtAuthObject, stmtOperation);
  } catch (AuthorizationException e) {
    executeOnFailureHooks(context, stmtOperation, e);
    String permsRequired = "";
    for (String perm : hiveAuthzBinding.getLastQueryPrivilegeErrors()) {
      permsRequired += perm + ";";
    }
    SessionState.get().getConf().set(HiveAuthzConf.HIVE_SENTRY_AUTH_ERRORS, permsRequired);
    String msgForLog = HiveAuthzConf.HIVE_SENTRY_PRIVILEGE_ERROR_MESSAGE
        + "\n Required privileges for this query: "
        + permsRequired;
    String msgForConsole = HiveAuthzConf.HIVE_SENTRY_PRIVILEGE_ERROR_MESSAGE + "\n "
        + e.getMessage() + "\n The required privileges: " + permsRequired;
    // AuthorizationException is not a real exception, use the info level to record this.
    LOG.info(msgForLog);
    throw new SemanticException(msgForConsole, e);
  } finally {
    hiveAuthzBinding.close();
  }
  if ("true".equalsIgnoreCase(context.getConf().
      get(HiveAuthzConf.HIVE_SENTRY_MOCK_COMPILATION))) {
    throw new SemanticException(HiveAuthzConf.HIVE_SENTRY_MOCK_ERROR + " Mock query compilation aborted. Set " +
        HiveAuthzConf.HIVE_SENTRY_MOCK_COMPILATION + " to 'false' for normal query processing");
  }
}

The last call above is authorizeWithHiveBindings, which is where the privilege check actually happens:

private void authorizeWithHiveBindings(HiveSemanticAnalyzerHookContext context,
HiveAuthzPrivileges stmtAuthObject, HiveOperation stmtOperation) throws AuthorizationException {
// Collect the inputs and outputs of the current operation
Set<ReadEntity> inputs = context.getInputs();
Set<WriteEntity> outputs = context.getOutputs();
List<List<DBModelAuthorizable>> inputHierarchy = new ArrayList<List<DBModelAuthorizable>>();
List<List<DBModelAuthorizable>> outputHierarchy = new ArrayList<List<DBModelAuthorizable>>();
if(LOG.isDebugEnabled()) {
LOG.debug("stmtAuthObject.getOperationScope() = " + stmtAuthObject.getOperationScope());
LOG.debug("context.getInputs() = " + context.getInputs());
LOG.debug("context.getOutputs() = " + context.getOutputs());
}
// Workaround to allow DESCRIBE <table> to be executed with only column-level privileges, while
// still authorizing DESCRIBE [EXTENDED|FORMATTED] as table-level.
// This is done by treating DESCRIBE <table> the same as SHOW COLUMNS, which only requires column
// level privs.
if (isDescTableBasic) {
stmtAuthObject = HiveAuthzPrivilegesMap.getHiveAuthzPrivileges(HiveOperation.SHOWCOLUMNS);
}
// Based on the operation's scope, extract the objects involved and place them into the input and output authorization hierarchies
switch (stmtAuthObject.getOperationScope()) {
case SERVER :
// validate server level privileges if applicable. Eg create UDF,register jar etc ..
List<DBModelAuthorizable> serverHierarchy = new ArrayList<DBModelAuthorizable>();
serverHierarchy.add(hiveAuthzBinding.getAuthServer());
inputHierarchy.add(serverHierarchy);
break;
case DATABASE:
// workaround for database scope statements (create/alter/drop db)
List<DBModelAuthorizable> dbHierarchy = new ArrayList<DBModelAuthorizable>();
dbHierarchy.add(hiveAuthzBinding.getAuthServer());
dbHierarchy.add(currDB);
inputHierarchy.add(dbHierarchy);
if (currOutDB != null) {
List<DBModelAuthorizable> outputDbHierarchy = new ArrayList<DBModelAuthorizable>();
outputDbHierarchy.add(hiveAuthzBinding.getAuthServer());
outputDbHierarchy.add(currOutDB);
outputHierarchy.add(outputDbHierarchy);
} else {
outputHierarchy.add(dbHierarchy);
}
getInputHierarchyFromInputs(inputHierarchy, inputs);
break;
case TABLE:
// workaround for add partitions
if(partitionURI != null) {
inputHierarchy.add(ImmutableList.of(hiveAuthzBinding.getAuthServer(), partitionURI));
}
if(indexURI != null) {
outputHierarchy.add(ImmutableList.of(hiveAuthzBinding.getAuthServer(), indexURI));
}
getInputHierarchyFromInputs(inputHierarchy, inputs);
for (WriteEntity writeEntity: outputs) {
if (filterWriteEntity(writeEntity)) {
continue;
}
List<DBModelAuthorizable> entityHierarchy = new ArrayList<DBModelAuthorizable>();
entityHierarchy.add(hiveAuthzBinding.getAuthServer());
entityHierarchy.addAll(getAuthzHierarchyFromEntity(writeEntity));
outputHierarchy.add(entityHierarchy);
}
// workaround for metadata queries.
// Capture the table name in pre-analyze and include that in the input entity list
if (currTab != null) {
List<DBModelAuthorizable> externalAuthorizableHierarchy = new ArrayList<DBModelAuthorizable>();
externalAuthorizableHierarchy.add(hiveAuthzBinding.getAuthServer());
externalAuthorizableHierarchy.add(currDB);
externalAuthorizableHierarchy.add(currTab);
inputHierarchy.add(externalAuthorizableHierarchy);
}

// workaround for DDL statements
// Capture the table name in pre-analyze and include that in the output entity list
if (currOutTab != null) {
List<DBModelAuthorizable> externalAuthorizableHierarchy = new ArrayList<DBModelAuthorizable>();
externalAuthorizableHierarchy.add(hiveAuthzBinding.getAuthServer());
externalAuthorizableHierarchy.add(currOutDB);
externalAuthorizableHierarchy.add(currOutTab);
outputHierarchy.add(externalAuthorizableHierarchy);
}
break;
case FUNCTION:
/* The 'FUNCTION' privilege scope currently used for
* - CREATE TEMP FUNCTION
* - DROP TEMP FUNCTION.
*/
if (!udfURIs.isEmpty()) {
List<DBModelAuthorizable> udfUriHierarchy = new ArrayList<DBModelAuthorizable>();
udfUriHierarchy.add(hiveAuthzBinding.getAuthServer());
udfUriHierarchy.addAll(udfURIs);
inputHierarchy.add(udfUriHierarchy);
for (WriteEntity writeEntity : outputs) {
List<DBModelAuthorizable> entityHierarchy = new ArrayList<DBModelAuthorizable>();
entityHierarchy.add(hiveAuthzBinding.getAuthServer());
entityHierarchy.addAll(getAuthzHierarchyFromEntity(writeEntity));
outputHierarchy.add(entityHierarchy);
}
}
break;
case CONNECT:
/* The 'CONNECT' is an implicit privilege scope currently used for
* - USE <db>
* It's allowed when the user has any privilege on the current database. For application
* backward compatibility, we allow (optional) implicit connect permission on 'default' db.
*/
List<DBModelAuthorizable> connectHierarchy = new ArrayList<DBModelAuthorizable>();
connectHierarchy.add(hiveAuthzBinding.getAuthServer());
// by default allow connect access to default db
Table currTbl = Table.ALL;
Column currCol = Column.ALL;
if ((DEFAULT_DATABASE_NAME.equalsIgnoreCase(currDB.getName()) &&
"false".equalsIgnoreCase(authzConf.
get(HiveAuthzConf.AuthzConfVars.AUTHZ_RESTRICT_DEFAULT_DB.getVar(), "false")))) {
currDB = Database.ALL;
currTbl = Table.SOME;
}
connectHierarchy.add(currDB);
connectHierarchy.add(currTbl);
connectHierarchy.add(currCol);
inputHierarchy.add(connectHierarchy);
outputHierarchy.add(connectHierarchy);
break;
case COLUMN:
for (ReadEntity readEntity: inputs) {
if (readEntity.getAccessedColumns() != null && !readEntity.getAccessedColumns().isEmpty()) {
addColumnHierarchy(inputHierarchy, readEntity);
} else {
List<DBModelAuthorizable> entityHierarchy = new ArrayList<DBModelAuthorizable>();
entityHierarchy.add(hiveAuthzBinding.getAuthServer());
entityHierarchy.addAll(getAuthzHierarchyFromEntity(readEntity));
entityHierarchy.add(Column.ALL);
inputHierarchy.add(entityHierarchy);
}
}
break;
default:
throw new AuthorizationException("Unknown operation scope type " +
stmtAuthObject.getOperationScope().toString());
}
HiveAuthzBinding binding = null;
try {
// The key step: the user's privileges are fetched via a remote call
binding = getHiveBindingWithPrivilegeCache(hiveAuthzBinding, context.getUserName());
} catch (SemanticException e) {
// Will use the original hiveAuthzBinding
binding = hiveAuthzBinding;
}
// Finally, validate the permissions
binding.authorize(stmtOperation, stmtAuthObject, getCurrentSubject(context), inputHierarchy,
outputHierarchy);
}
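Conceptually, the final binding.authorize call matches the input/output hierarchies built above against the user's granted privileges. Below is a minimal sketch of that idea, assuming Sentry-style privilege strings and ignoring wildcards, role sets, and action hierarchies; the class and method names are made up, not Sentry's policy-engine API:

```java
import java.util.List;
import java.util.Set;

// Toy sketch: check a requested hierarchy (server -> db -> table) plus an
// action against a set of granted privilege strings.
public class HierarchyCheck {
    static boolean allowed(List<String> hierarchy, String action, Set<String> granted) {
        // Build the privilege string for the request, e.g.
        // server=server1->db=sales->table=orders->action=select
        String wanted = String.join("->", hierarchy) + "->action=" + action;
        return granted.contains(wanted);
    }

    public static void main(String[] args) {
        Set<String> granted = Set.of("server=server1->db=sales->table=orders->action=select");
        System.out.println(allowed(List.of("server=server1", "db=sales", "table=orders"), "select", granted));
    }
}
```

The real engine also honors coarser grants (e.g. a database-level privilege implying table access), which exact string matching cannot express.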

Since obtaining the binding fetches the user's privileges via a remote call, this step deserves a closer look.

private static HiveAuthzBinding getHiveBindingWithPrivilegeCache(HiveAuthzBinding hiveAuthzBinding,
    String userName) throws SemanticException {
  // get the original HiveAuthzBinding, and get the user's privileges by AuthorizationProvider
  AuthorizationProvider authProvider = hiveAuthzBinding.getCurrentAuthProvider();
  // The key step: this fetches the user's privileges via a remote call
  Set<String> userPrivileges = authProvider.getPolicyEngine().getPrivileges(
      authProvider.getGroupMapping().getGroups(userName), hiveAuthzBinding.getActiveRoleSet(),
      hiveAuthzBinding.getAuthServer());
  // create PrivilegeCache using user's privileges
  // Cache the fetched privileges for reuse within the same session
  PrivilegeCache privilegeCache = new SimplePrivilegeCache(userPrivileges);
  try {
    // create new instance of HiveAuthzBinding whose backend provider should be SimpleCacheProviderBackend
    return new HiveAuthzBinding(HiveAuthzBinding.HiveHook.HiveServer2, hiveAuthzBinding.getHiveConf(),
        hiveAuthzBinding.getAuthzConf(), privilegeCache);
  } catch (Exception e) {
    LOG.error("Can not create HiveAuthzBinding with privilege cache.");
    throw new SemanticException(e);
  }
}

The final validation logic is not covered here; see the corresponding code. From the analysis above we know that the Hook implements both granting (SentryGrantRevokeTask) and authorization checks (#getHiveBindingWithPrivilegeCache): every user request is intercepted at this layer and handled according to the type of object involved. That concludes the client-side authorization.

Server-side authorization

While examining client-side authorization we saw that the client performs both grant operations and privilege checks through remote (RPC) calls. Now let's look at the server side. The server does more than authorization and granting, so we first give an overview of its operations and then cover authorization and granting in detail. From the architecture diagram above we know the server's entry point is the SentryService class; its main components and their roles are:

  • thrift server: handles the events sent by the hook and listener; the processing logic lives in SentryPolicyStoreProcessor
  • SentryWebServer: a Jetty-based web server embedded in SentryService, with a UI at http://${sentry.server.url}:29000 that mostly shows configuration and monitoring data
  • sentryStore: a singleton initialized in SentryService's constructor; initialization checks whether HDFS sync is enabled
  • leaderMonitor: when Sentry HA is enabled, this component watches for leader changes
  • HMSFollower: wraps the HiveMetaStore client and synchronizes Hive metadata from the Hive metastore

Next, the entry point of SentryService:

public String call() throws Exception {
  SentryKerberosContext kerberosContext = null;
  try {
    status = Status.STARTED;
    if (kerberos) {
      kerberosContext = new SentryKerberosContext(principal, keytab, true);
      Subject.doAs(kerberosContext.getSubject(), new PrivilegedExceptionAction<Void>() {
        @Override
        public Void run() throws Exception {
          runServer();
          return null;
        }
      });
    } else {
      runServer();
    }
  } catch (Exception t) {
    LOGGER.error("Error starting server", t);
    throw new Exception("Error starting server", t);
  } finally {
    if (kerberosContext != null) {
      kerberosContext.shutDown();
    }
    status = Status.NOT_STARTED;
  }
  return null;
}

As we can see, the call method only checks whether Kerberos is enabled; either way it ends up invoking runServer. Stepping into runServer:

1. First, some cleanup work is started.

2. Next, the HMSFollower is started. It first obtains the metastore's thrift URI and is then scheduled periodically every 500 ms. The HMSFollower constructor instantiates three collaborators. NotificationProcessor processes notifications, persisting DDL-driven changes through sentryStore (checking whether HDFS sync is enabled). SentryHMSClient first reads sentryStore's notification id, fetches all database/table metadata (without persisting it), and then reads the notification id again: if the two ids are equal, no DDL ran while the metadata was being synchronized, so the sync is complete; if they differ, DDL happened during the sync, and syncing simply continues. HiveNotificationFetcher uses SentryHMSClient to fetch notifications.
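The double-read of the notification id in this step can be sketched as follows. The functional API below is hypothetical, not SentryHMSClient's real signature; it only captures the consistency check itself:

```java
import java.util.function.Supplier;

// Toy sketch of the snapshot consistency check: read the notification id,
// take the snapshot, read the id again. Equal ids mean no DDL ran during
// the snapshot, so it is consistent as taken.
public class SnapshotSketch {
    // idSource stands in for "ask HMS for its current notification id";
    // snapshotter stands in for "fetch all db/table metadata".
    static boolean snapshotIsConsistent(Supplier<Long> idSource, Runnable snapshotter) {
        long before = idSource.get();
        snapshotter.run();
        long after = idSource.get();
        return before == after;
    }

    public static void main(String[] args) {
        // Simulated HMS whose notification id never changes during the snapshot
        System.out.println(snapshotIsConsistent(() -> 42L, () -> {}));
    }
}
```

When the check fails, the missed interval of events is still delivered later by the notification fetcher, so the follower just keeps syncing.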

3. Then the thrift server is built. It combines the processor logic (which handles RPC requests, mainly permission checks) with the sentryStore (persistence) logic; the SentryService server is then started to handle client requests.

4. Finally, SentryWebServer is started, mainly exposing configuration and built-in monitoring data. The main authorization and granting logic lives in the thrift server's processor, implemented by SentryPolicyStoreProcessor. Below is one of the methods in that class:

public TCreateSentryRoleResponse create_sentry_role(
    TCreateSentryRoleRequest request) throws TException {
  final Timer.Context timerContext = sentryMetrics.createRoleTimer.time();
  TCreateSentryRoleResponse response = new TCreateSentryRoleResponse();
  try {
    validateClientVersion(request.getProtocol_version());
    // Check the requester's privileges. Creating a role is a special operation: the
    // requester's groups must intersect the admin groups, otherwise an exception is thrown
    authorize(request.getRequestorUserName(),
        getRequestorGroups(request.getRequestorUserName()));
    // Persist the role into Sentry's database
    sentryStore.createSentryRole(request.getRoleName());
    response.setStatus(Status.OK());
    // Chain-of-responsibility style handlers; their purpose is unclear
    notificationHandlerInvoker.create_sentry_role(request, response);
  } catch (SentryAlreadyExistsException e) {
    String msg = "Role: " + request + " already exists.";
    LOGGER.error(msg, e);
    response.setStatus(Status.AlreadyExists(e.getMessage(), e));
  } catch (SentryAccessDeniedException e) {
    LOGGER.error(e.getMessage(), e);
    response.setStatus(Status.AccessDenied(e.getMessage(), e));
  } catch (SentryThriftAPIMismatchException e) {
    LOGGER.error(e.getMessage(), e);
    response.setStatus(Status.THRIFT_VERSION_MISMATCH(e.getMessage(), e));
  } catch (Exception e) {
    String msg = "Unknown error for request: " + request + ", message: " + e.getMessage();
    LOGGER.error(msg, e);
    response.setStatus(Status.RuntimeError(msg, e));
  } finally {
    timerContext.stop();
  }
  try {
    AUDIT_LOGGER.info(JsonLogEntityFactory.getInstance()
        .createJsonLogEntity(request, response, conf).toJsonFormatLog());
  } catch (Exception e) {
    // if any exception, log the exception.
    String msg = "Error creating audit log for create role: " + e.getMessage();
    LOGGER.error(msg, e);
  }
  return response;
}

Only three lines in the code above are the core, and they are annotated; the other request handlers follow a similar pattern, so we will not go through them here.

Table change event synchronization

On the client side, notifications are triggered by the SentrySyncHMSNotificationsPostEventListener class, shown below. Its methods trigger a notification when specific events are observed; inspecting the database behind Hive's metastore shows records for these notifications. The following is the client-side trigger path once such an event is observed:

private void syncNotificationEvents(ListenerEvent event, String eventName) {
  // Do not sync notifications if the event has failed.
  if (failedEvent(event, eventName)) {
    return;
  }
  Map<String, String> eventParameters = event.getParameters();
  if (!eventParameters.containsKey(MetaStoreEventListenerConstants.DB_NOTIFICATION_EVENT_ID_KEY_NAME)) {
    return;
  }
  /* If the HMS is running in an active transaction, then we do not want to sync with Sentry
   * because the desired eventId is not available for Sentry yet, and Sentry may block the HMS
   * forever or until a read time-out happens. */
  if (isMetastoreTransactionActive(eventParameters)) {
    return;
  }
  long eventId =
      Long.parseLong(eventParameters.get(MetaStoreEventListenerConstants.DB_NOTIFICATION_EVENT_ID_KEY_NAME));
  // This check is only for performance reasons to avoid calling the sync thrift call if the Sentry server
  // already processed the requested eventId.
  if (eventId <= latestProcessedId.get()) {
    return;
  }
  // Everything above merely validates the event and can be skipped on a first reading
  try (SentryPolicyServiceClient sentryClient = this.getSentryServiceClient()) {
    LOGGER.debug("Starting Sentry/HMS notifications sync for {} (id: {})", eventName, eventId);
    // Push the latest notification id to the server
    long sentryLatestProcessedId = sentryClient.syncNotifications(eventId);
    LOGGER.debug("Finished Sentry/HMS notifications sync for {} (id: {})", eventName, eventId);
    LOGGER.debug("Latest processed event ID returned by the Sentry server: {}", sentryLatestProcessedId);
    // Update the local latestProcessedId; by default it is 0 when the client starts
    updateProcessedId(sentryLatestProcessedId);
  } catch (Exception e) {
    // This error is only logged. There is no need to throw an error to Hive because HMS sync is called
    // after the notification is already generated by Hive (as post-event).
    LOGGER.error("Failed to sync requested HMS notifications up to the event ID: " + eventId, e);
  }
}

Back in SentryPolicyStoreProcessor, the server handles the message sent by the client as follows:

public TSentrySyncIDResponse sentry_sync_notifications(TSentrySyncIDRequest request)
    throws TException {
  TSentrySyncIDResponse response = new TSentrySyncIDResponse();
  try (Timer.Context timerContext = hmsWaitTimer.time()) {
    // Wait until Sentry Server processes specified HMS Notification ID.
    // This blocks the server until the notification has been processed
    response.setId(sentryStore.getCounterWait().waitFor(request.getId()));
    response.setStatus(Status.OK());
  } catch (InterruptedException e) {
    String msg = String.format("wait request for id %d is interrupted",
        request.getId());
    LOGGER.error(msg, e);
    response.setId(0);
    response.setStatus(Status.RuntimeError(msg, e));
    Thread.currentThread().interrupt();
  } catch (TimeoutException e) {
    String msg = String.format("timed out wait request for id %d", request.getId());
    LOGGER.warn(msg, e);
    response.setId(0);
    response.setStatus(Status.RuntimeError(msg, e));
  }
  return response;
}
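The waitFor call above blocks the thrift handler until HMSFollower has advanced the processed-notification counter past the requested id. Below is a minimal sketch of such a counter-wait primitive; it is hypothetical (not Sentry's CounterWait class) and omits the timeout support the real code has:

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Toy counter-wait: waitFor(id) blocks until the counter reaches id;
// update(id) advances the counter and wakes the waiters.
public class CounterWaitSketch {
    private long counter = 0;
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition advanced = lock.newCondition();

    // Called by the thrift handler: block until counter >= id, return the counter.
    public long waitFor(long id) {
        lock.lock();
        try {
            while (counter < id) {
                advanced.awaitUninterruptibly();
            }
            return counter;
        } finally {
            lock.unlock();
        }
    }

    // Called by HMSFollower after persisting notifications up to id.
    public void update(long id) {
        lock.lock();
        try {
            if (id > counter) {
                counter = id;
                advanced.signalAll();
            }
        } finally {
            lock.unlock();
        }
    }
}
```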

The core is a single line. Note that sentryStore holds a counterWait that the thrift server and HMSFollower use to synchronize on notifications. sentryStore is arguably not the right home for it; it lives there only because sentryStore is the sole link between the thrift server and HMSFollower, and this may be refactored later. The code above uses the counterWait to publish notifications; the real processing happens in HMSFollower, so next let's see how HMSFollower handles these events. Its constructor:

public HMSFollower(Configuration conf, SentryStore store, LeaderStatusMonitor leaderMonitor,
    HiveConnectionFactory hiveConnectionFactory, String authServerName) {
  LOGGER.info("HMSFollower is being initialized");
  readyToServe = false;
  authzConf = conf;
  this.leaderMonitor = leaderMonitor;
  sentryStore = store;
  if (authServerName == null) {
    authServerName = conf.get(AUTHZ_SERVER_NAME.getVar(),
        conf.get(AUTHZ_SERVER_NAME_DEPRECATED.getVar(), AUTHZ_SERVER_NAME_DEPRECATED.getDefault()));
  }
  // Processes the data received from the Hive metastore
  notificationProcessor = new NotificationProcessor(sentryStore, authServerName, authzConf);
  // Wraps the HiveMetaStore client, used to fetch data from the Hive metadata store
  client = new SentryHMSClient(authzConf, hiveConnectionFactory);
  hdfsSyncEnabled = SentryServiceUtil.isHDFSSyncEnabledNoCache(authzConf); // no cache to test different settings for hdfs sync
  // Uses the wrapped client to fetch notifications
  notificationFetcher = new HiveNotificationFetcher(sentryStore, hiveConnectionFactory);
  // subscribe to full update notification
  if (conf.getBoolean(ServerConfig.SENTRY_SERVICE_FULL_UPDATE_PUBSUB, false)) {
    LOGGER.info(FULL_UPDATE_TRIGGER + "subscribing to topic " + PubSub.Topic.HDFS_SYNC_HMS.getName());
    PubSub.getInstance().subscribe(PubSub.Topic.HDFS_SYNC_HMS, this);
  }
}

The important places are annotated. As the names suggest, notificationFetcher pulls notifications from Hive's metastore and notificationProcessor processes them. Since the HMSFollower runs on a schedule, its scheduled method is the natural entry point for analysis:

public void run() {
  // Publish the HMSFollower state
  SentryStateBank.enableState(HMSFollowerState.COMPONENT, HMSFollowerState.STARTED);
  long lastProcessedNotificationId;
  try {
    try {
      // Initializing lastProcessedNotificationId based on the latest persisted notification ID.
      lastProcessedNotificationId = sentryStore.getLastProcessedNotificationID();
    } catch (Exception e) {
      LOGGER.error("Failed to get the last processed notification id from sentry store, "
          + "Skipping the processing", e);
      return;
    }
    // Wake any clients connected to this service waiting for HMS already processed notifications.
    // These are presumably the clients that synchronize data from the Hive metastore.
    wakeUpWaitingClientsForSync(lastProcessedNotificationId);
    // Only the leader should listen to HMS updates
    if (!isLeader()) {
      // Close any outstanding connections to HMS
      close();
      return;
    }
    // Process newly arrived notifications
    syncupWithHms(lastProcessedNotificationId);
  } finally {
    // Reset the state
    SentryStateBank.disableState(HMSFollowerState.COMPONENT, HMSFollowerState.STARTED);
  }
}

The core code is in syncupWithHms, which eventually dispatches to notificationProcessor.processNotificationEvent(event). Stepping into that method shows that privilege changes are applied per event type:

try (Context ignored = timer.time()) {
  switch (eventType) {
    case CREATE_DATABASE:
      return processCreateDatabase(event);
    case DROP_DATABASE:
      return processDropDatabase(event);
    case CREATE_TABLE:
      return processCreateTable(event);
    case DROP_TABLE:
      return processDropTable(event);
    case ALTER_TABLE:
      return processAlterTable(event);
    case ADD_PARTITION:
      return true; // return processAddPartition(event);
    case DROP_PARTITION:
      return true; // return processDropPartition(event);
    case ALTER_PARTITION:
      return true; // return processAlterPartition(event);
    default:
      LOGGER.error("Notification with ID:{} has invalid event type: {}", event.getEventId(),
          event.getEventType());
      return false;
  }
}

Stepping into the corresponding methods shows that privileges are updated according to each event — for example, dropping a database deletes all privilege data related to that database. After the sync completes, the metastore change (its notification ID) is recorded in the database.
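The cascade behaviour just described can be sketched as follows. All names here (PrivilegeStoreSketch, processDropDatabase's shape, etc.) are illustrative, not Sentry's actual API — Sentry persists this state via DataNucleus/JDO rather than in-memory maps:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative sketch: dropping a database cascade-deletes every privilege
// scoped to it, then records the notification id so the event is not replayed.
class PrivilegeStoreSketch {
    private final Map<String, List<String>> privilegesByDb = new HashMap<>();
    private long lastProcessedId = 0;

    void grant(String db, String privilege) {
        privilegesByDb.computeIfAbsent(db, k -> new ArrayList<>()).add(privilege);
    }

    void processDropDatabase(String db, long eventId) {
        privilegesByDb.remove(db); // cascade-delete all privileges on the db
        lastProcessedId = eventId; // record the metastore change as handled
    }

    List<String> privileges(String db) {
        return privilegesByDb.getOrDefault(db, new ArrayList<>());
    }

    long lastProcessedId() {
        return lastProcessedId;
    }
}
```

Recording the event id in the same unit of work as the privilege change is what lets the follower resume from the correct notification after a restart.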

HDFS NameNode authorization

Metadata synchronization

The HDFS NameNode periodically synchronizes privileges and Hive metadata from the Sentry server. The entry point is SentryAuthorizationProvider, which ultimately has SentryAuthorizationInfo periodically refresh authzPaths and authzPermissions. The core code is in SentryAuthorizationInfo:

private boolean update() {
  // Looks like getting same updates multiple times
  SentryAuthzUpdate updates = updater.getUpdates();
  // Updates can be null if Sentry Service is un-reachable
  if (updates != null) {
    if (updates.isEmpty()) {
      return true; // no updates is a norm, it's still success
    }
    UpdateableAuthzPaths newAuthzPaths = processUpdates(
        updates.getPathUpdates(), authzPaths);
    UpdateableAuthzPermissions newAuthzPerms = processUpdates(
        updates.getPermUpdates(), authzPermissions);
    // processUpdates() should return different newAuthzPaths and newAuthzPerms object references
    // if FULL updates were fetched from the Sentry server, otherwise, the same authzPaths and
    // authzPermissions objects will be returned.
    if (newAuthzPaths != authzPaths || newAuthzPerms != authzPermissions) {
      lock.writeLock().lock();
      try {
        if (LOG.isDebugEnabled()) {
          LOG.debug(updates.dumpContent());
        }
        if (newAuthzPaths != authzPaths) {
          LOG.info(String.format("FULL Updated paths seq Num [old=%d], [new=%d]",
              authzPaths.getLastUpdatedSeqNum(), newAuthzPaths.getLastUpdatedSeqNum()));
          authzPaths = newAuthzPaths;
          if (LOG.isTraceEnabled()) {
            LOG.trace(authzPaths.dumpContent());
          }
        }
        if (newAuthzPerms != authzPermissions) {
          LOG.info(String.format("FULL Updated perms seq Num [old=%d], [new=%d]",
              authzPermissions.getLastUpdatedSeqNum(), newAuthzPerms.getLastUpdatedSeqNum()));
          authzPermissions = newAuthzPerms;
          if (LOG.isTraceEnabled()) {
            LOG.trace(authzPermissions.dumpContent());
          }
        }
      } finally {
        lock.writeLock().unlock();
      }
    } else {
      if (LOG.isDebugEnabled()) {
        lock.writeLock().lock();
        try {
          LOG.debug(updates.dumpContent());
          if (LOG.isTraceEnabled()) {
            LOG.trace(newAuthzPaths.dumpContent());
            LOG.trace(newAuthzPerms.dumpContent());
          }
        } finally {
          lock.writeLock().unlock();
        }
      }
    }
    return true;
  }
  return false;
}

In the code above, a read-write lock is used to synchronize threads when updating privileges or Hive metadata — presumably to keep the two variables updated together atomically. While the write lock is held during an update, all read operations block, so this can become a performance bottleneck.
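The locking pattern can be isolated in a short sketch: readers take the read lock, and the updater swaps both references under the write lock so a reader never observes new paths paired with stale permissions. The field types below are placeholders for the real UpdateableAuthzPaths / UpdateableAuthzPermissions objects:

```java
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Sketch of the SentryAuthorizationInfo locking pattern: many concurrent
// readers, one writer that swaps both references atomically.
class AuthzStateSketch {
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private Object authzPaths = "paths-v1";
    private Object authzPermissions = "perms-v1";

    void fullUpdate(Object newPaths, Object newPerms) {
        lock.writeLock().lock(); // blocks every reader for the swap duration
        try {
            authzPaths = newPaths;
            authzPermissions = newPerms;
        } finally {
            lock.writeLock().unlock();
        }
    }

    Object[] snapshot() {
        lock.readLock().lock(); // many readers may hold this concurrently
        try {
            return new Object[] { authzPaths, authzPermissions };
        } finally {
            lock.readLock().unlock();
        }
    }
}
```

Since the two references are effectively immutable snapshots being swapped, an alternative that avoids blocking readers would be publishing a single immutable pair through a volatile or AtomicReference; the write lock approach trades some read latency for simplicity.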

Authorization check

The HDFS authorization check lives in SentryAuthorizationProvider, as shown below:

public void checkPermission(String user, Set<String> groups,
    INodeAuthorizationInfo[] inodes, int snapshotId,
    boolean doCheckOwner, FsAction ancestorAccess, FsAction parentAccess,
    FsAction access, FsAction subAccess, boolean ignoreEmptyDir)
    throws AccessControlException, UnresolvedLinkException {
  if (LOG.isDebugEnabled()) {
    // Note: Arrays.asList() returns "[null]" string for null argument
    LOG.debug("### checkPermission(): " +
        "User {}, Groups {}, Nodes {}, snapshotId {}, " +
        "doCheckOwner {}, ancestorAccess {}, parentAccess {}, " +
        "access {}, subAccess {}, ignoreEmptyDir {}",
        user, groups, Arrays.asList(inodes), snapshotId,
        doCheckOwner, ancestorAccess, parentAccess,
        access, subAccess, ignoreEmptyDir);
  }
  long start = System.currentTimeMillis();
  // Bypass the native NameNode check of the execute (x) permission:
  // DefaultAuthorizationProvider.checkPermission verifies x level by level from the root node.
  // Returning directly here means the check succeeded; a failed check throws an exception.
  // http://km.vivo.xyz/pages/viewpage.action?pageId=85251073
  if (!doCheckOwner && ancestorAccess == null && parentAccess == null &&
      access == null && subAccess == null && !ignoreEmptyDir) {
    if (LOG.isDebugEnabled()) {
      LOG.debug("### checkPermission() return: " +
          "User {}, Groups {}, Nodes {}, snapshotId {}, " +
          "doCheckOwner {}, ancestorAccess {}, parentAccess {}, " +
          "access {}, subAccess {}, ignoreEmptyDir {}",
          user, groups, Arrays.asList(inodes), snapshotId,
          doCheckOwner, ancestorAccess, parentAccess,
          access, subAccess, ignoreEmptyDir);
    }
    return;
  }
  // filter to check
  // Skip authorization for the temporary files produced by Hive MR and Spark jobs
  if (isFilter(inodes, "checkPermission()")) {
    return;
  }
  try {
    // Perform the check only once
    inodes = truncate(inodes, doCheckOwner, ancestorAccess, parentAccess);
    if (inodes.length == 1 && access == null) {
      if (subAccess != null) {
        access = subAccess;
        subAccess = null;
      } else if (ancestorAccess != null) {
        access = ancestorAccess;
        ancestorAccess = null;
      } else if (parentAccess != null) {
        access = parentAccess;
        parentAccess = null;
      }
    }
  } catch (Throwable e) {
    LOG.warn("### truncate Exception. " +
        "User {" + user + "}, Groups {" + groups + "}, Nodes {" + Arrays.asList(inodes) + "}, snapshotId {" + snapshotId + "}, " +
        "doCheckOwner {" + doCheckOwner + "}, ancestorAccess {" + ancestorAccess + "}, parentAccess {" + parentAccess + "}, " +
        "access {" + access + "}, subAccess {" + subAccess + "}, ignoreEmptyDir {" + ignoreEmptyDir + "}, FullPath {" +
        (inodes[0] == null ? "null" : inodes[0].getFullPathName()) + "} taken {" + (System.currentTimeMillis() - start) + "}.", e);
  }
  long truncateTime = System.currentTimeMillis() - start;
  try {
    defaultAuthzProvider.checkPermission(user, groups, inodes, snapshotId,
        doCheckOwner, ancestorAccess, parentAccess, access, subAccess,
        ignoreEmptyDir);
    if (LOG.isDebugEnabled()) {
      LOG.debug("### checkPermission(): " +
          "User {}, Groups {}, Nodes {}, snapshotId {}, " +
          "doCheckOwner {}, ancestorAccess {}, parentAccess {}, " +
          "access {}, subAccess {}, ignoreEmptyDir {}, FullPath {} taken {}, truncateTime {}",
          user, groups, Arrays.asList(inodes), snapshotId,
          doCheckOwner, ancestorAccess, parentAccess,
          access, subAccess, ignoreEmptyDir, inodes[0] == null ? "null" : inodes[0].getFullPathName(),
          (System.currentTimeMillis() - start), truncateTime);
    }
  } catch (AccessControlException e) {
    LOG.debug("### AccessControlException", e);
    throw e;
  } catch (UnresolvedLinkException e) {
    LOG.debug("### UnresolvedLinkException", e);
    throw e;
  } catch (RuntimeException e) {
    LOG.error("### Unexpected Exception", e);
    throw e;
  }
}

As the code above shows, the permission check is ultimately delegated to defaultAuthzProvider, so SentryAuthorizationProvider acts more like a facade.

Summary

Sentry's permission control is built on Hive's hook and listener mechanisms, which are essentially aspect-oriented interception. In my view, the key part is extracting the data touched by DDL and DML operations; with that in place, customized permission control can be implemented quickly.
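The aspect idea behind the hook mechanism can be reduced to a plain-Java sketch: a pre-execution hook extracts the operation type and its target objects from the statement and decides before the engine runs it. All names below (StatementSketch, AuthHookSketch) are illustrative; Hive's real hooks implement interfaces such as ExecuteWithHookContext, and Sentry's binding works against the semantic analysis output rather than a simple object like this:

```java
import java.util.Set;

// Minimal pre-execution hook sketch: extract operation + target, then decide.
class StatementSketch {
    final String operation;
    final String targetDb;

    StatementSketch(String operation, String targetDb) {
        this.operation = operation;
        this.targetDb = targetDb;
    }
}

class AuthHookSketch {
    private final Set<String> grantedDbs; // databases the current user may access

    AuthHookSketch(Set<String> grantedDbs) {
        this.grantedDbs = grantedDbs;
    }

    // The "aspect": invoked before execution, like a Hive pre-exec hook.
    // Returns true if the statement is permitted on its target database.
    boolean preExecute(StatementSketch stmt) {
        return grantedDbs.contains(stmt.targetDb);
    }
}
```

This is also why BeeLine via HiveServer2 can be governed while a fat client like the Hive CLI cannot: the hook only fires inside the server process that compiles the query.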