Create podcast manager and re-organize managers

This commit is contained in:
advplyr 2022-03-20 16:41:06 -05:00
parent 122f2a2556
commit e1e6b46456
12 changed files with 86 additions and 65 deletions

View file

@@ -0,0 +1,287 @@
const Path = require('path')
const cron = require('node-cron')
const fs = require('fs-extra')
const archiver = require('archiver')
const StreamZip = require('node-stream-zip')
// Utils
const { getFileSize } = require('../utils/fileUtils')
const filePerms = require('../utils/filePerms')
const Logger = require('../Logger')
const Backup = require('../objects/Backup')
class BackupManager {
constructor(db, emitter) {
this.BackupPath = Path.join(global.MetadataPath, 'backups')
this.ItemsMetadataPath = Path.join(global.MetadataPath, 'items')
this.db = db
this.emitter = emitter
this.scheduleTask = null
this.backups = []
// If backup exceeds this value it will be aborted
this.MaxBytesBeforeAbort = 1000000000 // ~ 1GB
}
get serverSettings() {
return this.db.serverSettings || {}
}
async init() {
var backupsDirExists = await fs.pathExists(this.BackupPath)
if (!backupsDirExists) {
await fs.ensureDir(this.BackupPath)
await filePerms.setDefault(this.BackupPath)
}
await this.loadBackups()
this.scheduleCron()
}
scheduleCron() {
if (!this.serverSettings.backupSchedule) {
Logger.info(`[BackupManager] Auto Backups are disabled`)
return
}
try {
var cronSchedule = this.serverSettings.backupSchedule
this.scheduleTask = cron.schedule(cronSchedule, this.runBackup.bind(this))
} catch (error) {
Logger.error(`[BackupManager] Failed to schedule backup cron ${this.serverSettings.backupSchedule}`, error)
}
}
updateCronSchedule() {
if (this.scheduleTask && !this.serverSettings.backupSchedule) {
Logger.info(`[BackupManager] Disabling backup schedule`)
if (this.scheduleTask.destroy) this.scheduleTask.destroy()
this.scheduleTask = null
} else if (!this.scheduleTask && this.serverSettings.backupSchedule) {
Logger.info(`[BackupManager] Starting backup schedule ${this.serverSettings.backupSchedule}`)
this.scheduleCron()
} else if (this.serverSettings.backupSchedule) {
Logger.info(`[BackupManager] Restarting backup schedule ${this.serverSettings.backupSchedule}`)
if (this.scheduleTask.destroy) this.scheduleTask.destroy()
this.scheduleCron()
}
}
async uploadBackup(req, res) {
var backupFile = req.files.file
if (Path.extname(backupFile.name) !== '.audiobookshelf') {
Logger.error(`[BackupManager] Invalid backup file uploaded "${backupFile.name}"`)
return res.status(500).send('Invalid backup file')
}
var tempPath = Path.join(this.BackupPath, backupFile.name)
var success = await backupFile.mv(tempPath).then(() => true).catch((error) => {
Logger.error('[BackupManager] Failed to move backup file', tempPath, error)
return false
})
if (!success) {
return res.status(500).send('Failed to move backup file into backups directory')
}
const zip = new StreamZip.async({ file: tempPath })
const data = await zip.entryData('details')
await zip.close()
var details = data.toString('utf8').split('\n')
var backup = new Backup({ details, fullPath: tempPath })
backup.fileSize = await getFileSize(backup.fullPath)
var existingBackupIndex = this.backups.findIndex(b => b.id === backup.id)
if (existingBackupIndex >= 0) {
Logger.warn(`[BackupManager] Backup already exists with id ${backup.id} - overwriting`)
this.backups.splice(existingBackupIndex, 1, backup)
} else {
this.backups.push(backup)
}
return res.json(this.backups.map(b => b.toJSON()))
}
async requestCreateBackup(res) {
var backupSuccess = await this.runBackup()
if (backupSuccess) res.json(this.backups.map(b => b.toJSON()))
else res.sendStatus(500)
}
async requestApplyBackup(backup) {
const zip = new StreamZip.async({ file: backup.fullPath })
await zip.extract('config/', global.ConfigPath)
if (backup.backupMetadataCovers) {
await zip.extract('metadata-items/', this.ItemsMetadataPath)
}
await this.db.reinit()
this.emitter('backup_applied')
}
async loadBackups() {
try {
var filesInDir = await fs.readdir(this.BackupPath)
for (let i = 0; i < filesInDir.length; i++) {
var filename = filesInDir[i]
if (filename.endsWith('.audiobookshelf')) {
var fullFilePath = Path.join(this.BackupPath, filename)
const zip = new StreamZip.async({ file: fullFilePath })
const data = await zip.entryData('details')
var details = data.toString('utf8').split('\n')
var backup = new Backup({ details, fullPath: fullFilePath })
backup.fileSize = await getFileSize(backup.fullPath)
var existingBackupWithId = this.backups.find(b => b.id === backup.id)
if (existingBackupWithId) {
Logger.warn(`[BackupManager] Backup already loaded with id ${backup.id} - ignoring`)
} else {
this.backups.push(backup)
}
Logger.debug(`[BackupManager] Backup found "${backup.id}"`)
zip.close()
}
}
Logger.info(`[BackupManager] ${this.backups.length} Backups Found`)
} catch (error) {
Logger.error('[BackupManager] Failed to load backups', error)
}
}
async runBackup() {
// Note: if the Metadata Path were inside the Config Path the archiver would end up zipping its own output in an endless loop (mitigated by the MaxBytesBeforeAbort check below)
Logger.info(`[BackupManager] Running Backup`)
var metadataItemsPath = this.serverSettings.backupMetadataCovers ? this.ItemsMetadataPath : null
var newBackup = new Backup()
const newBackData = {
backupMetadataCovers: this.serverSettings.backupMetadataCovers,
backupDirPath: this.BackupPath
}
newBackup.setData(newBackData)
var zipResult = await this.zipBackup(metadataItemsPath, newBackup).then(() => true).catch((error) => {
Logger.error(`[BackupManager] Backup Failed ${error}`)
return false
})
if (zipResult) {
Logger.info(`[BackupManager] Backup successful ${newBackup.id}`)
await filePerms.setDefault(newBackup.fullPath)
newBackup.fileSize = await getFileSize(newBackup.fullPath)
var existingIndex = this.backups.findIndex(b => b.id === newBackup.id)
if (existingIndex >= 0) {
this.backups.splice(existingIndex, 1, newBackup)
} else {
this.backups.push(newBackup)
}
// Remove the oldest backup if over the backupsToKeep limit
if (this.backups.length > this.serverSettings.backupsToKeep) {
this.backups.sort((a, b) => a.createdAt - b.createdAt)
var oldBackup = this.backups.shift()
Logger.debug(`[BackupManager] Removing old backup ${oldBackup.id}`)
this.removeBackup(oldBackup)
}
return true
} else {
return false
}
}
async removeBackup(backup) {
try {
Logger.debug(`[BackupManager] Removing Backup "${backup.fullPath}"`)
await fs.remove(backup.fullPath)
this.backups = this.backups.filter(b => b.id !== backup.id)
Logger.info(`[BackupManager] Backup "${backup.id}" Removed`)
} catch (error) {
Logger.error(`[BackupManager] Failed to remove backup`, error)
}
}
zipBackup(metadataItemsPath, backup) {
return new Promise((resolve, reject) => {
// create a file to stream archive data to
const output = fs.createWriteStream(backup.fullPath)
const archive = archiver('zip', {
zlib: { level: 9 } // Sets the compression level.
})
// listen for all archive data to be written
// 'close' event is fired only when a file descriptor is involved
output.on('close', () => {
Logger.info('[BackupManager]', archive.pointer() + ' total bytes')
resolve()
})
// This event is fired when the data source is drained no matter what was the data source.
// It is not part of this library but rather from the NodeJS Stream API.
// @see: https://nodejs.org/api/stream.html#stream_event_end
output.on('end', () => {
Logger.debug('Data has been drained')
})
output.on('finish', () => {
Logger.debug('Write Stream Finished')
})
output.on('error', (err) => {
Logger.debug('Write Stream Error', err)
reject(err)
})
// good practice to catch warnings (i.e. stat failures and other non-blocking errors)
archive.on('warning', function (err) {
if (err.code === 'ENOENT') {
// log warning
Logger.warn(`[BackupManager] Archiver warning: ${err.message}`)
} else {
// throw error
Logger.error(`[BackupManager] Archiver error: ${err.message}`)
// throw err
reject(err)
}
})
archive.on('error', function (err) {
Logger.error(`[BackupManager] Archiver error: ${err.message}`)
reject(err)
})
archive.on('progress', ({ fs: fsobj }) => {
if (fsobj.processedBytes > this.MaxBytesBeforeAbort) {
Logger.error(`[BackupManager] Archiver is too large - aborting to prevent endless loop, Bytes Processed: ${fsobj.processedBytes}`)
archive.abort()
setTimeout(() => {
this.removeBackup(backup)
output.destroy(new Error('Backup too large')) // Promise is rejected in the write stream 'error' event
}, 500)
}
})
// pipe archive data to the file
archive.pipe(output)
archive.directory(this.db.LibraryItemsPath, 'config/libraryItems')
archive.directory(this.db.UsersPath, 'config/users')
archive.directory(this.db.SessionsPath, 'config/sessions')
archive.directory(this.db.LibrariesPath, 'config/libraries')
archive.directory(this.db.SettingsPath, 'config/settings')
archive.directory(this.db.CollectionsPath, 'config/collections')
archive.directory(this.db.AuthorsPath, 'config/authors')
archive.directory(this.db.SeriesPath, 'config/series')
if (metadataItemsPath) {
Logger.debug(`[BackupManager] Backing up Metadata Items "${metadataItemsPath}"`)
archive.directory(metadataItemsPath, 'metadata-items')
}
archive.append(backup.detailsString, { name: 'details' })
archive.finalize()
})
}
}
module.exports = BackupManager
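
A minimal usage sketch, for context. The require path and the shapes of `db` (expected to expose `serverSettings` with `backupSchedule`, `backupsToKeep` and `backupMetadataCovers`, plus `reinit()`) and `emitter` are assumptions inferred from the constructor, not shown in this commit:

const BackupManager = require('./managers/BackupManager')

const backupManager = new BackupManager(db, emitter)
// Ensures the backups dir exists, loads existing .audiobookshelf files and schedules the cron
await backupManager.init()
// Call again whenever serverSettings.backupSchedule changes
backupManager.updateCronSchedule()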

View file

@@ -0,0 +1,121 @@
const Path = require('path')
const fs = require('fs-extra')
const stream = require('stream')
const Logger = require('../Logger')
const { resizeImage } = require('../utils/ffmpegHelpers')
class CacheManager {
constructor() {
this.CachePath = Path.join(global.MetadataPath, 'cache')
this.CoverCachePath = Path.join(this.CachePath, 'covers')
this.ImageCachePath = Path.join(this.CachePath, 'images')
}
async handleCoverCache(res, libraryItem, options = {}) {
const format = options.format || 'webp'
const width = options.width || 400
const height = options.height || null
res.type(`image/${format}`)
var path = Path.join(this.CoverCachePath, `${libraryItem.id}_${width}${height ? `x${height}` : ''}`) + '.' + format
// Cache exists
if (await fs.pathExists(path)) {
const r = fs.createReadStream(path)
const ps = new stream.PassThrough()
stream.pipeline(r, ps, (err) => {
if (err) {
Logger.error('[CacheManager] Failed to stream cached cover', err)
return res.sendStatus(400)
}
})
return ps.pipe(res)
}
// Write cache
await fs.ensureDir(this.CoverCachePath)
if (!libraryItem.media.coverPath || !await fs.pathExists(libraryItem.media.coverPath)) {
return res.sendStatus(404)
}
let writtenFile = await resizeImage(libraryItem.media.coverPath, path, width, height)
if (!writtenFile) return res.sendStatus(400)
var readStream = fs.createReadStream(writtenFile)
readStream.pipe(res)
}
purgeCoverCache(libraryItemId) {
return this.purgeEntityCache(libraryItemId, this.CoverCachePath)
}
purgeImageCache(entityId) {
return this.purgeEntityCache(entityId, this.ImageCachePath)
}
async purgeEntityCache(entityId, cachePath) {
// If purgeAll was called the cache directory may no longer exist - recreate it
await fs.ensureDir(cachePath)
return Promise.all((await fs.readdir(cachePath)).reduce((promises, file) => {
if (file.startsWith(entityId)) {
Logger.debug(`[CacheManager] Going to purge ${file}`);
promises.push(this.removeCache(Path.join(cachePath, file)))
}
return promises
}, []))
}
removeCache(path) {
if (!path) return false
return fs.pathExists(path).then((exists) => {
if (!exists) return false
return fs.unlink(path).then(() => true).catch((err) => {
Logger.error(`[CacheManager] Failed to remove cache "${path}"`, err)
return false
})
})
}
async purgeAll() {
if (await fs.pathExists(this.CachePath)) {
await fs.remove(this.CachePath).catch((error) => {
Logger.error(`[CacheManager] Failed to remove cache dir "${this.CachePath}"`, error)
})
}
}
async handleAuthorCache(res, author, options = {}) {
const format = options.format || 'webp'
const width = options.width || 400
const height = options.height || null
res.type(`image/${format}`)
var path = Path.join(this.ImageCachePath, `${author.id}_${width}${height ? `x${height}` : ''}`) + '.' + format
// Cache exists
if (await fs.pathExists(path)) {
const r = fs.createReadStream(path)
const ps = new stream.PassThrough()
stream.pipeline(r, ps, (err) => {
if (err) {
Logger.error('[CacheManager] Failed to stream cached image', err)
return res.sendStatus(400)
}
})
return ps.pipe(res)
}
// Write cache
await fs.ensureDir(this.ImageCachePath)
if (!author.imagePath || !await fs.pathExists(author.imagePath)) {
return res.sendStatus(404)
}
let writtenFile = await resizeImage(author.imagePath, path, width, height)
if (!writtenFile) return res.sendStatus(400)
var readStream = fs.createReadStream(writtenFile)
readStream.pipe(res)
}
}
module.exports = CacheManager
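
A hypothetical Express route showing how handleCoverCache might be called; the route path, query parameter names and the `router`/`db`/`cacheManager` variables are assumptions for illustration:

router.get('/api/items/:id/cover', async (req, res) => {
  const libraryItem = db.libraryItems.find(li => li.id === req.params.id)
  if (!libraryItem) return res.sendStatus(404)
  return cacheManager.handleCoverCache(res, libraryItem, {
    format: req.query.format, // falls back to 'webp'
    width: req.query.width ? parseInt(req.query.width) : null, // falls back to 400
    height: req.query.height ? parseInt(req.query.height) : null
  })
})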

View file

@@ -0,0 +1,252 @@
const fs = require('fs-extra')
const Path = require('path')
const axios = require('axios')
const Logger = require('../Logger')
const readChunk = require('read-chunk')
const imageType = require('image-type')
const filePerms = require('../utils/filePerms')
const globals = require('../utils/globals')
const { downloadFile } = require('../utils/fileUtils')
const { extractCoverArt } = require('../utils/ffmpegHelpers')
class CoverManager {
constructor(db, cacheManager) {
this.db = db
this.cacheManager = cacheManager
this.ItemMetadataPath = Path.posix.join(global.MetadataPath, 'items')
}
getCoverDirectory(libraryItem) {
if (this.db.serverSettings.storeCoverWithBook) {
return libraryItem.path
} else {
return Path.posix.join(this.ItemMetadataPath, libraryItem.id)
}
}
getFilesInDirectory(dir) {
// fs.readdir returns a promise, so a try/catch here would not catch the rejection
return fs.readdir(dir).catch((error) => {
Logger.error(`[CoverManager] Failed to get files in dir ${dir}`, error)
return []
})
}
removeFile(filepath) {
return fs.pathExists(filepath).then((exists) => {
if (!exists) Logger.warn(`[CoverManager] Attempting to remove file that does not exist ${filepath}`)
return exists ? fs.unlink(filepath) : false
}).catch((error) => {
Logger.error(`[CoverManager] Failed to remove file "${filepath}"`, error)
return false
})
}
// Remove existing covers that don't share the extension of the new cover
async removeOldCovers(dirpath, newCoverExt) {
var filesInDir = await this.getFilesInDirectory(dirpath)
for (let i = 0; i < filesInDir.length; i++) {
var file = filesInDir[i]
var _extname = Path.extname(file)
var _filename = Path.basename(file, _extname)
if (_filename === 'cover' && _extname !== newCoverExt) {
var filepath = Path.join(dirpath, file)
Logger.debug(`[CoverManager] Removing old cover from metadata "${filepath}"`)
await this.removeFile(filepath)
}
}
}
async checkFileIsValidImage(imagepath, removeOnInvalid = false) {
const buffer = await readChunk(imagepath, 0, 12)
const imgType = imageType(buffer)
if (!imgType) {
if (removeOnInvalid) await this.removeFile(imagepath)
return {
error: 'Invalid image'
}
}
if (!globals.SupportedImageTypes.includes(imgType.ext)) {
if (removeOnInvalid) await this.removeFile(imagepath)
return {
error: `Invalid image type ${imgType.ext} (Supported: ${globals.SupportedImageTypes.join(',')})`
}
}
return imgType
}
async uploadCover(libraryItem, coverFile) {
var extname = Path.extname(coverFile.name.toLowerCase())
if (!extname || !globals.SupportedImageTypes.includes(extname.slice(1))) {
return {
error: `Invalid image type ${extname} (Supported: ${globals.SupportedImageTypes.join(',')})`
}
}
var coverDirPath = this.getCoverDirectory(libraryItem)
await fs.ensureDir(coverDirPath)
var coverFullPath = Path.posix.join(coverDirPath, `cover${extname}`)
// Move cover from temp upload dir to destination
var success = await coverFile.mv(coverFullPath).then(() => true).catch((error) => {
Logger.error('[CoverManager] Failed to move cover file', coverFullPath, error)
return false
})
if (!success) {
return {
error: 'Failed to move cover into destination'
}
}
await this.removeOldCovers(coverDirPath, extname)
await this.cacheManager.purgeCoverCache(libraryItem.id)
Logger.info(`[CoverManager] Uploaded libraryItem cover "${coverFullPath}" for "${libraryItem.media.metadata.title}"`)
libraryItem.updateMediaCover(coverFullPath)
return {
cover: coverFullPath
}
}
async downloadCoverFromUrl(libraryItem, url) {
try {
var coverDirPath = this.getCoverDirectory(libraryItem)
await fs.ensureDir(coverDirPath)
var temppath = Path.posix.join(coverDirPath, 'cover')
var success = await downloadFile(url, temppath).then(() => true).catch((err) => {
Logger.error(`[CoverManager] Download image file failed for "${url}"`, err)
return false
})
if (!success) {
return {
error: 'Failed to download image from url'
}
}
var imgtype = await this.checkFileIsValidImage(temppath, true)
if (imgtype.error) {
return imgtype
}
var coverFilename = `cover.${imgtype.ext}`
var coverFullPath = Path.posix.join(coverDirPath, coverFilename)
await fs.rename(temppath, coverFullPath)
await this.removeOldCovers(coverDirPath, '.' + imgtype.ext)
await this.cacheManager.purgeCoverCache(libraryItem.id)
Logger.info(`[CoverManager] Downloaded libraryItem cover "${coverFullPath}" from url "${url}" for "${libraryItem.media.metadata.title}"`)
libraryItem.updateMediaCover(coverFullPath)
return {
cover: coverFullPath
}
} catch (error) {
Logger.error(`[CoverManager] Fetch cover image from url "${url}" failed`, error)
return {
error: 'Failed to fetch image from url'
}
}
}
async validateCoverPath(coverPath, libraryItem) {
// Invalid cover path
if (!coverPath || coverPath.startsWith('http:') || coverPath.startsWith('https:')) {
Logger.error(`[CoverManager] validate cover path invalid http url "${coverPath}"`)
return {
error: 'Invalid cover path'
}
}
coverPath = coverPath.replace(/\\/g, '/')
// Cover path already set on media
if (libraryItem.media.coverPath == coverPath) {
Logger.debug(`[CoverManager] validate cover path already set "${coverPath}"`)
return {
cover: coverPath,
updated: false
}
}
// Cover path does not exist
if (!await fs.pathExists(coverPath)) {
Logger.error(`[CoverManager] validate cover path does not exist "${coverPath}"`)
return {
error: 'Cover path does not exist'
}
}
// Check valid image at path
var imgtype = await this.checkFileIsValidImage(coverPath, true)
if (imgtype.error) {
return imgtype
}
var coverDirPath = this.getCoverDirectory(libraryItem)
// Cover path is not in correct directory - make a copy
if (!coverPath.startsWith(coverDirPath)) {
await fs.ensureDir(coverDirPath)
var coverFilename = `cover.${imgtype.ext}`
var newCoverPath = Path.posix.join(coverDirPath, coverFilename)
Logger.debug(`[CoverManager] validate cover path copy cover from "${coverPath}" to "${newCoverPath}"`)
var copySuccess = await fs.copy(coverPath, newCoverPath, { overwrite: true }).then(() => true).catch((error) => {
Logger.error(`[CoverManager] validate cover path failed to copy cover`, error)
return false
})
if (!copySuccess) {
return {
error: 'Failed to copy cover to dir'
}
}
await filePerms.setDefault(newCoverPath)
await this.removeOldCovers(coverDirPath, '.' + imgtype.ext)
Logger.debug(`[CoverManager] cover copy success`)
coverPath = newCoverPath
}
await this.cacheManager.purgeCoverCache(libraryItem.id)
libraryItem.updateMediaCover(coverPath)
return {
cover: coverPath,
updated: true
}
}
async saveEmbeddedCoverArt(libraryItem) {
var audioFileWithCover = libraryItem.media.audioFiles.find(af => af.embeddedCoverArt)
if (!audioFileWithCover) return false
var coverDirPath = this.getCoverDirectory(libraryItem)
await fs.ensureDir(coverDirPath)
var coverFilename = audioFileWithCover.embeddedCoverArt === 'png' ? 'cover.png' : 'cover.jpg'
var coverFilePath = Path.join(coverDirPath, coverFilename)
var coverAlreadyExists = await fs.pathExists(coverFilePath)
if (coverAlreadyExists) {
Logger.warn(`[CoverManager] Skipping embedded cover art extraction - cover already exists for "${libraryItem.media.metadata.title}"`)
return false
}
var success = await extractCoverArt(audioFileWithCover.metadata.path, coverFilePath)
if (success) {
libraryItem.updateMediaCover(coverFilePath)
return coverFilePath
}
return false
}
}
module.exports = CoverManager
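
For reference, callers of checkFileIsValidImage (and validateCoverPath) branch on the presence of an `error` key in the returned object. A short sketch, where the path and the `coverManager` variable are hypothetical:

const result = await coverManager.checkFileIsValidImage('/metadata/items/li_abc123/cover.jpg')
if (result.error) {
  Logger.error(result.error) // 'Invalid image' or an unsupported-type message
} else {
  Logger.info(`Valid ${result.ext} image (${result.mime})`) // image-type result, e.g. { ext: 'jpg', mime: 'image/jpeg' }
}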

View file

@@ -0,0 +1,387 @@
const Path = require('path')
const fs = require('fs-extra')
const archiver = require('archiver')
const workerThreads = require('worker_threads')
const Logger = require('../Logger')
const Download = require('../objects/Download')
const filePerms = require('../utils/filePerms')
const { getId } = require('../utils/index')
const { writeConcatFile, writeMetadataFile } = require('../utils/ffmpegHelpers')
const { getFileSize } = require('../utils/fileUtils')
const TAG = 'DownloadManager'
class DownloadManager {
constructor(db) {
this.db = db
this.downloadDirPath = Path.join(global.MetadataPath, 'downloads')
this.pendingDownloads = []
this.downloads = []
}
getDownload(downloadId) {
return this.downloads.find(d => d.id === downloadId)
}
async removeOrphanDownloads() {
try {
var dirs = await fs.readdir(this.downloadDirPath)
if (!dirs || !dirs.length) return true
await Promise.all(dirs.map(async (dirname) => {
var fullPath = Path.join(this.downloadDirPath, dirname)
Logger.info(`Removing Orphan Download ${dirname}`)
return fs.remove(fullPath)
}))
return true
} catch (error) {
Logger.error(`[${TAG}] Failed to remove orphan downloads`, error)
return false
}
}
downloadSocketRequest(socket, payload) {
var client = socket.sheepClient
var audiobook = this.db.audiobooks.find(a => a.id === payload.audiobookId)
var options = {
...payload
}
delete options.audiobookId
this.prepareDownload(client, audiobook, options)
}
removeSocketRequest(socket, downloadId) {
var download = this.downloads.find(d => d.id === downloadId)
if (!download) {
Logger.error(`[${TAG}] Remove download request - download not found ${downloadId}`)
return
}
this.removeDownload(download)
}
async prepareDownload(client, audiobook, options = {}) {
var downloadId = getId('dl')
var dlpath = Path.join(this.downloadDirPath, downloadId)
Logger.info(`Start Download for ${audiobook.id} - DownloadId: ${downloadId} - ${dlpath}`)
await fs.ensureDir(dlpath)
var downloadType = options.type || 'singleAudio'
delete options.type
var fileext = null
var audiobookDirname = Path.basename(audiobook.path)
if (downloadType === 'singleAudio') {
var audioFileType = options.audioFileType || '.m4b'
delete options.audioFileType
if (audioFileType === 'same') {
var firstTrack = audiobook.tracks[0]
audioFileType = firstTrack.ext
}
fileext = audioFileType
} else if (downloadType === 'zip') {
fileext = '.zip'
}
var filename = audiobookDirname + fileext
var downloadData = {
id: downloadId,
audiobookId: audiobook.id,
type: downloadType,
options: options,
dirpath: dlpath,
fullPath: Path.join(dlpath, filename),
filename,
ext: fileext,
userId: (client && client.user) ? client.user.id : null,
socket: (client && client.socket) ? client.socket : null
}
var download = new Download()
download.setData(downloadData)
download.setTimeoutTimer(this.downloadTimedOut.bind(this))
if (downloadData.socket) {
downloadData.socket.emit('download_started', download.toJSON())
}
if (download.type === 'singleAudio') {
this.processSingleAudioDownload(audiobook, download)
} else if (download.type === 'zip') {
this.processZipDownload(audiobook, download)
}
}
async processZipDownload(audiobook, download) {
this.pendingDownloads.push({
id: download.id,
download
})
Logger.info(`[DownloadManager] Processing Zip download ${download.fullPath}`)
var success = await this.zipAudiobookDir(audiobook.fullPath, download.fullPath).then(() => {
return true
}).catch((error) => {
Logger.error('[DownloadManager] Process Zip Failed', error)
return false
})
this.sendResult(download, { success })
}
zipAudiobookDir(audiobookPath, downloadPath) {
return new Promise((resolve, reject) => {
// create a file to stream archive data to
const output = fs.createWriteStream(downloadPath)
const archive = archiver('zip', {
zlib: { level: 9 } // Sets the compression level.
})
// listen for all archive data to be written
// 'close' event is fired only when a file descriptor is involved
output.on('close', () => {
Logger.info(archive.pointer() + ' total bytes')
Logger.debug('archiver has been finalized and the output file descriptor has closed.')
resolve()
})
// This event is fired when the data source is drained no matter what was the data source.
// It is not part of this library but rather from the NodeJS Stream API.
// @see: https://nodejs.org/api/stream.html#stream_event_end
output.on('end', () => {
Logger.debug('Data has been drained')
})
// good practice to catch warnings (i.e. stat failures and other non-blocking errors)
archive.on('warning', function (err) {
if (err.code === 'ENOENT') {
// log warning
Logger.warn(`[DownloadManager] Archiver warning: ${err.message}`)
} else {
// throw error
Logger.error(`[DownloadManager] Archiver error: ${err.message}`)
// throw err
reject(err)
}
})
archive.on('error', function (err) {
Logger.error(`[DownloadManager] Archiver error: ${err.message}`)
reject(err)
})
// pipe archive data to the file
archive.pipe(output)
archive.directory(audiobookPath, false)
archive.finalize()
})
}
async processSingleAudioDownload(audiobook, download) {
// If changing audio file type then encoding is needed
var audioRequiresEncode = audiobook.tracks[0].ext !== download.ext
var shouldIncludeCover = download.includeCover && audiobook.book.cover
var firstTrackIsM4b = audiobook.tracks[0].ext.toLowerCase() === '.m4b'
var isOneTrack = audiobook.tracks.length === 1
const ffmpegInputs = []
if (!isOneTrack) {
var concatFilePath = Path.join(download.dirpath, 'files.txt')
await writeConcatFile(audiobook.tracks, concatFilePath)
ffmpegInputs.push({
input: concatFilePath,
options: ['-safe 0', '-f concat']
})
} else {
ffmpegInputs.push({
input: audiobook.tracks[0].fullPath,
options: firstTrackIsM4b ? ['-f mp4'] : []
})
}
const logLevel = process.env.NODE_ENV === 'production' ? 'error' : 'warning'
var ffmpegOptions = [`-loglevel ${logLevel}`]
var ffmpegOutputOptions = []
if (audioRequiresEncode) {
ffmpegOptions = ffmpegOptions.concat([
'-map 0:a',
'-acodec aac',
'-ac 2',
'-b:a 64k',
'-id3v2_version 3'
])
} else {
ffmpegOptions.push('-max_muxing_queue_size 1000')
if (isOneTrack && firstTrackIsM4b && !shouldIncludeCover) {
ffmpegOptions.push('-c copy')
} else {
ffmpegOptions.push('-c:a copy')
}
}
if (download.ext === '.m4b') {
Logger.info('Concatenating m4b files requires -f mp4')
ffmpegOutputOptions.push('-f mp4')
}
if (download.includeMetadata) {
var metadataFilePath = Path.join(download.dirpath, 'metadata.txt')
await writeMetadataFile(audiobook, metadataFilePath)
ffmpegInputs.push({
input: metadataFilePath
})
ffmpegOptions.push('-map_metadata 1')
}
if (shouldIncludeCover) {
var _cover = (audiobook.book.coverFullPath || '').replace(/\\/g, '/')
// Supporting old local file prefix
var bookCoverPath = audiobook.book.cover ? audiobook.book.cover.replace(/\\/g, '/') : null
if (!_cover && bookCoverPath && bookCoverPath.startsWith('/local')) {
_cover = Path.posix.join(global.AudiobookPath, bookCoverPath.replace('/local', ''))
Logger.debug('Local cover url', _cover)
}
ffmpegInputs.push({
input: _cover,
options: ['-f image2pipe']
})
ffmpegOptions.push('-vf [2:v]crop=trunc(iw/2)*2:trunc(ih/2)*2')
ffmpegOptions.push('-map 2:v')
}
var workerData = {
inputs: ffmpegInputs,
options: ffmpegOptions,
outputOptions: ffmpegOutputOptions,
output: download.fullPath,
}
var worker = null
try {
var workerPath = Path.join(global.appRoot, 'server/utils/downloadWorker.js')
worker = new workerThreads.Worker(workerPath, { workerData })
} catch (error) {
Logger.error(`[${TAG}] Start worker thread failed`, error)
if (download.socket) {
var downloadJson = download.toJSON()
download.socket.emit('download_failed', downloadJson)
}
this.removeDownload(download)
return
}
worker.on('message', (message) => {
if (message != null && typeof message === 'object') {
if (message.type === 'RESULT') {
if (!download.isTimedOut) {
this.sendResult(download, message)
}
} else if (message.type === 'FFMPEG') {
if (Logger[message.level]) {
Logger[message.level](message.log)
}
}
} else {
Logger.error('Invalid worker message', message)
}
})
this.pendingDownloads.push({
id: download.id,
download,
worker
})
}
async downloadTimedOut(download) {
Logger.info(`[DownloadManager] Download ${download.id} timed out (${download.timeoutTimeMs}ms)`)
if (download.socket) {
var downloadJson = download.toJSON()
downloadJson.isTimedOut = true
download.socket.emit('download_failed', downloadJson)
}
this.removeDownload(download)
}
async downloadExpired(download) {
Logger.info(`[DownloadManager] Download ${download.id} expired`)
if (download.socket) {
download.socket.emit('download_expired', download.toJSON())
}
this.removeDownload(download)
}
async sendResult(download, result) {
download.clearTimeoutTimer()
// Remove pending download
this.pendingDownloads = this.pendingDownloads.filter(d => d.id !== download.id)
if (result.isKilled) {
if (download.socket) {
download.socket.emit('download_killed', download.toJSON())
}
return
}
if (!result.success) {
if (download.socket) {
download.socket.emit('download_failed', download.toJSON())
}
this.removeDownload(download)
return
}
// Set file permissions and ownership
await filePerms.setDefault(download.fullPath)
var filesize = await getFileSize(download.fullPath)
download.setComplete(filesize)
if (download.socket) {
download.socket.emit('download_ready', download.toJSON())
}
download.setExpirationTimer(this.downloadExpired.bind(this))
this.downloads.push(download)
Logger.info(`[DownloadManager] Download Ready ${download.id}`)
}
async removeDownload(download) {
Logger.info('[DownloadManager] Removing download ' + download.id)
download.clearTimeoutTimer()
download.clearExpirationTimer()
var pendingDl = this.pendingDownloads.find(d => d.id === download.id)
if (pendingDl) {
this.pendingDownloads = this.pendingDownloads.filter(d => d.id !== download.id)
Logger.warn(`[DownloadManager] Removing download in progress - stopping worker`)
if (pendingDl.worker) {
try {
pendingDl.worker.postMessage('STOP')
} catch (error) {
Logger.error('[DownloadManager] Error posting stop message to worker', error)
}
}
}
await fs.remove(download.dirpath).then(() => {
Logger.info('[DownloadManager] Deleted download', download.dirpath)
}).catch((err) => {
Logger.error('[DownloadManager] Failed to delete download', err)
})
this.downloads = this.downloads.filter(d => d.id !== download.id)
}
}
module.exports = DownloadManager
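
A sketch of the socket payload downloadSocketRequest expects. The event name and example values are assumptions; the keys mirror what prepareDownload reads before deleting them from options:

socket.emit('download', {
  audiobookId: 'ab_123456', // looked up in db.audiobooks
  type: 'singleAudio',      // or 'zip' to zip the audiobook directory as-is
  audioFileType: '.m4b',    // or 'same' to keep the first track's extension
  includeMetadata: true,    // writes metadata.txt and maps it with -map_metadata 1
  includeCover: true        // muxes the book cover in as a video stream
})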

View file

@@ -0,0 +1,133 @@
const Path = require('path')
const fs = require('fs-extra')
const DailyLog = require('../objects/DailyLog')
const Logger = require('../Logger')
const TAG = '[LogManager]'
class LogManager {
constructor(db) {
this.db = db
this.logDirPath = Path.join(global.MetadataPath, 'logs')
this.dailyLogDirPath = Path.join(this.logDirPath, 'daily')
this.currentDailyLog = null
this.dailyLogBuffer = []
this.dailyLogFiles = []
}
get serverSettings() {
return this.db.serverSettings || {}
}
get loggerDailyLogsToKeep() {
return this.serverSettings.loggerDailyLogsToKeep || 7
}
async init() {
// Load daily logs
await this.scanLogFiles()
// Remove extra daily logs beyond the keep limit
if (this.dailyLogFiles.length > this.loggerDailyLogsToKeep) {
var dailyLogFilesCopy = [...this.dailyLogFiles]
for (let i = 0; i < dailyLogFilesCopy.length - this.loggerDailyLogsToKeep; i++) {
var logFileToRemove = dailyLogFilesCopy[i]
await this.removeLogFile(logFileToRemove)
}
}
var currentDailyLogFilename = DailyLog.getCurrentDailyLogFilename()
Logger.info(TAG, `Init current daily log filename: ${currentDailyLogFilename}`)
this.currentDailyLog = new DailyLog()
this.currentDailyLog.setData({ dailyLogDirPath: this.dailyLogDirPath })
if (this.dailyLogFiles.includes(currentDailyLogFilename)) {
Logger.debug(TAG, `Daily log file already exists - set in Logger`)
await this.currentDailyLog.loadLogs()
} else {
this.dailyLogFiles.push(this.currentDailyLog.filename)
}
// Log buffered Logs
if (this.dailyLogBuffer.length) {
this.dailyLogBuffer.forEach((logObj) => {
this.currentDailyLog.appendLog(logObj)
})
this.dailyLogBuffer = []
}
}
async scanLogFiles() {
await fs.ensureDir(this.dailyLogDirPath)
var dailyFiles = await fs.readdir(this.dailyLogDirPath)
if (dailyFiles && dailyFiles.length) {
dailyFiles.forEach((logFile) => {
if (Path.extname(logFile) === '.txt') {
Logger.debug('Daily Log file found', logFile)
this.dailyLogFiles.push(logFile)
} else {
Logger.debug(TAG, 'Unknown File in Daily log files dir', logFile)
}
})
}
this.dailyLogFiles.sort()
}
async removeOldestLog() {
if (!this.dailyLogFiles.length) return
var oldestLog = this.dailyLogFiles[0]
return this.removeLogFile(oldestLog)
}
async removeLogFile(filename) {
var fullPath = Path.join(this.dailyLogDirPath, filename)
var exists = await fs.pathExists(fullPath)
if (!exists) {
Logger.error(TAG, 'Invalid log file path, does not exist: ' + fullPath)
this.dailyLogFiles = this.dailyLogFiles.filter(dlf => dlf !== filename)
} else {
try {
await fs.unlink(fullPath)
Logger.info(TAG, 'Removed daily log: ' + filename)
this.dailyLogFiles = this.dailyLogFiles.filter(dlf => dlf !== filename)
} catch (error) {
Logger.error(TAG, 'Failed to unlink log file ' + fullPath)
}
}
}
logToFile(logObj) {
if (!this.currentDailyLog) {
this.dailyLogBuffer.push(logObj)
return
}
// Roll to a new log file when the date changes
if (this.currentDailyLog.id !== DailyLog.getCurrentDateString()) {
var newDailyLog = new DailyLog()
newDailyLog.setData({ dailyLogDirPath: this.dailyLogDirPath })
this.currentDailyLog = newDailyLog
if (this.dailyLogFiles.length > this.loggerDailyLogsToKeep) {
this.removeOldestLog()
}
}
// Append log line to log file
this.currentDailyLog.appendLog(logObj)
}
socketRequestDailyLogs(socket) {
if (!this.currentDailyLog) {
return
}
var lastLogs = this.currentDailyLog.logs.slice(-5000)
socket.emit('daily_logs', lastLogs)
}
}
module.exports = LogManager
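
A sketch of how the Logger might hand log lines to this manager; the exact logObj shape is an assumption (whatever DailyLog.appendLog accepts):

// Buffered in dailyLogBuffer until init() runs, then appended to the current daily log
logManager.logToFile({
  timestamp: (new Date()).toISOString(),
  message: 'Server started',
  levelName: 'INFO',
  level: 2
})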

View file

@@ -0,0 +1,143 @@
const Path = require('path')
const { PlayMethod } = require('../utils/constants')
const PlaybackSession = require('../objects/PlaybackSession')
const Stream = require('../objects/Stream')
const Logger = require('../Logger')
class PlaybackSessionManager {
constructor(db, emitter, clientEmitter) {
this.db = db
this.StreamsPath = Path.join(global.MetadataPath, 'streams')
this.emitter = emitter
this.clientEmitter = clientEmitter
this.sessions = []
}
getSession(sessionId) {
return this.sessions.find(s => s.id === sessionId)
}
getUserSession(userId) {
return this.sessions.find(s => s.userId === userId)
}
getStream(sessionId) {
var session = this.getSession(sessionId)
return session ? session.stream : null
}
async startSessionRequest(user, libraryItem, mediaEntity, options, res) {
const session = await this.startSession(user, libraryItem, mediaEntity, options)
res.json(session.toJSONForClient())
}
async syncSessionRequest(user, session, payload, res) {
await this.syncSession(user, session, payload)
res.json(session.toJSONForClient())
}
async closeSessionRequest(user, session, syncData, res) {
await this.closeSession(user, session, syncData)
res.sendStatus(200)
}
async startSession(user, libraryItem, mediaEntity, options) {
var shouldDirectPlay = options.forceDirectPlay || (!options.forceTranscode && mediaEntity.checkCanDirectPlay(options))
const userProgress = user.getLibraryItemProgress(libraryItem.id)
var userStartTime = 0
if (userProgress) userStartTime = userProgress.currentTime || 0
const newPlaybackSession = new PlaybackSession()
newPlaybackSession.setData(libraryItem, mediaEntity, user)
var audioTracks = []
if (shouldDirectPlay) {
Logger.debug(`[PlaybackSessionManager] "${user.username}" starting direct play session for media entity "${mediaEntity.id}"`)
audioTracks = mediaEntity.getDirectPlayTracklist(libraryItem.id)
newPlaybackSession.playMethod = PlayMethod.DIRECTPLAY
} else {
Logger.debug(`[PlaybackSessionManager] "${user.username}" starting stream session for media entity "${mediaEntity.id}"`)
var stream = new Stream(newPlaybackSession.id, this.StreamsPath, user, libraryItem, mediaEntity, userStartTime, this.clientEmitter.bind(this))
await stream.generatePlaylist()
audioTracks = [stream.getAudioTrack()]
newPlaybackSession.stream = stream
newPlaybackSession.playMethod = PlayMethod.TRANSCODE
stream.on('closed', () => {
Logger.debug(`[PlaybackSessionManager] Stream closed for session "${newPlaybackSession.id}"`)
newPlaybackSession.stream = null
})
}
newPlaybackSession.currentTime = userStartTime
newPlaybackSession.audioTracks = audioTracks
// Will save on the first sync
user.currentSessionId = newPlaybackSession.id
this.sessions.push(newPlaybackSession)
this.emitter('user_stream_update', user.toJSONForPublic(this.sessions, this.db.libraryItems))
return newPlaybackSession
}
async syncSession(user, session, syncData) {
var libraryItem = this.db.libraryItems.find(li => li.id === session.libraryItemId)
if (!libraryItem) {
Logger.error(`[PlaybackSessionManager] syncSession Library Item not found "${session.libraryItemId}"`)
return
}
session.currentTime = syncData.currentTime
session.addListeningTime(syncData.timeListened)
Logger.debug(`[PlaybackSessionManager] syncSession "${session.id}" | Total Time Listened: ${session.timeListening}`)
const itemProgressUpdate = {
mediaEntityId: syncData.mediaEntityId || null,
duration: syncData.duration,
currentTime: syncData.currentTime,
progress: session.progress
}
var wasUpdated = user.createUpdateLibraryItemProgress(libraryItem, itemProgressUpdate)
if (wasUpdated) {
await this.db.updateEntity('user', user)
var itemProgress = user.getLibraryItemProgress(session.libraryItemId)
this.clientEmitter(user.id, 'user_item_progress_updated', {
id: itemProgress.id,
data: itemProgress.toJSON()
})
}
this.saveSession(session)
}
async closeSession(user, session, syncData = null) {
if (syncData) {
await this.syncSession(user, session, syncData)
} else {
await this.saveSession(session)
}
Logger.debug(`[PlaybackSessionManager] closeSession "${session.id}"`)
this.emitter('user_stream_update', user.toJSONForPublic(this.sessions, this.db.libraryItems))
return this.removeSession(session.id)
}
saveSession(session) {
if (!session.timeListening) return // Do not save a session with no listening time
if (session.lastSave) {
return this.db.updateEntity('session', session)
} else {
session.lastSave = Date.now()
return this.db.insertEntity('session', session)
}
}
async removeSession(sessionId) {
var session = this.sessions.find(s => s.id === sessionId)
if (!session) return
if (session.stream) {
await session.stream.close()
}
this.sessions = this.sessions.filter(s => s.id !== sessionId)
Logger.debug(`[PlaybackSessionManager] Removed session "${sessionId}"`)
}
}
module.exports = PlaybackSessionManager
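
A sketch of a sync call from a route handler. The syncData keys mirror what syncSession reads; the surrounding objects and values are assumptions:

// duration/currentTime/timeListened are in seconds
await playbackSessionManager.syncSessionRequest(req.user, session, {
  mediaEntityId: 'me_abc123',
  duration: 31620,
  currentTime: 1042.5,
  timeListened: 30
}, res)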

View file

@@ -0,0 +1,12 @@
class PodcastManager {
constructor(db) {
this.db = db
this.downloadQueue = []
}
async downloadPodcasts(podcasts, targetDir) {
// Stub - podcast episode downloading is not yet implemented in this commit
}
}
module.exports = PodcastManager
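
The stub only establishes the class shape; presumably it will be wired up like the other managers (this usage is an assumption):

const PodcastManager = require('./managers/PodcastManager')
const podcastManager = new PodcastManager(db)
await podcastManager.downloadPodcasts(podcasts, targetDir) // currently a no-op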