Browse Source

Add `SchedulerAPI`

pull/8/head
Anton Tarasenko 2 years ago
parent
commit
0b33d18927
  1. 22
      config/AcediaSystem.ini
  2. 44
      sources/BaseRealm/API/Scheduler/API/MockJob.uc
  3. 216
      sources/BaseRealm/API/Scheduler/API/TEST_SchedulerAPI.uc
  4. 421
      sources/BaseRealm/API/Scheduler/SchedulerAPI.uc
  5. 29
      sources/BaseRealm/API/Scheduler/SchedulerDiskRequest.uc
  6. 47
      sources/BaseRealm/API/Scheduler/SchedulerJob.uc
  7. 3
      sources/BaseRealm/Global.uc
  8. 1
      sources/ClientRealm/ClientLevelCore.uc
  9. 15
      sources/CoreRealm/CoreGlobal.uc
  10. 10
      sources/Data/Database/DBAPI.uc
  11. 49
      sources/Data/Database/Local/LocalDatabaseInstance.uc
  12. 13
      sources/Manifest.uc
  13. 1
      sources/ServerRealm/ServerLevelCore.uc

22
config/AcediaSystem.ini

@ -63,6 +63,28 @@ allowReplacingDamageTypes=true
; compatibility reasons), you should set this value at `BHIJ_Root`. ; compatibility reasons), you should set this value at `BHIJ_Root`.
broadcastHandlerInjectionLevel=BHIJ_Root broadcastHandlerInjectionLevel=BHIJ_Root
[AcediaCore.SchedulerAPI]
; How often can files be saved on disk. This is a relatively expensive
; operation and we don't want to write a lot of different files at once.
; But since we lack a way to exactly measure how much time that saving will
; take, AcediaCore falls back to simply performing every saving with same
; uniform time intervals in-between.
; This variable decides how much time there should be between two file
; writing accesses.
; Negative and zero values mean that all writing disk access will be
; granted as soon as possible, without any cooldowns.
diskSaveCooldown=0.25
; Maximum total work units for jobs allowed per tick. Jobs are expected to be
; constructed such that they don't lead to a crash if they have to perform
; this much work.
;
; Changing default value of `10000` is not advised.
maxWorkUnits=10000
; How many different jobs can be performed per tick. This limit is added so
; that `maxWorkUnits` won't be spread too thin if a lot of jobs get registered
; at once.
maxJobsPerTick=5
[AcediaCore.UserAPI] [AcediaCore.UserAPI]
userDataDBLink="local:database/users" userDataDBLink="local:database/users"

44
sources/BaseRealm/API/Scheduler/API/MockJob.uc

@ -0,0 +1,44 @@
/**
* Simple object that represents a job, capable of being scheduled on the
* `SchedulerAPI`. Use `IsCompleted()` to mark job as completed.
* Copyright 2022 Anton Tarasenko
*------------------------------------------------------------------------------
* This file is part of Acedia.
*
* Acedia is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, version 3 of the License, or
* (at your option) any later version.
*
* Acedia is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Acedia. If not, see <https://www.gnu.org/licenses/>.
*/
class MockJob extends SchedulerJob;
var public string mark;
var public int unitsLeft;
// We use `default` value only
var public string callStack;
public function bool IsCompleted()
{
return (unitsLeft <= 0);
}
public function DoWork(int allottedWorkUnits)
{
unitsLeft -= allottedWorkUnits;
if (IsCompleted()) {
default.callStack = default.callStack $ mark;
}
}
defaultproperties
{
}

216
sources/BaseRealm/API/Scheduler/API/TEST_SchedulerAPI.uc

@ -0,0 +1,216 @@
/**
* Set of tests for Scheduler API.
* Copyright 2022 Anton Tarasenko
*------------------------------------------------------------------------------
* This file is part of Acedia.
*
* Acedia is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, version 3 of the License, or
* (at your option) any later version.
*
* Acedia is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Acedia. If not, see <https://www.gnu.org/licenses/>.
*/
class TEST_SchedulerAPI extends TestCase
abstract;
var int diskUses;
protected static function UseDisk()
{
default.diskUses += 1;
}
protected static function MockJob MakeJob(string mark, int totalUnits)
{
local MockJob newJob;
newJob = MockJob(__().memory.Allocate(class'MockJob'));
newJob.mark = mark;
newJob.unitsLeft = totalUnits;
return newJob;
}
protected static function TESTS()
{
Test_MockJob();
}
protected static function Test_MockJob()
{
Context("Testing job scheduling.");
SubText_SimpleScheduling();
SubText_ManyScheduling();
SubText_DiskScheduling();
SubText_DiskSchedulingDeallocate();
SubText_JobDiskMix();
}
protected static function SubText_SimpleScheduling()
{
Issue("Simple scheduling doesn't process jobs in intended order");
class'MockJob'.default.callStack = "";
__().scheduler.ManualTick(); // Reset work units
__().scheduler.AddJob(MakeJob("A", 2400));
__().scheduler.AddJob(MakeJob("B", 3000));
__().scheduler.AddJob(MakeJob("C", 7600));
__().scheduler.AddJob(MakeJob("D", 1000));
__().scheduler.ManualTick(); // 10,000 units => -2,500 units per job
TEST_ExpectTrue(class'MockJob'.default.callStack == "AD");
TEST_ExpectTrue(__().scheduler.GetJobsAmount() == 2);
__().scheduler.ManualTick(); // 10,000 units => -5,000 units per job
TEST_ExpectTrue(class'MockJob'.default.callStack == "ADB");
TEST_ExpectTrue(__().scheduler.GetJobsAmount() == 1);
__().scheduler.ManualTick(); // 10,000 units => -5,000 units per job
TEST_ExpectTrue(class'MockJob'.default.callStack == "ADBC");
TEST_ExpectTrue(__().scheduler.GetJobsAmount() == 0);
}
protected static function SubText_ManyScheduling()
{
Issue("After scheduling jobs over per-tick limit, scheduler doesn't process"
@ "jobs in intended order");
class'MockJob'.default.callStack = "";
__().scheduler.ManualTick(); // Reset work units
// 10,000 units => 2,000 units per job for 5 jobs
__().scheduler.AddJob(MakeJob("A", 3000));
__().scheduler.AddJob(MakeJob("B", 3000));
__().scheduler.AddJob(MakeJob("C", 3000));
__().scheduler.AddJob(MakeJob("D", 1000));
__().scheduler.AddJob(MakeJob("E", 3000));
__().scheduler.AddJob(MakeJob("F", 3000));
__().scheduler.AddJob(MakeJob("G", 1000));
__().scheduler.AddJob(MakeJob("H", 5000));
__().scheduler.AddJob(MakeJob("I", 1000));
__().scheduler.ManualTick();
// A:1000, B:1000, C:1000, D:0, E:1000, F:3000, G:1000, H:5000, I:1000
TEST_ExpectTrue(class'MockJob'.default.callStack == "D");
TEST_ExpectTrue(__().scheduler.GetJobsAmount() == 8);
__().scheduler.ManualTick();
// A:0, B:1000, C:1000, D:0, E:1000, F:1000, G:0, H:3000, I:0
TEST_ExpectTrue(class'MockJob'.default.callStack == "DGIA");
TEST_ExpectTrue(__().scheduler.GetJobsAmount() == 5);
__().scheduler.ManualTick();
// A:0, B:0, C:0, D:0, E:0, F:0, G:0, H:1000, I:0
TEST_ExpectTrue(class'MockJob'.default.callStack == "DGIABCEF");
TEST_ExpectTrue(__().scheduler.GetJobsAmount() == 1);
__().scheduler.ManualTick();
// A:0, B:0, C:0, D:0, E:0, F:0, G:0, H:0, I:0
TEST_ExpectTrue(class'MockJob'.default.callStack == "DGIABCEFH");
TEST_ExpectTrue(__().scheduler.GetJobsAmount() == 0);
}
protected static function SubText_DiskScheduling()
{
local Text objectInstance;
Issue("Disk scheduling doesn't happen at expected intervals.");
default.diskUses = 0;
__().scheduler.ManualTick(1.0); // Pre-fill cooldown, just in case
objectInstance = __().text.FromString("whatever");
__().scheduler.RequestDiskAccess(objectInstance).connect = UseDisk;
__().scheduler.RequestDiskAccess(objectInstance).connect = UseDisk;
__().scheduler.RequestDiskAccess(objectInstance).connect = UseDisk;
__().scheduler.RequestDiskAccess(objectInstance).connect = UseDisk;
__().scheduler.ManualTick(0.001);
TEST_ExpectTrue(default.diskUses == 1);
__().scheduler.ManualTick(0.21);
TEST_ExpectTrue(default.diskUses == 1);
TEST_ExpectTrue(__().scheduler.GetDiskQueueSize() == 3);
__().scheduler.ManualTick(0.2);
TEST_ExpectTrue(default.diskUses == 2);
__().scheduler.ManualTick(0.21);
TEST_ExpectTrue(default.diskUses == 2);
TEST_ExpectTrue(__().scheduler.GetDiskQueueSize() == 2);
__().scheduler.ManualTick(0.2);
TEST_ExpectTrue(default.diskUses == 3);
TEST_ExpectTrue(__().scheduler.GetDiskQueueSize() == 1);
__().scheduler.ManualTick(0.1);
TEST_ExpectTrue(default.diskUses == 3);
__().scheduler.ManualTick(0.1);
TEST_ExpectTrue(default.diskUses == 3);
TEST_ExpectTrue(__().scheduler.GetDiskQueueSize() == 1);
__().scheduler.ManualTick(0.1);
TEST_ExpectTrue(default.diskUses == 4);
TEST_ExpectTrue(__().scheduler.GetDiskQueueSize() == 0);
}
protected static function SubText_DiskSchedulingDeallocate()
{
local Text objectInstance, deletedInstance;
Issue("Disk scheduling cannot correctly handle deallocated receivers.");
default.diskUses = 0;
__().scheduler.ManualTick(1.0); // Pre-fill cooldown, just in case
objectInstance = __().text.FromString("whatever");
deletedInstance = __().text.FromString("heh");
__().scheduler.RequestDiskAccess(objectInstance).connect = UseDisk;
__().scheduler.RequestDiskAccess(deletedInstance).connect = UseDisk;
__().scheduler.RequestDiskAccess(objectInstance).connect = UseDisk;
__().scheduler.RequestDiskAccess(deletedInstance).connect = UseDisk;
// Fuck off the `deletedInstance` object
deletedInstance.FreeSelf();
// Test!
__().scheduler.ManualTick(0.001);
TEST_ExpectTrue(default.diskUses == 1);
__().scheduler.ManualTick(0.21);
TEST_ExpectTrue(default.diskUses == 1);
__().scheduler.ManualTick(0.2);
TEST_ExpectTrue(default.diskUses == 2);
__().scheduler.ManualTick(0.21);
TEST_ExpectTrue(default.diskUses == 2);
__().scheduler.ManualTick(0.2);
TEST_ExpectTrue(default.diskUses == 2);
__().scheduler.ManualTick(1.0);
TEST_ExpectTrue(default.diskUses == 2);
}
protected static function SubText_JobDiskMix()
{
local Text objectInstance;
Issue("Job and disk scheduling doesn't happen at expected intervals.");
objectInstance = __().text.FromString("whatever");
class'MockJob'.default.callStack = "";
default.diskUses = 0;
__().scheduler.ManualTick(1.0); // Reset work units
// 0.2 * 10,000 = 2,000 units => 1,000 units per job for 2 jobs
__().scheduler.AddJob(MakeJob("A", 30000));
__().scheduler.AddJob(MakeJob("B", 10000));
__().scheduler.RequestDiskAccess(objectInstance).connect = UseDisk;
__().scheduler.RequestDiskAccess(objectInstance).connect = UseDisk;
// Reset disk cooldown
__().scheduler.ManualTick(0.2);
// A:25000, B:5000
TEST_ExpectTrue(default.diskUses == 1);
__().scheduler.ManualTick(0.2); // Disk on cooldown
// A:20000, B:0
TEST_ExpectTrue(class'MockJob'.default.callStack == "B");
TEST_ExpectTrue(__().scheduler.GetJobsAmount() == 1);
TEST_ExpectTrue(default.diskUses == 1);
TEST_ExpectTrue(__().scheduler.GetDiskQueueSize() == 1);
__().scheduler.ManualTick(0.2); // Disk got off cooldown, do writing
// A:10000, B:0
TEST_ExpectTrue(class'MockJob'.default.callStack == "B");
TEST_ExpectTrue(__().scheduler.GetJobsAmount() == 1);
TEST_ExpectTrue(default.diskUses == 2);
TEST_ExpectTrue(__().scheduler.GetDiskQueueSize() == 0);
__().scheduler.ManualTick(0.2); // Disk on cooldown
// A:0, B:0
TEST_ExpectTrue(class'MockJob'.default.callStack == "BA");
TEST_ExpectTrue(__().scheduler.GetJobsAmount() == 0);
TEST_ExpectTrue(default.diskUses == 2);
TEST_ExpectTrue(__().scheduler.GetDiskQueueSize() == 0);
}
defaultproperties
{
caseName = "SchedulerAPI"
caseGroup = "Scheduler"
}

421
sources/BaseRealm/API/Scheduler/SchedulerAPI.uc

@ -0,0 +1,421 @@
/**
* API that provides functions for scheduling jobs and expensive tasks such
* as writing onto the disk. Also provides methods for users to inform API that
* they've recently did an expensive operation, so that `SchedulerAPI` is to
* try and use less resources when managing jobs.
* Copyright 2022 Anton Tarasenko
*------------------------------------------------------------------------------
* This file is part of Acedia.
*
* Acedia is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, version 3 of the License, or
* (at your option) any later version.
*
* Acedia is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Acedia. If not, see <https://www.gnu.org/licenses/>.
*/
class SchedulerAPI extends AcediaObject
config(AcediaSystem);
/**
* # `SchedulerAPI`
*
* UnrealScript is inherently single-threaded and whatever method you call,
* it will be completely executed within a single game's tick.
* This API is meant for scheduling various actions over time to help emulating
* multi-threading by spreading some code executions over several different
* game/server ticks.
*
* ## Usage
*
* ### Job scheduling
*
* One of the reasons which is faulty infinite loop detection system that
* will crash the game/server if it thinks UnrealScript code has executed too
* many operations (it is not about execution time, logging a lot of messages
* with `Log()` can take a lot of time and not crash anything, while simple
* loop, that would've finished much sooner, can trigger a crash).
* This is a very atypical problem for mods to have, but Acedia's
* introduction of databases and avarice link can lead to users trying to read
* (from database or network) an object that is too big, leading to a crash.
* Jobs are not about performance, they're about crash prevention.
*
* In case you have such a job of your own, that can potentially take too
* many steps to finish without crashing, you can convert it into
* a `SchedulerJob` (you make a subclass for your type of the job and
* instantiate it for each execution of the job). This requires you to
* restructure your algorithm in such a way, that it is able to run for some
* finite (maybe small) amount of steps and postpone the rest of calculations
* to the next tick and put it into a method
* `SchedulerJob.DoWork(int allottedWorkUnits)`, where `allottedWorkUnits` is
* how much your method is allowed to do during this call, assuming `10000`
* units of work on their own won't lead to a crash.
* Another method `SchedulerJob.IsCompleted()` needs to be setup to return
* `true` iff your job is done.
* After you prepared an instance of your job subclass, simply pass it to
* `_.scheduler.AddJob()`.
*
* ### Disk usage requests
*
* Writing to the disk (saving data into config file, saving local database
* changes) can be an expensive operation and to avoid lags in gameplay you
* might want to spread such operations over time.
* `_.scheduler.RequestDiskAccess()` method allows you to do that. It is not
* exactly a signal, but it acts similar to one: to request a right to save to
* the disk, just do the following:
* `_.scheduler.RequestDiskAccess(<receiver>).connect = <disk_writing_method>`
* and `disk_writing_method()` will be called once your turn come up.
*
* ## Manual ticking
*
* If any kind of level core (either server or client one) was created,
* this API will automatically perform necessary actions every tick.
* Otherwise, if only base API is available, there's no way to do that, but
* you can manually decide when to tick this API by calling `ManualTick()`
* method.
*/
/**
* How often can files be saved on disk. This is a relatively expensive
* operation and we don't want to write a lot of different files at once.
* But since we lack a way to exactly measure how much time that saving will
* take, AcediaCore falls back to simply performing every saving with same
* uniform time intervals in-between.
* This variable decides how much time there should be between two file
* writing accesses.
* Negative and zero values mean that all writing disk access will be
* granted as soon as possible, without any cooldowns.
*/
var private config float diskSaveCooldown;
/**
* Maximum total work units for jobs allowed per tick. Jobs are expected to be
* constructed such that they don't lead to a crash if they have to perform
* this much work.
*
* Changing default value of `10000` is not advised.
*/
var private config int maxWorkUnits;
/**
* How many different jobs can be performed per tick. This limit is added so
* that `maxWorkUnits` won't be spread too thin if a lot of jobs get registered
* at once.
*/
var private config int maxJobsPerTick;
// We can (and will) automatically tick
var private bool tickAvailable;
// `true` == it is safe to use server API for a tick
// `false` == it is safe to use client API for a tick
var private bool tickFromServer;
// Our `Tick()` method is currently connected to the `OnTick()` signal.
// Keeping track of this allows us to disconnect from `OnTick()` signal
// when it is not necessary.
var private bool connectedToTick;
// How much time if left until we can write to the disk again?
var private float currentDiskCooldown;
// There is a limit (`maxJobsPerTick`) to how many different jobs we can
// perform per tick and if we register an amount jobs over that limit, we need
// to uniformly spread execution time between them.
// To achieve that we simply cyclically (in order) go over `currentJobs`
// array, each time executing exactly `maxJobsPerTick` jobs.
// `nextJobToPerform` remembers what job is to be executed next tick.
var private int nextJobToPerform;
var private array<SchedulerJob> currentJobs;
// Storing receiver objects, following example of signals/slots, is done
// without increasing their reference count, allowing them to get deallocated
// while we are still keeping their reference.
// To avoid using such deallocated receivers, we keep track of the life
// versions they've had when their disk requests were registered.
var private array<SchedulerDiskRequest> diskQueue;
var private array<AcediaObject> receivers;
var private array<int> receiversLifeVersions;
/**
* Registers new scheduler job `newJob` to be executed in the API.
*
* @param newJob New job to be scheduled for execution.
* Does nothing if given `newJob` is already added.
*/
public function AddJob(SchedulerJob newJob)
{
local int i;
if (newJob == none) {
return;
}
for (i = 0; i < currentJobs.length; i += 1)
{
if (currentJobs[i] == newJob) {
return;
}
}
newJob.NewRef();
currentJobs[currentJobs.length] = newJob;
UpdateTickConnection();
}
/**
* Requests another disk access.
*
* Use it like signal: `RequestDiskAccess(<receiver>).connect = <handler>`.
* Since it is meant to be used as a signal, so DO NOT STORE/RELEASE returned
* wrapper object `SchedulerDiskRequest`.
*
* @param receiver Same as for signal/slots, this is an object, responsible
* for the disk request. If this object gets deallocated - request will be
* thrown away.
* Typically this should be an object in which connected method will be
* executed.
* @return Wrapper object that provides `connect` delegate.
*/
public function SchedulerDiskRequest RequestDiskAccess(AcediaObject receiver)
{
local SchedulerDiskRequest newRequest;
if (receiver == none) return none;
if (!receiver.IsAllocated()) return none;
newRequest =
SchedulerDiskRequest(_.memory.Allocate(class'SchedulerDiskRequest'));
diskQueue[diskQueue.length] = newRequest;
receivers[receivers.length] = receiver;
receiversLifeVersions[receiversLifeVersions.length] =
receiver.GetLifeVersion();
UpdateTickConnection();
return newRequest;
}
/**
* Tells you how many incomplete jobs are currently registered in
* the scheduler.
*
* @return How many incomplete jobs are currently registered in the scheduler.
*/
public function int GetJobsAmount()
{
CleanCompletedJobs();
return currentJobs.length;
}
/**
* Tells you how many disk access requests are currently registered in
* the scheduler.
*
* @return How many incomplete disk access requests are currently registered
* in the scheduler.
*/
public function int GetDiskQueueSize()
{
CleanDiskQueue();
return diskQueue.length;
}
/**
* In case neither server, nor client core is registered, scheduler must be
* ticked manually. For that call this method each separate tick (or whatever
* is your closest approximation available for that).
*
* Before manually invoking this method, you should check if scheduler
* actually started to tick *automatically*. Use `_.scheduler.IsAutomated()`
* for that.
*
* NOTE: If neither server-/client- core is created, nor `ManualTick()` is
* invoked manually, `SchedulerAPI` won't actually do anything.
*
* @param delta Time (real one) that is supposedly passes from the moment
* `ManualTick()` was called last time. Used for tracking disk access
* cooldowns. How `SchedulerJob`s are executed is independent from this
* value.
*/
public final function ManualTick(optional float delta)
{
Tick(delta, 1.0);
}
/**
* Is scheduler ticking automated? It can only be automated if either
* server or client level cores are created. Scheduler can automatically enable
* automation and it cannot be prevented, but can be helped by using
* `UpdateTickConnection()` method.
*
* @return `true` if scheduler's tick is automatically called and `false`
* otherwise (and calling `ManualTick()` is required).
*/
public function bool IsAutomated()
{
return tickAvailable;
}
/**
* Causes `SchedulerAPI` to try automating itself by searching for level cores
* (checking if server/client APIs are enabled).
*/
public function UpdateTickConnection()
{
local bool needsConnection;
local UnrealAPI api;
if (!tickAvailable)
{
if (_server.IsAvailable())
{
tickAvailable = true;
tickFromServer = true;
}
else if (_client.IsAvailable())
{
tickAvailable = true;
tickFromServer = false;
}
if (!tickAvailable) {
return;
}
}
needsConnection = (currentJobs.length > 0 || diskQueue.length > 0);
if (connectedToTick == needsConnection) {
return;
}
if (tickFromServer) {
api = _server.unreal;
}
else {
api = _client.unreal;
}
if (connectedToTick && !needsConnection) {
api.OnTick(self).Disconnect();
}
else if (!connectedToTick && needsConnection) {
api.OnTick(self).connect = Tick;
}
connectedToTick = needsConnection;
}
private function Tick(float delta, float dilationCoefficient)
{
delta = delta / dilationCoefficient;
// Manage disk cooldown
if (currentDiskCooldown > 0) {
currentDiskCooldown -= delta;
}
if (currentDiskCooldown <= 0 && diskQueue.length > 0)
{
currentDiskCooldown = diskSaveCooldown;
ProcessDiskQueue();
}
// Manage jobs
if (currentJobs.length > 0) {
ProcessJobs();
}
UpdateTickConnection();
}
private function ProcessJobs()
{
local int unitsPerJob;
local int jobsToPerform;
CleanCompletedJobs();
jobsToPerform = Min(currentJobs.length, maxJobsPerTick);
if (jobsToPerform <= 0) {
return;
}
unitsPerJob = maxWorkUnits / jobsToPerform;
while (jobsToPerform > 0)
{
if (nextJobToPerform >= currentJobs.length) {
nextJobToPerform = 0;
}
currentJobs[nextJobToPerform].DoWork(unitsPerJob);
nextJobToPerform += 1;
jobsToPerform -= 1;
}
}
private function ProcessDiskQueue()
{
local int i;
// Even if we clean disk queue here, we still need to double check
// lifetimes in the code below, since we have no idea what `.connect()`
// calls might do
CleanDiskQueue();
if (diskQueue.length <= 0) {
return;
}
if (diskSaveCooldown > 0)
{
if (receivers[i].GetLifeVersion() == receiversLifeVersions[i]) {
diskQueue[i].connect();
}
_.memory.Free(diskQueue[0]);
diskQueue.Remove(0, 1);
receivers.Remove(0, 1);
receiversLifeVersions.Remove(0, 1);
return;
}
for (i = 0; i < diskQueue.length; i += 1)
{
if (receivers[i].GetLifeVersion() == receiversLifeVersions[i]) {
diskQueue[i].connect();
}
_.memory.Free(diskQueue[i]);
}
diskQueue.length = 0;
receivers.length = 0;
receiversLifeVersions.length = 0;
}
// Removes completed jobs
private function CleanCompletedJobs()
{
local int i;
while (i < currentJobs.length)
{
if (currentJobs[i].IsCompleted())
{
if (i < nextJobToPerform) {
nextJobToPerform -= 1;
}
currentJobs[i].FreeSelf();
currentJobs.Remove(i, 1);
}
else {
i += 1;
}
}
}
// Remove disk requests with deallocated receivers
private function CleanDiskQueue()
{
local int i;
while (i < diskQueue.length)
{
if (receivers[i].GetLifeVersion() == receiversLifeVersions[i])
{
i += 1;
continue;
}
_.memory.Free(diskQueue[i]);
diskQueue.Remove(i, 1);
receivers.Remove(i, 1);
receiversLifeVersions.Remove(i, 1);
}
}
defaultproperties
{
diskSaveCooldown = 0.25
maxWorkUnits = 10000
maxJobsPerTick = 5
}

29
sources/BaseRealm/API/Scheduler/SchedulerDiskRequest.uc

@ -0,0 +1,29 @@
/**
* Slot-like object that represents a request for a writing disk access,
* capable of being scheduled on the `SchedulerAPI`.
* Copyright 2022 Anton Tarasenko
*------------------------------------------------------------------------------
* This file is part of Acedia.
*
* Acedia is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, version 3 of the License, or
* (at your option) any later version.
*
* Acedia is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Acedia. If not, see <https://www.gnu.org/licenses/>.
*/
class SchedulerDiskRequest extends AcediaObject;
delegate connect()
{
}
defaultproperties
{
}

47
sources/BaseRealm/API/Scheduler/SchedulerJob.uc

@ -0,0 +1,47 @@
/**
* Template object that represents a job, capable of being scheduled on the
* `SchedulerAPI`. Use `IsCompleted()` to mark job as completed.
* Copyright 2022 Anton Tarasenko
*------------------------------------------------------------------------------
* This file is part of Acedia.
*
* Acedia is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, version 3 of the License, or
* (at your option) any later version.
*
* Acedia is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Acedia. If not, see <https://www.gnu.org/licenses/>.
*/
class SchedulerJob extends AcediaObject
abstract;
/**
* Checks if caller `SchedulerJob` was completed.
* Once this method returns `true`, it shouldn't start returning `false` again.
*
* @return `true` if `SchedulerJob` is already completed and doesn't need to
* be further executed and `false` otherwise.
*/
public function bool IsCompleted();
/**
* Called when scheduler decides that `SchedulerJob` should be executed, taking
* amount of abstract "work units" that it is allowed to spend for work.
*
* @param allottedWorkUnits Work units allotted to the caller
* `SchedulerJob`. By default there is `10000` work units per second, so
* you can expect about 10000 / 1000 = 10 work units per millisecond or,
* on servers with 30 tick rate, about 10000 * (30 / 1000) = 300 work units
* per tick to be allotted to all the scheduled jobs.
*/
public function DoWork(int allottedWorkUnits);
defaultproperties
{
}

3
sources/BaseRealm/Global.uc

@ -39,6 +39,7 @@ var public UserAPI users;
var public PlayersAPI players; var public PlayersAPI players;
var public JSONAPI json; var public JSONAPI json;
var public DBAPI db; var public DBAPI db;
var public SchedulerAPI scheduler;
var public AvariceAPI avarice; var public AvariceAPI avarice;
var public AcediaEnvironment environment; var public AcediaEnvironment environment;
@ -74,6 +75,7 @@ protected function Initialize()
players = PlayersAPI(memory.Allocate(class'PlayersAPI')); players = PlayersAPI(memory.Allocate(class'PlayersAPI'));
json = JSONAPI(memory.Allocate(class'JSONAPI')); json = JSONAPI(memory.Allocate(class'JSONAPI'));
db = DBAPI(memory.Allocate(class'DBAPI')); db = DBAPI(memory.Allocate(class'DBAPI'));
scheduler = SchedulerAPI(memory.Allocate(class'SchedulerAPI'));
avarice = AvariceAPI(memory.Allocate(class'AvariceAPI')); avarice = AvariceAPI(memory.Allocate(class'AvariceAPI'));
environment = AcediaEnvironment(memory.Allocate(class'AcediaEnvironment')); environment = AcediaEnvironment(memory.Allocate(class'AcediaEnvironment'));
} }
@ -94,6 +96,7 @@ public function DropCoreAPI()
players = none; players = none;
json = none; json = none;
db = none; db = none;
scheduler = none;
avarice = none; avarice = none;
default.myself = none; default.myself = none;
} }

1
sources/ClientRealm/ClientLevelCore.uc

@ -29,6 +29,7 @@ public simulated static function LevelCore CreateLevelCore(Actor source)
if (newCore != none) { if (newCore != none) {
__client().ConnectClientLevelCore(); __client().ConnectClientLevelCore();
} }
__().scheduler.UpdateTickConnection();
return newCore; return newCore;
} }

15
sources/CoreRealm/CoreGlobal.uc

@ -56,6 +56,21 @@ protected function Initialize()
time = TimeAPI(api.Allocate(adapterClass.default.timeAPIClass)); time = TimeAPI(api.Allocate(adapterClass.default.timeAPIClass));
} }
/**
* Checks is caller `CoreGlobal` is available to be used.
*
* Server and client `CoreGlobal` instances are always created, so that they
* can be added to `AcediaObject`s and `AcediaActor`s at any time, even before
* they were initialized (whether they ever will be or not). This method
* allows one to check whether they were already initialized and can be used.
*
* @return `true` if caller `CoreGlobal` can be used and `false` otherwise.
*/
public function bool IsAvailable()
{
return initialized;
}
/** /**
* Changes adapter class for the caller `...Global` instance. * Changes adapter class for the caller `...Global` instance.
* *

10
sources/Data/Database/DBAPI.uc

@ -1,7 +1,7 @@
/** /**
* API that provides methods for creating/destroying and managing available * API that provides methods for creating/destroying and managing available
* databases. * databases.
* Copyright 2021 Anton Tarasenko * Copyright 2021-2022 Anton Tarasenko
*------------------------------------------------------------------------------ *------------------------------------------------------------------------------
* This file is part of Acedia. * This file is part of Acedia.
* *
@ -57,6 +57,7 @@ public final function Database Load(BaseText databaseLink)
local Database result; local Database result;
local Text immutableDatabaseName; local Text immutableDatabaseName;
local MutableText databaseName; local MutableText databaseName;
if (databaseLink == none) { if (databaseLink == none) {
return none; return none;
} }
@ -99,6 +100,7 @@ public final function JSONPointer GetPointer(BaseText databaseLink)
local int slashIndex; local int slashIndex;
local Text textPointer; local Text textPointer;
local JSONPointer result; local JSONPointer result;
if (databaseLink == none) { if (databaseLink == none) {
return none; return none;
} }
@ -167,6 +169,7 @@ public final function LocalDatabaseInstance LoadLocal(BaseText databaseName)
local Text rootRecordName; local Text rootRecordName;
local LocalDatabase newConfig; local LocalDatabase newConfig;
local LocalDatabaseInstance newLocalDBInstance; local LocalDatabaseInstance newLocalDBInstance;
if (databaseName == none) { if (databaseName == none) {
return none; return none;
} }
@ -224,7 +227,6 @@ public final function bool ExistsLocal(BaseText databaseName)
return result; return result;
} }
// TODO: deleted database must be marked as disposed + change tests too
/** /**
* Deletes local database with name `databaseName`. * Deletes local database with name `databaseName`.
* *
@ -237,6 +239,7 @@ public final function bool DeleteLocal(BaseText databaseName)
local LocalDatabase localDatabaseConfig; local LocalDatabase localDatabaseConfig;
local LocalDatabaseInstance localDatabase; local LocalDatabaseInstance localDatabase;
local HashTable.Entry dbEntry; local HashTable.Entry dbEntry;
if (databaseName == none) { if (databaseName == none) {
return false; return false;
} }
@ -246,6 +249,7 @@ public final function bool DeleteLocal(BaseText databaseName)
if (localDatabase != none) if (localDatabase != none)
{ {
localDatabaseConfig = localDatabase.GetConfig(); localDatabaseConfig = localDatabase.GetConfig();
localDatabase.WriteToDisk();
_.memory.Free(localDatabase); _.memory.Free(localDatabase);
} }
dbEntry = loadedLocalDatabases.TakeEntry(databaseName); dbEntry = loadedLocalDatabases.TakeEntry(databaseName);
@ -270,6 +274,7 @@ private function EraseAllPackageData(BaseText packageToErase)
local GameInfo game; local GameInfo game;
local DBRecord nextRecord; local DBRecord nextRecord;
local array<DBRecord> allRecords; local array<DBRecord> allRecords;
packageName = _.text.ToString(packageToErase); packageName = _.text.ToString(packageToErase);
if (packageName == "") { if (packageName == "") {
return; return;
@ -299,6 +304,7 @@ public final function array<Text> ListLocal()
local int i; local int i;
local array<Text> dbNames; local array<Text> dbNames;
local array<string> dbNamesAsStrings; local array<string> dbNamesAsStrings;
dbNamesAsStrings = GetPerObjectNames( "AcediaDB", dbNamesAsStrings = GetPerObjectNames( "AcediaDB",
string(class'LocalDatabase'.name), string(class'LocalDatabase'.name),
MaxInt); MaxInt);

49
sources/Data/Database/Local/LocalDatabaseInstance.uc

@ -72,14 +72,9 @@ var private LocalDatabase configEntry;
// Reference to the `DBRecord` that stores root object of this database // Reference to the `DBRecord` that stores root object of this database
var private DBRecord rootRecord; var private DBRecord rootRecord;
// As long as this `Timer` runs - we are in the "cooldown" period where no disk // Remembers whether we've made a request for the disk access to the scheduler,
// updates can be done (except special cases like this object getting // to avoid sending multiple ones.
// deallocated). var private bool pendingDiskUpdate;
var private Timer diskUpdateTimer;
// Only relevant when `diskUpdateTimer` is running. `false` would mean there is
// nothing to new to write and the timer will be discarded, but `true` means
// that we have to write database on disk and restart the update timer again.
var private bool needsDiskUpdate;
// Last to-be-completed task added to this database // Last to-be-completed task added to this database
var private DBTask lastTask; var private DBTask lastTask;
@ -99,8 +94,6 @@ protected function Finalizer()
WriteToDisk(); WriteToDisk();
rootRecord = none; rootRecord = none;
_server.unreal.OnTick(self).Disconnect(); _server.unreal.OnTick(self).Disconnect();
_.memory.Free(diskUpdateTimer);
diskUpdateTimer = none;
configEntry = none; configEntry = none;
} }
@ -116,39 +109,23 @@ private final function CompleteAllTasks(
lastTaskLifeVersion = -1; lastTaskLifeVersion = -1;
} }
private final function LocalDatabaseInstance ScheduleDiskUpdate() private final function ScheduleDiskUpdate()
{ {
if (diskUpdateTimer != none) if (!pendingDiskUpdate)
{ {
needsDiskUpdate = true; pendingDiskUpdate = true;
return self; _.scheduler.RequestDiskAccess(self).connect = WriteToDisk;
} }
WriteToDisk();
needsDiskUpdate = false;
diskUpdateTimer = _server.time.StartTimer(
class'LocalDBSettings'.default.writeToDiskDelay);
diskUpdateTimer.OnElapsed(self).connect = DoDiskUpdate;
return self;
} }
private final function DoDiskUpdate(Timer source) public final function WriteToDisk()
{
if (needsDiskUpdate)
{
WriteToDisk();
needsDiskUpdate = false;
diskUpdateTimer.Start();
}
else
{
_.memory.Free(diskUpdateTimer);
diskUpdateTimer = none;
}
}
private final function WriteToDisk()
{ {
local string packageName; local string packageName;
if (!pendingDiskUpdate) {
return;
}
pendingDiskUpdate = false;
if (configEntry != none) { if (configEntry != none) {
packageName = _.text.ToString(configEntry.GetPackageName()); packageName = _.text.ToString(configEntry.GetPackageName());
} }

13
sources/Manifest.uc

@ -1,6 +1,6 @@
/** /**
* Manifest is meant to describe contents of the Acedia's package. * Manifest is meant to describe contents of the Acedia's package.
* Copyright 2020 - 2022 Anton Tarasenko * Copyright 2020-2022 Anton Tarasenko
*------------------------------------------------------------------------------ *------------------------------------------------------------------------------
* This file is part of Acedia. * This file is part of Acedia.
* *
@ -53,9 +53,10 @@ defaultproperties
testCases(21) = class'TEST_Command' testCases(21) = class'TEST_Command'
testCases(22) = class'TEST_CommandDataBuilder' testCases(22) = class'TEST_CommandDataBuilder'
testCases(23) = class'TEST_LogMessage' testCases(23) = class'TEST_LogMessage'
testCases(24) = class'TEST_DatabaseCommon' testCases(24) = class'TEST_SchedulerAPI'
testCases(25) = class'TEST_LocalDatabase' testCases(25) = class'TEST_DatabaseCommon'
testCases(26) = class'TEST_AcediaConfig' testCases(26) = class'TEST_LocalDatabase'
testCases(27) = class'TEST_UTF8EncoderDecoder' testCases(27) = class'TEST_AcediaConfig'
testCases(28) = class'TEST_AvariceStreamReader' testCases(28) = class'TEST_UTF8EncoderDecoder'
testCases(29) = class'TEST_AvariceStreamReader'
} }

1
sources/ServerRealm/ServerLevelCore.uc

@ -31,6 +31,7 @@ public static function LevelCore CreateLevelCore(Actor source)
if (newCore != none) { if (newCore != none) {
__server().ConnectServerLevelCore(); __server().ConnectServerLevelCore();
} }
__().scheduler.UpdateTickConnection();
return newCore; return newCore;
} }

Loading…
Cancel
Save