Unverified Commit a504b937 authored by Lau Ching Jun's avatar Lau Ching Jun Committed by GitHub

Addresses the feedbacks in #95738 (#97457)

parent 13f106b0
......@@ -1340,12 +1340,38 @@ class ProxyDomain extends Domain {
Future<String> connect(Map<String, dynamic> args) async {
final int targetPort = _getIntArg(args, 'port', required: true);
final String id = 'portForwarder_${targetPort}_${_id++}';
final Socket socket = await Socket.connect('127.0.0.1', targetPort);
Socket socket;
try {
socket = await Socket.connect(InternetAddress.loopbackIPv4, targetPort);
} on SocketException {
globals.logger.printTrace('Connecting to localhost:$targetPort failed with IPv4');
}
try {
// If connecting to IPv4 loopback interface fails, try IPv6.
socket ??= await Socket.connect(InternetAddress.loopbackIPv6, targetPort);
} on SocketException {
globals.logger.printError('Connecting to localhost:$targetPort failed');
}
if (socket == null) {
throw Exception('Failed to connect to the port');
}
_forwardedConnections[id] = socket;
socket.listen((List<int> data) {
sendEvent('proxy.data.$id', null, data);
}, onError: (dynamic error, StackTrace stackTrace) {
// Socket error, probably disconnected.
globals.logger.printTrace('Socket error: $error, $stackTrace');
});
unawaited(socket.done.then((dynamic _) {
unawaited(socket.done.catchError((dynamic error, StackTrace stackTrace) {
// Socket error, probably disconnected.
globals.logger.printTrace('Socket error: $error, $stackTrace');
}).then((dynamic _) {
sendEvent('proxy.disconnected.$id');
}));
return id;
......@@ -1376,7 +1402,7 @@ class ProxyDomain extends Domain {
@override
Future<void> dispose() async {
for (final Socket connection in _forwardedConnections.values) {
await connection.close();
connection.destroy();
}
await _tempDirectory?.delete(recursive: true);
}
......
......@@ -98,39 +98,11 @@ class DaemonInputStreamConverter {
// Processes a single chunk received in the input stream.
void _processChunk(List<int> chunk) {
const int LF = 10; // The '\n' character
int start = 0;
while (start < chunk.length) {
if (state == _InputStreamParseState.json) {
// Search for newline character.
final int indexOfNewLine = chunk.indexOf(LF, start);
if (indexOfNewLine < 0) {
bytesBuilder.add(chunk.sublist(start));
start = chunk.length;
} else {
bytesBuilder.add(chunk.sublist(start, indexOfNewLine + 1));
start = indexOfNewLine + 1;
// Process chunk here
final Uint8List combinedChunk = bytesBuilder.takeBytes();
String jsonString = utf8.decode(combinedChunk).trim();
if (jsonString.startsWith('[{') && jsonString.endsWith('}]')) {
jsonString = jsonString.substring(1, jsonString.length - 1);
final Map<String, Object?>? value = castStringKeyedMap(json.decode(jsonString));
if (value != null) {
// Check if we need to consume another binary blob.
if (value[_binaryLengthKey] != null) {
remainingBinaryLength = value[_binaryLengthKey]! as int;
currentBinaryStream = StreamController<List<int>>();
state = _InputStreamParseState.binary;
_controller.add(DaemonMessage(value, currentBinaryStream.stream));
} else {
_controller.add(DaemonMessage(value));
}
}
}
}
start += _processChunkInJsonMode(chunk, start);
} else if (state == _InputStreamParseState.binary) {
final int bytesSent = _addBinaryChunk(chunk, start, remainingBinaryLength);
start += bytesSent;
......@@ -146,6 +118,41 @@ class DaemonInputStreamConverter {
}
}
/// Processes a chunk in JSON mode, and returns the number of bytes processed.
int _processChunkInJsonMode(List<int> chunk, int start) {
const int LF = 10; // The '\n' character
// Search for newline character.
final int indexOfNewLine = chunk.indexOf(LF, start);
if (indexOfNewLine < 0) {
bytesBuilder.add(chunk.sublist(start));
return chunk.length - start;
}
bytesBuilder.add(chunk.sublist(start, indexOfNewLine + 1));
// Process chunk here
final Uint8List combinedChunk = bytesBuilder.takeBytes();
String jsonString = utf8.decode(combinedChunk).trim();
if (jsonString.startsWith('[{') && jsonString.endsWith('}]')) {
jsonString = jsonString.substring(1, jsonString.length - 1);
final Map<String, Object?>? value = castStringKeyedMap(json.decode(jsonString));
if (value != null) {
// Check if we need to consume another binary blob.
if (value[_binaryLengthKey] != null) {
remainingBinaryLength = value[_binaryLengthKey]! as int;
currentBinaryStream = StreamController<List<int>>();
state = _InputStreamParseState.binary;
_controller.add(DaemonMessage(value, currentBinaryStream.stream));
} else {
_controller.add(DaemonMessage(value));
}
}
}
return indexOfNewLine + 1 - start;
}
int _addBinaryChunk(List<int> chunk, int start, int maximumSizeToRead) {
if (start == 0 && chunk.length <= remainingBinaryLength) {
currentBinaryStream.add(chunk);
......@@ -170,6 +177,32 @@ class DaemonStreams {
inputStream = DaemonInputStreamConverter(rawInputStream).convertedStream,
_logger = logger;
/// Creates a [DaemonStreams] that uses stdin and stdout as the underlying streams.
DaemonStreams.fromStdio(Stdio stdio, { required Logger logger })
: this(stdio.stdin, stdio.stdout, logger: logger);
/// Creates a [DaemonStreams] that uses [Socket] as the underlying streams.
DaemonStreams.fromSocket(Socket socket, { required Logger logger })
: this(socket, socket, logger: logger);
/// Connects to a server and creates a [DaemonStreams] from the connection as the underlying streams.
factory DaemonStreams.connect(String host, int port, { required Logger logger }) {
final Future<Socket> socketFuture = Socket.connect(host, port);
final StreamController<List<int>> inputStreamController = StreamController<List<int>>();
final StreamController<List<int>> outputStreamController = StreamController<List<int>>();
socketFuture.then((Socket socket) {
inputStreamController.addStream(socket);
socket.addStream(outputStreamController.stream);
}).onError((Object error, StackTrace stackTrace) {
logger.printError('Socket error: $error');
logger.printTrace('$stackTrace');
// Propagate the error to the streams.
inputStreamController.addError(error, stackTrace);
unawaited(outputStreamController.close());
});
return DaemonStreams(inputStreamController.stream, outputStreamController.sink, logger: logger);
}
final StreamSink<List<int>> _outputSink;
final Logger _logger;
......@@ -197,28 +230,6 @@ class DaemonStreams {
Future<void> dispose() async {
unawaited(_outputSink.close());
}
/// Creates a [DaemonStreams] that uses stdin and stdout as the underlying streams.
static DaemonStreams fromStdio(Stdio stdio, { required Logger logger }) {
return DaemonStreams(stdio.stdin, stdio.stdout, logger: logger);
}
/// Creates a [DaemonStreams] that uses [Socket] as the underlying streams.
static DaemonStreams fromSocket(Socket socket, { required Logger logger }) {
return DaemonStreams(socket, socket, logger: logger);
}
/// Connects to a server and creates a [DaemonStreams] from the connection as the underlying streams.
static DaemonStreams connect(String host, int port, { required Logger logger }) {
final Future<Socket> socketFuture = Socket.connect(host, port);
final StreamController<List<int>> inputStreamController = StreamController<List<int>>();
final StreamController<List<int>> outputStreamController = StreamController<List<int>>();
socketFuture.then((Socket socket) {
inputStreamController.addStream(socket);
socket.addStream(outputStreamController.stream);
});
return DaemonStreams(inputStreamController.stream, outputStreamController.sink, logger: logger);
}
}
/// Connection between a flutter daemon and a client.
......
......@@ -39,15 +39,11 @@ class Category {
String toString() => value;
static Category? fromString(String category) {
switch (category) {
case 'web':
return web;
case 'desktop':
return desktop;
case 'mobile':
return mobile;
}
return null;
return <String, Category>{
'web': web,
'desktop': desktop,
'mobile': mobile,
}[category];
}
}
......@@ -70,25 +66,16 @@ class PlatformType {
String toString() => value;
static PlatformType? fromString(String platformType) {
switch (platformType) {
case 'web':
return web;
case 'android':
return android;
case 'ios':
return ios;
case 'linux':
return linux;
case 'macos':
return macos;
case 'windows':
return windows;
case 'fuchsia':
return fuchsia;
case 'custom':
return custom;
}
return null;
return <String, PlatformType>{
'web': web,
'android': android,
'ios': ios,
'linux': linux,
'macos': macos,
'windows': windows,
'fuchsia': fuchsia,
'custom': custom,
}[platformType];
}
}
......
......@@ -11,6 +11,8 @@
@Tags(<String>['no-shuffle'])
import 'dart:async';
import 'dart:io' as io;
import 'dart:typed_data';
import 'package:fake_async/fake_async.dart';
import 'package:file/src/interface/file.dart';
......@@ -564,6 +566,115 @@ void main() {
}, overrides: <Type, Generator>{
DevtoolsLauncher: () => FakeDevtoolsLauncher(null),
});
testUsingContext('proxy.connect tries to connect to an ipv4 address and proxies the connection correctly', () async {
final TestIOOverrides ioOverrides = TestIOOverrides();
await io.IOOverrides.runWithIOOverrides(() async {
final FakeSocket socket = FakeSocket();
bool connectCalled = false;
int connectPort;
ioOverrides.connectCallback = (dynamic host, int port) async {
connectCalled = true;
connectPort = port;
if (host == io.InternetAddress.loopbackIPv4) {
return socket;
}
throw const io.SocketException('fail');
};
daemon = Daemon(
daemonConnection,
notifyingLogger: notifyingLogger,
);
daemonStreams.inputs.add(DaemonMessage(<String, dynamic>{'id': 0, 'method': 'proxy.connect', 'params': <String, dynamic>{'port': 123}}));
final Stream<DaemonMessage> broadcastOutput = daemonStreams.outputs.stream.asBroadcastStream();
final DaemonMessage firstResponse = await broadcastOutput.firstWhere(_notEvent);
expect(firstResponse.data['id'], 0);
expect(firstResponse.data['result'], isNotNull);
expect(connectCalled, true);
expect(connectPort, 123);
final Object id = firstResponse.data['result'];
// Can send received data as event.
socket.controller.add(Uint8List.fromList(<int>[10, 11, 12]));
final DaemonMessage dataEvent = await broadcastOutput.firstWhere(
(DaemonMessage message) => message.data['event'] != null && message.data['event'] == 'proxy.data.$id',
);
expect(dataEvent.binary, isNotNull);
final List<List<int>> data = await dataEvent.binary.toList();
expect(data[0], <int>[10, 11, 12]);
// Can proxy data to the socket.
daemonStreams.inputs.add(DaemonMessage(<String, dynamic>{'id': 0, 'method': 'proxy.write', 'params': <String, dynamic>{'id': id}}, Stream<List<int>>.value(<int>[21, 22, 23])));
await pumpEventQueue();
expect(socket.addedData[0], <int>[21, 22, 23]);
// Closes the connection when disconnect request received.
expect(socket.closeCalled, false);
daemonStreams.inputs.add(DaemonMessage(<String, dynamic>{'id': 0, 'method': 'proxy.disconnect', 'params': <String, dynamic>{'id': id}}));
await pumpEventQueue();
expect(socket.closeCalled, true);
// Sends disconnected event when socket.done completer finishes.
socket.doneCompleter.complete();
final DaemonMessage disconnectEvent = await broadcastOutput.firstWhere(
(DaemonMessage message) => message.data['event'] != null && message.data['event'] == 'proxy.disconnected.$id',
);
expect(disconnectEvent.data, isNotNull);
}, ioOverrides);
});
testUsingContext('proxy.connect connects to ipv6 if ipv4 failed', () async {
final TestIOOverrides ioOverrides = TestIOOverrides();
await io.IOOverrides.runWithIOOverrides(() async {
final FakeSocket socket = FakeSocket();
bool connectIpv4Called = false;
int connectPort;
ioOverrides.connectCallback = (dynamic host, int port) async {
connectPort = port;
if (host == io.InternetAddress.loopbackIPv4) {
connectIpv4Called = true;
} else if (host == io.InternetAddress.loopbackIPv6) {
return socket;
}
throw const io.SocketException('fail');
};
daemon = Daemon(
daemonConnection,
notifyingLogger: notifyingLogger,
);
daemonStreams.inputs.add(DaemonMessage(<String, dynamic>{'id': 0, 'method': 'proxy.connect', 'params': <String, dynamic>{'port': 123}}));
final Stream<DaemonMessage> broadcastOutput = daemonStreams.outputs.stream.asBroadcastStream();
final DaemonMessage firstResponse = await broadcastOutput.firstWhere(_notEvent);
expect(firstResponse.data['id'], 0);
expect(firstResponse.data['result'], isNotNull);
expect(connectIpv4Called, true);
expect(connectPort, 123);
}, ioOverrides);
});
testUsingContext('proxy.connect fails if both ipv6 and ipv4 failed', () async {
final TestIOOverrides ioOverrides = TestIOOverrides();
await io.IOOverrides.runWithIOOverrides(() async {
ioOverrides.connectCallback = (dynamic host, int port) => throw const io.SocketException('fail');
daemon = Daemon(
daemonConnection,
notifyingLogger: notifyingLogger,
);
daemonStreams.inputs.add(DaemonMessage(<String, dynamic>{'id': 0, 'method': 'proxy.connect', 'params': <String, dynamic>{'port': 123}}));
final Stream<DaemonMessage> broadcastOutput = daemonStreams.outputs.stream.asBroadcastStream();
final DaemonMessage firstResponse = await broadcastOutput.firstWhere(_notEvent);
expect(firstResponse.data['id'], 0);
expect(firstResponse.data['result'], isNull);
expect(firstResponse.data['error'], isNotNull);
}, ioOverrides);
});
});
testUsingContext('notifyingLogger outputs trace messages in verbose mode', () async {
......@@ -851,3 +962,46 @@ class FakeApplicationPackageFactory implements ApplicationPackageFactory {
}
class FakeApplicationPackage extends Fake implements ApplicationPackage {}
class TestIOOverrides extends io.IOOverrides {
Future<io.Socket> Function(dynamic host, int port) connectCallback;
@override
Future<io.Socket> socketConnect(dynamic host, int port,
{dynamic sourceAddress, int sourcePort = 0, Duration timeout}) {
return connectCallback(host, port);
}
}
class FakeSocket extends Fake implements io.Socket {
bool closeCalled = false;
final StreamController<Uint8List> controller = StreamController<Uint8List>();
final List<List<int>> addedData = <List<int>>[];
final Completer<bool> doneCompleter = Completer<bool>();
@override
StreamSubscription<Uint8List> listen(
void Function(Uint8List event) onData, {
Function onError,
void Function() onDone,
bool cancelOnError,
}) {
return controller.stream.listen(onData, onError: onError, onDone: onDone, cancelOnError: cancelOnError);
}
@override
void add(List<int> data) {
addedData.add(data);
}
@override
Future<void> close() async {
closeCalled = true;
}
@override
Future<bool> get done => doneCompleter.future;
@override
void destroy() {}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment