From 0b33d189279ef0bfc9c0c461eda7351573a6e35a Mon Sep 17 00:00:00 2001 From: Anton Tarasenko Date: Mon, 25 Jul 2022 02:37:08 +0700 Subject: [PATCH] Add `SchedulerAPI` --- config/AcediaSystem.ini | 22 + .../BaseRealm/API/Scheduler/API/MockJob.uc | 44 ++ .../API/Scheduler/API/TEST_SchedulerAPI.uc | 216 +++++++++ .../BaseRealm/API/Scheduler/SchedulerAPI.uc | 421 ++++++++++++++++++ .../API/Scheduler/SchedulerDiskRequest.uc | 29 ++ .../BaseRealm/API/Scheduler/SchedulerJob.uc | 47 ++ sources/BaseRealm/Global.uc | 3 + sources/ClientRealm/ClientLevelCore.uc | 1 + sources/CoreRealm/CoreGlobal.uc | 15 + sources/Data/Database/DBAPI.uc | 10 +- .../Database/Local/LocalDatabaseInstance.uc | 49 +- sources/Manifest.uc | 13 +- sources/ServerRealm/ServerLevelCore.uc | 1 + 13 files changed, 827 insertions(+), 44 deletions(-) create mode 100644 sources/BaseRealm/API/Scheduler/API/MockJob.uc create mode 100644 sources/BaseRealm/API/Scheduler/API/TEST_SchedulerAPI.uc create mode 100644 sources/BaseRealm/API/Scheduler/SchedulerAPI.uc create mode 100644 sources/BaseRealm/API/Scheduler/SchedulerDiskRequest.uc create mode 100644 sources/BaseRealm/API/Scheduler/SchedulerJob.uc diff --git a/config/AcediaSystem.ini b/config/AcediaSystem.ini index bd94299..17d627b 100644 --- a/config/AcediaSystem.ini +++ b/config/AcediaSystem.ini @@ -63,6 +63,28 @@ allowReplacingDamageTypes=true ; compatibility reasons), you should set this value at `BHIJ_Root`. broadcastHandlerInjectionLevel=BHIJ_Root +[AcediaCore.SchedulerAPI] +; How often can files be saved on disk. This is a relatively expensive +; operation and we don't want to write a lot of different files at once. +; But since we lack a way to exactly measure how much time that saving will +; take, AcediaCore falls back to simply performing every saving with same +; uniform time intervals in-between. +; This variable decides how much time there should be between two file +; writing accesses. +; Negative and zero values mean that all writing disk access will be +; granted as soon as possible, without any cooldowns. +diskSaveCooldown=0.25 +; Maximum total work units for jobs allowed per tick. Jobs are expected to be +; constructed such that they don't lead to a crash if they have to perform +; this much work. +; +; Changing default value of `10000` is not advised. +maxWorkUnits=10000 +; How many different jobs can be performed per tick. This limit is added so +; that `maxWorkUnits` won't be spread too thin if a lot of jobs get registered +; at once. +maxJobsPerTick=5 + [AcediaCore.UserAPI] userDataDBLink="local:database/users" diff --git a/sources/BaseRealm/API/Scheduler/API/MockJob.uc b/sources/BaseRealm/API/Scheduler/API/MockJob.uc new file mode 100644 index 0000000..120b539 --- /dev/null +++ b/sources/BaseRealm/API/Scheduler/API/MockJob.uc @@ -0,0 +1,44 @@ +/** + * Simple object that represents a job, capable of being scheduled on the + * `SchedulerAPI`. Use `IsCompleted()` to mark job as completed. + * Copyright 2022 Anton Tarasenko + *------------------------------------------------------------------------------ + * This file is part of Acedia. + * + * Acedia is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, version 3 of the License, or + * (at your option) any later version. + * + * Acedia is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Acedia. If not, see . + */ +class MockJob extends SchedulerJob; + +var public string mark; +var public int unitsLeft; + +// We use `default` value only +var public string callStack; + +public function bool IsCompleted() +{ + return (unitsLeft <= 0); +} + +public function DoWork(int allottedWorkUnits) +{ + unitsLeft -= allottedWorkUnits; + if (IsCompleted()) { + default.callStack = default.callStack $ mark; + } +} + +defaultproperties +{ +} \ No newline at end of file diff --git a/sources/BaseRealm/API/Scheduler/API/TEST_SchedulerAPI.uc b/sources/BaseRealm/API/Scheduler/API/TEST_SchedulerAPI.uc new file mode 100644 index 0000000..89e2900 --- /dev/null +++ b/sources/BaseRealm/API/Scheduler/API/TEST_SchedulerAPI.uc @@ -0,0 +1,216 @@ +/** + * Set of tests for Scheduler API. + * Copyright 2022 Anton Tarasenko + *------------------------------------------------------------------------------ + * This file is part of Acedia. + * + * Acedia is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, version 3 of the License, or + * (at your option) any later version. + * + * Acedia is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Acedia. If not, see . + */ +class TEST_SchedulerAPI extends TestCase + abstract; + +var int diskUses; + +protected static function UseDisk() +{ + default.diskUses += 1; +} + +protected static function MockJob MakeJob(string mark, int totalUnits) +{ + local MockJob newJob; + + newJob = MockJob(__().memory.Allocate(class'MockJob')); + newJob.mark = mark; + newJob.unitsLeft = totalUnits; + return newJob; +} + +protected static function TESTS() +{ + Test_MockJob(); +} + +protected static function Test_MockJob() +{ + Context("Testing job scheduling."); + SubText_SimpleScheduling(); + SubText_ManyScheduling(); + SubText_DiskScheduling(); + SubText_DiskSchedulingDeallocate(); + SubText_JobDiskMix(); +} + +protected static function SubText_SimpleScheduling() +{ + Issue("Simple scheduling doesn't process jobs in intended order"); + class'MockJob'.default.callStack = ""; + __().scheduler.ManualTick(); // Reset work units + __().scheduler.AddJob(MakeJob("A", 2400)); + __().scheduler.AddJob(MakeJob("B", 3000)); + __().scheduler.AddJob(MakeJob("C", 7600)); + __().scheduler.AddJob(MakeJob("D", 1000)); + __().scheduler.ManualTick(); // 10,000 units => -2,500 units per job + TEST_ExpectTrue(class'MockJob'.default.callStack == "AD"); + TEST_ExpectTrue(__().scheduler.GetJobsAmount() == 2); + __().scheduler.ManualTick(); // 10,000 units => -5,000 units per job + TEST_ExpectTrue(class'MockJob'.default.callStack == "ADB"); + TEST_ExpectTrue(__().scheduler.GetJobsAmount() == 1); + __().scheduler.ManualTick(); // 10,000 units => -5,000 units per job + TEST_ExpectTrue(class'MockJob'.default.callStack == "ADBC"); + TEST_ExpectTrue(__().scheduler.GetJobsAmount() == 0); +} + +protected static function SubText_ManyScheduling() +{ + Issue("After scheduling jobs over per-tick limit, scheduler doesn't process" + @ "jobs in intended order"); + class'MockJob'.default.callStack = ""; + __().scheduler.ManualTick(); // Reset work units + // 10,000 units => 2,000 units per job for 5 jobs + __().scheduler.AddJob(MakeJob("A", 3000)); + __().scheduler.AddJob(MakeJob("B", 3000)); + __().scheduler.AddJob(MakeJob("C", 3000)); + __().scheduler.AddJob(MakeJob("D", 1000)); + __().scheduler.AddJob(MakeJob("E", 3000)); + __().scheduler.AddJob(MakeJob("F", 3000)); + __().scheduler.AddJob(MakeJob("G", 1000)); + __().scheduler.AddJob(MakeJob("H", 5000)); + __().scheduler.AddJob(MakeJob("I", 1000)); + __().scheduler.ManualTick(); + // A:1000, B:1000, C:1000, D:0, E:1000, F:3000, G:1000, H:5000, I:1000 + TEST_ExpectTrue(class'MockJob'.default.callStack == "D"); + TEST_ExpectTrue(__().scheduler.GetJobsAmount() == 8); + __().scheduler.ManualTick(); + // A:0, B:1000, C:1000, D:0, E:1000, F:1000, G:0, H:3000, I:0 + TEST_ExpectTrue(class'MockJob'.default.callStack == "DGIA"); + TEST_ExpectTrue(__().scheduler.GetJobsAmount() == 5); + __().scheduler.ManualTick(); + // A:0, B:0, C:0, D:0, E:0, F:0, G:0, H:1000, I:0 + TEST_ExpectTrue(class'MockJob'.default.callStack == "DGIABCEF"); + TEST_ExpectTrue(__().scheduler.GetJobsAmount() == 1); + __().scheduler.ManualTick(); + // A:0, B:0, C:0, D:0, E:0, F:0, G:0, H:0, I:0 + TEST_ExpectTrue(class'MockJob'.default.callStack == "DGIABCEFH"); + TEST_ExpectTrue(__().scheduler.GetJobsAmount() == 0); +} + +protected static function SubText_DiskScheduling() +{ + local Text objectInstance; + + Issue("Disk scheduling doesn't happen at expected intervals."); + default.diskUses = 0; + __().scheduler.ManualTick(1.0); // Pre-fill cooldown, just in case + objectInstance = __().text.FromString("whatever"); + __().scheduler.RequestDiskAccess(objectInstance).connect = UseDisk; + __().scheduler.RequestDiskAccess(objectInstance).connect = UseDisk; + __().scheduler.RequestDiskAccess(objectInstance).connect = UseDisk; + __().scheduler.RequestDiskAccess(objectInstance).connect = UseDisk; + __().scheduler.ManualTick(0.001); + TEST_ExpectTrue(default.diskUses == 1); + __().scheduler.ManualTick(0.21); + TEST_ExpectTrue(default.diskUses == 1); + TEST_ExpectTrue(__().scheduler.GetDiskQueueSize() == 3); + __().scheduler.ManualTick(0.2); + TEST_ExpectTrue(default.diskUses == 2); + __().scheduler.ManualTick(0.21); + TEST_ExpectTrue(default.diskUses == 2); + TEST_ExpectTrue(__().scheduler.GetDiskQueueSize() == 2); + __().scheduler.ManualTick(0.2); + TEST_ExpectTrue(default.diskUses == 3); + TEST_ExpectTrue(__().scheduler.GetDiskQueueSize() == 1); + __().scheduler.ManualTick(0.1); + TEST_ExpectTrue(default.diskUses == 3); + __().scheduler.ManualTick(0.1); + TEST_ExpectTrue(default.diskUses == 3); + TEST_ExpectTrue(__().scheduler.GetDiskQueueSize() == 1); + __().scheduler.ManualTick(0.1); + TEST_ExpectTrue(default.diskUses == 4); + TEST_ExpectTrue(__().scheduler.GetDiskQueueSize() == 0); +} + +protected static function SubText_DiskSchedulingDeallocate() +{ + local Text objectInstance, deletedInstance; + + Issue("Disk scheduling cannot correctly handle deallocated receivers."); + default.diskUses = 0; + __().scheduler.ManualTick(1.0); // Pre-fill cooldown, just in case + objectInstance = __().text.FromString("whatever"); + deletedInstance = __().text.FromString("heh"); + __().scheduler.RequestDiskAccess(objectInstance).connect = UseDisk; + __().scheduler.RequestDiskAccess(deletedInstance).connect = UseDisk; + __().scheduler.RequestDiskAccess(objectInstance).connect = UseDisk; + __().scheduler.RequestDiskAccess(deletedInstance).connect = UseDisk; + // Fuck off the `deletedInstance` object + deletedInstance.FreeSelf(); + // Test! + __().scheduler.ManualTick(0.001); + TEST_ExpectTrue(default.diskUses == 1); + __().scheduler.ManualTick(0.21); + TEST_ExpectTrue(default.diskUses == 1); + __().scheduler.ManualTick(0.2); + TEST_ExpectTrue(default.diskUses == 2); + __().scheduler.ManualTick(0.21); + TEST_ExpectTrue(default.diskUses == 2); + __().scheduler.ManualTick(0.2); + TEST_ExpectTrue(default.diskUses == 2); + __().scheduler.ManualTick(1.0); + TEST_ExpectTrue(default.diskUses == 2); +} + +protected static function SubText_JobDiskMix() +{ + local Text objectInstance; + + Issue("Job and disk scheduling doesn't happen at expected intervals."); + objectInstance = __().text.FromString("whatever"); + class'MockJob'.default.callStack = ""; + default.diskUses = 0; + __().scheduler.ManualTick(1.0); // Reset work units + // 0.2 * 10,000 = 2,000 units => 1,000 units per job for 2 jobs + __().scheduler.AddJob(MakeJob("A", 30000)); + __().scheduler.AddJob(MakeJob("B", 10000)); + __().scheduler.RequestDiskAccess(objectInstance).connect = UseDisk; + __().scheduler.RequestDiskAccess(objectInstance).connect = UseDisk; + // Reset disk cooldown + __().scheduler.ManualTick(0.2); + // A:25000, B:5000 + TEST_ExpectTrue(default.diskUses == 1); + __().scheduler.ManualTick(0.2); // Disk on cooldown + // A:20000, B:0 + TEST_ExpectTrue(class'MockJob'.default.callStack == "B"); + TEST_ExpectTrue(__().scheduler.GetJobsAmount() == 1); + TEST_ExpectTrue(default.diskUses == 1); + TEST_ExpectTrue(__().scheduler.GetDiskQueueSize() == 1); + __().scheduler.ManualTick(0.2); // Disk got off cooldown, do writing + // A:10000, B:0 + TEST_ExpectTrue(class'MockJob'.default.callStack == "B"); + TEST_ExpectTrue(__().scheduler.GetJobsAmount() == 1); + TEST_ExpectTrue(default.diskUses == 2); + TEST_ExpectTrue(__().scheduler.GetDiskQueueSize() == 0); + __().scheduler.ManualTick(0.2); // Disk on cooldown + // A:0, B:0 + TEST_ExpectTrue(class'MockJob'.default.callStack == "BA"); + TEST_ExpectTrue(__().scheduler.GetJobsAmount() == 0); + TEST_ExpectTrue(default.diskUses == 2); + TEST_ExpectTrue(__().scheduler.GetDiskQueueSize() == 0); +} + +defaultproperties +{ + caseName = "SchedulerAPI" + caseGroup = "Scheduler" +} \ No newline at end of file diff --git a/sources/BaseRealm/API/Scheduler/SchedulerAPI.uc b/sources/BaseRealm/API/Scheduler/SchedulerAPI.uc new file mode 100644 index 0000000..161316f --- /dev/null +++ b/sources/BaseRealm/API/Scheduler/SchedulerAPI.uc @@ -0,0 +1,421 @@ +/** + * API that provides functions for scheduling jobs and expensive tasks such + * as writing onto the disk. Also provides methods for users to inform API that + * they've recently did an expensive operation, so that `SchedulerAPI` is to + * try and use less resources when managing jobs. + * Copyright 2022 Anton Tarasenko + *------------------------------------------------------------------------------ + * This file is part of Acedia. + * + * Acedia is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, version 3 of the License, or + * (at your option) any later version. + * + * Acedia is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Acedia. If not, see . + */ +class SchedulerAPI extends AcediaObject + config(AcediaSystem); + +/** + * # `SchedulerAPI` + * + * UnrealScript is inherently single-threaded and whatever method you call, + * it will be completely executed within a single game's tick. + * This API is meant for scheduling various actions over time to help emulating + * multi-threading by spreading some code executions over several different + * game/server ticks. + * + * ## Usage + * + * ### Job scheduling + * + * One of the reasons which is faulty infinite loop detection system that + * will crash the game/server if it thinks UnrealScript code has executed too + * many operations (it is not about execution time, logging a lot of messages + * with `Log()` can take a lot of time and not crash anything, while simple + * loop, that would've finished much sooner, can trigger a crash). + * This is a very atypical problem for mods to have, but Acedia's + * introduction of databases and avarice link can lead to users trying to read + * (from database or network) an object that is too big, leading to a crash. + * Jobs are not about performance, they're about crash prevention. + * + * In case you have such a job of your own, that can potentially take too + * many steps to finish without crashing, you can convert it into + * a `SchedulerJob` (you make a subclass for your type of the job and + * instantiate it for each execution of the job). This requires you to + * restructure your algorithm in such a way, that it is able to run for some + * finite (maybe small) amount of steps and postpone the rest of calculations + * to the next tick and put it into a method + * `SchedulerJob.DoWork(int allottedWorkUnits)`, where `allottedWorkUnits` is + * how much your method is allowed to do during this call, assuming `10000` + * units of work on their own won't lead to a crash. + * Another method `SchedulerJob.IsCompleted()` needs to be setup to return + * `true` iff your job is done. + * After you prepared an instance of your job subclass, simply pass it to + * `_.scheduler.AddJob()`. + * + * ### Disk usage requests + * + * Writing to the disk (saving data into config file, saving local database + * changes) can be an expensive operation and to avoid lags in gameplay you + * might want to spread such operations over time. + * `_.scheduler.RequestDiskAccess()` method allows you to do that. It is not + * exactly a signal, but it acts similar to one: to request a right to save to + * the disk, just do the following: + * `_.scheduler.RequestDiskAccess().connect = ` + * and `disk_writing_method()` will be called once your turn come up. + * + * ## Manual ticking + * + * If any kind of level core (either server or client one) was created, + * this API will automatically perform necessary actions every tick. + * Otherwise, if only base API is available, there's no way to do that, but + * you can manually decide when to tick this API by calling `ManualTick()` + * method. + */ + +/** + * How often can files be saved on disk. This is a relatively expensive + * operation and we don't want to write a lot of different files at once. + * But since we lack a way to exactly measure how much time that saving will + * take, AcediaCore falls back to simply performing every saving with same + * uniform time intervals in-between. + * This variable decides how much time there should be between two file + * writing accesses. + * Negative and zero values mean that all writing disk access will be + * granted as soon as possible, without any cooldowns. + */ +var private config float diskSaveCooldown; +/** + * Maximum total work units for jobs allowed per tick. Jobs are expected to be + * constructed such that they don't lead to a crash if they have to perform + * this much work. + * + * Changing default value of `10000` is not advised. + */ +var private config int maxWorkUnits; +/** + * How many different jobs can be performed per tick. This limit is added so + * that `maxWorkUnits` won't be spread too thin if a lot of jobs get registered + * at once. + */ +var private config int maxJobsPerTick; + +// We can (and will) automatically tick +var private bool tickAvailable; +// `true` == it is safe to use server API for a tick +// `false` == it is safe to use client API for a tick +var private bool tickFromServer; +// Our `Tick()` method is currently connected to the `OnTick()` signal. +// Keeping track of this allows us to disconnect from `OnTick()` signal +// when it is not necessary. +var private bool connectedToTick; + +// How much time if left until we can write to the disk again? +var private float currentDiskCooldown; + +// There is a limit (`maxJobsPerTick`) to how many different jobs we can +// perform per tick and if we register an amount jobs over that limit, we need +// to uniformly spread execution time between them. +// To achieve that we simply cyclically (in order) go over `currentJobs` +// array, each time executing exactly `maxJobsPerTick` jobs. +// `nextJobToPerform` remembers what job is to be executed next tick. +var private int nextJobToPerform; +var private array currentJobs; +// Storing receiver objects, following example of signals/slots, is done +// without increasing their reference count, allowing them to get deallocated +// while we are still keeping their reference. +// To avoid using such deallocated receivers, we keep track of the life +// versions they've had when their disk requests were registered. +var private array diskQueue; +var private array receivers; +var private array receiversLifeVersions; + +/** + * Registers new scheduler job `newJob` to be executed in the API. + * + * @param newJob New job to be scheduled for execution. + * Does nothing if given `newJob` is already added. + */ +public function AddJob(SchedulerJob newJob) +{ + local int i; + + if (newJob == none) { + return; + } + for (i = 0; i < currentJobs.length; i += 1) + { + if (currentJobs[i] == newJob) { + return; + } + } + newJob.NewRef(); + currentJobs[currentJobs.length] = newJob; + UpdateTickConnection(); +} + +/** + * Requests another disk access. + * + * Use it like signal: `RequestDiskAccess().connect = `. + * Since it is meant to be used as a signal, so DO NOT STORE/RELEASE returned + * wrapper object `SchedulerDiskRequest`. + * + * @param receiver Same as for signal/slots, this is an object, responsible + * for the disk request. If this object gets deallocated - request will be + * thrown away. + * Typically this should be an object in which connected method will be + * executed. + * @return Wrapper object that provides `connect` delegate. + */ +public function SchedulerDiskRequest RequestDiskAccess(AcediaObject receiver) +{ + local SchedulerDiskRequest newRequest; + + if (receiver == none) return none; + if (!receiver.IsAllocated()) return none; + + newRequest = + SchedulerDiskRequest(_.memory.Allocate(class'SchedulerDiskRequest')); + diskQueue[diskQueue.length] = newRequest; + receivers[receivers.length] = receiver; + receiversLifeVersions[receiversLifeVersions.length] = + receiver.GetLifeVersion(); + UpdateTickConnection(); + return newRequest; +} + +/** + * Tells you how many incomplete jobs are currently registered in + * the scheduler. + * + * @return How many incomplete jobs are currently registered in the scheduler. + */ +public function int GetJobsAmount() +{ + CleanCompletedJobs(); + return currentJobs.length; +} + +/** + * Tells you how many disk access requests are currently registered in + * the scheduler. + * + * @return How many incomplete disk access requests are currently registered + * in the scheduler. + */ +public function int GetDiskQueueSize() +{ + CleanDiskQueue(); + return diskQueue.length; +} + +/** + * In case neither server, nor client core is registered, scheduler must be + * ticked manually. For that call this method each separate tick (or whatever + * is your closest approximation available for that). + * + * Before manually invoking this method, you should check if scheduler + * actually started to tick *automatically*. Use `_.scheduler.IsAutomated()` + * for that. + * + * NOTE: If neither server-/client- core is created, nor `ManualTick()` is + * invoked manually, `SchedulerAPI` won't actually do anything. + * + * @param delta Time (real one) that is supposedly passes from the moment + * `ManualTick()` was called last time. Used for tracking disk access + * cooldowns. How `SchedulerJob`s are executed is independent from this + * value. + */ +public final function ManualTick(optional float delta) +{ + Tick(delta, 1.0); +} + +/** + * Is scheduler ticking automated? It can only be automated if either + * server or client level cores are created. Scheduler can automatically enable + * automation and it cannot be prevented, but can be helped by using + * `UpdateTickConnection()` method. + * + * @return `true` if scheduler's tick is automatically called and `false` + * otherwise (and calling `ManualTick()` is required). + */ +public function bool IsAutomated() +{ + return tickAvailable; +} + +/** + * Causes `SchedulerAPI` to try automating itself by searching for level cores + * (checking if server/client APIs are enabled). + */ +public function UpdateTickConnection() +{ + local bool needsConnection; + local UnrealAPI api; + + if (!tickAvailable) + { + if (_server.IsAvailable()) + { + tickAvailable = true; + tickFromServer = true; + } + else if (_client.IsAvailable()) + { + tickAvailable = true; + tickFromServer = false; + } + if (!tickAvailable) { + return; + } + } + needsConnection = (currentJobs.length > 0 || diskQueue.length > 0); + if (connectedToTick == needsConnection) { + return; + } + if (tickFromServer) { + api = _server.unreal; + } + else { + api = _client.unreal; + } + if (connectedToTick && !needsConnection) { + api.OnTick(self).Disconnect(); + } + else if (!connectedToTick && needsConnection) { + api.OnTick(self).connect = Tick; + } + connectedToTick = needsConnection; +} + +private function Tick(float delta, float dilationCoefficient) +{ + delta = delta / dilationCoefficient; + // Manage disk cooldown + if (currentDiskCooldown > 0) { + currentDiskCooldown -= delta; + } + if (currentDiskCooldown <= 0 && diskQueue.length > 0) + { + currentDiskCooldown = diskSaveCooldown; + ProcessDiskQueue(); + } + // Manage jobs + if (currentJobs.length > 0) { + ProcessJobs(); + } + UpdateTickConnection(); +} + +private function ProcessJobs() +{ + local int unitsPerJob; + local int jobsToPerform; + + CleanCompletedJobs(); + jobsToPerform = Min(currentJobs.length, maxJobsPerTick); + if (jobsToPerform <= 0) { + return; + } + unitsPerJob = maxWorkUnits / jobsToPerform; + while (jobsToPerform > 0) + { + if (nextJobToPerform >= currentJobs.length) { + nextJobToPerform = 0; + } + currentJobs[nextJobToPerform].DoWork(unitsPerJob); + nextJobToPerform += 1; + jobsToPerform -= 1; + } +} + +private function ProcessDiskQueue() +{ + local int i; + + // Even if we clean disk queue here, we still need to double check + // lifetimes in the code below, since we have no idea what `.connect()` + // calls might do + CleanDiskQueue(); + if (diskQueue.length <= 0) { + return; + } + if (diskSaveCooldown > 0) + { + if (receivers[i].GetLifeVersion() == receiversLifeVersions[i]) { + diskQueue[i].connect(); + } + _.memory.Free(diskQueue[0]); + diskQueue.Remove(0, 1); + receivers.Remove(0, 1); + receiversLifeVersions.Remove(0, 1); + return; + } + for (i = 0; i < diskQueue.length; i += 1) + { + if (receivers[i].GetLifeVersion() == receiversLifeVersions[i]) { + diskQueue[i].connect(); + } + _.memory.Free(diskQueue[i]); + } + diskQueue.length = 0; + receivers.length = 0; + receiversLifeVersions.length = 0; +} + +// Removes completed jobs +private function CleanCompletedJobs() +{ + local int i; + + while (i < currentJobs.length) + { + if (currentJobs[i].IsCompleted()) + { + if (i < nextJobToPerform) { + nextJobToPerform -= 1; + } + currentJobs[i].FreeSelf(); + currentJobs.Remove(i, 1); + } + else { + i += 1; + } + } +} + +// Remove disk requests with deallocated receivers +private function CleanDiskQueue() +{ + local int i; + + while (i < diskQueue.length) + { + if (receivers[i].GetLifeVersion() == receiversLifeVersions[i]) + { + i += 1; + continue; + } + _.memory.Free(diskQueue[i]); + diskQueue.Remove(i, 1); + receivers.Remove(i, 1); + receiversLifeVersions.Remove(i, 1); + } +} + +defaultproperties +{ + diskSaveCooldown = 0.25 + maxWorkUnits = 10000 + maxJobsPerTick = 5 +} \ No newline at end of file diff --git a/sources/BaseRealm/API/Scheduler/SchedulerDiskRequest.uc b/sources/BaseRealm/API/Scheduler/SchedulerDiskRequest.uc new file mode 100644 index 0000000..06a09e0 --- /dev/null +++ b/sources/BaseRealm/API/Scheduler/SchedulerDiskRequest.uc @@ -0,0 +1,29 @@ +/** + * Slot-like object that represents a request for a writing disk access, + * capable of being scheduled on the `SchedulerAPI`. + * Copyright 2022 Anton Tarasenko + *------------------------------------------------------------------------------ + * This file is part of Acedia. + * + * Acedia is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, version 3 of the License, or + * (at your option) any later version. + * + * Acedia is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Acedia. If not, see . + */ +class SchedulerDiskRequest extends AcediaObject; + +delegate connect() +{ +} + +defaultproperties +{ +} \ No newline at end of file diff --git a/sources/BaseRealm/API/Scheduler/SchedulerJob.uc b/sources/BaseRealm/API/Scheduler/SchedulerJob.uc new file mode 100644 index 0000000..b1fc45c --- /dev/null +++ b/sources/BaseRealm/API/Scheduler/SchedulerJob.uc @@ -0,0 +1,47 @@ +/** + * Template object that represents a job, capable of being scheduled on the + * `SchedulerAPI`. Use `IsCompleted()` to mark job as completed. + * Copyright 2022 Anton Tarasenko + *------------------------------------------------------------------------------ + * This file is part of Acedia. + * + * Acedia is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, version 3 of the License, or + * (at your option) any later version. + * + * Acedia is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Acedia. If not, see . + */ +class SchedulerJob extends AcediaObject + abstract; + +/** + * Checks if caller `SchedulerJob` was completed. + * Once this method returns `true`, it shouldn't start returning `false` again. + * + * @return `true` if `SchedulerJob` is already completed and doesn't need to + * be further executed and `false` otherwise. + */ +public function bool IsCompleted(); + +/** + * Called when scheduler decides that `SchedulerJob` should be executed, taking + * amount of abstract "work units" that it is allowed to spend for work. + * + * @param allottedWorkUnits Work units allotted to the caller + * `SchedulerJob`. By default there is `10000` work units per second, so + * you can expect about 10000 / 1000 = 10 work units per millisecond or, + * on servers with 30 tick rate, about 10000 * (30 / 1000) = 300 work units + * per tick to be allotted to all the scheduled jobs. + */ +public function DoWork(int allottedWorkUnits); + +defaultproperties +{ +} \ No newline at end of file diff --git a/sources/BaseRealm/Global.uc b/sources/BaseRealm/Global.uc index 26b5e5b..238c021 100644 --- a/sources/BaseRealm/Global.uc +++ b/sources/BaseRealm/Global.uc @@ -39,6 +39,7 @@ var public UserAPI users; var public PlayersAPI players; var public JSONAPI json; var public DBAPI db; +var public SchedulerAPI scheduler; var public AvariceAPI avarice; var public AcediaEnvironment environment; @@ -74,6 +75,7 @@ protected function Initialize() players = PlayersAPI(memory.Allocate(class'PlayersAPI')); json = JSONAPI(memory.Allocate(class'JSONAPI')); db = DBAPI(memory.Allocate(class'DBAPI')); + scheduler = SchedulerAPI(memory.Allocate(class'SchedulerAPI')); avarice = AvariceAPI(memory.Allocate(class'AvariceAPI')); environment = AcediaEnvironment(memory.Allocate(class'AcediaEnvironment')); } @@ -94,6 +96,7 @@ public function DropCoreAPI() players = none; json = none; db = none; + scheduler = none; avarice = none; default.myself = none; } \ No newline at end of file diff --git a/sources/ClientRealm/ClientLevelCore.uc b/sources/ClientRealm/ClientLevelCore.uc index 850624a..4c97d47 100644 --- a/sources/ClientRealm/ClientLevelCore.uc +++ b/sources/ClientRealm/ClientLevelCore.uc @@ -29,6 +29,7 @@ public simulated static function LevelCore CreateLevelCore(Actor source) if (newCore != none) { __client().ConnectClientLevelCore(); } + __().scheduler.UpdateTickConnection(); return newCore; } diff --git a/sources/CoreRealm/CoreGlobal.uc b/sources/CoreRealm/CoreGlobal.uc index 64eddbc..0ed413b 100644 --- a/sources/CoreRealm/CoreGlobal.uc +++ b/sources/CoreRealm/CoreGlobal.uc @@ -56,6 +56,21 @@ protected function Initialize() time = TimeAPI(api.Allocate(adapterClass.default.timeAPIClass)); } +/** + * Checks is caller `CoreGlobal` is available to be used. + * + * Server and client `CoreGlobal` instances are always created, so that they + * can be added to `AcediaObject`s and `AcediaActor`s at any time, even before + * they were initialized (whether they ever will be or not). This method + * allows one to check whether they were already initialized and can be used. + * + * @return `true` if caller `CoreGlobal` can be used and `false` otherwise. + */ +public function bool IsAvailable() +{ + return initialized; +} + /** * Changes adapter class for the caller `...Global` instance. * diff --git a/sources/Data/Database/DBAPI.uc b/sources/Data/Database/DBAPI.uc index df5f038..5865051 100644 --- a/sources/Data/Database/DBAPI.uc +++ b/sources/Data/Database/DBAPI.uc @@ -1,7 +1,7 @@ /** * API that provides methods for creating/destroying and managing available * databases. - * Copyright 2021 Anton Tarasenko + * Copyright 2021-2022 Anton Tarasenko *------------------------------------------------------------------------------ * This file is part of Acedia. * @@ -57,6 +57,7 @@ public final function Database Load(BaseText databaseLink) local Database result; local Text immutableDatabaseName; local MutableText databaseName; + if (databaseLink == none) { return none; } @@ -99,6 +100,7 @@ public final function JSONPointer GetPointer(BaseText databaseLink) local int slashIndex; local Text textPointer; local JSONPointer result; + if (databaseLink == none) { return none; } @@ -167,6 +169,7 @@ public final function LocalDatabaseInstance LoadLocal(BaseText databaseName) local Text rootRecordName; local LocalDatabase newConfig; local LocalDatabaseInstance newLocalDBInstance; + if (databaseName == none) { return none; } @@ -224,7 +227,6 @@ public final function bool ExistsLocal(BaseText databaseName) return result; } -// TODO: deleted database must be marked as disposed + change tests too /** * Deletes local database with name `databaseName`. * @@ -237,6 +239,7 @@ public final function bool DeleteLocal(BaseText databaseName) local LocalDatabase localDatabaseConfig; local LocalDatabaseInstance localDatabase; local HashTable.Entry dbEntry; + if (databaseName == none) { return false; } @@ -246,6 +249,7 @@ public final function bool DeleteLocal(BaseText databaseName) if (localDatabase != none) { localDatabaseConfig = localDatabase.GetConfig(); + localDatabase.WriteToDisk(); _.memory.Free(localDatabase); } dbEntry = loadedLocalDatabases.TakeEntry(databaseName); @@ -270,6 +274,7 @@ private function EraseAllPackageData(BaseText packageToErase) local GameInfo game; local DBRecord nextRecord; local array allRecords; + packageName = _.text.ToString(packageToErase); if (packageName == "") { return; @@ -299,6 +304,7 @@ public final function array ListLocal() local int i; local array dbNames; local array dbNamesAsStrings; + dbNamesAsStrings = GetPerObjectNames( "AcediaDB", string(class'LocalDatabase'.name), MaxInt); diff --git a/sources/Data/Database/Local/LocalDatabaseInstance.uc b/sources/Data/Database/Local/LocalDatabaseInstance.uc index a11e5fd..c815afd 100644 --- a/sources/Data/Database/Local/LocalDatabaseInstance.uc +++ b/sources/Data/Database/Local/LocalDatabaseInstance.uc @@ -72,14 +72,9 @@ var private LocalDatabase configEntry; // Reference to the `DBRecord` that stores root object of this database var private DBRecord rootRecord; -// As long as this `Timer` runs - we are in the "cooldown" period where no disk -// updates can be done (except special cases like this object getting -// deallocated). -var private Timer diskUpdateTimer; -// Only relevant when `diskUpdateTimer` is running. `false` would mean there is -// nothing to new to write and the timer will be discarded, but `true` means -// that we have to write database on disk and restart the update timer again. -var private bool needsDiskUpdate; +// Remembers whether we've made a request for the disk access to the scheduler, +// to avoid sending multiple ones. +var private bool pendingDiskUpdate; // Last to-be-completed task added to this database var private DBTask lastTask; @@ -99,8 +94,6 @@ protected function Finalizer() WriteToDisk(); rootRecord = none; _server.unreal.OnTick(self).Disconnect(); - _.memory.Free(diskUpdateTimer); - diskUpdateTimer = none; configEntry = none; } @@ -116,39 +109,23 @@ private final function CompleteAllTasks( lastTaskLifeVersion = -1; } -private final function LocalDatabaseInstance ScheduleDiskUpdate() +private final function ScheduleDiskUpdate() { - if (diskUpdateTimer != none) + if (!pendingDiskUpdate) { - needsDiskUpdate = true; - return self; + pendingDiskUpdate = true; + _.scheduler.RequestDiskAccess(self).connect = WriteToDisk; } - WriteToDisk(); - needsDiskUpdate = false; - diskUpdateTimer = _server.time.StartTimer( - class'LocalDBSettings'.default.writeToDiskDelay); - diskUpdateTimer.OnElapsed(self).connect = DoDiskUpdate; - return self; } -private final function DoDiskUpdate(Timer source) -{ - if (needsDiskUpdate) - { - WriteToDisk(); - needsDiskUpdate = false; - diskUpdateTimer.Start(); - } - else - { - _.memory.Free(diskUpdateTimer); - diskUpdateTimer = none; - } -} - -private final function WriteToDisk() +public final function WriteToDisk() { local string packageName; + + if (!pendingDiskUpdate) { + return; + } + pendingDiskUpdate = false; if (configEntry != none) { packageName = _.text.ToString(configEntry.GetPackageName()); } diff --git a/sources/Manifest.uc b/sources/Manifest.uc index 524593e..9c43224 100644 --- a/sources/Manifest.uc +++ b/sources/Manifest.uc @@ -1,6 +1,6 @@ /** * Manifest is meant to describe contents of the Acedia's package. - * Copyright 2020 - 2022 Anton Tarasenko + * Copyright 2020-2022 Anton Tarasenko *------------------------------------------------------------------------------ * This file is part of Acedia. * @@ -53,9 +53,10 @@ defaultproperties testCases(21) = class'TEST_Command' testCases(22) = class'TEST_CommandDataBuilder' testCases(23) = class'TEST_LogMessage' - testCases(24) = class'TEST_DatabaseCommon' - testCases(25) = class'TEST_LocalDatabase' - testCases(26) = class'TEST_AcediaConfig' - testCases(27) = class'TEST_UTF8EncoderDecoder' - testCases(28) = class'TEST_AvariceStreamReader' + testCases(24) = class'TEST_SchedulerAPI' + testCases(25) = class'TEST_DatabaseCommon' + testCases(26) = class'TEST_LocalDatabase' + testCases(27) = class'TEST_AcediaConfig' + testCases(28) = class'TEST_UTF8EncoderDecoder' + testCases(29) = class'TEST_AvariceStreamReader' } \ No newline at end of file diff --git a/sources/ServerRealm/ServerLevelCore.uc b/sources/ServerRealm/ServerLevelCore.uc index abc7597..90fe938 100644 --- a/sources/ServerRealm/ServerLevelCore.uc +++ b/sources/ServerRealm/ServerLevelCore.uc @@ -31,6 +31,7 @@ public static function LevelCore CreateLevelCore(Actor source) if (newCore != none) { __server().ConnectServerLevelCore(); } + __().scheduler.UpdateTickConnection(); return newCore; }