使用 Purview Python SDK 以编程方式记录 Microsoft Purview 中的表列

在组织中不断增长的数据战略项目中,数据治理是首要任务,以确保数据工件得到治理、协调和记录良好,以便业务用户有意义地理解和解释数据。

作为数据治理工作的一部分,维护与数据实体相关的数据术语表/字典成为数据管理员的首要任务。 借助 Microsoft Purview,注册数据源后,数据所有者就可以开始记录实体。

有多种方法可以以编程方式使用 Microsoft Purview 实体,特别是在需要执行批量操作(例如动态记录大量表和列)时。

本文展示了如何使用 Python SDK 让 Purview 以编程方式批量记录 Purview 表列 – 假设有许多表和列需要根据参考表自动记录 – 如本示例中所示,数据字典在 Excel 中维护。

另一方面, 权限 REST API 可用于本机使用 REST API,而适用于 Purview 的 Python SDK 是一个包装器,可以更轻松地以编程方式与后端的 Purview Atlas REST API 进行交互。

对于其他 SDK 选项,请参阅以下文档:

爪哇: azure-sdk-for-java/sdk/purview/azure-analytics-purview-catalog/src/samples 位于 main · Azure/azure-sdk…

。网: 适用于 .NET 的 Azure Purview SDK – 适用于 .NET 开发人员的 Azure | 微软学习

电源外壳: Az.Purview 模块 | 微软学习

假设数据字典保存在 Excel 电子表格中。 它可以是任何文本文件,甚至可以是数据库中维护的表。 就本文而言,它基于 Excel 文件。 数据所有者可以根据需要对其进行更新。 该架构很简单,如下所示,其中三列定义表名称、列及其描述。 Excel 电子表格可以位于任何位置,在撰写本文时,Excel 文件保存在 Fabric OneLake 中。

继续,文件被加载到 Pandas 数据框中; Fabric 中的 PySpark 笔记本。

计算机上需要安装以下Python SDK包,您可以使用pip install命令安装它们:

  • azure 权限扫描
  • 天蓝色身份
  • azure 权限管理
  • azure 权限目录
  • azure 权限帐户
  • 天蓝色核心

接下来,导入包如下:

from azure.purview.scanning import PurviewScanningClient
from azure.purview.catalog import PurviewCatalogClient
from azure.purview.administration.account import PurviewAccountClient
from azure.identity import ClientSecretCredential 
from azure.core.exceptions import HttpResponseError

import os, json, requests
import pandas as pd

从 Purview 端,确保服务主体分配给以下 Purview 角色 – 这些角色将在集合中分配。 Python 脚本将通过服务主体进行身份验证,其中需要这些角色来为此方案编写 SDK。

  • 角色:集合管理员
  • 角色:数据源管理员
  • 角色:数据管理者
  • 角色:数据读取器

Danaraj_1-1713114972587.png

返回笔记本,初始化服务主体身份验证所需的变量。


#Consider using Key Vault to store credentials
client_id = "" 
client_secret = ""
tenant_id = "

这些值可以从 Microsoft Entra 中注册的应用程序轻松获取。 请参阅此处有关创建服务主体的文档:
在门户中创建 Microsoft Entra 应用程序和服务主体 - Microsoft 身份平台 | 麦克风...

安装所需的包并初始化服务主体相关变量后,编排的第一步是读取源数据字典 Excel 文件。

#Files/DataDictionary.xlsx
datadict = pd.read_excel('/lakehouse/default/Files/DataDictionary.xlsx', sheet_name="Data dictionary")

接下来,定义了正在调用的函数,这些函数构成了脚本的大部分。

collection_name = "datawarehouse"
reference_name_purview = "enterprisepurview" #Purview account name

def get_credentials():
    credentials = ClientSecretCredential(client_id=client_id, client_secret=client_secret, tenant_id=tenant_id)
    return credentials

获取凭据() 函数初始化 ClientSecretCredential 对象,该对象用作包装器以针对 Purview API 进行身份验证。

def get_catalog_client(reference_name_purview):
    credentials = get_credentials()
    client = PurviewCatalogClient(endpoint=f" credential=credentials, logging_enable=True)
    return client

get_catalog_client(reference_name_purview) 初始化主要访问 Purview 目录接口的 PurviewCatalogClient 对象。 这是客户端包装器,主要用于执行列描述的更新。

def get_admin_client(reference_name_purview):
    credentials = get_credentials()
    client = PurviewAccountClient(endpoint=f" credential=credentials, logging_enable=True)
    return client

get_admin_client(参考名称_权限) 初始化用于访问管理权限操作(包括查询集合对象)的客户端包装器。

def get_collection_Id(collection_name):
    try:
        client = get_admin_client(reference_name_purview)
    except ValueError as e:
        print(e)
    collection_name_unique_id = ''
    
    collection_list = client.collections.list_collections()
    for collection in collection_list:
        print(collection["friendlyName"])
        if collection["friendlyName"].lower() == collection_name.lower():
            collection_name_unique_id = collection["name"]
    return collection_name_unique_id

get_collection_id(集合名称) 函数根据指定要查询的集合名称查询集合以获取集合 id – 请注意 集合名称 之前初始化的变量。

这是在创建集合时分配的更友好的集合名称,但是,此处的集合 id 很重要,可以通过以下函数以编程方式访问通过以下函数获取的集合项:

def queryCollection( collection_name, reference_name_purview):
    purview_endpoint = f"
    payload= {
    "keywords": "*",
    "filter": {
        "and" : [
            {
                "or" :[
                    {
                        "collectionId":get_collection_Id(collection_name)
                    }
                ]
            },
            {
               #Additional properties can be specified here to selectively choose the tables e.g., by qualifiedName, database server name and etc.
               'entityType'  : 'azure_sql_table'
            }
        ]
    }
    }
    
    # create the catalog client
    try:
        catalog_client = get_catalog_client(reference_name_purview)
    except ValueError as e:
        print(e)

    json_results = catalog_client.discovery.query(payload)
    
    return json_results

通过传入两个参数集合名称和引用名称(即 Purview 帐户名称)来调用 queryCollection 函数,将返回 JSON 对象,如下所示。

Danaraj_2-1713115405326.png

提示: 该功能可以进一步增强,添加过滤器参数以选择性地专门选择实体 - 可以从数据字典文件中对其进行参数化。 如果表名维护在数据字典文件中,则可以循环此函数来选取所有表,并调用此函数来动态迭代列。

上面的 JSON 对象列出了在指定集合名称中找到的所有项目,这里感兴趣的是 Product 表 GUID,它可识别为 azure_sql_表 在里面 实体类型 对于上面有效负载中的对象之一。

在这种情况下,表就是实体。 上面的 GUID 用于查询列出给定实体(表)的列的实体项。

def getColumnDescription(columnName, tableName):
    lookupCondition = datadict[(datadict["Column"] == columnName) & (datadict["Table"] == tableName)]
    if len(lookupCondition) > 0:
        return lookupCondition['Description'].iat[0]

上面的单元格定义了另一个辅助函数,该函数根据传递的参数、表和列名称返回列描述。 查找基于加载到 Pandas 数据帧中的数据字典映射表。

代码的下一个单元格调用该函数 get_catalog_client(reference_name_purview) 初始化目录客户端包装对象以充当客户端接口。 然后使用客户端接口调用实体方法来查询实体项 - 请注意第 2 行中指定的表的 guid。

#Get entity by GUID
#Run the queryCollection function to query the collection items and find the table of interest to be documented
#GUID for the table of interest - e.g. Product
#Replace the GUID accordinly - below is for example only.
guid = "11698937-5942-4f90-ba7e-4bf6f6f60000" 

catalog_client = get_catalog_client(reference_name_purview)
entity_response = catalog_client.entity.get_by_guid(guid)

response_Entities = entity_response['referredEntities']

#Iterate all the columns in the collection and update its description based on mapping table
for guid in response_Entities.keys():

    #get description from mapping table and update description in entity_response payload
    entity_response['referredEntities']https://techcommunity.microsoft.com/t5/azure-architecture-blog/programmatically-documenting-table-columns-in-microsoft-purview/ba-p/4113919['attributes']['userDescription'] = getColumnDescription(entity_response['referredEntities']https://techcommunity.microsoft.com/t5/azure-architecture-blog/programmatically-documenting-table-columns-in-microsoft-purview/ba-p/4113919['attributes']['qualifiedName'].split("https://techcommunity.microsoft.com/")[-1].split("#")[-1] ,entity_response['referredEntities']https://techcommunity.microsoft.com/t5/azure-architecture-blog/programmatically-documenting-table-columns-in-microsoft-purview/ba-p/4113919['attributes']['qualifiedName'].split("https://techcommunity.microsoft.com/")[-1].split("#")[0] )
    
catalog_client.entity.create_or_update(entity_response)

上面的新单元迭代所引用实体的 GUID,其中该单元之前已将变量初始化为 响应实体。 referedEntities 保存给定实体(表)的列信息。

GUID 用于访问集合中的referedEntity 项,并以编程方式分配属性属性userDescription(即列描述)。

上面的代码从枚举的referedEntity项目中提取列和表名称,该项目作为参数传递给函数getColumnDescription。 函数 getColumnDescription 根据映射表返回列描述。

合格名称 如下所示保存表名和列名,因此,一个单独提取其值的脚本如下所示。

Danaraj_0-1713190672562.png

第 5 行中的entity_response 初始化更新为实体返回的entity_response 的JSON 负载——在本例中为列描述(userDescription)。

单元格 7 的最后一行调用 create_or_update(entity_response)。

脚本运行完毕后 创建或更新 方法,前往 Purview 门户并查找产品表的列及其描述记录 - 请注意,Excel 映射文件中只列出了列的子集。 :微笑的脸和微笑的眼睛:

Danaraj_4-1713115764095.png

完整代码:

from azure.purview.scanning import PurviewScanningClient
from azure.purview.catalog import PurviewCatalogClient
from azure.purview.administration.account import PurviewAccountClient
from azure.identity import ClientSecretCredential 
from azure.core.exceptions import HttpResponseError

import os, json, requests
import pandas as pd

#Consider using Key Vault to store credentials
client_id = "" 
client_secret = ""
tenant_id = " 0:
        return lookupCondition['Description'].iat[0]

#Get entity by GUID
#Run the queryCollection function to query the collection items and find the table of interest to be documented
guid = "11698937-5942-4f90-ba7e-4bf6f6f60000" 

catalog_client = get_catalog_client(reference_name_purview)
entity_response = catalog_client.entity.get_by_guid(guid)

response_Entities = entity_response['referredEntities']

#Iterate all the columns in the collection and update its description based on mapping table
for guid in response_Entities.keys():

    #get description from mapping table and update description in entity_response payload
    entity_response['referredEntities']https://techcommunity.microsoft.com/t5/azure-architecture-blog/programmatically-documenting-table-columns-in-microsoft-purview/ba-p/4113919['attributes']['userDescription'] = getColumnDescription(entity_response['referredEntities']https://techcommunity.microsoft.com/t5/azure-architecture-blog/programmatically-documenting-table-columns-in-microsoft-purview/ba-p/4113919['attributes']['qualifiedName'].split("https://techcommunity.microsoft.com/")[-1].split("#")[-1] ,entity_response['referredEntities']https://techcommunity.microsoft.com/t5/azure-architecture-blog/programmatically-documenting-table-columns-in-microsoft-purview/ba-p/4113919['attributes']['qualifiedName'].split("https://techcommunity.microsoft.com/")[-1].split("#")[0] )
    
catalog_client.entity.create_or_update(entity_response)

总之,整个程序可以自动化,使用数据字典参考表动态记录 Purview 中的实体。 本文提供了如何使用 Python SDK for Purview 以编程方式记录 Purview 中的实体(通常是需要记录大量实体的情况)的指南。 功能 查询集合 将是添加更多参数以动态循环要记录的实体的函数。 整个 Python 脚本可以与部署到的代码一起编排 Azure 函数 或者通过 Fabric 中的笔记本安排按计划的时间间隔运行 - 当然可以考虑几个选项。

使用 Python SDK 时,请在此处阅读有关实体负载的更多信息: azure.purview.catalog.operations 包 — 适用于 Python 2.0.0 的 Azure SDK 文档 (windows.net)

感谢我的同事 Samarendra Panda 分享设计模式。
此处维护的 GitHub 存储库: purview-playground/metadata_extract_for_reporting 位于 main · Sam-Panda/purview-playground · GitHub。

有关探索 Purview 的 REST API 的其他文章:使用 Python 探索 Purview 的 REST API (microsoft.com)

有关 REST API 文档,请参阅此处: 实体-REST API(Azure 权限)| 微软学习。

1713533015
#使用 #Purview #Python #SDK #以编程方式记录 #Microsoft #Purview #中的表列
2024-04-15 15:57:13

Leave a Reply

Your email address will not be published. Required fields are marked *

近期新闻​

编辑精选​