-
Kafka通过topic来分主题存放数据,主题内有分区,分区可以有多个副本,分区的内部还细分为若干个segment。所谓的分区其实就是在Kafka对应存储目录下创建的文件夹,文件夹的名字是主题名加上分区编号。①Topic 一类消息,消息存放的目录即主题,例如page view日志、click日志等都可以以topic的形式存在,Kafka集群能够同时负责多个topic的分发。②Partition topic物理上的分组,一个topic可以分为多个partition,每个partition是一个有序的队列③Segment 所谓的segment其实就是在分区对应的文件夹下产生的文件。一个分区会被划分成大小相等的若干segment,这样一方面保证了分区的数据被划分到多个文件中保证不会产生体积过大的文件;另-方面可以基于这些segment文件进行历史数据的删除,提高效率。一个segment又由一个log和一个index文件组成。④Log Log由多个Segment文件组成,接收到的新消息永远是以追加的方式于Segment文件中,Segment的文件个数随着数据量的累积而增加,每个消息有自增编号,这种只追加不修改的方式避免了变更前的查询消耗。⑤index Index文件仅记录固定消息量的索引编号范围,Kafka在查询时,先从Index中定位到小范围的索引编号区间,再去Log中在小范围的数据块中查询具体数据,此索引区间的查询方式称为:稀疏索引。二、原理概念1、持久化 kafka使用文件存储消息(append only log),这就直接决定kafka在性能上严重依赖文件系统的本身特性。且无论任何OS下,对文件系统本身的优化是非常艰难的。文件缓存/直接内存映射等是常用的手段。因为kafka是对日志文件进行append操作,因此磁盘检索的开支是较小的;同时为了减少磁盘写入的次数,broker会将消息暂时buffer起来,当消息的个数(或尺寸)达到一定阀值时,再flush到磁盘,这样减少了磁盘IO调用的次数。对于kafka而言,较高性能的磁盘,将会带来更加直接的性能提升。2、性能 除磁盘IO之外,我们还需要考虑网络IO,这直接关系到kafka的吞吐量问题。kafka并没有提供太多高超的技巧;对于producer端,可以将消息buffer起来,当消息的条数达到一定阀值时,批量发送给broker;对于consumer端也是一样,批量fetch多条消息。不过消息量的大小可以通过配置文件来指定。对于Kafka broker端,似乎有个Send file系统调用可以潜在的提升网络IO的性能:将文件的数据映射到系统内存中,socket直接读取相应的内存区域即可,而无需进程再次copy和交换(这里涉及到"磁盘IO数据"、"内核内存"、"进程内存"、"网络缓冲区",多者之间的数据copy)。3、Topic模型 其他JMS实现,消息消费的位置是有provider保留,以便避免重复发送消息或者将没有消费成功的消息重发等,同时还要控制消息的状态。这就要求JMS broker需要太多额外的工作。在kafka中,partition中的消息只有一个consumer在消费,且不存在消息状态的控制,也没有复杂的消息确认机制,可见kafka broker端是相当轻量级的。当消息被consumer接收之后,consumer可以在本地保存最后消息的offset,并间歇性的向zookeeper注册offset。由此可见,consumer客户端也很轻量级。4、负载均衡 kafka集群中的任何一个broker,都可以向producer提供metadata信息,这些metadata中包含"集群中存活的servers列表"/"partitions leader列表"等信息。消息由producer直接通过socket发送到broker,中间不会经过任何"路由层"。 异步发送,将多条消息暂且在客户端buffer起来,并将他们批量发送到broker;小数据IO太多,会拖慢整体的网络延迟,批量延迟发送事实上提升了网络效率;不过这也有一定的隐患,比如当producer失效时,那些尚未发送的消息将会丢失。
-
部署3台ceph 节点,如下:[root@ceph-1 ceph]# cat ceph.conf[global]fsid = 835f6d27-8e03-4f7f-8d21-b0a26993b529mon_initial_members = ceph-1mon_host = 172.24.193.62auth_cluster_required = cephxauth_service_required = cephxauth_client_required = cephxmon_allow_pool_delete=trueosd_pool_default_size = 2[root@ceph-1 ~]# ceph -s cluster: id: 835f6d27-8e03-4f7f-8d21-b0a26993b529 health: HEALTH_OK services: mon: 1 daemons, quorum ceph-1 mgr: ceph-1(active) osd: 3 osds: 3 up, 3 in data: pools: 1 pools, 128 pgs objects: 510 objects, 1.9 GiB usage: 6.8 GiB used, 233 GiB / 240 GiB avail pgs: 128 active+clean[root@ceph-1 ~]# ceph osd status+----+--------+-------+-------+--------+---------+--------+---------+-----------+| id | host | used | avail | wr ops | wr data | rd ops | rd data | state |+----+--------+-------+-------+--------+---------+--------+---------+-----------+| 0 | ceph-1 | 2170M | 77.8G | 0 | 0 | 0 | 0 | exists,up || 1 | ceph-2 | 2376M | 77.6G | 0 | 0 | 0 | 0 | exists,up || 2 | ceph-3 | 2456M | 77.5G | 0 | 0 | 0 | 0 | exists,up |+----+--------+-------+-------+--------+---------+--------+---------+-----------+[root@ceph-1 ceph]# ceph auth lsinstalled auth entries:osd.0 key: AQDH9qNjHRvdDBAAIS8klDI+PhUUSAwlxmEhKA== caps: [mgr] allow profile osd caps: [mon] allow profile osd caps: [osd] allow *osd.1 key: AQDf9qNjNSttFRAA28htGbtUWAYBxK9kacVHXQ== caps: [mgr] allow profile osd caps: [mon] allow profile osd caps: [osd] allow *osd.2 key: AQDu9qNj43mtGRAAJ4tAU5Ie9carFkhRcwnMnQ== caps: [mgr] allow profile osd caps: [mon] allow profile osd caps: [osd] allow *client.admin key: AQC+86NjAev/HRAAD/wXEPaDVlEOWMn0H2x8ng== caps: [mds] allow * caps: [mgr] allow * caps: [mon] allow * caps: [osd] allow *client.bootstrap-mds key: AQC+86NjHAIAHhAAm0L2LXEzwRx7ijTBMlsmSQ== caps: [mon] allow profile bootstrap-mdsclient.bootstrap-mgr key: AQC+86Nj4xEAHhAADcU3pbq04mk4wK8DxNH6hA== caps: [mon] allow profile bootstrap-mgrclient.bootstrap-osd key: AQC+86NjryAAHhAAzCw13oaWl8EvdrVRCqc+wQ== caps: [mon] allow profile bootstrap-osdclient.bootstrap-rbd key: AQC+86NjEDAAHhAAWev9IkV4zd6gSoUjEEK2ZA== caps: [mon] allow profile bootstrap-rbdclient.bootstrap-rgw key: AQC+86NjXz4AHhAADlfczh0/8voJZuDWkHaJ9w== caps: [mon] allow profile bootstrap-rgwclient.libvirt key: AQDULP9jtWHCMxAAfIBCbNynpHADCYMwQr8xpg== caps: [mon] allow r caps: [osd] allow class-read object_prefix rbd_children, allow rwx pool=libvirt-poolmgr.ceph-1 key: AQAF9KNj6Io3OBAAX6koyzKcwSn+IL5IemY3xg== caps: [mds] allow * caps: [mon] allow profile mgr caps: [osd] allow *在ceph client 上创建vm ,使用ceph rbd 访问 ceph pool libvirt-pool里的qcow2 镜像: 43 <emulator>/usr/local/bin/qemu-system-x86_64</emulator> 44 <disk type='network' device='disk'> 45 <driver name='qemu' type='raw' cache='writeback'/> 46 <auth username='libvirt'> 47 <secret type='ceph' uuid='8187dd57-7e0d-4a9d-8e5a-fe5234d40e51'/> 48 </auth> 49 <source protocol='rbd' name='libvirt-pool/root-vsys_c1.qcow2'> 50 <host name='172.24.193.62' port='6789'/> 51 <host name='172.24.193.63' port='6789'/> 52 <host name='172.24.193.64' port='6789'/> 53 </source> 54 <target dev='vda' bus='virtio'/> 55 <address type='pci' domain='0x0000' bus='0x00' slot='0x09' function='0x0'/> 56 </disk>在ceph-client 上可以利用 client.libvirt 用户访问 ceph 上的libvirt-pool ,如下:如上图卡了,看样子是找不到启动盘了。。。libvirtd 把 root-vsys_c1 xml 中的 启动 虚拟机的命令如下:[root@ceph-client-65 ceph]# rbd --user libvirt ls libvirt-poolroot-vsys_c1.qcow2而我不通过virsh start xxx 方式启动vm,采取直接qemu 启动,则可以成功进入vm 内部,并正常引导kernel这种方式和virsh 方式的不同点,就是 qemu 没有同ceph 进行cephx 验证吧。本人怀疑是 ceph client 接入ceph 认证导致的,可是我都已经参考了此ceph 官网链接:https://docs.ceph.com/en/quincy/rbd/libvirt/ 反复检查了。在创建vm 的客户端节点上都可以访问ceph 集群内。希望有从事ceph 分布式存储这方面实战经验的高手多多指教下这个现象出现的原因及解决方法。谢谢!
-
创建minio,没配置访问链接的地方,如何配置外部存储的minio?
-
存储位置不同,打包到生产后,开发环境配置的资源获取不到,有没有能通过中间层,或者配置参数的方式,打包到生产能正常调用不同存储的资源?
-
ERROR - 2022-12-07 18:05:29 --> Severity: Warning --> fread(): SSL: Connection reset by peer /dev/third_party/huaweicloud-obs/vendor/guzzlehttp/psr7/src/Stream.php 231ERROR - 2022-12-07 17:32:56 --> Severity: error --> Exception: Argument 1 passed to Obs\ObsClient::Obs\Internal\{closure}() must be an instance of GuzzleHttp\Exception\RequestException, instance of GuzzleHttp\Exception\ConnectException given, called in /dev/third_party/huaweicloud-obs/vendor/guzzlehttp/promises/src/Promise.php on line 204 /dev/third_party/huaweicloud-obs/Obs/Internal/SendRequestTrait.php 640
-
迁移环境:操作系统: CentOS Linux release 7.6.1810 (AltArch) Linux ### 4.14.0-115.el7a.0.1.aarch64 #1 SMP Sun Nov 25 20:54:21 UTC 2018 aarch64 aarch64 aarch64 GNU/Linux组件: tfs(开源地址:https://github.com/alibaba/tfs/tree/release-2.2.13)依赖:jemalloc,mysql-connector-c,zlib-devel,libuuid,libtbsys.so,libtbnet.so(后两个的迁移案例:https://bbs.huaweicloud.com/forum/thread-102614-1-1.html)迁移过程:里面的脚本等小错误还是比较多的,但这些都不是重点,稍微修改一下就好(后面附带修改后的完整代码),来看看一下报错 unknown mnemonic 'lock' -- 'lock' unknown mnemonic 'xaddl' -- xaddl x20,[x0] unknown mnemonic 'lock' -- 'lock' unknown mnemonic 'xaddl' -- xaddl x7,[x1]错误类似上一篇https://bbs.huaweicloud.com/forum/thread-102614-1-1.html我们接着看具体的代码:src/common/atomic.h-----------------------------------------------------------------------------------------------------#if defined(__i386__) || defined(__x86_64__) // Atomic operations that C can't guarantee us. Useful for // resource counting etc.. // SMP lock prefix #ifdef CONFIG_SMP #define LOCK_PREFIX "lock ; " #else #define LOCK_PREFIX "" #endif //return: the incremented value; /// 原子地做 8位,16位,32位,64位的++i的操作 /// 该操作虽然参数和返回值都是无符号型整数,但是一样可以 /// 对有符号型整数做操作,只需要做适当的参数转换即可 /// @param pv 指向操作数的指针 /// @return 操作数加1以后的数值 #ifdef __x86_64__ static __inline__ uint64_t atomic_inc(volatile uint64_t * pv) { register unsigned long __res; __asm__ __volatile__ ( "movq $1,%0;" LOCK_PREFIX "xaddq %0,(%1);" "incq %0" :"=a" (__res), "=q" (pv): "1" (pv)); return __res; } #endif static __inline__ uint32_t atomic_inc(volatile uint32_t * pv) { register unsigned int __res; __asm__ __volatile__ ( "movl $1,%0;" LOCK_PREFIX "xaddl %0,(%1);" "incl %0" :"=a" (__res), "=q" (pv): "1" (pv)); return __res; } static __inline__ uint16_t atomic_inc(volatile uint16_t * pv) { register unsigned short __res; __asm__ __volatile__ ( "movw $1,%0;" LOCK_PREFIX "xaddw %0,(%1);" "incw %0" :"=a" (__res), "=q" (pv): "1" (pv)); return __res; } static __inline__ uint8_t atomic_inc(volatile uint8_t * pv) { register unsigned char __res; __asm__ __volatile__ ( "movb $1,%0;" LOCK_PREFIX "xaddb %0,(%1);" "incb %0" :"=a" (__res), "=q" (pv): "1" (pv)); return __res; } //return: the decremented value; /// 原子地做 8位,16位,32位,64位的--i的操作 /// 该操作虽然参数和返回值都是无符号型整数,但是一样可以 /// 对有符号型整数做操作,只需要做适当的参数转换即可 /// @param pv 指向操作数的指针 /// @return 操作数减1后的数值 #ifdef __x86_64__ static __inline__ uint64_t atomic_dec(volatile uint64_t * pv) { register unsigned long __res; __asm__ __volatile__ ( "movq $0xffffffffffffffff,%0;" LOCK_PREFIX "xaddq %0,(%1);" "decq %0" : "=a" (__res), "=q" (pv): "1" (pv)); return __res; } #endif static __inline__ uint32_t atomic_dec(volatile uint32_t * pv) { register unsigned int __res; __asm__ __volatile__ ( "movl $0xffffffff,%0;" LOCK_PREFIX "xaddl %0,(%1);" "decl %0" : "=a" (__res), "=q" (pv): "1" (pv)); return __res; } static __inline__ uint16_t atomic_dec(volatile uint16_t * pv) { register unsigned short __res; __asm__ __volatile__ ( "movw $0xffff,%0;" LOCK_PREFIX "xaddw %0,(%1);" "decw %0" : "=a" (__res), "=q" (pv): "1" (pv)); return __res; } static __inline__ uint8_t atomic_dec(volatile uint8_t * pv) { register unsigned char __res; __asm__ __volatile__ ( "movb $0xff,%0;" LOCK_PREFIX "xaddb %0,(%1);" "decb %0" : "=a" (__res), "=q" (pv): "1" (pv)); return __res; } //return: the initial value of *pv /// 原子地做 8位,16位,32位,64位的加法的操作 /// 该操作虽然参数和返回值都是无符号型整数,但是一样可以 /// 对有符号型整数做操作,只需要做适当的参数转换即可 /// @param pv 指向操作数的指针 /// @return 操作数加法之前的数值 #ifdef __x86_64__ static __inline__ uint64_t atomic_add(volatile uint64_t * pv, const uint64_t av) { //:"=a" (__res), "=q" (pv): "m"(av), "1" (pv)); register unsigned long __res; __asm__ __volatile__ ( "movq %2,%0;" LOCK_PREFIX "xaddq %0,(%1);" :"=a" (__res), "=q" (pv): "mr"(av), "1" (pv)); return __res; } #endif static __inline__ uint32_t atomic_add(volatile uint32_t * pv, const uint32_t av) { //:"=a" (__res), "=q" (pv): "m"(av), "1" (pv)); register unsigned int __res; __asm__ __volatile__ ( "movl %2,%0;" LOCK_PREFIX "xaddl %0,(%1);" :"=a" (__res), "=q" (pv): "mr"(av), "1" (pv)); return __res; } static __inline__ uint16_t atomic_add(volatile uint16_t * pv, const uint16_t av) { //:"=a" (__res), "=q" (pv): "m"(av), "1" (pv)); register unsigned short __res; __asm__ __volatile__ ( "movw %2,%0;" LOCK_PREFIX "xaddw %0,(%1);" :"=a" (__res), "=q" (pv): "mr"(av), "1" (pv)); return __res; } static __inline__ uint8_t atomic_add(volatile uint8_t * pv, const uint8_t av) { //:"=a" (__res), "=q" (pv): "m"(av), "1" (pv)); register unsigned char __res; __asm__ __volatile__ ( "movb %2,%0;" LOCK_PREFIX "xaddb %0,(%1);" :"=a" (__res), "=q" (pv): "mr"(av), "1" (pv)); return __res; } //function: set *pv to nv //return: the initial value of *pv /// 原子地把nv赋值给pv指向的整数,支持8位,16位,32位,84位操作 /// @param pv 待赋值的整数(目的操作数) /// @param nv 向pv赋的整数 /// @return pv指向的赋值前的数值 #ifdef __x86_64__ static __inline__ uint64_t atomic_exchange(volatile uint64_t * pv, const uint64_t nv) { register unsigned long __res; __asm__ __volatile__ ( "1:" LOCK_PREFIX "cmpxchgq %3,(%1);" \ "jne 1b": "=a" (__res), "=q" (pv): "1" (pv), "q" (nv), "0" (*pv)); return __res; } #endif static __inline__ uint32_t atomic_exchange(volatile uint32_t * pv, const uint32_t nv) { register unsigned int __res; __asm__ __volatile__ ( "1:" LOCK_PREFIX "cmpxchgl %3,(%1);" \ "jne 1b": "=a" (__res), "=q" (pv): "1" (pv), "q" (nv), "0" (*pv)); return __res; } static __inline__ uint16_t atomic_exchange(volatile uint16_t * pv, const uint16_t nv) { register unsigned short __res; __asm__ __volatile__ ( "1:" LOCK_PREFIX "cmpxchgw %3,(%1);" \ "jne 1b": "=a" (__res), "=q" (pv): "1" (pv), "q" (nv), "0" (*pv)); return __res; } static __inline__ uint8_t atomic_exchange(volatile uint8_t * pv, const uint8_t nv) { register unsigned char __res; __asm__ __volatile__ ( "1:" LOCK_PREFIX "cmpxchgb %3,(%1);" \ "jne 1b": "=a" (__res), "=q" (pv): "1" (pv), "q" (nv), "0" (*pv)); return __res; } //function: compare *pv to cv, if equal, set *pv to nv, otherwise do nothing. //return: the initial value of *pv /// 比较pv和cv,如果两者相等,则返回pv原有数值并且把nv赋值给pv /// 否则什么也不作,返回pv原有数值 /// @param pv 待赋值的整数(目的操作数) /// @param nv 向pv赋的整数 /// @param cv 和pv比较的整数 /// @return pv指向的操作前的数值 #ifdef __x86_64__ static __inline__ uint64_t atomic_compare_exchange(volatile uint64_t * pv, const uint64_t nv, const uint64_t cv) { register unsigned long __res; __asm__ __volatile__ ( LOCK_PREFIX "cmpxchgq %3,(%1)" : "=a" (__res), "=q" (pv) : "1" (pv), "q" (nv), "0" (cv)); return __res; } #endif static __inline__ uint32_t atomic_compare_exchange(volatile uint32_t * pv, const uint32_t nv, const uint32_t cv) { register unsigned int __res; __asm__ __volatile__ ( LOCK_PREFIX "cmpxchgl %3,(%1)" : "=a" (__res), "=q" (pv) : "1" (pv), "q" (nv), "0" (cv)); return __res; } static __inline__ uint16_t atomic_compare_exchange(volatile uint16_t * pv, const uint16_t nv, const uint16_t cv) { register unsigned short __res; __asm__ __volatile__ ( LOCK_PREFIX "cmpxchgw %3,(%1)" : "=a" (__res), "=q" (pv) : "1" (pv), "q" (nv), "0" (cv)); return __res; } static __inline__ uint8_t atomic_compare_exchange(volatile uint8_t * pv, const uint8_t nv, const uint8_t cv) { register unsigned char __res; __asm__ __volatile__ ( LOCK_PREFIX "cmpxchgb %3,(%1)" : "=a" (__res), "=q" (pv) : "1" (pv), "q" (nv), "0" (cv)); return __res; } typedef void * pvoid; //function: set *pv to nv //return: the initial value of *pv /// 把nv原子地赋值给*pv static __inline__ pvoid atomic_exchange_pointer(volatile pvoid * pv, const pvoid nv) { #ifdef __x86_64__ return (pvoid) atomic_exchange((uint64_t *) pv, (uint64_t) nv); #else return (pvoid) atomic_exchange((uint32_t *) pv, (uint32_t) nv); #endif } //function: compare *pv to cv, if equal, set *pv to nv, otherwise do nothing. //return: the initial value of *pv /// 比较cv和*pv,如果两者相等则把nv赋值给*pv,并且返回*pv原有数值 /// 否则返回*pv原有数值,不做赋值操作 static __inline__ pvoid atomic_compare_exchange_pointer(volatile pvoid * pv, const pvoid nv, const pvoid cv) { #ifdef __x86_64__ return (pvoid) atomic_compare_exchange((uint64_t *) pv, (uint64_t) nv, (uint64_t)cv); #else return (pvoid) atomic_compare_exchange((uint32_t *) pv, (uint32_t) nv, (uint32_t)cv); #endif } #undef LOCK_PREFIX好嘛,又是定义x86下的asm函数,好吧,继续使用gcc的内建__sync的原子操作函数来做实现,修改后的文件如下:
上滑加载中
推荐直播
-
全面解析华为云EI-API服务:理论基础与实践应用指南
2024/11/29 周五 18:20-20:20
Alex 华为云学堂技术讲师
本期直播给大家带来的是理论与实践结合的华为云EI-API的服务介绍。从“主要功能,应用场景,实践案例,调用流程”四个维度来深入解析“语音交互API,文字识别API,自然语言处理API,图像识别API及图像搜索API”五大场景下API服务,同时结合实验,来加深开发者对API服务理解。
去报名 -
企业员工、应届毕业生、在读研究生共探项目实践
2024/12/02 周一 19:00-21:00
姚圣伟 在职软件工程师 昇腾社区优秀开发者 华为云云享专家 HCDG天津地区发起人
大神带你一键了解和掌握LeakyReLU自定义算子在ONNX网络中应用和优化技巧,在线分享如何入门,以及在工作中如何结合实际项目进行学习
即将直播 -
昇腾云服务ModelArts深度解析:理论基础与实践应用指南
2024/12/03 周二 14:30-16:30
Alex 华为云学堂技术讲师
如何快速创建和部署模型,管理全周期AI工作流呢?本期直播聚焦华为昇腾云服务ModelArts一站式AI开发平台功能介绍,同时结合基于ModelArts 的实践性实验,帮助开发者从理论到实验更好地理解和使用ModelArts。
去报名
热门标签