Browse Source

Fix style for `SchedulerApi`

core_refactor
Anton Tarasenko 2 years ago
parent
commit
5bba953cb0
  1. 362
      sources/BaseRealm/API/Scheduler/SchedulerAPI.uc
  2. 16
      sources/BaseRealm/API/Scheduler/SchedulerDiskRequest.uc
  3. 41
      sources/BaseRealm/API/Scheduler/SchedulerJob.uc
  4. 0
      sources/BaseRealm/API/Scheduler/Tests/MockJob.uc
  5. 0
      sources/BaseRealm/API/Scheduler/Tests/TEST_SchedulerAPI.uc

362
sources/BaseRealm/API/Scheduler/SchedulerAPI.uc

@ -1,9 +1,8 @@
/** /**
* API that provides functions for scheduling jobs and expensive tasks such * Author: dkanus
* as writing onto the disk. Also provides methods for users to inform API that * Home repo: https://www.insultplayers.ru/git/AcediaFramework/AcediaCore
* they've recently did an expensive operation, so that `SchedulerAPI` is to * License: GPL
* try and use less resources when managing jobs. * Copyright 2022-2023 Anton Tarasenko
* Copyright 2022 Anton Tarasenko
*------------------------------------------------------------------------------ *------------------------------------------------------------------------------
* This file is part of Acedia. * This file is part of Acedia.
* *
@ -20,139 +19,81 @@
* You should have received a copy of the GNU General Public License * You should have received a copy of the GNU General Public License
* along with Acedia. If not, see <https://www.gnu.org/licenses/>. * along with Acedia. If not, see <https://www.gnu.org/licenses/>.
*/ */
class SchedulerAPI extends AcediaObject class SchedulerApi extends AcediaObject
config(AcediaSystem); config(AcediaSystem);
/** //! This API is meant for scheduling various actions over time to help emulating
* # `SchedulerAPI` //! multi-threading by spreading some code executions over several different
* //! game/server ticks.
* UnrealScript is inherently single-threaded and whatever method you call, //!
* it will be completely executed within a single game's tick. //! UnrealScript is inherently single-threaded and whatever method you call,
* This API is meant for scheduling various actions over time to help emulating //! it will be completely executed within a single game's tick.
* multi-threading by spreading some code executions over several different
* game/server ticks.
*
* ## Usage
*
* ### Job scheduling
*
* One of the reasons which is faulty infinite loop detection system that
* will crash the game/server if it thinks UnrealScript code has executed too
* many operations (it is not about execution time, logging a lot of messages
* with `Log()` can take a lot of time and not crash anything, while simple
* loop, that would've finished much sooner, can trigger a crash).
* This is a very atypical problem for mods to have, but Acedia's
* introduction of databases and avarice link can lead to users trying to read
* (from database or network) an object that is too big, leading to a crash.
* Jobs are not about performance, they're about crash prevention.
*
* In case you have such a job of your own, that can potentially take too
* many steps to finish without crashing, you can convert it into
* a `SchedulerJob` (you make a subclass for your type of the job and
* instantiate it for each execution of the job). This requires you to
* restructure your algorithm in such a way, that it is able to run for some
* finite (maybe small) amount of steps and postpone the rest of calculations
* to the next tick and put it into a method
* `SchedulerJob.DoWork(int allottedWorkUnits)`, where `allottedWorkUnits` is
* how much your method is allowed to do during this call, assuming `10000`
* units of work on their own won't lead to a crash.
* Another method `SchedulerJob.IsCompleted()` needs to be setup to return
* `true` iff your job is done.
* After you prepared an instance of your job subclass, simply pass it to
* `_.scheduler.AddJob()`.
*
* ### Disk usage requests
*
* Writing to the disk (saving data into config file, saving local database
* changes) can be an expensive operation and to avoid lags in gameplay you
* might want to spread such operations over time.
* `_.scheduler.RequestDiskAccess()` method allows you to do that. It is not
* exactly a signal, but it acts similar to one: to request a right to save to
* the disk, just do the following:
* `_.scheduler.RequestDiskAccess(<receiver>).connect = <disk_writing_method>`
* and `disk_writing_method()` will be called once your turn come up.
*
* ## Manual ticking
*
* If any kind of level core (either server or client one) was created,
* this API will automatically perform necessary actions every tick.
* Otherwise, if only base API is available, there's no way to do that, but
* you can manually decide when to tick this API by calling `ManualTick()`
* method.
*/
/** // How often can files be saved on disk.
* How often can files be saved on disk. This is a relatively expensive //
* operation and we don't want to write a lot of different files at once. // This is a relatively expensive operation and we don't want to write a lot of different files
* But since we lack a way to exactly measure how much time that saving will // at once.
* take, AcediaCore falls back to simply performing every saving with same // But since we lack a way to exactly measure how much time that saving will take, AcediaCore falls
* uniform time intervals in-between. // back to simply performing every saving with same uniform time intervals in-between.
* This variable decides how much time there should be between two file // This variable decides how much time there should be between two file writing accesses.
* writing accesses. // Negative and zero values mean that all writing disk access will be granted as soon as possible,
* Negative and zero values mean that all writing disk access will be // without any cooldowns.
* granted as soon as possible, without any cooldowns.
*/
var private config float diskSaveCooldown; var private config float diskSaveCooldown;
/**
* Maximum total work units for jobs allowed per tick. Jobs are expected to be // Maximum total work units for jobs allowed per tick.
* constructed such that they don't lead to a crash if they have to perform //
* this much work. // Jobs are expected to be constructed such that they don't lead to a crash if they have to perform
* // this much work.
* Changing default value of `10000` is not advised. // Changing default value of `10000` is not advised.
*/
var private config int maxWorkUnits; var private config int maxWorkUnits;
/**
* How many different jobs can be performed per tick. This limit is added so // How many different jobs can be performed per tick.
* that `maxWorkUnits` won't be spread too thin if a lot of jobs get registered //
* at once. // This limit is added so that `maxWorkUnits` won't be spread too thin if a lot of jobs
*/ // get registered at once.
var private config int maxJobsPerTick; var private config int maxJobsPerTick;
// We can (and will) automatically tick // We can (and will) automatically tick
var private bool tickAvailable; var private bool tickAvailable;
// `true` == it is safe to use server API for a tick // `true` == it is safe to use server API for a tick
// `false` == it is safe to use client API for a tick // `false` == it is safe to use client API for a tick
var private bool tickFromServer; var private bool tickFromServer;
// Our `Tick()` method is currently connected to the `OnTick()` signal. // Our `Tick()` method is currently connected to the `OnTick()` signal.
// Keeping track of this allows us to disconnect from `OnTick()` signal //
// when it is not necessary. // Keeping track of this allows us to disconnect from `OnTick()` signal when it is not necessary.
var private bool connectedToTick; var private bool connectedToTick;
// How much time if left until we can write to the disk again? // How much time if left until we can write to the disk again?
var private float currentDiskCooldown; var private float currentDiskCooldown;
// There is a limit (`maxJobsPerTick`) to how many different jobs we can // There is a limit (`maxJobsPerTick`) to how many different jobs we can perform per tick and if we
// perform per tick and if we register an amount jobs over that limit, we need // register an amount jobs over that limit, we need to uniformly spread execution time between them.
// to uniformly spread execution time between them. //
// To achieve that we simply cyclically (in order) go over `currentJobs` // To achieve that we simply cyclically (in order) go over `currentJobs` array, each time executing
// array, each time executing exactly `maxJobsPerTick` jobs. // exactly `maxJobsPerTick` jobs.
// `nextJobToPerform` remembers what job is to be executed next tick. //
var private int nextJobToPerform; // `nextJobToPerform` remembers what job is to be executed next tick.
var private int nextJobToPerform;
var private array<SchedulerJob> currentJobs; var private array<SchedulerJob> currentJobs;
// Storing receiver objects, following example of signals/slots, is done // Storing receiver objects, following example of signals/slots, is done without increasing their
// without increasing their reference count, allowing them to get deallocated // reference count, allowing them to get deallocated while we are still keeping their reference.
// while we are still keeping their reference. //
// To avoid using such deallocated receivers, we keep track of the life // To avoid using such deallocated receivers, we keep track of the life versions they've had when
// versions they've had when their disk requests were registered. // their disk requests were registered.
var private array<SchedulerDiskRequest> diskQueue; var private array<SchedulerDiskRequest> diskQueue;
var private array<AcediaObject> receivers; var private array<AcediaObject> receivers;
var private array<int> receiversLifeVersions; var private array<int> receiversLifeVersions;
/** /// Registers new scheduler job to be executed in the API.
* Registers new scheduler job `newJob` to be executed in the API. ///
* /// Does nothing if given `newJob` is already added.
* @param newJob New job to be scheduled for execution. public function AddJob(SchedulerJob newJob) {
* Does nothing if given `newJob` is already added.
*/
public function AddJob(SchedulerJob newJob)
{
local int i; local int i;
if (newJob == none) { if (newJob == none) {
return; return;
} }
for (i = 0; i < currentJobs.length; i += 1) for (i = 0; i < currentJobs.length; i += 1) {
{
if (currentJobs[i] == newJob) { if (currentJobs[i] == newJob) {
return; return;
} }
@ -162,116 +103,102 @@ public function AddJob(SchedulerJob newJob)
UpdateTickConnection(); UpdateTickConnection();
} }
/** /// Requests another disk access.
* Requests another disk access. ///
* /// Use it like signal: `RequestDiskAccess(<receiver>).connect = <handler>`.
* Use it like signal: `RequestDiskAccess(<receiver>).connect = <handler>`. /// Since it is meant to be used as a signal, so DO NOT STORE/RELEASE returned wrapper object
* Since it is meant to be used as a signal, so DO NOT STORE/RELEASE returned /// [`SchedulerDiskRequest`].
* wrapper object `SchedulerDiskRequest`. ///
* /// Same as for signal/slots, [`receiver`] is an object, responsible for the disk request.
* @param receiver Same as for signal/slots, this is an object, responsible /// If this object gets deallocated - request will be thrown away.
* for the disk request. If this object gets deallocated - request will be /// Typically this should be an object in which connected method will be executed.
* thrown away. /// Returns wrapper object that provides `connect` delegate.
* Typically this should be an object in which connected method will be ///
* executed. /// # Examples
* @return Wrapper object that provides `connect` delegate. ///
*/ /// ```
public function SchedulerDiskRequest RequestDiskAccess(AcediaObject receiver) /// _.scheduler.RequestDiskAccess(self).connect = MethodThatSaves();
{ /// ```
public function SchedulerDiskRequest RequestDiskAccess(AcediaObject receiver) {
local SchedulerDiskRequest newRequest; local SchedulerDiskRequest newRequest;
if (receiver == none) return none; if (receiver == none) return none;
if (!receiver.IsAllocated()) return none; if (!receiver.IsAllocated()) return none;
newRequest = newRequest = SchedulerDiskRequest(_.memory.Allocate(class'SchedulerDiskRequest'));
SchedulerDiskRequest(_.memory.Allocate(class'SchedulerDiskRequest'));
diskQueue[diskQueue.length] = newRequest; diskQueue[diskQueue.length] = newRequest;
receivers[receivers.length] = receiver; receivers[receivers.length] = receiver;
receiversLifeVersions[receiversLifeVersions.length] = receiversLifeVersions[receiversLifeVersions.length] = receiver.GetLifeVersion();
receiver.GetLifeVersion();
UpdateTickConnection(); UpdateTickConnection();
return newRequest; return newRequest;
} }
/** /// Returns amount of incomplete jobs are currently registered in the scheduler.
* Tells you how many incomplete jobs are currently registered in public function int GetJobsAmount() {
* the scheduler.
*
* @return How many incomplete jobs are currently registered in the scheduler.
*/
public function int GetJobsAmount()
{
CleanCompletedJobs(); CleanCompletedJobs();
return currentJobs.length; return currentJobs.length;
} }
/** /// Returns amount of disk access requests are currently registered in the scheduler.
* Tells you how many disk access requests are currently registered in public function int GetDiskQueueSize() {
* the scheduler.
*
* @return How many incomplete disk access requests are currently registered
* in the scheduler.
*/
public function int GetDiskQueueSize()
{
CleanDiskQueue(); CleanDiskQueue();
return diskQueue.length; return diskQueue.length;
} }
/** /// Performs another batch of scheduled tasks.
* In case neither server, nor client core is registered, scheduler must be ///
* ticked manually. For that call this method each separate tick (or whatever /// In case neither server, nor client core is registered, scheduler must be ticked manually.
* is your closest approximation available for that). /// For that call this method each separate tick (or whatever is your closest approximation
* /// available for that).
* Before manually invoking this method, you should check if scheduler /// Before manually invoking this method, you should check if scheduler actually started to tick
* actually started to tick *automatically*. Use `_.scheduler.IsAutomated()` /// *automatically*.
* for that. /// Use `_.scheduler.IsAutomated()` for that.
* ///
* NOTE: If neither server-/client- core is created, nor `ManualTick()` is /// Argument is a time (real, not in-game one) that is supposedly passes from the moment
* invoked manually, `SchedulerAPI` won't actually do anything. /// [`SchedulerApi::ManualTick()`] was called last time.
* /// Used for tracking disk access cooldowns.
* @param delta Time (real one) that is supposedly passes from the moment /// How [`SchedulerJob`]s are executed is independent from this value.
* `ManualTick()` was called last time. Used for tracking disk access ///
* cooldowns. How `SchedulerJob`s are executed is independent from this /// Returns time (real, not in-game one) that is supposedly passes from the moment
* value. /// [`SchedulerApi::ManualTick()`] was called last time.
*/ ///
public final function ManualTick(optional float delta) /// # Examples
{ ///
/// ```
/// if (!_.scheduler.IsAutomated()) {
/// _.scheduler.ManualTick(0.05);
/// }
/// ```
///
/// # Note
///
/// If neither server-/client- core is created, nor [`SchedulerApi::ManualTick()`] is invoked
/// manually, [`SchedulerApi`] won't actually do anything.
public final function ManualTick(optional float delta) {
Tick(delta, 1.0); Tick(delta, 1.0);
} }
/** /// Returns whether scheduler ticking automated.
* Is scheduler ticking automated? It can only be automated if either ///
* server or client level cores are created. Scheduler can automatically enable /// It can only be automated if either server or client level cores are created.
* automation and it cannot be prevented, but can be helped by using /// Scheduler can automatically enable automation and it cannot be prevented, but can be helped by
* `UpdateTickConnection()` method. /// using [`SchedulerApi::UpdateTickConnection()`] method.
* public function bool IsAutomated() {
* @return `true` if scheduler's tick is automatically called and `false`
* otherwise (and calling `ManualTick()` is required).
*/
public function bool IsAutomated()
{
return tickAvailable; return tickAvailable;
} }
/** /// Causes `SchedulerApi` to try automating itself by searching for level cores (checking if
* Causes `SchedulerAPI` to try automating itself by searching for level cores /// server/client APIs are enabled).
* (checking if server/client APIs are enabled). public function UpdateTickConnection() {
*/ local bool needsConnection;
public function UpdateTickConnection()
{
local bool needsConnection;
local UnrealAPI api; local UnrealAPI api;
if (!tickAvailable) if (!tickAvailable) {
{ if (_server.IsAvailable()) {
if (_server.IsAvailable())
{
tickAvailable = true; tickAvailable = true;
tickFromServer = true; tickFromServer = true;
} }
else if (_client.IsAvailable()) else if (_client.IsAvailable()) {
{
tickAvailable = true; tickAvailable = true;
tickFromServer = false; tickFromServer = false;
} }
@ -285,28 +212,23 @@ public function UpdateTickConnection()
} }
if (tickFromServer) { if (tickFromServer) {
api = _server.unreal; api = _server.unreal;
} } else {
else {
api = _client.unreal; api = _client.unreal;
} }
if (connectedToTick && !needsConnection) { if (connectedToTick && !needsConnection) {
api.OnTick(self).Disconnect(); api.OnTick(self).Disconnect();
} } else if (!connectedToTick && needsConnection) {
else if (!connectedToTick && needsConnection) {
api.OnTick(self).connect = Tick; api.OnTick(self).connect = Tick;
} }
connectedToTick = needsConnection; connectedToTick = needsConnection;
} }
private function Tick(float delta, float dilationCoefficient) private function Tick(float delta, float dilationCoefficient) {
{
delta = delta / dilationCoefficient; delta = delta / dilationCoefficient;
// Manage disk cooldown
if (currentDiskCooldown > 0) { if (currentDiskCooldown > 0) {
currentDiskCooldown -= delta; currentDiskCooldown -= delta;
} }
if (currentDiskCooldown <= 0 && diskQueue.length > 0) if (currentDiskCooldown <= 0 && diskQueue.length > 0) {
{
currentDiskCooldown = diskSaveCooldown; currentDiskCooldown = diskSaveCooldown;
ProcessDiskQueue(); ProcessDiskQueue();
} }
@ -328,8 +250,7 @@ private function ProcessJobs()
return; return;
} }
unitsPerJob = maxWorkUnits / jobsToPerform; unitsPerJob = maxWorkUnits / jobsToPerform;
while (jobsToPerform > 0) while (jobsToPerform > 0) {
{
if (nextJobToPerform >= currentJobs.length) { if (nextJobToPerform >= currentJobs.length) {
nextJobToPerform = 0; nextJobToPerform = 0;
} }
@ -350,8 +271,7 @@ private function ProcessDiskQueue()
if (diskQueue.length <= 0) { if (diskQueue.length <= 0) {
return; return;
} }
if (diskSaveCooldown > 0) if (diskSaveCooldown > 0) {
{
if (receivers[i].GetLifeVersion() == receiversLifeVersions[i]) { if (receivers[i].GetLifeVersion() == receiversLifeVersions[i]) {
diskQueue[i].connect(); diskQueue[i].connect();
} }
@ -361,8 +281,7 @@ private function ProcessDiskQueue()
receiversLifeVersions.Remove(0, 1); receiversLifeVersions.Remove(0, 1);
return; return;
} }
for (i = 0; i < diskQueue.length; i += 1) for (i = 0; i < diskQueue.length; i += 1) {
{
if (receivers[i].GetLifeVersion() == receiversLifeVersions[i]) { if (receivers[i].GetLifeVersion() == receiversLifeVersions[i]) {
diskQueue[i].connect(); diskQueue[i].connect();
} }
@ -378,31 +297,25 @@ private function CleanCompletedJobs()
{ {
local int i; local int i;
while (i < currentJobs.length) while (i < currentJobs.length) {
{ if (currentJobs[i].IsCompleted()) {
if (currentJobs[i].IsCompleted())
{
if (i < nextJobToPerform) { if (i < nextJobToPerform) {
nextJobToPerform -= 1; nextJobToPerform -= 1;
} }
currentJobs[i].FreeSelf(); currentJobs[i].FreeSelf();
currentJobs.Remove(i, 1); currentJobs.Remove(i, 1);
} } else {
else {
i += 1; i += 1;
} }
} }
} }
// Remove disk requests with deallocated receivers // Remove disk requests with deallocated receivers
private function CleanDiskQueue() private function CleanDiskQueue() {
{
local int i; local int i;
while (i < diskQueue.length) while (i < diskQueue.length) {
{ if (receivers[i].GetLifeVersion() == receiversLifeVersions[i]) {
if (receivers[i].GetLifeVersion() == receiversLifeVersions[i])
{
i += 1; i += 1;
continue; continue;
} }
@ -413,8 +326,7 @@ private function CleanDiskQueue()
} }
} }
defaultproperties defaultproperties {
{
diskSaveCooldown = 0.25 diskSaveCooldown = 0.25
maxWorkUnits = 10000 maxWorkUnits = 10000
maxJobsPerTick = 5 maxJobsPerTick = 5

16
sources/BaseRealm/API/Scheduler/SchedulerDiskRequest.uc

@ -1,7 +1,8 @@
/** /**
* Slot-like object that represents a request for a writing disk access, * Author: dkanus
* capable of being scheduled on the `SchedulerAPI`. * Home repo: https://www.insultplayers.ru/git/AcediaFramework/AcediaCore
* Copyright 2022 Anton Tarasenko * License: GPL
* Copyright 2022-2023 Anton Tarasenko
*------------------------------------------------------------------------------ *------------------------------------------------------------------------------
* This file is part of Acedia. * This file is part of Acedia.
* *
@ -20,10 +21,11 @@
*/ */
class SchedulerDiskRequest extends AcediaObject; class SchedulerDiskRequest extends AcediaObject;
delegate connect() //! Slot-like object that represents a request for a writing disk access, capable of being scheduled
{ //! on the [`SchedulerApi`].
delegate connect() {
} }
defaultproperties defaultproperties {
{
} }

41
sources/BaseRealm/API/Scheduler/SchedulerJob.uc

@ -1,7 +1,8 @@
/** /**
* Template object that represents a job, capable of being scheduled on the * Author: dkanus
* `SchedulerAPI`. Use `IsCompleted()` to mark job as completed. * Home repo: https://www.insultplayers.ru/git/AcediaFramework/AcediaCore
* Copyright 2022 Anton Tarasenko * License: GPL
* Copyright 2022-2023 Anton Tarasenko
*------------------------------------------------------------------------------ *------------------------------------------------------------------------------
* This file is part of Acedia. * This file is part of Acedia.
* *
@ -21,27 +22,23 @@
class SchedulerJob extends AcediaObject class SchedulerJob extends AcediaObject
abstract; abstract;
/** //! Template object that represents a job, capable of being scheduled on the [`SchedulerAPI`].
* Checks if caller `SchedulerJob` was completed. //! Use [`IsCompleted()`] to mark job as completed.
* Once this method returns `true`, it shouldn't start returning `false` again.
* /// Checks if caller [`SchedulerJob`] was completed.
* @return `true` if `SchedulerJob` is already completed and doesn't need to ///
* be further executed and `false` otherwise. /// Returns `true` if [`SchedulerJob`] is already completed and doesn't need to be further executed
*/ /// and `false` otherwise.
/// Once this method returns `true`, it shouldn't start returning `false` again.
public function bool IsCompleted(); public function bool IsCompleted();
/** /// Called when scheduler decides that [`SchedulerJob`] should be executed, taking amount of abstract
* Called when scheduler decides that `SchedulerJob` should be executed, taking /// "work units" that it is allowed to spend for work.
* amount of abstract "work units" that it is allowed to spend for work. ///
* /// By default there is `10000` work units per second, so you can expect about 10000 / 1000 = 10
* @param allottedWorkUnits Work units allotted to the caller /// work units per millisecond or, on servers with `30` tick rate, about `10000 * (30 / 1000) = 300`
* `SchedulerJob`. By default there is `10000` work units per second, so /// work units per tick to be allotted to all the scheduled jobs.
* you can expect about 10000 / 1000 = 10 work units per millisecond or,
* on servers with 30 tick rate, about 10000 * (30 / 1000) = 300 work units
* per tick to be allotted to all the scheduled jobs.
*/
public function DoWork(int allottedWorkUnits); public function DoWork(int allottedWorkUnits);
defaultproperties defaultproperties {
{
} }

0
sources/BaseRealm/API/Scheduler/API/MockJob.uc → sources/BaseRealm/API/Scheduler/Tests/MockJob.uc

0
sources/BaseRealm/API/Scheduler/API/TEST_SchedulerAPI.uc → sources/BaseRealm/API/Scheduler/Tests/TEST_SchedulerAPI.uc

Loading…
Cancel
Save