Advertisement
thief_g

MultipartUpload_UploadWorker

Aug 2nd, 2024
635
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Kotlin 18.64 KB | None | 0 0
  1. /******************************************************************************
  2. Welcome to GDB Online.
  3. GDB online is an online compiler and debugger tool for C, C++, Python, Java, PHP, Ruby, Perl,
  4. C#, OCaml, VB, Swift, Pascal, Fortran, Haskell, Objective-C, Assembly, HTML, CSS, JS, SQLite, Prolog.
  5. Code, Compile, Run and Debug online from anywhere in world.
  6.  
  7. *******************************************************************************/
  8. package com.tekion.sales.multipart_upload.worker
  9.  
  10. import android.content.Context
  11. import android.os.Handler
  12. import android.os.Looper
  13. import android.util.Log
  14. import androidx.work.CoroutineWorker
  15. import androidx.work.WorkerParameters
  16. import com.tekion.sales.SharedPref
  17. import com.tekion.sales.multipart_upload.UploadProgressManager
  18.  
  19. import com.tekion.sales.multipart_upload.dataclass.CompleteMultipartUploadRequest
  20. import com.tekion.sales.multipart_upload.network.ApiClient
  21. import com.tekion.sales.multipart_upload.network.ProgressRequestBody
  22. import com.tekion.sales.multipart_upload.helper.UploadManager
  23. import com.tekion.sales.multipart_upload.dataclass.InitiateUploadRequest
  24. import com.tekion.sales.multipart_upload.dataclass.MediaInfo
  25. import com.tekion.sales.multipart_upload.dataclass.PartInfo
  26. import com.tekion.sales.multipart_upload.dataclass.PartNumber
  27. import com.tekion.sales.multipart_upload.dataclass.PartsDetail
  28. import com.tekion.sales.multipart_upload.dataclass.UploadData
  29. import com.tekion.sales.multipart_upload.dataclass.UploadPartData
  30. import com.tekion.sales.multipart_upload.network.NetworkMonitor
  31. import com.tekion.sales.multipart_upload.roomDB.AppDatabase
  32. import com.tekion.sales.multipart_upload.roomDB.MediaEntities
  33. import com.tekion.sales.multipart_upload.roomDB.MediaUploadRepository
  34. import com.tekion.sales.multipart_upload.utils.UploadConstants
  35. import com.tekion.sales.multipart_upload.utils.UploadUtils
  36. import kotlinx.coroutines.Deferred
  37. import kotlinx.coroutines.Dispatchers
  38. import kotlinx.coroutines.async
  39. import kotlinx.coroutines.awaitAll
  40. import kotlinx.coroutines.channels.Channel
  41. import kotlinx.coroutines.coroutineScope
  42. import kotlinx.coroutines.delay
  43. import kotlinx.coroutines.withContext
  44. import kotlinx.coroutines.withTimeoutOrNull
  45. import okhttp3.MultipartBody
  46. import okhttp3.ResponseBody
  47. import retrofit2.Response
  48. import java.io.File
  49. import com.tekion.sales.R
  50.  
  51. class UploadWorker(appContext : Context, params : WorkerParameters): CoroutineWorker(appContext, params ) {
  // Context handed to the worker; used for MediaStore lookups, string resources and NetworkMonitor.
  val _context: Context = appContext

  // Number of consumer coroutines started for each batch of parts.
  private val maxConcurrentUploads: Int = 5

  // Sum of the per-part byte counters, refreshed on every progress tick.
  private var totalBytesUploaded: Long = 0L

  // Copy of totalBytesUploaded taken at the last tick or reset.
  private var previousBytesUploaded: Long = 0L

  // Size in bytes of the media currently being uploaded.
  private var totalFileSize: Long = 0L

  // Main-looper handler that drives the periodic progress tick.
  private val handler: Handler = Handler(Looper.getMainLooper())

  // Delay between two progress ticks, in milliseconds (1 second).
  private val updateInterval: Long = 1000L

  // Connectivity probe consulted before each upload attempt.
  private var networkMonitor: NetworkMonitor = NetworkMonitor(appContext)

  // Last computed upload percentage.
  private var uploadProgress: Int = 0

  // Bytes uploaded so far for each part, stored at index (partNumber - 1).
  private var partProgressList: MutableList<Long> = mutableListOf()

  // Both are created at the start of doWork().
  private lateinit var repository: MediaUploadRepository
  private lateinit var uploadManager: UploadManager

  // Id of the media whose progress is currently being reported.
  private var currentMediaId: String = ""
  /**
   * Progress tick: totals the per-part byte counters, publishes the result via
   * [updateTotalProgress], and re-posts itself on [handler] after [updateInterval] ms.
   */
  private val speedUpdateRunnable = object : Runnable {
      override fun run() {
          val uploadedSoFar = partProgressList.sum()
          totalBytesUploaded = uploadedSoFar
          previousBytesUploaded = uploadedSoFar
          updateTotalProgress()

          // Keep ticking until someone calls handler.removeCallbacks(this).
          handler.postDelayed(this, updateInterval)
      }
  }
  /**
   * Worker entry point. Builds the repository and upload manager from the Room database,
   * then drains the queue of pending media via [startOrResumeUpload].
   *
   * @return the result of [startOrResumeUpload], or [Result.failure] if any non-cancellation
   *         exception escapes the upload pipeline.
   */
  override suspend fun doWork(): Result = withContext(Dispatchers.IO) {
      try {
          val database = AppDatabase.getDatabase(applicationContext)
          repository = MediaUploadRepository(database.mediaDao(), database.mediaPartDao())
          uploadManager = UploadManager(repository)

          // Plain trace message, so it is logged at debug level rather than error level.
          Log.d("UploadWorker", "You are here")
          startOrResumeUpload()
      } catch (e: kotlin.coroutines.cancellation.CancellationException) {
          // Cancellation must propagate; reporting it as a failure would hide that the
          // work was stopped rather than broken.
          throw e
      } catch (e: Exception) {
          log("Error in doWork: ${e.message}")
          Result.failure()
      } finally {
          // Stop the periodic progress tick regardless of how the work ended.
          handler.removeCallbacks(speedUpdateRunnable)
      }
  }
  96.  
  /**
   * Uploads pending media one at a time until the repository reports none left.
   *
   * After a media uploads successfully its local chunks are deleted and the progress
   * manager is told the upload completed. A failed media does not stop the loop.
   *
   * Implemented as a loop rather than self-recursion so that a long queue does not grow
   * the call chain by one level per media.
   *
   * @return always [Result.success] once no pending media remains.
   */
  private suspend fun startOrResumeUpload(): Result {
      while (true) {
          val currentMedia: MediaEntities.Media? = repository.getNextPendingMedia()
          log("IncompleteParts: $currentMedia")
          if (currentMedia == null) {
              break
          }
          if (uploadSingleFile(currentMedia)) {
              UploadUtils.deleteChunksForFile(applicationContext, currentMedia.fileName)
              UploadProgressManager.completeUpload(currentMediaId)
          }
      }
      return Result.success()
  }
  110.  
  111.  
  /**
   * Uploads the outstanding parts of one media.
   *
   * Loads the incomplete parts, lets [getUpdatedIncompleteParts] turn them into upload
   * descriptors, re-seeds the progress counters from the stored part list, and then
   * hands the descriptors to [uploadParts].
   *
   * @return the value returned by [uploadParts].
   */
  private suspend fun uploadSingleFile(media: MediaEntities.Media): Boolean {
      val mediaId = media.mediaId
      val pendingUploads = getUpdatedIncompleteParts(media, repository.getIncompleteParts(mediaId))

      resetProgressList(
          numberOfParts = media.totalParts,
          totalBytes = media.contentLength,
          parts = repository.getMediaParts(mediaId),
          mediaId = mediaId,
      )

      return uploadParts(media, pendingUploads)
  }
  123.  
  /**
   * Converts the stored incomplete [parts] of [media] into upload descriptors.
   *
   * A part counts as expired when more than [UploadConstants.TIME_PRESIGNED_URL_EXPIRY]
   * milliseconds have passed since its `updatedAt`. Expired parts get fresh URLs through
   * [regeneratePresignedUrls]; the remaining parts keep their stored URL.
   *
   * Both groups are returned. Returning only the regenerated parts whenever at least one
   * part was expired would leave the still-valid parts un-uploaded for this run.
   *
   * @return descriptors for every part in [parts]: non-expired ones first, then regenerated ones.
   */
  private suspend fun getUpdatedIncompleteParts(media: MediaEntities.Media, parts: List<MediaEntities.MediaPart>): List<UploadPartData> {
      val currentTime = System.currentTimeMillis()
      val (expiredParts, validParts) = parts.partition { part ->
          currentTime - part.updatedAt.time > UploadConstants.TIME_PRESIGNED_URL_EXPIRY
      }

      val reusable = validParts.map { part ->
          UploadPartData(
              PartsDetail(part.partNumber, File(part.partDataPath), part.size, media.contentType),
              part.presignedUrl
          )
      }

      // Skip the refresh call entirely when nothing has expired.
      if (expiredParts.isEmpty()) {
          return reusable
      }
      return reusable + regeneratePresignedUrls(media, expiredParts)
  }
  /**
   * Uploads [partsList] with up to [maxConcurrentUploads] concurrent consumers, then
   * finalises the media if every stored part is marked completed.
   *
   * All parts are queued on a channel that is closed before the consumers start, so each
   * consumer's loop ends once the queue is drained. `coroutineScope` returns only after
   * every consumer has finished.
   *
   * @return `true` when all parts were completed and the complete-upload call returned
   *         without throwing; `false` when at least one part is not completed, in which
   *         case the media is marked [UploadConstants.STATUS_FAILED].
   * @throws IllegalStateException if a completed part has no stored ETag.
   */
  private suspend fun uploadParts(media: MediaEntities.Media, partsList: List<UploadPartData>): Boolean {
      val uploadQueue = Channel<UploadPartData>(Channel.UNLIMITED)
      partsList.forEach { uploadQueue.send(it) }
      uploadQueue.close()

      coroutineScope {
          List(maxConcurrentUploads) {
              async {
                  try {
                      uploadWorker(media, uploadQueue)
                  } catch (e: kotlin.coroutines.cancellation.CancellationException) {
                      // Never swallow cancellation; let structured concurrency unwind.
                      throw e
                  } catch (e: Exception) {
                      log("Error in upload worker : ${e.message}")
                  }
              }
          }.awaitAll()
      }

      // One snapshot serves both the completion check and the ETag list.
      val mediaParts = repository.getMediaParts(media.mediaId)
      val allPartsCompleted = mediaParts.all { it.status == UploadConstants.STATUS_COMPLETED }
      log("All parts completed? = $allPartsCompleted")

      if (!allPartsCompleted) {
          repository.updateMediaStatus(media.mediaId, UploadConstants.STATUS_FAILED)
          return false
      }

      // NOTE(review): progress is reported complete before the server confirms the
      // multipart completion below - confirm this ordering is intended.
      UploadProgressManager.completeUpload(currentMediaId)
      val partETags = mediaParts.map { part ->
          val etag = checkNotNull(part.etag) {
              "Missing ETag for completed part ${part.partNumber} of media ${media.mediaId}"
          }
          PartInfo(part.partNumber.toString(), etag)
      }
      log("PartsEtags: $partETags")
      val completeResponse = completeMultipartUpload(media.mediaId, partETags)
      log("CompleteResponse: $completeResponse")
      if (completeResponse == UploadConstants.RESPONSE_SUCCESS) {
          repository.updateMediaStatus(media.mediaId, UploadConstants.STATUS_COMPLETED)
      }
      return true
  }
  176.  
  177. //    private suspend fun regeneratePresignedUrls(media: MediaEntities.Media, expiredParts: List<MediaEntities.MediaPart>): List<UploadPartData> {
  178. //        val partNumbers = expiredParts.map { PartNumber(it.partNumber, it.size) }
  179. //        val newPresignedUrls = uploadManager.refreshPresignedUrls( media.mediaId, partNumbers)
  180. //
  181. //        return expiredParts.map { part ->
  182. //            val newUrl = newPresignedUrls.presignedUrl[part.partNumber] ?: ""
  183. //            repository.updateMediaPartPresignedUrl(media.mediaId, part.partNumber, newUrl)
  184. //            val partFile = File(part.partDataPath).takeIf { it.exists() }
  185. //                ?: UploadUtils.findFileUsingMediaStore(_context, part.partDataPath)
  186. //                ?: throw IllegalStateException("Part file not found: ${part.partDataPath}")
  187. //            UploadPartData(PartsDetail(part.partNumber, partFile, partFile.length(),"video/mp4"), newUrl)
  188. //        }
  189. //    }
  190.  
  /**
   * Requests fresh presigned URLs for [expiredParts] of [media] and returns upload
   * descriptors that use them.
   *
   * For each part the new URL is stored through the repository and the part file is
   * resolved from its stored path, falling back to a MediaStore lookup.
   *
   * If the response holds no URL for a part, the descriptor carries an empty URL, as
   * before, but the empty value is not written to the database.
   * NOTE(review): this assumes keeping the old stored URL is preferable to storing "";
   * confirm against how the repository timestamps URL updates.
   *
   * The content type comes from [media], matching how descriptors are built for
   * non-expired parts.
   *
   * @throws IllegalStateException if a part file cannot be found by either lookup.
   */
  private suspend fun regeneratePresignedUrls(media: MediaEntities.Media, expiredParts: List<MediaEntities.MediaPart>): List<UploadPartData> {
      val partInfos = expiredParts.map { PartNumber(it.partNumber, it.size) }
      val mediaInfo = MediaInfo(media.mediaId, partInfos)
      val newPresignedUrls = uploadManager.regeneratePresignedUrls(listOf(mediaInfo))
      // Look up this media's URL table once instead of once per part.
      val urlsForMedia = newPresignedUrls[media.mediaId]

      return expiredParts.map { part ->
          val newUrl = urlsForMedia?.get(part.partNumber) ?: ""
          if (newUrl.isNotEmpty()) {
              repository.updateMediaPartPresignedUrl(media.mediaId, part.partNumber, newUrl)
          }
          val partFile = File(part.partDataPath).takeIf { it.exists() }
              ?: UploadUtils.findFileUsingMediaStore(_context, part.partDataPath)
              ?: throw IllegalStateException("Part file not found: ${part.partDataPath}")
          UploadPartData(PartsDetail(part.partNumber, partFile, partFile.length(), media.contentType), newUrl)
      }
  }
  205.  
  206.  
  207.     // TODO: remove logs
  /**
   * Consumer loop run by each concurrent upload coroutine: takes parts from [uploadQueue]
   * until it is closed and drained, uploading each via [uploadPartWithRetries].
   *
   * Parts rejected with HTTP 403 are collected; once the queue is drained their presigned
   * URLs are regenerated in one call and each is retried once more by this same consumer.
   * A part still rejected with 403 after the refresh is marked failed.
   *
   * The refresh path deliberately does not call `uploadParts` again. That function runs
   * the "all parts completed" check and the complete-upload request, which must happen
   * only once, after every consumer has finished, not while sibling consumers may still
   * be uploading. It also bounds the refresh to one round per part.
   */
  private suspend fun uploadWorker(media: MediaEntities.Media, uploadQueue: Channel<UploadPartData>) {
      val expiredParts = mutableListOf<UploadPartData>()
      for (part in uploadQueue) {
          if (uploadPartWithRetries(media, part)) {
              expiredParts.add(part)
          }
      }

      if (expiredParts.isEmpty()) {
          return
      }

      val regeneratedParts = regeneratePresignedUrls(media, expiredParts.map {
          MediaEntities.MediaPart(mediaId = media.mediaId, partNumber = it.partsDetail.number,
              presignedUrl = it.url, partDataPath = it.partsDetail.file.absolutePath,
              status = UploadConstants.STATUS_TO_START, size = it.partsDetail.length)
      })
      for (part in regeneratedParts) {
          if (uploadPartWithRetries(media, part)) {
              // Still rejected with a fresh URL: give up on this part for this run.
              val partNumber = part.partsDetail.number
              repository.updateMediaPartError(media.mediaId, partNumber, "403")
              repository.updateMediaPartStatus(media.mediaId, partNumber, UploadConstants.STATUS_FAILED, null)
              log("Failed to upload part $partNumber. Response code: 403")
          }
      }
  }

  /**
   * Uploads a single [part], retrying up to [UploadConstants.MAX_RETRIES] times.
   *
   * A successful upload stores the returned ETag and marks the part completed. A non-403
   * error response or an exception counts as a retry, followed by a delay of one second
   * per retry so far. While the network is down, the call waits up to 60 seconds per
   * attempt, and a wait that times out also counts as a retry. When the retries run out
   * the part is marked failed.
   *
   * @return `true` only when the server answered 403 (treated as an expired presigned
   *         URL), meaning the caller should refresh the URL; `false` when the part was
   *         uploaded or was marked failed.
   */
  private suspend fun uploadPartWithRetries(media: MediaEntities.Media, part: UploadPartData): Boolean {
      val partNumber = part.partsDetail.number
      var retries = 0
      while (retries < UploadConstants.MAX_RETRIES) {
          if (!networkMonitor.isNetworkAvailable()) {
              log("Network unavailable. Waiting for network...")
              if (!waitForNetworkWithTimeout(60_000)) {
                  log("Network timeout reached. Skipping this attempt.")
                  retries++
              }
              continue
          }
          try {
              val response = uploadPart(media.mediaId, part.partsDetail, part.url)
              if (response.isSuccessful) {
                  log("Successfully uploaded part $partNumber")
                  val eTag = response.headers()["ETag"]
                  repository.updateMediaPartStatus(media.mediaId, partNumber, UploadConstants.STATUS_COMPLETED, eTag)
                  return false
              }
              if (response.code() == 403) {  // Assuming 403 means expired URL
                  return true
              }
              repository.updateMediaPartError(media.mediaId, partNumber, response.code().toString())
              retries++
              log("Failed to upload part $partNumber. Response code: ${response.code()}")
              delay(1000 * retries.toLong())
          } catch (e: kotlin.coroutines.cancellation.CancellationException) {
              // Stop immediately when the work is cancelled instead of retrying.
              throw e
          } catch (e: Exception) {
              log("Error uploading part $partNumber: ${e.message}")
              retries++
              delay(1000 * retries.toLong())
          }
      }

      repository.updateMediaPartStatus(media.mediaId, partNumber, UploadConstants.STATUS_FAILED, null)
      log("Failed to upload part $partNumber after ${UploadConstants.MAX_RETRIES} retries")
      return false
  }
  315.  
  /**
   * Polls the network monitor once per second until connectivity is reported or
   * [timeoutMs] milliseconds have elapsed.
   *
   * @return `true` if the network became available in time, `false` on timeout.
   */
  private suspend fun waitForNetworkWithTimeout(timeoutMs: Long): Boolean {
      val reconnected = withTimeoutOrNull(timeoutMs) {
          while (true) {
              if (networkMonitor.isNetworkAvailable()) break
              delay(1000)
          }
          true
      }
      // withTimeoutOrNull yields null when the deadline passed first.
      return reconnected ?: false
  }
  /**
   * Sends one part to its presigned URL as a multipart form field named "file".
   *
   * The part is first marked [UploadConstants.STATUS_IN_PROGRESS]. While the body is
   * written, the progress callback stores the bytes sent so far for this part in
   * `partProgressList` at index `part.number - 1`.
   *
   * @param mediaId id of the media the part belongs to.
   * @param part the part's number, file, length and content type.
   * @param presignedUrl destination URL for this part.
   * @return the raw HTTP response of the upload call.
   */
  private suspend fun uploadPart(
      mediaId: String,
      part: PartsDetail,
      presignedUrl: String,
  ): Response<ResponseBody> {
      repository.updateMediaPartStatus(mediaId, part.number, UploadConstants.STATUS_IN_PROGRESS)

      val trackedBody = ProgressRequestBody(part.file, part.contentType) { progress ->
          // Convert the reported percentage into a byte count for this part.
          partProgressList[part.number - 1] = ((progress * part.length) / 100)
      }
      val filePart = MultipartBody.Part.createFormData("file", part.file.name, trackedBody)

      return ApiClient.getApiService().uploadFile(presignedUrl, filePart)
  }
  338.  
  /**
   * Recomputes the upload percentage from the byte counters, logs it, and forwards the
   * raw byte counts to [UploadProgressManager].
   *
   * A total size of zero (or less) is reported as 0% instead of dividing by it; this
   * method runs from the progress tick on the main looper, where an ArithmeticException
   * would be uncaught.
   */
  private fun updateTotalProgress() {
      uploadProgress = if (totalFileSize > 0) {
          ((totalBytesUploaded * 100) / totalFileSize).toInt()
      } else {
          0
      }
      log("UploadWorker Progress ....$uploadProgress%")
      UploadProgressManager.updateProgress(currentMediaId, totalBytesUploaded, totalFileSize)
  }
  344.  
  /**
   * Asks the backend to finalise the multipart upload of [mediaId] using the given
   * part ETags.
   *
   * The RO id sent with the request is read from shared preferences. The call succeeds
   * when the response status equals the `R.string.su` string resource.
   *
   * @param mediaId id of the media being completed.
   * @param partsInfo part number / ETag pairs for the uploaded parts.
   * @return the response status on success.
   * @throws Exception if the response status does not match the expected value.
   */
  suspend fun completeMultipartUpload(
      mediaId: String,
      partsInfo: List<PartInfo>
  ): String {
      val request = CompleteMultipartUploadRequest(mediaId = mediaId, partsInfo = partsInfo)
      val apiService = ApiClient.getApiService()
      val roId = SharedPref.read(UploadConstants.KEY_RO_ID,"")
      log("RoId: $roId")

      return withContext(Dispatchers.IO) {
          val response = apiService.completeMultipartUpload(roId, request)
          val succeeded = response.status == _context.getString(R.string.su)

          if (!succeeded) {
              Log.d("UploadWorker","Failed MultiPartComplete: $response")
              throw Exception("Failed to complete upload: ${response.status}")
          }

          Log.d("UploadWorker","MultiPartComplete: $response")
          response.status
      }
  }
  /**
   * Sends an initiate-upload request built from the given file description and returns
   * the payload of a successful response.
   *
   * @param fileName name of the file to upload.
   * @param contentLength total size of the file in bytes.
   * @param mimeType content type of the file.
   * @param numParts number of parts the file is split into.
   * @param chunkMaxSize maximum size of a single part.
   * @return the `data` field of the response when its status is "success".
   * @throws Exception if the response status is anything else.
   */
  private suspend fun initiateMultipartUpload(
      fileName: String,
      contentLength: Long,
      mimeType: String,
      numParts: Int,
      chunkMaxSize: Long
  ): UploadData {
      val request = InitiateUploadRequest(fileName, contentLength, mimeType, numParts, chunkMaxSize)
      return withContext(Dispatchers.IO) {
          val response = ApiClient.getApiService().initiateUpload(request)
          if (response.status != "success") {
              throw Exception("Failed to initiate upload: ${response.status}")
          }
          response.data
      }
  }
  386.  
  387.  
  /** Writes [message] to standard output, prefixed with "UploadWorker: ". */
  private fun log(message: String) = println("UploadWorker: $message")
  391.  
  /**
   * Re-initialises progress tracking for a new media and (re)starts the progress tick.
   *
   * The per-part counters are built in a local list, pre-filled with the size of every
   * part already marked completed, and only then published to `partProgressList`. The
   * tick therefore never observes a list that is being cleared or refilled.
   *
   * Any pending tick is removed before a new one is posted. The tick re-posts itself, so
   * posting without removing would add one more ticker for every media processed.
   *
   * @param numberOfParts total number of parts of the media; sizes the counter list.
   * @param totalBytes total size of the media in bytes.
   * @param parts stored part records; completed ones seed the counters.
   * @param mediaId id of the media that progress is reported for from now on.
   */
  private fun resetProgressList(numberOfParts: Int, totalBytes: Long, parts: List<MediaEntities.MediaPartInfo>, mediaId: String) {
      handler.removeCallbacks(speedUpdateRunnable)

      currentMediaId = mediaId
      totalFileSize = totalBytes

      val freshProgress = MutableList(numberOfParts) { 0L }
      var alreadyUploaded = 0L
      parts.forEach { partInfo ->
          if (partInfo.status == UploadConstants.STATUS_COMPLETED) {
              alreadyUploaded += partInfo.size
              freshProgress[partInfo.partNumber - 1] = partInfo.size
          }
      }
      partProgressList = freshProgress
      previousBytesUploaded = alreadyUploaded
      totalBytesUploaded = alreadyUploaded

      // NOTE(review): a new NetworkMonitor is created and started for every media and the
      // previous one is never stopped here - confirm whether it needs explicit teardown.
      networkMonitor = NetworkMonitor(_context)
      networkMonitor.startMonitoring()
      handler.post(speedUpdateRunnable)
  }
  410.  
  411.  
  412. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement