Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- /******************************************************************************
- Welcome to GDB Online.
- GDB online is an online compiler and debugger tool for C, C++, Python, Java, PHP, Ruby, Perl,
- C#, OCaml, VB, Swift, Pascal, Fortran, Haskell, Objective-C, Assembly, HTML, CSS, JS, SQLite, Prolog.
- Code, Compile, Run and Debug online from anywhere in world.
- *******************************************************************************/
- package com.tekion.sales.multipart_upload.worker
import android.content.Context
import android.os.Handler
import android.os.Looper
import android.util.Log
import androidx.work.CoroutineWorker
import androidx.work.WorkerParameters
import com.tekion.sales.R
import com.tekion.sales.SharedPref
import com.tekion.sales.multipart_upload.UploadProgressManager
import com.tekion.sales.multipart_upload.dataclass.CompleteMultipartUploadRequest
import com.tekion.sales.multipart_upload.dataclass.InitiateUploadRequest
import com.tekion.sales.multipart_upload.dataclass.MediaInfo
import com.tekion.sales.multipart_upload.dataclass.PartInfo
import com.tekion.sales.multipart_upload.dataclass.PartNumber
import com.tekion.sales.multipart_upload.dataclass.PartsDetail
import com.tekion.sales.multipart_upload.dataclass.UploadData
import com.tekion.sales.multipart_upload.dataclass.UploadPartData
import com.tekion.sales.multipart_upload.helper.UploadManager
import com.tekion.sales.multipart_upload.network.ApiClient
import com.tekion.sales.multipart_upload.network.NetworkMonitor
import com.tekion.sales.multipart_upload.network.ProgressRequestBody
import com.tekion.sales.multipart_upload.roomDB.AppDatabase
import com.tekion.sales.multipart_upload.roomDB.MediaEntities
import com.tekion.sales.multipart_upload.roomDB.MediaUploadRepository
import com.tekion.sales.multipart_upload.utils.UploadConstants
import com.tekion.sales.multipart_upload.utils.UploadUtils
import java.io.File
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.Deferred
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.withContext
import kotlinx.coroutines.withTimeoutOrNull
import okhttp3.MultipartBody
import okhttp3.ResponseBody
import retrofit2.Response
/**
 * Background worker that uploads pending media files part by part.
 *
 * For each media returned by [MediaUploadRepository.getNextPendingMedia] the worker
 * uploads every incomplete part through its presigned URL (refreshing URLs that have
 * expired), and once all parts are stored as completed it asks the backend to complete
 * the multipart upload. Progress is pushed to [UploadProgressManager] about once per
 * second from the main looper.
 */
class UploadWorker(appContext: Context, params: WorkerParameters) : CoroutineWorker(appContext, params) {

    /** Result of trying to upload one part, retries included. */
    private enum class PartOutcome { UPLOADED, URL_EXPIRED, FAILED }

    /**
     * Immutable-reference snapshot of what the progress ticker reports on.
     * Swapped as a whole so the ticker never mixes values of two different media.
     */
    private class UploadProgressState(
        val mediaId: String,
        val totalBytes: Long,
        val bytesByPart: LongArray, // slot (partNumber - 1) -> bytes uploaded so far
    )

    val _context: Context = appContext

    private val handler = Handler(Looper.getMainLooper())
    private val networkMonitor = NetworkMonitor(appContext)
    private var networkMonitoringStarted = false

    // Written by the worker coroutine, read by the ticker on the main thread.
    @Volatile
    private var progressState: UploadProgressState? = null

    // True while the ticker is allowed to reschedule itself.
    @Volatile
    private var progressReporting = false

    private lateinit var repository: MediaUploadRepository
    private lateinit var uploadManager: UploadManager

    /** Publishes the current progress and reschedules itself until reporting is stopped. */
    private val speedUpdateRunnable = object : Runnable {
        override fun run() {
            if (!progressReporting) return
            updateTotalProgress()
            handler.postDelayed(this, PROGRESS_UPDATE_INTERVAL_MS)
        }
    }

    /**
     * Builds the repository and upload manager, then processes pending media.
     *
     * @return the result of [startOrResumeUpload], or [Result.failure] when an
     * unexpected exception escapes the upload flow.
     */
    override suspend fun doWork(): Result = withContext(Dispatchers.IO) {
        try {
            val database = AppDatabase.getDatabase(applicationContext)
            repository = MediaUploadRepository(database.mediaDao(), database.mediaPartDao())
            uploadManager = UploadManager(repository)
            startOrResumeUpload()
        } catch (e: CancellationException) {
            // Cancellation must propagate so the coroutine stops cooperatively.
            throw e
        } catch (e: Exception) {
            log("Error in doWork: ${e.message}")
            Result.failure()
        } finally {
            progressReporting = false
            handler.removeCallbacks(speedUpdateRunnable)
        }
    }

    /**
     * Uploads pending media one after another until the repository reports none left.
     *
     * @return [Result.success] when no pending media remains; [Result.failure] if the
     * repository hands back a media that was already attempted in this run (which
     * would otherwise loop forever).
     */
    private suspend fun startOrResumeUpload(): Result {
        val attemptedMediaIds = mutableSetOf<String>()
        while (true) {
            val currentMedia: MediaEntities.Media? = repository.getNextPendingMedia()
            log("IncompleteParts: $currentMedia")
            if (currentMedia == null) {
                return Result.success()
            }
            if (!attemptedMediaIds.add(currentMedia.mediaId)) {
                log("Media ${currentMedia.mediaId} is still pending after an attempt; stopping")
                return Result.failure()
            }
            if (uploadSingleFile(currentMedia)) {
                UploadUtils.deleteChunksForFile(applicationContext, currentMedia.fileName)
                UploadProgressManager.completeUpload(currentMedia.mediaId)
            }
        }
    }

    /**
     * Uploads all incomplete parts of [media].
     *
     * @return `true` when every part was uploaded and the multipart upload was completed.
     */
    private suspend fun uploadSingleFile(media: MediaEntities.Media): Boolean {
        val incompleteParts = repository.getIncompleteParts(media.mediaId)
        val partsList = getUpdatedIncompleteParts(media, incompleteParts)
        val knownParts = repository.getMediaParts(media.mediaId)
        resetProgressList(media.totalParts, media.contentLength, knownParts, media.mediaId)
        return uploadParts(media, partsList)
    }

    /**
     * Turns stored incomplete [parts] into upload requests.
     *
     * Parts whose `updatedAt` is older than [UploadConstants.TIME_PRESIGNED_URL_EXPIRY]
     * get a fresh presigned URL; the remaining parts keep their stored URL. Both groups
     * are returned so that no incomplete part is skipped.
     */
    private suspend fun getUpdatedIncompleteParts(
        media: MediaEntities.Media,
        parts: List<MediaEntities.MediaPart>,
    ): List<UploadPartData> {
        val now = System.currentTimeMillis()
        val (expiredParts, validParts) = parts.partition { part ->
            now - part.updatedAt.time > UploadConstants.TIME_PRESIGNED_URL_EXPIRY
        }
        val stillValid = validParts.map { part ->
            UploadPartData(
                PartsDetail(part.partNumber, File(part.partDataPath), part.size, media.contentType),
                part.presignedUrl,
            )
        }
        return if (expiredParts.isEmpty()) {
            stillValid
        } else {
            stillValid + regeneratePresignedUrls(media, expiredParts)
        }
    }

    /**
     * Uploads [partsList] and, when every stored part is completed, completes the
     * multipart upload on the backend.
     *
     * Parts rejected because of an expired URL are retried with regenerated URLs for
     * at most [MAX_URL_REFRESH_ROUNDS] rounds.
     *
     * @return `true` only when the backend confirmed completion; otherwise the media
     * is marked [UploadConstants.STATUS_FAILED] and `false` is returned.
     */
    private suspend fun uploadParts(media: MediaEntities.Media, partsList: List<UploadPartData>): Boolean {
        var pendingParts = partsList
        var refreshRounds = 0
        while (pendingParts.isNotEmpty()) {
            val expiredParts = uploadBatch(media, pendingParts)
            if (expiredParts.isEmpty()) break
            if (refreshRounds == MAX_URL_REFRESH_ROUNDS) {
                log("Giving up on ${expiredParts.size} part(s) after $MAX_URL_REFRESH_ROUNDS URL refresh rounds")
                expiredParts.forEach { part ->
                    repository.updateMediaPartStatus(
                        media.mediaId, part.partsDetail.number, UploadConstants.STATUS_FAILED, null
                    )
                }
                break
            }
            refreshRounds++
            pendingParts = try {
                regeneratePresignedUrls(media, expiredParts.map { expiredPartEntity(media, it) })
            } catch (e: CancellationException) {
                throw e
            } catch (e: Exception) {
                // Leave the parts incomplete; the check below marks the media as failed.
                log("Could not regenerate presigned URLs: ${e.message}")
                emptyList()
            }
        }

        val storedParts = repository.getMediaParts(media.mediaId)
        val allPartsCompleted = storedParts.all { it.status == UploadConstants.STATUS_COMPLETED }
        log("All parts completed? = $allPartsCompleted")
        if (!allPartsCompleted) {
            repository.updateMediaStatus(media.mediaId, UploadConstants.STATUS_FAILED)
            return false
        }

        val partETags = storedParts.mapNotNull { part ->
            part.etag?.let { etag -> PartInfo(part.partNumber.toString(), etag) }
        }
        if (partETags.size != storedParts.size) {
            // A part was stored as completed without an ETag; completion cannot be requested.
            log("Missing ETag for ${storedParts.size - partETags.size} completed part(s)")
            repository.updateMediaStatus(media.mediaId, UploadConstants.STATUS_FAILED)
            return false
        }
        log("PartsEtags: $partETags")

        val completeResponse = completeMultipartUpload(media.mediaId, partETags)
        log("CompleteResponse: $completeResponse")
        val completed = completeResponse == UploadConstants.RESPONSE_SUCCESS
        repository.updateMediaStatus(
            media.mediaId,
            if (completed) UploadConstants.STATUS_COMPLETED else UploadConstants.STATUS_FAILED,
        )
        return completed
    }

    /**
     * Uploads [parts] with up to [MAX_CONCURRENT_UPLOADS] concurrent workers.
     *
     * @return the parts that were rejected with an expired presigned URL.
     */
    private suspend fun uploadBatch(
        media: MediaEntities.Media,
        parts: List<UploadPartData>,
    ): List<UploadPartData> {
        val uploadQueue = Channel<UploadPartData>(Channel.UNLIMITED)
        parts.forEach { uploadQueue.send(it) }
        // Closing up front is safe: already-queued elements are still delivered.
        uploadQueue.close()
        val workerCount = minOf(MAX_CONCURRENT_UPLOADS, parts.size)
        return coroutineScope {
            List(workerCount) {
                async {
                    try {
                        uploadWorker(media, uploadQueue)
                    } catch (e: CancellationException) {
                        throw e
                    } catch (e: Exception) {
                        log("Error in upload worker : ${e.message}")
                        emptyList()
                    }
                }
            }.awaitAll().flatten()
        }
    }

    /** Rebuilds the stored-part entity needed to request a fresh URL for [part]. */
    private fun expiredPartEntity(media: MediaEntities.Media, part: UploadPartData): MediaEntities.MediaPart =
        MediaEntities.MediaPart(
            mediaId = media.mediaId,
            partNumber = part.partsDetail.number,
            presignedUrl = part.url,
            partDataPath = part.partsDetail.file.absolutePath,
            status = UploadConstants.STATUS_TO_START,
            size = part.partsDetail.length,
        )

    /**
     * Requests new presigned URLs for [expiredParts], stores them, and returns the
     * matching upload requests.
     *
     * @throws IllegalStateException when a part file exists neither at its stored path
     * nor via [UploadUtils.findFileUsingMediaStore].
     */
    private suspend fun regeneratePresignedUrls(
        media: MediaEntities.Media,
        expiredParts: List<MediaEntities.MediaPart>,
    ): List<UploadPartData> {
        val partNumbers = expiredParts.map { PartNumber(it.partNumber, it.size) }
        val refreshedUrls = uploadManager.regeneratePresignedUrls(listOf(MediaInfo(media.mediaId, partNumbers)))
        return expiredParts.map { part ->
            // An empty URL is stored when the backend returned none for this part.
            val newUrl = refreshedUrls[media.mediaId]?.get(part.partNumber) ?: ""
            repository.updateMediaPartPresignedUrl(media.mediaId, part.partNumber, newUrl)
            val partFile = File(part.partDataPath).takeIf { it.exists() }
                ?: UploadUtils.findFileUsingMediaStore(_context, part.partDataPath)
                ?: throw IllegalStateException("Part file not found: ${part.partDataPath}")
            // Use the media's own content type, as the non-expired path does.
            UploadPartData(PartsDetail(part.partNumber, partFile, partFile.length(), media.contentType), newUrl)
        }
    }

    // TODO: remove logs
    /**
     * Drains [uploadQueue], uploading each part with retries.
     *
     * Parts that exhaust their retries are stored as [UploadConstants.STATUS_FAILED].
     *
     * @return the parts rejected with an expired presigned URL, for the caller to refresh.
     */
    private suspend fun uploadWorker(
        media: MediaEntities.Media,
        uploadQueue: Channel<UploadPartData>,
    ): List<UploadPartData> {
        val expiredParts = mutableListOf<UploadPartData>()
        for (part in uploadQueue) {
            when (uploadPartWithRetries(media, part)) {
                PartOutcome.UPLOADED -> Unit
                PartOutcome.URL_EXPIRED -> expiredParts.add(part)
                PartOutcome.FAILED -> {
                    repository.updateMediaPartStatus(
                        media.mediaId, part.partsDetail.number, UploadConstants.STATUS_FAILED, null
                    )
                    log("Failed to upload part ${part.partsDetail.number} after ${UploadConstants.MAX_RETRIES} retries")
                }
            }
        }
        return expiredParts
    }

    /**
     * Tries to upload [part] up to [UploadConstants.MAX_RETRIES] times, backing off
     * one more second after each failed attempt and waiting for connectivity when the
     * network is down.
     */
    private suspend fun uploadPartWithRetries(media: MediaEntities.Media, part: UploadPartData): PartOutcome {
        val partNumber = part.partsDetail.number
        var retries = 0
        while (retries < UploadConstants.MAX_RETRIES) {
            if (!networkMonitor.isNetworkAvailable()) {
                log("Network unavailable. Waiting for network...")
                if (!waitForNetworkWithTimeout(NETWORK_WAIT_TIMEOUT_MS)) {
                    log("Network timeout reached. Skipping this attempt.")
                    retries++
                }
                continue
            }
            try {
                val response = uploadPart(media.mediaId, part.partsDetail, part.url)
                if (response.isSuccessful) {
                    log("Successfully uploaded part $partNumber")
                    repository.updateMediaPartStatus(
                        media.mediaId, partNumber, UploadConstants.STATUS_COMPLETED, response.headers()["ETag"]
                    )
                    return PartOutcome.UPLOADED
                }
                if (response.code() == 403) { // Assuming 403 means expired URL
                    return PartOutcome.URL_EXPIRED
                }
                repository.updateMediaPartError(media.mediaId, partNumber, response.code().toString())
                log("Failed to upload part $partNumber. Response code: ${response.code()}")
            } catch (e: CancellationException) {
                throw e
            } catch (e: Exception) {
                log("Error uploading part $partNumber: ${e.message}")
            }
            retries++
            delay(1000 * retries.toLong())
        }
        return PartOutcome.FAILED
    }

    /**
     * Polls once per second until the network is available.
     *
     * @return `true` if the network came back within [timeoutMs], `false` on timeout.
     */
    private suspend fun waitForNetworkWithTimeout(timeoutMs: Long): Boolean {
        return withTimeoutOrNull(timeoutMs) {
            while (!networkMonitor.isNetworkAvailable()) {
                delay(1000)
            }
            true // Network became available
        } ?: false // Timeout reached
    }

    /**
     * Marks the part as in progress and sends its file to [presignedUrl], recording
     * upload progress for the part as the request body is written.
     */
    private suspend fun uploadPart(
        mediaId: String,
        part: PartsDetail,
        presignedUrl: String,
    ): Response<ResponseBody> {
        repository.updateMediaPartStatus(mediaId, part.number, UploadConstants.STATUS_IN_PROGRESS)
        // Capture the current media's slots so late callbacks never touch another media's progress.
        val bytesByPart = progressState?.bytesByPart
        val slot = part.number - 1
        val requestFile = ProgressRequestBody(part.file, part.contentType) { progress ->
            if (bytesByPart != null && slot in bytesByPart.indices) {
                bytesByPart[slot] = ((progress * part.length) / 100)
            }
        }
        val body = MultipartBody.Part.createFormData("file", part.file.name, requestFile)
        val apiService = ApiClient.getApiService()
        return apiService.uploadFile(presignedUrl, body)
    }

    /** Publishes the summed per-part progress; does nothing when the total size is not positive. */
    private fun updateTotalProgress() {
        val state = progressState ?: return
        if (state.totalBytes <= 0) return // avoids dividing by zero for empty media
        val uploadedBytes = state.bytesByPart.sum()
        val uploadProgress = ((uploadedBytes * 100) / state.totalBytes).toInt()
        log("UploadWorker Progress ....$uploadProgress%")
        UploadProgressManager.updateProgress(state.mediaId, uploadedBytes, state.totalBytes)
    }

    /**
     * Asks the backend to complete the multipart upload of [mediaId].
     *
     * @param partsInfo part number / ETag pairs of the uploaded parts.
     * @return the response status when it equals the `R.string.su` resource.
     * @throws Exception when the backend reports any other status.
     */
    suspend fun completeMultipartUpload(
        mediaId: String,
        partsInfo: List<PartInfo>
    ): String {
        val request = CompleteMultipartUploadRequest(
            mediaId = mediaId,
            partsInfo = partsInfo
        )
        val apiService = ApiClient.getApiService()
        val roId = SharedPref.read(UploadConstants.KEY_RO_ID, "")
        log("RoId: $roId")
        return withContext(Dispatchers.IO) {
            val response = apiService.completeMultipartUpload(roId, request)
            if (response.status == _context.getString(R.string.su)) {
                Log.d(TAG, "MultiPartComplete: $response")
                response.status
            } else {
                Log.d(TAG, "Failed MultiPartComplete: $response")
                throw Exception("Failed to complete upload: ${response.status}")
            }
        }
    }

    /**
     * Starts a multipart upload on the backend.
     *
     * @return the response data when the status is `"success"`.
     * @throws Exception for any other status.
     */
    private suspend fun initiateMultipartUpload(
        fileName: String,
        contentLength: Long,
        mimeType: String,
        numParts: Int,
        chunkMaxSize: Long
    ): UploadData {
        val request = InitiateUploadRequest(fileName, contentLength, mimeType, numParts, chunkMaxSize)
        return withContext(Dispatchers.IO) {
            val response = ApiClient.getApiService().initiateUpload(request)
            if (response.status == "success") {
                response.data
            } else {
                throw Exception("Failed to initiate upload: ${response.status}")
            }
        }
    }

    private fun log(message: String) {
        Log.d(TAG, message)
    }

    /**
     * Points progress reporting at a new media.
     *
     * Seeds the per-part byte counts from the [parts] already stored as completed,
     * starts network monitoring on first use, and starts the progress ticker if it is
     * not already running (so only one ticker and one monitor exist per worker).
     */
    private fun resetProgressList(
        numberOfParts: Int,
        totalBytes: Long,
        parts: List<MediaEntities.MediaPartInfo>,
        mediaId: String,
    ) {
        val bytesByPart = LongArray(numberOfParts.coerceAtLeast(0))
        parts.forEach { partInfo ->
            val slot = partInfo.partNumber - 1
            if (partInfo.status == UploadConstants.STATUS_COMPLETED && slot in bytesByPart.indices) {
                bytesByPart[slot] = partInfo.size
            }
        }
        progressState = UploadProgressState(mediaId, totalBytes, bytesByPart)

        if (!networkMonitoringStarted) {
            networkMonitor.startMonitoring()
            networkMonitoringStarted = true
        }
        if (!progressReporting) {
            progressReporting = true
            handler.post(speedUpdateRunnable)
        }
    }

    private companion object {
        const val TAG = "UploadWorker"
        const val MAX_CONCURRENT_UPLOADS = 5
        const val PROGRESS_UPDATE_INTERVAL_MS = 1_000L
        const val NETWORK_WAIT_TIMEOUT_MS = 60_000L
        const val MAX_URL_REFRESH_ROUNDS = 3
    }
}
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement