Image Cataloger / Experimenting with Kotlin Coroutines May 2020
Kotlin's Coroutines provide some alternative to the classical Java way of thread-based concurrency:
- Instead of completing one unit of work on a single thread with Coroutines many threads may participate in the execution of one unit of work. This approach goes hand in hand with non blocking IO.
- The programming model provides code which almost looks like sequential code would look since no Callbacks, Futures, Observables, etc. are used.
While it takes some time to get accustomed, the resulting code looks quite clean in my eyes. The central processing pipeline code:
class ImageCatalogPipeline { private val scope = MainScope() private val threadPool = Executors .newFixedThreadPool(Runtime.getRuntime().availableProcessors()) .asCoroutineDispatcher() private val locationService = LocationService() data class ImageData(val filePath: Path, val id: ImageId, val newFile: NewFileService.NewFile) fun run(dir: String): List<ImageData> { info("starting to collect") val filesSeq = FilesSequenceGenerator.generate(dir).take(filesLimit) fun <T> listOfSequences(amount: Int, seq: Sequence<T>): List<Sequence<T>> = (1..amount).map { pos -> seq. filterIndexed { i, _ -> i % amount == pos - 1 } } val filesSequences = listOfSequences(concurrency, filesSeq) val deferredImageDataLists: List<Deferred<List<ImageData>>> = filesSequences.map { filePathList -> scope.async(threadPool) { filePathList.mapNotNull { filePath -> val fname = filePath.fileName.toString() log("working on $fname") val fileContent = scope.async(threadPool) { AsyncFileReader.read(filePath) } val imageIdDeferred = scope.async(threadPool) { ImageIdentityService.identify(fname, fileContent.await()) } val newFileDeferred = scope.async(threadPool) { mapToImageFileName(filePath, fileContent, fname) } runBlocking(threadPool) { val newFile = newFileDeferred.await() /* return: */ if (newFile == null) null else ImageData(filePath, imageIdDeferred.await(), newFile) } }.asIterable().toList() // list of deferred sequences -> list of deferred lists } } val deferredImageDataList: Deferred<List<ImageData>> = scope.async(threadPool) { deferredImageDataLists.flatMap { it.await() } } val imageDataList = runBlocking() { deferredImageDataList.await() }.toList() info("collected ${imageDataList.size} images") return imageDataList } private suspend fun mapToImageFileName(filePath: Path, fileContent: Deferred<ByteArray>, fname: String): NewFileService.NewFile? { val metadata = ImageMetadataService.extract(filePath, fileContent.await()) ?: return null val geolocation = metadata.geolocation val location = if (geolocation != null) locationService.find(fname, geolocation.longitude, geolocation.latitude) else null return NewFileService.build(filePath, metadata, location) } }
Souces available on GitHub: https://github.com/micgn/imagecatalog