Commit 2462e153 authored by abdullh.alsoleman's avatar abdullh.alsoleman

Uploading

parent 8346357d
package com.example.UploadingFile_Client;
import Uploading_Service.FileMetadata;
import io.grpc.Context;
import io.grpc.Metadata;
public class Constant {
public static final Metadata.Key<byte[]> fileMetaDataKey = Metadata.Key.of("file-metadata-bin", Metadata.BINARY_BYTE_MARSHALLER);
public static final Context.Key<FileMetadata> fileMetaContext = Context.key("file-meta");
}
package com.example.UploadingFile_Client.Controllers;
import com.example.UploadingFile_Client.Services.UploadFileService;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.multipart.MultipartFile;
@RestController
public class FileUploadController {
private final UploadFileService uploadFileService;
public FileUploadController(UploadFileService uploadFileService) {
this.uploadFileService = uploadFileService;
}
@PostMapping(value = "/upload", consumes = MediaType.MULTIPART_FORM_DATA_VALUE)
public String uploadFile(@RequestParam("file") MultipartFile file) {
return this.uploadFileService.uploadFile(file);
}
}
package com.example.UploadingFile_Client.Services;
import Uploading_Service.*;
import com.example.UploadingFile_Client.Constant;
import com.google.protobuf.ByteString;
import io.grpc.Metadata;
import io.grpc.stub.MetadataUtils;
import io.grpc.stub.StreamObserver;
import net.devh.boot.grpc.client.inject.GrpcClient;
import org.springframework.stereotype.Service;
import org.springframework.web.multipart.MultipartFile;
import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.CountDownLatch;
@Service
public class UploadFileService {
//Asynchronous stub of client
private final uploadingServiceGrpc.uploadingServiceStub client;
// @GrpcClient is help annotation refer to which server it requires to call
//file-upload is the key will be read from properties file
public UploadFileService(@GrpcClient(value = "file-upload") uploadingServiceGrpc.uploadingServiceStub client) {
this.client = client;
}
public String uploadFile(final MultipartFile multipartFile) {
//file name
String name;
//file size
int size;
//to read the bytes
InputStream inputStream;
name = multipartFile.getOriginalFilename();
try {
size = multipartFile.getBytes().length;
inputStream = multipartFile.getInputStream();
} catch (IOException e) {
return "unable to extract file info";
}
StringBuilder response = new StringBuilder();
CountDownLatch countDownLatch = new CountDownLatch(1);
Metadata metadata = new Metadata();
metadata.put(Constant.fileMetaDataKey,
FileMetadata.newBuilder()
.setFileNameWithType(name)
.setContentLength(size)
.build()
.toByteArray());
//using this object we will stream the file content to the server
StreamObserver<FileUploadRequest> fileUploadRequestStreamObserver = this.client
.withInterceptors(MetadataUtils.newAttachHeadersInterceptor(metadata))
.uploadFile(
new StreamObserver<FileUploadResponse>() {
@Override
//called when server sends the response
public void onNext(FileUploadResponse fileUploadResponse) {
response.append(fileUploadResponse.getUploadStatus());
}
@Override
//called when server send an error
public void onError(Throwable throwable) {
response.append(UploadStatus.FAILED);
throwable.printStackTrace();
countDownLatch.countDown();
}
@Override
//called when server finish processing the request
public void onCompleted() {
countDownLatch.countDown();
}
});
//chunk data, i use the size 10 KB for each chunk
byte[] tenKB = new byte[5120];
int length;
try {
while ((length = inputStream.read(tenKB)) > 0) {
FileUploadRequest request = FileUploadRequest
.newBuilder()
.setFile(File.newBuilder().setContent(ByteString.copyFrom(tenKB, 0, length)))
.build();
//sending the request that contain the chunk data of file
fileUploadRequestStreamObserver.onNext(request);
}
inputStream.close();
//notify the server we have completed sending the chunk data
fileUploadRequestStreamObserver.onCompleted();
//waiting server response
countDownLatch.await();
} catch (Exception e) {
e.printStackTrace();
response.append(UploadStatus.FAILED);
}
return response.toString();
}
}
......@@ -8,16 +8,13 @@ package Uploading_Service;
message File {
bytes content = 1;
}
message FileUploadRequest {
File file = 1;
}
enum UploadStatus {
SUCCESS = 0;
FAILED = 1;
}
message FileUploadResponse {
string fileName = 1;
UploadStatus uploadStatus = 2;
......
package com.example.UploadingFile_Server;
import Uploading_Service.FileMetadata;
import io.grpc.Context;
import io.grpc.Metadata;
public class Constant {
public static final Metadata.Key<byte[]> fileMetaDataKey = Metadata.Key.of("file-metadata-bin", Metadata.BINARY_BYTE_MARSHALLER);
public static final Context.Key<FileMetadata> fileMetaContext = Context.key("file-meta");
}
package com.example.UploadingFile_Server.Services;
import com.example.UploadingFile_Server.Constant;
import com.example.UploadingFile_Server.Store.FileStorage;
import Uploading_Service.*;
import io.grpc.Context;
import io.grpc.Metadata;
import io.grpc.Status;
import io.grpc.stub.StreamObserver;
import net.devh.boot.grpc.server.service.GrpcService;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
@GrpcService
public class UploadFileService extends uploadingServiceGrpc.uploadingServiceImplBase {
@Override
public StreamObserver<FileUploadRequest> uploadFile(StreamObserver<FileUploadResponse> responseObserver) {
FileMetadata fileMetadata = Constant.fileMetaContext.get();
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
FileStorage fileStorage = new FileStorage();
return new StreamObserver<FileUploadRequest>() {
@Override
//Called when client sends data
public void onNext(FileUploadRequest fileUploadRequest) {
try {
fileUploadRequest.getFile().getContent()
.writeTo(byteArrayOutputStream);
} catch (IOException e) {
responseObserver.onError(io.grpc.Status.INTERNAL
.withDescription("a problem occur in writing data: " + e.getMessage())
.asRuntimeException());
}
}
@Override
//Called when client sends error
public void onError(Throwable throwable) {
System.out.println(throwable.toString());
}
@Override
//Called when client finish sending data
public void onCompleted() {
int allReceivedBytes = byteArrayOutputStream.size();
try {
if (allReceivedBytes == fileMetadata.getContentLength()) {
fileStorage.write(fileMetadata.getFileNameWithType(), byteArrayOutputStream);
byteArrayOutputStream.close();
} else {
responseObserver.onError(Status.FAILED_PRECONDITION
.withDescription(String.format("there is an different between expected bytes %d and received %d ", fileMetadata.getContentLength(), allReceivedBytes))
.asRuntimeException());
return;
}
} catch (IOException e) {
responseObserver.onError(io.grpc.Status.INTERNAL
.withDescription("There is a problem in saving data: " + e.getMessage())
.asRuntimeException());
return;
}
responseObserver.onNext(
FileUploadResponse
.newBuilder()
.setFileName(fileMetadata.getFileNameWithType())
.setUploadStatus(UploadStatus.SUCCESS)
.build()
);
responseObserver.onCompleted();
}
};
}
}
package com.example.UploadingFile_Server.Store;
import java.io.BufferedOutputStream;
import java.io.ByteArrayOutputStream;
import java.io.FileOutputStream;
import java.io.IOException;
public class FileStorage {
private final String DEFAULT_PATH = "output/";
public void write(String fileNameWithType,ByteArrayOutputStream byteArrayOutputStream) throws IOException {
String filePath = DEFAULT_PATH + fileNameWithType;
try (BufferedOutputStream bufferedOutputStream = new BufferedOutputStream(new FileOutputStream(filePath))) {
byteArrayOutputStream.writeTo(bufferedOutputStream);
}
byteArrayOutputStream.close();
}
}
package com.example.UploadingFile_Server.interceptor;
import Uploading_Service.FileMetadata;
import com.example.UploadingFile_Server.Constant;
import com.google.protobuf.InvalidProtocolBufferException;
import io.grpc.*;
import net.devh.boot.grpc.server.interceptor.GrpcGlobalServerInterceptor;
@GrpcGlobalServerInterceptor
public class FileUploadInterceptor implements ServerInterceptor {
@Override
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> serverCall, Metadata metadata, ServerCallHandler<ReqT, RespT> serverCallHandler) {
FileMetadata fileMetadata = null;
if (metadata.containsKey(Constant.fileMetaDataKey)) {
byte[] metaBytes = metadata.get(Constant.fileMetaDataKey);
try {
fileMetadata = FileMetadata.parseFrom(metaBytes);
} catch (InvalidProtocolBufferException e) {
Status status = Status.INTERNAL.withDescription("unable to create file metadata");
serverCall.close(status, metadata);
}
Context context = Context.current().withValue(
Constant.fileMetaContext,
fileMetadata
);
return Contexts.interceptCall(context, serverCall, metadata, serverCallHandler);
}
return new ServerCall.Listener<ReqT>() {
};
}
}
......@@ -8,16 +8,13 @@ package Uploading_Service;
message File {
bytes content = 1;
}
message FileUploadRequest {
File file = 1;
}
enum UploadStatus {
SUCCESS = 0;
FAILED = 1;
}
message FileUploadResponse {
string fileName = 1;
UploadStatus uploadStatus = 2;
......@@ -28,6 +25,7 @@ message FileMetadata {
int32 contentLength = 2;
}
service uploadingService {
rpc uploadFile (stream FileUploadRequest) returns (FileUploadResponse) {}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment