mirror of
https://github.com/advplyr/audiobookshelf.git
synced 2025-07-18 18:04:29 +02:00
New data model start of PlaybackSessionManager to replace StreamManager, remove podcast & ip npm package
This commit is contained in:
parent
68b13ae45f
commit
0af6ad63c1
11 changed files with 109 additions and 116 deletions
235
server/objects/legacy/StreamManager.js
Normal file
235
server/objects/legacy/StreamManager.js
Normal file
|
@ -0,0 +1,235 @@
|
|||
const Stream = require('../Stream')
|
||||
// const StreamTest = require('./test/StreamTest')
|
||||
const Logger = require('../../Logger')
|
||||
const fs = require('fs-extra')
|
||||
const Path = require('path')
|
||||
|
||||
class StreamManager {
|
||||
constructor(db, emitter, clientEmitter) {
|
||||
this.db = db
|
||||
|
||||
this.emitter = emitter
|
||||
this.clientEmitter = clientEmitter
|
||||
|
||||
this.streams = []
|
||||
this.StreamsPath = Path.join(global.MetadataPath, 'streams')
|
||||
}
|
||||
|
||||
getStream(streamId) {
|
||||
return this.streams.find(s => s.id === streamId)
|
||||
}
|
||||
|
||||
removeStream(stream) {
|
||||
this.streams = this.streams.filter(s => s.id !== stream.id)
|
||||
}
|
||||
|
||||
async openStream(client, libraryItem, transcodeOptions = {}) {
|
||||
if (!client || !client.user) {
|
||||
Logger.error('[StreamManager] Cannot open stream invalid client', client)
|
||||
return
|
||||
}
|
||||
var stream = new Stream(this.StreamsPath, client, libraryItem, transcodeOptions)
|
||||
|
||||
stream.on('closed', () => {
|
||||
this.removeStream(stream)
|
||||
})
|
||||
|
||||
this.streams.push(stream)
|
||||
|
||||
await stream.generatePlaylist()
|
||||
stream.start()
|
||||
|
||||
Logger.info('Stream Opened for client', client.user.username, 'for item', stream.itemTitle, 'with streamId', stream.id)
|
||||
|
||||
client.stream = stream
|
||||
client.user.stream = stream.id
|
||||
|
||||
return stream
|
||||
}
|
||||
|
||||
ensureStreamsDir() {
|
||||
return fs.ensureDir(this.StreamsPath)
|
||||
}
|
||||
|
||||
removeOrphanStreamFiles(streamId) {
|
||||
try {
|
||||
var StreamsPath = Path.join(this.StreamsPath, streamId)
|
||||
return fs.remove(StreamsPath)
|
||||
} catch (error) {
|
||||
Logger.debug('No orphan stream', streamId)
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
async removeOrphanStreams() {
|
||||
try {
|
||||
var dirs = await fs.readdir(this.StreamsPath)
|
||||
if (!dirs || !dirs.length) return true
|
||||
|
||||
await Promise.all(dirs.map(async (dirname) => {
|
||||
var fullPath = Path.join(this.StreamsPath, dirname)
|
||||
Logger.info(`Removing Orphan Stream ${dirname}`)
|
||||
return fs.remove(fullPath)
|
||||
}))
|
||||
return true
|
||||
} catch (error) {
|
||||
Logger.debug('No orphan stream', error)
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
async openStreamApiRequest(res, user, libraryItem) {
|
||||
Logger.info(`[StreamManager] User "${user.username}" open stream request for "${libraryItem.media.metadata.title}"`)
|
||||
var client = {
|
||||
user
|
||||
}
|
||||
var stream = await this.openStream(client, libraryItem)
|
||||
this.db.updateUserStream(client.user.id, stream.id)
|
||||
|
||||
res.json({
|
||||
libraryItemId: libraryItem.id,
|
||||
startTime: stream.startTime,
|
||||
streamId: stream.id,
|
||||
streamUrl: stream.clientPlaylistUri
|
||||
})
|
||||
}
|
||||
|
||||
async openStreamSocketRequest(socket, libraryItemId) {
|
||||
Logger.info('[StreamManager] Open Stream Request', socket.id, libraryItemId)
|
||||
var libraryItem = this.db.libraryItems.find(li => li.id === libraryItemId)
|
||||
var client = socket.sheepClient
|
||||
|
||||
if (client.stream) {
|
||||
Logger.info('Closing client stream first', client.stream.id)
|
||||
await client.stream.close()
|
||||
client.user.stream = null
|
||||
client.stream = null
|
||||
}
|
||||
|
||||
var stream = await this.openStream(client, libraryItem)
|
||||
this.db.updateUserStream(client.user.id, stream.id)
|
||||
|
||||
this.emitter('user_stream_update', client.user.toJSONForPublic(this.streams))
|
||||
}
|
||||
|
||||
async closeStreamRequest(socket) {
|
||||
Logger.info('Close Stream Request', socket.id)
|
||||
var client = socket.sheepClient
|
||||
if (!client || !client.stream) {
|
||||
Logger.error('No stream for client', (client && client.user) ? client.user.username : 'No Client')
|
||||
client.socket.emit('stream_closed', 'n/a')
|
||||
return
|
||||
}
|
||||
// var streamId = client.stream.id
|
||||
await client.stream.close()
|
||||
client.user.stream = null
|
||||
client.stream = null
|
||||
this.db.updateUserStream(client.user.id, null)
|
||||
|
||||
this.emitter('user_stream_update', client.user.toJSONForPublic(this.streams))
|
||||
}
|
||||
|
||||
async closeStreamApiRequest(userId, streamId) {
|
||||
Logger.info('[StreamManager] Close Stream Api Request', streamId)
|
||||
|
||||
var stream = this.streams.find(s => s.id === streamId)
|
||||
if (!stream) {
|
||||
Logger.warn('[StreamManager] Stream not found', streamId)
|
||||
return
|
||||
}
|
||||
|
||||
if (!stream.client || !stream.client.user || stream.client.user.id !== userId) {
|
||||
Logger.warn(`[StreamManager] Stream close request from invalid user ${userId}`, stream.client)
|
||||
return
|
||||
}
|
||||
|
||||
stream.client.user.stream = null
|
||||
stream.client.stream = null
|
||||
this.db.updateUserStream(stream.client.user.id, null)
|
||||
|
||||
await stream.close()
|
||||
|
||||
this.streams = this.streams.filter(s => s.id !== streamId)
|
||||
Logger.info(`[StreamManager] Stream ${streamId} closed via API request by ${userId}`)
|
||||
}
|
||||
|
||||
streamSync(socket, syncData) {
|
||||
const client = socket.sheepClient
|
||||
if (!client || !client.stream) {
|
||||
Logger.error('[StreamManager] streamSync: No stream for client', (client && client.user) ? client.user.id : 'No Client')
|
||||
return
|
||||
}
|
||||
if (client.stream.id !== syncData.streamId) {
|
||||
Logger.error('[StreamManager] streamSync: Stream id mismatch on stream update', syncData.streamId, client.stream.id)
|
||||
return
|
||||
}
|
||||
if (!client.user) {
|
||||
Logger.error('[StreamManager] streamSync: No User for client', client)
|
||||
return
|
||||
}
|
||||
// const { timeListened, currentTime, streamId } = syncData
|
||||
var listeningSession = client.stream.syncStream(syncData)
|
||||
|
||||
if (listeningSession && listeningSession.timeListening > 0) {
|
||||
// Save listening session
|
||||
var existingListeningSession = this.db.sessions.find(s => s.id === listeningSession.id)
|
||||
if (existingListeningSession) {
|
||||
this.db.updateEntity('session', listeningSession)
|
||||
} else {
|
||||
this.db.sessions.push(listeningSession.toJSON()) // Insert right away to prevent duplicate session
|
||||
this.db.insertEntity('session', listeningSession)
|
||||
}
|
||||
}
|
||||
|
||||
var userAudiobook = client.user.updateAudiobookProgressFromStream(client.stream)
|
||||
this.db.updateEntity('user', client.user)
|
||||
|
||||
if (userAudiobook) {
|
||||
this.clientEmitter(client.user.id, 'current_user_audiobook_update', {
|
||||
id: userAudiobook.audiobookId,
|
||||
data: userAudiobook.toJSON()
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
streamSyncFromApi(req, res) {
|
||||
var user = req.user
|
||||
var syncData = req.body
|
||||
|
||||
var stream = this.streams.find(s => s.id === syncData.streamId)
|
||||
if (!stream) {
|
||||
Logger.error(`[StreamManager] streamSyncFromApi stream not found ${syncData.streamId}`)
|
||||
return res.status(404).send('Stream not found')
|
||||
}
|
||||
if (stream.userToken !== user.token) {
|
||||
Logger.error(`[StreamManager] streamSyncFromApi Invalid stream not owned by user`)
|
||||
return res.status(500).send('Invalid stream auth')
|
||||
}
|
||||
|
||||
var listeningSession = stream.syncStream(syncData)
|
||||
|
||||
if (listeningSession && listeningSession.timeListening > 0) {
|
||||
// Save listening session
|
||||
var existingListeningSession = this.db.sessions.find(s => s.id === listeningSession.id)
|
||||
if (existingListeningSession) {
|
||||
this.db.updateEntity('session', listeningSession)
|
||||
} else {
|
||||
this.db.sessions.push(listeningSession.toJSON()) // Insert right away to prevent duplicate session
|
||||
this.db.insertEntity('session', listeningSession)
|
||||
}
|
||||
}
|
||||
|
||||
var userAudiobook = user.updateAudiobookProgressFromStream(stream)
|
||||
this.db.updateEntity('user', user)
|
||||
|
||||
if (userAudiobook) {
|
||||
this.clientEmitter(user.id, 'current_user_audiobook_update', {
|
||||
id: userAudiobook.audiobookId,
|
||||
data: userAudiobook.toJSON()
|
||||
})
|
||||
}
|
||||
|
||||
res.sendStatus(200)
|
||||
}
|
||||
}
|
||||
module.exports = StreamManager
|
Loading…
Add table
Add a link
Reference in a new issue