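Flink Coordinator and Custom Operators
In a recent project I worked with Flink's coordinators and custom operators. This post covers what a coordinator in Flink is, how to use one, and how a coordinator and its operators interact.
Coordinator
A coordinator in Flink coordinates operators at runtime. It runs in the JobManager and communicates with the operators through events. For example, the coordinators of Source and Sink operators are used to discover and assign work, or to aggregate and commit metadata.
Thread model
All coordinator methods are called by the JobManager's main thread (mailbox thread). These methods must therefore never perform blocking operations (such as I/O, waiting on locks, or waiting on futures); doing so is very likely to bring the whole JobManager down.
Coordinators that involve more complex operations should therefore spawn their own threads to handle the I/O work. The methods on OperatorCoordinator.Context are safe to call from a thread other than the one that calls the coordinator methods.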
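The offloading idea can be illustrated without any Flink dependency. The following is a minimal sketch in plain Java, assuming a hypothetical class OffloadSketch with a hypothetical slowLookup call standing in for blocking I/O; none of these names come from Flink. The point is only that the method called by the main thread returns immediately while the slow work runs on a dedicated worker thread.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Stand-alone sketch: keep the caller's thread free by offloading blocking work. */
class OffloadSketch {
    // Hypothetical worker thread owned by the coordinator; not a Flink class.
    private final ExecutorService ioExecutor = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "coordinator-io");
        t.setDaemon(true);
        return t;
    });

    /** Returns immediately; the slow lookup runs on the worker thread. */
    CompletableFuture<String> discoverWork(String source) {
        return CompletableFuture.supplyAsync(() -> slowLookup(source), ioExecutor);
    }

    // Placeholder for a blocking call such as a remote metadata request.
    private static String slowLookup(String source) {
        try {
            TimeUnit.MILLISECONDS.sleep(50);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "work-for-" + source;
    }

    void shutdown() {
        ioExecutor.shutdownNow();
    }

    public static void main(String[] args) {
        OffloadSketch sketch = new OffloadSketch();
        CompletableFuture<String> result = sketch.discoverWork("topic-a");
        // A real coordinator would attach a callback; this demo simply waits for the result.
        System.out.println(result.join());
        sketch.shutdown();
    }
}
```

In a real coordinator the callback would typically hand the result back to the coordinator's own thread rather than block on join(), which is used here only to keep the demo short.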
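Consistency
Compared with the scheduler's view, the coordinator's view of task execution is highly simplified, but it allows consistent interaction with the operators running on the parallel subtasks. Specifically, the following methods are guaranteed to be called strictly in order:
- executionAttemptReady(int, int, OperatorCoordinator.SubtaskGateway): called once when a subtask attempt is ready. The SubtaskGateway is the gateway for interacting with the subtask. This marks the start of the interaction with the subtask attempt.
- executionAttemptFailed(int, int, Throwable): called for each subtask attempt as soon as the attempt has failed or been cancelled. From this point on, interaction with the subtask attempt should stop.
- subtaskReset(int, long) or resetToCheckpoint(long, byte[]): these methods notify the coordinator once the scheduler has determined which checkpoint to restore. The former is called on a regional failure/recovery (which may affect only a subset of the subtasks); the latter is called on a global failure/recovery. Use this method to decide what to restore, because it tells you which checkpoint to fall back to. The coordinator implementation needs to recover the interactions with the affected tasks since the restored checkpoint. It is only called after executionAttemptFailed(int, int, Throwable) has been called for all attempts of the subtask.
- executionAttemptReady(int, int, OperatorCoordinator.SubtaskGateway): called again once the recovered task (the new attempt) is ready. This happens later than subtaskReset(int, long), because the new attempt is scheduled and deployed between the two calls.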
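The call order above suggests a simple piece of bookkeeping on the coordinator side. The sketch below is plain Java with no Flink types; SubtaskTracker and its method names are hypothetical and only mirror the callbacks listed above. It records which attempt of each subtask is currently ready and which checkpoint a subtask was last reset to.

```java
import java.util.HashMap;
import java.util.Map;

/** Stand-alone sketch of per-subtask bookkeeping; none of these types are Flink classes. */
class SubtaskTracker {
    // subtask index -> attempt number of the attempt that is currently ready
    private final Map<Integer, Integer> readyAttempts = new HashMap<>();
    // subtask index -> checkpoint id the subtask was last reset to
    private final Map<Integer, Long> lastReset = new HashMap<>();

    /** Mirrors executionAttemptReady: interaction with this attempt may start. */
    void attemptReady(int subtask, int attemptNumber) {
        readyAttempts.put(subtask, attemptNumber);
    }

    /** Mirrors executionAttemptFailed: stop interacting with this attempt. */
    void attemptFailed(int subtask, int attemptNumber) {
        // The two-argument remove drops the entry only if it still points at the failed attempt.
        readyAttempts.remove(subtask, attemptNumber);
    }

    /** Mirrors subtaskReset: remember which checkpoint to fall back to. */
    void reset(int subtask, long checkpointId) {
        if (readyAttempts.containsKey(subtask)) {
            throw new IllegalStateException("reset before all attempts failed: " + subtask);
        }
        lastReset.put(subtask, checkpointId);
    }

    boolean canSendTo(int subtask) {
        return readyAttempts.containsKey(subtask);
    }

    Long lastResetCheckpoint(int subtask) {
        return lastReset.get(subtask);
    }

    public static void main(String[] args) {
        SubtaskTracker tracker = new SubtaskTracker();
        tracker.attemptReady(0, 0);   // the first attempt is up
        tracker.attemptFailed(0, 0);  // it fails
        tracker.reset(0, 7L);         // the scheduler picks checkpoint 7
        tracker.attemptReady(0, 1);   // the new attempt is deployed
        System.out.println(tracker.canSendTo(0) + " " + tracker.lastResetCheckpoint(0));
    }
}
```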
Interface methods
A custom coordinator implements the methods of the OperatorCoordinator interface, described below:
public interface OperatorCoordinator extends CheckpointListener, AutoCloseable {
// ------------------------------------------------------------------------
/**
* Starts the coordinator. Called exactly once, before any other method.
* Any exception thrown by this method fails the job.
*/
void start() throws Exception;
/**
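* Called when the coordinator is disposed. This method should release the resources it holds.
* Exceptions thrown by this method do not fail the job.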
*/
@Override
void close() throws Exception;
// ------------------------------------------------------------------------
/**
* Handles an event coming from a parallel operator instance.
* Any exception thrown by this method fails the job and triggers recovery.
*/
void handleEventFromOperator(int subtask, int attemptNumber, OperatorEvent event)
throws Exception;
// ------------------------------------------------------------------------
/**
* Takes a checkpoint of the coordinator by serializing its current state into the checkpoint.
* On success, call complete on the CompletableFuture; on failure, call completeExceptionally.
*/
void checkpointCoordinator(long checkpointId, CompletableFuture<byte[]> resultFuture)
throws Exception;
/**
* We override the method here to remove the checked exception. Please check the Java docs of
* {@link CheckpointListener#notifyCheckpointComplete(long)} for more detail semantic of the
* method.
*/
@Override
void notifyCheckpointComplete(long checkpointId);
/**
* We override the method here to remove the checked exception. Please check the Java docs of
* {@link CheckpointListener#notifyCheckpointAborted(long)} for more detail semantic of the
* method.
*/
@Override
default void notifyCheckpointAborted(long checkpointId) {}
/**
* Resets the coordinator to the given checkpoint.
*/
void resetToCheckpoint(long checkpointId, @Nullable byte[] checkpointData) throws Exception;
// ------------------------------------------------------------------------
/**
* Called when a subtask is reset.
*/
void subtaskReset(int subtask, long checkpointId);
/**
* Called when a subtask attempt has failed.
*/
void executionAttemptFailed(int subtask, int attemptNumber, @Nullable Throwable reason);
/**
* Called when a subtask attempt is ready.
*/
void executionAttemptReady(int subtask, int attemptNumber, SubtaskGateway gateway);
}
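The contract of checkpointCoordinator and resetToCheckpoint is easiest to see as a round trip. The following is a minimal sketch in plain Java; CheckpointSketch is a hypothetical class that copies the shape of the two methods but does not implement the Flink interface, and its string state is an assumption made for brevity. Treating null checkpoint data as "nothing to restore" is likewise an assumption of this sketch.

```java
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;

/** Stand-alone sketch of a checkpoint round trip; this class does not implement the Flink interface. */
class CheckpointSketch {
    // Hypothetical coordinator state, kept as a plain string for brevity.
    private String state = "";

    void setState(String newState) {
        this.state = newState;
    }

    String getState() {
        return state;
    }

    /** Same shape as checkpointCoordinator: the result is reported through the future. */
    void checkpointCoordinator(long checkpointId, CompletableFuture<byte[]> resultFuture) {
        try {
            resultFuture.complete(state.getBytes(StandardCharsets.UTF_8));
        } catch (Throwable t) {
            resultFuture.completeExceptionally(t);
        }
    }

    /** Same shape as resetToCheckpoint: null data is treated as nothing to restore. */
    void resetToCheckpoint(long checkpointId, byte[] checkpointData) {
        state = checkpointData == null ? "" : new String(checkpointData, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        CheckpointSketch coordinator = new CheckpointSketch();
        coordinator.setState("assigned=3");
        CompletableFuture<byte[]> future = new CompletableFuture<>();
        coordinator.checkpointCoordinator(1L, future);
        byte[] snapshot = future.join();

        coordinator.setState("assigned=5");           // progress made after the checkpoint
        coordinator.resetToCheckpoint(1L, snapshot);  // roll back to the snapshot
        System.out.println(coordinator.getState());
    }
}
```

A coordinator that never completes the future leaves the checkpoint pending, so even a stateless implementation should complete it, for example with an empty byte array.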
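Operator
Operators are what execute the computation in Flink. The user functions passed to map, flatMap, process and so on in the DataStream API all end up wrapped in operators. UDFs cover most development scenarios, but working with a coordinator requires a custom operator. Writing one is fairly simple; see org.apache.flink.streaming.api.operators.KeyedProcessOperator for a reference implementation.
A custom operator extends AbstractStreamOperator and implements the methods of the OneInputStreamOperator interface.
To support timers, implement the methods of the Triggerable interface.
To handle events from the coordinator, implement the methods of the OperatorEventHandler interface.
Example
Custom operator
This section implements a custom operator that processes data from a KeyedStream. It can receive events from the coordinator and send events to the coordinator.
The implementation of MyKeyedProcessOperator is as follows (MyEvent, imported from com.examples.event, is an OperatorEvent implementation that is not listed in this post):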
package com.examples.operator;
import com.examples.event.MyEvent;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
import org.apache.flink.runtime.operators.coordination.OperatorEventHandler;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.streaming.api.SimpleTimerService;
import org.apache.flink.streaming.api.TimerService;
import org.apache.flink.streaming.api.operators.*;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* A custom KeyedProcessOperator
* @author shirukai
*/
public class MyKeyedProcessOperator<KEY, IN, OUT> extends AbstractStreamOperator<OUT>
implements OneInputStreamOperator<IN, OUT>,
Triggerable<KEY, VoidNamespace>,
OperatorEventHandler {
private static final long serialVersionUID = 1L;
private static final Logger LOG = LoggerFactory.getLogger(MyKeyedProcessOperator.class);
private transient TimestampedCollector<OUT> collector;
private transient TimerService timerService;
private final OperatorEventGateway operatorEventGateway;
public MyKeyedProcessOperator(ProcessingTimeService processingTimeService, OperatorEventGateway operatorEventGateway) {
this.processingTimeService = processingTimeService;
this.operatorEventGateway = operatorEventGateway;
}
@Override
public void open() throws Exception {
super.open();
collector = new TimestampedCollector<>(output);
InternalTimerService<VoidNamespace> internalTimerService =
getInternalTimerService("user-timers", VoidNamespaceSerializer.INSTANCE, this);
timerService = new SimpleTimerService(internalTimerService);
}
@Override
public void processElement(StreamRecord<IN> element) throws Exception {
LOG.info("processElement: {}", element);
collector.setTimestamp(element);
// Register an event-time timer
timerService.registerEventTimeTimer(element.getTimestamp() + 10);
// Register a processing-time timer, relative to the current processing time
timerService.registerProcessingTimeTimer(timerService.currentProcessingTime() + 100);
// Send an event to the coordinator
operatorEventGateway.sendEventToCoordinator(new MyEvent("hello,I'm from operator"));
// Forward the element downstream unchanged
collector.collect((OUT) element.getValue());
}
@Override
public void onEventTime(InternalTimer<KEY, VoidNamespace> timer) throws Exception {
LOG.info("onEventTime: {}", timer);
}
@Override
public void onProcessingTime(InternalTimer<KEY, VoidNamespace> timer) throws Exception {
LOG.info("onProcessingTime: {}", timer);
}
@Override
public void handleOperatorEvent(OperatorEvent evt) {
LOG.info("handleOperatorEvent: {}", evt);
}
}
The operator factory class MyKeyedProcessOperatorFactory:
package com.examples.operator;
import com.examples.coordinator.MyKeyedProcessCoordinatorProvider;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
import org.apache.flink.streaming.api.operators.*;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeServiceAware;
/**
* Factory for the custom operator
* @author shirukai
*/
public class MyKeyedProcessOperatorFactory<IN> extends AbstractStreamOperatorFactory<IN>
implements OneInputStreamOperatorFactory<IN, IN>,
CoordinatedOperatorFactory<IN>,
ProcessingTimeServiceAware {
@Override
public OperatorCoordinator.Provider getCoordinatorProvider(String operatorName, OperatorID operatorID) {
return new MyKeyedProcessCoordinatorProvider(operatorName, operatorID);
}
@Override
public <T extends StreamOperator<IN>> T createStreamOperator(StreamOperatorParameters<IN> parameters) {
final OperatorID operatorId = parameters.getStreamConfig().getOperatorID();
final OperatorEventGateway gateway =
parameters.getOperatorEventDispatcher().getOperatorEventGateway(operatorId);
try {
final MyKeyedProcessOperator<?, IN, IN> operator = new MyKeyedProcessOperator<>(processingTimeService, gateway);
operator.setup(
parameters.getContainingTask(),
parameters.getStreamConfig(),
parameters.getOutput());
parameters
.getOperatorEventDispatcher()
.registerEventHandler(operatorId, operator);
return (T) operator;
} catch (Exception e) {
throw new IllegalStateException(
"Cannot create operator for "
+ parameters.getStreamConfig().getOperatorName(),
e);
}
}
@Override
public Class<? extends StreamOperator> getStreamOperatorClass(ClassLoader classLoader) {
return MyKeyedProcessOperator.class;
}
}
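Custom coordinator
CoordinatorExecutorThreadFactory is the thread factory for the coordinator's executor. The class is generic and can be reused to create coordinator threads.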
package com.examples.coordinator;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.util.FatalExitExceptionHandler;
import javax.annotation.Nullable;
import java.util.concurrent.ThreadFactory;
/**
* A thread factory class that provides some helper methods.
*/
public class CoordinatorExecutorThreadFactory
implements ThreadFactory, Thread.UncaughtExceptionHandler {
private final String coordinatorThreadName;
private final ClassLoader classLoader;
private final Thread.UncaughtExceptionHandler errorHandler;
@Nullable
private Thread thread;
// TODO discuss if we should fail the job(JM may restart the job later) or directly kill JM
// process
// Currently we choose to directly kill JM process
CoordinatorExecutorThreadFactory(
final String coordinatorThreadName, final ClassLoader contextClassLoader) {
this(coordinatorThreadName, contextClassLoader, FatalExitExceptionHandler.INSTANCE);
}
@VisibleForTesting
CoordinatorExecutorThreadFactory(
final String coordinatorThreadName,
final ClassLoader contextClassLoader,
final Thread.UncaughtExceptionHandler errorHandler) {
this.coordinatorThreadName = coordinatorThreadName;
this.classLoader = contextClassLoader;
this.errorHandler = errorHandler;
}
@Override
public synchronized Thread newThread(Runnable r) {
thread = new Thread(r, coordinatorThreadName);
thread.setContextClassLoader(classLoader);
thread.setUncaughtExceptionHandler(this);
return thread;
}
@Override
public synchronized void uncaughtException(Thread t, Throwable e) {
errorHandler.uncaughtException(t, e);
}
public String getCoordinatorThreadName() {
return coordinatorThreadName;
}
boolean isCurrentThreadCoordinatorThread() {
return Thread.currentThread() == thread;
}
}
CoordinatorContext is the coordinator context. This class can be reused as well.
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.examples.coordinator;
import org.apache.flink.annotation.Internal;
import org.apache.flink.runtime.operators.coordination.ComponentClosingUtils;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
/**
* A context class for the {@link OperatorCoordinator}.
*
* <p>The context serves a few purposes:
*
* <ul>
* <li>Thread model enforcement - The context ensures that all the manipulations to the
* coordinator state are handled by the same thread.
* </ul>
*/
@Internal
public class CoordinatorContext implements AutoCloseable {
private static final Logger LOG =
LoggerFactory.getLogger(CoordinatorContext.class);
private final ScheduledExecutorService coordinatorExecutor;
private final ScheduledExecutorService workerExecutor;
private final CoordinatorExecutorThreadFactory coordinatorThreadFactory;
private final OperatorCoordinator.Context operatorCoordinatorContext;
private final Map<Integer, OperatorCoordinator.SubtaskGateway> subtaskGateways;
public CoordinatorContext(
CoordinatorExecutorThreadFactory coordinatorThreadFactory,
OperatorCoordinator.Context operatorCoordinatorContext) {
this(
Executors.newScheduledThreadPool(1, coordinatorThreadFactory),
Executors.newScheduledThreadPool(
1,
new ExecutorThreadFactory(
coordinatorThreadFactory.getCoordinatorThreadName() + "-worker")),
coordinatorThreadFactory,
operatorCoordinatorContext);
}
public CoordinatorContext(
ScheduledExecutorService coordinatorExecutor,
ScheduledExecutorService workerExecutor,
CoordinatorExecutorThreadFactory coordinatorThreadFactory,
OperatorCoordinator.Context operatorCoordinatorContext) {
this.coordinatorExecutor = coordinatorExecutor;
this.workerExecutor = workerExecutor;
this.coordinatorThreadFactory = coordinatorThreadFactory;
this.operatorCoordinatorContext = operatorCoordinatorContext;
this.subtaskGateways = new HashMap<>(operatorCoordinatorContext.currentParallelism());
}
@Override
public void close() throws InterruptedException {
// Close quietly so the closing sequence will be executed completely.
ComponentClosingUtils.shutdownExecutorForcefully(
workerExecutor, Duration.ofNanos(Long.MAX_VALUE));
ComponentClosingUtils.shutdownExecutorForcefully(
coordinatorExecutor, Duration.ofNanos(Long.MAX_VALUE));
}
public void runInCoordinatorThread(Runnable runnable) {
coordinatorExecutor.execute(runnable);
}
// --------- Package private methods for the MyKeyedProcessCoordinator ------------
ClassLoader getUserCodeClassloader() {
return this.operatorCoordinatorContext.getUserCodeClassloader();
}
void subtaskReady(OperatorCoordinator.SubtaskGateway gateway) {
final int subtask = gateway.getSubtask();
if (subtaskGateways.get(subtask) == null) {
subtaskGateways.put(subtask, gateway);
} else {
throw new IllegalStateException("Already have a subtask gateway for " + subtask);
}
}
void subtaskNotReady(int subtaskIndex) {
subtaskGateways.put(subtaskIndex, null);
}
Set<Integer> getSubtasks() {
return subtaskGateways.keySet();
}
public void sendEventToOperator(int subtaskId, OperatorEvent event) {
callInCoordinatorThread(
() -> {
final OperatorCoordinator.SubtaskGateway gateway =
subtaskGateways.get(subtaskId);
if (gateway == null) {
LOG.warn(
String.format(
"Subtask %d is not ready yet to receive events.",
subtaskId));
} else {
gateway.sendEvent(event);
}
return null;
},
String.format("Failed to send event %s to subtask %d", event, subtaskId));
}
/**
* Fail the job with the given cause.
*
* @param cause the cause of the job failure.
*/
void failJob(Throwable cause) {
operatorCoordinatorContext.failJob(cause);
}
// ---------------- private helper methods -----------------
/**
* A helper method that delegates the callable to the coordinator thread if the current thread
* is not the coordinator thread, otherwise call the callable right away.
*
* @param callable the callable to delegate.
*/
private <V> V callInCoordinatorThread(Callable<V> callable, String errorMessage) {
// Ensure the call is executed by the coordinator executor.
if (!coordinatorThreadFactory.isCurrentThreadCoordinatorThread()
&& !coordinatorExecutor.isShutdown()) {
try {
final Callable<V> guardedCallable =
() -> {
try {
return callable.call();
} catch (Throwable t) {
LOG.error(
"Uncaught Exception in Coordinator Executor",
t);
ExceptionUtils.rethrowException(t);
return null;
}
};
return coordinatorExecutor.submit(guardedCallable).get();
} catch (InterruptedException | ExecutionException e) {
throw new FlinkRuntimeException(errorMessage, e);
}
}
try {
return callable.call();
} catch (Throwable t) {
LOG.error("Uncaught Exception in Coordinator Executor", t);
throw new FlinkRuntimeException(errorMessage, t);
}
}
}
The custom coordinator MyKeyedProcessCoordinator:
package com.examples.coordinator;
import com.examples.event.MyEvent;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.function.ThrowingRunnable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
import java.util.concurrent.CompletableFuture;
/**
* Custom coordinator.
* It must implement the OperatorCoordinator interface.
* @author shirukai
*/
public class MyKeyedProcessCoordinator implements OperatorCoordinator {
private static final Logger LOG = LoggerFactory.getLogger(MyKeyedProcessCoordinator.class);
/**
* The name of the operator this coordinator is associated with.
*/
private final String operatorName;
private final CoordinatorContext context;
private boolean started;
public MyKeyedProcessCoordinator(String operatorName, CoordinatorContext context) {
this.operatorName = operatorName;
this.context = context;
}
@Override
public void start() throws Exception {
LOG.info(
"Starting Coordinator for {}: {}.",
this.getClass().getSimpleName(),
operatorName);
// we mark this as started first, so that we can later distinguish the cases where 'start()'
// wasn't called and where 'start()' failed.
started = true;
runInEventLoop(
() -> {
LOG.info("Coordinator started.");
},
"do something for coordinator.");
}
@Override
public void close() throws Exception {
// Release the executors owned by the context
context.close();
}
@Override
public void handleEventFromOperator(int subtask, int attemptNumber, OperatorEvent event) throws Exception {
LOG.info("Received event {} from operator {}.", event, subtask);
}
@Override
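public void checkpointCoordinator(long checkpointId, CompletableFuture<byte[]> resultFuture) throws Exception {
// This example keeps no state, but the future must still be completed,
// otherwise the checkpoint cannot finish
resultFuture.complete(new byte[0]);
}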
@Override
public void notifyCheckpointComplete(long checkpointId) {
}
@Override
public void resetToCheckpoint(long checkpointId, @Nullable byte[] checkpointData) throws Exception {
}
@Override
public void subtaskReset(int subtask, long checkpointId) {
LOG.info(
"Recovering subtask {} to checkpoint {} for operator {}.",
subtask,
checkpointId,
operatorName);
runInEventLoop(
() -> {
},
"resetting subtask %d",
subtask);
}
@Override
public void executionAttemptFailed(int subtask, int attemptNumber, @Nullable Throwable reason) {
runInEventLoop(
() -> {
LOG.info(
"Removing itself after failure for subtask {} of operator {}.",
subtask,
operatorName);
context.subtaskNotReady(subtask);
},
"handling subtask %d failure",
subtask);
}
@Override
public void executionAttemptReady(int subtask, int attemptNumber, SubtaskGateway gateway) {
assert subtask == gateway.getSubtask();
LOG.debug("Subtask {} of operator {} is ready.", subtask, operatorName);
runInEventLoop(
() -> {
context.subtaskReady(gateway);
sendEventToOperator(new MyEvent("hello,I'm from coordinator"));
},
"making event gateway to subtask %d available",
subtask);
}
private void sendEventToOperator(OperatorEvent event) {
for (Integer subtask : context.getSubtasks()) {
try {
context.sendEventToOperator(subtask, event);
} catch (Exception e) {
LOG.error(
"Failed to send OperatorEvent to operator {}",
operatorName,
e);
context.failJob(e);
return;
}
}
}
private void runInEventLoop(
final ThrowingRunnable<Throwable> action,
final String actionName,
final Object... actionNameFormatParameters) {
ensureStarted();
context.runInCoordinatorThread(
() -> {
try {
action.run();
} catch (Throwable t) {
// If we have a JVM critical error, promote it immediately, there is a good
// chance the logging or job failing will not succeed any more
ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
final String actionString =
String.format(actionName, actionNameFormatParameters);
LOG.error(
"Uncaught exception in the coordinator for {} while {}. Triggering job failover.",
operatorName,
actionString,
t);
context.failJob(t);
}
});
}
private void ensureStarted() {
if (!started) {
throw new IllegalStateException("The coordinator has not started yet.");
}
}
}
Custom coordinator provider
package com.examples.coordinator;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator;
/**
* Provider for the custom coordinator
*
* @author shirukai
*/
public class MyKeyedProcessCoordinatorProvider extends RecreateOnResetOperatorCoordinator.Provider {
private static final long serialVersionUID = 1L;
private final String operatorName;
public MyKeyedProcessCoordinatorProvider(String operatorName, OperatorID operatorID) {
super(operatorID);
this.operatorName = operatorName;
}
@Override
protected OperatorCoordinator getCoordinator(OperatorCoordinator.Context context) throws Exception {
final String coordinatorThreadName = "MyKeyedProcessCoordinator-" + operatorName;
CoordinatorExecutorThreadFactory coordinatorThreadFactory =
new CoordinatorExecutorThreadFactory(
coordinatorThreadName, context.getUserCodeClassloader());
CoordinatorContext coordinatorContext =
new CoordinatorContext(coordinatorThreadFactory, context);
return new MyKeyedProcessCoordinator(
operatorName, coordinatorContext);
}
}
Running the test
package com.examples;
import com.examples.operator.MyKeyedProcessOperatorFactory;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
* @author shirukai
*/
public class CoordinatorExamples {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(2);
DataStreamSource<MyData> source = env
.fromElements(new MyData(1, 1.0), new MyData(2, 2.0), new MyData(1, 3.0));
MyKeyedProcessOperatorFactory<MyData> operatorFactory = new MyKeyedProcessOperatorFactory<>();
source
.keyBy((KeySelector<MyData, Integer>) MyData::getId)
.transform("MyKeyedProcess", TypeInformation.of(MyData.class), operatorFactory)
.print();
env.execute();
}
public static class MyData {
private Integer id;
private Double value;
public MyData(Integer id, Double value) {
this.id = id;
this.value = value;
}
public Integer getId() {
return id;
}
public void setId(Integer id) {
this.id = id;
}
public Double getValue() {
return value;
}
public void setValue(Double value) {
this.value = value;
}
@Override
public String toString() {
return "MyData{" +
"id=" + id +
", value=" + value +
'}';
}
}
}