• [基础知识] 关于 minddata/dataset/core/config_manager.cc 的注释
    这是一个配置管理器,用于参数的初始化和设置。 下面是关于代码段的简单标注#include "minddata/dataset/core/config_manager.h"//导入config_manager.h,以便使用其中得函数#include <fstream>#include <iostream>#include <limits>#include <string>#include <thread>#include <utility>/* 如果程序没有定义ENABLE_ANDROID,则执行#include "utils/log_adapter.h",否则执行include "mindspore/lite/src/common/log_adapter.h"。我认为,用这种,可以很方便的开启/关闭整个程序的某项特定功能。 */#ifndef ENABLE_ANDROID//用于使能安卓#include "utils/log_adapter.h"#else#include "mindspore/lite/src/common/log_adapter.h"#endif#include "minddata/dataset/util/system_pool.h"namespace mindspore {namespace dataset {/* 这里进行数据的配置,包括并行工作的数量、 工作连接器尺寸、op连接器尺寸、等级id等。然后获取环境的变量值,根据宿主和端口的值来判断环境变量的MS_CACHE_PORT是否有效。根据 *end != '\0' 来判断端口失效了,因为端口范围验证在验证检查期间生成了一个错误。*/ ConfigManager::ConfigManager(): num_parallel_workers_(kCfgParallelWorkers),worker_connector_size_(kCfgWorkerConnectorSize),//op_connector_size_(kCfgOpConnectorSize),rank_id_(kCfgDefaultRankId),seed_(kCfgDefaultSeed),numa_enable_(false),monitor_sampling_interval_(kCfgMonitorSamplingInterval),stop_profiler_(false),file_ready_(true),callback_timout_(kCfgCallbackTimeout),cache_host_(kCfgDefaultCacheHost),cache_port_(kCfgDefaultCachePort),num_connections_(kDftNumConnections),prefetch_size_(kDftPrefetchSize),auto_num_workers_(kDftAutoNumWorkers),num_cpu_threads_(std::thread::hardware_concurrency()),auto_num_workers_num_shards_(1),auto_worker_config_(0) {num_cpu_threads_ = num_cpu_threads_ > 0 ? num_cpu_threads_ : std::numeric_limits<uint16_t>::max();num_parallel_workers_ = num_parallel_workers_ < num_cpu_threads_ ? num_parallel_workers_ : num_cpu_threads_;auto env_cache_host = std::getenv("MS_CACHE_HOST");//获取环境变量值(宿主)auto env_cache_port = std::getenv("MS_CACHE_PORT");//获取环境变量值(端口)if (env_cache_host != nullptr) {cache_host_ = env_cache_host;}if (env_cache_port != nullptr) {char *end = nullptr;cache_port_ = strtol(env_cache_port, &end, 10);if (*end != '\0') {MS_LOG(WARNING) << "Cache port from env variable MS_CACHE_PORT is invalid\n";cache_port_ = 0; // 因为端口范围验证在验证检查期间生成错误}}}/* 一个用于调试的打印方法,输出一个流 */void ConfigManager::Print(std::ostream &out) const {// 不要显示测试/内部的。这里只显示主要的。// 仅供参考,boolalpha告诉输出流为bools写“true”和“false”out << "\nClient config settings :"<< "\nParallelOp workers : " << num_parallel_workers_<< "\nParallelOp worker connector size : " << worker_connector_size_<< "\nSize of each Connector : " << op_connector_size_ << std::endl;}/* Private helper函数,采用nlohmann json格式并填充设置,用来设置各种参数 */Status ConfigManager::FromJson(const nlohmann::json &j) {RETURN_IF_NOT_OK(set_num_parallel_workers(j.value("numParallelWorkers", num_parallel_workers_)));set_worker_connector_size(j.value("workerConnectorSize", worker_connector_size_));set_op_connector_size(j.value("opConnectorSize", op_connector_size_));set_seed(j.value("seed", seed_));set_monitor_sampling_interval(j.value("monitorSamplingInterval", monitor_sampling_interval_));set_cache_host(j.value("cacheHost", cache_host_));set_cache_port(j.value("cachePort", cache_port_));set_num_connections(j.value("numConnections", num_connections_));set_prefetch_size(j.value("prefetchSize", prefetch_size_));return Status::OK();}/* 加载带有默认设置的json文件并填充所有设置. 
有些设置是强制的,有些则不是(默认设置)。如果设置是可选的,那么如果文件中缺少配置,它将设置一个默认值。*/Status ConfigManager::LoadFile(const std::string &settingsFile) {Status rc;if (!Path(settingsFile).Exists()) {RETURN_STATUS_UNEXPECTED("File is not found.");}// try {std::ifstream in(settingsFile);nlohmann::json js;in >> js;rc = FromJson(js);} catch (const nlohmann::json::type_error &e) {std::ostringstream ss;ss << "Client file failed to load:\n" << e.what();std::string err_msg = ss.str();RETURN_STATUS_UNEXPECTED(err_msg);} catch (const std::exception &err) {RETURN_STATUS_UNEXPECTED("Client file failed to load.");}return rc;}/* 以下是各种用于功能的函数 */// 设置函数(设置并行工作的数量)Status ConfigManager::set_num_parallel_workers(int32_t num_parallel_workers) {if (num_parallel_workers > num_cpu_threads_ || num_parallel_workers < 1) {std::string err_msg = "Invalid Parameter, num_parallel_workers exceeds the boundary between 1 and " +std::to_string(num_cpu_threads_) + ", as got " + std::to_string(num_parallel_workers) + ".";RETURN_STATUS_UNEXPECTED(err_msg);}num_parallel_workers_ = num_parallel_workers;return Status::OK();}// 设置函数(设置工作的连接器尺寸)void ConfigManager::set_worker_connector_size(int32_t connector_size) { worker_connector_size_ = connector_size; }// 设置函数(设置选择的连接器尺寸)void ConfigManager::set_op_connector_size(int32_t connector_size) { op_connector_size_ = connector_size; }// 生成种子uint32_t ConfigManager::seed() const { return seed_; }// 设置等级idvoid ConfigManager::set_rank_id(int32_t rank_id) {if (rank_id_ == kCfgDefaultRankId) rank_id_ = rank_id;}// 设置numa使能void ConfigManager::set_numa_enable(bool numa_enable) { numa_enable_ = numa_enable; }// 设置种子void ConfigManager::set_seed(uint32_t seed) { seed_ = seed; }// 监视采样间隔void ConfigManager::set_monitor_sampling_interval(uint32_t interval) { monitor_sampling_interval_ = interval; }// 停止数据集探查器void ConfigManager::stop_dataset_profiler(bool stop_profiler) { stop_profiler_ = stop_profiler; }// 设置探查器文件的状态void ConfigManager::set_profiler_file_status(bool file_ready) { file_ready_ = file_ready; }// 设置回调超时void ConfigManager::set_callback_timeout(uint32_t timeout) { callback_timout_ = timeout; }// 设置缓存主机void ConfigManager::set_cache_host(std::string cache_host) { cache_host_ = std::move(cache_host); }// 设置缓存端口void ConfigManager::set_cache_port(int32_t cache_port) { cache_port_ = cache_port; }// 设置连接数void ConfigManager::set_num_connections(int32_t num_connections) { num_connections_ = num_connections; }// 设置数据预读的数量void ConfigManager::set_prefetch_size(int32_t prefetch_size) { prefetch_size_ = prefetch_size; }} // namespace dataset} // namespace mindspore
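为了更直观地理解构造函数里对 MS_CACHE_HOST / MS_CACHE_PORT 环境变量的处理,下面用一小段 Python 代码示意同样的判断流程(仅为示意,并非源码;其中的默认 host/port 取值是假设的):

```python
import os

def read_cache_config(default_host="127.0.0.1", default_port=50052):
    """读取 MS_CACHE_HOST / MS_CACHE_PORT,端口解析失败时置 0,
    留给后续的端口范围校验去报错(与构造函数里的处理思路一致)。"""
    host = os.environ.get("MS_CACHE_HOST", default_host)
    port = default_port
    port_str = os.environ.get("MS_CACHE_PORT")
    if port_str is not None:
        try:
            port = int(port_str, 10)   # 对应 strtol(env_cache_port, &end, 10)
        except ValueError:
            print("Cache port from env variable MS_CACHE_PORT is invalid")
            port = 0                   # 让后续的端口范围验证生成错误
    return host, port

if __name__ == "__main__":
    print(read_cache_config())
```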
  • [活动体验] lite\tools\benchmark\benchmark.cc 注释9
    ** "\lite\tools\benchmark\benchmark.cc"注释9** ================================= ```python //从 Json 初始化转储配置函数 int Benchmark::InitDumpConfigFromJson(char *path) { auto real_path = RealPath(path);//获取真值路径 std::ifstream ifs(real_path);//获取流 if (!ifs.good()) {//判断文件是否存在 MS_LOG(ERROR) "file: " real_path " is not exist"; return RET_ERROR; } if (!ifs.is_open()) {//判断文件是否打开成功 MS_LOG(ERROR) "file: " real_path " open failed"; return RET_ERROR; } try { dump_cfg_json_ = nlohmann::json::parse(ifs);//对json文件进行解析 } catch (const nlohmann::json::parse_error &error) {//捕获异常 MS_LOG(ERROR) "parse json file failed, please check your file."; return RET_ERROR;//返回错误 } if (dump_cfg_json_[dump::kSettings] == nullptr) {//判断是否通过转储设置 MS_LOG(ERROR) "\"common_dump_settings\" is required."; return RET_ERROR; } if (dump_cfg_json_[dump::kSettings][dump::kMode] == nullptr) {//判断mode是否转储过 MS_LOG(ERROR) "\"dump_mode\" is required."; return RET_ERROR; } if (dump_cfg_json_[dump::kSettings][dump::kPath] == nullptr) {//判断路径是否转储过 MS_LOG(ERROR) "\"path\" is required."; return RET_ERROR; } if (dump_cfg_json_[dump::kSettings][dump::kNetName] == nullptr) {//初始化netname dump_cfg_json_[dump::kSettings][dump::kNetName] = "Default"; } if (dump_cfg_json_[dump::kSettings][dump::kInputOutput] == nullptr) {//初始化inputoutput dump_cfg_json_[dump::kSettings][dump::kInputOutput] = 0; } if (dump_cfg_json_[dump::kSettings][dump::kKernels] != nullptr && !dump_cfg_json_[dump::kSettings][dump::kKernels].empty()) {//判断内核是否为空,以及对应的转储模式是否正确 if (dump_cfg_json_[dump::kSettings][dump::kMode] == 0) { MS_LOG(ERROR) R"("dump_mode" should be 1 when "kernels" isn't empty.)"; return RET_ERROR; } } auto abs_path = dump_cfg_json_[dump::kSettings][dump::kPath].get();//初始化路径和name auto net_name = dump_cfg_json_[dump::kSettings][dump::kNetName].get(); if (abs_path.back() == '\\' || abs_path.back() == '/') { dump_file_output_dir_ = abs_path + net_name;//将path按照要求格式化 } else { #ifdef _WIN32 dump_file_output_dir_ = abs_path + "\\" + net_name;//面对不同的系统用不同方式 #else dump_file_output_dir_ = abs_path + "/" + net_name; #endif } //创建输出目录 auto status = CreateOutputDir(&dump_file_output_dir_); if (status != RET_OK) {//同时判断是否创建成功 MS_LOG(ERROR) "create data output directory failed."; return RET_ERROR; } return RET_OK; } //初始化回调参数函数 int Benchmark::InitCallbackParameter() { int ret = RET_OK; if (flags_->time_profiling_) {//判断时间分析的状态是否正常 ret = InitTimeProfilingCallbackParameter();//初始化时间分析回调参数 } else if (flags_->perf_profiling_) {//判断性能分析 ret = InitPerfProfilingCallbackParameter();//初始化性能分析回调参数 } else if (flags_->print_tensor_data_) {//判断tensordata是否正常print ret = InitPrintTensorDataCallbackParameter();//初始化tensordata } else if (flags_->dump_tensor_data_) {//判断tensordata是否正常转储 ret = InitDumpTensorDataCallbackParameter();//初始化dump_tensor_data } return ret; } //初始化函数· int Benchmark::Init() { if (this->flags_ == nullptr) { return 1; } //输出运行状态的信息 MS_LOG(INFO) "ModelPath = " this->flags_->model_file_; MS_LOG(INFO) "InDataPath = " this->flags_->in_data_file_; MS_LOG(INFO) "InDataType = " this->flags_->in_data_type_in_; MS_LOG(INFO) "LoopCount = " this->flags_->loop_count_; MS_LOG(INFO) "DeviceType = " this->flags_->device_; MS_LOG(INFO) "AccuracyThreshold = " this->flags_->accuracy_threshold_; MS_LOG(INFO) "WarmUpLoopCount = " this->flags_->warm_up_loop_count_; MS_LOG(INFO) "NumThreads = " this->flags_->num_threads_; MS_LOG(INFO) "Fp16Priority = " this->flags_->enable_fp16_; MS_LOG(INFO) "calibDataPath = " this->flags_->benchmark_data_file_; std::cout "ModelPath = " this->flags_->model_file_ 
std::endl; std::cout "InDataPath = " this->flags_->in_data_file_ std::endl; std::cout "InDataType = " this->flags_->in_data_type_in_ std::endl; std::cout "LoopCount = " this->flags_->loop_count_ std::endl; std::cout "DeviceType = " this->flags_->device_ std::endl; std::cout "AccuracyThreshold = " this->flags_->accuracy_threshold_ std::endl; std::cout "WarmUpLoopCount = " this->flags_->warm_up_loop_count_ std::endl; std::cout "NumThreads = " this->flags_->num_threads_ std::endl; std::cout "Fp16Priority = " this->flags_->enable_fp16_ std::endl; std::cout "calibDataPath = " this->flags_->benchmark_data_file_ std::endl; if (this->flags_->loop_count_ 1) {//判断循环计算的大小是否正确 MS_LOG(ERROR) "LoopCount:" this->flags_->loop_count_ " must be greater than 0"; std::cerr "LoopCount:" this->flags_->loop_count_ " must be greater than 0" std::endl; return RET_ERROR; } if (this->flags_->num_threads_ 1) {//判断线程数量是否正确 MS_LOG(ERROR) "numThreads:" this->flags_->num_threads_ " must be greater than 0"; std::cerr "numThreads:" this->flags_->num_threads_ " must be greater than 0" std::endl; return RET_ERROR; } static std::vector CPU_BIND_MODE_MAP = {"NO_BIND", "HIGHER_CPU", "MID_CPU"};/创建一个容器存储cpu绑定的几种模式图 if (this->flags_->cpu_bind_mode_ >= 1) {//若模式符合,输出相应的模式图类型 MS_LOG(INFO) "cpuBindMode = " CPU_BIND_MODE_MAP[this->flags_->cpu_bind_mode_]; std::cout "cpuBindMode = " CPU_BIND_MODE_MAP[this->flags_->cpu_bind_mode_] std::endl; } else {//不支持之外的mode MS_LOG(INFO) "cpuBindMode = NO_BIND"; std::cout "cpuBindMode = NO_BIND" std::endl; } this->flags_->in_data_type_ = this->flags_->in_data_type_in_ == "img" ? kImage : kBinary; if (!flags_->benchmark_data_type_.empty()) {//判断type是否为空 if (data_type_map_.find(flags_->benchmark_data_type_) == data_type_map_.end()) {//判断是否支持校准数据类型 MS_LOG(ERROR) "CalibDataType not supported: " flags_->benchmark_data_type_.c_str(); return RET_ERROR; } msCalibDataType = data_type_map_.at(flags_->benchmark_data_type_);//输出校准数据类型的type MS_LOG(INFO) "CalibDataType = " flags_->benchmark_data_type_.c_str(); std::cout "CalibDataType = " flags_->benchmark_data_type_.c_str() std::endl; } if (flags_->model_file_.empty()) {//判断model文件路径是否空 MS_LOG(ERROR) "modelPath is required"; std::cerr "modelPath is required" std::endl; return 1; } flags_->InitInputDataList();//初始化iputdatalist flags_->InitResizeDimsList(); if (!flags_->resize_dims_.empty() && !flags_->input_data_list_.empty() &&//检测输入的resizeDims的大小是否符合输入路径大小 flags_->resize_dims_.size() != flags_->input_data_list_.size()) { MS_LOG(ERROR) "Size of input resizeDims should be equal to size of input inDataPath";//不符合则报错 std::cerr "Size of input resizeDims should be equal to size of input inDataPath" std::endl; return RET_ERROR; } if (flags_->device_ != "CPU" && flags_->device_ != "GPU" && flags_->device_ != "NPU") {//判断使用的装置是否支持 MS_LOG(ERROR) "Device type:" flags_->device_ " is not supported."; std::cerr "Device type:" flags_->device_ " is not supported." 
std::endl; return RET_ERROR; } if (flags_->time_profiling_ && flags_->perf_profiling_) { //启用了时间分析,不会运行性能分析 MS_LOG(INFO) "time_profiling is enabled, will not run perf_profiling."; } // 获取转储数据输出路径 auto dump_cfg_path = std::getenv(dump::kConfigPath); if (dump_cfg_path != nullptr) { flags_->dump_tensor_data_ = true; if (InitDumpConfigFromJson(dump_cfg_path) != RET_OK) {//解析转储文件 MS_LOG(ERROR) "parse dump config file failed."; return RET_ERROR; } } else { //环境中没有 MINDSPORE 转储配置,不需要转储数据 MS_LOG(INFO) "No MINDSPORE_DUMP_CONFIG in env, don't need to dump data"; } auto status = InitCallbackParameter();//初始化回调参数 if (status != RET_OK) {//判断是否回调成功 MS_LOG(ERROR) "Init callback Parameter failed."; std::cerr "Init callback Parameter failed." std::endl; return RET_ERROR; } return RET_OK; } ```
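上面 InitDumpConfigFromJson 的核心逻辑是“必填项缺失即报错、可选项补默认值”。下面用一段 Python 代码按正文出现的字段名(common_dump_settings、dump_mode、path、net_name、input_output、kernels)示意这一校验流程,配置内容是假设的最小例子,并非源码:

```python
import json

# 一份假设的精简 dump 配置,字段名参照上文的 common_dump_settings
cfg_text = '''
{
  "common_dump_settings": {
    "dump_mode": 1,
    "path": "/tmp/dump_out",
    "kernels": ["Default/Conv-op12"]
  }
}
'''

def init_dump_config(text):
    """模仿 InitDumpConfigFromJson 的校验顺序:必填项缺失即报错,可选项补默认值。"""
    cfg = json.loads(text)
    settings = cfg.get("common_dump_settings")
    if settings is None:
        raise ValueError('"common_dump_settings" is required.')
    for required in ("dump_mode", "path"):
        if required not in settings:
            raise ValueError('"%s" is required.' % required)
    settings.setdefault("net_name", "Default")   # 对应 kNetName 的缺省值
    settings.setdefault("input_output", 0)       # 对应 kInputOutput 的缺省值
    if settings.get("kernels") and settings["dump_mode"] == 0:
        raise ValueError('"dump_mode" should be 1 when "kernels" isn\'t empty.')
    return settings

print(init_dump_config(cfg_text))
```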
  • [活动体验] minddata\mindrecord\include\shard_column.h的个人理解
    /*开始之前,先给定uintx_t类型的typedef文档typedef signed char int8_t;typedef short int16_t;typedef int int32_t;typedef long long int64_t;//Unsignedtypedef unsigned char uint8_t;typedef unsigned short uint16_t;typedef unsigned int uint32_t;typedef unsigned long long uint64_t;*/// idndef:防止双重定义,使用define宏定义常量#ifndef MINDSPORE_CCSRC_MINDDATA_MINDRECORD_INCLUDE_SHARD_COLUMN_H_#define MINDSPORE_CCSRC_MINDDATA_MINDRECORD_INCLUDE_SHARD_COLUMN_H_// 导入系统自带头文件#include <memory>#include <string>#include <unordered_map>#include <utility>#include <vector>// 导入自定义的头文件#include "minddata/mindrecord/include/shard_header.h"namespace mindspore { // 双重嵌套命名空间namespace mindrecord {// 定义并赋值7个unsigned long long类型的常量const uint64_t kUnsignedOne = 1;const uint64_t kBitsOfByte = 8;const uint64_t kDataTypeBits = 2;const uint64_t kNumDataOfByte = 4;const uint64_t kBytesOfColumnLen = 4;const uint64_t kDataTypeBitMask = 3;const uint64_t kDataTypes = 6;// 定义枚举类型,并对部分值进行赋值enum IntegerType { kInt8Type = 0, kInt16Type, kInt32Type, kInt64Type };// 定义枚举类型enum ColumnCategory { ColumnInRaw, ColumnInBlob, ColumnNotFound };// 定义枚举类型,并对所有值进行赋值enum ColumnDataType {ColumnBytes = 0,ColumnString = 1,ColumnInt32 = 2,ColumnInt64 = 3,ColumnFloat32 = 4,ColumnFloat64 = 5,ColumnNoDataType = 6};// 定义 const unsigned int 类型的数组为ColumnDataTypeSize[kDataTypes]const uint32_t ColumnDataTypeSize[kDataTypes] = {1, 1, 4, 8, 4, 8};// 定义const vector<std::string>类型的列表const std::vector<std::string> ColumnDataTypeNameNormalized = {"uint8", "string", "int32","int64", "float32", "float64"};// 定义unordered_map容器,key是std::string类型, value是ColumnDataType类型(类似python的字典)const std::unordered_map<std::string, ColumnDataType> ColumnDataTypeMap = {{"bytes", ColumnBytes}, {"string", ColumnString}, {"int32", ColumnInt32},{"int64", ColumnInt64}, {"float32", ColumnFloat32}, {"float64", ColumnFloat64}};/*GNU C 的一大特色就是attribute 机制,防止一个函数在俩个动态链接库中调用混淆*/// 默认,设置为:default之后就可以让外面的类看见了class __attribute__((visibility("default"))) ShardColumn {public:explicit ShardColumn(const std::shared_ptr<ShardHeader> &shard_header, bool compress_integer = true);explicit ShardColumn(const json &schema_json, bool compress_integer = true);~ShardColumn() = default;// 通过列名获取列值MSRStatus GetColumnValueByName(const std::string &column_name, const std::vector<uint8_t> &columns_blob,const json &columns_json, const unsigned char **data,std::unique_ptr<unsigned char[]> *data_ptr, uint64_t *const n_bytes,ColumnDataType *column_data_type, uint64_t *column_data_type_size,std::vector<int64_t> *column_shape);// 压缩blobstd::vector<uint8_t> CompressBlob(const std::vector<uint8_t> &blob, int64_t *compression_size);// 检查 blob 是否压缩bool CheckCompressBlob() const { return has_compress_blob_; }// 获取num_blob_column_uint64_t GetNumBlobColumn() const { return num_blob_column_; }// 获取column_name_std::vector<std::string> GetColumnName() { return column_name_; }// 获取column_data_type_std::vector<ColumnDataType> GeColumnDataType() { return column_data_type_; }// 获取column_shape_std::vector<std::vector<int64_t>> GetColumnShape() { return column_shape_; }// 从 blob 中获取列值MSRStatus GetColumnFromBlob(const std::string &column_name, const std::vector<uint8_t> &columns_blob,const unsigned char **data, std::unique_ptr<unsigned char[]> *data_ptr,uint64_t *const n_bytes);// 获取列的类型std::pair<MSRStatus, ColumnCategory> GetColumnTypeByName(const std::string &column_name,ColumnDataType *column_data_type,uint64_t *column_data_type_size,std::vector<int64_t> *column_shape);// 从json中获取列值MSRStatus GetColumnFromJson(const std::string &column_name, const json 
&columns_json,std::unique_ptr<unsigned char[]> *data_ptr, uint64_t *n_bytes);private:// 初始化void Init(const json &schema_json, bool compress_integer = true);// 从json中获取float类型的值template <typename T>MSRStatus GetFloat(std::unique_ptr<unsigned char[]> *data_ptr, const json &json_column_value, bool use_double);// 从json中获取integer类型的值template <typename T>MSRStatus GetInt(std::unique_ptr<unsigned char[]> *data_ptr, const json &json_column_value);// 从 blob 中获取列偏移地址和大小MSRStatus GetColumnAddressInBlock(const uint64_t &column_id, const std::vector<uint8_t> &columns_blob,uint64_t *num_bytes, uint64_t *shift_idx);// 检查列名是否可用ColumnCategory CheckColumnName(const std::string &column_name);// 压缩整数列static vector<uint8_t> CompressInt(const vector<uint8_t> &src_bytes, const IntegerType &int_type);// 解压缩整数数组列template <typename T>static MSRStatus UncompressInt(const uint64_t &column_id, std::unique_ptr<unsigned char[]> *const data_ptr,const std::vector<uint8_t> &columns_blob, uint64_t *num_bytes, uint64_t shift_idx);// 将大端字节转换为无符号整数static uint64_t BytesBigToUInt64(const std::vector<uint8_t> &bytes_array, const uint64_t &pos,const IntegerType &i_type);// 将无符号整数转换为大端字节static std::vector<uint8_t> UIntToBytesBig(uint64_t value, const IntegerType &i_type);// 将 unsigned int 转换为 little-endian 字节static std::vector<uint8_t> UIntToBytesLittle(uint64_t value, const IntegerType &i_type);// 将 unsigned int 转换为 little-endian 字节static int64_t BytesLittleToMinIntType(const std::vector<uint8_t> &bytes_array, const uint64_t &pos,const IntegerType &src_i_type, IntegerType *dst_i_type = nullptr);private:std::vector<std::string> column_name_; // column name liststd::vector<ColumnDataType> column_data_type_; // column data type liststd::vector<std::vector<int64_t>> column_shape_; // column shape liststd::unordered_map<string, uint64_t> column_name_id_; // column name id mapstd::vector<std::string> blob_column_; // blob column liststd::unordered_map<std::string, uint64_t> blob_column_id_; // blob column name id mapbool has_compress_blob_; // if has compress blob(判断)uint64_t num_blob_column_; // number of blob columns};} // namespace mindrecord} // namespace mindspore#endif // MINDSPORE_CCSRC_MINDDATA_MINDRECORD_INCLUDE_SHARD_COLUMN_H_
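头文件里 BytesBigToUInt64 / UIntToBytesBig / UIntToBytesLittle 这几个静态函数做的就是整数与大端/小端字节序列之间的转换,下面用 Python 的 int.to_bytes / int.from_bytes 做个对照示意(数值与字节串均为假设):

```python
# 用 int.to_bytes / int.from_bytes 对照理解大小端转换(数值为假设)
value = 0x12345

big_endian = value.to_bytes(8, byteorder="big")        # 对应 UIntToBytesBig
little_endian = value.to_bytes(8, byteorder="little")  # 对应 UIntToBytesLittle
print(big_endian.hex(), little_endian.hex())

# 从 blob 的某个偏移处按大端读出一个 4 字节的列长度(kBytesOfColumnLen = 4),
# 对应 BytesBigToUInt64 在指定 pos、指定 IntegerType 下的行为
blob = b"\x00\x00\x00\x04" + b"data"
column_len = int.from_bytes(blob[0:4], byteorder="big")
print(column_len)  # 4
```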
  • [活动体验] 数据处理中的配置管理器
    ```C++ // 导入自定义的.h文件 #include "minddata/dataset/core/config_manager.h" // 导入头文件 #include #include #include #include #include #include // 如果宏定义ENABLE_ANDROID,则执行下方语句;如果没有,就执行else后方语句 // 无论如何,都需要执行endif的语句 #ifndef ENABLE_ANDROID #include "utils/log_adapter.h" #else #include "mindspore/lite/src/common/log_adapter.h" #endif #include "minddata/dataset/util/system_pool.h" // 双重命名空间 namespace mindspore { namespace dataset { ConfigManager::ConfigManager() : num_parallel_workers_(kCfgParallelWorkers), worker_connector_size_(kCfgWorkerConnectorSize), op_connector_size_(kCfgOpConnectorSize), rank_id_(kCfgDefaultRankId), seed_(kCfgDefaultSeed), numa_enable_(false), monitor_sampling_interval_(kCfgMonitorSamplingInterval), stop_profiler_(false), file_ready_(true), callback_timout_(kCfgCallbackTimeout), cache_host_(kCfgDefaultCacheHost), cache_port_(kCfgDefaultCachePort), num_connections_(kDftNumConnections), prefetch_size_(kDftPrefetchSize), auto_num_workers_(kDftAutoNumWorkers), num_cpu_threads_(std::thread::hardware_concurrency()), auto_num_workers_num_shards_(1), auto_worker_config_(0) { num_cpu_threads_ = num_cpu_threads_ > 0 ? num_cpu_threads_ : std::numeric_limits::max(); num_parallel_workers_ = num_parallel_workers_ num_cpu_threads_ ? num_parallel_workers_ : num_cpu_threads_; /* auto:用来声明自动变量 它是存储类型标识符,表明变量(自动)具有本地范围,块范围的变量声明(如for循环体内的变量声明)默认为auto存储类型。 其实大多普通声明方式声明的变量都是auto变量,他们不需要明确指定auto关键字,默认就是auto的了。 auto变量在离开作用域是会变程序自动释放,不会发生内存溢出情况(除了包含指针的类)。 使用auto变量的优势是不需要考虑去变量是否被释放,比较安全 */ auto env_cache_host = std::getenv("MS_CACHE_HOST"); auto env_cache_port = std::getenv("MS_CACHE_PORT"); // 如果指向为空 if (env_cache_host != nullptr) { cache_host_ = env_cache_host; } if (env_cache_port != nullptr) { char *end = nullptr; cache_port_ = strtol(env_cache_port, &end, 10); if (*end != '\0') { MS_LOG(WARNING) "Cache port from env variable MS_CACHE_PORT is invalid\n"; cache_port_ = 0; // 导致端口范围验证在验证检查期间生成错误 } } } //通常用于调试的打印方法 void ConfigManager::Print(std::ostream &out) const { // 不要显示测试/内部的。 这里只显示主要的。 // 仅供参考,boolalpha 告诉输出流为 bool 写入“true”和“false” out "\nClient config settings :" "\nParallelOp workers : " num_parallel_workers_ "\nParallelOp worker connector size : " worker_connector_size_ "\nSize of each Connector : " op_connector_size_ std::endl; } // 采用 nlohmann json 格式并填充设置的私有辅助函数 Status ConfigManager::FromJson(const nlohmann::json &j) { RETURN_IF_NOT_OK(set_num_parallel_workers(j.value("numParallelWorkers", num_parallel_workers_))); set_worker_connector_size(j.value("workerConnectorSize", worker_connector_size_)); set_op_connector_size(j.value("opConnectorSize", op_connector_size_)); set_seed(j.value("seed", seed_)); set_monitor_sampling_interval(j.value("monitorSamplingInterval", monitor_sampling_interval_)); set_cache_host(j.value("cacheHost", cache_host_)); set_cache_port(j.value("cachePort", cache_port_)); set_num_connections(j.value("numConnections", num_connections_)); set_prefetch_size(j.value("prefetchSize", prefetch_size_)); return Status::OK(); } // 使用默认设置加载一个 json 文件并填充所有设置 Status ConfigManager::LoadFile(const std::string &settingsFile) { Status rc; if (!Path(settingsFile).Exists()) { RETURN_STATUS_UNEXPECTED("File is not found."); } // 有些设置是强制性的,有些则不是(默认)。 如果一个设置 // 是可选的,如果文件中缺少配置,它将设置一个默认值。 // 抛出异常 try { std::ifstream in(settingsFile); nlohmann::json js; in >> js; rc = FromJson(js); } catch (const nlohmann::json::type_error &e) { std::ostringstream ss; // 客户端文件加载失败 ss "Client file failed to load:\n" e.what(); std::string err_msg = ss.str(); RETURN_STATUS_UNEXPECTED(err_msg); } catch (const 
std::exception &err) { // 客户端文件加载失败 RETURN_STATUS_UNEXPECTED("Client file failed to load."); } return rc; } // 设置函数 Status ConfigManager::set_num_parallel_workers(int32_t num_parallel_workers) { if (num_parallel_workers > num_cpu_threads_ || num_parallel_workers 1) { // 参数无效,num_parallel_workers 超出了 范围 std::string err_msg = "Invalid Parameter, num_parallel_workers exceeds the boundary between 1 and " + std::to_string(num_cpu_threads_) + ", as got " + std::to_string(num_parallel_workers) + "."; RETURN_STATUS_UNEXPECTED(err_msg); } num_parallel_workers_ = num_parallel_workers; return Status::OK(); } // 设置函数 void ConfigManager::set_worker_connector_size(int32_t connector_size) { worker_connector_size_ = connector_size; } // 设置函数 void ConfigManager::set_op_connector_size(int32_t connector_size) { op_connector_size_ = connector_size; } uint32_t ConfigManager::seed() const { return seed_; } void ConfigManager::set_rank_id(int32_t rank_id) { if (rank_id_ == kCfgDefaultRankId) rank_id_ = rank_id; } // 设置函数 void ConfigManager::set_numa_enable(bool numa_enable) { numa_enable_ = numa_enable; } void ConfigManager::set_seed(uint32_t seed) { seed_ = seed; } void ConfigManager::set_monitor_sampling_interval(uint32_t interval) { monitor_sampling_interval_ = interval; } void ConfigManager::stop_dataset_profiler(bool stop_profiler) { stop_profiler_ = stop_profiler; } void ConfigManager::set_profiler_file_status(bool file_ready) { file_ready_ = file_ready; } void ConfigManager::set_callback_timeout(uint32_t timeout) { callback_timout_ = timeout; } void ConfigManager::set_cache_host(std::string cache_host) { cache_host_ = std::move(cache_host); } void ConfigManager::set_cache_port(int32_t cache_port) { cache_port_ = cache_port; } void ConfigManager::set_num_connections(int32_t num_connections) { num_connections_ = num_connections; } void ConfigManager::set_prefetch_size(int32_t prefetch_size) { prefetch_size_ = prefetch_size; } } // 命名空间 dataset } // 命名空间 mindspore ```
  • [活动体验] 数据处理中的分片段
    ```C++ // 导入自定义的.h文件 #include "minddata/mindrecord/include/shard_segment.h" #include "utils/ms_utils.h" #include "./securec.h" #include "minddata/mindrecord/include/common/shard_utils.h" #include "pybind11/pybind11.h" // 导入mindspore框架 using mindspore::LogStream; using mindspore::ExceptionType::NoExceptionType; using mindspore::MsLogLevel::ERROR; using mindspore::MsLogLevel::INFO; // 双重命名空间 namespace mindspore { namespace mindrecord { ShardSegment::ShardSegment() { SetAllInIndex(false); } std::pair> ShardSegment::GetCategoryFields() { // 如果已填充则跳过 if (!candidate_category_fields_.empty()) return {SUCCESS, candidate_category_fields_}; std::string sql = "PRAGMA table_info(INDEXES);"; // sql语句 std::vector> field_names; // 指针变量定义并初始化 char *errmsg = nullptr; int rc = sqlite3_exec(database_paths_[0], common::SafeCStr(sql), SelectCallback, &field_names, &errmsg); // 判断sql的选择语句是否错误 if (rc != SQLITE_OK) { MS_LOG(ERROR) "Error in select statement, sql: " sql ", error: " errmsg; // 函数调用 sqlite3_free(errmsg); sqlite3_close(database_paths_[0]); database_paths_[0] = nullptr; return {FAILED, vector{}}; } else { // 获取索引中的记录 MS_LOG(INFO) "Get " static_cast(field_names.size()) " records from index."; } /* u:代表 unsigned 即无符号,即定义的变量不能为负数; int:代表类型为 int 整形; 32:代表四个字节,即为 int 类型; _t:代表用 typedef 定义的; uint32_t整体代表:用 typedef 定义的无符号 int 型宏定义; */ uint32_t idx = kStartFieldId; while (idx field_names.size()) { if (field_names[idx].size() 2) { // 函数调用 sqlite3_free(errmsg); sqlite3_close(database_paths_[0]); database_paths_[0] = nullptr; return {FAILED, vector{}}; } candidate_category_fields_.push_back(field_names[idx][1]); // 尾部插入 idx += 2; } sqlite3_free(errmsg); return {SUCCESS, candidate_category_fields_}; } MSRStatus ShardSegment::SetCategoryField(std::string category_field) { // 是否获取候选类别字段失败 if (GetCategoryFields().first != SUCCESS) { MS_LOG(ERROR) "Get candidate category field failed"; return FAILED; } category_field = category_field + "_0"; // 在category_field后加一个0 if (std::any_of(std::begin(candidate_category_fields_), std::end(candidate_category_fields_), [category_field](std::string x) { return x == category_field; })) { current_category_field_ = category_field; return SUCCESS; } // 字段不是候选类别字段 MS_LOG(ERROR) "Field " category_field " is not a candidate category field."; return FAILED; } std::pair ShardSegment::ReadCategoryInfo() { // 阅读类别开始 MS_LOG(INFO) "Read category begin"; auto ret = WrapCategoryInfo(); if (ret.first != SUCCESS) { // 获取类别信息失败 MS_LOG(ERROR) "Get category info failed"; return {FAILED, ""}; } // 将类别信息转换为 json 字符串 auto category_json_string = ToJsonForCategory(ret.second); // 阅读类别结束 MS_LOG(INFO) "Read category end"; return {SUCCESS, category_json_string}; } std::pair>> ShardSegment::WrapCategoryInfo() { std::map counter; // sql语句 std::string sql = "SELECT " + current_category_field_ + ", COUNT(" + current_category_field_ + ") AS `value_occurrence` FROM indexes GROUP BY " + current_category_field_ + ";"; for (auto &db : database_paths_) { std::vector> field_count; char *errmsg = nullptr; int rc = sqlite3_exec(db, common::SafeCStr(sql), SelectCallback, &field_count, &errmsg); if (rc != SQLITE_OK) { // 选择语句错误 MS_LOG(ERROR) "Error in select statement, sql: " sql ", error: " errmsg; sqlite3_free(errmsg); sqlite3_close(db); db = nullptr; return {FAILED, std::vector>()}; } else { // 获取索引中的记录 MS_LOG(INFO) "Get " static_cast(field_count.size()) " records from index."; } for (const auto &field : field_count) { counter[field[0]] += std::stoi(field[1]); // 累加 } sqlite3_free(errmsg); } int idx = 0; 
std::vector> category_vec(counter.size()); // 函数调用 (void)std::transform(counter.begin(), counter.end(), category_vec.begin(), [&idx](std::tuple item) { return std::make_tuple(idx++, std::get0>(item), std::get1>(item)); }); return {SUCCESS, std::move(category_vec)}; } std::string ShardSegment::ToJsonForCategory(const std::vector> &tri_vec) { std::vector category_json_vec; // 轻量级json 字符串 for (auto q : tri_vec) { json j; // 结构体赋值 j["id"] = std::get0>(q); j["name"] = std::get1>(q); j["count"] = std::get2>(q); // 尾部插入赋值后的j category_json_vec.emplace_back(j); } // JSON是一种超轻量级的数据交换格式 json j_vec(category_json_vec); json category_info; category_info["key"] = current_category_field_; category_info["categories"] = j_vec; // json模块是用来编码和解码json数据的 return category_info.dump(); } std::pair>> ShardSegment::ReadAtPageById(int64_t category_id, int64_t page_no, int64_t n_rows_of_page) { auto ret = WrapCategoryInfo(); if (ret.first != SUCCESS) { // 获取类别信息 MS_LOG(ERROR) "Get category info"; return {FAILED, std::vector>{}}; } if (category_id >= static_cast(ret.second.size()) || category_id 0) { // 非法类别 ID MS_LOG(ERROR) "Illegal category id, id: " category_id; return {FAILED, std::vector>{}}; } int total_rows_in_category = std::get2>(ret.second[category_id]); // 如果未找到类别或页码超出范围则退出 if (total_rows_in_category = 0 || page_no 0 || n_rows_of_page = 0 || page_no * n_rows_of_page >= total_rows_in_category) { // 确定页码 MS_LOG(ERROR) "Illegal page no / page size, page no: " page_no ", page size: " n_rows_of_page; return {FAILED, std::vector>{}}; } // typedef unsigned char:uint8_t std::vector> page; auto row_group_summary = ReadRowGroupSummary(); // typedef unsigned long int:uint64_t uint64_t i_start = page_no * n_rows_of_page; uint64_t i_end = std::min(static_cast(total_rows_in_category), (page_no + 1) * n_rows_of_page); uint64_t idx = 0; // 常量指针 for (const auto &rg : row_group_summary) { if (idx >= i_end) break; auto shard_id = std::get0>(rg); auto group_id = std::get1>(rg); auto details = ReadRowGroupCriteria( // make_pair内部调用pair构造函数,把模板参数通过std::forward转发 group_id, shard_id, std::make_pair(CleanUp(current_category_field_), std::get1>(ret.second[category_id]))); if (SUCCESS != std::get0>(details)) { return {FAILED, std::vector>{}}; } // auto是C语言的一个关键字,关键字主要用于声明变量的生存期为自动, // 即将不在任何类、结构、枚举、联合和函数中定义的变量视为全局变量,而在函数中定义的变量视为局部变量 auto offsets = std::get4>(details); uint64_t number_of_rows = offsets.size(); if (idx + number_of_rows i_start) { idx += number_of_rows; // 累加 continue; } for (uint64_t i = 0; i number_of_rows; ++i, ++idx) { if (idx >= i_start && idx i_end) { // 判断 auto ret1 = PackImages(group_id, shard_id, offsets[i]); // C语言关键词 if (SUCCESS != ret1.first) { // 判断 return {FAILED, std::vector>{}}; } // 尾部插入 page.push_back(std::move(ret1.second)); } } } return {SUCCESS, std::move(page)}; } std::pair> ShardSegment::PackImages(int group_id, int shard_id, std::vector offset) { const auto &ret = shard_header_->GetPageByGroupId(group_id, shard_id); // 常量指针 if (SUCCESS != ret.first) { return {FAILED, std::vector()}; } // shared_ptr在boost中地位相当重要,其行为接近原始指针,但又比指针更加安全,甚至还能提供基本的线程安全保证。 // 它基本上解决了在使用c++开发过程中不可避免的使用指针而遇到的许多问题, // 常见的毫无疑问是内存泄漏和内存的提前释放,还有一些关于指针内存申请而产生的异常问题等 const std::shared_ptr &blob_page = ret.second; // 打包图片列表 std::vector images(offset[1] - offset[0]); auto file_offset = header_size_ + page_size_ * (blob_page->GetPageID()) + offset[0]; auto &io_seekg = file_streams_random_[0][shard_id]->seekg(file_offset, std::ios::beg); if (!io_seekg.good() || io_seekg.fail() || io_seekg.bad()) { // 文件搜索失败 MS_LOG(ERROR) "File seekg 
failed"; file_streams_random_[0][shard_id]->close(); // 关闭空间 return {FAILED, {}}; } auto &io_read = file_streams_random_[0][shard_id]->read(reinterpret_cast(&images[0]), offset[1] - offset[0]); if (!io_read.good() || io_read.fail() || io_read.bad()) { MS_LOG(ERROR) "File read failed"; // 文件读取失败 file_streams_random_[0][shard_id]->close(); return {FAILED, {}}; } return {SUCCESS, std::move(images)}; } std::pair>> ShardSegment::ReadAtPageByName(std::string category_name, int64_t page_no, int64_t n_rows_of_page) { auto ret = WrapCategoryInfo(); if (ret.first != SUCCESS) { MS_LOG(ERROR) "Get category info"; // 获取类别信息 return {FAILED, std::vector>{}}; } for (const auto &categories : ret.second) { if (std::get1>(categories) == category_name) { auto result = ReadAtPageById(std::get0>(categories), page_no, n_rows_of_page); return result; } } return {FAILED, std::vector>()}; } std::pair, json>>> ShardSegment::ReadAllAtPageById( int64_t category_id, int64_t page_no, int64_t n_rows_of_page) { auto ret = WrapCategoryInfo(); if (ret.first != SUCCESS || category_id >= static_cast(ret.second.size())) { MS_LOG(ERROR) "Illegal category id, id: " category_id; // 非法类别 ID return {FAILED, std::vector, json>>{}}; } int total_rows_in_category = std::get2>(ret.second[category_id]); // 如果未找到类别或页码超出范围则退出 if (total_rows_in_category = 0 || page_no 0 || page_no * n_rows_of_page >= total_rows_in_category) { MS_LOG(ERROR) "Illegal page no: " page_no ", page size: " n_rows_of_page; // 非法页数 return {FAILED, std::vector, json>>{}}; } std::vector, json>> page; auto row_group_summary = ReadRowGroupSummary(); int i_start = page_no * n_rows_of_page; int i_end = std::min(static_cast(total_rows_in_category), (page_no + 1) * n_rows_of_page); int idx = 0; for (const auto &rg : row_group_summary) { if (idx >= i_end) break; auto shard_id = std::get0>(rg); auto group_id = std::get1>(rg); auto details = ReadRowGroupCriteria( group_id, shard_id, std::make_pair(CleanUp(current_category_field_), std::get1>(ret.second[category_id]))); if (SUCCESS != std::get0>(details)) { return {FAILED, std::vector, json>>{}}; } auto offsets = std::get4>(details); auto labels = std::get5>(details); int number_of_rows = offsets.size(); if (idx + number_of_rows i_start) { idx += number_of_rows; continue; } if (number_of_rows > static_cast(labels.size())) { MS_LOG(ERROR) "Illegal row number of page: " number_of_rows; // 页的非法行数 return {FAILED, std::vector, json>>{}}; } for (int i = 0; i number_of_rows; ++i, ++idx) { if (idx >= i_start && idx i_end) { auto ret1 = PackImages(group_id, shard_id, offsets[i]); if (SUCCESS != ret1.first) { return {FAILED, std::vector, json>>{}}; } page.emplace_back(std::move(ret1.second), std::move(labels[i])); } } } return {SUCCESS, std::move(page)}; } std::pair, json>>> ShardSegment::ReadAllAtPageByName( std::string category_name, int64_t page_no, int64_t n_rows_of_page) { auto ret = WrapCategoryInfo(); if (ret.first != SUCCESS) { MS_LOG(ERROR) "Get category info"; // 获取类别信息 return {FAILED, std::vector, json>>{}}; } // category_name 到 category_id int64_t category_id = -1; for (const auto &categories : ret.second) { std::string categories_name = std::get1>(categories); if (categories_name == category_name) { category_id = std::get0>(categories); break; } } if (category_id == -1) { return {FAILED, std::vector, json>>{}}; } return ReadAllAtPageById(category_id, page_no, n_rows_of_page); } std::pair, pybind11::object>>> ShardSegment::ReadAtPageByIdPy( int64_t category_id, int64_t page_no, int64_t n_rows_of_page) { auto res = 
ReadAllAtPageById(category_id, page_no, n_rows_of_page); if (res.first != SUCCESS) { return {FAILED, std::vector, pybind11::object>>{}}; } vector, pybind11::object>> json_data; std::transform(res.second.begin(), res.second.end(), std::back_inserter(json_data), [](const std::tuple, json> &item) { auto &j = std::get1>(item); pybind11::object obj = nlohmann::detail::FromJsonImpl(j); return std::make_tuple(std::get0>(item), std::move(obj)); }); return {SUCCESS, std::move(json_data)}; } std::pair, pybind11::object>>> ShardSegment::ReadAtPageByNamePy( std::string category_name, int64_t page_no, int64_t n_rows_of_page) { auto res = ReadAllAtPageByName(category_name, page_no, n_rows_of_page); if (res.first != SUCCESS) { return {FAILED, std::vector, pybind11::object>>{}}; } vector, pybind11::object>> json_data; std::transform(res.second.begin(), res.second.end(), std::back_inserter(json_data), [](const std::tuple, json> &item) { auto &j = std::get1>(item); pybind11::object obj = nlohmann::detail::FromJsonImpl(j); return std::make_tuple(std::get0>(item), std::move(obj)); }); return {SUCCESS, std::move(json_data)}; } std::pair> ShardSegment::GetBlobFields() { std::vector blob_fields; for (auto &p : GetShardHeader()->GetSchemas()) { // 假设一个模式 const auto &fields = p->GetBlobFields(); blob_fields.assign(fields.begin(), fields.end()); break; } return std::make_pair(kCV, blob_fields); } std::string ShardSegment::CleanUp(std::string field_name) { while (field_name.back() >= '0' && field_name.back() = '9') field_name.pop_back(); field_name.pop_back(); return field_name; } } // 命名空间 mindrecord } // 命名空间 mindspore ```
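GetCategoryFields 和 WrapCategoryInfo 本质上就是两条 SQL:先用 PRAGMA table_info 列出索引表字段,再按类别字段 GROUP BY 计数。下面用 Python 自带的 sqlite3 模块在内存库里示意同样的查询(表结构与数据均为假设,并非 mindrecord 真实的索引表):

```python
import sqlite3

# 在内存库里模拟这两类查询;表结构与数据均为假设
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE INDEXES (ROW_ID INTEGER, label_0 TEXT)")
conn.executemany("INSERT INTO INDEXES VALUES (?, ?)",
                 [(1, "cat"), (2, "dog"), (3, "cat")])

# 对应 GetCategoryFields 里的 "PRAGMA table_info(INDEXES);":列出字段名,挑出候选类别字段
fields = conn.execute("PRAGMA table_info(INDEXES);").fetchall()
print([row[1] for row in fields])   # ['ROW_ID', 'label_0']

# 对应 WrapCategoryInfo 里按类别字段分组计数的查询
sql = ("SELECT label_0, COUNT(label_0) AS value_occurrence "
       "FROM INDEXES GROUP BY label_0;")
for name, count in conn.execute(sql):
    print(name, count)              # 例如 cat 2、dog 1
conn.close()
```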
  • [基础知识] dataset / engine / serializer_deserialize.py 代码解读
    - 本程序提供了对数据集类`dataset`进行序列化以及你序列化的相关函数,主要是通过类的自带的方法实现。 ```python # Copyright 2019-2021 Huawei Technologies Co., Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # ============================================================================== """ Functions to support dataset serialize and deserialize. """ import json import os from mindspore import log as logger from . import datasets as de def serialize(dataset, json_filepath=""): """ Serialize dataset pipeline into a JSON file. Note: Currently some Python objects are not supported to be serialized. For Python function serialization of map operator, de.serialize will only return its function name. Args: dataset (Dataset): The starting node. json_filepath (str): The filepath where a serialized JSON file will be generated. Returns: Dict, The dictionary contains the serialized dataset graph. Raises: OSError: Can not open a file Examples: >>> dataset = ds.MnistDataset(mnist_dataset_dir, 100) >>> one_hot_encode = c_transforms.OneHot(10) # num_classes is input argument >>> dataset = dataset.map(operation=one_hot_encode, input_column_names="label") >>> dataset = dataset.batch(batch_size=10, drop_remainder=True) >>> # serialize it to JSON file >>> ds.engine.serialize(dataset, json_filepath="/path/to/mnist_dataset_pipeline.json") >>> serialized_data = ds.engine.serialize(dataset) # serialize it to Python dict """ return dataset.to_json(json_filepath) # 调用dataset类的to_json方法将结果序列化 def deserialize(input_dict=None, json_filepath=None): """ Construct dataset pipeline from a JSON file produced by de.serialize(). Note: Currently Python function deserialization of map operator are not supported. Args: input_dict (dict): A Python dictionary containing a serialized dataset graph. json_filepath (str): A path to the JSON file. Returns: de.Dataset or None if error occurs. Raises: OSError: Can not open the JSON file. 
Examples: >>> dataset = ds.MnistDataset(mnist_dataset_dir, 100) >>> one_hot_encode = c_transforms.OneHot(10) # num_classes is input argument >>> dataset = dataset.map(operation=one_hot_encode, input_column_names="label") >>> dataset = dataset.batch(batch_size=10, drop_remainder=True) >>> # Use case 1: to/from JSON file >>> ds.engine.serialize(dataset, json_filepath="/path/to/mnist_dataset_pipeline.json") >>> dataset = ds.engine.deserialize(json_filepath="/path/to/mnist_dataset_pipeline.json") >>> # Use case 2: to/from Python dictionary >>> serialized_data = ds.engine.serialize(dataset) >>> dataset = ds.engine.deserialize(input_dict=serialized_data) """ data = None if input_dict: data = de.DeserializedDataset(input_dict) # 调用dataset类的DeserializedDataset方法将输入逆序列化,获得数据集类型 if json_filepath: data = de.DeserializedDataset(json_filepath) return data def expand_path(node_repr, key, val): """Convert relative to absolute path.""" # 用于相对路径与绝对路径的转换 # 根据传入的是否为列表进行相应处理 if isinstance(val, list): node_repr[key] = [os.path.abspath(file) for file in val] else: node_repr[key] = os.path.abspath(val) def show(dataset, indentation=2): """ Write the dataset pipeline graph to logger.info file. Args: dataset (Dataset): The starting node. indentation (int, optional): The indentation used by the JSON print. Do not indent if indentation is None. Examples: >>> dataset = ds.MnistDataset(mnist_dataset_dir, 100) >>> one_hot_encode = c_transforms.OneHot(10) >>> dataset = dataset.map(operation=one_hot_encode, input_column_names="label") >>> dataset = dataset.batch(batch_size=10, drop_remainder=True) >>> ds.show(dataset) """ pipeline = dataset.to_json() # 将数据集中的流水线进行json转换 logger.info(json.dumps(pipeline, indent=indentation)) def compare(pipeline1, pipeline2): """ Compare if two dataset pipelines are the same. Args: pipeline1 (Dataset): a dataset pipeline. pipeline2 (Dataset): a dataset pipeline. Returns: Whether pipeline1 is equal to pipeline2. Examples: >>> pipeline1 = ds.MnistDataset(mnist_dataset_dir, 100) >>> pipeline2 = ds.Cifar10Dataset(cifar_dataset_dir, 100) >>> ds.compare(pipeline1, pipeline2) """ # 以json的形式比较量数据集流水线是否相同 return pipeline1.to_json() == pipeline2.to_json() ```
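把 serialize / deserialize / compare 串起来使用大致如下(数据集目录为假设路径,map 的参数名以实际安装的 MindSpore 版本为准,仅作示意):

```python
import mindspore.dataset as ds
import mindspore.dataset.transforms.c_transforms as c_transforms

mnist_dataset_dir = "/path/to/mnist"   # 假设的数据集目录

# 构建一条简单的流水线:读取 -> one-hot -> batch
pipeline1 = ds.MnistDataset(mnist_dataset_dir)
pipeline1 = pipeline1.map(operations=c_transforms.OneHot(10), input_columns="label")
pipeline1 = pipeline1.batch(batch_size=10, drop_remainder=True)

serialized = ds.engine.serialize(pipeline1)               # 序列化为 Python dict
pipeline2 = ds.engine.deserialize(input_dict=serialized)  # 再从 dict 反序列化回流水线
print(ds.compare(pipeline1, pipeline2))                   # 两条流水线的 to_json() 相同则为 True
```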
  • [问题求助] 我最近一直在做数据集成,主要方向是物联网json数据对接数据库
    我最近一直在做数据集成,主要方向是物联网 json 数据对接数据库。我想知道,华为的物联网展示平台更适合搭配哪个数据库,MySQL、Oracle、DB 等?
  • [技术干货] python json数据与字典数据转换
    python 中 JSON 字符串与字典数据的互相转换:
一、json 转换为字典
1. 使用 json 模块的 loads 函数,该函数通过参数传入 json 字符串,然后返回与该字符串对应的字典。
2. 使用 eval 函数把 json 格式字符串当作普通的 Python 代码执行,eval 函数会直接返回与该字符串对应的字典。
二、字典转换为 json
1. 使用 json 模块的 dumps 函数,该函数通过参数传入字典,然后返回与该字典对应的 json 格式字符串。
下面的 demo.py 先把名为 data 的字典转换为 json 字符串,再把 json 字符串 s 通过 eval 函数转换为字典,最后从 p.json 文件中读取 json 字符串,并分别用 loads 函数和 eval 函数两种方法把它转换为字典列表。
vi demo.py
# 练习从字典转换为 json
import json
data = {
    'name':'Bill',
    'company':'Microsoft',
    'age':34
}
# 将字典转换为 json 字符串
jsonstr = json.dumps(data)
print(type(jsonstr))
print(jsonstr)
运行结果:
<class 'str'>
{"name": "Bill", "company": "Microsoft", "age": 34}
# 将 json 字符串转换为字典
data = json.loads(jsonstr)
print(type(data))
print(data)
运行结果:
<class 'dict'>
{'name': 'Bill', 'company': 'Microsoft', 'age': 34}
# 定义一个 json 格式字符串,使用 eval 函数将其转换为字典
s = '''{
    'name':'Bill',
    'company':'Microsoft',
    'age':20
}'''
data = eval(s)
print(type(data))
print(data)
print(data['company'])
运行结果:
<class 'dict'>
{'name': 'Bill', 'company': 'Microsoft', 'age': 20}
Microsoft
# 读一个 json 格式文件,将 json 数据用两种方法转换为字典列表
f = open('p.json','r',encoding='utf-8')
jsonStr = f.read()
json1 = eval(jsonStr)
json2 = json.loads(jsonStr)
print(json1)
print(type(json1))
print(json2)
print(type(json2))
print(json2[1]['name'])
运行结果:
[{'name': 'iPhone13', 'price': 9999.9, 'count': 2000}, {'name': '吉利', 'price': 1000000, 'count': 123}]
<class 'list'>
[{'name': 'iPhone13', 'price': 9999.9, 'count': 2000}, {'name': '吉利', 'price': 1000000, 'count': 123}]
<class 'list'>
吉利
  • [算子开发] CropAndResizeGradImage的st测试
    【功能模块】【操作步骤&问题现象】1、CropAndResizeGradImage 的 st 测试。由于输出的 shape 需要与第四个输入张量保持一致,于是在第 4 个输入张量上使用了“value”字段。运行 cpu 算子时出现图二所示的错误,之后用相同的 json 文件运行 tf 算子也出现了同样的错误,可能是 json 文件有问题。查看算子的原型定义时发现有一个 REQUIRED_ATTR 属性(图 1 所示),但在官方文档中没有找到如何在 json 文件中配置该属性。具体错误原因还请专家指出,感谢!
  • [技术干货] 基于C语言的json数据映射解析库CSON
    基于C语言的json数据映射解析库CSON
摘要:在物联网通讯中,经常要跟网络服务进行数据通讯,在这个过程里面必须理解 JSON 格式。JSON(JavaScript Object Notation,JS 对象简谱)是一种轻量级的数据交换格式。我们把数据封装成 JSON 之后再上传;通过上位机或者 web 下发控制的时候,也是以 JSON 格式来处理。下面介绍基于 C 语言的 json 数据映射解析库 CSON,并在 STM32F1 最小系统板上实现处理。
1:模型解析工具 CSON。CSON 是运行于 C 语言平台的 json-struct 映射解析库(基于 cJSON 做二次封装),在 github 上开源,地址为:https://github.com/NevermindZZT/cson
2:打开 KEIL,新建工程。
3:添加 CSON 文件到工程并编译。将 github 上下载的两个文件添加到 keil 工程里面,然后编译,查看编译结果。
4:具体实例应用。引用 Github 上作者原话,CSON 是一个简单的 cJSON 的二次封装:相比于使用原生 cJSON 一层一层解析的方式,CSON 采用模型映射的方式,先用模型描述结构体的特征,再根据模型把 json 数据直接解析成结构体,免去原生 cJSON 需要多次调用 API 的复杂性,可以很大程度减少代码冗余,增加代码逻辑性。所以你会在工程里面看到 cJSON 文件。具体操作分为:4.1 声明结构体;4.2 定义数据模型;4.3 使用 CSON 解析 JSON 格式(原帖以截图给出 json 格式示例与测试函数,函数调用之前还需要进行初始化)。查看串口调试助手信息可以看到 json 已被解析。这个模块的功能还有很多,也可以反过来编码结构体,把结构体对象序列化成 json 字符串,大家可以自己去实现处理。
  • [技术干货] [studio 2.16] Studio中很多时候都会用到Json与Dict类型的转化
    主要用到 json.loads(str_json)、json.load(f_read_json)、json.dumps(dict)、json.dump(dict, f_write_dict) 这四个函数。 1. 函数:loads() 将 str 类型的 json 数据转化为 dict 类型的数据;load() 从 json 文件中读出内容并转化为 dict 类型的数据;dumps() 将 dict 类型的数据转化为 str 类型的 json 数据;dump() 将 dict 类型的数据直接写入 json 文件当中。2. 参数:str_json 是 str 类型的 json 数据;dict 是 dict 类型的数据;f_read_json 是以读方式打开 json 文件得到的文件对象,f_write_dict 是以写方式打开 json 文件得到的文件对象,二者都类似于 open() 返回的 f。原帖以 Demo.py、Json_load.json、Json_dump.json 的截图展示了运行结果,具体用法可参考下面的示例。
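下面是一段可直接运行的小示例,把这四个函数串起来(文件名 json_dump.json 为假设):

```python
import json

d = {"name": "Bill", "age": 34}

s = json.dumps(d)     # dumps():dict -> str 类型的 json 数据
d2 = json.loads(s)    # loads():str  -> dict
print(type(s), type(d2))

with open("json_dump.json", "w", encoding="utf-8") as f_write_dict:
    json.dump(d, f_write_dict)        # dump():把 dict 写入 json 文件

with open("json_dump.json", "r", encoding="utf-8") as f_read_json:
    d3 = json.load(f_read_json)       # load():从 json 文件读出并转成 dict
print(d3 == d)                        # True
```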
  • [赋能学习] 华为FusionInsight MRS FlinkSQL 复杂嵌套Json解析最佳实践
    # 华为FusionInsight MRS FlinkSQL 复杂嵌套Json解析最佳实践 ## 背景说明 随着流计算的发展,挑战不再仅限于数据量和计算量,业务变得越来越复杂,开发者可能是资深的大数据从业者、初学 Java 的爱好者,或是不懂代码的数据分析者。如何提高开发者的效率,降低流计算的门槛,对推广实时计算非常重要。 SQL 是数据处理中使用最广泛的语言,它允许用户简明扼要地展示其业务逻辑。Flink 作为流批一体的计算引擎,致力于提供一套 SQL 支持全部应用场景,Flink SQL 的实现也完全遵循 ANSI SQL 标准。之前,用户可能需要编写上百行业务代码,使用 SQL 后,可能只需要几行 SQL 就可以轻松搞定。 本文介绍如何使用华为FusionInsight MRS FlinkServer服务进行界面化的FlinkSQL编辑,从而处理复杂的嵌套Json格式 ## Json内容 下面以cdl新增数据的json为例 ``` { "schema":{ "type":"struct", "fields":[ { "type":"string", "optional":false, "field":"DATA_STORE" }, { "type":"string", "optional":false, "field":"SEG_OWNER" }, { "type":"string", "optional":false, "field":"TABLE_NAME" }, { "type":"int64", "optional":false, "name":"org.apache.kafka.connect.data.Timestamp", "version":1, "field":"TIMESTAMP" }, { "type":"string", "optional":false, "field":"OPERATION" }, { "type":"string", "optional":true, "field":"LOB_COLUMNS" }, { "type":"struct", "fields":[ { "type":"array", "items":{ "type":"struct", "fields":[ { "type":"string", "optional":false, "field":"name" }, { "type":"string", "optional":true, "field":"value" } ], "optional":false }, "optional":false, "field":"properties" } ], "optional":false, "name":"transaction", "field":"transaction" }, { "type":"struct", "fields":[ { "type":"int64", "optional":false, "field":"uid" } ], "optional":true, "name":"unique", "field":"unique" }, { "type":"struct", "fields":[ { "type":"int64", "optional":false, "field":"uid" }, { "type":"string", "optional":true, "default":"", "field":"uname" }, { "type":"int64", "optional":true, "field":"age" }, { "type":"string", "optional":true, "field":"sex" }, { "type":"string", "optional":true, "field":"mostlike" }, { "type":"string", "optional":true, "field":"lastview" }, { "type":"int64", "optional":true, "field":"totalcost" } ], "optional":true, "name":"data", "field":"data" }, { "type":"struct", "fields":[ ], "optional":true, "name":"EMPTY", "field":"before" }, { "type":"string", "optional":true, "field":"HEARTBEAT_IDENTIFIER" } ], "optional":false, "name":"hudi.hudisource" }, "payload":{ "DATA_STORE":"MYSQL", "SEG_OWNER":"hudi", "TABLE_NAME":"hudisource", "TIMESTAMP":1631070742000, "OPERATION":"INSERT", "LOB_COLUMNS":"", "transaction":{ "properties":[ { "name":"file", "value":"mysql-bin.000005" }, { "name":"pos", "value":"32307" }, { "name":"gtid", "value":"" } ] }, "unique":{ "uid":11 }, "data":{ "uid":11, "uname":"蒋语堂", "age":38, "sex":"女", "mostlike":"图", "lastview":"播放器", "totalcost":28732 }, "before":null, "HEARTBEAT_IDENTIFIER":"998d66cc-1405-40e2-bbdc-41f2adf40724" } } ``` 上面的数据信息为复杂的json嵌套结构,包含了 Map、Array、Row 等类型, 对于这样的复杂格式需要有一种高效的方式进行解析,下面介绍如何实现。 ## 华为FusionInsight MRS Flink WebUI介绍 Flink WebUI提供基于Web的可视化开发平台,用户只需要编写SQL即可开发作业,极大降低作业开发门槛。同时通过作业平台能力开放,支持业务人员自行编写SQL开发作业来快速应对需求,大大减少Flink作业开发工作量。 Flink WebUI主要有以下特点: - 企业级可视化运维:运维管理界面化、作业监控、作业开发Flink SQL标准化等。 - 快速建立集群连接:通过集群连接功能配置访问一个集群,需要客户端配置、用户认证密钥文件。 - 快速建立数据连接:通过数据连接功能配置访问一个组件。创建“数据连接类型”为“HDFS”类型时需创建集群连接,其他数据连接类型的“认证类型”为“KERBEROS”需创建集群连接,“认证类型”为“SIMPLE”不需创建集群连接。 - 可视化开发平台:支持自定义输入/输出映射表,满足不同输入来源、不同输出目标端的需求。 - 图形化作业管理:简单易用。 下面介绍如何使用Flink WebUI开发FlinkSQL DDL语句解析出有效信息 ### 操作步骤 - 登录华为FusionInisght MRS Flink WebUI ![20210908_115504_73.png](https://bbs-img.huaweicloud.com/data/forums/attachment/forum/202109/08/143304gewyk4tfntn0qbnv.png) - 在作业管理选择新建作业创建一个FlinkSQL任务 ![20210908_115556_96.png](https://bbs-img.huaweicloud.com/data/forums/attachment/forum/202109/08/143327rkmbxwdvkb9h3qpw.png) - 编辑Flink SQL语句 SQL说明:创建两张kafka流表,起作用为从kafka源端读取cdl对应topic,解析出需要的字段。并将结果写入另外一个kafka topic 1. 
Json 中的每个 {} 都需要用 Row 类型来表示 2. Json 中的每个 [] 都需要用 Arrary 类型来表示 3. 数组的下标是从 1 开始的不是 0 如下面 SQL 中的 \`schema\`.\`fields\`[1].type 4. 关键字在任何地方都需要加反引号 如上面 SQL 中的 \`type\` 5. select 语句中的字段类型和顺序一定要和结果表的字段类型和顺序保持一致 6. 可使用flink函数比如LOCALTIMESTAMP为获取flink系统时间 ![20210908_141252_72.png](https://bbs-img.huaweicloud.com/data/forums/attachment/forum/202109/08/143416bvvocyi0issczzbt.png) ``` CREATE TABLE huditableout_source( `schema` ROW `fields` ARRAY ROW> >, payload ROW `TIMESTAMP` BIGINT, `data` ROW uid INT, uname VARCHAR(32), age INT, sex VARCHAR(30), mostlike VARCHAR(30), lastview VARCHAR(30), totalcost INT> >, type1 as `schema`.`fields`[1].type, optional1 as `schema`.`fields`[1].optional, field1 as `schema`.`fields`[1].field, type2 as `schema`.`fields`[2].type, optional2 as `schema`.`fields`[2].optional, field2 as `schema`.`fields`[2].field, ts as payload.`TIMESTAMP`, uid as payload.`data`.uid, uname as payload.`data`.uname, age as payload.`data`.age, sex as payload.`data`.sex, mostlike as payload.`data`.mostlike, lastview as payload.`data`.lastview, totalcost as payload.`data`.totalcost, localts as LOCALTIMESTAMP ) WITH( 'connector' = 'kafka', 'topic' = 'huditableout', 'properties.bootstrap.servers' = '172.16.9.113:21007,172.16.9.117:21007,172.16.9.118:21007', 'properties.group.id' = 'example', 'scan.startup.mode' = 'latest-offset', 'format' = 'json', 'json.fail-on-missing-field' = 'false', 'json.ignore-parse-errors' = 'true', 'properties.sasl.kerberos.service.name' = 'kafka', 'properties.security.protocol' = 'SASL_PLAINTEXT', 'properties.kerberos.domain.name' = 'hadoop.hadoop.com' ); CREATE TABLE huditableout( type1 VARCHAR(32), optional1 BOOLEAN, field1 VARCHAR(32), type2 VARCHAR(32), optional2 BOOLEAN, field2 VARCHAR(32), ts BIGINT, uid INT, uname VARCHAR(32), age INT, sex VARCHAR(30), mostlike VARCHAR(30), lastview VARCHAR(30), totalcost INT, localts TIMESTAMP ) WITH( 'connector' = 'kafka', 'topic' = 'huditableout2', 'properties.bootstrap.servers' = '172.16.9.113:21007,172.16.9.117:21007,172.16.9.118:21007', 'properties.group.id' = 'example', 'scan.startup.mode' = 'latest-offset', 'format' = 'json', 'json.fail-on-missing-field' = 'false', 'json.ignore-parse-errors' = 'true', 'properties.sasl.kerberos.service.name' = 'kafka', 'properties.security.protocol' = 'SASL_PLAINTEXT', 'properties.kerberos.domain.name' = 'hadoop.hadoop.com' ); insert into huditableout select type1, optional1, field1, type2, optional2, field2, ts, uid, uname, age, sex, mostlike, lastview, totalcost, localts from huditableout_source; ``` - 点击语义校验,确保语义校验通过 ![20210908_142109_23.png](https://bbs-img.huaweicloud.com/data/forums/attachment/forum/202109/08/143441tzcttezpo9y9tc5i.png) - 启动该Flink SQL任务 ![20210908_142205_53.png](https://bbs-img.huaweicloud.com/data/forums/attachment/forum/202109/08/143503v7firh7gsfdvapen.png) - 检查结果 源端kafka 数据 ![20210908_142329_14.png](https://bbs-img.huaweicloud.com/data/forums/attachment/forum/202109/08/143526b6zmjpwltlow0xd3.png) 目标端kafka 数据 ![20210908_142409_93.png](https://bbs-img.huaweicloud.com/data/forums/attachment/forum/202109/08/143550svt7xujrsryihybu.png)
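为了直观看出上面 SQL 最终取的是哪些字段,可以先用 Python 把这份嵌套 Json 解开对照一下(假设报文已保存到 cdl_insert.json 文件,文件名为假设;注意 Flink SQL 中数组下标从 1 开始,而 Python 中从 0 开始):

```python
import json

# 假设上面的 cdl 新增数据报文已保存为 cdl_insert.json
with open("cdl_insert.json", encoding="utf-8") as f:
    msg = json.load(f)

schema, payload = msg["schema"], msg["payload"]
# Flink SQL 里 `schema`.`fields`[1] 的下标从 1 开始,对应 Python 里的 fields[0]
print(schema["fields"][0]["type"], schema["fields"][0]["optional"], schema["fields"][0]["field"])
print(payload["TIMESTAMP"])            # 对应结果表中的 ts
data = payload["data"]
print(data["uid"], data["uname"], data["age"], data["sex"],
      data["mostlike"], data["lastview"], data["totalcost"])
```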
  • [技术干货] jackson使用@jsonserialize格式
    一、 问题最近开发中使用BigDecimal这个数据类型 返回json数据时出现了点问题:# 1.前端第一次保存的时候 穿过来的数据格式 240.00240.77# 2. mysql数据库存储的数据格式(数据库字段已经设置了保留小数点后两位)240240.77# 3. java程序中查看从数据库中查询的回来的数据格式:240.00240.77# 4. 返回前端的json字符串里的数据格式:240240.77# 4. 前端想要的json字符串里的数据格式:240.00240.77由上面的一系列分析可知:要想解决这个根源在于返回json数据的时候需要将数据格式化。二、解决方案:使用@JsonSerialize输出数据保留两位小数步骤1.创建一个BigDecimal格式化工具import com.fasterxml.jackson.core.JsonGenerator;import com.fasterxml.jackson.databind.JsonSerializer;import com.fasterxml.jackson.databind.SerializerProvider; import java.io.IOException;import java.math.BigDecimal;public class BigDecimalSerialize extends JsonSerializer<BigDecimal> {  @Override  public void serialize(BigDecimal value, JsonGenerator gen, SerializerProvider serializerProvider) throws IOException {    if (value != null && !"".equals(value)) {      gen.writeString(value.setScale(2, BigDecimal.ROUND_HALF_DOWN) + "");    } else {      gen.writeString(value + "");    }  }}步骤二:在返回的实体类对应的属性上加上注解:@JsonSerialize(using = BigDecimalSerialize.class) private BigDecimal totalCost;总结这个方案可以统一解决json的Date日期类型,String类型。double类型。。。等等的序列化格式问题延伸:@JsonSerialize正确使用实际开发中,我们一定遇到过这样的问题:前端显示和后台存储数据单位不统一,而且各有各自的理由,统一不了,那就只能由后端转换。每次返回给前端时再转换一遍,返回给前端的json数据,在后端里定义的往往是一个对象,如何做到优雅的转换呢?只需两步操作:1. 写一个负责转换的类,里面写好规则public class MySerializerUtils extends JsonSerializer<Integer> {  @Override  public void serialize(Integer status, JsonGenerator jsonGenerator, SerializerProvider serializerProvider) throws IOException, JsonProcessingException {    String statusStr = "";     switch (status) {       case 0:         statusStr = "新建状态";         break;       case 1:        statusStr = "就绪状态";         break;       case 2:         statusStr = "运行状态";         break;       case 3:         statusStr = "阻塞和唤醒线程";         break;       case 4:        statusStr = " 死亡状态";        break;       default:         statusStr = "状态信息不符合";     }     jsonGenerator.writeString(statusStr);   } }2. 在实体类上需要装换的字段上加上注解/** * 多线程生命周期状态值 */@JsonSerialize(using = MySerializerUtils.class)private int status;注:@JsonSerialize注解,主要应用于数据转换,该注解作用在该属性的getter()方法上。
  • [推理经验] 使用Dump功能比对Ascend310和Ascend910推理过程中相同算子的输入和输出数据
    本文主要分享在Ascend310和Ascend910的推理过程使用Dump功能比对相同算子的输入和输出数据,帮助定位问题算子。一、使用该方法的前提条件和建议:1.训练好的模型在Ascend910上推理精度已达到预期要求;2.确保Ascend310上预处理图片结果是与Ascend910上处理结果是一致的;3.当满足上述两点时,开发Ascend310推理过程中若遇到Ascend310上的推理精度与Ascend910上的推理精度有较大偏差时可以尝试用该方法定位问题算子;4.建议使用一张相同的输入图片进行数据Dump。二、Ascend910上Dump图和数据:官网已给出训练过程中Dump功能操作流程:https://www.mindspore.cn/docs/programming_guide/zh-CN/r1.3/dump_in_graph_mode.html?推理过程使用Dump功能大同小异,下面对操作步骤进行总结:1.使用静态图模式;2.使用同步dump时json文件设置方式,需要将trans_flag设置为false;{ "common_dump_settings": { "dump_mode": 0, "path": "/absolute_path", "net_name": "ResNet50", "iteration": 0, "input_output": 0, "kernels": ["Default/Conv-op12"], "support_device": [0,1,2,3,4,5,6,7] }, "e2e_dump_settings": { "enable": true, "trans_flag": false }}3.配置环境变量:export MINDSPORE_DUMP_CONFIG=/path/to/data_dump.json4.随后运行推理脚本即可,Dump的数据和图会被保存在json文件设置的路径当中。Dump生成的数据文件是后缀名为.bin的文件,可以使用numpy.fromfile命令查看具体数据,需要注意的是要根据文件名指定输出数据类型和shape。np.fromfile('Default--decode-DetectionDecode--gather_topk-GatherTopK--Mod-op1608_input_0_shape_1_80_100_Int32_DefaultFormat.bin', dtype=np.int32).reshape((1, 80, 100))对于图的两个文件,后缀名分别是.pb和.ir文件,.pb文件可以使用MindInsight查看,.ir可以使用vi命令查看。三、Ascend310上Dump图和数据:dump的图和数据可在全部配置完成后再运行推理脚本一起生成。Dump图:1.设置以下环境变量后重新运行推理脚本即可,需要将之前编译的c++文件删除,此外商用版本不具备此功能:export DUMP_GE_GRAPH=22.运行推理脚本后,Dump下来的图会被保存在运行路径中,后缀名为.pbtxt文件,可安装netron包后使用如下命令查看,通常查看'ge_onnx_00000070_graph_0_Build.pbtxt'文件即可:netron.start('ge_onnx_00000070_graph_0_Build.pbtxt')Dump数据:1.获取Mindir文件的图名,获取方式如下:from mindspore.train._utils import read_protomodel = read_proto("mindir_path")with open('mindir.log', 'w+') as f: f.write(str(model))生成mindir.log文件后使用如下命令:grep -rn "name:" mindir.log |grep ":  name"输出类似图名:"4855_2425_1_construct_wrapper.101"图名一定要正确,否则无法成功dump数据。2.设置如下acl.json文件,注意json文件中的模型名需要在最后增加.0:{ "dump":{ "dump_list":[ { "model_name":"4855_2425_1_construct_wrapper.101.0" } ], "dump_path":"/absolute_path", "dump_mode":"all" }}3.修改main.cc文件:#include "acl/acl.h"aclInit("acl.json的绝对地址");  //加在main函数中如输入数据类型为fp32还需要在main.cc文件添加以下语句:ascend310->SetBufferOptimizeMode("off_optimize"); ascend310->SetPrecisionMode("allow_fp32_to_fp16");ascend310->SetOpSelectImplMode("high_precision");4.在CMakeList.txt文件添加如下语句:include_directories(/usr/local/Ascend/fwkacllib/include/                    ../inc)target_link_libraries(main ${MS_LIB} ${MD_LIB} ascendcl acl_dvpp acl_cblas gflags)5.运行推理脚本前,需要提前创建好json文件中设置的保存dump数据的文件夹。6.运行推理脚本后需要对dump下来的数据进行解析:首先需要找到run包中提供的msaccucmp.py文件,在根目录下使用如下命令:find ${run_path} -name "msaccucmp.py"找到后使用如下命令进行解析:python ${The absolute path of msaccucmp.py} convert -d {file path of dump} -out {file path of output}解析的文件保存格式为.npy文件,可以使用numpy.load命令直接查看,不需要像910中设置数据类型和shape。该过程在https://www.mindspore.cn/docs/programming_guide/zh-CN/r1.3/dump_in_graph_mode.html?中的异步Dump数据分析样例中有详细描述。四、Dump数据比对技巧:1.根据生成的图找到对应算子所dump下来的输入和输出数据;2.由于网络模型通常会比较大,可以采用区间查看的方法,找到Ascend310和Ascend910中第一次出现算子输入相同输出不同的算子。五、其他注意事项:1.310中conv2d算子不支持fp16,所以如果输入数据类型为fp32会前插cast将数据转为fp16,运算结束后再转回fp32;2.Exp算子的输出范围为fp16,所以nn.Sigmoid算子输出张量会出现大量相同输出数据,不会影响推理结果;3.目前ops.Mod算子在输入数据相同的情况下,在310和910上输出数据会有偏差,可用ops.FloorMod算子替代。
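数据都 dump 下来之后,可以写个小脚本把 910 上同步 dump 出的 .bin 与 310 上 msaccucmp 解析出的 .npy 做数值比对,帮助快速找到第一个输入相同、输出不同的算子。下面是一个示意脚本(文件名、dtype、shape 均为假设,需按实际 dump 出的文件调整):

```python
import numpy as np

# 文件名、dtype、shape 均为假设,需按实际 dump 文件名调整
bin_file = "Conv2D-op12_input_0_shape_1_3_224_224_Float32_DefaultFormat.bin"
npy_file = "Conv2D-op12.input.0.npy"

a = np.fromfile(bin_file, dtype=np.float32)   # 910:按文件名中的 dtype 读取
b = np.load(npy_file)                         # 310:msaccucmp 解析出的 .npy

a, b = a.reshape(-1), b.reshape(-1)           # 展平后按元素比对(trans_flag=false 时格式可能不同,这里只看数值)
print(np.allclose(a, b, atol=1e-3))           # 是否在容差内一致
print(np.abs(a - b).max())                    # 最大绝对误差
```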
  • [实践系列] GaussDB(DWS)实践系列-函数实现JSON类型解析
    GaussDB(DWS)实践-函数实现JSON类型解析       在项目交付中会遇到针对JSON类型解析的场景,例如key值获取value,当前GaussDB(DWS)不支持(需求已规划),针对该场景可参考本文方法实现,实际使用过程中可按照注释内容按需调整。 函数定义:DROP FUNCTION IF EXISTS public.jsonpars;CREATE OR REPLACE FUNCTION public.jsonpars( p_str IN TEXT,key_name IN TEXT )RETURN TEXTIMMUTABLEASDECLARE    p_str_tmp TEXT := '' ; --定义TEXT类型变量:p_str_tmp,缓存读取的json文件去除'['和']'字符    len_p_str_tmp_1 INTEGER := 0 ; --定义INTEGER类型变量:len_p_str_tmp_1,缓存读取的json文件的长度    str_tmp_1 TEXT ; --定义TEXT类型变量:str_tmp_1,缓存读取的json文件replace之后的内容    len_p_str_tmp_2 INTEGER := 0 ; --定义INTEGER类型变量:len_p_str_tmp_2,缓存读取的replace后的json文件的长度    comma_position INTEGER := 0 ; --定义INTEGER类型变量:comma_position,缓存逗号在选定文本字符串中位置    symbol_position INTEGER := 1 ; --定义INTEGER类型变量:symbol_position,缓存指定字符开始匹配起始位置    lbrace_position INTEGER := 0 ; --定义INTEGER类型变量:lbrace_position,缓存左大括号在选定文本字符串中位置    rbrace_position INTEGER := 0 ; --定义INTEGER类型变量:rbrace_position,缓存右大括号在选定文本字符串中位置    str_tmp_key_value TEXT := '' ; --定义TEXT类型变量:str_tmp_key_value,缓存解析出的"key":"value"单一值    TYPE ARRSTR IS VARRAY(1024) OF TEXT ; --定义TEXT类型数组类型:ARRSTR    array_value ARRSTR := ARRSTR() ; --定义ARRSTR类型的数组:array_value,缓存解析出的"key":"value"的所有值    array_cnt INTEGER := 1 ; --定义INTEGER类型变量:array_cnt,缓存数组下标变量值,初始值为1,(GaussDB 200的数组下标值从1开始)    lbrace_ajust_num INTEGER := 0 ; --定义INTEGER类型变量:lbrace_ajust_num,缓存左大括号在选定文本字符串中个数    rbrace_ajust_num INTEGER := 0 ; --定义INTEGER类型变量:rbrace_ajust_num,缓存右大括号在选定文本字符串中个数    change_position INTEGER := 0 ; --定义INTEGER类型变量:change_position,缓存指定读取右括号次数    array_loop_cnt INTEGER := 1 ; --定义INTEGER类型变量:array_loop_cnt,缓存读取数组下标值    colon_position INTEGER := 0 ; --定义INTEGER类型变量:colon_position,缓存"key":"value"值中的:所在位置    array_key_value TEXT := '' ; --定义TEXT类型变量:array_key_value,缓存从数组中取出的key值    array_value_value TEXT := '' ; --定义TEXT类型变量:array_value_value,缓存从数组中取出的value值    return_value TEXT := '' ; --定义TEXT类型变量:return_value,缓存返回值BEGIN    p_str_tmp := replace( replace( cast( p_str as TEXT ) , '[{' , '{' ) , '}]' , '}' ) ;    len_p_str_tmp_1 := LENGTH( p_str_tmp ) ;    str_tmp_1 := SUBSTR( p_str_tmp , 2 , ( len_p_str_tmp_1 - 2 ) ) ;    len_p_str_tmp_2 := LENGTH( str_tmp_1 ) ;    IF ( len_p_str_tmp_2 > 0 ) THEN        WHILE ( comma_position < len_p_str_tmp_2 ) LOOP            comma_position := INSTR( str_tmp_1 , ',' , symbol_position ) ;            lbrace_position := INSTR( str_tmp_1 , '{' , symbol_position ) ;            rbrace_position := INSTR( str_tmp_1 , '}' , symbol_position ) ;            IF ( comma_position = 0 ) THEN                comma_position := len_p_str_tmp_2 ;                str_tmp_key_value := substr( str_tmp_1 , symbol_position , ( len_p_str_tmp_2 - symbol_position + 1 ) ) ;                array_value.EXTEND ;                array_value( array_cnt ) := str_tmp_key_value ;                EXIT ;            ELSIF ( ( lbrace_position > 0 ) AND ( lbrace_position < comma_position ) AND ( comma_position < rbrace_position ) ) THEN                SELECT ( LENGTH( SUBSTR( str_tmp_1 , symbol_position , ( rbrace_position - symbol_position + 1 ) ) ) - LENGTH( REPLACE( SUBSTR( str_tmp_1 , symbol_position , ( rbrace_position - symbol_position + 1 ) ) , '{' , '' ) ) ) INTO lbrace_ajust_num ;                SELECT ( LENGTH( SUBSTR( str_tmp_1 , symbol_position , ( rbrace_position - symbol_position + 1 ) ) ) - LENGTH( REPLACE( SUBSTR( str_tmp_1 , symbol_position , ( rbrace_position - symbol_position + 1 ) ) , '}' , '' ) ) ) INTO rbrace_ajust_num ;                WHILE ( lbrace_ajust_num <> rbrace_ajust_num ) LOOP                    change_position := ( change_position 
+ 1 ) ;                    rbrace_position := INSTR( str_tmp_1 , '}' , symbol_position , change_position ) ;                    SELECT ( LENGTH( SUBSTR( str_tmp_1 , symbol_position , ( rbrace_position - symbol_position + 1 ) ) ) - LENGTH( REPLACE( SUBSTR( str_tmp_1 , symbol_position , ( rbrace_position - symbol_position + 1 ) ) , '{' , '' ) ) ) INTO lbrace_ajust_num ;                    SELECT ( LENGTH( SUBSTR( str_tmp_1 , symbol_position , ( rbrace_position - symbol_position + 1 ) ) ) - LENGTH( REPLACE( SUBSTR( str_tmp_1 , symbol_position , ( rbrace_position - symbol_position + 1 ) ) , '}' , '' ) ) ) INTO rbrace_ajust_num ;                END LOOP ;                change_position := 0 ;                str_tmp_key_value := SUBSTR( str_tmp_1 , symbol_position , ( rbrace_position - symbol_position + 1 ) ) ;                symbol_position := ( rbrace_position + 2 ) ;                array_value.EXTEND ;                array_value( array_cnt ) := str_tmp_key_value ;                array_cnt := ( array_cnt + 1 ) ;            ELSE                str_tmp_key_value := SUBSTR( str_tmp_1 , symbol_position , ( comma_position - symbol_position ) ) ;                symbol_position := ( comma_position + 1 ) ;                array_value.EXTEND ;                array_value( array_cnt ) := str_tmp_key_value ;                array_cnt := ( array_cnt + 1 ) ;            END IF ;        END LOOP ;        FOR array_loop_cnt IN 1..array_value.count LOOP            colon_position := 0 ;            array_key_value := '' ;            array_value_value := '' ;            str_tmp_key_value := btrim( array_value( array_loop_cnt ) ) ; --去除开头和结尾的空格            colon_position := INSTR( str_tmp_key_value , TO_CHAR( ':' ), 1 , 1 ) ;            array_key_value := btrim( SUBSTR( str_tmp_key_value , 1 , ( colon_position - 1 ) ) ) ;            array_value_value := btrim( SUBSTR( str_tmp_key_value , ( colon_position + 1 ), LENGTH( str_tmp_key_value ) ) ) ;            IF( array_key_value = ( '"' || key_name || '"' ) ) THEN                return_value := array_value_value ;                RETURN return_value ;            END IF ;        END LOOP ;           END IF ;    RETURN return_value ;END ;/  测试用例:【用例1】select jsonpars('{"a":"info_a","b":{"c":"info_c","d":"info_d"}}','a');【用例2】select jsonpars('{"a":"info_a","b":{"c":"info_c","d":"info_d"}}','b'); 【用例3】select jsonpars(jsonpars('{"a":"info_a","b":{"c":"info_c","d":"info_d"}}','b'),'d'); 【用例4】select jsonpars(jsonpars('{"a":"info_a","b":{"c1":"info_c","c2":{"d1":"info_d1","d2":"info_d2"},"d":"info_d"}}','b'),'d');
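作为对照,下面用 Python 的 json 库演示 jsonpars 想要达到的取值效果,与上面的用例一一对应(仅为功能示意):

```python
import json

s = '{"a":"info_a","b":{"c":"info_c","d":"info_d"}}'
obj = json.loads(s)

print(obj["a"])         # info_a,对应 select jsonpars(s,'a')
print(obj["b"])         # {'c': 'info_c', 'd': 'info_d'},对应 jsonpars(s,'b')
print(obj["b"]["d"])    # info_d,对应嵌套调用 jsonpars(jsonpars(s,'b'),'d')
```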