In this tutorial we will look at how to write Scala code that connects to AWS S3 and performs COPY, MOVE, DELETE and LIST operations using the AWS SDK (Software Development Kit).
AWS S3
Amazon Simple Storage Service (Amazon S3) is an object storage service offering industry-leading scalability, data availability, security, and performance. Customers of all sizes and industries can store and protect any amount of data for virtually any use case, such as data lakes, cloud-native applications, and mobile apps. With cost-effective storage classes and easy-to-use management features, you can optimize costs, organize data, and configure fine-tuned access controls to meet specific business, organizational, and compliance requirements.
AWS SDK
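For this example, we will build the app as a Maven project in Scala, using the AWS SDK for Java (version 1.x). Add the dependencies below to your pom.xml. The aws-java-sdk-s3 artifact alone provides the S3 client. aws-java-sdk is the full bundle with every AWS service client, so you can leave it out if you only need S3.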
Prerequisites
- Java version 8
- Scala version 2.11
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-s3</artifactId>
<version>1.11.449</version>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk</artifactId>
<version>1.11.449</version>
</dependency>
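Let us first define the S3Service class, which creates the S3 client used by all the operations below. It mixes in the Logging trait from the Log4j Scala API (artifact log4j-api-scala_2.11), so that library and a Log4j 2 implementation such as log4j-core must also be on the classpath.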
import com.amazonaws.AmazonServiceException
import com.amazonaws.ClientConfiguration
import com.amazonaws.auth.{AWSStaticCredentialsProvider, BasicAWSCredentials}
import com.amazonaws.regions.Regions
import com.amazonaws.services.s3.AmazonS3ClientBuilder
import com.amazonaws.services.s3.model.{
  CompleteMultipartUploadRequest, CopyPartRequest, CopyPartResult,
  GetObjectMetadataRequest, GetObjectRequest, InitiateMultipartUploadRequest,
  ListObjectsV2Request, ListObjectsV2Result, ObjectMetadata, PartETag
}
import java.io.File
import java.util.ArrayList
import org.apache.logging.log4j.scala.Logging
import scala.collection.mutable.Set

// The operations described below (deleteObject, copyFromS3ToLocal, listObjects, ...)
// are methods of this class and use its s3client and logger.
class S3Service(awsKey: String, awsSecret: String) extends Logging {
  // Objects larger than this (1 GiB) are moved with a multipart copy.
  private val ALLOWED_OBJECT_SIZE = 1073741824L

  val credentials = new BasicAWSCredentials(awsKey, awsSecret)

  // Client settings: 30 s request timeout, 20 s connection timeout, up to 8 retries.
  val clientConfig = new ClientConfiguration()
  clientConfig.setRequestTimeout(30 * 1000)
  clientConfig.setConnectionTimeout(20000)
  clientConfig.setMaxErrorRetry(8)

  val s3client = AmazonS3ClientBuilder.standard()
    .withCredentials(new AWSStaticCredentialsProvider(credentials))
    .withClientConfiguration(clientConfig)
    .withRegion(Regions.US_EAST_1)
    .build()

  // Release the resources held by the client once you are done with it.
  def shutdown(): Unit = {
    logger.info("Shutting down S3 client!")
    s3client.shutdown()
  }
}
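Because shutdown() releases the resources the client holds open, it should run once you are done, even when an operation throws. A minimal sketch of the loan pattern for that, assuming a hypothetical helper named ResourceSketch.using (Scala 2.13 ships a similar scala.util.Using, but Scala 2.11 does not):

```scala
object ResourceSketch {
  // Hypothetical helper: creates the resource, passes it to `use`,
  // and always calls `close` afterwards, even if `use` throws.
  def using[R, A](create: => R)(close: R => Unit)(use: R => A): A = {
    val resource = create
    try use(resource)
    finally close(resource)
  }
}
```

With S3Service, a call would look like ResourceSketch.using(new S3Service(awsKey, awsSecret))(_.shutdown())(s3 => s3.listObjects("data/", "*", "my-bucket")), where data/ and my-bucket are placeholders.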
Now we will define the supporting methods of S3Service for the COPY, MOVE, DELETE and LIST operations, plus a helper that reads object metadata.
S3 Get Object Metadata
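To get the metadata of an object on S3 (for example its size in bytes), use the getS3ObjectMetadata method. It is listed in the S3 Copy section below, next to the multipart copy that relies on it.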
S3 Delete
To delete an object from an S3 bucket, use the method below.
/**
 * Delete an object from an S3 bucket.
 * @param deleteS3Object key of the object to delete.
 * @param fromBucketName name of the bucket that holds the object.
 */
@throws(classOf[Exception]) @throws(classOf[AmazonServiceException])
def deleteObject(deleteS3Object: String, fromBucketName: String): Unit = {
  s3client.deleteObject(fromBucketName, deleteS3Object)
}
S3 Copy
To copy an object from S3 to the local filesystem, use the method below.
/**
 * Copy an object from S3 to the local filesystem.
 * @param sourceBucket name of the source bucket
 * @param source S3 object key
 * @param target local file path
 */
@throws(classOf[AmazonServiceException])
def copyFromS3ToLocal(sourceBucket: String, source: String, target: String): Unit = {
  val targetFile = new File(target)
  s3client.getObject(new GetObjectRequest(sourceBucket, source), targetFile)
  logger.info(s"Object: $source, copied from S3 to $target.")
}
To copy a file from the local filesystem to S3, use the method below. The file name is appended to targetKey, so pass a key prefix such as reports/2020/.
/**
 * Copy a file from the local filesystem to S3.
 * @param source local filesystem path
 * @param targetBucket name of the target bucket
 * @param targetKey key prefix; the file name is appended to it to form the object key
 */
@throws(classOf[AmazonServiceException])
def copyFromLocalToS3(source: String, targetBucket: String, targetKey: String): Unit = {
  val srcFile = new File(source)
  val objectKey = targetKey + srcFile.getName
  s3client.putObject(targetBucket, objectKey, srcFile)
  logger.info(s"Object: $source, copied to S3. Bucket Name: $targetBucket, Key: $objectKey")
}
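Because copyFromLocalToS3 builds the key as targetKey + srcFile.getName, a prefix without a trailing slash (reports/2020 instead of reports/2020/) produces the key reports/2020sales.csv. A minimal sketch of a hypothetical helper, KeySketch.targetKey, that always puts exactly one separator between prefix and file name (an empty prefix means the bucket root):

```scala
import java.io.File

object KeySketch {
  // Hypothetical helper: joins an S3 "folder" prefix and a local file name
  // into an object key with exactly one "/" between them.
  def targetKey(prefix: String, file: File): String = {
    val name = file.getName
    if (prefix.isEmpty) name
    else if (prefix.endsWith("/")) prefix + name
    else prefix + "/" + name
  }
}
```

Inside copyFromLocalToS3 it would replace the string concatenation, e.g. s3client.putObject(targetBucket, KeySketch.targetKey(targetKey, srcFile), srcFile).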
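For large objects, we will use the S3 multipart copy process, which copies the object in parts (50 MB each in the code below) with CopyPart requests and then has S3 assemble them into the target object. The code copies the parts one after another; because the parts are independent, they could also be copied in parallel. The getS3ObjectMetadata helper provides the object size that the multipart copy needs.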
/**
 * Get the metadata (size, content type, last-modified date, ...) of an S3 object.
 *
 * @param sourceS3Path S3 object key
 * @param fromBucketName bucket name
 */
def getS3ObjectMetadata(sourceS3Path: String, fromBucketName: String): ObjectMetadata = {
  val metadataRequest = new GetObjectMetadataRequest(fromBucketName, sourceS3Path)
  s3client.getObjectMetadata(metadataRequest)
}
/**
 * Copy an object from one key to another using a multipart copy.
 *
 * @param sourceS3Path S3 object key of the source
 * @param targetS3Path S3 object key of the target
 * @param fromS3BucketName source bucket name
 * @param toS3BucketName target bucket name
 */
@throws(classOf[Exception]) @throws(classOf[AmazonServiceException])
def copyMultipart(sourceS3Path: String, targetS3Path: String, fromS3BucketName: String, toS3BucketName: String): Unit = {
  // Initiate the multipart upload.
  val initRequest = new InitiateMultipartUploadRequest(toS3BucketName, targetS3Path)
  val initResponse = s3client.initiateMultipartUpload(initRequest)

  // Get the object size to track the end of the copy operation.
  val objectSize = getS3ObjectMetadata(sourceS3Path, fromS3BucketName).getContentLength()

  // Copy the object using 50 MB parts.
  val partSize = 50L * 1024 * 1024
  var bytePosition = 0L
  var partNum = 1 // S3 part numbers start at 1
  val copyResponses = new ArrayList[CopyPartResult]()
  while (bytePosition < objectSize) {
    // The last part might be smaller than partSize, so make sure
    // that lastByte isn't beyond the end of the object.
    val lastByte = Math.min(bytePosition + partSize - 1, objectSize - 1)

    // Copy this part.
    val copyRequest = new CopyPartRequest()
      .withSourceBucketName(fromS3BucketName)
      .withSourceKey(sourceS3Path)
      .withDestinationBucketName(toS3BucketName)
      .withDestinationKey(targetS3Path)
      .withUploadId(initResponse.getUploadId())
      .withFirstByte(bytePosition)
      .withLastByte(lastByte)
      .withPartNumber(partNum)
    copyResponses.add(s3client.copyPart(copyRequest))
    partNum += 1
    bytePosition += partSize
  }

  // Complete the upload: S3 concatenates the copied parts (identified by
  // their part numbers and ETags) and makes the copied object available.
  val completeRequest = new CompleteMultipartUploadRequest(
    toS3BucketName, targetS3Path, initResponse.getUploadId(), getETags(copyResponses))
  s3client.completeMultipartUpload(completeRequest)
  logger.info("Multipart upload complete.")
}

// Helper that builds the list of part ETags needed to complete the upload.
def getETags(responses: java.util.List[CopyPartResult]): ArrayList[PartETag] = {
  val etags = new ArrayList[PartETag]()
  val it = responses.iterator()
  while (it.hasNext()) {
    val response = it.next()
    etags.add(new PartETag(response.getPartNumber(), response.getETag()))
  }
  etags
}
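The byte-range arithmetic of a multipart copy can be pulled out into a pure function and checked without touching S3. A minimal sketch (PartSketch, Part and partRanges are hypothetical names, not SDK types) that follows the same while-loop logic as copyMultipart: ranges are inclusive, as CopyPartRequest's withFirstByte/withLastByte expect, and part numbers start at 1. Keep S3's documented limits in mind: at most 10,000 parts, and every part except the last must be at least 5 MB.

```scala
object PartSketch {
  // One part of a multipart copy: part number (from 1) and inclusive byte range.
  final case class Part(number: Int, firstByte: Long, lastByte: Long)

  // Split an object of objectSize bytes into consecutive ranges of at most partSize bytes.
  def partRanges(objectSize: Long, partSize: Long): Seq[Part] = {
    require(partSize > 0, "partSize must be positive")
    (0L until objectSize by partSize).zipWithIndex.map { case (first, i) =>
      // The last range is clipped to the final byte of the object.
      Part(i + 1, first, math.min(first + partSize - 1, objectSize - 1))
    }
  }
}
```

For example, a 1.5 GiB object (1,610,612,736 bytes) copied in 50 MB parts (52,428,800 bytes) needs 31 parts, the last one covering the remaining 37,748,736 bytes.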
S3 List Objects
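The listObjects method below lists the files under an S3 prefix whose names match a given pattern: "*" matches every file, and any other pattern is matched as a substring of the file name. Because S3 returns a listing in pages, the method follows the continuation token until the listing is complete.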
/**
 * Get the list of files under an S3 path whose names match the pattern.
 *
 * @param s3Path S3 key prefix to search
 * @param pattern file name pattern: "*" matches everything, anything else is matched as a substring
 * @param s3BucketName S3 bucket name
 */
@throws(classOf[Exception]) @throws(classOf[AmazonServiceException])
def listObjects(s3Path: String, pattern: String, s3BucketName: String): Set[String] = {
  val fileList = Set[String]()
  val allFiles = Set[String]()
  val req = new ListObjectsV2Request().withBucketName(s3BucketName).withPrefix(s3Path)
  var truncated = true
  while (truncated) {
    val res = s3client.listObjectsV2(req)
    val it = res.getObjectSummaries.iterator
    while (it.hasNext) {
      // Keep only the last path segment of the key as the file name.
      val fileName = it.next().getKey.split("/").last
      if (pattern == "*" || fileName.contains(pattern)) {
        fileList += fileName
      }
      allFiles += fileName
    }
    // If the listing is truncated (by default S3 returns at most 1,000 keys per
    // request), continue from the token returned with this page.
    val token = res.getNextContinuationToken()
    logger.info("Next Continuation Token for s3 lookup: " + token)
    req.setContinuationToken(token)
    truncated = res.isTruncated()
  }
  if (fileList.isEmpty) {
    logger.info(s"No files identified at $s3Path!")
    logger.debug(s"List of all the files in the path: $allFiles")
  } else {
    logger.info("Files identified on S3: " + fileList.mkString(", "))
  }
  fileList
}
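The continuation-token loop in listObjects is the usual way to page through an S3 listing. To see the pattern without a bucket, here is a minimal sketch against a hypothetical in-memory page source (Page, listAll and matching are illustrative names, not SDK APIs): fetch plays the role of s3client.listObjectsV2, and nextToken the role of getNextContinuationToken, which is absent on the last page.

```scala
object PaginationSketch {
  // Hypothetical stand-in for one ListObjectsV2Result page: the keys on this page
  // and the token for the next page (None on the last page).
  final case class Page(keys: Seq[String], nextToken: Option[String])

  // Request pages until one comes back without a continuation token,
  // passing each page's token into the next request.
  def listAll(fetch: Option[String] => Page): Seq[String] = {
    @annotation.tailrec
    def loop(token: Option[String], acc: Vector[String]): Vector[String] = {
      val page = fetch(token)
      val collected = acc ++ page.keys
      page.nextToken match {
        case Some(next) => loop(Some(next), collected)
        case None       => collected
      }
    }
    loop(None, Vector.empty)
  }

  // Same filtering rule as listObjects: keep the last path segment of each key;
  // "*" keeps everything, any other pattern must appear in the file name.
  def matching(keys: Seq[String], pattern: String): Seq[String] =
    keys.map(_.split("/").last).filter(name => pattern == "*" || name.contains(pattern))
}
```

A Map from token to page is enough to simulate a two-page listing, since a Scala Map is also a function from key to value.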
S3 Move
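There is no out-of-the-box move operation in S3, so we mimic a move by first copying the object to the target and then deleting the source object with the deleteObject method defined above. Objects larger than ALLOWED_OBJECT_SIZE (1 GB) are copied with copyMultipart, since a single CopyObject request is limited to 5 GB; smaller objects are copied with one copyObject call.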
/**
 * Move an object by copying it to the target and then deleting the source.
 *
 * @param sourceS3Path S3 object key of the source
 * @param targetS3Path S3 object key of the target
 * @param fromBucketName source bucket name
 * @param toBucketName target bucket name
 */
@throws(classOf[Exception]) @throws(classOf[AmazonServiceException])
def moveObject(sourceS3Path: String, targetS3Path: String, fromBucketName: String, toBucketName: String): Unit = {
  logger.info(s"Moving S3 file from $sourceS3Path ==> $targetS3Path")
  // The object size decides between a single copy and a multipart copy.
  val objectSize = getS3ObjectMetadata(sourceS3Path, fromBucketName).getContentLength()
  if (objectSize > ALLOWED_OBJECT_SIZE) {
    logger.info("Object size is greater than 1GB. Initiating multipart upload.")
    copyMultipart(sourceS3Path, targetS3Path, fromBucketName, toBucketName)
  } else {
    s3client.copyObject(fromBucketName, sourceS3Path, toBucketName, targetS3Path)
  }
  // Delete the source object only after the copy has succeeded
  // (a failed copy throws and skips this step).
  deleteObject(sourceS3Path, fromBucketName)
}
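The key property of this copy-then-delete move is ordering: the source is removed only after the copy has succeeded. A minimal sketch of that rule against a hypothetical in-memory store (InMemoryStore and move are illustrative names, not SDK APIs); with the real client, copy corresponds to copyObject or copyMultipart and delete to deleteObject.

```scala
import scala.collection.mutable
import scala.util.Try

object MoveSketch {
  // Hypothetical in-memory stand-in for a bucket: key -> object content.
  final class InMemoryStore {
    val objects: mutable.Map[String, Array[Byte]] = mutable.Map.empty

    // Copy fails with an exception if the source key does not exist.
    def copy(from: String, to: String): Unit =
      objects.get(from) match {
        case Some(bytes) => objects(to) = bytes.clone()
        case None        => throw new NoSuchElementException(s"No such key: $from")
      }

    def delete(key: String): Unit = { objects -= key }
  }

  // Move = copy, then delete the source only if the copy did not throw.
  def move(store: InMemoryStore, from: String, to: String): Try[Unit] =
    Try(store.copy(from, to)).map(_ => store.delete(from))
}
```

Unlike a file-system rename, this move is not atomic: between the two steps the object exists under both keys, so readers of the target prefix may briefly see a copy whose source has not been deleted yet.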
I hope these examples give you an idea of how to use the AWS SDK to perform operations on S3 in Scala. You can apply the same logic in Java applications.
Thanks,
Team DataHackr