// Forked from geolba/tethys.backend (TypeScript, 96 lines, 2.9 KiB)
import ResumptionToken from './ResumptionToken.js';
|
|
import { createClient, RedisClientType } from 'redis';
|
|
import InternalServerErrorException from '#app/exceptions/InternalServerException';
|
|
import { sprintf } from 'sprintf-js';
|
|
import dayjs from 'dayjs';
|
|
import TokenWorkerContract from './TokenWorkerContract.js';
|
|
|
|
/**
 * Redis-backed store for {@link ResumptionToken} objects.
 *
 * Tokens are serialized to JSON and written with Redis SETEX under a generated
 * key, so every entry expires on its own after `ttl` seconds. `connect()` must
 * be called before any other operation; operations attempted without a client
 * throw an `InternalServerErrorException`.
 */
export default class TokenWorkerService implements TokenWorkerContract {
    protected filePrefix = 'rs_';
    protected fileExtension = 'txt';

    /** Redis client; stays undefined until `connect()` has been called. */
    private cache?: RedisClientType;
    /** Expiry (in seconds, as required by SETEX) applied to every stored token. */
    public ttl: number;
    /** Connection URL: `REDIS_URL` from the environment, else a local default. */
    private url: string;
    /** Tracks the client's 'connect' / 'error' / 'end' events. */
    private connected = false;

    /**
     * @param ttl time to live of a stored token, handed to Redis SETEX
     */
    constructor(ttl: number) {
        this.ttl = ttl; // time to live
        this.url = process.env.REDIS_URL || 'redis://127.0.0.1:6379';
    }

    /**
     * Creates the Redis client, wires its lifecycle events and opens the connection.
     *
     * Calling it again while the current client is still open is a no-op, so an
     * existing connection is not replaced (and leaked) by a second client.
     */
    public async connect(): Promise<void> {
        if (this.cache?.isOpen) {
            return;
        }

        const client: RedisClientType = createClient({ url: this.url });
        this.cache = client;

        client.on('error', (err) => {
            this.connected = false;
            console.log('[Redis] Redis Client Error: ', err);
        });
        client.on('connect', () => {
            this.connected = true;
        });
        // Without this, a connection that ends without an error would still be reported as connected.
        client.on('end', () => {
            this.connected = false;
        });

        await client.connect();
    }

    /** Whether the last lifecycle event seen from the client was 'connect'. */
    public get isConnected(): boolean {
        return this.connected;
    }

    /**
     * Checks whether a value is stored under `key`.
     *
     * @throws InternalServerErrorException when `connect()` has not been called
     */
    public async has(key: string): Promise<boolean> {
        const result = await this.requireClient().get(key);
        return result !== undefined && result !== null;
    }

    /**
     * Stores `token` as JSON under a newly generated key that expires after `ttl`.
     *
     * @returns the key under which the token was stored
     * @throws InternalServerErrorException when `connect()` has not been called
     */
    public async set(token: ResumptionToken): Promise<string> {
        const client = this.requireClient();
        const uniqueName = await this.generateUniqueName();

        const serialToken = JSON.stringify(token);
        await client.setEx(uniqueName, this.ttl, serialToken);
        return uniqueName;
    }

    /**
     * Builds a key from the current Unix timestamp plus a zero-padded counter,
     * incrementing the counter until the key is not present in the cache.
     *
     * NOTE(review): the existence check here and the later write in `set()` are
     * separate commands, so two concurrent callers within the same second could
     * pick the same key. Confirm whether that matters for the callers.
     */
    private async generateUniqueName(): Promise<string> {
        let fc = 0;
        const uniqueId = dayjs().unix().toString();
        let uniqueName: string;
        let cacheKeyExists: boolean;
        do {
            // '%s%05d': timestamp string followed by the counter, zero-padded to 5 digits
            uniqueName = sprintf('%s%05d', uniqueId, fc++);
            cacheKeyExists = await this.has(uniqueName);
        } while (cacheKeyExists);
        return uniqueName;
    }

    /**
     * Loads the token stored under `key`.
     *
     * @returns the revived token, or `null` when nothing (or an empty string) is stored
     * @throws InternalServerErrorException when `connect()` has not been called
     */
    public async get(key: string): Promise<ResumptionToken | null> {
        const result = await this.requireClient().get(key);
        return result ? this.parseToken(result) : null;
    }

    /** Revives a JSON string into a `ResumptionToken` by copying the parsed fields onto a new instance. */
    private parseToken(result: string): ResumptionToken {
        const rToken: ResumptionToken = new ResumptionToken();
        const parsed = JSON.parse(result);
        Object.assign(rToken, parsed);
        return rToken;
    }

    /**
     * Removes the entry stored under `key`.
     *
     * The returned promise settles once Redis has processed the command, so
     * callers may await it; failures are no longer lost as a floating promise.
     *
     * @throws InternalServerErrorException when `connect()` has not been called
     */
    public async del(key: string): Promise<void> {
        await this.requireClient().del(key);
    }

    /**
     * Issues FLUSHALL on the Redis server this client is connected to.
     *
     * The returned promise settles once Redis has processed the command.
     *
     * @throws InternalServerErrorException when `connect()` has not been called
     */
    public async flush(): Promise<void> {
        await this.requireClient().flushAll();
    }

    /** Disconnects the client, if one was ever created, and marks the service as disconnected. */
    public async close(): Promise<void> {
        if (this.cache) {
            await this.cache.disconnect();
        }
        this.connected = false;
    }

    /** Returns the client, or throws the same exception `get()` has always used when it is missing. */
    private requireClient(): RedisClientType {
        if (!this.cache) {
            throw new InternalServerErrorException('Dataset is not available for OAI export!');
        }
        return this.cache;
    }
}
|