Skip to content

修复gcc4.8.5编译错误 #287

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 36 additions & 6 deletions libgo/pool/async_coroutine_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,10 @@ AsyncCoroutinePool * AsyncCoroutinePool::Create(size_t maxCallbackPoints)
{
return new AsyncCoroutinePool(maxCallbackPoints);
}
void AsyncCoroutinePool::InitCoroutinePool(size_t maxCoroutineCount)
void AsyncCoroutinePool::InitCoroutinePool(size_t maxCoroutineCount, size_t stackSize)
{
maxCoroutineCount_ = maxCoroutineCount;
stackSize_ = stackSize;
}
void AsyncCoroutinePool::Start(int minThreadNumber, int maxThreadNumber)
{
Expand All @@ -21,40 +22,64 @@ void AsyncCoroutinePool::Start(int minThreadNumber, int maxThreadNumber)
maxCoroutineCount_ = (std::max)(minThreadNumber * 128, maxThreadNumber);
maxCoroutineCount_ = (std::min<size_t>)(maxCoroutineCount_, 10240);
}
for (size_t i = 0; i < maxCoroutineCount_; ++i) {
go co_scheduler(scheduler_) [this]{
this->Go();
};

if(stackSize_ > 0) {
for (size_t i = 0; i < maxCoroutineCount_; ++i) {
go_stack(stackSize_) co_scheduler(scheduler_) [this]{
this->Go();
};
}
} else {
for (size_t i = 0; i < maxCoroutineCount_; ++i) {
go co_scheduler(scheduler_) [this]{
this->Go();
};
}
}

}
void AsyncCoroutinePool::Go()
{
for (;;) {
PoolTask task;
tasks_ >> task;

taskRunningPoints++;

if (task.func_)
task.func_();

if (!task.cb_)
if (!task.cb_) {
taskRunningPoints--;
continue;
}

size_t pointsCount = pointsCount_;
if (!pointsCount) {
task.cb_();
taskRunningPoints--;
continue;
}

size_t idx = ++robin_ % pointsCount;
points_[idx]->Post(std::move(task.cb_));
points_[idx]->Notify();

taskRunningPoints--;
}
}
void AsyncCoroutinePool::Post(Func const& func, Func const& callback)
{
PoolTask task{func, callback};
tasks_ << std::move(task);
}

void AsyncCoroutinePool::Post(Func const& func)
{
PoolTask task{func, NULL};
tasks_ << std::move(task);
}

bool AsyncCoroutinePool::AddCallbackPoint(AsyncCoroutinePool::CallbackPoint * point)
{
size_t writeIdx = writePointsCount_++;
Expand All @@ -76,6 +101,11 @@ AsyncCoroutinePool::AsyncCoroutinePool(size_t maxCallbackPoints)
points_ = new CallbackPoint*[maxCallbackPoints_];
}

void AsyncCoroutinePool::WaitStop()
{
while (!tasks_.empty() || taskRunningPoints.load() != 0);
}

size_t AsyncCoroutinePool::CallbackPoint::Run(size_t maxTrigger)
{
size_t i = 0;
Expand Down
10 changes: 8 additions & 2 deletions libgo/pool/async_coroutine_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,17 @@ class AsyncCoroutinePool
typedef std::function<void()> Func;

// 初始化协程数量
void InitCoroutinePool(size_t maxCoroutineCount);
void InitCoroutinePool(size_t maxCoroutineCount, size_t stackSize = 0);

// 启动协程池
void Start(int minThreadNumber, int maxThreadNumber = 0);
void Start(int minThreadNumber, int maxThreadNumber);

void Post(Func const& func, Func const& callback);

void Post(Func const& func);

void WaitStop();

template <typename R>
void Post(Channel<R> const& ret, std::function<R()> const& func) {
Post([=]{ ret << func(); }, NULL);
Expand Down Expand Up @@ -75,11 +79,13 @@ class AsyncCoroutinePool

private:
size_t maxCoroutineCount_;
size_t stackSize_;
std::atomic<int> coroutineCount_{0};
Scheduler* scheduler_;
Channel<PoolTask> tasks_;
std::atomic<size_t> pointsCount_{0};
std::atomic<size_t> writePointsCount_{0};
std::atomic<size_t> taskRunningPoints{0};
size_t maxCallbackPoints_;
std::atomic<size_t> robin_{0};
CallbackPoint ** points_;
Expand Down
4 changes: 2 additions & 2 deletions libgo/routine_sync/rutex.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ struct IntValue<IntValueType, true>
{
public:
inline std::atomic<IntValueType>* value() { return ptr_; }
inline void ref(std::atomic<IntValueType>* ptr) { ptr_ = ptr; }
inline void ref(std::atomic<IntValueType>* ptr) { ptr_ = {ptr}; }

protected:
std::atomic<IntValueType>* ptr_ {nullptr};
Expand All @@ -32,7 +32,7 @@ struct IntValue<IntValueType, false>
inline std::atomic<IntValueType>* value() { return &value_; }

protected:
std::atomic<IntValueType> value_ {0};
std::atomic<IntValueType> value_ = {0};
};

struct RutexBase
Expand Down