创建新信息#

这是一个如何用Flower在服务器和客户端之间创建新类型的信息的简要指导。

假设我们在脚本code:`server.py`和code:`numpy_client.py`中有以下的示例函数...

在服务器端:

def example_request(self, client: ClientProxy) -> Tuple[str, int]:
    question = "Could you find the sum of the list, Bob?"
    l = [1, 2, 3]
    return client.request(question, l)

在客户端:

def example_response(self, question: str, l: List[int]) -> Tuple[str, int]:
    response = "Here you go Alice!"
    answer = sum(question)
    return response, answer

现在让我们来看看,为了让服务器和客户端之间的这个简单的函数正常工作,我们需要实现哪些功能!

协议缓冲区的信息类型#

The first thing we need to do is to define a message type for the RPC system in transport.proto. Note that we have to do it for both the request and response messages. For more details on the syntax of proto3, please see the official documentation.

ServerMessage 代码块中:

message ExampleIns{
    string question=1;
    repeated int64 l=2;
}
oneof msg {
    ReconnectIns reconnect_ins = 1;
    GetPropertiesIns get_properties_ins = 2;
    GetParametersIns get_parameters_ins = 3;
    FitIns fit_ins = 4;
    EvaluateIns evaluate_ins = 5;
    ExampleIns example_ins = 6;
}

在 ClientMessage 代码块中:

message ExampleRes{
    string response = 1;
    int64 answer = 2;
}

oneof msg {
    DisconnectRes disconnect_res = 1;
    GetPropertiesRes get_properties_res = 2;
    GetParametersRes get_parameters_res = 3;
    FitRes fit_res = 4;
    EvaluateRes evaluate_res = 5;
    ExampleRes examples_res = 6;
}

确保在 oneof msg 中也添加一个新创建的消息类型字段。

完成后,我们将使用:

$ python -m flwr_tool.protoc

如果编译成功,你应该会看到以下信息:

Writing mypy to flwr/proto/transport_pb2.pyi
Writing mypy to flwr/proto/transport_pb2_grpc.pyi

序列化和反序列化函数#

下一步是添加函数,以便将 Python 数据类型序列化和反序列化为我们定义的 RPC 消息类型或从我们定义的 RPC 消息类型反序列化和反序列化 Python 数据类型。您应该在 serde.py 中添加这些函数。

四种函数:

def example_msg_to_proto(question: str, l: List[int]) -> ServerMessage.ExampleIns:
    return ServerMessage.ExampleIns(question=question, l=l)


def example_msg_from_proto(msg: ServerMessage.ExampleIns) -> Tuple[str, List[int]]:
    return msg.question, msg.l


def example_res_to_proto(response: str, answer: int) -> ClientMessage.ExampleRes:
    return ClientMessage.ExampleRes(response=response, answer=answer)


def example_res_from_proto(res: ClientMessage.ExampleRes) -> Tuple[str, int]:
    return res.response, res.answer

从服务器发送信息#

现在,在客户端代理类(例如 grpc_client_proxy.py)中使用刚才创建的 serde 函数编写请求函数:

def request(self, question: str, l: List[int]) -> Tuple[str, int]:
    request_msg = serde.example_msg_to_proto(question, l)
    client_msg: ClientMessage = self.bridge.request(
        ServerMessage(example_ins=request_msg)
    )
    response, answer = serde.example_res_from_proto(client_msg.examples_res)
    return response, answer

由客户端接收信息#

最后一步 修改 message_handler.py 中的代码,检查信息的字段并调用 example_response 函数。记住使用 serde 函数!

在句柄函数内:

if server_msg.HasField("example_ins"):
    return _example_response(client, server_msg.example_ins), 0, True

并增加一个新函数:

def _example_response(client: Client, msg: ServerMessage.ExampleIns) -> ClientMessage:
    question,l = serde.evaluate_ins_from_proto(msg)
    response, answer = client.example_response(question,l)
    example_res = serde.example_res_to_proto(response,answer)
    return ClientMessage(examples_res=example_res)

希望您在运行程序时能得到预期的结果!

('Here you go Alice!', 6)