0
  • 聊天消息
  • 系统消息
  • 评论与回复
登录后你可以
  • 下载海量资料
  • 学习在线课程
  • 观看技术视频
  • 写文章/发帖/加入社区
创作中心

完善资料让更多小伙伴认识你,还能领取20积分哦,立即完善>

3天内不再提示

高并发内存池项目实现

科技绿洲 来源:Linux开发架构之路 作者:Linux开发架构之路 2023-11-09 11:16 次阅读

本项目实现了一个高并发内存池,参考了Google的开源项目tcmalloc实现的简易版;其功能就是实现高效的多线程内存管理。由功能可知,高并发指的是高效的多线程,而内存池则是实现内存管理的。

内存池相关知识

1、池化技术

池化技术就是程序先向系统申请过量的资源,并将这些资源管理起来,避免频繁的申请和释放资源导致的开销。

内存池可以使用池化技术来维护可用内存块的链表。当程序需要分配内存时,内存池会从链表中分配一个可用的内存块。如果没有可用的内存块,内存池会从操作系统申请更多的内存,并将新分配的内存块添加到链表中。当程序释放内存时,内存池会将内存块添加回链表中,以便将来使用。

池化技术可以有效地减少内存碎片,因为它可以将多个小内存块组合成更大的内存块,这样就可以分配更大的连续内存空间,并减少碎片。此外,池化技术还可以提高内存使用效率,因为它可以快速分配和释放内存,而无需每次都调用操作系统的内存分配和释放函数。

2、内存池

内存池指的是程序预先向操作系统申请足够大的一块内存空间;此后,程序中需要申请内存时,不需要直接向操作系统申请,而是直接从内存池中获取;同理,程序释放内存时,也不是将内存直接还给操作系统,而是将内存归还给内存池。当程序退出(或者特定时间)时,内存池才将之前申请的内存真正释放。

3、内存池主要解决的问题

由上可知,内存池首要解决的是效率问题,其次从系统的内存分配器角度出发,还需要解决内存碎片的问题。那么什么是内存碎片问题呢?

内存碎片分为外碎片和内碎片。

  • 外碎片由下图所示:对于程序员申请的内存,可能因为频繁的申请和释放内存导致内存空间不连续,那么就会出现明明由足够大的内存空间,但程序员却申请不出连续的空间出来,这便是外碎片问题了。
  • 内碎片则是由于一些对齐的需求,导致分配出去的内存空间无法被利用,比如本项目中的Round(Size)对size进行的对齐。

4、malloc

C语言中动态申请内存是通过malloc函数去申请内存的,但是实际上malloc并不是直接向堆申请内存的,而malloc也可以使用内存池来管理内存分配,在某些情况下,操作系统或C语言标准库可能会使用内存池来管理堆内存,以提高内存分配效率。当程序将malloc管理的内存池中内存全部申请完时,malloc函数就会继续向操作系统申请空间。

设计思路

第一阶段–设计一个定长的内存池

我们知道malloc函数申请内存空间是通用的,即任何场景下都可以使用,但是各方面都通用就意味着各方面都不顶尖,那么我们可以设计一个定长内存池来保证特定场景下的内存申请效率要高于malloc函数。

图片

适应平台的指针方案

在这里,我们想取出一块对象内存中的前4个字节(32位系统)或者8个字节(64位系统)的内存来存储一个指针指向下一块释放回来的自由对象内存,那么在这里为了不作平台系统的判断,可以使用一个小技巧,即将对象内存强转成void**的类型,那么再对这个二级指针类型解引用就可以取出当前对象的前4个字节(32位系统)或8个字节(64位系统)。

由于这个操作之后会频繁使用,因此定义为内敛函数放在common.h头文件中方便调用:

static inline void*& NextObj(void* obj)
{
return *(void**)obj;
}

由此,我们就可以设计出定长内存池的对象:

定长内存池池的基本思想是在程序运行时预先分配一大块内存,然后在需要使用某个对象时,从这块内存中分配给它。当对象不再使用时,将它归还给对象池,供其他对象使用。这样做的好处在于减少了内存分配和释放的次数,从而减少了内存碎片的产生,并降低了内存分配的开销。

在这段代码中,ObjectPool 类的主要功能包括:

  • New() 函数:用于分配一个新的对象,如果有自由链表中有空闲的对象,则直接从自由链表中取出;否则,如果当前剩余内存块大小不够一个对象的大小,则重新申请一个内存块。申请到内存后,调用对象的构造函数来进行初始化。
  • Delete() 函数:用于释放一个对象,调用对象的析构函数进行清理,然后将其加入自由链表中。

在这段代码中,ObjectPool类的成员变量包括:

  • _memory:指向当前申请的内存块的指针。
  • _remainBytes:当前内存块剩余的字节数。
  • _freeList:自由链表的头指针,用于保存当前有哪些对象可以被重复利用。

在这段代码中,还有一个函数 SystemAlloc(),这是为了避免使用malloc而使用的,它的作用是申请一个新的内存块。如果申请失败,则抛出 std::bad_alloc 异常。

总的来说,这段代码实现了一个简单的对象池,可以有效地管理类型为 T 的对象的内存分配和释放,从而减少了内存碎片的产生,并降低了内存分配的开销。

template
class ObjectPool
{
public:
T* New()
{
T* obj = nullptr;
// 如果自由链表非空,以“头删”的方式从自由链表取走内存块,重复利用
if (_freeList)
{
// 技巧:(void**)强转方便32位下获取前4字节,64位下获取前8字节
void* next = *((void**)_freeList);
obj = (T*)_freeList;
_freeList = next;
}
else
{
// 剩余内存_remainBytes不够一个对象大小时,重新开一块大空间
if (_remainBytes < sizeof(T))
{
_remainBytes = 128 * 1024;

// 分配了 _remainBytes 个字节的空间,即(128 *1024字节,128KB)
// memory = (char*)malloc(_remainBytes);

// >>13 其实就是一页8KB的大小,可以得到具体多少页
_memory = (char*)SystemAlloc(_remainBytes >> 13);

if (_memory == nullptr)
{
throw std::bad_alloc();
}
}

obj = (T*)_memory;
// 保证一次分配的空间够存放下当前平台的指针
size_t objSize = sizeof(T) < sizeof(void*) ? sizeof(void*) : sizeof(T);
// 大块内存块往后走,前面objSize大小的内存该分配出去了
_memory += objSize;
_remainBytes -= objSize;
}

// 定位new显式调用T类型构造函数:在内存地址obj处创建一个新的T类型的对象,并调用该对象的构造函数。
// 与普通的new运算符不同的是,它不会使用动态内存分配器来分配内存,而是使用指定的内存地址。
new(obj)T;
return obj;
}
//将obj这块内存链接到_freeList中
void Delete(T* obj)
{
//显式调用obj对象的析构函数,清理空间
obj->~T();

//将obj内存块头插
*(void**)obj = _freeList;
_freeList = obj;
}
private:
char* _memory = nullptr; // 指向大块内存的指针
size_t _remainBytes = 0; // 大块内存在切分过程中的剩余字节数
void* _freeList = nullptr; // 自由链表的头指针,用于保存当前有哪些对象可以被重复利用。
};

对于我们设计的定长内存池,可以通过下面的测试代码来比较一下malloc与定长内存池的效率:

struct TreeNode
{
int _val;
TreeNode* _left;
TreeNode* _right;

TreeNode():_val(0), _left(NULL),_right(NULL){}
TreeNode(int x) : _val(x), _left(nullptr), _right(nullptr) {}
};

void TestObjectPool()
{
// 申请释放的轮次
const size_t Rounds = 5;
// 每轮申请释放多少次
const size_t N = 1000000;
size_t begin1 = clock();
std::vector v1;
v1.reserve(N);
for (size_t j = 0; j < Rounds; ++j)
{
for (int i = 0; i < N; ++i)
{
v1.push_back(new TreeNode);
}
for (int i = 0; i < N; ++i)
{
delete v1[i];
}
v1.clear();
}
size_t end1 = clock();
ObjectPool TNPool;
size_t begin2 = clock();
std::vector v2;
v2.reserve(N);
for (size_t j = 0; j < Rounds; ++j)
{
for (int i = 0; i < N; ++i)
{
v2.push_back(TNPool.New());
}
for (int i = 0; i < 100000; ++i)
{
TNPool.Delete(v2[i]);
}
v2.clear();
}
size_t end2 = clock();
cout << "new cost time:" << end1 - begin1 << endl;
cout << "object pool cost time:" << end2 - begin2 << endl;
}*>*>

图片

可以明显的看出,定长内存池的开销是要低于malloc的,由此可见,在特定场景下,定长内存池的效率高于malloc函数。

第二阶段–高并发内存池整体框架设计

现代开发环境大多都是多核多线程,那么在申请内存的场景下,必然存在激烈的锁竞争问题。其实,malloc本身就已经足够优秀了,但本项目的原型tcmalloc将在多线程高并发的场景下更胜一筹。

而本项目实现的内存池将考虑以下几方面的问题:

  • 1.性能问题
  • 2.多线程场景下的锁竞争问题
  • 3.内存碎片问题

concurrent memory pool(并发内存池),主要有以下3个部分组成:

1.线程缓存(thread cache)

线程缓存是每个线程独有的,用于小于256kb内存的分配。那么对于每一个线程从thread cache申请资源,就无需考虑加锁问题,每个线程独享一个缓存(cache),这也是并发线程池高效的地方。

2.中心缓存(central cache)

中心缓存有所有线程所共享,thread cache 按需从central cache处获取对象,而central cache在合适的时机从thread cache处回收对象从而避免一个线程占用太多资源,导致其他线程资源吃紧,进而实现内存分配在多个线程更加均衡的按需调度。由于所有thread cache都从一个central cache中取内存对象,故central cache是存在竞争的,也就是说从central cache中取内存对象需要加锁,但我们在central cache这里用的是桶锁,且只有thread cache中没有对象后才会来central cache处取对象,因此锁的竞争不会很激烈。

3.页缓存(page cache)

页缓存是中心缓存上一级的缓存,存储并分配以页为单位的内存,central cache中没有内存对象时,会从page cache中分配出一定数量的page,并切割成定长大小的小块内存,给central cache。当page cache中一个span的几个跨度页都回收以后,page cache会回收central cache中满足条件的span对象,并且合并相邻的页,组成更大的页,从而缓解内存碎片(外碎片)的问题。

图片

第三阶段–三级缓存的具体实现

1.Thread Cache框架构建及核心实现

thread cache是哈希桶结构,每个桶是一个根据桶位置映射的挂接内存块的自由链表,每个线程都会有一个thread cache对象,这样就可以保证线程在申请和释放对象时是无锁访问的。

图片

申请与释放内存的规则及无锁访问

申请内存

  1. 当内存申请大小size不超过256KB,则先获取到线程本地存储的thread cache对象,计算size映射的哈希桶自由链表下标i。
  2. 如果自由链表_freeLists[i]中有对象,则直接Pop一个内存对象返回。
  3. 如果_freeLists[i]中没有对象时,则批量从central cache中获取一定数量的对象,插入到自由链表并返回一个对象。

释放内存

1.当释放内存小于256kb时将内存释放回thread cache,计算size映射自由链表桶位置i,将对象Push到_freeLists[i]。

2.当链表的长度过长,则回收一部分内存对象到central cache。

tls - thread local storage

线程局部存储(tls),是一种变量的存储方法,这个变量在它所在的线程内是全局可访问的,但是不能被其他线程访问到,这样就保持了数据的线程独立性。而熟知的全局变量,是所有线程都可以访问的,这样就不可避免需要锁来控制,增加了控制成本和代码复杂度。

//TLS: thread local storage,实现线程的无锁访问
static _declspec(thread) ThreadCache* pTLSThreadCache = nullptr;

管理内存对齐和映射等关系

计算对齐大小映射的规则

thread cache中的哈希桶映射比例比非均匀的,如果将内存大小均匀划分的话,则会划分出大量的哈希桶,比如256kb如果按照8byte划分,则会创建32768个哈希桶,这就有较大的内存开销;而如果按照更大的字节划分,那么内存开销虽然减少了,但照顾到的场景也少了,且会产生内碎片问题。

那么参考tcmalloc项目,为了保证内碎片的浪费整体控制在10%左右进行的区间映射,同时没有那么大的开销。使用RoundUp 函数的将输入的 size 对齐到一个固定的对齐值。对齐值是根据 size 的大小而定的,它分成了五个区间:

  • 如果 size 位于 [1,128] 之间,那么 size 将被对齐到 8 字节。
  • 如果 size 位于 [128+1,1024] 之间,那么 size 将被对齐到 16 字节。
  • 如果 size 位于 [1024+1,8*1024] 之间,那么 size 将被对齐到 128 字节。
  • 如果 size 位于 [81024+1,641024] 之间,那么 size 将被对齐到 1024 字节。
  • 如果 size 位于 [641024+1,2561024] 之间,那么 size 将被对齐到 8192 字节。

这个函数内部使用了另外一个静态函数_RoundUp来实际计算对齐后的值。

也就是说,对于1byte到128byte的内存对象,按照8byte对齐,划分为下标0-15号的哈希桶,而129byte到1kb的内存对象,按照16byte对齐,划分下标16-71号的哈希桶,以此类推,最终划分为0-207号总共208个哈希桶,这样就保证了内存较小的开销,同时各个对齐关系中内碎片浪费控制在10%左右,比如129byte到144byte区间,取144byte的内存对象,浪费率为(144 - 129) / 144 = 10.42%,当然对于最开始的1byte申请8byte内存对象,虽然浪费高达87.5%,但考虑到最终内碎片浪费了7byte,对比后续内碎片一次浪费7kb来说可以忽略不计了。

这便是申请的内存对象大小对齐的映射关系,这个关系在后续central cache及page cache中仍有应用,因此可以将其定义在头文件common.h中,以后内存大小对齐的管理。

计算相应内存映射在哪一个哈希桶中

这里使用Index 函数计算将输入的 size 映射到哪个自由链表桶(freelist)。和RoundUp函数一样,这个函数也根据size的大小将它分成了五个区间,但是它返回的是一个数组下标。数组的大小和每个区间内的自由链表桶数量是固定的。

这个函数内部使用了另一个静态函数_Index来计算桶的下标。在代码中,size表示要被对齐的内存块的大小,alignNum表示对齐的值,align_shift表示对齐的值的二进制位数。

关于 _RoundUp和 _Index:

对于 _RoundUp 函数,它使用位运算将 size 对齐到最接近它的大于等于它的 alignNum 的倍数。这里有一个简单的例子:假设我们有一个值 size=11,我们想将它对齐到 alignNum=8 的倍数。那么 _RoundUp 函数会返回 16,因为 16 是最接近 11 且大于等于 11 的 alignNum 的倍数。

对于 _Index 函数,它计算的是将 size 映射到桶链的下标。它的计算方法是将 size 向上对齐到最接近它的大于等于它的 2^align_shift 的倍数,然后再减去 1。这个函数的作用和 _RoundUp 函数类似,但是它返回的是下标而不是对齐后的值。

//计算对齐数
size_t _RoundUp(size_t size, size_t alignNum)
{
size_t alignSize;
if (size % alignNum != 0)
{
alignSize = (size / alignNum + 1) * alignNum;
}
else
{
alignSize = size;
}
return alignSize;
}


//计算对应链桶的下标
static inline size_t _Index(size_t bytes, size_t alignNum)
{
if (bytes % alignNum == 0)
{
return bytes / alignNum - 1;
}
else
{
return bytes / alignNum;
}
}

但是参考tcmalloc源码,考虑到位移运算更加接近底层,效率更高,而实际应用中映射对应关系的计算是相当频繁的,因此使用位运算来改进算法

static inline size_t _RoundUp(size_t bytes, size_t alignNum)
{
return ((bytes + alignNum - 1) & ~(alignNum - 1));
}
static inline size_t _Index(size_t bytes, size_t align_shift)
{
return ((bytes + (1 << align_shift) - 1) >> align_shift) - 1;
}

代码实现

// 计算对象大小的对齐映射规则
class SizeClass
{
public:
// 整体控制在最多10%左右的内碎片浪费
// [1,128] 8byte对齐 freelist[0,16)
// [128+1,1024] 16byte对齐 freelist[16,72)
// [1024+1,8*1024] 128byte对齐 freelist[72,128)
// [8*1024+1,64*1024] 1024byte对齐 freelist[128,184)
// [64*1024+1,256*1024] 8*1024byte对齐 freelist[184,208)

// 使用位运算将 size 对齐到最接近它的大于等于它的 alignNum 的倍数
// 比如size = 11对齐到16
static inline size_t _RoundUp(size_t bytes, size_t alignNum)
{
return ((bytes + alignNum - 1) & ~(alignNum - 1));
}
static inline size_t RoundUp(size_t size)
{
if (size <= 128)
{
return _RoundUp(size, 8);
}
else if (size <= 1024)
{
return _RoundUp(size, 16);
}
else if (size <= 8 * 1024)
{
return _RoundUp(size, 128);
}
else if (size <= 64 * 1024)
{
return _RoundUp(size, 1024);
}
else if (size <= 256 * 1024)
{
return _RoundUp(size, 8 * 1024);
}
else
{
assert(false);
return -1;
}
}

// 将 size 映射到桶链的下标:
// 这个函数的作用和 _RoundUp 函数类似,但是它返回的是下标而不是对齐后的值。
// 比如size = 11映射下标到(2 - 1 = 1)
static inline size_t _Index(size_t bytes, size_t align_shift)
{
return ((bytes + (1 << align_shift) - 1) >> align_shift) - 1;
}
// 计算映射的哪一个自由链表桶
static inline size_t Index(size_t bytes)
{
assert(bytes <= MAX_BYTES);

// 每个区间有多少个链
static int group_array[4] = { 16, 56, 56, 56 };// 打表
if (bytes <= 128)
{
return _Index(bytes, 3);
}
else if (bytes <= 1024)
{
return _Index(bytes - 128, 4) + group_array[0];
}
else if (bytes <= 8 * 1024)
{
return _Index(bytes - 1024, 7) + group_array[1] + group_array[0];
}
else if (bytes <= 64 * 1024)
{
return _Index(bytes - 8 * 1024, 10) + group_array[2] + group_array[1] + group_array[0];
}
else if (bytes <= 256 * 1024)
{
return _Index(bytes - 64 * 1024, 13) + group_array[3] + group_array[2] + group_array[1] + group_array[0];
}
else
{
assert(false);
}

return -1;
}

// 计算ThreadCache一次从中心缓存CentralCache获取多少个小对象,总的大小就是MAX_BYTES = 256KB
static size_t NumMoveSize(size_t size)
{
assert(size > 0);

// [2, 512],一次批量移动多少个对象的(慢启动)上限值
// 小对象一次批量上限高
// 小对象一次批量上限低
int num = MAX_BYTES / size;
if (num < 2)
num = 2;
if (num > 512)
num = 512;

return num;
}

// 计算中心缓存CentralCache一次向PageCache获取多少页
// 单个对象 8byte
// ...
// 单个对象 256KB
static size_t NumMovePage(size_t size)
{
// 计算一次从中心缓存获取的对象个数num
size_t num = NumMoveSize(size);
// 单个对象大小与对象个数相乘,获得一次需要向PageCache申请的内存大小
size_t npage = num * size;

npage >>= PAGE_SHIFT;
if (npage == 0)
{
npage = 1;
}
return npage;
}
};

NumMoveSize 函数的作用是计算一次从中心缓存获取多少个对象。它的计算方法是首先将单个对象大小除以总的缓存大小 MAX_BYTES,得到的结果即为一次从中心缓存获取的对象个数。为了避免数量太少或太多,可以设置一个范围,在 [2, 512] 之间。如果计算出的对象数量不在这个范围内,就取边界值。

NumMovePage函数的作用是计算中心缓存CentralCache一次向PageCache获取多少页。一页的大小是由PAGE_SHIFT指定的。本项目中一个页大小是8KB,即2的13次方,所以PAGE_SHIFT = 13。NumMovePage函数先调用NumMoveSize函数计算出一次从CentralCache获取多少个对象,然后乘上对象大小,就获得需要向PageCache申请的内存大小,然后除以单个页的大小(左移PAGE_SHIFT)即可获得向PageCache申请的总页数。

突击检查:static inline 函数和 inline函数有什么区别呢?

inline内联函数:为了减少因函数调用而引起的系统开销,内联函数实际上是以空间换效率的一种做法。编译器会尽量将 inline 函数的代码插入到调用函数的代码中,从而减少函数调用的开销。inline 函数的主要优点是可以提高程序的执行效率,因为省去了函数调用的开销。但是,inline 函数的缺点是会降低程序的可读性,代码会变得复杂。

static inline 函数是一种特殊的函数,它同时具有 inline 函数的优点和 static 函数的优点。static 函数是指在编译期间就将函数体内的代码插入到调用函数的代码中,并且只在本文件中可见。static 函数的主要优点是可以隐藏函数的实现细节,只提供接口。所以在头文件中务必要加上static inline,否则和普通函数一样,当多个CPP文件包含是就会重复定义。所以加入static提高代码健壮性。

因此,static inline 函数既可以提高程序的执行效率,又可以隐藏函数的实现细节,是一种很好的函数声明方式。

自由链表的设计

在有了上面的基础之后,我们来设计自由链表,其实就是实现一个单链表,方便插入删除,同时标识链表长度 _size以方便后续释放流程,以及定义 _maxSize来保住thread cache一次申请对象批次的下限。

// 返回“obj前4或8字节内存”强转得来的指针,指向的是下一个结点
static void*& NextObj(void* obj)
{
return *(void**)obj;
}
class FreeList
{
public:
void Push(void* obj)
{
// 将归还的内存块对象头插进自由链表
NextObj(obj) = _freeList;
_freeList = obj;
++_size;
}
void PushRange(void* start, void* end, size_t size)
{
NextObj(end) = _freeList;
_freeList = start;
_size += size;
}
void* Pop()
{
assert(_freeList);
//将自由链表中的内存块头删出去
void* obj = _freeList;
_freeList = NextObj(obj);
--_size;
return obj;
}

void PopRange(void*& start, void*& end, size_t n)
{
assert(n >= _size);
start = _freeList;
end = start;
for (size_t i = 0; i < n - 1; i++)
{
end = NextObj(end);
}
_freeList = NextObj(end);
_size -= n;
NextObj(end) = nullptr;
}

bool Empty()
{
return _freeList == nullptr;
}
size_t& MaxSize()// 传引用
{
return _maxSize;
}
size_t& Size()
{
return _size;
}
private:
void* _freeList = nullptr;
size_t _maxSize = 1;//慢增长用于保住申请批次下限
size_t _size = 0;//计算链表长度
};

thread cache框架构建

在有了上述基础后,我们来搭建thread cache的框架,其实就是一个哈希桶,每个桶中挂接着自由链表对象。

_declspec(thread)是一个Windows平台专用的关键字,用于声明线程局部存储(TLS)变量。在这里,它声明了一个指向ThreadCache对象的指针变量pTLSThreadCache,该变量的值对于每个线程来说都是独立的,可以使线程在向thread cache申请内存对象的时候实现无锁访问。

class ThreadCache
{
public:
// 申请和释放内存对象
void* Allocate(size_t size);
void Deallocate(void* ptr, size_t size);

// 从中心缓存获取对象
void* FetchFromCentralCache(size_t index, size_t size);

// 释放内存时,如果自由链表过长,回收内存到CentralCache中心缓存
void ListTooLong(FreeList& list, size_t size);
private:
// 哈希桶,每个桶中挂接着自由链表对象
FreeList _freeLists[NFREELIST];
};

// pTLSThreadCache是一个指向ThreadCache对象的指针,每个线程都有一个独立的pTLSThreadCache
// 可以使线程在向thread cache申请内存对象的时候实现无锁访问
static _declspec(thread) ThreadCache* pTLSThreadCache = nullptr;

thread cache核心实现

1.FetchFromCentralCache(size_t index, size_t size)

从中央缓存(CentralCache)获取内存块。接受两个参数:ThreadCache自由链表对应的桶索引和想获取的内存块大小。

void* ThreadCache::FetchFromCentralCache(size_t index, size_t size)
{
// 慢开始反馈调节算法
// 1、最开始不会一次向central cache一次批量要太多,因为要太多了可能用不完
// 2、如果你不要这个size大小内存需求,那么batchNum就会不断增长,直到上限
// 3、size越大,一次向central cache要的batchNum就越小
// 4、size越小,一次向central cache要的batchNum就越大
size_t batchNum = min(_freeLists[index].MaxSize(), SizeClass::NumMoveSize(size));
if (_freeLists[index].MaxSize() == batchNum)
{
_freeLists[index].MaxSize() += 1;
}

void* start = nullptr;
void* end = nullptr;
size_t actualNum = CentralCache::GetInstance()->FetchRangeObj(start, end, batchNum, size);

// 至少要获得一块
assert(actualNum > 0);

if (actualNum == 1)// 只有一个内存块
{
assert(start == end);
return start;
}
else// 除了起始地址外的其他内存块插入当前线程的缓存的自由链表中
{
_freeLists[index].PushRange(NextObj(start), end, actualNum - 1);
return start;
}
}

2.Allocate(size_t size)

线程内分配内存

void* ThreadCache::Allocate(size_t size)
{
assert(size <= MAX_BYTES);
// 计算出内存块的对齐大小 alignSize 和内存块所在的自由链表的下标 index
size_t alignSize = SizeClass::RoundUp(size);
size_t index = SizeClass::Index(size);

// _freeLists[index] 如果不为空,就从 _freeLists[index] 中弹出一个内存块并返回。
if (!_freeLists[index].Empty())
{
return _freeLists[index].Pop();
}
// 如果为空,就调用 FetchFromCentralCache 函数从中央缓存获取内存块;
else
{
FetchFromCentralCache(index, alignSize);
}
}

3.Deallocate(void* ptr, size_t size)

线程内回收内存,传入内存块的指针: ptr 和内存块的大小: size

void ThreadCache::Deallocate(void* ptr, size_t size)
{
assert(ptr);
assert(size <= MAX_BYTES);

// 计算出映射的自由链表桶index,并将 ptr 插入到 _freeLists[index] 中
size_t index = SizeClass::Index(size);
_freeLists[index].Push(ptr);

// 当链表长度大于一次批量申请的内存时,就开始还一段list给CentralCache
if (_freeLists[index].Size() >= _freeLists[index].MaxSize())
{
ListTooLong(_freeLists[index], size);
}
}

4.ListTooLong(FreeList& list, size_t size)

处理线程内过长自由链表,还一部分给中心缓存的span

void ThreadCache::ListTooLong(FreeList& list, size_t size)
{
void* start = nullptr;
void* end = nullptr;
// MaxSize就是归还的list的长度,自由链表结点个数
list.PopRange(start, end, list.MaxSize());

CentralCache::GetInstance()->ReleaseListToSpans(start, size);
}

2.central cache框架构建及核心实现

central cache也是一个哈希表结构,其映射关系与thread cache是一样的,不同的是central cache的哈希桶位置所挂接的是SpanList链表结构,不过每个桶下的span对象被切分成了一块块小内存挂接在span对象的自由链表freeList中。

图片

图稍微有点不对,sapn链是带头双向循环链表,最后不该指向NULL,应该指向头。

申请与释放内存规则

申请内存

1.当thread cache中没有内存后,就会向central cache中申请大块内存。这里的申请过程采用的是类似网络tcp协议拥塞控制的慢开始算法,而central cache中哈希映射的spanlist下挂着的span则向thread cache提供大块内存,而从span中取出对象给thread cache是需要加锁的,这里为了保证效率,提供的是桶锁。

慢开始算法

线程缓存从中央缓存获取内存块的数量是按照“慢开始反馈调节算法”递增的:

1、最开始不会一次向central cache一次批量要太多,因为要太多了可能用不完

2、如果你不要这个size大小内存需求,那么batchNum就会不断增长,直到上限

3、size越大,一次向central cache要的batchNum就越小

4、size越小,一次向central cache要的batchNum就越大

// 预计获取的批次数量
size_t batchNum = min(_freeLists[index].MaxSize(), SizeClass::NumMoveSize(size));
if (_freeLists[index].MaxSize() == batchNum)
_freeLists[index].MaxSize() += 1;

举个例子,线程申请7块大小相同的内存,第一次申请的批次数量为1块,第二次再来申请时,此时thread cache的自由链表下已经没有空闲的内存了,则又需要继续向central cache申请内存,申请的批次数量为2块,第3次直接从thread cache的自由链表中获取内存块;第4次再申请时,则需要向central cache申请内存,此时申请的批次数量为3块,挂接在thread cache的自由链表下,直到第7次来申请内存时,向central cache申请的内存批次数量为4,这时线程取走一块内存,则挂接在thread cache的自由链表下还有3块空闲的内存。

2.当central cache中映射的spanlist下所挂接的所有span对象都没有内存后,则需要向page cache申请一块新的span对象,central cache拿到这块span对象后会按照所管理内存的大小将span对象划分成多块,再挂接到central cache的审判list中;然后再从这块新申请的span对象中去内存分配给thread cache。

3.在这里,为了方便后续的回收,span对象每分配出一块内存,其成员变量_useCount就++;相反thread cache每释放归还一块内存后,_useCount就–。

释放内存

当thread_cache过长或者线程销毁,则会将内存释放回central cache中的,释放回来时–_useCount。当_useCount变为0后,说明所有分配出去的内存都归还回来了,那么就将这个span对象归还给page cache,page cache会对归还回来的span进行前后页合并。

管理多个大块内存的跨度结构Span及SpanList定义

在上面我们知道central cache的哈希桶下挂接着的是跨度结构Span对象,其实Span对象是管理以页为单位的大块内存的结构体。而为了方便后续回收span对象到page cache,需要将任意位置span对象从spanlist中删除,那么将spanlist定义为一个双向链表更好一些。

由此,span及spanlist的定义如下:

// 管理多个连续页大块内存跨度结构
struct Span
{
PAGE_ID _pageId = 0; // 大块内存起始页的页号
size_t _n = 0; // 页的数量

Span* _next = nullptr; // 指向下一个内存块的指针
Span* _prev = nullptr; // 指向上一个内存块的指针

size_t _objSize = 0; // 切好的小对象大小
size_t _useCount = 0; // 已分配给ThreadCache的小块内存的数量
void* _freeList = nullptr; // 已分配给ThreadCache的小块内存的自由链表

bool _isUse = false; // 标记当前span内存跨度是否在被使用
};

// 带头双向循环链表
class SpanList
{
public:
// 构造函数,创建带头双向循环链表
SpanList()
{
_head = new Span;
_head->_next = _head;
_head->_prev = _head;
}

Span* Begin()
{
return _head->_next;
}

Span* End()
{
return _head;
}

bool Empty()
{
return _head->_next == _head;
}

// 头插
void PushFront(Span* span)
{
Insert(Begin(), span);
}

// 头删,并返回删除的结点指针
Span* PopFront()
{
Span* front = _head->_next;
Erase(front);
return front;
}

// 在链表的指定位置插入新的内存块
void Insert(Span* pos, Span* newSpan)
{
assert(pos);
assert(newSpan);

Span* prev = pos->_prev;
prev->_next = newSpan;
newSpan->_next = pos;
newSpan->_prev = prev;
pos->_prev = newSpan;
}

// 从链表中删除指定的内存块
void Erase(Span* pos)
{
assert(pos);
// 不能指向链表的头,这是带头双向循环链表,头结点的意义就如同“刷题”里的哑结点,是虚拟的,只是为了操作方便。
assert(pos != _head);

Span* prev = pos->_prev;
Span* next = pos->_next;

prev->_next = next;
next->_prev = prev;
}
private:
Span* _head;// 链表的头指针
public:
std::mutex _mtx;// 桶锁
};

central cache框架构建

在明确了span与spanlist的定义描述后,也不能设计出central cache的框架结构,central cache是一个哈希表结构,其映射的是spanlist与哈希桶位置(内存大小)的关系。

其次,在这里我们将central cache设计为饿汉式单例模式,类的唯一实例在程序启动时就已经被创建出来,并且在整个程序的生命周期内都只有这一个实例。饿汉式优点是线程安全,因为实例在程序启动时就已经被创建,在整个程序的生命周期内都只有这一个实例,不会存在多线程竞争的情况。

class CentralCache
{
public:
// 单例模式的定义,作用:获取唯一实例的静态方法
static CentralCache* GetInstance()
{
// &_sInst 返回 _sInst 对象的地址,因为 _sInst 是一个静态变量
// 所以它的地址是固定的,其他代码也可以通过该地址访问 _sInst 对象
return &_sInst;
}

// 获取一个非空的span
Span* GetOneSpan(SpanList& list, size_t byte_size);

// 从中心缓存获取一定数量的对象给ThreadCache线程缓存
size_t FetchRangeObj(void*& start, void*& end, size_t batchNum, size_t size);

// 将一定数量的对象释放到中心缓存的span跨度
void ReleaseListToSpans(void* start, size_t byte_size);
private:
SpanList _spanLists[NFREELIST];
private:
// 构造函数和一个拷贝构造函数私有化
CentralCache()
{}

CentralCache(const CentralCache&) = delete;

// 定义一个静态的变量 _sInst,该变量保存着 CentralCache 类的唯一实例
static CentralCache _sInst;
};

central cache核心实现

1.GetOneSpan(SpanList& list, size_t size)

从中心缓存获取一个空闲的Span对象,如果当前中心缓存的对应大小类别的桶中没有空闲的Span对象,则会从页缓存中获取一个新的Span对象并将其添加到中心缓存的桶中。

Span* CentralCache::GetOneSpan(SpanList& list, size_t size)
{
// 查看当前的spanlist中是否有还有未分配对象的span
Span* it = list.Begin();
while (it != list.End())
{
if (it->_freeList != nullptr)
{
return it;
}
else
{
it = it->_next;
}
}

// 先把central cache的桶锁解掉,这样如果其他线程释放内存对象回来,不会阻塞
list._mtx.unlock();

// 走到这里说没有空闲span了,只能找page cache要
PageCache::GetInstance()->_pageMtx.lock();
Span* span = PageCache::GetInstance()->NewSpan(SizeClass::NumMovePage(size));
span->_isUse = true;
span->_objSize = size;
PageCache::GetInstance()->_pageMtx.unlock();

// 对获取span进行切分,不需要加锁,因为这时候这个span是当前进程单例创建的,其他线程访问不到这个span

// 计算span的大块内存的起始地址和大块内存的大小(字节数)

char* start = (char*)(span->_pageId << PAGE_SHIFT);
size_t bytes = span->_n << PAGE_SHIFT;
char* end = start + bytes;

// 把大块内存切成自由链表链接起来
// 先切一块下来去做头,方便尾插
span->_freeList = start;
start += size;
void* tail = span->_freeList;
int i = 1;
while (start < end)
{
i++;
NextObj(tail) = start;
tail = NextObj(tail);
start += size;
}

NextObj(tail) = nullptr; // 记得置空

// 切好span以后,需要把span挂到中心缓存对应的哈希桶里面去的时候,再加锁
list._mtx.lock();
list.PushFront(span);

return span;
}

2.FetchRangeObj(void*& start, void*& end, size_t batchNum, size_t size)

从中心缓存获取一定数量的对象给thread cache

值得注意void *& start 和 void *& end 都是传址的形式传入的参数,也就是所谓的输入输出型参数

void *& start:输出参数,返回获取到的内存块的起始地址。

void *& end:输出参数,返回获取到的内存块的结束地址。

size_t batchNum:输入参数,指定从中心缓存获取的内存块的数量。

size_t size:输入参数,指定要获取的内存块的大小

size_t CentralCache::FetchRangeObj(void*& start, void*& end, size_t batchNum, size_t size)
{
// 中央缓存CentralCache哈希桶的映射规则和线程缓存ThreadCache哈希桶映射规则一样
size_t index = SizeClass::Index(size);
_spanLists[index]._mtx.lock();

Span* span = GetOneSpan(_spanLists[index], size);
assert(span);// 检查获取的span是否为空
assert(span->_freeList);// 检查获取的span的自由链表是否为空

// 从span中获取batchNum个对象
// 如果不够batchNum个,有多少拿多少
start = span->_freeList;
end = start;
size_t i = 0;
size_t actualNum = 1;
while (i < batchNum - 1 && NextObj(end) != nullptr)
{
end = NextObj(end);
i++;
actualNum++;
}
span->_freeList = NextObj(end);// span的[start, end]被取走了
NextObj(end) = nullptr;// 置空
span->_useCount += actualNum;

// 调试:条件断点
int j = 0;
void* cur = start;
while (cur)
{
cur = NextObj(cur);
++j;
}

if (j != actualNum)
{
int x = 0;
}


_spanLists[index]._mtx.unlock();

return actualNum;
}

3.ReleaseListToSpans(void* start, size_t size)

将一段线程缓存的自由链表还给中心缓存的span。

void CentralCache::ReleaseListToSpans(void* start, size_t size)
{
size_t index = SizeClass::Index(size);
_spanLists[index]._mtx.lock();
while (start)
{
void* next = NextObj(start);

// 把start开头的这一串自由链表内存还给他属于的span,一次循环还一个,一直还
Span* span = PageCache::GetInstance()->MapObjectToSpan(start);
NextObj(start) = span->_freeList;
span->_freeList = start;
span->_useCount--;

// 说明span的切分出去的所有小块内存都回来了,那就清理一下span,然后把完整的span交给page
// 这个span就可以再回收给page cache,pagecache可以再尝试去做前后页的合并
if (span->_useCount == 0)
{
_spanLists[index].Erase(span);
span->_freeList = nullptr;
span->_next = nullptr;
span->_prev = nullptr;

// 释放span给page cache时,span已经从_spanLists[index]删除了,不需要再加桶锁了
// 这时把桶锁解掉,使用page cache的锁就可以了,方便其他线程申请/释放内存
_spanLists[index]._mtx.unlock();

PageCache::GetInstance()->_pageMtx.lock();
PageCache::GetInstance()->ReleaseSpanToPageCache(span);
PageCache::GetInstance()->_pageMtx.unlock();

_spanLists[index]._mtx.lock();
}

start = next;
}

_spanLists[index]._mtx.unlock();
}

3.page cache框架构建及核心实现

page cache与前两级缓存略有不同,其映射关系不再是哈希桶位置与自由链表或spanlist的映射,而是页号与spanlist的映射,这里我们设计的是128页的page cache。

图片

申请与释放内存

申请内存

1.当central cache向page cache申请内存时,page cache先检查对应位置有没有span,如果没有则向更大页寻找一个span,如果找到则分裂成两个。比如:申请的是1页page,1页page后面没有挂span,则向后面寻找更大的span,假设在100页page位置找到一个span,则将100页page的span分裂为一个1页page span和一个99页page span。

2.如果找到_spanList[128]都没有合适的span,则向系统使用mmap、brk或者是VirtualAlloc等方式申请128页page span挂在自由链表中,再重复1中的过程。

3.需要注意的是central cache和page cache 的核心结构都是spanlist的哈希桶,但是他们是有本质区别的,central cache中哈希桶,是按跟thread cache一样的大小对齐关系映射的,他的spanlist中挂的span中的内存都被按映射关系切好链接成小块内存的自由链表。而page cache 中的spanlist则是按下标桶号映射的,也就是说第i号桶中挂的span都是i页内存。

释放内存

如果central cache释放回一个span,则依次寻找span的前后page id的没有在使用的空闲span,看是否可以合并,如果合并继续向前寻找。这样就可以将切小的内存合并收缩成大的span,减少内存碎片。

直接向堆申请或释放以页为单位的大块内存

这里我们为了避免使用malloc及free函数接口去向堆申请和释放内存,因此使用系统调用接口直接向堆申请和释放内存。

这里的系统调用接口在window下为VirtualAlloc与VirtualFree系统调用接口;在Linux系统下为mmap与munmap,brk与sbrk两对系统调用接口。

inline static void* SystemAlloc(size_t kPage)
{
#ifdef _WIN32
void* ptr = VirtualAlloc(0, kPage << PAGE_SHIFT, MEM_COMMIT | MEM_RESERVE, PAGE_READWRITE);
#else
//Linux下brk mmap等
#endif // _WIN32

//抛异常
if (ptr == nullptr)
throw std::bad_alloc();

return ptr;
}

Span跨度结构以页为单位管理从堆申请的内存

我们向堆申请内存后会返回这块内存的起始地址,那么我们将这个地址看作一个无符号整型,将其除以8*1024作为Span结构的_pageId,再将申请内存时用的页号赋给 _n,这里为了方便后续回收分配出去的Span跨度结构,我们使用STL的unordered_map来构建 _pageId与Span对象的映射关系。

page cache框架构建

与central cache类似的是,page cache也是单例模式;不过page cache加的不是桶锁,而是整级加的一把大锁,即每次central cache向page cache申请内存时,page cache都要加锁防止出现安全问题。

class PageCache
{
public:
static PageCache* GetInstance()
{
return &_sInst;
}

// 获取从对象到span的映射
Span* MapObjectToSpan(void* obj);

// 释放空闲span回到Pagecache,并合并相邻的span
void ReleaseSpanToPageCache(Span* span);

// 获取一个k页的span
Span* NewSpan(size_t k);

std::mutex _pageMtx;
private:
SpanList _spanLists[NPAGES];// PageCache自己的双链表哈希桶,映射方式是按照页数直接映射
ObjectPool _spanPool;

// std::unordered_map _idSpanMap;
TCMalloc_PageMap1<32 - PAGE_SHIFT> _idSpanMap;

PageCache()
{}
PageCache(const PageCache&) = delete;

static PageCache _sInst;
};,>

page cache核心实现

1.NewSpan(size_t k)

获取一个K页的span

首先会检查第k个桶里面是否有span,如果有就直接返回;如果没有,则检查后面的桶里面是否有更大的span,如果有就可以将它进行切分,切出一个k页的span返回,剩下的页数的span放到对应的桶里;如果后面的桶里也没有span,就去系统堆申请一个大小为128页的span,并把它放到对应的桶里。然后再递归调用自己,直到获取到一个k页的span为止。

Span* PageCache::NewSpan(size_t k)
{
assert(k > 0);

// 大于128 page的直接向堆申请,这里的128页相当于128*8*1024 = 1M
if (k > NPAGES - 1)
{
void* ptr = SystemAlloc(k);
//Span* span = new Span;
Span* span = _spanPool.New();

span->_pageId = (PAGE_ID)ptr >> PAGE_SHIFT;// 页号:地址右移PAGE_SHIFT获得
span->_n = k; // 页数

// _idSpanMap[span->_pageId] = span;
_idSpanMap.set(span->_pageId, span);

return span;
}

// 先检查第k个桶里面有没有span
if (!_spanLists[k].Empty())
{
Span* kSpan = _spanLists[k].PopFront();

// 建立id和span的映射,方便central cache回收小块内存时,查找对应的span
for (PAGE_ID i = 0; i < kSpan->_n; i++)
{
// _idSpanMap[kSpan->_pageId + i] = kSpan;
_idSpanMap.set(kSpan->_pageId + i, kSpan);
}

return kSpan;
}

// 检查一下后面的桶里面有没有span,如果有可以把他它进行切分
for (size_t i = k + 1; i < NPAGES; i++)
{
if (!_spanLists[i].Empty())
{
Span* nSpan = _spanLists[i].PopFront();
// Span* kSpan = new Span;
Span* kSpan = _spanPool.New();

// 在nSpan的头部切一个k页下来
// k页span返回
// nSpan再挂到对应映射的位置
kSpan->_pageId = nSpan->_pageId;// 标记起始页
kSpan->_n = k;// 标记页数

nSpan->_pageId += k;
nSpan->_n -= k;

_spanLists[nSpan->_n].PushFront(nSpan); // 被切分掉的另一块放入对应哈希桶

// 存储nSpan的首尾页号跟nSpan映射,方便page cache回收内存时进行的合并查找
// 因为没被中心缓存拿走,所以只标记了首尾就够了
// _idSpanMap[nSpan->_pageId] = nSpan;
// _idSpanMap[nSpan->_pageId + nSpan->_n - 1] = nSpan;
_idSpanMap.set(nSpan->_pageId, nSpan);
_idSpanMap.set(nSpan->_pageId + nSpan->_n - 1, nSpan);

// 建立id和span的映射,方便central cache回收小块内存时,查找对应的span
for (PAGE_ID i = 0; i < kSpan->_n; i++)
{
// _idSpanMap[kSpan->_pageId + i] = kSpan;
_idSpanMap.set(kSpan->_pageId + i, kSpan);
}

return kSpan;
}
}

// 走到这个位置就说明后面没有大页的span了
// 这时就去找堆要一个128页的span
Span* bigSpan = new Span;
void* ptr = SystemAlloc(NPAGES - 1);

// 通过将 ptr 地址强制转换为 PAGE_ID 类型,再使用二进制位运算符 >> 将指针的地址右移 PAGE_SHIFT 位
// 最终得到的结果就是这个指针所在的“页的编号”
bigSpan->_pageId = (PAGE_ID)ptr >> PAGE_SHIFT;
bigSpan->_n = NPAGES - 1;

_spanLists[bigSpan->_n].PushFront(bigSpan);

return NewSpan(k);// 递归调用自己,这一次一定能成功!
}

2.MapObjectToSpan(void* obj)

建立内存地址和span的映射。前期映射方式是哈希或者红黑树,后期性能优化成基数树。

Span* PageCache::MapObjectToSpan(void* obj)
{
PAGE_ID id = (PAGE_ID)obj >> PAGE_SHIFT;

/* std::unique_lock lock(_pageMtx);// 可以自动解锁
auto ret = _idSpanMap.find(id);
if (ret != _idSpanMap.end())
{
return ret->second;
}
else
{
assert(false);
return nullptr;
} */

// 基数树优化后不需要加锁了
auto ret = (Span*)_idSpanMap.get(id);
assert(ret != nullptr);
return ret;
}

3.ReleaseSpanToPageCache(Span* span)

缓解外碎片问题,对span前后的页,尝试进行合并,缓解内存碎片问题

void PageCache::ReleaseSpanToPageCache(Span* span)
{
// 大于128 page的直接还给堆,这里的128页相当于128*8*1024 = 1M
if (span->_n > NPAGES - 1)
{
void* ptr = (void*)(span->_pageId << PAGE_SHIFT);
//delete span;
SystemFree(ptr); // span结构释放,内存还给堆,类似free
_spanPool.Delete(span);// 放入定长内存池的自由链表,以便下次申请

return;
}

// 向前合并
while (1)
{
PAGE_ID prevId = span->_pageId - 1;
/*auto ret = _idSpanMap.find(prevId);
if (ret == _idSpanMap.end())
{
break;
}*/

auto ret = (Span*)_idSpanMap.get(prevId);
if (ret == nullptr)
{
break;
}

// 前面相邻页的span在使用,不合并了
// Span* prevSpan = ret->second;
Span* prevSpan = ret;
if (prevSpan->_isUse == true)
{
break;
}

// 合并出超过128页的span没办法管理,不合并了
if (prevSpan->_n + span->_n > NPAGES - 1)
{
break;
}

span->_pageId = prevSpan->_pageId;
span->_n += prevSpan->_n;

_spanLists[prevSpan->_n].Erase(prevSpan);// 将prevSpan从页缓存对应的哈希桶的链表中删掉
// delete prevSpan;// 为什么delete?
_spanPool.Delete(prevSpan);
}

// 向后合并
while (1)
{
PAGE_ID nextId = span->_pageId + span->_n;
/*auto ret = _idSpanMap.find(nextId);
if (ret == _idSpanMap.end())
{
break;
}*/

auto ret = (Span*)_idSpanMap.get(nextId);
if (ret == nullptr)
{
break;
}

// Span* nextSpan = ret->second;
Span* nextSpan = ret;
if (nextSpan->_isUse == true)
{
break;
}

if (nextSpan->_n + span->_n > NPAGES - 1)
{
break;
}

span->_n += nextSpan->_n;

_spanLists[nextSpan->_n].Erase(nextSpan);
// delete nextSpan;
_spanPool.Delete(nextSpan);
}

_spanLists[span->_n].PushFront(span);// 将合并完的span挂到页缓存的对应的哈希桶里面。
span->_isUse = false;

//_idSpanMap[span->_pageId] = span;// 首尾存起来,方便被合并
//_idSpanMap[span->_pageId + span->_n - 1] = span;

_idSpanMap.set(span->_pageId, span);
_idSpanMap.set(span->_pageId + span->_n - 1, span);
}

上述代码delete的作用:这里的delete操作是用来释放prevSpan和nextSpan这两个Span结构体的内存的。这两个Span结构体可能是之前由PageCache单例创建的,也可能是之前从中心缓存移动过来的。无论是哪一种情况,它们都不再被使用了,因为已经被合并到了当前的span中。所以可以直接释放掉它们的内存。

这里的delete操作并不会影响prevSpan和nextSpan管理的内存。这些内存依然存在,只是没有了管理它们的Span结构体。在进行合并的时候,这些内存就被合并到了当前的span中,当前的span继续管理这些内存。因此,这里的delete操作仅仅是释放了prevSpan和nextSpan这两个Span结构体的内存,这个span管理的内存并不受影响。

delete释放掉span结构体本身,不会同时释放掉它管理的内存。举个例子,假如你有一个对象A,它管理了一个数组arr,那么你调用delete A时,只会释放掉A对象本身占用的内存,而arr数组的内存依然存在。

细节与性能优化

使用定长内存池配合脱离使用new

我们定义一个Span结构体时是new一个对象,但new的本质是malloc,而本项目是不依赖malloc的,因此我们可以运用我们自己实现的定长内存池来脱离new的使用。

对于Page Cache,由于要多次定义Span结构,因此我们定义一个特化Span对象的定长内存池:

//定义定长的span内存池以脱离使用new
ObjectPool _spanPool;

而对于Thread Cache,由于要保证对于线程而言,全局只有唯一一个Thread Cache对象,故在头文件内定义为静态变量的定长内存池:

//静态成员,保证全局只有一个对象
static ObjectPool tcPool;
//pTLSThreadCache = new ThreadCache;
pTLSThreadCache = tcPool.New();

解决内存大于256kb的申请释放问题

图片

1.ConcurrentAlloc() 时,对于线程申请大于256kb内存的情况, 直接向页缓存申请即可:

if (size > MAX_BYTES) // 大于256kb的超大内存
{
size_t alignSize = SizeClass::RoundUp(size);// size对齐
size_t kPage = alignSize >> PAGE_SHIFT;// 获取页数

PageCache::GetInstance()->_pageMtx.lock();
Span* span = PageCache::GetInstance()->NewSpan(kPage);// 找页缓存要kpage页span
span->_objSize = size;// 会有一点内碎片,标记成alignSize也行
PageCache::GetInstance()->_pageMtx.unlock();

void* ptr = (void*)(span->_pageId << PAGE_SHIFT);// 获取对应地址
return ptr;
}

2.当然了页缓存的NewSpan()正常分配内存的能力也有上限,大于128 page的选择直接向堆申请,这里的128页相当于128 * 8KB = 1M。

if (k > NPAGES - 1)
{
void* ptr = SystemAlloc(k);
//Span* span = new Span;
Span* span = _spanPool.New();

span->_pageId = (PAGE_ID)ptr >> PAGE_SHIFT;// 页号:地址右移PAGE_SHIFT获得
span->_n = k; // 页数

// _idSpanMap[span->_pageId] = span;
_idSpanMap.set(span->_pageId, span);

return span;
}

3.同样的,ConcurrentFree()时,大于256kb的内存的释放就直接释放给页缓存即可:

if (size > MAX_BYTE)
{
//找到ptr对应的那块span
PageCache::GetInstance()->_pageMtx.lock();
PageCache::GetInstance()->RealeaseSpanToPageCache(span);
PageCache::GetInstance()->_pageMtx.unlock();
}

4.ReleaseSpanToPageCache(Span* span)合并页时,若释放的span大于128页,即span的页数大于NPAGES - 1,则直接将内存释放到堆。

if (span->_n > NPAGES - 1)
{
void* ptr = (void*)(span->_pageId << PAGE_SHIFT);
SystemFree(ptr);
//delete span;
_spanPool.Delete(span);
return;
}

使用基数树进行性能优化

如果我们在Page Cache中使用STL的unordered_map容器来构建_pageId与span的映射关系,那么通过测试发现,当前项目的运行效率是要满于malloc的。

图片

接下来分析下项目的性能瓶颈:

图片

图片

图片

图片

图片

分析得到项目在unordered_map _idSpanMap;中的锁竞争上浪费了大量性能,这主要是unordered_map是线程不安全的,因此多线程下使用时需要加锁,而大量加锁导致资源的消耗。

因此,这里参考tcmalloc,使用基数树来进行性能的优化。tcmalloc设计了三种基数树,即一层、二层与三层的基数树,其中一层和二层的基数树是适配32位系统下的,而三层基数树则是适配64位系统。

这里简单介绍以下一层和二层基数树,三层基数树类似于二层:

32位系统下,一个页大小213,进程地址空间大小232,所以一共有219个页,所以一层基数树需要开辟219个元素的数组,每个位置存一个指针,需要的内存是4*2^19 = 2M。

图片

32位系统下,两层基数树的结构是第一层一个25个元素,第二层每个结点又有214个元素,这样也就构成了2^19个的数量。这样的话拿到一个页号,(这个页号二进制下有32位,忽略高13位)这个页号高13位之后的高5位决定了他在第一层的哪个位置,这个页号高13位之后的高6位~高19位决定了他在第二层的哪个位置。

多层相较于1层还有个好处,多层不需要一次性开辟所有空间,可以到具体需要时再开辟空间。

图片

基数树相较于哈希桶的优势在于如果要写入_pageId和span的映射关系的话,并不会像哈希桶可能有结构上的改变(红黑树翻转、哈希桶扩容等)(一个线程在读的时候,另一个线程在写),而是一旦基数树构建好映射关系后,就不会改变其结构,之后只会有读的操作,因此多线程环境下无需加锁,从而减少了资源的消耗,优化了性能。

图片

只有NewSpan和ReleaseSpanToPageCache的时候,会去建立id和 span的映射,进行所谓的“写”操作,但是这俩都加了锁,绝对安全。事实上,即便不加锁也没事,因为我们不可能在同一个位置进行写,不可能同时创建一个span和释放一个span。且基数树写之前已经开好空间了,“写”的过程不会改变基数树的结构。

采用基数树不需要加锁的原因:

  • 因为往基数树建立映射的时候span没有在central cache不会给外层使用,并且建立好一次映射关系,后续不需要再建立了,后续都是读了。读写分离了。
//单层基数树
template
class TCMalloc_PageMap1
{
private:
static const int LENGTH = 1 << BITS;// 32 - 13 = 19
void** _array;

public:
typedef uintptr_t Number;//存储指针的一个无符号整型类型
explicit TCMalloc_PageMap1()//一次将数组所需空间开好
{
//计算数组开辟空间所需的大小
size_t size = sizeof(void*) << BITS;
size_t alignSize = SizeClass::_RoundUp(size, 1 << PAGE_SHIFT);
//由于要开辟的空间是2M,已经很大了,故直接想堆申请
_array = (void**)SystemAlloc(alignSize >> PAGE_SHIFT);
memset(_array, 0, size);
}
void Set(Number key, void* v)//key的范围是[0, 2^BITS - 1],_pageId
{
_array[key] = v;
}
void* Get(Number key) const
{
if ((key >> BITS) > 0)//若key超出范围或还未被设置,则返回空
{
return nullptr;
}
return _array[key];
}
};

// Two-level radix tree
template
class TCMalloc_PageMap2 {
private:
// Put 32 entries in the root and (2^BITS)/32 entries in each leaf.
static const int ROOT_BITS = 5;
static const int ROOT_LENGTH = 1 << ROOT_BITS;

static const int LEAF_BITS = BITS - ROOT_BITS;// 19 - 5 = 14
static const int LEAF_LENGTH = 1 << LEAF_BITS;// 1左移14位

// Leaf node
struct Leaf {
void* values[LEAF_LENGTH];
};

Leaf* root_[ROOT_LENGTH]; // Pointers to 32 child nodes
void* (*allocator_)(size_t); // Memory allocator

public:
typedef uintptr_t Number;

//explicit TCMalloc_PageMap2(void* (*allocator)(size_t)) {
explicit TCMalloc_PageMap2() {
//allocator_ = allocator;
memset(root_, 0, sizeof(root_));

PreallocateMoreMemory();
}

void* get(Number k) const {
const Number i1 = k >> LEAF_BITS;
const Number i2 = k & (LEAF_LENGTH - 1);// 获取k低14位
if ((k >> BITS) > 0 || root_[i1] == NULL) {
return NULL;
}
return root_[i1]->values[i2];
}

void set(Number k, void* v) {
const Number i1 = k >> LEAF_BITS;
const Number i2 = k & (LEAF_LENGTH - 1);
ASSERT(i1 < ROOT_LENGTH);
root_[i1]->values[i2] = v;
}

// 确保从start页开始,往后n页的基数树位置都给你开好
bool Ensure(Number start, size_t n) {
for (Number key = start; key <= start + n - 1;) {
const Number i1 = key >> LEAF_BITS;

// Check for overflow
if (i1 >= ROOT_LENGTH)
return false;

// Make 2nd level node if necessary
if (root_[i1] == NULL) {
//Leaf* leaf = reinterpret_cast((*allocator_)(sizeof(Leaf)));
//if (leaf == NULL) return false;
static ObjectPool leafPool;
Leaf* leaf = (Leaf*)leafPool.New();

memset(leaf, 0, sizeof(*leaf));
root_[i1] = leaf;
}

// Advance key past whatever is covered by this leaf node
key = ((key >> LEAF_BITS) + 1) << LEAF_BITS;
}
return true;
}

void PreallocateMoreMemory() {
// Allocate enough to keep track of all possible pages
Ensure(0, 1 << BITS);
}
};

// Three-level radix tree
template
class TCMalloc_PageMap3 {
private:
// How many bits should we consume at each interior level
static const int INTERIOR_BITS = (BITS + 2) / 3; // Round-up
static const int INTERIOR_LENGTH = 1 << INTERIOR_BITS;

// How many bits should we consume at leaf level
static const int LEAF_BITS = BITS - 2 * INTERIOR_BITS;
static const int LEAF_LENGTH = 1 << LEAF_BITS;

// Interior node
struct Node {
Node* ptrs[INTERIOR_LENGTH];
};

// Leaf node
struct Leaf {
void* values[LEAF_LENGTH];
};

Node* root_; // Root of radix tree
void* (*allocator_)(size_t); // Memory allocator

Node* NewNode() {
Node* result = reinterpret_cast((*allocator_)(sizeof(Node)));
if (result != NULL) {
memset(result, 0, sizeof(*result));
}
return result;
}

public:
typedef uintptr_t Number;

explicit TCMalloc_PageMap3(void* (*allocator)(size_t)) {
allocator_ = allocator;
root_ = NewNode();
}

void* get(Number k) const {
const Number i1 = k >> (LEAF_BITS + INTERIOR_BITS);
const Number i2 = (k >> LEAF_BITS) & (INTERIOR_LENGTH - 1);
const Number i3 = k & (LEAF_LENGTH - 1);
if ((k >> BITS) > 0 ||
root_->ptrs[i1] == NULL || root_->ptrs[i1]->ptrs[i2] == NULL) {
return NULL;
}
return reinterpret_cast(root_->ptrs[i1]->ptrs[i2])->values[i3];
}

void set(Number k, void* v) {
ASSERT(k >> BITS == 0);
const Number i1 = k >> (LEAF_BITS + INTERIOR_BITS);
const Number i2 = (k >> LEAF_BITS) & (INTERIOR_LENGTH - 1);
const Number i3 = k & (LEAF_LENGTH - 1);
reinterpret_cast(root_->ptrs[i1]->ptrs[i2])->values[i3] = v;
}

bool Ensure(Number start, size_t n) {
for (Number key = start; key <= start + n - 1;) {
const Number i1 = key >> (LEAF_BITS + INTERIOR_BITS);
const Number i2 = (key >> LEAF_BITS) & (INTERIOR_LENGTH - 1);

// Check for overflow
if (i1 >= INTERIOR_LENGTH || i2 >= INTERIOR_LENGTH)
return false;

// Make 2nd level node if necessary
if (root_->ptrs[i1] == NULL) {
Node* n = NewNode();
if (n == NULL) return false;
root_->ptrs[i1] = n;
}

// Make leaf node if necessary
if (root_->ptrs[i1]->ptrs[i2] == NULL) {
Leaf* leaf = reinterpret_cast((*allocator_)(sizeof(Leaf)));
if (leaf == NULL) return false;
memset(leaf, 0, sizeof(*leaf));
root_->ptrs[i1]->ptrs[i2] = reinterpret_cast(leaf);
}

// Advance key past whatever is covered by this leaf node
key = ((key >> LEAF_BITS) + 1) << LEAF_BITS;
}
return true;
}

void PreallocateMoreMemory() {
}
};*>*>*>*>*>*>

项目总结

结果演示

可以看到通过基数树优化后的高并发内存池在性能上是要优于malloc函数的。

图片

项目对比malloc性能高的原因

malloc底层是采用边界标记法将内存划分成很多块,从而对内存的分配与回收进行管理。简单来说,malloc分配内存时会先获取分配区的锁,然后根据申请内存的大小一级一级的去获取内存空间,最后返回。

所以在高并发的场景下,malloc在申请内存时需要加锁,以避免多个线程同时修改内存分配信息,这会导致性能下降。而内存池可以通过维护自由链表来分配内存,避免了加锁的开销。

总结出本项目效率相对较高的3点原因:

  • 1.第一级thread cache通过tls技术实现了无锁访问。
  • 2.第二级central cache加的是桶锁,可以更好的实现多线程的并行。
  • 3.第三级page cache通过基数树优化,有效减少了锁的竞争。

项目扩展及缺陷

1.实际上在释放内存时由thread cache将自由链表对象归还给central cache只使用了链表过长这一个条件,但是实际中这个条件大概率不能恰好达成,那么就会出现thread cache中自由链表挂着许多未被使用的内存块,从而出现了线程结束时可能导致内存泄露的问题。

解决方法就是使用动态tls或者通过回调函数来回收这部分的内存,并且通过申请批次统计内存占有量,保证线程不会过多占有资源。

2.可以将这个项目打成静态库或动态库替换调用系统调用malloc,不同平台替换方式不同。 基于unix的系统上的glibc,使用了weak alias的方式替换。具体来说是因为这些入口函数都被定义成了weak symbols,再加上gcc支持 alias attribute,所以替换就变成了这种通用形式:

void* malloc(size_t size) THROW attribute__ ((alias (tc_malloc)))

因此所有malloc的调用都跳转到了tc_malloc的实现。有些平台不支持这样的东西,需要使用hook的钩子技术来做。

声明:本文内容及配图由入驻作者撰写或者入驻合作网站授权转载。文章观点仅代表作者本人,不代表电子发烧友网立场。文章及其配图仅供工程师学习之用,如有内容侵权或者其他违规问题,请联系本站处理。 举报投诉
  • 内存
    +关注

    关注

    8

    文章

    2767

    浏览量

    72774
  • 操作系统
    +关注

    关注

    37

    文章

    6288

    浏览量

    121896
  • 多线程
    +关注

    关注

    0

    文章

    271

    浏览量

    19726
收藏 人收藏

    评论

    相关推荐

    在DragonBoard 410c上实现并发处理TCP服务器

    ) testServer.serve_forever()到这里我们就完成了整个测试服务器的搭建,该服务器能够借助于gevent实现并发的处理,并且支持异常处理,可以在dragonbaord 410c上稳定运行,这里testBu
    发表于 09-25 15:53

    从服务端视角看并发难题

    能同时开启的进程数、能同时运行的线程数、网络连接数、cpu、I/O、内存等等,由于服务端资源是有限的,那么服务端能同时处理的请求也是有限的。并发问题的本质就是:资源的有限性
    发表于 11-02 15:11

    请问战舰LWIP移植是怎么实现内存管理的?

    如题,最近在移植LWIP,参考原子战舰V3,由于我的系统没实现内存管理,因此,涉及到malloc的函数我全部使用全局数据区来开辟空间(暂时先这么粗略地实现),但对内存
    发表于 09-02 04:36

    【每日一练】第十六节:内存的使用

    本视频为【每日一练】的第16节学习视频,注:刚开始学习的童鞋请从第一节视频开始打卡哦(本节视频在下面打卡即可)学习任务:1、删除内存时,会首先唤醒等待在该内存对象上的所有线程。(判
    发表于 09-08 09:33

    内存可以调节内存的大小吗

    嵌入式–内存直接上代码,自己体会。嵌入式设备,一般keil提供的堆很小,一般都不使用。使用内存,自己可以调节内存大小。头文件 mallo
    发表于 12-17 07:00

    内存的概念和实现原理概述

    { //一:内存的概念和实现原理概述//malloc:内存浪费,频繁分配小块内存,则浪费更加显得明显//“
    发表于 12-17 06:44

    如何去实现一种基于SpringMVC的电商并发秒杀系统设计

    参考博客Java并发秒杀系统API目录业务场景要解决的问题Redis的使用业务场景首页倒计时秒杀活动,抢购商品要解决的问题并发下库存的控制分布式系统事务处理机制(分布式锁)系统设计
    发表于 01-03 07:50

    线程是如何实现

    线程的概念是什么?线程是如何实现的?
    发表于 02-28 06:20

    关于RT-Thread内存管理的内存简析

    这篇文章继续介绍 RT-Thread 内存管理剩下的部分——内存。为何引入内存内存堆虽然方
    发表于 04-06 17:02

    RT-Thread操作系统中静态内存的创建与使用

    程序运行,创建一个内存,一个申请内存任务,一个释放内存任务,u***串口CN3打印内存分配和释放的信息,串口波特率115200//创建
    发表于 05-10 14:51

    ATC'22顶会论文RunD:高密并发的轻量级 Serverless 安全容器运行时 | 龙蜥技术

    时可能导致 kernel panic 的问题。使用模版技术不仅可以降低单个实力的内存占用实现高密部署,还非常适合应用于安全容器的并发场景。因为它可以在某个特定时刻对一个已经启动的安
    发表于 09-05 15:18

    RT-Thread内存管理之内存实现分析

    了解RT-thread 的内存实现及管理。以RTT最新稳定版本4.1.0的内核为蓝本。\\include\\rtdef.h/**Base structure of Memory pool
    发表于 10-17 15:06

    删除静态内存是用rt_mp_detach还是rt_mp_delete

    可否动态申请一块内存作为静态内存,然后再在这块静态内存进行相关的静态内存操作?删除静态
    发表于 11-22 14:42

    移动应用高级语言开发——并发探索

    、精准内存屏障等手段可以实现性能优秀的多线程程序,但也存在一定的问题:线程和锁方案的优化依赖软件工程有良好的并发实践规范和资深并发程序开发者,且容易引发死锁,测试及维护的成本也很高。
    发表于 08-28 17:08

    Go并发模型的实现原理

    Go语言是为并发而生的语言,Go语言是为数不多的在语言层面实现并发的语言;也正是Go语言的并发特性,吸引了全球无数的开发者。
    的头像 发表于 04-15 08:49 1059次阅读