Adding download tab and download manager, ffmpeg in worker thread

This commit is contained in:
Mark Cooper 2021-09-04 14:17:26 -05:00
parent a86bda59f6
commit e4dac5dd05
28 changed files with 757 additions and 60 deletions

View file

@ -1,15 +1,16 @@
const express = require('express')
const Logger = require('./Logger')
const User = require('./User')
const User = require('./objects/User')
const { isObject } = require('./utils/index')
class ApiController {
constructor(db, scanner, auth, streamManager, rssFeeds, emitter) {
constructor(db, scanner, auth, streamManager, rssFeeds, downloadManager, emitter) {
this.db = db
this.scanner = scanner
this.auth = auth
this.streamManager = streamManager
this.rssFeeds = rssFeeds
this.downloadManager = downloadManager
this.emitter = emitter
this.router = express()
@ -40,13 +41,13 @@ class ApiController {
this.router.patch('/user/password', this.userChangePassword.bind(this))
this.router.patch('/user/settings', this.userUpdateSettings.bind(this))
this.router.post('/authorize', this.authorize.bind(this))
this.router.get('/genres', this.getGenres.bind(this))
this.router.post('/feed', this.openRssFeed.bind(this))
this.router.get('/download/:id', this.download.bind(this))
}
find(req, res) {
@ -307,6 +308,30 @@ class ApiController {
})
}
async download(req, res) {
var downloadId = req.params.id
Logger.info('Download Request', downloadId)
var download = this.downloadManager.getDownload(downloadId)
if (!download) {
Logger.error('Download request not found', downloadId)
return res.sendStatus(404)
}
var options = {
headers: {
// 'Content-Disposition': `attachment; filename=${download.filename}`,
'Content-Type': download.mimeType
// 'Content-Length': download.size
}
}
Logger.info('Starting Download', options, 'SIZE', download.size)
res.download(download.fullPath, download.filename, options, (err) => {
if (err) {
Logger.error('Download Error', err)
}
})
}
getGenres(req, res) {
res.json({
genres: this.db.getGenres()

View file

@ -1,10 +1,9 @@
const fs = require('fs-extra')
const Path = require('path')
const njodb = require("njodb")
const jwt = require('jsonwebtoken')
const Logger = require('./Logger')
const Audiobook = require('./Audiobook')
const User = require('./User')
const Audiobook = require('./objects/Audiobook')
const User = require('./objects/User')
class Db {
constructor(CONFIG_PATH) {

212
server/DownloadManager.js Normal file
View file

@ -0,0 +1,212 @@
const Path = require('path')
const fs = require('fs-extra')
const workerThreads = require('worker_threads')
const Logger = require('./Logger')
const Download = require('./objects/Download')
const { writeConcatFile } = require('./utils/ffmpegHelpers')
const { getFileSize } = require('./utils/fileUtils')
class DownloadManager {
constructor(db, MetadataPath, emitter) {
this.db = db
this.MetadataPath = MetadataPath
this.emitter = emitter
this.downloadDirPath = Path.join(this.MetadataPath, 'downloads')
this.pendingDownloads = []
this.downloads = []
}
getDownload(downloadId) {
return this.downloads.find(d => d.id === downloadId)
}
async removeOrphanDownloads() {
try {
var dirs = await fs.readdir(this.downloadDirPath)
if (!dirs || !dirs.length) return true
await Promise.all(dirs.map(async (dirname) => {
var fullPath = Path.join(this.downloadDirPath, dirname)
Logger.info(`Removing Orphan Download ${dirname}`)
return fs.remove(fullPath)
}))
return true
} catch (error) {
return false
}
}
downloadSocketRequest(socket, payload) {
var client = socket.sheepClient
var audiobook = this.db.audiobooks.find(a => a.id === payload.audiobookId)
var options = {
...payload
}
delete options.audiobookId
this.prepareDownload(client, audiobook, options)
}
getBestFileType(tracks) {
if (!tracks || !tracks.length) {
return null
}
var firstTrack = tracks[0]
return firstTrack.ext.substr(1)
}
async prepareDownload(client, audiobook, options = {}) {
var downloadId = (Math.trunc(Math.random() * 1000) + Date.now()).toString(36)
var dlpath = Path.join(this.downloadDirPath, downloadId)
Logger.info(`Start Download for ${audiobook.id} - DownloadId: ${downloadId} - ${dlpath}`)
await fs.ensureDir(dlpath)
var downloadType = options.type || 'singleAudio'
delete options.type
var filepath = null
var filename = null
var fileext = null
var audiobookDirname = Path.basename(audiobook.path)
if (downloadType === 'singleAudio') {
var audioFileType = options.audioFileType || this.getBestFileType(audiobook.tracks)
delete options.audioFileType
filename = audiobookDirname + '.' + audioFileType
fileext = '.' + audioFileType
filepath = Path.join(dlpath, filename)
}
var downloadData = {
id: downloadId,
audiobookId: audiobook.id,
type: downloadType,
options: options,
dirpath: dlpath,
fullPath: filepath,
filename,
ext: fileext,
userId: (client && client.user) ? client.user.id : null,
socket: (client && client.socket) ? client.socket : null
}
var download = new Download()
download.setData(downloadData)
if (downloadData.socket) {
downloadData.socket.emit('download_started', download.toJSON())
}
if (download.type === 'singleAudio') {
this.processSingleAudioDownload(audiobook, download)
}
}
async processSingleAudioDownload(audiobook, download) {
// var ffmpeg = Ffmpeg()
var concatFilePath = Path.join(download.dirpath, 'files.txt')
await writeConcatFile(audiobook.tracks, concatFilePath)
var workerData = {
input: concatFilePath,
inputFormat: 'concat',
inputOption: '-safe 0',
options: [
'-loglevel warning',
'-map 0:a',
'-c:a copy'
],
output: download.fullPath
}
var worker = new workerThreads.Worker('./server/utils/downloadWorker.js', { workerData })
worker.on('message', (message) => {
if (message != null && typeof message === 'object') {
if (message.type === 'RESULT') {
this.sendResult(download, message)
}
} else {
Logger.error('Invalid worker message', message)
}
})
this.pendingDownloads.push({
id: download.id,
download,
worker
})
}
async downloadExpired(download) {
Logger.info(`[DownloadManager] Download ${download.id} expired`)
if (download.socket) {
download.socket.emit('download_expired', download.toJSON())
}
this.removeDownload(download)
}
async sendResult(download, result) {
// Remove pending download
this.pendingDownloads = this.pendingDownloads.filter(d => d.id !== download.id)
if (result.isKilled) {
if (download.socket) {
download.socket.emit('download_killed', download.toJSON())
}
return
}
if (!result.success) {
if (download.socket) {
download.socket.emit('download_failed', download.toJSON())
}
this.removeDownload(download)
return
}
// Remove files.txt if it was used
if (download.type === 'singleAudio') {
var concatFilePath = Path.join(download.dirpath, 'files.txt')
try {
await fs.remove(concatFilePath)
} catch (error) {
Logger.error('[DownloadManager] Failed to remove files.txt')
}
}
result.size = await getFileSize(download.fullPath)
download.setComplete(result)
if (download.socket) {
download.socket.emit('download_ready', download.toJSON())
}
download.setExpirationTimer(this.downloadExpired.bind(this))
this.downloads.push(download)
Logger.info(`[DownloadManager] Download Ready ${download.id}`)
}
async removeDownload(download) {
Logger.info('[DownloadManager] Removing download ' + download.id)
var pendingDl = this.pendingDownloads.find(d => d.id === download.id)
if (pendingDl) {
this.pendingDownloads = this.pendingDownloads.filter(d => d.id !== download.id)
Logger.warn(`[DownloadManager] Removing download in progress - stopping worker`)
try {
pendingDl.worker.postMessage('STOP')
} catch (error) {
Logger.error('[DownloadManager] Error posting stop message to worker', error)
}
}
await fs.remove(download.dirpath).then(() => {
Logger.info('[DownloadManager] Deleted download', download.dirpath)
}).catch((err) => {
Logger.error('[DownloadManager] Failed to delete download', err)
})
this.downloads = this.downloads.filter(d => d.id !== download.id)
}
}
module.exports = DownloadManager

View file

@ -1,6 +1,6 @@
const Logger = require('./Logger')
const BookFinder = require('./BookFinder')
const Audiobook = require('./Audiobook')
const Audiobook = require('./objects/Audiobook')
const audioFileScanner = require('./utils/audioFileScanner')
const { getAllAudiobookFiles } = require('./utils/scandir')
const { comparePaths, getIno } = require('./utils/index')

View file

@ -12,6 +12,7 @@ const ApiController = require('./ApiController')
const HlsController = require('./HlsController')
const StreamManager = require('./StreamManager')
const RssFeeds = require('./RssFeeds')
const DownloadManager = require('./DownloadManager')
const Logger = require('./Logger')
class Server {
@ -32,10 +33,10 @@ class Server {
this.scanner = new Scanner(this.AudiobookPath, this.MetadataPath, this.db, this.emitter.bind(this))
this.streamManager = new StreamManager(this.db, this.MetadataPath)
this.rssFeeds = new RssFeeds(this.Port, this.db)
this.apiController = new ApiController(this.db, this.scanner, this.auth, this.streamManager, this.rssFeeds, this.emitter.bind(this))
this.downloadManager = new DownloadManager(this.db, this.MetadataPath, this.emitter.bind(this))
this.apiController = new ApiController(this.db, this.scanner, this.auth, this.streamManager, this.rssFeeds, this.downloadManager, this.emitter.bind(this))
this.hlsController = new HlsController(this.db, this.scanner, this.auth, this.streamManager, this.emitter.bind(this), this.MetadataPath)
this.server = null
this.io = null
@ -90,6 +91,7 @@ class Server {
async init() {
Logger.info('[Server] Init')
await this.streamManager.removeOrphanStreams()
await this.downloadManager.removeOrphanDownloads()
await this.db.init()
this.auth.init()
@ -186,6 +188,7 @@ class Server {
socket.on('open_stream', (audiobookId) => this.streamManager.openStreamSocketRequest(socket, audiobookId))
socket.on('close_stream', () => this.streamManager.closeStreamRequest(socket))
socket.on('stream_update', (payload) => this.streamManager.streamUpdate(socket, payload))
socket.on('download', (payload) => this.downloadManager.downloadSocketRequest(socket, payload))
socket.on('test', () => {
socket.emit('test_received', socket.id)
})

View file

@ -1,4 +1,4 @@
const Stream = require('./Stream')
const Stream = require('./objects/Stream')
const StreamTest = require('./test/StreamTest')
const Logger = require('./Logger')
const fs = require('fs-extra')

View file

@ -1,4 +1,4 @@
var { bytesPretty } = require('./utils/fileUtils')
var { bytesPretty } = require('../utils/fileUtils')
class AudioTrack {
constructor(audioTrack = null) {

View file

@ -1,7 +1,7 @@
const Path = require('path')
const { bytesPretty, elapsedPretty } = require('./utils/fileUtils')
const { comparePaths, getIno } = require('./utils/index')
const Logger = require('./Logger')
const { bytesPretty, elapsedPretty } = require('../utils/fileUtils')
const { comparePaths, getIno } = require('../utils/index')
const Logger = require('../Logger')
const Book = require('./Book')
const AudioTrack = require('./AudioTrack')
const AudioFile = require('./AudioFile')

View file

@ -1,6 +1,7 @@
const Path = require('path')
const Logger = require('./Logger')
const parseAuthors = require('./utils/parseAuthors')
const Logger = require('../Logger')
const parseAuthors = require('../utils/parseAuthors')
class Book {
constructor(book = null) {
this.olid = null

107
server/objects/Download.js Normal file
View file

@ -0,0 +1,107 @@
const DEFAULT_EXPIRATION = 1000 * 60 * 10 // 10 minutes
class Download {
constructor(download) {
this.id = null
this.audiobookId = null
this.type = null
this.options = {}
this.dirpath = null
this.fullPath = null
this.ext = null
this.filename = null
this.size = 0
this.userId = null
this.socket = null // Socket to notify when complete
this.isReady = false
this.startedAt = null
this.finishedAt = null
this.expiresAt = null
this.expirationTimeMs = 0
if (download) {
this.construct(download)
}
}
get mimeType() {
if (this.ext === '.mp3' || this.ext === '.m4b' || this.ext === '.m4a') {
return 'audio/mpeg'
} else if (this.ext === '.mp4') {
return 'audio/mp4'
} else if (this.ext === '.ogg') {
return 'audio/ogg'
} else if (this.ext === '.aac' || this.ext === '.m4p') {
return 'audio/aac'
}
return 'audio/mpeg'
}
toJSON() {
return {
id: this.id,
audiobookId: this.audiobookId,
type: this.type,
options: this.options,
dirpath: this.dirpath,
fullPath: this.fullPath,
ext: this.ext,
filename: this.filename,
size: this.size,
userId: this.userId,
isReady: this.isReady,
startedAt: this.startedAt,
finishedAt: this.finishedAt,
expirationSeconds: this.expirationSeconds
}
}
construct(download) {
this.id = download.id
this.audiobookId = download.audiobookId
this.type = download.type
this.options = { ...download.options }
this.dirpath = download.dirpath
this.fullPath = download.fullPath
this.ext = download.ext
this.filename = download.filename
this.size = download.size || 0
this.userId = download.userId
this.socket = download.socket || null
this.isReady = !!download.isReady
this.startedAt = download.startedAt
this.finishedAt = download.finishedAt || null
this.expirationTimeMs = download.expirationTimeMs || DEFAULT_EXPIRATION
this.expiresAt = download.expiresAt || null
}
setData(downloadData) {
downloadData.startedAt = Date.now()
downloadData.isProcessing = true
this.construct(downloadData)
}
setComplete(fileSize) {
this.finishedAt = Date.now()
this.size = fileSize
this.isReady = true
this.expiresAt = this.finishedAt + this.expirationTimeMs
}
setExpirationTimer(callback) {
setTimeout(() => {
if (callback) {
callback(this)
}
}, this.expirationTimeMs)
}
}
module.exports = Download

View file

@ -2,9 +2,10 @@ const Ffmpeg = require('fluent-ffmpeg')
const EventEmitter = require('events')
const Path = require('path')
const fs = require('fs-extra')
const Logger = require('./Logger')
const { secondsToTimestamp } = require('./utils/fileUtils')
const hlsPlaylistGenerator = require('./utils/hlsPlaylistGenerator')
const Logger = require('../Logger')
const { secondsToTimestamp } = require('../utils/fileUtils')
const { writeConcatFile } = require('../utils/ffmpegHelpers')
const hlsPlaylistGenerator = require('../utils/hlsPlaylistGenerator')
class Stream extends EventEmitter {
constructor(streamPath, client, audiobook) {
@ -19,7 +20,7 @@ class Stream extends EventEmitter {
this.streamPath = Path.join(streamPath, this.id)
this.concatFilesPath = Path.join(this.streamPath, 'files.txt')
this.playlistPath = Path.join(this.streamPath, 'output.m3u8')
this.fakePlaylistPath = Path.join(this.streamPath, 'fake-output.m3u8')
this.finalPlaylistPath = Path.join(this.streamPath, 'final-output.m3u8')
this.startTime = 0
this.ffmpeg = null
@ -211,29 +212,12 @@ class Stream extends EventEmitter {
}, 2000)
}
escapeSingleQuotes(path) {
// return path.replace(/'/g, '\'\\\'\'')
return path.replace(/\\/g, '/').replace(/ /g, '\\ ').replace(/'/g, '\\\'')
}
async start() {
Logger.info(`[STREAM] START STREAM - Num Segments: ${this.numSegments}`)
this.ffmpeg = Ffmpeg()
var currTrackEnd = 0
var startingTrack = this.tracks.find(t => {
currTrackEnd += t.duration
return this.startTime < currTrackEnd
})
var trackStartTime = currTrackEnd - startingTrack.duration
var tracksToInclude = this.tracks.filter(t => t.index >= startingTrack.index)
var trackPaths = tracksToInclude.map(t => {
var line = 'file ' + this.escapeSingleQuotes(t.fullPath) + '\n' + `duration ${t.duration}`
return line
})
var inputstr = trackPaths.join('\n\n')
await fs.writeFile(this.concatFilesPath, inputstr)
var trackStartTime = await writeConcatFile(this.tracks, this.concatFilesPath, this.startTime)
this.ffmpeg.addInput(this.concatFilesPath)
this.ffmpeg.inputFormat('concat')
@ -266,7 +250,7 @@ class Stream extends EventEmitter {
])
var segmentFilename = Path.join(this.streamPath, this.segmentBasename)
this.ffmpeg.addOption(`-hls_segment_filename ${segmentFilename}`)
this.ffmpeg.output(this.fakePlaylistPath)
this.ffmpeg.output(this.finalPlaylistPath)
this.ffmpeg.on('start', (command) => {
Logger.info('[INFO] FFMPEG transcoding started with command: ' + command)

View file

@ -1,8 +1,6 @@
const Path = require('path')
const Logger = require('../Logger')
const prober = require('./prober')
const AudioFile = require('../AudioFile')
function getDefaultAudioStream(audioStreams) {
if (audioStreams.length === 1) return audioStreams[0]

View file

@ -0,0 +1,68 @@
const Ffmpeg = require('fluent-ffmpeg')
if (process.env.NODE_ENV !== 'production') {
Ffmpeg.setFfmpegPath(process.env.FFMPEG_PATH)
}
const { parentPort, workerData } = require("worker_threads")
const Logger = require('../Logger')
Logger.info('[DownloadWorker] Starting Worker...')
const ffmpegCommand = Ffmpeg()
const startTime = Date.now()
ffmpegCommand.input(workerData.input)
if (workerData.inputFormat) ffmpegCommand.inputFormat(workerData.inputFormat)
if (workerData.inputOption) ffmpegCommand.inputOption(workerData.inputOption)
if (workerData.options) ffmpegCommand.addOption(workerData.options)
ffmpegCommand.output(workerData.output)
var isKilled = false
async function runFfmpeg() {
var success = await new Promise((resolve) => {
ffmpegCommand.on('start', (command) => {
Logger.info('[DownloadWorker] FFMPEG concat started with command: ' + command)
})
ffmpegCommand.on('stderr', (stdErrline) => {
Logger.info(stdErrline)
})
ffmpegCommand.on('error', (err, stdout, stderr) => {
if (err.message && err.message.includes('SIGKILL')) {
// This is an intentional SIGKILL
Logger.info('[DownloadWorker] User Killed singleAudio')
} else {
Logger.error('[DownloadWorker] Ffmpeg Err', err.message)
}
resolve(false)
})
ffmpegCommand.on('end', (stdout, stderr) => {
Logger.info('[DownloadWorker] singleAudio ended')
resolve(true)
})
ffmpegCommand.run()
})
var resultMessage = {
type: 'RESULT',
isKilled,
elapsed: Date.now() - startTime,
success
}
parentPort.postMessage(resultMessage)
}
parentPort.on('message', (message) => {
if (message === 'STOP') {
Logger.info('[DownloadWorker] Requested a hard stop')
isKilled = true
ffmpegCommand.kill()
}
})
runFfmpeg()

View file

@ -0,0 +1,37 @@
const fs = require('fs-extra')
function escapeSingleQuotes(path) {
// return path.replace(/'/g, '\'\\\'\'')
return path.replace(/\\/g, '/').replace(/ /g, '\\ ').replace(/'/g, '\\\'')
}
// Returns first track start time
// startTime is for streams starting an encode part-way through an audiobook
async function writeConcatFile(tracks, outputPath, startTime = 0) {
var trackToStartWithIndex = 0
var firstTrackStartTime = 0
// Find first track greater than startTime
if (startTime > 0) {
var currTrackEnd = 0
var startingTrack = tracks.find(t => {
currTrackEnd += t.duration
return startTime < currTrackEnd
})
if (startingTrack) {
firstTrackStartTime = currTrackEnd - startingTrack.duration
trackToStartWithIndex = startingTrack.index
}
}
var tracksToInclude = tracks.filter(t => t.index >= trackToStartWithIndex)
var trackPaths = tracksToInclude.map(t => {
var line = 'file ' + escapeSingleQuotes(t.fullPath) + '\n' + `duration ${t.duration}`
return line
})
var inputstr = trackPaths.join('\n\n')
await fs.writeFile(outputPath, inputstr)
return firstTrackStartTime
}
module.exports.writeConcatFile = writeConcatFile

View file

@ -17,6 +17,13 @@ async function getFileStat(path) {
}
module.exports.getFileStat = getFileStat
async function getFileSize(path) {
var stat = await getFileStat(path)
if (!stat) return 0
return stat.size || 0
}
module.exports.getFileSize = getFileSize
function bytesPretty(bytes, decimals = 0) {
if (bytes === 0) {
return '0 Bytes'